Skip to content

Commit 06224ed

Browse files
authored
add block size counting for map block join + add benchmark with little right table (#24837)
1 parent f583557 commit 06224ed

File tree

3 files changed

+60
-29
lines changed

3 files changed

+60
-29
lines changed

ydb/core/kqp/tools/combiner_perf/construct_join_graph.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ void SetEntryPointValues(IComputationGraph& g, NYql::NUdf::TUnboxedValue left, N
5353
g.GetEntryPoint(1, false)->SetValue(ctx, std::move(right));
5454
}
5555

56+
} // namespace
57+
5658
// Tells whether the tested join algorithm is one of the block variants.
// Only kBlockHash and kBlockMap yield true; every other enumerator yields false.
bool IsBlockJoin(ETestedJoinAlgo kind) {
    switch (kind) {
        case ETestedJoinAlgo::kBlockHash:
        case ETestedJoinAlgo::kBlockMap:
            return true;
        default:
            return false;
    }
}
5961

60-
} // namespace
61-
6262
THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, TInnerJoinDescription descr) {
6363
Y_ABORT_IF(algo == ETestedJoinAlgo::kBlockHash || algo == ETestedJoinAlgo::kScalarHash,
6464
"{Block,Scalar}HashJoin bench is not implemented");
@@ -157,10 +157,9 @@ THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T
157157
scalarMapRenames.Right, pb.NewFlowType(pb.NewTupleType(resultTypesArr)));
158158

159159
TRuntimeNode wideStream = ToWideStream(
160-
pb, pb.Collect(pb.NarrowMap(mapJoinSomething,
161-
[&pb](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); }
162-
163-
)));
160+
pb, pb.Collect(pb.NarrowMap(mapJoinSomething, [&pb](TRuntimeNode::TList items) -> TRuntimeNode {
161+
return pb.NewTuple(items);
162+
})));
164163

165164
THolder<IComputationGraph> graph = descr.Setup->BuildGraph(wideStream, args.Entrypoints);
166165
SetEntryPointValues(*graph, descr.LeftSource.ValuesList, descr.RightSource.ValuesList);

ydb/core/kqp/tools/combiner_perf/construct_join_graph.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ struct TInnerJoinDescription {
1717
TDqSetup<false>* Setup;
1818
};
1919

20+
// Returns true when `algo` is ETestedJoinAlgo::kBlockHash or ETestedJoinAlgo::kBlockMap,
// false for any other algorithm.
bool IsBlockJoin(ETestedJoinAlgo algo);
21+
2022
// Builds the computation graph for the join described by `descr` using `algo` and
// returns ownership of it. Aborts for kBlockHash and kScalarHash: the hash-join
// benchmarks are not implemented.
THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, TInnerJoinDescription descr);
23+
2124
// Column count of the join result for `algo` and `descr`. The benchmark driver sizes
// its fetch buffer with this value and passes it as the width to WideFetch.
// NOTE(review): whether a block-length column is included for block joins is not
// visible here — confirm in the definition.
i32 ResultColumnCount(ETestedJoinAlgo algo, TInnerJoinDescription descr);
2225
} // namespace NKikimr::NMiniKQL

ydb/core/kqp/tools/combiner_perf/joins.cpp

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "construct_join_graph.h"
33
#include "factories.h"
44
#include <ydb/library/yql/dq/comp_nodes/ut/utils/utils.h>
5+
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
56

