rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

Slow producer combined with RSocket does not send any data until the request is met #1037

Closed: hjohn closed this issue 2 years ago

hjohn commented 2 years ago

Expected Behavior

When a client requests 100 items from an RSocket @MessageMapping but the producer emits them at a slow pace (1 per second), I expect each item to be sent to the client as soon as it is produced, one at a time.

Actual Behavior

What I observe instead is that, with RSocket between the producer and the subscriber, items are buffered until the request of 100 is met (which takes 100 seconds). When RSocket is removed from the equation, the items arrive one at a time, as I would expect.

Steps to Reproduce

Before I create a full test case, I'd like to check whether I'm just missing something obvious. My use case is to connect a client to a server that sends messages at highly variable rates, anywhere from a few per minute to tens of thousands per second. Latency is important and messages must arrive as quickly as possible, yet high throughput is also needed when the server has many messages available (a client may not be able to keep up, so some back pressure is needed in those cases).

The client always tries to keep a request of 100+ messages outstanding in case many are available, but it still expects messages to be delivered promptly when fewer than that are available. In simulations without RSocket, this works as expected.

The client simply looks like this:

@Autowired RSocketRequester requester;

@EventListener(ApplicationReadyEvent.class)
public void run() throws InterruptedException {
    long time = System.currentTimeMillis();

    requester.route("fake-events")
        .retrieveFlux(String.class)
        .doOnNext(x -> System.out.println("Received raw: " + x))
        .doOnRequest(size -> System.out.println("Request going out of: " + size))
        .subscribe(new BatchEventSubscriber<>(100, Duration.ofMillis(50), new Consumer<List<String>>() {
            long totalReceived = 0;
            long totalBytes = 0;

            @Override
            public void accept(List<String> s) {
                totalReceived += s.size();
                totalBytes += s.stream().mapToInt(String::length).sum();
                System.out.printf("So we received: %4d (%4d) messages @ %8.1f msg/sec (%d kB/sec)\n", totalReceived, s.size(), ((double)totalReceived / (System.currentTimeMillis() - time)) * 1000, totalBytes / (System.currentTimeMillis() - time));
            }
        }));

What I see on the client is that a request for 100 items goes out (triggered by the BatchEventSubscriber), but doOnNext doesn't print anything until at least 100 messages have been generated on the server side.

The server side looks like this:

@MessageMapping("fake-events")
public Flux<String> go() {
    return Flux.generate(new Consumer<SynchronousSink<String>>() {
        long offset = 0;

        @Override
        public void accept(SynchronousSink<String> emitter) {
            emitter.next("" + offset++);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    })
    .doOnRequest(r -> System.out.println("[+] Requested " + r));
}

The server immediately sees a request of 1 followed by a request of 99, but nothing arrives on the client until 100 seconds have elapsed. I'm not sure why the server sees 1 and 99, since the client only made a single request of 100.

Possible Solution

I'm guessing there is some buffering happening. As I said, connecting the client directly to this Flux results in the expected behavior.
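A likely explanation, offered as a hedged sketch rather than a confirmed diagnosis: `Flux.generate` calls its generator synchronously inside `request(n)`, on whichever thread made that call, and keeps looping until the demand is met. If that thread is the RSocket transport's I/O thread (an assumption about the default Netty-based transport), `Thread.sleep` keeps it busy for the whole request, so frames for items that were already emitted cannot be flushed until the loop ends. The standalone Reactor program below involves no RSocket; the class and method names are illustrative only. It measures how long a single `request(n)` call holds its caller when the generator sleeps after each item.

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class GenerateBlocksRequest {
    // A blocking generator shaped like the one in the issue: emit, then sleep.
    static Flux<Long> slowSource(long sleepMillis) {
        return Flux.<Long, Long>generate(() -> 0L, (offset, sink) -> {
            sink.next(offset);
            try {
                Thread.sleep(sleepMillis); // blocks whichever thread called request(n)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                sink.error(e);
            }
            return offset + 1;
        });
    }

    // Measures how long one request(n) call keeps the calling thread busy.
    // Flux.generate runs its loop inside request(n), so the call only returns
    // after n items have been emitted (about n * sleepMillis here).
    static long millisBlockedInRequest(int n, long sleepMillis) {
        long[] elapsed = new long[1];
        slowSource(sleepMillis).subscribe(new BaseSubscriber<Long>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                long start = System.nanoTime();
                request(n); // the generator loop runs right here, on this thread
                elapsed[0] = (System.nanoTime() - start) / 1_000_000;
                cancel();
            }
        });
        return elapsed[0];
    }

    public static void main(String[] args) {
        System.out.println("request(5) held the caller for ~"
            + millisBlockedInRequest(5, 100) + " ms");
    }
}
```

With `n = 5` and a 100 ms sleep, the call should return only after roughly 500 ms, which matches the shape of the reported stall: the request of 99 would hold the requesting thread for about 99 seconds at one item per second.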

Your Environment

OlegDokuka commented 2 years ago

Hey @hjohn!

Unfortunately, based on this code alone I can't recommend anything. First, BatchEventSubscriber is not a Reactor or RSocket class, so I can only guess at the behavior implemented inside it. Second, calling Thread.sleep is definitely a bad idea (I would suggest replacing the per-element delay with the dedicated .delayElements operator).

Please try to create a runnable example to simplify the investigation on our side.
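Following the suggestion above, a non-blocking variant of the server's source could keep the generator fast and let `Flux.delayElements` (the `Flux` counterpart of `Mono.delayElement`) schedule the pause on a timer, `Schedulers.parallel()` by default, instead of sleeping on the requesting thread. This is a minimal sketch: the class name and the `pause` parameter are illustrative, and in the Spring handler the returned `Flux` would simply be the return value of the `@MessageMapping` method.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class DelayedEvents {
    // Non-blocking version of the issue's source: the generator returns
    // immediately, and delayElements spaces the items out on a timer
    // instead of calling Thread.sleep on the requesting thread.
    static Flux<String> fakeEvents(Duration pause) {
        return Flux.<String, Long>generate(() -> 0L, (offset, sink) -> {
                sink.next(String.valueOf(offset));
                return offset + 1;
            })
            .delayElements(pause);
    }

    public static void main(String[] args) {
        fakeEvents(Duration.ofMillis(100))
            .take(5)
            .doOnNext(x -> System.out.println("Received: " + x))
            .blockLast();
    }
}
```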

hjohn commented 2 years ago

The hint about not using Thread.sleep was enough. I had always wondered how to deal with a generator that is more than just a "test" source and is slow to produce its items (or may even block). Adding a subscribeOn to move it onto the bounded elastic scheduler solves the issue.
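The fix described here can be sketched as follows, assuming the generator has to stay blocking (for example, because it wraps a blocking API). `subscribeOn(Schedulers.boundedElastic())` subscribes to the source on a bounded elastic worker and, by default, also routes every later `request(n)` call to that worker, so the sleeping loop no longer runs on the caller's thread. The class name, the `sleepMillis` parameter, and the `generatorThreads` list that records thread names are hypothetical, added only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class OffloadedEvents {
    // Hypothetical: thread names seen inside the generator, for illustration only.
    static final List<String> generatorThreads = new CopyOnWriteArrayList<>();

    // Same blocking generator as in the issue, but subscribeOn moves the
    // subscription, and with it every request(n) call and the generator loop
    // that request triggers, onto a boundedElastic worker thread.
    static Flux<String> fakeEvents(long sleepMillis) {
        return Flux.<String, Long>generate(() -> 0L, (offset, sink) -> {
                generatorThreads.add(Thread.currentThread().getName());
                sink.next(String.valueOf(offset));
                try {
                    Thread.sleep(sleepMillis); // still blocking, but not on the caller's thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    sink.error(e);
                }
                return offset + 1;
            })
            .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        fakeEvents(200)
            .take(5)
            .doOnNext(x -> System.out.printf("%5d ms: %s%n", System.currentTimeMillis() - start, x))
            .blockLast();
    }
}
```

Running `main` should print each item roughly 200 ms after the previous one instead of all at once, because the thread that requests data is no longer the one that sleeps.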