AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Add example of to_avro in python to documentation #128

Closed cerveada closed 4 years ago

cerveada commented 4 years ago

Maybe the Python docs could be moved to a dedicated page, like the other documentation pages.

Vincent-Zeng commented 4 years ago

My example:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def from_avro(col, topic, schema_registry_url):
    """
    Deserialize a Confluent Avro column.

    :param col: column name "key" or "value"
    :param topic: kafka topic
    :param schema_registry_url: schema registry http address
    :return: Column holding the deserialized data
    """
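    # Reach into the JVM through py4j; this requires an active SparkContext.
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    # A Scala object is exposed as the class `Name$` with a static `MODULE$` field.
    naming_strategy = getattr(
        getattr(abris_avro.read.confluent.SchemaManager, "SchemaStorageNamingStrategies$"),
        "MODULE$"
    ).TOPIC_NAME()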

    schema_registry_config_dict = {
        "schema.registry.url": schema_registry_url,
        "schema.registry.topic": topic,
        "{col}.schema.id".format(col=col): "latest",
        "{col}.schema.naming.strategy".format(col=col): naming_strategy
    }

    # Build an immutable Scala Map by adding one (key, value) Tuple2 at a time.
    conf_map = getattr(getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    for k, v in schema_registry_config_dict.items():
        conf_map = getattr(conf_map, "$plus")(jvm_gateway.scala.Tuple2(k, v))

    return Column(abris_avro.functions.from_confluent_avro(_to_java_column(col), conf_map))

def to_avro(col, topic, schema_registry_url):
    """
    Serialize a column to Confluent Avro.

    :param col: column name "key" or "value"
    :param topic: kafka topic
    :param schema_registry_url: schema registry http address
    :return: Column holding the serialized Avro bytes
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    naming_strategy = getattr(
        getattr(abris_avro.read.confluent.SchemaManager, "SchemaStorageNamingStrategies$"),
        "MODULE$"
    ).TOPIC_NAME()

    schema_registry_config_dict = {
        "schema.registry.url": schema_registry_url,
        "schema.registry.topic": topic,
        "{col}.schema.id".format(col=col): "latest", 
        "{col}.schema.naming.strategy".format(col=col): naming_strategy
    }

    conf_map = getattr(getattr(jvm_gateway.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    for k, v in schema_registry_config_dict.items():
        conf_map = getattr(conf_map, "$plus")(jvm_gateway.scala.Tuple2(k, v))

    return Column(abris_avro.functions.to_confluent_avro(_to_java_column(col), conf_map))

cerveada commented 4 years ago

Nice! I will add that to documentation. Thank you.

brandon-stanley commented 4 years ago

Thanks @cerveada & @Vincent-Zeng!

Should the descriptions of the functions be switched around (to_avro should be serialize and from_avro should be deserialize)?

Also, would it be more efficient to define the conf_map outside of the functions and pass it in as a parameter rather than defining it within to_avro and from_avro?

Vincent-Zeng commented 4 years ago

  1. Yes, the descriptions were swapped. They are fixed now.
  2. No, it makes no difference. Defining conf_map outside the functions would not be more efficient.
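
For readers who still prefer the conf_map-as-parameter layout suggested above, here is a minimal sketch of what that could look like. It is an illustration, not part of ABRiS: the helper names `registry_config` and `to_scala_map` are hypothetical, and the topic, URL and naming-strategy values at the bottom are placeholders. The Python wrapper only runs once, when the Column expression is built, so the gain would be less duplicated code rather than speed.

```python
def registry_config(col, topic, schema_registry_url, naming_strategy):
    """Build the settings dict that from_avro and to_avro both construct inline.

    `naming_strategy` is whatever TOPIC_NAME() returned on the JVM side; it is
    passed in so this helper needs no running Spark session.
    """
    return {
        "schema.registry.url": schema_registry_url,
        "schema.registry.topic": topic,
        "{col}.schema.id".format(col=col): "latest",
        "{col}.schema.naming.strategy".format(col=col): naming_strategy,
    }


def to_scala_map(jvm, py_dict):
    """Fold a Python dict into an immutable Scala Map, as the loop in the thread does.

    `jvm` is assumed to be a py4j JVM view such as
    SparkContext._active_spark_context._gateway.jvm. Not called below.
    """
    scala_map = getattr(getattr(jvm.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    for key, value in py_dict.items():
        scala_map = getattr(scala_map, "$plus")(jvm.scala.Tuple2(key, value))
    return scala_map


# Pure-Python part only. All four argument values are placeholders;
# "topic.name" stands in for the value returned by TOPIC_NAME() on the JVM.
conf = registry_config("value", "my-topic", "http://localhost:8081", "topic.name")
print(sorted(conf))
```

With these helpers, from_avro and to_avro would each take a ready-made conf_map and reduce to the final `Column(...)` call. The map is specific to one column, since the keys carry the "key" or "value" prefix.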