Closed waylau closed 1 year ago
I use spring-cloud-starter-stream-jms with ActiveMQ in Spring Cloud Stream batch mode, but it doesn't work correctly.
Here is pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- Spring Boot must be 2.x -->
        <version>2.7.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.waylau.spring.cloud.stream.binder.jms</groupId>
    <artifactId>spring-cloud-stream-batch-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-batch-demo</name>
    <description>Batch demo for Spring Cloud Stream</description>
    <properties>
        <java.version>11</java.version>
        <!-- Spring Cloud must be 2021.x -->
        <spring-cloud.version>2021.0.5</spring-cloud.version>
        <spring-cloud-stream-binder-jms.version>1.0.0.RELEASE</spring-cloud-stream-binder-jms.version>
        <spring-cloud-starter-stream-rocketmq.version>2022.0.0.0-RC1</spring-cloud-starter-stream-rocketmq.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <!-- start: add binders -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>${spring-cloud-starter-stream-rocketmq.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>fastjson</artifactId>
                    <groupId>com.alibaba</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.boutouil</groupId>
            <artifactId>spring-cloud-stream-binder-jms</artifactId>
            <version>${spring-cloud-stream-binder-jms.version}</version>
        </dependency>
        <!-- end: add binders -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Here is application.properties:
# Default binder; options are kafka, rabbit, rocketmq, jms. For ActiveMQ, configure jms.
# rocketmq and jms do not yet support batch consumption.
spring.cloud.stream.defaultBinder=jms
# Configure this when multiple Supplier/Function/Consumer beans exist.
spring.cloud.function.definition=log;logBatch
spring.cloud.stream.bindings.log-in-0.destination=queue://logSingleDestination
spring.cloud.stream.bindings.logBatch-in-0.destination=queue://logBatchDestination
spring.cloud.stream.bindings.logBatch-in-0.consumer.batch-mode=true
Here is DemoApplication.java:
package com.waylau.spring.cloud.stream.batch.demo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import java.util.List; import java.util.function.Consumer; /** * 批量消费与生产 * * @author <a href="https://waylau.com">Way Lau</a> * @since 2023-02-22 */ @SpringBootApplication public class DemoApplication { private final static List<Person> PERSON_LIST = List.of( new Person("Sam Spade"), new Person("Sam Po"), new Person("Sam Li"), new Person("Sam Bo"), new Person("Way Lau"), new Person("Fei Po"), new Person("Gu Li") ); @Autowired private StreamBridge bridge; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Bean public ApplicationRunner runner() { return arg -> { // 将消息批量转发到其他destination bridge.send("logBatchTransmitDestination", PERSON_LIST); // 将消息逐个转发到其他destination PERSON_LIST.stream().forEach(person -> { bridge.send("logBatchDestination", person); }); }; } /** * 单条消息处理器 * * @return */ @Bean public Consumer<Person> log() { return person -> { // 打印出接收到的消息 System.out.println("Received: " + person); }; } /** * 批量消息处理器 * * @return */ @Bean public Consumer<List<Person>> logBatch() { return personList -> { // 打印出接收到的消息 System.out.println("Received personList: " + personList); // 将消息逐个转发到其他destination personList.stream().forEach(person -> { bridge.send("logSingleDestination", person); }); }; } public static class Person { private String name; public Person() { } public Person(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String toString() { return this.name; } } }
The complete application code can be found here: https://github.com/waylau/spring-cloud-stream-tutorial/tree/main/samples/spring-cloud-stream-batch-demo
It seems logBatch() doesn't work correctly. The message data is not converted to the Person class:
Received personList: [123, 34, 110, 97, 109, 101, 34, 58, 34, 83, 97, 109, 32, 83, 112, 97, 100, 101, 34, 125] Received personList: [123, 34, 110, 97, 109, 101, 34, 58, 34, 83, 97, 109, 32, 83, 112, 97, 100, 101, 34, 125] Received personList: [123, 34, 110, 97, 109, 101, 34, 58, 34, 83, 97, 109, 32, 83, 112, 97, 100, 101, 34, 125]
I have read the reference doc of spring-cloud-stream-binder-jms, and there is nothing about batch-mode.
Any idea?
I use spring-cloud-starter-stream-jms with ActiveMQ in Spring Cloud Stream batch mode, but it doesn't work correctly.
Here is pom.xml:
Here is application.properties:
Here is DemoApplication.java:
The complete application code can be found here: https://github.com/waylau/spring-cloud-stream-tutorial/tree/main/samples/spring-cloud-stream-batch-demo
It seems logBatch() doesn't work correctly. The message data is not converted to the Person class:
I have read the reference doc of spring-cloud-stream-binder-jms, and there is nothing about batch-mode.
Any idea?