diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index 1a851be4b1ca..c67d4ee819d3 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -2756,12 +2756,11 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, const TWriteId writeId = GetWriteId(req); ui32 originalPartitionId = req.GetPartition(); - - if (writeId.KafkaApiTransaction && TxWrites.contains(writeId) && !TxWrites.at(writeId).Partitions.contains(originalPartitionId)) { + if (writeId.IsKafkaApiTransaction() && TxWrites.contains(writeId) && TxWrites.at(writeId).Deleting) { // This branch happens when previous Kafka transaction has committed and we recieve write for next one // after PQ has deleted supportive partition and before it has deleted writeId from TxWrites (tx has not transaitioned to DELETED state) PQ_LOG_D("GetOwnership request for the next Kafka transaction while previous is being deleted. Saving it till the complete delete of the previous tx.%01"); - KafkaNextTransactionRequests[writeId.KafkaProducerInstanceId] = event; + KafkaNextTransactionRequests[writeId.KafkaProducerInstanceId].push_back(event); return; } else if (TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId)) { // @@ -2782,7 +2781,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, // This branch happens when previous Kafka transaction has committed and we recieve write for next one // before PQ has deleted supportive partition for previous transaction PQ_LOG_D("GetOwnership request for the next Kafka transaction while previous is being deleted. 
Saving it till the complete delete of the previous tx.%02"); - KafkaNextTransactionRequests[writeId.KafkaProducerInstanceId] = event; + KafkaNextTransactionRequests[writeId.KafkaProducerInstanceId].push_back(event); return; } @@ -3327,7 +3326,9 @@ void TPersQueue::TryContinueKafkaWrites(const TMaybe writeId, const TA if (writeId.Defined() && writeId->IsKafkaApiTransaction()) { auto it = KafkaNextTransactionRequests.find(writeId->KafkaProducerInstanceId); if (it != KafkaNextTransactionRequests.end()) { - Handle(it->second, ctx); + for (auto& request : it->second) { + Handle(request, ctx); + } KafkaNextTransactionRequests.erase(it); } } diff --git a/ydb/core/persqueue/pqtablet/pq_impl.h b/ydb/core/persqueue/pqtablet/pq_impl.h index 027b75684b62..3f3603006879 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.h +++ b/ydb/core/persqueue/pqtablet/pq_impl.h @@ -302,7 +302,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { But we know for sure that all writes coming after the commit of the kafka transaction refer to the next transaction. That's why we queue them here till previous transaction is completely deleted (all supportive partitions are deleted and writeId is erased from TxWrites). 
*/ - THashMap KafkaNextTransactionRequests; + THashMap, NKafka::TProducerInstanceIdHashFn> KafkaNextTransactionRequests; // PLANNED -> CALCULATING -> CALCULATED -> WAIT_RS -> EXECUTING -> EXECUTED THashMap> TxsOrder; diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 3712da1269de..0e583583c212 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -259,9 +259,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture { void WaitWriteResponse(const TWriteResponseMatcher& matcher); // returns owner cookie for this supportive partition - TString CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId); - void SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie); - void CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId); + TString CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId, const ui32 partitionId = 0); + void SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie, const ui32 partitionId = 0); + void CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId, const std::vector& partitionIds = {0}); std::unique_ptr MakeGetOwnershipRequest(const TGetOwnershipRequestParams& params, const TActorId& pipe) const; @@ -290,7 +290,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture { void TestSendingTEvReadSetViaApp(const TSendReadSetViaAppTestParams& params); template - void AddOneTimeEventObserver(bool& seenEvent, std::function&)> callback = [](){return TTestActorRuntimeBase::EEventAction::PROCESS;}); + void AddOneTimeEventObserver(bool& seenEvent, + ui32 unseenEventCount, + std::function&)> callback = [](){return TTestActorRuntimeBase::EEventAction::PROCESS;}); void ExpectNoExclusiveLockAcquired(); void ExpectNoReadQuotaAcquired(); @@ -835,10 
+837,11 @@ void TPQTabletFixture::SendWriteRequest(const TWriteRequestParams& params) event.Release()); } -TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId) { +TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId, + const ui32 partitionId) { EnsurePipeExist(); - auto request = MakeGetOwnershipRequest({.Partition=0, + auto request = MakeGetOwnershipRequest({.Partition=partitionId, .WriteId=TWriteId{producerInstanceId}, .NeedSupportivePartition=true, .Owner=DEFAULT_OWNER, @@ -851,11 +854,11 @@ TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProdu return WaitGetOwnershipResponse({.Cookie=4, .Status=NMsgBusProxy::MSTATUS_OK}); } -void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie) { +void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie, const ui32 partitionId) { auto event = MakeHolder(); auto* request = event->Record.MutablePartitionRequest(); request->SetTopic("/topic"); - request->SetPartition(0); + request->SetPartition(partitionId); request->SetCookie(123); request->SetOwnerCookie(ownerCookie); request->SetMessageNo(0); @@ -889,14 +892,16 @@ void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceI UNIT_ASSERT_VALUES_EQUAL(123, response->Record.GetPartitionResponse().GetCookie()); } -void TPQTabletFixture::CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId) { - SendProposeTransactionRequest({.TxId=txId, - .Senders={Ctx->TabletId}, .Receivers={Ctx->TabletId}, - .TxOps={ - {.Partition=0, .Path="/topic", .KafkaTransaction=true}, - }, - .WriteId=TWriteId(producerInstanceId) - }); +void TPQTabletFixture::CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId, const std::vector& 
partitionIds) { + TProposeTransactionParams params; + params.TxId = txId; + params.Senders = {Ctx->TabletId}; + params.Receivers = {Ctx->TabletId}; + params.WriteId = TWriteId(producerInstanceId); + for (const ui32& partitionId : partitionIds) { + params.TxOps.push_back({.Partition=partitionId, .Path="/topic", .KafkaTransaction=true}); + } + SendProposeTransactionRequest(params); WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); SendPlanStep({.Step=100, .TxIds={txId}}); @@ -1419,10 +1424,13 @@ void TPQTabletFixture::WaitForAppSendRsResponse(const TAppSendReadSetMatcher& ma } template -void TPQTabletFixture::AddOneTimeEventObserver(bool& seenEvent, std::function&)> callback) { - auto observer = [&](TAutoPtr& input) { +void TPQTabletFixture::AddOneTimeEventObserver(bool& seenEvent, ui32 unseenEventCount, std::function&)> callback) { + auto observer = [&seenEvent, unseenEventCount, callback](TAutoPtr& input) mutable { if (!seenEvent && input->CastAsLocal()) { - seenEvent = true; + unseenEventCount--; + if (unseenEventCount == 0) { + seenEvent = true; + } return callback(input); } @@ -2571,8 +2579,9 @@ Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_TEvDeletePartitionDone_ TAutoPtr deleteDoneEvent; bool seenEvent = false; + ui32 unseenEventCount = 1; // add observer for TEvPQ::TEvDeletePartitionDone request and skip it - AddOneTimeEventObserver(seenEvent, [&deleteDoneEvent](TAutoPtr& eventHandle) { + AddOneTimeEventObserver(seenEvent, unseenEventCount, [&deleteDoneEvent](TAutoPtr& eventHandle) { deleteDoneEvent = eventHandle->Release(); return TTestActorRuntimeBase::EEventAction::DROP; }); @@ -2597,6 +2606,7 @@ Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_TEvDeletePartitionDone_ 0, 0); WaitForTheTransactionToBeDeleted(txId); + // check that information about a transaction with this WriteId has been renewed on disk auto txInfo = GetTxWritesFromKV(); UNIT_ASSERT_EQUAL(txInfo.TxWritesSize(), 1); 
UNIT_ASSERT_VALUES_EQUAL(txInfo.GetTxWrites(0).GetWriteId().GetKafkaProducerInstanceId().GetId(), producerInstanceId.Id); @@ -2605,6 +2615,85 @@ Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_TEvDeletePartitionDone_ UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie2, ownerCookie); } +Y_UNIT_TEST_F(Kafka_Transaction_Several_Partitions_One_Tablet_Deleting_State, TPQTabletFixture) { + NKafka::TProducerInstanceId producerInstanceId = {1, 0}; + const ui64 txId = 67890; + PQTabletPrepare({.partitions=2}, {}, *Ctx); + EnsurePipeExist(); + + TString ownerCookie1 = CreateSupportivePartitionForKafka(producerInstanceId, 0); + TString ownerCookie2 = CreateSupportivePartitionForKafka(producerInstanceId, 1); + + UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie1, ownerCookie2); + + SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie1, 0); + SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie2, 1); + + const NKikimrPQ::TTabletTxInfo& txInfo1 = WaitForExactTxWritesCount(2); + ui32 firstSupportivePartitionId = txInfo1.GetTxWrites(0).GetInternalPartitionId(); + ui32 secondSupportivePartitionId = txInfo1.GetTxWrites(1).GetInternalPartitionId(); + + std::vector> deleteDoneEvents; + bool seenEvent = false; + // add observer for TEvPQ::TEvDeletePartitionDone requests and skip it + AddOneTimeEventObserver(seenEvent, 2, [&deleteDoneEvents](TAutoPtr& eventHandle) { + deleteDoneEvents.push_back(eventHandle->Release()); + return TTestActorRuntimeBase::EEventAction::DROP; + }); + + CommitKafkaTransaction(producerInstanceId, txId, {0, 1}); + + // wait for delete responses and save them + TDispatchOptions options; + options.CustomFinalCondition = [&seenEvent]() {return seenEvent;}; + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); + + // send another GetOwnership request to enforce new supportive partition creation (it imitates new transaction start for same producer epoch) + SendGetOwnershipRequest({.Partition=0, + .WriteId=TWriteId{producerInstanceId}, + .NeedSupportivePartition=true,
.Owner=DEFAULT_OWNER, + .Cookie=5}); + // now we can eventually send TEvPQ::TEvDeletePartitionDone responses + for (size_t i = 0; i < deleteDoneEvents.size(); i++) { + Ctx->Runtime->SendToPipe(Pipe, + Ctx->Edge, + deleteDoneEvents[i].Release(), + 0, i); + } + + WaitForTheTransactionToBeDeleted(txId); + + // check that information about a transaction with this WriteId has been renewed on disk + auto txInfo2 = GetTxWritesFromKV(); + UNIT_ASSERT_EQUAL(txInfo2.TxWritesSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(txInfo2.GetTxWrites(0).GetWriteId().GetKafkaProducerInstanceId().GetId(), producerInstanceId.Id); + UNIT_ASSERT_UNEQUAL(txInfo2.GetTxWrites(0).GetInternalPartitionId(), firstSupportivePartitionId); + UNIT_ASSERT_UNEQUAL(txInfo2.GetTxWrites(0).GetInternalPartitionId(), secondSupportivePartitionId); + + TString ownerCookie3 = WaitGetOwnershipResponse({.Cookie=5, .Status=NMsgBusProxy::MSTATUS_OK}); + UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie1, ownerCookie3); + UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie2, ownerCookie3); +} + +Y_UNIT_TEST_F(Kafka_Transaction_Several_Partitions_One_Tablet_Successful_Commit, TPQTabletFixture) { + NKafka::TProducerInstanceId producerInstanceId = {1, 0}; + const ui64 txId = 67890; + PQTabletPrepare({.partitions=2}, {}, *Ctx); + EnsurePipeExist(); + + TString ownerCookie1 = CreateSupportivePartitionForKafka(producerInstanceId, 0); + TString ownerCookie2 = CreateSupportivePartitionForKafka(producerInstanceId, 1); + + UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie1, ownerCookie2); + + SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie1, 0); + SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie2, 1); + + const NKikimrPQ::TTabletTxInfo& txInfo = WaitForExactTxWritesCount(2); + CommitKafkaTransaction(producerInstanceId, txId, {0, 1}); +} + Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_Is_In_DELETED_State_Should_Be_Processed_After_Previous_Complete_Erasure, TPQTabletFixture) { NKafka::TProducerInstanceId producerInstanceId = {1, 0}; const 
ui64 txId = 67890;