Skip to content

Commit a65e75d

Browse files
author
dilakshan
committed
Implement Indexed Priority Queue to improve grouping performance (#251)
Signed-off-by: dilakshan <[email protected]>
1 parent 766784d commit a65e75d

File tree

4 files changed

+144
-17
lines changed

4 files changed

+144
-17
lines changed

src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Locale;
2929
import java.util.Map;
3030
import java.util.Objects;
31-
import java.util.concurrent.PriorityBlockingQueue;
3231
import java.util.concurrent.atomic.AtomicReference;
3332
import java.util.function.Predicate;
3433
import java.util.stream.Collectors;
@@ -46,6 +45,7 @@
4645
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
4746
import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper;
4847
import org.opensearch.plugin.insights.core.utils.ExporterReaderUtils;
48+
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
4949
import org.opensearch.plugin.insights.rules.model.AggregationType;
5050
import org.opensearch.plugin.insights.rules.model.Attribute;
5151
import org.opensearch.plugin.insights.rules.model.GroupingType;
@@ -99,7 +99,7 @@ public class TopQueriesService {
9999
/**
100100
* The internal thread-safe store that holds the top n queries insight data
101101
*/
102-
private final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore;
102+
private final IndexedPriorityQueue<SearchQueryRecord> topQueriesStore;
103103

104104
/**
105105
* The AtomicReference of a snapshot of the current window top queries for getters to consume
@@ -144,7 +144,7 @@ public class TopQueriesService {
144144
this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE;
145145
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
146146
this.windowStart = -1L;
147-
topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
147+
topQueriesStore = new IndexedPriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
148148
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
149149
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
150150
queryGrouper = new MinMaxHeapQueryGrouper(
@@ -411,8 +411,7 @@ void consumeRecords(final List<SearchQueryRecord> records) {
411411
// add records in current window, if there are any, to the top n store
412412
addToTopNStore(recordsInThisWindow);
413413
// update the current window snapshot for getters to consume
414-
final List<SearchQueryRecord> newSnapShot = new ArrayList<>(topQueriesStore);
415-
newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType));
414+
final List<SearchQueryRecord> newSnapShot = topQueriesStore.toSortedList();
416415
topQueriesCurrentSnapshot.set(newSnapShot);
417416
}
418417

@@ -422,8 +421,9 @@ private void addToTopNStore(final List<SearchQueryRecord> records) {
422421
queryGrouper.add(record);
423422
}
424423
} else {
425-
topQueriesStore.addAll(records);
426-
// remove top elements for fix sizing priority queue
424+
for (SearchQueryRecord record : records) {
425+
topQueriesStore.insert(record);
426+
}
427427
while (topQueriesStore.size() > topNSize) {
428428
topQueriesStore.poll();
429429
}
@@ -442,7 +442,7 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
442442
final List<SearchQueryRecord> history = new ArrayList<>();
443443
// rotate the current window to history store only if the data belongs to the last window
444444
if (windowStart == newWindowStart - windowSize.getMillis()) {
445-
history.addAll(topQueriesStore);
445+
history.addAll(topQueriesStore.getAllElements());
446446
}
447447
topQueriesHistorySnapshot.set(history);
448448
topQueriesStore.clear();

src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N;
1212

1313
import java.util.concurrent.ConcurrentHashMap;
14-
import java.util.concurrent.PriorityBlockingQueue;
1514
import org.apache.logging.log4j.LogManager;
1615
import org.apache.logging.log4j.Logger;
1716
import org.opensearch.common.collect.Tuple;
17+
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
1818
import org.opensearch.plugin.insights.rules.model.AggregationType;
1919
import org.opensearch.plugin.insights.rules.model.Attribute;
2020
import org.opensearch.plugin.insights.rules.model.GroupingType;
@@ -58,14 +58,14 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
5858
/**
5959
* Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore
6060
*/
61-
private final PriorityBlockingQueue<SearchQueryRecord> minHeapTopQueriesStore;
61+
private final IndexedPriorityQueue<SearchQueryRecord> minHeapTopQueriesStore;
6262
/**
6363
* The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap.
6464
* It stores all records not included in the Top N query results. When the aggregate measurement for one of these
6565
* records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap,
6666
* and the records are rearranged accordingly.
6767
*/
68-
private final PriorityBlockingQueue<SearchQueryRecord> maxHeapQueryStore;
68+
private final IndexedPriorityQueue<SearchQueryRecord> maxHeapQueryStore;
6969

7070
/**
7171
* Top N size based on the configuration set
@@ -84,7 +84,7 @@ public MinMaxHeapQueryGrouper(
8484
final MetricType metricType,
8585
final GroupingType groupingType,
8686
final AggregationType aggregationType,
87-
final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore,
87+
final IndexedPriorityQueue<SearchQueryRecord> topQueriesStore,
8888
final int topNSize
8989
) {
9090
this.groupingType = groupingType;
@@ -94,7 +94,7 @@ public MinMaxHeapQueryGrouper(
9494
this.minHeapTopQueriesStore = topQueriesStore;
9595
this.topNSize = topNSize;
9696
this.maxGroups = QueryInsightsSettings.DEFAULT_GROUPS_EXCLUDING_TOPN_LIMIT;
97-
this.maxHeapQueryStore = new PriorityBlockingQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType));
97+
this.maxHeapQueryStore = new IndexedPriorityQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType));
9898
}
9999

100100
/**
@@ -207,7 +207,7 @@ public void updateTopNSize(final int newSize) {
207207
}
208208

209209
private void addToMinPQ(final SearchQueryRecord searchQueryRecord, final String groupId) {
210-
minHeapTopQueriesStore.add(searchQueryRecord);
210+
minHeapTopQueriesStore.insert(searchQueryRecord);
211211
groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true));
212212
overflow();
213213
}
@@ -232,7 +232,7 @@ private void addAndPromote(
232232
private void overflow() {
233233
if (minHeapTopQueriesStore.size() > topNSize) {
234234
SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll();
235-
maxHeapQueryStore.add(recordMovedFromMinToMax);
235+
maxHeapQueryStore.insert(recordMovedFromMinToMax);
236236
groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false));
237237
}
238238
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.utils;
10+
11+
import java.util.ArrayList;
12+
import java.util.Collection;
13+
import java.util.Comparator;
14+
import java.util.HashMap;
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
public class IndexedPriorityQueue<T> {
19+
20+
private final List<T> heap;
21+
private final Map<T, Integer> indexMap;
22+
private final Comparator<? super T> comparator;
23+
24+
public IndexedPriorityQueue(int initialCapacity, Comparator<? super T> comparator) {
25+
this.comparator = comparator;
26+
this.heap = new ArrayList<>(initialCapacity);
27+
this.indexMap = new HashMap<>(initialCapacity);
28+
}
29+
30+
public synchronized boolean insert(T item) {
31+
if (indexMap.containsKey(item)) return false;
32+
heap.add(item);
33+
int idx = heap.size() - 1;
34+
indexMap.put(item, idx);
35+
siftUp(idx);
36+
return true;
37+
}
38+
39+
public synchronized boolean remove(T item) {
40+
Integer idx = indexMap.remove(item);
41+
if (idx == null) return false;
42+
43+
int lastIdx = heap.size() - 1;
44+
if (idx != lastIdx) {
45+
T lastItem = heap.get(lastIdx);
46+
heap.set(idx, lastItem);
47+
indexMap.put(lastItem, idx);
48+
siftDown(idx);
49+
siftUp(idx);
50+
}
51+
heap.remove(lastIdx);
52+
return true;
53+
}
54+
55+
public synchronized Collection<T> getAllElements() {
56+
return new ArrayList<>(heap);
57+
}
58+
59+
public synchronized T poll() {
60+
if (heap.isEmpty()) return null;
61+
T result = heap.get(0);
62+
remove(result);
63+
return result;
64+
}
65+
66+
public synchronized T peek() {
67+
return heap.isEmpty() ? null : heap.get(0);
68+
}
69+
70+
public synchronized boolean contains(T item) {
71+
return indexMap.containsKey(item);
72+
}
73+
74+
public synchronized int size() {
75+
return heap.size();
76+
}
77+
78+
public synchronized void clear() {
79+
heap.clear();
80+
indexMap.clear();
81+
}
82+
83+
public synchronized List<T> toSortedList() {
84+
List<T> sorted = new ArrayList<>(heap);
85+
sorted.sort(comparator);
86+
return sorted;
87+
}
88+
89+
private void siftUp(int idx) {
90+
T item = heap.get(idx);
91+
while (idx > 0) {
92+
int parentIdx = (idx - 1) >>> 1;
93+
T parent = heap.get(parentIdx);
94+
if (comparator.compare(item, parent) >= 0) break;
95+
heap.set(idx, parent);
96+
indexMap.put(parent, idx);
97+
idx = parentIdx;
98+
}
99+
heap.set(idx, item);
100+
indexMap.put(item, idx);
101+
}
102+
103+
private void siftDown(int idx) {
104+
int half = heap.size() >>> 1;
105+
T item = heap.get(idx);
106+
while (idx < half) {
107+
int left = (idx << 1) + 1;
108+
int right = left + 1;
109+
int smallest = left;
110+
111+
if (right < heap.size() && comparator.compare(heap.get(right), heap.get(left)) < 0) smallest = right;
112+
113+
T smallestItem = heap.get(smallest);
114+
if (comparator.compare(item, smallestItem) <= 0) break;
115+
116+
heap.set(idx, smallestItem);
117+
indexMap.put(smallestItem, idx);
118+
idx = smallest;
119+
}
120+
heap.set(idx, item);
121+
indexMap.put(item, idx);
122+
}
123+
124+
public boolean isEmpty() {
125+
return heap.isEmpty();
126+
}
127+
}

src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import java.util.HashSet;
1212
import java.util.List;
1313
import java.util.Set;
14-
import java.util.concurrent.PriorityBlockingQueue;
1514
import org.junit.Before;
1615
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
16+
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
1717
import org.opensearch.plugin.insights.rules.model.AggregationType;
1818
import org.opensearch.plugin.insights.rules.model.Attribute;
1919
import org.opensearch.plugin.insights.rules.model.GroupingType;
@@ -27,7 +27,7 @@
2727
*/
2828
public class MinMaxHeapQueryGrouperTests extends OpenSearchTestCase {
2929
private MinMaxHeapQueryGrouper minMaxHeapQueryGrouper;
30-
private PriorityBlockingQueue<SearchQueryRecord> topQueriesStore = new PriorityBlockingQueue<>(
30+
private IndexedPriorityQueue<SearchQueryRecord> topQueriesStore = new IndexedPriorityQueue<>(
3131
100,
3232
(a, b) -> SearchQueryRecord.compare(a, b, MetricType.LATENCY)
3333
);

0 commit comments

Comments
 (0)