confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

NullPointerException in io.confluent.connect.hdfs3.Hdfs3SinkTask.open #583

Open Kimakjun opened 3 years ago

Kimakjun commented 3 years ago

I'm using the hdfs3 connector to consume data from Kafka and write it to HDFS 3.

My Confluent Kafka and sink connector versions are below:

kafka-confluent:5.4.2-1
confluentinc/kafka-connect-hdfs3:latest

My connector settings are as follows:

name=Hdfs3SinkConnector
connector.class=io.confluent.connect.hdfs3.Hdfs3SinkConnector
confluent.topic.bootstrap.servers="~~"
consumer.auto.offset.reset=earliest
topics="~~"

hdfs.url=hdfs://~~
store.url=hdfs://~~

plugin.path="~~"

schema.compatibility=BACKWARD

format.class=io.confluent.connect.hdfs3.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.schemas.enabled=true
value.converter.schema.registry.url="~~"
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy

timestamp.extractor=RecordField
timestamp.field="~~"
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format=YYYYMMdd/HH/
partition.duration.ms=1000

tasks.max=3
flush.size=100000
rotate.interval.ms=900000

hadoop.conf.dir="~~"
hdfs.authentication.kerberos=true
kerberos.ticket.renew.period.ms=360000
connect.hdfs.keytab="~~"
connect.hdfs.principal="~~"

errors.log.enable=true
errors.log.include.messages=true

timezone=Asia/Seoul
locale=ko_KR
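
One thing worth noting, in case the double quotes above are literally in the .properties file rather than just redaction for this post: Java properties parsing does not strip quotes, so a quoted plugin.path, hdfs.url, or keytab path is read with the quotes as part of the value and points at a location that does not exist. Unquoted, the same lines would look like this, with ~~ still standing in for the redacted values:

plugin.path=~~
hdfs.url=hdfs://~~
store.url=hdfs://~~
hadoop.conf.dir=~~
connect.hdfs.keytab=~~
connect.hdfs.principal=~~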

With this setup I encounter errors like this:

java.lang.NullPointerException
    at io.confluent.connect.hdfs3.Hdfs3SinkTask.open(Hdfs3SinkTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:652)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Could this be a problem that appears when plugin.path is not set properly?
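
One way to check, assuming the default Connect REST port 8083: the worker's REST API lists every connector class it found on the plugin path.

curl http://localhost:8083/connector-plugins

If io.confluent.connect.hdfs3.Hdfs3SinkConnector does not appear in that output, the worker never loaded the plugin.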

OneCricketeer commented 3 years ago

Hi @Kimakjun

This repo is for the HDFS2 connector; your question seems to be about the HDFS3 one.