IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License
11.42k stars 1.75k forks source link

Memory leak sometimes when restarting kafka partitions #560

Closed cep21 closed 8 years ago

cep21 commented 8 years ago

I know this isn't a lot to go on, but hope you can give me pointers to debug this.

Restarting kafka brokers occasionally produces ever increasing memory usage until the go process dies OOM. This isn't consistent, but especially happens we have to restart multiple kafka nodes at once. When I run pprof/heap I see the memory usage by kafka messages but cannot tell where inside the kafka code the memory is being held onto :( It only happens occasionally and only on our production tier. I also don't see increased goroutine numbers. We're running with 256 brokers and about 12 kafka hosts, using the kafka revision just after you fixed the WaitGroup bug. The log has messages like

Failed to connect to XYZ: getsocopt: connection refused client/brokers deregistering XYZ client/brokers deregistered broker #-1 at kafkaXYZ:9092 ...

Even when kafka is stable, and the messages go away, memory usage still increases forever until we OOM. A restart and things work great.

Here is the config we use

    conf.Net.DialTimeout = time.Second * 30

    conf.ChannelBufferSize = 256

    conf.Producer.RequiredAcks = sarama.WaitForLocal
    conf.Producer.Return.Errors = true
    conf.Producer.Return.Successes = true
    conf.Producer.Partitioner = partitioner
    conf.Producer.Flush.Frequency = time.Millisecond * 250
    conf.Producer.Flush.Messages = 3000
    conf.Producer.Flush.Bytes = 83886080

The biggest issues for us are that even when kafka recovers memory usage still increases forever in this strait line upward.

When kafka failures happen, is it possible that multiple buffers could be created internally that end up buffering points?

Note we also don't see increases in messages on the error channel.

eapache commented 8 years ago

The obvious candidate for this kind of issue would be the partitionProducer. Once the cluster has healed, are there any affected partitions for which you do not see the log message producer/leader/%s/%d state change to [normal]?

cep21 commented 8 years ago

For some, I see repeated messages like

"producer/leader/sf.quantizer.rawbus/99 state change to [retrying-1]\n"
"producer/leader/sf.quantizer.rawbus/99 abandoning broker 18\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [flushing-1]\n"
"producer/leader/sf.quantizer.rawbus/99 selected broker 19\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [normal]\n"
"producer/flusher/19 state change to [retrying] on sf.quantizer.rawbus/99 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/99 abandoning broker 19\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [retrying-1]\n"
"producer/flusher/19 state change to [normal] on sf.quantizer.rawbus/99\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [flushing-1]\n"
"producer/leader/sf.quantizer.rawbus/99 selected broker 17\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [normal]\n"
"producer/flusher/17 state change to [retrying] on sf.quantizer.rawbus/99 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [retrying-1]\n"
"producer/leader/sf.quantizer.rawbus/99 abandoning broker 17\n"
"producer/flusher/17 state change to [normal] on sf.quantizer.rawbus/99\n"
"producer/leader/sf.quantizer.rawbus/99 selected broker 17\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [flushing-1]\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [normal]\n"
"producer/flusher/17 state change to [retrying] on sf.quantizer.rawbus/99 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/99 abandoning broker 17\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [retrying-2]\n"
"producer/flusher/17 state change to [normal] on sf.quantizer.rawbus/99\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [flushing-2]\n"
"producer/leader/sf.quantizer.rawbus/99 state change to [normal]\n"
"producer/leader/sf.quantizer.rawbus/99 selected broker 19\n"

For others, I see logs like

"producer/leader/sf.quantizer.rawbus/67 state change to [retrying-1]\n"
"producer/leader/sf.quantizer.rawbus/67 abandoning broker 23\n"
"producer/leader/sf.quantizer.rawbus/67 selected broker 18\n"
"producer/leader/sf.quantizer.rawbus/67 state change to [normal]\n"
"producer/leader/sf.quantizer.rawbus/67 state change to [flushing-1]\n"

When I | sort | uniq -c I see lots of partitions that have no log messages at all.

Some other partition log messages

