Skip to content

Commit 77999c6

Browse files
committed
Added moved_redirect to the refresh slots function
1 parent 2d7200f commit 77999c6

File tree

1 file changed

+58
-12
lines changed

1 file changed

+58
-12
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,27 @@ impl From<String> for OperationTarget {
625625
}
626626
}
627627

628+
/// Represents a node to which a `MOVED` or `ASK` error redirects.
629+
#[derive(Clone, Debug)]
630+
pub(crate) struct RedirectNode {
631+
/// The address of the redirect node.
632+
pub _address: String,
633+
/// The slot of the redirect node.
634+
pub _slot: u16,
635+
}
636+
637+
impl RedirectNode {
638+
/// This function expects an `Option` containing a tuple with a string slice and a u16.
639+
/// The tuple represents an address and a slot, respectively. If the input is `Some`,
640+
/// the function converts the address to a `String` and constructs a `RedirectNode`.
641+
pub(crate) fn from_option_tuple(option: Option<(&str, u16)>) -> Option<Self> {
642+
option.map(|(address, slot)| RedirectNode {
643+
_address: address.to_string(),
644+
_slot: slot,
645+
})
646+
}
647+
}
648+
628649
struct Message<C> {
629650
cmd: CmdArg<C>,
630651
sender: oneshot::Sender<RedisResult<Response>>,
@@ -772,6 +793,7 @@ enum Next<C> {
772793
// if not set, then a slot refresh should happen without sending a request afterwards
773794
request: Option<PendingRequest<C>>,
774795
sleep_duration: Option<Duration>,
796+
moved_redirect: Option<RedirectNode>,
775797
},
776798
ReconnectToInitialNodes {
777799
// if not set, then a reconnect should happen without sending a request afterwards
@@ -816,6 +838,7 @@ impl<C> Future for Request<C> {
816838
Next::RefreshSlots {
817839
request: None,
818840
sleep_duration: None,
841+
moved_redirect: RedirectNode::from_option_tuple(err.redirect_node()),
819842
}
820843
.into()
821844
} else if matches!(err.retry_method(), crate::types::RetryMethod::Reconnect) {
@@ -861,6 +884,7 @@ impl<C> Future for Request<C> {
861884
return Next::RefreshSlots {
862885
request: Some(request),
863886
sleep_duration: Some(sleep_duration),
887+
moved_redirect: None,
864888
}
865889
.into();
866890
}
@@ -879,13 +903,15 @@ impl<C> Future for Request<C> {
879903
}
880904
crate::types::RetryMethod::MovedRedirect => {
881905
let mut request = this.request.take().unwrap();
906+
let redirect_node = err.redirect_node();
882907
request.info.set_redirect(
883908
err.redirect_node()
884909
.map(|(node, _slot)| Redirect::Moved(node.to_string())),
885910
);
886911
Next::RefreshSlots {
887912
request: Some(request),
888913
sleep_duration: None,
914+
moved_redirect: RedirectNode::from_option_tuple(redirect_node),
889915
}
890916
.into()
891917
}
@@ -994,6 +1020,7 @@ where
9941020
Self::refresh_slots_and_subscriptions_with_retries(
9951021
connection.inner.clone(),
9961022
&RefreshPolicy::NotThrottable,
1023+
None,
9971024
)
9981025
.await?;
9991026

@@ -1137,6 +1164,7 @@ where
11371164
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
11381165
inner.clone(),
11391166
&RefreshPolicy::Throttable,
1167+
None,
11401168
)
11411169
.await
11421170
{
@@ -1308,6 +1336,7 @@ where
13081336
async fn refresh_slots_and_subscriptions_with_retries(
13091337
inner: Arc<InnerCore<C>>,
13101338
policy: &RefreshPolicy,
1339+
moved_redirect: Option<RedirectNode>,
13111340
) -> RedisResult<()> {
13121341
let SlotRefreshState {
13131342
in_progress,
@@ -1321,7 +1350,7 @@ where
13211350
{
13221351
return Ok(());
13231352
}
1324-
let mut skip_slots_refresh = false;
1353+
let mut should_refresh_slots = true;
13251354
if *policy == RefreshPolicy::Throttable {
13261355
// Check if the current slot refresh is triggered before the wait duration has passed
13271356
let last_run_rlock = last_run.read().await;
@@ -1340,13 +1369,13 @@ where
13401369
if passed_time <= wait_duration {
13411370
debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?},
13421371
Wait duration = {:?}", passed_time, wait_duration);
1343-
skip_slots_refresh = true;
1372+
should_refresh_slots = false;
13441373
}
13451374
}
13461375
}
13471376

13481377
let mut res = Ok(());
1349-
if !skip_slots_refresh {
1378+
if should_refresh_slots {
13501379
let retry_strategy = ExponentialBackoff {
13511380
initial_interval: DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL,
13521381
max_interval: DEFAULT_REFRESH_SLOTS_RETRY_MAX_INTERVAL,
@@ -1359,6 +1388,10 @@ where
13591388
Self::refresh_slots(inner.clone(), curr_retry)
13601389
})
13611390
.await;
1391+
} else if moved_redirect.is_some() {
1392+
// Update relevant slots in the slots map based on the moved_redirect address,
1393+
// rather than refreshing all slots by querying the cluster nodes for their topology view.
1394+
Self::update_slots_for_redirect_change(inner.clone(), moved_redirect).await?;
13621395
}
13631396
in_progress.store(false, Ordering::Relaxed);
13641397

@@ -1367,15 +1400,24 @@ where
13671400
res
13681401
}
13691402

1403+
/// Update relevant slots in the slots map based on the moved_redirect address
1404+
pub(crate) async fn update_slots_for_redirect_change(
1405+
_inner: Arc<InnerCore<C>>,
1406+
_moved_redirect: Option<RedirectNode>,
1407+
) -> RedisResult<()> {
1408+
// TODO: Add implementation
1409+
Ok(())
1410+
}
1411+
13701412
pub(crate) async fn check_topology_and_refresh_if_diff(
13711413
inner: Arc<InnerCore<C>>,
13721414
policy: &RefreshPolicy,
1373-
) -> bool {
1415+
) -> RedisResult<bool> {
13741416
let topology_changed = Self::check_for_topology_diff(inner.clone()).await;
13751417
if topology_changed {
1376-
let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await;
1418+
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy, None).await?;
13771419
}
1378-
topology_changed
1420+
Ok(topology_changed)
13791421
}
13801422

13811423
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
@@ -1963,6 +2005,7 @@ where
19632005
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
19642006
self.inner.clone(),
19652007
&RefreshPolicy::Throttable,
2008+
None,
19662009
));
19672010
Poll::Ready(Err(err))
19682011
}
@@ -2058,9 +2101,10 @@ where
20582101
Next::RefreshSlots {
20592102
request,
20602103
sleep_duration,
2104+
moved_redirect,
20612105
} => {
2062-
poll_flush_action =
2063-
poll_flush_action.change_state(PollFlushAction::RebuildSlots);
2106+
poll_flush_action = poll_flush_action
2107+
.change_state(PollFlushAction::RebuildSlots(moved_redirect));
20642108
if let Some(request) = request {
20652109
let future: RequestState<
20662110
Pin<Box<dyn Future<Output = OperationResult> + Send>>,
@@ -2130,7 +2174,7 @@ where
21302174

21312175
enum PollFlushAction {
21322176
None,
2133-
RebuildSlots,
2177+
RebuildSlots(Option<RedirectNode>),
21342178
Reconnect(Vec<String>),
21352179
ReconnectFromInitialConnections,
21362180
}
@@ -2145,8 +2189,9 @@ impl PollFlushAction {
21452189
PollFlushAction::ReconnectFromInitialConnections
21462190
}
21472191

2148-
(PollFlushAction::RebuildSlots, _) | (_, PollFlushAction::RebuildSlots) => {
2149-
PollFlushAction::RebuildSlots
2192+
(PollFlushAction::RebuildSlots(moved_redirect), _)
2193+
| (_, PollFlushAction::RebuildSlots(moved_redirect)) => {
2194+
PollFlushAction::RebuildSlots(moved_redirect)
21502195
}
21512196

21522197
(PollFlushAction::Reconnect(mut addrs), PollFlushAction::Reconnect(new_addrs)) => {
@@ -2207,11 +2252,12 @@ where
22072252

22082253
match ready!(self.poll_complete(cx)) {
22092254
PollFlushAction::None => return Poll::Ready(Ok(())),
2210-
PollFlushAction::RebuildSlots => {
2255+
PollFlushAction::RebuildSlots(moved_redirect) => {
22112256
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
22122257
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
22132258
self.inner.clone(),
22142259
&RefreshPolicy::Throttable,
2260+
moved_redirect,
22152261
),
22162262
)));
22172263
}

0 commit comments

Comments
 (0)