1010
1111use std:: marker:: PhantomData ;
1212
13- use bytes:: { Bytes , BytesMut } ;
13+ use bilrost:: encoding:: encoded_len_varint;
14+ use bilrost:: { Message , OwnedMessage } ;
15+ use bytes:: { BufMut , Bytes , BytesMut } ;
1416
15- use restate_encoding:: RestateEncoding ;
1617use restate_types:: identifiers:: { LeaderEpoch , PartitionId , PartitionKey , WithPartitionKey } ;
1718use restate_types:: logs:: { HasRecordKeys , Keys } ;
1819use restate_types:: storage:: {
1920 StorageCodecKind , StorageDecode , StorageDecodeError , StorageEncode , StorageEncodeError ,
2021} ;
2122
22- /// The primary envelope for all messages in the system.
23+ const ENCODING_VERSION : u8 = 1 ;
24+
2325#[ derive( Debug , Clone , bilrost:: Message ) ]
24- pub struct Envelope < M > {
26+ struct EnvelopeInner {
2527 #[ bilrost( 1 ) ]
26- pub header : Header ,
27-
28+ header : Header ,
2829 #[ bilrost( 2 ) ]
29- record_keys : Keys ,
30-
30+ keys : Keys ,
3131 #[ bilrost( 3 ) ]
32- record : RawRecord ,
32+ kind : RecordKind ,
33+ #[ bilrost( 4 ) ]
34+ encoding : Option < StorageCodecKind > ,
35+ }
3336
34- #[ bilrost( tag( 4 ) , encoding( RestateEncoding ) ) ]
37+ /// The primary envelope for all messages in the system.
38+ #[ derive( Debug , Clone ) ]
39+ pub struct Envelope < M > {
40+ inner : EnvelopeInner ,
41+ payload : Bytes ,
3542 phantom : PhantomData < M > ,
3643}
3744
45+ impl < M : Send + Sync + ' static > StorageEncode for Envelope < M > {
46+ fn default_codec ( & self ) -> StorageCodecKind {
47+ StorageCodecKind :: Custom
48+ }
49+
50+ fn encode ( & self , buf : & mut BytesMut ) -> Result < ( ) , StorageEncodeError > {
51+ buf. put_u8 ( ENCODING_VERSION ) ;
52+
53+ let len = self . inner . encoded_len ( ) ;
54+ buf. reserve ( encoded_len_varint ( len as u64 ) + len + self . payload . len ( ) ) ;
55+
56+ self . inner
57+ . encode_length_delimited ( buf)
58+ . map_err ( |err| StorageEncodeError :: EncodeValue ( err. into ( ) ) ) ?;
59+
60+ buf. put ( & self . payload [ ..] ) ;
61+ Ok ( ( ) )
62+ }
63+ }
64+
65+ impl StorageDecode for Envelope < Raw > {
66+ fn decode < B : bytes:: Buf > (
67+ buf : & mut B ,
68+ kind : StorageCodecKind ,
69+ ) -> Result < Self , StorageDecodeError >
70+ where
71+ Self : Sized ,
72+ {
73+ match kind {
74+ StorageCodecKind :: FlexbuffersSerde => {
75+ todo ! ( "implement loading from envelop V1" )
76+ }
77+ StorageCodecKind :: Custom => {
78+ let version = buf. get_u8 ( ) ;
79+ if version != ENCODING_VERSION {
80+ return Err ( StorageDecodeError :: DecodeValue (
81+ anyhow:: anyhow!( "Unknown envelope encoding version {version}" ) . into ( ) ,
82+ ) ) ;
83+ }
84+
85+ let inner = EnvelopeInner :: decode_length_delimited ( & mut * buf)
86+ . map_err ( |err| StorageDecodeError :: DecodeValue ( err. into ( ) ) ) ?;
87+
88+ Ok ( Self {
89+ inner,
90+ payload : buf. copy_to_bytes ( buf. remaining ( ) ) ,
91+ phantom : PhantomData ,
92+ } )
93+ }
94+ _ => {
95+ panic ! ( "unsupported encoding" ) ;
96+ }
97+ }
98+ }
99+ }
100+
38101impl < M : Send + Sync > HasRecordKeys for Envelope < M > {
39102 fn record_keys ( & self ) -> Keys {
40- self . record_keys . clone ( )
103+ self . inner . keys . clone ( )
41104 }
42105}
43106
44107impl < M > WithPartitionKey for Envelope < M > {
45108 fn partition_key ( & self ) -> PartitionKey {
46- match self . header . dest {
109+ match self . header ( ) . dest {
47110 Destination :: None => unimplemented ! ( "expect destinationt to be set" ) ,
48111 Destination :: Processor { partition_key, .. } => partition_key,
49112 }
50113 }
51114}
52115
53116impl < M > Envelope < M > {
117+ #[ inline]
54118 pub fn record_type ( & self ) -> RecordKind {
55- self . record . kind
119+ self . inner . kind
120+ }
121+
122+ #[ inline]
123+ pub fn header ( & self ) -> & Header {
124+ & self . inner . header
125+ }
126+
127+ pub fn kind ( & self ) -> RecordKind {
128+ self . inner . kind
56129 }
57130}
58131
@@ -64,19 +137,17 @@ impl Envelope<Raw> {
64137 /// Convers Raw Envelope into a Typed envelope. Panics
65138 /// if the record kind does not match the M::KIND
66139 pub fn into_typed < M : Record > ( self ) -> Envelope < M > {
67- assert_eq ! ( self . record . kind, M :: KIND ) ;
140+ assert_eq ! ( self . inner . kind, M :: KIND ) ;
68141
69142 let Self {
70- header,
71- record_keys,
72- record,
73- ..
143+ inner,
144+ payload,
145+ phantom : _,
74146 } = self ;
75147
76148 Envelope {
77- header,
78- record_keys,
79- record,
149+ inner,
150+ payload,
80151 phantom : PhantomData ,
81152 }
82153 }
@@ -94,25 +165,26 @@ impl<M: Record> Envelope<M> {
94165 {
95166 let mut buf = BytesMut :: new ( ) ;
96167 payload. encode ( & mut buf) ?;
97- let record = RawRecord {
98- data : buf. freeze ( ) ,
99- encoding : Some ( payload. default_codec ( ) ) ,
168+
169+ let inner = EnvelopeInner {
170+ header,
171+ keys : record_keys,
100172 kind : M :: KIND ,
173+ encoding : payload. default_codec ( ) . into ( ) ,
101174 } ;
102175
103176 Ok ( Self {
104- header,
105- record_keys,
106- record,
177+ inner,
178+ payload : buf. freeze ( ) ,
107179 phantom : PhantomData ,
108180 } )
109181 }
110182
111183 /// return the envelope payload
112184 pub fn payload ( & mut self ) -> Result < M :: Payload , StorageDecodeError > {
113185 M :: Payload :: decode (
114- & mut self . record . data ,
115- self . record . encoding . expect ( "encoding to be set" ) ,
186+ & mut self . payload ,
187+ self . inner . encoding . expect ( "encoding to be set" ) ,
116188 )
117189 }
118190
@@ -124,17 +196,11 @@ impl<M: Record> Envelope<M> {
124196/// It's always safe to go back to Raw Envelope
125197impl < M : Record > From < Envelope < M > > for Envelope < Raw > {
126198 fn from ( value : Envelope < M > ) -> Self {
127- let Envelope {
128- header,
129- record_keys,
130- record,
131- ..
132- } = value;
199+ let Envelope { inner, payload, .. } = value;
133200
134201 Self {
135- header,
136- record_keys,
137- record,
202+ inner,
203+ payload,
138204 phantom : PhantomData ,
139205 }
140206 }
@@ -466,10 +532,13 @@ mod sealed {
466532#[ cfg( test) ]
467533mod test {
468534
469- use bilrost:: { Message , OwnedMessage } ;
470- use bytes:: Bytes ;
535+ use bytes:: BytesMut ;
471536
472- use restate_types:: { GenerationalNodeId , logs:: Keys } ;
537+ use restate_types:: {
538+ GenerationalNodeId ,
539+ logs:: Keys ,
540+ storage:: { StorageCodecKind , StorageDecode , StorageEncode } ,
541+ } ;
473542
474543 use super :: { Dedup , Destination , Envelope , Header , Source , records} ;
475544 use crate :: {
@@ -500,9 +569,11 @@ mod test {
500569 records:: AnnounceLeader :: envelope ( header, Keys :: Single ( 1000 ) , payload. clone ( ) )
501570 . expect ( "to work" ) ;
502571
503- let data = envelope. encode_contiguous ( ) . into_vec ( ) ;
572+ let mut buf = BytesMut :: new ( ) ;
573+ envelope. encode ( & mut buf) . expect ( "to encode" ) ;
504574
505- let envelope = Envelope :: < Raw > :: decode ( Bytes :: from ( data) ) . expect ( "to decode" ) ;
575+ let envelope =
576+ Envelope :: < Raw > :: decode ( & mut buf, StorageCodecKind :: Custom ) . expect ( "to decode" ) ;
506577
507578 let mut typed = envelope. into_typed :: < records:: AnnounceLeader > ( ) ;
508579
0 commit comments