diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index ac4614fdfbb..ae5205414b7 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit ac4614fdfbb9ac88beb09d5044d707bd703d65d6 +Subproject commit ae5205414b7b8c3b6eee6307026cab2f716f46b2 diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 3385b075295..45780510757 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -160,6 +160,20 @@ namespace DB F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \ M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \ F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \ + M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \ + F(type_total, {{"type", "total"}}, ExpBuckets{0.0001, 2, 20}), \ + F(type_blob, {{"type", "blob"}}, ExpBuckets{0.0001, 2, 20}), \ + /* the bucket range for apply in memory is 50us ~ 120s */ \ + F(type_latch, {{"type", "latch"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_wal, {{"type", "wal"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_commit, {{"type", "commit"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_edit, {{"type", "edit"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_get_stat, {{"type", "get_stat"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_get_stat_latch, {{"type", "get_stat_latch"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_blob_write, {{"type", "blob_write"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_choose_stat, {{"type", "choose_stat"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_lock_stat, {{"type", "lock_stat"}}, ExpBuckets{0.00005, 1.8, 26}), \ + F(type_get_pos_from_stat, {{"type", "get_pos_from_stat"}}, ExpBuckets{0.00005, 1.8, 26})) \ M(tiflash_storage_logical_throughput_bytes, "The logical throughput of read tasks of storage in bytes", Histogram, \ F(type_read, {{"type", "read"}}, EqualWidthBuckets{1 * 1024 * 1024, 60, 50 * 1024 * 
1024})) \ M(tiflash_storage_io_limiter, "Storage I/O limiter metrics", Counter, F(type_fg_read_req_bytes, {"type", "fg_read_req_bytes"}), \ @@ -237,7 +251,7 @@ namespace DB struct ExpBuckets { const double start; - const int base; + const double base; const size_t size; inline operator prometheus::Histogram::BucketBoundaries() const && { diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index f1c04025bd1..047f7168891 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #include #include #include @@ -65,6 +66,8 @@ #include #include + + #include #include #include @@ -106,6 +109,35 @@ namespace FailPoints extern const char force_context_path[]; } // namespace FailPoints +struct UniversalPageStorageWrapper +{ + UniversalPageStoragePtr uni_page_storage; + BackgroundProcessingPool::TaskHandle gc_handle; + std::atomic last_try_gc_time = Clock::now(); + + void restore(Context & global_context) + { + uni_page_storage->restore(); + gc_handle = global_context.getBackgroundPool().addTask( + [this, global_context] { + return this->gc(); + }, + false); + } + + bool gc() + { + Timepoint now = Clock::now(); + const std::chrono::seconds try_gc_period(10); + if (now < (last_try_gc_time.load() + try_gc_period)) + return false; + + last_try_gc_time = now; + return this->uni_page_storage->gc(); + } +}; +using UniversalPageStorageWrapperPtr = std::shared_ptr; + /** Set of known objects (environment), that could be used in query. * Shared (global) part. Order of members (especially, order of destruction) is very important. @@ -159,6 +191,7 @@ struct ContextShared IORateLimiter io_rate_limiter; PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3; DM::GlobalStoragePoolPtr global_storage_pool; + UniversalPageStorageWrapperPtr uni_page_storage_wrapper; /// Named sessions. 
The user could specify session identifier to reuse settings and temporary tables in subsequent requests. class SessionKeyHash @@ -1615,6 +1648,24 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const return shared->global_storage_pool; } +void Context::initializeGlobalUniversalPageStorage(const PathPool & path_pool, const FileProviderPtr & file_provider) +{ + auto lock = getLock(); + if (shared->uni_page_storage_wrapper) + throw Exception("UniversalPageStorage has already been initialized.", ErrorCodes::LOGICAL_ERROR); + + shared->uni_page_storage_wrapper = std::make_shared(); + shared->uni_page_storage_wrapper->uni_page_storage = UniversalPageStorage::create("global", path_pool.getPSDiskDelegatorGlobalMulti("global"), {}, file_provider); + shared->uni_page_storage_wrapper->restore(*this); + LOG_INFO(shared->log, "initialized GlobalUniversalPageStorage"); +} + +UniversalPageStoragePtr Context::getGlobalUniversalPageStorage() const +{ + auto lock = getLock(); + return shared->uni_page_storage_wrapper->uni_page_storage; +} + UInt16 Context::getTCPPort() const { auto lock = getLock(); diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 34fdc0536aa..d1a1ced9757 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -120,6 +120,9 @@ using Dependencies = std::vector; using TableAndCreateAST = std::pair; using TableAndCreateASTs = std::map; +class UniversalPageStorage; +using UniversalPageStoragePtr = std::shared_ptr; + /** A set of known objects that can be used in the query. * Consists of a shared part (always common to all sessions and queries) * and copied part (which can be its own for each session or query). 
@@ -421,6 +424,9 @@ class Context bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool); DM::GlobalStoragePoolPtr getGlobalStoragePool() const; + void initializeGlobalUniversalPageStorage(const PathPool & path_pool, const FileProviderPtr & file_provider); + UniversalPageStoragePtr getGlobalUniversalPageStorage() const; + /// Call after initialization before using system logs. Call for global context. void initializeSystemLogs(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 34f63b32579..3271092363d 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1087,6 +1087,9 @@ int Server::main(const std::vector & /*args*/) global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast(global_context->getPageStorageRunMode())); + /// + global_context->initializeGlobalUniversalPageStorage(global_context->getPathPool(), global_context->getFileProvider()); + /// Initialize RateLimiter. global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool); diff --git a/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp b/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp index e494b8108d4..1377bb63ab4 100644 --- a/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp +++ b/dbms/src/Storages/Page/V3/Blob/BlobStat.cpp @@ -18,6 +18,7 @@ #include #include +#include "ext/scope_guard.h" namespace ProfileEvents { @@ -157,13 +158,20 @@ BlobStats::BlobStatPtr BlobStats::createStatNotChecking(BlobFileId blob_file_id, /// If creating a new BlobFile, we need to register the BlobFile's path to delegator, so it's necessary to call `addPageFileUsedSize` here. 
delegator->addPageFileUsedSize({blob_file_id, 0}, 0, path, true); stats_map[path].emplace_back(stat); + if (stats_map_next_index.find(path) == stats_map_next_index.end()) + { + stats_map_next_index[path] = 0; + } return stat; } void BlobStats::eraseStat(const BlobStatPtr && stat, const std::lock_guard &) { PageFileIdAndLevel id_lvl{stat->id, 0}; - stats_map[delegator->getPageFilePath(id_lvl)].remove(stat); + auto & stats = stats_map[delegator->getPageFilePath(id_lvl)]; + auto iter = std::find(stats.begin(), stats.end(), stat); + assert(iter != stats.end()); + stats.erase(iter); } void BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard & lock) @@ -196,9 +204,6 @@ void BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard BlobStats::chooseStat(size_t buf_size, const std::lock_guard &) { - BlobStatPtr stat_ptr = nullptr; - double smallest_valid_rate = 2; - // No stats exist if (stats_map.empty()) { @@ -213,27 +218,25 @@ std::pair BlobStats::chooseStat(size_t buf_s std::advance(stats_iter, stats_map_path_index); size_t path_iter_idx = 0; + SCOPE_EXIT({ + // advance the `stats_map_path_idx` without size checking + stats_map_path_index += path_iter_idx + 1; + }); for (path_iter_idx = 0; path_iter_idx < stats_map.size(); ++path_iter_idx) { // Try to find a suitable stat under current path (path=`stats_iter->first`) - for (const auto & stat : stats_iter->second) + for (size_t i = 0; i < stats_iter->second.size(); i++) { - auto lock = stat->lock(); // TODO: will it bring performance regression? - if (stat->isNormal() - && stat->sm_max_caps >= buf_size - && stat->sm_valid_rate < smallest_valid_rate) + stats_map_next_index[stats_iter->first] = stats_map_next_index[stats_iter->first] % stats_iter->second.size(); + const auto & stat = (stats_iter->second)[stats_map_next_index[stats_iter->first]]; + stats_map_next_index[stats_iter->first] += 1; + auto lock = stat->defer_lock(); // TODO: will it bring performance regression? 
+ if (lock.try_lock() && stat->isNormal() && stat->sm_max_caps >= buf_size) { - smallest_valid_rate = stat->sm_valid_rate; - stat_ptr = stat; + return std::make_pair(stat, INVALID_BLOBFILE_ID); } } - // Already find the available stat under current path. - if (stat_ptr != nullptr) - { - break; - } - // Try to find stat in the next path. stats_iter++; if (stats_iter == stats_map.end()) @@ -242,16 +245,7 @@ std::pair BlobStats::chooseStat(size_t buf_s } } - // advance the `stats_map_path_idx` without size checking - stats_map_path_index += path_iter_idx + 1; - - // Can not find a suitable stat under all paths - if (stat_ptr == nullptr) - { - return std::make_pair(nullptr, roll_id); - } - - return std::make_pair(stat_ptr, INVALID_BLOBFILE_ID); + return std::make_pair(nullptr, roll_id); } BlobStats::BlobStatPtr BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_not_exist) @@ -283,14 +277,14 @@ BlobStats::BlobStatPtr BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_n * BlobStat methods * ********************/ -BlobFileOffset BlobStats::BlobStat::getPosFromStat(size_t buf_size, const std::lock_guard &) +BlobFileOffset BlobStats::BlobStat::getPosFromStat(size_t buf_size, const std::unique_lock &) { BlobFileOffset offset = 0; UInt64 max_cap = 0; bool expansion = true; std::tie(offset, max_cap, expansion) = smap->searchInsertOffset(buf_size); - ProfileEvents::increment(expansion ? ProfileEvents::PSV3MBlobExpansion : ProfileEvents::PSV3MBlobReused); +// ProfileEvents::increment(expansion ? 
ProfileEvents::PSV3MBlobExpansion : ProfileEvents::PSV3MBlobReused); /** * Whatever `searchInsertOffset` success or failed, @@ -321,7 +315,7 @@ BlobFileOffset BlobStats::BlobStat::getPosFromStat(size_t buf_size, const std::l return offset; } -size_t BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard &) +size_t BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::unique_lock &) { if (!smap->markFree(offset, buf_size)) { diff --git a/dbms/src/Storages/Page/V3/Blob/BlobStat.h b/dbms/src/Storages/Page/V3/Blob/BlobStat.h index 20f1565a25b..4e0a5b9d87b 100644 --- a/dbms/src/Storages/Page/V3/Blob/BlobStat.h +++ b/dbms/src/Storages/Page/V3/Blob/BlobStat.h @@ -53,6 +53,8 @@ class BlobStats struct BlobStat { + String parent_path; + const BlobFileId id; std::atomic type; @@ -79,9 +81,14 @@ class BlobStats , sm_max_caps(sm_max_caps_) {} - [[nodiscard]] std::lock_guard lock() + [[nodiscard]] std::unique_lock lock() + { + return std::unique_lock(sm_lock); + } + + [[nodiscard]] std::unique_lock defer_lock() { - return std::lock_guard(sm_lock); + return std::unique_lock(sm_lock, std::defer_lock); } bool isNormal() const @@ -99,12 +106,12 @@ class BlobStats type.store(BlobStatType::READ_ONLY); } - BlobFileOffset getPosFromStat(size_t buf_size, const std::lock_guard &); + BlobFileOffset getPosFromStat(size_t buf_size, const std::unique_lock &); /** * The return value is the valid data size remained in the BlobFile after the remove */ - size_t removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard &); + size_t removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::unique_lock &); /** * This method is only used when blobstore restore @@ -168,7 +175,7 @@ class BlobStats BlobStatPtr blobIdToStat(BlobFileId file_id, bool ignore_not_exist = false); - using StatsMap = std::map>; + using StatsMap = std::map>; StatsMap getStats() const { auto guard = lock(); @@ -196,7 
+203,8 @@ class BlobStats BlobFileId roll_id = 1; // Index for selecting next path for creating new blobfile UInt32 stats_map_path_index = 0; - std::map> stats_map; + std::map> stats_map; + std::map stats_map_next_index; }; } // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index f82d9da714d..01ce819c78e 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -245,6 +246,7 @@ template typename Trait::PageEntriesEdit BlobStore::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr & write_limiter) { + Stopwatch watch; ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount()); const size_t all_page_data_size = wb.getTotalDataSize(); @@ -308,6 +310,8 @@ BlobStore::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr & size_t actually_allocated_size = all_page_data_size + replenish_size; auto [blob_id, offset_in_file] = getPosFromStats(actually_allocated_size); + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_get_stat).Observe(watch.elapsedSeconds()); + watch.restart(); size_t offset_in_allocated = 0; @@ -390,10 +394,13 @@ BlobStore::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr & ErrorCodes::LOGICAL_ERROR); } + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_edit).Observe(watch.elapsedSeconds()); + watch.restart(); try { auto blob_file = getBlobFile(blob_id); blob_file->write(buffer, offset_in_file, all_page_data_size, write_limiter); + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_blob_write).Observe(watch.elapsedSeconds()); } catch (DB::Exception & e) { @@ -457,12 +464,19 @@ void BlobStore::remove(const PageEntriesV3 & del_entries) template std::pair BlobStore::getPosFromStats(size_t size) { + Stopwatch watch; BlobStatPtr stat; auto lock_stat = [size, this, &stat]() { + Stopwatch watch_inner; 
auto lock_stats = blob_stats.lock(); BlobFileId blob_file_id = INVALID_BLOBFILE_ID; std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, lock_stats); + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_choose_stat).Observe(watch_inner.elapsedSeconds()); + watch_inner.restart(); + SCOPE_EXIT({ + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_lock_stat).Observe(watch_inner.elapsedSeconds()); + }); if (stat == nullptr) { // No valid stat for putting data with `size`, create a new one @@ -478,6 +492,12 @@ std::pair BlobStore::getPosFromStats(size_t s return stat->lock(); }(); + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_get_stat_latch).Observe(watch.elapsedSeconds()); + watch.restart(); + SCOPE_EXIT({ + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_get_pos_from_stat).Observe(watch.elapsedSeconds()); + }); + // We need to assume that this insert will reduce max_cap. // Because other threads may also be waiting for BlobStats to chooseStat during this time. // If max_cap is not reduced, it may cause the same BlobStat to accept multiple buffers and exceed its max_cap. diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 3f09d33e3e8..cd890937398 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include #include @@ -509,8 +511,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( UInt64 lowest_seq, typename Trait::EntriesDerefMap * normal_entries_to_deref, PageEntriesV3 * entries_removed, - const PageLock & /*page_lock*/, - bool keep_last_valid_var_entry) + const PageLock & /*page_lock*/) { if (type == EditRecordType::VAR_EXTERNAL) { @@ -560,11 +561,8 @@ bool VersionedPageEntries::cleanOutdatedEntries( // If the first version less than is entry, // then we can remove those entries prev of it. 
// If the first version less than is delete, - // we may keep the first valid entry before the delete entry in the following case: - // 1) if `keep_last_valid_var_entry` is true - // (this is only used when dump snapshot because there may be some upsert entry in later wal files, - // so we need keep the last valid entry here to avoid the delete entry being removed) - // 2) if `being_ref_count` > 1(this means the entry is ref by other entries) + // we may keep the first valid entry before the delete entry + // if `being_ref_count` > 1 (this means the entry is ref by other entries) bool last_entry_is_delete = !iter->second.isEntry(); --iter; // keep the first version less than while (true) @@ -578,7 +576,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( { if (last_entry_is_delete) { - if (!keep_last_valid_var_entry && iter->second.being_ref_count == 1) + if (iter->second.being_ref_count == 1) { if (entries_removed) { @@ -610,13 +608,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( } template -bool VersionedPageEntries::derefAndClean( - UInt64 lowest_seq, - const typename Trait::PageId & page_id, - const PageVersion & deref_ver, - const Int64 deref_count, - PageEntriesV3 * entries_removed, - bool keep_last_valid_var_entry) +bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, const typename Trait::PageId & page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 * entries_removed) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_EXTERNAL) @@ -657,7 +649,7 @@ bool VersionedPageEntries::derefAndClean( // Clean outdated entries after decreased the ref-counter // set `normal_entries_to_deref` to be nullptr to ignore cleaning ref-var-entries - return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock, keep_last_valid_var_entry); + return cleanOutdatedEntries(lowest_seq, /*normal_entries_to_deref*/ nullptr, entries_removed, page_lock); } throw Exception(fmt::format("calling 
derefAndClean with invalid state [state={}]", toDebugString())); @@ -1083,6 +1075,19 @@ typename Trait::PageIdSet PageDirectory::getRangePageIds(const typename T return page_ids; } +template +typename Trait::PageIds PageDirectory::getLowerBound(const typename Trait::PageId & start) +{ + typename Trait::PageIds page_ids; + + std::shared_lock read_lock(table_rw_mutex); + if (auto iter = mvcc_table_directory.lower_bound(start); iter != mvcc_table_directory.end()) + { + page_ids.emplace_back(iter->first); + } + return page_ids; +} + template void PageDirectory::applyRefEditRecord( MVCCMapType & mvcc_table_directory, @@ -1187,10 +1192,13 @@ void PageDirectory::applyRefEditRecord( template void PageDirectory::apply(typename Trait::PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter) { + Stopwatch watch; // Note that we need to make sure increasing `sequence` in order, so it // also needs to be protected by `write_lock` throughout the `apply` // TODO: It is totally serialized, make it a pipeline std::unique_lock write_lock(table_rw_mutex); + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_latch).Observe(watch.elapsedSeconds()); + watch.restart(); UInt64 last_sequence = sequence.load(); PageVersion new_version(last_sequence + 1, 0); @@ -1200,6 +1208,9 @@ void PageDirectory::apply(typename Trait::PageEntriesEdit && edit, const r.version = new_version; } wal->apply(Trait::Serializer::serializeTo(edit), write_limiter); + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wal).Observe(watch.elapsedSeconds()); + watch.restart(); + SCOPE_EXIT({ GET_METRIC(tiflash_storage_page_write_duration_seconds, type_commit).Observe(watch.elapsedSeconds()); }); // stage 2, create entry version list for page_id. 
for (const auto & r : edit.getRecords()) @@ -1372,8 +1383,7 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, return factory.createFromReader( identifier, std::move(snapshot_reader), - /* wal */ nullptr, - /* for_dump_snapshot */ true); + /* wal */ nullptr); } else if constexpr (std::is_same_v) { @@ -1381,8 +1391,7 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, return factory.createFromReader( identifier, std::move(snapshot_reader), - /* wal */ nullptr, - /* for_dump_snapshot */ true); + /* wal */ nullptr); } }(); // The records persisted in `files_snap` is older than or equal to all records in `edit` @@ -1397,7 +1406,7 @@ bool PageDirectory::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, } template -PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, bool keep_last_valid_var_entry) +PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries) { UInt64 lowest_seq = sequence.load(); @@ -1462,8 +1471,7 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, lowest_seq, &normal_entries_to_deref, return_removed_entries ? &all_del_entries : nullptr, - iter->second->acquireLock(), - keep_last_valid_var_entry); + iter->second->acquireLock()); { std::unique_lock write_lock(table_rw_mutex); @@ -1501,8 +1509,7 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool return_removed_entries, page_id, /*deref_ver=*/deref_counter.first, /*deref_count=*/deref_counter.second, - return_removed_entries ? &all_del_entries : nullptr, - keep_last_valid_var_entry); + return_removed_entries ? &all_del_entries : nullptr); if (all_deleted) { diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 7c58c21471c..c3185f94bfa 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -200,7 +200,6 @@ class VersionedPageEntries * to be decreased the ref count by `derefAndClean`. 
* The elem is > * `entries_removed`: Return the entries removed from the version list - * `keep_last_valid_var_entry`: Keep the last valid entry, useful for dumping snapshot. * * Return `true` iff this page can be totally removed from the whole `PageDirectory`. */ @@ -208,15 +207,13 @@ class VersionedPageEntries UInt64 lowest_seq, typename Trait::EntriesDerefMap * normal_entries_to_deref, PageEntriesV3 * entries_removed, - const PageLock & page_lock, - bool keep_last_valid_var_entry = false); + const PageLock & page_lock); bool derefAndClean( UInt64 lowest_seq, const typename Trait::PageId & page_id, const PageVersion & deref_ver, Int64 deref_count, - PageEntriesV3 * entries_removed, - bool keep_last_valid_var_entry = false); + PageEntriesV3 * entries_removed); void collapseTo(UInt64 seq, const typename Trait::PageId & page_id, typename Trait::PageEntriesEdit & edit); @@ -315,6 +312,8 @@ class PageDirectory typename Trait::PageIdSet getRangePageIds(const typename Trait::PageId & start, const typename Trait::PageId & end); + typename Trait::PageIds getLowerBound(const typename Trait::PageId & start); + void apply(typename Trait::PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter = nullptr); std::pair @@ -329,8 +328,7 @@ class PageDirectory // Perform a GC for in-memory entries and return the removed entries. // If `return_removed_entries` is false, then just return an empty set. - // When dump snapshot, we need to keep the last valid entry. Check out `tryDumpSnapshot` for the reason. 
- PageEntriesV3 gcInMemEntries(bool return_removed_entries = true, bool keep_last_valid_var_entry = false); + PageEntriesV3 gcInMemEntries(bool return_removed_entries = true); private: using ExternalIdTrait = typename Trait::ExternalIdTrait; diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp index 70df01b9d79..c34574cf89b 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp @@ -40,7 +40,7 @@ PageDirectoryFactory::create(String storage_name, FileProviderPtr & file_ template typename Trait::PageDirectoryPtr -PageDirectoryFactory::createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal, bool for_dump_snapshot) +PageDirectoryFactory::createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal) { typename Trait::PageDirectoryPtr dir = std::make_unique(storage_name, std::move(wal)); loadFromDisk(dir, std::move(reader)); @@ -51,10 +51,10 @@ PageDirectoryFactory::createFromReader(String storage_name, WALStoreReade // After restoring from the disk, we need cleanup all invalid entries in memory, or it will // try to run GC again on some entries that are already marked as invalid in BlobStore. // It's no need to remove the expired entries in BlobStore, so skip filling removed_entries to improve performance. 
- dir->gcInMemEntries(/*return_removed_entries=*/false, /* keep_last_delete_entry */ for_dump_snapshot); - LOG_FMT_INFO(DB::Logger::get("PageDirectoryFactory", storage_name), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence); + dir->gcInMemEntries(/*return_removed_entries=*/false); + LOG_INFO(DB::Logger::get(storage_name), "PageDirectory restored [max_page_id={}] [max_applied_ver={}]", dir->getMaxId(), dir->sequence); - if (!for_dump_snapshot && blob_stats) + if (blob_stats) { // After all entries restored to `mvcc_table_directory`, only apply // the latest entry to `blob_stats`, or we may meet error since diff --git a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h index a45ad708c95..bee04874e03 100644 --- a/dbms/src/Storages/Page/V3/PageDirectoryFactory.h +++ b/dbms/src/Storages/Page/V3/PageDirectoryFactory.h @@ -54,7 +54,7 @@ class PageDirectoryFactory typename Trait::PageDirectoryPtr create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALConfig config); - typename Trait::PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal, bool for_dump_snapshot = false); + typename Trait::PageDirectoryPtr createFromReader(String storage_name, WALStoreReaderPtr reader, WALStorePtr wal); // just for test typename Trait::PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const typename Trait::PageEntriesEdit & edit); diff --git a/dbms/src/Storages/Page/V3/WALStore.cpp b/dbms/src/Storages/Page/V3/WALStore.cpp index 3d006f1931b..06d5f3879fd 100644 --- a/dbms/src/Storages/Page/V3/WALStore.cpp +++ b/dbms/src/Storages/Page/V3/WALStore.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -82,20 +83,26 @@ void WALStore::apply(String && serialized_edit, const WriteLimiterPtr & write_li { std::lock_guard 
lock(log_file_mutex); - // Roll to a new log file - // TODO: Make it configurable if (log_file == nullptr || log_file->writtenBytes() > config.roll_size) { - auto log_num = last_log_num++; - auto [new_log_file, filename] = createLogWriter({log_num, 0}, false); - (void)filename; - log_file.swap(new_log_file); + // Roll to a new log file + rollToNewLogWriter(lock); } log_file->addRecord(payload, serialized_edit.size(), write_limiter); } } +Format::LogNumberType WALStore::rollToNewLogWriter(const std::lock_guard &) +{ + // Roll to a new log file + auto log_num = last_log_num++; + auto [new_log_file, filename] = createLogWriter({log_num, 0}, false); + UNUSED(filename); + log_file.swap(new_log_file); + return log_num; +} + std::tuple, LogFilename> WALStore::createLogWriter( const std::pair & new_log_lvl, bool manual_flush) @@ -135,9 +142,7 @@ std::tuple, LogFilename> WALStore::createLogWriter( new_log_lvl.first, /*recycle*/ true, /*manual_flush*/ manual_flush); - return { - std::move(log_writer), - log_filename}; + return {std::move(log_writer), log_filename}; } WALStore::FilesSnapshot WALStore::getFilesSnapshot() const @@ -185,7 +190,7 @@ bool WALStore::saveSnapshot( if (files_snap.persisted_log_files.empty()) return false; - LOG_FMT_INFO(logger, "Saving directory snapshot"); + LOG_FMT_INFO(logger, "Saving directory snapshot [num_records={}]", num_records); // Use {largest_log_num, 1} to save the `edit` const auto log_num = files_snap.persisted_log_files.rbegin()->log_num; @@ -226,7 +231,7 @@ bool WALStore::saveSnapshot( files_snap.persisted_log_files.begin(), files_snap.persisted_log_files.end(), [](const auto & arg, FmtBuffer & fb) { - fb.fmtAppend("{}", arg.filename(arg.stage)); + fb.append(arg.filename(arg.stage)); }, ", "); fmt_buf.fmtAppend("] [num_records={}] [file={}] [size={}].", diff --git a/dbms/src/Storages/Page/V3/WALStore.h b/dbms/src/Storages/Page/V3/WALStore.h index 91f02c67771..12b9923a428 100644 --- a/dbms/src/Storages/Page/V3/WALStore.h +++ 
b/dbms/src/Storages/Page/V3/WALStore.h @@ -35,6 +35,10 @@ class PSDiskDelegator; using PSDiskDelegatorPtr = std::shared_ptr; namespace PS::V3 { +namespace tests +{ +class WALStoreTest; +} class WALStore; using WALStorePtr = std::unique_ptr; @@ -74,6 +78,7 @@ class WALStore } }; + FilesSnapshot getFilesSnapshot() const; bool saveSnapshot( @@ -84,19 +89,22 @@ class WALStore const String & name() { return storage_name; } + friend class tests::WALStoreTest; // for testing + private: - WALStore( - String storage_name, - const PSDiskDelegatorPtr & delegator_, - const FileProviderPtr & provider_, - Format::LogNumberType last_log_num_, - WALConfig config); + WALStore(String storage_name, + const PSDiskDelegatorPtr & delegator_, + const FileProviderPtr & provider_, + Format::LogNumberType last_log_num_, + WALConfig config); std::tuple, LogFilename> createLogWriter( const std::pair & new_log_lvl, bool manual_flush); + Format::LogNumberType rollToNewLogWriter(const std::lock_guard &); + private: const String storage_name; PSDiskDelegatorPtr delegator; diff --git a/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h b/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h index 41ddd77d03a..0c47cdb9590 100644 --- a/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h +++ b/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h @@ -253,10 +253,16 @@ class STDMapSpaceMap // Update return start offset = it->first; + bool is_champion = it->first <= hint_biggest_offset && hint_biggest_offset < it->first + it->second; + if (is_champion) + { + // TODO: change to exception + assert(hint_biggest_offset + hint_biggest_cap <= it->first + it->second); + } if (it->second == size) { // It is not champion, just return - if (it->first != hint_biggest_offset) + if (!is_champion) { free_map.erase(it); max_cap = hint_biggest_cap; @@ -277,7 +283,7 @@ class STDMapSpaceMap it = free_map.insert(/*hint=*/it, {k, v}); // Use the `it` after erased as a hint, should be good for performance // It is not 
champion, just return - if (k - size != hint_biggest_offset) + if (!is_champion) { max_cap = hint_biggest_cap; return std::make_tuple(offset, max_cap, last_offset == offset); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 2d4de975fdf..e122b397f47 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1635,7 +1635,7 @@ try page_storage->write(std::move(batch)); } - // create a snapshot to avoid gc + // create a snapshot to avoid page0 being GC-ed auto snap = page_storage->getSnapshot(); { @@ -1644,21 +1644,29 @@ try page_storage->write(std::move(batch)); } +<<<<<<< HEAD auto getLogFileNum = [&]() { auto log_files = WALStoreReader::listAllFiles(delegator, Logger::get("PageStorageTest", "")); +======= + auto get_log_file_num = [&]() { + auto log_files = WALStoreReader::listAllFiles(delegator, Logger::get()); +>>>>>>> 47480fc3a (PageStorage: Fix peak memory usage when running GC on PageDirectory (#6168)) return log_files.size(); }; // write until there are more than one wal file - while (getLogFileNum() <= 1) + while (get_log_file_num() <= 1) { WriteBatch batch; PageId page_id1 = 130; batch.putPage(page_id1, 0, std::make_shared(c_buff, buf_sz), buf_sz, {}); page_storage->write(std::move(batch)); } + + // read with latest snapshot, we can not get page0 ASSERT_ANY_THROW(page_storage->read(page_id0)); + // after the page0 get deleted in previouse log file, // write an upsert entry into the current writing log file auto done_full_gc = page_storage->gc(); EXPECT_TRUE(done_full_gc); @@ -1671,6 +1679,8 @@ try page_storage = reopenWithConfig(config); } + // After restored from disk, we should not see page0 again + // or it could be an entry pointing to a non-exist BlobFile ASSERT_ANY_THROW(page_storage->read(page_id0)); } CATCH @@ -1720,13 +1730,18 @@ try page_storage->write(std::move(batch)); } +<<<<<<< HEAD auto 
getLogFileNum = [&]() { auto log_files = WALStoreReader::listAllFiles(delegator, Logger::get("PageStorageTest", "")); +======= + auto get_log_file_num = [&]() { + auto log_files = WALStoreReader::listAllFiles(delegator, Logger::get()); +>>>>>>> 47480fc3a (PageStorage: Fix peak memory usage when running GC on PageDirectory (#6168)) return log_files.size(); }; // write until there are more than one wal file - while (getLogFileNum() <= 1) + while (get_log_file_num() <= 1) { WriteBatch batch; PageId page_id2 = 130; @@ -1735,6 +1750,7 @@ try } ASSERT_ANY_THROW(page_storage->read(page_id0)); + // after the page0 get deleted in previouse log file, // write an upsert entry into the current writing log file auto done_full_gc = page_storage->gc(); EXPECT_TRUE(done_full_gc); @@ -1747,6 +1763,8 @@ try page_storage = reopenWithConfig(config); } + // After restored from disk, we should not see page0 again + // or it could be an entry pointing to a non-exist BlobFile ASSERT_ANY_THROW(page_storage->read(page_id0)); } CATCH diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp index a629d6ad9b1..72858fd56ec 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include @@ -29,6 +30,8 @@ #include #include +#include +#include #include namespace DB::PS::V3::tests @@ -335,7 +338,11 @@ class WALStoreTest } protected: +<<<<<<< HEAD static void applyWithSameVersion(WALStorePtr & wal, u128::PageEntriesEdit & edit, const PageVersion & version) +======= + static void applyWithSameVersion(const WALStorePtr & wal, PageEntriesEdit & edit, const PageVersion & version) +>>>>>>> 47480fc3a (PageStorage: Fix peak memory usage when running GC on PageDirectory (#6168)) { for (auto & r : edit.getMutRecords()) { @@ -344,6 +351,18 @@ class WALStoreTest wal->apply(u128::Serializer::serializeTo(edit)); } + static void rollToNewLogWriter(const WALStorePtr & wal) + { + std::lock_guard guard(wal->log_file_mutex); + wal->rollToNewLogWriter(guard); + } + + size_t getNumLogFiles() + { + auto log_files = WALStoreReader::listAllFiles(delegator, log); + return log_files.size(); + } + private: const bool multi_paths; @@ -353,9 +372,10 @@ class WALStoreTest LoggerPtr log; }; -TEST_P(WALStoreTest, FindCheckpointFile) +TEST(WALStoreReaderTest, FindCheckpointFile) { - auto path = getTemporaryPath(); + auto log = Logger::get(); + auto path = base::TiFlashStorageTestBasic::getTemporaryPath(); { // no checkpoint @@ -704,8 +724,7 @@ try // Test for save snapshot (with encryption) LogFilenameSet persisted_log_files = WALStoreReader::listAllFiles(delegator, log); - WALStore::FilesSnapshot file_snap{.current_writing_log_num = 100, // just a fake value - .persisted_log_files = persisted_log_files}; + WALStore::FilesSnapshot file_snap{.persisted_log_files = persisted_log_files}; u128::PageEntriesEdit snap_edit; PageEntryV3 entry{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; @@ -743,6 +762,68 @@ try } CATCH +TEST_P(WALStoreTest, GetFileSnapshot) +{ + auto ctx = DB::tests::TiFlashTestEnv::getContext(); + auto provider = ctx.getFileProvider(); + auto path = getTemporaryPath(); + + auto [wal, reader] 
= WALStore::create(getCurrentTestName(), provider, delegator, config); + ASSERT_NE(wal, nullptr); + + // running gc right before any writes is skip + ASSERT_FALSE(wal->tryGetFilesSnapshot(1, false).isValid()); + + // generate log_1_0, log_2_0, log_3_0 + rollToNewLogWriter(wal); + rollToNewLogWriter(wal); + rollToNewLogWriter(wal); + + ASSERT_EQ(getNumLogFiles(), 3); + // num of files not exceed 5, skip + ASSERT_FALSE(wal->tryGetFilesSnapshot(5, false).isValid()); + // num of files not exceed 3, skip + ASSERT_FALSE(wal->tryGetFilesSnapshot(3, false).isValid()); + // num of files not exceed 3, but still valid when `force` is true + ASSERT_TRUE(wal->tryGetFilesSnapshot(3, true).isValid()); + + rollToNewLogWriter(wal); + // num of files exceed 3, return + { + ASSERT_EQ(getNumLogFiles(), 4); + auto files = wal->tryGetFilesSnapshot(3, false); + ASSERT_TRUE(files.isValid()); + ASSERT_EQ(files.persisted_log_files.size(), 4); + ASSERT_EQ(files.persisted_log_files.begin()->log_num, 1); + ASSERT_EQ(files.persisted_log_files.rbegin()->log_num, 4); + ASSERT_EQ(getNumLogFiles(), 4); + } + + { + // write new edit, new log file generated + PageEntriesEdit edit; + edit.del(buildV3Id(TEST_NAMESPACE_ID, 100)); + wal->apply(ser::serializeTo(edit)); + } + + { + ASSERT_EQ(getNumLogFiles(), 5); + auto files = wal->tryGetFilesSnapshot(3, false); + ASSERT_TRUE(files.isValid()); + ASSERT_EQ(files.persisted_log_files.size(), 5); + ASSERT_EQ(files.persisted_log_files.begin()->log_num, 1); + ASSERT_EQ(files.persisted_log_files.rbegin()->log_num, 5); + ASSERT_EQ(getNumLogFiles(), 5); + + // empty + PageEntriesEdit snap_edit; + bool done = wal->saveSnapshot(std::move(files), ser::serializeTo(snap_edit), snap_edit.size()); + ASSERT_TRUE(done); + ASSERT_EQ(getNumLogFiles(), 1); + } +} + + INSTANTIATE_TEST_CASE_P( Disks, WALStoreTest, diff --git a/dbms/src/Storages/Page/universal/UniversalPageStorage.cpp b/dbms/src/Storages/Page/universal/UniversalPageStorage.cpp index 8a36aa63deb..8e4e9653c15 
100644 --- a/dbms/src/Storages/Page/universal/UniversalPageStorage.cpp +++ b/dbms/src/Storages/Page/universal/UniversalPageStorage.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -52,7 +53,131 @@ void UniversalPageStorage::write(UniversalWriteBatch && write_batch, const Write if (unlikely(write_batch.empty())) return; + Stopwatch watch; + SCOPE_EXIT({ GET_METRIC(tiflash_storage_page_write_duration_seconds, type_total).Observe(watch.elapsedSeconds()); }); auto edit = blob_store->write(write_batch, write_limiter); + GET_METRIC(tiflash_storage_page_write_duration_seconds, type_blob).Observe(watch.elapsedSeconds()); page_directory->apply(std::move(edit), write_limiter); } + +String UniversalPageStorage::GCTimeStatistics::toLogging() const +{ + const std::string_view stage_suffix = [this]() { + switch (stage) + { + case GCStageType::Unknown: + return " "; + case GCStageType::OnlyInMem: + return " without full gc"; + case GCStageType::FullGCNothingMoved: + return " without moving any entry"; + case GCStageType::FullGC: + return ""; + } + }(); + const auto get_external_msg = [this]() -> String { + if (clean_external_page_ms == 0) + return String(""); + static constexpr double SCALE_NS_TO_MS = 1'000'000.0; + return fmt::format(" [external_callbacks={}] [external_gc={}ms] [scanner={:.2f}ms] [get_alive={:.2f}ms] [remover={:.2f}ms]", + num_external_callbacks, + clean_external_page_ms, + external_page_scan_ns / SCALE_NS_TO_MS, + external_page_get_alive_ns / SCALE_NS_TO_MS, + external_page_remove_ns / SCALE_NS_TO_MS); + }; + return fmt::format("GC finished{}." 
+ " [total time={}ms]" + " [dump snapshots={}ms] [gc in mem entries={}ms]" + " [blobstore remove entries={}ms] [blobstore get status={}ms]" + " [get gc entries={}ms] [blobstore full gc={}ms]" + " [gc apply={}ms]" + "{}", // a placeholder for external page gc at last + stage_suffix, + total_cost_ms, + dump_snapshots_ms, + gc_in_mem_entries_ms, + blobstore_remove_entries_ms, + blobstore_get_gc_stats_ms, + full_gc_get_entries_ms, + full_gc_blobstore_copy_ms, + full_gc_apply_ms, + get_external_msg()); +} + +UniversalPageStorage::GCTimeStatistics UniversalPageStorage::doGC(const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter) +{ + Stopwatch gc_watch; + SCOPE_EXIT({ + GET_METRIC(tiflash_storage_page_gc_count, type_v3).Increment(); + GET_METRIC(tiflash_storage_page_gc_duration_seconds, type_v3).Observe(gc_watch.elapsedSeconds()); + bool is_running = true; + gc_is_running.compare_exchange_strong(is_running, false); + }); + + GCTimeStatistics statistics; + + // 1. Do the MVCC gc, clean up expired snapshot. + // And get the expired entries. + if (page_directory->tryDumpSnapshot(read_limiter, write_limiter)) + { + GET_METRIC(tiflash_storage_page_gc_count, type_v3_mvcc_dumped).Increment(); + } + statistics.dump_snapshots_ms = gc_watch.elapsedMillisecondsFromLastTime(); + + const auto & del_entries = page_directory->gcInMemEntries(); + statistics.gc_in_mem_entries_ms = gc_watch.elapsedMillisecondsFromLastTime(); + + // 2. Remove the expired entries in BlobStore. + // It won't delete the data on the disk. + // It will only update the SpaceMap which in memory. + blob_store->remove(del_entries); + statistics.blobstore_remove_entries_ms = gc_watch.elapsedMillisecondsFromLastTime(); + + // 3. Analyze the status of each Blob in order to obtain the Blobs that need to do `full GC`. + // Blobs that do not need to do full GC will also do ftruncate to reduce space amplification. 
+ const auto & blob_ids_need_gc = blob_store->getGCStats(); + statistics.blobstore_get_gc_stats_ms = gc_watch.elapsedMillisecondsFromLastTime(); + if (blob_ids_need_gc.empty()) + { + statistics.stage = GCStageType::OnlyInMem; + statistics.total_cost_ms = gc_watch.elapsedMilliseconds(); + return statistics; + } + + // Execute full gc + GET_METRIC(tiflash_storage_page_gc_count, type_v3_bs_full_gc).Increment(blob_ids_need_gc.size()); + // 4. Filter out entries in MVCC by BlobId. + // We also need to filter the version of the entry. + // So that the `gc_apply` can proceed smoothly. + auto [blob_gc_info, total_page_size] = page_directory->getEntriesByBlobIds(blob_ids_need_gc); + statistics.full_gc_get_entries_ms = gc_watch.elapsedMillisecondsFromLastTime(); + if (blob_gc_info.empty()) + { + statistics.stage = GCStageType::FullGCNothingMoved; + statistics.total_cost_ms = gc_watch.elapsedMilliseconds(); + return statistics; + } + + // 5. Do the BlobStore GC + // After BlobStore GC, these entries will be migrated to a new blob. + // Then we should notify MVCC apply the change. + PS::V3::universal::PageEntriesEdit gc_edit = blob_store->gc(blob_gc_info, total_page_size, write_limiter, read_limiter); + statistics.full_gc_blobstore_copy_ms = gc_watch.elapsedMillisecondsFromLastTime(); + RUNTIME_CHECK_MSG(!gc_edit.empty(), "Something wrong after BlobStore GC"); + + // 6. MVCC gc apply + // MVCC will apply the migrated entries. + // Also it will generate a new version for these entries. + // Note that if the process crash between step 5 and step 6, the stats in BlobStore will + // be reset to correct state during restore. If any exception thrown, then some BlobFiles + // will be remained as "read-only" files while entries in them are useless in actual. + // Those BlobFiles should be cleaned during next restore. 
+ page_directory->gcApply(std::move(gc_edit), write_limiter); + statistics.full_gc_apply_ms = gc_watch.elapsedMillisecondsFromLastTime(); + + statistics.stage = GCStageType::FullGC; + statistics.total_cost_ms = gc_watch.elapsedMilliseconds(); + return statistics; +} } // namespace DB diff --git a/dbms/src/Storages/Page/universal/UniversalPageStorage.h b/dbms/src/Storages/Page/universal/UniversalPageStorage.h index d4a1d6ced51..f8088ff7dbe 100644 --- a/dbms/src/Storages/Page/universal/UniversalPageStorage.h +++ b/dbms/src/Storages/Page/universal/UniversalPageStorage.h @@ -69,9 +69,45 @@ class UniversalPageStorage final , delegator(std::move(delegator_)) , config(config_) , file_provider(file_provider_) + , log(Logger::get("UniversalPageStorage", name)) { } + enum class GCStageType + { + Unknown, + OnlyInMem, + FullGCNothingMoved, + FullGC, + }; + + struct GCTimeStatistics + { + GCStageType stage = GCStageType::Unknown; + bool executeNextImmediately() const { return stage == GCStageType::FullGC; }; + + UInt64 total_cost_ms = 0; + + UInt64 dump_snapshots_ms = 0; + UInt64 gc_in_mem_entries_ms = 0; + UInt64 blobstore_remove_entries_ms = 0; + UInt64 blobstore_get_gc_stats_ms = 0; + // Full GC + UInt64 full_gc_get_entries_ms = 0; + UInt64 full_gc_blobstore_copy_ms = 0; + UInt64 full_gc_apply_ms = 0; + + // GC external page + UInt64 clean_external_page_ms = 0; + UInt64 num_external_callbacks = 0; + // ms is usually too big for these operation, store by ns (10^-9) + UInt64 external_page_scan_ns = 0; + UInt64 external_page_get_alive_ns = 0; + UInt64 external_page_remove_ns = 0; + + String toLogging() const; + }; + ~UniversalPageStorage() = default; void restore(); @@ -118,10 +154,21 @@ class UniversalPageStorage final // We may skip the GC to reduce useless reading by default. 
bool gc(bool not_skip = false, const WriteLimiterPtr & write_limiter = nullptr, const ReadLimiterPtr & read_limiter = nullptr) { - UNUSED(not_skip, write_limiter, read_limiter); - return false; + std::ignore = not_skip; + // If another thread is running gc, just return; + bool v = false; + if (!gc_is_running.compare_exchange_strong(v, true)) + return false; + + const GCTimeStatistics statistics = doGC(write_limiter, read_limiter); + assert(statistics.stage != GCStageType::Unknown); // `doGC` must set the stage + LOG_DEBUG(log, statistics.toLogging()); + + return statistics.executeNextImmediately(); } + GCTimeStatistics doGC(const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter); + // Register and unregister external pages GC callbacks // Note that user must ensure that it is safe to call `scanner` and `remover` even after unregister. void registerExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) { UNUSED(callbacks); } @@ -134,6 +181,10 @@ class UniversalPageStorage final PS::V3::universal::PageDirectoryPtr page_directory; PS::V3::universal::BlobStorePtr blob_store; + + std::atomic gc_is_running = false; + + LoggerPtr log; }; class KVStoreReader final @@ -211,6 +262,38 @@ class RaftLogReader final } } + void traverse2(const UniversalPageId & start, const UniversalPageId & end, const std::function & acceptor) + { + // always traverse with the latest snapshot + auto snapshot = uni_storage.getSnapshot(fmt::format("scan_r_{}_{}", start, end)); + const auto page_ids = uni_storage.page_directory->getRangePageIds(start, end); + for (const auto & page_id : page_ids) + { + const auto page_id_and_entry = uni_storage.page_directory->getByID(page_id, snapshot); + acceptor(uni_storage.blob_store->read(page_id_and_entry)); + } + } + + UniversalPage read(const UniversalPageId & page_id) + { + // always traverse with the latest snapshot + auto snapshot = uni_storage.getSnapshot(fmt::format("read_{}", page_id)); + const auto page_id_and_entry = 
uni_storage.page_directory->getByIDOrNull(page_id, snapshot); + if (page_id_and_entry.second.isValid()) + { + return uni_storage.blob_store->read(page_id_and_entry); + } + else + { + return UniversalPage({}); + } + } + + UniversalPageIds getLowerBound(const UniversalPageId & page_id) + { + return uni_storage.page_directory->getLowerBound(page_id); + } + private: UniversalPageStorage & uni_storage; }; diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index d66875c268d..892648c9e54 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -14,8 +14,11 @@ #include #include +#include #include #include +#include +#include #include #include #include @@ -157,6 +160,185 @@ uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t } } +RawCppPtr CreateWriteBatch() +{ +// LOG_DEBUG(&Poco::Logger::get("ProxyFFIDebug"), "create write batch"); + return GenRawCppPtr(new UniversalWriteBatch(), RawCppPtrTypeImpl::WriteBatch); +} + +void WriteBatchPutPage(RawVoidPtr ptr, BaseBuffView page_id, BaseBuffView value) +{ + auto * wb = reinterpret_cast(ptr); + MemoryWriteBuffer buf(0, value.len); + buf.write(value.data, value.len); + auto data_size = buf.count(); + assert(data_size == value.len); + wb->putPage(UniversalPageId(page_id.data, page_id.len), 0, buf.tryGetReadBuffer(), data_size); +// std::cout << "page " << Redact::keyToHexString(page_id.data, page_id.len) << " value " << Redact::keyToHexString(value.data, value.len) << std::endl; +} + +void WriteBatchDelPage(RawVoidPtr ptr, BaseBuffView page_id) +{ + auto * wb = reinterpret_cast(ptr); + wb->delPage(UniversalPageId(page_id.data, page_id.len)); +} + +uint64_t WriteBatchSize(RawVoidPtr ptr) +{ + auto * wb = reinterpret_cast(ptr); + return wb->getTotalDataSize(); +} + +uint8_t WriteBatchIsEmpty(RawVoidPtr ptr) +{ + auto * wb = reinterpret_cast(ptr); + return wb->empty(); +} + +void WriteBatchMerge(RawVoidPtr 
lhs, RawVoidPtr rhs) +{ + auto * lwb = reinterpret_cast(lhs); + auto * rwb = reinterpret_cast(rhs); + lwb->merge(*rwb); + // TODO: do we need clear rhs here? +} + +void WriteBatchClear(RawVoidPtr ptr) +{ + auto * wb = reinterpret_cast(ptr); + wb->clear(); +} + +void ConsumeWriteBatch(const EngineStoreServerWrap * server, RawVoidPtr ptr) +{ + try + { + auto uni_ps = server->tmt->getContext().getGlobalUniversalPageStorage(); + auto * wb = reinterpret_cast(ptr); + uni_ps->write(std::move(*wb)); + // TODO: verify that clear is allowed after std::move and the wb is reusable + wb->clear(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + +PageWithView HandleReadPage(const EngineStoreServerWrap * server, BaseBuffView page_id) +{ + try + { + auto uni_ps = server->tmt->getContext().getGlobalUniversalPageStorage(); + RaftLogReader reader(*uni_ps); + UniversalPageId id{page_id.data, page_id.len}; + auto * page = new UniversalPage(reader.read(id)); + if (page->isValid()) + { +// LOG_DEBUG(&Poco::Logger::get("ProxyFFIDebug"), "handle read page"); + return PageWithView{.inner = GenRawCppPtr(page, RawCppPtrTypeImpl::UniversalPage), .view = BaseBuffView{page->data.begin(), page->data.size()}}; + } + else + { + return PageWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{}}; + } + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + +PageWithViewVec HandleScanPage(const EngineStoreServerWrap * server, BaseBuffView start_page_id, BaseBuffView end_page_id) +{ + try + { + auto uni_ps = server->tmt->getContext().getGlobalUniversalPageStorage(); + RaftLogReader reader(*uni_ps); + UniversalPageId start_id{start_page_id.data, start_page_id.len}; + UniversalPageId end_id{end_page_id.data, end_page_id.len}; + std::vector pages; + auto checker = [&](DB::UniversalPage page) { + pages.push_back(new UniversalPage(std::move(page))); + }; + reader.traverse2(start_id, end_id, checker); + auto * data = static_cast(malloc(pages.size() * sizeof(PageWithView))); + for (size_t i = 0; i < pages.size(); i++) + { + auto * target = reinterpret_cast(data) + i; + if (pages[i]->isValid()) + { + target->inner = GenRawCppPtr(pages[i], RawCppPtrTypeImpl::UniversalPage); + BaseBuffView temp{.data = pages[i]->data.begin(), .len = pages[i]->data.size()}; + memcpy(reinterpret_cast(target) + sizeof(RawCppPtr), &temp, sizeof(BaseBuffView)); + } + else + { + target->inner = GenRawCppPtr(); + BaseBuffView temp{.data = nullptr, .len = 0}; + memcpy(reinterpret_cast(target) + sizeof(RawCppPtr), &temp, sizeof(BaseBuffView)); + } + } +// LOG_DEBUG(&Poco::Logger::get("ProxyFFIDebug"), "handle scan page {}", pages.size()); + return PageWithViewVec{.inner = reinterpret_cast(data), .len = pages.size() }; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + +void GcPageWithViewVec(PageWithView * inner, uint64_t len) +{ + for (size_t i = 0; i < len; i++) + { + GcRawCppPtr(inner[i].inner.ptr, inner[i].inner.type); + } + delete inner; +} + +void PurgePageStorage(const EngineStoreServerWrap * server) +{ + try + { + auto uni_ps = server->tmt->getContext().getGlobalUniversalPageStorage(); + uni_ps->gc(); + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + +CppStrWithView SeekPSKey(const EngineStoreServerWrap * server, BaseBuffView raw_page_id) +{ + try + { + auto uni_ps = server->tmt->getContext().getGlobalUniversalPageStorage(); + RaftLogReader reader(*uni_ps); + UniversalPageId page_id{raw_page_id.data, raw_page_id.len}; + auto page_ids = reader.getLowerBound(page_id); + if (page_ids.empty()) + { + return CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{}}; + } + else + { + auto * s = RawCppString::New(page_ids[0]); + return CppStrWithView{.inner = GenRawCppPtr(s, RawCppPtrTypeImpl::String), .view = BaseBuffView{s->data(), s->size()}}; + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + exit(-1); + } +} + static_assert(sizeof(RaftStoreProxyFFIHelper) == sizeof(TiFlashRaftProxyHelper)); static_assert(alignof(RaftStoreProxyFFIHelper) == alignof(TiFlashRaftProxyHelper)); @@ -451,6 +633,13 @@ void GcRawCppPtr(RawVoidPtr ptr, RawCppPtrType type) case RawCppPtrTypeImpl::WakerNotifier: delete reinterpret_cast(ptr); break; + case RawCppPtrTypeImpl::WriteBatch: +// LOG_DEBUG(&Poco::Logger::get("ProxyFFIDebug"), "destroy write batch"); + delete reinterpret_cast(ptr); + break; + case RawCppPtrTypeImpl::UniversalPage: + delete reinterpret_cast(ptr); + break; default: LOG_FMT_ERROR(&Poco::Logger::get(__FUNCTION__), "unknown type {}", type); exit(-1); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index fde97041873..0ae27b1e16a 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -58,6 +58,8 @@ enum class RawCppPtrTypeImpl : RawCppPtrType String, PreHandledSnapshotWithFiles, WakerNotifier, + WriteBatch, + UniversalPage, }; RawCppPtr GenRawCppPtr(RawVoidPtr ptr_ = nullptr, RawCppPtrTypeImpl type_ = RawCppPtrTypeImpl::None); @@ -127,6 +129,19 @@ EngineStoreApplyRes HandleWriteRaftCmd(const EngineStoreServerWrap * server, 
RaftCmdHeader header); uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id); uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed, uint64_t index, uint64_t term); +RawCppPtr CreateWriteBatch(); +void WriteBatchPutPage(RawVoidPtr ptr, BaseBuffView page_id, BaseBuffView value); +void WriteBatchDelPage(RawVoidPtr ptr, BaseBuffView page_id); +uint64_t WriteBatchSize(RawVoidPtr ptr); +uint8_t WriteBatchIsEmpty(RawVoidPtr ptr); +void WriteBatchMerge(RawVoidPtr lhs, RawVoidPtr rhs); +void WriteBatchClear(RawVoidPtr ptr); +void ConsumeWriteBatch(const EngineStoreServerWrap * server, RawVoidPtr ptr); +PageWithView HandleReadPage(const EngineStoreServerWrap * server, BaseBuffView page_id); +PageWithViewVec HandleScanPage(const EngineStoreServerWrap * server, BaseBuffView start_page_id, BaseBuffView end_page_id); +void GcPageWithViewVec(PageWithView * inner, uint64_t len); +void PurgePageStorage(const EngineStoreServerWrap * server); +CppStrWithView SeekPSKey(const EngineStoreServerWrap * server, BaseBuffView raw_page_id); void AtomicUpdateProxy(EngineStoreServerWrap * server, RaftStoreProxyFFIHelper * proxy); void HandleDestroy(EngineStoreServerWrap * server, uint64_t region_id); EngineStoreApplyRes HandleIngestSST(EngineStoreServerWrap * server, SSTViewVec snaps, RaftCmdHeader header); @@ -163,6 +178,19 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper( .fn_handle_admin_raft_cmd = HandleAdminRaftCmd, .fn_need_flush_data = NeedFlushData, .fn_try_flush_data = TryFlushData, + .fn_create_write_batch = CreateWriteBatch, + .fn_write_batch_put_page = WriteBatchPutPage, + .fn_write_batch_del_page = WriteBatchDelPage, + .fn_write_batch_size = WriteBatchSize, + .fn_write_batch_is_empty = WriteBatchIsEmpty, + .fn_write_batch_merge = WriteBatchMerge, + .fn_write_batch_clear = WriteBatchClear, + .fn_consume_write_batch = ConsumeWriteBatch, + .fn_handle_read_page = HandleReadPage, + .fn_handle_scan_page = 
HandleScanPage, + .fn_gc_page_with_view_vec = GcPageWithViewVec, + .fn_handle_purge_pagestorage = PurgePageStorage, + .fn_handle_seek_ps_key = SeekPSKey, .fn_atomic_update_proxy = AtomicUpdateProxy, .fn_handle_destroy = HandleDestroy, .fn_handle_ingest_sst = HandleIngestSST,