Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,11 @@ IMPLEMENT_SETTING_ENUM(
GroupArrayActionWhenLimitReached,
ErrorCodes::BAD_ARGUMENTS,
{{"throw", GroupArrayActionWhenLimitReached::THROW}, {"discard", GroupArrayActionWhenLimitReached::DISCARD}})

/// Textual names of SearchOrphanedPartsDisks, the type of the `search_orphaned_parts_disks` MergeTree setting:
/// "any", "local" and "none" map to the enumerators of the same name.
/// NOTE(review): ErrorCodes::BAD_ARGUMENTS is presumably the code reported for an unrecognized value - confirm against the macro definition.
IMPLEMENT_SETTING_ENUM(
SearchOrphanedPartsDisks,
ErrorCodes::BAD_ARGUMENTS,
{{"any", SearchOrphanedPartsDisks::ANY},
{"local", SearchOrphanedPartsDisks::LOCAL},
{"none", SearchOrphanedPartsDisks::NONE}})
}
8 changes: 8 additions & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,12 @@ enum class GroupArrayActionWhenLimitReached : uint8_t
};
DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)

/// Scope of disks that are searched for orphaned parts, i.e. parts located on disks
/// that are not included in the storage policy of a table.
enum class SearchOrphanedPartsDisks : uint8_t
{
    NONE = 0,  /// Empty scope, do not search.
    LOCAL = 1, /// Scope is limited to local disks.
    ANY = 2,   /// Scope is not limited.
};

/// NOTE(review): presumably declares the setting field type for SearchOrphanedPartsDisks; the value names come from the matching IMPLEMENT_SETTING_ENUM - confirm.
DECLARE_SETTING_ENUM(SearchOrphanedPartsDisks)
}
22 changes: 21 additions & 1 deletion src/Interpreters/DatabaseCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <Databases/DatabaseOnDisk.h>
#include <Disks/IDisk.h>
#include <Storages/StorageMemory.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Core/BackgroundSchedulePool.h>
#include <Parsers/formatAST.h>
#include <IO/ReadHelpers.h>
Expand Down Expand Up @@ -1370,11 +1372,29 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
table.table->drop();
}

/// Check if we are interested in a particular disk
/// or it is better to bypass it e.g. to avoid interactions with a remote storage
auto is_disk_eligible_for_search = [this](DiskPtr disk, std::shared_ptr<MergeTreeData> storage)
{
bool is_disk_eligible = !disk->isReadOnly();

/// Disk is not actually used by MergeTree table
if (is_disk_eligible && storage && !storage->getStoragePolicy()->tryGetVolumeIndexByDiskName(disk->getName()).has_value())
{
SearchOrphanedPartsDisks mode = storage->getSettings()->search_orphaned_parts_disks;
is_disk_eligible = mode == SearchOrphanedPartsDisks::ANY || (mode == SearchOrphanedPartsDisks::LOCAL && !disk->isRemote());
}

LOG_TRACE(log, "is disk {} eligible for search: {}", disk->getName(), is_disk_eligible);
return is_disk_eligible;
};

/// Even if table is not loaded, try remove its data from disks.
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
String data_path = "store/" + getPathForUUID(table.table_id.uuid);
if (disk->isReadOnly() || !disk->exists(data_path))
auto table_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table.table);
if (!is_disk_eligible_for_search(disk, table_merge_tree) || !disk->exists(data_path))
continue;

LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name);
Expand Down
13 changes: 10 additions & 3 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,14 @@ std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(
return loaded_parts;
}

/// Tells whether `disk` has to be scanned for orphaned parts of this table.
/// Broken and custom disks are never scanned; for the rest the decision is taken
/// from the `search_orphaned_parts_disks` setting of the table.
bool MergeTreeData::isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const
{
    const SearchOrphanedPartsDisks mode = getSettings()->search_orphaned_parts_disks;

    bool eligible = false;
    if (!(disk->isBroken() || disk->isCustomDisk()))
    {
        if (mode == SearchOrphanedPartsDisks::ANY)
            eligible = true;
        else if (mode == SearchOrphanedPartsDisks::LOCAL)
            eligible = !disk->isRemote(); /// only local disks qualify in this mode
    }

    LOG_TRACE(log, "is disk {} eligible for search: {} (mode {})", disk->getName(), eligible, mode);
    return eligible;
}

