wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License

Consumer won't reconnect #47

Closed nicklaros closed 2 years ago

nicklaros commented 2 years ago

Hi,

When the network was disconnected for a minute or so and then came back up, I got this log:

gorabbit: rabbit consumer goroutine closed

After that, the consumer stops receiving new messages. Why won't it reconnect?

I'm using version 0.6.2, and my code is (more or less) like this

rabbitmqConsumer, err := rabbitmq.NewConsumer(config.constructURL(), amqp.Config{}, rabbitmq.WithConsumerOptionsLogging)
...
rabbitmqConsumer.StartConsuming(
  func(message rabbitmq.Delivery) bool {
    return true
  },
  "",
  []string{""},
  func(options *rabbitmq.ConsumeOptions) {
    options.QueueExclusive = true
    options.ConsumerExclusive = true
    options.QueueDurable = true
    options.BindingExchange = &rabbitmq.BindingExchangeOptions{
      Name:    "my_rabbitmq_topic",
      Kind:    "fanout",
      Durable: true,
    }
  },
)
nicklaros commented 2 years ago

https://github.com/wagslane/go-rabbitmq/blob/4d14909a3fed0e3a1466b7eb3818b6050f803348/consume.go#L259

It looks like the msgs channel is closed, which causes it to print the log from this line:

https://github.com/wagslane/go-rabbitmq/blob/4d14909a3fed0e3a1466b7eb3818b6050f803348/consume.go#L282

So how do I make it start consuming again?

miketonks commented 2 years ago

Any update on this? Does the library handle Automatic reconnection as stated in the README or not?

I'm looking for a library that does this, so if we can use this one that would be ideal, but need to know if it works.

wagslane commented 2 years ago

It is supposed to yes, I haven't had time to dive in on this bug yet. I'll try to take a look this week

miketonks commented 2 years ago

Thanks. It does seem to reconnect fine.

I did a simple test by running a RabbitMQ server in a Docker container. After restarting the container, the consumer reconnected automatically:

https://github.com/wagslane/go-rabbitmq/blob/4d14909a3fed0e3a1466b7eb3818b6050f803348/channel.go#L61

2021/11/29 17:14:52 consumed: Hello World!
2021/11/29 17:14:53 consumed: Hello World!
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: attempting to reconnect to amqp server after close
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: waiting 1s seconds to attempt to reconnect to amqp server
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:58 gorabbit: rabbit consumer goroutine closed
2021/11/29 17:14:59 gorabbit: error reconnecting to amqp server: Exception (501) Reason: "read tcp 127.0.0.1:54752->127.0.0.1:5672: read: connection reset by peer"
2021/11/29 17:14:59 gorabbit: waiting 2s seconds to attempt to reconnect to amqp server
2021/11/29 17:15:01 gorabbit: error reconnecting to amqp server: dial tcp 127.0.0.1:5672: connect: connection refused
2021/11/29 17:15:01 gorabbit: waiting 4s seconds to attempt to reconnect to amqp server
2021/11/29 17:15:05 gorabbit: error reconnecting to amqp server: Exception (501) Reason: "read tcp 127.0.0.1:54858->127.0.0.1:5672: read: connection reset by peer"
2021/11/29 17:15:05 gorabbit: waiting 8s seconds to attempt to reconnect to amqp server
2021/11/29 17:15:13 gorabbit: successfully reconnected to amqp server after close
2021/11/29 17:15:13 gorabbit: consume cancel/close handler triggered. err: Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
2021/11/29 17:15:13 gorabbit: waiting 1s seconds to attempt to start consumer goroutines
2021/11/29 17:15:14 gorabbit: Processing messages on 10 goroutines
2021/11/29 17:15:14 consumed: Hello World!
2021/11/29 17:15:14 consumed: Hello World!

Looks good to me 👍

wagslane commented 2 years ago

Thanks for running that little test! I'll still take a look at the above report, but this helps me check some boxes :)


wagslane commented 2 years ago

@nicklaros Do you ever see these logs?

attempting to reconnect to amqp server after close
waiting Xs seconds to attempt to reconnect to amqp server

We do an exponential backoff by default, and I'm wondering if with a long disconnect time you were getting backoff retries with 2 min+ wait times.

You might not be seeing the logs if you aren't using rabbitmq.WithConsumerOptionsLogging

phillip-o commented 2 years ago

I'm having trouble with reconnects too on v0.7.0. Simulating connection loss by stopping an active port forwarding from a Kubernetes cluster, followed by opening it again a few seconds later.

2022/01/11 22:55:02 gorabbit: rabbit consumer goroutine closed

..but nothing else after 10 minutes.

Any tricks I could try? I'm a rookie user of this module, so I wouldn't be surprised if I'm doing something wrong on my end.

tmalaher commented 2 years ago

Having the same difficulty. I'll start my consumer, it will pick up a few messages, and once the queue is drained and it's been sitting idle... ~10 hours later, "gorabbit: rabbit consumer goroutine closed" and no reconnection. I have to stop and restart the consumer. Every time. I don't think this is network connection related, and the server has not gone down.

Consumer options are as follows (yes, it's just the example from the README):

v0.7.0

consumer, err := rabbitmq.NewConsumer(
  fmt.Sprintf("amqp://%s:%s@%s", user, pass, host),
  amqp.Config{},
  rabbitmq.WithConsumerOptionsLogging,
)
//...
err = consumer.StartConsuming(
  func(d rabbitmq.Delivery) (action rabbitmq.Action) {
    return handler(d, done)
  },
  "my_queue",
  []string{"routing_key"},
  rabbitmq.WithConsumeOptionsConcurrency(1),
  rabbitmq.WithConsumeOptionsQueueDurable,
  rabbitmq.WithConsumeOptionsQuorum,
  rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
  rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
  rabbitmq.WithConsumeOptionsBindingExchangeDurable,
  rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
wagslane commented 2 years ago

I'm sorry I'm not able to reproduce this. Are you guys sure you aren't missing some logging, and that messages are actually being sent?

This is my test process. Let me know what to do differently to see what you're seeing:

  1. Start example consumer connected to local rabbitmq
  2. Close local rabbit server
  3. Watch consumer auto reconnect
2022/02/27 17:34:30 gorabbit: attempting to reconnect to amqp server after close
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:30 gorabbit: waiting 1s seconds to attempt to reconnect to amqp server
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:30 gorabbit: rabbit consumer goroutine closed
2022/02/27 17:34:31 gorabbit: error reconnecting to amqp server: dial tcp 127.0.0.1:5672: connect: connection refused
2022/02/27 17:34:31 gorabbit: waiting 2s seconds to attempt to reconnect to amqp server
2022/02/27 17:34:33 gorabbit: error reconnecting to amqp server: dial tcp 127.0.0.1:5672: connect: connection refused
2022/02/27 17:34:33 gorabbit: waiting 4s seconds to attempt to reconnect to amqp server
2022/02/27 17:34:37 gorabbit: error reconnecting to amqp server: dial tcp 127.0.0.1:5672: connect: connection refused
2022/02/27 17:34:37 gorabbit: waiting 8s seconds to attempt to reconnect to amqp server
2022/02/27 17:34:45 gorabbit: error reconnecting to amqp server: dial tcp 127.0.0.1:5672: connect: connection refused
2022/02/27 17:34:45 gorabbit: waiting 16s seconds to attempt to reconnect to amqp server

2022/02/27 17:35:01 gorabbit: successfully reconnected to amqp server after close
2022/02/27 17:35:01 gorabbit: gorabbit: successful recovery from: Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
2022/02/27 17:35:01 gorabbit: waiting 1s seconds to attempt to start consumer goroutines
2022/02/27 17:35:03 gorabbit: Processing messages on 10 goroutines
  4. Start publisher, watch the consumer consume the messages
  5. Close cleanly both the publisher and consumer
phillip-o commented 2 years ago

Thanks for having a look into this @wagslane 👍

Speaking for myself, I'm sure there aren't other logs and I certainly can't imagine messages flowing through after having closed the forwarded connection into the kubernetes cluster.

I did not try running a local RabbitMQ server though; I could of course see if that makes any difference.

tmalaher commented 2 years ago

I'm sorry I'm not able to reproduce this. Are you guys sure you aren't missing some logging, and that messages are actually being sent?

This is my test process. Let me know what to do differently to see what you're seeing:

  1. Start example consumer connected to local rabbitmq
  2. Close local rabbit server
  3. Watch consumer auto reconnect
  4. Start publisher, watch the consumer consume the messages
  5. Close cleanly both the publisher and consumer

Note that this is not my use case. My case is:

  1. Start remote server
  2. Start local consumer
  3. Watch consumer receive messages
  4. ...long period of inactivity...
  5. See the message "gorabbit: rabbit consumer goroutine closed" in the consumer log
  6. Publish more messages
  7. Messages are not picked up by consumer
  8. Stop local consumer
  9. Start new local consumer
  10. Messages are received

So it seems like the consumer gets bored and drops the connection and does not reconnect.

fortyanov commented 2 years ago

#65 may help.

wagslane commented 2 years ago

@tmalaher I just published a new version with some more logging, as I'm still not able to reproduce this error. There are cases where the error is initiated by the client, or where the channel is closed gracefully, in which we don't attempt (and shouldn't attempt) to reconnect. Can you try with the new logs? Hopefully they will give us some more insight into what is causing this "boredom".

ruinshe commented 2 years ago

Hi all, I managed to reproduce this issue in the latest version, 0.7.2. The logs are attached below; could you take a look?

2022/03/24 18:49:05 gorabbit: Processing messages on 1 goroutines
2022/03/25 10:16:19 gorabbit: rabbit consumer goroutine closed
2022/03/25 10:16:19 gorabbit: not attempting to reconnect to amqp server because closure was initiated by the client
2022/03/25 10:16:19 gorabbit: not attempting to reconnect to amqp server because closure was initiated by the client

What I did was manually disable my computer's network for a while and then re-enable it.

wagslane commented 2 years ago

@ruinshe Can you please try again with 0.7.3? The new logs should show us exactly which error you are getting. Basically we just need to keep whitelisting the various errors that make sense to reconnect on from the AMQP lib

tmalaher commented 2 years ago

I've narrowed this down, I think, to when my (RedHat Linux 8.5) laptop hibernates. With 0.7.2, I get this:

2022/03/31 19:26:39 gorabbit: not attempting to reconnect to amqp server because closure was initiated by the client
2022/03/31 19:26:39 gorabbit: not attempting to reconnect to amqp server because closure was initiated by the client
2022/03/31 19:26:39 gorabbit: rabbit consumer goroutine closed

I've recompiled with 0.7.3 and I'll let you know if that changes anything.

tmalaher commented 2 years ago

With 0.7.3, it stayed connected for 10+ hours, until I hibernated the machine, then:

2022/04/01 07:43:59 gorabbit: rabbit consumer goroutine closed
2022/04/01 07:43:59 gorabbit: not attempting to reconnect to amqp server because closure was initiated by the client: Exception (501) Reason: "read tcp 192.168.1.88:34762->162.246.156.242:5672: i/o timeout"
2022/04/01 07:43:59 gorabbit: not attempting to reconnect to amqp server because closure was initiated by the client

i/o timeout doesn't seem like an "initiated by the client" situation. Is that another error that can be added to the "allow" list?
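
The allow-list idea discussed above can be sketched roughly as follows. This is an illustration only: the thread does not show how the library actually matches errors, so the sketch assumes simple substring matching on the close reason, and `reconnectable` and `shouldReconnect` are hypothetical names. The entries are taken from messages quoted in this thread.

```go
package main

import (
	"fmt"
	"strings"
)

// reconnectable holds substrings of close reasons that should trigger a
// reconnect. Hypothetical list built from messages quoted in this thread.
var reconnectable = []string{
	"connection reset by peer",
	"CONNECTION_FORCED",
	"i/o timeout", // the entry requested above
}

// shouldReconnect reports whether reason contains any allow-listed substring.
func shouldReconnect(reason string) bool {
	for _, s := range reconnectable {
		if strings.Contains(reason, s) {
			return true
		}
	}
	return false
}

func main() {
	reason := `read tcp 192.168.1.88:34762->162.246.156.242:5672: i/o timeout`
	fmt.Println(shouldReconnect(reason))
	fmt.Println(shouldReconnect("channel closed by the application"))
}
```

The weakness of this approach is that every new failure mode needs its own entry, and matching depends on the exact wording of the message.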

wagslane commented 2 years ago

@tmalaher awesome, yes! 0.7.4 should have the reconnect for that error.

marcelmiguel commented 2 years ago

It does not reconnect with 0.7.4.

I've changed channel.go in a fork, marcelmiguel/go-rabbitmq, to fix the reconnection.

It seems to work now, but needs some more testing.

To reproduce the error:

wagslane commented 2 years ago

Can you turn on logging and share your logs? It should tell us the disconnection error so we can whitelist it. @marcelmiguel

marcelmiguel commented 2 years ago

Sorry, I forgot to include the error:

Exception (501) Reason: "read tcp 192.168.1.150:63236->159.223.6.51:5672: wsarecv: Se ha anulado una conexión establecida por el software en su equipo host.."

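(In English: "An established connection was aborted by the software in your host machine.") It is better not to match on any Spanish phrase, though, as the message can be in any language.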

wagslane commented 2 years ago

Oh lol xD we catch this one in English but not Spanish. Is there an unwrap we can do to catch this in one go? Like an errors.Is comparison?

wagslane commented 2 years ago

I'm hoping io.EOF is the right one. I can add that.

tmalaher commented 2 years ago

Recompiled with 0.7.4 and now it seems to survive hibernation. There are no log messages produced! I guess this is a good thing, as it means it is ignoring whatever errors the hibernate/wake cycle is causing.

I'll do further torture testing (I hibernate the laptop every day, so plenty of testing over the course of the next week.)

Thanks!

wagslane commented 2 years ago

@tmalaher Okay, with all the feedback from this thread I believe I've solved... all(?) the reconnection issues in 0.8.0. I was originally doing clean closure wrong, and that's why I had to whitelist the errors. I don't think I need to anymore: I can just reconnect on any error, because the new .Close() method returns a nil error.

Sorry about the breaking API change, hopefully it's the last one!

That's what v0 is for after all haha.

You can also configure your logger, that's probably why you aren't seeing logs.

marcelmiguel commented 2 years ago

Seen all changes of the new 0.8.0, perfect !! Testing them tomorrow.

ruinshe commented 2 years ago

After upgrading to 0.8.0, it seems to work now.

I'll keep this in mind in the future, thanks!

marcelmiguel commented 2 years ago

With 0.8.0, about half the time I get this when closing:

2022/04/06 10:00:51 gorabbit: closing consumer...
2022/04/06 10:00:51 gorabbit: attempting to reconnect to amqp server after cancel
2022/04/06 10:00:51 gorabbit: rabbit consumer goroutine closed
2022/04/06 10:00:51 gorabbit: waiting 5s seconds to attempt to reconnect to amqp server

And sometimes it ends like this:

2022/04/06 10:03:58 gorabbit: closing consumer...
2022/04/06 10:03:58 gorabbit: rabbit consumer goroutine closed
2022/04/06 10:03:58 gorabbit: amqp channel closed gracefully
wagslane commented 2 years ago

@marcelmiguel what are you doing differently, or what is happening differently, when it tries to reconnect? That's the cancel message, whose logic actually didn't change in 0.8.0.

marcelmiguel commented 2 years ago

@wagslane This happens when closing a connection via Close(); I didn't test it before v0.8.0. It is not a real problem for me, because the app exits before a reconnection takes place, but it seems that the closing method does not disable reconnection.

wagslane commented 2 years ago

I'm going to close this thread. I added some more logging @marcelmiguel to 0.8.1 because I think this might be something different. If you're still having issues please open a new issue.