reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

BufferTimeoutSubscriber is not thread safe #3738

Open jamiestewart opened 4 months ago

jamiestewart commented 4 months ago

This issue seems to be very similar in cause and solution to reactor/reactor-core#2362. I recommend that you read that issue before continuing further here.

Like reactor/reactor-core#2362, synchronization is present around many of BufferTimeoutSubscriber's operations on its buffer, but BufferTimeoutSubscriber.cancel() calls Operators.onDiscardMultiple() (which iterates through the buffer) without synchronization, so nothing prevents another thread from modifying the buffer while the cancel is underway. This can cause several kinds of failure.

The most problematic of these failures is an ArrayIndexOutOfBoundsException, thrown from

    at java.base/java.util.ArrayList.clear(ArrayList.java:653)
    at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.cancel(FluxBufferTimeout.java:383)

This can occur if the buffer is being modified by one thread calling onNext() while another thread calls cancel(): the cancelling thread ends up calling List.clear() while the other thread is adding to the list.
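
To see the failure mode in isolation, here is a minimal, self-contained sketch (plain Java, not the reactor-core internals) of the same race: one thread keeps adding to an ArrayList while another iterates and clears it without synchronization. The race does not trip on every run, so it loops a few times.

import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the race described above. One thread keeps adding
// to an ArrayList (as onNext() does) while another iterates and clears it
// without synchronization (as cancel() does via Operators.onDiscardMultiple()).
public class UnsynchronizedClearRace {

    public static void main(String[] args) throws InterruptedException {
        // The race does not trip on every run, so repeat a few times.
        for (int attempt = 0; attempt < 100; attempt++) {
            runOnce();
        }
    }

    static void runOnce() throws InterruptedException {
        List<String> buffer = new ArrayList<>();

        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < 1_000_000; i++) {
                    buffer.add("an element");
                }
            } catch (RuntimeException e) {
                // The writer itself can fail once the other thread clears
                // the list underneath it.
                System.err.println("writer: " + e);
            }
        });

        Thread canceller = new Thread(() -> {
            try {
                for (String ignored : buffer) {
                    // a discard hook would be invoked here
                }
                buffer.clear();
            } catch (RuntimeException e) {
                // Typically ConcurrentModificationException from the iterator,
                // occasionally ArrayIndexOutOfBoundsException from clear().
                System.err.println("canceller: " + e);
            }
        });

        writer.start();
        canceller.start();
        writer.join();
        canceller.join();
    }
}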

Somewhat more common is a ConcurrentModificationException, thrown when the cancelling thread is iterating through the buffer while another thread modifies it. In anticipation of this problem, Operators.onDiscardMultiple() catches this exception and logs it at WARN level, which prevents it from propagating, but the warning can be worrisome for application administrators, and it means some discarded elements are never passed to the discard hook. Below is an example of the logged ConcurrentModificationException:

2024-02-28 15:58:02.483  WARN 38878 --- [tor-http-nio-41] reactor.core.publisher.Operators         : Error while discarding collection, stopping

java.util.ConcurrentModificationException: null
    at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013) ~[na:na]
    at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967) ~[na:na]
    at reactor.core.publisher.Operators.onDiscardMultiple(Operators.java:567) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.cancel(FluxBufferTimeout.java:382) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:159) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:176) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.Operators.terminate(Operators.java:1240) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator$WriteBarrier.cancel(ChannelSendOperator.java:335) ~[spring-messaging-5.3.22.jar!/:5.3.22]
    at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.cancel(FluxConcatArray.java:286) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.cancel(FluxContextWrite.java:141) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.Operators.terminate(Operators.java:1240) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:131) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at io.rsocket.core.RequestStreamResponderSubscriber.handleCancel(RequestStreamResponderSubscriber.java:300) ~[rsocket-core-1.1.2.jar!/:na]
    at io.rsocket.core.RSocketResponder.cleanUpSendingSubscriptions(RSocketResponder.java:193) ~[rsocket-core-1.1.2.jar!/:na]
    at io.rsocket.core.RSocketResponder.doOnDispose(RSocketResponder.java:172) ~[rsocket-core-1.1.2.jar!/:na]
    at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:106) ~[rsocket-core-1.1.2.jar!/:na]
    at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:99) ~[rsocket-core-1.1.2.jar!/:na]
    at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.21.jar!/:3.4.21]
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51) ~[rsocket-core-1.1.2.jar!/:na]
    at io.rsocket.transport.netty.WebsocketDuplexConnection.lambda$new$0(WebsocketDuplexConnection.java:54) ~[rsocket-transport-netty-1.1.2.jar!/:na]
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannel.close(AbstractChannel.java:244) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at reactor.netty.DisposableChannel.dispose(DisposableChannel.java:72) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
    at reactor.netty.channel.ChannelOperations.dispose(ChannelOperations.java:203) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
    at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:474) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:478) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
    at reactor.netty.http.server.WebsocketServerOperations.lambda$onInboundNext$2(WebsocketServerOperations.java:158) ~[reactor-netty-http-1.0.21.jar!/:1.0.21]
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at reactor.netty.http.server.WebsocketServerOperations.sendCloseNow(WebsocketServerOperations.java:262) ~[reactor-netty-http-1.0.21.jar!/:1.0.21]
    at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:158) ~[reactor-netty-http-1.0.21.jar!/:1.0.21]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.21.jar!/:1.0.21]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:60) ~[rsocket-transport-netty-1.1.2.jar!/:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[netty-codec-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar!/:4.1.79.Final]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na] 

It looks like reactor.core.publisher.FluxBufferTimeout.BufferTimeoutSubscriber.cancel() violates the reactive-streams specification:

2.8 A Subscriber MUST be prepared to receive one or more onNext signals after having called Subscription.cancel() if there are still requested elements pending [see 3.12]. Subscription.cancel() does not guarantee to perform the underlying cleaning operations immediately.

BufferTimeoutSubscriber.cancel is clearly not thread safe with respect to onNext: if one thread runs cancel, iterating over the buffer to dispose of its elements (via Operators.onDiscardMultiple), while another thread calls onNext to add new elements to the buffer, a ConcurrentModificationException is thrown (at least when the underlying buffer is an ArrayList, which it is by default).

3.15 Calling Subscription.cancel MUST return normally.

Expected Behavior

BufferTimeoutSubscriber.cancel MUST return normally, even if another thread is calling BufferTimeoutSubscriber.onNext.

Actual Behavior

On occasion, BufferTimeoutSubscriber.cancel is observed to throw ConcurrentModificationException if invoked while another thread is calling BufferTimeoutSubscriber.onNext.

Steps to Reproduce

Below is an imperfect unit test for exercising this behavior.

package reactor.core.publisher;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.FluxBufferTimeout.BufferTimeoutSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

class BufferTimeoutSubscriberConcurrency {

    private AutoCloseable mockingFramework;

    @Mock private CoreSubscriber<String> actual;
    @Mock private Context context;
    @Mock private Consumer<Object> hook;
    @Mock private Scheduler.Worker timer;
    @Mock private Subscription s;
    @Mock private Logger logger;

    @BeforeEach
    public void setup() {
        mockingFramework = MockitoAnnotations.openMocks(this);

        // Instruct Reactor to use a mock logger so the WARN verification below is meaningful
        Loggers.useCustomLoggers(loggerName -> logger);

        when(actual.currentContext()).thenReturn(context);
        when(context.getOrDefault(any(), any())).thenReturn(hook);
    }

    @AfterEach
    public void cleanup() throws Exception {
        mockingFramework.close();
    }

    @Test
    void test() throws InterruptedException {
        for (int index = 0; index < 1_000_000 ; index++) {
            BufferTimeoutSubscriber<String, ArrayList<String>> subscriber = new BufferTimeoutSubscriber(actual,
                    1_000_000, 1, TimeUnit.HOURS, timer, ArrayList::new);

            // Subscribe and signal demand so the onNext() calls below are buffered
            subscriber.onSubscribe(s);
            subscriber.request(5_000);

            var thread1 = new Thread(() -> subscriber.cancel());
            var thread2 = new Thread(() -> {
                while (thread1.isAlive()) {
                    subscriber.onNext("an element");
                }
            });
            var threads = List.of(thread1, thread2);
            threads.forEach(Thread::start);
            for (var thread : threads) {
                thread.join();
            }
        }

        verify(logger, never()).warn(any());
    }
}

Possible Solution

As was done in the fix for #2362, synchronizing the implementation of BufferTimeoutSubscriber.cancel on this would address the problem.
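
A minimal, self-contained sketch of that locking discipline follows. The class and field names (SynchronizedBufferHolder, discardHook) are hypothetical and this is not the actual reactor-core patch; it only shows the intent: the cancel path takes the same monitor that already guards buffer mutation, so the discard iteration cannot race with an add.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the proposed fix, not the real
// BufferTimeoutSubscriber: onNext() and cancel() synchronize on the same
// monitor, so iterating the buffer for discard can never overlap with an add.
final class SynchronizedBufferHolder<T> {

    private final List<T> buffer = new ArrayList<>();
    private final Consumer<T> discardHook;

    SynchronizedBufferHolder(Consumer<T> discardHook) {
        this.discardHook = discardHook;
    }

    // Stands in for the buffer append done by onNext().
    synchronized void onNext(T value) {
        buffer.add(value);
    }

    // Stands in for cancel(): discard (cf. Operators.onDiscardMultiple())
    // and clear under the same lock that onNext() uses.
    synchronized void cancel() {
        for (T element : buffer) {
            discardHook.accept(element);
        }
        buffer.clear();
    }
}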

My Environment

Reactor version(s) used: 3.4.21
JVM version (java -version): openjdk version "17.0.2" 2022-01-18; OpenJDK Runtime Environment (build 17.0.2+8-86); OpenJDK 64-Bit Server VM (build 17.0.2+8-86, mixed mode, sharing)
OS and version (eg uname -a): Darwin jstewart-mb01 23.3.0 Darwin Kernel Version 23.3.0: Wed Dec 20 21:30:44 PST 2023; root:xnu-10002.81.5~7/RELEASE_ARM64_T6000 arm64

chemicL commented 4 months ago

Thank you for the report @jamiestewart 👍 I can confirm this is a bug in the existing implementation. I need to give some thought to the next steps.

There is another implementation of the bufferTimeout() operator that honours backpressure. Can you try that one? It also has a known deficiency regarding concurrency, but that is planned to be fixed.

Regarding this issue, we'd need:

Are you considering a contribution, @jamiestewart?

He-Pin commented 3 months ago

Is it possible for you to upgrade to a newer version, @jamiestewart? Is this problem still present?

chemicL commented 3 weeks ago

@jamiestewart can you please try the latest version of reactor, or 3.5.18 at minimum, and use bufferTimeout with fairBackpressure = true? It should not have any defects. The non-backpressured variant is not currently being worked on; I think we should consider deprecating it. Your comments would be valuable.
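
For anyone landing on this issue, a minimal usage sketch of the suggestion above, assuming the three-argument bufferTimeout(maxSize, maxTime, fairBackpressure) overload available in the 3.5.x line; the surrounding class and values are illustrative only.

import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class FairBufferTimeoutExample {

    public static void main(String[] args) {
        Flux<List<Integer>> buffered = Flux.range(1, 100)
                // Emit buffers of up to 10 elements, or whatever has accumulated
                // after 50 ms; fairBackpressure = true selects the newer,
                // backpressure-honouring implementation of the operator.
                .bufferTimeout(10, Duration.ofMillis(50), true);

        buffered.doOnNext(batch -> System.out.println("batch: " + batch))
                .blockLast();
    }
}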