Skip to content

Commit d33d623

Browse files
committed
CSHARP-3458: Extend IAsyncCursor and IAsyncCursorSource to support IAsyncEnumerable
1 parent e62da2b commit d33d623

File tree

7 files changed

+137
-4
lines changed

7 files changed

+137
-4
lines changed

src/MongoDB.Driver/Core/IAsyncCursor.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,17 @@ public static class IAsyncCursorExtensions
360360
return new AsyncCursorEnumerableOneTimeAdapter<TDocument>(cursor, cancellationToken);
361361
}
362362

363+
/// <summary>
364+
/// Wraps a cursor in an IAsyncEnumerable that can be enumerated one time.
365+
/// </summary>
366+
/// <typeparam name="TDocument">The type of the document.</typeparam>
367+
/// <param name="cursor">The cursor.</param>
368+
/// <returns>An IAsyncEnumerable.</returns>
369+
public static IAsyncEnumerable<TDocument> ToAsyncEnumerable<TDocument>(this IAsyncCursor<TDocument> cursor)
370+
{
371+
return new AsyncCursorEnumerableOneTimeAdapter<TDocument>(cursor);
372+
}
373+
363374
/// <summary>
364375
/// Returns a list containing all the documents returned by a cursor.
365376
/// </summary>

src/MongoDB.Driver/Core/IAsyncCursorSource.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,17 @@ public static class IAsyncCursorSourceExtensions
336336
return new AsyncCursorSourceEnumerableAdapter<TDocument>(source, cancellationToken);
337337
}
338338

339+
/// <summary>
340+
/// Wraps a cursor source in an IAsyncEnumerable. Each time GetAsyncEnumerator is called a new cursor is fetched from the cursor source.
341+
/// </summary>
342+
/// <typeparam name="TDocument">The type of the document.</typeparam>
343+
/// <param name="source">The source.</param>
344+
/// <returns>An IAsyncEnumerable.</returns>
345+
public static IAsyncEnumerable<TDocument> ToAsyncEnumerable<TDocument>(this IAsyncCursorSource<TDocument> source)
346+
{
347+
return new AsyncCursorSourceEnumerableAdapter<TDocument>(source);
348+
}
349+
339350
/// <summary>
340351
/// Returns a list containing all the documents returned by the cursor returned by a cursor source.
341352
/// </summary>

src/MongoDB.Driver/Core/Operations/AsyncCursorEnumerableOneTimeAdapter.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,33 @@
2121

