Skip to content
7 changes: 6 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ namespace Setting
extern const SettingsUInt64 min_bytes_to_use_direct_io;
extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists;
extern const SettingsBool output_format_parallel_formatting;
extern const SettingsBool output_format_parquet_parallel_encoding;
}

namespace MergeTreeSetting
Expand Down Expand Up @@ -6244,7 +6245,9 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
dest_storage->getStorageID(),
part,
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
query_context->getSettingsRef()[Setting::output_format_parallel_formatting]);
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
query_context->getSettingsRef()[Setting::max_threads]);

std::lock_guard lock(export_manifests_mutex);

Expand Down Expand Up @@ -6292,6 +6295,8 @@ void MergeTreeData::exportPartToTableImpl(
{
auto context_copy = Context::createCopy(local_context);
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parallel_formatting_parquet);
context_copy->setSetting("max_threads", manifest.max_threads);

sink = destination_storage->import(
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
Expand Down
9 changes: 8 additions & 1 deletion src/Storages/MergeTree/MergeTreeExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ struct MergeTreeExportManifest
const StorageID & destination_storage_id_,
const DataPartPtr & data_part_,
bool overwrite_file_if_exists_,
bool parallel_formatting_)
bool parallel_formatting_,
bool parallel_formatting_parquet_,
std::size_t max_threads_)
: destination_storage_id(destination_storage_id_),
data_part(data_part_),
overwrite_file_if_exists(overwrite_file_if_exists_),
parallel_formatting(parallel_formatting_),
parallel_formatting_parquet(parallel_formatting_parquet_),
max_threads(max_threads_),
create_time(time(nullptr)) {}

StorageID destination_storage_id;
DataPartPtr data_part;
bool overwrite_file_if_exists;
bool parallel_formatting;
/// parquet has a different setting for parallel formatting
bool parallel_formatting_parquet;
Copy link
Member

@Enmk Enmk Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename that one to be similar to the setting name? something like

Suggested change
bool parallel_formatting_parquet;
bool parquet_parallel_encoding;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

std::size_t max_threads;

time_t create_time;
mutable bool in_progress = false;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeSequentialSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
addThrottler(read_settings.local_throttler, context->getMergesThrottler());
break;
case Export:
read_settings.local_throttler = context->getExportsThrottler();
addThrottler(read_settings.local_throttler, context->getExportsThrottler());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge glitch, only found it now

addThrottler(read_settings.remote_throttler, context->getExportsThrottler());
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace DB

result += raw_path;

if (raw_path.back() != '/')
if (!raw_path.empty() && raw_path.back() != '/')
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated but required

{
result += "/";
}
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <optional>
#include <thread>
#include <Core/ColumnWithTypeAndName.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
Expand Down Expand Up @@ -507,7 +508,7 @@ SinkToStoragePtr StorageObjectStorage::import(
destination_file_path,
object_storage,
configuration,
format_settings,
std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context
std::make_shared<const Block>(getInMemoryMetadataPtr()->getSampleBlock()),
local_context);
}
Expand Down
Loading