thingsboard / thingsboard

Open-source IoT Platform - Device management, data collection, processing and visualization.
https://thingsboard.io
Apache License 2.0

Kafka consumer stops fetching messages from broker [Bug] #3639

Closed: zhangxianwei2015 closed this 2 years ago

zhangxianwei2015 commented 3 years ago

Describe the bug: The consumer groups "tb-node-rule-engine" and "tb-node-transport-api" sometimes show lag, and messages keep accumulating without being consumed. It turns out that kafka-clients 2.3.0 has an issue (https://github.com/apache/kafka/pull/7511): in some cases, the consumer stops fetching messages from the broker. It is fixed in 2.3.1.
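To check whether a deployment ships the client version named above, a small version comparison can help. The following is a minimal sketch, not part of ThingsBoard or Kafka: the class and method names are hypothetical, and it flags only 2.3.0, because that is the only version this report identifies as affected. The version string is assumed to come from elsewhere, for example from the build's dependency report.

```java
import java.util.Arrays;

/** Hypothetical helper (not ThingsBoard or Kafka code): flags the kafka-clients version named in this issue. */
final class KafkaClientVersionCheck {

    /** Parses "2.3.0" into {2, 3, 0}; missing or non-numeric components count as 0. */
    static int[] parse(String version) {
        String[] parts = version.trim().split("\\.");
        int[] out = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            // Keep only the leading digits, so "0-SNAPSHOT" parses as 0.
            String digits = parts[i].replaceAll("[^0-9].*$", "");
            out[i] = digits.isEmpty() ? 0 : Integer.parseInt(digits);
        }
        return out;
    }

    /** True only for 2.3.0, the single version this report names as affected (assumption: no other versions are checked). */
    static boolean isReportedAffected(String version) {
        return Arrays.equals(parse(version), new int[] {2, 3, 0});
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.3.0", "2.3.1", "2.8.1"}) {
            System.out.println(v + " reported affected: " + isReportedAffected(v));
        }
    }
}
```

If the check returns true, upgrading the kafka-clients dependency to 2.3.1 or later is the fix this report points to.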

vin01 commented 3 years ago

I am also observing the same behavior. In my case there are 2 ThingsBoard nodes, and at times some consumers just disappear for the tb_rule_engine.main.* topics while lag keeps building up. Restarting either node helps temporarily, but the problem keeps coming back.

Version: latest 3.2

ashvayka commented 3 years ago

Hi @YevhenBondarenko, please fix this in 3.2.1.

vin01 commented 3 years ago

It looks like this is fixed in 3.2.1. I have yet to fully test it with 2 TB nodes running in parallel, though; I will update here once I do.

It looks like part of the fix is also the removal of the service ID from the consumer group names in https://github.com/thingsboard/thingsboard/commit/4de258f2aed48e4e6df6191a0b2bfa6c71810d2a ?

Because of these unique group IDs, until now it wasn't really possible for the second TB node to keep reading in the same consumer group, even when the first one went down.
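The effect of the group ID choice can be illustrated with plain consumer properties. This is a minimal sketch, not the actual ThingsBoard code: the class name, the `perNodeGroup` flag, the service IDs, and the `prefix + "-" + serviceId` naming scheme are all assumptions made for illustration. Only `bootstrap.servers` and `group.id` are real Kafka consumer configuration keys. Kafka tracks committed offsets and balances partitions per `group.id`, so two nodes can take over each other's partitions only if they share one.

```java
import java.util.Properties;

/** Hypothetical sketch (not ThingsBoard code): per-node vs shared Kafka consumer group IDs. */
final class ConsumerGroupIdSketch {

    /**
     * Builds minimal consumer properties.
     * perNodeGroup = true appends the service ID, giving every node its own group (assumed old behavior).
     * perNodeGroup = false uses the bare prefix, so all nodes join one group (assumed behavior after the fix).
     */
    static Properties consumerProps(String bootstrapServers, String groupPrefix,
                                    String serviceId, boolean perNodeGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        String groupId = perNodeGroup ? groupPrefix + "-" + serviceId : groupPrefix;
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // "tb-node-1" and "tb-node-2" are made-up service IDs for the two nodes.
        for (boolean perNode : new boolean[] {true, false}) {
            String a = consumerProps("localhost:9092", "tb-node-rule-engine", "tb-node-1", perNode)
                    .getProperty("group.id");
            String b = consumerProps("localhost:9092", "tb-node-rule-engine", "tb-node-2", perNode)
                    .getProperty("group.id");
            System.out.println("perNodeGroup=" + perNode + ": " + a + " / " + b
                    + " -> same group: " + a.equals(b));
        }
    }
}
```

With per-node groups, each node commits offsets under its own group, so a surviving node does not pick up where the failed one stopped. With a shared group, the broker reassigns the failed node's partitions to the remaining member.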

Raven888888 commented 2 years ago

I am still seeing this issue in TB CE 3.3.3 with wurstmeister/kafka 2.13-2.8.1. Once the Kafka client hits DisconnectException: null, TB goes down completely, with the following logs:

2022-02-21 19:12:02,336 [cassandra-callback-6-thread-1034] ERROR c.g.c.u.concurrent.AggregateFuture - Got more than one input Future failure. Logging failur>
com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query
        at com.datastax.oss.driver.api.core.AllNodesFailedException.fromErrors(AllNodesFailedException.java:53)
        at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.sendRequest(CqlRequestHandler.java:261)
        at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.onThrottleReady(CqlRequestHandler.java:194)
        at com.datastax.oss.driver.internal.core.session.throttling.PassThroughRequestThrottler.register(PassThroughRequestThrottler.java:52)
        at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.<init>(CqlRequestHandler.java:171)
        at com.datastax.oss.driver.internal.core.cql.CqlRequestAsyncProcessor.process(CqlRequestAsyncProcessor.java:44)
        at com.datastax.oss.driver.internal.core.cql.CqlRequestAsyncProcessor.process(CqlRequestAsyncProcessor.java:29)
        at org.thingsboard.server.dao.cassandra.guava.GuavaRequestAsyncProcessor.process(GuavaRequestAsyncProcessor.java:63)
        at org.thingsboard.server.dao.cassandra.guava.GuavaRequestAsyncProcessor.process(GuavaRequestAsyncProcessor.java:35)
        at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
        at com.datastax.oss.driver.internal.core.session.SessionWrapper.execute(SessionWrapper.java:115)
        at org.thingsboard.server.dao.cassandra.guava.GuavaSession.executeAsync(GuavaSession.java:52)
        at org.thingsboard.server.dao.nosql.CassandraStatementTask.executeAsync(CassandraStatementTask.java:40)
        at org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor.execute(CassandraBufferedRateWriteExecutor.java:89)
        at org.thingsboard.server.dao.util.AbstractBufferedRateExecutor.dispatch(AbstractBufferedRateExecutor.java:185)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)