Skip to content

Commit f583557

Browse files
authored
Refactor export/import cancel ack transaction (#24788)
1 parent fb57b18 commit f583557

File tree

2 files changed

+30
-33
lines changed

2 files changed

+30
-33
lines changed

ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,18 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase {
108108
}; // TTxCancel
109109

110110
struct TSchemeShard::TExport::TTxCancelAck: public TSchemeShard::TXxport::TTxBase {
111-
TEvSchemeShard::TEvCancelTxResult::TPtr CancelResult;
111+
const ui64 ExportId;
112+
const TTxId TxId;
112113

113-
explicit TTxCancelAck(TSelf *self, TEvSchemeShard::TEvCancelTxResult::TPtr& ev)
114+
explicit TTxCancelAck(TSelf* self, ui64 exportId, TTxId txId)
114115
: TXxport::TTxBase(self)
115-
, CancelResult(ev)
116+
, ExportId(exportId)
117+
, TxId(txId)
118+
{
119+
}
120+
121+
explicit TTxCancelAck(TSelf* self, TEvSchemeShard::TEvCancelTxResult::TPtr& ev)
122+
: TTxCancelAck(self, ev->Cookie, TTxId(ev->Get()->Record.GetTargetTxId()))
116123
{
117124
}
118125

@@ -121,14 +128,11 @@ struct TSchemeShard::TExport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
121128
}
122129

123130
bool DoExecute(TTransactionContext& txc, const TActorContext&) override {
124-
const ui64 id = CancelResult->Cookie;
125-
const auto backupTxId = TTxId(CancelResult->Get()->Record.GetTargetTxId());
126-
127-
if (!Self->Exports.contains(id)) {
131+
if (!Self->Exports.contains(ExportId)) {
128132
return true;
129133
}
130134

131-
TExportInfo::TPtr exportInfo = Self->Exports.at(id);
135+
TExportInfo::TPtr exportInfo = Self->Exports.at(ExportId);
132136

133137
if (exportInfo->State != TExportInfo::EState::Cancellation) {
134138
return true;
@@ -150,7 +154,7 @@ struct TSchemeShard::TExport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
150154
++cancellableItems;
151155
}
152156

153-
if (item.WaitTxId == backupTxId) {
157+
if (item.WaitTxId == TxId) {
154158
found = true;
155159

156160
item.State = TExportInfo::EState::Cancelled;
@@ -166,7 +170,7 @@ struct TSchemeShard::TExport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
166170
return true;
167171
}
168172

169-
Self->TxIdToExport.erase(backupTxId);
173+
Self->TxIdToExport.erase(TxId);
170174

171175
NIceDb::TNiceDb db(txc.DB);
172176
Self->PersistExportItemState(db, *exportInfo, itemIdx);

ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,23 @@ struct TSchemeShard::TImport::TTxCancel: public TSchemeShard::TXxport::TTxBase {
111111
}; // TTxCancel
112112

113113
struct TSchemeShard::TImport::TTxCancelAck: public TSchemeShard::TXxport::TTxBase {
114-
TEvSchemeShard::TEvCancelTxResult::TPtr CancelTxResult = nullptr;
115-
TEvIndexBuilder::TEvCancelResponse::TPtr CancelIndexBuildResult = nullptr;
114+
const ui64 ImportId;
115+
const TTxId TxId;
116116

117-
explicit TTxCancelAck(TSelf *self, TEvSchemeShard::TEvCancelTxResult::TPtr& ev)
117+
explicit TTxCancelAck(TSelf* self, ui64 importId, TTxId txId)
118118
: TXxport::TTxBase(self)
119-
, CancelTxResult(ev)
119+
, ImportId(importId)
120+
, TxId(txId)
120121
{
121122
}
122123

123-
explicit TTxCancelAck(TSelf *self, TEvIndexBuilder::TEvCancelResponse::TPtr& ev)
124-
: TXxport::TTxBase(self)
125-
, CancelIndexBuildResult(ev)
124+
explicit TTxCancelAck(TSelf* self, TEvSchemeShard::TEvCancelTxResult::TPtr& ev)
125+
: TTxCancelAck(self, ev->Cookie, TTxId(ev->Get()->Record.GetTargetTxId()))
126+
{
127+
}
128+
129+
explicit TTxCancelAck(TSelf* self, TEvIndexBuilder::TEvCancelResponse::TPtr& ev)
130+
: TTxCancelAck(self, ev->Cookie, TTxId(ev->Get()->Record.GetTxId()))
126131
{
127132
}
128133

@@ -131,23 +136,11 @@ struct TSchemeShard::TImport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
131136
}
132137

133138
bool DoExecute(TTransactionContext& txc, const TActorContext&) override {
134-
TTxId txId;
135-
ui64 id;
136-
if (CancelTxResult) {
137-
txId = TTxId(CancelTxResult->Get()->Record.GetTargetTxId());
138-
id = CancelTxResult->Cookie;
139-
} else if (CancelIndexBuildResult) {
140-
txId = TTxId(CancelIndexBuildResult->Get()->Record.GetTxId());
141-
id = CancelIndexBuildResult->Cookie;
142-
} else {
143-
Y_ABORT("unreachable");
144-
}
145-
146-
if (!Self->Imports.contains(id)) {
139+
if (!Self->Imports.contains(ImportId)) {
147140
return true;
148141
}
149142

150-
TImportInfo::TPtr importInfo = Self->Imports.at(id);
143+
TImportInfo::TPtr importInfo = Self->Imports.at(ImportId);
151144
NIceDb::TNiceDb db(txc.DB);
152145

153146
if (importInfo->State != TImportInfo::EState::Cancellation) {
@@ -170,7 +163,7 @@ struct TSchemeShard::TImport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
170163
++cancellableItems;
171164
}
172165

173-
if (item.WaitTxId == txId) {
166+
if (item.WaitTxId == TxId) {
174167
found = true;
175168

176169
item.State = TImportInfo::EState::Cancelled;
@@ -186,7 +179,7 @@ struct TSchemeShard::TImport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
186179
return true;
187180
}
188181

189-
Self->TxIdToImport.erase(txId);
182+
Self->TxIdToImport.erase(TxId);
190183
Self->PersistImportItemState(db, *importInfo, itemIdx);
191184

192185
if (cancelledItems != cancellableItems) {

0 commit comments

Comments
 (0)