using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tango.Core.DI;
using Tango.DataStore;
using Tango.DataStore.EF;
using Tango.DataStore.Lite;
using Tango.DataStore.Remote;
using Tango.Integration.ExternalBridge;
using Tango.PMR.DataStore;
using Tango.PPC.Common.Connection;
using Tango.PPC.Common.ExternalBridge;
using Tango.Transport;
using Tango.Core.ExtensionMethods;
using Newtonsoft.Json.Linq;
using Tango.BL;
using Tango.DataStore.Editing;
using Newtonsoft.Json;
using Tango.Core;

namespace Tango.PPC.Common.DataStore
{
    /// <summary>
    /// Default <see cref="IDataStoreService"/> implementation.
    /// On construction it registers itself as an external bridge request handler and registers
    /// put/get data store item handlers on the machine operator, then serves data store requests
    /// arriving from both sides.
    /// </summary>
    [TangoCreateWhenRegistered]
    public class DefaultDataStoreService : ExtendedObject, IDataStoreService, IExternalBridgeRequestHandler
    {
        // Guards _listenerReceivers: the list is mutated by external bridge handlers and read by the
        // machine operator put handler, and nothing visible here ties those callbacks to one thread.
        private readonly object _listenersGate = new object();
        private readonly IMachineProvider _machineProvider;
        private readonly List<ListenerReceiver> _listenerReceivers;
        private IDataStoreManager _manager;

        /// <summary>
        /// Pairs a listening receiver with the token of the request that started the listening.
        /// </summary>
        private class ListenerReceiver
        {
            public String Token { get; set; }
            public ExternalBridgeReceiver Receiver { get; set; }
        }

        /// <summary>
        /// Creates the service and registers its request handlers.
        /// </summary>
        /// <param name="externalBridge">The external bridge service this instance registers with as a request handler.</param>
        /// <param name="machineProvider">The machine provider whose operator receives the put/get item handlers.</param>
        public DefaultDataStoreService(IPPCExternalBridgeService externalBridge, IMachineProvider machineProvider)
        {
            // Initialize state before registering, so a handler invoked right after registration
            // never observes unassigned fields.
            _listenerReceivers = new List<ListenerReceiver>();
            _machineProvider = machineProvider;

            externalBridge.RegisterRequestHandler(this);
            machineProvider.MachineOperator.RegisterRequestHandler(OnPutDataStoreItemRequest);
            machineProvider.MachineOperator.RegisterRequestHandler(OnGetDataStoreItemRequest);
        }

        /// <summary>
        /// Returns the data store manager, creating an <see cref="EFDataStoreManager"/> on first use.
        /// </summary>
        /// <returns>The manager instance held by this service.</returns>
        public IDataStoreManager GetManager()
        {
            if (_manager == null)
            {
                _manager = new EFDataStoreManager();
            }

            return _manager;
        }

        /// <summary>
        /// Disposes the data store manager if one has been created.
        /// </summary>
        public void Dispose()
        {
            _manager?.Dispose();
        }

        #region Generic Handlers

        /// <summary>
        /// Validates the collection and key, stores the value and acknowledges the request.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStorePutRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStorePutRequest(RemoteDataStorePutRequest request, String token, ExternalBridgeReceiver receiver)
        {
            ValidateCollectionAndKey(request.Collection, request.Key);

            GetManager().GetCollection(request.Collection).Put(request.Key, request.Value);

            await receiver.SendGenericResponse(new RemoteDataStorePutResponse(), token);
        }

        /// <summary>
        /// Reads an item (passing the request's default value to the collection) and responds with
        /// its data type and value.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreGetRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStoreGetRequest(RemoteDataStoreGetRequest request, String token, ExternalBridgeReceiver receiver)
        {
            ValidateCollectionAndKey(request.Collection, request.Key);

            // A default value that arrived as a JObject is converted to a proto object before use.
            if (request.DefaultValue is JObject obj)
            {
                request.DefaultValue = DataStoreProtoObject.FromJObject(obj);
            }

            var item = GetManager().GetCollection(request.Collection).GetItem(request.Key, request.DefaultValue);

            await receiver.SendGenericResponse(new RemoteDataStoreGetResponse()
            {
                DataType = item.Type,
                Value = item.Value,
            }, token);
        }

