SolaceProducts / solace-spring-cloud

An umbrella project containing all Solace projects for Spring Cloud
Apache License 2.0

[Bug]: Spring cloud stream solace binder StreamBridge.send() unexpected error #312

Closed siaavush closed 4 weeks ago

siaavush commented 1 month ago

Bug Description

We are using Spring Cloud Stream Solace in our Spring Boot project. When we try to send messages simultaneously from 5 different threads using StreamBridge.send(), the underlying implementation "randomly" throws org.springframework.messaging.MessagingException. By "randomly" I mean it happens regardless of whether I send 1000 messages at the same time or 50.

Library version: implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace:4.2.0")

Full stack trace:

org.springframework.messaging.MessagingException: Cannot send message using handler 028dfc28-458f-46d9-a669-aa22169323fd
    at com.solace.spring.cloud.stream.binder.util.ErrorChannelSendingCorrelationKey.send(ErrorChannelSendingCorrelationKey.java:57)
    at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessagingException(JCSMPOutboundMessageHandler.java:188)
    at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessage(JCSMPOutboundMessageHandler.java:76)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:187)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:146)
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:141)
    at com.ingka.fms.inbound.reversereceiving.common.service.EventMeshPublisher.publish(EventMeshPublisher.kt:22)
    at com.ingka.fms.inbound.reversereceiving.common.service.EventMeshPublisher.publish$default(EventMeshPublisher.kt:18)
    at com.ingka.fms.inbound.reversereceiving.receiving.messaging.publisher.reverseReceivingUnitStatus.ReverseReceivingUnitStatusUpdatedPublisher.publish(ReverseReceivingUnitStatusUpdatedPublisher.kt:37)
    at com.ingka.fms.inbound.reversereceiving.receiving.messaging.publisher.reverseReceivingUnitStatus.ReverseReceivingUnitStatusUpdatedPublisher.createEventAndPublish(ReverseReceivingUnitStatusUpdatedPublisher.kt:73)
    at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.publishEvents(UnitService.kt:281)
    at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.updateUnitStatusAndPublishEvents(UnitService.kt:391)
    at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.updateUnitStatusAndPublishEvent(UnitService.kt:253)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717)
    at com.ingka.fms.inbound.reversereceiving.common.service.UnitService$$SpringCGLIB$$0.updateUnitStatusAndPublishEvent(<generated>)
    at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceivingUnit(ReceivingService.kt:563)
    at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceivingUnits(ReceivingService.kt:554)
    at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceiving$lambda$8(ReceivingService.kt:298)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.solace.spring.cloud.stream.binder.util.ClosedChannelBindingException: Message handler 028dfc28-458f-46d9-a669-aa22169323fd is not running
    ... 41 common frames omitted

In case it gives more of a clue, I also see this in the logs:

Stopping producer to TOPIC xxx/xxx/status/updated/V2/FA/FMSRR/xx/xx/ddd/aaaa/rc <message handler ID: 088fd415-8c5c-4ea0-91af-c319403403d3>

Immediately before the above, this log appears:

logger:  c.s.s.c.s.b.util.SharedResourceManager
message:  088fd415-8c5c-4ea0-91af-c319403403d3 is not the last user, persisting producer...

From a quick search through the Solace binder code, I see that this log comes from a stop() method, so I would assume that in a multi-threaded environment one thread tries to close the publisher while another thread is still in its sending phase.

Expected Behavior

I expect the Solace binder to publish messages reliably in a multi-threaded environment.

Steps to Reproduce

  1. Create a thread pool
  2. Send a large number of messages using the StreamBridge API from within the thread pool created in step 1 (a sketch follows below)
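
A minimal sketch of the reproduction, assuming a Spring-managed StreamBridge; the component name, topic prefix, and thread/message counts below are placeholders, not values from the original report:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ReproduceRace {

  private final StreamBridge streamBridge;

  public ReproduceRace(StreamBridge streamBridge) {
    this.streamBridge = streamBridge;
  }

  public void run() {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 50; i++) {
      final int n = i;
      // Publishing to many distinct dynamic destinations makes the binder's
      // dynamic-destination cache evict and close handlers, which is where the
      // MessagingException shows up.
      pool.submit(() ->
          streamBridge.send("some/topic/prefix/" + n,
              MessageBuilder.withPayload("message " + n).build()));
    }
    pool.shutdown();
  }
}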

Solace Broker version

No response

Solace API

implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace:4.2.0")

Solace API version

implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace:4.2.0")

siaavush commented 1 month ago

After some more investigation, I realized that the cardinality of the topics matters. Locally I had been producing all of those messages to the same topic; after adding a random suffix to the topic names, I could see in the logs that the application opens and closes the handlers.

2024-07-16T14:52:25.900+02:00  INFO 68298 --- [ool-1-thread-16] c.s.s.c.s.b.util.SharedResourceManager   : b15b7319-a225-4d6f-bcb2-0111ce5ae655 is not the last user, persisting producer...
2024-07-16T14:52:25.901+02:00  INFO 68298 --- [ool-1-thread-17] .s.s.c.s.b.o.JCSMPOutboundMessageHandler : Creating producer to TOPIC ingka.ilo.fms/inboundUnit/status/receiving-completed/V1/EU/FMSRR/BE/STO/siavash3 <message handler ID: 3db42e77-8a15-4eff-a842-c39ed5163e20>
2024-07-16T14:52:25.901+02:00  INFO 68298 --- [ool-1-thread-17] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'unknown.channel.name' has 1 subscriber(s).
2024-07-16T14:52:25.901+02:00  INFO 68298 --- [ool-1-thread-17] .s.s.c.s.b.o.JCSMPOutboundMessageHandler : Stopping producer to TOPIC ingka.ilo.fms/inboundUnit/status/receiving-completed/V1/EU/FMSRR/BE/STO/siavash39 <message handler ID: c29b584f-2895-4d2c-9c14-0417c68b297c>

Reviewing the code in SharedResourceManager.java and JCSMPOutboundMessageHandler.java in the Solace library, the potential issue that stands out is in the lifecycle management and synchronization of shared resources.

The release(String key) method in SharedResourceManager.java closes the shared resource when the last key is deregistered. However, if another thread calls get(String key) concurrently, just after the resource has been checked to be non-empty but before it is closed, it may end up using a resource that is about to be closed. This race condition could lead to unexpected behavior or exceptions.

The start() and stop() methods manage the lifecycle. However, there's no explicit synchronization around the checks and operations on isRunning state and producer instance creation/release. In a multi-threaded scenario, this could lead to cases where multiple threads might concurrently attempt to start or stop the handler, potentially leading to inconsistent states or exceptions like ClosedChannelBindingException if one thread closes the producer while another is sending a message.
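
A simplified illustration of the suspected check-then-act race. This is not the actual library code; the class and field names below are invented for illustration only:

import java.util.HashSet;
import java.util.Set;

// Hypothetical reference-counted resource manager, modelled loosely on the
// behaviour described above.
class SharedProducerManager {
  private final Set<String> users = new HashSet<>();
  private Object sharedProducer; // stands in for the real JCSMP producer

  Object get(String key) {
    if (sharedProducer == null) {     // (1) thread A sees no producer...
      sharedProducer = new Object();  //     ...and creates one
    }
    users.add(key);
    return sharedProducer;
  }

  void release(String key) {
    users.remove(key);
    if (users.isEmpty()) {            // (2) thread B sees no remaining users and
      sharedProducer = null;          //     closes/clears the producer, even though
    }                                 //     thread A may be between its check in (1)
  }                                   //     and its send()
}

Without synchronization around the check-and-create in get() and the check-and-close in release(), a send on one thread can land on a producer that another thread has just torn down, which matches the ClosedChannelBindingException seen above.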

siaavush commented 1 month ago

A short-term workaround for this problem is to increase the handler cache size, which decreases the probability of the race condition occurring.

This can be tuned via the spring.cloud.stream.dynamic-destination-cache-size property.
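
For example, in application.properties (the value below is illustrative, not a recommendation):

spring.cloud.stream.dynamic-destination-cache-size=50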

Still, I think it is worth investing in fixing the bug itself, especially for bigger projects using Solace where every MB of RAM counts.

Nephery commented 4 weeks ago

As per the Spring documentation, rather than using StreamBridge's dynamic destination feature, we strongly recommend that you use the scst_targetDestination message header:

If you are using the Solace PubSub+ binder, Spring Cloud Stream has reserved the scst_targetDestination header (retrievable via BinderHeaders.TARGET_DESTINATION), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header. This allows for the binder to manage the resources necessary to publish to dynamic destinations, relieving the framework from having to do so, and avoids the caching issues mentioned in the previous Note. More info here.

The issue that you are seeing is just one of many caveats of using StreamBridge's dynamic destination feature. And because the scst_targetDestination header is an all-around better alternative, Solace has no immediate plans to support the StreamBridge implementation of dynamic destinations.

So assuming that you were doing something like this:

public void sendMessage(StreamBridge streamBridge, String myDestination, Message<?> message) {
  streamBridge.send(myDestination, message);
}

You should instead refactor your code to do this:

public void sendMessage(StreamBridge streamBridge, String myDestination, Message<?> message) {
  // BinderHeaders.TARGET_DESTINATION resolves to the "scst_targetDestination" header
  // (org.springframework.cloud.stream.binder.BinderHeaders); MessageBuilder here is
  // org.springframework.messaging.support.MessageBuilder.
  Message<?> messageWithDestination = MessageBuilder.fromMessage(message)
      .setHeader(BinderHeaders.TARGET_DESTINATION, myDestination)
      .build();
  streamBridge.send("some-pre-defined-output-binding", messageWithDestination);
}

Then in your application.properties (or application.yml):

spring.cloud.stream.output-bindings=some-pre-defined-output-binding
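
Or, equivalently, in application.yml:

spring:
  cloud:
    stream:
      output-bindings: some-pre-defined-output-binding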

For more info, please see: