diff --git a/src/clients/blobscan/mod.rs b/src/clients/blobscan/mod.rs index 53f5576..b355844 100644 --- a/src/clients/blobscan/mod.rs +++ b/src/clients/blobscan/mod.rs @@ -51,13 +51,13 @@ pub struct BlobscanClient { base_url: Url, client: reqwest::Client, jwt_manager: JWTManager, - exp_backoff: Option, + exp_backoff: ExponentialBackoff, } pub struct Config { pub base_url: String, pub secret_key: String, - pub exp_backoff: Option, + pub exp_backoff: ExponentialBackoff, } #[async_trait] @@ -94,7 +94,7 @@ impl CommonBlobscanClient for BlobscanClient { blobs, }; - json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) + json_put!(&self.client, url, token, &req, self.exp_backoff.clone()).map(|_: Option<()>| ()) } async fn get_block(&self, slot: u32) -> ClientResult> { @@ -116,7 +116,7 @@ impl CommonBlobscanClient for BlobscanClient { rewinded_blocks, }; - json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req).map(|_| ()) + json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req, self.exp_backoff.clone()).map(|_| ()) } async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> { @@ -124,7 +124,7 @@ impl CommonBlobscanClient for BlobscanClient { let token = self.jwt_manager.get_token()?; let req: BlockchainSyncStateRequest = sync_state.into(); - json_put!(&self.client, url, token, &req).map(|_: Option<()>| ()) + json_put!(&self.client, url, token, &req, self.exp_backoff.clone()).map(|_: Option<()>| ()) } async fn get_sync_state(&self) -> ClientResult> { diff --git a/src/clients/macros.rs b/src/clients/macros.rs index f9f6fb4..a1949ee 100644 --- a/src/clients/macros.rs +++ b/src/clients/macros.rs @@ -1,5 +1,5 @@ #[macro_export] -/// Make a GET request sending and expecting JSON. +/// Make a GET request sending and expecting JSON with retry using exponential backoff. /// if JSON deser fails, emit a `WARN` level tracing event macro_rules! json_get { ($client:expr, $url:expr, $expected:ty, $exp_backoff:expr) => { @@ -16,14 +16,13 @@ macro_rules! json_get { req = req.bearer_auth($auth_token); } - let resp = if $exp_backoff.is_some() { - match backoff::future::retry_notify( - $exp_backoff.unwrap(), - || { - let req = req.try_clone().unwrap(); + let resp = match backoff::future::retry_notify( + $exp_backoff, + || { + let req = req.try_clone().unwrap(); - async move { req.send().await.map_err(|err| err.into()) } - }, + async move { req.send().await.map_err(|err| err.into()) } + }, |error, duration: std::time::Duration| { let duration = duration.as_secs(); @@ -35,32 +34,17 @@ macro_rules! json_get { ); }, ) - .await { - Ok(resp) => resp, - Err(error) => { - tracing::warn!( - method = "GET", - url = %url, - ?error, - "Failed to send request. All retries failed" - ); + .await { + Ok(resp) => resp, + Err(error) => { + tracing::warn!( + method = "GET", + url = %url, + ?error, + "Failed to send request. All retries failed" + ); - return Err(error.into()) - } - } - } else { - match req.send().await { - Err(error) => { - tracing::warn!( - method = "GET", - url = %url, - ?error, - "Failed to send request" - ); - - return Err(error.into()) - }, - Ok(resp) => resp + return Err(error.into()) } }; @@ -92,38 +76,52 @@ macro_rules! json_get { } #[macro_export] -/// Make a PUT request sending JSON. +/// Make a PUT request sending JSON with retry using exponential backoff. /// if JSON deser fails, emit a `WARN` level tracing event macro_rules! json_put { - ($client:expr, $url:expr, $auth_token:expr, $body:expr) => { - json_put!($client, $url, (), $auth_token, $body) + ($client:expr, $url:expr, $auth_token:expr, $body:expr, $exp_backoff:expr) => { + json_put!($client, $url, (), $auth_token, $body, $exp_backoff) }; - ($client:expr, $url:expr, $expected:ty, $auth_token:expr, $body:expr) => {{ + ($client:expr, $url:expr, $expected:ty, $auth_token:expr, $body:expr, $exp_backoff:expr) => {{ let url = $url.clone(); let body = format!("{:?}", $body); tracing::trace!(method = "PUT", url = url.as_str(), body, "Dispatching API client request"); + let resp = match backoff::future::retry_notify( + $exp_backoff, + || { + let req = $client + .put($url.clone()) + .bearer_auth($auth_token.clone()) + .json($body); - let resp = match $client - .put($url) - .bearer_auth($auth_token) - .json($body) - .send() - .await { - Err(error) => { - tracing::warn!( - method = "PUT", - url = %url, - body = body, - ?error, - "Failed to send request" - ); + async move { req.send().await.map_err(|err| err.into()) } + }, + |error, duration: std::time::Duration| { + let duration = duration.as_secs(); - return Err(error.into()) - }, - Ok(resp) => resp - }; + tracing::warn!( + method = "PUT", + url = %url, + ?error, + "Failed to send request. Retrying in {duration} seconds…" + ); + }, + ) + .await { + Ok(resp) => resp, + Err(error) => { + tracing::warn!( + method = "PUT", + url = %url, + ?error, + "Failed to send request. All retries failed" + ); + + return Err(error.into()) + } + }; let text = resp.text().await?; let result: $crate::clients::common::ClientResponse<$expected> = text.parse()?;