hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector
Apache License 2.0

MongoDB Sink Connector Fails with NullPointerException #138

Closed. PauloFavero closed this issue 1 year ago.

PauloFavero commented 1 year ago

Hi,

I am trying to create a CDC pipeline with Debezium and kafka-connect-mongodb. The pipeline's goal is to replicate data between two MongoDB databases (the real use case is synchronizing multiple databases into a single one).

My first try was the official MongoDB Sink Connector, but it fails on update operations, as I explain in this topic at the MongoDB community forum.

Because of that issue, I moved on to try kafka-connect-mongodb, but it fails as soon as it receives the first event.

The environment configuration and the error when using kafka-connect-mongodb are slightly different from what I showed at the MongoDB forum, so I am posting my configuration below and the error at the end:

Debezium Source Connector Config

{
  "name": "metrics",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.name": "metrics-src",
    "mongodb.user": "admin",
    "mongodb.password": "admin",
    "mongodb.authsource": "admin",
    "mongodb.hosts": "rs0/metrics-src:27017",
    "topic.prefix": "src",
    "database.include.list": "metrics"
  }
}
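
For reference, a minimal sketch of registering this config against the source worker, assuming the JSON above is saved as debezium-source.json (a placeholder filename) and the worker's REST API is reachable on localhost:8083 as mapped in the Docker Compose file below:

# register the Debezium source connector with the Connect REST API
curl -i -X POST \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data @debezium-source.json \
  http://localhost:8083/connectors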

Kafka MongoDB Sink Connect Config

{
  "name": "metrics",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler",
    "mongodb.connection.uri": "mongodb://metrics-sink:27017/metrics",
    "mongodb.collection": "metrics",
    "mongodb.delete.on.null.values": "true",
    "topics": "src.metrics.customers"
  }
}
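
The sink config can be registered the same way, assuming it is saved as mongodb-sink.json (again a placeholder filename); the sink worker's REST API is mapped to host port 8084 in the compose file below:

# register the kafka-connect-mongodb sink connector on the sink worker
curl -i -X POST \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data @mongodb-sink.json \
  http://localhost:8084/connectors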

Sink Connector Dockerfile

FROM quay.io/debezium/connect:2.0
ENV KAFKA_CONNECT_MONGODB_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-mongodb

USER root
RUN microdnf -y install git maven java-11-openjdk-devel && microdnf clean all

USER kafka

# Deploy MongoDB Sink Connector
RUN mkdir -p $KAFKA_CONNECT_MONGODB_DIR && cd $KAFKA_CONNECT_MONGODB_DIR && \
  git clone https://github.com/hpgrahsl/kafka-connect-mongodb.git && \
  cd kafka-connect-mongodb && \
  git fetch --tags && \
  git checkout tags/v1.2.0 && \
  mvn clean package -DskipTests=true -DskipITs=true && \
  mv target/kafka-connect-mongodb/kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar $KAFKA_CONNECT_MONGODB_DIR && \
  cd .. && rm -rf $KAFKA_CONNECT_MONGODB_DIR/kafka-connect-mongodb

Docker Compose Configuration

version: '3.4'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    restart: always
    networks:
      - sync-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOO_4LW_COMMANDS_WHITELIST: "*"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok"
    healthcheck:
      test: nc -z localhost 2181 || exit -1
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s
    extra_hosts:
      - "moby:127.0.0.1"

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    restart: always
    networks:
      - sync-network
    ports:
      - "9092:9092"
      - "39092:39092"
    depends_on:
      zookeeper:
        condition: service_healthy
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://broker:19092,EXTERNAL_LISTENER://0.0.0.0:39092
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://localhost:19092,EXTERNAL_LISTENER://150.230.85.73:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,HOST_LISTENER:PLAINTEXT,EXTERNAL_LISTENER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"
    healthcheck:
      test: echo "ruok" | timeout 2 nc -w 2 zookeeper 2181 | grep imok
      interval: 10s
      timeout: 5s
      retries: 3

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    # network_mode: host
    ports:
      - 9000:9000
    networks:
      - sync-network
    depends_on:
      broker:
        condition: service_healthy
    environment:
      KAFKA_BROKERCONNECT: broker:9092

  metrics-src:
    image: mongo:5.0.5
    hostname: metrics-src
    restart: always
    container_name: metrics-src
    ports:
      - 27040:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all

  metrics-sink:
    image: mongo:5.0.5
    hostname: metrics-sink
    restart: always
    container_name: metrics-sink
    ports:
      - 27020:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all

  metrics-src-connect:
    image: quay.io/debezium/connect:2.0
    container_name: metrics-connect
    ports:
     - 8083:8083
    links:
     - broker
     - metrics-src
    networks:
      - sync-network
    volumes:
     - kafka-src-config:/kafka/config
    environment:
     - BOOTSTRAP_SERVERS=broker:9092
     - REST_HOST_NAME=0.0.0.0
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=metrics_src_connect_configs
     - OFFSET_STORAGE_TOPIC=metrics_src_connect_offsets
     - STATUS_STORAGE_TOPIC=metrics_src_connect_status
     - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
     - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
     - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

  # container with debezium plugins
  metrics-sink-connect:
    image: debezium/connect-mongodb:2.0
    container_name: metrics-sink-connect
    build:
      context: debezium-sink-mongodb-connect
    ports:
     - 8084:8083
    links:
     - broker
     - metrics-sink
    networks:
      - sync-network
    environment:
     - BOOTSTRAP_SERVERS=broker:9092
     - GROUP_ID=2
     - CONFIG_STORAGE_TOPIC=metrics_sink_connect_configs
     - OFFSET_STORAGE_TOPIC=metrics_sink_connect_offsets
     - STATUS_STORAGE_TOPIC=metrics_sink_connect_status
     - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
     - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
     - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

networks:
  sync-network:
    driver: bridge

volumes:
  kafka-sink-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/sink-config

  kafka-src-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/src-config
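
To reproduce, a sketch of bringing the stack up and inspecting connector state, assuming the compose file above and the two connector registrations shown earlier (both connectors are named "metrics", which works here because they live on separate Connect workers):

# start the whole stack in the background
docker compose up -d
# after registering the connectors via the curl calls above,
# check task status on the source (8083) and sink (8084) workers:
curl -s http://localhost:8083/connectors/metrics/status
curl -s http://localhost:8084/connectors/metrics/status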

The error that arises is the following:


metrics-sink-connect    | 2022-11-24 14:21:42,928 ERROR  ||  WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null   [org.apache.kafka.connect.runtime.WorkerSinkTask]
metrics-sink-connect    | java.lang.NullPointerException
metrics-sink-connect    |       at java.base/java.util.Objects.requireNonNull(Objects.java:221)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModelCDC(MongoDbSinkTask.java:215)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:121)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$0(MongoDbSinkTask.java:111)
metrics-sink-connect    |       at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:110)
metrics-sink-connect    |       at java.base/java.util.HashMap.forEach(HashMap.java:1337)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:109)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | 2022-11-24 14:21:42,931 ERROR  ||  WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
metrics-sink-connect    | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
metrics-sink-connect    |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect    |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect    |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect    |       at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect    | Caused by: java.lang.NullPointerException
metrics-sink-connect    |       at java.base/java.util.Objects.requireNonNull(Objects.java:221)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModelCDC(MongoDbSinkTask.java:215)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:121)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$0(MongoDbSinkTask.java:111)
metrics-sink-connect    |       at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:110)
metrics-sink-connect    |       at java.base/java.util.HashMap.forEach(HashMap.java:1337)
metrics-sink-connect    |       at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:109)
metrics-sink-connect    |       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect    |       ... 10 more

Could this error be due to a version compatibility mismatch between Debezium and kafka-connect-mongodb, or with MongoDB 5.0.5?

Also, do you have any thoughts on the behavior I described when using the official MongoDB sink connector with the Debezium CDC handler, as posted on the MongoDB forum?

Thanks in advance, Paulo

hpgrahsl commented 1 year ago

Hi. Thx for reaching out. Since it's basically the same problem, I replied in the MongoDB community forums (https://www.mongodb.com/community/forums/t/mongodb-kafka-sink-connector-w-debezium-cdc-handler-fails-on-update-operations/201288/2) so that hopefully more people will read it and benefit from the answer. Also, please consider this community project discontinued, since the code was integrated into the official MongoDB sink connector a couple of years ago already. Please raise further issues directly in the MongoDB issue tracker rather than here. I'm closing this for now, since it will not be fixed in this community project anymore. Thx for your understanding.