Skip to content

Commit 1abef61

Browse files
authored
Merge pull request #883 from Altinity/backports/25.3/81451_iceberg_support_compressed_metadata
2 parents eb56e01 + 99abc24 commit 1abef61

File tree

2 files changed

+95
-14
lines changed

2 files changed

+95
-14
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
1414
#include <Storages/ObjectStorage/StorageObjectStorageSettings.h>
1515
#include <Interpreters/ExpressionActions.h>
16+
#include <IO/CompressedReadBufferWrapper.h>
1617

1718
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
1819
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
@@ -106,10 +107,16 @@ std::string normalizeUuid(const std::string & uuid)
106107
}
107108

108109
Poco::JSON::Object::Ptr
109-
readJSON(const String & metadata_file_path, ObjectStoragePtr object_storage, const ContextPtr & local_context, LoggerPtr log)
110+
readJSON(const String & metadata_file_path, ObjectStoragePtr object_storage, const ContextPtr & local_context, LoggerPtr log, CompressionMethod compression_method)
110111
{
111112
ObjectInfo object_info(metadata_file_path);
112-
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
113+
auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
114+
115+
std::unique_ptr<ReadBuffer> buf;
116+
if (compression_method != CompressionMethod::None)
117+
buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method);
118+
else
119+
buf = std::move(source_buf);
113120

114121
String json_str;
115122
readJSONObjectPossiblyInvalid(json_str, *buf);
@@ -263,7 +270,30 @@ Int32 IcebergMetadata::parseTableSchema(
263270
}
264271
}
265272

