smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0
238 stars 177 forks source link

Subscription is getting cancelled, no exception thrown. #2102

Closed PyAntony closed 1 year ago

PyAntony commented 1 year ago

Hi,

I have an application that is executing the following steps in order: 1) Consume message from kafka, it contains a URL. 1) Download a file. 2) Upload to S3. 3) Push some bytes to Redis .

Multiple files are downloaded at a time. Application is quite complex but the flow looks like this:

    @Incoming("downloader-msg-topic")
    @Outgoing("kafka-out")
    public Multi<Message<DownloaderMessage>> stream1(Multi<Message<DownloaderMessage>> stream) { 
        return ...;
    }

    @Incoming("kafka-out")
    @Outgoing("file-downloader")
    public Multi<Message<FullURL>> stream2(Multi<Message<DownloaderMessage>> stream) {
        return ...;
    }

    @Incoming("file-downloader")
    @Outgoing("file-downloader-out")
    public Multi<Message<GETResponse>> stream3(Multi<Message<FullURL>> stream) {
        return ...;
    }

    @Incoming("file-downloader-out")
    @Outgoing("s3-upload")
    public Multi<Message<UploadRequestS3>> stream4(Multi<Message<GETResponse>> stream) {
        return ...;
    }

    @Incoming("s3-upload-out")
    @Outgoing("redis")
    public Multi<Message<RedisRequest>> stream5(Multi<Message<UploadResponseS3>> stream) {
       // I am getting cancellation event here
        return stream.onCancellation().invoke(() -> Log.warnf("Cancellation event seen..."))...;
    }

    @Incoming("redis-out")
    Uni<Void> sink(Multi<Message<ReadyResponse>> stream) {
        return ...;
    }

Everything is working fine until I cross a threshold (something above 800 files/second). At that point the application stops processing altogether. As you can see the subscriber is created by the framework. I have put log statements everywhere and I don't get any exception; there is also no thread deadlocks, I have verified this. What I am getting is a cancellation event (see comment in code) log coming through the Redis stream. I have no idea why the subscriber is cancelling the subscription; again, I don't get any exception thrown anywhere... is there any way to access/capture the subscriber or activate further logs to understand why the subscriber is cancelling the subscription?

cescoffier commented 1 year ago

How do you chain your messages? Where is the acknowledgment happening?

cescoffier commented 1 year ago

Can you provide a reproducer?

PyAntony commented 1 year ago

We are trying to debug this now; will add more info soon.

PyAntony commented 1 year ago

We have fixed this issue, although we don't understand why processing just halted without warning/errors... It seems we were overloading the Redis server (too many connections). We solved the problem by grouping all requests over 5 seconds and producing a single request using the Redis API (io.vertx.mutiny.redis.client.Redis); we are using redis.batch(...) and pushing to a single json object.

There is actually not much difference in the way we had it before; we were just grouping over a key (sourceKey) so we were producing multiple batches instead of a single batch. We were also using ReactiveRedisDataSource (rds.withConnection) to send each batch to a single redis key (key produced by using sourceKey, for example: history:<sourceKey>). So we were producing multiple requests and sending to multiple keys, somewhere around 40 requests every 5 seconds (we have somewhere around 20 pods running at a time). So, briefly, by reducing the number of connections to Redis the issue disappeared.

Now, we can reproduce the issue constantly in our environment by setting up the older configuration (multiple redis request) and trying to download somewhere above 1000 files at a time. I'll try to get a reproducer, but I don't know if I'll be successful.

