spring-cloud / spring-cloud-stream-samples

Samples for Spring Cloud Stream
Apache License 2.0
959 stars 677 forks source link

kafka-streams-word-count error when running KafkaStreamsWordCountApplicationTests #183

Closed gborobio73 closed 4 years ago

gborobio73 commented 4 years ago

Hello! Thank you for the examples, they are very useful. I have a problem when running KafkaStreamsWordCountApplicationTests from kafka-streams-word-count. First I get this error due using Java 14:

java.lang.IllegalArgumentException: Unable to canonicalize address 127.0.0.1/<unresolved>:59201 because it's not resolvable

Then I switch to newer Kafka as explained here but the test still fails. First I see this error in the logs:

2020-06-09 08:16:36,182 ERROR hello-word-count-sample-58e3b82d-9e56-4f85-9067-5eab473cd5f1-StreamThread-1 o.a.z.s.NIOServerCnxnFactory:92 - Thread  StreamsThread threadId: hello-word-count-sample-58e3b82d-9e56-4f85-9067-5eab473cd5f1-StreamThread-1
TaskManager
    MetadataState:
        GlobalMetadata: []
        GlobalStores: []
        My HostInfo: HostInfo{host='unknown', port=-1}
        null
    Active tasks:
        Running:
        Suspended:
        New:
        Restoring:
    Standby tasks:
        Running:
        Suspended:
        New:
 died
java.lang.NoSuchMethodError: 'java.lang.Object org.apache.kafka.common.utils.Utils.notNull(java.lang.Object)'
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:110)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1049)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:545)
    at org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.assign(PartitionAssignorAdapter.java:59)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

Then it gets stuck at

2020-06-09 08:16:37,703  INFO controller-event-thread k.c.KafkaController:66 - [Controller id=0] Processing automatic preferred replica leader election

for about one minute and starts the shutting down process.

Test fails with:

java.lang.AssertionError:

Expecting:
 <0>
to be greater than or equal to:
 <1>
    at kafka.streams.word.count.KafkaStreamsWordCountApplicationTests.testKafkaStreamsWordCountProcessor(KafkaStreamsWordCountApplicationTests.java:89)

I really appreciate your help!

Thanks a lot!

gborobio73 commented 4 years ago

Hello! The issue was due not having latest kafka streams dependency. I updated to 2.4.1 and works perfectly.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>2.4.1</version>
    <scope>test</scope>
</dependency>

Thank you!