Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 18 additions & 27 deletions slo/src/AdoNet/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,9 @@ public class SloTableContext : SloTableContext<YdbDataSource>
{
protected override string Job => "AdoNet";

protected override YdbDataSource CreateClient(Config config) => new(
new YdbConnectionStringBuilder(config.ConnectionString)
{
LoggerFactory = ISloContext.Factory,
RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true })
}
);
protected override YdbDataSource CreateClient(Config config) => new YdbDataSourceBuilder(
new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory }
) { RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) }.Build();

protected override async Task Create(YdbDataSource client, int operationTimeout)
{
Expand Down Expand Up @@ -96,34 +92,29 @@ await client.ExecuteAsync(async ydbConnection =>
return attempts;
}

protected override async Task<(int, object?)> Select(
protected override async Task<object?> Select(
YdbDataSource client,
(Guid Guid, int Id) select,
int readTimeout
)
{
var attempts = 0;
var policyResult = await client.ExecuteAsync(async ydbConnection =>
await using var ydbConnection = await client.OpenRetryableConnectionAsync();

var ydbCommand = new YdbCommand(ydbConnection)
{
attempts++;
var ydbCommand = new YdbCommand(ydbConnection)
CommandText = $"""
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
""",
CommandTimeout = readTimeout,
Parameters =
{
CommandText = $"""
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
""",
CommandTimeout = readTimeout,
Parameters =
{
new YdbParameter { ParameterName = "Guid", DbType = DbType.Guid, Value = select.Guid },
new YdbParameter { ParameterName = "Id", DbType = DbType.Int32, Value = select.Id }
}
};

return await ydbCommand.ExecuteScalarAsync();
});
new YdbParameter { ParameterName = "Guid", DbType = DbType.Guid, Value = select.Guid },
new YdbParameter { ParameterName = "Id", DbType = DbType.Int32, Value = select.Id }
}
};

return (attempts, policyResult);
return await ydbCommand.ExecuteScalarAsync();
}

protected override async Task<int> SelectCount(YdbDataSource client)
Expand Down
56 changes: 20 additions & 36 deletions slo/src/Dapper/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,9 @@ public class SloTableContext : SloTableContext<YdbDataSource>
{
protected override string Job => "Dapper";

protected override YdbDataSource CreateClient(Config config) => new(
new YdbConnectionStringBuilder(config.ConnectionString)
{
LoggerFactory = ISloContext.Factory,
RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true })
}
);
protected override YdbDataSource CreateClient(Config config) => new YdbDataSourceBuilder(
new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory }
) { RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) }.Build();

protected override async Task Create(YdbDataSource client, int operationTimeout)
{
Expand All @@ -28,45 +24,33 @@ await connection.ExecuteAsync($"""
PayloadDouble Double,
PayloadTimestamp Timestamp,
PRIMARY KEY (Guid, Id)
);
);
{SloTable.Options}
""");
}

protected override async Task<int> Save(YdbDataSource client, SloTable sloTable, int writeTimeout)
{
var attempt = 0;
await client.ExecuteAsync(async ydbConnection =>
{
attempt++;
await ydbConnection.ExecuteAsync(
$"""
UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp)
""", sloTable);
}
);

return attempt;
await using var ydbConnection = await client.OpenRetryableConnectionAsync();
await ydbConnection.ExecuteAsync(
$"""
UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp)
""", sloTable);

return 1;
}

