Skip to content

Commit 4909bf7

Browse files
dilakshanDilaxn
authored andcommitted
Implement Indexed Priority Queue to improve grouping performance (#251)
Signed-off-by: Dilaxn <[email protected]>
1 parent 766784d commit 4909bf7

File tree

4 files changed

+217
-22
lines changed

4 files changed

+217
-22
lines changed

src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.util.Locale;
2929
import java.util.Map;
3030
import java.util.Objects;
31-
import java.util.concurrent.PriorityBlockingQueue;
31+
import java.util.concurrent.atomic.AtomicLong;
3232
import java.util.concurrent.atomic.AtomicReference;
3333
import java.util.function.Predicate;
3434
import java.util.stream.Collectors;
@@ -46,6 +46,7 @@
4646
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
4747
import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper;
4848
import org.opensearch.plugin.insights.core.utils.ExporterReaderUtils;
49+
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
4950
import org.opensearch.plugin.insights.rules.model.AggregationType;
5051
import org.opensearch.plugin.insights.rules.model.Attribute;
5152
import org.opensearch.plugin.insights.rules.model.GroupingType;
@@ -99,7 +100,7 @@ public class TopQueriesService {
99100
/**
100101
* The internal thread-safe store that holds the top n queries insight data
101102
*/
102-
private final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore;
103+
private final IndexedPriorityQueue<String, SearchQueryRecord> topQueriesStore;
103104

104105
/**
105106
* The AtomicReference of a snapshot of the current window top queries for getters to consume
@@ -128,6 +129,8 @@ public class TopQueriesService {
128129

129130
private final QueryGrouper queryGrouper;
130131

132+
private final AtomicLong insertSequence = new AtomicLong();
133+
131134
TopQueriesService(
132135
final Client client,
133136
final MetricType metricType,
@@ -144,7 +147,7 @@ public class TopQueriesService {
144147
this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE;
145148
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
146149
this.windowStart = -1L;
147-
topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
150+
topQueriesStore = new IndexedPriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
148151
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
149152
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
150153
queryGrouper = new MinMaxHeapQueryGrouper(
@@ -411,7 +414,7 @@ void consumeRecords(final List<SearchQueryRecord> records) {
411414
// add records in current window, if there are any, to the top n store
412415
addToTopNStore(recordsInThisWindow);
413416
// update the current window snapshot for getters to consume
414-
final List<SearchQueryRecord> newSnapShot = new ArrayList<>(topQueriesStore);
417+
final List<SearchQueryRecord> newSnapShot = new ArrayList<>(topQueriesStore.getAllValues());
415418
newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType));
416419
topQueriesCurrentSnapshot.set(newSnapShot);
417420
}
@@ -422,7 +425,10 @@ private void addToTopNStore(final List<SearchQueryRecord> records) {
422425
queryGrouper.add(record);
423426
}
424427
} else {
425-
topQueriesStore.addAll(records);
428+
for (SearchQueryRecord record : records) {
429+
String uniqueKey = String.valueOf(insertSequence.getAndIncrement());
430+
topQueriesStore.insert(uniqueKey, record);
431+
}
426432
// remove top elements for fix sizing priority queue
427433
while (topQueriesStore.size() > topNSize) {
428434
topQueriesStore.poll();
@@ -442,7 +448,7 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
442448
final List<SearchQueryRecord> history = new ArrayList<>();
443449
// rotate the current window to history store only if the data belongs to the last window
444450
if (windowStart == newWindowStart - windowSize.getMillis()) {
445-
history.addAll(topQueriesStore);
451+
history.addAll(topQueriesStore.getAllValues());
446452
}
447453
topQueriesHistorySnapshot.set(history);
448454
topQueriesStore.clear();

src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N;
1212

1313
import java.util.concurrent.ConcurrentHashMap;
14-
import java.util.concurrent.PriorityBlockingQueue;
1514
import org.apache.logging.log4j.LogManager;
1615
import org.apache.logging.log4j.Logger;
1716
import org.opensearch.common.collect.Tuple;
17+
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
1818
import org.opensearch.plugin.insights.rules.model.AggregationType;
1919
import org.opensearch.plugin.insights.rules.model.Attribute;
2020
import org.opensearch.plugin.insights.rules.model.GroupingType;
@@ -58,14 +58,14 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
5858
/**
5959
* Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore
6060
*/
61-
private final PriorityBlockingQueue<SearchQueryRecord> minHeapTopQueriesStore;
61+
private final IndexedPriorityQueue<String, SearchQueryRecord> minHeapTopQueriesStore;
6262
/**
6363
* The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap.
6464
* It stores all records not included in the Top N query results. When the aggregate measurement for one of these
6565
* records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap,
6666
* and the records are rearranged accordingly.
6767
*/
68-
private final PriorityBlockingQueue<SearchQueryRecord> maxHeapQueryStore;
68+
private final IndexedPriorityQueue<String, SearchQueryRecord> maxHeapQueryStore;
6969

7070
/**
7171
* Top N size based on the configuration set
@@ -84,7 +84,7 @@ public MinMaxHeapQueryGrouper(
8484
final MetricType metricType,
8585
final GroupingType groupingType,
8686
final AggregationType aggregationType,
87-
final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore,
87+
final IndexedPriorityQueue<String, SearchQueryRecord> topQueriesStore,
8888
final int topNSize
8989
) {
9090
this.groupingType = groupingType;
@@ -94,7 +94,7 @@ public MinMaxHeapQueryGrouper(
9494
this.minHeapTopQueriesStore = topQueriesStore;
9595
this.topNSize = topNSize;
9696
this.maxGroups = QueryInsightsSettings.DEFAULT_GROUPS_EXCLUDING_TOPN_LIMIT;
97-
this.maxHeapQueryStore = new PriorityBlockingQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType));
97+
this.maxHeapQueryStore = new IndexedPriorityQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType));
9898
}
9999

100100
/**
@@ -137,9 +137,9 @@ public SearchQueryRecord add(final SearchQueryRecord searchQueryRecord) {
137137
aggregateSearchQueryRecord = groupIdToAggSearchQueryRecord.get(groupId).v1();
138138
boolean isPresentInMinPQ = groupIdToAggSearchQueryRecord.get(groupId).v2();
139139
if (isPresentInMinPQ) {
140-
minHeapTopQueriesStore.remove(aggregateSearchQueryRecord);
140+
minHeapTopQueriesStore.remove(groupId);
141141
} else {
142-
maxHeapQueryStore.remove(aggregateSearchQueryRecord);
142+
maxHeapQueryStore.remove(groupId);
143143
}
144144
addAndPromote(searchQueryRecord, aggregateSearchQueryRecord, groupId);
145145
}
@@ -207,7 +207,7 @@ public void updateTopNSize(final int newSize) {
207207
}
208208

209209
private void addToMinPQ(final SearchQueryRecord searchQueryRecord, final String groupId) {
210-
minHeapTopQueriesStore.add(searchQueryRecord);
210+
minHeapTopQueriesStore.insert(searchQueryRecord.getGroupingId(), searchQueryRecord);
211211
groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true));
212212
overflow();
213213
}
@@ -223,17 +223,21 @@ private void addAndPromote(
223223
if (maxHeapQueryStore.isEmpty()) {
224224
return;
225225
}
226-
if (SearchQueryRecord.compare(maxHeapQueryStore.peek(), minHeapTopQueriesStore.peek(), metricType) > 0) {
227-
SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll();
228-
addToMinPQ(recordMovedFromMaxToMin, recordMovedFromMaxToMin.getGroupingId());
226+
IndexedPriorityQueue.Entry<String, SearchQueryRecord> maxPeek = maxHeapQueryStore.peek();
227+
IndexedPriorityQueue.Entry<String, SearchQueryRecord> minPeek = minHeapTopQueriesStore.peek();
228+
if (maxPeek != null && minPeek != null && SearchQueryRecord.compare(maxPeek.value, minPeek.value, metricType) > 0) {
229+
IndexedPriorityQueue.Entry<String, SearchQueryRecord> entryMovedFromMaxToMin = maxHeapQueryStore.pollEntry();
230+
addToMinPQ(entryMovedFromMaxToMin.value, entryMovedFromMaxToMin.key);
229231
}
230232
}
231233

232234
private void overflow() {
233235
if (minHeapTopQueriesStore.size() > topNSize) {
234-
SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll();
235-
maxHeapQueryStore.add(recordMovedFromMinToMax);
236-
groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false));
236+
IndexedPriorityQueue.Entry<String, SearchQueryRecord> movedEntry = minHeapTopQueriesStore.pollEntry();
237+
if (movedEntry != null) {
238+
maxHeapQueryStore.insert(movedEntry.key, movedEntry.value);
239+
groupIdToAggSearchQueryRecord.put(movedEntry.key, new Tuple<>(movedEntry.value, false));
240+
}
237241
}
238242
}
239243

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.utils;
10+
11+
import java.util.ArrayList;
12+
import java.util.Comparator;
13+
import java.util.HashMap;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.concurrent.locks.Lock;
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
19+
public class IndexedPriorityQueue<K, V> {
20+
21+
public static class Entry<K, V> {
22+
public final K key;
23+
public V value;
24+
25+
Entry(K key, V value) {
26+
this.key = key;
27+
this.value = value;
28+
}
29+
}
30+
31+
private final List<Entry<K, V>> heap;
32+
private final Map<K, Integer> indexMap;
33+
private final Comparator<? super V> comparator;
34+
35+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
36+
private final Lock read = lock.readLock();
37+
private final Lock write = lock.writeLock();
38+
39+
public IndexedPriorityQueue(int initialCapacity, Comparator<? super V> comparator) {
40+
this.comparator = comparator;
41+
this.heap = new ArrayList<>(initialCapacity);
42+
this.indexMap = new HashMap<>(initialCapacity);
43+
}
44+
45+
public boolean insert(K key, V value) {
46+
write.lock();
47+
try {
48+
if (indexMap.containsKey(key)) return false;
49+
heap.add(new Entry<>(key, value));
50+
int idx = heap.size() - 1;
51+
indexMap.put(key, idx);
52+
siftUp(idx);
53+
return true;
54+
} finally {
55+
write.unlock();
56+
}
57+
}
58+
59+
public boolean remove(K key) {
60+
write.lock();
61+
try {
62+
Integer idx = indexMap.remove(key);
63+
if (idx == null) return false;
64+
65+
int lastIdx = heap.size() - 1;
66+
if (idx != lastIdx) {
67+
Entry<K, V> lastItem = heap.get(lastIdx);
68+
heap.set(idx, lastItem);
69+
indexMap.put(lastItem.key, idx);
70+
siftDown(idx);
71+
siftUp(idx);
72+
}
73+
heap.remove(lastIdx);
74+
return true;
75+
} finally {
76+
write.unlock();
77+
}
78+
}
79+
80+
public V poll() {
81+
Entry<K, V> e = pollEntry();
82+
return e == null ? null : e.value;
83+
}
84+
85+
public Entry<K, V> pollEntry() {
86+
write.lock();
87+
try {
88+
if (heap.isEmpty()) {
89+
return null;
90+
}
91+
Entry<K, V> head = heap.get(0);
92+
remove(head.key);
93+
return head;
94+
} finally {
95+
write.unlock();
96+
}
97+
}
98+
99+
public Entry<K, V> peek() {
100+
read.lock();
101+
try {
102+
return heap.isEmpty() ? null : heap.get(0);
103+
} finally {
104+
read.unlock();
105+
}
106+
}
107+
108+
public int size() {
109+
read.lock();
110+
try {
111+
return heap.size();
112+
} finally {
113+
read.unlock();
114+
}
115+
}
116+
117+
public List<V> getAllValues() {
118+
read.lock();
119+
try {
120+
List<V> values = new ArrayList<>();
121+
for (Entry<K, V> entry : heap) {
122+
values.add(entry.value);
123+
}
124+
return values;
125+
} finally {
126+
read.unlock();
127+
}
128+
}
129+
130+
public void clear() {
131+
write.lock();
132+
try {
133+
heap.clear();
134+
indexMap.clear();
135+
} finally {
136+
write.unlock();
137+
}
138+
}
139+
140+
private void siftUp(int idx) {
141+
Entry<K, V> item = heap.get(idx);
142+
while (idx > 0) {
143+
int parentIdx = (idx - 1) >>> 1;
144+
Entry<K, V> parent = heap.get(parentIdx);
145+
if (comparator.compare(item.value, parent.value) >= 0) break;
146+
heap.set(idx, parent);
147+
indexMap.put(parent.key, idx);
148+
idx = parentIdx;
149+
}
150+
heap.set(idx, item);
151+
indexMap.put(item.key, idx);
152+
}
153+
154+
private void siftDown(int idx) {
155+
int half = heap.size() >>> 1;
156+
Entry<K, V> item = heap.get(idx);
157+
while (idx < half) {
158+
int left = (idx << 1) + 1;
159+
int right = left + 1;
160+
int smallest = left;
161+
162+
if (right < heap.size() && comparator.compare(heap.get(right).value, heap.get(left).value) < 0) {
163+
smallest = right;
164+
}
165+
166+
Entry<K, V> smallestItem = heap.get(smallest);
167+
if (comparator.compare(item.value, smallestItem.value) <= 0) break;
168+
169+
heap.set(idx, smallestItem);
170+
indexMap.put(smallestItem.key, idx);
171+
idx = smallest;
172+
}
173+
heap.set(idx, item);
174+
indexMap.put(item.key, idx);
175+
}
176+
177+
public boolean isEmpty() {
178+
read.lock();
179+
try {
180+
return heap.isEmpty();
181+
} finally {
182+
read.unlock();
183+
}
184+
}
185+
}

src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import java.util.HashSet;
1212
import java.util.List;
1313
import java.util.Set;
14-
import java.util.concurrent.PriorityBlockingQueue;
1514
import org.junit.Before;
1615
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
16+
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
1717
import org.opensearch.plugin.insights.rules.model.AggregationType;
1818
import org.opensearch.plugin.insights.rules.model.Attribute;
1919
import org.opensearch.plugin.insights.rules.model.GroupingType;
@@ -27,7 +27,7 @@
2727
*/
2828
public class MinMaxHeapQueryGrouperTests extends OpenSearchTestCase {
2929
private MinMaxHeapQueryGrouper minMaxHeapQueryGrouper;
30-
private PriorityBlockingQueue<SearchQueryRecord> topQueriesStore = new PriorityBlockingQueue<>(
30+
private IndexedPriorityQueue<String, SearchQueryRecord> topQueriesStore = new IndexedPriorityQueue<>(
3131
100,
3232
(a, b) -> SearchQueryRecord.compare(a, b, MetricType.LATENCY)
3333
);

0 commit comments

Comments
 (0)