wagslane / go-rabbitmq

A wrapper of streadway/amqp that provides reconnection logic and sane defaults
https://blog.boot.dev/golang/connecting-to-rabbitmq-in-golang-easy/
MIT License

memory leak #149

Open lmb1113 opened 9 months ago

lmb1113 commented 9 months ago


My English is not very good, so please bear with me. As the concurrency of my project increases, memory usage increases too. After investigating, I found the cause: the goroutine started by publisher.startNotifyBlockedHandler() does not exit and release its memory in a timely manner. I am not sure what prevents it from exiting. My expected outcome is that publisher.Close() shuts everything down normally.

After commenting out this line of code the problem went away, but that is not an elegant fix.

lmb1113 commented 9 months ago

Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

Calling conn.Close() resolves the issue, but it is not possible to call NewConn repeatedly. I did not reuse the publisher, but I did reuse the Conn. When the publisher was closed, startNotifyBlockedHandler was still blocking, so I had to reuse the publisher.

LucaWolf commented 8 months ago

Tough, that seems to be a leaky goroutine :-( Indeed, the design does not cater for its termination. Short of moving things around [e.g. a) providing an additional control channel, such as a context that triggers on publisher close events, or b) making the blocked chan parameter part of the publisher struct, so it goes away when the publisher is garbage collected, plus changing the range over blockings into a for/select that listens to this additional channel as well as blockings], your best bet is to leave that goroutine commented out and live with the fact that connection blocking (has this ever really happened in real life?) will block your publishing/consuming (which ideally should have a timeout anyway).
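The for/select idea can be sketched roughly as follows. This is a minimal illustration, not the library's code: the `blocking` and `publisher` types, the `done` chan, and `handleBlocked` are hypothetical stand-ins, and the notify chan is deliberately never closed to mimic a connection that stays up.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// blocking is a hypothetical stand-in for the connection-blocked
// notification delivered by the AMQP client.
type blocking struct {
	Active bool
}

// publisher is a hypothetical, stripped-down stand-in for the real type.
type publisher struct {
	mu      sync.Mutex
	blocked bool
	done    chan struct{} // closed by Close to stop the handler
	once    sync.Once
}

// handleBlocked replaces a plain "for range blockings" loop with a
// for/select that also listens for the publisher being closed.
func (p *publisher) handleBlocked(blockings <-chan blocking, exited chan<- struct{}) {
	defer close(exited)
	for {
		select {
		case b, ok := <-blockings:
			if !ok {
				return // the connection closed the notify chan
			}
			p.mu.Lock()
			p.blocked = b.Active
			p.mu.Unlock()
		case <-p.done:
			return // publisher closed; the notify chan is left untouched
		}
	}
}

// Close signals the handler to exit. It is safe to call more than once.
func (p *publisher) Close() {
	p.once.Do(func() { close(p.done) })
}

// handlerStopsOnClose reports whether the handler goroutine exits after
// Close even though the notify chan is never closed.
func handlerStopsOnClose() bool {
	blockings := make(chan blocking) // never closed: the connection stays up
	exited := make(chan struct{})
	p := &publisher{done: make(chan struct{})}
	go p.handleBlocked(blockings, exited)
	p.Close()
	select {
	case <-exited:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("handler stopped after Close:", handlerStopsOnClose())
}
```

Note that this only ends the goroutine. The chan registered with the connection would still be referenced by the connection until the connection itself is closed.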

wagslane commented 7 months ago

It seems to me that the most likely issue is that you have a lot of Publish() calls getting blocked. The startNotifyBlockedHandler function waits for any in-flight publishes to finish and then stops new ones from happening. So if they are hanging indefinitely for some reason, that could be the root cause. I'd argue something at the infrastructure level is probably the issue.

You could add a context timeout, and that should stop them from hanging forever.

mjurczik commented 5 months ago

Hello, this still seems to be an issue.

I could reproduce this goroutine leak by using the example from the README and printing the stack trace before and after closing the publisher. The goroutine started at startup does not terminate when the publisher is closed.

The problem is that the goroutine is started and passes a notify chan (I call Go channels "chan" to avoid confusion with RabbitMQ's Channels) to the Connection. This chan is only closed when the connection to RabbitMQ is closed here.

But a long-lived service stays connected to RabbitMQ and therefore keeps the Connection alive, only creating and closing publishers as needed. As a result, the connection holds on to every chan in the blocks array that was registered in startNotifyBlockedHandler. It does not close the chan when a publisher shuts down, so the goroutine stays alive at the for loop.

By contrast, startNotifyFlowHandler, which is also started when a publisher is created, has its chan managed by a Channel. This Channel is closed when the publisher's Close function is called, which terminates the routine started by the publisher.
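The mechanism described above can be reproduced in miniature without RabbitMQ. In this sketch `fakeConn`, `fakePublisher`, and `leakDemo` are hypothetical stand-ins that only model the behavior described in this comment: the connection keeps every registered notify chan and closes them only when it is closed itself.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fakeConn is a hypothetical stand-in for the AMQP connection: it keeps every
// registered notify chan and closes them only when the connection closes.
type fakeConn struct {
	mu     sync.Mutex
	blocks []chan bool
}

// NotifyBlocked registers ch with the connection and hands it back.
func (c *fakeConn) NotifyBlocked(ch chan bool) chan bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.blocks = append(c.blocks, ch)
	return ch
}

// Close closes every registered chan, which finally ends the handlers.
func (c *fakeConn) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, ch := range c.blocks {
		close(ch)
	}
	c.blocks = nil
}

// running counts handler goroutines that have not exited yet.
var running int64

type fakePublisher struct{}

// newFakePublisher starts one handler goroutine per publisher.
func newFakePublisher(c *fakeConn, wg *sync.WaitGroup) *fakePublisher {
	blockings := c.NotifyBlocked(make(chan bool))
	atomic.AddInt64(&running, 1)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer atomic.AddInt64(&running, -1)
		for range blockings { // ends only when the connection closes the chan
		}
	}()
	return &fakePublisher{}
}

// Close mirrors the reported behavior: nothing here stops the handler.
func (p *fakePublisher) Close() {}

// leakDemo creates and closes n publishers on one connection and returns the
// number of live handlers after the publishers close and after the
// connection closes.
func leakDemo(n int) (afterPublisherClose, afterConnClose int64) {
	conn := &fakeConn{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		newFakePublisher(conn, &wg).Close()
	}
	afterPublisherClose = atomic.LoadInt64(&running)
	conn.Close()
	wg.Wait()
	afterConnClose = atomic.LoadInt64(&running)
	return afterPublisherClose, afterConnClose
}

func main() {
	a, b := leakDemo(3)
	fmt.Println("handlers alive after closing publishers:", a)
	fmt.Println("handlers alive after closing connection:", b)
}
```

In a real service the same effect could be observed with a goroutine dump taken before and after closing a publisher, as described above.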

I have two ideas for a fix:

  • your above-mentioned context, which is passed down into the function and acts as a lifetime: when the publisher is closed, the lifetime ends and the routine finishes. The routine must NOT close the channel, or there will be a panic when the Connection terminates and tries to close the channel as well
  • tag the channel passed and notify the connection to remove the channel on publisher close. This would require changes in the wrapped library as well

I could implement the first. The second option would need discussion and implementation in the amqp091-go library first.

Take care!

lmb1113 commented 5 months ago

Wrap rabbitmq.NewPublisher in once.Do. Reusing the connection this way has not caused any problems in large-scale, high-concurrency testing.

mjurczik commented 5 months ago

For me this is not a fix for the problem. It does not change the fact that even initializing one publisher and closing it will not stop all goroutines started by that publisher, which results in a leaked goroutine. Furthermore, someone can have multiple publishers, configured with different publisher options, that are written to simultaneously. So I'm confused about how it should be possible to reuse a publisher that was created with different settings and does not allow changing them. Can you elaborate on this?

lmb1113 commented 5 months ago

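import (
    "sync"

    rabbitmq "github.com/wagslane/go-rabbitmq"
    "go.uber.org/zap"
)

// GetMqUrl and logging.Logger are helpers from our own project (not shown).

var (
    once        sync.Once
    oncePublish sync.Once
    conn        *rabbitmq.Conn
    publisher   *rabbitmq.Publisher
    publishErr  error // kept so later calls also see a failed initialization
)

// GetMqConn lazily creates the single shared connection.
func GetMqConn() *rabbitmq.Conn {
    once.Do(func() {
        var err error
        conn, err = rabbitmq.NewConn(GetMqUrl())
        if err != nil {
            logging.Logger.Error("rabbitmq conn err", zap.Error(err))
        }
    })
    return conn
}

// GetPublisher lazily creates the single shared publisher and reuses it.
func GetPublisher() (*rabbitmq.Publisher, error) {
    oncePublish.Do(func() {
        publisher, publishErr = rabbitmq.NewPublisher(
            GetMqConn(),
            rabbitmq.WithPublisherOptionsLogging,
        )
    })
    return publisher, publishErr
}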

Our business is relatively simple, with only one configuration, so we can use GetPublisher globally, and it has withstood high-concurrency testing. If you have multiple configurations, my suggestion is to keep a pool of Publishers, one per configuration, and reuse the Publisher that matches each configuration.
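One possible shape for such a pool, as a rough sketch only. `Publisher` here is a hypothetical stand-in for *rabbitmq.Publisher, and `PublisherPool`, `NewPublisherPool`, and the string configuration key are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Publisher is a hypothetical stand-in for *rabbitmq.Publisher.
type Publisher struct {
	Exchange string
}

// PublisherPool hands out one long-lived publisher per configuration key.
type PublisherPool struct {
	mu         sync.Mutex
	publishers map[string]*Publisher
	newFn      func(key string) (*Publisher, error)
}

// NewPublisherPool takes the constructor used to build a publisher for a key.
func NewPublisherPool(newFn func(key string) (*Publisher, error)) *PublisherPool {
	return &PublisherPool{
		publishers: make(map[string]*Publisher),
		newFn:      newFn,
	}
}

// Get returns the cached publisher for key, creating it on first use.
func (p *PublisherPool) Get(key string) (*Publisher, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if pub, ok := p.publishers[key]; ok {
		return pub, nil
	}
	pub, err := p.newFn(key)
	if err != nil {
		return nil, err // not cached, so a later call can retry
	}
	p.publishers[key] = pub
	return pub, nil
}

func main() {
	created := 0
	pool := NewPublisherPool(func(key string) (*Publisher, error) {
		created++
		return &Publisher{Exchange: key}, nil
	})
	a, _ := pool.Get("orders")
	b, _ := pool.Get("orders")
	fmt.Println("same publisher reused:", a == b, "publishers created:", created)
}
```

Unlike the sync.Once version, a failed creation is not remembered here, so the next Get for the same key tries again.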

mjurczik commented 5 months ago

Ah, thank you for your example, @lmb1113. We have a different use case: we open a lot of publishers because each client gets its own queue and exchange for incoming messages.

mjurczik commented 4 months ago

Hello,

I don't want to necro-bump, but @wagslane, should I open a new issue for this? Or can this issue be reopened?

wagslane commented 4 months ago

Sorry, I've been crazy busy at work. I want to look at this when I have some time; I just haven't had a chance yet.

lmb1113 commented 4 months ago

Hello, I noticed that every publisher starts its own startNotifyBlockedHandler. Can we adjust it so that one handler is shared per conn? My understanding is that the purpose of startNotifyBlockedHandler is to prevent publishing while the connection is not available. @mjurczik @wagslane To reproduce the memory leak, create one global connection and then many publishers on it.

lmb1113 commented 4 months ago

In other words, could publisher.startNotifyBlockedHandler() become conn.startNotifyBlockedHandler()?

hotrush commented 3 months ago

We also hit this issue. We don't reuse publishers or consumers, but when there are a lot of messages to publish we get OOMs at ~2-3 GB of memory usage, whereas regular usage is about 100-200 MB. Hope you can fix it.

lmb1113 commented 3 months ago

I suggest reusing publishers and consumers.

hotrush commented 3 months ago

@lmb1113 we tried that approach, but we are getting a lot of errors like here:

lmb1113 commented 3 months ago

@hotrush Do you reuse the connection as well as the publishers or consumers? See the GetMqConn and GetPublisher example in my earlier comment.

hotrush commented 3 months ago

@lmb1113 The connection is always reused. If the publisher is reused, we get connection/reconnection issues; if it is not reused, we get memory leaks. Something like that.

wagslane commented 3 months ago

@mjurczik So all resources are not necessarily expected to be cleaned up when you close the publisher, you would need to also close the underlying connection itself. When you close both are you still seeing a "leak"?

(And if that's not a suitable use case you might want to just drop down to the amqp lib)

mjurczik commented 2 months ago

Hello,

So all resources are not necessarily expected to be cleaned up when you close the publisher, you would need to also close the underlying connection itself.

IMO this should not be expected. A connection should be reusable; removing a publisher and having it free its memory along with the routines it started should not force you to close the connection as well. The README itself says: Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

When you close both are you still seeing a "leak"?

Sure, if I shut down my program, gracefully close all publishers, and afterwards close the connection, the routines are stopped gracefully. That happens because the shutdown performed by conn.Close() closes the chans that startNotifyBlockedHandler is blocking on.

Closing the connection does not seem to be a suitable fix:

Let's say we have 3 publishers, and we want to close one of them because it is not needed anymore. We can't simply close it AND the connection, because that would break the connection the other publishers still need. Closing only the publisher keeps the startNotifyBlockedHandler routine alive, and it no longer does anything useful, because the publisher it belongs to and would report the notification to is already closed and discarded.

I thought about my "fix" with the lifetime. The problem is that it would finish the routine and allow the publisher and routine to be garbage collected, but the chan would still be held in the Connection's blocks array in the amqp lib. I guess it would be better to implement something similar in the ConnectionManager: it holds the block chans itself and, in NotifyBlockedSafe, does not pass them down. Instead, the ConnectionManager passes a single chan down to amqp and, on a signal, iterates through its self-managed block chans. Additionally, the connection manager would need a function to remove a chan again.
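The fan-out idea above might look roughly like this. It is a sketch under assumptions, not the eventual implementation: `blockedNotifier`, `Subscribe`, and `Unsubscribe` are hypothetical names, a plain `chan bool` stands in for the blocked notification, and `upstream` plays the role of the single chan that would be registered with the amqp connection.

```go
package main

import (
	"fmt"
	"sync"
)

// blockedNotifier is a hypothetical sketch: it listens on ONE upstream chan
// and fans each signal out to per-publisher chans that it owns and can remove.
type blockedNotifier struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]chan bool
}

func newBlockedNotifier(upstream <-chan bool) *blockedNotifier {
	n := &blockedNotifier{subs: make(map[int]chan bool)}
	go n.run(upstream)
	return n
}

// run forwards every upstream signal to all current subscribers and closes
// their chans once the upstream chan is closed (connection shutdown).
func (n *blockedNotifier) run(upstream <-chan bool) {
	for active := range upstream {
		n.mu.Lock()
		for _, ch := range n.subs {
			// Keep only the latest state: drop a stale value if present.
			// Only this goroutine sends, so the send below cannot block.
			select {
			case <-ch:
			default:
			}
			ch <- active
		}
		n.mu.Unlock()
	}
	n.mu.Lock()
	for id, ch := range n.subs {
		close(ch)
		delete(n.subs, id)
	}
	n.mu.Unlock()
}

// Subscribe registers a publisher and returns its id and notify chan.
func (n *blockedNotifier) Subscribe() (int, <-chan bool) {
	n.mu.Lock()
	defer n.mu.Unlock()
	id := n.nextID
	n.nextID++
	ch := make(chan bool, 1)
	n.subs[id] = ch
	return id, ch
}

// Unsubscribe removes a publisher's chan and closes it, so a handler that
// ranges over the chan exits while the connection stays open.
func (n *blockedNotifier) Unsubscribe(id int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if ch, ok := n.subs[id]; ok {
		delete(n.subs, id)
		close(ch)
	}
}

func main() {
	upstream := make(chan bool) // stands in for the single chan given to amqp
	n := newBlockedNotifier(upstream)
	idA, a := n.Subscribe()
	_, b := n.Subscribe()

	upstream <- true
	fmt.Println("A blocked:", <-a, "B blocked:", <-b)

	n.Unsubscribe(idA) // publisher A closes; connection and B are unaffected
	_, open := <-a
	fmt.Println("A chan still open:", open)

	upstream <- false
	fmt.Println("B blocked:", <-b)
}
```

Because the notifier owns the per-publisher chans, it is the only party that closes them, which avoids the double-close panic mentioned earlier in the thread.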

If you like, I can implement this and create a PR, and you can still decide whether it makes sense for you :)

wagslane commented 2 months ago

Yeah, this makes sense to me @mjurczik! If we have 3 publishers on a single connection, we should be able to close 1 publisher, keep the connection alive for the other 2, and release any publisher-specific resources.

If you'd like to propose a PR I will take a look when I have time