Skip to content

Commit 0a5a33b

Browse files
author
Martijn Hoekstra
committed
restructure manager
three queues, one for parsing, one for (bulk) duplicate checks, one for uploads, each running at its own pace
1 parent d563418 commit 0a5a33b

File tree

7 files changed

+497
-321
lines changed

7 files changed

+497
-321
lines changed

Hotsapi.Uploader.Common/Analyzer.cs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,17 @@ public class Analyzer : IAnalyzer
1616
private static Logger _log = LogManager.GetCurrentClassLogger();
1717

1818
/// <summary>
19-
/// Analyze replay locally before uploading
19+
/// Analyze replay locally before uploading.
20+
///
21+
/// Sets file status as a side-effect.
2022
/// </summary>
2123
/// <param name="file">Replay file</param>
2224
public Replay Analyze(ReplayFile file)
2325
{
2426
try {
25-
var result = DataParser.ParseReplay(file.Filename, false, false, false, true);
26-
var replay = result.Item2;
27-
var parseResult = result.Item1;
28-
var status = GetPreStatus(replay, parseResult);
29-
30-
if (status != null) {
31-
file.UploadStatus = status.Value;
32-
}
33-
27+
file.UploadStatus = UploadStatus.Preprocessing;
28+
var (parseResult, replay) = DataParser.ParseReplay(file.Filename, false, false, false, true);
29+
file.UploadStatus = GetPreStatus(replay, parseResult) ?? file.UploadStatus;
3430
if (parseResult != DataParser.ReplayParseResult.Success) {
3531
return null;
3632
}
@@ -44,7 +40,7 @@ public Replay Analyze(ReplayFile file)
4440
}
4541
}
4642

47-
public UploadStatus? GetPreStatus(Replay replay, DataParser.ReplayParseResult parseResult)
43+
private UploadStatus? GetPreStatus(Replay replay, DataParser.ReplayParseResult parseResult)
4844
{
4945
switch (parseResult) {
5046
case DataParser.ReplayParseResult.ComputerPlayerFound:
@@ -55,22 +51,15 @@ public Replay Analyze(ReplayFile file)
5551
return UploadStatus.PtrRegion;
5652

5753
case DataParser.ReplayParseResult.PreAlphaWipe:
58-
return UploadStatus.TooOld;
59-
}
60-
61-
if (parseResult != DataParser.ReplayParseResult.Success) {
62-
return null;
63-
}
64-
65-
if (replay.GameMode == GameMode.Custom) {
66-
return UploadStatus.CustomGame;
67-
}
68-
69-
if (replay.ReplayBuild < MinimumBuild) {
70-
return UploadStatus.TooOld;
54+
return UploadStatus.TooOld;
55+
case DataParser.ReplayParseResult.Incomplete:
56+
return UploadStatus.Incomplete;
7157
}
7258

73-
return null;
59+
return parseResult != DataParser.ReplayParseResult.Success ? null
60+
: replay.GameMode == GameMode.Custom ? (UploadStatus?)UploadStatus.CustomGame
61+
: replay.ReplayBuild < MinimumBuild ? (UploadStatus?)UploadStatus.TooOld
62+
: (UploadStatus?)UploadStatus.Preprocessed;
7463
}
7564

