using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Tango.Core;
using Tango.Core.Commands;
using Tango.Core.ExtensionMethods;
using Tango.Core.Threading;
using Tango.Logging;
using Tango.PMR.IO;
using Tango.Transport;
namespace Tango.Integration.Storage
{
public class StorageManager : ExtendedObject
{
private ITransporter _transporter;
#region Properties
private String _currentPath;
///
/// Gets or sets the current path.
///
public String CurrentPath
{
get { return _currentPath; }
set { _currentPath = value; RaisePropertyChangedAuto(); }
}
private StorageFolder _currentFolder;
///
/// Gets or sets the current folder.
///
public StorageFolder CurrentFolder
{
get { return _currentFolder; }
set { _currentFolder = value; RaisePropertyChangedAuto(); }
}
private StorageDrive _storageDrive;
///
/// Gets or sets the storage drive.
///
public StorageDrive StorageDrive
{
get { return _storageDrive; }
set { _storageDrive = value; RaisePropertyChangedAuto(); }
}
///
/// Gets or sets a value indicating whether to disable the transporter keep alive mechanism while a file is being uploaded.
///
public bool SuppressKeepAliveWhileFileUploads { get; set; }
#endregion
#region Constructor
///
/// Initializes a new instance of the class.
///
/// The transporter.
public StorageManager(ITransporter transporter)
{
_transporter = transporter;
}
#endregion
#region Private Methods
///
/// Ensures the transporter is connected.
///
/// Error executing storage command. Transporter is not connected.
private void EnsureTransporter()
{
if (_transporter.State != TransportComponentState.Connected)
{
throw new InvalidOperationException("Error executing storage command. Transporter is not connected.");
}
}
#endregion
#region Public Methods
///
/// Gets the storage drive information.
///
///
public async Task GetStorageDrive()
{
EnsureTransporter();
GetStorageInfoResponse response = null;
GetStorageInfoRequest request = new GetStorageInfoRequest();
try
{
response = await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });
}
catch (Exception ex)
{
throw ex;
}
StorageDrive = new StorageDrive()
{
Capacity = response.Capacity,
FreeSpace = response.FreeSpace,
Root = response.Root,
};
return StorageDrive;
}
///
/// Gets the root folder of the current storage driver.
///
///
public Task GetRootFolder()
{
return GetFolder(new StorageFolder()
{
Path = StorageDrive.Root,
});
}
///
/// Gets the specified folder information.
///
/// The folder.
///
public Task GetFolder(StorageFolder folder)
{
return GetFolder(folder.Path);
}
///
/// Gets the specified path folder information.
///
/// The path.
///
public async Task GetFolder(String path)
{
EnsureTransporter();
GetFilesResponse response = null;
GetFilesRequest request = new GetFilesRequest();
request.Path = path;
try
{
response = await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });
}
catch (Exception ex)
{
throw ex;
}
StorageFolder sf = new StorageFolder();
sf.Path = path;
List items = new List();
foreach (var item in response.Items)
{
if (!item.Attribute.HasFlag(FileAttribute.Directory))
{
items.Add(new StorageFile()
{
Length = item.Length,
Path = item.FullPath,
Attribute = item.Attribute,
});
}
else
{
items.Add(new StorageFolder()
{
Path = item.FullPath,
Attribute = item.Attribute,
});
}
}
sf.Items = new ReadOnlyCollection(items);
_currentFolder = sf;
_currentPath = sf.Path;
RaisePropertyChanged(nameof(CurrentFolder));
RaisePropertyChanged(nameof(CurrentPath));
return sf;
}
///
/// Uploads the specified file stream to the specified destination path.
/// Returns a file handler for keeping track on the upload progress.
///
/// The path.
/// The stream.
///
public async Task UploadFile(String path, Stream stream)
{
FileUploadRequest request = new FileUploadRequest();
request.Path = path;
request.Length = stream.Length;
var fileUploadResponse = await _transporter.SendRequest(request, new TransportRequestConfig() { ShouldLog = true });
String uploadId = fileUploadResponse.Message.UploadID;
long max_length = fileUploadResponse.Message.MaxChunkLength;
bool canceled = false;
StorageFileHandler handler = new StorageFileHandler(new StorageFile()
{
Path = path,
Length = (int)stream.Length,
}, () =>
{
canceled = true;
});
handler.Total = stream.Length;
ThreadFactory.StartNew(() =>
{
bool oldKeepAlive = _transporter.UseKeepAlive;
try
{
if (SuppressKeepAliveWhileFileUploads)
{
_transporter.UseKeepAlive = false;
}
while (stream.Position < stream.Length)
{
if (!canceled)
{
if (!handler.IsPaused)
{
byte[] buffer = new byte[Math.Min(max_length, stream.Length - stream.Position)];
stream.Read(buffer, 0, buffer.Length);
FileChunkUploadRequest chunk = new FileChunkUploadRequest();
chunk.UploadID = uploadId;
chunk.Path = path;
chunk.Buffer = ByteString.CopyFrom(buffer);
var chunk_response = _transporter.SendRequest(chunk, new TransportRequestConfig() { Priority = QueuePriority.Low }).Result;
if (chunk_response.Message.IsCanceled)
{
canceled = true;
handler.RaiseFailed(new IOException("The storage device controller has canceled the current upload."));
return;
}
handler.Current = stream.Position;
}
else
{
Thread.Sleep(100);
}
}
else
{
handler.RaiseCanceled();
return;
}
}
if (!canceled)
{
handler.RaiseCompleted();
}
}
catch (Exception ex)
{
handler.RaiseFailed(ex);
}
finally
{
if (SuppressKeepAliveWhileFileUploads)
{
_transporter.UseKeepAlive = oldKeepAlive;
}
}
});
return handler;
}
///
/// Uploads the specified file stream to the specified destination path.
///
/// The path.
/// The stream.
///
public async Task UploadFileSync(String path, Stream stream)
{
TaskCompletionSource