AxonFramework / extension-kafka

Axon Framework extension for Kafka integration to publish and handle Event messages.
https://axoniq.io/
Apache License 2.0

Upgrade to a patch release introduced deprecated methods and breaks at runtime #305

zambrovski closed 2 years ago

zambrovski commented 2 years ago

Basic information

Steps to reproduce

Switched from 4.5.3 to 4.5.4

Expected behaviour

No errors in log, since it is a patch release.

Actual behaviour

2022-07-05 15:18:10.006  INFO 517797 --- [nanceService]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-07-05 15:18:10.006  INFO 517797 --- [nanceService]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-07-05 15:18:10.006  INFO 517797 --- [nanceService]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1657027090004
2022-07-05 15:18:10.153  INFO 517797 --- [MongoService]-0] org.apache.kafka.clients.Metadata        : [Consumer clientId=tasklist-localhost-polyflow, groupId=null] Cluster ID: vo0yCvCLQAWtvtIhuY_mWQ
2022-07-05 15:18:10.153  INFO 517797 --- [nanceService]-0] org.apache.kafka.clients.Metadata        : [Consumer clientId=tasklist-localhost-filter-options, groupId=null] Cluster ID: vo0yCvCLQAWtvtIhuY_mWQ
2022-07-05 15:18:10.155  INFO 517797 --- [MongoService]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=tasklist-localhost-polyflow, groupId=null] Subscribed to partition(s): tasklist-eventbus-0
2022-07-05 15:18:10.155  INFO 517797 --- [nanceService]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=tasklist-localhost-filter-options, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2022-07-05 15:18:10.157  INFO 517797 --- [MongoService]-0] o.a.e.k.e.consumer.ConsumerSeekUtil      : Seeking topic-partition [tasklist-eventbus-0] with offset [9]
2022-07-05 15:18:10.157  INFO 517797 --- [MongoService]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=tasklist-localhost-polyflow, groupId=null] Seeking to offset 9 for partition tasklist-eventbus-0
2022-07-05 15:18:10.159  WARN 517797 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask       : Encountered an exception fetching ConsumerRecords

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.1.jar:na]
    at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.5.4.jar:4.5.4]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

2022-07-05 15:18:10.159  WARN 517797 --- [nanceService]-0] o.a.e.TrackingEventProcessor             : Error occurred. Starting retry mode.

org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventException: Cannot proceed with fetching ConsumerRecords since we encountered an exception
    at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:96) ~[axon-kafka-4.5.4.jar:4.5.4]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1223) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.1.1.jar:na]
    at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.5.4.jar:4.5.4]
    ... 3 common frames omitted

2022-07-05 15:18:10.160  WARN 517797 --- [nanceService]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 1s

gklijs commented 2 years ago

Hi Simon, I'll try to reproduce the problem locally. Could you please share the Kafka server/broker version used? I'm not sure it's relevant, but it might be helpful.

zambrovski commented 2 years ago

I observed the same behavior on:

I believe this is not a server problem.

gklijs commented 2 years ago

Hi @zambrovski, with Redpanda and a minimal setup, it is working. If you want, this is the app I created to test. Do you maybe also use the consumer directly, either to commit offsets or to subscribe to a topic?

zambrovski commented 2 years ago

Hi Gerard. We don't use the consumer directly; we only use Spring Cloud Streams Kafka. My one idea is that a reconnect is involved. Since we use it with Azure Event Hubs, we observe a "relogin" because of token expiration in Azure. I believe this could leave the consumer not assigned to anything. It is also not deterministic, but it does happen eventually.

This is probably difficult to reproduce, but I'll spend some time on it and try to create a test setup.

Just as a side note: deprecating two methods and changing the implementation from using a group id to manually assigning partitions is, to me, not a patch-level change. Since it was a patch-level update (reported by a renovate bot), we waved it through without double-checking and got a surprising result. To me, it should be at least "minor", since the behaviour changed.

gklijs commented 2 years ago

The change certainly is debatable; I regarded it as a bug fix. Also, the maintenance branch still has the 'this is a beta' in the readme 😉. To me it also made little sense to supply a group id and then turn it into something random.

Having said that, I wasn't expecting things like this to pop up. I'm not sure whether catching the exception and then having the consumer reassign the partitions would work. Another option would be to call assignment() on the consumer each time before polling, but that seems a bit weird.
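
To make those two ideas concrete, here is a rough sketch. It deliberately uses a hypothetical `PartitionConsumer` interface and an in-memory `FakeConsumer` instead of the real `KafkaConsumer`, so it runs without a broker. Only the method names `assignment()`, `assign(...)` and `poll(...)` mirror real consumer methods, and their signatures are simplified here (strings instead of `TopicPartition`, no poll timeout). Whether either guard would actually help in the reconnect scenario is untested.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class GuardedPollSketch {

    /** Hypothetical stand-in for the consumer calls discussed above; NOT the Kafka API. */
    interface PartitionConsumer {
        Set<String> assignment();
        void assign(Collection<String> partitions);
        List<String> poll();
    }

    /** In-memory fake that, like KafkaConsumer, refuses to poll without an assignment. */
    static final class FakeConsumer implements PartitionConsumer {
        private final Set<String> assigned = new HashSet<>();

        @Override
        public Set<String> assignment() {
            return new HashSet<>(assigned);
        }

        @Override
        public void assign(Collection<String> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
        }

        @Override
        public List<String> poll() {
            if (assigned.isEmpty()) {
                throw new IllegalStateException(
                        "Consumer is not subscribed to any topics or assigned any partitions");
            }
            List<String> records = new ArrayList<>();
            for (String partition : assigned) {
                records.add("record-from-" + partition);
            }
            return records;
        }

        /** Simulates the assignment getting lost, e.g. around a reconnect (an assumption). */
        void dropAssignment() {
            assigned.clear();
        }
    }

    /** Idea 1: check the assignment before every poll and restore it if it is empty. */
    static List<String> pollWithAssignmentCheck(PartitionConsumer consumer,
                                                Collection<String> expected) {
        if (consumer.assignment().isEmpty()) {
            consumer.assign(expected);
        }
        return consumer.poll();
    }

    /** Idea 2: poll first; on IllegalStateException reassign and retry exactly once. */
    static List<String> pollWithRetry(PartitionConsumer consumer,
                                      Collection<String> expected) {
        try {
            return consumer.poll();
        } catch (IllegalStateException e) {
            consumer.assign(expected);
            return consumer.poll();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        List<String> partitions = List.of("tasklist-eventbus-0");

        System.out.println(pollWithAssignmentCheck(consumer, partitions));
        consumer.dropAssignment();
        System.out.println(pollWithRetry(consumer, partitions));
    }
}
```

The check-before-poll variant costs one extra call per poll but never relies on an exception for control flow. The retry variant leaves the normal path untouched, at the price of treating every `IllegalStateException` as a lost assignment.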

zambrovski commented 2 years ago

Gerard,

I was somehow hunting a ghost. After I set the group-related attributes to null and deployed everything, it seems to be running without any problems.

I'll close this issue with a wish that changes (like deprecating methods and changing behaviour) be communicated using semantic versioning.

Sorry,

Simon

gklijs commented 2 years ago

Hi Simon,

Good to hear it was OK in the end. I also asked in the Kafka community whether anyone had similar issues, but nobody responded. The tricky thing with the fix was that it was something between a bugfix and an enhancement. Not deprecating the methods, but effectively making them do nothing, would also have been weird.