Skip to content

Commit 5dc5519

Browse files
authored
CTAS mkdir (#23807)
1 parent 37342f3 commit 5dc5519

File tree

5 files changed

+192
-7
lines changed

5 files changed

+192
-7
lines changed

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ IActor* CreateKqpSchemeExecuter(
160160
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
161161
const TMaybe<TString>& requestType, const TString& database,
162162
TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& clientAddress,
163-
bool temporary, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx,
163+
bool temporary, bool isCreateTableAs, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx,
164164
const TActorId& kqpTempTablesAgentActor = TActorId());
165165

166166
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 156 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
4949
EvResult = EventSpaceBegin(TEvents::ES_PRIVATE),
5050
EvMakeTempDirResult,
5151
EvMakeSessionDirResult,
52+
EvMakeCTASDirResult,
5253
};
5354

5455
struct TEvResult : public TEventLocal<TEvResult, EEv::EvResult> {
@@ -62,6 +63,10 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
6263
struct TEvMakeSessionDirResult : public TEventLocal<TEvMakeSessionDirResult, EEv::EvMakeSessionDirResult> {
6364
IKqpGateway::TGenericResult Result;
6465
};
66+
67+
// Private event that delivers the outcome of the CTAS destination directory
// creation step back to this actor.
struct TEvMakeCTASDirResult : TEventLocal<TEvMakeCTASDirResult, EEv::EvMakeCTASDirResult> {
    IKqpGateway::TGenericResult Result;
};
6570
};
6671
public:
6772
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -71,7 +76,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
7176
TKqpSchemeExecuter(
7277
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
7378
const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& clientAddress,
74-
bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx,
79+
bool temporary, bool isCreateTableAs, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx,
7580
const TActorId& kqpTempTablesAgentActor)
7681
: PhyTx(phyTx)
7782
, QueryType(queryType)
@@ -80,6 +85,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
8085
, UserToken(userToken)
8186
, ClientAddress(clientAddress)
8287
, Temporary(temporary)
88+
, IsCreateTableAs(isCreateTableAs)
8389
, SessionId(sessionId)
8490
, RequestContext(std::move(ctx))
8591
, RequestType(requestType)
@@ -175,6 +181,128 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
175181
});
176182
}
177183

