databricks / iceberg-kafka-connect

Apache License 2.0

NullPointerException: Cannot invoke "java.lang.CharSequence.toString()" because "value" is null #170

Open bochenekmartin opened 11 months ago

bochenekmartin commented 11 months ago

Hi Iceberg connector Team! 👋

I am getting the following exception with Iceberg connector 0.6.2: NullPointerException: Cannot invoke "java.lang.CharSequence.toString()" because "value" is null

Can you please suggest what might be causing this? When I try to reproduce it by processing the message in isolation, the connector works fine with no errors.

Your help would be appreciated.

"class":"io.tabular.iceberg.connect.channel.Channel:163","msg":"Channel stopping"}
    ... 19 more
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:65)
    at org.apache.iceberg.io.UnpartitionedWriter.write(UnpartitionedWriter.java:42)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
    at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:139)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
    at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:324)
    at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:336)
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.CharSequence.toString()" because "value" is null
    at java.base/java.lang.Thread.run(Thread.java:840)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:73)
org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: 10x, partition, 1, offset: 15684
2023-11-20 14:45:26.780 

 Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.
 Error: An error occurred converting record, topic: 10x, partition, 1, offset: 15684"}

The connector config is:

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.catalog.glue.lakeformation-enabled": "true",
    "topic.creation.default.partitions": "1",
    "iceberg.catalog.s3.endpoint": "https://s3.af-south-1.amazonaws.com/",
    "iceberg.catalog.s3.region": "af-south-1",
    "iceberg.catalog.client.region": "af-south-1",
    "iceberg.catalog.s3.sse.key": "arn:aws:kms:af-south-1XXXXXkey/XXXXX",
    "iceberg.tables.upsert-mode-enabled": "false",
    "iceberg.catalog.client.assume-role.external-id": "XXXXX",
    "iceberg.tables.dynamic-enabled": "false",
    "errors.deadletterqueue.context.headers.enable": "true",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables": "raw.rp_10x",
    "topic.creation.default.replication.factor": "-1",
    "iceberg.catalog.client.assume-role.region": "af-south-1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "iceberg.catalog.s3.sse.type": "kms",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "iceberg.catalog.client.assume-role.arn": "arn:aws:iam::XXXXXX:role/XXXXX"",
    "iceberg.catalog.glue.account-id": "XXXXX"",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "topics.regex": "10.*",
    "iceberg.catalog": "raw",
    "iceberg.catalog.client.assume-role.tags.LakeFormationAuthorizedCaller": "iceberg-connector",
    "key.converter.schemas.enable": "false",
    "name": "iceberg-connector-1",
    "value.converter.schemas.enable": "true",
    "iceberg.catalog.warehouse": "s3://XXXXXXX",
    "iceberg.control.topic": "the-control-topic",
    "iceberg.catalog.s3.path-style-access": "true",
  }
bryanck commented 11 months ago

The Iceberg Parquet writer will throw an NPE when writing a null value to a required field. Is this the case for you?
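To make the failure mode concrete, here is a minimal, self-contained sketch of what happens when a null value reaches a writer that assumes non-null input. The `NullRequiredFieldDemo` class and its `writeString` method are hypothetical stand-ins for illustration, not the Iceberg Parquet writer itself:

```java
// Minimal stand-in showing why a null value in a required string column
// produces this exact kind of NPE: the writer calls toString() with no
// null check, because required fields are assumed to always have a value.
public class NullRequiredFieldDemo {

    // Hypothetical stand-in for a Parquet-style string writer.
    static String writeString(CharSequence value) {
        // Mirrors the failing call in the stack trace: no null guard.
        return value.toString(); // throws NullPointerException when value is null
    }

    public static void main(String[] args) {
        System.out.println(writeString("ok")); // prints "ok"
        try {
            writeString(null);
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

On recent JVMs the exception message names the null reference (the "helpful NullPointerExceptions" feature), which is where the `because "value" is null` wording in the log comes from.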

tomasz-sadura commented 11 months ago

Is there a way to find out what field it failed for?

bryanck commented 11 months ago

Not currently. This fails inside the Iceberg Parquet writer, so it would be best to handle it there; submitting a PR for that is on my list, unless someone gets to it first. I'll also look into adding a check to the conversion routine, where we may be able to catch this without much overhead.
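To sketch what such a check in the conversion routine might look like, here is a rough, self-contained example. The `Field` record and `validateRequired` helper are hypothetical illustrations, not the connector's or Iceberg's API; the idea is simply to fail fast with the offending field's name before the record reaches the Parquet writer:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RequiredFieldCheck {

    // Hypothetical stand-in for a schema field; not the connector's API.
    record Field(String name, boolean required) {}

    // Walks the schema and reports the first required field whose value is null.
    static void validateRequired(List<Field> schema, Map<String, Object> record) {
        for (Field f : schema) {
            if (f.required() && record.get(f.name()) == null) {
                throw new IllegalArgumentException(
                        "Required field '" + f.name() + "' is null");
            }
        }
    }

    public static void main(String[] args) {
        List<Field> schema = List.of(new Field("id", true), new Field("note", false));

        // A record missing only the optional field passes the check.
        Map<String, Object> ok = new HashMap<>();
        ok.put("id", 1);
        validateRequired(schema, ok);

        // A null required field now produces a descriptive error
        // instead of an opaque NPE deep in the writer.
        Map<String, Object> bad = new HashMap<>();
        bad.put("id", null);
        try {
            validateRequired(schema, bad);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints: Required field 'id' is null
        }
    }
}
```

The extra cost is one map lookup per required field per record, which is the kind of overhead the conversion routine could likely absorb.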

jason-da-redpanda commented 10 months ago

Was just checking in to see if there was a PR for this .. thanks

jason-da-redpanda commented 9 months ago

checking in again to see if there was a PR for this .. thanks

bryanck commented 9 months ago

No, there isn't a PR for this in Iceberg as far as I know. I'm planning to wait until the sink is submitted to the Iceberg project before investigating further, since other writers (e.g. the Flink sink) may benefit as well.

wzzzrd86 commented 4 months ago

@bryanck Following up on this to see if there has been any movement or a PR?

bryanck commented 4 months ago

Not yet, unfortunately. I'm still working on getting the sink fully submitted, but that's pretty close, I think.