Skip to content

Commit 60c312c

Browse files
authored
Merge pull request #1106 from Altinity/parquet_parallel_formatting
Preserve parquet specific parallel formatting for export part
2 parents 6c60493 + 8238426 commit 60c312c

File tree

5 files changed

+19
-5
lines changed

5 files changed

+19
-5
lines changed

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ namespace Setting
213213
extern const SettingsUInt64 min_bytes_to_use_direct_io;
214214
extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists;
215215
extern const SettingsBool output_format_parallel_formatting;
216+
extern const SettingsBool output_format_parquet_parallel_encoding;
216217
}
217218

218219
namespace MergeTreeSetting
@@ -6244,7 +6245,9 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
62446245
dest_storage->getStorageID(),
62456246
part,
62466247
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
6247-
query_context->getSettingsRef()[Setting::output_format_parallel_formatting]);
6248+
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
6249+
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
6250+
query_context->getSettingsRef()[Setting::max_threads]);
62486251

62496252
std::lock_guard lock(export_manifests_mutex);
62506253

@@ -6292,6 +6295,8 @@ void MergeTreeData::exportPartToTableImpl(
62926295
{
62936296
auto context_copy = Context::createCopy(local_context);
62946297
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
6298+
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
6299+
context_copy->setSetting("max_threads", manifest.max_threads);
62956300

62966301
sink = destination_storage->import(
62976302
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@ struct MergeTreeExportManifest
1313
const StorageID & destination_storage_id_,
1414
const DataPartPtr & data_part_,
1515
bool overwrite_file_if_exists_,
16-
bool parallel_formatting_)
16+
bool parallel_formatting_,
17+
bool parallel_formatting_parquet_,
18+
std::size_t max_threads_)
1719
: destination_storage_id(destination_storage_id_),
1820
data_part(data_part_),
1921
overwrite_file_if_exists(overwrite_file_if_exists_),
2022
parallel_formatting(parallel_formatting_),
23+
parquet_parallel_encoding(parallel_formatting_parquet_),
24+
max_threads(max_threads_),
2125
create_time(time(nullptr)) {}
2226

2327
StorageID destination_storage_id;
2428
DataPartPtr data_part;
2529
bool overwrite_file_if_exists;
2630
bool parallel_formatting;
31+
/// Parquet has its own setting for parallel formatting (output_format_parquet_parallel_encoding), so it is stored separately from parallel_formatting.
32+
bool parquet_parallel_encoding;
33+
std::size_t max_threads;
2734

2835
time_t create_time;
2936
mutable bool in_progress = false;

src/Storages/MergeTree/MergeTreeSequentialSource.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
169169
addThrottler(read_settings.local_throttler, context->getMergesThrottler());
170170
break;
171171
case Export:
172-
read_settings.local_throttler = context->getExportsThrottler();
172+
addThrottler(read_settings.local_throttler, context->getExportsThrottler());
173+
addThrottler(read_settings.remote_throttler, context->getExportsThrottler());
173174
break;
174175
}
175176

src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ namespace DB
5555

5656
result += raw_path;
5757

58-
if (raw_path.back() != '/')
58+
if (!raw_path.empty() && raw_path.back() != '/')
5959
{
6060
result += "/";
6161
}

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <optional>
12
#include <thread>
23
#include <Core/ColumnWithTypeAndName.h>
34
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@@ -507,7 +508,7 @@ SinkToStoragePtr StorageObjectStorage::import(
507508
destination_file_path,
508509
object_storage,
509510
configuration,
510-
format_settings,
511+
std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context
511512
std::make_shared<const Block>(getInMemoryMetadataPtr()->getSampleBlock()),
512513
local_context);
513514
}

0 commit comments

Comments
 (0)