AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Caching Schema from SchemaRegistry using ABRIS 3.2.1 #219

Closed vphutane closed 3 years ago

vphutane commented 3 years ago

I am using the ABRIS 3.2.1 library in our PySpark application to extract schemas from, and register schemas in, the Schema Registry for Avro messages. We use Confluent Kafka with Spark 2.4.7 (Spark Structured Streaming). There is a lot of network I/O from our PySpark application to the Schema Registry server on every micro-batch. According to https://github.com/AbsaOSS/ABRiS/issues/105, this issue was resolved in ABRIS 3.2.0; however, we see the same behaviour in the current 3.2.1 version. Could you please let me know if you have seen this issue, or if I am missing some configuration in my code to cache the schema? Any suggestions or advice would be greatly appreciated.

Below is a snippet of the code I am using to call the `from_confluent_avro` method:

```python
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

get_df = df.select(ReadWriteFromAvro.from_avro('value', raw_topic).alias(df_alias))


# from_avro is defined on our ReadWriteFromAvro helper class (called above).
def from_avro(col, topic):
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro

    # Access the Scala object SchemaStorageNamingStrategies via its MODULE$ singleton
    naming_strategy = getattr(
        getattr(
            abris_avro.read.confluent.SchemaManager,
            'SchemaStorageNamingStrategies$',
        ),
        'MODULE$',
    ).TOPIC_NAME()

    schema_registry_config_dict = {
        'schema.registry.url': SCHEMA_REGISTRY_URL,
        'basic.auth.credentials.source': 'USER_INFO',
        'basic.auth.user.info':
            KAFKA_SCHEMA_REGISTRY_API_KEY + ':' + KAFKA_SCHEMA_REGISTRY_API_SECRET,
        'schema.registry.topic': topic,
        f'{col}.schema.id': 'latest',
        f'{col}.schema.naming.strategy': naming_strategy,
    }

    # Build an immutable Scala Map by repeatedly calling Map.+((key, value))
    conf_map = getattr(
        getattr(
            jvm_gateway.scala.collection.immutable.Map,
            'EmptyMap$',
        ), 'MODULE$',
    )
    for k, v in schema_registry_config_dict.items():
        conf_map = getattr(conf_map, '$plus')(
            jvm_gateway.scala.Tuple2(k, v),
        )

    return Column(abris_avro.functions.from_confluent_avro(_to_java_column(col), conf_map))
```

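As a hypothetical illustration of what "caching the schema" means here (this is not ABRiS's internal implementation), the sketch below reads the schema id from the Confluent wire-format header of each record (one magic byte `0`, then a 4-byte big-endian schema id, then the Avro body) and memoizes the lookup per id, so repeated records with the same id reach the registry only once. `split_confluent_message`, `SchemaCache`, and `fake_fetch` are illustrative names; `fake_fetch` stands in for a real `GET /schemas/ids/{id}` call.

```python
import struct
from typing import Callable, Dict, Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed Avro message


def split_confluent_message(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed value into (schema_id, avro_payload).

    Layout: 1 magic byte (0), a 4-byte big-endian schema id, then the Avro body.
    """
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not a Confluent wire-format message")
    (schema_id,) = struct.unpack(">i", message[1:5])
    return schema_id, message[5:]


class SchemaCache:
    """Memoizes schema lookups so each schema id reaches the registry only once."""

    def __init__(self, fetch: Callable[[int], str]) -> None:
        self._fetch = fetch  # hypothetical: would perform GET /schemas/ids/{id}
        self._schemas: Dict[int, str] = {}
        self.registry_calls = 0

    def get(self, schema_id: int) -> str:
        if schema_id not in self._schemas:
            self.registry_calls += 1
            self._schemas[schema_id] = self._fetch(schema_id)
        return self._schemas[schema_id]


def fake_fetch(schema_id: int) -> str:
    # Hypothetical stand-in for the HTTP call to the Schema Registry.
    return '{"type": "string"}'


# Seven records that all carry schema id 1.
cache = SchemaCache(fake_fetch)
for _ in range(7):
    schema_id, payload = split_confluent_message(b"\x00\x00\x00\x00\x01" + b"avro-bytes")
    cache.get(schema_id)
print(cache.registry_calls)  # → 1
```

Without the cache, each of the seven records would trigger its own registry request; with it, only the first lookup for id 1 does.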
cerveada commented 3 years ago

I suggest you upgrade to at least 3.2.2.

Version 4 has even more performance improvements, so if you still have problems with 3.2.2, I would try the latest version, 4.2.0. The API has changed, but there are lots of examples, so it shouldn't be hard to port the code to version 4.

cerveada commented 3 years ago

@vphutane, did the upgrade solve your issue?

vphutane commented 3 years ago

@cerveada, thanks for your comments; I truly appreciate it. I tried version 3.2.2 and see the same issue: it accesses the Schema Registry again and again. We inserted a single message and could see it hitting the Schema Registry server repeatedly until we pushed another message. Please see the log below:

```
[2021-04-23 06:12:42,337] INFO 172.22.0.1 - - [23/Apr/2021:06:12:42 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 2 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:43,798] INFO 172.22.0.1 - - [23/Apr/2021:06:12:43 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:44,004] INFO 172.22.0.1 - - [23/Apr/2021:06:12:44 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:44,022] INFO 172.22.0.1 - - [23/Apr/2021:06:12:44 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:44,054] INFO 172.22.0.1 - - [23/Apr/2021:06:12:44 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:44,074] INFO 172.22.0.1 - - [23/Apr/2021:06:12:44 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:44,080] INFO 172.22.0.1 - - [23/Apr/2021:06:12:44 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:44,089] INFO 172.22.0.1 - - [23/Apr/2021:06:12:44 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
```

...and this continues until we push another message.
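To compare registry traffic between versions, one option is to count requests per endpoint in the Schema Registry access log. The minimal sketch below assumes log lines in the format shown above, with a quoted `METHOD path HTTP/1.1` request line; `count_registry_requests` is a hypothetical helper, not part of ABRiS or Schema Registry, and the sample lines are copied from the 3.2.2 log.

```python
import re
from collections import Counter

# Matches the quoted request line of an access-log entry,
# e.g. "GET /schemas/ids/1 HTTP/1.1" -> ("GET", "/schemas/ids/1")
REQUEST = re.compile(r'"(GET|POST|PUT|DELETE) (\S+) HTTP/1\.1"')


def count_registry_requests(log_text: str) -> Counter:
    """Count requests per (method, path) in a Schema Registry access log."""
    return Counter(REQUEST.findall(log_text))


sample = """
[2021-04-23 06:12:42,337] INFO 172.22.0.1 - - [23/Apr/2021:06:12:42 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 2 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:43,798] INFO 172.22.0.1 - - [23/Apr/2021:06:12:43 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:12:44,004] INFO 172.22.0.1 - - [23/Apr/2021:06:12:44 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 1 (io.confluent.rest-utils.requests:62)
"""

counts = count_registry_requests(sample)
print(counts[("GET", "/schemas/ids/1")])  # → 2
```

Run over the full logs, the repeated `GET /schemas/ids/1` entries per pushed message make the difference between the two versions easy to quantify.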

However, we did a quick PoC using the 4.2.0 package. We ran 3 events, waited 5 minutes, and then ran 3 more events. With 4.2.0, the registry call is only made when an event is read. Below is the log for the same test:

```
[2021-04-23 06:01:40,660] INFO Registering new schema: subject tsuppyo-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:249)
[2021-04-23 06:01:40,661] INFO 172.22.0.1 - - [23/Apr/2021:06:01:40 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 3 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:01:42,044] INFO 172.22.0.1 - - [23/Apr/2021:06:01:42 +0000] "GET /schemas/ids/1 HTTP/1.1" 200 2497 4 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:01:42,641] INFO Registering new schema: subject tsuppyo-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:249)
[2021-04-23 06:01:42,643] INFO 172.22.0.1 - - [23/Apr/2021:06:01:42 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 3 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:01:45,220] INFO Registering new schema: subject tsuppyo-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:249)
[2021-04-23 06:01:45,222] INFO 172.22.0.1 - - [23/Apr/2021:06:01:45 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 4 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:01:47,685] INFO Registering new schema: subject tsuppyo-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:249)
[2021-04-23 06:01:47,689] INFO 172.22.0.1 - - [23/Apr/2021:06:01:47 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 5 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:06:23,110] INFO Registering new schema: subject tsuppyo-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:249)
[2021-04-23 06:06:23,111] INFO 172.22.0.1 - - [23/Apr/2021:06:06:23 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 2 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:06:25,368] INFO Registering new schema: subject tsuppyo-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:249)
[2021-04-23 06:06:25,370] INFO 172.22.0.1 - - [23/Apr/2021:06:06:25 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 4 (io.confluent.rest-utils.requests:62)
[2021-04-23 06:06:27,807] INFO Registering new schema: subject tsuppyo-value, version null, id null, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:249)
[2021-04-23 06:06:27,809] INFO 172.22.0.1 - - [23/Apr/2021:06:06:27 +0000] "POST /subjects/tsuppyo-value/versions HTTP/1.1" 200 8 3 (io.confluent.rest-utils.requests:62)
```

cerveada commented 3 years ago

So if I understand correctly, version 4.2.0 solves the problem for you?

vphutane commented 3 years ago

Yes, that is right. Version 4.2.0 does the trick. Thank you for such wonderful work; I truly appreciate what you all do.