diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
index 049a1fa8d60a..b0c049ea9b7f 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -60,6 +60,9 @@ class IDataLakeMetadata : boost::noncopyable
     virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
     virtual std::optional<size_t> totalBytes(ContextPtr) const { return {}; }
 
+    virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
+    virtual std::optional<String> sortingKey(ContextPtr) const { return {}; }
+
 protected:
     ObjectIterator createKeysIterator(
         Strings && data_files_,
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index b8b26e9765d2..de02ced00c4d 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -529,6 +529,197 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
     return previous_snapshot_schema_id != relevant_snapshot_schema_id;
 }
 
+namespace
+{
+
+using IdToName = std::unordered_map<Int32, String>;
+
+/// Maps field ids of the current schema to column names.
+IdToName buildIdToNameMap(const Poco::JSON::Object::Ptr & metadata_obj)
+{
+    IdToName map;
+    if (!metadata_obj || !metadata_obj->has("current-schema-id") || !metadata_obj->has("schemas"))
+        return map;
+
+    const auto current_schema_id = metadata_obj->getValue<Int32>("current-schema-id");
+    auto schemas = metadata_obj->getArray("schemas");
+    if (!schemas)
+        return map;
+
+    for (size_t i = 0; i < schemas->size(); ++i)
+    {
+        auto schema = schemas->getObject(i);
+
+        if (!schema || !schema->has("schema-id") || (schema->getValue<Int32>("schema-id") != current_schema_id))
+            continue;
+
+        if (auto fields = schema->getArray("fields"))
+        {
+            for (size_t j = 0; j < fields->size(); ++j)
+            {
+                auto f = fields->getObject(j);
+                if (!f || !f->has("id") || !f->has("name"))
+                    continue;
+                map.emplace(f->getValue<Int32>("id"), f->getValue<String>("name"));
+            }
+        }
+        break;
+    }
+    return map;
+}
+
+/// Renders an Iceberg transform (e.g. "identity", "bucket[16]", "day") applied to its source column as an expression string.
+String formatTransform(
+    const String & transform,
+    const Poco::JSON::Object::Ptr & field_obj,
+    const IdToName & id_to_name)
+{
+    Int32 source_id = (field_obj && field_obj->has("source-id"))
+        ? field_obj->getValue<Int32>("source-id")
+        : -1;
+
+    const auto it = id_to_name.find(source_id);
+    const String col = (it != id_to_name.end()) ? it->second : ("col_" + toString(source_id));
+
+    String base = transform;
+    String param;
+    if (const auto lpos = transform.find('['); lpos != String::npos && transform.back() == ']')
+    {
+        base = transform.substr(0, lpos);
+        param = transform.substr(lpos + 1, transform.size() - lpos - 2); /// strip '[' and ']'
+    }
+
+    String result;
+    if (base == "identity")
+        result = col;
+    else if (base == "year" || base == "month" || base == "day" || base == "hour")
+        result = base + "(" + col + ")";
+    else if (base != "void")
+    {
+        if (!param.empty())
+            result = base + "(" + param + ", " + col + ")";
+        else
+            result = base + "(" + col + ")";
+    }
+    return result;
+}
+
+Poco::JSON::Array::Ptr findActivePartitionFields(const Poco::JSON::Object::Ptr & metadata_obj)
+{
+    if (!metadata_obj)
+        return nullptr;
+
+    if (metadata_obj->has("partition-spec"))
+        return metadata_obj->getArray("partition-spec");
+
+    /// If the flattened "partition-spec" is absent, fall back to "partition-specs" + "default-spec-id".
+    if (metadata_obj->has("partition-specs") && metadata_obj->has("default-spec-id"))
+    {
+        const auto default_spec_id = metadata_obj->getValue<Int32>("default-spec-id");
+        if (auto specs = metadata_obj->getArray("partition-specs"))
+        {
+            for (size_t i = 0; i < specs->size(); ++i)
+            {
+                auto spec = specs->getObject(i);
+                if (!spec || !spec->has("spec-id"))
+                    continue;
+                if (spec->getValue<Int32>("spec-id") == default_spec_id)
+                    return spec->has("fields") ? spec->getArray("fields") : nullptr;
+            }
+        }
+    }
+
+    return nullptr;
+}
+
+Poco::JSON::Array::Ptr findActiveSortFields(const Poco::JSON::Object::Ptr & metadata_obj)
+{
+    if (!metadata_obj || !metadata_obj->has("default-sort-order-id") || !metadata_obj->has("sort-orders"))
+        return nullptr;
+
+    const auto default_sort_order_id = metadata_obj->getValue<Int32>("default-sort-order-id");
+    auto orders = metadata_obj->getArray("sort-orders");
+    if (!orders)
+        return nullptr;
+
+    for (size_t i = 0; i < orders->size(); ++i)
+    {
+        auto order = orders->getObject(i);
+        if (!order || !order->has("order-id"))
+            continue;
+        if (order->getValue<Int32>("order-id") == default_sort_order_id)
+            return order->has("fields") ? order->getArray("fields") : nullptr;
+    }
+    return nullptr;
+}
+
+/// Joins partition/sort fields into a comma-separated expression list; optionally appends sort direction and null order.
+String composeList(
+    const Poco::JSON::Array::Ptr & fields,
+    const IdToName & id_to_name,
+    bool lookup_sort_modifiers)
+{
+    if (!fields || fields->size() == 0)
+        return {};
+
+    Strings parts;
+    parts.reserve(fields->size());
+
+    for (size_t i = 0; i < fields->size(); ++i)
+    {
+        auto field = fields->getObject(i);
+        if (!field)
+            continue;
+
+        const String transform = field->has("transform") ? field->getValue<String>("transform") : "identity";
+        String expr = formatTransform(transform, field, id_to_name);
+        if (expr.empty())
+            continue;
+
+        if (lookup_sort_modifiers)
+        {
+            if (field->has("direction"))
+            {
+                auto d = field->getValue<String>("direction");
+                expr += (Poco::icompare(d, "desc") == 0) ? " DESC" : " ASC";
+            }
+            if (field->has("null-order"))
+            {
+                auto n = field->getValue<String>("null-order");
+                expr += (Poco::icompare(n, "nulls-last") == 0) ? " NULLS LAST" : " NULLS FIRST";
"NULLS LAST" : "NULLS FIRST"; + } + } + + parts.push_back(std::move(expr)); + } + + if (parts.empty()) + return {}; + + String res; + for (size_t i = 0; i < parts.size(); ++i) + { + if (i) res += ", "; + res += parts[i]; + } + return res; +} + +std::pair, std::optional> extractIcebergKeys(const Poco::JSON::Object::Ptr & metadata_obj) +{ + std::optional partition_key; + std::optional sort_key; + + if (metadata_obj) + { + auto id_to_name = buildIdToNameMap(metadata_obj); + + partition_key = composeList(findActivePartitionFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ false); + sort_key = composeList(findActiveSortFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ true); + } + + return {partition_key, sort_key}; +} + +} + void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) { auto configuration_ptr = configuration.lock(); @@ -563,10 +754,11 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec total_bytes = summary_object->getValue(f_total_files_size); } + auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object); relevant_snapshot = IcebergSnapshot{ getManifestList(local_context, getProperFilePathFromMetadataInfo( snapshot->getValue(f_manifest_list), configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace())), - relevant_snapshot_id, total_rows, total_bytes}; + relevant_snapshot_id, total_rows, total_bytes, partition_key, sorting_key}; if (!snapshot->has(f_schema_id)) throw Exception( @@ -1011,6 +1203,19 @@ std::optional IcebergMetadata::totalBytes(ContextPtr local_context) cons return result; } +std::optional IcebergMetadata::partitionKey(ContextPtr) const +{ + SharedLockGuard lock(mutex); + return relevant_snapshot->partition_key; +} + +std::optional IcebergMetadata::sortingKey(ContextPtr) const +{ + SharedLockGuard lock(mutex); + return relevant_snapshot->sorting_key; +} + + ObjectIterator IcebergMetadata::iterate( const ActionsDAG * filter_dag, FileProgressCallback callback, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 0a6d16112625..c3ada72354e7 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -80,6 +80,9 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext std::optional totalRows(ContextPtr Local_context) const override; std::optional totalBytes(ContextPtr Local_context) const override; + std::optional partitionKey(ContextPtr) const override; + std::optional sortingKey(ContextPtr) const override; + protected: ObjectIterator iterate( const ActionsDAG * filter_dag, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h index 6a0abf753c7e..59a48460bab1 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h @@ -16,6 +16,8 @@ struct IcebergSnapshot Int64 snapshot_id; std::optional total_rows; std::optional total_bytes; + std::optional partition_key; + std::optional sorting_key; }; struct IcebergHistoryRecord diff --git a/src/Storages/System/StorageSystemTables.cpp b/src/Storages/System/StorageSystemTables.cpp index 1b1c1f9faf5c..ef23f520f74f 100644 --- a/src/Storages/System/StorageSystemTables.cpp +++ b/src/Storages/System/StorageSystemTables.cpp @@ -22,6 +22,8 @@ #include 
 #include 
 #include 
+#include <Storages/ObjectStorage/StorageObjectStorage.h>
+#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
 #include 
 #include 
 #include 
@@ -595,18 +597,54 @@ class TablesBlockSource : public ISource
                     ASTPtr expression_ptr;
                     if (columns_mask[src_index++])
                     {
-                        if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
-                            res_columns[res_index++]->insert(format({context, *expression_ptr}));
-                        else
-                            res_columns[res_index++]->insertDefault();
+                        bool inserted = false;
+                        /// For data lake tables, take the partition key from the external (e.g. Iceberg) metadata if available.
+                        if (auto * obj = dynamic_cast<StorageObjectStorage *>(table.get()))
+                        {
+                            if (auto * dl_meta = obj->getExternalMetadata(context))
+                            {
+                                if (auto p = dl_meta->partitionKey(context); p.has_value())
+                                {
+                                    res_columns[res_index++]->insert(*p);
+                                    inserted = true;
+                                }
+                            }
+                        }
+
+                        if (!inserted)
+                        {
+                            if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
+                                res_columns[res_index++]->insert(format({context, *expression_ptr}));
+                            else
+                                res_columns[res_index++]->insertDefault();
+                        }
                     }
 
                     if (columns_mask[src_index++])
                     {
-                        if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
-                            res_columns[res_index++]->insert(format({context, *expression_ptr}));
-                        else
-                            res_columns[res_index++]->insertDefault();
+                        bool inserted = false;
+                        /// For data lake tables, take the sorting key from the external (e.g. Iceberg) metadata if available.
+                        if (auto * obj = dynamic_cast<StorageObjectStorage *>(table.get()))
+                        {
+                            if (auto * dl_meta = obj->getExternalMetadata(context))
+                            {
+                                if (auto p = dl_meta->sortingKey(context); p.has_value())
+                                {
+                                    res_columns[res_index++]->insert(*p);
+                                    inserted = true;
+                                }
+                            }
+                        }
+
+                        if (!inserted)
+                        {
+                            if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
+                                res_columns[res_index++]->insert(format({context, *expression_ptr}));
+                            else
+                                res_columns[res_index++]->insertDefault();
+                        }
                     }
 
                     if (columns_mask[src_index++])
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index a22a61383efb..d08ab8d6605e 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -3410,6 +3410,50 @@ def execute_spark_query(query: str):
     instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL")
 
 
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_system_tables_partition_sorting_keys(started_cluster, storage_type):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+
+    table_name = f"test_sys_tables_keys_{storage_type}_{uuid.uuid4().hex[:8]}"
+    fq_table = f"spark_catalog.default.{table_name}"
+
+    spark.sql(f"DROP TABLE IF EXISTS {fq_table}")
+    spark.sql(f"""
+        CREATE TABLE {fq_table} (
+            id INT,
+            ts TIMESTAMP,
+            payload STRING
+        )
+        USING iceberg
+        PARTITIONED BY (bucket(16, id), day(ts))
+        TBLPROPERTIES ('format-version' = '2')
+    """)
+    spark.sql(f"ALTER TABLE {fq_table} WRITE ORDERED BY (id DESC NULLS LAST, hour(ts))")
+    spark.sql(f"""
+        INSERT INTO {fq_table} VALUES
+            (1, timestamp'2024-01-01 10:00:00', 'a'),
+            (2, timestamp'2024-01-02 11:00:00', 'b'),
+            (NULL, timestamp'2024-01-03 12:00:00', 'c')
+    """)
+
+    time.sleep(2)
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{table_name}/",
+        f"/iceberg_data/default/{table_name}/",
+    )
+
+    create_iceberg_table(storage_type, instance, table_name, started_cluster)
+
+    res = instance.query(f"""
+        SELECT partition_key, sorting_key
+        FROM system.tables
+        WHERE name = '{table_name}'
+        FORMAT CSV
+    """).strip().lower()
+
+    assert res == '"bucket(16, id), day(ts)","id desc nulls last, hour(ts) asc nulls first"'
+
+
 @pytest.mark.parametrize("storage_type", ["local", "s3"])
 def test_compressed_metadata(started_cluster, storage_type):
@@ -3447,4 +3491,4 @@ def test_compressed_metadata(started_cluster, storage_type):
 
     create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")
 
-    assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"
\ No newline at end of file
+    assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"