using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Core.Commands;
using Tango.Core.ExtensionMethods;
using Tango.Core.Threading;
using Tango.Logging;
using Tango.PMR.IO;
using Tango.Transport;

namespace Tango.Integration.Storage
{
    /// <summary>
    /// Issues storage requests (drive info, folder listing, upload, download, delete, create)
    /// through an <see cref="ITransporter"/> and exposes the last queried drive and folder
    /// as change-notifying properties.
    /// </summary>
    public class StorageManager : ExtendedObject
    {
        // NOTE(review): the SendRequest calls below carry no explicit type arguments;
        // confirm against the ITransporter declaration that the response type is resolved
        // at each call site.
        private readonly ITransporter _transporter;

        #region Properties

        private String _currentPath;
        /// <summary>
        /// Gets or sets the current path. Updated by <see cref="GetFolder(String)"/>.
        /// </summary>
        public String CurrentPath
        {
            get { return _currentPath; }
            set { _currentPath = value; RaisePropertyChangedAuto(); }
        }

        private StorageFolder _currentFolder;
        /// <summary>
        /// Gets or sets the current folder. Updated by <see cref="GetFolder(String)"/>.
        /// </summary>
        public StorageFolder CurrentFolder
        {
            get { return _currentFolder; }
            set { _currentFolder = value; RaisePropertyChangedAuto(); }
        }

        private StorageDrive _storageDrive;
        /// <summary>
        /// Gets or sets the storage drive. Updated by <see cref="GetStorageDrive"/>.
        /// </summary>
        public StorageDrive StorageDrive
        {
            get { return _storageDrive; }
            set { _storageDrive = value; RaisePropertyChangedAuto(); }
        }

        /// <summary>
        /// Gets or sets a value indicating whether to disable the transporter keep alive
        /// mechanism while a file is being uploaded.
        /// </summary>
        public bool SuppressKeepAliveWhileFileUploads { get; set; }

        #endregion

        #region Constructor

        /// <summary>
        /// Initializes a new instance of the <see cref="StorageManager"/> class.
        /// </summary>
        /// <param name="transporter">The transporter used to send every storage request.</param>
        public StorageManager(ITransporter transporter)
        {
            _transporter = transporter;
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Ensures the transporter is connected.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Error executing storage command. Transporter is not connected.
        /// </exception>
        private void EnsureTransporter()
        {
            if (_transporter.State != TransportComponentState.Connected)
            {
                throw new InvalidOperationException("Error executing storage command. Transporter is not connected.");
            }
        }

        /// <summary>
        /// Negotiates an upload with the device and starts the worker that sends the stream
        /// chunk by chunk.
        /// </summary>
        /// <param name="path">The destination path.</param>
        /// <param name="stream">The stream to upload; read from its current position to its end.</param>
        /// <param name="beforeStart">
        /// Optional callback invoked with the handler before the worker starts, so event
        /// subscriptions made in it cannot miss a terminal event.
        /// </param>
        /// <returns>The handler tracking the upload.</returns>
        private async Task<StorageFileHandler> StartUpload(String path, Stream stream, Action<StorageFileHandler> beforeStart)
        {
            FileUploadRequest request = new FileUploadRequest();
            request.Path = path;
            request.Length = stream.Length;

            var fileUploadResponse = await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });

            String uploadId = fileUploadResponse.Message.UploadID;
            long maxChunkLength = fileUploadResponse.Message.MaxChunkLength;

            // Set by the handler's cancel callback and polled by the worker between chunks.
            bool canceled = false;

            StorageFileHandler handler = new StorageFileHandler(new StorageFile()
            {
                Path = path,
                Length = (int)stream.Length,
            }, () => { canceled = true; });

            handler.Total = stream.Length;

            beforeStart?.Invoke(handler);