        /// <summary>
        /// Reads an item (passing the request's default value to the collection) and responds with
        /// the full remote item.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreGetItemRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStoreGetItemRequest(RemoteDataStoreGetItemRequest request, String token, ExternalBridgeReceiver receiver)
        {
            ValidateCollectionAndKey(request.Collection, request.Key);

            // A default value that arrived as a JObject is converted to a proto object before use.
            if (request.DefaultValue is JObject obj)
            {
                request.DefaultValue = DataStoreProtoObject.FromJObject(obj);
            }

            var item = GetManager().GetCollection(request.Collection).GetItem(request.Key, request.DefaultValue);

            await receiver.SendGenericResponse(new RemoteDataStoreGetItemResponse()
            {
                Item = CreateRemoteItem(item)
            }, token);
        }

        /// <summary>
        /// Responds with the item count of the requested collection.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreCountRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStoreCountRequest(RemoteDataStoreCountRequest request, String token, ExternalBridgeReceiver receiver)
        {
            var count = GetManager().GetCollection(request.Collection).Count();

            await receiver.SendGenericResponse(new RemoteDataStoreCountResponse() { Count = count }, token);
        }

        /// <summary>
        /// Always rejects the request: deleting from the data store is not allowed.
        /// </summary>
        /// <returns>A task faulted with <see cref="InvalidOperationException"/>.</returns>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreDeleteRequest), RequestHandlerLoggingMode.LogRequestName)]
        public Task OnRemoteDataStoreDeleteRequest(RemoteDataStoreDeleteRequest request, String token, ExternalBridgeReceiver receiver)
        {
            return CreateDeleteNotAllowedTask();
        }

        /// <summary>
        /// Always rejects the request: deleting from the data store is not allowed.
        /// </summary>
        /// <returns>A task faulted with <see cref="InvalidOperationException"/>.</returns>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreDeleteAllRequest), RequestHandlerLoggingMode.LogRequestName)]
        public Task OnRemoteDataStoreDeleteAllRequest(RemoteDataStoreDeleteAllRequest request, String token, ExternalBridgeReceiver receiver)
        {
            return CreateDeleteNotAllowedTask();
        }

        /// <summary>
        /// Responds with every item of the requested collection.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreGetAllRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStoreGetAllRequest(RemoteDataStoreGetAllRequest request, String token, ExternalBridgeReceiver receiver)
        {
            var all = GetManager().GetCollection(request.Collection).GetAll();

            await receiver.SendGenericResponse(new RemoteDataStoreGetAllResponse()
            {
                Items = all.Select(x => CreateRemoteItem(x)).ToList()
            }, token);
        }

        /// <summary>
        /// Responds with the collection names reported by the manager.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreGetCollectionNamesRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStoreGetCollectionNamesRequest(RemoteDataStoreGetCollectionNamesRequest request, String token, ExternalBridgeReceiver receiver)
        {
            var names = GetManager().GetCollectionNames();

            await receiver.SendGenericResponse(new RemoteDataStoreGetCollectionNamesResponse() { Names = names }, token);
        }

        /// <summary>
        /// Responds with all non-deleted items, grouped by collection name.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreGetAllItemsRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStoreGetAllItemsRequest(RemoteDataStoreGetAllItemsRequest request, String token, ExternalBridgeReceiver receiver)
        {
            List<RemoteDataStoreCollection> collections = new List<RemoteDataStoreCollection>();

            using (ObservablesContext db = ObservablesContext.CreateDefault())
            {
                var items = db.DataStoreItems.Where(x => !x.IsDeleted).ToList();

                foreach (var itemsGroup in items.GroupBy(x => x.CollectionName))
                {
                    RemoteDataStoreCollection collection = new RemoteDataStoreCollection();
                    collection.Name = itemsGroup.Key;
                    collections.Add(collection);

                    foreach (var item in itemsGroup)
                    {
                        collection.Items.Add(CreateRemoteItem(item.ToDataStoreItem()));
                    }
                }
            }

            await receiver.SendGenericResponse(new RemoteDataStoreGetAllItemsResponse() { Collections = collections }, token);
        }

