diff --git a/src/slots_processor/error.rs b/src/slots_processor/error.rs index 6e808f0..20d128e 100644 --- a/src/slots_processor/error.rs +++ b/src/slots_processor/error.rs @@ -8,6 +8,11 @@ pub enum SlotProcessingError { ClientError(#[from] crate::clients::common::ClientError), #[error(transparent)] Provider(#[from] alloy::transports::TransportError), + #[error("Operation timed out: {operation} for slot {slot}")] + OperationTimeout { + operation: String, + slot: u32, + }, #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index 814c8c1..479e65a 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -2,9 +2,11 @@ use alloy::{ primitives::B256, rpc::types::BlockTransactionsKind, transports::http::ReqwestTransport, }; use anyhow::{anyhow, Context as AnyhowContext, Result}; +use tokio::time::timeout; +use std::time::Duration; use crate::clients::beacon::types::BlockHeader; -use tracing::{debug, info, Instrument}; +use tracing::{debug, info, warn, Instrument}; use crate::{ clients::{ @@ -42,6 +44,8 @@ impl From<&BlockData> for BlockHeader { pub struct SlotsProcessor { context: Box>, pub last_processed_block: Option, + // Timeout for network operations in seconds + request_timeout: u64, } impl SlotsProcessor { @@ -52,6 +56,19 @@ impl SlotsProcessor { Self { context, last_processed_block, + request_timeout: 30, + } + } + + pub fn with_timeout( + context: Box>, + last_processed_block: Option, + timeout_secs: u64, + ) -> SlotsProcessor { + SlotsProcessor { + context, + last_processed_block, + request_timeout: timeout_secs, } } @@ -70,11 +87,16 @@ impl SlotsProcessor { let mut last_processed_block = self.last_processed_block.clone(); for current_slot in slots { - let block_header = match self - .context - .beacon_client() - .get_block_header(current_slot.into()) - .await? 
+ let block_header_result = timeout( + Duration::from_secs(self.request_timeout), + self.context.beacon_client().get_block_header(current_slot.into()) + ).await + .map_err(|_| SlotProcessingError::OperationTimeout { + operation: "get_block_header".to_string(), + slot: current_slot, + })??; + + let block_header = match block_header_result { Some(header) => header, None => { @@ -97,9 +119,18 @@ impl SlotsProcessor { "Reorg detected!", ); - self.process_reorg(&prev_block_header, &block_header) - .await - .map_err(|error| SlotsProcessorError::FailedReorgProcessing { + timeout( + Duration::from_secs(self.request_timeout * 2), + self.process_reorg(&prev_block_header, &block_header) + ).await + .map_err(|_| SlotsProcessorError::FailedReorgProcessing { + old_slot: prev_block_header.slot, + new_slot: block_header.slot, + new_head_block_root: block_header.root, + old_head_block_root: prev_block_header.root, + error: anyhow!("Operation timed out: process_reorg"), + })? + .map_err(|error| SlotsProcessorError::FailedReorgProcessing { old_slot: prev_block_header.slot, new_slot: block_header.slot, new_head_block_root: block_header.root, @@ -110,7 +141,25 @@ impl SlotsProcessor { } } - if let Err(error) = self.process_block(&block_header).await { + let process_block_result = timeout( + Duration::from_secs(self.request_timeout * 2), + self.process_block(&block_header) + ).await; + + if process_block_result.is_err() { + warn!(slot = current_slot, "Process block operation timed out"); + return Err(SlotsProcessorError::FailedSlotsProcessing { + initial_slot, + final_slot, + failed_slot: current_slot, + error: SlotProcessingError::OperationTimeout { + operation: "process_block".to_string(), + slot: current_slot, + }, + }); + } + + if let Err(error) = process_block_result.unwrap() { return Err(SlotsProcessorError::FailedSlotsProcessing { initial_slot, final_slot, diff --git a/src/synchronizer/error.rs b/src/synchronizer/error.rs index 2fe1a8d..60b9cb9 100644 --- 
a/src/synchronizer/error.rs +++ b/src/synchronizer/error.rs @@ -19,6 +19,11 @@ pub enum SynchronizerError { slot: u32, error: crate::clients::common::ClientError, }, + #[error("Operation timed out: {operation} for block_id {block_id}")] + OperationTimeout { + operation: String, + block_id: String, + }, #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 6f912b0..f0485ff 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -1,11 +1,12 @@ use std::fmt::Debug; +use std::time::Duration; use alloy::transports::http::ReqwestTransport; use anyhow::anyhow; use async_trait::async_trait; use futures::future::join_all; -use tokio::task::JoinHandle; +use tokio::{task::JoinHandle, time::timeout}; -use tracing::{debug, info, Instrument}; +use tracing::{debug, info, warn, Instrument}; #[cfg(test)] use mockall::automock; @@ -52,6 +53,8 @@ pub struct Synchronizer { slots_checkpoint: u32, checkpoint_type: CheckpointType, last_synced_block: Option, + // Timeout for network operations in seconds + request_timeout: u64, } #[derive(Clone, Copy, Debug, PartialEq)] @@ -69,6 +72,7 @@ impl Default for SynchronizerBuilder { slots_checkpoint: 1000, checkpoint_type: CheckpointType::Upper, last_synced_block: None, + request_timeout: 30, } } } @@ -101,6 +105,11 @@ impl SynchronizerBuilder { self } + pub fn with_request_timeout(&mut self, timeout_secs: u64) -> &mut Self { + self.request_timeout = timeout_secs; + self + } + pub fn build( &self, context: Box>, @@ -112,6 +121,7 @@ impl SynchronizerBuilder { slots_checkpoint: self.slots_checkpoint, checkpoint_type: self.checkpoint_type, last_synced_block: self.last_synced_block.clone(), + request_timeout: self.request_timeout, } } } @@ -251,9 +261,14 @@ impl Synchronizer { checkpoint_final_slot = final_chunk_slot ); - self.process_slots(initial_chunk_slot, final_chunk_slot) - .instrument(sync_slots_chunk_span) - .await?; + timeout( + 
Duration::from_secs(self.request_timeout * 3), + self.process_slots(initial_chunk_slot, final_chunk_slot).instrument(sync_slots_chunk_span) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "process_slots".to_string(), + block_id: format!("from {} to {}", initial_chunk_slot, final_chunk_slot), + })??; let last_slot = Some(if is_reverse_sync { final_chunk_slot + 1 @@ -277,17 +292,24 @@ impl Synchronizer { self.last_synced_block.as_ref().map(|block| block.slot); } - if let Err(error) = self - .context - .blobscan_client() - .update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot, - last_upper_synced_slot, - last_upper_synced_block_root, - last_upper_synced_block_slot, - }) - .await + let update_result = timeout( + Duration::from_secs(self.request_timeout), + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot, + last_upper_synced_slot, + last_upper_synced_block_root, + last_upper_synced_block_slot, + }) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "update_sync_state".to_string(), + block_id: format!("slot {}", last_lower_synced_slot.or(last_upper_synced_slot).unwrap_or(0)), + })?; + + if let Err(error) = update_result { let new_synced_slot = match last_lower_synced_slot.or(last_upper_synced_slot) { Some(slot) => slot, @@ -339,12 +361,26 @@ impl CommonSynchronizer for Synchronizer { } async fn sync_block(&mut self, block_id: BlockId) -> Result<(), SynchronizerError> { - let final_slot = block_id - .resolve_to_slot(self.context.beacon_client()) - .await?; - - self.process_slots_by_checkpoints(final_slot, final_slot + 1) - .await?; + let slot_resolution = timeout( + Duration::from_secs(self.request_timeout), + block_id.resolve_to_slot(self.context.beacon_client()) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "resolve_to_slot".to_string(), + block_id: format!("{:?}", 
block_id), + })?? + ; + + let final_slot = slot_resolution; + + timeout( + Duration::from_secs(self.request_timeout * 2), + self.process_slots_by_checkpoints(final_slot, final_slot + 1) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "process_slots_by_checkpoints".to_string(), + block_id: final_slot.to_string(), + })??; Ok(()) } @@ -354,12 +390,25 @@ impl CommonSynchronizer for Synchronizer { initial_block_id: BlockId, final_block_id: BlockId, ) -> Result<(), SynchronizerError> { - let initial_slot = initial_block_id - .resolve_to_slot(self.context.beacon_client()) - .await?; - let mut final_slot = final_block_id - .resolve_to_slot(self.context.beacon_client()) - .await?; + let initial_slot = timeout( + Duration::from_secs(self.request_timeout), + initial_block_id.resolve_to_slot(self.context.beacon_client()) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "resolve_to_slot (initial)".to_string(), + block_id: format!("{:?}", initial_block_id), + })?? + ; + + let mut final_slot = timeout( + Duration::from_secs(self.request_timeout), + final_block_id.resolve_to_slot(self.context.beacon_client()) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "resolve_to_slot (final)".to_string(), + block_id: format!("{:?}", final_block_id), + })?? + ; if initial_slot == final_slot { return Ok(());