akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html

Consumer comes up with no assigned partitions #302

Open njhartwell opened 7 years ago

njhartwell commented 7 years ago

We are using reactive-kafka (using com.typesafe.akka:akka-stream-kafka_2.11:0.14) with Kafka 0.10 brokers and frequently see consumers come up (usually immediately following the termination of another consumer in the group) with no partitions assigned. Logs look like this:

2017-05-11 01:55:49,394 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.1.1:9093, 192.168.1.2:9093, 192.168.1.3:9093]
    check.crcs = true
    client.id = consumer-1
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my-group
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = [com.my.app.adapters.akkalib.metrics.KafkaDropwizardMetricsReporter]
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = /etc/app/server.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /etc/ssl/app.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2017-05-11 01:55:49,474 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
2017-05-11 01:55:49,477 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
2017-05-11 01:55:49,600 WARN  o.a.k.c.consumer.ConsumerConfig - The configuration 'metric.tag' was supplied but isn't a known config.
2017-05-11 01:55:49,600 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
2017-05-11 01:55:49,600 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
2017-05-11 01:55:51,899 INFO  o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator foo.bar.baz:9093 (id: 123 rack: null) for group my-group.
2017-05-11 01:55:51,976 INFO  o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [] for group my-group
2017-05-11 01:55:51,978 INFO  o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group my-group
2017-05-11 01:55:52,801 WARN  akka.kafka.KafkaConsumerActor - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2017-05-11 01:55:52,995 INFO  o.a.k.c.c.i.AbstractCoordinator - Successfully joined group my-group with generation 1
2017-05-11 02:52:49,210 INFO  c.m.a.GoogleCredentialService - Refreshing Google auth token
2017-05-11 02:52:49,404 INFO  c.m.a.GoogleCredentialService - Refreshed Google auth token, scheduling to refresh again in 3420s

Relevant code looks like this:

Consumer.committablePartitionedSource(settings, Subscriptions.topics(config.getTopic()))
        .watchTermination(Keep.both())
        .via(killSwitch.flow())
        .via(sourceFilter.limiterFlow())
        // one sub-source per assigned partition, merged with parallelism up to 1000
        .flatMapMerge(1000, r -> r.second()
                .filter(this::filter)
                .map(this::parseRecord)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .mapAsync(1, this::sendRequest)
                .map(this::handleResponse))
        .map(StreamMessage::getKafkaMessage)
        // accumulate committable offsets into batches of up to 1000 before committing
        .batch(
                1000,
                first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first.committableOffset()),
                (batch, elem) -> batch.updated(elem.committableOffset()))
        .mapAsync(10, ConsumerMessage.Committable::commitJavadsl)
        .runWith(Sink.ignore(), materializer);

My apologies for the lack of a runnable test case, etc. (that will soon follow if need be); I just wanted to get this out there in case there is a known issue behind it or something obviously wrong with how we're using the library. Thanks!

jpeel commented 7 years ago

We are seeing this sometimes as well. I think that the wakeup timeout should be much higher. It is currently 3 seconds, but the default heartbeat interval in Kafka is also 3 seconds. The heartbeat frequency is a factor in how long a rebalance takes, so it makes sense to me that the wakeup timeout should be significantly higher, perhaps close to the default session timeout of 10 seconds.
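For reference, the setting in question is the akka.kafka.consumer.wakeup-timeout mentioned in the WARN log above. A minimal override (assuming the 0.x configuration keys quoted later in this thread) would be:

akka.kafka.consumer {
  # default is 3s; raised towards the 10s session timeout as suggested above
  wakeup-timeout = 10s
}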

romanwozniak commented 7 years ago

I'm facing a similar issue in my project:

09:11:43.738 [test.kafka.default-dispatcher-22] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group test committed offset 17872622155 for partition test-0
09:11:43.753 [test.kafka.default-dispatcher-14] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
09:11:43.755 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
09:11:43.759 [test.actor.default-dispatcher-48] DEBUG akka.kafka.internal.ProducerStage - Stage completed
09:11:43.762 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed.
09:11:43.762 [test.kafka.default-dispatcher-14] DEBUG o.a.k.clients.producer.KafkaProducer - The Kafka producer has closed.
09:11:43.763 [test.actor.default-dispatcher-48] DEBUG akka.kafka.internal.ProducerStage - Producer closed

This happens randomly after a couple of hours (or even days) of running. The cause looks to be this:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.committed(SubscriptionState.java:269)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:739)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:716)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:907)

(com.typesafe.akka:akka-stream-kafka_2.12:0.15 with Kafka 0.10 brokers)

rafalbigaj commented 7 years ago

+1

I experience exactly the same problem quite randomly after wakeup:

07/13/2017 23:37:43.090|INFO ||||org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:180)|ProducerConfig values: 
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [kafka01-stage1.messagehub.services.us-south.bluemix.net:9093, kafka02-stage1.messagehub.services.us-south.bluemix.net:9093, kafka03-stage1.messagehub.services.us-south.bluemix.net:9093, kafka04-stage1.messagehub.services.us-south.bluemix.net:9093, kafka05-stage1.messagehub.services.us-south.bluemix.net:9093]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = PLAIN
    security.protocol = SASL_SSL
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = HTTPS
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

07/13/2017 23:37:43.094|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)|Kafka version : 0.10.2.1
07/13/2017 23:37:43.098|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)|Kafka commitId : e89bffd6b2eff799
07/13/2017 23:37:43.099|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)|Kafka version : 0.10.2.1
07/13/2017 23:37:43.099|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)|Kafka commitId : e89bffd6b2eff799

07/13/2017 23:37:44.534|INFO ||||org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:586)|Discovered coordinator kafka05-stage1.messagehub.services.us-south.bluemix.net:9093 (id: 2147483643 rack: null) for group test-UsageMetricsSpec-group-1.
07/13/2017 23:37:44.539|INFO ||||org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:397)|Revoking previously assigned partitions [] for group test-UsageMetricsSpec-group-1
07/13/2017 23:37:44.539|INFO ||||org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:420)|(Re-)joining group test-UsageMetricsSpec-group-1
[WARN] [07/13/2017 23:37:46.191] [ml-event-client-units-akka.kafka.default-dispatcher-20] [akka://ml-event-client-units/system/kafka-consumer-2] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
07/13/2017 23:37:46.640|INFO ||||org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:388)|Successfully joined group test-UsageMetricsSpec-group-1 with generation 5

After "Successfully joined group test-UsageMetricsSpec-group-1 with generation 5" the consumer gets stuck forever.

wojda commented 7 years ago

In my case increasing the wakeup timeout didn't help. Consumers get stuck rarely, but they still get stuck. It is a big issue because the Kafka brokers think the partitions are assigned to the consumer. The only way to "unclog" the consumer is to trigger a group rebalance. Is the issue in the way reactive-kafka uses the wakeup mechanism, or in the vanilla Kafka consumer implementation?

anilkumar763 commented 7 years ago

I am also facing this issue. I have seen it three times in the past month. The rebalance happens frequently in my application, but at times I see 0 partitions assigned (I subscribed to 1 partition).

Does it take time for the consumer to learn that the partition has been revoked? Should we poll again if this arises?

jpeel commented 7 years ago

I am pretty confident that the issue is that the poll initiated by the KafkaConsumerActor fetches the metadata and joins the group, but the WakeupException interrupts this process after the group is joined and before the ConsumerRebalanceListener callbacks are invoked; those callbacks are what tell the SingleSourceLogic which topic partitions were assigned. The SingleSourceLogic then makes requests to the KafkaConsumerActor for the topic partitions it knows about, but those are either wrong or empty because the callback was never called. To be clear, the KafkaConsumer has been assigned partitions in this case, but because the SingleSourceLogic doesn't know about them, they are never consumed from.

I'm not fully sure how the code should be changed to deal with this problem, but the workaround we are implementing right now is to set the wakeup timeout a little longer (6 seconds or so), set max-wakeups to 1, and restart the stream whenever the max-wakeups limit is hit.
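A minimal sketch of that workaround in the Scala DSL (the settings values, topic name and the RestartSource wrapper are assumptions for illustration, not jpeel's actual code):

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// application.conf overrides (assumed 0.x keys, matching the defaults quoted below):
//   akka.kafka.consumer.wakeup-timeout = 6s
//   akka.kafka.consumer.max-wakeups = 1

val consumerSettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-group")

// Once max-wakeups is exceeded the consumer stage fails, and RestartSource
// materializes a fresh consumer (a fresh group member) after the backoff.
RestartSource
  .withBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
      .mapAsync(1)(msg => msg.committableOffset.commitScaladsl())
  }
  .runWith(Sink.ignore)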

nivox commented 7 years ago

We're facing the same issue.

We have a service which sometimes stops receiving data right after a WakeupException. The issue doesn't happen on every WakeupException, but when it happens it's always after a WakeupException.

The explanation from @jpeel fits pretty well with our observations.

We'll try to implement the proposed workaround, but a more structured way of solving the issue (without restarting the stream) would definitely be appreciated.

Has there been any activity on this issue?

nivox commented 7 years ago

Ping @patriknw @kciesielski. Given you're the last to have worked on the code, do you have any insights?

patriknw commented 7 years ago

Defaults:

wakeup-timeout = 3s
max-wakeups = 10

So you mean that increasing the wakeup-timeout to, let's say, 9s would be different from trying 3 times with 3s?

The reason a long timeout is bad is that it blocks a thread for that time, and that can easily result in starvation and unresponsiveness of other streams running on the same dispatcher. But sure, if that solves the problem please go ahead and create a PR.

Note that the reason for retrying at all, compared to just failing the stream immediately and relying only on stream restart, is that it can hopefully survive short glitches without losing the state.

nivox commented 7 years ago

@patriknw I think the point of @jpeel was that sometimes when the timeout occurs it somehow leaves the Kafka consumer in a state where no messages are received (even though messages are available) and the stream becomes stuck. The proposed workaround of raising the timeout and restarting the stream was meant to circumvent the problem, but it is indeed NOT a solution.

I think that the retry mechanism is the right way to go; as you said, it prevents losing the stream state. Given @jpeel's thesis on why the stream gets stuck, do you think it is plausible? Do you have any idea how to avoid the issue while maintaining the retry mechanism?

I'm willing to try and put together a PR, but I don't really know the internals so any guidance would be welcome.

patriknw commented 6 years ago

Ok, read his comment again. Is there a way we can reset things completely after a wakeup interrupt, so that it is treated as a new consumer in the next attempt? Or can we get the assigned partitions with a method call after the wakeup, since we might have missed the callback? Other ideas?

nivox commented 6 years ago

@patriknw @jpeel I've tried to put together an implementation of the first strategy you proposed: reset things completely after a wakeup. commit

The basic idea is to just unsubscribe and immediately re-subscribe to all the topics. This way the assignment callbacks will be called and the state of the Single/SubSourceLogic should be consistent. Note that I had to introduce another callback just to clean the state of the Single/SubSourceLogic to keep things consistent.
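In KafkaConsumer terms the re-subscribe idea boils down to something like the following rough illustration (this is not the linked commit; the helper and how it would be wired into the actor are assumptions):

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}

// After a wakeup, drop the current subscription and subscribe again so the
// rebalance listener callbacks fire and rebuild the source-side state.
def resubscribe[K, V](
    consumer: KafkaConsumer[K, V],
    topics: java.util.Set[String],
    listener: ConsumerRebalanceListener): Unit = {
  consumer.unsubscribe()
  consumer.subscribe(topics, listener)
}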

While I was working on this, it occurred to me that another strategy we could use is to reconcile the KafkaConsumer's actual assignments with the Source's view after the wakeup, replaying the rebalance callbacks for any revoked/added partitions (described in my next comment).

This strategy would still require some bookkeeping on the KafkaConsumerActor side but would free the Single/SubSourceLogic from any change. It requires that the internal state of the KafkaConsumer is not corrupted after the wakeup, which I think is a reasonable assumption to make.

I'll try to work on this approach in the next few days; in the meantime, could someone take a look at the current proposal and give me some feedback?

nivox commented 6 years ago

@patriknw @jpeel I've implemented the alternative strategy of reconciling the TopicPartition assignments on WakeupException. commit

As I stated in the previous comment, this strategy tries to reconcile the new KafkaConsumer assignments with the Source's by manually calling the RebalanceListener callbacks for all revoked/added TopicPartition assignments.

Can someone take a look at both approaches and give some feedback? For both solutions I've verified that everything works in the normal case, but I'm not sure how to trigger the issue manually (stopping the Kafka broker will not work, as the consumer detects that the broker is gone and won't follow the code path leading to the wakeup). Any ideas on how to test this in a controlled environment?
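The core of the reconciliation idea, as a rough sketch (again, not the linked commit; the function and the way the listener is obtained are assumptions used only to illustrate the approach):

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// After a WakeupException the rebalance callbacks may have been skipped, so
// compare what the consumer actually owns with what the source logic believes
// it owns, and replay the callbacks for the difference.
def reconcileAssignments[K, V](
    consumer: KafkaConsumer[K, V],
    knownToSource: Set[TopicPartition],
    listener: ConsumerRebalanceListener): Unit = {
  val actual  = consumer.assignment().asScala.toSet
  val revoked = knownToSource -- actual
  val added   = actual -- knownToSource
  if (revoked.nonEmpty) listener.onPartitionsRevoked(revoked.asJava)
  if (added.nonEmpty) listener.onPartitionsAssigned(added.asJava)
}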

schrepfler commented 6 years ago

If we compare a normal scenario, where the partitions get assigned:

INFO  [2017-10-22 09:16:48,855] [ConsumerCoordinator:256] [] [] - Setting newly assigned partitions [coffee_manager_events_1.0-2, coffee_manager_events_1.0-1, coffee_manager_events_1.0-0, coffee_manager_events_1.0-4, coffee_manager_events_1.0-3] for group coffee-manager
INFO  [2017-10-22 09:16:48,855] [AbstractCoordinator:388] [] [] - Successfully joined group coffee-manager with generation 9
INFO  [2017-10-22 09:16:48,852] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:16:47,504] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:16:47,504] [ConsumerCoordinator:397] [] [] - Revoking previously assigned partitions [] for group coffee-manager

with a bad scenario, where the issue happens:

INFO  [2017-10-22 09:17:53,401] [AbstractCoordinator:388] [] [] - Successfully joined group coffee-manager with generation 11
WARN  [2017-10-22 09:17:53,401] [KafkaConsumerActor:78] [coffeemanager-akka.kafka.default-dispatcher-62041] [akka.tcp://coffeemanager@contoso.com2551/system/kafka-consumer-9] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
INFO  [2017-10-22 09:17:53,377] [AbstractCoordinator:388] [] [] - Successfully joined group coffee-manager with generation 11
INFO  [2017-10-22 09:17:52,959] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:17:50,433] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:17:50,433] [ConsumerCoordinator:397] [] [] - Revoking previously assigned partitions [] for group coffee-manager

It certainly points to some flaky consumers, but I'm wondering if reactive-kafka can be more resilient to this. One way to self-check would be to use the driver's consumer-coordinator-metrics and read the assigned-partitions count, which should go up within a certain period after subscribing; if it doesn't, perhaps let it escalate to the supervisor?
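A rough sketch of that self-check, assuming direct access to the underlying KafkaConsumer (which the connector does not normally expose) and the standard assigned-partitions metric from the consumer-coordinator-metrics group:

import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// Returns the value of the "assigned-partitions" metric, i.e. how many
// partitions the coordinator currently considers assigned to this consumer.
def assignedPartitionCount[K, V](consumer: KafkaConsumer[K, V]): Double =
  consumer.metrics().asScala.collectFirst {
    case (name, metric)
        if name.group == "consumer-coordinator-metrics" &&
           name.name == "assigned-partitions" =>
      metric.value() // Metric.value() in the 0.10/0.11-era client API
  }.getOrElse(0.0)

// A watchdog could poll this periodically after subscribing and escalate
// (e.g. fail the stream or notify a supervisor) if it stays at zero for too long.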

nivox commented 6 years ago

@schrepfler wouldn't the strategy I proposed in the last message fix the problem?

schrepfler commented 6 years ago

Without knowing too many Kafka internals, I think WakeupExceptions are not such rare events in reality; is it acceptable to reconcile every time? I've noticed some callbacks being created, and you mention this Single/SubSourceLogic; is that the best/only technique we have at the driver level? Is there no risk of getting into some bad state, or a race condition, or a memory leak?

patriknw commented 6 years ago

@nivox The second alternative looks attractive since it contains the problem inside the KafkaConsumerActor. I don't know if there are any drawbacks to always doing that on a WakeupException.

chbatey commented 6 years ago

@nivox I like the second solution. I rebased it and added some more logging around wakeup exceptions. Can you review https://github.com/akka/reactive-kafka/pull/395 ?

nivox commented 6 years ago

@chbatey: Thanks for taking the issue forward; I was a little unsure how to proceed due to the lack of feedback.

I've taken a look at the pull request and it seems good to me.

giejay commented 6 years ago

Any update on this? We are facing the same issue; increasing the wakeup timeout to 6s and setting the max to 1 seems to resolve it, but it doesn't feel really solid to me.

Am I correct to assume that PR https://github.com/akka/reactive-kafka/pull/395 did not fully resolve the issue?

ennru commented 6 years ago

We'd like to understand if people still experience this with 0.20 and 0.21.

Hallborg commented 6 years ago

I had this issue on 0.16 and was fortunate to catch 0.22 on its release day. I haven't seen the issue since.

ennru commented 6 years ago

Thank you for this update. Anyone else?

sudpaw commented 6 years ago

I'm using 0.22 and I'm having the same issue.

I'll try changing the settings.

breckcs commented 5 years ago

@ennru, we just encountered this issue using v0.22. We stopped consuming a subset of topic partitions after the error "KafkaConsumer poll has exceeded wake up timeout (3 seconds). Waking up consumer to avoid thread starvation." We were using the default configurations.

We have encountered the wake-up timeout error 60 times in the past 30 days, but only once did it stop consuming.

Anecdotally, we used to encounter this issue more often on older versions of alpakka-kafka. We would also usually lose all topic partitions, not just a subset. As a workaround, we added an idleTimeout flow element to terminate and restart the stream if it stopped consuming from all topic partitions, but of course that didn't work when we only lost a subset of them.
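A rough sketch of that workaround (idleTimeout is a standard Akka Streams operator; the settings, implicits, topic name and restart wrapper are the same assumptions as in the restart sketch earlier in this thread):

import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Sink}
import scala.concurrent.duration._

// consumerSettings, the ActorSystem and the materializer are assumed to be
// defined as in the earlier sketch. If no element passes for 5 minutes,
// idleTimeout fails the stream with a TimeoutException and RestartSource
// materializes a fresh consumer.
RestartSource
  .withBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
      .idleTimeout(5.minutes)
      .mapAsync(1)(msg => msg.committableOffset.commitScaladsl())
  }
  .runWith(Sink.ignore)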

We are looking at #614 in v1.0 to ultimately resolve this issue, but we are interested in whether any workarounds have been effective.

ennru commented 5 years ago

Thank you for reporting that here. So some state in the actor or the Kafka client seems to be lost upon wake-up in certain cases. I'm not aware of a workaround for this. We've improved the error handling around polling in 1.0-M1 a bit more, but you're right, #614 would make the wakeup dance go away completely.

politrons commented 5 years ago

@ennru any idea when version 1.0 will be production-ready? I see that for now it is just an RC version.

ennru commented 5 years ago

It is considered production ready. We do not expect any significant changes and might call it 1.0 before the end of March 2019.

ValueYouth commented 5 years ago

I have the same problem. After investigation, I found that there were two consumers. I hope my findings can help you. Command used:

bin/kafka-consumer-groups.sh --bootstrap-server [your kafka server] --group [your group name] --describe

ennru commented 5 years ago

Thank you @ValueYouth, we haven't found any evidence that this is caused by Alpakka Kafka, so your hint might help others to identify the problem.

baltiyskiy commented 4 years ago

FWIW: https://issues.apache.org/jira/browse/KAFKA-9917. I've seen this as well several times after a broker restart. In my app the consumer source was inside a RestartSource, so in fact both restarted.

yogendramaarisetty commented 1 week ago

I am using Confluent Kafka in .NET Core. I need to check that the assignments are greater than zero while consuming, but I am ending up in a continuous loop without any assignments.

OS: Windows 64-bit, Kafka: 2.8.1, .NET: 6.0, library: Confluent.Kafka. I have an event-log-master topic created with 5 partitions on my Kafka broker.

Here's the sample .NET code that I'm using to reproduce this:

ConsumerConfig conf = new ConsumerConfig();
conf.BootstrapServers = "localhost:9092";
conf.EnableAutoCommit = false;
conf.SessionTimeoutMs = 30000;
conf.TopicMetadataRefreshIntervalMs = 10000;
conf.AutoOffsetReset = AutoOffsetReset.Latest;
conf.MaxPollIntervalMs = 300 * 60 * 1000;
conf.MaxPartitionFetchBytes = 10000;
conf.GroupId = "test-group-poc-1";
conf.AllowAutoCreateTopics = true;
conf.PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin;

var pattern = "event-log-master";
long count = 0;

CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};
using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
{
    consumer.Subscribe(pattern);
    try
    {
        while (!cts.Token.IsCancellationRequested)
        {
            if (consumer.Assignment.Count > 0)
            {
                var record = consumer.Consume(cts.Token);
                Console.WriteLine($"Consumed {record?.Message}");
                if (record != null)
                {
                    consumer.Commit(record);
                }
            }
            else
            {
                ConsumeResult<string, string> msg = null;
                // var msg = consumer.Consume(0);
                consumer.Subscribe(pattern);
                Thread.Sleep(1000);
                Console.WriteLine($"ZeroAssignments: {consumer.Name} | msg: {msg?.Message}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Ctrl-C was pressed.
    }
    finally
    {
        consumer.Close();
    }
}

I found a workaround which somehow gives the assignments: I call consumer.Consume(0) with a 0 timeout when there are no assignments, and after the loop runs 2 to 3 times we get the assignments. This behavior seems weird.

What's the right fix? I was trying to re-subscribe if I don't find any assignments; I know it doesn't make sense, but I was just trying.