Skip to content

Commit 9f7b7c5

Browse files
committed
Move Envelope to v1
1 parent 9ea36f6 commit 9f7b7c5

File tree

2 files changed

+248
-238
lines changed

2 files changed

+248
-238
lines changed

crates/wal-protocol/src/lib.rs

Lines changed: 2 additions & 238 deletions
Original file line numberDiff line numberDiff line change
@@ -8,244 +8,8 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use restate_storage_api::deduplication_table::DedupInformation;
12-
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey};
13-
use restate_types::invocation::{
14-
AttachInvocationRequest, GetInvocationOutputResponse, InvocationResponse,
15-
InvocationTermination, NotifySignalRequest, PurgeInvocationRequest,
16-
RestartAsNewInvocationRequest, ResumeInvocationRequest, ServiceInvocation,
17-
};
18-
use restate_types::logs::{self, HasRecordKeys, Keys, MatchKeyQuery};
19-
use restate_types::message::MessageIndex;
20-
use restate_types::state_mut::ExternalStateMutation;
21-
22-
use crate::control::{AnnounceLeader, UpsertSchema, VersionBarrier};
23-
use crate::timer::TimerKeyValue;
24-
25-
use self::control::PartitionDurability;
26-
2711
pub mod control;
2812
pub mod timer;
13+
mod v1;
2914

