Commit 22b068b

replay_delete_by_rel: delete in-memory table during replay
1 parent b48e42e commit 22b068b

File tree

2 files changed: +46 -10 lines changed

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 39 additions & 10 deletions
@@ -24,7 +24,7 @@ use crate::{
 use anyhow::anyhow;
 use core::{convert::Infallible, ops::RangeBounds};
 use itertools::Itertools;
-use spacetimedb_data_structures::map::{HashSet, IntMap};
+use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
 use spacetimedb_lib::{
     db::auth::{StAccess, StTableType},
     Identity,
@@ -66,6 +66,11 @@ pub struct CommittedState {
     /// Pages are shared between all modules running on a particular host,
     /// not allocated per-module.
     pub(super) page_pool: PagePool,
+    /// Whether the table was dropped during replay.
+    /// TODO(centril): Only used during bootstrap and is otherwise unused.
+    /// We should split `CommittedState` into two types
+    /// where one, e.g., `ReplayCommittedState`, has this field.
+    table_dropped: IntSet<TableId>,
 }

 impl MemoryUsage for CommittedState {
@@ -76,9 +81,14 @@ impl MemoryUsage for CommittedState {
             blob_store,
             index_id_map,
             page_pool: _,
+            table_dropped,
         } = self;
         // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
-        next_tx_offset.heap_usage() + tables.heap_usage() + blob_store.heap_usage() + index_id_map.heap_usage()
+        next_tx_offset.heap_usage()
+            + tables.heap_usage()
+            + blob_store.heap_usage()
+            + index_id_map.heap_usage()
+            + table_dropped.heap_usage()
     }
 }

@@ -157,6 +167,7 @@ impl CommittedState {
             tables: <_>::default(),
             blob_store: <_>::default(),
             index_id_map: <_>::default(),
+            table_dropped: <_>::default(),
             page_pool,
         }
     }
@@ -334,17 +345,30 @@ impl CommittedState {
         Ok(())
     }

-    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> {
-        let table = self
-            .tables
-            .get_mut(&table_id)
-            .ok_or(TableError::IdNotFoundState(table_id))?;
+    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
+        // Get the table for mutation.
+        // If it was dropped, avoid an error and just ignore the row instead.
+        let table = match self.tables.get_mut(&table_id) {
+            Some(t) => t,
+            None if self.table_dropped.contains(&table_id) => return Ok(()),
+            None => return Err(TableError::IdNotFoundState(table_id).into()),
+        };
+
+        // Delete the row.
         let blob_store = &mut self.blob_store;
         table
-            .delete_equal_row(&self.page_pool, blob_store, rel)
+            .delete_equal_row(&self.page_pool, blob_store, row)
             .map_err(TableError::Bflatn)?
             .ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;

+        if table_id == ST_TABLE_ID {
+            // A row was removed from `st_table`, so a table was dropped.
+            // Remove that table from the in-memory structures.
+            self.tables
+                .remove(&Self::read_table_id(row))
+                .expect("table to remove should exist");
+        }
+
         Ok(())
     }

@@ -379,8 +403,7 @@ impl CommittedState {
     ///
     /// The `row_ptr` is a pointer to `row`.
     fn st_column_changed(&mut self, row: &ProductValue, row_ptr: RowPointer) -> Result<()> {
-        let target_table_id = TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
-            .expect("first field in `st_column` should decode to a `TableId`");
+        let target_table_id = Self::read_table_id(row);
         let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&row.elements[1]))
             .expect("second field in `st_column` should decode to a `ColId`");

@@ -411,6 +434,12 @@ impl CommittedState {
         Ok(())
     }

+    /// Assuming that a `TableId` is stored as the first field in `row`, read it.
+    fn read_table_id(row: &ProductValue) -> TableId {
+        TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
+            .expect("first field in `st_column` should decode to a `TableId`")
+    }
+
     pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
         let st_sequences = self.tables.get(&ST_SEQUENCE_ID).unwrap();
         for row_ref in st_sequences.scan_rows(&self.blob_store) {
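
Note: the following is an illustrative sketch, not SpacetimeDB code; every name in it (ReplayState, Table, replay_drop_table, and the std HashMap/HashSet standing in for IntMap/IntSet) is a hypothetical stand-in. It shows the pattern the commit adds to replay_delete_by_rel: record the ids of tables dropped during replay, and treat a later delete against such a table as a no-op instead of an IdNotFoundState error.

// Illustrative sketch only: hypothetical names, std collections in place of
// the datastore's IntMap/IntSet and Table types.
use std::collections::{HashMap, HashSet};

type TableId = u32;
type Row = String;

#[derive(Default)]
struct Table {
    rows: Vec<Row>,
}

#[derive(Default)]
struct ReplayState {
    tables: HashMap<TableId, Table>,
    /// Tables that were dropped earlier during replay.
    table_dropped: HashSet<TableId>,
}

impl ReplayState {
    fn replay_delete(&mut self, table_id: TableId, row: &Row) -> Result<(), String> {
        // Get the table for mutation; if it was dropped, ignore the row.
        let table = match self.tables.get_mut(&table_id) {
            Some(t) => t,
            None if self.table_dropped.contains(&table_id) => return Ok(()),
            None => return Err(format!("table {table_id} not found")),
        };
        // Delete the row, erroring if it was never there.
        let pos = table
            .rows
            .iter()
            .position(|r| r == row)
            .ok_or("delete for non-existent row")?;
        table.rows.remove(pos);
        Ok(())
    }

    fn replay_drop_table(&mut self, table_id: TableId) {
        // Forget the in-memory table and remember that it was dropped,
        // so later deletes against it can be ignored.
        self.tables.remove(&table_id);
        self.table_dropped.insert(table_id);
    }
}

fn main() {
    let mut state = ReplayState::default();
    state.tables.insert(7, Table { rows: vec!["a".into()] });
    state.replay_drop_table(7);
    // A delete replayed against the dropped table is a no-op, not an error.
    assert!(state.replay_delete(7, &"a".to_string()).is_ok());
}

Keeping a separate dropped-table set, rather than leaving tombstones in the table map, keeps the common lookup a plain map hit; per the TODO in the diff, the field is only needed while bootstrapping/replaying.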

crates/memory-usage/src/lib.rs

Lines changed: 7 additions & 0 deletions
@@ -112,6 +112,13 @@ impl<K: MemoryUsage + Eq + core::hash::Hash, V: MemoryUsage, S: core::hash::BuildHasher>
     }
 }

+#[cfg(feature = "hashbrown")]
+impl<K: MemoryUsage + Eq + core::hash::Hash, S: core::hash::BuildHasher> MemoryUsage for hashbrown::HashSet<K, S> {
+    fn heap_usage(&self) -> usize {
+        self.allocation_size() + self.iter().map(|k| k.heap_usage()).sum::<usize>()
+    }
+}
+
 impl<K: MemoryUsage, V: MemoryUsage> MemoryUsage for std::collections::BTreeMap<K, V> {
     fn heap_usage(&self) -> usize {
         // NB: this is best-effort, since we don't have a `capacity()` method on `BTreeMap`.
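
Note: a minimal sketch, not the crate's code, of what the new impl computes for a hash set: the set's own allocation plus the heap owned by each element. It uses std's HashSet and a capacity-based approximation where hashbrown's allocation_size() would report the exact allocation, so the numbers are illustrative only.

// Illustrative sketch only: a local MemoryUsage trait and std's HashSet,
// approximating the set's allocation from its capacity.
use std::collections::HashSet;
use std::mem::size_of;

trait MemoryUsage {
    /// Bytes owned on the heap, excluding the shallow size of the value itself.
    fn heap_usage(&self) -> usize;
}

impl MemoryUsage for String {
    fn heap_usage(&self) -> usize {
        self.capacity()
    }
}

impl<K: MemoryUsage> MemoryUsage for HashSet<K> {
    fn heap_usage(&self) -> usize {
        // Rough stand-in for allocation_size(): one bucket per unit of
        // capacity, ignoring control bytes and load-factor padding.
        let table_bytes = self.capacity() * size_of::<K>();
        table_bytes + self.iter().map(|k| k.heap_usage()).sum::<usize>()
    }
}

fn main() {
    let mut set = HashSet::new();
    set.insert(String::from("replay"));
    set.insert(String::from("commit"));
    println!("approx heap usage: {} bytes", set.heap_usage());
}

As with the other impls in this file, heap_usage counts owned heap allocations rather than the shallow size of the value itself.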
