Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import org.openjdk.jcstress.annotations.*;
import org.openjdk.jcstress.infra.results.II_Result;

import java.util.Queue;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

public class SinkManyEmitterProcessorStressTest {

@JCStressTest
@Outcome(id = "0, 0", expect = ACCEPTABLE, desc = "Emission returned OK, but the concurrent drain cleaned the queue before the arbiter ran.")
@Outcome(id = "3, 0", expect = ACCEPTABLE, desc = "Emission was correctly cancelled due to a race or because it happened post-cancellation.")
@State
public static class EmitNextAndAutoCancelRaceStressTest {

private final SinkManyEmitterProcessor<Integer> sink = new SinkManyEmitterProcessor<>(true, 16);

@Actor
public void emitActor(II_Result r) {
r.r1 = sink.tryEmitNext(1).ordinal();
}

@Actor
public void cancelActor() {
sink.asFlux().subscribe().dispose();
}

@Arbiter
public void arbiter(II_Result r) {
Queue<Integer> q = sink.queue;
r.r2 = (q == null) ? 0 : q.size();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,7 @@
* @author Stephane Maldini
*/
final class SinkManyEmitterProcessor<T> extends Flux<T> implements InternalManySink<T>,
Sinks.ManyWithUpstream<T>, CoreSubscriber<T>, Scannable, Disposable, ContextHolder {
Sinks.ManyWithUpstream<T>, CoreSubscriber<T>, Scannable, Disposable, ContextHolder {

@SuppressWarnings("rawtypes")
static final FluxPublish.PubSubInner[] EMPTY = new FluxPublish.PublishInner[0];
Expand Down Expand Up @@ -201,6 +201,9 @@ public void onComplete() {

@Override
public EmitResult tryEmitComplete() {
if (isCancelled()) {
return EmitResult.FAIL_CANCELLED;
}
if (done) {
return EmitResult.FAIL_TERMINATED;
}
Expand All @@ -217,6 +220,9 @@ public void onError(Throwable throwable) {
@Override
public EmitResult tryEmitError(Throwable t) {
Objects.requireNonNull(t, "tryEmitError must be invoked with a non-null Throwable");
if (isCancelled()) {
return EmitResult.FAIL_CANCELLED;
}
if (done) {
return EmitResult.FAIL_TERMINATED;
}
Expand All @@ -241,6 +247,9 @@ public void onNext(T t) {

@Override
public EmitResult tryEmitNext(T t) {
if (isCancelled()) {
return EmitResult.FAIL_CANCELLED;
}
if (done) {
return Sinks.EmitResult.FAIL_TERMINATED;
}
Expand Down Expand Up @@ -271,6 +280,23 @@ public EmitResult tryEmitNext(T t) {
return subscribers == EMPTY ? EmitResult.FAIL_ZERO_SUBSCRIBER : EmitResult.FAIL_OVERFLOW;
}
drain();

// This final check is critical for handling a race between this emit operation
// and a concurrent cancellation from another thread.
//
// The race condition scenario:
// 1. This thread passes the initial isCancelled() check at the top of the method.
// 2. This thread successfully offers an item to the queue.
// 3. Concurrently, another thread disposes the last subscriber, which cancels the sink
// and triggers a drain that cleans up the just-offered item.
//
// Without this check, we would return EmitResult.OK, but the item has already been
// discarded. This check ensures we accurately report FAIL_CANCELLED, reflecting
// the final state of the operation.
if (isCancelled()) {
return EmitResult.FAIL_CANCELLED;
}

return EmitResult.OK;
}

Expand Down Expand Up @@ -382,7 +408,7 @@ public Object scanUnsafe(Attr key) {
return null;
}

final void drain() {
void drain() {
if (WIP.getAndIncrement(this) != 0) {
return;
}
Expand All @@ -397,11 +423,9 @@ final void drain() {

boolean empty = q == null || q.isEmpty();

if (checkTerminated(d, empty)) {
return;
}
cleanupIfTerminated(d, empty);

FluxPublish.PubSubInner<T>[] a = subscribers;
FluxPublish.PubSubInner<T>[] a = subscribers;

if (a != EMPTY && !empty) {
long maxRequested = Long.MAX_VALUE;
Expand Down Expand Up @@ -431,10 +455,8 @@ final void drain() {
d = true;
v = null;
}
if (checkTerminated(d, v == null)) {
return;
}
if (sourceMode != Fuseable.SYNC) {
cleanupIfTerminated(d, v == null);
if (sourceMode != Fuseable.SYNC) {
s.request(1);
}
continue;
Expand All @@ -458,16 +480,14 @@ final void drain() {

empty = v == null;

if (checkTerminated(d, empty)) {
return;
}
cleanupIfTerminated(d, empty);

if (empty) {
if (empty) {
//async mode only needs to break but SYNC mode needs to perform terminal cleanup here...
if (sourceMode == Fuseable.SYNC) {
//the q is empty
done = true;
checkTerminated(true, true);
cleanupIfTerminated(true, true);
}
break;
}
Expand All @@ -494,10 +514,8 @@ final void drain() {
}
else if ( sourceMode == Fuseable.SYNC ) {
done = true;
if (checkTerminated(true, empty)) { //empty can be true if no subscriber
break;
}
}
cleanupIfTerminated(true, empty);//empty can be true if no subscriber
}

missed = WIP.addAndGet(this, -missed);
if (missed == 0) {
Expand All @@ -511,7 +529,14 @@ FluxPublish.PubSubInner<T>[] terminate() {
return SUBSCRIBERS.getAndSet(this, TERMINATED);
}

boolean checkTerminated(boolean d, boolean empty) {
/**
* Inspects the current state and, if terminal, performs the necessary cleanup actions
* like clearing the queue and signaling subscribers.
*
* @param d the current `done` state
* @param empty if the queue is currently empty
*/
void cleanupIfTerminated(boolean d, boolean empty) {
if (s == Operators.cancelledSubscription()) {
if (autoCancel) {
terminate();
Expand All @@ -520,7 +545,7 @@ boolean checkTerminated(boolean d, boolean empty) {
q.clear();
}
}
return true;
return;
}
if (d) {
Throwable e = error;
Expand All @@ -532,19 +557,16 @@ boolean checkTerminated(boolean d, boolean empty) {
for (FluxPublish.PubSubInner<T> inner : terminate()) {
inner.actual.onError(e);
}
return true;
}
else if (empty) {
for (FluxPublish.PubSubInner<T> inner : terminate()) {
inner.actual.onComplete();
}
return true;
}
}
return false;
}

final boolean add(EmitterInner<T> inner) {
boolean add(EmitterInner<T> inner) {
for (; ; ) {
FluxPublish.PubSubInner<T>[] a = subscribers;
if (a == TERMINATED) {
Expand All @@ -560,7 +582,7 @@ final boolean add(EmitterInner<T> inner) {
}
}

final void remove(FluxPublish.PubSubInner<T> inner) {
void remove(FluxPublish.PubSubInner<T> inner) {
for (; ; ) {
FluxPublish.PubSubInner<T>[] a = subscribers;
if (a == TERMINATED || a == EMPTY) {
Expand Down Expand Up @@ -591,14 +613,11 @@ final void remove(FluxPublish.PubSubInner<T> inner) {
if (SUBSCRIBERS.compareAndSet(this, a, b)) {
//contrary to FluxPublish, there is a possibility of auto-cancel, which
//happens when the removed inner makes the subscribers array EMPTY
if (autoCancel && b == EMPTY && Operators.terminate(S, this)) {
if (WIP.getAndIncrement(this) != 0) {
return;
}
terminate();
Queue<T> q = queue;
if (q != null) {
q.clear();
if (autoCancel && b == EMPTY && !isCancelled()) {
if (Operators.terminate(S, this)) {
// The state is now CANCELLED.
// Trigger a drain so the serialized drain-loop can perform the cleanup
drain();
}
}
return;
Expand Down Expand Up @@ -653,5 +672,4 @@ public void dispose() {
}
}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,8 @@
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static reactor.core.Scannable.Attr;
import static reactor.core.Scannable.Attr.*;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
Expand Down Expand Up @@ -922,4 +924,72 @@ void emitNextWithNoSubscriberNoCapacityKeepsSinkOpenWithBuffer() {
.expectTimeout(Duration.ofSeconds(1))
.verify();
}

@Test
void testThatCancelledSinkShouldNotAcceptsEmissions() {
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Disposable subscription1 = sink.asFlux().subscribe(s -> System.out.println("1: " + s));
assertEquals(1, sink.currentSubscriberCount());
sink.tryEmitNext("Test1");
subscription1.dispose();
assertEquals(0, sink.currentSubscriberCount());
Disposable subscription2 = sink.asFlux().subscribe(s -> System.out.println("2: " + s));
assertTrue(subscription2.isDisposed());
assertEquals(0, sink.currentSubscriberCount());
assertTrue(sink.tryEmitNext("Test2").isFailure(), "Emissions on a cancelled sink should fail");
}

@Test
void testQueueShouldBeEmptyAfterCancellation() {
SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);
processor.tryEmitNext(1);
assertThat(processor.queue.size()).isEqualTo(1);
processor.asFlux().doOnNext(i -> System.out.println("Received " + i)).subscribe().dispose();
assertThat(processor.queue.size()).isEqualTo(0);
processor.tryEmitNext(2);
assertThat(processor.queue.size()).isEqualTo(0);
}

@Test
void testNoQueueIsCreatedIfNoEmissionOccurredBeforeCancellation() {
SinkManyEmitterProcessor<Integer> processor = new SinkManyEmitterProcessor<>(true, 1);

processor.asFlux().subscribe().dispose();

processor.tryEmitNext(1);
assertThat(processor.queue).isNull();
}

@Test
void testThatOneSubscriberDisposesSinkStaysActive() {
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();

sink.asFlux().subscribe(i -> System.out.println("Subscriber A received: " + i));
Disposable subscriberB = sink.asFlux().subscribe(i -> System.out.println("Subscriber B received: " + i));

assertThat(sink.currentSubscriberCount()).isEqualTo(2);

sink.tryEmitNext(1);
subscriberB.dispose();

assertThat(sink.currentSubscriberCount()).isEqualTo(1);

Sinks.EmitResult result = sink.tryEmitNext(2);
assertThat(result).isEqualTo(Sinks.EmitResult.OK);
}

@Test
void testThatLastSubscriberDisposesTriggersAutoCancel() {
Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
Disposable disposable = sink.asFlux().subscribe();

assertThat(sink.currentSubscriberCount()).isEqualTo(1);

disposable.dispose();

assertThat(sink.currentSubscriberCount()).isEqualTo(0);

Sinks.EmitResult result = sink.tryEmitNext(1);
assertThat(result).isEqualTo(Sinks.EmitResult.FAIL_CANCELLED);
}
}