databricks / spark-avro

Avro Data Source for Apache Spark
http://databricks.com/
Apache License 2.0
539 stars · 310 forks

Add support to convert to Avro IndexedRecord #275

Open mquraishi opened 6 years ago

mquraishi commented 6 years ago

I can extract an Avro schema from a DataFrame's StructType using SchemaConverters.convertStructToAvro. It would be nice to take a DataFrame and get an Avro IndexedRecord or GenericRecord using that generated schema. A feature to create containered Avro objects would be very useful; today this code has to be written by hand, and adding it to this package would make it very easy. Thanks. A method that implements something like this:

    import org.apache.avro.SchemaBuilder
    import com.databricks.spark.avro.SchemaConverters
    import tech.allegro.schema.json2avro.converter.JsonAvroConverter

    val df = spark.read.parquet(from_a_location)
    val ds = df.toJSON
    // These next two lines work around Avro's lack of support for Date and Timestamp
    val jsonDF = spark.read.json(ds) // schema inferred after scanning all records
    val jsonSchema = jsonDF.schema   // StructType of the JSON representation

    val rdd = ds.rdd
    rdd.foreach(rec => {
      val json = rec.getBytes
      // Build the Avro schema from the StructType (all fields come through as StringType)
      val avroSchema = SchemaConverters.convertStructToAvro(jsonSchema, SchemaBuilder.record("client").namespace("com.cigna.bigdata"), "com.cigna.bigdata")

      val converter = new JsonAvroConverter()
      // Get the Avro GenericRecord. This blows up if any one record has a
      // missing field; that will need to be fixed too.
      val record = converter.convertToGenericDataRecord(json, avroSchema)
    })