266-
static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & path)
273+
struct MetadataFileWithInfo
274+
{
275+
Int32 version;
276+
String path;
277+
CompressionMethod compression_method;
278+
};
279+
280+
static CompressionMethod getCompressionMethodFromMetadataFile(const String & path)
281+
{
282+
constexpr std::string_view metadata_suffix = ".metadata.json";
283+
284+
auto compression_method = chooseCompressionMethod(path, "auto");
285+
286+
/// NOTE you will be surprised, but some metadata files store compression not in the end of the file name,
287+
/// but somewhere in the middle of the file name, before metadata.json suffix.
288+
/// Maybe history of Iceberg metadata files is not so long, but it is already full of surprises.
289+
/// Example of weird engineering decisions: 00000-85befd5a-69c7-46d4-bca6-cfbd67f0f7e6.gz.metadata.json
290+
if (compression_method == CompressionMethod::None && path.ends_with(metadata_suffix))
291+
compression_method = chooseCompressionMethod(path.substr(0, path.size() - metadata_suffix.size()), "auto");
292+
293+
return compression_method;
294+
}
295+
296+
static MetadataFileWithInfo getMetadataFileAndVersion(const std::string & path)
267297
{
268298
String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
269299
String version_str;
@@ -278,7 +308,10 @@ static std::pair<Int32, String> getMetadataFileAndVersion(const std::string & pa
278308
throw Exception(
279309
ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
280310

281-
return std::make_pair(std::stoi(version_str), path);
311+
return MetadataFileWithInfo{
312+
.version = std::stoi(version_str),
313+
.path = path,
314+
.compression_method = getCompressionMethodFromMetadataFile(path)};
282315
}
283316

284317
enum class MostRecentMetadataFileSelectionWay
@@ -289,7 +322,7 @@ enum class MostRecentMetadataFileSelectionWay
289322

290323
struct ShortMetadataFileInfo
291324
{
292-
UInt32 version;
325+
Int32 version;
293326
UInt64 last_updated_ms;
294327
String path;
295328
};
@@ -301,7 +334,7 @@ struct ShortMetadataFileInfo
301334
* 1) v<V>.metadata.json, where V - metadata version.
302335
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
303336
*/
304-
static std::pair<Int32, String> getLatestMetadataFileAndVersion(
337+
static MetadataFileWithInfo getLatestMetadataFileAndVersion(
305338
const ObjectStoragePtr & object_storage,
306339
const StorageObjectStorage::Configuration & configuration,
307340
const ContextPtr & local_context,
@@ -324,10 +357,10 @@ static std::pair<Int32, String> getLatestMetadataFileAndVersion(
324357
metadata_files_with_versions.reserve(metadata_files.size());
325358
for (const auto & path : metadata_files)
326359
{
327-
auto [version, metadata_file_path] = getMetadataFileAndVersion(path);
360+
auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path);
328361
if (need_all_metadata_files_parsing)
329362
{
330-
auto metadata_file_object = readJSON(metadata_file_path, object_storage, local_context, log);
363+
auto metadata_file_object = readJSON(metadata_file_path, object_storage, local_context, log, compression_method);
331364
if (table_uuid.has_value())
332365
{
333366
if (metadata_file_object->has("table-uuid"))
@@ -377,10 +410,11 @@ static std::pair<Int32, String> getLatestMetadataFileAndVersion(
377410
[](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; });
378411
}
379412
}();
380-
return {latest_metadata_file_info.version, latest_metadata_file_info.path};
413+
414+
return {latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)};
381415
}
382416

383-
static std::pair<Int32, String> getLatestOrExplicitMetadataFileAndVersion(
417+
static MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion(
384418
const ObjectStoragePtr & object_storage,
385419
const StorageObjectStorage::Configuration & configuration,
386420
const ContextPtr & local_context,
@@ -425,14 +459,14 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
425459
{
426460
auto configuration_ptr = configuration.lock();
427461

428-
const auto [metadata_version, metadata_file_path]
462+
const auto [metadata_version, metadata_file_path, compression_method]
429463
= getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
430464

431465
bool metadata_file_changed = false;
432466
if (last_metadata_version != metadata_version)
433467
{
434468
last_metadata_version = metadata_version;
435-
last_metadata_object = ::DB::readJSON(metadata_file_path, object_storage, local_context, log);
469+
last_metadata_object = ::DB::readJSON(metadata_file_path, object_storage, local_context, log, compression_method);
436470
metadata_file_changed = true;
437471
}
438472

@@ -594,12 +628,18 @@ DataLakeMetadataPtr IcebergMetadata::create(
594628
else
595629
LOG_TRACE(log, "Not using in-memory cache for iceberg metadata files, because the setting use_iceberg_metadata_files_cache is false.");
596630

597-
const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
631+
const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, *configuration_ptr, local_context, log.get());
598632

599633
auto create_fn = [&]()
600634
{
601635
ObjectInfo object_info(metadata_file_path); // NOLINT
602-
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
636+
auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
637+
638+
std::unique_ptr<ReadBuffer> buf;
639+
if (compression_method != CompressionMethod::None)
640+
buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method);
641+
else
642+
buf = std::move(source_buf);
603643

604644
String json_str;
605645
readJSONObjectPossiblyInvalid(json_str, *buf);

tests/integration/test_storage_iceberg/test.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import os
3+
import subprocess
34
import uuid
45
import time
56
from datetime import datetime, timezone
@@ -3267,3 +3268,43 @@ def check_validity_and_get_prunned_files(select_expression):
32673268

32683269
for query in queries:
32693270
assert check_validity_and_get_prunned_files(query) > 0
3271+
3272+
3273+
@pytest.mark.parametrize("storage_type", ["local", "s3"])
3274+
def test_compressed_metadata(started_cluster, storage_type):
3275+
instance = started_cluster.instances["node1"]
3276+
spark = started_cluster.spark_session
3277+
TABLE_NAME = "test_compressed_metadata_" + storage_type + "_" + get_uuid_str()
3278+
3279+
table_properties = {
3280+
"write.metadata.compression": "gzip"
3281+
}
3282+
3283+
df = spark.createDataFrame([
3284+
(1, "Alice"),
3285+
(2, "Bob")
3286+
], ["id", "name"])
3287+
3288+
# for some reason write.metadata.compression is not working :(
3289+
df.writeTo(TABLE_NAME) \
3290+
.tableProperty("write.metadata.compression", "gzip") \
3291+
.using("iceberg") \
3292+
.create()
3293+
3294+
# manual compression of metadata file before upload, still test some scenarios
3295+
subprocess.check_output(f"gzip /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json", shell=True)
3296+
3297+
# Weird but compression extension is really in the middle of the file name, not in the end...
3298+
subprocess.check_output(f"mv /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json.gz /iceberg_data/default/{TABLE_NAME}/metadata/v1.gz.metadata.json", shell=True)
3299+
3300+
default_upload_directory(
3301+
started_cluster,
3302+
storage_type,
3303+
f"/iceberg_data/default/{TABLE_NAME}/",
3304+
f"/iceberg_data/default/{TABLE_NAME}/",
3305+
)
3306+
3307+
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")
3308+
3309+
assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"
3310+

0 commit comments

Comments
 (0)