Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use spacetimedb_table::{
};
use std::collections::BTreeMap;
use std::sync::Arc;
use thin_vec::ThinVec;

/// Contains the live, in-memory snapshot of a database. This structure
/// is exposed in order to support tools wanting to process the commit
Expand Down Expand Up @@ -593,12 +594,23 @@ impl CommittedState {
pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();

// This transaction may have dropped tables.
// After applying `merge_apply_deletes` and `merge_apply_inserts`,
// any tables no longer referenced by `st_table` should be
// removed from the committed state.
let mut tables_to_drop = ThinVec::<TableId>::new();

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);
self.merge_apply_deletes(&mut tx_data, &mut tables_to_drop, tx_state.delete_tables);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.merge_apply_deletes(&mut tx_data, &mut tables_to_drop, tx_state.delete_tables);
// Any rows deleted from `st_table` will be noted in `tables_to_drop`.
self.merge_apply_deletes(&mut tx_data, &mut tables_to_drop, tx_state.delete_tables);

I don't actually know if this is true (this is the first part of my review), it's just my assumption about what's going on here.


// Then, apply inserts. This will re-fill the holes freed by deletions
// before allocating new pages.
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
self.merge_apply_inserts(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.merge_apply_inserts(
// Any rows inserted into `st_table` will be removed from `tables_to_drop`.
// This allows us to avoid incorrectly dropping tables whose `st_table` rows have been "updated,"
// i.e. replaced with a new version.
self.merge_apply_inserts(

I don't actually know if this is true (this is the first part of my review), it's just my assumption about what's going on here.

&mut tx_data,
&mut tables_to_drop,
tx_state.insert_tables,
tx_state.blob_store,
);

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
Expand All @@ -607,10 +619,19 @@ impl CommittedState {
self.next_tx_offset += 1;
}

for table_id in tables_to_drop {
self.tables.remove(&table_id);
}

tx_data
}

fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
fn merge_apply_deletes(
&mut self,
tx_data: &mut TxData,
tables_to_drop: &mut ThinVec<TableId>,
delete_tables: BTreeMap<TableId, DeleteTable>,
) {
for (table_id, row_ptrs) in delete_tables {
if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
let mut deletes = Vec::with_capacity(row_ptrs.len());
Expand All @@ -626,6 +647,14 @@ impl CommittedState {
let pv = table
.delete(blob_store, row_ptr, |row| row.to_product_value())
.expect("Delete for non-existent row!");

if table_id == ST_TABLE_ID {
let st_table_row =
StTableRow::deserialize(ValueDeserializer::from_ref(&AlgebraicValue::Product(pv.clone())))
.expect("st_table row should deserialize");
tables_to_drop.push(st_table_row.table_id);
}

deletes.push(pv);
}

Expand All @@ -642,6 +671,7 @@ impl CommittedState {
fn merge_apply_inserts(
&mut self,
tx_data: &mut TxData,
tables_to_drop: &mut ThinVec<TableId>,
insert_tables: BTreeMap<TableId, Table>,
tx_blob_store: impl BlobStore,
) {
Expand All @@ -665,6 +695,15 @@ impl CommittedState {
.insert(page_pool, commit_blob_store, &pv)
.expect("Failed to insert when merging commit");

// If we inserted a row back into `st_table`,
// it means transaction only updates a row and do not drop the table.
if table_id == ST_TABLE_ID {
let st_table_row =
StTableRow::deserialize(ValueDeserializer::from_ref(&AlgebraicValue::Product(pv.clone())))
.expect("st_table row should deserialize");
tables_to_drop.retain(|&id| id != st_table_row.table_id);
}

inserts.push(pv);
}

Expand Down Expand Up @@ -717,13 +756,6 @@ impl CommittedState {
table.with_mut_schema(|s| s.remove_index(index_id));
self.index_id_map.remove(&index_id);
}
// A table was removed. Add it back.
TableRemoved(table_id, table) => {
// We don't need to deal with sub-components.
// That is, we don't need to add back indices and such.
// Instead, there will be separate pending schema changes like `IndexRemoved`.
self.tables.insert(table_id, table);
}
// A table was added. Remove it.
TableAdded(table_id) => {
// We don't need to deal with sub-components.
Expand Down
29 changes: 19 additions & 10 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ pub struct TxMetrics {
exec_metrics: ExecutionMetrics,
}

#[derive(Default)]
struct TableStats {
/// The number of rows in the table after this transaction.
///
Expand Down Expand Up @@ -715,16 +716,16 @@ impl TxMetrics {
let mut table_stats =
<HashMap<_, _, _> as HashCollectionExt>::with_capacity(tx_data.num_tables_affected());
for (table_id, _) in tx_data.table_ids_and_names() {
let table = committed_state
.get_table(table_id)
.expect("should have a table in committed state for one in tx data");
table_stats.insert(
table_id,
TableStats {
row_count: table.row_count,
bytes_occupied_overestimate: table.bytes_occupied_overestimate(),
num_indices: table.num_indices(),
},
committed_state
.get_table(table_id)
.map(|table| TableStats {
row_count: table.row_count,
bytes_occupied_overestimate: table.bytes_occupied_overestimate(),
num_indices: table.num_indices(),
})
.unwrap_or_default(),
);
}
table_stats
Expand Down Expand Up @@ -3082,9 +3083,7 @@ mod tests {
PendingSchemaChange::SequenceRemoved(..),
PendingSchemaChange::ConstraintRemoved(..),
PendingSchemaChange::ConstraintRemoved(..),
PendingSchemaChange::TableRemoved(removed_table_id, _)
]
if *removed_table_id == table_id
);
let _ = datastore.rollback_mut_tx(tx);

Expand All @@ -3096,7 +3095,17 @@ mod tests {
"Table should still exist",
);
assert_eq!(all_rows(&datastore, &tx, table_id), [row]);
let _ = datastore.rollback_mut_tx(tx);

let mut tx = begin_mut_tx(&datastore);
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
commit(&datastore, tx)?;

let tx = begin_mut_tx(&datastore);
assert!(
!datastore.table_id_exists_mut_tx(&tx, &table_id),
"Table should be removed",
);
Ok(())
}

Expand Down
13 changes: 6 additions & 7 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,14 @@ impl MutTxId {
)?;
}

// Delete all rows in the table.
let ptrs: Vec<_> = self.iter(table_id)?.map(|row| row.pointer()).collect();
for ptr in ptrs {
self.delete(table_id, ptr)?;
}

// Delete the table and its rows and indexes from memory.
self.tx_state.insert_tables.remove(&table_id);
self.tx_state.delete_tables.remove(&table_id);
let commit_table = self
.committed_state_write_lock
.tables
.remove(&table_id)
.expect("there should be a schema in the committed state if we reach here");
self.push_schema_change(PendingSchemaChange::TableRemoved(table_id, commit_table));
Comment on lines -357 to -363
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're removing this code, you need to update the comment about how the commit table isn't eagerly deleted, but what code does delete it, and how queries later in the transaction avoid reading from it.


Ok(())
}
Expand Down
2 changes: 0 additions & 2 deletions crates/datastore/src/locking_tx_datastore/tx_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ pub enum PendingSchemaChange {
/// If adding this index caused the pointer map to be removed,
/// it will be present here.
IndexAdded(TableId, IndexId, Option<PointerMap>),
/// The [`Table`] with [`TableId`] was removed.
TableRemoved(TableId, Table),
/// The table with [`TableId`] was added.
TableAdded(TableId),
/// The access of the table with [`TableId`] was changed.
Expand Down
Loading