micrometer-metrics / context-propagation

Context Propagation API
Apache License 2.0

I want to propagate TraceId to all places where parallelStream() and CustomForkJoinPool are used. #138

Open petercheon opened 1 year ago

petercheon commented 1 year ago

I would like to know how to propagate the TraceId within the Java Stream API in Spring Cloud Sleuth.

In the code below, the trace_id is not propagated, and each client ends up having an individual TraceId.

TestForkJoinPool testForkJoinPool = new TestForkJoinPool(10);
Span span = tracer.nextSpan(tracer.currentSpan());

testForkJoinPool.submit(() -> {
    try (SpanInScope ignored = tracer.withSpan(span)) {
        IntStream.range(1, 20).parallel().forEach(idx -> {
            httpClient.get(1000L);
        });
    } finally {
        span.end();
    }
}).get();

However, if we modify the above code as shown below, the TraceId is propagated.

TestForkJoinPool testForkJoinPool = new TestForkJoinPool(10);
Span span = tracer.nextSpan(tracer.currentSpan());

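testForkJoinPool.submit(() -> {
    try {
        IntStream.range(1, 20).parallel().forEach(idx -> {
            // Open the scope on whichever worker thread runs this element.
            try (SpanInScope ignored = tracer.withSpan(span)) {
                httpClient.get(1000L);
            }
        });
    } finally {
        // End the span once, after every element has been processed.
        span.end();
    }
}).get();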

Unfortunately, wrapping every existing Stream API lambda in a SpanInScope like this is not practical across a large codebase. Is there a way to inject SpanInScope with minimal modifications to the existing Stream API code?

Thank you.

jonatan-ivanov commented 1 year ago

See the connected issue in Sleuth: https://github.com/spring-cloud/spring-cloud-sleuth/issues/2297. To me this seems like a new feature: instrumenting the Fork Join API (or finding a way to inject an executor/completion service).

chemicL commented 11 months ago

The classes from the tracing domain make it a bit hard for me to understand the suggestion. Consider providing an example with only classes from the JDK and the context-propagation library so that it is more comprehensible.

I'm skeptical we can do anything for ForkJoinPool.commonPool(), which the Stream API uses implicitly and which offers no option to configure a different implementation. I suppose manually capturing and restoring a ContextSnapshot is what's available at the moment.

If you have some suggestions, please update the issue.
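
For readers following along, the manual capture-and-restore idea can be sketched with JDK classes only. This is an illustration under assumptions, not the library's implementation: `TRACE_ID` is a hypothetical stand-in for the tracer's thread-bound state, and `propagating` is a hypothetical helper. With the context-propagation library, the analogous steps would presumably be capturing a ContextSnapshot on the calling thread and restoring it with `setThreadLocals()` inside the lambda.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class ManualPropagationSketch {

    // Hypothetical stand-in for the tracer's thread-bound state.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Hypothetical helper: captures the caller's value when it is called and
    // restores that value around each invocation, on whichever thread runs it.
    static IntConsumer propagating(IntConsumer delegate) {
        String captured = TRACE_ID.get(); // read on the calling thread
        return value -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                delegate.accept(value);
            } finally {
                // Put the worker thread back the way we found it.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Runs a parallel stream inside a custom pool and collects the trace ids
    // observed by the stream body.
    static Set<String> run() throws Exception {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(4);
        TRACE_ID.set("trace-1");
        try {
            // The wrapper is created here, so it captures "trace-1".
            IntConsumer body = propagating(idx -> seen.add(String.valueOf(TRACE_ID.get())));
            pool.submit(() -> IntStream.range(1, 20).parallel().forEach(body)).get();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [trace-1]
    }
}
```

The existing stream code changes in one place only: the lambda passed to `forEach` is wrapped. Every other stage that needs the context (`map`, `filter`, and so on) would still need its own wrapper, which is the limitation raised in the original question.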

github-actions[bot] commented 10 months ago

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

cah-renan-ferreira commented 4 months ago

Any news on that? Is it possible to do that manually at least?

marcingrzejszczak commented 4 months ago

Have you tried using ContextExecutorService.wrap(ForkJoinPool.commonPool())?

cah-renan-ferreira commented 4 months ago

Not really. Should I do it every time before calling a parallelStream, or just once on app startup? Thanks for the support.

marcingrzejszczak commented 4 months ago

The problem is that this returns an ExecutorService that simply wraps the common pool. I don't think you'll be happy with the result, because you want to use the ForkJoinPool API :grimacing:
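
To make the limitation concrete, here is a JDK-only sketch of a wrapping executor. It is not the library's ContextExecutorService; `PropagatingExecutorService` and `TRACE_ID` are hypothetical names used for illustration. It shows that a decorator of this kind propagates state to tasks submitted through it, but the wrapped object is only an ExecutorService and no longer a ForkJoinPool. A parallel stream forks its subtasks directly on the underlying pool, so those forks never pass through the wrapper's `execute` method.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class WrappedPoolSketch {

    // Hypothetical stand-in for the tracer's thread-bound state.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Hypothetical decorator: propagates TRACE_ID to tasks handed to execute().
    static final class PropagatingExecutorService extends AbstractExecutorService {
        private final ExecutorService delegate;

        PropagatingExecutorService(ExecutorService delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            String captured = TRACE_ID.get(); // read on the submitting thread
            delegate.execute(() -> {
                String previous = TRACE_ID.get();
                TRACE_ID.set(captured);
                try {
                    task.run();
                } finally {
                    if (previous == null) {
                        TRACE_ID.remove();
                    } else {
                        TRACE_ID.set(previous);
                    }
                }
            });
        }

        @Override public void shutdown() { delegate.shutdown(); }
        @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
        @Override public boolean isShutdown() { return delegate.isShutdown(); }
        @Override public boolean isTerminated() { return delegate.isTerminated(); }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }
    }

    // A task submitted through the wrapper sees the submitter's trace id.
    static String submitThroughWrapper() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        ExecutorService wrapped = new PropagatingExecutorService(pool);
        TRACE_ID.set("trace-1");
        try {
            Callable<String> task = TRACE_ID::get;
            return wrapped.submit(task).get();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    // The wrapper exposes only the ExecutorService interface.
    static boolean wrapperIsForkJoinPool() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            ExecutorService wrapped = new PropagatingExecutorService(pool);
            return wrapped instanceof ForkJoinPool;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitThroughWrapper());  // prints trace-1
        System.out.println(wrapperIsForkJoinPool()); // prints false
    }
}
```

Under these assumptions, wrapping helps for code that submits work explicitly, while code relying on `parallelStream()` still needs the context restored inside the stream lambdas themselves.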