Closed sobychacko closed 3 years ago
One use case is to consume only messages of relevant types. The event type would be declared in a message header, event_type. This feature is similar to the condition attribute of @StreamListener.
For example, the following function should only receive messages of type projekt-aggregat-1, which is mapped to ProjektAggregatEvent in Kotlin code:
@Configuration
class ProjektStoreBuilderStreamConfiguration {
    @Bean
    fun projektStoreBuilder() = Consumer<GlobalKTable<String, ProjektAggregatEvent>> {}
}
The event types should be configurable on the binding level:
spring.cloud.stream.bindings.projektStoreBuilder-in-0.eventTypes = projekt-aggregat-1
spring.cloud.stream.bindings.otherFunction-in-0.eventTypes = projekt-aggregat-1,other-event-type-in-same-topic-1
The configurability of the event type header name is optional.
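The matching this request implies can be sketched in plain Java. This is only an illustration of the idea, not the binder's code: the class EventTypeFilter and the representation of headers as a map from header name to bytes are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring Cloud Stream: models a per-binding
// eventTypes check against the event_type header of a record.
final class EventTypeFilter {
    private final String headerName;
    private final Set<String> acceptedTypes;

    EventTypeFilter(String headerName, String eventTypesProperty) {
        this.headerName = headerName;
        // Split the comma-separated property value and drop blank entries.
        this.acceptedTypes = Arrays.stream(eventTypesProperty.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // True only when the header is present and names an accepted event type.
    boolean accepts(Map<String, byte[]> headers) {
        byte[] raw = headers.get(headerName);
        if (raw == null) {
            return false;
        }
        return acceptedTypes.contains(new String(raw, StandardCharsets.UTF_8));
    }
}
```

A filter built with new EventTypeFilter("event_type", "projekt-aggregat-1") would forward a record whose event_type header is projekt-aggregat-1 and drop records with another type or without the header.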
@andrashatvani Thinking about it a bit further, I wonder if solving it using a transformer approach is the cleaner one. See this SO thread; others have encountered this use case before. If the solutions outlined there (see the accepted answer and the additional one from the reporter) work for you, I guess that will be a compromise for this issue. What do you think?
The problem with integrating the regular routing function here is that it introduces the creation of a new front-end topic. In addition, at the moment, the binder model only supports Kafka Streams functions triggered by a Kafka topic, not through any intermediate mechanisms.
I already use this approach and just for the purpose of building a state store it's an overkill: It's way more time-consuming than defining an empty consumer and providing a property in the yaml.
Perhaps a case-specific, i.e. event-type-focused, mechanism would be the way? And instead of routing it really should be filtering: the binder is the first to see the message headers, and it's the entry point that should decide which messages to forward to the functions and which to discard or ignore.
How does this work for condition in @StreamListener? Can't it just be ported?
@andrashatvani Although the binder is the first to see the message, in the case of Kafka Streams, by the time the binder gets it, Kafka completes the deserialization and there we have a mismatch with what is registered on the function.
Are you OK with disabling nativeDecoding and then relying on the framework-provided message conversion on the inbound? If so, we might have some options. I will explore it further. Keep in mind that this approach has the downside of making your topology deeper, but not by a lot.
I got your point.
I've never enabled nativeDecoding so far, so I'm good with it. By deeper topology, do you mean that the framework would append a transformer?
nativeDecoding is enabled by default in 3.0.x and going forward in the Kafka Streams binder, so you don't need to explicitly enable it. Yes, the framework appends some transformers and some branch calls.
Actually, there might be a way to solve this use case without resorting to the framework-level message conversion. Sorry to cause more confusion, I am just thinking aloud. I will keep you posted with the design I have in mind. Do you use any custom Serde for your inbound types, or rely on binder-provided Serde inference?
You provided these above:
spring.cloud.stream.bindings.projektStoreBuilder-in-0.eventTypes = projekt-aggregat-1
spring.cloud.stream.bindings.otherFunction-in-0.eventTypes = projekt-aggregat-1,other-event-type-in-same-topic-1
How about using it at the Kafka Streams binding level instead?
spring.cloud.stream.kafka.streams.bindings.projektStoreBuilder-in-0.eventTypes = projekt-aggregat-1
spring.cloud.stream.bindings.otherFunction-in-0.eventTypes = projekt-aggregat-1,other-event-type-in-same-topic-1
How does it get mapped to the actual type (ProjektAggregatEvent) from projekt-aggregat-1?
I would expect the eventType to be specified as the actual type, ProjektAggregatEvent. No?
Also, the incoming record's headers should contain the type information, correct? That is how we can match against what is set at the binding and based on a match continue processing downstream.
JsonSerde. In the case of the one-liner state store builder I didn't even have to provide this as value.serde, and the values are still serialized to JSON.
projekt-aggregat-1 to ProjektAggregatEvent: the Jackson ObjectMapper does this automatically without further action. This applies to this one-liner case, where one event type is consumed from a topic.
The event type in the message header is projekt-aggregat-1.
The header name is event_type, but any default is fine as long as it's configurable.
data class ProjektAggregatEvent(
    ...
): BaseEvent(metaInfo) {
    override fun getEventType() = "projekt-aggregat-1"
}
abstract class BaseEvent(open val metaInfo: MetaInfo) {
    companion object {
        const val EVENT_TYPE_HEADER = "event_type"
    }

    @JsonProperty(access = JsonProperty.Access.READ_ONLY)
    abstract fun getEventType(): String

    @JsonIgnore
    abstract fun getMessageKey(): String
}
Hi @andrashatvani The suggested changes are pushed upstream. See this commit. Could you take a look at these changes and see if they meet the criteria for your use case? The changes are only available in 3.1.x (should be part of the latest snapshots).
Wow, that's awesome, I'm very impressed and looking forward to the release, I appreciate @sobychacko!
I just realized that this support only works for KStream, as KTable and GlobalKTable do not provide process, transform or branch methods. Any chance you can convert the functions that need tables as input to KStream and then call toTable internally in the business logic? I don't see a clean way to handle this for tables in the framework in the same intuitive way as with the KStream type. If you have any ideas or suggestions, a PR is welcome!
Of course I could add an additional step, but it's a repetitive action and I really like to keep the state store builder as simple as it is: just a declaration. I assumed that the framework does the stream -> table conversion and in that case it has the chance to add any arbitrary transformation step prior to it. I just would like to better understand the issue.
@andrashatvani The framework directly calls StreamsBuilder.globalTable with the correct Serde types to consume from. Thus, we don't have a way to intervene before invoking the function. Unlike KStream, GlobalKTable does not provide many ways to apply custom processing to it before calling the function.
Out of curiosity, how do you currently make the GlobalKTable scenario work for your use case?
I only use the function exactly as described in my first comment. It currently uses a single topic containing a single event type, thus in this very function no filtering is needed.
@sobychacko what's the status here?
Hi, @andrashatvani Sorry for the delay in responding to you. I was able to apply the same logic that we used for the KStream type to KTable as well. Basically, if the event_type in the Kafka record doesn't match what is configured on the application, then the record won't be forwarded to the function. However, it remains difficult to apply this same idea to GlobalKTable. The main issue is that GlobalKTable cannot be converted to a KStream (rightly so) and does not expose any methods that allow us to inspect the headers. Although the binder is the first one to see the GlobalKTable data, it simply hands that off to the function without doing any extra processing or filtering. Here is a potential workaround though.
Let's say this is your function:
@Bean
public java.util.function.Consumer<GlobalKTable<Integer, Foo>> global() {
    return gt -> {};
}
You can define a special Serde bean in your application as below:
@Bean
public JsonSerde<Foo> serde() {
    return new JsonSerde<>(new JsonSerializer<>(), new JsonDeserializer<Foo>() {
        @Override
        public Foo deserialize(String topic, byte[] data) {
            final Foo deserialized = super.deserialize(topic, data);
            // Sketch only: compare against the expected event type (left elided here).
            // if (deserialized.getEventType().equals(...))
            // { return deserialized; }
            // else { throw new IllegalStateException(); }
        }
    });
}
If you have a bean defined as above in the application, then the binder uses this Serde for any data of type Foo, and it gets the highest precedence. Essentially, while deserializing the incoming data, the overridden method above checks the event type specified on the actual data (note that this is not the one in the Kafka record header) and, if it does not match, throws an exception.
Then, at the application level, you add this binding-level property: spring.cloud.stream.kafka.streams.bindings.global-in-0.consumer.deserializationExceptionHandler: skipAndContinue. This is a new custom deserialization exception handler available from the binder that allows you to simply skip the message silently and then continue processing. This way, your function will only receive data that matched the event types defined on the incoming data.
I think this is a reasonable compromise to address this use case for GlobalKTable types. Please let me know what you think about this idea. Tomorrow, we will release 3.1.2. This will contain the KTable changes required for the event type routing and this new SkipAndContinue deserialization exception handler.
@sobychacko I appreciate your effort and that the event routing for streams and tables found its way into the most recent release. For global tables, however, creating a deserializer for every event type defeats the purpose of the one-liner store builder. In one of my recent microservices, 5 event types need to be aggregated, so this means 5 store builders and 5 deserializers. In such a case it's perhaps better to just write the aggregator consumers. Furthermore, throwing exceptions is very costly, and doing it for every unwanted event type could mean a serious performance impact. In the case of my 5 event types this would mean 4 exceptions for every event accepted. Another implication or risk would be the dilution or hiding of real deserialization issues.
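The exception count mentioned above can be illustrated with a small plain-Java model. It is not binder code: SkipAndContinueModel, its methods, and the assumption that the five event types arrive in equal numbers are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not binder code) of "throw in the deserializer,
// skip the record in the exception handler".
final class SkipAndContinueModel {
    int skipped = 0; // number of exceptions raised and swallowed

    // Stand-in for the overridden deserialize(): rejects foreign event types by throwing.
    private static String deserialize(String eventType, String wanted) {
        if (!eventType.equals(wanted)) {
            throw new IllegalStateException("unexpected event type: " + eventType);
        }
        return eventType;
    }

    // Stand-in for one store builder reading a topic shared by several event types.
    List<String> consume(List<String> topic, String wanted) {
        List<String> accepted = new ArrayList<>();
        for (String eventType : topic) {
            try {
                accepted.add(deserialize(eventType, wanted));
            } catch (IllegalStateException e) {
                skipped++; // skip the record and continue with the next one
            }
        }
        return accepted;
    }
}
```

With one record of each of five types on the topic, a builder that wants a single type accepts one record and swallows four exceptions, and each of the five builders pays that cost separately.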
@andrashatvani Ya, the workaround I proposed has trade-offs. I am still brainstorming ideas on how we can address the GlobalKTable scenario. The GlobalKTable API does not provide many options. I started an SO thread; hopefully there is a way.
@andrashatvani Based on the suggestions on that SO thread, I don't think there is a clean way to achieve what you are looking for with a one-line function such as Consumer<GlobalKTable<String, ProjektAggregatEvent>>. However, I have a few questions about this use case. I see that your implementation is empty and thus assume that all you are interested in is building a global state store that can be queried later in the application (maybe through an interactive query). If that is indeed the use case and you are not planning to use the resultant GlobalKTable for other join operations downstream with a KStream, then I think what is suggested by Matthias on the SO thread will work, given we make some necessary changes inside the binder. We may have to introduce a few new properties (something along the lines of spring.cloud.stream...globalStateStore). If the event routing properties we already introduced are present along with the new property, then we introduce a processor when retrieving records into the global state store. This way, you don't even need to write that single-line function, but rather provide a new configuration. What do you think about that?
@sobychacko Yes, the sole reason for these consumer functions is the building of state stores for later access via interactive queries. Building state stores merely by configuration with event type selection/routing would of course be even more awesome, so yes please!
@andrashatvani The implementation for the GlobalKTable event routing is trickier than I thought. Here is the gist of the issue. While the application is running, we can provide a way in the binder to route events based on some strategy. However, when there is a restore operation, Kafka Streams completely bypasses the custom processor we provided when addGlobalStore was called and instead sources the data directly from the input topic, thus putting all the records we previously skipped into the state store. This clearly has implications from a business logic perspective. This StackOverflow thread has more details on this.
This brings back the discussion on how to achieve this use case in the correct way. I don't think it is a good idea to implement the solution as is, given that it invalidates the state store if the application crashes or restarts. If you are open to the idea of introducing an extra topic that is the source for the global state store, here is an idea.
1. A function with a KStream or KTable input to consume records from the original topic. All the function does is write to another topic: Function<KStream<?, ?>, KStream<?, ?>>.
2. A function with a GlobalKTable binding: Consumer<GlobalKTable<?, ?>>. For this, you use the output from the above function (the new extra topic) as the input.
The tradeoff obviously is that you now have to create and maintain an additional topic, but the data in the state store will be the correct set, which I think is more important. What do you think?
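Why the extra topic keeps the store correct can be sketched with a plain-Java model. It is not a Kafka Streams topology: FilteredTopicModel, its Event shape, and the lists standing in for topics are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two-step idea above; not a Kafka Streams topology.
final class FilteredTopicModel {

    // One record on a topic (hypothetical shape for illustration).
    static final class Event {
        final String key;
        final String value;
        final String eventType;

        Event(String key, String value, String eventType) {
            this.key = key;
            this.value = value;
            this.eventType = eventType;
        }
    }

    // Step 1: the stream function, modelled as copying matching records to a new topic.
    static List<Event> filterToNewTopic(List<Event> originalTopic, String wantedType) {
        List<Event> newTopic = new ArrayList<>();
        for (Event e : originalTopic) {
            if (e.eventType.equals(wantedType)) {
                newTopic.add(e);
            }
        }
        return newTopic;
    }

    // Step 2: the global store, modelled as a map built from whatever its source
    // topic holds. A restore reads the same topic, so it calls the same method.
    static Map<String, String> buildStore(List<Event> sourceTopic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Event e : sourceTopic) {
            store.put(e.key, e.value);
        }
        return store;
    }
}
```

Because the store's source topic already contains only the wanted event type, rebuilding the store from it after a restart gives the same contents as the first build.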
@andrashatvani Any thoughts on the above? The way you were thinking of building global stores is a problem even if you are not involving the binder, since it is by design from the ground up in Kafka Streams (at least at the moment). This is because each time you restart the application, the global state store is going to be restored directly from the source topic, thus completely short-circuiting any custom logic in the application. I would appreciate it if you could chime in and provide your thoughts, so that we can decide on how to proceed.
@sobychacko I wouldn't duplicate data just to spare a few lines of code. However, in my current project our own services are the consumers and the producers, too. Thus I've decided that when importing data every event type goes into its own topic thereby allowing for the one-liner state store builders. The application restart is only an issue if the streams data won't be kept in a permanent location. The aforementioned services keep their data in k8s PVCs allowing for the continuation of work where it was left.
Ya, as long as each event type goes into its own topic, the one-liner state store builders should work. Basically, in that case there is no event type routing; rather, everything from the topic will be read. It's a bummer that we couldn't implement it for GlobalKTable, as it is a restriction from Kafka Streams itself. I see that there are some ongoing discussions there to improve that behavior. Once it is possible to do so in Kafka Streams, we can revisit this use case. In the meantime, please feel free to close this issue if you agree.
@andrashatvani What are your thoughts on closing this issue? Maybe we can create a separate issue for addressing this use case for GlobalKTable once the support is available in Kafka Streams?
OK, sounds good. Thanks for your effort and support!
Thank you, closing the issue now. Feel free to ping here or create a new issue if you find a solution to address the problem or if it is available to do so in Kafka Streams.
@sobychacko The topic and the headers will be omitted when using event_type-based routing - why? On lines https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/63b306d34c2c0a84267e0842f15d0d10b8ed17d2/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java#L457 and https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/63b306d34c2c0a84267e0842f15d0d10b8ed17d2/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java#L559
Thus, it's currently not possible to use event type routing and method-based json target type determination based on topic or header data.
@andrashatvani That is a bug. If you don't mind, could you create a new issue and link to the comment above? I will try to fix it then.
As a developer, I would like to add a routing function in my Kafka Streams application so that a routing expression can be used to select the correct Kafka Streams processor to route to. For more information on the context of this request, see this gitter chat.