reactor / reactor-rabbitmq

Reactor RabbitMQ

Unclear how to keep the consumer open after an exception is thrown #177

Open jndietz opened 2 years ago

jndietz commented 2 years ago

Documentation Issue

I think it would be good to document how to keep a consumer alive, or how to handle exceptions gracefully within a reactive stream. I came up with a solution, but it doesn't seem correct.

Improvement Suggestion

All of the examples I've found online show the receiver being closed after it has consumed its events. I think a lot of people have use cases where a listener runs continuously and handles messages as they arrive, as opposed to shutting down once everything has been consumed.

Additional context

    Disposable processAuditEvents() {
        final String AUDIT_QUEUE_NAME = "audit"

        receiver
                .consumeAutoAck(AUDIT_QUEUE_NAME)
                .subscribe(m -> {
                    log.debug("Received message {}", new String(m.body))
                    def wrapper = objectMapper.readValue(m.body, WrapperObject)
                    strategies
                            .find { it.handles(wrapper.payload) }
                            .handleEvent(wrapper.payload)
                }, { ex ->
                    log.error("Something bad happened: ${ex.message}")
                    processAuditEvents() // 👈 just re-register the consumer on error 🤷‍♂️
                })
    }
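
For comparison, a possibly more conventional Reactor pattern is to catch failures per message inside the stream, so that one bad message never reaches the subscriber's error callback and the subscription stays open. The following is only a minimal sketch in Java, not verified against reactor-rabbitmq itself: `ResilientConsumer` and `handleEach` are hypothetical names, the handler stands in for the deserialization and strategy lookup above, and the source is a plain `Flux<byte[]>` so the example depends only on reactor-core.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper, for illustration only.
class ResilientConsumer {

    // Runs the handler once per message body. If the handler throws, the error
    // is logged and replaced with an empty Mono, so the outer Flux keeps going
    // instead of terminating on the first failure.
    static Flux<Void> handleEach(Flux<byte[]> bodies, Consumer<String> handler) {
        return bodies.concatMap(body ->
                Mono.fromRunnable(() -> handler.accept(new String(body, StandardCharsets.UTF_8)))
                        .onErrorResume(ex -> {
                            System.err.println("Failed to handle message: " + ex.getMessage());
                            return Mono.empty();
                        })
                        .then());
    }
}
```

With a `Receiver`, the source would presumably be something like `receiver.consumeAutoAck("audit").map(Delivery::getBody)`, and errors coming from the source itself (for example a lost connection) could be handled by adding `.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)))` before `subscribe()`. That wiring is an assumption and is untested here. Note that with auto-ack, a message whose handler fails is simply dropped.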

Instead of using a Receiver, I ended up falling back to the annotation-driven style of listening, and this appears to work well enough for me.

    @RabbitListener(queues = "audit", ackMode = "MANUAL")
    Mono<Void> listen(CustomerActivityEventWrapper wrapper) {
        log.debug("Received message {}", wrapper.toString())
        strategies
                .find { it.handles(wrapper.payload) }
                .handleEvent(wrapper.payload)
    }

matsev commented 6 days ago

Hi @jndietz! 👋

Did you find a solution to this problem, or did you manage to work around it? We have a problem that occasionally causes our receiver to terminate, and we would like to mitigate it; cf. #186.