AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Unable to deserialize the records via Abris Pyspark #366

Open debuggerrr opened 1 month ago

debuggerrr commented 1 month ago

I am trying to run the following code, which installs the required packages at runtime:

```python
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.session import SparkSession

spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-avro_2.12:3.3.0,za.co.absa:abris_2.13:6.4.0") \
    .getOrCreate()


def from_avro(col, config):
    """
    avro deserialize

    :param col (PySpark column / str): column name "key" or "value"
    :param config (za.co.absa.abris.config.FromAvroConfig): abris config, generated from abris_config helper function
    :return: PySpark Column
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro

    return Column(abris_avro.functions.from_avro(_to_java_column(col), config))


def from_avro_abris_config(config_map, topic, is_key):
    """
    Create from avro abris config with a schema url

    :param config_map (dict[str, str]): configuration map to pass to deserializer, ex: {'schema.registry.url': 'http://localhost:8081'}
    :param topic (str): kafka topic
    :param is_key (bool): boolean
    :return: za.co.absa.abris.config.FromAvroConfig
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)

    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .fromConfluentAvro() \
        .downloadReaderSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)


def to_avro(col, config):
    """
    avro serialize

    :param col (PySpark column / str): column name "key" or "value"
    :param config (za.co.absa.abris.config.ToAvroConfig): abris config, generated from abris_config helper function
    :return: PySpark Column
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro

    return Column(abris_avro.functions.to_avro(_to_java_column(col), config))


def to_avro_abris_config(config_map, topic, is_key):
    """
    Create to avro abris config with a schema url

    :param config_map (dict[str, str]): configuration map to pass to the serializer, ex: {'schema.registry.url': 'http://localhost:8081'}
    :param topic (str): kafka topic
    :param is_key (bool): boolean
    :return: za.co.absa.abris.config.ToAvroConfig
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)

    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .toConfluentAvro() \
        .downloadSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)


df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test01") \
    .load()

from_avro_abris_settings = from_avro_abris_config({'schema.registry.url': 'http://schema-registry:8081'}, 'test01', False)
df2 = df.withColumn("parsed", from_avro("value", from_avro_abris_settings))
df2.show()
```
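For context, the `to_avro` helpers are meant for the write path later on; a minimal sketch of that usage follows (the output topic `test01-out` is a placeholder, and it assumes the value schema is already registered under that topic's name):

```python
# Write-path sketch using the to_avro helpers defined above.
# "test01-out" is a hypothetical topic; downloadSchemaByLatestVersion()
# requires the schema to already exist in the registry.
to_avro_abris_settings = to_avro_abris_config(
    {'schema.registry.url': 'http://schema-registry:8081'}, 'test01-out', False)

df.withColumn("value", to_avro("value", to_avro_abris_settings)) \
    .select("key", "value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "test01-out") \
    .save()
```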

But when I run the read path above, it fails with the error below:

```
:: problems summary ::
:::: WARNINGS
        module not found: io.confluent#kafka-avro-serializer;6.2.1

    ==== local-m2-cache: tried
      file:/Users/sid/.m2/repository/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.pom
      -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:
      file:/Users/sid/.m2/repository/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.jar

    ==== local-ivy-cache: tried
      /Users/sid/.ivy2/local/io.confluent/kafka-avro-serializer/6.2.1/ivys/ivy.xml
      -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:
      /Users/sid/.ivy2/local/io.confluent/kafka-avro-serializer/6.2.1/jars/kafka-avro-serializer.jar

    ==== central: tried
      https://repo1.maven.org/maven2/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.pom
      -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:
      https://repo1.maven.org/maven2/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.jar

    ==== spark-packages: tried
      https://repos.spark-packages.org/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.pom
      -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:
      https://repos.spark-packages.org/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.jar

        module not found: io.confluent#kafka-schema-registry-client;6.2.1

    ==== local-m2-cache: tried
      file:/Users/sid/.m2/repository/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.pom
      -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:
      file:/Users/sid/.m2/repository/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.jar

    ==== local-ivy-cache: tried
      /Users/sid/.ivy2/local/io.confluent/kafka-schema-registry-client/6.2.1/ivys/ivy.xml
      -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:
      /Users/sid/.ivy2/local/io.confluent/kafka-schema-registry-client/6.2.1/jars/kafka-schema-registry-client.jar

    ==== central: tried
      https://repo1.maven.org/maven2/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.pom
      -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:
      https://repo1.maven.org/maven2/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.jar

    ==== spark-packages: tried
      https://repos.spark-packages.org/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.pom
      -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:
      https://repos.spark-packages.org/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.jar

        ::::::::::::::::::::::::::::::::::::::::::::::
        ::          UNRESOLVED DEPENDENCIES         ::
        ::::::::::::::::::::::::::::::::::::::::::::::
        :: io.confluent#kafka-avro-serializer;6.2.1: not found
        :: io.confluent#kafka-schema-registry-client;6.2.1: not found
        ::::::::::::::::::::::::::::::::::::::::::::::

:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: io.confluent#kafka-avro-serializer;6.2.1: not found, unresolved dependency: io.confluent#kafka-schema-registry-client;6.2.1: not found]
        at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1456)
```

Please help me with this.
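One thing the log suggests: all four resolvers that were tried (local m2, local ivy, Maven Central, spark-packages) are searched, and none of them host the `io.confluent` artifacts, which are published to Confluent's own Maven repository rather than Maven Central. The coordinates above also mix Scala 2.12 Spark packages with `abris_2.13`. A possible fix, sketched below under those assumptions (the repository URL is Confluent's public Maven repo; the `abris_2.12:6.4.0` coordinate is assumed to exist for Scala 2.12):

```python
# Possible fix (a sketch, not verified in this thread): point Ivy at
# Confluent's Maven repository so the io.confluent dependencies resolve,
# and align every package on the same Scala suffix (_2.12 here).
spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.repositories", "https://packages.confluent.io/maven") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
            "org.apache.spark:spark-avro_2.12:3.3.0,"
            "za.co.absa:abris_2.12:6.4.0") \
    .getOrCreate()
```

Setting `spark.jars.repositories` is equivalent to passing `--repositories` to `spark-submit`; it appends extra remote repositories to the default resolver chain shown in the log.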