amielc1 / LearnKafka

Learn Apache Kafka 3.0 Ecosystem, Core Concepts, Real World Java Producers/Consumers & Big Data Architecture Using .NET Core
Apache License 2.0
1 stars 0 forks source link

Make Consumer Idempotent #24

Open amielc1 opened 2 weeks ago

amielc1 commented 2 weeks ago

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

/// <summary>
/// Console Kafka consumer that skips messages whose identifier it has already
/// handled during the lifetime of this process.
/// </summary>
/// <remarks>
/// The set of processed identifiers lives in a static in-memory dictionary, so it
/// is lost when the process exits and is not shared with other consumer instances.
/// NOTE(review): de-duplicating by message key assumes producers assign a unique
/// key per logical message — confirm against the producer side.
/// </remarks>
class Program
{
    private static readonly string kafkaBootstrapServers = "localhost:9092";
    private static readonly string kafkaTopic = "your_topic";
    private static readonly string groupId = "your_group_id";

    // Identifiers of messages that have already been processed by this process.
    private static readonly ConcurrentDictionary<string, bool> processedMessages = new ConcurrentDictionary<string, bool>();

    /// <summary>
    /// Subscribes to <c>kafkaTopic</c> and processes each message at most once per
    /// identifier until Ctrl+C is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>A completed task once the consume loop has been cancelled.</returns>
    static Task Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = kafkaBootstrapServers,
            GroupId = groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        // The key must be deserialized as a string: with the Ignore key type the
        // key is never read, so it cannot be used to detect duplicates.
        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<string, string>(config).Build())
        {
            // Turn Ctrl+C into a cooperative cancellation so the loop can exit
            // and the consumer can be closed instead of the process being killed.
            ConsoleCancelEventHandler onCancel = (sender, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
            };
            Console.CancelKeyPress += onCancel;

            consumer.Subscribe(kafkaTopic);

            try
            {
                while (true)
                {
                    var consumeResult = consumer.Consume(cts.Token);

                    // Kafka keys are optional; fall back to the message's
                    // topic/partition/offset coordinates when no key was set.
                    string messageId = consumeResult.Message.Key
                        ?? consumeResult.TopicPartitionOffset.ToString();

                    if (!HasMessageBeenProcessed(messageId))
                    {
                        // Process the message
                        Console.WriteLine($"Processing message: {consumeResult.Message.Value}");

                        // Mark the message as processed
                        MarkMessageAsProcessed(messageId);
                    }
                    else
                    {
                        Console.WriteLine($"Skipping already processed message: {messageId}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Ctrl+C cancels the pending Consume call.
            }
            finally
            {
                // Detach the handler before the token source is disposed, then
                // close the consumer on every exit path, not only on cancellation.
                Console.CancelKeyPress -= onCancel;
                consumer.Close();
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Reports whether a message identifier has already been recorded as processed.
    /// </summary>
    /// <param name="messageId">Non-null identifier of the message.</param>
    /// <returns><c>true</c> if the identifier is already recorded; otherwise <c>false</c>.</returns>
    private static bool HasMessageBeenProcessed(string messageId)
    {
        return processedMessages.ContainsKey(messageId);
    }

    /// <summary>
    /// Records a message identifier as processed. Recording the same identifier
    /// again has no effect.
    /// </summary>
    /// <param name="messageId">Non-null identifier of the message.</param>
    private static void MarkMessageAsProcessed(string messageId)
    {
        processedMessages.TryAdd(messageId, true);
    }
}