diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 55d60bd8858..4c0f0abd381 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -1052,6 +1052,18 @@ impl RelationalDB {
         Ok(self.inner.alter_table_row_type_mut_tx(tx, table_id, column_schemas)?)
     }
 
+    pub(crate) fn add_columns_to_table(
+        &self,
+        tx: &mut MutTx,
+        table_id: TableId,
+        column_schemas: Vec<ColumnSchema>,
+        default_values: Vec<AlgebraicValue>,
+    ) -> Result<TableId, DBError> {
+        Ok(self
+            .inner
+            .add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?)
+    }
+
     /// Reports the `TxMetrics`s passed.
     ///
     /// Should only be called after the tx lock has been fully released.
diff --git a/crates/core/src/db/update.rs b/crates/core/src/db/update.rs
index d2496b5bdb5..fa3ea5bdc31 100644
--- a/crates/core/src/db/update.rs
+++ b/crates/core/src/db/update.rs
@@ -252,6 +252,18 @@ fn auto_migrate_database(
             log!(logger, "Removing-row level security `{sql_rls}`");
             stdb.drop_row_level_security(tx, sql_rls.clone())?;
         }
+        spacetimedb_schema::auto_migrate::AutoMigrateStep::AddColumns(table_name) => {
+            let table_def = plan.new.stored_in_table_def(table_name).expect("table must exist");
+            let table_id = stdb.table_id_from_name_mut(tx, table_name).unwrap().unwrap();
+            let column_schemas = column_schemas_from_defs(plan.new, &table_def.columns, table_id);
+
+            let default_values: Vec<AlgebraicValue> = table_def
+                .columns
+                .iter()
+                .filter_map(|col_def| col_def.default_value.clone())
+                .collect();
+            stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?;
+        }
         _ => anyhow::bail!("migration step not implemented: {step:?}"),
     }
 }
diff --git a/crates/datastore/src/error.rs b/crates/datastore/src/error.rs
index 144a62788d8..3a571f7d579 100644
--- a/crates/datastore/src/error.rs
+++ b/crates/datastore/src/error.rs
@@ -81,6 +81,8 @@ pub enum TableError {
     #[error(transparent)]
     // Error here is `Box`ed to avoid triggering https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err .
     ChangeColumnsError(#[from] Box<ChangeColumnsError>),
+    #[error(transparent)]
+    AddColumnsError(#[from] Box<AddColumnsError>),
 }
 
 #[derive(Error, Debug, PartialEq, Eq)]
diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs
index c5dc3fe5abe..2788b3a48d5 100644
--- a/crates/datastore/src/locking_tx_datastore/datastore.rs
+++ b/crates/datastore/src/locking_tx_datastore/datastore.rs
@@ -313,6 +313,16 @@ impl Locking {
     ) -> Result<()> {
         tx.alter_table_row_type(table_id, column_schemas)
     }
+
+    pub fn add_columns_to_table_mut_tx(
+        &self,
+        tx: &mut MutTxId,
+        table_id: TableId,
+        column_schemas: Vec<ColumnSchema>,
+        defaults: Vec<AlgebraicValue>,
+    ) -> Result<TableId> {
+        tx.add_columns_to_table(table_id, column_schemas, defaults)
+    }
 }
 
 impl DataRow for Locking {
@@ -1200,6 +1210,7 @@ mod tests {
     use core::{fmt, mem};
     use itertools::Itertools;
     use pretty_assertions::{assert_eq, assert_matches};
+    use spacetimedb_execution::dml::MutDatastore as _;
     use spacetimedb_execution::Datastore;
     use spacetimedb_lib::db::auth::{StAccess, StTableType};
     use spacetimedb_lib::error::ResultTest;
@@ -3361,8 +3372,202 @@ mod tests {
         Ok(())
     }
 
-    // TODO: Add the following tests
     // - Create a tx that inserts 2000 rows with an auto_inc column
     // - Create a tx that inserts 2000 rows with an auto_inc column and then rolls back
+
+    #[test]
+    fn test_add_columns_to_table() -> ResultTest<()> {
+        let datastore = get_datastore()?;
+
+        let mut tx = begin_mut_tx(&datastore);
+
+        let initial_sum_type = AlgebraicType::sum([("ba", AlgebraicType::U16)]);
+        let initial_columns = [
+            ColumnSchema::for_test(0, "a", AlgebraicType::U64),
+            ColumnSchema::for_test(1, "b", initial_sum_type.clone()),
+        ];
+
+        let initial_indices = [
+            IndexSchema::for_test("index_a", BTreeAlgorithm::from(0)),
+            IndexSchema::for_test("index_b", BTreeAlgorithm::from(1)),
+        ];
+
+        let sequence = SequenceRow {
+            id: SequenceId::SENTINEL.into(),
+            table: 0,
+            col_pos: 0,
+            name: "Foo_id_seq",
+            start: 5,
+        };
+
+        let schema = user_public_table(
+            initial_columns,
+            initial_indices.clone(),
+            [],
+            map_array([sequence]),
+            None,
+            None,
+        );
+
+        let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
+
+        let mut columns_original = tx.get_schema(table_id).unwrap().columns.to_vec();
+
+        // Insert initial rows
+        let initial_row = product![0u64, AlgebraicValue::sum(0, AlgebraicValue::U16(1))];
+        insert(&datastore, &mut tx, table_id, &initial_row).unwrap();
+        insert(&datastore, &mut tx, table_id, &initial_row).unwrap();
+        commit(&datastore, tx)?;
+
+        // Alter Table: Add Variant and Column
+        //
+        // Change the `b` column, adding a variant.
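+        // A new `bb` variant is appended to `b`'s sum type, and a new column `c` with default `42` is added below.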
+        let vars_ref = &mut columns_original[1].col_type.as_sum_mut().unwrap().variants;
+        let mut vars = Vec::from(mem::take(vars_ref));
+        vars.push(SumTypeVariant::new_named(AlgebraicType::U8, "bb"));
+        *vars_ref = vars.into();
+        // Add column `c`
+        let mut new_columns = columns_original.clone();
+        new_columns.push(ColumnSchema::for_test(2, "c", AlgebraicType::U8));
+        let defaults = vec![AlgebraicValue::U8(42)];
+
+        let mut tx = begin_mut_tx(&datastore);
+        // Insert a row into the tx state before adding the columns.
+        tx.insert_product_value(table_id, &initial_row).unwrap();
+        // Add the columns and then roll back.
+        let rollback_table_id =
+            datastore.add_columns_to_table_mut_tx(&mut tx, table_id, new_columns.clone(), defaults.clone())?;
+        let _ = tx.rollback();
+
+        let old_rows = [
+            product![5u64, AlgebraicValue::sum(0, 1u16.into())],
+            product![6u64, AlgebraicValue::sum(0, 1u16.into())],
+        ];
+
+        let mut tx = begin_mut_tx(&datastore);
+        // Check that the rollback was successful.
+        let rows = tx
+            .table_scan(table_id)
+            .unwrap()
+            .map(|row| row.to_product_value())
+            .collect::<Vec<_>>();
+        assert_eq!(rows, old_rows, "Rows shouldn't be changed if rolled back");
+        let table = tx.table(rollback_table_id);
+        assert!(table.is_none(), "new table shouldn't be created if rolled back");
+
+        // Add the columns and actually commit this time.
+        tx.insert_product_value(table_id, &initial_row).unwrap();
+        let new_table_id = datastore.add_columns_to_table_mut_tx(&mut tx, table_id, new_columns.clone(), defaults)?;
+
+        let tx_data = commit(&datastore, tx)?;
+
+        assert_ne!(
+            new_table_id, table_id,
+            "New table ID after migration should differ from old one"
+        );
+
+        // Validate commitlog changes.
+        let (_, deletes) = tx_data
+            .deletes()
+            .find(|(id, _)| **id == table_id)
+            .expect("Expected delete log for original table");
+
+        assert_eq!(
+            &**deletes, &old_rows,
+            "Unexpected delete entries after altering the table"
+        );
+
+        let inserted_rows = [
+            product![5u64, AlgebraicValue::sum(0, 1u16.into()), 42u8],
+            product![6u64, AlgebraicValue::sum(0, 1u16.into()), 42u8],
+            product![8u64, AlgebraicValue::sum(0, 1u16.into()), 42u8],
+        ];
+
+        let (_, inserts) = tx_data
+            .inserts()
+            .find(|(id, _)| **id == new_table_id)
+            .expect("Expected insert log for new table");
+
+        assert_eq!(
+            &**inserts, &inserted_rows,
+            "Unexpected insert entries after altering the table"
+        );
+
+        // Insert rows into the new table.
+        let mut tx = begin_mut_tx(&datastore);
+
+        let new_row = product![0u64, AlgebraicValue::sum(0, 1u16.into()), 0u8];
+        tx.insert_product_value(new_table_id, &new_row).unwrap();
+        commit(&datastore, tx)?;
+
+        // Test the auto_inc fields: values should be strictly increasing.
+        let tx = begin_mut_tx(&datastore);
+        let rows = tx.table_scan(new_table_id).unwrap().map(|row| row.to_product_value());
+
+        let mut last_row_auto_inc = 0;
+        for row in rows {
+            let auto_inc_col = row.get_field(0, None)?;
+            if let AlgebraicValue::U64(val) = auto_inc_col {
+                assert!(val > &last_row_auto_inc, "Auto-increment value did not increase");
+                last_row_auto_inc = *val;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_table_schemas_consistency() -> ResultTest<()> {
+        let datastore = get_datastore()?;
+
+        let mut tx = begin_mut_tx(&datastore);
+
+        let initial_sum_type = AlgebraicType::sum([("ba", AlgebraicType::U16)]);
+        let initial_columns = [
+            ColumnSchema::for_test(0, "a", AlgebraicType::U64),
+            ColumnSchema::for_test(1, "b", initial_sum_type.clone()),
+        ];
+
+        let initial_indices = [
+            IndexSchema::for_test("index_a", BTreeAlgorithm::from(0)),
+            IndexSchema::for_test("index_b", BTreeAlgorithm::from(1)),
+        ];
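+
+        // Mirrors the setup in `test_add_columns_to_table`: an auto-inc sequence on column 0 starting at 5.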
+        let sequence = SequenceRow {
+            id: SequenceId::SENTINEL.into(),
+            table: 0,
+            col_pos: 0,
+            name: "Foo_id_seq",
+            start: 5,
+        };
+
+        let schema = user_public_table(
+            initial_columns,
+            initial_indices.clone(),
+            [],
+            map_array([sequence]),
+            None,
+            None,
+        );
+
+        let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
+        let initial_row = product![0u64, AlgebraicValue::sum(0, AlgebraicValue::U16(1))];
+
+        for _ in 0..5000 {
+            insert(&datastore, &mut tx, table_id, &initial_row).unwrap();
+        }
+        commit(&datastore, tx)?;
+
+        let tx = begin_tx(&datastore);
+        let table_schema = tx.schema_for_table(table_id).unwrap();
+        let table_schema_raw = tx.schema_for_table_raw(table_id).unwrap();
+
+        // TODO: Fix the bug and update the assert.
+        assert_ne!(
+            table_schema.sequences[0].allocated, table_schema_raw.sequences[0].allocated,
+            "Allocated sequence values differ between schema and raw schema"
+        );
+        Ok(())
+    }
 }
diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs
index fbee223c89a..6f309d38b55 100644
--- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs
+++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs
@@ -54,6 +54,7 @@ use spacetimedb_table::{
     table_index::TableIndex,
 };
 use std::{
+    collections::HashMap,
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -493,6 +494,121 @@ impl MutTxId {
         Ok(())
     }
 
+    /// Add columns to the table identified by `table_id`, changing its row type.
+    ///
+    /// This is an incompatible change that requires a new table to be created.
+    /// The existing rows are copied over to the new table,
+    /// with `default_values` appended to each row.
+    ///
+    /// `column_schemas` must contain all the old columns in the same order as before,
+    /// followed by the new columns to be added.
+    /// All new columns must have a default value in `default_values`.
+    ///
+    /// After calling this method, the table referred to by `table_id` is dropped,
+    /// and a new table is created with the same name but a new `table_id`.
+    /// The new `table_id` is returned.
+    pub(crate) fn add_columns_to_table(
+        &mut self,
+        table_id: TableId,
+        column_schemas: Vec<ColumnSchema>,
+        default_values: Vec<AlgebraicValue>,
+    ) -> Result<TableId> {
+        let ((tx_table, ..), _) = self.get_or_create_insert_table_mut(table_id)?;
+        tx_table
+            .validate_add_columns_schema(&column_schemas, &default_values)
+            .map_err(TableError::from)?;
+
+        // Copy all rows as `ProductValue` before dropping the table.
+        // This approach isn't ideal, as allocating a large contiguous block of memory
+        // can lead to out-of-memory errors.
+        // Ideally, we should use iterators to avoid this, but that's not sufficient on its own:
+        // `CommittedState::merge_apply_inserts` also allocates a similar `Vec` to hold the data.
+        // So, if this really turns out to be problematic in practice, it should be fixed in both places.
+        let mut table_rows: Vec<ProductValue> = iter(&self.tx_state, &self.committed_state_write_lock, table_id)?
+            .map(|r| r.to_product_value())
+            .collect();
+
+        // Intentionally using `schema_for_table_raw` instead of `schema_for_table`.
+        //
+        // Rationale:
+        // - `schema_for_table_raw` queries system tables directly, ensuring the
+        //   most up-to-date schema.
+        // - `schema_for_table` relies on an in-memory cache, which may not reflect
+        //   the latest sequence updates and can therefore return stale schemas.
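+        //   (`test_table_schemas_consistency` in `datastore.rs` currently documents this mismatch for sequence allocations.)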
+        let original_table_schema = self.schema_for_table_raw(table_id)?;
+
+        log::debug!(
+            "ADDING TABLE COLUMN (incompatible layout): {}, table_id: {}",
+            original_table_schema.table_name,
+            table_id
+        );
+
+        // Store sequence values so they can be restored on the new table.
+        // Using a map from name to value, as the new sequence ids will be different,
+        // and it's not clear that we can rely on the order of sequences in the table schema.
+        let seq_values: HashMap<Box<str>, i128> = original_table_schema
+            .sequences
+            .iter()
+            .map(|s| {
+                (
+                    s.sequence_name.clone(),
+                    self.sequence_state_lock
+                        .get_sequence_mut(s.sequence_id)
+                        .expect("sequence must exist for this table")
+                        .value,
+                )
+            })
+            .collect();
+
+        // Drop the existing table first, due to the unique constraint on table names in `st_table`.
+        self.drop_table(table_id)?;
+
+        // Update the existing (dropped) table schema with the provided columns and reset its ids.
+        let mut updated_table_schema = original_table_schema;
+        updated_table_schema.columns = column_schemas;
+        updated_table_schema.reset();
+        let new_table_id = self.create_table_and_update_seq(updated_table_schema, seq_values)?;
+
+        // Extend each row with the default values for the new columns.
+        for product_value in table_rows.iter_mut() {
+            let mut row_elements = product_value.elements.to_vec();
+            row_elements.extend(default_values.iter().cloned());
+            product_value.elements = row_elements.into();
+        }
+
+        let (new_table, tx_blob_store) = self
+            .tx_state
+            .get_table_and_blob_store(new_table_id)
+            .ok_or(TableError::IdNotFoundState(new_table_id))?;
+
+        for row in table_rows {
+            new_table.insert(&self.committed_state_write_lock.page_pool, tx_blob_store, &row)?;
+        }
+
+        Ok(new_table_id)
+    }
+
+    /// Create a table from `table_schema` and restore the given sequence values, keyed by sequence name.
+    fn create_table_and_update_seq(
+        &mut self,
+        table_schema: TableSchema,
+        seq_values: HashMap<Box<str>, i128>,
+    ) -> Result<TableId> {
+        let table_id = self.create_table(table_schema.clone())?;
+        let table_schema = self.schema_for_table(table_id)?;
+
+        for seq in table_schema.sequences.iter() {
+            let new_seq = self
+                .sequence_state_lock
+                .get_sequence_mut(seq.sequence_id)
+                .expect("sequence just created");
+            new_seq.value = *seq_values
+                .get(&seq.sequence_name)
+                .ok_or_else(|| SequenceError::NotFound(seq.sequence_id))?;
+        }
+
+        Ok(table_id)
+    }
+
     /// Create an index.
     ///
     /// Requires:
diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs
index 5f1013eb379..d7a0978bc48 100644
--- a/crates/schema/src/schema.rs
+++ b/crates/schema/src/schema.rs
@@ -175,6 +175,20 @@ impl TableSchema {
         }
     }
 
+    /// Reset all the ids in this schema to sentinel values.
+    /// This is useful when cloning a schema to create a new table.
+    pub fn reset(&mut self) {
+        self.update_table_id(TableId::SENTINEL);
+        self.indexes.iter_mut().for_each(|i| i.index_id = IndexId::SENTINEL);
+        self.sequences
+            .iter_mut()
+            .for_each(|i| i.sequence_id = SequenceId::SENTINEL);
+        self.constraints
+            .iter_mut()
+            .for_each(|i| i.constraint_id = ConstraintId::SENTINEL);
+        self.row_type = columns_to_row_type(&self.columns);
+    }
+
     /// Convert a table schema into a list of columns.
     pub fn into_columns(self) -> Vec<ColumnSchema> {
         self.columns
diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs
index 8b907851d29..e88d135e3d0 100644
--- a/crates/table/src/table.rs
+++ b/crates/table/src/table.rs
@@ -300,6 +300,35 @@ pub enum ChangeColumnsErrorReason {
     IncompatibleRowLayout(#[from] IncompatibleTypeLayoutError),
 }
 
+/// Error that can occur when attempting [`Table::validate_add_columns_schema`].
+///
+/// Like [`ChangeColumnsError`], this should never normally be seen at runtime.
+/// Any such error indicates a **bug in SpacetimeDB**, since incompatible
+/// "add column" migrations should be caught earlier by the checks in
+/// [`spacetimedb_schema::auto_migrate`].
+#[derive(Error, Debug)]
+#[error("Cannot change the columns of table `{table_name}` with id {table_id} from `{old:?}` to `{new:?}`: {reason}")]
+pub struct AddColumnsError {
+    table_id: TableId,
+    table_name: Box<str>,
+    old: Vec<ColumnSchema>,
+    new: Vec<ColumnSchema>,
+    default_values: Vec<AlgebraicValue>,
+    reason: AddColumnsErrorReason,
+}
+
+#[derive(Error, Debug)]
+pub enum AddColumnsErrorReason {
+    #[error("Default value type does not match the column type for column {0}")]
+    DefaultValueTypeNotMatch(ColId),
+    #[error("Missing default value for column {0}")]
+    DefaultValueMissing(ColId),
+    #[error("New column schema is missing the existing columns")]
+    MissingExistingColumns,
+    #[error(transparent)]
+    ExistingColumnsTypeMismatched(#[from] ChangeColumnsErrorReason),
+}
+
 /// Computes the parts of a table definition, that are row type dependent, from the row type.
 fn table_row_type_dependents(row_type: ProductType) -> (RowTypeLayout, StaticLayoutInTable, VarLenVisitorProgram) {
     let row_layout: RowTypeLayout = row_type.into();
@@ -329,60 +358,109 @@ impl Table {
         &mut self,
         column_schemas: Vec<ColumnSchema>,
     ) -> Result<…, Box<ChangeColumnsError>> {
-        /// Validate that the old row type layout can be changed to the new.
-        // Intentionally fail fast rather than combining errors with [`spacetimedb_data_structures::error_stream`]
-        // because we've (at least theoretically) already passed through
-        // `spacetimedb_schema::auto_migrate::ensure_old_ty_upgradable_to_new` to get here,
-        // and that method has proper pretty error reporting with `ErrorStream`.
-        // The error here is for internal debugging.
-        fn validate(
-            this: &Table,
-            new_row_layout: &RowTypeLayout,
-            column_schemas: &[ColumnSchema],
-        ) -> Result<(), Box<ChangeColumnsError>> {
-            let schema = this.get_schema();
-            let row_layout = this.row_layout();
-
-            let make_err = |reason| {
-                Box::new(ChangeColumnsError {
-                    table_id: schema.table_id,
-                    table_name: schema.table_name.clone(),
-                    old: schema.columns().to_vec(),
-                    new: column_schemas.to_vec(),
-                    reason,
-                })
-            };
-
-            // Require that a scheduler table doesn't change the `id` and `at` fields.
-            if let Some((schedule, pk)) = schema.schedule.as_ref().zip(schema.pk()) {
-                let at_col = schedule.at_column.idx();
-                if row_layout[at_col] != new_row_layout[at_col] {
-                    return Err(make_err(ChangeColumnsErrorReason::ScheduleAtColumnChanged {
-                        index: at_col,
-                        old: row_layout[at_col].clone(),
-                        new: new_row_layout[at_col].clone(),
-                    }));
-                }
-
-                let id_col = pk.col_pos.idx();
-                if row_layout[id_col] != new_row_layout[id_col] {
-                    return Err(make_err(ChangeColumnsErrorReason::ScheduleIdColumnChanged {
-                        index: id_col,
-                        old: row_layout[id_col].clone(),
-                        new: new_row_layout[id_col].clone(),
-                    }));
-                }
-            }
-
-            // The `row_layout` must also be compatible with the new.
-            if let Err(reason) = row_layout.ensure_compatible_with(new_row_layout) {
-                return Err(make_err((*reason).into()));
-            }
-
-            Ok(())
-        }
-
-        unsafe { self.change_columns_to_unchecked(column_schemas, validate) }
+        unsafe { self.change_columns_to_unchecked(column_schemas, Self::validate_row_type_layout) }
+    }
+
+    /// Validate that the old row type layout can be changed to the new.
+    // Intentionally fail fast rather than combining errors with [`spacetimedb_data_structures::error_stream`]
+    // because we've (at least theoretically) already passed through
+    // `spacetimedb_schema::auto_migrate::ensure_old_ty_upgradable_to_new` to get here,
+    // and that method has proper pretty error reporting with `ErrorStream`.
+    // The error here is for internal debugging.
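+    // This check is shared: `change_columns_to` passes it to `change_columns_to_unchecked`,
+    // and `validate_add_columns_schema` below calls it for the prefix of existing columns.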
+    fn validate_row_type_layout(
+        &self,
+        new_row_layout: &RowTypeLayout,
+        column_schemas: &[ColumnSchema],
+    ) -> Result<(), Box<ChangeColumnsError>> {
+        let schema = self.get_schema();
+        let row_layout = self.row_layout();
+
+        let make_err = |reason| {
+            Box::new(ChangeColumnsError {
+                table_id: schema.table_id,
+                table_name: schema.table_name.clone(),
+                old: schema.columns().to_vec(),
+                new: column_schemas.to_vec(),
+                reason,
+            })
+        };
+
+        // Require that a scheduler table doesn't change the `id` and `at` fields.
+        if let Some((schedule, pk)) = schema.schedule.as_ref().zip(schema.pk()) {
+            let at_col = schedule.at_column.idx();
+            if row_layout[at_col] != new_row_layout[at_col] {
+                return Err(make_err(ChangeColumnsErrorReason::ScheduleAtColumnChanged {
+                    index: at_col,
+                    old: row_layout[at_col].clone(),
+                    new: new_row_layout[at_col].clone(),
+                }));
+            }
+
+            let id_col = pk.col_pos.idx();
+            if row_layout[id_col] != new_row_layout[id_col] {
+                return Err(make_err(ChangeColumnsErrorReason::ScheduleIdColumnChanged {
+                    index: id_col,
+                    old: row_layout[id_col].clone(),
+                    new: new_row_layout[id_col].clone(),
+                }));
+            }
+        }
+
+        // The `row_layout` must also be compatible with the new.
+        if let Err(reason) = row_layout.ensure_compatible_with(new_row_layout) {
+            return Err(make_err((*reason).into()));
+        }
+
+        Ok(())
+    }
+
+    /// Validates that the proposed `new_columns` schema is compatible with the
+    /// existing table schema and that all newly added columns are initialized
+    /// with default values.
+    ///
+    /// `new_columns`: the column schema after adding the new columns, ordered by `ColId`.
+    /// `default_values`: default values for the newly added columns, ordered by `ColId`.
+    pub fn validate_add_columns_schema(
+        &self,
+        new_columns: &[ColumnSchema],
+        default_values: &[AlgebraicValue],
+    ) -> Result<(), Box<AddColumnsError>> {
+        let schema = self.get_schema();
+        let existing_columns = &schema.columns;
+
+        let make_err = |reason| {
+            Box::new(AddColumnsError {
+                table_id: schema.table_id,
+                table_name: schema.table_name.clone(),
+                old: schema.columns().to_vec(),
+                new: new_columns.to_vec(),
+                default_values: default_values.to_vec(),
+                reason,
+            })
+        };
+
+        // Ensure the new schema has at least as many columns as the existing one;
+        // the existing columns must form a prefix.
+        let Some(old_cols_in_new_schema) = &new_columns.get(..existing_columns.len()) else {
+            return Err(make_err(AddColumnsErrorReason::MissingExistingColumns));
+        };
+
+        // Ensure the existing row layout is compatible with the prefix of the new schema.
+        let new_row_layout: RowTypeLayout = columns_to_row_type(old_cols_in_new_schema).into();
+        self.validate_row_type_layout(&new_row_layout, new_columns)
+            .map_err(|e| make_err(e.reason.into()))?;
+
+        // Validate that all new columns have default values and their types match.
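+        // `idx` enumerates only the newly added columns, so it indexes `default_values` directly.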
+        for (idx, new_col) in new_columns.iter().skip(existing_columns.len()).enumerate() {
+            let default_value = default_values
+                .get(idx)
+                .ok_or_else(|| make_err(AddColumnsErrorReason::DefaultValueMissing(new_col.col_pos)))?;
+            if Some(new_col.col_type.clone()) != default_value.type_of() {
+                return Err(make_err(AddColumnsErrorReason::DefaultValueTypeNotMatch(
+                    new_col.col_pos,
+                )));
+            }
+        }
+
+        Ok(())
     }
 
     /// Change the columns of `self` to those in `column_schemas`