@@ -71,6 +71,17 @@ NYT::TNode MakeOutputSchema() {
7171 return NYT::TNode::CreateList ().Add (" StructType" ).Add (std::move (structMembers));
7272}
7373
74+ struct TInputType {
75+ const TVector<ui64>& Offsets;
76+ const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values;
77+ const ui64 RowsOffset; // offset of first value
78+ const ui64 NumberRows;
79+
80+ ui64 GetOffset (ui64 rowId) const {
81+ return Offsets[rowId + RowsOffset];
82+ }
83+ };
84+
7485class TFilterInputSpec : public NYql ::NPureCalc::TInputSpecBase {
7586public:
7687 TFilterInputSpec (const NYT::TNode& schema)
@@ -85,7 +96,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
8596 TVector<NYT::TNode> Schemas;
8697};
8798
88- class TFilterInputConsumer : public NYql ::NPureCalc::IConsumer<std::pair< const TVector<ui64>&, const TVector< const NKikimr::NMiniKQL::TUnboxedValueVector*>&> > {
99+ class TFilterInputConsumer : public NYql ::NPureCalc::IConsumer<TInputType > {
89100public:
90101 TFilterInputConsumer (
91102 const TFilterInputSpec& spec,
@@ -123,36 +134,38 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
123134 }
124135 }
125136
126- void OnObject (std::pair< const TVector<ui64>&, const TVector< const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values ) override {
127- Y_ENSURE (FieldsPositions.size () == values. second .size ());
137+ void OnObject (TInputType input ) override {
138+ Y_ENSURE (FieldsPositions.size () == input. Values .size ());
128139
129140 NKikimr::NMiniKQL::TThrowingBindTerminator bind;
130141 with_lock (Worker->GetScopedAlloc ()) {
142+ Y_DEFER {
143+ // Clear cache after each object because
144+ // values allocated on another allocator and should be released
145+ Cache.Clear ();
146+ Worker->GetGraph ().Invalidate ();
147+ };
148+
131149 auto & holderFactory = Worker->GetGraph ().GetHolderFactory ();
132150
133151 // TODO: use blocks here
134- for (size_t rowId = 0 ; rowId < values. second . front ()-> size () ; ++rowId) {
152+ for (size_t rowId = 0 ; rowId < input. NumberRows ; ++rowId) {
135153 NYql::NUdf::TUnboxedValue* items = nullptr ;
136154
137155 NYql::NUdf::TUnboxedValue result = Cache.NewArray (
138156 holderFactory,
139- static_cast <ui32>(values. second .size () + 1 ),
157+ static_cast <ui32>(input. Values .size () + 1 ),
140158 items);
141159
142- items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod (values. first [ rowId] );
160+ items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod (input. GetOffset ( rowId) );
143161
144162 size_t fieldId = 0 ;
145- for (const auto & column : values. second ) {
163+ for (const auto column : input. Values ) {
146164 items[FieldsPositions[fieldId++]] = column->at (rowId);
147165 }
148166
149167 Worker->Push (std::move (result));
150168 }
151-
152- // Clear cache after each object because
153- // values allocated on another allocator and should be released
154- Cache.Clear ();
155- Worker->GetGraph ().Invalidate ();
156169 }
157170 }
158171
@@ -236,7 +249,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
236249 static constexpr bool IsPartial = false ;
237250 static constexpr bool SupportPushStreamMode = true ;
238251
239- using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair< const TVector<ui64>&, const TVector< const NKikimr::NMiniKQL::TUnboxedValueVector*>&> >>;
252+ using TConsumerType = THolder<NYql::NPureCalc::IConsumer<TInputType >>;
240253
241254 static TConsumerType MakeConsumer (
242255 const TFilterInputSpec& spec,
@@ -282,9 +295,9 @@ class TJsonFilter::TImpl {
282295 LOG_ROW_DISPATCHER_DEBUG (" Program created" );
283296 }
284297
285- void Push (const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector *>& values) {
298+ void Push (const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue> *>& values, ui64 rowsOffset, ui64 numberRows ) {
286299 Y_ENSURE (values, " Expected non empty schema" );
287- InputConsumer->OnObject (std::make_pair ( offsets, values) );
300+ InputConsumer->OnObject ({. Offsets = offsets, . Values = values, . RowsOffset = rowsOffset, . NumberRows = numberRows} );
288301 }
289302
290303 TString GetSql () const {
@@ -305,7 +318,7 @@ class TJsonFilter::TImpl {
305318
306319private:
307320 THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
308- THolder<NYql::NPureCalc::IConsumer<std::pair< const TVector<ui64>&, const TVector< const NKikimr::NMiniKQL::TUnboxedValueVector*>&> >> InputConsumer;
321+ THolder<NYql::NPureCalc::IConsumer<TInputType >> InputConsumer;
309322 const TString Sql;
310323};
311324
@@ -322,8 +335,8 @@ TJsonFilter::TJsonFilter(
322335TJsonFilter::~TJsonFilter () {
323336}
324337
325- void TJsonFilter::Push (const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector *>& values) {
326- Impl->Push (offsets, values);
338+ void TJsonFilter::Push (const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue> *>& values, ui64 rowsOffset, ui64 numberRows ) {
339+ Impl->Push (offsets, values, rowsOffset, numberRows );
327340}
328341
329342TString TJsonFilter::GetSql () {
0 commit comments