spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Upgrade to Spring Cloud 2020.0.0 - KafkaBinderMetrics - ConcurrentModificationException #1019

Closed buyukim closed 3 years ago

buyukim commented 3 years ago

We recently upgraded to Spring Boot 2.4.1 (and tested 2.4.2 today) from Spring Boot 2.3.3 + Spring Cloud Hoxton.SR6. Our application starts fine and can process Kafka events, but after startup we are seeing the following exceptions in the logs that were not happening before:

2021-01-15 09:10:27.300 DEBUG 56330 --- [pool-5-thread-4] o.s.c.s.binder.kafka.KafkaBinderMetrics : Cannot generate metric for topic: digital.topic.a.dev

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2452)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2436)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1954)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1930)
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.findTotalTopicGroupLag(KafkaBinderMetrics.java:191)
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.lambda$computeUnconsumedMessagesRunnable$1(KafkaBinderMetrics.java:148)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

We see the above message for each topic, and the exceptions keep happening as the app runs.
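
For readers unfamiliar with this exception: the trace shows it being raised from KafkaConsumer.acquire, reached via partitionsFor on a scheduled pool thread. KafkaConsumer is not thread-safe and rejects a call that arrives while another thread is inside the consumer. The following is only a minimal sketch of that kind of single-thread guard, not the Kafka client's or the binder's actual code. The class GuardedClient, its methods, and the topic name are hypothetical stand-ins, and the guard is deliberately simplified (non-reentrant).

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class SingleThreadGuardDemo {

    /** Hypothetical stand-in for a client that only one thread may use at a time. */
    static class GuardedClient {
        private static final long NO_OWNER = -1L;
        // Id of the thread currently inside the client, or NO_OWNER.
        private final AtomicLong owner = new AtomicLong(NO_OWNER);

        // Simplified, non-reentrant guard: fails fast instead of blocking.
        private void acquire() {
            long me = Thread.currentThread().getId();
            if (!owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
        }

        private void release() {
            owner.set(NO_OWNER);
        }

        /** Simulates a long-running poll that keeps the client occupied. */
        void poll(CountDownLatch entered, CountDownLatch mayLeave) throws InterruptedException {
            acquire();
            try {
                entered.countDown(); // signal that the poller is inside the client
                mayLeave.await();    // stay inside until the demo lets us go
            } finally {
                release();
            }
        }

        /** Simulates a metadata lookup made from another thread. */
        String partitionsFor(String topic) {
            acquire();
            try {
                return "partitions of " + topic;
            } finally {
                release();
            }
        }
    }

    /** Runs a poller thread and then calls the client from the calling thread. */
    static String demo() {
        GuardedClient client = new GuardedClient();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayLeave = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                client.poll(entered, mayLeave);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        String outcome;
        try {
            entered.await(); // wait until the poller owns the client
            outcome = client.partitionsFor("some-topic");
        } catch (ConcurrentModificationException e) {
            outcome = "rejected: " + e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            outcome = "interrupted";
        } finally {
            mayLeave.countDown(); // always let the poller finish
        }
        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the second caller is rejected immediately rather than made to wait, which is consistent with a failure that repeats on every scheduled run while the consuming thread keeps working normally.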

We are using Spring Cloud Stream v3.1.0, which is bundled with Spring Cloud 2020.0.0, and are also using: implementation 'org.springframework.cloud:spring-cloud-starter-bootstrap'

Here is the basic YAML config:


spring:
  cloud:
    bootstrap:
      enabled: true
    function:
      definition: consumeEventTypeA;consumeEventTypeB;consumeEventTypeC;consumeEventTypeD;consumeEventTypeE
    stream:
      bindings: 
        consumeEventTypeA-in-0: 
          destination: digital.topic.a.dev
          contentType: application/json
        consumeEventTypeB-in-0:
          destination: digital.topic.b.dev
          contentType: application/json
        consumeEventTypeC-in-0:
          destination: digital.topic.c.dev
          contentType: application/json
        consumeEventTypeD-in-0:
          destination: digital.topic.d.dev
          contentType: application/json
        consumeEventTypeE-in-0:
          destination: digital.topic.e.dev
          contentType: application/json
      default:
        group: ${SERVICE_KEY:my-app-name}
management: 
  endpoints:
    enabled-by-default: true
    web:
      exposure:
        include: health, env, sessions, jolokia, caches, prometheus, metrics, threaddump, heapdump, auditevents, beans, conditions, info, loggers, mappings, scheduledtasks, logfile
        exclude: refresh, restart, pause, resume, httptrace, httpstrace, configprops, shutdown, features
  endpoint:
    info:
      enabled: true
    metrics:
      enabled: true
    prometheus:
      enabled: true
    logfile:
      enabled: true
      external-file: /temp/spring.log
    loggers:
      enabled: true
    jolokia:
      config:
        debug: true
    health:
      show-details: ALWAYS
  metrics:
    export.prometheus.enabled: true
logging: 
  level:
    root: WARN
    org.springframework.cloud.stream.binder.kafka: DEBUG

sobychacko commented 3 years ago

@buyukim I tried to reproduce the issue with a sample project that has the same dependencies as yours. I could not reproduce it, i.e., there were no exceptions on startup and I could invoke the metrics endpoint (see below). Here is that sample. Could you compare your setup with that sample and see where the mismatch is? If you still think that this is an issue, please try to reproduce it with this sample.

Here is the metrics output when running the application:

curl http://localhost:8080/actuator/metrics/spring.cloud.stream.binder.kafka.offset | jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   342    0   342    0     0   4071      0 --:--:-- --:--:-- --:--:--  4071
{
  "name": "spring.cloud.stream.binder.kafka.offset",
  "description": "Unconsumed messages for a particular group and topic",
  "baseUnit": null,
  "measurements": [
    {
      "statistic": "VALUE",
      "value": 11
    }
  ],
  "availableTags": [
    {
      "tag": "topic",
      "values": [
        "hello4-in-0",
        "hello5-in-0",
        "hello1-in-0",
        "hello3-in-0",
        "hello2-in-0"
      ]
    },
    {
      "tag": "group",
      "values": [
        "my-group-1"
      ]
    }
  ]
}

buyukim commented 3 years ago

Did you turn up the logging to DEBUG? I have adjusted the config to try to include everything that is pertinent.

I also tested the real project and found that the Kafka metrics are working, which seems odd given the exception message.

I am going to try to reproduce the exception using this sample project.

sobychacko commented 3 years ago

@buyukim Please try to reproduce the issue using the provided sample. Then we can triage further.

sobychacko commented 3 years ago

@buyukim Any updates on this issue? Were you able to duplicate the exception?

sabbyanandan commented 3 years ago

Closing due to no activity.

AlexandreCassagne commented 1 year ago

I am also getting this error. I think this issue should be reopened.

AlexandreCassagne commented 1 year ago

2023-01-24 17:12:58.698 DEBUG 96889 --- [pool-9-thread-9] o.s.c.s.binder.kafka.KafkaBinderMetrics  : Cannot generate metric for topic: TOPIC

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2469) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2453) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1946) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1922) ~[kafka-clients-3.1.1.jar:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.findTotalTopicGroupLag(KafkaBinderMetrics.java:204) ~[spring-cloud-stream-binder-kafka-3.2.6.jar:3.2.6]
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.computeUnconsumedMessages(KafkaBinderMetrics.java:189) ~[spring-cloud-stream-binder-kafka-3.2.6.jar:3.2.6]
    at org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.lambda$bindTo$0(KafkaBinderMetrics.java:154) ~[spring-cloud-stream-binder-kafka-3.2.6.jar:3.2.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

AlexandreCassagne commented 1 year ago

Hey @sabbyanandan, just wanted to log that this is still an issue for us. Would you prefer if I create a new issue instead? :-)

sobychacko commented 1 year ago

@AlexandreCassagne Yes, please create a new issue in https://github.com/spring-cloud/spring-cloud-stream.

Could you also link to this issue?