opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0
9.77k stars 1.82k forks

Context Propagation - OpenTelemetry Integration #7351

Open rishabhmaurya opened 1 year ago

rishabhmaurya commented 1 year ago

Span context isn't propagated automatically when using OpenTelemetry manual instrumentation. Because the context is stored in a ThreadLocal, it has to be propagated whenever execution moves to another thread; otherwise it is lost further down the execution. It needs to be propagated in the following cases -

  1. Spawning thread asynchronously (for sub-tasks parallelism).
  2. Non blocking IO.

The basic idea is: if a thread has a context, it should pass that context to the next thread when execution switches threads, and preserve it across non-blocking IO operations. The context propagation logic shouldn't be left to the consumer to incorporate and should be taken care of by the framework wherever possible.

For 1, the OpenSearch ActionListener interface is heavily used throughout the codebase. Thus, preserving the context there should take care of all the cases.

For 2, all non-blocking IO calls are again wrapped in the ActionListener interface, so preserving the context around the ActionListener for such calls ensures that whichever thread picks up onResponse() for the IO event will have the Context.

For 1, most of the time it is DirectExecutorService that is used to run the asynchronous step on the same executor service. However, a consumer can also use the ThreadPool instance to get the desired executor service directly and execute the asynchronous step on it. This usage pattern depends heavily on the business logic, so it will be left to the consumer of the framework to wrap such ActionListeners to preserve the context when needed.

For 2, which is less business-logic oriented, the cases are limited and depend on the nature of the IO operation. For example, FileChannel is blocking IO and thus doesn't need to be incorporated. Network calls, on the other hand, are managed through transport actions, are non-blocking most of the time, and use the ActionListener interface, where the context needs to be preserved. Thus, wrapping the ActionListener in TransportService before executing the transport action should take care of the majority of the cases. All such cases will be taken care of by the framework on an on-demand basis.

Let's summarize the above in terms of requirements from the framework (more details can be found in the associated code changes) -

  1. OTelContextPreservingActionListener(ActionListener delegate) - wrapper over ActionListener to preserve the context.
  2. OTelContextPreservingExecutorService(ExecutorService delegate) - wrapper over ExecutorService to automatically stash the context and pass it to the next thread when it starts work on the task.
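To make the first requirement concrete, here is a minimal, self-contained sketch of a context-preserving listener. It is not the code from the associated change: `SketchContext`, `SketchScope`, `SketchListener`, `ContextPreservingListener` and `ListenerSketch` are hypothetical stand-ins, with a plain ThreadLocal playing the role of OpenTelemetry's Context and a two-method interface playing the role of ActionListener.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for OpenTelemetry's ThreadLocal-backed Context (hypothetical, not the real API).
final class SketchContext {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static String current() {
        return CURRENT.get();
    }

    // Attaches a value to the calling thread; closing the scope restores the previous value.
    static SketchScope attach(String value) {
        String previous = CURRENT.get();
        CURRENT.set(value);
        return () -> CURRENT.set(previous);
    }
}

interface SketchScope extends AutoCloseable {
    @Override
    void close();
}

// Hypothetical listener with the same two callbacks as ActionListener.
interface SketchListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

// Captures the context on the creating thread and reattaches it around each callback.
final class ContextPreservingListener<T> implements SketchListener<T> {
    private final SketchListener<T> delegate;
    private final String captured;

    ContextPreservingListener(SketchListener<T> delegate) {
        this.delegate = delegate;
        this.captured = SketchContext.current();
    }

    @Override
    public void onResponse(T response) {
        try (SketchScope ignored = SketchContext.attach(captured)) {
            delegate.onResponse(response);
        }
    }

    @Override
    public void onFailure(Exception e) {
        try (SketchScope ignored = SketchContext.attach(captured)) {
            delegate.onFailure(e);
        }
    }
}

final class ListenerSketch {
    // Returns the context value that the callback observes when it runs on another thread.
    static String contextSeenByCallback(boolean preserve) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        SketchListener<String> plain = new SketchListener<String>() {
            @Override
            public void onResponse(String response) {
                seen.set(SketchContext.current());
            }

            @Override
            public void onFailure(Exception e) {
                seen.set(SketchContext.current());
            }
        };
        try (SketchScope ignored = SketchContext.attach("span-1")) {
            SketchListener<String> listener =
                preserve ? new ContextPreservingListener<String>(plain) : plain;
            // The callback fires on the IO thread, not on the thread that attached "span-1".
            ioThread.submit(() -> listener.onResponse("done")).get();
            return seen.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            ioThread.shutdown();
        }
    }
}
```

With the wrapper, the callback sees the value that was current when the listener was created; without it, the callback thread sees no context at all, which is the loss described above.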

[Figure: Context Propagation]

The figure above illustrates the expected life cycle of a span that is started in the middle of a task being executed by Thread 1 (of ExecutorService 1). Thread 1 performs an IO operation, and on completion of the IO, Thread 2 (of ExecutorService 1) resumes the task. Thread 2 also performs an IO operation, and the task is later resumed by Thread 3 (of ExecutorService 1) on IO completion. Thread 3 has three sub-tasks, which it runs in parallel by spawning three threads in different executor services. Because this hand-off is non-blocking, Thread 3 continues with the rest of the task while the sub-tasks run on other threads. The task still waits for the sub-tasks to complete, but without blocking a thread: on the response to the last sub-task, Thread 2 (of ExecutorService 1) resumes the task and runs it to completion, and the span is closed somewhere in between. ExecutorService 3 isn't onboarded to preserve the context, so the span isn't propagated to it (no blue strip). However, the ActionListener used by the main task when spawning the sub-task threads was context preserving, so Thread 2 (of ExecutorService 1) was able to resume the span on the response to the last sub-task.

In many use cases, span closure is tightly coupled to the completion of an async step. This brings us to the third requirement from the framework, i.e. an option to automatically close the span on the response to the ActionListener. To keep it simple and clean, the StartSpan API can be extended for such cases. The contract is that the consumer must move all the logic of a span into a function, and pass the span details (for the span to be started), the function, and the ActionListener to be wrapped with span closure logic as arguments.

  1. StartSpan(SpanDetails, Function, FunArgs, ActionListener) - This will invoke the function, wrap the ActionListener using OTelContextPreservingActionListener, and close the span on response/failure of the ActionListener.

These requirements need to be incorporated into the overall initiative for Tracing framework APIs in OpenSearch https://github.com/opensearch-project/OpenSearch/issues/7026 and https://github.com/opensearch-project/OpenSearch/issues/6750

rishabhmaurya commented 1 year ago

@reta @gaganjuneja @Bukhtawar @backslasht would like to know what you guys think about the framework/consumer contract set here. Thanks.

rishabhmaurya commented 1 year ago

Sample usage in the source peer recovery code -

  1. OTelContextPreservingActionListener(ActionListener delegate) - wrapping the network call performed by the RetryableTransportClient -

    -  new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC)
    +  new ActionListenerResponseHandler<>(
    +    new OTelContextPreservingActionListener<>(listener, Context.current()),
    +    reader, ThreadPool.Names.GENERIC)

    source

  2. OTelContextPreservingExecutorService(ExecutorService delegate) - wraps the executor service in ThreadPool.java if it's onboarded to the framework.

    +        if (OpenTelemetryService.isThreadPoolAllowed(name())) {
    +           executor = OpenTelemetryContextWrapper.wrapTask(executor);
    +        }

    source

  3. StartSpan(SpanDetails, Function, FunArgs, ActionListener) -

    - recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request));
    + BiFunction<Object[], ActionListener<?>, Void> recoverFunction = (args, actionListener) -> {
    +     recover((StartRecoveryRequest) args[0], (ActionListener<RecoveryResponse>) actionListener);
    +     return null;
    +  };
    + OpenTelemetryService.callFunctionAndStartSpan("recover", recoverFunction,
    +      new ChannelActionListener<>(channel, Actions.START_RECOVERY, request), request);

    source

    This ensures that whenever recover completes, the actionListener is invoked and the span is closed. Not closing a span can have consequences such as memory leaks or unexpected spans and scopes being propagated. Closing the span shouldn't be left to the consumer, especially in the ActionListener case, where the flow is asynchronous and span closure can get complicated.
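The executor wrapping in item 2 can be sketched in a few lines. This is only an illustration under stated assumptions: `PoolContext`, `ContextPreservingExecutor` and `ExecutorSketch` are hypothetical names, and a plain ThreadLocal stands in for the OpenTelemetry Context. It is not the `OpenTelemetryContextWrapper.wrapTask` implementation from the change.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the thread-local OpenTelemetry Context (hypothetical).
final class PoolContext {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();
}

// Hypothetical wrapper: captures the submitter's context and installs it on the
// worker thread for the duration of the task.
final class ContextPreservingExecutor implements Executor {
    private final Executor delegate;

    ContextPreservingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        String captured = PoolContext.CURRENT.get(); // read on the submitting thread
        delegate.execute(() -> {
            String previous = PoolContext.CURRENT.get();
            PoolContext.CURRENT.set(captured);
            try {
                task.run();
            } finally {
                // Restore, so the context does not leak into later tasks on this pooled thread.
                PoolContext.CURRENT.set(previous);
            }
        });
    }
}

final class ExecutorSketch {
    // Returns the context value a task observes on the pool thread.
    static String contextSeenByTask(boolean onboarded) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor executor = onboarded ? new ContextPreservingExecutor(pool) : pool;
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        PoolContext.CURRENT.set("span-1");
        try {
            executor.execute(() -> {
                seen.set(PoolContext.CURRENT.get());
                done.countDown();
            });
            done.await();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            PoolContext.CURRENT.remove();
            pool.shutdown();
        }
    }
}
```

An executor that isn't onboarded behaves like ExecutorService 3 in the figure: its tasks run without the span. OpenTelemetry's Java `Context` class also has helpers along these lines (for example `Context.taskWrapping(ExecutorService)`), which may be what a real wrapper delegates to.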

rishabhmaurya commented 1 year ago

Here is the complete code change for source peer recovery and all the framework changes described here. It produces the following spans -

[Screenshot of the resulting spans, 2023-05-01 at 5:49:30 PM]

reta commented 1 year ago

@rishabhmaurya thanks for the summary. Wrapping action listeners and thread pools (the 1st and 2nd use cases) makes perfect sense. The OpenTelemetryService.callFunctionAndStartSpan I didn't get - there is (always) an ActionListener associated with the function, and I think this falls into the 1st use case perfectly.

In many use cases, the Span closure is tightly coupled to the completion of async step, this brings to the third requirement from the framework i.e. an option to automatically close the span on response to the ActionListener.

I believe this is always the case: the span should be closed on response or failure. How is it different from the 1st option you have described?

rishabhmaurya commented 1 year ago

@reta thanks for the review.

I believe this is always the case: the span should be closed on response or failure, how it is different from the 1st option you have described?

You're right about the span closure. The first option is just for context propagation and isn't tied to starting a new span and closing it. So if there is an existing context and some asynchronous flow is starting, wrapping the ActionListener ensures that the context is reattached on callback. callFunctionAndStartSpan() internally uses OTelContextPreservingActionListener() to wrap the ActionListener. Additionally, it stashes the current context, starts a new span, calls the function, and on callback closes the newly started span and reattaches the stashed context.

public static <R> void callFunctionAndStartSpan(String spanName, BiFunction<Object[], ActionListener<?>, R> function,
                                                ActionListener<?> actionListener, Object... args) {
    // Stash the context that was current before the new span is attached.
    Context beforeAttach = Context.current();
    Span span = startSpan(spanName);
    try (Scope ignored = span.makeCurrent()) {
        // Wrap the listener so the callback can close the span and reattach the stashed context.
        actionListener = new OTelContextPreservingActionListener<>(actionListener, beforeAttach, span.getSpanContext().getSpanId());

        function.apply(args, actionListener);
    }
}

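On callback:

@Override
public void onResponse(Response r) {
    // Reattach the context of the span started for this call and close that span.
    try (Scope ignored = Objects.requireNonNull(afterAttachContext).makeCurrent()) {
        Span span = Span.current();
        closeCurrentScope(span);
    }
    // Invoke the delegate under the context that was current before the span was started.
    try (Scope ignored = Objects.requireNonNull(beforeAttachContext).makeCurrent()) {
        delegate.onResponse(r);
    }
}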

In the first case, the before-attach and after-attach contexts would be the same, as we are not starting a new span.

reta commented 1 year ago

Thanks @rishabhmaurya, I believe the span could be started by the method itself (recover in your example). There is no need to introduce the indirection, as it doesn't really bring a lot of value (in my opinion). Plus, there would be tons of function shapes, so you would probably end up wrapping Callables anyway.

rishabhmaurya commented 1 year ago

I was a bit hesitant as well and felt like it's more of a utility method than something that needs to be part of the framework. Given that 1 -

public OTelContextPreservingActionListener(ActionListener<Response> delegate, Context beforeAttachContext, String spanID)

can be used to achieve 3, I will remove it from the framework. I will keep it as a utility method (maybe in some util class) in case any consumer finds it useful. Instead, I will add another constructor for the case where the consumer is not starting a new span but just wants to use it for context propagation of existing spans -

public OTelContextPreservingActionListener(ActionListener<Response> delegate)

I was also wondering whether this could be expressed as an annotation when the last argument is an ActionListener, something like -

@StartSpan("spanName")
fun(arg1, arg2, actionListener) {}

However, I will park it for now to avoid code pollution, and will create a separate discussion on shorthand notations and annotations that could be used.

rishabhmaurya commented 1 year ago

I will wait for the basic OpenTelemetry framework changes being implemented in https://github.com/opensearch-project/OpenSearch/issues/7026 to be pushed into the feature branch before raising the PR here. cc @Gaganjuneja @backslasht

Gaganjuneja commented 1 year ago

@rishabhmaurya, the tracing framework would take care of context propagation automatically through ThreadContext. A tracing-aware implementation of ActionListener makes sense in some scenarios, such as when the same thread is submitting multiple async tasks. TracingAwareActionListener should be able to call the framework to start and end the span.

rishabhmaurya commented 1 year ago

@Gaganjuneja I somehow missed your comment earlier. Thanks for the review. Could you elaborate more on

tracing framework would take care of the context propagation automatically through ThreadContext?

Assuming someone started a span using the tracing framework, this would internally translate into the following -

  1. Write all context associated with the span to the ThreadLocal. This is something OpenTelemetry will do on its own as soon as a span is started.
  2. The tracing framework will convert the key-values set in the ThreadLocal in step 1 to transient headers compatible with ThreadContext using one of its APIs (putHeader() or putTransient())? https://github.com/opensearch-project/OpenSearch/blob/bd9b00d4809c495636d34ee922569f6f495200f1/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java#L407-L416
  3. Going forward, use the context set in ThreadContext to start the OpenTelemetry span whenever requested down the chain of execution.

OR

do you mean modifying the default behavior of Span by changing OpenTelemetry classes to use ThreadContext instead of interacting with the ThreadLocal directly? This means we would have to ensure that the conversion logic stays compatible with each new version of OpenTelemetry.

I'm not against using ThreadContext, but I think that by using it we are introducing a translation layer that converts context back and forth between the OpenTelemetry representation and ThreadContext. That layer can carry maintenance overhead across OTel versions and is an additional step that could be avoided. I may be completely wrong here, though, so it would be great if you could elaborate on the approach you have in mind to help us make a better call here.

Also, do you agree that we need something like 1), which you referred to as TracingAwareActionListener?

Gaganjuneja commented 1 year ago

Hi @rishabhmaurya, I have written a detailed prototype here #7026. The idea is to just keep the current span in the thread context. As for TracingAwareActionListener, yes, we would need this, but TracingAwareActionListener should take care of starting and closing the span.

rishabhmaurya commented 1 year ago

@Gaganjuneja Yes, I read it, but this wasn't clear there, which is why I asked the question above. It would be great if you could answer it and share more details on the implementation of addParentToThreadContext(threadPool.getThreadContext(), span);.

TracingAwareActionListener should take care of starting and closing the span.

this is discussed above and I think it shouldn't start and close the span automatically as a default behavior.

Gaganjuneja commented 1 year ago

@rishabhmaurya Sure. There is a slight change following the discussion on the same thread. Now we will be storing the current span in the ThreadContext. Span would be our own implementation, which contains the OtelSpan and the parent span. So here, ThreadContext would take care of propagating the current span to new threads and to calls to other data nodes. The framework would take care of updating the current span based on the start and end span calls.

TracingAwareActionListener should take care of starting and closing the span.

this is discussed above and I think it shouldn't start and close the span automatically as a default behavior.

Yes, my bad, the start should be in the method itself, but the end would be inside the listener's onResponse and onFailure methods.
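A minimal sketch of that split, assuming the method starts the span and a wrapping listener ends it on either callback. `DemoSpan`, `DemoListener`, `SpanEndingListener` and `RecoverySketch` are hypothetical names used only for illustration; they are not the prototype's TracingAwareActionListener or the framework's span type.

```java
// Hypothetical minimal span; stands in for whatever span type the tracing framework exposes.
final class DemoSpan {
    final String name;
    private boolean ended;
    private String error;

    DemoSpan(String name) {
        this.name = name;
    }

    void recordError(Exception e) {
        error = e.getMessage();
    }

    void end() {
        ended = true;
    }

    boolean isEnded() {
        return ended;
    }

    String error() {
        return error;
    }
}

// Hypothetical listener with the same two callbacks as ActionListener.
interface DemoListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

// Ends the span on either callback, then hands the result to the delegate.
final class SpanEndingListener<T> implements DemoListener<T> {
    private final DemoSpan span;
    private final DemoListener<T> delegate;

    SpanEndingListener(DemoSpan span, DemoListener<T> delegate) {
        this.span = span;
        this.delegate = delegate;
    }

    @Override
    public void onResponse(T response) {
        span.end();
        delegate.onResponse(response);
    }

    @Override
    public void onFailure(Exception e) {
        span.recordError(e);
        span.end();
        delegate.onFailure(e);
    }
}

final class RecoverySketch {
    // The method starts its own span; the wrapped listener is responsible for ending it.
    static DemoSpan recover(boolean fail, DemoListener<String> listener) {
        DemoSpan span = new DemoSpan("recover");
        DemoListener<String> wrapped = new SpanEndingListener<String>(span, listener);
        if (fail) {
            wrapped.onFailure(new IllegalStateException("boom"));
        } else {
            wrapped.onResponse("recovered");
        }
        return span; // returned only so the sketch can be inspected
    }

    // Convenience entry point with a no-op delegate.
    static DemoSpan run(boolean fail) {
        return recover(fail, new DemoListener<String>() {
            @Override
            public void onResponse(String response) {
            }

            @Override
            public void onFailure(Exception e) {
            }
        });
    }
}
```

Either callback leaves the span ended, so the caller of recover never has to close it explicitly.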

thoughts?