Skip to content

Commit fa14989

Browse files
Fixed bug ADO.NET/PoolManager: SemaphoreSlim.WaitAsync over-release on cancellation. (#524)
1 parent 59ac3f4 commit fa14989

File tree

6 files changed

+29
-18
lines changed

6 files changed

+29
-18
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation.
12
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.
23
- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods.
34
- **Breaking Change**: moved and renamed `Ydb.Sdk.Services.Query.TxMode` -> `Ydb.Sdk.Ado.TransactionMode`.
45
- Feat ADO.NET: Cache gRPC transport by `gRPCConnectionString` to reuse channels.
5-
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
6+
- Fixed bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
67
- Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`.
78
- Feat ADO.NET: Deleted support for `DateTimeOffset` was a mistake.
89
- Feat ADO.NET: Added support for `Date32`, `Datetime64`, `Timestamp64` and `Interval64` types in YDB.
@@ -28,7 +29,7 @@
2829
- Canceling AttachStream after calling the `DeleteSession` method.
2930
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
3031
- Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status.
31-
- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm.
32+
- Feat ADO.NET: PoolingSessionSource 2.0 based on lock-free FIFO pooling algorithm.
3233
- Added new ADO.NET options:
3334
- `MinSessionPool`: The minimum connection pool size.
3435
- `SessionIdleTimeout`: The time (in seconds) to wait before closing idle session in the pool if the count of all sessions exceeds `MinSessionPool`.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ CancellationToken cancellationToken
2020
return await sessionPool.OpenSession(cancellationToken);
2121
}
2222

23+
await SemaphoreSlim.WaitAsync(cancellationToken);
24+
2325
try
2426
{
25-
await SemaphoreSlim.WaitAsync(cancellationToken);
26-
2727
if (Pools.TryGetValue(settings.ConnectionString, out var pool))
2828
{
2929
return await pool.OpenSession(cancellationToken);
@@ -52,10 +52,9 @@ internal static async Task ClearPool(string connectionString)
5252
{
5353
if (Pools.TryRemove(connectionString, out var sessionPool))
5454
{
55+
await SemaphoreSlim.WaitAsync();
5556
try
5657
{
57-
await SemaphoreSlim.WaitAsync();
58-
5958
await sessionPool.DisposeAsync();
6059
}
6160
finally

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,14 +210,14 @@ protected async ValueTask<CallOptions> GetCallOptions(GrpcRequestSettings settin
210210
}
211211

212212
public ILoggerFactory LoggerFactory { get; }
213-
public void RegisterOwner() => ++_ownerCount;
213+
public void RegisterOwner() => Interlocked.Increment(ref _ownerCount);
214214
public bool IsDisposed => Disposed == 1;
215215

216216
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
217217

218218
public async ValueTask DisposeAsync()
219219
{
220-
if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
220+
if (Interlocked.Decrement(ref _ownerCount) <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
221221
{
222222
await ChannelPool.DisposeAsync();
223223

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33

44
namespace Ydb.Sdk.Ado.Tests;
55

6-
[Collection("PoolManagerTests")]
7-
[CollectionDefinition("PoolManagerTests", DisableParallelization = true)]
6+
[Collection("DisableParallelization")]
87
public class PoolManagerTests
98
{
109
[Theory]
@@ -25,6 +24,17 @@ public class PoolManagerTests
2524
"MinSessionSize=1;ConnectTimeout=8", "MinSessionSize=1;ConnectTimeout=9"
2625
}, 5, 5)] // 5 transport, 5 five pools
2726
[InlineData(new[] { "MinSessionSize=1" }, 1, 1)] // simple case
27+
[InlineData(new[]
28+
{
29+
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
30+
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
31+
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
32+
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
33+
"MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2",
34+
"MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2",
35+
"MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=3",
36+
"MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3"
37+
}, 1, 3)] // duplicate rows — we expect 1 transport, 3 pools, stress test
2838
public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int expectedDrivers, int expectedPools)
2939
{
3040
await YdbConnection.ClearAllPools();
@@ -35,23 +45,19 @@ public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int
3545
var connections = connectionStrings
3646
.Select(connectionString => new YdbConnection(connectionString))
3747
.ToImmutableArray();
38-
var parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
39-
await Task.WhenAll(parallelTasks);
48+
await Task.WhenAll(connections.Select(connection => connection.OpenAsync()));
4049

4150
Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
4251
Assert.Equal(expectedPools, PoolManager.Pools.Count);
4352

4453
await ClearAllConnections(connections);
45-
46-
parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
47-
await Task.WhenAll(parallelTasks);
54+
await Task.WhenAll(connections.Select(connection => connection.OpenAsync()));
4855

4956
foreach (var (_, driver) in PoolManager.Drivers)
5057
Assert.False(driver.IsDisposed);
5158

5259
Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
5360
Assert.Equal(expectedPools, PoolManager.Pools.Count);
54-
5561
await ClearAllConnections(connections);
5662
}
5763

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
using Xunit;
2+
3+
namespace Ydb.Sdk.Ado.Tests.Utils;
4+
5+
[CollectionDefinition("DisableParallelization", DisableParallelization = true)]
6+
public class DisableParallelization;

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33

44
namespace Ydb.Sdk.Ado.Tests;
55

6-
[Collection("YdbSchemaTests isolation test")]
7-
[CollectionDefinition("YdbSchemaTests isolation test", DisableParallelization = true)]
6+
[Collection("DisableParallelization")]
87
public class YdbSchemaTests : TestBase
98
{
109
private readonly string _table1;

0 commit comments

Comments
 (0)