"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-1]\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 18\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-1]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/18 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-1]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 18\n"
"producer/flusher/18 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-1]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-2]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-2]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
"producer/flusher/21 state change to [retrying] on sf.quantizer.rawbus/89 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [retrying-3]\n"
"producer/leader/sf.quantizer.rawbus/89 abandoning broker 21\n"
"producer/flusher/21 state change to [normal] on sf.quantizer.rawbus/89\n"
"producer/leader/sf.quantizer.rawbus/89 selected broker 21\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [flushing-3]\n"
"producer/leader/sf.quantizer.rawbus/89 state change to [normal]\n"
eapache commented 8 years ago

When I | sort | uniq -c I see lots of partitions that have no log messages at all.

That should be normal, partitions which are on other brokers (ones that are not restarted) won't log much if anything.

The thing that is curious for me is the partitions (e.g. 89 in your sample logs) repeatedly selecting and then abandoning the same broker. It looks to me like the Kafka cluster might be confused at this point: every time we ask it who leads the partition it says broker 21, but every time we try and send a message to broker 21 it returns NotLeaderForPartition.

I'll dig a little deeper.

eapache commented 8 years ago

Note we also don't see increases in messages on the error channel.

This surprises me a lot: given the logs posted above I'd have expected a steady stream of messages on the error channel from partition 89 as each message to it is retried 3 times and then fails.

The biggest issues for us are that even when kafka recovers memory usage still increases forever in this strait line upward.

I think I know why this is happening, and agree that ideally it shouldn't, but it's almost more of a symptom than a bug. Sarama buffers internally while retrying partitions, and in this case because we are retrying in an apparent infinite loop we are also buffering infinitely. If we can fix those partitions then the memory problem should effectively go away because Sarama will be able to flush its buffers properly.

The next time you hit a situation like this, it is worth checking the kafka broker and controller logs, and also the contents of zookeeper, for the partitions having trouble. Perhaps one of the brokers is serving stale metadata.

eapache commented 8 years ago

Hmm, reminds me a bit of https://issues.apache.org/jira/browse/KAFKA-2082 in which brokers can serve incorrect/stale metadata if they are temporarily isolated from the zookeeper cluster. Could something like that be happening here?

cep21 commented 8 years ago

You're right about error messages. I rechecked the log and I see only these messages on the error channel

kafka: Failed to produce message to topic sf.quantizer.rawbus: kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.

I could understand kafka getting in some strange state, but the weird thing for us is that restarting the server that is sending points to kafka fixes the problem.

Even when we stop seeing messages about "Failed to produce message to topic" and stop getting messages on the error channel we still see the same slope of increased memory usage..

Attached is a chart with memory usage on one axis and # of error messages on the other axis.

https://cloud.githubusercontent.com/assets/20358/10912580/0ea7f30c-81ff-11e5-86bc-509993aa8c61.png

![image](img width="1425" alt="memory_usage" src="https://cloud.githubusercontent.com/assets/20358/10912580/0ea7f30c-81ff-11e5-86bc-509993aa8c61.png")

eapache commented 8 years ago

I could understand kafka getting in some strange state, but the weird thing for us is that restarting the server that is sending points to kafka fixes the problem.

This is theoretically explainable because Sarama picks a random broker on startup to get metadata from; if only one of the brokers is serving bad metadata, restarting sarama will likely pick a different one and "fix" the problem.

Even when we stop seeing messages about "Failed to produce message to topic" and stop getting messages on the error channel we still see the same slope of increased memory usage..

That is super-weird.

eapache commented 8 years ago

To be sure I understand; after the brokers restart, the final "steady" state of the producer is:

eapache commented 8 years ago

I am unable to reconcile the repeated state changes with the lack of messages coming out on the error channel. Or do the state change logs stop at the same time as the errors stop?

If they do both stop together: what is the final state change for an affected partition?

If the state changes continue even after the errors stop: somehow, messages that are at the configured retry limit (3, per default) in the partitionProducer are not getting put on the error channel by the retryMessages helper method. It may be worth putting some log messages in those places and in the aggregator to see where those messages are getting lost/mutated.

