Skip to content

Commit 1da99aa

Browse files
erichmengeErich Menge
andauthored
Expose the paging token API from the C/C++ driver (#22)
Co-authored-by: Erich Menge <[email protected]>
1 parent da2a404 commit 1da99aa

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

Sources/CassandraClient/Data.swift

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import Foundation
1717
import Logging
1818
import NIO
1919

20+
public protocol PagingStateToken: ContiguousBytes {}
21+
2022
extension CassandraClient {
2123
/// Resulting row(s) of a Cassandra query. Data are returned all at once.
2224
public final class Rows: Sequence {
@@ -46,6 +48,35 @@ extension CassandraClient {
4648
Iterator(rows: self)
4749
}
4850

51+
/// Returns a reusable paging token.
52+
///
53+
/// - Warning: This token is not suitable or safe for sharing externally.
54+
public func opaquePagingStateToken() throws -> OpaquePagingStateToken {
55+
try OpaquePagingStateToken(token: self.rawPagingStateToken())
56+
}
57+
58+
private func rawPagingStateToken() throws -> [UInt8] {
59+
var buffer: UnsafePointer<CChar>?
60+
var length = 0
61+
62+
// The underlying memory is freed with the Rows result
63+
let result = cass_result_paging_state_token(self.rawPointer, &buffer, &length)
64+
guard result == CASS_OK, let bytesPointer = buffer else {
65+
throw CassandraClient.Error(result)
66+
}
67+
68+
let tokenBytes: [UInt8] = bytesPointer.withMemoryRebound(to: UInt8.self, capacity: length) {
69+
let bufferPointer = UnsafeBufferPointer(start: $0, count: length)
70+
return Array(unsafeUninitializedCapacity: length) { storagePointer, storageCount in
71+
var (unwritten, endIndex) = storagePointer.initialize(from: bufferPointer)
72+
precondition(unwritten.next() == nil)
73+
storageCount = storagePointer.distance(from: storagePointer.startIndex, to: endIndex)
74+
}
75+
}
76+
77+
return tokenBytes
78+
}
79+
4980
public final class Iterator: IteratorProtocol {
5081
public typealias Element = Row
5182

@@ -283,6 +314,20 @@ extension CassandraClient {
283314
cass_value_is_null(self.rawPointer) == cass_true
284315
}
285316
}
317+
318+
/// A reusable page token that can be used by `Statement` to resume querying
319+
/// at a specific position.
320+
public struct OpaquePagingStateToken: PagingStateToken {
321+
let token: [UInt8]
322+
323+
internal init(token: [UInt8]) {
324+
self.token = token
325+
}
326+
327+
public func withUnsafeBytes<R>(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R {
328+
try self.token.withUnsafeBytes(body)
329+
}
330+
}
286331
}
287332

288333
// MARK: - Int8

Sources/CassandraClient/Statement.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,23 @@ extension CassandraClient {
9191
try checkResult { cass_statement_set_paging_size(self.rawPointer, pagingSize) }
9292
}
9393

94+
/// Sets the starting page of the returned paginated results.
95+
///
96+
/// The paging state token can be obtained by the `pagingStateToken()`
97+
/// function on `Rows`.
98+
///
99+
/// - Warning: The paging state should not be exposed to or come from
100+
/// untrusted environments. The paging state could be spoofed and
101+
/// potentially used to gain access to other data.
102+
public func setPagingStateToken(_ pagingStateToken: PagingStateToken) throws {
103+
try checkResult {
104+
pagingStateToken.withUnsafeBytes {
105+
let buffer = $0.bindMemory(to: CChar.self)
106+
return cass_statement_set_paging_state_token(self.rawPointer, buffer.baseAddress, buffer.count)
107+
}
108+
}
109+
}
110+
94111
deinit {
95112
cass_statement_free(self.rawPointer)
96113
}

Tests/CassandraClientTests/CassandraClientTests.swift

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,44 @@ final class Tests: XCTestCase {
233233
}
234234
}
235235

236+
func testPagingToken() throws {
237+
let tableName = "test_\(DispatchTime.now().uptimeNanoseconds)"
238+
try self.cassandraClient.run("create table \(tableName) (id int primary key, data text);").wait()
239+
240+
let options = CassandraClient.Statement.Options(consistency: .localQuorum)
241+
242+
let count = Int.random(in: 5000 ... 6000)
243+
var futures = [EventLoopFuture<Void>]()
244+
(0 ..< count).forEach { index in
245+
futures.append(
246+
self.cassandraClient.run(
247+
"insert into \(tableName) (id, data) values (?, ?);",
248+
parameters: [.int32(Int32(index)), .string(UUID().uuidString)],
249+
options: options
250+
)
251+
)
252+
}
253+
254+
let initialPages = try self.cassandraClient.query("select id, data from \(tableName);", pageSize: Int32(5)).wait()
255+
256+
for _ in 0 ..< Int.random(in: 10 ... 20) {
257+
_ = try! initialPages.nextPage().wait()
258+
}
259+
260+
let page = try initialPages.nextPage().wait()
261+
let pageToken = try page.opaquePagingStateToken()
262+
let row = try initialPages.nextPage().wait().first!
263+
264+
let statement = try CassandraClient.Statement(query: "select id, data from \(tableName);")
265+
try! statement.setPagingStateToken(pageToken)
266+
let offsetPages = try self.cassandraClient.execute(statement: statement, pageSize: Int32(5), on: nil).wait()
267+
let pagedRow: CassandraClient.Row = try offsetPages.nextPage().wait().first!
268+
269+
let id1: CassandraClient.Column = pagedRow.column(0)!
270+
let id2: CassandraClient.Column = row.column(0)!
271+
XCTAssertEqual(id1.int32, id2.int32)
272+
}
273+
236274
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
237275
func testQueryAsyncIterator() throws {
238276
#if !(compiler(>=5.5) && canImport(_Concurrency))

0 commit comments

Comments
 (0)