-
Notifications
You must be signed in to change notification settings - Fork 8
s3cluster joins, part 1 #972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a8041af
dc71790
628b035
77b4f1f
70f47ed
6cc62ff
c0468e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -1713,6 +1713,22 @@ Possible values: | |||||
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` | ||||||
- `allow` — Allows the use of these types of subqueries. | ||||||
)", IMPORTANT) \ | ||||||
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( | ||||||
Changes the behaviour of object storage cluster function or table. | ||||||
|
||||||
ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table. | ||||||
|
||||||
Restrictions: | ||||||
|
||||||
- Only applied for JOIN subqueries. | ||||||
- Only if the FROM section uses a object storage cluster function ot table. | ||||||
|
||||||
Possible values: | ||||||
|
||||||
- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` | ||||||
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` | ||||||
- `allow` — Default value. Allows the use of these types of subqueries. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
should we also support 'forbid' ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW - what will happen when joining xxxCluster with non-distributed table? Maybe a test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it's a default old behavour - query executed on remote nodes "as is" and join executed there. |
||||||
)", 0) \ | ||||||
\ | ||||||
DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"( | ||||||
Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,11 @@ | |
#include <Storages/IStorage.h> | ||
#include <Storages/SelectQueryInfo.h> | ||
#include <Storages/StorageDictionary.h> | ||
#include <Planner/Utils.h> | ||
#include <Analyzer/QueryTreeBuilder.h> | ||
#include <Analyzer/QueryNode.h> | ||
#include <Analyzer/ColumnNode.h> | ||
#include <Analyzer/InDepthQueryTreeVisitor.h> | ||
#include <Storages/StorageDistributed.h> | ||
#include <TableFunctions/TableFunctionFactory.h> | ||
#include <Storages/extractTableFunctionFromSelectQuery.h> | ||
|
@@ -47,12 +52,14 @@ namespace Setting | |
extern const SettingsString cluster_for_parallel_replicas; | ||
extern const SettingsNonZeroUInt64 max_parallel_replicas; | ||
extern const SettingsUInt64 object_storage_max_nodes; | ||
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; | ||
extern const SettingsBool object_storage_remote_initiator; | ||
} | ||
|
||
namespace ErrorCodes | ||
{ | ||
extern const int NOT_IMPLEMENTED; | ||
extern const int LOGICAL_ERROR; | ||
} | ||
|
||
namespace ErrorCodes | ||
|
@@ -89,6 +96,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) | |
extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster); | ||
} | ||
|
||
namespace | ||
{ | ||
|
||
/* | ||
Helping class to find in query tree first node of required type | ||
*/ | ||
class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisitor> | ||
{ | ||
public: | ||
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>; | ||
using Base::Base; | ||
|
||
explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it looks like the code is only for analyzer (i.e. build on the top of QueryTree, not on the top of AST). It's ok for new features, but maybe it's better to document it clearly. I was also thinking about adding throw when analizer is disabled. WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Full support for non-anylyzer code can be more challenging (see for example JoinToSubqueryTransformVisitor) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added exception |
||
|
||
bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) | ||
{ | ||
return !passed_node; | ||
} | ||
|
||
void enterImpl(QueryTreeNodePtr & node) | ||
{ | ||
if (passed_node) | ||
return; | ||
|
||
auto node_type = node->getNodeType(); | ||
|
||
if (node_type == type) | ||
passed_node = node; | ||
} | ||
|
||
QueryTreeNodePtr getNode() const { return passed_node; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually i never looked close on the anazyzer code, thought it should be simpler (i.e. avoiding tree traversal, but i can be wrong). |
||
|
||
private: | ||
QueryTreeNodeType type; | ||
QueryTreeNodePtr passed_node; | ||
}; | ||
|
||
/* | ||
Helping class to find all used columns with specific source | ||
*/ | ||
class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor> | ||
{ | ||
public: | ||
using Base = InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>; | ||
using Base::Base; | ||
|
||
explicit CollectUsedColumnsForSourceVisitor( | ||
QueryTreeNodePtr source_, | ||
ContextPtr context, | ||
bool collect_columns_from_other_sources_ = false) | ||
: Base(context) | ||
, source(source_) | ||
, collect_columns_from_other_sources(collect_columns_from_other_sources_) | ||
{} | ||
|
||
void enterImpl(QueryTreeNodePtr & node) | ||
{ | ||
auto node_type = node->getNodeType(); | ||
|
||
if (node_type != QueryTreeNodeType::COLUMN) | ||
return; | ||
|
||
auto & column_node = node->as<ColumnNode &>(); | ||
auto column_source = column_node.getColumnSourceOrNull(); | ||
if (!column_source) | ||
return; | ||
|
||
if ((column_source == source) != collect_columns_from_other_sources) | ||
{ | ||
const auto & name = column_node.getColumnName(); | ||
if (!names.count(name)) | ||
{ | ||
columns.emplace_back(column_node.getColumn()); | ||
names.insert(name); | ||
} | ||
} | ||
} | ||
|
||
const NamesAndTypes & getColumns() const { return columns; } | ||
|
||
private: | ||
std::unordered_set<std::string> names; | ||
QueryTreeNodePtr source; | ||
NamesAndTypes columns; | ||
bool collect_columns_from_other_sources; | ||
}; | ||
|
||
}; | ||
|
||
/* | ||
Try to make subquery to send on nodes | ||
Converts | ||
|
||
SELECT s3.c1, s3.c2, t.c3 | ||
FROM | ||
s3Cluster(...) AS s3 | ||
JOIN | ||
localtable as t | ||
ON s3.key == t.key | ||
|
||
to | ||
|
||
SELECT s3.c1, s3.c2, s3.key | ||
FROM | ||
s3Cluster(...) AS s3 | ||
*/ | ||
void IStorageCluster::updateQueryWithJoinToSendIfNeeded( | ||
ASTPtr & query_to_send, | ||
QueryTreeNodePtr query_tree, | ||
const ContextPtr & context) | ||
{ | ||
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; | ||
switch (object_storage_cluster_join_mode) | ||
{ | ||
case ObjectStorageClusterJoinMode::LOCAL: | ||
{ | ||
auto modified_query_tree = query_tree->clone(); | ||
bool need_modify = false; | ||
|
||
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); | ||
table_function_searcher.visit(query_tree); | ||
auto table_function_node = table_function_searcher.getNode(); | ||
if (!table_function_node) | ||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); | ||
|
||
if (has_join) | ||
{ | ||
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); | ||
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); | ||
auto & table_function_ast = table_function->as<ASTFunction &>(); | ||
query_tree_distributed->setAlias(table_function_ast.alias); | ||
|
||
// Find add used columns from table function to make proper projection list | ||
CollectUsedColumnsForSourceVisitor collector(table_function_node, context); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe things like tryResolveIdentifierFromJoinTreeNode / tryResolveIdentifierFromJoinTree / tryResolveIdentifierFromTableExpression can be reused. |
||
collector.visit(query_tree); | ||
const auto & columns = collector.getColumns(); | ||
|
||
auto & query_node = modified_query_tree->as<QueryNode &>(); | ||
query_node.resolveProjectionColumns(columns); | ||
auto column_nodes_to_select = std::make_shared<ListNode>(); | ||
column_nodes_to_select->getNodes().reserve(columns.size()); | ||
for (auto & column : columns) | ||
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node)); | ||
query_node.getProjectionNode() = column_nodes_to_select; | ||
|
||
// Left only table function to send on cluster nodes | ||
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); | ||
|
||
need_modify = true; | ||
} | ||
|
||
if (has_local_columns_in_where) | ||
{ | ||
auto & query_node = modified_query_tree->as<QueryNode &>(); | ||
query_node.getWhere() = {}; | ||
} | ||
|
||
if (need_modify) | ||
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); | ||
return; | ||
} | ||
case ObjectStorageClusterJoinMode::GLOBAL: | ||
// TODO | ||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); | ||
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special | ||
return; | ||
} | ||
} | ||
|
||
/// The code executes on initiator | ||
void IStorageCluster::read( | ||
QueryPlan & query_plan, | ||
|
@@ -119,13 +295,15 @@ void IStorageCluster::read( | |
Block sample_block; | ||
ASTPtr query_to_send = query_info.query; | ||
|
||
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); | ||
|
||
if (settings[Setting::allow_experimental_analyzer]) | ||
{ | ||
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); | ||
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage)); | ||
} | ||
else | ||
{ | ||
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); | ||
auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze()); | ||
sample_block = interpreter.getSampleBlock(); | ||
query_to_send = interpreter.getQueryInfo().query->clone(); | ||
} | ||
|
@@ -144,7 +322,7 @@ void IStorageCluster::read( | |
} | ||
|
||
RestoreQualifiedNamesVisitor::Data data; | ||
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0)); | ||
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0)); | ||
data.remote_table.database = context->getCurrentDatabase(); | ||
data.remote_table.table = getName(); | ||
RestoreQualifiedNamesVisitor(data).visit(query_to_send); | ||
|
@@ -309,8 +487,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const | |
} | ||
|
||
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( | ||
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const | ||
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const | ||
{ | ||
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; | ||
|
||
if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) | ||
{ | ||
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) | ||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, | ||
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); | ||
|
||
SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); | ||
join_searcher.visit(query_info.query_tree); | ||
if (join_searcher.getNode()) | ||
has_join = true; | ||
|
||
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); | ||
table_function_searcher.visit(query_info.query_tree); | ||
auto table_function_node = table_function_searcher.getNode(); | ||
if (!table_function_node) | ||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); | ||
|
||
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); | ||
auto & query_node = query_info.query_tree->as<QueryNode &>(); | ||
if (query_node.hasWhere()) | ||
collector_where.visit(query_node.getWhere()); | ||
|
||
// Can't use 'WHERE' on remote node if it contains columns from other sources | ||
if (!collector_where.getColumns().empty()) | ||
has_local_columns_in_where = true; | ||
|
||
if (has_join || has_local_columns_in_where) | ||
return QueryProcessingStage::Enum::FetchColumns; | ||
} | ||
|
||
/// Initiator executes query on remote node. | ||
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) | ||
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it is unsupported, then why was it added? I suggest ditching that value from enum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's for part 2, when right table is calculated on initiator and is sent as part of the query to the swarm nodes.