IBMStreams / streamsx.kafka

Repository for integration with Apache Kafka
https://ibmstreams.github.io/streamsx.kafka/
Apache License 2.0

Resetting from checkpoint will fail when sequence id is >1000 #171

Closed thomas-mattsson closed 5 years ago

thomas-mattsson commented 5 years ago

All our jobs (using streamsx.messagehub toolkit 2.0.1) are currently failing to reset from checkpoints with the following error:

    at com.ibm.streamsx.kafka.clients.consumer.CrKafkaConsumerGroupClient.createSeekOffsetMap(CrKafkaConsumerGroupClient.java:1045)
    at com.ibm.streamsx.kafka.clients.consumer.CrKafkaConsumerGroupClient.resetPrepareDataBeforeStopPolling(CrKafkaConsumerGroupClient.java:909)
    at com.ibm.streamsx.kafka.clients.consumer.CrKafkaConsumerGroupClient.onReset(CrKafkaConsumerGroupClient.java:481)
    at com.ibm.streamsx.kafka.operators.AbstractKafkaConsumerOperator.reset(AbstractKafkaConsumerOperator.java:1100)

With debug tracing enabled, the following error explains the cause:

    07 Aug 2019 08:15:44.725+0000 [49763] DEBUG  M[ClientNotifForwarder.NotifFetcher-run:?:-1]  - Failed to forward a notification to a listener
    07 Aug 2019 08:15:44.726+0000 [49763] DEBUG  M[?:?:0]  - com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Expected ':' at line 1 column 418 path $.key.192
    07 Aug 2019 08:15:44.726+0000 [49763] DEBUG  M[?:?:0]  - com.google.gson.Gson.fromJson(Gson.java:902)
    07 Aug 2019 08:15:44.726+0000 [49763] DEBUG  M[?:?:0]  - com.google.gson.Gson.fromJson(Gson.java:852)
    07 Aug 2019 08:15:44.727+0000 [49763] DEBUG  M[?:?:0]  - com.google.gson.Gson.fromJson(Gson.java:801)
    07 Aug 2019 08:15:44.727+0000 [49763] DEBUG  M[?:?:0]  - com.google.gson.Gson.fromJson(Gson.java:773)
    07 Aug 2019 08:15:44.727+0000 [49763] DEBUG  M[?:?:0]  - com.ibm.streamsx.kafka.clients.consumer.CrKafkaConsumerGroupClient.handleNotification(CrKafkaConsumerGroupClient.java:270)
    07 Aug 2019 08:15:44.727+0000 [49763] DEBUG  M[?:?:0]  - com.sun.jmx.remote.opt.internal.ClientNotifForwarder$NotifFetcher.dispatchNotification(ClientNotifForwarder.java:421)
    07 Aug 2019 08:15:44.728+0000 [49763] DEBUG  M[?:?:0]  - com.sun.jmx.remote.opt.internal.ClientNotifForwarder$NotifFetcher.run(ClientNotifForwarder.java:395)
    07 Aug 2019 08:15:44.728+0000 [49763] DEBUG  M[?:?:0]  - java.lang.Thread.run(Thread.java:812)

It turns out that once the sequence id passes 1000, the JSON generation incorrectly adds thousands separators to the number (e.g. 1,000), so the resulting JSON is malformed and cannot be parsed.
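For illustration only, here is a minimal sketch of how this kind of bug can arise in Java. It assumes the JSON was built with `java.text.MessageFormat`, which applies locale-specific digit grouping to numeric arguments; the thread does not confirm which formatting call the toolkit actually uses. The class name `SeqIdJsonDemo`, the methods `brokenJson` and `fixedJson`, and the `sequenceId` field are hypothetical.

```java
import java.text.MessageFormat;
import java.util.Locale;

// Hypothetical demo class; not taken from the toolkit's source code.
class SeqIdJsonDemo {

    // Assumed failure mode: MessageFormat formats Number arguments with
    // NumberFormat, which inserts grouping separators for values >= 1000.
    static String brokenJson(long seqId) {
        // '{' and '}' are quoted so MessageFormat treats them as literals.
        MessageFormat fmt = new MessageFormat("'{'\"sequenceId\":{0}'}'", Locale.US);
        return fmt.format(new Object[] { seqId });
    }

    // Possible fix: convert the number to text without locale formatting.
    static String fixedJson(long seqId) {
        return "{\"sequenceId\":" + Long.toString(seqId) + "}";
    }

    public static void main(String[] args) {
        System.out.println(brokenJson(999));   // still valid JSON
        System.out.println(brokenJson(1000));  // contains "1,000": invalid JSON
        System.out.println(fixedJson(1000));   // valid JSON
    }
}
```

This would also explain why the jobs ran fine until the sequence id grew large enough: values below 1000 are formatted without a separator, so the JSON stays valid.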

thomas-mattsson commented 5 years ago

I have a fix ready and am preparing a PR.

thomas-mattsson commented 5 years ago

https://github.com/IBMStreams/streamsx.kafka/pull/172

thomas-mattsson commented 5 years ago

@RolefH, can you have a look at this? It's a serious blocker for us. See PR #172 for the fix.

ghost commented 5 years ago

Corrected in https://github.com/IBMStreams/streamsx.kafka/releases/tag/v2.0.1