Skip to content

Commit 53d1f5e

Browse files
authored
Merge pull request #972 from Altinity/feature/antalya-25.6.5/s3cluster_global_join
2 parents 1633469 + c0468e7 commit 53d1f5e

File tree

9 files changed

+372
-4
lines changed

9 files changed

+372
-4
lines changed

src/Core/Settings.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1713,6 +1713,22 @@ Possible values:
17131713
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
17141714
- `allow` — Allows the use of these types of subqueries.
17151715
)", IMPORTANT) \
1716+
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
1717+
Changes the behaviour of object storage cluster function or table.
1718+
1719+
ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table.
1720+
1721+
Restrictions:
1722+
1723+
- Only applied for JOIN subqueries.
1724+
- Only if the FROM section uses a object storage cluster function ot table.
1725+
1726+
Possible values:
1727+
1728+
- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.`
1729+
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
1730+
- `allow` — Default value. Allows the use of these types of subqueries.
1731+
)", 0) \
17161732
\
17171733
DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"(
17181734
Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries.

src/Core/Settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class WriteBuffer;
5858
M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \
5959
M(CLASS_NAME, DistributedDDLOutputMode) \
6060
M(CLASS_NAME, DistributedProductMode) \
61+
M(CLASS_NAME, ObjectStorageClusterJoinMode) \
6162
M(CLASS_NAME, Double) \
6263
M(CLASS_NAME, EscapingRule) \
6364
M(CLASS_NAME, Float) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7676
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
7777
{"object_storage_cluster", "", "", "New setting"},
7878
{"object_storage_max_nodes", 0, 0, "New setting"},
79+
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
7980
{"object_storage_remote_initiator", false, false, "New setting."},
8081
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
8182
});

src/Core/SettingsEnums.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P
9090
{"global", DistributedProductMode::GLOBAL},
9191
{"allow", DistributedProductMode::ALLOW}})
9292

93+
IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS,
94+
{{"local", ObjectStorageClusterJoinMode::LOCAL},
95+
{"global", ObjectStorageClusterJoinMode::GLOBAL},
96+
{"allow", ObjectStorageClusterJoinMode::ALLOW}})
97+
9398

9499
IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS,
95100
{{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw},

src/Core/SettingsEnums.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ enum class DistributedProductMode : uint8_t
163163

164164
DECLARE_SETTING_ENUM(DistributedProductMode)
165165

166+
/// The setting for executing object storage cluster function ot table JOIN sections.
167+
enum class ObjectStorageClusterJoinMode : uint8_t
168+
{
169+
LOCAL, /// Convert to local query
170+
GLOBAL, /// Convert to global query
171+
ALLOW /// Enable
172+
};
173+
174+
DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode)
175+
166176
/// How the query result cache handles queries with non-deterministic functions, e.g. now()
167177
enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t
168178
{

src/Planner/PlannerJoinTree.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,6 +1311,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
13111311
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
13121312
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
13131313
updated_actions_dag_outputs.push_back(output_node);
1314+
else if (table_function_node && table_function_node->getStorage()->isRemote())
1315+
updated_actions_dag_outputs.push_back(output_node);
13141316
}
13151317
else
13161318
updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier));

src/Storages/IStorageCluster.cpp

Lines changed: 214 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
#include <Storages/IStorage.h>
2727
#include <Storages/SelectQueryInfo.h>
2828
#include <Storages/StorageDictionary.h>
29+
#include <Planner/Utils.h>
30+
#include <Analyzer/QueryTreeBuilder.h>
31+
#include <Analyzer/QueryNode.h>
32+
#include <Analyzer/ColumnNode.h>
33+
#include <Analyzer/InDepthQueryTreeVisitor.h>
2934
#include <Storages/StorageDistributed.h>
3035
#include <TableFunctions/TableFunctionFactory.h>
3136
#include <Storages/extractTableFunctionFromSelectQuery.h>
@@ -47,12 +52,14 @@ namespace Setting
4752
extern const SettingsString cluster_for_parallel_replicas;
4853
extern const SettingsNonZeroUInt64 max_parallel_replicas;
4954
extern const SettingsUInt64 object_storage_max_nodes;
55+
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
5056
extern const SettingsBool object_storage_remote_initiator;
5157
}
5258

5359
namespace ErrorCodes
5460
{
5561
extern const int NOT_IMPLEMENTED;
62+
extern const int LOGICAL_ERROR;
5663
}
5764

5865
namespace ErrorCodes
@@ -89,6 +96,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
8996
extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
9097
}
9198

99+
namespace
100+
{
101+
102+
/*
103+
Helping class to find in query tree first node of required type
104+
*/
105+
class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisitor>
106+
{
107+
public:
108+
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
109+
using Base::Base;
110+
111+
explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
112+
113+
bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
114+
{
115+
return !passed_node;
116+
}
117+
118+
void enterImpl(QueryTreeNodePtr & node)
119+
{
120+
if (passed_node)
121+
return;
122+
123+
auto node_type = node->getNodeType();
124+
125+
if (node_type == type)
126+
passed_node = node;
127+
}
128+
129+
QueryTreeNodePtr getNode() const { return passed_node; }
130+
131+
private:
132+
QueryTreeNodeType type;
133+
QueryTreeNodePtr passed_node;
134+
};
135+
136+
/*
137+
Helping class to find all used columns with specific source
138+
*/
139+
class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>
140+
{
141+
public:
142+
using Base = InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>;
143+
using Base::Base;
144+
145+
explicit CollectUsedColumnsForSourceVisitor(
146+
QueryTreeNodePtr source_,
147+
ContextPtr context,
148+
bool collect_columns_from_other_sources_ = false)
149+
: Base(context)
150+
, source(source_)
151+
, collect_columns_from_other_sources(collect_columns_from_other_sources_)
152+
{}
153+
154+
void enterImpl(QueryTreeNodePtr & node)
155+
{
156+
auto node_type = node->getNodeType();
157+
158+
if (node_type != QueryTreeNodeType::COLUMN)
159+
return;
160+
161+
auto & column_node = node->as<ColumnNode &>();
162+
auto column_source = column_node.getColumnSourceOrNull();
163+
if (!column_source)
164+
return;
165+
166+
if ((column_source == source) != collect_columns_from_other_sources)
167+
{
168+
const auto & name = column_node.getColumnName();
169+
if (!names.count(name))
170+
{
171+
columns.emplace_back(column_node.getColumn());
172+
names.insert(name);
173+
}
174+
}
175+
}
176+
177+
const NamesAndTypes & getColumns() const { return columns; }
178+
179+
private:
180+
std::unordered_set<std::string> names;
181+
QueryTreeNodePtr source;
182+
NamesAndTypes columns;
183+
bool collect_columns_from_other_sources;
184+
};
185+
186+
};
187+
188+
/*
189+
Try to make subquery to send on nodes
190+
Converts
191+
192+
SELECT s3.c1, s3.c2, t.c3
193+
FROM
194+
s3Cluster(...) AS s3
195+
JOIN
196+
localtable as t
197+
ON s3.key == t.key
198+
199+
to
200+
201+
SELECT s3.c1, s3.c2, s3.key
202+
FROM
203+
s3Cluster(...) AS s3
204+
*/
205+
void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
206+
ASTPtr & query_to_send,
207+
QueryTreeNodePtr query_tree,
208+
const ContextPtr & context)
209+
{
210+
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
211+
switch (object_storage_cluster_join_mode)
212+
{
213+
case ObjectStorageClusterJoinMode::LOCAL:
214+
{
215+
auto modified_query_tree = query_tree->clone();
216+
bool need_modify = false;
217+
218+
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
219+
table_function_searcher.visit(query_tree);
220+
auto table_function_node = table_function_searcher.getNode();
221+
if (!table_function_node)
222+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
223+
224+
if (has_join)
225+
{
226+
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
227+
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
228+
auto & table_function_ast = table_function->as<ASTFunction &>();
229+
query_tree_distributed->setAlias(table_function_ast.alias);
230+
231+
// Find add used columns from table function to make proper projection list
232+
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
233+
collector.visit(query_tree);
234+
const auto & columns = collector.getColumns();
235+
236+
auto & query_node = modified_query_tree->as<QueryNode &>();
237+
query_node.resolveProjectionColumns(columns);
238+
auto column_nodes_to_select = std::make_shared<ListNode>();
239+
column_nodes_to_select->getNodes().reserve(columns.size());
240+
for (auto & column : columns)
241+
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
242+
query_node.getProjectionNode() = column_nodes_to_select;
243+
244+
// Left only table function to send on cluster nodes
245+
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
246+
247+
need_modify = true;
248+
}
249+
250+
if (has_local_columns_in_where)
251+
{
252+
auto & query_node = modified_query_tree->as<QueryNode &>();
253+
query_node.getWhere() = {};
254+
}
255+
256+
if (need_modify)
257+
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
258+
return;
259+
}
260+
case ObjectStorageClusterJoinMode::GLOBAL:
261+
// TODO
262+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now");
263+
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
264+
return;
265+
}
266+
}
267+
92268
/// The code executes on initiator
93269
void IStorageCluster::read(
94270
QueryPlan & query_plan,
@@ -119,13 +295,15 @@ void IStorageCluster::read(
119295
Block sample_block;
120296
ASTPtr query_to_send = query_info.query;
121297

298+
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
299+
122300
if (settings[Setting::allow_experimental_analyzer])
123301
{
124-
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
302+
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage));
125303
}
126304
else
127305
{
128-
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
306+
auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze());
129307
sample_block = interpreter.getSampleBlock();
130308
query_to_send = interpreter.getQueryInfo().query->clone();
131309
}
@@ -144,7 +322,7 @@ void IStorageCluster::read(
144322
}
145323

146324
RestoreQualifiedNamesVisitor::Data data;
147-
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
325+
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0));
148326
data.remote_table.database = context->getCurrentDatabase();
149327
data.remote_table.table = getName();
150328
RestoreQualifiedNamesVisitor(data).visit(query_to_send);
@@ -309,8 +487,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
309487
}
310488

311489
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
312-
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
490+
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
313491
{
492+
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
493+
494+
if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
495+
{
496+
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
497+
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
498+
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
499+
500+
SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
501+
join_searcher.visit(query_info.query_tree);
502+
if (join_searcher.getNode())
503+
has_join = true;
504+
505+
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
506+
table_function_searcher.visit(query_info.query_tree);
507+
auto table_function_node = table_function_searcher.getNode();
508+
if (!table_function_node)
509+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
510+
511+
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
512+
auto & query_node = query_info.query_tree->as<QueryNode &>();
513+
if (query_node.hasWhere())
514+
collector_where.visit(query_node.getWhere());
515+
516+
// Can't use 'WHERE' on remote node if it contains columns from other sources
517+
if (!collector_where.getColumns().empty())
518+
has_local_columns_in_where = true;
519+
520+
if (has_join || has_local_columns_in_where)
521+
return QueryProcessingStage::Enum::FetchColumns;
522+
}
523+
314524
/// Initiator executes query on remote node.
315525
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
316526
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)

src/Storages/IStorageCluster.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class IStorageCluster : public IStorage
6161

6262
protected:
6363
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
64+
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
6465

6566
struct RemoteCallVariables
6667
{
@@ -101,6 +102,9 @@ class IStorageCluster : public IStorage
101102

102103
LoggerPtr log;
103104
String cluster_name;
105+
106+
mutable bool has_join = false;
107+
mutable bool has_local_columns_in_where = false;
104108
};
105109

106110

0 commit comments

Comments
 (0)