AxonFramework / extension-kafka

Axon Framework extension for Kafka integration to publish and handle Event messages.
https://axoniq.io/
Apache License 2.0

Add ability to customize Kafka Event Publisher supportsReset flag via properties #487

Closed aupodogov closed 9 months ago

aupodogov commented 10 months ago

By default, the Kafka Event Publisher doesn't support reset, and this is not configurable. This PR adds the ability to customise it via the builder and Spring Boot configuration properties.

gklijs commented 10 months ago

Hi Podogov, can you explain the use case? Kafka cannot be reset. If a reset were possible, we would remove all the messages that have already been sent. But this is not possible.

aupodogov commented 10 months ago

> Hi Podogov, can you explain the use case? Kafka cannot be reset. If a reset were possible, we would remove all the messages that have already been sent. But this is not possible.

Hi @gklijs, we want all events that are already present in the Axon Server event store to be exported to Kafka when the app with the extension runs for the first time. Currently it starts with a ReplayToken and does not produce messages to Kafka until the replay is finished.

The EventMessageHandler.supportsReset method means that this event handler supports reset and replay. Is this correct?

gklijs commented 10 months ago

@aupodogov If you configure it with axon.kafka.producer.event-processor-mode=tracking or axon.kafka.producer.event-processor-mode=pooled_streaming I would expect it to start at the beginning, since the default token will be created at the tail (in Axon, the tail is the oldest event in the stream).

The reset this PR refers to is the reset of a projection. A typical event handler stores data in a database and, if a reset is implemented, the reset throws away the old data to prepare for reading the events again.
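To make the projection reset described above concrete, here is a minimal plain-Java sketch. It does not use the Axon API: `CountProjection`, `on`, `reset`, and `resetAndReplay` are hypothetical names, and the in-memory map stands in for the database.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only; none of these types come from Axon Framework.
final class ProjectionResetSketch {

    /** Hypothetical projection: counts events per type in an in-memory "database". */
    static final class CountProjection {
        final Map<String, Integer> counts = new TreeMap<>();

        /** Event handler: updates the derived data. */
        void on(String eventType) {
            counts.merge(eventType, 1, Integer::sum);
        }

        /** Reset: throws away the derived data so a replay can rebuild it. */
        void reset() {
            counts.clear();
        }
    }

    /** Resets the projection, then feeds it the whole event log again. */
    static void resetAndReplay(CountProjection projection, List<String> eventLog) {
        projection.reset();
        for (String eventType : eventLog) {
            projection.on(eventType);
        }
    }

    public static void main(String[] args) {
        List<String> eventLog = Arrays.asList("OrderPlaced", "OrderPlaced", "OrderShipped");
        CountProjection projection = new CountProjection();
        for (String eventType : eventLog) {
            projection.on(eventType);
        }
        // Because reset() cleared the map first, the replay does not double the counts.
        resetAndReplay(projection, eventLog);
        System.out.println(projection.counts); // {OrderPlaced=2, OrderShipped=1}
    }
}
```

A Kafka publisher has no equivalent of `reset()`: messages that were already sent cannot be cleared, so replaying into it would produce duplicates rather than a rebuilt state.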

aupodogov commented 10 months ago

> @aupodogov If you configure it with axon.kafka.producer.event-processor-mode=tracking or axon.kafka.producer.event-processor-mode=pooled_streaming I would expect it to start at the beginning, since the default token will be created at the tail.

@gklijs that's right. But a ReplayToken is created, and during the replay the events are not published to Kafka. Is there a way to force publishing events during a replay?

gklijs commented 10 months ago

I see the problem now, but I don't think this PR is a good solution, as sending to Kafka is not replayable. In 4.9.0 we changed the default token to be a replay token, together with:

    private boolean canHandle(EventHandlerInvoker invoker, EventMessage<?> eventMessage, Segment segment) {
        return (invoker.supportsReset() || !ReplayToken.isReplay(eventMessage))
                && invoker.canHandle(eventMessage, segment);
    }

in the MultiEventHandlerInvoker, the events are not sent to Kafka. I think the better solution is to change the initial token on the event processor to a tail token.
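The effect of that condition can be reproduced in a small self-contained model. This is only a sketch under stated assumptions: `Event`, `Invoker`, `RecordingPublisher`, and `deliver` are hypothetical stand-ins, not the Axon types, and the `replay` flag stands in for `ReplayToken.isReplay(eventMessage)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for the types named in the thread; not the real Axon API.
final class ReplayFilterSketch {

    /** Hypothetical event: a payload plus a flag marking delivery as part of a replay. */
    static final class Event {
        final String payload;
        final boolean replay;

        Event(String payload, boolean replay) {
            this.payload = payload;
            this.replay = replay;
        }
    }

    /** Hypothetical invoker exposing the two questions the filter asks. */
    interface Invoker {
        boolean supportsReset();

        boolean canHandle(Event event);
    }

    /** Same shape as the quoted condition: replayed events only reach invokers that support reset. */
    static boolean canHandle(Invoker invoker, Event event) {
        return (invoker.supportsReset() || !event.replay) && invoker.canHandle(event);
    }

    /** Publisher-like invoker that records what it would have sent. */
    static final class RecordingPublisher implements Invoker {
        final boolean supportsReset;
        final List<String> sent = new ArrayList<>();

        RecordingPublisher(boolean supportsReset) {
            this.supportsReset = supportsReset;
        }

        @Override
        public boolean supportsReset() {
            return supportsReset;
        }

        @Override
        public boolean canHandle(Event event) {
            return true; // accepts every event type
        }

        void handle(Event event) {
            sent.add(event.payload);
        }
    }

    /** Runs the events through the filter and returns the payloads that were sent. */
    static List<String> deliver(boolean supportsReset, List<Event> events) {
        RecordingPublisher publisher = new RecordingPublisher(supportsReset);
        for (Event event : events) {
            if (canHandle(publisher, event)) {
                publisher.handle(event);
            }
        }
        return publisher.sent;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("e1", true), new Event("e2", true), new Event("e3", false));
        System.out.println(deliver(false, events)); // [e3]
        System.out.println(deliver(true, events));  // [e1, e2, e3]
    }
}
```

With `supportsReset` false, as in the publisher, the two replayed events are skipped. Starting the processor from a plain tail token instead of a replay token would mean no event is flagged as a replay, so nothing is filtered out.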

aupodogov commented 9 months ago

Thanks for your clarifications @gklijs! I think this PR can be closed without merge.