diff --git a/crates/codegen/src/rust.rs b/crates/codegen/src/rust.rs index 7dabb84f2c8..399dd5495ec 100644 --- a/crates/codegen/src/rust.rs +++ b/crates/codegen/src/rust.rs @@ -979,7 +979,7 @@ fn print_db_update_defn(module: &ModuleDef, out: &mut Indenter) { for table in iter_tables(module) { writeln!( out, - "{}: __sdk::TableUpdate<{}>,", + "pub {}: __sdk::TableUpdate<{}>,", table_method_name(&table.name), type_ref_name(module, table.product_type_ref), ); diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 7306f45160e..6ed4c680c8e 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -43,6 +43,7 @@ use std::{ use tokio::{ runtime::{self, Runtime}, sync::Mutex as TokioMutex, + sync::mpsc::UnboundedSender as TokioSender, }; pub(crate) type SharedCell = Arc>; @@ -78,6 +79,9 @@ pub struct DbContextImpl { /// from which [Self::apply_pending_mutations] and friends read mutations. pending_mutations_recv: Arc>>>, + /// Send channel for all database updates + update_send: Option>, + /// This connection's `Identity`. /// /// May be `None` if we connected anonymously @@ -103,6 +107,7 @@ impl Clone for DbContextImpl { recv: Arc::clone(&self.recv), pending_mutations_send: self.pending_mutations_send.clone(), pending_mutations_recv: Arc::clone(&self.pending_mutations_recv), + update_send: self.update_send.clone(), identity: Arc::clone(&self.identity), connection_id: Arc::clone(&self.connection_id), } @@ -240,7 +245,12 @@ impl DbContextImpl { // so that it will be unlocked when callbacks run. let applied_diff = { let mut cache = self.cache.lock().unwrap(); - update.apply_to_client_cache(&mut *cache) + if let Some(update_send) = &self.update_send { + update_send.send(update).unwrap(); + Default::default() + } else { + update.apply_to_client_cache(&mut *cache) + } }; let mut inner = self.inner.lock().unwrap(); @@ -769,6 +779,8 @@ pub struct DbConnectionBuilder { on_connect_error: Option>, on_disconnect: Option>, + update_send: Option>, + params: WsParams, } @@ -816,6 +828,7 @@ impl DbConnectionBuilder { on_connect: None, on_connect_error: None, on_disconnect: None, + update_send: None, params: <_>::default(), } } @@ -901,6 +914,7 @@ but you must call one of them, or else the connection will never progress. recv: Arc::new(TokioMutex::new(parsed_recv_chan)), pending_mutations_send, pending_mutations_recv: Arc::new(TokioMutex::new(pending_mutations_recv)), + update_send: self.update_send, identity: Arc::new(StdMutex::new(None)), connection_id: Arc::new(StdMutex::new(connection_id_override)), }; @@ -980,6 +994,15 @@ but you must call one of them, or else the connection will never progress. self } + /// Register a channel that receives all database updates instead of the + /// client cache. If you use this method, none of the [`TableHandle`] + /// callbacks will ever fire, and no data is stored in the [`ClientCache`]. + #[doc(hidden)] + pub fn with_update_channel(mut self, sender: TokioSender) -> Self { + self.update_send = Some(sender); + self + } + /// Register a callback to run when the connection is successfully initiated. /// /// The callback will receive three arguments: diff --git a/sdks/rust/src/spacetime_module.rs b/sdks/rust/src/spacetime_module.rs index 8b795d7b426..e98cd3f051c 100644 --- a/sdks/rust/src/spacetime_module.rs +++ b/sdks/rust/src/spacetime_module.rs @@ -54,10 +54,10 @@ pub trait SpacetimeModule: Send + Sync + 'static { type SetReducerFlags: InModule + Send + 'static; /// Parsed and typed analogue of [`crate::ws::DatabaseUpdate`]. - type DbUpdate: DbUpdate; + type DbUpdate: DbUpdate + Send + 'static; /// The result of applying `Self::DbUpdate` to the client cache. - type AppliedDiff<'r>: AppliedDiff<'r, Module = Self>; + type AppliedDiff<'r>: AppliedDiff<'r, Module = Self> + Default; /// Module-specific `SubscriptionHandle` type, representing an ongoing incremental subscription to a query. type SubscriptionHandle: SubscriptionHandle;