void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts)
{
Expand All @@ -1697,7 +1705,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
/// Only check if user did touch storage configuration for this table.
if (!getStoragePolicy()->isDefaultPolicy() && !skip_sanity_checks)
{
/// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
/// Check extra (AKA orphaned) parts on different disks, in order not to miss data parts on undefined disks.
std::unordered_set<String> defined_disk_names;

for (const auto & disk_ptr : disks)
Expand Down Expand Up @@ -1731,14 +1739,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
std::unordered_set<String> skip_check_disks;
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (disk->isBroken() || disk->isCustomDisk())
if (!isDiskEligibleForOrphanedPartsSearch(disk))
{
skip_check_disks.insert(disk_name);
continue;
}

bool is_disk_defined = defined_disk_names.contains(disk_name);

if (!is_disk_defined && disk->exists(relative_data_path))
{
/// There still a chance that underlying disk is defined in storage policy
Expand Down
5 changes: 5 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -1752,6 +1752,11 @@ class MergeTreeData : public IStorage, public WithMutableContext

void checkColumnFilenamesForCollision(const StorageInMemoryMetadata & metadata, bool throw_on_error) const;
void checkColumnFilenamesForCollision(const ColumnsDescription & columns, const MergeTreeSettings & settings, bool throw_on_error) const;

/// Returns whether the disk should be searched for orphaned parts (ones that belong to a table based on file names,
/// but are located on disks that are not a part of the storage policy of the table).
/// Sometimes it is better to bypass a disk, e.g. to avoid interactions with a remote storage.
bool isDiskEligibleForOrphanedPartsSearch(DiskPtr disk) const;
};

/// RAII struct to record big parts that are submerging or emerging.
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ struct Settings;
M(Bool, disable_fetch_partition_for_zero_copy_replication, true, "Disable FETCH PARTITION query for zero copy replication.", 0) \
M(Bool, enable_block_number_column, false, "Enable persisting column _block_number for each row.", 0) ALIAS(allow_experimental_block_number_column) \
M(Bool, enable_block_offset_column, false, "Enable persisting column _block_offset for each row.", 0) \
M(SearchOrphanedPartsDisks, search_orphaned_parts_disks, SearchOrphanedPartsDisks::ANY, "ClickHouse scans all disks for orphaned parts upon any ATTACH or CREATE table in order to not allow to miss data parts at undefined (not included in policy) disks. Orphaned parts originates from potentially unsafe storage reconfiguration, e.g. if a disk was excluded from storage policy. This setting limits scope of disks to search by traits of the disks. Possible values: - any - scope is not limited, - local - scope is limited by local disks, - none - empty scope, do not search", 0) \
\
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<clickhouse>
<storage_configuration>
<disks>
<default>
<keep_free_space_bytes>1024</keep_free_space_bytes>
</default>
<disk_s3_plain>
<type>s3_plain</type>
<endpoint>http://minio1:9001/root/data/disks/disk_s3_plain/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<skip_access_check>true</skip_access_check>
</disk_s3_plain>
<local_disk_1>
<path>/internal/1/</path>
</local_disk_1>
<local_disk_2>
<type>local_blob_storage</type>
<path>/internal/2/</path>
</local_disk_2>
<!-- A plain_rewritable disk cannot start without S3, because it tries to load metadata at startup -->
<!-- <s3_plain_rewritable> -->
<!-- <type>object_storage</type> -->
<!-- <object_storage_type>s3</object_storage_type> -->
<!-- <metadata_type>plain_rewritable</metadata_type> -->
<!-- <endpoint>http://minio1:9001/root/data/disks/disk_s3_plain_rewritable/</endpoint> -->
<!-- <access_key_id>minio</access_key_id> -->
<!-- <secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key> -->
<!-- <skip_access_check>true</skip_access_check> -->
<!-- </s3_plain_rewritable> -->
<local_cache>
<type>cache</type>
<disk>local_disk_2</disk>
<path>/local_cache/</path>
<cache_policy>LRU</cache_policy>
<max_size>12M</max_size>
<max_file_segment_size>100K</max_file_segment_size>
<boundary_alignment>100K</boundary_alignment>
<cache_on_write_operations>1</cache_on_write_operations>
</local_cache>
</disks>
<policies>
<default>
<volumes>
<vol1>
<disk>default</disk>
</vol1>
</volumes>
</default>
<no_s3>
<volumes>
<main>
<disk>local_disk_1</disk>
</main>
</volumes>
</no_s3>
<local_cache>
<volumes>
<main>
<disk>local_cache</disk>
</main>
</volumes>
</local_cache>
</policies>
</storage_configuration>
</clickhouse>
10 changes: 10 additions & 0 deletions tests/integration/test_search_orphaned_parts/configs/users.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<clickhouse>
<profiles>
<default>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
<s3_retry_attempts>5</s3_retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_validate_request_settings>0</s3_validate_request_settings>
</default>
</profiles>
</clickhouse>
72 changes: 72 additions & 0 deletions tests/integration/test_search_orphaned_parts/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import logging
import pytest

from helpers.cluster import ClickHouseCluster


def get_cluster(with_minio):
    """Create and start a cluster with a single instance named "node".

    `with_minio` is forwarded to the instance as is. Returns the started cluster.
    """
    instance_options = dict(
        main_configs=["configs/storage_conf.xml"],
        user_configs=["configs/users.xml"],
        with_minio=with_minio,
        stay_alive=True,
        # remote database disk adds MinIO implicitly
        # FIXME: disable with_remote_database_disk if with_minio set to False explicitly
    )

    started_cluster = ClickHouseCluster(__file__)
    started_cluster.add_instance("node", **instance_options)

    logging.info("Starting cluster...")
    started_cluster.start()
    logging.info("Cluster started")

    return started_cluster


# ClickHouse checks extra (AKA orphaned) parts on different disks, in order not to miss data parts on undefined disks.
# The test verifies how the search for orphaned parts works if there is no connection to MinIO.
# The following is expected:
# * search_orphaned_parts_disks is `none` - does not search s3, the query is successful
# * search_orphaned_parts_disks is `local` - does not search s3, the query is successful
# * search_orphaned_parts_disks is `any` - searches s3, the query throws if no MinIO
# Note that disk_s3_plain is a configured disk that is not used in either the no_s3 or the local_cache policy.
@pytest.mark.parametrize("with_minio", [True, False])
def test_search_orphaned_parts(with_minio):
    """Create a MergeTree table under every `search_orphaned_parts_disks` mode.

    Runs with and without MinIO. Only the `any` mode without MinIO is expected
    to fail (Code: 499); every other combination must create the table.
    """
    table_name = "t1"
    # Bound before `try`: if cluster setup raises, `finally` must not fail on an
    # unbound name and hide the original error.
    cluster = None

    try:
        cluster = get_cluster(with_minio)
        node = cluster.instances["node"]

        for search_mode in ["any", "local", "none"]:
            for storage_policy in ["no_s3", "local_cache"]:
                node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")

                # The same statement serves both the failing and the succeeding case.
                create_query = f"""
                    CREATE TABLE {table_name} (
                        id Int64,
                        data String
                    ) ENGINE=MergeTree()
                    PARTITION BY id % 10
                    ORDER BY id
                    SETTINGS storage_policy='{storage_policy}', search_orphaned_parts_disks='{search_mode}'
                """

                if search_mode == "any" and not with_minio:
                    # Searching every disk touches disk_s3_plain, which is unreachable here.
                    assert "Code: 499. DB::Exception" in node.query_and_get_error(
                        create_query
                    )
                else:
                    node.query(create_query)
                    node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")

    finally:
        if cluster is not None:
            cluster.shutdown()
Loading