Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use spacetimedb_datastore::system_tables::StModuleRow;
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
use spacetimedb_datastore::traits::{
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
UpdateFlags,
TxTableTruncated, UpdateFlags,
};
use spacetimedb_datastore::{
locking_tx_datastore::{
Expand Down Expand Up @@ -869,11 +869,17 @@ impl RelationalDB {
.collect();
let deletes: Box<_> = tx_data
.deletes()
.map(|(table_id, rowdata)| Ops {
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::No)
.map(|(table_id, _, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();
let truncates: Box<_> = tx_data
.deletes()
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes)
.map(|(table_id, ..)| *table_id)
.collect();

let inputs = reducer_context.map(|rcx| rcx.into());

Expand All @@ -883,7 +889,7 @@ impl RelationalDB {
mutations: Some(Mutations {
inserts,
deletes,
truncates: [].into(),
truncates,
}),
};

Expand Down
19 changes: 10 additions & 9 deletions crates/core/src/subscription/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ impl DeltaTableIndexes {
indexes
}

let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows));
Self {
inserts: build_indexes_for_rows(tx, meta, data.inserts()),
deletes: build_indexes_for_rows(tx, meta, data.deletes()),
deletes: build_indexes_for_rows(tx, meta, deletes),
}
}
}
Expand Down Expand Up @@ -133,8 +134,8 @@ impl DeltaStore for DeltaTx<'_> {
self.data
.and_then(|data| {
data.inserts()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.len())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.unwrap_or_default()
}
Expand All @@ -143,25 +144,25 @@ impl DeltaStore for DeltaTx<'_> {
self.data
.and_then(|data| {
data.deletes()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.len())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.len())
})
.unwrap_or_default()
}

fn inserts_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.inserts()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.iter())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
}

fn deletes_for_table(&self, table_id: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
self.data.and_then(|data| {
data.deletes()
.find(|(id, _)| **id == table_id)
.map(|(_, rows)| rows.iter())
.find(|(id, ..)| **id == table_id)
.map(|(.., rows)| rows.iter())
})
}

Expand Down
73 changes: 49 additions & 24 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,19 +345,35 @@ impl CommittedState {
Ok(())
}

pub(super) fn replay_truncate(&mut self, table_id: TableId) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

visit_delete happens before visit_truncate, which removes rows from st_* tables, hence schema_for_table fails inside visit_truncate. I think we have to avoid calling that method.

// (1) Table dropped? Avoid an error and just ignore the row instead.
if self.table_dropped.contains(&table_id) {
return Ok(());
}

// Get the table for mutation.
let (table, blob_store, ..) = self.get_table_and_blob_store_mut(table_id)?;

// We do not need to consider a truncation of `st_table` itself,
// as if that happens, the database is bricked.

table.clear(blob_store);

Ok(())
}

pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
// (1) Table dropped? Avoid an error and just ignore the row instead.
if self.table_dropped.contains(&table_id) {
return Ok(());
}

// Get the table for mutation.
let table = match self.tables.get_mut(&table_id) {
Some(t) => t,
// (1) If it was dropped, avoid an error and just ignore the row instead.
None if self.table_dropped.contains(&table_id) => return Ok(()),
None => return Err(TableError::IdNotFoundState(table_id).into()),
};
let (table, blob_store, _, page_pool) = self.get_table_and_blob_store_mut(table_id)?;

// Delete the row.
let blob_store = &mut self.blob_store;
table
.delete_equal_row(&self.page_pool, blob_store, row)
.delete_equal_row(page_pool, blob_store, row)
.map_err(TableError::Bflatn)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;

Expand Down Expand Up @@ -493,9 +509,9 @@ impl CommittedState {
for index_row in rows {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let (Some(table), blob_store, index_id_map) = self.get_table_and_blob_store_mut(table_id) else {
panic!("Cannot create index for table which doesn't exist in committed state");
};
let (table, blob_store, index_id_map, _) = self
.get_table_and_blob_store_mut(table_id)
.expect("index should exist in committed state; cannot create it");
let algo: IndexAlgorithm = index_row.index_algorithm.into();
let columns: ColSet = algo.columns().into();
let is_unique = unique_constraints.contains(&(table_id, columns));
Expand Down Expand Up @@ -596,8 +612,7 @@ impl CommittedState {
"Cannot get TX_STATE RowPointer from CommittedState.",
);
let table = self
.tables
.get(&table_id)
.get_table(table_id)
.expect("Attempt to get COMMITTED_STATE row from table not present in tables.");
// TODO(perf, deep-integration): Use `get_row_ref_unchecked`.
table.get_row_ref(&self.blob_store, row_ptr).unwrap()
Expand Down Expand Up @@ -677,17 +692,18 @@ impl CommittedState {

if !deletes.is_empty() {
let table_name = &*table.get_schema().table_name;
// TODO(centril): Pass this along to record truncated tables.
let _truncated = table.row_count == 0;
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
let truncated = table.row_count == 0;
Copy link
Contributor

@Shubham8287 Shubham8287 Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rather let caller of fn delete_rows to pass truncated as param and maybe just assert on row counts here?(pass it true if table is from PendingSchemaChanges::Removed).

Reason is: this code considers empty table also as truncated table for generating TxData.
Which may not be the problem during replay due to get_table_and_blob_store_or_create will create it again but still not seems great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason is: this code considers empty table also as truncated table for generating TxData. Which may not be the problem during replay due to get_table_and_blob_store_or_create will create it again but still not seems great.

An emptied table is I think the meaning of truncation, see e.g., https://learn.microsoft.com/en-us/sql/t-sql/statements/truncate-table-transact-sql?view=sql-server-ver17

So you can think of truncate as clear_table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the link,

TRUNCATE TABLE removes all rows from a table, but the table structure and its columns, constraints, indexes, and so on, remain. To remove the table definition in addition to its data, use the DROP TABLE statement.

Makes sense.

Copy link
Contributor

@Shubham8287 Shubham8287 Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the problem is, table is temporarily empty and rows might be inserted back during merge_apply_inserts. So, we can not mark it truncated before checking that. I will push the fix for it.

tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated);
}
}

for (table_id, row_ptrs) in delete_tables {
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
} else if !row_ptrs.is_empty() {
panic!("Deletion for non-existent table {table_id:?}... huh?");
match self.get_table_and_blob_store_mut(table_id) {
Ok((table, blob_store, ..)) => {
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter())
}
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
Err(_) => {}
}
}

