10xfuturetechnologies / kafka-connect-iceberg

Kafka Connector for Iceberg tables
Apache License 2.0

ERROR Exception writing records: java.lang.NullPointerException #17

Open ayush-san opened 1 year ago

ayush-san commented 1 year ago

Hi,

I am getting an NPE while trying to test the sink connector. It looks like the schema variable is coming back null.

Could this be caused by the property "value.converter.schemas.enable": "false"?

[2023-04-13 10:37:26,249] INFO Configuring table 'test_iceberg' with primary key [id] (com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.IcebergTableManager)
[2023-04-13 10:37:26,252] ERROR Exception writing records (com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkTask)
java.lang.NullPointerException
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.IcebergTableManager.lambda$addPrimaryKeyIfNeeded$1(IcebergTableManager.java:83)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.IcebergTableManager.addPrimaryKeyIfNeeded(IcebergTableManager.java:84)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.IcebergTableManager.from(IcebergTableManager.java:39)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.RecordWriter.writeRecord(RecordWriter.java:140)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.RecordWriter.flush(RecordWriter.java:115)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:72)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
ayush-san commented 1 year ago

I added custom logs to check whether the schema is null.

[2023-04-15 08:02:17,997] INFO [Consumer clientId=connector-consumer-avro-partitioned-table-sink-0, groupId=connect-avro-partitioned-table-sink] Resetting offset for partition dbserver2.inventory.customers-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2023-04-15 08:02:18,179] INFO table {
  0: before: optional struct<6: id: required int, 7: first_name: required string, 8: last_name: required string, 9: email: required string>
  1: after: optional struct<10: id: required int, 11: first_name: required string, 12: last_name: required string, 13: email: required string>
  2: source: required struct<14: version: required string, 15: connector: required string, 16: name: required string, 17: ts_ms: required long, 18: snapshot: optional string, 19: db: required string, 20: sequence: optional string, 21: table: optional string, 22: server_id: required long, 23: gtid: optional string, 24: file: required string, 25: pos: required long, 26: row: required int, 27: thread: optional long, 28: query: optional string>
  3: op: required string
  4: ts_ms: optional long
  5: transaction: optional struct<29: id: required string, 30: total_order: required long, 31: data_collection_order: required long>
} (com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.IcebergTableManager)
[2023-04-15 08:02:18,179] INFO Configuring table 'customers_iceberg_v2' with primary key [id] (com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.IcebergTableManager)
[2023-04-15 08:02:18,180] ERROR Exception writing records (com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkTask)
java.lang.NullPointerException
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.IcebergTableManager.lambda$addPrimaryKeyIfNeeded$1(IcebergTableManager.java:88)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)

This is my Kafka payload

Key:         { "id": 1004 }
Partition:   0
Offset:      3
Timestamp:   2023-04-15 08:00:04.217 +0000 UTC
{
  "after": {
    "dbserver2.inventory.customers.Value": {
      "email": "annek@noanswer.org",
      "first_name": "Anne",
      "id": 1004,
      "last_name": "Kretchmar"
    }
  },
  "before": null,
  "op": "r",
  "source": {
    "connector": "mysql",
    "db": "inventory",
    "file": "mysql-bin.000003",
    "gtid": null,
    "name": "dbserver2",
    "pos": 157,
    "query": null,
    "row": 0,
    "sequence": null,
    "server_id": 0,
    "snapshot": {
      "string": "last_in_data_collection"
    },
    "table": {
      "string": "customers"
    },
    "thread": null,
    "ts_ms": 1681545602000,
    "version": "2.1.4.Final"
  },
  "transaction": null,
  "ts_ms": {
    "long": 1681545602881
  }
}

Debezium config

{
  "name": "avro-connector",
  "config": {
    "database.server.id": "183054",
    "database.user": "debezium",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "database.hostname": "mysql",
    "database.include.list": "inventory",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "topic.prefix": "dbserver2",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "database.port": "3306",
    "value.converter.schemas.enable": "true",
    "database.password": "dbz",
    "tasks.max": "1",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "key.converter.schemas.enable": "true",
    "schema.history.internal.kafka.topic": "schemahistory2.inventory",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter"
  }
}

Iceberg sink connector config

{
  "name": "avro-partitioned-table-sink",
  "config": {
    "connector.class": "com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkConnector",
    "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "topics": "dbserver2.inventory.customers",
    "errors.log.enable": "true",
    "errors.retry.delay.max.ms": "60000",
    "errors.retry.timeout": "5",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "true",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.enhanced.avro.schema.support": true,
    "transforms": "TombstoneHandler",
    "transforms.TombstoneHandler.type": "io.confluent.connect.transforms.TombstoneHandler",
    "transforms.TombstoneHandler.behavior": "warn",
    "table.namespace": "test_inventory",
    "table.name": "customers_iceberg_v2",
    "table.auto-create": true,
    "table.primary-key": "id",
    "timezone": "UTC",
    "flush.size": 1000,
    "iceberg.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.warehouse": "s3a://XXX/iceberg_sink",
    "iceberg.fs.defaultFS": "s3a://XXX/iceberg_sink",
    "iceberg.com.amazonaws.services.s3.enableV4": true,
    "iceberg.com.amazonaws.services.s3a.enableV4": true,
    "iceberg.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "iceberg.fs.s3a.path.style.access": true,
    "iceberg.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
  }
}
ayush-san commented 1 year ago

For this sink connector to work with the Debezium change log format, we need to use the ExtractNewRecordState transform. We should mention this in the README.

{
  "name": "avro-connector",
  "config": {
    "database.server.id": "183054",
    "database.user": "debezium",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "database.hostname": "mysql",
    "database.include.list": "inventory",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "topic.prefix": "dbserver1",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "database.port": "3306",
    "value.converter.schemas.enable": "true",
    "database.password": "dbz",
    "tasks.max": "1",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "key.converter.schemas.enable": "true",
    "schema.history.internal.kafka.topic": "schemahistory1.inventory",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,source.ts_ms,source.file,source.pos,source.row"
  }
}
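
For reference, after the unwrap transform the value reaching the sink should be the flattened row rather than the full Debezium envelope. For the sample record above it would look roughly like this (a sketch only; the "__"-prefixed metadata field names assume the transform's defaults, so verify against your actual topic):

{
  "id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "annek@noanswer.org",
  "__op": "r",
  "__source_ts_ms": 1681545602000,
  "__source_file": "mysql-bin.000003",
  "__source_pos": 157,
  "__source_row": 0,
  "__deleted": "false"
}

With id now at the top level, the primary-key lookup for [id] has a matching column, which is presumably why the NullPointerException above goes away.
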
ddcprg commented 1 year ago

Hi @ayush-san, thanks for raising this issue. We haven't tested this connector with Debezium so far, so please feel free to raise a PR to add docs for this. Please note that this sink won't handle the Debezium operation field. Are you using the Iceberg table as storage for all the MySQL binlogs?

ddcprg commented 1 year ago

We'll add additional documentation to a wiki page; that may be the best place for this type of example.

ayush-san commented 1 year ago

@ddcprg Will raise a PR to update the docs.

Yes, I was thinking of using this connector to read the Debezium CDC data and maintain a real-time replica of our prod tables in our data lake.

ddcprg commented 1 year ago

If you are looking at table replication, you may want to consider using https://github.com/getindata/kafka-connect-iceberg-sink instead.

ayush-san commented 1 year ago

@ddcprg I was playing around with this connector and didn't notice any issues. Why do you think that connector is better suited for table replication?

ddcprg commented 1 year ago

I don't know the details of your use case, but here is one of the issues you'll have if you intend your Iceberg table to be a replica of your MySQL table: this connector can't handle Debezium's operation field, so if you delete a record from the MySQL table, the corresponding row won't be deleted in the Iceberg table. We may add a similar feature later if we or someone else needs it, but it will probably be handled with tombstones.

If instead you want to use the Iceberg table as an operation log of the source table, then this connector is suitable for that use case.
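
To illustrate the delete behaviour with the unwrap config shown earlier ("delete.handling.mode": "rewrite"): a DELETE in MySQL reaches the sink as a normal record flagged as a soft delete rather than as a tombstone, roughly like this (a sketch; the remaining fields depend on the Debezium version and transform settings):

{
  "id": 1004,
  "__op": "d",
  "__deleted": "true"
}

The sink would simply append this as another row, so readers of the Iceberg table would have to filter on __deleted themselves.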

ayush-san commented 1 year ago

this connector won't be able to handle Debezium's operation field, if you delete a record from the MySQL table the row won't be deleted in the Iceberg table

Yes, but that is actually the behaviour I need, since I won't be exposing my table directly because it can contain sensitive fields. So for my use case, after applying the ExtractNewRecordState transform, this connector makes more sense.

Apart from this, I noticed that we require the user to provide the table name config, but that won't work if the user also specifies a topic regex config.

Should I raise a PR to use the topic name as the table name in that case?
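
For example, a hypothetical sink config like the following (assuming the standard Kafka Connect topics.regex property) would match several topics but still point at a single table:

    "topics.regex": "dbserver2\\.inventory\\..*",
    "table.name": "customers_iceberg_v2"

Deriving the table name from the topic name whenever topics.regex is set would instead give each matched topic its own table.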

ayush-san commented 1 year ago

this connector won't be able to handle Debezium's operation field

Meaning this will create new rows instead of updating the existing rows?