Skip to content

Commit 5d1257b

Browse files
committed
[UPDATE] modify create and update api fro persistent-subscriptions
1 parent 49ec43a commit 5d1257b

8 files changed

+157
-114
lines changed

Sources/KurrentDB/Core/PersistenSubscription/PersistentSubscription.Settings.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,5 +65,8 @@ extension PersistentSubscription {
6565
self.checkpointAfter = checkpointAfter
6666
self.consumerStrategy = consumerStrategy
6767
}
68+
69+
70+
6871
}
6972
}

Sources/KurrentDB/PersistentSubscriptions/Additions/EventStore_Client_PersistentSubscriptions+Additions.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ extension EventStore_Client_PersistentSubscriptions_CreateReq.Settings {
4040
}
4141

4242
extension EventStore_Client_PersistentSubscriptions_UpdateReq.Settings {
43-
package static func make(settings: PersistentSubscription.Settings) -> Self {
43+
package static func from(settings: PersistentSubscription.Settings) -> Self {
4444
.with {
4545
$0.resolveLinks = settings.resolveLink
4646
$0.extraStatistics = settings.extraStatistics

Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptions.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ extension PersistentSubscriptions where Target == PersistentSubscription.All {
109109
/// - Parameters:
110110
/// - options: Configuration options for creating the subscription, defaulting to an empty configuration.
111111
/// - Throws: An error if the creation fails.
112-
public func create(options: CreateToAll.Options = .init()) async throws(KurrentError) {
113-
let usecase: CreateToAll = .init(group: group, options: options)
112+
public func create(startFrom cursor: PositionCursor = .end, options: CreateToAll.Options = .init()) async throws(KurrentError) {
113+
let usecase = CreateToAll(group: group, cursor: cursor, options: options)
114114
_ = try await usecase.perform(settings: clientSettings, callOptions: callOptions)
115115
}
116116

@@ -119,8 +119,8 @@ extension PersistentSubscriptions where Target == PersistentSubscription.All {
119119
/// - Parameters:
120120
/// - options: Configuration options for updating the subscription, defaulting to an empty configuration.
121121
/// - Throws: An error if the update fails.
122-
public func update(options: UpdateToAll.Options = .init()) async throws(KurrentError) {
123-
let usecase = UpdateToAll(group: group, options: options)
122+
public func update(startFrom cursor: PositionCursor = .end, options: UpdateToAll.Options = .init()) async throws(KurrentError) {
123+
let usecase = UpdateToAll(group: group, cursor: cursor, options: options)
124124
_ = try await usecase.perform(settings: clientSettings, callOptions: callOptions)
125125
}
126126

@@ -185,8 +185,8 @@ extension PersistentSubscriptions where Target == PersistentSubscription.Specifi
185185
/// - Parameters:
186186
/// - options: Configuration options for creating the subscription, defaulting to an empty configuration.
187187
/// - Throws: An error if the creation fails.
188-
public func create(options: CreateToStream.Options = .init()) async throws(KurrentError) {
189-
let usecase: CreateToStream = .init(streamIdentifier: streamIdentifier, group: group, options: options)
188+
public func create(startFrom cursor: RevisionCursor = .end, options: CreateToStream.Options = .init()) async throws(KurrentError) {
189+
let usecase = CreateToStream(streamIdentifier: streamIdentifier, group: group, cursor: cursor, options: options)
190190
_ = try await usecase.perform(settings: clientSettings, callOptions: callOptions)
191191
}
192192

@@ -195,8 +195,8 @@ extension PersistentSubscriptions where Target == PersistentSubscription.Specifi
195195
/// - Parameters:
196196
/// - options: Configuration options for updating the subscription, defaulting to an empty configuration.
197197
/// - Throws: An error if the update fails.
198-
public func update(settings: PersistentSubscription.Settings, from cursor: RevisionCursor = .end) async throws(KurrentError) {
199-
let usecase = UpdateToStream(streamIdentifier: streamIdentifier, group: group, options: .init(settings: settings).startFrom(cursor: cursor))
198+
public func update(startFrom cursor: RevisionCursor = .end, options: UpdateToStream.Options = .init()) async throws(KurrentError) {
199+
let usecase = UpdateToStream(streamIdentifier: streamIdentifier, group: group, cursor: cursor, options: options)
200200
_ = try await usecase.perform(settings: clientSettings, callOptions: callOptions)
201201
}
202202

Sources/KurrentDB/PersistentSubscriptions/PersistentSubscriptionsCommonOptions.swift

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,76 +14,147 @@ protocol PersistentSubscriptionsCommonOptions: EventStoreOptions {
1414

1515
extension PersistentSubscriptionsCommonOptions {
1616
@discardableResult
17+
public func resolveLink() -> Self {
18+
withCopy { $0.settings.resolveLink = true }
19+
}
20+
21+
@discardableResult
22+
public func extraStatistics() -> Self {
23+
withCopy { $0.settings.extraStatistics = true }
24+
}
25+
26+
@discardableResult
27+
public func messageTimeout(_ value: TimeSpan) -> Self {
28+
withCopy { $0.settings.messageTimeout = value }
29+
}
30+
31+
@discardableResult
32+
public func maxRetryCount(_ value: Int32) -> Self {
33+
withCopy { $0.settings.maxRetryCount = value }
34+
}
35+
36+
@discardableResult
37+
public func checkpoint(count value: ClosedRange<Int32>) -> Self {
38+
withCopy { $0.settings.checkpointCount = value }
39+
}
40+
41+
@discardableResult
42+
public func checkpoint(after value: TimeSpan) -> Self {
43+
withCopy { $0.settings.checkpointAfter = value }
44+
}
45+
46+
@discardableResult
47+
public func maxSubscriberCount(_ value: Int32) -> Self {
48+
withCopy { $0.settings.maxSubscriberCount = value }
49+
}
50+
51+
@discardableResult
52+
public func liveBufferSize(_ value: Int32) -> Self {
53+
withCopy { $0.settings.liveBufferSize = value }
54+
}
55+
56+
@discardableResult
57+
public func readBatchSize(_ value: Int32) -> Self {
58+
withCopy { $0.settings.readBatchSize = value }
59+
}
60+
61+
@discardableResult
62+
public func historyBufferSize(_ value: Int32) -> Self {
63+
withCopy { $0.settings.historyBufferSize = value }
64+
}
65+
66+
67+
68+
@discardableResult
69+
public func consumerStrategy(_ value: PersistentSubscription.SystemConsumerStrategy) -> Self {
70+
withCopy { $0.settings.consumerStrategy = value }
71+
}
72+
}
73+
74+
// MARK: - Deprecated
75+
extension PersistentSubscriptionsCommonOptions {
76+
@discardableResult
77+
@available(*, deprecated)
1778
public mutating func set(resolveLinks: Bool) -> Self {
1879
withCopy { copied in
1980
copied.settings.resolveLink = resolveLinks
2081
}
2182
}
2283

2384
@discardableResult
85+
@available(*, deprecated)
2486
public mutating func set(extraStatistics: Bool) -> Self {
2587
withCopy { copied in
2688
copied.settings.extraStatistics = extraStatistics
2789
}
2890
}
2991

3092
@discardableResult
93+
@available(*, deprecated)
3194
public mutating func set(maxRetryCount: Int32) -> Self {
3295
withCopy { copied in
3396
copied.settings.maxRetryCount = maxRetryCount
3497
}
3598
}
3699

37100
@discardableResult
101+
@available(*, deprecated)
38102
public mutating func set(minCheckpointCount: Int32) -> Self {
39103
withCopy { copied in
40104
copied.settings.checkpointCount = minCheckpointCount ... settings.checkpointCount.upperBound
41105
}
42106
}
43107

44108
@discardableResult
109+
@available(*, deprecated)
45110
public mutating func set(maxCheckpointCount: Int32) -> Self {
46111
withCopy { copied in
47112
copied.settings.checkpointCount = settings.checkpointCount.lowerBound ... maxCheckpointCount
48113
}
49114
}
50115

51116
@discardableResult
117+
@available(*, deprecated)
52118
public mutating func set(maxSubscriberCount: Int32) -> Self {
53119
withCopy { copied in
54120
copied.settings.maxSubscriberCount = maxSubscriberCount
55121
}
56122
}
57123

58124
@discardableResult
125+
@available(*, deprecated)
59126
public mutating func set(liveBufferSize: Int32) -> Self {
60127
withCopy { copied in
61128
copied.settings.liveBufferSize = liveBufferSize
62129
}
63130
}
64131

65132
@discardableResult
133+
@available(*, deprecated)
66134
public mutating func set(readBatchSize: Int32) -> Self {
67135
withCopy { copied in
68136
copied.settings.readBatchSize = readBatchSize
69137
}
70138
}
71139

72140
@discardableResult
141+
@available(*, deprecated)
73142
public mutating func set(historyBufferSize: Int32) -> Self {
74143
withCopy { copied in
75144
copied.settings.historyBufferSize = historyBufferSize
76145
}
77146
}
78147

79148
@discardableResult
149+
@available(*, deprecated)
80150
public mutating func set(messageTimeout timeout: TimeSpan) -> Self {
81151
withCopy { copied in
82152
copied.settings.messageTimeout = timeout
83153
}
84154
}
85155

86156
@discardableResult
157+
@available(*, deprecated)
87158
public mutating func setCheckpoint(after span: TimeSpan) -> Self {
88159
withCopy { copied in
89160
copied.settings.checkpointAfter = span

Sources/KurrentDB/PersistentSubscriptions/Usecase/PersistentSubscriptions.CreateToAll.swift

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,30 @@ extension PersistentSubscriptions {
1616
package typealias Response = DiscardedResponse<UnderlyingResponse>
1717

1818
let group: String
19+
let cursor: PositionCursor
1920
let options: Options
2021

21-
public init(group: String, options: Options) {
22+
public init(group: String, cursor: PositionCursor, options: Options) {
2223
self.group = group
24+
self.cursor = cursor
2325
self.options = options
2426
}
2527

2628
package func requestMessage() throws -> UnderlyingRequest {
2729
.with {
2830
$0.options = options.build()
2931
$0.options.groupName = group
32+
switch cursor {
33+
case .start:
34+
$0.options.all.start = .init()
35+
case .end:
36+
$0.options.all.end = .init()
37+
case let .position(commitPosition, preparePosition):
38+
$0.options.all.position = .with {
39+
$0.commitPosition = commitPosition
40+
$0.preparePosition = preparePosition
41+
}
42+
}
3043
}
3144
}
3245

@@ -53,33 +66,13 @@ extension PersistentSubscriptions.CreateToAll {
5366
}
5467

5568
@discardableResult
56-
public func startFrom(position: PositionCursor) -> Self {
57-
withCopy { options in
58-
options.cursor = position
59-
}
60-
}
61-
62-
@discardableResult
63-
public mutating func set(consumerStrategy: PersistentSubscription.SystemConsumerStrategy) -> Self {
64-
withCopy { options in
65-
options.settings.consumerStrategy = consumerStrategy
66-
}
69+
public func filter(_ value: SubscriptionFilter) -> Self {
70+
withCopy { $0.filter = filter }
6771
}
6872

6973
package func build() -> UnderlyingMessage {
7074
.with {
7175
$0.settings = .make(settings: settings)
72-
switch cursor {
73-
case .start:
74-
$0.all.start = .init()
75-
case .end:
76-
$0.all.end = .init()
77-
case let .position(commitPosition, preparePosition):
78-
$0.all.position = .with {
79-
$0.commitPosition = commitPosition
80-
$0.preparePosition = preparePosition
81-
}
82-
}
8376

8477
if let filter {
8578
$0.all.filter = .make(with: filter)

Sources/KurrentDB/PersistentSubscriptions/Usecase/PersistentSubscriptions.CreateToStream.swift

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,32 @@ extension PersistentSubscriptions {
1515
package typealias Response = DiscardedResponse<UnderlyingResponse>
1616

1717
var streamIdentifier: StreamIdentifier
18+
var cursor: RevisionCursor
1819
var group: String
1920
var options: Options
2021

21-
public init(streamIdentifier: StreamIdentifier, group: String, options: Options) {
22+
public init(streamIdentifier: StreamIdentifier, group: String, cursor: RevisionCursor, options: Options) {
2223
self.streamIdentifier = streamIdentifier
2324
self.group = group
25+
self.cursor = cursor
2426
self.options = options
2527
}
2628

2729
package func requestMessage() throws -> UnderlyingRequest {
2830
try .with {
2931
$0.options = options.build()
30-
$0.options.groupName = group
32+
3133
$0.options.stream.streamIdentifier = try streamIdentifier.build()
34+
$0.options.groupName = group
35+
36+
switch cursor {
37+
case .start:
38+
$0.options.stream.start = .init()
39+
case .end:
40+
$0.options.stream.end = .init()
41+
case let .revision(revision):
42+
$0.options.stream.revision = revision
43+
}
3244
}
3345
}
3446

@@ -45,39 +57,14 @@ extension PersistentSubscriptions.CreateToStream {
4557
package typealias UnderlyingMessage = UnderlyingRequest.Options
4658

4759
public var settings: PersistentSubscription.Settings
48-
public var cursor: RevisionCursor
4960

50-
public init(settings: PersistentSubscription.Settings = .init(), from cursor: RevisionCursor = .end) {
51-
self.settings = settings
52-
self.cursor = cursor
53-
}
54-
55-
@discardableResult
56-
public func startFrom(_ cursor: RevisionCursor) -> Self {
57-
withCopy { options in
58-
options.cursor = cursor
59-
}
60-
}
61-
62-
@discardableResult
63-
public mutating func set(consumerStrategy: PersistentSubscription.SystemConsumerStrategy) -> Self {
64-
withCopy { options in
65-
options.settings.consumerStrategy = consumerStrategy
66-
}
61+
public init() {
62+
self.settings = .init()
6763
}
6864

6965
package func build() -> UnderlyingMessage {
7066
.with {
7167
$0.settings = .make(settings: settings)
72-
73-
switch cursor {
74-
case .start:
75-
$0.stream.start = .init()
76-
case .end:
77-
$0.stream.end = .init()
78-
case let .revision(revision):
79-
$0.stream.revision = revision
80-
}
8168
}
8269
}
8370
}

0 commit comments

Comments
 (0)