
Reactor RabbitMQ
Apache License 2.0

Channels are leaking #20

Closed acogoluegnes closed 5 years ago

acogoluegnes commented 6 years ago

From: https://gitter.im/reactor/reactor?at=5b314d63479ca266897ee89b

Still need to investigate to know whether it's legitimate or not.

Hello people, I'm using reactor-rabbitmq 1.0.0.M2 to interact (quite a lot!) with RabbitMQ. I need to create a number of streams (around 300 of them, and fast), each of which creates a unique queue and consumes from it, but I also need to dispose of them on demand or when they become "idle" (timeout). So suppose I have two methods, one creating and one disposing:

public void createDisposable(String uuid) {
    String queueName = "somePrefix" + uuid;

    Disposable disposable = sender.declareQueue(QueueSpecification.queue().name(queueName).durable(true))
            .flatMap(qOk -> sender.bind(BindingSpecification.binding()
                    .exchange("someExchange1")
                    .queue(queueName)
                    .routingKey("uuid")))
            .thenMany(receiver.consumeAutoAck(queueName))
            .timeout(Duration.of(60, ChronoUnit.SECONDS))
            .filter(delivery -> delivery.getBody().length != 0)
            .flatMap(delivery -> {
                try {
                    return Flux.just((HashMap<String, String>) objectMapper.readValue(delivery.getBody(), HashMap.class));
                } catch (IOException e) {
                    return Flux.empty();
                }
            })
            .doOnNext(myHashMap -> {
                // do some calculations
            })
            .flatMap(myHashMap -> sender.send(
                    Mono.just(new OutboundMessage("someExchange2", "", "someRandomString".getBytes())))
                    .then(Mono.just(myHashMap)))
            .subscribe(next -> log.info("Whatever"));

    runningDisposables.put(uuid, disposable);
}

runningDisposables is a ConcurrentHashMap<String, Disposable> where I keep my disposables so I can look them up by uuid. This seems to work fine with a few streams, but with a lot of them I have a problem: many of the channels (around 60% of my streams) used to interact with RabbitMQ are left open (lingering), and they won't go away unless I stop my application or force-close the connection. AFAIK calling disposable.dispose() is supposed to close the channel (and in some tests I did, the same happened when the timeout was reached). So any thoughts on what I am doing wrong?
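The dispose-should-release-resources contract described above can be sketched with plain reactor-core, no broker involved. This is a minimal illustration, not the reactor-rabbitmq implementation: Flux.interval is a hypothetical stand-in for receiver.consumeAutoAck, and doFinally marks the hook where channel cleanup would be expected to run on cancellation.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class DisposeDemo {
    static final Map<String, Disposable> runningDisposables = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        // Infinite source standing in for receiver.consumeAutoAck(queueName).
        Disposable disposable = Flux.interval(Duration.ofMillis(10))
                .doFinally(signal -> {       // runs on complete, error, or cancel
                    cleanedUp.set(true);     // closing the channel would belong here
                    done.countDown();
                })
                .subscribe();
        runningDisposables.put("some-uuid", disposable);

        Thread.sleep(50);
        runningDisposables.remove("some-uuid").dispose(); // cancels upstream
        done.await();
        System.out.println("cleanedUp=" + cleanedUp.get());
    }
}
```

In this toy case the cleanup hook fires reliably on dispose(); the question in this issue is whether the library's channel-closing hook behaves the same way under load.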

alexreve commented 6 years ago

Hello again. I didn't get to work on reproducing the issue in a simple system (yet), but I think I managed to get rid of it in a way that might help point out the problem (if there is one!).

What seems to have worked is this: instead of producing to RabbitMQ with the sender at the end of the stream, I delegated that work to another stream using a WorkQueueProcessor.

So the example above would look like this:

public void createDisposable(String uuid) {
    String queueName = "somePrefix" + uuid;

    Disposable disposable = sender.declareQueue(QueueSpecification.queue().name(queueName).durable(true))
            .flatMap(qOk -> sender.bind(BindingSpecification.binding()
                    .exchange("someExchange1")
                    .queue(queueName)
                    .routingKey("uuid")))
            .thenMany(receiver.consumeAutoAck(queueName))
            .timeout(Duration.of(60, ChronoUnit.SECONDS))
            .filter(delivery -> delivery.getBody().length != 0)
            .flatMap(delivery -> {
                try {
                    return Flux.just((HashMap<String, String>) objectMapper.readValue(delivery.getBody(), HashMap.class));
                } catch (IOException e) {
                    return Flux.empty();
                }
            })
            .doOnNext(myHashMap -> {
                // do some calculations
            })
            .doOnNext(myHashMap -> producingProcessor.onNext(new OutboundMessage("someExchange2", "", "someRandomString".getBytes())))
            .subscribe(next -> log.info("Whatever"));

    runningDisposables.put(uuid, disposable);
}

and the part that produces to RabbitMQ happens in another stream that looks like this:

public void enableProducerStream() {
    producingProcessor.publish()
            .autoConnect()
            .flatMap(outboundMessage -> sender.send(Mono.just(outboundMessage)).then(Mono.just(outboundMessage)))
            .subscribe(outboundMessage -> log.info("Produced"));
}

Now, I'm new to Reactor, so I don't have a solid understanding of it yet, but I have a hypothesis:

This might happen in a traffic-heavy system when the whole disposable gets disposed while messages are still being processed. Maybe some part (the sender, I guess) doesn't receive the "dispose signal" properly while it is simultaneously trying to produce to RabbitMQ, so it never closes the channel.
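WorkQueueProcessor was deprecated in later Reactor releases; the same hand-off idea can be sketched with a long-lived Sinks.Many (a hypothetical stand-in, assuming reactor-core 3.4+), with plain strings in place of OutboundMessage and a list in place of sender.send. Many short-lived consumer streams emit into one shared sink, and a single always-on producer stream is the only subscriber that publishes:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import reactor.core.publisher.Sinks;

public class SharedProducerDemo {
    public static void main(String[] args) throws Exception {
        // Long-lived sink standing in for the shared WorkQueueProcessor.
        Sinks.Many<String> outbound = Sinks.many().multicast().onBackpressureBuffer();
        List<String> sent = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(3);

        // Single producer stream, subscribed once for the application's lifetime;
        // in the original this is where sender.send(...) is called.
        outbound.asFlux()
                .doOnNext(msg -> { sent.add(msg); latch.countDown(); })
                .subscribe();

        // Short-lived consumer streams push into the sink instead of
        // calling sender.send(...) themselves, so disposing them never
        // touches the producing stream.
        outbound.tryEmitNext("m1");
        outbound.tryEmitNext("m2");
        outbound.tryEmitNext("m3");

        latch.await();
        System.out.println(sent);
    }
}
```

The point of the design, as described above, is that disposing any of the per-uuid streams cancels only that stream; the publishing path lives in a subscription that is never disposed, which is consistent with the workaround removing the lingering channels.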

acogoluegnes commented 6 years ago

@alexreve Thanks for the follow-up; happy to see you managed to make it work properly. I'll leave this issue open for further investigation later.

acogoluegnes commented 5 years ago

No update in about a year, closing.