Closed oscarfh closed 2 years ago
There were circumstances in which I couldn't get the Autorecover Channels to bring things back to the correct state at the right time. I'm not sure now what they were; I think they had to do with multiple clients coming back to a replaced (clean) server, introducing race conditions that I couldn't work around.
When I did my work on the rabbit publisher the default value for reconnect attempts was adequate to make it retry, but RabbitMQOptions was subsequently changed to derive from NetClientOptions. I think that was a mistake, because I think the vertx generators are broken (#112). The workaround is to ensure that there is an adequately large value for the reconnect attempts.
The new way of doing reconnects requires the caller to use addConnectionEstablishedCallback to do whatever it wants to the connection before it is used. If you have to declare rabbit objects it has to be done in a connection established callback.
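As an illustration of the paragraph above, here is a minimal sketch of declaring rabbit objects inside a connection established callback. It assumes the Vert.x 4 `RabbitMQClient` API (`addConnectionEstablishedCallback`, `exchangeDeclare`, `queueDeclare`, `queueBind`); the class name, the exchange name `test_exchange` and the durability flags are hypothetical choices for the example, and `test_topic` is borrowed from the error message later in this thread.

```java
import io.vertx.rabbitmq.RabbitMQClient;

public final class DeclareOnConnect {

    /**
     * Registers a callback that runs every time a connection is established,
     * so a non-durable queue is recreated and re-bound after a reconnect.
     * Names and flags are illustrative assumptions, not taken from the reproducer.
     */
    static void declareOnEveryConnection(RabbitMQClient client) {
        client.addConnectionEstablishedCallback(promise ->
            // exchangeDeclare(name, type, durable, autoDelete)
            client.exchangeDeclare("test_exchange", "fanout", true, false)
                // queueDeclare(name, durable, exclusive, autoDelete)
                .compose(v -> client.queueDeclare("test_topic", false, false, true))
                .compose(declareOk -> client.queueBind(declareOk.getQueue(), "test_exchange", ""))
                // Completing the promise (with success or failure) lets the client carry on.
                .onComplete(promise));
    }
}
```

The method would be called once, before `client.start()`, so that the first connection goes through the same declaration path as every later reconnection.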
It should be possible to ignore the retrying that vertx adds (by having reconnectAttempts = 0 and automaticRecoveryEnabled = true). That should fall back to just using what the Java client provides. This may leave you with a race condition when the auto recovery fails to reconnect, but should be equivalent to what vert.x 3 did. From what you are saying this isn't the case, so I agree that that's a regression.
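For reference, the two configurations discussed so far could be written roughly as below. This is a sketch only: the setter names are the ones used elsewhere in this thread, while the class and method names are hypothetical, and the second mode is the one reported here as not behaving like vert.x 3 at the time of writing.

```java
import io.vertx.rabbitmq.RabbitMQOptions;

public final class ReconnectModes {

    /** Vert.x-level reconnects: objects must be redeclared in a connection established callback. */
    static RabbitMQOptions vertxReconnects() {
        RabbitMQOptions options = new RabbitMQOptions();
        options.setAutomaticRecoveryEnabled(false);
        options.setReconnectAttempts(Integer.MAX_VALUE); // "adequately large" value
        options.setReconnectInterval(500);               // milliseconds between attempts
        return options;
    }

    /** Java-client recovery only: no Vert.x retrying, auto recovery left to the RabbitMQ Java client. */
    static RabbitMQOptions javaClientRecovery() {
        RabbitMQOptions options = new RabbitMQOptions();
        options.setAutomaticRecoveryEnabled(true);
        options.setReconnectAttempts(0);
        return options;
    }
}
```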
In your test are you buffering the messages that you are trying to send whilst the server is down, or accepting that they are lost? There is a unit test (RabbitMQClientPublisherTest.java) that does effectively the same as your repro, but using the new RabbitMQPublisher (and thus using a connection established callback) to buffer messages client side until the server accepts them. There shouldn't be any need to use the RabbitMQPublisher, but that's what it's for.
Been digging around a bit with your repro, and found two things:
I'll try to find time for more digging tomorrow.
After a bit more digging:
I'll do a PR (code done, need to update docs). I don't know what to do about notifying breaking changes, but also I think 4.0.1 should be compatible with 3.9.5, though 4.0.0 isn't.
@YaytayAtWork Thanks a lot! Only now did I have time to read your messages. I think your questions from the first post are now outdated. Let me know if they are still important.
One question from your PR: if I understood right, your approach for recovery requires the connectionEstablishedCallback to be declared so the queue is recreated and re-bound. Is this correct?
Yes, and that is correct.
Ok, thanks a lot. I will wait for your PR to get merged.
@YaytayAtWork I just noticed that your PR was merged. Thanks a lot. What is the process in this project? When is this going to be available?
To me it seems the PR got closed without a merge.
@YaytayAtWork has this been done by accident? Is the issue fixed due to other changes?
You are right, I'll redo the PR.
@YaytayAtWork @vietj any chance this makes it into 4.0.3? I just tried 4.0.1 (because it is marked as milestone 4.0.1) but it does not work.
@oscarfh could you retry with your reproducer so we can confirm this ticket can be closed?
@jekkel I was waiting for the 4.0.3 release, but I will do that using the commit hash.
I am very sorry for the delay, I completely forgot it. I think this issue is still present (I had it in another project). I will update the reproducer.
@vietj Yes, I can confirm this is still an issue. Setting
rabbitMQOptions.setAutomaticRecoveryEnabled(false);
rabbitMQOptions.setReconnectAttempts(Integer.MAX_VALUE);
rabbitMQOptions.setReconnectInterval(500);
as per the docs, will generate
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test_topic' in vhost '/', class-id=60, method-id=20)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
... 1 common frames omitted
Trying to configure anything on rabbitMQClient.getDelegate().addConnectionEstablishedCallback() will prevent the consumer from starting.
ps: I updated the reproducer in https://github.com/oscarfh/vertx-rabbit-regression to show the issue
The reconnections are behaving as expected - with automatic recovery disabled and reconnects enabled you MUST use the connectionEstablishedCallback in order to recreate your objects. So the question is, what are you doing there that prevents the client starting? Can you provide an example?
@Yaytay
The Rxified version of the client does not provide such a method, so I have to use getDelegate().
Adding rabbitMQClient.getDelegate().addConnectionEstablishedCallback(event -> /* your code here, or empty */) will cause the rxStart not to complete.
~I updated the reproducer. If you run the test, you will see the message "Connection established", but not "Declaring exchange" (and the test is blocked waiting for the consumer to connect). Comment out the addConnectionEstablished line and you will see the "Declaring Exchange" message along with tons of other log messages from the test.~
I got it to work. I was missing the event.complete() call.
thanks a lot.
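To make the failure mode concrete, here is a toy model of why a callback that never completes its promise leaves the start pending. It uses only `java.util.concurrent` and is not the vert.x implementation: `CallbackModel`, its methods and the use of `CompletableFuture` in place of the vert.x promise are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Toy stand-in for a client whose start() waits on connection established callbacks. */
public class CallbackModel {
    private final List<Consumer<CompletableFuture<Void>>> callbacks = new ArrayList<>();

    public void addConnectionEstablishedCallback(Consumer<CompletableFuture<Void>> callback) {
        callbacks.add(callback);
    }

    /** The returned future completes only after every callback has completed the promise it was handed. */
    public CompletableFuture<Void> start() {
        CompletableFuture<Void> result = CompletableFuture.completedFuture(null);
        for (Consumer<CompletableFuture<Void>> callback : callbacks) {
            result = result.thenCompose(v -> {
                CompletableFuture<Void> promise = new CompletableFuture<>();
                callback.accept(promise); // the callback is responsible for completing this
                return promise;
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CallbackModel forgetful = new CallbackModel();
        forgetful.addConnectionEstablishedCallback(promise -> { /* never calls complete */ });
        System.out.println(forgetful.start().isDone()); // prints false

        CallbackModel correct = new CallbackModel();
        correct.addConnectionEstablishedCallback(promise -> promise.complete(null));
        System.out.println(correct.start().isDone()); // prints true
    }
}
```

In the model, as in the report above, an empty callback body is enough to block startup; completing the promise at the end of the callback is what releases it.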
Excellent.
@Yaytay One question that arose within my team: is it expected that this method is not exposed by the Rxified API? Or should I open a bug in the generator?
It should definitely be exposed.
Ok, I will create the bug ticket, thanks
This is the result of further research from https://github.com/eclipse-vertx/vert.x/issues/3726. At the end, there is the original post for completeness' sake.
After migrating to Vertx 4, a test that verified that the producers and consumers could reestablish connection after a RabbitMQ failure broke.
These are my findings:
- It is now necessary to call the RabbitMQOptions#setReconnectAttempts method in order to enable retry, which was not the case before (before, using RabbitMQOptions#setAutomaticRecoveryEnabled was enough). This seems to be a breaking change that was not documented.
- RabbitMQClientImpl#restartConsumer changes the way reconnection happens. Unfortunately the new way only works with durable queues. If your queue is not durable, you will get a channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test_topi2' in vhost '/', class-id=60, method-id=20). This seems to be a bug, since the older version would recreate the queue.

I tried to fix the issue myself, but it seems that we do not have the required information at hand in RabbitMQClientImpl#restartConsumer and that it requires a deeper change. @YaytayAtWork, I saw that you were involved in the changes in this method. Can you help me understand why this was created? What didn't work in the previous approach? Is there any way we can reuse the com.rabbitmq.client.impl.recovery.AutorecoveringChannel class capabilities in order to reconnect?

Original Post: