From e086e62daa7b24fd591c0a7287055900d5663459 Mon Sep 17 00:00:00 2001 From: Martijn Hoekstra Date: Tue, 3 Dec 2019 14:08:34 +0100 Subject: [PATCH 1/7] paralellize pre-processing --- Hotsapi.Uploader.Common/Manager.cs | 84 ++++++++++++++----- Hotsapi.Uploader.Common/UploadStatus.cs | 4 +- Hotsapi.Uploader.Common/Uploader.cs | 2 +- Hotsapi.Uploader.Common/WorkerPool.cs | 78 +++++++++++++++++ .../UIHelpers/UploadColorConverter.cs | 8 +- 5 files changed, 149 insertions(+), 27 deletions(-) create mode 100644 Hotsapi.Uploader.Common/WorkerPool.cs diff --git a/Hotsapi.Uploader.Common/Manager.cs b/Hotsapi.Uploader.Common/Manager.cs index d994bd2..fd80ee9 100644 --- a/Hotsapi.Uploader.Common/Manager.cs +++ b/Hotsapi.Uploader.Common/Manager.cs @@ -21,7 +21,8 @@ public class Manager : INotifyPropertyChanged /// /// Upload thead count /// - public const int MaxThreads = 4; + public readonly int MaxThreads = Environment.ProcessorCount; + public const int MaxUploads = 4; /// /// Replay list @@ -115,9 +116,7 @@ public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader _analyzer.MinimumBuild = await _uploader.GetMinimumBuild(); - for (int i = 0; i < MaxThreads; i++) { - Task.Run(UploadLoop).Forget(); - } + _ = UploadLoop(); } public void Stop() @@ -128,32 +127,72 @@ public void Stop() private async Task UploadLoop() { - while (await processingQueue.OutputAvailableAsync()) { - try { - var file = await processingQueue.TakeAsync(); + var rateLimitUploading = new SemaphoreSlim(MaxUploads); + using (var rateLimitParsing = new SemaphoreSlim(MaxThreads)) { + while (await processingQueue.OutputAvailableAsync()) { + try { + var file = await processingQueue.TakeAsync(); + Task parsing = null; + try { + await rateLimitParsing.WaitAsync(); + file.UploadStatus = UploadStatus.Preprocessing; + parsing = WorkerPool.RunBackground(() => _analyzer.Analyze(file)); + } + finally { + _ = rateLimitParsing.Release(); + } + + if (parsing != null) { + + var settingUpdate = parsing.ContinueWith((Task trp) => { + if (trp.Status == TaskStatus.RanToCompletion) { + file.UploadStatus = UploadStatus.Preprocessed; + } + return trp.Result; + }); - file.UploadStatus = UploadStatus.InProgress; + //don't await the upload task, but bound it by the upload ratelimiter + _ = Task.Run(async () => { + try { + await rateLimitUploading.WaitAsync(); + var parsed = await settingUpdate; + file.UploadStatus = UploadStatus.Uploading; + await DoFileUpload(file, parsed); + } + finally { + _ = rateLimitUploading.Release(); + } + }); + } - // test if replay is eligible for upload (not AI, PTR, Custom, etc) - var replay = _analyzer.Analyze(file); - if (file.UploadStatus == UploadStatus.InProgress) { - // if it is, upload it - await _uploader.Upload(file); } - SaveReplayList(); - if (ShouldDelete(file, replay)) { - DeleteReplay(file); + catch (Exception ex) { + _log.Error(ex, "Error in upload loop"); } } - catch (Exception ex) { - _log.Error(ex, "Error in upload loop"); - } } } + private async Task DoFileUpload(ReplayFile file, Replay replay) + { + // Analyze will set the upload status as a side-effect when it's unsuitable for uploading + if (file.UploadStatus == UploadStatus.Uploading) { + await _uploader.Upload(file); + } + SaveReplayList(); + if (ShouldDelete(file, replay)) { + DeleteReplay(file); + } + } + + private bool IsProcessingStatus(UploadStatus status) => + status == UploadStatus.Preprocessing || + status == UploadStatus.Preprocessed || + status == UploadStatus.Uploading; + private void RefreshStatusAndAggregates() { - _status = Files.Any(x => x.UploadStatus == UploadStatus.InProgress) ? "Uploading..." : "Idle"; + _status = Files.Select(x => x.UploadStatus).Any(IsProcessingStatus) ? "Processing..." : "Idle"; _aggregates = Files.GroupBy(x => x.UploadStatus).ToDictionary(x => x.Key, x => x.Count()); PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Status))); PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Aggregates))); @@ -163,8 +202,9 @@ private void SaveReplayList() { try { // save only replays with fixed status. Will retry failed ones on next launch. - var ignored = new[] { UploadStatus.None, UploadStatus.UploadError, UploadStatus.InProgress }; - _storage.Save(Files.Where(x => !ignored.Contains(x.UploadStatus))); + var ignored = new[] { UploadStatus.None, UploadStatus.UploadError}; + bool isIgnored(UploadStatus status) => ignored.Contains(status) || IsProcessingStatus(status); + _storage.Save(Files.Where(file => !isIgnored(file.UploadStatus))); } catch (Exception ex) { _log.Error(ex, "Error saving replay list"); diff --git a/Hotsapi.Uploader.Common/UploadStatus.cs b/Hotsapi.Uploader.Common/UploadStatus.cs index d2bb9ef..3059f17 100644 --- a/Hotsapi.Uploader.Common/UploadStatus.cs +++ b/Hotsapi.Uploader.Common/UploadStatus.cs @@ -8,7 +8,9 @@ public enum UploadStatus { None, Success, - InProgress, + Preprocessed, + Preprocessing, + Uploading, UploadError, Duplicate, AiDetected, diff --git a/Hotsapi.Uploader.Common/Uploader.cs b/Hotsapi.Uploader.Common/Uploader.cs index 130f947..500dc4a 100644 --- a/Hotsapi.Uploader.Common/Uploader.cs +++ b/Hotsapi.Uploader.Common/Uploader.cs @@ -35,7 +35,7 @@ public Uploader() /// public async Task Upload(ReplayFile file) { - file.UploadStatus = UploadStatus.InProgress; + file.UploadStatus = UploadStatus.Uploading; if (file.Fingerprint != null && await CheckDuplicate(file.Fingerprint)) { _log.Debug($"File {file} marked as duplicate"); file.UploadStatus = UploadStatus.Duplicate; diff --git a/Hotsapi.Uploader.Common/WorkerPool.cs b/Hotsapi.Uploader.Common/WorkerPool.cs new file mode 100644 index 0000000..797cabd --- /dev/null +++ b/Hotsapi.Uploader.Common/WorkerPool.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + internal static class WorkerPool + { + private static TaskScheduler BackgroundScheduler = new LowPriorityScheduler(); + public static Task RunBackground(Action action) => RunBackground(action, CancellationToken.None); + public static Task RunBackground(Action action, CancellationToken token) => Task.Factory.StartNew(action, token, TaskCreationOptions.LongRunning, BackgroundScheduler); + public static Task RunBackground(Func action) => RunBackground(action, CancellationToken.None); + public static Task RunBackground(Func action, CancellationToken token) => Task.Factory.StartNew(action, token, TaskCreationOptions.LongRunning, BackgroundScheduler); + + } + + /// + /// Fixed number threadpool task scheduler that runs tasks on low-priority threads + /// + public class LowPriorityScheduler : TaskScheduler, IDisposable + { + private BlockingCollection TaskQueue { get; } = new BlockingCollection(); + private IEnumerable Consumable => TaskQueue.GetConsumingEnumerable(); + + public LowPriorityScheduler(int maxThreads) + { + for (var i = 0; i < maxThreads; i++) { + var workerThread = new Thread(PerformTasks) { + Name = $"Low Priority Tread {i}/{maxThreads}", + IsBackground = true, + Priority = ThreadPriority.BelowNormal + }; + workerThread.Start(); + } + } + + public LowPriorityScheduler() : this(Environment.ProcessorCount) { } + + private void PerformTasks() + { + foreach (var task in Consumable) { + if (task.Status == TaskStatus.WaitingToRun || task.Status == TaskStatus.WaitingForActivation || task.Status == TaskStatus.Created) { + // execute the task under the current thread + if (!base.TryExecuteTask(task)) Trace.WriteLine("Error"); + } else { + Trace.WriteLine("Unrunnable Task Status"); + } + } + var col = TaskQueue; + if (col != null) col.Dispose(); + } + protected override IEnumerable GetScheduledTasks() => TaskQueue; + protected override void QueueTask(Task task) => TaskQueue.Add(task); + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; + + #region IDisposable Support + private bool disposedValue = false; // To detect redundant calls + + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) { + if (disposing) { + TaskQueue.CompleteAdding(); + } + disposedValue = true; + } + } + + // This code added to correctly implement the disposable pattern. + public void Dispose() => + // Do not change this code. Put cleanup code in Dispose(bool disposing) above. + Dispose(true); + #endregion + } +} diff --git a/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs b/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs index 0736c34..3e466e1 100644 --- a/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs +++ b/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs @@ -11,9 +11,11 @@ protected override Brush Convert(UploadStatus value) { switch (value) { case UploadStatus.Success: - return GetBrush("StatusUploadSuccessBrush"); - - case UploadStatus.InProgress: + return GetBrush("StatusUploadSuccessBrush"); + + case UploadStatus.Preprocessing: + case UploadStatus.Preprocessed: + case UploadStatus.Uploading: return GetBrush("StatusUploadInProgressBrush"); case UploadStatus.Duplicate: From 69427811d647bd55e5102902c60ae1650796d483 Mon Sep 17 00:00:00 2001 From: Martijn Hoekstra Date: Wed, 4 Dec 2019 13:36:27 +0100 Subject: [PATCH 2/7] restructure manager three queues, one for parsing, one for (bulk) duplicate checks, one for uploads, each running at its own pace --- Hotsapi.Uploader.Common/Analyzer.cs | 39 +- .../ConcurrentAyncBuffer.cs | 98 +++ Hotsapi.Uploader.Common/LockExtensions.cs | 77 +++ Hotsapi.Uploader.Common/Manager.cs | 581 +++++++++--------- Hotsapi.Uploader.Common/UploadStatus.cs | 7 +- Hotsapi.Uploader.Common/Uploader.cs | 14 +- Hotsapi.Uploader.Windows/App.config | 2 +- 7 files changed, 497 insertions(+), 321 deletions(-) create mode 100644 Hotsapi.Uploader.Common/ConcurrentAyncBuffer.cs create mode 100644 Hotsapi.Uploader.Common/LockExtensions.cs diff --git a/Hotsapi.Uploader.Common/Analyzer.cs b/Hotsapi.Uploader.Common/Analyzer.cs index 9dd5eb3..9bc18e7 100644 --- a/Hotsapi.Uploader.Common/Analyzer.cs +++ b/Hotsapi.Uploader.Common/Analyzer.cs @@ -16,21 +16,17 @@ public class Analyzer : IAnalyzer private static Logger _log = LogManager.GetCurrentClassLogger(); /// - /// Analyze replay locally before uploading + /// Analyze replay locally before uploading. + /// + /// Sets file status as a side-effect. /// /// Replay file public Replay Analyze(ReplayFile file) { try { - var result = DataParser.ParseReplay(file.Filename, false, false, false, true); - var replay = result.Item2; - var parseResult = result.Item1; - var status = GetPreStatus(replay, parseResult); - - if (status != null) { - file.UploadStatus = status.Value; - } - + file.UploadStatus = UploadStatus.Preprocessing; + var (parseResult, replay) = DataParser.ParseReplay(file.Filename, false, false, false, true); + file.UploadStatus = GetPreStatus(replay, parseResult) ?? file.UploadStatus; if (parseResult != DataParser.ReplayParseResult.Success) { return null; } @@ -44,7 +40,7 @@ public Replay Analyze(ReplayFile file) } } - public UploadStatus? GetPreStatus(Replay replay, DataParser.ReplayParseResult parseResult) + private UploadStatus? GetPreStatus(Replay replay, DataParser.ReplayParseResult parseResult) { switch (parseResult) { case DataParser.ReplayParseResult.ComputerPlayerFound: @@ -55,22 +51,15 @@ public Replay Analyze(ReplayFile file) return UploadStatus.PtrRegion; case DataParser.ReplayParseResult.PreAlphaWipe: - return UploadStatus.TooOld; - } - - if (parseResult != DataParser.ReplayParseResult.Success) { - return null; - } - - if (replay.GameMode == GameMode.Custom) { - return UploadStatus.CustomGame; - } - - if (replay.ReplayBuild < MinimumBuild) { - return UploadStatus.TooOld; + return UploadStatus.TooOld; + case DataParser.ReplayParseResult.Incomplete: + return UploadStatus.Incomplete; } - return null; + return parseResult != DataParser.ReplayParseResult.Success ? null + : replay.GameMode == GameMode.Custom ? (UploadStatus?)UploadStatus.CustomGame + : replay.ReplayBuild < MinimumBuild ? (UploadStatus?)UploadStatus.TooOld + : (UploadStatus?)UploadStatus.Preprocessed; } /// diff --git a/Hotsapi.Uploader.Common/ConcurrentAyncBuffer.cs b/Hotsapi.Uploader.Common/ConcurrentAyncBuffer.cs new file mode 100644 index 0000000..9ec144b --- /dev/null +++ b/Hotsapi.Uploader.Common/ConcurrentAyncBuffer.cs @@ -0,0 +1,98 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + /// + /// A thread-safe, concurrent buffer that has asynchronous operations for enqueuing and bulk dequeueing. + /// + public class ConcurrentAyncBuffer + { + private Queue Items { get; } = new Queue(); + private Queue>> BulkRequests { get; } = new Queue>>(); + private SemaphoreSlim Mutex { get; } = new SemaphoreSlim(1); + /// + /// Enqueue a single item + /// + /// the item to enqueue + /// A task that will be completed when the item is enqueued + public Task EnqueueAsync(A item) => EnqueueAsync(item, CancellationToken.None); + /// + /// Enqueue a single item + /// + /// the item to enqueue + /// a cancellation token to cancel trying to enqueue the item + /// A task that will be completed when the item is enqueued + public Task EnqueueAsync(A item, CancellationToken token) => EnqueueManyAsync(new List() { item }, token); + /// + /// Enqueue all items + /// + /// the items to enqueue + /// a cancellation token to cancel trying to enqueue items + /// A task that will be completed when the items are enqueued + public Task EnqueueManyAsync(IEnumerable items, CancellationToken token) => Mutex.Locked(() => { + var succeeded = false; + while (!succeeded) { + if (BulkRequests.Any()) { + succeeded = BulkRequests.Dequeue().TrySetResult(items); + } else { + foreach (var item in items) { + Items.Enqueue(item); + } + succeeded = true; + } + } + }, token); + + /// + /// Enqueue all items + /// + /// the items to enqueue + /// a task that will be completed when the items are enqueued. + public Task EnqueueManyAsync(IEnumerable items) => EnqueueManyAsync(items, CancellationToken.None); + + /// + /// Dequeue all A's as soon as they're available. + /// + /// A cancellationtoken that will cancel fetching data + /// A task that will complete with all A's available + public async Task> DequeueAsync(CancellationToken token) + { + var locked = 0; + try { + await Mutex.WaitAsync(token); + locked = 1; + Task> resultTask; + if (Items.Any()) { + var result = new List(); + while (Items.Any()) { + result.Add(Items.Dequeue()); + } + resultTask = Task.FromResult>(result); + } else { + var completion = new TaskCompletionSource>(); + BulkRequests.Enqueue(completion); + _ = token.Register(() => completion.TrySetCanceled()); + resultTask = completion.Task; + } + if (locked > 0) { + _ = Mutex.Release(locked); + } + locked = 0; + return await resultTask; + } + finally { + if (locked > 0) { + _ = Mutex.Release(locked); + } + } + } + /// + /// Dequeue all A's as soon as any are available + /// + /// A task that will complete with all A's available + public Task> DequeueAsync() => DequeueAsync(CancellationToken.None); + } +} diff --git a/Hotsapi.Uploader.Common/LockExtensions.cs b/Hotsapi.Uploader.Common/LockExtensions.cs new file mode 100644 index 0000000..d590ee4 --- /dev/null +++ b/Hotsapi.Uploader.Common/LockExtensions.cs @@ -0,0 +1,77 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + /// + /// Provides extensions for using SemaphoreSlim in a safe way that will always release the semaphore + /// + public static class LockExtensions + { + public static async Task Locked(this SemaphoreSlim semaphore, Func thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + return thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static async Task Locked(this SemaphoreSlim semaphore, Action thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static async Task LockedTask(this SemaphoreSlim semaphore, Func thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + await thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static async Task LockedTask(this SemaphoreSlim semaphore, Func> thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + return await thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static Task Locked(this SemaphoreSlim semaphore, Func thunk) => Locked(semaphore, thunk, CancellationToken.None); + public static Task Locked(this SemaphoreSlim semaphore, Action thunk) => Locked(semaphore, thunk, CancellationToken.None); + public static Task LockedTask(this SemaphoreSlim semaphore, Func> thunk) => LockedTask(semaphore, thunk, CancellationToken.None); + public static Task LockedTask(this SemaphoreSlim semaphore, Func thunk) => LockedTask(semaphore, thunk, CancellationToken.None); + } +} diff --git a/Hotsapi.Uploader.Common/Manager.cs b/Hotsapi.Uploader.Common/Manager.cs index fd80ee9..66c9004 100644 --- a/Hotsapi.Uploader.Common/Manager.cs +++ b/Hotsapi.Uploader.Common/Manager.cs @@ -1,291 +1,292 @@ -using System; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Collections.Specialized; -using System.ComponentModel; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using System.IO; -using System.Threading; -using NLog; -using Nito.AsyncEx; -using System.Diagnostics; -using Heroes.ReplayParser; -using System.Collections.Concurrent; - -namespace Hotsapi.Uploader.Common -{ - public class Manager : INotifyPropertyChanged - { - /// - /// Upload thead count - /// - public readonly int MaxThreads = Environment.ProcessorCount; - public const int MaxUploads = 4; - - /// - /// Replay list - /// - public ObservableCollectionEx Files { get; private set; } = new ObservableCollectionEx(); - - private static Logger _log = LogManager.GetCurrentClassLogger(); - private bool _initialized = false; - private AsyncCollection processingQueue = new AsyncCollection(new ConcurrentStack()); - private readonly IReplayStorage _storage; - private IUploader _uploader; - private IAnalyzer _analyzer; - private IMonitor _monitor; - - public event PropertyChangedEventHandler PropertyChanged; - - private string _status = ""; - /// - /// Current uploader status - /// - public string Status - { - get { - return _status; - } - } - - private Dictionary _aggregates = new Dictionary(); - /// - /// List of aggregate upload stats - /// - public Dictionary Aggregates - { - get { - return _aggregates; - } - } - - /// - /// Whether to mark replays for upload to hotslogs - /// - public bool UploadToHotslogs - { - get { - return _uploader?.UploadToHotslogs ?? false; - } - set { - if (_uploader != null) { - _uploader.UploadToHotslogs = value; - } - } - } - - /// - /// Which replays to delete after upload - /// - public DeleteFiles DeleteAfterUpload { get; set; } - - public Manager(IReplayStorage storage) - { - this._storage = storage; - Files.ItemPropertyChanged += (_, __) => { RefreshStatusAndAggregates(); }; - Files.CollectionChanged += (_, __) => { RefreshStatusAndAggregates(); }; - } - - /// - /// Start uploading and watching for new replays - /// - public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader) - { - if (_initialized) { - return; - } - _initialized = true; - - _uploader = uploader; - _analyzer = analyzer; - _monitor = monitor; - - var replays = ScanReplays(); - Files.AddRange(replays); - replays.Where(x => x.UploadStatus == UploadStatus.None).Reverse().Map(x => processingQueue.Add(x)); - - _monitor.ReplayAdded += async (_, e) => { - await EnsureFileAvailable(e.Data, 3000); - var replay = new ReplayFile(e.Data); - Files.Insert(0, replay); - processingQueue.Add(replay); - }; - _monitor.Start(); - - _analyzer.MinimumBuild = await _uploader.GetMinimumBuild(); - - _ = UploadLoop(); - } - - public void Stop() - { - _monitor.Stop(); - processingQueue.CompleteAdding(); - } - - private async Task UploadLoop() - { - var rateLimitUploading = new SemaphoreSlim(MaxUploads); - using (var rateLimitParsing = new SemaphoreSlim(MaxThreads)) { - while (await processingQueue.OutputAvailableAsync()) { - try { - var file = await processingQueue.TakeAsync(); - Task parsing = null; - try { - await rateLimitParsing.WaitAsync(); - file.UploadStatus = UploadStatus.Preprocessing; - parsing = WorkerPool.RunBackground(() => _analyzer.Analyze(file)); - } - finally { - _ = rateLimitParsing.Release(); - } - - if (parsing != null) { - - var settingUpdate = parsing.ContinueWith((Task trp) => { - if (trp.Status == TaskStatus.RanToCompletion) { - file.UploadStatus = UploadStatus.Preprocessed; - } - return trp.Result; - }); - - //don't await the upload task, but bound it by the upload ratelimiter - _ = Task.Run(async () => { - try { - await rateLimitUploading.WaitAsync(); - var parsed = await settingUpdate; - file.UploadStatus = UploadStatus.Uploading; - await DoFileUpload(file, parsed); - } - finally { - _ = rateLimitUploading.Release(); - } - }); - } - - } - catch (Exception ex) { - _log.Error(ex, "Error in upload loop"); - } - } - } - } - - private async Task DoFileUpload(ReplayFile file, Replay replay) - { - // Analyze will set the upload status as a side-effect when it's unsuitable for uploading - if (file.UploadStatus == UploadStatus.Uploading) { - await _uploader.Upload(file); - } - SaveReplayList(); - if (ShouldDelete(file, replay)) { - DeleteReplay(file); - } - } - - private bool IsProcessingStatus(UploadStatus status) => - status == UploadStatus.Preprocessing || - status == UploadStatus.Preprocessed || - status == UploadStatus.Uploading; - - private void RefreshStatusAndAggregates() - { - _status = Files.Select(x => x.UploadStatus).Any(IsProcessingStatus) ? "Processing..." : "Idle"; - _aggregates = Files.GroupBy(x => x.UploadStatus).ToDictionary(x => x.Key, x => x.Count()); - PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Status))); - PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Aggregates))); - } - - private void SaveReplayList() - { - try { - // save only replays with fixed status. Will retry failed ones on next launch. - var ignored = new[] { UploadStatus.None, UploadStatus.UploadError}; - bool isIgnored(UploadStatus status) => ignored.Contains(status) || IsProcessingStatus(status); - _storage.Save(Files.Where(file => !isIgnored(file.UploadStatus))); - } - catch (Exception ex) { - _log.Error(ex, "Error saving replay list"); - } - } - - /// - /// Load replay cache and merge it with folder scan results - /// - private List ScanReplays() - { - var replays = new List(_storage.Load()); - var lookup = new HashSet(replays); - var comparer = new ReplayFile.ReplayFileComparer(); - replays.AddRange(_monitor.ScanReplays().Select(x => new ReplayFile(x)).Where(x => !lookup.Contains(x, comparer))); - return replays.OrderByDescending(x => x.Created).ToList(); - } - - /// - /// Delete replay file - /// - private static void DeleteReplay(ReplayFile file) - { - try { - _log.Info($"Deleting replay {file}"); - file.Deleted = true; - File.Delete(file.Filename); - } - catch (Exception ex) { - _log.Error(ex, "Error deleting file"); - } - } - - /// - /// Ensure that HotS client finished writing replay file and it can be safely open - /// - /// Filename to test - /// Timeout in milliseconds - /// Whether to test read or write access - public async Task EnsureFileAvailable(string filename, int timeout, bool testWrite = true) - { - var timer = new Stopwatch(); - timer.Start(); - while (timer.ElapsedMilliseconds < timeout) { - try { - if (testWrite) { - File.OpenWrite(filename).Close(); - } else { - File.OpenRead(filename).Close(); - } - return; - } - catch (IOException) { - // File is still in use - await Task.Delay(100); - } - catch { - return; - } - } - } - - /// - /// Decide whether a replay should be deleted according to current settings - /// - /// replay file metadata - /// Parsed replay - private bool ShouldDelete(ReplayFile file, Replay replay) - { - return - DeleteAfterUpload.HasFlag(DeleteFiles.PTR) && file.UploadStatus == UploadStatus.PtrRegion || - DeleteAfterUpload.HasFlag(DeleteFiles.Ai) && file.UploadStatus == UploadStatus.AiDetected || - DeleteAfterUpload.HasFlag(DeleteFiles.Custom) && file.UploadStatus == UploadStatus.CustomGame || - file.UploadStatus == UploadStatus.Success && ( - DeleteAfterUpload.HasFlag(DeleteFiles.Brawl) && replay.GameMode == GameMode.Brawl || - DeleteAfterUpload.HasFlag(DeleteFiles.QuickMatch) && replay.GameMode == GameMode.QuickMatch || - DeleteAfterUpload.HasFlag(DeleteFiles.UnrankedDraft) && replay.GameMode == GameMode.UnrankedDraft || - DeleteAfterUpload.HasFlag(DeleteFiles.HeroLeague) && replay.GameMode == GameMode.HeroLeague || - DeleteAfterUpload.HasFlag(DeleteFiles.TeamLeague) && replay.GameMode == GameMode.TeamLeague || - DeleteAfterUpload.HasFlag(DeleteFiles.StormLeague) && replay.GameMode == GameMode.StormLeague - ); - } - } +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Collections.Specialized; +using System.ComponentModel; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.IO; +using System.Threading; +using NLog; +using Nito.AsyncEx; +using System.Diagnostics; +using Heroes.ReplayParser; +using System.Collections.Concurrent; + +namespace Hotsapi.Uploader.Common +{ + public class Manager : INotifyPropertyChanged + { + /// + /// Upload thead count + /// + public readonly int MaxThreads = Environment.ProcessorCount; + public const int MaxUploads = 4; + + /// + /// Replay list + /// + public ObservableCollectionEx Files { get; private set; } = new ObservableCollectionEx(); + + private static Logger _log = LogManager.GetCurrentClassLogger(); + private bool _initialized = false; + private AsyncCollection processingQueue = new AsyncCollection(new ConcurrentStack()); + private ConcurrentAyncBuffer<(Replay, ReplayFile)> FingerprintingQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); + private ConcurrentAyncBuffer<(Replay, ReplayFile)> UploadQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); + private readonly IReplayStorage _storage; + private IUploader _uploader; + private IAnalyzer _analyzer; + private IMonitor _monitor; + + public event PropertyChangedEventHandler PropertyChanged; + + private string _status = ""; + /// + /// Current uploader status + /// + public string Status + { + get { + return _status; + } + } + + private Dictionary _aggregates = new Dictionary(); + /// + /// List of aggregate upload stats + /// + public Dictionary Aggregates + { + get { + return _aggregates; + } + } + + /// + /// Whether to mark replays for upload to hotslogs + /// + public bool UploadToHotslogs + { + get { + return _uploader?.UploadToHotslogs ?? false; + } + set { + if (_uploader != null) { + _uploader.UploadToHotslogs = value; + } + } + } + + /// + /// Which replays to delete after upload + /// + public DeleteFiles DeleteAfterUpload { get; set; } + + public Manager(IReplayStorage storage) + { + _storage = storage; + Files.ItemPropertyChanged += (_, __) => { RefreshStatusAndAggregates(); }; + Files.CollectionChanged += (_, __) => { RefreshStatusAndAggregates(); }; + } + + /// + /// Start uploading and watching for new replays + /// + public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader) + { + if (_initialized) { + return; + } + _initialized = true; + + _uploader = uploader; + _analyzer = analyzer; + _monitor = monitor; + + var replays = ScanReplays(); + Files.AddRange(replays); + replays.Where(x => x.UploadStatus == UploadStatus.None).Reverse().Map(x => processingQueue.Add(x)); + + _monitor.ReplayAdded += async (_, e) => { + await EnsureFileAvailable(e.Data, 3000); + var replay = new ReplayFile(e.Data); + Files.Insert(0, replay); + processingQueue.Add(replay); + }; + _monitor.Start(); + + _analyzer.MinimumBuild = await _uploader.GetMinimumBuild(); + + _ = Task.Run(() => ParseLoop()); + _ = Task.Run(() => FingerprintLoop()); + _ = Task.Run(() => UploadLoop()); + } + + public void Stop() + { + _monitor.Stop(); + processingQueue.CompleteAdding(); + } + + private async Task ParseLoop() { + using (var rateLimitParsing = new SemaphoreSlim(MaxThreads)) { + while (await processingQueue.OutputAvailableAsync()) { + try { + var file = await processingQueue.TakeAsync(); + await rateLimitParsing.Locked(() => { + _ = WorkerPool.RunBackground(async () => { + var replay = _analyzer.Analyze(file); + if (replay != null && file.UploadStatus == UploadStatus.Preprocessed) { + await FingerprintingQueue.EnqueueAsync((replay, file)); + } + }); + }); + } + catch (Exception ex) { + _log.Error(ex, "Error in parse loop"); + } + } + } + } + private async Task FingerprintLoop() { + while (true) { + var UnFingerprinted = await FingerprintingQueue.DequeueAsync(); + var eligible = UnFingerprinted.Where(pair => pair.Item2.UploadStatus == UploadStatus.Preprocessed).ToList(); + await _uploader.CheckDuplicate(eligible.Select(pair => pair.Item2)); + await UploadQueue.EnqueueManyAsync(eligible.Where(pair => pair.Item2.UploadStatus == UploadStatus.ReadyForUpload)); + } + } + private async Task UploadLoop() { + using (var rateLimitUploading = new SemaphoreSlim(MaxUploads)){ + while (true) { + var parsed = await UploadQueue.DequeueAsync(); + foreach (var (replay, replayfile) in parsed) { + if (replayfile.UploadStatus == UploadStatus.ReadyForUpload) { + //don't await the upload task, but bound it by the upload ratelimiter + _ = rateLimitUploading.Locked(async () => + await DoFileUpload(replayfile, replay)); + } + } + } + } + } + + + private async Task DoFileUpload(ReplayFile file, Replay replay) + { + // Analyze will set the upload status as a side-effect when it's unsuitable for uploading + if (file.UploadStatus == UploadStatus.ReadyForUpload) { + await _uploader.Upload(file); + } + SaveReplayList(); + if (ShouldDelete(file, replay)) { + DeleteReplay(file); + } + } + + private bool IsProcessingStatus(UploadStatus status) => + status == UploadStatus.Preprocessing || + status == UploadStatus.Preprocessed || + status == UploadStatus.ReadyForUpload || + status == UploadStatus.Uploading; + + private void RefreshStatusAndAggregates() + { + _status = Files.Select(x => x.UploadStatus).Any(IsProcessingStatus) ? "Processing..." : "Idle"; + _aggregates = Files.GroupBy(x => x.UploadStatus).ToDictionary(x => x.Key, x => x.Count()); + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Status))); + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Aggregates))); + } + + private void SaveReplayList() + { + try { + // save only replays with fixed status. Will retry failed ones on next launch. + var ignored = new[] { UploadStatus.None, UploadStatus.UploadError}; + bool isIgnored(UploadStatus status) => ignored.Contains(status) || IsProcessingStatus(status); + _storage.Save(Files.Where(file => !isIgnored(file.UploadStatus))); + } + catch (Exception ex) { + _log.Error(ex, "Error saving replay list"); + } + } + + /// + /// Load replay cache and merge it with folder scan results + /// + private List ScanReplays() + { + var replays = new List(_storage.Load()); + var lookup = new HashSet(replays); + var comparer = new ReplayFile.ReplayFileComparer(); + replays.AddRange(_monitor.ScanReplays().Select(x => new ReplayFile(x)).Where(x => !lookup.Contains(x, comparer))); + return replays.OrderByDescending(x => x.Created).ToList(); + } + + /// + /// Delete replay file + /// + private static void DeleteReplay(ReplayFile file) + { + try { + _log.Info($"Deleting replay {file}"); + file.Deleted = true; + File.Delete(file.Filename); + } + catch (Exception ex) { + _log.Error(ex, "Error deleting file"); + } + } + + /// + /// Ensure that HotS client finished writing replay file and it can be safely open + /// + /// Filename to test + /// Timeout in milliseconds + /// Whether to test read or write access + public async Task EnsureFileAvailable(string filename, int timeout, bool testWrite = true) + { + var timer = new Stopwatch(); + timer.Start(); + while (timer.ElapsedMilliseconds < timeout) { + try { + if (testWrite) { + File.OpenWrite(filename).Close(); + } else { + File.OpenRead(filename).Close(); + } + return; + } + catch (IOException) { + // File is still in use + await Task.Delay(100); + } + catch { + return; + } + } + } + + /// + /// Decide whether a replay should be deleted according to current settings + /// + /// replay file metadata + /// Parsed replay + private bool ShouldDelete(ReplayFile file, Replay replay) + { + return + DeleteAfterUpload.HasFlag(DeleteFiles.PTR) && file.UploadStatus == UploadStatus.PtrRegion || + DeleteAfterUpload.HasFlag(DeleteFiles.Ai) && file.UploadStatus == UploadStatus.AiDetected || + DeleteAfterUpload.HasFlag(DeleteFiles.Custom) && file.UploadStatus == UploadStatus.CustomGame || + file.UploadStatus == UploadStatus.Success && ( + DeleteAfterUpload.HasFlag(DeleteFiles.Brawl) && replay.GameMode == GameMode.Brawl || + DeleteAfterUpload.HasFlag(DeleteFiles.QuickMatch) && replay.GameMode == GameMode.QuickMatch || + DeleteAfterUpload.HasFlag(DeleteFiles.UnrankedDraft) && replay.GameMode == GameMode.UnrankedDraft || + DeleteAfterUpload.HasFlag(DeleteFiles.HeroLeague) && replay.GameMode == GameMode.HeroLeague || + DeleteAfterUpload.HasFlag(DeleteFiles.TeamLeague) && replay.GameMode == GameMode.TeamLeague || + DeleteAfterUpload.HasFlag(DeleteFiles.StormLeague) && replay.GameMode == GameMode.StormLeague + ); + } + } } \ No newline at end of file diff --git a/Hotsapi.Uploader.Common/UploadStatus.cs b/Hotsapi.Uploader.Common/UploadStatus.cs index 3059f17..bfe4d25 100644 --- a/Hotsapi.Uploader.Common/UploadStatus.cs +++ b/Hotsapi.Uploader.Common/UploadStatus.cs @@ -8,15 +8,18 @@ public enum UploadStatus { None, Success, - Preprocessed, Preprocessing, + Preprocessed, + ReadyForUpload, Uploading, UploadError, + CheckingDuplicates, Duplicate, AiDetected, CustomGame, PtrRegion, Incomplete, - TooOld, + TooOld, + } } diff --git a/Hotsapi.Uploader.Common/Uploader.cs b/Hotsapi.Uploader.Common/Uploader.cs index 500dc4a..c98edbd 100644 --- a/Hotsapi.Uploader.Common/Uploader.cs +++ b/Hotsapi.Uploader.Common/Uploader.cs @@ -1,9 +1,11 @@ -using Newtonsoft.Json.Linq; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; using NLog; using System; using System.Collections.Generic; using System.Linq; using System.Net; +using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -35,8 +37,9 @@ public Uploader() /// public async Task Upload(ReplayFile file) { + var checkDuplicate = file.UploadStatus != UploadStatus.ReadyForUpload; file.UploadStatus = UploadStatus.Uploading; - if (file.Fingerprint != null && await CheckDuplicate(file.Fingerprint)) { + if (file.Fingerprint != null && !checkDuplicate || await CheckDuplicate(file.Fingerprint)) { _log.Debug($"File {file} marked as duplicate"); file.UploadStatus = UploadStatus.Duplicate; } else { @@ -131,8 +134,13 @@ private async Task CheckDuplicate(IEnumerable fingerprints) /// public async Task CheckDuplicate(IEnumerable replays) { + foreach(var replay in replays) { + replay.UploadStatus = UploadStatus.CheckingDuplicates; + } var exists = new HashSet(await CheckDuplicate(replays.Select(x => x.Fingerprint))); - replays.Where(x => exists.Contains(x.Fingerprint)).Map(x => x.UploadStatus = UploadStatus.Duplicate); + foreach(var replay in replays) { + replay.UploadStatus = exists.Contains(replay.Fingerprint) ? UploadStatus.Duplicate : UploadStatus.ReadyForUpload; + } } /// diff --git a/Hotsapi.Uploader.Windows/App.config b/Hotsapi.Uploader.Windows/App.config index 82d5ea4..d9b9904 100644 --- a/Hotsapi.Uploader.Windows/App.config +++ b/Hotsapi.Uploader.Windows/App.config @@ -33,7 +33,7 @@ 600 - 700 + 800 False From 13a46b4beb18bdfd98edfc43f6c0927e518631d8 Mon Sep 17 00:00:00 2001 From: Martijn Hoekstra Date: Wed, 4 Dec 2019 21:15:43 +0100 Subject: [PATCH 3/7] fix duplicate check logic --- Hotsapi.Uploader.Common/Uploader.cs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Hotsapi.Uploader.Common/Uploader.cs b/Hotsapi.Uploader.Common/Uploader.cs index c98edbd..ab14dc8 100644 --- a/Hotsapi.Uploader.Common/Uploader.cs +++ b/Hotsapi.Uploader.Common/Uploader.cs @@ -1,11 +1,11 @@ -using Newtonsoft.Json; +using Newtonsoft.Json; using Newtonsoft.Json.Linq; using NLog; using System; using System.Collections.Generic; using System.Linq; using System.Net; -using System.Net.Http; +using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -34,12 +34,12 @@ public Uploader() /// /// Upload replay /// - /// + /// The file to upload public async Task Upload(ReplayFile file) { - var checkDuplicate = file.UploadStatus != UploadStatus.ReadyForUpload; + var doDuplicateCheck = file.UploadStatus != UploadStatus.ReadyForUpload; file.UploadStatus = UploadStatus.Uploading; - if (file.Fingerprint != null && !checkDuplicate || await CheckDuplicate(file.Fingerprint)) { + if (file.Fingerprint != null && doDuplicateCheck && await CheckDuplicate(file.Fingerprint)) { _log.Debug($"File {file} marked as duplicate"); file.UploadStatus = UploadStatus.Duplicate; } else { @@ -134,12 +134,12 @@ private async Task CheckDuplicate(IEnumerable fingerprints) /// public async Task CheckDuplicate(IEnumerable replays) { - foreach(var replay in replays) { - replay.UploadStatus = UploadStatus.CheckingDuplicates; + foreach(var replay in replays) { + replay.UploadStatus = UploadStatus.CheckingDuplicates; } var exists = new HashSet(await CheckDuplicate(replays.Select(x => x.Fingerprint))); - foreach(var replay in replays) { - replay.UploadStatus = exists.Contains(replay.Fingerprint) ? UploadStatus.Duplicate : UploadStatus.ReadyForUpload; + foreach(var replay in replays) { + replay.UploadStatus = exists.Contains(replay.Fingerprint) ? UploadStatus.Duplicate : UploadStatus.ReadyForUpload; } } From f95fbb138416644cb089c71f3288af462afd39fd Mon Sep 17 00:00:00 2001 From: Martijn Hoekstra Date: Wed, 4 Dec 2019 21:22:37 +0100 Subject: [PATCH 4/7] run background pool without semaphore protection --- Hotsapi.Uploader.Common/Manager.cs | 37 ++++++++++++++++-------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/Hotsapi.Uploader.Common/Manager.cs b/Hotsapi.Uploader.Common/Manager.cs index 66c9004..d65b00d 100644 --- a/Hotsapi.Uploader.Common/Manager.cs +++ b/Hotsapi.Uploader.Common/Manager.cs @@ -129,26 +129,29 @@ public void Stop() processingQueue.CompleteAdding(); } - private async Task ParseLoop() { - using (var rateLimitParsing = new SemaphoreSlim(MaxThreads)) { - while (await processingQueue.OutputAvailableAsync()) { - try { - var file = await processingQueue.TakeAsync(); - await rateLimitParsing.Locked(() => { - _ = WorkerPool.RunBackground(async () => { - var replay = _analyzer.Analyze(file); - if (replay != null && file.UploadStatus == UploadStatus.Preprocessed) { - await FingerprintingQueue.EnqueueAsync((replay, file)); - } - }); - }); - } - catch (Exception ex) { - _log.Error(ex, "Error in parse loop"); - } + private async Task ParseLoop() + { + //OutputAvailableAsync will keep returning true + //untill all data is processed and processQueue.CompleteAdding is called + while (await processingQueue.OutputAvailableAsync()) { + try { + var file = await processingQueue.TakeAsync(); + //don't wait for completion of background pool task. + //it's internally limited to a fixed number of low-priority threads + //so we can throw as much work on there as we want without choking it + _ = WorkerPool.RunBackground(async () => { + var replay = _analyzer.Analyze(file); + if (replay != null && file.UploadStatus == UploadStatus.Preprocessed) { + await FingerprintingQueue.EnqueueAsync((replay, file)); + } + }); + } + catch (Exception ex) { + _log.Error(ex, "Error in parse loop"); } } } + private async Task FingerprintLoop() { while (true) { var UnFingerprinted = await FingerprintingQueue.DequeueAsync(); From 75c9e2f3ffde7cf3216861bb419ed2995becdd09 Mon Sep 17 00:00:00 2001 From: Martijn Hoekstra Date: Sun, 8 Dec 2019 17:15:37 +0100 Subject: [PATCH 5/7] fix mocks to set success status --- Hotsapi.Uploader.Common.Test/ManagerTests.cs | 7 ++++--- Hotsapi.Uploader.Common.Test/MockAnalizer.cs | 5 ++++- Hotsapi.Uploader.Common.Test/MockUploader.cs | 8 +++++++- Hotsapi.Uploader.Common/Manager.cs | 4 +++- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/Hotsapi.Uploader.Common.Test/ManagerTests.cs b/Hotsapi.Uploader.Common.Test/ManagerTests.cs index ff33583..6c855db 100644 --- a/Hotsapi.Uploader.Common.Test/ManagerTests.cs +++ b/Hotsapi.Uploader.Common.Test/ManagerTests.cs @@ -8,7 +8,7 @@ namespace Hotsapi.Uploader.Common.Test [TestClass] public partial class ManagerTests { - private Task ShortRandomDelay() + public static Task ShortRandomDelay() { var r = new Random(); var delay = r.Next(100, 200); @@ -132,8 +132,9 @@ public async Task AllInitialFilesProcessed() }); manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); - var finished = await Task.WhenAny(Task.Delay(4000), done.Task); - await finished; + var num = await done.Task; + //var finished = await Task.WhenAny(Task.Delay(4000), done.Task); + //await finished; Assert.AreEqual(3, uploadsSeen); } } diff --git a/Hotsapi.Uploader.Common.Test/MockAnalizer.cs b/Hotsapi.Uploader.Common.Test/MockAnalizer.cs index 318a4df..d529033 100644 --- a/Hotsapi.Uploader.Common.Test/MockAnalizer.cs +++ b/Hotsapi.Uploader.Common.Test/MockAnalizer.cs @@ -7,7 +7,10 @@ public partial class ManagerTests private class MockAnalizer : IAnalyzer { public int MinimumBuild { get; set; } - public Replay Analyze(ReplayFile file) => new Replay(); + public Replay Analyze(ReplayFile file) { + file.UploadStatus = UploadStatus.Preprocessed; + return new Replay(); + } public string GetFingerprint(Replay replay) => "dummy fingerprint"; } } diff --git a/Hotsapi.Uploader.Common.Test/MockUploader.cs b/Hotsapi.Uploader.Common.Test/MockUploader.cs index 332e063..bda73f0 100644 --- a/Hotsapi.Uploader.Common.Test/MockUploader.cs +++ b/Hotsapi.Uploader.Common.Test/MockUploader.cs @@ -21,7 +21,13 @@ public void SetUploadCallback(Func onUpload) }; } - public Task CheckDuplicate(IEnumerable replays) => Task.CompletedTask; + public async Task CheckDuplicate(IEnumerable replays) + { + foreach (var replay in replays) { + replay.UploadStatus = UploadStatus.ReadyForUpload; + } + await ShortRandomDelay(); //todo: put this elsewhere + } public Task GetMinimumBuild() => Task.FromResult(1); public Task Upload(ReplayFile file) { diff --git a/Hotsapi.Uploader.Common/Manager.cs b/Hotsapi.Uploader.Common/Manager.cs index d65b00d..6d07450 100644 --- a/Hotsapi.Uploader.Common/Manager.cs +++ b/Hotsapi.Uploader.Common/Manager.cs @@ -106,7 +106,9 @@ public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader var replays = ScanReplays(); Files.AddRange(replays); - replays.Where(x => x.UploadStatus == UploadStatus.None).Reverse().Map(x => processingQueue.Add(x)); + replays.Where(x => x.UploadStatus == UploadStatus.None) + .Reverse() + .Map(processingQueue.Add); _monitor.ReplayAdded += async (_, e) => { await EnsureFileAvailable(e.Data, 3000); From 9ab3b36fb62acaa6ffbe1754c6d2dd62b325dc77 Mon Sep 17 00:00:00 2001 From: Martijn Hoekstra Date: Mon, 9 Dec 2019 15:07:30 +0100 Subject: [PATCH 6/7] uploads end in-order --- Hotsapi.Uploader.Common.Test/ManagerTests.cs | 88 +-- Hotsapi.Uploader.Common.Test/MockAnalizer.cs | 4 +- Hotsapi.Uploader.Common.Test/MockUploader.cs | 26 +- Hotsapi.Uploader.Common/IUploader.cs | 4 +- Hotsapi.Uploader.Common/Manager.cs | 628 ++++++++++--------- Hotsapi.Uploader.Common/Uploader.cs | 93 ++- 6 files changed, 444 insertions(+), 399 deletions(-) diff --git a/Hotsapi.Uploader.Common.Test/ManagerTests.cs b/Hotsapi.Uploader.Common.Test/ManagerTests.cs index 6c855db..89a21e5 100644 --- a/Hotsapi.Uploader.Common.Test/ManagerTests.cs +++ b/Hotsapi.Uploader.Common.Test/ManagerTests.cs @@ -1,6 +1,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; namespace Hotsapi.Uploader.Common.Test @@ -14,68 +15,27 @@ public static Task ShortRandomDelay() var delay = r.Next(100, 200); return Task.Delay(delay); } - private static IEnumerable ThreeInOrder + private static IEnumerable FilesInOrder { get { - var one = new ReplayFile("one") { - Created = new DateTime(2020, 1, 1, 0, 0, 1) - }; - var two = new ReplayFile("two") { - Created = new DateTime(2020, 1, 1, 0, 0, 10) - }; - var three = new ReplayFile("three") { - Created = new DateTime(2020, 1, 1, 0, 0, 20) - }; - var initialFiles = new List() { one, two, three }; - return initialFiles; + var next = new DateTime(2020, 1, 1, 0, 0, 0); + var increment = new TimeSpan(0, 0, 1); + var rand = new Random(); + var nums = Enumerable.Range(1, 24).OrderBy(rf => rand.NextDouble()); + + return nums.Select(i => { + next += increment; + return new ReplayFile($"upload_{i}") { + Created = next + }; + }); + } } - - [TestMethod] - [Ignore("Known intermittant failure: multiple uploads are started in parallel and don't always start in order")] - public async Task InitialFilesStartInOrder() - { - var initialFiles = ThreeInOrder; - - var manager = new Manager(new MockStorage(initialFiles)); - var uploadTester = new MockUploader(); - - var promise = new TaskCompletionSource(); - Task done = promise.Task; - - var uploadsSeen = 0; - var l = new object(); - ReplayFile lastUploadStarted = null; - uploadTester.SetUploadCallback(async rf => { - if (lastUploadStarted != null) { - try { - Assert.IsTrue(rf.Created >= lastUploadStarted.Created, $"upload started out of order, {lastUploadStarted} started after {rf}"); - } catch (Exception e) { - promise.TrySetException(e); - } - } - lastUploadStarted = rf; - await ShortRandomDelay(); - var isDone = false; - lock (l) { - uploadsSeen++; - isDone = uploadsSeen >= 3; - } - if (isDone) { - promise.TrySetResult(uploadsSeen); - } - }); - - manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); - await done; - } - - [TestMethod] - [Ignore("Known intermittant failure: multiple uploads are started in parallel and don't always end in order")] public async Task InitialFilesEndInorder() { - var initialFiles = ThreeInOrder; + var initialFiles = FilesInOrder; var manager = new Manager(new MockStorage(initialFiles)); var uploadTester = new MockUploader(); @@ -85,11 +45,11 @@ public async Task InitialFilesEndInorder() { var uploadsSeen = 0; var l = new object(); ReplayFile lastUploadFinished = null; - uploadTester.SetUploadCallback(async rf => { - await ShortRandomDelay(); + uploadTester.UploadFinished = async rf => { if (lastUploadFinished != null) { try { - Assert.IsTrue(rf.Created >= lastUploadFinished.Created, $"upload completed out of order, {lastUploadFinished} completed after {rf}"); + var isInOrder = rf.Created >= lastUploadFinished.Created; + Assert.IsTrue(isInOrder, $"upload completed out of order, {rf} completed after {lastUploadFinished}"); } catch (Exception e) { promise.TrySetException(e); @@ -104,7 +64,7 @@ public async Task InitialFilesEndInorder() { if (isDone) { promise.TrySetResult(uploadsSeen); } - }); + }; manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); await done; @@ -113,7 +73,7 @@ public async Task InitialFilesEndInorder() { [TestMethod] public async Task AllInitialFilesProcessed() { - var initialFiles = ThreeInOrder; + var initialFiles = FilesInOrder; var manager = new Manager(new MockStorage(initialFiles)); var uploadTester = new MockUploader(); @@ -121,7 +81,7 @@ public async Task AllInitialFilesProcessed() var uploadsSeen = 0; object l = new object(); - uploadTester.SetUploadCallback(async rf => { + uploadTester.UploadFinished = async rf => { await ShortRandomDelay(); lock (l) { uploadsSeen++; @@ -129,13 +89,11 @@ public async Task AllInitialFilesProcessed() done.SetResult(uploadsSeen); } } - }); + }; manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); var num = await done.Task; - //var finished = await Task.WhenAny(Task.Delay(4000), done.Task); - //await finished; - Assert.AreEqual(3, uploadsSeen); + Assert.AreEqual(3, num); } } } diff --git a/Hotsapi.Uploader.Common.Test/MockAnalizer.cs b/Hotsapi.Uploader.Common.Test/MockAnalizer.cs index d529033..c7e2b7c 100644 --- a/Hotsapi.Uploader.Common.Test/MockAnalizer.cs +++ b/Hotsapi.Uploader.Common.Test/MockAnalizer.cs @@ -9,7 +9,9 @@ private class MockAnalizer : IAnalyzer public int MinimumBuild { get; set; } public Replay Analyze(ReplayFile file) { file.UploadStatus = UploadStatus.Preprocessed; - return new Replay(); + return new Replay() { + Timestamp = file.Created + }; } public string GetFingerprint(Replay replay) => "dummy fingerprint"; } diff --git a/Hotsapi.Uploader.Common.Test/MockUploader.cs b/Hotsapi.Uploader.Common.Test/MockUploader.cs index bda73f0..c4eed15 100644 --- a/Hotsapi.Uploader.Common.Test/MockUploader.cs +++ b/Hotsapi.Uploader.Common.Test/MockUploader.cs @@ -10,33 +10,27 @@ private class MockUploader : IUploader { public bool UploadToHotslogs { get; set; } - private Func UploadCallback = _ => Task.CompletedTask; - public void SetUploadCallback(Func onUpload) - { - var old = UploadCallback; - - UploadCallback = async (ReplayFile file) => { - await old(file); - await onUpload(file); - }; - } + public Func UploadStarted { get; set; } = _ => Task.CompletedTask; + public Func UploadFinished { get; set; } = _ => Task.CompletedTask; public async Task CheckDuplicate(IEnumerable replays) { foreach (var replay in replays) { replay.UploadStatus = UploadStatus.ReadyForUpload; } - await ShortRandomDelay(); //todo: put this elsewhere + await ShortRandomDelay(); } public Task GetMinimumBuild() => Task.FromResult(1); - public Task Upload(ReplayFile file) + public async Task Upload(ReplayFile file, Task mayComplete) { - UploadCallback(file); - return Task.CompletedTask; + await UploadStarted(file); + await Upload(file.Filename, mayComplete); + await UploadFinished(file); } - public async Task Upload(string file) + public async Task Upload(string file, Task mayComplete) { - await Task.Delay(100); + await ShortRandomDelay(); + await mayComplete; return UploadStatus.Success; } } diff --git a/Hotsapi.Uploader.Common/IUploader.cs b/Hotsapi.Uploader.Common/IUploader.cs index b731770..ca16573 100644 --- a/Hotsapi.Uploader.Common/IUploader.cs +++ b/Hotsapi.Uploader.Common/IUploader.cs @@ -8,7 +8,7 @@ public interface IUploader bool UploadToHotslogs { get; set; } Task CheckDuplicate(IEnumerable replays); Task GetMinimumBuild(); - Task Upload(ReplayFile file); - Task Upload(string file); + Task Upload(ReplayFile file, Task mayComplete); + Task Upload(string file, Task mayComplete); } } \ No newline at end of file diff --git a/Hotsapi.Uploader.Common/Manager.cs b/Hotsapi.Uploader.Common/Manager.cs index 6d07450..9f27ac0 100644 --- a/Hotsapi.Uploader.Common/Manager.cs +++ b/Hotsapi.Uploader.Common/Manager.cs @@ -1,297 +1,333 @@ -using System; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Collections.Specialized; -using System.ComponentModel; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using System.IO; -using System.Threading; -using NLog; -using Nito.AsyncEx; -using System.Diagnostics; -using Heroes.ReplayParser; -using System.Collections.Concurrent; - -namespace Hotsapi.Uploader.Common -{ - public class Manager : INotifyPropertyChanged - { - /// - /// Upload thead count - /// - public readonly int MaxThreads = Environment.ProcessorCount; - public const int MaxUploads = 4; - - /// - /// Replay list - /// - public ObservableCollectionEx Files { get; private set; } = new ObservableCollectionEx(); - - private static Logger _log = LogManager.GetCurrentClassLogger(); - private bool _initialized = false; - private AsyncCollection processingQueue = new AsyncCollection(new ConcurrentStack()); - private ConcurrentAyncBuffer<(Replay, ReplayFile)> FingerprintingQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); - private ConcurrentAyncBuffer<(Replay, ReplayFile)> UploadQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); - private readonly IReplayStorage _storage; - private IUploader _uploader; - private IAnalyzer _analyzer; - private IMonitor _monitor; - - public event PropertyChangedEventHandler PropertyChanged; - - private string _status = ""; - /// - /// Current uploader status - /// - public string Status - { - get { - return _status; - } - } - - private Dictionary _aggregates = new Dictionary(); - /// - /// List of aggregate upload stats - /// - public Dictionary Aggregates - { - get { - return _aggregates; - } - } - - /// - /// Whether to mark replays for upload to hotslogs - /// - public bool UploadToHotslogs - { - get { - return _uploader?.UploadToHotslogs ?? false; - } - set { - if (_uploader != null) { - _uploader.UploadToHotslogs = value; - } - } - } - - /// - /// Which replays to delete after upload - /// - public DeleteFiles DeleteAfterUpload { get; set; } - - public Manager(IReplayStorage storage) - { - _storage = storage; - Files.ItemPropertyChanged += (_, __) => { RefreshStatusAndAggregates(); }; - Files.CollectionChanged += (_, __) => { RefreshStatusAndAggregates(); }; - } - - /// - /// Start uploading and watching for new replays - /// - public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader) - { - if (_initialized) { - return; - } - _initialized = true; - - _uploader = uploader; - _analyzer = analyzer; - _monitor = monitor; - - var replays = ScanReplays(); - Files.AddRange(replays); - replays.Where(x => x.UploadStatus == UploadStatus.None) - .Reverse() - .Map(processingQueue.Add); - - _monitor.ReplayAdded += async (_, e) => { - await EnsureFileAvailable(e.Data, 3000); - var replay = new ReplayFile(e.Data); - Files.Insert(0, replay); - processingQueue.Add(replay); - }; - _monitor.Start(); - - _analyzer.MinimumBuild = await _uploader.GetMinimumBuild(); - - _ = Task.Run(() => ParseLoop()); - _ = Task.Run(() => FingerprintLoop()); - _ = Task.Run(() => UploadLoop()); - } - - public void Stop() - { - _monitor.Stop(); - processingQueue.CompleteAdding(); - } - - private async Task ParseLoop() - { - //OutputAvailableAsync will keep returning true - //untill all data is processed and processQueue.CompleteAdding is called - while (await processingQueue.OutputAvailableAsync()) { - try { - var file = await processingQueue.TakeAsync(); - //don't wait for completion of background pool task. - //it's internally limited to a fixed number of low-priority threads - //so we can throw as much work on there as we want without choking it - _ = WorkerPool.RunBackground(async () => { - var replay = _analyzer.Analyze(file); - if (replay != null && file.UploadStatus == UploadStatus.Preprocessed) { - await FingerprintingQueue.EnqueueAsync((replay, file)); - } - }); - } - catch (Exception ex) { - _log.Error(ex, "Error in parse loop"); - } - } - } - - private async Task FingerprintLoop() { - while (true) { - var UnFingerprinted = await FingerprintingQueue.DequeueAsync(); - var eligible = UnFingerprinted.Where(pair => pair.Item2.UploadStatus == UploadStatus.Preprocessed).ToList(); - await _uploader.CheckDuplicate(eligible.Select(pair => pair.Item2)); - await UploadQueue.EnqueueManyAsync(eligible.Where(pair => pair.Item2.UploadStatus == UploadStatus.ReadyForUpload)); - } - } - private async Task UploadLoop() { - using (var rateLimitUploading = new SemaphoreSlim(MaxUploads)){ - while (true) { - var parsed = await UploadQueue.DequeueAsync(); - foreach (var (replay, replayfile) in parsed) { - if (replayfile.UploadStatus == UploadStatus.ReadyForUpload) { - //don't await the upload task, but bound it by the upload ratelimiter - _ = rateLimitUploading.Locked(async () => - await DoFileUpload(replayfile, replay)); - } - } - } - } - } - - - private async Task DoFileUpload(ReplayFile file, Replay replay) - { - // Analyze will set the upload status as a side-effect when it's unsuitable for uploading - if (file.UploadStatus == UploadStatus.ReadyForUpload) { - await _uploader.Upload(file); - } - SaveReplayList(); - if (ShouldDelete(file, replay)) { - DeleteReplay(file); - } - } - - private bool IsProcessingStatus(UploadStatus status) => - status == UploadStatus.Preprocessing || - status == UploadStatus.Preprocessed || - status == UploadStatus.ReadyForUpload || - status == UploadStatus.Uploading; - - private void RefreshStatusAndAggregates() - { - _status = Files.Select(x => x.UploadStatus).Any(IsProcessingStatus) ? "Processing..." : "Idle"; - _aggregates = Files.GroupBy(x => x.UploadStatus).ToDictionary(x => x.Key, x => x.Count()); - PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Status))); - PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Aggregates))); - } - - private void SaveReplayList() - { - try { - // save only replays with fixed status. Will retry failed ones on next launch. - var ignored = new[] { UploadStatus.None, UploadStatus.UploadError}; - bool isIgnored(UploadStatus status) => ignored.Contains(status) || IsProcessingStatus(status); - _storage.Save(Files.Where(file => !isIgnored(file.UploadStatus))); - } - catch (Exception ex) { - _log.Error(ex, "Error saving replay list"); - } - } - - /// - /// Load replay cache and merge it with folder scan results - /// - private List ScanReplays() - { - var replays = new List(_storage.Load()); - var lookup = new HashSet(replays); - var comparer = new ReplayFile.ReplayFileComparer(); - replays.AddRange(_monitor.ScanReplays().Select(x => new ReplayFile(x)).Where(x => !lookup.Contains(x, comparer))); - return replays.OrderByDescending(x => x.Created).ToList(); - } - - /// - /// Delete replay file - /// - private static void DeleteReplay(ReplayFile file) - { - try { - _log.Info($"Deleting replay {file}"); - file.Deleted = true; - File.Delete(file.Filename); - } - catch (Exception ex) { - _log.Error(ex, "Error deleting file"); - } - } - - /// - /// Ensure that HotS client finished writing replay file and it can be safely open - /// - /// Filename to test - /// Timeout in milliseconds - /// Whether to test read or write access - public async Task EnsureFileAvailable(string filename, int timeout, bool testWrite = true) - { - var timer = new Stopwatch(); - timer.Start(); - while (timer.ElapsedMilliseconds < timeout) { - try { - if (testWrite) { - File.OpenWrite(filename).Close(); - } else { - File.OpenRead(filename).Close(); - } - return; - } - catch (IOException) { - // File is still in use - await Task.Delay(100); - } - catch { - return; - } - } - } - - /// - /// Decide whether a replay should be deleted according to current settings - /// - /// replay file metadata - /// Parsed replay - private bool ShouldDelete(ReplayFile file, Replay replay) - { - return - DeleteAfterUpload.HasFlag(DeleteFiles.PTR) && file.UploadStatus == UploadStatus.PtrRegion || - DeleteAfterUpload.HasFlag(DeleteFiles.Ai) && file.UploadStatus == UploadStatus.AiDetected || - DeleteAfterUpload.HasFlag(DeleteFiles.Custom) && file.UploadStatus == UploadStatus.CustomGame || - file.UploadStatus == UploadStatus.Success && ( - DeleteAfterUpload.HasFlag(DeleteFiles.Brawl) && replay.GameMode == GameMode.Brawl || - DeleteAfterUpload.HasFlag(DeleteFiles.QuickMatch) && replay.GameMode == GameMode.QuickMatch || - DeleteAfterUpload.HasFlag(DeleteFiles.UnrankedDraft) && replay.GameMode == GameMode.UnrankedDraft || - DeleteAfterUpload.HasFlag(DeleteFiles.HeroLeague) && replay.GameMode == GameMode.HeroLeague || - DeleteAfterUpload.HasFlag(DeleteFiles.TeamLeague) && replay.GameMode == GameMode.TeamLeague || - DeleteAfterUpload.HasFlag(DeleteFiles.StormLeague) && replay.GameMode == GameMode.StormLeague - ); - } - } +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Collections.Specialized; +using System.ComponentModel; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.IO; +using System.Threading; +using NLog; +using Nito.AsyncEx; +using System.Diagnostics; +using Heroes.ReplayParser; +using System.Collections.Concurrent; + +namespace Hotsapi.Uploader.Common +{ + public class Manager : INotifyPropertyChanged + { + /// + /// Maximum number of simultaneous uploads in progress + /// + public const int MaxUploads = 4; + + /// + /// Replay list + /// + public ObservableCollectionEx Files { get; private set; } = new ObservableCollectionEx(); + + private static Logger _log = LogManager.GetCurrentClassLogger(); + private bool _initialized = false; + private AsyncCollection processingQueue = new AsyncCollection(new ConcurrentQueue()); + private ConcurrentAyncBuffer<(Replay, ReplayFile)> FingerprintingQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); + private ConcurrentAyncBuffer<(Replay, ReplayFile)> UploadQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); + private readonly IReplayStorage _storage; + private IUploader _uploader; + private IAnalyzer _analyzer; + private IMonitor _monitor; + + public event PropertyChangedEventHandler PropertyChanged; + + private string _status = ""; + /// + /// Current uploader status + /// + public string Status + { + get { + return _status; + } + } + + private Dictionary _aggregates = new Dictionary(); + /// + /// List of aggregate upload stats + /// + public Dictionary Aggregates + { + get { + return _aggregates; + } + } + + /// + /// Whether to mark replays for upload to hotslogs + /// + public bool UploadToHotslogs + { + get { + return _uploader?.UploadToHotslogs ?? false; + } + set { + if (_uploader != null) { + _uploader.UploadToHotslogs = value; + } + } + } + + /// + /// Which replays to delete after upload + /// + public DeleteFiles DeleteAfterUpload { get; set; } + + public Manager(IReplayStorage storage) + { + _storage = storage; + Files.ItemPropertyChanged += (_, __) => { RefreshStatusAndAggregates(); }; + Files.CollectionChanged += (_, __) => { RefreshStatusAndAggregates(); }; + } + + /// + /// Start uploading and watching for new replays + /// + public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader) + { + if (_initialized) { + return; + } + _initialized = true; + + _uploader = uploader; + _analyzer = analyzer; + _monitor = monitor; + + var replays = ScanReplays(); + Files.AddRange(replays.Reverse()); + replays.Where(x => x.UploadStatus == UploadStatus.None) + .Map(processingQueue.Add); + + _monitor.ReplayAdded += async (_, e) => { + await EnsureFileAvailable(e.Data, 3000); + var replay = new ReplayFile(e.Data); + Files.Insert(0, replay); + processingQueue.Add(replay); + }; + _monitor.Start(); + + _analyzer.MinimumBuild = await _uploader.GetMinimumBuild(); + + _ = Task.Run(() => ParseLoop()); + _ = Task.Run(() => FingerprintLoop()); + _ = Task.Run(() => UploadLoop()); + } + + public void Stop() + { + _monitor.Stop(); + processingQueue.CompleteAdding(); + } + + private async Task ParseLoop() + { + //OutputAvailableAsync will keep returning true + //untill all data is processed and processQueue.CompleteAdding is called + + var inFlight = new HashSet(); + var submissionBatch = new List<(Replay, ReplayFile)>(); + var l = new object(); + while (await processingQueue.OutputAvailableAsync()) { + try { + var file = await processingQueue.TakeAsync(); + lock (l) { + inFlight.Add(file); + } + //don't wait for completion of background pool task. + //it's internally limited to a fixed number of low-priority threads + //so we can throw as much work on there as we want without choking it + + //don't submit files for fingerprinting if we have a younger file in-flight + _ = WorkerPool.RunBackground(async () => { + var replay = _analyzer.Analyze(file); + var doEnqueue = Task.CompletedTask; + lock (l) { + _ = inFlight.Remove(file); + + if (replay != null && file.UploadStatus == UploadStatus.Preprocessed) { + submissionBatch.Add((replay, file)); + var youngestSubmit = submissionBatch.Select(rp => rp.Item2.Created).Min(); + var youngerInFlight = inFlight.Any(rf => rf.Created < youngestSubmit); + + if (!youngerInFlight) { + doEnqueue = FingerprintingQueue.EnqueueManyAsync(submissionBatch); + submissionBatch = new List<(Replay, ReplayFile)>(); + } + } + } + await doEnqueue; + }); + } + catch (Exception ex) { + _log.Error(ex, "Error in parse loop"); + } + } + } + + private async Task FingerprintLoop() { + while (true) { + //take batches from the fingerprinting queue, fingerprint + //those with status Preprocessed are checked for duplicates + //(should be all, but future concurrent processes could change that) + //and those that aren't duplicates (have status ReadyForUpload) + //are enqueued for upload + var UnFingerprinted = await FingerprintingQueue.DequeueAsync(); + var eligible = UnFingerprinted.Where(pair => pair.Item2.UploadStatus == UploadStatus.Preprocessed).ToList(); + await _uploader.CheckDuplicate(eligible.Select(pair => pair.Item2)); + var read = eligible.Where(pair => pair.Item2.UploadStatus == UploadStatus.ReadyForUpload); + await UploadQueue.EnqueueManyAsync(read.OrderBy(pair => pair.Item1.Timestamp)); + } + } + private async Task UploadLoop() { + //Make sure that the next upload doesn't *end* before the previous ended + //but it's OK for multiple uploads to run concurrently + var previousDone = Task.CompletedTask; + var l = new object(); + using (var rateLimitUploading = new SemaphoreSlim(MaxUploads)){ + while (true) { + var parsed = await UploadQueue.DequeueAsync(); + foreach (var (replay, replayfile) in parsed) { + if (replayfile.UploadStatus == UploadStatus.ReadyForUpload) { + //don't await the upload task, but bound it by the upload ratelimiter + _ = rateLimitUploading.Locked(async () => { + Task thisDone; + lock (l) { + thisDone = DoFileUpload(replayfile, replay, previousDone); + previousDone = thisDone; + } + await thisDone; + }); + } + } + } + } + } + + + private async Task DoFileUpload(ReplayFile file, Replay replay, Task mayComplete) + { + // Analyze will set the upload status as a side-effect when it's unsuitable for uploading + if (file.UploadStatus == UploadStatus.ReadyForUpload) { + await _uploader.Upload(file, mayComplete); + } + SaveReplayList(); + if (ShouldDelete(file, replay)) { + DeleteReplay(file); + } + } + + private bool IsProcessingStatus(UploadStatus status) => + status == UploadStatus.Preprocessing || + status == UploadStatus.Preprocessed || + status == UploadStatus.ReadyForUpload || + status == UploadStatus.Uploading; + + private void RefreshStatusAndAggregates() + { + _status = Files.Select(x => x.UploadStatus).Any(IsProcessingStatus) ? "Processing..." : "Idle"; + _aggregates = Files.GroupBy(x => x.UploadStatus).ToDictionary(x => x.Key, x => x.Count()); + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Status))); + PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Aggregates))); + } + + private void SaveReplayList() + { + try { + // save only replays with fixed status. Will retry failed ones on next launch. + var ignored = new[] { UploadStatus.None, UploadStatus.UploadError}; + bool isIgnored(UploadStatus status) => ignored.Contains(status) || IsProcessingStatus(status); + _storage.Save(Files.Where(file => !isIgnored(file.UploadStatus))); + } + catch (Exception ex) { + _log.Error(ex, "Error saving replay list"); + } + } + + /// + /// Load replay cache and merge it with folder scan results + /// + private IEnumerable ScanReplays() + { + var replays = new List(_storage.Load()); + var lookup = new HashSet(replays); + var comparer = new ReplayFile.ReplayFileComparer(); + replays.AddRange(_monitor.ScanReplays().Select(x => new ReplayFile(x)).Where(x => !lookup.Contains(x, comparer))); + return replays.OrderBy(x => x.Created).ToList(); + } + + /// + /// Delete replay file + /// + private static void DeleteReplay(ReplayFile file) + { + try { + _log.Info($"Deleting replay {file}"); + file.Deleted = true; + File.Delete(file.Filename); + } + catch (Exception ex) { + _log.Error(ex, "Error deleting file"); + } + } + + /// + /// Ensure that HotS client finished writing replay file and it can be safely open + /// + /// Filename to test + /// Timeout in milliseconds + /// Whether to test read or write access + public async Task EnsureFileAvailable(string filename, int timeout, bool testWrite = true) + { + var timer = new Stopwatch(); + timer.Start(); + while (timer.ElapsedMilliseconds < timeout) { + try { + if (testWrite) { + File.OpenWrite(filename).Close(); + } else { + File.OpenRead(filename).Close(); + } + return; + } + catch (IOException) { + // File is still in use + await Task.Delay(100); + } + catch { + return; + } + } + } + + /// + /// Decide whether a replay should be deleted according to current settings + /// + /// replay file metadata + /// Parsed replay + private bool ShouldDelete(ReplayFile file, Replay replay) + { + return + DeleteAfterUpload.HasFlag(DeleteFiles.PTR) && file.UploadStatus == UploadStatus.PtrRegion || + DeleteAfterUpload.HasFlag(DeleteFiles.Ai) && file.UploadStatus == UploadStatus.AiDetected || + DeleteAfterUpload.HasFlag(DeleteFiles.Custom) && file.UploadStatus == UploadStatus.CustomGame || + file.UploadStatus == UploadStatus.Success && ( + DeleteAfterUpload.HasFlag(DeleteFiles.Brawl) && replay.GameMode == GameMode.Brawl || + DeleteAfterUpload.HasFlag(DeleteFiles.QuickMatch) && replay.GameMode == GameMode.QuickMatch || + DeleteAfterUpload.HasFlag(DeleteFiles.UnrankedDraft) && replay.GameMode == GameMode.UnrankedDraft || + DeleteAfterUpload.HasFlag(DeleteFiles.HeroLeague) && replay.GameMode == GameMode.HeroLeague || + DeleteAfterUpload.HasFlag(DeleteFiles.TeamLeague) && replay.GameMode == GameMode.TeamLeague || + DeleteAfterUpload.HasFlag(DeleteFiles.StormLeague) && replay.GameMode == GameMode.StormLeague + ); + } + } } \ No newline at end of file diff --git a/Hotsapi.Uploader.Common/Uploader.cs b/Hotsapi.Uploader.Common/Uploader.cs index ab14dc8..7466035 100644 --- a/Hotsapi.Uploader.Common/Uploader.cs +++ b/Hotsapi.Uploader.Common/Uploader.cs @@ -1,13 +1,11 @@ -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; +using Newtonsoft.Json.Linq; using NLog; using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net; using System.Net.Http; -using System.Text; -using System.Threading; using System.Threading.Tasks; namespace Hotsapi.Uploader.Common @@ -16,11 +14,16 @@ public class Uploader : IUploader { private static readonly Logger _log = LogManager.GetCurrentClassLogger(); #if DEBUG - const string ApiEndpoint = "http://hotsapi.local/api/v1"; + private const string ApiEndpoint = "http://hotsapi.local/api/v1"; #else const string ApiEndpoint = "https://hotsapi.net/api/v1"; #endif + private string UploadUrl => $"{ApiEndpoint}/upload?uploadToHotslogs={UploadToHotslogs}"; + private string BulkFingerprintUrl => $"{ApiEndpoint}/replays/fingerprints?uploadToHotslogs={UploadToHotslogs}"; + private string FingerprintOneUrl(string fingerprint) => $"{ApiEndpoint}/replays/fingerprints/v3/{fingerprint}?uploadToHotslogs={UploadToHotslogs}"; + + public bool UploadToHotslogs { get; set; } /// @@ -35,7 +38,7 @@ public Uploader() /// Upload replay /// /// The file to upload - public async Task Upload(ReplayFile file) + public async Task Upload(ReplayFile file, Task mayComplete) { var doDuplicateCheck = file.UploadStatus != UploadStatus.ReadyForUpload; file.UploadStatus = UploadStatus.Uploading; @@ -43,7 +46,54 @@ public async Task Upload(ReplayFile file) _log.Debug($"File {file} marked as duplicate"); file.UploadStatus = UploadStatus.Duplicate; } else { - file.UploadStatus = await Upload(file.Filename); + file.UploadStatus = await Upload(file.Filename, mayComplete); + } + } + + private class ReplayUpload : HttpContent + { + private const int defaultBuffersize = 1024; + private readonly string filename; + private readonly int buffersize; + private readonly Task mayComplete; + + public ReplayUpload(string filename, int buffersize, Task mayComplete) + { + this.filename = filename; + this.buffersize = buffersize; + this.mayComplete = mayComplete; + } + + public ReplayUpload(string filename, int buffersize) : this(filename, buffersize, Task.CompletedTask) { } + public ReplayUpload(string filename, Task canComplete) : this(filename, defaultBuffersize, canComplete) { } + public ReplayUpload(string filename) : this(filename, defaultBuffersize, Task.CompletedTask) { } + + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + using (var input = File.OpenRead(filename)) { + var buffer = new byte[buffersize]; + var i = 0; + var done = false; + while (!done) { + var availableSpace = buffer.Length - i; + var bytesRead = await input.ReadAsync(buffer, i, availableSpace); + if (availableSpace > bytesRead) { + done = true; + await mayComplete; + } + await stream.WriteAsync(buffer, i, bytesRead); + i = 0; + } + await stream.FlushAsync(); + stream.Close(); + input.Close(); + } + } + protected override bool TryComputeLength(out long length) + { + length = -1; + return false; } } @@ -52,17 +102,18 @@ public async Task Upload(ReplayFile file) /// /// Path to file /// Upload result - public async Task Upload(string file) + public async Task Upload(string file, Task mayComplete) { try { string response; - using (var client = new WebClient()) { - var bytes = await client.UploadFileTaskAsync($"{ApiEndpoint}/upload?uploadToHotslogs={UploadToHotslogs}", file); - response = Encoding.UTF8.GetString(bytes); + using (var client = new HttpClient()) { + var upload = new ReplayUpload(file, mayComplete); + var responseMessage = await client.PostAsync(UploadUrl, upload); + response = await responseMessage.Content.ReadAsStringAsync(); } dynamic json = JObject.Parse(response); if ((bool)json.success) { - if (Enum.TryParse((string)json.status, out UploadStatus status)) { + if (Enum.TryParse((string)json.status, out var status)) { _log.Debug($"Uploaded file '{file}': {status}"); return status; } else { @@ -76,7 +127,7 @@ public async Task Upload(string file) } catch (WebException ex) { if (await CheckApiThrottling(ex.Response)) { - return await Upload(file); + return await Upload(file, mayComplete); } _log.Warn(ex, $"Error uploading file '{file}'"); return UploadStatus.UploadError; @@ -92,7 +143,7 @@ private async Task CheckDuplicate(string fingerprint) try { string response; using (var client = new WebClient()) { - response = await client.DownloadStringTaskAsync($"{ApiEndpoint}/replays/fingerprints/v3/{fingerprint}?uploadToHotslogs={UploadToHotslogs}"); + response = await client.DownloadStringTaskAsync(FingerprintOneUrl(fingerprint)); } dynamic json = JObject.Parse(response); return (bool)json.exists; @@ -106,6 +157,8 @@ private async Task CheckDuplicate(string fingerprint) } } + + /// /// Mass check replay fingerprints against database to detect duplicates /// @@ -115,7 +168,7 @@ private async Task CheckDuplicate(IEnumerable fingerprints) try { string response; using (var client = new WebClient()) { - response = await client.UploadStringTaskAsync($"{ApiEndpoint}/replays/fingerprints?uploadToHotslogs={UploadToHotslogs}", String.Join("\n", fingerprints)); + response = await client.UploadStringTaskAsync(BulkFingerprintUrl, String.Join("\n", fingerprints)); } dynamic json = JObject.Parse(response); return (json.exists as JArray).Select(x => x.ToString()).ToArray(); @@ -129,16 +182,18 @@ private async Task CheckDuplicate(IEnumerable fingerprints) } } + + /// /// Mass check replay fingerprints against database to detect duplicates /// public async Task CheckDuplicate(IEnumerable replays) { - foreach(var replay in replays) { + foreach (var replay in replays) { replay.UploadStatus = UploadStatus.CheckingDuplicates; } var exists = new HashSet(await CheckDuplicate(replays.Select(x => x.Fingerprint))); - foreach(var replay in replays) { + foreach (var replay in replays) { replay.UploadStatus = exists.Contains(replay.Fingerprint) ? UploadStatus.Duplicate : UploadStatus.ReadyForUpload; } } @@ -151,7 +206,7 @@ public async Task GetMinimumBuild() try { using (var client = new WebClient()) { var response = await client.DownloadStringTaskAsync($"{ApiEndpoint}/replays/min-build"); - if (!int.TryParse(response, out int build)) { + if (!Int32.TryParse(response, out var build)) { _log.Warn($"Error parsing minimum build: {response}"); return 0; } @@ -171,7 +226,7 @@ public async Task GetMinimumBuild() /// Check if Hotsapi request limit is reached and wait if it is /// /// Server response to examine - private async Task CheckApiThrottling(WebResponse response) + private static async Task CheckApiThrottling(WebResponse response) { if (response != null && (int)(response as HttpWebResponse).StatusCode == 429) { _log.Warn($"Too many requests, waiting"); From 5a05b47b5b9acfe985bb8c98370507fcd8b386e8 Mon Sep 17 00:00:00 2001 From: Martijn Hoekstra Date: Fri, 3 Jan 2020 22:19:12 +0100 Subject: [PATCH 7/7] ensure MaxUploads is respected --- Hotsapi.Uploader.Common.Test/ManagerTests.cs | 34 ++++++ Hotsapi.Uploader.Common/Analyzer.cs | 11 +- Hotsapi.Uploader.Common/Manager.cs | 8 +- Hotsapi.Uploader.Common/ReplayUpload.cs | 56 ++++++++++ Hotsapi.Uploader.Common/Uploader.cs | 100 ++++++------------ .../UIHelpers/UploadColorConverter.cs | 1 + 6 files changed, 137 insertions(+), 73 deletions(-) create mode 100644 Hotsapi.Uploader.Common/ReplayUpload.cs diff --git a/Hotsapi.Uploader.Common.Test/ManagerTests.cs b/Hotsapi.Uploader.Common.Test/ManagerTests.cs index 89a21e5..9bda7aa 100644 --- a/Hotsapi.Uploader.Common.Test/ManagerTests.cs +++ b/Hotsapi.Uploader.Common.Test/ManagerTests.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace Hotsapi.Uploader.Common.Test @@ -95,5 +96,38 @@ public async Task AllInitialFilesProcessed() var num = await done.Task; Assert.AreEqual(3, num); } + + [TestMethod] + public async Task UploadIsRateLimited() + { + var initialFiles = FilesInOrder; + + var manager = new Manager(new MockStorage(initialFiles)); + var uploadTester = new MockUploader(); + var simulaneousUploads = 0; + var processedUploads = 0; + var totalFiles = initialFiles.Count(); + var isDone = new TaskCompletionSource(); + + uploadTester.UploadStarted = rf => { + var inFlight = Interlocked.Increment(ref simulaneousUploads); + try { + Assert.IsTrue(inFlight <= Manager.MaxUploads, "may not have more uploads in flight than Manager.MaxUploads"); + } catch (Exception e) { + isDone.TrySetException(e); + } + return Task.CompletedTask; + }; + uploadTester.UploadFinished = rf => { + Interlocked.Decrement(ref simulaneousUploads); + if(Interlocked.Increment(ref processedUploads) >= totalFiles) { + isDone.SetResult(processedUploads); + } + return Task.CompletedTask; + }; + + manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); + await isDone.Task; + } } } diff --git a/Hotsapi.Uploader.Common/Analyzer.cs b/Hotsapi.Uploader.Common/Analyzer.cs index 9bc18e7..f048596 100644 --- a/Hotsapi.Uploader.Common/Analyzer.cs +++ b/Hotsapi.Uploader.Common/Analyzer.cs @@ -51,15 +51,16 @@ public Replay Analyze(ReplayFile file) return UploadStatus.PtrRegion; case DataParser.ReplayParseResult.PreAlphaWipe: - return UploadStatus.TooOld; + return UploadStatus.TooOld; case DataParser.ReplayParseResult.Incomplete: + case DataParser.ReplayParseResult.UnexpectedResult: return UploadStatus.Incomplete; } - return parseResult != DataParser.ReplayParseResult.Success ? null - : replay.GameMode == GameMode.Custom ? (UploadStatus?)UploadStatus.CustomGame - : replay.ReplayBuild < MinimumBuild ? (UploadStatus?)UploadStatus.TooOld - : (UploadStatus?)UploadStatus.Preprocessed; + return parseResult != DataParser.ReplayParseResult.Success ? null + : replay.GameMode == GameMode.Custom ? (UploadStatus?)UploadStatus.CustomGame + : replay.ReplayBuild < MinimumBuild ? (UploadStatus?)UploadStatus.TooOld + : (UploadStatus?)UploadStatus.Preprocessed; } /// diff --git a/Hotsapi.Uploader.Common/Manager.cs b/Hotsapi.Uploader.Common/Manager.cs index 9f27ac0..3cc8f70 100644 --- a/Hotsapi.Uploader.Common/Manager.cs +++ b/Hotsapi.Uploader.Common/Manager.cs @@ -150,10 +150,12 @@ private async Task ParseLoop() //don't submit files for fingerprinting if we have a younger file in-flight _ = WorkerPool.RunBackground(async () => { var replay = _analyzer.Analyze(file); + if(replay == null) { + file.UploadStatus = UploadStatus.UploadError; + } var doEnqueue = Task.CompletedTask; lock (l) { _ = inFlight.Remove(file); - if (replay != null && file.UploadStatus == UploadStatus.Preprocessed) { submissionBatch.Add((replay, file)); var youngestSubmit = submissionBatch.Select(rp => rp.Item2.Created).Min(); @@ -190,7 +192,7 @@ private async Task FingerprintLoop() { } private async Task UploadLoop() { //Make sure that the next upload doesn't *end* before the previous ended - //but it's OK for multiple uploads to run concurrently + //but it's OK for up to MaxUploads uploads to run concurrently var previousDone = Task.CompletedTask; var l = new object(); using (var rateLimitUploading = new SemaphoreSlim(MaxUploads)){ @@ -199,7 +201,7 @@ private async Task UploadLoop() { foreach (var (replay, replayfile) in parsed) { if (replayfile.UploadStatus == UploadStatus.ReadyForUpload) { //don't await the upload task, but bound it by the upload ratelimiter - _ = rateLimitUploading.Locked(async () => { + _ = rateLimitUploading.LockedTask(async () => { Task thisDone; lock (l) { thisDone = DoFileUpload(replayfile, replay, previousDone); diff --git a/Hotsapi.Uploader.Common/ReplayUpload.cs b/Hotsapi.Uploader.Common/ReplayUpload.cs new file mode 100644 index 0000000..ca30e39 --- /dev/null +++ b/Hotsapi.Uploader.Common/ReplayUpload.cs @@ -0,0 +1,56 @@ +using System.IO; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + internal class ReplayUpload : HttpContent + { + private const int defaultBuffersize = 1024; + private readonly string filename; + private readonly int buffersize; + private readonly Task mayComplete; + + public ReplayUpload(string filename, int buffersize, Task mayComplete) + { + this.filename = filename; + this.buffersize = buffersize; + this.mayComplete = mayComplete; + } + + public ReplayUpload(string filename, int buffersize) : this(filename, buffersize, Task.CompletedTask) { } + public ReplayUpload(string filename, Task canComplete) : this(filename, defaultBuffersize, canComplete) { } + public ReplayUpload(string filename) : this(filename, defaultBuffersize, Task.CompletedTask) { } + + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + using (var input = File.OpenRead(filename)) { + var buffer = new byte[buffersize]; + var i = 0; + var done = false; + while (!done) { + var availableSpace = buffer.Length - i; + var bytesRead = await input.ReadAsync(buffer, i, availableSpace); + if (bytesRead == 0) { + done = true; + } + if (bytesRead < availableSpace) { + await mayComplete; + } + await stream.WriteAsync(buffer, i, bytesRead); + i = 0; + } + await stream.FlushAsync(); + input.Close(); + } + } + protected override bool TryComputeLength(out long length) + { + length = -1; + return false; + } + } + +} diff --git a/Hotsapi.Uploader.Common/Uploader.cs b/Hotsapi.Uploader.Common/Uploader.cs index 7466035..261a589 100644 --- a/Hotsapi.Uploader.Common/Uploader.cs +++ b/Hotsapi.Uploader.Common/Uploader.cs @@ -1,8 +1,8 @@ -using Newtonsoft.Json.Linq; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; using NLog; using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Net; using System.Net.Http; @@ -16,12 +16,22 @@ public class Uploader : IUploader #if DEBUG private const string ApiEndpoint = "http://hotsapi.local/api/v1"; #else - const string ApiEndpoint = "https://hotsapi.net/api/v1"; + private const string ApiEndpoint = "https://hotsapi.net/api/v1"; #endif private string UploadUrl => $"{ApiEndpoint}/upload?uploadToHotslogs={UploadToHotslogs}"; private string BulkFingerprintUrl => $"{ApiEndpoint}/replays/fingerprints?uploadToHotslogs={UploadToHotslogs}"; private string FingerprintOneUrl(string fingerprint) => $"{ApiEndpoint}/replays/fingerprints/v3/{fingerprint}?uploadToHotslogs={UploadToHotslogs}"; + private HttpClient _httpClient; + private HttpClient HttpClient + { + get { + if (_httpClient == null) { + _httpClient = new HttpClient(); + } + return _httpClient; + } + } public bool UploadToHotslogs { get; set; } @@ -41,62 +51,15 @@ public Uploader() public async Task Upload(ReplayFile file, Task mayComplete) { var doDuplicateCheck = file.UploadStatus != UploadStatus.ReadyForUpload; - file.UploadStatus = UploadStatus.Uploading; if (file.Fingerprint != null && doDuplicateCheck && await CheckDuplicate(file.Fingerprint)) { _log.Debug($"File {file} marked as duplicate"); file.UploadStatus = UploadStatus.Duplicate; } else { + file.UploadStatus = UploadStatus.Uploading; file.UploadStatus = await Upload(file.Filename, mayComplete); } } - private class ReplayUpload : HttpContent - { - private const int defaultBuffersize = 1024; - private readonly string filename; - private readonly int buffersize; - private readonly Task mayComplete; - - public ReplayUpload(string filename, int buffersize, Task mayComplete) - { - this.filename = filename; - this.buffersize = buffersize; - this.mayComplete = mayComplete; - } - - public ReplayUpload(string filename, int buffersize) : this(filename, buffersize, Task.CompletedTask) { } - public ReplayUpload(string filename, Task canComplete) : this(filename, defaultBuffersize, canComplete) { } - public ReplayUpload(string filename) : this(filename, defaultBuffersize, Task.CompletedTask) { } - - - protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) - { - using (var input = File.OpenRead(filename)) { - var buffer = new byte[buffersize]; - var i = 0; - var done = false; - while (!done) { - var availableSpace = buffer.Length - i; - var bytesRead = await input.ReadAsync(buffer, i, availableSpace); - if (availableSpace > bytesRead) { - done = true; - await mayComplete; - } - await stream.WriteAsync(buffer, i, bytesRead); - i = 0; - } - await stream.FlushAsync(); - stream.Close(); - input.Close(); - } - } - protected override bool TryComputeLength(out long length) - { - length = -1; - return false; - } - } - /// /// Upload replay /// @@ -105,23 +68,29 @@ protected override bool TryComputeLength(out long length) public async Task Upload(string file, Task mayComplete) { try { - string response; - using (var client = new HttpClient()) { - var upload = new ReplayUpload(file, mayComplete); - var responseMessage = await client.PostAsync(UploadUrl, upload); - response = await responseMessage.Content.ReadAsStringAsync(); - } - dynamic json = JObject.Parse(response); - if ((bool)json.success) { - if (Enum.TryParse((string)json.status, out var status)) { - _log.Debug($"Uploaded file '{file}': {status}"); - return status; + var upload = new ReplayUpload(file, mayComplete); + var multipart = new MultipartFormDataContent { + { upload, "file", file } + }; + var responseMessage = await HttpClient.PostAsync(UploadUrl, multipart); + var response = await responseMessage.Content.ReadAsStringAsync(); + try { + dynamic json = JObject.Parse(response); + if ((bool)json.success) { + if (Enum.TryParse((string)json.status, out var status)) { + _log.Debug($"Uploaded file '{file}': {status}"); + return status; + } else { + _log.Error($"Unknown upload status '{file}': {json.status}"); + return UploadStatus.UploadError; + } } else { - _log.Error($"Unknown upload status '{file}': {json.status}"); + _log.Warn($"Error uploading file '{file}': {response}"); return UploadStatus.UploadError; } - } else { - _log.Warn($"Error uploading file '{file}': {response}"); + } + catch(JsonReaderException jre) { + _log.Warn($"Error processing upload response for file '{file}': {jre.Message}"); return UploadStatus.UploadError; } } @@ -237,4 +206,5 @@ private static async Task CheckApiThrottling(WebResponse response) } } } + } diff --git a/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs b/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs index 3e466e1..544c11d 100644 --- a/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs +++ b/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs @@ -15,6 +15,7 @@ protected override Brush Convert(UploadStatus value) case UploadStatus.Preprocessing: case UploadStatus.Preprocessed: + case UploadStatus.ReadyForUpload: case UploadStatus.Uploading: return GetBrush("StatusUploadInProgressBrush");