Tangent 1: Is the Encode method provided for your keys and values idempotent? The producer will re-encode messages from scratch every time they are retried, so if your Encoder implementation tracks state or something that could theoretically be an issue.

Tangent 2: I assume based on your initial report that the SHA you are running is 7e88a99 or thereabouts.

cep21 commented 8 years ago

Here is a histogram of log messages coming from sarama logger https://cloud.githubusercontent.com/assets/20358/10916807/2ff1370e-8212-11e5-80d4-325e2113e099.png

sarama_log_histogram

The short burst at around 18:50 is

client/metadata got error from broker while fetching metadata: EOF
client/metadata fetching metadata for all topics from broker kafka26--bbac.int.XYZ.com:9092
 client/metadata fetching metadata for all topics from broker kafka44--bbac.int.XYZ.com:9092
 Closed connection to broker kafka44--bbac.int.XYZ.com:9092
 Connected to broker at kafka26--bbac.int.XYZ.com:9092 (unregistered)
 client/metadata fetching metadata for all topics from broker kafka20--bbab.int.XYZ.com:9092
 client/metadata fetching metadata for all topics from broker kafka22--bbaa.int.XYZ.com:9092
cep21 commented 8 years ago

Pprof inuse space at the time

> go tool pprof -focus=sarama -text -inuse_space /tmp/sbingest /tmp/pprof_heap
8789.72kB of 448073.58kB total ( 1.96%)
Dropped 83 nodes (cum <= 2240.37kB)
      flat  flat%   sum%        cum   cum%
 3635.02kB  0.81%  0.81%  4147.05kB  0.93%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*partitionProducer).dispatch
 1549.16kB  0.35%  1.16%  1549.16kB  0.35%  bytes.makeSlice
 1024.09kB  0.23%  1.39%  1024.09kB  0.23%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*asyncProducer).buildRequest
 1024.06kB  0.23%  1.61%  1024.06kB  0.23%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*TopicMetadata).decode
  532.26kB  0.12%  1.73%   532.26kB  0.12%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.encode
     513kB  0.11%  1.85%      513kB  0.11%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*asyncProducer).newPartitionProducer
  512.12kB  0.11%  1.96%   512.12kB  0.11%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*flusher).groupAndFilter

pprof inuse objects

< go tool pprof -focus=sarama -text -inuse_objects /tmp/sbingest /tmp/pprof_heap
29791 of 7577988 total ( 0.39%)
Dropped 88 nodes (cum <= 37889)
      flat  flat%   sum%        cum   cum%
     16384  0.22%  0.22%      16384  0.22%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*TopicMetadata).decode
     10923  0.14%  0.36%      10923  0.14%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*asyncProducer).buildRequest
      2048 0.027%  0.39%       2048 0.027%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*flusher).groupAndFilter
       256 0.0034%  0.39%        256 0.0034%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*asyncProducer).newPartitionProducer
       179 0.0024%  0.39%        179 0.0024%  bytes.makeSlice
         1 1.3e-05%  0.39%       8193  0.11%  github.com/signalfuse/XYZ/vendor/github.com/cep21/sarama.(*partitionProducer).dispatch
cep21 commented 8 years ago

The actual version I'm using is https://github.com/cep21/sarama which is a wrapper around 4faee6199919651f769e85fd97f1b3bf5739cb81

eapache commented 8 years ago

Those pprof profiles don't seem to indicate the many hundreds of MBs of usage that is shown in your graph. If you ask for allocated instead (alloc_space, alloc_objects) or sort by cumulative (-cum) does that change?

https://github.com/cep21/sarama which is a wrapper around 4faee61

That's just before I fixed the waitgroup bug, not after...?

cep21 commented 8 years ago

Those pprof profiles don't seem to indicate

I filtered the pprof to sarama functions. If I remove the sarama functioun filter it shows the objects I'm putting into sarama's channel.

before if fixed

Sorry, right, the one just before you fixed it.

eapache commented 8 years ago

I've written some tests and tried a few things here and I haven't found anything suspicious: buffers are always getting flushed appropriately for me.

