Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
a0927a4
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
f18fbf7
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
5c7cfc9
resolve conflict
LiamHamsters Aug 12, 2025
5c9ec84
Merge remote-tracking branch 'origin/add-implicit-session-flag' into …
LiamHamsters Aug 12, 2025
eab1450
fix ci
LiamHamsters Aug 12, 2025
72c53f3
feat: add integration tests and rework implicit session handling via …
LiamHamsters Aug 14, 2025
7c35439
`Warning` -> `Debug` on DeleteSession has been `RpcException` & fixes…
KirillKurdyukov Aug 13, 2025
4dfa0bd
Feat: Implement `YdbRetryPolicy` with AWS-inspired Exponential Backof…
KirillKurdyukov Aug 18, 2025
c44750b
feat: added support new datetime types (#517)
KirillKurdyukov Aug 29, 2025
a19f8bb
feat: enforce DECIMAL(p,s) overflow in parameters (#519)
KirillKurdyukov Aug 29, 2025
4ac1c01
Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse…
KirillKurdyukov Sep 1, 2025
f7da918
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
812a077
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
0df1155
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 2, 2025
74eac7d
feat: update ImplicitSession for singleton driver
LiamHamsters Sep 3, 2025
c57c3d6
try fix DisableParallelization in PMTests and autoformat
LiamHamsters Sep 3, 2025
a5154b5
fix lint
LiamHamsters Sep 3, 2025
3addeae
fix: keep single command session; make ImplicitSessionSource dispose-…
LiamHamsters Sep 3, 2025
0c53efb
Move implicit session creation to PoolManager by flag
LiamHamsters Sep 4, 2025
46b9ca3
test: validate implicit session disallows transactions but supports n…
LiamHamsters Sep 4, 2025
c6d6a59
fix lint
LiamHamsters Sep 4, 2025
741e28b
feat(ado): add ImplicitSession with PoolManager integration and stres…
LiamHamsters Sep 12, 2025
4253895
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 12, 2025
09dd895
fix lint
LiamHamsters Sep 12, 2025
b8db711
fix
LiamHamsters Sep 12, 2025
fad0f85
hot fix
LiamHamsters Sep 12, 2025
1c2d3ba
hot fix
LiamHamsters Sep 12, 2025
04050d6
feat: add owner registration and dispose logic for implicit sessions
LiamHamsters Sep 12, 2025
274a0e3
refactor(ado): remove onEmpty callback from ImplicitSessionSource
LiamHamsters Sep 12, 2025
80ac532
hot fix
LiamHamsters Sep 12, 2025
275d874
refactor
LiamHamsters Sep 12, 2025
04a31a6
delete onEmpty in stressTest
LiamHamsters Sep 12, 2025
f0c9bff
Refactored implicit session handling and stress tests
LiamHamsters Sep 15, 2025
b8feb70
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 15, 2025
4075173
feat: add EnableImplicitSession flag support with parsing tests
LiamHamsters Aug 12, 2025
bdfdd11
fix ci
LiamHamsters Aug 12, 2025
eb6fcb8
feat: add integration tests and rework implicit session handling via …
LiamHamsters Aug 14, 2025
991a412
feat: update ImplicitSession for singleton driver
LiamHamsters Sep 3, 2025
04f41b0
test
LiamHamsters Sep 16, 2025
ed0de42
test
LiamHamsters Sep 17, 2025
f6c2ff3
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 17, 2025
ef47d68
test
LiamHamsters Sep 17, 2025
0d16841
Revert "test"
LiamHamsters Sep 17, 2025
d058f92
test
LiamHamsters Sep 17, 2025
c4158c2
test
LiamHamsters Sep 17, 2025
622b48e
test
LiamHamsters Sep 17, 2025
e748a85
fix lint
LiamHamsters Sep 17, 2025
08e6bc2
tests: add double OpenSession check in stress tests;
LiamHamsters Sep 18, 2025
448244f
try fix
LiamHamsters Sep 18, 2025
e001a79
test
LiamHamsters Sep 19, 2025
ad4f8c2
Merge branch 'main' into add-implicit-session-flag
LiamHamsters Sep 23, 2025
ff61b60
made dispose two-phase
LiamHamsters Sep 24, 2025
dd0ba06
Merge remote-tracking branch 'origin/add-implicit-session-flag' into …
LiamHamsters Sep 24, 2025
6852546
tried to make a “two-phase”
LiamHamsters Sep 24, 2025
401e7d2
try
LiamHamsters Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Added provider support for implicit sessions.

## v0.23.1

- Fixed bug Topic Reader: NullReferenceException when handling StopPartitionSessionRequest ([#528](https://github.com/ydb-platform/ydb-dotnet-sdk/issues/528)).
Expand Down
14 changes: 12 additions & 2 deletions src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,20 @@ CancellationToken cancellationToken
!cacheDriver.IsDisposed
? cacheDriver
: Drivers[settings.GrpcConnectionString] = await settings.BuildDriver();

driver.RegisterOwner();

var factory = new PoolingSessionFactory(driver, settings);
var newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);
ISessionSource newSessionPool;

if (settings.MaxSessionPool > 0)
{
var factory = new PoolingSessionFactory(driver, settings);
newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);
}
else
{
newSessionPool = new ImplicitSessionSource(driver);
}

Pools[settings.ConnectionString] = newSessionPool;

Expand Down
9 changes: 5 additions & 4 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ namespace Ydb.Sdk.Ado.Session;

internal class ImplicitSession : ISession
{
public ImplicitSession(IDriver driver)
private readonly ImplicitSessionSource _source;

public ImplicitSession(IDriver driver, ImplicitSessionSource source)
{
Driver = driver;
_source = source;
}

public IDriver Driver { get; }
Expand Down Expand Up @@ -47,9 +50,7 @@ public void OnNotSuccessStatusCode(StatusCode code)
{
}

public void Dispose()
{
}
public void Dispose() => _source.ReleaseLease();

private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions");
}
71 changes: 71 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace Ydb.Sdk.Ado.Session;

internal sealed class ImplicitSessionSource : ISessionSource
{
private readonly IDriver _driver;
private readonly ManualResetEventSlim _allReleased = new(false);

private int _state;
private int _activeLeaseCount;

internal ImplicitSessionSource(IDriver driver)
{
_driver = driver;
}

public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (!TryAcquireLease())
throw new ObjectDisposedException(nameof(ImplicitSessionSource));

return new ValueTask<ISession>(new ImplicitSession(_driver, this));
}

private bool TryAcquireLease()
{
if (Volatile.Read(ref _state) == 2)
return false;

var newCount = Interlocked.Increment(ref _activeLeaseCount);

var state = Volatile.Read(ref _state);

if (state == 2 || (state == 1 && newCount == 1))
{
Interlocked.Decrement(ref _activeLeaseCount);
return false;
}

return true;
}

internal void ReleaseLease()
{
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 &&
Volatile.Read(ref _state) != 0)
{
_allReleased.Set();
}
}

public async ValueTask DisposeAsync()
{
if (Interlocked.CompareExchange(ref _state, 1, 0) != 0)
return;

if (Volatile.Read(ref _activeLeaseCount) != 0)
_allReleased.Wait();

try
{
Volatile.Write(ref _state, 2);
await _driver.DisposeAsync();
}
finally
{
_allReleased.Dispose();
}
}
}
16 changes: 16 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private void InitDefaultValues()
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
_disableServerBalancer = false;
_enableImplicitSession = false;
}