184+
void FindWorkingDirForCTAS() {
185+
const auto& schemeOp = PhyTx->GetSchemeOperation();
186+
187+
AFL_ENSURE(schemeOp.GetOperationCase() == NKqpProto::TKqpSchemeOperation::kAlterTable);
188+
const auto& alterTableModifyScheme = schemeOp.GetAlterTable();
189+
AFL_ENSURE(alterTableModifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpMoveTable);
190+
191+
const auto dirPath = SplitPath(alterTableModifyScheme.GetMoveTable().GetDstPath());
192+
AFL_ENSURE(dirPath.size() >= 2);
193+
194+
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
195+
TVector<TString> path;
196+
197+
for (const auto& part : dirPath) {
198+
path.emplace_back(part);
199+
200+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
201+
entry.Path = path;
202+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath;
203+
entry.SyncVersion = true;
204+
entry.RedirectRequired = false;
205+
request->ResultSet.emplace_back(entry);
206+
}
207+
request->ResultSet.pop_back();
208+
209+
auto ev = std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(request);
210+
211+
Send(MakeSchemeCacheID(), ev.release());
212+
Become(&TKqpSchemeExecuter::ExecuteState);
213+
}
214+
215+
// Picks the working directory for a CREATE TABLE AS destination from a scheme
// cache navigate reply and proceeds to directory creation.
//
// The chosen directory is the path of the last ResultSet entry whose status is
// Ok. When no entry has that status, the request is failed with BAD_REQUEST and
// a RESOLVE_LOOKUP_ERROR issue. Otherwise the resolved path must be a non-empty
// proper prefix of the MoveTable destination path; CreateCTASDirectory is then
// called with that prefix and the remaining destination components.
void HandleCTASWorkingDir(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
    const auto& entries = ev->Get()->Request->ResultSet;

    // Scan from the end: the last successfully resolved entry wins.
    const auto resolved = std::find_if(entries.rbegin(), entries.rend(), [](const auto& entry) {
        return entry.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok;
    });

    const auto& schemeOp = PhyTx->GetSchemeOperation();
    AFL_ENSURE(schemeOp.GetOperationCase() == NKqpProto::TKqpSchemeOperation::kAlterTable);
    const auto& moveTableOp = schemeOp.GetAlterTable();
    AFL_ENSURE(moveTableOp.GetOperationType() == NKikimrSchemeOp::ESchemeOpMoveTable);
    const auto dstPath = SplitPath(moveTableOp.GetMoveTable().GetDstPath());

    if (resolved == entries.rend()) {
        const auto errText = TStringBuilder()
            << "Cannot resolve working dir."
            << " path# " << JoinPath(dstPath);
        LOG_D(errText);

        const auto issue = MakeIssue(NKikimrIssues::TIssuesIds::RESOLVE_LOOKUP_ERROR, errText);
        return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, issue);
    }

    const TVector<TString>& workingDir = resolved->Path;

    AFL_ENSURE(!workingDir.empty() && workingDir.size() < dstPath.size());
    AFL_ENSURE(std::equal(
        workingDir.begin(),
        workingDir.end(),
        dstPath.begin(),
        dstPath.begin() + workingDir.size()));

    CreateCTASDirectory(
        TConstArrayRef<TString>(
            workingDir.begin(),
            workingDir.end()),
        TConstArrayRef<TString>(
            dstPath.begin() + workingDir.size(),
            dstPath.end()));
}
256+
257+
void CreateCTASDirectory(const TConstArrayRef<TString> workingDir, const TConstArrayRef<TString> dirPath) {
258+
AFL_ENSURE(!dirPath.empty());
259+
AFL_ENSURE(!workingDir.empty());
260+
261+
auto ev = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
262+
auto& record = ev->Record;
263+
264+
auto actorSystem = TActivationContext::ActorSystem();
265+
auto selfId = SelfId();
266+
267+
record.SetDatabaseName(Database);
268+
if (UserToken) {
269+
record.SetUserToken(UserToken->GetSerializedToken());
270+
}
271+
record.SetPeerName(ClientAddress);
272+
273+
const auto& schemeOp = PhyTx->GetSchemeOperation();
274+
275+
AFL_ENSURE(schemeOp.GetOperationCase() == NKqpProto::TKqpSchemeOperation::kAlterTable);
276+
const auto& alterTableModifyScheme = schemeOp.GetAlterTable();
277+
AFL_ENSURE(alterTableModifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpMoveTable);
278+
279+
if (dirPath.size() == 1) {
280+
auto ev = MakeHolder<TEvPrivate::TEvMakeCTASDirResult>();
281+
ev->Result.SetSuccess();
282+
actorSystem->Send(selfId, ev.Release());
283+
Become(&TKqpSchemeExecuter::ExecuteState);
284+
return;
285+
}
286+
287+
auto* modifyScheme = record.MutableTransaction()->MutableModifyScheme();
288+
modifyScheme->SetWorkingDir(CombinePath(workingDir.begin(), workingDir.end()));
289+
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMkDir);
290+
291+
auto* makeDir = modifyScheme->MutableMkDir();
292+
makeDir->SetName(CombinePath(dirPath.begin(), std::prev(dirPath.end()), false));
293+
294+
auto promise = NewPromise<IKqpGateway::TGenericResult>();
295+
IActor* requestHandler = new TSchemeOpRequestHandler(ev.Release(), promise, false);
296+
RegisterWithSameMailbox(requestHandler);
297+
298+
promise.GetFuture().Subscribe([actorSystem, selfId](const TFuture<IKqpGateway::TGenericResult>& future) {
299+
auto ev = MakeHolder<TEvPrivate::TEvMakeCTASDirResult>();
300+
ev->Result = future.GetValue();
301+
actorSystem->Send(selfId, ev.Release());
302+
});
303+
Become(&TKqpSchemeExecuter::ExecuteState);
304+
}
305+
178306
void MakeSchemeOperationRequest() {
179307
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
180308

@@ -604,6 +732,8 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
604732
const auto& schemeOp = PhyTx->GetSchemeOperation();
605733
if (schemeOp.GetObjectType()) {
606734
MakeObjectRequest();
735+
} else if (IsCreateTableAs && schemeOp.GetOperationCase() == NKqpProto::TKqpSchemeOperation::kAlterTable) {
736+
FindWorkingDirForCTAS();
607737
} else {
608738
if (Temporary) {
609739
CreateTmpDirectory();
@@ -620,6 +750,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
620750
hFunc(TEvPrivate::TEvResult, HandleExecute);
621751
hFunc(TEvPrivate::TEvMakeTempDirResult, Handle);
622752
hFunc(TEvPrivate::TEvMakeSessionDirResult, Handle);
753+
hFunc(TEvPrivate::TEvMakeCTASDirResult, Handle);
623754
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
624755
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
625756
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
@@ -669,6 +800,15 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
669800
MakeSchemeOperationRequest();
670801
}
671802

803+
// Handles the outcome of the CTAS destination directory creation.
//
// On success the scheme operation itself is requested via
// MakeSchemeOperationRequest(). On failure the error is reported through
// InternalError and the scheme operation is NOT requested.
void Handle(TEvPrivate::TEvMakeCTASDirResult::TPtr& result) {
    const auto& mkDirResult = result->Get()->Result;
    if (!mkDirResult.Success()) {
        InternalError(TStringBuilder()
            << "Error creating directory:"
            << mkDirResult.Issues().ToString(true));
        // NOTE(review): InternalError is defined outside this view; presumably it
        // replies with an error and terminates the actor - confirm. Either way the
        // table move must not be proposed once directory creation has failed.
        return;
    }
    MakeSchemeOperationRequest();
}
811+
672812
void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
673813
const auto* msg = ev->Get();
674814
TxId = msg->TxId;
@@ -713,6 +853,14 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
713853

714854
NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get();
715855

856+
if (IsCreateTableAs) {
857+
AFL_ENSURE(std::all_of(resp->ResultSet.begin(), resp->ResultSet.end(), [](const auto& entry) {
858+
return entry.Operation == NSchemeCache::TSchemeCacheNavigate::OpPath;
859+
}));
860+
HandleCTASWorkingDir(ev);
861+
return;
862+
}
863+
716864
if (resp->ErrorCount > 0 || resp->ResultSet.empty()) {
717865
TStringBuilder builder;
718866
builder << "Unable to navigate:";
@@ -728,6 +876,8 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
728876
return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, NYql::TIssue(error));
729877
}
730878

879+
AFL_ENSURE(resp->ResultSet.size() <= 2);
880+
731881
if (UserToken && !UserToken->GetSerializedToken().empty() && !CheckAlterAccess(*UserToken, resp)) {
732882
LOG_E("Access check failed");
733883
return ReplyErrorAndDie(Ydb::StatusIds::UNAUTHORIZED, NYql::TIssue("Unauthorized"));
@@ -941,6 +1091,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
9411091
const TString ClientAddress;
9421092
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
9431093
bool Temporary;
1094+
bool IsCreateTableAs;
9441095
TString SessionId;
9451096
ui64 TxId = 0;
9461097
TActorId SchemePipeActorId_;
@@ -955,12 +1106,13 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
9551106
IActor* CreateKqpSchemeExecuter(
9561107
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
9571108
const TMaybe<TString>& requestType, const TString& database,
958-
TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& clientAddress, bool temporary, TString sessionId,
959-
TIntrusivePtr<TUserRequestContext> ctx, const TActorId& kqpTempTablesAgentActor)
1109+
TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& clientAddress,
1110+
bool temporary, bool isCreateTableAs,
1111+
TString sessionId, TIntrusivePtr<TUserRequestContext> ctx, const TActorId& kqpTempTablesAgentActor)
9601112
{
9611113
return new TKqpSchemeExecuter(
9621114
phyTx, queryType, target, requestType, database, userToken, clientAddress,
963-
temporary, sessionId, std::move(ctx), kqpTempTablesAgentActor);
1115+
temporary, isCreateTableAs, sessionId, std::move(ctx), kqpTempTablesAgentActor);
9641116
}
9651117

