spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
1.01k stars 614 forks source link

Exception : ['XXX-in-0']] doesn't have subscribers to accept messages #2892

Closed cooperlyt closed 8 months ago

cooperlyt commented 9 months ago

When consuming messages using the Flux approach in Spring Cloud Stream, if an exception is thrown, it will result in an exception the next time a message arrives.

Caused by: java.lang.IllegalStateException: The [bean 'workStatusChannel-in-0'] doesn't have subscribers to accept messages
building-1     |        at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-6.1.3.jar!/:6.1.3]
building-1     |        at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:67) ~[spring-integration-core-6.2.1.jar!/:6.2.1]
building-1     |        at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378) ~[spring-integration-core-6.2.1.jar!/:6.2.1]

Sample code:


  @Bean
  public Function<Flux<Message<StatusChangeMessage>>, Mono<Void>> workStatusChannel(){
        return flux -> flux
        .flatMap(msg -> Mono.error(new IllegalArgumentException("test exception"))).then();
  }

I am using RocketMQ to provide messaging services. Whenever an exception occurs during the processing of a message, subsequent messages result in the error "doesn't have subscribers to accept messages." Even if an exception occurs, RocketMQ still marks the message as "CONSUMED." Subsequent messages that couldn't be consumed due to "doesn't have subscribers to accept messages" are also marked as "CONSUMED." This has caused significant challenges for my project.

I believe that when an exception occurs during message consumption, the appropriate action should be to return the exception to RocketMQ rather than unsubscribing.

Evn: RocketMQ 5.1.4 Spring boot 3.2.2 spring cloud 2023.0.0 spring cloud alibaba 2022.0.0.0

cooperlyt commented 9 months ago

My temporary workaround is:

  @Bean
  public Function<Flux<Message<StatusChangeMessage>>, Mono<Void>> workStatusChannel(){
    return flux -> flux
        .doOnNext(message -> log.info("workStatusChannel {}",message))
        .flatMap(msg -> Mono.just(msg)
            .flatMap(workEventHandler::testStreamException)
            .doOnError(error -> log.error("workStatusChannel error",error))
            .retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
            .onErrorResume(error -> {
              log.error("Message can't consume! --------------------------------------",error);
              return Mono.empty();
            })
            .then()
        )
        .then();
  }

However, this is not perfect, but it can address the inconvenience of not being able to receive any messages once an exception occurs.

sobychacko commented 8 months ago

@cooperlyt For reactive functions, this is expected as part of error handling. See this ref docs section, specifically the reactive functions section. The framework is not involved at runtime whenever reactive functions are involved, and the application is in total control. Framework only does the initial bootstrapping of the stream. For this reason, it is up to the application to handle runtime errors. What you provided as a workaround (with retryWhen) is the right solution when it comes to reactive functions. We don't think there is anything that the framework could assist with in this situation. I am closing the issue, but feel free to re-open if there is more context or a need. Thanks!