diff --git a/examples/Ydb.Sdk.QueryService.QuickStart/DataUtils.cs b/examples/Ydb.Sdk.QueryService.QuickStart/DataUtils.cs deleted file mode 100644 index b65a7967..00000000 --- a/examples/Ydb.Sdk.QueryService.QuickStart/DataUtils.cs +++ /dev/null @@ -1,166 +0,0 @@ -using Ydb.Sdk.Value; - -namespace Ydb.Sdk.Examples; - -public record Series(ulong SeriesId, string Title, DateTime ReleaseDate, string Info) -{ - public static Series FromRow(Value.ResultSet.Row row) => - new( - SeriesId: (ulong)row["series_id"].GetOptionalUint64()!, - Title: (string)row["title"]!, - ReleaseDate: (DateTime)row["release_date"].GetOptionalDate()!, - Info: (string)row["series_info"]! - ); -} - -public record Season(ulong SeriesId, ulong SeasonId, string Title, DateTime FirstAired, DateTime LastAired); - -public record Episode(ulong SeriesId, ulong SeasonId, ulong EpisodeId, string Title, DateTime AirDate) -{ - public static Episode FromRow(Value.ResultSet.Row row) => - new( - SeriesId: (ulong)row["series_id"].GetOptionalUint64()!, - SeasonId: (ulong)row["season_id"].GetOptionalUint64()!, - EpisodeId: (ulong)row["episode_id"].GetOptionalUint64()!, - Title: (string)row["title"]!, - AirDate: (DateTime)row["air_date"].GetOptionalDate()! - ); -} - -public static class DataUtils -{ - public static Dictionary GetDataParams() - { - var series = new Series[] - { - new(SeriesId: 1, Title: "IT Crowd", ReleaseDate: DateTime.Parse("2006-02-03"), - Info: "The IT Crowd is a British sitcom produced by Channel 4, written by Graham Linehan, " + - "produced by Ash Atalla and starring Chris O'Dowd, Richard Ayoade, Katherine Parkinson, " + - "and Matt Berry."), - new(SeriesId: 2, Title: "Silicon Valley", ReleaseDate: DateTime.Parse("2014-04-06"), - Info: "Silicon Valley is an American comedy television series created by Mike Judge, " + - "John Altschuler and Dave Krinsky. The series focuses on five young men who founded " + - "a startup company in Silicon Valley.") - }; - - var seasons = new Season[] - { - new(1, 1, "Season 1", DateTime.Parse("2006-02-03"), DateTime.Parse("2006-03-03")), - new(1, 2, "Season 2", DateTime.Parse("2007-08-24"), DateTime.Parse("2007-09-28")), - new(1, 3, "Season 3", DateTime.Parse("2008-11-21"), DateTime.Parse("2008-12-26")), - new(1, 4, "Season 4", DateTime.Parse("2010-06-25"), DateTime.Parse("2010-07-30")), - new(2, 1, "Season 1", DateTime.Parse("2014-04-06"), DateTime.Parse("2014-06-01")), - new(2, 2, "Season 2", DateTime.Parse("2015-04-12"), DateTime.Parse("2015-06-14")), - new(2, 3, "Season 3", DateTime.Parse("2016-04-24"), DateTime.Parse("2016-06-26")), - new(2, 4, "Season 4", DateTime.Parse("2017-04-23"), DateTime.Parse("2017-06-25")), - new(2, 5, "Season 5", DateTime.Parse("2018-03-25"), DateTime.Parse("2018-05-13")) - }; - - var episodes = new Episode[] - { - new(1, 1, 1, "Yesterday's Jam", DateTime.Parse("2006-02-03")), - new(1, 1, 2, "Calamity Jen", DateTime.Parse("2006-02-03")), - new(1, 1, 3, "Fifty-Fifty", DateTime.Parse("2006-02-10")), - new(1, 1, 4, "The Red Door", DateTime.Parse("2006-02-17")), - new(1, 1, 5, "The Haunting of Bill Crouse", DateTime.Parse("2006-02-24")), - new(1, 1, 6, "Aunt Irma Visits", DateTime.Parse("2006-03-03")), - new(1, 2, 1, "The Work Outing", DateTime.Parse("2006-08-24")), - new(1, 2, 2, "Return of the Golden Child", DateTime.Parse("2007-08-31")), - new(1, 2, 3, "Moss and the German", DateTime.Parse("2007-09-07")), - new(1, 2, 4, "The Dinner Party", DateTime.Parse("2007-09-14")), - new(1, 2, 5, "Smoke and Mirrors", DateTime.Parse("2007-09-21")), - new(1, 2, 6, "Men Without Women", DateTime.Parse("2007-09-28")), - new(1, 3, 1, "From Hell", DateTime.Parse("2008-11-21")), - new(1, 3, 2, "Are We Not Men?", DateTime.Parse("2008-11-28")), - new(1, 3, 3, "Tramps Like Us", DateTime.Parse("2008-12-05")), - new(1, 3, 4, "The Speech", DateTime.Parse("2008-12-12")), - new(1, 3, 5, "Friendface", DateTime.Parse("2008-12-19")), - new(1, 3, 6, "Calendar Geeks", DateTime.Parse("2008-12-26")), - new(1, 4, 1, "Jen The Fredo", DateTime.Parse("2010-06-25")), - new(1, 4, 2, "The Final Countdown", DateTime.Parse("2010-07-02")), - new(1, 4, 3, "Something Happened", DateTime.Parse("2010-07-09")), - new(1, 4, 4, "Italian For Beginners", DateTime.Parse("2010-07-16")), - new(1, 4, 5, "Bad Boys", DateTime.Parse("2010-07-23")), - new(1, 4, 6, "Reynholm vs Reynholm", DateTime.Parse("2010-07-30")), - new(2, 1, 1, "Minimum Viable Product", DateTime.Parse("2014-04-06")), - new(2, 1, 2, "The Cap Table", DateTime.Parse("2014-04-13")), - new(2, 1, 3, "Articles of Incorporation", DateTime.Parse("2014-04-20")), - new(2, 1, 4, "Fiduciary Duties", DateTime.Parse("2014-04-27")), - new(2, 1, 5, "Signaling Risk", DateTime.Parse("2014-05-04")), - new(2, 1, 6, "Third Party Insourcing", DateTime.Parse("2014-05-11")), - new(2, 1, 7, "Proof of Concept", DateTime.Parse("2014-05-18")), - new(2, 1, 8, "Optimal Tip-to-Tip Efficiency", DateTime.Parse("2014-06-01")), - new(2, 2, 1, "Sand Hill Shuffle", DateTime.Parse("2015-04-12")), - new(2, 2, 2, "Runaway Devaluation", DateTime.Parse("2015-04-19")), - new(2, 2, 3, "Bad Money", DateTime.Parse("2015-04-26")), - new(2, 2, 4, "The Lady", DateTime.Parse("2015-05-03")), - new(2, 2, 5, "Server Space", DateTime.Parse("2015-05-10")), - new(2, 2, 6, "Homicide", DateTime.Parse("2015-05-17")), - new(2, 2, 7, "Adult Content", DateTime.Parse("2015-05-24")), - new(2, 2, 8, "White Hat/Black Hat", DateTime.Parse("2015-05-31")), - new(2, 2, 9, "Binding Arbitration", DateTime.Parse("2015-06-07")), - new(2, 2, 10, "Two Days of the Condor", DateTime.Parse("2015-06-14")), - new(2, 3, 1, "Founder Friendly", DateTime.Parse("2016-04-24")), - new(2, 3, 2, "Two in the Box", DateTime.Parse("2016-05-01")), - new(2, 3, 3, "Meinertzhagen's Haversack", DateTime.Parse("2016-05-08")), - new(2, 3, 4, "Maleant Data Systems Solutions", DateTime.Parse("2016-05-15")), - new(2, 3, 5, "The Empty Chair", DateTime.Parse("2016-05-22")), - new(2, 3, 6, "Bachmanity Insanity", DateTime.Parse("2016-05-29")), - new(2, 3, 7, "To Build a Better Beta", DateTime.Parse("2016-06-05")), - new(2, 3, 8, "Bachman's Earnings Over-Ride", DateTime.Parse("2016-06-12")), - new(2, 3, 9, "Daily Active Users", DateTime.Parse("2016-06-19")), - new(2, 3, 10, "The Uptick", DateTime.Parse("2016-06-26")), - new(2, 4, 1, "Success Failure", DateTime.Parse("2017-04-23")), - new(2, 4, 2, "Terms of Service", DateTime.Parse("2017-04-30")), - new(2, 4, 3, "Intellectual Property", DateTime.Parse("2017-05-07")), - new(2, 4, 4, "Teambuilding Exercise", DateTime.Parse("2017-05-14")), - new(2, 4, 5, "The Blood Boy", DateTime.Parse("2017-05-21")), - new(2, 4, 6, "Customer Service", DateTime.Parse("2017-05-28")), - new(2, 4, 7, "The Patent Troll", DateTime.Parse("2017-06-04")), - new(2, 4, 8, "The Keenan Vortex", DateTime.Parse("2017-06-11")), - new(2, 4, 9, "Hooli-Con", DateTime.Parse("2017-06-18")), - new(2, 4, 10, "Server Error", DateTime.Parse("2017-06-25")), - new(2, 5, 1, "Grow Fast or Die Slow", DateTime.Parse("2018-03-25")), - new(2, 5, 2, "Reorientation", DateTime.Parse("2018-04-01")), - new(2, 5, 3, "Chief Operating Officer", DateTime.Parse("2018-04-08")), - new(2, 5, 4, "Tech Evangelist", DateTime.Parse("2018-04-15")), - new(2, 5, 5, "Facial Recognition", DateTime.Parse("2018-04-22")), - new(2, 5, 6, "Artificial Emotional Intelligence", DateTime.Parse("2018-04-29")), - new(2, 5, 7, "Initial Coin Offering", DateTime.Parse("2018-05-06")), - new(2, 5, 8, "Fifty-One Percent", DateTime.Parse("2018-05-13")) - }; - - var seriesData = series.Select(s => YdbValue.MakeStruct(new Dictionary - { - { "series_id", YdbValue.MakeUint64(s.SeriesId) }, - { "title", YdbValue.MakeUtf8(s.Title) }, - { "series_info", YdbValue.MakeUtf8(s.Info) }, - { "release_date", YdbValue.MakeDate(s.ReleaseDate) } - })).ToList(); - - var seasonsData = seasons.Select(s => YdbValue.MakeStruct(new Dictionary - { - { "series_id", YdbValue.MakeUint64(s.SeriesId) }, - { "season_id", YdbValue.MakeUint64(s.SeasonId) }, - { "title", YdbValue.MakeUtf8(s.Title) }, - { "first_aired", YdbValue.MakeDate(s.FirstAired) }, - { "last_aired", YdbValue.MakeDate(s.LastAired) } - })).ToList(); - - var episodesData = episodes.Select(e => YdbValue.MakeStruct(new Dictionary - { - { "series_id", YdbValue.MakeUint64(e.SeriesId) }, - { "season_id", YdbValue.MakeUint64(e.SeasonId) }, - { "episode_id", YdbValue.MakeUint64(e.EpisodeId) }, - { "title", YdbValue.MakeUtf8(e.Title) }, - { "air_date", YdbValue.MakeDate(e.AirDate) } - })).ToList(); - - return new Dictionary - { - { "$seriesData", YdbValue.MakeList(seriesData) }, - { "$seasonsData", YdbValue.MakeList(seasonsData) }, - { "$episodesData", YdbValue.MakeList(episodesData) } - }; - } -} \ No newline at end of file diff --git a/examples/Ydb.Sdk.QueryService.QuickStart/Program.cs b/examples/Ydb.Sdk.QueryService.QuickStart/Program.cs deleted file mode 100644 index fd59ecde..00000000 --- a/examples/Ydb.Sdk.QueryService.QuickStart/Program.cs +++ /dev/null @@ -1,51 +0,0 @@ -using CommandLine; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; - -namespace Ydb.Sdk.Examples; - -internal class CmdOptions -{ - [Option('e', "endpoint", Required = true, HelpText = "Database endpoint")] - public string Endpoint { get; set; } = ""; - - [Option('d', "database", Required = true, HelpText = "Database name")] - public string Database { get; set; } = ""; - - [Option('p', "path", HelpText = "Base path for tables")] - public string Path { get; set; } = "ydb-dotnet-basic"; - - [Option("anonymous", Required = false, HelpText = "Fallback anonymous")] - public bool FallbackAnonymous { get; set; } = false; -} - -internal static class Program -{ - private static ServiceProvider GetServiceProvider() => - new ServiceCollection() - .AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Information)) - .BuildServiceProvider(); - - private static async Task Run(CmdOptions cmdOptions) - { - await using var serviceProvider = GetServiceProvider(); - var loggerFactory = serviceProvider.GetService(); - - loggerFactory ??= NullLoggerFactory.Instance; - - await QueryExample.Run( - endpoint: cmdOptions.Endpoint, - database: cmdOptions.Database, - path: cmdOptions.Path, - loggerFactory: loggerFactory - ); - } - - public static async Task Main(string[] args) - { - AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); - - await Parser.Default.ParseArguments(args).WithParsedAsync(Run); - } -} \ No newline at end of file diff --git a/examples/Ydb.Sdk.QueryService.QuickStart/QueryExample.cs b/examples/Ydb.Sdk.QueryService.QuickStart/QueryExample.cs deleted file mode 100644 index 46e12400..00000000 --- a/examples/Ydb.Sdk.QueryService.QuickStart/QueryExample.cs +++ /dev/null @@ -1,333 +0,0 @@ -using Microsoft.Extensions.Logging; -using Ydb.Sdk.Services.Query; -using Ydb.Sdk.Value; - -namespace Ydb.Sdk.Examples; - -public class QueryExample -{ - private QueryClient Client { get; } - private string BasePath { get; } - - private QueryExample(QueryClient client, string database, string path) - { - Client = client; - BasePath = string.Join('/', database, path); - } - - public static async Task Run( - string endpoint, - string database, - string path, - ILoggerFactory loggerFactory) - { - var config = new DriverConfig(endpoint: endpoint, database: database); - - await using var driver = await Driver.CreateInitialized( - config: config, - loggerFactory: loggerFactory - ); - - await using var tableClient = new QueryClient(driver, new QueryClientConfig()); - - var example = new QueryExample(tableClient, database, path); - - await example.SchemeQuery(); - await example.FillData(); - await example.SimpleSelect(1); - await example.SimpleUpsert(10, "Coming soon", DateTime.UtcNow); - await example.SimpleSelect(10); - await example.InteractiveTx(); - await example.StreamSelect(); - await example.ReadScalar(); - await example.ReadSingleRow(); - await example.ReadAllRows(); - await example.ReadAllResultSets(); - } - - private static ExecuteQuerySettings DefaultQuerySettings => - new() - { - // Transport timeout from the moment operation was sent to server. It is useful in case - // of possible network issues, to that query doesn't hang forever. - // It is recommended to set this value to a larger value than OperationTimeout to give - // server some time to issue a response. - TransportTimeout = TimeSpan.FromSeconds(5), - - Syntax = Syntax.YqlV1 - }; - - - private async Task SchemeQuery() - { - var createQuery = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - CREATE TABLE series ( - series_id Uint64, - title Utf8, - series_info Utf8, - release_date Date, - PRIMARY KEY (series_id) - ); - - CREATE TABLE seasons ( - series_id Uint64, - season_id Uint64, - title Utf8, - first_aired Date, - last_aired Date, - PRIMARY KEY (series_id, season_id) - ); - - CREATE TABLE episodes ( - series_id Uint64, - season_id Uint64, - episode_id Uint64, - title Utf8, - air_date Date, - PRIMARY KEY (series_id, season_id, episode_id) - ); - "; - - - await Client.Exec( - query: createQuery - ); - } - - private async Task FillData() - { - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - REPLACE INTO series - SELECT * FROM AS_TABLE($seriesData); - - REPLACE INTO seasons - SELECT * FROM AS_TABLE($seasonsData); - - REPLACE INTO episodes - SELECT * FROM AS_TABLE($episodesData); - "; - - await Client.Exec( - query: query, - parameters: DataUtils.GetDataParams(), - txMode: TxMode.SerializableRw, - settings: DefaultQuerySettings - ); - } - - - private async Task SimpleSelect(ulong id) - { - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - SELECT * - FROM series - WHERE series_id = $id; - "; - - var parameters = new Dictionary { { "$id", (YdbValue)id } }; - - - var response = await Client.ReadAllRows( - query, - parameters: parameters - ); - - foreach (var row in response) - { - var series = Series.FromRow(row); - Console.WriteLine("> Series, " + - $"series_id: {series.SeriesId}, " + - $"title: {series.Title}, " + - $"release_date: {series.ReleaseDate}"); - } - } - - - private async Task SimpleUpsert(ulong id, string title, DateTime date) - { - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - UPSERT INTO series (series_id, title, release_date) VALUES - ($id, $title, $release_date); - "; - var parameters = new Dictionary - { - { "$id", YdbValue.MakeUint64(id) }, - { "$title", YdbValue.MakeUtf8(title) }, - { "$release_date", YdbValue.MakeDate(date) } - }; - - await Client.Exec( - query, - parameters - ); - } - - - private Task InteractiveTx() => - Client.DoTx(async tx => - { - var query1 = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - SELECT first_aired FROM seasons - WHERE series_id = $series_id AND season_id = $season_id - LIMIT 1; - "; - var parameters1 = new Dictionary - { - { "$series_id", YdbValue.MakeUint64(1) }, - { "$season_id", YdbValue.MakeUint64(3) } - }; - - var response = await tx.ReadRow(query1, parameters1); - - var newAired = response![0].GetOptionalDate()!.Value.AddDays(2); - - var query2 = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - UPSERT INTO seasons (series_id, season_id, first_aired) VALUES - ($series_id, $season_id, $first_aired); - "; - var parameters2 = new Dictionary - { - { "$series_id", YdbValue.MakeUint64(1) }, - { "$season_id", YdbValue.MakeUint64(3) }, - { "$first_aired", YdbValue.MakeDate(newAired) } - }; - - await tx.Exec(query2, parameters2); - } - ); - - private async Task StreamSelect() - { - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - SELECT * - FROM series; - "; - - - await Client.DoTx(async tx => - { - await foreach (var part in await tx.Stream(query, commit: true)) - { - foreach (var row in part.ResultSet!.Rows) - { - Console.WriteLine(Series.FromRow(row)); - } - } - }); - } - - private async Task ReadScalar() - { - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - SELECT COUNT(*) - FROM series; - "; - - - var row = await Client.ReadRow(query); - - var count = row![0].GetUint64(); - - Console.WriteLine($"There is {count} rows in 'series' table"); - } - - private async Task ReadSingleRow() - { - Console.WriteLine("StreamSelect"); - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - SELECT * - FROM series - LIMIT 1; - "; - - - var row = await Client.ReadRow(query); - - var series = Series.FromRow(row!); - - Console.WriteLine($"First row in 'series' table is {series}"); - } - - private async Task ReadAllRows() - { - Console.WriteLine("StreamSelect"); - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - SELECT * - FROM series; - "; - - - var response = await Client.ReadAllRows(query); - - var series = response.Select(Series.FromRow); - - Console.WriteLine("'series' table contains:"); - foreach (var elem in series) - { - Console.WriteLine($"\t{elem}"); - } - } - - private async Task ReadAllResultSets() - { - Console.WriteLine("StreamSelect"); - var query = @$" - PRAGMA TablePathPrefix('{BasePath}'); - - SELECT * - FROM series; -- First result set - - SELECT * - FROM episodes; -- Second result set - "; - - - var resultSets = await Client.DoTx(async tx => - { - var resultSets = new List(); - await foreach (var resultSet in await tx.Stream(query, commit: true)) - { - resultSets.Add(resultSet.ResultSet!); - } - - return resultSets; - }); - - var seriesSet = resultSets[0]; - var episodesSet = resultSets[1]; - - Console.WriteLine("Multiple sets selected:"); - - Console.WriteLine("\t'series' contains:"); - foreach (var row in seriesSet.Rows) - { - Console.WriteLine($"\t\t{Series.FromRow(row)}"); - } - - Console.WriteLine("\t'episodes' contains:"); - foreach (var row in episodesSet.Rows) - { - Console.WriteLine($"\t\t{Episode.FromRow(row)}"); - } - } -} \ No newline at end of file diff --git a/examples/Ydb.Sdk.QueryService.QuickStart/Ydb.Sdk.QueryService.QuickStart.csproj b/examples/Ydb.Sdk.QueryService.QuickStart/Ydb.Sdk.QueryService.QuickStart.csproj deleted file mode 100644 index 4e349d82..00000000 --- a/examples/Ydb.Sdk.QueryService.QuickStart/Ydb.Sdk.QueryService.QuickStart.csproj +++ /dev/null @@ -1,27 +0,0 @@ - - - - Exe - net6.0 - enable - enable - Ydb.Sdk.Examples.QueryExample - Ydb.Sdk.Examples - - - - git - https://github.com/ydb-platform/ydb-dotnet-sdk - https://github.com/ydb-platform/ydb-dotnet-sdk - YANDEX LLC - - - - - - - - - - - diff --git a/examples/YdbExamples.sln b/examples/YdbExamples.sln index 4e37d6b8..31c103ee 100644 --- a/examples/YdbExamples.sln +++ b/examples/YdbExamples.sln @@ -3,8 +3,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 16 VisualStudioVersion = 16.0.31205.134 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ydb.Sdk.QueryService.QuickStart", "Ydb.Sdk.QueryService.QuickStart\Ydb.Sdk.QueryService.QuickStart.csproj", "{0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ydb.Sdk.AdoNet.QuickStart", "Ydb.Sdk.AdoNet.QuickStart\Ydb.Sdk.AdoNet.QuickStart.csproj", "{5030F1BC-E974-46BB-9381-C4356E120D96}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Ydb.Sdk.AdoNet.Dapper.QuickStart", "Ydb.Sdk.AdoNet.Dapper.QuickStart\Ydb.Sdk.AdoNet.Dapper.QuickStart.csproj", "{AC8F1B10-31EB-4A29-BF35-7F49B06E1FA8}" @@ -35,10 +33,6 @@ Global Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Debug|Any CPU.Build.0 = Debug|Any CPU - {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Release|Any CPU.ActiveCfg = Release|Any CPU - {0BA2CD4F-BF38-4C0D-878B-3D0C2C2970D9}.Release|Any CPU.Build.0 = Release|Any CPU {5030F1BC-E974-46BB-9381-C4356E120D96}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5030F1BC-E974-46BB-9381-C4356E120D96}.Debug|Any CPU.Build.0 = Debug|Any CPU {5030F1BC-E974-46BB-9381-C4356E120D96}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/slo/src/AdoNet/AdoNet.csproj b/slo/src/AdoNet/AdoNet.csproj index 8abfeaa7..99a59412 100644 --- a/slo/src/AdoNet/AdoNet.csproj +++ b/slo/src/AdoNet/AdoNet.csproj @@ -11,9 +11,4 @@ - - - - - diff --git a/slo/src/AdoNet/SloTableContext.cs b/slo/src/AdoNet/SloTableContext.cs index 7f76439c..43190480 100644 --- a/slo/src/AdoNet/SloTableContext.cs +++ b/slo/src/AdoNet/SloTableContext.cs @@ -1,21 +1,20 @@ using System.Data; using Internal; -using Polly; -using Ydb.Sdk; using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.RetryPolicy; namespace AdoNet; public class SloTableContext : SloTableContext { - private static readonly AsyncPolicy Policy = Polly.Policy - .Handle(exception => exception.IsTransient) - .RetryAsync(10); - protected override string Job => "AdoNet"; protected override YdbDataSource CreateClient(Config config) => new( - new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory } + new YdbConnectionStringBuilder(config.ConnectionString) + { + LoggerFactory = ISloContext.Factory, + RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) + } ); protected override async Task Create(YdbDataSource client, int operationTimeout) @@ -38,18 +37,16 @@ PRIMARY KEY (Guid, Id) }.ExecuteNonQueryAsync(); } - protected override async Task<(int, StatusCode)> Save( + protected override async Task Save( YdbDataSource client, SloTable sloTable, int writeTimeout ) { var attempts = 0; - var policyResult = await Policy.ExecuteAndCaptureAsync(async _ => + await client.ExecuteAsync(async ydbConnection => { attempts++; - await using var ydbConnection = await client.OpenConnectionAsync(); - var ydbCommand = new YdbCommand(ydbConnection) { CommandText = $""" @@ -93,24 +90,22 @@ int writeTimeout }; await ydbCommand.ExecuteNonQueryAsync(); - }, new Context()); + }); - return (attempts, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success); + return attempts; } - protected override async Task<(int, StatusCode, object?)> Select( + protected override async Task<(int, object?)> Select( YdbDataSource client, (Guid Guid, int Id) select, int readTimeout ) { var attempts = 0; - var policyResult = await Policy.ExecuteAndCaptureAsync(async _ => + var policyResult = await client.ExecuteAsync(async ydbConnection => { attempts++; - await using var ydbConnection = await client.OpenConnectionAsync(); - var ydbCommand = new YdbCommand(ydbConnection) { CommandText = $""" @@ -126,9 +121,9 @@ int readTimeout }; return await ydbCommand.ExecuteScalarAsync(); - }, new Context()); + }); - return (attempts, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); + return (attempts, policyResult); } protected override async Task SelectCount(YdbDataSource client) diff --git a/slo/src/Dapper/AdoNet.Dapper.csproj b/slo/src/Dapper/AdoNet.Dapper.csproj index a3845e65..571dbbf8 100644 --- a/slo/src/Dapper/AdoNet.Dapper.csproj +++ b/slo/src/Dapper/AdoNet.Dapper.csproj @@ -13,7 +13,6 @@ - diff --git a/slo/src/Dapper/SloTableContext.cs b/slo/src/Dapper/SloTableContext.cs index b2e8ac99..28122bdf 100644 --- a/slo/src/Dapper/SloTableContext.cs +++ b/slo/src/Dapper/SloTableContext.cs @@ -1,21 +1,20 @@ using Dapper; using Internal; -using Polly; -using Ydb.Sdk; using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.RetryPolicy; namespace AdoNet.Dapper; public class SloTableContext : SloTableContext { - private static readonly AsyncPolicy Policy = Polly.Policy - .Handle(exception => exception.IsTransient) - .RetryAsync(10); - protected override string Job => "Dapper"; protected override YdbDataSource CreateClient(Config config) => new( - new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory } + new YdbConnectionStringBuilder(config.ConnectionString) + { + LoggerFactory = ISloContext.Factory, + RetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { EnableRetryIdempotence = true }) + } ); protected override async Task Create(YdbDataSource client, int operationTimeout) @@ -34,42 +33,40 @@ PRIMARY KEY (Guid, Id) """); } - protected override async Task<(int, StatusCode)> Save(YdbDataSource client, SloTable sloTable, int writeTimeout) + protected override async Task Save(YdbDataSource client, SloTable sloTable, int writeTimeout) { var attempt = 0; - var policyResult = await Policy.ExecuteAndCaptureAsync(async _ => + await client.ExecuteAsync(async ydbConnection => { attempt++; - await using var connection = await client.OpenConnectionAsync(); - await connection.ExecuteAsync($""" - UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp) - VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp) - """, sloTable); - }, new Context() + await ydbConnection.ExecuteAsync( + $""" + UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp) + VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp) + """, sloTable); + } ); - return (attempt, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success); + return attempt; } - protected override async Task<(int, StatusCode, object?)> Select(YdbDataSource client, (Guid Guid, int Id) select, + protected override async Task<(int, object?)> Select(YdbDataSource client, (Guid Guid, int Id) select, int readTimeout) { var attempts = 0; - var policyResult = await Policy.ExecuteAndCaptureAsync(async _ => + var policyResult = await client.ExecuteAsync(async ydbConnection => { attempts++; - await using var connection = await client.OpenConnectionAsync(); - - return await connection.QueryFirstOrDefaultAsync( + return await ydbConnection.QueryFirstOrDefaultAsync( $""" SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id; """, new { select.Guid, select.Id } ); - }, new Context()); + }); - return (attempts, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); + return (attempts, policyResult); } protected override async Task SelectCount(YdbDataSource client) diff --git a/slo/src/EF/SloTableContext.cs b/slo/src/EF/SloTableContext.cs index a85b35a6..2e6cc204 100644 --- a/slo/src/EF/SloTableContext.cs +++ b/slo/src/EF/SloTableContext.cs @@ -3,7 +3,6 @@ using Internal; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; -using Ydb.Sdk; using Ydb.Sdk.Ado; namespace EF; @@ -25,7 +24,7 @@ int operationTimeout await dbContext.Database.ExecuteSqlRawAsync(SloTable.Options); } - protected override async Task<(int, StatusCode)> Save( + protected override async Task Save( PooledDbContextFactory client, SloTable sloTable, int writeTimeout @@ -72,19 +71,18 @@ await executeStrategy.ExecuteAsync(async () => }); }); - return (1, StatusCode.Success); + return 0; } - protected override async Task<(int, StatusCode, object?)> Select( + protected override async Task<(int, object?)> Select( PooledDbContextFactory client, (Guid Guid, int Id) select, int readTimeout ) { await using var dbContext = await client.CreateDbContextAsync(); - return (0, StatusCode.Success, - await dbContext.SloEntities.FirstOrDefaultAsync(table => - table.Guid == select.Guid && table.Id == select.Id)); + return (0, await dbContext.SloEntities.FirstOrDefaultAsync(table => + table.Guid == select.Guid && table.Id == select.Id)); } protected override async Task SelectCount(PooledDbContextFactory client) diff --git a/slo/src/Internal/SloTableContext.cs b/slo/src/Internal/SloTableContext.cs index 3eea759d..8abcceab 100644 --- a/slo/src/Internal/SloTableContext.cs +++ b/slo/src/Internal/SloTableContext.cs @@ -5,6 +5,7 @@ using NLog.Extensions.Logging; using Prometheus; using Ydb.Sdk; +using Ydb.Sdk.Ado; namespace Internal; @@ -129,7 +130,7 @@ public async Task Run(RunConfig runConfig) return; async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, - Func> action) + Func> action) { var metricFactory = Metrics.WithLabels(new Dictionary { @@ -213,32 +214,25 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, await Task.Delay(Random.Shared.Next(IntervalMs / 2), cancellationTokenSource.Token); } + pendingOperations.Inc(); + var sw = Stopwatch.StartNew(); try { - pendingOperations.Inc(); - var sw = Stopwatch.StartNew(); - var (attempts, statusCode) = await action(client, runConfig); + var attempts = await action(client, runConfig); sw.Stop(); - retryAttempts.Set(attempts); operationsTotal.Inc(); pendingOperations.Dec(); - - if (statusCode != StatusCode.Success) - { - errorsTotal.WithLabels(statusCode.StatusName()).Inc(); - operationsFailureTotal.Inc(); - operationLatencySeconds.WithLabels("err").Observe(sw.Elapsed.TotalSeconds); - } - else - { - operationsSuccessTotal.Inc(); - operationLatencySeconds.WithLabels("success").Observe(sw.Elapsed.TotalSeconds); - } + operationsSuccessTotal.Inc(); + operationLatencySeconds.WithLabels("success").Observe(sw.Elapsed.TotalSeconds); } - catch (Exception e) + catch (YdbException e) { Logger.LogError(e, "Fail operation!"); + + errorsTotal.WithLabels(e.Code.StatusName()).Inc(); + operationsFailureTotal.Inc(); + operationLatencySeconds.WithLabels("err").Observe(sw.Elapsed.TotalSeconds); } } }, cancellationTokenSource.Token)); @@ -252,13 +246,13 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, } // return attempt count & StatusCode operation - protected abstract Task<(int, StatusCode)> Save(T client, SloTable sloTable, int writeTimeout); + protected abstract Task Save(T client, SloTable sloTable, int writeTimeout); - protected abstract Task<(int, StatusCode, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout); + protected abstract Task<(int, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout); protected abstract Task SelectCount(T client); - private Task<(int, StatusCode)> Save(T client, Config config) + private Task Save(T client, Config config) { const int minSizeStr = 20; const int maxSizeStr = 40; @@ -278,13 +272,12 @@ async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, return Save(client, sloTable, config.WriteTimeout); } - private async Task<(int, StatusCode)> Select(T client, RunConfig config) + private async Task Select(T client, RunConfig config) { var id = Random.Shared.Next(_maxId); - var (attempts, code, _) = - await Select(client, new ValueTuple(GuidFromInt(id), id), config.ReadTimeout); + var (attempts, _) = await Select(client, new ValueTuple(GuidFromInt(id), id), config.ReadTimeout); - return (attempts, code); + return attempts; } private static Guid GuidFromInt(int value) diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 336f08a8..e6a04532 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,5 @@ +- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods. +- **Breaking Change**: moved and renamed `Ydb.Sdk.Services.Query.TxMode` -> `Ydb.Sdk.Ado.TransactionMode`. - Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse channels. - Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`. - Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`. diff --git a/src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs b/src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs index 6598d9f9..8cf2d377 100644 --- a/src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs +++ b/src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs @@ -32,9 +32,4 @@ StatusCode.Unavailable or StatusCode.Overloaded or StatusCode.SessionExpired or StatusCode.ClientTransportResourceExhausted; - - internal static bool IsTransientWhenIdempotent(this StatusCode statusCode) => statusCode.IsTransient() || - statusCode is StatusCode.Undetermined or - StatusCode.ClientTransportUnknown or - StatusCode.ClientTransportUnavailable; } diff --git a/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicy.cs b/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicy.cs index b9f96abd..ae4a641d 100644 --- a/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicy.cs +++ b/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicy.cs @@ -21,7 +21,7 @@ public class YdbRetryPolicy : IRetryPolicy public YdbRetryPolicy(YdbRetryPolicyConfig config) { - _maxAttempt = config.MaxAttempt; + _maxAttempt = config.MaxAttempts; _fastBackoffBaseMs = config.FastBackoffBaseMs; _slowBackoffBaseMs = config.SlowBackoffBaseMs; _fastCeiling = (int)Math.Ceiling(Math.Log(config.FastCapBackoffMs + 1, 2)); @@ -39,7 +39,7 @@ internal YdbRetryPolicy(YdbRetryPolicyConfig config, IRandom random) : this(conf public TimeSpan? GetNextDelay(YdbException ydbException, int attempt) { - if (attempt >= _maxAttempt || (!_enableRetryIdempotence && !ydbException.IsTransient)) + if (attempt >= _maxAttempt - 1 || (!_enableRetryIdempotence && !ydbException.IsTransient)) return null; return ydbException.Code switch diff --git a/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyConfig.cs b/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyConfig.cs index 2e6950bc..017c3d2c 100644 --- a/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyConfig.cs +++ b/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyConfig.cs @@ -4,7 +4,7 @@ public class YdbRetryPolicyConfig { public static readonly YdbRetryPolicyConfig Default = new(); - public int MaxAttempt { get; init; } = 10; + public int MaxAttempts { get; init; } = 10; public int FastBackoffBaseMs { get; init; } = 5; @@ -15,4 +15,11 @@ public class YdbRetryPolicyConfig public int SlowCapBackoffMs { get; init; } = 5_000; public bool EnableRetryIdempotence { get; init; } = false; + + public override string ToString() => $"MaxAttempt={MaxAttempts};" + + $"FastBackoffBaseMs={FastBackoffBaseMs};" + + $"SlowBackoffBaseMs={SlowBackoffBaseMs};" + + $"FastCapBackoffMs={FastCapBackoffMs};" + + $"SlowCapBackoffMs={SlowCapBackoffMs};" + + $"EnableRetryIdempotence={EnableRetryIdempotence}"; } diff --git a/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyExecutor.cs b/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyExecutor.cs new file mode 100644 index 00000000..59d61589 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyExecutor.cs @@ -0,0 +1,66 @@ +namespace Ydb.Sdk.Ado.RetryPolicy; + +internal sealed class YdbRetryPolicyExecutor +{ + private readonly IRetryPolicy _retryPolicy; + + public YdbRetryPolicyExecutor(IRetryPolicy retryPolicy) + { + _retryPolicy = retryPolicy; + } + + /// + /// Executes the specified asynchronous operation and returns the result. + /// + /// + /// A function that returns a started task of type . + /// + /// + /// A cancellation token used to cancel the retry operation, but not operations that are already in flight + /// or that already completed successfully. + /// + /// The result type of the returned by . + /// + /// A task that will run to completion if the original task completes successfully (either the + /// first time or after retrying transient failures). If the task fails with a non-transient error or + /// the retry limit is reached, the returned task will become faulted and the exception must be observed. + /// + public Task ExecuteAsync( + Func> operation, + CancellationToken cancellationToken = default + ) => ExecuteImplementationAsync(operation, cancellationToken); + + public async Task ExecuteAsync( + Func operation, + CancellationToken cancellationToken = default + ) => await ExecuteImplementationAsync(async ct => + { + await operation(ct).ConfigureAwait(false); + return 0; + }, cancellationToken).ConfigureAwait(false); + + private async Task ExecuteImplementationAsync( + Func> operation, + CancellationToken cancellationToken + ) + { + var attempt = 0; + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + return await operation(cancellationToken).ConfigureAwait(false); + } + catch (YdbException e) + { + var delay = _retryPolicy.GetNextDelay(e, attempt++); + if (delay == null) + throw; + + await Task.Delay(delay.Value, cancellationToken).ConfigureAwait(false); + } + } + } +} diff --git a/src/Ydb.Sdk/src/Ado/Transaction/TransactionExtensions.cs b/src/Ydb.Sdk/src/Ado/Transaction/TransactionExtensions.cs new file mode 100644 index 00000000..f189f7d2 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Transaction/TransactionExtensions.cs @@ -0,0 +1,35 @@ +using Ydb.Query; + +namespace Ydb.Sdk.Ado.Transaction; + +internal static class TransactionExtensions +{ + private static readonly TransactionSettings SerializableRw = new() + { SerializableReadWrite = new SerializableModeSettings() }; + + private static readonly TransactionSettings SnapshotRo = new() + { SnapshotReadOnly = new SnapshotModeSettings() }; + + private static readonly TransactionSettings StaleRo = new() + { StaleReadOnly = new StaleModeSettings() }; + + private static readonly TransactionSettings OnlineRo = new() + { OnlineReadOnly = new OnlineModeSettings { AllowInconsistentReads = false } }; + + private static readonly TransactionSettings OnlineInconsistentRo = new() + { OnlineReadOnly = new OnlineModeSettings { AllowInconsistentReads = true } }; + + internal static TransactionSettings TransactionSettings(this TransactionMode mode) => + mode switch + { + TransactionMode.SerializableRw => SerializableRw, + TransactionMode.SnapshotRo => SnapshotRo, + TransactionMode.StaleRo => StaleRo, + TransactionMode.OnlineRo => OnlineRo, + TransactionMode.OnlineInconsistentRo => OnlineInconsistentRo, + _ => throw new ArgumentOutOfRangeException(nameof(mode), mode, null) + }; + + internal static TransactionControl TransactionControl(this TransactionMode mode, bool commit = true) => + new() { BeginTx = mode.TransactionSettings(), CommitTx = commit }; +} diff --git a/src/Ydb.Sdk/src/Ado/Transaction/TransactionMode.cs b/src/Ydb.Sdk/src/Ado/Transaction/TransactionMode.cs new file mode 100644 index 00000000..ce516256 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Transaction/TransactionMode.cs @@ -0,0 +1,13 @@ +// ReSharper disable once CheckNamespace + +namespace Ydb.Sdk.Ado; + +public enum TransactionMode +{ + SerializableRw, + SnapshotRo, + StaleRo, + + OnlineRo, + OnlineInconsistentRo +} diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index 1dd39ef1..848fc772 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -3,7 +3,6 @@ using System.Diagnostics.CodeAnalysis; using Ydb.Sdk.Ado.BulkUpsert; using Ydb.Sdk.Ado.Session; -using Ydb.Sdk.Services.Query; using static System.Data.IsolationLevel; namespace Ydb.Sdk.Ado; @@ -59,14 +58,14 @@ protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLev return BeginTransaction(isolationLevel switch { - Serializable or Unspecified => TxMode.SerializableRw, + Serializable or Unspecified => TransactionMode.SerializableRw, _ => throw new ArgumentException("Unsupported isolationLevel: " + isolationLevel) }); } public new YdbTransaction BeginTransaction(IsolationLevel isolationLevel) => BeginDbTransaction(isolationLevel); - public YdbTransaction BeginTransaction(TxMode txMode = TxMode.SerializableRw) + public YdbTransaction BeginTransaction(TransactionMode transactionMode = TransactionMode.SerializableRw) { ThrowIfConnectionClosed(); @@ -77,7 +76,7 @@ public YdbTransaction BeginTransaction(TxMode txMode = TxMode.SerializableRw) ); } - CurrentTransaction = new YdbTransaction(this, txMode); + CurrentTransaction = new YdbTransaction(this, transactionMode); return CurrentTransaction; } diff --git a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs index d5ce7f51..4f990495 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs @@ -3,6 +3,7 @@ using System.Security.Cryptography.X509Certificates; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Ydb.Sdk.Ado.RetryPolicy; using Ydb.Sdk.Auth; using Ydb.Sdk.Transport; @@ -320,6 +321,10 @@ public int CreateSessionTimeout public X509Certificate2Collection? ServerCertificates { get; init; } + public IRetryPolicy RetryPolicy { get; init; } = YdbRetryPolicy.Default; + + internal YdbRetryPolicyExecutor YdbRetryPolicyExecutor => new(RetryPolicy); + private void SaveValue(string propertyName, object? value) { if (value == null) diff --git a/src/Ydb.Sdk/src/Ado/YdbDataSource.cs b/src/Ydb.Sdk/src/Ado/YdbDataSource.cs index 2a701ead..69f0ff42 100644 --- a/src/Ydb.Sdk/src/Ado/YdbDataSource.cs +++ b/src/Ydb.Sdk/src/Ado/YdbDataSource.cs @@ -1,30 +1,56 @@ +using System.Collections.Concurrent; +using Ydb.Sdk.Ado.RetryPolicy; #if NET7_0_OR_GREATER using System.Data.Common; +#endif namespace Ydb.Sdk.Ado; -public class YdbDataSource : DbDataSource +public class YdbDataSource +#if NET7_0_OR_GREATER + : DbDataSource +#else + : IAsyncDisposable +#endif { + private static readonly ConcurrentDictionary CacheYdbRetryPolicyExecutors = new(); + + private static YdbRetryPolicyExecutor GetExecutor(YdbRetryPolicyConfig config) => + CacheYdbRetryPolicyExecutors.GetOrAdd(config.ToString(), + new YdbRetryPolicyExecutor(new YdbRetryPolicy(config))); + private readonly YdbConnectionStringBuilder _ydbConnectionStringBuilder; + private readonly YdbRetryPolicyExecutor _retryPolicyExecutor; public YdbDataSource(YdbConnectionStringBuilder connectionStringBuilder) { _ydbConnectionStringBuilder = connectionStringBuilder; + _retryPolicyExecutor = _ydbConnectionStringBuilder.YdbRetryPolicyExecutor; } public YdbDataSource(string connectionString) { _ydbConnectionStringBuilder = new YdbConnectionStringBuilder(connectionString); + _retryPolicyExecutor = _ydbConnectionStringBuilder.YdbRetryPolicyExecutor; } public YdbDataSource() { _ydbConnectionStringBuilder = new YdbConnectionStringBuilder(); + _retryPolicyExecutor = _ydbConnectionStringBuilder.YdbRetryPolicyExecutor; } - protected override YdbConnection CreateDbConnection() => new(_ydbConnectionStringBuilder); + protected +#if NET7_0_OR_GREATER + override +#endif + YdbConnection CreateDbConnection() => new(_ydbConnectionStringBuilder); - protected override YdbConnection OpenDbConnection() + protected +#if NET7_0_OR_GREATER + override +#endif + YdbConnection OpenDbConnection() { var dbConnection = CreateDbConnection(); try @@ -39,11 +65,23 @@ protected override YdbConnection OpenDbConnection() } } - public new YdbConnection CreateConnection() => CreateDbConnection(); + public +#if NET7_0_OR_GREATER + new +#endif + YdbConnection CreateConnection() => CreateDbConnection(); - public new YdbConnection OpenConnection() => OpenDbConnection(); + public +#if NET7_0_OR_GREATER + new +#endif + YdbConnection OpenConnection() => OpenDbConnection(); - public new async ValueTask OpenConnectionAsync(CancellationToken cancellationToken = default) + public +#if NET7_0_OR_GREATER + new +#endif + async ValueTask OpenConnectionAsync(CancellationToken cancellationToken = default) { var ydbConnection = CreateDbConnection(); @@ -59,12 +97,290 @@ protected override YdbConnection OpenDbConnection() } } +#if NET7_0_OR_GREATER public override string ConnectionString => _ydbConnectionStringBuilder.ConnectionString; +#endif - protected override async ValueTask DisposeAsyncCore() => - await PoolManager.ClearPool(_ydbConnectionStringBuilder.ConnectionString); + protected +#if NET7_0_OR_GREATER + override +#endif + async ValueTask DisposeAsyncCore() => await PoolManager.ClearPool(_ydbConnectionStringBuilder.ConnectionString); - protected override void Dispose(bool disposing) => DisposeAsyncCore().AsTask().GetAwaiter().GetResult(); -} +#if NET7_0_OR_GREATER + protected override void Dispose(bool disposing) + { + if (disposing) + { + DisposeAsyncCore().AsTask().GetAwaiter().GetResult(); + } + } +#else + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore().ConfigureAwait(false); + GC.SuppressFinalize(this); + } #endif + + public Task ExecuteAsync(Func func) => _retryPolicyExecutor + .ExecuteAsync(async cancellationToken => + { + await using var ydbConnection = await OpenConnectionAsync(cancellationToken); + await func(ydbConnection); + }); + + public Task ExecuteAsync(Func> func) => _retryPolicyExecutor + .ExecuteAsync(async cancellationToken => + { + await using var ydbConnection = await OpenConnectionAsync(cancellationToken); + return await func(ydbConnection); + }); + + public Task ExecuteAsync( + Func func, + CancellationToken cancellationToken = default + ) => _retryPolicyExecutor.ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await func(ydbConnection, ct); + }, cancellationToken); + + public Task ExecuteAsync( + Func> func, + CancellationToken cancellationToken = default + ) => _retryPolicyExecutor.ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + return await func(ydbConnection, ct); + }, cancellationToken); + + public Task ExecuteAsync( + Func func, + YdbRetryPolicyConfig retryPolicyConfig + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await func(ydbConnection); + }); + + public Task ExecuteAsync( + Func func, + IRetryPolicy retryPolicy + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await func(ydbConnection); + }); + + public Task ExecuteAsync( + Func> func, + YdbRetryPolicyConfig retryPolicyConfig + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + return await func(ydbConnection); + }); + + public Task ExecuteAsync( + Func> func, + IRetryPolicy retryPolicy + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + return await func(ydbConnection); + }); + + public Task ExecuteAsync( + Func func, + YdbRetryPolicyConfig retryPolicyConfig, + CancellationToken cancellationToken = default + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await func(ydbConnection, ct); + }, cancellationToken); + + public Task ExecuteAsync( + Func func, + IRetryPolicy retryPolicy, + CancellationToken cancellationToken = default + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await func(ydbConnection, ct); + }, cancellationToken); + + public Task ExecuteAsync( + Func> func, + YdbRetryPolicyConfig retryPolicyConfig, + CancellationToken cancellationToken = default + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + return await func(ydbConnection, ct); + }, cancellationToken); + + public Task ExecuteAsync( + Func> func, + IRetryPolicy retryPolicy, + CancellationToken cancellationToken = default + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + return await func(ydbConnection, ct); + }, cancellationToken); + + public Task ExecuteInTransactionAsync( + Func func, + TransactionMode transactionMode = TransactionMode.SerializableRw + ) => _retryPolicyExecutor.ExecuteAsync(async cancellationToken => + { + await using var ydbConnection = await OpenConnectionAsync(cancellationToken); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + await func(ydbConnection); + await transaction.CommitAsync(cancellationToken); + }); + + public Task ExecuteInTransactionAsync( + Func> func, + TransactionMode transactionMode = TransactionMode.SerializableRw + ) => _retryPolicyExecutor.ExecuteAsync(async cancellationToken => + { + await using var ydbConnection = await OpenConnectionAsync(cancellationToken); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + var result = await func(ydbConnection).ConfigureAwait(false); + await transaction.CommitAsync(cancellationToken); + return result; + }); + + public Task ExecuteInTransactionAsync( + Func func, + TransactionMode transactionMode = TransactionMode.SerializableRw, + CancellationToken cancellationToken = default + ) => _retryPolicyExecutor.ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + await func(ydbConnection, ct); + await transaction.CommitAsync(ct); + }, cancellationToken); + + public Task ExecuteInTransactionAsync( + Func> func, + TransactionMode transactionMode = TransactionMode.SerializableRw, + CancellationToken cancellationToken = default + ) => _retryPolicyExecutor.ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + var result = await func(ydbConnection, ct); + await transaction.CommitAsync(ct); + return result; + }, cancellationToken); + + public Task ExecuteInTransactionAsync( + Func func, + YdbRetryPolicyConfig retryPolicyConfig, + TransactionMode transactionMode = TransactionMode.SerializableRw + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async cancellationToken => + { + await using var ydbConnection = await OpenConnectionAsync(cancellationToken); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + await func(ydbConnection); + await transaction.CommitAsync(cancellationToken); + }); + + public Task ExecuteInTransactionAsync( + Func> func, + YdbRetryPolicyConfig retryPolicyConfig, + TransactionMode transactionMode = TransactionMode.SerializableRw + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async cancellationToken => + { + await using var ydbConnection = await OpenConnectionAsync(cancellationToken); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + var result = await func(ydbConnection); + await transaction.CommitAsync(cancellationToken); + return result; + }); + + public Task ExecuteInTransactionAsync( + Func func, + YdbRetryPolicyConfig retryPolicyConfig, + TransactionMode transactionMode = TransactionMode.SerializableRw, + CancellationToken cancellationToken = default + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + await func(ydbConnection, ct); + await transaction.CommitAsync(ct); + }, cancellationToken); + + public Task ExecuteInTransactionAsync( + Func> func, + YdbRetryPolicyConfig retryPolicyConfig, + TransactionMode transactionMode = TransactionMode.SerializableRw, + CancellationToken cancellationToken = default + ) => GetExecutor(retryPolicyConfig).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + var result = await func(ydbConnection, ct); + await transaction.CommitAsync(ct); + return result; + }, cancellationToken); + + public Task ExecuteInTransactionAsync( + Func func, + IRetryPolicy retryPolicy, + TransactionMode transactionMode = TransactionMode.SerializableRw + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + await func(ydbConnection); + await transaction.CommitAsync(ct); + }); + + public Task ExecuteInTransactionAsync( + Func> func, + IRetryPolicy retryPolicy, + TransactionMode transactionMode = TransactionMode.SerializableRw + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async cancellationToken => + { + await using var ydbConnection = await OpenConnectionAsync(cancellationToken); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + var result = await func(ydbConnection); + await transaction.CommitAsync(cancellationToken); + return result; + }); + + public Task ExecuteInTransactionAsync( + Func func, + IRetryPolicy retryPolicy, + TransactionMode transactionMode = TransactionMode.SerializableRw, + CancellationToken cancellationToken = default + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + await func(ydbConnection, ct); + await transaction.CommitAsync(ct); + }, cancellationToken); + + public Task ExecuteInTransactionAsync( + Func> func, + IRetryPolicy retryPolicy, + TransactionMode transactionMode = TransactionMode.SerializableRw, + CancellationToken cancellationToken = default + ) => new YdbRetryPolicyExecutor(retryPolicy).ExecuteAsync(async ct => + { + await using var ydbConnection = await OpenConnectionAsync(ct); + await using var transaction = ydbConnection.BeginTransaction(transactionMode); + var result = await func(ydbConnection, ct); + await transaction.CommitAsync(ct); + return result; + }, cancellationToken); +} diff --git a/src/Ydb.Sdk/src/Ado/YdbException.cs b/src/Ydb.Sdk/src/Ado/YdbException.cs index aed5f07b..afadbefc 100644 --- a/src/Ydb.Sdk/src/Ado/YdbException.cs +++ b/src/Ydb.Sdk/src/Ado/YdbException.cs @@ -28,14 +28,11 @@ internal YdbException(StatusCode statusCode, string message, Exception? innerExc { Code = statusCode; IsTransient = statusCode.IsTransient(); - IsTransientWhenIdempotent = statusCode.IsTransientWhenIdempotent(); // TODO: Add SQLSTATE message with order with https://en.wikipedia.org/wiki/SQLSTATE } public override bool IsTransient { get; } - public bool IsTransientWhenIdempotent { get; } - public StatusCode Code { get; } } diff --git a/src/Ydb.Sdk/src/Ado/YdbTransaction.cs b/src/Ydb.Sdk/src/Ado/YdbTransaction.cs index 4355c3d3..de22c891 100644 --- a/src/Ydb.Sdk/src/Ado/YdbTransaction.cs +++ b/src/Ydb.Sdk/src/Ado/YdbTransaction.cs @@ -1,13 +1,13 @@ using System.Data; using System.Data.Common; using Ydb.Query; -using Ydb.Sdk.Services.Query; +using Ydb.Sdk.Ado.Transaction; namespace Ydb.Sdk.Ado; public sealed class YdbTransaction : DbTransaction { - private readonly TxMode _txMode; + private readonly TransactionMode _transactionMode; private bool _failed; private YdbConnection? _ydbConnection; @@ -29,13 +29,13 @@ internal bool Failed internal TransactionControl? TransactionControl => Completed ? null : TxId == null - ? new TransactionControl { BeginTx = _txMode.TransactionSettings() } + ? new TransactionControl { BeginTx = _transactionMode.TransactionSettings() } : new TransactionControl { TxId = TxId }; - internal YdbTransaction(YdbConnection ydbConnection, TxMode txMode) + internal YdbTransaction(YdbConnection ydbConnection, TransactionMode transactionMode) { _ydbConnection = ydbConnection; - _txMode = txMode; + _transactionMode = transactionMode; } public override void Commit() => CommitAsync().GetAwaiter().GetResult(); @@ -66,7 +66,7 @@ protected override YdbConnection? DbConnection } } - public override IsolationLevel IsolationLevel => _txMode == TxMode.SerializableRw + public override IsolationLevel IsolationLevel => _transactionMode == TransactionMode.SerializableRw ? IsolationLevel.Serializable : IsolationLevel.Unspecified; diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index 19ae9d08..aaa6533e 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -69,7 +69,7 @@ public abstract class BaseDriver : IDriver private int _ownerCount; - protected int Disposed; + protected volatile int Disposed; internal BaseDriver( DriverConfig config, @@ -217,7 +217,7 @@ protected async ValueTask GetCallOptions(GrpcRequestSettings settin public async ValueTask DisposeAsync() { - if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0) + if (Interlocked.Decrement(ref _ownerCount) <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0) { await ChannelPool.DisposeAsync(); diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index 6820ba74..c22f4e67 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -1,5 +1,6 @@ using System.Collections.Immutable; using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.Transaction; using Ydb.Sdk.Pool; using Ydb.Sdk.Value; @@ -58,15 +59,15 @@ public QueryClient(IDriver driver, QueryClientConfig? config = null) } public Task Stream(string query, Func> onStream, - Dictionary? parameters = null, TxMode txMode = TxMode.NoTx, + Dictionary? parameters = null, TransactionMode? txMode = null, ExecuteQuerySettings? settings = null) => _sessionPool.ExecOnSession(async session => await onStream(new ExecuteQueryStream( await session.ExecuteQuery(query, parameters ?? new Dictionary(), - settings ?? new GrpcRequestSettings(), txMode.TransactionControl()))) + settings ?? new GrpcRequestSettings(), txMode?.TransactionControl()))) ); public Task Stream(string query, Func onStream, - Dictionary? parameters = null, TxMode txMode = TxMode.NoTx, + Dictionary? parameters = null, TransactionMode? txMode = null, ExecuteQuerySettings? settings = null) => Stream(query, async stream => { @@ -75,7 +76,7 @@ public Task Stream(string query, Func onStream, }, parameters, txMode, settings); public Task> ReadAllRows(string query, - Dictionary? parameters = null, TxMode txMode = TxMode.NoTx, + Dictionary? parameters = null, TransactionMode? txMode = null, ExecuteQuerySettings? settings = null) => Stream>(query, async stream => { @@ -96,7 +97,7 @@ public Task Stream(string query, Func onStream, }, parameters, txMode, settings); public async Task ReadRow(string query, - Dictionary? parameters = null, TxMode txMode = TxMode.NoTx, + Dictionary? parameters = null, TransactionMode? txMode = null, ExecuteQuerySettings? settings = null) { var result = await ReadAllRows(query, parameters, txMode, settings); @@ -105,7 +106,7 @@ public Task Stream(string query, Func onStream, } public async Task Exec(string query, Dictionary? parameters = null, - TxMode txMode = TxMode.NoTx, ExecuteQuerySettings? settings = null) => + TransactionMode? txMode = null, ExecuteQuerySettings? settings = null) => await Stream(query, async stream => { await using var uStream = stream; @@ -113,16 +114,11 @@ await Stream(query, async stream => _ = await stream.MoveNextAsync(); }, parameters, txMode, settings); - public Task DoTx(Func> queryTx, TxMode txMode = TxMode.SerializableRw) - { - if (txMode == TxMode.NoTx) + public Task DoTx(Func> queryTx, + TransactionMode transactionMode = TransactionMode.SerializableRw) => + _sessionPool.ExecOnSession(async session => { - throw new ArgumentException("DoTx requires a txMode other than None"); - } - - return _sessionPool.ExecOnSession(async session => - { - var tx = new QueryTx(session, txMode); + var tx = new QueryTx(session, transactionMode); try { @@ -146,17 +142,17 @@ public Task DoTx(Func> queryTx, TxMode txMode = TxMode.Se await session.Release(); } }); - } private static readonly object None = new(); - public async Task DoTx(Func queryTx, TxMode txMode = TxMode.SerializableRw) => + public async Task DoTx(Func queryTx, + TransactionMode transactionMode = TransactionMode.SerializableRw) => await DoTx(async tx => { await queryTx(tx); return None; - }, txMode); + }, transactionMode); public ValueTask DisposeAsync() { diff --git a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs index 15c72d37..9ee15ca4 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs @@ -1,5 +1,7 @@ using System.Collections.Immutable; using Ydb.Query; +using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.Transaction; using Ydb.Sdk.Value; namespace Ydb.Sdk.Services.Query; @@ -7,24 +9,24 @@ namespace Ydb.Sdk.Services.Query; public class QueryTx { private readonly Session _session; - private readonly TxMode _txMode; + private readonly TransactionMode _transactionMode; private string? TxId { get; set; } private bool Commited { get; set; } - private TransactionControl? TxControl(bool commit) + private TransactionControl TxControl(bool commit) { Commited = commit | Commited; return TxId == null - ? _txMode.TransactionControl(commit: commit) + ? _transactionMode.TransactionControl(commit: commit) : new TransactionControl { TxId = TxId, CommitTx = commit }; } - internal QueryTx(Session session, TxMode txMode) + internal QueryTx(Session session, TransactionMode transactionMode) { _session = session; - _txMode = txMode; + _transactionMode = transactionMode; } public async ValueTask Stream(string query, Dictionary? parameters = null, diff --git a/src/Ydb.Sdk/src/Services/Query/Settings.cs b/src/Ydb.Sdk/src/Services/Query/Settings.cs index fc9cf6ae..a93659af 100644 --- a/src/Ydb.Sdk/src/Services/Query/Settings.cs +++ b/src/Ydb.Sdk/src/Services/Query/Settings.cs @@ -24,54 +24,6 @@ public class ExecuteQuerySettings : GrpcRequestSettings public bool ConcurrentResultSets { get; set; } } -public enum TxMode -{ - NoTx, - - SerializableRw, - SnapshotRo, - StaleRo, - - OnlineRo, - OnlineInconsistentRo -} - -internal static class TxModeExtensions -{ - private static readonly TransactionSettings SerializableRw = new() - { SerializableReadWrite = new SerializableModeSettings() }; - - private static readonly TransactionSettings SnapshotRo = new() - { SnapshotReadOnly = new SnapshotModeSettings() }; - - private static readonly TransactionSettings StaleRo = new() - { StaleReadOnly = new StaleModeSettings() }; - - private static readonly TransactionSettings OnlineRo = new() - { OnlineReadOnly = new OnlineModeSettings { AllowInconsistentReads = false } }; - - private static readonly TransactionSettings OnlineInconsistentRo = new() - { OnlineReadOnly = new OnlineModeSettings { AllowInconsistentReads = true } }; - - internal static TransactionSettings? TransactionSettings(this TxMode mode) => - mode switch - { - TxMode.SerializableRw => SerializableRw, - TxMode.SnapshotRo => SnapshotRo, - TxMode.StaleRo => StaleRo, - TxMode.OnlineRo => OnlineRo, - TxMode.OnlineInconsistentRo => OnlineInconsistentRo, - _ => null - }; - - internal static TransactionControl? TransactionControl(this TxMode mode, bool commit = true) => - mode switch - { - TxMode.NoTx => null, - _ => new TransactionControl { BeginTx = mode.TransactionSettings(), CommitTx = commit } - }; -} - public class ExecuteQueryPart { public Value.ResultSet? ResultSet { get; } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/RetryPolicy/YdbRetryPolicyTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/RetryPolicy/YdbRetryPolicyTests.cs index 3a38e696..fd411445 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/RetryPolicy/YdbRetryPolicyTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/RetryPolicy/YdbRetryPolicyTests.cs @@ -12,12 +12,11 @@ public class YdbRetryPolicyTests [InlineData(StatusCode.SessionBusy)] public void GetNextDelay_WhenStatusIsBadSessionOrBusySession_ReturnTimeSpanZero(StatusCode statusCode) { - var ydbRetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempt = 2 }); + var ydbRetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 2 }); var ydbException = new YdbException(statusCode, "Mock message"); Assert.Equal(TimeSpan.Zero, ydbRetryPolicy.GetNextDelay(ydbException, 0)); - Assert.Equal(TimeSpan.Zero, ydbRetryPolicy.GetNextDelay(ydbException, 1)); - Assert.Null(ydbRetryPolicy.GetNextDelay(ydbException, 2)); + Assert.Null(ydbRetryPolicy.GetNextDelay(ydbException, 1)); } [Theory] @@ -25,7 +24,7 @@ public void GetNextDelay_WhenStatusIsBadSessionOrBusySession_ReturnTimeSpanZero( [InlineData(StatusCode.Undetermined)] public void GetNextDelay_WhenStatusIsIdempotenceAndDisableIdempotence_ReturnNull(StatusCode statusCode) { - var ydbRetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempt = 2 }); + var ydbRetryPolicy = new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 2 }); var ydbException = new YdbException(statusCode, "Mock message"); Assert.Null(ydbRetryPolicy.GetNextDelay(ydbException, 0)); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs index f4c468dc..edcc8424 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs @@ -1,4 +1,5 @@ using Xunit; +using Ydb.Sdk.Ado.RetryPolicy; namespace Ydb.Sdk.Ado.Tests; @@ -59,4 +60,190 @@ public void OpenConnection_FromDataSource_ReturnOpenConnection() using var ydbConnection = _dataSource.OpenConnection(); Assert.Equal(1, new YdbCommand(ydbConnection) { CommandText = "SELECT 1" }.ExecuteScalar()); } + + [Fact] + public async Task ExecuteAsync_WhenBadSession_ThenRetriesUntilSuccess() + { + var attempt = 0; + await _dataSource.ExecuteAsync(_ => + { + if (attempt++ < 3) + { + throw new YdbException(StatusCode.BadSession, "Bad Session"); + } + + return Task.CompletedTask; + }); + } + + [Theory] + [InlineData(StatusCode.Undetermined)] + [InlineData(StatusCode.ClientTransportUnknown)] + [InlineData(StatusCode.ClientTransportUnavailable)] + public async Task ExecuteAsync_WhenIsIdempotenceConfig_ThenRetriesUntilSuccess(StatusCode statusCode) + { + var attempt = 0; + await _dataSource.ExecuteAsync(_ => + { + if (attempt++ < 3) + { + throw new YdbException(statusCode, "Bad Session"); + } + + return Task.CompletedTask; + }, new YdbRetryPolicyConfig { EnableRetryIdempotence = true }); + } + + [Theory] + [InlineData(StatusCode.BadRequest)] + [InlineData(StatusCode.SchemeError)] + [InlineData(StatusCode.NotFound)] + public async Task ExecuteAsync_WhenNonRetryableStatus_ThenThrowsWithoutRetry(StatusCode code) + { + var attempt = 0; + + var ex = await Assert.ThrowsAsync(() => + _dataSource.ExecuteAsync(_ => + { + attempt++; + throw new YdbException(code, "Non-retryable"); + })); + + Assert.Equal(code, ex.Code); + Assert.Equal(1, attempt); + } + + [Fact] + public async Task ExecuteAsync_WhenAlwaysRetryableAndMaxAttemptsReached_ThenThrowsLastException() + { + var attempt = 0; + var config = new YdbRetryPolicyConfig { MaxAttempts = 3 }; // как у вас конфигурируется + + var ex = await Assert.ThrowsAsync(() => _dataSource.ExecuteAsync(_ => + { + attempt++; + throw new YdbException(StatusCode.BadSession, "Still bad"); + }, config)); + + Assert.Equal(3, attempt); + Assert.Equal(StatusCode.BadSession, ex.Code); + } + + [Fact] + public async Task ExecuteAsync_WhenTokenPreCanceled_ThenDoesNotInvokeDelegate() + { + using var cts = new CancellationTokenSource(); + await cts.CancelAsync(); + + var called = false; + await Assert.ThrowsAsync(() => + _dataSource.ExecuteAsync((_, _) => + { + called = true; + return Task.CompletedTask; + }, cts.Token)); + + Assert.False(called); + } + + [Fact] + public async Task ExecuteAsync_WhenUserCodeThrows_ThenDoesNotRetry() + { + var attempt = 0; + await Assert.ThrowsAsync(() => + _dataSource.ExecuteAsync(_ => + { + attempt++; + throw new InvalidOperationException("Bug"); + })); + + Assert.Equal(1, attempt); + } + + [Fact] + public async Task ExecuteAsync_WhenBadSession_ThenCreatesNewSessionPerAttempt() + { + var ydbConnections = new List(); + + var attempt = 0; + await _dataSource.ExecuteAsync(ydbConnection => + { + ydbConnection.OnNotSuccessStatusCode(StatusCode.BadSession); + ydbConnections.Add(ydbConnection); + if (attempt++ < 2) + throw new YdbException(StatusCode.BadSession, "Bad"); + return Task.CompletedTask; + }); + + Assert.Equal(3, attempt); + Assert.Equal(3, ydbConnections.Count); + Assert.True(ydbConnections.Distinct().Count() == ydbConnections.Count); // new one every time + } + + [Fact] + public async Task ExecuteAsync_WhenCancelsBetweenRetries_Throws() + { + using var cts = new CancellationTokenSource(); + var attempt = 0; + + await Assert.ThrowsAnyAsync(async () => + { + await _dataSource.ExecuteAsync(async (_, _) => + { + attempt++; + if (attempt == 1) + { + await cts.CancelAsync(); + throw new YdbException(StatusCode.BadSession, "Bad"); + } + }, cts.Token); + }); + + Assert.Equal(1, attempt); + } + + [Theory] + [InlineData(10)] + [InlineData(20)] + [InlineData(30)] + public async Task ExecuteInTransactionAsync_WhenTLI_ThenRetriesUntilSuccess(int concurrentJob) + { + var tableName = $"Table_TLI_{Random.Shared.Next()}"; + await using (var ydbConnection = await CreateOpenConnectionAsync()) + { + await new YdbCommand(ydbConnection) + { + CommandText = $"CREATE TABLE {tableName} (id Int32, count Int32, PRIMARY KEY (id));" + }.ExecuteNonQueryAsync(); + + await new YdbCommand(ydbConnection) + { CommandText = $"INSERT INTO {tableName} (id, count) VALUES (1, 0);" }.ExecuteNonQueryAsync(); + } + + var tasks = new List(); + for (var i = 0; i < concurrentJob; i++) + { + tasks.Add(_dataSource.ExecuteInTransactionAsync(async ydbConnection => + { + var count = (int)(await new YdbCommand(ydbConnection) + { CommandText = $"SELECT count FROM {tableName} WHERE id = 1" }.ExecuteScalarAsync())!; + + await new YdbCommand(ydbConnection) + { + CommandText = $"UPDATE {tableName} SET count = @count + 1 WHERE id = 1", + Parameters = { new YdbParameter { Value = count, ParameterName = "count" } } + }.ExecuteNonQueryAsync(); + }, new YdbRetryPolicyConfig { FastBackoffBaseMs = 100 })); + } + + await Task.WhenAll(tasks); + + await using (var ydbConnection = await CreateOpenConnectionAsync()) + { + Assert.Equal(concurrentJob, await new YdbCommand(ydbConnection) + { CommandText = $"SELECT count FROM {tableName} WHERE id = 1" }.ExecuteScalarAsync()); + + await new YdbCommand(ydbConnection) { CommandText = $"DROP TABLE {tableName}" }.ExecuteNonQueryAsync(); + } + } } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs index 6ab593df..17b69d25 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbSchemaTests.cs @@ -3,8 +3,8 @@ namespace Ydb.Sdk.Ado.Tests; -[CollectionDefinition("YdbSchemaTests isolation test", DisableParallelization = true)] [Collection("YdbSchemaTests isolation test")] +[CollectionDefinition("YdbSchemaTests isolation test", DisableParallelization = true)] public class YdbSchemaTests : TestBase { private readonly string _table1; diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbTransactionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbTransactionTests.cs index d15ad992..f8ab1df8 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbTransactionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbTransactionTests.cs @@ -1,7 +1,6 @@ using System.Data; using Xunit; using Ydb.Sdk.Ado.Tests.Utils; -using Ydb.Sdk.Services.Query; namespace Ydb.Sdk.Ado.Tests; @@ -76,7 +75,7 @@ public void Commit_WhenMakeTwoUpsertOperation_ReturnUpdatedTables() ydbCommand.ExecuteNonQuery(); ydbTransaction.Commit(); - ydbTransaction = connection.BeginTransaction(TxMode.SnapshotRo); + ydbTransaction = connection.BeginTransaction(TransactionMode.SnapshotRo); ydbCommand.Transaction = ydbTransaction; ydbCommand.CommandText = $"SELECT title FROM {Tables.Seasons} WHERE series_id = 2 AND season_id = 6"; var dbDataReader = ydbCommand.ExecuteReader();