diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
index 2521e7e229ab..7297cb6f70c7 100644
--- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
+++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
@@ -67,7 +67,8 @@ ASTPtr rewriteSelectQuery(
     const ASTPtr & query,
     const std::string & remote_database,
     const std::string & remote_table,
-    ASTPtr table_function_ptr)
+    ASTPtr table_function_ptr,
+    ASTPtr additional_filter)
 {
     auto modified_query_ast = query->clone();
@@ -80,8 +81,33 @@ ASTPtr rewriteSelectQuery(
     if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
     {
+        // Apply the additional filter if provided
+        if (additional_filter)
+        {
+            if (select_query.where())
+            {
+                /// Combine the existing WHERE with the additional filter via AND
+                select_query.setExpression(
+                    ASTSelectQuery::Expression::WHERE,
+                    makeASTFunction("and", select_query.where(), additional_filter));
+            }
+            else
+            {
+                /// No WHERE clause yet - simply set it
+                select_query.setExpression(
+                    ASTSelectQuery::Expression::WHERE, additional_filter->clone());
+            }
+        }
+
         if (table_function_ptr)
+        {
             select_query.addTableFunction(table_function_ptr);
+
+            // Reset semantic table information for all column identifiers to prevent
+            // RestoreQualifiedNamesVisitor from adding wrong table names
+            ResetSemanticTableVisitor::Data data;
+            ResetSemanticTableVisitor(data).visit(modified_query_ast);
+        }
         else
             select_query.replaceDatabaseAndTable(remote_database, remote_table);
@@ -93,6 +119,7 @@ ASTPtr rewriteSelectQuery(
         data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as<ASTSelectQuery &>(), 0));
         data.remote_table.database = remote_database;
         data.remote_table.table = remote_table;
+
         RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
     }
 }
diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h
index 2d04816fba8a..64ae76c39e7d 100644
--- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h
+++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h
@@ -42,7 +42,8 @@ ASTPtr rewriteSelectQuery(
     const ASTPtr & query,
     const std::string & remote_database,
     const std::string & remote_table,
-    ASTPtr table_function_ptr = nullptr);
+    ASTPtr table_function_ptr = nullptr,
+    ASTPtr additional_filter = nullptr);

 using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
 using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
diff --git a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp
index c21c4d34fa84..5346bd93fd44 100644
--- a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp
+++ b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp
@@ -399,4 +399,15 @@ void RestoreQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &, Data &)
     }
 }

+void ResetSemanticTableMatcher::visit(ASTPtr & ast, Data & data)
+{
+    if (auto * t = ast->as<ASTIdentifier>())
+        visit(*t, ast, data);
+}
+
+void ResetSemanticTableMatcher::visit(ASTIdentifier & identifier, ASTPtr &, Data &)
+{
+    identifier.resetSemanticTable();
+}
+
 }
diff --git a/src/Interpreters/TranslateQualifiedNamesVisitor.h b/src/Interpreters/TranslateQualifiedNamesVisitor.h
index 00c85d08873f..474ee20b72fd 100644
--- a/src/Interpreters/TranslateQualifiedNamesVisitor.h
+++ b/src/Interpreters/TranslateQualifiedNamesVisitor.h
@@ -80,4 +80,33 @@ struct RestoreQualifiedNamesMatcher

 using RestoreQualifiedNamesVisitor = InDepthNodeVisitor<RestoreQualifiedNamesMatcher, true>;

+
+/// Reset semantic->table for all column identifiers in the AST.
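+///
+/// For illustration (hypothetical table names): a query over a TieredDistributedMerge table such as
+///     SELECT id FROM tiered_table WHERE id > 0
+/// is rewritten to read from remote(...); without this reset, the identifier `id` still carries
+/// semantic->table == "tiered_table", so RestoreQualifiedNamesVisitor would qualify it as
+/// `tiered_table.id`, a name the remote server cannot resolve.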
+///
+/// PROBLEM DESCRIPTION:
+/// When an AST is passed through multiple query rewrites (e.g., TieredDistributedMerge -> remote),
+/// the semantic->table information attached to ASTIdentifier nodes can become stale and
+/// cause incorrect column qualification. This happens because:
+///
+/// 1. During initial parsing, semantic->table is populated with the original table name.
+/// 2. When the query is rewritten (e.g., the FROM clause is changed from a table to the remote()
+///    function inside TieredDistributedMerge), the AST structure is modified but the
+///    semantic->table information is preserved.
+/// 3. Subsequent visitors like RestoreQualifiedNamesVisitor, which remote() later runs over the
+///    same AST, may use this stale semantic->table information to incorrectly qualify column
+///    names with the original table name.
+///
+/// SOLUTION:
+/// This visitor clears semantic->table for all column identifiers, ensuring that subsequent
+/// visitors work with clean semantic information and don't apply stale table qualifications.
+struct ResetSemanticTableMatcher
+{
+    // No data needed for this visitor
+    struct Data {};
+
+    static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
+    static void visit(ASTPtr & ast, Data & data);
+    static void visit(ASTIdentifier & identifier, ASTPtr &, Data & data);
+};
+
+using ResetSemanticTableVisitor = InDepthNodeVisitor<ResetSemanticTableMatcher, true>;
+
 }
diff --git a/src/Parsers/ASTIdentifier.cpp b/src/Parsers/ASTIdentifier.cpp
index cc7a940b85e1..8c9d51cd8d59 100644
--- a/src/Parsers/ASTIdentifier.cpp
+++ b/src/Parsers/ASTIdentifier.cpp
@@ -169,6 +169,17 @@ void ASTIdentifier::restoreTable()
     }
 }

+void ASTIdentifier::resetSemanticTable()
+{
+    // Only reset the semantic table for column identifiers (not table identifiers)
+    if (semantic && !semantic->special)
+    {
+        semantic->table.clear();
+        semantic->can_be_alias = true;
+        semantic->membership = std::nullopt;
+    }
+}
+
 std::shared_ptr<ASTTableIdentifier> ASTIdentifier::createTable() const
 {
     if (name_parts.size() == 1) return std::make_shared<ASTTableIdentifier>(name_parts[0]);
diff --git a/src/Parsers/ASTIdentifier.h b/src/Parsers/ASTIdentifier.h
index 72dde7f644fb..3ea66264ca24 100644
--- a/src/Parsers/ASTIdentifier.h
+++ b/src/Parsers/ASTIdentifier.h
@@ -52,6 +52,7 @@ class ASTIdentifier : public ASTWithAlias
     void updateTreeHashImpl(SipHash & hash_state, bool ignore_alias) const override;

     void restoreTable();  // TODO(ilezhankin): get rid of this
+    void resetSemanticTable();  // Clear stale semantic->table info (see ResetSemanticTableVisitor)
     std::shared_ptr<ASTTableIdentifier> createTable() const;  // returns |nullptr| if identifier is not table.
     String full_name;
diff --git a/src/Planner/PlannerActionsVisitor.cpp b/src/Planner/PlannerActionsVisitor.cpp
index dd4070125d5e..af0f41755f59 100644
--- a/src/Planner/PlannerActionsVisitor.cpp
+++ b/src/Planner/PlannerActionsVisitor.cpp
@@ -348,7 +348,7 @@ class ActionNodeNameHelper
         }
         default:
         {
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {}", node->formatASTForErrorMessage());
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {} (node_type: {})", node->formatASTForErrorMessage(), static_cast<size_t>(node_type));
         }
     }
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index c5a89813f79c..8a4d057932d9 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -5,6 +5,13 @@
 #include
 #include

+#include
+#include
+#include
+#include
+#include
+#include
+#include

 #include
 #include
@@ -85,6 +92,7 @@
 #include
 #include

+#include
 #include
 #include
@@ -94,6 +102,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -500,6 +509,10 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
     if (to_stage == QueryProcessingStage::WithMergeableState)
         return QueryProcessingStage::WithMergeableState;

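+    // Each additional table function is one more data source to union with, so the
+    // single-node shortcut below must not be taken when any are configured.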
+    // TODO: check logic
+    if (!additional_table_functions.empty())
+        nodes += additional_table_functions.size();
+
     /// If there is only one node, the query can be fully processed by the
     /// shard, initiator will work as a proxy only.
     if (nodes == 1)
@@ -542,6 +555,9 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
 bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression(
     const QueryTreeNodePtr & expr, const SelectQueryInfo & query_info) const
 {
+    if (!additional_table_functions.empty())
+        return false;
+
     ColumnsWithTypeAndName empty_input_columns;
     ColumnNodePtrWithHashSet empty_correlated_columns_set;
     // When comparing sharding key expressions, we need to ignore table qualifiers in column names
@@ -582,6 +598,7 @@ bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression(
     return allOutputsDependsOnlyOnAllowedNodes(sharding_key_dag, irreducibe_nodes, matches);
 }

+// TODO: support additional table functions
 std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryProcessingStageAnalyzer(const SelectQueryInfo & query_info, const Settings & settings) const
 {
     bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key]
@@ -640,6 +657,7 @@ std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryProcessingStageAnalyzer(
     return QueryProcessingStage::Complete;
 }

+// TODO: support additional table functions
 std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const
 {
     bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key]
@@ -749,9 +767,11 @@ static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr

 StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const
 {
+    /// TODO: support additional table functions
     return getStorageSnapshotForQuery(metadata_snapshot, nullptr, query_context);
 }

+/// TODO: support additional table functions
 StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery(
     const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr /*query_context*/) const
 {
@@ -887,7 +907,8 @@ bool rewriteJoinToGlobalJoinIfNeeded(QueryTreeNodePtr join_tree)
 QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
     const StorageSnapshotPtr & distributed_storage_snapshot,
     const StorageID & remote_storage_id,
-    const ASTPtr & remote_table_function)
+    const ASTPtr & remote_table_function,
+    const ASTPtr & additional_filter = nullptr)
 {
     auto & planner_context = query_info.planner_context;
     const auto & query_context = planner_context->getQueryContext();
@@ -954,7 +975,29 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,

     replacement_table_expression->setAlias(query_info.table_expression->getAlias());

+    QueryTreeNodePtr filter;
+
+    if (additional_filter)
+    {
+        const auto & context = query_info.planner_context->getQueryContext();
+
+        filter = buildQueryTree(additional_filter->clone(), query_context);
+
+        QueryAnalysisPass(replacement_table_expression).run(filter, context);
+    }
+
     auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression));
+
+    // Apply the additional filter if provided
+    if (filter)
+    {
+        auto & query = query_tree_to_modify->as<QueryNode &>();
+        query.getWhere() = query.hasWhere()
+            ? mergeConditionNodes({query.getWhere(), filter}, query_context)
+            : std::move(filter);
+    }
+
     ReplaseAliasColumnsVisitor replase_alias_columns_visitor;
     replase_alias_columns_visitor.visit(query_tree_to_modify);
@@ -973,6 +1016,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
     }

     return buildQueryTreeForShard(query_info.planner_context, query_tree_to_modify, /*allow_global_join_for_right_table*/ false);
+
 }

 }
@@ -991,28 +1035,66 @@ void StorageDistributed::read(

     SelectQueryInfo modified_query_info = query_info;

+    std::vector<Block> all_headers;
+    std::vector<SelectQueryInfo> all_query_infos;
+
     const auto & settings = local_context->getSettingsRef();

     if (settings[Setting::allow_experimental_analyzer])
     {
-        StorageID remote_storage_id = StorageID{remote_database, remote_table};
+        StorageID remote_storage_id = StorageID::createEmpty();
+        if (!remote_table_function_ptr)
+            remote_storage_id = StorageID{remote_database, remote_table};

         auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info,
             query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot,
             remote_storage_id,
-            remote_table_function_ptr);
-        header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze());
+            remote_table_function_ptr,
+            additional_filter);
+
+        SelectQueryOptions options = SelectQueryOptions(processed_stage).analyze();
+        header = InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, options);
+
         /** For distributed tables we do not need constants in header, since we don't send them to remote servers.
           * Moreover, constants can break some functions like `hostName` that are constants only for local queries.
           */
         for (auto & column : header)
             column.column = column.column->convertToFullColumnIfConst();
+
         modified_query_info.query = queryNodeToDistributedSelectQuery(query_tree_distributed);

         modified_query_info.query_tree = std::move(query_tree_distributed);
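+        /// Illustration (hypothetical engine arguments): for
+        ///     ENGINE = TieredDistributedMerge(remote(...), p1, s3Cluster(...), p2)
+        /// the loop below builds one additional query tree per (table function, predicate)
+        /// pair, with the pair's predicate ANDed into the WHERE clause; the resulting
+        /// per-tier plans are united with the main plan further down in this function.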
-        /// Return directly (with correct header) if no shard to query.
-        if (modified_query_info.getCluster()->getShardsInfo().empty())
+        if (!additional_table_functions.empty())
+        {
+            for (const auto & table_function_entry : additional_table_functions)
+            {
+                // Create a modified query info with the additional predicate
+                SelectQueryInfo additional_query_info = query_info;
+
+                auto additional_query_tree = buildQueryTreeDistributed(additional_query_info,
+                    query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot,
+                    StorageID::createEmpty(),
+                    table_function_entry.table_function_ast,
+                    table_function_entry.predicate_ast);
+
+                // TODO: somewhere here the DESCRIBE TABLE is triggered, try to avoid it.
+                auto additional_header = InterpreterSelectQueryAnalyzer::getSampleBlock(additional_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
+
+                for (auto & column : additional_header)
+                    column.column = column.column->convertToFullColumnIfConst();
+
+                additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree);
+                additional_query_info.query_tree = std::move(additional_query_tree);
+
+                all_headers.push_back(additional_header);
+                all_query_infos.push_back(additional_query_info);
+            }
+        }
+
+        // For empty shards - avoid the early return if we have additional table functions
+        if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_table_functions.empty())
             return;
     }
     else
@@ -1021,9 +1103,29 @@ void StorageDistributed::read(
         modified_query_info.query = ClusterProxy::rewriteSelectQuery(
             local_context, modified_query_info.query,
-            remote_database, remote_table, remote_table_function_ptr);
+            remote_database, remote_table, remote_table_function_ptr,
+            additional_filter);

-        if (modified_query_info.getCluster()->getShardsInfo().empty())
+        if (!additional_table_functions.empty())
+        {
+            for (const auto & table_function_entry : additional_table_functions)
+            {
+                SelectQueryInfo additional_query_info = query_info;
+
+                auto additional_header = InterpreterSelectQuery(additional_query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
+
+                additional_query_info.query = ClusterProxy::rewriteSelectQuery(
+                    local_context, additional_query_info.query,
+                    "", "", table_function_entry.table_function_ast,
+                    table_function_entry.predicate_ast);
+
+                all_headers.push_back(additional_header);
+                all_query_infos.push_back(additional_query_info);
+            }
+        }
+
+        // For empty shards - avoid the early return if we have additional table functions
+        if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_table_functions.empty())
         {
             Pipe pipe(std::make_shared<NullSource>(header));
             auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
@@ -1035,35 +1137,98 @@ void StorageDistributed::read(
     }

     const auto & snapshot_data = assert_cast<const SnapshotData &>(*storage_snapshot->data);
-    ClusterProxy::SelectStreamFactory select_stream_factory =
-        ClusterProxy::SelectStreamFactory(
+
+    if (!modified_query_info.getCluster()->getShardsInfo().empty())
+    {
+        ClusterProxy::SelectStreamFactory select_stream_factory =
+            ClusterProxy::SelectStreamFactory(
+                header,
+                snapshot_data.objects_by_shard,
+                storage_snapshot,
+                processed_stage);
+
+        auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey(
+            *modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr()->columns);
+
+        ClusterProxy::executeQuery(
+            query_plan,
             header,
-            snapshot_data.objects_by_shard,
-            storage_snapshot,
-            processed_stage);
-
-    auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey(
-        *modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr()->columns);
-
-    ClusterProxy::executeQuery(
-        query_plan,
-        header,
-        processed_stage,
-        remote_storage,
-        remote_table_function_ptr,
-        select_stream_factory,
-        log,
-        local_context,
-        modified_query_info,
-        sharding_key_expr,
-        sharding_key_column_name,
-        *distributed_settings,
-        shard_filter_generator,
-        is_remote_function);
-
-    /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
-    if (!query_plan.isInitialized())
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized");
+            processed_stage,
+            remote_storage,
+            remote_table_function_ptr,
+            select_stream_factory,
+            log,
+            local_context,
+            modified_query_info,
+            sharding_key_expr,
+            sharding_key_column_name,
+            *distributed_settings,
+            shard_filter_generator,
+            is_remote_function);
+
+        /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
+        if (!query_plan.isInitialized())
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized");
+    }
+
+    std::vector<QueryPlan> additional_plans;
+    for (size_t i = 0; i < all_query_infos.size(); ++i)
+    {
+        auto additional_query_info = all_query_infos[i];
+        const auto & storage = additional_table_functions[i].storage;
+        auto additional_header = all_headers[i];
+
+        // Create a new query plan for this additional storage
+        QueryPlan additional_plan;
+        // Execute the query against the additional storage
+        storage->read(
+            additional_plan,
+            {}, // column names
+            storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), local_context),
+            additional_query_info,
+            local_context,
+            processed_stage,
+            0, // max_block_size
+            0); // num_streams
+
+        additional_plans.push_back(std::move(additional_plan));
+    }
+
+    // Combine all plans using UnionStep
+    if (!additional_plans.empty())
+    {
+        // Convert QueryPlan objects to QueryPlanPtr
+        std::vector<QueryPlanPtr> plan_ptrs;
+        plan_ptrs.reserve(additional_plans.size() + 1);
+
+        // Add the main plan to the list
+        plan_ptrs.push_back(std::make_unique<QueryPlan>(std::move(query_plan)));
+
+        // Add the additional plans
+        for (auto & plan : additional_plans)
+        {
+            plan_ptrs.push_back(std::make_unique<QueryPlan>(std::move(plan)));
+        }
+
+        // Create a new query plan that unions all the results
+        QueryPlan union_plan;
+
+        // Get the headers from all plans
+        std::vector<Block> headers;
+        headers.reserve(plan_ptrs.size());
+        for (const auto & plan_ptr : plan_ptrs)
+        {
+            headers.push_back(plan_ptr->getCurrentHeader());
+        }
+
+        // Create a UnionStep to combine all plans
+        auto union_step = std::make_unique<UnionStep>(std::move(headers), 0);
+
+        union_plan.unitePlans(std::move(union_step), std::move(plan_ptrs));
+
+        // Replace the original query plan with the union plan
+        query_plan = std::move(union_plan);
+    }
 }
@@ -2159,6 +2324,192 @@ void registerStorageDistributed(StorageFactory & factory)
         .source_access_type = AccessType::REMOTE,
         .has_builtin_setting_fn = DistributedSettings::hasBuiltin,
     });
+
+    // Register the TieredDistributedMerge engine
+    // TODO: consider moving it to a separate file / subclass of StorageDistributed
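+    // Expected DDL shape (illustrative sketch; identifiers are hypothetical):
+    //
+    //     ENGINE = TieredDistributedMerge(
+    //         remote('cluster', db, table),    <predicate_1>,
+    //         s3Cluster('cluster', ...),       <predicate_2>, ...)
+    //
+    // i.e. a leading remote()-style table function with its predicate, optionally
+    // followed by further (table function, predicate) pairs, validated below.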
+    factory.registerStorage("TieredDistributedMerge", [](const StorageFactory::Arguments & args) -> StoragePtr
+    {
+        ASTs & engine_args = args.engine_args;
+
+        if (engine_args.size() < 2)
+            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+                "Storage TieredDistributedMerge requires at least 2 arguments, got {}", engine_args.size());
+
+        // Validate the first argument - it must be a table function
+        ASTPtr first_arg = engine_args[0];
+        if (const auto * func = first_arg->as<ASTFunction>())
+        {
+            // Check if it's a valid table function name
+            if (!TableFunctionFactory::instance().isTableFunctionName(func->name))
+                throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                    "First argument must be a table function, got: {}", func->name);
+
+            // Check if it's one of the supported remote table functions
+            if (func->name != "remote" && func->name != "remoteSecure" &&
+                func->name != "cluster" && func->name != "clusterAllReplicas")
+                throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                    "First argument must be one of: remote, remoteSecure, cluster, clusterAllReplicas, got: {}", func->name);
+        }
+        else
+        {
+            throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                "First argument must be a table function, got: {}", first_arg->getID());
+        }
+
+        // TODO: Validate the second argument - it must be a SQL expression (just rejecting a string literal for now)
+        ASTPtr second_arg = engine_args[1];
+        if (const auto * literal = second_arg->as<ASTLiteral>())
+        {
+            // A string literal would be invalid as a SQL expression
+            if (literal->value.getType() == Field::Types::String)
+            {
+                throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                    "Second argument must be a SQL expression, got string literal");
+            }
+        }
+
+        // Create the underlying StorageDistributed using the table function
+        const ContextPtr & context = args.getContext();
+
+        // Parse additional table function pairs (if any)
+        std::vector<StorageDistributed::TableFunctionEntry> additional_table_functions;
+        for (size_t i = 2; i < engine_args.size(); i += 2)
+        {
+            if (i + 1 >= engine_args.size())
+                throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+                    "Table function pairs must have both a table function and a predicate, got an odd number of arguments");

+            ASTPtr table_function_ast = engine_args[i];
+            ASTPtr predicate_ast = engine_args[i + 1];
+
+            // Validate the table function
+            const auto * func = table_function_ast->as<ASTFunction>();
+            if (!func)
+            {
+                throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                    "Additional table function must be a table function, got: {}", table_function_ast->getID());
+            }
+            else if (!TableFunctionFactory::instance().isTableFunctionName(func->name))
+            {
+                throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                    "Argument #{}: additional table function must be a valid table function, got: {}", i, func->name);
+            }
+
+            // Additional table functions can be either TableFunctionRemote or ITableFunctionCluster based.
+            // Use a whitelist of supported table function names for clarity and safety.
+            String table_function_name = func->name;
+
+            static const std::unordered_set<String> supported_table_functions = {
+                "remote", "remoteSecure", "cluster", "clusterAllReplicas",
+                "s3Cluster", "urlCluster", "fileCluster", "S3Cluster", "AzureCluster", "HDFSCluster",
+                "IcebergS3Cluster", "IcebergAzureCluster", "IcebergHDFSCluster", "DeltaLakeCluster", "HudiCluster"
+            };
+
+            if (supported_table_functions.find(table_function_name) == supported_table_functions.end())
+            {
+                throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                    "Argument #{}: additional table function '{}' is not supported. "
+                    "TieredDistributedMerge engine requires additional table functions to be either ITableFunctionCluster-based "
+                    "(like s3Cluster, urlCluster, DeltaLakeCluster, etc.) or TableFunctionRemote-based "
+                    "(like remote, remoteSecure, cluster, clusterAllReplicas).", i, table_function_name);
+            }
+
+            // TODO: Validate the predicate - it must be a SQL expression (just rejecting a string literal for now)
+            if (const auto * literal = predicate_ast->as<ASTLiteral>())
+            {
+                if (literal->value.getType() == Field::Types::String)
+                {
+                    throw Exception(ErrorCodes::BAD_ARGUMENTS,
+                        "Additional predicate must be a SQL expression, got string literal");
+                }
+            }
+
+            // Create a table function instance and execute it to get a StoragePtr
+            auto table_function = TableFunctionFactory::instance().get(table_function_ast, context);
+            if (!table_function)
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid additional table function in TieredDistributedMerge engine");
+
+            // Execute the table function to get the underlying storage
+            StoragePtr additional_storage = table_function->execute(
+                table_function_ast,
+                context,
+                args.table_id.table_name,
+                args.columns, // Use the same columns as the main table function
+                false, // use_global_context = false
+                false); // is_insert_query = false
+
+            // Handle StorageTableFunctionProxy if present
+            if (auto proxy = std::dynamic_pointer_cast<StorageTableFunctionProxy>(additional_storage))
+            {
+                additional_storage = proxy->getNested();
+            }
+
+            // additional_storage->renameInMemory({args.table_id.database_name, args.table_id.table_name, args.table_id.uuid});
+
+            additional_table_functions.emplace_back(std::move(additional_storage), table_function_ast, predicate_ast);
+        }
+
+        // Now handle the first table function (which must be a TableFunctionRemote)
+        auto table_function = TableFunctionFactory::instance().get(first_arg, context);
+        if (!table_function)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid table function in TieredDistributedMerge engine");
+
+        // For schema inference, we need to determine the columns first if they're not provided
+        ColumnsDescription columns_to_use = args.columns;
+        if (columns_to_use.empty())
+        {
+            // Get the column structure from the table function
+            columns_to_use = table_function->getActualTableStructure(context, true);
+        }
+
+        // Execute the table function to get the underlying storage
+        StoragePtr storage = table_function->execute(
+            first_arg,
+            context,
+            args.table_id.table_name,
+            columns_to_use,
+            false, // use_global_context = false
+            false); // is_insert_query = false
+
+        // Table function execution wraps the actual storage in a StorageTableFunctionProxy to initialize it lazily,
+        // so we need to get the nested storage
+        if (auto proxy = std::dynamic_pointer_cast<StorageTableFunctionProxy>(storage))
+        {
+            storage = proxy->getNested();
+        }
+
+        // Cast to StorageDistributed to access its methods
+        auto distributed_storage = std::dynamic_pointer_cast<StorageDistributed>(storage);
+        if (!distributed_storage)
+        {
+            // Debug: report the actual type we got
+            std::string actual_type = storage ? storage->getName() : "nullptr";
+            throw Exception(ErrorCodes::LOGICAL_ERROR,
+                "TableFunctionRemote did not return a StorageDistributed, got: {}", actual_type);
+        }
+
+        // Fix the database and table names - this is the same pattern used in InterpreterCreateQuery.
+        // TableFunctionRemote creates a StorageDistributed with the "_table_function" database,
+        // but we need to rename it to the correct database and table names.
+        distributed_storage->renameInMemory({args.table_id.database_name, args.table_id.table_name, args.table_id.uuid});
+
+        // Store the filter expression for later use in read operations
+        distributed_storage->setAdditionalFilter(second_arg);
+
+        // Store the additional table functions for later use
+        distributed_storage->setAdditionalTableFunctions(std::move(additional_table_functions));
+
+        return distributed_storage;
+    },
+    {
+        .supports_settings = false,
+        .supports_parallel_insert = true,
+        .supports_schema_inference = true,
+        .source_access_type = AccessType::REMOTE,
+    });
 }

 bool StorageDistributed::initializeDiskOnConfigChange(const std::set<String> & new_added_disks)
diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h
index 784c39e8f755..1a756de5b708 100644
--- a/src/Storages/StorageDistributed.h
+++ b/src/Storages/StorageDistributed.h
@@ -50,6 +50,20 @@ class StorageDistributed final : public IStorage, WithContext
     friend class StorageSystemDistributionQueue;

 public:
+    /// Structure to hold a storage, its AST, and the associated predicate
+    struct TableFunctionEntry
+    {
+        StoragePtr storage;
+        ASTPtr table_function_ast;
+        ASTPtr predicate_ast;
+
+        TableFunctionEntry(StoragePtr storage_, ASTPtr table_function_ast_, ASTPtr predicate_ast_)
+            : storage(std::move(storage_))
+            , table_function_ast(std::move(table_function_ast_))
+            , predicate_ast(std::move(predicate_ast_))
+        {}
+    };
+
     StorageDistributed(
         const StorageID & id_,
         const ColumnsDescription & columns_,
@@ -70,7 +84,10 @@ class StorageDistributed final : public IStorage, WithContext

     ~StorageDistributed() override;

-    std::string getName() const override { return "Distributed"; }
+    std::string getName() const override
+    {
+        return additional_table_functions.empty() ? "Distributed" : "TieredDistributedMerge";
+    }

     bool supportsSampling() const override { return true; }
     bool supportsFinal() const override { return true; }
@@ -149,6 +166,21 @@ class StorageDistributed final : public IStorage, WithContext

     size_t getShardCount() const;

+    /// Set the additional filter for the TieredDistributedMerge engine
+    void setAdditionalFilter(ASTPtr filter) { additional_filter = std::move(filter); }
+
+    /// Set the additional table functions for the TieredDistributedMerge engine
+    void setAdditionalTableFunctions(std::vector<TableFunctionEntry> additional_table_functions_)
+    {
+        additional_table_functions = std::move(additional_table_functions_);
+    }
+
+    /// Getter methods for ClusterProxy::executeQuery
+    StorageID getRemoteStorageID() const { return remote_storage; }
+    ExpressionActionsPtr getShardingKeyExpression() const { return sharding_key_expr; }
+    const DistributedSettings * getDistributedSettings() const { return distributed_settings.get(); }
+    bool isRemoteFunction() const { return is_remote_function; }
+
     bool initializeDiskOnConfigChange(const std::set<String> & new_added_disks) override;

 private:
@@ -282,6 +314,12 @@ class StorageDistributed final : public IStorage, WithContext
     pcg64 rng;

     bool is_remote_function;
+
+    /// Additional filter expression for the TieredDistributedMerge engine
+    ASTPtr additional_filter;
+
+    /// Additional table functions for the TieredDistributedMerge engine
+    std::vector<TableFunctionEntry> additional_table_functions;
 };

 }
diff --git a/tests/queries/0_stateless/03372_tiered_distributed_merge.reference b/tests/queries/0_stateless/03372_tiered_distributed_merge.reference
new file mode 100644
index 000000000000..2cf8256d7f20
--- /dev/null
+++ b/tests/queries/0_stateless/03372_tiered_distributed_merge.reference
@@ -0,0 +1,13 @@
+Code: 42. DB::Exception: Storage TieredDistributedMerge requires at least 2 arguments, got 0. (NUMBER_OF_ARGUMENTS_DOESNT_MATCH)
+
+Code: 42. DB::Exception: Storage TieredDistributedMerge requires at least 2 arguments, got 1. (NUMBER_OF_ARGUMENTS_DOESNT_MATCH)
+
+Code: 36. DB::Exception: First argument must be a table function, got: Literal_'invalid_arg'. (BAD_ARGUMENTS)
+
+Code: 36. DB::Exception: First argument must be a table function, got: sin. (BAD_ARGUMENTS)
+
+Code: 36. DB::Exception: First argument must be one of: remote, remoteSecure, cluster, clusterAllReplicas, got: url. (BAD_ARGUMENTS)
+
+Code: 42. DB::Exception: Storage TieredDistributedMerge requires at least 2 arguments, got 1. (NUMBER_OF_ARGUMENTS_DOESNT_MATCH)
+
+Code: 36. DB::Exception: Second argument must be a SQL expression, got string literal.
(BAD_ARGUMENTS)
\ No newline at end of file
diff --git a/tests/queries/0_stateless/03372_tiered_distributed_merge.sql b/tests/queries/0_stateless/03372_tiered_distributed_merge.sql
new file mode 100644
index 000000000000..5c139972c9ce
--- /dev/null
+++ b/tests/queries/0_stateless/03372_tiered_distributed_merge.sql
@@ -0,0 +1,11 @@
+-- Test TieredDistributedMerge engine registration and basic validation
+CREATE TABLE test_tiered_distributed_merge_no_args ( `id` UInt32, `name` String ) ENGINE = TieredDistributedMerge(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
+CREATE TABLE test_tiered_distributed_merge_one_arg ( `id` UInt32, `name` String ) ENGINE = TieredDistributedMerge(1); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
+CREATE TABLE test_tiered_distributed_merge_invalid_first_arg ( `id` UInt32, `name` String) ENGINE = TieredDistributedMerge('invalid_arg', 1); -- { serverError BAD_ARGUMENTS }
+CREATE TABLE test_tiered_distributed_merge_invalid_first_arg ( `id` UInt32, `name` String) ENGINE = TieredDistributedMerge(sin(3), 1); -- { serverError BAD_ARGUMENTS }
+CREATE TABLE test_tiered_distributed_merge_invalid_first_arg ( `id` UInt32, `name` String) ENGINE = TieredDistributedMerge(url('http://google.com', 'RawBLOB'), 1); -- { serverError BAD_ARGUMENTS }
+CREATE TABLE test_tiered_distributed_merge_invalid_first_arg ( `id` UInt32, `name` String) ENGINE = TieredDistributedMerge(urlCluster('test_cluster', 'http://example.com')); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
+CREATE TABLE test_tiered_distributed_merge_invalid_second_arg (`id` UInt32, `name` String) ENGINE = TieredDistributedMerge(remote('test_cluster', 'db', 'table'), 'invalid_predicate'); -- { serverError BAD_ARGUMENTS }
+
+-- Test 8: TieredDistributedMerge with 2 arguments (valid table function + SQL expression) should work
+CREATE TABLE test_tiered_distributed_merge_valid (`id` UInt32, `name` String) ENGINE = TieredDistributedMerge(remote('test_cluster', 'db', 'table'), id > 0);
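+
+-- Clean up
+DROP TABLE IF EXISTS test_tiered_distributed_merge_valid;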
diff --git a/tests/queries/0_stateless/03374_tiered_distributed_merge_real_table.reference b/tests/queries/0_stateless/03374_tiered_distributed_merge_real_table.reference
new file mode 100644
index 000000000000..218a4473a581
--- /dev/null
+++ b/tests/queries/0_stateless/03374_tiered_distributed_merge_real_table.reference
@@ -0,0 +1,51 @@
+10
+6
+2 Bob 200.3
+2 Bob 200.3
+4 David 300.2
+4 David 300.2
+5 Eve 250.1
+5 Eve 250.1
+Union
+  Expression ((Project names + Projection))
+    Expression
+      ReadFromMergeTree (default.test_local_table)
+  ReadFromRemote (Read from remote replica)
+Expression (Project names)
+  Sorting (Sorting for ORDER BY)
+    Expression ((Before ORDER BY + Projection))
+      MergingAggregated
+        Union
+          Aggregating
+            Expression (Before GROUP BY)
+              Expression
+                ReadFromMergeTree (default.test_local_table)
+          ReadFromRemote (Read from remote replica)
+5
+2 Bob 200.3
+2 Bob 200.3
+4 David 300.2
+4 David 300.2
+5 Eve 250.1
+5 Eve 250.1
+0 Invalid 0.5
+0 Invalid 0.5
+1 Alice 100.5
+1 Alice 100.5
+2 Bob 200.3
+2 Bob 200.3
+2 Bob 200.3
+2 Bob 200.3
+2 Bob 200.3
+2 Bob 200.3
+0 Invalid 0.5
+0 Invalid 0.5
+1 Alice 100.5
+1 Alice 100.5
+2 Bob 200.3
+2 Bob 200.3
+Union
+  Expression ((Project names + Projection))
+    Expression
+      ReadFromMergeTree (default.test_local_table)
+  ReadFromRemote (Read from remote replica)
\ No newline at end of file
diff --git a/tests/queries/0_stateless/03374_tiered_distributed_merge_real_table.sql b/tests/queries/0_stateless/03374_tiered_distributed_merge_real_table.sql
new file mode 100644
index 000000000000..08c523a10087
--- /dev/null
+++ b/tests/queries/0_stateless/03374_tiered_distributed_merge_real_table.sql
@@ -0,0 +1,94 @@
+-- Test TieredDistributedMerge engine with a real table connection
+-- This test ensures we can actually create tables and select data
+
+DROP TABLE IF EXISTS test_local_table;
+DROP TABLE IF EXISTS test_tiered_real_connection;
+
+-- Create a local table for testing
+CREATE TABLE test_local_table
+(
+    `id` UInt32,
+    `name` String,
+    `event_time` DateTime,
+    `value` Float64
+) ENGINE = MergeTree()
+ORDER BY id;
+
+-- Insert some test data
+INSERT INTO test_local_table VALUES
+    (0, 'Invalid', '2022-01-01 10:00:00', 0.5),
+    (1, 'Alice', '2022-01-01 10:00:00', 100.5),
+    (2, 'Bob', '2022-01-02 11:00:00', 200.3),
+    (3, 'Charlie', '2022-01-03 12:00:00', 150.7),
+    (4, 'David', '2022-01-04 13:00:00', 300.2),
+    (5, 'Eve', '2022-01-05 14:00:00', 250.1);
+
+-- Create a TieredDistributedMerge table that connects to localhost (the current server)
+-- This creates a real connection to the local table
+CREATE TABLE test_tiered_real_connection
+(
+    `id` UInt32,
+    `name` String,
+    `event_time` DateTime,
+    `value` Float64
+) ENGINE = TieredDistributedMerge(
+    remote('127.0.0.1:9000,127.0.0.2:9000', currentDatabase(), 'test_local_table'),
+    id > 0
+);
+
+-- Test that we can select data from the TieredDistributedMerge table
+-- This should return the same data as the local table
+SELECT count() FROM test_tiered_real_connection;
+
+-- Test with a WHERE condition
+SELECT count() FROM test_tiered_real_connection WHERE value > 200;
+
+-- Test with ORDER BY
+SELECT id, name, value FROM test_tiered_real_connection WHERE id > 2 ORDER BY value DESC;
+
+-- Test with LIMIT
+SELECT * FROM test_tiered_real_connection ORDER BY id LIMIT 3;
+
+SET prefer_localhost_replica = 1; -- avoid getting different plans due to that setting
+
+-- Test EXPLAIN to see the query plan
+EXPLAIN SELECT * FROM test_tiered_real_connection WHERE value > 150;
+
+-- Test EXPLAIN with a more complex query
+EXPLAIN SELECT
+    name,
+    count() as count,
+    avg(value) as avg_value
+FROM test_tiered_real_connection
+WHERE event_time >= '2022-01-02'
+GROUP BY name
+ORDER BY avg_value DESC;
+
+-- Test that the additional filter (id > 0) is working correctly
+-- This should return all 5 rows since all ids are > 0
+SELECT count() FROM test_tiered_real_connection;
+
+-- Test with a WHERE condition that should be combined with the additional filter
+-- The resulting query should be: SELECT * FROM test_local_table WHERE (id > 0) AND (value > 200)
+SELECT id, name, value FROM test_tiered_real_connection WHERE value > 200 ORDER BY id;
+
+-- Test with a WHERE condition that overlaps with the additional filter
+-- The resulting query should be: SELECT * FROM test_local_table WHERE (id > 0) AND (id < 3)
+-- This should return rows with id > 0 AND id < 3 (i.e., id = 1, 2)
+SELECT id, name, value FROM test_tiered_real_connection WHERE id < 3 ORDER BY id;
+
+-- Should work correctly together with additional_table_filters
+SELECT id, name, value FROM test_tiered_real_connection WHERE id < 3 ORDER BY id SETTINGS additional_table_filters = {'test_tiered_real_connection' : 'id > 1'}, allow_experimental_analyzer = 0;
+SELECT id, name, value FROM test_tiered_real_connection WHERE id < 3 ORDER BY id SETTINGS additional_table_filters = {'test_tiered_real_connection' : 'id > 1'}, allow_experimental_analyzer = 1;
+
+SELECT id, name, value FROM test_tiered_real_connection WHERE id < 3 ORDER BY id SETTINGS allow_experimental_analyzer = 0;
+SELECT id, name, value FROM test_tiered_real_connection WHERE id < 3 ORDER BY id SETTINGS allow_experimental_analyzer = 1;
+
+-- Test EXPLAIN to see how the additional filter is applied
+EXPLAIN SELECT * FROM test_tiered_real_connection WHERE value > 200;
+
+-- Clean up
+DROP TABLE IF EXISTS test_tiered_real_connection;
+DROP TABLE IF EXISTS test_local_table;
diff --git a/tests/queries/0_stateless/03376_tiered_distributed_merge_predicate_filtering.reference b/tests/queries/0_stateless/03376_tiered_distributed_merge_predicate_filtering.reference
new file mode 100644
index 000000000000..87577cf0fc65
--- /dev/null
+++ b/tests/queries/0_stateless/03376_tiered_distributed_merge_predicate_filtering.reference
@@ -0,0 +1,31 @@
+Test 1: Predicate filtering with analyzer OFF
+6
+3
+3
+Test 2: Virtual column _table_index with analyzer OFF
+1 3
+2 3
+Test 3: Predicate filtering with analyzer ON
+6
+3
+3
+Test 4: Virtual column _table_index with analyzer ON
+1 3
+2 3
+Test 5: Complex predicate filtering
+2
+1 1
+2 1
+Test 6: Data integrity check
+1 4 David 2025-09-05 400
+1 5 Eve 2025-09-10 500
+1 6 Frank 2025-09-15 600
+2 1 Alice 2025-08-15 100
+2 2 Bob 2025-08-20 200
+2 3 Charlie 2025-08-25 300
+Test 7: Additional WHERE clause
+2
+3
+Test 8: Additional WHERE clause with analyzer ON
+2
+3
diff --git a/tests/queries/0_stateless/03376_tiered_distributed_merge_predicate_filtering.sql b/tests/queries/0_stateless/03376_tiered_distributed_merge_predicate_filtering.sql
new file mode 100644
index 000000000000..68afff56b7fe
--- /dev/null
+++ b/tests/queries/0_stateless/03376_tiered_distributed_merge_predicate_filtering.sql
@@ -0,0 +1,148 @@
+-- Tags: no-random-merge-tree-settings
+
+-- Test TieredDistributedMerge engine predicate filtering and virtual column functionality
+
+DROP TABLE IF EXISTS test_table1_local SYNC;
+DROP TABLE IF EXISTS test_table2_local SYNC;
+DROP TABLE IF EXISTS test_tiered_predicate_filtering_analyzer_off SYNC;
+
+-- Create local tables with data before and after the watermark
+CREATE TABLE test_table1_local
+(
+    `id` UInt32,
+    `name` String,
+    `date` Date,
+    `value` UInt32
+)
+ENGINE = MergeTree()
+ORDER BY id;
+
+CREATE TABLE test_table2_local
+(
+    `id` UInt32,
+    `name` String,
+    `date` Date,
+    `value` UInt32
+)
+ENGINE = MergeTree()
+ORDER BY id;
+
+-- Insert data before the watermark (2025-09-01)
+INSERT INTO test_table1_local VALUES
+    (11, 'Alice', '2025-08-15', 100),
+    (12, 'Bob', '2025-08-20', 200),
+    (13, 'Charlie', '2025-08-25', 300);
+
+INSERT INTO test_table2_local VALUES
+    (21, 'Alice', '2025-08-15', 100),
+    (22, 'Bob', '2025-08-20', 200),
+    (23, 'Charlie', '2025-08-25', 300);
+
+-- Insert data after the watermark (2025-09-01)
+INSERT INTO test_table1_local VALUES
+    (14, 'David', '2025-09-05', 400),
+    (15, 'Eve', '2025-09-10', 500),
+    (16, 'Frank', '2025-09-15', 600);
+
+INSERT INTO test_table2_local VALUES
+    (24, 'David', '2025-09-05', 400),
+    (25, 'Eve', '2025-09-10', 500),
+    (26, 'Frank', '2025-09-15', 600);
+
+-- Test 1: Basic predicate filtering with the analyzer disabled
+SET allow_experimental_analyzer = 0;
+
+CREATE TABLE test_tiered_predicate_filtering_analyzer_off
+(
+    `id` UInt32,
+    `name` String,
+    `date` Date,
+    `value` UInt32
+)
+ENGINE = TieredDistributedMerge(
+    remote('127.0.0.2:9000', currentDatabase(), 'test_table1_local'),
+    date >= '2025-09-01',
+    remote('127.0.0.2:9000', currentDatabase(), 'test_table2_local'),
+    date < '2025-09-01'
+);
+
+-- Test predicate filtering - should return only data after the watermark from table1 and before the watermark from table2
+SELECT 'Test 1: Predicate filtering with analyzer OFF' as test_name;
+SELECT * FROM test_tiered_predicate_filtering_analyzer_off ORDER BY id;
+SELECT count() as total_rows FROM test_tiered_predicate_filtering_analyzer_off;
+SELECT count() as rows_after_watermark FROM test_tiered_predicate_filtering_analyzer_off WHERE date >= '2025-09-01';
+SELECT count() as rows_before_watermark FROM test_tiered_predicate_filtering_analyzer_off WHERE date < '2025-09-01';
+
+-- Test virtual column functionality
+SELECT 'Test 2: Virtual column _table_index with analyzer OFF' as test_name;
+SELECT _table_index, count() as row_count FROM test_tiered_predicate_filtering_analyzer_off GROUP BY _table_index ORDER BY _table_index;
+
+-- Test 3: Basic predicate filtering with analyzer enabled
+SET allow_experimental_analyzer = 1;
+
+CREATE TABLE test_tiered_predicate_filtering_analyzer_on
+(
+    `id` UInt32,
+    `name` String,
+    `date` Date,
+    `value` UInt32
+)
+ENGINE = TieredDistributedMerge(
+    remote('127.0.0.1:9000,127.0.0.2:9000', currentDatabase(), 'test_table1_local'),
+    date >= '2025-09-01',
+    remote('127.0.0.1:9000,127.0.0.2:9000', currentDatabase(), 'test_table2_local'),
+    date < '2025-09-01'
+);
+
+-- Test predicate filtering - should return only data after watermark from table1 and before watermark from table2
+SELECT 'Test 3: Predicate filtering with analyzer ON' as test_name;
+SELECT count() as total_rows FROM test_tiered_predicate_filtering_analyzer_on;
+SELECT count() as rows_after_watermark FROM test_tiered_predicate_filtering_analyzer_on WHERE date >= '2025-09-01';
+SELECT count() as rows_before_watermark FROM test_tiered_predicate_filtering_analyzer_on WHERE date < '2025-09-01';
+
+-- Test virtual column functionality
+SELECT 'Test 4: Virtual column _table_index with analyzer ON' as test_name;
+SELECT _table_index, count() as row_count FROM test_tiered_predicate_filtering_analyzer_on GROUP BY _table_index ORDER BY _table_index;
+
+-- Test 5: Complex predicate with multiple conditions
+SET allow_experimental_analyzer = 0;
+
+CREATE TABLE test_tiered_complex_predicate
+(
+    `id` UInt32,
+    `name` String,
+    `date` Date,
+    `value` UInt32
+)
+ENGINE = TieredDistributedMerge(
+    remote('127.0.0.1:9000,127.0.0.2:9000', currentDatabase(), 'test_table1_local'),
+    date >= '2025-09-01' AND value > 400,
+    remote('127.0.0.1:9000,127.0.0.2:9000', currentDatabase(), 'test_table2_local'),
+    date < '2025-09-01' AND value < 300
+);
+
+SELECT 'Test 5: Complex predicate filtering' as test_name;
+SELECT count() as total_rows FROM test_tiered_complex_predicate;
+SELECT _table_index, count() as row_count FROM test_tiered_complex_predicate GROUP BY _table_index ORDER BY _table_index;
+
+-- Test 6: Verify data integrity - check specific values
+SELECT 'Test 6: Data integrity check' as test_name;
+SELECT _table_index, id, name, date, value FROM test_tiered_predicate_filtering_analyzer_off ORDER BY _table_index, id;
+
+-- Test 7: Test with additional WHERE clause on top of engine predicates
+SELECT 'Test 7: Additional WHERE clause' as test_name;
+SELECT count() as alice_rows FROM test_tiered_predicate_filtering_analyzer_off WHERE name = 'Alice';
+SELECT count() as high_value_rows FROM test_tiered_predicate_filtering_analyzer_off WHERE value > 300;
+
+-- Test 8: Test with analyzer enabled and additional WHERE clause
+SET allow_experimental_analyzer = 1;
+SELECT 'Test 8: Additional WHERE clause with analyzer ON' as test_name;
+SELECT count() as alice_rows FROM test_tiered_predicate_filtering_analyzer_on WHERE name = 'Alice';
+SELECT count() as high_value_rows FROM test_tiered_predicate_filtering_analyzer_on WHERE value > 300;
+
+-- Clean up
+DROP TABLE IF EXISTS test_tiered_predicate_filtering_analyzer_off;
+DROP TABLE IF EXISTS test_tiered_predicate_filtering_analyzer_on;
+DROP TABLE IF EXISTS test_tiered_complex_predicate;
+DROP TABLE IF EXISTS test_table1_local;
+DROP TABLE IF EXISTS test_table2_local;
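+
+-- For reference, the intended production tiering pattern looks like this
+-- (an illustrative sketch with hypothetical cluster/table names, not executed here):
+--
+--   CREATE TABLE events (...) ENGINE = TieredDistributedMerge(
+--       remote('hot_cluster', db, 'events_recent'),          date >= today() - 30,
+--       s3Cluster('cold_cluster', 's3://bucket/events/...'), date <  today() - 30
+--   );
+--
+-- Each tier only serves rows matching its predicate, as exercised by Tests 1-6 above.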