@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
108
108
cv.notify_one ();
109
109
}
110
110
111
+ void wakeup ()
112
+ {
113
+ std::unique_lock<std::mutex> lk (mu);
114
+ any_need_update = true ;
115
+ cv.notify_one ();
116
+ }
117
+
111
118
private:
112
119
std::condition_variable cv;
113
120
std::mutex mu;
@@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
391
398
return true ;
392
399
};
393
400
394
- if (!cluster_info.current_node_is_observer && !contains (node_uuids, current_node_name))
401
+ if (!cluster_info.current_node_is_observer
402
+ && context->isSwarmModeEnabled ()
403
+ && !contains (node_uuids, current_node_name))
395
404
{
396
405
LOG_ERROR (log, " Can't find current node in cluster '{}', will register again" , cluster_info.name );
397
406
registerInZk (zk, cluster_info);
@@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
455
464
return ;
456
465
}
457
466
467
+ if (!context->isSwarmModeEnabled ())
468
+ {
469
+ LOG_DEBUG (log, " STOP SWARM MODE called, skip self-registering current node {} in cluster {}" , current_node_name, info.name );
470
+ return ;
471
+ }
472
+
458
473
LOG_DEBUG (log, " Registering current node {} in cluster {}" , current_node_name, info.name );
459
474
460
475
zk->createOrUpdate (node_path, info.current_node .serialize (), zkutil::CreateMode::Ephemeral);
461
476
LOG_DEBUG (log, " Current node {} registered in cluster {}" , current_node_name, info.name );
462
477
}
463
478
479
+ void ClusterDiscovery::unregisterFromZk (zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
480
+ {
481
+ if (info.current_node_is_observer )
482
+ return ;
483
+
484
+ String node_path = getShardsListPath (info.zk_root ) / current_node_name;
485
+ LOG_DEBUG (log, " Removing current node {} from cluster {}" , current_node_name, info.name );
486
+
487
+ zk->remove (node_path);
488
+ LOG_DEBUG (log, " Current node {} removed from cluster {}" , current_node_name, info.name );
489
+ }
490
+
464
491
void ClusterDiscovery::initialUpdate ()
465
492
{
466
493
LOG_DEBUG (log, " Initializing" );
@@ -506,6 +533,18 @@ void ClusterDiscovery::initialUpdate()
506
533
is_initialized = true ;
507
534
}
508
535
536
+ void ClusterDiscovery::registerAll ()
537
+ {
538
+ register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
539
+ clusters_to_update->wakeup ();
540
+ }
541
+
542
+ void ClusterDiscovery::unregisterAll ()
543
+ {
544
+ register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
545
+ clusters_to_update->wakeup ();
546
+ }
547
+
509
548
void ClusterDiscovery::findDynamicClusters (
510
549
std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
511
550
std::unordered_set<size_t > * unchanged_roots)
@@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
729
768
{
730
769
up_to_date_callback ();
731
770
}
771
+
772
+ RegisterChangeFlag flag = register_change_flag.exchange (RegisterChangeFlag::RCF_NONE);
773
+
774
+ if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
775
+ {
776
+ LOG_DEBUG (log, " Register in all dynamic clusters" );
777
+ for (auto & [_, info] : clusters_info)
778
+ {
779
+ auto zk = context->getDefaultOrAuxiliaryZooKeeper (info.zk_name );
780
+ registerInZk (zk, info);
781
+ }
782
+ }
783
+ else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
784
+ {
785
+ LOG_DEBUG (log, " Unregister in all dynamic clusters" );
786
+ for (auto & [_, info] : clusters_info)
787
+ {
788
+ auto zk = context->getDefaultOrAuxiliaryZooKeeper (info.zk_name );
789
+ unregisterFromZk (zk, info);
790
+ }
791
+ }
732
792
}
733
793
LOG_DEBUG (log, " Worker thread stopped" );
734
794
return finished;
0 commit comments