AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0
229 stars 75 forks source link

to_avro specify the key #291

Closed geoHeil closed 2 years ago

geoHeil commented 2 years ago

The commercial (databricks edition) allows to specify a key when writing Avro like: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/avro-dataframe Sadly, this was never added to spark https://github.com/apache/spark/pull/31771.

How can I specify a key using ABRiS?

https://github.com/AbsaOSS/ABRiS/blob/c5301eedee0e8195678f7f8a999f5ba2ebdf5303/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala#L62 nicely shows how to write data to kafka - but a key is missing.

When trying to specify a key I either get errors - or no results (= a nulled out key):

for a dataframe df with the following schema:

root
 |-- key: string (nullable = true)
 |-- value: binary (nullable = false)

where both the key and value schema are registered.

val keySchema = AvroSchemaUtils.toAvroSchema(df, "key")
schemaManager.register(SchemaSubject.usingTopicNameStrategy("t", true), schema)

When now writing to kafka ( for a dataframe with a key kolumn and avro-value column):

val query = aggedAsAvro.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "t")
    .outputMode("update")
    .option("checkpointLocation", "my_query_checkpoint_dir")
    .start()

The Key of the messages is set to null. How can I get the key to actually write the key as well?

cerveada commented 2 years ago

You need to use to_avro twice, once for key and once for value.

The key needs its own abrisConfig config that points to the correct key schema. For some schema naming strategies, you need to set the isKey to true so the proper shcema name is crated.

Examples of the config are here: https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md

geoHeil commented 2 years ago

Do you have a full and working example?

Indeed, I was following https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md and also for the schema of the key:

val keySchema = AvroSchemaUtils.toAvroSchema(df, "key")
schemaManager.register(SchemaSubject.usingTopicNameStrategy("t", true), schema)

but then fail with:

Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert SQL top-level record to Avro top-level record because schema is incompatible (sqlType = STRING, avroType = {"type":"record","name":"topLevelRecord","fields":[{"name":"brand","type":["string","null"]},{"name":"rating_mean","type":["double","null"]},{"name":"duration_mean","type":["double","null"]}]})

when trying to use the Avro string type for the string key column. When instead of only calling to_avro once, I get the NULLed out key field (but no exception).

geoHeil commented 2 years ago

When using:

.andTopicNameStrategy("t")

for a topic t where the schema is postfixed with either: -key or -value I would expect that a single AbrisConfig works.

cerveada commented 2 years ago

If it was one config, how would to_avro function know if it is key or value?

geoHeil commented 2 years ago

For example from https://github.com/AbsaOSS/ABRiS/blob/master/documentation/confluent-avro-documentation.md:

// Use latest version of already existing schema
val toAvroConfig4: ToAvroConfig = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy("fooTopic")
    .usingSchemaRegistry("http://registry-url")

If one follows the (default) naming convention for topic: fooTopic, there is a:

schema. And therefore I assume that .andTopicNameStrategy("fooTopic") would work in both cases.

Where would you specify here if it is a key or value?

cerveada commented 2 years ago

As mentioned in other place in the documentation, the value schema is the default. If you look at the code, you will see that the method looks like this:

 def andTopicNameStrategy(topicName: String, isKey: Boolean = false)

So if you want to use key schema, you must set isKey = true

geoHeil commented 2 years ago

Thanks.

But when setting two different configurations:

Cannot convert SQL top-level record to Avro top-level record because schema is incompatible (sqlType = STRING, avroType = {"type":"record","name":"topLevelRecord","fields":[{"name":"brand","type":["string","null"]},{"name":"rating_mean","type":["double","null"]},{"name":"duration_mean","type":["double","null"]}]

still is the error. For an input data frame of:

|-- key_brand: binary (nullable = true)
|-- value: binary (nullable = false)

As you can see the original:

root
 |-- brand: string (nullable = true)
 |-- rating_mean: double (nullable = true)
 |-- duration_mean: double (nullable = true)

is transformed into a single key and value column.

geoHeil commented 2 years ago

Here you go with a full and self-contained example. I have tried to follow your suggestions - however, the key wich is outputted to kafka still is nulled out!

import spark.implicits._

val aggedDf = Seq(("foo", 1.0, 1.0), ("bar", 2.0, 2.0)).toDF("brand", "rating_mean", "duration_mean")
aggedDf.printSchema
aggedDf.show

+-----+-----------+-------------+
|brand|rating_mean|duration_mean|
+-----+-----------+-------------+
|  foo|        1.0|          1.0|
|  bar|        2.0|          2.0|
+-----+-----------+-------------+

import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
import org.apache.avro.Schema
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.registry.SchemaSubject
import za.co.absa.abris.avro.functions.to_avro
import org.apache.spark.sql._
import za.co.absa.abris.config.ToAvroConfig

// generate schema for all columns in a dataframe
val valueSchema = AvroSchemaUtils.toAvroSchema(aggedDf)
val keySchema = AvroSchemaUtils.toAvroSchema(aggedDf.select($"brand".alias("key_brand")), "key_brand")
val schemaRegistryClientConfig = Map(AbrisConfig.SCHEMA_REGISTRY_URL -> "http://localhost:8081")
val t = "metrics_per_brand_spark222xx"

val schemaManager = SchemaManagerFactory.create(schemaRegistryClientConfig)

// register schema with topic name strategy
def registerSchema1(schemaKey: Schema, schemaValue: Schema, schemaManager: SchemaManager, schemaName:String): Int = {
  schemaManager.register(SchemaSubject.usingTopicNameStrategy(schemaName, true), schemaKey)
  schemaManager.register(SchemaSubject.usingTopicNameStrategy(schemaName, false), schemaValue)
}
registerSchema1(keySchema, valueSchema, schemaManager, t)

val toAvroConfig4 = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy(t)
    .usingSchemaRegistry("http://localhost:8081")

val toAvroConfig4Key = AbrisConfig
    .toConfluentAvro
    .downloadSchemaByLatestVersion
    .andTopicNameStrategy(t, isKey = true)
    .usingSchemaRegistry("http://localhost:8081")

def writeDfToAvro(keyAvroConfig: ToAvroConfig, toAvroConfig: ToAvroConfig)(dataFrame:DataFrame) = {
  // this is the key! need to keep the key to guarantee temporal ordering
  val availableCols = dataFrame.columns//.drop("brand").columns
  val allColumns = struct(availableCols.head, availableCols.tail: _*)
  dataFrame.select(to_avro($"brand", keyAvroConfig).alias("key_brand"), to_avro(allColumns, toAvroConfig) as 'value)
  // dataFrame.select($"brand".alias("key_brand"), to_avro(allColumns, toAvroConfig) as 'value)
}

val aggedAsAvro = aggedDf.transform(writeDfToAvro(toAvroConfig4Key, toAvroConfig4))
aggedAsAvro.printSchema

root
 |-- key_brand: binary (nullable = true)
 |-- value: binary (nullable = false)

aggedAsAvro.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", t).save()
cerveada commented 2 years ago

Oh, ok now I understand. Your problem is not Abris.

The value column is the only required option. If a key column is not specified then a null valued key column will be automatically added

from https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

You need to rename the key_brand to key.

geoHeil commented 2 years ago

Indeed. Many thanks.