Skip to content

Commit df3fd1a

Browse files
authored
Merge pull request #1014 from Altinity/feature/system_preshutdown_v2
SYSTEM STOP SWARM MODE command for graceful shutdown swarm node merge attempt v2
2 parents 89d7ee8 + e74130f commit df3fd1a

35 files changed

+320
-17
lines changed

docs/en/sql-reference/statements/system.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]
206206
207207
Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)
208208
209+
## PRESHUTDOWN {#preshutdown}
210+
211+
<CloudNotSupportedBadge/>
212+
213+
Prepares the node for a graceful shutdown: unregisters it from autodiscovered clusters and stops accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).
214+
209215
## KILL {#kill}
210216
211217
Aborts ClickHouse process (like `kill -9 {$pid_clickhouse-server}`)

programs/server/Server.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2292,6 +2292,8 @@ try
22922292

22932293
}
22942294

2295+
global_context->startSwarmMode();
2296+
22952297
{
22962298
std::lock_guard lock(servers_lock);
22972299
/// We should start interserver communications before (and more important shutdown after) tables.
@@ -2701,6 +2703,8 @@ try
27012703

27022704
is_cancelled = true;
27032705

2706+
global_context->stopSwarmMode();
2707+
27042708
LOG_DEBUG(log, "Waiting for current connections to close.");
27052709

27062710
size_t current_connections = 0;

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ enum class AccessType : uint8_t
200200
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
201201
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
202202
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
203+
M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
203204
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
204205
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
205206
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,7 @@
430430
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
431431
\
432432
M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
433+
M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \
433434
\
434435
M(TotalMergeFailures, "Number of all failed merges since startup, including the ones that were aborted") \
435436
M(NonAbortedMergeFailures, "Number of failed merges since startup, excluding the merges that were aborted") \

src/Interpreters/ClusterDiscovery.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
108108
cv.notify_one();
109109
}
110110

111+
void wakeup()
112+
{
113+
std::unique_lock<std::mutex> lk(mu);
114+
any_need_update = true;
115+
cv.notify_one();
116+
}
117+
111118
private:
112119
std::condition_variable cv;
113120
std::mutex mu;
@@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
391398
return true;
392399
};
393400

394-
if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name))
401+
if (!cluster_info.current_node_is_observer
402+
&& context->isSwarmModeEnabled()
403+
&& !contains(node_uuids, current_node_name))
395404
{
396405
LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
397406
registerInZk(zk, cluster_info);
@@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
455464
return;
456465
}
457466

467+
if (!context->isSwarmModeEnabled())
468+
{
469+
LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
470+
return;
471+
}
472+
458473
LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);
459474

460475
zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
461476
LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
462477
}
463478

479+
void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
480+
{
481+
if (info.current_node_is_observer)
482+
return;
483+
484+
String node_path = getShardsListPath(info.zk_root) / current_node_name;
485+
LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);
486+
487+
zk->remove(node_path);
488+
LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
489+
}
490+
464491
void ClusterDiscovery::initialUpdate()
465492
{
466493
LOG_DEBUG(log, "Initializing");
@@ -506,6 +533,18 @@ void ClusterDiscovery::initialUpdate()
506533
is_initialized = true;
507534
}
508535

536+
void ClusterDiscovery::registerAll()
537+
{
538+
register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
539+
clusters_to_update->wakeup();
540+
}
541+
542+
void ClusterDiscovery::unregisterAll()
543+
{
544+
register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
545+
clusters_to_update->wakeup();
546+
}
547+
509548
void ClusterDiscovery::findDynamicClusters(
510549
std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
511550
std::unordered_set<size_t> * unchanged_roots)
@@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
729768
{
730769
up_to_date_callback();
731770
}
771+
772+
RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
773+
774+
if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
775+
{
776+
LOG_DEBUG(log, "Register in all dynamic clusters");
777+
for (auto & [_, info] : clusters_info)
778+
{
779+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
780+
registerInZk(zk, info);
781+
}
782+
}
783+
else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
784+
{
785+
LOG_DEBUG(log, "Unregister in all dynamic clusters");
786+
for (auto & [_, info] : clusters_info)
787+
{
788+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
789+
unregisterFromZk(zk, info);
790+
}
791+
}
732792
}
733793
LOG_DEBUG(log, "Worker thread stopped");
734794
return finished;

src/Interpreters/ClusterDiscovery.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ class ClusterDiscovery
3838

3939
~ClusterDiscovery();
4040

41+
void registerAll();
42+
void unregisterAll();
43+
4144
private:
4245
struct NodeInfo
4346
{
@@ -125,6 +128,7 @@ class ClusterDiscovery
125128
void initialUpdate();
126129

127130
void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
131+
void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
128132

129133
Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
130134
const String & zk_root,
@@ -207,6 +211,15 @@ class ClusterDiscovery
207211
std::shared_ptr<std::vector<std::shared_ptr<MulticlusterDiscovery>>> multicluster_discovery_paths;
208212

209213
MultiVersion<Macros>::Version macros;
214+
215+
enum RegisterChangeFlag
216+
{
217+
RCF_NONE,
218+
RCF_REGISTER_ALL,
219+
RCF_UNREGISTER_ALL,
220+
};
221+
222+
std::atomic<RegisterChangeFlag> register_change_flag = RegisterChangeFlag::RCF_NONE;
210223
};
211224

212225
}

src/Interpreters/Context.cpp

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ namespace CurrentMetrics
188188
extern const Metric IcebergCatalogThreads;
189189
extern const Metric IcebergCatalogThreadsActive;
190190
extern const Metric IcebergCatalogThreadsScheduled;
191+
extern const Metric IsSwarmModeEnabled;
191192
}
192193

