eclipse / microprofile-reactive-messaging

Apache License 2.0
60 stars 37 forks source link

Kafka Tombstone with Reactive Messaging #155

Open BastianSperrhacke-Otto opened 2 years ago

BastianSperrhacke-Otto commented 2 years ago

Hi there. Currently I am implementing a Kafka-Message-Publisher using mp reactive messaging. Behind the scenes I use quarkus which comes with the smallrye implementation of the spec.

Our Kafka Topic is configured to "compact" which generally means, that the most recent message related to a given ID will be kept without being deleting by the kafka system. Now I have the problem to delete the recent message which means to get rid of the last message of that ID. For this purpose the kafka Guidelines recommend to send a so called "Tombstone", which is a Message with the ID in the Metadata and a NULL-Message. And the NULL-Message is the problem: Due to the spec of the Emitter I use the message must not be null:

public interface Emitter<T> {
    /**
     * Sends a message to the channel.
     *
     * @param <M> the <em>Message</em> type
     * @param **msg the <em>Message</em> to send, must not be {@code null}**
     * @throws IllegalStateException if the channel has been cancelled or terminated or if an overflow strategy of
     *         {@link OnOverflow.Strategy#THROW_EXCEPTION THROW_EXCEPTION} or {@link OnOverflow.Strategy#BUFFER BUFFER} is
     *         configured and the emitter overflows.
     */
    <M extends Message<? extends T>> void send(M msg);

Are there any thoughts or even solutions on that?

cescoffier commented 2 years ago

You can do:

@Incoming("in")
@Outgoing("out")
public Record<String, String> process(String in) {
    return Record.of("my-key", in);
}

Same with an emitter (just use null as in). See: https://smallrye.io/smallrye-reactive-messaging/3.18.0/kafka/writing-kafka-records/#serialization

BastianSperrhacke-Otto commented 2 years ago

Hi @cescoffier . Thank you for the answer. You mean non-standard io.smallrye.reactive.messaging.kafka.Record? I try this, stay tuned.

BastianSperrhacke-Otto commented 2 years ago

Hi @cescoffier . Finally I've got it even though it was little bit tricky. It works with this snippet:


    @Inject
    @Channel(CHANNEL_NAME)
    Emitter<io.smallrye.reactive.messaging.kafka.Record<String, my.app.OutgoingMessage>> kafkaEmitter;

    public void publish(my.app.OutgoingMessage myOutgoingMessage) {

    //metadata from io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata
    Metadata metadata = Metadata.of(io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata.builder()
                    .withId(UUID.randomUUID().toString())
                    .withSpecVersion("1.0") // cloudevent Spec.
                    .withType("mytype." + EVENT_SPEC_VERSION)
                    .withTimestamp(ZonedDateTime.now())
                    .withSource(new URI("//some.live.server/entity/" + myId))
            .build()
            );

    //sending message
    kafkaEmitter.send(Message.of(Record.of(myId.toString(), myOutgoingMessage), metadata));

    //sending tombstone
    kafkaEmitter.send(Message.of(Record.of(assignmentChanged.getAssignment().getId().toString(), null), metadata));

    }

But this feels for me as a spec-loving guy like a workaround, because I have to use some vendor (smallrye) specific stuff, especially the Record class, to get a null value into my payload.

I think it would be a nice Idea to add some ability to send NULL-Payload, like kafkaEmitter.sendNullRecord(metadata)

or kafkaEmitter.sendTombstone(id, metadata)

or kafkaEmitter.send(NullMessage.of(metadata)) where NullMessage behaves like Message, but skips the Nullchecks.

How are your thoughts on this? How can we achieve this?

cescoffier commented 2 years ago

As you use MEtadata already, you can achieve the same using the OutgoingKafkaMetadata.

Note that sendNullRecord and sendTombstone would have to be:

BastianSperrhacke-Otto commented 2 years ago

What I meant is to enhance the spec MP reactive messaging in one of the upcoming releases. Of course, that is probably not only a pull request ( I have absolutetly no idea, what effort is needed for a addition to the spec). So the Idea is to find a possibility that the user can use the spec (not a vendor specific class) to send a null Record. Currently there is IMO no possibility to do that, so that I had to use the Record class from smallrye.