AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Getting zero-byte files using Abris #307

Open kpr9991 opened 2 years ago

kpr9991 commented 2 years ago

I am using Abris with Confluent Schema Registry to deserialize Avro records received from a Kafka source. When I fetch the schema manually from the Schema Registry and pass it to Spark's default from_avro function after stripping the leading Confluent header bytes (magic byte and schema id), I can read the records. I want to do the same through Abris, since the library handles exactly that internally. But when I use Abris, only zero-byte files are written. Is this an issue with Abris?

Working code without Abris:

package pruthvi.kafka.poc

import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.streaming.Trigger

import java.util

object Important1 {

  def main(args: Array[String]): Unit = {
    try {
      val spark: SparkSession = SparkSession.builder
        .master("local[3]")
        .appName("Kafka testing")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .config("spark.sql.shuffle.partitions", 3)
        .getOrCreate()
      spark.conf.set("spark.sql.avro.compression.codec", "uncompressed")

      val topicName = "foo"

      // Read the raw Confluent-Avro records from Kafka
      val df = spark.readStream
        .format("kafka")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.location", "certs/truststore.jks")
        .option("kafka.ssl.keystore.location", "certs/keystore.jks")
        .option("kafka.ssl.key.password", "foo")
        .option("kafka.ssl.keystore.password", "foo")
        .option("kafka.ssl.truststore.password", "foo")
        .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
        .option("subscribe", topicName)
        .option("kafka.group.id", "foo")
        .option("startingOffsets", "earliest")
        .load()

      // Fetch the latest writer schema for the topic from the Schema Registry
      val schemaRegistryURL = "url"
      val restService = new RestService(schemaRegistryURL)
      val valueRestResponseSchema = restService.getLatestVersion(topicName)
      val jsonSchema = valueRestResponseSchema.getSchema

      // No extra Avro options are needed, so pass an empty options map to from_avro
      val fromAvroConfig = new util.HashMap[String, String]()

      import spark.implicits._
      // Skip the 5-byte Confluent header (magic byte + schema id) and decode the payload
      val dsAvroRecord = df
        .selectExpr("substring(value, 6) as avro_value")
        .select(from_avro($"avro_value", jsonSchema, fromAvroConfig).as("RecordValue"))

      dsAvroRecord.writeStream
        .outputMode("append")
        .format("json")
        .option("path", "output")
        .trigger(Trigger.ProcessingTime(1))
        .option("checkpointLocation", "chk_point_dir")
        .start()
        .awaitTermination()
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }
}

With Abris:

package pruthvi.kafka.poc
package examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.config.AbrisConfig

object readStreamingData {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder
      .master("local[4]")
      .appName("Kafka testing")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .getOrCreate()

    val topicName = "foo"

    // Read the raw Confluent-Avro records from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", "certs/truststore.jks")
      .option("kafka.ssl.keystore.location", "certs/keystore.jks")
      .option("kafka.ssl.key.password", "")
      .option("kafka.ssl.keystore.password", "")
      .option("kafka.ssl.truststore.password", "")
      .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
      .option("kafka.group.id", "foo")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "50")
      .load()

    // Let Abris download the latest reader schema for the topic (TopicNameStrategy)
    val abrisConfig =
      AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
        .andTopicNameStrategy(topicName)
        .usingSchemaRegistry("url")

    import za.co.absa.abris.avro.functions.from_avro
    val deserialized = df.select(from_avro(col("value"), abrisConfig) as 'data)

    deserialized.writeStream
      .option("path", "output")
      .option("checkpointLocation", "chk_point_dir")
      .outputMode("append")
      .format("parquet")
      .trigger(Trigger.ProcessingTime(1000))
      .start()
      .awaitTermination()
  }
}
kevinwallimann commented 2 years ago

Hi @kpr9991 Thanks for your examples. I ran your code with Abris and consumed the records as expected. I don't see any errors in your provided code. What versions of Spark and Abris are you running? I see that you used the same output and checkpoint directory for both examples. Is it possible that you forgot to clear the checkpoints before running the example with Abris?

kpr9991 commented 2 years ago

I ran it for about 2.5 hours and it kept filling the output directory with zero-byte files. The culprit was the trigger. I expected the trigger to process the unbounded table every x milliseconds and write the output, but instead it created a zero-byte file every x milliseconds, and only after a few hours did it start writing real files. The trigger is not behaving as expected. After I removed the trigger, the query ran perfectly. However, it still fails for batch data. Can you try once with spark.read instead of spark.readStream and spark.write instead of spark.writeStream? I am getting zero-byte files in that case too, and since there are no triggers there, there is nothing to remove from that code to test.
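For reference, a minimal sketch of what that batch test could look like, reusing the topic name, bootstrap servers, and Schema Registry URL from the streaming example above (SSL options omitted for brevity; the output path output_batch is only illustrative):

package pruthvi.kafka.poc
package examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

object readBatchData {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .master("local[4]")
      .appName("Kafka batch testing")
      .getOrCreate()

    val topicName = "foo"

    // spark.read returns a bounded DataFrame containing whatever is currently in the topic
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    // Same Abris configuration as in the streaming example
    val abrisConfig =
      AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
        .andTopicNameStrategy(topicName)
        .usingSchemaRegistry("url")

    val deserialized = df.select(from_avro(col("value"), abrisConfig).as("data"))

    // Plain batch write: no trigger and no checkpoint location needed
    deserialized.write
      .mode("append")
      .parquet("output_batch")
  }
}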

kevinwallimann commented 2 years ago

Hi @kpr9991 I was able to run the example using .read and .write and ingest the records as expected, so I don't see this as a problem in Abris. The default trigger is ProcessingTime(0), so it seems strange that changing from ProcessingTime(1000) to ProcessingTime(0) solved your problem, although it certainly sped up your query. One common reason for empty Parquet files is that some partitions happen to be empty. Maybe you can try running with a single executor and see if the problem persists.
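To test whether empty partitions are producing the empty files, one illustrative option (not part of Abris, and assuming the deserialized DataFrame and paths from the streaming example above) is to collapse the output to a single partition before writing:

// Sketch only: coalesce to one partition so empty partitions cannot each
// produce an empty output file. This limits write parallelism, so it is
// only suitable for small test runs, not production throughput.
val singlePartition = deserialized.coalesce(1)

singlePartition.writeStream
  .option("path", "output")
  .option("checkpointLocation", "chk_point_dir")
  .outputMode("append")
  .format("parquet")
  .start()
  .awaitTermination()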