Skip to content

Commit 0e7ffe0

Browse files
committed
Atomic asynchronous stop/start swarm
1 parent 60a4473 commit 0e7ffe0

File tree

6 files changed

+74
-27
lines changed

6 files changed

+74
-27
lines changed

src/Interpreters/ClusterDiscovery.cpp

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
108108
cv.notify_one();
109109
}
110110

111+
void wakeup()
112+
{
113+
std::unique_lock<std::mutex> lk(mu);
114+
any_need_update = true;
115+
cv.notify_one();
116+
}
117+
111118
private:
112119
std::condition_variable cv;
113120
std::mutex mu;
@@ -528,20 +535,14 @@ void ClusterDiscovery::initialUpdate()
528535

529536
void ClusterDiscovery::registerAll()
530537
{
531-
for (auto & [_, info] : clusters_info)
532-
{
533-
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
534-
registerInZk(zk, info);
535-
}
538+
register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
539+
clusters_to_update->wakeup();
536540
}
537541

538542
void ClusterDiscovery::unregisterAll()
539543
{
540-
for (auto & [_, info] : clusters_info)
541-
{
542-
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
543-
unregisterFromZk(zk, info);
544-
}
544+
register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
545+
clusters_to_update->wakeup();
545546
}
546547

547548
void ClusterDiscovery::findDynamicClusters(
@@ -767,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
767768
{
768769
up_to_date_callback();
769770
}
771+
772+
RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
773+
774+
if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
775+
{
776+
LOG_DEBUG(log, "Register in all dynamic clusters");
777+
for (auto & [_, info] : clusters_info)
778+
{
779+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
780+
registerInZk(zk, info);
781+
}
782+
}
783+
else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
784+
{
785+
LOG_DEBUG(log, "Unregister in all dynamic clusters");
786+
for (auto & [_, info] : clusters_info)
787+
{
788+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
789+
unregisterFromZk(zk, info);
790+
}
791+
}
770792
}
771793
LOG_DEBUG(log, "Worker thread stopped");
772794
return finished;

src/Interpreters/ClusterDiscovery.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,15 @@ class ClusterDiscovery
211211
std::shared_ptr<std::vector<std::shared_ptr<MulticlusterDiscovery>>> multicluster_discovery_paths;
212212

213213
MultiVersion<Macros>::Version macros;
214+
215+
enum RegisterChangeFlag
216+
{
217+
RCF_NONE,
218+
RCF_REGISTER_ALL,
219+
RCF_UNREGISTER_ALL,
220+
};
221+
222+
std::atomic<RegisterChangeFlag> register_change_flag = RegisterChangeFlag::RCF_NONE;
214223
};
215224

216225
}

src/Interpreters/Context.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,7 @@ struct ContextSharedPart : boost::noncopyable
750750
void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
751751
{
752752
swarm_mode_enabled = false;
753+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
753754
bool is_shutdown_called = shutdown_called.exchange(true);
754755
if (is_shutdown_called)
755756
return;
@@ -4665,15 +4666,15 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
46654666
return res;
46664667
}
46674668

4668-
void Context::unregisterInDynamicClusters()
4669+
void Context::unregisterInAutodiscoveryClusters()
46694670
{
46704671
std::lock_guard lock(shared->clusters_mutex);
46714672
if (!shared->cluster_discovery)
46724673
return;
46734674
shared->cluster_discovery->unregisterAll();
46744675
}
46754676

4676-
void Context::registerInDynamicClusters()
4677+
void Context::registerInAutodiscoveryClusters()
46774678
{
46784679
std::lock_guard lock(shared->clusters_mutex);
46794680
if (!shared->cluster_discovery)
@@ -5562,16 +5563,24 @@ void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
55625563
shared->shutdown();
55635564
}
55645565

5565-
void Context::stopSwarmMode()
5566+
bool Context::stopSwarmMode()
55665567
{
5567-
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
5568-
shared->swarm_mode_enabled = false;
5568+
bool expected_is_enabled = true;
5569+
bool is_stopped_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, false);
5570+
if (is_stopped_now)
5571+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
5572+
// Return true if the stop was successful.
5573+
return is_stopped_now;
55695574
}
55705575

5571-
void Context::startSwarmMode()
5576+
bool Context::startSwarmMode()
55725577
{
5573-
shared->swarm_mode_enabled = true;
5574-
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1);
5578+
bool expected_is_enabled = false;
5579+
bool is_started_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, true);
5580+
if (is_started_now)
5581+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1);
5582+
// Return true if the start was successful.
5583+
return is_started_now;
55755584
}
55765585

55775586
bool Context::isSwarmModeEnabled() const

src/Interpreters/Context.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,8 +1296,8 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
12961296
size_t getClustersVersion() const;
12971297

12981298
void startClusterDiscovery();
1299-
void registerInDynamicClusters();
1300-
void unregisterInDynamicClusters();
1299+
void registerInAutodiscoveryClusters();
1300+
void unregisterInAutodiscoveryClusters();
13011301

13021302
/// Sets custom cluster, but doesn't update configuration
13031303
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
@@ -1410,9 +1410,13 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
14101410

14111411
void shutdown();
14121412

1413-
/// Stop some works to allow graceful shutdown later
1414-
void stopSwarmMode();
1415-
void startSwarmMode();
1413+
/// Stop some work to allow a graceful shutdown later.
1414+
/// Returns true if the stop was successful.
1415+
bool stopSwarmMode();
1416+
/// Resume the stopped work if we change our mind.
1417+
/// Returns true if the start was successful.
1418+
bool startSwarmMode();
1419+
/// Return current swarm mode state.
14161420
bool isSwarmModeEnabled() const;
14171421

14181422
bool isInternalQuery() const { return is_internal_query; }

src/Interpreters/InterpreterSystemQuery.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -718,15 +718,15 @@ BlockIO InterpreterSystemQuery::execute()
718718
case Type::STOP_SWARM_MODE:
719719
{
720720
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
721-
getContext()->stopSwarmMode();
722-
getContext()->unregisterInDynamicClusters();
721+
if (getContext()->stopSwarmMode())
722+
getContext()->unregisterInAutodiscoveryClusters();
723723
break;
724724
}
725725
case Type::START_SWARM_MODE:
726726
{
727727
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
728-
getContext()->registerInDynamicClusters();
729-
getContext()->startSwarmMode();
728+
if (getContext()->startSwarmMode())
729+
getContext()->registerInAutodiscoveryClusters();
730730
break;
731731
}
732732
case Type::STOP_FETCHES:

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
7474
}
7575
catch (const Exception & e)
7676
{
77+
/// If a cluster node shuts down unexpectedly (kill/segfault/power off/etc.), the socket just closes.
78+
/// If the initiator has not processed any data packets yet, this can be ignored.
79+
/// Unprocessed tasks will be executed on other nodes.
7780
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
7881
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
7982
{

0 commit comments

Comments
 (0)