Skip to content

Commit e74130f

Browse files
committed
Merge antalya-25.6.5
2 parents 0e7ffe0 + 89d7ee8 commit e74130f

12 files changed

+256
-54
lines changed

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Exception that occurred on the client side.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6883,6 +6883,9 @@ Possible values:
68836883
- '' - do not force any kind of Exchange operators, let the optimizer choose,
68846884
- 'Persisted' - use temporary files in object storage,
68856885
- 'Streaming' - stream exchange data over network.
6886+
)", EXPERIMENTAL) \
6887+
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
6888+
Allow retries in cluster request, when one node goes offline
68866889
)", EXPERIMENTAL) \
68876890
DECLARE(Bool, object_storage_remote_initiator, false, R"(
68886891
Execute request to object storage as remote on one of object_storage_cluster nodes.

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
196196
{"parallel_replicas_for_cluster_engines", false, true, "New setting."},
197197
{"parallel_hash_join_threshold", 0, 0, "New setting"},
198198
/// Release closed. Please use 25.4
199+
{"use_object_storage_list_objects_cache", true, false, "New setting."},
200+
{"allow_retries_in_cluster_requests", false, false, "New setting."},
199201
});
200202
addSettingsChanges(settings_changes_history, "24.12.2.20000",
201203
{

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ namespace Setting
5252
extern const SettingsBool use_hedged_requests;
5353
extern const SettingsBool push_external_roles_in_interserver_queries;
5454
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
55+
extern const SettingsBool allow_retries_in_cluster_requests;
5556
}
5657

5758
namespace ErrorCodes
@@ -82,6 +83,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
8283
, extension(extension_)
8384
, priority_func(priority_func_)
8485
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
86+
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
8587
{
8688
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
8789
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -484,7 +486,8 @@ int RemoteQueryExecutor::sendQueryAsync()
484486
read_context = std::make_unique<ReadContext>(
485487
*this,
486488
/*suspend_when_query_sent*/ true,
487-
read_packet_type_separately);
489+
read_packet_type_separately,
490+
allow_retries_in_cluster_requests);
488491

489492
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
490493
/// because we can still be in process of sending scalars or external tables.
@@ -557,7 +560,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
557560
read_context = std::make_unique<ReadContext>(
558561
*this,
559562
/*suspend_when_query_sent*/ false,
560-
read_packet_type_separately);
563+
read_packet_type_separately,
564+
allow_retries_in_cluster_requests);
561565
recreate_read_context = false;
562566
}
563567

@@ -681,7 +685,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
681685
/// We can actually return it, and the first call to RemoteQueryExecutor::read
682686
/// will return earlier. We should consider doing it.
683687
if (packet.block && (packet.block.rows() > 0))
688+
{
689+
if (extension && extension->replica_info)
690+
replica_has_processed_data.insert(extension->replica_info->number_of_current_replica);
684691
return ReadResult(adaptBlockStructure(packet.block, header));
692+
}
685693
break; /// If the block is empty - we will receive other packets before EndOfStream.
686694

687695
case Protocol::Server::Exception:
@@ -743,6 +751,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
743751
case Protocol::Server::TimezoneUpdate:
744752
break;
745753

754+
case Protocol::Server::ConnectionLost:
755+
if (allow_retries_in_cluster_requests)
756+
{
757+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
758+
{
759+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
760+
{
761+
finished = true;
762+
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
763+
return ReadResult(Block{});
764+
}
765+
}
766+
}
767+
packet.exception->rethrow();
768+
break;
769+
746770
default:
747771
got_unknown_packet_from_replica = true;
748772
throw Exception(

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,22 @@ class RemoteQueryExecutorReadContext;
2828

2929
class ParallelReplicasReadingCoordinator;
3030

31-
/// This is the same type as StorageS3Source::IteratorWrapper
32-
using TaskIterator = std::function<String(size_t)>;
31+
namespace ErrorCodes
32+
{
33+
extern const int NOT_IMPLEMENTED;
34+
};
35+
36+
class TaskIterator
37+
{
38+
public:
39+
virtual ~TaskIterator() = default;
40+
virtual bool supportRerunTask() const { return false; }
41+
virtual void rescheduleTasksFromReplica(size_t /* number_of_current_replica */)
42+
{
43+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rescheduleTasksFromReplica is not implemented");
44+
}
45+
virtual std::string operator()(size_t number_of_current_replica) const = 0;
46+
};
3347

3448
/// This class allows one to launch queries on remote replicas of one shard and get results
3549
class RemoteQueryExecutor
@@ -333,6 +347,10 @@ class RemoteQueryExecutor
333347

334348
const bool read_packet_type_separately = false;
335349

350+
const bool allow_retries_in_cluster_requests = false;
351+
352+
std::unordered_set<size_t> replica_has_processed_data;
353+
336354
/// Send all scalars to remote servers
337355
void sendScalars();
338356

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ namespace ErrorCodes
2020
}
2121

