qubole / streamx

kafka-connect-s3: Ingest data from Kafka to object stores (S3)

JSON records to Parquet on S3 won't work #45

Open inneractive-opensrc opened 7 years ago

inneractive-opensrc commented 7 years ago

Hi

I would like to write Parquet directly to S3; my events are plain JSON strings. Do you know if what I'm trying to do can work?

In fact, I've already tried with this config:

{ "name": "ParquetS3", "config": { "name": "ParquetS3", "connector.class": "com.qubole.streamx.s3.S3SinkConnector", "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", "partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner", "locale": "en", "timezone": "UTC", "tasks.max": 11, "topics": "XXX", "flush.size": 50000, "s3.url": "s3n://XXXXX", "hadoop.conf.dir": "/etc/kafka-connect-s3/hadoop-conf" } }

It doesn't actually work.

The only error I can see in the log is:

Task ParquetS3-56 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Is there any other log output for the StreamX connector?

Thanks

lewisdawson commented 6 years ago

@inneractive-opensrc This doesn't work because the io.confluent.connect.hdfs.parquet.ParquetFormat class expects the source data being read into the formatter to be in Avro format, not JSON. If you check out my fork, I've added the ability to convert CSV-sourced data to Parquet before uploading to S3, which is very similar to what you want. You'd just need to write code similar to the CSV-to-Parquet converter that converts JSON source data to Parquet instead.
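For reference, a minimal sketch of the kind of conversion such a converter would need to do, assuming the JSON payloads conform to a fixed Avro schema (the schema, field names, and helper class here are hypothetical and not part of streamx): decode each JSON string into an Avro GenericRecord, then hand it to parquet-avro's AvroParquetWriter. A real connector Format would wrap this logic behind a RecordWriterProvider, but the core JSON-to-Parquet step looks roughly like this.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Hypothetical helper (not part of streamx): decodes JSON strings into Avro
 * GenericRecords and writes them to a Parquet file via parquet-avro.
 */
public class JsonToParquetSketch {

    // Example schema; in practice this must describe your actual events.
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"value\",\"type\":\"long\"}]}");

    public static void writeJsonRecords(String[] jsonEvents, String outputPath) throws IOException {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(SCHEMA);

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path(outputPath))
                .withSchema(SCHEMA)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {

            for (String json : jsonEvents) {
                // Avro's JSON decoder expects Avro-style JSON (e.g. wrapped unions),
                // so loosely structured JSON may need a custom mapping step instead.
                JsonDecoder decoder = DecoderFactory.get().jsonDecoder(SCHEMA, json);
                GenericRecord record = reader.read(null, decoder);
                writer.write(record);
            }
        }
    }
}

Note that this only works when every event matches the declared schema; Parquet is columnar and schema-bound, so schemaless JSON has to be mapped onto an explicit schema (Avro here) before it can be written.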