From 4e1a8fc095570b8ecc9cee513b0eda2a53f49d1d Mon Sep 17 00:00:00 2001 From: Pablo Castellano Date: Mon, 19 May 2025 21:04:01 +0200 Subject: [PATCH] feat: add timeouts to prevent indexer hang during sync operations This commit implements timeout mechanisms throughout the synchronizer and slots_processor modules to prevent indefinite hangs. Key changes include: - Add configurable request_timeout parameter to Synchronizer and SlotsProcessor - Implement timeouts for network operations with proper error handling - Add longer timeouts for more complex operations like block processing - Create appropriate error types (OperationTimeout) with detailed error messages These changes ensure operations fail gracefully with clear error messages after timeout periods instead of hanging indefinitely, allowing the service to recover through pod restarts when network or processing issues occur. --- src/slots_processor/error.rs | 5 ++ src/slots_processor/mod.rs | 69 +++++++++++++++++++---- src/synchronizer/error.rs | 5 ++ src/synchronizer/mod.rs | 105 +++++++++++++++++++++++++---------- 4 files changed, 146 insertions(+), 38 deletions(-) diff --git a/src/slots_processor/error.rs b/src/slots_processor/error.rs index 6e808f0..20d128e 100644 --- a/src/slots_processor/error.rs +++ b/src/slots_processor/error.rs @@ -8,6 +8,11 @@ pub enum SlotProcessingError { ClientError(#[from] crate::clients::common::ClientError), #[error(transparent)] Provider(#[from] alloy::transports::TransportError), + #[error("Operation timed out: {operation} for slot {slot}")] + OperationTimeout { + operation: String, + slot: u32, + }, #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/slots_processor/mod.rs b/src/slots_processor/mod.rs index 814c8c1..479e65a 100644 --- a/src/slots_processor/mod.rs +++ b/src/slots_processor/mod.rs @@ -2,9 +2,11 @@ use alloy::{ primitives::B256, rpc::types::BlockTransactionsKind, transports::http::ReqwestTransport, }; use anyhow::{anyhow, Context as AnyhowContext, Result}; +use futures::future::timeout; +use std::time::Duration; use crate::clients::beacon::types::BlockHeader; -use tracing::{debug, info, Instrument}; +use tracing::{debug, info, Instrument, warn}; use crate::{ clients::{ @@ -42,6 +44,8 @@ impl From<&BlockData> for BlockHeader { pub struct SlotsProcessor { context: Box>, pub last_processed_block: Option, + // Timeout for network operations in seconds + request_timeout: u64, } impl SlotsProcessor { @@ -52,6 +56,19 @@ impl SlotsProcessor { Self { context, last_processed_block, + request_timeout: 30, + } + } + + pub fn with_timeout( + context: Box>, + last_processed_block: Option, + timeout_secs: u64, + ) -> SlotsProcessor { + SlotsProcessor { + context, + last_processed_block, + request_timeout: timeout_secs, } } @@ -70,11 +87,16 @@ impl SlotsProcessor { let mut last_processed_block = self.last_processed_block.clone(); for current_slot in slots { - let block_header = match self - .context - .beacon_client() - .get_block_header(current_slot.into()) - .await? + let block_header_result = timeout( + Duration::from_secs(self.request_timeout), + self.context.beacon_client().get_block_header(current_slot.into()) + ).await + .map_err(|_| SlotProcessingError::OperationTimeout { + operation: "get_block_header".to_string(), + slot: current_slot, + })??; + + let block_header = match block_header_result { Some(header) => header, None => { @@ -97,9 +119,18 @@ impl SlotsProcessor { "Reorg detected!", ); - self.process_reorg(&prev_block_header, &block_header) - .await - .map_err(|error| SlotsProcessorError::FailedReorgProcessing { + timeout( + Duration::from_secs(self.request_timeout * 2), + self.process_reorg(&prev_block_header, &block_header) + ).await + .map_err(|_| SlotsProcessorError::FailedReorgProcessing { + old_slot: prev_block_header.slot, + new_slot: block_header.slot, + new_head_block_root: block_header.root, + old_head_block_root: prev_block_header.root, + error: anyhow!("Operation timed out: process_reorg"), + })?? + .map_err(|error| SlotsProcessorError::FailedReorgProcessing { old_slot: prev_block_header.slot, new_slot: block_header.slot, new_head_block_root: block_header.root, @@ -110,7 +141,25 @@ impl SlotsProcessor { } } - if let Err(error) = self.process_block(&block_header).await { + let process_block_result = timeout( + Duration::from_secs(self.request_timeout * 2), + self.process_block(&block_header) + ).await; + + if let Err(timeout_error) = process_block_result { + warn!(slot = current_slot, "Process block operation timed out"); + return Err(SlotsProcessorError::FailedSlotsProcessing { + initial_slot, + final_slot, + failed_slot: current_slot, + error: SlotProcessingError::OperationTimeout { + operation: "process_block".to_string(), + slot: current_slot, + }, + }); + } + + if let Err(error) = process_block_result.unwrap() { return Err(SlotsProcessorError::FailedSlotsProcessing { initial_slot, final_slot, diff --git a/src/synchronizer/error.rs b/src/synchronizer/error.rs index 2fe1a8d..60b9cb9 100644 --- a/src/synchronizer/error.rs +++ b/src/synchronizer/error.rs @@ -19,6 +19,11 @@ pub enum SynchronizerError { slot: u32, error: crate::clients::common::ClientError, }, + #[error("Operation timed out: {operation} for block_id {block_id}")] + OperationTimeout { + operation: String, + block_id: String, + }, #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/src/synchronizer/mod.rs b/src/synchronizer/mod.rs index 6f912b0..f0485ff 100644 --- a/src/synchronizer/mod.rs +++ b/src/synchronizer/mod.rs @@ -1,11 +1,12 @@ use std::fmt::Debug; +use std::time::Duration; use alloy::transports::http::ReqwestTransport; use anyhow::anyhow; use async_trait::async_trait; -use futures::future::join_all; +use futures::future::{join_all, timeout}; use tokio::task::JoinHandle; -use tracing::{debug, info, Instrument}; +use tracing::{debug, info, warn, Instrument}; #[cfg(test)] use mockall::automock; @@ -52,6 +53,8 @@ pub struct Synchronizer { slots_checkpoint: u32, checkpoint_type: CheckpointType, last_synced_block: Option, + // Timeout for network operations in seconds + request_timeout: u64, } #[derive(Clone, Copy, Debug, PartialEq)] @@ -69,6 +72,7 @@ impl Default for SynchronizerBuilder { slots_checkpoint: 1000, checkpoint_type: CheckpointType::Upper, last_synced_block: None, + request_timeout: 30, } } } @@ -101,6 +105,11 @@ impl SynchronizerBuilder { self } + pub fn with_request_timeout(&mut self, timeout_secs: u64) -> &mut Self { + self.request_timeout = timeout_secs; + self + } + pub fn build( &self, context: Box>, @@ -112,6 +121,7 @@ impl SynchronizerBuilder { slots_checkpoint: self.slots_checkpoint, checkpoint_type: self.checkpoint_type, last_synced_block: self.last_synced_block.clone(), + request_timeout: self.request_timeout, } } } @@ -251,9 +261,14 @@ impl Synchronizer { checkpoint_final_slot = final_chunk_slot ); - self.process_slots(initial_chunk_slot, final_chunk_slot) - .instrument(sync_slots_chunk_span) - .await?; + timeout( + Duration::from_secs(self.request_timeout * 3), + self.process_slots(initial_chunk_slot, final_chunk_slot).instrument(sync_slots_chunk_span) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "process_slots".to_string(), + block_id: format!("from {} to {}", initial_chunk_slot, final_chunk_slot), + })??; let last_slot = Some(if is_reverse_sync { final_chunk_slot + 1 @@ -277,17 +292,24 @@ impl Synchronizer { self.last_synced_block.as_ref().map(|block| block.slot); } - if let Err(error) = self - .context - .blobscan_client() - .update_sync_state(BlockchainSyncState { - last_finalized_block: None, - last_lower_synced_slot, - last_upper_synced_slot, - last_upper_synced_block_root, - last_upper_synced_block_slot, - }) - .await + let update_result = timeout( + Duration::from_secs(self.request_timeout), + self.context + .blobscan_client() + .update_sync_state(BlockchainSyncState { + last_finalized_block: None, + last_lower_synced_slot, + last_upper_synced_slot, + last_upper_synced_block_root, + last_upper_synced_block_slot, + }) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "update_sync_state".to_string(), + block_id: format!("slot {}", last_lower_synced_slot.or(last_upper_synced_slot).unwrap_or(0)), + })?; + + if let Err(error) = update_result { let new_synced_slot = match last_lower_synced_slot.or(last_upper_synced_slot) { Some(slot) => slot, @@ -339,12 +361,26 @@ impl CommonSynchronizer for Synchronizer { } async fn sync_block(&mut self, block_id: BlockId) -> Result<(), SynchronizerError> { - let final_slot = block_id - .resolve_to_slot(self.context.beacon_client()) - .await?; - - self.process_slots_by_checkpoints(final_slot, final_slot + 1) - .await?; + let slot_resolution = timeout( + Duration::from_secs(self.request_timeout), + block_id.resolve_to_slot(self.context.beacon_client()) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "resolve_to_slot".to_string(), + block_id: format!("{:?}", block_id), + })?? + ; + + let final_slot = slot_resolution; + + timeout( + Duration::from_secs(self.request_timeout * 2), + self.process_slots_by_checkpoints(final_slot, final_slot + 1) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "process_slots_by_checkpoints".to_string(), + block_id: format!("{}", final_slot), + })??; Ok(()) } @@ -354,12 +390,25 @@ impl CommonSynchronizer for Synchronizer { initial_block_id: BlockId, final_block_id: BlockId, ) -> Result<(), SynchronizerError> { - let initial_slot = initial_block_id - .resolve_to_slot(self.context.beacon_client()) - .await?; - let mut final_slot = final_block_id - .resolve_to_slot(self.context.beacon_client()) - .await?; + let initial_slot = timeout( + Duration::from_secs(self.request_timeout), + initial_block_id.resolve_to_slot(self.context.beacon_client()) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "resolve_to_slot (initial)".to_string(), + block_id: format!("{:?}", initial_block_id), + })?? + ; + + let mut final_slot = timeout( + Duration::from_secs(self.request_timeout), + final_block_id.resolve_to_slot(self.context.beacon_client()) + ).await + .map_err(|_| SynchronizerError::OperationTimeout { + operation: "resolve_to_slot (final)".to_string(), + block_id: format!("{:?}", final_block_id), + })?? + ; if initial_slot == final_slot { return Ok(());