ntent / kafka4net

C# client for Kafka
Apache License 2.0

Producer starts writing to the same single partition (when no Message.Key is specified) #25

Closed vikmv closed 8 years ago

vikmv commented 8 years ago

There is a bug in FletcherHashedMessagePartitioner.

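Because System.Random is not thread-safe, a Producer used from multiple threads will, after some time, start writing every message to one single partition. Once the shared Random instance has been corrupted by concurrent calls, Next() typically keeps returning 0. This only affects messages that have no Key specified.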

Here is a small snippet that reproduces the problem by sharing one Random instance across several threads:

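using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Message { public int Partition; }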

static void Main()
{
    var random = new Random();

    //Here is how to fix it:
    //var randomLocal = new ThreadLocal<Random>(() => new Random());

    var data = new ConcurrentDictionary<int, int>();

    var subject = new Subject<Message>();

    subject
        //another way of fixing - adding .Synchronize() here
        .Do(msg => msg.Partition = random.Next(10))
        .Subscribe(x => { data.AddOrUpdate(x.Partition, i => 1, (i, val) => val + 1); });

    var threads = new List<Thread>();

    for (int t = 0; t < 10; t++)
    {
        var thread = new Thread(() =>
        {
            for (int i = 0; i < 100000; i++)
                subject.OnNext(new Message());
        });

        thread.Start();
        threads.Add(thread);
    }

    foreach (var thread in threads)
        thread.Join();

    foreach (var d in data)
        Console.WriteLine("Value: {0}, Count: {1}", d.Key, d.Value);

    //results with non thread-local random are something like this:
    //Value: 0, Count: 933076
    //Value: 1, Count: 7443
    //Value: 2, Count: 7345
    //Value: 3, Count: 7363
    //Value: 4, Count: 7600
    //Value: 5, Count: 7454
    //Value: 6, Count: 7658
    //Value: 7, Count: 7302
    //Value: 8, Count: 7360
    //Value: 9, Count: 7399
}
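
For reference, the thread-local fix mentioned in the comment above can be sketched without Rx as follows. This is only an illustration under my own assumptions: the class name ThreadLocalRandomDemo, the Run method and the explicit seeding are mine and are not taken from kafka4net. The explicit seed is there because on .NET Framework a parameterless new Random() is seeded from the system clock, so instances created at nearly the same time on different threads may produce identical sequences.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, not part of kafka4net.
static class ThreadLocalRandomDemo
{
    // Shared seed counter; Interlocked.Increment hands each thread a distinct seed.
    private static int _seed = Environment.TickCount;

    // One Random per thread, so no instance is ever touched by two threads.
    private static readonly ThreadLocal<Random> RandomLocal =
        new ThreadLocal<Random>(() => new Random(Interlocked.Increment(ref _seed)));

    // Picks a random partition perThread times on each of threadCount threads
    // and returns how often each partition was chosen.
    public static IDictionary<int, int> Run(int threadCount, int perThread, int partitions)
    {
        var data = new ConcurrentDictionary<int, int>();
        var threads = new List<Thread>();

        for (int t = 0; t < threadCount; t++)
        {
            var thread = new Thread(() =>
            {
                for (int i = 0; i < perThread; i++)
                {
                    int partition = RandomLocal.Value.Next(partitions);
                    data.AddOrUpdate(partition, 1, (key, val) => val + 1);
                }
            });

            thread.Start();
            threads.Add(thread);
        }

        foreach (var thread in threads)
            thread.Join();

        return data;
    }

    static void Main()
    {
        foreach (var d in Run(10, 100000, 10))
            Console.WriteLine("Value: {0}, Count: {1}", d.Key, d.Value);
    }
}
```

With this version the counts should come out roughly even across all ten values instead of piling up on 0.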

vchekan commented 8 years ago

Thank you, I accepted your patch. Partitioning is indeed called on the caller's thread and thus must be thread-safe. Published on nuget.org as version 1.3.
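
To illustrate the thread-safety requirement for code that runs on the caller's thread, here is a minimal sketch of a partition picker that guards a single Random with a lock. It is not the patch that was merged and not kafka4net's actual API: LockedPartitionPicker and its Next method are hypothetical names chosen for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of kafka4net.
sealed class LockedPartitionPicker
{
    private readonly Random _random = new Random();
    private readonly object _gate = new object();

    // Returns a partition index in [0, partitionCount).
    // The lock ensures only one thread at a time touches the Random instance.
    public int Next(int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        lock (_gate)
        {
            return _random.Next(partitionCount);
        }
    }
}

static class Program
{
    static void Main()
    {
        var picker = new LockedPartitionPicker();
        var counts = new int[4];

        // Many concurrent callers, as a Producer shared between threads would have.
        Parallel.For(0, 100000, i =>
            Interlocked.Increment(ref counts[picker.Next(counts.Length)]));

        for (int p = 0; p < counts.Length; p++)
            Console.WriteLine("Partition: {0}, Count: {1}", p, counts[p]);
    }
}
```

A lock is the simplest option and keeps a single random sequence. A thread-local Random avoids contention when many threads send at once, at the cost of one instance per thread.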