bart-blommaerts closed this issue 4 years ago.
Yeah, it looks like a similar problem to the one described here: https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-java-multithreading.html. As we are doing a lot of asynchronous coding, the message processing all runs on its own thread pool, which isn't getting the tracing context applied. The same problem would occur with other tracing libraries like Zipkin, or with MDC context, which I wanted to dig into more but never got around to implementing.
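To make the thread-pool problem concrete, here is a library-agnostic sketch. It uses a plain `ThreadLocal<String>` as a stand-in for the tracing library's thread-local context (an assumption for illustration; the X-Ray SDK stores an `Entity`), shows that a task running on a pool thread does not see the caller's value, and then wraps the task so the captured value is restored on the worker thread. `TraceContextDemo`, `propagating` and `compare` are hypothetical names. With the X-Ray SDK the analogous step would be capturing the recorder's trace entity (`getTraceEntity()`) on the submitting thread and setting it (`setTraceEntity(...)`) on the worker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TraceContextDemo {
    // Hypothetical stand-in for a tracing library's thread-local "current trace".
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    // Captures the submitting thread's context now and restores it on the worker thread.
    static <T> Callable<T> propagating(Callable<T> task) {
        final String captured = CURRENT_TRACE.get();
        return () -> {
            final String previous = CURRENT_TRACE.get();
            CURRENT_TRACE.set(captured);
            try {
                return task.call();
            } finally {
                // Put the worker thread back the way it was so context does not leak between tasks.
                if (previous == null) {
                    CURRENT_TRACE.remove();
                } else {
                    CURRENT_TRACE.set(previous);
                }
            }
        };
    }

    // Returns {value seen by a plain pool task, value seen by a wrapped pool task}.
    static String[] compare() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CURRENT_TRACE.set("trace-123");
            final Callable<String> readTrace = CURRENT_TRACE::get;
            final String plain = pool.submit(readTrace).get();
            final String wrapped = pool.submit(propagating(readTrace)).get();
            return new String[] {plain, wrapped};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TRACE.remove();
            pool.shutdown();
        }
    }
}
```

The plain task reads `null` because the pool thread never had the value set; the wrapped task reads `trace-123`. Wrapping every submitted task this way is essentially what a context-propagating executor does.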
Ultimately I would want something similar to what we did for the Spring Cloud Schema Registry: I make some changes to the core so that this extensibility works for all types of tracing, and then you add an extension to get your specific tracing, like X-Ray, working. It should be a simple drop-in that works out of the box as you would expect.
If you have a minimal reproducible version of this, that would be great for getting me started; otherwise I will create an example in this library so I can reproduce it and begin digging into how to implement this.
I expect this will be a bit more complicated than the Spring Cloud Schema Registry changes we made, as it involves more core changes, so it may take longer for me to finish. If you have the workaround working for you at the moment, that will at least allow you to keep moving forward while you wait for me to get to this.
I quickly created a demo application: https://github.com/bart-blommaerts/sqs-xray-demo. It's a small project to reproduce the issue. It's not something anyone should ever use.
Check application.yml for configuration of your SQS queue. When you send
{
"Message" : "{\"demo\": \"1234567\"}"
}
to it, you will find the following exception:
Caused by: com.amazonaws.xray.exceptions.SubsegmentNotFoundException: Failed to end subsegment: subsegment cannot be found
You will need to purge the queue if you want to use it for something else.
Hey,
First off, thanks for the demo. I was able to get it working and I saw the problem you have been having.
So I have been working on this in the spare time I have had. I have the base functionality working using Brave tracing (I have used it before, so I have a basic understanding of how it works). It should be pretty easy to extend it to another tracing library like X-Ray. I haven't done a full test of integrating X-Ray with the fixes yet, so I'm not sure whether I am missing something in the X-Ray use case, but I will let you know what happens when I try that.
If you are curious, this is the draft interface I am working with. It would allow you to hook into the thread that is processing the message to add MDC, thread locals, etc.
/**
* Used to decorate the thread that will be used to process the given message.
*
* <p>This can be used to add extra information to the thread, such as Tracing, metrics, logging, etc.
*
* <p>For each method in this class an exception will result in
*/
public interface MessageProcessingDecorator {
/**
* Apply decorations to the thread before it begins processing the message.
*
* <p>If any of these decorators fail to perform this method, subsequent calls will not be made and the message will not be
* processed by the {@link MessageProcessor}.
*
* @param details details about the message processing functionality, e.g. identifier for this message processor
* @param message the message being processed
*/
default void preProcess(MessageProcessingDetails details, Message message) {
}
/**
* Method called if there was a failure to process the message on the given thread.
*
* <p>This is guaranteed to be performed on the same thread as the message processing thread and will run before
* the {@link #postProcess(MessageProcessingDetails, Message)} callback.
*
* @param details details about the message processing functionality, e.g. identifier for this message processor
* @param message the message being processed
* @param throwable exception that was thrown by the message processing function
*/
default void onProcessingFailure(MessageProcessingDetails details, Message message, Throwable throwable) {
}
/**
* Method called if the message processing thread did not throw an exception when being executed.
*
* <p>Note that this does not guarantee that the message was actually processed, as it could have been run on an asynchronous thread
* that is resolved independently, or the message consumer may resolve the message manually using the {@link Acknowledge}
* parameter.
*
* @param details details about the message processing functionality, e.g. identifier for this message processor
* @param message the message being processed
*/
default void onProcessingSuccess(MessageProcessingDetails details, Message message) {
}
/**
* Method called if the message was successfully processed and therefore triggered the {@link MessageResolver#resolveMessage(Message)} call
* for this message.
*
* <p>This will be called after the message was successfully resolved and is not guaranteed to be on the thread that processed the message.
*
* @param details details about the message processing functionality, e.g. identifier for this message processor
* @param message the message being processed
*/
default void onMessageResolved(MessageProcessingDetails details, Message message) {
}
/**
* Method called if an attempt to resolve the message via the {@link MessageResolver#resolveMessage(Message)} call failed.
*
* <p>This can be useful for monitoring underlying infrastructure failures where the message was successfully processed but could not
* be removed from the SQS queue, and will therefore be processed again if replaying is set up.
*
* @param details details about the message processing functionality, e.g. identifier for this message processor
* @param message the message being processed
* @param throwable exception that was thrown by the {@link MessageResolver}
*/
default void onMessageResolutionFailure(MessageProcessingDetails details, Message message, Throwable throwable) {
}
/**
* Method called when the message processing thread has finished.
*
* <p>This is guaranteed to be run, regardless of the state of the message processing and will be executed on the same thread that was processing
* the message.
*
* <p>Note that this does not guarantee that the message was successfully processed, as it could have been run on an asynchronous thread
* that is resolved independently, or the message consumer may resolve the message manually using the {@link Acknowledge}
* parameter. See {@link #onMessageResolved(MessageProcessingDetails, Message)} for how to monitor those events.
*
* @param details details about the message processing functionality, e.g. identifier for this message processor
* @param message the message being processed
*/
default void postProcess(MessageProcessingDetails details, Message message) {
}
}
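One way to read the callback contract in the draft interface above is the following sketch of a processor driving the decorators on the processing thread. `SimpleDecorator` and `DecoratedProcessor` are hypothetical names, a `String` stands in for `Message`, and the ordering (`preProcess`, then `onProcessingFailure` or `onProcessingSuccess`, then `postProcess` in a `finally` block) is inferred from the Javadoc rather than taken from the library's implementation. The resolution callbacks are left out because they fire after the message is resolved, possibly on another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for MessageProcessingDecorator:
// the MessageProcessingDetails/Message parameters are replaced by a String message.
interface SimpleDecorator {
    default void preProcess(String message) {}
    default void onProcessingFailure(String message, Throwable throwable) {}
    default void onProcessingSuccess(String message) {}
    default void postProcess(String message) {}
}

final class DecoratedProcessor {
    // Invokes the callbacks in the order the Javadoc describes.
    static void process(List<SimpleDecorator> decorators, String message, Consumer<String> processor) {
        // A failing preProcess propagates from here, so the message is never processed.
        for (SimpleDecorator decorator : decorators) {
            decorator.preProcess(message);
        }
        try {
            try {
                processor.accept(message);
            } catch (RuntimeException e) {
                // Failure callbacks run before postProcess.
                decorators.forEach(d -> d.onProcessingFailure(message, e));
                throw e;
            }
            decorators.forEach(d -> d.onProcessingSuccess(message));
        } finally {
            // Always runs once processing has started.
            decorators.forEach(d -> d.postProcess(message));
        }
    }

    // Records the callback order for one successful and one failing message.
    static List<String> demo() {
        final List<String> calls = new ArrayList<>();
        final SimpleDecorator recorder = new SimpleDecorator() {
            @Override public void preProcess(String m) { calls.add("pre:" + m); }
            @Override public void onProcessingFailure(String m, Throwable t) { calls.add("failure:" + m); }
            @Override public void onProcessingSuccess(String m) { calls.add("success:" + m); }
            @Override public void postProcess(String m) { calls.add("post:" + m); }
        };
        process(List.of(recorder), "ok", m -> calls.add("process:" + m));
        try {
            process(List.of(recorder), "bad", m -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            calls.add("rethrown");
        }
        return calls;
    }
}
```

A tracing decorator would begin its span or segment in `preProcess` and end it in `postProcess`, which is why `postProcess` sits in the `finally` block in this sketch.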
Makes sense!
Okay I have the first draft of how this decorating will work and it has been integrated with Brave Tracing.
Here is the specific file that would do it for Brave: https://github.com/JaidenAshmore/java-dynamic-sqs-listener/pull/160/files#diff-0ebb132cf4bc53f160f04caf83869bf5
I assume an equivalent would be needed for X-Ray. With this part done I will be able to start looking at that.
Okay, I have a basic implementation (no tests yet) that appears to be working. You can see a Spring Boot example here: https://github.com/JaidenAshmore/java-dynamic-sqs-listener/tree/150_xray_support/examples/aws-xray-spring-example
It is a simplified version of the Spring Boot app that you sent me. I will be polishing this over the weekend and hopefully I can cut a 4.0.0-M1 release so you can start using this if you still need the functionality.
It's been a couple of busy days .. I will make time to test your M1 next week. I'm still preparing a talk for a webinar this weekend, but after that I should be able to test.
One thing I did notice was the SDK dependency on 2.4.0. Maybe an update to 2.6.0 can make it into this build as well (https://mvnrepository.com/artifact/com.amazonaws/aws-xray-recorder-sdk-spring/2.6.0), unless you prefer to keep the updates separate.
Thanks for your effort so far. Expect more from me at the beginning of next week.
Hey, I have released 4.0.0-M1 if you want to muck around with it. I have just converted from Maven to Gradle, so I am in the process of double-checking that the JARs are equivalent. My initial test worked, but let me know if you see any weirdness.
Regarding the actual X-Ray functionality, I have done the minimum to get it working. You can just depend on the AWS X-Ray decorator extension and Spring should auto-configure it:
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>aws-xray-message-processing-decorator</artifactId>
<version>4.0.0-M1</version>
</dependency>
It will just start a segment when the message listener thread starts and end the segment when the thread ends.
If it doesn't do what you need, you can always implement your own MessageProcessingDecorator to do the same bit of functionality.
Let me know if there are any problems :)
Small intermediate update: I added the dependency (and updated a bunch of others). I started on some tests, but I can't get any decent results yet. It doesn't seem to link segments together, and other segments are missing.
I will debug some more and provide you with feedback. This is just a small update to let you know I'm still working on it :)
Yeah, no rush. I don't really know much at all about X-Ray, so I just got it to start the segment before processing the message. Let me know if I need to do something else.
At this moment, I'm confused .. When I test I see the following logs:
receive SQS event:
2020-07-02 10:17:28.003 DEBUG 366 --- [ge-processing-4] c.a.x.c.ThreadLocalSegmentContext : Beginning subsegment named: processEvent
save event in DynamoDb:
2020-07-02 10:17:28.006 DEBUG 366 --- [ge-processing-4] c.a.x.c.ThreadLocalSegmentContext : Beginning subsegment named: saveAlert
2020-07-02 10:17:28.236 INFO 366 --- [ge-processing-4] c.k.kat.alerter.alerts.AlertServiceImpl : Successfully saved new alert in DynamoDB: PutItemResponse(Attributes={})
2020-07-02 10:17:28.236 DEBUG 366 --- [ge-processing-4] c.a.x.c.ThreadLocalSegmentContext : Ending subsegment named: saveAlert
2020-07-02 10:17:28.236 DEBUG 366 --- [ge-processing-4] c.a.xray.entities.SubsegmentImpl : Subsegment named 'saveAlert' ending. Parent segment named 'sqs-listener-event-consumer-process-event' has reference count 2
send event somewhere else
2020-07-02 10:17:28.237 DEBUG 366 --- [ge-processing-4] c.a.x.c.ThreadLocalSegmentContext : Beginning subsegment named: send
2020-07-02 10:17:28.238 DEBUG 366 --- [ge-processing-4] c.a.x.c.ThreadLocalSegmentContext : Ending subsegment named: send
2020-07-02 10:17:28.238 DEBUG 366 --- [ge-processing-4] c.a.xray.entities.SubsegmentImpl : Subsegment named 'send' ending. Parent segment named 'sqs-listener-event-consumer-process-event' has reference count 2
close
2020-07-02 10:17:28.238 DEBUG 366 --- [ge-processing-4] c.a.x.c.ThreadLocalSegmentContext : Ending subsegment named: processEvent
2020-07-02 10:17:28.238 DEBUG 366 --- [ge-processing-4] c.a.xray.entities.SubsegmentImpl : Subsegment named 'processEvent' ending. Parent segment named 'sqs-listener-event-consumer-process-event' has reference count 1
2020-07-02 10:17:28.239 DEBUG 366 --- [ge-processing-4] com.amazonaws.xray.AWSXRayRecorder : Ending segment named 'sqs-listener-event-consumer-process-event'.
I find this in the UDPEmitter as well:
{
"name" : "sqs-listener-event-consumer-process-event",
"id" : "75dcc5697147a991",
"start_time" : 1.593677848003E9,
"trace_id" : "1-5efd9818-4782174834cddec82eea2023",
"end_time" : 1.593677848239E9,
"subsegments" : [ {
"name" : "processEvent",
"id" : "292fbfbdbf1efe7e",
"start_time" : 1.593677848003E9,
"end_time" : 1.593677848238E9,
"subsegments" : [ {
"name" : "saveAlert",
"id" : "6604d0e61e1dd067",
"start_time" : 1.593677848006E9,
"end_time" : 1.593677848236E9,
"metadata" : {
"ClassInfo" : {
"Class" : "xyz"
}
}
}, {
"name" : "send",
"id" : "25434ca0dbe3300c",
"start_time" : 1.593677848237E9,
"end_time" : 1.593677848238E9,
"metadata" : {
"ClassInfo" : {
"Class" : "abc"
}
}
} ],
"metadata" : {
"ClassInfo" : {
"Class" : "defhandler"
}
}
} ],
"aws" : {
"xray" : {
"sdk_version" : "2.6.1",
"sdk" : "X-Ray for Java"
}
},
"service" : {
"runtime" : "OpenJDK 64-Bit Server VM",
"runtime_version" : "11.0.2"
}
}
I have the x-ray daemon running locally and it confirms my segments:
2020-07-02T10:17:29+02:00 [Info] Successfully sent batch of 1 segments (0.402 seconds)
All of this looks good, but I don't see anything in the X-Ray console related to sqs-listener-event-consumer-process-event. I use com.amazonaws.aws-xray-recorder-sdk-spring, com.jashmore.java-dynamic-sqs-listener-spring-starter and com.jashmore.aws-xray-message-processing-decorator. When I add com.amazonaws.aws-xray-recorder-sdk-aws-sdk-v2-instrumentor (for automatic instrumentation of AWS services), I end up with an NPE before the application gets started:
s.a.a.c.i.h.BaseAsyncClientHandler : Error thrown from TransformingAsyncResponseHandler#onError, ignoring.
caused by
software.amazon.awssdk.core.exception.SdkClientException: Failed to begin subsegment named 'Sqs': segment cannot be found.
The segment starts and ends while bootstrapping the application:
2020-07-02 10:38:57.830 INFO 6179 --- [ main] faultMessageListenerContainerCoordinator : Starting MessageListenerContainerCoordinator
2020-07-02 10:38:57.850 DEBUG 6179 --- [ main] c.a.x.c.ThreadLocalSegmentContext : Beginning subsegment named: Sqs
2020-07-02 10:38:58.535 DEBUG 6179 --- [nc-response-1-0] c.a.xray.entities.SubsegmentImpl : Subsegment named 'Sqs' ending. Parent segment named 'test' has reference count 1
If I don't set a traceEntity in globalRecorder, I get the same error, but without any segment logging.
I'm looking further into it.
Maybe we need to start a subsegment in the BasicXrayMessageProcessingDecorator?
I think something is missing in that class yes.
With com.amazonaws.aws-xray-recorder-sdk-aws-sdk-v2-instrumentor I always get the
Failed to begin subsegment named 'Sqs'
exception. It is thrown (I believe) by MessageListenerContainerCoordinator in DefaultMessageListenerContainerCoordinator, meaning 'auto instrumentation' is out .. while it would be incredibly helpful.
Leaving that dependency out, I was able to get the "sqs-listener-" + context.getListenerIdentifier() segment in X-Ray; however, it seems to be isolated on its own. The application that uses the decorator runs in EKS and is also EKS-aware (in X-Ray), but the context is lost. So I believe we need to get hold of the parent context and, instead of beginning a fresh context, begin one with a traceId and a parent. In the message passed to onPreMessageProcessing, an AWSTraceHeader attribute is available, which seems suitable for manual tracing. However, I can't figure out how to get hold of the parent (which I assume should be the application itself). Starting a subsegment leads to the expected SegmentNotFoundException.
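The `AWSTraceHeader` attribute carries the X-Ray trace header, a `;`-separated list of `Key=Value` fields such as `Root=1-...;Parent=...;Sampled=1`. The X-Ray SDK already parses this (`TraceHeader.fromString`, which appears later in this thread), so the standalone sketch below only illustrates which pieces a listener needs in order to begin its segment with the sender's trace ID and parent rather than as an orphan. `TraceHeaderParts` is a hypothetical name, and treating anything other than `Sampled=1` as not sampled is a simplification (the header can also carry an undecided value such as `Sampled=?`).

```java
import java.util.HashMap;
import java.util.Map;

final class TraceHeaderParts {
    final String rootTraceId;
    final String parentId; // null when the header carries no Parent field
    final boolean sampled;

    private TraceHeaderParts(String rootTraceId, String parentId, boolean sampled) {
        this.rootTraceId = rootTraceId;
        this.parentId = parentId;
        this.sampled = sampled;
    }

    // Parses a header such as "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1".
    static TraceHeaderParts parse(String header) {
        final Map<String, String> fields = new HashMap<>();
        for (String part : header.split(";")) {
            final int separator = part.indexOf('=');
            if (separator > 0) {
                fields.put(part.substring(0, separator).trim(), part.substring(separator + 1).trim());
            }
        }
        final String root = fields.get("Root");
        if (root == null) {
            throw new IllegalArgumentException("Trace header has no Root field: " + header);
        }
        // Only an explicit "Sampled=1" counts as sampled in this sketch.
        return new TraceHeaderParts(root, fields.get("Parent"), "1".equals(fields.get("Sampled")));
    }
}
```

The root ID becomes the new segment's trace ID and the parent ID becomes its parent, which is what links the listener's segment to the sender instead of leaving it on its own.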
I noticed AWSXRay.getTraceEntity() is null in BasicXrayMessageProcessingDecorator. However, when I look at the variable after the application has started (i.e. without any messages fetched from SQS), it contains the Entity I set it to. So it got lost somewhere...
It is thrown (I believe) by MessageListenerContainerCoordinator in DefaultMessageListenerContainerCoordinator, meaning 'auto instrumentation' is out .. while it would be incredibly helpful.
Hmm, my guess is that because I am running all the SQS retrieval and resolution on their own threads, which do not have a segment started, when the instrumentation tries to wrap an SQS call it can't find the parent segment and fails. You can see here one of the places where we retrieve messages without any segment wrapped around the call.
The reason I believe MessageListenerContainerCoordinator could be the one failing is that, when it starts up the containers, it tries to determine the queue URL (when you only provide a queue name rather than a specific URL); you can see that here. Because we haven't wrapped that call in a surrounding segment, it blows up with a failure and does not start the container.
I am really not a fan that when the XRay tracing library blows up it will fail the business logic of your app, seems a little strange to me as this should be separate metrics. I don't see why they can't start a new segment if there isn't one that already exists...but anyway 🤷
Ways I can think of getting around this:
1. ExecutorService: I could use something like an XrayWrappedExecutorService so that at least any thread started will have the tracing information set. This would probably be the least intrusive way to support this. I can muck around with this, but I'm not sure how easy it would be to do.
2. Another workaround that may work is to create your own ExecutionInterceptor, placed before the X-Ray interceptor, that checks whether there is currently a segment and, if not, creates one. Then, at the end, it ends the segment if it created one before.
You would need to make sure this interceptor is registered before the X-Ray instrumentation:
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

/**
 * Make sure to put this execution interceptor before the instrumentation one
 */
public class XraySegmentExecutionInterceptor implements ExecutionInterceptor {
    @Override
    public void beforeExecution(final Context.BeforeExecution context, final ExecutionAttributes executionAttributes) {
        // check if there is a segment, if there isn't create one
    }

    @Override
    public void afterExecution(final Context.AfterExecution context, final ExecutionAttributes executionAttributes) {
        // if you created a segment before execution, end it
    }
}
Not sure exactly how it would work, but that could allow you to keep the instrumentation.
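The bookkeeping described by the two comments in the skeleton can be sketched without the AWS SDK: create a segment only when none is active, remember in the per-execution attributes that this interceptor created it, and end only that one. Here a `ThreadLocal` stands in for the X-Ray recorder's current segment, a `Map` stands in for `ExecutionAttributes`, and `FakeSegment` and `EnsureSegmentInterceptor` are hypothetical placeholders, so treat this as an outline of the logic rather than working X-Ray code. It also assumes both hooks run on the same thread; with an async client, `afterExecution` may run on a response thread (as the `nc-response-1-0` log line earlier suggests), in which case a real version would have to carry the segment itself in the execution attributes.

```java
import java.util.Map;

final class EnsureSegmentInterceptor {
    // Hypothetical placeholder for an X-Ray segment.
    static final class FakeSegment {
        final String name;
        boolean ended;

        FakeSegment(String name) {
            this.name = name;
        }
    }

    // Stand-in for the recorder's thread-local "current segment".
    static final ThreadLocal<FakeSegment> CURRENT = new ThreadLocal<>();
    private static final String CREATED_HERE = "ensure-segment.created-here";

    void beforeExecution(Map<String, Object> executionAttributes) {
        // Only create a segment when the calling thread has none (e.g. SQS retrieval threads).
        if (CURRENT.get() == null) {
            CURRENT.set(new FakeSegment("sqs-client"));
            executionAttributes.put(CREATED_HERE, Boolean.TRUE);
        }
    }

    void afterExecution(Map<String, Object> executionAttributes) {
        // Leave segments owned by someone else (e.g. a listener's segment) untouched.
        if (Boolean.TRUE.equals(executionAttributes.remove(CREATED_HERE))) {
            CURRENT.get().ended = true;
            CURRENT.remove();
        }
    }
}
```

Recording ownership per execution, rather than in a shared flag, keeps concurrent requests from ending each other's segments.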
when the XRay tracing library blows up it will fail the business logic of your app
Well, you can configure that with contextMissingStrategy: LOG_ERROR.
I don't think creating a segment is enough. That was what I was doing earlier, but it results in an 'orphan' segment. You need to capture its parent, and that parent doesn't seem to be there, likely because of the threading you mentioned.
As an example using the auto-instrumentation (I had to cut the first name out):
I execute a request from my local machine to an AWS API GW, which is X-Ray enabled. The "Monitor" you see is an AWSXRayServletFilter. For SNS / STS I don't have anything specific in my code (besides the @XRayEnabled annotation on my interface). But when I lose that parent information, SQS doesn't know the message was sent by SNS, which is an insight I really want.
Yeah gotcha. So I think there are two things that we need to do:
When we get an SQS message we need to continue the trace somehow. When I implemented Brave tracing support, I used the message attributes of the SQS message to store the trace ID so that the chain could be continued; see BraveMessageProcessingDecorator. I think we would need to do the same here. I have no idea how the trace information is passed into the SQS message, but I will look at the TracingInterceptor to reverse engineer how it works.
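The Brave approach described above (write the trace ID into the message attributes on send, read it back in the listener) can be sketched with a plain `Map` standing in for SQS message attributes. The attribute name `trace-id`, the `new-` prefix and the class name `TraceAttributePropagation` are hypothetical; Brave and X-Ray each use their own keys and formats, and for X-Ray the value is read from the `AWSTraceHeader` system attribute, as in the code later in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class TraceAttributePropagation {
    // Hypothetical attribute name used only for this illustration.
    static final String TRACE_ATTRIBUTE = "trace-id";

    // Sender side: return a copy of the outgoing attributes with the current trace id added.
    static Map<String, String> inject(String traceId, Map<String, String> attributes) {
        final Map<String, String> withTrace = new HashMap<>(attributes);
        withTrace.put(TRACE_ATTRIBUTE, traceId);
        return withTrace;
    }

    // Listener side: continue the sender's trace when present, otherwise start a new one.
    static String continueOrStart(Map<String, String> attributes) {
        final String incoming = attributes.get(TRACE_ATTRIBUTE);
        return incoming != null ? incoming : "new-" + UUID.randomUUID();
    }
}
```

The fallback branch is the "orphan" case discussed above: without the attribute, the listener has nothing to link to and can only start a fresh trace.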
The other part is the message-retrieving threads not having a segment, as discussed before. This is a bit more difficult to fix elegantly, but I have been thinking about it over the past couple of days and there might be a way to do it by wrapping the SqsAsyncClient with this X-Ray logic. I will need to experiment to see whether this works, and whether it works nicely with the X-Ray library.
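A rough outline of the wrapping idea: a decorator around the client begins a segment before each call and ends it when the returned future completes, so threads that never had a segment (like the retrieval threads) still get one. `ReceiveClient` and `SegmentWrappedClient` are hypothetical stand-ins that record events instead of calling the X-Ray recorder; this is not the library's XrayWrappedSqsAsyncClient, only the shape of the technique.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, heavily simplified stand-in for SqsAsyncClient.
interface ReceiveClient {
    CompletableFuture<List<String>> receiveMessages();
}

final class SegmentWrappedClient implements ReceiveClient {
    private final ReceiveClient delegate;
    // Records segment begin/end so the behaviour can be observed; a real wrapper would call the X-Ray recorder.
    final List<String> events = new CopyOnWriteArrayList<>();

    SegmentWrappedClient(ReceiveClient delegate) {
        this.delegate = delegate;
    }

    @Override
    public CompletableFuture<List<String>> receiveMessages() {
        events.add("begin segment");
        final CompletableFuture<List<String>> response;
        try {
            response = delegate.receiveMessages();
        } catch (RuntimeException e) {
            // The call failed synchronously, so end the segment straight away.
            events.add("end segment");
            throw e;
        }
        // End the segment when the response completes, which may be on a different thread.
        return response.whenComplete((messages, error) -> events.add("end segment"));
    }
}
```

Because every SQS call made by the library would go through the wrapper, this keeps the fix in one place instead of touching each thread the library starts.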
This is the last week at my current job so I will have a few weeks off where I will have the opportunity to look at this more deeply and hopefully be able to knock it out.
Okay, I have gotten something sort of working. The layout of my AWS resources is:
In my service I have the following 3 scheduled tasks, each running every 10 seconds:
For the Service -> SQS -> Service use case I appear to be getting the correct tracing:
For the Service -> SNS -> SQS -> Service use case I also appear to be getting the correct tracing:
Unfortunately, for the Service -> S3 -> SNS -> SQS -> Service use case the trace is lost when the S3 event is created. There might be a way around this, but from my googling I am not sure it is supported. This is the publishing side:
And then on the consuming side I have:
Do you think this will at least cover the use cases you need?
I will polish this up and hopefully make another release for you. Otherwise, I can push my branch up and you can muck around with it yourself if you want to test before a release is made.
My use case is service > SNS > SQS > service, so the same as your second case. Sounds worth a try.
If you can push the branch, I'll give it a try! (Next week on Thursday, I'll be off a couple days .. but if you can get anything out prior, I can test it)
Here is the branch that I have gotten it working on: https://github.com/JaidenAshmore/java-dynamic-sqs-listener/tree/150_xray_full_integration
You can use the examples/aws-xray-spring-example if you want to do some simple testing. The README.md in that module has steps on how to set it up. Otherwise just build the project into your m2 repository and use it in your own project. Let me know if there are any problems and I can clean it up.
I've released 4.0.0-M4 to make it easier for you to test without building it locally. Here is the current guide for getting set up, the module name is different to what it was before: https://github.com/JaidenAshmore/java-dynamic-sqs-listener/blob/4.x/doc/how-to-guides/spring/spring-how-to-add-aws-xray-tracing.md
Nevermind, the release didn't work correctly. I am working on fixing that now.
Nope, it does seem to have worked there was just a delay in publishing it to maven central. 🤷
Some feedback ..
The version for aws-xray-extension-spring-boot on https://github.com/JaidenAshmore/java-dynamic-sqs-listener/blob/4.x/doc/how-to-guides/spring/spring-how-to-add-aws-xray-tracing.md is incorrect :)
Initially, I got the same exception at startup of the Spring Boot application: Failed to begin subsegment named 'Sqs'.
I managed to fix that by wrapping my SqsAsyncClient:
final ClientSegmentNamingStrategy namingStrategy = new StaticClientSegmentNamingStrategy("Jaiden - M4");
return new DefaultSqsAsyncClientProvider(
new XrayWrappedSqsAsyncClient(
SqsAsyncClient.builder()
.credentialsProvider(xyz)
.region(Region.EU_WEST_1).build(), recorder(), namingStrategy));
with recorder() being a standard AWSXRayRecorder.
That fixes the startup, but it logs a lot of "operation" : "ReceiveMessage" entries, which I understand, but it's kind of annoying because it leads to:
Unfortunately, the final picture is still missing parts:
I POST a message to Monitor via an API GW and it gets published to SNS. However, the SQS queue is not linked to SNS. I'm still trying some things.
Clarification: Jaiden - M4 to SQS are only the ReceiveMessage operations. Actual messages seem to be discarded entirely.
I'm setting up something simpler to find out why.
I'm starting to see something:
"service" must be a manually created segment, because it has no AWS-related metadata. I haven't figured out where it comes from; it's not in any of my source code.
Ah yeah, by default I was using the spring.application.name as the name of the service; otherwise it is just set to `service`: https://github.com/JaidenAshmore/java-dynamic-sqs-listener/blob/4.x/extensions/aws-xray-extension/spring-boot/src/main/java/com/jashmore/sqs/extensions/xray/spring/SqsListenerXrayConfiguration.java#L28
So you can name your spring application and it should get a better name otherwise you can manually configure it.
The version for aws-xray-extension-spring-boot on https://github.com/JaidenAshmore/java-dynamic-sqs-listener/blob/4.x/doc/how-to-guides/spring/spring-how-to-add-aws-xray-tracing.md is incorrect :)
There isn't a version set in there for that component; did you mean the aws-xray-recorder-sdk-aws-sdk-v2-instrumentor version? I should probably remove that hard-coded version, as it will go stale quickly.
Regarding the large amount of tracing for receiving messages, I am not sure there is much we can do about that, as that is what the aws-xray-recorder-sdk-aws-sdk-v2-instrumentor provides. One option I was considering, but didn't go into, was turning off sampling for those receive message requests; I am not sure whether that removes the logging though.
Oh sorry, I just saw your comment on the PR: it is the ${dynamic-sqs-listener.version>} having the > in there. Thanks for that!
So you can name your spring application and it should get a better name otherwise you can manually configure it.
That explains why I couldn't find it. Looks a lot better now: (ignore the failed call at the end)
There still is 1 inconsistency though. You can see "Monitor" (a Spring Boot app) to AWS::SNS, but you can't see "Rec" (another Spring Boot app) to AWS::SQS. It seems to talk directly to SNS, which it in practice doesn't do. On the other hand the XRay wrapper on top of SQSClient shows the configured ClientSegmentNamingStrategy from the ReceiveMessage operations on AWS::SQS. There is no link between 'Rec' and AWS::SQS.
Other small remark, you don't need to add the emitter in code, you can configure it with
com:
  amazonaws:
    xray:
      emitters:
        daemonAddress: localhost:2000
I also prefer AWSXRay.getGlobalRecorder() for the recorder, because I set the EKSPlugin outside of the SQS configuration anyway.
Fwiw, these are my deps:
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-xray-recorder-sdk-spring</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-xray-recorder-sdk-apache-http</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-xray-recorder-sdk-aws-sdk-v2-instrumentor</artifactId>
<exclusions>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>java-dynamic-sqs-listener-spring-starter</artifactId>
<version>${dynamic-sqs-listener.version}</version>
</dependency>
<dependency>
<groupId>com.jashmore</groupId>
<artifactId>aws-xray-extension-spring-boot</artifactId>
<version>${dynamic-sqs-listener.version}</version>
</dependency>
Initially, I got the same exception at startup of the Spring Boot application: Failed to begin subsegment named 'Sqs'. I managed to fix that by wrapping my SqsAsyncClient:
Interesting, the SqsListenerXrayConfiguration should automatically set up an SqsAsyncClientProvider with the SqsAsyncClient, unless you were already providing your own provider? E.g. you should just be able to do this and it should auto-wrap it with an XrayWrappedSqsAsyncClient correctly:
@Bean
public SqsAsyncClient sqsAsyncClient() {
    return SqsAsyncClient.builder()
            .credentialsProvider(xyz)
            .region(Region.EU_WEST_1)
            .build();
}
This should default to the AWSXRay.getGlobalRecorder() as well.
There still is 1 inconsistency though. You can see "Monitor" (a Spring Boot app) to AWS::SNS, but you can't see "Rec" (another Spring Boot app) to AWS::SQS. It seems to talk directly to SNS, which it in practice doesn't do. On the other hand the XRay wrapper on top of SQSClient shows the configured ClientSegmentNamingStrategy from the ReceiveMessage operations on AWS::SQS. There is no link between 'Rec' and AWS::SQS.
Ah yes I didn't notice this in my own example where I did Service -> SNS -> SQS -> Service. I am able to reproduce this locally so let me see what I can do. I am just continuing the trace that AWS gives me so maybe there is something online on why this happens. Will get back to you.
I am continuing my work on https://github.com/JaidenAshmore/java-dynamic-sqs-listener/pull/200/files#diff-93a42e2504092ed7b5772908ce466c4b which includes changes like:
This example shows the SNS case you were talking about, where the message gets processed by a queue listener called "queue" and by an X-Ray wrapped service that threw an exception.
Okay I have done some experiments and I am not sure what the best approach for this is.
Doing some googling we may be hitting a similar problem as described here:
Looking at their documentation, the library code appears to be doing what they recommend so I am not sure if there is a good solution to this. I don't have access to AWS Support otherwise I would have asked them what they recommend.
I have attempted to send a message directly to SNS and SQS manually in the AWS Console and there is no tracing information in the message listener. It looks like AWS doesn't do any automatic tracing for you and it requires the service to publish the trace information into the SNS or SQS send message call.
I spent some time looking at how the TracingInterceptor works, and it appears that it just creates a new subsegment named Sqs or Sns, and X-Ray knows to link this to the AWS service. I attempted to do the same in the message listener by creating a subsegment named Sqs, but that wasn't a workable solution:
My next attempt was to create a new segment called SQS and do the linking manually. The code for this is a bit of a hack, and unfortunately it appears that X-Ray has created a new service called SQS for us instead of pointing it to the AWS service.
// New segment that the original segment created below is linked to
final Segment parentSegment = recorder.beginSegment("SQS");
parentSegment.setNamespace(Namespace.AWS.toString());
parentSegment.putAws(EntityDataKeys.AWS.OPERATION_KEY, "ReceiveMessage");
final String rawTraceHeader = message.attributes().get(MessageSystemAttributeName.AWS_TRACE_HEADER);
if (rawTraceHeader != null) {
final TraceHeader traceHeader = TraceHeader.fromString(rawTraceHeader);
parentSegment.setTraceId(traceHeader.getRootTraceId());
parentSegment.setParentId(traceHeader.getParentId());
parentSegment.setSampled(traceHeader.getSampled().equals(TraceHeader.SampleDecision.SAMPLED));
}
recorder.endSegment();
// Original segment that used to be linked to the trace header
final Segment segment = recorder.beginSegment(segmentNamingStrategy.getSegmentName(context, message));
segment.setParentId(parentSegment.getId());
segment.setTraceId(parentSegment.getTraceId());
segment.setSampled(parentSegment.isSampled());
When you use just SQS it looks like this, which is a little bit different to what we originally had:
vs what we had originally
Any thoughts?
Hey, going to close it for now as I want to cut the release of 4.0.0 and I am not sure there is a good way to fix that problem of SNS -> SQS -> service.
Let me know if there are any problems and I can do some patch fixes.
Thanks for all the help! :)
Hi Jaiden,
Late reply, since I was on holiday. We'll be upgrading to 4.0.0 soon, since it already introduces more X-Ray improvements.
From what I've read, they are working on the SNS > SQS > service problem, so that's something.
If we face problems, I'll create an issue. Thanks for your hard work!
https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-java.html#xray-sdk-java-submodules
The aws-xray-recorder-sdk-aws-sdk-v2-instrumentor submodule can automatically instrument supported AWS services like SQS. Combining it with your listener, unfortunately, leads to
It says 'end', but it's actually first caused by
Subsegment subsegment = AWSXRay.beginSubsegment(pjp.getSignature().getName());
in BaseAbstractXRayInterceptor. It wants to create a subsegment for the QueueListener method, but doesn't have a segment. I can get it to work by adding conditional checks in https://github.com/aws/aws-xray-sdk-java/blob/master/aws-xray-recorder-sdk-spring/src/main/java/com/amazonaws/xray/spring/aop/BaseAbstractXRayInterceptor.java, but I prefer not overriding the AWS SDK. For reference: in the beginning
at the end.
I believe this is caused by the X-Ray SDK using com.amazonaws.xray.ThreadLocalStorage to store segment information, combined with the threading of SqsAsyncClientProvider in CoreMessageProcessor.
Another possible workaround is setting the segments manually in the method annotated with @QueueListener. However, this is cumbersome and leads to partial / incorrect tracing (e.g. when SQS is subscribed to SNS or when using Lambdas). An example of setting this up with Spring AOP is available: https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-java-aop-spring.html. If you are not using JPA, this (unreleased) base class is needed in your project: https://github.com/aws/aws-xray-sdk-java/blob/master/aws-xray-recorder-sdk-spring/src/main/java/com/amazonaws/xray/spring/aop/BaseAbstractXRayInterceptor.java