camunda-community-hub / kafka-connect-zeebe

Kafka Connect for Zeebe.io
Apache License 2.0

NullPointerException io.zeebe.kafka.connect.source.ZeebeSourceTask.commit #64

Open lrealdi opened 2 years ago

lrealdi commented 2 years ago

Hi, I am hitting this issue with kafka-connect-zeebe-0.32.1-SNAPSHOT.jar.

This is my docker-compose file:

version: "3"

services:
    zeebe:
        image: camunda/zeebe:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        container_name: zeebe
        environment:
            - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_CLASSNAME=io.camunda.zeebe.exporter.ElasticsearchExporter
            - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_URL=http://elasticsearch:9200
            - ZEEBE_BROKER_EXPORTERS_ELASTICSEARCH_ARGS_BULK_SIZE=1
        ports:
            - 26500:26500
        volumes:
            - zeebe:/usr/local/zeebe/data
        networks:
            - camunda-cloud
        depends_on:
            - elasticsearch

    operate:
        image: camunda/operate:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        container_name: operate
        environment:
            - CAMUNDA_OPERATE_ZEEBE_GATEWAYADDRESS=zeebe:26500
            - CAMUNDA_OPERATE_ELASTICSEARCH_URL=http://elasticsearch:9200
            - CAMUNDA_OPERATE_ZEEBEELASTICSEARCH_URL=http://elasticsearch:9200
        ports:
            - 8080:8080
        networks:
            - camunda-cloud
        depends_on:
            - elasticsearch

    tasklist:
        image: camunda/tasklist:${CAMUNDA_CLOUD_VERSION:-1.2.4}
        container_name: tasklist
        environment:
            - CAMUNDA_TASKLIST_ZEEBE_GATEWAYADDRESS=zeebe:26500
            - CAMUNDA_TASKLIST_ELASTICSEARCH_URL=http://elasticsearch:9200
            - CAMUNDA_TASKLIST_ZEEBEELASTICSEARCH_URL=http://elasticsearch:9200
        ports:
            - 8081:8080
        networks:
            - camunda-cloud
        depends_on:
            - elasticsearch

    elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_VERSION:-7.14.1}
        container_name: elasticsearch
        environment:
            - cluster.name=camunda-cloud
            - discovery.type=single-node
            - bootstrap.memory_lock=true
            - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ulimits:
            memlock:
                soft: -1
                hard: -1
        volumes:
            - elastic:/usr/share/elasticsearch/data
        networks:
            - camunda-cloud

    zookeeper:
        image: confluentinc/cp-zookeeper:latest
        restart: unless-stopped
        hostname: zookeeper
        environment:
            ZOOKEEPER_CLIENT_PORT: 2181
            ZOOKEEPER_TICK_TIME: 2000
        ports:
            - 22181:2181

    kafka:
        image: confluentinc/cp-server:7.0.0
        hostname: kafka
        container_name: kafka
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
            - "9101:9101"
        environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
            KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
            KAFKA_JMX_PORT: 9101
            KAFKA_JMX_HOSTNAME: localhost
            KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8085
            CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
            CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
            CONFLUENT_METRICS_ENABLE: 'true'
            CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

    schema-registry:
        image: confluentinc/cp-schema-registry:7.0.0
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
            - kafka
        ports:
            - "8085:8081"
        environment:
            SCHEMA_REGISTRY_HOST_NAME: schema-registry
            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
            SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

    connect:
        image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
        hostname: connect
        container_name: connect
        depends_on:
            - kafka
            - schema-registry
        ports:
            - "8083:8083"
        environment:
            CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
            CONNECT_REST_ADVERTISED_HOST_NAME: connect
            CONNECT_REST_PORT: 8083
            CONNECT_GROUP_ID: compose-connect-group
            CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
            CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
            CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
            CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
            CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
            CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
            CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.0.0.jar
            CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
            CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
            CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
            CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
            CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
            CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
            CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
            CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=INFO,org.reflections=ERROR,io.zeebe.kafka.connect=TRACE,io.zeebe.client=INFO"
            CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars'
        volumes:
            - ./connectors:/etc/kafka-connect/jars/

    control-center:
        image: confluentinc/cp-enterprise-control-center:7.0.0
        hostname: control-center
        container_name: control-center
        depends_on:
            - zookeeper
            - kafka
            - schema-registry
            - connect
        ports:
            - "9021:9021"
        environment:
            CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
            CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
            CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
            CONTROL_CENTER_REPLICATION_FACTOR: 1
            CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
            CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
            CONFLUENT_METRICS_TOPIC_REPLICATION: 1
            PORT: 9021

volumes:
    zeebe:
        driver: local
    elastic:
        driver: local

networks:
    camunda-cloud:
        driver: bridge

When I add the ping connector, Control Center shows: State: FAILED, task id: connect:8083 (screenshot attached).

In the console I see:

 [2021-11-21 11:55:36,373] ERROR WorkerSourceTask{id=ping-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
 java.lang.NullPointerException
    at io.zeebe.kafka.connect.source.ZeebeSourceTask.commit(ZeebeSourceTask.java:114)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:571)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:515)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:110)
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.lambda$schedule$0(SourceTaskOffsetCommitter.java:84)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Originally posted by @lrealdi in https://github.com/camunda-community-hub/kafka-connect-zeebe/issues/57#issuecomment-974803118

lrealdi commented 2 years ago
[2021-11-21 21:19:00,955] ERROR WorkerSourceTask{id=ping-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
    at io.zeebe.kafka.connect.source.ZeebeSourceTask.commit(ZeebeSourceTask.java:114)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:571)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:515)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:252)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2021-11-21 21:19:00,959] ERROR WorkerSourceTask{id=ping-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ExceptionInInitializerError
    at io.grpc.netty.NettyChannelBuilder.<clinit>(NettyChannelBuilder.java:82)
    at io.camunda.zeebe.client.impl.ZeebeClientImpl.buildChannel(ZeebeClientImpl.java:124)
    at io.camunda.zeebe.client.impl.ZeebeClientImpl.<init>(ZeebeClientImpl.java:80)
    at io.camunda.zeebe.client.impl.ZeebeClientBuilderImpl.build(ZeebeClientBuilderImpl.java:282)
    at io.zeebe.kafka.connect.util.ZeebeClientHelper.buildClient(ZeebeClientHelper.java:62)
    at io.zeebe.kafka.connect.source.ZeebeSourceTask.start(ZeebeSourceTask.java:52)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class io.netty.channel.epoll.EpollSocketChannel
    at java.base/java.lang.Class.asSubclass(Class.java:3640)
    at io.grpc.netty.Utils.epollChannelType(Utils.java:318)
    at io.grpc.netty.Utils.<clinit>(Utils.java:110)
    ... 14 more
[2021-11-21 21:19:00,966] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.lang.NullPointerException
    at io.zeebe.kafka.connect.source.ZeebeSourceTask.stop(ZeebeSourceTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:161)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
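
The two traces above appear to fit together: start() fails inside io.zeebe.kafka.connect.util.ZeebeClientHelper.buildClient (the ExceptionInInitializerError raised by grpc-netty), so the Zeebe client presumably never gets assigned, which would explain why the later commit() and stop() calls at ZeebeSourceTask.java:114/109 hit a null reference. Below is a minimal sketch of the kind of guard that avoids the secondary NPEs when start() dies early; the field and method names (client, createClient) are illustrative assumptions, not the connector's actual code.

public abstract class GuardedSourceTask extends org.apache.kafka.connect.source.SourceTask {

  // Stays null if start() throws before the client is built (hypothetical field).
  private volatile AutoCloseable client;

  @Override
  public void start(final java.util.Map<String, String> props) {
    // May throw, e.g. the ExceptionInInitializerError from grpc-netty seen above.
    client = createClient(props);
  }

  @Override
  public void commit() {
    if (client == null) {
      // start() never completed; nothing to acknowledge, so avoid the NPE.
      return;
    }
    // ... acknowledge in-flight work here ...
  }

  @Override
  public void stop() {
    if (client != null) {
      try {
        client.close();
      } catch (final Exception e) {
        // Log and ignore during shutdown.
      }
    }
  }

  // Hypothetical factory standing in for the connector's client construction.
  protected abstract AutoCloseable createClient(java.util.Map<String, String> props);
}

With a guard like this, the original failure (the netty/grpc ClassCastException during client construction) would still kill the task, but the follow-up NullPointerExceptions from task.commit() and the "Could not stop task" warning would not be reported on top of it.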