diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index b5d5e014..606c74cf 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,8 +1,9 @@ +- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation. - Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation. - Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods. - **Breaking Change**: moved and renamed `Ydb.Sdk.Services.Query.TxMode` -> `Ydb.Sdk.Ado.TransactionMode`. - Feat ADO.NET: Cache gRPC transport by `gRPCConnectionString` to reuse channels. -- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`. +- Fixed bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`. - Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`. - Feat ADO.NET: Deleted support for `DateTimeOffset` was a mistake. - Feat ADO.NET: Added support for `Date32`, `Datetime64`, `Timestamp64` and `Interval64` types in YDB. @@ -28,7 +29,7 @@ - Canceling AttachStream after calling the `DeleteSession` method. - Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`). - Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status. -- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm. +- Feat ADO.NET: PoolingSessionSource 2.0 based on lock-free FIFO pooling algorithm. - Added new ADO.NET options: - `MinSessionPool`: The minimum connection pool size. - `SessionIdleTimeout`: The time (in seconds) to wait before closing idle session in the pool if the count of all sessions exceeds `MinSessionPool`. diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index feb13ee5..ce4d4d29 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -20,10 +20,10 @@ CancellationToken cancellationToken return await sessionPool.OpenSession(cancellationToken); } + await SemaphoreSlim.WaitAsync(cancellationToken); + try { - await SemaphoreSlim.WaitAsync(cancellationToken); - if (Pools.TryGetValue(settings.ConnectionString, out var pool)) { return await pool.OpenSession(cancellationToken); @@ -52,10 +52,9 @@ internal static async Task ClearPool(string connectionString) { if (Pools.TryRemove(connectionString, out var sessionPool)) { + await SemaphoreSlim.WaitAsync(); try { - await SemaphoreSlim.WaitAsync(); - await sessionPool.DisposeAsync(); } finally diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index c0a8c2ff..b5edea8c 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -210,14 +210,14 @@ protected async ValueTask GetCallOptions(GrpcRequestSettings settin } public ILoggerFactory LoggerFactory { get; } - public void RegisterOwner() => ++_ownerCount; + public void RegisterOwner() => Interlocked.Increment(ref _ownerCount); public bool IsDisposed => Disposed == 1; public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() { - if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0) + if (Interlocked.Decrement(ref _ownerCount) <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0) { await ChannelPool.DisposeAsync(); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs index c3c5c6a7..840ec21a 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs @@ -3,8 +3,7 @@ namespace Ydb.Sdk.Ado.Tests; -[Collection("PoolManagerTests")] -[CollectionDefinition("PoolManagerTests", DisableParallelization = true)] +[Collection("DisableParallelization")] public class PoolManagerTests { [Theory] @@ -25,6 +24,17 @@ public class PoolManagerTests "MinSessionSize=1;ConnectTimeout=8", "MinSessionSize=1;ConnectTimeout=9" }, 5, 5)] // 5 transport, 5 five pools [InlineData(new[] { "MinSessionSize=1" }, 1, 1)] // simple case + [InlineData(new[] + { + "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", + "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", + "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", + "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", + "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", + "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", + "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=3", + "MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3" + }, 1, 3)] // duplicate rows — we expect 1 transport, 3 pools, stress test public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int expectedDrivers, int expectedPools) { await YdbConnection.ClearAllPools(); @@ -35,23 +45,19 @@ public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int var connections = connectionStrings .Select(connectionString => new YdbConnection(connectionString)) .ToImmutableArray(); - var parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList(); - await Task.WhenAll(parallelTasks); + await Task.WhenAll(connections.Select(connection => connection.OpenAsync())); Assert.Equal(expectedDrivers, PoolManager.Drivers.Count); Assert.Equal(expectedPools, PoolManager.Pools.Count); await ClearAllConnections(connections); - - parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList(); - await Task.WhenAll(parallelTasks); + await Task.WhenAll(connections.Select(connection => connection.OpenAsync())); foreach (var (_, driver) in PoolManager.Drivers) Assert.False(driver.IsDisposed); Assert.Equal(expectedDrivers, PoolManager.Drivers.Count); Assert.Equal(expectedPools, PoolManager.Pools.Count); - await ClearAllConnections(connections); } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Utils/DisableParallelization.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Utils/DisableParallelization.cs new file mode 100644 index 00000000..358ca5de --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Utils/DisableParallelization.cs @@ -0,0 +1,6 @@ +using Xunit; + +namespace Ydb.Sdk.Ado.Tests.Utils; + +[CollectionDefinition("DisableParallelization", DisableParallelization = true)] +public class DisableParallelization; diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs index 17b69d25..fb46acb8 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs @@ -3,8 +3,7 @@ namespace Ydb.Sdk.Ado.Tests; -[Collection("YdbSchemaTests isolation test")] -[CollectionDefinition("YdbSchemaTests isolation test", DisableParallelization = true)] +[Collection("DisableParallelization")] public class YdbSchemaTests : TestBase { private readonly string _table1;