Skip to content

Commit d08b115

Browse files
committed
[UPDATE] add beginGracefulShutdown() to stream response usecase.
1 parent 0eee269 commit d08b115

File tree

14 files changed

+36
-99
lines changed

14 files changed

+36
-99
lines changed

Sources/EventStoreDB/EventStoreDB.swift

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -377,23 +377,6 @@ extension EventStoreDBClient {
377377
try await underlyingClient.restartPersistentSubscriptionSubsystem()
378378
}
379379

380-
/// Subscribes to a persistent subscription for the specified stream selector and group name.
381-
///
382-
/// - Parameters:
383-
/// - streamSelector: Identifies the stream or streams to subscribe to.
384-
/// - groupName: The name of the persistent subscription group.
385-
/// - configure: Optional closure to customize read options.
386-
///
387-
/// - Returns: An active persistent subscription for the specified stream and group.
388-
///
389-
/// - Throws: An error if the subscription cannot be established.
390-
@available(*, deprecated)
391-
public func subscribePersistentSubscription(to streamSelector: StreamSelector<StreamIdentifier>, groupName: String, configure: @Sendable (_ options: ReadOptions) -> ReadOptions = { $0 }) async throws -> PersistentSubscriptions<PersistentSubscription.AnyTarget>.Subscription {
392-
let node = try await underlyingClient.selector.select()
393-
let options = configure(.init())
394-
let usecase = PersistentSubscriptions<PersistentSubscription.AnyTarget>.ReadAnyTarget(streamSelector: streamSelector, group: groupName, options: options)
395-
return try await usecase.perform(node: node, callOptions: underlyingClient.defaultCallOptions)
396-
}
397380
}
398381

399382
public struct ReadOptions: EventStoreOptions {

Sources/EventStoreDB/PersistentSubscriptions/ReadAnyTarget.swift

Lines changed: 0 additions & 67 deletions
This file was deleted.

Sources/GRPCEncapsulates/Usecase/UnaryStream.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@
88
import GRPCCore
99

1010
package protocol UnaryStream: Usecase, UnaryRequestBuildable, StreamResponseHandlable {
11-
func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions) async throws -> Responses
11+
func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses
1212
}

Sources/KurrentDB/Core/Additions/Usecase/UnaryStream.swift

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ extension UnaryStream where Transport == HTTP2ClientTransport.Posix, Responses =
1919
return try await withRethrowingError(usage: "\(Self.self)\(#function)") {
2020
let metadata = Metadata(from: node.settings)
2121
let request = try request(metadata: metadata)
22-
return try await send(connection: client, request: request, callOptions: callOptions)
22+
return try await send(connection: client, request: request, callOptions: callOptions){
23+
client.beginGracefulShutdown()
24+
}
2325
}
2426
}
2527

@@ -54,7 +56,9 @@ extension UnaryStream where Transport == HTTP2ClientTransport.Posix {
5456

5557
return try await withRethrowingError(usage: "\(Self.self)\(#function)") {
5658
let request = try request(metadata: metadata)
57-
return try await send(connection: client, request: request, callOptions: callOptions)
59+
return try await send(connection: client, request: request, callOptions: callOptions){
60+
client.beginGracefulShutdown()
61+
}
5862
}
5963
}
6064
}

Sources/KurrentDB/Monitoring/Usecase/Monitoring.Stats.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ extension Monitoring {
3030
}
3131
}
3232

33-
package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions) async throws -> Responses {
33+
package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
3434
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Response.self)
3535
Task {
3636
let client = ServiceClient(wrapping: connection)
@@ -39,6 +39,7 @@ extension Monitoring {
3939
try continuation.yield(.init(from: message))
4040
}
4141
continuation.finish()
42+
finished()
4243
}
4344
}
4445
return stream

Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptions.Subscription.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ extension PersistentSubscriptions {
2424

2525
/// The writer responsible for sending requests to the subscription service.
2626
let writer: Writer
27-
27+
2828
/// The unique identifier of the subscription, if available.
2929
///
3030
/// This is set during initialization based on the first response from the server.
@@ -39,7 +39,7 @@ extension PersistentSubscriptions {
3939
/// - writer: The `Writer` instance used to send requests. Defaults to a new `Writer`.
4040
/// - reader: An asynchronous stream of responses from the subscription service.
4141
/// - Throws: An error if the initialization process fails, such as when the response stream cannot be processed.
42-
package init(requests writer: Writer = .init(), responses reader: AsyncThrowingStream<PersistentSubscriptions.ReadResponse, any Error>) async throws {
42+
package init(requests writer: Writer = .init(), responses reader: AsyncThrowingStream<PersistentSubscriptions.ReadResponse, any Error>, onTermination: @Sendable @escaping ()->Void) async throws {
4343
self.writer = writer
4444

4545
var iterator = reader.makeAsyncIterator()
@@ -50,6 +50,9 @@ extension PersistentSubscriptions {
5050
}
5151

5252
let (stream, continuation) = AsyncThrowingStream.makeStream(of: PersistentSubscription.EventResult.self)
53+
continuation.onTermination = { _ in
54+
onTermination()
55+
}
5356
Task {
5457
while let response = try await iterator.next() {
5558
if case let .readEvent(event, retryCount) = response {

Sources/KurrentDB/PersistentSubscriptions/Usecase/AllStream/PersistentSubscriptions.AllStream.Read.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ extension PersistentSubscriptions.AllStream {
6363
}
6464
}
6565
}
66-
return try await .init(requests: writer, responses: responses.stream)
66+
return try await .init(requests: writer, responses: responses.stream){
67+
connection.beginGracefulShutdown()
68+
}
6769
}
6870
}
6971
}

Sources/KurrentDB/PersistentSubscriptions/Usecase/Specified/PersistentSubscriptions.SpecifiedStream.Read.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ extension PersistentSubscriptions.SpecifiedStream {
6363
}
6464
}
6565
}
66-
return try await .init(requests: writer, responses: responses.stream)
66+
return try await .init(requests: writer, responses: responses.stream){
67+
connection.beginGracefulShutdown()
68+
}
6769
}
6870
}
6971
}

Sources/KurrentDB/Projections/Usecase/Projections.Statistics.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ extension Projections {
4242
}
4343
}
4444

45-
package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions) async throws -> Responses {
45+
package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
4646
try await withThrowingTaskGroup(of: Void.self) { _ in
4747
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Response.self)
4848
let client = ServiceClient(wrapping: connection)
@@ -51,8 +51,8 @@ extension Projections {
5151
try continuation.yield(handle(message: message))
5252
}
5353
continuation.finish()
54+
finished()
5455
}
55-
continuation.finish()
5656
return stream
5757
}
5858
}

Sources/KurrentDB/Streams/Usecase/All/Streams.ReadAll.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ extension Streams where Target == AllStreams {
2828
}
2929
}
3030

31-
package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions) async throws -> Responses {
31+
package func send(connection: GRPCClient<Transport>, request: ClientRequest<UnderlyingRequest>, callOptions: CallOptions, finished: @Sendable @escaping ()->Void) async throws -> Responses {
3232
try await withThrowingTaskGroup(of: Void.self) { _ in
3333
let client = ServiceClient(wrapping: connection)
3434
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Response.self)
@@ -37,6 +37,7 @@ extension Streams where Target == AllStreams {
3737
try continuation.yield(handle(message: message))
3838
}
3939
continuation.finish()
40+
finished()
4041
}
4142
return stream
4243
}

0 commit comments

Comments
 (0)