Skip to content

Commit cc9ef87

Browse files
committed
datastore: add truncation support
1 parent b4bec5b commit cc9ef87

File tree

6 files changed

+127
-58
lines changed

6 files changed

+127
-58
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use spacetimedb_datastore::system_tables::StModuleRow;
2323
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
2424
use spacetimedb_datastore::traits::{
2525
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
26-
UpdateFlags,
26+
TxTableTruncated, UpdateFlags,
2727
};
2828
use spacetimedb_datastore::{
2929
locking_tx_datastore::{
@@ -869,11 +869,17 @@ impl RelationalDB {
869869
.collect();
870870
let deletes: Box<_> = tx_data
871871
.deletes()
872-
.map(|(table_id, rowdata)| Ops {
872+
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::No)
873+
.map(|(table_id, _, rowdata)| Ops {
873874
table_id: *table_id,
874875
rowdata: rowdata.clone(),
875876
})
876877
.collect();
878+
let truncates: Box<_> = tx_data
879+
.deletes()
880+
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes)
881+
.map(|(table_id, ..)| *table_id)
882+
.collect();
877883

878884
let inputs = reducer_context.map(|rcx| rcx.into());
879885

@@ -883,7 +889,7 @@ impl RelationalDB {
883889
mutations: Some(Mutations {
884890
inserts,
885891
deletes,
886-
truncates: [].into(),
892+
truncates,
887893
}),
888894
};
889895

crates/core/src/subscription/tx.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ impl DeltaTableIndexes {
7676
indexes
7777
}
7878

79+
let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows));
7980
Self {
8081
inserts: build_indexes_for_rows(tx, meta, data.inserts()),
81-
deletes: build_indexes_for_rows(tx, meta, data.deletes()),
82+
deletes: build_indexes_for_rows(tx, meta, deletes),
8283
}
8384
}
8485
}
@@ -133,8 +134,8 @@ impl DeltaStore for DeltaTx<'_> {
133134
self.data
134135
.and_then(|data| {
135136
data.inserts()
136-
.find(|(id, _)| **id == table_id)
137-
.map(|(_, rows)| rows.len())
137+
.find(|(id, ..)| **id == table_id)
138+
.map(|(.., rows)| rows.len())
138139
})
139140
.unwrap_or_default()
140141
}
@@ -143,25 +144,25 @@ impl DeltaStore for DeltaTx<'_> {
143144
self.data
144145
.and_then(|data| {
145146
data.deletes()
146-
.find(|(id, _)| **id == table_id)
147-
.map(|(_, rows)| rows.len())
147+
.find(|(id, ..)| **id == table_id)
148+
.map(|(.., rows)| rows.len())
148149
})
149150
.unwrap_or_default()
150151
}
151152

152153
fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
153154
self.data.and_then(|data| {
154155
data.inserts()
155-
.find(|(id, _)| **id == table_id)
156-
.map(|(_, rows)| rows.iter())
156+
.find(|(id, ..)| **id == table_id)
157+
.map(|(.., rows)| rows.iter())
157158
})
158159
}
159160

160161
fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
161162
self.data.and_then(|data| {
162163
data.deletes()
163-
.find(|(id, _)| **id == table_id)
164-
.map(|(_, rows)| rows.iter())
164+
.find(|(id, ..)| **id == table_id)
165+
.map(|(.., rows)| rows.iter())
165166
})
166167
}
167168

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -334,14 +334,16 @@ impl CommittedState {
334334
Ok(())
335335
}
336336