Basically the way we were aggregating messages (in the redis stream) simplified looks like this:

    @Incoming(REDIS_MSG_IN)
    @Outgoing(REDIS_BATCH_IN)
    public Multi<Message<PipeBatch<RequestTemplate, ResponseTuple>>> toBatch(Multi<Message<RedisRequest>> stream) {
        return stream
                .group().by(msg -> msg.getPayload().source())
                .flatMap(gp -> gp.group().intoLists().every(Duration.ofSeconds(gpEverySec)))
                .filter(ls -> !ls.isEmpty())
                // return some batch object with additional functionality
                .map(this::prepareBatch);
    }

    @Incoming(REDIS_BATCH_IN)
    @Outgoing(REDIS_BATCH_OUT)
    public Multi<Message<PipeBatch<RequestTemplate, ResponseTuple>>> stream(
        Multi<Message<PipeBatch<RequestTemplate, ResponseTuple>>> incoming
    ) {
        return incoming.onItem().transformToUniAndMerge(msg -> asyncIO(msg.getPayload());
    }

    public Uni<PipeBatch<RequestTemplate, ResponseTuple>> asyncIO(PipeBatch<RequestTemplate, ResponseTuple> input) {
        return client.putBatch(input);
    }

    // client.putBatch calls a function 'putHistoryAll' and it calls a custom Redis pipeliner class. Pipeliner is calling function:

    <I, R, CMD> Uni<Void> pipe(
        List<I> items,
        Function<ReactiveRedisDataSource, CMD> fromClientToCommands,
        BiFunction<ReactiveRedisDataSource, I, Uni<R>> fromClient,
        BiFunction<CMD, I, Uni<R>> fromCommands,
        BiConsumer<? super I, ? super R> resultConsumer,
        BiConsumer<? super I, Throwable> errorConsumer,
        Functions.TriConsumer<? super I, ? super R, Throwable> triConsumer
    ) {
        AtomicReference<CMD> cmdRef = new AtomicReference<>();
        Function<MutableList<I>, Uni<Void>> pipeDefinition = chunk -> redisClient.withConnection(rds ->
            Uni.createFrom().item(chunk)
                .invoke(__ -> cmdRef.set(fromClientToCommands.apply(rds)))
                .flatMap(_chunk ->
                    Uni.join().all(_chunk.collect(item -> {
                        Uni<R> uni = cmdRef.get() == null
                            ? fromClient.apply(rds, item)
                            : fromCommands.apply(cmdRef.get(), item);

                        return consume(item, uni, resultConsumer, errorConsumer, triConsumer);
                    })).andCollectFailures()
                ).replaceWithVoid()
        );

        // each chunk acquires a single connection. Chunk size == `batchSize`
        // each connection has up to `maxEnqueuedCmd` handlers.
        return Multi.createFrom().iterable(Lists.adapt(items).chunk(batchSize))
            .onItem().invoke(chunk -> Log.debugf("Redis pipe connection chunk size: %s", chunk.size()))
            .onItem().transformToUniAndMerge(batch -> pipeDefinition.apply(batch.toList()))
            .onFailure().invoke(e -> Log.errorf("Redis pipe failure: %s", e))
            .toUni();
    }

By the way, there is a very peculiar log we were seeing when application halted; it looked like this:

2018-12-30 10:06:53.286  DEBUG 22878 --- [nioEventLoopGroup-2-7] io.netty.channel.nio.NioEventLoop : Selector.select() returned prematurely 6 times in a row; rebuilding Selector ...

We saw multiple of those logs above (in DEBUG mode) and then application just halted. Maybe this log might provide some ideas.

As per acknowledgment it's happening in HistoryAcknowledger class. I don't think we have missed any acknowledgment before.

PyAntony commented 1 year ago

I have been unable to reproduce the issue locally. When I load Redis locally I do get a bunch of error logs:

ERROR [io.ver.red.cli.imp.RedisConnectionManager] (vert.x-eventloop-thread-0) Unhandled Error: java.io.IOException: Connection reset by peer

Also kafka consumption keeps working, messages keep piling up, and Redis keeps processing messages written to that caches (I see this by using MONITOR in redis-cli).

In our environment (with AWS redis cluster) we see many of the following logs when application is about to halt (not in any specific order):

{"jsonMessage": ..., "succeeded": true} # simply indicates the file was downloaded and processed sucesfully
... 
...
2023-03-14 22:02:14,708 DEBUG [io.ver.cor.net.imp.ConnectionBase] (vert.x-eventloop-thread-2) The connection will be closed due to timeout
2023-03-14 22:02:14,708 DEBUG [io.ver.cor.net.imp.ConnectionBase] (vert.x-eventloop-thread-2) The connection will be closed due to timeout
...
...
2023-03-14 22:03:13,043 DEBUG [io.net.cha.nio.NioEventLoop] (vert.x-eventloop-thread-2) Selector.select() returned prematurely 7 times in a row for Selector io.netty.channel.nio.SelectedSelectionKeySetSelector@74d01459.
2023-03-14 22:03:17,525 DEBUG [io.net.cha.nio.NioEventLoop] (vert.x-eventloop-thread-2) Selector.select() returned prematurely 3 times in a row for Selector io.netty.channel.nio.SelectedSelectionKeySetSelector@74d01459.
2023-03-14 22:03:17,527 DEBUG [io.net.cha.nio.NioEventLoop] (vert.x-eventloop-thread-2) Selector.select() returned prematurely 4 times in a row for Selector io.netty.channel.nio.SelectedSelectionKeySetSelector@74d01459.
...
...
2023-03-14 22:04:37,410 INFO ... (smallrye-kafka-consumer-thread-0) Single consumer lag for group 'downloader-msg-topic': 73
2023-03-14 22:04:52,009 INFO ... (executor-thread-52) Throughput, periodSec(15.0): {downloader=ThroughputResult{in=1655, out=0, remaining=3000, processedPerSec=0.0, processedPerPeriod=0.0}, redis=ThroughputResult{in=0, out=0, remaining=384, processedPerSec=0.0, processedPerPeriod=0.0}, uploader=ThroughputResult{in=0, out=0, remaining=3000, processedPerSec=0.0, processedPerPeriod=0.0}}

Notice the final ThroughputResult log. We always end up with 3000 items stuck in the downloader stream, 3000 items stuck in the s3 uploader stream, and some items stuck in the redis stream (we have set concurrency of 3000 by calling transformToUni(...).merge(3000) ). This indicates the redis subscriber is not requesting messages anymore. We have waited up to 30min. and nothing moves, and there is o warning or error log... When calling MONITOR trough redis-cli we see no more items being processed for the keys that we were writing to. We also see high number of Redis connection in the AWS dashboard; these connections are not moving:

Conections

Again, I am able to replicate this issue in our environment consistently.

cescoffier commented 1 year ago

Setting the concurrency to 3000 but not configuring the redis client to have that level of concurrency will create a bottleneck. You would need to increase the number of connections in your redis client. But 3000 seems to be a lot.

PyAntony commented 1 year ago

Now I now what was happening in the Redis server (AWS ElastiCache). Basically requests are going to both master and replica nodes (quarkus.redis.replicas=share). Connections to replica nodes exist in readonly mode to avoid the MOVE exception (https://github.com/vert-x3/vertx-redis-client/issues/160). When a client uses one of these connections the replica node returns the response correctly, but this causes an unavoidable effect: the client gets reset and a new connection pool is created with every response (or new connections are created). Meanwhile, on the server side, these readonly connections are never closed, they keep growing indefinitely until hitting some sore of limitation where the server just stops serving connections altogether for the overloaded node. I believe this is a bug in the Redis Vertx client; I will report this to them...

This issue is resolved by simple passing quarkus.redis.replicas=never (although this reduces the read capacity since all requests would go to master only).

But the reason why I asked here is to understand why the reactive stream behaves that way: items sitting idle in the queue forever; I have waited up to 30min. and nothing else moves through the stream after the redis server gets "clogged" with open connections. I would expect all sort of timeouts and exceptions, but items should keep flowing through the application finalizing with errors. What we see is the Redis stream subscriber simply stops requesting items (without the stream being full and without terminating the subscription)... any final insights and/or ideas as to how this bottleneck is actually happening in the stream at the smallrye-reactive-messaging code level? I am totally clueless about this behavior.

cescoffier commented 1 year ago

Because the connections do nothing / get stuck, there is no request for items anymore. Thus, it's just waiting for requests. From a reactive streams / back-pressure point of view, this is the expected behavior. Sending more would harm the system.

When doing such kind of system, you need to monitor the outbound (your redis pool) and implement a readiness/liveness check for it.

PyAntony commented 1 year ago

thank you!