Skip to content

Commit 15c1a6a

Browse files
authored
perf: optimised marker operations removing heap allocations (#107)
1 parent 36ad066 commit 15c1a6a

File tree

5 files changed

+126
-41
lines changed

5 files changed

+126
-41
lines changed

cpp_test/test_line_sender.cpp

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -178,29 +178,27 @@ TEST_CASE("line_sender c api basics")
178178
2.7,
179179
48121.5,
180180
4.3};
181-
CHECK(
182-
::line_sender_buffer_column_f64_arr_byte_strides(
183-
buffer,
184-
arr_name,
185-
rank,
186-
shape,
187-
strides,
188-
reinterpret_cast<uint8_t*>(arr_data.data()),
189-
sizeof(arr_data),
190-
&err));
181+
CHECK(::line_sender_buffer_column_f64_arr_byte_strides(
182+
buffer,
183+
arr_name,
184+
rank,
185+
shape,
186+
strides,
187+
reinterpret_cast<uint8_t*>(arr_data.data()),
188+
sizeof(arr_data),
189+
&err));
191190

192191
line_sender_column_name arr_name2 = QDB_COLUMN_NAME_LITERAL("a2");
193192
intptr_t elem_strides[] = {6, 2, 1};
194-
CHECK(
195-
::line_sender_buffer_column_f64_arr_elem_strides(
196-
buffer,
197-
arr_name2,
198-
rank,
199-
shape,
200-
elem_strides,
201-
reinterpret_cast<uint8_t*>(arr_data.data()),
202-
sizeof(arr_data),
203-
&err));
193+
CHECK(::line_sender_buffer_column_f64_arr_elem_strides(
194+
buffer,
195+
arr_name2,
196+
rank,
197+
shape,
198+
elem_strides,
199+
reinterpret_cast<uint8_t*>(arr_data.data()),
200+
sizeof(arr_data),
201+
&err));
204202
CHECK(::line_sender_buffer_at_nanos(buffer, 10000000, &err));
205203
CHECK(server.recv() == 0);
206204
CHECK(::line_sender_buffer_size(buffer) == 266);

include/questdb/ingress/line_sender.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,8 +515,10 @@ bool line_sender_buffer_column_str(
515515
* @param[in] rank Number of dimensions of the array.
516516
* @param[in] shape Array of dimension sizes (length = `rank`).
517517
* Each element must be a positive integer.
518-
* @param[in] strides Array strides, in the unit of bytes. Strides can be negative.
519-
* @param[in] data_buffer Array data, laid out according to the provided shape and strides.
518+
* @param[in] strides Array strides, in the unit of bytes. Strides can be
519+
* negative.
520+
* @param[in] data_buffer Array data, laid out according to the provided shape
521+
* and strides.
520522
* @param[in] data_buffer_len Length of the array data block in bytes.
521523
* @param[out] err_out Set to an error object on failure (if non-NULL).
522524
* @return true on success, false on error.
@@ -543,8 +545,10 @@ bool line_sender_buffer_column_f64_arr_byte_strides(
543545
* @param[in] rank Number of dimensions of the array.
544546
* @param[in] shape Array of dimension sizes (length = `rank`).
545547
* Each element must be a positive integer.
546-
* @param[in] strides Array strides, in the unit of elements. Strides can be negative.
547-
* @param[in] data_buffer Array data, laid out according to the provided shape and strides.
548+
* @param[in] strides Array strides, in the unit of elements. Strides can be
549+
* negative.
550+
* @param[in] data_buffer Array data, laid out according to the provided shape
551+
* and strides.
548552
* @param[in] data_buffer_len Length of the array data block in bytes.
549553
* @param[out] err_out Set to an error object on failure (if non-NULL).
550554
* @return true on success, false on error.

include/questdb/ingress/line_sender.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,8 +652,8 @@ class line_sender_buffer
652652
* @param name Column name.
653653
* @param shape Array dimensions (e.g., [2,3] for a 2x3 matrix).
654654
* @param strides Strides for each dimension, in the unit specified by `B`.
655-
* @param data Array elements laid out in row-major order. Their number must
656-
* match the product of dimension sizes.
655+
* @param data Array elements laid out in row-major order. Their number
656+
* must match the product of dimension sizes.
657657
*/
658658
template <bool B, typename T, size_t N>
659659
line_sender_buffer& column(

questdb-rs/src/ingress/mod.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use std::collections::HashMap;
3939
use std::convert::Infallible;
4040
use std::fmt::{Debug, Display, Formatter, Write};
4141
use std::io::{self, BufRead, BufReader, ErrorKind, Write as IoWrite};
42+
use std::num::NonZeroUsize;
4243
use std::ops::Deref;
4344
use std::path::PathBuf;
4445
use std::slice::from_raw_parts_mut;
@@ -476,11 +477,13 @@ impl OpCase {
476477
}
477478
}
478479

479-
#[derive(Debug, Clone)]
480+
// IMPORTANT: This struct MUST remain `Copy` to ensure that
481+
// there are no heap allocations when performing marker operations.
482+
#[derive(Debug, Clone, Copy)]
480483
struct BufferState {
481484
op_case: OpCase,
482485
row_count: usize,
483-
first_table: Option<String>,
486+
first_table_len: Option<NonZeroUsize>,
484487
transactional: bool,
485488
}
486489

@@ -489,17 +492,10 @@ impl BufferState {
489492
Self {
490493
op_case: OpCase::Init,
491494
row_count: 0,
492-
first_table: None,
495+
first_table_len: None,
493496
transactional: true,
494497
}
495498
}
496-
497-
fn clear(&mut self) {
498-
self.op_case = OpCase::Init;
499-
self.row_count = 0;
500-
self.first_table = None;
501-
self.transactional = true;
502-
}
503499
}
504500

505501
/// A reusable buffer to prepare a batch of ILP messages.
@@ -688,7 +684,7 @@ impl Buffer {
688684
)
689685
));
690686
}
691-
self.marker = Some((self.output.len(), self.state.clone()));
687+
self.marker = Some((self.output.len(), self.state));
692688
Ok(())
693689
}
694690

