From b215db36af73f33dcc5c53e66f176100fc7da6d7 Mon Sep 17 00:00:00 2001 From: Irina Skvortsova Date: Thu, 4 Sep 2025 10:01:05 +0000 Subject: [PATCH 1/3] tried to create test --- ydb/core/persqueue/ut/pqtablet_ut.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 1fcfcc39c47b..af113d662f9f 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -2605,6 +2605,22 @@ Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_TEvDeletePartitionDone_ UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie2, ownerCookie); } +Y_UNIT_TEST_F(Kafka_Transaction_Several_Partitions_One_Tablet, TPQTabletFixture) { + // NKafka::TProducerInstanceId producerInstanceId = {1, 0}; + const ui64 txId = 67890; + PQTabletPrepare({.partitions=2}, {}, *Ctx); + EnsurePipeExist(); + + SendProposeTransactionRequest({.TxId=txId, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + +} + Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_Is_In_DELETED_State_Should_Be_Processed_After_Previous_Complete_Erasure, TPQTabletFixture) { NKafka::TProducerInstanceId producerInstanceId = {1, 0}; const ui64 txId = 67890; From d9ac8322a9d51b718dd1ebdd569b64311a155b75 Mon Sep 17 00:00:00 2001 From: Irina Skvortsova Date: Fri, 12 Sep 2025 15:35:34 +0000 Subject: [PATCH 2/3] add new field to TTxWriteInfo --- ydb/core/persqueue/pqtablet/pq_impl.cpp | 4 +- ydb/core/persqueue/pqtablet/pq_impl.h | 1 + ydb/core/persqueue/ut/pqtablet_ut.cpp | 99 +++++++++++++++++++------ 3 files changed, 78 insertions(+), 26 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index 1a851be4b1ca..3c0b4a9bd1b3 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -2756,8 +2756,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, const TWriteId writeId = GetWriteId(req); ui32 originalPartitionId = req.GetPartition(); - - if (writeId.KafkaApiTransaction && TxWrites.contains(writeId) && !TxWrites.at(writeId).Partitions.contains(originalPartitionId)) { + if (writeId.KafkaApiTransaction && TxWrites.contains(writeId) && !TxWrites.at(writeId).Partitions.contains(originalPartitionId) && TxWrites.at(writeId).PartitionsProcessed.contains(originalPartitionId)) { // This branch happens when previous Kafka transaction has committed and we recieve write for next one // after PQ has deleted supportive partition and before it has deleted writeId from TxWrites (tx has not transaitioned to DELETED state) PQ_LOG_D("GetOwnership request for the next Kafka transaction while previous is being deleted. Saving it till the complete delete of the previous tx.%01"); @@ -2832,6 +2831,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, PQ_LOG_TX_I("partition " << partitionId << " for WriteId " << writeId); writeInfo.Partitions.emplace(originalPartitionId, partitionId); + writeInfo.PartitionsProcessed.emplace(originalPartitionId); TxWritesChanged = true; AddSupportivePartition(partitionId); diff --git a/ydb/core/persqueue/pqtablet/pq_impl.h b/ydb/core/persqueue/pqtablet/pq_impl.h index 027b75684b62..e91c2c2b46fd 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.h +++ b/ydb/core/persqueue/pqtablet/pq_impl.h @@ -211,6 +211,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { bool Deleting = false; bool KafkaTransaction = false; TInstant CreatedAt; + TSet PartitionsProcessed; }; THashMap TxWrites; diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 69f10d76c5d8..a408f5810ad9 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -259,9 +259,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture { void WaitWriteResponse(const TWriteResponseMatcher& matcher); // returns owner cookie for this supportive partition - TString CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId); - void SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie); - void CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId); + TString CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId, ui32 partitionId = 0); + void SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie, const ui32& partitionId = 0); + void CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId, const std::vector& partitionIds = {0}); std::unique_ptr MakeGetOwnershipRequest(const TGetOwnershipRequestParams& params, const TActorId& pipe) const; @@ -835,10 +835,11 @@ void TPQTabletFixture::SendWriteRequest(const TWriteRequestParams& params) event.Release()); } -TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId) { +TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId, + ui32 partitionId) { EnsurePipeExist(); - auto request = MakeGetOwnershipRequest({.Partition=0, + auto request = MakeGetOwnershipRequest({.Partition=partitionId, .WriteId=TWriteId{producerInstanceId}, .NeedSupportivePartition=true, .Owner=DEFAULT_OWNER, @@ -851,11 +852,11 @@ TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProdu return WaitGetOwnershipResponse({.Cookie=4, .Status=NMsgBusProxy::MSTATUS_OK}); } -void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie) { +void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie, const ui32& partitionId) { auto event = MakeHolder(); auto* request = event->Record.MutablePartitionRequest(); request->SetTopic("/topic"); - request->SetPartition(0); + request->SetPartition(partitionId); request->SetCookie(123); request->SetOwnerCookie(ownerCookie); request->SetMessageNo(0); @@ -889,14 +890,23 @@ void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceI UNIT_ASSERT_VALUES_EQUAL(123, response->Record.GetPartitionResponse().GetCookie()); } -void TPQTabletFixture::CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId) { - SendProposeTransactionRequest({.TxId=txId, - .Senders={Ctx->TabletId}, .Receivers={Ctx->TabletId}, - .TxOps={ - {.Partition=0, .Path="/topic", .KafkaTransaction=true}, - }, - .WriteId=TWriteId(producerInstanceId) - }); +void TPQTabletFixture::CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId, const std::vector& partitionIds) { + TProposeTransactionParams params; + params.TxId = txId; + params.Senders = {Ctx->TabletId}; + params.Receivers = {Ctx->TabletId}; + params.WriteId = TWriteId(producerInstanceId); + for (const ui32& partitionId : partitionIds) { + params.TxOps.push_back({.Partition=partitionId, .Path="/topic", .KafkaTransaction=true}); + } + // SendProposeTransactionRequest({.TxId=txId, + // .Senders={Ctx->TabletId}, .Receivers={Ctx->TabletId}, + // .TxOps={ + // {.Partition=0, .Path="/topic", .KafkaTransaction=true}, + // }, + // .WriteId=TWriteId(producerInstanceId) + // }); + SendProposeTransactionRequest(params); WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); SendPlanStep({.Step=100, .TxIds={txId}}); @@ -2606,19 +2616,60 @@ Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_TEvDeletePartitionDone_ } Y_UNIT_TEST_F(Kafka_Transaction_Several_Partitions_One_Tablet, TPQTabletFixture) { - // NKafka::TProducerInstanceId producerInstanceId = {1, 0}; + NKafka::TProducerInstanceId producerInstanceId = {1, 0}; const ui64 txId = 67890; PQTabletPrepare({.partitions=2}, {}, *Ctx); EnsurePipeExist(); - SendProposeTransactionRequest({.TxId=txId, - .TxOps={ - {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, - {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, - }}); - WaitProposeTransactionResponse({.TxId=txId, - .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); - + TString ownerCookie1 = CreateSupportivePartitionForKafka(producerInstanceId, 0); + TString ownerCookie2 = CreateSupportivePartitionForKafka(producerInstanceId, 1); + + UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie1, ownerCookie2); + + SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie1, 0); + SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie2, 1); + + const NKikimrPQ::TTabletTxInfo& txInfo = WaitForExactTxWritesCount(2); + ui32 firstSupportivePartitionId = txInfo.GetTxWrites(0).GetInternalPartitionId(); + ui32 secondSupportivePartitionId = txInfo.GetTxWrites(1).GetInternalPartitionId(); + Cerr << "Sup1Id: " << firstSupportivePartitionId << ", Sup2Id: " << secondSupportivePartitionId << Endl; + + TAutoPtr deleteDoneEvent; + bool seenEvent = false; + // add observer for TEvPQ::TEvDeletePartitionDone request and skip it + AddOneTimeEventObserver(seenEvent, [&deleteDoneEvent](TAutoPtr& eventHandle) { + deleteDoneEvent = eventHandle->Release(); + return TTestActorRuntimeBase::EEventAction::DROP; + }); + + CommitKafkaTransaction(producerInstanceId, txId, {0, 1}); + + // wait for delete response and save it + TDispatchOptions options; + options.CustomFinalCondition = [&seenEvent]() {return seenEvent;}; + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); + + // send another GetOwnership request to enforce new suportive partition creation (it imitates new transaction start for same proudcer epoch) + SendGetOwnershipRequest({.Partition=0, + .WriteId=TWriteId{producerInstanceId}, + .NeedSupportivePartition=true, + .Owner=DEFAULT_OWNER, + .Cookie=5}); + // now we can eventually send TEvPQ::TEvDeletePartitionDone + Ctx->Runtime->SendToPipe(Pipe, + Ctx->Edge, + deleteDoneEvent.Release(), + 0, 0); + WaitForTheTransactionToBeDeleted(txId); + + auto txInfo1 = GetTxWritesFromKV(); + UNIT_ASSERT_EQUAL(txInfo1.TxWritesSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(txInfo1.GetTxWrites(0).GetWriteId().GetKafkaProducerInstanceId().GetId(), producerInstanceId.Id); + UNIT_ASSERT_VALUES_UNEQUAL(txInfo1.GetTxWrites(0).GetInternalPartitionId(), firstSupportivePartitionId); + TString ownerCookie3 = WaitGetOwnershipResponse({.Cookie=5, .Status=NMsgBusProxy::MSTATUS_OK}); + UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie1, ownerCookie3); + UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie2, ownerCookie3); + } Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_Is_In_DELETED_State_Should_Be_Processed_After_Previous_Complete_Erasure, TPQTabletFixture) { From f2bd0bcd23a46070939f44c12e145bd490622460 Mon Sep 17 00:00:00 2001 From: Irina Skvortsova Date: Fri, 12 Sep 2025 15:37:01 +0000 Subject: [PATCH 3/3] remove comments --- ydb/core/persqueue/ut/pqtablet_ut.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index a408f5810ad9..41d9dde4ed51 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -899,13 +899,6 @@ void TPQTabletFixture::CommitKafkaTransaction(NKafka::TProducerInstanceId produc for (const ui32& partitionId : partitionIds) { params.TxOps.push_back({.Partition=partitionId, .Path="/topic", .KafkaTransaction=true}); } - // SendProposeTransactionRequest({.TxId=txId, - // .Senders={Ctx->TabletId}, .Receivers={Ctx->TabletId}, - // .TxOps={ - // {.Partition=0, .Path="/topic", .KafkaTransaction=true}, - // }, - // .WriteId=TWriteId(producerInstanceId) - // }); SendProposeTransactionRequest(params); WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});