Skip to content

Commit cd9b44e

Browse files
feat: YdbDataSource.OpenRetryableConnectionAsync() (#525)
1 parent fed905b commit cd9b44e

26 files changed

+611
-296
lines changed

slo/src/AdoNet/SloTableContext.cs

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,9 @@ public class SloTableContext : SloTableContext<YdbDataSource>
99
{
1010
protected override string Job => "AdoNet";
1111

12-
protected override YdbDataSource CreateClient(Config config) => new(
13-
new YdbConnectionStringBuilder(config.ConnectionString)
14-
{
15-
LoggerFactory = ISloContext.Factory,
16-
RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true })
17-
}
18-
);
12+
protected override YdbDataSource CreateClient(Config config) => new YdbDataSourceBuilder(
13+
new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory }
14+
) { RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) }.Build();
1915

2016
protected override async Task Create(YdbDataSource client, int operationTimeout)
2117
{
@@ -96,34 +92,29 @@ await client.ExecuteAsync(async ydbConnection =>
9692
return attempts;
9793
}
9894

99-
protected override async Task<(int, object?)> Select(
95+
protected override async Task<object?> Select(
10096
YdbDataSource client,
10197
(Guid Guid, int Id) select,
10298
int readTimeout
10399
)
104100
{
105-
var attempts = 0;
106-
var policyResult = await client.ExecuteAsync(async ydbConnection =>
101+
await using var ydbConnection = await client.OpenRetryableConnectionAsync();
102+
103+
var ydbCommand = new YdbCommand(ydbConnection)
107104
{
108-
attempts++;
109-
var ydbCommand = new YdbCommand(ydbConnection)
105+
CommandText = $"""
106+
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
107+
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
108+
""",
109+
CommandTimeout = readTimeout,
110+
Parameters =
110111
{
111-
CommandText = $"""
112-
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
113-
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
114-
""",
115-
CommandTimeout = readTimeout,
116-
Parameters =
117-
{
118-
new YdbParameter { ParameterName = "Guid", DbType = DbType.Guid, Value = select.Guid },
119-
new YdbParameter { ParameterName = "Id", DbType = DbType.Int32, Value = select.Id }
120-
}
121-
};
122-
123-
return await ydbCommand.ExecuteScalarAsync();
124-
});
112+
new YdbParameter { ParameterName = "Guid", DbType = DbType.Guid, Value = select.Guid },
113+
new YdbParameter { ParameterName = "Id", DbType = DbType.Int32, Value = select.Id }
114+
}
115+
};
125116

126-
return (attempts, policyResult);
117+
return await ydbCommand.ExecuteScalarAsync();
127118
}
128119

129120
protected override async Task<int> SelectCount(YdbDataSource client)

slo/src/Dapper/SloTableContext.cs

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,9 @@ public class SloTableContext : SloTableContext<YdbDataSource>
99
{
1010
protected override string Job => "Dapper";
1111

12-
protected override YdbDataSource CreateClient(Config config) => new(
13-
new YdbConnectionStringBuilder(config.ConnectionString)
14-
{
15-
LoggerFactory = ISloContext.Factory,
16-
RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true })
17-
}
18-
);
12+
protected override YdbDataSource CreateClient(Config config) => new YdbDataSourceBuilder(
13+
new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory }
14+
) { RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) }.Build();
1915

2016
protected override async Task Create(YdbDataSource client, int operationTimeout)
2117
{
@@ -28,45 +24,33 @@ await connection.ExecuteAsync($"""
2824
PayloadDouble Double,
2925
PayloadTimestamp Timestamp,
3026
PRIMARY KEY (Guid, Id)
31-
);
27+
);
3228
{SloTable.Options}
3329
""");
3430
}
3531

