diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 6fec0c1b62b..55d60bd8858 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -1397,13 +1397,9 @@ impl RelationalDB {
     }
 
     /// Clear all rows from a table without dropping it.
-    pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> {
-        let relation = self
-            .iter_mut(tx, table_id)?
-            .map(|row_ref| row_ref.pointer())
-            .collect::<Vec<_>>();
-        self.delete(tx, table_id, relation);
-        Ok(())
+    pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<usize, DBError> {
+        let rows_deleted = tx.clear_table(table_id)?;
+        Ok(rows_deleted)
     }
 
     pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs
index ad3c98ca2b1..4f2377f0561 100644
--- a/crates/datastore/src/locking_tx_datastore/committed_state.rs
+++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs
@@ -24,7 +24,7 @@ use crate::{
 use anyhow::anyhow;
 use core::{convert::Infallible, ops::RangeBounds};
 use itertools::Itertools;
-use spacetimedb_data_structures::map::{HashSet, IntMap};
+use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
 use spacetimedb_lib::{
     db::auth::{StAccess, StTableType},
     Identity,
@@ -44,6 +44,7 @@ use spacetimedb_table::{
 };
 use std::collections::BTreeMap;
 use std::sync::Arc;
+use thin_vec::ThinVec;
 
 /// Contains the live, in-memory snapshot of a database. This structure
 /// is exposed in order to support tools wanting to process the commit
@@ -65,6 +66,11 @@ pub struct CommittedState {
     /// Pages are shared between all modules running on a particular host,
     /// not allocated per-module.
     pub(super) page_pool: PagePool,
+    /// Whether the table was dropped during replay.
+    /// TODO(centril): Only used during bootstrap and is otherwise unused.
+    /// We should split `CommittedState` into two types
+    /// where one, e.g., `ReplayCommittedState`, has this field.
+    table_dropped: IntSet<TableId>,
 }
 
 impl MemoryUsage for CommittedState {
@@ -75,9 +81,14 @@ impl MemoryUsage for CommittedState {
             blob_store,
             index_id_map,
             page_pool: _,
+            table_dropped,
         } = self;
         // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
-        next_tx_offset.heap_usage() + tables.heap_usage() + blob_store.heap_usage() + index_id_map.heap_usage()
+        next_tx_offset.heap_usage()
+            + tables.heap_usage()
+            + blob_store.heap_usage()
+            + index_id_map.heap_usage()
+            + table_dropped.heap_usage()
     }
 }
 
@@ -156,6 +167,7 @@ impl CommittedState {
             tables: <_>::default(),
             blob_store: <_>::default(),
             index_id_map: <_>::default(),
+            table_dropped: <_>::default(),
             page_pool,
         }
     }
@@ -333,17 +345,35 @@ impl CommittedState {
         Ok(())
     }
 
-    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> {
-        let table = self
-            .tables
-            .get_mut(&table_id)
-            .ok_or(TableError::IdNotFoundState(table_id))?;
+    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
+        // Get the table for mutation.
+        let table = match self.tables.get_mut(&table_id) {
+            Some(t) => t,
+            // (1) If it was dropped, avoid an error and just ignore the row instead.
+            None if self.table_dropped.contains(&table_id) => return Ok(()),
+            None => return Err(TableError::IdNotFoundState(table_id).into()),
+        };
+
+        // Delete the row.
         let blob_store = &mut self.blob_store;
         table
-            .delete_equal_row(&self.page_pool, blob_store, rel)
+            .delete_equal_row(&self.page_pool, blob_store, row)
             .map_err(TableError::Bflatn)?
             .ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
+        if table_id == ST_TABLE_ID {
+            // A row was removed from `st_table`, so a table was dropped.
+            // Remove that table from the in-memory structures.
+            let dropped_table_id = Self::read_table_id(row);
+            self.tables
+                .remove(&dropped_table_id)
+                .expect("table to remove should exist");
+            // Mark the table as dropped so that when
+            // processing row deletions for that table later,
+            // they are simply ignored in (1).
+            self.table_dropped.insert(dropped_table_id);
+        }
+
         Ok(())
     }
@@ -378,8 +408,7 @@ impl CommittedState {
     ///
     /// The `row_ptr` is a pointer to `row`.
     fn st_column_changed(&mut self, row: &ProductValue, row_ptr: RowPointer) -> Result<()> {
-        let target_table_id = TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
-            .expect("first field in `st_column` should decode to a `TableId`");
+        let target_table_id = Self::read_table_id(row);
         let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&row.elements[1]))
             .expect("second field in `st_column` should decode to a `ColId`");
 
@@ -410,6 +439,12 @@ impl CommittedState {
         Ok(())
     }
 
+    /// Assuming that a `TableId` is stored as the first field in `row`, read it.
+    fn read_table_id(row: &ProductValue) -> TableId {
+        TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
+            .expect("first field in `st_column` should decode to a `TableId`")
+    }
+
     pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
         let st_sequences = self.tables.get(&ST_SEQUENCE_ID).unwrap();
         for row_ref in st_sequences.scan_rows(&self.blob_store) {
@@ -594,7 +629,7 @@ impl CommittedState {
         let mut tx_data = TxData::default();
 
         // First, apply deletes. This will free up space in the committed tables.
-        self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);
+        self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);
 
         // Then, apply inserts. This will re-fill the holes freed by deletions
         // before allocating new pages.
@@ -610,33 +645,68 @@ impl CommittedState {
         tx_data
     }
 
-    fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
+    fn merge_apply_deletes(
+        &mut self,
+        tx_data: &mut TxData,
+        delete_tables: BTreeMap<TableId, DeleteTable>,
+        pending_schema_changes: ThinVec<PendingSchemaChange>,
+    ) {
+        fn delete_rows(
+            tx_data: &mut TxData,
+            table_id: TableId,
+            table: &mut Table,
+            blob_store: &mut dyn BlobStore,
+            row_ptrs_len: usize,
+            row_ptrs: impl Iterator<Item = RowPointer>,
+        ) {
+            let mut deletes = Vec::with_capacity(row_ptrs_len);
+
+            // Note: we maintain the invariant that the delete_tables
+            // holds only committed rows which should be deleted,
+            // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
+            // so no need to check before applying the deletes.
+            for row_ptr in row_ptrs {
+                debug_assert!(row_ptr.squashed_offset().is_committed_state());
+
+                // TODO: re-write `TxData` to remove `ProductValue`s
+                let pv = table
+                    .delete(blob_store, row_ptr, |row| row.to_product_value())
+                    .expect("Delete for non-existent row!");
+                deletes.push(pv);
+            }
+
+            if !deletes.is_empty() {
+                let table_name = &*table.get_schema().table_name;
+                // TODO(centril): Pass this along to record truncated tables.
+                let _truncated = table.row_count == 0;
+                tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
+            }
+        }
+
         for (table_id, row_ptrs) in delete_tables {
             if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
-                let mut deletes = Vec::with_capacity(row_ptrs.len());
-
-                // Note: we maintain the invariant that the delete_tables
-                // holds only committed rows which should be deleted,
-                // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
-                // so no need to check before applying the deletes.
-                for row_ptr in row_ptrs.iter() {
-                    debug_assert!(row_ptr.squashed_offset().is_committed_state());
-
-                    // TODO: re-write `TxData` to remove `ProductValue`s
-                    let pv = table
-                        .delete(blob_store, row_ptr, |row| row.to_product_value())
-                        .expect("Delete for non-existent row!");
-                    deletes.push(pv);
-                }
-
-                if !deletes.is_empty() {
-                    let table_name = &*table.get_schema().table_name;
-                    tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
-                }
+                delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
             } else if !row_ptrs.is_empty() {
                 panic!("Deletion for non-existent table {table_id:?}... huh?");
             }
         }
+
+        // Delete all tables marked for deletion.
+        // The order here does not matter as once a `table_id` has been dropped
+        // it will never be re-created.
+        for change in pending_schema_changes {
+            if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
+                let row_ptrs = table.scan_all_row_ptrs();
+                delete_rows(
+                    tx_data,
+                    table_id,
+                    &mut table,
+                    &mut self.blob_store,
+                    row_ptrs.len(),
+                    row_ptrs.into_iter(),
+                );
+            }
+        }
     }
 
     fn merge_apply_inserts(
diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs
index 1577a379c43..c5dc3fe5abe 100644
--- a/crates/datastore/src/locking_tx_datastore/datastore.rs
+++ b/crates/datastore/src/locking_tx_datastore/datastore.rs
@@ -666,7 +666,9 @@ impl MutTxDatastore for Locking {
 /// Various measurements, needed for metrics, of the work performed by a transaction.
 #[must_use = "TxMetrics should be reported"]
 pub struct TxMetrics {
-    table_stats: HashMap<TableId, TableStats>,
+    /// The transaction metrics for a particular table.
+    /// The value `None` for a [`TableId`] means that it was deleted.
+    table_stats: HashMap<TableId, Option<TableStats>>,
     workload: WorkloadType,
     database_identity: Identity,
     elapsed_time: Duration,
@@ -715,17 +717,12 @@ impl TxMetrics {
             let mut table_stats =
                 <HashMap<_, _> as HashCollectionExt>::with_capacity(tx_data.num_tables_affected());
             for (table_id, _) in tx_data.table_ids_and_names() {
-                let table = committed_state
-                    .get_table(table_id)
-                    .expect("should have a table in committed state for one in tx data");
-                table_stats.insert(
-                    table_id,
-                    TableStats {
-                        row_count: table.row_count,
-                        bytes_occupied_overestimate: table.bytes_occupied_overestimate(),
-                        num_indices: table.num_indices(),
-                    },
-                );
+                let stats = committed_state.get_table(table_id).map(|table| TableStats {
+                    row_count: table.row_count,
+                    bytes_occupied_overestimate: table.bytes_occupied_overestimate(),
+                    num_indices: table.num_indices(),
+                });
+                table_stats.insert(table_id, stats);
             }
             table_stats
         })
@@ -775,63 +772,106 @@ impl TxMetrics {
 
         get_exec_counter(self.workload).record(&self.exec_metrics);
 
+        // TODO(centril): simplify this by exposing `tx_data.for_table(table_id)`.
         if let Some(tx_data) = tx_data {
             // Update table rows and table size gauges,
             // and sets them to zero if no table is present.
             for (table_id, table_name) in tx_data.table_ids_and_names() {
-                let stats = self.table_stats.get(&table_id).unwrap();
-
-                DB_METRICS
-                    .rdb_num_table_rows
-                    .with_label_values(db, &table_id.0, table_name)
-                    .set(stats.row_count as i64);
-                DB_METRICS
-                    .rdb_table_size
-                    .with_label_values(db, &table_id.0, table_name)
-                    .set(stats.bytes_occupied_overestimate as i64);
+                if let Some(stats) = self.table_stats.get(&table_id).unwrap() {
+                    DB_METRICS
+                        .rdb_num_table_rows
+                        .with_label_values(db, &table_id.0, table_name)
+                        .set(stats.row_count as i64);
+                    DB_METRICS
+                        .rdb_table_size
+                        .with_label_values(db, &table_id.0, table_name)
+                        .set(stats.bytes_occupied_overestimate as i64);
+                } else {
+                    // Table was dropped, remove the metrics.
+                    let _ = DB_METRICS
+                        .rdb_num_table_rows
+                        .remove_label_values(db, &table_id.0, table_name);
+                    let _ = DB_METRICS
+                        .rdb_table_size
+                        .remove_label_values(db, &table_id.0, table_name);
+                }
             }
 
             // Record inserts.
             for (table_id, table_name, inserts) in tx_data.inserts_with_table_name() {
-                let stats = self.table_stats.get(table_id).unwrap();
-                let num_inserts = inserts.len() as u64;
-                let num_indices = stats.num_indices as u64;
-
-                // Increment rows inserted counter.
-                DB_METRICS
-                    .rdb_num_rows_inserted
-                    .with_label_values(workload, db, reducer, &table_id.0, table_name)
-                    .inc_by(num_inserts);
-
-                // We don't have sparse indexes, so we can just multiply by the number of indexes.
-                if stats.num_indices > 0 {
-                    // Increment index rows inserted counter
+                if let Some(stats) = self.table_stats.get(table_id).unwrap() {
+                    let num_inserts = inserts.len() as u64;
+                    let num_indices = stats.num_indices as u64;
+
+                    // Increment rows inserted counter.
                     DB_METRICS
-                        .rdb_num_index_entries_inserted
+                        .rdb_num_rows_inserted
                         .with_label_values(workload, db, reducer, &table_id.0, table_name)
-                        .inc_by(num_inserts * num_indices);
+                        .inc_by(num_inserts);
+
+                    // We don't have sparse indexes, so we can just multiply by the number of indexes.
+                    if stats.num_indices > 0 {
+                        // Increment index rows inserted counter
+                        DB_METRICS
+                            .rdb_num_index_entries_inserted
+                            .with_label_values(workload, db, reducer, &table_id.0, table_name)
+                            .inc_by(num_inserts * num_indices);
+                    }
+                } else {
+                    // Table was dropped, remove the metrics.
+                    let _ = DB_METRICS.rdb_num_rows_inserted.remove_label_values(
+                        workload,
+                        db,
+                        reducer,
+                        &table_id.0,
+                        table_name,
+                    );
+                    let _ = DB_METRICS.rdb_num_index_entries_inserted.remove_label_values(
+                        workload,
+                        db,
+                        reducer,
+                        &table_id.0,
+                        table_name,
+                    );
                 }
             }
 
             // Record deletes.
             for (table_id, table_name, deletes) in tx_data.deletes_with_table_name() {
-                let stats = self.table_stats.get(table_id).unwrap();
-                let num_deletes = deletes.len() as u64;
-                let num_indices = stats.num_indices as u64;
-
-                // Increment rows deleted counter.
-                DB_METRICS
-                    .rdb_num_rows_deleted
-                    .with_label_values(workload, db, reducer, &table_id.0, table_name)
-                    .inc_by(num_deletes);
-
-                // We don't have sparse indexes, so we can just multiply by the number of indexes.
-                if num_indices > 0 {
-                    // Increment index rows deleted counter.
+                if let Some(stats) = self.table_stats.get(table_id).unwrap() {
+                    let num_deletes = deletes.len() as u64;
+                    let num_indices = stats.num_indices as u64;
+
+                    // Increment rows deleted counter.
                     DB_METRICS
-                        .rdb_num_index_entries_deleted
+                        .rdb_num_rows_deleted
                         .with_label_values(workload, db, reducer, &table_id.0, table_name)
-                        .inc_by(num_deletes * num_indices);
+                        .inc_by(num_deletes);
+
+                    // We don't have sparse indexes, so we can just multiply by the number of indexes.
+                    if num_indices > 0 {
+                        // Increment index rows deleted counter.
+                        DB_METRICS
+                            .rdb_num_index_entries_deleted
+                            .with_label_values(workload, db, reducer, &table_id.0, table_name)
+                            .inc_by(num_deletes * num_indices);
+                    }
+                } else {
+                    // Table was dropped, remove the metrics.
+                    let _ = DB_METRICS.rdb_num_rows_deleted.remove_label_values(
+                        workload,
+                        db,
+                        reducer,
+                        &table_id.0,
+                        table_name,
+                    );
+                    let _ = DB_METRICS.rdb_num_index_entries_deleted.remove_label_values(
+                        workload,
+                        db,
+                        reducer,
+                        &table_id.0,
+                        table_name,
+                    );
                 }
             }
         }
@@ -3066,7 +3106,7 @@ mod tests {
         let (datastore, mut tx, table_id) = setup_table()?;
 
         // Insert a row and commit.
-        let row = random_row();
+        let row: ProductValue = random_row();
         insert(&datastore, &mut tx, table_id, &row)?;
         commit(&datastore, tx)?;
 
@@ -3089,13 +3129,32 @@
         let _ = datastore.rollback_mut_tx(tx);
 
         // Ensure the table still exists in the next transaction.
-        let tx = begin_mut_tx(&datastore);
+        let mut tx = begin_mut_tx(&datastore);
         assert_eq!(tx.pending_schema_changes(), []);
         assert!(
             datastore.table_id_exists_mut_tx(&tx, &table_id),
             "Table should still exist",
         );
-        assert_eq!(all_rows(&datastore, &tx, table_id), [row]);
+        assert_eq!(all_rows(&datastore, &tx, table_id), [row.clone()]);
+
+        // Now drop the table again and commit.
+        assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
+        let tx_data = commit(&datastore, tx)?;
+        let (_, deleted) = tx_data
+            .deletes()
+            .find(|(id, _)| **id == table_id)
+            .expect("should have deleted rows for `table_id`");
+        assert_eq!(&**deleted, [row]);
+
+        // In the next transaction, the table doesn't exist.
+        assert!(
+            !datastore.table_id_exists_mut_tx(&begin_mut_tx(&datastore), &table_id),
+            "Table should be removed",
+        );
+        assert!(
+            !datastore.table_id_exists_tx(&begin_tx(&datastore), &table_id),
+            "Table should be removed",
+        );
 
         Ok(())
     }
diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs
index c3346cd231a..fbee223c89a 100644
--- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs
+++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs
@@ -326,6 +326,8 @@ impl MutTxId {
     }
 
     pub fn drop_table(&mut self, table_id: TableId) -> Result<()> {
+        self.clear_table(table_id)?;
+
         let schema = &*self.schema_for_table(table_id)?;
 
         for row in &schema.indexes {
@@ -352,8 +354,10 @@
             )?;
         }
 
-        // Delete the table and its rows and indexes from memory.
+        // Delete the table from memory, both in the tx and committed states.
         self.tx_state.insert_tables.remove(&table_id);
+        // No need to keep the delete tables.
+        // By seeing `PendingSchemaChange::TableRemoved`, `merge` knows that all rows were deleted.
         self.tx_state.delete_tables.remove(&table_id);
         let commit_table = self
             .committed_state_write_lock
@@ -1865,6 +1869,26 @@
             row_pointer,
         )
     }
+
+    /// Clears the table for `table_id`, removing all rows.
+    pub fn clear_table(&mut self, table_id: TableId) -> Result<usize> {
+        // Get the commit table.
+        let (commit_table, commit_bs, ..) = self.committed_state_write_lock.get_table_and_blob_store(table_id)?;
+
+        // Get the insert table and delete all rows from it.
+        let (tx_table, tx_blob_store, delete_table) = self
+            .tx_state
+            .get_table_and_blob_store_or_create_from(table_id, commit_table);
+        let mut rows_removed = tx_table.clear(tx_blob_store);
+
+        // Mark every row in the committed state as deleted.
+        for row in commit_table.scan_rows(commit_bs) {
+            delete_table.insert(row.pointer());
+            rows_removed += 1;
+        }
+
+        Ok(rows_removed)
+    }
 }
 
 pub(super) fn delete(
diff --git a/crates/memory-usage/src/lib.rs b/crates/memory-usage/src/lib.rs
index 40bbb8fc7ef..8460281cfe5 100644
--- a/crates/memory-usage/src/lib.rs
+++ b/crates/memory-usage/src/lib.rs
@@ -112,6 +112,13 @@
+impl<K: MemoryUsage, S> MemoryUsage for hashbrown::HashSet<K, S> {
+    fn heap_usage(&self) -> usize {
+        self.allocation_size() + self.iter().map(|k| k.heap_usage()).sum::<usize>()
+    }
+}
+
 impl<K: MemoryUsage, V: MemoryUsage> MemoryUsage for std::collections::BTreeMap<K, V> {
     fn heap_usage(&self) -> usize {
         // NB: this is best-effort, since we don't have a `capacity()` method on `BTreeMap`.
diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs
index 11493edddca..8b907851d29 100644
--- a/crates/table/src/table.rs
+++ b/crates/table/src/table.rs
@@ -1,3 +1,5 @@
+use crate::blob_store::NullBlobStore;
+
 use super::{
     bflatn_from::serialize_row_from_page,
     bflatn_to::{write_row_to_pages, write_row_to_pages_bsatn, Error},
@@ -1138,10 +1140,10 @@ impl Table {
 
     /// Deletes the row identified by `ptr` from the table.
     ///
-    /// Returns the number of blob bytes deleted. This method does not update statistics by itself.
+    /// This method does update statistics.
     ///
     /// SAFETY: `self.is_row_present(row)` must hold.
-    unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes {
+    unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) {
         // Delete row from indices.
         // Do this before the actual deletion, as `index.delete` needs a `RowRef`
         // so it can extract the appropriate value.
@@ -1149,7 +1151,9 @@ impl Table {
         unsafe { self.delete_from_indices(blob_store, ptr) };
 
         // SAFETY: Caller promised that `self.is_row_present(row)` holds.
-        unsafe { self.delete_internal(blob_store, ptr) }
+        let blob_bytes_deleted = unsafe { self.delete_internal(blob_store, ptr) };
+
+        self.update_statistics_deleted_row(blob_bytes_deleted);
     }
 
     /// Delete `row_ref` from all the indices of this table until `index_id` is reached.
@@ -1201,8 +1205,7 @@ impl Table {
         let ret = before(row_ref);
 
         // SAFETY: We've checked above that `self.is_row_present(ptr)`.
-        let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) };
-        self.update_statistics_deleted_row(blob_bytes_deleted);
+        unsafe { self.delete_unchecked(blob_store, ptr) };
 
         Some(ret)
     }
@@ -1242,11 +1245,8 @@ impl Table {
 
         // If an equal row was present, delete it.
         if let Some(existing_row_ptr) = existing_row_ptr {
-            let blob_bytes_deleted = unsafe {
-                // SAFETY: `find_same_row` ensures that the pointer is valid.
-                self.delete_unchecked(blob_store, existing_row_ptr)
-            };
-            self.update_statistics_deleted_row(blob_bytes_deleted);
+            // SAFETY: `find_same_row` ensures that the pointer is valid.
+            unsafe { self.delete_unchecked(blob_store, existing_row_ptr) };
         }
 
         // Remove the temporary row we inserted in the beginning.
@@ -1259,6 +1259,17 @@ impl Table {
         Ok(existing_row_ptr)
     }
 
+    /// Clears this table, removing all present rows from it.
+    pub fn clear(&mut self, blob_store: &mut dyn BlobStore) -> usize {
+        let ptrs = self.scan_all_row_ptrs();
+        let len = ptrs.len();
+        for ptr in ptrs {
+            // SAFETY: `ptr` came from `self.scan_rows(...)`, so it's present.
+            unsafe { self.delete_unchecked(blob_store, ptr) };
+        }
+        len
+    }
+
     /// Returns the row type for rows in this table.
     pub fn get_row_type(&self) -> &ProductType {
         self.get_schema().get_row_type()
@@ -1359,7 +1370,7 @@ impl Table {
         Some(index)
     }
 
-    /// Returns an iterator over all the rows of `self`, yielded as [`RefRef`]s.
+    /// Returns an iterator over all the rows of `self`, yielded as [`RowRef`]s.
     pub fn scan_rows<'a>(&'a self, blob_store: &'a dyn BlobStore) -> TableScanIter<'a> {
         TableScanIter {
             current_page: None, // Will be filled by the iterator.
@@ -1369,6 +1380,13 @@ impl Table {
         }
     }
 
+    /// Returns a list of all present row pointers.
+    pub fn scan_all_row_ptrs(&self) -> Vec<RowPointer> {
+        let mut ptrs = Vec::with_capacity(self.row_count as usize);
+        ptrs.extend(self.scan_rows(&NullBlobStore).map(|row| row.pointer()));
+        ptrs
+    }
+
     /// Returns this table combined with the index for [`IndexId`], if any.
     pub fn get_index_by_id_with_table<'a>(
        &'a self,
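The new `MutTxId::clear_table` counts removals from both sides of the transaction: rows already in the committed state (recorded in the delete table) and rows only present in the tx insert table. Below is a sketch of a test that could exercise this, written in the style of the datastore tests touched above and reusing their helpers (`setup_table`, `insert`, `commit`, `begin_mut_tx`, `all_rows`, `random_row`). The test name, the `ResultTest` return alias, and the exact assertions are illustrative assumptions, not part of this diff.

```rust
// Hypothetical test sketch; assumes the `ResultTest` alias and the helpers
// (`setup_table`, `insert`, `commit`, `begin_mut_tx`, `all_rows`, `random_row`)
// from the surrounding test module in `datastore.rs`.
#[test]
fn clear_table_empties_table_but_keeps_it() -> ResultTest<()> {
    let (datastore, mut tx, table_id) = setup_table()?;

    // Insert a row and commit it.
    let row: ProductValue = random_row();
    insert(&datastore, &mut tx, table_id, &row)?;
    commit(&datastore, tx)?;

    // Clearing in a fresh transaction reports the one committed row...
    let mut tx = begin_mut_tx(&datastore);
    assert_eq!(tx.clear_table(table_id)?, 1);

    // ...and afterwards the table is empty but still exists.
    assert!(all_rows(&datastore, &tx, table_id).is_empty());
    assert!(datastore.table_id_exists_mut_tx(&tx, &table_id));

    commit(&datastore, tx)?;
    Ok(())
}
```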
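For readers following the replay changes in `committed_state.rs`: the new `table_dropped` set acts as a set of tombstones so that, while replaying the commit log during bootstrap, deletes aimed at a table whose `st_table` row was already deleted are skipped instead of surfacing as corruption. The following self-contained sketch illustrates that pattern with plain `std` collections and toy types; it is an illustration of the idea, not SpacetimeDB code.

```rust
use std::collections::{HashMap, HashSet};

type TableId = u32;
const ST_TABLE_ID: TableId = 0;

/// Toy replay state: each table is just a list of row values.
struct ReplayState {
    tables: HashMap<TableId, Vec<u64>>,
    /// Tables whose `st_table` row was deleted earlier during replay.
    table_dropped: HashSet<TableId>,
}

impl ReplayState {
    fn replay_delete(&mut self, table_id: TableId, row: u64) -> Result<(), String> {
        let Some(rows) = self.tables.get_mut(&table_id) else {
            // (1) The table was dropped earlier in the log: ignore the delete.
            if self.table_dropped.contains(&table_id) {
                return Ok(());
            }
            return Err(format!("delete for unknown table {table_id}"));
        };
        let pos = rows
            .iter()
            .position(|r| *r == row)
            .ok_or("delete for non-existent row")?;
        rows.swap_remove(pos);

        // Deleting a row from `st_table` means a table was dropped:
        // forget its in-memory state and remember the drop for (1).
        if table_id == ST_TABLE_ID {
            let dropped = row as TableId; // toy encoding: the row *is* the table id
            self.tables.remove(&dropped);
            self.table_dropped.insert(dropped);
        }
        Ok(())
    }
}
```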