confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0
91 stars 869 forks source link

Access violation Exception thrown in consume method after consumer object is disposed #2055

Closed ksdvishnukumar closed 1 year ago

ksdvishnukumar commented 1 year ago

Description & How to reproduce

Hi, I use .Net core 6 to connect to kafka using Confluent Kafka Library with 2.1.1 version. I used hosted service to create a kafka clients (Consumer and Producer). Hosted service uses StartAsync and StopAysc. StartAsync method have logic to create a Kafka Consumer and Producer client. StopAsync method have logic to dispose the Kafka Consumer and producer clients.

StartAsync has a method to consume the message from subscribed topic in a while loop. StopAsync method to dispose the client.

Both of the method shares the same cancellation token using CancellationTokenSource.Token. While stopping the application, we use CancellationTokensource Cancel method to notify consumer to stop the processing.

To consume the message we use the Consume(cancellationToken) method.

In the above case am seeing the AccessViolationException is thrown in the consume method.

I have gone through the Consume Method from Confluent Kafka package, Ideally it should through the ObjectDisposedException. But i did not get that. Even i tried with latest 2.1.1 version. Still the same.

I use the custom config model to map to the respective kafka configuration.

Below are the Consumer configuration image

Exception: We use a wrapper around the Consume method image

Extracted the code for better understanding

public static readonly ConcurrentDictionary<id, ItemDetailsModel> ProcessedList = new();

public class ItemDetailsModel
{
    public Consumer<string, string> consumerHandler {get; set;}
    public CancellationTokenSource consumerCts  {get; set;}
}

public class ItemModel{
    public int id {get; set;}
    public string name {get; set;}
}

foreach(var item in items)
{
    Task.Run(()=>{
        NotificationProcess();
    });

    Task.Run (()=>{
        CreateAndProcess(item);
    });
}

public void NotificationProcess()
{
    //API Call to get the Id
    //Close and Dispose the Consumer Object based on the id
    if (ProcessedList.ContainsKey(id))
    {
        ItemDetailsModel model = ProcessedList[id];
        Dispose(model.consumerHandler, model.consumerCts);      
        ProcessedList.TryRemove(id, out ItemDetailsModel removedItem);      
    }
}

public void CreateAndProcess(ItemModel item)
{
    ConsumerConfig consumerConfig = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "my-consumer-group",
        AutoOffsetReset = AutoOffsetReset.Latest
        EnableAutoOffsetStore = false,
        EnableAutoCommit = false,
        AutoCommitIntervalMs = 0,
        EnablePartitionEof = true,
        ConnectionsMaxIdleMs = 180000,
        MaxPartitionFetchBytes = 20971520,

        PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
        //This is to enable the End of Partition flag for EventHub. For Kafka, with out this property itself it will work
        IsolationLevel = IsolationLevel.ReadUncommitted,
        //https://github.com/confluentinc/confluent-kafka-dotnet/issues/89
        SocketNagleDisable = true,

        //https://github.com/Azure/azure-event-hubs-for-kafka/issues/139
        SocketKeepaliveEnable = true,
        MetadataMaxAgeMs = 180000,

        //https://learn.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations
        SessionTimeoutMs = 30000,
        MaxPollIntervalMs = 300000,

        //// Alternative to Polling timeout for Consume method where it take the TimeoutInMs.
        //// We can use the below property along with Consume Method where it passes the cancellation token
        ////Yet to be verified
        CancellationDelayMaxMs = 100,
    }
    var consumer = new Consumer<string, string>(consumerConfig, stringDeserializer, stringDeserializer);
    CancellationTokenSource cts = new();
    ProcessedList.TryAdd(item.id, new ItemDetailsModel { consumerHandler = consumer,  consumerCts = cts});  

    while (!cts.IsCancellationRequested)
    {
        consumer.Consume(cts.Token); // GIVES ACCESSVIOLATION EXCEPION IN THIS LINE
        //BUSINESS PROCESSING LOGIC (NOT MORE THAN 30ms)
    }
}
public void Dispose(Consumer<string,string> consumer,CancellationTokenSource cts )  {
    cts.Cancel()
        Thread.Sleep(2000); //If I SLEEP FOR 2 SECONDS THEN ACCESSVIOLATIONEXCEPTION HAPPEND in Close() in the NEXT LINE NOT ALWAYS but OCCASIONALY
    consumer.Close();
    consumer.Dispose();
}

Checklist

Please provide the following information:

Appreciate the help to solve this issue @mhowlett

ksdvishnukumar commented 1 year ago

@mhowlett Friendly remainder to get the solution

ksdvishnukumar commented 1 year ago

I found a way to resolve the issue and closing it

SergeyA commented 11 months ago

@ksdvishnukumar Could you please share the solution?