vert-x3 / vertx-rx

Reactive Extensions for Vert.x
Apache License 2.0

Race Condition with ClientHandler rxSend when the content Flowable is offloaded to a worker thread #300

Open NilsRenaud opened 1 year ago

NilsRenaud commented 1 year ago

Versions

I'm using Vert.x 4.4.6 with Java 11.

Context

I want to use Vert.x to send content coming from a potentially blocking source (an InputStream, for example), using the RxJava mechanics.

So I create a Flowable, but since I don't want to block the event loop while waiting for new buffers to come in, I subscribe on a worker thread:

Flowable<Buffer> flowableContent = Flowable.generate(fromInputStream(is))
                .subscribeOn(RxHelper.blockingScheduler(VERTX)); // Or Schedulers.io() from RxJava
VERTX.createHttpClient()
             .rxRequest(HttpMethod.POST, 8080, "localhost", "/")
             .flatMap(req -> req.rxSend(flowableContent));

Sadly, past a certain amount of data (about 100 KB on my laptop), the data sent gets corrupted: some buffers are missing or mangled together. This looks like a race condition caused by the different threads involved. Note that this code works well when the subscription happens on the event loop thread, but then I may block the event loop.

Am I doing something conceptually wrong? Is it expected that this does not work in a multi-threaded context?

Do you have a reproducer?

Here is a reproducer. You only have to create a file larger than 100 KB; it will be split into 8 KiB buffers which should be sent over the network. The output of this program looks like:

client - buffer size: 100013
client - buffer MD5 checksum: y/AU5ciYqaU/LaKhE3bHeg==

server - buffer size: 75437
server - buffer MD5 checksum: qKnksB3uuCX3X6Znnnxfcg==

The client lines describe the original file; the server lines describe what the receiving server got. We can see that input and output do not match.

import static io.vertx.rxjava3.core.buffer.Buffer.buffer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.rxjava3.core.Emitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.core.http.HttpMethod;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;

public class Reproducer {
    private static final String BASE_PATH = "/.../file_in_100k.txt";
    private static final Vertx VERTX = Vertx.vertx();

    public static void main(String[] args) throws InterruptedException, IOException, NoSuchAlgorithmException {
        byte[] fileAsBytes = Files.readAllBytes(Path.of(BASE_PATH));
        writeBuffInfo("client", fileAsBytes);

        createHttpServerWhichLogsReceivedBufferInfo();

        Flowable<Buffer> flowableContent = Flowable
                .generate(emitIn8KiBChunks(fileAsBytes))
                .subscribeOn(RxHelper.blockingScheduler(VERTX)) // <--- This line to comment/Uncomment
                // Note: Works with ".subscribeOn(RxHelper.scheduler(VERTX))" or without subscribeOn.
        ;

        VERTX.createHttpClient()
             .rxRequest(HttpMethod.POST, 8080, "localhost", "/")
             .flatMap(req -> req.rxSend(flowableContent))
             .flatMap(resp -> resp.body())
             .blockingSubscribe(body -> System.out.println(body.toString()));
    }

    private static void createHttpServerWhichLogsReceivedBufferInfo() {
        VERTX.createHttpServer()
             .requestHandler(req -> {
                 req.bodyHandler(buffer -> {
                     writeBuffInfo("server", buffer.getBytes());
                     req.response().end("OK");
                 });
             })
             .listen(8080).blockingGet();
        System.out.println("server started.");
    }

    // Do not pay too much attention to this method, it simply reads a byte array in chunks
    private static Consumer<Emitter<Buffer>> emitIn8KiBChunks(byte[] bufferToEmit) {
        final AtomicInteger pos = new AtomicInteger(0);
        final int maxPos = bufferToEmit.length - 1;

        return emitter -> {
            // Adding a Thread.sleep(...) here makes it work.
            int currentPos = pos.get();
            if (currentPos == maxPos) {
                emitter.onComplete();
            } else if (currentPos + 8 * 1_024 > maxPos) {
                emitter.onNext(buffer().setBytes(0, bufferToEmit, currentPos, maxPos - currentPos + 1));
                pos.set(maxPos);
            } else {
                emitter.onNext(buffer().setBytes(0, bufferToEmit, currentPos, 8 * 1_024));
                pos.addAndGet(8 * 1_024);
            }
        };
    }

    private static void writeBuffInfo(String prefix, byte[] buff) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("md5");
            System.out.println(prefix + " - buffer size: " + buff.length);
            System.out.println(prefix + " - buffer MD5 checksum: " + Base64.getEncoder()
                                                                           .encodeToString(md5.digest(buff)));
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }
}
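
To rule out the chunking itself as the source of the mismatch, the split can be checked in isolation, without Vert.x or RxJava. This is only a minimal sketch: the class `ChunkCheck` and its methods are hypothetical names, not part of the reproducer, and the input is synthetic data of the same size as the file above. It splits a byte array into 8 KiB chunks, joins them back in order, and compares sizes and MD5 checksums the same way the reproducer prints them.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;

public class ChunkCheck {
    static final int CHUNK_SIZE = 8 * 1_024;

    // Splits the input into consecutive chunks of at most CHUNK_SIZE bytes.
    static List<byte[]> split(byte[] input) {
        List<byte[]> chunks = new ArrayList<>();
        for (int pos = 0; pos < input.length; pos += CHUNK_SIZE) {
            int end = Math.min(pos + CHUNK_SIZE, input.length);
            chunks.add(Arrays.copyOfRange(input, pos, end));
        }
        return chunks;
    }

    // Concatenates the chunks in order, as a receiver that gets every buffer once would.
    static byte[] join(List<byte[]> chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }
        byte[] out = new byte[total];
        int pos = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, out, pos, chunk.length);
            pos += chunk.length;
        }
        return out;
    }

    // Same checksum format as the reproducer output: Base64-encoded MD5.
    static String md5Base64(byte[] data) {
        try {
            return Base64.getEncoder().encodeToString(MessageDigest.getInstance("MD5").digest(data));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] original = new byte[100_013]; // synthetic stand-in for the test file
        for (int i = 0; i < original.length; i++) {
            original[i] = (byte) (i * 31);
        }
        List<byte[]> chunks = split(original);
        byte[] received = join(chunks);
        System.out.println("chunks: " + chunks.size());
        System.out.println("sizes match: " + (received.length == original.length));
        System.out.println("checksums match: " + md5Base64(original).equals(md5Base64(received)));
    }
}
```

For a 100013-byte input this yields 13 chunks (12 full ones and a last one of 1709 bytes), and sizes and checksums match when every chunk is delivered exactly once and in order.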

NilsRenaud commented 1 year ago

As discussed with @vietj, this is not a vertx-rx bug but a vertx-core one. The issue affects both the HTTP/1.x and HTTP/2 clients.

Here is a reproducer without RxJava:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;

public class SmallestReproducer {
    private static final Vertx VERTX = Vertx.vertx();
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        createHttpServerWhichLogsReceivedBuffers();

        VERTX.createHttpClient()
             .request(HttpMethod.POST, 8080, "localhost", "/")
             .onSuccess(req -> send(req))
             .flatMap(req -> req.response())
             .flatMap(response -> response.body())
             .onSuccess(body -> System.out.println(body.toString()));
    }

    private static void send(HttpClientRequest req) {
        req.setChunked(true);
        Future<Void> future = req.sendHead();
        while (!future.isComplete()) { } // Wait for the header to be sent.
        AtomicBoolean latch = new AtomicBoolean(false);
        EXECUTOR.submit(() -> {
            req.write("1111");
            latch.set(true); // Release Event-loop thread
        });
        while (!latch.get()) { } // Active wait for the event to be published
        req.write("222222222");
        req.end();
        System.out.println(Thread.currentThread().getName() + " - ended");
    }

    // Nothing interesting here
    private static void createHttpServerWhichLogsReceivedBuffers() {
        CountDownLatch latch = new CountDownLatch(1);
        VERTX.createHttpServer()
             .requestHandler(req -> {
                 System.out.println("Received request");
                 req.bodyHandler(buf -> System.out.println("Received: " + buf.toString()));
                 req.endHandler(o -> {
                     System.out.println("Request 100% received.");
                     req.response().end("OK");
                 });
                 req.exceptionHandler(t -> t.printStackTrace());
             })
             .listen(8080).onSuccess(unused -> latch.countDown());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("server started.");
    }
}

This code only sends the second message ("222222222"), completely dropping the first one ("1111").

@vietj Should I close this issue and create one on the vertx-core issue tracker?

vietj commented 1 year ago

@NilsRenaud yes you can

NilsRenaud commented 1 year ago

Closed in favor of this vertx-core issue: https://github.com/eclipse-vertx/vert.x/issues/4982

NilsRenaud commented 10 months ago

After further investigation with the help of @vietj, we understood that the real issue was that the produced ReadStream was not "serialized", meaning that its handler callback could be invoked concurrently.
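
To illustrate what "serialized" means here, below is a minimal sketch in plain Java. It is not the Vert.x or vertx-rx implementation, and `SerializedHandler` is a hypothetical name: it only shows the general queue-drain idea, where callers on any thread enqueue their item and a single caller at a time forwards items to the wrapped handler. The wrapped handler in the demo is deliberately not thread-safe, standing in for a callback that must never run concurrently.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class SerializedHandlerSketch {

    // Hypothetical wrapper: forwards items to the delegate one at a time, in queue order.
    static final class SerializedHandler<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // pending items ("work in progress")

        SerializedHandler(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(T item) {
            queue.offer(item);
            // Only the caller that moves wip from 0 to 1 drains; the others just leave their item.
            if (wip.getAndIncrement() != 0) {
                return;
            }
            do {
                delegate.accept(queue.poll());
            } while (wip.decrementAndGet() != 0);
        }
    }

    // Calls the serialized handler from several threads.
    // Returns {items handled, maximum number of concurrent delegate calls observed}.
    static int[] run(int threads, int perThread) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        int[] handled = {0}; // deliberately not thread-safe
        Consumer<String> handler = new SerializedHandler<>(item -> {
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            handled[0]++;
            inFlight.decrementAndGet();
        });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    handler.accept("buffer");
                }
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] {handled[0], maxInFlight.get()};
    }

    public static void main(String[] args) {
        int[] result = run(4, 10_000);
        System.out.println("handled: " + result[0] + ", max concurrent calls: " + result[1]);
    }
}
```

With the wrapper in place, the delegate never sees more than one call at a time and no item is lost, even though four threads call `accept` concurrently. Calling the non-thread-safe delegate directly from those threads gives no such guarantee, which is the kind of situation described above.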