Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation.
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.
- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods.
- **Breaking Change**: moved and renamed `Ydb.Sdk.Services.Query.TxMode` -> `Ydb.Sdk.Ado.TransactionMode`.
- Feat ADO.NET: Cache gRPC transport by `gRPCConnectionString` to reuse channels.
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
- Fixed bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
- Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`.
- Feat ADO.NET: Deleted support for `DateTimeOffset` was a mistake.
- Feat ADO.NET: Added support for `Date32`, `Datetime64`, `Timestamp64` and `Interval64` types in YDB.
Expand All @@ -28,7 +29,7 @@
- Canceling AttachStream after calling the `DeleteSession` method.
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
- Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status.
- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm.
- Feat ADO.NET: PoolingSessionSource 2.0 based on lock-free FIFO pooling algorithm.
- Added new ADO.NET options:
- `MinSessionPool`: The minimum connection pool size.
- `SessionIdleTimeout`: The time (in seconds) to wait before closing idle session in the pool if the count of all sessions exceeds `MinSessionPool`.
Expand Down
7 changes: 3 additions & 4 deletions src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ CancellationToken cancellationToken
return await sessionPool.OpenSession(cancellationToken);
}

await SemaphoreSlim.WaitAsync(cancellationToken);

try
{
await SemaphoreSlim.WaitAsync(cancellationToken);

if (Pools.TryGetValue(settings.ConnectionString, out var pool))
{
return await pool.OpenSession(cancellationToken);
Expand Down Expand Up @@ -52,10 +52,9 @@ internal static async Task ClearPool(string connectionString)
{
if (Pools.TryRemove(connectionString, out var sessionPool))
{
await SemaphoreSlim.WaitAsync();
try
{
await SemaphoreSlim.WaitAsync();

await sessionPool.DisposeAsync();
}
finally
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,14 @@ protected async ValueTask<CallOptions> GetCallOptions(GrpcRequestSettings settin
}

public ILoggerFactory LoggerFactory { get; }
public void RegisterOwner() => ++_ownerCount;
public void RegisterOwner() => Interlocked.Increment(ref _ownerCount);
public bool IsDisposed => Disposed == 1;

public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
if (Interlocked.Decrement(ref _ownerCount) <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
{
await ChannelPool.DisposeAsync();

Expand Down
22 changes: 14 additions & 8 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

namespace Ydb.Sdk.Ado.Tests;

[Collection("PoolManagerTests")]
[CollectionDefinition("PoolManagerTests", DisableParallelization = true)]
[Collection("DisableParallelization")]
public class PoolManagerTests
{
[Theory]
Expand All @@ -25,6 +24,17 @@ public class PoolManagerTests
"MinSessionSize=1;ConnectTimeout=8", "MinSessionSize=1;ConnectTimeout=9"
}, 5, 5)] // 5 transport, 5 five pools
[InlineData(new[] { "MinSessionSize=1" }, 1, 1)] // simple case
[InlineData(new[]
{
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
"MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=1",
"MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2",
"MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2",
"MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=3",
"MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3", "MinSessionSize=3"
}, 1, 3)] // duplicate rows — we expect 1 transport, 3 pools, stress test
public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int expectedDrivers, int expectedPools)
{
await YdbConnection.ClearAllPools();
Expand All @@ -35,23 +45,19 @@ public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int
var connections = connectionStrings
.Select(connectionString => new YdbConnection(connectionString))
.ToImmutableArray();
var parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
await Task.WhenAll(parallelTasks);
await Task.WhenAll(connections.Select(connection => connection.OpenAsync()));

Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
Assert.Equal(expectedPools, PoolManager.Pools.Count);

await ClearAllConnections(connections);

parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
await Task.WhenAll(parallelTasks);
await Task.WhenAll(connections.Select(connection => connection.OpenAsync()));

foreach (var (_, driver) in PoolManager.Drivers)
Assert.False(driver.IsDisposed);

Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
Assert.Equal(expectedPools, PoolManager.Pools.Count);

await ClearAllConnections(connections);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
using Xunit;

namespace Ydb.Sdk.Ado.Tests.Utils;

[CollectionDefinition("DisableParallelization", DisableParallelization = true)]
public class DisableParallelization;
3 changes: 1 addition & 2 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

namespace Ydb.Sdk.Ado.Tests;

[Collection("YdbSchemaTests isolation test")]
[CollectionDefinition("YdbSchemaTests isolation test", DisableParallelization = true)]
[Collection("DisableParallelization")]
public class YdbSchemaTests : TestBase
{
private readonly string _table1;
Expand Down
Loading