IBMStreams / streamsx.kafka

Repository for integration with Apache Kafka
https://ibmstreams.github.io/streamsx.kafka/
Apache License 2.0

KafkaConsumer: dynamic startPosition parameter #213

Closed tenbarge closed 4 years ago

tenbarge commented 4 years ago

I'd like to create a composite which allows the KafkaConsumer's startPosition to be chosen based on a submission time value.

The example KafkaStartOffsetSample.spl#L24 uses a literal value that maps to one of the StartPosition enum values, but when I try to use the rstring returned from a submission time value instead, the build fails:

CDISP0048E ERROR: The startPosition operator parameter requires values of the enum{Beginning,End,Default,Time,Offset} type, but values of the rstring type are specified.

I also tried to recreate the enum in SPL but that will not build either.

CDISP0088E ERROR: The (enum{Beginning,End,Default,Time,Offset})(getSubmissionTimeValue("kafkaStartPosition", "Default")) value of the startPosition parameter is not a custom literal. The operator model for the TEST_KAFKA operator expects a value of the custom literal type.

With AbstractKafkaConsumerOperator.setStartPosition expecting a custom literal type with a value from the enum, is there a way I can pass a dynamic value for the startPosition param?

I did test a modification of AbstractKafkaConsumerOperator.setStartPosition in which it takes a String and converts it to the StartPosition enum value. This works for me, but it breaks passing in the custom literal type. As one last test, I tried to define two setStartPosition methods, one expecting a String and one expecting a StartPosition, but the build picks the second one defined instead of supporting both.
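For readers following along, the String-to-enum conversion described above can be sketched roughly as below. This is only an illustration, not the toolkit's code: the class StartPositionParser and its parse method are hypothetical names, the enum simply mirrors the values listed in the CDISP0048E message, and accepting any letter case is an assumption made for convenience.

```java
import java.util.Arrays;

public class StartPositionParser {
    // Mirrors the enum values named in the compiler error message;
    // this is a stand-in, not the toolkit's own StartPosition type.
    public enum StartPosition { Beginning, End, Default, Time, Offset }

    // Hypothetical helper: converts a submission-time string into an enum value.
    // Matching is case-insensitive and ignores surrounding whitespace (an assumption).
    public static StartPosition parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("startPosition must not be null");
        }
        String trimmed = value.trim();
        for (StartPosition p : StartPosition.values()) {
            if (p.name().equalsIgnoreCase(trimmed)) {
                return p;
            }
        }
        // An invalid value can only be reported here, at runtime.
        throw new IllegalArgumentException("Invalid startPosition '" + value
                + "'; expected one of " + Arrays.toString(StartPosition.values()));
    }

    public static void main(String[] args) {
        System.out.println(parse("Beginning"));
    }
}
```

A value such as "Begin" is rejected with an IllegalArgumentException when the method runs, not when the application is compiled.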

I need to support Beginning, End, Default and Time, so the only alternative I can think of is to build four jobs, one for each startPosition value we may want to run. That is less convenient for operations than a single job that accepts a submission time value.

ghost commented 4 years ago

It's a known limitation that operator parameters of a custom literal type cannot be set from submission time values, at least for Java operators.

I'm going to add a new parameter, startPositionStr, which takes an rstring expression. That's the only solution. The disadvantage is that invalid startPosition values cannot be detected before the PE launches, i.e. only at runtime. An illegal custom literal, for example startPosition: Begin, would be caught by the Streams compiler.

Please note: if you want to select the startPosition at job submission and also want Time as an option, you must also add the startTime parameter and assign it from a submission time value. The other start positions ignore the startTime parameter.
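The interaction between the two parameters can be pictured with the rough sketch below. It is not taken from the toolkit: StartConfigCheck and effectiveStartTime are hypothetical names, and treating the start time as a nullable Long in milliseconds is an assumption made only for illustration.

```java
public class StartConfigCheck {
    // Stand-in for the start position values discussed in this issue.
    public enum StartPosition { Beginning, End, Default, Time, Offset }

    // Hypothetical helper: returns the start time that would actually be used,
    // or null when the chosen position does not look at the start time.
    public static Long effectiveStartTime(StartPosition position, Long startTimeMillis) {
        if (position == StartPosition.Time) {
            if (startTimeMillis == null) {
                // Time was selected but no start time was supplied.
                throw new IllegalArgumentException("startPosition Time requires startTime");
            }
            return startTimeMillis;
        }
        // Every other start position ignores the start time, even if one is given.
        return null;
    }

    public static void main(String[] args) {
        System.out.println(effectiveStartTime(StartPosition.Time, 1000L));
        System.out.println(effectiveStartTime(StartPosition.End, 1000L));
    }
}
```

In this sketch a start time can always be supplied at submission; it only takes effect when Time is selected.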

ghost commented 4 years ago

Resolved with toolkit release v3.0.1.

tenbarge commented 4 years ago

Perfect! Thank you, @RolefH. I really appreciate how quickly you resolved this.

ghost commented 4 years ago

A sample is now here: https://github.com/IBMStreams/streamsx.kafka/tree/develop/samples/KafkaConsumerVariableStartPositionSample