smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0
239 stars 177 forks source link

KafkaProducer serializer broken when using PublisherBuilder to convert to Message #1340

Closed kabir closed 3 years ago

kabir commented 3 years ago

I noticed when updating the WildFly quickstart that the KafkaProducer gets confused about what the payload is when PublisherBuilder.map() has been used to create a message from a stream of the payloads.

I have modified the SR RM quickstart to showcase the problem here. https://github.com/smallrye/smallrye-reactive-messaging/compare/main...kabir:publisher-with-message-problem?expand=1#diff-db6074d3260024cab0f3ac87e05a6b1f4c5889f07e181960b2d08fbc1ab4d26c (the first commit is me deleting a bunch of stuff to avoid wiring errors, and adjusting the class path to be able to run it)

Essentially I have changed

    @Outgoing("prices-out")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every 8 seconds
        return Multi.createFrom().ticks().every(Duration.ofSeconds(8))
                .map(x -> Message.of(random.nextDouble()));
    }

to

    @Outgoing("one")
    public Multi<Message<Double>> source() {
        // Build an infinite stream of random prices
        // It emits a price every 8 seconds
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .map(x -> Message.of(random.nextDouble()));
    }

    @Incoming("one")
    @Outgoing("prices-out")
    public Publisher<Message<Double>> generate(Publisher<Double> messages) {
        return ReactiveStreams.fromPublisher(messages)
                .map(d -> Message.of(d))
                .buildRs();
    }

When I do it this way, I get errors as the following

rg.apache.kafka.common.errors.SerializationException: Can't convert value of class org.eclipse.microprofile.reactive.messaging.Message$$Lambda$988/0x0000000800a4ec40 to class org.apache.kafka.common.serialization.DoubleSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class org.eclipse.microprofile.reactive.messaging.Message$$Lambda$988/0x0000000800a4ec40 cannot be cast to class java.lang.Double (org.eclipse.microprofile.reactive.messaging.Message$$Lambda$988/0x0000000800a4ec40 is in unnamed module of loader java.net.URLClassLoader @f5167f0; java.lang.Double is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.DoubleSerializer.serialize(DoubleSerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:925)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.lambda$send$3(ReactiveKafkaProducer.java:102)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter.subscribe(UniCreateWithEmitter.java:22)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

So essentially it seems to be wanting to serialise the whole Message with the value serializer, rather than just the payload when doing it this way.

I tried a few other things and they work fine (the source method is the same as in the previous example). e.g:

    @Incoming("one")
    @Outgoing("prices-out")
    public Message<Double> generate(Double d) {
        return Message.of(d);
    }

Also this slightly pointless one

   @Incoming("one")
    @Outgoing("two")
    public Message<Double> generate(Double d) {
        return Message.of(d);
    }

    @Incoming("two")
    @Outgoing("prices-out")
    public Publisher<Message<Double>> generate(Publisher<Message<Double>> messages) {
        return messages;
    }

It fails again if I put the RSO stuff in an earlier method

    @Incoming("one")
    @Outgoing("two")
    public Publisher<Message<Double>> generate(Publisher<Double> messages) {
        return ReactiveStreams.fromPublisher(messages)
                .map(d -> Message.of(d))
                .buildRs();
    }

    @Incoming("two")
    @Outgoing("prices-out")
    public Message<Double> generate(Message<Double> m) {
        return m;
    }

If I change this last example to

    @Incoming("one")
    @Outgoing("two")
    public Publisher<Message<Double>> generate(Publisher<Double> messages) {
        return ReactiveStreams.fromPublisher(messages)
                .map(d -> Message.of(d))
                .buildRs();
    }

    @Incoming("two")
    @Outgoing("prices-out")
    public Message<Double> generate(Message<Double> m) {
        return Message.of(m());
    }

Then, I get the classcast exception when calling the Message.of() method.

Poking a bit further in the last method, I can see that the message is double wrapped.

    @Incoming("two")
    @Outgoing("prices-out")
    public Message<Double> generate(Message<Double> m) {
        System.out.println("---> " + m);
        System.out.println("---> " + m.getPayload());
        Message<Double> m2 = (Message) ((Object) m.getPayload());
        System.out.println("---> " + m2.getPayload());
        return m2;
    }
Output:
---> org.eclipse.microprofile.reactive.messaging.Message$$Lambda$969/0x0000000800991040@74dbf530
---> org.eclipse.microprofile.reactive.messaging.Message$$Lambda$969/0x0000000800991040@71e529ed
---> 0.2821780018692911

I double checked the previous method, to see if what it receives is actually just the Double payload and it is

    @Incoming("one")
    @Outgoing("two")
    public Publisher<Message<Double>> generate(Publisher<Double> messages) {
        return ReactiveStreams.fromPublisher(messages)
                .map(d -> {
                    System.out.println("!!!!! " + d);
                    return Message.of(d);
                })
                .buildRs();
    }

    @Incoming("two")
    @Outgoing("prices-out")
    public Message<Double> generate(Message<Double> m) {
        System.out.println("---> " + m);
        System.out.println("---> " + m.getPayload());
        Message<Double> m2 = (Message) ((Object) m.getPayload());
        System.out.println("---> " + m2.getPayload());
        return m2;
    }
Output:
!!!!! 0.996955166651525
---> org.eclipse.microprofile.reactive.messaging.Message$$Lambda$600/0x0000000800725040@1e22484b
---> org.eclipse.microprofile.reactive.messaging.Message$$Lambda$600/0x0000000800725040@1d069ff
---> 0.996955166651525

So it seems that somehow when using RSO to process the streams and create Message instances that these get double-wrapped.

kabir commented 3 years ago

I believe the problem is in StreamTransformerMediator.processMethodConsumingAPublisherOfPayload() for this particular case and probably also will be in processMethodConsumingAPublisherBuilderOfPayload().

That is where the extra wrapping happens. I think it needs to take into account whether the generic MediatorConfigurationSupport.returnTypeAssignable is a Message or not.

cescoffier commented 3 years ago

You can't mix payloads and messages. Such a signature is not valid: Publisher<Message<Double>>generate(Publisher<Double> messages), it needs to be Publisher<Message<Double>> generate(Publisher<Message<Double>> messages).

I'm preparing a PR that detects and reports invalid signatures.