using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Tango.Core;
using Tango.Core.ExtensionMethods;
using Tango.Insights;
using Tango.Integration.Operation;
using Tango.Logging;
using Tango.PMR.Diagnostics;
using Tango.PMR.Insights;
using Tango.Telemetry.Reporting;
using Tango.Telemetry.Telemetries;
namespace Tango.Telemetry
{
/// <summary>
/// TelemetryPublisher is responsible for collecting telemetry data from sources,
/// queuing it, storing it if needed, and publishing it to one or more destinations.
/// It supports streaming, historical, and retry-based telemetry flows.
/// </summary>
public class TelemetryPublisher : ExtendedObject, ITelemetryPublisher
{
/// <summary>
/// Occurs before a telemetry package is published to a destination.
/// Raised from OnPublishingPackage; when a handler sets Cancel on the event arguments,
/// the publish to that destination is skipped.
/// </summary>
public event EventHandler PublishingPackage;
/// <summary>
/// Occurs when a telemetry package has been successfully published to a destination.
/// </summary>
public event EventHandler PackagePublished;
/// <summary>
/// Occurs when a telemetry package fails to publish to a destination.
/// </summary>
public event EventHandler PublishPackageFailed;
/// <summary>
/// Occurs when a telemetry publish operation has completed and a publish result is available,
/// indicating the success or failure status for each destination.
/// </summary>
public event EventHandler PublishResultAvailable;
// Timer to periodically check and publish pending telemetry from local storage.
// Created lazily on the first Start().
private System.Timers.Timer _pendingStorageCheckTimer;
// Timer to periodically fetch historical data from ITelemetryHistorySource.
// Created lazily on the first Start().
private System.Timers.Timer _historicalDataTimer;
// Indicates if the publisher has been disposed
protected bool _isDisposed;
// Background thread responsible for dequeuing and publishing telemetry
private Thread _publishThread;
// Source used to tag telemetry loaded from pending storage
private TelemetryPendingStorageSource _pendingStorageSource;
// Timer responsible for triggering periodic cleanup of the published telemetries cache.
// Created lazily on the first Start().
private System.Timers.Timer _publishedTelemetriesCacheCleanupTimer;
// Publish results appended by PublishTelemetryPackage and read by GetTelemetryReport.
// NOTE(review): nothing in this file removes entries from this list — confirm growth is acceptable.
private List _pastResults;
#region Properties
/// <summary>
/// Indicates whether the publisher is actively running.
/// Set to true by Start() and back to false by Stop().
/// </summary>
public bool IsStarted { get; private set; }
/// <summary>
/// Publisher configuration containing telemetry parameters and limits.
/// Never null: the constructor substitutes a default configuration when none is supplied.
/// </summary>
public TelemetryPublisherConfiguration Config { get; }
/// <summary>
/// Manages persistence of telemetry data (e.g., LiteDB).
/// </summary>
public ITelemetryStorageManager StorageManager { get; }
// Mutable backing list for Sources; modified by RegisterSource.
private List InnerSources { get; }
/// <summary>
/// Public read-only access to telemetry sources.
/// Wraps the inner list, so sources registered later are visible through it.
/// </summary>
public ReadOnlyCollection Sources { get; }
// Mutable backing list for Destinations; modified by RegisterDestination.
private List InnerDestinations { get; }
/// <summary>
/// Public read-only access to telemetry destinations.
/// Wraps the inner list, so destinations registered later are visible through it.
/// </summary>
public ReadOnlyCollection Destinations { get; }
/// <summary>
/// Manages telemetry queuing between ingestion and publish phases.
/// </summary>
public ITelemetryQueueManager QueueManager { get; private set; }
/// <summary>
/// Gets the client used for remote checkpoint recovery.
/// Passed to the storage manager's Init call during Start().
/// </summary>
public ITelemetryCheckpointsRecoveryClient CheckpointsRecoveryClient { get; }
#endregion
#region Constructor
/// <summary>
/// Initializes the telemetry publisher with default storage and queue managers.
/// </summary>
/// <param name="config">Publisher configuration; when null a default TelemetryPublisherConfiguration is created.</param>
/// <param name="checkPointsRecoveryClient">Client handed to the storage manager when the publisher starts.</param>
public TelemetryPublisher(TelemetryPublisherConfiguration config, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient)
{
_pastResults = new List();
Config = config ?? new TelemetryPublisherConfiguration();
_pendingStorageSource = new TelemetryPendingStorageSource();
InnerSources = new List();
// The read-only views wrap the inner lists, so later registrations show up in them.
Sources = new ReadOnlyCollection(InnerSources);
InnerDestinations = new List();
Destinations = new ReadOnlyCollection(InnerDestinations);
// The publish thread is created here but only started by Start().
_publishThread = new Thread(PublishThreadMethod);
_publishThread.IsBackground = true;
CheckpointsRecoveryClient = checkPointsRecoveryClient;
StorageManager = new TelemetryLiteDBStorageManager();
QueueManager = new TelemetryInMemoryQueueManager();
}
/// <summary>
/// Initializes the telemetry publisher with custom storage and queue managers.
/// </summary>
/// <param name="storageManager">Storage manager to use instead of the default one.</param>
/// <param name="checkPointsRecoveryClient">Client handed to the storage manager when the publisher starts.</param>
/// <param name="queueManager">Queue manager to use instead of the default one.</param>
/// <param name="config">Publisher configuration; when null a default configuration is created by the chained constructor.</param>
public TelemetryPublisher(ITelemetryStorageManager storageManager, ITelemetryCheckpointsRecoveryClient checkPointsRecoveryClient, ITelemetryQueueManager queueManager, TelemetryPublisherConfiguration config) : this(config, checkPointsRecoveryClient)
{
// NOTE(review): the chained constructor has already created the default storage and queue
// managers; they are replaced here without being disposed — confirm their constructors are cheap
// and hold no resources.
// A null argument is stored as-is; Validate() rejects null managers when Start() is called.
StorageManager = storageManager;
QueueManager = queueManager;
}
#endregion
#region Sources
/// <summary>
/// Adds a telemetry source to the publisher.
/// Only one source per concrete type is accepted; a second source of the same type is
/// ignored with a warning. Streaming sources are subscribed to immediately and, when the
/// publisher is already running, started right away.
/// </summary>
/// <param name="source">The source to register. A null value is ignored.</param>
public void RegisterSource(ITelemetrySource source)
{
    if (source == null)
    {
        return;
    }

    // Uniqueness is decided by the concrete runtime type, not by the source name.
    Type incomingType = source.GetType();
    bool alreadyRegistered = InnerSources.Exists(registered => registered.GetType() == incomingType);
    if (alreadyRegistered)
    {
        LogManager.Log($"Telemetry source {source.Name} has already been registered. Ignoring.", LogCategory.Warning);
        return;
    }

    InnerSources.Add(source);

    var streaming = source as ITelemetryStreamingSource;
    if (streaming != null)
    {
        streaming.TelemetryAvailable += StreamingSource_TelemetryAvailable;

        // Sources registered before Start() are started by Start() itself.
        if (IsStarted)
        {
            try
            {
                streaming.Start();
            }
            catch (Exception startError)
            {
                // A source that fails to start stays registered; the failure is only logged.
                LogManager.Log(startError, $"Error starting telemetry source {source.Name}.");
            }
        }
    }

    LogManager.Log($"Telemetry source {source.Name} registered.");
}
/// <summary>
/// Handles telemetry emitted by a streaming source by enqueuing it for publishing.
/// Events are ignored after disposal or when the sender is not a telemetry source.
/// </summary>
/// <param name="sender">The source that raised the event.</param>
/// <param name="e">Carries the telemetry object and the delivery-retry flag.</param>
private void StreamingSource_TelemetryAvailable(object sender, TelemetryAvailableEventArgs e)
{
    if (_isDisposed || !(sender is ITelemetrySource origin))
    {
        return;
    }

    LogManager.Log($"Telemetry stream received {origin.Name} -> {e.TelemetryObject.ToTelemetryName()}.", LogCategory.Debug);
    PushTelemetryPackage(origin, e.TelemetryObject, TelemetrySourceTypes.Streaming, e.DisableDeliveryRetries);
}
#endregion
#region Destinations
/// <summary>
/// Adds a telemetry destination to the publisher.
/// Destinations are unique by name; a second destination with an already-registered
/// name is ignored with a warning.
/// </summary>
/// <param name="destination">The destination to register. A null value is ignored.</param>
public void RegisterDestination(ITelemetryDestination destination)
{
    if (destination == null)
    {
        return;
    }

    bool nameTaken = InnerDestinations.Exists(existing => existing.Name == destination.Name);
    if (!nameTaken)
    {
        InnerDestinations.Add(destination);
        LogManager.Log($"Telemetry destination {destination.Name} registered.");
        return;
    }

    LogManager.Log($"Telemetry destination with name {destination.Name} has already been registered. Ignoring.", LogCategory.Warning);
}
#endregion
#region Start / Stop
/// <summary>
/// Starts all timers, the publish thread, and streaming sources for publishing telemetry.
/// Does nothing when the publisher is already started. The publisher can be started again
/// after <see cref="Stop"/>.
/// </summary>
/// <returns>A task that completes when start-up has finished.</returns>
/// <remarks>
/// Any exception raised during start-up is logged, the publisher is stopped again, and the
/// exception is rethrown to the caller.
/// </remarks>
public async Task Start()
{
    if (IsStarted)
    {
        return;
    }

    try
    {
        LogManager.Log($"Starting telemetry publisher...\nConfig:\n{Config.ToJsonString()}\nSources: {String.Join(", ", Sources.Select(x => x.Name))}\nDestinations: {String.Join(", ", Destinations.Select(x => x.Name))}");
        Config.Validate();
        Validate();

        // Set before the timers and the publish loop start: both check this flag.
        IsStarted = true;
        await StorageManager.Init(CheckpointsRecoveryClient);

        // Timers are created once and reused across Stop()/Start() cycles.
        if (_pendingStorageCheckTimer == null)
        {
            _pendingStorageCheckTimer = new System.Timers.Timer();
            _pendingStorageCheckTimer.Interval = Config.PendingStorageCheckInterval.TotalMilliseconds;
            _pendingStorageCheckTimer.Elapsed += PendingStorageCheckTimer_Elapsed;
        }
        _pendingStorageCheckTimer.Start();

        if (_historicalDataTimer == null)
        {
            _historicalDataTimer = new System.Timers.Timer();
            _historicalDataTimer.Interval = Config.HistorySourcesRequestInterval.TotalMilliseconds;
            _historicalDataTimer.Elapsed += HistoricalDataTimer_Elapsed;
        }
        _historicalDataTimer.Start();

        if (_publishedTelemetriesCacheCleanupTimer == null)
        {
            _publishedTelemetriesCacheCleanupTimer = new System.Timers.Timer();
            _publishedTelemetriesCacheCleanupTimer.Interval = Config.PublishedTelemetriesCacheCleanupInterval.TotalMilliseconds;
            _publishedTelemetriesCacheCleanupTimer.Elapsed += PublishedTelemetriesCacheCleanupTimer_Elapsed;
        }
        _publishedTelemetriesCacheCleanupTimer.Start();

        // A Thread instance can be started only once; calling Start() on a thread that has
        // already run throws ThreadStateException. On a restart, create a fresh thread.
        // ThreadState is fully qualified because System.Diagnostics declares a ThreadState too.
        if ((_publishThread.ThreadState & System.Threading.ThreadState.Unstarted) == 0)
        {
            // Stop() enqueues a null sentinel that the previous publish loop may have exited
            // without consuming (it also exits when IsStarted turns false). Drop leftovers so
            // the new loop does not terminate on its first dequeue.
            QueueManager.Clear();
            _publishThread = new Thread(PublishThreadMethod) { IsBackground = true };
        }
        _publishThread.Start();

        // Streaming sources are started last so nothing is pushed before the loop is running.
        InnerSources.OfType<ITelemetryStreamingSource>().ToList().ForEach(x => x.Start());
        LogManager.Log($"Telemetry publisher started.");
    }
    catch (Exception ex)
    {
        LogManager.Log(ex, "Error starting telemetry publisher.");
        await Stop();
        throw;
    }
}
/// <summary>
/// Stops all activity: streaming sources, all three periodic timers, and the publish loop.
/// Does nothing when the publisher is not started.
/// </summary>
/// <returns>An already-completed task; the method runs synchronously.</returns>
public Task Stop()
{
    if (IsStarted)
    {
        IsStarted = false;
        LogManager.Log("Stopping telemetry publisher...");

        foreach (var streamingSource in InnerSources.OfType<ITelemetryStreamingSource>().ToList())
        {
            try
            {
                streamingSource.Stop();
            }
            catch (Exception ex)
            {
                // One failing source must not prevent the remaining ones from stopping.
                LogManager.Log(ex, $"Error while trying to stop telemetry source {streamingSource.Name}.");
            }
        }

        _pendingStorageCheckTimer?.Stop();
        _historicalDataTimer?.Stop();
        // Previously left running, so cache cleanup kept firing on a stopped publisher.
        _publishedTelemetriesCacheCleanupTimer?.Stop();

        // A null package is the sentinel that makes the publish loop exit.
        QueueManager?.Enqueue(null);
        LogManager.Log("Telemetry publisher stopped.");
    }
    return Task.FromResult(true);
}
/// <summary>
/// Checks that the registered sources and destinations and the storage/queue managers
/// are usable. Called by Start() before the publisher is marked as started.
/// </summary>
/// <exception cref="ArgumentException">A source or destination has a null, empty, or whitespace name.</exception>
/// <exception cref="InvalidOperationException">A destination declares no supported source types.</exception>
/// <exception cref="NullReferenceException">The storage manager or the queue manager is null.</exception>
public void Validate()
{
    // Sources only need a usable name.
    bool anyUnnamedSource = InnerSources.Any(registeredSource => string.IsNullOrWhiteSpace(registeredSource.Name));
    if (anyUnnamedSource)
    {
        throw new ArgumentException("A registered telemetry source has an invalid or missing Name.");
    }

    // Destinations need a name and at least one supported source type.
    foreach (var registeredDestination in InnerDestinations)
    {
        if (string.IsNullOrWhiteSpace(registeredDestination.Name))
        {
            throw new ArgumentException("A registered telemetry destination has an invalid or missing Name.");
        }

        bool supportsSomething = registeredDestination.SupportedSourceTypes != null
            && registeredDestination.SupportedSourceTypes.Any();
        if (!supportsSomething)
        {
            throw new InvalidOperationException($"Telemetry destination '{registeredDestination.Name}' must support at least one telemetry source.");
        }
    }

    // Both managers can be null when the custom-manager constructor received null arguments.
    if (StorageManager == null)
    {
        throw new NullReferenceException("StorageManager is not configured.");
    }

    if (QueueManager == null)
    {
        throw new NullReferenceException("QueueManager is not configured.");
    }
}
#endregion
#region Timers
/// <summary>
/// Periodically invoked to process telemetry from persistent local storage.
/// The timer is paused while a flush is in progress so cycles never overlap, and is
/// re-armed afterwards only while the publisher is still running.
/// </summary>
/// <param name="sender">The timer that raised the event.</param>
/// <param name="e">Timer event data (unused).</param>
private async void PendingStorageCheckTimer_Elapsed(object sender, ElapsedEventArgs e)
{
    LogManager.Log("Pending storage check timer elapsed. Starting flush operation for pending telemetries.", LogCategory.Debug);
    _pendingStorageCheckTimer.Stop();
    try
    {
        var results = await FlushPendingTelemetries(Config.MaxPendingStorageTelemetriesPerCycle);
        LogManager.Log($"Flush operation completed. {results.Count} telemetry package(s) processed from pending storage.", LogCategory.Debug);
    }
    catch (Exception ex)
    {
        // async void: an escaping exception would be unhandled, so everything is caught here.
        LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing pending telemetry packages.");
    }
    finally
    {
        // Do not re-arm the timer when Stop()/Dispose() ran while the flush was in flight;
        // an unconditional restart would keep the timer ticking on a stopped publisher.
        if (IsStarted && !_isDisposed)
        {
            _pendingStorageCheckTimer.Start();
            LogManager.Log("Pending storage check timer restarted.", LogCategory.Debug);
        }
    }
}
/// <summary>
/// Periodically invoked to fetch and push historical data from history sources.
/// For each history source the last stored checkpoint is loaded (or initialised), the source
/// is asked for history from that checkpoint, and every returned telemetry is published in
/// time order while the checkpoint is advanced and persisted after each item.
/// </summary>
/// <remarks>
/// The cycle is skipped when the queue or the pending storage has reached
/// Config.MaxPendingTelemetries. The timer is paused for the duration of the cycle and is
/// re-armed afterwards only while the publisher is still running.
/// </remarks>
/// <param name="sender">The timer that raised the event.</param>
/// <param name="e">Timer event data (unused).</param>
private async void HistoricalDataTimer_Elapsed(object sender, ElapsedEventArgs e)
{
    LogManager.Log("Historical data timer elapsed. Checking for available capacity...", LogCategory.Debug);
    _historicalDataTimer.Stop();
    try
    {
        int queueCount = QueueManager.Count;
        int storageCount = StorageManager.GetPendingTelemetriesCount();
        LogManager.Log($"Current queue count: {queueCount}, pending storage count: {storageCount}", LogCategory.Debug);
        if (queueCount < Config.MaxPendingTelemetries && storageCount < Config.MaxPendingTelemetries)
        {
            // Snapshot the sources so registrations during the cycle do not break the enumeration.
            foreach (var source in InnerSources.OfType<ITelemetryHistorySource>().ToList())
            {
                // Once the publisher is stopped, the publish loop no longer drains the queue.
                if (!IsStarted || _isDisposed)
                {
                    break;
                }

                try
                {
                    TelemetryHistorySourceCheckPoint checkpoint = StorageManager.GetHistorySourceCheckPoint(source);
                    if (checkpoint == null)
                    {
                        // First run for this source: start from the extreme matching its direction.
                        checkpoint = new TelemetryHistorySourceCheckPoint();
                        checkpoint.SourceName = source.Name;
                        checkpoint.Time = source.Direction == TelemetryHistorySourceDirection.Ascending ? DateTime.MinValue : DateTime.MaxValue;
                    }
                    LogManager.Log($"Evaluating history source '{source.Name}' at checkpoint time {checkpoint.Time:u}", LogCategory.Debug);
                    if (await source.CanRequestHistory(checkpoint.Time))
                    {
                        var requested = await source.RequestHistory(checkpoint.Time);

                        // Process in the source's own direction so the checkpoint moves monotonically.
                        var historyTelemetries = (source.Direction == TelemetryHistorySourceDirection.Ascending
                            ? requested.OrderBy(x => x.Time)
                            : requested.OrderByDescending(x => x.Time)).ToList();
                        LogManager.Log($"History source '{source.Name}' returned {historyTelemetries.Count} telemetry items.", LogCategory.Debug);
                        foreach (var telemetry in historyTelemetries)
                        {
                            // Enqueuing after a stop would await a package that is never published.
                            if (!IsStarted || _isDisposed)
                            {
                                break;
                            }

                            await PushTelemetryPackageAwait(source, telemetry, TelemetrySourceTypes.ExternalStorage);

                            // Persist progress per item so a restart resumes after the last published one.
                            checkpoint.Time = telemetry.Time;
                            checkpoint.TotalCount++;
                            StorageManager.SetHistorySourceCheckPoint(source, checkpoint.Time, checkpoint.TotalCount);
                        }
                        LogManager.Log($"Checkpoint updated for source '{source.Name}': time = {checkpoint.Time:u}, total = {checkpoint.TotalCount}", LogCategory.Debug);
                    }
                    else
                    {
                        LogManager.Log($"History request for source '{source.Name}' was not permitted at checkpoint time {checkpoint.Time:u}", LogCategory.Debug);
                    }
                }
                catch (Exception ex)
                {
                    // A failing source must not prevent the remaining sources from being processed.
                    LogManager.Log(ex, LogCategory.Error, $"Exception while processing history for source '{source?.Name}'");
                }
            }
        }
        else
        {
            LogManager.Log("Historical data fetch skipped due to max pending telemetry limit reached.", LogCategory.Debug);
        }
    }
    catch (Exception ex)
    {
        // async void: an escaping exception would be unhandled, so everything is caught here.
        LogManager.Log(ex, LogCategory.Critical, "Unexpected error during HistoricalDataTimer_Elapsed.");
    }
    finally
    {
        // Do not re-arm the timer when Stop()/Dispose() ran while the cycle was in flight.
        if (IsStarted && !_isDisposed)
        {
            _historicalDataTimer.Start();
            LogManager.Log("Historical data timer restarted.", LogCategory.Debug);
        }
    }
}
/// <summary>
/// Handles the elapsed event of the published telemetries cache cleanup timer.
/// Determines the earliest checkpoint across all history sources and removes published telemetry entries
/// older than that point to keep the cache size manageable over time.
/// </summary>
/// <remarks>
/// The timer is paused while the cleanup runs and is re-armed afterwards only while the
/// publisher is still running.
/// </remarks>
/// <param name="sender">The timer that raised the event.</param>
/// <param name="e">Timer event data (unused).</param>
private void PublishedTelemetriesCacheCleanupTimer_Elapsed(object sender, ElapsedEventArgs e)
{
    LogManager.Log("Published telemetry cache cleanup timer elapsed. Starting cleanup process...", LogCategory.Debug);
    _publishedTelemetriesCacheCleanupTimer.Stop();
    try
    {
        var checkPoints = StorageManager.GetHistorySourcesCheckPoints();
        LogManager.Log($"Retrieved {checkPoints.Count} source checkpoints for cleanup evaluation.", LogCategory.Debug);
        if (checkPoints.Count > 0)
        {
            // The cleanup threshold is the earliest checkpoint time across all history sources.
            DateTime olderThan = checkPoints.Min(x => x.Time);
            LogManager.Log($"Initiating cleanup of published telemetries older than {olderThan:u}.", LogCategory.Debug);
            StorageManager.PerformPublishedTelemetriesCleanUp(olderThan);
            LogManager.Log("Published telemetry cache cleanup completed successfully.", LogCategory.Debug);
        }
        else
        {
            LogManager.Log("No checkpoints found. Cleanup skipped.", LogCategory.Debug);
        }
    }
    catch (Exception ex)
    {
        LogManager.Log(ex, LogCategory.Error, "Exception occurred during published telemetry cache cleanup.");
    }
    finally
    {
        // Do not re-arm the timer when Stop()/Dispose() ran while the cleanup was in flight.
        if (IsStarted && !_isDisposed)
        {
            _publishedTelemetriesCacheCleanupTimer.Start();
            LogManager.Log("Published telemetry cache cleanup timer restarted.", LogCategory.Debug);
        }
    }
}
#endregion
#region Push
/// <summary>
/// Wraps a telemetry object coming from the given source into a publish package and
/// enqueues it for background publishing.
/// </summary>
/// <param name="source">The source that produced the telemetry; its name is recorded on the pending entry.</param>
/// <param name="telemetry">The telemetry object to publish.</param>
/// <param name="sourceType">The kind of flow the telemetry came from.</param>
/// <param name="disableDeliveryRetries">Copied to the package's DisableDeliveryRetries flag.</param>
/// <returns>The package that was enqueued.</returns>
private TelemetryPublishPackage PushTelemetryPackage(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType, bool disableDeliveryRetries = false)
{
    var pending = new PendingTelemetry
    {
        Created = DateTime.UtcNow,
        Source = source.Name,
        SourceType = sourceType,
        TelemetryObject = telemetry
    };

    var wrapped = new TelemetryPublishPackage
    {
        Source = source,
        PendingTelemetry = pending,
        SourceType = sourceType,
        DisableDeliveryRetries = disableDeliveryRetries
    };

    PushTelemetryPackage(wrapped);
    return wrapped;
}
/// <summary>
/// Enqueues telemetry and returns a task that resolves when it is published.
/// The returned task is the task of the package's CompletionSource, which the publish
/// routine completes with the publish result.
/// </summary>
/// <param name="source">The source that produced the telemetry.</param>
/// <param name="telemetry">The telemetry object to publish.</param>
/// <param name="sourceType">The kind of flow the telemetry came from.</param>
/// <returns>A task that completes once the package has been processed by the publish loop.</returns>
private Task PushTelemetryPackageAwait(ITelemetrySource source, ITelemetry telemetry, TelemetrySourceTypes sourceType)
{
return PushTelemetryPackage(source, telemetry, sourceType).CompletionSource.Task;
}
/// <summary>
/// Hands an already-wrapped package to the queue manager; the publish loop picks it up from there.
/// </summary>
/// <param name="package">The package to enqueue.</param>
private void PushTelemetryPackage(TelemetryPublishPackage package) => QueueManager.Enqueue(package);
/// <summary>
/// Enqueues a wrapped package and awaits publish result asynchronously.
/// </summary>
/// <param name="package">The package to enqueue.</param>
/// <returns>The task of the package's CompletionSource, completed by the publish routine.</returns>
private Task PushTelemetryPackageAwait(TelemetryPublishPackage package)
{
PushTelemetryPackage(package);
return package.CompletionSource.Task;
}
#endregion
#region Publish
/// <summary>
/// Entry point of the publish loop: dequeues packages and publishes them one at a time
/// until the publisher is stopped or a null sentinel package is dequeued.
/// </summary>
/// <remarks>
/// The method is <c>async void</c> because it is used as a thread start delegate; after the
/// first await that does not complete synchronously, the loop continues on whichever context
/// the awaited task resumes on rather than on the original thread.
/// </remarks>
private async void PublishThreadMethod()
{
    while (IsStarted)
    {
        TelemetryPublishPackage package = QueueManager.Dequeue();
        if (package == null)
        {
            // Null is the stop sentinel enqueued by Stop(): drop whatever is left and exit.
            QueueManager.Clear();
            return;
        }

        try
        {
            await PublishTelemetryPackage(package);
        }
        catch (Exception ex)
        {
            // Previously swallowed silently. Log the failure and complete the package's task so
            // callers awaiting it (flush, history timer) are released instead of waiting forever.
            // TrySetException is a no-op when the publish routine already completed the task.
            LogManager.Log(ex, "Unhandled error while publishing a telemetry package.");
            package.CompletionSource?.TrySetException(ex);

            // Brief pause before the next package, without blocking the current thread.
            await Task.Delay(1000);
        }
    }
}
/// <summary>
/// Publishes a telemetry package to all configured destinations that support its source type.
/// Handles per-destination retry scheduling with exponential backoff, availability checks,
/// persistence of the retry state, and result reporting.
/// </summary>
/// <param name="package">The package to publish; its CompletionSource is completed with the result.</param>
/// <returns>The publish result, holding one entry per destination that was considered.</returns>
protected virtual async Task PublishTelemetryPackage(TelemetryPublishPackage package)
{
LogManager.Log($"Starting publish process for telemetry package from source '{package.Source?.Name}' with type '{package.SourceType}'", LogCategory.Debug);
Stopwatch totalWatch = Stopwatch.StartNew(); // Start measuring total publish duration
var result = new TelemetryPublishResult(); // Result container with per-destination feedback
result.Source = package.Source;
result.SourceType = package.SourceType;
// Abort early if the publisher is inactive.
// The awaiting caller receives a result without destination entries; this result is not
// recorded in _pastResults and no PublishResultAvailable event is raised.
if (!IsStarted || _isDisposed)
{
LogManager.Log("Publish attempt skipped because the publisher is not started or has been disposed.", LogCategory.Warning);
package.CompletionSource.SetResult(result);
return result;
}
//Marking the telemetry as published to avoid duplication from streaming and history sources that can produce the same telemetry.
// NOTE(review): package.Source is dereferenced here without a null check, although the log
// lines above and below use package.Source?.Name — confirm Source is never null.
if (package.Source.RequiresTelemetryDuplicationTracking)
{
if (StorageManager.IsTelemetryInPublishedCache(package.PendingTelemetry.TelemetryObject))
{
LogManager.Log("Publish attempt skipped because the telemetry was already published.", LogCategory.Warning);
package.CompletionSource.SetResult(result);
return result;
}
else
{
// The telemetry is added to the published cache before any destination is attempted.
StorageManager.AddToPublishedTelemetryCache(package.PendingTelemetry.TelemetryObject);
}
}
// Prepare standard metadata properties attached to all telemetry sent
var telemetryName = package.PendingTelemetry.TelemetryObject.ToTelemetryName();
var telemetryVersion = package.PendingTelemetry.TelemetryObject.ToTelemetryVersion();
List> properties = new List>();
properties.Add(new KeyValuePair("SerialNumber", Config.SerialNumber));
properties.Add(new KeyValuePair("MachineType", Config.MachineType.ToShortName()));
properties.Add(new KeyValuePair("Organization", Config.Organization));
properties.Add(new KeyValuePair("Site", Config.Site));
properties.Add(new KeyValuePair("Environment", Config.Environment));
properties.Add(new KeyValuePair("Type", telemetryName));
//Setting telemetry package basic properties for destination..
package.TelemetryName = telemetryName;
package.TelemetryVersion = telemetryVersion;
package.SerialNumber = Config.SerialNumber;
package.Environment = Config.Environment;
package.Organization = Config.Organization;
package.Site = Config.Site;
package.MachineType = Config.MachineType.ToShortName();
var now = DateTime.UtcNow; // Capture timestamp once for all retry logic
// Work on a copy; the list is written back to the package after all destinations were processed.
List pendingDestinations = package.PendingTelemetry.PendingDestinations.ToList();
// If this is a fresh package, initialize pending destinations
// (packages replayed from pending storage keep the destinations stored with them)
if (package.SourceType == TelemetrySourceTypes.Streaming || package.SourceType == TelemetrySourceTypes.ExternalStorage)
{
LogManager.Log("Evaluating destinations for initial pending destination registration...", LogCategory.Debug);
foreach (var destination in Destinations)
{
if (destination.SupportedSourceTypes.Contains(package.SourceType))
{
if (!pendingDestinations.Exists(x => x.Name == destination.Name))
{
pendingDestinations.Add(new TelemetryPendingDestination
{
Name = destination.Name,
RetryCount = 0,
LastAttempt = DateTime.MinValue,
NextEligibleAttempt = now
});
LogManager.Log($"Added destination '{destination.Name}' to pending destinations.", LogCategory.Debug);
}
}
}
}
// Try publishing to each valid destination
foreach (var destination in Destinations.Where(x => x.SupportedSourceTypes.Contains(package.SourceType)))
{
var pendingEntry = pendingDestinations.FirstOrDefault(x => x.Name == destination.Name);
if (pendingEntry == null) continue; // Skip destinations not pending for this package
// Prepare result tracking for this destination
var destinationResult = new TelemetryPublishResult.DestinationResult();
destinationResult.Destination = destination;
destinationResult.RetryCount = pendingEntry.RetryCount;
// Remaining backoff time, clamped to zero when the destination is already eligible.
destinationResult.RetryDelay = TimeSpan.FromSeconds(Math.Max(0, (pendingEntry.NextEligibleAttempt - now).TotalSeconds));
result.DestinationsResults.Add(destinationResult);
// If we're still in a backoff delay, skip for now
// (the pending entry stays in the list untouched, so it is persisted for a later attempt)
if (Config.EnableBackoff && now < pendingEntry.NextEligibleAttempt)
{
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Postponed;
destinationResult.ElapsedTime = TimeSpan.Zero;
LogManager.Log($"Skipping '{destination.Name}' until {pendingEntry.NextEligibleAttempt:O} (backoff in effect).", LogCategory.Debug);
continue;
}
Stopwatch destinationWatch = Stopwatch.StartNew(); // Measure this attempt duration
try
{
// Remove destination from pending list so we can re-add it if needed after this attempt
pendingDestinations.RemoveAll(x => x.Name == destination.Name);
LogManager.Log($"Attempting to publish to destination '{destination.Name}'...", LogCategory.Debug);
// Allow event handlers to cancel or inspect the publish
// NOTE(review): when a handler cancels, the destination has already been removed from the
// pending list and is not re-added, and destinationResult.Status keeps its initial value —
// confirm that a cancelled publish is meant to be treated as done for this destination.
if (OnPublishingPackage(package, destination))
{
// Ensure destination is ready before sending
if (await destination.IsAvailable())
{
await destination.Publish(package, properties); // Perform publish
OnPackagePublished(package, destination); // Notify success event
destinationWatch.Stop();
destinationResult.RetryDelay = TimeSpan.Zero;
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Passed;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
LogManager.Log($"Successfully published to '{destination.Name}' in {destinationResult.ElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug);
}
else
{
// Mark as temporarily unavailable and schedule retry
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Unavailable;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
LogManager.Log($"Destination '{destination.Name}' is unavailable.", LogCategory.Warning);
// Only destinations that accept PendingStorage packages can be retried later.
if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
pendingEntry.RetryCount++;
pendingEntry.LastAttempt = now;
// Exponential backoff: 2^RetryCount seconds, capped at Config.MaxExponentialBackoff.
// NOTE(review): Math.Pow(2, RetryCount) is cast to int before the cap is applied; from
// RetryCount 31 on the value no longer fits an int and the unchecked cast result is
// unspecified (it may be negative, which would place NextEligibleAttempt in the past).
int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds);
pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds.", LogCategory.Debug);
pendingDestinations.Add(pendingEntry);
}
else
{
LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending.", LogCategory.Debug);
}
}
}
}
catch (Exception ex)
{
// Log unexpected failure and retry if supported
destinationWatch.Stop();
destinationResult.Status = TelemetryPublishResult.DestinationStatus.Failed;
destinationResult.Error = ex;
destinationResult.ElapsedTime = destinationWatch.Elapsed;
LogManager.Log(ex, $"Error publishing telemetry to '{destination.Name}'.");
OnPackagePublishFailed(package, destination, ex);
if (destination.SupportedSourceTypes.Contains(TelemetrySourceTypes.PendingStorage))
{
pendingEntry.RetryCount++;
pendingEntry.LastAttempt = now;
// Same backoff computation as in the unavailable branch above (same overflow caveat applies).
int delay = Math.Min((int)Math.Pow(2, pendingEntry.RetryCount), (int)Config.MaxExponentialBackoff.TotalSeconds);
pendingEntry.NextEligibleAttempt = now.AddSeconds(delay);
LogManager.Log($"Scheduled retry to '{destination.Name}' in {delay} seconds due to failure.", LogCategory.Debug);
pendingDestinations.Add(pendingEntry);
}
else
{
LogManager.Log($"'{destination.Name}' is not retryable. Removed from pending after failure.", LogCategory.Debug);
}
}
}
// Save retry state back into the package
package.PendingTelemetry.PendingDestinations = pendingDestinations;
// Packages flagged with DisableDeliveryRetries never touch the pending storage.
if (!package.DisableDeliveryRetries)
{
// Remove from storage if all destinations succeeded; otherwise persist state
if (package.PendingTelemetry.PendingDestinations.Count == 0)
{
LogManager.Log("Deleting successfully published telemetry from storage.", LogCategory.Debug);
StorageManager.DeletePendingTelemetry(package.PendingTelemetry);
}
else
{
LogManager.Log("Saving telemetry package for future retry or tracking.", LogCategory.Debug);
StorageManager.UpsertPendingTelemetry(package.PendingTelemetry);
}
}
// Finalize result and notify completion
totalWatch.Stop();
result.TotalElapsedTime = totalWatch.Elapsed;
LogManager.Log($"Completed publish process for telemetry from source '{package.Source?.Name}' in {result.TotalElapsedTime.TotalMilliseconds} ms.", LogCategory.Debug);
//Add results for reporting
// NOTE(review): entries are only ever appended to _pastResults in this file, and
// GetTelemetryReport copies the list from a task without synchronization — confirm both
// the unbounded growth and the concurrent access are acceptable.
_pastResults.Add(result);
// Set the task result for callers that are awaiting the package from outside this method.
package.CompletionSource.SetResult(result);
//Raising final event
OnPublishResultAvailable(package, result);
return result;
}
#endregion
#region Flush
/// <summary>
/// Flushes up to the specified number of pending telemetries from local storage,
/// attempting to publish them immediately. This can be used to force a retry of previously failed or postponed telemetry packages.
/// Packages are pushed through the regular publish queue one at a time, each awaited before the next is enqueued.
/// </summary>
/// <param name="maxCount">The maximum number of pending telemetry packages to flush.</param>
/// <returns>
/// A task that represents the asynchronous flush operation, returning a list of publish results for the flushed packages.
/// The list is empty when the publisher is not started or has been disposed, and may be partial when
/// the publisher stops during the flush or when a package fails with an exception.
/// </returns>
public async Task> FlushPendingTelemetries(int maxCount)
{
if (!IsStarted || _isDisposed)
{
LogManager.Log("FlushPendingTelemetries called while publisher is not started or already disposed. Operation aborted.", LogCategory.Warning);
return new List();
}
var batch = StorageManager.GetPendingTelemetries(maxCount);
List results = new List();
if (batch.Count > 0)
{
LogManager.Log($"Flushing {batch.Count} pending telemetry package(s).", LogCategory.Info);
foreach (var pendingTelemetry in batch)
{
try
{
// Stored telemetry is re-published under the dedicated pending-storage source and source type.
var package = new TelemetryPublishPackage()
{
Source = _pendingStorageSource,
PendingTelemetry = pendingTelemetry,
SourceType = TelemetrySourceTypes.PendingStorage
};
var result = await PushTelemetryPackageAwait(package);
results.Add(result);
LogManager.Log(
$"Flushed telemetry to destinations: {string.Join(", ", result.DestinationsResults.Select(r => $"{r.Destination.Name}={r.Status}"))}",
LogCategory.Debug);
}
catch (Exception ex)
{
// A failing package is logged and skipped; it contributes no entry to the results.
LogManager.Log(ex, LogCategory.Error, "Exception occurred while flushing a pending telemetry package.");
}
// Stop early when the publisher was stopped or disposed while the package was being processed.
if (!IsStarted || _isDisposed)
{
LogManager.Log("Flush operation interrupted: publisher is no longer active.", LogCategory.Warning);
return results;
}
}
// The completion message is logged only when at least one destination accepted a package.
if (results.Any(x => x.DestinationsResults.Any(y => y.Status == TelemetryPublishResult.DestinationStatus.Passed)))
{
LogManager.Log("FlushPendingTelemetries completed successfully.", LogCategory.Info);
}
}
return results;
}
#endregion
#region Reporting
/// <summary>
/// Generates a detailed telemetry report summarizing the current state of the telemetry system.
/// The report includes statistics on published and pending telemetry, as well as per-source and per-destination results.
/// Results are grouped by source type, then by source name, then by destination name, with one
/// counter per destination status.
/// </summary>
/// <returns>A task, started via Task.Factory.StartNew, that produces the report.</returns>
public Task GetTelemetryReport()
{
return Task.Factory.StartNew(() =>
{
TelemetryReport report = new TelemetryReport
{
GeneratedAt = DateTime.UtcNow,
TotalPending = StorageManager.GetPendingTelemetriesCount()
};
// Work on a copy of the recorded results.
// NOTE(review): the copy is taken without synchronization while the publish routine may be
// appending to _pastResults — confirm this cannot race.
var results = _pastResults.ToList();
// Counts every recorded publish pass, regardless of the per-destination outcome.
report.TotalPublished = results.Count;
foreach (var result in results)
{
var sourceType = result.SourceType;
var sourceName = result.Source?.Name ?? "UnknownSource";
// Get or create the summary for this source type.
if (!report.SourceTypes.TryGetValue(sourceType, out var sourceTypeSummary))
{
sourceTypeSummary = new SourceTypeSummary
{
SourceType = sourceType
};
report.SourceTypes[sourceType] = sourceTypeSummary;
}
// Get or create the summary for this source within its source type.
if (!sourceTypeSummary.Sources.TryGetValue(sourceName, out var sourceSummary))
{
sourceSummary = new SourceSummary
{
SourceName = sourceName
};
sourceTypeSummary.Sources[sourceName] = sourceSummary;
}
foreach (var destResult in result.DestinationsResults)
{
var destName = destResult.Destination.Name;
// Get or create the per-destination counters for this source.
if (!sourceSummary.Destinations.TryGetValue(destName, out var destSummary))
{
destSummary = new DestinationStatusSummary
{
DestinationName = destName
};
sourceSummary.Destinations[destName] = destSummary;
}
// Statuses other than the four below are not counted.
switch (destResult.Status)
{
case TelemetryPublishResult.DestinationStatus.Passed:
destSummary.Passed++;
break;
case TelemetryPublishResult.DestinationStatus.Failed:
destSummary.Failed++;
break;
case TelemetryPublishResult.DestinationStatus.Postponed:
destSummary.Postponed++;
break;
case TelemetryPublishResult.DestinationStatus.Unavailable:
destSummary.Unavailable++;
break;
}
}
}
return report;
});
}
#endregion
#region Virtual Methods
/// <summary>
/// Called before a package is published to allow for canceling or preprocessing.
/// Raises <see cref="PublishingPackage"/> and reports whether the publish may proceed.
/// </summary>
/// <param name="package">The package about to be published.</param>
/// <param name="destination">The destination the package is about to be sent to.</param>
/// <returns>
/// False when a handler set Cancel on the event arguments; otherwise true. When a handler
/// throws, the exception is logged and the publish proceeds.
/// </returns>
protected virtual bool OnPublishingPackage(TelemetryPublishPackage package, ITelemetryDestination destination)
{
    try
    {
        var args = new TelemetryPackagePublishingEventArgs() { Package = package, Destination = destination };
        PublishingPackage?.Invoke(this, args);
        return !args.Cancel;
    }
    catch (Exception ex)
    {
        // A faulty subscriber must not block delivery, but the failure should be visible.
        LogManager.Log(ex, $"Error in a PublishingPackage event handler for destination '{destination?.Name}'. Publishing continues.");
        return true;
    }
}
/// <summary>
/// Called after a package has been successfully delivered to a destination.
/// Raises <see cref="PackagePublished"/>; exceptions thrown by handlers are logged and not propagated.
/// </summary>
/// <param name="package">The package that was published.</param>
/// <param name="destination">The destination that accepted the package.</param>
protected virtual void OnPackagePublished(TelemetryPublishPackage package, ITelemetryDestination destination)
{
    try
    {
        PackagePublished?.Invoke(this, new TelemetryPackagePublishedEventArgs() { Package = package, Destination = destination });
    }
    catch (Exception ex)
    {
        // The caller records the publish as passed right after this call; a faulty subscriber
        // must not turn a successful delivery into a failure.
        LogManager.Log(ex, $"Error in a PackagePublished event handler for destination '{destination?.Name}'.");
    }
}
/// <summary>
/// Called after a failed attempt to publish a telemetry package.
/// Raises <see cref="PublishPackageFailed"/>; exceptions thrown by handlers are logged and not propagated.
/// </summary>
/// <param name="package">The package that failed to publish.</param>
/// <param name="destination">The destination the publish was attempted on.</param>
/// <param name="exception">The exception that caused the failure.</param>
protected virtual void OnPackagePublishFailed(TelemetryPublishPackage package, ITelemetryDestination destination, Exception exception)
{
    try
    {
        PublishPackageFailed?.Invoke(this, new TelemetryPackagePublishFailedEventArgs() { Package = package, Destination = destination, Exception = exception });
    }
    catch (Exception ex)
    {
        // Called from the publish routine's catch block; a throwing subscriber must not
        // escape it, otherwise the retry scheduling that follows would be skipped.
        LogManager.Log(ex, $"Error in a PublishPackageFailed event handler for destination '{destination?.Name}'.");
    }
}
/// <summary>
/// Called when a publish result is available after a complete publish pass.
/// Writes the result to the debug output and raises <see cref="PublishResultAvailable"/>;
/// exceptions thrown by handlers are logged and not propagated.
/// </summary>
/// <param name="package">The package.</param>
/// <param name="result">The result.</param>
protected virtual void OnPublishResultAvailable(TelemetryPublishPackage package, TelemetryPublishResult result)
{
    try
    {
        Debug.WriteLine($"[TELEMETRY] Package Publish Result Available: {result}");
        PublishResultAvailable?.Invoke(this, new TelemetryPublishResultAvailableEventArgs() { Package = package, PublishResult = result });
    }
    catch (Exception ex)
    {
        // The package's task has already been completed by the caller; only report the handler failure.
        LogManager.Log(ex, "Error in a PublishResultAvailable event handler.");
    }
}
#endregion
#region Dispose
/// <summary>
/// Stops the publisher if it is running, silences all timers, and disposes all registered
/// sources and destinations. Calling it more than once has no further effect.
/// </summary>
public void Dispose()
{
    if (_isDisposed)
    {
        return;
    }
    _isDisposed = true;

    // Stop first, while the sources are still alive: Stop() calls Stop() on every streaming
    // source, which previously happened only after those sources had been disposed.
    bool wasStarted = IsStarted;
    if (wasStarted)
    {
        Stop();
    }

    // Stop every timer and detach its handler so nothing fires on a disposed publisher.
    // The timers are deliberately not disposed: the Elapsed handlers may call Start() on them
    // from their finally blocks, and System.Timers.Timer throws ObjectDisposedException when
    // it is started after disposal.
    if (_pendingStorageCheckTimer != null)
    {
        _pendingStorageCheckTimer.Stop();
        _pendingStorageCheckTimer.Elapsed -= PendingStorageCheckTimer_Elapsed;
    }
    if (_historicalDataTimer != null)
    {
        _historicalDataTimer.Stop();
        _historicalDataTimer.Elapsed -= HistoricalDataTimer_Elapsed;
    }
    if (_publishedTelemetriesCacheCleanupTimer != null)
    {
        _publishedTelemetriesCacheCleanupTimer.Stop();
        _publishedTelemetriesCacheCleanupTimer.Elapsed -= PublishedTelemetriesCacheCleanupTimer_Elapsed;
    }

    foreach (var source in InnerSources)
    {
        try
        {
            if (source is ITelemetryStreamingSource streamingSource)
            {
                // When the publisher was running, Stop() above already stopped the source.
                if (!wasStarted)
                {
                    streamingSource.Stop();
                }
                streamingSource.TelemetryAvailable -= StreamingSource_TelemetryAvailable;
            }
            source.Dispose();
        }
        catch (Exception ex)
        {
            // Keep disposing the remaining sources even when one of them fails.
            LogManager.Log(ex, $"Error disposing telemetry source {source.Name}.");
        }
    }

    foreach (var destination in Destinations)
    {
        try
        {
            destination.Dispose();
        }
        catch (Exception ex)
        {
            LogManager.Log(ex, $"Error disposing telemetry destination {destination.Name}.");
        }
    }
}
#endregion
}
}