confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

Consumer consuming messages only after re-naming group.Id #1487

Open tovalepo opened 3 years ago

tovalepo commented 3 years ago

Description

I have a .NET Core console application that uses Confluent.Kafka. I build a consumer to consume messages from a specific topic. The app is intended to run a few times every day, consume the messages on the specified topic, process them, and exit.

It took me a while to understand the consumer's behavior, but I finally realized that the consumer consumes messages only if its groupId has never been used before. Every time I change the consumer's groupId, the consumer fetches the messages in the subscribed topic. But on subsequent runs with the same groupId, consumer.Consume() returns EOF (EnablePartitionEof is set to 'true').

This behavior seems related to rebalancing between consumers in the same group, although I don't understand why, since the consumer should exist only for the application's lifetime. Before leaving the app, I call consumer.Close() and consumer.Dispose(). These should destroy the consumer, so that on the next run, when I create the consumer again, it will be the first and only consumer in the specified groupId and so get assigned the only partition on the topic. (I created the topic via the command line, with only 1 partition.) But as I said, this is not what actually happens.

I know there are messages on the topic - I can consume them via command-line. And I also made sure the topic has only 1 partition.

The weirdest thing is that I have another .NET Core console app that does the same process with no issue at all. The only difference between the two apps is that the working app consumes messages that I produce manually via the command line, while the non-working app consumes messages produced by another .NET producer.

How to reproduce

Working app:

Class Program.cs

class Program
{
    ...

    static void Main(string[] args)
    {
        if (args.Length != 2)
        {
            Console.WriteLine("Please provide topic name to read and SMTP topic name");
        }
        else
        {
            var services = new ServiceCollection();
            services.AddSingleton<ConsumerConfig, ConsumerConfig>();
            services.AddSingleton<ProducerConfig, ProducerConfig>();

            var serviceProvider = services.BuildServiceProvider();

            var cConfig = serviceProvider.GetService<ConsumerConfig>();
            var pConfig = serviceProvider.GetService<ProducerConfig>();

            cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
            cConfig.GroupId = "confluence-consumer";
            cConfig.EnableAutoCommit = true;
            cConfig.StatisticsIntervalMs = 5000;
            cConfig.SessionTimeoutMs = 6000;
            cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
            cConfig.EnablePartitionEof = true;

            pConfig.BootstrapServers = Environment.GetEnvironmentVariable("producer_bootstrap_servers");

            var consumer = new ConsumerHelper(cConfig, args[0]);

            messages = new Dictionary<string, Dictionary<string, UserMsg>>();

            var result = consumer.ReadMessage();
            while (result != null && !result.IsPartitionEOF)
            {
                Console.WriteLine($"Current consumed msg-json: {result.Message.Value}");

                ...

                result = consumer.ReadMessage();
            }

            consumer.Close();
            Console.WriteLine($"Done consuming messages from topic {args[0]}");

        }

    }

Class ConsumerHelper.cs

`public class ConsumerHelper
{
    private string _topicName;
    private ConsumerConfig _consumerConfig;
    private IConsumer<string, string> _consumer;

    public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
    {
        try
        {
            _topicName = topicName;
            _consumerConfig = consumerConfig;

            var builder = new ConsumerBuilder<string, string>(_consumerConfig);
            _consumer = builder.Build();

            _consumer.Subscribe(_topicName);
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
        }
    }

    public ConsumeResult<string, string> ReadMessage()
    {
        Console.WriteLine("ReadMessage: start");
        try
        {
            return _consumer.Consume();
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on ReadMessage: {exc.ToString()}");
            return null;
        }
    }

    public void Close()
    {
        Console.WriteLine("Close: start");
        try
        {
            _consumer.Close();
            _consumer.Dispose();
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on Close: {exc.ToString()}");
        }
    }
}

}

Non-working app:

Class Program.cs

class Program
{
    private static SmtpClient smtpClient;
    private static Random random = new Random();

    static void Main(string[] args)
    {
        try
        {
            var services = new ServiceCollection();
            services.AddSingleton<ConsumerConfig, ConsumerConfig>();
            services.AddSingleton(new SmtpClient("smtp.gmail.com"));

            var serviceProvider = services.BuildServiceProvider();

            var cConfig = serviceProvider.GetService<ConsumerConfig>();
            cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
            cConfig.GroupId= "smtp-consumer";
            cConfig.GroupInstanceId = "single-smtp-consumer";
            cConfig.EnableAutoCommit = true;
            cConfig.StatisticsIntervalMs = 5000;
            cConfig.SessionTimeoutMs = 6000;
            cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
            cConfig.EnablePartitionEof = true;

            var consumer = new ConsumerHelper(cConfig, args[0]);

            ...

            var result = consumer.ReadMessage();
            while (result!=null && !result.IsPartitionEOF)
            {
                Console.WriteLine($"current consumed message: {result.Message.Value}");
                var msg = JsonConvert.DeserializeObject<EmailMsg>(result.Message.Value);
                SendEmail(msg);

                result = consumer.ReadMessage();
            }

            Console.WriteLine("Done sending emails consumed from SMTP topic");
            consumer.Close();
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on Main: {exc.ToString()}");
        }

    }

}

Class ConsumerHelper.cs

public class ConsumerHelper
{
    private string _topicName;
    private ConsumerConfig _consumerConfig;
    private IConsumer<string, string> _consumer;

    public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
    {
        try
        {
            _topicName = topicName;
            _consumerConfig = consumerConfig;

            var builder = new ConsumerBuilder<string, string>(_consumerConfig);

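            builder.SetPartitionsAssignedHandler((c, ps) =>
            {
                // Print the group's committed offset for each newly assigned partition.
                // The parameter is named c so it does not shadow the _consumer field.
                var committed = c.Committed(ps, TimeSpan.FromSeconds(2));
                Console.WriteLine($"Assigned partitions:{string.Join(',', committed)}");
            });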

            _consumer = builder.Build();

            _consumer.Subscribe(_topicName);

        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
        }
    }
    public ConsumeResult<string, string> ReadMessage()
    {
        Console.WriteLine("ConsumeResult: start");
        try
        {

            return _consumer.Consume();
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on ConsumeResult: {exc.ToString()}");
            return null;
        }
    }
    public void Close()
    {
        Console.WriteLine("Close: start");
        try
        {
            _consumer.Close();
            _consumer.Dispose();
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on Close: {exc.ToString()}");
        }
        Console.WriteLine("Close: end");
    }

}

}`

Note the console write I added in builder.SetPartitionsAssignedHandler(). The output is:

On the first run, i.e. with a group.Id never used before (messages are consumed as required):
Assigned partitions:SMTP [[0]] @Unset [-1001]

On the next run and onward, with the same group.Id:
Assigned partitions:SMTP [[0]] @1
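To see whether a group has anything left to read, it can help to print the committed offset next to the partition's watermarks. The following is a minimal sketch, not code from the apps above: the broker address `localhost:9092` is a placeholder, and the topic `SMTP`, partition 0, and group `smtp-consumer` are taken from the output and config shown in this issue. If the committed offset equals the high watermark, the group has already read everything on the partition.

```csharp
using System;
using Confluent.Kafka;

class OffsetCheck
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder, not from the issue
            GroupId = "smtp-consumer"
        };

        using (var consumer = new ConsumerBuilder<string, string>(config).Build())
        {
            var partition = new TopicPartition("SMTP", new Partition(0));

            // Offset last committed by this group for the partition (Unset if none).
            var committed = consumer.Committed(new[] { partition }, TimeSpan.FromSeconds(5));

            // Low = first available offset, High = offset the next produced message will get.
            var watermarks = consumer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(5));

            Console.WriteLine($"Committed: {committed[0].Offset}");
            Console.WriteLine($"Low: {watermarks.Low}, High: {watermarks.High}");
        }
    }
}
```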


tovalepo commented 3 years ago

Well, after a lot of research, it seems the issue is on the producer side, or I have another issue on the producer side. I produce the messages to my topic via a .NET producer, and I see that the last message of every batch is never produced. If I produce 5 messages, 4 are produced but the 5th is not. If I produce 3 messages, 2 are produced but the 3rd is not. Here is my producer code:

`public class ProducerHelper
{
    private string _topicName;
    private IProducer<string, string> _producer;
    private ProducerConfig _config;
    private static Random random = new Random();

    public ProducerHelper(ProducerConfig config, string topicName)
    {
        try
        {
            _config = config;
            _topicName = topicName;
            var pBuilder = new ProducerBuilder<string, string>(_config);
            _producer = pBuilder.Build();
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on ProducerHelper: {exc.ToString()}");
        }
    }

    public void WritreMessage(EmailMsg msg)
    {
        Console.WriteLine("WritreMessage: start");
        try
        {
            var jsonMsg = JsonConvert.SerializeObject(msg,
              new JsonSerializerSettings
              {
                  ContractResolver = new CamelCasePropertyNamesContractResolver()
              });
            Console.WriteLine($"Producing msg: {jsonMsg}");
            _producer.Produce(_topicName, new Message<string, string>()
            {
                Key = random.Next().ToString(),
                Value = jsonMsg
            });
        }
        catch (System.Exception exc)
        {
            Console.WriteLine($"Error on WritreMessage: {exc.ToString()}");
        }
        Console.WriteLine("WritreMessage: end");
    }
}

`

`class Program
{
    ...
    private static Dictionary<string, Dictionary<string, UserMsg>> messages;

    static void Main(string[] args)
    {
        if (args.Length != 2)
        {
            Console.WriteLine("Please provide topic name to read and SMTP topic name");
        }
        else
        {
            var services = new ServiceCollection();
            services.AddSingleton<ConsumerConfig, ConsumerConfig>();
            services.AddSingleton<ProducerConfig, ProducerConfig>();

            var serviceProvider = services.BuildServiceProvider();

            var cConfig = serviceProvider.GetService<ConsumerConfig>();
            var pConfig = serviceProvider.GetService<ProducerConfig>();

            cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
            cConfig.GroupId = "confluence-consumer";
            cConfig.EnableAutoCommit = true;
            cConfig.StatisticsIntervalMs = 5000;
            cConfig.SessionTimeoutMs = 6000;
            cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
            cConfig.EnablePartitionEof = true;

            pConfig.BootstrapServers = Environment.GetEnvironmentVariable("producer_bootstrap_servers");

            var consumer = new ConsumerHelper(cConfig, args[0]);

            messages = new Dictionary<string, Dictionary<string, UserMsg>>();

            var result = consumer.ReadMessage();
            while (result != null && !result.IsPartitionEOF)
            {
                Console.WriteLine($"Current consumed msg-json: {result.Message.Value}");

                var msg = JsonConvert.DeserializeObject<ConfluenceMsg>(result.Message.Value,
                  new JsonSerializerSettings
                  {
                      ContractResolver = new CamelCasePropertyNamesContractResolver()
                  }
                );

                AggreagteCurrentMsg(msg);

                result = consumer.ReadMessage();
            }

            consumer.Close();
            Console.WriteLine($"Done consuming messages from topic {args[0]}");

            var producer = new ProducerHelper(pConfig, args[1]);

            foreach (var msg in messages)
            {
                //build emails & send to producer
                var mail = BuildMail(msg);
                producer.WritreMessage(mail);
            }
        }
    }
}

`
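One possible cause of the missing last message, offered as an assumption rather than a confirmed diagnosis: `Produce()` only places a message on an internal queue and returns immediately, and the program above exits right after the last `WritreMessage` call without flushing or disposing the producer. A minimal sketch of flushing before exit follows. The broker address `localhost:9092` and the message contents are placeholders; the topic name `SMTP` is taken from the output earlier in this issue.

```csharp
using System;
using Confluent.Kafka;

class FlushSketch
{
    static void Main()
    {
        // Placeholder broker address; not taken from the issue.
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new ProducerBuilder<string, string>(config).Build())
        {
            for (var i = 0; i < 5; i++)
            {
                // Produce() only queues the message; delivery happens in the background.
                producer.Produce(
                    "SMTP",
                    new Message<string, string> { Key = i.ToString(), Value = $"message {i}" },
                    report =>
                    {
                        // Called once per message with the delivery outcome.
                        if (report.Error.IsError)
                        {
                            Console.WriteLine($"Delivery failed: {report.Error.Reason}");
                        }
                    });
            }

            // Block until queued messages are delivered or the timeout elapses.
            var remaining = producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"Messages still queued after flush: {remaining}");
        }
    }
}
```

In the `ProducerHelper` class above, the equivalent change would be a hypothetical `Close()` method that calls `_producer.Flush(...)` and `_producer.Dispose()` once the `foreach` loop has finished.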

mhowlett commented 3 years ago

> I've realized the consumer will consume messages only if its groupId is a one that was never in use before.

that's expected behavior since you're committing offsets: cConfig.EnableAutoCommit = true;.

the cConfig.AutoOffsetReset property tells the consumer what to do if no committed offsets are found. because you have that set to earliest, the consumer will read from the beginning of the partitions - but only when you change group.id, because the previous group will have committed offsets corresponding to the end of the partitions. the new group will have no committed offsets yet.
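The rule described above can be modeled in a few lines. This is a simplified illustration only, not Confluent.Kafka code: the `ResetPolicy` enum and the `StartOffsetModel.Resolve` function are hypothetical names, and the model ignores the case where a committed offset is out of range (which also triggers the reset policy).

```csharp
using System;

// Hypothetical stand-in for AutoOffsetReset, for illustration only.
public enum ResetPolicy { Earliest, Latest }

public static class StartOffsetModel
{
    // committed: the group's committed offset, or null if the group has none.
    // low/high: the partition's low and high watermarks
    // (high is the offset the next produced message will receive).
    public static long Resolve(long? committed, ResetPolicy policy, long low, long high)
    {
        if (committed.HasValue)
        {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.Value;
        }
        return policy == ResetPolicy.Earliest ? low : high;
    }

    public static void Main()
    {
        // New group.id: no committed offset, so Earliest starts at the low watermark.
        Console.WriteLine(Resolve(null, ResetPolicy.Earliest, 0, 5)); // prints 0

        // Reused group.id that already committed the end of the partition:
        // the consumer resumes at offset 5 and immediately reaches EOF.
        Console.WriteLine(Resolve(5, ResetPolicy.Earliest, 0, 5)); // prints 5
    }
}
```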

anchitj commented 3 months ago

Is this still an issue?