diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp
index 1762022102dd..84487ff5087b 100644
--- a/src/Core/SettingsEnums.cpp
+++ b/src/Core/SettingsEnums.cpp
@@ -249,4 +249,11 @@ IMPLEMENT_SETTING_ENUM(
     GroupArrayActionWhenLimitReached,
     ErrorCodes::BAD_ARGUMENTS,
     {{"throw", GroupArrayActionWhenLimitReached::THROW},
      {"discard", GroupArrayActionWhenLimitReached::DISCARD}})
+
+IMPLEMENT_SETTING_ENUM(
+    SearchOrphanedPartsDisks,
+    ErrorCodes::BAD_ARGUMENTS,
+    {{"any", SearchOrphanedPartsDisks::ANY},
+     {"local", SearchOrphanedPartsDisks::LOCAL},
+     {"none", SearchOrphanedPartsDisks::NONE}})
 }
diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h
index d8d77504b85f..2c92fe6f6432 100644
--- a/src/Core/SettingsEnums.h
+++ b/src/Core/SettingsEnums.h
@@ -354,4 +354,12 @@ enum class GroupArrayActionWhenLimitReached : uint8_t
 };
 DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)
 
+enum class SearchOrphanedPartsDisks : uint8_t
+{
+    NONE,
+    LOCAL,
+    ANY
+};
+
+DECLARE_SETTING_ENUM(SearchOrphanedPartsDisks)
 }
diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp
index fb4fad85f66d..bf43688d464b 100644
--- a/src/Interpreters/DatabaseCatalog.cpp
+++ b/src/Interpreters/DatabaseCatalog.cpp
@@ -14,6 +14,8 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Core/SettingsEnums.h>
+#include <Storages/MergeTree/MergeTreeData.h>
 #include <...>
 #include <...>
 #include <...>
@@ -1370,11 +1372,29 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
         table.table->drop();
     }
 
+    /// Check whether we are interested in a particular disk,
+    /// or whether it is better to bypass it, e.g. to avoid interactions with a remote storage.
+    auto is_disk_eligible_for_search = [this](DiskPtr disk, std::shared_ptr<MergeTreeData> storage)
+    {
+        bool is_disk_eligible = !disk->isReadOnly();
+
+        /// The disk is not actually used by the MergeTree table
+        if (is_disk_eligible && storage && !storage->getStoragePolicy()->tryGetVolumeIndexByDiskName(disk->getName()).has_value())
+        {
+            SearchOrphanedPartsDisks mode = storage->getSettings()->search_orphaned_parts_disks;
+            is_disk_eligible = mode == SearchOrphanedPartsDisks::ANY || (mode == SearchOrphanedPartsDisks::LOCAL && !disk->isRemote());
+        }
+
+        LOG_TRACE(log, "Is disk {} eligible for search: {}", disk->getName(), is_disk_eligible);
+        return is_disk_eligible;
+    };
+
     /// Even if table is not loaded, try remove its data from disks.
     for (const auto & [disk_name, disk] : getContext()->getDisksMap())
     {
         String data_path = "store/" + getPathForUUID(table.table_id.uuid);
-        if (disk->isReadOnly() || !disk->exists(data_path))
+        auto table_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table.table);
+        if (!is_disk_eligible_for_search(disk, table_merge_tree) || !disk->exists(data_path))
             continue;
 
         LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name);
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 76215c0f8673..4061f791763f 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1683,6 +1683,14 @@ std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(
     return loaded_parts;
 }
 
+bool MergeTreeData::isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const
+{
+    SearchOrphanedPartsDisks mode = getSettings()->search_orphaned_parts_disks;
+    bool is_disk_eligible = !disk->isBroken() && !disk->isCustomDisk() && (mode == SearchOrphanedPartsDisks::ANY || (mode == SearchOrphanedPartsDisks::LOCAL && !disk->isRemote()));
+
+    LOG_TRACE(log, "Is disk {} eligible for search: {} (mode {})", disk->getName(), is_disk_eligible, mode);
+    return is_disk_eligible;
+}
 
 void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts)
 {
@@ -1697,7 +1705,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts)
 
     if (!getStoragePolicy()->isDefaultPolicy() && !skip_sanity_checks)
     {
-        /// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
+        /// Check for extra (AKA orphaned) parts on different disks, so as not to miss data parts on undefined disks.
         std::unordered_set<String> defined_disk_names;
 
         for (const auto & disk_ptr : disks)
@@ -1731,14 +1739,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts)
         std::unordered_set<String> skip_check_disks;
         for (const auto & [disk_name, disk] : getContext()->getDisksMap())
         {
-            if (disk->isBroken() || disk->isCustomDisk())
+            if (!isDiskEligibleForOrphanedPartsSearch(disk))
             {
                 skip_check_disks.insert(disk_name);
                 continue;
             }
 
             bool is_disk_defined = defined_disk_names.contains(disk_name);
-
             if (!is_disk_defined && disk->exists(relative_data_path))
             {
                 /// There is still a chance that the underlying disk is defined in the storage policy
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index dc37d5e7dadf..af14d7c9d98f 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -1752,6 +1752,11 @@ class MergeTreeData : public IStorage, public WithMutableContext
 
     void checkColumnFilenamesForCollision(const StorageInMemoryMetadata & metadata, bool throw_on_error) const;
     void checkColumnFilenamesForCollision(const ColumnsDescription & columns, const MergeTreeSettings & settings, bool throw_on_error) const;
+
+    /// Whether the disk should be searched for orphaned parts (parts that belong to the table judging by file names,
+    /// but are located on disks that are not part of the table's storage policy).
+    /// Sometimes it is better to bypass a disk, e.g. to avoid interactions with a remote storage.
+    bool isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const;
 };
 
 /// RAII struct to record big parts that are submerging or emerging.
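For context, here is a minimal usage sketch of the new per-table setting (the table name and schema are illustrative, not part of this PR). The orphaned-parts scan only runs for tables whose storage policy is not the default one, so a policy is set explicitly:

```sql
-- Hypothetical table; 'local' restricts the orphaned-parts scan on CREATE/ATTACH
-- to local disks, skipping remote (e.g. S3) disks of the server configuration.
CREATE TABLE example_table
(
    id Int64,
    data String
)
ENGINE = MergeTree()
ORDER BY id
SETTINGS storage_policy = 'local_cache', search_orphaned_parts_disks = 'local';
```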
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index ea8b92da752e..26110b089101 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -195,6 +195,7 @@ struct Settings;
     M(Bool, disable_fetch_partition_for_zero_copy_replication, true, "Disable FETCH PARTITION query for zero copy replication.", 0) \
     M(Bool, enable_block_number_column, false, "Enable persisting column _block_number for each row.", 0) ALIAS(allow_experimental_block_number_column) \
     M(Bool, enable_block_offset_column, false, "Enable persisting column _block_offset for each row.", 0) \
+    M(SearchOrphanedPartsDisks, search_orphaned_parts_disks, SearchOrphanedPartsDisks::ANY, "ClickHouse scans all disks for orphaned parts on any ATTACH or CREATE of a table, so as not to miss data parts on undefined (not included in the policy) disks. Orphaned parts originate from potentially unsafe storage reconfiguration, e.g. when a disk was excluded from the storage policy. This setting limits the scope of the search by disk traits. Possible values: - any - the scope is not limited, - local - the scope is limited to local disks, - none - empty scope, do not search.", 0) \
 \
     /** Experimental/work in progress feature. Unsafe for production. */ \
     M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
diff --git a/tests/integration/test_search_orphaned_parts/__init__.py b/tests/integration/test_search_orphaned_parts/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/integration/test_search_orphaned_parts/configs/storage_conf.xml b/tests/integration/test_search_orphaned_parts/configs/storage_conf.xml
new file mode 100644
index 000000000000..28fae70569a6
--- /dev/null
+++ b/tests/integration/test_search_orphaned_parts/configs/storage_conf.xml
@@ -0,0 +1,66 @@
+<clickhouse>
+    <storage_configuration>
+        <disks>
+            <default>
+                <keep_free_space_bytes>1024</keep_free_space_bytes>
+            </default>
+
+            <disk_s3_plain>
+                <type>s3_plain</type>
+                <endpoint>http://minio1:9001/root/data/disks/disk_s3_plain/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+                <skip_access_check>true</skip_access_check>
+            </disk_s3_plain>
+
+            <local_disk_1>
+                <path>/internal/1/</path>
+            </local_disk_1>
+
+            <local_disk_2>
+                <type>local_blob_storage</type>
+                <path>/internal/2/</path>
+            </local_disk_2>
+
+            <local_cache>
+                <type>cache</type>
+                <disk>local_disk_2</disk>
+                <path>/local_cache/</path>
+                <cache_policy>LRU</cache_policy>
+                <max_size>12M</max_size>
+                <max_file_segment_size>100K</max_file_segment_size>
+                <boundary_alignment>100K</boundary_alignment>
+                <cache_on_write_operations>1</cache_on_write_operations>
+            </local_cache>
+        </disks>
+
+        <policies>
+            <default>
+                <volumes>
+                    <main>
+                        <disk>default</disk>
+                    </main>
+                </volumes>
+            </default>
+
+            <no_s3>
+                <volumes>
+                    <main>
+                        <disk>local_disk_1</disk>
+                    </main>
+                </volumes>
+            </no_s3>
+
+            <local_cache>
+                <volumes>
+                    <main>
+                        <disk>local_cache</disk>
+                    </main>
+                </volumes>
+            </local_cache>
+        </policies>
+    </storage_configuration>
+</clickhouse>
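As a sanity check for the layout above, the standard system tables can be queried on the test node (a sketch; column sets vary slightly across ClickHouse versions):

```sql
-- Disks defined in storage_conf.xml: disk_s3_plain should be the only remote one.
SELECT name, type, is_remote FROM system.disks;

-- Policies and their disks: disk_s3_plain belongs to neither no_s3 nor local_cache.
SELECT policy_name, volume_name, disks FROM system.storage_policies;
```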
diff --git a/tests/integration/test_search_orphaned_parts/configs/users.xml b/tests/integration/test_search_orphaned_parts/configs/users.xml
new file mode 100644
index 000000000000..5a46eca567df
--- /dev/null
+++ b/tests/integration/test_search_orphaned_parts/configs/users.xml
@@ -0,0 +1,10 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <...>1</...>
+            <...>5</...>
+            <...>0</...>
+            <...>0</...>
+        </default>
+    </profiles>
+</clickhouse>
diff --git a/tests/integration/test_search_orphaned_parts/test.py b/tests/integration/test_search_orphaned_parts/test.py
new file mode 100644
index 000000000000..654c13501cd4
--- /dev/null
+++ b/tests/integration/test_search_orphaned_parts/test.py
@@ -0,0 +1,72 @@
+import logging
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+
+
+def get_cluster(with_minio):
+    cluster = ClickHouseCluster(__file__)
+    cluster.add_instance(
+        "node",
+        main_configs=["configs/storage_conf.xml"],
+        user_configs=["configs/users.xml"],
+        with_minio=with_minio,
+        stay_alive=True,
+        # remote database disk adds MinIO implicitly
+        # FIXME: disable with_remote_database_disk if with_minio is set to False explicitly
+    )
+    logging.info("Starting cluster...")
+    cluster.start()
+    logging.info("Cluster started")
+
+    return cluster
+
+
+# ClickHouse checks for extra (AKA orphaned) parts on different disks, so as not to miss data parts on undefined disks.
+# This test verifies how the search for orphaned parts behaves when there is no connection to MinIO.
+# The expected behavior is:
+# * search_orphaned_parts_disks = `none` - does not search s3, the query succeeds
+# * search_orphaned_parts_disks = `local` - does not search s3, the query succeeds
+# * search_orphaned_parts_disks = `any` - searches s3, the query throws if there is no MinIO
+# Note that disk_s3_plain is a configured disk that is not used in either the no_s3 or the local_cache policy.
+@pytest.mark.parametrize("with_minio", [True, False])
+def test_search_orphaned_parts(with_minio):
+    table_name = "t1"
+
+    try:
+        cluster = get_cluster(with_minio)
+
+        node = cluster.instances["node"]
+
+        for search_mode in ["any", "local", "none"]:
+            for storage_policy in ["no_s3", "local_cache"]:
+                node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
+
+                create_query = f"""
+                    CREATE TABLE {table_name} (
+                        id Int64,
+                        data String
+                    ) ENGINE=MergeTree()
+                    PARTITION BY id % 10
+                    ORDER BY id
+                    SETTINGS storage_policy='{storage_policy}', search_orphaned_parts_disks='{search_mode}'
+                """
+
+                if search_mode == "any" and not with_minio:
+                    # Without MinIO the scan of the remote disk cannot connect and must fail.
+                    assert "Code: 499. DB::Exception" in node.query_and_get_error(create_query)
+                else:
+                    node.query(create_query)
+
+                node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
+
+    finally:
+        cluster.shutdown()
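Finally, a sketch of how one might confirm the setting on a running server (assumes a table t1 created as in the test above):

```sql
-- Server-wide default of the new MergeTree setting (expected to be 'any').
SELECT name, value, description
FROM system.merge_tree_settings
WHERE name = 'search_orphaned_parts_disks';

-- A per-table override appears in the SETTINGS clause of the definition.
SHOW CREATE TABLE t1;
```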