protected override async Task<(int, object?)> Select(YdbDataSource client, (Guid Guid, int Id) select,
protected override async Task<object?> Select(YdbDataSource client, (Guid Guid, int Id) select,
int readTimeout)
{
var attempts = 0;
var policyResult = await client.ExecuteAsync(async ydbConnection =>
{
attempts++;
return await ydbConnection.QueryFirstOrDefaultAsync<SloTable>(
$"""
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
""",
new { select.Guid, select.Id }
);
});

return (attempts, policyResult);
await using var ydbConnection = await client.OpenRetryableConnectionAsync();
return await ydbConnection.QueryFirstOrDefaultAsync<SloTable>(
$"""
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
""", new { select.Guid, select.Id }
);
}

protected override async Task<int> SelectCount(YdbDataSource client)
Expand Down
6 changes: 3 additions & 3 deletions slo/src/EF/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ await executeStrategy.ExecuteAsync(async () =>
return 0;
}

protected override async Task<(int, object?)> Select(
protected override async Task<object?> Select(
PooledDbContextFactory<TableDbContext> client,
(Guid Guid, int Id) select,
int readTimeout
)
{
await using var dbContext = await client.CreateDbContextAsync();
return (0, await dbContext.SloEntities.FirstOrDefaultAsync(table =>
table.Guid == select.Guid && table.Id == select.Id));
return await dbContext.SloEntities.FirstOrDefaultAsync(table =>
table.Guid == select.Guid && table.Id == select.Id);
}

protected override async Task<int> SelectCount(PooledDbContextFactory<TableDbContext> client)
Expand Down
21 changes: 6 additions & 15 deletions slo/src/Internal/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public abstract class SloTableContext<T> : ISloContext
{
private const int IntervalMs = 100;

protected static readonly ILogger Logger = ISloContext.Factory.CreateLogger<SloTableContext<T>>();
private static readonly ILogger Logger = ISloContext.Factory.CreateLogger<SloTableContext<T>>();

private volatile int _maxId;

Expand Down Expand Up @@ -129,8 +129,7 @@ public async Task Run(RunConfig runConfig)
Logger.LogInformation("Run task is finished");
return;

async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
Func<T, RunConfig, Task<int>> action)
async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, Func<T, RunConfig, Task> action)
{
var metricFactory = Metrics.WithLabels(new Dictionary<string, string>
{
Expand Down Expand Up @@ -182,11 +181,6 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
}
);

var retryAttempts = metricFactory.CreateGauge(
"sdk_retry_attempts",
"Current retry attempts, categorized by operation type."
);

var pendingOperations = metricFactory.CreateGauge(
"sdk_pending_operations",
"Current number of pending operations, categorized by type."
Expand Down Expand Up @@ -218,9 +212,8 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
var sw = Stopwatch.StartNew();
try
{
var attempts = await action(client, runConfig);
await action(client, runConfig);
sw.Stop();
retryAttempts.Set(attempts);
operationsTotal.Inc();
pendingOperations.Dec();
operationsSuccessTotal.Inc();
Expand Down Expand Up @@ -248,7 +241,7 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
// return attempt count & StatusCode operation
protected abstract Task<int> Save(T client, SloTable sloTable, int writeTimeout);

protected abstract Task<(int, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout);
protected abstract Task<object?> Select(T client, (Guid Guid, int Id) select, int readTimeout);

protected abstract Task<int> SelectCount(T client);

Expand All @@ -272,12 +265,10 @@ private Task<int> Save(T client, Config config)
return Save(client, sloTable, config.WriteTimeout);
}

private async Task<int> Select(T client, RunConfig config)
private async Task Select(T client, RunConfig config)
{
var id = Random.Shared.Next(_maxId);
var (attempts, _) = await Select(client, new ValueTuple<Guid, int>(GuidFromInt(id), id), config.ReadTimeout);

return attempts;
_ = await Select(client, new ValueTuple<Guid, int>(GuidFromInt(id), id), config.ReadTimeout);
}

private static Guid GuidFromInt(int value)
Expand Down
1 change: 1 addition & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Feat ADO.NET: `YdbDataSource.OpenRetryableConnectionAsync` opens a retryable connection with automatic retries for transient failures.
- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation.
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.
- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods.
Expand Down
8 changes: 4 additions & 4 deletions src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ internal static class PoolManager
internal static readonly ConcurrentDictionary<string, IDriver> Drivers = new();
internal static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();

internal static async Task<ISession> GetSession(
internal static async ValueTask<ISessionSource> Get(
YdbConnectionStringBuilder settings,
CancellationToken cancellationToken
)
{
if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool))
{
return await sessionPool.OpenSession(cancellationToken);
return sessionPool;
}

await SemaphoreSlim.WaitAsync(cancellationToken);
Expand All @@ -26,7 +26,7 @@ CancellationToken cancellationToken
{
if (Pools.TryGetValue(settings.ConnectionString, out var pool))
{
return await pool.OpenSession(cancellationToken);
return pool;
}

var driver = Drivers.TryGetValue(settings.GrpcConnectionString, out var cacheDriver) &&
Expand All @@ -40,7 +40,7 @@ CancellationToken cancellationToken

Pools[settings.ConnectionString] = newSessionPool;

return await newSessionPool.OpenSession(cancellationToken);
return newSessionPool;
}
finally
{
Expand Down
4 changes: 1 addition & 3 deletions src/Ydb.Sdk/src/Ado/Session/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Ydb.Sdk.Ado.Session;

internal interface ISession
internal interface ISession : IDisposable
{
IDriver Driver { get; }

Expand All @@ -21,6 +21,4 @@ ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
Task RollbackTransaction(string txId, CancellationToken cancellationToken = default);

void OnNotSuccessStatusCode(StatusCode code);

void Close();
}
5 changes: 2 additions & 3 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ public void OnNotSuccessStatusCode(StatusCode code)
{
}

public void Close()
public void Dispose()
{
}

private static YdbException NotSupportedTransaction =>
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");
private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions");
}
4 changes: 3 additions & 1 deletion src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ StatusCode.BadSession or
StatusCode.SessionBusy or
StatusCode.SessionExpired or
StatusCode.ClientTransportTimeout or
StatusCode.ClientTransportUnavailable)
StatusCode.ClientTransportUnavailable or
StatusCode.ClientTransportResourceExhausted or
StatusCode.ClientTransportUnknown)
{
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, statusCode);

Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -337,5 +337,5 @@ public abstract ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(

public abstract void OnNotSuccessStatusCode(StatusCode code);

public void Close() => _source.Return((T)this);
public void Dispose() => _source.Return((T)this);
}
Loading
Loading