mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

Unable to pass schema implicitly #245

Closed: nialloriordan closed this issue 2 years ago

nialloriordan commented 2 years ago

This is a very exciting package and I am currently trying to use parquet4s instead of org.apache.parquet.avro. However, I am having difficulty reading and writing via Generic Records even after following the documentation here.

My current implementation with parquet4s is as follows:

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, Path, RowParquetRecord}
import org.apache.parquet.schema.MessageType

val record = ParquetReader.as[RowParquetRecord].read(Path("file.parquet"))

ParquetWriter
  .of[RowParquetRecord] // schema is passed implicitly
  .writeAndClose(Path("file.parquet"), record)

This results in the following error:

Please check if there is implicit TypedSchemaDef available for each field and subfield of com.github.mjakubowski84.parquet4s.RowParquetRecord.

I also tried to obtain the schema directly from the record but this does not seem to be possible in parquet4s:

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, Path, RowParquetRecord}
import org.apache.parquet.schema.MessageType

val record = ParquetReader.generic.read(Path("file.parquet"))

val schema = record.getSchema() // throws an error

ParquetWriter
  .generic(schema) // schema is passed explicitly
  .writeAndClose(Path("file.parquet"), record)

My current approach with org.apache.parquet.avro looks as follows:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
import org.apache.avro.generic.GenericRecord
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter}

val inputPath = new Path("file.parquet")
val outputPath = new Path("file_output.parquet")

val inputFile = HadoopInputFile.fromPath(
  inputPath,
  new Configuration
)

val outputFile = HadoopOutputFile.fromPath(
  outputPath,
  new Configuration
)

val reader = AvroParquetReader
  .builder[GenericRecord](inputFile)
  .build()

val record = reader.read()

val schema = record.getSchema()

val writer = AvroParquetWriter
    .builder[GenericRecord](outputFile)
    .withSchema(schema)
    .withCompressionCodec(CompressionCodecName.SNAPPY)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

try {
  writer.write(record)
} finally {
  writer.close()
  reader.close()
}

Do you have any recommendations on how to replicate the above in parquet4s? It simply reads a parquet file and writes it out again without any changes to the file.

mjakubowski84 commented 2 years ago

Hi @nialloriordan

Sorry for the late response - I was away on vacation :)

You fail to pass the implicit schema because you did not define it. Please check the docs for how to do it.

val schema = record.getSchema() throws an error because there is no such function. In parquet4s you define the schema explicitly, like this: https://github.com/mjakubowski84/parquet4s/blob/master/examples/src/main/scala/com/github/mjakubowski84/parquet4s/core/WriteAndReadGenericApp.scala#L34.
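
For reference, a rough sketch of what such a manually defined schema can look like, using the plain org.apache.parquet.schema builder API (the field names and types below are invented for illustration, not taken from this issue):

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, INT32}
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, Types}

// Hypothetical schema with two fields; adapt the names and types to the actual file.
val schema: MessageType = Types.buildMessage()
  .addField(Types.optional(INT32).named("id"))
  .addField(Types.optional(BINARY).as(LogicalTypeAnnotation.stringType()).named("name"))
  .named("record")

Such a schema can then be passed explicitly to ParquetWriter.generic(schema), as in the snippet from the original question.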

I hope that helps.
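
To address the round-trip from the original question (read a parquet file and write it back unchanged) with generic records, one possible approach, sketched here under the assumption that the schema of the input file is not known up front, is to read the schema from the file footer with the standard parquet-hadoop API and hand it to the generic writer. Paths and names are illustrative:

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.MessageType

// Read the schema from the footer of the input file (plain parquet-hadoop API).
val footerReader = ParquetFileReader.open(
  HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path("file.parquet"), new Configuration())
)
val schema: MessageType =
  try footerReader.getFooter.getFileMetaData.getSchema
  finally footerReader.close()

// Read the records generically and write them back with the explicit schema.
val records = ParquetReader.generic.read(Path("file.parquet"))
try ParquetWriter.generic(schema).writeAndClose(Path("file_output.parquet"), records)
finally records.close()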