spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
1.01k stars 614 forks source link

@EmbeddedKafka not working properly with GlobalKTable and transformer #2319

Open sobychacko opened 2 years ago

sobychacko commented 2 years ago

@ecristobal commented on Mon Feb 07 2022

Hi guys!

In this repo I've attached a project that is not currently working. To sum up what it is intended to do, we have 2 input topics: one with incoming purchases (KStream) and one with an item catalog (GlobalKTable, materialized as a state store). The idea is, each time we receive a purchase, we enrich its content with a property stored on the KTable, and with some other operations an enriched purchase object will be returned. There are also a couple of caches implemented with Spring and Redis to cache calls to a couple of external methods (I've added the code to give you full context, but I don't think they affect tp the result of the sample).

However, I didn't find a way to make this example work. I've tried to switch from GlobalKTable to KTable, follow your documentation about state stores, transformations and so on, but it wasn't possible to me to make this work. The project also contains a sample test to reproduce the problem here.

Please, can you help us in figuring out what is happening?

Thanks and regards!

onobc commented 2 years ago

Hi @ecristobal,

I am taking a look at your example repo - thanks for providing it. When you say:

However, I didn't find a way to make this example work.

can you give me some details on what is not working? Is it the IT, or is it running the actual app? Any stacktrace you can provide?

I attempted to run the IT, and it failed. Here is what I did to get it working:

Failure 1

Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Boolean.booleanValue()" because the return value of "org.springframework.web.client.RestTemplate.getForObject(String, java.lang.Class, Object[])" is null
    at com.example.enrich.validators.MemberValidator.validate(MemberValidator.java:25) ~[classes/:na]
    at com.example.enrich.validators.MemberValidator.validate(MemberValidator.java:9) ~[classes/:na]

It is because the MemberValidator endpoint is not mocked and returns null which then NPE's. I fixed that and then ran into the next issue.

Failure 2

Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class com.example.enrich.messages.OneCatalogItem (org.apache.kafka.streams.state.ValueAndTimestamp and com.example.enrich.messages.OneCatalogItem are in unnamed module of loader 'app')
    at com.example.enrich.streams.transformers.PurchaseTransformer.lambda$transform$2(PurchaseTransformer.java:37) ~[classes/:na]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
    at java.base/java.util.stream.DistinctOps$1$2.accept(DistinctOps.java:174) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) 
...

The reason is that the state store is a TimestampedKeyValueStore which deals w/ ValueAndTimestamp<V>. Adjusting the purchaseTransformer accordingly gets us past this problem.

At this point the test passes. Please adjust your code as I did above and see if it then works as you expect.

Extra info

Not that you asked, but since I was in the code I wanted to give a few comments/suggestions. Keep what you like, discard the rest 😸

Minimize levers when debugging

When debugging problems like this I will typically remove pieces until I get it working - aka limit the number of variables in the equation. In this case I saw the following pieces to remove to simplify and get to the heart of the issue:

Testcontainers is your friend

I would consider using testcontainers for the Redis server, when/if I bring Redis into the IT equation

Use SpringBoot provided auto-configured beans

Unless you are doing this for a specific reason, I would consider using the RestTemplate provided by SpringBoot rather than construct my own in ValidatorsUtilitiesConfiguration

onobc commented 2 years ago

Here is a patch of my changes that I describe above