Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,22 @@ Possible values:
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
- `allow` — Allows the use of these types of subqueries.
)", IMPORTANT) \
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
Changes the behaviour of object storage cluster function or table.

ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table.

Restrictions:

- Only applied for JOIN subqueries.
- Only if the FROM section uses a object storage cluster function ot table.

Possible values:

- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.`
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it is unsupported, then why was it added? I suggest ditching that value from enum

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for part 2, when right table is calculated on initiator and is sent as part of the query to the swarm nodes.

- `allow` — Default value. Allows the use of these types of subqueries.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `allow` — Default value. Allows the use of these types of subqueries.
- `allow` — Default value. Allows the use of these types of subqueries. The join will be executed on the initiator.

should we also support 'forbid' ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW - what will happen when joining xxxCluster with non-distributed table? Maybe a test?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's a default old behavour - query executed on remote nodes "as is" and join executed there.
I guess this is optimal for casual, non-swarm mode, when right table is present on all cluster nodes.

)", 0) \
\
DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"(
Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries.
Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class WriteBuffer;
M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \
M(CLASS_NAME, DistributedDDLOutputMode) \
M(CLASS_NAME, DistributedProductMode) \
M(CLASS_NAME, ObjectStorageClusterJoinMode) \
M(CLASS_NAME, Double) \
M(CLASS_NAME, EscapingRule) \
M(CLASS_NAME, Float) \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
Expand Down
5 changes: 5 additions & 0 deletions src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P
{"global", DistributedProductMode::GLOBAL},
{"allow", DistributedProductMode::ALLOW}})

IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS,
{{"local", ObjectStorageClusterJoinMode::LOCAL},
{"global", ObjectStorageClusterJoinMode::GLOBAL},
{"allow", ObjectStorageClusterJoinMode::ALLOW}})


IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS,
{{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw},
Expand Down
10 changes: 10 additions & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ enum class DistributedProductMode : uint8_t

DECLARE_SETTING_ENUM(DistributedProductMode)

/// The setting for executing object storage cluster function ot table JOIN sections.
enum class ObjectStorageClusterJoinMode : uint8_t
{
LOCAL, /// Convert to local query
GLOBAL, /// Convert to global query
ALLOW /// Enable
};

DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode)

/// How the query result cache handles queries with non-deterministic functions, e.g. now()
enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t
{
Expand Down
2 changes: 2 additions & 0 deletions src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
updated_actions_dag_outputs.push_back(output_node);
else if (table_function_node && table_function_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
}
else
updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier));
Expand Down
218 changes: 214 additions & 4 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Planner/Utils.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/extractTableFunctionFromSelectQuery.h>
Expand All @@ -47,12 +52,14 @@ namespace Setting
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
extern const SettingsBool object_storage_remote_initiator;
}

namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}

namespace ErrorCodes
Expand Down Expand Up @@ -89,6 +96,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster);
}

namespace
{

/*
Helping class to find in query tree first node of required type
*/
class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the code is only for analyzer (i.e. build on the top of QueryTree, not on the top of AST). It's ok for new features, but maybe it's better to document it clearly. I was also thinking about adding throw when analizer is disabled. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Full support for non-anylyzer code can be more challenging (see for example JoinToSubqueryTransformVisitor)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added exception


bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
{
return !passed_node;
}

void enterImpl(QueryTreeNodePtr & node)
{
if (passed_node)
return;

auto node_type = node->getNodeType();

if (node_type == type)
passed_node = node;
}

QueryTreeNodePtr getNode() const { return passed_node; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually i never looked close on the anazyzer code, thought it should be simpler (i.e. avoiding tree traversal, but i can be wrong).


private:
QueryTreeNodeType type;
QueryTreeNodePtr passed_node;
};

/*
Helping class to find all used columns with specific source
*/
class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>;
using Base::Base;

explicit CollectUsedColumnsForSourceVisitor(
QueryTreeNodePtr source_,
ContextPtr context,
bool collect_columns_from_other_sources_ = false)
: Base(context)
, source(source_)
, collect_columns_from_other_sources(collect_columns_from_other_sources_)
{}

void enterImpl(QueryTreeNodePtr & node)
{
auto node_type = node->getNodeType();

if (node_type != QueryTreeNodeType::COLUMN)
return;

auto & column_node = node->as<ColumnNode &>();
auto column_source = column_node.getColumnSourceOrNull();
if (!column_source)
return;

if ((column_source == source) != collect_columns_from_other_sources)
{
const auto & name = column_node.getColumnName();
if (!names.count(name))
{
columns.emplace_back(column_node.getColumn());
names.insert(name);
}
}
}

const NamesAndTypes & getColumns() const { return columns; }

private:
std::unordered_set<std::string> names;
QueryTreeNodePtr source;
NamesAndTypes columns;
bool collect_columns_from_other_sources;
};

};

/*
Try to make subquery to send on nodes
Converts

SELECT s3.c1, s3.c2, t.c3
FROM
s3Cluster(...) AS s3
JOIN
localtable as t
ON s3.key == t.key

to

SELECT s3.c1, s3.c2, s3.key
FROM
s3Cluster(...) AS s3
*/
void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
ASTPtr & query_to_send,
QueryTreeNodePtr query_tree,
const ContextPtr & context)
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
switch (object_storage_cluster_join_mode)
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto modified_query_tree = query_tree->clone();
bool need_modify = false;

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");

if (has_join)
{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);

// Find add used columns from table function to make proper projection list
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe things like tryResolveIdentifierFromJoinTreeNode / tryResolveIdentifierFromJoinTree / tryResolveIdentifierFromTableExpression can be reused.

collector.visit(query_tree);
const auto & columns = collector.getColumns();

auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.resolveProjectionColumns(columns);
auto column_nodes_to_select = std::make_shared<ListNode>();
column_nodes_to_select->getNodes().reserve(columns.size());
for (auto & column : columns)
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
query_node.getProjectionNode() = column_nodes_to_select;

// Left only table function to send on cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);

need_modify = true;
}

if (has_local_columns_in_where)
{
auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.getWhere() = {};
}

if (need_modify)
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
// TODO
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now");
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
return;
}
}

/// The code executes on initiator
void IStorageCluster::read(
QueryPlan & query_plan,
Expand Down Expand Up @@ -119,13 +295,15 @@ void IStorageCluster::read(
Block sample_block;
ASTPtr query_to_send = query_info.query;

updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);

if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage));
}
else
{
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze());
sample_block = interpreter.getSampleBlock();
query_to_send = interpreter.getQueryInfo().query->clone();
}
Expand All @@ -144,7 +322,7 @@ void IStorageCluster::read(
}

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
data.remote_table.table = getName();
RestoreQualifiedNamesVisitor(data).visit(query_to_send);
Expand Down Expand Up @@ -309,8 +487,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
}

QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];

if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
{
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
join_searcher.visit(query_info.query_tree);
if (join_searcher.getNode())
has_join = true;

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_info.query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");

CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
auto & query_node = query_info.query_tree->as<QueryNode &>();
if (query_node.hasWhere())
collector_where.visit(query_node.getWhere());

// Can't use 'WHERE' on remote node if it contains columns from other sources
if (!collector_where.getColumns().empty())
has_local_columns_in_where = true;

if (has_join || has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}

/// Initiator executes query on remote node.
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class IStorageCluster : public IStorage

protected:
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);

struct RemoteCallVariables
{
Expand Down Expand Up @@ -101,6 +102,9 @@ class IStorageCluster : public IStorage

LoggerPtr log;
String cluster_name;

mutable bool has_join = false;
mutable bool has_local_columns_in_where = false;
};


Expand Down
Loading
Loading