wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License

Not reconnecting in all cases #99

Closed by johanneswuerbach 1 year ago

johanneswuerbach commented 1 year ago

We have observed that the module doesn't reconnect in all cases, but we haven't yet found the specific cause.

Logs from one occurrence (sorry for the mixture of formats; only the connection manager had a custom logger applied):

{"level":"info","ts":"2022-12-16T04:05:23.661704901Z","caller":"worker/worker.go:109","msg":"Handled message"} <-- last message
2022/12/16 04:59:48 gorabbit ERROR: attempting to reconnect to amqp server after cancel with error: ctag-/bin/worker-1
2022/12/16 04:59:48 gorabbit INFO: waiting 5s seconds to attempt to reconnect to amqp server
2022/12/16 04:59:48 gorabbit INFO: rabbit consumer goroutine closed
2022/12/16 04:59:53 gorabbit WARN: successfully reconnected to amqp server after cancel
2022/12/16 04:59:53 gorabbit INFO: successful recovery from: ctag-/bin/worker-1
2022/12/16 04:59:53 gorabbit ERROR: error restarting consumer goroutines after cancel or close: declare queue failed: Exception (404) Reason: "NOT_FOUND - queue 'new-builds' in vhost 'platform-development' process is stopped by supervisor"
2022/12/16 04:59:53 gorabbit ERROR: attempting to reconnect to amqp server after close with error: Exception (404) Reason: "NOT_FOUND - queue 'new-builds' in vhost 'platform-development' process is stopped by supervisor"
2022/12/16 04:59:53 gorabbit INFO: waiting 5s seconds to attempt to reconnect to amqp server
2022/12/16 04:59:58 gorabbit WARN: error closing channel while reconnecting: Exception (504) Reason: "channel/connection is not open"
2022/12/16 04:59:58 gorabbit WARN: successfully reconnected to amqp server
2022/12/16 04:59:58 gorabbit INFO: successful recovery from: Exception (404) Reason: "NOT_FOUND - queue 'new-builds' in vhost 'platform-development' process is stopped by supervisor"
{"level":"error","ts":"2022-12-16T05:00:41.13624477Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"attempting to reconnect to amqp server after connection close with error: Exception (501) Reason: \"write tcp 10.0.1.19:49792->10.32.75.145:5672: write: connection reset by peer\"","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:85"}
{"level":"info","ts":"2022-12-16T05:00:41.136325279Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:01:16.137948493Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:01:16.138040023Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:01:51.141472319Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:01:51.141562769Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:02:26.143127796Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:02:26.143234192Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:03:01.145103317Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:03:01.145347574Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:03:36.147081783Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:03:36.147184473Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:04:11.148442465Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:04:11.148524676Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:04:46.150196103Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:04:46.150313791Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:05:21.15146101Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:05:21.151567677Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"error","ts":"2022-12-16T05:05:56.152881546Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:25","msg":"error reconnecting to amqp server: dial tcp: lookup rabbitmq.rabbitmq.svc.cluster.local: i/o timeout","stacktrace":"XYZ/golib/hrabbitmq.Logger.Errorf\n\t/go/pkg/mod/XYZ/golib/hrabbitmq@v1.7.0/logger.go:25\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).reconnectLoop\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:115\ngithub.com/wagslane/go-rabbitmq/internal/connectionmanager.(*ConnectionManager).startNotifyClose\n\t/go/pkg/mod/github.com/wagslane/go-rabbitmq@v0.11.0/internal/connectionmanager/connection_manager.go:86"}
{"level":"info","ts":"2022-12-16T05:05:56.15299162Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"waiting 5s seconds to attempt to reconnect to amqp server"}
{"level":"warn","ts":"2022-12-16T05:06:14.267313916Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:29","msg":"error closing connection while reconnecting: Exception (504) Reason: \"channel/connection is not open\""}
{"level":"warn","ts":"2022-12-16T05:06:14.267384733Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:29","msg":"successfully reconnected to amqp server"}
{"level":"info","ts":"2022-12-16T05:06:14.267430768Z","logger":"rabbitmq","caller":"hrabbitmq@v1.7.0/logger.go:33","msg":"successful connection recovery from: Exception (501) Reason: \"write tcp 10.0.1.19:49792->10.32.75.145:5672: write: connection reset by peer\""}

<-- no further messages handled

Code:

conn, err := rabbitmq.NewConn(
  connStr,
  rabbitmq.WithConnectionOptionsLogger(hrabbitmq.NewLogger(logger)),
)
if err != nil {
  sugar.Fatalw("failed to create a rabbit connection", "err", err)
}
defer conn.Close()

consumer, err := rabbitmq.NewConsumer(conn, func(d rabbitmq.Delivery) rabbitmq.Action {
  // omitted handler logic
  return rabbitmq.Ack
},
  "new-builds",
  rabbitmq.WithConsumerOptionsRoutingKey("organization.*.addBuild"),
  rabbitmq.WithConsumerOptionsRoutingKey("organization.*.addVersion"),
  rabbitmq.WithConsumerOptionsQueueDurable,
  rabbitmq.WithConsumerOptionsQOSPrefetch(1),
  rabbitmq.WithConsumerOptionsExchangeName(conf.RabbitmqExchange),
  rabbitmq.WithConsumerOptionsExchangeKind("topic"),
  rabbitmq.WithConsumerOptionsExchangeDurable,
)
if err != nil {
  sugar.Fatalw("failed to start consumer", "err", err)
}
defer consumer.Close()

Version used: github.com/wagslane/go-rabbitmq v0.11.0

Any pointers on what we could look into would be appreciated.

johanneswuerbach commented 1 year ago

Debugging this further, it seems that the connection is successfully recovered (it shows up in the RabbitMQ management interface), but the channel/consumer is not 🤔

wagslane commented 1 year ago

Thank you for all the tests and this error log. After playing with the tests and looking through your logs, I don't think there is a good solution on the library side, except perhaps to improve the logs so it's more obvious what's happening. This error:

declare queue failed: Exception (404) Reason: "NOT_FOUND - queue 'new-builds' in vhost 'platform-development' process is stopped by supervisor"

is a queue declaration failure, which is something I don't think we want to be trying to recover from on the library side. You should debug the declarations in your application.

johanneswuerbach commented 1 year ago

> is a queue declaration failure

Yes and no. A 404 can also occur while a queue is being moved between leader nodes, in which case the failure is only temporary.
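For context on why a 404 need not be fatal: the AMQP 0-9-1 specification classifies reply code 404 (`not-found`) as a "soft" error, i.e. a channel-level exception that closes only the affected channel, whereas codes such as 501 and 504 that appear in the logs are connection-level ("hard"). The sketch below encodes that table as a hypothetical helper, `isSoftException`; it is not part of go-rabbitmq. As far as I know, streadway/amqp's `amqp.Error` exposes the same distinction through its `Recover` field. Note that "soft" only says the connection survives, not that a retry will succeed.

```go
package main

import "fmt"

// softReplyCodes lists the reply codes that the AMQP 0-9-1 specification
// marks as soft errors (channel-level exceptions). A soft error closes only
// the channel it occurred on; the other error codes close the connection.
var softReplyCodes = map[int]string{
	311: "content-too-large",
	312: "no-route",
	313: "no-consumers",
	403: "access-refused",
	404: "not-found",
	405: "resource-locked",
	406: "precondition-failed",
}

// isSoftException reports whether code is a channel-level reply code.
// "Soft" only means the connection survives; it does not guarantee that a
// retry of the failed operation will succeed.
func isSoftException(code int) bool {
	_, ok := softReplyCodes[code]
	return ok
}

func main() {
	// 404 appears in the declare-queue failure; 501 and 504 appear in the
	// connection-level errors in the logs.
	for _, code := range []int{404, 501, 504} {
		fmt.Printf("%d %q soft=%v\n", code, softReplyCodes[code], isSoftException(code))
	}
}
```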

> want to be trying to recover from on the library side

This would be fine for us as long as the library returns an error in such cases, so we can exit the process (and have it restarted).

wagslane commented 1 year ago

You can programmatically access all of the errors by defining a custom logger; that may help capture the edge-case errors.
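A minimal sketch of that suggestion, assuming the Logger interface in go-rabbitmq v0.11.0 consists of `Fatalf`, `Errorf`, `Warnf`, `Infof` and `Debugf`: the hypothetical `hookLogger` below logs as usual but also forwards every `Errorf` message to a callback, which can signal the application to exit. In the logs in this issue, the "error restarting consumer goroutines" line came from the consumer's default logger (only the connection had a custom logger), so the hook would need to be installed on the consumer too; a consumer-level logger option is assumed here, so check which one your version provides. Matching on log text is fragile, since message wording can change between releases.

```go
package main

import (
	"fmt"
	"log"
	"strings"
)

// hookLogger is a hypothetical logger whose method set is assumed to match
// the Logger interface accepted by go-rabbitmq v0.11.0. Every Errorf
// message is logged and also passed to onError.
type hookLogger struct {
	onError func(msg string)
}

func (l hookLogger) Fatalf(format string, v ...any) { log.Fatalf(format, v...) }

func (l hookLogger) Errorf(format string, v ...any) {
	msg := fmt.Sprintf(format, v...)
	log.Printf("ERROR: %s", msg)
	if l.onError != nil {
		l.onError(msg)
	}
}

func (l hookLogger) Warnf(format string, v ...any)  { log.Printf("WARN: "+format, v...) }
func (l hookLogger) Infof(format string, v ...any)  { log.Printf("INFO: "+format, v...) }
func (l hookLogger) Debugf(format string, v ...any) { log.Printf("DEBUG: "+format, v...) }

func main() {
	fatal := make(chan string, 1)
	l := hookLogger{onError: func(msg string) {
		// React only to the failure seen in this issue; the non-blocking send
		// keeps the caller's goroutine from stalling if nobody is listening.
		if strings.Contains(msg, "error restarting consumer goroutines") {
			select {
			case fatal <- msg:
			default:
			}
		}
	}}

	// Simulate the library reporting the error from the logs.
	l.Errorf("error restarting consumer goroutines after cancel or close: %s", "declare queue failed")

	select {
	case msg := <-fatal:
		// A real service could log and call os.Exit(1) here so it is restarted.
		fmt.Println("fatal:", msg)
	default:
		fmt.Println("no fatal error")
	}
}
```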

johanneswuerbach commented 1 year ago

It feels rather implicit that a library user needs to implement logic that decides whether the library can handle certain errors.

Couldn't there be some kind of unrecoverableError hook that is called in such situations, so I can hard-fail or panic the process?

This way, the logic that decides whether something is recoverable could live inside the library. Unrecoverable errors could include all kinds of channel failures, or any failure that has been retried more than X times.