AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Low performance while using `latest` as `schema.id` #105

Closed agolovenko closed 4 years ago

agolovenko commented 4 years ago

It looks like SchemaLoader makes an uncached call to get the latest version id:

https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/schemas/SchemaLoader.scala#L103-L110

This happens quite often and results in a huge number of HTTP requests to the schema registry. The value could be cached for a configurable time period.
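A minimal sketch of what such a time-bounded cache could look like. `TtlCache`, `fetchLatestId` and the injectable `clock` are hypothetical names used only for illustration; none of them are part of ABRiS, and the registry call is replaced by a counter so the example runs on its own.

```scala
import scala.collection.mutable

// Hypothetical helper (not an ABRiS class): remembers a value per key
// and reloads it only after `ttlMillis` have elapsed.
final class TtlCache[K, V](ttlMillis: Long,
                           clock: () => Long = () => System.currentTimeMillis()) {

  // key -> (cached value, time it was loaded)
  private val entries = mutable.Map.empty[K, (V, Long)]

  // Coarse lock for simplicity: a slow load blocks lookups for other keys too.
  def getOrLoad(key: K)(load: => V): V = synchronized {
    val now = clock()
    entries.get(key) match {
      case Some((value, loadedAt)) if now - loadedAt < ttlMillis =>
        value // still fresh, no call to the registry
      case _ =>
        val value = load // missing or expired, load again
        entries.update(key, (value, now))
        value
    }
  }
}

object TtlCacheDemo {
  def main(args: Array[String]): Unit = {
    var fakeNow = 0L
    var registryCalls = 0

    // Stand-in for the HTTP request to the schema registry (assumption).
    def fetchLatestId(subject: String): Int = { registryCalls += 1; 100009 }

    val cache = new TtlCache[String, Int](ttlMillis = 60000L, clock = () => fakeNow)

    cache.getOrLoad("input2-value")(fetchLatestId("input2-value"))
    cache.getOrLoad("input2-value")(fetchLatestId("input2-value"))
    println(registryCalls) // 1: the second lookup was served from the cache

    fakeNow = 60000L // the configured period has elapsed
    cache.getOrLoad("input2-value")(fetchLatestId("input2-value"))
    println(registryCalls) // 2: the entry expired and was reloaded
  }
}
```

The TTL is a constructor argument, so it could be wired to a user-supplied configuration value.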

felipemmelo commented 4 years ago

@agolovenko, which version are you using? That used to be the case, but in the newest version, 3.1.1, it should be invoked only once per executor.

agolovenko commented 4 years ago

It is 3.1.1 with Spark 2.4.5 and Scala 2.11.12.

agolovenko commented 4 years ago

It is strange: this is supposed to be cached, but it is not: https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala#L51-L55
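One possible explanation, not confirmed in this thread: a Scala `lazy val` is memoized per object instance, so every new copy of the object evaluates it again. The sketch below illustrates only that language behavior. `PerInstanceLazy`, `SharedCache` and `Registry` are hypothetical stand-ins, not ABRiS classes, and the registry call is simulated with a counter.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Simulated schema registry that counts how often it is asked (assumption).
object Registry {
  val calls = new AtomicInteger(0)
  def fetchLatestId(subject: String): Int = { calls.incrementAndGet(); 100009 }
}

// Stand-in for an object that holds its schema id in a lazy val.
final case class PerInstanceLazy(subject: String) {
  lazy val schemaId: Int = Registry.fetchLatestId(subject)
}

// Alternative: a cache shared by all instances in the same JVM.
object SharedCache {
  private val ids = TrieMap.empty[String, Int]
  // Under contention the loader may run more than once, but only one value is kept.
  def schemaId(subject: String): Int =
    ids.getOrElseUpdate(subject, Registry.fetchLatestId(subject))
}

object LazyValDemo {
  def main(args: Array[String]): Unit = {
    val a = PerInstanceLazy("input2-value")
    val b = a.copy() // a new instance: its lazy val has not been evaluated yet

    a.schemaId
    a.schemaId // memoized within `a`
    b.schemaId // evaluated again for `b`
    println(Registry.calls.get) // 2: one lookup per instance

    SharedCache.schemaId("input2-value")
    SharedCache.schemaId("input2-value")
    println(Registry.calls.get) // 3: the shared cache adds a single lookup
  }
}
```

If the expression is re-created for each micro-batch or each task, a per-instance `lazy val` would trigger one lookup per copy, which would be consistent with the repeated requests reported here.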

Here's a typical trace:

"stream execution thread for [id = 098b31cd-b8e8-46fe-995b-164a1f660178, runId = 997b4487-20a4-40a0-9a6f-2a806e600618]@7827" daemon prio=5 tid=0x3a nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at za.co.absa.abris.avro.read.confluent.SchemaManager$.getLatestVersionId(SchemaManager.scala:147)
      at za.co.absa.abris.avro.schemas.SchemaLoader$.getSchemaId(SchemaLoader.scala:105)
      at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:80)
      at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistryValue(SchemaLoader.scala:49)
      at za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils$.loadForValue(AvroSchemaUtils.scala:53)
      at za.co.absa.abris.avro.sql.AvroDataToCatalyst.loadSchemaFromRegistry(AvroDataToCatalyst.scala:130)
      at za.co.absa.abris.avro.sql.AvroDataToCatalyst.avroSchema$lzycompute(AvroDataToCatalyst.scala:53)
      - locked <0x2b88> (a za.co.absa.abris.avro.sql.AvroDataToCatalyst)
      at za.co.absa.abris.avro.sql.AvroDataToCatalyst.avroSchema(AvroDataToCatalyst.scala:51)
      at za.co.absa.abris.avro.sql.AvroDataToCatalyst.dataType$lzycompute(AvroDataToCatalyst.scala:47)
      at za.co.absa.abris.avro.sql.AvroDataToCatalyst.dataType(AvroDataToCatalyst.scala:47)
      at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:176)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$collectAliases$1.applyOrElse(Optimizer.scala:670)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$collectAliases$1.applyOrElse(Optimizer.scala:669)
      at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:141)
      at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:140)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at scala.collection.TraversableLike$class.collect(TraversableLike.scala:271)
      at scala.collection.AbstractTraversable.collect(Traversable.scala:104)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.collectAliases(Optimizer.scala:669)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.org$apache$spark$sql$catalyst$optimizer$CollapseProject$$haveCommonNonDeterministicOutput(Optimizer.scala:678)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$apply$9.applyOrElse(Optimizer.scala:654)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$apply$9.applyOrElse(Optimizer.scala:652)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:284)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:284)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:283)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.apply(Optimizer.scala:652)
      at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.apply(Optimizer.scala:650)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
      at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
      at scala.collection.immutable.List.foldLeft(List.scala:84)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
      at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:77)
      - locked <0x2bea> (a org.apache.spark.sql.execution.streaming.IncrementalExecution)
      at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:77)
      at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
      at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$4.apply(MicroBatchExecution.scala:528)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$4.apply(MicroBatchExecution.scala:519)
      at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
      at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:519)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
      at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
      at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
      at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
      at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
      at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

felipemmelo commented 4 years ago

Maybe there are issues accessing Schema Registry?

Also, are you using 3.1.1?

agolovenko commented 4 years ago

As I already mentioned, I use ABRiS 3.1.1 with Spark 2.4.5 and Scala 2.11.12. There are no problems with the schema registry: in fact, setting a concrete schema id instead of `latest` makes things run 5x faster.
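As a possible workaround along those lines, `latest` could be resolved to a concrete id once, before the query starts, and the pinned id passed on instead. The sketch below is only an illustration: `pin` and `resolveLatest` are hypothetical, the `schema.id` key is taken from the issue title rather than from the ABRiS configuration constants, and the registry lookup is replaced by a fixed value.

```scala
object PinLatestSchemaId {

  // Returns a copy of `conf` in which a "latest" id is replaced by a concrete
  // one. `resolveLatest` is invoked at most once, and only when needed.
  def pin(conf: Map[String, String], idKey: String)
         (resolveLatest: () => Int): Map[String, String] =
    conf.get(idKey) match {
      case Some("latest") => conf.updated(idKey, resolveLatest().toString)
      case _              => conf // already concrete, or key not set
    }

  def main(args: Array[String]): Unit = {
    // Illustrative configuration; the key name is an assumption.
    val conf = Map("schema.id" -> "latest")

    // Stand-in for a single registry lookup made up front (assumption).
    val pinned = pin(conf, "schema.id")(() => 100009)

    println(pinned("schema.id")) // 100009
  }
}
```

The trade-off is that a running job would not pick up newer schema versions until it is restarted.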

agolovenko commented 4 years ago

Tried with Spark 2.4.4: no difference. Here are some logs:

DEBUG   2020-04-08 19:31:03,902 11721   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 13]   Trying to get schema for id '100009'
DEBUG   2020-04-08 19:31:03,902 11721   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 15]   Trying to get schema for id '100009'
DEBUG   2020-04-08 19:31:03,902 11721   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 13]   Subject name resolved to: input2-value
DEBUG   2020-04-08 19:31:03,902 11721   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 15]   Subject name resolved to: input2-value
DEBUG   2020-04-08 19:31:03,902 11721   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 15]   Trying to get latest schema version id for subject 'input2-value'
DEBUG   2020-04-08 19:31:03,902 11721   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 13]   Trying to get latest schema version id for subject 'input2-value'
DEBUG   2020-04-08 19:31:03,902 11721   io.confluent.kafka.schemaregistry.client.rest.RestService   [Executor task launch worker for task 15]   Sending GET with input null to https://psrc-4kk0p.westeurope.azure.confluent.cloud/subjects/input2-value/versions/latest
DEBUG   2020-04-08 19:31:03,902 11721   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 14]   Trying to get schema for id '100009'
DEBUG   2020-04-08 19:31:03,994 11813   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 15]   Trying to get schema for subject 'input2-value' and id '100009'
DEBUG   2020-04-08 19:31:03,994 11813   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 14]   Subject name resolved to: input2-value
DEBUG   2020-04-08 19:31:03,994 11813   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 14]   Trying to get latest schema version id for subject 'input2-value'
DEBUG   2020-04-08 19:31:03,994 11813   io.confluent.kafka.schemaregistry.client.rest.RestService   [Executor task launch worker for task 13]   Sending GET with input null to https://psrc-4kk0p.westeurope.azure.confluent.cloud/subjects/input2-value/versions/latest
DEBUG   2020-04-08 19:31:04,082 11901   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 13]   Trying to get schema for subject 'input2-value' and id '100009'
DEBUG   2020-04-08 19:31:04,082 11901   io.confluent.kafka.schemaregistry.client.rest.RestService   [Executor task launch worker for task 14]   Sending GET with input null to https://psrc-4kk0p.westeurope.azure.confluent.cloud/subjects/input2-value/versions/latest
DEBUG   2020-04-08 19:31:04,171 11990   za.co.absa.abris.avro.read.confluent.SchemaManager  [Executor task launch worker for task 14]   Trying to get schema for subject 'input2-value' and id '100009'
DEBUG   2020-04-08 19:31:04,192 12011   org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection    [Executor task launch worker for task 14]   code for createexternalrow(if (isnull(input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true])) null else createexternalrow(if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].DataType.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventId.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Country.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventType.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventTimestamp.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventVersion.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else if (isnull(input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event)) null else createexternalrow(if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.ConfirmUrl.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.ClientId.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Language.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.ClientCountryCode.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Street.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Number.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Door.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.PostCode.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.City.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.UserId, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.SalesforceId.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.UserName.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Gender.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.FirstName.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Surname.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.BirthDate.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Email.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.PhoneNumber.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.CountryCode.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.RegistrationDate.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.OccurredOn.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Version.toString, StructField(ConfirmUrl,StringType,false), StructField(ClientId,StringType,false), ... 
20 more fields), StructField(DataType,StringType,true), StructField(EventId,StringType,true), StructField(Country,StringType,true), StructField(EventType,StringType,true), StructField(EventTimestamp,StringType,true), StructField(EventVersion,StringType,true), StructField(Event,StructType(StructField(ConfirmUrl,StringType,false), StructField(ClientId,StringType,false), StructField(Language,StringType,false), StructField(ClientCountryCode,StringType,false), StructField(Street,StringType,true), StructField(Number,StringType,true), StructField(Door,StringType,true), StructField(PostCode,StringType,true), StructField(City,StringType,true), StructField(UserId,LongType,false), StructField(SalesforceId,StringType,false), StructField(UserName,StringType,false), StructField(Gender,StringType,true), StructField(FirstName,StringType,true), StructField(Surname,StringType,true), StructField(BirthDate,StringType,false), StructField(Email,StringType,false), StructField(PhoneNumber,StringType,true), StructField(CountryCode,StringType,false), StructField(RegistrationDate,StringType,false), StructField(OccurredOn,StringType,false), StructField(Version,StringType,true)),true)), StructField(value,StructType(StructField(DataType,StringType,true), StructField(EventId,StringType,true), StructField(Country,StringType,true), StructField(EventType,StringType,true), StructField(EventTimestamp,StringType,true), StructField(EventVersion,StringType,true), StructField(Event,StructType(StructField(ConfirmUrl,StringType,false), StructField(ClientId,StringType,false), StructField(Language,StringType,false), StructField(ClientCountryCode,StringType,false), StructField(Street,StringType,true), StructField(Number,StringType,true), StructField(Door,StringType,true), StructField(PostCode,StringType,true), StructField(City,StringType,true), StructField(UserId,LongType,false), StructField(SalesforceId,StringType,false), StructField(UserName,StringType,false), StructField(Gender,StringType,true), 
StructField(FirstName,StringType,true), StructField(Surname,StringType,true), StructField(BirthDate,StringType,false), StructField(Email,StringType,false), StructField(PhoneNumber,StringType,true), StructField(CountryCode,StringType,false), StructField(RegistrationDate,StringType,false), StructField(OccurredOn,StringType,false), StructField(Version,StringType,true)),true)),true)):
agolovenko commented 4 years ago

Just an idea: could this all be because my app is in fact a Structured Streaming app? It seems AvroDataToCatalyst gets recreated for each batch. If so, is there a way around that?

  import org.apache.spark.sql.functions.col
  import za.co.absa.abris.avro.functions.from_confluent_avro
  import za.co.absa.abris.avro.read.confluent.SchemaManager

  // Schema Registry settings; "latest" asks for the newest registered version.
  val schemaRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> "https://psrc-4kk0p.westeurope.azure.confluent.cloud",
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC        -> "input2",
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME,
    SchemaManager.PARAM_VALUE_SCHEMA_ID              -> "latest", //"100009",
    "basic.auth.credentials.source"                  -> "USER_INFO",
    "schema.registry.basic.auth.user.info"           -> "...",
    "auto.register.schemas"                          -> "false"
  )

  val upstream = consumeEventHub() // creates an upstream of messages

  // Decode the Avro payload in "body" and print each micro-batch to the console.
  upstream
    .select(from_confluent_avro(col("body"), schemaRegistryConfig) as "value")
    .writeStream
    .option("checkpointLocation", "checkpoints")
    .format("console")
    .start()
    .awaitTermination()
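
The hypothesis above can be illustrated without Spark. The following is a minimal sketch, assuming only that a `lazy val` is cached per instance: if the object holding it is recreated for every batch, the value is recomputed every time. `SchemaHolder` and `registryCalls` are hypothetical names and do not reflect the actual ABRiS internals.

```scala
// Hypothetical stand-in for an expression that loads its schema lazily.
// `registryCalls` counts simulated Schema Registry requests.
object LazyValDemo {
  var registryCalls = 0

  class SchemaHolder {
    // Evaluated at most once per instance, on first access.
    lazy val schema: String = {
      registryCalls += 1
      """{"type":"string"}"""
    }
  }

  // One shared instance: a single lookup no matter how many batches run.
  def runWithSharedInstance(batches: Int): Int = {
    registryCalls = 0
    val holder = new SchemaHolder
    (1 to batches).foreach(_ => holder.schema)
    registryCalls
  }

  // A new instance per batch: one lookup per batch.
  def runWithFreshInstances(batches: Int): Int = {
    registryCalls = 0
    (1 to batches).foreach(_ => new SchemaHolder().schema)
    registryCalls
  }
}
```

With five batches, the shared instance triggers one simulated lookup while the fresh instances trigger five, which would match the request pattern reported in this issue if the expression is indeed rebuilt per batch.
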
felipemmelo commented 4 years ago

Hi @agolovenko , sorry, was on holidays. It shouldn't be, since the library was developed specifically for the structured API. I'll try to replicate your issue and come back to you asap.

algorri94 commented 4 years ago

Hi, I can confirm this is happening to us as well. We are receiving many calls in the Schema Registry API trying to get the latest version:

15:06:29 INFO: i.c.r.requests | 127.0.0.1 - - [13/Apr/2020:13:06:29 +0000] "GET /subjects/subject-value/versions/latest HTTP/1.1" 200 3611 2

It seems it's only happening in the from_confluent_avro function. Apparently, it's working as expected when using the to_confluent_avro function.

felipemmelo commented 4 years ago

Hi @agolovenko and @algorri94 , once again, tks a lot for the help.

We seem to have 2 situations here:

  1. Retrieval of Avro reader schema for every micro-batch
  2. Retrieval of Avro writer schema for every record

The former is definitely a "performance bug" and is something we can quickly address by retrieving the schema before the Catalyst expression is invoked, which would achieve the same performance as when the schema is provided as a plain JSON file.
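
Until such a fix is available, a user-side approximation of the same idea is to resolve "latest" to a concrete id once, before building the query. This is only a sketch under stated assumptions: `EagerSchemaId`, `pin`, `idKey` and `fetchLatestId` are hypothetical names, and the caller is assumed to supply the actual registry lookup.

```scala
// Hypothetical helper, not part of ABRiS.
object EagerSchemaId {
  // If `idKey` is set to "latest", replace it with the concrete id returned by
  // `fetchLatestId`, which is invoked at most once, here, rather than per micro-batch.
  def pin(config: Map[String, String], idKey: String)(fetchLatestId: () => Int): Map[String, String] =
    config.get(idKey) match {
      case Some("latest") => config.updated(idKey, fetchLatestId().toString)
      case _              => config // already a concrete id, or key not set
    }
}
```

The trade-off is that a long-running query keeps the id resolved at start-up and does not pick up a newer reader schema until it is restarted.
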

The latter is a bit more involved. As you certainly know, Avro uses writer and reader schemas to provide evolution capabilities. We cannot assume a single writer schema for the whole execution for compatibility reasons, so we rely on CachedSchemaRegistryClient to cache it, which should be doing its job locally, i.e. should not reach Schema Registry back-end for a cached id as we can see here.

@cerveada is currently off but I'll ask him for a chat as soon as he's back for us to decide how to better address these questions. In the meantime, if you have ideas or would like to give a PR a try, please, feel free.

Cheers.

agolovenko commented 4 years ago

Thanks @felipemmelo ! Here's my comment

we rely on CachedSchemaRegistryClient to cache it, which should be doing its job locally

Not all calls on CachedSchemaRegistryClient are actually cached; you can browse the source code when in doubt. getLatestSchema is one of the non-cached calls, and this behavior makes a lot of sense for a client that is asked "what is the latest schema right now?".

The problem is that this model isn't the best fit for this library. You probably want to cache the result of this call, not forever but for some configurable period of time...
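
A time-limited cache of this kind can be sketched as follows. `TtlCached` is a hypothetical helper, not an ABRiS or Confluent class; the `clock` parameter is only there so that expiry can be exercised without sleeping.

```scala
// Hypothetical helper: caches the result of `load` for `ttlMillis` milliseconds.
final class TtlCached[A](ttlMillis: Long, clock: () => Long = () => System.currentTimeMillis())(load: () => A) {
  // Last loaded value together with the time it was loaded.
  @volatile private var cached: Option[(A, Long)] = None

  def get(): A = {
    val now = clock()
    cached match {
      // Still fresh: serve from cache without calling `load`.
      case Some((value, loadedAt)) if now - loadedAt < ttlMillis => value
      // Empty or expired: reload and remember when.
      case _ =>
        val fresh = load()
        cached = Some((fresh, now))
        fresh
    }
  }
}
```

Wrapping the "latest version id" lookup in such an object would bound the request rate to roughly one call per TTL per JVM. Concurrent callers may occasionally reload twice, which is harmless for an idempotent lookup.
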

felipemmelo commented 4 years ago

Hi @agolovenko , my comments on your comments.

  1. The method you're referring to is getLatestSchemaMetadata, right? This one

  2. If yes, then of course it has to constantly query Schema Registry for the latest version. However, this only happens when getting the schema to be used by Catalyst, as you can browse here to confirm

  3. The subsequent calls can and must be cached since they are based on the id available on top of the payload for each record, as you can see here
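
For context on point 3, the Confluent wire format prefixes every Avro payload with a magic byte (0) followed by a 4-byte big-endian schema id. A minimal sketch of reading that id is shown below; `ConfluentHeader` is a hypothetical name and this is not the ABRiS implementation.

```scala
import java.nio.ByteBuffer

// Hypothetical helper for reading the Confluent wire-format header.
object ConfluentHeader {
  private val MagicByte: Byte = 0x0
  private val HeaderSize = 5 // 1 magic byte + 4-byte schema id

  // Returns the writer schema id, or None if the payload is too short
  // or does not start with the magic byte.
  def schemaId(payload: Array[Byte]): Option[Int] =
    if (payload.length < HeaderSize || payload(0) != MagicByte) None
    else Some(ByteBuffer.wrap(payload, 1, 4).getInt()) // ByteBuffer is big-endian by default
}
```

Because a given id always maps to the same schema, lookups keyed by this id can be cached indefinitely, unlike the "latest" lookup.
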

Anyway, thank you very much for coming back and we'll soon release an improvement for this.

cerveada commented 4 years ago

Hello, in the new versions (2.3.0 / 3.2.0) this should be fixed. In case of AvroDataToCatalyst:

Could you test the new version and let us know if it works?

agolovenko commented 4 years ago

Thanks guys! Great job!

cerveada commented 4 years ago

You are welcome. Since there seems to be no issue any more, I'm closing this ticket. Please open a new one if you have any problems.