Open piotrmackowiak opened 4 years ago
Hi Piotr,
For such an asynchronous use case, you will need to manually pass the trace context across threads so you don't create another segment for the CompletableFuture. For example:
Entity entity = AWSXRay.getTraceEntity();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
AWSXRay.setTraceEntity(entity);
return "Hello World";
});
In order to "tell XRay to wait with sending Segment from AWSXRayServletFilter until he will get response from Controller", you can override the AWSXRayServletFilter to send an "in-progress" flag recording that a segment has started but is not yet complete, and then complete it later.
Please feel free to reopen if you have any other questions.
@shengxil AWSXRay.setTraceEntity is now deprecated, though it is still the prescribed method in the official AWS docs.
How do we use CompletableFuture with the replacement interfaces and still be able to leverage the common pool's executor?
@softprops I'm working with the AWS Docs team to get this PR merged to update the docs away from using setTraceEntity.
The new-and-improved version of the code snippet above would use the Entity.run() method like this:
Entity entity = AWSXRay.getTraceEntity();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
String ret = null; // May need to be AtomicReference, not sure off the top of my head
entity.run(() -> {
// Do meaningful work for CompletableFuture
ret = "Hello World";
});
return ret;
});
This has the benefit of automatically cleaning up your thread local context, which the current example does not.
Since the return type of entity.run is void, does your example compile? In my use cases I need a CompletableFuture that returns the data of the underlying operation.
@softprops you're right, my mistake. I've updated the sample to something that should compile.
May need to be AtomicReference, not sure off the top of my head.
lol.
Is there a way to do this with a Callable instead of a Runnable? With a Callable, you can return values.
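To illustrate the point about returning values from a Runnable-only API, here is a minimal sketch using only the standard library. The `runInContext` method is a hypothetical stand-in for a `run(Runnable)`-style method that returns void; it is not the SDK's implementation. The helper `callInContext` adapts it to a Supplier by capturing the result in an AtomicReference, which sidesteps the rule that a lambda cannot assign to a local variable.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class RunnableResultDemo {
    // Hypothetical stand-in for a run(Runnable)-style API that returns void.
    static void runInContext(Runnable work) {
        work.run();
    }

    // Adapts the void API to a value-returning call. The AtomicReference is
    // effectively final, so the lambda may write into it.
    static <T> T callInContext(Supplier<T> supplier) {
        AtomicReference<T> result = new AtomicReference<>();
        runInContext(() -> result.set(supplier.get()));
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(callInContext(() -> "Hello World"));
    }
}
```

A one-element array would work equally well as the holder; AtomicReference is used here only because it was the option mentioned above.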
Sorry, as you can tell the async paradigm in Java is not my strong suit :)
I realize the best tool for this case is actually our SegmentContextExecutor, since you can pass it directly to your supplyAsync call as the Executor argument. This way the supplied lambda will have the required context with minimal additional code and the same cleanup guarantee. We have an example of how to pass in a SegmentContextExecutor in the javadoc. Hope this helps for your use case!
I realize the best tool for this case is actually our SegmentContextExecutor, since you can pass it directly to your supplyAsync call as the Executor argument
Yes, I saw that; however, in doing so, one would lose the asynchronous benefits of using a CompletableFuture.
Let me know if I'm misunderstanding something. I'll try to explain below.
The default executor service CompletableFuture.supplyAsync will use is the CommonPool, a good default for most IO-bound applications. The optional second argument is indeed meant to be an implementation of the Executor interface; for asynchronous code, it's typically an executor backed by a thread pool, like those exposed by the factory methods on the standard library's Executors class.
If one were to use the SegmentContextExecutor, calls would turn into blocking calls, as SegmentContextExecutor does not execute a task within a thread pool. It just happens on the current thread, which in all irony means that no segment propagation is needed!
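The blocking behavior described here can be reproduced with the standard library alone. The sketch below assumes, as stated above, that the executor simply runs each task on the thread that submits it; `DIRECT` is a hypothetical stand-in for such an executor, not the SDK class itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

final class DirectExecutorDemo {
    // Stand-in executor: runs every task on the submitting thread.
    static final Executor DIRECT = Runnable::run;

    // Returns the name of the thread that actually ran the supplier.
    static String supplierThread(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        // Same thread as the caller: supplyAsync has effectively blocked.
        System.out.println("direct executor: " + supplierThread(DIRECT));
        // A different thread: the work was handed off.
        System.out.println("common pool:     " + supplierThread(ForkJoinPool.commonPool()));
    }
}
```

With the direct executor, supplyAsync only returns after the supplier has finished, so the future is already complete by the time the caller sees it.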
Here's the way I understand the javadoc example to work:
client.getItem(request).thenComposeAsync(response -> {
// If we did not provide the segment context executor, this request would not be traced correctly.
return client.getItem(request2);
}, SegmentContextExecutors.newSegmentContextExecutor());
client.getItem(request) presumably executes with a future context itself, so right off the bat I'm not sure how the segment gets propagated there. thenComposeAsync(...) will execute the second request within the same thread as client.getItem(request) after client.getItem(request) completes, so I believe the second request would get traced, but it's unclear how the first request would without a picture of how it propagates the segment in its thread.
Using the example at the beginning of this thread would work, but would effectively make the CompletableFuture a blocking request, as the SegmentContextExecutors does not execute the task on a thread pool:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "Hello World";
}, SegmentContextExecutors.newSegmentContextExecutor() // the supplier will just execute on the current thread here
);
The closest and cleanest approach I've seen so far was listed above but is now deprecated:
Entity entity = AWSXRay.getTraceEntity();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
AWSXRay.setTraceEntity(entity);
return "Hello World";
});
You might be able to get the best of both worlds by having SegmentContextExecutors implement the Executor interface but wrap an Executor that provides some desirable async behavior:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "Hello World";
}, SegmentContextExecutors.newSegmentContextExecutor(ForkJoinPool.commonPool())
);
Hi @softprops - indeed, having SegmentContextExecutors.wrap(Segment, Executor) would help for your use case if you need to delegate the callbacks to a threadpool. The intent of the current executors is for use with actual async clients. So in this example you gave (I added numbers to the clients to refer to them better):
client1.getItem(request).thenComposeAsync(response -> {
// If we did not provide the segment context executor, this request would not be traced correctly.
return client2.getItem(request2);
}, SegmentContextExecutors.newSegmentContextExecutor());
the code actually works fine. client2.getItem returns CompletableFuture, so the second call also does not actually do any blocking. The reason segment propagation is required when using *Async is that client1.getItem and the body of the callback in thenComposeAsync can run on different threads; for example, with asynchronous AWS SDK clients, Netty is used for the actual HTTP calls and the callback will run on a Netty event thread. So generally we expect thenCompose to work well with the current executors.
But if you use thenApply because the client is actually blocking, not returning a CompletableFuture, you probably do want to delegate to a threadpool. This is common with JDBC:
client1.getItem(request).thenApplyAsync(response -> {
return sqlClient.executeQuery(query); // Does not return CompletableFuture
}, SegmentContextExecutors.wrap(Segment.current(), threadPoolExecutor));
We're currently missing this method and it would be a great addition. In opentelemetry-java we have implemented it, and we can follow a similar implementation here: https://github.com/open-telemetry/opentelemetry-java/blob/master/api/context/src/main/java/io/opentelemetry/context/Context.java#L194
Just for a bit of information, in this
Entity entity = AWSXRay.getTraceEntity();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
AWSXRay.setTraceEntity(entity);
return "Hello World";
});
it actually demonstrates why we deprecated setTraceEntity: it is unsafe because setTraceEntity sets a thread-local value that is never cleared in this callback. That means the same thread may later handle a different, completely unrelated event with the segment still accidentally in the thread local.
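The leak described here can be shown with a plain ThreadLocal. In this sketch, `CONTEXT` is a hypothetical stand-in for the SDK's thread-local trace context; the pool has a single thread so that the second task is guaranteed to reuse the thread the first task ran on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalLeakDemo {
    // Hypothetical stand-in for the SDK's thread-local trace context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Runs two tasks on one pooled thread. The first sets the context and
    // optionally clears it; the return value is what the second,
    // unrelated task observes.
    static String seenByNextTask(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> {
                CONTEXT.set("segment-A");
                try {
                    // ... traced work would happen here ...
                } finally {
                    if (cleanUp) {
                        CONTEXT.remove();
                    }
                }
            }, pool).join();
            return CompletableFuture.supplyAsync(CONTEXT::get, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + seenByNextTask(false));
        System.out.println("with cleanup:    " + seenByNextTask(true));
    }
}
```

Without the cleanup, the unrelated second task still sees the first task's value; with the try/finally it sees nothing.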
Thanks for the explanation.
To answer the question originally asked in this thread,
What is the recommended approach for those using CompletableFuture.supplyAsync to initiate an async operation that is not an AWS SDK request?
The only way I can see that doesn't sacrifice making the operation happen in a non-blocking way is the now-deprecated method.
@softprops I believe you're right that there is currently no good way to use supplyAsync in a non-blocking way. However, I think the solution that @anuraaga has proposed (correct me if I'm wrong) for thenApplyAsync is what you would want here, but it is unfortunately not yet implemented in this SDK:
Segment segment = AWSXRay.getCurrentSegment();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
// ...
return "Hello World";
}, SegmentContextExecutors.wrap(segment, ForkJoinPool.commonPool())); // any Executor could be wrapped
This would allow you to wrap an arbitrary Executor, like the CommonPool, with X-Ray segment context propagation. We are always open to pull requests if you'd like to contribute this change, otherwise we'll add it to our backlog. Reopening this issue to track.
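As a rough illustration of what such a wrapper could look like, here is a sketch that uses a plain ThreadLocal in place of the X-Ray segment context. The class name, the `wrap` signature, and `CONTEXT` are all hypothetical; this is not the SDK's API, and the real implementation would need to handle Segment and recorder details that are omitted here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

final class ContextPropagatingExecutor implements Executor {
    // Hypothetical stand-in for the SDK's thread-local segment context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final String captured;
    private final Executor delegate;

    private ContextPropagatingExecutor(String captured, Executor delegate) {
        this.captured = captured;
        this.delegate = delegate;
    }

    // Wraps any Executor so tasks run on it with the given context installed.
    static Executor wrap(String context, Executor delegate) {
        return new ContextPropagatingExecutor(context, delegate);
    }

    @Override
    public void execute(Runnable task) {
        delegate.execute(() -> {
            String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                // Restore the worker thread's earlier state so nothing leaks.
                if (previous == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(previous);
                }
            }
        });
    }

    // Returns the context value a supplier observes on the wrapped executor.
    static String observedOn(String context, Executor delegate) {
        return CompletableFuture.supplyAsync(CONTEXT::get, wrap(context, delegate)).join();
    }

    public static void main(String[] args) {
        System.out.println(observedOn("segment-A", ForkJoinPool.commonPool()));
    }
}
```

The task still runs on the delegate's threads, so the call stays non-blocking, and the finally block gives the same cleanup guarantee discussed earlier in the thread.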
I suggested wrapping an underlying executor but that's cool. Thanks for hashing this out with me.
You might be able to get the best of both worlds by having SegmentContextExecutors implement Executor interface but wrap an Executor that provides some desirable async behavior
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "Hello World";
}, SegmentContextExecutors.newSegmentContextExecutor(ForkJoinPool.commonPool())
);
Any update on this?
@praetp unfortunately we have not been able to prioritize this work yet, but are always open to PRs e.g. to implement such a proposed wrapper.
Generally though, we recommend you also take a look at the AWS Distro for OpenTelemetry's (ADOT) Java support. It was released as generally available in September for both auto-instrumentation, which will manage context in async paradigms for you, and manual instrumentation, if you'd like to have more fine-grained control over the context.
Any update on this? I'm looking into the ADOT approach, but need to figure out the above issue in case we don't go that direction.
Can I use this?
final Segment segment = AWSXRay.getCurrentSegment();
CompletableFuture<Stuff> future = CompletableFuture.supplyAsync(() -> {
return doStuff();
}, SegmentContextExecutors.newSegmentContextExecutor(segment));
Looks like I can use the above in one of my use-cases, but the other has its own executor and cannot.
Hi @gaoagong, with regard to supporting wrapping arbitrary Executors with X-Ray segment context propagation, this task is still in our backlog. That said, we are still open to PRs for this implementation.
In the meantime, I would recommend that you continue to look into the ADOT approach, for the same reason mentioned in the previous comment.
@gaoagong You can write it like your example, but it doesn't support executing the task on another thread. That means the issue @softprops reported still remains.
If one were to use the SegmentContextExecutor, calls would turn to blocking calls as SegmentContextExecutor does not execute a task within a thread pool It just happens on the current thread which in all irony means that no segment propagation is needed!
Entity#run seems to depend on setTraceEntity, which has been deprecated.
Therefore, I think the implementation of Entity#run also needs to be reconsidered. It seems hard to solve this with SegmentContextExecutors.wrap alone.
By the way, I hope that the X-Ray documentation will mention this async use-case problem.
Hey Guys,
is there any timeline for when this issue will be addressed? We are facing similar issues with Kotlin suspend functions.
Regards, Andreas
Hi,
How do I properly use XRay with CompletableFuture?
How it works right now:
AWSXRayServletFilter is created and does its job: it catches each request and creates a Segment for it. However, if a new task is spawned (in Java, with CompletableFuture), there is no way to reach the Segment created by AWSXRayServletFilter from inside that task. What I need to do in this case is create another Segment manually, but if I do this, two different events are pushed to the XRay Console.
Is there any way to tell XRay to wait with sending Segment from AWSXRayServletFilter until he will get response from Controller.
So instead of
it is possible to have:
BR, Piotr