using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Tango.BL;
using Tango.BL.Entities;
using Tango.Core;
using Tango.Core.DI;
using Tango.PPC.Common;
using Tango.PPC.Common.Application;
using Tango.PPC.Common.Connection;
using Tango.PPC.Common.Telemetry;
using Tango.PPC.Common.Web;
using Tango.Settings;
using Tango.Telemetry;
using Tango.Telemetry.Destinations;
using Tango.Telemetry.Helpers;
using Tango.Telemetry.Sources;

namespace Tango.PPC.UI.Telemetry
{
    /// <summary>
    /// Default <see cref="ITelemetryProvider"/>. Once the application reports ready and telemetry is
    /// enabled in settings, it builds a <see cref="Tango.Telemetry.TelemetryPublisher"/> for the current
    /// machine, registers the destinations and sources selected in <see cref="TelemetrySettings"/> and
    /// starts the publisher on a background task. It also acts as the publisher's checkpoints recovery
    /// client by reading and writing checkpoints through the machine web service.
    /// </summary>
    [TangoCreateWhenRegistered]
    public class DefaultTelemetryProvider : ExtendedObject, ITelemetryProvider, ITelemetryCheckpointsRecoveryClient
    {
        /// <summary>Delay between two Internet availability checks while waiting to start.</summary>
        private const int InternetPollIntervalMilliseconds = 5000;

        /// <summary>Value assigned to <c>MaxJobRunsPerRequest</c> of the job runs history source.</summary>
        private const int MaxJobRunsPerHistoryRequest = 10;

        private readonly PPCWebClient _webClient;
        private readonly IMachineProvider _machineProvider;

        // Cancelled from Dispose so the background start-up routine stops waiting for connectivity.
        private readonly CancellationTokenSource _shutdown = new CancellationTokenSource();

        private Machine _machine;
        private PPCSettings _ppcSettings;

        /// <summary>Telemetry settings loaded from the default settings manager.</summary>
        public TelemetrySettings Settings { get; set; }

        /// <summary>
        /// The active publisher. Remains <c>null</c> until initialization has created it
        /// (telemetry disabled, no machine available, or publisher construction failed).
        /// </summary>
        public ITelemetryPublisher TelemetryPublisher { get; set; }

        /// <summary>
        /// Creates the provider and defers the actual telemetry initialization until the
        /// application manager raises <c>ApplicationReady</c>.
        /// </summary>
        /// <param name="applicationManager">Source of the <c>ApplicationReady</c> notification.</param>
        /// <param name="machineProvider">Provides the current machine and its operator.</param>
        /// <param name="webClient">Client used to talk to the machine web service.</param>
        public DefaultTelemetryProvider(IPPCApplicationManager applicationManager, IMachineProvider machineProvider, PPCWebClient webClient)
        {
            // Assign dependencies before subscribing so the handler never observes unset fields.
            _webClient = webClient;
            _machineProvider = machineProvider;
            Settings = SettingsManager.Default.GetOrCreate<TelemetrySettings>();

            applicationManager.ApplicationReady += ApplicationManager_ApplicationReady;
        }

        /// <summary>
        /// Starts telemetry initialization when the application is ready and telemetry is enabled.
        /// Failures are logged instead of being propagated to the event source.
        /// </summary>
        private void ApplicationManager_ApplicationReady(object sender, EventArgs e)
        {
            try
            {
                if (Settings.Enable)
                {
                    Init();
                }
            }
            catch (Exception ex)
            {
                // Telemetry is best-effort: never let it break application start-up, but do not hide the cause.
                LogManager.Log(ex, "Error initializing telemetry.");
            }
        }

