Skip to content

Commit b7bb652

Browse files
committed
Add columns to table and test
1 parent b4bec5b commit b7bb652

File tree

7 files changed

+424
-52
lines changed

7 files changed

+424
-52
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use futures::channel::mpsc;
1010
use futures::StreamExt;
1111
use parking_lot::RwLock;
1212
use spacetimedb_commitlog as commitlog;
13+
use spacetimedb_data_structures::map::IntMap;
1314
use spacetimedb_datastore::db_metrics::DB_METRICS;
1415
use spacetimedb_datastore::error::{DatastoreError, TableError};
1516
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
@@ -1052,6 +1053,18 @@ impl RelationalDB {
10521053
Ok(self.inner.alter_table_row_type_mut_tx(tx, table_id, column_schemas)?)
10531054
}
10541055

1056+
pub(crate) fn add_columns_to_table(
1057+
&self,
1058+
tx: &mut MutTx,
1059+
table_id: TableId,
1060+
column_schemas: Vec<ColumnSchema>,
1061+
default_values: IntMap<ColId, AlgebraicValue>,
1062+
) -> Result<TableId, DBError> {
1063+
Ok(self
1064+
.inner
1065+
.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?)
1066+
}
1067+
10551068
/// Reports the `TxMetrics`s passed.
10561069
///
10571070
/// Should only be called after the tx lock has been fully released.

crates/core/src/db/update.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use super::relational_db::RelationalDB;
22
use crate::database_logger::SystemLogger;
33
use crate::sql::parser::RowLevelExpr;
4-
use spacetimedb_data_structures::map::HashMap;
4+
use spacetimedb_data_structures::map::{HashMap, IntMap};
55
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
66
use spacetimedb_lib::db::auth::StTableType;
77
use spacetimedb_lib::identity::AuthCtx;
88
use spacetimedb_lib::AlgebraicValue;
9-
use spacetimedb_primitives::{ColSet, TableId};
9+
use spacetimedb_primitives::{ColId, ColSet, TableId};
1010
use spacetimedb_schema::auto_migrate::{AutoMigratePlan, ManualMigratePlan, MigratePlan};
1111
use spacetimedb_schema::def::TableDef;
1212
use spacetimedb_schema::schema::{column_schemas_from_defs, IndexSchema, Schema, SequenceSchema, TableSchema};
@@ -252,6 +252,18 @@ fn auto_migrate_database(
252252
log!(logger, "Removing-row level security `{sql_rls}`");
253253
stdb.drop_row_level_security(tx, sql_rls.clone())?;
254254
}
255+
spacetimedb_schema::auto_migrate::AutoMigrateStep::AddColumns(table_name) => {
256+
let table_def = plan.new.stored_in_table_def(table_name).expect("table must exist");
257+
let table_id = stdb.table_id_from_name_mut(tx, table_name).unwrap().unwrap();
258+
let column_schemas = column_schemas_from_defs(plan.new, &table_def.columns, table_id);
259+
260+
let default_values: IntMap<ColId, AlgebraicValue> = table_def
261+
.columns
262+
.iter()
263+
.filter_map(|col_def| col_def.default_value.as_ref().map(|v| (col_def.col_id, v.clone())))
264+
.collect();
265+
stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?;
266+
}
255267
_ => anyhow::bail!("migration step not implemented: {step:?}"),
256268
}
257269
}

crates/datastore/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ pub enum TableError {
8181
#[error(transparent)]
8282
// Error here is `Box`ed to avoid triggering https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err .
8383
ChangeColumnsError(#[from] Box<table::ChangeColumnsError>),
84+
#[error(transparent)]
85+
AddColumnsError(#[from] Box<table::AddColumnsError>),
8486
}
8587

8688
#[derive(Error, Debug, PartialEq, Eq)]

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 145 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ use anyhow::{anyhow, Context};
2727
use core::{cell::RefCell, ops::RangeBounds};
2828
use parking_lot::{Mutex, RwLock};
2929
use spacetimedb_commitlog::payload::{txdata, Txdata};
30-
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
30+
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap, IntMap};
3131
use spacetimedb_durability::TxOffset;
3232
use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics};
3333
use spacetimedb_lib::{ConnectionId, Identity};
3434
use spacetimedb_paths::server::SnapshotDirPath;
35-
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
35+
use spacetimedb_primitives::{ColId, ColList, ConstraintId, IndexId, SequenceId, TableId};
3636
use spacetimedb_sats::memory_usage::MemoryUsage;
3737
use spacetimedb_sats::{bsatn, buffer::BufReader, AlgebraicValue, ProductValue};
3838
use spacetimedb_schema::schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema};
@@ -313,6 +313,16 @@ impl Locking {
313313
) -> Result<()> {
314314
tx.alter_table_row_type(table_id, column_schemas)
315315
}
316+
317+
pub fn add_columns_to_table_mut_tx(
318+
&self,
319+
tx: &mut MutTxId,
320+
table_id: TableId,
321+
column_schemas: Vec<ColumnSchema>,
322+
defaults: IntMap<ColId, AlgebraicValue>,
323+
) -> Result<TableId> {
324+
tx.add_columns_to_table(table_id, column_schemas, defaults)
325+
}
316326
}
317327

