-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Fix deadlock in ThreadPoolMergeScheduler when a failing merge closes the IndexWriter #134656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 18 commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
e55f472
reproducer
tlrx a48b071
possible fix
tlrx 877bbc6
Update docs/changelog/134656.yaml
tlrx d38acd9
[CI] Auto commit changes from spotless
bc6c302
Merge branch 'main' into 2025/09/09/ES-12664
tlrx 12f5597
nit
tlrx 1aed9d2
nit
tlrx 6d900ac
nit
tlrx a0dbc8c
[CI] Auto commit changes from spotless
adce7eb
nit
tlrx a332f74
nit
tlrx e613990
Merge branch '2025/09/09/ES-12664' of github.com:tlrx/elasticsearch i…
tlrx 7781abf
nit
tlrx a9f15cd
Merge branch 'main' into 2025/09/09/ES-12664
tlrx 596cd62
Merge branch 'main' into 2025/09/09/ES-12664
tlrx e12c711
Merge branch 'main' into 2025/09/09/ES-12664
tlrx bf0790e
Merge branch 'main' into 2025/09/09/ES-12664
tlrx 4ac38b2
Merge branch 'main' into 2025/09/09/ES-12664
tlrx ec0d6bb
Merge branch 'main' into 2025/09/09/ES-12664
tlrx 37a3d9b
suppress exception
tlrx 4d38ecc
fix spin loop
tlrx 0eae7ac
remove synchronized block
tlrx ae99d45
fix test
tlrx 3218f7f
Merge branch 'main' into 2025/09/09/ES-12664
tlrx cfc4009
Merge branch 'main' into 2025/09/09/ES-12664
tlrx 97105b4
feedback
tlrx 742dd09
assertTrue
tlrx d3e083d
latch
tlrx 555a846
closedWithNoRunningMerges
tlrx 7eae1cc
[CI] Auto commit changes from spotless
3f6c734
Merge branch 'main' into 2025/09/09/ES-12664
tlrx 5bd52a4
rarely close
tlrx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 134656 | ||
| summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the | ||
| `IndexWriter` | ||
| area: Engine | ||
| type: bug | ||
| issues: [] |
329 changes: 329 additions & 0 deletions
329
server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,329 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.index.engine; | ||
|
|
||
| import org.apache.lucene.codecs.DocValuesProducer; | ||
| import org.apache.lucene.index.CodecReader; | ||
| import org.apache.lucene.index.FieldInfo; | ||
| import org.apache.lucene.index.FilterCodecReader; | ||
| import org.apache.lucene.index.MergePolicy; | ||
| import org.apache.lucene.index.MergeTrigger; | ||
| import org.apache.lucene.index.NumericDocValues; | ||
| import org.apache.lucene.index.OneMergeWrappingMergePolicy; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.support.WriteRequest; | ||
| import org.elasticsearch.cluster.health.ClusterHealthStatus; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
| import org.elasticsearch.cluster.metadata.ProjectId; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.cluster.routing.IndexShardRoutingTable; | ||
| import org.elasticsearch.cluster.routing.ShardRoutingState; | ||
| import org.elasticsearch.cluster.routing.UnassignedInfo; | ||
| import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; | ||
| import org.elasticsearch.common.Priority; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.util.CollectionUtils; | ||
| import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.index.IndexSettings; | ||
| import org.elasticsearch.index.MergeSchedulerConfig; | ||
| import org.elasticsearch.index.codec.FilterDocValuesProducer; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
| import org.elasticsearch.plugins.EnginePlugin; | ||
| import org.elasticsearch.plugins.Plugin; | ||
| import org.elasticsearch.plugins.PluginsService; | ||
| import org.elasticsearch.test.ESIntegTestCase; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Collection; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static org.elasticsearch.core.Strings.format; | ||
| import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; | ||
| import static org.hamcrest.Matchers.containsString; | ||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; | ||
| import static org.hamcrest.Matchers.notNullValue; | ||
|
|
||
| public class MergeWithFailureIT extends ESIntegTestCase { | ||
|
|
||
| private static final String FAILING_MERGE_ON_PURPOSE = "Failing merge on purpose"; | ||
|
|
||
| public static class TestPlugin extends Plugin implements EnginePlugin { | ||
|
|
||
| // Number of queued merges in the thread pool. Lucene considers those as "running" and blocks waiting on them to complete in case | ||
| // of a merge failure. | ||
| private final Set<MergePolicy.OneMerge> pendingMerges = ConcurrentCollections.newConcurrentSet(); | ||
|
|
||
| // Number of running merges in the thread pool | ||
| private final AtomicInteger runningMergesCount = new AtomicInteger(); | ||
|
|
||
| // Latch to unblock merges | ||
| private final CountDownLatch runMerges = new CountDownLatch(1); | ||
|
|
||
| private final boolean isDataNode; | ||
|
|
||
| public TestPlugin(Settings settings) { | ||
| this.isDataNode = DiscoveryNode.hasDataRole(settings); | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) { | ||
| if (isDataNode == false) { | ||
| return Optional.of(InternalEngine::new); | ||
| } | ||
| final var failOnce = new AtomicBoolean(false); | ||
| return Optional.of( | ||
| config -> new TestEngine( | ||
| EngineTestCase.copy( | ||
| config, | ||
| new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap -> new MergePolicy.OneMerge(toWrap) { | ||
| @Override | ||
| public CodecReader wrapForMerge(CodecReader reader) { | ||
| return new FilterCodecReader(reader) { | ||
| @Override | ||
| public CacheHelper getCoreCacheHelper() { | ||
| return in.getCoreCacheHelper(); | ||
| } | ||
|
|
||
| @Override | ||
| public CacheHelper getReaderCacheHelper() { | ||
| return in.getReaderCacheHelper(); | ||
| } | ||
|
|
||
| @Override | ||
| public DocValuesProducer getDocValuesReader() { | ||
| return new FilterDocValuesProducer(super.getDocValuesReader()) { | ||
| @Override | ||
| public NumericDocValues getNumeric(FieldInfo field) throws IOException { | ||
| safeAwait(runMerges, TimeValue.ONE_MINUTE); | ||
| if (failOnce.compareAndSet(false, true)) { | ||
| throw new IOException(FAILING_MERGE_ON_PURPOSE); | ||
| } | ||
| return super.getNumeric(field); | ||
| } | ||
| }; | ||
| } | ||
| }; | ||
| } | ||
| }) | ||
| ) | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| private class TestEngine extends InternalEngine { | ||
|
|
||
| TestEngine(EngineConfig engineConfig) { | ||
| super(engineConfig); | ||
| } | ||
|
|
||
| @Override | ||
| protected ElasticsearchMergeScheduler createMergeScheduler( | ||
| ShardId shardId, | ||
| IndexSettings indexSettings, | ||
| ThreadPoolMergeExecutorService executor, | ||
| MergeMetrics metrics | ||
| ) { | ||
| return new ThreadPoolMergeScheduler(shardId, indexSettings, executor, merge -> 0L, metrics) { | ||
|
|
||
| @Override | ||
| public void merge(MergeSource mergeSource, MergeTrigger trigger) { | ||
| var wrapped = wrapMergeSource(mergeSource); | ||
| super.merge(wrapped, trigger); | ||
| } | ||
|
|
||
| private MergeSource wrapMergeSource(MergeSource delegate) { | ||
| // Wraps the merge source to know which merges were pulled from Lucene by the IndexWriter | ||
| return new MergeSource() { | ||
| @Override | ||
| public MergePolicy.OneMerge getNextMerge() { | ||
| var merge = delegate.getNextMerge(); | ||
| if (merge != null) { | ||
| if (pendingMerges.add(merge) == false) { | ||
| throw new AssertionError("Merge already pending " + merge); | ||
| } | ||
| } | ||
| return merge; | ||
| } | ||
|
|
||
| @Override | ||
| public void onMergeFinished(MergePolicy.OneMerge merge) { | ||
| delegate.onMergeFinished(merge); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasPendingMerges() { | ||
| return delegate.hasPendingMerges(); | ||
| } | ||
|
|
||
| @Override | ||
| public void merge(MergePolicy.OneMerge merge) throws IOException { | ||
| runningMergesCount.incrementAndGet(); | ||
| if (pendingMerges.remove(merge) == false) { | ||
| throw new AssertionError("Pending merge not found " + merge); | ||
| } | ||
| delegate.merge(merge); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| @Override | ||
| protected void handleMergeException(final Throwable exc) { | ||
| mergeException(exc); | ||
| } | ||
| }; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected boolean addMockInternalEngine() { | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
| return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class); | ||
| } | ||
|
|
||
| public void testFailedMergeDeadlock() throws Exception { | ||
| internalCluster().startMasterOnlyNode(); | ||
| final int maxMergeThreads = randomIntBetween(1, 3); | ||
| final int indexMaxThreadCount = randomBoolean() ? randomIntBetween(1, 10) : Integer.MAX_VALUE; | ||
|
|
||
| final var dataNode = internalCluster().startDataOnlyNode( | ||
| Settings.builder() | ||
| .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) | ||
| .put("thread_pool." + ThreadPool.Names.MERGE + ".max", maxMergeThreads) | ||
| .build() | ||
| ); | ||
|
|
||
| final var plugin = getTestPlugin(dataNode); | ||
| assertThat(plugin, notNullValue()); | ||
|
|
||
| final var indexName = randomIdentifier(); | ||
| createIndex( | ||
| indexName, | ||
| indexSettings(1, 0).put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".name", dataNode) | ||
| .put(MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.getKey(), 1) | ||
| // when indexMaxThreadCount is small so merge tasks might be backlogged | ||
| .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), indexMaxThreadCount) | ||
| // no merge throttling | ||
| .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), Integer.MAX_VALUE) | ||
| .build() | ||
| ); | ||
|
|
||
| // Kick off enough merges to block the thread pool | ||
| indexDocsInManySegmentsUntil(indexName, () -> plugin.runningMergesCount.get() == maxMergeThreads); | ||
| assertThat(plugin.runningMergesCount.get(), equalTo(maxMergeThreads)); | ||
|
|
||
| // Now pull more merges so they are queued in the merge thread pool, but Lucene thinks they are running | ||
| final int pendingMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5); | ||
| indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > pendingMerges); | ||
|
|
||
| var mergeThreadPool = asInstanceOf( | ||
| ThreadPoolExecutor.class, | ||
| internalCluster().clusterService(dataNode).threadPool().executor(ThreadPool.Names.MERGE) | ||
| ); | ||
| assertThat(mergeThreadPool.getQueue().size(), greaterThanOrEqualTo(pendingMerges)); | ||
| assertThat(mergeThreadPool.getActiveCount(), equalTo(maxMergeThreads)); | ||
|
|
||
| // More merges in the hope to have backlogged merges | ||
| if (indexMaxThreadCount != Integer.MAX_VALUE) { | ||
| final int backloggedMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5); | ||
| indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > backloggedMerges); | ||
| } | ||
|
|
||
| // unblock merges, one merge will fail the IndexWriter | ||
| plugin.runMerges.countDown(); | ||
|
|
||
| // Deadlock sample: | ||
| // | ||
| // "elasticsearch[node_s5][merge][T#1]@16690" tid=0x8e nid=NA waiting | ||
| // java.lang.Thread.State: WAITING | ||
| // at java.lang.Object.wait0(Object.java:-1) | ||
| // at java.lang.Object.wait(Object.java:389) | ||
| // at org.apache.lucene.index.IndexWriter.doWait(IndexWriter.java:5531) | ||
| // at org.apache.lucene.index.IndexWriter.abortMerges(IndexWriter.java:2733) | ||
| // at org.apache.lucene.index.IndexWriter.rollbackInternalNoCommi(IndexWriter.java:2488) | ||
| // at org.apache.lucene.index.IndexWriter.rollbackInternal(IndexWriter.java:2457) | ||
| // - locked <0x429a> (a java.lang.Object) | ||
| // at org.apache.lucene.index.IndexWriter.maybeCloseOnTragicEvent(IndexWriter.java:5765) | ||
| // at org.apache.lucene.index.IndexWriter.tragicEvent(IndexWriter.java:5755) | ||
| // at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4780) | ||
| // at org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6567) | ||
| // at org.elasticsearch.index.engine.MergeWithFailureIT$TestPlugin$TestEngine$1$1.merge(MergeWithFailureIT.java:178) | ||
| // at org.elasticsearch.index.engine.ThreadPoolMergeScheduler.doMerge(ThreadPoolMergeScheduler.java:347) | ||
| // at org.elasticsearch.index.engine.ThreadPoolMergeScheduler$MergeTask.run(ThreadPoolMergeScheduler.java:459) | ||
| // at org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.runMergeTask(ThreadPoolMergeExecutorService.java:364) | ||
|
|
||
| ensureRed(indexName); | ||
|
|
||
| // check the state of the shard | ||
| var routingTable = internalCluster().clusterService(dataNode).state().routingTable(ProjectId.DEFAULT); | ||
| var indexRoutingTable = routingTable.index(resolveIndex(indexName)); | ||
| var primary = asInstanceOf(IndexShardRoutingTable.class, indexRoutingTable.shard(0)).primaryShard(); | ||
| assertThat(primary.state(), equalTo(ShardRoutingState.UNASSIGNED)); | ||
| assertThat(primary.unassignedInfo(), notNullValue()); | ||
| assertThat(primary.unassignedInfo().reason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); | ||
| var failure = ExceptionsHelper.unwrap(primary.unassignedInfo().failure(), IOException.class); | ||
| assertThat(failure, notNullValue()); | ||
| assertThat(failure.getMessage(), containsString(FAILING_MERGE_ON_PURPOSE)); | ||
tlrx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| assertAcked(indicesAdmin().prepareDelete(indexName)); | ||
| } | ||
|
|
||
| private void indexDocsInManySegmentsUntil(String indexName, Supplier<Boolean> stopCondition) { | ||
| indexDocsInManySegmentsUntil(indexName, stopCondition, TimeValue.THIRTY_SECONDS); | ||
| } | ||
|
|
||
| private void indexDocsInManySegmentsUntil(String indexName, Supplier<Boolean> stopCondition, TimeValue timeout) { | ||
| long millisWaited = 0L; | ||
| do { | ||
| if (millisWaited >= timeout.millis()) { | ||
| logger.warn(format("timed out after waiting for [%d]", millisWaited)); | ||
| return; | ||
| } | ||
| var client = client(); | ||
| for (int request = 0; request < 10; request++) { | ||
| var bulkRequest = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
| for (int doc = 0; doc < 10; doc++) { | ||
| bulkRequest.add(client.prepareIndex(indexName).setCreate(true).setSource("value", randomIntBetween(0, 1024))); | ||
| } | ||
| bulkRequest.get(); | ||
| } | ||
| long sleepInMillis = randomLongBetween(50L, 200L); | ||
| safeSleep(sleepInMillis); | ||
tlrx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| millisWaited += sleepInMillis; | ||
| } while (stopCondition.get() == false); | ||
| } | ||
|
|
||
| private static TestPlugin getTestPlugin(String dataNode) { | ||
| return internalCluster().getInstance(PluginsService.class, dataNode).filterPlugins(TestPlugin.class).findFirst().get(); | ||
| } | ||
|
|
||
| private static void ensureRed(String indexName) throws Exception { | ||
| assertBusy(() -> { | ||
| var healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT, indexName) | ||
| .setWaitForStatus(ClusterHealthStatus.RED) | ||
| .setWaitForEvents(Priority.LANGUID) | ||
| .get(); | ||
| assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED)); | ||
| }); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.