Skip to content

Commit eafaadb

Browse files
committed
Changed check_topology_and_refresh_if_diff to return a result
1 parent 77999c6 commit eafaadb

File tree

2 files changed

+50
-14
lines changed

2 files changed

+50
-14
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,9 @@ where
14091409
Ok(())
14101410
}
14111411

1412+
/// Determines if the cluster topology has changed and refreshes slots and subscriptions if needed.
1413+
/// Returns `RedisResult` with `true` if changes were detected and slots were refreshed,
1414+
/// or `false` if no changes were found. Raises an error if refreshing the topology fails.
14121415
pub(crate) async fn check_topology_and_refresh_if_diff(
14131416
inner: Arc<InnerCore<C>>,
14141417
policy: &RefreshPolicy,
@@ -1423,14 +1426,30 @@ where
14231426
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
14241427
loop {
14251428
let _ = boxed_sleep(interval_duration).await;
1426-
let topology_changed =
1427-
Self::check_topology_and_refresh_if_diff(inner.clone(), &RefreshPolicy::Throttable)
1428-
.await;
1429-
if !topology_changed {
1430-
// This serves as a safety measure for validating pubsub subsctiptions state in case it has drifted
1431-
// while topology stayed the same.
1432-
// For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(),
1433-
// might leave a node unconnected indefinitely in case topology is stable and no request are attempted to this node.
1429+
1430+
// Check and refresh topology if needed
1431+
let should_refresh_pubsub = match Self::check_topology_and_refresh_if_diff(
1432+
inner.clone(),
1433+
&RefreshPolicy::Throttable,
1434+
)
1435+
.await
1436+
{
1437+
Ok(topology_changed) => !topology_changed,
1438+
Err(err) => {
1439+
warn!(
1440+
"Failed to refresh slots during periodic topology checks:\n{:?}",
1441+
err
1442+
);
1443+
true
1444+
}
1445+
};
1446+
1447+
// Refresh pubsub subscriptions if topology wasn't changed or an error occurred.
1448+
// This serves as a safety measure for validating pubsub subsctiptions state in case it has drifted
1449+
// while topology stayed the same.
1450+
// For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(),
1451+
// might leave a node unconnected indefinitely in case topology is stable and no request are attempted to this node.
1452+
if should_refresh_pubsub {
14341453
Self::refresh_pubsub_subscriptions(inner.clone()).await;
14351454
}
14361455
}

redis/src/commands/cluster_scan.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ pub(crate) trait ClusterInScan {
150150
async fn are_all_slots_covered(&self) -> bool;
151151

152152
/// Check if the topology of the cluster has changed and refresh the slots if needed
153-
async fn refresh_if_topology_changed(&self);
153+
async fn refresh_if_topology_changed(&self) -> RedisResult<bool>;
154154
}
155155

156156
/// Represents the state of a scan operation in a Redis cluster.
@@ -288,7 +288,16 @@ impl ScanState {
288288
&mut self,
289289
connection: &C,
290290
) -> RedisResult<ScanState> {
291-
let _ = connection.refresh_if_topology_changed().await;
291+
connection
292+
.refresh_if_topology_changed()
293+
.await
294+
.map_err(|err| {
295+
RedisError::from((
296+
ErrorKind::ResponseError,
297+
"Error during cluster scan: failed to refresh slots",
298+
format!("{:?}", err),
299+
))
300+
})?;
292301
let mut scanned_slots_map = self.scanned_slots_map;
293302
// If the address epoch changed it mean that some slots in the address are new, so we cant know which slots been there from the beginning and which are new, or out and in later.
294303
// In this case we will skip updating the scanned_slots_map and will just update the address and the cursor
@@ -387,14 +396,14 @@ where
387396
async fn are_all_slots_covered(&self) -> bool {
388397
ClusterConnInner::<C>::check_if_all_slots_covered(&self.conn_lock.read().await.slot_map)
389398
}
390-
async fn refresh_if_topology_changed(&self) {
399+
async fn refresh_if_topology_changed(&self) -> RedisResult<bool> {
391400
ClusterConnInner::check_topology_and_refresh_if_diff(
392401
self.to_owned(),
393402
// The cluster SCAN implementation must refresh the slots when a topology change is found
394403
// to ensure the scan logic is correct.
395404
&RefreshPolicy::NotThrottable,
396405
)
397-
.await;
406+
.await
398407
}
399408
}
400409

@@ -529,7 +538,13 @@ where
529538
{
530539
// TODO: This mechanism of refreshing on failure to route to address should be part of the routing mechanism
531540
// After the routing mechanism is updated to handle this case, this refresh in the case bellow should be removed
532-
core.refresh_if_topology_changed().await;
541+
core.refresh_if_topology_changed().await.map_err(|err| {
542+
RedisError::from((
543+
ErrorKind::ResponseError,
544+
"Error during cluster scan: failed to refresh slots",
545+
format!("{:?}", err),
546+
))
547+
})?;
533548
if !core.are_all_slots_covered().await {
534549
return Err(RedisError::from((
535550
ErrorKind::NotAllSlotsCovered,
@@ -615,7 +630,9 @@ mod tests {
615630
struct MockConnection;
616631
#[async_trait]
617632
impl ClusterInScan for MockConnection {
618-
async fn refresh_if_topology_changed(&self) {}
633+
async fn refresh_if_topology_changed(&self) -> RedisResult<bool> {
634+
Ok(true)
635+
}
619636
async fn get_address_by_slot(&self, _slot: u16) -> RedisResult<String> {
620637
Ok("mock_address".to_string())
621638
}

0 commit comments

Comments
 (0)