From 2706c4c4d9503888b58c621c85527473a32c5309 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 1 Sep 2025 16:50:40 +0200 Subject: [PATCH 1/8] datastore: add clear_table op --- crates/core/src/db/relational_db.rs | 10 ++---- .../src/locking_tx_datastore/mut_tx.rs | 20 ++++++++++++ crates/table/src/table.rs | 31 ++++++++++++------- 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 6fec0c1b62b..55d60bd8858 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1397,13 +1397,9 @@ impl RelationalDB { } /// Clear all rows from a table without dropping it. - pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> { - let relation = self - .iter_mut(tx, table_id)? - .map(|row_ref| row_ref.pointer()) - .collect::>(); - self.delete(tx, table_id, relation); - Ok(()) + pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result { + let rows_deleted = tx.clear_table(table_id)?; + Ok(rows_deleted) } pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result { diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index c3346cd231a..aaafcec63b1 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1865,6 +1865,26 @@ impl MutTxId { row_pointer, ) } + + // Clears the table for `table_id`, removing all rows. + pub fn clear_table(&mut self, table_id: TableId) -> Result { + // Get the commit table. + let (commit_table, commit_bs, ..) = self.committed_state_write_lock.get_table_and_blob_store(table_id)?; + + // Get the insert table and delete all rows from it. + let (tx_table, tx_blob_store, delete_table) = self + .tx_state + .get_table_and_blob_store_or_create_from(table_id, commit_table); + let mut rows_removed = tx_table.clear(tx_blob_store); + + // Mark every table in the committed state as deleted. + for row in commit_table.scan_rows(commit_bs) { + delete_table.insert(row.pointer()); + rows_removed += 1; + } + + Ok(rows_removed) + } } pub(super) fn delete( diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 11493edddca..8f130137cce 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1138,10 +1138,10 @@ impl Table { /// Deletes the row identified by `ptr` from the table. /// - /// Returns the number of blob bytes deleted. This method does not update statistics by itself. + /// This method does update statistics. /// /// SAFETY: `self.is_row_present(row)` must hold. - unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes { + unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) { // Delete row from indices. // Do this before the actual deletion, as `index.delete` needs a `RowRef` // so it can extract the appropriate value. @@ -1149,7 +1149,9 @@ impl Table { unsafe { self.delete_from_indices(blob_store, ptr) }; // SAFETY: Caller promised that `self.is_row_present(row)` holds. - unsafe { self.delete_internal(blob_store, ptr) } + let blob_bytes_deleted = unsafe { self.delete_internal(blob_store, ptr) }; + + self.update_statistics_deleted_row(blob_bytes_deleted); } /// Delete `row_ref` from all the indices of this table until `index_id` is reached. @@ -1201,8 +1203,7 @@ impl Table { let ret = before(row_ref); // SAFETY: We've checked above that `self.is_row_present(ptr)`. - let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) }; - self.update_statistics_deleted_row(blob_bytes_deleted); + unsafe { self.delete_unchecked(blob_store, ptr) }; Some(ret) } @@ -1242,11 +1243,8 @@ impl Table { // If an equal row was present, delete it. if let Some(existing_row_ptr) = existing_row_ptr { - let blob_bytes_deleted = unsafe { - // SAFETY: `find_same_row` ensures that the pointer is valid. - self.delete_unchecked(blob_store, existing_row_ptr) - }; - self.update_statistics_deleted_row(blob_bytes_deleted); + // SAFETY: `find_same_row` ensures that the pointer is valid. + unsafe { self.delete_unchecked(blob_store, existing_row_ptr) }; } // Remove the temporary row we inserted in the beginning. @@ -1259,6 +1257,17 @@ impl Table { Ok(existing_row_ptr) } + /// Clears this table, removing all present rows from it. + pub fn clear(&mut self, blob_store: &mut dyn BlobStore) -> usize { + let ptrs = self.scan_rows(blob_store).map(|row| row.pointer()).collect::>(); + let len = ptrs.len(); + for ptr in ptrs { + // SAFETY: `ptr` came rom `self.scan_rows(...)`, so it's present. + unsafe { self.delete_unchecked(blob_store, ptr) }; + } + len + } + /// Returns the row type for rows in this table. pub fn get_row_type(&self) -> &ProductType { self.get_schema().get_row_type() @@ -1359,7 +1368,7 @@ impl Table { Some(index) } - /// Returns an iterator over all the rows of `self`, yielded as [`RefRef`]s. + /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s. pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> { TableScanIter { current_page: None, // Will be filled by the iterator. From 6ae80bb7ba2a6e72f4d239f850edd9479fecc986 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 1 Sep 2025 17:56:03 +0200 Subject: [PATCH 2/8] datastore: fix drop_table not deleting rows --- .../locking_tx_datastore/committed_state.rs | 80 ++++++++++++++----- .../src/locking_tx_datastore/mut_tx.rs | 6 +- crates/table/src/table.rs | 11 ++- 3 files changed, 73 insertions(+), 24 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index ad3c98ca2b1..11a32d417e5 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -44,6 +44,7 @@ use spacetimedb_table::{ }; use std::collections::BTreeMap; use std::sync::Arc; +use thin_vec::ThinVec; /// Contains the live, in-memory snapshot of a database. This structure /// is exposed in order to support tools wanting to process the commit @@ -594,7 +595,7 @@ impl CommittedState { let mut tx_data = TxData::default(); // First, apply deletes. This will free up space in the committed tables. - self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables); + self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes); // Then, apply inserts. This will re-fill the holes freed by deletions // before allocating new pages. @@ -610,33 +611,68 @@ impl CommittedState { tx_data } - fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap) { + fn merge_apply_deletes( + &mut self, + tx_data: &mut TxData, + delete_tables: BTreeMap, + pending_schema_changes: ThinVec, + ) { + fn delete_rows( + tx_data: &mut TxData, + table_id: TableId, + table: &mut Table, + blob_store: &mut dyn BlobStore, + row_ptrs_len: usize, + row_ptrs: impl Iterator, + ) { + let mut deletes = Vec::with_capacity(row_ptrs_len); + + // Note: we maintain the invariant that the delete_tables + // holds only committed rows which should be deleted, + // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`, + // so no need to check before applying the deletes. + for row_ptr in row_ptrs { + debug_assert!(row_ptr.squashed_offset().is_committed_state()); + + // TODO: re-write `TxData` to remove `ProductValue`s + let pv = table + .delete(blob_store, row_ptr, |row| row.to_product_value()) + .expect("Delete for non-existent row!"); + deletes.push(pv); + } + + if !deletes.is_empty() { + let table_name = &*table.get_schema().table_name; + // TODO(centril): Pass this along to record truncated tables. + let _truncated = table.row_count == 0; + tx_data.set_deletes_for_table(table_id, table_name, deletes.into()); + } + } + for (table_id, row_ptrs) in delete_tables { if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) { - let mut deletes = Vec::with_capacity(row_ptrs.len()); - - // Note: we maintain the invariant that the delete_tables - // holds only committed rows which should be deleted, - // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`, - // so no need to check before applying the deletes. - for row_ptr in row_ptrs.iter() { - debug_assert!(row_ptr.squashed_offset().is_committed_state()); - - // TODO: re-write `TxData` to remove `ProductValue`s - let pv = table - .delete(blob_store, row_ptr, |row| row.to_product_value()) - .expect("Delete for non-existent row!"); - deletes.push(pv); - } - - if !deletes.is_empty() { - let table_name = &*table.get_schema().table_name; - tx_data.set_deletes_for_table(table_id, table_name, deletes.into()); - } + delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter()); } else if !row_ptrs.is_empty() { panic!("Deletion for non-existent table {table_id:?}... huh?"); } } + + // Delete all tables marked for deletion. + // The order here does not matter as once a `table_id` has been dropped + // it will never be re-created. + for change in pending_schema_changes { + if let PendingSchemaChange::TableRemoved(table_id, mut table) = change { + let row_ptrs = table.scan_all_row_ptrs(); + delete_rows( + tx_data, + table_id, + &mut table, + &mut self.blob_store, + row_ptrs.len(), + row_ptrs.into_iter(), + ); + } + } } fn merge_apply_inserts( diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index aaafcec63b1..5a754f13021 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -326,6 +326,8 @@ impl MutTxId { } pub fn drop_table(&mut self, table_id: TableId) -> Result<()> { + self.clear_table(table_id)?; + let schema = &*self.schema_for_table(table_id)?; for row in &schema.indexes { @@ -352,8 +354,10 @@ impl MutTxId { )?; } - // Delete the table and its rows and indexes from memory. + // Delete the table from memory, both in the tx an committed states. self.tx_state.insert_tables.remove(&table_id); + // No need to keep the delete tables. + // By seeing `PendingSchemaChange::TableRemoved`, `merge` knows that all rows were deleted. self.tx_state.delete_tables.remove(&table_id); let commit_table = self .committed_state_write_lock diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 8f130137cce..8b907851d29 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1,3 +1,5 @@ +use crate::blob_store::NullBlobStore; + use super::{ bflatn_from::serialize_row_from_page, bflatn_to::{write_row_to_pages, write_row_to_pages_bsatn, Error}, @@ -1259,7 +1261,7 @@ impl Table { /// Clears this table, removing all present rows from it. pub fn clear(&mut self, blob_store: &mut dyn BlobStore) -> usize { - let ptrs = self.scan_rows(blob_store).map(|row| row.pointer()).collect::>(); + let ptrs = self.scan_all_row_ptrs(); let len = ptrs.len(); for ptr in ptrs { // SAFETY: `ptr` came rom `self.scan_rows(...)`, so it's present. @@ -1378,6 +1380,13 @@ impl Table { } } + /// Returns a list of all present row pointers. + pub fn scan_all_row_ptrs(&self) -> Vec { + let mut ptrs = Vec::with_capacity(self.row_count as usize); + ptrs.extend(self.scan_rows(&NullBlobStore).map(|row| row.pointer())); + ptrs + } + /// Returns this table combined with the index for [`IndexId`], if any. pub fn get_index_by_id_with_table<'a>( &'a self, From 4b6e2aa48439d7e2a15742bcfde281d271ea9fd9 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Tue, 2 Sep 2025 06:24:11 +0530 Subject: [PATCH 3/8] datastore: test & handle `expect` during tx_metrics reporting --- .../src/locking_tx_datastore/datastore.rs | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 1577a379c43..c550b6d29ee 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -715,17 +715,16 @@ impl TxMetrics { let mut table_stats = as HashCollectionExt>::with_capacity(tx_data.num_tables_affected()); for (table_id, _) in tx_data.table_ids_and_names() { - let table = committed_state - .get_table(table_id) - .expect("should have a table in committed state for one in tx data"); - table_stats.insert( - table_id, - TableStats { - row_count: table.row_count, - bytes_occupied_overestimate: table.bytes_occupied_overestimate(), - num_indices: table.num_indices(), - }, - ); + committed_state.get_table(table_id).and_then(|table| { + table_stats.insert( + table_id, + TableStats { + row_count: table.row_count, + bytes_occupied_overestimate: table.bytes_occupied_overestimate(), + num_indices: table.num_indices(), + }, + ) + }); } table_stats }) @@ -3096,6 +3095,17 @@ mod tests { "Table should still exist", ); assert_eq!(all_rows(&datastore, &tx, table_id), [row]); + let _ = datastore.rollback_mut_tx(tx); + + let mut tx = begin_mut_tx(&datastore); + assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok()); + commit(&datastore, tx)?; + + let tx = begin_mut_tx(&datastore); + assert!( + !datastore.table_id_exists_mut_tx(&tx, &table_id), + "Table should be removed", + ); Ok(()) } From fb9bfec5d38fe5f79e77e1b3bf3d2f55fc0d1dcb Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 2 Sep 2025 08:37:00 +0200 Subject: [PATCH 4/8] fix typo Co-authored-by: Shubham Mishra Signed-off-by: Mazdak Farrokhzad --- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 5a754f13021..fbee223c89a 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1881,7 +1881,7 @@ impl MutTxId { .get_table_and_blob_store_or_create_from(table_id, commit_table); let mut rows_removed = tx_table.clear(tx_blob_store); - // Mark every table in the committed state as deleted. + // Mark every row in the committed state as deleted. for row in commit_table.scan_rows(commit_bs) { delete_table.insert(row.pointer()); rows_removed += 1; From b48e42eb666374f79643a44c8aa2756a98cf22a1 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 2 Sep 2025 08:47:00 +0200 Subject: [PATCH 5/8] enhance test_drop_table_is_transactional further --- .../src/locking_tx_datastore/datastore.rs | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index c550b6d29ee..499cf08a35e 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -3065,7 +3065,7 @@ mod tests { let (datastore, mut tx, table_id) = setup_table()?; // Insert a row and commit. - let row = random_row(); + let row: ProductValue = random_row(); insert(&datastore, &mut tx, table_id, &row)?; commit(&datastore, tx)?; @@ -3088,22 +3088,30 @@ mod tests { let _ = datastore.rollback_mut_tx(tx); // Ensure the table still exists in the next transaction. - let tx = begin_mut_tx(&datastore); + let mut tx = begin_mut_tx(&datastore); assert_eq!(tx.pending_schema_changes(), []); assert!( datastore.table_id_exists_mut_tx(&tx, &table_id), "Table should still exist", ); - assert_eq!(all_rows(&datastore, &tx, table_id), [row]); - let _ = datastore.rollback_mut_tx(tx); + assert_eq!(all_rows(&datastore, &tx, table_id), [row.clone()]); - let mut tx = begin_mut_tx(&datastore); + // Now drop the table again and commit. assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok()); - commit(&datastore, tx)?; + let tx_data = commit(&datastore, tx)?; + let (_, deleted) = tx_data + .deletes() + .find(|(id, _)| **id == table_id) + .expect("should have deleted rows for `table_id`"); + assert_eq!(&**deleted, [row]); - let tx = begin_mut_tx(&datastore); + // In the next transaction, the table doesn't exist. assert!( - !datastore.table_id_exists_mut_tx(&tx, &table_id), + !datastore.table_id_exists_mut_tx(&begin_mut_tx(&datastore), &table_id), + "Table should be removed", + ); + assert!( + !datastore.table_id_exists_tx(&begin_tx(&datastore), &table_id), "Table should be removed", ); From 22b068bbc186daecb71acd4872f1855f20204b90 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Fri, 5 Sep 2025 14:48:00 +0200 Subject: [PATCH 6/8] replay_delete_by_rel: delete in-mmory table during replay --- .../locking_tx_datastore/committed_state.rs | 49 +++++++++++++++---- crates/memory-usage/src/lib.rs | 7 +++ 2 files changed, 46 insertions(+), 10 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 11a32d417e5..62e541bf826 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -24,7 +24,7 @@ use crate::{ use anyhow::anyhow; use core::{convert::Infallible, ops::RangeBounds}; use itertools::Itertools; -use spacetimedb_data_structures::map::{HashSet, IntMap}; +use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet}; use spacetimedb_lib::{ db::auth::{StAccess, StTableType}, Identity, @@ -66,6 +66,11 @@ pub struct CommittedState { /// Pages are shared between all modules running on a particular host, /// not allocated per-module. pub(super) page_pool: PagePool, + /// Whether the table was dropped during replay. + /// TODO(centril): Only used during bootstrap and is otherwise unused. + /// We should split `CommittedState` into two types + /// where one, e.g., `ReplayCommittedState`, has this field. + table_dropped: IntSet, } impl MemoryUsage for CommittedState { @@ -76,9 +81,14 @@ impl MemoryUsage for CommittedState { blob_store, index_id_map, page_pool: _, + table_dropped, } = self; // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource. - next_tx_offset.heap_usage() + tables.heap_usage() + blob_store.heap_usage() + index_id_map.heap_usage() + next_tx_offset.heap_usage() + + tables.heap_usage() + + blob_store.heap_usage() + + index_id_map.heap_usage() + + table_dropped.heap_usage() } } @@ -157,6 +167,7 @@ impl CommittedState { tables: <_>::default(), blob_store: <_>::default(), index_id_map: <_>::default(), + table_dropped: <_>::default(), page_pool, } } @@ -334,17 +345,30 @@ impl CommittedState { Ok(()) } - pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> { - let table = self - .tables - .get_mut(&table_id) - .ok_or(TableError::IdNotFoundState(table_id))?; + pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> { + // Get the table for mutation. + // If it was dropped, avoid an error and just ignore the row instead. + let table = match self.tables.get_mut(&table_id) { + Some(t) => t, + None if self.table_dropped.contains(&table_id) => return Ok(()), + None => return Err(TableError::IdNotFoundState(table_id).into()), + }; + + // Delete the row. let blob_store = &mut self.blob_store; table - .delete_equal_row(&self.page_pool, blob_store, rel) + .delete_equal_row(&self.page_pool, blob_store, row) .map_err(TableError::Bflatn)? .ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?; + if table_id == ST_TABLE_ID { + // A row was removed from `st_table`, so a table was dropped. + // Remove that table from the in-memory structures. + self.tables + .remove(&Self::read_table_id(row)) + .expect("table to remove should exist"); + } + Ok(()) } @@ -379,8 +403,7 @@ impl CommittedState { /// /// The `row_ptr` is a pointer to `row`. fn st_column_changed(&mut self, row: &ProductValue, row_ptr: RowPointer) -> Result<()> { - let target_table_id = TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0])) - .expect("first field in `st_column` should decode to a `TableId`"); + let target_table_id = Self::read_table_id(row); let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&row.elements[1])) .expect("second field in `st_column` should decode to a `ColId`"); @@ -411,6 +434,12 @@ impl CommittedState { Ok(()) } + /// Assuming that a `TableId` is stored as the first field in `row`, read it. + fn read_table_id(row: &ProductValue) -> TableId { + TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0])) + .expect("first field in `st_column` should decode to a `TableId`") + } + pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> { let st_sequences = self.tables.get(&ST_SEQUENCE_ID).unwrap(); for row_ref in st_sequences.scan_rows(&self.blob_store) { diff --git a/crates/memory-usage/src/lib.rs b/crates/memory-usage/src/lib.rs index 40bbb8fc7ef..8460281cfe5 100644 --- a/crates/memory-usage/src/lib.rs +++ b/crates/memory-usage/src/lib.rs @@ -112,6 +112,13 @@ impl MemoryUsage for hashbrown::HashSet { + fn heap_usage(&self) -> usize { + self.allocation_size() + self.iter().map(|k| k.heap_usage()).sum::() + } +} + impl MemoryUsage for std::collections::BTreeMap { fn heap_usage(&self) -> usize { // NB: this is best-effort, since we don't have a `capacity()` method on `BTreeMap`. From 813f34c228dc16ff7623457c8014dc36621db05d Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Fri, 5 Sep 2025 15:09:49 +0200 Subject: [PATCH 7/8] TxMetrics::report: remove_label_values for dropped tables --- .../src/locking_tx_datastore/datastore.rs | 141 +++++++++++------- 1 file changed, 91 insertions(+), 50 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 499cf08a35e..c5dc3fe5abe 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -666,7 +666,9 @@ impl MutTxDatastore for Locking { /// Various measurements, needed for metrics, of the work performed by a transaction. #[must_use = "TxMetrics should be reported"] pub struct TxMetrics { - table_stats: HashMap, + /// The transaction metrics for a particular table. + /// The value `None` for a [`TableId`] means that it was deleted. + table_stats: HashMap>, workload: WorkloadType, database_identity: Identity, elapsed_time: Duration, @@ -715,16 +717,12 @@ impl TxMetrics { let mut table_stats = as HashCollectionExt>::with_capacity(tx_data.num_tables_affected()); for (table_id, _) in tx_data.table_ids_and_names() { - committed_state.get_table(table_id).and_then(|table| { - table_stats.insert( - table_id, - TableStats { - row_count: table.row_count, - bytes_occupied_overestimate: table.bytes_occupied_overestimate(), - num_indices: table.num_indices(), - }, - ) + let stats = committed_state.get_table(table_id).map(|table| TableStats { + row_count: table.row_count, + bytes_occupied_overestimate: table.bytes_occupied_overestimate(), + num_indices: table.num_indices(), }); + table_stats.insert(table_id, stats); } table_stats }) @@ -774,63 +772,106 @@ impl TxMetrics { get_exec_counter(self.workload).record(&self.exec_metrics); + // TODO(centril): simplify this by exposing `tx_data.for_table(table_id)`. if let Some(tx_data) = tx_data { // Update table rows and table size gauges, // and sets them to zero if no table is present. for (table_id, table_name) in tx_data.table_ids_and_names() { - let stats = self.table_stats.get(&table_id).unwrap(); - - DB_METRICS - .rdb_num_table_rows - .with_label_values(db, &table_id.0, table_name) - .set(stats.row_count as i64); - DB_METRICS - .rdb_table_size - .with_label_values(db, &table_id.0, table_name) - .set(stats.bytes_occupied_overestimate as i64); + if let Some(stats) = self.table_stats.get(&table_id).unwrap() { + DB_METRICS + .rdb_num_table_rows + .with_label_values(db, &table_id.0, table_name) + .set(stats.row_count as i64); + DB_METRICS + .rdb_table_size + .with_label_values(db, &table_id.0, table_name) + .set(stats.bytes_occupied_overestimate as i64); + } else { + // Table was dropped, remove the metrics. + let _ = DB_METRICS + .rdb_num_table_rows + .remove_label_values(db, &table_id.0, table_name); + let _ = DB_METRICS + .rdb_table_size + .remove_label_values(db, &table_id.0, table_name); + } } // Record inserts. for (table_id, table_name, inserts) in tx_data.inserts_with_table_name() { - let stats = self.table_stats.get(table_id).unwrap(); - let num_inserts = inserts.len() as u64; - let num_indices = stats.num_indices as u64; - - // Increment rows inserted counter. - DB_METRICS - .rdb_num_rows_inserted - .with_label_values(workload, db, reducer, &table_id.0, table_name) - .inc_by(num_inserts); - - // We don't have sparse indexes, so we can just multiply by the number of indexes. - if stats.num_indices > 0 { - // Increment index rows inserted counter + if let Some(stats) = self.table_stats.get(table_id).unwrap() { + let num_inserts = inserts.len() as u64; + let num_indices = stats.num_indices as u64; + + // Increment rows inserted counter. DB_METRICS - .rdb_num_index_entries_inserted + .rdb_num_rows_inserted .with_label_values(workload, db, reducer, &table_id.0, table_name) - .inc_by(num_inserts * num_indices); + .inc_by(num_inserts); + + // We don't have sparse indexes, so we can just multiply by the number of indexes. + if stats.num_indices > 0 { + // Increment index rows inserted counter + DB_METRICS + .rdb_num_index_entries_inserted + .with_label_values(workload, db, reducer, &table_id.0, table_name) + .inc_by(num_inserts * num_indices); + } + } else { + // Table was dropped, remove the metrics. + let _ = DB_METRICS.rdb_num_rows_inserted.remove_label_values( + workload, + db, + reducer, + &table_id.0, + table_name, + ); + let _ = DB_METRICS.rdb_num_index_entries_inserted.remove_label_values( + workload, + db, + reducer, + &table_id.0, + table_name, + ); } } // Record deletes. for (table_id, table_name, deletes) in tx_data.deletes_with_table_name() { - let stats = self.table_stats.get(table_id).unwrap(); - let num_deletes = deletes.len() as u64; - let num_indices = stats.num_indices as u64; - - // Increment rows deleted counter. - DB_METRICS - .rdb_num_rows_deleted - .with_label_values(workload, db, reducer, &table_id.0, table_name) - .inc_by(num_deletes); - - // We don't have sparse indexes, so we can just multiply by the number of indexes. - if num_indices > 0 { - // Increment index rows deleted counter. + if let Some(stats) = self.table_stats.get(table_id).unwrap() { + let num_deletes = deletes.len() as u64; + let num_indices = stats.num_indices as u64; + + // Increment rows deleted counter. DB_METRICS - .rdb_num_index_entries_deleted + .rdb_num_rows_deleted .with_label_values(workload, db, reducer, &table_id.0, table_name) - .inc_by(num_deletes * num_indices); + .inc_by(num_deletes); + + // We don't have sparse indexes, so we can just multiply by the number of indexes. + if num_indices > 0 { + // Increment index rows deleted counter. + DB_METRICS + .rdb_num_index_entries_deleted + .with_label_values(workload, db, reducer, &table_id.0, table_name) + .inc_by(num_deletes * num_indices); + } + } else { + // Table was dropped, remove the metrics. + let _ = DB_METRICS.rdb_num_rows_deleted.remove_label_values( + workload, + db, + reducer, + &table_id.0, + table_name, + ); + let _ = DB_METRICS.rdb_num_index_entries_deleted.remove_label_values( + workload, + db, + reducer, + &table_id.0, + table_name, + ); } } } From a50df64423a77bf6687b03b2c2fe10d9c2e989c7 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 8 Sep 2025 14:53:36 +0200 Subject: [PATCH 8/8] replay_delete_by_rel: actually populate 'table_dropped' --- .../src/locking_tx_datastore/committed_state.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 62e541bf826..4f2377f0561 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -347,9 +347,9 @@ impl CommittedState { pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> { // Get the table for mutation. - // If it was dropped, avoid an error and just ignore the row instead. let table = match self.tables.get_mut(&table_id) { Some(t) => t, + // (1) If it was dropped, avoid an error and just ignore the row instead. None if self.table_dropped.contains(&table_id) => return Ok(()), None => return Err(TableError::IdNotFoundState(table_id).into()), }; @@ -364,9 +364,14 @@ impl CommittedState { if table_id == ST_TABLE_ID { // A row was removed from `st_table`, so a table was dropped. // Remove that table from the in-memory structures. + let dropped_table_id = Self::read_table_id(row); self.tables - .remove(&Self::read_table_id(row)) + .remove(&dropped_table_id) .expect("table to remove should exist"); + // Mark the table as dropped so that when + // processing row deletions for that table later, + // they are simply ignored in (1). + self.table_dropped.insert(dropped_table_id); } Ok(())