@@ -721,7 +717,7 @@ impl Buffer {
721717
/// [`capacity`](Buffer::capacity).
722718
pub fn clear(&mut self) {
723719
self.output.clear();
724-
self.state.clear();
720+
self.state = BufferState::new();
725721
self.marker = None;
726722
}
727723

@@ -798,16 +794,31 @@ impl Buffer {
798794
let name: TableName<'a> = name.try_into()?;
799795
self.validate_max_name_len(name.name)?;
800796
self.check_op(Op::Table)?;
797+
let table_begin = self.output.len();
801798
write_escaped_unquoted(&mut self.output, name.name);
799+
let table_end = self.output.len();
802800
self.state.op_case = OpCase::TableWritten;
803801

804802
// A buffer stops being transactional if it targets multiple tables.
805-
if let Some(first_table) = &self.state.first_table {
806-
if first_table != name.name {
803+
if let Some(first_table_len) = &self.state.first_table_len {
804+
let first_table = &self.output[0..(first_table_len.get())];
805+
let this_table = &self.output[table_begin..table_end];
806+
if first_table != this_table {
807807
self.state.transactional = false;
808808
}
809809
} else {
810-
self.state.first_table = Some(name.name.to_owned());
810+
debug_assert!(table_begin == 0);
811+
812+
// This is a bit confusing, so worth explaining:
813+
// `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
814+
// but we know that `table_end` is never 0 here, we just need an option type
815+
// anyway, so we don't bother unwrapping it to then wrap it again.
816+
let first_table_len = NonZeroUsize::new(table_end);
817+
818+
// Instead we just assert that it's `Some`.
819+
debug_assert!(first_table_len.is_some());
820+
821+
self.state.first_table_len = first_table_len;
811822
}
812823
Ok(self)
813824
}

questdb-rs/src/tests/sender.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,78 @@ fn test_row_count() -> TestResult {
307307
Ok(())
308308
}
309309

310+
#[test]
311+
fn test_transactional() -> TestResult {
312+
let mut buffer = Buffer::new(ProtocolVersion::V2);
313+
314+
// transactional since there are no recorded tables yet
315+
assert_eq!(buffer.row_count(), 0);
316+
assert!(buffer.transactional());
317+
318+
buffer.set_marker()?;
319+
buffer.table("table 1.test")?.symbol("a", "b")?.at_now()?;
320+
assert_eq!(buffer.row_count(), 1); // tables={'table 1.test'}
321+
322+
// still transactional since there is only one single table.
323+
assert!(buffer.transactional());
324+
325+
buffer.table("table 2.test")?.symbol("c", "d")?.at_now()?;
326+
327+
// not transactional since we have both tables "x" and "y".
328+
assert_eq!(buffer.row_count(), 2); // tables={'table 1.test', 'table 2.test'}
329+
assert!(!buffer.transactional());
330+
331+
buffer.rewind_to_marker()?;
332+
// no tables, so we're transactional again
333+
assert_eq!(buffer.row_count(), 0); // tables=[]
334+
assert!(buffer.transactional());
335+
assert!(buffer.is_empty());
336+
337+
// We add another new and different table, so we are still transactional.
338+
buffer.table("test=table=3")?.symbol("e", "f")?.at_now()?;
339+
assert_eq!(buffer.row_count(), 1); // tables={'test=table=3'}
340+
assert!(buffer.transactional());
341+
342+
// Same table again, so we are still transactional.
343+
buffer.table("test=table=3")?.symbol("g", "h")?.at_now()?;
344+
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
345+
assert!(buffer.transactional());
346+
347+
buffer.set_marker()?;
348+
// We add a new different table: Name differs in length.
349+
buffer.table("test=table=3 ")?.symbol("i", "j")?.at_now()?;
350+
assert_eq!(buffer.row_count(), 3); // tables={'test=table=3', 'test=table=3 '}
351+
assert!(!buffer.transactional());
352+
353+
buffer.rewind_to_marker()?;
354+
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
355+
assert!(buffer.transactional());
356+
357+
buffer.set_marker()?;
358+
// We add a new different table: Name differs in content, but not in length.
359+
buffer.table("test=table=4")?.symbol("k", "l")?.at_now()?;
360+
assert_eq!(buffer.row_count(), 3); // tables={'test=table=3', 'test=table=4'}
361+
assert!(!buffer.transactional());
362+
363+
buffer.rewind_to_marker()?;
364+
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
365+
assert!(buffer.transactional());
366+
367+
buffer.clear();
368+
assert_eq!(buffer.row_count(), 0); // tables=[]
369+
assert!(buffer.transactional());
370+
assert!(buffer.is_empty());
371+
372+
// We add three rows of the same new table, so we are still transactional.
373+
buffer.table("test=table=5")?.symbol("m", "n")?.at_now()?;
374+
buffer.table("test=table=5")?.symbol("o", "p")?.at_now()?;
375+
buffer.table("test=table=5")?.symbol("q", "r")?.at_now()?;
376+
assert_eq!(buffer.row_count(), 3); // tables={'test=table=5'}
377+
assert!(buffer.transactional());
378+
379+
Ok(())
380+
}
381+
310382
#[test]
311383
fn test_auth_inconsistent_keys() -> TestResult {
312384
test_bad_key("fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU", // d

0 commit comments

Comments
 (0)