spring-projects / spring-integration

Spring Integration provides an extension of the Spring programming model to support the well-known Enterprise Integration Patterns (EIP)
http://projects.spring.io/spring-integration/
Apache License 2.0

Message publishing via Sink and OutputDestination times out with Spring Integration 6.3.2 #9362

Closed wrwksexahatvani closed 3 months ago

wrwksexahatvani commented 3 months ago

In what version(s) of Spring Integration are you seeing this issue?

6.3.2.RELEASE

Describe the bug

The DummyEventProducerIntegrationTest in the sample project https://github.com/andrashatvani/spring-demo/ times out once I upgrade Spring Boot to 3.3.2 in the POM. It works with Spring Boot 3.3.1 and Spring Integration 6.3.1, as it always has so far. @wilkinsona pointed me toward Spring Integration in https://github.com/spring-projects/spring-boot/issues/41602.

To Reproduce

Run DummyEventProducerIntegrationTest.

Expected behavior

The test should pass.

Sample

https://github.com/andrashatvani/spring-demo/

artembilan commented 3 months ago

Apparently the fix in Spring Integration 6.3.2 has just revealed a problem in Spring Cloud Stream. There is this logic in the FunctionConfiguration:

    /*
     * Creates a publishing trigger to ensure Supplier does not begin publishing until binding is created
     */
    private Publisher<Object> setupBindingTrigger(GenericApplicationContext context) {
        AtomicReference<MonoSink<Object>> triggerRef = new AtomicReference<>();
        Publisher<Object> beginPublishingTrigger = Mono.create(triggerRef::set);
        context.addApplicationListener(event -> {
            if (event instanceof BindingCreatedEvent) {
                if (triggerRef.get() != null) {
                    triggerRef.get().success();
                }
            }
        });
        return beginPublishingTrigger;
    }

What I see in the debug session: when that BindingCreatedEvent arrives, triggerRef.get() is still null. It is populated only when we subscribe to that beginPublishingTrigger. However, the FluxMessageChannel does not subscribe to the provided Publisher until it has its own subscribers:

    @Override
    public void subscribeTo(Publisher<? extends Message<?>> publisher) {
        Flux<Object> upstreamPublisher =
                Flux.from(publisher)
                        .delaySubscription(
                                Mono.fromCallable(this.sink::currentSubscriberCount)
                                        .filter((value) -> value > 0)
                                        .repeatWhenEmpty((repeat) ->
                                                this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat))
                        .flatMap((message) ->
                                Mono.just(message)
                                        .handle((messageToHandle, syncSink) -> sendReactiveMessage(messageToHandle))
                                        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)))
                        .contextCapture();

        addPublisherToSubscribe(upstreamPublisher);
    }

And that is a race condition in the application state: this FluxMessageChannel does not have consumers yet, but the BindingCreatedEvent has already been emitted. It is totally OK to emit that event and to have consumers subscribe to the channel independently. The only problem is that the trigger Mono is not populated until we subscribe to it, and that leads to the lost emission from the source Supplier<Flux>.

I'm thinking about reworking that setupBindingTrigger to not wait for the Mono subscription, but rather just expose a Sinks.One as a trigger for the event.
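
For readers following along, the lost-signal race described above can be sketched without Reactor. The block below is only a plain-JDK analogy, not the Spring Cloud Stream or Spring Integration code: `TriggerRaceSketch`, `LazyTrigger` and `EagerTrigger` are hypothetical names, and `CompletableFuture` stands in for the `MonoSink` / `Sinks.One` handle. It assumes the event arrives before anything subscribes, which is the ordering seen in the debug session.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of any Spring project.
final class TriggerRaceSketch {

    /** Lazy trigger: the completion handle exists only after subscribe() is called. */
    static final class LazyTrigger {
        private final AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();

        // Mirrors the event listener: signal only if a handle is already present.
        void onBindingCreated() {
            CompletableFuture<Void> handle = ref.get();
            if (handle != null) {
                handle.complete(null);
            }
        }

        // Mirrors subscribing to a lazily created trigger: the handle is created here.
        CompletableFuture<Void> subscribe() {
            CompletableFuture<Void> handle = new CompletableFuture<>();
            ref.set(handle);
            return handle;
        }
    }

    /** Eager trigger: the handle exists from construction, so an early event is retained. */
    static final class EagerTrigger {
        private final CompletableFuture<Void> handle = new CompletableFuture<>();

        void onBindingCreated() {
            handle.complete(null);
        }

        CompletableFuture<Void> subscribe() {
            return handle;
        }
    }

    // Event first, subscription second: the lazy trigger never sees the event.
    static boolean lazyTriggerFired() {
        LazyTrigger trigger = new LazyTrigger();
        trigger.onBindingCreated();
        return trigger.subscribe().isDone();
    }

    // Same ordering, but the eager trigger has already recorded the event.
    static boolean eagerTriggerFired() {
        EagerTrigger trigger = new EagerTrigger();
        trigger.onBindingCreated();
        return trigger.subscribe().isDone();
    }

    public static void main(String[] args) {
        System.out.println("lazy trigger fired:  " + lazyTriggerFired());
        System.out.println("eager trigger fired: " + eagerTriggerFired());
    }
}
```

With the event delivered before the subscription, the lazy variant stays incomplete and the eager variant is already complete. Creating the handle up front is the same idea as exposing a Sinks.One instead of waiting for the Mono subscription.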

artembilan commented 3 months ago

Please raise an issue on the Spring Cloud Stream side, and I'll provide a PR shortly after I come up with a unit test to at least cover the functionality. The fix on the Spring Integration side was this: https://github.com/spring-projects/spring-integration/commit/bdcb856a9081bc091d56be16d51f0f4d561bc9ce. Apparently, before that there was some hidden behavior which allowed the reactive subscription to be pushed up. Right now, after the fix, I'm bumping into your other issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2977. But that is something we will work on later.

Closing this one as Works as Designed.

artembilan commented 3 months ago

FYI, @wrwksexahatvani, I've raised a Pull Request in Spring Cloud Stream: https://github.com/spring-cloud/spring-cloud-stream/pull/2978

No need for a separate issue over there.