fingltd / 4mc

4mc - splittable lz4 and zstd in hadoop/spark/flink

4mz - Zstd - cannot load csv.4mz into Spark (4mc works correctly) #41

Closed. kazmirzkazmirzowski closed this issue 5 years ago.

kazmirzkazmirzowski commented 5 years ago

Hello,

First of all, thank you for your hard work delivering this solution; it is just fantastic. Second, sorry for posting this in issues, but I did not find any other place to do so. The issue I have is with loading a properly compressed 4mz file into a Spark DataFrame (by properly compressed I mean one that gets correctly decompressed by your 4mc.exe application, which gives me confidence that I have created a valid file).

The setup is Spark 2.4.3 for Hadoop 2.7, the latest 4mc version (2.0.0), and LD_LIBRARY_PATH set to the folder containing the Hadoop native libraries.
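As a quick sanity check (just a minimal sketch using the standard Hadoop `NativeCodeLoader` API, not part of my original pipeline), I can confirm whether the JVM actually sees the native libraries pointed to by LD_LIBRARY_PATH:

```scala
// Minimal sketch: verify the Hadoop native libraries are visible to the JVM.
import org.apache.hadoop.util.NativeCodeLoader

println(s"hadoop native code loaded: ${NativeCodeLoader.isNativeCodeLoaded}")
println(s"java.library.path: ${System.getProperty("java.library.path")}")
```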

The code (Scala):

```scala
import com.hadoop.mapreduce.FourMcTextInputFormat
import com.hadoop.mapreduce.FourMzTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import spark.implicits._

var conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\r\n")
conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec")

var rdd = sc.newAPIHadoopFile(
  "file:/home/me/part-m-20190605121832.4mz",
  classOf[FourMzTextInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[LongWritable, Text]]),
  classOf[LongWritable],
  classOf[Text],
  conf)
val dset = rdd.map(_._2.toString).toDS
val frame = spark.read.option("header", false).option("inferSchema", true).option("delimiter", "\u0001").csv(dset)
frame.printSchema()
frame.show()
```

Every time the rdd is empty, so the frame is empty as well. No matter what I do it always ends up empty, without any exception raised and without any debug or warning messages (when I set the log level to DEBUG I get lots of messages from Spark but none from 4mc indicating anything wrong). Would you be so kind as to suggest what I am doing wrong, or which external references (native libraries or something else) I have set up incorrectly?
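For reference, this is roughly how I checked that the rdd itself is empty (a minimal sketch, assuming it is run right after building `rdd` in the snippet above):

```scala
// Minimal diagnostic sketch: count records before any Dataset/DataFrame
// conversion, to see whether the input format reads anything at all.
println(s"records read: ${rdd.count()}")   // 0 here means the input format produced nothing
rdd.take(5).foreach { case (_, line) => println(line.toString) }
```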

The equivalent code for 4mc works flawlessly and is almost the same (a different input file, of course, and a different TextInputFormat):

```scala
import com.hadoop.mapreduce.FourMcTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import spark.implicits._

var conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\r\n")
conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.fourmc.Lz4Codec,com.hadoop.compression.fourmc.Lz4MediumCodec,com.hadoop.compression.fourmc.Lz4HighCodec,com.hadoop.compression.fourmc.Lz4UltraCodec,com.hadoop.compression.fourmc.FourMcCodec,com.hadoop.compression.fourmc.FourMcMediumCodec,com.hadoop.compression.fourmc.FourMcHighCodec,com.hadoop.compression.fourmc.FourMcUltraCodec,com.hadoop.compression.fourmc.FourMzCodec,com.hadoop.compression.fourmc.FourMzMediumCodec,com.hadoop.compression.fourmc.FourMzHighCodec,com.hadoop.compression.fourmc.FourMzUltraCodec")

var rdd = sc.newAPIHadoopFile(
  "file:/home/me/part-m-20190605121825.4mc",
  classOf[FourMcTextInputFormat].asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[LongWritable, Text]]),
  classOf[LongWritable],
  classOf[Text],
  conf)
val dset = rdd.map(_._2.toString).toDS
val frame = spark.read.option("header", false).option("inferSchema", true).option("delimiter", "\u0001").csv(dset)
frame.printSchema()
frame.show()
```

Thank you very much in advance. Best regards.

kazmirzkazmirzowski commented 5 years ago

It works with the newest sources taken from GitHub and built locally. Would you mind releasing the updated version as a JAR / including it in mvnrepository?

Closing the issue then.

carlomedas commented 5 years ago

I will rebuild with the latest fixes from other contributors. At this time I cannot rebuild all the native libraries, as I don't have all the needed platforms available at the moment, but that is not going to be an issue because they are binary compatible.