vert-x3 / vertx-rx

Reactive Extensions for Vert.x
Apache License 2.0

300 - Fix potential race condition when the flowable runs on another thread #303

Closed NilsRenaud closed 9 months ago

NilsRenaud commented 11 months ago

Fixes this issue: https://github.com/vert-x3/vertx-rx/issues/300

When the Flowable runs on another thread, checkStatus can be called from two threads: the reader thread (usually an event loop) and the producer thread (usually a worker thread). Although this method is synchronized, a race condition can still occur in handler.handle(...), because the producer thread can overtake the reader thread partway through its execution, and vice versa. It is therefore safer to ensure that only one thread produces the elements of this ReadStream.

I chose the reader context as the one on which every event is published, because it is the context we know about when the subscriber is created.
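To illustrate the idea of confining every publication to one context, here is a minimal sketch in plain Java. It is not the patch itself: a single-thread ExecutorService stands in for the Vert.x reader context, and the names `SingleThreadDispatch`, `confineTo`, and `run` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SingleThreadDispatch {

    // Wraps a handler so that every call is re-dispatched onto the given
    // single-thread executor (an assumed stand-in for the reader context).
    static <T> Consumer<T> confineTo(ExecutorService readerContext, Consumer<T> handler) {
        return item -> readerContext.execute(() -> handler.accept(item));
    }

    // Publishes from two different threads and returns the names of the
    // threads on which the wrapped handler actually ran.
    static List<String> run() {
        ExecutorService readerContext =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "reader"));
        List<String> seenThreads = new CopyOnWriteArrayList<>();
        Consumer<String> handler =
                confineTo(readerContext, s -> seenThreads.add(Thread.currentThread().getName()));
        try {
            Thread producer = new Thread(() -> handler.accept("msg1"), "producer");
            producer.start();
            producer.join();
            handler.accept("msg2"); // published from the calling thread
            readerContext.shutdown();
            readerContext.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenThreads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whichever thread publishes, the handler body only ever executes on the "reader" thread, at the cost of one extra task submission per element.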

NilsRenaud commented 11 months ago

I know the tests are broken, but I wanted validation of the approach before continuing to work on this.

vietj commented 11 months ago

@NilsRenaud I don't think this is the right way to fix it. Can you provide a simple reproducer of the case?

NilsRenaud commented 11 months ago

Okay, here is a reproducer:

The idea is to put the ReadStream in a special state where two threads end up publishing data to the ReadStream handler, then use latches to introduce an artificial processing delay that forces the two threads to interleave.

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

import io.reactivex.rxjava3.core.Emitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.core.streams.ReadStream;
import io.vertx.rxjava3.impl.ReadStreamSubscriber;

public class Reproducer {
    private int called = 0;
    CountDownLatch waitForSecondElement = new CountDownLatch(1);
    CountDownLatch consumerLatch = new CountDownLatch(2);
    CountDownLatch testLatch = new CountDownLatch(1);
    CountDownLatch waitForFirstElementInPending = new CountDownLatch(1);
    String total = "";

    @Test
    public void test() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Flowable<String> flowableContent = Flowable.generate(this::emit).subscribeOn(Schedulers.io());
        ReadStream<String> readStream = ReadStreamSubscriber.asReadStream(flowableContent, obj -> obj);
        executor.execute(() -> {
            // Set the readStream as if it was paused, with data in its pending queue and more incoming data expected
            readStream.pause();
            readStream.handler(this::handler); // Fetch elements, ending in pending queue
            readStream.endHandler(unused -> testLatch.countDown());
            try {
                waitForFirstElementInPending.await(); // Wait for the first element to be published in the pending queue
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            readStream.fetch(2); // immediately consume the first element, published on this thread
            waitForSecondElement.countDown(); // release the 2nd element to be published on Rx thread.
        });
        testLatch.await();
        assertThat(total).isEqualTo("msg1msg2");
    }

    // - Emit "msg1",
    // - wait for a latch to emit the second element,
    // - complete
    private void emit(Emitter<String> emitter) throws InterruptedException {
        if (called == 0) {
            called = 1;
            emitter.onNext("msg1");
            waitForFirstElementInPending.countDown();
        } else if (called == 1) {
            called = 2;
            waitForSecondElement.await();
            emitter.onNext("msg2");
        } else {
            emitter.onComplete();
        }
    }

    private void handler(final String s) {
        waitForSecondElement.countDown();
        try {
            // Simulate an unsynchronized delay in the consumer by blocking the first message, letting go the 2nd message
            if (consumerLatch.getCount() > 1) {
                consumerLatch.countDown();
                consumerLatch.await();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        total += s;
        consumerLatch.countDown(); // release the 1st message
    }
}

vietj commented 11 months ago

Thanks for the reproducer, I now understand better what is going on. I think this is an inherent bug in the read stream subscriber implementation that should be fixed.

vietj commented 11 months ago

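@NilsRenaud can you try your original case with this version of ReadStreamSubscriber:

import java.util.ArrayDeque;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;

public class ReadStreamSubscriber<R, J> implements Subscriber<R>, ReadStream<J> {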

  private static final Runnable NOOP_ACTION = () -> { };
  private static final Throwable DONE_SENTINEL = new Throwable();

  public static final int BUFFER_SIZE = 16;

  public static <R, J> ReadStream<J> asReadStream(Flowable<R> flowable, Function<R, J> adapter) {
    return new ReadStreamSubscriber<>(adapter, flowable::subscribe);
  }

  public static <R, J> ReadStream<J> asReadStream(Observable<R> observable, Function<R, J> adapter) {
    return asReadStream(observable.toFlowable(BackpressureStrategy.BUFFER), adapter);
  }

  private final Function<R, J> adapter;
  private Handler<Void> endHandler;
  private Handler<Throwable> exceptionHandler;
  private Handler<J> elementHandler;
  private long demand = Long.MAX_VALUE;
  private Throwable completed;
  private ArrayDeque<R> pending = new ArrayDeque<>();
  private int requested = 0;
  private Subscription subscription;
  private Publisher<R> publisher;
  private boolean emitting;

  public ReadStreamSubscriber(Function<R, J> adapter, Publisher<R> publisher) {
    this.adapter = adapter;
    this.publisher = publisher;
  }

  @Override
  public ReadStream<J> handler(Handler<J> handler) {
    Runnable action;
    synchronized (this) {
      elementHandler = handler;
      if (handler != null) {
        action = () -> publisher.subscribe(this);
      } else {
        Subscription s = subscription;
        action = s != null ? s::cancel : NOOP_ACTION;
      }
    }
    action.run();
    checkStatus();
    return this;
  }

  @Override
  public ReadStream<J> pause() {
    synchronized (this) {
      demand = 0L;
    }
    return this;
  }

  @Override
  public ReadStream<J> fetch(long amount) {
    if (amount < 0L) {
      throw new IllegalArgumentException("Invalid amount: " + amount);
    }
    synchronized (this) {
      demand += amount;
      if (demand < 0L) {
        demand = Long.MAX_VALUE;
      }
    }
    checkStatus();
    return this;
  }

  @Override
  public ReadStream<J> resume() {
    return fetch(Long.MAX_VALUE);
  }

  @Override
  public void onSubscribe(Subscription s) {
    synchronized (this) {
      subscription = s;
    }
    checkStatus();
  }

  private void checkStatus() {
    synchronized (this) {
      if (emitting) {
        return;
      }
      emitting = true;
    }
    try {
      Runnable action = NOOP_ACTION;
      while (true) {
        J adapted;
        Handler<J> handler;
        synchronized (this) {
          if (demand > 0L && (handler = elementHandler) != null && pending.size() > 0) {
            if (demand != Long.MAX_VALUE) {
              demand--;
            }
            requested--;
            R item = pending.poll();
            adapted = adapter.apply(item);
          } else {
            if (completed != null) {
              if (pending.isEmpty()) {
                Handler<Throwable> onError;
                Throwable result;
                if (completed != DONE_SENTINEL) {
                  onError = exceptionHandler;
                  result = completed;
                  exceptionHandler = null;
                } else {
                  onError = null;
                  result = null;
                }
                Handler<Void> onCompleted = endHandler;
                endHandler = null;
                action = () -> {
                  try {
                    if (onError != null) {
                      onError.handle(result);
                    }
                  } finally {
                    if (onCompleted != null) {
                      onCompleted.handle(null);
                    }
                  }
                };
              }
            } else if (elementHandler != null && requested < BUFFER_SIZE / 2) {
              int request = BUFFER_SIZE - requested;
              action = () -> subscription.request(request);
              requested = BUFFER_SIZE;
            }
            break;
          }
        }
        handler.handle(adapted);
      }
      action.run();
    } finally {
      synchronized (this) {
        emitting = false;
      }
    }
  }

  @Override
  public ReadStream<J> endHandler(Handler<Void> handler) {
    synchronized (this) {
      if (completed == null || pending.size() > 0) {
        endHandler = handler;
      } else {
        if (handler != null) {
          throw new IllegalStateException();
        }
      }
    }
    return this;
  }

  @Override
  public ReadStream<J> exceptionHandler(Handler<Throwable> handler) {
    synchronized (this) {
      if (completed == null || pending.size() > 0) {
        exceptionHandler = handler;
      } else {
        if (handler != null) {
          throw new IllegalStateException();
        }
      }
    }
    return this;
  }

  @Override
  public void onComplete() {
    onError(DONE_SENTINEL);
  }

  @Override
  public void onError(Throwable e) {
    synchronized (this) {
      if (completed != null) {
        return;
      }
      completed = e;
    }
    checkStatus();
  }

  @Override
  public void onNext(R item) {
    synchronized (this) {
      pending.add(item);
    }
    checkStatus();
  }
}

NilsRenaud commented 11 months ago

Thanks a lot for trying this, but unfortunately it does not work :/ As I said, the ReadStreamSubscriber publishes events sequentially but on two threads, so it is inside the ReadStream handler that events may get mixed up (depending on its implementation, though: it may never happen if the whole handler is synchronized).

That's why I think the real solution is to have only one thread publishing elements, either the Flowable-side one or the ReadStream-side one. With the .runOnContext solution, I chose the ReadStream-side thread, but we can choose the Flowable-side one if you prefer :)
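As a rough illustration of how a handler can reorder events even when it is never called concurrently, consider the sketch below. The handler is hypothetical and is only an assumption about one possible implementation: it writes inline when called on its "home" thread and re-dispatches to that thread otherwise. `ThreadAffineHandlerDemo`, `handle`, and `run` are made-up names, and no Vert.x API is used.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadAffineHandlerDemo {
    private final ExecutorService home =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "home"));
    private final List<String> written = new CopyOnWriteArrayList<>();

    // Hypothetical handler: inline write on the home thread, otherwise the
    // write is queued behind whatever the home thread is currently doing.
    void handle(String item) {
        if (Thread.currentThread().getName().equals("home")) {
            written.add(item);
        } else {
            home.execute(() -> written.add(item));
        }
    }

    List<String> run() {
        try {
            home.submit(() -> {
                // msg1 is published first, but from the producer thread.
                Thread producer = new Thread(() -> handle("msg1"), "producer");
                producer.start();
                producer.join();
                // msg2 is published second, from the home thread itself.
                handle("msg2");
                return null;
            }).get(5, TimeUnit.SECONDS);
            home.shutdown();
            home.awaitTermination(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(new ThreadAffineHandlerDemo().run());
    }
}
```

The two calls to `handle` are strictly sequential, yet "msg2" is written before "msg1" because the first call only queued its write. Serializing the calls is therefore not enough for a handler of this kind; the calls also have to come from a single thread.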

I would like to know why the .runOnContext solution is bad, though. Does it have a significant performance impact that I have not identified?

FYI here is the original reproducer:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.buffer.ByteBuf;
import io.reactivex.rxjava3.core.Emitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.core.http.HttpMethod;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpServer;

public class Reproducer {
    private static final String BASE_PATH = "whatever big (~10MB) file";
    private static final String NEW_FILE = "whatever name";
    private static final Vertx VERTX = Vertx.vertx();
    private static String originalChecksum;

    public static void main(String[] args) throws InterruptedException, IOException, NoSuchAlgorithmException {
        byte[] fileAsBytes = Files.readAllBytes(Path.of(BASE_PATH));
        Files.deleteIfExists(Path.of(NEW_FILE));
        writeBuffInfo("client", fileAsBytes);

        createHttpServerWhichLogsReceivedBufferInfo();

        Flowable<Buffer> flowableContent = Flowable
                .generate(emitIn8KiBChunks(fileAsBytes))
                .subscribeOn(Schedulers.io());
        VERTX.createHttpClient()
             .rxRequest(HttpMethod.POST, 8080, "localhost", "/")
             .flatMap(req -> req.rxSend(flowableContent))
             .flatMap(resp -> resp.body())
             .blockingSubscribe(body -> System.out.println(body.toString()));
    }

    private static void createHttpServerWhichLogsReceivedBufferInfo() {
        HttpServer server = VERTX.createHttpServer();
        server.requestHandler(req -> {
            req.bodyHandler(buffer -> {
                writeBuffInfo("server", buffer.getBytes());
                writeFile(buffer.getBytes());
                req.response().end("OK");
                server.close();
            });
        }).listen(8080).blockingGet();
        System.out.println("server started.");
    }

    // Do not pay too much attention to this method, it simply reads a byte array in chunks
    private static Consumer<Emitter<Buffer>> emitIn8KiBChunks(byte[] bufferToEmit) {
        final int chunkSize = 8 * 1024;
        final AtomicInteger pos = new AtomicInteger(0);
        final int maxPos = bufferToEmit.length - 1;

        return emitter -> {
            // Adding a Thread.sleep(...) here makes it work.
            int currentPos = pos.get();
            if (currentPos == maxPos) {
                emitter.onComplete();
            } else if (currentPos + chunkSize > maxPos) {
                emitter.onNext(Buffer.buffer().setBytes(0, bufferToEmit, currentPos, maxPos - currentPos + 1));
                pos.set(maxPos);
            } else {
                emitter.onNext(Buffer.buffer().setBytes(0, bufferToEmit, currentPos, chunkSize));
                pos.addAndGet(chunkSize);
            }
        };
    }

    private static void writeBuffInfo(String prefix, byte[] buff) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("md5");
            System.out.println(prefix + " - buffer size: " + buff.length);
            String checkSum = Base64.getEncoder().encodeToString(md5.digest(buff));
            System.out.println(prefix + " - buffer MD5 checksum: " + checkSum);
            if (originalChecksum == null) {
                originalChecksum = checkSum;
            } else {
                System.out.println("Checksums: \"" + originalChecksum + "\" vs \"" + checkSum + "\" : "
                                           + originalChecksum.equals(checkSum));
            }
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }

    private static void writeFile(byte[] buff) {
        try {
            Files.write(Path.of(NEW_FILE), buff);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

vietj commented 10 months ago

I would prefer a solution that does not rely on the Vert.x context.

NilsRenaud commented 10 months ago

OK, I'm back on this topic. I'll try to write a solution without the Vert.x context that uses only the Flowable-side thread to produce elements. I chose this thread because otherwise the stream-side thread would have to actively monitor the pending queue (or wait for the Flowable-side thread to wake it up) to be sure not to miss any data :/
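One possible shape of such a producer-side strategy, sketched in plain Java under loud assumptions: this is not the commit pushed to the PR, `ProducerSideDelivery` and `demo` are hypothetical names, and a Semaphore stands in for the demand counter. The reader side only records demand, and the producer thread is the only one that ever invokes the handler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

class ProducerSideDelivery<T> {
    // Outstanding demand; an assumed simplification of the long demand field.
    private final Semaphore demand = new Semaphore(0);
    private final Consumer<T> handler;

    ProducerSideDelivery(Consumer<T> handler) {
        this.handler = handler;
    }

    // Reader side: records demand and returns; it never calls the handler.
    void fetch(int amount) {
        demand.release(amount);
    }

    // Producer side: waits for one unit of demand, then delivers the item
    // on the calling (producer) thread.
    void onNext(T item) {
        demand.acquireUninterruptibly();
        handler.accept(item);
    }

    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        ProducerSideDelivery<String> stream = new ProducerSideDelivery<>(
                s -> log.add(Thread.currentThread().getName() + ":" + s));
        Thread producer = new Thread(() -> {
            stream.onNext("msg1");
            stream.onNext("msg2");
        }, "producer");
        producer.start();
        stream.fetch(2); // called from the reader thread, delivers nothing itself
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off in this sketch is that the producer thread blocks while there is no demand, which is acceptable on a worker or Schedulers.io() thread but would not be on an event loop.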

NilsRenaud commented 10 months ago

@vietj I just pushed a new commit implementing the above strategy, using only the main producer thread to produce data. I'm sure it could be better, and some tests fail because of this internal change, but would you agree with such a solution?