diff --git a/crates/types/src/restate_version.rs b/crates/types/src/restate_version.rs index dc9e1084ff..ec6e7111ad 100644 --- a/crates/types/src/restate_version.rs +++ b/crates/types/src/restate_version.rs @@ -88,7 +88,7 @@ impl SemanticRestateVersion { Self(semver::Version::parse("0.0.0-unknown").unwrap()) } - pub fn new(major: u64, minor: u64, patch: u64) -> Self { + pub const fn new(major: u64, minor: u64, patch: u64) -> Self { Self(semver::Version::new(major, minor, patch)) } diff --git a/crates/types/src/schema/metadata/mod.rs b/crates/types/src/schema/metadata/mod.rs index 0598d51f00..c7effdee88 100644 --- a/crates/types/src/schema/metadata/mod.rs +++ b/crates/types/src/schema/metadata/mod.rs @@ -46,8 +46,9 @@ use crate::{Version, Versioned, identifiers}; /// Serializable data structure representing the schema registry /// /// Do not leak the representation as this data structure, as it strictly depends on SchemaUpdater, SchemaRegistry and the Admin API. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(derive_more::Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(from = "serde_hacks::Schema", into = "serde_hacks::Schema")] +#[debug("Schema(version: {version})")] pub struct Schema { /// This gets bumped on each update. version: Version, diff --git a/crates/wal-protocol/src/control.rs b/crates/wal-protocol/src/control.rs index c51d642729..01bfa59368 100644 --- a/crates/wal-protocol/src/control.rs +++ b/crates/wal-protocol/src/control.rs @@ -12,6 +12,7 @@ use std::ops::RangeInclusive; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::logs::{Keys, Lsn}; +use restate_types::schema::Schema; use restate_types::time::MillisSinceEpoch; use restate_types::{GenerationalNodeId, SemanticRestateVersion}; @@ -62,3 +63,13 @@ pub struct PartitionDurability { /// Timestamp which the durability point was updated pub modification_time: MillisSinceEpoch, } + +/// Consistently store schema across partition replicas. +/// +/// Since v1.6.0. +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct UpsertSchema { + pub partition_key_range: Keys, + pub schema: Schema, +} diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index fd9f29482e..fda2b286ca 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -15,12 +15,12 @@ use restate_types::invocation::{ InvocationTermination, NotifySignalRequest, PurgeInvocationRequest, RestartAsNewInvocationRequest, ResumeInvocationRequest, ServiceInvocation, }; -use restate_types::logs; use restate_types::logs::{HasRecordKeys, Keys, MatchKeyQuery}; use restate_types::message::MessageIndex; use restate_types::state_mut::ExternalStateMutation; +use restate_types::{flexbuffers_storage_encode_decode, logs}; -use crate::control::{AnnounceLeader, VersionBarrier}; +use crate::control::{AnnounceLeader, UpsertSchema, VersionBarrier}; use crate::timer::TimerKeyValue; use self::control::PartitionDurability; @@ -29,13 +29,15 @@ pub mod control; pub mod timer; /// The primary envelope for all messages in the system. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Envelope { pub header: Header, pub command: Command, } +flexbuffers_storage_encode_decode!(Envelope); + impl Envelope { pub fn new(header: Header, command: Command) -> Self { Self { header, command } @@ -125,7 +127,7 @@ pub enum Destination { } /// State machine input commands -#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants, strum::VariantNames)] +#[derive(Debug, Clone, strum::EnumDiscriminants, strum::VariantNames)] #[strum_discriminants(derive(strum::IntoStaticStr))] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Command { @@ -183,6 +185,10 @@ pub enum Command { NotifyGetInvocationOutputResponse(GetInvocationOutputResponse), /// Notify a signal. NotifySignal(NotifySignalRequest), + + /// Upsert schema for consistent schema across replicas + /// *Since v1.6.0 + UpsertSchema(UpsertSchema), } impl Command { @@ -233,6 +239,7 @@ impl HasRecordKeys for Envelope { Command::InvocationResponse(response) => Keys::Single(response.partition_key()), Command::NotifySignal(sig) => Keys::Single(sig.partition_key()), Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()), + Command::UpsertSchema(schema) => schema.partition_key_range.clone(), } } } @@ -242,397 +249,3 @@ impl MatchKeyQuery for Envelope { self.record_keys().matches_key_query(query) } } - -#[cfg(feature = "serde")] -mod envelope { - use bilrost::{Message, OwnedMessage}; - use bytes::{Buf, Bytes, BytesMut}; - - use restate_storage_api::protobuf_types::v1 as protobuf; - use restate_types::storage::decode::{decode_bilrost, decode_serde}; - use restate_types::storage::encode::{encode_bilrost, encode_serde}; - use restate_types::storage::{ - StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, - }; - - use crate::Command; - - impl StorageEncode for crate::Envelope { - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { - use bytes::BufMut; - match self.default_codec() { - StorageCodecKind::FlexbuffersSerde => encode_serde(self, buf, self.default_codec()), - StorageCodecKind::Custom => { - buf.put_slice(&encode(self)?); - Ok(()) - } - _ => unreachable!("developer error"), - } - } - - fn default_codec(&self) -> StorageCodecKind { - // todo: Could be changed in v1.6 - StorageCodecKind::FlexbuffersSerde - } - } - - impl StorageDecode for crate::Envelope { - fn decode( - buf: &mut B, - kind: StorageCodecKind, - ) -> Result - where - Self: Sized, - { - match kind { - StorageCodecKind::Json - | StorageCodecKind::BincodeSerde - | StorageCodecKind::FlexbuffersSerde => decode_serde(buf, kind).map_err(|err| { - tracing::error!(%err, "{} decode failure (decoding Envelope)", kind); - err - }), - StorageCodecKind::LengthPrefixedRawBytes - | StorageCodecKind::Protobuf - | StorageCodecKind::Bilrost => Err(StorageDecodeError::UnsupportedCodecKind(kind)), - StorageCodecKind::Custom => decode(buf), - } - } - } - - #[derive(Debug, thiserror::Error)] - enum DecodeError { - #[error("missing field codec")] - MissingFieldCodec, - #[error("unknown command kind")] - UnknownCommandKind, - #[error("unexpected codec kind {0}")] - UnexpectedCodec(StorageCodecKind), - } - - impl From for StorageDecodeError { - fn from(value: DecodeError) -> Self { - Self::DecodeValue(value.into()) - } - } - - #[derive(PartialEq, Eq, bilrost::Enumeration)] - enum CommandKind { - Unknown = 0, - AnnounceLeader = 1, // flexbuffers - PatchState = 2, // protobuf - TerminateInvocation = 3, // flexbuffers - PurgeInvocation = 4, // flexbuffers - Invoke = 5, // protobuf - TruncateOutbox = 6, // flexbuffers - ProxyThrough = 7, // protobuf - AttachInvocation = 8, // protobuf - InvokerEffect = 9, // flexbuffers - Timer = 10, // flexbuffers - ScheduleTimer = 11, // flexbuffers - InvocationResponse = 12, // protobuf - NotifyGetInvocationOutputResponse = 13, // bilrost - NotifySignal = 14, // protobuf - PurgeJournal = 15, // flexbuffers - VersionBarrier = 16, // bilrost - UpdatePartitionDurability = 17, // bilrost - ResumeInvocation = 18, // flexbuffers - RestartAsNewInvocation = 19, // flexbuffers - } - - #[derive(bilrost::Message)] - struct Field { - #[bilrost(1)] - codec: Option, - #[bilrost(2)] - bytes: Bytes, - } - - impl Field { - fn encode_serde( - codec: StorageCodecKind, - value: &T, - ) -> Result { - let mut buf = BytesMut::new(); - encode_serde(value, &mut buf, codec)?; - - Ok(Self { - codec: Some(codec), - bytes: buf.freeze(), - }) - } - - fn encode_bilrost(value: &T) -> Result { - Ok(Self { - codec: Some(StorageCodecKind::Bilrost), - bytes: encode_bilrost(value), - }) - } - - fn encode_protobuf(value: &T) -> Result { - let mut buf = BytesMut::new(); - value - .encode(&mut buf) - .map_err(|err| StorageEncodeError::EncodeValue(err.into()))?; - - Ok(Self { - codec: Some(StorageCodecKind::Protobuf), - bytes: buf.freeze(), - }) - } - - fn decode_serde(mut self) -> Result { - let codec = self.codec()?; - if !matches!( - codec, - StorageCodecKind::Json - | StorageCodecKind::FlexbuffersSerde - | StorageCodecKind::BincodeSerde - ) { - return Err(StorageDecodeError::UnsupportedCodecKind(codec)); - } - - decode_serde( - &mut self.bytes, - self.codec.ok_or(DecodeError::MissingFieldCodec)?, - ) - } - - fn decode_bilrost(mut self) -> Result { - let codec = self.codec()?; - if codec != StorageCodecKind::Bilrost { - return Err(StorageDecodeError::UnsupportedCodecKind(codec)); - } - - decode_bilrost(&mut self.bytes) - } - - fn decode_protobuf(self) -> Result { - let codec = self.codec()?; - if codec != StorageCodecKind::Protobuf { - return Err(StorageDecodeError::UnsupportedCodecKind(codec)); - } - - T::decode(self.bytes).map_err(|err| StorageDecodeError::DecodeValue(err.into())) - } - - fn codec(&self) -> Result { - self.codec.ok_or(DecodeError::MissingFieldCodec) - } - } - - #[derive(bilrost::Message)] - struct Envelope { - #[bilrost(1)] - header: Field, - #[bilrost(2)] - command_kind: CommandKind, - #[bilrost(3)] - command: Field, - } - - macro_rules! codec_or_error { - ($field:expr, $expected:path) => {{ - let codec = $field.codec()?; - if !matches!(codec, $expected) { - return Err(DecodeError::UnexpectedCodec(codec).into()); - } - }}; - } - - pub fn encode(envelope: &super::Envelope) -> Result { - // todo(azmy): avoid clone? this will require change to `From` implementation - let (command_kind, command) = match &envelope.command { - Command::UpdatePartitionDurability(value) => ( - CommandKind::UpdatePartitionDurability, - Field::encode_bilrost(value), - ), - Command::VersionBarrier(value) => { - (CommandKind::VersionBarrier, Field::encode_bilrost(value)) - } - Command::AnnounceLeader(value) => ( - CommandKind::AnnounceLeader, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::PatchState(value) => { - let value = protobuf::StateMutation::from(value.clone()); - (CommandKind::PatchState, Field::encode_protobuf(&value)) - } - Command::TerminateInvocation(value) => ( - CommandKind::TerminateInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::PurgeInvocation(value) => ( - CommandKind::PurgeInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::ResumeInvocation(value) => ( - CommandKind::ResumeInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::RestartAsNewInvocation(value) => ( - CommandKind::RestartAsNewInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::PurgeJournal(value) => ( - CommandKind::PurgeJournal, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::Invoke(value) => { - let value = protobuf::ServiceInvocation::from(value.as_ref()); - (CommandKind::Invoke, Field::encode_protobuf(&value)) - } - Command::TruncateOutbox(value) => ( - CommandKind::TruncateOutbox, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::ProxyThrough(value) => { - let value = protobuf::ServiceInvocation::from(value.as_ref()); - (CommandKind::ProxyThrough, Field::encode_protobuf(&value)) - } - Command::AttachInvocation(value) => { - let value = protobuf::outbox_message::AttachInvocationRequest::from(value.clone()); - ( - CommandKind::AttachInvocation, - Field::encode_protobuf(&value), - ) - } - Command::InvokerEffect(value) => ( - CommandKind::InvokerEffect, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::Timer(value) => ( - CommandKind::Timer, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::ScheduleTimer(value) => ( - CommandKind::ScheduleTimer, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::InvocationResponse(value) => { - let value = - protobuf::outbox_message::OutboxServiceInvocationResponse::from(value.clone()); - ( - CommandKind::InvocationResponse, - Field::encode_protobuf(&value), - ) - } - Command::NotifyGetInvocationOutputResponse(value) => ( - CommandKind::NotifyGetInvocationOutputResponse, - Field::encode_bilrost(value), - ), - Command::NotifySignal(value) => { - let value = protobuf::outbox_message::NotifySignal::from(value.clone()); - (CommandKind::NotifySignal, Field::encode_protobuf(&value)) - } - }; - - let dto = Envelope { - header: Field::encode_serde(StorageCodecKind::FlexbuffersSerde, &envelope.header)?, - command_kind, - command: command?, - }; - - Ok(dto.encode_contiguous().into_vec().into()) - } - - pub fn decode(buf: B) -> Result { - let envelope = - Envelope::decode(buf).map_err(|err| StorageDecodeError::DecodeValue(err.into()))?; - - // header is encoded with serde - codec_or_error!(envelope.header, StorageCodecKind::FlexbuffersSerde); - let header = envelope.header.decode_serde::()?; - - let command = match envelope.command_kind { - CommandKind::Unknown => return Err(DecodeError::UnknownCommandKind.into()), - CommandKind::UpdatePartitionDurability => { - codec_or_error!(envelope.command, StorageCodecKind::Bilrost); - Command::UpdatePartitionDurability(envelope.command.decode_bilrost()?) - } - CommandKind::VersionBarrier => { - codec_or_error!(envelope.command, StorageCodecKind::Bilrost); - Command::VersionBarrier(envelope.command.decode_bilrost()?) - } - CommandKind::AnnounceLeader => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::AnnounceLeader(envelope.command.decode_serde()?) - } - CommandKind::PatchState => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::StateMutation = envelope.command.decode_protobuf()?; - Command::PatchState(value.try_into()?) - } - CommandKind::TerminateInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::TerminateInvocation(envelope.command.decode_serde()?) - } - CommandKind::PurgeInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::PurgeInvocation(envelope.command.decode_serde()?) - } - CommandKind::ResumeInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::ResumeInvocation(envelope.command.decode_serde()?) - } - CommandKind::RestartAsNewInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::RestartAsNewInvocation(envelope.command.decode_serde()?) - } - CommandKind::PurgeJournal => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::PurgeJournal(envelope.command.decode_serde()?) - } - CommandKind::Invoke => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::ServiceInvocation = envelope.command.decode_protobuf()?; - Command::Invoke(Box::new(value.try_into()?)) - } - CommandKind::TruncateOutbox => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::TruncateOutbox(envelope.command.decode_serde()?) - } - CommandKind::ProxyThrough => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::ServiceInvocation = envelope.command.decode_protobuf()?; - Command::ProxyThrough(Box::new(value.try_into()?)) - } - CommandKind::AttachInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::outbox_message::AttachInvocationRequest = - envelope.command.decode_protobuf()?; - Command::AttachInvocation(value.try_into()?) - } - CommandKind::InvokerEffect => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::InvokerEffect(envelope.command.decode_serde()?) - } - CommandKind::Timer => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::Timer(envelope.command.decode_serde()?) - } - CommandKind::ScheduleTimer => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::ScheduleTimer(envelope.command.decode_serde()?) - } - CommandKind::InvocationResponse => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::outbox_message::OutboxServiceInvocationResponse = - envelope.command.decode_protobuf()?; - Command::InvocationResponse(value.try_into()?) - } - CommandKind::NotifyGetInvocationOutputResponse => { - codec_or_error!(envelope.command, StorageCodecKind::Bilrost); - Command::NotifyGetInvocationOutputResponse(envelope.command.decode_bilrost()?) - } - CommandKind::NotifySignal => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::outbox_message::NotifySignal = - envelope.command.decode_protobuf()?; - - Command::NotifySignal(value.try_into()?) - } - }; - - Ok(super::Envelope { header, command }) - } -} diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 9fd6b52a4b..00ae801f3c 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -12,6 +12,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::future; use std::future::Future; +use std::ops::RangeInclusive; use std::pin::Pin; use std::task::{Context, Poll, ready}; use std::time::{Duration, SystemTime}; @@ -20,12 +21,14 @@ use futures::future::OptionFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt, stream}; use metrics::counter; -use tokio_stream::wrappers::ReceiverStream; +use restate_types::logs::Keys; +use restate_wal_protocol::control::UpsertSchema; +use tokio_stream::wrappers::{ReceiverStream, WatchStream}; use tracing::{debug, trace}; use restate_bifrost::CommitToken; use restate_core::network::{Oneshot, Reciprocal}; -use restate_core::{TaskCenter, TaskHandle, TaskId}; +use restate_core::{Metadata, MetadataKind, TaskCenter, TaskHandle, TaskId}; use restate_partition_store::PartitionStore; use restate_types::identifiers::{ InvocationId, LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, @@ -36,6 +39,7 @@ use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcResponse, }; use restate_types::time::MillisSinceEpoch; +use restate_types::{SemanticRestateVersion, Version, Versioned}; use restate_wal_protocol::Command; use restate_wal_protocol::timer::TimerKeyValue; @@ -45,7 +49,7 @@ use crate::partition::leadership::self_proposer::SelfProposer; use crate::partition::leadership::{ActionEffect, Error, InvokerStream, TimerService}; use crate::partition::shuffle; use crate::partition::shuffle::HintSender; -use crate::partition::state_machine::Action; +use crate::partition::state_machine::{Action, StateMachine}; use super::durability_tracker::DurabilityTracker; @@ -58,7 +62,7 @@ pub struct LeaderState { pub(crate) partition_id: PartitionId, pub leader_epoch: LeaderEpoch, // only needed for proposing TruncateOutbox to ourselves - own_partition_key: PartitionKey, + partition_key_range: RangeInclusive, pub shuffle_hint_tx: HintSender, // It's illegal to await the shuffle task handle once it has @@ -73,6 +77,7 @@ pub struct LeaderState { invoker_stream: InvokerStream, shuffle_stream: ReceiverStream, + schema_stream: WatchStream, pub pending_cleanup_timers_to_schedule: VecDeque<(InvocationId, Duration)>, cleaner_task_id: TaskId, trimmer_task_id: TaskId, @@ -84,7 +89,7 @@ impl LeaderState { pub fn new( partition_id: PartitionId, leader_epoch: LeaderEpoch, - own_partition_key: PartitionKey, + partition_key_range: RangeInclusive, shuffle_task_handle: TaskHandle>, cleaner_task_id: TaskId, trimmer_task_id: TaskId, @@ -98,11 +103,14 @@ impl LeaderState { LeaderState { partition_id, leader_epoch, - own_partition_key, + partition_key_range, shuffle_task_handle: Some(shuffle_task_handle), cleaner_task_id, trimmer_task_id, shuffle_hint_tx, + schema_stream: Metadata::with_current(|m| { + WatchStream::new(m.watch(MetadataKind::Schema)) + }), timer_service: Box::pin(timer_service), self_proposer, awaiting_rpc_actions: Default::default(), @@ -119,7 +127,7 @@ impl LeaderState { /// /// Important: The future needs to be cancellation safe since it is polled as a tokio::select /// arm! - pub async fn run(&mut self) -> Result, Error> { + pub async fn run(&mut self, state_machine: &StateMachine) -> Result, Error> { let timer_stream = std::pin::pin!(stream::unfold( &mut self.timer_service, |timer_service| async { @@ -128,6 +136,21 @@ impl LeaderState { } )); + let schema_stream = (&mut self.schema_stream).filter_map(|_| { + // only upsert schema iff version is newer than current version + let current_version = state_machine + .schema + .as_ref() + .map(|schema| schema.version()) + .unwrap_or_else(Version::invalid); + + std::future::ready( + Some(Metadata::with_current(|m| m.schema())) + .filter(|schema| schema.version() > current_version) + .map(|schema| ActionEffect::UpsertSchema(schema.clone())), + ) + }); + let invoker_stream = (&mut self.invoker_stream).map(ActionEffect::Invoker); let shuffle_stream = (&mut self.shuffle_stream).map(ActionEffect::Shuffle); let dur_tracker_stream = @@ -155,7 +178,8 @@ impl LeaderState { timer_stream, action_effects_stream, awaiting_rpc_self_propose_stream, - dur_tracker_stream + dur_tracker_stream, + schema_stream ); let mut all_streams = all_streams.ready_chunks(BATCH_READY_UP_TO); @@ -245,7 +269,7 @@ impl LeaderState { // the replica-set as a sufficient source of durability, or only snapshots. self.self_proposer .propose( - self.own_partition_key, + *self.partition_key_range.start(), Command::UpdatePartitionDurability(partition_durability), ) .await?; @@ -263,7 +287,7 @@ impl LeaderState { // specific destination messages that are identified by a partition_id self.self_proposer .propose( - self.own_partition_key, + *self.partition_key_range.start(), Command::TruncateOutbox(outbox_truncation.index()), ) .await?; @@ -284,6 +308,24 @@ impl LeaderState { ) .await?; } + ActionEffect::UpsertSchema(schema) => { + const GATE_VERSION: SemanticRestateVersion = + SemanticRestateVersion::new(1, 7, 0); + + if SemanticRestateVersion::current().is_equal_or_newer_than(&GATE_VERSION) { + self.self_proposer + .propose( + *self.partition_key_range.start(), + Command::UpsertSchema(UpsertSchema { + partition_key_range: Keys::RangeInclusive( + self.partition_key_range.clone(), + ), + schema, + }), + ) + .await?; + } + } ActionEffect::AwaitingRpcSelfProposeDone => { // Nothing to do here } diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 989df6ed09..b356807625 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -52,6 +52,7 @@ use restate_types::net::partition_processor::{ use restate_types::partitions::Partition; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::retries::with_jitter; +use restate_types::schema::Schema; use restate_types::storage::StorageEncodeError; use restate_wal_protocol::Command; use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability}; @@ -63,7 +64,7 @@ use crate::partition::leadership::leader_state::LeaderState; use crate::partition::leadership::self_proposer::SelfProposer; use crate::partition::shuffle; use crate::partition::shuffle::{OutboxReaderError, Shuffle, ShuffleMetadata}; -use crate::partition::state_machine::Action; +use crate::partition::state_machine::{Action, StateMachine}; use crate::partition::types::InvokerEffect; use self::durability_tracker::DurabilityTracker; @@ -124,6 +125,7 @@ pub(crate) enum ActionEffect { Timer(TimerKeyValue), ScheduleCleanupTimer(InvocationId, Duration), PartitionMaintenance(PartitionDurability), + UpsertSchema(Schema), AwaitingRpcSelfProposeDone, } enum State { @@ -413,7 +415,7 @@ where self.state = State::Leader(Box::new(LeaderState::new( self.partition.partition_id, *leader_epoch, - *self.partition.key_range.start(), + self.partition.key_range.clone(), shuffle_task_handle, cleaner_task_id, trimmer_task_id, @@ -518,7 +520,7 @@ where /// * Follower: Nothing to do /// * Candidate: Monitor appender task /// * Leader: Await action effects and monitor appender task - pub async fn run(&mut self) -> Result, Error> { + pub async fn run(&mut self, state_machine: &StateMachine) -> Result, Error> { match &mut self.state { State::Follower => Ok(futures::future::pending::>().await), State::Candidate { self_proposer, .. } => Err(self_proposer @@ -527,7 +529,7 @@ where .join_on_err() .await .expect_err("never should never be returned")), - State::Leader(leader_state) => leader_state.run().await, + State::Leader(leader_state) => leader_state.run(state_machine).await, } } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index c840827f9f..1d90e27304 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -181,6 +181,7 @@ where let outbox_seq_number = partition_store.get_outbox_seq_number().await?; let outbox_head_seq_number = partition_store.get_outbox_head_seq_number().await?; let min_restate_version = partition_store.get_min_restate_version().await?; + let schema = partition_store.get_schema().await?; if !SemanticRestateVersion::current().is_equal_or_newer_than(&min_restate_version) { gauge!(PARTITION_BLOCKED_FLARE, PARTITION_LABEL => @@ -199,6 +200,7 @@ where partition_store.partition_key_range().clone(), min_restate_version, EnumSet::empty(), + schema, ); Ok(state_machine) @@ -561,7 +563,7 @@ where transaction.commit().await?; self.leadership_state.handle_actions(action_collector.drain(..))?; }, - result = self.leadership_state.run() => { + result = self.leadership_state.run(&self.state_machine) => { let action_effects = result?; // We process the action_effects not directly in the run future because it // requires the run future to be cancellation safe. In the future this could be diff --git a/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs b/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs index 6cc0091e2e..e8cd206b14 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs @@ -64,6 +64,7 @@ mod tests { PartitionKey::MIN..=PartitionKey::MAX, SemanticRestateVersion::unknown().clone(), Default::default(), + None, ); // this is fine as we are always above the unknown version (current > 0.0.0) let mut test_env = TestEnv::create_with_state_machine(state_machine).await; @@ -106,6 +107,7 @@ mod tests { PartitionKey::MIN..=PartitionKey::MAX, SemanticRestateVersion::unknown().clone(), Default::default(), + None, ); // this is fine as we are always above the unknown version (current > 0.0.0) let mut test_env = TestEnv::create_with_state_machine(state_machine).await; diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 806ac991ce..26db066ca9 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -90,7 +90,6 @@ use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, }; use restate_types::journal::raw::{EntryHeader, RawEntryCodec, RawEntryCodecError}; -use restate_types::journal::*; use restate_types::journal_v2; use restate_types::journal_v2::command::{OutputCommand, OutputResult}; use restate_types::journal_v2::raw::RawNotification; @@ -99,11 +98,13 @@ use restate_types::journal_v2::{ }; use restate_types::logs::Lsn; use restate_types::message::MessageIndex; +use restate_types::schema::Schema; use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::state_mut::ExternalStateMutation; use restate_types::state_mut::StateMutationVersion; use restate_types::time::MillisSinceEpoch; use restate_types::{RestateVersion, SemanticRestateVersion}; +use restate_types::{Versioned, journal::*}; use restate_wal_protocol::Command; use restate_wal_protocol::timer::TimerKeyDisplay; use restate_wal_protocol::timer::TimerKeyValue; @@ -118,17 +119,20 @@ pub enum ExperimentalFeature {} pub struct StateMachine { // initialized from persistent storage - inbox_seq_number: MessageIndex, + pub(crate) inbox_seq_number: MessageIndex, /// First outbox message index. - outbox_head_seq_number: Option, + pub(crate) outbox_head_seq_number: Option, /// The minimum version of restate server that we currently support - min_restate_version: SemanticRestateVersion, + pub(crate) min_restate_version: SemanticRestateVersion, /// Sequence number of the next outbox message to be appended. - outbox_seq_number: MessageIndex, - partition_key_range: RangeInclusive, + pub(crate) outbox_seq_number: MessageIndex, + /// Consistent schema + pub(crate) schema: Option, + + pub(crate) partition_key_range: RangeInclusive, /// Enabled experimental features. - experimental_features: EnumSet, + pub(crate) experimental_features: EnumSet, } impl Debug for StateMachine { @@ -214,6 +218,7 @@ impl StateMachine { partition_key_range: RangeInclusive, min_restate_version: SemanticRestateVersion, experimental_features: EnumSet, + schema: Option, ) -> Self { Self { inbox_seq_number, @@ -222,6 +227,7 @@ impl StateMachine { partition_key_range, min_restate_version, experimental_features, + schema, } } } @@ -235,6 +241,7 @@ pub(crate) struct StateMachineApplyContext<'a, S> { outbox_seq_number: &'a mut MessageIndex, outbox_head_seq_number: &'a mut Option, min_restate_version: &'a mut SemanticRestateVersion, + schema: &'a mut Option, partition_key_range: RangeInclusive, #[allow(dead_code)] experimental_features: &'a EnumSet, @@ -273,6 +280,7 @@ impl StateMachine { outbox_seq_number: &mut self.outbox_seq_number, outbox_head_seq_number: &mut self.outbox_head_seq_number, min_restate_version: &mut self.min_restate_version, + schema: &mut self.schema, partition_key_range: self.partition_key_range.clone(), experimental_features: &self.experimental_features, is_leader, @@ -606,6 +614,24 @@ impl StateMachineApplyContext<'_, S> { .await?; Ok(()) } + Command::UpsertSchema(upsert) => { + trace!( + "Upsert schema record to version '{}'", + upsert.schema.version() + ); + if self + .schema + .as_ref() + .map(|current| current.version() < upsert.schema.version()) + .unwrap_or(true) + { + // only update if schema is none or has a smaller version + debug!("Schema updated to version '{}'", upsert.schema.version()); + *self.schema = Some(upsert.schema); + } + + Ok(()) + } } } diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 28bc74b901..31c3cb59e4 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -99,6 +99,7 @@ impl TestEnv { PartitionKey::MIN..=PartitionKey::MAX, SemanticRestateVersion::unknown().clone(), experimental_features, + None, )) .await } @@ -1099,6 +1100,7 @@ async fn truncate_outbox_with_gap() -> Result<(), Error> { PartitionKey::MIN..=PartitionKey::MAX, SemanticRestateVersion::unknown().clone(), EnumSet::empty(), + None, )) .await;