streamthoughts / jikkou

The Open source Resource as Code framework for Apache Kafka. Jikkou helps you implement GitOps for Kafka at scale!
https://www.jikkou.io/
Apache License 2.0

BUG: CRUD a topic with a schema, using a large schema registry, fails both schema & topic management #498

Open · JohnPreston opened this issue 3 days ago

JohnPreston commented 3 days ago

Describe the bug

When I define both a topic and a schema in my resource file, and the schema registry I use has thousands of subjects, the commands (A) take a very long time and (B) fail with an NPE during topic management.

Additional info: With DEBUG logging enabled, I observed that the client seems to list all the subjects and retrieve all the schemas, every single time. I am not sure why. Schema Registry (SR) management should only need to retrieve the existing subject and its versions: if the subject is not found, create the first version; if it is found, register a new version with the new schema definition after checking compatibility, and possibly set the compatibility level on the subject. None of that requires pulling all the schemas.
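The per-subject flow described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Jikkou's implementation: the `SubjectRegistry` interface and its method names are hypothetical stand-ins for the Confluent Schema Registry REST endpoints noted in the comments, the schema strings are placeholders, and an in-memory fake replaces the real registry. The point is that the number of registry calls depends only on the subject being reconciled, not on how many subjects the registry holds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SubjectReconcileSketch {

    // Hypothetical abstraction over the per-subject Schema Registry calls.
    interface SubjectRegistry {
        Optional<String> latestSchema(String subject);       // e.g. GET /subjects/{subject}/versions/latest
        boolean isCompatible(String subject, String schema); // e.g. POST /compatibility/subjects/{subject}/versions/latest
        int register(String subject, String schema);         // e.g. POST /subjects/{subject}/versions
        void setCompatibility(String subject, String level); // e.g. PUT /config/{subject}
    }

    // Reconciles a single subject without listing or downloading any other subject.
    static String reconcile(SubjectRegistry registry, String subject, String schema, String compatibility) {
        Optional<String> latest = registry.latestSchema(subject);
        String result;
        if (latest.isEmpty()) {
            result = "CREATED v" + registry.register(subject, schema); // first version
        } else if (latest.get().equals(schema)) {
            // Naive textual comparison; a real client would compare normalized schemas.
            result = "UNCHANGED";
        } else if (registry.isCompatible(subject, schema)) {
            result = "UPDATED v" + registry.register(subject, schema); // new version
        } else {
            return "INCOMPATIBLE"; // leave the subject untouched
        }
        if (compatibility != null) {
            registry.setCompatibility(subject, compatibility);
        }
        return result;
    }

    // In-memory fake registry that counts every call made to it.
    static class FakeRegistry implements SubjectRegistry {
        final Map<String, List<String>> versions = new HashMap<>();
        final Map<String, String> compatibility = new HashMap<>();
        boolean acceptAll = true; // answer returned by isCompatible
        int calls = 0;

        @Override public Optional<String> latestSchema(String subject) {
            calls++;
            List<String> v = versions.get(subject);
            return (v == null || v.isEmpty()) ? Optional.empty() : Optional.of(v.get(v.size() - 1));
        }
        @Override public boolean isCompatible(String subject, String schema) {
            calls++;
            return acceptAll;
        }
        @Override public int register(String subject, String schema) {
            calls++;
            List<String> v = versions.computeIfAbsent(subject, k -> new ArrayList<>());
            v.add(schema);
            return v.size();
        }
        @Override public void setCompatibility(String subject, String level) {
            calls++;
            compatibility.put(subject, level);
        }
    }

    public static void main(String[] args) {
        FakeRegistry registry = new FakeRegistry();
        // Thousands of unrelated subjects that the per-subject flow never touches.
        for (int i = 0; i < 5000; i++) {
            registry.versions.put("other-subject-" + i + "-value", new ArrayList<>(List.of("{}")));
        }
        String subject = "my-topic-name.v1-value";
        System.out.println(reconcile(registry, subject, "schema-A", "FULL_TRANSITIVE")); // CREATED v1
        System.out.println(reconcile(registry, subject, "schema-A", "FULL_TRANSITIVE")); // UNCHANGED
        System.out.println(reconcile(registry, subject, "schema-B", "FULL_TRANSITIVE")); // UPDATED v2
        System.out.println("registry calls: " + registry.calls); // registry calls: 9
    }
}
```

Nine calls cover create, no-op, and update for one subject, regardless of the 5,000 other subjects in the fake registry.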

To Reproduce

1. Create/update a topic on a cluster with a small (empty or few subjects) schema registry -> works.
2. Do the same with a schema registry that has thousands of subjects -> fails with the NPE below.
3. Remove/comment out the schema from the resource file -> topic management works fine.

Expected behavior

My topic is created/updated, and the schema operations succeed as well.

Screenshots/Configs

---
apiVersion: kafka.jikkou.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic-name.v1
  labels: {}
  annotations: {}
spec:
  partitions: 1
  replicas: 3
  configs:
    min.insync.replicas: 2

#---
#apiVersion: schemaregistry.jikkou.io/v1beta2
#kind: SchemaRegistrySubject
#metadata:
#  name: my-topic-name.v1-value
#  labels:
#  annotations:
#    schemaregistry.jikkou.io/normalize-schema: true
#spec:
#  compatibilityLevel: FULL_TRANSITIVE
#  schemaType: AVRO
#  schema:
#    $ref: ./schemas/user-test.avsc

jikkou apply -f topics/private.yaml
java.lang.NullPointerException
        at io.streamthoughts.jikkou.kafka.change.topics.CreateTopicChangeHandler.toNewTopic(CreateTopicChangeHandler.java:118)
        at java.base@21.0.2/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at java.base@21.0.2/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
        at java.base@21.0.2/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base@21.0.2/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base@21.0.2/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base@21.0.2/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base@21.0.2/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at io.streamthoughts.jikkou.kafka.change.topics.CreateTopicChangeHandler.handleChanges(CreateTopicChangeHandler.java:74)
        at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.execute(DefaultChangeExecutor.java:104)
        at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.lambda$execute$3(DefaultChangeExecutor.java:97)
        at java.base@21.0.2/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)
        at java.base@21.0.2/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
        at java.base@21.0.2/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base@21.0.2/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base@21.0.2/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base@21.0.2/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base@21.0.2/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.execute(DefaultChangeExecutor.java:99)
        at io.streamthoughts.jikkou.core.reconciler.DefaultChangeExecutor.applyChanges(DefaultChangeExecutor.java:72)
        at io.streamthoughts.jikkou.kafka.reconciler.AdminClientKafkaTopicController.execute(AdminClientKafkaTopicController.java:104)
        at io.streamthoughts.jikkou.core.reconciler.Reconciler.apply(Reconciler.java:57)
        at io.streamthoughts.jikkou.core.DefaultApi.patch(DefaultApi.java:426)
        at io.streamthoughts.jikkou.core.DefaultApi.reconcile(DefaultApi.java:396)
        at io.streamthoughts.jikkou.client.command.reconcile.BaseResourceCommand.call(BaseResourceCommand.java:54)
        at io.streamthoughts.jikkou.client.command.reconcile.BaseResourceCommand.call(BaseResourceCommand.java:30)
        at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
        at picocli.CommandLine.access$1500(CommandLine.java:148)
        at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
        at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
        at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
        at io.streamthoughts.jikkou.client.Jikkou.executionStrategy(Jikkou.java:150)
        at picocli.CommandLine.execute(CommandLine.java:2174)
        at io.streamthoughts.jikkou.client.Jikkou.execute(Jikkou.java:140)
        at io.streamthoughts.jikkou.client.Jikkou.main(Jikkou.java:128)
        at java.base@21.0.2/java.lang.invoke.LambdaForm$DMH/sa346b79c.invokeStaticInit(LambdaForm$DMH)

Error: NullPointerException: null

The configuration is as you'd expect: SASL_SSL/PLAIN for Kafka, and the schema registry pointed at the Confluent Cloud URL with basic auth.

Runtime environment

JohnPreston commented 3 days ago

PS: If I split the schema and the topic into separate files, Jikkou handles the schema fine, although it takes a very long time to complete. :shrug:

EDIT: My guess is that Jikkou fetches all subjects and schemas to compute the diff. It could probably fetch only the subjects named in the resource file instead of doing a bulk retrieval.
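A targeted fetch along the lines suggested above might look like the following sketch. It is a hedged illustration, not Jikkou's code: `fetchDeclared` and the `fetchLatest` lookup are hypothetical, and in practice the lookup would be one per-subject request (for example `GET /subjects/{subject}/versions/latest`) instead of listing every subject and downloading every schema. The subject name `my-topic-name.v1-value` is taken from the resource file above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class TargetedSubjectFetch {

    // Builds the "actual state" for the diff from the declared subjects only.
    // fetchLatest is a hypothetical per-subject lookup; it returns Optional.empty()
    // when the subject does not exist yet. Duplicate names are fetched only once.
    static Map<String, Optional<String>> fetchDeclared(List<String> declaredSubjects,
                                                       Function<String, Optional<String>> fetchLatest) {
        Map<String, Optional<String>> actual = new LinkedHashMap<>();
        for (String subject : declaredSubjects) {
            actual.computeIfAbsent(subject, fetchLatest);
        }
        return actual;
    }

    public static void main(String[] args) {
        // Simulated registry with thousands of subjects unrelated to the resource file.
        Map<String, String> registry = new HashMap<>();
        for (int i = 0; i < 5000; i++) {
            registry.put("other-subject-" + i + "-value", "{}");
        }
        int[] lookups = {0};
        Function<String, Optional<String>> fetchLatest = subject -> {
            lookups[0]++; // one request per declared subject, independent of registry size
            return Optional.ofNullable(registry.get(subject));
        };
        Map<String, Optional<String>> actual =
                fetchDeclared(List.of("my-topic-name.v1-value"), fetchLatest);
        System.out.println(actual);                   // {my-topic-name.v1-value=Optional.empty}
        System.out.println("lookups: " + lookups[0]); // lookups: 1
    }
}
```

With this shape, the cost of computing the diff scales with the number of subjects in the resource file rather than with the size of the registry.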