hypertrace / javaagent

Hypertrace OpenTelemetry Java agent with payload/body and headers data capture.
Apache License 2.0

Context propagation in async/parallel lambdas #64

Closed pavolloffay closed 4 years ago

pavolloffay commented 4 years ago

Verify whether a context is being propagated in async lambdas.

Some code examples

Using ForkJoinPool.commonPool():

    IntStream.range(0, 20)
        .parallel()
        .forEach(value -> {
          System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + value);
          // currentSpan is not propagated
          printSpan(TRACER.getCurrentSpan());
        });

    ForkJoinPool.commonPool().execute(() -> {
      System.out.println("Thread : " + Thread.currentThread().getName());
      // currentSpan is correctly propagated
      printSpan(TRACER.getCurrentSpan());
    });
    ForkJoinPool.commonPool().invoke(new RecursiveAction() {
      @Override
      protected void compute() {
        System.out.println("Thread : " + Thread.currentThread().getName());

        // currentSpan is correctly propagated
        printSpan(TRACER.getCurrentSpan());
      }
    });

    // These also run on the ForkJoinPool.commonPool
    CompletableFuture<String> completableFuture
        = CompletableFuture.supplyAsync(() -> {
      System.out.println("Thread : " + Thread.currentThread().getName());
      // currentSpan is correctly propagated
      printSpan(TRACER.getCurrentSpan());
      return "Hello";
    });
    CompletableFuture<String> future = completableFuture
        .thenApplyAsync(s -> {
          System.out.println("Thread : " + Thread.currentThread().getName());
          // currentSpan is correctly propagated
          printSpan(TRACER.getCurrentSpan());
          return s + " World";
        });

    @PostMapping(path = "/async")
    public void createAbc(HttpServletRequest request) {
      AsyncContext asyncContext = request.startAsync();
      asyncContext.start(() -> {
        try {
          // current span is propagated
          printSpan(TRACER.getCurrentSpan());
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          asyncContext.complete();
        }
      });
    }

    ExecutorService executor = Executors.newFixedThreadPool(5);
    executor.execute(() -> {
      // current span is correctly propagated
      printSpan(TRACER.getCurrentSpan());
    });

OTEL support for ForkJoinPool: https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/709

OT special agent ForkJoinPool: https://github.com/opentracing-contrib/java-specialagent/issues/102

pavolloffay commented 4 years ago

The OTEL agent instruments the public APIs of ExecutorService, ForkJoinPool, etc. When a lambda is passed, e.g. executor.execute(() -> {}), the instrumentation creates a wrapper around the lambda (in this case a Runnable) and stores the context inside it. When the wrapper is executed, the context is restored.
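
To illustrate the wrapping idea described above, here is a minimal sketch. It is not the agent's actual code: the `CURRENT` thread-local is a hypothetical stand-in for the real context storage, and `ContextRunnable` and `contextSeenByWorker` are names made up for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ContextWrapperSketch {
  // Hypothetical stand-in for the context storage (assumption, not the agent's API).
  static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  // Captures the caller's context at construction and restores it around run().
  static final class ContextRunnable implements Runnable {
    private final Runnable delegate;
    private final String captured;

    ContextRunnable(Runnable delegate) {
      this.delegate = delegate;
      this.captured = CURRENT.get(); // read on the submitting thread
    }

    @Override
    public void run() {
      String previous = CURRENT.get();
      CURRENT.set(captured); // restore on the worker thread
      try {
        delegate.run();
      } finally {
        CURRENT.set(previous); // do not leak the context into later tasks
      }
    }
  }

  // Runs a task on a fresh worker thread and returns the context it observed.
  static String contextSeenByWorker(boolean wrap) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicReference<String> seen = new AtomicReference<>();
    CURRENT.set("span-1");
    try {
      Runnable task = () -> seen.set(CURRENT.get());
      Runnable submitted = wrap ? new ContextRunnable(task) : task;
      executor.submit(submitted).get(); // wait for the task to finish
      return seen.get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      CURRENT.remove();
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("wrapped:   " + contextSeenByWorker(true));  // span-1
    System.out.println("unwrapped: " + contextSeenByWorker(false)); // null
  }
}
```

In this sketch the wrapping is done by hand; with the agent, the instrumented public API would presumably do the equivalent wrapping transparently.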

Context propagation seems to be broken only when a parallel stream is used. The parallel stream uses ForkJoinPool.commonPool. In the following code the context is not propagated in the forEach. However, when the commonPool is called explicitly, the context is correctly propagated, since all public APIs of the ForkJoinPool wrap the execution.

    IntStream.range(0, 20)
        .parallel()
        .forEach(value -> {
          System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + value);
          // currentSpan is not propagated
          printSpan(TRACER.getCurrentSpan());
        });
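
Until the instrumentation covers this case, one possible workaround is to capture the context before the stream and restore it manually inside the lambda. The sketch below only illustrates that pattern: `CURRENT` is a hypothetical thread-local stand-in for the real context, and `seenInParallelStream` is a made-up helper, not part of any agent or OTEL API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class ParallelStreamContextSketch {
  // Hypothetical stand-in for the context storage (assumption, not the agent's API).
  static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  // Returns the distinct context values observed inside the parallel forEach.
  static Set<String> seenInParallelStream(boolean restoreManually) {
    Set<String> seen = ConcurrentHashMap.newKeySet();
    CURRENT.set("span-1");
    try {
      String captured = CURRENT.get(); // capture on the calling thread
      IntStream.range(0, 20)
          .parallel()
          .forEach(value -> {
            if (!restoreManually) {
              // Worker threads may observe no context here.
              seen.add(String.valueOf(CURRENT.get()));
              return;
            }
            String previous = CURRENT.get();
            CURRENT.set(captured); // restore on whichever thread runs this element
            try {
              seen.add(String.valueOf(CURRENT.get()));
            } finally {
              CURRENT.set(previous); // put back whatever the thread had before
            }
          });
    } finally {
      CURRENT.remove();
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println("manual restore: " + seenInParallelStream(true));
    System.out.println("no restore:     " + seenInParallelStream(false));
  }
}
```

With manual restore every element sees the captured value. Without it the result depends on which threads pick up the work, because the calling thread also takes part in a parallel forEach.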

pavolloffay commented 4 years ago

Done. This is a no-op on our side. The only missing part is https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/709

demidov163 commented 1 year ago

Hello, sorry, but was the issue with stream.parallel() fixed? I mean propagating the trace ID, because it does not seem to work in the latest version of https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases