dahiyahimanshu opened this issue 3 weeks ago
Hi @dahiyahimanshu You have two options:

1. `ParquetStreams.toParquetSingleFile` with the `custom` option. Then you leverage original Parquet's capabilities to write Avro files (https://github.com/apache/parquet-java/tree/master/parquet-avro). Here's an example using Protobuf: https://github.com/mjakubowski84/parquet4s/blob/master/examples/src/main/scala/com/github/mjakubowski84/parquet4s/akkaPekko/CustomParquetWriterAkkaPekkoApp.scala. Use the available Avro writer (`AvroParquetWriter`) instead; a minimal sketch follows this list. I don't know much about Parquet's Avro writer, so you need to go through the code.
2. `viaParquet` for a stream which writes Parquet files indefinitely. Then, unfortunately, you have to provide your own instances of `ValueEncoder` and `TypedSchemaDef` for `IndexedRecord`. Please read the documentation and examples.
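For option 1, a minimal sketch, assuming Pekko and parquet-avro are on the classpath. The `AvroParquetWriter` part is plain parquet-avro API; the `custom[T, B]` call and the parameterless `write` mirror the linked Protobuf example, so treat them as assumptions and verify against your parquet4s version. The schema, record, and output path are made up for illustration:

```scala
import com.github.mjakubowski84.parquet4s.ParquetStreams
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path => HadoopPath}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.util.HadoopOutputFile
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source

object AvroSingleFileApp extends App {
  implicit val system: ActorSystem = ActorSystem()

  // Illustrative Avro schema with a single string field.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}"""
  )

  // Plain parquet-avro writer builder; parquet4s only drives it.
  val writerBuilder = AvroParquetWriter
    .builder[GenericRecord](
      HadoopOutputFile.fromPath(new HadoopPath("/tmp/users.parquet"), new Configuration())
    )
    .withSchema(schema)

  val record = new GenericData.Record(schema)
  record.put("name", "Alice")

  // Hand the low-level builder over to parquet4s, as in the Protobuf example.
  Source
    .single(record: GenericRecord)
    .runWith(
      ParquetStreams.toParquetSingleFile
        .custom[GenericRecord, AvroParquetWriter.Builder[GenericRecord]](writerBuilder)
        .write
    )
    .andThen { case _ => system.terminate() }(system.dispatcher)
}
```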
@mjakubowski84: I tried a custom implementation on my side, taking inspiration from `ParquetPartitioningFlow` (in the current repo). I wonder if we can enrich your infinite-streams implementation so that it yields Avro-to-Parquet objects.
AvroToParquetPartitioningFlow.scala.txt
AvroToParquetSink.scala.txt
Hi! That can be a good idea! However, I think it will be even better to allow users to configure the existing flow to optionally use a provided low-level writer, something resembling the `writerFactory` you have in your code. Please check the custom builder options in `fromParquet` and `toParquetSingleFile`; `viaParquet` doesn't have such a feature yet.
```scala
import com.github.mjakubowski84.parquet4s.ScalaCompat.stream.scaladsl.Sink
import com.github.mjakubowski84.parquet4s._
import org.apache.avro.generic.IndexedRecord
import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.{Flow, Keep}

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

object AvroToParquetSink {

  // Overwrite existing files and compress with Snappy.
  private val writeOptions = ParquetWriter.Options(
    writeMode            = Mode.OVERWRITE,
    compressionCodecName = CompressionCodecName.SNAPPY
  )

  // Callers must supply ParquetSchemaResolver and ParquetRecordEncoder
  // instances for IndexedRecord; parquet4s cannot derive them for this type.
  def parquetSink(path: String)(implicit
      schemaResolver: ParquetSchemaResolver[IndexedRecord],
      encoder: ParquetRecordEncoder[IndexedRecord]
  ): Sink[IndexedRecord, Future[Done]] =
    Flow[IndexedRecord]
      .via(
        ParquetStreams.viaParquet
          .of[IndexedRecord]
          .maxCount(writeOptions.rowGroupSize) // rotate the file after this many records
          .maxDuration(10.seconds)             // or after 10 seconds, whichever comes first
          .options(writeOptions)
          .write(Path(s"file://$path"))
      )
      .toMat(Sink.ignore)(Keep.right)
}
```
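For reference, a minimal usage sketch of the sink above. The `???` placeholders stand for the hand-written `ParquetSchemaResolver`/`ParquetRecordEncoder` instances the maintainer mentions and for a record source; the object name and output path are made up for illustration:

```scala
import com.github.mjakubowski84.parquet4s.{ParquetRecordEncoder, ParquetSchemaResolver}
import org.apache.avro.generic.IndexedRecord
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source

import scala.concurrent.Future

object AvroToParquetSinkUsage extends App {
  implicit val system: ActorSystem = ActorSystem()

  // Placeholders only: real instances must map IndexedRecord's Avro schema
  // and field values onto parquet4s' schema and record types.
  implicit val schemaResolver: ParquetSchemaResolver[IndexedRecord] = ???
  implicit val encoder: ParquetRecordEncoder[IndexedRecord] = ???

  def records: Iterator[IndexedRecord] = ??? // hypothetical source of Avro records

  val done: Future[Done] =
    Source
      .fromIterator(() => records)
      .runWith(AvroToParquetSink.parquetSink("/tmp/avro-out"))
}
```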
Is it feasible to write Avro IndexedRecords to a Parquet file using ParquetStreams?