diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 76fdaa95c68d..efebc5e7586e 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1713,6 +1713,22 @@ Possible values: - `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` - `allow` — Allows the use of these types of subqueries. )", IMPORTANT) \ + DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( +Changes the behaviour of object storage cluster function or table. + +ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table. + +Restrictions: + +- Only applied for JOIN subqueries. +- Only if the FROM section uses a object storage cluster function ot table. + +Possible values: + +- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` +- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `allow` — Default value. Allows the use of these types of subqueries. +)", 0) \ \ DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"( Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c93a1589c44d..1ade278234fb 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -58,6 +58,7 @@ class WriteBuffer; M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \ M(CLASS_NAME, DistributedDDLOutputMode) \ M(CLASS_NAME, DistributedProductMode) \ + M(CLASS_NAME, ObjectStorageClusterJoinMode) \ M(CLASS_NAME, Double) \ M(CLASS_NAME, EscapingRule) \ M(CLASS_NAME, Float) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 85a3965af357..c755970d1a51 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -76,6 +76,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, + {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index bf6b3e52df62..71e1e20657be 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -90,6 +90,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P {"global", DistributedProductMode::GLOBAL}, {"allow", DistributedProductMode::ALLOW}}) +IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS, + {{"local", ObjectStorageClusterJoinMode::LOCAL}, + {"global", ObjectStorageClusterJoinMode::GLOBAL}, + {"allow", ObjectStorageClusterJoinMode::ALLOW}}) + IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS, {{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index a030c7fb741e..17ca34695b42 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -163,6 +163,16 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) +/// The setting for executing object storage cluster function ot table JOIN sections. +enum class ObjectStorageClusterJoinMode : uint8_t +{ + LOCAL, /// Convert to local query + GLOBAL, /// Convert to global query + ALLOW /// Enable +}; + +DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode) + /// How the query result cache handles queries with non-deterministic functions, e.g. now() enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t { diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 70fb5a73a805..852d24e69cf9 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1311,6 +1311,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Hopefully there is no other case when we read from Distributed up to FetchColumns. if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) updated_actions_dag_outputs.push_back(output_node); + else if (table_function_node && table_function_node->getStorage()->isRemote()) + updated_actions_dag_outputs.push_back(output_node); } else updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier)); diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 6826eb12627e..5fbce20a7472 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -26,6 +26,11 @@ #include #include #include +#include +#include +#include +#include +#include #include #include #include @@ -47,12 +52,14 @@ namespace Setting extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; extern const SettingsBool object_storage_remote_initiator; } namespace ErrorCodes { extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; } namespace ErrorCodes @@ -89,6 +96,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster); } +namespace +{ + +/* +Helping class to find in query tree first node of required type +*/ +class SearcherVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + + bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) + { + return !passed_node; + } + + void enterImpl(QueryTreeNodePtr & node) + { + if (passed_node) + return; + + auto node_type = node->getNodeType(); + + if (node_type == type) + passed_node = node; + } + + QueryTreeNodePtr getNode() const { return passed_node; } + +private: + QueryTreeNodeType type; + QueryTreeNodePtr passed_node; +}; + +/* +Helping class to find all used columns with specific source +*/ +class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit CollectUsedColumnsForSourceVisitor( + QueryTreeNodePtr source_, + ContextPtr context, + bool collect_columns_from_other_sources_ = false) + : Base(context) + , source(source_) + , collect_columns_from_other_sources(collect_columns_from_other_sources_) + {} + + void enterImpl(QueryTreeNodePtr & node) + { + auto node_type = node->getNodeType(); + + if (node_type != QueryTreeNodeType::COLUMN) + return; + + auto & column_node = node->as(); + auto column_source = column_node.getColumnSourceOrNull(); + if (!column_source) + return; + + if ((column_source == source) != collect_columns_from_other_sources) + { + const auto & name = column_node.getColumnName(); + if (!names.count(name)) + { + columns.emplace_back(column_node.getColumn()); + names.insert(name); + } + } + } + + const NamesAndTypes & getColumns() const { return columns; } + +private: + std::unordered_set names; + QueryTreeNodePtr source; + NamesAndTypes columns; + bool collect_columns_from_other_sources; +}; + +}; + +/* +Try to make subquery to send on nodes +Converts + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) AS s3 + JOIN + localtable as t + ON s3.key == t.key + +to + + SELECT s3.c1, s3.c2, s3.key + FROM + s3Cluster(...) AS s3 +*/ +void IStorageCluster::updateQueryWithJoinToSendIfNeeded( + ASTPtr & query_to_send, + QueryTreeNodePtr query_tree, + const ContextPtr & context) +{ + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + switch (object_storage_cluster_join_mode) + { + case ObjectStorageClusterJoinMode::LOCAL: + { + auto modified_query_tree = query_tree->clone(); + bool need_modify = false; + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + if (has_join) + { + auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); + auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + + // Find add used columns from table function to make proper projection list + CollectUsedColumnsForSourceVisitor collector(table_function_node, context); + collector.visit(query_tree); + const auto & columns = collector.getColumns(); + + auto & query_node = modified_query_tree->as(); + query_node.resolveProjectionColumns(columns); + auto column_nodes_to_select = std::make_shared(); + column_nodes_to_select->getNodes().reserve(columns.size()); + for (auto & column : columns) + column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); + query_node.getProjectionNode() = column_nodes_to_select; + + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + + need_modify = true; + } + + if (has_local_columns_in_where) + { + auto & query_node = modified_query_tree->as(); + query_node.getWhere() = {}; + } + + if (need_modify) + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + return; + } + case ObjectStorageClusterJoinMode::GLOBAL: + // TODO + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special + return; + } +} + /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, @@ -119,13 +295,15 @@ void IStorageCluster::read( Block sample_block; ASTPtr query_to_send = query_info.query; + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + if (settings[Setting::allow_experimental_analyzer]) { - sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); + sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage)); } else { - auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); + auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze()); sample_block = interpreter.getSampleBlock(); query_to_send = interpreter.getQueryInfo().query->clone(); } @@ -144,7 +322,7 @@ void IStorageCluster::read( } RestoreQualifiedNamesVisitor::Data data; - data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); + data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); data.remote_table.table = getName(); RestoreQualifiedNamesVisitor(data).visit(query_to_send); @@ -309,8 +487,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const } QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + + if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) + { + if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + + SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + join_searcher.visit(query_info.query_tree); + if (join_searcher.getNode()) + has_join = true; + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_info.query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); + auto & query_node = query_info.query_tree->as(); + if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); + + // Can't use 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + has_local_columns_in_where = true; + + if (has_join || has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } + /// Initiator executes query on remote node. if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index b7c4b940d394..95a6a68b16ec 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -61,6 +61,7 @@ class IStorageCluster : public IStorage protected: virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); struct RemoteCallVariables { @@ -101,6 +102,9 @@ class IStorageCluster : public IStorage LoggerPtr log; String cluster_name; + + mutable bool has_join = false; + mutable bool has_local_columns_in_where = false; }; diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 691aafdd34ee..490e8c5a1f79 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1100,3 +1100,122 @@ def query_cycle(): node_to_shutdown.start_clickhouse() assert errors == 0 + + +def test_joins(started_cluster): + node = started_cluster.instances["s0_0_0"] + + # Table join_table only exists on the node 's0_0_0'. + node.query( + """ + CREATE TABLE IF NOT EXISTS join_table ( + id UInt32, + name String + ) ENGINE=MergeTree() + ORDER BY id; + """ + ) + + node.query( + f""" + INSERT INTO join_table + SELECT value, concat(name, '_jt') FROM s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'); + """ + ) + + result1 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result1.splitlines())) + assert len(res) == 25 + + for line in res: + if len(line) == 2: + assert line[1] == f"{line[0]}_jt" + else: + assert line == ["_jt"] # for empty name + + result2 = node.query( + f""" + SELECT t1.name, t2.name FROM + join_table AS t2 + JOIN + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result1 == result2 + + # With WHERE clause with remote column only + result3 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result3.splitlines())) + assert len(res) == 8 + + # With WHERE clause with local column only + result4 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t2.id % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result3 == result4 + + # With WHERE clause with local and remote columns + result5 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) AND ((t2.id % 3) == 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result5.splitlines())) + assert len(res) == 6