Skip to content

Commit 0f58cb8

Browse files
committed
invoke view
1 parent 3849a49 commit 0f58cb8

File tree

26 files changed

+1073
-257
lines changed

26 files changed

+1073
-257
lines changed

crates/client-api-messages/src/energy.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,21 @@ impl fmt::Debug for EnergyBalance {
121121
}
122122
}
123123

124-
/// A measure of energy representing the energy budget for a reducer.
124+
/// A measure of energy representing the energy budget for a reducer or any callable function.
125125
///
126126
/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling
127127
/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime:
128128
/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`)
129129
#[derive(Copy, Clone, From, Add, Sub)]
130-
pub struct ReducerBudget(u64);
130+
pub struct FunctionBudget(u64);
131131

132-
impl ReducerBudget {
132+
impl FunctionBudget {
133133
// 1 second of wasm runtime is roughly 2 TeV, so this is
134134
// roughly 1 minute of wasm runtime
135-
pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000);
135+
pub const DEFAULT_BUDGET: Self = FunctionBudget(120_000_000_000_000);
136136

137-
pub const ZERO: Self = ReducerBudget(0);
138-
pub const MAX: Self = ReducerBudget(u64::MAX);
137+
pub const ZERO: Self = FunctionBudget(0);
138+
pub const MAX: Self = FunctionBudget(u64::MAX);
139139

140140
pub fn new(v: u64) -> Self {
141141
Self(v)
@@ -151,13 +151,13 @@ impl ReducerBudget {
151151
}
152152
}
153153

154-
impl From<ReducerBudget> for EnergyQuanta {
155-
fn from(value: ReducerBudget) -> Self {
154+
impl From<FunctionBudget> for EnergyQuanta {
155+
fn from(value: FunctionBudget) -> Self {
156156
EnergyQuanta::new(value.0.into())
157157
}
158158
}
159159

160-
impl fmt::Debug for ReducerBudget {
160+
impl fmt::Debug for FunctionBudget {
161161
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162162
f.debug_tuple("ReducerBudget")
163163
.field(&EnergyQuanta::from(*self))

crates/client-api/src/lib.rs

Lines changed: 43 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -75,62 +75,49 @@ impl Host {
7575
.module()
7676
.await
7777
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;
78-
79-
let (tx_offset, durable_offset, json) = self
80-
.host_controller
81-
.using_database(
82-
database,
83-
self.replica_id,
84-
move |db| -> axum::response::Result<_, (StatusCode, String)> {
85-
tracing::info!(sql = body);
86-
87-
// We need a header for query results
88-
let mut header = vec![];
89-
90-
let sql_start = std::time::Instant::now();
91-
let sql_span =
92-
tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,).entered();
93-
94-
let result = sql::execute::run(
95-
// Returns an empty result set for mutations
96-
db,
97-
&body,
98-
auth,
99-
Some(&module_host.info().subscriptions),
100-
&mut header,
101-
)
102-
.map_err(|e| {
103-
log::warn!("{e}");
104-
if let Some(auth_err) = e.get_auth_error() {
105-
(StatusCode::UNAUTHORIZED, auth_err.to_string())
106-
} else {
107-
(StatusCode::BAD_REQUEST, e.to_string())
108-
}
109-
})?;
110-
111-
let total_duration = sql_start.elapsed();
112-
sql_span.record("total_duration", tracing::field::debug(total_duration));
113-
114-
// Turn the header into a `ProductType`
115-
let schema = header
116-
.into_iter()
117-
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
118-
.collect();
119-
120-
Ok((
121-
result.tx_offset,
122-
db.durable_tx_offset(),
123-
vec![SqlStmtResult {
124-
schema,
125-
rows: result.rows,
126-
total_duration_micros: total_duration.as_micros() as u64,
127-
stats: SqlStmtStats::from_metrics(&result.metrics),
128-
}],
129-
))
130-
},
131-
)
132-
.await
133-
.map_err(log_and_500)??;
78+
println!("Executing SQL: {}", body);
79+
80+
let (tx_offset, durable_offset, json) = {
81+
let db = module_host.module.replica_ctx().relational_db.clone();
82+
83+
tracing::info!(sql = body);
84+
85+
let mut header = vec![];
86+
let sql_start = std::time::Instant::now();
87+
let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty);
88+
let _entered = sql_span.enter();
89+
90+
let result = sql::execute::run(&db, &body, auth, &Some(module_host), &mut header, auth.caller)
91+
.await
92+
.map_err(|e| {
93+
log::warn!("{e}");
94+
if let Some(auth_err) = e.get_auth_error() {
95+
(StatusCode::UNAUTHORIZED, auth_err.to_string())
96+
} else {
97+
(StatusCode::BAD_REQUEST, e.to_string())
98+
}
99+
})?;
100+
101+
let total_duration = sql_start.elapsed();
102+
sql_span.record("total_duration", tracing::field::debug(total_duration));
103+
104+
let schema = header
105+
.into_iter()
106+
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
107+
.collect();
108+
109+
Result::<_, axum::Error>::Ok((
110+
result.tx_offset,
111+
db.durable_tx_offset(),
112+
vec![SqlStmtResult {
113+
schema,
114+
rows: result.rows,
115+
total_duration_micros: total_duration.as_micros() as u64,
116+
stats: SqlStmtStats::from_metrics(&result.metrics),
117+
}],
118+
))
119+
}
120+
.unwrap();
134121

135122
if confirmed_read {
136123
if let Some(mut durable_offset) = durable_offset {
@@ -141,7 +128,6 @@ impl Host {
141128

142129
Ok(json)
143130
}
144-
145131
pub async fn update(
146132
&self,
147133
database: Database,

crates/core/src/db/relational_db.rs

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use crate::db::MetricsRecorderQueue;
2-
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
2+
use crate::error::{DBError, DatabaseError, RestoreSnapshotError, ViewError};
3+
use crate::host::ArgsTuple;
34
use crate::messages::control_db::HostType;
45
use crate::subscription::ExecutionCounters;
56
use crate::util::{asyncify, spawn_rayon};
67
use crate::worker_metrics::WORKER_METRICS;
78
use anyhow::{anyhow, Context};
9+
use bytes::Bytes;
810
use enum_map::EnumMap;
911
use fs2::FileExt;
12+
use log::trace;
1013
use spacetimedb_commitlog as commitlog;
1114
use spacetimedb_commitlog::repo::OnNewSegmentFn;
1215
use spacetimedb_data_structures::map::IntSet;
@@ -20,6 +23,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{
2023
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
2124
use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, StViewRow, ST_VIEW_ID};
2225
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
26+
use spacetimedb_datastore::system_tables::{StViewArgRow, ST_VIEW_ARG_ID};
2327
use spacetimedb_datastore::traits::{
2428
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
2529
UpdateFlags,
@@ -32,16 +36,18 @@ use spacetimedb_datastore::{
3236
traits::TxData,
3337
};
3438
use spacetimedb_durability as durability;
39+
use spacetimedb_lib::bsatn::ToBsatn;
3540
use spacetimedb_lib::db::auth::StAccess;
3641
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql};
42+
use spacetimedb_lib::de::DeserializeSeed as _;
3743
use spacetimedb_lib::st_var::StVarValue;
38-
use spacetimedb_lib::ConnectionId;
3944
use spacetimedb_lib::Identity;
45+
use spacetimedb_lib::{bsatn, ConnectionId};
4046
use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
4147
use spacetimedb_primitives::*;
4248
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
4349
use spacetimedb_sats::memory_usage::MemoryUsage;
44-
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
50+
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductType, ProductValue, Typespace};
4551
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
4652
use spacetimedb_schema::schema::{
4753
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
@@ -1512,6 +1518,117 @@ impl RelationalDB {
15121518
.into()
15131519
})
15141520
}
1521+
1522+
/// Get or insert view argument into `ST_VIEW_ARG_ID`.
1523+
pub fn get_or_insert_st_view_arg(&self, tx: &mut MutTxId, args: &Bytes) -> Result<u64, DBError> {
1524+
let bytes_av = AlgebraicValue::Bytes(args.to_vec().into());
1525+
let mut rows = self.iter_by_col_eq_mut(tx, ST_VIEW_ARG_ID, col_list![1], &bytes_av)?;
1526+
1527+
// Extract the first matching `arg_id`, if any.
1528+
if let Some(res) = rows.next() {
1529+
let row = StViewArgRow::try_from(res).expect("valid StViewArgRow");
1530+
return Ok(row.id);
1531+
}
1532+
1533+
let view_arg_bytes = product![0u64, bytes_av]
1534+
.to_bsatn_vec()
1535+
.map_err(|_| ViewError::SerializeArgs)?;
1536+
1537+
let (_, view_arg_row, _) = self.insert(tx, ST_VIEW_ARG_ID, &view_arg_bytes)?;
1538+
let StViewArgRow { id: arg_id, .. } = view_arg_row.try_into().expect("valid StViewArgRow");
1539+
1540+
Ok(arg_id)
1541+
}
1542+
1543+
/// Evaluate and update View.
1544+
/// This involves:
1545+
/// 1. Serializing the view arguments into `ST_VIEW_ARG_ID`
1546+
/// 2. Deleting all rows in the view table matching the view arguments
1547+
/// 3. Deserializing the return value from the view execution
1548+
/// 4. Inserting all rows from the return value into the view table, with the arg_id
1549+
/// set to the inserted view argument's id.
1550+
/// The `typespace` is needed for deserializing the return value.
1551+
pub fn evaluate_view(
1552+
&self,
1553+
tx: &mut MutTxId,
1554+
// Name of the view to update
1555+
view: &str,
1556+
// Arguments passed to the view call
1557+
args: ArgsTuple,
1558+
// Return type of the view call
1559+
return_type: AlgebraicType,
1560+
// Serialized bytes of the return value from the view call
1561+
//TODO: pass arg_id; do the insertion during starting of invoking view
1562+
bytes: Bytes,
1563+
typespace: &Typespace,
1564+
// Identity of the caller (for non-anonymous views)
1565+
caller_identity: Identity,
1566+
) -> Result<(), DBError> {
1567+
let st_view_row = tx.lookup_st_view_by_name(view)?;
1568+
1569+
let (table_id, is_anonymous) = (
1570+
st_view_row
1571+
.table_id
1572+
.ok_or_else(|| ViewError::ViewNotFound(view.to_string()))?,
1573+
st_view_row.is_anonymous,
1574+
);
1575+
1576+
// Insert the view arguments into ST_VIEW_ARG_ID
1577+
let arg_id = self.get_or_insert_st_view_arg(tx, &args.get_bsatn())?;
1578+
1579+
// Delete all existing rows in the view table matching the view arguments
1580+
let av: AlgebraicValue = args.tuple.into();
1581+
let rows_to_delete: Vec<_> = self
1582+
.iter_by_col_eq_mut(tx, table_id, col_list![0], &av)?
1583+
.map(|res| res.pointer())
1584+
.collect();
1585+
let count = self.delete(tx, table_id, rows_to_delete);
1586+
trace!("Deleted {count} rows from view table {table_id} for arg_id {arg_id}");
1587+
1588+
// Deserialize the return value
1589+
let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type);
1590+
let return_val = seed
1591+
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
1592+
.map_err(|e| ViewError::DeserializeReturn(e.to_string()))?;
1593+
let products = Self::extract_products(return_val, &return_type)?;
1594+
1595+
// Insert all rows from the return value into the view table
1596+
for product in products {
1597+
let row = {
1598+
let mut elements = Vec::with_capacity(2 + product.elements.len());
1599+
elements.push(if is_anonymous {
1600+
AlgebraicValue::OptionNone()
1601+
} else {
1602+
AlgebraicValue::OptionSome(caller_identity.into())
1603+
});
1604+
elements.push(AlgebraicValue::U64(arg_id));
1605+
elements.extend_from_slice(&product.elements);
1606+
1607+
ProductValue {
1608+
elements: elements.into_boxed_slice(),
1609+
}
1610+
};
1611+
let row_bytes = row.to_bsatn_vec().map_err(|_| ViewError::SerializeRow)?;
1612+
self.insert(tx, table_id, &row_bytes)?;
1613+
}
1614+
1615+
Ok(())
1616+
}
1617+
1618+
fn extract_products(
1619+
return_val: AlgebraicValue,
1620+
return_type: &AlgebraicType,
1621+
) -> Result<Vec<ProductValue>, ViewError> {
1622+
if return_type.is_array() {
1623+
let arr = return_val.into_array().expect("return type is array");
1624+
Ok(arr.into_iter().map(|v| v.into_product().unwrap()).collect())
1625+
} else if return_type.is_option() {
1626+
let opt = return_val.into_option().expect("return type is option");
1627+
Ok(opt.into_iter().map(|v| v.into_product().unwrap()).collect())
1628+
} else {
1629+
Err(ViewError::InvalidReturnType(return_type.clone()))
1630+
}
1631+
}
15151632
}
15161633

15171634
#[allow(unused)]

crates/core/src/energy.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ use spacetimedb_lib::{Hash, Identity};
55
use crate::messages::control_db::Database;
66

77
pub use spacetimedb_client_api_messages::energy::*;
8-
pub struct ReducerFingerprint<'a> {
8+
pub struct FunctionFingerprint<'a> {
99
pub module_hash: Hash,
1010
pub module_identity: Identity,
1111
pub caller_identity: Identity,
12-
pub reducer_name: &'a str,
12+
pub function_name: &'a str,
1313
}
1414

1515
pub trait EnergyMonitor: Send + Sync + 'static {
16-
fn reducer_budget(&self, fingerprint: &ReducerFingerprint<'_>) -> ReducerBudget;
16+
fn reducer_budget(&self, fingerprint: &FunctionFingerprint<'_>) -> FunctionBudget;
1717
fn record_reducer(
1818
&self,
19-
fingerprint: &ReducerFingerprint<'_>,
19+
fingerprint: &FunctionFingerprint<'_>,
2020
energy_used: EnergyQuanta,
2121
execution_duration: Duration,
2222
);
@@ -29,13 +29,13 @@ pub trait EnergyMonitor: Send + Sync + 'static {
2929
pub struct NullEnergyMonitor;
3030

3131
impl EnergyMonitor for NullEnergyMonitor {
32-
fn reducer_budget(&self, _fingerprint: &ReducerFingerprint<'_>) -> ReducerBudget {
33-
ReducerBudget::DEFAULT_BUDGET
32+
fn reducer_budget(&self, _fingerprint: &FunctionFingerprint<'_>) -> FunctionBudget {
33+
FunctionBudget::DEFAULT_BUDGET
3434
}
3535

3636
fn record_reducer(
3737
&self,
38-
_fingerprint: &ReducerFingerprint<'_>,
38+
_fingerprint: &FunctionFingerprint<'_>,
3939
_energy_used: EnergyQuanta,
4040
_execution_duration: Duration,
4141
) {

0 commit comments

Comments
 (0)