akkadotnet / Alpakka

Akka Streams Connectors - Alpakka
https://alpakka.getakka.net/
Apache License 2.0

Kafka - assignedPartitions is null for ConsumerStage when using TopicPartitionOffset #44

Closed thedavejay closed 4 years ago

thedavejay commented 5 years ago

Hi there,

I am using the producer from the examples, and the same consumer code with one difference.

Instead of using Subscriptions.Topics("testtopic"), I am using Subscriptions.AssignmentWithOffset(new TopicPartitionOffset("testtopic", 0, Offset.Beginning));

This all works until the buffer limit is exceeded. At that point the following code in ConsumerStage throws an ArgumentNullException, because assignedPartitions is null:

private void PullQueue()
{
    _consumer.Poll(_settings.PollTimeout);

    if (!isPaused && _buffer.Count > _settings.BufferSize)
    {
        Log.Debug($"Polling paused, buffer is full");
        _consumer.Pause(assignedPartitions);   // BREAKING CODE
        isPaused = true;
    }
}

This is the code I am using:

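using System;
using System.Text;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class Program
{
    static void Main(string[] args)
    {
        Config fallbackConfig = ConfigurationFactory.ParseString(@"
                akka.suppress-json-serializer-warning=true
                akka.loglevel = DEBUG
            ").WithFallback(ConfigurationFactory.FromResource<ConsumerSettings<object, object>>("Akka.Streams.Kafka.reference.conf"));

        var system = ActorSystem.Create("TestKafkaConsumer", fallbackConfig);
        var materializer = system.Materializer();

        // Keys are Null (no key deserializer is passed); values are UTF-8 strings.
        var consumerSettings = ConsumerSettings<Null, string>.Create(system, null, new StringDeserializer(Encoding.UTF8))
            .WithBootstrapServers("localhost:9092")
            .WithGroupId("group1");

        // Subscribing by topic name does not hit the problem:
        //var subscription = Subscriptions.Topics("testtopic");

        // Manually assigning partition 0, starting from the beginning, does:
        var subscription = Subscriptions.AssignmentWithOffset(new TopicPartitionOffset("testtopic", 0, Offset.Beginning));

        var source = Consumer.PlainSource(consumerSettings, subscription);

        // Throttle downstream to 5 elements per second (burst of 5).
        source.Throttle(5, TimeSpan.FromSeconds(1), 5, ThrottleMode.Shaping)
            .RunForeach(
                result =>
                {
                    Console.WriteLine(
                        $"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Value}");
                }, materializer);

        Console.ReadLine();
    }
}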

I'm not a Kafka expert and couldn't work out how to fix this. It seems to be similar to this issue:

https://github.com/confluentinc/confluent-kafka-dotnet/issues/434

thedavejay commented 5 years ago

From what I can tell, it may be better to use _consumer.Assignment to get the assigned partitions than to rely on the local variable that gets populated from the OnPartitionAssigned events?

e.g.

_consumer.Resume(_consumer.Assignment);
//AND
_consumer.Pause(_consumer.Assignment);

AndreSteenbergen commented 5 years ago

I'll check. To my knowledge, the partitions-assigned event is only raised when a subscription is used and a rebalance occurs, so I guess you have a point. I'll check in a debug session.

AndreSteenbergen commented 5 years ago

Just to be sure, I changed this code to: _assignedPartitions ?? _consumer.Assignment. I added this to my already existing PR.
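A minimal, self-contained sketch of the null-coalescing fallback described in the comment above, under stated assumptions. It does not touch Kafka: assignedPartitions, consumerAssignment and Pause are hypothetical stand-ins (a List<string> plays the role of _consumer.Assignment, and the local Pause function mimics a pause call that rejects a null list). It illustrates the pattern only and is not the actual ConsumerStage code or the change in the PR.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the field that the partitions-assigned event would populate.
// With manual assignment (AssignmentWithOffset) no rebalance happens, so it stays null.
List<string>? assignedPartitions = null;

// Hypothetical stand-in for _consumer.Assignment, which knows the manually assigned partitions.
List<string> consumerAssignment = new() { "testtopic [0]" };

// Records which partitions the fake consumer was asked to pause.
var paused = new List<string>();

// Mimics a consumer Pause call that rejects a null partition list.
void Pause(IEnumerable<string> partitions)
{
    if (partitions is null) throw new ArgumentNullException(nameof(partitions));
    paused.AddRange(partitions);
}

// Original behavior: passing the event-tracked field directly throws.
bool threw = false;
try
{
    Pause(assignedPartitions!);
}
catch (ArgumentNullException)
{
    threw = true;
}

// Fallback: use the consumer's own assignment when the event never populated the field.
Pause(assignedPartitions ?? consumerAssignment);

Console.WriteLine($"threw without fallback: {threw}");  // prints "threw without fallback: True"
Console.WriteLine(string.Join(", ", paused));           // prints "testtopic [0]"
```

Keeping the event-tracked field and only falling back to the consumer's assignment when it is null leaves the existing behavior for topic subscriptions unchanged while covering the manual-assignment case.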

thedavejay commented 5 years ago

Sorry for taking so long to reply. Thanks for that!

Arkatufus commented 4 years ago

Akka.Streams.Kafka has been moved out of Alpakka into its own GitHub repo. Closing this issue. Please re-open it in https://github.com/akkadotnet/Akka.Streams.Kafka if you're still having this problem.