Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e55f472
reproducer
tlrx Sep 9, 2025
a48b071
possible fix
tlrx Sep 12, 2025
877bbc6
Update docs/changelog/134656.yaml
tlrx Sep 12, 2025
d38acd9
[CI] Auto commit changes from spotless
Sep 12, 2025
bc6c302
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 15, 2025
12f5597
nit
tlrx Sep 15, 2025
1aed9d2
nit
tlrx Sep 15, 2025
6d900ac
nit
tlrx Sep 15, 2025
a0dbc8c
[CI] Auto commit changes from spotless
Sep 15, 2025
adce7eb
nit
tlrx Sep 15, 2025
a332f74
nit
tlrx Sep 15, 2025
e613990
Merge branch '2025/09/09/ES-12664' of github.com:tlrx/elasticsearch i…
tlrx Sep 15, 2025
7781abf
nit
tlrx Sep 15, 2025
a9f15cd
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 15, 2025
596cd62
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 15, 2025
e12c711
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 15, 2025
bf0790e
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 16, 2025
4ac38b2
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 17, 2025
ec0d6bb
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 18, 2025
37a3d9b
suppress exception
tlrx Sep 18, 2025
4d38ecc
fix spin loop
tlrx Sep 18, 2025
0eae7ac
remove synchronized block
tlrx Sep 19, 2025
ae99d45
fix test
tlrx Sep 19, 2025
3218f7f
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 19, 2025
cfc4009
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 19, 2025
97105b4
feedback
tlrx Sep 19, 2025
742dd09
assertTrue
tlrx Sep 19, 2025
d3e083d
latch
tlrx Sep 19, 2025
555a846
closedWithNoRunningMerges
tlrx Sep 19, 2025
7eae1cc
[CI] Auto commit changes from spotless
Sep 19, 2025
3f6c734
Merge branch 'main' into 2025/09/09/ES-12664
tlrx Sep 20, 2025
5bd52a4
rarely close
tlrx Sep 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/134656.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 134656
summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the `IndexWriter`
area: Engine
type: bug
issues: []

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2756,11 +2756,7 @@ private IndexWriter createWriter() throws IOException {

// protected for testing
protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
if (Assertions.ENABLED) {
return new AssertingIndexWriter(directory, iwc);
} else {
return new IndexWriter(directory, iwc);
}
return new ElasticsearchIndexWriter(directory, iwc, logger);
}

// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
Expand Down Expand Up @@ -2920,8 +2916,10 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
return indexWriter.getConfig();
}

