spring-cloud / spring-cloud-stream-samples

Samples for Spring Cloud Stream
Apache License 2.0
956 stars 676 forks source link

Can't use more than one Consumer #200

Closed xwlcn closed 3 years ago

xwlcn commented 3 years ago

yml:

bindings:
        store-in-0:
          group: torrent-store-group
          destination: torrentMessages
          contentType: application/json
          consumer:
            batch-mode: true
            compressionType: gzip
            autoCommitOffset: false
        index-in-0:
          group: torrent-index-group
          destination: torrentMessages
          contentType: application/json
          consumer:
            batch-mode: true
            max-poll-records: 100
            compressionType: gzip
            autoCommitOffset: false

java:

@Slf4j
@SpringBootApplication
public class TorrentStoreServiceApplication {

    @Autowired
    private TorrentService torrentService;
    public static Boolean filterXxx;
    @Value("${dodder.filter-sensitive-torrent}")
    public void setFilterXxx(Boolean filterXxx) {
        TorrentStoreServiceApplication.filterXxx = filterXxx;
    }

    public static void main(String[] args) {
        SpringApplication.run(TorrentStoreServiceApplication.class, args);
    }

    @Bean
    Consumer<Message<List<Torrent>>> index() {
        return message -> {
            try {
                Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                List<Torrent> torrents = message.getPayload();
                //System.out.println(torrents);
                //torrentService.index(torrents);
                //no error, execute acknowledge
                if (acknowledgment != null) {
                    acknowledgment.acknowledge();
                }
            } catch (Exception e) {
                log.error("Insert or update torrent error: {}", e);
            }
        };
    }

    @Bean
    Consumer<Message<List<Torrent>>> store() {
        return message -> {
            try {
                Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                List<Torrent> torrents = message.getPayload();
                System.out.println(torrents);
                torrentService.upsert(torrents);
                //no error, execute acknowledge
                if (acknowledgment != null) {
                    acknowledgment.acknowledge();
                }
            } catch (Exception e) {
                log.error("Insert or update torrent error: {}", e);
            }
        };
    }
}

console info:

Found more then one function beans in BeanFactory: [index, store]. If you did not intend to use functions, ignore this message. However, if you did intend to use functions in the context of spring-cloud-function, consider providing 'spring.cloud.function.definition' property pointing to a function bean(s) you intend to use. For example, 'spring.cloud.function.definition=myFunction'

Before using version 2.X, use @StreamListener annotation had no problem, but I wanted to do batch processing. Two different groups consumed the same topic, and the above code could not run correctly. I needed to comment out a consumer method will it can be run.

xwlcn commented 3 years ago

Addspring.cloud.function.definition=index;store it works.