Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Sources/CassandraClient/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ extension CassandraClient {
/// Sets the cluster's consistency level. Default is `.localOne`.
public var consistency: CassandraClient.Consistency?

/// Sets the cluster's serial consistency level for LWT operations.
/// Default is `.serial`.
public var serialConsistency: CassandraClient.SerialConsistency?

/// The load balancing strategy to use. Default is `nil` which uses ``LoadBalancingStrategy/dataCenterAware(_:)``.
public var loadBalancingStrategy: LoadBalancingStrategy?

Expand Down Expand Up @@ -239,6 +243,9 @@ extension CassandraClient {
if let value = self.consistency {
try cluster.setConsistency(value.cassConsistency)
}
if let value = self.serialConsistency {
try cluster.setSerialConsistency(value.cassConsistency)
}

return cluster
}
Expand Down Expand Up @@ -398,6 +405,10 @@ internal final class Cluster {
try self.checkResult { cass_cluster_set_consistency(self.rawPointer, consistency) }
}

func setSerialConsistency(_ consistency: CassConsistency) throws {
try self.checkResult { cass_cluster_set_serial_consistency(self.rawPointer, consistency) }
}

func setSSL(_ ssl: SSLContext) throws {
cass_cluster_set_ssl(self.rawPointer, ssl.rawPointer)
}
Expand Down
15 changes: 15 additions & 0 deletions Sources/CassandraClient/Consistency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,19 @@ extension CassandraClient {
}
}
}

/// Serial consistency levels
public enum SerialConsistency: Hashable {
case serial
case localSerial

var cassConsistency: CassConsistency {
switch self {
case .serial:
return CASS_CONSISTENCY_SERIAL
case .localSerial:
return CASS_CONSISTENCY_LOCAL_SERIAL
}
}
}
}
2 changes: 1 addition & 1 deletion Sources/CassandraClient/Data+PaginatedRows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ extension CassandraClient.PaginatedRows: AsyncSequence {
///
/// - Warning:
/// This can be called only once for each ``PaginatedRows``,
/// otherwise it will throw ``CassandraClient.Error.rowsExhausted``.
/// Otherwise it will throw ``CassandraClient/Error/rowsExhausted`` error.
public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self)
}
Expand Down
98 changes: 98 additions & 0 deletions Tests/CassandraClientTests/CassandraClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,104 @@ final class Tests: XCTestCase {
}
}

func testSerialConsistency() {
let env = ProcessInfo.processInfo.environment
let keyspace = env["CASSANDRA_KEYSPACE"] ?? "test"

var serialConfig = CassandraClient.Configuration(
contactPointsProvider: { callback in
callback(.success([env["CASSANDRA_HOST"] ?? "127.0.0.1"]))
},
port: env["CASSANDRA_CQL_PORT"].flatMap(Int32.init) ?? 9042,
protocolVersion: .v3
)
serialConfig.username = env["CASSANDRA_USER"]
serialConfig.password = env["CASSANDRA_PASSWORD"]
serialConfig.keyspace = keyspace
serialConfig.requestTimeoutMillis = UInt32(24_000)
serialConfig.connectTimeoutMillis = UInt32(10_000)
serialConfig.serialConsistency = .serial

var logger = Logger(label: "test")
logger.logLevel = .debug

let serialClient = CassandraClient(configuration: serialConfig, logger: logger)
defer { XCTAssertNoThrow(try serialClient.shutdown()) }

XCTAssertNoThrow(
try serialClient.withSession(keyspace: .none) { session in
try session.run(
"create keyspace if not exists \(keyspace) with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
).wait()
}
)

let serialSession = serialClient.makeSession(keyspace: keyspace)
defer { XCTAssertNoThrow(try serialSession.shutdown()) }

let tableName = "test_serial_\(DispatchTime.now().uptimeNanoseconds)"
XCTAssertNoThrow(try serialSession.run("create table \(tableName) (id int primary key, value int);").wait())
XCTAssertNoThrow(try serialSession.run("insert into \(tableName) (id, value) values (1, 100);").wait())

let lwtQuery = "update \(tableName) set value = 200 where id = 1 if value = 100;"
var serialResult: CassandraClient.Rows?
XCTAssertNoThrow(serialResult = try serialSession.query(lwtQuery).wait())
XCTAssertNotNil(serialResult, "Serial consistency LWT should succeed")

let serialRows = Array(serialResult!)
XCTAssertFalse(serialRows.isEmpty, "Serial LWT query should return at least one row")
if let firstRow = serialRows.first {
XCTAssertNotNil(firstRow.column("[applied]")?.bool, "Serial LWT result should contain [applied] column")
}

var localSerialConfig = serialConfig
localSerialConfig.serialConsistency = .localSerial

let localSerialClient = CassandraClient(configuration: localSerialConfig, logger: logger)
defer { XCTAssertNoThrow(try localSerialClient.shutdown()) }

let localSerialSession = localSerialClient.makeSession(keyspace: keyspace)
defer { XCTAssertNoThrow(try localSerialSession.shutdown()) }

let tableName2 = "test_local_serial_\(DispatchTime.now().uptimeNanoseconds)"
XCTAssertNoThrow(
try localSerialSession.run("create table \(tableName2) (id int primary key, value int);").wait()
)
XCTAssertNoThrow(try localSerialSession.run("insert into \(tableName2) (id, value) values (1, 300);").wait())

let localLwtQuery = "update \(tableName2) set value = 400 where id = 1 if value = 300;"
var localSerialResult: CassandraClient.Rows?
XCTAssertNoThrow(localSerialResult = try localSerialSession.query(localLwtQuery).wait())
XCTAssertNotNil(localSerialResult, "Local serial consistency LWT should succeed")

let localSerialRows = Array(localSerialResult!)
XCTAssertFalse(localSerialRows.isEmpty, "Local serial LWT query should return at least one row")
if let firstRow = localSerialRows.first {
XCTAssertNotNil(
firstRow.column("[applied]")?.bool,
"Local serial LWT result should contain [applied] column"
)
}

var nilSerialConfig = serialConfig
nilSerialConfig.serialConsistency = nil

let nilSerialClient = CassandraClient(configuration: nilSerialConfig, logger: logger)
defer { XCTAssertNoThrow(try nilSerialClient.shutdown()) }

let nilSerialSession = nilSerialClient.makeSession(keyspace: keyspace)
defer { XCTAssertNoThrow(try nilSerialSession.shutdown()) }

let tableName3 = "test_nil_serial_\(DispatchTime.now().uptimeNanoseconds)"
XCTAssertNoThrow(try nilSerialSession.run("create table \(tableName3) (id int primary key, value int);").wait())
XCTAssertNoThrow(try nilSerialSession.run("insert into \(tableName3) (id, value) values (1, 500);").wait())

let nilLwtQuery = "update \(tableName3) set value = 600 where id = 1 if value = 500;"
var nilSerialResult: CassandraClient.Rows?
XCTAssertNoThrow(nilSerialResult = try nilSerialSession.query(nilLwtQuery).wait())
XCTAssertNotNil(nilSerialResult, "Default serial consistency LWT should succeed")
}

// meh, but nothing cross platform available
func randomBytes(size: Int) -> [UInt8] {
var buffer = [UInt8]()
Expand Down