Reactor RabbitMQ

Cannot use stream retry mechanics to retry on connection failure #94

Closed philsttr closed 5 years ago

philsttr commented 5 years ago

Similar to #43, the caching of the resourceManagementChannelMono is preventing reconnection attempts because it always returns the original exception.

Consider the following example:

sender.declare(queue("foo"))
    .retry()
    .subscribe()

When executed with RabbitMQ down, the example attempts to connect only once, fails with an exception, and then every retry attempt returns the same exception without trying to reconnect.

This is because the resourceManagementChannelMono is cached (via cache()), and the default connectionMono is also cached.

This is a contrived example, but in general I would like to be able to use the standard Reactor retry operators to perform retries on startup.
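
To illustrate the underlying problem with plain Reactor (a minimal sketch, independent of reactor-rabbitmq): once cache() has stored an error, every resubscription triggered by retry() just replays that error instead of re-running the source.

Mono<String> failing = Mono.fromCallable(() -> {
    System.out.println("attempting...");
    throw new IllegalStateException("broker down");
});

// cache() stores the first error signal, so retry() only replays it
Mono<String> cached = failing.cache();
cached.retry(3)
      .onErrorReturn("gave up")
      .block();
// "attempting..." is printed only once, despite the three retries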

pmackowski commented 5 years ago

There are at least two ways of providing a custom resourceManagementChannelMono:

Mono<Channel> monoChannel = ... 

// when Sender is created
Sender sender = RabbitFlux.createSender(
  new SenderOptions().resourceManagementChannelMono(monoChannel)
);
sender.declare(queue("foo")).subscribe();

// or when declaring queue
sender.declare(queue("foo"), new ResourceManagementOptions().channelMono(monoChannel))
        .subscribe();

You can now define monoChannel without the cache operator, and retry should work:

Mono<Channel> monoChannel =
        monoConnection.map(connection -> {
            try {
                return connection.createChannel();
            } catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        });
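
The monoConnection used above is assumed to be an uncached connection Mono, along these lines (connection name and scheduler choice are illustrative):

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();

// not cached, so every subscription attempts a fresh connection
Mono<Connection> monoConnection = Mono
        .fromCallable(() -> connectionFactory.newConnection("resource-management"))
        .subscribeOn(Schedulers.boundedElastic());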

Because caching is the best approach in most cases, here is a suggestion for a monoChannel that caches only the success case:

// keep successful channels indefinitely, but do not cache errors or empty completions
Function<Channel, Duration> ttlForValue = channel -> Duration.ofMillis(Long.MAX_VALUE);
Function<Throwable, Duration> ttlForError = throwable -> Duration.ofMillis(0);
Supplier<Duration> ttlForEmpty = () -> Duration.ofMillis(0);

Mono<Channel> monoChannelCache = monoChannel
        .cache(ttlForValue, ttlForError, ttlForEmpty);
philsttr commented 5 years ago

Thanks @pmackowski. That is indeed the workaround I have implemented.

Having said that, I think there is room for improvement here. I view this solution as a workaround for bad default behavior rather than a true solution. I feel that retries should be supported by default, since they are a very common requirement for anyone building a resilient system.

As you can see from your example, not only do you have to specify a channel mono, but you also have to manually create the connectionMono (since you can't use the default connectionMono provided by Sender). And since you can't rely on the default connectionMono, you also cannot rely on the Sender creating and managing the lifecycle of the default channelSubscriptionScheduler.

Therefore, to enable retries to work, I have to:

  1. Define a connectionSubscriptionScheduler, and manage its disposal (similar to how Sender does by default)
  2. Define a connectionMono that uses the connectionSubscriptionScheduler (similar to how Sender does by default, but only cache successes)
  3. Define a resourceManagementChannelMono that uses the connectionMono (similar to how Sender does by default, but only cache successes)

All of which entail duplicating some of the behavior of Sender; see the rough sketch below.
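
Roughly, the duplicated setup looks like this (a hedged sketch only; connectionFactory is assumed to exist, and the success-only caching mirrors the earlier cache(ttlForValue, ttlForError, ttlForEmpty) suggestion rather than the library's defaults):

// 1. a scheduler I now have to dispose myself on shutdown
Scheduler connectionSubscriptionScheduler = Schedulers.newSingle("sender-connection");

// 2. a connection Mono that caches successes only
Mono<Connection> connectionMono = Mono
        .fromCallable(() -> connectionFactory.newConnection("sender"))
        .subscribeOn(connectionSubscriptionScheduler)
        .cache(c -> Duration.ofMillis(Long.MAX_VALUE), t -> Duration.ZERO, () -> Duration.ZERO);

// 3. a resource management channel Mono that caches successes only
Mono<Channel> resourceManagementChannelMono = connectionMono
        .map(connection -> {
            try {
                return connection.createChannel();
            } catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        })
        .cache(ch -> Duration.ofMillis(Long.MAX_VALUE), t -> Duration.ZERO, () -> Duration.ZERO);

Sender sender = RabbitFlux.createSender(new SenderOptions()
        .connectionMono(connectionMono)
        .resourceManagementChannelMono(resourceManagementChannelMono));

// on shutdown: sender.close(), then connectionSubscriptionScheduler.dispose()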

Also note that steps 1 and 2 are required if you want retries to work for sender.send and receiver.consume, since the default connectionMono is also cached; this is not specific to the resourceManagementChannelMono used by sender.declare.

I feel this is a bit much for enabling behavior that I think should be supported by default.

In any case, if no change in behavior is made, this should be well documented, since retrying on initial failure is such a common use case.

pmackowski commented 5 years ago

@philsttr I see your point. Providing a custom channelMono and managing its lifecycle is too much in this case.

I would rather remove caching of the resourceManagementChannelMono and add options for cache and retry policy to ResourceManagementOptions.

Sender sender = RabbitFlux.createSender();

// no cache, no retry
sender.declare(queue);

sender.declare(QueueSpecification.queue(), new ResourceManagementOptions()
        .retry(retry options)
        .cache(cache options)
);

It should be fine, with one exception: at the moment I cannot see a simple solution for evicting the cache in case the queue is deleted later on (e.g. by an administrator). AFAIK there is no resource notification similar to consumer cancel.

acogoluegnes commented 5 years ago

@philsttr I think this is a reasonable suggestion; retrying the connection is a common scenario that should be manageable without too much fuss. Precise semantics are yet to be defined, though. @pmackowski Thanks for participating in the discussion.

acogoluegnes commented 5 years ago

@philsttr @pmackowski I pushed #100 to provide a hook to be able to configure the Mono<Connection>. This would be a good place to configure retry without losing the boilerplate the sender/receiver does when creating the Mono<Connection>.

After some thinking, I don't think retry should be enabled by default. Some applications may want to fail as fast as possible, and retry would prevent that. Moreover, it would be impossible to find reasonable defaults (how many retry attempts, what kind of backoff policy, what time between attempts, etc.).

#100 should make retry configuration easy; please have a look and tell me what you think. Thanks!
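
For example, something along these lines, assuming the hook ends up as a connectionMonoConfigurer on SenderOptions that decorates the Mono<Connection> built by the Sender (name and signature are taken from this discussion, not verified against the merged API; the backoff policy is purely illustrative):

SenderOptions senderOptions = new SenderOptions()
        .connectionFactory(connectionFactory)
        // assumed hook from #100: decorate the connection Mono the Sender builds
        .connectionMonoConfigurer(cm -> cm.retryWhen(
                Retry.backoff(10, Duration.ofSeconds(1))));

Sender sender = RabbitFlux.createSender(senderOptions);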

acogoluegnes commented 5 years ago

@philsttr Any thoughts on my comment above?

philsttr commented 5 years ago

(Apologies for delay, I've been travelling for a few weeks, and just got back today.)

I agree that connection attempts should not be retried by default. To clarify my earlier comment, I was hoping that retries would be easily supported by default by just configuring retry somewhere (and not also have to jump through hoops to disable caching of errors, etc).

Having said that, I'm still trying to process the change you have made to see exactly how to take advantage of it.

Here is a gist of what I had previously implemented for a request/response style pattern using the existing APIs (while considering this issue, along with #93). The implementation in the gist retries on failure, caches only on success, uses a single connection for the sender (with separate channels for resource management and consumption), and uses a separate connection for the receiver.

Regarding:

At the moment I cannot see a simple solution for evicting the cache in case the queue is deleted later on (e.g. by an administrator). AFAIK there is no resource notification similar to consumer cancel.

I noticed that when a queue is deleted while we are consuming, the consumer Flux completes. Therefore I added a repeat operator in my processor. This results in redeclaring the queue (because our admins should not be deleting queues that are still being consumed).
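
For illustration, a minimal sketch of that pattern (queue name, ack mode, and the process handler are assumptions): when the consume Flux completes, repeat() resubscribes, which redeclares the queue before consuming again.

sender.declare(QueueSpecification.queue("foo"))
        .thenMany(receiver.consumeAutoAck("foo"))
        .repeat()   // consume Flux completes when the queue is deleted; resubscribe and redeclare
        .subscribe(delivery -> process(delivery));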

I'm actually not sure if the change in #100 will simplify my use case. I believe if I were to take advantage of the new connectionMonoConfigurer, I would need to configure retries in multiple places... once for the connectionMono, and again to handle errors from anything downstream (e.g. queue declaration if an exchange doesn't exist yet or something).

I don't have a perfect solution to recommend. Although, I think the main problem is that errors are cached by default, with no easy way to disable that behavior. Maybe you can take a look at my implementation and get some ideas?

acogoluegnes commented 5 years ago

@philsttr I added https://github.com/reactor/reactor-rabbitmq/pull/100/commits/0ba6045032fe6d8ba2c44a10d3abbb5b64a4870c to #100 to cache connections and channels only on success. I think this is a reasonable default: it does no harm and makes retry work (at least downstream).

Combined with the connection mono configuration hook, this should simplify some parts of your sample:

Retry would still need to be set the same way, but this is a start :) Let me know what you think.
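
With failures no longer cached, a bounded retry on the declare itself should now behave as intended, for instance (retry policy purely illustrative):

sender.declare(queue("foo"))
        .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))  // each attempt re-subscribes and actually reconnects
        .subscribe();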

And thanks for the gist, it's very helpful.

philsttr commented 5 years ago

Thanks @acogoluegnes ! I think that not caching failures (along with the previous fix to #93) will simplify my code a lot as you have mentioned.

Bonus points regarding:

doOnNext callback can be specified with the connectionMonoConfigurer.

I almost don't need that anymore; I'm only using it to track the connections I need to close on shutdown. But thanks to the fix for #93, I can now use a connectionSupplier and let the Sender/Receiver create the connectionMono. That causes the sender/receiver's hasConnection to be true, so the sender/receiver will close the connection when it is closed.
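
For context, this is roughly what that looks like (a hedged sketch; the exact connectionSupplier signature from #93 is assumed here, not checked against the released API):

SenderOptions senderOptions = new SenderOptions()
        .connectionFactory(connectionFactory)
        // assumed shape of the #93 hook: the Sender builds the connectionMono itself,
        // so hasConnection is true and sender.close() also closes the connection
        .connectionSupplier(cf -> cf.newConnection("my-sender"));

Sender sender = RabbitFlux.createSender(senderOptions);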

The reason I say "almost" is that the close logic of Sender/Receiver does not quite match my desired behavior.

What I want:

What the sender/receiver do currently:

Do you think we can adjust the closing behavior somehow? Maybe by allowing an optional timeout in SenderOptions/ReceiverOptions, and disposing schedulers even if connection close fails.

acogoluegnes commented 5 years ago

@philsttr Yes, the closing sequence should be fixed, and a closing timeout with a reasonable default (30 seconds, maybe) can be added. I created two follow-up issues, #103 and #104. I'll include the fixes in #100.

acogoluegnes commented 5 years ago

@philsttr #100 now contains the connection closing timeout (with a default of 30 seconds instead of no timeout before) and the fix for close() in Sender and Receiver.

If you think that fits your needs, I'll merge the PR.

Note I had to convert Mono/Flux#transform calls to Mono/Flux#composeNow because Mono/Flux#transform is deprecated in Reactor 3.3.

/cc @pmackowski

acogoluegnes commented 5 years ago

Fixed in #100 by not caching failures.

acogoluegnes commented 5 years ago

@philsttr FYI, it appears Mono/Flux#transform won't be deprecated in Reactor 3.3 (it's deprecated in the current snapshots, but this will go away soon).

/cc @simonbasle @pmackowski