Skip to content

Commit 6083539

Browse files
apollo_consensus_orchestrator: improve inputs to build_proposal::get_proposal_content (#10091)
1 parent 5b81f16 commit 6083539

File tree

1 file changed

+20
-37
lines changed

1 file changed

+20
-37
lines changed

crates/apollo_consensus_orchestrator/src/build_proposal.rs

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#[path = "build_proposal_test.rs"]
33
mod build_proposal_test;
44

5+
use std::borrow::BorrowMut;
56
use std::sync::{Arc, Mutex};
67
use std::time::Duration;
78

@@ -11,11 +12,8 @@ use apollo_batcher_types::batcher_types::{
1112
ProposalId,
1213
ProposeBlockInput,
1314
};
14-
use apollo_batcher_types::communication::{BatcherClient, BatcherClientError};
15-
use apollo_class_manager_types::transaction_converter::{
16-
TransactionConverterError,
17-
TransactionConverterTrait,
18-
};
15+
use apollo_batcher_types::communication::BatcherClientError;
16+
use apollo_class_manager_types::transaction_converter::TransactionConverterError;
1917
use apollo_consensus::types::{ProposalCommitment, Round};
2018
use apollo_l1_gas_price_types::errors::{EthToStrkOracleClientError, L1GasPriceClientError};
2119
use apollo_protobuf::consensus::{
@@ -25,7 +23,7 @@ use apollo_protobuf::consensus::{
2523
ProposalPart,
2624
TransactionBatch,
2725
};
28-
use apollo_time::time::{Clock, DateTime};
26+
use apollo_time::time::DateTime;
2927
use starknet_api::block::{BlockNumber, GasPrice};
3028
use starknet_api::consensus_transaction::InternalConsensusTransaction;
3129
use starknet_api::core::ContractAddress;
@@ -116,17 +114,7 @@ pub(crate) async fn build_proposal(
116114
.await
117115
.expect("Failed to send block info");
118116

119-
let (proposal_commitment, content) = get_proposal_content(
120-
args.proposal_id,
121-
args.deps.batcher.as_ref(),
122-
args.stream_sender,
123-
args.cende_write_success,
124-
args.deps.transaction_converter,
125-
args.cancel_token,
126-
args.deps.clock,
127-
batcher_deadline,
128-
)
129-
.await?;
117+
let (proposal_commitment, content) = get_proposal_content(&mut args, batcher_deadline).await?;
130118

131119
// Update valid_proposals before sending fin to avoid a race condition
132120
// with `repropose` being called before `valid_proposals` is updated.
@@ -187,33 +175,26 @@ async fn initiate_build(args: &ProposalBuildArguments) -> BuildProposalResult<Co
187175
/// 1. Receive chunks of content from the batcher.
188176
/// 2. Forward these to the stream handler to be streamed out to the network.
189177
/// 3. Once finished, receive the commitment from the batcher.
190-
// TODO(guyn): consider passing a ref to BuildProposalArguments instead of all the fields
191-
// separately.
192-
#[allow(clippy::too_many_arguments)]
193178
async fn get_proposal_content(
194-
proposal_id: ProposalId,
195-
batcher: &dyn BatcherClient,
196-
mut stream_sender: StreamSender,
197-
cende_write_success: AbortOnDropHandle<bool>,
198-
transaction_converter: Arc<dyn TransactionConverterTrait>,
199-
cancel_token: CancellationToken,
200-
clock: Arc<dyn Clock>,
179+
args: &mut ProposalBuildArguments,
201180
batcher_deadline: DateTime,
202181
) -> BuildProposalResult<(ProposalCommitment, Vec<Vec<InternalConsensusTransaction>>)> {
203182
let mut content = Vec::new();
204183
loop {
205-
if cancel_token.is_cancelled() {
184+
if args.cancel_token.is_cancelled() {
206185
return Err(BuildProposalError::Interrupted);
207186
}
208187
// We currently want one part of the node failing to cause all components to fail. If this
209188
// changes, we can simply return None and consider this as a failed proposal which consensus
210189
// should support.
211-
let response = batcher
212-
.get_proposal_content(GetProposalContentInput { proposal_id })
190+
let response = args
191+
.deps
192+
.batcher
193+
.get_proposal_content(GetProposalContentInput { proposal_id: args.proposal_id })
213194
.await
214195
.map_err(|err| {
215196
BuildProposalError::Batcher(
216-
format!("Failed to get proposal content for proposal_id {proposal_id}."),
197+
format!("Failed to get proposal content for proposal_id {}.", args.proposal_id),
217198
err,
218199
)
219200
})?;
@@ -228,14 +209,16 @@ async fn get_proposal_content(
228209
txs.len()
229210
);
230211
let transactions = futures::future::join_all(txs.into_iter().map(|tx| {
231-
transaction_converter.convert_internal_consensus_tx_to_consensus_tx(tx)
212+
args.deps
213+
.transaction_converter
214+
.convert_internal_consensus_tx_to_consensus_tx(tx)
232215
}))
233216
.await
234217
.into_iter()
235218
.collect::<Result<Vec<_>, _>>()?;
236219

237220
trace!(?transactions, "Sending transaction batch with {} txs.", transactions.len());
238-
stream_sender
221+
args.stream_sender
239222
.send(ProposalPart::Transactions(TransactionBatch { transactions }))
240223
.await
241224
.expect("Failed to broadcast proposal content");
@@ -256,11 +239,11 @@ async fn get_proposal_content(
256239
// If the blob writing operation to Aerospike doesn't return a success status, we
257240
// can't finish the proposal. Must wait for it at least until batcher_timeout is
258241
// reached.
259-
let remaining = (batcher_deadline - clock.now())
242+
let remaining = (batcher_deadline - args.deps.clock.now())
260243
.to_std()
261244
.unwrap_or_default()
262245
.max(MIN_WAIT_DURATION); // Ensure we wait at least 1 ms to avoid immediate timeout.
263-
match tokio::time::timeout(remaining, cende_write_success).await {
246+
match tokio::time::timeout(remaining, args.cende_write_success.borrow_mut()).await {
264247
Err(_) => {
265248
return Err(BuildProposalError::CendeWriteError(
266249
"Writing blob to Aerospike didn't return in time.".to_string(),
@@ -282,13 +265,13 @@ async fn get_proposal_content(
282265
let final_n_executed_txs_u64 = final_n_executed_txs
283266
.try_into()
284267
.expect("Number of executed transactions should fit in u64");
285-
stream_sender
268+
args.stream_sender
286269
.send(ProposalPart::ExecutedTransactionCount(final_n_executed_txs_u64))
287270
.await
288271
.expect("Failed to broadcast executed transaction count");
289272
let fin = ProposalFin { proposal_commitment };
290273
info!("Sending fin={fin:?}");
291-
stream_sender
274+
args.stream_sender
292275
.send(ProposalPart::Fin(fin))
293276
.await
294277
.expect("Failed to broadcast proposal fin");

0 commit comments

Comments
 (0)