public string Host
Expand Down Expand Up @@ -314,6 +315,18 @@ public int CreateSessionTimeout

private int _createSessionTimeout;

public bool EnableImplicitSession
{
get => _enableImplicitSession;
set
{
_enableImplicitSession = value;
SaveValue(nameof(EnableImplicitSession), value);
}
}

private bool _enableImplicitSession;

public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;

public ICredentialsProvider? CredentialsProvider { get; init; }
Expand Down Expand Up @@ -495,6 +508,9 @@ static YdbConnectionOption()
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
(builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer),
"DisableServerBalancer", "Disable Server Balancer");
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
(builder, enableImplicitSession) => builder.EnableImplicitSession = enableImplicitSession),
"EnableImplicitSession", "ImplicitSession");
}

private static void AddOption(YdbConnectionOption option, params string[] keys)
Expand Down
154 changes: 154 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
using Moq;
using Xunit;
using Ydb.Sdk.Ado.Session;

namespace Ydb.Sdk.Ado.Tests.Session;

public class YdbImplicitStressTests : TestBase
{
private static IDriver DummyDriver()
{
var m = new Mock<IDriver>(MockBehavior.Loose);
m.Setup(d => d.DisposeAsync()).Returns(ValueTask.CompletedTask);
return m.Object;
}

private sealed class Counter
{
public int Value;
public void Inc() => Interlocked.Increment(ref Value);
}

[Fact(Timeout = 30_000)]
public async Task Dispose_WaitsForAllLeases_AndSignalsOnEmptyExactlyOnce()
{
var driver = DummyDriver();

var opened = new Counter();
var closed = new Counter();

var source = new ImplicitSessionSource(driver);

var workers = Enumerable.Range(0, 200).Select(async _ =>
{
var rnd = Random.Shared;
for (var j = 0; j < 10; j++)
{
ISession s;
try
{
s = await source.OpenSession(CancellationToken.None);
opened.Inc();

await Task.Delay(rnd.Next(0, 5));
}
catch (ObjectDisposedException)
{
return;
}

var s2 = await source.OpenSession(CancellationToken.None);
s2.Dispose();

s.Dispose();
closed.Inc();
}
}).ToArray();

var disposer = Task.Run(async () =>
{
await Task.Delay(10);
await source.DisposeAsync();
});

await Task.WhenAll(workers.Append(disposer));

Assert.True(opened.Value > 0);
Assert.Equal(opened.Value, closed.Value);

await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
}

[Fact(Timeout = 30_000)]
public async Task Stress_Counts_AreBalanced()
{
var driver = DummyDriver();

var opened = new Counter();
var closed = new Counter();

var source = new ImplicitSessionSource(driver);

var workers = Enumerable.Range(0, 200).Select(async _ =>
{
var rnd = Random.Shared;
for (var j = 0; j < 10; j++)
{
ISession s;
try
{
s = await source.OpenSession(CancellationToken.None);
opened.Inc();

await Task.Delay(rnd.Next(0, 3));
}
catch (ObjectDisposedException)
{
return;
}

var s2 = await source.OpenSession(CancellationToken.None);
s2.Dispose();

s.Dispose();
closed.Inc();
}
}).ToArray();

var disposer = Task.Run(async () => await source.DisposeAsync());

await Task.WhenAll(workers.Append(disposer));

Assert.Equal(opened.Value, closed.Value);
Assert.True(opened.Value > 0);

await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
}

[Fact(Timeout = 30_000)]
public async Task Open_RacingWithDispose_StateRemainsConsistent()
{
var driver = DummyDriver();

var source = new ImplicitSessionSource(driver);

var opens = Enumerable.Range(0, 1000).Select(async _ =>
{
ISession s;
try
{
s = await source.OpenSession(CancellationToken.None);
}
catch (ObjectDisposedException)
{
return 0;
}

var s2 = await source.OpenSession(CancellationToken.None);
s2.Dispose();

s.Dispose();
return 1;
}).ToArray();

var disposeTask = Task.Run(async () =>
{
await Task.Yield();
await source.DisposeAsync();
});

await Task.WhenAll(opens.Append(disposeTask));

await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
}
}
Loading
Loading