private void maybeFlushAfterMerge(OnGoingMerge merge) {
if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
protected void maybeFlushAfterMerge(OnGoingMerge merge) {
if (indexWriter.getTragicException() == null
&& indexWriter.hasPendingMerges() == false
&& System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
// NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the
// writer
// we deadlock on engine#close for instance.
Expand Down Expand Up @@ -3377,19 +3375,49 @@ private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter
return commitData;
}

private static class AssertingIndexWriter extends IndexWriter {
AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
super(d, conf);
private static class ElasticsearchIndexWriter extends IndexWriter {

private final Logger logger;

/**
 * Creates an index writer over the given directory and configuration.
 *
 * @param directory         the directory handed to the {@link IndexWriter} constructor
 * @param indexWriterConfig the configuration handed to the {@link IndexWriter} constructor
 * @param logger            the logger used to report failures while notifying the merge scheduler of a tragic event
 * @throws IOException if the underlying {@link IndexWriter} constructor fails
 */
ElasticsearchIndexWriter(Directory directory, IndexWriterConfig indexWriterConfig, Logger logger) throws IOException {
super(directory, indexWriterConfig);
this.logger = logger;
}

/**
 * Intercepts tragic events so that a {@link ThreadPoolMergeScheduler}, when one is configured on this writer, is notified
 * of the tragedy before the default {@link IndexWriter} handling runs.
 * <p>
 * An {@link Exception} thrown by the scheduler notification is logged and attached to {@code tragedy} as a suppressed
 * exception; it never prevents the call to {@code super.onTragicEvent}, which is made from a {@code finally} block.
 *
 * @param tragedy  the throwable that caused the tragic event, must not be {@code null}
 * @param location passed through unchanged to {@code super.onTragicEvent}
 */
@Override
public void onTragicEvent(Throwable tragedy, String location) {
assert tragedy != null;
try {
// Only the thread-pool based scheduler exposes a tragic-event hook; any other scheduler type is left alone.
if (getConfig().getMergeScheduler() instanceof ThreadPoolMergeScheduler mergeScheduler) {
try {
// Must be executed before calling IndexWriter#onTragicEvent
// NOTE(review): presumably so the scheduler can react before the writer starts closing itself -
// confirm against ThreadPoolMergeScheduler#onTragicEvent.
mergeScheduler.onTragicEvent(tragedy);
} catch (Exception e) {
logger.warn("Exception thrown when notifying the merge scheduler of a tragic event", e);
// Throwable#addSuppressed rejects self-suppression, so skip it when the scheduler rethrew the tragedy itself.
if (tragedy != e) {
tragedy.addSuppressed(e);
}
}
}
} finally {
// Always delegate to the default handling, whether or not the scheduler notification succeeded.
super.onTragicEvent(tragedy, location);
}
}

@Override
public long deleteDocuments(Term... terms) {
throw new AssertionError("must not hard delete documents");
public long deleteDocuments(Term... terms) throws IOException {
if (Assertions.ENABLED) {
throw new AssertionError("must not hard delete documents");
}
return super.deleteDocuments(terms);
}

@Override
public long tryDeleteDocument(IndexReader readerIn, int docID) {
throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
public long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
if (Assertions.ENABLED) {
throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
}
return super.tryDeleteDocument(readerIn, docID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -48,6 +50,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING;
Expand Down Expand Up @@ -372,7 +375,7 @@ private void runMergeTask(MergeTask mergeTask) {
}
}

private void abortMergeTask(MergeTask mergeTask) {
void abortMergeTask(MergeTask mergeTask) {
assert mergeTask.hasStartedRunning() == false;
assert runningMergeTasks.contains(mergeTask) == false;
try {
Expand All @@ -385,6 +388,25 @@ private void abortMergeTask(MergeTask mergeTask) {
}
}

/**
 * Aborts each of the given merge tasks by delegating to {@link #abortMergeTask(MergeTask)}.
 * A {@code null} or empty collection is a no-op.
 *
 * @param mergeTasks the merge tasks to abort, may be {@code null}
 */
private void abortMergeTasks(Collection<MergeTask> mergeTasks) {
    // Nothing to do when there is no collection or it holds no tasks.
    if (mergeTasks == null || mergeTasks.isEmpty()) {
        return;
    }
    for (MergeTask task : mergeTasks) {
        abortMergeTask(task);
    }
}

/**
 * Drains every queued {@link MergeTask} accepted by the predicate out of the queue of merge tasks, then aborts each
 * drained task.
 *
 * @param predicate selects the queued merge tasks that must be removed and aborted
 */
void abortQueuedMergeTasks(Predicate<MergeTask> predicate) {
    final Collection<MergeTask> drainedTasks = new HashSet<>();
    final int drainedCount = queuedMergeTasks.drainMatchingElementsTo(predicate, drainedTasks);
    if (drainedCount <= 0) {
        // No queued task matched, so there is nothing to abort.
        return;
    }
    abortMergeTasks(drainedTasks);
}

/**
* Start monitoring the available disk space, and update the available budget for running merge tasks
* Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
Expand Down Expand Up @@ -675,6 +697,25 @@ ElementWithReleasableBudget take() throws InterruptedException {
}
}

/**
 * Removes every enqueued element accepted by {@code predicate} and adds it to {@code c}.
 * The whole scan runs while holding this queue's lock.
 *
 * @param predicate tested against each enqueued element
 * @param c         the collection that receives the removed elements
 * @return the number of elements removed from the queue
 */
int drainMatchingElementsTo(Predicate<E> predicate, Collection<? super E> c) {
    final ReentrantLock queueLock = this.lock;
    queueLock.lock();
    try {
        int drained = 0;
        final Iterator<Tuple<E, Long>> entries = enqueuedByBudget.iterator();
        while (entries.hasNext()) {
            final E candidate = entries.next().v1();
            if (predicate.test(candidate) == false) {
                continue;
            }
            // Remove through the iterator so the scan can keep going safely.
            entries.remove();
            c.add(candidate);
            drained++;
        }
        return drained;
    } finally {
        queueLock.unlock();
    }
}

/**
* Updates the available budget given the passed-in argument, from which it deducts the budget held up by taken elements
* that are still in use. The elements budget is also updated by re-applying the budget function.
Expand Down Expand Up @@ -704,7 +745,7 @@ void updateBudget(long availableBudget) {

void postBudgetUpdate() {
assert lock.isHeldByCurrentThread();
};
}

private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
assert this.lock.isHeldByCurrentThread();
Expand Down
Loading