Since memory is consistently increasing even after the logs and errors stop: what is the very last set of state change logs coming from one of the affected partitions? Does it end with a state change [normal] or is it maybe getting stuck in some other state? I assume these would be from just before 18:40 in your graph of logs.

eapache commented 8 years ago

I was able to create a test which approximates some of the symptoms you're seeing I think, but it still seems somewhat different. I pushed it as branch debug-560 off of 4faee61. When I run DEBUG=true go test -v -run RepeatFailures on that branch, the test process does use increasing memory throughout the course of the test.

However, when I run a memory profile, the result shows all the memory being properly freed by the end of the test (once the error channel is drained), so it doesn't really look like a leak. I'd be curious if you run this test yourself and compare it to the details of your production system if this looks like it's a similar or different problem to the one you're actually seeing?

cep21 commented 8 years ago

I emailed you the sarama log output. I'll try running the test on a production machine.

eapache commented 8 years ago

Without more precise log ordering it is difficult to get any useful information out of these logs. There are e.g. hundreds of entries at 27:33 which might be informative or might not, depending on what order the state changes happen in.

cep21 commented 8 years ago

Would it be useful if there were internal statistics about the sarama state machine that I could track or even report when issues happen? For example, if sarama had a func (s *) Stats() Stats {} endpoint that would return lots of information about what's going on internally?

eapache commented 8 years ago

It might be useful, but accumulating a consistent snapshot across all of sarama's goroutines would be quite difficult without some sort of global stop-the-world lock.

One thing I have noticed in the logs you sent is that partition 65 transitions several times to state [retrying-1] without a corresponding transition to [flushing-1]. This is not impossible, but given the other characteristics of the problem this transition is a bit unexpected. It also happens (within the granularity of your graph) right when the memory starts leaking.

I've done a bit of playing with the state machine and haven't come up with any explanations yet, but there's a lot of ground to cover. Any further details you might be able to provide on the behaviour of that partition in particular could be useful (e.g. if you have broker-side msgs/sec metrics broken out for that partition they might be informative).

cep21 commented 8 years ago

This happened again. The memory leak doesn't happen for all servers at once, but for all of them it eventually happens.

screen shot 2015-11-13 at 11 08 02 am

cep21 commented 8 years ago

One kafka broker failed and we replaced the broker. It kept the same broker ID. And then we restarted the tier. The leak happened on the restart, but only after a few were restarted. We were able to restart a good number before things started increasing in memory usage.

cep21 commented 8 years ago

One thing unique about our production env is that we sometimes error the message encode. Could that somehow contribute?

eapache commented 8 years ago

Well the failing encode was the cause of #449 and you're not actually running with that fixed, just with the work-around you implemented to rescue the panic. I would certainly try moving to vanilla v1.6.1 in order to eliminate that as the problem.

cep21 commented 8 years ago

I'll probably update my current branch to v1.6.1 but still wrap sync.waitgroup inside sarama since I'm still hesitant to run with sync.WaitGroup's fatal stop-the-world behavior if there is another bug that we can't currently identify.

However, I don't see how this could be the cause of the current memory leak behavior. I just mentioned it to see if it could cause a memory leak in sarama during kafka restarts.

eapache commented 8 years ago

After some investigation, I am reasonably sure this bug is just another side-effect of the encoding-error bug (#449) and that updating to 1.6.1 (with or without the safety wrapper) will fix it.

In detail: the encoding bug I fixed in 1.6.1 was that when a message failed to encode we would return it out the error channel immediately, but leave it in the buffer to be returned again when the broker response arrived (typically as a success). Since the message had been added only once but was being removed twice (once as a failure, then again incorrectly as a success) this threw the wait-group out of sync.

However, if the broker response was not a success but a retriable error (e.g. when the cluster is restarting), then the failed message would be retried with the rest of the batch. Because we clear the retry metadata when we return an error, this means the retried message would have incorrect retry metadata. The state machine is reasonably robust and I'm not actually sure how this would have caused a memory leak vs just mis-ordering certain messages, but it definitely caused problems and clearly explains the weird state changes on partition 65 I noted above.

If this still happens with a 1.6.1-based release then please reopen and I'll take another look.