Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
0a6b83b
tests: Remove embed test funcs from PageEntriesEdit (#6074)
JaySon-Huang Sep 30, 2022
c484a4e
PageStorage: Make deserialize out of WALReader (#6070)
JaySon-Huang Oct 4, 2022
3e2addf
Fix that empty namespace won't be clean (#6108)
JaySon-Huang Oct 12, 2022
65eca8c
PageStorage: Refactor some file structure (#5828)
JaySon-Huang Sep 20, 2022
3030743
PageStorage: Refactor config (#6138)
JaySon-Huang Oct 14, 2022
f837392
Support using string as PageId
JaySon-Huang Sep 7, 2022
913be72
Add UniversalWriteBatch::merge
JaySon-Huang Oct 12, 2022
34a9c96
Add example for read/write RaftLog
JaySon-Huang Oct 13, 2022
6842fe9
Make PageWorkload work for uni
JaySon-Huang Oct 20, 2022
af946d7
alpha version
lidezhu Oct 20, 2022
54b7c73
update proxy
lidezhu Oct 21, 2022
59a20c6
uncomment
lidezhu Oct 21, 2022
9c9bdc3
Merge branch 'uni_ps_6.3' into raft_ps_6.3
lidezhu Oct 21, 2022
826733e
avoid check raft engine path
lidezhu Oct 21, 2022
493851a
fix isEmpty
lidezhu Oct 21, 2022
3d716ac
fix put page
lidezhu Oct 22, 2022
4036baa
Hackathon: avoid unnecessary copy
lidezhu Oct 22, 2022
c5ea4f2
try add gc for UniversalPageStorage
lidezhu Oct 31, 2022
024ef9e
fix build
lidezhu Oct 31, 2022
2bd2417
add some log for debug
lidezhu Oct 31, 2022
07bd32a
decrease gc interval
lidezhu Oct 31, 2022
308cf7c
fix gc
JaySon-Huang Oct 26, 2022
1c58345
try fix space allocation
lidezhu Nov 2, 2022
a8f07d6
try fix space allocation again
lidezhu Nov 2, 2022
a643cdd
add some metric
lidezhu Nov 2, 2022
4d5446b
fix metric
lidezhu Nov 2, 2022
fb2fe26
add more metric
lidezhu Nov 2, 2022
1ad8bc4
fix metric
lidezhu Nov 2, 2022
0561c23
add more metric
lidezhu Nov 2, 2022
142afe9
add more metric
lidezhu Nov 3, 2022
86feaa8
try lock less
lidezhu Nov 7, 2022
0d4e5f6
fix build
lidezhu Nov 7, 2022
b59a9f8
increase gc time
lidezhu Nov 7, 2022
e4509c5
fix crash
lidezhu Nov 7, 2022
effeb07
try reduce lock contention
lidezhu Nov 8, 2022
8045678
try increase gc interval
lidezhu Nov 8, 2022
81031a6
try improve gc logic
lidezhu Nov 9, 2022
53febb7
decrease gc interval to 10s
lidezhu Nov 9, 2022
e95765c
try remove unnecessary delete
lidezhu Nov 9, 2022
86eb27e
add more metric
lidezhu Nov 9, 2022
d7bf188
try use unique_lock
lidezhu Nov 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ namespace DB
F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \
F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \
M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \
F(type_total, {{"type", "total"}}, ExpBuckets{0.0001, 2, 20}), \
F(type_blob, {{"type", "blob"}}, ExpBuckets{0.0001, 2, 20}), \
/* the bucket range for apply in memory is 50us ~ 120s */ \
F(type_latch, {{"type", "latch"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_wal, {{"type", "wal"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_commit, {{"type", "commmit"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_edit, {{"type", "edit"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_get_stat, {{"type", "get_stat"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_get_stat_latch, {{"type", "get_stat_latch"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_blob_write, {{"type", "blob_write"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_choose_stat, {{"type", "choose_stat"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_lock_stat, {{"type", "lock_stat"}}, ExpBuckets{0.00005, 1.8, 26}), \
F(type_get_pos_from_stat, {{"type", "get_pos_from_stat"}}, ExpBuckets{0.00005, 1.8, 26})) \
M(tiflash_storage_logical_throughput_bytes, "The logical throughput of read tasks of storage in bytes", Histogram, \
F(type_read, {{"type", "read"}}, EqualWidthBuckets{1 * 1024 * 1024, 60, 50 * 1024 * 1024})) \
M(tiflash_storage_io_limiter, "Storage I/O limiter metrics", Counter, F(type_fg_read_req_bytes, {"type", "fg_read_req_bytes"}), \
Expand Down Expand Up @@ -237,7 +251,7 @@ namespace DB
struct ExpBuckets
{
const double start;
const int base;
const double base;
const size_t size;
inline operator prometheus::Histogram::BucketBoundaries() const &&
{
Expand Down
51 changes: 51 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/universal/UniversalPageStorage.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/BackgroundService.h>
Expand All @@ -65,6 +66,8 @@
#include <fiu.h>
#include <fmt/core.h>



#include <boost/functional/hash/hash.hpp>
#include <pcg_random.hpp>
#include <set>
Expand Down Expand Up @@ -106,6 +109,35 @@ namespace FailPoints
extern const char force_context_path[];
} // namespace FailPoints

struct UniversalPageStorageWrapper
{
UniversalPageStoragePtr uni_page_storage;
BackgroundProcessingPool::TaskHandle gc_handle;
std::atomic<Timepoint> last_try_gc_time = Clock::now();

void restore(Context & global_context)
{
uni_page_storage->restore();
gc_handle = global_context.getBackgroundPool().addTask(
[this, global_context] {
return this->gc();
},
false);
}

bool gc()
{
Timepoint now = Clock::now();
const std::chrono::seconds try_gc_period(10);
if (now < (last_try_gc_time.load() + try_gc_period))
return false;

last_try_gc_time = now;
return this->uni_page_storage->gc();
}
};
using UniversalPageStorageWrapperPtr = std::shared_ptr<UniversalPageStorageWrapper>;


/** Set of known objects (environment), that could be used in query.
* Shared (global) part. Order of members (especially, order of destruction) is very important.
Expand Down Expand Up @@ -159,6 +191,7 @@ struct ContextShared
IORateLimiter io_rate_limiter;
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalStoragePoolPtr global_storage_pool;
UniversalPageStorageWrapperPtr uni_page_storage_wrapper;
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

class SessionKeyHash
Expand Down Expand Up @@ -1615,6 +1648,24 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
return shared->global_storage_pool;
}

void Context::initializeGlobalUniversalPageStorage(const PathPool & path_pool, const FileProviderPtr & file_provider)
{
auto lock = getLock();
if (shared->uni_page_storage_wrapper)
throw Exception("UniversalPageStorage has already been initialized.", ErrorCodes::LOGICAL_ERROR);

shared->uni_page_storage_wrapper = std::make_shared<UniversalPageStorageWrapper>();
shared->uni_page_storage_wrapper->uni_page_storage = UniversalPageStorage::create("global", path_pool.getPSDiskDelegatorGlobalMulti("global"), {}, file_provider);
shared->uni_page_storage_wrapper->restore(*this);
LOG_INFO(shared->log, "initialized GlobalUniversalPageStorage");
}

UniversalPageStoragePtr Context::getGlobalUniversalPageStorage() const
{
auto lock = getLock();
return shared->uni_page_storage_wrapper->uni_page_storage;
}

UInt16 Context::getTCPPort() const
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ using Dependencies = std::vector<DatabaseAndTableName>;
using TableAndCreateAST = std::pair<StoragePtr, ASTPtr>;
using TableAndCreateASTs = std::map<String, TableAndCreateAST>;

class UniversalPageStorage;
using UniversalPageStoragePtr = std::shared_ptr<UniversalPageStorage>;

/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
Expand Down Expand Up @@ -421,6 +424,9 @@ class Context
bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

void initializeGlobalUniversalPageStorage(const PathPool & path_pool, const FileProviderPtr & file_provider);
UniversalPageStoragePtr getGlobalUniversalPageStorage() const;

/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));

///
global_context->initializeGlobalUniversalPageStorage(global_context->getPathPool(), global_context->getFileProvider());

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);

Expand Down
54 changes: 24 additions & 30 deletions dbms/src/Storages/Page/V3/Blob/BlobStat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include "ext/scope_guard.h"

namespace ProfileEvents
{
Expand Down Expand Up @@ -157,13 +158,20 @@ BlobStats::BlobStatPtr BlobStats::createStatNotChecking(BlobFileId blob_file_id,
/// If creating a new BlobFile, we need to register the BlobFile's path to delegator, so it's necessary to call `addPageFileUsedSize` here.
delegator->addPageFileUsedSize({blob_file_id, 0}, 0, path, true);
stats_map[path].emplace_back(stat);
if (stats_map_next_index.find(path) == stats_map_next_index.end())
{
stats_map_next_index[path] = 0;
}
return stat;
}

void BlobStats::eraseStat(const BlobStatPtr && stat, const std::lock_guard<std::mutex> &)
{
PageFileIdAndLevel id_lvl{stat->id, 0};
stats_map[delegator->getPageFilePath(id_lvl)].remove(stat);
auto & stats = stats_map[delegator->getPageFilePath(id_lvl)];
auto iter = std::find(stats.begin(), stats.end(), stat);
assert(iter != stats.end());
stats.erase(iter);
}

void BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard<std::mutex> & lock)
Expand Down Expand Up @@ -196,9 +204,6 @@ void BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard<std::mu

std::pair<BlobStats::BlobStatPtr, BlobFileId> BlobStats::chooseStat(size_t buf_size, const std::lock_guard<std::mutex> &)
{
BlobStatPtr stat_ptr = nullptr;
double smallest_valid_rate = 2;

// No stats exist
if (stats_map.empty())
{
Expand All @@ -213,27 +218,25 @@ std::pair<BlobStats::BlobStatPtr, BlobFileId> BlobStats::chooseStat(size_t buf_s
std::advance(stats_iter, stats_map_path_index);

size_t path_iter_idx = 0;
SCOPE_EXIT({
// advance the `stats_map_path_idx` without size checking
stats_map_path_index += path_iter_idx + 1;
});
for (path_iter_idx = 0; path_iter_idx < stats_map.size(); ++path_iter_idx)
{
// Try to find a suitable stat under current path (path=`stats_iter->first`)
for (const auto & stat : stats_iter->second)
for (size_t i = 0; i < stats_iter->second.size(); i++)
{
auto lock = stat->lock(); // TODO: will it bring performance regression?
if (stat->isNormal()
&& stat->sm_max_caps >= buf_size
&& stat->sm_valid_rate < smallest_valid_rate)
stats_map_next_index[stats_iter->first] = stats_map_next_index[stats_iter->first] % stats_iter->second.size();
const auto & stat = (stats_iter->second)[stats_map_next_index[stats_iter->first]];
stats_map_next_index[stats_iter->first] += 1;
auto lock = stat->defer_lock(); // TODO: will it bring performance regression?
if (lock.try_lock() && stat->isNormal() && stat->sm_max_caps >= buf_size)
{
smallest_valid_rate = stat->sm_valid_rate;
stat_ptr = stat;
return std::make_pair(stat, INVALID_BLOBFILE_ID);
}
}

// Already find the available stat under current path.
if (stat_ptr != nullptr)
{
break;
}

// Try to find stat in the next path.
stats_iter++;
if (stats_iter == stats_map.end())
Expand All @@ -242,16 +245,7 @@ std::pair<BlobStats::BlobStatPtr, BlobFileId> BlobStats::chooseStat(size_t buf_s
}
}

// advance the `stats_map_path_idx` without size checking
stats_map_path_index += path_iter_idx + 1;

// Can not find a suitable stat under all paths
if (stat_ptr == nullptr)
{
return std::make_pair(nullptr, roll_id);
}

return std::make_pair(stat_ptr, INVALID_BLOBFILE_ID);
return std::make_pair(nullptr, roll_id);
}

BlobStats::BlobStatPtr BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_not_exist)
Expand Down Expand Up @@ -283,14 +277,14 @@ BlobStats::BlobStatPtr BlobStats::blobIdToStat(BlobFileId file_id, bool ignore_n
* BlobStat methods *
********************/

BlobFileOffset BlobStats::BlobStat::getPosFromStat(size_t buf_size, const std::lock_guard<std::mutex> &)
BlobFileOffset BlobStats::BlobStat::getPosFromStat(size_t buf_size, const std::unique_lock<std::mutex> &)
{
BlobFileOffset offset = 0;
UInt64 max_cap = 0;
bool expansion = true;

std::tie(offset, max_cap, expansion) = smap->searchInsertOffset(buf_size);
ProfileEvents::increment(expansion ? ProfileEvents::PSV3MBlobExpansion : ProfileEvents::PSV3MBlobReused);
// ProfileEvents::increment(expansion ? ProfileEvents::PSV3MBlobExpansion : ProfileEvents::PSV3MBlobReused);

/**
* Whatever `searchInsertOffset` success or failed,
Expand Down Expand Up @@ -321,7 +315,7 @@ BlobFileOffset BlobStats::BlobStat::getPosFromStat(size_t buf_size, const std::l
return offset;
}

size_t BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &)
size_t BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::unique_lock<std::mutex> &)
{
if (!smap->markFree(offset, buf_size))
{
Expand Down
20 changes: 14 additions & 6 deletions dbms/src/Storages/Page/V3/Blob/BlobStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class BlobStats

struct BlobStat
{
String parent_path;

const BlobFileId id;
std::atomic<BlobStatType> type;

Expand All @@ -79,9 +81,14 @@ class BlobStats
, sm_max_caps(sm_max_caps_)
{}

[[nodiscard]] std::lock_guard<std::mutex> lock()
[[nodiscard]] std::unique_lock<std::mutex> lock()
{
return std::unique_lock(sm_lock);
}

[[nodiscard]] std::unique_lock<std::mutex> defer_lock()
{
return std::lock_guard(sm_lock);
return std::unique_lock(sm_lock, std::defer_lock);
}

bool isNormal() const
Expand All @@ -99,12 +106,12 @@ class BlobStats
type.store(BlobStatType::READ_ONLY);
}

BlobFileOffset getPosFromStat(size_t buf_size, const std::lock_guard<std::mutex> &);
BlobFileOffset getPosFromStat(size_t buf_size, const std::unique_lock<std::mutex> &);

/**
* The return value is the valid data size remained in the BlobFile after the remove
*/
size_t removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::lock_guard<std::mutex> &);
size_t removePosFromStat(BlobFileOffset offset, size_t buf_size, const std::unique_lock<std::mutex> &);

/**
* This method is only used when blobstore restore
Expand Down Expand Up @@ -168,7 +175,7 @@ class BlobStats

BlobStatPtr blobIdToStat(BlobFileId file_id, bool ignore_not_exist = false);

using StatsMap = std::map<String, std::list<BlobStatPtr>>;
using StatsMap = std::map<String, std::vector<BlobStatPtr>>;
StatsMap getStats() const
{
auto guard = lock();
Expand Down Expand Up @@ -196,7 +203,8 @@ class BlobStats
BlobFileId roll_id = 1;
// Index for selecting next path for creating new blobfile
UInt32 stats_map_path_index = 0;
std::map<String, std::list<BlobStatPtr>> stats_map;
std::map<String, std::vector<BlobStatPtr>> stats_map;
std::map<String, UInt32> stats_map_next_index;
};

} // namespace DB::PS::V3
Loading