Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 65 additions & 22 deletions reactor-core/src/main/java/reactor/core/publisher/FluxZip.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -780,6 +780,12 @@ public Object scanUnsafe(Attr key) {

void error(Throwable e, int index) {
if (Exceptions.addThrowable(ERROR, this, e)) {
// Mark the inner subscriber as 'done' only AFTER this coordinator has
// successfully registered the error above.
// This specific ordering guarantees that a concurrent drain loop cannot see
// `done == true` on the inner subscriber without the coordinator's `error`
// field also being visible.
subscribers[index].done = true;
drain(null, null);
}
else {
Expand Down Expand Up @@ -862,15 +868,31 @@ else if (dataSignal != null && cancelled) {
return;
}

// This block is the central point for terminating the operator.
// It is checked at the start of every work cycle inside the drain loop.
// We also enter this block on the loop AFTER a terminal condition has been
// atomically registered deeper inside the drain logic.
if (error != null) {
cancelAll();
discardAll(missed);
if (error == Exceptions.TERMINATED) {
// The onComplete() path is specifically triggered when a previous pass through
// the drain loop detected that a source had finished cleanly (d && sourceEmpty)
// and successfully won a potential race condition to set the `error` field to the TERMINATED
// sentinel. The recursive `drain()` call made at that time ensures we re-enter
// this loop, hit this check, and can now safely terminate.
cancelAll();
discardAll(missed);
a.onComplete();
return;
}
else {
cancelAll();
discardAll(missed);

Throwable ex = Exceptions.terminate(ERROR, this);
Throwable ex = Exceptions.terminate(ERROR, this);

a.onError(ex);

return;
a.onError(ex);
return;
}
}

boolean empty = false;
Expand All @@ -886,11 +908,26 @@ else if (dataSignal != null && cancelled) {

boolean sourceEmpty = v == null;
if (d && sourceEmpty) {
cancelAll();
discardAll(missed);
// Attempt to claim the error state. In a concurrent scenario,
// an onError() from one source could be racing with this onComplete path
// from another. `compareAndSet` ensures that only the first signal
// to arrive can set the error state using a sentinel value.
if (ERROR.compareAndSet(this, null, Exceptions.TERMINATED)) {
// If case of a race condition, we have just created a new unit of work: the
// drain loop must now see the TERMINATED state and send onComplete().
// We call drain() here to safely increment the WIP, which
// guarantees the main drain loop will run at least one more time
// instead of exiting prematurely.
drain(null, null);
}

// We signal to the rest of the drain loop that we cannot produce a
// zipped value in this iteration. If we didn't, an early 'break' could leave
// the `values` array partially filled, causing an NPE when the zipper
// function is called.
empty = true;

a.onComplete();
return;
break;
}
if (!sourceEmpty) {
values[j] = v;
Expand Down Expand Up @@ -955,14 +992,21 @@ else if (dataSignal != null && cancelled) {
}

if (error != null) {
cancelAll();
discardAll(missed);
if (error == Exceptions.TERMINATED) {
cancelAll();
discardAll(missed);
a.onComplete();
return;
}
else {
cancelAll();
discardAll(missed);

Throwable ex = Exceptions.terminate(ERROR, this);
Throwable ex = Exceptions.terminate(ERROR, this);

a.onError(ex);

return;
a.onError(ex);
return;
}
}

for (int j = 0; j < n; j++) {
Expand All @@ -975,11 +1019,11 @@ else if (dataSignal != null && cancelled) {

boolean empty = v == null;
if (d && empty) {
cancelAll();
discardAll(missed);
if (ERROR.compareAndSet(this, null, Exceptions.TERMINATED)) {
drain(null, null);
}

a.onComplete();
return;
break;
}
if (!empty) {
values[j] = v;
Expand Down Expand Up @@ -1134,7 +1178,6 @@ public void onError(Throwable t) {
Operators.onErrorDropped(t, currentContext());
return;
}
done = true;
parent.error(t, index);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -31,6 +32,7 @@
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.TestLoggerExtension;
import reactor.core.scheduler.Schedulers;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
Expand Down Expand Up @@ -1428,4 +1430,27 @@ public void scanSingleSubscriber() {
assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

@Test
public void testZipCorrectlyPropagatesTheErrorEmittedByAConcurrentSource() {
Flux<Integer> fluxToTest = Flux.range(1, 10)
.flatMap(ignored -> {
Mono<Optional<Object>> mono1 = Mono.empty()
.publishOn(Schedulers.parallel())
.map(Optional::of)
.defaultIfEmpty(Optional.empty());

Mono<Optional<Object>> mono2 = Mono.error(new NullPointerException())
.map(Optional::of)
.defaultIfEmpty(Optional.empty());

return Flux.zip(mono1, mono2)
.collectList()
.onErrorResume(e -> Mono.empty());
})
.flatMap(evt ->
Mono.error(new RuntimeException(String.format("Unexpected empty list return by collectList of size %s", evt.size())))
);
StepVerifier.create(fluxToTest).expectNextCount(0).verifyComplete();
}
}