2222
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
23-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
23+
RemoteQueryExecutor & executor_,
24+
bool suspend_when_query_sent_,
25+
bool read_packet_type_separately_,
26+
bool allow_retries_in_cluster_requests_)
2427
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2528
, executor(executor_)
2629
, suspend_when_query_sent(suspend_when_query_sent_)
2730
, read_packet_type_separately(read_packet_type_separately_)
31+
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
2832
{
2933
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
3034
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -55,37 +59,48 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5559
if (read_context.executor.needToSkipUnavailableShard())
5660
return;
5761

58-
while (true)
62+
try
5963
{
60-
try
64+
while (true)
6165
{
62-
read_context.has_read_packet_part = PacketPart::None;
63-
64-
if (read_context.read_packet_type_separately)
66+
try
6567
{
66-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
67-
read_context.has_read_packet_part = PacketPart::Type;
68-
suspend_callback();
68+
read_context.has_read_packet_part = PacketPart::None;
69+
70+
if (read_context.read_packet_type_separately)
71+
{
72+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
73+
read_context.has_read_packet_part = PacketPart::Type;
74+
suspend_callback();
75+
}
76+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
77+
read_context.has_read_packet_part = PacketPart::Body;
78+
if (read_context.packet.type == Protocol::Server::Data)
79+
read_context.has_data_packets = true;
6980
}
70-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
71-
read_context.has_read_packet_part = PacketPart::Body;
72-
if (read_context.packet.type == Protocol::Server::Data)
73-
read_context.has_data_packets = true;
74-
}
75-
catch (const Exception & e)
76-
{
77-
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
78-
/// If initiator did not process any data packets before, this fact can be ignored.
79-
/// Unprocessed tasks will be executed on other nodes.
80-
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
81-
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
81+
catch (const Exception & e)
8282
{
83-
read_context.has_read_packet_part = PacketPart::None;
83+
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
84+
/// If initiator did not process any data packets before, this fact can be ignored.
85+
/// Unprocessed tasks will be executed on other nodes.
86+
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
87+
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
88+
{
89+
read_context.has_read_packet_part = PacketPart::None;
90+
}
91+
else
92+
throw;
8493
}
85-
else
86-
throw;
87-
}
8894

95+
suspend_callback();
96+
}
97+
}
98+
catch (const Exception &)
99+
{
100+
if (!read_context.allow_retries_in_cluster_requests)
101+
throw;
102+
read_context.packet.type = Protocol::Server::ConnectionLost;
103+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
89104
suspend_callback();
90105
}
91106
}

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2626
{
2727
public:
2828
explicit RemoteQueryExecutorReadContext(
29-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_);
29+
RemoteQueryExecutor & executor_,
30+
bool suspend_when_query_sent_,
31+
bool read_packet_type_separately_,
32+
bool allow_retries_in_cluster_requests_);
3033

3134
~RemoteQueryExecutorReadContext() override;
3235

@@ -109,6 +112,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
109112
bool suspend_when_query_sent = false;
110113
bool is_query_sent = false;
111114
const bool read_packet_type_separately = false;
115+
const bool allow_retries_in_cluster_requests = false;
112116
};
113117

114118
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,30 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
439439
args.insert(args.end(), object_storage_type_arg);
440440
}
441441

442+
class TaskDistributor : public TaskIterator
443+
{
444+
public:
445+
TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
446+
const std::vector<std::string> & ids_of_hosts,
447+
uint64_t lock_object_storage_task_distribution_ms
448+
)
449+
: task_distributor(iterator, ids_of_hosts, lock_object_storage_task_distribution_ms) {}
450+
~TaskDistributor() override = default;
451+
bool supportRerunTask() const override { return true; }
452+
void rescheduleTasksFromReplica(size_t number_of_current_replica) override
453+
{
454+
task_distributor.rescheduleTasksFromReplica(number_of_current_replica);
455+
}
456+
457+
std::string operator()(size_t number_of_current_replica) const override
458+
{
459+
return task_distributor.getNextTask(number_of_current_replica).value_or("");
460+
}
461+
462+
private:
463+
mutable StorageObjectStorageStableTaskDistributor task_distributor;
464+
};
465+
442466
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
443467
const ActionsDAG::Node * predicate,
444468
const std::optional<ActionsDAG> & filter_actions_dag,
@@ -474,14 +498,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
474498
lock_object_storage_task_distribution_ms_max
475499
);
476500

477-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
478-
iterator,
479-
ids_of_hosts,
480-
lock_object_storage_task_distribution_ms);
481-
482-
auto callback = std::make_shared<TaskIterator>(
483-
[task_distributor](size_t number_of_current_replica) mutable -> String
484-
{ return task_distributor->getNextTask(number_of_current_replica).value_or(""); });
501+
auto callback = std::make_shared<TaskDistributor>(iterator, ids_of_hosts, lock_object_storage_task_distribution_ms);
485502

486503
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
487504
}

0 commit comments

Comments
 (0)