diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs
index 673614ad6..6eecd98f4 100644
--- a/crates/tap-agent/src/agent/sender_account.rs
+++ b/crates/tap-agent/src/agent/sender_account.rs
@@ -292,9 +292,6 @@ pub struct SenderAccountArgs {
     /// Prefix used to bypass limitations of global actor registry (used for tests)
     pub prefix: Option<String>,
 
-    /// Configuration for retry scheduler in case sender is denied
-    pub retry_interval: Duration,
-
     /// Sender type, used to decide which set of tables to use
     pub sender_type: SenderType,
 }
@@ -343,8 +340,6 @@ pub struct State {
    /// Sender Balance used to verify if it has money in
    /// the escrow to pay for all non-redeemed fees (ravs and receipts)
    sender_balance: U256,
-    /// Configuration for retry scheduler in case sender is denied
-    retry_interval: Duration,
 
    /// Adaptative limiter for concurrent Rav Request
    ///
@@ -547,7 +542,6 @@ impl State {
                    If this doesn't work, open an issue on our Github."
                )
            })?;
-        self.backoff_info.ok();
        self.rav_request_for_allocation(allocation_id).await
    }
 
@@ -586,12 +580,14 @@ impl State {
        match rav_result {
            Ok(signed_rav) => {
                self.sender_fee_tracker.ok_rav_request(allocation_id);
+                self.backoff_info.ok();
                self.adaptive_limiter.on_success();
                let rav_value = signed_rav.map_or(0, |rav| rav.value_aggregate);
                self.update_rav(allocation_id, rav_value);
            }
            Err(err) => {
                self.sender_fee_tracker.failed_rav_backoff(allocation_id);
+                self.backoff_info.fail();
                self.adaptive_limiter.on_failure();
                tracing::error!(
                    "Error while requesting RAV for sender {} and allocation {}: {}",
@@ -643,6 +639,46 @@ impl State {
        }
    }
 
+    fn cancel_scheduled_retry(&mut self) {
+        if let Some(scheduled_rav_request) = self.scheduled_rav_request.take() {
+            tracing::debug!(sender = %self.sender, "Aborting scheduled RAV request");
+            scheduled_rav_request.abort();
+        }
+    }
+
+    fn next_retry_delay(&self) -> Duration {
+        let global_remaining = self.backoff_info.remaining();
+        let allocation_remaining = self.sender_fee_tracker.min_remaining_backoff();
+
+        match (global_remaining, allocation_remaining) {
+            (Some(global), Some(allocation)) => std::cmp::max(global, allocation),
+            (Some(global), None) => global,
+            (None, Some(allocation)) => allocation,
+            (None, None) => Duration::ZERO,
+        }
+    }
+
+    fn schedule_retry(
+        &mut self,
+        myself: &ActorRef<SenderAccountMessage>,
+        allocation_id: AllocationId,
+    ) {
+        self.cancel_scheduled_retry();
+
+        let delay = self.next_retry_delay();
+        tracing::debug!(
+            sender = %self.sender,
+            %allocation_id,
+            delay = ?delay,
+            "Scheduling RAV retry"
+        );
+
+        let retry_allocation_id = allocation_id;
+        self.scheduled_rav_request = Some(myself.send_after(delay, move || {
+            SenderAccountMessage::UpdateReceiptFees(retry_allocation_id, ReceiptFees::Retry)
+        }));
+    }
+
    /// Determines whether the sender should be denied/blocked based on current fees and balance.
    ///
    /// The deny condition is reached when either:
@@ -824,7 +860,6 @@ impl Actor for SenderAccount {
            sender_aggregator_endpoint,
            allocation_ids,
            prefix,
-            retry_interval,
            sender_type,
        }: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
@@ -1161,9 +1196,7 @@ impl Actor for SenderAccount {
            sender: sender_id,
            denied,
            sender_balance,
-            retry_interval,
            adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50),
-            escrow_accounts,
            escrow_subgraph,
            network_subgraph,
            domain_separator,
@@ -1283,10 +1316,7 @@ impl Actor for SenderAccount {
                    );
                }
                // If we're here because of a new receipt, abort any scheduled UpdateReceiptFees
-                if let Some(scheduled_rav_request) = state.scheduled_rav_request.take() {
-                    tracing::debug!(sender = %state.sender, "Aborting scheduled RAV request");
-                    scheduled_rav_request.abort();
-                }
+                state.cancel_scheduled_retry();
 
                match receipt_fees {
                    ReceiptFees::NewReceipt(value, timestamp_ns) => {
@@ -1425,13 +1455,7 @@ impl Actor for SenderAccount {
                    // Action: Schedule another retry to attempt RAV creation again.
                    (true, true) => {
                        // retry in a moment
-                        state.scheduled_rav_request =
-                            Some(myself.send_after(state.retry_interval, move || {
-                                SenderAccountMessage::UpdateReceiptFees(
-                                    allocation_id,
-                                    ReceiptFees::Retry,
-                                )
-                            }));
+                        state.schedule_retry(&myself, allocation_id);
                    }
                    _ => {}
                }
@@ -1766,8 +1790,6 @@ pub mod tests {
    /// Prefix shared between tests so we don't have conflicts in the global registry
    const BUFFER_DURATION: Duration = Duration::from_millis(100);
 
-    const RETRY_DURATION: Duration = Duration::from_millis(1000);
-
    async fn setup_mock_escrow_subgraph() -> MockServer {
        let mock_escrow_subgraph_server: MockServer = MockServer::start().await;
        mock_escrow_subgraph_server
@@ -2248,15 +2270,15 @@ pub mod tests {
            .unwrap();
        flush_messages(&mut msg_receiver).await;
 
-        // wait to try again so it's outside the buffer
-        tokio::time::sleep(RETRY_DURATION).await;
+        // wait briefly to allow the retry loop to process
+        tokio::time::sleep(Duration::from_millis(50)).await;
 
        assert_triggered!(triggered_rav_request);
 
        // Verify that no additional retry happens since the first RAV request
        // successfully cleared the unaggregated fees and resolved the deny condition.
        // This validates that the retry mechanism stops when the underlying issue is resolved,
        // which is the correct behavior according to the TAP protocol and retry logic.
-        tokio::time::sleep(RETRY_DURATION).await;
+        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_not_triggered!(triggered_rav_request);
    }
diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs
index 8737649f3..f4d93712f 100644
--- a/crates/tap-agent/src/agent/sender_accounts_manager.rs
+++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs
@@ -6,7 +6,6 @@ use std::{
    fmt::Display,
    str::FromStr,
    sync::LazyLock,
-    time::Duration,
 };
 
 use anyhow::{anyhow, bail};
@@ -39,8 +38,6 @@ static RECEIPTS_CREATED: LazyLock<CounterVec> = LazyLock::new(|| {
    .unwrap()
 });
 
-const RETRY_INTERVAL: Duration = Duration::from_secs(30);
-
 /// Notification received by pgnotify for V1 (legacy) receipts
 ///
 /// This contains a list of properties that are sent by postgres when a V1 receipt is inserted
@@ -1069,7 +1066,6 @@ impl State {
                .clone(),
            allocation_ids,
            prefix: self.prefix.clone(),
-            retry_interval: RETRY_INTERVAL,
            sender_type,
        })
    }
diff --git a/crates/tap-agent/src/backoff.rs b/crates/tap-agent/src/backoff.rs
index c7ef5472e..e834e9a37 100644
--- a/crates/tap-agent/src/backoff.rs
+++ b/crates/tap-agent/src/backoff.rs
@@ -3,18 +3,13 @@
 //! # backoff
 //!
-//! This module is used to provide a helper that keep tracks of exponential backoff information in a
-//! non-blocking way. This is important since Actors process one message at a time, and a sleep in
-//! the middle would affect performance.
-//!
-//! This way we just mark something as "in backoff" and just check that information before sending
-//! the request.
-//!
-//! This module is also used by [crate::tracker].
+//! Helper for tracking exponential backoff windows without blocking the async actor loop. Actors
+//! process one message at a time, so we just mark the next instant when work is allowed again and
+//! query that metadata before firing a request.
 
 use std::time::{Duration, Instant};
 
-/// Backoff information based on [Instant]
+/// Backoff information based on [`Instant`]
 #[derive(Debug, Clone)]
 pub struct BackoffInfo {
    failed_count: u32,
    failed_backoff_time: Instant,
 }
@@ -22,28 +17,30 @@
 impl BackoffInfo {
-    /// Callback representing a successful request
-    ///
-    /// This resets the failed_count
+    /// Marks a successful attempt, resetting counters and clearing any pending backoff delay.
    pub fn ok(&mut self) {
        self.failed_count = 0;
+        self.failed_backoff_time = Instant::now();
    }
 
-    /// Callback representing a failed request
-    ///
-    /// This sets the backoff time to max(100ms * 2 ^ retries, 60s)
+    /// Marks a failed attempt, growing the backoff delay exponentially up to 60 seconds.
    pub fn fail(&mut self) {
-        // backoff = max(100ms * 2 ^ retries, 60s)
-        self.failed_backoff_time = Instant::now()
-            + (Duration::from_millis(100) * 2u32.pow(self.failed_count))
-                .min(Duration::from_secs(60));
+        let delay =
+            (Duration::from_millis(100) * 2u32.pow(self.failed_count)).min(Duration::from_secs(60));
+        self.failed_backoff_time = Instant::now() + delay;
        self.failed_count += 1;
    }
 
-    /// Returns if backoff is in process
+    /// Returns the remaining backoff duration, if the current attempt should keep waiting.
+    pub fn remaining(&self) -> Option<Duration> {
+        self.failed_backoff_time
+            .checked_duration_since(Instant::now())
+            .filter(|remaining| !remaining.is_zero())
+    }
+
+    /// Returns whether the caller is still inside the backoff window.
    pub fn in_backoff(&self) -> bool {
-        let now = Instant::now();
-        now < self.failed_backoff_time
+        self.remaining().is_some()
    }
 }
diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs
index 57243cb6b..09c923c34 100644
--- a/crates/tap-agent/src/test.rs
+++ b/crates/tap-agent/src/test.rs
@@ -68,7 +68,6 @@ pub const RECEIPT_LIMIT: u64 = 10000;
 pub const DUMMY_URL: &str = "http://localhost:1234";
 pub const ESCROW_VALUE: u128 = 1000;
 const BUFFER_DURATION: Duration = Duration::from_millis(100);
-const RETRY_DURATION: Duration = Duration::from_millis(1000);
 const RAV_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
 const TAP_SENDER_TIMEOUT: Duration = Duration::from_secs(30);
 
@@ -182,7 +181,6 @@ pub async fn create_sender_account(
        sender_aggregator_endpoint: aggregator_url,
        allocation_ids: HashSet::new(),
        prefix: Some(prefix.clone()),
-        retry_interval: RETRY_DURATION,
        sender_type: SenderType::Legacy,
    };
 
@@ -1101,3 +1099,51 @@ pub mod actors {
        (rx, sender_account)
    }
 }
+
+#[cfg(test)]
+mod retry_helpers_tests {
+    use super::*;
+    use crate::{backoff::BackoffInfo, tracker::SenderFeeTracker};
+    use std::time::Duration;
+    use thegraph_core::alloy::primitives::Address;
+
+    #[test]
+    fn backoff_info_remaining_resets_after_ok() {
+        let mut info = BackoffInfo::default();
+        assert!(info.remaining().is_none());
+        assert!(!info.in_backoff());
+
+        info.fail();
+        let remaining = info
+            .remaining()
+            .expect("backoff duration should be tracked after a failure");
+        assert!(remaining > Duration::ZERO);
+        assert!(info.in_backoff());
+
+        info.ok();
+        assert!(info.remaining().is_none());
+        assert!(!info.in_backoff());
+    }
+
+    #[test]
+    fn sender_fee_tracker_min_remaining_backoff_roundtrip() {
+        let mut tracker = SenderFeeTracker::new(Duration::ZERO);
+        assert!(tracker.min_remaining_backoff().is_none());
+
+        let allocation = Address::with_last_byte(1);
+        tracker.failed_rav_backoff(allocation);
+        let first_delay = tracker
+            .min_remaining_backoff()
+            .expect("tracker should expose backoff after failure");
+        assert!(first_delay > Duration::ZERO);
+
+        tracker.failed_rav_backoff(allocation);
+        let second_delay = tracker
+            .min_remaining_backoff()
+            .expect("tracker should keep tracking backoff durations");
+        assert!(second_delay >= first_delay);
+
+        tracker.ok_rav_request(allocation);
+        assert!(tracker.min_remaining_backoff().is_none());
+    }
+}
diff --git a/crates/tap-agent/src/tracker.rs b/crates/tap-agent/src/tracker.rs
index ac3ffc23b..95c702194 100644
--- a/crates/tap-agent/src/tracker.rs
+++ b/crates/tap-agent/src/tracker.rs
@@ -27,6 +27,8 @@ mod tracker_tests;
 
 pub use generic_tracker::GlobalFeeTracker;
 
+use std::time::Duration;
+
 use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
 
 /// Simple Tracker used for just `u128` fees and no extra blocking or unblocking feature
@@ -41,6 +43,16 @@ pub type SimpleFeeTracker = GenericTracker;
 
 pub type SenderFeeTracker = GenericTracker;
 
+impl SenderFeeTracker {
+    /// Returns the smallest remaining backoff across all tracked allocations, if any
+    pub fn min_remaining_backoff(&self) -> Option<Duration> {
+        self.id_to_fee
+            .values()
+            .filter_map(|stats| stats.backoff_info.remaining())
+            .min()
+    }
+}
+
 /// Stats trait used by the Counter of a given allocation.
 ///
 /// This is the data that goes in the Value side of the Map inside our Tracker
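
The delay-selection rule that the new State::next_retry_delay encodes can be illustrated in isolation. The sketch below is not part of the patch; the free function name and the example values are hypothetical, but the match mirrors the one in the diff: wait for the larger of the sender-wide backoff and the smallest per-allocation backoff, and retry immediately when neither is active.

use std::time::Duration;

// Hypothetical standalone version of the rule in State::next_retry_delay:
// the most conservative (largest) remaining backoff wins, and no active
// backoff at all means the retry message is scheduled with a zero delay.
fn combine_backoffs(global: Option<Duration>, allocation: Option<Duration>) -> Duration {
    match (global, allocation) {
        (Some(global), Some(allocation)) => global.max(allocation),
        (Some(global), None) => global,
        (None, Some(allocation)) => allocation,
        (None, None) => Duration::ZERO,
    }
}

fn main() {
    // A sender-wide 2s backoff dominates a 500ms allocation backoff.
    assert_eq!(
        combine_backoffs(Some(Duration::from_secs(2)), Some(Duration::from_millis(500))),
        Duration::from_secs(2)
    );
    // With no active backoff the retry fires immediately.
    assert_eq!(combine_backoffs(None, None), Duration::ZERO);
}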