Implement Transformer Interface for KafkaStreams-Connector:
The KeyedScottyWindowOperator implements the Proccessor interface.
Therefore, it is only possible to call the .process(...) function from the Kafka Streams DSL, which is a terminating function.
Thus, it is not possible to do further computations on the stream.
With the new class, which implement the transformer interface, it is possible to call .transform(...) on the stream and do further computations.
Add Supplier for Processor and Transformer and update demo:
When the Kafka Streams Demo is started with more than one partition for the input topic, an exception occurs:
Exception in thread "SumDemo-a9a0abaf-52f6-4a7c-badb-64991c3fca28-StreamThread-1" java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:158)
at de.tub.dima.scotty.kafkastreamsconnector.KeyedScottyWindowOperator.process(KeyedScottyWindowOperator.java:46)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
This happens, because only one KeyedScottyWindowOperator is created.
From the docs:
The supplier should always generate a new instance each time get() gets called. Creating a single Processor object and returning the same object reference in get() would be a violation of the supplier pattern and leads to runtime exceptions.
Therefore, Suppliers are added for Processor and Transformer and are used in the demo to fix this issue.
Implement Transformer Interface for KafkaStreams-Connector: The KeyedScottyWindowOperator implements the Proccessor interface. Therefore, it is only possible to call the .process(...) function from the Kafka Streams DSL, which is a terminating function. Thus, it is not possible to do further computations on the stream. With the new class, which implement the transformer interface, it is possible to call .transform(...) on the stream and do further computations.
Add Supplier for Processor and Transformer and update demo: When the Kafka Streams Demo is started with more than one partition for the input topic, an exception occurs:
This happens, because only one
KeyedScottyWindowOperator
is created. From the docs:Therefore, Suppliers are added for Processor and Transformer and are used in the demo to fix this issue.