Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.
- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods.
- **Breaking Change**: moved and renamed `Ydb.Sdk.Services.Query.TxMode` -> `Ydb.Sdk.Ado.TransactionMode`.
- Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse channels.
- Feat ADO.NET: Cache gRPC transport by `gRPCConnectionString` to reuse channels.
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
- Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`.
- Feat ADO.NET: Deleted support for `DateTimeOffset` was a mistake.
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ CancellationToken cancellationToken

internal static async Task ClearPool(string connectionString)
{
if (Pools.Remove(connectionString, out var sessionPool))
if (Pools.TryRemove(connectionString, out var sessionPool))
{
try
{
Expand Down
73 changes: 38 additions & 35 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,38 @@ public override async Task OpenAsync(CancellationToken cancellationToken)

public override async Task CloseAsync()
{
if (State == ConnectionState.Closed)
// ReSharper disable once SwitchStatementHandlesSomeKnownEnumValuesWithDefault
switch (State)
{
return;
}

try
{
if (LastReader is { IsClosed: false })
{
await LastReader.CloseAsync();
}

if (CurrentTransaction is { Completed: false })
{
await CurrentTransaction.RollbackAsync();
}

OnStateChange(OpenToClosedEventArgs);

ConnectionState = ConnectionState.Closed;
}
finally
{
_session.Close();
case ConnectionState.Closed:
return;
case ConnectionState.Broken:
ConnectionState = ConnectionState.Closed;
_session.Close();
return;
default:
try
{
if (LastReader is { IsClosed: false })
{
await LastReader.CloseAsync();
}

if (CurrentTransaction is { Completed: false })
{
await CurrentTransaction.RollbackAsync();
}

OnStateChange(OpenToClosedEventArgs);

ConnectionState = ConnectionState.Closed;
}
finally
{
_session.Close();
}

break;
}
}

Expand All @@ -145,19 +153,14 @@ public override string ConnectionString

public override string Database => _connectionStringBuilder?.Database ?? string.Empty;

public override ConnectionState State => ConnectionState;
public override ConnectionState State =>
ConnectionState != ConnectionState.Closed && _session.IsBroken // maybe is updated asynchronously
? ConnectionState.Broken
: ConnectionState;

private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed; // Invoke AsyncOpen()

internal void OnNotSuccessStatusCode(StatusCode code)
{
_session.OnNotSuccessStatusCode(code);

if (_session.IsBroken)
{
ConnectionState = ConnectionState.Broken;
}
}
internal void OnNotSuccessStatusCode(StatusCode code) => _session.OnNotSuccessStatusCode(code);

internal YdbDataReader? LastReader { get; set; }
internal string LastCommand { get; set; } = string.Empty;
Expand Down Expand Up @@ -203,15 +206,15 @@ public override Task<DataTable> GetSchemaAsync(

internal void ThrowIfConnectionClosed()
{
if (ConnectionState is ConnectionState.Closed or ConnectionState.Broken)
if (State is ConnectionState.Closed or ConnectionState.Broken)
{
throw new InvalidOperationException("Connection is closed");
}
}

private void ThrowIfConnectionOpen()
{
if (ConnectionState == ConnectionState.Open)
if (State == ConnectionState.Open)
{
throw new InvalidOperationException("Connection already open");
}
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,14 @@ protected async ValueTask<CallOptions> GetCallOptions(GrpcRequestSettings settin
}

public ILoggerFactory LoggerFactory { get; }
public void RegisterOwner() => _ownerCount++;
public void RegisterOwner() => ++_ownerCount;
public bool IsDisposed => Disposed == 1;

public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (Interlocked.Decrement(ref _ownerCount) <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
{
await ChannelPool.DisposeAsync();

Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class PoolManagerTests
public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int expectedDrivers, int expectedPools)
{
await YdbConnection.ClearAllPools();
foreach (var (_, driver) in PoolManager.Drivers)
Assert.True(driver.IsDisposed);
PoolManager.Drivers.Clear();

var connections = connectionStrings
Expand All @@ -45,9 +47,7 @@ public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int
await Task.WhenAll(parallelTasks);

foreach (var (_, driver) in PoolManager.Drivers)
{
Assert.False(driver.IsDisposed);
}

Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
Assert.Equal(expectedPools, PoolManager.Pools.Count);
Expand Down
Loading