3632
protected override async Task<int> Save(YdbDataSource client, SloTable sloTable, int writeTimeout)
3733
{
38-
var attempt = 0;
39-
await client.ExecuteAsync(async ydbConnection =>
40-
{
41-
attempt++;
42-
await ydbConnection.ExecuteAsync(
43-
$"""
44-
UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
45-
VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp)
46-
""", sloTable);
47-
}
48-
);
49-
50-
return attempt;
34+
await using var ydbConnection = await client.OpenRetryableConnectionAsync();
35+
await ydbConnection.ExecuteAsync(
36+
$"""
37+
UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
38+
VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp)
39+
""", sloTable);
40+
41+
return 1;
5142
}
5243

53-
protected override async Task<(int, object?)> Select(YdbDataSource client, (Guid Guid, int Id) select,
44+
protected override async Task<object?> Select(YdbDataSource client, (Guid Guid, int Id) select,
5445
int readTimeout)
5546
{
56-
var attempts = 0;
57-
var policyResult = await client.ExecuteAsync(async ydbConnection =>
58-
{
59-
attempts++;
60-
return await ydbConnection.QueryFirstOrDefaultAsync<SloTable>(
61-
$"""
62-
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
63-
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
64-
""",
65-
new { select.Guid, select.Id }
66-
);
67-
});
68-
69-
return (attempts, policyResult);
47+
await using var ydbConnection = await client.OpenRetryableConnectionAsync();
48+
return await ydbConnection.QueryFirstOrDefaultAsync<SloTable>(
49+
$"""
50+
SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
51+
FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id;
52+
""", new { select.Guid, select.Id }
53+
);
7054
}
7155

7256
protected override async Task<int> SelectCount(YdbDataSource client)

slo/src/EF/SloTableContext.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,15 @@ await executeStrategy.ExecuteAsync(async () =>
7474
return 0;
7575
}
7676

77-
protected override async Task<(int, object?)> Select(
77+
protected override async Task<object?> Select(
7878
PooledDbContextFactory<TableDbContext> client,
7979
(Guid Guid, int Id) select,
8080
int readTimeout
8181
)
8282
{
8383
await using var dbContext = await client.CreateDbContextAsync();
84-
return (0, await dbContext.SloEntities.FirstOrDefaultAsync(table =>
85-
table.Guid == select.Guid && table.Id == select.Id));
84+
return await dbContext.SloEntities.FirstOrDefaultAsync(table =>
85+
table.Guid == select.Guid && table.Id == select.Id);
8686
}
8787

8888
protected override async Task<int> SelectCount(PooledDbContextFactory<TableDbContext> client)

slo/src/Internal/SloTableContext.cs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public abstract class SloTableContext<T> : ISloContext
2222
{
2323
private const int IntervalMs = 100;
2424

25-
protected static readonly ILogger Logger = ISloContext.Factory.CreateLogger<SloTableContext<T>>();
25+
private static readonly ILogger Logger = ISloContext.Factory.CreateLogger<SloTableContext<T>>();
2626

2727
private volatile int _maxId;
2828

@@ -129,8 +129,7 @@ public async Task Run(RunConfig runConfig)
129129
Logger.LogInformation("Run task is finished");
130130
return;
131131

132-
async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
133-
Func<T, RunConfig, Task<int>> action)
132+
async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, Func<T, RunConfig, Task> action)
134133
{
135134
var metricFactory = Metrics.WithLabels(new Dictionary<string, string>
136135
{
@@ -182,11 +181,6 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
182181
}
183182
);
184183

185-
var retryAttempts = metricFactory.CreateGauge(
186-
"sdk_retry_attempts",
187-
"Current retry attempts, categorized by operation type."
188-
);
189-
190184
var pendingOperations = metricFactory.CreateGauge(
191185
"sdk_pending_operations",
192186
"Current number of pending operations, categorized by type."
@@ -218,9 +212,8 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
218212
var sw = Stopwatch.StartNew();
219213
try
220214
{
221-
var attempts = await action(client, runConfig);
215+
await action(client, runConfig);
222216
sw.Stop();
223-
retryAttempts.Set(attempts);
224217
operationsTotal.Inc();
225218
pendingOperations.Dec();
226219
operationsSuccessTotal.Inc();
@@ -248,7 +241,7 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
248241
// return attempt count & StatusCode operation
249242
protected abstract Task<int> Save(T client, SloTable sloTable, int writeTimeout);
250243

251-
protected abstract Task<(int, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout);
244+
protected abstract Task<object?> Select(T client, (Guid Guid, int Id) select, int readTimeout);
252245

253246
protected abstract Task<int> SelectCount(T client);
254247

@@ -272,12 +265,10 @@ private Task<int> Save(T client, Config config)
272265
return Save(client, sloTable, config.WriteTimeout);
273266
}
274267

275-
private async Task<int> Select(T client, RunConfig config)
268+
private async Task Select(T client, RunConfig config)
276269
{
277270
var id = Random.Shared.Next(_maxId);
278-
var (attempts, _) = await Select(client, new ValueTuple<Guid, int>(GuidFromInt(id), id), config.ReadTimeout);
279-
280-
return attempts;
271+
_ = await Select(client, new ValueTuple<Guid, int>(GuidFromInt(id), id), config.ReadTimeout);
281272
}
282273

283274
private static Guid GuidFromInt(int value)

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- Feat ADO.NET: `YdbDataSource.OpenRetryableConnectionAsync` opens a retryable connection with automatic retries for transient failures.
12
- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation.
23
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.
34
- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ internal static class PoolManager
1010
internal static readonly ConcurrentDictionary<string, IDriver> Drivers = new();
1111
internal static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();
1212

13-
internal static async Task<ISession> GetSession(
13+
internal static async ValueTask<ISessionSource> Get(
1414
YdbConnectionStringBuilder settings,
1515
CancellationToken cancellationToken
1616
)
1717
{
1818
if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool))
1919
{
20-
return await sessionPool.OpenSession(cancellationToken);
20+
return sessionPool;
2121
}
2222

2323
await SemaphoreSlim.WaitAsync(cancellationToken);
@@ -26,7 +26,7 @@ CancellationToken cancellationToken
2626
{
2727
if (Pools.TryGetValue(settings.ConnectionString, out var pool))
2828
{
29-
return await pool.OpenSession(cancellationToken);
29+
return pool;
3030
}
3131

3232
var driver = Drivers.TryGetValue(settings.GrpcConnectionString, out var cacheDriver) &&
@@ -40,7 +40,7 @@ CancellationToken cancellationToken
4040

4141
Pools[settings.ConnectionString] = newSessionPool;
4242

43-
return await newSessionPool.OpenSession(cancellationToken);
43+
return newSessionPool;
4444
}
4545
finally
4646
{

src/Ydb.Sdk/src/Ado/Session/ISession.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace Ydb.Sdk.Ado.Session;
55

6-
internal interface ISession
6+
internal interface ISession : IDisposable
77
{
88
IDriver Driver { get; }
99

@@ -21,6 +21,4 @@ ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
2121
Task RollbackTransaction(string txId, CancellationToken cancellationToken = default);
2222

2323
void OnNotSuccessStatusCode(StatusCode code);
24-
25-
void Close();
2624
}

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,9 @@ public void OnNotSuccessStatusCode(StatusCode code)
4747
{
4848
}
4949

50-
public void Close()
50+
public void Dispose()
5151
{
5252
}
5353

54-
private static YdbException NotSupportedTransaction =>
55-
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");
54+
private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions");
5655
}

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ StatusCode.BadSession or
9999
StatusCode.SessionBusy or
100100
StatusCode.SessionExpired or
101101
StatusCode.ClientTransportTimeout or
102-
StatusCode.ClientTransportUnavailable)
102+
StatusCode.ClientTransportUnavailable or
103+
StatusCode.ClientTransportResourceExhausted or
104+
StatusCode.ClientTransportUnknown)
103105
{
104106
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, statusCode);
105107

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,5 +337,5 @@ public abstract ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
337337

338338
public abstract void OnNotSuccessStatusCode(StatusCode code);
339339

340-
public void Close() => _source.Return((T)this);
340+
public void Dispose() => _source.Return((T)this);
341341
}

0 commit comments

Comments
 (0)