diff --git a/src/search/index_manager.cc b/src/search/index_manager.cc index c51f30d3e20..3685a8871b8 100644 --- a/src/search/index_manager.cc +++ b/src/search/index_manager.cc @@ -142,7 +142,7 @@ Status IndexManager::Create(engine::Context &ctx, std::unique_ptrns, info->name); auto cf = storage->GetCFHandle(ColumnFamilyID::Search); - auto batch = storage->GetWriteBatchBase(); + auto batch = storage->GetWriteBatchBase(ctx); std::string meta_val; info->metadata.Encode(&meta_val); @@ -244,7 +244,7 @@ Status IndexManager::Drop(engine::Context &ctx, std::string_view index_name, con SearchKey index_key(info->ns, info->name); auto cf = storage->GetCFHandle(ColumnFamilyID::Search); - auto batch = storage->GetWriteBatchBase(); + auto batch = storage->GetWriteBatchBase(ctx); auto s = batch->Delete(cf, index_key.ConstructIndexMeta()); if (!s.ok()) { diff --git a/src/search/indexer.cc b/src/search/indexer.cc index 84b5a6547b4..e4bca138e56 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -223,7 +223,7 @@ Status IndexUpdater::UpdateTagIndex(engine::Context &ctx, std::string_view key, } auto *storage = indexer->storage; - auto batch = storage->GetWriteBatchBase(); + auto batch = storage->GetWriteBatchBase(ctx); auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search); for (const auto &tag : tags_to_delete) { @@ -256,7 +256,7 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k CHECK(current.IsNull() || current.Is()); auto *storage = indexer->storage; - auto batch = storage->GetWriteBatchBase(); + auto batch = storage->GetWriteBatchBase(ctx); auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search); if (!original.IsNull()) { @@ -295,14 +295,14 @@ Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, std::string_vie auto hnsw = HnswIndex(search_key, vector, storage); if (!original.IsNull()) { - auto batch = storage->GetWriteBatchBase(); + auto batch = storage->GetWriteBatchBase(ctx); GET_OR_RET(hnsw.DeleteVectorEntry(ctx, key, batch)); auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) return {Status::NotOK, s.ToString()}; } if (!current.IsNull()) { - auto batch = storage->GetWriteBatchBase(); + auto batch = storage->GetWriteBatchBase(ctx); GET_OR_RET(hnsw.InsertVectorEntry(ctx, key, current.Get(), batch)); auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) return {Status::NotOK, s.ToString()}; diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 1ff52c8b008..3048543770c 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -128,7 +128,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui } else { EncodeFixed32(value.data() + 1, Metadata::ExpireMsToS(timestamp)); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisNone, {std::to_string(kRedisCmdExpire)}); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) { @@ -168,7 +168,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector &k ns_keys.emplace_back(std::move(ns_key)); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisNone); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) { @@ -668,7 +668,7 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con if (key == new_key) return rocksdb::Status::OK(); - auto batch = 
storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(type); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) { diff --git a/src/storage/redis_pubsub.cc b/src/storage/redis_pubsub.cc index 4bdfbeed244..eabd760e240 100644 --- a/src/storage/redis_pubsub.cc +++ b/src/storage/redis_pubsub.cc @@ -26,7 +26,7 @@ rocksdb::Status PubSub::Publish(engine::Context &ctx, const Slice &channel, cons if (storage_->GetConfig()->IsSlave()) { return rocksdb::Status::NotSupported("can't publish to db in slave mode"); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); auto s = batch->Put(pubsub_cf_handle_, channel, value); if (!s.ok()) { return s; diff --git a/src/storage/storage.cc b/src/storage/storage.cc index e59a0e602dd..c70a306e32d 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -714,13 +714,7 @@ rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOpt } if (ctx.txn_context_enabled) { - // Extract writes from the updates and append to the ctx.batch - if (ctx.batch == nullptr) { - ctx.batch = std::make_unique(); - } - WriteBatchIndexer handle(ctx); - auto s = updates->Iterate(&handle); - if (!s.ok()) return s; + CHECK(ctx.batch->GetWriteBatch() == updates); } else { CHECK(ctx.batch == nullptr); } @@ -730,7 +724,7 @@ rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOpt rocksdb::Status Storage::Delete(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::Slice &key) { - auto batch = GetWriteBatchBase(); + auto batch = GetWriteBatchBase(ctx); auto s = batch->Delete(cf_handle, key); if (!s.ok()) { return s; @@ -740,7 +734,7 @@ rocksdb::Status Storage::Delete(engine::Context &ctx, const rocksdb::WriteOption rocksdb::Status Storage::DeleteRange(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle, Slice begin, Slice end) { - auto batch = GetWriteBatchBase(); + auto batch = GetWriteBatchBase(ctx); auto s = batch->DeleteRange(cf_handle, begin, end); if (!s.ok()) { return s; @@ -760,7 +754,7 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write // didn't contain the end key. 
end_key[end_key.size() - 1] += 1; - auto batch = GetWriteBatchBase(); + auto batch = GetWriteBatchBase(ctx); auto s = batch->DeleteRange(cf_handle, begin_key, end_key); if (!s.ok()) { return s; @@ -975,10 +969,16 @@ Status Storage::CommitTxn() { return {Status::NotOK, s.ToString()}; } -ObserverOrUniquePtr<rocksdb::WriteBatchBase> Storage::GetWriteBatchBase() { +ObserverOrUniquePtr<rocksdb::WriteBatchBase> Storage::GetWriteBatchBase(Context &ctx) { if (is_txn_mode_) { return ObserverOrUniquePtr<rocksdb::WriteBatchBase>(txn_write_batch_.get(), ObserverOrUnique::Observer); } + if (ctx.txn_context_enabled) { + if (!ctx.batch) { + ctx.batch = std::make_unique<rocksdb::WriteBatchWithIndex>(); + } + return ObserverOrUniquePtr<rocksdb::WriteBatchBase>(ctx.batch.get(), ObserverOrUnique::Observer); + } return ObserverOrUniquePtr<rocksdb::WriteBatchBase>( new rocksdb::WriteBatch(0 /*reserved_bytes*/, GetWriteBatchMaxBytes()), ObserverOrUnique::Unique); } @@ -987,7 +987,7 @@ Status Storage::WriteToPropagateCF(engine::Context &ctx, const std::string &key, if (config_->IsSlave()) { return {Status::NotOK, "cannot write to propagate column family in slave mode"}; } - auto batch = GetWriteBatchBase(); + auto batch = GetWriteBatchBase(ctx); auto cf = GetCFHandle(ColumnFamilyID::Propagate); auto s = batch->Put(cf, key, value); s = Write(ctx, default_write_opts_, batch->GetWriteBatch()); diff --git a/src/storage/storage.h b/src/storage/storage.h index 9f2352d5e8e..d0b300afae5 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -298,7 +298,7 @@ class Storage { Status BeginTxn(); Status CommitTxn(); - ObserverOrUniquePtr<rocksdb::WriteBatchBase> GetWriteBatchBase(); + ObserverOrUniquePtr<rocksdb::WriteBatchBase> GetWriteBatchBase(Context &ctx); Storage(const Storage &) = delete; Storage &operator=(const Storage &) = delete; diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index 00c8d1b3fde..029dd19ac99 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -210,7 +210,7 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint auto *data_ptr = reinterpret_cast<uint8_t *>(value.data()); *old_bit = util::lsb::GetBit(data_ptr, bit_offset_in_segment); util::lsb::SetBitTo(data_ptr, bit_offset_in_segment, new_bit); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisBitmap, {std::to_string(kRedisCmdSetBit), std::to_string(bit_offset)}); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -483,7 +483,7 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st } size_t num_keys = meta_pairs.size(); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); if (max_bitmap_size == 0) { /* Compute the bit operation, if all bitmap is empty. cleanup the dest bitmap. */ auto s = batch->Delete(metadata_cf_handle_, ns_key); @@ -850,7 +850,7 @@ rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key, co if constexpr (!ReadOnly) { // Write changes into storage.
- auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); if (bitfieldWriteAheadLog(batch, ops)) { auto s = cache.BatchForFlush(batch); if (!s.ok()) { diff --git a/src/types/redis_bitmap_string.cc b/src/types/redis_bitmap_string.cc index 33202b6f52b..542bab85c00 100644 --- a/src/types/redis_bitmap_string.cc +++ b/src/types/redis_bitmap_string.cc @@ -56,7 +56,7 @@ rocksdb::Status BitmapString::SetBit(engine::Context &ctx, const Slice &ns_key, *raw_value = raw_value->substr(0, header_offset); raw_value->append(string_value); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisString); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) { @@ -260,7 +260,7 @@ rocksdb::Status BitmapString::Bitfield(engine::Context &ctx, const Slice &ns_key raw_value->resize(header_offset); raw_value->append(string_value); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisString); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) { diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc index c2c512987bf..90a3636ba6f 100644 --- a/src/types/redis_bloom_chain.cc +++ b/src/types/redis_bloom_chain.cc @@ -76,7 +76,7 @@ rocksdb::Status BloomChain::createBloomChain(engine::Context &ctx, const Slice & auto [block_split_bloom_filter, _] = CreateBlockSplitBloomFilter(metadata->bloom_bytes); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"}); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -176,7 +176,7 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user getItemHashList(items, &item_hash_list); uint64_t origin_size = metadata.size; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisBloomFilter, {"insert"}); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc index 1f31b32793d..80758426b53 100644 --- a/src/types/redis_hash.cc +++ b/src/types/redis_hash.cc @@ -93,7 +93,7 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const } *new_value = old_value + increment; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisHash); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -140,7 +140,7 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c } *new_value = n; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisHash); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -205,7 +205,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const std::string ns_key = AppendNamespacePrefix(user_key); HashMetadata metadata(false); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisHash); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -252,7 +252,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st ttl_updated = true; } int added = 0; - auto batch = storage_->GetWriteBatchBase(); + auto batch = 
storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisHash); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; diff --git a/src/types/redis_hyperloglog.cc b/src/types/redis_hyperloglog.cc index 63ee0b2b926..6410bcb5b57 100644 --- a/src/types/redis_hyperloglog.cc +++ b/src/types/redis_hyperloglog.cc @@ -118,7 +118,7 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key, return s; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisHyperLogLog); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -252,7 +252,7 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_ s = mergeUserKeys(ctx, all_user_keys, &registers); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisHyperLogLog); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc index 43b516bad4e..199239616b3 100644 --- a/src/types/redis_json.cc +++ b/src/types/redis_json.cc @@ -29,7 +29,7 @@ namespace redis { rocksdb::Status Json::write(engine::Context &ctx, Slice ns_key, JsonMetadata *metadata, const JsonValue &json_val) { - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisJson); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -94,7 +94,7 @@ rocksdb::Status Json::create(engine::Context &ctx, const std::string &ns_key, Js } rocksdb::Status Json::del(engine::Context &ctx, const Slice &ns_key) { - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisJson); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -554,7 +554,7 @@ rocksdb::Status Json::MSet(engine::Context &ctx, const std::vector ns_keys.emplace_back(std::move(ns_key)); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisJson); // A single JSON key may be modified multiple times in the MSET command, diff --git a/src/types/redis_list.cc b/src/types/redis_list.cc index acef1ed9cc6..d71d8fb6de7 100644 --- a/src/types/redis_list.cc +++ b/src/types/redis_list.cc @@ -58,7 +58,7 @@ rocksdb::Status List::push(engine::Context &ctx, const Slice &user_key, const st std::string ns_key = AppendNamespacePrefix(user_key); ListMetadata metadata; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); RedisCommand cmd = left ? kRedisCmdLPush : kRedisCmdRPush; WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)}); auto s = batch->PutLogData(log_data.Encode()); @@ -112,7 +112,7 @@ rocksdb::Status List::PopMulti(engine::Context &ctx, const rocksdb::Slice &user_ ListMetadata metadata; rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); RedisCommand cmd = left ?
kRedisCmdLPop : kRedisCmdRPop; WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)}); s = batch->PutLogData(log_data.Encode()); @@ -210,7 +210,7 @@ rocksdb::Status List::Rem(engine::Context &ctx, const Slice &user_key, int count return rocksdb::Status::NotFound(); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLRem), std::to_string(count), elem.ToString()}); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -298,7 +298,7 @@ rocksdb::Status List::Insert(engine::Context &ctx, const Slice &user_key, const return rocksdb::Status::NotFound(); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLInsert), before ? "1" : "0", pivot.ToString(), elem.ToString()}); s = batch->PutLogData(log_data.Encode()); @@ -476,7 +476,7 @@ rocksdb::Status List::Set(engine::Context &ctx, const Slice &user_key, int index } if (value == elem) return rocksdb::Status::OK(); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLSet), std::to_string(index)}); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -525,7 +525,7 @@ rocksdb::Status List::lmoveOnSingleList(engine::Context &ctx, const rocksdb::Sli return rocksdb::Status::OK(); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLMove), src.ToString(), src.ToString(), src_left ? "left" : "right", dst_left ? "left" : "right"}); s = batch->PutLogData(log_data.Encode()); @@ -576,7 +576,7 @@ rocksdb::Status List::lmoveOnTwoLists(engine::Context &ctx, const rocksdb::Slice elem->clear(); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLMove), src.ToString(), dst.ToString(), src_left ? "left" : "right", dst_left ? 
"left" : "right"}); s = batch->PutLogData(log_data.Encode()); @@ -642,7 +642,7 @@ rocksdb::Status List::Trim(engine::Context &ctx, const Slice &user_key, int star } if (start < 0) start = 0; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisList, std::vector{std::to_string(kRedisCmdLTrim), std::to_string(start), std::to_string(stop)}); s = batch->PutLogData(log_data.Encode()); diff --git a/src/types/redis_set.cc b/src/types/redis_set.cc index 07d5c782943..d5fe33733f3 100644 --- a/src/types/redis_set.cc +++ b/src/types/redis_set.cc @@ -38,7 +38,7 @@ rocksdb::Status Set::Overwrite(engine::Context &ctx, Slice user_key, const std:: std::string ns_key = AppendNamespacePrefix(user_key); SetMetadata metadata; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisSet); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -66,7 +66,7 @@ rocksdb::Status Set::Add(engine::Context &ctx, const Slice &user_key, const std: if (!s.ok() && !s.IsNotFound()) return s; std::string value; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -103,7 +103,7 @@ rocksdb::Status Set::Remove(engine::Context &ctx, const Slice &user_key, const s if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; std::string value; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -219,7 +219,7 @@ rocksdb::Status Set::Take(engine::Context &ctx, const Slice &user_key, std::vect rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; - ObserverOrUniquePtr batch = storage_->GetWriteBatchBase(); + ObserverOrUniquePtr batch = storage_->GetWriteBatchBase(ctx); if (pop) { WriteBatchLogData log_data(kRedisSet); s = batch->PutLogData(log_data.Encode()); diff --git a/src/types/redis_sortedint.cc b/src/types/redis_sortedint.cc index f29ca7de616..4ada41f46a3 100644 --- a/src/types/redis_sortedint.cc +++ b/src/types/redis_sortedint.cc @@ -43,7 +43,7 @@ rocksdb::Status Sortedint::Add(engine::Context &ctx, const Slice &user_key, cons if (!s.ok() && !s.IsNotFound()) return s; std::string value; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisSortedint); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -79,7 +79,7 @@ rocksdb::Status Sortedint::Remove(engine::Context &ctx, const Slice &user_key, c if (!s.ok()) return s.IsNotFound() ? 
rocksdb::Status::OK() : s; std::string value; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisSortedint); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index df9284c2db5..7f3031c87b9 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -108,7 +108,7 @@ rocksdb::Status Stream::Add(engine::Context &ctx, const Slice &stream_name, cons auto status = options.next_id_strategy->GenerateID(metadata.last_generated_id, &next_entry_id); if (!status.IsOK()) return rocksdb::Status::InvalidArgument(status.Msg()); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -345,7 +345,7 @@ rocksdb::Status Stream::DeletePelEntries(engine::Context &ctx, const Slice &stre return s.IsNotFound() ? rocksdb::Status::OK() : s; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -431,7 +431,7 @@ rocksdb::Status Stream::ClaimPelEntries(engine::Context &ctx, const Slice &strea consumer_metadata.last_attempted_interaction_ms = now; consumer_metadata.last_successful_interaction_ms = now; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -574,7 +574,7 @@ rocksdb::Status Stream::AutoClaim(engine::Context &ctx, const Slice &stream_name std::vector deleted_entries; std::vector pending_entries; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -712,7 +712,7 @@ rocksdb::Status Stream::CreateGroup(engine::Context &ctx, const Slice &stream_na std::string entry_key = internalKeyFromGroupName(ns_key, metadata, group_name); std::string entry_value = encodeStreamConsumerGroupMetadataValue(consumer_group_metadata); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -751,7 +751,7 @@ rocksdb::Status Stream::DestroyGroup(engine::Context &ctx, const Slice &stream_n return rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -824,7 +824,7 @@ rocksdb::Status Stream::createConsumerWithoutLock(engine::Context &ctx, const Sl return s; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -878,7 +878,7 @@ rocksdb::Status Stream::DestroyConsumer(engine::Context &ctx, const Slice &strea StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value); deleted_pel = consumer_metadata.pending_number; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); 
WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -945,7 +945,7 @@ rocksdb::Status Stream::GroupSetId(engine::Context &ctx, const Slice &stream_nam consumer_group_metadata.entries_read = options.entries_read; std::string entry_value = encodeStreamConsumerGroupMetadataValue(consumer_group_metadata); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -966,7 +966,7 @@ rocksdb::Status Stream::DeleteEntries(engine::Context &ctx, const Slice &stream_ return s.IsNotFound() ? rocksdb::Status::OK() : s; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -1472,7 +1472,7 @@ rocksdb::Status Stream::RangeWithPending(engine::Context &ctx, const Slice &stre s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key, &get_group_value); } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -1580,7 +1580,7 @@ rocksdb::Status Stream::Trim(engine::Context &ctx, const Slice &stream_name, con return s.IsNotFound() ? rocksdb::Status::OK() : s; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -1721,7 +1721,7 @@ rocksdb::Status Stream::SetId(engine::Context &ctx, const Slice &stream_name, co metadata.max_deleted_entry_id = *max_deleted_id; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisStream, {"XSETID"}); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc index a3d27492e0f..30878803063 100644 --- a/src/types/redis_string.cc +++ b/src/types/redis_string.cc @@ -106,7 +106,7 @@ std::vector String::getValues(engine::Context &ctx, const std:: } rocksdb::Status String::updateRawValue(engine::Context &ctx, const std::string &ns_key, const std::string &raw_value) { - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisString); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -170,7 +170,7 @@ rocksdb::Status String::GetEx(engine::Context &ctx, const std::string &user_key, } metadata.Encode(&raw_data); raw_data.append(value->data(), value->size()); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisString); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -394,7 +394,7 @@ rocksdb::Status String::IncrByFloat(engine::Context &ctx, const std::string &use } rocksdb::Status String::MSet(engine::Context &ctx, const std::vector &pairs, uint64_t expire_ms) { - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisString); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc index a5a278b18f9..8c5a4e09f36 100644 --- 
a/src/types/redis_tdigest.cc +++ b/src/types/redis_tdigest.cc @@ -130,7 +130,7 @@ rocksdb::Status TDigest::Create(engine::Context& ctx, const Slice& digest_name, return status; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisTDigest); if (status = batch->PutLogData(log_data.Encode()); !status.ok()) { return status; @@ -153,7 +153,7 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, con return status; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisTDigest); if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) { return status; @@ -197,7 +197,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name } if (metadata.unmerged_nodes > 0) { - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisTDigest); if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) { return status; @@ -246,7 +246,7 @@ rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name) { return status; } - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisTDigest); if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) { return status; diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc index bdc0736429f..195cf7158a0 100644 --- a/src/types/redis_zset.cc +++ b/src/types/redis_zset.cc @@ -48,7 +48,7 @@ rocksdb::Status ZSet::Add(engine::Context &ctx, const Slice &user_key, ZAddFlags int added = 0; int changed = 0; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisZSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -172,7 +172,7 @@ rocksdb::Status ZSet::Pop(engine::Context &ctx, const Slice &user_key, int count std::string next_version_prefix_key = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisZSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -252,7 +252,7 @@ rocksdb::Status ZSet::RangeByRank(engine::Context &ctx, const Slice &user_key, c rocksdb::Slice lower_bound(prefix_key); read_options.iterate_lower_bound = &lower_bound; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisZSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -362,7 +362,7 @@ rocksdb::Status ZSet::RangeByScore(engine::Context &ctx, const Slice &user_key, int pos = 0; auto iter = util::UniqueIterator(ctx, read_options, score_cf_handle_); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisZSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -444,7 +444,7 @@ rocksdb::Status ZSet::RangeByLex(engine::Context &ctx, const Slice &user_key, co int pos = 0; auto iter = util::UniqueIterator(ctx, read_options); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisZSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -523,7 +523,7 @@ rocksdb::Status 
ZSet::Remove(engine::Context &ctx, const Slice &user_key, const rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisZSet); s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; @@ -614,7 +614,7 @@ rocksdb::Status ZSet::Overwrite(engine::Context &ctx, const Slice &user_key, con std::string ns_key = AppendNamespacePrefix(user_key); ZSetMetadata metadata; - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); WriteBatchLogData log_data(kRedisZSet); auto s = batch->PutLogData(log_data.Encode()); if (!s.ok()) return s; diff --git a/tests/cppunit/hnsw_index_test.cc b/tests/cppunit/hnsw_index_test.cc index 022f2a73880..0ba59f8d15d 100644 --- a/tests/cppunit/hnsw_index_test.cc +++ b/tests/cppunit/hnsw_index_test.cc @@ -42,7 +42,7 @@ auto GetVectorKeys(const std::vector& keys_by_dist) -> s void InsertEntryIntoHnswIndex(engine::Context& ctx, std::string_view key, const kqir::NumericArray& vector, uint16_t target_level, redis::HnswIndex* hnsw_index, engine::Storage* storage) { - auto batch = storage->GetWriteBatchBase(); + auto batch = storage->GetWriteBatchBase(ctx); auto s = hnsw_index->InsertVectorEntryInternal(ctx, key, vector, batch, target_level); ASSERT_TRUE(s.IsOK()); auto status = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); @@ -185,7 +185,7 @@ TEST_F(HnswIndexTest, DecodeNodesToVectorItems) { redis::HnswNodeFieldMetadata metadata2(0, {4, 5, 6}); redis::HnswNodeFieldMetadata metadata3(0, {7, 8, 9}); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); auto s = node1.PutMetadata(&metadata1, hnsw_index->search_key, hnsw_index->storage, batch.Get()); ASSERT_TRUE(s.IsOK()); s = node2.PutMetadata(&metadata2, hnsw_index->search_key, hnsw_index->storage, batch.Get()); @@ -293,7 +293,7 @@ TEST_F(HnswIndexTest, SearchLayer) { redis::HnswNodeFieldMetadata metadata5(0, {6.0, 6.0, 7.0}); // Add Nodes - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); auto put_meta_data_status = node1.PutMetadata(&metadata1, hnsw_index->search_key, hnsw_index->storage, batch.Get()); ASSERT_TRUE(put_meta_data_status.IsOK()); put_meta_data_status = node2.PutMetadata(&metadata2, hnsw_index->search_key, hnsw_index->storage, batch.Get()); @@ -309,7 +309,7 @@ TEST_F(HnswIndexTest, SearchLayer) { ASSERT_TRUE(s.ok()); // Add Neighbours - batch = storage_->GetWriteBatchBase(); + batch = storage_->GetWriteBatchBase(ctx); auto s1 = node1.AddNeighbour(ctx, "node2", hnsw_index->search_key, batch.Get()); ASSERT_TRUE(s1.IsOK()); auto s2 = node1.AddNeighbour(ctx, "node4", hnsw_index->search_key, batch.Get()); @@ -472,7 +472,7 @@ TEST_F(HnswIndexTest, InsertAndDeleteVectorEntry) { VerifyNodeMetadataAndNeighbours(ctx, &node5_layer0, hnsw_index.get(), {"n1", "n2", "n3", "n4"}); // Delete n2 - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); auto s2 = hnsw_index->DeleteVectorEntry(ctx, key2, batch); ASSERT_TRUE(s2.IsOK()); s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); diff --git a/tests/cppunit/hnsw_node_test.cc b/tests/cppunit/hnsw_node_test.cc index 46874166387..c1cb15a841e 100644 --- a/tests/cppunit/hnsw_node_test.cc +++ b/tests/cppunit/hnsw_node_test.cc @@ -52,7 +52,7 @@ TEST_F(NodeTest, 
PutAndDecodeMetadata) { redis::HnswNodeFieldMetadata metadata2(0, {4, 5, 6}); redis::HnswNodeFieldMetadata metadata3(0, {7, 8, 9}); - auto batch = storage_->GetWriteBatchBase(); + auto batch = storage_->GetWriteBatchBase(ctx); auto s = node1.PutMetadata(&metadata1, search_key, storage_.get(), batch.Get()); ASSERT_TRUE(s.IsOK()); s = node2.PutMetadata(&metadata2, search_key, storage_.get(), batch.Get()); @@ -79,7 +79,7 @@ TEST_F(NodeTest, PutAndDecodeMetadata) { ASSERT_EQ(decoded_metadata3.GetValue().vector, std::vector({7, 8, 9})); // Prepare edges between node1 and node2 - batch = storage_->GetWriteBatchBase(); + batch = storage_->GetWriteBatchBase(ctx); auto edge1 = search_key.ConstructHnswEdge(layer, "node1", "node2"); auto edge2 = search_key.ConstructHnswEdge(layer, "node2", "node1"); auto edge3 = search_key.ConstructHnswEdge(layer, "node2", "node3"); @@ -124,7 +124,7 @@ TEST_F(NodeTest, ModifyNeighbours) { redis::HnswNodeFieldMetadata metadata4(0, {10, 11, 12}); // Add Nodes - auto batch1 = storage_->GetWriteBatchBase(); + auto batch1 = storage_->GetWriteBatchBase(ctx); auto put_meta_data = node1.PutMetadata(&metadata1, search_key, storage_.get(), batch1.Get()); ASSERT_TRUE(put_meta_data.IsOK()); put_meta_data = node2.PutMetadata(&metadata2, search_key, storage_.get(), batch1.Get()); @@ -138,7 +138,7 @@ TEST_F(NodeTest, ModifyNeighbours) { ASSERT_TRUE(s.ok()); // Add Edges - auto batch2 = storage_->GetWriteBatchBase(); + auto batch2 = storage_->GetWriteBatchBase(ctx); auto s1 = node1.AddNeighbour(ctx, "node2", search_key, batch2.Get()); ASSERT_TRUE(s1.IsOK()); auto s2 = node2.AddNeighbour(ctx, "node1", search_key, batch2.Get()); @@ -165,7 +165,7 @@ TEST_F(NodeTest, ModifyNeighbours) { EXPECT_EQ(node3.neighbours[0], "node2"); // Remove Edges - auto batch3 = storage_->GetWriteBatchBase(); + auto batch3 = storage_->GetWriteBatchBase(ctx); auto s5 = node2.RemoveNeighbour(ctx, "node3", search_key, batch3.Get()); ASSERT_TRUE(s5.IsOK());
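For reference, a minimal sketch (not part of the patch) of the call pattern the updated call sites follow: obtain the batch from the context-aware GetWriteBatchBase(ctx), stage the write, then flush it through Write(). When ctx.txn_context_enabled is set, GetWriteBatchBase(ctx) hands back an observer of ctx.batch, so writeToDB() only has to assert that the batch being written is ctx.batch's underlying WriteBatch. The function name PutOneKey and its parameters are hypothetical; the Storage calls mirror those visible in the hunks above.

    // Hypothetical caller, sketching the post-change write pattern.
    rocksdb::Status PutOneKey(engine::Context &ctx, engine::Storage *storage,
                              rocksdb::ColumnFamilyHandle *cf_handle,
                              const rocksdb::Slice &key, const rocksdb::Slice &value) {
      // Returns an observer of ctx.batch when the transactional context is enabled,
      // otherwise a fresh rocksdb::WriteBatch owned by the returned handle.
      auto batch = storage->GetWriteBatchBase(ctx);
      auto s = batch->Put(cf_handle, key, value);
      if (!s.ok()) return s;
      // In the transactional path, writeToDB() checks ctx.batch->GetWriteBatch() == updates.
      return storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
    }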