bakdata / fluent-kafka-streams-tests

Fluent Kafka Streams Test with Java
https://medium.com/bakdata/fluent-kafka-streams-tests-e641785171ec
MIT License

How to enable SchemaRegistryMock when using Spring Kafka + @Autowired + manual overriding of Avro Serdes? #63

Closed kensixx closed 2 years ago

kensixx commented 2 years ago

Hello, this is not an issue with the library; it's more of a request for help, because my code structure differs slightly from the example code and I can't figure it out.

I am facing an issue wherein I'm using a SpecificAvroSerde that is initialized like so:

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://my-schema-registry:8081");
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values

And I'm using @Autowired on my implementation.

So I think that during the startup of my unit test, the autowired bean is created first, and because of that it never sees the SchemaRegistryMock.
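The timing problem described here can be sketched without any Kafka or Spring classes. This is a minimal illustration, not the library's API: `registryUrl` and the config maps below are hypothetical stand-ins. An eagerly built config captures whatever URL is known at bean startup, while a deferred, Supplier-based config reads the URL only when it is actually needed, so it can still pick up a mock URL that a test sets later:

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Sketch (plain Java, hypothetical names) of eager vs. deferred serde configuration.
public class EagerVsLazyConfig {
    static String registryUrl = "http://real-registry:8081"; // value known at bean startup

    public static void main(String[] args) {
        // eager: the URL is baked in at construction time
        Map<String, String> eagerConfig =
                Collections.singletonMap("schema.registry.url", registryUrl);

        // lazy: the URL is read only when the config is actually requested
        Supplier<Map<String, String>> lazyConfig =
                () -> Collections.singletonMap("schema.registry.url", registryUrl);

        registryUrl = "http://mock-registry:1234"; // a test swaps in the mock URL later

        System.out.println(eagerConfig.get("schema.registry.url"));      // still the startup value
        System.out.println(lazyConfig.get().get("schema.registry.url")); // sees the mock URL
    }
}
```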

Example Code: My @Autowired Implementation

@Component
public class HourlyLoginCountProcessor {
    private static final String LOGIN_RAW_STREAM_TOPIC_NAME = "LOGIN_RAW_STREAM";
    private static final String LOGIN_REVERSE_GEOCODED_TOPIC_NAME = "LOGIN_REVERSE_GEOCODED";

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) throws IOException {
        KStream<String, LoginRaw> loginRawStream = streamsBuilder.stream(LOGIN_RAW_STREAM_TOPIC_NAME,
                Consumed.with(Serdes.String(), this.getLoginRawSerde()));

        // groupBy is required when aggregating; in our case only the Service name is common
        // to all records, so we use it as the key of the message
        KGroupedStream<String, LoginRaw> loginRawStreamGroupedByService = loginRawStream.groupBy(
            (key, value) -> value.getService().toString()
        );

        KTable<Windowed<String>, LoginCount> loginCountTable = loginRawStreamGroupedByService
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(3L)))
            .aggregate(
                /* initializer */
                () -> {
                    LoginCount loginCount = new LoginCount();

                    //todo on service
                    loginCount.setService("");
                    loginCount.setLoginCount(0L);
                    loginCount.setWithLocation(0L);

                    // blank placeholder to avoid runtime error
                    loginCount.setStartTime("");
                    loginCount.setEndTime("");

                    return loginCount;
                },
                /* aggregation portion */
                (key, loginRaw, loginCountAggregate) -> {
                    loginCountAggregate.setLoginCount(loginCountAggregate.getLoginCount() + 1);

                    // do an increment only if login data has latitude or longitude values.
                    if (loginRaw.getLatitude() != null) {
                        loginCountAggregate.setWithLocation(loginCountAggregate.getWithLocation() + 1);
                    }

                    String captureRate = this.calculateCaptureRate(loginCountAggregate);
                    loginCountAggregate.setCaptureRate(captureRate);

                    loginCountAggregate.setService(loginRaw.getService());

                    return loginCountAggregate;
                }
            );

        /* adding the Window startTime and endTime to the message */
        loginCountTable
            /* post a new entry ONLY at the end of the Window */
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .map((window, loginCount) -> {
                String startTime = LocalDateTime.ofInstant(window.window().startTime(), ZoneId.of("Asia/Manila")).toString();
                String endTime = LocalDateTime.ofInstant(window.window().endTime(), ZoneId.of("Asia/Manila")).toString();

                loginCount.setStartTime(startTime);
                loginCount.setEndTime(endTime);

                return KeyValue.pair(window.key(), loginCount);
            })
            .to(LOGIN_REVERSE_GEOCODED_TOPIC_NAME, Produced.keySerde(new WindowedSerdes.TimeWindowedSerde(Serdes.String())));
    }

    private String calculateCaptureRate(LoginCount aggregate) {
        // omitted for brevity 
    }

    public Serde<LoginRaw> getLoginRawSerde() {
        final Map<String, String> config = new HashMap<>();
        final Serde<LoginRaw> serde = new SpecificAvroSerde<>();

        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                this.schemaRegistryUrl); // not working but I just put this here anyway
        serde.configure(config, false);

        return serde;
    }
}

Sample: Unit Test code:

@SpringBootTest
public class HourlyLoginCountProcessorTests {
    private SchemaRegistryMock schemaRegistryMock = new SchemaRegistryMock();

    @Autowired
    HourlyLoginCountProcessor hourlyLoginCountProcessor;

    TestTopology<Object, Object> testTopology = null;

    @BeforeEach
    public void setUp() throws IOException {
        this.schemaRegistryMock.start();

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        this.hourlyLoginCountProcessor.buildPipeline(streamsBuilder);

        Topology topology = streamsBuilder.build();

        this.testTopology = new TestTopology<>(topology, this.getStreamsConfiguration("http://dummy"))
                .withSchemaRegistryMock(this.schemaRegistryMock);

        this.testTopology.start();
    }

    @Test
    void HourlyLoginCountProcessorTest() {
        LoginRaw loginRaw = this.generateLoginRawForSauyo();
        LoginRaw loginRawNullLocation = this.generateLoginRawWithNullLocation();

        // try inserting data into the LOGIN_RAW_STREAM stream / topic
        this.testTopology.input()
                .add("", loginRaw)
                .add("", loginRawNullLocation);

        this.testTopology.input()
                .at(TimeUnit.SECONDS.toMillis(1)).add("", loginRaw)
                .at(TimeUnit.SECONDS.toMillis(2)).add("", loginRawNullLocation);
//                .at(System.currentTimeMillis()).add(dummyInput); // flush KTable

        // see the output of the stream based on input
        ProducerRecord<Object, LoginCount> record =
                this.testTopology.tableOutput().withValueType(LoginCount.class).readOneRecord();

        // should use the .toString() method because the field is a CharSequence
        System.out.println("haha");
    }

    private LoginRaw generateLoginRawWithNullLocation() {
        // omitted for brevity
    }

    private LoginRaw generateLoginRawForSauyo() {
        // omitted for brevity
    }

    private Properties getStreamsConfiguration(String url) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "mock:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        // giving any value to SCHEMA_REGISTRY_URL_CONFIG will activate a mocked Schema Registry.
        // actual value is ignored
        streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url);

        return streamsConfiguration;
    }
}

Any help would be greatly appreciated, e.g. advice on how to refactor my unit test and/or my @Autowired implementation class so that they work together without runtime issues in the unit test. Thanks so much! 🙏🏻

philipp94831 commented 2 years ago

Hi @kensixx, I don't quite understand where your first code snippet is being called in the second snippet

kensixx commented 2 years ago

Hi @philipp94831, thanks for this, I updated my code above 😄

philipp94831 commented 2 years ago

Something like this should work

this.testTopology = new TestTopology<>(properties -> {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        this.hourlyLoginCountProcessor.setSchemaRegistryUrl(properties.getProperty("schema.registry.url"));
        this.hourlyLoginCountProcessor.buildPipeline(streamsBuilder);

        return streamsBuilder.build();
}, this.getStreamsConfiguration("http://dummy"));

You also don't need to provide your own SR mock. There is one embedded in the TestTopology

kensixx commented 2 years ago

Thanks for this!

I updated the unit test code as you advised, and I also added a field and setter to my implementation like so:

@Component
public class HourlyLoginCountProcessor {
    private static final String LOGIN_RAW_STREAM_TOPIC_NAME = "LOGIN_RAW_STREAM";
    private static final String LOGIN_REVERSE_GEOCODED_TOPIC_NAME = "LOGIN_REVERSE_GEOCODED";

    private String schemaRegistryUrl;

    /**
     * Setter method for property <tt>schemaRegistryUrl</tt>.
     *
     * @param schemaRegistryUrl value to be assigned to property schemaRegistryUrl
     */
    public void setSchemaRegistryUrl(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) throws IOException {
        KStream<String, LoginRaw> loginRawStream = streamsBuilder.stream(LOGIN_RAW_STREAM_TOPIC_NAME,
                // ... rest of the pipeline as shown above

but now the unit test throws a NullPointerException in this part:

public Serde<LoginRaw> getLoginRawSerde() {
    final Map<String, String> config = new HashMap<>();
    final Serde<LoginRaw> serde = new SpecificAvroSerde<>();

    config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            this.schemaRegistryUrl); // <----- right here
    serde.configure(config, false);

    return serde;
}

philipp94831 commented 2 years ago

Can you set a break point in the lambda and check the properties object? Does it contain a schema registry url?

kensixx commented 2 years ago

I set the breakpoint inside the lambda in the unit test, but I still got an NPE and the breakpoint in the lambda was never hit.

Because of that, I then tried setting a breakpoint in the serde getter in my @Autowired implementation.

That one runs first, before the unit test... maybe because I used @Autowired in my unit test??? Correct me if I'm wrong 🙏🏻

(debugger screenshot)

Anyway it's set to null inside my HourlyLoginCountProcessor 😞 certainly my Spring knowledge is messed up right now 😢

philipp94831 commented 2 years ago

I have no experience with Spring Boot. It would be really helpful to see what the properties object looks like inside the lambda, so we know which properties are passed to the topology factory. Does this mean your NPE is thrown before the lambda is even reached? In that case, something seems to go wrong with @Autowired. You could check whether the schema registry url is present and only set it in that case.

kensixx commented 2 years ago

Hello @philipp94831, for the sake of the discussion on @Autowired, I tried updating my impl code so that I basically removed the annotations such as @Component from my impl class. I also added a constructor that takes a schemaRegistryUrl string.

This way, I can instantiate my impl class inside the unit test, where the mock schema registry and everything else in this library is created first.

I was able to get positive results. 🙏🏻 But, as stated, all of my annotations are gone.

So the downside is, I guess when I deploy this I would need to bring the annotations back, and thus remove the unit test.

Unless there's some core Spring Boot knowledge that I'm really missing (I'm pretty sure there is 😅) that would let me use both the @Autowired annotation and this library at the same time (in the case of manually overriding Avro Serdes).

(Note that I was able to use this library with @Autowired in another impl class, but that one doesn't manually override Avro Serdes.)

philipp94831 commented 2 years ago

@kensixx Is this part this.hourlyLoginCountProcessor.setSchemaRegistryUrl(properties.getProperty("schema.registry.url")); reached before the NPE is fired? (with @Autowired)

kensixx commented 2 years ago

@philipp94831 It didn't :( that's why I thought to set a breakpoint in the impl class, because I suspected the @Autowired call ran first.

philipp94831 commented 2 years ago

@kensixx Then try something like this

if (this.schemaRegistryUrl != null) {
    config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl);
}

When the method is called via @Autowired, there is no NPE. When you call it explicitly, it runs with the now-configured schema registry url.
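As a self-contained illustration of this guard (plain Java, no Kafka classes; `SerdeConfigGuard` and its `SCHEMA_REGISTRY_URL_CONFIG` constant are hypothetical stand-ins for the real processor and the Confluent config key), the config entry is only added once the URL has been set, so the early Spring-driven call is harmless and the later explicit call sees the mock URL:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the null-guard suggested above, with hypothetical names.
public class SerdeConfigGuard {
    // stand-in for AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
    static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";

    private String schemaRegistryUrl; // still null during the autowired call

    public void setSchemaRegistryUrl(String url) {
        this.schemaRegistryUrl = url;
    }

    public Map<String, String> buildSerdeConfig() {
        final Map<String, String> config = new HashMap<>();
        if (this.schemaRegistryUrl != null) { // the guard: skip while not yet configured
            config.put(SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl);
        }
        return config;
    }

    public static void main(String[] args) {
        SerdeConfigGuard processor = new SerdeConfigGuard();
        // first call (as Spring would make it): URL not set yet, key simply absent
        System.out.println(processor.buildSerdeConfig().containsKey(SCHEMA_REGISTRY_URL_CONFIG));
        // second call (from the test, after injecting the mock URL)
        processor.setSchemaRegistryUrl("http://mock:8081");
        System.out.println(processor.buildSerdeConfig().get(SCHEMA_REGISTRY_URL_CONFIG));
    }
}
```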

philipp94831 commented 2 years ago

Or you could create another method wrapping your autowired buildPipeline and make it not autowired. That way you can test your code without Spring.

kensixx commented 2 years ago

I tried both of these just now and ran into some errors related to creating the Spring context on startup.

In the end, I no longer put @Autowired on the impl class inside the unit test, and I disabled the @SpringBootTest annotation.

Or you create another method of your autowired buildPipeline and make it not autowired. That way you can test your code without spring

With this, I also put another method in the impl class like so:

public void buildPipelineForUnitTestPurposeOnly(StreamsBuilder streamsBuilder) throws IOException {
    this.buildPipeline(streamsBuilder);
}

and that's the one I called in the unit test. It works as expected! Thanks @philipp94831, this is helpful 🙏🏻 I still feel there's a more elegant solution that combines all of these, but this is better than no progress at all!

I do appreciate your fast response @philipp94831. Thank you!

philipp94831 commented 2 years ago

Glad to hear that you were able to work around it. To me this sounds more like a Spring Boot issue, as the problem would not occur without it.