Skip to content

Commit b1de0ba

Browse files
authored
Use enshure in core/persqueue (#24858)
1 parent e540dea commit b1de0ba

38 files changed

+649
-612
lines changed

ydb/core/persqueue/pqrb/mirror_describer.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include <google/protobuf/util/message_differencer.h>
66

7+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("topic", TopicName)
8+
79
using namespace NPersQueue;
810

911
namespace NKikimr {
@@ -80,7 +82,7 @@ void TMirrorDescriber::DescribeTopic(const TActorContext& ctx) {
8082
}
8183

8284
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
83-
Y_ABORT_UNLESS(factory);
85+
PQ_ENSURE(factory);
8486
auto future = factory->GetTopicDescription(Config, CredentialsProvider);
8587
future.Subscribe(
8688
[
@@ -114,7 +116,7 @@ void TMirrorDescriber::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*
114116
CredentialsProvider = nullptr;
115117

116118
auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
117-
Y_ABORT_UNLESS(factory);
119+
PQ_ENSURE(factory);
118120
auto future = factory->GetCredentialsProvider(Config.GetCredentials());
119121
future.Subscribe(
120122
[
@@ -174,6 +176,24 @@ TString TMirrorDescriber::GetCurrentState() const {
174176
return "UNKNOWN";
175177
}
176178

179+
bool TMirrorDescriber::OnUnhandledException(const std::exception& exc) {
180+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::PQ_MIRROR_DESCRIBER,
181+
LogDescription() << "unhandled exception " << TypeName(exc) << ": " << exc.what() << Endl
182+
<< TBackTrace::FromCurrentException().PrintToString());
183+
184+
Send(ReadBalancerActorId, new TEvents::TEvPoison());
185+
PassAway();
186+
return true;
187+
}
188+
189+
NActors::IActor* CreateMirrorDescriber(
190+
const NActors::TActorId& readBalancerActorId,
191+
const TString& topicName,
192+
const NKikimrPQ::TMirrorPartitionConfig& config
193+
) {
194+
return new TMirrorDescriber(readBalancerActorId, topicName, config);
195+
}
196+
177197

178198
}// NPQ
179199
}// NKikimr

ydb/core/persqueue/pqrb/mirror_describer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
namespace NKikimr::NPQ {
1515

1616

17-
class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber> {
17+
class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber>
18+
, public IActorExceptionHandler {
1819
private:
1920
static constexpr TDuration INIT_INTERVAL_MAX = TDuration::Seconds(240);
2021
static constexpr TDuration INIT_INTERVAL_START = TDuration::Seconds(1);
@@ -76,7 +77,7 @@ class TMirrorDescriber : public TActorBootstrapped<TMirrorDescriber> {
7677
void HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx);
7778
void HandleWakeup(const TActorContext& ctx);
7879
void HandleDescriptionResult(TEvPQ::TEvMirrorTopicDescription::TPtr& ev, const TActorContext& ctx);
79-
80+
bool OnUnhandledException(const std::exception&) override;
8081
private:
8182
TActorId ReadBalancerActorId;
8283
TString TopicName;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <ydb/core/persqueue/public/config.h>
4+
#include <ydb/library/actors/core/actorsystem_fwd.h>
5+
6+
namespace NKikimr::NPQ {
7+
8+
NActors::IActor* CreateMirrorDescriber(
9+
const NActors::TActorId& readBalancerActorId,
10+
const TString& topicName,
11+
const NKikimrPQ::TMirrorPartitionConfig& config
12+
);
13+
14+
}

ydb/core/persqueue/pqrb/read_balancer.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#include "read_balancer__txpreinit.h"
44
#include "read_balancer__txwrite.h"
55
#include "read_balancer_log.h"
6-
#include "mirror_describer.h"
6+
#include "mirror_describer_factory.h"
77

88
#include <ydb/core/persqueue/events/internal.h>
99
#include <ydb/core/protos/counters_pq.pb.h>
@@ -13,6 +13,8 @@
1313
#include <library/cpp/string_utils/base64/base64.h>
1414
#include <library/cpp/random_provider/random_provider.h>
1515

16+
#define PQ_ENSURE(condition) AFL_ENSURE(condition)("tablet_id", TabletID())("path", Path)("topic", Topic)
17+
1618
namespace NKikimr {
1719
namespace NPQ {
1820

@@ -289,7 +291,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
289291
if (MirrorTopicDescriberActorId) {
290292
ctx.Send(MirrorTopicDescriberActorId, new TEvPQ::TEvChangePartitionConfig(nullptr, TabletConfig));
291293
} else {
292-
MirrorTopicDescriberActorId = ctx.Register(new TMirrorDescriber(SelfId(), Topic, TabletConfig.GetPartitionConfig().GetMirrorFrom()));
294+
MirrorTopicDescriberActorId = ctx.Register(CreateMirrorDescriber(SelfId(), Topic, TabletConfig.GetPartitionConfig().GetMirrorFrom()));
293295
}
294296
} else {
295297
if (MirrorTopicDescriberActorId) {
@@ -320,7 +322,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
320322
for (auto& p : record.GetPartitions()) {
321323
auto it = PartitionsInfo.find(p.GetPartition());
322324
if (it == PartitionsInfo.end()) {
323-
Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
325+
PQ_ENSURE(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0);
324326

325327
partitionsInfo[p.GetPartition()] = {p.GetTabletId()};
326328

@@ -422,7 +424,7 @@ TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActor
422424
pipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
423425
TabletPipes[tabletId].PipeActor = pipeClient;
424426
auto res = PipesRequested.insert(tabletId);
425-
Y_ABORT_UNLESS(res.second);
427+
PQ_ENSURE(res.second);
426428
} else {
427429
pipeClient = it->second.PipeActor;
428430
}

ydb/core/persqueue/pqrb/read_balancer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class TMetricsTimeKeeper {
5454
};
5555

5656

57-
class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTabletExecutedFlat {
57+
class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>,
58+
public TTabletExecutedFlat {
5859
struct TTxPreInit;
5960
struct TTxInit;
6061
struct TTxWrite;

ydb/core/persqueue/pqrb/read_balancer__txinit.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
3333
return false;
3434

3535
while (!dataRowset.EndOfSet()) { //found out topic info
36-
Y_ABORT_UNLESS(!Self->Inited);
36+
AFL_ENSURE(!Self->Inited)("tablet_id", Self->TabletID());
3737
Self->PathId = dataRowset.GetValue<Schema::Data::PathId>();
3838
Self->Topic = dataRowset.GetValue<Schema::Data::Topic>();
3939
Self->Path = dataRowset.GetValue<Schema::Data::Path>();
@@ -50,7 +50,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
5050
TString config = dataRowset.GetValueOrDefault<Schema::Data::Config>("");
5151
if (!config.empty()) {
5252
bool res = Self->TabletConfig.ParseFromString(config);
53-
Y_ABORT_UNLESS(res);
53+
AFL_ENSURE(res)("tablet_id", Self->TabletID())("path", Self->Path)("topic", Self->Topic);
5454

5555
Migrate(Self->TabletConfig);
5656
Self->Consumers.clear();

ydb/core/persqueue/pqrb/read_balancer__txwrite.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ struct TPersQueueReadBalancer::TTxWrite : public ITransaction {
4848
NIceDb::TNiceDb db(txc.DB);
4949
TString config;
5050
bool res = Self->TabletConfig.SerializeToString(&config);
51-
Y_ABORT_UNLESS(res);
51+
AFL_ENSURE(res)("tablet_id", Self->TabletID())("path", Self->Path)("topic", Self->Topic);
5252
db.Table<Schema::Data>().Key(1).Update(
5353
NIceDb::TUpdate<Schema::Data::PathId>(Self->PathId),
5454
NIceDb::TUpdate<Schema::Data::Topic>(Self->Topic),

ydb/core/persqueue/pqtablet/blob/blob.cpp

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <util/string/builder.h>
55
#include <util/string/escape.h>
66
#include <util/system/unaligned_mem.h>
7+
#include <ydb/library/actors/core/log.h>
78

89
namespace NKikimr {
910
namespace NPQ {
@@ -123,7 +124,7 @@ TBatch::TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
123124
}
124125

125126
TBatch TBatch::FromBlobs(const ui64 offset, std::deque<TClientBlob>&& blobs) {
126-
Y_ABORT_UNLESS(!blobs.empty());
127+
AFL_ENSURE(!blobs.empty());
127128
TBatch batch(offset, blobs.front().GetPartNo());
128129
for (auto& b : blobs) {
129130
batch.AddBlob(b);
@@ -183,12 +184,12 @@ TInstant TBatch::GetEndWriteTimestamp() const {
183184
}
184185

185186
ui32 TBatch::GetPackedSize() const {
186-
Y_ABORT_UNLESS(Packed);
187+
AFL_ENSURE(Packed);
187188
return sizeof(ui16) + PackedData.size() + Header.ByteSize();
188189
}
189190

190191
ui32 TBatch::FindPos(const ui64 offset, const ui16 partNo) const {
191-
Y_ABORT_UNLESS(!Packed);
192+
AFL_ENSURE(!Packed);
192193
if (offset < GetOffset() || offset == GetOffset() && partNo < GetPartNo())
193194
return Max<ui32>();
194195
if (offset == GetOffset()) {
@@ -257,20 +258,20 @@ const TBatch& THead::GetBatch(ui32 idx) const {
257258
}
258259

259260
const TBatch& THead::GetLastBatch() const {
260-
Y_ABORT_UNLESS(!Batches.empty());
261+
AFL_ENSURE(!Batches.empty());
261262
return Batches.back();
262263
}
263264

264265
TBatch THead::ExtractFirstBatch() {
265-
Y_ABORT_UNLESS(!Batches.empty());
266+
AFL_ENSURE(!Batches.empty());
266267
auto batch = std::move(Batches.front());
267268
InternalPartsCount -= batch.GetInternalPartsCount();
268269
Batches.pop_front();
269270
return batch;
270271
}
271272

272273
void THead::AddBlob(const TClientBlob& blob) {
273-
Y_ABORT_UNLESS(!Batches.empty());
274+
AFL_ENSURE(!Batches.empty());
274275
auto& batch = Batches.back();
275276
InternalPartsCount -= batch.GetInternalPartsCount();
276277
batch.AddBlob(blob);
@@ -299,9 +300,7 @@ ui32 THead::GetCount() const
299300
return 0;
300301

301302
//how much offsets before last batch and how much offsets in last batch
302-
Y_ABORT_UNLESS(Batches.front().GetOffset() == Offset,
303-
"front.Offset=%" PRIu64 ", offset=%" PRIu64,
304-
Batches.front().GetOffset(), Offset);
303+
AFL_ENSURE(Batches.front().GetOffset() == Offset)("front.Offset", Batches.front().GetOffset())("offset", Offset);
305304

306305
return Batches.back().GetOffset() - Offset + Batches.back().GetCount();
307306
}
@@ -312,12 +311,12 @@ ui32 THead::GetCount() const
312311
//
313312

314313
THead::TBatchAccessor THead::MutableBatch(ui32 idx) {
315-
Y_ABORT_UNLESS(idx < Batches.size());
314+
AFL_ENSURE(idx < Batches.size());
316315
return TBatchAccessor(Batches[idx]);
317316
}
318317

319318
THead::TBatchAccessor THead::MutableLastBatch() {
320-
Y_ABORT_UNLESS(!Batches.empty());
319+
AFL_ENSURE(!Batches.empty());
321320
return TBatchAccessor(Batches.back());
322321
}
323322

@@ -437,7 +436,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off
437436
, MaxBlobSize(maxBlobSize)
438437
, FastWrite(fastWrite)
439438
{
440-
Y_ABORT_UNLESS(NewHead.Offset == Head.GetNextOffset() && NewHead.PartNo == 0 || headCleared || needCompactHead || Head.PackedSize == 0); // if head not cleared, then NewHead is going after Head
439+
AFL_ENSURE(NewHead.Offset == Head.GetNextOffset() && NewHead.PartNo == 0 || headCleared || needCompactHead || Head.PackedSize == 0); // if head not cleared, then NewHead is going after Head
441440
if (!headCleared) {
442441
HeadSize = Head.PackedSize + NewHead.PackedSize;
443442
InternalPartsCount = Head.GetInternalPartsCount() + NewHead.GetInternalPartsCount();
@@ -452,7 +451,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off
452451
if (HeadSize == 0) {
453452
StartOffset = offset;
454453
NewHead.Offset = offset;
455-
//Y_ABORT_UNLESS(StartPartNo == 0);
454+
//AFL_ENSURE(StartPartNo == 0);
456455
}
457456
}
458457

@@ -462,7 +461,7 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe
462461
valueD.reserve(estimatedSize);
463462
if (glueHead) {
464463
for (ui32 pp = 0; pp < head.Batches.size(); ++pp) {
465-
Y_ABORT_UNLESS(head.Batches[pp].Packed);
464+
AFL_ENSURE(head.Batches[pp].Packed);
466465
head.Batches[pp].SerializeTo(valueD);
467466
}
468467
}
@@ -471,12 +470,12 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe
471470
TBatch *b = &newHead.Batches[pp];
472471
TBatch batch;
473472
if (!b->Packed) {
474-
Y_ABORT_UNLESS(pp + 1 == newHead.Batches.size());
473+
AFL_ENSURE(pp + 1 == newHead.Batches.size());
475474
batch = newHead.Batches[pp];
476475
batch.Pack();
477476
b = &batch;
478477
}
479-
Y_ABORT_UNLESS(b->Packed);
478+
AFL_ENSURE(b->Packed);
480479
b->SerializeTo(valueD);
481480
}
482481
}
@@ -488,9 +487,9 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio
488487
HeadPartNo = NextPartNo;
489488
ui32 count = (GlueHead ? Head.GetCount() : 0) + (GlueNewHead ? NewHead.GetCount() : 0);
490489

491-
Y_ABORT_UNLESS(Offset >= (GlueHead ? Head.Offset : NewHead.Offset));
490+
AFL_ENSURE(Offset >= (GlueHead ? Head.Offset : NewHead.Offset));
492491

493-
Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));
492+
AFL_ENSURE(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));
494493

495494
TKey tmpKey, dataKey;
496495

@@ -513,11 +512,11 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio
513512
auto batch = TBatch::FromBlobs(Offset, std::move(Blobs));
514513
Blobs.clear();
515514
batch.Pack();
516-
Y_ABORT_UNLESS(batch.Packed);
515+
AFL_ENSURE(batch.Packed);
517516
batch.SerializeTo(valueD);
518517
}
519518

520-
Y_ABORT_UNLESS(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize));
519+
AFL_ENSURE(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize));
521520
HeadSize = 0;
522521
BlobsSize = 0;
523522
TClientBlob::CheckBlob(tmpKey, valueD);
@@ -531,11 +530,9 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio
531530

532531
auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>
533532
{
534-
Y_ABORT_UNLESS(NewHead.Offset >= Head.Offset,
535-
"Head.Offset=%" PRIu64 ", NewHead.Offset=%" PRIu64,
536-
Head.Offset, NewHead.Offset);
533+
AFL_ENSURE(NewHead.Offset >= Head.Offset)("Head.Offset", Head.Offset)("NewHead.Offset", NewHead.Offset);
537534
ui32 size = blob.GetSerializedSize();
538-
Y_ABORT_UNLESS(InternalPartsCount < 1000); //just check for future packing
535+
AFL_ENSURE(InternalPartsCount < 1000); //just check for future packing
539536
if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize) {
540537
NeedCompactHead = true;
541538
}
@@ -617,22 +614,20 @@ TBlobIterator::TBlobIterator(const TKey& key, const TString& blob)
617614
, Count(0)
618615
, InternalPartsCount(0)
619616
{
620-
Y_ABORT_UNLESS(Data != End,
621-
"Key=%s, blob.size=%" PRISZT,
622-
Key.ToString().data(), blob.size());
617+
AFL_ENSURE(Data != End)("Key", Key.ToString())("blob.size", blob.size());
623618
ParseBatch();
624-
Y_ABORT_UNLESS(Header.GetPartNo() == Key.GetPartNo());
619+
AFL_ENSURE(Header.GetPartNo() == Key.GetPartNo());
625620
}
626621

627622
void TBlobIterator::ParseBatch() {
628-
Y_ABORT_UNLESS(Data < End);
623+
AFL_ENSURE(Data < End);
629624
Header = ExtractHeader(Data, End - Data);
630-
//Y_ABORT_UNLESS(Header.GetOffset() == Offset);
625+
//AFL_ENSURE(Header.GetOffset() == Offset);
631626
Count += Header.GetCount();
632627
Offset += Header.GetCount();
633628
InternalPartsCount += Header.GetInternalPartsCount();
634-
Y_ABORT_UNLESS(Count <= Key.GetCount());
635-
Y_ABORT_UNLESS(InternalPartsCount <= Key.GetInternalPartsCount());
629+
AFL_ENSURE(Count <= Key.GetCount());
630+
AFL_ENSURE(InternalPartsCount <= Key.GetInternalPartsCount());
636631
}
637632

638633
bool TBlobIterator::IsValid()
@@ -642,11 +637,11 @@ bool TBlobIterator::IsValid()
642637

643638
bool TBlobIterator::Next()
644639
{
645-
Y_ABORT_UNLESS(IsValid());
640+
AFL_ENSURE(IsValid());
646641
Data += Header.GetPayloadSize() + sizeof(ui16) + Header.ByteSize();
647642
if (Data == End) { //this was last batch
648-
Y_ABORT_UNLESS(Count == Key.GetCount());
649-
Y_ABORT_UNLESS(InternalPartsCount == Key.GetInternalPartsCount());
643+
AFL_ENSURE(Count == Key.GetCount());
644+
AFL_ENSURE(InternalPartsCount == Key.GetInternalPartsCount());
650645
return false;
651646
}
652647
ParseBatch();
@@ -655,7 +650,7 @@ bool TBlobIterator::Next()
655650

656651
TBatch TBlobIterator::GetBatch()
657652
{
658-
Y_ABORT_UNLESS(IsValid());
653+
AFL_ENSURE(IsValid());
659654

660655
return TBatch(Header, Data + sizeof(ui16) + Header.ByteSize());
661656
}

0 commit comments

Comments
 (0)