Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/encoding/src/bilrost_encodings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

mod arc_encodings;
mod nonzero;
mod phantom_data;
mod range;

pub mod display_from_str;
Expand Down
56 changes: 56 additions & 0 deletions crates/encoding/src/bilrost_encodings/phantom_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::marker::PhantomData;

use bilrost::{
DecodeErrorKind,
encoding::{EmptyState, ForOverwrite, Proxiable},
};

use crate::bilrost_encodings::RestateEncoding;

struct PhantomDataTag;

impl<T> Proxiable<PhantomDataTag> for PhantomData<T> {
type Proxy = ();

fn encode_proxy(&self) -> Self::Proxy {}

fn decode_proxy(&mut self, _: Self::Proxy) -> Result<(), DecodeErrorKind> {
Ok(())
}
}

impl<T> ForOverwrite<RestateEncoding, PhantomData<T>> for () {
fn for_overwrite() -> PhantomData<T> {
PhantomData
}
}

impl<T> EmptyState<RestateEncoding, PhantomData<T>> for () {
fn empty() -> PhantomData<T> {
PhantomData
}

fn is_empty(_: &PhantomData<T>) -> bool {
true
}

fn clear(_: &mut PhantomData<T>) {}
}

bilrost::delegate_proxied_encoding!(
use encoding (::bilrost::encoding::General)
to encode proxied type (PhantomData<T>)
using proxy tag (PhantomDataTag)
with encoding (RestateEncoding)
with generics (T)
);
4 changes: 3 additions & 1 deletion crates/invoker-api/src/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use restate_types::invocation::InvocationEpoch;
use restate_types::journal::EntryIndex;
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal_events::raw::RawEvent;
use restate_types::journal_v2;
use restate_types::journal_v2::CommandIndex;
use restate_types::journal_v2::raw::RawEntry;
use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader};
use restate_types::time::MillisSinceEpoch;
use restate_types::{flexbuffers_storage_encode_decode, journal_v2};
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -35,6 +35,8 @@ pub struct Effect {
pub kind: EffectKind,
}

flexbuffers_storage_encode_decode!(Effect);

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
// todo: fix this and box the large variant (EffectKind is 320 bytes)
Expand Down
15 changes: 13 additions & 2 deletions crates/types/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,26 @@ impl From<InvocationUuid> for opentelemetry::trace::SpanId {
/// Services are isolated by key. This means that there cannot be two concurrent
/// invocations for the same service instance (service name, key).
#[derive(
Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Serialize, serde::Deserialize,
Eq,
Hash,
PartialEq,
PartialOrd,
Ord,
Clone,
Debug,
serde::Serialize,
serde::Deserialize,
bilrost::Message,
)]
pub struct ServiceId {
// TODO rename this to KeyedServiceId. This type can be used only by keyed service types (virtual objects and workflows)
/// Identifies the grpc service
#[bilrost(1)]
pub service_name: ByteString,
/// Identifies the service instance for the given service name
#[bilrost(2)]
pub key: ByteString,

#[bilrost(3)]
partition_key: PartitionKey,
}

Expand Down
23 changes: 22 additions & 1 deletion crates/types/src/invocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use crate::identifiers::{
};
use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal};
use crate::time::MillisSinceEpoch;
use crate::{GenerationalNodeId, RestateVersion};
use crate::{
GenerationalNodeId, RestateVersion, bilrost_storage_encode_decode,
flexbuffers_storage_encode_decode,
};

use bytes::Bytes;
use bytestring::ByteString;
Expand Down Expand Up @@ -435,6 +438,8 @@ pub struct ServiceInvocation {
pub restate_version: RestateVersion,
}

flexbuffers_storage_encode_decode!(ServiceInvocation);

#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(
from = "serde_hacks::SubmitNotificationSink",
Expand Down Expand Up @@ -577,6 +582,8 @@ pub struct InvocationResponse {
pub result: ResponseResult,
}

flexbuffers_storage_encode_decode!(InvocationResponse);

