Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/persqueue/pqtablet/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2756,8 +2756,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,

const TWriteId writeId = GetWriteId(req);
ui32 originalPartitionId = req.GetPartition();

if (writeId.KafkaApiTransaction && TxWrites.contains(writeId) && !TxWrites.at(writeId).Partitions.contains(originalPartitionId)) {
if (writeId.KafkaApiTransaction && TxWrites.contains(writeId) && !TxWrites.at(writeId).Partitions.contains(originalPartitionId) && TxWrites.at(writeId).PartitionsProcessed.contains(originalPartitionId)) {
// This branch happens when previous Kafka transaction has committed and we recieve write for next one
// after PQ has deleted supportive partition and before it has deleted writeId from TxWrites (tx has not transaitioned to DELETED state)
PQ_LOG_D("GetOwnership request for the next Kafka transaction while previous is being deleted. Saving it till the complete delete of the previous tx.%01");
Expand Down Expand Up @@ -2832,6 +2831,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
PQ_LOG_TX_I("partition " << partitionId << " for WriteId " << writeId);

writeInfo.Partitions.emplace(originalPartitionId, partitionId);
writeInfo.PartitionsProcessed.emplace(originalPartitionId);
TxWritesChanged = true;
AddSupportivePartition(partitionId);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/pqtablet/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
bool Deleting = false;
bool KafkaTransaction = false;
TInstant CreatedAt;
TSet<ui32> PartitionsProcessed;
};

THashMap<TWriteId, TTxWriteInfo> TxWrites;
Expand Down
90 changes: 75 additions & 15 deletions ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
void WaitWriteResponse(const TWriteResponseMatcher& matcher);

// returns owner cookie for this supportive partition
TString CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId);
void SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie);
void CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId);
TString CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId, ui32 partitionId = 0);
void SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie, const ui32& partitionId = 0);
void CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId, const std::vector<ui32>& partitionIds = {0});

std::unique_ptr<TEvPersQueue::TEvRequest> MakeGetOwnershipRequest(const TGetOwnershipRequestParams& params,
const TActorId& pipe) const;
Expand Down Expand Up @@ -835,10 +835,11 @@ void TPQTabletFixture::SendWriteRequest(const TWriteRequestParams& params)
event.Release());
}

TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId) {
TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProducerInstanceId& producerInstanceId,
ui32 partitionId) {
EnsurePipeExist();

auto request = MakeGetOwnershipRequest({.Partition=0,
auto request = MakeGetOwnershipRequest({.Partition=partitionId,
.WriteId=TWriteId{producerInstanceId},
.NeedSupportivePartition=true,
.Owner=DEFAULT_OWNER,
Expand All @@ -851,11 +852,11 @@ TString TPQTabletFixture::CreateSupportivePartitionForKafka(const NKafka::TProdu
return WaitGetOwnershipResponse({.Cookie=4, .Status=NMsgBusProxy::MSTATUS_OK});
}

void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie) {
void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceId& producerInstanceId, const TString& ownerCookie, const ui32& partitionId) {
auto event = MakeHolder<TEvPersQueue::TEvRequest>();
auto* request = event->Record.MutablePartitionRequest();
request->SetTopic("/topic");
request->SetPartition(0);
request->SetPartition(partitionId);
request->SetCookie(123);
request->SetOwnerCookie(ownerCookie);
request->SetMessageNo(0);
Expand Down Expand Up @@ -889,14 +890,16 @@ void TPQTabletFixture::SendKafkaTxnWriteRequest(const NKafka::TProducerInstanceI
UNIT_ASSERT_VALUES_EQUAL(123, response->Record.GetPartitionResponse().GetCookie());
}

void TPQTabletFixture::CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId) {
SendProposeTransactionRequest({.TxId=txId,
.Senders={Ctx->TabletId}, .Receivers={Ctx->TabletId},
.TxOps={
{.Partition=0, .Path="/topic", .KafkaTransaction=true},
},
.WriteId=TWriteId(producerInstanceId)
});
void TPQTabletFixture::CommitKafkaTransaction(NKafka::TProducerInstanceId producerInstanceId, ui64 txId, const std::vector<ui32>& partitionIds) {
TProposeTransactionParams params;
params.TxId = txId;
params.Senders = {Ctx->TabletId};
params.Receivers = {Ctx->TabletId};
params.WriteId = TWriteId(producerInstanceId);
for (const ui32& partitionId : partitionIds) {
params.TxOps.push_back({.Partition=partitionId, .Path="/topic", .KafkaTransaction=true});
}
SendProposeTransactionRequest(params);
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
SendPlanStep({.Step=100, .TxIds={txId}});
Expand Down Expand Up @@ -2605,6 +2608,63 @@ Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_TEvDeletePartitionDone_
UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie2, ownerCookie);
}

Y_UNIT_TEST_F(Kafka_Transaction_Several_Partitions_One_Tablet, TPQTabletFixture) {
NKafka::TProducerInstanceId producerInstanceId = {1, 0};
const ui64 txId = 67890;
PQTabletPrepare({.partitions=2}, {}, *Ctx);
EnsurePipeExist();

TString ownerCookie1 = CreateSupportivePartitionForKafka(producerInstanceId, 0);
TString ownerCookie2 = CreateSupportivePartitionForKafka(producerInstanceId, 1);

UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie1, ownerCookie2);

SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie1, 0);
SendKafkaTxnWriteRequest(producerInstanceId, ownerCookie2, 1);

const NKikimrPQ::TTabletTxInfo& txInfo = WaitForExactTxWritesCount(2);
ui32 firstSupportivePartitionId = txInfo.GetTxWrites(0).GetInternalPartitionId();
ui32 secondSupportivePartitionId = txInfo.GetTxWrites(1).GetInternalPartitionId();
Cerr << "Sup1Id: " << firstSupportivePartitionId << ", Sup2Id: " << secondSupportivePartitionId << Endl;

TAutoPtr<TEvPQ::TEvDeletePartitionDone> deleteDoneEvent;
bool seenEvent = false;
// add observer for TEvPQ::TEvDeletePartitionDone request and skip it
AddOneTimeEventObserver<TEvPQ::TEvDeletePartitionDone>(seenEvent, [&deleteDoneEvent](TAutoPtr<IEventHandle>& eventHandle) {
deleteDoneEvent = eventHandle->Release<TEvPQ::TEvDeletePartitionDone>();
return TTestActorRuntimeBase::EEventAction::DROP;
});

CommitKafkaTransaction(producerInstanceId, txId, {0, 1});

// wait for delete response and save it
TDispatchOptions options;
options.CustomFinalCondition = [&seenEvent]() {return seenEvent;};
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));

// send another GetOwnership request to enforce new suportive partition creation (it imitates new transaction start for same proudcer epoch)
SendGetOwnershipRequest({.Partition=0,
.WriteId=TWriteId{producerInstanceId},
.NeedSupportivePartition=true,
.Owner=DEFAULT_OWNER,
.Cookie=5});
// now we can eventually send TEvPQ::TEvDeletePartitionDone
Ctx->Runtime->SendToPipe(Pipe,
Ctx->Edge,
deleteDoneEvent.Release(),
0, 0);
WaitForTheTransactionToBeDeleted(txId);

auto txInfo1 = GetTxWritesFromKV();
UNIT_ASSERT_EQUAL(txInfo1.TxWritesSize(), 1);
UNIT_ASSERT_VALUES_EQUAL(txInfo1.GetTxWrites(0).GetWriteId().GetKafkaProducerInstanceId().GetId(), producerInstanceId.Id);
UNIT_ASSERT_VALUES_UNEQUAL(txInfo1.GetTxWrites(0).GetInternalPartitionId(), firstSupportivePartitionId);
TString ownerCookie3 = WaitGetOwnershipResponse({.Cookie=5, .Status=NMsgBusProxy::MSTATUS_OK});
UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie1, ownerCookie3);
UNIT_ASSERT_VALUES_UNEQUAL(ownerCookie2, ownerCookie3);

}

Y_UNIT_TEST_F(Kafka_Transaction_Incoming_Before_Previous_Is_In_DELETED_State_Should_Be_Processed_After_Previous_Complete_Erasure, TPQTabletFixture) {
NKafka::TProducerInstanceId producerInstanceId = {1, 0};
const ui64 txId = 67890;
Expand Down
Loading