2222
namespace MongoDB.Driver.Core.Operations
2323
{
24-
internal sealed class AsyncCursorEnumerableOneTimeAdapter<TDocument> : IEnumerable<TDocument>
24+
internal sealed class AsyncCursorEnumerableOneTimeAdapter<TDocument> : IEnumerable<TDocument>, IAsyncEnumerable<TDocument>
2525
{
2626
private readonly CancellationToken _cancellationToken;
2727
private readonly IAsyncCursor<TDocument> _cursor;
2828
private bool _hasBeenEnumerated;
2929

30+
public AsyncCursorEnumerableOneTimeAdapter(IAsyncCursor<TDocument> cursor)
31+
{
32+
_cursor = Ensure.IsNotNull(cursor, nameof(cursor));
33+
}
34+
3035
public AsyncCursorEnumerableOneTimeAdapter(IAsyncCursor<TDocument> cursor, CancellationToken cancellationToken)
3136
{
3237
_cursor = Ensure.IsNotNull(cursor, nameof(cursor));
3338
_cancellationToken = cancellationToken;
3439
}
3540

41+
public IAsyncEnumerator<TDocument> GetAsyncEnumerator(CancellationToken cancellationToken = default)
42+
{
43+
if (_hasBeenEnumerated)
44+
{
45+
throw new InvalidOperationException("An IAsyncCursor can only be enumerated once.");
46+
}
47+
_hasBeenEnumerated = true;
48+
return new AsyncCursorEnumerator<TDocument>(_cursor, cancellationToken);
49+
}
50+
3651
public IEnumerator<TDocument> GetEnumerator()
3752
{
3853
if (_hasBeenEnumerated)

src/MongoDB.Driver/Core/Operations/AsyncCursorEnumerator.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
using System.Collections;
1818
using System.Collections.Generic;
1919
using System.Threading;
20+
using System.Threading.Tasks;
2021
using MongoDB.Driver.Core.Misc;
2122

2223
namespace MongoDB.Driver.Core.Operations
2324
{
24-
internal class AsyncCursorEnumerator<TDocument> : IEnumerator<TDocument>
25+
internal class AsyncCursorEnumerator<TDocument> : IEnumerator<TDocument>, IAsyncEnumerator<TDocument>
2526
{
2627
// private fields
2728
private IEnumerator<TDocument> _batchEnumerator;
@@ -72,6 +73,12 @@ public void Dispose()
7273
}
7374
}
7475

76+
public ValueTask DisposeAsync()
77+
{
78+
Dispose();
79+
return new ValueTask(Task.CompletedTask);
80+
}
81+
7582
public bool MoveNext()
7683
{
7784
ThrowIfDisposed();
@@ -102,6 +109,28 @@ public bool MoveNext()
102109
}
103110
}
104111

112+
public async ValueTask<bool> MoveNextAsync()
113+
{
114+
ThrowIfDisposed();
115+
_started = true;
116+
117+
if (_batchEnumerator != null && _batchEnumerator.MoveNext())
118+
{
119+
return true;
120+
}
121+
122+
while (await _cursor.MoveNextAsync(_cancellationToken).ConfigureAwait(false))
123+
{
124+
_batchEnumerator?.Dispose();
125+
_batchEnumerator = _cursor.Current.GetEnumerator();
126+
return _batchEnumerator.MoveNext();
127+
}
128+
129+
_batchEnumerator = null;
130+
_finished = true;
131+
return false;
132+
}
133+
105134
public void Reset()
106135
{
107136
ThrowIfDisposed();

src/MongoDB.Driver/Core/Operations/AsyncCursorSourceEnumerableAdapter.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,37 @@
1313
* limitations under the License.
1414
*/
1515

16-
using System;
1716
using System.Collections;
1817
using System.Collections.Generic;
1918
using System.Threading;
2019
using MongoDB.Driver.Core.Misc;
2120

2221
namespace MongoDB.Driver.Core.Operations
2322
{
24-
internal class AsyncCursorSourceEnumerableAdapter<TDocument> : IEnumerable<TDocument>
23+
internal class AsyncCursorSourceEnumerableAdapter<TDocument> : IEnumerable<TDocument>, IAsyncEnumerable<TDocument>
2524
{
2625
// private fields
2726
private readonly CancellationToken _cancellationToken;
2827
private readonly IAsyncCursorSource<TDocument> _source;
2928

3029
// constructors
30+
public AsyncCursorSourceEnumerableAdapter(IAsyncCursorSource<TDocument> source)
31+
{
32+
_source = Ensure.IsNotNull(source, nameof(source));
33+
}
34+
3135
public AsyncCursorSourceEnumerableAdapter(IAsyncCursorSource<TDocument> source, CancellationToken cancellationToken)
3236
{
3337
_source = Ensure.IsNotNull(source, nameof(source));
3438
_cancellationToken = cancellationToken;
3539
}
3640

41+
public IAsyncEnumerator<TDocument> GetAsyncEnumerator(CancellationToken cancellationToken = default)
42+
{
43+
var cursor = _source.ToCursor(cancellationToken);
44+
return new AsyncCursorEnumerator<TDocument>(cursor, cancellationToken);
45+
}
46+
3747
// public methods
3848
public IEnumerator<TDocument> GetEnumerator()
3949
{

tests/MongoDB.Driver.Tests/Core/IAsyncCursorExtensionsTests.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Collections.Generic;
1818
using System.Linq;
19+
using System.Threading.Tasks;
1920
using FluentAssertions;
2021
using MongoDB.Bson;
2122
using MongoDB.Bson.Serialization.Serializers;
@@ -201,6 +202,37 @@ public void SingleOrDefault_should_throw_when_cursor_has_wrong_number_of_documen
201202
action.ShouldThrow<InvalidOperationException>();
202203
}
203204

205+
[Fact]
206+
public void ToAsyncEnumerable_result_should_only_be_enumerable_one_time()
207+
{
208+
var cursor = CreateCursor(2);
209+
var enumerable = cursor.ToAsyncEnumerable();
210+
enumerable.GetAsyncEnumerator();
211+
212+
Action action = () => enumerable.GetAsyncEnumerator();
213+
214+
action.ShouldThrow<InvalidOperationException>();
215+
}
216+
217+
[Fact]
218+
public async Task ToAsyncEnumerable_should_return_expected_result()
219+
{
220+
var cursor = CreateCursor(2);
221+
var expectedDocuments = new[]
222+
{
223+
new BsonDocument("_id", 0),
224+
new BsonDocument("_id", 1)
225+
};
226+
227+
var result = new List<BsonDocument>();
228+
await foreach (var doc in cursor.ToAsyncEnumerable())
229+
{
230+
result.Add(doc);
231+
}
232+
233+
result.Should().Equal(expectedDocuments);
234+
}
235+
204236
[Fact]
205237
public void ToEnumerable_result_should_only_be_enumerable_one_time()
206238
{

tests/MongoDB.Driver.Tests/Core/IAsyncCursorSourceExtensionsTests.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,31 @@ public void SingleOrDefault_should_throw_when_cursor_has_wrong_number_of_documen
203203
action.ShouldThrow<InvalidOperationException>();
204204
}
205205

206+
[Theory]
207+
[ParameterAttributeData]
208+
public async Task ToAsyncEnumerable_result_should_be_enumerable_multiple_times(
209+
[Values(1, 2)] int times)
210+
{
211+
var source = CreateCursorSource(2);
212+
var expectedDocuments = new[]
213+
{
214+
new BsonDocument("_id", 0),
215+
new BsonDocument("_id", 1)
216+
};
217+
218+
var result = new List<BsonDocument>();
219+
for (var i = 0; i < times; i++)
220+
{
221+
await foreach (var doc in source.ToAsyncEnumerable())
222+
{
223+
result.Add(doc);
224+
}
225+
226+
result.Should().Equal(expectedDocuments);
227+
result.Clear();
228+
}
229+
}
230+
206231
[Theory]
207232
[ParameterAttributeData]
208233
public void ToEnumerable_result_should_be_enumerable_multiple_times(

0 commit comments

Comments
 (0)