        /// <summary>
        /// Applies a batch of deletions (as soft deletes) and upserts, saves them, and, when the
        /// machine is connected, notifies the firmware about every touched item on a separate thread.
        /// </summary>
        /// <exception cref="ArgumentException">An upserted item has an invalid collection name or key.</exception>
        [ExternalBridgeRequestHandlerMethod(typeof(UpdateDataStoreRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnUpdateDataStoreRequest(UpdateDataStoreRequest request, String token, ExternalBridgeReceiver receiver)
        {
            // Reject an invalid batch before loading or touching any entity.
            foreach (var item in request.ToUpsert)
            {
                ValidateCollectionAndKey(item.CollectionName, item.Key);
            }

            using (ObservablesContext db = ObservablesContext.CreateDefault())
            {
                var allItems = db.DataStoreItems.ToList();

                List<BL.Entities.DataStoreItem> deleted = new List<BL.Entities.DataStoreItem>();

                foreach (var guid in request.ToDelete)
                {
                    var item = allItems.FirstOrDefault(x => x.Guid == guid);

                    if (item != null)
                    {
                        item.IsDeleted = true;
                        item.IsSynchronized = true;
                        item.LastUpdated = DateTime.UtcNow;
                        deleted.Add(item);
                    }
                }

                foreach (var item in request.ToUpsert)
                {
                    // Existing rows are matched by collection + key; the incoming Guid is only used for new rows.
                    var itemDb = allItems.FirstOrDefault(x => x.CollectionName == item.CollectionName && x.Key == item.Key);

                    if (itemDb == null)
                    {
                        itemDb = new BL.Entities.DataStoreItem();
                        itemDb.Guid = item.Guid;
                        db.DataStoreItems.Add(itemDb);
                    }

                    itemDb.CollectionName = item.CollectionName;
                    itemDb.DataType = item.DataType;
                    itemDb.IsDeleted = item.IsDeleted;
                    itemDb.IsSynchronized = true;
                    itemDb.Key = item.Key;
                    itemDb.LastUpdated = item.LastUpdated;
                    itemDb.Value = item.Value;
                }

                db.SaveChanges();

                if (_machineProvider.IsConnected)
                {
                    // Notifications run on their own thread so the response below is not delayed by them.
                    Core.Threading.ThreadFactory.StartNew(() =>
                    {
                        foreach (var item in request.ToUpsert)
                        {
                            NotifyFirmwareItemModified(item.CollectionName, item.Key);
                        }

                        foreach (var item in deleted)
                        {
                            NotifyFirmwareItemModified(item.CollectionName, item.Key);
                        }
                    });
                }
            }

            await receiver.SendGenericResponse(new UpdateDataStoreResponse(), token);
        }

        /// <summary>
        /// Registers the receiver (with the request token) as a change listener and sends an initial
        /// response with <see cref="RemoteDataStoreChangeType.None"/> and no item.
        /// </summary>
        [ExternalBridgeRequestHandlerMethod(typeof(RemoteDataStoreStartListenRequest), RequestHandlerLoggingMode.LogRequestName)]
        public async Task OnRemoteDataStoreStartListenRequest(RemoteDataStoreStartListenRequest request, String token, ExternalBridgeReceiver receiver)
        {
            lock (_listenersGate)
            {
                _listenerReceivers.Add(new ListenerReceiver() { Receiver = receiver, Token = token });
            }

            await receiver.SendGenericResponse(new RemoteDataStoreStartListenResponse()
            {
                ChangeType = RemoteDataStoreChangeType.None,
                Item = null
            }, token);
        }

        /// <summary>
        /// Maps a data store item onto a new <see cref="RemoteDataStoreItem"/>.
        /// </summary>
        private RemoteDataStoreItem CreateRemoteItem(IDataStoreItem item)
        {
            RemoteDataStoreItem remote = new RemoteDataStoreItem();
            item.MapPropertiesTo(remote, MappingFlags.All);
            return remote;
        }

        /// <summary>
        /// Builds the faulted task returned by the delete handlers. Awaiting it throws the same
        /// <see cref="InvalidOperationException"/> the handlers have always reported.
        /// </summary>
        private static Task CreateDeleteNotAllowedTask()
        {
            var completion = new TaskCompletionSource<object>();
            completion.SetException(new InvalidOperationException("Deleting from the data store is not allowed."));
            return completion.Task;
        }

        /// <summary>
        /// Sends a single item-modified request to the firmware and waits for the result.
        /// Failures are logged and never propagate, so one failed notification does not stop the rest.
        /// </summary>
        private void NotifyFirmwareItemModified(String collection, String key)
        {
            try
            {
                // Blocking here is intentional: the caller runs this on its own notification thread.
                _ = _machineProvider.MachineOperator.SendRequest(new DataStoreItemModifiedRequest()
                {
                    Collection = collection,
                    Key = key
                }).Result;
            }
            catch (Exception ex)
            {
                Logging.LogManager.Default.Log(ex, $"Error notifying firmware about data store item change '{collection}.{key}'.");
            }
        }

        #endregion

        #region Proto Handlers

        /// <summary>
        /// Machine operator handler: stores the item, acknowledges the request and then notifies the
        /// registered listeners about the modification. Any failure before the acknowledgement is
        /// answered with a <c>GeneralDatastoreError</c> response.
        /// </summary>
        private async void OnPutDataStoreItemRequest(ITransporter transporter, PutDataStoreItemRequest request, string token)
        {
            try
            {
                ValidateCollectionAndKey(request.Collection, request.Key);

                GetManager().GetCollection(request.Collection).Put(request.Key, GetPMRValue(request.Item));

                await transporter.SendResponse(new PutDataStoreItemResponse(), token);

                try
                {
                    await NotifyListeners(request.Collection, request.Key);
                }
                catch (Exception ex)
                {
                    LogManager.Log(ex, "Error generating data store item notifications.");
                }
            }
            catch (Exception ex)
            {
                try
                {
                    await transporter.SendResponse(new PutDataStoreItemResponse(), token, new TransportResponseConfig()
                    {
                        ErrorCode = PMR.Common.ErrorCode.GeneralDatastoreError,
                        ErrorMessage = ex.Message
                    });
                }
                catch (Exception exx)
                {
                    // This is an async void handler, so the failure is logged rather than rethrown.
                    LogManager.Log(exx, "Error sending put data store item error response.");
                }
            }
        }

        /// <summary>
        /// Sends a <see cref="RemoteDataStoreChangeType.Modified"/> notification for the given item
        /// to every registered listener. A failure for one listener is logged and does not stop the others.
        /// </summary>
        private async Task NotifyListeners(String collection, String key)
        {
            List<ListenerReceiver> listeners;

            // Snapshot under the lock; the sends below are awaited and must not hold it.
            lock (_listenersGate)
            {
                listeners = _listenerReceivers.ToList();
            }

            if (listeners.Count == 0)
            {
                return;
            }

            var item = GetManager().GetCollection(collection).GetItem(key);
            var remoteItem = CreateRemoteItem(item);

            foreach (var listener in listeners)
            {
                try
                {
                    await listener.Receiver.SendGenericResponse(new RemoteDataStoreStartListenResponse()
                    {
                        ChangeType = RemoteDataStoreChangeType.Modified,
                        CollectionName = collection,
                        Item = remoteItem
                    }, listener.Token);
                }
                catch (Exception ex)
                {
                    // Interpolating the adapter directly (no explicit ToString call) keeps this
                    // catch block from throwing when the adapter is null.
                    LogManager.Log(ex, $"Error sending data store item notification to receiver '{listener.Receiver.Adapter}'");
                }
            }
        }

        /// <summary>
        /// Machine operator handler: reads the item (passing the request's default item to the
        /// collection) and responds with it. A <see cref="KeyNotFoundException"/> is answered with
        /// <c>KeyNotFound</c>; any other failure with <c>GeneralDatastoreError</c>.
        /// </summary>
        private async void OnGetDataStoreItemRequest(ITransporter transporter, GetDataStoreItemRequest request, string token)
        {
            try
            {
                ValidateCollectionAndKey(request.Collection, request.Key);

                var item = GetManager().GetCollection(request.Collection).GetItem(request.Key, GetPMRValue(request.DefaultItem));

                await transporter.SendResponse(new GetDataStoreItemResponse()
                {
                    Key = item.Key,
                    Item = CreatePMRDataStoreItem(item),
                }, token);
            }
            catch (KeyNotFoundException ex)
            {
                try
                {
                    await transporter.SendResponse(new GetDataStoreItemResponse(), token, new TransportResponseConfig()
                    {
                        ErrorCode = PMR.Common.ErrorCode.KeyNotFound,
                        ErrorMessage = ex.Message
                    });
                }
                catch (Exception exx)
                {
                    // This is an async void handler, so the failure is logged rather than rethrown.
                    LogManager.Log(exx, "Error sending get data store item error response.");
                }
            }
            catch (Exception ex)
            {
                try
                {
                    await transporter.SendResponse(new GetDataStoreItemResponse(), token, new TransportResponseConfig()
                    {
                        ErrorCode = PMR.Common.ErrorCode.GeneralDatastoreError,
                        ErrorMessage = ex.Message
                    });
                }
                catch (Exception exx)
                {
                    // This is an async void handler, so the failure is logged rather than rethrown.
                    LogManager.Log(exx, "Error sending get data store item error response.");
                }
            }
        }

        #region Helpers

        /// <summary>
        /// Converts a data store item to its PMR representation, filling the value field that
        /// matches the item's type. Types not listed in the switch leave only the data type set.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The item is typed as Proto but its value is not a <see cref="DataStoreProtoObject"/>.
        /// </exception>
        private DataStoreItem CreatePMRDataStoreItem(IDataStoreItem item)
        {
            DataStoreItem pmr = new DataStoreItem();
            pmr.DataType = (PMR.DataStore.DataType)item.Type;

            switch (item.Type)
            {
                case Tango.DataStore.DataType.Int32:
                    pmr.Int32Value = (int)Convert.ChangeType(item.Value, typeof(int));
                    break;
                case Tango.DataStore.DataType.Float:
                    pmr.FloatValue = (float)Convert.ChangeType(item.Value, typeof(float));
                    break;
                case Tango.DataStore.DataType.Double:
                    pmr.DoubleValue = (double)Convert.ChangeType(item.Value, typeof(double));
                    break;
                case Tango.DataStore.DataType.Boolean:
                    pmr.BooleanValue = (bool)Convert.ChangeType(item.Value, typeof(bool));
                    break;
                case Tango.DataStore.DataType.String:
                    pmr.StringValue = (String)Convert.ChangeType(item.Value, typeof(String));
                    break;
                case Tango.DataStore.DataType.Bytes:
                    pmr.BytesValue = Google.Protobuf.ByteString.CopyFrom((byte[])Convert.ChangeType(item.Value, typeof(byte[])));
                    break;
                case Tango.DataStore.DataType.Proto:
                    DataStoreProtoObject proto = item.Value as DataStoreProtoObject;

                    // Fail with a descriptive message instead of a NullReferenceException; the
                    // caller forwards this message in its error response.
                    if (proto == null)
                    {
                        throw new InvalidOperationException($"Data store item '{item.Key}' is of type Proto but does not contain a proto object.");
                    }

                    pmr.BytesValue = Google.Protobuf.ByteString.CopyFrom(proto.Data);
                    pmr.ProtoType = proto.MessageType;
                    break;
            }

            return pmr;
        }

        /// <summary>
        /// Extracts the value of a PMR data store item according to its data type.
        /// </summary>
        /// <returns>The value, or <c>null</c> when <paramref name="item"/> is <c>null</c>.</returns>
        /// <exception cref="NotSupportedException">The item's data type is not handled.</exception>
        private Object GetPMRValue(DataStoreItem item)
        {
            if (item == null)
            {
                return null;
            }

            switch (item.DataType)
            {
                case PMR.DataStore.DataType.Int32:
                    return item.Int32Value;
                case PMR.DataStore.DataType.Float:
                    return item.FloatValue;
                case PMR.DataStore.DataType.Double:
                    return item.DoubleValue;
                case PMR.DataStore.DataType.Boolean:
                    return item.BooleanValue;
                case PMR.DataStore.DataType.String:
                    return item.StringValue;
                case PMR.DataStore.DataType.Bytes:
                    return item.BytesValue.ToByteArray();
                case PMR.DataStore.DataType.Proto:
                    return DataStoreProtoObject.FromPMRDataStoreItem(item);
            }

            throw new NotSupportedException("The specified data type is not supported.");
        }

        #endregion

        #endregion

        /// <summary>
        /// Validates the given collection name and/or item key; a <c>null</c> argument is skipped.
        /// </summary>
        /// <exception cref="ArgumentException">A supplied name fails <c>DataStoreHelper.ValidateCollectionOrKeyName</c>.</exception>
        private void ValidateCollectionAndKey(String collection = null, String key = null)
        {
            if (collection != null && !DataStoreHelper.ValidateCollectionOrKeyName(collection))
            {
                throw new ArgumentException("Collection name contains invalid characters.");
            }

            if (key != null && !DataStoreHelper.ValidateCollectionOrKeyName(key))
            {
                throw new ArgumentException("Item key contains invalid characters.");
            }
        }

        /// <summary>
        /// Removes every listener registration that belongs to the disconnected receiver.
        /// </summary>
        public void OnReceiverDisconnected(ExternalBridgeReceiver receiver)
        {
            lock (_listenersGate)
            {
                _listenerReceivers.RemoveAll(x => x.Receiver == receiver);
            }
        }
    }
}