opentracing-contrib / java-spring-cloud

Distributed tracing for Spring Boot, Cloud and other Spring projects
Apache License 2.0

opentracing-spring-cloud-starter breaks stomp client publish #301

Open cameronbraid opened 4 years ago

cameronbraid commented 4 years ago

As soon as I add the opentracing-spring-cloud-starter:0.5.7 jar to my app, I am no longer able to publish messages via STOMP from the client.

Client-side subscribe works, and messages published by the server are received on the client, but the client can't publish because the following exception is thrown:

Exception in thread "clientInboundChannel-2" org.springframework.messaging.MessageDeliveryException: Failed to handle GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}] to org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask@3d640f4d in StompBrokerRelay[ReactorNettyTcpClient[reactor.netty.tcp.TcpClientDoOn@9ba905b]]; nested exception is java.lang.IllegalStateException: No header accessor (not using the SimpMessagingTemplate?): GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}], failedMessage=GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}]
        at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:153)
        at io.opentracing.contrib.concurrent.TracedRunnable.run(TracedRunnable.java:30)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: No header accessor (not using the SimpMessagingTemplate?): GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}]
        at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:499)
        at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:256)
        at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:144)
        ... 4 more

Spring boot version 2.3.4.RELEASE

cameronbraid commented 4 years ago

FYI, more details of my broker setup and client are at https://github.com/spring-projects/spring-framework/issues/25640

rstoyanchev commented 4 years ago

I believe the issue is related to how TracingChannelInterceptor is implemented. It uses MessageBuilder.fromMessage(message) to obtain a builder, makes changes, and then calls build() to create a new message. However, the STOMP over WebSocket messaging in the Spring Framework works with headers through a MessageHeaderAccessor wrapper and its sub-classes, which provide support for in-app headers vs native (STOMP) headers, as well as mutability of headers across components (e.g. interceptors) before the message is sent on a channel. When this wrapper is not used and headers are manipulated directly via MessageBuilder, the wrapper is lost, and that's what leads to the specific IllegalStateException above.
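To make the mechanism concrete, here is a minimal sketch of how the accessor can get lost. It assumes spring-messaging is on the classpath. The class name AccessorLossDemo, the helper methods, the destination and the header name are all hypothetical and only for illustration; this is not the actual TracingChannelInterceptor code.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;

// Hypothetical demo class, not part of Spring or opentracing-contrib.
public class AccessorLossDemo {

    /** True if the message headers still carry a MessageHeaderAccessor. */
    public static boolean hasAccessor(Message<?> message) {
        return MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class) != null;
    }

    /** Creates a SEND message whose headers are backed by a StompHeaderAccessor. */
    public static Message<byte[]> stompSend(String destination) {
        StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SEND);
        accessor.setDestination(destination);
        // getMessageHeaders() returns the accessor-backed headers, so the accessor stays attached.
        return MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders());
    }

    /** The problematic pattern: copy into a builder, change a header, build a new message. */
    public static Message<byte[]> rebuildWithBuilder(Message<byte[]> message) {
        return MessageBuilder.fromMessage(message)
                .setHeader("demo.header", "value") // hypothetical header name
                .build();
    }

    public static void main(String[] args) {
        Message<byte[]> original = stompSend("/queue/demo");
        Message<byte[]> rebuilt = rebuildWithBuilder(original);
        System.out.println("original has accessor: " + hasAccessor(original));
        System.out.println("rebuilt has accessor:  " + hasAccessor(rebuilt));
    }
}
```

The rebuilt message should still contain every header value, but the lookup via MessageHeaderAccessor.getAccessor is expected to come back empty for it. That lookup appears to be the check behind the "No header accessor" message in the trace above.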

The Javadoc provides more details but roughly the usage is something like this:

MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
    // headers are mutable, so change the headers and no need to re-create message
    updateHeaders(accessor);
}
else {
    accessor = MessageHeaderAccessor.getMutableAccessor(message);
    accessor.setLeaveMutable(true); // in preSend, leaving this mutable could benefit other interceptors
    updateHeaders(accessor);
    return new GenericMessage<>(message.getPayload(), accessor.toMessageHeaders());
}

where updateHeaders(accessor) is a common method that sets or removes headers.

arjunkrishnamurthy commented 2 years ago

@rstoyanchev The wrapper is lost when we return a new GenericMessage object in the above snippet, which still causes the IllegalStateException in StompBrokerRelayMessageHandler. Is there a solution for this?
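One possible way around this, sketched under assumptions: accessor.toMessageHeaders() returns a plain copy of the headers, while accessor.getMessageHeaders() returns the accessor-backed headers. Creating the message with MessageBuilder.createMessage(payload, accessor.getMessageHeaders()) should therefore keep the wrapper attached. The class name AccessorPreservingInterceptor and the header name are hypothetical. This only helps in an interceptor you control; it cannot restore an accessor that another component has already dropped.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;

// Hypothetical interceptor; a sketch, not the opentracing-contrib implementation.
public class AccessorPreservingInterceptor implements ChannelInterceptor {

    public static final String DEMO_HEADER = "demo.trace-id"; // hypothetical header name

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        MessageHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
        if (accessor != null && accessor.isMutable()) {
            // Headers are still mutable: change them in place and keep the same message.
            updateHeaders(accessor);
            return message;
        }
        // Otherwise get a mutable accessor, matching the original accessor type if there was one.
        accessor = MessageHeaderAccessor.getMutableAccessor(message);
        accessor.setLeaveMutable(true);
        updateHeaders(accessor);
        // getMessageHeaders() (not toMessageHeaders()) keeps the accessor attached to the headers.
        return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
    }

    private void updateHeaders(MessageHeaderAccessor accessor) {
        accessor.setHeader(DEMO_HEADER, "abc123"); // placeholder value for illustration
    }
}
```

With this variant, a later MessageHeaderAccessor.getAccessor(...) call on the returned message should find an accessor again, which is the precondition the broker relay complains about.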

anngdinh commented 2 years ago

@rstoyanchev I'm using a Java agent (the OpenTelemetry Java agent) to capture all traffic into and out of my Spring Boot app. It modifies the headers of WebSocket messages for tracing, and I then get the same IllegalStateException. Do you know how to fix it?

My stack trace:

Failed to handle GenericMessage [payload=byte[91], headers={simpMessageType=MESSAGE, nativeHeaders={foo=[bar], traceparent=[00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01]}, traceparent=00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01, id=cf214d5f-5d9e-3d78-67d7-5b83c004f1e6, contentType=application/json, timestamp=1665369946985}] to org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask@be1f38d in StompBrokerRelay[ReactorNettyTcpClient[reactor.netty.tcp.TcpClientConnect@539ba851]]; nested exception is java.lang.IllegalStateException: No header accessor (not using the SimpMessagingTemplate?): GenericMessage [payload=byte[91], headers={simpMessageType=MESSAGE, nativeHeaders={foo=[bar], traceparent=[00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01]}, traceparent=00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01, id=cf214d5f-5d9e-3d78-67d7-5b83c004f1e6, contentType=application/json, simpDestination=/topic/53539-resource-billing-buy-more-changing, timestamp=1665369946985}]
        at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:153)
        at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:100)
        at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:139)
        at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:125)
        at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
        at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
        at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)