318328
impl DataRow for Locking {
@@ -1159,6 +1169,8 @@ mod tests {
11591169
use core::{fmt, mem};
11601170
use itertools::Itertools;
11611171
use pretty_assertions::{assert_eq, assert_matches};
1172+
use spacetimedb_data_structures::map::IntMap;
1173+
use spacetimedb_execution::dml::MutDatastore as _;
11621174
use spacetimedb_execution::Datastore;
11631175
use spacetimedb_lib::db::auth::{StAccess, StTableType};
11641176
use spacetimedb_lib::error::ResultTest;
@@ -3320,8 +3332,138 @@ mod tests {
33203332

33213333
Ok(())
33223334
}
3323-
33243335
// TODO: Add the following tests
33253336
// - Create a tx that inserts 2000 rows with an auto_inc column
33263337
// - Create a tx that inserts 2000 rows with an auto_inc column and then rolls back
3338+
3339+
#[test]
3340+
fn test_add_columns_to_table() -> ResultTest<()> {
3341+
let datastore = get_datastore()?;
3342+
3343+
let mut tx = begin_mut_tx(&datastore);
3344+
3345+
let initial_sum_type = AlgebraicType::sum([("ba", AlgebraicType::U16)]);
3346+
let initial_columns = [
3347+
ColumnSchema::for_test(0, "a", AlgebraicType::U64),
3348+
ColumnSchema::for_test(1, "b", initial_sum_type.clone()),
3349+
];
3350+
3351+
let initial_indices = [
3352+
IndexSchema::for_test("index_a", BTreeAlgorithm::from(0)),
3353+
IndexSchema::for_test("index_b", BTreeAlgorithm::from(1)),
3354+
];
3355+
3356+
let sequence = SequenceRow {
3357+
id: SequenceId::SENTINEL.into(),
3358+
table: 0,
3359+
col_pos: 0,
3360+
name: "Foo_id_seq",
3361+
start: 5,
3362+
};
3363+
3364+
let schema = user_public_table(
3365+
initial_columns,
3366+
initial_indices.clone(),
3367+
[],
3368+
map_array([sequence]),
3369+
None,
3370+
None,
3371+
);
3372+
3373+
let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
3374+
3375+
let columns_original = tx.get_schema(table_id).unwrap().columns.to_vec();
3376+
3377+
// Insert initial rows
3378+
let initial_row = ProductValue {
3379+
elements: [AlgebraicValue::U64(0), AlgebraicValue::sum(0, AlgebraicValue::U16(1))].into(),
3380+
};
3381+
tx.insert_product_value(table_id, &initial_row).unwrap();
3382+
tx.insert_product_value(table_id, &initial_row).unwrap();
3383+
3384+
commit(&datastore, tx)?;
3385+
3386+
// Alter Table: Add Variant and Column
3387+
let mut new_columns = columns_original.clone();
3388+
new_columns.push(ColumnSchema::for_test(2, "c", AlgebraicType::U8));
3389+
let defaults = IntMap::from_iter([(2.into(), AlgebraicValue::U8(42))]);
3390+
3391+
let mut tx = begin_mut_tx(&datastore);
3392+
let new_table_id = datastore.add_columns_to_table_mut_tx(&mut tx, table_id, new_columns.clone(), defaults)?;
3393+
let tx_data = commit(&datastore, tx)?;
3394+
3395+
assert_ne!(
3396+
new_table_id, table_id,
3397+
"New table ID after migration should differ from old one"
3398+
);
3399+
3400+
// Validate Commitlog Changes
3401+
let (_, deletes) = tx_data
3402+
.deletes()
3403+
.find(|(id, _)| **id == table_id)
3404+
.expect("Expected delete log for original table");
3405+
3406+
let deleted_rows = [
3407+
product![AlgebraicValue::U64(5), AlgebraicValue::sum(0, AlgebraicValue::U16(1))],
3408+
product![AlgebraicValue::U64(6), AlgebraicValue::sum(0, AlgebraicValue::U16(1))],
3409+
];
3410+
3411+
assert_eq!(
3412+
&**deletes, &deleted_rows,
3413+
"Unexpected delete entries after altering the table"
3414+
);
3415+
3416+
let inserted_rows = [
3417+
product![
3418+
AlgebraicValue::U64(5),
3419+
AlgebraicValue::sum(0, AlgebraicValue::U16(1)),
3420+
AlgebraicValue::U8(42)
3421+
],
3422+
product![
3423+
AlgebraicValue::U64(6),
3424+
AlgebraicValue::sum(0, AlgebraicValue::U16(1)),
3425+
AlgebraicValue::U8(42)
3426+
],
3427+
];
3428+
3429+
let (_, inserts) = tx_data
3430+
.inserts()
3431+
.find(|(id, _)| **id == new_table_id)
3432+
.expect("Expected insert log for new table");
3433+
3434+
assert_eq!(
3435+
&**inserts, &inserted_rows,
3436+
"Unexpected insert entries after altering the table"
3437+
);
3438+
3439+
// Insert Rows into Altered Table
3440+
let mut tx = begin_mut_tx(&datastore);
3441+
3442+
let new_row = ProductValue {
3443+
elements: [
3444+
AlgebraicValue::U64(0),
3445+
AlgebraicValue::sum(0, AlgebraicValue::U16(1)),
3446+
AlgebraicValue::U8(0),
3447+
]
3448+
.into(),
3449+
};
3450+
3451+
tx.insert_product_value(new_table_id, &new_row).unwrap();
3452+
commit(&datastore, tx)?;
3453+
3454+
// test for auto_inc feields
3455+
let tx = begin_mut_tx(&datastore);
3456+
let rows = tx.table_scan(new_table_id).unwrap().map(|row| row.to_product_value());
3457+
3458+
let mut last_row_auto_inc = 0;
3459+
for row in rows {
3460+
let auto_inc_col = row.get_field(0, None)?;
3461+
if let AlgebraicValue::U64(val) = auto_inc_col {
3462+
assert!(val > &last_row_auto_inc, "Auto-increment value did not increase");
3463+
last_row_auto_inc = *val;
3464+
}
3465+
}
3466+
3467+
Ok(())
3468+
}
33273469
}

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use super::{
88
tx_state::{IndexIdMap, PendingSchemaChange, TxState, TxTableForInsertion},
99
SharedMutexGuard, SharedWriteGuard,
1010
};
11-
use crate::execution_context::ExecutionContext;
1211
use crate::execution_context::Workload;
1312
use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags};
13+
use crate::{error::DatastoreError, execution_context::ExecutionContext};
1414
use crate::{
1515
error::{IndexError, SequenceError, TableError},
1616
system_tables::{
@@ -24,7 +24,9 @@ use crate::{
2424
use core::ops::RangeBounds;
2525
use core::{cell::RefCell, mem};
2626
use core::{iter, ops::Bound};
27+
use itertools::Itertools;
2728
use smallvec::SmallVec;
29+
use spacetimedb_data_structures::map::IntMap;
2830
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
2931
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics};
3032
use spacetimedb_lib::{
@@ -493,6 +495,115 @@ impl MutTxId {
493495
Ok(())
494496
}
495497

498+
/// Change the row type of the table identified by `table_id`.
499+
///
500+
/// This is an incompatible change that requires a new table to be created.
501+
/// The existing rows are copied over to the new table,
502+
/// with `default_values` appended to each row.
503+
///
504+
/// `column_schemas` must contain all the old columns in same order as before,
505+
/// followed by the new columns to be added.
506+
/// All new columns must have a default value in `default_values`.
507+
///
508+
/// After calling this method, Table referred by `table_id` is dropped,
509+
/// and a new table is created with the same name but a new `table_id`.
510+
/// The new `table_id` is returned.
511+
pub(crate) fn add_columns_to_table(
512+
&mut self,
513+
table_id: TableId,
514+
column_schemas: Vec<ColumnSchema>,
515+
default_values: IntMap<ColId, AlgebraicValue>,
516+
) -> Result<TableId> {
517+
let ((tx_table, ..), _) = self.get_or_create_insert_table_mut(table_id)?;
518+
tx_table
519+
.validate_add_columns_schema(&column_schemas, &default_values)
520+
.map_err(TableError::from)?;
521+
522+
// Intentionally using `schema_for_table_raw` instead of `schema_for_table`.
523+
//
524+
// Rationale:
525+
// - `schema_for_table_raw` queries system tables directly, ensuring the
526+
// most up-to-date schema.
527+
// - `schema_for_table` relies on an in-memory cache, which may not reflect
528+
// the latest sequence updates and can therefore return stale schemas.
529+
let original_table_schema = self.schema_for_table_raw(table_id)?;
530+
531+
// Default values must be for a sequence of contiguous column IDs,
532+
// beginning immediately after the last column ID in the existing schema.
533+
// This has been checked inside `Table::validate_add_columns_schema`
534+
let default_values: Vec<AlgebraicValue> = default_values
535+
.into_iter()
536+
.sorted_by_key(|(col_id, _)| *col_id)
537+
.map(|(_, val)| val)
538+
.collect();
539+
540+
log::debug!(
541+
"TABLE ALTERING (incompatible layout): {}, table_id: {}",
542+
original_table_schema.table_name,
543+
table_id
544+
);
545+
546+
// Drop existing table first due to unique constraints on table name in `st_table`
547+
self.drop_table(table_id)?;
548+
let original_commit_table = match self.tx_state.pending_schema_changes.pop() {
549+
Some(PendingSchemaChange::TableRemoved(tid, table)) if tid == table_id => table,
550+
_ => {
551+
return Err(DatastoreError::Other(anyhow::anyhow!(
552+
"Expected last pending schema change to be dropping table {table_id}"
553+
)))
554+
}
555+
};
556+
557+
// Update existing (dropped) table schema with provided columns, reset Ids.
558+
let mut updated_table_schema = original_table_schema;
559+
updated_table_schema.columns = column_schemas;
560+
updated_table_schema.reset();
561+
let new_table_id = self.create_table_and_bump_seq(updated_table_schema)?;
562+
563+
// Populate rows with default values for new columns
564+
let committed_blob_store = &self.committed_state_write_lock.blob_store;
565+
let migrated_rows = original_commit_table
566+
.scan_rows(committed_blob_store)
567+
.map(|table_row| table_row.to_product_value())
568+
.map(|mut product_value| {
569+
let mut row_elements = product_value.elements.to_vec();
570+
row_elements.extend(default_values.iter().cloned());
571+
product_value.elements = row_elements.into();
572+
product_value
573+
});
574+
575+
let (new_table, tx_blob_store) = self
576+
.tx_state
577+
.get_table_and_blob_store(new_table_id)
578+
.ok_or(TableError::IdNotFoundState(new_table_id))?;
579+
580+
for migrated_row in migrated_rows {
581+
new_table.insert(&self.committed_state_write_lock.page_pool, tx_blob_store, &migrated_row)?;
582+
}
583+
584+
self.push_schema_change(PendingSchemaChange::TableRemoved(table_id, original_commit_table));
585+
self.push_schema_change(PendingSchemaChange::TableAdded(new_table_id));
586+
587+
Ok(new_table_id)
588+
}
589+
590+
fn create_table_and_bump_seq(&mut self, table_schema: TableSchema) -> Result<TableId> {
591+
let table_id = self.create_table(table_schema.clone())?;
592+
let table_schema = self.schema_for_table(table_id)?;
593+
594+
// Bump each sequence value so that the next generated value requires
595+
// a fresh allocation.
596+
for seq_id in table_schema.sequences.iter().map(|s| s.sequence_id) {
597+
let seq = self
598+
.sequence_state_lock
599+
.get_sequence_mut(seq_id)
600+
.expect("sequence just created");
601+
seq.value = seq.allocated() + 1;
602+
}
603+
604+
Ok(table_id)
605+
}
606+
496607
/// Create an index.
497608
///
498609
/// Requires:

0 commit comments

Comments
 (0)