diff --git a/Cargo.lock b/Cargo.lock index c2d03fb6b56..a8c65fe12c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7517,6 +7517,7 @@ version = "1.7.0" dependencies = [ "anyhow", "bigdecimal", + "bytes", "derive_more 0.99.20", "ethnum", "pretty_assertions", diff --git a/crates/client-api-messages/src/energy.rs b/crates/client-api-messages/src/energy.rs index fe58aa054d5..7109d9c1e85 100644 --- a/crates/client-api-messages/src/energy.rs +++ b/crates/client-api-messages/src/energy.rs @@ -121,21 +121,21 @@ impl fmt::Debug for EnergyBalance { } } -/// A measure of energy representing the energy budget for a reducer. +/// A measure of energy representing the energy budget for a reducer or any callable function. /// /// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling /// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime: /// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`) #[derive(Copy, Clone, From, Add, Sub)] -pub struct ReducerBudget(u64); +pub struct FunctionBudget(u64); -impl ReducerBudget { +impl FunctionBudget { // 1 second of wasm runtime is roughly 2 TeV, so this is // roughly 1 minute of wasm runtime - pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000); + pub const DEFAULT_BUDGET: Self = FunctionBudget(120_000_000_000_000); - pub const ZERO: Self = ReducerBudget(0); - pub const MAX: Self = ReducerBudget(u64::MAX); + pub const ZERO: Self = FunctionBudget(0); + pub const MAX: Self = FunctionBudget(u64::MAX); pub fn new(v: u64) -> Self { Self(v) @@ -151,13 +151,13 @@ impl ReducerBudget { } } -impl From for EnergyQuanta { - fn from(value: ReducerBudget) -> Self { +impl From for EnergyQuanta { + fn from(value: FunctionBudget) -> Self { EnergyQuanta::new(value.0.into()) } } -impl fmt::Debug for ReducerBudget { +impl fmt::Debug for FunctionBudget { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("ReducerBudget") .field(&EnergyQuanta::from(*self)) diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index fd426bb154c..32ec5668434 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -78,57 +78,52 @@ impl Host { let (tx_offset, durable_offset, json) = self .host_controller - .using_database( - database, - self.replica_id, - move |db| -> axum::response::Result<_, (StatusCode, String)> { - tracing::info!(sql = body); - - // We need a header for query results - let mut header = vec![]; - - let sql_start = std::time::Instant::now(); - let sql_span = - tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered(); - - let result = sql::execute::run( - // Returns an empty result set for mutations - db, - &body, - auth, - Some(&module_host.info().subscriptions), - &mut header, - ) - .map_err(|e| { - log::warn!("{e}"); - if let Some(auth_err) = e.get_auth_error() { - (StatusCode::UNAUTHORIZED, auth_err.to_string()) - } else { - (StatusCode::BAD_REQUEST, e.to_string()) - } - })?; - - let total_duration = sql_start.elapsed(); - sql_span.record("total_duration", tracing::field::debug(total_duration)); - - // Turn the header into a `ProductType` - let schema = header - .into_iter() - .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name))) - .collect(); - - Ok(( - result.tx_offset, - db.durable_tx_offset(), - vec![SqlStmtResult { - schema, - rows: result.rows, - total_duration_micros: total_duration.as_micros() as u64, - stats: SqlStmtStats::from_metrics(&result.metrics), - }], - )) - }, - ) + .using_database(database, self.replica_id, move |db| async move { + tracing::info!(sql = body); + let mut header = vec![]; + let sql_start = std::time::Instant::now(); + let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,); + let _guard = sql_span.enter(); + + let result = sql::execute::run( + &db, + &body, + auth, + Some(&module_host.info.subscriptions), + Some(&module_host), + auth.caller, + &mut header, + ) + .await + .map_err(|e| { + log::warn!("{e}"); + if let Some(auth_err) = e.get_auth_error() { + (StatusCode::UNAUTHORIZED, auth_err.to_string()) + } else { + (StatusCode::BAD_REQUEST, e.to_string()) + } + })?; + + let total_duration = sql_start.elapsed(); + drop(_guard); + sql_span.record("total_duration", tracing::field::debug(total_duration)); + + let schema = header + .into_iter() + .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name))) + .collect(); + + Ok::<_, (StatusCode, String)>(( + result.tx_offset, + db.durable_tx_offset(), + vec![SqlStmtResult { + schema, + rows: result.rows, + total_duration_micros: total_duration.as_micros() as u64, + stats: SqlStmtStats::from_metrics(&result.metrics), + }], + )) + }) .await .map_err(log_and_500)??; @@ -154,7 +149,6 @@ impl Host { .await } } - /// Parameters for publishing a database. /// /// See [`ControlStateDelegate::publish_database`]. diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 2010b0cd93d..5ac4f781dcd 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,17 +1,20 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, DatabaseError, RestoreSnapshotError}; +use crate::host::ArgsTuple; use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; use crate::util::{asyncify, spawn_rayon}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; +use bytes::Bytes; use enum_map::EnumMap; use fs2::FileExt; +use log::trace; use spacetimedb_commitlog as commitlog; use spacetimedb_commitlog::repo::OnNewSegmentFn; use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::db_metrics::DB_METRICS; -use spacetimedb_datastore::error::{DatastoreError, TableError}; +use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ @@ -32,16 +35,18 @@ use spacetimedb_datastore::{ traits::TxData, }; use spacetimedb_durability as durability; +use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; +use spacetimedb_lib::de::DeserializeSeed as _; use spacetimedb_lib::st_var::StVarValue; -use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; +use spacetimedb_lib::{bsatn, ConnectionId}; use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type; use spacetimedb_sats::memory_usage::MemoryUsage; -use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; +use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace}; use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{ ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema, @@ -1073,11 +1078,11 @@ impl RelationalDB { tx: &mut MutTx, module_def: &ModuleDef, view_def: &ViewDef, - ) -> Result<(ViewId, TableId), DBError> { + ) -> Result<(ViewDatabaseId, TableId), DBError> { Ok(tx.create_view(module_def, view_def)?) } - pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewId) -> Result<(), DBError> { + pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewDatabaseId) -> Result<(), DBError> { Ok(tx.drop_view(view_id)?) } @@ -1168,7 +1173,7 @@ impl RelationalDB { Ok(self.inner.rename_table_mut_tx(tx, table_id, new_name)?) } - pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result, DBError> { + pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result, DBError> { Ok(self.inner.view_id_from_name_mut_tx(tx, view_name)?) } @@ -1503,8 +1508,99 @@ impl RelationalDB { .into() }) } -} + /// Materialize View backing table. + /// + /// # Process + /// 1. Serializes view arguments into `ST_VIEW_ARG_ID` + /// 2. Deletes stale rows matching the view arguments + /// 3. Deserializes the new view execution results + /// 4. Inserts fresh rows with the corresponding arg_id + /// + /// # Arguments + /// * `tx` - Mutable transaction context + /// * `view` - Name of the view to update + /// * `args` - Arguments passed to the view call + /// * `return_type` - Expected return type of the view + /// * `bytes` - Serialized (bsatn encoded) return value from view execution + /// * `typespace` - Type information for deserialization + /// * `caller_identity` - Identity of the caller (for non-anonymous views) + #[allow(clippy::too_many_arguments)] + pub fn materialize_view( + &self, + tx: &mut MutTxId, + view: &str, + args: ArgsTuple, + return_type: AlgebraicTypeRef, + bytes: Bytes, + typespace: &Typespace, + caller_identity: Identity, + ) -> Result<(), DBError> { + // Fetch view metadata + let st_view_row = tx.lookup_st_view_by_name(view)?; + let table_id = st_view_row.table_id.expect("View table must exist for materialization"); + let view_id = st_view_row.view_id; + let is_anonymous = st_view_row.is_anonymous; + let arg_id = tx.get_or_insert_st_view_arg(args.get_bsatn())?; + + // Build the filter key for identifying rows to update + let mut input_args = Vec::new(); + if !is_anonymous { + input_args.push(AlgebraicValue::OptionSome(caller_identity.into())); + } + if !tx.is_view_parameterized(view_id)? { + input_args.push(AlgebraicValue::U64(arg_id)); + } + let input_args = ProductValue { + elements: input_args.into_boxed_slice(), + }; + let col_list: ColList = (0..input_args.elements.len()).collect(); + + // Remove stale View entries + let rows_to_delete: Vec<_> = self + .iter_by_col_eq_mut(tx, table_id, col_list, &input_args.clone().into())? + .map(|res| res.pointer()) + .collect(); + + let deleted_count = self.delete(tx, table_id, rows_to_delete); + trace!("Deleted {deleted_count} stale rows from view table {table_id} for arg_id {arg_id}"); + + // Deserialize the return value + let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type).resolve(return_type); + let return_val = seed + .deserialize(bsatn::Deserializer::new(&mut &bytes[..])) + .map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?; + + // Extract products from return value (must be array) + let products: Vec = return_val + .into_array() + .expect("Expected return_val to be an array") + .into_iter() + .map(|v| v.into_product().expect("Expected array elements to be ProductValue")) + .collect(); + + // Insert fresh results into the view table + let mut elements: Vec = + Vec::with_capacity(input_args.elements.len() + products.first().map_or(0, |p| p.elements.len())); + for product in products { + elements.clear(); + elements.extend_from_slice(&input_args.elements); + elements.extend_from_slice(&product.elements); + + let row = ProductValue { + elements: elements.as_slice().into(), + }; + + let row_bytes = row + .to_bsatn_vec() + .map_err(|_| DatastoreError::from(ViewError::SerializeRow))?; + + self.insert(tx, table_id, &row_bytes)?; + } + + Ok(()) + } +} #[allow(unused)] #[derive(Clone)] struct LockFile { @@ -2099,7 +2195,7 @@ pub mod tests_utils { name: &str, schema: &[(&str, AlgebraicType)], is_anonymous: bool, - ) -> Result { + ) -> Result<(ViewDatabaseId, TableId), DBError> { let mut builder = RawModuleDefV9Builder::new(); // Add the view's product type to the typespace @@ -2123,7 +2219,6 @@ pub mod tests_utils { // Allocate a backing table and return its table id db.with_auto_commit(Workload::Internal, |tx| db.create_view(tx, &module_def, view_def)) - .map(|(_, table_id)| table_id) } /// Insert a row into a view's backing table @@ -2218,7 +2313,7 @@ mod tests { use super::tests_utils::begin_mut_tx; use super::*; use crate::db::relational_db::tests_utils::{ - begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB, + begin_tx, create_view_for_test, insert, make_snapshot, with_auto_commit, with_read_only, TestDB, }; use anyhow::bail; use bytes::Bytes; @@ -2229,6 +2324,7 @@ mod tests { use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::error::{DatastoreError, IndexError}; use spacetimedb_datastore::execution_context::ReducerContext; + use spacetimedb_datastore::locking_tx_datastore::ViewCall; use spacetimedb_datastore::system_tables::{ system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, @@ -2861,6 +2957,57 @@ mod tests { Ok(()) } + #[test] + fn test_is_materialized() -> anyhow::Result<()> { + let stdb = TestDB::in_memory()?; + let schema = [("col1", AlgebraicType::I64), ("col2", AlgebraicType::I64)]; + let table_schema = table("MyTable", ProductType::from(schema), |b| b); + + let view_schema = [("view_col", AlgebraicType::I64)]; + let view_name = "MyView"; + let args: Bytes = vec![].into(); + let sender = Identity::ZERO; + let (view_id, _) = create_view_for_test(&stdb, view_name, &view_schema, true)?; + + let mut tx = begin_mut_tx(&stdb); + let table_id = stdb.create_table(&mut tx, table_schema)?; + + assert!( + !tx.is_materialized(view_name, args.clone(), sender)?.0, + "view should not be materialized as read set is not recorded yet" + ); + + let view_call = Some(ViewCall::anonymous(view_id, args)); + tx.record_table_scan(view_call, table_id); + assert!( + tx.is_materialized(view_name, vec![].into(), sender)?.0, + "view should be materialized as read set is recorded" + ); + stdb.commit_tx(tx)?; + + let tx = begin_mut_tx(&stdb); + assert!( + tx.is_materialized(view_name, vec![].into(), sender)?.0, + "view should be materialized after commit" + ); + stdb.commit_tx(tx)?; + + let mut tx = begin_mut_tx(&stdb); + stdb.insert( + &mut tx, + table_id, + &product![AlgebraicValue::I64(1), AlgebraicValue::I64(2)].to_bsatn_vec()?, + )?; + stdb.commit_tx(tx)?; + + let tx = begin_mut_tx(&stdb); + assert!( + !tx.is_materialized(view_name, vec![].into(), sender)?.0, + "view should not be materialized after table modification" + ); + Ok(()) + } + #[test] /// Test that iteration yields each row only once /// in the edge case where a row is committed and has been deleted and re-inserted within the iterating TX. diff --git a/crates/core/src/energy.rs b/crates/core/src/energy.rs index aaf249be9da..c66378e07a5 100644 --- a/crates/core/src/energy.rs +++ b/crates/core/src/energy.rs @@ -5,18 +5,18 @@ use spacetimedb_lib::{Hash, Identity}; use crate::messages::control_db::Database; pub use spacetimedb_client_api_messages::energy::*; -pub struct ReducerFingerprint<'a> { +pub struct FunctionFingerprint<'a> { pub module_hash: Hash, pub module_identity: Identity, pub caller_identity: Identity, - pub reducer_name: &'a str, + pub function_name: &'a str, } pub trait EnergyMonitor: Send + Sync + 'static { - fn reducer_budget(&self, fingerprint: &ReducerFingerprint<'_>) -> ReducerBudget; + fn reducer_budget(&self, fingerprint: &FunctionFingerprint<'_>) -> FunctionBudget; fn record_reducer( &self, - fingerprint: &ReducerFingerprint<'_>, + fingerprint: &FunctionFingerprint<'_>, energy_used: EnergyQuanta, execution_duration: Duration, ); @@ -29,13 +29,13 @@ pub trait EnergyMonitor: Send + Sync + 'static { pub struct NullEnergyMonitor; impl EnergyMonitor for NullEnergyMonitor { - fn reducer_budget(&self, _fingerprint: &ReducerFingerprint<'_>) -> ReducerBudget { - ReducerBudget::DEFAULT_BUDGET + fn reducer_budget(&self, _fingerprint: &FunctionFingerprint<'_>) -> FunctionBudget { + FunctionBudget::DEFAULT_BUDGET } fn record_reducer( &self, - _fingerprint: &ReducerFingerprint<'_>, + _fingerprint: &FunctionFingerprint<'_>, _energy_used: EnergyQuanta, _execution_duration: Duration, ) { diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index dfecd0dbd3a..dab36e8751a 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -170,6 +170,22 @@ impl From<&EventStatus> for ReducerOutcome { } } +pub enum ViewOutcome { + Success, + Failed(String), + BudgetExceeded, +} + +impl From for ViewOutcome { + fn from(status: EventStatus) -> Self { + match status { + EventStatus::Committed(_) => ViewOutcome::Success, + EventStatus::Failed(e) => ViewOutcome::Failed(e), + EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded, + } + } +} + #[derive(Clone, Debug)] pub struct ProcedureCallResult { pub return_val: AlgebraicValue, @@ -326,9 +342,10 @@ impl HostController { /// If the computation `F` panics, the host is removed from this controller, /// releasing its resources. #[tracing::instrument(level = "trace", skip_all)] - pub async fn using_database(&self, database: Database, replica_id: u64, f: F) -> anyhow::Result + pub async fn using_database(&self, database: Database, replica_id: u64, f: F) -> anyhow::Result where - F: FnOnce(&RelationalDB) -> T + Send + 'static, + F: FnOnce(Arc) -> Fut + Send + 'static, + Fut: std::future::Future + Send + 'static, T: Send + 'static, { trace!("using database {}/{}", database.database_identity, replica_id); @@ -340,10 +357,9 @@ impl HostController { }); let db = module.replica_ctx().relational_db.clone(); - let result = module.on_module_thread("using_database", move || f(&db)).await?; + let result = module.on_module_thread_async("using_database", move || f(db)).await?; Ok(result) } - /// Update the [`ModuleHost`] identified by `replica_id` to the given /// program. /// @@ -869,8 +885,10 @@ impl Host { scheduler_starter.start(&module_host)?; let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); + let module = watch::Sender::new(module_host); + replica_ctx.subscriptions.init(module.subscribe()); Ok(Host { - module: watch::Sender::new(module_host), + module, replica_ctx, scheduler, disk_metrics_recorder_task, diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 76b7c6d8882..d43b4b8e739 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -9,9 +9,9 @@ use core::mem; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; -use spacetimedb_datastore::locking_tx_datastore::MutTxId; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCall}; use spacetimedb_lib::{ConnectionId, Identity, Timestamp}; -use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewId}; +use spacetimedb_primitives::{ColId, ColList, IndexId, TableId}; use spacetimedb_sats::{ bsatn::{self, ToBsatn}, buffer::{CountWriter, TeeWriter}, @@ -31,7 +31,7 @@ pub struct InstanceEnv { pub tx: TxSlot, /// The timestamp the current reducer began running. pub start_time: Timestamp, - pub view_id: Option, + pub view: Option, } #[derive(Clone, Default)] @@ -173,7 +173,7 @@ impl InstanceEnv { scheduler, tx: TxSlot::default(), start_time: Timestamp::now(), - view_id: None, + view: None, } } @@ -182,15 +182,16 @@ impl InstanceEnv { &self.replica_ctx.database.database_identity } - /// Signal to this `InstanceEnv` that a reducer or procedure call is beginning. + /// Signal to this `InstanceEnv` that a reducer, procedure call is beginning. pub fn start_funcall(&mut self, ts: Timestamp) { self.start_time = ts; - self.view_id = None; + self.view = None; } - /// Signal to this `InstanceEnv` that a we're going to execute a view and compute its read set. - pub fn start_view(&mut self, view_id: ViewId) { - self.view_id = Some(view_id); + /// Signal to this `InstanceEnv` that a view is starting. + pub fn start_view(&mut self, ts: Timestamp, view: ViewCall) { + self.start_time = ts; + self.view = Some(view); } fn get_tx(&self) -> Result + '_, GetTxError> { @@ -471,7 +472,7 @@ impl InstanceEnv { stdb.table_row_count_mut(tx, table_id) .ok_or(NodesError::TableNotFound) .inspect(|_| { - tx.record_table_scan(self.view_id, table_id); + tx.record_table_scan(self.view.clone(), table_id); }) } @@ -496,7 +497,7 @@ impl InstanceEnv { &mut bytes_scanned, ); - tx.record_table_scan(self.view_id, table_id); + tx.record_table_scan(self.view.clone(), table_id); tx.metrics.rows_scanned += rows_scanned; tx.metrics.bytes_scanned += bytes_scanned; @@ -527,7 +528,7 @@ impl InstanceEnv { // Scan the index and serialize rows to bsatn let chunks = ChunkedWriter::collect_iter(pool, iter, &mut rows_scanned, &mut bytes_scanned); - tx.record_index_scan(self.view_id, table_id, index_id, lower, upper); + tx.record_index_scan(self.view.clone(), table_id, index_id, lower, upper); tx.metrics.index_seeks += 1; tx.metrics.rows_scanned += rows_scanned; @@ -664,7 +665,7 @@ mod test { scheduler, tx: TxSlot::default(), start_time: Timestamp::now(), - view_id: None, + view: None, }, runtime, )) diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 8cbcab7b926..d4f321720c3 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -132,6 +132,15 @@ pub struct InvalidProcedureArguments( InvalidFunctionArguments, ); +/// Newtype over [`InvalidFunctionArguments`] which renders with the word "view". +#[derive(thiserror::Error, Debug)] +#[error("invalid arguments for view {}: {}", .0.function_name, .0.err)] +pub struct InvalidViewArguments( + #[from] + #[source] + InvalidFunctionArguments, +); + fn from_json_seed<'de, T: serde::de::DeserializeSeed<'de>>(s: &'de str, seed: T) -> anyhow::Result { let mut de = serde_json::Deserializer::from_str(s); let mut track = serde_path_to_error::Track::new(); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 21304cde740..fc090ddcfec 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -10,7 +10,8 @@ use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; -use crate::host::InvalidFunctionArguments; +use crate::host::host_controller::ViewOutcome; +use crate::host::{InvalidFunctionArguments, InvalidViewArguments}; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; @@ -36,6 +37,7 @@ use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; +use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID}; @@ -47,9 +49,9 @@ use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; -use spacetimedb_primitives::{ProcedureId, TableId}; +use spacetimedb_primitives::{ProcedureId, TableId, ViewDatabaseId, ViewId}; use spacetimedb_query::compile_subscription; -use spacetimedb_sats::ProductValue; +use spacetimedb_sats::{AlgebraicTypeRef, ProductValue}; use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy}; use spacetimedb_schema::def::deserialize::ArgsSeed; use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef}; @@ -341,7 +343,7 @@ pub enum Instance { } impl Module { - fn replica_ctx(&self) -> &Arc { + pub fn replica_ctx(&self) -> &Arc { match self { Module::Wasm(module) => module.replica_ctx(), Module::Js(module) => module.replica_ctx(), @@ -403,6 +405,13 @@ impl Instance { } } + fn call_view(&mut self, tx: MutTxId, params: CallViewParams) -> ViewCallResult { + match self { + Instance::Wasm(inst) => inst.call_view(tx, params), + Instance::Js(_inst) => unimplemented!("JS views are not implemented yet"), + } + } + async fn call_procedure(&mut self, params: CallProcedureParams) -> Result { match self { Instance::Wasm(inst) => inst.call_procedure(params).await, @@ -528,6 +537,21 @@ pub struct CallReducerParams { pub args: ArgsTuple, } +pub struct CallViewParams { + pub timestamp: Timestamp, + pub caller_identity: Identity, + pub caller_connection_id: Option, + pub view_id: ViewId, + pub view_db_id: ViewDatabaseId, + pub args: ArgsTuple, + + /// The reference of return type of the view, used for deserializing the view call result. + /// This type information is obtained from the [`ViewDef::product_type_ref`]. + pub return_type: AlgebraicTypeRef, + /// Whether the view is being called anonymously (i.e., without a client identity). + pub is_anonymous: bool, +} + pub struct CallProcedureParams { pub timestamp: Timestamp, pub caller_identity: Identity, @@ -622,7 +646,7 @@ impl ModuleInstanceManager { #[derive(Clone)] pub struct ModuleHost { pub info: Arc, - module: Arc, + pub module: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, instance_manager: Arc>, @@ -690,6 +714,27 @@ pub enum ReducerCallError { LifecycleReducer(Lifecycle), } +pub struct ViewCallResult { + pub outcome: ViewOutcome, + pub tx: MutTxId, + pub energy_used: EnergyQuanta, + pub execution_duration: Duration, +} + +#[derive(thiserror::Error, Debug)] +pub enum ViewCallError { + #[error(transparent)] + Args(#[from] InvalidViewArguments), + #[error(transparent)] + NoSuchModule(#[from] NoSuchModule), + #[error("no such view")] + NoSuchView, + #[error("missing client connection for view call trigged by subscription")] + MissingClientConnection, + #[error("DB error during view call: {0}")] + DatastoreError(#[from] DatastoreError), +} + #[derive(thiserror::Error, Debug)] pub enum ProcedureCallError { #[error(transparent)] @@ -800,6 +845,29 @@ impl ModuleHost { Ok(res) } + /// Run an async function on the JobThread for this module. + /// Similar to `on_module_thread`, but for async functions. + pub async fn on_module_thread_async(&self, label: &str, f: Fun) -> Result + where + Fun: (FnOnce() -> Fut) + Send + 'static, + Fut: Future + Send + 'static, + R: Send + 'static, + { + self.guard_closed()?; + + let timer_guard = self.start_call_timer(label); + + let res = self + .executor + .run_job(async move { + drop(timer_guard); + f().await + }) + .await; + + Ok(res) + } + fn start_call_timer(&self, label: &str) -> ScopeGuard<(), impl FnOnce(())> { // Record the time until our function starts running. let queue_timer = WORKER_METRICS @@ -826,7 +894,7 @@ impl ModuleHost { }) } - async fn call_async_with_instance(&self, label: &str, f: Fun) -> Result + pub async fn call_async_with_instance(&self, label: &str, f: Fun) -> Result where Fun: (FnOnce(Instance) -> Fut) + Send + 'static, Fut: Future + Send + 'static, @@ -1419,6 +1487,81 @@ impl ModuleHost { .await? } + pub async fn call_view( + &self, + tx: MutTxId, + view_name: &str, + args: FunctionArgs, + caller_identity: Identity, + caller_connection_id: Option, + ) -> Result { + let (view_id, view_def) = self + .info + .module_def + .view_full(view_name) + .ok_or(ViewCallError::NoSuchView)?; + + let view_seed = ArgsSeed(self.info.module_def.typespace().with_type(view_def)); + let args = args.into_tuple(view_seed).map_err(InvalidViewArguments)?; + + let res = self + .call_view_inner( + tx, + view_id, + view_def, + args.clone(), + caller_identity, + caller_connection_id, + ) + .await; + + let log_message = match &res { + Err(ViewCallError::NoSuchView) => Some(no_such_function_log_message("view", view_name)), + Err(ViewCallError::Args(_)) => Some(args_error_log_message("view", view_name)), + _ => None, + }; + + if let Some(log_message) = log_message { + self.inject_logs(LogLevel::Error, view_name, &log_message) + } + + res + } + + async fn call_view_inner( + &self, + tx: MutTxId, + view_id: ViewId, + view_def: &ViewDef, + args: ArgsTuple, + caller_identity: Identity, + caller_connection_id: Option, + ) -> Result { + let return_type = view_def.product_type_ref; + let is_anonymous = view_def.is_anonymous; + let view_db_id = tx + .view_id_from_name(&view_def.name)? + .ok_or_else(|| ViewCallError::NoSuchView)?; + + Ok(self + .call(&view_def.name, move |inst| { + inst.call_view( + tx, + CallViewParams { + timestamp: Timestamp::now(), + view_db_id, + caller_identity, + caller_connection_id, + view_id, + args, + return_type, + is_anonymous, + }, + ) + }) + .await?) + } + pub fn subscribe_to_logs(&self) -> anyhow::Result> { Ok(self.info().log_tx.subscribe()) } diff --git a/crates/core/src/host/v8/budget.rs b/crates/core/src/host/v8/budget.rs index 2a9fc4765e9..db43b9f8387 100644 --- a/crates/core/src/host/v8/budget.rs +++ b/crates/core/src/host/v8/budget.rs @@ -13,7 +13,7 @@ use core::ptr; use core::sync::atomic::Ordering; use core::time::Duration; use core::{ffi::c_void, sync::atomic::AtomicBool}; -use spacetimedb_client_api_messages::energy::ReducerBudget; +use spacetimedb_client_api_messages::energy::FunctionBudget; use std::sync::Arc; use v8::{Isolate, IsolateHandle}; @@ -25,7 +25,7 @@ pub(super) fn with_timeout_and_cb_every( _handle: IsolateHandle, _callback_every: u64, _callback: InterruptCallback, - _budget: ReducerBudget, + _budget: FunctionBudget, logic: impl FnOnce() -> R, ) -> R { // Start the concurrent thread. @@ -71,7 +71,7 @@ fn run_timeout_and_cb_every( handle: IsolateHandle, callback_every: u64, callback: InterruptCallback, - budget: ReducerBudget, + budget: FunctionBudget, ) -> Arc { // When `execution_done_flag` is set, the ticker thread will stop. let execution_done_flag = Arc::new(AtomicBool::new(false)); @@ -107,7 +107,7 @@ fn run_timeout_and_cb_every( } /// Converts a [`ReducerBudget`] to a [`Duration`]. -fn budget_to_duration(_budget: ReducerBudget) -> Duration { +fn budget_to_duration(_budget: FunctionBudget) -> Duration { // TODO(v8): This is fake logic that allows a maximum timeout. // Replace with sensible math. Duration::MAX @@ -115,15 +115,15 @@ fn budget_to_duration(_budget: ReducerBudget) -> Duration { /// Returns [`EnergyStats`] for a reducer given its `budget` /// and the `duration` it took to execute. -pub(super) fn energy_from_elapsed(budget: ReducerBudget, duration: Duration) -> EnergyStats { +pub(super) fn energy_from_elapsed(budget: FunctionBudget, duration: Duration) -> EnergyStats { let used = duration_to_budget(duration); let remaining = budget - used; EnergyStats { budget, remaining } } /// Converts a [`Duration`] to a [`ReducerBudget`]. -fn duration_to_budget(_duration: Duration) -> ReducerBudget { +fn duration_to_budget(_duration: Duration) -> FunctionBudget { // TODO(v8): This is fake logic that allows minimum energy usage. // Replace with sensible math. - ReducerBudget::ZERO + FunctionBudget::ZERO } diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index 0fbdec0d17b..aad9cf9b52b 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -16,6 +16,10 @@ pub const CALL_REDUCER_DUNDER: &str = "__call_reducer__"; pub const CALL_PROCEDURE_DUNDER: &str = "__call_procedure__"; +pub const CALL_VIEW_DUNDER: &str = "__call_view__"; + +pub const CALL_VIEW_ANON_DUNDER: &str = "__call_view_anon__"; + pub const DESCRIBE_MODULE_DUNDER: &str = "__describe_module__"; /// functions with this prefix run prior to __setup__, initializing global variables and the like diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 6bf94421f2a..1db46c8aebb 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,8 +1,10 @@ use bytes::Bytes; use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_lib::db::raw_def::v9::Lifecycle; -use spacetimedb_lib::de::DeserializeSeed; +use spacetimedb_lib::de::DeserializeSeed as _; use spacetimedb_primitives::ProcedureId; +use spacetimedb_primitives::ViewDatabaseId; +use spacetimedb_primitives::ViewId; use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; use std::future::Future; use std::sync::Arc; @@ -12,11 +14,14 @@ use tracing::span::EnteredSpan; use super::instrumentation::CallTimes; use crate::client::ClientConnectionSender; use crate::database_logger; -use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint}; +use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint}; +use crate::host::host_controller::ViewOutcome; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; +use crate::host::module_host::ViewCallResult; use crate::host::module_host::{ - CallProcedureParams, CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, + CallProcedureParams, CallReducerParams, CallViewParams, DatabaseUpdate, EventStatus, ModuleEvent, + ModuleFunctionCall, ModuleInfo, }; use crate::host::{ ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, @@ -61,26 +66,30 @@ pub trait WasmInstance: Send + Sync + 'static { fn instance_env(&self) -> &InstanceEnv; - fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> ExecuteResult; + fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ExecuteResult; + + fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult; + + fn call_view_anon(&mut self, op: AnonymousViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult; fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error); - async fn call_procedure(&mut self, op: ProcedureOp, budget: ReducerBudget) -> ProcedureExecuteResult; + async fn call_procedure(&mut self, op: ProcedureOp, budget: FunctionBudget) -> ProcedureExecuteResult; } pub struct EnergyStats { - pub budget: ReducerBudget, - pub remaining: ReducerBudget, + pub budget: FunctionBudget, + pub remaining: FunctionBudget, } impl EnergyStats { pub const ZERO: Self = Self { - budget: ReducerBudget::ZERO, - remaining: ReducerBudget::ZERO, + budget: FunctionBudget::ZERO, + remaining: FunctionBudget::ZERO, }; /// Returns the used energy amount. - fn used(&self) -> ReducerBudget { + fn used(&self) -> FunctionBudget { (self.budget.get() - self.remaining.get()).into() } } @@ -111,6 +120,12 @@ pub struct ExecuteResult { pub call_result: anyhow::Result, } +pub struct ViewExecuteResult { + pub energy: EnergyStats, + pub timings: ExecutionTimings, + pub memory_allocation: usize, + pub call_result: anyhow::Result, +} pub struct ProcedureExecuteResult { #[allow(unused)] pub energy: EnergyStats, @@ -120,6 +135,40 @@ pub struct ProcedureExecuteResult { pub call_result: anyhow::Result, } +trait FunctionResult { + fn energy(&self) -> &EnergyStats; + fn timings(&self) -> &ExecutionTimings; + fn memory_allocation(&self) -> usize; +} + +impl FunctionResult for ExecuteResult { + fn energy(&self) -> &EnergyStats { + &self.energy + } + + fn timings(&self) -> &ExecutionTimings { + &self.timings + } + + fn memory_allocation(&self) -> usize { + self.memory_allocation + } +} + +impl FunctionResult for ViewExecuteResult { + fn energy(&self) -> &EnergyStats { + &self.energy + } + + fn timings(&self) -> &ExecutionTimings { + &self.timings + } + + fn memory_allocation(&self) -> usize { + self.memory_allocation + } +} + pub struct WasmModuleHostActor { module: T::InstancePre, common: ModuleCommon, @@ -312,6 +361,28 @@ impl WasmModuleInstance { self.trapped = trapped; res } + + pub fn call_view(&mut self, tx: MutTxId, params: CallViewParams) -> ViewCallResult { + let is_anonymous = params.is_anonymous; + let (res, trapped) = self.common.call_view_with_tx( + &self.instance.instance_env().replica_ctx.clone(), + tx, + params, + |ty, fun, err| T::log_traceback(ty, fun, err), + |tx, op: ViewOp<'_>, budget| { + self.instance.instance_env().tx.clone().set(tx, || { + if is_anonymous { + self.instance.call_view_anon(op.into(), budget) + } else { + self.instance.call_view(op, budget) + } + }) + }, + ); + + self.trapped = trapped; + res + } } pub(crate) struct InstanceCommon { @@ -397,7 +468,7 @@ impl InstanceCommon { &mut self, params: CallProcedureParams, log_traceback: impl FnOnce(&str, &str, &anyhow::Error), - vm_call_procedure: impl FnOnce(ProcedureOp, ReducerBudget) -> F, + vm_call_procedure: impl FnOnce(ProcedureOp, FunctionBudget) -> F, ) -> Result { let CallProcedureParams { timestamp, @@ -425,12 +496,11 @@ impl InstanceCommon { timestamp, arg_bytes: args.get_bsatn().clone(), }; - - let energy_fingerprint = ReducerFingerprint { + let energy_fingerprint = FunctionFingerprint { module_hash: self.info.module_hash, module_identity: self.info.owner_identity, caller_identity, - reducer_name: &procedure_def.name, + function_name: &procedure_def.name, }; // TODO(procedure-energy): replace with call to separate function `procedure_budget`. @@ -513,7 +583,7 @@ impl InstanceCommon { tx: Option, params: CallReducerParams, log_traceback: impl FnOnce(&str, &str, &anyhow::Error), - vm_call_reducer: impl FnOnce(MutTxId, ReducerOp<'_>, ReducerBudget) -> (MutTxId, ExecuteResult), + vm_call_reducer: impl FnOnce(MutTxId, ReducerOp<'_>, FunctionBudget) -> (MutTxId, ExecuteResult), ) -> (ReducerCallResult, bool) { let CallReducerParams { timestamp, @@ -529,23 +599,14 @@ impl InstanceCommon { let stdb = &*replica_ctx.relational_db.clone(); let database_identity = replica_ctx.database_identity; - let reducer_def = self.info.module_def.reducer_by_id(reducer_id); + let info = self.info.clone(); + let reducer_def = info.module_def.reducer_by_id(reducer_id); let reducer_name = &*reducer_def.name; - let reducer = reducer_name.to_string(); // Do some `with_label_values`. // TODO(perf, centril): consider caching this. let vm_metrics = VmMetrics::new(&database_identity, reducer_name); - - let _outer_span = start_call_reducer_span(reducer_name, &caller_identity, caller_connection_id_opt); - - let energy_fingerprint = ReducerFingerprint { - module_hash: self.info.module_hash, - module_identity: self.info.owner_identity, - caller_identity, - reducer_name, - }; - let budget = self.energy_monitor.reducer_budget(&energy_fingerprint); + let _outer_span = start_call_function_span(reducer_name, &caller_identity, caller_connection_id_opt); let op = ReducerOp { id: reducer_id, @@ -560,67 +621,40 @@ impl InstanceCommon { let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload)); let _guard = vm_metrics.timer_guard_for_reducer_plus_query(tx.timer); - let reducer_span = start_run_reducer_span(budget); - - let (mut tx, result) = vm_call_reducer(tx, op, budget); - - let ExecuteResult { - energy, - timings, - memory_allocation, - call_result, - } = result; + let vm_metrics = VmMetrics::new(&database_identity, reducer_name); + let (tx, result) = self.call_function(caller_identity, reducer_name, |budget| { + let (tx, res) = vm_call_reducer(tx, op, budget); + (Some(tx), res) + }); + let mut tx = tx.expect("transaction should be present here"); - let energy_used = energy.used(); + let energy_used = result.energy.used(); let energy_quanta_used = energy_used.into(); + let timings = &result.timings; vm_metrics.report( energy_used.get(), - timings.total_duration, - &timings.wasm_instance_env_call_times, + result.timings.total_duration, + &result.timings.wasm_instance_env_call_times, ); - self.energy_monitor - .record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration); - if self.allocated_memory != memory_allocation { - self.metric_wasm_memory_bytes.set(memory_allocation as i64); - self.allocated_memory = memory_allocation; - } - - reducer_span - .record("timings.total_duration", tracing::field::debug(timings.total_duration)) - .record("energy.used", tracing::field::debug(energy_used)); - - maybe_log_long_running_reducer(reducer_name, timings.total_duration); - reducer_span.exit(); - - let mut outer_error = false; - let status = match call_result { + let mut trapped = false; + let status = match result.call_result { Err(err) => { log_traceback("reducer", reducer_name, &err); - - WORKER_METRICS - .wasm_instance_errors - .with_label_values( - &caller_identity, - &self.info.module_hash, - &caller_connection_id, - reducer_name, - ) - .inc(); - // An outer error occurred. // This signifies a logic error in the module rather than a properly // handled bad argument from the caller of a reducer. // For WASM, this will be interpreted as a trap // and that the instance must be discarded. // However, that does not necessarily apply to e.g., V8. - outer_error = true; - - if energy.remaining.get() == 0 { - EventStatus::OutOfEnergy - } else { - EventStatus::Failed("The instance encountered a fatal error.".into()) - } + trapped = true; + + self.handle_outer_error( + &result.energy, + &caller_identity, + &Some(caller_connection_id), + reducer_name, + ) } // We haven't actually committed yet - `commit_and_broadcast_event` will commit // for us and replace this with the actual database update. @@ -648,7 +682,7 @@ impl InstanceCommon { caller_identity, caller_connection_id: caller_connection_id_opt, function_call: ModuleFunctionCall { - reducer, + reducer: reducer_name.to_string(), reducer_id, args, }, @@ -666,10 +700,160 @@ impl InstanceCommon { execution_duration: timings.total_duration, }; - (res, outer_error) + (res, trapped) } -} + fn handle_outer_error( + &mut self, + energy: &EnergyStats, + caller_identity: &Identity, + caller_connection_id: &Option, + reducer_name: &str, + ) -> EventStatus { + WORKER_METRICS + .wasm_instance_errors + .with_label_values( + caller_identity, + &self.info.module_hash, + &caller_connection_id.unwrap_or(ConnectionId::ZERO), + reducer_name, + ) + .inc(); + + if energy.remaining.get() == 0 { + EventStatus::OutOfEnergy + } else { + EventStatus::Failed("The instance encountered a fatal error.".into()) + } + } + + /// Calls a function (reducer, view) and performs energy monitoring. + fn call_function( + &mut self, + caller_identity: Identity, + function_name: &str, + vm_call_function: F, + ) -> (Option, R) + where + F: FnOnce(FunctionBudget) -> (Option, R), + { + let energy_fingerprint = FunctionFingerprint { + module_hash: self.info.module_hash, + module_identity: self.info.owner_identity, + caller_identity, + function_name, + }; + let budget = self.energy_monitor.reducer_budget(&energy_fingerprint); + + let function_span = start_run_function_span(budget); + + let (tx, result) = vm_call_function(budget); + + let energy_used = result.energy().used(); + let energy_quanta_used = energy_used.into(); + let timings = &result.timings(); + let memory_allocation = result.memory_allocation(); + + self.energy_monitor + .record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration); + if self.allocated_memory != memory_allocation { + self.metric_wasm_memory_bytes.set(memory_allocation as i64); + self.allocated_memory = memory_allocation; + } + + maybe_log_long_running_function(function_name, timings.total_duration); + + function_span + .record("timings.total_duration", tracing::field::debug(timings.total_duration)) + .record("energy.used", tracing::field::debug(energy_used)); + + (tx, result) + } + + /// Execute a view. + /// + /// Similar to `call_reducer_with_tx`, but for views. + /// unlike to `call_reducer_with_tx`, It does not handle `tx`creation or commit, + /// It returns the updated `tx` instead. + fn call_view_with_tx( + &mut self, + replica_ctx: &ReplicaContext, + tx: MutTxId, + params: CallViewParams, + log_traceback: impl FnOnce(&str, &str, &anyhow::Error), + vm_call_view: impl FnOnce(MutTxId, ViewOp<'_>, FunctionBudget) -> (MutTxId, ViewExecuteResult), + ) -> (ViewCallResult, bool) { + let CallViewParams { + caller_identity, + caller_connection_id, + view_id, + args, + return_type, + timestamp, + view_db_id, + .. + } = params; + + let info = self.info.clone(); + let view_def = info.module_def.view_by_id(view_id); + let view_name = &*view_def.name; + + let _outer_span = start_call_function_span(view_name, &caller_identity, caller_connection_id); + + let op = ViewOp { + id: view_id, + db_id: view_db_id, + name: view_name, + caller_identity: &caller_identity, + args: &args, + timestamp, + }; + + let (tx, result) = self.call_function(caller_identity, view_name, |budget| { + let (tx, res) = vm_call_view(tx, op, budget); + (Some(tx), res) + }); + let mut tx = tx.expect("transaction should be present here"); + + let mut trapped = false; + let outcome = match result.call_result { + Err(err) => { + log_traceback("view", view_name, &err); + trapped = true; + + self.handle_outer_error(&result.energy, &caller_identity, &caller_connection_id, view_name) + .into() + } + Ok(res) => { + let db = &replica_ctx.relational_db.clone(); + db.materialize_view( + &mut tx, + view_name, + args, + return_type, + res, + info.module_def.typespace(), + caller_identity, + ) + .map_err(|err| { + log::info!("view returned error: {err}"); + err + }) + .expect("error updating view result"); + ViewOutcome::Success + } + }; + + let res = ViewCallResult { + outcome, + tx, + energy_used: result.energy.used().into(), + execution_duration: result.timings.total_duration, + }; + + (res, trapped) + } +} /// VM-related metrics for reducer execution. struct VmMetrics { /// The time spent executing a reducer + plus evaluating its subscription queries. @@ -719,13 +903,13 @@ impl VmMetrics { } } -/// Starts the `call_reducer` span. -fn start_call_reducer_span( +/// Starts the `call_function` span. +fn start_call_function_span( reducer_name: &str, caller_identity: &Identity, caller_connection_id_opt: Option, ) -> EnteredSpan { - tracing::trace_span!("call_reducer", + tracing::trace_span!("call_function", reducer_name, %caller_identity, caller_connection_id = caller_connection_id_opt.map(tracing::field::debug), @@ -733,10 +917,10 @@ fn start_call_reducer_span( .entered() } -/// Starts the `run_reducer` span. -fn start_run_reducer_span(budget: ReducerBudget) -> EnteredSpan { +/// Starts the `run_function` span. +fn start_run_function_span(budget: FunctionBudget) -> EnteredSpan { tracing::trace_span!( - "run_reducer", + "run_function", timings.total_duration = tracing::field::Empty, energy.budget = budget.get(), energy.used = tracing::field::Empty, @@ -745,7 +929,7 @@ fn start_run_reducer_span(budget: ReducerBudget) -> EnteredSpan { } /// Logs a tracing message if a reducer doesn't finish in a single frame at 60 FPS. -fn maybe_log_long_running_reducer(reducer_name: &str, total_duration: Duration) { +fn maybe_log_long_running_function(reducer_name: &str, total_duration: Duration) { const FRAME_LEN_60FPS: Duration = Duration::from_secs(1).checked_div(60).unwrap(); if total_duration > FRAME_LEN_60FPS { tracing::debug!( @@ -805,6 +989,48 @@ fn commit_and_broadcast_event( } } +/// Describes a view call in a cheaply shareable way. +#[derive(Clone, Debug)] +pub struct ViewOp<'a> { + pub id: ViewId, + pub db_id: ViewDatabaseId, + pub name: &'a str, + pub args: &'a ArgsTuple, + pub caller_identity: &'a Identity, + pub timestamp: Timestamp, +} + +/// Describes an anonymous view call in a cheaply shareable way. +#[derive(Clone, Debug)] +pub struct AnonymousViewOp<'a> { + pub id: ViewId, + pub db_id: ViewDatabaseId, + pub name: &'a str, + pub args: &'a ArgsTuple, + pub timestamp: Timestamp, +} + +impl<'a> From> for AnonymousViewOp<'a> { + fn from( + ViewOp { + id, + db_id, + name, + args, + timestamp, + .. + }: ViewOp<'a>, + ) -> Self { + Self { + id, + db_id, + name, + args, + timestamp, + } + } +} + /// Describes a reducer call in a cheaply shareable way. #[derive(Clone, Debug)] pub struct ReducerOp<'a> { diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index dd225ad3398..fb225cf2ccd 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -5,7 +5,7 @@ use anyhow::Context; use spacetimedb_paths::server::{ServerDataDir, WasmtimeCacheDir}; use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut}; -use crate::energy::{EnergyQuanta, ReducerBudget}; +use crate::energy::{EnergyQuanta, FunctionBudget}; use crate::error::NodesError; use crate::host::module_host::{Instance, ModuleRuntime}; use crate::module_host_context::ModuleCreationContext; @@ -154,8 +154,8 @@ impl WasmtimeFuel { const QUANTA_MULTIPLIER: u64 = 1_000; } -impl From for WasmtimeFuel { - fn from(v: ReducerBudget) -> Self { +impl From for WasmtimeFuel { + fn from(v: FunctionBudget) -> Self { // ReducerBudget being u64 is load-bearing here - if it was u128 and v was ReducerBudget::MAX, // truncating this result would mean that with set_store_fuel(budget.into()), get_store_fuel() // would be wildly different than the original `budget`, and the energy usage for the reducer @@ -164,9 +164,9 @@ impl From for WasmtimeFuel { } } -impl From for ReducerBudget { +impl From for FunctionBudget { fn from(v: WasmtimeFuel) -> Self { - ReducerBudget::new(v.0 * WasmtimeFuel::QUANTA_MULTIPLIER) + FunctionBudget::new(v.0 * WasmtimeFuel::QUANTA_MULTIPLIER) } } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 7aac129306d..9b5f752c7a9 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -9,6 +9,7 @@ use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, Timin use crate::host::AbiCall; use anyhow::Context as _; use spacetimedb_data_structures::map::IntMap; +use spacetimedb_datastore::locking_tx_datastore::ViewCall; use spacetimedb_lib::{ConnectionId, Timestamp}; use spacetimedb_primitives::{errno, ColId}; use std::future::Future; @@ -103,7 +104,7 @@ pub(super) struct WasmInstanceEnv { /// Track time spent in module-defined spans. timing_spans: TimingSpanSet, - /// The point in time the last, or current, reducer or procedure call started at. + /// The point in time the last, or current, reducer, procedure or view call started at. funcall_start: Instant, /// Track time spent in all wasm instance env calls (aka syscall time). @@ -112,7 +113,7 @@ pub(super) struct WasmInstanceEnv { /// to this tracker. call_times: CallTimes, - /// The name of the last, including current, reducer or procedure to be executed by this environment. + /// The name of the last, including current, reducer, procedure, or view to be executed by this environment. funcall_name: String, /// A pool of unused allocated chunks that can be reused. @@ -125,6 +126,13 @@ const STANDARD_BYTES_SINK: u32 = 1; type WasmResult = Result; type RtResult = anyhow::Result; +/// The type of function call being performed. +pub enum FuncCallType { + Reducer, + Procedure, + View(ViewCall), +} + /// Wraps an `InstanceEnv` with the magic necessary to push /// and pull bytes from webassembly memory. impl WasmInstanceEnv { @@ -224,7 +232,13 @@ impl WasmInstanceEnv { /// /// Returns the handle used by reducers and procedures to read from `args` /// as well as the handle used to write the reducer error message or procedure return value. - pub fn start_funcall(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (BytesSourceId, u32) { + pub fn start_funcall( + &mut self, + name: &str, + args: bytes::Bytes, + ts: Timestamp, + func_type: FuncCallType, + ) -> (BytesSourceId, u32) { // Create the output sink. // Reducers which fail will write their error message here. // Procedures will write their result here. @@ -234,7 +248,15 @@ impl WasmInstanceEnv { self.funcall_start = Instant::now(); name.clone_into(&mut self.funcall_name); - self.instance_env.start_funcall(ts); + + match func_type { + FuncCallType::Reducer | FuncCallType::Procedure => { + self.instance_env.start_funcall(ts); + } + FuncCallType::View(view) => { + self.instance_env.start_view(ts, view); + } + } (args, errors) } diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 7cae069db93..b6f13bf90ef 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -2,13 +2,15 @@ use self::module_host_actor::ReducerOp; use super::wasm_instance_env::WasmInstanceEnv; use super::{Mem, WasmtimeFuel, EPOCH_TICKS_PER_SECOND}; -use crate::energy::ReducerBudget; +use crate::energy::FunctionBudget; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::run_describer; -use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError}; +use crate::host::wasm_common::module_host_actor::{AnonymousViewOp, DescribeError, InitializationError, ViewOp}; use crate::host::wasm_common::*; +use crate::host::wasmtime::wasm_instance_env::FuncCallType; use crate::util::string_from_utf8_lossy_owned; use futures_util::FutureExt; +use spacetimedb_datastore::locking_tx_datastore::ViewCall; use spacetimedb_lib::{ConnectionId, Identity}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use wasmtime::{ @@ -40,7 +42,7 @@ impl WasmtimeModule { WasmtimeModule { module } } - pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 3); + pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 4); pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; @@ -95,6 +97,18 @@ fn handle_error_sink_code(code: i32, error: Vec) -> Result<(), Box> { } } +/// Handle the return code from a function using a result sink. +/// +/// On success, returns the result bytes. +/// On failure, returns the error message. +fn handle_result_sink_code(code: i32, result: Vec) -> Result, Box> { + match code { + 0 => Ok(result), + CALL_FAILURE => Err(string_from_utf8_lossy_owned(result).into()), + _ => Err("unknown return code".into()), + } +} + const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; /// Invoke `typed_func` and assert that it doesn't yield. @@ -142,7 +156,7 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { }); // Note: this budget is just for initializers - set_store_fuel(&mut store, ReducerBudget::DEFAULT_BUDGET.into()); + set_store_fuel(&mut store, FunctionBudget::DEFAULT_BUDGET.into()); store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND); for preinit in &func_names.preinits { @@ -172,12 +186,16 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { .expect("no call_reducer"); let call_procedure = get_call_procedure(&mut store, &instance); + let call_view = get_call_view(&mut store, &instance); + let call_view_anon = get_call_view_anon(&mut store, &instance); Ok(WasmtimeInstance { store, instance, call_reducer, call_procedure, + call_view, + call_view_anon, }) } } @@ -210,36 +228,105 @@ fn get_call_procedure(store: &mut Store, instance: &Instance) - ) } +/// Look up the `instance`'s export named by [`CALL_VIEW_DUNDER`]. +/// +/// Similar to [`get_call_procedure`], but for views. +fn get_call_view(store: &mut Store, instance: &Instance) -> Option { + let export = instance.get_export(store.as_context_mut(), CALL_VIEW_DUNDER)?; + Some( + export + .into_func() + .unwrap_or_else(|| panic!("{CALL_VIEW_DUNDER} export is not a function")) + .typed(store) + .unwrap_or_else(|err| panic!("{CALL_VIEW_DUNDER} export is a function with incorrect type: {err}")), + ) +} + +/// Look up the `instance`'s export named by [`CALL_VIEW_ANON_DUNDER`]. +/// +/// Similar to [`get_call_procedure`], but for anonymous views. +fn get_call_view_anon(store: &mut Store, instance: &Instance) -> Option { + let export = instance.get_export(store.as_context_mut(), CALL_VIEW_ANON_DUNDER)?; + Some( + export + .into_func() + .unwrap_or_else(|| panic!("{CALL_VIEW_ANON_DUNDER} export is not a function")) + .typed(store) + .unwrap_or_else(|err| panic!("{CALL_VIEW_ANON_DUNDER} export is a function with incorrect type: {err}")), + ) +} + +// `__call_procedure__` takes the same arguments as `__call_reducer__`. +type CallProcedureType = CallReducerType; + +/// The function signature of `__call_reducer__` type CallReducerType = TypedFunc< ( - // Reducer ID, + // ReducerId u32, - // Sender `Identity` + // sender_0 u64, + // sender_1 u64, + // sender_2 u64, + // sender_3 u64, - // Sender `ConnectionId`, or 0 for none. + // connection_id_0 u64, + // connection_id_1 u64, - // Start timestamp. + // timestamp u64, - // Args byte source. + // byte source id for args u32, - // Errors byte sink. + // byte sink id for return + u32, + ), + i32, +>; + +/// The function signature of `__call_view__` +type CallViewType = TypedFunc< + ( + // ViewId + u32, + // sender_0 + u64, + // sender_1 + u64, + // sender_2 + u64, + // sender_3 + u64, + // byte source id for args + u32, + // byte sink id for return + u32, + ), + i32, +>; + +/// The function signature of `__call_view_anon__` +type CallViewAnonType = TypedFunc< + ( + // ViewId + u32, + // byte source id for args + u32, + // byte sink id for return u32, ), - // Errno. i32, >; -// `__call_procedure__` takes the same arguments as `__call_reducer__`. -type CallProcedureType = CallReducerType; pub struct WasmtimeInstance { store: Store, instance: Instance, call_reducer: CallReducerType, call_procedure: Option, + call_view: Option, + call_view_anon: Option, } #[async_trait::async_trait] @@ -269,7 +356,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { } #[tracing::instrument(level = "trace", skip_all)] - fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> module_host_actor::ExecuteResult { + fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> module_host_actor::ExecuteResult { let store = &mut self.store; prepare_store_for_call(store, budget); @@ -281,7 +368,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { // Prepare arguments to the reducer + the error sink & start timings. let args_bytes = op.args.get_bsatn().clone(); - let (args_source, errors_sink) = store.data_mut().start_funcall(op.name, args_bytes, op.timestamp); + let (args_source, errors_sink) = + store + .data_mut() + .start_funcall(op.name, args_bytes, op.timestamp, FuncCallType::Reducer); let call_result = call_sync_typed_func( &self.call_reducer, @@ -309,7 +399,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { // Compute fuel and heap usage. let remaining_fuel = get_store_fuel(store); - let remaining: ReducerBudget = remaining_fuel.into(); + let remaining: FunctionBudget = remaining_fuel.into(); let energy = module_host_actor::EnergyStats { budget, remaining }; let memory_allocation = store.data().get_mem().memory.data_size(&store); @@ -321,6 +411,126 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { } } + fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> module_host_actor::ViewExecuteResult { + let store = &mut self.store; + prepare_store_for_call(store, budget); + + let view = ViewCall::with_identity(*op.caller_identity, op.db_id, op.args.get_bsatn().clone()); + + // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. + let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(*op.caller_identity); + // Prepare arguments to the reducer + the error sink & start timings. + let args_bytes = op.args.get_bsatn().clone(); + + let (args_source, errors_sink) = + store + .data_mut() + .start_funcall(op.name, args_bytes, op.timestamp, FuncCallType::View(view)); + + let Some(call_view) = self.call_view.as_ref() else { + return module_host_actor::ViewExecuteResult { + energy: module_host_actor::EnergyStats::ZERO, + timings: module_host_actor::ExecutionTimings::zero(), + memory_allocation: get_memory_size(store), + call_result: Err(anyhow::anyhow!( + "Module defines view {} but does not export `{}`", + op.name, + CALL_VIEW_DUNDER, + )), + }; + }; + + let call_result = call_sync_typed_func( + call_view, + &mut *store, + ( + op.id.0, + sender_0, + sender_1, + sender_2, + sender_3, + args_source.0, + errors_sink, + ), + ); + + // Signal that this view call is finished. This gets us the timings + // associated to our view call, and clears all of the instance state + // associated to the call. + let (timings, result_bytes) = store.data_mut().finish_funcall(); + + let call_result = call_result + .and_then(|code| handle_result_sink_code(code, result_bytes).map_err(|e| anyhow::anyhow!(e))) + .map(|r| r.into()); + + // Compute fuel and heap usage. + let remaining_fuel = get_store_fuel(store); + let remaining: FunctionBudget = remaining_fuel.into(); + let energy = module_host_actor::EnergyStats { budget, remaining }; + let memory_allocation = store.data().get_mem().memory.data_size(&store); + + module_host_actor::ViewExecuteResult { + energy, + timings, + memory_allocation, + call_result, + } + } + + fn call_view_anon( + &mut self, + op: AnonymousViewOp<'_>, + budget: FunctionBudget, + ) -> module_host_actor::ViewExecuteResult { + let store = &mut self.store; + prepare_store_for_call(store, budget); + + let view = ViewCall::anonymous(op.db_id, op.args.get_bsatn().clone()); + // Prepare arguments to the reducer + the error sink & start timings. + let args_bytes = op.args.get_bsatn().clone(); + + let (args_source, errors_sink) = + store + .data_mut() + .start_funcall(op.name, args_bytes, op.timestamp, FuncCallType::View(view)); + + let Some(call_view_anon) = self.call_view_anon.as_ref() else { + return module_host_actor::ViewExecuteResult { + energy: module_host_actor::EnergyStats::ZERO, + timings: module_host_actor::ExecutionTimings::zero(), + memory_allocation: get_memory_size(store), + call_result: Err(anyhow::anyhow!( + "Module defines anonymous view {} but does not export `{}`", + op.name, + CALL_VIEW_ANON_DUNDER, + )), + }; + }; + + let call_result = call_sync_typed_func(call_view_anon, &mut *store, (op.id.0, args_source.0, errors_sink)); + + // Signal that this view call is finished. This gets us the timings + // associated to our view call, and clears all of the instance state + // associated to the call. + let (timings, result_bytes) = store.data_mut().finish_funcall(); + + let call_result = call_result + .and_then(|code| handle_result_sink_code(code, result_bytes).map_err(|e| anyhow::anyhow!(e))) + .map(|r| r.into()); + // Compute fuel and heap usage. + let remaining_fuel = get_store_fuel(store); + let remaining: FunctionBudget = remaining_fuel.into(); + let energy = module_host_actor::EnergyStats { budget, remaining }; + let memory_allocation = store.data().get_mem().memory.data_size(&store); + + module_host_actor::ViewExecuteResult { + energy, + timings, + memory_allocation, + call_result, + } + } + fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error) { log_traceback(func_type, func, trap) } @@ -329,7 +539,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { async fn call_procedure( &mut self, op: module_host_actor::ProcedureOp, - budget: ReducerBudget, + budget: FunctionBudget, ) -> module_host_actor::ProcedureExecuteResult { let store = &mut self.store; prepare_store_for_call(store, budget); @@ -339,7 +549,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let [conn_id_0, conn_id_1] = prepare_connection_id_for_call(op.caller_connection_id); // Prepare arguments to the reducer + the error sink & start timings. - let (args_source, result_sink) = store.data_mut().start_funcall(&op.name, op.arg_bytes, op.timestamp); + let (args_source, result_sink) = + store + .data_mut() + .start_funcall(&op.name, op.arg_bytes, op.timestamp, FuncCallType::Procedure); let Some(call_procedure) = self.call_procedure.as_ref() else { return module_host_actor::ProcedureExecuteResult { @@ -383,7 +596,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { }); let remaining_fuel = get_store_fuel(store); - let remaining = ReducerBudget::from(remaining_fuel); + let remaining = FunctionBudget::from(remaining_fuel); let energy = module_host_actor::EnergyStats { budget, remaining }; let memory_allocation = get_memory_size(store); @@ -405,8 +618,8 @@ fn get_store_fuel(store: &impl AsContext) -> WasmtimeFuel { WasmtimeFuel(store.as_context().get_fuel().unwrap()) } -fn prepare_store_for_call(store: &mut Store, budget: ReducerBudget) { - // note that ReducerBudget being a u64 is load-bearing here - although we convert budget right back into +fn prepare_store_for_call(store: &mut Store, budget: FunctionBudget) { + // note that FunctionBudget being a u64 is load-bearing here - although we convert budget right back into // EnergyQuanta at the end of this function, from_energy_quanta clamps it to a u64 range. // otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate. set_store_fuel(store, budget.into()); @@ -458,7 +671,7 @@ mod tests { &wasmtime::Engine::new(wasmtime::Config::new().consume_fuel(true)).unwrap(), (), ); - let budget = ReducerBudget::DEFAULT_BUDGET; + let budget = FunctionBudget::DEFAULT_BUDGET; set_store_fuel(&mut store, budget.into()); store.set_fuel(store.get_fuel().unwrap() - 10).unwrap(); let remaining: EnergyQuanta = get_store_fuel(&store).into(); diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index b060b7c0641..314e1f8ede3 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -7,7 +7,7 @@ use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; -use crate::host::ArgsTuple; +use crate::host::{ArgsTuple, ModuleHost}; use crate::subscription::module_subscription_actor::{ModuleSubscriptions, WriteConflict}; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::subscription::tx::DeltaTx; @@ -20,8 +20,8 @@ use spacetimedb_datastore::traits::IsolationLevel; use spacetimedb_expr::statement::Statement; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; -use spacetimedb_lib::Timestamp; use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue}; +use spacetimedb_lib::{Identity, Timestamp}; use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt}; use spacetimedb_schema::relation::FieldName; use spacetimedb_vm::eval::run_ast; @@ -186,21 +186,51 @@ pub struct SqlResult { } /// Run the `SQL` string using the `auth` credentials -pub fn run( +pub async fn run( db: &RelationalDB, sql_text: &str, auth: AuthCtx, subs: Option<&ModuleSubscriptions>, + module: Option<&ModuleHost>, + caller_identity: Identity, head: &mut Vec<(Box, AlgebraicType)>, ) -> Result { // We parse the sql statement in a mutable transaction. // If it turns out to be a query, we downgrade the tx. - let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| { + let (mut tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| { compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth) })?; let mut metrics = ExecutionMetrics::default(); + for (view_name, args) in stmt.views() { + let (is_materialized, args) = tx + .is_materialized(view_name, args, caller_identity) + .map_err(|e| DBError::Other(anyhow!("Failed to check memoized view: {e}")))?; + + // Skip if already memoized + if is_materialized { + continue; + } + + let module = module + .as_ref() + .ok_or_else(|| anyhow!("Cannot execute view `{view_name}` without module context"))?; + + let res = module + .call_view( + tx, + view_name, + crate::host::FunctionArgs::Bsatn(args), + caller_identity, + None, + ) + .await + .map_err(|e| DBError::Other(anyhow!("Failed to execute view `{view_name}`: {e}")))?; + + tx = res.tx; + } + match stmt { Statement::Select(stmt) => { // Up to this point, the tx has been read-only, @@ -359,8 +389,18 @@ pub(crate) mod tests { /// Short-cut for simplify test execution pub(crate) fn run_for_testing(db: &RelationalDB, sql_text: &str) -> Result, DBError> { - let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(Arc::new(db.clone())); - run(db, sql_text, AuthCtx::for_testing(), Some(&subs), &mut vec![]).map(|x| x.rows) + let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(Arc::new(db.clone())); + runtime + .block_on(run( + db, + sql_text, + AuthCtx::for_testing(), + Some(&subs), + None, + Identity::ZERO, + &mut vec![], + )) + .map(|x| x.rows) } fn create_data(total_rows: u64) -> ResultTest<(TestDB, MemTable)> { @@ -503,14 +543,15 @@ pub(crate) mod tests { } /// Assert this query returns the expected rows for this user - fn assert_query_results( + async fn assert_query_results( db: &RelationalDB, sql: &str, auth: &AuthCtx, expected: impl IntoIterator, ) { assert_eq!( - run(db, sql, *auth, None, &mut vec![]) + run(db, sql, *auth, None, None, Identity::ZERO, &mut vec![]) + .await .unwrap() .rows .into_iter() @@ -522,8 +563,8 @@ pub(crate) mod tests { } /// Test a query that uses a multi-column index - #[test] - fn test_multi_column_index() -> anyhow::Result<()> { + #[tokio::test] + async fn test_multi_column_index() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let schema = [ @@ -549,14 +590,15 @@ pub(crate) mod tests { "select * from t where c = 1 and b = 2", &AuthCtx::for_testing(), [product![1_u64, 2_u64, 1_u64]], - ); + ) + .await; Ok(()) } /// Test querying a table with RLS rules - #[test] - fn test_rls_rules() -> anyhow::Result<()> { + #[tokio::test] + async fn test_rls_rules() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let id_for_a = identity_from_u8(1); @@ -601,42 +643,48 @@ pub(crate) mod tests { "select * from users", &auth_for_a, [product![id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the identity for sender "b" "select * from users", &auth_for_b, [product![id_for_b]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "a" "select * from users where identity = :sender", &auth_for_a, [product![id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "b" "select * from users where identity = :sender", &auth_for_b, [product![id_for_b]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "a" &format!("select * from users where identity = 0x{}", id_for_a.to_hex()), &auth_for_a, [product![id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "b" &format!("select * from users where identity = 0x{}", id_for_b.to_hex()), &auth_for_b, [product![id_for_b]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "a" @@ -646,7 +694,8 @@ pub(crate) mod tests { ), &auth_for_a, [product![id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "b" @@ -656,7 +705,8 @@ pub(crate) mod tests { ), &auth_for_b, [product![id_for_b]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "a" @@ -666,7 +716,8 @@ pub(crate) mod tests { ), &auth_for_a, [product![id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "b" @@ -676,7 +727,8 @@ pub(crate) mod tests { ), &auth_for_b, [product![id_for_b]], - ); + ) + .await; assert_query_results( &db, // Should not return any rows. @@ -684,7 +736,8 @@ pub(crate) mod tests { &format!("select * from users where identity = 0x{}", id_for_b.to_hex()), &auth_for_a, [], - ); + ) + .await; assert_query_results( &db, // Should not return any rows. @@ -692,7 +745,8 @@ pub(crate) mod tests { &format!("select * from users where identity = 0x{}", id_for_a.to_hex()), &auth_for_b, [], - ); + ) + .await; assert_query_results( &db, // Should not return any rows. @@ -703,7 +757,8 @@ pub(crate) mod tests { ), &auth_for_a, [], - ); + ) + .await; assert_query_results( &db, // Should not return any rows. @@ -714,56 +769,63 @@ pub(crate) mod tests { ), &auth_for_b, [], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "a" "select * from sales", &auth_for_a, [product![1u64, id_for_a], product![3u64, id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "b" "select * from sales", &auth_for_b, [product![2u64, id_for_b], product![4u64, id_for_b]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "a" "select s.* from users u join sales s on u.identity = s.customer", &auth_for_a, [product![1u64, id_for_a], product![3u64, id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "b" "select s.* from users u join sales s on u.identity = s.customer", &auth_for_b, [product![2u64, id_for_b], product![4u64, id_for_b]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "a" "select s.* from users u join sales s on u.identity = s.customer where u.identity = :sender", &auth_for_a, [product![1u64, id_for_a], product![3u64, id_for_a]], - ); + ) + .await; assert_query_results( &db, // Should only return the orders for sender "b" "select s.* from users u join sales s on u.identity = s.customer where u.identity = :sender", &auth_for_b, [product![2u64, id_for_b], product![4u64, id_for_b]], - ); + ) + .await; Ok(()) } /// Test querying tables with multiple levels of RLS rules - #[test] - fn test_nested_rls_rules() -> anyhow::Result<()> { + #[tokio::test] + async fn test_nested_rls_rules() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let id_for_a = identity_from_u8(1); @@ -814,21 +876,24 @@ pub(crate) mod tests { &auth_for_a, // Identity "a" is not an admin [], - ); + ) + .await; assert_query_results( &db, "select * from admins", &auth_for_b, // Identity "b" is not an admin [], - ); + ) + .await; assert_query_results( &db, "select * from admins", &auth_for_c, // Identity "c" is an admin [product![id_for_c]], - ); + ) + .await; assert_query_results( &db, @@ -836,21 +901,24 @@ pub(crate) mod tests { &auth_for_a, // Identity "a" can only see its own user vec![product![id_for_a]], - ); + ) + .await; assert_query_results( &db, "select * from users", &auth_for_b, // Identity "b" can only see its own user vec![product![id_for_b]], - ); + ) + .await; assert_query_results( &db, "select * from users", &auth_for_c, // Identity "c" is an admin so it can see everyone's users [product![id_for_a], product![id_for_b], product![id_for_c]], - ); + ) + .await; assert_query_results( &db, @@ -858,28 +926,31 @@ pub(crate) mod tests { &auth_for_a, // Identity "a" can only see its own orders [product![1u64, 1u64, id_for_a]], - ); + ) + .await; assert_query_results( &db, "select * from sales", &auth_for_b, // Identity "b" can only see its own orders [product![2u64, 2u64, id_for_b]], - ); + ) + .await; assert_query_results( &db, "select * from sales", &auth_for_c, // Identity "c" is an admin so it can see everyone's orders [product![1u64, 1u64, id_for_a], product![2u64, 2u64, id_for_b]], - ); + ) + .await; Ok(()) } /// Test projecting columns from both tables in join - #[test] - fn test_project_join() -> anyhow::Result<()> { + #[tokio::test] + async fn test_project_join() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let t_schema = [("id", AlgebraicType::U8), ("x", AlgebraicType::U8)]; @@ -899,17 +970,18 @@ pub(crate) mod tests { "select t.x, s.y from t join s on t.id = s.id", &auth, [product![2_u8, 3_u8]], - ); + ) + .await; Ok(()) } - #[test] - fn test_view() -> anyhow::Result<()> { + #[tokio::test] + async fn test_view() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; - let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, false)?; + let (_, table_id) = tests_utils::create_view_for_test(&db, "my_view", &schema, false)?; with_auto_commit(&db, |tx| -> Result<_, DBError> { tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(1)), product![0u8, 1u8])?; @@ -920,17 +992,17 @@ pub(crate) mod tests { let id = identity_from_u8(2); let auth = AuthCtx::new(Identity::ZERO, id); - assert_query_results(&db, "select * from my_view", &auth, [product![0u8, 2u8]]); + assert_query_results(&db, "select * from my_view", &auth, [product![0u8, 2u8]]).await; Ok(()) } - #[test] - fn test_anonymous_view() -> anyhow::Result<()> { + #[tokio::test] + async fn test_anonymous_view() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; - let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, true)?; + let (_, table_id) = tests_utils::create_view_for_test(&db, "my_view", &schema, true)?; with_auto_commit(&db, |tx| -> Result<_, DBError> { tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 1u8])?; @@ -941,17 +1013,17 @@ pub(crate) mod tests { let id = identity_from_u8(1); let auth = AuthCtx::new(Identity::ZERO, id); - assert_query_results(&db, "select b from my_view", &auth, [product![1u8], product![2u8]]); + assert_query_results(&db, "select b from my_view", &auth, [product![1u8], product![2u8]]).await; Ok(()) } - #[test] - fn test_view_join_table() -> anyhow::Result<()> { + #[tokio::test] + async fn test_view_join_table() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; - let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?; + let (_, v_id) = tests_utils::create_view_for_test(&db, "v", &schema, false)?; let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)]; let t_id = db.create_table_for_test("t", &schema, &[0.into()])?; @@ -972,44 +1044,49 @@ pub(crate) mod tests { "select t.* from v join t on v.a = t.c", &auth, [product![1u8, 4u8]], - ); + ) + .await; assert_query_results( &db, "select v.* from v join t on v.a = t.c", &auth, [product![1u8, 2u8]], - ); + ) + .await; assert_query_results( &db, "select v.* from v join t where v.a = t.c", &auth, [product![1u8, 2u8]], - ); + ) + .await; assert_query_results( &db, "select v.b as b, t.d as d from v join t on v.a = t.c", &auth, [product![2u8, 4u8]], - ); + ) + .await; assert_query_results( &db, "select v.b as b, t.d as d from v join t where v.a = t.c", &auth, [product![2u8, 4u8]], - ); + ) + .await; Ok(()) } - #[test] - fn test_view_join_view() -> anyhow::Result<()> { + #[tokio::test] + async fn test_view_join_view() -> anyhow::Result<()> { let db = TestDB::in_memory()?; let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; - let u_id = tests_utils::create_view_for_test(&db, "u", &schema, false)?; + let (_, u_id) = tests_utils::create_view_for_test(&db, "u", &schema, false)?; let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)]; - let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?; + let (_, v_id) = tests_utils::create_view_for_test(&db, "v", &schema, false)?; with_auto_commit(&db, |tx| -> Result<_, DBError> { tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(1)), product![0u8, 1u8])?; @@ -1027,31 +1104,36 @@ pub(crate) mod tests { "select u.* from u join v on u.a = v.c", &auth, [product![1u8, 2u8]], - ); + ) + .await; assert_query_results( &db, "select v.* from u join v on u.a = v.c", &auth, [product![1u8, 4u8]], - ); + ) + .await; assert_query_results( &db, "select v.* from u join v where u.a = v.c", &auth, [product![1u8, 4u8]], - ); + ) + .await; assert_query_results( &db, "select u.b as b, v.d as d from u join v on u.a = v.c", &auth, [product![2u8, 4u8]], - ); + ) + .await; assert_query_results( &db, "select u.b as b, v.d as d from u join v where u.a = v.c", &auth, [product![2u8, 4u8]], - ); + ) + .await; Ok(()) } @@ -1419,26 +1501,38 @@ pub(crate) mod tests { let internal_auth = AuthCtx::new(server, server); let external_auth = AuthCtx::new(server, client); - let run = |db, sql, auth, subs| run(db, sql, auth, subs, &mut vec![]); + let tmp_vec = Vec::new(); + + let rt = db.runtime().expect("runtime should be there"); + let run = |db, sql, auth, subs, mut tmp_vec| { + rt.block_on(run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec)) + }; // No row limit, both queries pass. - assert!(run(&db, "SELECT * FROM T", internal_auth, None).is_ok()); - assert!(run(&db, "SELECT * FROM T", external_auth, None).is_ok()); + assert!(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()).is_ok()); + assert!(run(&db, "SELECT * FROM T", external_auth, None, tmp_vec.clone()).is_ok()); // Set row limit. - assert!(run(&db, "SET row_limit = 4", internal_auth, None).is_ok()); + assert!(run(&db, "SET row_limit = 4", internal_auth, None, tmp_vec.clone()).is_ok()); // External query fails. - assert!(run(&db, "SELECT * FROM T", internal_auth, None).is_ok()); - assert!(run(&db, "SELECT * FROM T", external_auth, None).is_err()); + assert!(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()).is_ok()); + assert!(run(&db, "SELECT * FROM T", external_auth, None, tmp_vec.clone()).is_err()); // Increase row limit. - assert!(run(&db, "DELETE FROM st_var WHERE name = 'row_limit'", internal_auth, None).is_ok()); - assert!(run(&db, "SET row_limit = 5", internal_auth, None).is_ok()); + assert!(run( + &db, + "DELETE FROM st_var WHERE name = 'row_limit'", + internal_auth, + None, + tmp_vec.clone() + ) + .is_ok()); + assert!(run(&db, "SET row_limit = 5", internal_auth, None, tmp_vec.clone()).is_ok()); // Both queries pass. - assert!(run(&db, "SELECT * FROM T", internal_auth, None).is_ok()); - assert!(run(&db, "SELECT * FROM T", external_auth, None).is_ok()); + assert!(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()).is_ok()); + assert!(run(&db, "SELECT * FROM T", external_auth, None, tmp_vec.clone()).is_ok()); Ok(()) } @@ -1456,14 +1550,19 @@ pub(crate) mod tests { Ok(()) })?; + let rt = db.runtime().expect("runtime should be there"); + let server = Identity::from_claims("issuer", "server"); let internal_auth = AuthCtx::new(server, server); - let run = |db, sql, auth, subs| run(db, sql, auth, subs, &mut vec![]); + let tmp_vec = Vec::new(); + let run = |db, sql, auth, subs, mut tmp_vec| async move { + run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec).await + }; let check = |db, sql, auth, metrics: ExecutionMetrics| { - let result = run(db, sql, auth, None)?; + let result = rt.block_on(run(db, sql, auth, None, tmp_vec.clone()))?; assert_eq!(result.rows, vec![]); assert_eq!(result.metrics.rows_inserted, metrics.rows_inserted); assert_eq!(result.metrics.rows_deleted, metrics.rows_deleted); @@ -1488,7 +1587,8 @@ pub(crate) mod tests { check(&db, "INSERT INTO T (a) VALUES (5)", internal_auth, ins)?; check(&db, "UPDATE T SET a = 2", internal_auth, upd)?; assert_eq!( - run(&db, "SELECT * FROM T", internal_auth, None)?.rows, + rt.block_on(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()))? + .rows, vec![product!(2u8)] ); check(&db, "DELETE FROM T", internal_auth, del)?; diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index e92c5d35e48..88abc8ca6c2 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -15,6 +15,7 @@ use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent}; +use crate::host::{FunctionArgs, ModuleHost}; use crate::messages::websocket::Subscribe; use crate::subscription::query::is_subscribe_to_all_tables; use crate::subscription::{collect_table_update_for_view, execute_plans}; @@ -31,15 +32,16 @@ use spacetimedb_client_api_messages::websocket::{ use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; -use spacetimedb_datastore::locking_tx_datastore::TxId; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; use spacetimedb_datastore::traits::TxData; use spacetimedb_durability::TxOffset; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; +use std::sync::OnceLock; use std::{sync::Arc, time::Instant}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, watch}; type Subscriptions = Arc>; @@ -52,6 +54,7 @@ pub struct ModuleSubscriptions { broadcast_queue: BroadcastQueue, owner_identity: Identity, stats: Arc, + module_rx: OnceLock>, } #[derive(Debug, Clone)] @@ -190,9 +193,38 @@ impl ModuleSubscriptions { broadcast_queue, owner_identity, stats, + module_rx: OnceLock::new(), } } + /// Should be called once to initialize the `ModuleSubscriptions` with a `ModuleHost` receiver. + pub fn init(&self, module_host: watch::Receiver) { + self.module_rx + .set(module_host) + .expect("ModuleSubscriptions::init called twice"); + } + + #[allow(dead_code)] + async fn call_view( + &self, + tx: MutTxId, + view_name: &str, + args: FunctionArgs, + sender: Arc, + ) -> Result<(), DBError> { + let module_host_rx = self + .module_rx + .get() + .expect("ModuleSubscriptions::init not called before call_view"); + let module_host = module_host_rx.borrow(); + + let _result = module_host + .call_view(tx, view_name, args, sender.id.identity, Some(sender.id.connection_id)) + .await; + + // TODO: Handle result + Ok(()) + } /// Construct a new [`ModuleSubscriptions`] for use in testing, /// creating a new [`tokio::runtime::Runtime`] to run its send worker. pub fn for_test_new_runtime(db: Arc) -> (ModuleSubscriptions, tokio::runtime::Runtime) { @@ -2130,18 +2162,39 @@ mod tests { "INSERT INTO t (x, y) VALUES (0, 1)", auth, Some(&subs), + None, + Identity::ZERO, &mut vec![], - )?; + ) + .await?; // Client should receive insert assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 1_u8]], []).await; - run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), &mut vec![])?; + run( + &db, + "UPDATE t SET y=2 WHERE x=0", + auth, + Some(&subs), + None, + Identity::ZERO, + &mut vec![], + ) + .await?; // Client should receive update assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await; - run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), &mut vec![])?; + run( + &db, + "DELETE FROM t WHERE x=0", + auth, + Some(&subs), + None, + Identity::ZERO, + &mut vec![], + ) + .await?; // Client should receive delete assert_tx_update_for_table(rx.recv(), t_id, &schema, [], [product![0_u8, 2_u8]]).await; @@ -2987,7 +3040,16 @@ mod tests { )); // Insert another row, using SQL. let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0)); - run(&db, "INSERT INTO t (x) VALUES (2)", auth, Some(&subs), &mut vec![])?; + run( + &db, + "INSERT INTO t (x) VALUES (2)", + auth, + Some(&subs), + None, + Identity::ZERO, + &mut vec![], + ) + .await?; // Unconfirmed client should have received both rows. assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![1_u8]], []).await; diff --git a/crates/datastore/src/error.rs b/crates/datastore/src/error.rs index 3a571f7d579..936e077fd28 100644 --- a/crates/datastore/src/error.rs +++ b/crates/datastore/src/error.rs @@ -29,10 +29,36 @@ pub enum DatastoreError { // TODO(cloutiertyler): should this be a TableError? I couldn't get it to compile #[error("Error reading a value from a table through BSATN: {0}")] ReadViaBsatnError(#[from] ReadViaBsatnError), + + #[error("ViewError: {0}")] + View(#[from] ViewError), + #[error(transparent)] Other(#[from] anyhow::Error), } +#[derive(Error, Debug)] +pub enum ViewError { + #[error("view '{0}' not found")] + ViewNotFound(String), + #[error("failed to deserialize view arguments from row")] + DeserializeArgs, + #[error("failed to deserialize view return value: {0}")] + DeserializeReturn(String), + #[error("failed to serialize row to BSATN")] + SerializeRow, + #[error("invalid return type: expected Array or Option, got {0:?}")] + InvalidReturnType(AlgebraicType), + #[error("return type is Array but deserialized value is not Array")] + TypeMismatchArray, + #[error("return type is Option but deserialized value is not Option")] + TypeMismatchOption, + #[error("expected ProductValue in view result")] + ExpectedProduct, + #[error("failed to serialize view arguments")] + SerializeArgs, +} + #[derive(Error, Debug, EnumAsInner)] pub enum TableError { #[error("Table with name `{0}` start with 'st_' and that is reserved for internal system tables.")] diff --git a/crates/datastore/src/execution_context.rs b/crates/datastore/src/execution_context.rs index ef72cbbdf25..f363a5aa8f5 100644 --- a/crates/datastore/src/execution_context.rs +++ b/crates/datastore/src/execution_context.rs @@ -131,6 +131,7 @@ pub enum WorkloadType { Unsubscribe, Update, Internal, + View, Procedure, } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 501980284b9..0f55fa02a17 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -10,7 +10,10 @@ use crate::{ db_metrics::DB_METRICS, error::{DatastoreError, IndexError, TableError}, execution_context::ExecutionContext, - locking_tx_datastore::{mut_tx::ViewReadSets, state_view::iter_st_column_for_table}, + locking_tx_datastore::{ + mut_tx::{ViewCall, ViewReadSets}, + state_view::iter_st_column_for_table, + }, system_tables::{ system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, @@ -33,7 +36,7 @@ use core::{convert::Infallible, ops::RangeBounds}; use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StTableType, Identity}; -use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId, ViewId}; +use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId}; use spacetimedb_sats::{algebraic_value::de::ValueDeserializer, memory_usage::MemoryUsage, Deserialize}; use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_schema::{ @@ -50,29 +53,31 @@ use std::collections::BTreeMap; use std::sync::Arc; use thin_vec::ThinVec; -type IndexKeyReadSet = HashMap>; +type IndexKeyReadSet = HashMap>; type IndexColReadSet = HashMap; #[derive(Default)] struct CommittedReadSets { - tables: IntMap>, + tables: IntMap>, index_keys: IntMap, } impl MemoryUsage for CommittedReadSets { fn heap_usage(&self) -> usize { - self.tables.heap_usage() + self.index_keys.heap_usage() + //TODO: fix this + //self.tables.heap_usage() + self.index_keys.heap_usage() + self.views.heap_usage() + 0 } } impl CommittedReadSets { /// Record in the [`CommittedState`] that this view scans this table - fn view_scans_table(&mut self, view_id: ViewId, table_id: TableId) { - self.tables.entry(table_id).or_default().insert(view_id); + fn view_scans_table(&mut self, view: ViewCall, table_id: TableId) { + self.tables.entry(table_id).or_default().insert(view); } /// Record in the [`CommittedState`] that this view reads this index `key` for these table `cols` - fn view_reads_index_key(&mut self, view_id: ViewId, table_id: TableId, cols: ColList, key: &AlgebraicValue) { + fn view_reads_index_key(&mut self, view: ViewCall, table_id: TableId, cols: ColList, key: &AlgebraicValue) { self.index_keys .entry(table_id) .or_default() @@ -80,7 +85,26 @@ impl CommittedReadSets { .or_default() .entry(key.clone()) .or_default() - .insert(view_id); + .insert(view); + } + + /// Clear all read sets for views involving `table_id`. + /// This is called when a table is modified, + fn clear_views_for_table(&mut self, table_id: TableId) { + self.tables.remove(&table_id); + //TODO: clear from index only if stored indexed row has been updated + self.index_keys.remove(&table_id); + } + + /// Returns true if the given view exists in any read set. + /// This is used to determine whether a view needs to be re-evaluated. + fn is_materialized(&self, view: &ViewCall) -> bool { + self.tables.values().any(|views| views.contains(view)) + || self.index_keys.values().any(|col_map| { + col_map + .values() + .any(|key_map| key_map.values().any(|views| views.contains(view))) + }) } } @@ -665,9 +689,6 @@ impl CommittedState { let mut tx_data = TxData::default(); let mut truncates = IntSet::default(); - // Merge read sets from the `MutTxId` into the `CommittedState` - self.merge_read_sets(read_sets); - // First, apply deletes. This will free up space in the committed tables. self.merge_apply_deletes( &mut tx_data, @@ -688,6 +709,12 @@ impl CommittedState { // Record any truncated tables in the `TxData`. tx_data.add_truncates(truncates); + // Merge read sets from the `MutTxId` into the `CommittedState`. + // It's important that this happens after applying the changes to `tx_data`, + // which implies `tx_data` already contains inserts and deletes for view tables + // so that we can pass updated set of table ids. + self.merge_read_sets(read_sets, tx_data.table_ids_and_names().map(|(id, _)| id)); + // If the TX will be logged, record its projected tx offset, // then increment the counter. if self.tx_consumes_offset(&tx_data, ctx) { @@ -698,23 +725,26 @@ impl CommittedState { tx_data } - fn merge_read_set(&mut self, view_id: ViewId, read_set: ReadSet) { + fn merge_read_set(&mut self, view: ViewCall, read_set: ReadSet) { for table_id in read_set.tables_scanned() { - self.read_sets.view_scans_table(view_id, *table_id); + self.read_sets.view_scans_table(view.clone(), *table_id); } for (table_id, index_id, key) in read_set.index_keys_scanned() { if let Some(cols) = self .get_schema(*table_id) .map(|table_schema| table_schema.col_list_for_index_id(*index_id)) { - self.read_sets.view_reads_index_key(view_id, *table_id, cols, key); + self.read_sets.view_reads_index_key(view.clone(), *table_id, cols, key); } } } - fn merge_read_sets(&mut self, read_sets: ViewReadSets) { - for (view_id, read_set) in read_sets { - self.merge_read_set(view_id, read_set); + fn merge_read_sets(&mut self, read_sets: ViewReadSets, updated_tables: impl IntoIterator) { + for (view, read_set) in read_sets { + self.merge_read_set(view, read_set); + } + for table_id in updated_tables { + self.read_sets.clear_views_for_table(table_id); } } @@ -1031,6 +1061,10 @@ impl CommittedState { .with_label_values(&database_identity) .set(self.blob_store.bytes_used_by_blobs() as _); } + + pub(super) fn is_materialized(&self, view: &ViewCall) -> bool { + self.read_sets.is_materialized(view) + } } pub(super) type CommitTableForInsertion<'a> = (&'a Table, &'a dyn BlobStore, &'a IndexIdMap); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index bc15c84be16..84fd47f9757 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -35,7 +35,7 @@ use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics}; use spacetimedb_lib::{ConnectionId, Identity}; use spacetimedb_paths::server::SnapshotDirPath; -use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId, ViewId}; +use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId, ViewDatabaseId}; use spacetimedb_sats::{ algebraic_value::de::ValueDeserializer, bsatn, buffer::BufReader, AlgebraicValue, ProductValue, }; @@ -512,7 +512,7 @@ impl MutTxDatastore for Locking { tx.rename_table(table_id, new_name) } - fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result> { + fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result> { tx.view_id_from_name(view_name) } @@ -1272,7 +1272,7 @@ mod tests { use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::st_var::StVarValue; use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration}; - use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId}; + use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewDatabaseId}; use spacetimedb_sats::algebraic_value::ser::value_serialize; use spacetimedb_sats::bsatn::ToBsatn; use spacetimedb_sats::layout::RowTypeLayout; @@ -1718,6 +1718,7 @@ mod tests { TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None }, TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) }, + ])); #[rustfmt::skip] assert_eq!(query.scan_st_columns()?, map_array([ @@ -1777,23 +1778,23 @@ mod tests { ColRow { table: ST_CONNECTION_CREDENTIALS_ID.into(), pos: 0, name: "connection_id", ty: AlgebraicType::U128 }, ColRow { table: ST_CONNECTION_CREDENTIALS_ID.into(), pos: 1, name: "jwt_payload", ty: AlgebraicType::String }, - ColRow { table: ST_VIEW_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, + ColRow { table: ST_VIEW_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, ColRow { table: ST_VIEW_ID.into(), pos: 1, name: "view_name", ty: AlgebraicType::String }, ColRow { table: ST_VIEW_ID.into(), pos: 2, name: "table_id", ty: AlgebraicType::option(TableId::get_type()) }, ColRow { table: ST_VIEW_ID.into(), pos: 3, name: "is_public", ty: AlgebraicType::Bool }, ColRow { table: ST_VIEW_ID.into(), pos: 4, name: "is_anonymous", ty: AlgebraicType::Bool }, - ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, + ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 1, name: "param_pos", ty: ColId::get_type() }, ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 2, name: "param_name", ty: AlgebraicType::String }, ColRow { table: ST_VIEW_PARAM_ID.into(), pos: 3, name: "param_type", ty: AlgebraicType::bytes() }, - ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, + ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 1, name: "col_pos", ty: ColId::get_type() }, ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String }, ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() }, - ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() }, + ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewDatabaseId::get_type() }, ColRow { table: ST_VIEW_SUB_ID.into(), pos: 1, name: "arg_id", ty: ArgId::get_type() }, ColRow { table: ST_VIEW_SUB_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 }, ColRow { table: ST_VIEW_SUB_ID.into(), pos: 3, name: "num_subscribers", ty: AlgebraicType::U64 }, diff --git a/crates/datastore/src/locking_tx_datastore/mod.rs b/crates/datastore/src/locking_tx_datastore/mod.rs index 3abad597992..a22bbd4ff16 100644 --- a/crates/datastore/src/locking_tx_datastore/mod.rs +++ b/crates/datastore/src/locking_tx_datastore/mod.rs @@ -3,7 +3,7 @@ pub mod committed_state; pub mod datastore; mod mut_tx; -pub use mut_tx::MutTxId; +pub use mut_tx::{MutTxId, ViewCall}; mod sequence; pub mod state_view; pub use state_view::{IterByColEqTx, IterByColRangeTx}; diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index f11198a61ef..9efa373bc8b 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -10,8 +10,9 @@ use super::{ }; use crate::system_tables::{ system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, - StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubFields, StViewSubRow, - ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID, + StViewArgFields, StViewArgRow, StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, + StViewSubFields, StViewSubRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_ARG_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, + ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID, }; use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; use crate::{ @@ -26,6 +27,7 @@ use crate::{ }; use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow}; use crate::{execution_context::Workload, system_tables::StViewRow}; +use bytes::Bytes; use core::ops::RangeBounds; use core::{cell::RefCell, mem}; use core::{iter, ops::Bound}; @@ -33,17 +35,18 @@ use smallvec::SmallVec; use spacetimedb_data_structures::map::{IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row}; -use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp}; +use spacetimedb_lib::{bsatn::ToBsatn as _, db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp}; use spacetimedb_lib::{ db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}, ConnectionId, Identity, }; use spacetimedb_primitives::{ - col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, + col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewDatabaseId, }; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, de::{DeserializeSeed, WithBound}, + product, ser::Serialize, AlgebraicType, AlgebraicValue, ProductType, ProductValue, WithTypespace, }; @@ -116,7 +119,37 @@ impl ReadSet { } } -pub type ViewReadSets = IntMap; +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub struct ViewCall { + identity: Option, + view_id: ViewDatabaseId, + //TODO: use arg_id from [`ST_VIEW_ARGS`] + args: Bytes, +} + +impl ViewCall { + pub fn anonymous(view_id: ViewDatabaseId, args: Bytes) -> Self { + Self { + identity: None, + view_id, + args, + } + } + + pub fn with_identity(identity: Identity, view_id: ViewDatabaseId, args: Bytes) -> Self { + Self { + identity: Some(identity), + view_id, + args, + } + } + + pub fn into_args(self) -> Bytes { + self.args + } +} + +pub type ViewReadSets = HashMap; /// Represents a Mutable transaction. Holds locks for its duration /// @@ -136,28 +169,28 @@ pub struct MutTxId { pub metrics: ExecutionMetrics, } -static_assert_size!(MutTxId, 432); +static_assert_size!(MutTxId, 448); impl MutTxId { /// Record that a view performs a table scan in this transaction's read set - pub fn record_table_scan(&mut self, view_id: Option, table_id: TableId) { - if let Some(view_id) = view_id { - self.read_sets.entry(view_id).or_default().insert_table_scan(table_id) + pub fn record_table_scan(&mut self, view: Option, table_id: TableId) { + if let Some(view) = view { + self.read_sets.entry(view).or_default().insert_table_scan(table_id) } } /// Record that a view performs an index scan in this transaction's read set pub fn record_index_scan( &mut self, - view_id: Option, + view: Option, table_id: TableId, index_id: IndexId, lower: Bound, upper: Bound, ) { - if let Some(view_id) = view_id { + if let Some(view) = view { self.read_sets - .entry(view_id) + .entry(view) .or_default() .insert_index_scan(table_id, index_id, lower, upper) } @@ -302,7 +335,7 @@ impl MutTxId { /// - Everything [`Self::create_table`] ensures. /// - The returned [`ViewId`] is unique and not [`ViewId::SENTINEL`]. /// - All view metadata maintained by the datastore is created atomically - pub fn create_view(&mut self, module_def: &ModuleDef, view_def: &ViewDef) -> Result<(ViewId, TableId)> { + pub fn create_view(&mut self, module_def: &ModuleDef, view_def: &ViewDef) -> Result<(ViewDatabaseId, TableId)> { let table_schema = TableSchema::from_view_def_for_datastore(module_def, view_def); let table_id = self.create_table(table_schema)?; @@ -322,7 +355,7 @@ impl MutTxId { } /// Drop the backing table of a view and update the system tables. - pub fn drop_view(&mut self, view_id: ViewId) -> Result<()> { + pub fn drop_view(&mut self, view_id: ViewDatabaseId) -> Result<()> { // Drop the view's metadata self.drop_st_view(view_id)?; self.drop_st_view_param(view_id)?; @@ -444,7 +477,7 @@ impl MutTxId { }) } - fn lookup_st_view(&self, view_id: ViewId) -> Result { + pub fn lookup_st_view(&self, view_id: ViewDatabaseId) -> Result { let row = self .iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewId, &view_id.into())? .next() @@ -453,6 +486,22 @@ impl MutTxId { StViewRow::try_from(row) } + pub fn lookup_st_view_by_name(&self, view: &str) -> Result { + let st_view_row = self + .iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewName, &view.into())? + .next() + .unwrap(); + + StViewRow::try_from(st_view_row) + } + + /// Check if view has parameters. + pub fn is_view_parameterized(&self, view_id: ViewDatabaseId) -> Result { + let view_id = view_id.into(); + let mut iter = self.iter_by_col_eq(ST_VIEW_PARAM_ID, StViewParamFields::ViewId, &view_id)?; + Ok(iter.next().is_some()) + } + /// Insert a row into `st_view`, auto-increments and returns the [`ViewId`]. fn insert_into_st_view( &mut self, @@ -460,12 +509,12 @@ impl MutTxId { table_id: TableId, is_public: bool, is_anonymous: bool, - ) -> Result { + ) -> Result { Ok(self .insert_via_serialize_bsatn( ST_VIEW_ID, &StViewRow { - view_id: ViewId::SENTINEL, + view_id: ViewDatabaseId::SENTINEL, view_name, table_id: Some(table_id), is_public, @@ -479,7 +528,7 @@ impl MutTxId { /// For each parameter of a view, insert a row into `st_view_param`. /// This does not include the context parameter. - fn insert_into_st_view_param(&mut self, view_id: ViewId, params: &ProductType) -> Result<()> { + fn insert_into_st_view_param(&mut self, view_id: ViewDatabaseId, params: &ProductType) -> Result<()> { for (i, field) in params.elements.iter().enumerate() { self.insert_via_serialize_bsatn( ST_VIEW_PARAM_ID, @@ -498,7 +547,7 @@ impl MutTxId { } /// For each column or field returned in a view, insert a row into `st_view_column`. - fn insert_into_st_view_column(&mut self, view_id: ViewId, columns: &[ViewColumnDef]) -> Result<()> { + fn insert_into_st_view_column(&mut self, view_id: ViewDatabaseId, columns: &[ViewColumnDef]) -> Result<()> { for def in columns { self.insert_via_serialize_bsatn( ST_VIEW_COLUMN_ID, @@ -564,17 +613,17 @@ impl MutTxId { } /// Drops the row in `st_view` for this `view_id` - fn drop_st_view(&mut self, view_id: ViewId) -> Result<()> { + fn drop_st_view(&mut self, view_id: ViewDatabaseId) -> Result<()> { self.delete_col_eq(ST_VIEW_ID, StViewFields::ViewId.col_id(), &view_id.into()) } /// Drops the rows in `st_view_param` for this `view_id` - fn drop_st_view_param(&mut self, view_id: ViewId) -> Result<()> { + fn drop_st_view_param(&mut self, view_id: ViewDatabaseId) -> Result<()> { self.delete_col_eq(ST_VIEW_PARAM_ID, StViewParamFields::ViewId.col_id(), &view_id.into()) } /// Drops the rows in `st_view_column` for this `view_id` - fn drop_st_view_column(&mut self, view_id: ViewId) -> Result<()> { + fn drop_st_view_column(&mut self, view_id: ViewDatabaseId) -> Result<()> { self.delete_col_eq(ST_VIEW_COLUMN_ID, StViewColumnFields::ViewId.col_id(), &view_id.into()) } @@ -645,7 +694,7 @@ impl MutTxId { Ok(ret) } - pub fn view_id_from_name(&self, view_name: &str) -> Result> { + pub fn view_id_from_name(&self, view_name: &str) -> Result> { let view_name = &view_name.into(); let row = self .iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewName, view_name)? @@ -653,6 +702,14 @@ impl MutTxId { Ok(row.map(|row| row.read_col(StViewFields::ViewId).unwrap())) } + pub fn view_from_name(&self, view_name: &str) -> Result> { + let view_name = &view_name.into(); + let row = self + .iter_by_col_eq(ST_VIEW_ID, StViewFields::ViewName, view_name)? + .next(); + Ok(row.map(|row| row.try_into().expect("st_view row should be valid"))) + } + pub fn table_id_from_name(&self, table_name: &str) -> Result> { let table_name = &table_name.into(); let row = self @@ -686,6 +743,31 @@ impl MutTxId { Ok((tx, commit)) } + + /// Checks whether a materialized view exists for the given view name, arguments, and sender identity. + /// + /// If view is not materialized, [`RelationalDB::evaluate_view`] should be called to compute and store it. + /// + /// - `view_name`: The name of the view to look up. + /// - `args`: The serialized (bastn-encoded) arguments for the view. + /// - `sender`: The identity of the sender requesting the view. + pub fn is_materialized(&self, view_name: &str, args: Bytes, sender: Identity) -> Result<(bool, Bytes)> { + let (view_id, is_anonymous) = self + .view_from_name(view_name)? + .map(|view_row| (view_row.view_id, view_row.is_anonymous)) + .ok_or_else(|| anyhow::anyhow!("view `{view_name}` not found"))?; + + let view_call = if is_anonymous { + ViewCall::anonymous(view_id, args) + } else { + ViewCall::with_identity(sender, view_id, args) + }; + + let is_materialized = + self.read_sets.contains_key(&view_call) || self.committed_state_write_lock.is_materialized(&view_call); + + Ok((is_materialized, view_call.into_args())) + } } impl MutTxId { @@ -1768,7 +1850,7 @@ impl<'a, I: Iterator>> Iterator for FilterDeleted<'a, I> { impl MutTxId { /// Does this caller have an entry for `view_id` in `st_view_sub`? - pub fn is_view_materialized(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result { + pub fn is_view_materialized(&self, view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity) -> Result { use StViewSubFields::*; let sender = IdentityViaU256(sender); let cols = col_list![ViewId, ArgId, Identity]; @@ -1781,7 +1863,7 @@ impl MutTxId { /// Otherwise insert a row into `st_view_sub` with no subscribers. pub fn st_view_sub_update_or_insert_last_called( &mut self, - view_id: ViewId, + view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity, ) -> Result<()> { @@ -1875,6 +1957,43 @@ impl MutTxId { Ok(()) } + /// Get or insert view argument into `ST_VIEW_ARG_ID`. + pub fn get_or_insert_st_view_arg(&mut self, args: &Bytes) -> Result { + let bytes_av = AlgebraicValue::Bytes(args.to_vec().into()); + let mut rows = self.iter_by_col_eq(ST_VIEW_ARG_ID, [StViewArgFields::Bytes], &bytes_av)?; + + // Extract the first matching `arg_id`, if any. + if let Some(res) = rows.next() { + let row = StViewArgRow::try_from(res).expect("valid StViewArgRow"); + return Ok(row.id); + } + + let view_arg_bytes = product![0u64, bytes_av] + .to_bsatn_vec() + .expect("StViewArgRow serialization to never fail"); + + let (_, view_arg_row, _) = self.insert_via_serialize_bsatn(ST_VIEW_ARG_ID, &view_arg_bytes)?; + let StViewArgRow { id: arg_id, .. } = view_arg_row.collapse().try_into().expect("valid StViewArgRow"); + + Ok(arg_id) + } + + /// Lookup a row in `st_view` by its primary key + fn st_view_row(&self, view_id: ViewDatabaseId) -> Result> { + self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())? + .next() + .map(StViewRow::try_from) + .transpose() + } + + /// Get the [`TableId`] for this view's backing table by probing `st_view`. + /// Note, all views with at least one subscriber are materialized. + pub fn get_table_id_for_view(&self, view_id: ViewDatabaseId) -> Result> { + Ok(self + .st_view_row(view_id)? + .and_then(|row| row.table_id.map(|id| (id, row.is_anonymous)))) + } + pub fn insert_st_client( &mut self, identity: Identity, diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs index a8de22c0b99..e6539df8453 100644 --- a/crates/datastore/src/system_tables.rs +++ b/crates/datastore/src/system_tables.rs @@ -798,7 +798,7 @@ impl From for ProductValue { #[sats(crate = spacetimedb_lib)] pub struct StViewRow { /// An auto-inc id for each view - pub view_id: ViewId, + pub view_id: ViewDatabaseId, /// The name of the view function as defined in the module pub view_name: Box, /// The [`TableId`] for this view if materialized. @@ -907,7 +907,7 @@ impl From for StColumnRow { #[sats(crate = spacetimedb_lib)] pub struct StViewColumnRow { /// A foreign key referencing [`ST_VIEW_NAME`]. - pub view_id: ViewId, + pub view_id: ViewDatabaseId, pub col_pos: ColId, pub col_name: Box, pub col_type: AlgebraicTypeViaBytes, @@ -922,7 +922,7 @@ pub struct StViewColumnRow { #[sats(crate = spacetimedb_lib)] pub struct StViewParamRow { /// A foreign key referencing [`ST_VIEW_NAME`]. - pub view_id: ViewId, + pub view_id: ViewDatabaseId, pub param_pos: ColId, pub param_name: Box, pub param_type: AlgebraicTypeViaBytes, @@ -936,7 +936,7 @@ pub struct StViewParamRow { #[derive(Debug, Clone, Eq, PartialEq, SpacetimeType)] #[sats(crate = spacetimedb_lib)] pub struct StViewSubRow { - pub view_id: ViewId, + pub view_id: ViewDatabaseId, pub arg_id: ArgId, pub identity: IdentityViaU256, pub num_subscribers: u64, @@ -1025,6 +1025,13 @@ impl TryFrom> for StIndexRow { } } +impl TryFrom> for StViewArgRow { + type Error = DatastoreError; + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + impl From for ProductValue { fn from(x: StIndexRow) -> Self { to_product_value(&x) diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index b04b2444863..f3992c68d24 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -509,7 +509,7 @@ pub trait MutTxDatastore: TxDatastore + MutTx { fn schema_for_table_mut_tx(&self, tx: &Self::MutTx, table_id: TableId) -> Result>; fn drop_table_mut_tx(&self, tx: &mut Self::MutTx, table_id: TableId) -> Result<()>; fn rename_table_mut_tx(&self, tx: &mut Self::MutTx, table_id: TableId, new_name: &str) -> Result<()>; - fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result>; + fn view_id_from_name_mut_tx(&self, tx: &Self::MutTx, view_name: &str) -> Result>; fn table_id_from_name_mut_tx(&self, tx: &Self::MutTx, table_name: &str) -> Result>; fn table_id_exists_mut_tx(&self, tx: &Self::MutTx, table_id: &TableId) -> bool; fn table_name_from_id_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result>>; diff --git a/crates/expr/Cargo.toml b/crates/expr/Cargo.toml index afc5fb9f401..4997aa60382 100644 --- a/crates/expr/Cargo.toml +++ b/crates/expr/Cargo.toml @@ -17,6 +17,7 @@ spacetimedb-primitives.workspace = true spacetimedb-sats.workspace = true spacetimedb-schema.workspace = true spacetimedb-sql-parser.workspace = true +bytes.workspace = true [dev-dependencies] pretty_assertions.workspace = true diff --git a/crates/expr/src/statement.rs b/crates/expr/src/statement.rs index 43f782ca698..291282c48a2 100644 --- a/crates/expr/src/statement.rs +++ b/crates/expr/src/statement.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use bytes::Bytes; use spacetimedb_lib::{identity::AuthCtx, st_var::StVarValue, AlgebraicType, AlgebraicValue, ProductValue}; use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_schema::schema::{ColumnSchema, TableOrViewSchema}; @@ -31,6 +32,13 @@ pub enum Statement { DML(DML), } +impl Statement { + pub fn views(&self) -> Vec<(&str, Bytes)> { + //TODO: implement view name extraction + vec![] + } +} + pub enum DML { Insert(TableInsert), Update(TableUpdate), diff --git a/crates/primitives/src/ids.rs b/crates/primitives/src/ids.rs index 06c6d7fc38c..062da0446fb 100644 --- a/crates/primitives/src/ids.rs +++ b/crates/primitives/src/ids.rs @@ -80,9 +80,9 @@ auto_inc_system_id!(TableId); system_id! { /// An identifier for a view, unique within a database. - pub struct ViewId(pub u32); + pub struct ViewDatabaseId(pub u32); } -auto_inc_system_id!(ViewId); +auto_inc_system_id!(ViewDatabaseId); system_id! { /// An identifier for a list of arguments passed to a view. @@ -137,6 +137,12 @@ system_id! { pub struct ProcedureId(pub u32); } +system_id! { + /// the index of a view as defined in a module's view list. + // This is never stored in a system table, but is useful to have defined here. + pub struct ViewId(pub u32); +} + /// An id for a function exported from a module, which may be a reducer or a procedure. // This is never stored in a system table, // but is useful to have defined here to provide a shared language for downstream crates. diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 3245a911200..fab5c541187 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -8,7 +8,8 @@ mod ids; pub use attr::{AttributeKind, ColumnAttribute, ConstraintKind, Constraints}; pub use col_list::{ColList, ColOrCols, ColSet}; pub use ids::{ - ArgId, ColId, ConstraintId, FunctionId, IndexId, ProcedureId, ReducerId, ScheduleId, SequenceId, TableId, ViewId, + ArgId, ColId, ConstraintId, FunctionId, IndexId, ProcedureId, ReducerId, ScheduleId, SequenceId, TableId, + ViewDatabaseId, ViewId, }; /// The minimum size of a chunk yielded by a wasm abi RowIter. diff --git a/crates/sats/src/algebraic_value.rs b/crates/sats/src/algebraic_value.rs index f5316018502..5b0859beeea 100644 --- a/crates/sats/src/algebraic_value.rs +++ b/crates/sats/src/algebraic_value.rs @@ -160,6 +160,18 @@ impl AlgebraicValue { } } + /// Converts `self` into an `Option`, if applicable. + pub fn into_option(self) -> Result, Self> { + match self { + AlgebraicValue::Sum(sum_value) => match sum_value.tag { + 0 => Ok(Some(*sum_value.value)), + 1 => Ok(None), + _ => Err(AlgebraicValue::Sum(sum_value)), + }, + _ => Err(self), + } + } + /// Returns an [`AlgebraicValue`] for `some: v`. /// /// The `some` variant is assigned the tag `0`. diff --git a/crates/sats/src/convert.rs b/crates/sats/src/convert.rs index ba38c6b2d74..b42fba8aa7e 100644 --- a/crates/sats/src/convert.rs +++ b/crates/sats/src/convert.rs @@ -1,7 +1,7 @@ use crate::sum_value::SumTag; use crate::{i256, u256}; use crate::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; -use spacetimedb_primitives::{ArgId, ColId, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId}; +use spacetimedb_primitives::{ArgId, ColId, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewDatabaseId}; impl crate::Value for AlgebraicValue { type Type = AlgebraicType; @@ -65,7 +65,7 @@ macro_rules! system_id { } system_id!(ArgId); system_id!(TableId); -system_id!(ViewId); +system_id!(ViewDatabaseId); system_id!(ColId); system_id!(SequenceId); system_id!(IndexId); diff --git a/crates/sats/src/de/impls.rs b/crates/sats/src/de/impls.rs index d0c9a92c23d..7649fa525e4 100644 --- a/crates/sats/src/de/impls.rs +++ b/crates/sats/src/de/impls.rs @@ -743,7 +743,7 @@ impl FieldNameVisitor<'_> for TupleNameVisitor<'_> { impl_deserialize!([] spacetimedb_primitives::ArgId, de => u64::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::TableId, de => u32::deserialize(de).map(Self)); -impl_deserialize!([] spacetimedb_primitives::ViewId, de => u32::deserialize(de).map(Self)); +impl_deserialize!([] spacetimedb_primitives::ViewDatabaseId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::SequenceId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::IndexId, de => u32::deserialize(de).map(Self)); impl_deserialize!([] spacetimedb_primitives::ConstraintId, de => u32::deserialize(de).map(Self)); diff --git a/crates/sats/src/ser/impls.rs b/crates/sats/src/ser/impls.rs index 9baac393dff..6abd9e32a94 100644 --- a/crates/sats/src/ser/impls.rs +++ b/crates/sats/src/ser/impls.rs @@ -259,7 +259,7 @@ impl_serialize!([] ValueWithType<'_, ArrayValue>, (self, ser) => { impl_serialize!([] spacetimedb_primitives::ArgId, (self, ser) => ser.serialize_u64(self.0)); impl_serialize!([] spacetimedb_primitives::TableId, (self, ser) => ser.serialize_u32(self.0)); -impl_serialize!([] spacetimedb_primitives::ViewId, (self, ser) => ser.serialize_u32(self.0)); +impl_serialize!([] spacetimedb_primitives::ViewDatabaseId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::SequenceId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::IndexId, (self, ser) => ser.serialize_u32(self.0)); impl_serialize!([] spacetimedb_primitives::ConstraintId, (self, ser) => ser.serialize_u32(self.0)); diff --git a/crates/sats/src/typespace.rs b/crates/sats/src/typespace.rs index f7d0978d660..6451cf650a0 100644 --- a/crates/sats/src/typespace.rs +++ b/crates/sats/src/typespace.rs @@ -413,7 +413,7 @@ impl_st!([T] Option, ts => AlgebraicType::option(T::make_type(ts))); impl_st!([] spacetimedb_primitives::ArgId, AlgebraicType::U64); impl_st!([] spacetimedb_primitives::ColId, AlgebraicType::U16); impl_st!([] spacetimedb_primitives::TableId, AlgebraicType::U32); -impl_st!([] spacetimedb_primitives::ViewId, AlgebraicType::U32); +impl_st!([] spacetimedb_primitives::ViewDatabaseId, AlgebraicType::U32); impl_st!([] spacetimedb_primitives::IndexId, AlgebraicType::U32); impl_st!([] spacetimedb_primitives::SequenceId, AlgebraicType::U32); impl_st!([] spacetimedb_primitives::ConstraintId, AlgebraicType::U32); diff --git a/crates/schema/src/def.rs b/crates/schema/src/def.rs index 59e9e409395..79bdcd6e38d 100644 --- a/crates/schema/src/def.rs +++ b/crates/schema/src/def.rs @@ -37,7 +37,7 @@ use spacetimedb_lib::db::raw_def::v9::{ RawUniqueConstraintDataV9, RawViewDefV9, TableAccess, TableType, }; use spacetimedb_lib::{ProductType, RawModuleDef}; -use spacetimedb_primitives::{ColId, ColList, ColOrCols, ColSet, ProcedureId, ReducerId, TableId}; +use spacetimedb_primitives::{ColId, ColList, ColOrCols, ColSet, ProcedureId, ReducerId, TableId, ViewId}; use spacetimedb_sats::{AlgebraicType, AlgebraicValue}; use spacetimedb_sats::{AlgebraicTypeRef, Typespace}; @@ -245,6 +245,12 @@ impl ModuleDef { self.views.get(name) } + /// Convenience method to look up a view, possibly by a string, returning its id as well. + pub fn view_full>(&self, name: &K) -> Option<(ViewId, &ViewDef)> { + // If the string IS a valid identifier, we can just look it up. + self.views.get_full(name).map(|(idx, _, def)| (idx.into(), def)) + } + /// Convenience method to look up a reducer, possibly by a string. pub fn reducer>(&self, name: &K) -> Option<&ReducerDef> { // If the string IS a valid identifier, we can just look it up. @@ -289,6 +295,11 @@ impl ModuleDef { self.procedures.get_index(id.idx()).map(|(_, def)| def) } + /// Look up a view by its id, panicking if it doesn't exist. + pub fn view_by_id(&self, id: ViewId) -> &ViewDef { + &self.views[id.idx()] + } + /// Looks up a lifecycle reducer defined in the module. pub fn lifecycle_reducer(&self, lifecycle: Lifecycle) -> Option<(ReducerId, &ReducerDef)> { self.lifecycle_reducers[lifecycle].map(|i| (i, &self.reducers[i.idx()])) diff --git a/crates/schema/src/def/deserialize.rs b/crates/schema/src/def/deserialize.rs index 09b78f3d2d4..993bf064ac4 100644 --- a/crates/schema/src/def/deserialize.rs +++ b/crates/schema/src/def/deserialize.rs @@ -1,6 +1,6 @@ //! Helpers to allow deserializing data using a ReducerDef. -use crate::def::{ProcedureDef, ReducerDef}; +use crate::def::{ProcedureDef, ReducerDef, ViewDef}; use spacetimedb_lib::{ sats::{self, de, impl_serialize, ser, ProductValue}, ProductType, @@ -8,7 +8,7 @@ use spacetimedb_lib::{ /// Wrapper around a function def that allows deserializing to a [`ProductValue`] at the type of the def's parameter [`ProductType`]. /// -/// Sensible instantiations for `Def` are [`ProcedureDef`] and [`ReducerDef`]. +/// Sensible instantiations for `Def` are [`ProcedureDef`], [`ReducerDef`] and [`ViewDef`]. pub struct ArgsSeed<'a, Def>(pub sats::WithTypespace<'a, Def>); // Manual impls of traits rather than derives, @@ -44,6 +44,15 @@ impl FunctionDef for ProcedureDef { } } +impl FunctionDef for ViewDef { + fn params(&self) -> &ProductType { + &self.params + } + fn name(&self) -> &str { + &self.name + } +} + impl ArgsSeed<'_, Def> { pub fn name(&self) -> &str { self.0.ty().name() diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index 9b724d2f56c..141c99cfa66 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -52,7 +52,7 @@ pub trait Schema: Sized { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ViewInfo { - pub view_id: ViewId, + pub view_id: ViewDatabaseId, pub has_args: bool, pub is_anonymous: bool, } @@ -740,7 +740,7 @@ impl TableSchema { }; let view_info = ViewInfo { - view_id: ViewId::SENTINEL, + view_id: ViewDatabaseId::SENTINEL, has_args: !param_columns.is_empty(), is_anonymous: *is_anonymous, }; @@ -862,7 +862,7 @@ impl TableSchema { }; let view_info = ViewInfo { - view_id: ViewId::SENTINEL, + view_id: ViewDatabaseId::SENTINEL, has_args: !param_columns.is_empty(), is_anonymous: *is_anonymous, }; diff --git a/crates/table/src/read_column.rs b/crates/table/src/read_column.rs index 1cef02b2ff0..af07d24caa3 100644 --- a/crates/table/src/read_column.rs +++ b/crates/table/src/read_column.rs @@ -327,7 +327,7 @@ macro_rules! impl_read_column_via_from { impl_read_column_via_from! { u64 => spacetimedb_primitives::ArgId; u16 => spacetimedb_primitives::ColId; - u32 => spacetimedb_primitives::ViewId; + u32 => spacetimedb_primitives::ViewDatabaseId; u32 => spacetimedb_primitives::TableId; u32 => spacetimedb_primitives::IndexId; u32 => spacetimedb_primitives::ConstraintId;