Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6873,6 +6873,16 @@ Use roaring bitmap for iceberg positional deletes.
)", 0) \
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
Overwrite file if it already exists when exporting a merge tree part
)", 0) \
DECLARE(Timezone, iceberg_timezone_for_timestamptz, "UTC", R"(
Timezone for Iceberg timestamptz field.

Possible values:

- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
- `` (empty value) - use session timezone

Default value is `UTC`.
)", 0) \
\
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_retries_in_cluster_requests", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
});
addSettingsChanges(settings_changes_history, "25.8",
{
Expand Down
12 changes: 6 additions & 6 deletions src/Databases/DataLake/Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ std::vector<String> splitTypeArguments(const String & type_str)
return args;
}

DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix)
DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix)
{
String name = trim(type_name);

if (name.starts_with("array<") && name.ends_with(">"))
{
String inner = name.substr(6, name.size() - 7);
return std::make_shared<DB::DataTypeArray>(getType(inner, nullable));
return std::make_shared<DB::DataTypeArray>(getType(inner, nullable, context));
}

if (name.starts_with("map<") && name.ends_with(">"))
Expand All @@ -79,7 +79,7 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
if (args.size() != 2)
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Invalid data type {}", type_name);

return std::make_shared<DB::DataTypeMap>(getType(args[0], false), getType(args[1], nullable));
return std::make_shared<DB::DataTypeMap>(getType(args[0], false, context), getType(args[1], nullable, context));
}

if (name.starts_with("struct<") && name.ends_with(">"))
Expand All @@ -101,13 +101,13 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
String full_field_name = prefix.empty() ? field_name : prefix + "." + field_name;

field_names.push_back(full_field_name);
field_types.push_back(getType(field_type, nullable, full_field_name));
field_types.push_back(getType(field_type, nullable, context, full_field_name));
}
return std::make_shared<DB::DataTypeTuple>(field_types, field_names);
}

return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name))
: DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name);
return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context))
: DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context);
}

std::pair<std::string, std::string> parseTableName(const std::string & name)
Expand Down
3 changes: 2 additions & 1 deletion src/Databases/DataLake/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Core/NamesAndTypes.h>
#include <Core/Types.h>
#include <Interpreters/Context_fwd.h>

namespace DataLake
{
Expand All @@ -10,7 +11,7 @@ String trim(const String & str);

std::vector<String> splitTypeArguments(const String & type_str);

/// Convert a textual data lake type name into a ClickHouse data type.
/// `array<...>`, `map<...>` and `struct<...>` are handled recursively; anything else is
/// resolved through DB::Iceberg::IcebergSchemaProcessor::getSimpleType.
/// - `nullable`: wrap the resulting simple type in Nullable (map keys are always requested as non-nullable).
/// - `context`: forwarded unchanged through the recursion down to getSimpleType.
/// - `prefix`: dotted path of the enclosing struct fields, used to build full names of nested struct fields.
/// (Diff view: the first declaration below is the pre-change signature, the second one replaces it.)
DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix = "");
DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix = "");

/// Parse a string, containing at least one dot, into a two substrings:
/// A.B.C.D.E -> A.B.C.D and E, where
Expand Down
6 changes: 3 additions & 3 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con

auto [namespace_name, table_name] = DataLake::parseTableName(name);

if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
return nullptr;

if (ignore_if_not_iceberg && !table_metadata.isDefaultReadableTable())
Expand Down Expand Up @@ -632,15 +632,15 @@ ASTPtr DatabaseDataLake::getCreateDatabaseQuery() const

