huynhphuong10284 opened 5 years ago
Can you be a bit more specific? What version of the HDFS connector are you seeing this in? What was your config when you saw the NPE? What kind of environment setup do you have when you see the NPE?
Here is the information from my lab:
1. Kafka Connect version: 2.3.0
2. HDFS connector version: 5.3.0
3. Config:

    {
      "name": "hdfs-sink",
      "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "1",
        "topics": "test_hdfs",
        "hdfs.url": "hdfs://xx.xx.xx.xx:8020/user/ph/connector/",
        "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
        "partitioner.class": "TimeBasedPartitioner",
        "path.format": "path.format=YYYYMMdd",
        "name": "hdfs-sink"
      },
      "tasks": []
    }
The NPE happens at HdfsSinkTask.java:143, which is this line in version 5.3.0: https://github.com/confluentinc/kafka-connect-hdfs/blob/5.3.x/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java#L143
That line is not actually in HdfsSinkTask.open(). Do you have local changes to HdfsSinkTask in a fork based on 5.3.0? HdfsSinkTask.open is here: https://github.com/confluentinc/kafka-connect-hdfs/blob/5.3.x/src/main/java/io/confluent/connect/hdfs/HdfsSinkTask.java#L155. An NPE there most likely means hdfsWriter is null.
I haven't changed anything; I only downloaded it from https://www.confluent.io/hub/confluentinc/kafka-connect-hdfs
Ah, I just realized you're trying to specify a path in hdfs.url. The property does not expect the URL to contain a path portion. The expected format is documented here.
I think that is what causes hdfsWriter to be null, which in turn triggers the NPE.
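To make the suspected failure chain concrete, here is a minimal, hypothetical Java model; it is not the connector's actual code. It assumes that (1) start() runs before open(), as in the Kafka Connect sink task lifecycle, (2) start() logs a failure instead of rethrowing it, and (3) a path in hdfs.url is what makes writer construction fail. The names NullWriterSketch, Writer and SketchTask are illustrative only.

```java
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a sink task lifecycle (not the real HdfsSinkTask).
// start() runs before open(); if start() swallows a failure, the writer field stays
// null and the first call to open() fails with a NullPointerException.
public class NullWriterSketch {

    // Stand-in for the real writer; name and behavior are assumptions for this sketch.
    static class Writer {
        Writer(String hdfsUrl) {
            // Assumption: a URL with a path portion (anything beyond "/") is rejected.
            String path = URI.create(hdfsUrl).getPath();
            if (path != null && path.length() > 1) {
                throw new IllegalArgumentException("hdfs.url must not contain a path: " + hdfsUrl);
            }
        }

        void open(Collection<String> partitions) {
            // The real connector would set up per-partition state here.
        }
    }

    static class SketchTask {
        Writer writer; // stays null if start() fails

        void start(Map<String, String> props) {
            try {
                writer = new Writer(props.get("hdfs.url"));
            } catch (IllegalArgumentException e) {
                // Logged and swallowed: the task keeps running with writer == null.
                System.err.println("Couldn't start task: " + e.getMessage());
            }
        }

        void open(Collection<String> partitions) {
            writer.open(partitions); // NullPointerException here when writer is null
        }
    }

    public static void main(String[] args) {
        SketchTask task = new SketchTask();
        task.start(Map.of("hdfs.url", "hdfs://namenode:8020/user/ph/connector/"));
        try {
            task.open(List.of("test_hdfs-0"));
        } catch (NullPointerException e) {
            System.out.println("NPE in open(): writer was never created in start()");
        }
    }
}
```

In this model the only evidence of the root cause is the message logged by start(), which is why the earlier log lines matter more than the NPE itself.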
I think that if you put the URL portion in hdfs.url and the path portion in topics.dir, things should work.
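The split described above can be sketched with java.net.URI: scheme and authority go into hdfs.url, the path goes into topics.dir. This is a hypothetical helper for illustration, not something the connector provides; the host name namenode is a placeholder.

```java
import java.net.URI;

// Hypothetical helper (not part of kafka-connect-hdfs): splits a URL such as
// "hdfs://host:8020/user/ph/connector/" into hdfs.url (scheme + authority)
// and topics.dir (path).
public class HdfsUrlSplitter {

    static final class Split {
        final String hdfsUrl;
        final String topicsDir; // null means "no path given, leave topics.dir unset"

        Split(String hdfsUrl, String topicsDir) {
            this.hdfsUrl = hdfsUrl;
            this.topicsDir = topicsDir;
        }
    }

    static Split split(String url) {
        URI uri = URI.create(url);
        if (uri.getScheme() == null || uri.getAuthority() == null) {
            throw new IllegalArgumentException("Expected scheme://host:port[/path], got: " + url);
        }
        String hdfsUrl = uri.getScheme() + "://" + uri.getAuthority();
        String path = uri.getPath();
        // An empty path or a bare "/" carries no directory information.
        String topicsDir = (path == null || path.isEmpty() || path.equals("/")) ? null : path;
        return new Split(hdfsUrl, topicsDir);
    }

    public static void main(String[] args) {
        Split s = split("hdfs://namenode:8020/user/ph/connector/");
        System.out.println("\"hdfs.url\": \"" + s.hdfsUrl + "\"");
        System.out.println("\"topics.dir\": \"" + s.topicsDir + "\"");
    }
}
```

Running main prints "hdfs.url": "hdfs://namenode:8020" and "topics.dir": "/user/ph/connector/", the same shape as the revised config that follows.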
The following is my configuration, and I still see the same issue. Please advise.
    {
      "name": "test7",
      "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "1",
        "topics": "userprofile18",
        "hdfs.url": "hdfs://xx.xx.xx.xx:8020",
        "topics.dir": "/user/phuonaa/connector/",
        "flush.size": "1",
        "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat"
      }
    }
Do the logs contain any errors or warnings leading up to the NPE?
I got the following error:
    java.lang.NullPointerException
        at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:143)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:651)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:285)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:443)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:316)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)