impl WithInvocationId for InvocationResponse {
fn invocation_id(&self) -> InvocationId {
self.target.invocation_id()
Expand Down Expand Up @@ -623,6 +630,8 @@ pub struct GetInvocationOutputResponse {
pub result: GetInvocationOutputResult,
}

bilrost_storage_encode_decode!(GetInvocationOutputResponse);

impl WithInvocationId for GetInvocationOutputResponse {
fn invocation_id(&self) -> InvocationId {
self.target.invocation_id()
Expand Down Expand Up @@ -944,6 +953,8 @@ pub struct InvocationTermination {
pub response_sink: Option<InvocationMutationResponseSink>,
}

flexbuffers_storage_encode_decode!(InvocationTermination);

/// Flavor of the termination. Can be kill (hard stop) or graceful cancel.
#[derive(
Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Enumeration,
Expand All @@ -963,6 +974,8 @@ pub struct PurgeInvocationRequest {
pub response_sink: Option<InvocationMutationResponseSink>,
}

flexbuffers_storage_encode_decode!(PurgeInvocationRequest);

impl WithInvocationId for PurgeInvocationRequest {
fn invocation_id(&self) -> InvocationId {
self.invocation_id
Expand All @@ -979,6 +992,8 @@ pub struct ResumeInvocationRequest {
pub response_sink: Option<InvocationMutationResponseSink>,
}

flexbuffers_storage_encode_decode!(ResumeInvocationRequest);

impl WithInvocationId for ResumeInvocationRequest {
fn invocation_id(&self) -> InvocationId {
self.invocation_id
Expand All @@ -1001,6 +1016,8 @@ pub struct RestartAsNewInvocationRequest {
pub response_sink: Option<InvocationMutationResponseSink>,
}

flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequest);

impl WithInvocationId for RestartAsNewInvocationRequest {
fn invocation_id(&self) -> InvocationId {
self.invocation_id
Expand Down Expand Up @@ -1320,6 +1337,8 @@ pub struct AttachInvocationRequest {
pub response_sink: ServiceInvocationResponseSink,
}

flexbuffers_storage_encode_decode!(AttachInvocationRequest);

impl WithPartitionKey for AttachInvocationRequest {
fn partition_key(&self) -> PartitionKey {
self.invocation_query.partition_key()
Expand All @@ -1333,6 +1352,8 @@ pub struct NotifySignalRequest {
pub signal: Signal,
}

flexbuffers_storage_encode_decode!(NotifySignalRequest);

impl WithInvocationId for NotifySignalRequest {
fn invocation_id(&self) -> InvocationId {
self.invocation_id
Expand Down
10 changes: 9 additions & 1 deletion crates/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

//! This module defines types used for the internal messaging between Restate components.

use crate::identifiers::PartitionId;
use crate::{bilrost_storage_encode_decode, identifiers::PartitionId};

/// Wrapper that extends a message with its target peer to which the message should be sent.
pub type PartitionTarget<Msg> = (PartitionId, Msg);
Expand All @@ -29,3 +29,11 @@ pub enum AckKind {
last_known_seq_number: MessageIndex,
},
}

#[derive(Debug, Clone, Copy, bilrost::Message)]
pub struct MessageIndexRecrod {
#[bilrost(1)]
pub index: MessageIndex,
}

bilrost_storage_encode_decode!(MessageIndexRecrod);
8 changes: 7 additions & 1 deletion crates/types/src/state_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,27 @@ use bytes::Bytes;
use serde_with::serde_as;
use sha2::{Digest, Sha256};

use crate::bilrost_storage_encode_decode;
use crate::identifiers::ServiceId;

#[serde_as]
/// ExternalStateMutation
///
/// represents an external request to mutate a user's state.
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Message)]
pub struct ExternalStateMutation {
#[bilrost(1)]
pub service_id: ServiceId,
#[bilrost(2)]
pub version: Option<String>,
// flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs
#[bilrost(3)]
#[serde_as(as = "serde_with::Seq<(_, _)>")]
pub state: HashMap<Bytes, Bytes>,
}

bilrost_storage_encode_decode!(ExternalStateMutation);

/// # StateMutationVersion
///
/// This type represents a user state version. This implementation hashes canonically the raw key-value
Expand Down
38 changes: 37 additions & 1 deletion crates/types/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry};
use crate::journal_v2::{Decoder, EntryMetadata, EntryType};
use crate::time::MillisSinceEpoch;

pub use tracing;

#[derive(Debug, thiserror::Error)]
pub enum StorageEncodeError {
#[error("encoding failed: {0}")]
Expand Down Expand Up @@ -187,7 +189,7 @@ macro_rules! flexbuffers_storage_encode_decode {
Self: Sized,
{
$crate::storage::decode::decode_serde(buf, kind).map_err(|err| {
::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name));
$crate::storage::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name));
err
})

Expand All @@ -196,6 +198,40 @@ macro_rules! flexbuffers_storage_encode_decode {
};
}

/// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing
/// type using [`bilrost`].
#[macro_export]
macro_rules! bilrost_storage_encode_decode {
($name:tt) => {
impl $crate::storage::StorageEncode for $name {
fn default_codec(&self) -> $crate::storage::StorageCodecKind {
$crate::storage::StorageCodecKind::Bilrost
}

fn encode(
&self,
buf: &mut ::bytes::BytesMut,
) -> Result<(), $crate::storage::StorageEncodeError> {
bytes::BufMut::put(buf, $crate::storage::encode::encode_bilrost(self));
Ok(())
}
}

impl $crate::storage::StorageDecode for $name {
fn decode<B: ::bytes::Buf>(
buf: &mut B,
kind: $crate::storage::StorageCodecKind,
) -> Result<Self, $crate::storage::StorageDecodeError>
where
Self: Sized,
{
debug_assert_eq!(kind, $crate::storage::StorageCodecKind::Bilrost);
$crate::storage::decode::decode_bilrost(buf)
}
}
};
}

/// A polymorphic container of a buffer or a cached storage-encodeable object
#[derive(Clone, derive_more::Debug, BilrostAs)]
#[bilrost_as(dto::PolyBytes)]
Expand Down
2 changes: 1 addition & 1 deletion crates/wal-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ serde = ["dep:serde", "enum-map/serde", "bytestring/serde", "restate-invoker-api

[dependencies]
restate-workspace-hack = { workspace = true }

restate-encoding = { workspace = true }
restate-invoker-api = { workspace = true }
restate-storage-api = { workspace = true }
restate-types = { workspace = true }
Expand Down
18 changes: 16 additions & 2 deletions crates/wal-protocol/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,26 @@ use std::ops::RangeInclusive;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
use restate_types::logs::{Keys, Lsn};
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, SemanticRestateVersion};
use restate_types::{GenerationalNodeId, SemanticRestateVersion, bilrost_storage_encode_decode};

/// Announcing a new leader. This message can be written by any component to make the specified
/// partition processor the leader.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AnnounceLeader {
/// Sender of the announce leader message.
///
/// This became non-optional in v1.5. Noting that it has always been set in previous versions,
/// it's safe to assume that it's always set.
#[bilrost(1)]
pub node_id: GenerationalNodeId,
#[bilrost(2)]
pub leader_epoch: LeaderEpoch,
#[bilrost(3)]
pub partition_key_range: RangeInclusive<PartitionKey>,
}

bilrost_storage_encode_decode!(AnnounceLeader);
/// A version barrier to fence off state machine changes that require a certain minimum
/// version of restate server.
///
Expand All @@ -39,12 +43,17 @@ pub struct AnnounceLeader {
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct VersionBarrier {
/// The minimum version required (inclusive) to progress after this barrier.
#[bilrost(1)]
pub version: SemanticRestateVersion,
/// A human-readable reason for why this barrier exists.
#[bilrost(2)]
pub human_reason: Option<String>,
#[bilrost(3)]
pub partition_key_range: Keys,
}

bilrost_storage_encode_decode!(VersionBarrier);

/// Updates the `PARTITION_DURABILITY` FSM variable to the given value. Note that durability
/// only applies to partitions with the same `partition_id`. At replay time, the partition will
/// ignore updates that are not targeted to its own ID.
Expand All @@ -55,10 +64,15 @@ pub struct VersionBarrier {
#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionDurability {
#[bilrost(1)]
pub partition_id: PartitionId,
/// The partition has applied this LSN durably to the replica-set and/or has been
/// persisted in a snapshot in the snapshot repository.
#[bilrost(2)]
pub durable_point: Lsn,
/// Timestamp which the durability point was updated
#[bilrost(3)]
pub modification_time: MillisSinceEpoch,
}

bilrost_storage_encode_decode!(PartitionDurability);
Loading
Loading