            ThreadFactory.StartNew(() =>
            {
                bool oldKeepAlive = _transporter.UseKeepAlive;

                // Decide once, so the restore in 'finally' always mirrors what was done here
                // even if the property is toggled while the upload is running.
                bool suppressKeepAlive = SuppressKeepAliveWhileFileUploads;

                try
                {
                    if (suppressKeepAlive)
                    {
                        _transporter.UseKeepAlive = false;
                    }

                    while (stream.Position < stream.Length)
                    {
                        if (canceled)
                        {
                            handler.RaiseCanceled();
                            return;
                        }

                        if (handler.IsPaused)
                        {
                            Thread.Sleep(100);
                            continue;
                        }

                        byte[] buffer = new byte[Math.Min(maxChunkLength, stream.Length - stream.Position)];

                        // Stream.Read may return fewer bytes than requested; send only what was read.
                        int read = stream.Read(buffer, 0, buffer.Length);

                        if (read <= 0)
                        {
                            // No progress is possible; fail instead of looping forever.
                            throw new EndOfStreamException("No data could be read from the upload stream before its reported length was reached.");
                        }

                        FileChunkUploadRequest chunk = new FileChunkUploadRequest();
                        chunk.UploadID = uploadId;
                        chunk.Path = path;
                        chunk.Buffer = ByteString.CopyFrom(buffer, 0, read);

                        var chunkResponse = _transporter.SendRequest(chunk, new TransportRequestConfig() { Priority = QueuePriority.Low }).Result;

                        if (chunkResponse.Message.IsCanceled)
                        {
                            canceled = true;
                            handler.RaiseFailed(new IOException("The storage device controller has canceled the current upload."));
                            return;
                        }

                        handler.Current = stream.Position;
                    }

                    if (!canceled)
                    {
                        handler.RaiseCompleted();
                    }
                    else
                    {
                        // Cancel was requested after the last chunk; still report a terminal event.
                        handler.RaiseCanceled();
                    }
                }
                catch (Exception ex)
                {
                    handler.RaiseFailed(ex);
                }
                finally
                {
                    if (suppressKeepAlive)
                    {
                        _transporter.UseKeepAlive = oldKeepAlive;
                    }
                }
            });

            return handler;
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Gets the storage drive information and stores it in <see cref="StorageDrive"/>.
        /// </summary>
        /// <returns>The drive built from the response (capacity, free space, root).</returns>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public async Task<StorageDrive> GetStorageDrive()
        {
            EnsureTransporter();

            GetStorageInfoRequest request = new GetStorageInfoRequest();

            // Transport failures propagate unchanged, keeping their original stack trace.
            GetStorageInfoResponse response = await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });

            StorageDrive = new StorageDrive()
            {
                Capacity = response.Capacity,
                FreeSpace = response.FreeSpace,
                Root = response.Root,
            };

