eclipse-vertx / vert.x

Vert.x is a tool-kit for building reactive applications on the JVM
http://vertx.io

Race condition when using RecordParser with concatMapCompletable and observeOn #4297

Open ben1222 opened 2 years ago

ben1222 commented 2 years ago

Version

vertx 4.1.7

Context

I encountered some issues when using RecordParser with concatMapCompletable and observeOn, as in the following code, which processes records one by one on a worker thread:

RecordParser.newDelimited(delimiter, asyncFile)
  .toFlowable()
  .concatMapCompletable(data -> Completable.complete()
    .observeOn(blockingScheduler(executor))
    .andThen(process(data))
  )

The issues include:

  1. Sometimes the RecordParser suddenly stops emitting records: there is no error, the end of the file has not been reached, and the stream is not disposed... it just gets stuck.
  2. Sometimes a MissingBackpressureException is thrown.

After investigating and creating a unit test for this issue, it looks like a race condition in RecordParserImpl. Usually the record is emitted from RecordParserImpl on the event loop thread (RecordParserImpl.handle). However, when backpressure exists (from concatMapCompletable here), the thread running downstream can be the one requesting items from upstream (RecordParserImpl.fetch). In my case, the inner stream of concatMapCompletable is switched to a worker thread using observeOn, so it is the worker thread that requests items from RecordParserImpl when backpressure occurs.

When RecordParserImpl.handle, running on the event loop thread, and RecordParserImpl.fetch, running on a worker thread, are called at the same time, a race condition occurs because RecordParserImpl is not written in a thread-safe way. The races include, but are not limited to, unsynchronized access to the parser's internal demand and parsing state from both threads.
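
To make the hazard concrete, here is an illustrative sketch of the pattern described above (simplified, hypothetical code, not the actual RecordParserImpl): two plain fields are mutated from both threads with no synchronization.

// Illustrative only; NOT the actual RecordParserImpl code.
class RacyParser {
  private long demand;      // how many records downstream has requested
  private boolean parsing;  // re-entrancy guard for the parse loop

  // Called on the event loop thread when new input data arrives.
  void handle() {
    if (!parsing) {         // check-then-act: not atomic
      parsing = true;
      while (demand > 0) {  // another thread may change demand meanwhile
        demand--;           // read-modify-write: not atomic
        // ... emit one parsed record ...
      }
      parsing = false;
    }
  }

  // Called from downstream (here: the worker thread) to request more records.
  void fetch(long amount) {
    demand += amount;       // racy increment
    handle();               // may run concurrently with the event loop's handle()
  }
}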

Steps to reproduce

Here's a unit test that could reproduce the issue:

  // Note: blockingScheduler(...) below is assumed to be
  // RxHelper.blockingScheduler from vertx-rx-java2.
  @Test
  public void testBackpressure4() throws Throwable {
    Logger LOG = LogManager.getLogger();
    String fileName = "/tmp/testBackpressure";
    // Write 300 records of ~1000 characters each, newline-delimited.
    FileWriter writer = new FileWriter(fileName);
    for (int i = 0; i < 300; i++) {
      writer.write(String.format("%1000d\n", i));
    }
    writer.close();

    int count = 100;
    for (int i = 0; i < count; i++) {
      LOG.info("{}: Start", i);
      Vertx vertx = Vertx.vertx();
      WorkerExecutor executor = vertx.createSharedWorkerExecutor("shared-worker", 1);
      Throwable err = Single.just(i)
          .flatMapCompletable(n -> vertx.fileSystem()
              .rxOpen(fileName, new OpenOptions().setWrite(false).setRead(true))
              .flatMapCompletable(f -> RecordParser.newDelimited("\n", f).toFlowable()
                  .map(data -> {
                    final String dataString = data.toString().trim();
                    LOG.info("{}: Read record {}", n, dataString);
                    return dataString;
                  })
                  .concatMapCompletable(data -> Completable.complete()
                    .observeOn(blockingScheduler(executor))
                    .doOnComplete(() -> LOG.info("{}: processed record {}", n, data))
                  )
                  .onErrorResumeNext(e -> f.rxClose().andThen(Completable.error(e)))
                  .andThen(Completable.defer(f::rxClose))
              )
          )
          .timeout(5, TimeUnit.SECONDS)
          .blockingGet(); // Completable.blockingGet() returns the error, or null on success

      vertx.close();

      if (err == null) {
        LOG.info("{}: Success", i);
      } else {
        LOG.error("{}: failure", i, err);
        if (!(err instanceof TimeoutException)) { // fails the test when issue 2 occurs; remove the "!" to fail on issue 1 (the timeout) instead
          throw err;
        }
      }
    }
  }

In the printed log, we can see that the xx: Read record yy line is sometimes printed on the event loop thread (RecordParserImpl.handle) and sometimes on the shared worker thread (due to backpressure, RecordParserImpl.fetch).

The backtrace for issue 1 looks like:

java.util.concurrent.TimeoutException: The source did not signal an event for 5 seconds and has been terminated.
    at io.reactivex.internal.operators.completable.CompletableTimeout$DisposeTask.run(CompletableTimeout.java:109)
    at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
    at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

The backtrace for issue 2 looks like:

io.reactivex.exceptions.MissingBackpressureException: Queue full?!
    at io.reactivex.internal.operators.mixed.FlowableConcatMapCompletable$ConcatMapCompletableObserver.onNext(FlowableConcatMapCompletable.java:121)
    at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
    at io.vertx.reactivex.impl.FlowableReadStream.lambda$subscribeActual$2(FlowableReadStream.java:86)
    at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:214)
    at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:285)
    at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:27)
    at io.vertx.core.file.impl.AsyncFileImpl.handleBuffer(AsyncFileImpl.java:425)
    at io.vertx.core.file.impl.AsyncFileImpl.lambda$new$0(AsyncFileImpl.java:110)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:240)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:130)
    at io.vertx.core.file.impl.AsyncFileImpl.lambda$doRead$5(AsyncFileImpl.java:407)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at app//io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at app//io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at app//io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base@11.0.7/java.lang.Thread.run(Thread.java:834)

I'm not sure whether this is a problem in RecordParser or in concatMapCompletable (is it expected for the upstream to be requested on a worker thread in this case?), or whether they are simply not intended to be used together this way.

vietj commented 2 years ago

The record parser was meant to be used exclusively from the event loop; that's why we have this corner case.

We need to try to fix this and provide a concurrent version, or find another strategy that could live in vertx-rx.
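
For illustration, one interim strategy along those lines could be to marshal every fetch back onto the event-loop context, so the parser is only ever touched from one thread. This is a hypothetical sketch (ContextSerializedFetch is not an existing Vert.x API) and assumes the wrapper is created on the context that owns the parser:

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.parsetools.RecordParser;

// Hypothetical helper: funnel all fetch() calls through the event-loop
// context so the RecordParser is only accessed from a single thread.
class ContextSerializedFetch {
  private final Context context;
  private final RecordParser parser;

  ContextSerializedFetch(Vertx vertx, RecordParser parser) {
    this.context = vertx.getOrCreateContext(); // must be the parser's context
    this.parser = parser;
  }

  // Safe to call from any thread, e.g. a worker requesting more records.
  void fetch(long amount) {
    context.runOnContext(v -> parser.fetch(amount));
  }
}

The trade-off is an extra context hop for every fetch, which adds latency under backpressure.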

vietj commented 2 years ago

It seems that using an atomic long for the demand and a CAS for the parsing boolean could work. We would likely also need to replace the current buffer with a list of buffers, so that a new buffer can be concurrently added while another thread is parsing.
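
A minimal sketch of that idea, with hypothetical names (not actual Vert.x code): an AtomicLong for the demand, a CAS on a parsing flag as the guard, and a concurrent queue of pending buffers. The re-check after releasing the flag is needed so that work signalled while parsing is not lost:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class ConcurrentParserState {
  private final AtomicLong demand = new AtomicLong();
  private final AtomicBoolean parsing = new AtomicBoolean();
  private final ConcurrentLinkedQueue<byte[]> pending = new ConcurrentLinkedQueue<>();

  // Any thread may append a buffer while another thread is parsing.
  void handle(byte[] buffer) {
    pending.add(buffer);
    tryParse();
  }

  // Any thread may add demand, e.g. a worker requesting more records.
  void fetch(long amount) {
    demand.addAndGet(amount);
    tryParse();
  }

  private void tryParse() {
    // Only one thread wins the CAS and parses; losers simply return.
    while (parsing.compareAndSet(false, true)) {
      try {
        while (demand.get() > 0 && !pending.isEmpty()) {
          byte[] buffer = pending.poll();
          // ... parse `buffer` and emit records downstream ...
          // (for simplicity this sketch treats each buffer as one record)
          demand.decrementAndGet();
        }
      } finally {
        parsing.set(false);
      }
      // Re-check: another thread may have added demand or data while the
      // flag was held and lost its own CAS; without this, that work stalls.
      if (demand.get() == 0 || pending.isEmpty()) {
        break;
      }
    }
  }
}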

@tsegismont made this code non-reentrant and instead had threads cooperate; I think it would help to have a proper concurrent version.

vietj commented 2 years ago

Do you have an opinion you can share, @jponge?

jponge commented 2 years ago

Looking at the code, it sounds like we should explore a drain-loop design, as one can find in RxJava, Mutiny, etc.

This way, concurrent calls (such as RecordParserImpl.handle from the event loop and RecordParserImpl.fetch from a worker thread) would be serialized by the drain loop, as sketched below.
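
For illustration, a minimal sketch of that drain-loop pattern, with hypothetical names (the general RxJava-style technique, not a proposed patch): the wip counter serializes concurrent callers, and the missed-counter check guarantees that work signalled during a drain is picked up.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class DrainLoop {
  private final AtomicInteger wip = new AtomicInteger();
  private final AtomicLong demand = new AtomicLong();
  private final Queue<byte[]> queue = new ConcurrentLinkedQueue<>();

  void onData(byte[] buffer) {  // e.g. called from the event loop
    queue.add(buffer);
    drain();
  }

  void request(long n) {        // e.g. called from a worker thread
    demand.addAndGet(n);
    drain();
  }

  private void drain() {
    if (wip.getAndIncrement() != 0) {
      return;                   // another thread is already draining
    }
    int missed = 1;
    do {
      while (demand.get() > 0 && !queue.isEmpty()) {
        byte[] buffer = queue.poll();
        // ... parse `buffer` and emit records downstream ...
        // (for simplicity this sketch treats each buffer as one record)
        demand.decrementAndGet();
      }
      missed = wip.addAndGet(-missed);  // pick up work signalled meanwhile
    } while (missed != 0);
  }
}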

We'd also need to come up with an even smaller reproducer that's only based on Vert.x core.
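
As a starting point, here is an untested sketch of what such a core-only reproducer might look like: feed the parser on the event loop while issuing fetch() from a plain thread, mimicking what observeOn plus concatMapCompletable did in the original report.

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;

public class CoreOnlyRepro {
  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    RecordParser parser = RecordParser.newDelimited("\n");
    parser.pause();                    // start in fetch mode (demand-driven)
    parser.handler(record -> {});      // consume emitted records
    vertx.runOnContext(v -> {
      for (int i = 0; i < 10_000; i++) {
        parser.handle(Buffer.buffer("record " + i + "\n")); // event loop side
      }
    });
    new Thread(() -> {
      for (int i = 0; i < 10_000; i++) {
        parser.fetch(1);               // concurrent fetch from another thread
      }
    }).start();
  }
}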