Adapt behavior for non-recoverable connections/channels #143

Open acogoluegnes opened 4 years ago

acogoluegnes commented 4 years ago

Following up on #142.

The code assumes in several places that connections and channels are recoverable. This does not play well when the connections (and the channels) are not.

https://github.com/reactor/reactor-rabbitmq/blob/fcb27c98bdd61c338cce9df1b6fbe2853504dfb5/src/main/java/reactor/rabbitmq/Receiver.java#L159-L165

The complete signal should always be generated, because a non-recoverable channel would not recover by itself.

https://github.com/reactor/reactor-rabbitmq/blob/fcb27c98bdd61c338cce9df1b6fbe2853504dfb5/src/main/java/reactor/rabbitmq/Receiver.java#L83

The connectionMono should not be cached because, again, a non-recoverable connection will not recover: a new instance must be provided by the connectionMono. This would allow using the connectionMonoConfigurator to provide some retry logic instead of providing a full connectionMono.
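To make the "no caching, new instance each time" point concrete, here is a minimal plain-Java sketch. It does not use the Reactor or RabbitMQ types: `FakeConnection` and `connectWithRetry` are hypothetical names made up for this illustration, and the retry loop only stands in for whatever retry logic one would plug in through the connectionMonoConfigurator.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class FreshConnectionSketch {

    /** Hypothetical stand-in for a non-recoverable connection (not the RabbitMQ client type). */
    static final class FakeConnection {
        final int id;
        FakeConnection(int id) { this.id = id; }
    }

    /**
     * Calls the factory until it succeeds or maxAttempts is reached.
     * Nothing is cached: every call to this method asks the factory for a new instance.
     */
    static <T> T connectWithRetry(Callable<T> factory, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return factory.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // all attempts failed
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Assumed factory: fails on the first call, then hands out a new connection each time.
        Callable<FakeConnection> factory = () -> {
            int n = calls.incrementAndGet();
            if (n == 1) {
                throw new IOException("broker not reachable yet");
            }
            return new FakeConnection(n);
        };
        FakeConnection first = connectWithRetry(factory, 3);
        FakeConnection second = connectWithRetry(factory, 3);
        System.out.println(first.id + " " + second.id); // two distinct instances
    }
}
```

The point of the sketch is only that a second request for a connection goes back to the factory rather than returning the earlier (possibly dead) instance.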

There may be other places where checking whether the connection/channel is recoverable would be appropriate.

a701440 commented 4 years ago

There is one more item that needs checking. Every "send" operation in the Sender is concluded by closing the channel by default (default implementation of the channelCloseHandler).

.doFinally(st -> channelCloseHandler.accept(st, channel))

It's not clear how that is supposed to interact with Mono, etc. Also why close after every send? Most of the time people send many messages.

a701440 commented 4 years ago

One more item I noticed is that default SendOptions uses exceptionHandler with ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE. This also creates some "effects" when using retry with connections.

acogoluegnes commented 4 years ago

> There is one more item that needs checking. Every "send" operation in the Sender is concluded by closing the channel by default (default implementation of the channelCloseHandler).
>
> .doFinally(st -> channelCloseHandler.accept(st, channel))
>
> It's not clear how that is supposed to interact with Mono, etc. Also why close after every send? Most of the time people send many messages.

The channel is closed when the flux terminates, so indeed on each send call, but not on every outbound message. The library assumes fluxes are long-lived or even infinite.

acogoluegnes commented 4 years ago

> One more item I noticed is that default SendOptions uses exceptionHandler with ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE. This also creates some "effects" when using retry with connections.

What do you suggest? I don't think we can disable retry by default if the channel is not recoverable, as it could break existing code (it's unlikely, but still). Do you still retry when using non-recoverable connections? If so, I guess you provide your own exception handler (that's what options are for). If not, again, you provide your own exception handler, using the options the way they're meant to be used.

I don't see any default behavior that would fit recoverable and non-recoverable resources at the same time in this case.

If you have suggestions, I'm open to the introduction of ready-to-use utilities that would fit most non-recoverable resources use cases though.

a701440 commented 4 years ago

I am not quite sure what the best API would be. For example, when using sender.send with a Flux of messages, it's hard to get a success/failure confirmation for each specific message. send returns then(), so the only way to get the RabbitMQ error is to install an exception handler, and it's hard to get positive confirmation that a message was sent. So in most cases I see code using sender.send(Mono.just(message)), which allows subscribing to the OK and error conditions. Maybe a better way would be for send to return a publisher of results, with an OK or an Exception for each incoming message.
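To illustrate the shape of result I have in mind, here is a rough plain-Java sketch. It is not reactor-rabbitmq API: `SendResult`, `PublishCall`, and `sendAll` are hypothetical names, and a plain list stands in for the publisher of results that send would return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerMessageResultSketch {

    /** Hypothetical result type: the message, plus the failure or null on success. */
    static final class SendResult<M> {
        final M message;
        final Exception error;

        SendResult(M message, Exception error) {
            this.message = message;
            this.error = error;
        }

        boolean isOk() {
            return error == null;
        }
    }

    /** Hypothetical stand-in for the publish call, which may fail per message. */
    interface PublishCall<M> {
        void publish(M message) throws Exception;
    }

    /** Publishes every message and records one result per message instead of failing the whole batch. */
    static <M> List<SendResult<M>> sendAll(Iterable<M> messages, PublishCall<M> call) {
        List<SendResult<M>> results = new ArrayList<>();
        for (M message : messages) {
            try {
                call.publish(message);
                results.add(new SendResult<>(message, null));
            } catch (Exception e) {
                results.add(new SendResult<>(message, e));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Assumed publish call that rejects one particular message.
        PublishCall<String> call = m -> {
            if (m.equals("bad")) {
                throw new IllegalStateException("publish failed");
            }
        };
        for (SendResult<String> r : sendAll(Arrays.asList("a", "bad", "c"), call)) {
            System.out.println(r.message + " -> " + (r.isOk() ? "OK" : r.error.getMessage()));
        }
    }
}
```

With something like this, the caller can correlate each outcome with its message without wrapping every message in its own Mono.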

acogoluegnes commented 4 years ago

You mean something like sendWithPublishConfirm, but where the returned flux would just tell whether the basicPublish method failed or not? This is indeed more reactive-like, I guess, but it won't provide as strong a guarantee as the publish confirm flavor. I think this is worth considering. In theory, we could change the signature to return something, it's not a breaking change.

a701440 commented 4 years ago

Probably yes. In addition, the Mono<Void> ( then() ) that is currently returned is "hard" to work with in Reactor. It does not trigger doOnSuccess or subscribe(ok -> {}) handlers since there is no "Void" instance.

acogoluegnes commented 4 years ago

My mistake, it would be a breaking change, so it'll have to go in 2.0.

acogoluegnes commented 4 years ago

I created a follow-up issue, #146. Don't hesitate to post some ideas about what the returned flux should contain.