factorhouse / kpow-streams-agent

Monitor Kafka Streams applications with Kpow
Apache License 2.0

Multiple StreamsRegistry in same application #3

Open AntoineDuComptoirDesPharmacies opened 2 years ago

AntoineDuComptoirDesPharmacies commented 2 years ago

Hi,

We are currently using StreamsRegistry to monitor a KafkaStreams instance in our application. We would like to add a second KafkaStreams instance to the same application and, for separation of concerns, we tried to create a new StreamsRegistry for this second stream. However, the registry crashes on instantiation with the following error:

application-akka.actor.default-dispatcher-19 - [error] - akka.dispatch.TaskInvocation - Attempting to call unbound fn: #'io.operatr.kpow.agent/init-registry
java.lang.IllegalStateException: Attempting to call unbound fn: #'io.operatr.kpow.agent/init-registry
        at clojure.lang.Var$Unbound.throwArity(Var.java:45)
        at clojure.lang.AFn.invoke(AFn.java:32)
        at clojure.lang.Var.invoke(Var.java:384)
        at io.operatr.kpow.StreamsRegistry.<init>(StreamsRegistry.java:83)
        at events.KafkaStreamEventProcessor.run(KafkaStreamEventProcessor.java:176)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[ERROR] [08/11/2022 07:55:20.654] [application-akka.actor.kafka-consumers-dispatcher-15] [TaskInvocation] Attempting to call unbound fn: #'io.operatr.kpow.agent/init-registry
java.lang.IllegalStateException: Attempting to call unbound fn: #'io.operatr.kpow.agent/init-registry
        at clojure.lang.Var$Unbound.throwArity(Var.java:45)
        at clojure.lang.AFn.invoke(AFn.java:32)
        at clojure.lang.Var.invoke(Var.java:384)
        at io.operatr.kpow.StreamsRegistry.<init>(StreamsRegistry.java:83)
        at events.KafkaStreamEventProcessor.run(KafkaStreamEventProcessor.java:176)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

We are wondering whether StreamsRegistry is meant to be instantiated only once per application, or whether we made a mistake while declaring or using it?

Thanks in advance for your answer! Yours faithfully, LCDP

wavejumper commented 2 years ago

Hey there @AntoineDuComptoirDesPharmacies,

That seems like a bug to me! You should be able to have multiple instances of the streams agent configured.

There are two patterns for using the streams agent:


Pattern 1: using a single StreamsRegistry instance (recommended)

You should be able to use a single StreamsRegistry instance and register multiple streams against it (example below). This has the advantage of using fewer resources (threads, Kafka producers, etc.).

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import io.operatr.kpow.StreamsRegistry;

// Your Kafka Streams topologies
Topology topologyA = createMyTopology();
Topology topologyB = createMyOtherTopology();

// Your Kafka Streams config
Properties props = createMyStreamProperties();

// Your Kafka Streams instances, one per topology
KafkaStreams streamsA = new KafkaStreams(topologyA, props);
KafkaStreams streamsB = new KafkaStreams(topologyB, props);

// Create a single Kpow StreamsRegistry
StreamsRegistry registry = new StreamsRegistry(props);

// Register each KafkaStreams and Topology pair with the same StreamsRegistry
registry.register(streamsA, topologyA);
registry.register(streamsB, topologyB);

// Start your Kafka Streams applications
streamsA.start();
streamsB.start();

Pattern 2: creating multiple StreamsRegistry instances

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import io.operatr.kpow.StreamsRegistry;

// Your Kafka Streams topologies
Topology topologyA = createMyTopology();
Topology topologyB = createMyOtherTopology();

// Your Kafka Streams config
Properties props = createMyStreamProperties();

// Your Kafka Streams instances, one per topology
KafkaStreams streamsA = new KafkaStreams(topologyA, props);
KafkaStreams streamsB = new KafkaStreams(topologyB, props);

// Create one Kpow StreamsRegistry per Kafka Streams instance
StreamsRegistry registryA = new StreamsRegistry(props);
StreamsRegistry registryB = new StreamsRegistry(props);

// Register each KafkaStreams and Topology pair with its own StreamsRegistry
registryA.register(streamsA, topologyA);
registryB.register(streamsB, topologyB);

// Start your Kafka Streams applications
streamsA.start();
streamsB.start();

This pattern should also be supported, but it appears to be causing a runtime exception for you. We will try to reproduce it and get a fix to you.

In the meantime, if you can reuse a single instance of StreamsRegistry, that would be the preferred way of using the streams agent (for the resource reasons outlined under Pattern 1).
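If the two streams are created in separate parts of the application, one way to keep them on a single registry is to hand out one lazily created instance from a small holder. The following is only a rough sketch under assumptions: `SharedInstance` and `FakeRegistry` are hypothetical names invented for illustration (they are not part of the streams agent), and `FakeRegistry` stands in for `StreamsRegistry` so that the snippet compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for io.operatr.kpow.StreamsRegistry (assumption, illustration only).
final class FakeRegistry {
    final List<String> registered = new ArrayList<>();

    // The real register method takes a KafkaStreams and a Topology; a name is used here instead.
    void register(String streamsName) {
        registered.add(streamsName);
    }
}

// Hypothetical holder: creates the instance on first use and returns the same one afterwards.
final class SharedInstance<T> {
    private final Supplier<T> factory;
    private T instance;

    SharedInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized so that concurrent callers cannot create two instances
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }
}

class SharedRegistryDemo {
    public static void main(String[] args) {
        SharedInstance<FakeRegistry> shared = new SharedInstance<>(FakeRegistry::new);

        // Two independent components register against the same underlying registry.
        shared.get().register("streamsA");
        shared.get().register("streamsB");

        System.out.println(shared.get().registered);
    }
}
```

In a real application the factory would presumably be something like `() -> new StreamsRegistry(props)`, with each component calling `get().register(streams, topology)` for its own stream.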