ASTPtr DatabaseDataLake::getCreateTableQueryImpl(
const String & name,
ContextPtr /* context_ */,
ContextPtr context_,
bool /* throw_on_error */) const
{
auto catalog = getCatalog();
auto table_metadata = DataLake::TableMetadata().withLocation().withSchema();

const auto [namespace_name, table_name] = DataLake::parseTableName(name);

if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
{
throw Exception(
ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY, "Table `{}` doesn't exist", name);
Expand Down
6 changes: 4 additions & 2 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ bool GlueCatalog::existsTable(const std::string & database_name, const std::stri
bool GlueCatalog::tryGetTableMetadata(
const std::string & database_name,
const std::string & table_name,
DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
Aws::Glue::Model::GetTableRequest request;
Expand Down Expand Up @@ -372,7 +373,7 @@ bool GlueCatalog::tryGetTableMetadata(
column_type = "timestamptz";
}

schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
schema.push_back({column.GetName(), getType(column_type, can_be_nullable, getContext())});
}
result.setSchema(schema);
}
Expand All @@ -394,9 +395,10 @@ bool GlueCatalog::tryGetTableMetadata(
void GlueCatalog::getTableMetadata(
const std::string & database_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const
{
if (!tryGetTableMetadata(database_name, table_name, result))
if (!tryGetTableMetadata(database_name, table_name, context_, result))
{
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
Expand Down
2 changes: 2 additions & 0 deletions src/Databases/DataLake/GlueCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
/// ICatalog override. Throws DB::Exception (DATALAKE_DATABASE_ERROR) when
/// tryGetTableMetadata returns false; `context_` is passed through to it.
void getTableMetadata(
const std::string & database_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

/// ICatalog override. Non-throwing variant with the same parameters; the bool
/// result tells the caller whether `result` was filled.
bool tryGetTableMetadata(
const std::string & database_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

std::optional<StorageType> getStorageType() const override
Expand Down
16 changes: 12 additions & 4 deletions src/Databases/DataLake/HiveCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,21 @@ bool HiveCatalog::existsTable(const std::string & namespace_name, const std::str
return true;
}

/// Throwing counterpart of tryGetTableMetadata: forwards every argument, including the
/// caller's `context_`, and raises DATALAKE_DATABASE_ERROR when the lookup returns false.
/// (Diff view: the one-line signature and the three-argument call are the pre-change lines;
/// the multi-line signature and the four-argument call replace them.)
void HiveCatalog::getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
void HiveCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const
{
if (!tryGetTableMetadata(namespace_name, table_name, result))
if (!tryGetTableMetadata(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog");
}

bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
bool HiveCatalog::tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const
{
Apache::Hadoop::Hive::Table table;

Expand Down Expand Up @@ -130,7 +138,7 @@ bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const
auto columns = table.sd.cols;
for (const auto & column : columns)
{
schema.push_back({column.name, getType(column.type, true)});
schema.push_back({column.name, getType(column.type, true, context_)});
}
result.setSchema(schema);
}
Expand Down
14 changes: 11 additions & 3 deletions src/Databases/DataLake/HiveCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,17 @@ class HiveCatalog final : public ICatalog, private DB::WithContext

bool existsTable(const std::string & namespace_name, const std::string & table_name) const override;

void getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override;

bool tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override;
/// ICatalog override. Throws DB::Exception (DATALAKE_DATABASE_ERROR) when
/// tryGetTableMetadata returns false.
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

/// ICatalog override. Non-throwing variant; `context_` is handed to getType()
/// when the table's columns are converted into the schema.
bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

std::optional<StorageType> getStorageType() const override;

Expand Down
10 changes: 10 additions & 0 deletions src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
#include <Databases/DataLake/DatabaseDataLakeStorageType.h>
#include <Poco/JSON/Object.h>

/// Forward declaration of DB::Context and the ContextPtr alias, so the catalog interface
/// below can take a DB::ContextPtr parameter using only a declaration of Context.
/// NOTE(review): Common.h obtains ContextPtr from <Interpreters/Context_fwd.h>; presumably
/// that header could be included here instead of repeating the alias — confirm.
namespace DB
{

class Context;
using ContextPtr = std::shared_ptr<const Context>;

}

namespace DataLake
{

Expand Down Expand Up @@ -150,13 +158,15 @@ class ICatalog
/// `context` is the caller's context. The Hive and REST implementations hand it to the
/// type/schema conversion code; the implementations in this change throw
/// DB::Exception (DATALAKE_DATABASE_ERROR) when the metadata cannot be obtained.
virtual void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context,
TableMetadata & result) const = 0;

/// Get table metadata in the given namespace.
/// Return `false` if table does not exist, `true` otherwise.
/// `context` has the same meaning as in getTableMetadata.
virtual bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context,
TableMetadata & result) const = 0;

/// Get storage type, where Iceberg tables' data is stored.
Expand Down
13 changes: 8 additions & 5 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,17 +532,18 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas
/// A table is reported as existing when tryGetTableMetadata succeeds for a
/// default-constructed TableMetadata. existsTable has no context parameter, so the
/// catalog's own context (getContext()) is supplied.
/// (Diff view: the first `return` is the pre-change line, the second one replaces it.)
bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const
{
TableMetadata table_metadata;
return tryGetTableMetadata(namespace_name, table_name, table_metadata);
return tryGetTableMetadata(namespace_name, table_name, getContext(), table_metadata);
}

bool RestCatalog::tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const
{
try
{
return getTableMetadataImpl(namespace_name, table_name, result);
return getTableMetadataImpl(namespace_name, table_name, context_, result);
}
catch (...)
{
Expand All @@ -554,15 +555,17 @@ bool RestCatalog::tryGetTableMetadata(
/// Throwing variant: calls getTableMetadataImpl directly (tryGetTableMetadata wraps the
/// same call in try/catch), and raises DATALAKE_DATABASE_ERROR when the implementation
/// returns false. `context_` is forwarded unchanged.
/// (Diff view: the first `if` is the pre-change line, the second one replaces it.)
void RestCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const
{
if (!getTableMetadataImpl(namespace_name, table_name, result))
if (!getTableMetadataImpl(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog");
}

bool RestCatalog::getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const
{
LOG_TEST(log, "Checking table {} in namespace {}", table_name, namespace_name);
Expand Down Expand Up @@ -623,8 +626,8 @@ bool RestCatalog::getTableMetadataImpl(
if (result.requiresSchema())
{
// int format_version = metadata_object->getValue<int>("format-version");
auto schema_processor = DB::Iceberg::IcebergSchemaProcessor();
auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log);
auto schema_processor = DB::Iceberg::IcebergSchemaProcessor(context_);
auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, context_, log);
auto schema = schema_processor.getClickhouseTableSchemaById(id);
result.setSchema(*schema);
}
Expand Down
3 changes: 3 additions & 0 deletions src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ class RestCatalog final : public ICatalog, private DB::WithContext
/// ICatalog override. Throws DB::Exception (DATALAKE_DATABASE_ERROR) when
/// getTableMetadataImpl returns false.
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

/// ICatalog override. Runs getTableMetadataImpl inside try/catch with the same arguments.
bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

std::optional<StorageType> getStorageType() const override;
Expand Down Expand Up @@ -126,6 +128,7 @@ class RestCatalog final : public ICatalog, private DB::WithContext
/// Shared implementation behind getTableMetadata and tryGetTableMetadata.
/// When `result` requires a schema, `context_` is passed to
/// DB::Iceberg::IcebergSchemaProcessor and DB::IcebergMetadata::parseTableSchema.
bool getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const;

Config loadConfig();
Expand Down
4 changes: 3 additions & 1 deletion src/Databases/DataLake/UnityCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ DB::Names UnityCatalog::getTables() const
/// Throwing counterpart of tryGetTableMetadata: forwards every argument, including
/// `context_`, and raises DATALAKE_DATABASE_ERROR when the lookup returns false.
/// (Diff view: the first `if` is the pre-change line, the second one replaces it.)
void UnityCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const
{
if (!tryGetTableMetadata(namespace_name, table_name, result))
if (!tryGetTableMetadata(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from unity catalog");
}

Expand Down Expand Up @@ -136,6 +137,7 @@ void UnityCatalog::getCredentials(const std::string & table_id, TableMetadata &
bool UnityCatalog::tryGetTableMetadata(
const std::string & schema_name,
const std::string & table_name,
DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
auto full_table_name = warehouse + "." + schema_name + "." + table_name;
Expand Down
2 changes: 2 additions & 0 deletions src/Databases/DataLake/UnityCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ class UnityCatalog final : public ICatalog, private DB::WithContext
/// ICatalog override. Throws DB::Exception (DATALAKE_DATABASE_ERROR) when
/// tryGetTableMetadata returns false.
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

/// ICatalog override. `context_` is accepted to match the interface; the definition in
/// UnityCatalog.cpp leaves the parameter unnamed.
bool tryGetTableMetadata(
const std::string & schema_name,
const std::string & table_name,
DB::ContextPtr context_,
TableMetadata & result) const override;

std::optional<StorageType> getStorageType() const override { return std::nullopt; }
Expand Down
Loading
Loading