diff --git a/slo/src/AdoNet/SloTableContext.cs b/slo/src/AdoNet/SloTableContext.cs index 43190480..0ef0e613 100644 --- a/slo/src/AdoNet/SloTableContext.cs +++ b/slo/src/AdoNet/SloTableContext.cs @@ -9,13 +9,9 @@ public class SloTableContext : SloTableContext { protected override string Job => "AdoNet"; - protected override YdbDataSource CreateClient(Config config) => new( - new YdbConnectionStringBuilder(config.ConnectionString) - { - LoggerFactory = ISloContext.Factory, - RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) - } - ); + protected override YdbDataSource CreateClient(Config config) => new YdbDataSourceBuilder( + new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory } + ) { RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) }.Build(); protected override async Task Create(YdbDataSource client, int operationTimeout) { @@ -96,34 +92,29 @@ await client.ExecuteAsync(async ydbConnection => return attempts; } - protected override async Task<(int, object?)> Select( + protected override async Task Select( YdbDataSource client, (Guid Guid, int Id) select, int readTimeout ) { - var attempts = 0; - var policyResult = await client.ExecuteAsync(async ydbConnection => + await using var ydbConnection = await client.OpenRetryableConnectionAsync(); + + var ydbCommand = new YdbCommand(ydbConnection) { - attempts++; - var ydbCommand = new YdbCommand(ydbConnection) + CommandText = $""" + SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp + FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id; + """, + CommandTimeout = readTimeout, + Parameters = { - CommandText = $""" - SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp - FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id; - """, - CommandTimeout = readTimeout, - Parameters = - { - new YdbParameter { ParameterName = "Guid", DbType = DbType.Guid, Value = select.Guid }, - new YdbParameter { ParameterName = "Id", DbType = DbType.Int32, Value = select.Id } - } - }; - - return await ydbCommand.ExecuteScalarAsync(); - }); + new YdbParameter { ParameterName = "Guid", DbType = DbType.Guid, Value = select.Guid }, + new YdbParameter { ParameterName = "Id", DbType = DbType.Int32, Value = select.Id } + } + }; - return (attempts, policyResult); + return await ydbCommand.ExecuteScalarAsync(); } protected override async Task SelectCount(YdbDataSource client) diff --git a/slo/src/Dapper/SloTableContext.cs b/slo/src/Dapper/SloTableContext.cs index 28122bdf..93350a59 100644 --- a/slo/src/Dapper/SloTableContext.cs +++ b/slo/src/Dapper/SloTableContext.cs @@ -9,13 +9,9 @@ public class SloTableContext : SloTableContext { protected override string Job => "Dapper"; - protected override YdbDataSource CreateClient(Config config) => new( - new YdbConnectionStringBuilder(config.ConnectionString) - { - LoggerFactory = ISloContext.Factory, - RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) - } - ); + protected override YdbDataSource CreateClient(Config config) => new YdbDataSourceBuilder( + new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory } + ) { RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) }.Build(); protected override async Task Create(YdbDataSource client, int operationTimeout) { @@ -28,45 +24,33 @@ await connection.ExecuteAsync($""" PayloadDouble Double, PayloadTimestamp Timestamp, PRIMARY KEY (Guid, Id) - ); + ); {SloTable.Options} """); } protected override async Task Save(YdbDataSource client, SloTable sloTable, int writeTimeout) { - var attempt = 0; - await client.ExecuteAsync(async ydbConnection => - { - attempt++; - await ydbConnection.ExecuteAsync( - $""" - UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp) - VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp) - """, sloTable); - } - ); - - return attempt; + await using var ydbConnection = await client.OpenRetryableConnectionAsync(); + await ydbConnection.ExecuteAsync( + $""" + UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp) + VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp) + """, sloTable); + + return 1; } - protected override async Task<(int, object?)> Select(YdbDataSource client, (Guid Guid, int Id) select, + protected override async Task Select(YdbDataSource client, (Guid Guid, int Id) select, int readTimeout) { - var attempts = 0; - var policyResult = await client.ExecuteAsync(async ydbConnection => - { - attempts++; - return await ydbConnection.QueryFirstOrDefaultAsync( - $""" - SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp - FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id; - """, - new { select.Guid, select.Id } - ); - }); - - return (attempts, policyResult); + await using var ydbConnection = await client.OpenRetryableConnectionAsync(); + return await ydbConnection.QueryFirstOrDefaultAsync( + $""" + SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp + FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id; + """, new { select.Guid, select.Id } + ); } protected override async Task SelectCount(YdbDataSource client) diff --git a/slo/src/EF/SloTableContext.cs b/slo/src/EF/SloTableContext.cs index 2e6cc204..651ef93c 100644 --- a/slo/src/EF/SloTableContext.cs +++ b/slo/src/EF/SloTableContext.cs @@ -74,15 +74,15 @@ await executeStrategy.ExecuteAsync(async () => return 0; } - protected override async Task<(int, object?)> Select( + protected override async Task Select( PooledDbContextFactory client, (Guid Guid, int Id) select, int readTimeout ) { await using var dbContext = await client.CreateDbContextAsync(); - return (0, await dbContext.SloEntities.FirstOrDefaultAsync(table => - table.Guid == select.Guid && table.Id == select.Id)); + return await dbContext.SloEntities.FirstOrDefaultAsync(table => + table.Guid == select.Guid && table.Id == select.Id); } protected override async Task SelectCount(PooledDbContextFactory client) diff --git a/slo/src/Internal/SloTableContext.cs b/slo/src/Internal/SloTableContext.cs index 8abcceab..5b9cce18 100644 --- a/slo/src/Internal/SloTableContext.cs +++ b/slo/src/Internal/SloTableContext.cs @@ -22,7 +22,7 @@ public abstract class SloTableContext : ISloContext { private const int IntervalMs = 100; - protected static readonly ILogger Logger = ISloContext.Factory.CreateLogger>(); + private static readonly ILogger Logger = ISloContext.Factory.CreateLogger>(); private volatile int _maxId; @@ -129,8 +129,7 @@ public async Task Run(RunConfig runConfig) Logger.LogInformation("Run task is finished"); return; - async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, - Func> action) + async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, Func action) { var metricFactory = Metrics.WithLabels(new Dictionary { @@ -182,11 +181,6 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, } ); - var retryAttempts = metricFactory.CreateGauge( - "sdk_retry_attempts", - "Current retry attempts, categorized by operation type." - ); - var pendingOperations = metricFactory.CreateGauge( "sdk_pending_operations", "Current number of pending operations, categorized by type." @@ -218,9 +212,8 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, var sw = Stopwatch.StartNew(); try { - var attempts = await action(client, runConfig); + await action(client, runConfig); sw.Stop(); - retryAttempts.Set(attempts); operationsTotal.Inc(); pendingOperations.Dec(); operationsSuccessTotal.Inc(); @@ -248,7 +241,7 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, // return attempt count & StatusCode operation protected abstract Task Save(T client, SloTable sloTable, int writeTimeout); - protected abstract Task<(int, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout); + protected abstract Task Select(T client, (Guid Guid, int Id) select, int readTimeout); protected abstract Task SelectCount(T client); @@ -272,12 +265,10 @@ private Task Save(T client, Config config) return Save(client, sloTable, config.WriteTimeout); } - private async Task Select(T client, RunConfig config) + private async Task Select(T client, RunConfig config) { var id = Random.Shared.Next(_maxId); - var (attempts, _) = await Select(client, new ValueTuple(GuidFromInt(id), id), config.ReadTimeout); - - return attempts; + _ = await Select(client, new ValueTuple(GuidFromInt(id), id), config.ReadTimeout); } private static Guid GuidFromInt(int value) diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 606c74cf..ec28d670 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,4 @@ +- Feat ADO.NET: `YdbDataSource.OpenRetryableConnectionAsync` opens a retryable connection with automatic retries for transient failures. - Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation. - Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation. - Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods. diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index ce4d4d29..3f14c285 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -10,14 +10,14 @@ internal static class PoolManager internal static readonly ConcurrentDictionary Drivers = new(); internal static readonly ConcurrentDictionary Pools = new(); - internal static async Task GetSession( + internal static async ValueTask Get( YdbConnectionStringBuilder settings, CancellationToken cancellationToken ) { if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool)) { - return await sessionPool.OpenSession(cancellationToken); + return sessionPool; } await SemaphoreSlim.WaitAsync(cancellationToken); @@ -26,7 +26,7 @@ CancellationToken cancellationToken { if (Pools.TryGetValue(settings.ConnectionString, out var pool)) { - return await pool.OpenSession(cancellationToken); + return pool; } var driver = Drivers.TryGetValue(settings.GrpcConnectionString, out var cacheDriver) && @@ -40,7 +40,7 @@ CancellationToken cancellationToken Pools[settings.ConnectionString] = newSessionPool; - return await newSessionPool.OpenSession(cancellationToken); + return newSessionPool; } finally { diff --git a/src/Ydb.Sdk/src/Ado/Session/ISession.cs b/src/Ydb.Sdk/src/Ado/Session/ISession.cs index a2237436..db59ef4b 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ISession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ISession.cs @@ -3,7 +3,7 @@ namespace Ydb.Sdk.Ado.Session; -internal interface ISession +internal interface ISession : IDisposable { IDriver Driver { get; } @@ -21,6 +21,4 @@ ValueTask> ExecuteQuery( Task RollbackTransaction(string txId, CancellationToken cancellationToken = default); void OnNotSuccessStatusCode(StatusCode code); - - void Close(); } diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs index 596f78b2..2a3c1e0e 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs @@ -47,10 +47,9 @@ public void OnNotSuccessStatusCode(StatusCode code) { } - public void Close() + public void Dispose() { } - private static YdbException NotSupportedTransaction => - new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions"); + private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions"); } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index 2849a61f..18e6c808 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -99,7 +99,9 @@ StatusCode.BadSession or StatusCode.SessionBusy or StatusCode.SessionExpired or StatusCode.ClientTransportTimeout or - StatusCode.ClientTransportUnavailable) + StatusCode.ClientTransportUnavailable or + StatusCode.ClientTransportResourceExhausted or + StatusCode.ClientTransportUnknown) { _logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, statusCode); diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index a9d4f99d..a0a70817 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -337,5 +337,5 @@ public abstract ValueTask> ExecuteQuery( public abstract void OnNotSuccessStatusCode(StatusCode code); - public void Close() => _source.Return((T)this); + public void Dispose() => _source.Return((T)this); } diff --git a/src/Ydb.Sdk/src/Ado/Session/RetryableSession.cs b/src/Ydb.Sdk/src/Ado/Session/RetryableSession.cs new file mode 100644 index 00000000..ee4d220d --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Session/RetryableSession.cs @@ -0,0 +1,121 @@ +using Ydb.Query; +using Ydb.Sdk.Ado.Internal; +using Ydb.Sdk.Ado.RetryPolicy; + +namespace Ydb.Sdk.Ado.Session; + +internal class RetryableSession : ISession +{ + private readonly ISessionSource _sessionSource; + private readonly YdbRetryPolicyExecutor _retryPolicyExecutor; + + internal RetryableSession(ISessionSource sessionSource, YdbRetryPolicyExecutor retryPolicyExecutor) + { + _sessionSource = sessionSource; + _retryPolicyExecutor = retryPolicyExecutor; + } + + public IDriver Driver => throw new NotImplementedException(); + public bool IsBroken => false; + + public ValueTask> ExecuteQuery( + string query, + Dictionary parameters, + GrpcRequestSettings settings, + TransactionControl? txControl + ) + { + if (txControl is not null && !txControl.CommitTx) + { + throw NotSupportedTransaction; + } + + return new ValueTask>( + new InMemoryServerStream(_sessionSource, _retryPolicyExecutor, query, parameters, settings)); + } + + public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => + throw NotSupportedTransaction; + + public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) => + throw NotSupportedTransaction; + + public void OnNotSuccessStatusCode(StatusCode code) + { + } + + public void Dispose() + { + } + + private static YdbException NotSupportedTransaction => new("Transactions are not supported in retryable session"); +} + +internal sealed class InMemoryServerStream : IServerStream +{ + private readonly ISessionSource _sessionSource; + private readonly YdbRetryPolicyExecutor _ydbRetryPolicyExecutor; + private readonly string _query; + private readonly Dictionary _parameters; + private readonly GrpcRequestSettings _settings; + + private List? _responses; + private int _index = -1; + + public InMemoryServerStream( + ISessionSource sessionSource, + YdbRetryPolicyExecutor retryPolicyExecutor, + string query, + Dictionary parameters, + GrpcRequestSettings settings) + { + _sessionSource = sessionSource; + _ydbRetryPolicyExecutor = retryPolicyExecutor; + _query = query; + _parameters = parameters; + _settings = settings; + } + + public async Task MoveNextAsync(CancellationToken cancellationToken = default) + { + _responses ??= await _ydbRetryPolicyExecutor.ExecuteAsync>(async ct => + { + using var session = await _sessionSource.OpenSession(ct); + + try + { + var responses = new List(); + var serverStream = await session.ExecuteQuery(_query, _parameters, _settings, null); + + while (await serverStream.MoveNextAsync(ct)) + { + var current = serverStream.Current; + + if (current.Status.IsNotSuccess()) + { + throw YdbException.FromServer(current.Status, current.Issues); + } + + responses.Add(serverStream.Current); + } + + return responses; + } + catch (YdbException e) + { + session.OnNotSuccessStatusCode(e.Code); + throw; + } + }, cancellationToken); + + return ++_index < _responses.Count; + } + + public ExecuteQueryResponsePart Current => _responses is not null && _index >= 0 && _index < _responses.Count + ? _responses[_index] + : throw new InvalidOperationException("Enumeration has not started or has already finished"); + + public void Dispose() + { + } +} diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index 83a2a80e..efd44cad 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -2,6 +2,7 @@ using System.Data.Common; using System.Diagnostics.CodeAnalysis; using Ydb.Sdk.Ado.BulkUpsert; +using Ydb.Sdk.Ado.RetryPolicy; using Ydb.Sdk.Ado.Session; using static System.Data.IsolationLevel; @@ -81,9 +82,7 @@ public YdbTransaction BeginTransaction(TransactionMode transactionMode = Transac return CurrentTransaction; } - public override void ChangeDatabase(string databaseName) - { - } + public override void ChangeDatabase(string databaseName) => throw new NotSupportedException(); public override void Close() => CloseAsync().GetAwaiter().GetResult(); @@ -93,13 +92,29 @@ public override async Task OpenAsync(CancellationToken cancellationToken) { ThrowIfConnectionOpen(); - Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken); + var sessionSource = await PoolManager.Get(ConnectionStringBuilder, cancellationToken); + + Session = await sessionSource.OpenSession(cancellationToken); OnStateChange(ClosedToOpenEventArgs); ConnectionState = ConnectionState.Open; } + internal async ValueTask OpenAsync( + YdbRetryPolicyExecutor retryPolicyExecutor, + CancellationToken cancellationToken = default + ) + { + ThrowIfConnectionOpen(); + + var sessionSource = await PoolManager.Get(ConnectionStringBuilder, cancellationToken); + + Session = new RetryableSession(sessionSource, retryPolicyExecutor); + + ConnectionState = ConnectionState.Open; + } + public override async Task CloseAsync() { // ReSharper disable once SwitchStatementHandlesSomeKnownEnumValuesWithDefault @@ -109,7 +124,7 @@ public override async Task CloseAsync() return; case ConnectionState.Broken: ConnectionState = ConnectionState.Closed; - _session.Close(); + _session.Dispose(); return; default: try @@ -130,7 +145,7 @@ public override async Task CloseAsync() } finally { - _session.Close(); + _session.Dispose(); } break; diff --git a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs index 4f990495..d5ce7f51 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs @@ -3,7 +3,6 @@ using System.Security.Cryptography.X509Certificates; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using Ydb.Sdk.Ado.RetryPolicy; using Ydb.Sdk.Auth; using Ydb.Sdk.Transport; @@ -321,10 +320,6 @@ public int CreateSessionTimeout public X509Certificate2Collection? ServerCertificates { get; init; } - public IRetryPolicy RetryPolicy { get; init; } = YdbRetryPolicy.Default; - - internal YdbRetryPolicyExecutor YdbRetryPolicyExecutor => new(RetryPolicy); - private void SaveValue(string propertyName, object? value) { if (value == null) diff --git a/src/Ydb.Sdk/src/Ado/YdbDataSource.cs b/src/Ydb.Sdk/src/Ado/YdbDataSource.cs index 69f0ff42..d37e94c3 100644 --- a/src/Ydb.Sdk/src/Ado/YdbDataSource.cs +++ b/src/Ydb.Sdk/src/Ado/YdbDataSource.cs @@ -22,22 +22,23 @@ private static YdbRetryPolicyExecutor GetExecutor(YdbRetryPolicyConfig config) = private readonly YdbConnectionStringBuilder _ydbConnectionStringBuilder; private readonly YdbRetryPolicyExecutor _retryPolicyExecutor; - public YdbDataSource(YdbConnectionStringBuilder connectionStringBuilder) + public YdbDataSource() : this(new YdbDataSourceBuilder()) { - _ydbConnectionStringBuilder = connectionStringBuilder; - _retryPolicyExecutor = _ydbConnectionStringBuilder.YdbRetryPolicyExecutor; } - public YdbDataSource(string connectionString) + public YdbDataSource(string connectionString) : this(new YdbDataSourceBuilder(connectionString)) { - _ydbConnectionStringBuilder = new YdbConnectionStringBuilder(connectionString); - _retryPolicyExecutor = _ydbConnectionStringBuilder.YdbRetryPolicyExecutor; } - public YdbDataSource() + public YdbDataSource(YdbConnectionStringBuilder connectionStringBuilder) + : this(new YdbDataSourceBuilder(connectionStringBuilder)) { - _ydbConnectionStringBuilder = new YdbConnectionStringBuilder(); - _retryPolicyExecutor = _ydbConnectionStringBuilder.YdbRetryPolicyExecutor; + } + + internal YdbDataSource(YdbDataSourceBuilder builder) + { + _ydbConnectionStringBuilder = builder.ConnectionStringBuilder; + _retryPolicyExecutor = new YdbRetryPolicyExecutor(builder.RetryPolicy); } protected @@ -383,4 +384,58 @@ public Task ExecuteInTransactionAsync( await transaction.CommitAsync(ct); return result; }, cancellationToken); + + public async ValueTask OpenRetryableConnectionAsync(CancellationToken cancellationToken = default) + { + var ydbConnection = CreateDbConnection(); + try + { + await ydbConnection.OpenAsync(_retryPolicyExecutor, cancellationToken); + + return ydbConnection; + } + catch + { + await ydbConnection.CloseAsync(); + throw; + } + } + + public async ValueTask OpenRetryableConnectionAsync( + YdbRetryPolicyConfig ydbRetryPolicyConfig, + CancellationToken cancellationToken = default + ) + { + var ydbConnection = CreateDbConnection(); + try + { + await ydbConnection.OpenAsync(GetExecutor(ydbRetryPolicyConfig), cancellationToken); + + return ydbConnection; + } + catch + { + await ydbConnection.CloseAsync(); + throw; + } + } + + public async ValueTask OpenRetryableConnectionAsync( + IRetryPolicy retryPolicy, + CancellationToken cancellationToken = default + ) + { + var ydbConnection = CreateDbConnection(); + try + { + await ydbConnection.OpenAsync(new YdbRetryPolicyExecutor(retryPolicy), cancellationToken); + + return ydbConnection; + } + catch + { + await ydbConnection.CloseAsync(); + throw; + } + } } diff --git a/src/Ydb.Sdk/src/Ado/YdbDataSourceBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbDataSourceBuilder.cs new file mode 100644 index 00000000..e1c3ad0e --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/YdbDataSourceBuilder.cs @@ -0,0 +1,35 @@ +using Ydb.Sdk.Ado.RetryPolicy; + +namespace Ydb.Sdk.Ado; + +public class YdbDataSourceBuilder +{ + public YdbDataSourceBuilder() + { + ConnectionStringBuilder = new YdbConnectionStringBuilder(); + } + + public YdbDataSourceBuilder(string connectionString) + { + ConnectionStringBuilder = new YdbConnectionStringBuilder(connectionString); + } + + public YdbDataSourceBuilder(YdbConnectionStringBuilder connectionStringBuilder) + { + ConnectionStringBuilder = connectionStringBuilder; + } + + /// + /// A connection string builder that can be used to configure the connection string on the builder. + /// + public YdbConnectionStringBuilder ConnectionStringBuilder { get; } + + /// + /// Returns the connection string, as currently configured on the builder. + /// + public string ConnectionString => ConnectionStringBuilder.ConnectionString; + + public IRetryPolicy RetryPolicy { get; set; } = new YdbRetryPolicy(YdbRetryPolicyConfig.Default); + + public YdbDataSource Build() => new(this); +} diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index b5edea8c..be30ec3b 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -41,7 +41,7 @@ public interface IBidirectionalStream : IDisposable { public Task Write(TRequest request); - public ValueTask MoveNextAsync(); + public Task MoveNextAsync(); public TResponse Current { get; } @@ -52,7 +52,7 @@ public interface IBidirectionalStream : IDisposable public interface IServerStream : IDisposable { - public ValueTask MoveNextAsync(CancellationToken cancellationToken = default); + public Task MoveNextAsync(CancellationToken cancellationToken = default); public TResponse Current { get; } } @@ -237,7 +237,7 @@ internal ServerStream(AsyncServerStreamingCall stream, Action MoveNextAsync(CancellationToken cancellationToken = default) + public async Task MoveNextAsync(CancellationToken cancellationToken = default) { try { @@ -294,7 +294,7 @@ public async Task Write(TRequest request) } } - public async ValueTask MoveNextAsync() + public async Task MoveNextAsync() { try { diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs index ae41ef9f..f923d637 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs @@ -23,8 +23,7 @@ public void Setup() [Benchmark] public async Task SingleThreaded_OpenClose() { - var session = await _poolingSessionSource.OpenSession(); - session.Close(); + using var session = await _poolingSessionSource.OpenSession(); } [Benchmark] @@ -36,9 +35,8 @@ public async Task MultiThreaded_OpenClose() { tasks[i] = Task.Run(async () => { - var session = await _poolingSessionSource.OpenSession(); + using var session = await _poolingSessionSource.OpenSession(); await Task.Yield(); - session.Close(); }); } @@ -55,9 +53,8 @@ public async Task HighContention_OpenClose() { tasks[i] = Task.Run(async () => { - var session = await _poolingSessionSource.OpenSession(); + using var session = await _poolingSessionSource.OpenSession(); await Task.Yield(); - session.Close(); }); } @@ -76,9 +73,8 @@ public async Task SessionReuse_Pattern() { for (var j = 0; j < iterations; j++) { - var session = await _poolingSessionSource.OpenSession(); + using var session = await _poolingSessionSource.OpenSession(); await Task.Yield(); - session.Close(); } }); } @@ -99,9 +95,8 @@ public async Task SessionReuse_HighContention_Pattern() { for (var j = 0; j < iterations; j++) { - var session = await _poolingSessionSource.OpenSession(); + using var session = await _poolingSessionSource.OpenSession(); await Task.Yield(); - session.Close(); } }); } @@ -121,9 +116,8 @@ public async Task SessionReuse_HighIterations_Pattern() { for (var j = 0; j < iterations; j++) { - var session = await _poolingSessionSource.OpenSession(); + using var session = await _poolingSessionSource.OpenSession(); await Task.Yield(); - session.Close(); } }); } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/MockPoolingSessionFactory.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/MockPoolingSessionFactory.cs new file mode 100644 index 00000000..946f62d5 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/MockPoolingSessionFactory.cs @@ -0,0 +1,83 @@ +using Xunit; +using Ydb.Query; +using Ydb.Sdk.Ado.Session; + +namespace Ydb.Sdk.Ado.Tests.Session; + +internal class MockPoolingSessionFactory(int maxSessionSize) : IPoolingSessionFactory +{ + private int _sessionOpened; + private int _numSession; + + internal int SessionOpenedCount => Volatile.Read(ref _sessionOpened); + internal int NumSession => Volatile.Read(ref _numSession); + + internal Func Open { private get; init; } = _ => Task.CompletedTask; + internal Func IsBroken { private get; init; } = _ => false; + internal Func Dispose { private get; init; } = () => ValueTask.CompletedTask; + + internal Func> ExecuteQuery { private get; init; } = + _ => throw new NotImplementedException(); + + public MockPoolingSession NewSession(PoolingSessionSource source) => + new(source, + async sessionCountOpened => + { + await Open(sessionCountOpened); + + Assert.True(Interlocked.Increment(ref _numSession) <= maxSessionSize); + + await Task.Yield(); + }, + () => + { + Assert.True(Interlocked.Decrement(ref _numSession) >= 0); + + return Task.CompletedTask; + }, + IsBroken, + ExecuteQuery, + Interlocked.Increment(ref _sessionOpened) + ); + + public ValueTask DisposeAsync() => Dispose(); +} + +internal class MockPoolingSession( + PoolingSessionSource source, + Func mockOpen, + Func mockDeleteSession, + Func mockIsBroken, + Func> executeQuery, + int sessionId +) : PoolingSessionBase(source) +{ + private bool _isBroken; + + public int SessionId => sessionId; + public override IDriver Driver => null!; + public override bool IsBroken => _isBroken || mockIsBroken(sessionId); + + internal override Task Open(CancellationToken cancellationToken) => mockOpen(sessionId); + internal override Task DeleteSession() => mockDeleteSession(); + + public override ValueTask> ExecuteQuery( + string query, + Dictionary parameters, + GrpcRequestSettings settings, + TransactionControl? txControl + ) => new(executeQuery(sessionId)); + + public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) => + throw new NotImplementedException(); + + public override void OnNotSuccessStatusCode(StatusCode code) => _isBroken = true; +} + +internal static class ISessionExtension +{ + internal static int SessionId(this ISession session) => ((MockPoolingSession)session).SessionId; +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs index 92e1c32c..f1a096af 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs @@ -1,6 +1,5 @@ using System.Collections.Concurrent; using Xunit; -using Ydb.Query; using Ydb.Sdk.Ado.Session; namespace Ydb.Sdk.Ado.Tests.Session; @@ -18,11 +17,16 @@ public async Task Reuse_Session_Before_Creating_new() { var sessionSource = new PoolingSessionSource(new MockPoolingSessionFactory(1), new YdbConnectionStringBuilder()); - var session = await sessionSource.OpenSession(); - var sessionId = session.SessionId(); - session.Close(); - session = await sessionSource.OpenSession(); - Assert.Equal(sessionId, session.SessionId()); + int sessionId; + using (var session = await sessionSource.OpenSession()) + { + sessionId = session.SessionId(); + } + + using (var session = await sessionSource.OpenSession()) + { + Assert.Equal(sessionId, session.SessionId()); + } } [Fact] @@ -54,11 +58,10 @@ public async Task Creating_Session_Throw_Exception() { try { - var session = await sessionSource.OpenSession(); + using var session = await sessionSource.OpenSession(); // ReSharper disable once AccessToModifiedClosure Interlocked.Increment(ref countSuccess); Assert.True(session.SessionId() > maxSessionSize * 2); - session.Close(); } catch (YdbException e) { @@ -92,10 +95,9 @@ public async Task HighContention_OpenClose_NotCanceledException() { tasks[i] = Task.Run(async () => { - var session = await sessionSource.OpenSession(); + using var session = await sessionSource.OpenSession(); Assert.True(session.SessionId() <= maxSessionSize); await Task.Yield(); - session.Close(); }); } @@ -123,8 +125,7 @@ public async Task DisposeAsync_Cancel_WaitersSession() { waitingSessionTasks.Add(Task.Run(async () => { - var session = await sessionSource.OpenSession(); - session.Close(); + using var session = await sessionSource.OpenSession(); })); } @@ -133,7 +134,7 @@ public async Task DisposeAsync_Cancel_WaitersSession() await Task.Delay(5_000); for (var i = 0; i < maxSessionSize; i++) { - openSessions[i].Close(); + openSessions[i].Dispose(); } await disposeTask; @@ -182,9 +183,8 @@ public async Task StressTest_DisposeAsync_Close_Driver() try { - var session = await sessionSource.OpenSession(); + using var session = await sessionSource.OpenSession(); await Task.Yield(); - session.Close(); } catch (YdbException e) { @@ -226,7 +226,7 @@ public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCoun foreach (var it in openSessions) { - it.Close(); + it.Dispose(); } await Task.Delay(TimeSpan.FromSeconds(idleTimeoutSeconds * 5)); // cleaning idle sessions @@ -240,7 +240,7 @@ public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCoun foreach (var it in openSessionTasks) { - (await it).Close(); + (await it).Dispose(); } Assert.Equal(minSessionSize, mockFactory.NumSession); @@ -285,9 +285,11 @@ public async Task StressTest_HighContention_OpenClose() { while (!cts.IsCancellationRequested) { - var session = await sessionSource.OpenSession(cts.Token); - Assert.False(sessionIdIsBroken[session.SessionId()]); - session.Close(); + using (var session = await sessionSource.OpenSession(cts.Token)) + { + Assert.False(sessionIdIsBroken[session.SessionId()]); + } + await Task.Delay(Random.Shared.Next(maxSessionSize), cts.Token); } } @@ -314,21 +316,20 @@ public async Task Get_Session_From_Exhausted_Pool() }; var sessionSource = new PoolingSessionSource(mockFactory, settings); - var session = await sessionSource.OpenSession(); + using var session = await sessionSource.OpenSession(); var cts = new CancellationTokenSource(); cts.CancelAfter(500); Assert.Equal("The connection pool has been exhausted, either raise 'MaxSessionPool' (currently 1) " + "or 'CreateSessionTimeout' (currently 5 seconds) in your connection string.", (await Assert.ThrowsAsync(async () => await sessionSource.OpenSession(cts.Token))).Message); - session.Close(); Assert.Equal(1, mockFactory.NumSession); Assert.Equal(1, mockFactory.SessionOpenedCount); } [Fact] - public async Task Return_IsBroken_Session() + public async Task ReturnToPool_WhenSessionIsBroken_IsSkipped() { const int maxSessionSize = 10; var mockFactory = new MockPoolingSessionFactory(maxSessionSize) { IsBroken = _ => true }; @@ -341,8 +342,7 @@ public async Task Return_IsBroken_Session() for (var it = 0; it < maxSessionSize * 2; it++) { - var session = await sessionSource.OpenSession(); - session.Close(); + using var session = await sessionSource.OpenSession(); } Assert.Equal(0, mockFactory.NumSession); @@ -371,7 +371,7 @@ public async Task CheckIdleSession_WhenIsBrokenInStack_CreateNewSession() foreach (var session in openSessions) { - session.Close(); + session.Dispose(); } Assert.Equal(maxSessionSize, mockFactory.NumSession); @@ -379,85 +379,11 @@ public async Task CheckIdleSession_WhenIsBrokenInStack_CreateNewSession() isBroken = true; for (var it = 0; it < maxSessionSize; it++) { - var session = await sessionSource.OpenSession(); + using var session = await sessionSource.OpenSession(); isBroken = false; - session.Close(); } Assert.Equal(1, mockFactory.NumSession); Assert.Equal(maxSessionSize + 1, mockFactory.SessionOpenedCount); } } - -internal static class ISessionExtension -{ - internal static int SessionId(this ISession session) => ((MockPoolingSession)session).SessionId; -} - -internal class MockPoolingSessionFactory(int maxSessionSize) : IPoolingSessionFactory -{ - private int _sessionOpened; - private int _numSession; - - internal int SessionOpenedCount => Volatile.Read(ref _sessionOpened); - internal int NumSession => Volatile.Read(ref _numSession); - - internal Func Open { private get; init; } = _ => Task.CompletedTask; - internal Func IsBroken { private get; init; } = _ => false; - internal Func Dispose { private get; init; } = () => ValueTask.CompletedTask; - - public MockPoolingSession NewSession(PoolingSessionSource source) => - new(source, - async sessionCountOpened => - { - await Open(sessionCountOpened); - - Assert.True(Interlocked.Increment(ref _numSession) <= maxSessionSize); - - await Task.Yield(); - }, - () => - { - Assert.True(Interlocked.Decrement(ref _numSession) >= 0); - - return Task.CompletedTask; - }, - sessionNum => IsBroken(sessionNum), - Interlocked.Increment(ref _sessionOpened) - ); - - public ValueTask DisposeAsync() => Dispose(); -} - -internal class MockPoolingSession( - PoolingSessionSource source, - Func mockOpen, - Func mockDeleteSession, - Func mockIsBroken, - int sessionNum -) : PoolingSessionBase(source) -{ - public int SessionId => sessionNum; - public override IDriver Driver => null!; - public override bool IsBroken => mockIsBroken(sessionNum); - - internal override Task Open(CancellationToken cancellationToken) => mockOpen(sessionNum); - internal override Task DeleteSession() => mockDeleteSession(); - - public override ValueTask> ExecuteQuery( - string query, - Dictionary parameters, - GrpcRequestSettings settings, - TransactionControl? txControl - ) => throw new NotImplementedException(); - - public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => - throw new NotImplementedException(); - - public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) => - throw new NotImplementedException(); - - public override void OnNotSuccessStatusCode(StatusCode code) - { - } -} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs index 281b77a3..4021b992 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs @@ -42,6 +42,8 @@ public PoolingSessionTests() [InlineData(StatusCode.SessionExpired, true)] [InlineData(StatusCode.ClientTransportTimeout, true)] [InlineData(StatusCode.ClientTransportUnavailable, true)] + [InlineData(StatusCode.ClientTransportResourceExhausted, true)] + [InlineData(StatusCode.ClientTransportUnknown, true)] [InlineData(StatusCode.Overloaded, false)] public async Task OnNotSuccessStatusCode_WhenStatusCodeIsNotSuccess_UpdateIsBroken(StatusCode statusCode, bool isError) @@ -281,7 +283,7 @@ private TaskCompletionSource SetupAttachStream() _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(It.IsAny())) .ReturnsAsync(true) - .Returns(new ValueTask(tcsSecondMoveAttachStream.Task)); + .Returns(tcsSecondMoveAttachStream.Task); _mockAttachStream.SetupSequence(attachStream => attachStream.Current) .Returns(new SessionState { Status = StatusIds.Types.StatusCode.Success }) .Returns(new SessionState { Status = StatusIds.Types.StatusCode.BadSession }); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/RetryableSessionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/RetryableSessionTests.cs new file mode 100644 index 00000000..c27012e0 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/RetryableSessionTests.cs @@ -0,0 +1,111 @@ +using Xunit; +using Ydb.Query; +using Ydb.Sdk.Ado.Internal; +using Ydb.Sdk.Ado.RetryPolicy; +using Ydb.Sdk.Ado.Session; +using Ydb.Sdk.Ado.Tests.Utils; + +namespace Ydb.Sdk.Ado.Tests.Session; + +public class RetryableSessionTests +{ + [Fact] + public async Task MoveNextAsync_WhenRetryableStatus_RetriesUpToMaxAttempts_ThenThrows() + { + var factory = new MockPoolingSessionFactory(1) + { + IsBroken = _ => false, + ExecuteQuery = _ => new MockAsyncEnumerator( + new List { new() { Status = StatusIds.Types.StatusCode.BadSession } }) + }; + + var retryableSession = new RetryableSession(new PoolingSessionSource( + factory, + new YdbConnectionStringBuilder { MaxSessionPool = 1 }), + new YdbRetryPolicyExecutor(new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 5 })) + ); + + var inMemoryStream = await retryableSession.ExecuteQuery( + "SELECT * FROM session", + new Dictionary(), + new GrpcRequestSettings(), + null + ); + + Assert.Equal(StatusCode.BadSession, + (await Assert.ThrowsAsync(async () => await inMemoryStream.MoveNextAsync())).Code); + Assert.Equal(5, factory.SessionOpenedCount); + } + + [Fact] + public async Task MoveNextAsync_WhenNonRetryable_ThrowsWithoutRetry() + { + var factory = new MockPoolingSessionFactory(1) + { + IsBroken = _ => false, + ExecuteQuery = _ => new MockAsyncEnumerator( + new List { new() { Status = StatusIds.Types.StatusCode.Unauthorized } }) + }; + + var retryableSession = new RetryableSession(new PoolingSessionSource( + factory, + new YdbConnectionStringBuilder { MaxSessionPool = 1 }), + new YdbRetryPolicyExecutor(new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 5 })) + ); + + var inMemoryStream = await retryableSession.ExecuteQuery( + "SELECT * FROM session", + new Dictionary(), + new GrpcRequestSettings(), + null + ); + + Assert.Equal(StatusCode.Unauthorized, + (await Assert.ThrowsAsync(async () => await inMemoryStream.MoveNextAsync())).Code); + Assert.Equal(1, factory.SessionOpenedCount); + } + + [Fact] + public async Task MoveNextAsync_SucceedsOnThirdAttempt_StopsRetrying() + { + var attempt = 0; + var factory = new MockPoolingSessionFactory(1) + { + IsBroken = _ => false, + ExecuteQuery = _ => + { + attempt++; + if (attempt < 3) + return new MockAsyncEnumerator( + new List + { + new() { Status = StatusIds.Types.StatusCode.BadSession } + }); + return new MockAsyncEnumerator( + new List { new() { Status = StatusIds.Types.StatusCode.Success } } + ); + } + }; + + var retryableSession = new RetryableSession(new PoolingSessionSource( + factory, + new YdbConnectionStringBuilder { MaxSessionPool = 1 }), + new YdbRetryPolicyExecutor(new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 5 })) + ); + var inMemoryStream = await retryableSession.ExecuteQuery( + "SELECT * FROM session", + new Dictionary(), + new GrpcRequestSettings(), + null + ); + + Assert.Throws(() => inMemoryStream.Current); + var hasItem = await inMemoryStream.MoveNextAsync(); + Assert.True(hasItem); + Assert.False(inMemoryStream.Current.Status.IsNotSuccess()); + Assert.False(await inMemoryStream.MoveNextAsync()); + Assert.Throws(() => inMemoryStream.Current); + + Assert.Equal(3, factory.SessionOpenedCount); + } +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Utils/MockAsyncEnumerator.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Utils/MockAsyncEnumerator.cs index b87e34dc..63ad2e49 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Utils/MockAsyncEnumerator.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Utils/MockAsyncEnumerator.cs @@ -6,7 +6,7 @@ public class MockAsyncEnumerator(IEnumerable items) : IServerStream public T Current => _inner.Current; - public ValueTask MoveNextAsync(CancellationToken cancellationToken) => new(_inner.MoveNext()); + public Task MoveNextAsync(CancellationToken cancellationToken) => Task.FromResult(_inner.MoveNext()); public void Dispose() { diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs index edcc8424..ee1a739a 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs @@ -233,7 +233,7 @@ public async Task ExecuteInTransactionAsync_WhenTLI_ThenRetriesUntilSuccess(int CommandText = $"UPDATE {tableName} SET count = @count + 1 WHERE id = 1", Parameters = { new YdbParameter { Value = count, ParameterName = "count" } } }.ExecuteNonQueryAsync(); - }, new YdbRetryPolicyConfig { FastBackoffBaseMs = 100 })); + }, new YdbRetryPolicyConfig { MaxAttempts = concurrentJob })); } await Task.WhenAll(tasks); @@ -246,4 +246,15 @@ public async Task ExecuteInTransactionAsync_WhenTLI_ThenRetriesUntilSuccess(int await new YdbCommand(ydbConnection) { CommandText = $"DROP TABLE {tableName}" }.ExecuteNonQueryAsync(); } } + + [Fact] + public async Task RetryableConnection_WhenOpenTransaction_Throws() + { + await using var ydbConnection = await _dataSource.OpenRetryableConnectionAsync(); + await using var transaction = ydbConnection.BeginTransaction(); + + Assert.Equal("Transactions are not supported in retryable session", + (await Assert.ThrowsAsync(async () => await new YdbCommand(ydbConnection) + { CommandText = "SELECT 1" }.ExecuteScalarAsync())).Message); + } } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbParameterTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbParameterTests.cs index afeae156..51252b73 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbParameterTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbParameterTests.cs @@ -218,6 +218,7 @@ public async Task Decimal_WhenDecimalIsScaleAndPrecision_ReturnDecimal(string? v [InlineData("-98.765", 5, 2)] [InlineData("100.01", 5, 1)] [InlineData("100000", 5, 0)] + [InlineData("12345678901", 10, 0)] public async Task Decimal_WhenNotRepresentableBySystemDecimal_ThrowsOverflowException(string value, byte precision, byte scale) { @@ -225,7 +226,7 @@ public async Task Decimal_WhenNotRepresentableBySystemDecimal_ThrowsOverflowExce var tableName = $"DecimalOverflowTable__{Random.Shared.Next()}"; var decimalValue = decimal.Parse(value, CultureInfo.InvariantCulture); await new YdbCommand(ydbConnection) - { CommandText = $"CREATE TABLE {tableName}(d Decimal(5,2), PRIMARY KEY(d))" } + { CommandText = $"CREATE TABLE {tableName}(d Decimal({precision}, {scale}), PRIMARY KEY(d))" } .ExecuteNonQueryAsync(); Assert.Equal($"Value {decimalValue} does not fit Decimal({precision}, {scale})", @@ -234,7 +235,7 @@ public async Task Decimal_WhenNotRepresentableBySystemDecimal_ThrowsOverflowExce CommandText = $"INSERT INTO {tableName}(d) VALUES (@d);", Parameters = { - new YdbParameter("d", DbType.Decimal, 123.456m) + new YdbParameter("d", DbType.Decimal, decimal.Parse(value, CultureInfo.InvariantCulture)) { Value = decimalValue, Precision = precision, Scale = scale } } }.ExecuteNonQueryAsync())).Message); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs index 8d5795f9..72ffa91d 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs @@ -27,7 +27,7 @@ public class ReaderUnitTests private readonly Mock _mockIDriver = new(); private readonly Mock _mockStream = new(); - private readonly ValueTask _lastMoveNext; + private readonly Task _lastMoveNext; public ReaderUnitTests() { @@ -40,7 +40,7 @@ public ReaderUnitTests() var tcsLastMoveNext = new TaskCompletionSource(); - _lastMoveNext = new ValueTask(tcsLastMoveNext.Task); + _lastMoveNext = tcsLastMoveNext.Task; _mockStream.Setup(stream => stream.RequestStreamComplete()).Returns(() => { tcsLastMoveNext.TrySetResult(false); @@ -97,8 +97,8 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadTh _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNext.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -190,8 +190,8 @@ public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndReadT .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNext.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -289,8 +289,8 @@ public async Task Initialize_WhenInitResponseStatusIsRetryable_ShouldRetryInitia .ReturnsAsync(true) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNext.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -445,8 +445,8 @@ public async Task .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNext.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -584,13 +584,13 @@ public async Task _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextFirst.Task)) + .Returns(tcsMoveNextFirst.Task) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextSecond.Task)) - .Returns(new ValueTask(tcsCommitMessage1.Task)) - .Returns(new ValueTask(tcsCommitMessage2.Task)) - .Returns(new ValueTask(tcsCommitMessage3.Task)) + .Returns(tcsMoveNextSecond.Task) + .Returns(tcsCommitMessage1.Task) + .Returns(tcsCommitMessage2.Task) + .Returns(tcsCommitMessage3.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -740,12 +740,12 @@ public async Task _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextFirst.Task)) - .Returns(new ValueTask(tcsMoveNextSecond.Task)) + .Returns(tcsMoveNextFirst.Task) + .Returns(tcsMoveNextSecond.Task) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextThird.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNextThird.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -876,12 +876,12 @@ public async Task _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextFirst.Task)) - .Returns(new ValueTask(tcsMoveNextSecond.Task)) + .Returns(tcsMoveNextFirst.Task) + .Returns(tcsMoveNextSecond.Task) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextThird.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNextThird.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -1052,12 +1052,12 @@ public async Task _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextFirst.Task)) - .Returns(new ValueTask(tcsMoveNextSecond.Task)) + .Returns(tcsMoveNextFirst.Task) + .Returns(tcsMoveNextSecond.Task) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextThird.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNextThird.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -1185,12 +1185,12 @@ public async Task _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextFirst.Task)) - .Returns(new ValueTask(tcsMoveNextSecond.Task)) + .Returns(tcsMoveNextFirst.Task) + .Returns(tcsMoveNextSecond.Task) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNextThird.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNextThird.Task) + .Returns(tcsCommitMessage.Task) .ReturnsAsync(true) .Returns(_lastMoveNext); @@ -1294,8 +1294,8 @@ public async Task _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) - .Returns(new ValueTask(stopPartitionSessionRequest.Task)) + .Returns(tcsMoveNext.Task) + .Returns(stopPartitionSessionRequest.Task) .ReturnsAsync(true) .Returns(_lastMoveNext); @@ -1384,7 +1384,7 @@ public async Task ReadAsync_WhenFailDeserializer_ThrowReaderExceptionAndInvokeRe _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) + .Returns(tcsMoveNext.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -1459,8 +1459,8 @@ public async Task ReadAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken() _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNext.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -1526,8 +1526,8 @@ public async Task DisposeAsync_WhenCommitMessagesInFlight_CompleteThisCommits() _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(new ValueTask(tcsMoveNext.Task)) - .Returns(new ValueTask(tcsCommitMessage.Task)) + .Returns(tcsMoveNext.Task) + .Returns(tcsCommitMessage.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs index eb5e7fab..43c62233 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs @@ -19,7 +19,7 @@ public class WriterUnitTests { private readonly Mock _mockIDriver = new(); private readonly Mock _mockStream = new(); - private readonly ValueTask _lastMoveNext; + private readonly Task _lastMoveNext; public WriterUnitTests() { @@ -32,7 +32,7 @@ public WriterUnitTests() var tcsLastMoveNext = new TaskCompletionSource(); - _lastMoveNext = new ValueTask(tcsLastMoveNext.Task); + _lastMoveNext = tcsLastMoveNext.Task; _mockStream.Setup(stream => stream.RequestStreamComplete()).Returns(() => { tcsLastMoveNext.TrySetResult(false); @@ -92,7 +92,7 @@ public async Task Initialize_WhenStreamClosedByServer_ShouldRetryInitializeAndRe _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(false) .ReturnsAsync(true) - .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(taskNextComplete.Task) .Returns(_lastMoveNext); SetupReadOneWriteAckMessage(); @@ -136,7 +136,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReturn }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(taskNextComplete.Task) .Returns(_lastMoveNext); SetupReadOneWriteAckMessage(); @@ -184,7 +184,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndRetur .ThrowsAsync(new YdbException( new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message")))) .ReturnsAsync(true) - .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(taskNextComplete.Task) .Returns(_lastMoveNext); SetupReadOneWriteAckMessage(); @@ -232,7 +232,7 @@ public async Task Initialize_WhenInitResponseStatusIsRetryable_ShouldRetryInitia _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(true) - .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(taskNextComplete.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) @@ -284,7 +284,7 @@ public async Task _mockStream.Setup(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask); _mockStream.Setup(stream => stream.MoveNextAsync()) - .Returns(new ValueTask(true)); + .ReturnsAsync(true); _mockStream.Setup(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer { @@ -311,7 +311,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs _mockStream.Setup(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask); _mockStream.Setup(stream => stream.MoveNextAsync()) - .Returns(new ValueTask(true)); + .ReturnsAsync(true); _mockStream.Setup(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer { @@ -374,9 +374,9 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(new ValueTask(moveTcs.Task)) + .Returns(moveTcs.Task) .ReturnsAsync(true) - .Returns(new ValueTask(moveTcsRetry.Task)) + .Returns(moveTcsRetry.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -450,7 +450,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldRe .ReturnsAsync(true) .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .ReturnsAsync(true) - .Returns(() => new ValueTask(moveTcs.Task)) // retry init writer session + .Returns(moveTcs.Task) // retry init writer session .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -525,7 +525,7 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldRecon .ReturnsAsync(true) .ReturnsAsync(false) .ReturnsAsync(true) - .Returns(() => new ValueTask(moveTcs.Task)) // retry init writer session + .Returns(moveTcs.Task) // retry init writer session .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -584,7 +584,7 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce { ProducerId = "producerId" }.Build(); var task = writer.WriteAsync(123L, cancellationTokenSource.Token); - cancellationTokenSource.Cancel(); + await cancellationTokenSource.CancelAsync(); await Assert.ThrowsAsync(() => task); } @@ -598,7 +598,7 @@ public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ReturnWrittenStatus( .Returns(Task.CompletedTask); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(new ValueTask(nextCompleted.Task)) + .Returns(nextCompleted.Task) .Returns(_lastMoveNext); SetupReadOneWriteAckMessage(); @@ -608,7 +608,7 @@ public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ReturnWrittenStatus( var task = writer.WriteAsync(123L, cancellationTokenSource.Token); nextCompleted.SetResult(true); Assert.Equal(PersistenceStatus.Written, (await task).Status); - cancellationTokenSource.Cancel(); + await cancellationTokenSource.CancelAsync(); } @@ -667,9 +667,9 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(new ValueTask(moveTcs.Task)) + .Returns(moveTcs.Task) .ReturnsAsync(true) - .Returns(new ValueTask(moveTcsRetry.Task)) + .Returns(moveTcsRetry.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -707,7 +707,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT var ctx = new CancellationTokenSource(); var runTaskWithCancel = writer.WriteAsync(100L, ctx.Token); await writeTcs1.Task; - ctx.Cancel(); // reconnect write invoke cancel on cancellation token + await ctx.CancelAsync(); // reconnect write invoke cancel on cancellation token // ReSharper disable once MethodSupportsCancellation var runTask1 = writer.WriteAsync(100L); @@ -803,9 +803,9 @@ public async Task WriteAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken() _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(new ValueTask(writeTcs1.Task)) - .Returns(new ValueTask(writeTcs2.Task)) - .Returns(new ValueTask(writeTcs3.Task)) + .Returns(writeTcs1.Task) + .Returns(writeTcs2.Task) + .Returns(writeTcs3.Task) .Returns(_lastMoveNext); SetupReadOneWriteAckMessage() @@ -885,9 +885,9 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages() }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(new ValueTask(writeTcs1.Task)) + .Returns(writeTcs1.Task) .ReturnsAsync(true) - .Returns(new ValueTask(moveTcsRetry.Task)) + .Returns(moveTcsRetry.Task) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current)