databricks / spark-avro

Avro Data Source for Apache Spark
http://databricks.com/
Apache License 2.0

using the SchemaConverters.convertStructToAvro #277

Open divo77 opened 6 years ago

divo77 commented 6 years ago

In my project I am trying to convert a DataFrame's StructType schema to an Avro schema with SchemaConverters.convertStructToAvro. Does anybody have an example of how to use this converter?

I am using Scala with com.databricks:spark-avro_2.11:4.0.0.
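In sbt form, that dependency would be declared roughly as follows (a sketch; the coordinates are the ones quoted above):

```scala
// build.sbt — spark-avro for Scala 2.11, version 4.0.0
libraryDependencies += "com.databricks" % "spark-avro_2.11" % "4.0.0"
```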
florin1288 commented 6 years ago

```scala
import java.io.ByteArrayOutputStream

import scala.util.Try

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.SchemaBuilder.RecordBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.spark.sql._
import org.apache.spark.sql.types.StructType

case class Model(params: Option[List[String]])

val recordName: String = "Model"
val recordNamespace: String = "Test"
val sparkSession: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
val builder: RecordBuilder[Schema] = SchemaBuilder.record(recordName).namespace(recordNamespace)

def serialize(): Unit = {
  import sparkSession.implicits._

  val inputDF: DataFrame = sparkSession.sparkContext
    .parallelize(Seq(Model(Some(List("param1")))))
    .toDF()
  // Derive the Spark StructType from the case class via its product encoder.
  val structType: StructType = Encoders.product[Model].schema

  // Convert the Spark schema to an Avro schema rooted at the record builder.
  val schema: Schema = SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
  // getGenericRecord (not shown here) maps the Row to an Avro GenericRecord.
  val genericRecord: GenericRecord = getGenericRecord(inputDF.head())

  avroSerialize(schema, genericRecord)
}

def avroSerialize(schema: Schema, genericRecord: GenericRecord): Try[Array[Byte]] = {
  Try {
    val writer = new SpecificDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(genericRecord, encoder)
    encoder.flush()
    out.close()
    out.toByteArray
  }
}
```
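The getGenericRecord helper is referenced above but never shown. A minimal sketch of one way to write it for this flat schema, with the Avro schema passed in explicitly (the one-argument call above presumably closes over it, so the call site would become getGenericRecord(inputDF.head(), schema)); the positional Row-to-record mapping and the Seq-to-java.util.List conversion are assumptions, not part of the original comment:

```scala
import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.spark.sql.Row

// Hypothetical helper: copy a flat Row into a GenericRecord field by field.
// Assumes the Row's columns line up positionally with the Avro record's
// fields, and that the only non-primitive values are sequences (Spark
// returns the Option[List[String]] column as a Seq, or null for None).
def getGenericRecord(row: Row, schema: Schema): GenericRecord = {
  val record = new GenericData.Record(schema)
  schema.getFields.asScala.zipWithIndex.foreach { case (field, i) =>
    val value = row.get(i) match {
      case seq: Seq[_] => seq.asJava // Avro array fields expect a java.util.List
      case other       => other      // primitives and null pass through as-is
    }
    record.put(field.name(), value)
  }
  record
}
```

With that in place, a quick round-trip check (reusing the schema and genericRecord values from serialize() above) confirms the bytes decode against the same schema:

```scala
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.DecoderFactory

// Decode the serialized bytes back into a GenericRecord using the same schema.
val bytes: Array[Byte] = avroSerialize(schema, genericRecord).get
val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
val decoded: GenericRecord = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
```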