337+
pub(super) fn replay_truncate(&mut self, table_id: TableId) -> Result<()> {
338+
let (table, blob_store, ..) = self.get_table_and_blob_store_mut(table_id)?;
339+
table.clear(blob_store);
340+
Ok(())
341+
}
342+
337343
pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> {
338-
let table = self
339-
.tables
340-
.get_mut(&table_id)
341-
.ok_or(TableError::IdNotFoundState(table_id))?;
342-
let blob_store = &mut self.blob_store;
344+
let (table, blob_store, _, page_pool) = self.get_table_and_blob_store_mut(table_id)?;
343345
table
344-
.delete_equal_row(&self.page_pool, blob_store, rel)
346+
.delete_equal_row(page_pool, blob_store, rel)
345347
.map_err(TableError::Bflatn)?
346348
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
347349

@@ -459,9 +461,9 @@ impl CommittedState {
459461
for index_row in rows {
460462
let index_id = index_row.index_id;
461463
let table_id = index_row.table_id;
462-
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
463-
panic!("Cannot create index for table which doesn't exist in committed state");
464-
};
464+
let (table, blob_store, index_id_map, _) = self
465+
.get_table_and_blob_store_mut(table_id)
466+
.expect("index should exist in committed state; cannot create it");
465467
let algo: IndexAlgorithm = index_row.index_algorithm.into();
466468
let columns: ColSet = algo.columns().into();
467469
let is_unique = unique_constraints.contains(&(table_id, columns));
@@ -562,8 +564,7 @@ impl CommittedState {
562564
"Cannot get TX_STATE RowPointer from CommittedState.",
563565
);
564566
let table = self
565-
.tables
566-
.get(&table_id)
567+
.get_table(table_id)
567568
.expect("Attempt to get COMMITTED_STATE row from table not present in tables.");
568569
// TODO(perf, deep-integration): Use `get_row_ref_unchecked`.
569570
table.get_row_ref(&self.blob_store, row_ptr).unwrap()
@@ -643,17 +644,18 @@ impl CommittedState {
643644

644645
if !deletes.is_empty() {
645646
let table_name = &*table.get_schema().table_name;
646-
// TODO(centril): Pass this along to record truncated tables.
647-
let _truncated = table.row_count == 0;
648-
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
647+
let truncated = table.row_count == 0;
648+
tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated);
649649
}
650650
}
651651

652652
for (table_id, row_ptrs) in delete_tables {
653-
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
654-
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
655-
} else if !row_ptrs.is_empty() {
656-
panic!("Deletion for non-existent table {table_id:?}... huh?");
653+
match self.get_table_and_blob_store_mut(table_id) {
654+
Ok((table, blob_store, ..)) => {
655+
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter())
656+
}
657+
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
658+
Err(_) => {}
657659
}
658660
}
659661

