diff --git a/.changes/fixed/3112.md b/.changes/fixed/3112.md new file mode 100644 index 00000000000..7efa291b31c --- /dev/null +++ b/.changes/fixed/3112.md @@ -0,0 +1 @@ +Use Protobuf types in serialization rather than opaque bytes \ No newline at end of file diff --git a/crates/services/block_aggregator_api/Cargo.toml b/crates/services/block_aggregator_api/Cargo.toml index 312e9e3fcfc..66e7ad6da6b 100644 --- a/crates/services/block_aggregator_api/Cargo.toml +++ b/crates/services/block_aggregator_api/Cargo.toml @@ -10,6 +10,9 @@ repository = { workspace = true } rust-version = { workspace = true } build = "build.rs" +[features] +fault-proving = ["fuel-core-types/fault-proving"] + [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } @@ -22,7 +25,7 @@ futures = { workspace = true } log = "0.4.27" num_enum = { workspace = true } postcard = { workspace = true } -prost = { workspace = true } +prost = { workspace = true, features = ["derive"] } rand = { workspace = true } serde = { workspace = true, features = ["derive"] } strum = { workspace = true } diff --git a/crates/services/block_aggregator_api/build.rs b/crates/services/block_aggregator_api/build.rs index c438a06453f..190a1538000 100644 --- a/crates/services/block_aggregator_api/build.rs +++ b/crates/services/block_aggregator_api/build.rs @@ -1,4 +1,7 @@ fn main() -> Result<(), Box> { - tonic_prost_build::compile_protos("proto/api.proto")?; + tonic_prost_build::configure() + .type_attribute(".", "#[derive(serde::Serialize,serde::Deserialize)]") + .type_attribute(".", "#[allow(clippy::large_enum_variant)]") + .compile_protos(&["proto/api.proto"], &["proto/"])?; Ok(()) } diff --git a/crates/services/block_aggregator_api/proto/api.proto b/crates/services/block_aggregator_api/proto/api.proto index 1e34a8fa8de..e95f1399a91 100644 --- a/crates/services/block_aggregator_api/proto/api.proto +++ b/crates/services/block_aggregator_api/proto/api.proto @@ -14,9 +14,521 @@ message BlockRangeRequest { } message Block { - bytes data = 1; + oneof versioned_block { + V1Block v1 = 1; + } +} + +message V1Block { + Header header = 1; + repeated Transaction transactions = 2; +} + +message Header { + oneof versioned_header { + V1Header v1 = 1; + V2Header v2 = 2; + } +} + +// pub struct BlockHeaderV1 { +// /// The application header. +// pub(crate) application: ApplicationHeader, +// /// The consensus header. +// pub(crate) consensus: ConsensusHeader, +// /// The header metadata calculated during creation. +// /// The field is pub(crate) to enforce the use of the [`PartialBlockHeader::generate`] method. +// #[cfg_attr(feature = "serde", serde(skip))] +// #[educe(PartialEq(ignore))] +// pub(crate) metadata: Option, +//} +// pub struct ApplicationHeader { +// /// The layer 1 height of messages and events to include since the last layer 1 block number. +// /// This is not meant to represent the layer 1 block this was committed to. Validators will need +// /// to have some rules in place to ensure the block number was chosen in a reasonable way. For +// /// example, they should verify that the block number satisfies the finality requirements of the +// /// layer 1 chain. They should also verify that the block number isn't too stale and is increasing. +// /// Some similar concerns are noted in this issue: +// pub da_height: DaBlockHeight, +// /// The version of the consensus parameters used to execute this block. +// pub consensus_parameters_version: ConsensusParametersVersion, +// /// The version of the state transition bytecode used to execute this block. +// pub state_transition_bytecode_version: StateTransitionBytecodeVersion, +// /// Generated application fields. +// pub generated: Generated, +//} +// pub struct GeneratedApplicationFieldsV1 { +// /// Number of transactions in this block. +// pub transactions_count: u16, +// /// Number of message receipts in this block. +// pub message_receipt_count: u32, +// /// Merkle root of transactions. +// pub transactions_root: Bytes32, +// /// Merkle root of message receipts in this block. +// pub message_outbox_root: Bytes32, +// /// Root hash of all imported events from L1 +// pub event_inbox_root: Bytes32, +//} +// pub struct ConsensusHeader { +// /// Merkle root of all previous block header hashes. +// pub prev_root: Bytes32, +// /// Fuel block height. +// pub height: BlockHeight, +// /// The block producer time. +// pub time: Tai64, +// /// generated consensus fields. +// pub generated: Generated, +//} +// pub struct GeneratedConsensusFields { +// /// Hash of the application header. +// pub application_hash: Bytes32, +//} +// pub struct BlockHeaderMetadata { +// /// Hash of the header. +// id: BlockId, +//} +message V1Header { + uint32 da_height = 1; + uint32 consensus_parameters_version = 2; + uint32 state_transition_bytecode_version = 3; + uint32 transactions_count = 4; + uint32 message_receipt_count = 5; + bytes transactions_root = 6; + bytes message_outbox_root = 7; + bytes event_inbox_root = 8; + bytes prev_root = 9; + uint32 height = 10; + bytes time = 11; + bytes application_hash = 12; + optional bytes block_id = 13; +} + +// pub struct GeneratedApplicationFieldsV2 { +// /// Number of transactions in this block. +// pub transactions_count: u16, +// /// Number of message receipts in this block. +// pub message_receipt_count: u32, +// /// Merkle root of transactions. +// pub transactions_root: Bytes32, +// /// Merkle root of message receipts in this block. +// pub message_outbox_root: Bytes32, +// /// Root hash of all imported events from L1 +// pub event_inbox_root: Bytes32, +// /// TxID commitment +// pub tx_id_commitment: Bytes32, +//} +message V2Header { + uint32 da_height = 1; + uint32 consensus_parameters_version = 2; + uint32 state_transition_bytecode_version = 3; + uint32 transactions_count = 4; + uint32 message_receipt_count = 5; + bytes transactions_root = 6; + bytes message_outbox_root = 7; + bytes event_inbox_root = 8; + bytes tx_id_commitment = 9; + bytes prev_root = 10; + uint32 height = 11; + bytes time = 12; + bytes application_hash = 13; + optional bytes block_id = 14; +} + +// pub enum Transaction { +// Script(Script), +// Create(Create), +// Mint(Mint), +// Upgrade(Upgrade), +// Upload(Upload), +// Blob(Blob), +//} +// TODO: implement other transaction types +// https://github.com/FuelLabs/fuel-core/issues/3122 +message Transaction { + oneof variant { + ScriptTransaction script = 1; +// CreateTx create = 2; +// MintTx mint = 3; +// UpgradeTx upgrade = 4; +// UploadTx upload = 5; +// BlobTx blob = 6; + } +} + +// pub struct ChargeableTransaction +//where +// Body: BodyConstraints, +//{ +// pub(crate) body: Body, +// pub(crate) policies: Policies, +// pub(crate) inputs: Vec, +// pub(crate) outputs: Vec, +// pub(crate) witnesses: Vec, +// #[serde(skip)] +// #[cfg_attr(feature = "da-compression", compress(skip))] +// #[educe(PartialEq(ignore))] +// #[educe(Hash(ignore))] +// #[canonical(skip)] +// pub(crate) metadata: Option>, +//} +// pub struct ScriptBody { +// pub(crate) script_gas_limit: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub(crate) receipts_root: Bytes32, +// pub(crate) script: ScriptCode, +// #[educe(Debug(method(fmt_truncated_hex::<16>)))] +// pub(crate) script_data: Vec, +//} +// #[derive(Default, Debug, Clone, PartialEq, Eq, Hash)] +//pub struct ScriptMetadata { +// pub script_data_offset: usize, +//} +message ScriptTransaction { + uint64 script_gas_limit = 1; + bytes receipts_root = 2; + bytes script = 3; + bytes script_data = 4; + Policies policies = 5; + repeated Input inputs = 6; + repeated Output outputs = 7; + repeated bytes witnesses = 8; + ScriptMetadata metadata = 9; +} + +// pub struct Policies { +// /// A bitmask that indicates what policies are set. +// bits: PoliciesBits, +// /// The array of policy values. +// values: [Word; POLICIES_NUMBER], +//} +message Policies { + uint32 bits = 1; + repeated uint64 values = 2; +} + +// pub enum Input { +// CoinSigned(CoinSigned), +// CoinPredicate(CoinPredicate), +// Contract(Contract), +// MessageCoinSigned(MessageCoinSigned), +// MessageCoinPredicate(MessageCoinPredicate), +// MessageDataSigned(MessageDataSigned), +// MessageDataPredicate(MessageDataPredicate), +//} +message Input { + oneof variant { + CoinSignedInput coin_signed = 1; + CoinPredicateInput coin_predicate = 2; + ContractInput contract = 3; + MessageCoinSignedInput message_coin_signed = 4; + MessageCoinPredicateInput message_coin_predicate = 5; + MessageDataSignedInput message_data_signed = 6; + MessageDataPredicateInput message_data_predicate = 7; + } +} + +// pub struct Coin +//where +// Specification: CoinSpecification, +//{ +// pub utxo_id: UtxoId, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub owner: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub amount: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub asset_id: AssetId, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub tx_pointer: TxPointer, +// #[educe(Debug(method(fmt_as_field)))] +// pub witness_index: Specification::Witness, +// /// Exact amount of gas used by the predicate. +// /// If the predicate consumes different amount of gas, +// /// it's considered to be false. +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_gas_used: Specification::PredicateGasUsed, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate: Specification::Predicate, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_data: Specification::PredicateData, +//} +// impl CoinSpecification for Signed { +// type Predicate = Empty; +// type PredicateData = Empty>; +// type PredicateGasUsed = Empty; +// type Witness = u16; +//} +message CoinSignedInput { + UtxoId utxo_id = 1; + bytes owner = 2; + uint64 amount = 3; + bytes asset_id = 4; + bytes tx_pointer = 5; + uint32 witness_index = 6; + uint64 predicate_gas_used = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +//impl CoinSpecification for Predicate { +// type Predicate = PredicateCode; +// type PredicateData = Vec; +// type PredicateGasUsed = Word; +// type Witness = Empty; +//} +message CoinPredicateInput { + UtxoId utxo_id = 1; + bytes owner = 2; + uint64 amount = 3; + bytes asset_id = 4; + bytes tx_pointer = 5; + uint32 witness_index = 6; + uint64 predicate_gas_used = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// pub struct Contract { +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub utxo_id: UtxoId, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub balance_root: Bytes32, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub state_root: Bytes32, +// /// Pointer to transaction that last modified the contract state. +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub tx_pointer: TxPointer, +// pub contract_id: ContractId, +//} +message ContractInput { + UtxoId utxo_id = 1; + bytes balance_root = 2; + bytes state_root = 3; + bytes tx_pointer = 4; + bytes contract_id = 5; +} + +// pub struct Message +//where +// Specification: MessageSpecification, +//{ +// /// The sender from the L1 chain. +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub sender: Address, +// /// The receiver on the `Fuel` chain. +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub recipient: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub amount: Word, +// // Unique identifier of the message +// pub nonce: Nonce, +// #[educe(Debug(method(fmt_as_field)))] +// pub witness_index: Specification::Witness, +// /// Exact amount of gas used by the predicate. +// /// If the predicate consumes different amount of gas, +// /// it's considered to be false. +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_gas_used: Specification::PredicateGasUsed, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// #[educe(Debug(method(fmt_as_field)))] +// pub data: Specification::Data, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate: Specification::Predicate, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_data: Specification::PredicateData, +//} +// pub struct MessageCoin(core::marker::PhantomData); +// +// impl MessageSpecification for MessageCoin { +// type Data = Empty>; +// type Predicate = Empty; +// type PredicateData = Empty>; +// type PredicateGasUsed = Empty; +// type Witness = u16; +// } +message MessageCoinSignedInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + uint32 nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// impl MessageSpecification for MessageCoin { +// type Data = Empty>; +// type Predicate = PredicateCode; +// type PredicateData = Vec; +// type PredicateGasUsed = Word; +// type Witness = Empty; +// } +message MessageCoinPredicateInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + uint32 nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// pub type MessageDataSigned = Message>; +message MessageDataSignedInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + uint32 nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// pub type MessageDataPredicate = +// Message>; +message MessageDataPredicateInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + uint32 nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; } +// pub enum Output { +// Coin { +// to: Address, +// amount: Word, +// asset_id: AssetId, +// }, +// +// Contract(Contract), +// +// Change { +// to: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// amount: Word, +// asset_id: AssetId, +// }, +// +// Variable { +// #[cfg_attr(feature = "da-compression", compress(skip))] +// to: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// amount: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// asset_id: AssetId, +// }, +// +// ContractCreated { +// contract_id: ContractId, +// state_root: Bytes32, +// }, +//} +message Output { + oneof variant { + CoinOutput coin = 1; + ContractOutput contract = 2; + ChangeOutput change = 3; + VariableOutput variable = 4; + ContractCreatedOutput contract_created = 5; + } +} +message CoinOutput { + bytes to = 1; + uint64 amount = 2; + bytes asset_id = 3; +} +message ContractOutput { + bytes contract_id = 1; + bytes state_root = 2; +} +message ChangeOutput { + bytes to = 1; + uint64 amount = 2; + bytes asset_id = 3; +} +message VariableOutput { + bytes to = 1; + uint64 amount = 2; + bytes asset_id = 3; +} +message ContractCreatedOutput { + bytes contract_id = 1; + bytes state_root = 2; +} + +// pub struct UtxoId { +// /// transaction id +// tx_id: TxId, +// /// output index +// output_index: u16, +//} +message UtxoId { + bytes tx_id = 1; + uint32 output_index = 2; +} + + +// #[derive(Debug, Clone, PartialEq, Eq, Hash)] +//pub struct ChargeableMetadata { +// pub common: CommonMetadata, +// pub body: Body, +//} +// pub struct ScriptBody { +// pub(crate) script_gas_limit: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub(crate) receipts_root: Bytes32, +// pub(crate) script: ScriptCode, +// #[educe(Debug(method(fmt_truncated_hex::<16>)))] +// pub(crate) script_data: Vec, +//} +// #[derive(Debug, Clone, PartialEq, Eq, Hash)] +//pub struct CommonMetadata { +// pub id: Bytes32, +// pub inputs_offset: usize, +// pub inputs_offset_at: Vec, +// pub inputs_predicate_offset_at: Vec>, +// pub outputs_offset: usize, +// pub outputs_offset_at: Vec, +// pub witnesses_offset: usize, +// pub witnesses_offset_at: Vec, +//} + +message ScriptMetadata { + bytes id = 1; + uint32 inputs_offset = 2; + repeated uint32 inputs_offset_at = 3; + repeated PredicateOffset inputs_predicate_offset_at = 4; + uint32 outputs_offset = 5; + repeated uint32 outputs_offset_at = 6; + uint32 witnesses_offset = 7; + repeated uint32 witnesses_offset_at = 8; + uint64 script_gas_limit = 9; + bytes receipts_root = 10; + bytes script = 11; + bytes script_data = 12; +} + +message PredicateOffset { + optional InnerPredicateOffset offset = 1; +} + +message InnerPredicateOffset { + uint32 offset = 1; + uint32 length = 2; +} + + message BlockResponse { oneof payload { Block literal = 1; @@ -30,4 +542,4 @@ service BlockAggregator { rpc GetBlockHeight (BlockHeightRequest) returns (BlockHeightResponse); rpc GetBlockRange (BlockRangeRequest) returns (stream BlockResponse); rpc NewBlockSubscription (NewBlockSubscriptionRequest) returns (stream BlockResponse); -} \ No newline at end of file +} diff --git a/crates/services/block_aggregator_api/src/api.rs b/crates/services/block_aggregator_api/src/api.rs index 3cc652bdd09..4beb51c47f3 100644 --- a/crates/services/block_aggregator_api/src/api.rs +++ b/crates/services/block_aggregator_api/src/api.rs @@ -1,7 +1,4 @@ -use crate::{ - NewBlock, - result::Result, -}; +use crate::result::Result; use fuel_core_types::fuel_types::BlockHeight; use std::fmt; @@ -11,14 +8,17 @@ pub mod protobuf_adapter; pub trait BlockAggregatorApi: Send + Sync { /// The type of the block range response. type BlockRangeResponse; + type Block; /// Awaits the next query to the block aggregator service. fn await_query( &mut self, - ) -> impl Future>> + Send; + ) -> impl Future< + Output = Result>, + > + Send; } -pub enum BlockAggregatorQuery { +pub enum BlockAggregatorQuery { GetBlockRange { first: BlockHeight, last: BlockHeight, @@ -29,11 +29,11 @@ pub enum BlockAggregatorQuery { }, // TODO: Do we need a way to unsubscribe or can we just see that the receiver is dropped? NewBlockSubscription { - response: tokio::sync::mpsc::Sender, + response: tokio::sync::mpsc::Sender, }, } -impl fmt::Debug for BlockAggregatorQuery { +impl fmt::Debug for BlockAggregatorQuery { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { BlockAggregatorQuery::GetBlockRange { first, last, .. } => f @@ -52,7 +52,7 @@ impl fmt::Debug for BlockAggregatorQuery { } #[cfg(test)] -impl BlockAggregatorQuery { +impl BlockAggregatorQuery { pub fn get_block_range>( first: H, last: H, @@ -74,7 +74,7 @@ impl BlockAggregatorQuery { (query, receiver) } - pub fn new_block_subscription() -> (Self, tokio::sync::mpsc::Receiver) { + pub fn new_block_subscription() -> (Self, tokio::sync::mpsc::Receiver) { const ARBITRARY_CHANNEL_SIZE: usize = 10; let (sender, receiver) = tokio::sync::mpsc::channel(ARBITRARY_CHANNEL_SIZE); let query = Self::NewBlockSubscription { response: sender }; diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs index 03d5c468655..c944e199917 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs @@ -4,28 +4,42 @@ use crate::{ BlockAggregatorQuery, }, block_range_response::BlockRangeResponse, - result::Result, + protobuf_types::{ + Block as ProtoBlock, + BlockHeightRequest as ProtoBlockHeightRequest, + BlockHeightResponse as ProtoBlockHeightResponse, + BlockRangeRequest as ProtoBlockRangeRequest, + BlockResponse as ProtoBlockResponse, + NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest, + block_aggregator_server::{ + BlockAggregator, + BlockAggregatorServer as ProtoBlockAggregatorServer, + }, + block_response as proto_block_response, + }, + result::{ + Error, + Result, + }, }; use async_trait::async_trait; use futures::StreamExt; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; -tonic::include_proto!("blockaggregator"); - -use crate::result::Error; -use block_aggregator_server::BlockAggregator; - #[cfg(test)] mod tests; pub struct Server { - query_sender: tokio::sync::mpsc::Sender>, + query_sender: + tokio::sync::mpsc::Sender>, } impl Server { pub fn new( - query_sender: tokio::sync::mpsc::Sender>, + query_sender: tokio::sync::mpsc::Sender< + BlockAggregatorQuery, + >, ) -> Self { Self { query_sender } } @@ -35,8 +49,8 @@ impl Server { impl BlockAggregator for Server { async fn get_block_height( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { tracing::debug!("get_block_height: {:?}", request); let (response, receiver) = tokio::sync::oneshot::channel(); let query = BlockAggregatorQuery::GetCurrentHeight { response }; @@ -45,7 +59,7 @@ impl BlockAggregator for Server { })?; let res = receiver.await; match res { - Ok(height) => Ok(tonic::Response::new(BlockHeightResponse { + Ok(height) => Ok(tonic::Response::new(ProtoBlockHeightResponse { height: *height, })), Err(e) => Err(tonic::Status::internal(format!( @@ -54,13 +68,13 @@ impl BlockAggregator for Server { ))), } } - type GetBlockRangeStream = ReceiverStream>; + type GetBlockRangeStream = ReceiverStream>; async fn get_block_range( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - tracing::debug!("get_block_range: {:?}", request); + const ARB_LITERAL_BLOCK_BUFFER_SIZE: usize = 100; let req = request.into_inner(); let (response, receiver) = tokio::sync::oneshot::channel(); let query = BlockAggregatorQuery::GetBlockRange { @@ -76,17 +90,15 @@ impl BlockAggregator for Server { match res { Ok(block_range_response) => match block_range_response { BlockRangeResponse::Literal(inner) => { - let (tx, rx) = - tokio::sync::mpsc::channel::>(16); + let (tx, rx) = tokio::sync::mpsc::channel::< + Result, + >(ARB_LITERAL_BLOCK_BUFFER_SIZE); tokio::spawn(async move { let mut s = inner; - while let Some(block) = s.next().await { - let pb = Block { - data: block.bytes().to_vec(), - }; - let response = BlockResponse { - payload: Some(block_response::Payload::Literal(pb)), + while let Some(pb) = s.next().await { + let response = ProtoBlockResponse { + payload: Some(proto_block_response::Payload::Literal(pb)), }; if tx.send(Ok(response)).await.is_err() { break; @@ -108,11 +120,11 @@ impl BlockAggregator for Server { } } - type NewBlockSubscriptionStream = ReceiverStream>; + type NewBlockSubscriptionStream = ReceiverStream>; async fn new_block_subscription( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { const ARB_CHANNEL_SIZE: usize = 100; tracing::warn!("get_block_range: {:?}", request); @@ -126,11 +138,8 @@ impl BlockAggregator for Server { let (task_sender, task_receiver) = tokio::sync::mpsc::channel(ARB_CHANNEL_SIZE); tokio::spawn(async move { while let Some(nb) = receiver.recv().await { - let block = Block { - data: nb.block.bytes().to_vec(), - }; - let response = BlockResponse { - payload: Some(block_response::Payload::Literal(block)), + let response = ProtoBlockResponse { + payload: Some(proto_block_response::Payload::Literal(nb)), }; if task_sender.send(Ok(response)).await.is_err() { break; @@ -145,19 +154,21 @@ impl BlockAggregator for Server { pub struct ProtobufAPI { _server_task_handle: tokio::task::JoinHandle<()>, shutdown_sender: Option>, - query_receiver: tokio::sync::mpsc::Receiver>, + query_receiver: + tokio::sync::mpsc::Receiver>, } impl ProtobufAPI { pub fn new(url: String) -> Self { - let (query_sender, query_receiver) = - tokio::sync::mpsc::channel::>(100); + let (query_sender, query_receiver) = tokio::sync::mpsc::channel::< + BlockAggregatorQuery, + >(100); let server = Server::new(query_sender); let addr = url.parse().unwrap(); let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>(); let _server_task_handle = tokio::spawn(async move { let service = tonic::transport::Server::builder() - .add_service(block_aggregator_server::BlockAggregatorServer::new(server)); + .add_service(ProtoBlockAggregatorServer::new(server)); tokio::select! { res = service.serve(addr) => { if let Err(e) = res { @@ -181,10 +192,11 @@ impl ProtobufAPI { impl BlockAggregatorApi for ProtobufAPI { type BlockRangeResponse = BlockRangeResponse; + type Block = ProtoBlock; async fn await_query( &mut self, - ) -> Result> { + ) -> Result> { let query = self .query_receiver .recv() diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs index 1617090a7dd..7807ac02180 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs @@ -1,24 +1,32 @@ #![allow(non_snake_case)] use crate::{ - NewBlock, api::{ BlockAggregatorApi, BlockAggregatorQuery, - protobuf_adapter::{ - BlockHeightRequest, - BlockRangeRequest, - NewBlockSubscriptionRequest, - ProtobufAPI, - block_aggregator_client::BlockAggregatorClient, - block_response::Payload, - }, + protobuf_adapter::ProtobufAPI, }, block_range_response::BlockRangeResponse, - blocks::Block, + blocks::importer_and_db_source::{ + BlockSerializer, + serializer_adapter::SerializerAdapter, + }, + protobuf_types::{ + Block as ProtoBlock, + BlockHeightRequest, + BlockRangeRequest, + NewBlockSubscriptionRequest, + block_aggregator_client::{ + BlockAggregatorClient as ProtoBlockAggregatorClient, + BlockAggregatorClient, + }, + block_response::Payload, + }, +}; +use fuel_core_types::{ + blockchain::block::Block as FuelBlock, + fuel_types::BlockHeight, }; -use bytes::Bytes; -use fuel_core_types::fuel_types::BlockHeight; use futures::{ StreamExt, TryStreamExt, @@ -40,7 +48,7 @@ async fn await_query__get_current_height__client_receives_expected_value() { // call get current height endpoint with client let url = format!("http://{}", path); - let mut client = BlockAggregatorClient::connect(url.to_string()) + let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) .await .expect("could not connect to server"); let handle = tokio::spawn(async move { @@ -77,7 +85,7 @@ async fn await_query__get_block_range__client_receives_expected_value() { // call get current height endpoint with client let url = format!("http://{}", path); - let mut client = BlockAggregatorClient::connect(url.to_string()) + let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) .await .expect("could not connect to server"); let request = BlockRangeRequest { start: 0, end: 1 }; @@ -94,8 +102,17 @@ async fn await_query__get_block_range__client_receives_expected_value() { let query = api.await_query().await.unwrap(); // then - let block1 = Block::new(Bytes::from(vec![0u8; 100])); - let block2 = Block::new(Bytes::from(vec![1u8; 100])); + let serializer_adapter = SerializerAdapter; + let fuel_block_1 = FuelBlock::default(); + let mut fuel_block_2 = FuelBlock::default(); + let block_height_2 = fuel_block_1.header().height().succ().unwrap(); + fuel_block_2.header_mut().set_block_height(block_height_2); + let block1 = serializer_adapter + .serialize_block(&fuel_block_1) + .expect("could not serialize block"); + let block2 = serializer_adapter + .serialize_block(&fuel_block_2) + .expect("could not serialize block"); let list = vec![block1, block2]; // return response through query's channel if let BlockAggregatorQuery::GetBlockRange { @@ -115,8 +132,8 @@ async fn await_query__get_block_range__client_receives_expected_value() { } tracing::info!("awaiting query"); let response = handle.await.unwrap(); - let expected: Vec> = list.iter().map(|b| b.bytes().to_vec()).collect(); - let actual: Vec> = response + let expected = list; + let actual: Vec = response .into_inner() .try_collect::>() .await @@ -124,7 +141,7 @@ async fn await_query__get_block_range__client_receives_expected_value() { .into_iter() .map(|b| { if let Some(Payload::Literal(inner)) = b.payload { - inner.data.to_vec() + inner } else { panic!("unexpected response type") } @@ -162,22 +179,30 @@ async fn await_query__new_block_stream__client_receives_expected_value() { // then let height1 = BlockHeight::new(0); let height2 = BlockHeight::new(1); - let block1 = Block::new(Bytes::from(vec![0u8; 100])); - let block2 = Block::new(Bytes::from(vec![1u8; 100])); - let list = vec![(height1, block1), (height2, block2)]; + let serializer_adapter = SerializerAdapter; + let mut fuel_block_1 = FuelBlock::default(); + fuel_block_1.header_mut().set_block_height(height1); + let mut fuel_block_2 = FuelBlock::default(); + fuel_block_2.header_mut().set_block_height(height2); + let block1 = serializer_adapter + .serialize_block(&fuel_block_1) + .expect("could not serialize block"); + let block2 = serializer_adapter + .serialize_block(&fuel_block_2) + .expect("could not serialize block"); + let list = vec![block1, block2]; if let BlockAggregatorQuery::NewBlockSubscription { response } = query { tracing::info!("correct query received, sending response"); - for (height, block) in list.clone() { - let new_block = NewBlock::new(height, block); - response.send(new_block).await.unwrap(); + for block in list.clone() { + response.send(block).await.unwrap(); } } else { panic!("expected GetBlockRange query"); } tracing::info!("awaiting query"); let response = handle.await.unwrap(); - let expected: Vec> = list.iter().map(|(_, b)| b.bytes().to_vec()).collect(); - let actual: Vec> = response + let expected = list; + let actual: Vec = response .into_inner() .try_collect::>() .await @@ -185,7 +210,7 @@ async fn await_query__new_block_stream__client_receives_expected_value() { .into_iter() .map(|b| { if let Some(Payload::Literal(inner)) = b.payload { - inner.data.to_vec() + inner } else { panic!("unexpected response type") } diff --git a/crates/services/block_aggregator_api/src/block_aggregator.rs b/crates/services/block_aggregator_api/src/block_aggregator.rs index a271c129b8e..4fde80d22b7 100644 --- a/crates/services/block_aggregator_api/src/block_aggregator.rs +++ b/crates/services/block_aggregator_api/src/block_aggregator.rs @@ -1,6 +1,5 @@ use crate::{ BlockAggregator, - NewBlock, api::{ BlockAggregatorApi, BlockAggregatorQuery, @@ -17,11 +16,12 @@ use fuel_core_services::{ }; use fuel_core_types::fuel_types::BlockHeight; -impl BlockAggregator +impl BlockAggregator where Api: BlockAggregatorApi, - DB: BlockAggregatorDB, + DB: BlockAggregatorDB, Blocks: BlockSource, + ::Block: Clone + std::fmt::Debug, BlockRangeResponse: Send, { pub fn new(query: Api, database: DB, block_source: Blocks) -> Self { @@ -40,7 +40,9 @@ where pub async fn handle_query( &mut self, - res: crate::result::Result>, + res: crate::result::Result< + BlockAggregatorQuery, + >, ) -> TaskNextAction { tracing::debug!("Handling query: {res:?}"); let query = try_or_stop!(res, |e| { @@ -98,7 +100,7 @@ where async fn handle_new_block_subscription( &mut self, - response: tokio::sync::mpsc::Sender, + response: tokio::sync::mpsc::Sender, ) -> TaskNextAction { self.new_block_subscriptions.push(response); TaskNextAction::Continue @@ -106,8 +108,11 @@ where pub async fn handle_block( &mut self, - res: crate::result::Result, - ) -> TaskNextAction { + res: crate::result::Result::Block>>, + ) -> TaskNextAction + where + ::Block: std::fmt::Debug, + { tracing::debug!("Handling block: {res:?}"); let event = try_or_stop!(res, |e| { tracing::error!("Error receiving block from source: {e:?}"); @@ -115,7 +120,7 @@ where let (id, block) = match event { BlockSourceEvent::NewBlock(id, block) => { self.new_block_subscriptions.retain_mut(|sub| { - let send_res = sub.try_send(NewBlock::new(id, block.clone())); + let send_res = sub.try_send(block.clone()); match send_res { Ok(_) => true, Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { diff --git a/crates/services/block_aggregator_api/src/block_range_response.rs b/crates/services/block_aggregator_api/src/block_range_response.rs index 5e071bc3328..24e78af6ff4 100644 --- a/crates/services/block_aggregator_api/src/block_range_response.rs +++ b/crates/services/block_aggregator_api/src/block_range_response.rs @@ -1,4 +1,4 @@ -use crate::blocks::Block; +use crate::protobuf_types::Block as ProtoBlock; use fuel_core_services::stream::Stream; pub type BoxStream = core::pin::Pin + Send + 'static>>; @@ -6,7 +6,7 @@ pub type BoxStream = core::pin::Pin + Send + 'static /// The response to a block range query, either as a literal stream of blocks or as a remote URL pub enum BlockRangeResponse { /// A literal stream of blocks - Literal(BoxStream), + Literal(BoxStream), /// A remote URL where the blocks can be fetched Remote(String), } diff --git a/crates/services/block_aggregator_api/src/blocks.rs b/crates/services/block_aggregator_api/src/blocks.rs index de56f280975..fb8dc76a9c1 100644 --- a/crates/services/block_aggregator_api/src/blocks.rs +++ b/crates/services/block_aggregator_api/src/blocks.rs @@ -7,17 +7,20 @@ pub mod importer_and_db_source; /// Source from which blocks can be gathered for aggregation pub trait BlockSource: Send + Sync { + type Block; /// Asynchronously fetch the next block and its height - fn next_block(&mut self) -> impl Future> + Send; + fn next_block( + &mut self, + ) -> impl Future>> + Send; /// Drain any remaining blocks from the source fn drain(&mut self) -> impl Future> + Send; } #[derive(Debug, Eq, PartialEq, Hash)] -pub enum BlockSourceEvent { - NewBlock(BlockHeight, Block), - OldBlock(BlockHeight, Block), +pub enum BlockSourceEvent { + NewBlock(BlockHeight, B), + OldBlock(BlockHeight, B), } #[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs index 48450b118f1..892b2b40120 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs @@ -1,6 +1,5 @@ use crate::{ blocks::{ - Block, BlockSource, BlockSourceEvent, importer_and_db_source::importer_service::ImporterTask, @@ -37,21 +36,23 @@ mod tests; pub mod serializer_adapter; pub trait BlockSerializer { - fn serialize_block(&self, block: &FuelBlock) -> Result; + type Block; + fn serialize_block(&self, block: &FuelBlock) -> Result; } pub struct ImporterAndDbSource where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: Send + Sync + 'static, DB: StorageInspect, DB: StorageInspect, E: std::fmt::Debug + Send, { - importer_task: ServiceRunner>, - sync_task: ServiceRunner>, + importer_task: ServiceRunner>, + sync_task: ServiceRunner>, /// Receive blocks from the importer and sync tasks - receiver: tokio::sync::mpsc::Receiver, + receiver: tokio::sync::mpsc::Receiver>, _error_marker: std::marker::PhantomData, } @@ -59,6 +60,7 @@ where impl ImporterAndDbSource where Serializer: BlockSerializer + Clone + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: StorageInspect + Send + Sync, DB: StorageInspect + Send + 'static, E: std::fmt::Debug + Send, @@ -103,12 +105,15 @@ where impl BlockSource for ImporterAndDbSource where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: Send + Sync, DB: StorageInspect, DB: StorageInspect, E: std::fmt::Debug + Send + Sync, { - async fn next_block(&mut self) -> Result { + type Block = Serializer::Block; + + async fn next_block(&mut self) -> Result> { tracing::debug!("awaiting next block"); tokio::select! { block_res = self.receiver.recv() => { diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs index 500d7d0de08..74151e2a0c7 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs @@ -18,21 +18,22 @@ use fuel_core_types::{ use futures::StreamExt; use tokio::sync::mpsc::Sender; -pub struct ImporterTask { +pub struct ImporterTask { importer: BoxStream, serializer: Serializer, - block_return_sender: Sender, + block_return_sender: Sender>, new_end_sender: Option>, } -impl ImporterTask +impl ImporterTask where Serializer: BlockSerializer + Send, + ::Block: Send, { pub fn new( importer: BoxStream, serializer: Serializer, - block_return: Sender, + block_return: Sender>, new_end_sender: Option>, ) -> Self { Self { @@ -43,9 +44,10 @@ where } } } -impl RunnableTask for ImporterTask +impl RunnableTask for ImporterTask where Serializer: BlockSerializer + Send + Sync, + ::Block: Send, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { @@ -61,7 +63,7 @@ where } } -impl ImporterTask +impl ImporterTask where Serializer: BlockSerializer + Send + Sync, { @@ -110,9 +112,10 @@ where } #[async_trait::async_trait] -impl RunnableService for ImporterTask +impl RunnableService for ImporterTask where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + 'static, { const NAME: &'static str = "BlockSourceImporterTask"; type SharedData = (); diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs index 028c66081bb..c8e24282fc6 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs @@ -1,23 +1,198 @@ +#[cfg(feature = "fault-proving")] +use crate::protobuf_types::V2Header as ProtoV2Header; use crate::{ - blocks::{ - Block, - importer_and_db_source::BlockSerializer, + blocks::importer_and_db_source::BlockSerializer, + protobuf_types::{ + Block as ProtoBlock, + Header as ProtoHeader, + Policies as ProtoPolicies, + ScriptTransaction as ProtoScriptTx, + Transaction as ProtoTransaction, + V1Block as ProtoV1Block, + V1Header as ProtoV1Header, + block::VersionedBlock as ProtoVersionedBlock, + header::VersionedHeader as ProtoVersionedHeader, + transaction::Variant as ProtoTransactionVariant, + }, +}; +#[cfg(feature = "fault-proving")] +use fuel_core_types::blockchain::header::BlockHeaderV2; +use fuel_core_types::{ + blockchain::{ + block::Block as FuelBlock, + header::{ + BlockHeader, + BlockHeaderV1, + ConsensusHeader, + GeneratedConsensusFields, + }, + primitives::BlockId, + }, + fuel_tx::{ + Transaction as FuelTransaction, + field::{ + Policies as _, + ReceiptsRoot as _, + Script as _, + ScriptData as _, + ScriptGasLimit as _, + Witnesses as _, + }, + policies::PolicyType, }, - result::Error, }; - -use anyhow::anyhow; -use fuel_core_types::blockchain::block::Block as FuelBlock; -use postcard::to_allocvec; #[derive(Clone)] pub struct SerializerAdapter; impl BlockSerializer for SerializerAdapter { - fn serialize_block(&self, block: &FuelBlock) -> crate::result::Result { - let bytes_vec = to_allocvec(block).map_err(|e| { - Error::BlockSource(anyhow!("failed to serialize block: {}", e)) - })?; - Ok(crate::blocks::Block::from(bytes_vec)) + type Block = ProtoBlock; + + fn serialize_block(&self, block: &FuelBlock) -> crate::result::Result { + // TODO: Should this be owned to begin with? + let (header, txs) = block.clone().into_inner(); + let proto_header = proto_header_from_header(header); + match &block { + FuelBlock::V1(_) => { + let proto_v1_block = ProtoV1Block { + header: Some(proto_header), + transactions: txs.into_iter().map(proto_tx_from_tx).collect(), + }; + Ok(ProtoBlock { + versioned_block: Some(ProtoVersionedBlock::V1(proto_v1_block)), + }) + } + } + } +} + +fn proto_header_from_header(header: BlockHeader) -> ProtoHeader { + let block_id = header.id(); + let consensus = *header.consensus(); + let versioned_header = match header { + BlockHeader::V1(header) => { + let proto_v1_header = + proto_v1_header_from_v1_header(consensus, block_id, header); + ProtoVersionedHeader::V1(proto_v1_header) + } + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => { + let proto_v2_header = + proto_v2_header_from_v2_header(consensus, block_id, header); + ProtoVersionedHeader::V2(proto_v2_header) + } + }; + + ProtoHeader { + versioned_header: Some(versioned_header), + } +} + +fn proto_v1_header_from_v1_header( + consensus: ConsensusHeader, + block_id: BlockId, + header: BlockHeaderV1, +) -> ProtoV1Header { + let application = header.application(); + let generated = application.generated; + + ProtoV1Header { + da_height: saturating_u64_to_u32(application.da_height.0), + consensus_parameters_version: application.consensus_parameters_version, + state_transition_bytecode_version: application.state_transition_bytecode_version, + transactions_count: u32::from(generated.transactions_count), + message_receipt_count: generated.message_receipt_count, + transactions_root: bytes32_to_vec(&generated.transactions_root), + message_outbox_root: bytes32_to_vec(&generated.message_outbox_root), + event_inbox_root: bytes32_to_vec(&generated.event_inbox_root), + prev_root: bytes32_to_vec(&consensus.prev_root), + height: u32::from(consensus.height), + time: consensus.time.0.to_be_bytes().to_vec(), + application_hash: bytes32_to_vec(&consensus.generated.application_hash), + block_id: Some(block_id.as_slice().to_vec()), + } +} + +#[cfg(feature = "fault-proving")] +fn proto_v2_header_from_v2_header( + consensus: ConsensusHeader, + block_id: BlockId, + header: BlockHeaderV2, +) -> ProtoV2Header { + let application = *header.application(); + let generated = application.generated; + + ProtoV2Header { + da_height: saturating_u64_to_u32(application.da_height.0), + consensus_parameters_version: application.consensus_parameters_version, + state_transition_bytecode_version: application.state_transition_bytecode_version, + transactions_count: u32::from(generated.transactions_count), + message_receipt_count: generated.message_receipt_count, + transactions_root: bytes32_to_vec(&generated.transactions_root), + message_outbox_root: bytes32_to_vec(&generated.message_outbox_root), + event_inbox_root: bytes32_to_vec(&generated.event_inbox_root), + tx_id_commitment: bytes32_to_vec(&generated.tx_id_commitment), + prev_root: bytes32_to_vec(&consensus.prev_root), + height: u32::from(consensus.height), + time: consensus.time.0.to_be_bytes().to_vec(), + application_hash: bytes32_to_vec(&consensus.generated.application_hash), + block_id: Some(block_id.as_slice().to_vec()), + } +} + +fn proto_tx_from_tx(tx: FuelTransaction) -> ProtoTransaction { + match tx { + FuelTransaction::Script(script) => { + let proto_script = ProtoScriptTx { + script_gas_limit: *script.script_gas_limit(), + receipts_root: bytes32_to_vec(script.receipts_root()), + script: script.script().clone(), + script_data: script.script_data().clone(), + policies: Some(proto_policies_from_policies(script.policies())), + inputs: Vec::new(), + outputs: Vec::new(), + witnesses: script + .witnesses() + .iter() + .map(|witness| witness.as_ref().to_vec()) + .collect(), + metadata: None, + }; + + ProtoTransaction { + variant: Some(ProtoTransactionVariant::Script(proto_script)), + } + } + _ => ProtoTransaction { variant: None }, } } + +fn proto_policies_from_policies( + policies: &fuel_core_types::fuel_tx::policies::Policies, +) -> ProtoPolicies { + const POLICY_ORDER: [PolicyType; 5] = [ + PolicyType::Tip, + PolicyType::WitnessLimit, + PolicyType::Maturity, + PolicyType::MaxFee, + PolicyType::Expiration, + ]; + + let values = POLICY_ORDER + .iter() + .map(|policy_type| policies.get(*policy_type).unwrap_or_default()) + .collect(); + + ProtoPolicies { + bits: policies.bits(), + values, + } +} + +fn bytes32_to_vec(bytes: &fuel_core_types::fuel_types::Bytes32) -> Vec { + bytes.as_ref().to_vec() +} + +fn saturating_u64_to_u32(value: u64) -> u32 { + value.min(u32::MAX as u64) as u32 +} diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs index af0cbf801be..be8b6b19e94 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs @@ -28,16 +28,16 @@ use fuel_core_types::{ use std::time::Duration; use tokio::sync::mpsc::Sender; -pub struct SyncTask { +pub struct SyncTask { serializer: Serializer, - block_return_sender: Sender, + block_return_sender: Sender>, db: DB, next_height: BlockHeight, maybe_stop_height: Option, new_ending_height: tokio::sync::oneshot::Receiver, } -impl SyncTask +impl SyncTask where Serializer: BlockSerializer + Send, DB: StorageInspect + Send + 'static, @@ -46,7 +46,7 @@ where { pub fn new( serializer: Serializer, - block_return: Sender, + block_return: Sender>, db: DB, db_starting_height: BlockHeight, db_ending_height: Option, @@ -107,9 +107,10 @@ where } } -impl RunnableTask for SyncTask +impl RunnableTask for SyncTask where Serializer: BlockSerializer + Send + Sync, + Serializer::Block: Send + Sync + 'static, DB: Send + Sync + 'static, DB: StorageInspect + Send + 'static, DB: StorageInspect + Send + 'static, @@ -152,9 +153,10 @@ where } #[async_trait::async_trait] -impl RunnableService for SyncTask +impl RunnableService for SyncTask where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: Send + Sync + 'static, DB: StorageInspect + Send + 'static, DB: StorageInspect + Send + 'static, diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs index 92e04d69e5f..64d0256dbae 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs @@ -1,6 +1,7 @@ #![allow(non_snake_case)] use super::*; +use crate::blocks::Block; use ::postcard::to_allocvec; use fuel_core_services::stream::{ IntoBoxStream, @@ -34,6 +35,8 @@ use std::sync::Arc; pub struct MockSerializer; impl BlockSerializer for MockSerializer { + type Block = Block; + fn serialize_block(&self, block: &FuelBlock) -> Result { let bytes_vec = to_allocvec(block).map_err(|e| { Error::BlockSource(anyhow!("failed to serialize block: {}", e)) @@ -46,7 +49,6 @@ fn database() -> StorageTransaction> { InMemoryStorage::default().into_transaction() } -// let block_stream = tokio_stream::iter(blocks).chain(pending()).into_boxed(); fn stream_with_pending(items: Vec) -> BoxStream { tokio_stream::iter(items).chain(pending()).into_boxed() } diff --git a/crates/services/block_aggregator_api/src/db.rs b/crates/services/block_aggregator_api/src/db.rs index 13a0bcc8489..d664bd13932 100644 --- a/crates/services/block_aggregator_api/src/db.rs +++ b/crates/services/block_aggregator_api/src/db.rs @@ -1,13 +1,11 @@ -use crate::{ - blocks::Block, - result::Result, -}; +use crate::result::Result; use fuel_core_types::fuel_types::BlockHeight; pub mod storage_db; /// The definition of the block aggregator database. pub trait BlockAggregatorDB: Send + Sync { + type Block; /// The type used to report a range of blocks type BlockRangeResponse; @@ -15,7 +13,7 @@ pub trait BlockAggregatorDB: Send + Sync { fn store_block( &mut self, height: BlockHeight, - block: Block, + block: Self::Block, ) -> impl Future> + Send; /// Retrieves a range of blocks from the database diff --git a/crates/services/block_aggregator_api/src/db/storage_db.rs b/crates/services/block_aggregator_api/src/db/storage_db.rs index cac501b2ddf..7aeac0a91d1 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db.rs @@ -1,10 +1,10 @@ use crate::{ block_range_response::BlockRangeResponse, - blocks::Block, db::{ BlockAggregatorDB, storage_db::table::Column, }, + protobuf_types::Block as ProtoBlock, result::{ Error, Result, @@ -105,9 +105,14 @@ where T: Unpin + Send + Sync + KeyValueInspect + 'static + std::fmt::Debug, StorageTransaction: StorageInspect, { + type Block = ProtoBlock; type BlockRangeResponse = BlockRangeResponse; - async fn store_block(&mut self, height: BlockHeight, block: Block) -> Result<()> { + async fn store_block( + &mut self, + height: BlockHeight, + block: ProtoBlock, + ) -> Result<()> { self.update_highest_contiguous_block(height); let mut tx = self.storage.write_transaction(); tx.storage_as_mut::() @@ -156,7 +161,7 @@ where S: Unpin + ReadTransaction + std::fmt::Debug, for<'a> StorageTransaction<&'a S>: StorageInspect, { - type Item = Block; + type Item = ProtoBlock; fn poll_next( self: Pin<&mut Self>, diff --git a/crates/services/block_aggregator_api/src/db/storage_db/table.rs b/crates/services/block_aggregator_api/src/db/storage_db/table.rs index 525645100e8..be11785c7af 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db/table.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db/table.rs @@ -1,4 +1,4 @@ -use crate::blocks::Block; +use crate::protobuf_types::Block as ProtoBlock; use fuel_core_storage::{ Mappable, blueprint::plain::Plain, @@ -51,7 +51,7 @@ impl Mappable for Blocks { type Key = Self::OwnedKey; type OwnedKey = BlockHeight; type Value = Self::OwnedValue; - type OwnedValue = Block; + type OwnedValue = ProtoBlock; } impl TableWithBlueprint for Blocks { diff --git a/crates/services/block_aggregator_api/src/db/storage_db/tests.rs b/crates/services/block_aggregator_api/src/db/storage_db/tests.rs index f09cdaafc2b..593839e406a 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db/tests.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db/tests.rs @@ -1,31 +1,43 @@ #![allow(non_snake_case)] use super::*; -use crate::db::storage_db::table::Column; +use crate::{ + blocks::importer_and_db_source::{ + BlockSerializer, + serializer_adapter::SerializerAdapter, + }, + db::storage_db::table::Column, +}; use fuel_core_storage::{ StorageAsRef, structured_storage::test::InMemoryStorage, transactional::IntoTransaction, }; use fuel_core_types::{ - ed25519::signature::rand_core::SeedableRng, + blockchain::block::Block as FuelBlock, + fuel_tx::Transaction, fuel_types::BlockHeight, }; use futures::StreamExt; -use rand::rngs::StdRng; fn database() -> StorageTransaction> { InMemoryStorage::default().into_transaction() } +fn proto_block_with_height(height: BlockHeight) -> ProtoBlock { + let serializer_adapter = SerializerAdapter; + let mut default_block = FuelBlock::::default(); + default_block.header_mut().set_block_height(height); + serializer_adapter.serialize_block(&default_block).unwrap() +} + #[tokio::test] async fn store_block__adds_to_storage() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let mut adapter = StorageDB::new(db); let height = BlockHeight::from(1u32); - let expected = Block::random(&mut rng); + let expected = proto_block_with_height(height); // when adapter.store_block(height, expected.clone()).await.unwrap(); @@ -43,15 +55,15 @@ async fn store_block__adds_to_storage() { #[tokio::test] async fn get_block__can_get_expected_range() { - let mut rng = StdRng::seed_from_u64(666); // given let mut db = database(); let height_1 = BlockHeight::from(1u32); let height_2 = BlockHeight::from(2u32); let height_3 = BlockHeight::from(3u32); - let expected_1 = Block::random(&mut rng); - let expected_2 = Block::random(&mut rng); - let expected_3 = Block::random(&mut rng); + + let expected_1 = proto_block_with_height(height_1); + let expected_2 = proto_block_with_height(height_2); + let expected_3 = proto_block_with_height(height_3); let mut tx = db.write_transaction(); tx.storage_as_mut::() @@ -82,12 +94,11 @@ async fn get_block__can_get_expected_range() { #[tokio::test] async fn store_block__updates_the_highest_continuous_block_if_contiguous() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let mut adapter = StorageDB::new_with_height(db, BlockHeight::from(0u32)); let height = BlockHeight::from(1u32); - let expected = Block::random(&mut rng); + let expected = proto_block_with_height(height); // when adapter.store_block(height, expected.clone()).await.unwrap(); @@ -100,13 +111,12 @@ async fn store_block__updates_the_highest_continuous_block_if_contiguous() { #[tokio::test] async fn store_block__does_not_update_the_highest_continuous_block_if_not_contiguous() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let starting_height = BlockHeight::from(0u32); let mut adapter = StorageDB::new_with_height(db, starting_height); let height = BlockHeight::from(2u32); - let expected = Block::random(&mut rng); + let expected = proto_block_with_height(height); // when adapter.store_block(height, expected.clone()).await.unwrap(); @@ -119,7 +129,6 @@ async fn store_block__does_not_update_the_highest_continuous_block_if_not_contig #[tokio::test] async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let starting_height = BlockHeight::from(0u32); @@ -129,7 +138,7 @@ async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { for height in 2..=10u32 { let height = BlockHeight::from(height); orphaned_height = Some(height); - let block = Block::random(&mut rng); + let block = proto_block_with_height(height); adapter.store_block(height, block).await.unwrap(); } let expected = starting_height; @@ -138,8 +147,11 @@ async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { // when let height = BlockHeight::from(1u32); - let expected = Block::random(&mut rng); - adapter.store_block(height, expected.clone()).await.unwrap(); + let some_block = proto_block_with_height(height); + adapter + .store_block(height, some_block.clone()) + .await + .unwrap(); // then let expected = orphaned_height.unwrap(); diff --git a/crates/services/block_aggregator_api/src/lib.rs b/crates/services/block_aggregator_api/src/lib.rs index 900e1c56087..e3e9057d7d7 100644 --- a/crates/services/block_aggregator_api/src/lib.rs +++ b/crates/services/block_aggregator_api/src/lib.rs @@ -1,9 +1,6 @@ use crate::{ api::BlockAggregatorApi, - blocks::{ - Block, - BlockSource, - }, + blocks::BlockSource, db::BlockAggregatorDB, }; use fuel_core_services::{ @@ -13,6 +10,8 @@ use fuel_core_services::{ TaskNextAction, }; use fuel_core_types::fuel_types::BlockHeight; +use protobuf_types::Block as ProtoBlock; +use std::fmt::Debug; pub mod api; pub mod blocks; @@ -21,6 +20,8 @@ pub mod result; pub mod block_range_response; +pub mod protobuf_types; + pub mod integration { use crate::{ BlockAggregator, @@ -33,6 +34,7 @@ pub mod integration { ImporterAndDbSource, }, db::BlockAggregatorDB, + protobuf_types::Block as ProtoBlock, }; use fuel_core_services::{ ServiceRunner, @@ -56,6 +58,7 @@ pub mod integration { pub addr: SocketAddr, } + #[allow(clippy::type_complexity)] pub fn new_service( config: &Config, db: DB, @@ -63,13 +66,19 @@ pub mod integration { onchain_db: OnchainDB, importer: BoxStream, ) -> ServiceRunner< - BlockAggregator>, + BlockAggregator< + ProtobufAPI, + DB, + ImporterAndDbSource, + ProtoBlock, + >, > where DB: BlockAggregatorDB< BlockRangeResponse = ::BlockRangeResponse, + Block = ProtoBlock, >, - S: BlockSerializer + Clone + Send + Sync + 'static, + S: BlockSerializer + Clone + Send + Sync + 'static, OnchainDB: Send + Sync, OnchainDB: StorageInspect, OnchainDB: StorageInspect, @@ -104,33 +113,35 @@ pub mod block_aggregator; // but we can change the name later /// The Block Aggregator service, which aggregates blocks from a source and stores them in a database /// Queries can be made to the service to retrieve data from the `DB` -pub struct BlockAggregator { +pub struct BlockAggregator { query: Api, database: DB, block_source: Blocks, - new_block_subscriptions: Vec>, + new_block_subscriptions: Vec>, } pub struct NewBlock { height: BlockHeight, - block: Block, + block: ProtoBlock, } impl NewBlock { - pub fn new(height: BlockHeight, block: Block) -> Self { + pub fn new(height: BlockHeight, block: ProtoBlock) -> Self { Self { height, block } } - pub fn into_inner(self) -> (BlockHeight, Block) { + pub fn into_inner(self) -> (BlockHeight, ProtoBlock) { (self.height, self.block) } } -impl RunnableTask for BlockAggregator +impl RunnableTask + for BlockAggregator where - Api: BlockAggregatorApi, - DB: BlockAggregatorDB, + Api: BlockAggregatorApi, + DB: BlockAggregatorDB, Blocks: BlockSource, + ::Block: Clone + std::fmt::Debug + Send, BlockRange: Send, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { @@ -151,12 +162,15 @@ where } #[async_trait::async_trait] -impl RunnableService for BlockAggregator +impl RunnableService + for BlockAggregator where - Api: BlockAggregatorApi, - DB: BlockAggregatorDB, + Api: + BlockAggregatorApi + Send, + DB: BlockAggregatorDB + Send, Blocks: BlockSource, BlockRange: Send, + ::Block: Clone + Debug + Send, { const NAME: &'static str = "BlockAggregatorService"; type SharedData = (); diff --git a/crates/services/block_aggregator_api/src/protobuf_types.rs b/crates/services/block_aggregator_api/src/protobuf_types.rs new file mode 100644 index 00000000000..648ac0e278d --- /dev/null +++ b/crates/services/block_aggregator_api/src/protobuf_types.rs @@ -0,0 +1 @@ +tonic::include_proto!("blockaggregator"); diff --git a/crates/services/block_aggregator_api/src/tests.rs b/crates/services/block_aggregator_api/src/tests.rs index ac069687760..d8b9a8744e5 100644 --- a/crates/services/block_aggregator_api/src/tests.rs +++ b/crates/services/block_aggregator_api/src/tests.rs @@ -36,21 +36,22 @@ use tokio::{ type BlockRangeResponse = BoxStream; -struct FakeApi { - receiver: Receiver>, +struct FakeApi { + receiver: Receiver>, } -impl FakeApi { - fn new() -> (Self, Sender>) { +impl FakeApi { + fn new() -> (Self, Sender>) { let (sender, receiver) = tokio::sync::mpsc::channel(1); let api = Self { receiver }; (api, sender) } } -impl BlockAggregatorApi for FakeApi { +impl BlockAggregatorApi for FakeApi { type BlockRangeResponse = T; - async fn await_query(&mut self) -> Result> { + type Block = B; + async fn await_query(&mut self) -> Result> { Ok(self.receiver.recv().await.unwrap()) } } @@ -75,6 +76,7 @@ impl FakeDB { } impl BlockAggregatorDB for FakeDB { + type Block = Block; type BlockRangeResponse = BlockRangeResponse; async fn store_block(&mut self, id: BlockHeight, block: Block) -> Result<()> { @@ -111,11 +113,11 @@ impl BlockAggregatorDB for FakeDB { } struct FakeBlockSource { - blocks: Receiver, + blocks: Receiver>, } impl FakeBlockSource { - fn new() -> (Self, Sender) { + fn new() -> (Self, Sender>) { let (_sender, receiver) = tokio::sync::mpsc::channel(1); let _self = Self { blocks: receiver }; (_self, _sender) @@ -123,7 +125,9 @@ impl FakeBlockSource { } impl BlockSource for FakeBlockSource { - async fn next_block(&mut self) -> Result { + type Block = Block; + + async fn next_block(&mut self) -> Result> { self.blocks .recv() .await @@ -243,12 +247,8 @@ async fn run__new_block_subscription__sends_new_block() { let _ = srv.run(&mut watcher).await; // then - let (actual_height, actual_block) = await_response_with_timeout(response) - .await - .unwrap() - .into_inner(); + let actual_block = await_response_with_timeout(response).await.unwrap(); assert_eq!(expected_block, actual_block); - assert_eq!(expected_height, actual_height); // cleanup drop(source_sender); diff --git a/tests/Cargo.toml b/tests/Cargo.toml index d68dcf22481..377d92b1740 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -19,6 +19,7 @@ aws-kms = ["dep:aws-config", "dep:aws-sdk-kms", "fuel-core-bin/aws-kms"] fault-proving = [ "fuel-core/fault-proving", "fuel-core-types/fault-proving", + "fuel-core-block-aggregator-api/fault-proving", "fuel-core-storage/fault-proving", "fuel-core-upgradable-executor/fault-proving", "fuel-core-poa/fault-proving", diff --git a/tests/tests/rpc.rs b/tests/tests/rpc.rs index c2aecedecc1..aa6c564834b 100644 --- a/tests/tests/rpc.rs +++ b/tests/tests/rpc.rs @@ -1,5 +1,4 @@ #![allow(non_snake_case)] - use fuel_core::{ database::Database, service::{ @@ -7,16 +6,17 @@ use fuel_core::{ FuelService, }, }; -use fuel_core_block_aggregator_api::api::protobuf_adapter::{ - block_aggregator_client::BlockAggregatorClient, - block_response::Payload, +use fuel_core_block_aggregator_api::protobuf_types::{ + BlockHeightRequest as ProtoBlockHeightRequest, + BlockRangeRequest as ProtoBlockRangeRequest, + NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest, + block::VersionedBlock as ProtoVersionedBlock, + block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient, + block_response::Payload as ProtoPayload, + header::VersionedHeader as ProtoVersionedHeader, }; use fuel_core_client::client::FuelClient; -use fuel_core_types::{ - blockchain::block::Block, - fuel_tx::*, - fuel_types::BlockHeight, -}; +use fuel_core_types::fuel_tx::*; use futures::StreamExt; use test_helpers::client_ext::ClientExt; @@ -35,7 +35,7 @@ async fn get_block_range__can_get_serialized_block_from_rpc() { let _ = graphql_client.submit_and_await_commit(&tx).await.unwrap(); let rpc_url = format!("http://{}", rpc_url); - let mut rpc_client = BlockAggregatorClient::connect(rpc_url) + let mut rpc_client = ProtoBlockAggregatorClient::connect(rpc_url) .await .expect("could not connect to server"); @@ -44,15 +44,11 @@ async fn get_block_range__can_get_serialized_block_from_rpc() { .await .unwrap() .unwrap(); - let header = expected_block.header; + let expected_header = expected_block.header; // when - let request = - fuel_core_block_aggregator_api::api::protobuf_adapter::BlockRangeRequest { - start: 1, - end: 1, - }; - let actual_bytes = if let Some(Payload::Literal(block)) = rpc_client + let request = ProtoBlockRangeRequest { start: 1, end: 1 }; + let actual_block = if let Some(ProtoPayload::Literal(block)) = rpc_client .get_block_range(request) .await .unwrap() @@ -63,23 +59,18 @@ async fn get_block_range__can_get_serialized_block_from_rpc() { .unwrap() .payload { - block.data + block } else { panic!("expected literal block payload"); }; - let actual_block: Block = postcard::from_bytes(&actual_bytes).unwrap(); + let ProtoVersionedBlock::V1(v1_block) = actual_block.versioned_block.unwrap(); + let actual_height = match v1_block.header.unwrap().versioned_header.unwrap() { + ProtoVersionedHeader::V1(v1_header) => v1_header.height, + ProtoVersionedHeader::V2(v2_header) => v2_header.height, + }; // then - assert_eq!( - BlockHeight::from(header.height.0), - *actual_block.header().height() - ); - // check txs - let actual_tx = actual_block.transactions().first().unwrap(); - let expected_opaque_tx = expected_block.transactions.first().unwrap().to_owned(); - let expected_tx: Transaction = expected_opaque_tx.try_into().unwrap(); - - assert_eq!(&expected_tx, actual_tx); + assert_eq!(expected_header.height.0, actual_height); } #[tokio::test(flavor = "multi_thread")] @@ -98,13 +89,12 @@ async fn get_block_height__can_get_value_from_rpc() { let _ = graphql_client.submit_and_await_commit(&tx).await.unwrap(); let rpc_url = format!("http://{}", rpc_url); - let mut rpc_client = BlockAggregatorClient::connect(rpc_url) + let mut rpc_client = ProtoBlockAggregatorClient::connect(rpc_url) .await .expect("could not connect to server"); // when - let request = - fuel_core_block_aggregator_api::api::protobuf_adapter::BlockHeightRequest {}; + let request = ProtoBlockHeightRequest {}; let expected_height = 1; let actual_height = rpc_client .get_block_height(request) @@ -131,12 +121,11 @@ async fn new_block_subscription__can_get_expect_block() { let tx = Transaction::default_test_tx(); let rpc_url = format!("http://{}", rpc_url); - let mut rpc_client = BlockAggregatorClient::connect(rpc_url) + let mut rpc_client = ProtoBlockAggregatorClient::connect(rpc_url) .await .expect("could not connect to server"); - let request = - fuel_core_block_aggregator_api::api::protobuf_adapter::NewBlockSubscriptionRequest {}; + let request = ProtoNewBlockSubscriptionRequest {}; let mut stream = rpc_client .new_block_subscription(request) .await @@ -148,29 +137,20 @@ async fn new_block_subscription__can_get_expect_block() { let next = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()) .await .unwrap(); - let actual_bytes = - if let Some(Payload::Literal(block)) = next.unwrap().unwrap().payload { - block.data + let actual_block = + if let Some(ProtoPayload::Literal(block)) = next.unwrap().unwrap().payload { + block } else { panic!("expected literal block payload"); }; + let ProtoVersionedBlock::V1(v1_block) = actual_block.versioned_block.unwrap(); + let actual_height = match v1_block.header.unwrap().versioned_header.unwrap() { + ProtoVersionedHeader::V1(v1_header) => v1_header.height, + ProtoVersionedHeader::V2(v2_header) => v2_header.height, + }; + // then - let expected_block = graphql_client - .full_block_by_height(1) - .await - .unwrap() - .unwrap(); - let header = expected_block.header; - let actual_block: Block = postcard::from_bytes(&actual_bytes).unwrap(); - assert_eq!( - BlockHeight::from(header.height.0), - *actual_block.header().height() - ); - // check txs - let actual_tx = actual_block.transactions().first().unwrap(); - let expected_opaque_tx = expected_block.transactions.first().unwrap().to_owned(); - let expected_tx: Transaction = expected_opaque_tx.try_into().unwrap(); - - assert_eq!(&expected_tx, actual_tx); + let expected_height = 1; + assert_eq!(expected_height, actual_height); }