tabular-io / iceberg-kafka-connect

Apache License 2.0

NullPointerException: Cannot invoke "java.lang.CharSequence.toString()" because "value" is null #170

Open bochenekmartin opened 7 months ago

bochenekmartin commented 7 months ago

Hi Iceberg connector Team! 👋

I am getting the following exception with Iceberg connector 0.6.2: NullPointerException: Cannot invoke "java.lang.CharSequence.toString()" because "value" is null

Can you please suggest what the cause of the issue might be? When I try to reproduce it by processing the message in isolation, the connector works fine, with no errors.

Your help would be appreciated.

2023-11-20 14:45:26.780
 Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.
 Error: An error occurred converting record, topic: 10x, partition, 1, offset: 15684"}
org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: 10x, partition, 1, offset: 15684
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:73)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.CharSequence.toString()" because "value" is null
    at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:336)
    at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:324)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:139)
    at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
    at org.apache.iceberg.io.UnpartitionedWriter.write(UnpartitionedWriter.java:42)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:65)
    ... 19 more
"class":"io.tabular.iceberg.connect.channel.Channel:163","msg":"Channel stopping"}

The connector config is:

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.catalog.glue.lakeformation-enabled": "true",
    "topic.creation.default.partitions": "1",
    "iceberg.catalog.s3.endpoint": "https://s3.af-south-1.amazonaws.com/",
    "iceberg.catalog.s3.region": "af-south-1",
    "iceberg.catalog.client.region": "af-south-1",
    "iceberg.catalog.s3.sse.key": "arn:aws:kms:af-south-1XXXXXkey/XXXXX",
    "iceberg.tables.upsert-mode-enabled": "false",
    "iceberg.catalog.client.assume-role.external-id": "XXXXX",
    "iceberg.tables.dynamic-enabled": "false",
    "errors.deadletterqueue.context.headers.enable": "true",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables": "raw.rp_10x",
    "topic.creation.default.replication.factor": "-1",
    "iceberg.catalog.client.assume-role.region": "af-south-1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "iceberg.catalog.s3.sse.type": "kms",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "iceberg.catalog.client.assume-role.arn": "arn:aws:iam::XXXXXX:role/XXXXX",
    "iceberg.catalog.glue.account-id": "XXXXX",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "topics.regex": "10.*",
    "iceberg.catalog": "raw",
    "iceberg.catalog.client.assume-role.tags.LakeFormationAuthorizedCaller": "iceberg-connector",
    "key.converter.schemas.enable": "false",
    "name": "iceberg-connector-1",
    "value.converter.schemas.enable": "true",
    "iceberg.catalog.warehouse": "s3://XXXXXXX",
    "iceberg.control.topic": "the-control-topic",
    "iceberg.catalog.s3.path-style-access": "true"
}
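One hedged observation about the config above (assuming standard Kafka Connect error-handling semantics): `errors.deadletterqueue.context.headers.enable` has no effect on its own; a dead letter queue only activates when a DLQ topic is named and `errors.tolerance` is set to `all`. Even then, Connect's DLQ captures failures from the converter/transform stages, not exceptions thrown from the sink's `put()` like the one in this issue, so it would not prevent the task from being killed here. A sketch of the related settings, with a hypothetical topic name:

```json
{
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-iceberg-connector-1",
    "errors.deadletterqueue.topic.replication.factor": "3",
    "errors.deadletterqueue.context.headers.enable": "true"
}
```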
bryanck commented 7 months ago

The Iceberg Parquet writer will throw an NPE when writing a null value to a required field. Is this the case for you?
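The failure mode bryanck describes can be illustrated with a minimal, self-contained Java sketch. The class and method names here are invented for illustration; in the real trace the unguarded call is `value.toString()` inside Iceberg's `ParquetValueWriters$StringWriter`:

```java
public class NullValueDemo {

    // Mimics the pattern in the Parquet string writer: the value is converted
    // with toString() and never null-checked, so a null value arriving in a
    // required (non-nullable) column surfaces as a NullPointerException.
    static String writeString(CharSequence value) {
        return value.toString(); // NPE here when value is null
    }

    public static void main(String[] args) {
        System.out.println(writeString("ok")); // a non-null value writes fine
        try {
            writeString(null);
        } catch (NullPointerException e) {
            // On JDK 14+ the helpful-NPE message resembles the one in this issue
            System.out.println("caught: " + e);
        }
    }
}
```

Optional (nullable) columns take a different code path, which is why only records with a null in a required field trip this.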

tomasz-sadura commented 7 months ago

Is there a way to find out what field it failed for?

bryanck commented 7 months ago

Not currently; this fails inside the Iceberg Parquet writer, so it would be best to handle it there. It's on my list to submit a PR for that, unless someone gets to it first. I'll also look into adding a check to the conversion routine; we may be able to do it there without adding much overhead.
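Such a pre-write check could look roughly like the following self-contained sketch. All names are hypothetical, and a plain `Map` stands in for a Connect `Struct`; the point is that validating before the write lets the error name the offending field(s) instead of surfacing as a bare NPE:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RequiredFieldCheck {

    // Returns the names of required fields whose value is null or absent,
    // so a descriptive error can be raised before the Parquet write.
    static List<String> missingRequired(Map<String, Object> record, Set<String> requiredFields) {
        List<String> missing = new ArrayList<>();
        for (String field : requiredFields) {
            if (record.get(field) == null) {
                missing.add(field);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("id", "abc");
        record.put("name", null); // null in a required field

        Set<String> required = new LinkedHashSet<>(List.of("id", "name"));
        List<String> missing = missingRequired(record, required);
        if (!missing.isEmpty()) {
            // Far easier to debug than "Cannot invoke CharSequence.toString()"
            System.out.println("null value for required field(s): " + missing);
        }
    }
}
```

Running this prints `null value for required field(s): [name]`, which is the kind of diagnostic the thread is asking for.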

jason-da-redpanda commented 5 months ago

Was just checking in to see if there was a PR for this .. thanks

jason-da-redpanda commented 5 months ago

checking in again to see if there was a PR for this .. thanks

bryanck commented 5 months ago

No, there isn't a PR for this in Iceberg as far as I know. I'm planning to wait until after the sink is submitted to the Iceberg project before investigating further, since other writers (e.g. the Flink sink) may benefit as well.

wzzzrd86 commented 1 week ago

@bryanck Following up on this to see if there has been any movement or a PR?

bryanck commented 1 week ago

Not yet unfortunately, I'm still working on getting the sink fully submitted, but that's pretty close I think.