        /// <summary>
        /// Builds the telemetry publisher for the current machine and schedules the background
        /// start-up routine. Does nothing when no machine is available.
        /// </summary>
        private void Init()
        {
            if (_machineProvider.Machine == null)
            {
                return;
            }

            _machine = _machineProvider.Machine;
            _ppcSettings = SettingsManager.Default.GetOrCreate<PPCSettings>();

            try
            {
                string siteName;
                var siteGuid = _machine.SiteGuid;

                // The database context is only needed to resolve the site name.
                using (ObservablesContext db = ObservablesContext.CreateDefault())
                {
                    var site = db.Sites.FirstOrDefault(x => x.Guid == siteGuid);
                    siteName = site?.Name;
                }

                TelemetryPublisher = new TelemetryPublisher(new TelemetryPublisherConfiguration()
                {
                    Environment = _ppcSettings.DeploymentSlot.ToString(),
                    Organization = _machine.Organization.Name,
                    Site = siteName,
                    SerialNumber = _machine.SerialNumber,
                    MachineType = _machine.Type,
                    EnableBackoff = Settings.EnableBackoff,
                    HistorySourcesRequestInterval = Settings.HistorySourcesRequestInterval,
                    MaxExponentialBackoff = Settings.MaxExponentialBackoff,
                    PendingStorageCheckInterval = Settings.PendingStorageCheckInterval,
                }, this);

                // Direct cast: an unexpected storage manager type fails with a descriptive
                // InvalidCastException (logged below) rather than a NullReferenceException.
                var storageManager = (TelemetryLiteDBStorageManager)TelemetryPublisher.StorageManager;
                storageManager.EnableCheckPointsRecovery = true;
                storageManager.EnforceCheckpointsRecovery = true;

                InternetConnectivity.EnsureStarted();
            }
            catch (Exception ex)
            {
                LogManager.Log(ex, "Error initializing telemetry publisher.");
                return;
            }

            // Task.Run always targets the thread pool and unwraps the async delegate's task.
            Task.Run(() => StartPublisherAsync(_shutdown.Token));
        }

        /// <summary>
        /// Background start-up routine: waits for Internet connectivity, refreshes the IoT Hub
        /// connection string, registers destinations and sources, then starts the publisher.
        /// Every failure is logged; nothing is thrown to the (unobserved) background task.
        /// </summary>
        /// <param name="cancellationToken">Signalled when the provider is disposed.</param>
        private async Task StartPublisherAsync(CancellationToken cancellationToken)
        {
            LogManager.Log("Telemetry is waiting for an Internet connection...");

            try
            {
                while (!InternetConnectivity.IsInternetAvailable())
                {
                    // Asynchronous delay: does not pin a thread while the machine is offline.
                    await Task.Delay(InternetPollIntervalMilliseconds, cancellationToken);
                }
            }
            catch (OperationCanceledException)
            {
                // Provider was disposed while waiting; there is nothing left to start.
                return;
            }

            await RefreshIoTHubConnectionStringAsync();

            if (cancellationToken.IsCancellationRequested)
            {
                return;
            }

            try
            {
                RegisterDestinations();
                RegisterSources();
            }
            catch (Exception ex)
            {
                LogManager.Log(ex, "Error registering telemetry destinations and sources.");
                return;
            }

            try
            {
                await TelemetryPublisher.Start();
            }
            catch (Exception ex)
            {
                LogManager.Log(ex, "Error starting telemetry publisher.");
            }
        }

        /// <summary>
        /// Requests the telemetry device connection string from the machine service (logging in as the
        /// machine first when the web client is not authenticated) and saves it to settings.
        /// On failure the error is logged and the connection string already in settings is kept.
        /// </summary>
        private async Task RefreshIoTHubConnectionStringAsync()
        {
            LogManager.Log("Retrieving telemetry device connection string from machine service...");

            try
            {
                if (!_webClient.IsAuthenticated)
                {
                    await _webClient.Login(new LoginRequest()
                    {
                        Mode = LoginMode.Machine,
                        SerialNumber = _machine.SerialNumber,
                        MachineGuid = _machine.Guid
                    });
                }

                var response = await _webClient.GetTelemetryDeviceConnection(new TelemetryDeviceRegistrationRequest());

                Settings.IoTHubConnectionString = response.ConnectionString;
                Settings.Save();
            }
            catch (Exception ex)
            {
                LogManager.Log(ex, "Failed to retrieve telemetry device connection string from machine service. Falling back to the current connection string from settings...");
            }
        }

        /// <summary>Registers the telemetry destinations that are enabled in settings.</summary>
        private void RegisterDestinations()
        {
            if (Settings.EnableIoTHub)
            {
                TelemetryPublisher.RegisterDestination(new TelemetryAzureHubDestination(Settings.IoTHubConnectionString));
            }

            if (Settings.EnableMqtt)
            {
                TelemetryPublisher.RegisterDestination(new TelemetryMqttDestination($"Twine {_machine.SerialNumber}"));
            }
        }

