using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Diagnostics; using System.Linq; using System.Reflection; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Timers; using Tango.Core; using Tango.Core.ExtensionMethods; using Tango.Insights; using Tango.Integration.Operation; using Tango.Logging; using Tango.PMR.Diagnostics; using Tango.PMR.Insights; using Tango.Telemetry.Reporting; using Tango.Telemetry.Telemetries; namespace Tango.Telemetry { /// /// TelemetryPublisher is responsible for collecting telemetry data from sources, /// queuing it, storing it if needed, and publishing it to one or more destinations. /// It supports streaming, historical, and retry-based telemetry flows. /// public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher { /// /// Occurs before a telemetry package is published to a destination. /// public event EventHandler PublishingPackage; /// /// Occurs when a telemetry package has been successfully published to a destination. /// public event EventHandler PackagePublished; /// /// Occurs when a telemetry package fails to publish to a destination. /// public event EventHandler PublishPackageFailed; /// /// Occurs when a telemetry publish operation has completed and a publish result is available, /// indicating the success or failure status for each destination. /// public event EventHandler PublishResultAvailable; // Timer to periodically check and publish pending telemetry from local storage private System.Timers.Timer _pendingStorageCheckTimer; // Timer to periodically fetch historical data from ITelemetryHistorySource private System.Timers.Timer _historicalDataTimer; // Indicates if the publisher has been disposed protected bool _isDisposed; // Background thread responsible for dequeuing and publishing telemetry private Thread _publishThread; // Source used to tag telemetry loaded from pending storage private TelemetryPendingStorageSource _pendingStorageSource; //Timer responsible for triggering periodic cleanup of the published telemetries cache, private System.Timers.Timer _publishedTelemetriesCacheCleanupTimer; private List _pastResults; #region Properties /// /// Indicates whether the publisher is actively running. /// public bool IsStarted { get; private set; } /// /// Publisher configuration containing telemetry parameters and limits. /// public TelemetryPublisherConfiguration Config { get; } /// /// Manages persistence of telemetry data (e.g., LiteDB). /// public ITelemetryStorageManager StorageManager { get; } private List InnerSources { get; } /// /// Public read-only access to telemetry sources. /// public ReadOnlyCollection Sources { get; } private List InnerDestinations { get; } /// /// Public read-only access to telemetry destinations. /// public ReadOnlyCollection Destinations { get; } /// /// Manages telemetry queuing between ingestion and publish phases. /// public ITelemetryQueueManager QueueManager { get; private set; } /// /// Gets the client used for remote checkpoint recovery. /// public ITelemetryCheckpointsRecoveryClient CheckpointsRecoveryClient { get; } #endregion #region Constructor /// /// Initializes the telemetry publisher with default storage and queue managers. /// public TelemetryPublisher(TelemetryPublisherConfiguration config, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient) { _pastResults = new List(); Config = config ?? new TelemetryPublisherConfiguration(); _pendingStorageSource = new TelemetryPendingStorageSource(); InnerSources = new List(); Sources = new ReadOnlyCollection(InnerSources); InnerDestinations = new List(); Destinations = new ReadOnlyCollection(InnerDestinations); _publishThread = new Thread(PublishThreadMethod); _publishThread.IsBackground = true; CheckpointsRecoveryClient = checkPointsRecoveryClient; StorageManager = new TelemetryLiteDBStorageManager(); QueueManager = new TelemetryInMemoryQueueManager(); } /// /// Initializes the telemetry publisher with custom storage and queue managers. /// public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config, checkPointsRecoveryClient) { StorageManager = storageManager; QueueManager = queueManager; } #endregion #region Sources /// /// Registers a telemetry source, such as a streaming or historical source. /// public void RegisterSource(ITelemetrySource source) { if (source == null) return; if (InnerSources.Exists(x => x.GetType() == source.GetType())) { LogManager.Log($"Telemetry source {source.Name} has already been registered. Ignoring.", LogCategory.Warning); return; } InnerSources.Add(source); if (source is ITelemetryStreamingSource streamingSource) { streamingSource.TelemetryAvailable += StreamingSource_TelemetryAvailable; if (IsStarted) { try { streamingSource.Start(); } catch (Exception ex) { LogManager.Log(ex, $"Error starting telemetry source {source.Name}."); } } } LogManager.Log($"Telemetry source {source.Name} registered."); } /// /// Callback when a telemetry streaming source emits new telemetry. /// private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e) { if (_isDisposed) return; var source = sender as ITelemetrySource; if (source != null) { LogManager.Log($"Telemetry stream received {source.Name} -> {e.TelemetryObject.ToTelemetryName()}.", LogCategory.Debug); PushTelemetryPackage(source, e.TelemetryObject, TelemetrySourceTypes.Streaming, e.DisableDeliveryRetries); } } #endregion #region Destinations /// /// Registers a telemetry destination, such as a cloud service or local database. /// public void RegisterDestination(ITelemetryDestination destination) { if (destination == null) return; if (InnerDestinations.Exists(x => x.Name == destination.Name)) { LogManager.Log($"Telemetry destination with name {destination.Name} has already been registered. Ignoring.", LogCategory.Warning); return; } InnerDestinations.Add(destination); LogManager.Log($"Telemetry destination {destination.Name} registered."); } #endregion #region Start / Stop /// /// Starts all timers, threads, and streaming sources for publishing telemetry. /// public async Task Start() { if (!IsStarted) { try { LogManager.Log($"Starting telemetry publisher...\nConfig:\n{Config.ToJsonString()}\nSources: {String.Join(", ", Sources.Select(x => x.Name))}\nDestinations: {String.Join(", ", Destinations.Select(x => x.Name))}"); Config.Validate(); Validate(); IsStarted = true; await StorageManager.Init(CheckpointsRecoveryClient); if (_pendingStorageCheckTimer == null) { _pendingStorageCheckTimer = new System.Timers.Timer(); _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds; _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed; } _pendingStorageCheckTimer.Start(); if (_historicalDataTimer == null) { _historicalDataTimer = new System.Timers.Timer(); _historicalDataTimer.Interval = Config.HistorySourcesRequestInterval.TotalMilliseconds; _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed; } _historicalDataTimer.Start(); if (_publishedTelemetriesCacheCleanupTimer == null) { _publishedTelemetriesCacheCleanupTimer = new System.Timers.Timer(); _publishedTelemetriesCacheCleanupTimer.Interval = Config.PublishedTelemetriesCacheCleanupInterval.TotalMilliseconds; _publishedTelemetriesCacheCleanupTimer.Elapsed += PublishedTelemetriesCacheCleanupTimer_Elapsed; } _publishedTelemetriesCacheCleanupTimer.Start(); _publishThread.Start(); InnerSources.OfType().ToList().ForEach(x => x.Start()); LogManager.Log($"Telemetry publisher started."); } catch (Exception ex) { LogManager.Log(ex, "Error starting telemetry publisher."); await Stop(); throw; } } } /// /// Stops all activity and releases threads and sources gracefully. /// public Task Stop() { if (IsStarted) { IsStarted = false; LogManager.Log("Stopping telemetry publisher..."); InnerSources.OfType().ToList().ForEach(x => { try { x.Stop(); } catch (Exception ex) { LogManager.Log(ex, $"Error while trying to stop telemetry source {x.Name}."); } }); _pendingStorageCheckTimer?.Stop(); _historicalDataTimer?.Stop(); QueueManager?.Enqueue(null); LogManager.Log("Telemetry publisher stopped."); } return Task.FromResult(true); } /// /// Performs runtime validation of configuration, sources, and destinations. /// public void Validate() { // Validate all registered sources foreach (var source in InnerSources) { if (string.IsNullOrWhiteSpace(source.Name)) { throw new ArgumentException("A registered telemetry source has an invalid or missing Name."); } } // Validate all registered destinations foreach (var destination in InnerDestinations) { if (string.IsNullOrWhiteSpace(destination.Name)) { throw new ArgumentException("A registered telemetry destination has an invalid or missing Name."); } if (destination.SupportedSourceTypes == null || !destination.SupportedSourceTypes.Any()) { throw new InvalidOperationException($"Telemetry destination '{destination.Name}' must support at least one telemetry source."); } } // Validate StorageManager if (StorageManager == null) { throw new NullReferenceException("StorageManager is not configured."); } // Validate QueueManager if (QueueManager == null) { throw new NullReferenceException("QueueManager is not configured."); } } #endregion #region Timers /// /// Periodically invoked to process telemetry from persistent local storage. /// private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e) { LogManager.Log("Pending storage check timer elapsed. Starting flush operation for pending telemetries.", LogCategory.Debug); _pendingStorageCheckTimer.Stop(); try { var results = await FlushPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle); LogManager.Log($"Flush operation completed. {results.Count} telemetry package(s) processed from pending storage.", LogCategory.Debug); } catch (Exception ex) { LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing pending telemetry packages."); } finally { _pendingStorageCheckTimer.Start(); LogManager.Log("Pending storage check timer restarted.", LogCategory.Debug); } } /// /// Periodically invoked to fetch and push historical data from history sources. /// private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e) { LogManager.Log("Historical data timer elapsed. Checking for available capacity...", LogCategory.Debug); _historicalDataTimer.Stop(); try { int queueCount = QueueManager.Count; int storageCount = StorageManager.GetPendingTelemetriesCount(); LogManager.Log($"Current queue count: {queueCount}, pending storage count: {storageCount}", LogCategory.Debug); if (queueCount < Config.MaxPendingTelemetries && storageCount < Config.MaxPendingTelemetries) { foreach (var source in InnerSources.OfType().ToList()) { try { TelemetryHistorySourceCheckPoint checkpoint = StorageManager.GetHistorySourceCheckPoint(source); if (checkpoint == null) { checkpoint = new TelemetryHistorySourceCheckPoint(); checkpoint.SourceName = source.Name; checkpoint.Time = source.Direction == TelemetryHistorySourceDirection.Ascending ? DateTime.MinValue : DateTime.MaxValue; } LogManager.Log($"Evaluating history source '{source.Name}' at checkpoint time {checkpoint?.Time:u}", LogCategory.Debug); if (await source.CanRequestHistory(checkpoint.Time)) { List historyTelemetries = new List(); if (source.Direction == TelemetryHistorySourceDirection.Ascending) { historyTelemetries = (await source.RequestHistory(checkpoint.Time)).OrderBy(x => x.Time).ToList(); } else { historyTelemetries = (await source.RequestHistory(checkpoint.Time)).OrderByDescending(x => x.Time).ToList(); } LogManager.Log($"History source '{source.Name}' returned {historyTelemetries.Count} telemetry items.", LogCategory.Debug); foreach (var telemetry in historyTelemetries) { await PushTelemetryPackageAwait(source, telemetry, TelemetrySourceTypes.ExternalStorage); checkpoint.Time = telemetry.Time; checkpoint.TotalCount++; StorageManager.SetHistorySourceCheckPoint(source, checkpoint.Time, checkpoint.TotalCount); } LogManager.Log($"Checkpoint updated for source '{source.Name}': time = {checkpoint.Time:u}, total = {checkpoint.TotalCount}", LogCategory.Debug); } else { LogManager.Log($"History request for source '{source.Name}' was not permitted at checkpoint time {checkpoint?.Time:u}", LogCategory.Debug); } } catch (Exception ex) { LogManager.Log(ex, LogCategory.Error, $"Exception while processing history for source '{source?.Name}'"); } } } else { LogManager.Log("Historical data fetch skipped due to max pending telemetry limit reached.", LogCategory.Debug); } } catch (Exception ex) { LogManager.Log(ex, LogCategory.Critical, "Unexpected error during HistoricalDataTimer_Elapsed."); } finally { _historicalDataTimer.Start(); LogManager.Log("Historical data timer restarted.", LogCategory.Debug); } } /// /// Handles the elapsed event of the published telemetries cache cleanup timer. /// Determines the earliest checkpoint across all history sources and removes published telemetry entries /// older than that point to keep the cache size manageable over time. /// private void PublishedTelemetriesCacheCleanupTimer_Elapsed(object sender, ElapsedEventArgs e) { LogManager.Log("Published telemetry cache cleanup timer elapsed. Starting cleanup process...", LogCategory.Debug); _publishedTelemetriesCacheCleanupTimer.Stop(); try { var checkPoints = StorageManager.GetHistorySourcesCheckPoints(); LogManager.Log($"Retrieved {checkPoints.Count} source checkpoints for cleanup evaluation.", LogCategory.Debug); if (checkPoints.Count > 0) { DateTime olderThan = checkPoints.Min(x => x.Time); LogManager.Log($"Initiating cleanup of published telemetries older than {olderThan:u}.", LogCategory.Debug); StorageManager.PerformPublishedTelemetriesCleanUp(olderThan); LogManager.Log("Published telemetry cache cleanup completed successfully.", LogCategory.Debug); } else { LogManager.Log("No checkpoints found. Cleanup skipped.", LogCategory.Debug); } } catch (Exception ex) { LogManager.Log(ex, LogCategory.Error, "Exception occurred during published telemetry cache cleanup."); } finally { _publishedTelemetriesCacheCleanupTimer.Start(); LogManager.Log("Published telemetry cache cleanup timer restarted.", LogCategory.Debug); } } #endregion #region Push /// /// Enqueues telemetry into the system based on a source and type. /// private TelemetryPublishPackage PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType, bool disableDeliveryRetries = false) { PendingTelemetry pendingTelemetry = new PendingTelemetry(); pendingTelemetry.Created = DateTime.UtcNow; pendingTelemetry.Source = source.Name; pendingTelemetry.SourceType = sourceType; pendingTelemetry.TelemetryObject = telemetry; var package = new TelemetryPublishPackage() { Source = source, PendingTelemetry = pendingTelemetry, SourceType = sourceType, DisableDeliveryRetries = disableDeliveryRetries }; PushTelemetryPackage(package); return package; } /// /// Enqueues telemetry and returns a task that resolves when it is published. /// private Task PushTelemetryPackageAwait(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType) { return PushTelemetryPackage(source, telemetry, sourceType).CompletionSource.Task; } /// /// Enqueues an already-wrapped package for background publishing. /// private void PushTelemetryPackage(TelemetryPublishPackage package) { QueueManager.Enqueue(package); } /// /// Enqueues a wrapped package and awaits publish result asynchronously. /// private Task PushTelemetryPackageAwait(TelemetryPublishPackage package) { PushTelemetryPackage(package); return package.CompletionSource.Task; } #endregion #region Publish /// /// Background thread method to publish telemetry from the queue. /// private async void PublishThreadMethod() { while (IsStarted) { TelemetryPublishPackage package = QueueManager.Dequeue(); if (package == null) { QueueManager.Clear(); return; } try { await PublishTelemetryPackage(package); } catch { Thread.Sleep(1000); } } } // This method is responsible for publishing a telemetry package to all configured destinations. // It handles per-destination retry logic, exponential backoff, availability checks, and result reporting. // The goal is to guarantee eventual delivery of telemetry with robust fault tolerance and observability. protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package) { LogManager.Log($"Starting publish process for telemetry package from source '{package.Source?.Name}' with type '{package.SourceType}'", LogCategory.Debug); Stopwatch totalWatch = Stopwatch.StartNew(); // Start measuring total publish duration var result = new TelemetryPublishResult(); // Result container with per-destination feedback result.Source = package.Source; result.SourceType = package.SourceType; // Abort early if the publisher is inactive if (!IsStarted || _isDisposed) { LogManager.Log("Publish attempt skipped because the publisher is not started or has been disposed.", LogCategory.Warning); package.CompletionSource.SetResult(result); return result; } //Marking the telemetry as published to avoid duplication from streaming and history sources that can produce the same telemetry. if (package.Source.RequiresTelemetryDuplicationTracking) { if (StorageManager.IsTelemetryInPublishedCache(package.PendingTelemetry.TelemetryObject)) { LogManager.Log("Publish attempt skipped because the telemetry was already published.", LogCategory.Warning); package.CompletionSource.SetResult(result); return result; } else { StorageManager.AddToPublishedTelemetryCache(package.PendingTelemetry.TelemetryObject); } } // Prepare standard metadata properties attached to all telemetry sent var telemetryName = package.PendingTelemetry.TelemetryObject.ToTelemetryName(); var telemetryVersion = package.PendingTelemetry.TelemetryObject.ToTelemetryVersion(); List> properties = new List>(); properties.Add(new KeyValuePair("SerialNumber", Config.SerialNumber)); properties.Add(new KeyValuePair("MachineType", Config.MachineType.ToShortName())); properties.Add(new KeyValuePair("Organization", Config.Organization)); properties.Add(new KeyValuePair("Site", Config.Site)); properties.Add(new KeyValuePair("Environment", Config.Environment)); properties.Add(new KeyValuePair("Type", telemetryName)); //Setting telemetry package basic properties for destination.. package.TelemetryName = telemetryName; package.TelemetryVersion = telemetryVersion; package.SerialNumber = Config.SerialNumber; package.Environment = Config.Environment; package.Organization = Config.Organization; package.Site = Config.Site; package.MachineType = Config.MachineType.ToShortName(); var now = DateTime.UtcNow; // Capture timestamp once for all retry logic List pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList(); // If this is a fresh package, initialize pending destinations if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage) { LogManager.Log("Evaluating destinations for initial pending destination registration...", LogCategory.Debug); foreach (var destination in Destinations) { if (destination.SupportedSourceTypes.Contains(package.SourceType)) { if (!pendingDestinations.Exists(x => x.Name == destination.Name)) { pendingDestinations.Add(new TelemetryPendingDestination { Name = destination.Name, RetryCount = 0, LastAttempt = DateTime.MinValue, NextEligibleAttempt = now }); LogManager.Log($"Added destination '{destination.Name}' to pending destinations.", LogCategory.Debug); } } } } // Try publishing to each valid destination foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType))) { var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name); if (pendingEntry == null) continue; // Skip destinations not pending for this package // Prepare result tracking for this destination var destinationResult = new TelemetryPublishResult.DestinationResult(); destinationResult.Destination = destination; destinationResult.RetryCount = pendingEntry.RetryCount; destinationResult.RetryDelay = TimeSpan.FromSeconds(Math.Max(0, (pendingEntry.NextEligibleAttempt - now).TotalSeconds)); result.DestinationsResults.Add(destinationResult); // If we're still in a backoff delay, skip for now if (Config.EnableBackoff && now < pendingEntry.NextEligibleAttempt) { destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed; destinationResult.ElapsedTime = TimeSpan.Zero; LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug); continue; } Stopwatch destinationWatch = Stopwatch.StartNew(); // Measure this attempt duration try { // Remove destination from pending list so we can re-add it if needed after this attempt pendingDestinations.RemoveAll(x => x.Name == destination.Name); LogManager.Log($"Attempting to publish to destination '{destination.Name}'...", LogCategory.Debug); // Allow event handlers to cancel or inspect the publish if (OnPublishingPackage(package, destination)) { // Ensure destination is ready before sending if (await destination.IsAvailable()) { await destination.Publish(package, properties); // Perform publish OnPackagePublished(package, destination); // Notify success event destinationWatch.Stop(); destinationResult.RetryDelay = TimeSpan.Zero; destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed; destinationResult.ElapsedTime = destinationWatch.Elapsed; LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug); } else { // Mark as temporarily unavailable and schedule retry destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable; destinationResult.ElapsedTime = destinationWatch.Elapsed; LogManager.Log($"Destination '{destination.Name}' is unavailable.", LogCategory.Warning); if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { pendingEntry.RetryCount++; pendingEntry.LastAttempt = now; int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds); pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug); pendingDestinations.Add(pendingEntry); } else { LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug); } } } } catch (Exception ex) { // Log unexpected failure and retry if supported destinationWatch.Stop(); destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed; destinationResult.Error = ex; destinationResult.ElapsedTime = destinationWatch.Elapsed; LogManager.Log(ex, $"Error publishing telemetry to '{destination.Name}'."); OnPackagePublishFailed(package, destination, ex); if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage)) { pendingEntry.RetryCount++; pendingEntry.LastAttempt = now; int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds); pendingEntry.NextEligibleAttempt = now.AddSeconds(delay); LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug); pendingDestinations.Add(pendingEntry); } else { LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug); } } } // Save retry state back into the package package.PendingTelemetry.PendingDestinations = pendingDestinations; if (!package.DisableDeliveryRetries) { // Remove from storage if all destinations succeeded; otherwise persist state if (package.PendingTelemetry.PendingDestinations.Count == 0) { LogManager.Log("Deleting successfully published telemetry from storage.", LogCategory.Debug); StorageManager.DeletePendingTelemetry(package.PendingTelemetry); } else { LogManager.Log("Saving telemetry package for future retry or tracking.", LogCategory.Debug); StorageManager.UpsertPendingTelemetry(package.PendingTelemetry); } } // Finalize result and notify completion totalWatch.Stop(); result.TotalElapsedTime = totalWatch.Elapsed; LogManager.Log($"Completed publish process for telemetry from source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug); //Add results for reporting _pastResults.Add(result); //Set task result for once that are awaiting from outside this method. package.CompletionSource.SetResult(result); //Raising final event OnPublishResultAvailable(package, result); return result; } #endregion #region Flush /// /// Flushes up to the specified number of pending telemetries from local storage, /// attempting to publish them immediately. This can be used to force a retry of previously failed or postponed telemetry packages. /// /// The maximum number of pending telemetry packages to flush. /// /// A task that represents the asynchronous flush operation, returning a list of publish results for the flushed packages. /// public async Task> FlushPendingTelemetries(int maxCount) { if (!IsStarted || _isDisposed) { LogManager.Log("FlushPendingTelemetries called while publisher is not started or already disposed. Operation aborted.", LogCategory.Warning); return new List(); } var batch = StorageManager.GetPendingTelemetries(maxCount); List results = new List(); if (batch.Count > 0) { LogManager.Log($"Flushing {batch.Count} pending telemetry package(s).", LogCategory.Info); foreach (var pendingTelemetry in batch) { try { var package = new TelemetryPublishPackage() { Source = _pendingStorageSource, PendingTelemetry = pendingTelemetry, SourceType = TelemetrySourceTypes.PendingStorage }; var result = await PushTelemetryPackageAwait(package); results.Add(result); LogManager.Log( $"Flushed telemetry to destinations: {string.Join(", ", result.DestinationsResults.Select(r => $"{r.Destination.Name}={r.Status}"))}", LogCategory.Debug); } catch (Exception ex) { LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing a pending telemetry package."); } if (!IsStarted || _isDisposed) { LogManager.Log("Flush operation interrupted: publisher is no longer active.", LogCategory.Warning); return results; } } if (results.Any(x => x.DestinationsResults.Any(y => y.Status == TelemetryPublishResult.DestinationStatus.Passed))) { LogManager.Log("FlushPendingTelemetries completed successfully.", LogCategory.Info); } } return results; } #endregion #region Reporting /// /// Generates a detailed telemetry report summarizing the current state of the telemetry system. /// The report includes statistics on published and pending telemetry, as well as per-source and per-destination results. /// public Task GetTelemetryReport() { return Task.Factory.StartNew(() => { TelemetryReport report = new TelemetryReport { GeneratedAt = DateTime.UtcNow, TotalPending = StorageManager.GetPendingTelemetriesCount() }; var results = _pastResults.ToList(); report.TotalPublished = results.Count; foreach (var result in results) { var sourceType = result.SourceType; var sourceName = result.Source?.Name ?? "UnknownSource"; if (!report.SourceTypes.TryGetValue(sourceType, out var sourceTypeSummary)) { sourceTypeSummary = new SourceTypeSummary { SourceType = sourceType }; report.SourceTypes[sourceType] = sourceTypeSummary; } if (!sourceTypeSummary.Sources.TryGetValue(sourceName, out var sourceSummary)) { sourceSummary = new SourceSummary { SourceName = sourceName }; sourceTypeSummary.Sources[sourceName] = sourceSummary; } foreach (var destResult in result.DestinationsResults) { var destName = destResult.Destination.Name; if (!sourceSummary.Destinations.TryGetValue(destName, out var destSummary)) { destSummary = new DestinationStatusSummary { DestinationName = destName }; sourceSummary.Destinations[destName] = destSummary; } switch (destResult.Status) { case TelemetryPublishResult.DestinationStatus.Passed: destSummary.Passed++; break; case TelemetryPublishResult.DestinationStatus.Failed: destSummary.Failed++; break; case TelemetryPublishResult.DestinationStatus.Postponed: destSummary.Postponed++; break; case TelemetryPublishResult.DestinationStatus.Unavailable: destSummary.Unavailable++; break; } } } return report; }); } #endregion #region Virtual Methods /// /// Called before a package is published to allow for canceling or preprocessing. /// protected virtual bool OnPublishingPackage(TelemetryPublishPackage package, ITelemetryDestination destination) { try { var args = new TelemetryPackagePublishingEventArgs() { Package = package, Destination = destination }; PublishingPackage?.Invoke(this, args); return !args.Cancel; } catch { return true; } } /// /// Called after a package has been successfully delivered to a destination. /// protected virtual void OnPackagePublished(TelemetryPublishPackage package, ITelemetryDestination destination) { try { PackagePublished?.Invoke(this, new TelemetryPackagePublishedEventArgs() { Package = package, Destination = destination }); } catch { } } /// /// Called after a failed attempt to publish a telemetry package. /// protected virtual void OnPackagePublishFailed(TelemetryPublishPackage package, ITelemetryDestination destination, Exception exception) { try { PublishPackageFailed?.Invoke(this, new TelemetryPackagePublishFailedEventArgs() { Package = package, Destination = destination, Exception = exception }); } catch { } } /// /// Called when a publish result is available after a complete publish pass. /// /// The package. /// The result. protected virtual void OnPublishResultAvailable(TelemetryPublishPackage package, TelemetryPublishResult result) { try { Debug.WriteLine($"[TELEMETRY] Package Publish Result Available: {result}"); PublishResultAvailable?.Invoke(this, new TelemetryPublishResultAvailableEventArgs() { Package = package, PublishResult = result }); } catch { } } #endregion #region Dispose /// /// Disposes all sources, destinations, timers, and gracefully shuts down. /// public void Dispose() { if (!_isDisposed) { _isDisposed = true; foreach (var source in InnerSources) { try { if (source is ITelemetryStreamingSource streamingSource) { streamingSource.Stop(); streamingSource.TelemetryAvailable -= StreamingSource_TelemetryAvailable; } source.Dispose(); } catch (Exception ex) { LogManager.Log(ex, $"Error disposing telemetry source {source.Name}."); } } if (IsStarted) { Stop(); } foreach (var destination in Destinations) { try { destination.Dispose(); } catch (Exception ex) { LogManager.Log(ex, $"Error disposing telemetry destination {destination.Name}."); } } } } #endregion } }