Skip to content

Commit 8949ae2

Browse files
Feat: Implement YdbRetryPolicy with AWS-inspired Exponential Backoff and Jitter (#515)
1 parent f488056 commit 8949ae2

File tree

14 files changed

+305
-51
lines changed

14 files changed

+305
-51
lines changed

examples/Ydb.Sdk.AdoNet.QuickStart/Program.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,8 @@ ORDER BY -- Sorting of the results.
364364

365365
while (await ydbDataReader.ReadAsync())
366366
{
367-
_logger.LogInformation("season_title: {}, series_title: {}, series_id: {}, season_id: {}",
367+
_logger.LogInformation("season_title: {SeasonTitle}, series_title: {SeriesTitle}, " +
368+
"series_id: {SeriesId}, season_id: {SeasonId}",
368369
ydbDataReader.GetString("season_title"), ydbDataReader.GetString("series_title"),
369370
ydbDataReader.GetUint64(2), ydbDataReader.GetUint64(3));
370371
}

src/EFCore.Ydb/src/Storage/Internal/YdbDatabaseCreator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public override async Task DeleteAsync(CancellationToken cancellationToken = def
7878
.GetSchemaAsync("Tables", [null, "TABLE"], cancellationToken);
7979

8080
var dropTableOperations = (from DataRow row in dataTable.Rows
81-
select new DropTableOperation { Name = row["table_name"].ToString() }).ToList();
81+
select new DropTableOperation { Name = row["table_name"].ToString()! }).ToList();
8282

8383
await Dependencies.MigrationCommandExecutor.ExecuteNonQueryAsync(Dependencies.MigrationsSqlGenerator
8484
.Generate(dropTableOperations), connection, cancellationToken);

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- Feat: Implement `YdbRetryPolicy` with AWS-inspired Exponential Backoff and Jitter.
12
- Dev: LogLevel `Warning` -> `Debug` on DeleteSession has been `RpcException`.
23

34
## v0.22.0
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace Ydb.Sdk.Ado.Internal;
2+
3+
public interface IRandom
4+
{
5+
public int Next(int maxValue);
6+
}
7+
8+
internal class ThreadLocalRandom : IRandom
9+
{
10+
internal static readonly ThreadLocalRandom Instance = new();
11+
12+
public int Next(int maxValue) => Random.Shared.Next(maxValue);
13+
}

src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,25 @@ public static class StatusCodeUtils
1616
internal static StatusCode Code(this StatusIds.Types.StatusCode statusCode) =>
1717
Enum.IsDefined(typeof(StatusCode), (int)statusCode) ? (StatusCode)statusCode : StatusCode.Unavailable;
1818

19-
internal static bool IsNotSuccess(this StatusIds.Types.StatusCode code) =>
20-
code != StatusIds.Types.StatusCode.Success;
19+
internal static bool IsNotSuccess(this StatusIds.Types.StatusCode statusCode) =>
20+
statusCode != StatusIds.Types.StatusCode.Success;
2121

2222
internal static string ToMessage(this StatusCode statusCode, IReadOnlyList<IssueMessage> issueMessages) =>
2323
issueMessages.Count == 0
2424
? $"Status: {statusCode}"
2525
: $"Status: {statusCode}, Issues:{Environment.NewLine}{issueMessages.IssuesToString()}";
26+
27+
internal static bool IsTransient(this StatusCode statusCode) => statusCode is
28+
StatusCode.BadSession or
29+
StatusCode.SessionBusy or
30+
StatusCode.Aborted or
31+
StatusCode.Unavailable or
32+
StatusCode.Overloaded or
33+
StatusCode.SessionExpired or
34+
StatusCode.ClientTransportResourceExhausted;
35+
36+
internal static bool IsTransientWhenIdempotent(this StatusCode statusCode) => statusCode.IsTransient() ||
37+
statusCode is StatusCode.Undetermined or
38+
StatusCode.ClientTransportUnknown or
39+
StatusCode.ClientTransportUnavailable;
2640
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace Ydb.Sdk.Ado.RetryPolicy;
2+
3+
public interface IRetryPolicy
4+
{
5+
public TimeSpan? GetNextDelay(YdbException ydbException, int attempt);
6+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using Ydb.Sdk.Ado.Internal;
2+
3+
namespace Ydb.Sdk.Ado.RetryPolicy;
4+
5+
/// <summary>
6+
/// See <a href="https://aws.amazon.com/ru/blogs/architecture/exponential-backoff-and-jitter/">AWS paper</a>
7+
/// </summary>
8+
public class YdbRetryPolicy : IRetryPolicy
9+
{
10+
public static readonly YdbRetryPolicy Default = new(YdbRetryPolicyConfig.Default);
11+
12+
private readonly int _maxAttempt;
13+
private readonly int _fastBackoffBaseMs;
14+
private readonly int _slowBackoffBaseMs;
15+
private readonly int _fastCeiling;
16+
private readonly int _slowCeiling;
17+
private readonly int _fastCapBackoffMs;
18+
private readonly int _slowCapBackoffMs;
19+
private readonly bool _enableRetryIdempotence;
20+
private readonly IRandom _random;
21+
22+
public YdbRetryPolicy(YdbRetryPolicyConfig config)
23+
{
24+
_maxAttempt = config.MaxAttempt;
25+
_fastBackoffBaseMs = config.FastBackoffBaseMs;
26+
_slowBackoffBaseMs = config.SlowBackoffBaseMs;
27+
_fastCeiling = (int)Math.Ceiling(Math.Log(config.FastCapBackoffMs + 1, 2));
28+
_slowCeiling = (int)Math.Ceiling(Math.Log(config.SlowCapBackoffMs + 1, 2));
29+
_fastCapBackoffMs = config.FastCapBackoffMs;
30+
_slowCapBackoffMs = config.SlowCapBackoffMs;
31+
_enableRetryIdempotence = config.EnableRetryIdempotence;
32+
_random = ThreadLocalRandom.Instance;
33+
}
34+
35+
internal YdbRetryPolicy(YdbRetryPolicyConfig config, IRandom random) : this(config)
36+
{
37+
_random = random;
38+
}
39+
40+
public TimeSpan? GetNextDelay(YdbException ydbException, int attempt)
41+
{
42+
if (attempt >= _maxAttempt || (!_enableRetryIdempotence && !ydbException.IsTransient))
43+
return null;
44+
45+
return ydbException.Code switch
46+
{
47+
StatusCode.BadSession or StatusCode.SessionBusy => TimeSpan.Zero,
48+
StatusCode.Aborted or StatusCode.Undetermined =>
49+
FullJitter(_fastBackoffBaseMs, _fastCapBackoffMs, _fastCeiling, attempt, _random),
50+
StatusCode.Unavailable or StatusCode.ClientTransportUnknown or StatusCode.ClientTransportUnavailable =>
51+
EqualJitter(_fastBackoffBaseMs, _fastCapBackoffMs, _fastCeiling, attempt, _random),
52+
StatusCode.Overloaded or StatusCode.ClientTransportResourceExhausted =>
53+
EqualJitter(_slowBackoffBaseMs, _slowCapBackoffMs, _slowCeiling, attempt, _random),
54+
_ => null
55+
};
56+
}
57+
58+
private static TimeSpan FullJitter(int backoffBaseMs, int capMs, int ceiling, int attempt, IRandom random) =>
59+
TimeSpan.FromMilliseconds(random.Next(CalculateBackoff(backoffBaseMs, capMs, ceiling, attempt) + 1));
60+
61+
private static TimeSpan EqualJitter(int backoffBaseMs, int capMs, int ceiling, int attempt, IRandom random)
62+
{
63+
var calculatedBackoff = CalculateBackoff(backoffBaseMs, capMs, ceiling, attempt);
64+
var temp = calculatedBackoff / 2;
65+
66+
return TimeSpan.FromMilliseconds(temp + calculatedBackoff % 2 + random.Next(temp + 1));
67+
}
68+
69+
private static int CalculateBackoff(int backoffBaseMs, int capMs, int ceiling, int attempt) =>
70+
Math.Min(backoffBaseMs * (1 << Math.Min(ceiling, attempt)), capMs);
71+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
namespace Ydb.Sdk.Ado.RetryPolicy;
2+
3+
public class YdbRetryPolicyConfig
4+
{
5+
public static readonly YdbRetryPolicyConfig Default = new();
6+
7+
public int MaxAttempt { get; init; } = 10;
8+
9+
public int FastBackoffBaseMs { get; init; } = 5;
10+
11+
public int SlowBackoffBaseMs { get; init; } = 50;
12+
13+
public int FastCapBackoffMs { get; init; } = 500;
14+
15+
public int SlowCapBackoffMs { get; init; } = 5_000;
16+
17+
public bool EnableRetryIdempotence { get; init; } = false;
18+
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,6 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5353
ConnectionStringBuilder = connectionStringBuilder;
5454
}
5555

56-
public IBulkUpsertImporter BeginBulkUpsertImport(
57-
string name,
58-
IReadOnlyList<string> columns,
59-
CancellationToken cancellationToken = default)
60-
{
61-
ThrowIfConnectionClosed();
62-
if (CurrentTransaction is { Completed: false })
63-
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");
64-
65-
var database = ConnectionStringBuilder.Database.TrimEnd('/');
66-
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";
67-
68-
var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;
69-
70-
return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
71-
}
72-
7356
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
7457
{
7558
ThrowIfConnectionClosed();
@@ -279,4 +262,22 @@ public override async ValueTask DisposeAsync()
279262
/// to their pool.
280263
/// </summary>
281264
public static Task ClearAllPools() => PoolManager.ClearAllPools();
265+
266+
public IBulkUpsertImporter BeginBulkUpsertImport(
267+
string name,
268+
IReadOnlyList<string> columns,
269+
CancellationToken cancellationToken = default)
270+
{
271+
ThrowIfConnectionClosed();
272+
273+
if (CurrentTransaction is { Completed: false })
274+
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");
275+
276+
var database = ConnectionStringBuilder.Database.TrimEnd('/');
277+
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";
278+
279+
var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;
280+
281+
return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
282+
}
282283
}

src/Ydb.Sdk/src/Ado/YdbException.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ internal YdbException(RpcException e) : this(e.Status.Code(), "Transport RPC cal
1818
internal static YdbException FromServer(StatusIds.Types.StatusCode statusCode, IReadOnlyList<IssueMessage> issues)
1919
{
2020
var code = statusCode.Code();
21-
2221
var message = code.ToMessage(issues);
2322

2423
return new YdbException(code, message);
@@ -28,10 +27,8 @@ internal YdbException(StatusCode statusCode, string message, Exception? innerExc
2827
: base(message, innerException)
2928
{
3029
Code = statusCode;
31-
var policy = RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy;
32-
33-
IsTransient = policy == RetryPolicy.Unconditional;
34-
IsTransientWhenIdempotent = policy != RetryPolicy.None;
30+
IsTransient = statusCode.IsTransient();
31+
IsTransientWhenIdempotent = statusCode.IsTransientWhenIdempotent();
3532
// TODO: Add SQLSTATE message with order with https://en.wikipedia.org/wiki/SQLSTATE
3633
}
3734

0 commit comments

Comments
 (0)