Commit 404dbdf

[8.19] Fix deadlock in ThreadPoolMergeScheduler when a failing merge closes the IndexWriter (#134656) (#135177)
This change fixes a bug that causes a deadlock in the thread pool merge scheduler when a merge fails due to a tragic event. The deadlock occurs because Lucene, when failing with a tragic event, aborts running merges and then waits for them to complete. But those "running" merges might in fact be waiting in the Elasticsearch thread pool merge scheduler's task queue, waiting in the backlogged merge tasks queue (because the per-shard concurrent merge limit has been reached), or simply waiting for enough disk space to be executed. In those cases, the failing merge thread waits indefinitely. The fix uses the merge thread that is failing due to the tragic event to abort all other enqueued and backlogged merge tasks of the same shard before proceeding to close the IndexWriter. This way Lucene won't have to wait for any running merges, as they will all have been aborted upfront.

Backport of #134656 for 8.19.5
Relates ES-12664
1 parent ccd582f commit 404dbdf
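
To make the core idea concrete, here is a minimal sketch of the abort-before-close ordering. It is not the actual Elasticsearch code; PendingMerge and ShardMergePipeline are hypothetical stand-ins. The failing merge thread drains and aborts the shard's queued and backlogged merges first, so Lucene's subsequent wait-for-running-merges step has nothing left to wait on.

import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of the abort-before-close ordering described above.
// PendingMerge and ShardMergePipeline are hypothetical names, not from the change.
class ShardMergePipeline {

    static final class PendingMerge {
        volatile boolean aborted;

        void abort() {
            // Marking the merge aborted lets Lucene treat it as completed instead
            // of waiting for a merge thread that will never pick it up.
            aborted = true;
        }
    }

    private final Queue<PendingMerge> queuedOrBacklogged = new ArrayDeque<>();

    synchronized void enqueue(PendingMerge merge) {
        queuedOrBacklogged.add(merge);
    }

    // Runs on the merge thread that hit the tragic event, strictly before the
    // IndexWriter is rolled back: once the queues are drained, no merge of this
    // shard is left for the rollback to block on.
    synchronized void onTragicEvent(Throwable tragedy) {
        PendingMerge merge;
        while ((merge = queuedOrBacklogged.poll()) != null) {
            merge.abort();
        }
    }
}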

File tree

7 files changed: +710, -34 lines changed


docs/changelog/134656.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 134656
+summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the
+  `IndexWriter`
+area: Engine
+type: bug
+issues: []

server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java

Lines changed: 457 additions & 0 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 42 additions & 14 deletions
@@ -2682,11 +2682,7 @@ private IndexWriter createWriter() throws IOException {

     // protected for testing
     protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
-        if (Assertions.ENABLED) {
-            return new AssertingIndexWriter(directory, iwc);
-        } else {
-            return new IndexWriter(directory, iwc);
-        }
+        return new ElasticsearchIndexWriter(directory, iwc, logger);
     }

     // with tests.verbose, lucene sets this up: plumb to align with filesystem stream
@@ -2822,8 +2818,10 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
         return indexWriter.getConfig();
     }

-    private void maybeFlushAfterMerge(OnGoingMerge merge) {
-        if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
+    protected void maybeFlushAfterMerge(OnGoingMerge merge) {
+        if (indexWriter.getTragicException() == null
+            && indexWriter.hasPendingMerges() == false
+            && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
             // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the
             // writer
             // we deadlock on engine#close for instance.
@@ -3280,19 +3278,49 @@ private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter
         return commitData;
     }

-    private static class AssertingIndexWriter extends IndexWriter {
-        AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
-            super(d, conf);
+    private static class ElasticsearchIndexWriter extends IndexWriter {
+
+        private final Logger logger;
+
+        ElasticsearchIndexWriter(Directory directory, IndexWriterConfig indexWriterConfig, Logger logger) throws IOException {
+            super(directory, indexWriterConfig);
+            this.logger = logger;
         }

         @Override
-        public long deleteDocuments(Term... terms) {
-            throw new AssertionError("must not hard delete documents");
+        public void onTragicEvent(Throwable tragedy, String location) {
+            assert tragedy != null;
+            try {
+                if (getConfig().getMergeScheduler() instanceof ThreadPoolMergeScheduler mergeScheduler) {
+                    try {
+                        // Must be executed before calling IndexWriter#onTragicEvent
+                        mergeScheduler.onTragicEvent(tragedy);
+                    } catch (Exception e) {
+                        logger.warn("Exception thrown when notifying the merge scheduler of a tragic event", e);
+                        if (tragedy != e) {
+                            tragedy.addSuppressed(e);
+                        }
+                    }
+                }
+            } finally {
+                super.onTragicEvent(tragedy, location);
+            }
         }

         @Override
-        public long tryDeleteDocument(IndexReader readerIn, int docID) {
-            throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
+        public long deleteDocuments(Term... terms) throws IOException {
+            if (Assertions.ENABLED) {
+                throw new AssertionError("must not hard delete documents");
+            }
+            return super.deleteDocuments(terms);
+        }
+
+        @Override
+        public long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
+            if (Assertions.ENABLED) {
+                throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
+            }
+            return super.tryDeleteDocument(readerIn, docID);
         }
     }

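The essential move in ElasticsearchIndexWriter#onTragicEvent above is ordering: the merge scheduler is notified strictly before Lucene's own tragic-event handling rolls back the writer and waits on merges. A stripped-down sketch of that pattern follows, with a plain Consumer standing in for the scheduler notification; the hook is an assumption for illustration, not the actual Elasticsearch class.

import java.io.IOException;
import java.util.function.Consumer;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;

// Sketch of the decorator-style override used above: a hook (here a plain
// Consumer, standing in for the merge scheduler notification) runs before
// Lucene's own tragic-event handling, and a failing hook can neither mask the
// original tragedy nor skip super's handling.
class NotifyingIndexWriter extends IndexWriter {

    private final Consumer<Throwable> beforeTragedyHook; // hypothetical hook

    NotifyingIndexWriter(Directory directory, IndexWriterConfig config, Consumer<Throwable> beforeTragedyHook)
        throws IOException {
        super(directory, config);
        this.beforeTragedyHook = beforeTragedyHook;
    }

    @Override
    public void onTragicEvent(Throwable tragedy, String location) {
        try {
            beforeTragedyHook.accept(tragedy); // e.g. abort this shard's queued merges
        } catch (Exception e) {
            if (tragedy != e) {
                tragedy.addSuppressed(e); // keep the tragedy as the primary failure
            }
        } finally {
            super.onTragicEvent(tragedy, location); // Lucene's rollback path runs here
        }
    }
}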
server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 43 additions & 2 deletions
@@ -31,7 +31,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -48,6 +50,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.LongUnaryOperator;
+import java.util.function.Predicate;
 import java.util.function.ToLongFunction;

 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING;
@@ -372,7 +375,7 @@ private void runMergeTask(MergeTask mergeTask) {
         }
     }

-    private void abortMergeTask(MergeTask mergeTask) {
+    void abortMergeTask(MergeTask mergeTask) {
         assert mergeTask.hasStartedRunning() == false;
         assert runningMergeTasks.contains(mergeTask) == false;
         try {
@@ -385,6 +388,25 @@ private void abortMergeTask(MergeTask mergeTask) {
         }
     }

+    private void abortMergeTasks(Collection<MergeTask> mergeTasks) {
+        if (mergeTasks != null && mergeTasks.isEmpty() == false) {
+            for (var mergeTask : mergeTasks) {
+                abortMergeTask(mergeTask);
+            }
+        }
+    }
+
+    /**
+     * Removes all {@link MergeTask} that match the predicate and aborts them.
+     * @param predicate the predicate to filter merge tasks to be aborted
+     */
+    void abortQueuedMergeTasks(Predicate<MergeTask> predicate) {
+        final var queuedMergesToAbort = new HashSet<MergeTask>();
+        if (queuedMergeTasks.drainMatchingElementsTo(predicate, queuedMergesToAbort) > 0) {
+            abortMergeTasks(queuedMergesToAbort);
+        }
+    }
+
     /**
      * Start monitoring the available disk space, and update the available budget for running merge tasks
      * Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
@@ -675,6 +697,25 @@ ElementWithReleasableBudget take() throws InterruptedException {
         }
     }

+        int drainMatchingElementsTo(Predicate<E> predicate, Collection<? super E> c) {
+            int removed = 0;
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                for (Iterator<Tuple<E, Long>> iterator = enqueuedByBudget.iterator(); iterator.hasNext();) {
+                    E item = iterator.next().v1();
+                    if (predicate.test(item)) {
+                        iterator.remove();
+                        c.add(item);
+                        removed++;
+                    }
+                }
+                return removed;
+            } finally {
+                lock.unlock();
+            }
+        }
+
     /**
      * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
      * that are still in use. The elements budget is also updated by re-applying the budget function.
@@ -704,7 +745,7 @@ void updateBudget(long availableBudget) {

     void postBudgetUpdate() {
         assert lock.isHeldByCurrentThread();
-    };
+    }

     private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
         assert this.lock.isHeldByCurrentThread();

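drainMatchingElementsTo removes matching tasks while holding the queue's lock, so each queued task is either drained and aborted or taken by a merge thread, never both. Here is a hedged usage sketch of the new abortQueuedMergeTasks entry point, assuming a task can report which per-shard scheduler owns it; that accessor, and all names below, are illustrative assumptions, not the actual ThreadPoolMergeScheduler code (whose diff is not shown in this excerpt).

import java.util.function.Predicate;

// Hedged sketch: a per-shard scheduler pulls only its own tasks out of the
// node-wide queue on a tragic event. Task#owner() is an assumed accessor.
class PerShardMergeScheduler {

    interface Task {
        PerShardMergeScheduler owner(); // assumption: a task knows its scheduler
    }

    interface MergeTaskExecutor {
        void abortQueuedMergeTasks(Predicate<Task> predicate);
    }

    private final MergeTaskExecutor executor;

    PerShardMergeScheduler(MergeTaskExecutor executor) {
        this.executor = executor;
    }

    void onTragicEvent(Throwable tragedy) {
        // Other shards' merges keep running; only this shard's queued tasks are
        // drained and aborted. Backlogged per-shard tasks are handled separately,
        // since only queued, node-wide tasks live in the executor's queue.
        executor.abortQueuedMergeTasks(task -> task.owner() == this);
    }
}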