spring-attic / spring-integration-kafka

Apache License 2.0

TimeoutException on upgrading spring-kafka libraries to 2.1.0 or above #269

Closed ahubli closed 5 years ago

ahubli commented 5 years ago

Hello, I am working on an enterprise Java application that polls records from a Kafka queue. We upgraded the spring-kafka library from 1.1.2 to 2.1.0 or above; we have tried versions 2.1.0, 2.1.5, 2.1.8, and 2.2.5. After upgrading, we see a TimeoutException:

console: 2019-05-10 15:37:45,804 [messageConsumer-0-C-1] INFO o.a.k.c.Metadata fsreqid=N/A AppId=Core - Cluster ID: gVi0qsqnQeSUR9R26fgWhA
console: 2019-05-10 15:37:45,810 [messageConsumer-0-C-1] INFO .a.k.c.c.i.AbstractCoordinator fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Discovered group coordinator vc2crtp2365101np.fmr.com:9094 (id: 2147483646 rack: null)
console: 2019-05-10 15:37:45,814 [messageConsumer-0-C-1] INFO .a.k.c.c.i.ConsumerCoordinator fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Revoking previously assigned partitions []
console: 2019-05-10 15:37:45,814 [messageConsumer-0-C-1] INFO .KafkaMessageListenerContainer fsreqid=N/A AppId=Core - partitions revoked: []
console: 2019-05-10 15:37:45,815 [messageConsumer-0-C-1] INFO .a.k.c.c.i.AbstractCoordinator fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] (Re-)joining group
console: 2019-05-10 15:37:46,955 [messageConsumer-0-C-1] INFO .a.k.c.c.i.AbstractCoordinator fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Successfully joined group with generation 48
console: 2019-05-10 15:37:46,963 [messageConsumer-0-C-1] INFO .a.k.c.c.i.ConsumerCoordinator fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Setting newly assigned partitions [kafkatopictest-0, kafkatopictest-5, kafkatopictest-1, kafkatopictest-2, kafkatopictest-3, kafkatopictest-4]
console: 2019-05-10 15:37:47,030 [messageConsumer-0-C-1] INFO .KafkaMessageListenerContainer fsreqid=N/A AppId=Core - partitions assigned: [kafkatopictest-0, kafkatopictest-5, kafkatopictest-1, kafkatopictest-2, kafkatopictest-3, kafkatopictest-4]
console: 2019-05-10 15:37:48,984 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Node 3 sent an invalid full fetch response with extra=(kafkatopictest-5, kafkatopictest-2, response=(
console: 2019-05-10 15:37:49,003 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Node 5 sent an invalid full fetch response with extra=(kafkatopictest-1, kafkatopictest-4, response=(
console: 2019-05-10 15:37:49,035 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Node 4 sent an invalid full fetch response with extra=(kafkatopictest-0, kafkatopictest-3, response=(
console: 2019-05-10 15:38:19,084 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 5: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:38:19,084 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:38:19,084 [kafka-coordinator-heartbeat-thread | KafkaConsumerGroupId] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 4: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:38:49,086 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 5: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:38:49,086 [kafka-coordinator-heartbeat-thread | KafkaConsumerGroupId] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 4: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:38:49,086 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:39:19,089 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 5: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:39:19,089 [kafka-coordinator-heartbeat-thread | KafkaConsumerGroupId] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 4: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:39:19,089 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:39:37,060 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Node 5 sent an invalid full fetch response with extra=(kafkatopictest-1, kafkatopictest-4, response=(
console: 2019-05-10 15:39:49,190 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 4: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:39:49,190 [kafka-coordinator-heartbeat-thread | KafkaConsumerGroupId] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:40:07,140 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 5: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:40:19,290 [messageConsumer-0-C-1] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 4: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..
console: 2019-05-10 15:40:19,290 [kafka-coordinator-heartbeat-thread | KafkaConsumerGroupId] INFO o.a.k.c.FetchSessionHandler fsreqid=N/A AppId=Core - [Consumer clientId=kafkaclientidtest-0, groupId=KafkaConsumerGroupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms..

If we revert the libraries to 1.1.2, the application runs fine locally, but when deployed to the server it fails to run from Docker with this error:

org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:327)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:188)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:207)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
level: ERROR
logger: org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer
message: Container exception

Is it because -n is appended to clientid and topic? Any suggestions or feedback would be much appreciated.

garyrussell commented 5 years ago

The client.id is just an identifier; it is described as

public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";

The -n is appended because, if you add concurrency to the container, each consumer's client.id has to be unique.
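As a rough illustration of that naming scheme (not Spring's actual implementation), the sketch below derives one client.id per consumer by appending an index to a base value. The class name, method name, and the base id "kafkaclientidtest" are assumptions made for this example; the base is inferred from the clientId=kafkaclientidtest-0 seen in the logs.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: mimics the "-n" suffix seen in the logs
// (clientId=kafkaclientidtest-0). This is NOT Spring's actual code.
class ClientIdSuffixSketch {

    // Returns one client.id per consumer: base + "-" + index.
    static List<String> clientIds(String baseClientId, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            ids.add(baseClientId + "-" + i);
        }
        return ids;
    }

    public static void main(String[] args) {
        // "kafkaclientidtest" is an assumed base id inferred from the logs.
        System.out.println(clientIds("kafkaclientidtest", 3));
    }
}
```

With a concurrency of 1, as the single messageConsumer-0-C-1 thread in the logs suggests, only the -0 id would be produced.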

What do you mean by "and topic" in "Is it because -n is appended to clientid and topic"?

groupId=KafkaConsumerGroupId] Setting newly assigned partitions [kafkatopictest-0, kafkatopictest-5, kafkatopictest-1, kafkatopictest-2, kafkatopictest-3, kafkatopictest-4]
console: 2019-05-10 15:37:47,030 [messageConsumer-0-C-1] INFO .KafkaMessageListenerContainer fsreqid=N/A AppId=Core - partitions assigned: [kafkatopictest-0, kafkatopictest-5, kafkatopictest-1, kafkatopictest-2, kafkatopictest-3, kafkatopictest-4]

The -n there is the partition number that was assigned.
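To make that concrete, here is a small sketch that splits the logged names back into topic and partition number. It assumes the names follow the topic-partition form shown in the log; the class and method names are made up for this example.

```java
// Illustration only: splits names such as "kafkatopictest-5", as printed in
// the "partitions assigned" log line, into topic and partition number.
class TopicPartitionNameSketch {

    // Split on the LAST hyphen, because a topic name may itself contain hyphens.
    static String topicOf(String name) {
        return name.substring(0, name.lastIndexOf('-'));
    }

    static int partitionOf(String name) {
        return Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
    }

    public static void main(String[] args) {
        String[] assigned = {"kafkatopictest-0", "kafkatopictest-5", "kafkatopictest-1"};
        for (String name : assigned) {
            System.out.println("topic=" + topicOf(name) + " partition=" + partitionOf(name));
        }
    }
}
```

So the six entries in the log are six partitions (0 through 5) of the single topic kafkatopictest, not six differently named topics.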

You need to provide much more information (configuration, broker versions, etc.).

I have seen these timeouts when the SSL configuration is mismatched between the client and broker(s). But it looks like something else, since topic assignment seems to be in progress.
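For anyone who wants to rule out the SSL case, the sketch below lists the client-side settings that would have to agree with the broker's listener. The property keys are standard Kafka client configuration names; the host, file paths, and passwords are placeholders, and nothing in this thread confirms that the broker on port 9094 actually uses SSL.

```java
import java.util.Properties;

// Illustration only: client-side SSL settings to compare with the broker.
// Host, paths, and passwords are placeholders, not values from this issue.
class SslConsumerPropsSketch {

    static Properties sslProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker.example.com:9094"); // placeholder host
        // Must match the protocol of the broker listener on that port;
        // if the listener is PLAINTEXT, this setting has to be PLAINTEXT too.
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder
        props.setProperty("ssl.truststore.password", "changeit");                       // placeholder
        // Keystore entries are only needed if the broker requires client authentication.
        props.setProperty("ssl.keystore.location", "/path/to/client.keystore.jks");     // placeholder
        props.setProperty("ssl.keystore.password", "changeit");                         // placeholder
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the broker side.
        sslProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be passed to the consumer factory alongside the usual deserializer and group settings.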

To be honest, this really should be directed at the Kafka folks, not Spring.

You would be better off posting on Stack Overflow tagged with [spring-kafka] and [apache-kafka] and you'll reach a wider audience. If you do so, again, provide much more information.

artembilan commented 5 years ago

Isn't that a problem of Kafka broker and client incompatibility?

garyrussell commented 5 years ago

Closing due to inactivity.