@@ -832,12 +834,21 @@ impl CommittedState {
832834
pub(super) fn get_table_and_blob_store_mut(
833835
&mut self,
834836
table_id: TableId,
835-
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
836-
(
837-
self.tables.get_mut(&table_id),
837+
) -> Result<(&mut Table, &mut dyn BlobStore, &mut IndexIdMap, &PagePool)> {
838+
// NOTE(centril): `TableError` is a fairly large type.
839+
// Not making this lazy made `TableError::drop` show up in perf.
840+
// TODO(centril): Box all the errors.
841+
#[allow(clippy::unnecessary_lazy_evaluations)]
842+
let table = self
843+
.tables
844+
.get_mut(&table_id)
845+
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
846+
Ok((
847+
table,
838848
&mut self.blob_store as &mut dyn BlobStore,
839849
&mut self.index_id_map,
840-
)
850+
&self.page_pool,
851+
))
841852
}
842853

843854
fn make_table(schema: Arc<TableSchema>) -> Table {

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,8 +1098,26 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
10981098
Ok(row)
10991099
}
11001100

1101-
fn visit_truncate(&mut self, _table_id: TableId) -> std::result::Result<(), Self::Error> {
1102-
Err(anyhow!("visit: truncate not yet supported").into())
1101+
fn visit_truncate(&mut self, table_id: TableId) -> std::result::Result<(), Self::Error> {
1102+
let schema = self.committed_state.schema_for_table(table_id)?;
1103+
// TODO: avoid clone
1104+
let table_name = schema.table_name.clone();
1105+
1106+
self.committed_state.replay_truncate(table_id).with_context(|| {
1107+
format!(
1108+
"Error truncating table {:?} during transaction {:?} playback",
1109+
table_id, self.committed_state.next_tx_offset
1110+
)
1111+
})?;
1112+
1113+
// NOTE: the `rdb_num_table_rows` metric is used by the query optimizer,
1114+
// and therefore has performance implications and must not be disabled.
1115+
DB_METRICS
1116+
.rdb_num_table_rows
1117+
.with_label_values(self.database_identity, &table_id.into(), &table_name)
1118+
.set(0);
1119+
1120+
Ok(())
11031121
}
11041122

11051123
fn visit_tx_start(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
@@ -1153,7 +1171,7 @@ mod tests {
11531171
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
11541172
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
11551173
};
1156-
use crate::traits::{IsolationLevel, MutTx};
1174+
use crate::traits::{IsolationLevel, MutTx, TxTableTruncated};
11571175
use crate::Result;
11581176
use bsatn::to_vec;
11591177
use core::{fmt, mem};
@@ -2775,7 +2793,7 @@ mod tests {
27752793
let tx_data_2 = commit(&datastore, tx)?;
27762794
// Ensure that none of the commits deleted rows in our table.
27772795
for tx_data in [&tx_data_1, &tx_data_2] {
2778-
assert_eq!(tx_data.deletes().find(|(tid, _)| **tid == table_id), None);
2796+
assert_eq!(tx_data.deletes().find(|(tid, ..)| **tid == table_id), None);
27792797
}
27802798
// Ensure that the first commit added the row but that the second didn't.
27812799
for (tx_data, expected_rows) in [(&tx_data_1, vec![row.clone()]), (&tx_data_2, vec![])] {
@@ -3099,11 +3117,12 @@ mod tests {
30993117
// Now drop the table again and commit.
31003118
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
31013119
let tx_data = commit(&datastore, tx)?;
3102-
let (_, deleted) = tx_data
3120+
let (_, truncated, deleted_rows) = tx_data
31033121
.deletes()
3104-
.find(|(id, _)| **id == table_id)
3122+
.find(|(id, ..)| **id == table_id)
31053123
.expect("should have deleted rows for `table_id`");
3106-
assert_eq!(&**deleted, [row]);
3124+
assert_eq!(&**deleted_rows, [row]);
3125+
assert_eq!(truncated, TxTableTruncated::Yes);
31073126

31083127
// In the next transaction, the table doesn't exist.
31093128
assert!(
@@ -3307,8 +3326,9 @@ mod tests {
33073326
let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap();
33083327
let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
33093328
assert_eq!(&**inserts, [to_product(&columns[1])].as_slice());
3310-
let (_, deletes) = tx_data.deletes().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
3329+
let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
33113330
assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice());
3331+
assert_eq!(truncated, TxTableTruncated::No);
33123332

33133333
// Check that we can successfully scan using the new schema type post commit.
33143334
let tx = begin_tx(&datastore);

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -414,13 +414,8 @@ impl MutTxId {
414414
TxTableForInsertion<'_>,
415415
(&mut Table, &mut dyn BlobStore, &mut IndexIdMap),
416416
)> {
417-
let (commit_table, commit_bs, idx_map) = self.committed_state_write_lock.get_table_and_blob_store_mut(table_id);
418-
// NOTE(centril): `TableError` is a fairly large type.
419-
// Not making this lazy made `TableError::drop` show up in perf.
420-
// TODO(centril): Box all the errors.
421-
#[allow(clippy::unnecessary_lazy_evaluations)]
422-
let commit_table = commit_table.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
423-
417+
let (commit_table, commit_bs, idx_map, _) =
418+
self.committed_state_write_lock.get_table_and_blob_store_mut(table_id)?;
424419
// Get the insert table, so we can write the row into it.
425420
let tx = self
426421
.tx_state

crates/datastore/src/traits.rs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ pub struct TxData {
174174
/// The inserted rows per table.
175175
inserts: BTreeMap<TableId, Arc<[ProductValue]>>,
176176
/// The deleted rows per table.
177-
deletes: BTreeMap<TableId, Arc<[ProductValue]>>,
177+
///
178+
/// Also stores per table whether it has been truncated.
179+
deletes: BTreeMap<TableId, TxDeleteEntry>,
178180
/// Map of all `TableId`s in both `inserts` and `deletes` to their
179181
/// corresponding table name.
180182
tables: IntMap<TableId, String>,
@@ -186,6 +188,24 @@ pub struct TxData {
186188
// TODO: Store an `Arc<String>` or equivalent instead.
187189
}
188190

191+
/// A record of a list of deletes for and potential truncation of a table,
192+
/// within a transaction.
193+
pub struct TxDeleteEntry {
194+
/// Were all rows previously in the table deleted within this transaction?
195+
truncated: TxTableTruncated,
196+
/// The deleted rows of the table.
197+
rows: Arc<[ProductValue]>,
198+
}
199+
200+
/// Whether a table was truncated in a transaction.
201+
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
202+
pub enum TxTableTruncated {
203+
/// It was truncated.
204+
Yes,
205+
/// It was not truncated.
206+
No,
207+
}
208+
189209
impl TxData {
190210
/// Set `tx_offset` as the expected on-disk transaction offset of this transaction.
191211
pub fn set_tx_offset(&mut self, tx_offset: u64) {
@@ -208,8 +228,22 @@ impl TxData {
208228
}
209229

210230
/// Set `rows` as the deleted rows for `(table_id, table_name)`.
211-
pub fn set_deletes_for_table(&mut self, table_id: TableId, table_name: &str, rows: Arc<[ProductValue]>) {
212-
self.deletes.insert(table_id, rows);
231+
///
232+
/// When `truncated` is set, the table has been emptied in this transaction.
233+
pub fn set_deletes_for_table(
234+
&mut self,
235+
table_id: TableId,
236+
table_name: &str,
237+
rows: Arc<[ProductValue]>,
238+
truncated: bool,
239+
) {
240+
let truncated = if truncated {
241+
TxTableTruncated::Yes
242+
} else {
243+
TxTableTruncated::No
244+
};
245+
let entry = TxDeleteEntry { truncated, rows };
246+
self.deletes.insert(table_id, entry);
213247
self.tables.entry(table_id).or_insert_with(|| table_name.to_owned());
214248
}
215249

@@ -238,26 +272,28 @@ impl TxData {
238272
}
239273

240274
/// Obtain an iterator over the deleted rows per table.
241-
pub fn deletes(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
242-
self.deletes.iter()
275+
pub fn deletes(&self) -> impl Iterator<Item = (&TableId, TxTableTruncated, &Arc<[ProductValue]>)> + '_ {
276+
self.deletes
277+
.iter()
278+
.map(|(table_id, entry)| (table_id, entry.truncated, &entry.rows))
243279
}
244280

245281
/// Get the `i`th deleted row for `table_id` if it exists
246282
pub fn get_ith_delete(&self, table_id: TableId, i: usize) -> Option<&ProductValue> {
247-
self.deletes.get(&table_id).and_then(|rows| rows.get(i))
283+
self.deletes.get(&table_id).and_then(|entry| entry.rows.get(i))
248284
}
249285

250286
/// Obtain an iterator over the inserted rows per table.
251287
///
252288
/// If you don't need access to the table name, [`Self::deletes`] is
253289
/// slightly more efficient.
254290
pub fn deletes_with_table_name(&self) -> impl Iterator<Item = (&TableId, &str, &Arc<[ProductValue]>)> + '_ {
255-
self.deletes.iter().map(|(table_id, rows)| {
291+
self.deletes.iter().map(|(table_id, entry)| {
256292
let table_name = self
257293
.tables
258294
.get(table_id)
259295
.expect("invalid `TxData`: partial table name mapping");
260-
(table_id, table_name.as_str(), rows)
296+
(table_id, table_name.as_str(), &entry.rows)
261297
})
262298
}
263299

@@ -266,7 +302,7 @@ impl TxData {
266302
/// This is used to determine if a transaction should be written to disk.
267303
pub fn has_rows_or_connect_disconnect(&self, reducer_context: Option<&ReducerContext>) -> bool {
268304
self.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty())
269-
|| self.deletes().any(|(_, deleted_rows)| !deleted_rows.is_empty())
305+
|| self.deletes().any(|(.., deleted_rows)| !deleted_rows.is_empty())
270306
|| matches!(
271307
reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")),
272308
Some(Some("connected__" | "disconnected__"))

0 commit comments

Comments
 (0)