            return StorageDrive;
        }

        /// <summary>
        /// Gets the root folder of the current storage drive.
        /// Reads <see cref="StorageDrive"/>, so <see cref="GetStorageDrive"/> must have populated it first.
        /// </summary>
        /// <returns>The root folder with its items.</returns>
        public Task<StorageFolder> GetRootFolder()
        {
            return GetFolder(new StorageFolder()
            {
                Path = StorageDrive.Root,
            });
        }

        /// <summary>
        /// Gets the specified folder information.
        /// </summary>
        /// <param name="folder">The folder; only its path is used.</param>
        /// <returns>A new folder instance for the same path, with its items.</returns>
        public Task<StorageFolder> GetFolder(StorageFolder folder)
        {
            return GetFolder(folder.Path);
        }

        /// <summary>
        /// Gets the specified path folder information and makes it the current folder.
        /// </summary>
        /// <param name="path">The path.</param>
        /// <returns>The folder with a read-only list of its files and sub folders.</returns>
        /// <exception cref="InvalidOperationException">The transporter is not connected.</exception>
        public async Task<StorageFolder> GetFolder(String path)
        {
            EnsureTransporter();

            GetFilesRequest request = new GetFilesRequest();
            request.Path = path;

            // Transport failures propagate unchanged, keeping their original stack trace.
            GetFilesResponse response = await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });

            StorageFolder sf = new StorageFolder();
            sf.Path = path;

            List<StorageItem> items = new List<StorageItem>();

            foreach (var item in response.Items)
            {
                if (item.Attribute.HasFlag(FileAttribute.Directory))
                {
                    items.Add(new StorageFolder()
                    {
                        Path = item.FullPath,
                        Attribute = item.Attribute,
                    });
                }
                else
                {
                    items.Add(new StorageFile()
                    {
                        Length = item.Length,
                        Path = item.FullPath,
                        Attribute = item.Attribute,
                    });
                }
            }

            sf.Items = new ReadOnlyCollection<StorageItem>(items);

            // Assign both backing fields before notifying, so listeners of either
            // property observe a consistent folder/path pair.
            _currentFolder = sf;
            _currentPath = sf.Path;

            RaisePropertyChanged(nameof(CurrentFolder));
            RaisePropertyChanged(nameof(CurrentPath));

            return sf;
        }

        /// <summary>
        /// Uploads the specified file stream to the specified destination path.
        /// Returns a file handler for keeping track on the upload progress.
        /// </summary>
        /// <param name="path">The path.</param>
        /// <param name="stream">The stream.</param>
        /// <returns>
        /// The handler, returned once the device has accepted the upload request;
        /// the chunks are sent afterwards on a background worker.
        /// </returns>
        public Task<StorageFileHandler> UploadFile(String path, Stream stream)
        {
            return StartUpload(path, stream, null);
        }

        /// <summary>
        /// Uploads the specified file stream to the specified destination path and
        /// completes only when the whole upload has finished.
        /// </summary>
        /// <param name="path">The path.</param>
        /// <param name="stream">The stream.</param>
        /// <returns>A task that completes on success and faults with the upload's failure exception.</returns>
        public async Task UploadFileSync(String path, Stream stream)
        {
            TaskCompletionSource<bool> source = new TaskCompletionSource<bool>();

            // Subscribe before the worker starts: a short (or empty) upload could otherwise
            // finish before the handlers are attached and leave this task pending forever.
            await StartUpload(path, stream, handler =>
            {
                handler.Completed += (_, __) => { source.TrySetResult(true); };
                handler.Failed += (_, ex) => { source.TrySetException(ex); };
            });

            await source.Task;
        }

        /// <summary>
        /// Downloads the specified storage file to the specified stream.
        /// Returns a file handler for keeping track on the download progress.
        /// </summary>
        /// <param name="file">The file.</param>
        /// <param name="stream">
        /// The stream. Its length is used as the download position, so it is
        /// presumably expected to be empty and seekable when passed in (TODO confirm with callers).
        /// </param>
        /// <returns>
        /// The handler, returned once the device has accepted the download request;
        /// the chunks are fetched afterwards on a background worker.
        /// </returns>
        public async Task<StorageFileHandler> DownloadFile(StorageFile file, Stream stream)
        {
            FileDownloadRequest request = new FileDownloadRequest();
            request.FileName = file.Path;

            var fileDownloadResponse = await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });

            String downloadId = fileDownloadResponse.Message.DownloadID;

            // Read from the response as in the original code; not used by the chunk requests.
            long maxChunkLength = fileDownloadResponse.Message.MaxChunkLength;

            // Set by the handler's cancel callback and polled by the worker between chunks.
            bool canceled = false;

            StorageFileHandler handler = new StorageFileHandler(file, () => { canceled = true; });

            handler.Total = file.Length;

            ThreadFactory.StartNew(() =>
            {
                try
                {
                    while (stream.Length < file.Length)
                    {
                        if (canceled)
                        {
                            handler.RaiseCanceled();
                            return;
                        }

                        if (handler.IsPaused)
                        {
                            Thread.Sleep(100);
                            continue;
                        }

                        FileChunkDownloadRequest chunk = new FileChunkDownloadRequest();
                        chunk.DownloadID = downloadId;
                        chunk.FileName = file.Path;
                        chunk.Position = stream.Length;

                        var chunkResponse = _transporter.SendRequest(chunk, new TransportRequestConfig() { Priority = QueuePriority.Low }).Result;

                        if (chunkResponse.Message.IsCanceled)
                        {
                            canceled = true;
                            handler.RaiseFailed(new IOException("The storage device controller has canceled the current download."));
                            return;
                        }

                        byte[] buffer = chunkResponse.Message.Buffer.ToByteArray();
                        stream.Write(buffer, 0, buffer.Length);

                        handler.Current = stream.Length;
                    }

                    if (!canceled)
                    {
                        handler.RaiseCompleted();
                    }
                    else
                    {
                        // Cancel was requested after the last chunk; still report a terminal event.
                        handler.RaiseCanceled();
                    }
                }
                catch (Exception ex)
                {
                    handler.RaiseFailed(ex);
                }
            });

            return handler;
        }

        /// <summary>
        /// Deletes the specified storage item.
        /// </summary>
        /// <param name="item">The item; its path and attribute are sent to the device.</param>
        /// <returns>A task that completes when the device has answered the delete request.</returns>
        public async Task DeleteItem(StorageItem item)
        {
            DeleteRequest request = new DeleteRequest()
            {
                Path = item.Path,
                Attribute = item.Attribute,
            };

            await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });
        }

        /// <summary>
        /// Creates a new folder on the specified destination path.
        /// </summary>
        /// <param name="path">The path.</param>
        /// <returns>A task that completes when the device has answered the create request.</returns>
        public async Task CreateFolder(String path)
        {
            CreateRequest request = new CreateRequest()
            {
                Path = path,
                Attribute = FileAttribute.Directory,
            };

            await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });
        }

        #endregion
    }
}