streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License
4.88k stars 621 forks source link

QoS prefetch buffer is not released when channel is closed #149

Open ghost opened 9 years ago

ghost commented 9 years ago

If you create a Consumer with auto-ack set to true, a memory leak will result.

Sample code:

conn, err := amqp.Dial(<constr>)
if err != nil {
    //
}

ch, err := conn.Channel()
if err != nil {
    //
}

if err != nil {
    //
}

defer conn.Close()
defer ch.Close()

err = ch.Qos(1, 0, false)
if err != nil {
    //
}

msgs, err := ch.Consume(
    "broker", // queue
    "",     // consumer
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)

if err != nil {
    //
}

forever := make(chan bool)

go func() {
    for d := range msgs {           
        //
    }
}()

<-forever

If I populate a RabbitMQ server with 1M messages and try to process them this will result in about 1GB RSS from the program.

If I make the following changes only 12.5MB RSS is used to process 1M messages:

...
msgs, err := ch.Consume(
    "broker", // queue
    "",     // consumer
    false,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
...
go func() {
    for d := range msgs {           
        //
        d.Ack(true)
    }
}()
...

I used pprof tools and found the leak:

top20 -cum
49486.65kB of 53029.65kB total (93.32%)
Dropped 42 nodes (cum <= 265.15kB)
Showing top 20 nodes out of 29 (cum >= 1024.09kB)
      flat  flat%   sum%        cum   cum%
         0     0%     0% 52517.50kB 99.03%  runtime.goexit
         0     0%     0% 46089.60kB 86.91%  github.com/streadway/amqp.(*Connection).reader
14849.80kB 28.00% 28.00% 39945.45kB 75.33%  github.com/streadway/amqp.(*Channel).recvContent
         0     0% 28.00% 39945.45kB 75.33%  github.com/streadway/amqp.(*Connection).demux
         0     0% 28.00% 39945.45kB 75.33%  github.com/streadway/amqp.(*Connection).dispatchN
         0     0% 28.00% 25095.66kB 47.32%  github.com/streadway/amqp.(*Channel).dispatch
25095.66kB 47.32% 75.33% 25095.66kB 47.32%  github.com/streadway/amqp.newDelivery
         0     0% 75.33%  6144.15kB 11.59%  github.com/streadway/amqp.(*reader).ReadFrame
 6144.15kB 11.59% 86.91%  6144.15kB 11.59%  github.com/streadway/amqp.readShortstr
         0     0% 86.91%  5632.14kB 10.62%  github.com/streadway/amqp.(*basicDeliver).read
         0     0% 86.91%  5632.14kB 10.62%  github.com/streadway/amqp.(*reader).parseMethodFrame

The leak ends up occurring at:

(pprof) list newDelivery
Total: 51.79MB
ROUTINE ======================== github.com/streadway/amqp.newDelivery in /.../src/github.com/streadway/amqp/delivery.go
   24.51MB    24.51MB (flat, cum) 47.32% of Total
         .          .     77:       Type:            props.Type,
         .          .     78:       UserId:          props.UserId,
         .          .     79:       AppId:           props.AppId,
         .          .     80:
         .          .     81:       Body: body,
   24.51MB    24.51MB     82:   }
         .          .     83:
         .          .     84:   // Properties for the delivery types
         .          .     85:   switch m := msg.(type) {
         .          .     86:   case *basicDeliver:
         .          .     87:       delivery.ConsumerTag = m.ConsumerTag

selection_003

trist4n commented 9 years ago

definitely seeing the same issue, but don't quite understand well enough to say why the leaked deliveries aren't being cleared.

could the autoAck feature be un-leaked by just having the api call .Ack(false) on itself until a proper fix is put in place?

ghost commented 9 years ago

@trist4n Been awhile since I had to deal with this; I remember trying a work-around but it did not work. If I remember correctly this was the only way I could solve the memory issue.

trist4n commented 9 years ago

it has a bit of a performance hit (at least compared to doing nothing) in my case, but its probably better than the alternative.

anyway, on closer inspection it isn't really a memory leak per-se. when autoAck is set, there is no Qos() on the channel, so it consumes messages as fast as possible with no flow control.

the comment on channel.go: *Channel.Consume() says

Deliveries on the returned chan will be buffered indefinitely.  To limit memory
of this buffer, use the Channel.Qos method to limit the amount of
unacknowledged/buffered deliveries the server will deliver on this Channel.

but the Qos() stuff does not function on channels with autoAck (as the documentation for Qos() says)

So I don't know what to do other than explicit acking. I do not understand how a client with autoAck is supposed to rate limit itself in general, nor in this library.

michaelklishin commented 7 years ago

It is not a leak but the classic producer/consumer problem with unbounded buffers where a producer (in this case, data coming down the socket) outpaces consumer(s). Use manual acknowledgements if you experience this: that's why they exist.

In Java client there is a mechanism that protects against this via TCP back pressure but it is a non-trivial change to make. So I wouldn't assume all clients will try given that there is a protocol feature already for dealing with just this behaviour.

mattwilliamson commented 7 years ago

I was seeing this exact thing when using manual ACKs without QoS. Adding QoS with a prefetch of 1 worked, but seems like a hack. conn.close() and ch.close did not help, which I would expect to release these resources. I have an application which opens and closes AMQP connections at a high frequency, which resulted in 16GB memory usage before my server crashed.

Here's a screenshot after a few thousand requests:

Screenshot

michaelklishin commented 7 years ago

Using manual acknowledgements with prefetch is not a hack: this problem is one of the key reasons why those features exist.

mattwilliamson commented 7 years ago

The prefetch buffer is leaked when the connection and channel are closed. This is a bug. Even this example has no QoS in it: https://github.com/streadway/amqp/blob/master/_examples/simple-consumer/consumer.go

michaelklishin commented 7 years ago

Thanks for clarifying.

mattwilliamson commented 7 years ago

Just to make sure you don't chase your tail, the memory leak goes away when using QoS of 1, but when not using QoS, closing the channel and connection do not release the implicit buffer.

michaelklishin commented 7 years ago

@mattwilliamson we are trying to work out what "implicit buffer" means here. Can you please point us at a specific code line/field? Thank you.

michaelklishin commented 7 years ago

This seems to be closely related to #264.