67
namespace {
78
TVector<ui64> GenerateKeyColumn(i32 size, i32 seed) {
@@ -13,10 +14,10 @@ TVector<ui64> GenerateKeyColumn(i32 size, i32 seed) {
1314
return keyCoumn;
1415
}
1516

16-
NKikimr::NMiniKQL::TInnerJoinDescription PrepareCommonDescription(NKikimr::NMiniKQL::TDqSetup<false>* setup) {
17+
NKikimr::NMiniKQL::TInnerJoinDescription PrepareSameSizeTables(NKikimr::NMiniKQL::TDqSetup<false>* setup) {
1718
NKikimr::NMiniKQL::TInnerJoinDescription descr;
1819
descr.Setup = setup;
19-
const int size = 1 << 14;
20+
const int size = 1 << 16;
2021

2122
std::tie(descr.LeftSource.ColumnTypes, descr.LeftSource.ValuesList) = ConvertVectorsToRuntimeTypesAndValue(
2223
*setup, GenerateKeyColumn(size, 123), TVector<ui64>(size, 111), TVector<TString>(size, "meow"));
@@ -25,10 +26,31 @@ NKikimr::NMiniKQL::TInnerJoinDescription PrepareCommonDescription(NKikimr::NMini
2526
return descr;
2627
}
2728

29+
// Prepares benchmark input where the right table is much smaller than the left one:
// the left side has 1 << 16 (65536) rows, the right side 128 times fewer (512 rows).
// Left columns:  generated keys (seed 123), a constant ui64 column (111), a constant string column ("meow").
// Right columns: generated keys (seed 111), a constant string column ("woo").
// Key column indexes are not filled in here.
NKikimr::NMiniKQL::TInnerJoinDescription PrepareSmallRightTable(NKikimr::NMiniKQL::TDqSetup<false>* setup) {
    constexpr int kLeftRows = 1 << 16;
    constexpr int kRightRows = kLeftRows >> 7;

    NKikimr::NMiniKQL::TInnerJoinDescription result;
    result.Setup = setup;

    auto& leftTable = result.LeftSource;
    std::tie(leftTable.ColumnTypes, leftTable.ValuesList) = ConvertVectorsToRuntimeTypesAndValue(
        *setup,
        GenerateKeyColumn(kLeftRows, 123),
        TVector<ui64>(kLeftRows, 111),
        TVector<TString>(kLeftRows, "meow"));

    auto& rightTable = result.RightSource;
    std::tie(rightTable.ColumnTypes, rightTable.ValuesList) = ConvertVectorsToRuntimeTypesAndValue(
        *setup,
        GenerateKeyColumn(kRightRows, 111),
        TVector<TString>(kRightRows, "woo"));

    return result;
}
40+
2841
// Bundles a TRunResult with the name of a test.
// NOTE(review): no use of this struct is visible in this view — confirm it is still needed.
struct TTestResult {
2942
TRunResult Run;
3043
TString TestName;
3144
};
45+
46+
// Number of output rows represented by one fetched wide item.
// For non-block algorithms every fetched item counts as exactly one row.
// For block joins the count is read from the last element of `line`, interpreted as an
// Arrow UInt64 scalar (presumably the block-length column — confirm against the block
// join output layout). `line` must be non-empty in that case, since back() is called on it.
// The unsigned 64-bit value is implicitly converted to the int return type.
int LineSize(NKikimr::NMiniKQL::ETestedJoinAlgo algo, std::span<const NYql::NUdf::TUnboxedValue> line) {
    if (!NKikimr::NMiniKQL::IsBlockJoin(algo)) {
        return 1;
    }

    const auto& lengthDatum = NKikimr::NMiniKQL::TArrowBlock::From(line.back()).GetDatum();
    return lengthDatum.scalar_as<arrow::UInt64Scalar>().value;
}
53+
3254
} // namespace
3355

3456
void NKikimr::NMiniKQL::RunJoinsBench(const TRunParams& params, TTestResultCollector& printout) {
@@ -44,31 +66,38 @@ void NKikimr::NMiniKQL::RunJoinsBench(const TRunParams& params, TTestResultColle
4466
{NYKQL::ETestedJoinAlgo::kScalarMap, "ScalarMap"},
4567
{NYKQL::ETestedJoinAlgo::kBlockMap, "BlockMap"},
4668
};
69+
TVector<std::pair<NYKQL::TInnerJoinDescription, std::string_view>> inputs = {
70+
{PrepareSameSizeTables(&setup), "SameSizeTables"},
71+
{PrepareSmallRightTable(&setup), "SmallRight"},
72+
};
4773

48-
for (auto [algo, name] : cases) {
49-
NYKQL::TInnerJoinDescription descr = PrepareCommonDescription(&setup);
50-
descr.LeftSource.KeyColumnIndexes = keyColumns;
51-
descr.RightSource.KeyColumnIndexes = keyColumns;
52-
THolder<NKikimr::NMiniKQL::IComputationGraph> wideStreamGraph = ConstructInnerJoinGraphStream(algo, descr);
53-
NYql::NUdf::TUnboxedValue wideStream = wideStreamGraph->GetValue();
54-
std::vector<NYql::NUdf::TUnboxedValue> fetchBuff;
55-
i32 cols = NKikimr::NMiniKQL::ResultColumnCount(algo, descr);
56-
fetchBuff.resize(cols);
57-
Cerr << "Compute graph result for algorithm '" << name << "'";
74+
for (auto [algo, algo_name] : cases) {
75+
for (auto [descr, descr_name] : inputs) {
76+
descr.LeftSource.KeyColumnIndexes = keyColumns;
77+
descr.RightSource.KeyColumnIndexes = keyColumns;
5878

59-
NYql::NUdf::EFetchStatus fetchStatus;
60-
i64 lineCount = 0;
61-
const auto graphTimeStart = GetThreadCPUTime();
79+
THolder<NKikimr::NMiniKQL::IComputationGraph> wideStreamGraph = ConstructInnerJoinGraphStream(algo, descr);
80+
NYql::NUdf::TUnboxedValue wideStream = wideStreamGraph->GetValue();
81+
std::vector<NYql::NUdf::TUnboxedValue> fetchBuff;
82+
ui32 cols = NKikimr::NMiniKQL::ResultColumnCount(algo, descr);
83+
fetchBuff.resize(cols);
84+
Cerr << "Compute graph result for algorithm '" << algo_name << "' and input data '" << descr_name << "'";
6285

63-
while ((fetchStatus = wideStream.WideFetch(fetchBuff.data(), cols)) != NYql::NUdf::EFetchStatus::Finish) {
64-
if (fetchStatus == NYql::NUdf::EFetchStatus::Ok) {
65-
++lineCount;
86+
NYql::NUdf::EFetchStatus fetchStatus;
87+
i64 lineCount = 0;
88+
const auto graphTimeStart = GetThreadCPUTime();
89+
90+
while ((fetchStatus = wideStream.WideFetch(fetchBuff.data(), cols)) != NYql::NUdf::EFetchStatus::Finish) {
91+
if (fetchStatus == NYql::NUdf::EFetchStatus::Ok) {
92+
lineCount += LineSize(algo, {fetchBuff.data(), cols});
93+
}
6694
}
67-
}
68-
TRunResult thisNodeResult;
95+
TRunResult thisNodeResult;
6996

70-
thisNodeResult.ResultTime = GetThreadCPUTimeDelta(graphTimeStart);
71-
Cerr << ". Output line count(block considered to be 1 line): " << lineCount << Endl;
72-
printout.SubmitMetrics(params, thisNodeResult, name.data(), false, false);
97+
thisNodeResult.ResultTime = GetThreadCPUTimeDelta(graphTimeStart);
98+
Cerr << ". Output line count(block considered to be 1 line): " << lineCount << Endl;
99+
std::string testname = std::string{algo_name} + "_" + std::string{descr_name};
100+
printout.SubmitMetrics(params, thisNodeResult, testname.data(), false, false);
101+
}
73102
}
74103
}

0 commit comments

Comments
 (0)