akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka-kafka/current/

Kafka consumer hangs in consumer group issue ? #899

Open Maatary opened 5 years ago

Maatary commented 5 years ago

Hi, I recently upgraded to akka-stream-kafka version 1.0.5. My broker is version 2.3.0, and the client is 2.1.1.

I am observing some very strange behavior:

When I run multiple consumers as part of the same group, let's say 4, only 2 of them get messages from the brokers. If I run 6 of them, only 3 receive messages from the brokers.

The others keep hanging with the following log:

15:01:45.255 [default-akka.kafka.default-dispatcher-6] INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [dev-cp-kafka-headless:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 10000
    fetch.min.bytes = 1
    group.id = elsevier-triple-store-builder-26
    heartbeat.interval.ms = 60000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 5000000
    max.poll.records = 10000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 300000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

15:01:45.397 [default-akka.kafka.default-dispatcher-6] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 2.1.1
15:01:45.397 [default-akka.kafka.default-dispatcher-6] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527
15:01:46.133 [default-akka.kafka.default-dispatcher-22] INFO  org.apache.kafka.clients.Metadata - Cluster ID: cLqnPKhKRPuZtkhAFpcaow
15:01:46.201 [default-akka.kafka.default-dispatcher-8] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Discovered group coordinator dev-cp-kafka-2.dev-cp-kafka-headless.dev:9092 (id: 2147483645 rack: null)
15:01:46.872 [default-akka.kafka.default-dispatcher-10] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Revoking previously assigned partitions []
15:01:46.874 [default-akka.kafka.default-dispatcher-10] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] (Re-)joining group
15:06:35.500 [default-akka.kafka.default-dispatcher-14] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Successfully joined group with generation 339
15:06:35.641 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Setting newly assigned partitions [elsevier-proxy-reaxys-document-2, elsevier-proxy-reaxys-protein-4, elsevier-proxy-reaxys-bioactivity-0, elsevier-proxy-reaxys-parameter-0, elsevier-proxy-reaxys-higestclinicalphase-4, elsevier-proxy-reaxys-chemicalentityname-2, elsevier-proxy-reaxys-proteinassembly-3, elsevier-proxy-reaxys-administrationroute-1, elsevier-proxy-reaxys-compoundtype-1, elsevier-proxy-reaxys-bioassay-1, elsevier-proxy-reaxys-putativeactiontarget-4, elsevier-proxy-reaxys-tissuetype-2, elsevier-proxy-reaxys-reactiontype-1, elsevier-proxy-reaxys-agentconfiguration-2, elsevier-proxy-reaxys-biologicalmaterial-2, elsevier-proxy-reaxys-document-1, elsevier-proxy-reaxys-parameter-1, elsevier-proxy-reaxys-proteinsubunit-0, elsevier-proxy-reaxys-bioassay-0, elsevier-proxy-reaxys-protein-3, elsevier-proxy-reaxys-chemicalentityname-3, elsevier-proxy-reaxys-bioactivity-1, elsevier-proxy-reaxys-administrationroute-0, elsevier-proxy-reaxys-proteinassembly-2, elsevier-proxy-reaxys-compoundtype-2, elsevier-proxy-reaxys-measuresettings-0, elsevier-proxy-reaxys-putativeactiontarget-3, elsevier-proxy-reaxys-tissuetype-1, elsevier-proxy-reaxys-higestclinicalphase-3, elsevier-proxy-reaxys-agentconfiguration-1, elsevier-proxy-reaxys-reactiontype-2, elsevier-proxy-reaxys-biologicalmaterial-1, elsevier-proxy-reaxys-chemicalentityname-4, elsevier-proxy-reaxys-protein-2, elsevier-proxy-reaxys-parameter-2, elsevier-proxy-reaxys-celltype-0, elsevier-proxy-reaxys-bioactivity-2, elsevier-proxy-fda-spl-document-0, elsevier-proxy-reaxys-metabolicreaction-1, elsevier-proxy-reaxys-administrationroute-3, elsevier-proxy-reaxys-proteinsubunit-1, elsevier-proxy-reaxys-organism-3, elsevier-proxy-reaxys-tissuetype-4, elsevier-proxy-reaxys-formulainformation-0, elsevier-proxy-reaxys-chemicalentity-1, 
elsevier-proxy-reaxys-bioactivity-3, elsevier-proxy-reaxys-agentconfiguration-4, elsevier-proxy-reaxys-biologicalmaterial-4, elsevier-proxy-reaxys-proteinnature-0, elsevier-proxy-reaxys-agentrole-0, elsevier-proxy-reaxys-higestclinicalphase-2, elsevier-proxy-reaxys-document-4, elsevier-proxy-reaxys-parameter-3, elsevier-proxy-reaxys-protein-1, elsevier-proxy-reaxys-metabolicreaction-0, elsevier-proxy-reaxys-proteinassembly-4, elsevier-proxy-reaxys-administrationroute-2, elsevier-proxy-reaxys-compoundtype-0, elsevier-proxy-reaxys-proteinsubunit-2, elsevier-proxy-reaxys-organism-4, elsevier-proxy-reaxys-tissuetype-3, elsevier-proxy-reaxys-formulainformation-1, elsevier-proxy-reaxys-bioactivity-4, elsevier-proxy-reaxys-higestclinicalphase-1, elsevier-proxy-reaxys-agentconfiguration-3, elsevier-proxy-reaxys-proteinnature-1, elsevier-proxy-reaxys-reactiontype-0, elsevier-proxy-reaxys-biologicalmaterial-3, elsevier-proxy-reaxys-document-3, elsevier-proxy-reaxys-chemicalentity-0, elsevier-proxy-reaxys-protein-0, elsevier-proxy-reaxys-parameter-4, elsevier-proxy-reaxys-celltype-2, elsevier-proxy-reaxys-organism-1, elsevier-proxy-reaxys-metabolicreaction-3, elsevier-proxy-reaxys-measuresettings-3, elsevier-proxy-reaxys-proteinsubunit-3, elsevier-proxy-reaxys-formulainformation-2, elsevier-proxy-reaxys-putativeactiontarget-0, elsevier-proxy-reaxys-agentrole-2, elsevier-proxy-reaxys-higestclinicalphase-0, elsevier-proxy-reaxys-chemicalentity-3, elsevier-proxy-reaxys-proteinnature-2, elsevier-proxy-reaxys-celltype-1, elsevier-proxy-reaxys-metabolicreaction-2, elsevier-proxy-reaxys-measuresettings-4, elsevier-proxy-reaxys-administrationroute-4, elsevier-proxy-reaxys-proteinsubunit-4, elsevier-proxy-reaxys-organism-2, elsevier-proxy-reaxys-bioassay-4, elsevier-proxy-reaxys-formulainformation-3, elsevier-proxy-reaxys-chemicalentity-2, elsevier-proxy-reaxys-agentrole-1, elsevier-proxy-reaxys-proteinnature-3, elsevier-proxy-reaxys-document-0, 
elsevier-proxy-reaxys-chemicalentityname-0, elsevier-proxy-reaxys-measuresettings-1, elsevier-proxy-reaxys-proteinassembly-1, elsevier-proxy-reaxys-compoundtype-3, elsevier-proxy-reaxys-bioassay-3, elsevier-proxy-reaxys-putativeactiontarget-2, elsevier-proxy-reaxys-formulainformation-4, elsevier-proxy-reaxys-tissuetype-0, elsevier-proxy-reaxys-reactiontype-3, elsevier-proxy-reaxys-agentconfiguration-0, elsevier-proxy-reaxys-biologicalmaterial-0, elsevier-proxy-reaxys-proteinnature-4, elsevier-proxy-reaxys-agentrole-4, elsevier-proxy-reaxys-celltype-4, elsevier-proxy-reaxys-celltype-3, elsevier-proxy-reaxys-organism-0, elsevier-proxy-reaxys-chemicalentityname-1, elsevier-proxy-reaxys-metabolicreaction-4, elsevier-proxy-reaxys-measuresettings-2, elsevier-proxy-reaxys-proteinassembly-0, elsevier-proxy-reaxys-bioassay-2, elsevier-proxy-reaxys-compoundtype-4, elsevier-proxy-reaxys-putativeactiontarget-1, elsevier-proxy-reaxys-reactiontype-4, elsevier-proxy-reaxys-chemicalentity-4, elsevier-proxy-reaxys-agentrole-3]
15:06:35.653 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-putativeactiontarget-3 to offset 0.
15:06:35.653 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-compoundtype-0 to offset 0.
15:06:35.653 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-higestclinicalphase-2 to offset 0.
15:06:35.653 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-compoundtype-3 to offset 0.
15:06:35.653 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-proteinnature-2 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-higestclinicalphase-4 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-proteinnature-1 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-fda-spl-document-0 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-compoundtype-2 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-proteinnature-4 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-putativeactiontarget-4 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-proteinnature-0 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-higestclinicalphase-3 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-compoundtype-1 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-proteinnature-3 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-compoundtype-4 to offset 0.
15:06:35.657 [default-akka.kafka.default-dispatcher-19] INFO  o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Resetting offset for partition elsevier-proxy-reaxys-agentrole-3 to offset 0.
15:11:26.880 [default-akka.kafka.default-dispatcher-17] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 0 was unable to process the fetch request with (sessionId=2019035881, epoch=29): FETCH_SESSION_ID_NOT_FOUND.
15:11:36.913 [default-akka.kafka.default-dispatcher-8] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 2 was unable to process the fetch request with (sessionId=295384863, epoch=30): FETCH_SESSION_ID_NOT_FOUND.
15:11:36.913 [default-akka.kafka.default-dispatcher-8] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 1 was unable to process the fetch request with (sessionId=1568797267, epoch=30): FETCH_SESSION_ID_NOT_FOUND.
15:16:08.090 [default-akka.kafka.default-dispatcher-17] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 0 was unable to process the fetch request with (sessionId=424717464, epoch=28): FETCH_SESSION_ID_NOT_FOUND.
15:16:38.122 [default-akka.kafka.default-dispatcher-9] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 2 was unable to process the fetch request with (sessionId=1878455433, epoch=30): FETCH_SESSION_ID_NOT_FOUND.
15:16:38.123 [default-akka.kafka.default-dispatcher-9] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 1 was unable to process the fetch request with (sessionId=1382963023, epoch=30): FETCH_SESSION_ID_NOT_FOUND.
15:20:38.890 [default-akka.kafka.default-dispatcher-13] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 0 was unable to process the fetch request with (sessionId=861824997, epoch=27): FETCH_SESSION_ID_NOT_FOUND.
15:21:39.033 [default-akka.kafka.default-dispatcher-11] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 2 was unable to process the fetch request with (sessionId=2058171454, epoch=30): FETCH_SESSION_ID_NOT_FOUND.
15:21:39.034 [default-akka.kafka.default-dispatcher-11] INFO  o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=elsevier-triple-store-builder-26] Node 1 was unable to process the fetch request with (sessionId=750170814, epoch=30): FETCH_SESSION_ID_NOT_FOUND.

I wonder if that rings a bell somehow

Note that my topics have 30 partitions each. I am subscribing to multiple topics at once.
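
As a sanity check on those numbers: the config dump above shows `partition.assignment.strategy = RangeAssignor`. Below is a minimal sketch of range-style splitting for a single topic, assuming the usual quotient/remainder rule (each consumer gets `partitions / consumers` partitions, and the first `partitions % consumers` consumers get one extra). `RangeAssignmentSketch` and `rangeFor` are hypothetical names for illustration, not part of the Kafka client API.

```scala
// Illustrative only: mimics range-style assignment for ONE topic.
// `rangeFor` is a hypothetical helper, not a Kafka API.
object RangeAssignmentSketch {

  /** Partitions of one topic for the consumer at `index`
    * (0-based position in the sorted member list). */
  def rangeFor(numPartitions: Int, numConsumers: Int, index: Int): Range = {
    val perConsumer = numPartitions / numConsumers // base share
    val extra       = numPartitions % numConsumers // first `extra` consumers get one more
    val start       = perConsumer * index + math.min(index, extra)
    val length      = perConsumer + (if (index < extra) 1 else 0)
    start until (start + length)
  }

  def main(args: Array[String]): Unit =
    for (consumers <- Seq(4, 6)) {
      println(s"30 partitions, $consumers consumers:")
      for (i <- 0 until consumers) {
        val r = rangeFor(30, consumers, i)
        println(s"  consumer $i -> ${r.size} partitions (${r.head}..${r.last})")
      }
    }
}
```

Under that assumption, every member of a 4- or 6-consumer group would be handed a non-empty range of a 30-partition topic, so an uneven assignment alone would not seem to explain members that never receive messages.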

EDIT1

It is not always strictly half. In general, I have one or two consumers in the group working while the others hang.

ennru commented 5 years ago

This looks like a problem in the Kafka client. You could try overriding the Kafka client Alpakka uses to Kafka 2.3. Please report back if that does help.
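
For anyone trying this with sbt, a minimal sketch of such an override might look like the fragment below. It assumes an sbt build that already depends on `akka-stream-kafka` 1.0.5 (the version mentioned in the report); whether that Alpakka release works correctly against a newer `kafka-clients` is not something this snippet guarantees.

```scala
// build.sbt (fragment)
// Assumption: akka-stream-kafka is already a dependency and pulls in
// kafka-clients transitively.
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "1.0.5"

// Force the transitive kafka-clients dependency to the 2.3.0 release
// so the client matches the broker version reported above.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.3.0"
```

After reloading the build, the `AppInfoParser` line logged at consumer startup should report the overridden Kafka version, which is a quick way to confirm the override took effect.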

kvmuralidhar commented 5 years ago

Hi ennru,

I upgraded the Kafka client to 2.3.0 and the info message is showing up in the logs on a continuous basis.

[2019-11-25T15:12:32,778][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version: 2.3.0

[Consumer clientId=logstash-1, groupId=ls_proxy] Node 339862361 was unable to process the fetch request with (sessionId=555954710, epoch= 1502): FETCH_SESSION_ID_NOT_FOUND.

Thanks, Murali

ramyamagham commented 4 years ago

@ennru You seem to have closed this ticket, but I have the same issue with akka-stream 2.6.1, akka-stream-kafka 2.0.2, and Kafka clients 2.4.0 in my project. I see info logs such as the ones below, and consumption on some partitions stops around the time these lines start getting logged.

Node 549730437 was unable to process the fetch request with (sessionId=605783777, epoch=13889): FETCH_SESSION_ID_NOT_FOUND.]
Node 295448087 was unable to process the fetch request with (sessionId=1293138871, epoch=13807): FETCH_SESSION_ID_NOT_FOUND.]

ennru commented 4 years ago

This issue is still open. We have not received enough information about what might be causing this problem.

What Kafka version is your broker on?

sxganapa commented 4 years ago

I am also getting FETCH_SESSION_ID_NOT_FOUND (org.apache.kafka.clients.FetchSessionHandler) when using Kafka Connect. Can someone explain what this means? I am on version 2.3 of Kafka.

naresh-kotha-ck commented 4 years ago

Did we find anything here? I am also getting the same error.

naresh-kotha-ck commented 4 years ago

Our Kafka broker is version 2.5.1. We are using Alpakka version 1.04 with the Kafka client overridden to 2.5.1, and we are still seeing this issue. Can someone help me with this?