From f3ce14ed34c798ccd180c54d502bc81385b73566 Mon Sep 17 00:00:00 2001 From: Eric Raio Date: Sat, 13 Sep 2025 08:04:27 -0700 Subject: [PATCH] Add serial consistency configuration for LWT operations Motivation: The swift-cassandra-client didn't expose serial consistency configuration needed for Lightweight Transactions, despite the C++ driver supporting it via cass_cluster_set_serial_consistency. Modifications: - Added serialConsistency field to Configuration struct - Added setSerialConsistency method to Cluster class - Applied serial consistency in makeCluster method - Added testSerialConsistency unit test Result: Users can now set configuration.serialConsistency = .serial or .localSerial for proper LWT consistency guarantees. --- Sources/CassandraClient/Configuration.swift | 11 +++ Sources/CassandraClient/Consistency.swift | 15 +++ .../CassandraClient/Data+PaginatedRows.swift | 2 +- .../CassandraClientTests.swift | 98 +++++++++++++++++++ 4 files changed, 125 insertions(+), 1 deletion(-) diff --git a/Sources/CassandraClient/Configuration.swift b/Sources/CassandraClient/Configuration.swift index e2b06ca..b9a8aea 100644 --- a/Sources/CassandraClient/Configuration.swift +++ b/Sources/CassandraClient/Configuration.swift @@ -51,6 +51,10 @@ extension CassandraClient { /// Sets the cluster's consistency level. Default is `.localOne`. public var consistency: CassandraClient.Consistency? + /// Sets the cluster's serial consistency level for LWT operations. + /// Default is `.serial`. + public var serialConsistency: CassandraClient.SerialConsistency? + /// The load balancing strategy to use. Default is `nil` which uses ``LoadBalancingStrategy/dataCenterAware(_:)``. public var loadBalancingStrategy: LoadBalancingStrategy? @@ -239,6 +243,9 @@ extension CassandraClient { if let value = self.consistency { try cluster.setConsistency(value.cassConsistency) } + if let value = self.serialConsistency { + try cluster.setSerialConsistency(value.cassConsistency) + } return cluster } @@ -398,6 +405,10 @@ internal final class Cluster { try self.checkResult { cass_cluster_set_consistency(self.rawPointer, consistency) } } + func setSerialConsistency(_ consistency: CassConsistency) throws { + try self.checkResult { cass_cluster_set_serial_consistency(self.rawPointer, consistency) } + } + func setSSL(_ ssl: SSLContext) throws { cass_cluster_set_ssl(self.rawPointer, ssl.rawPointer) } diff --git a/Sources/CassandraClient/Consistency.swift b/Sources/CassandraClient/Consistency.swift index afd71c4..157b805 100644 --- a/Sources/CassandraClient/Consistency.swift +++ b/Sources/CassandraClient/Consistency.swift @@ -56,4 +56,19 @@ extension CassandraClient { } } } + + /// Serial consistency levels + public enum SerialConsistency: Hashable { + case serial + case localSerial + + var cassConsistency: CassConsistency { + switch self { + case .serial: + return CASS_CONSISTENCY_SERIAL + case .localSerial: + return CASS_CONSISTENCY_LOCAL_SERIAL + } + } + } } diff --git a/Sources/CassandraClient/Data+PaginatedRows.swift b/Sources/CassandraClient/Data+PaginatedRows.swift index f2db99b..fd89493 100644 --- a/Sources/CassandraClient/Data+PaginatedRows.swift +++ b/Sources/CassandraClient/Data+PaginatedRows.swift @@ -202,7 +202,7 @@ extension CassandraClient.PaginatedRows: AsyncSequence { /// /// - Warning: /// This can be called only once for each ``PaginatedRows``, - /// otherwise it will throw ``CassandraClient.Error.rowsExhausted``. + /// Otherwise it will throw ``CassandraClient/Error/rowsExhausted`` error. public func makeAsyncIterator() -> AsyncIterator { AsyncIterator(self) } diff --git a/Tests/CassandraClientTests/CassandraClientTests.swift b/Tests/CassandraClientTests/CassandraClientTests.swift index c8c516c..ec03c9d 100644 --- a/Tests/CassandraClientTests/CassandraClientTests.swift +++ b/Tests/CassandraClientTests/CassandraClientTests.swift @@ -812,6 +812,104 @@ final class Tests: XCTestCase { } } + func testSerialConsistency() { + let env = ProcessInfo.processInfo.environment + let keyspace = env["CASSANDRA_KEYSPACE"] ?? "test" + + var serialConfig = CassandraClient.Configuration( + contactPointsProvider: { callback in + callback(.success([env["CASSANDRA_HOST"] ?? "127.0.0.1"])) + }, + port: env["CASSANDRA_CQL_PORT"].flatMap(Int32.init) ?? 9042, + protocolVersion: .v3 + ) + serialConfig.username = env["CASSANDRA_USER"] + serialConfig.password = env["CASSANDRA_PASSWORD"] + serialConfig.keyspace = keyspace + serialConfig.requestTimeoutMillis = UInt32(24_000) + serialConfig.connectTimeoutMillis = UInt32(10_000) + serialConfig.serialConsistency = .serial + + var logger = Logger(label: "test") + logger.logLevel = .debug + + let serialClient = CassandraClient(configuration: serialConfig, logger: logger) + defer { XCTAssertNoThrow(try serialClient.shutdown()) } + + XCTAssertNoThrow( + try serialClient.withSession(keyspace: .none) { session in + try session.run( + "create keyspace if not exists \(keyspace) with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }" + ).wait() + } + ) + + let serialSession = serialClient.makeSession(keyspace: keyspace) + defer { XCTAssertNoThrow(try serialSession.shutdown()) } + + let tableName = "test_serial_\(DispatchTime.now().uptimeNanoseconds)" + XCTAssertNoThrow(try serialSession.run("create table \(tableName) (id int primary key, value int);").wait()) + XCTAssertNoThrow(try serialSession.run("insert into \(tableName) (id, value) values (1, 100);").wait()) + + let lwtQuery = "update \(tableName) set value = 200 where id = 1 if value = 100;" + var serialResult: CassandraClient.Rows? + XCTAssertNoThrow(serialResult = try serialSession.query(lwtQuery).wait()) + XCTAssertNotNil(serialResult, "Serial consistency LWT should succeed") + + let serialRows = Array(serialResult!) + XCTAssertFalse(serialRows.isEmpty, "Serial LWT query should return at least one row") + if let firstRow = serialRows.first { + XCTAssertNotNil(firstRow.column("[applied]")?.bool, "Serial LWT result should contain [applied] column") + } + + var localSerialConfig = serialConfig + localSerialConfig.serialConsistency = .localSerial + + let localSerialClient = CassandraClient(configuration: localSerialConfig, logger: logger) + defer { XCTAssertNoThrow(try localSerialClient.shutdown()) } + + let localSerialSession = localSerialClient.makeSession(keyspace: keyspace) + defer { XCTAssertNoThrow(try localSerialSession.shutdown()) } + + let tableName2 = "test_local_serial_\(DispatchTime.now().uptimeNanoseconds)" + XCTAssertNoThrow( + try localSerialSession.run("create table \(tableName2) (id int primary key, value int);").wait() + ) + XCTAssertNoThrow(try localSerialSession.run("insert into \(tableName2) (id, value) values (1, 300);").wait()) + + let localLwtQuery = "update \(tableName2) set value = 400 where id = 1 if value = 300;" + var localSerialResult: CassandraClient.Rows? + XCTAssertNoThrow(localSerialResult = try localSerialSession.query(localLwtQuery).wait()) + XCTAssertNotNil(localSerialResult, "Local serial consistency LWT should succeed") + + let localSerialRows = Array(localSerialResult!) + XCTAssertFalse(localSerialRows.isEmpty, "Local serial LWT query should return at least one row") + if let firstRow = localSerialRows.first { + XCTAssertNotNil( + firstRow.column("[applied]")?.bool, + "Local serial LWT result should contain [applied] column" + ) + } + + var nilSerialConfig = serialConfig + nilSerialConfig.serialConsistency = nil + + let nilSerialClient = CassandraClient(configuration: nilSerialConfig, logger: logger) + defer { XCTAssertNoThrow(try nilSerialClient.shutdown()) } + + let nilSerialSession = nilSerialClient.makeSession(keyspace: keyspace) + defer { XCTAssertNoThrow(try nilSerialSession.shutdown()) } + + let tableName3 = "test_nil_serial_\(DispatchTime.now().uptimeNanoseconds)" + XCTAssertNoThrow(try nilSerialSession.run("create table \(tableName3) (id int primary key, value int);").wait()) + XCTAssertNoThrow(try nilSerialSession.run("insert into \(tableName3) (id, value) values (1, 500);").wait()) + + let nilLwtQuery = "update \(tableName3) set value = 600 where id = 1 if value = 500;" + var nilSerialResult: CassandraClient.Rows? + XCTAssertNoThrow(nilSerialResult = try nilSerialSession.query(nilLwtQuery).wait()) + XCTAssertNotNil(nilSerialResult, "Default serial consistency LWT should succeed") + } + // meh, but nothing cross platform available func randomBytes(size: Int) -> [UInt8] { var buffer = [UInt8]()