diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 059d8702c3ce..4f8c2e9905eb 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6873,6 +6873,16 @@ Use roaring bitmap for iceberg positional deletes. )", 0) \ DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"( Overwrite file if it already exists when exporting a merge tree part +)", 0) \ + DECLARE(Timezone, iceberg_timezone_for_timestamptz, "UTC", R"( +Timezone for the Iceberg `timestamptz` field. + +Possible values: + +- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`. +- `` (empty value) - use the session timezone. + +Default value: `UTC`. )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 9edb667d20b5..5f0a92e449b5 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -47,6 +47,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_retries_in_cluster_requests", false, false, "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."}, + {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."} }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Databases/DataLake/Common.cpp b/src/Databases/DataLake/Common.cpp index 681dd957b43f..8946d3412d70 100644 --- a/src/Databases/DataLake/Common.cpp +++ b/src/Databases/DataLake/Common.cpp @@ -61,14 +61,14 @@ std::vector<String> splitTypeArguments(const String & type_str) return args; } -DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix) +DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix) { String name = trim(type_name); if (name.starts_with("array<") && name.ends_with(">")) { String inner = name.substr(6, name.size() - 7); - return std::make_shared<DB::DataTypeArray>(getType(inner, nullable)); + return std::make_shared<DB::DataTypeArray>(getType(inner, nullable, context)); } if (name.starts_with("map<") && name.ends_with(">")) @@ -79,7 +79,7 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String & if (args.size() != 2) throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Invalid data type {}", type_name); - return std::make_shared<DB::DataTypeMap>(getType(args[0], false), getType(args[1], nullable)); + return std::make_shared<DB::DataTypeMap>(getType(args[0], false, context), getType(args[1], nullable, context)); } if (name.starts_with("struct<") && name.ends_with(">")) @@ -101,13 +101,13 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String & String full_field_name = prefix.empty() ? field_name : prefix + "." + field_name; field_names.push_back(full_field_name); - field_types.push_back(getType(field_type, nullable, full_field_name)); + field_types.push_back(getType(field_type, nullable, context, full_field_name)); } return std::make_shared<DB::DataTypeTuple>(field_types, field_names); } - return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name)) - : DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name); + return nullable ? 
DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context)) + : DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context); } std::pair<String, String> parseTableName(const std::string & name) diff --git a/src/Databases/DataLake/Common.h b/src/Databases/DataLake/Common.h index cd4b6214e343..9b0dd7c626a6 100644 --- a/src/Databases/DataLake/Common.h +++ b/src/Databases/DataLake/Common.h @@ -2,6 +2,7 @@ #include #include +#include namespace DataLake { @@ -10,7 +11,7 @@ String trim(const String & str); std::vector<String> splitTypeArguments(const String & type_str); -DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix = ""); +DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix = ""); /// Parse a string, containing at least one dot, into two substrings: /// A.B.C.D.E -> A.B.C.D and E, where diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index aeac09799035..93e141c8c883 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -326,7 +326,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con auto [namespace_name, table_name] = DataLake::parseTableName(name); - if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata)) + if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata)) return nullptr; if (ignore_if_not_iceberg && !table_metadata.isDefaultReadableTable()) @@ -632,7 +632,7 @@ ASTPtr DatabaseDataLake::getCreateDatabaseQuery() const ASTPtr DatabaseDataLake::getCreateTableQueryImpl( const String & name, - ContextPtr /* context_ */, + ContextPtr context_, bool /* throw_on_error */) const { auto catalog = getCatalog(); @@ -640,7 +640,7 @@ ASTPtr DatabaseDataLake::getCreateTableQueryImpl( const auto [namespace_name, table_name] = DataLake::parseTableName(name); - if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata)) + if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata)) { throw Exception( ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY, "Table `{}` doesn't exist", name); diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp index e76e81c6c3c3..acd597ba531c 100644 --- a/src/Databases/DataLake/GlueCatalog.cpp +++ b/src/Databases/DataLake/GlueCatalog.cpp @@ -276,6 +276,7 @@ bool GlueCatalog::existsTable(const std::string & database_name, const std::stri bool GlueCatalog::tryGetTableMetadata( const std::string & database_name, const std::string & table_name, + DB::ContextPtr /* context_ */, TableMetadata & result) const { Aws::Glue::Model::GetTableRequest request; @@ -372,7 +373,7 @@ bool GlueCatalog::tryGetTableMetadata( column_type = "timestamptz"; } - schema.push_back({column.GetName(), getType(column_type, can_be_nullable)}); + schema.push_back({column.GetName(), getType(column_type, can_be_nullable, getContext())}); } result.setSchema(schema); } @@ -394,9 +395,10 @@ bool GlueCatalog::tryGetTableMetadata( void GlueCatalog::getTableMetadata( const std::string & database_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const { - if (!tryGetTableMetadata(database_name, table_name, result)) + if (!tryGetTableMetadata(database_name, table_name, context_, result)) { throw DB::Exception( DB::ErrorCodes::DATALAKE_DATABASE_ERROR, diff --git a/src/Databases/DataLake/GlueCatalog.h 
b/src/Databases/DataLake/GlueCatalog.h index 3bf1769ac83e..bcecfd2368ca 100644 --- a/src/Databases/DataLake/GlueCatalog.h +++ b/src/Databases/DataLake/GlueCatalog.h @@ -40,11 +40,13 @@ class GlueCatalog final : public ICatalog, private DB::WithContext void getTableMetadata( const std::string & database_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const override; bool tryGetTableMetadata( const std::string & database_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const override; std::optional<StorageType> getStorageType() const override diff --git a/src/Databases/DataLake/HiveCatalog.cpp b/src/Databases/DataLake/HiveCatalog.cpp index d174034349b0..7a80571fce56 100644 --- a/src/Databases/DataLake/HiveCatalog.cpp +++ b/src/Databases/DataLake/HiveCatalog.cpp @@ -96,13 +96,21 @@ bool HiveCatalog::existsTable(const std::string & namespace_name, const std::str return true; } -void HiveCatalog::getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const +void HiveCatalog::getTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const { - if (!tryGetTableMetadata(namespace_name, table_name, result)) + if (!tryGetTableMetadata(namespace_name, table_name, context_, result)) throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog"); } -bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const +bool HiveCatalog::tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const { Apache::Hadoop::Hive::Table table; @@ -130,7 +138,7 @@ bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const auto columns = table.sd.cols; for (const auto & column : columns) { - schema.push_back({column.name, getType(column.type, true)}); + schema.push_back({column.name, getType(column.type, true, context_)}); } result.setSchema(schema); } diff --git a/src/Databases/DataLake/HiveCatalog.h b/src/Databases/DataLake/HiveCatalog.h index 29b4e6ce6c63..0fba0e132486 100644 --- a/src/Databases/DataLake/HiveCatalog.h +++ b/src/Databases/DataLake/HiveCatalog.h @@ -38,9 +38,17 @@ class HiveCatalog final : public ICatalog, private DB::WithContext bool existsTable(const std::string & namespace_name, const std::string & table_name) const override; - void getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override; - - bool tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override; + void getTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const override; + + bool tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_, + TableMetadata & result) const override; std::optional<StorageType> getStorageType() const override; diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h index ce3970d6d046..26964aa36433 100644 --- a/src/Databases/DataLake/ICatalog.h +++ b/src/Databases/DataLake/ICatalog.h @@ -8,6 +8,14 @@ #include #include +namespace DB +{ + +class Context; +using ContextPtr = std::shared_ptr<const Context>; + +} + 
namespace DataLake { @@ -150,6 +158,7 @@ class ICatalog virtual void getTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context, TableMetadata & result) const = 0; /// Get table metadata in the given namespace. @@ -157,6 +166,7 @@ class ICatalog virtual bool tryGetTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context, TableMetadata & result) const = 0; /// Get storage type, where Iceberg tables' data is stored. diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index 7e385f034c98..39297f8b33a3 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -532,17 +532,18 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const { TableMetadata table_metadata; - return tryGetTableMetadata(namespace_name, table_name, table_metadata); + return tryGetTableMetadata(namespace_name, table_name, getContext(), table_metadata); } bool RestCatalog::tryGetTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const { try { - return getTableMetadataImpl(namespace_name, table_name, result); + return getTableMetadataImpl(namespace_name, table_name, context_, result); } catch (...) { @@ -554,15 +555,17 @@ bool RestCatalog::tryGetTableMetadata( void RestCatalog::getTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const { - if (!getTableMetadataImpl(namespace_name, table_name, result)) + if (!getTableMetadataImpl(namespace_name, table_name, context_, result)) throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog"); } bool RestCatalog::getTableMetadataImpl( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const { LOG_TEST(log, "Checking table {} in namespace {}", table_name, namespace_name); @@ -623,8 +626,8 @@ bool RestCatalog::getTableMetadataImpl( if (result.requiresSchema()) { // int format_version = metadata_object->getValue("format-version"); - auto schema_processor = DB::Iceberg::IcebergSchemaProcessor(); - auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log); + auto schema_processor = DB::Iceberg::IcebergSchemaProcessor(context_); + auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, context_, log); auto schema = schema_processor.getClickhouseTableSchemaById(id); result.setSchema(*schema); } diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 182e3de0ae55..a98e719ff09d 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -42,11 +42,13 @@ class RestCatalog final : public ICatalog, private DB::WithContext void getTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const override; bool tryGetTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const override; std::optional getStorageType() const override; @@ -126,6 +128,7 @@ class RestCatalog final : public ICatalog, private DB::WithContext bool 
getTableMetadataImpl( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const; Config loadConfig(); diff --git a/src/Databases/DataLake/UnityCatalog.cpp b/src/Databases/DataLake/UnityCatalog.cpp index 18c20931e608..8054d971c9e0 100644 --- a/src/Databases/DataLake/UnityCatalog.cpp +++ b/src/Databases/DataLake/UnityCatalog.cpp @@ -91,9 +91,10 @@ DB::Names UnityCatalog::getTables() const void UnityCatalog::getTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const { - if (!tryGetTableMetadata(namespace_name, table_name, result)) + if (!tryGetTableMetadata(namespace_name, table_name, context_, result)) throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from unity catalog"); } @@ -136,6 +137,7 @@ void UnityCatalog::getCredentials(const std::string & table_id, TableMetadata & bool UnityCatalog::tryGetTableMetadata( const std::string & schema_name, const std::string & table_name, + DB::ContextPtr /* context_ */, TableMetadata & result) const { auto full_table_name = warehouse + "." + schema_name + "." + table_name; diff --git a/src/Databases/DataLake/UnityCatalog.h b/src/Databases/DataLake/UnityCatalog.h index 2e6262d6e5d7..9d4dc0a74877 100644 --- a/src/Databases/DataLake/UnityCatalog.h +++ b/src/Databases/DataLake/UnityCatalog.h @@ -34,11 +34,13 @@ class UnityCatalog final : public ICatalog, private DB::WithContext void getTableMetadata( const std::string & namespace_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const override; bool tryGetTableMetadata( const std::string & schema_name, const std::string & table_name, + DB::ContextPtr context_, TableMetadata & result) const override; std::optional getStorageType() const override { return std::nullopt; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 12fcb29b24ac..e6b2b449e899 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -135,7 +135,7 @@ IcebergMetadata::IcebergMetadata( : object_storage(std::move(object_storage_)) , configuration(std::move(configuration_)) , persistent_components(PersistentTableComponents{ - .schema_processor = std::make_shared(), + .schema_processor = std::make_shared(context_), .metadata_cache = cache_ptr, .format_version = format_version_, .table_location = metadata_object_->getValue(f_location) @@ -148,7 +148,7 @@ IcebergMetadata::IcebergMetadata( updateState(context_, metadata_object_); } -void IcebergMetadata::addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object) const +void IcebergMetadata::addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object, ContextPtr context_) const { if (persistent_components.schema_processor->hasClickhouseTableSchemaById(schema_id)) return; @@ -163,7 +163,7 @@ void IcebergMetadata::addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Pt auto current_schema = schemas->getObject(i); if (current_schema->has(f_schema_id) && current_schema->getValue(f_schema_id) == schema_id) { - persistent_components.schema_processor->addIcebergTableSchema(current_schema); + persistent_components.schema_processor->addIcebergTableSchema(current_schema, context_); return; } } @@ -174,13 +174,16 @@ void IcebergMetadata::addTableSchemaById(Int32 
schema_id, Poco::JSON::Object::Pt } Int32 IcebergMetadata::parseTableSchema( - const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger) + const Poco::JSON::Object::Ptr & metadata_object, + IcebergSchemaProcessor & schema_processor, + ContextPtr context_, + LoggerPtr metadata_logger) { const auto format_version = metadata_object->getValue(f_format_version); if (format_version == 2) { auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object); - schema_processor.addIcebergTableSchema(schema); + schema_processor.addIcebergTableSchema(schema, context_); return current_schema_id; } else @@ -188,7 +191,7 @@ Int32 IcebergMetadata::parseTableSchema( try { auto [schema, current_schema_id] = parseTableSchemaV1Method(metadata_object); - schema_processor.addIcebergTableSchema(schema); + schema_processor.addIcebergTableSchema(schema, context_); return current_schema_id; } catch (const Exception & first_error) @@ -198,7 +201,7 @@ Int32 IcebergMetadata::parseTableSchema( try { auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object); - schema_processor.addIcebergTableSchema(schema); + schema_processor.addIcebergTableSchema(schema, context_); LOG_WARNING( metadata_logger, "Iceberg table schema was parsed using v2 specification, but it was impossible to parse it using v1 " @@ -457,7 +460,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec for (UInt32 j = 0; j < schemas->size(); ++j) { auto schema = schemas->getObject(j); - persistent_components.schema_processor->addIcebergTableSchema(schema); + persistent_components.schema_processor->addIcebergTableSchema(schema, local_context); } auto snapshots = metadata_object->get(f_snapshots).extract(); bool successfully_found_snapshot = false; @@ -522,7 +525,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec relevant_snapshot_id, configuration_ptr->getPathForRead().path); relevant_snapshot_schema_id = snapshot->getValue(f_schema_id); - addTableSchemaById(relevant_snapshot_schema_id, metadata_object); + addTableSchemaById(relevant_snapshot_schema_id, metadata_object, local_context); } } if (!successfully_found_snapshot) @@ -609,7 +612,11 @@ void IcebergMetadata::updateState(const ContextPtr & local_context, Poco::JSON:: { updateSnapshot(local_context, metadata_object); } - relevant_snapshot_schema_id = parseTableSchema(metadata_object, *persistent_components.schema_processor, log); + relevant_snapshot_schema_id = parseTableSchema( + metadata_object, + *persistent_components.schema_processor, + local_context, + log); } } @@ -626,14 +633,17 @@ std::shared_ptr IcebergMetadata::getInitialSchemaByPath(Conte : nullptr; } -std::shared_ptr IcebergMetadata::getSchemaTransformer(ContextPtr, ObjectInfoPtr object_info) const +std::shared_ptr IcebergMetadata::getSchemaTransformer(ContextPtr context_, ObjectInfoPtr object_info) const { IcebergDataObjectInfo * iceberg_object_info = dynamic_cast(object_info.get()); SharedLockGuard lock(mutex); if (!iceberg_object_info) return nullptr; return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id) - ? persistent_components.schema_processor->getSchemaTransformationDagByIds(iceberg_object_info->underlying_format_read_schema_id, relevant_snapshot_schema_id) + ? 
persistent_components.schema_processor->getSchemaTransformationDagByIds( + context_, + iceberg_object_info->underlying_format_read_schema_id, + relevant_snapshot_schema_id) : nullptr; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 8af8e68dca13..113b5be49f3b 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -80,7 +80,10 @@ class IcebergMetadata : public IDataLakeMetadata bool supportsSchemaEvolution() const override { return true; } static Int32 parseTableSchema( - const Poco::JSON::Object::Ptr & metadata_object, Iceberg::IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger); + const Poco::JSON::Object::Ptr & metadata_object, + Iceberg::IcebergSchemaProcessor & schema_processor, + ContextPtr context_, + LoggerPtr metadata_logger); bool supportsUpdate() const override { return true; } bool supportsWrites() const override { return true; } @@ -150,7 +153,7 @@ class IcebergMetadata : public IDataLakeMetadata void updateState(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex); void updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex); - void addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object) const TSA_REQUIRES(mutex); + void addTableSchemaById(Int32 schema_id, Poco::JSON::Object::Ptr metadata_object, ContextPtr context_) const TSA_REQUIRES(mutex); std::optional getSchemaVersionByFileIfOutdated(String data_path) const TSA_REQUIRES_SHARED(mutex); void initializeSchemasFromManifestList(ContextPtr local_context, ManifestFileCacheKeys manifest_list_ptr) const TSA_REQUIRES(mutex); }; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index 1fdde4972ef0..9f4859e519d3 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -200,7 +200,7 @@ ManifestFileContent::ManifestFileContent( const Poco::JSON::Object::Ptr & schema_object = json.extract(); Int32 manifest_schema_id = schema_object->getValue(f_schema_id); - schema_processor.addIcebergTableSchema(schema_object); + schema_processor.addIcebergTableSchema(schema_object, context); for (size_t i = 0; i != partition_specification->size(); ++i) { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp index 6d74a15b8120..56a2cd976f6b 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -32,6 +33,8 @@ #include #include #include +#include +#include #include @@ -46,6 +49,10 @@ extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; } +namespace Setting +{ +extern const SettingsTimezone iceberg_timezone_for_timestamptz; +} namespace { @@ -144,7 +151,7 @@ namespace Iceberg std::string IcebergSchemaProcessor::default_link{}; -void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr) +void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr, ContextPtr context_) { std::lock_guard lock(mutex); @@ -167,7 +174,7 @@ void 
IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schem auto name = field->getValue<String>(f_name); bool required = field->getValue<bool>(f_required); current_full_name = name; - auto type = getFieldType(field, f_type, required, current_full_name, true); + auto type = getFieldType(field, f_type, context_, required, current_full_name, true); clickhouse_schema->push_back(NameAndTypePair{name, type}); clickhouse_types_by_source_ids[{schema_id, field->getValue<Int32>(f_id)}] = NameAndTypePair{current_full_name, type}; clickhouse_ids_by_source_names[{schema_id, current_full_name}] = field->getValue<Int32>(f_id); @@ -221,7 +228,7 @@ NamesAndTypesList IcebergSchemaProcessor::tryGetFieldsCharacteristics(Int32 sche return fields; } -DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name) +DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, ContextPtr context_) { if (type_name == f_boolean) return DataTypeFactory::instance().get("Bool"); @@ -240,7 +247,10 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name) if (type_name == f_timestamp) return std::make_shared<DataTypeDateTime64>(6); if (type_name == f_timestamptz) - return std::make_shared<DataTypeDateTime64>(6, "UTC"); + { + std::string timezone = context_->getSettingsRef()[Setting::iceberg_timezone_for_timestamptz]; + return std::make_shared<DataTypeDateTime64>(6, timezone); + } if (type_name == f_string || type_name == f_binary) return std::make_shared<DataTypeString>(); if (type_name == f_uuid) @@ -265,21 +275,25 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name) } DataTypePtr -IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type, String & current_full_name, bool is_subfield_of_root) +IcebergSchemaProcessor::getComplexTypeFromObject( + const Poco::JSON::Object::Ptr & type, + String & current_full_name, + ContextPtr context_, + bool is_subfield_of_root) { String type_name = type->getValue<String>(f_type); if (type_name == f_list) { bool element_required = type->getValue<bool>("element-required"); - auto element_type = getFieldType(type, f_element, element_required); + auto element_type = getFieldType(type, f_element, context_, element_required); return std::make_shared<DataTypeArray>(element_type); } if (type_name == f_map) { - auto key_type = getFieldType(type, f_key, true); + auto key_type = getFieldType(type, f_key, context_, true); auto value_required = type->getValue<bool>("value-required"); - auto value_type = getFieldType(type, f_value, value_required); + auto value_type = getFieldType(type, f_value, context_, value_required); return std::make_shared<DataTypeMap>(key_type, value_type); } @@ -303,7 +317,7 @@ IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr & (current_full_name += ".").append(element_names.back()); scope_guard guard([&] { current_full_name.resize(current_full_name.size() - element_names.back().size() - 1); }); - element_types.push_back(getFieldType(field, f_type, required, current_full_name, true)); + element_types.push_back(getFieldType(field, f_type, context_, required, current_full_name, true)); TSA_SUPPRESS_WARNING_FOR_WRITE(clickhouse_types_by_source_ids) [{schema_id, field->getValue<Int32>(f_id)}] = NameAndTypePair{current_full_name, element_types.back()}; @@ -312,7 +326,7 @@ IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr & } else { - element_types.push_back(getFieldType(field, f_type, required)); + element_types.push_back(getFieldType(field, f_type, context_, required)); } } @@ -323,16 +337,21 @@ IcebergSchemaProcessor::getComplexTypeFromObject(const 
Poco::JSON::Object::Ptr & } DataTypePtr IcebergSchemaProcessor::getFieldType( - const Poco::JSON::Object::Ptr & field, const String & type_key, bool required, String & current_full_name, bool is_subfield_of_root) + const Poco::JSON::Object::Ptr & field, + const String & type_key, + ContextPtr context_, + bool required, + String & current_full_name, + bool is_subfield_of_root) { if (field->isObject(type_key)) - return getComplexTypeFromObject(field->getObject(type_key), current_full_name, is_subfield_of_root); + return getComplexTypeFromObject(field->getObject(type_key), current_full_name, context_, is_subfield_of_root); auto type = field->get(type_key); if (type.isString()) { const String & type_name = type.extract(); - auto data_type = getSimpleType(type_name); + auto data_type = getSimpleType(type_name, context_); return required ? data_type : makeNullable(data_type); } @@ -362,7 +381,11 @@ bool IcebergSchemaProcessor::allowPrimitiveTypeConversion(const String & old_typ // Ids are passed only for error logging purposes std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDag( - const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id) + const Poco::JSON::Object::Ptr & old_schema, + const Poco::JSON::Object::Ptr & new_schema, + ContextPtr context_, + Int32 old_id, + Int32 new_id) { std::unordered_map> old_schema_entries; auto old_schema_fields = old_schema->get(f_fields).extract(); @@ -374,7 +397,7 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDag( size_t id = field->getValue(f_id); auto name = field->getValue(f_name); bool required = field->getValue(f_required); - old_schema_entries[id] = {field, &dag->addInput(name, getFieldType(field, f_type, required))}; + old_schema_entries[id] = {field, &dag->addInput(name, getFieldType(field, f_type, context_, required))}; } auto new_schema_fields = new_schema->get(f_fields).extract(); for (size_t i = 0; i != new_schema_fields->size(); ++i) @@ -383,7 +406,7 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDag( size_t id = field->getValue(f_id); auto name = field->getValue(f_name); bool required = field->getValue(f_required); - auto type = getFieldType(field, f_type, required); + auto type = getFieldType(field, f_type, context_, required); auto old_node_it = old_schema_entries.find(id); if (old_node_it != old_schema_entries.end()) { @@ -393,7 +416,7 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDag( || field->getObject(f_type)->getValue(f_type) == "list" || field->getObject(f_type)->getValue(f_type) == "map")) { - auto old_type = getFieldType(old_json, "type", required); + auto old_type = getFieldType(old_json, "type", context_, required); auto transform = std::make_shared(std::vector{type}, std::vector{old_type}, old_json, field); old_node = &dag->addFunction(transform, std::vector{old_node}, name); @@ -423,7 +446,7 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDag( } else if (allowPrimitiveTypeConversion(old_type, new_type)) { - node = &dag->addCast(*old_node, getFieldType(field, f_type, required), name); + node = &dag->addCast(*old_node, getFieldType(field, f_type, context_, required), name); } outputs.push_back(node); } @@ -449,7 +472,10 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDag( return dag; } -std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id) +std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDagByIds( + 
ContextPtr context_, + Int32 old_id, + Int32 new_id) { if (old_id == new_id) return nullptr; @@ -468,7 +494,7 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformatio throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id); return transform_dags_by_ids[{old_id, new_id}] - = getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, old_id, new_id); + = getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, context_, old_id, new_id); } Poco::JSON::Object::Ptr IcebergSchemaProcessor::getIcebergTableSchemaById(Int32 id) const diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h index 0fc42cccd266..b4a7a8d6fdeb 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h @@ -75,16 +75,18 @@ ColumnMapperPtr createColumnMapper(Poco::JSON::Object::Ptr schema_object); * } * } */ -class IcebergSchemaProcessor +class IcebergSchemaProcessor : private WithContext { static std::string default_link; using Node = ActionsDAG::Node; public: - void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr); + explicit IcebergSchemaProcessor(ContextPtr context_) : WithContext(context_) {} + + void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr, ContextPtr context_); std::shared_ptr getClickhouseTableSchemaById(Int32 id); - std::shared_ptr getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id); + std::shared_ptr getSchemaTransformationDagByIds(ContextPtr context_, Int32 old_id, Int32 new_id); NameAndTypePair getFieldCharacteristics(Int32 schema_version, Int32 source_id) const; std::optional tryGetFieldCharacteristics(Int32 schema_version, Int32 source_id) const; NamesAndTypesList tryGetFieldsCharacteristics(Int32 schema_id, const std::vector & source_ids) const; @@ -92,7 +94,7 @@ class IcebergSchemaProcessor Poco::JSON::Object::Ptr getIcebergTableSchemaById(Int32 id) const; bool hasClickhouseTableSchemaById(Int32 id) const; - static DataTypePtr getSimpleType(const String & type_name); + static DataTypePtr getSimpleType(const String & type_name, ContextPtr context_); static std::unordered_map traverseSchema(Poco::JSON::Array::Ptr schema); @@ -112,10 +114,15 @@ class IcebergSchemaProcessor std::unordered_map schema_id_by_snapshot TSA_GUARDED_BY(mutex); NamesAndTypesList getSchemaType(const Poco::JSON::Object::Ptr & schema); - DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type, String & current_full_name, bool is_subfield_of_root); + DataTypePtr getComplexTypeFromObject( + const Poco::JSON::Object::Ptr & type, + String & current_full_name, + ContextPtr context_, + bool is_subfield_of_root); DataTypePtr getFieldType( const Poco::JSON::Object::Ptr & field, const String & type_key, + ContextPtr context_, bool required, String & current_full_name = default_link, bool is_subfield_of_root = false); @@ -124,7 +131,11 @@ class IcebergSchemaProcessor const Node * getDefaultNodeForField(const Poco::JSON::Object::Ptr & field); std::shared_ptr getSchemaTransformationDag( - const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id); + const Poco::JSON::Object::Ptr & old_schema, + const Poco::JSON::Object::Ptr & new_schema, + ContextPtr context_, + Int32 old_id, + Int32 new_id); mutable SharedMutex mutex; }; diff --git a/tests/integration/test_database_iceberg/test.py 
b/tests/integration/test_database_iceberg/test.py index 6fa8113f5e37..8402af573265 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -535,6 +535,52 @@ def test_timestamps(started_cluster): assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n `timestamp` Nullable(DateTime64(6)),\\n `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-rest/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n" assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n" + # Berlin is UTC+1 in winter + # Istanbul is UTC+3 in winter + + # 'UTC' is the default value, so the response matches the query above + assert node.query(f""" + SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + SETTINGS iceberg_timezone_for_timestamptz='UTC' + """) == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n" + # Timezone taken from the setting + assert node.query(f""" + SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + SETTINGS iceberg_timezone_for_timestamptz='Europe/Berlin' + """) == "2024-01-01 12:00:00.000000\t2024-01-01 13:00:00.000000\n" + # An empty value means the session timezone, which is also 'UTC' by default + assert node.query(f""" + SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + SETTINGS iceberg_timezone_for_timestamptz='' + """) == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n" + # The session timezone alone does not change `timestamptz`: it stays 'UTC' by default + assert node.query(f""" + SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + SETTINGS session_timezone='Asia/Istanbul' + """) == "2024-01-01 15:00:00.000000\t2024-01-01 12:00:00.000000\n" + # Setting `iceberg_timezone_for_timestamptz` does not affect the `timestamp` column + assert node.query(f""" + SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + SETTINGS session_timezone='Asia/Istanbul', iceberg_timezone_for_timestamptz='Europe/Berlin' + """) == "2024-01-01 15:00:00.000000\t2024-01-01 13:00:00.000000\n" + # Empty value with a non-default session timezone + assert node.query(f""" + SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + SETTINGS session_timezone='Asia/Istanbul', iceberg_timezone_for_timestamptz='' + """) == "2024-01-01 15:00:00.000000\t2024-01-01 15:00:00.000000\n" + # Invalid timezone + assert "Invalid time zone: Foo/Bar" in node.query_and_get_error(f""" + SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` + SETTINGS iceberg_timezone_for_timestamptz='Foo/Bar' + """) + + assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}` SETTINGS iceberg_timezone_for_timestamptz='UTC'") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n `timestamp` Nullable(DateTime64(6)),\\n `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-rest/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n" + assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}` SETTINGS iceberg_timezone_for_timestamptz='Europe/Berlin'") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n `timestamp` Nullable(DateTime64(6)),\\n `timestamptz` Nullable(DateTime64(6, \\'Europe/Berlin\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-rest/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n" + + assert 
node.query(f"SELECT timezoneOf(timestamptz) FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` LIMIT 1") == "UTC\n" + assert node.query(f"SELECT timezoneOf(timestamptz) FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` LIMIT 1 SETTINGS iceberg_timezone_for_timestamptz='UTC'") == "UTC\n" + assert node.query(f"SELECT timezoneOf(timestamptz) FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` LIMIT 1 SETTINGS iceberg_timezone_for_timestamptz='Europe/Berlin'") == "Europe/Berlin\n" + def test_insert(started_cluster): node = started_cluster.instances["node1"]