spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

discrepancy between kafka and rabbitmq binder #2185

Open jonenst opened 3 years ago

jonenst commented 3 years ago

Describe the issue

When I send a message with a header containing a UUID, it is received on the other end as a UUID with the kafka binder, but as a String with the rabbitmq binder. This causes code that checks for equality to fail.

To Reproduce

Send

MessageBuilder.withPayload("").setHeader("foo", UUID.randomUUID());

Receive

"8554052e-da16-4f83-a152-3f083d238386".equals(message.getHeaders().get("foo")) // works with rabbitmq, doesn't work with kafka

Version of the framework

3.0.3.RELEASE

Expected behavior

The default simple code should behave the same across binders.

Additional context To fix this, I know I can use

"8554052e-da16-4f83-a152-3f083d238386".equals(message.getHeaders().get("foo").toString()) // toString workaround

but this only works around the fact that I don't get the same object across binders. Should they all return a UUID, because that's what I sent? Or a String, because the sender should have called toString() on the input?
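
Until that is settled, a receiver that has to run against both binders could normalize the header before comparing it. The following is only a minimal sketch: `HeaderUuids`, `asUuid` and `sameUuid` are hypothetical names, not part of Spring Cloud Stream, and the code relies on nothing but the JDK.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical helper for illustration; not part of Spring Cloud Stream. */
final class HeaderUuids {

    private HeaderUuids() {
    }

    /**
     * Normalizes a header value to a UUID, whether it arrived as a
     * java.util.UUID or as the String form of one.
     */
    static Optional<UUID> asUuid(Object headerValue) {
        if (headerValue instanceof UUID) {
            return Optional.of((UUID) headerValue);
        }
        if (headerValue instanceof String) {
            try {
                return Optional.of(UUID.fromString((String) headerValue));
            } catch (IllegalArgumentException e) {
                // The String could not be parsed as a UUID.
                return Optional.empty();
            }
        }
        // null or an unrelated type
        return Optional.empty();
    }

    /** True if the header carries the expected UUID in either representation. */
    static boolean sameUuid(UUID expected, Object headerValue) {
        return asUuid(headerValue).map(expected::equals).orElse(false);
    }
}
```

In the consumer this would be called as `HeaderUuids.sameUuid(expected, message.getHeaders().get("foo"))`. Unlike the toString() workaround, it also returns false instead of throwing when the header is missing.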

olegz commented 3 years ago
Screen Shot 2021-06-28 at 14 50 12

Unfortunately I cannot reproduce it, as you can see from the screenshot above. The version of stream is 3.0.3 (as mentioned). Can you please post a small project that reproduces the issue so we can take a look?

jonenst commented 3 years ago

Hi, sorry I made a typo on the version. It's 3.1.3. I'll try to write a simple demo.

jonenst commented 3 years ago

Or maybe it was 3.0.3. I'm not sure anymore. I'll write a simple repro case.

jonenst commented 3 years ago

Here's a maven repro:

pom.xml

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mycompany.app</groupId>
  <artifactId>my-cas</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>my-cas</name>
  <url>http://www.example.com</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
<!-- choose kafka or rabbit binder -->
<!--
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      <scope>runtime</scope>
    </dependency>
-->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

  </dependencies>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>2.4.7</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>2020.0.3</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>

src/main/resources/application.yml

spring:
  cloud:
    function:
      definition: consumeNotification
    stream:
      bindings:
        consumeNotification-in-0:
          destination: mynotificationqueue
        publishNotification-out-0:
          destination: mynotificationqueue
      source: publishNotification

src/test/java/com/mycompany/app/AppTest.java

package com.mycompany.app;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.function.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

@SpringBootTest
@RunWith(SpringRunner.class)
public class AppTest {

    @Autowired
    private StreamBridge streamBridge;

    private static CountDownLatch done = new CountDownLatch(1);

    @Test
    public void testEmptyConfiguration() throws Exception {
        Message<String> msg = MessageBuilder
            .withPayload("")
            .setHeader("foo", UUID.randomUUID())
            .build();
        System.out.println("================== SENDING UUID CLASS =============");
        System.out.println(msg.getHeaders().get("foo"));
        System.out.println(msg.getHeaders().get("foo").getClass());
        streamBridge.send("publishNotification-out-0", msg);
        done.await();
    }

    @SpringBootApplication
    public static class SampleConfiguration {
        @Bean
        public Consumer<Message<String>> consumeNotification() {
            return msg -> {
                System.out.println("================== RECEIVED UUID CLASS =============");
                System.out.println(msg.getHeaders().get("foo"));
                System.out.println(msg.getHeaders().get("foo").getClass());
                done.countDown();
            };
        }
    }
}

running with rabbitmq: (I use docker run -p 5672:5672 rabbitmq to launch a local rabbitmq)

$ mvn clean package
================== SENDING UUID CLASS =============
d951bdae-d32f-4486-b086-ddb554f8aa6d
class java.util.UUID

================== RECEIVED UUID CLASS =============
d951bdae-d32f-4486-b086-ddb554f8aa6d
class java.lang.String

running with kafka: (I manually download and extract kafka and then run bin/zookeeper-server-start.sh config/zookeeper.properties and bin/kafka-server-start.sh config/server.properties )

$ mvn clean package
================== SENDING UUID CLASS =============
ce744ad2-f31a-4611-8b94-b194e81b53a9
class java.util.UUID
================== RECEIVED UUID CLASS =============
ce744ad2-f31a-4611-8b94-b194e81b53a9
class java.util.UUID

jonenst commented 3 years ago

So it seems to be the same in 3.0.3 and 3.1.3.

olegz commented 3 years ago

Still no joy with the current snapshot. Here is my full app

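// Imports added for completeness; the MessageBuilder package is an assumption
// (org.springframework.integration.support.MessageBuilder offers the same calls).
import java.util.UUID;
import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class DemoStreamApplication {

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(DemoStreamApplication.class,
                "--spring.cloud.function.definition=func"
                );
        StreamBridge bridge = context.getBean(StreamBridge.class);
        // Send a message whose "foo" header is a java.util.UUID
        Message<String> msg = MessageBuilder
                .withPayload("")
                .setHeader("foo", UUID.randomUUID())
                .build();
        bridge.send("func-in-0", msg);
    }

    // Imperative variant: print the runtime class of the received header
    @Bean
    public Function<Message<String>, Message<String>> func() {
        return m -> {
            System.out.println("foo header class: " + m.getHeaders().get("foo").getClass());
            return m;
        };
    }

    // Reactive variant of the same check
    @Bean
    public Function<Flux<Message<String>>, Flux<Message<String>>> funcReactive() {
        return flux -> flux.map(m -> {
            System.out.println("foo header class: " + m.getHeaders().get("foo").getClass());
            return m;
        });
    }
}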

. . . and the output for both (reactive and imperative) cases is

foo header class: class java.util.UUID

The above is with the RabbitMQ binder and the current snapshot of s-c-stream. What am I missing?