30-
/// The primary envelope for all messages in the system.
31-
#[derive(Debug, Clone)]
32-
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33-
pub struct Envelope {
34-
pub header: Header,
35-
pub command: Command,
36-
}
37-
38-
#[cfg(feature = "serde")]
39-
restate_types::flexbuffers_storage_encode_decode!(Envelope);
40-
41-
impl Envelope {
42-
pub fn new(header: Header, command: Command) -> Self {
43-
Self { header, command }
44-
}
45-
}
46-
47-
/// Header is set on every message
48-
#[derive(Debug, Clone, PartialEq, Eq)]
49-
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
50-
pub struct Header {
51-
pub source: Source,
52-
pub dest: Destination,
53-
}
54-
55-
/// Identifies the source of a message
56-
#[derive(Debug, Clone, PartialEq, Eq)]
57-
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
58-
pub enum Source {
59-
/// Message is sent from another partition processor
60-
Processor {
61-
/// if possible, this is used to reroute responses in case of splits/merges
62-
/// v1.4 requires this to be set.
63-
/// v1.5 Marked as `Option`.
64-
/// v1.6 always set to `None`.
65-
/// Will be removed in v1.7.
66-
#[cfg_attr(feature = "serde", serde(default))]
67-
partition_id: Option<PartitionId>,
68-
#[cfg_attr(feature = "serde", serde(default))]
69-
partition_key: Option<PartitionKey>,
70-
/// The current epoch of the partition leader. Readers should observe this to decide which
71-
/// messages to accept. Readers should ignore messages coming from
72-
/// epochs lower than the max observed for a given partition id.
73-
leader_epoch: LeaderEpoch,
74-
// Which node is this message from?
75-
// First deprecation in v1.1, but since v1.5 we switched to Option<PlainNodeId> and it's
76-
// still being set to Some(v) to maintain compatibility with v1.4.
77-
//
78-
// v1.6 field is removed. -- Kept here for reference only.
79-
// #[cfg_attr(feature = "serde", serde(default))]
80-
// node_id: Option<PlainNodeId>,
81-
82-
// From v1.1 this is always set, but maintained to support rollback to v1.0.
83-
// Deprecated(v1.5): It's set to Some(v) to maintain support for v1.4 but
84-
// will be removed in v1.6. Commands that need the node-id of the sender should
85-
// include the node-id in the command payload itself (e.g. in the [`AnnounceLeader`])
86-
// v1.6 field is removed. -- Kept here for reference only.
87-
// #[cfg_attr(feature = "serde", serde(default))]
88-
// generational_node_id: Option<GenerationalNodeId>,
89-
},
90-
/// Message is sent from an ingress node
91-
Ingress {
92-
// The identity of the sender node. Generational for fencing. Ingress is
93-
// stateless, so we shouldn't respond to requests from older generation
94-
// if a new generation is alive.
95-
//
96-
// Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4.
97-
// but will be removed in v1.6.
98-
// v1.6 field is removed. -- Kept here for reference only.
99-
// #[cfg_attr(feature = "serde", serde(default))]
100-
// node_id: Option<GenerationalNodeId>,
101-
102-
// Last config version observed by sender. If this is a newer generation
103-
// or an unknown ID, we might need to update our config.
104-
//
105-
// Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4.
106-
// but will be removed in v1.6.
107-
// v1.6 field is removed. -- Kept here for reference only.
108-
// #[cfg_attr(feature = "serde", serde(default))]
109-
// nodes_config_version: Option<Version>,
110-
},
111-
/// Message is sent from some control plane component (controller, cli, etc.)
112-
ControlPlane {
113-
// Reserved for future use.
114-
},
115-
}
116-
117-
/// Identifies the intended destination of the message
118-
#[derive(Debug, Clone, PartialEq, Eq)]
119-
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
120-
pub enum Destination {
121-
/// Message is sent to partition processor
122-
Processor {
123-
partition_key: PartitionKey,
124-
#[cfg_attr(feature = "serde", serde(default))]
125-
dedup: Option<DedupInformation>,
126-
},
127-
}
128-
129-
/// State machine input commands
130-
#[derive(Debug, Clone, strum::EnumDiscriminants, strum::VariantNames)]
131-
#[strum_discriminants(derive(strum::IntoStaticStr))]
132-
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
133-
pub enum Command {
134-
/// Updates the `PARTITION_DURABILITY` FSM variable to the given value.
135-
/// See [`PartitionDurability`] for more details.
136-
///
137-
/// *Since v1.4.2*
138-
UpdatePartitionDurability(PartitionDurability),
139-
/// A version barrier to fence off state machine changes that require a certain minimum
140-
/// version of restate server.
141-
/// *Since v1.4.0*
142-
VersionBarrier(VersionBarrier),
143-
// -- Control-plane related events
144-
AnnounceLeader(Box<AnnounceLeader>),
145-
146-
// -- Partition processor commands
147-
/// Manual patching of storage state
148-
PatchState(ExternalStateMutation),
149-
/// Terminate an ongoing invocation
150-
TerminateInvocation(InvocationTermination),
151-
/// Purge a completed invocation
152-
PurgeInvocation(PurgeInvocationRequest),
153-
/// Purge a completed invocation journal
154-
PurgeJournal(PurgeInvocationRequest),
155-
/// Start an invocation on this partition
156-
Invoke(Box<ServiceInvocation>),
157-
/// Truncate the message outbox up to, and including, the specified index.
158-
TruncateOutbox(MessageIndex),
159-
/// Proxy a service invocation through this partition processor, to reuse the deduplication id map.
160-
ProxyThrough(Box<ServiceInvocation>),
161-
/// Attach to an existing invocation
162-
AttachInvocation(AttachInvocationRequest),
163-
/// Resume an invocation
164-
ResumeInvocation(ResumeInvocationRequest),
165-
/// Restart as new invocation from prefix
166-
RestartAsNewInvocation(RestartAsNewInvocationRequest),
167-
168-
// -- Partition processor events for PP
169-
/// Invoker is reporting effect(s) from an ongoing invocation.
170-
InvokerEffect(Box<restate_invoker_api::Effect>),
171-
/// Timer has fired
172-
Timer(TimerKeyValue),
173-
/// Schedule timer
174-
ScheduleTimer(TimerKeyValue),
175-
/// Another partition processor is reporting a response of an invocation we requested.
176-
///
177-
/// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations.
178-
/// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output.
179-
///
180-
/// For more details see `OnNotifyInvocationResponse`.
181-
InvocationResponse(InvocationResponse),
182-
183-
// -- New PP <-> PP commands using Journal V2
184-
/// Notify Get invocation output
185-
NotifyGetInvocationOutputResponse(GetInvocationOutputResponse),
186-
/// Notify a signal.
187-
NotifySignal(NotifySignalRequest),
188-
189-
/// Upsert schema for consistent schema across replicas
190-
/// *Since v1.6.0
191-
UpsertSchema(UpsertSchema),
192-
}
193-
194-
impl Command {
195-
pub fn name(&self) -> &'static str {
196-
CommandDiscriminants::from(self).into()
197-
}
198-
}
199-
200-
impl WithPartitionKey for Envelope {
201-
fn partition_key(&self) -> PartitionKey {
202-
match self.header.dest {
203-
Destination::Processor { partition_key, .. } => partition_key,
204-
}
205-
}
206-
}
207-
208-
impl HasRecordKeys for Envelope {
209-
fn record_keys(&self) -> logs::Keys {
210-
match &self.command {
211-
// the partition_key is used as key here since the command targets the partition by ID.
212-
// Partitions will ignore this message at read time if the paritition ID (in body)
213-
// does not match. Alternatively, we could use the partition key range or `Keys::None`
214-
// but this would just be a waste of effort for readers after a partition has been
215-
// split or if the log is shared between multiple partitions.
216-
Command::UpdatePartitionDurability(_) => Keys::Single(self.partition_key()),
217-
Command::VersionBarrier(barrier) => barrier.partition_key_range.clone(),
218-
Command::AnnounceLeader(announce) => {
219-
Keys::RangeInclusive(announce.partition_key_range.clone())
220-
}
221-
Command::PatchState(mutation) => Keys::Single(mutation.service_id.partition_key()),
222-
Command::TerminateInvocation(terminate) => {
223-
Keys::Single(terminate.invocation_id.partition_key())
224-
}
225-
Command::PurgeInvocation(purge) => Keys::Single(purge.invocation_id.partition_key()),
226-
Command::PurgeJournal(purge) => Keys::Single(purge.invocation_id.partition_key()),
227-
Command::Invoke(invoke) => Keys::Single(invoke.partition_key()),
228-
// todo: Remove this, or pass the partition key range but filter based on partition-id
229-
// on read if needed.
230-
Command::TruncateOutbox(_) => Keys::Single(self.partition_key()),
231-
Command::ProxyThrough(_) => Keys::Single(self.partition_key()),
232-
Command::AttachInvocation(_) => Keys::Single(self.partition_key()),
233-
Command::ResumeInvocation(req) => Keys::Single(req.partition_key()),
234-
Command::RestartAsNewInvocation(req) => Keys::Single(req.partition_key()),
235-
// todo: Handle journal entries that request cross-partition invocations
236-
Command::InvokerEffect(effect) => Keys::Single(effect.invocation_id.partition_key()),
237-
Command::Timer(timer) => Keys::Single(timer.invocation_id().partition_key()),
238-
Command::ScheduleTimer(timer) => Keys::Single(timer.invocation_id().partition_key()),
239-
Command::InvocationResponse(response) => Keys::Single(response.partition_key()),
240-
Command::NotifySignal(sig) => Keys::Single(sig.partition_key()),
241-
Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()),
242-
Command::UpsertSchema(schema) => schema.partition_key_range.clone(),
243-
}
244-
}
245-
}
246-
247-
impl MatchKeyQuery for Envelope {
248-
fn matches_key_query(&self, query: &logs::KeyFilter) -> bool {
249-
self.record_keys().matches_key_query(query)
250-
}
251-
}
15+
pub use v1::{Command, Destination, Envelope, Header, Source};

0 commit comments

Comments
 (0)