alibaba / spring-cloud-alibaba

Spring Cloud Alibaba provides a one-stop solution for application development for the distributed solutions of Alibaba middleware.
https://sca.aliyun.com
Apache License 2.0

Using the new Spring Cloud Stream features in spring-cloud-starter-stream-rocketmq 2021.1 #2174

Closed fzdwx closed 3 years ago

fzdwx commented 3 years ago

1. My dependency

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config-server</artifactId>
    <version>2021.1</version>
</dependency>

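2. Producer

/**
 * Registers the producer: test-out-0
 *
 * @return {@link Supplier<String>}
 */
@Bean
public Supplier<String> test() {
    return () -> {
        // fastUUID() is not part of java.util.UUID; presumably Hutool's cn.hutool.core.lang.UUID
        String s = UUID.fastUUID().toString();
        System.out.println("Sent message: " + s);
        return s;
    };
}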

3. Consumer

/**
 * Registers the consumer: test-in-0
 *
 * @return {@link Consumer<String>}
 */
@Bean
public Consumer<String> test() {
    return (s) -> {
        System.out.println("Received message: " + s);
    };
}

4. Problem: after startup, the producer keeps sending messages automatically on its own. How should I handle this?

producer: image

consumer: image

fzdwx commented 3 years ago

Configuration

spring:
  cloud:
    stream:
      function:
        definition: test
      bindings:
        test-out-0:
          destination: test-topic
          content-type: application/json
        test-in-0:
          destination: test-topic
          content-type: application/json
          group: test-group
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 # RocketMQ Namesrv address
          group: test
        bindings:
          test-out-0:
            producer:
              group: test-group
              sync: true
          test-in-0:
            consumer:
              group: test-group
              orderly: true

luocongqiu commented 3 years ago

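Have a look at how Supplier works in spring-cloud-stream: by default the framework polls it once per second, so it produces one message every second.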
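The polling behavior described above can be imitated with plain JDK classes. The following is only a simulation under stated assumptions, not Spring Cloud Stream's own code: the names `PollingSupplierDemo` and `pollTimes` are hypothetical, and `java.util.UUID.randomUUID()` stands in for the `UUID.fastUUID()` call from the question. It shows why a plain `Supplier<String>` bean emits messages without anyone calling a send method: a scheduler keeps invoking `get()` at a fixed delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PollingSupplierDemo {

    /**
     * Invokes the supplier at a fixed delay until it has produced `count` values,
     * then stops the poller and returns what was produced.
     */
    static List<String> pollTimes(Supplier<String> supplier, long delayMillis, int count) {
        List<String> produced = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        poller.scheduleWithFixedDelay(() -> {
            if (done.getCount() == 0) {
                return; // enough values collected, ignore any late poll
            }
            produced.add(supplier.get());
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        } finally {
            poller.shutdownNow();
        }
        return new ArrayList<>(produced);
    }

    public static void main(String[] args) {
        // Same shape as the Supplier bean in the question.
        Supplier<String> test = () -> java.util.UUID.randomUUID().toString();
        // A 1000 ms delay mirrors the one-second default mentioned in the reply.
        for (String message : pollTimes(test, 1000, 3)) {
            System.out.println("Sent message: " + message);
        }
    }
}
```

In a real application the polling interval is a framework setting rather than something the bean controls. In Spring Cloud Stream 3.x it is exposed under `spring.cloud.stream.poller`, for example `fixed-delay`, but check the reference documentation for the version in use.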

fzdwx commented 3 years ago

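Have a look at how Supplier works in spring-cloud-stream: by default the framework polls it once per second, so it produces one message every second.

Thanks, it is sorted out now. In the end I got it working with Sinks from Reactor. Spring Cloud Stream 3 really does change quite a lot.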
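The poster did not share the final code, so the following is only a minimal sketch of one common way to use Reactor's `Sinks` for this, assuming `reactor-core` is on the classpath. The class name `OnDemandSource` and the method `send` are hypothetical. The idea is that a reactive `Supplier<Flux<String>>` is invoked once to obtain the stream instead of being polled, so messages go out only when something is pushed into the sink.

```java
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Hypothetical sketch: messages are published only when send() is called.
public class OnDemandSource {

    // Unicast sink: buffers emitted values until its single subscriber attaches.
    private final Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

    // In a Spring Cloud Stream application this method would be annotated with @Bean
    // inside a @Configuration class, so that it backs the test-out-0 binding.
    public Supplier<Flux<String>> test() {
        return sink::asFlux;
    }

    // Call this from business code to publish one message; returns false if the emit failed.
    public boolean send(String payload) {
        return sink.tryEmitNext(payload).isSuccess();
    }
}
```

With this shape, the binding name and the `definition: test` entry in the configuration can stay as they are; only the return type of the bean changes from `Supplier<String>` to `Supplier<Flux<String>>`.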

haihaiya commented 2 years ago

Could someone tell me how to set tags for a consumer that is configured in the functional style?