AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

I can read the data with from_confluent_avro, but unable to write to_confluent_avro #133

Closed luancaarvalho closed 4 years ago

luancaarvalho commented 4 years ago

Hello, I'm reading data with from_confluent_avro and I can even print my schema, but when I try to write to another topic with the same schema as my reading topic, this error appears:

ERROR SchemaManager: Problems found while retrieving metadata for subject 'customers-key.key'

So I think I'm making a mistake in the configuration, but since I'm following the same pattern as the read configuration and changing only the topic name, I really don't know what I'm doing wrong. The code below shows the read and write configurations separately. To explain the code: I read from dbserver1.inventory.customers, whose schema is registered in the Schema Registry, and I want to write these Avro records to the customers topic:


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.functions.to_confluent_avro

object Testkeyy {

  val spark = SparkSession.builder()
    .appName("Our first streams")
    .master("local[2]")
    .getOrCreate()
  val schemaRegistryURL = "http://localhost:8081"
  val topic = "dbserver1.inventory.customers"

  val commonRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topic,
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> schemaRegistryURL
  )

  val keyRegistryConfig = commonRegistryConfig ++ Map(
    SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
    SchemaManager.PARAM_KEY_SCHEMA_ID -> "latest",
    SchemaManager.PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "key"
  )

  val valueRegistryConfig = commonRegistryConfig ++ Map(
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name",
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest"
  )

  def readFromAvro() = {
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "dbserver1.inventory.customers")
      .option("startingOffsets", "earliest")
      .load()

    val data = df.select(
      from_confluent_avro(col("key"), keyRegistryConfig).as("key"),
      from_confluent_avro(col("value"), valueRegistryConfig).as("value"))
    data.printSchema()

    val config = Map(
      SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "teste",
      SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
      SchemaManager.PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "key",
      SchemaManager.PARAM_KEY_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> "customers"
    )

    val configkey = config ++ Map(
      SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.record.name")

    val configvalue = config ++ Map(
      SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name"
    )

    data.select(
      to_confluent_avro(col("key"), configkey).as("key"),
      to_confluent_avro(col("value"), configvalue).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "customers")
      .outputMode("append")
      .option("checkpointLocation", "checkpoints")
      .start()
      .awaitTermination()
  }
}
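For context on the subject in the error above: Confluent Schema Registry subject names depend on the configured naming strategy. A small sketch of how the three strategies derive subjects (the helper object here is mine, for illustration only; it is not part of the ABRiS API):

```scala
// Hypothetical helper illustrating Confluent subject naming strategies.
object SubjectNames {
  // "topic.name": subject is "<topic>-key" or "<topic>-value"
  def topicName(topic: String, isKey: Boolean): String =
    s"$topic-${if (isKey) "key" else "value"}"

  // "record.name": subject is the fully qualified record name
  def recordName(namespace: String, name: String): String =
    s"$namespace.$name"

  // "topic.record.name": subject is "<topic>-<namespace>.<name>"
  def topicRecordName(topic: String, namespace: String, name: String): String =
    s"$topic-${recordName(namespace, name)}"
}
```

With the Key schema from this thread (record name "Key", namespace "customers") and topic "customers", topic.record.name would yield the subject "customers-customers.Key"; a lookup failure for a subject like 'customers-key.key' suggests the configured record name and namespace don't match what is registered.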

My customers-key and customers-value Avro schemas (exactly the same schemas as for dbserver1.inventory.customers, only the topic name changed):

{
  "type": "record",
  "name": "Key",
  "namespace": "customers",
  "fields": [
    {
      "name": "id",
      "type": "int"
    }
  ],
  "connect.name": "customers.Key"
}
{
  "type": "record",
  "name": "Value",
  "namespace": "customers",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "first_name",
      "type": "string"
    },
    {
      "name": "last_name",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    }
  ],
  "connect.name": "customers.Value"
}

I would like it very much if you could help me, I have really tried in many ways and have made no progress. Thank you so much !

cerveada commented 4 years ago

Hi, could you provide the whole exception?

When you are writing is the schema already in the registry? Under the "teste" topic?
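One way to check this before writing is to query the registry directly. A sketch, assuming the Confluent 5.x Java client API that ABRiS used at the time (the subject name below assumes the default topic.name strategy):

```scala
import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Connect to the registry from the snippet above (URL, cache capacity).
val client = new CachedSchemaRegistryClient("http://localhost:8081", 128)

// The key schema from this thread, parsed into an Avro Schema.
val keySchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Key","namespace":"customers",
    |"fields":[{"name":"id","type":"int"}]}""".stripMargin)

// Under the topic.name strategy the key subject for topic "teste" is "teste-key".
val subject = "teste-key"
if (!client.getAllSubjects.contains(subject))
  client.register(subject, keySchema)
```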

luancaarvalho commented 4 years ago

> Hi, could you provide the whole exception?
>
> When you are writing is the schema already in the registry? Under the "teste" topic?

Thank you, I solved my mistake. One tip, though: give more details about the Scala Schema Registry integration for keys and values; I and other people had a lot of difficulty trying to use it. For example, explain what "topic.record.name" means, or give a real example instead of "foo" and "com.example". With these changes I think ABRiS will be easier to use.
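For readers landing here with the same question, here is the kind of concrete example requested, using the real names from this thread (my reading of the strategy, worth double-checking against the ABRiS docs): under "topic.record.name" the subject combines the topic with the record's fully qualified name, so a key config could look like:

```scala
import za.co.absa.abris.avro.read.confluent.SchemaManager

// topic.record.name => subject "<topic>-<namespace>.<record name>",
// here "customers-customers.Key" (record name "Key", namespace "customers").
val keyRegistryConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "customers",
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
  SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY -> "topic.record.name",
  SchemaManager.PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "Key",
  SchemaManager.PARAM_KEY_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> "customers"
)
```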

Thank you Again