confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
77 stars 866 forks

Possible memory issue with the producer when a message is large #1748

Open rafaelldi opened 2 years ago

rafaelldi commented 2 years ago

Description

Hello, we've run into a possible memory leak.

We have a long-lived CancellationTokenSource. When we try to send a large message, the producer throws a ProduceException with the error "Broker: Message size too large". After that, the GC doesn't remove the TypedTaskDeliveryHandlerShim (with its retained objects) from the heap, because it is still connected to that CancellationTokenSource. If the exception isn't thrown, the TypedTaskDeliveryHandlerShim isn't retained in memory.
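
To illustrate the kind of retention described above, here is a minimal sketch that doesn't involve Kafka at all. It assumes the link to the CancellationTokenSource is a token registration that is never disposed on the error path, which is not confirmed in this report. `Handler` and `RegisterHandler` are made-up names standing in for the delivery handler and the code that registers it.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;

// Hypothetical stand-in for a delivery handler that holds a large payload.
class Handler
{
    public byte[] Payload = new byte[1_000_000];
    public void OnCancelled() { }
}

static class RetentionDemo
{
    // Registers a handler on the token and returns only a weak reference to it,
    // so the caller can observe whether anything else still keeps it alive.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static WeakReference RegisterHandler(CancellationToken token, bool disposeRegistration)
    {
        var handler = new Handler();
        CancellationTokenRegistration registration = token.Register(handler.OnCancelled);
        if (disposeRegistration)
        {
            // Disposing the registration removes the callback from the token source.
            registration.Dispose();
        }
        return new WeakReference(handler);
    }

    static void Main()
    {
        // Long-lived source, as in the scenario described above.
        using var cts = new CancellationTokenSource();

        WeakReference kept = RegisterHandler(cts.Token, disposeRegistration: false);
        WeakReference dropped = RegisterHandler(cts.Token, disposeRegistration: true);

        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        Console.WriteLine($"registration left in place, handler alive: {kept.IsAlive}");
        Console.WriteLine($"registration disposed, handler alive: {dropped.IsAlive}");
    }
}
```

The handler whose registration is left in place stays reachable for as long as the token source lives. The one whose registration was disposed becomes eligible for collection, although exactly when the GC reclaims it is not guaranteed.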

How to reproduce

docker-compose.yml

```yaml
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.0.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
```

Program.cs

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    static async Task Main()
    {
        // Long-lived token source shared by every ProduceAsync call.
        using var cts = new CancellationTokenSource();
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
        };
        using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

        Console.WriteLine("Press c to continue and q to exit");
        while (Console.ReadKey().KeyChar != 'q')
        {
            // no exceptions, no leaks
            // var value = new string('a', 800_000);

            // ProduceException and leak
            var value = new string('a', 1_000_000);
            try
            {
                await producer.ProduceAsync(
                    "example",
                    new Message<string, string> { Key = "key", Value = value },
                    cts.Token);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
    }
}
```

The Confluent.Kafka NuGet package version is 1.8.2.
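
Until the cause is confirmed, one thing that might limit the retention is to pass the producer a short-lived linked token instead of the long-lived one. The sketch below assumes the retained objects hang off registrations on the token passed to ProduceAsync. `ScopedToken.RunAsync` is a hypothetical helper, not part of Confluent.Kafka, and this has not been verified against 1.8.2.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ScopedToken
{
    // Hypothetical helper: runs `operation` with a per-call token linked to `longLived`.
    // Disposing the linked source detaches it from the long-lived source, so callbacks
    // registered on the per-call token are no longer reachable from the long-lived one.
    public static async Task<T> RunAsync<T>(
        CancellationToken longLived,
        Func<CancellationToken, Task<T>> operation)
    {
        using var perCall = CancellationTokenSource.CreateLinkedTokenSource(longLived);
        return await operation(perCall.Token).ConfigureAwait(false);
    }
}

// Possible usage with the repro above (producer, value and cts as in Program.cs):
// await ScopedToken.RunAsync(cts.Token, token =>
//     producer.ProduceAsync(
//         "example",
//         new Message<string, string> { Key = "key", Value = value },
//         token));
```

Cancelling the long-lived source still cancels the in-flight call, because the per-call source is linked to it. The trade-off is one extra small allocation per produce call.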

dotMemory screenshot ![Annotation 2022-01-17 162603](https://user-images.githubusercontent.com/16965141/149778622-2efa194e-55ef-4f0d-b57a-823453912669.jpg)

It looks like a memory leak, but we're new to Kafka and may be missing something.

Thanks!

mhowlett commented 2 years ago

sounds like a bug to me, thanks for the report

sampsonye commented 3 months ago

@mhowlett same problem here, is there any solution?