@@ -20,12 +20,12 @@ use futures::future::OptionFuture;
2020use futures:: stream:: FuturesUnordered ;
2121use futures:: { FutureExt , StreamExt , stream} ;
2222use metrics:: counter;
23- use tokio_stream:: wrappers:: ReceiverStream ;
23+ use tokio_stream:: wrappers:: { ReceiverStream , WatchStream } ;
2424use tracing:: { debug, trace} ;
2525
2626use restate_bifrost:: CommitToken ;
2727use restate_core:: network:: { Oneshot , Reciprocal } ;
28- use restate_core:: { TaskCenter , TaskHandle , TaskId } ;
28+ use restate_core:: { Metadata , MetadataKind , TaskCenter , TaskHandle , TaskId } ;
2929use restate_partition_store:: PartitionStore ;
3030use restate_types:: identifiers:: {
3131 InvocationId , LeaderEpoch , PartitionId , PartitionKey , PartitionProcessorRpcRequestId ,
@@ -36,6 +36,7 @@ use restate_types::net::partition_processor::{
3636 PartitionProcessorRpcError , PartitionProcessorRpcResponse ,
3737} ;
3838use restate_types:: time:: MillisSinceEpoch ;
39+ use restate_types:: { Version , Versioned } ;
3940use restate_wal_protocol:: Command ;
4041use restate_wal_protocol:: timer:: TimerKeyValue ;
4142
@@ -45,7 +46,7 @@ use crate::partition::leadership::self_proposer::SelfProposer;
4546use crate :: partition:: leadership:: { ActionEffect , Error , InvokerStream , TimerService } ;
4647use crate :: partition:: shuffle;
4748use crate :: partition:: shuffle:: HintSender ;
48- use crate :: partition:: state_machine:: Action ;
49+ use crate :: partition:: state_machine:: { Action , StateMachineRef } ;
4950
5051use super :: durability_tracker:: DurabilityTracker ;
5152
@@ -73,6 +74,7 @@ pub struct LeaderState {
7374
7475 invoker_stream : InvokerStream ,
7576 shuffle_stream : ReceiverStream < shuffle:: OutboxTruncation > ,
77+ schema_stream : WatchStream < Version > ,
7678 pub pending_cleanup_timers_to_schedule : VecDeque < ( InvocationId , Duration ) > ,
7779 cleaner_task_id : TaskId ,
7880 trimmer_task_id : TaskId ,
@@ -103,6 +105,9 @@ impl LeaderState {
103105 cleaner_task_id,
104106 trimmer_task_id,
105107 shuffle_hint_tx,
108+ schema_stream : Metadata :: with_current ( |m| {
109+ WatchStream :: new ( m. watch ( MetadataKind :: Schema ) )
110+ } ) ,
106111 timer_service : Box :: pin ( timer_service) ,
107112 self_proposer,
108113 awaiting_rpc_actions : Default :: default ( ) ,
@@ -119,7 +124,10 @@ impl LeaderState {
119124 ///
120125 /// Important: The future needs to be cancellation safe since it is polled as a tokio::select
121126 /// arm!
122- pub async fn run ( & mut self ) -> Result < Vec < ActionEffect > , Error > {
127+ pub async fn run (
128+ & mut self ,
129+ state_machine : StateMachineRef < ' _ > ,
130+ ) -> Result < Vec < ActionEffect > , Error > {
123131 let timer_stream = std:: pin:: pin!( stream:: unfold(
124132 & mut self . timer_service,
125133 |timer_service| async {
@@ -128,6 +136,21 @@ impl LeaderState {
128136 }
129137 ) ) ;
130138
139+ let schema_stream = ( & mut self . schema_stream )
140+ . filter ( |version| {
141+ // only upsert schema iff version is newer that
142+ futures:: future:: ready (
143+ state_machine
144+ . schema
145+ . as_ref ( )
146+ . map ( |schema| schema. version ( ) < * version)
147+ . unwrap_or ( true ) ,
148+ )
149+ } )
150+ . map ( |_| {
151+ let schema = Metadata :: with_current ( |m| m. schema ( ) . clone ( ) ) ;
152+ ActionEffect :: UpsertSchema ( schema)
153+ } ) ;
131154 let invoker_stream = ( & mut self . invoker_stream ) . map ( ActionEffect :: Invoker ) ;
132155 let shuffle_stream = ( & mut self . shuffle_stream ) . map ( ActionEffect :: Shuffle ) ;
133156 let dur_tracker_stream =
@@ -155,7 +178,8 @@ impl LeaderState {
155178 timer_stream,
156179 action_effects_stream,
157180 awaiting_rpc_self_propose_stream,
158- dur_tracker_stream
181+ dur_tracker_stream,
182+ schema_stream
159183 ) ;
160184 let mut all_streams = all_streams. ready_chunks ( BATCH_READY_UP_TO ) ;
161185
@@ -284,6 +308,16 @@ impl LeaderState {
284308 )
285309 . await ?;
286310 }
311+ ActionEffect :: UpsertSchema ( schema) => {
312+ debug ! (
313+ "Self purposing {schema:?} for partition key {}" ,
314+ self . own_partition_key
315+ ) ;
316+
317+ self . self_proposer
318+ . propose ( self . own_partition_key , Command :: UpsertSchema ( schema) )
319+ . await ?;
320+ }
287321 ActionEffect :: AwaitingRpcSelfProposeDone => {
288322 // Nothing to do here
289323 }
0 commit comments