@@ -1519,97 +1519,110 @@ impl RelationalDB {
15191519 } )
15201520 }
15211521
1522- /// Evaluate and update View.
1523- /// This involves:
1524- /// 1. Serializing the view arguments into `ST_VIEW_ARG_ID`
1525- /// 2. Deleting all rows in the view table matching the view arguments
1526- /// 3. Deserializing the return value from the view execution
1527- /// 4. Inserting all rows from the return value into the view table, with the arg_id
1528- /// set to the inserted view argument's id.
1529- /// The `typespace` is needed for deserializing the return value.
1530- #[ allow( clippy:: too_many_arguments) ]
1531- pub fn evaluate_view (
1532- & self ,
1533- tx : & mut MutTxId ,
1534- // Name of the view to update
1535- view : & str ,
1536- // Arguments passed to the view call
1537- args : ArgsTuple ,
1538- // Return type of the view call
1539- return_type : AlgebraicType ,
1540- // Serialized bytes of the return value from the view call
1541- //TODO: pass arg_id; do the insertion during starting of invoking view
1542- bytes : Bytes ,
1543- typespace : & Typespace ,
1544- // Identity of the caller (for non-anonymous views)
1545- caller_identity : Identity ,
1546- ) -> Result < ( ) , DBError > {
1547- let st_view_row = tx. lookup_st_view_by_name ( view) ?;
1548-
1549- let ( table_id, is_anonymous) = (
1550- st_view_row
1551- . table_id
1552- . expect ( "Tables are always created for views upon view creation" ) ,
1553- st_view_row. is_anonymous ,
1554- ) ;
1555-
1556- let arg_id = tx. get_or_insert_st_view_arg ( args. get_bsatn ( ) ) ?;
15571522
1558- let input_rows = product ! [
1559- if is_anonymous {
1560- AlgebraicValue :: OptionNone ( )
1561- } else {
1562- AlgebraicValue :: OptionSome ( caller_identity. into( ) )
1563- } ,
1564- AlgebraicValue :: U64 ( arg_id)
1565- ] ;
1566-
1567- // Delete all existing rows in the view table matching the view arguments
1568- let rows_to_delete: Vec < _ > = self
1569- . iter_by_col_eq_mut ( tx, table_id, [ 0 , 1 ] , & input_rows. clone ( ) . into ( ) ) ?
1570- . map ( |res| res. pointer ( ) )
1571- . collect ( ) ;
1572-
1573- let count = self . delete ( tx, table_id, rows_to_delete) ;
1574- trace ! ( "Deleted {count} rows from view table {table_id} for arg_id {arg_id}" ) ;
1575-
1576- // Deserialize the return value
1577- let seed = spacetimedb_sats:: WithTypespace :: new ( typespace, & return_type) ;
1578- let return_val = seed
1579- . deserialize ( bsatn:: Deserializer :: new ( & mut & bytes[ ..] ) )
1580- . map_err ( |e| DatastoreError :: from ( ViewError :: DeserializeReturn ( e. to_string ( ) ) ) ) ?;
1581-
1582- let products: Vec < ProductValue > = if return_type. is_array ( ) {
1583- let arr = return_val. into_array ( ) . expect ( "return type is array" ) ;
1584- Ok ( arr. into_iter ( ) . map ( |v| v. into_product ( ) . unwrap ( ) ) . collect ( ) )
1585- } else if return_type. is_option ( ) {
1586- let opt = return_val. into_option ( ) . expect ( "return type is option" ) ;
1587- Ok ( opt. into_iter ( ) . map ( |v| v. into_product ( ) . unwrap ( ) ) . collect ( ) )
1523+ /// Materialize View backing table.
1524+ ///
1525+ /// # Process
1526+ /// 1. Serializes view arguments into `ST_VIEW_ARG_ID`
1527+ /// 2. Deletes stale rows matching the view arguments
1528+ /// 3. Deserializes the new view execution results
1529+ /// 4. Inserts fresh rows with the corresponding arg_id
1530+ ///
1531+ /// # Arguments
1532+ /// * `tx` - Mutable transaction context
1533+ /// * `view` - Name of the view to update
1534+ /// * `args` - Arguments passed to the view call
1535+ /// * `return_type` - Expected return type of the view
1536+ /// * `bytes` - Serialized (bsatn encoded) return value from view execution
1537+ /// * `typespace` - Type information for deserialization
1538+ /// * `caller_identity` - Identity of the caller (for non-anonymous views)
1539+ #[ allow( clippy:: too_many_arguments) ]
1540+ pub fn materialize_view (
1541+ & self ,
1542+ tx : & mut MutTxId ,
1543+ view : & str ,
1544+ args : ArgsTuple ,
1545+ return_type : AlgebraicType ,
1546+ bytes : Bytes ,
1547+ typespace : & Typespace ,
1548+ caller_identity : Identity ,
1549+ ) -> Result < ( ) , DBError > {
1550+ // Fetch view metadata
1551+ let st_view_row = tx. lookup_st_view_by_name ( view) ?;
1552+ let table_id = st_view_row
1553+ . table_id
1554+ . expect ( "View table must exist for materialization" ) ;
1555+ let is_anonymous = st_view_row. is_anonymous ;
1556+
1557+ let arg_id = tx. get_or_insert_st_view_arg ( args. get_bsatn ( ) ) ?;
1558+
1559+ // Build the filter key for identifying rows to update
1560+ let input_args = product ! [
1561+ if is_anonymous {
1562+ AlgebraicValue :: OptionNone ( )
15881563 } else {
1589- Err ( DatastoreError :: from ( ViewError :: InvalidReturnType ( return_type. clone ( ) ) ) )
1590- } ?;
1591-
1592- // Insert all rows from the return value into the view table
1593- for product in products {
1594- let row = {
1595- let mut elements = Vec :: with_capacity ( 2 + product. elements . len ( ) ) ;
1596- elements. extend_from_slice ( & input_rows. elements ) ;
1597- elements. append ( & mut product. elements . to_vec ( ) ) ;
1598-
1599- ProductValue {
1600- elements : elements. into_boxed_slice ( ) ,
1601- }
1602- } ;
1603- let row_bytes = row
1604- . to_bsatn_vec ( )
1605- . map_err ( |_| DatastoreError :: from ( ViewError :: SerializeRow ) ) ?;
1606- self . insert ( tx, table_id, & row_bytes) ?;
1607- }
1564+ AlgebraicValue :: OptionSome ( caller_identity. into( ) )
1565+ } ,
1566+ AlgebraicValue :: U64 ( arg_id)
1567+ ] ;
1568+
1569+ // Remove stale View entries
1570+ let rows_to_delete: Vec < _ > = self
1571+ . iter_by_col_eq_mut ( tx, table_id, [ 0 , 1 ] , & input_args. clone ( ) . into ( ) ) ?
1572+ . map ( |res| res. pointer ( ) )
1573+ . collect ( ) ;
1574+
1575+ let deleted_count = self . delete ( tx, table_id, rows_to_delete) ;
1576+ trace ! (
1577+ "Deleted {deleted_count} stale rows from view table {table_id} for arg_id {arg_id}"
1578+ ) ;
1579+
1580+ // Deserialize the return value
1581+ let seed = spacetimedb_sats:: WithTypespace :: new ( typespace, & return_type) ;
1582+ let return_val = seed
1583+ . deserialize ( bsatn:: Deserializer :: new ( & mut & bytes[ ..] ) )
1584+ . map_err ( |e| DatastoreError :: from ( ViewError :: DeserializeReturn ( e. to_string ( ) ) ) ) ?;
1585+
1586+ // Extract products from return value (must be array or option)
1587+ let products: Vec < ProductValue > = if return_type. is_array ( ) {
1588+ let arr = return_val
1589+ . into_array ( )
1590+ . expect ( "return_type.is_array() ensures this is an array" ) ;
1591+
1592+ arr. into_iter ( ) . map ( |v| v. into_product ( ) . unwrap ( ) ) . collect ( )
1593+ } else if return_type. is_option ( ) {
1594+ let opt = return_val
1595+ . into_option ( )
1596+ . expect ( "return_type.is_option() ensures this is an option" ) ;
1597+ opt. into_iter ( ) . map ( |v| v. into_product ( ) . unwrap ( ) ) . collect ( )
1598+ } else {
1599+ return Err ( DatastoreError :: from ( ViewError :: InvalidReturnType ( return_type) ) . into ( ) ) ;
1600+ } ;
16081601
1609- Ok ( ( ) )
1602+ // Insert fresh results into the view table
1603+ let mut elements: Vec < AlgebraicValue > = Vec :: with_capacity ( input_args. elements . len ( ) + products. first ( ) . map_or ( 0 , |p| p. elements . len ( ) ) ) ;
1604+ for product in products {
1605+ elements. clear ( ) ;
1606+ // Build complete row by prepending filter key to product data
1607+ let mut elements = Vec :: with_capacity ( input_args. elements . len ( ) + product. elements . len ( ) ) ;
1608+ elements. extend_from_slice ( & input_args. elements ) ;
1609+ elements. extend_from_slice ( & product. elements ) ;
1610+
1611+ let row = ProductValue {
1612+ elements : elements. into_boxed_slice ( ) ,
1613+ } ;
1614+
1615+ let row_bytes = row
1616+ . to_bsatn_vec ( )
1617+ . map_err ( |_| DatastoreError :: from ( ViewError :: SerializeRow ) ) ?;
1618+
1619+ self . insert ( tx, table_id, & row_bytes) ?;
16101620 }
1621+
1622+ Ok ( ( ) )
16111623}
16121624
1625+ }
16131626#[ allow( unused) ]
16141627#[ derive( Clone ) ]
16151628struct LockFile {
0 commit comments