@@ -102,10 +102,9 @@ class TColumnDataAccessorFetching: public IDataAccessorRequestsSubscriber {
102
102
mem += accessor->GetColumnRawBytes (Request.GetFetchingColumnIds (), false );
103
103
}
104
104
105
- auto context = Request.GetContext ();
106
- NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::SendToAllocation (context->GetMemoryProcessId (), context->GetMemoryScopeId (),
107
- context->GetMemoryGroupId (), { std::make_shared<TColumnDataAllocation>(std::move (Request), mem) },
108
- (ui64)TFilterAccumulator::EFetchingStage::COLUMN_DATA);
105
+ NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::SendToAllocation (Request.GetRequestGuard ()->GetMemoryProcessId (),
106
+ Request.GetRequestGuard ()->GetMemoryScopeId (), Request.GetRequestGuard ()->GetMemoryGroupId (),
107
+ { std::make_shared<TColumnDataAllocation>(std::move (Request), mem) }, (ui64)TFilterAccumulator::EFetchingStage::COLUMN_DATA);
109
108
}
110
109
virtual const std::shared_ptr<const TAtomicCounter>& DoGetAbortionFlag () const override {
111
110
return Request.GetContext ()->GetRequest ()->Get ()->GetAbortionFlag ();
@@ -161,22 +160,25 @@ class TPortionIntersectionsAllocation: public NGroupedMemoryManager::IAllocation
161
160
private:
162
161
TActorId Owner;
163
162
std::shared_ptr<TFilterAccumulator> Request;
163
+ YDB_READONLY_DEF (std::unique_ptr<TFilterBuildingGuard>, RequestGuard);
164
164
165
165
private:
166
166
virtual void DoOnAllocationImpossible (const TString& errorMessage) override {
167
167
Request->Abort (TStringBuilder () << " cannot allocate memory: " << errorMessage);
168
168
}
169
169
virtual bool DoOnAllocated (std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
170
170
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /* allocation*/ ) override {
171
- TActorContext::AsActorContext ().Send (Owner, new NPrivate::TEvFilterRequestResourcesAllocated (Request, guard));
171
+ TActorContext::AsActorContext ().Send (Owner, new NPrivate::TEvFilterRequestResourcesAllocated (Request, guard, std::move (RequestGuard) ));
172
172
return true ;
173
173
}
174
174
175
175
public:
176
- TPortionIntersectionsAllocation (const TActorId& owner, const std::shared_ptr<TFilterAccumulator>& request, const ui64 mem)
176
+ TPortionIntersectionsAllocation (const TActorId& owner, const std::shared_ptr<TFilterAccumulator>& request, const ui64 mem,
177
+ std::unique_ptr<TFilterBuildingGuard>&& requestGuard)
177
178
: NGroupedMemoryManager::IAllocation(mem)
178
179
, Owner(owner)
179
180
, Request(request)
181
+ , RequestGuard(std::move(requestGuard))
180
182
{
181
183
}
182
184
};
@@ -200,26 +202,35 @@ TDuplicateManager::TDuplicateManager(const TSpecialReadContext& context, const s
200
202
{
201
203
}
202
204
203
- void TDuplicateManager::Handle (const TEvRequestFilter::TPtr& ev) {
204
- auto constructor = std::make_shared<TFilterAccumulator>(ev);
205
- TPortionInfo::TConstPtr mainPortion = Portions->GetPortionVerified (constructor->GetRequest ()->Get ()->GetSourceId ());
206
- static constexpr ui64 LOW_INTERSECTIONS_LIMIT = 10 ;
205
+ bool TDuplicateManager::IsExclusiveInterval (const NArrow::TSimpleRow& begin, const NArrow::TSimpleRow& end) const {
207
206
ui64 intersectionsCount = 0 ;
208
- Intervals.EachIntersection (TPortionIntervalTree::TRange (mainPortion-> IndexKeyStart () , true , mainPortion-> IndexKeyEnd () , true ),
207
+ return Intervals.EachIntersection (TPortionIntervalTree::TRange (begin , true , end , true ),
209
208
[&intersectionsCount](const TPortionIntervalTree::TRange& /* interval*/ , const std::shared_ptr<TPortionInfo>& /* portion*/ ) {
210
209
++intersectionsCount;
211
- return intersectionsCount <= LOW_INTERSECTIONS_LIMIT ;
210
+ return intersectionsCount == 1 ;
212
211
});
213
- ExpectedIntersectionCount = std::max (ExpectedIntersectionCount, intersectionsCount);
214
- if (intersectionsCount <= LOW_INTERSECTIONS_LIMIT) {
215
- Send (SelfId (), new NPrivate::TEvFilterRequestResourcesAllocated (constructor, nullptr ));
216
- } else {
217
- NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::SendToAllocation (constructor->GetMemoryProcessId (),
218
- constructor->GetMemoryScopeId (), constructor->GetMemoryGroupId (),
219
- { std::make_shared<TPortionIntersectionsAllocation>(
220
- SelfId (), constructor, TBuildFilterContext::GetApproximateDataSize (ExpectedIntersectionCount)) },
221
- (ui64)TFilterAccumulator::EFetchingStage::INTERSECTIONS);
212
+ }
213
+
214
+ void TDuplicateManager::Handle (const TEvRequestFilter::TPtr& ev) {
215
+ auto constructor = std::make_shared<TFilterAccumulator>(ev);
216
+ TPortionInfo::TConstPtr mainPortion = Portions->GetPortionVerified (constructor->GetRequest ()->Get ()->GetSourceId ());
217
+ if (IsExclusiveInterval (mainPortion->IndexKeyStart (), mainPortion->IndexKeyEnd ())) {
218
+ auto filter = NArrow::TColumnFilter::BuildAllowFilter ();
219
+ filter.Add (true , mainPortion->GetRecordsCount ());
220
+ constructor->SetIntervalsCount (1 );
221
+ constructor->AddFilter (0 , std::move (filter));
222
+ AFL_VERIFY (constructor->IsDone ());
223
+ Counters->OnFilterRequest (1 );
224
+ Counters->OnRowsMerged (0 , 0 , mainPortion->GetRecordsCount ());
225
+ return ;
222
226
}
227
+
228
+ auto task = std::make_shared<TPortionIntersectionsAllocation>(
229
+ SelfId (), constructor, TBuildFilterContext::GetApproximateDataSize (ExpectedIntersectionCount), std::make_unique<TFilterBuildingGuard>());
230
+ NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::SendToAllocation (task->GetRequestGuard ()->GetMemoryProcessId (),
231
+ task->GetRequestGuard ()->GetMemoryScopeId (), task->GetRequestGuard ()->GetMemoryGroupId (), { task },
232
+ (ui64)TFilterAccumulator::EFetchingStage::INTERSECTIONS);
233
+ return ;
223
234
}
224
235
225
236
void TDuplicateManager::StartIntervalProcessing (const THashMap<ui64, TPortionInfo::TConstPtr>& intersectingPortions,
@@ -271,6 +282,7 @@ void TDuplicateManager::StartIntervalProcessing(const THashMap<ui64, TPortionInf
271
282
void TDuplicateManager::Handle (const NPrivate::TEvFilterRequestResourcesAllocated::TPtr& ev) {
272
283
std::shared_ptr<TFilterAccumulator> constructor = ev->Get ()->GetRequest ();
273
284
std::shared_ptr<NGroupedMemoryManager::TAllocationGuard> memoryGuard = ev->Get ()->ExtractAllocationGuard ();
285
+ auto requestGuard = ev->Get ()->ExtractRequestGuard ();
274
286
275
287
THashMap<ui64, TPortionInfo::TConstPtr> intersectingPortions;
276
288
const std::shared_ptr<const TPortionInfo>& mainPortion = Portions->GetPortionVerified (constructor->GetRequest ()->Get ()->GetSourceId ());
@@ -290,16 +302,6 @@ void TDuplicateManager::Handle(const NPrivate::TEvFilterRequestResourcesAllocate
290
302
(" source" , constructor->GetRequest ()->Get ()->GetSourceId ())(" intersecting_portions" , intersectingPortions.size ());
291
303
AFL_VERIFY (intersectingPortions.size ());
292
304
293
- if (intersectingPortions.size () == 1 ) {
294
- AFL_VERIFY ((*intersectingPortions.begin ()).first == mainPortion->GetPortionId ());
295
- auto filter = NArrow::TColumnFilter::BuildAllowFilter ();
296
- filter.Add (true , mainPortion->GetRecordsCount ());
297
- constructor->SetIntervalsCount (1 );
298
- constructor->AddFilter (0 , std::move (filter));
299
- AFL_VERIFY (constructor->IsDone ());
300
- return ;
301
- }
302
-
303
305
THashSet<ui64> portionIdsToFetch;
304
306
std::vector<std::pair<TColumnDataSplitter::TBorder, TColumnDataSplitter::TBorder>> intervalsToBuild;
305
307
StartIntervalProcessing (intersectingPortions, constructor, portionIdsToFetch, intervalsToBuild);
@@ -314,13 +316,13 @@ void TDuplicateManager::Handle(const NPrivate::TEvFilterRequestResourcesAllocate
314
316
AFL_VERIFY (portionIdsToFetch.contains (mainPortion->GetPortionId ()))(" main_portion" , mainPortion->GetPortionId ())(
315
317
" required_portions" , JoinSeq (' ,' , portionIdsToFetch));
316
318
TBuildFilterContext columnFetchingRequest (SelfId (), constructor, std::move (portionsToFetch), std::move (intervalsToBuild),
317
- GetFetchingColumns (), PKSchema, ColumnDataManager, DataAccessorsManager, Counters, memoryGuard);
318
- if (memoryGuard) {
319
- memoryGuard->Update (columnFetchingRequest.GetDataSize ());
320
- }
319
+ GetFetchingColumns (), PKSchema, ColumnDataManager, DataAccessorsManager, Counters, std::move (requestGuard), memoryGuard);
320
+ memoryGuard->Update (columnFetchingRequest.GetDataSize ());
321
321
const ui64 mem = TColumnDataAccessorFetching::GetRequiredMemory (columnFetchingRequest, LastSchema);
322
- NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::SendToAllocation (constructor->GetMemoryProcessId (),
323
- constructor->GetMemoryScopeId (), constructor->GetMemoryGroupId (),
322
+ const ui64 processId = columnFetchingRequest.GetRequestGuard ()->GetMemoryProcessId ();
323
+ const ui64 scopeId = columnFetchingRequest.GetRequestGuard ()->GetMemoryScopeId ();
324
+ const ui64 groupId = columnFetchingRequest.GetRequestGuard ()->GetMemoryGroupId ();
325
+ NGroupedMemoryManager::TDeduplicationMemoryLimiterOperator::SendToAllocation (processId, scopeId, groupId,
324
326
{ std::make_shared<TDataAccessorAllocation>(std::move (columnFetchingRequest), mem) },
325
327
(ui64)TFilterAccumulator::EFetchingStage::ACCESSORS);
326
328
}
0 commit comments