Expand Down Expand Up @@ -866,12 +882,21 @@ impl CommittedState {
pub(super) fn get_table_and_blob_store_mut(
&mut self,
table_id: TableId,
) -> (Option<&mut Table>, &mut dyn BlobStore, &mut IndexIdMap) {
(
self.tables.get_mut(&table_id),
) -> Result<(&mut Table, &mut dyn BlobStore, &mut IndexIdMap, &PagePool)> {
// NOTE(centril): `TableError` is a fairly large type.
// Not making this lazy made `TableError::drop` show up in perf.
// TODO(centril): Box all the errors.
#[allow(clippy::unnecessary_lazy_evaluations)]
let table = self
.tables
.get_mut(&table_id)
.ok_or_else(|| TableError::IdNotFoundState(table_id))?;
Ok((
table,
&mut self.blob_store as &mut dyn BlobStore,
&mut self.index_id_map,
)
&self.page_pool,
))
}

fn make_table(schema: Arc<TableSchema>) -> Table {
Expand Down
36 changes: 28 additions & 8 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,26 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
Ok(row)
}

fn visit_truncate(&mut self, _table_id: TableId) -> std::result::Result<(), Self::Error> {
Err(anyhow!("visit: truncate not yet supported").into())
fn visit_truncate(&mut self, table_id: TableId) -> std::result::Result<(), Self::Error> {
let schema = self.committed_state.schema_for_table(table_id)?;
// TODO: avoid clone
let table_name = schema.table_name.clone();

self.committed_state.replay_truncate(table_id).with_context(|| {
format!(
"Error truncating table {:?} during transaction {:?} playback",
table_id, self.committed_state.next_tx_offset
)
})?;

// NOTE: the `rdb_num_table_rows` metric is used by the query optimizer,
// and therefore has performance implications and must not be disabled.
DB_METRICS
.rdb_num_table_rows
.with_label_values(self.database_identity, &table_id.into(), &table_name)
.set(0);

Ok(())
}

fn visit_tx_start(&mut self, offset: u64) -> std::result::Result<(), Self::Error> {
Expand Down Expand Up @@ -1194,7 +1212,7 @@ mod tests {
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
};
use crate::traits::{IsolationLevel, MutTx};
use crate::traits::{IsolationLevel, MutTx, TxTableTruncated};
use crate::Result;
use bsatn::to_vec;
use core::{fmt, mem};
Expand Down Expand Up @@ -2816,7 +2834,7 @@ mod tests {
let tx_data_2 = commit(&datastore, tx)?;
// Ensure that none of the commits deleted rows in our table.
for tx_data in [&tx_data_1, &tx_data_2] {
assert_eq!(tx_data.deletes().find(|(tid, _)| **tid == table_id), None);
assert_eq!(tx_data.deletes().find(|(tid, ..)| **tid == table_id), None);
}
// Ensure that the first commit added the row but that the second didn't.
for (tx_data, expected_rows) in [(&tx_data_1, vec![row.clone()]), (&tx_data_2, vec![])] {
Expand Down Expand Up @@ -3140,11 +3158,12 @@ mod tests {
// Now drop the table again and commit.
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
let tx_data = commit(&datastore, tx)?;
let (_, deleted) = tx_data
let (_, truncated, deleted_rows) = tx_data
.deletes()
.find(|(id, _)| **id == table_id)
.find(|(id, ..)| **id == table_id)
.expect("should have deleted rows for `table_id`");
assert_eq!(&**deleted, [row]);
assert_eq!(&**deleted_rows, [row]);
assert_eq!(truncated, TxTableTruncated::Yes);

// In the next transaction, the table doesn't exist.
assert!(
Expand Down Expand Up @@ -3348,8 +3367,9 @@ mod tests {
let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap();
let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
assert_eq!(&**inserts, [to_product(&columns[1])].as_slice());
let (_, deletes) = tx_data.deletes().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice());
assert_eq!(truncated, TxTableTruncated::No);

// Check that we can successfully scan using the new schema type post commit.
let tx = begin_tx(&datastore);
Expand Down
9 changes: 2 additions & 7 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,8 @@ impl MutTxId {
TxTableForInsertion<'_>,
(&mut Table, &mut dyn BlobStore, &mut IndexIdMap),
)> {
let (commit_table, commit_bs, idx_map) = self.committed_state_write_lock.get_table_and_blob_store_mut(table_id);
// NOTE(centril): `TableError` is a fairly large type.
// Not making this lazy made `TableError::drop` show up in perf.
// TODO(centril): Box all the errors.
#[allow(clippy::unnecessary_lazy_evaluations)]
let commit_table = commit_table.ok_or_else(|| TableError::IdNotFoundState(table_id))?;

let (commit_table, commit_bs, idx_map, _) =
self.committed_state_write_lock.get_table_and_blob_store_mut(table_id)?;
// Get the insert table, so we can write the row into it.
let tx = self
.tx_state
Expand Down
Loading
Loading