kafka-ops / julie

A solution to help you build automation and gitops in your Apache Kafka deployments. The Kafka gitops!
MIT License
419 stars, 114 forks

Avro Schemas: Support schema references #282

Status: Open. talvey opened this issue 3 years ago

talvey commented 3 years ago

Registering an Avro schema that contains a schema reference fails. There is currently no way to declare schema references in the descriptor configuration.

For example, given a primary schema (MyOrder-with-reference.avsc) and a referenced schema (MyContext.avsc), registering the primary schema fails with the following exception:

Exception in thread "main" com.purbon.kafka.topology.schemas.SchemaRegistryManager$SchemaRegistryManagerException: Failed to parse the schema file schemas/avro/MyOrder-with-reference.avsc
    at com.purbon.kafka.topology.schemas.SchemaRegistryManager.register(SchemaRegistryManager.java:45)
    at com.purbon.kafka.topology.actions.topics.SyncTopicAction.syncTopic(SyncTopicAction.java:75)
    at com.purbon.kafka.topology.actions.topics.SyncTopicAction.run(SyncTopicAction.java:46)
    at com.purbon.kafka.topology.ExecutionPlan.execute(ExecutionPlan.java:88)
    at com.purbon.kafka.topology.ExecutionPlan.run(ExecutionPlan.java:69)
    at com.purbon.kafka.topology.JulieOps.run(JulieOps.java:189)
    at com.purbon.kafka.topology.JulieOps.run(JulieOps.java:201)
    at com.purbon.kafka.topology.CommandLineInterface.processTopology(CommandLineInterface.java:196)
    at com.purbon.kafka.topology.CommandLineInterface.run(CommandLineInterface.java:144)
    at com.purbon.kafka.topology.CommandLineInterface.main(CommandLineInterface.java:134)
Caused by: com.purbon.kafka.topology.schemas.SchemaRegistryManager$SchemaRegistryManagerException: Failed to parse the schema file kafka-topology/group1/schemas/avro/MyOrder-with-reference.avsc
    at com.purbon.kafka.topology.schemas.SchemaRegistryManager.register(SchemaRegistryManager.java:56)
    at com.purbon.kafka.topology.schemas.SchemaRegistryManager.register(SchemaRegistryManager.java:43)
    ... 9 more
Caused by: com.purbon.kafka.topology.schemas.SchemaRegistryManager$SchemaRegistryManagerException: Failed to parse the schema for subject 'my.dev.test.my.topic.1.0-value' of type 'AVRO'
    at com.purbon.kafka.topology.schemas.SchemaRegistryManager.lambda$save$0(SchemaRegistryManager.java:92)
    at java.base/java.util.Optional.orElseThrow(Optional.java:408)
    at com.purbon.kafka.topology.schemas.SchemaRegistryManager.save(SchemaRegistryManager.java:86)
    at com.purbon.kafka.topology.schemas.SchemaRegistryManager.register(SchemaRegistryManager.java:54)
    ... 10 more

MyOrder-with-reference.avsc

{
  "type": "record",
  "name": "MyOrder",
  "namespace": "data.generated",
  "fields": [
    {
      "name": "context",
      "type": "data.generated.MyContext"
    },
    {
      "name": "orderId",
      "type": "string"
    }
  ]
}

MyContext.avsc

{
  "type": "record",
  "namespace": "data.generated",
  "name": "MyContext",
  "doc": "Common event context data",
  "fields": [
    {
      "name": "eventTimestamp",
      "type": "string",
      "doc": "Timestamp as an ISO-8601 string"
    },
    {
      "name": "eventType",
      "type": "string",
      "doc": "Unique type defining message"
    },
    {
      "name": "traceId",
      "type": "string",
      "doc": "UUID for tracing.  May relate to multiple spanIds"
    }
  ]
}

descriptor.yaml

---
context: "my"
source: "dev"
projects:
  - name: "test"
    topics:
      - name: "my.topic.1.0"
        config:
          replication.factor: "1"
          num.partitions: "1"
        schemas:
          - value.schema.file: "schemas/avro/MyOrder-with-reference.avsc"

Describe the solution you'd like

I'd like to be able either to specify schema references in the descriptor configuration or, better yet, to have references in local schema files detected automatically.
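Automatic detection could rely on the Avro naming rules: any type given as a bare name that is not a primitive and is not defined in the same file must come from a reference. Below is a minimal Python sketch under that assumption. `find_external_references` is a hypothetical helper, not part of JulieOps, and it ignores aliases and Avro protocol files.

```python
import json

# Avro primitive type names; any other bare type name refers to a named type.
PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}
NAMED_KINDS = {"record", "enum", "fixed"}


def qualify(name, namespace):
    """Apply the enclosing namespace to a short name (names with a dot are already full)."""
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def find_external_references(schema_text):
    """Return the full names of named types a schema uses but does not define itself."""
    defined, used = set(), set()

    def walk(node, namespace):
        if isinstance(node, str):
            if node not in PRIMITIVES:
                used.add(qualify(node, namespace))
        elif isinstance(node, list):  # a union: walk every branch
            for branch in node:
                walk(branch, namespace)
        elif isinstance(node, dict):
            kind = node.get("type")
            if kind in NAMED_KINDS:
                full = qualify(node["name"], node.get("namespace", namespace))
                defined.add(full)
                # Nested definitions inherit the namespace of this named type.
                inner = full.rpartition(".")[0]
                for field in node.get("fields", []):
                    walk(field["type"], inner)
            elif kind == "array":
                walk(node["items"], namespace)
            elif kind == "map":
                walk(node["values"], namespace)
            else:
                # e.g. {"type": "string", "logicalType": "uuid"} or {"type": "x.Named"}
                walk(kind, namespace)

    walk(json.loads(schema_text), None)
    return used - defined
```

For MyOrder-with-reference.avsc above this returns `{"data.generated.MyContext"}`, and for MyContext.avsc it returns an empty set.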

Describe alternatives you've considered

I've used the kafka-schema-registry-maven-plugin to manage schemas; however, even when those schemas (including their references) are registered successfully, the descriptor YAML still can't refer to them. In this setup, the schema configuration and the topic configuration have no direct relationship.

Additional context

Relevant code: the current parseSchema() call appears to hard-code an empty list of references, so parsing fails even if you pre-register the referenced schema in the registry: https://github.com/kafka-ops/julie/blob/73c18eacdc21b4573d88acdcfea37be32b0ec3ec/src/main/java/com/purbon/kafka/topology/schemas/SchemaRegistryManager.java#L83
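For comparison, the Schema Registry REST API accepts a `references` list next to the schema text when registering a version with `POST /subjects/{subject}/versions`. The sketch below only builds such a request with Python's standard library and does not send it. The helper name, the base URL, and the subject and version used for MyContext are assumptions for illustration; the real subject of a referenced schema depends on how it was registered.

```python
import json
import urllib.parse
import urllib.request

# Content type used by the Confluent Schema Registry REST API.
SR_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def registration_request(base_url, subject, schema_text, references):
    """Build (but do not send) a request registering an Avro schema with references.

    references is a list of (name, subject, version) tuples; for Avro, name is the
    fully qualified name of the referenced record (hypothetical helper).
    """
    body = {
        "schemaType": "AVRO",
        "schema": schema_text,
        "references": [
            {"name": name, "subject": ref_subject, "version": version}
            for name, ref_subject, version in references
        ],
    }
    url = f"{base_url.rstrip('/')}/subjects/{urllib.parse.quote(subject, safe='')}/versions"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": SR_CONTENT_TYPE},
        method="POST",
    )


# Hypothetical usage: MyContext assumed registered under its full name as version 1.
req = registration_request(
    "http://localhost:8081",
    "my.dev.test.my.topic.1.0-value",
    '{"type": "record", "name": "MyOrder", "namespace": "data.generated", "fields": []}',
    [("data.generated.MyContext", "data.generated.MyContext", 1)],
)
# urllib.request.urlopen(req) would send it to a running registry.
```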

purbon commented 3 years ago

related to #263

varminas commented 1 week ago

Is there any progress on this issue? At the moment I have to work around it by copying the same type into every schema that uses it. KTB can compile such a configuration but, as you know, the last definition of the type wins. This is bad practice: a type that should be defined once and referenced from other schemas has to be copy-pasted, and every change must be repeated in all of those schemas, which makes mistakes very likely.
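If each shared type lived in its own file and was referenced rather than copied, a tool would also have to register every referenced schema before the schemas that use it. A minimal sketch of that ordering, assuming the references of each local schema have already been collected; `registration_order` is a hypothetical helper built on Python's `graphlib` (3.9+).

```python
from graphlib import CycleError, TopologicalSorter


def registration_order(references_by_schema):
    """Order local schemas so that referenced types come before the schemas using them.

    references_by_schema maps each local schema's full name to the full names it
    references. Names that are not local keys are assumed to already exist in the
    registry and are left out. Raises CycleError for circular references.
    """
    local = set(references_by_schema)
    # Each node's predecessors are the local schemas it references.
    graph = {name: set(refs) & local for name, refs in references_by_schema.items()}
    return list(TopologicalSorter(graph).static_order())
```

With MyOrder referencing MyContext, MyContext is returned first, so it can be registered before MyOrder.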

This was more or less acceptable for a long time, but it became a real problem after I updated the Maven plugin "org.apache.avro:avro-compiler" to version 1.12.0: that version is stricter and no longer allows the same type to be defined multiple times. So I have to keep using an older version of the plugin to keep my schemas compatible with KTB.

varminas commented 1 week ago

Is it really necessary to change anything? It looks like Confluent already supports a new syntax for this: https://akhq.io/docs/configuration/schema-registry/schema-references.html