confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Out of memory when a broker is down #444

Open timothyfranzke opened 6 years ago

timothyfranzke commented 6 years ago

Description

We have a 5-broker cluster and 5 ZooKeeper nodes. Broker 2 went down, and our consumer application ran out of memory within 20 minutes. Our brokers are on version 0.9.0.1. The consumer application's bootstrap.servers setting lists all 5 brokers. We found this error in the log:

Faulting application name: w3wp.exe, version: 8.5.9600.16384, time stamp: 0x5215df96
Faulting module name: librdkafka.dll, version: 0.0.0.0, time stamp: 0x5a2526b4
Exception code: 0xc0000005
Fault offset: 0x000000000007fcd7
Faulting process id: 0x198c
Faulting application start time: 0x01d3ae1fc685270c
Faulting application path: c:\windows\system32\inetsrv\w3wp.exe
Faulting module path: E:\AtiAppServices\ProductsAuthorizationQueueMonitor.Services\bin\librdkafka\x64\librdkafka.dll
Report Id: 018eb4b5-1a27-11e8-8146-005056a370b1
Faulting package full name:
Faulting package-relative application ID:

Consumer Configuration:

{"group.id", "ProductAuthorization"},
{"enable.auto.commit", false},
{"statistics.interval.ms", 60000},
{"api.version.request", false},
{"bootstrap.servers", "ati-prd-kaf01:9092,ati-prd-kaf02:9092,ati-prd-kaf03:9092,ati-prd-kaf04:9092,ati-prd-kaf05:9092"},
{"broker.version.fallback", "0.9.0.1"},
{"default.topic.config", new Dictionary<string, object>() { {"auto.offset.reset", "smallest"} } }

How to reproduce

We reproduced the error in our testing environment by stopping a single broker.

mhowlett commented 6 years ago

@timothyfranzke - are you sure the problem is running out of memory? Based on the information provided, it could be, but not necessarily (my guess is that it's some other problem with librdkafka). @edenhill - access violation in librdkafka.

timothyfranzke commented 6 years ago

Yeah, the page file filled up the system drive. We have our IIS team working on that portion.
We were also able to track down this log:

Exception Info: System.AccessViolationException
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_query_watermark_offsets(IntPtr, System.String, Int32, Int64 ByRef, Int64 ByRef, IntPtr)
   at Confluent.Kafka.Impl.LibRdKafka.query_watermark_offsets(IntPtr, System.String, Int32, Int64 ByRef, Int64 ByRef, IntPtr)
   at Confluent.Kafka.Impl.SafeKafkaHandle.QueryWatermarkOffsets(System.String, Int32, Int32)
   at Monitor.Monitor.GetWaterMarkOffsets(Confluent.Kafka.TopicPartition)
   at Monitor.Monitor.b8_9(Confluent.Kafka.TopicPartition)
   at System.Linq.Enumerable+WhereSelectListIterator`2[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089],[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
   at System.Collections.Generic.List`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]]..ctor(System.Collections.Generic.IEnumerable`1<System.__Canon>)
   at System.Linq.Enumerable.ToList[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]
   at Monitor.Monitor+<>c__DisplayClass8_0.b8(System.Object, System.Collections.Generic.List`1<Confluent.Kafka.TopicPartition>)
   at Confluent.Kafka.Consumer.RebalanceCallback(IntPtr, Confluent.Kafka.ErrorCode, IntPtr, IntPtr)
   at Confluent.Kafka.Impl.NativeMethods.NativeMethods.rd_kafka_consumer_poll(IntPtr, IntPtr)
   at Confluent.Kafka.Impl.SafeKafkaHandle.ConsumerPoll(Confluent.Kafka.Message ByRef, IntPtr)
   at Confluent.Kafka.Consumer.Consume(Confluent.Kafka.Message ByRef, Int32)
   at Confluent.Kafka.Consumer`2[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089],[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].Consume(Confluent.Kafka.Message`2<System.__Canon,System.__Canon> ByRef, Int32)
   at Confluent.Kafka.Consumer`2[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089],[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].Poll(System.TimeSpan)
   at Monitor.Monitor.Start()
   at Monitor.Monitor.StartMonitor()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()

mhowlett commented 6 years ago

It could be that we are not correctly freeing memory somewhere in an error case, though I had a quick look and I can't see where that would be.

Since the exception happens in the rebalance code calling QueryWatermarkOffsets, it is a pretty good assumption that the problem lies in one of those two places.

If you could provide librdkafka logs (set debug config param to all) when the problem is occurring, that might help a lot.
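As a rough illustration of that suggestion, the librdkafka debug property could be added to the same dictionary-style configuration used in the original report. This is only a sketch: the entries other than debug are copied from the report (with the broker list shortened), and "all" enables every debug context, so the output will be very verbose.

```csharp
using System.Collections.Generic;

// Sketch only: the reporter's dictionary-style config with one added entry.
var config = new Dictionary<string, object>
{
    { "group.id", "ProductAuthorization" },
    { "bootstrap.servers", "ati-prd-kaf01:9092" }, // shortened for the example
    { "broker.version.fallback", "0.9.0.1" },
    { "api.version.request", false },
    // librdkafka debug contexts; "all" turns on every context.
    { "debug", "all" }
};
```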

Or if you provide relevant code, that would help us try to reproduce (or possibly identify problems with the code, not that Confluent.Kafka should ever leak memory like this).

timothyfranzke commented 6 years ago

It is interesting that you mentioned QueryWatermarkOffsets. We noticed an issue in our testing environment where the consumer group would randomly start over at offset 0. I had implemented a safeguard that uses QueryWatermarkOffsets to check the last message offset every time the consumer process restarts, and skips any messages before that offset. I submitted a ticket with Confluent regarding the issue, and they said it was possibly due to the broker version being old (0.9.0.1) and suggested setting "broker.version.fallback" in my consumer configuration. I haven't seen the issue since we added this, so I am hoping it is resolved. If it is, we can remove the QueryWatermarkOffsets call from the code.

We are going to try to recreate the issue in our dev environment. I'll let you know if we can get more logs out of that.

mhowlett commented 6 years ago

Removing QueryWatermarkOffsets seems worth a try, since I would guess it's unusual to call it in the rebalance callback, though I don't know of any reason why this would be problematic (and if it is, we should definitely change things to handle it better than what you report).

mhowlett commented 6 years ago

Leaving open but marking as low priority, as it seems likely there is a real but rarely hit issue here (further investigation required).

wangjia184 commented 5 years ago

We encountered the same issue. We have a three-node Kafka cluster, and when one node is down, the process consumes memory rapidly until it runs out of RAM.

After removing the call to QueryWatermarkOffsets, the issue is solved.

wangjia184 commented 5 years ago

This issue is fixed after upgrading from v0.11.6 to v1.1.0. Now when a broker is down, it raises an exception, which is better:

Confluent.Kafka.KafkaException: Broker: Leader not available
   at Confluent.Kafka.Impl.SafeKafkaHandle.QueryWatermarkOffsets(String topic, Int32 partition, Int32 millisecondsTimeout)
   at Confluent.Kafka.Consumer`2.QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
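
For anyone who still needs the watermark check on the 1.x client, one possible way to handle this exception is sketched below. It assumes the 1.x IConsumer API shown in the stack trace above; WatermarkHelper and TryQueryWatermarks are hypothetical names made up for this example, and returning null on failure is just one choice.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
static class WatermarkHelper
{
    // Returns the watermark offsets, or null if the broker query fails
    // (for example "Broker: Leader not available" while a broker is down).
    public static WatermarkOffsets TryQueryWatermarks<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        TopicPartition topicPartition,
        TimeSpan timeout)
    {
        try
        {
            return consumer.QueryWatermarkOffsets(topicPartition, timeout);
        }
        catch (KafkaException ex)
        {
            // Log and let the caller decide whether to retry or skip the check.
            Console.Error.WriteLine(
                $"Watermark query failed for {topicPartition}: {ex.Error.Code} ({ex.Error.Reason})");
            return null;
        }
    }
}
```

The caller would then treat a null result as "offsets unknown" rather than letting the exception escape from the poll loop.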