        /// <summary>Registers the telemetry sources that are enabled in settings.</summary>
        private void RegisterSources()
        {
            if (Settings.SendDiagnostics)
            {
                TelemetryPublisher.RegisterSource(new TelemetryDiagnosticsStreamingSource(_machineProvider.MachineOperator)
                {
                    Config = new TelemetryDiagnosticsStreamingSourceConfig()
                    {
                        DiagnosticsSamplingInterval = Settings.DiagnosticsSamplingInterval,
                    }
                });
            }

            if (Settings.SendEvents)
            {
                TelemetryPublisher.RegisterSource(new TelemetryEventsStreamingSource(_machineProvider.MachineOperator));
            }

            if (Settings.SendJobRuns)
            {
                TelemetryPublisher.RegisterSource(new TelemetryJobRunsStreamingSource(_machineProvider.MachineOperator));
            }

            if (Settings.SendJobRunsHistory)
            {
                TelemetryPublisher.RegisterSource(new TelemetryJobRunsHistorySource()
                {
                    Config = new TelemetryJobRunsHistorySourceConfig()
                    {
                        MaxJobRunsPerRequest = MaxJobRunsPerHistoryRequest
                    }
                });
            }

            if (Settings.SendJobStatus)
            {
                TelemetryPublisher.RegisterSource(new TelemetryJobStatusSource(_machineProvider.MachineOperator));
            }

            if (Settings.SendLogs)
            {
                TelemetryPublisher.RegisterSource(new TelemetryLogsStreamingSource(_machineProvider.MachineOperator)
                {
                    Config = new TelemetryLogsStreamingSourceConfig()
                    {
                        Categories = Settings.LogCategories
                    }
                });
            }

            if (Settings.SendMachineStatus)
            {
                TelemetryPublisher.RegisterSource(new TelemetryMachineStatusStreamingSource(_machineProvider.MachineOperator));
            }

            if (Settings.SendMachineUpdates)
            {
                TelemetryPublisher.RegisterSource(new TelemetryMachineUpdatesStreamingSource());
            }

            if (Settings.SendMachineUpdatesHistory)
            {
                TelemetryPublisher.RegisterSource(new TelemetryMachineUpdatesHistorySource());
            }

            if (Settings.SendWires)
            {
                TelemetryPublisher.RegisterSource(new TelemetryWireStreamingSource(_machineProvider.MachineOperator));
            }
        }

        /// <summary>
        /// Retrieves the telemetry checkpoints stored by the machine web service and maps them to
        /// history source checkpoints.
        /// </summary>
        /// <returns>One checkpoint per entry returned by the service.</returns>
        public async Task<List<TelemetryHistorySourceCheckPoint>> GetCheckpointsBackup()
        {
            var response = await _webClient.GetTelemetryCheckPoints(new TelemetryGetCheckPointsRequest());

            return response.Checkpoints
                .Select(x => new TelemetryHistorySourceCheckPoint()
                {
                    SourceName = x.SourceName,
                    Time = x.Time,
                    TotalCount = x.TotalCount
                })
                .ToList();
        }

        /// <summary>
        /// Sends the given history source checkpoints to the machine web service.
        /// </summary>
        /// <param name="checkPoints">Checkpoints to store remotely.</param>
        public async Task SaveCheckpointsBackup(List<TelemetryHistorySourceCheckPoint> checkPoints)
        {
            var request = new TelemetrySetCheckPointsRequest()
            {
                Checkpoints = checkPoints
                    .Select(x => new TelemetryCheckPoint()
                    {
                        SourceName = x.SourceName,
                        Time = x.Time,
                        TotalCount = x.TotalCount
                    })
                    .ToList()
            };

            await _webClient.SetTelemetryCheckPoints(request);
        }

        /// <summary>
        /// Stops the background start-up routine if it is still waiting and disposes the publisher
        /// followed by its storage manager.
        /// </summary>
        public void Dispose()
        {
            _shutdown.Cancel();

            var publisher = TelemetryPublisher;
            if (publisher == null)
            {
                return;
            }

            // Capture the storage manager first so a disposed publisher is never queried for it;
            // the disposal order (publisher, then storage) is unchanged.
            var storageManager = publisher.StorageManager;
            publisher.Dispose();
            storageManager?.Dispose();
        }
    }
}