Closed artembilan closed 3 months ago
Merged via https://github.com/spring-cloud/spring-cloud-stream/commit/3b5bb2e0d9a89c3b9ae9152c99268460ca7ed442.
@olegz We need to discuss this when you are back. I merged it to address some CI test failures.
@artembilan @sobychacko Any update on the release date?
Hi, this will be part of the 2024.0.0-M2
release of spring-cloud scheduled for 10/03/24.
Related to: https://github.com/spring-projects/spring-integration/issues/9362
After the fix in Spring Integration: https://github.com/spring-projects/spring-integration/commit/bdcb856a9081bc091d56be16d51f0f4d561bc9ce we ended up in a deadlock situation with a
beginPublishingTrigger
in theFunctionConfiguration
used for thedelaySubscription()
on an originalPublisher
. TheFluxMessageChannel
uses its owndelaySubscription()
until the channel has its subscribers. Apparently the logic before was with errors, so theFluxMessageChannel
was marked as active even if its subscriber is not ready yet, leading to famousDispatcher does not have subscribers
error. So, looks like thisbeginPublishingTrigger
was introduced back in days in Spring Cloud Stream to mitigate that situation until we really emit aBindingCreatedEvent
.The deadlock (and the flaw, respectively) is with the
setupBindingTrigger()
method implementation whereFluxMessageChannel
now "really" delays a subscription to the providedPublisher
, therefore not triggering thatMono.create()
fulfilment immediately. TheBindingCreatedEvent
arrives earlier, than we have a subscriber on the channel, buttriggerRef.get()
isnull
, so we don'tsuccess()
it and in the end don't subscribe to an originalPublisher
sincedelaySubscription()
on it is never completed.Since
FunctionConfiguration
fully relies onIntegrationFlow.from(Publisher)
, which ends up with the mentionedFluxMessageChannel.subscribeTo()
and its owndelaySubscription()
(which, in turn, apparently fixed now), we don't need our owndelaySubscription()
any more. Therefore the fix in this PR is to propose to removebeginPublishingTrigger
logic altogether.