7665
/// <summary>
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace Hotsapi.Uploader.Common
7+
{
8+
/// <summary>
9+
/// A thread-safe, concurrent buffer that has asynchronous operations for enqueuing and bulk dequeueing.
10+
/// </summary>
11+
public class ConcurrentAyncBuffer<A>
12+
{
13+
private Queue<A> Items { get; } = new Queue<A>();
14+
private Queue<TaskCompletionSource<IEnumerable<A>>> BulkRequests { get; } = new Queue<TaskCompletionSource<IEnumerable<A>>>();
15+
private SemaphoreSlim Mutex { get; } = new SemaphoreSlim(1);
16+
/// <summary>
17+
/// Enqueue a single item
18+
/// </summary>
19+
/// <param name="item">the item to enqueue</param>
20+
/// <returns>A task that will be completed when the item is enqueued</returns>
21+
public Task EnqueueAsync(A item) => EnqueueAsync(item, CancellationToken.None);
22+
/// <summary>
23+
/// Enqueue a single item
24+
/// </summary>
25+
/// <param name="item">the item to enqueue</param>
26+
/// <param name="token">a cancellation token to cancel trying to enqueue the item</param>
27+
/// <returns>A task that will be completed when the item is enqueued</returns>
28+
public Task EnqueueAsync(A item, CancellationToken token) => EnqueueManyAsync(new List<A>() { item }, token);
29+
/// <summary>
30+
/// Enqueue all items
31+
/// </summary>
32+
/// <param name="items">the items to enqueue</param>
33+
/// <param name="token">a cancellation token to cancel trying to enqueue items</param>
34+
/// <returns>A task that will be completed when the items are enqueued</returns>
35+
public Task EnqueueManyAsync(IEnumerable<A> items, CancellationToken token) => Mutex.Locked(() => {
36+
var succeeded = false;
37+
while (!succeeded) {
38+
if (BulkRequests.Any()) {
39+
succeeded = BulkRequests.Dequeue().TrySetResult(items);
40+
} else {
41+
foreach (var item in items) {
42+
Items.Enqueue(item);
43+
}
44+
succeeded = true;
45+
}
46+
}
47+
}, token);
48+
49+
/// <summary>
50+
/// Enqueue all items
51+
/// </summary>
52+
/// <param name="items">the items to enqueue</param>
53+
/// <returns>a task that will be completed when the items are enqueued.</returns>
54+
public Task EnqueueManyAsync(IEnumerable<A> items) => EnqueueManyAsync(items, CancellationToken.None);
55+
56+
/// <summary>
57+
/// Dequeue all A's as soon as they're available.
58+
/// </summary>
59+
/// <param name="token">A cancellationtoken that will cancel fetching data</param>
60+
/// <returns>A task that will complete with all A's available</returns>
61+
public async Task<IEnumerable<A>> DequeueAsync(CancellationToken token)
62+
{
63+
var locked = 0;
64+
try {
65+
await Mutex.WaitAsync(token);
66+
locked = 1;
67+
Task<IEnumerable<A>> resultTask;
68+
if (Items.Any()) {
69+
var result = new List<A>();
70+
while (Items.Any()) {
71+
result.Add(Items.Dequeue());
72+
}
73+
resultTask = Task.FromResult<IEnumerable<A>>(result);
74+
} else {
75+
var completion = new TaskCompletionSource<IEnumerable<A>>();
76+
BulkRequests.Enqueue(completion);
77+
_ = token.Register(() => completion.TrySetCanceled());
78+
resultTask = completion.Task;
79+
}
80+
if (locked > 0) {
81+
_ = Mutex.Release(locked);
82+
}
83+
locked = 0;
84+
return await resultTask;
85+
}
86+
finally {
87+
if (locked > 0) {
88+
_ = Mutex.Release(locked);
89+
}
90+
}
91+
}
92+
/// <summary>
93+
/// Dequeue all A's as soon as any are available
94+
/// </summary>
95+
/// <returns>A task that will complete with all A's available</returns>
96+
public Task<IEnumerable<A>> DequeueAsync() => DequeueAsync(CancellationToken.None);
97+
}
98+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace Hotsapi.Uploader.Common
6+
{
7+
/// <summary>
8+
/// Provides extensions for using SemaphoreSlim in a safe way that will always release the semaphore
9+
/// </summary>
10+
public static class LockExtensions
11+
{
12+
public static async Task<A> Locked<A>(this SemaphoreSlim semaphore, Func<A> thunk, CancellationToken token)
13+
{
14+
var locked = 0;
15+
try {
16+
await semaphore.WaitAsync(token);
17+
locked = 1;
18+
return thunk();
19+
}
20+
finally {
21+
if (locked > 0) {
22+
_ = semaphore.Release(locked);
23+
}
24+
}
25+
}
26+
27+
public static async Task Locked(this SemaphoreSlim semaphore, Action thunk, CancellationToken token)
28+
{
29+
var locked = 0;
30+
try {
31+
await semaphore.WaitAsync(token);
32+
locked = 1;
33+
thunk();
34+
}
35+
finally {
36+
if (locked > 0) {
37+
_ = semaphore.Release(locked);
38+
}
39+
}
40+
}
41+
42+
public static async Task LockedTask(this SemaphoreSlim semaphore, Func<Task> thunk, CancellationToken token)
43+
{
44+
var locked = 0;
45+
try {
46+
await semaphore.WaitAsync(token);
47+
locked = 1;
48+
await thunk();
49+
}
50+
finally {
51+
if (locked > 0) {
52+
_ = semaphore.Release(locked);
53+
}
54+
}
55+
}
56+
57+
public static async Task<A> LockedTask<A>(this SemaphoreSlim semaphore, Func<Task<A>> thunk, CancellationToken token)
58+
{
59+
var locked = 0;
60+
try {
61+
await semaphore.WaitAsync(token);
62+
locked = 1;
63+
return await thunk();
64+
}
65+
finally {
66+
if (locked > 0) {
67+
_ = semaphore.Release(locked);
68+
}
69+
}
70+
}
71+
72+
public static Task<A> Locked<A>(this SemaphoreSlim semaphore, Func<A> thunk) => Locked(semaphore, thunk, CancellationToken.None);
73+
public static Task Locked(this SemaphoreSlim semaphore, Action thunk) => Locked(semaphore, thunk, CancellationToken.None);
74+
public static Task<A> LockedTask<A>(this SemaphoreSlim semaphore, Func<Task<A>> thunk) => LockedTask(semaphore, thunk, CancellationToken.None);
75+
public static Task LockedTask(this SemaphoreSlim semaphore, Func<Task> thunk) => LockedTask(semaphore, thunk, CancellationToken.None);
76+
}
77+
}

0 commit comments

Comments
 (0)