Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/clients/blobscan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ pub struct BlobscanClient {
base_url: Url,
client: reqwest::Client,
jwt_manager: JWTManager,
exp_backoff: Option<ExponentialBackoff>,
exp_backoff: ExponentialBackoff,
}

pub struct Config {
pub base_url: String,
pub secret_key: String,
pub exp_backoff: Option<ExponentialBackoff>,
pub exp_backoff: ExponentialBackoff,
}

#[async_trait]
Expand Down Expand Up @@ -94,7 +94,7 @@ impl CommonBlobscanClient for BlobscanClient {
blobs,
};

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
json_put!(&self.client, url, token, &req, self.exp_backoff.clone()).map(|_: Option<()>| ())
}

async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>> {
Expand All @@ -116,15 +116,15 @@ impl CommonBlobscanClient for BlobscanClient {
rewinded_blocks,
};

json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req).map(|_| ())
json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req, self.exp_backoff.clone()).map(|_| ())
}

async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
let url = self.base_url.join("blockchain-sync-state")?;
let token = self.jwt_manager.get_token()?;
let req: BlockchainSyncStateRequest = sync_state.into();

json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
json_put!(&self.client, url, token, &req, self.exp_backoff.clone()).map(|_: Option<()>| ())
}

async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
Expand Down
108 changes: 53 additions & 55 deletions src/clients/macros.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[macro_export]
/// Make a GET request sending and expecting JSON.
/// Make a GET request sending and expecting JSON with retry using exponential backoff.
/// if JSON deser fails, emit a `WARN` level tracing event
macro_rules! json_get {
($client:expr, $url:expr, $expected:ty, $exp_backoff:expr) => {
Expand All @@ -16,14 +16,13 @@ macro_rules! json_get {
req = req.bearer_auth($auth_token);
}

let resp = if $exp_backoff.is_some() {
match backoff::future::retry_notify(
$exp_backoff.unwrap(),
|| {
let req = req.try_clone().unwrap();
let resp = match backoff::future::retry_notify(
$exp_backoff,
|| {
let req = req.try_clone().unwrap();

async move { req.send().await.map_err(|err| err.into()) }
},
async move { req.send().await.map_err(|err| err.into()) }
},
|error, duration: std::time::Duration| {
let duration = duration.as_secs();

Expand All @@ -35,32 +34,17 @@ macro_rules! json_get {
);
},
)
.await {
Ok(resp) => resp,
Err(error) => {
tracing::warn!(
method = "GET",
url = %url,
?error,
"Failed to send request. All retries failed"
);
.await {
Ok(resp) => resp,
Err(error) => {
tracing::warn!(
method = "GET",
url = %url,
?error,
"Failed to send request. All retries failed"
);

return Err(error.into())
}
}
} else {
match req.send().await {
Err(error) => {
tracing::warn!(
method = "GET",
url = %url,
?error,
"Failed to send request"
);

return Err(error.into())
},
Ok(resp) => resp
return Err(error.into())
}
};

Expand Down Expand Up @@ -92,38 +76,52 @@ macro_rules! json_get {
}

#[macro_export]
/// Make a PUT request sending JSON.
/// Make a PUT request sending JSON with retry using exponential backoff.
/// if JSON deser fails, emit a `WARN` level tracing event
macro_rules! json_put {
($client:expr, $url:expr, $auth_token:expr, $body:expr) => {
json_put!($client, $url, (), $auth_token, $body)
($client:expr, $url:expr, $auth_token:expr, $body:expr, $exp_backoff:expr) => {
json_put!($client, $url, (), $auth_token, $body, $exp_backoff)
};
($client:expr, $url:expr, $expected:ty, $auth_token:expr, $body:expr) => {{
($client:expr, $url:expr, $expected:ty, $auth_token:expr, $body:expr, $exp_backoff:expr) => {{
let url = $url.clone();
let body = format!("{:?}", $body);

tracing::trace!(method = "PUT", url = url.as_str(), body, "Dispatching API client request");

let resp = match backoff::future::retry_notify(
$exp_backoff,
|| {
let req = $client
.put($url.clone())
.bearer_auth($auth_token.clone())
.json($body);

let resp = match $client
.put($url)
.bearer_auth($auth_token)
.json($body)
.send()
.await {
Err(error) => {
tracing::warn!(
method = "PUT",
url = %url,
body = body,
?error,
"Failed to send request"
);
async move { req.send().await.map_err(|err| err.into()) }
},
|error, duration: std::time::Duration| {
let duration = duration.as_secs();

return Err(error.into())
},
Ok(resp) => resp
};
tracing::warn!(
method = "PUT",
url = %url,
?error,
"Failed to send request. Retrying in {duration} seconds…"
);
},
)
.await {
Ok(resp) => resp,
Err(error) => {
tracing::warn!(
method = "PUT",
url = %url,
?error,
"Failed to send request. All retries failed"
);

return Err(error.into())
}
};

let text = resp.text().await?;
let result: $crate::clients::common::ClientResponse<$expected> = text.parse()?;
Expand Down
Loading