using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Tango.BL; using Tango.BL.Enumerations; using Tango.Logging; using Tango.Telemetry.Destinations; using Tango.Telemetry.Mappers; using Tango.Telemetry.Sources; using Tango.Telemetry.Telemetries; using System.Data.Entity; using Tango.BL.Entities; using Tango.Insights; using System.Threading; using Tango.PMR.MachineStatus; namespace Tango.Telemetry.Tester.IOT.CLI { class Program { private static List _machineInfos; private static int _counter = 1; static void Main(string[] args) { LogManager.Default.RegisterLogger(new Tango.Logging.VSOutputLogger("IOT TESTER")); TelemetryPublisher publisher = new TelemetryPublisher(new TelemetryPublisherConfiguration() { Environment = "PROD", SerialNumber = "ROY_X1", Organization = "Twine", MachineType = MachineTypes.TS1800, HistorySourcesRequestInterval = TimeSpan.FromSeconds(1), EnableBackoff = false, }, null); (publisher.StorageManager as TelemetryLiteDBStorageManager).EnableCheckPointsRecovery = false; //publisher.RegisterSource(new JobRunsTestSource()); //publisher.RegisterSource(new DiagnosticsTestSource()); publisher.RegisterSource(new EventsTestSource()); publisher.RegisterSource(new MachineStatusTestSource()); var logsSource = new TelemetryLogsStreamingSource(); logsSource.Config.Categories.Add(LogCategory.Info); publisher.RegisterSource(logsSource); publisher.RegisterSource(new TelemetryLogsStreamingSource()); publisher.RegisterSource(new JobStatusTestSource()); publisher.RegisterDestination(new TelemetryAzureHubDestination("HostName=iot-twine-dev-weu.azure-devices.net;DeviceId=telemetry-dev-01;SharedAccessKey=cZhCMhiVL+TF7p13fpX+lFmyxoy8ZqCkbxUwumWw18Q=")); //publisher.RegisterDestination(new TelemetryMqttDestination("Telemetry MQTT")); publisher.PublishResultAvailable += Publisher_PublishResultAvailable; publisher.PublishingPackage += Publisher_PublishingPackage; publisher.Start().GetAwaiter().GetResult(); Console.Clear(); Console.WriteLine("=== Telemetry IoT Hub Test Utility ==="); Console.WriteLine($"Publisher started. Streaming every {publisher.Config.HistorySourcesRequestInterval.TotalSeconds} seconds."); Console.WriteLine("Press any key to stop streaming data..."); Console.ReadKey(); Console.WriteLine("Disposing publisher..."); publisher.Dispose(); } private static void Publisher_PublishingPackage(object sender, TelemetryPackagePublishingEventArgs e) { //TelemetryJobRun tRun = e.Package.PendingTelemetry.TelemetryObject as TelemetryJobRun; //if (_machineInfos == null) //{ // using (ObservablesContext db = ObservablesContext.CreateDefault()) // { // _machineInfos = new List(); // var machinesOrganizations = db.Machines.Include(x => x.Organization).Select(x => new { x, x.Organization.Name, x.SiteGuid }).ToList(); // foreach (var item in machinesOrganizations) // { // MachineExtendedInfo info = new MachineExtendedInfo(); // info.Machine = item.x; // info.Organization = item.Name; // info.Site = db.Sites.FirstOrDefault(x => x.Guid == item.SiteGuid)?.Name; // _machineInfos.Add(info); // } // } //} //var machineInfo = _machineInfos.FirstOrDefault(x => x.Machine.Guid == tRun.MachineGuid); //if (machineInfo == null) //{ // e.Cancel = true; //} //else //{ // e.Package.Organization = machineInfo.Organization; // e.Package.Site = machineInfo.Site; // e.Package.SerialNumber = machineInfo.Machine.SerialNumber; // e.Package.MachineType = machineInfo.Machine.Type.ToShortName(); //} } private static void Publisher_PublishResultAvailable(object sender, TelemetryPublishResultAvailableEventArgs e) { Console.ForegroundColor = ConsoleColor.DarkGray; Console.WriteLine($"Package publish result available:\n{e.PublishResult.ToString()}"); if (e.PublishResult.DestinationsResults.Any(d => d.Status == TelemetryPublishResult.DestinationStatus.Failed || d.Status == TelemetryPublishResult.DestinationStatus.Unavailable)) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("One or more destinations failed to receive the package."); } else { Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"Package {_counter++} successfully published."); } Console.ResetColor(); Console.WriteLine("Press any key to stop streaming data..."); } } public class MachineExtendedInfo { public Machine Machine { get; set; } public String Organization { get; set; } public String Site { get; set; } } public class JobRunsTestSource : ITelemetryHistorySource { private bool _busy; public string Name { get; } = "JobRuns Source"; public bool RequiresTelemetryDuplicationTracking { get; } = false; public TelemetryHistorySourceDirection Direction { get => TelemetryHistorySourceDirection.Descending; } public Task CanRequestHistory(DateTime from) { return Task.FromResult(!_busy); } public void Dispose() { } public Task> RequestHistory(DateTime from) { _busy = true; ObservablesContext.OverrideSettingsDataSource(new Core.DataSource() { Address = "twine.database.windows.net", IntegratedSecurity = false, Catalog = "Tango", UserName = "Roy", Password = "Aa123456", Type = Core.DataSourceType.SQLServer, }); using (ObservablesContext db = ObservablesContext.CreateDefault()) { var runs = db.JobRuns.Where(x => x.LastUpdated < from).OrderByDescending(x => x.LastUpdated).Take(100).ToList(); var runsT = runs.Select(x => JobRunMapper.MapJobRun(x)).ToList(); _busy = false; return Task.FromResult>(runsT); } } } public class DiagnosticsTestSource : ITelemetryStreamingSource { public bool IsStarted { get; private set; } public string Name { get; } = "Diagnostics Test"; public bool RequiresTelemetryDuplicationTracking { get; } public event EventHandler TelemetryAvailable; public void Dispose() { Stop(); } public void Start() { IsStarted = true; Task.Factory.StartNew(() => { InsightsFile file = InsightsFile.FromFile("sample.insights"); foreach (var rawFrame in file.Frames.Where(x => !x.IsEmpty)) { var monitors = rawFrame.ToInsightsMonitors(); TelemetryDiagnosticsFrame frame = new TelemetryDiagnosticsFrame(); frame.ID = Guid.NewGuid().ToString(); frame.Time = DateTime.UtcNow; frame.Monitors = monitors; TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs() { TelemetryObject = frame }); Thread.Sleep(500); } }); } public void Stop() { IsStarted = false; } } public class LogsTestSource : TelemetryLogsStreamingSource { public LogsTestSource() { Config.Categories.Add(LogCategory.Info); } public override void Start() { base.Start(); Task.Factory.StartNew(() => { for (int i = 0; i < 100; i++) { Thread.Sleep(100); LogManager.Log($"This is a info test log {i + 1}.", LogCategory.Info); } }); } } public class EventsTestSource : ITelemetryStreamingSource { public bool IsStarted { get; } public string Name { get; } = "Events Test Streaming"; public bool RequiresTelemetryDuplicationTracking { get; } public event EventHandler TelemetryAvailable; public void Dispose() { } public void Start() { Task.Factory.StartNew(() => { using (ObservablesContext db = ObservablesContext.CreateDefault()) { foreach (var evType in db.EventTypes.Take(10).ToList()) { TelemetryEvent ev = new TelemetryEvent(); ev.Description = $"Description of {evType.Description}"; ev.EventTypeGuid = evType.Guid; ev.HostName = "TEST MACHINE"; ev.Time = DateTime.UtcNow; ev.Time = DateTime.SpecifyKind(ev.Time, DateTimeKind.Unspecified); TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs() { TelemetryObject = ev }); Thread.Sleep(2000); } } }); } public void Stop() { } } public class MachineStatusTestSource : ITelemetryStreamingSource { public bool IsStarted { get; } public string Name { get; } = "Machine Status Streaming"; public bool RequiresTelemetryDuplicationTracking { get; } public event EventHandler TelemetryAvailable; public void Dispose() { } public void Start() { Task.Factory.StartNew(() => { for (int i = 10; i < 20; i++) { TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs() { TelemetryObject = new TelemetryMachineStatus() { Status = new MachineStatus() { OverallTemperature = i } }, }); Thread.Sleep(1000); } }); } public void Stop() { } } public class JobStatusTestSource : ITelemetryStreamingSource { public bool IsStarted { get; } public string Name { get; } = "Machine Status Streaming"; public bool RequiresTelemetryDuplicationTracking { get; } public event EventHandler TelemetryAvailable; public void Dispose() { } public void Start() { String groupID = Guid.NewGuid().ToString(); Task.Factory.StartNew(() => { for (int i = 0; i < 22; i++) { TelemetryAvailable?.Invoke(this, new TelemetryAvailableEventArgs() { DisableDeliveryRetries = true, TelemetryObject = new TelemetryJobStatus() { JobName = "Test Job", ID = groupID, Status = i == 0 ? TelemetryJobStatus.JobStatus.InProgress : TelemetryJobStatus.JobStatus.Completed, TotalProgress = 21, Progress = i, RemainingTime = TimeSpan.FromSeconds(22 - i), Message = $"Job status message {i}..." }, }); LogManager.Default.Log($"Job Progress Updated {i}/22..."); Thread.Sleep(1000); } }); } public void Stop() { } } }