193194

@@ -579,6 +580,7 @@ struct ContextSharedPart : boost::noncopyable
579580
std::map<String, UInt16> server_ports;
580581

581582
std::atomic<bool> shutdown_called = false;
583+
std::atomic<bool> swarm_mode_enabled = true;
582584

583585
Stopwatch uptime_watch TSA_GUARDED_BY(mutex);
584586

@@ -747,6 +749,8 @@ struct ContextSharedPart : boost::noncopyable
747749
*/
748750
void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
749751
{
752+
swarm_mode_enabled = false;
753+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
750754
bool is_shutdown_called = shutdown_called.exchange(true);
751755
if (is_shutdown_called)
752756
return;
@@ -4644,7 +4648,6 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
46444648
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
46454649
}
46464650

4647-
46484651
std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
46494652
{
46504653
std::shared_ptr<Cluster> res = nullptr;
@@ -4663,6 +4666,21 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
46634666
return res;
46644667
}
46654668

4669+
void Context::unregisterInAutodiscoveryClusters()
4670+
{
4671+
std::lock_guard lock(shared->clusters_mutex);
4672+
if (!shared->cluster_discovery)
4673+
return;
4674+
shared->cluster_discovery->unregisterAll();
4675+
}
4676+
4677+
void Context::registerInAutodiscoveryClusters()
4678+
{
4679+
std::lock_guard lock(shared->clusters_mutex);
4680+
if (!shared->cluster_discovery)
4681+
return;
4682+
shared->cluster_discovery->registerAll();
4683+
}
46664684

46674685
void Context::reloadClusterConfig() const
46684686
{
@@ -5540,12 +5558,35 @@ void Context::stopServers(const ServerType & server_type) const
55405558
shared->stop_servers_callback(server_type);
55415559
}
55425560

5543-
55445561
void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
55455562
{
55465563
shared->shutdown();
55475564
}
55485565

5566+
bool Context::stopSwarmMode()
5567+
{
5568+
bool expected_is_enabled = true;
5569+
bool is_stopped_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, false);
5570+
if (is_stopped_now)
5571+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
5572+
// return true if stop successful
5573+
return is_stopped_now;
5574+
}
5575+
5576+
bool Context::startSwarmMode()
5577+
{
5578+
bool expected_is_enabled = false;
5579+
bool is_started_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, true);
5580+
if (is_started_now)
5581+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1);
5582+
// return true if start successful
5583+
return is_started_now;
5584+
}
5585+
5586+
bool Context::isSwarmModeEnabled() const
5587+
{
5588+
return shared->swarm_mode_enabled;
5589+
}
55495590

55505591
Context::ApplicationType Context::getApplicationType() const
55515592
{

src/Interpreters/Context.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,8 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
12961296
size_t getClustersVersion() const;
12971297

12981298
void startClusterDiscovery();
1299+
void registerInAutodiscoveryClusters();
1300+
void unregisterInAutodiscoveryClusters();
12991301

13001302
/// Sets custom cluster, but doesn't update configuration
13011303
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
@@ -1408,6 +1410,15 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
14081410

14091411
void shutdown();
14101412

1413+
/// Stop some activities to allow a graceful shutdown later.
1414+
/// Returns true if swarm mode was enabled and has now been stopped.
1415+
bool stopSwarmMode();
1416+
/// Resume the stopped activities if we change our mind.
1417+
/// Returns true if swarm mode was disabled and has now been started.
1418+
bool startSwarmMode();
1419+
/// Return current swarm mode state.
1420+
bool isSwarmModeEnabled() const;
1421+
14111422
bool isInternalQuery() const { return is_internal_query; }
14121423
void setInternalQuery(bool internal) { is_internal_query = internal; }
14131424

src/Interpreters/InterpreterSystemQuery.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,20 @@ BlockIO InterpreterSystemQuery::execute()
715715
case Type::START_MOVES:
716716
startStopAction(ActionLocks::PartsMove, true);
717717
break;
718+
case Type::STOP_SWARM_MODE:
719+
{
720+
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
721+
if (getContext()->stopSwarmMode())
722+
getContext()->unregisterInAutodiscoveryClusters();
723+
break;
724+
}
725+
case Type::START_SWARM_MODE:
726+
{
727+
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
728+
if (getContext()->startSwarmMode())
729+
getContext()->registerInAutodiscoveryClusters();
730+
break;
731+
}
718732
case Type::STOP_FETCHES:
719733
startStopAction(ActionLocks::PartsFetch, false);
720734
break;
@@ -1623,6 +1637,12 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
16231637
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
16241638
break;
16251639
}
1640+
case Type::STOP_SWARM_MODE:
1641+
case Type::START_SWARM_MODE:
1642+
{
1643+
required_access.emplace_back(AccessType::SYSTEM_SWARM);
1644+
break;
1645+
}
16261646
case Type::STOP_PULLING_REPLICATION_LOG:
16271647
case Type::START_PULLING_REPLICATION_LOG:
16281648
{

src/Parsers/ASTSystemQuery.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
495495
case Type::DROP_PAGE_CACHE:
496496
case Type::STOP_REPLICATED_DDL_QUERIES:
497497
case Type::START_REPLICATED_DDL_QUERIES:
498+
case Type::STOP_SWARM_MODE:
499+
case Type::START_SWARM_MODE:
498500
break;
499501
case Type::UNKNOWN:
500502
case Type::END:

0 commit comments

Comments
 (0)