9661118
} // namespace NKikimr::NKqp

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExec
615615
void Bootstrap() {
616616
auto ctx = MakeIntrusive<TUserRequestContext>();
617617
ctx->DatabaseId = DatabaseId;
618-
IActor* actor = CreateKqpSchemeExecuter(PhyTx, QueryType, SelfId(), RequestType, Database, UserToken, ClientAddress, false /* temporary */, TString() /* sessionId */, ctx);
618+
IActor* actor = CreateKqpSchemeExecuter(PhyTx, QueryType, SelfId(), RequestType, Database, UserToken, ClientAddress, false /* temporary */, false /* isCreateTableAs */, TString() /* sessionId */, ctx);
619619
Register(actor);
620620
Become(&TThis::WaitState);
621621
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1632,7 +1632,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
16321632
const bool temporary = GetTemporaryTableInfo(tx).has_value();
16331633

16341634
auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken, clientAddress,
1635-
temporary, TempTablesState.SessionId, QueryState->UserRequestContext, KqpTempTablesAgentActor);
1635+
temporary, QueryState->IsCreateTableAs(), TempTablesState.SessionId, QueryState->UserRequestContext, KqpTempTablesAgentActor);
16361636

16371637
ExecuterId = RegisterWithSameMailbox(executerActor);
16381638

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3103,6 +3103,39 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
31033103
CompareYson(output, R"([[[1u];[1u]];[[10u];[10u]];[[100u];[100u]]])");
31043104
}
31053105
}
3106+
3107+
// Runs CREATE TABLE ... AS SELECT into `/Root/test_dir/Destination` on a cluster
// started without sample tables (the test never creates `/Root/test_dir` itself),
// then reads the table back and expects the single inserted row.
Y_UNIT_TEST(CreateTableAs_MkDir) {
    NKikimrConfig::TFeatureFlags flags;
    flags.SetEnableMoveColumnTable(true);

    auto settings = TKikimrSettings().SetFeatureFlags(flags).SetWithSampleTables(false);

    auto* tableServiceConfig = settings.AppConfig.MutableTableServiceConfig();
    tableServiceConfig->SetEnableOlapSink(true);
    tableServiceConfig->SetEnableCreateTableAs(true);
    tableServiceConfig->SetEnablePerStatementQueryExecution(true);

    TKikimrRunner kikimr(settings);

    const TString query = R"(
CREATE TABLE `/Root/test_dir/Destination` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW)
AS SELECT 1u As Col1, 1u As Col2;
)";

    auto client = kikimr.GetQueryClient();

    // Step 1: the CTAS statement itself must succeed.
    {
        auto ctasResult = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
        UNIT_ASSERT_C(ctasResult.GetStatus() == NYdb::EStatus::SUCCESS, ctasResult.GetIssues().ToString());
    }

    // Step 2: the destination table is readable and holds the selected row.
    {
        auto stream = client.StreamExecuteQuery(R"(
SELECT * FROM `/Root/test_dir/Destination` ORDER BY Col1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(stream.GetStatus(), EStatus::SUCCESS, stream.GetIssues().ToString());
        TString yson = StreamResultToYson(stream);
        CompareYson(yson, R"([[1u;1u]])");
    }
}
31063139
}
31073140

31083141
} // namespace NKqp

0 commit comments

Comments
 (0)