diff --git a/Hotsapi.Uploader.Common.Test/ManagerTests.cs b/Hotsapi.Uploader.Common.Test/ManagerTests.cs index ff33583..9bda7aa 100644 --- a/Hotsapi.Uploader.Common.Test/ManagerTests.cs +++ b/Hotsapi.Uploader.Common.Test/ManagerTests.cs @@ -1,6 +1,8 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using System; using System.Collections.Generic; +using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace Hotsapi.Uploader.Common.Test @@ -8,74 +10,33 @@ namespace Hotsapi.Uploader.Common.Test [TestClass] public partial class ManagerTests { - private Task ShortRandomDelay() + public static Task ShortRandomDelay() { var r = new Random(); var delay = r.Next(100, 200); return Task.Delay(delay); } - private static IEnumerable ThreeInOrder + private static IEnumerable FilesInOrder { get { - var one = new ReplayFile("one") { - Created = new DateTime(2020, 1, 1, 0, 0, 1) - }; - var two = new ReplayFile("two") { - Created = new DateTime(2020, 1, 1, 0, 0, 10) - }; - var three = new ReplayFile("three") { - Created = new DateTime(2020, 1, 1, 0, 0, 20) - }; - var initialFiles = new List() { one, two, three }; - return initialFiles; + var next = new DateTime(2020, 1, 1, 0, 0, 0); + var increment = new TimeSpan(0, 0, 1); + var rand = new Random(); + var nums = Enumerable.Range(1, 24).OrderBy(rf => rand.NextDouble()); + + return nums.Select(i => { + next += increment; + return new ReplayFile($"upload_{i}") { + Created = next + }; + }); + } } - - [TestMethod] - [Ignore("Known intermittant failure: multiple uploads are started in parallel and don't always start in order")] - public async Task InitialFilesStartInOrder() - { - var initialFiles = ThreeInOrder; - - var manager = new Manager(new MockStorage(initialFiles)); - var uploadTester = new MockUploader(); - - var promise = new TaskCompletionSource(); - Task done = promise.Task; - - var uploadsSeen = 0; - var l = new object(); - ReplayFile lastUploadStarted = null; - uploadTester.SetUploadCallback(async rf => { - if (lastUploadStarted != null) { - try { - Assert.IsTrue(rf.Created >= lastUploadStarted.Created, $"upload started out of order, {lastUploadStarted} started after {rf}"); - } catch (Exception e) { - promise.TrySetException(e); - } - } - lastUploadStarted = rf; - await ShortRandomDelay(); - var isDone = false; - lock (l) { - uploadsSeen++; - isDone = uploadsSeen >= 3; - } - if (isDone) { - promise.TrySetResult(uploadsSeen); - } - }); - - manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); - await done; - } - - [TestMethod] - [Ignore("Known intermittant failure: multiple uploads are started in parallel and don't always end in order")] public async Task InitialFilesEndInorder() { - var initialFiles = ThreeInOrder; + var initialFiles = FilesInOrder; var manager = new Manager(new MockStorage(initialFiles)); var uploadTester = new MockUploader(); @@ -85,11 +46,11 @@ public async Task InitialFilesEndInorder() { var uploadsSeen = 0; var l = new object(); ReplayFile lastUploadFinished = null; - uploadTester.SetUploadCallback(async rf => { - await ShortRandomDelay(); + uploadTester.UploadFinished = async rf => { if (lastUploadFinished != null) { try { - Assert.IsTrue(rf.Created >= lastUploadFinished.Created, $"upload completed out of order, {lastUploadFinished} completed after {rf}"); + var isInOrder = rf.Created >= lastUploadFinished.Created; + Assert.IsTrue(isInOrder, $"upload completed out of order, {rf} completed after {lastUploadFinished}"); } catch (Exception e) { promise.TrySetException(e); @@ -104,7 
+65,7 @@ public async Task InitialFilesEndInorder() { if (isDone) { promise.TrySetResult(uploadsSeen); } - }); + }; manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); await done; @@ -113,7 +74,7 @@ public async Task InitialFilesEndInorder() { [TestMethod] public async Task AllInitialFilesProcessed() { - var initialFiles = ThreeInOrder; + var initialFiles = FilesInOrder; var manager = new Manager(new MockStorage(initialFiles)); var uploadTester = new MockUploader(); @@ -121,7 +82,7 @@ public async Task AllInitialFilesProcessed() var uploadsSeen = 0; object l = new object(); - uploadTester.SetUploadCallback(async rf => { + uploadTester.UploadFinished = async rf => { await ShortRandomDelay(); lock (l) { uploadsSeen++; @@ -129,12 +90,44 @@ public async Task AllInitialFilesProcessed() done.SetResult(uploadsSeen); } } - }); + }; manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); - var finished = await Task.WhenAny(Task.Delay(4000), done.Task); - await finished; - Assert.AreEqual(3, uploadsSeen); + var num = await done.Task; + Assert.AreEqual(3, num); + } + + [TestMethod] + public async Task UploadIsRateLimited() + { + var initialFiles = FilesInOrder; + + var manager = new Manager(new MockStorage(initialFiles)); + var uploadTester = new MockUploader(); + var simulaneousUploads = 0; + var processedUploads = 0; + var totalFiles = initialFiles.Count(); + var isDone = new TaskCompletionSource(); + + uploadTester.UploadStarted = rf => { + var inFlight = Interlocked.Increment(ref simulaneousUploads); + try { + Assert.IsTrue(inFlight <= Manager.MaxUploads, "may not have more uploads in flight than Manager.MaxUploads"); + } catch (Exception e) { + isDone.TrySetException(e); + } + return Task.CompletedTask; + }; + uploadTester.UploadFinished = rf => { + Interlocked.Decrement(ref simulaneousUploads); + if(Interlocked.Increment(ref processedUploads) >= totalFiles) { + isDone.SetResult(processedUploads); + } + return Task.CompletedTask; + }; + + manager.Start(new NoNewFilesMonitor(), new MockAnalizer(), uploadTester); + await isDone.Task; } } } diff --git a/Hotsapi.Uploader.Common.Test/MockAnalizer.cs b/Hotsapi.Uploader.Common.Test/MockAnalizer.cs index 318a4df..c7e2b7c 100644 --- a/Hotsapi.Uploader.Common.Test/MockAnalizer.cs +++ b/Hotsapi.Uploader.Common.Test/MockAnalizer.cs @@ -7,7 +7,12 @@ public partial class ManagerTests private class MockAnalizer : IAnalyzer { public int MinimumBuild { get; set; } - public Replay Analyze(ReplayFile file) => new Replay(); + public Replay Analyze(ReplayFile file) { + file.UploadStatus = UploadStatus.Preprocessed; + return new Replay() { + Timestamp = file.Created + }; + } public string GetFingerprint(Replay replay) => "dummy fingerprint"; } } diff --git a/Hotsapi.Uploader.Common.Test/MockUploader.cs b/Hotsapi.Uploader.Common.Test/MockUploader.cs index 332e063..c4eed15 100644 --- a/Hotsapi.Uploader.Common.Test/MockUploader.cs +++ b/Hotsapi.Uploader.Common.Test/MockUploader.cs @@ -10,27 +10,27 @@ private class MockUploader : IUploader { public bool UploadToHotslogs { get; set; } - private Func UploadCallback = _ => Task.CompletedTask; - public void SetUploadCallback(Func onUpload) - { - var old = UploadCallback; - - UploadCallback = async (ReplayFile file) => { - await old(file); - await onUpload(file); - }; - } + public Func UploadStarted { get; set; } = _ => Task.CompletedTask; + public Func UploadFinished { get; set; } = _ => Task.CompletedTask; - public Task CheckDuplicate(IEnumerable replays) => Task.CompletedTask; + 
public async Task CheckDuplicate(IEnumerable replays) + { + foreach (var replay in replays) { + replay.UploadStatus = UploadStatus.ReadyForUpload; + } + await ShortRandomDelay(); + } public Task GetMinimumBuild() => Task.FromResult(1); - public Task Upload(ReplayFile file) + public async Task Upload(ReplayFile file, Task mayComplete) { - UploadCallback(file); - return Task.CompletedTask; + await UploadStarted(file); + await Upload(file.Filename, mayComplete); + await UploadFinished(file); } - public async Task Upload(string file) + public async Task Upload(string file, Task mayComplete) { - await Task.Delay(100); + await ShortRandomDelay(); + await mayComplete; return UploadStatus.Success; } } diff --git a/Hotsapi.Uploader.Common/Analyzer.cs b/Hotsapi.Uploader.Common/Analyzer.cs index 9dd5eb3..f048596 100644 --- a/Hotsapi.Uploader.Common/Analyzer.cs +++ b/Hotsapi.Uploader.Common/Analyzer.cs @@ -16,21 +16,17 @@ public class Analyzer : IAnalyzer private static Logger _log = LogManager.GetCurrentClassLogger(); /// - /// Analyze replay locally before uploading + /// Analyze replay locally before uploading. + /// + /// Sets file status as a side-effect. /// /// Replay file public Replay Analyze(ReplayFile file) { try { - var result = DataParser.ParseReplay(file.Filename, false, false, false, true); - var replay = result.Item2; - var parseResult = result.Item1; - var status = GetPreStatus(replay, parseResult); - - if (status != null) { - file.UploadStatus = status.Value; - } - + file.UploadStatus = UploadStatus.Preprocessing; + var (parseResult, replay) = DataParser.ParseReplay(file.Filename, false, false, false, true); + file.UploadStatus = GetPreStatus(replay, parseResult) ?? file.UploadStatus; if (parseResult != DataParser.ReplayParseResult.Success) { return null; } @@ -44,7 +40,7 @@ public Replay Analyze(ReplayFile file) } } - public UploadStatus? GetPreStatus(Replay replay, DataParser.ReplayParseResult parseResult) + private UploadStatus? GetPreStatus(Replay replay, DataParser.ReplayParseResult parseResult) { switch (parseResult) { case DataParser.ReplayParseResult.ComputerPlayerFound: @@ -56,21 +52,15 @@ public Replay Analyze(ReplayFile file) case DataParser.ReplayParseResult.PreAlphaWipe: return UploadStatus.TooOld; + case DataParser.ReplayParseResult.Incomplete: + case DataParser.ReplayParseResult.UnexpectedResult: + return UploadStatus.Incomplete; } - if (parseResult != DataParser.ReplayParseResult.Success) { - return null; - } - - if (replay.GameMode == GameMode.Custom) { - return UploadStatus.CustomGame; - } - - if (replay.ReplayBuild < MinimumBuild) { - return UploadStatus.TooOld; - } - - return null; + return parseResult != DataParser.ReplayParseResult.Success ? null + : replay.GameMode == GameMode.Custom ? (UploadStatus?)UploadStatus.CustomGame + : replay.ReplayBuild < MinimumBuild ? (UploadStatus?)UploadStatus.TooOld + : (UploadStatus?)UploadStatus.Preprocessed; } /// diff --git a/Hotsapi.Uploader.Common/ConcurrentAyncBuffer.cs b/Hotsapi.Uploader.Common/ConcurrentAyncBuffer.cs new file mode 100644 index 0000000..9ec144b --- /dev/null +++ b/Hotsapi.Uploader.Common/ConcurrentAyncBuffer.cs @@ -0,0 +1,98 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + /// + /// A thread-safe, concurrent buffer that has asynchronous operations for enqueuing and bulk dequeueing. 
+ /// + public class ConcurrentAyncBuffer + { + private Queue Items { get; } = new Queue(); + private Queue>> BulkRequests { get; } = new Queue>>(); + private SemaphoreSlim Mutex { get; } = new SemaphoreSlim(1); + /// + /// Enqueue a single item + /// + /// the item to enqueue + /// A task that will be completed when the item is enqueued + public Task EnqueueAsync(A item) => EnqueueAsync(item, CancellationToken.None); + /// + /// Enqueue a single item + /// + /// the item to enqueue + /// a cancellation token to cancel trying to enqueue the item + /// A task that will be completed when the item is enqueued + public Task EnqueueAsync(A item, CancellationToken token) => EnqueueManyAsync(new List() { item }, token); + /// + /// Enqueue all items + /// + /// the items to enqueue + /// a cancellation token to cancel trying to enqueue items + /// A task that will be completed when the items are enqueued + public Task EnqueueManyAsync(IEnumerable items, CancellationToken token) => Mutex.Locked(() => { + var succeeded = false; + while (!succeeded) { + if (BulkRequests.Any()) { + succeeded = BulkRequests.Dequeue().TrySetResult(items); + } else { + foreach (var item in items) { + Items.Enqueue(item); + } + succeeded = true; + } + } + }, token); + + /// + /// Enqueue all items + /// + /// the items to enqueue + /// a task that will be completed when the items are enqueued. + public Task EnqueueManyAsync(IEnumerable items) => EnqueueManyAsync(items, CancellationToken.None); + + /// + /// Dequeue all A's as soon as they're available. + /// + /// A cancellationtoken that will cancel fetching data + /// A task that will complete with all A's available + public async Task> DequeueAsync(CancellationToken token) + { + var locked = 0; + try { + await Mutex.WaitAsync(token); + locked = 1; + Task> resultTask; + if (Items.Any()) { + var result = new List(); + while (Items.Any()) { + result.Add(Items.Dequeue()); + } + resultTask = Task.FromResult>(result); + } else { + var completion = new TaskCompletionSource>(); + BulkRequests.Enqueue(completion); + _ = token.Register(() => completion.TrySetCanceled()); + resultTask = completion.Task; + } + if (locked > 0) { + _ = Mutex.Release(locked); + } + locked = 0; + return await resultTask; + } + finally { + if (locked > 0) { + _ = Mutex.Release(locked); + } + } + } + /// + /// Dequeue all A's as soon as any are available + /// + /// A task that will complete with all A's available + public Task> DequeueAsync() => DequeueAsync(CancellationToken.None); + } +} diff --git a/Hotsapi.Uploader.Common/IUploader.cs b/Hotsapi.Uploader.Common/IUploader.cs index b731770..ca16573 100644 --- a/Hotsapi.Uploader.Common/IUploader.cs +++ b/Hotsapi.Uploader.Common/IUploader.cs @@ -8,7 +8,7 @@ public interface IUploader bool UploadToHotslogs { get; set; } Task CheckDuplicate(IEnumerable replays); Task GetMinimumBuild(); - Task Upload(ReplayFile file); - Task Upload(string file); + Task Upload(ReplayFile file, Task mayComplete); + Task Upload(string file, Task mayComplete); } } \ No newline at end of file diff --git a/Hotsapi.Uploader.Common/LockExtensions.cs b/Hotsapi.Uploader.Common/LockExtensions.cs new file mode 100644 index 0000000..d590ee4 --- /dev/null +++ b/Hotsapi.Uploader.Common/LockExtensions.cs @@ -0,0 +1,77 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + /// + /// Provides extensions for using SemaphoreSlim in a safe way that will always release the semaphore + /// + public static class LockExtensions + { + 
public static async Task Locked(this SemaphoreSlim semaphore, Func thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + return thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static async Task Locked(this SemaphoreSlim semaphore, Action thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static async Task LockedTask(this SemaphoreSlim semaphore, Func thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + await thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static async Task LockedTask(this SemaphoreSlim semaphore, Func> thunk, CancellationToken token) + { + var locked = 0; + try { + await semaphore.WaitAsync(token); + locked = 1; + return await thunk(); + } + finally { + if (locked > 0) { + _ = semaphore.Release(locked); + } + } + } + + public static Task Locked(this SemaphoreSlim semaphore, Func thunk) => Locked(semaphore, thunk, CancellationToken.None); + public static Task Locked(this SemaphoreSlim semaphore, Action thunk) => Locked(semaphore, thunk, CancellationToken.None); + public static Task LockedTask(this SemaphoreSlim semaphore, Func> thunk) => LockedTask(semaphore, thunk, CancellationToken.None); + public static Task LockedTask(this SemaphoreSlim semaphore, Func thunk) => LockedTask(semaphore, thunk, CancellationToken.None); + } +} diff --git a/Hotsapi.Uploader.Common/Manager.cs b/Hotsapi.Uploader.Common/Manager.cs index d994bd2..3cc8f70 100644 --- a/Hotsapi.Uploader.Common/Manager.cs +++ b/Hotsapi.Uploader.Common/Manager.cs @@ -19,9 +19,9 @@ namespace Hotsapi.Uploader.Common public class Manager : INotifyPropertyChanged { /// - /// Upload thead count + /// Maximum number of simultaneous uploads in progress /// - public const int MaxThreads = 4; + public const int MaxUploads = 4; /// /// Replay list @@ -30,7 +30,9 @@ public class Manager : INotifyPropertyChanged private static Logger _log = LogManager.GetCurrentClassLogger(); private bool _initialized = false; - private AsyncCollection processingQueue = new AsyncCollection(new ConcurrentStack()); + private AsyncCollection processingQueue = new AsyncCollection(new ConcurrentQueue()); + private ConcurrentAyncBuffer<(Replay, ReplayFile)> FingerprintingQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); + private ConcurrentAyncBuffer<(Replay, ReplayFile)> UploadQueue = new ConcurrentAyncBuffer<(Replay, ReplayFile)>(); private readonly IReplayStorage _storage; private IUploader _uploader; private IAnalyzer _analyzer; @@ -82,7 +84,7 @@ public bool UploadToHotslogs public Manager(IReplayStorage storage) { - this._storage = storage; + _storage = storage; Files.ItemPropertyChanged += (_, __) => { RefreshStatusAndAggregates(); }; Files.CollectionChanged += (_, __) => { RefreshStatusAndAggregates(); }; } @@ -102,8 +104,9 @@ public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader _monitor = monitor; var replays = ScanReplays(); - Files.AddRange(replays); - replays.Where(x => x.UploadStatus == UploadStatus.None).Reverse().Map(x => processingQueue.Add(x)); + Files.AddRange(replays.Reverse()); + replays.Where(x => x.UploadStatus == UploadStatus.None) + .Map(processingQueue.Add); _monitor.ReplayAdded += async (_, e) => { await 
EnsureFileAvailable(e.Data, 3000); @@ -115,9 +118,9 @@ public async void Start(IMonitor monitor, IAnalyzer analyzer, IUploader uploader _analyzer.MinimumBuild = await _uploader.GetMinimumBuild(); - for (int i = 0; i < MaxThreads; i++) { - Task.Run(UploadLoop).Forget(); - } + _ = Task.Run(() => ParseLoop()); + _ = Task.Run(() => FingerprintLoop()); + _ = Task.Run(() => UploadLoop()); } public void Stop() @@ -126,34 +129,114 @@ public void Stop() processingQueue.CompleteAdding(); } - private async Task UploadLoop() + private async Task ParseLoop() { + //OutputAvailableAsync will keep returning true + //untill all data is processed and processQueue.CompleteAdding is called + + var inFlight = new HashSet(); + var submissionBatch = new List<(Replay, ReplayFile)>(); + var l = new object(); while (await processingQueue.OutputAvailableAsync()) { try { var file = await processingQueue.TakeAsync(); + lock (l) { + inFlight.Add(file); + } + //don't wait for completion of background pool task. + //it's internally limited to a fixed number of low-priority threads + //so we can throw as much work on there as we want without choking it - file.UploadStatus = UploadStatus.InProgress; + //don't submit files for fingerprinting if we have a younger file in-flight + _ = WorkerPool.RunBackground(async () => { + var replay = _analyzer.Analyze(file); + if(replay == null) { + file.UploadStatus = UploadStatus.UploadError; + } + var doEnqueue = Task.CompletedTask; + lock (l) { + _ = inFlight.Remove(file); + if (replay != null && file.UploadStatus == UploadStatus.Preprocessed) { + submissionBatch.Add((replay, file)); + var youngestSubmit = submissionBatch.Select(rp => rp.Item2.Created).Min(); + var youngerInFlight = inFlight.Any(rf => rf.Created < youngestSubmit); - // test if replay is eligible for upload (not AI, PTR, Custom, etc) - var replay = _analyzer.Analyze(file); - if (file.UploadStatus == UploadStatus.InProgress) { - // if it is, upload it - await _uploader.Upload(file); - } - SaveReplayList(); - if (ShouldDelete(file, replay)) { - DeleteReplay(file); - } + if (!youngerInFlight) { + doEnqueue = FingerprintingQueue.EnqueueManyAsync(submissionBatch); + submissionBatch = new List<(Replay, ReplayFile)>(); + } + } + } + await doEnqueue; + }); } catch (Exception ex) { - _log.Error(ex, "Error in upload loop"); + _log.Error(ex, "Error in parse loop"); } } } + private async Task FingerprintLoop() { + while (true) { + //take batches from the fingerprinting queue, fingerprint + //those with status Preprocessed are checked for duplicates + //(should be all, but future concurrent processes could change that) + //and those that aren't duplicates (have status ReadyForUpload) + //are enqueued for upload + var UnFingerprinted = await FingerprintingQueue.DequeueAsync(); + var eligible = UnFingerprinted.Where(pair => pair.Item2.UploadStatus == UploadStatus.Preprocessed).ToList(); + await _uploader.CheckDuplicate(eligible.Select(pair => pair.Item2)); + var read = eligible.Where(pair => pair.Item2.UploadStatus == UploadStatus.ReadyForUpload); + await UploadQueue.EnqueueManyAsync(read.OrderBy(pair => pair.Item1.Timestamp)); + } + } + private async Task UploadLoop() { + //Make sure that the next upload doesn't *end* before the previous ended + //but it's OK for up to MaxUploads uploads to run concurrently + var previousDone = Task.CompletedTask; + var l = new object(); + using (var rateLimitUploading = new SemaphoreSlim(MaxUploads)){ + while (true) { + var parsed = await UploadQueue.DequeueAsync(); + foreach (var (replay, 
replayfile) in parsed) { + if (replayfile.UploadStatus == UploadStatus.ReadyForUpload) { + //don't await the upload task, but bound it by the upload ratelimiter + _ = rateLimitUploading.LockedTask(async () => { + Task thisDone; + lock (l) { + thisDone = DoFileUpload(replayfile, replay, previousDone); + previousDone = thisDone; + } + await thisDone; + }); + } + } + } + } + } + + + private async Task DoFileUpload(ReplayFile file, Replay replay, Task mayComplete) + { + // Analyze will set the upload status as a side-effect when it's unsuitable for uploading + if (file.UploadStatus == UploadStatus.ReadyForUpload) { + await _uploader.Upload(file, mayComplete); + } + SaveReplayList(); + if (ShouldDelete(file, replay)) { + DeleteReplay(file); + } + } + + private bool IsProcessingStatus(UploadStatus status) => + status == UploadStatus.Preprocessing || + status == UploadStatus.Preprocessed || + status == UploadStatus.ReadyForUpload || + status == UploadStatus.Uploading; + private void RefreshStatusAndAggregates() { - _status = Files.Any(x => x.UploadStatus == UploadStatus.InProgress) ? "Uploading..." : "Idle"; + _status = Files.Select(x => x.UploadStatus).Any(IsProcessingStatus) ? "Processing..." : "Idle"; _aggregates = Files.GroupBy(x => x.UploadStatus).ToDictionary(x => x.Key, x => x.Count()); PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Status))); PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Aggregates))); @@ -163,8 +246,9 @@ private void SaveReplayList() { try { // save only replays with fixed status. Will retry failed ones on next launch. - var ignored = new[] { UploadStatus.None, UploadStatus.UploadError, UploadStatus.InProgress }; - _storage.Save(Files.Where(x => !ignored.Contains(x.UploadStatus))); + var ignored = new[] { UploadStatus.None, UploadStatus.UploadError}; + bool isIgnored(UploadStatus status) => ignored.Contains(status) || IsProcessingStatus(status); + _storage.Save(Files.Where(file => !isIgnored(file.UploadStatus))); } catch (Exception ex) { _log.Error(ex, "Error saving replay list"); @@ -174,13 +258,13 @@ private void SaveReplayList() /// /// Load replay cache and merge it with folder scan results /// - private List ScanReplays() + private IEnumerable ScanReplays() { var replays = new List(_storage.Load()); var lookup = new HashSet(replays); var comparer = new ReplayFile.ReplayFileComparer(); replays.AddRange(_monitor.ScanReplays().Select(x => new ReplayFile(x)).Where(x => !lookup.Contains(x, comparer))); - return replays.OrderByDescending(x => x.Created).ToList(); + return replays.OrderBy(x => x.Created).ToList(); } /// diff --git a/Hotsapi.Uploader.Common/ReplayUpload.cs b/Hotsapi.Uploader.Common/ReplayUpload.cs new file mode 100644 index 0000000..ca30e39 --- /dev/null +++ b/Hotsapi.Uploader.Common/ReplayUpload.cs @@ -0,0 +1,56 @@ +using System.IO; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + internal class ReplayUpload : HttpContent + { + private const int defaultBuffersize = 1024; + private readonly string filename; + private readonly int buffersize; + private readonly Task mayComplete; + + public ReplayUpload(string filename, int buffersize, Task mayComplete) + { + this.filename = filename; + this.buffersize = buffersize; + this.mayComplete = mayComplete; + } + + public ReplayUpload(string filename, int buffersize) : this(filename, buffersize, Task.CompletedTask) { } + public ReplayUpload(string filename, Task canComplete) : this(filename, defaultBuffersize, 
canComplete) { } + public ReplayUpload(string filename) : this(filename, defaultBuffersize, Task.CompletedTask) { } + + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + using (var input = File.OpenRead(filename)) { + var buffer = new byte[buffersize]; + var i = 0; + var done = false; + while (!done) { + var availableSpace = buffer.Length - i; + var bytesRead = await input.ReadAsync(buffer, i, availableSpace); + if (bytesRead == 0) { + done = true; + } + if (bytesRead < availableSpace) { + await mayComplete; + } + await stream.WriteAsync(buffer, i, bytesRead); + i = 0; + } + await stream.FlushAsync(); + input.Close(); + } + } + protected override bool TryComputeLength(out long length) + { + length = -1; + return false; + } + } + +} diff --git a/Hotsapi.Uploader.Common/UploadStatus.cs b/Hotsapi.Uploader.Common/UploadStatus.cs index d2bb9ef..bfe4d25 100644 --- a/Hotsapi.Uploader.Common/UploadStatus.cs +++ b/Hotsapi.Uploader.Common/UploadStatus.cs @@ -8,13 +8,18 @@ public enum UploadStatus { None, Success, - InProgress, + Preprocessing, + Preprocessed, + ReadyForUpload, + Uploading, UploadError, + CheckingDuplicates, Duplicate, AiDetected, CustomGame, PtrRegion, Incomplete, - TooOld, + TooOld, + } } diff --git a/Hotsapi.Uploader.Common/Uploader.cs b/Hotsapi.Uploader.Common/Uploader.cs index 130f947..261a589 100644 --- a/Hotsapi.Uploader.Common/Uploader.cs +++ b/Hotsapi.Uploader.Common/Uploader.cs @@ -1,11 +1,11 @@ -using Newtonsoft.Json.Linq; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; using NLog; using System; using System.Collections.Generic; using System.Linq; using System.Net; -using System.Text; -using System.Threading; +using System.Net.Http; using System.Threading.Tasks; namespace Hotsapi.Uploader.Common @@ -14,11 +14,26 @@ public class Uploader : IUploader { private static readonly Logger _log = LogManager.GetCurrentClassLogger(); #if DEBUG - const string ApiEndpoint = "http://hotsapi.local/api/v1"; + private const string ApiEndpoint = "http://hotsapi.local/api/v1"; #else - const string ApiEndpoint = "https://hotsapi.net/api/v1"; + private const string ApiEndpoint = "https://hotsapi.net/api/v1"; #endif + private string UploadUrl => $"{ApiEndpoint}/upload?uploadToHotslogs={UploadToHotslogs}"; + private string BulkFingerprintUrl => $"{ApiEndpoint}/replays/fingerprints?uploadToHotslogs={UploadToHotslogs}"; + private string FingerprintOneUrl(string fingerprint) => $"{ApiEndpoint}/replays/fingerprints/v3/{fingerprint}?uploadToHotslogs={UploadToHotslogs}"; + private HttpClient _httpClient; + private HttpClient HttpClient + { + get { + if (_httpClient == null) { + _httpClient = new HttpClient(); + } + return _httpClient; + } + } + + public bool UploadToHotslogs { get; set; } /// @@ -32,15 +47,16 @@ public Uploader() /// /// Upload replay /// - /// - public async Task Upload(ReplayFile file) + /// The file to upload + public async Task Upload(ReplayFile file, Task mayComplete) { - file.UploadStatus = UploadStatus.InProgress; - if (file.Fingerprint != null && await CheckDuplicate(file.Fingerprint)) { + var doDuplicateCheck = file.UploadStatus != UploadStatus.ReadyForUpload; + if (file.Fingerprint != null && doDuplicateCheck && await CheckDuplicate(file.Fingerprint)) { _log.Debug($"File {file} marked as duplicate"); file.UploadStatus = UploadStatus.Duplicate; } else { - file.UploadStatus = await Upload(file.Filename); + file.UploadStatus = UploadStatus.Uploading; + file.UploadStatus = await Upload(file.Filename, mayComplete); } } @@ 
-49,31 +65,38 @@ public async Task Upload(ReplayFile file) /// /// Path to file /// Upload result - public async Task Upload(string file) + public async Task Upload(string file, Task mayComplete) { try { - string response; - using (var client = new WebClient()) { - var bytes = await client.UploadFileTaskAsync($"{ApiEndpoint}/upload?uploadToHotslogs={UploadToHotslogs}", file); - response = Encoding.UTF8.GetString(bytes); - } - dynamic json = JObject.Parse(response); - if ((bool)json.success) { - if (Enum.TryParse((string)json.status, out UploadStatus status)) { - _log.Debug($"Uploaded file '{file}': {status}"); - return status; + var upload = new ReplayUpload(file, mayComplete); + var multipart = new MultipartFormDataContent { + { upload, "file", file } + }; + var responseMessage = await HttpClient.PostAsync(UploadUrl, multipart); + var response = await responseMessage.Content.ReadAsStringAsync(); + try { + dynamic json = JObject.Parse(response); + if ((bool)json.success) { + if (Enum.TryParse((string)json.status, out var status)) { + _log.Debug($"Uploaded file '{file}': {status}"); + return status; + } else { + _log.Error($"Unknown upload status '{file}': {json.status}"); + return UploadStatus.UploadError; + } } else { - _log.Error($"Unknown upload status '{file}': {json.status}"); + _log.Warn($"Error uploading file '{file}': {response}"); return UploadStatus.UploadError; } - } else { - _log.Warn($"Error uploading file '{file}': {response}"); + } + catch(JsonReaderException jre) { + _log.Warn($"Error processing upload response for file '{file}': {jre.Message}"); return UploadStatus.UploadError; } } catch (WebException ex) { if (await CheckApiThrottling(ex.Response)) { - return await Upload(file); + return await Upload(file, mayComplete); } _log.Warn(ex, $"Error uploading file '{file}'"); return UploadStatus.UploadError; @@ -89,7 +112,7 @@ private async Task CheckDuplicate(string fingerprint) try { string response; using (var client = new WebClient()) { - response = await client.DownloadStringTaskAsync($"{ApiEndpoint}/replays/fingerprints/v3/{fingerprint}?uploadToHotslogs={UploadToHotslogs}"); + response = await client.DownloadStringTaskAsync(FingerprintOneUrl(fingerprint)); } dynamic json = JObject.Parse(response); return (bool)json.exists; @@ -103,6 +126,8 @@ private async Task CheckDuplicate(string fingerprint) } } + + /// /// Mass check replay fingerprints against database to detect duplicates /// @@ -112,7 +137,7 @@ private async Task CheckDuplicate(IEnumerable fingerprints) try { string response; using (var client = new WebClient()) { - response = await client.UploadStringTaskAsync($"{ApiEndpoint}/replays/fingerprints?uploadToHotslogs={UploadToHotslogs}", String.Join("\n", fingerprints)); + response = await client.UploadStringTaskAsync(BulkFingerprintUrl, String.Join("\n", fingerprints)); } dynamic json = JObject.Parse(response); return (json.exists as JArray).Select(x => x.ToString()).ToArray(); @@ -126,13 +151,20 @@ private async Task CheckDuplicate(IEnumerable fingerprints) } } + + /// /// Mass check replay fingerprints against database to detect duplicates /// public async Task CheckDuplicate(IEnumerable replays) { + foreach (var replay in replays) { + replay.UploadStatus = UploadStatus.CheckingDuplicates; + } var exists = new HashSet(await CheckDuplicate(replays.Select(x => x.Fingerprint))); - replays.Where(x => exists.Contains(x.Fingerprint)).Map(x => x.UploadStatus = UploadStatus.Duplicate); + foreach (var replay in replays) { + replay.UploadStatus = 
exists.Contains(replay.Fingerprint) ? UploadStatus.Duplicate : UploadStatus.ReadyForUpload; + } } /// @@ -143,7 +175,7 @@ public async Task GetMinimumBuild() try { using (var client = new WebClient()) { var response = await client.DownloadStringTaskAsync($"{ApiEndpoint}/replays/min-build"); - if (!int.TryParse(response, out int build)) { + if (!Int32.TryParse(response, out var build)) { _log.Warn($"Error parsing minimum build: {response}"); return 0; } @@ -163,7 +195,7 @@ public async Task GetMinimumBuild() /// Check if Hotsapi request limit is reached and wait if it is /// /// Server response to examine - private async Task CheckApiThrottling(WebResponse response) + private static async Task CheckApiThrottling(WebResponse response) { if (response != null && (int)(response as HttpWebResponse).StatusCode == 429) { _log.Warn($"Too many requests, waiting"); @@ -174,4 +206,5 @@ private async Task CheckApiThrottling(WebResponse response) } } } + } diff --git a/Hotsapi.Uploader.Common/WorkerPool.cs b/Hotsapi.Uploader.Common/WorkerPool.cs new file mode 100644 index 0000000..797cabd --- /dev/null +++ b/Hotsapi.Uploader.Common/WorkerPool.cs @@ -0,0 +1,78 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace Hotsapi.Uploader.Common +{ + internal static class WorkerPool + { + private static TaskScheduler BackgroundScheduler = new LowPriorityScheduler(); + public static Task RunBackground(Action action) => RunBackground(action, CancellationToken.None); + public static Task RunBackground(Action action, CancellationToken token) => Task.Factory.StartNew(action, token, TaskCreationOptions.LongRunning, BackgroundScheduler); + public static Task RunBackground(Func action) => RunBackground(action, CancellationToken.None); + public static Task RunBackground(Func action, CancellationToken token) => Task.Factory.StartNew(action, token, TaskCreationOptions.LongRunning, BackgroundScheduler); + + } + + /// + /// Fixed number threadpool task scheduler that runs tasks on low-priority threads + /// + public class LowPriorityScheduler : TaskScheduler, IDisposable + { + private BlockingCollection TaskQueue { get; } = new BlockingCollection(); + private IEnumerable Consumable => TaskQueue.GetConsumingEnumerable(); + + public LowPriorityScheduler(int maxThreads) + { + for (var i = 0; i < maxThreads; i++) { + var workerThread = new Thread(PerformTasks) { + Name = $"Low Priority Tread {i}/{maxThreads}", + IsBackground = true, + Priority = ThreadPriority.BelowNormal + }; + workerThread.Start(); + } + } + + public LowPriorityScheduler() : this(Environment.ProcessorCount) { } + + private void PerformTasks() + { + foreach (var task in Consumable) { + if (task.Status == TaskStatus.WaitingToRun || task.Status == TaskStatus.WaitingForActivation || task.Status == TaskStatus.Created) { + // execute the task under the current thread + if (!base.TryExecuteTask(task)) Trace.WriteLine("Error"); + } else { + Trace.WriteLine("Unrunnable Task Status"); + } + } + var col = TaskQueue; + if (col != null) col.Dispose(); + } + protected override IEnumerable GetScheduledTasks() => TaskQueue; + protected override void QueueTask(Task task) => TaskQueue.Add(task); + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; + + #region IDisposable Support + private bool disposedValue = false; // To detect redundant calls + + protected virtual void Dispose(bool disposing) + { + if 
(!disposedValue) { + if (disposing) { + TaskQueue.CompleteAdding(); + } + disposedValue = true; + } + } + + // This code added to correctly implement the disposable pattern. + public void Dispose() => + // Do not change this code. Put cleanup code in Dispose(bool disposing) above. + Dispose(true); + #endregion + } +} diff --git a/Hotsapi.Uploader.Windows/App.config b/Hotsapi.Uploader.Windows/App.config index 82d5ea4..d9b9904 100644 --- a/Hotsapi.Uploader.Windows/App.config +++ b/Hotsapi.Uploader.Windows/App.config @@ -33,7 +33,7 @@ 600 - 700 + 800 False diff --git a/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs b/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs index 0736c34..544c11d 100644 --- a/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs +++ b/Hotsapi.Uploader.Windows/UIHelpers/UploadColorConverter.cs @@ -11,9 +11,12 @@ protected override Brush Convert(UploadStatus value) { switch (value) { case UploadStatus.Success: - return GetBrush("StatusUploadSuccessBrush"); - - case UploadStatus.InProgress: + return GetBrush("StatusUploadSuccessBrush"); + + case UploadStatus.Preprocessing: + case UploadStatus.Preprocessed: + case UploadStatus.ReadyForUpload: + case UploadStatus.Uploading: return GetBrush("StatusUploadInProgressBrush"); case UploadStatus.Duplicate:
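
The UploadLoop change in Manager.cs above chains a `mayComplete` task through successive uploads: up to `Manager.MaxUploads` transfers run in parallel, but each `Upload(file, mayComplete)` is only allowed to finish after its predecessor has finished, which is what keeps completion order stable. Below is a minimal standalone C# sketch of that pattern under simplified assumptions; `CompletionOrderingDemo` and `FakeUpload` are hypothetical names for illustration only (the patch itself uses `LockExtensions.LockedTask` and `Manager.DoFileUpload`), not code from this repository.

    // Illustrative sketch: completion-ordered, rate-limited "uploads".
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public static class CompletionOrderingDemo
    {
        private const int MaxUploads = 4; // mirrors Manager.MaxUploads in the patch

        public static async Task Main()
        {
            var previousDone = Task.CompletedTask;
            var gate = new object();
            using (var rateLimit = new SemaphoreSlim(MaxUploads)) {
                var all = new Task[10];
                for (var i = 0; i < all.Length; i++) {
                    Task thisDone;
                    lock (gate) {
                        // Each upload may only *complete* after the previous one completed,
                        // while up to MaxUploads of them transfer concurrently.
                        thisDone = FakeUpload(i, mayComplete: previousDone, rateLimit);
                        previousDone = thisDone;
                    }
                    all[i] = thisDone;
                }
                await Task.WhenAll(all);
            }
        }

        private static async Task FakeUpload(int id, Task mayComplete, SemaphoreSlim rateLimit)
        {
            await rateLimit.WaitAsync();          // bound concurrency, like rateLimitUploading
            try {
                await Task.Delay(new Random(id).Next(50, 150)); // simulated transfer
                await mayComplete;                // hold completion until the predecessor is done
                Console.WriteLine($"upload {id} completed");
            }
            finally {
                rateLimit.Release();
            }
        }
    }

The chain never deadlocks because its head awaits an already-completed task, so the semaphore slot held by the oldest in-flight upload is always released first.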