From 4909bf7923583ec738d6ab7e6b14a7a7ac1e0b7b Mon Sep 17 00:00:00 2001 From: dilakshan Date: Sun, 13 Apr 2025 17:32:26 +0000 Subject: [PATCH 1/2] Implement Indexed Priority Queue to improve grouping performance (#251) Signed-off-by: Dilaxn --- .../core/service/TopQueriesService.java | 18 +- .../grouper/MinMaxHeapQueryGrouper.java | 32 +-- .../core/utils/IndexedPriorityQueue.java | 185 ++++++++++++++++++ .../grouper/MinMaxHeapQueryGrouperTests.java | 4 +- 4 files changed, 217 insertions(+), 22 deletions(-) create mode 100644 src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index 06425ac9..e576bebf 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -28,7 +28,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -46,6 +46,7 @@ import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper; import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper; import org.opensearch.plugin.insights.core.utils.ExporterReaderUtils; +import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue; import org.opensearch.plugin.insights.rules.model.AggregationType; import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.GroupingType; @@ -99,7 +100,7 @@ public class TopQueriesService { /** * The internal thread-safe store that holds the top n queries insight data */ - private final PriorityBlockingQueue topQueriesStore; + private final IndexedPriorityQueue topQueriesStore; /** * The AtomicReference of a snapshot of the current window top queries for getters to consume @@ -128,6 +129,8 @@ public class TopQueriesService { private final QueryGrouper queryGrouper; + private final AtomicLong insertSequence = new AtomicLong(); + TopQueriesService( final Client client, final MetricType metricType, @@ -144,7 +147,7 @@ public class TopQueriesService { this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; this.windowStart = -1L; - topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesStore = new IndexedPriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); queryGrouper = new MinMaxHeapQueryGrouper( @@ -411,7 +414,7 @@ void consumeRecords(final List records) { // add records in current window, if there are any, to the top n store addToTopNStore(recordsInThisWindow); // update the current window snapshot for getters to consume - final List newSnapShot = new ArrayList<>(topQueriesStore); + final List newSnapShot = new ArrayList<>(topQueriesStore.getAllValues()); newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType)); topQueriesCurrentSnapshot.set(newSnapShot); } @@ -422,7 +425,10 @@ private void addToTopNStore(final List records) { queryGrouper.add(record); } } else { - topQueriesStore.addAll(records); + for (SearchQueryRecord record : records) { + String uniqueKey = String.valueOf(insertSequence.getAndIncrement()); + topQueriesStore.insert(uniqueKey, record); + } // remove top elements for fix sizing priority queue while (topQueriesStore.size() > topNSize) { topQueriesStore.poll(); @@ -442,7 +448,7 @@ private void rotateWindowIfNecessary(final long newWindowStart) { final List history = new ArrayList<>(); // rotate the current window to history store only if the data belongs to the last window if (windowStart == newWindowStart - windowSize.getMillis()) { - history.addAll(topQueriesStore); + history.addAll(topQueriesStore.getAllValues()); } topQueriesHistorySnapshot.set(history); topQueriesStore.clear(); diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java index 8cb79af3..55a02ea0 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java @@ -11,10 +11,10 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.PriorityBlockingQueue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.collect.Tuple; +import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue; import org.opensearch.plugin.insights.rules.model.AggregationType; import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.GroupingType; @@ -58,14 +58,14 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper { /** * Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore */ - private final PriorityBlockingQueue minHeapTopQueriesStore; + private final IndexedPriorityQueue minHeapTopQueriesStore; /** * The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap. * It stores all records not included in the Top N query results. When the aggregate measurement for one of these * records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap, * and the records are rearranged accordingly. */ - private final PriorityBlockingQueue maxHeapQueryStore; + private final IndexedPriorityQueue maxHeapQueryStore; /** * Top N size based on the configuration set @@ -84,7 +84,7 @@ public MinMaxHeapQueryGrouper( final MetricType metricType, final GroupingType groupingType, final AggregationType aggregationType, - final PriorityBlockingQueue topQueriesStore, + final IndexedPriorityQueue topQueriesStore, final int topNSize ) { this.groupingType = groupingType; @@ -94,7 +94,7 @@ public MinMaxHeapQueryGrouper( this.minHeapTopQueriesStore = topQueriesStore; this.topNSize = topNSize; this.maxGroups = QueryInsightsSettings.DEFAULT_GROUPS_EXCLUDING_TOPN_LIMIT; - this.maxHeapQueryStore = new PriorityBlockingQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType)); + this.maxHeapQueryStore = new IndexedPriorityQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType)); } /** @@ -137,9 +137,9 @@ public SearchQueryRecord add(final SearchQueryRecord searchQueryRecord) { aggregateSearchQueryRecord = groupIdToAggSearchQueryRecord.get(groupId).v1(); boolean isPresentInMinPQ = groupIdToAggSearchQueryRecord.get(groupId).v2(); if (isPresentInMinPQ) { - minHeapTopQueriesStore.remove(aggregateSearchQueryRecord); + minHeapTopQueriesStore.remove(groupId); } else { - maxHeapQueryStore.remove(aggregateSearchQueryRecord); + maxHeapQueryStore.remove(groupId); } addAndPromote(searchQueryRecord, aggregateSearchQueryRecord, groupId); } @@ -207,7 +207,7 @@ public void updateTopNSize(final int newSize) { } private void addToMinPQ(final SearchQueryRecord searchQueryRecord, final String groupId) { - minHeapTopQueriesStore.add(searchQueryRecord); + minHeapTopQueriesStore.insert(searchQueryRecord.getGroupingId(), searchQueryRecord); groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true)); overflow(); } @@ -223,17 +223,21 @@ private void addAndPromote( if (maxHeapQueryStore.isEmpty()) { return; } - if (SearchQueryRecord.compare(maxHeapQueryStore.peek(), minHeapTopQueriesStore.peek(), metricType) > 0) { - SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll(); - addToMinPQ(recordMovedFromMaxToMin, recordMovedFromMaxToMin.getGroupingId()); + IndexedPriorityQueue.Entry maxPeek = maxHeapQueryStore.peek(); + IndexedPriorityQueue.Entry minPeek = minHeapTopQueriesStore.peek(); + if (maxPeek != null && minPeek != null && SearchQueryRecord.compare(maxPeek.value, minPeek.value, metricType) > 0) { + IndexedPriorityQueue.Entry entryMovedFromMaxToMin = maxHeapQueryStore.pollEntry(); + addToMinPQ(entryMovedFromMaxToMin.value, entryMovedFromMaxToMin.key); } } private void overflow() { if (minHeapTopQueriesStore.size() > topNSize) { - SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll(); - maxHeapQueryStore.add(recordMovedFromMinToMax); - groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false)); + IndexedPriorityQueue.Entry movedEntry = minHeapTopQueriesStore.pollEntry(); + if (movedEntry != null) { + maxHeapQueryStore.insert(movedEntry.key, movedEntry.value); + groupIdToAggSearchQueryRecord.put(movedEntry.key, new Tuple<>(movedEntry.value, false)); + } } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java b/src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java new file mode 100644 index 00000000..08cb883f --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java @@ -0,0 +1,185 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.utils; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class IndexedPriorityQueue { + + public static class Entry { + public final K key; + public V value; + + Entry(K key, V value) { + this.key = key; + this.value = value; + } + } + + private final List> heap; + private final Map indexMap; + private final Comparator comparator; + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock read = lock.readLock(); + private final Lock write = lock.writeLock(); + + public IndexedPriorityQueue(int initialCapacity, Comparator comparator) { + this.comparator = comparator; + this.heap = new ArrayList<>(initialCapacity); + this.indexMap = new HashMap<>(initialCapacity); + } + + public boolean insert(K key, V value) { + write.lock(); + try { + if (indexMap.containsKey(key)) return false; + heap.add(new Entry<>(key, value)); + int idx = heap.size() - 1; + indexMap.put(key, idx); + siftUp(idx); + return true; + } finally { + write.unlock(); + } + } + + public boolean remove(K key) { + write.lock(); + try { + Integer idx = indexMap.remove(key); + if (idx == null) return false; + + int lastIdx = heap.size() - 1; + if (idx != lastIdx) { + Entry lastItem = heap.get(lastIdx); + heap.set(idx, lastItem); + indexMap.put(lastItem.key, idx); + siftDown(idx); + siftUp(idx); + } + heap.remove(lastIdx); + return true; + } finally { + write.unlock(); + } + } + + public V poll() { + Entry e = pollEntry(); + return e == null ? null : e.value; + } + + public Entry pollEntry() { + write.lock(); + try { + if (heap.isEmpty()) { + return null; + } + Entry head = heap.get(0); + remove(head.key); + return head; + } finally { + write.unlock(); + } + } + + public Entry peek() { + read.lock(); + try { + return heap.isEmpty() ? null : heap.get(0); + } finally { + read.unlock(); + } + } + + public int size() { + read.lock(); + try { + return heap.size(); + } finally { + read.unlock(); + } + } + + public List getAllValues() { + read.lock(); + try { + List values = new ArrayList<>(); + for (Entry entry : heap) { + values.add(entry.value); + } + return values; + } finally { + read.unlock(); + } + } + + public void clear() { + write.lock(); + try { + heap.clear(); + indexMap.clear(); + } finally { + write.unlock(); + } + } + + private void siftUp(int idx) { + Entry item = heap.get(idx); + while (idx > 0) { + int parentIdx = (idx - 1) >>> 1; + Entry parent = heap.get(parentIdx); + if (comparator.compare(item.value, parent.value) >= 0) break; + heap.set(idx, parent); + indexMap.put(parent.key, idx); + idx = parentIdx; + } + heap.set(idx, item); + indexMap.put(item.key, idx); + } + + private void siftDown(int idx) { + int half = heap.size() >>> 1; + Entry item = heap.get(idx); + while (idx < half) { + int left = (idx << 1) + 1; + int right = left + 1; + int smallest = left; + + if (right < heap.size() && comparator.compare(heap.get(right).value, heap.get(left).value) < 0) { + smallest = right; + } + + Entry smallestItem = heap.get(smallest); + if (comparator.compare(item.value, smallestItem.value) <= 0) break; + + heap.set(idx, smallestItem); + indexMap.put(smallestItem.key, idx); + idx = smallest; + } + heap.set(idx, item); + indexMap.put(item.key, idx); + } + + public boolean isEmpty() { + read.lock(); + try { + return heap.isEmpty(); + } finally { + read.unlock(); + } + } +} diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java index d5ca7bff..a2b5deba 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java @@ -11,9 +11,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.PriorityBlockingQueue; import org.junit.Before; import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue; import org.opensearch.plugin.insights.rules.model.AggregationType; import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.GroupingType; @@ -27,7 +27,7 @@ */ public class MinMaxHeapQueryGrouperTests extends OpenSearchTestCase { private MinMaxHeapQueryGrouper minMaxHeapQueryGrouper; - private PriorityBlockingQueue topQueriesStore = new PriorityBlockingQueue<>( + private IndexedPriorityQueue topQueriesStore = new IndexedPriorityQueue<>( 100, (a, b) -> SearchQueryRecord.compare(a, b, MetricType.LATENCY) ); From 3792e70b1356191de8a1220afeec1ad73e1d6490 Mon Sep 17 00:00:00 2001 From: Dilaxn Date: Sun, 15 Jun 2025 06:29:23 +0000 Subject: [PATCH 2/2] Resolve review comments --- .../core/utils/IndexedPriorityQueue.java | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java b/src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java index 08cb883f..17b7f04e 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java +++ b/src/main/java/org/opensearch/plugin/insights/core/utils/IndexedPriorityQueue.java @@ -15,9 +15,13 @@ import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class IndexedPriorityQueue { + private static final Logger logger = LogManager.getLogger(IndexedPriorityQueue.class); + public static class Entry { public final K key; public V value; @@ -45,11 +49,15 @@ public IndexedPriorityQueue(int initialCapacity, Comparator comparato public boolean insert(K key, V value) { write.lock(); try { - if (indexMap.containsKey(key)) return false; + if (indexMap.containsKey(key)) { + logger.debug("Key {} already exists in queue, skipping insert", key); + return false; + } heap.add(new Entry<>(key, value)); int idx = heap.size() - 1; indexMap.put(key, idx); siftUp(idx); + logger.debug("Successfully inserted key {} at index {}", key, idx); return true; } finally { write.unlock(); @@ -60,17 +68,44 @@ public boolean remove(K key) { write.lock(); try { Integer idx = indexMap.remove(key); - if (idx == null) return false; + if (idx == null) { + logger.debug("Key {} not found in queue, nothing to remove", key); + return false; + } int lastIdx = heap.size() - 1; if (idx != lastIdx) { Entry lastItem = heap.get(lastIdx); heap.set(idx, lastItem); indexMap.put(lastItem.key, idx); - siftDown(idx); - siftUp(idx); + + boolean needSiftUp = false; + boolean needSiftDown = false; + + if (idx > 0) { + int parentIdx = (idx - 1) >>> 1; + if (comparator.compare(lastItem.value, heap.get(parentIdx).value) < 0) { + needSiftUp = true; + } + } + + if (!needSiftUp && idx < heap.size() >>> 1) { + int left = (idx << 1) + 1; + if (comparator.compare(lastItem.value, heap.get(left).value) > 0) { + needSiftDown = true; + } else if (left + 1 < heap.size() && comparator.compare(lastItem.value, heap.get(left + 1).value) > 0) { + needSiftDown = true; + } + } + + if (needSiftUp) { + siftUp(idx); + } else if (needSiftDown) { + siftDown(idx); + } } heap.remove(lastIdx); + logger.debug("Successfully removed key {} from index {}", key, idx); return true; } finally { write.unlock();