alibaba / spring-cloud-alibaba

Spring Cloud Alibaba provides a one-stop solution for application development for the distributed solutions of Alibaba middleware.
https://sca.aliyun.com
Apache License 2.0
27.85k stars 8.32k forks source link

stream 4.04版本 集成rocketmq收不到消息,使用官方文档的demo也收不到,请大佬们指教。 #3418

Closed baiying319 closed 1 year ago

baiying319 commented 1 year ago

org.springframework.boot: 3.0.9 spring-cloud: 2022.0.4 spring-cloud-alibaba: 2022.0.0.0 rocketMq: 4.9.2 use spring-cloud-starter-stream-rocketmq

server上topic可以看见消息,消费者列表也有数据。但是一直无法消费,broker和server也没看见报错,java代码也没报错。 尝试和spring-cloud-stream4.x各种消费方法都无法消费(实在难以相信我写错了消费方法)。 最后我使用了其他的代码,降低了spring-cloud-stream 版本 到 3.x,使用@StreamListener注解消费成功。 请问 我要怎么样才可以在4.x上消费成功?

(The topic on the server can see messages, and the consumer list also has data. But it has been unable to consume, and the broker and server have not seen any errors, and the Java code has not reported any errors. Attempting and using various consumption methods such as spring cloud stream 4. x cannot consume (it's hard to believe I wrote the wrong consumption method). Finally, I used other code to reduce the spring cloud stream version to 3.x and successfully annotate consumption with @ StreamListener. May I ask how can I successfully consume on 4. x?)

WX20230806-010844@2x

please give me a help.

config
spring:
  cloud:
    function:
      # function定义,申明函数名称,只有消费者,与@Bean中的函数名对应起来,这是函数定义的地方,声明所有的函数名称。
      # 这个不能注释,注释的话,消费组都不出来了,在service中定义了sendSmsToAdmin
      definition: sendSmsToAdmin;sendSmsToUser #;sendSmsToAdminFunction;sendSmsToUserFunction
    stream:
      # 默认binder
      default-binder: rocketmq
      # 这里面要么声明的是消费者类 要么就是消费者类中的多个方法
      function:
        # stream Binding相关的函数定义,也需要声明使用到的函数名,需要确保消费者类的名称和这里一样,多个使用;区分
        definition: sendSmsToAdmin;sendSmsToUser #;sendSmsToAdminFunction;sendSmsToUserFunction
      rocketmq:
        # 在 stream.rocketmq.bindings 中,我们可以选择只配置 RocketMQ 特有的绑定参数,比如 producer/consumer 的详细配置
        # 在 stream.rocketmq.bindings 中没有配置 destination,而是放在了 stream.bindings 配置
        # 比较通用可观
        binder:
          name-server: centos8-2:9876
          group: producerGroup
        # 用于绑定 RocketMQ 作为消息中间件的 Binding 配置
#        bindings:
#          # 生产者必须有一个group
#          # 发给管理员
#          sendSmsToAdmin-out-0:
#            # rocketmq中配置了的bindings,下面的bindings不可以再配置
#            # 这里不配置 destination,在stream.bindings中对应的sendSmsToAdmin-out-0中配置
#            #destination: admin_sms_send
#            producer:
#              # group不能用中横线-
#              group: producer_admin_sms_send_group
#              sync: true
#              sendMessageTimeout: 3000
#          # 发给用户
#          sendSmsToUser-out-0:
#            #destination: user_sms_send
#            producer:
#              group: producer_user_sms_send_group
#              sync: true
#              sendMessageTimeout: 3000
#
#          # 上面的两个使用工具类发送,下面的使用Bean发送的
#          sendSmsToAdminFunction-out-0:
#            producer:
#              # group不能用中横线-
#              group: producer_function_admin_sms_send_group
#              sync: true
#              sendMessageTimeout: 3000
#          sendSmsToUserFunction-out-0:
#            producer:
#              # group不能用中横线-
#              group: producer_function_user_sms_send_group
#              sync: true
#              sendMessageTimeout: 3000

#            consumer:
#              messageModel: BROADCASTING # 设置为广播消费
      # 用于绑定通用消息中间件的 Binding 配置,不特定于某个中间件实现。rocketmq中配置了的bindings,下面的bindings不可以再配置
      bindings:
        # 配置生产者topic,group不能用中横线-,
        sendSmsToAdmin-out-0:
          destination: admin_sms_send
          group: producer_admin_sms_send_group
          binder: rocketmq
          content-type: application/json
        sendSmsToAdmin-in-0:
          destination: admin_sms_send
          group: producer_admin_sms_send_group
          binder: rocketmq
          content-type: application/json

        # 配置生产者topic
        sendSmsToUser-out-0:
          destination: user_sms_send
          group: producer_user_sms_send_group
          binder: rocketmq
          content-type: application/json
        sendSmsToUser-in-0:
          destination: user_sms_send
          group: consumer_user_sms_send_group
          binder: rocketmq
          content-type: application/json

producer:

  @Autowired
    private StreamBridge streamBridge;

    /**
     * 这个方法和sendSmsToAdminFunction一样 都是发消息
     *
     * @param message
     */
    public void sendSmsToAdmin(SmsSendMessage message) {
        log.info("要发送的短信内容为: {}", message);
        streamBridge.send("sendSmsToAdmin-out-0", message);
    }

    public void sendSmsToUser(Long userId, Long accountId) {
        log.info("要发送的短信内容为: {}", "userId:" + userId + "accountId:" + accountId);
        streamBridge.send("sendSmsToUser-out-0", "userId:" + userId + "  accountId:" + accountId);
    }

consumer:

 @Bean
    public Consumer<SmsSendMessage> sendSmsToAdmin() {
        return request -> {
            log.info("------received-------: {} ", request.getContent());
        };
    }
//
//
//    @Bean
//    public Consumer<Object> sendSmsToAdmin() {
//        return request -> {
//            log.info("------received-------: {} ", request);
//        };
//    }
//
    @Bean
    public Consumer<String> sendSmsToUser(){
        return request -> {
            log.info("-----received-------: {}", request);
        };
    }

pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mqtest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.9</version>
    </parent>

    <properties>
        <jeecgboot.version>3.5.3</jeecgboot.version>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <!-- 微服务 -->
        <spring-cloud.version>2022.0.4</spring-cloud.version>
        <spring-cloud-alibaba.version>2022.0.0.0</spring-cloud-alibaba.version>
        <alibaba.nacos.version>2.0.4</alibaba.nacos.version>

        <xxl-job-core.version>2.4.0</xxl-job-core.version>
        <fastjson.version>1.2.83</fastjson.version>
        <guava.version>32.1.1-jre</guava.version>
        <lombok.version>1.18.28</lombok.version>
        <!--   A Java 1.6+ library providing a clean and lightweight markdown processor     -->
        <pegdown.version>1.6.0</pegdown.version>

        <knife4j-spring-boot-starter.version>3.0.3</knife4j-spring-boot-starter.version>
        <knife4j-spring-ui.version>2.0.9</knife4j-spring-ui.version>

        <!-- 数据库驱动 -->
        <postgresql.version>42.2.25</postgresql.version>
        <ojdbc6.version>11.2.0.3</ojdbc6.version>
        <sqljdbc4.version>4.0</sqljdbc4.version>
        <mysql-connector-java.version>8.0.27</mysql-connector-java.version>
        <hutool.version>5.8.20</hutool.version>
        <commons-collection.version>4.4</commons-collection.version>
        <snakeyaml.version>2.0</snakeyaml.version>
        <!-- 持久层 -->
        <mybatis-plus.version>3.5.3.1</mybatis-plus.version>
        <dynamic-datasource-spring-boot-starter.version>4.1.2</dynamic-datasource-spring-boot-starter.version>
        <druid.version>1.2.18</druid.version>
        <minidao.version>1.9.1</minidao.version>

        <!-- 积木报表-->
        <jimureport-spring-boot-starter.version>1.5.9</jimureport-spring-boot-starter.version>
        <commons.version>2.6</commons.version>
        <aliyun-java-sdk-dysmsapi.version>2.1.0</aliyun-java-sdk-dysmsapi.version>
        <aliyun.oss.version>3.11.2</aliyun.oss.version>

        <!--##### 缓存相关 #####-->
        <j2cache.version>2.8.0-release</j2cache.version>
        <lettuce.version>5.3.7.RELEASE</lettuce.version>

        <!-- 分布式锁,对应springboot2.0版本 -->
        <redisson.version>3.12.5</redisson.version>

        <!--对应 spring redis 里面lettuce 中的netty版本号 不然会冲突-->
        <netty.version>4.1.96.Final</netty.version>
        <commons-lang3.version>3.12.0</commons-lang3.version>
        <commons-codec.version>1.15</commons-codec.version>
        <commons-net.version>3.8.0</commons-net.version>
        <dom4j.version>1.6.1</dom4j.version>

        <!--   反射     -->
        <reflection.version>0.9.11</reflection.version>
        <jsoup.version>1.13.1</jsoup.version>
        <useragentutils.version>1.21</useragentutils.version>

        <!-- shiro -->
        <shiro.version>1.10.1</shiro.version>
        <java-jwt.version>3.11.0</java-jwt.version>
        <shiro-redis.version>3.1.0</shiro-redis.version>
        <codegenerate.version>1.4.3</codegenerate.version>
        <autopoi-web.version>1.4.5</autopoi-web.version>
        <minio.version>8.0.3</minio.version>
        <justauth-spring-boot-starter.version>1.4.0</justauth-spring-boot-starter.version>
        <qiniu-java-sdk.version>7.11.0</qiniu-java-sdk.version>

        <!-- Log4j2爆雷漏洞 -->
        <log4j2.version>2.17.0</log4j2.version>
        <logback.version>1.2.9</logback.version>

        <maven.test.skip>true</maven.test.skip>
        <docker-maven-plugin.version>1.2.0</docker-maven-plugin.version>
        <docker.baseImage>openjdk:8-jre-alpine</docker.baseImage>
        <docker.volumes>/tmp</docker.volumes>
        <docker.image.prefix>hub.mall.com:8080/mallcloud</docker.image.prefix>
        <docker.java.security.egd>-Djava.security.egd=file:/dev/./urandom</docker.java.security.egd>
        <docker.java.opts>-Xms128m -Xmx128m</docker.java.opts>
    </properties>

    <repositories>
        <repository>
            <id>aliyun</id>
            <name>aliyun Repository</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.4.7</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.4.7</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>commons-collections</groupId>
                    <artifactId>commons-collections</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
        </dependency>

<!--        <dependency>-->
<!--            <groupId>org.springframework.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.springframework.cloud</groupId>-->
<!--            <artifactId>spring-cloud-starter-stream-kafka</artifactId>-->
<!--        </dependency>-->

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- spring-cloud-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- spring-cloud-alibaba -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>

            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>${guava.version}</version>
            </dependency>

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>

            <!-- apache工具包 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${commons-lang3.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-collections4</artifactId>
                <version>${commons-collection.version}</version>
            </dependency>

            <dependency>
                <groupId>org.yaml</groupId>
                <artifactId>snakeyaml</artifactId>
                <version>${snakeyaml.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <!-- 指定JDK编译版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                    <compilerVersion>17</compilerVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- 打包跳过测试 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <!-- 避免font文件的二进制文件格式压缩破坏 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <nonFilteredFileExtensions>
                        <nonFilteredFileExtension>woff</nonFilteredFileExtension>
                        <nonFilteredFileExtension>woff2</nonFilteredFileExtension>
                        <nonFilteredFileExtension>eot</nonFilteredFileExtension>
                        <nonFilteredFileExtension>ttf</nonFilteredFileExtension>
                        <nonFilteredFileExtension>svg</nonFilteredFileExtension>
                    </nonFilteredFileExtensions>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                    <include>**/*.json</include>
                    <include>**/*.ftl</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>
baiying319 commented 1 year ago

配置问题,out和in匹配要用 | ,而不是;

baiying319 commented 1 year ago

when I use org.springframework.cloud.stream.function.StreamBridge to send message, I won't be able to consume messages for anyway.

  public void sendSmsToAdmin(SmsSendMessage message) {
        log.info("要发送的短信内容为: {}", message);
        streamBridge.send("send-out-0", message);
    }
   @Bean
    public Consumer<SmsSendMessage> receive() {
        return request -> {
            log.info("------received-------: {} ", request.getContent());
        };
    }
spring:
  cloud:
    function:
      definition: receive
    stream:
#      input-bindings: receive,receive-in-0
#      output-bindings: send,send-out-0
      # 默认binder
      default-binder: rocketmq
      function:
        definition: receive
      rocketmq:
        binder:
          name-server: centos8-2:9876
          group: sms_group
      bindings:
        send-out-0:
          # 多个逗号分隔
          destination: admin_sms_send_default
          group: sms_group_default
          binder: rocketmq
          content-type: application/json
        receive-in-0:
          destination: admin_sms_send_default
          group: sms_group_default
          binder: rocketmq
          content-type: application/json
wen002003 commented 8 months ago

我也遇到了同样的问题,想问下处理了么?如何处理

qiujianchen commented 5 months ago

我也遇到了同样的问题,请问是如何解决的?

wen002003 commented 5 months ago

我一会可以把我的配置文件给你,这个里面要注意几个点:

  1. 配置文件 cloud: stream: function:

    stream Binding相关的函数定义,也需要声明使用到的函数名,需要确保消费者类的名称和这里一样,多个使用;区分

    definition: scanQRcodeConsumer;bikeIndentConsumer; default-binder: rocketmq

    这里面要么声明的是消费者类 要么就是消费者类中的多个方法

    rocketmq: binder: name-server: 127.0.0.1:9876 bindings: scanQRcodeConsumer-in-0: consumer: messageModel: BROADCASTING bikeIndentConsumer-in-0: consumer: messageModel: BROADCASTING openDevice-out-0: producer: group: output_1 bikeResponse-out-0: bindings: scanQRcodeConsumer-in-0: content-type: application/json destination: openDevice group: scanQRcodeConsumer binder: rocketmq bikeIndentConsumer-in-0: content-type: application/json destination: bikeIndent group: bikeIndentConsumer binder: rocketmq openDevice-out-0: destination: openDevice

  2. 定义一个config,Java代码如下: @Slf4j @Configuration public class BikeSyncListenConfig { /**

    • 订单同步监听
    • @return */ @Bean public Consumer<Message> bikeIndentConsumer() { return msg -> { log.error("开始消费bikeIndentConsumer:{}", msg.getPayload()); List list = JSONUtil.toList(msg.getPayload(), BikeIndentSyncDTO.class); if (list.isEmpty()) return;

}; } }

按照以上的配置写,就可以正常获取,具体的看bikeIndentConsumer 海涛哦

@.*** |

---- 回复的原邮件 ---- | 发件人 | @.> | | 发送日期 | 2024年04月7日 23:05 | | 收件人 | @.> | | 抄送人 | @.> , @.> | | 主题 | Re: [alibaba/spring-cloud-alibaba] stream 4.04版本 集成rocketmq收不到消息,使用官方文档的demo也收不到,请大佬们指教。 (Issue #3418) |

我也遇到了同样的问题,请问是如何解决的?

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you commented.Message ID: @.***>

qiujianchen commented 5 months ago

可以了,感谢!

wen002003 commented 5 months ago

不客气

2024年4月7日 23:19,qiujianchen @.***> 写道:

可以了,感谢!

— Reply to this email directly, view it on GitHub https://github.com/alibaba/spring-cloud-alibaba/issues/3418#issuecomment-2041503161, or unsubscribe https://github.com/notifications/unsubscribe-auth/ADJRXISK65Z74LNL7GMXOBDY4FPX7AVCNFSM6AAAAAA3FKT5DWVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDANBRGUYDGMJWGE. You are receiving this because you commented.