opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[Prototype] Distributed Tracing #7026

Open Gaganjuneja opened 1 year ago

Gaganjuneja commented 1 year ago

Distributed tracing is defined in RFC #6750. As a continuation of that RFC, I prototyped the actual interface and implementation to see how it works. This prototype focuses mostly on the aspects below; the rest should be built on top of it.

  1. Abstractions - The abstractions are clearly defined to hide the implementation framework (such as OTel), so that tracing users need not be aware of the implementation. We would expose a simple, well-defined interface to developers, who can add traces simply by calling start trace and end trace.
  2. Code Pollution - Code should not be polluted. If we write a lot of code for tracing, it becomes hard to find and focus on the actual logic. We will provide methods to start/end traces, and most of the plumbing will be taken care of by the framework.
  3. Automatic Context Propagation - A trace may be linked to a parent, and one parent trace can have multiple children. Most tasks/operations run in async mode in the OpenSearch cluster. These async tasks may run on the same thread, on different threads in the same thread pool, in a different thread pool, or even on different nodes. In these cases the parent context needs to be propagated to the child so that the child trace and the parent trace can be linked together and the telemetry information can be shown at the parent level, the child level, or both. A classic example is a search request.

Example - Let's assume there is an OpenSearch cluster with 3 data nodes. It contains one index with 3 shards distributed over the 3 nodes (node1, node2, node3). If we need to trace a search request at multiple code points, such as RestAction, QueryPhase, IndexSearcher and the Fetch phase, the physical view of the query would look something like this.

[Image: Screenshot 2023-04-05 at 12 12 16 PM]

Let's say we are tracing all these places. We then need to propagate the current parent to the subsequent tasks/operations automatically, because the code at those points might not be aware of the parent.

There are 2 ways this context propagation can be done.

  1. OpenSearch ThreadContext - OpenSearch's custom ThreadContext provides an out-of-the-box feature to propagate the context to forked threads on the same or a different thread pool, and even across network calls to other nodes. We can simply update the parent info in the thread context and it will be available to the children. ThreadContext will persist the context of the immediate parent only.

    1. Pros
      1. Out of the box support.
      2. Context propagation can totally be abstracted out in the framework itself.
    2. Cons
      1. Slight maintenance overhead in maintaining the span cleanup logic and updating the ThreadContext.
  2. OpenTelemetry - OpenTelemetry also supports context propagation, but it requires exposing OTel code to the user. More details on context propagation through OTel can be found in #6533.

    1. Pros
      1. No state needs to be maintained at Framework level so no maintenance overhead.
    2. Cons
      1. Code pollution
      2. Exposes the OTel/telemetry implementation to core OpenSearch code, so upgrading or changing the implementation would be difficult.
      3. Hard to add circuit breakers and level management, as levels are not part of OTel yet.
  3. Recommended Approach - Both approaches have their own pros and cons, but fundamentally approach 1 (OpenSearch ThreadContext) gives us more control and clearer abstractions. It adds some code maintenance overhead, but considering the pros it is the recommended approach.
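
To make the first approach concrete, here is a minimal, self-contained sketch of capture-and-restore propagation across a thread pool. `ParentSpanContext`, `preserve` and `ContextPropagationSketch` are hypothetical names, and the `ThreadLocal` is only a stand-in: this is not the OpenSearch ThreadContext API, which also covers propagation across nodes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a thread context that carries the current parent span id.
// It is NOT the OpenSearch ThreadContext API; it only illustrates capture-and-restore.
final class ParentSpanContext {
    private static final ThreadLocal<String> CURRENT_PARENT = new ThreadLocal<>();

    static void setParent(String spanId) {
        CURRENT_PARENT.set(spanId);
    }

    static String getParent() {
        return CURRENT_PARENT.get();
    }

    static void clear() {
        CURRENT_PARENT.remove();
    }

    /** Captures the submitting thread's parent and restores it around the task on the worker thread. */
    static <T> Callable<T> preserve(Callable<T> task) {
        final String captured = CURRENT_PARENT.get();
        return () -> {
            final String previous = CURRENT_PARENT.get();
            CURRENT_PARENT.set(captured);
            try {
                return task.call();
            } finally {
                // Put back what the pooled thread had before, so context does not leak between tasks.
                if (previous == null) {
                    CURRENT_PARENT.remove();
                } else {
                    CURRENT_PARENT.set(previous);
                }
            }
        };
    }
}

final class ContextPropagationSketch {
    /** Returns the parent span id observed by a task that runs on a different thread. */
    static String parentSeenByChild(String parentSpanId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ParentSpanContext.setParent(parentSpanId);
            Callable<String> child = ParentSpanContext.preserve(ParentSpanContext::getParent);
            return pool.submit(child).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ParentSpanContext.clear();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parentSeenByChild("rest-action-span"));
    }
}
```

The point of the sketch is that the caller never touches the propagation code: the wrapping happens where work is handed to another thread, which is what lets the framework hide it.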

Sample Tracing Interface (not production-ready code, for illustration only)

Levels - Level is an OpenSearch-specific concept that OpenTelemetry doesn't support yet (see the OpenTelemetry GitHub issue for levels). It works similarly to logging levels: developers can add detailed spans with appropriate tracing levels and, while debugging an issue, dynamically enable the more granular levels for some time. We have to make one strong assumption: the level of a child span can't be higher than the level of its parent. Otherwise the parent span could be filtered out based on its level while the child still exists, which breaks the parent-child link and leaves the child orphaned.
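
The filtering rule can be sketched as follows. The sketch follows the Tracer Javadoc in this proposal (only spans at or above the configured level are published, and a child's level can't be higher than its parent's). `TraceLevel`, `isEnabled` and `clampToParent` are hypothetical names, and clamping the child down to the parent's level is just one possible policy; the prototype does not say whether it clamps or rejects.

```java
// Hypothetical sketch: mirrors the Level enum from this proposal and adds two
// illustrative helpers (isEnabled, clampToParent) that are not part of the prototype.
enum TraceLevel {
    UNKNOWN(0), LOWEST(1), LOW(2), MID(3), HIGH(4);

    private final int order;

    TraceLevel(int order) {
        this.order = order;
    }

    /** A span is published only if its level is at or above the configured level. */
    boolean isEnabled(TraceLevel configured) {
        return this.order >= configured.order;
    }

    /**
     * Lowers the requested level to the parent's level when it is higher, so that a
     * child is always filtered out whenever its parent is filtered out.
     */
    static TraceLevel clampToParent(TraceLevel requested, TraceLevel parent) {
        if (parent == null) {
            return requested; // root span: nothing to clamp against
        }
        return requested.order > parent.order ? parent : requested;
    }
}

final class LevelFilterSketch {
    public static void main(String[] args) {
        TraceLevel configured = TraceLevel.MID;
        TraceLevel parent = TraceLevel.LOW;
        TraceLevel child = TraceLevel.clampToParent(TraceLevel.HIGH, parent);
        System.out.println("parent published: " + parent.isEnabled(configured));
        System.out.println("child level: " + child + ", published: " + child.isEnabled(configured));
    }
}
```

Without the clamp, a HIGH child under a LOW parent would be published at a configured level of MID while its parent is dropped, which is exactly the orphaned-child case.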

public enum Level{
        UNKNOWN(0), LOWEST(1), LOW(2), MID(3), HIGH(4);
        private int order;
        private Level(int order){
            this.order = order;
        }
    }
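public class SpanName{
        final String name;
        final String uniqueId;
        //constructor used by the samples below: new SpanName(name, uniqueId)
        public SpanName(String name, String uniqueId){
            this.name = name;
            this.uniqueId = uniqueId;
        }
    }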
public interface Tracer {

    /**
     * Start the trace with passed attributes. It takes care of automatically propagating the context/parent to the
     * child spans wherever the context switch is happening like from one thread to another in the same
     * ExecutorService, across thread pools and even across nodes.
     * 
     * SpanName should be unique for each span. SpanName consists of two parameters, uniqueId and name, so the caller
     * needs to provide a unique id for each span; otherwise span creation will fail.
     * @param spanName
     * @param attributes
     * @param level
     */
    default public void startTrace(SpanName spanName, Map<String,Object> attributes, Level  level){
        startTrace(spanName, attributes, null, level);
    }

    /**
     * Start the trace with passed attributes. It takes care of propagating the context/parent automatically to the
     * child spans wherever the context switch is happening like from one thread to another in the same
     * ExecutorService, across thread pool and even across nodes.
     *
     * Caller can also explicitly set the Parent span. It may be needed in case one more level of nesting is
     * required and the cases where multiple async child tasks are being submitted from the same thread and the user
     * wants to set this child as a parent of the following runnable(s). The parentSpanName provided should be an active
     * span.
     *
     * SpanName should be unique for each span. SpanName consists of two parameters, uniqueId and name, so the caller
     * needs to provide a unique id for each span; otherwise span creation will fail.
     *
     * Example - if three spans (A, B, C) are started in the same thread (before calling endTrace), then A becomes
     * the parent of B and B becomes the parent of C. If the user wants A to be the parent of both B and C, they
     * have to say so explicitly through the parentSpanName.
     *
     * Callers need to define the level of the span. Levels are ordered and specified by an order value. Only spans
     * whose level is higher than or equal to the configured level will be published. The level of a child span can't
     * be higher than that of its parent span; otherwise the parent span could be filtered out based on its level
     * while the child still exists, which would break the parent-child link and orphan the child.
     * @param spanName
     * @param attributes
     * @param parentSpanName
     * @param level
     */
    public void startTrace(SpanName spanName, Map<String,Object> attributes, SpanName parentSpanName, Level level);

    /**
     * Ends the scope of the trace. It is mandatory to end each span explicitly. If endTrace is
     * called multiple times for the same span, only the first call takes effect and the rest
     * are ignored.
     * @param spanName
     */
    public void endTrace(SpanName spanName);

    /**
     * Adds attribute to the span.
     * @param spanName
     * @param key
     * @param value
     */
    public void addAttribute(SpanName spanName, String key, Object value);

    /**
     * Adds an event to the span.
     * @param spanName
     * @param event
     */
    public void addEvent(SpanName spanName, String event);

}

Sample Implementation

Important Points of implementation.

  1. ThreadPool - We need to register the thread pool with the tracer during initialisation from Node.java. Alternatively, users could pass the ThreadContext to the startTrace method, but that would require the ThreadContext to be available in a lot of low-level classes, which may pollute the code overall. So the preference is to inject the ThreadPool while initialising the TracerFactory class.
  2. SpanMap - This implementation keeps all the OTel spans in a local map and doesn't expose them to the caller. This approach has the following pros and cons.
    1. Pros
      1. Abstraction - We keep the spans locally and do not expose them to users, so that core OpenSearch code and plugin code have no direct dependency on OTel. This keeps the framework vendor agnostic and makes it easy to replace OTel with another telemetry solution if required.
      2. Span management - We can automatically close spans that have been open for a long time, to avoid memory leaks. This can happen because of a missing end span call or because of errors. We will allow the TTL to be configured per span in case a TTL larger than the default value (yet to be figured out) is required.
      3. Guard rails - With this approach we can easily implement circuit breakers on the number of spans open at any point in time. We will lose spans while the circuit breaker is triggered, so we will have to define strategies based on span level, component priority, etc. (yet to be worked out, but we do have the option to circuit break).
    2. Cons
      1. Management Overhead - We need to define a cache (similar to an LRU cache) to store the spans, so it will incur some management overhead at the framework level.
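
As a rough illustration of the span management and guard rail points above, the sketch below caps the number of open spans and evicts spans that outlive a TTL. `SpanRegistry` and all of its members are hypothetical and not part of the prototype; the span type is generic so the example runs without OTel on the classpath, and a single TTL is used for all spans rather than a per-span one.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the prototype: a span map with a cap on open spans
// (a simple circuit breaker) and TTL-based cleanup of spans that were never ended.
final class SpanRegistry<S> {
    private static final class OpenSpan<T> {
        final T span;
        final long startedAtNanos;

        OpenSpan(T span, long startedAtNanos) {
            this.span = span;
            this.startedAtNanos = startedAtNanos;
        }
    }

    private final Map<String, OpenSpan<S>> open = new ConcurrentHashMap<>();
    private final int maxOpenSpans;
    private final long ttlNanos;
    private final LongSupplier clock; // injectable so tests can control time

    SpanRegistry(int maxOpenSpans, long ttlNanos, LongSupplier clock) {
        this.maxOpenSpans = maxOpenSpans;
        this.ttlNanos = ttlNanos;
        this.clock = clock;
    }

    /** Returns false (the span is dropped) when the cap is reached or the key is already in use. */
    boolean register(String key, S span) {
        if (open.size() >= maxOpenSpans) {
            return false; // approximate cap: size() and putIfAbsent are not one atomic step
        }
        return open.putIfAbsent(key, new OpenSpan<>(span, clock.getAsLong())) == null;
    }

    /** Removes and returns the span, or null if it was already ended or evicted. */
    S remove(String key) {
        OpenSpan<S> entry = open.remove(key);
        return entry == null ? null : entry.span;
    }

    /** Closes and removes spans older than the TTL; returns how many were evicted. */
    int evictExpired(Consumer<S> closer) {
        long now = clock.getAsLong();
        int evicted = 0;
        Iterator<Map.Entry<String, OpenSpan<S>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            OpenSpan<S> candidate = it.next().getValue();
            if (now - candidate.startedAtNanos > ttlNanos) {
                it.remove();
                closer.accept(candidate.span);
                evicted++;
            }
        }
        return evicted;
    }

    int openCount() {
        return open.size();
    }
}
```

In a real tracer the `closer` would end the underlying span, and `evictExpired` would presumably run on a scheduled thread; which spans to drop first when the cap is hit (by level, component priority, and so on) is left open here, as it is in the proposal.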
public class DefaultTracer implements Tracer {

    private final io.opentelemetry.api.trace.Tracer openTelemetryTracer;
    private final ThreadPool threadPool;
    //open spans keyed by SpanName (uniqueId + name)
    private final Map<String, Span> spanMap = new ConcurrentHashMap<>();
    //open spans keyed by OTel span id, used in endTrace to look up the parent
    private final Map<String, Span> spanMapById = new ConcurrentHashMap<>();

    public DefaultTracer(io.opentelemetry.api.trace.Tracer openTelemetryTracer, ThreadPool threadPool) {
        this.openTelemetryTracer = openTelemetryTracer;
        this.threadPool = threadPool;
    }

    @Override
    public void startTrace(SpanName spanName, Map<String, Object> attributes, SpanName parentSpanName, Level level) {
        Level calculatedLevel = getLevel(level);
        if(!isLevelEnabled(calculatedLevel)){
            return;
        }
        Span parentSpan = null;
        if(parentSpanName != null){
            //parent span shouldn't be ended
            parentSpan = spanMap.get(getSpanMapKey(parentSpanName.uniqueId, parentSpanName.name));
        }
        if(parentSpan == null){
            //no explicit (or no active) parent, so fall back to the parent propagated through the ThreadContext
            parentSpan = getParentFromThreadContext(threadPool.getThreadContext());
        }
        SpanBuilder spanBuilder = openTelemetryTracer.spanBuilder(spanName.name);
        if(parentSpan != null){
            //Context.with(null) would throw, so only set the parent when one exists
            spanBuilder.setParent(Context.current().with(parentSpan));
        }
        Span span = spanBuilder.startSpan();
        setSpanLevel(span, parentSpan, calculatedLevel);
        addParentToThreadContext(threadPool.getThreadContext(), span);
        spanMap.put(getSpanMapKey(spanName.uniqueId, spanName.name), span);
        spanMapById.put(span.getSpanContext().getSpanId(), span);
        setSpanAttributes(span, parentSpan);
    }

    @Override
    public void endTrace(SpanName spanName) {
        //remove() is atomic, so only the first endTrace call for a span proceeds
        Span span = spanMap.remove(getSpanMapKey(spanName.uniqueId, spanName.name));
        if (span == null) {
            return;
        }
        String parentSpanId = getParentSpan(span);
        Span parentSpan = parentSpanId == null ? null : spanMapById.get(parentSpanId);
        span.end();
        spanMapById.remove(span.getSpanContext().getSpanId());
        if (parentSpan != null) {
            addParentToThreadContext(threadPool.getThreadContext(), parentSpan);
        }
    }

    @Override
    public void addAttribute(SpanName spanName, String key, Object value) {
        /**
         * Adds attribute to the existing open span.
         */
    }

    @Override
    public void addEvent(SpanName spanName, String event) {
        /**
         * Adds event to the existing open span.
         */
    }

    //helper methods (getLevel, isLevelEnabled, getSpanMapKey, getParentFromThreadContext, ...) are omitted in this prototype
}

TracerFactory - TracerFactory helps initialise the tracer and returns the instance, so that we need not pass the tracer instance everywhere in the code. Users can simply get the instance from TracerFactory and start instrumenting.

public class TracerFactory {

    private static Tracer tracer;

    //Need to make singleton
    public static synchronized void initializeTracer(ThreadPool threadPool){
        OpenTelemetry openTelemetry = OTelMain.openTelemetry;
        io.opentelemetry.api.trace.Tracer openTelemetryTracer = openTelemetry.getTracer("instrumentation-library-name", "1.0.0");
        if(tracer == null) {
            tracer = new DefaultTracer(openTelemetryTracer, threadPool);
        }else{
            throw new IllegalStateException("Double-initialization not allowed!");
        }
    }

    public static Tracer getInstance(){
        return tracer;
    }
}

Sample tracing in the search flow

Snippets of instrumentation for the above example.

//TaskManager.java 
//Though the right place to add this trace is Inside TransportAction 
//but just added for testing purposes.

public Task register(String type, String action, TaskAwareRequest request) {
  .
  .
  TracerFactory.getInstance().startTrace(new SpanName( "Task", String.valueOf(task.getId())), 
            null, new SpanName( "Task", String.valueOf(task.getParentTaskId().getId())), Level.HIGH);
  .
  .
  .

  }
public Task unregister(Task task) {
   .
   .
    TracerFactory.getInstance().endTrace(new SpanName( "Task", String.valueOf(task.getId())));
   .
   .
}
//Inside SearchOperationsListener
    @Override
    public void onPreQueryPhase(SearchContext searchContext) {
        SearchOperationListener.super.onPreQueryPhase(searchContext);
        TracerFactory.getInstance().startTrace(new SpanName("onQueryPhase", 
                                                String.valueOf(searchContext.getTask().getId())), null, Level.HIGH);

    }

    @Override
    public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
        SearchOperationListener.super.onQueryPhase(searchContext, tookInNanos);
        TracerFactory.getInstance().endTrace(new SpanName("onQueryPhase", 
                                              String.valueOf(searchContext.getTask().getId())));

    }

     @Override
    public void onPreFetchPhase(SearchContext searchContext) {
        SearchOperationListener.super.onPreFetchPhase(searchContext);
        TracerFactory.getInstance().startTrace(new SpanName("onFetchPhase", 
                                             String.valueOf(searchContext.getTask().getId())), null, Level.HIGH);
    }

    @Override
    public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
        SearchOperationListener.super.onFetchPhase(searchContext, tookInNanos);
        TracerFactory.getInstance().endTrace(new SpanName("onFetchPhase", 
                                          String.valueOf(searchContext.getTask().getId())));

    }
//Inside ContextIndexSearcher.java
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
        // UUID.randomUUID() returns a UUID, so convert it to a String for the span's unique id
        String randomString = UUID.randomUUID().toString();
        TracerFactory.getInstance().startTrace(new SpanName("IndexSearcher", randomString), null, Level.MID);
        try {
            for (LeafReaderContext ctx : leaves) {
                searchLeaf(ctx, weight, collector);
            }
        } finally {
            // end the span even if searchLeaf throws, since every span must be ended explicitly
            TracerFactory.getInstance().endTrace(new SpanName("IndexSearcher", randomString));
        }
    }

Sample tracing output

[Image: Screenshot 2023-04-06 at 2 58 25 PM]

reta commented 1 year ago

Thanks @Gaganjuneja, the general approach sounds reasonable. A few questions that are not clear to me:

On a more general note, we have a few core mechanisms to distribute the work: actions/listeners, the task manager and thread pools (at least). Those should be the ones taking care of establishing the initial boundaries (spans, for example), which could be augmented later on. Using TracerFactory.getInstance() pollutes the code (and is actually difficult to test); I think we should rather use the context (like SearchContext or the task context) to fence access to the tracers.

Gaganjuneja commented 1 year ago

Thanks @reta for reviewing the approach and providing the valuable feedback.

  • Why do we need SpanName? In OTel, every trace and span have an ID already, this is by design. I would expect span name to be a simpler string.

You are right, the span ID is already there, but the point is that we don't want to expose the OTel spans outside the framework, so we keep them in a map. To create a unique key for this map we need some unique identifier. The name should be the same for all the traces representing the same logical unit of work. Essentially, SpanName exists only to build the SpanMap key.

  • Why do we need SpanMap? The natural relations between traces and spans is parent ID, the tracer would need to keep the spans (before flushing them out) in any case. I don't think we need to maintain this additional map

Tracers only keep spans once they are closed, for flushing, for a duration that depends on the processor type (for example BatchSpanProcessor). They don't keep the full span hierarchy; since the trace ID is the same, the hierarchy can be reconstructed at the data store. We need to persist the span reference somewhere: we could return the span to callers, keep it in a SpanMap, or keep it in the ThreadContext. There are a couple of reasons why I chose SpanMap over the others, but yes, I agree there is an overhead in maintaining it.

  1. I think starting a trace should be as simple as adding a log to the logger and didn't want the users to work with spans directly. Due to the async nature of our workloads/tasks, it's possible that users create a span in one method and end it somewhere else. So the span can be passed in 3 ways:
    1. SpanMap - We keep the spans in the map. SpanName can be deterministically determined from the context available in that method.
    2. ThreadContext - We can also keep the current span in the ThreadContext, and users can just ask to close the current span.
    3. Span Object - The user is given a handle to the Span, and it is then their responsibility to pass the Span to wherever it needs to be closed. This will pollute the code as well.
  2. It gives us more control to put checks in place, for example that parents shouldn't be closed before their children.
  3. We are doing custom context propagation, so at the span end call we need to reset the parent in the thread context. I just wanted to keep that abstracted from users.
  4. SpanMap gives us some more benefits, like defining guard rails and preventing memory leaks, described in detail above under the SpanMap section.

We can discuss the benefits vs. the overhead and take a call. Another possible solution could be to expose custom wrapped spans that contain the parent span and some additional details.

On a more general note, we have a few core mechanisms to distribute the work: actions/listeners, the task manager and thread pools (at least). Those should be the ones taking care of establishing the initial boundaries (spans, for example), which could be augmented later on. Using TracerFactory.getInstance() pollutes the code (and is actually difficult to test); I think we should rather use the context (like SearchContext or the task context) to fence access to the tracers.

I have also thought about this and tried even passing the tracer object to the classes through search context, even if you look at the sample implementation I started from TaskManager itself and SearchOperationListener. But while doing this I realized that some low-level classes on the search path, like ContextIndexSearcher.java, don't even have access to the SearchContext, and if we go one level further down to Lucene, we will end up unnecessarily passing the tracer object everywhere.

Somehow, when I think about adding traces, the closest thing that comes to mind is logging. Yes, testing is difficult for static constructs, but should we really care about testing span start/end (again, similar to logs)? I am just writing my thoughts here; we can discuss and decide, but yes, both are doable.

reta commented 1 year ago

Thanks @Gaganjuneja

I believe we should start with as simple an API as possible: provide a Tracer API to fence the OTel / other impl, wrap OTel spans (we actually never need to refer to IDs directly), no need for additional maps etc. Those won't bring much value (at least initially) but incur overhead for questionable benefits (my opinion).

I think starting a trace should be as simple as adding a log to the logger and didn't want the users to work with spans directly.

That's what the Tracer API should abstract away, but the difference is that Tracer has state (current trace / span), whereas loggers are usually stateless and could be injected as static instances. Traces and spans do not need to be exposed as is, but Tracer should at least provide the ways to start new ones.

I have also thought about this and tried even passing the tracer object to the classes through search context, even if you look at the sample implementation I started from TaskManager itself and SearchOperationListener.

In this case we probably should provide Tracer as injectable service (like all other services).

Somehow, when I think about adding traces, the closest thing that comes to mind is logging. Yes, testing is difficult for static constructs, but should we really care about testing span start/end (again, similar to logs)? I am just writing my thoughts here; we can discuss and decide, but yes, both are doable.

There are a number of production ready libraries to learn from, they cover very complex async / reactive / ... paradigms so we could learn from them:

Gaganjuneja commented 1 year ago

Thanks @reta for your follow up.

I believe we should start with as simple an API as possible: provide a Tracer API to fence the OTel / other impl, wrap OTel spans (we actually never need to refer to IDs directly), no need for additional maps etc. Those won't bring much value (at least initially) but incur overhead for questionable benefits (my opinion).

I agree. To start with, we can avoid keeping the map, and the tracer should return the wrapped span, which the caller can use to end the span. The caller will hold the span reference, so we need not maintain it in a map. The tracer's startTrace and endTrace methods would then look something like this.

public OSSpan startTrace(String name, Map<String,Object> attributes, Level  level);

public void endTrace(OSSpan span);
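
A minimal sketch of how such a wrapped span could work without a map is shown below. Only the `OSSpan` name and the start/end method names come from the signatures above; the fields, `markEnded`, `WrappedSpanTracer` and `currentSpan` are assumptions, the `ThreadLocal` merely stands in for the parent kept in the ThreadContext, and the `Level` parameter is omitted for brevity.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical shape of the wrapped span; field and method names are illustrative only.
final class OSSpan {
    final String name;
    final Map<String, Object> attributes;
    final OSSpan parent; // null for a root span
    private final AtomicBoolean ended = new AtomicBoolean(false);

    OSSpan(String name, Map<String, Object> attributes, OSSpan parent) {
        this.name = name;
        this.attributes = attributes;
        this.parent = parent;
    }

    /** Returns true only on the first call, so repeated endTrace calls can be ignored. */
    boolean markEnded() {
        return ended.compareAndSet(false, true);
    }
}

final class WrappedSpanTracer {
    // Stand-in for the current parent that the prototype keeps in the OpenSearch ThreadContext.
    private final ThreadLocal<OSSpan> current = new ThreadLocal<>();

    OSSpan startTrace(String name, Map<String, Object> attributes) {
        OSSpan span = new OSSpan(name, attributes, current.get());
        current.set(span); // spans started after this one become its children
        return span;
    }

    void endTrace(OSSpan span) {
        if (span.markEnded() && current.get() == span) {
            current.set(span.parent); // hand the "current span" role back to the parent
        }
    }

    OSSpan currentSpan() {
        return current.get();
    }
}
```

Because the wrapper carries its own parent reference, ending a span can restore the parent without any lookup, which is the piece of bookkeeping the SpanMap was doing.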

That's what the Tracer API should abstract away, but the difference is that Tracer has state (current trace / span), whereas loggers are usually stateless and could be injected as static instances. Traces and spans do not need to be exposed as is, but Tracer should at least provide the ways to start new ones.

Agree, tracer as mentioned above will provide the methods to start and end spans/traces.

In this case we probably should provide Tracer as injectable service (like all other services).

A Tracer instance will be initialized at startup, and we need to make this instance available to the classes where traces need to be added. There are a few options.

  1. Inject the Tracer instance into all classes where traces need to be added. Not all classes use injection, so the tracer instance needs to be passed through constructors/requests down to low-level classes, which will pollute the code.
    1. The benefit of this approach is that it provides direct access to the tracer instance, so it can easily be mocked for testing.
    2. But we may not want callers to mock the tracer object, as we need one tracer object across all unit test cases to add strict span checking, to make sure that all spans are ended and there is no memory leak.
  2. Through a static holder. We can expose the tracer instance through a static holder, "TracerFactory.getInstance()". The downside is that callers will not be able to mock it, but as mentioned above we don't want them to do that. Also, TracerFactory.getInstance() being written everywhere will pollute the code a bit.
  3. We can also provide direct access to the tracer methods from TracerFactory through delegation, so that callers call TracerFactory.startTrace(..), which reduces the code pollution caused by the getInstance() method.

your thoughts?

reta commented 1 year ago

Thanks @Gaganjuneja , sounds like a good start

backslasht commented 1 year ago

Thanks @Gaganjuneja for detailing the approach. Few questions/suggestions:

  1. How would the span information be passed across different methods? I can think of two options: i) pass the Span object as an argument to the method where it is required, or ii) set the Span object in the ThreadContext so that it can be fetched from the context in whichever method requires it. Alternatively, we could support both and let developers decide between the options.
  2. On the approach to make the tracer instance available for usage, I like the factory approach, wherein the factory instance is injected into the classes that require it and a Span is created using the startSpan method. This would help in passing a NoTracerFactory or NullTracerFactory in test cases. We could also support creating child spans from parent spans, which will help avoid passing the TracerFactory to all the classes.

Thoughts?
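
For illustration, the injected approach with a no-op implementation for tests could look roughly like this. Every name here (`SpanHandle`, `TracerService`, `NoopTracerService`, `RecordingTracerService`, `QueryPhaseRunner`) is hypothetical; only the `startSpan` method name is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// All names below are hypothetical; they only illustrate constructor injection of a tracer.
interface SpanHandle {
    void end();
}

interface TracerService {
    SpanHandle startSpan(String name);
}

/** Does nothing; useful in unit tests that do not care about tracing. */
final class NoopTracerService implements TracerService {
    static final NoopTracerService INSTANCE = new NoopTracerService();
    private static final SpanHandle NOOP_SPAN = () -> { };

    @Override
    public SpanHandle startSpan(String name) {
        return NOOP_SPAN;
    }
}

/** Records start/end events so a test can assert that every span was ended (not thread-safe). */
final class RecordingTracerService implements TracerService {
    final List<String> events = new ArrayList<>();

    @Override
    public SpanHandle startSpan(String name) {
        events.add("start:" + name);
        return () -> events.add("end:" + name);
    }
}

/** Example component that receives its tracer through the constructor instead of a static holder. */
final class QueryPhaseRunner {
    private final TracerService tracer;

    QueryPhaseRunner(TracerService tracer) {
        this.tracer = tracer;
    }

    <T> T run(Supplier<T> query) {
        SpanHandle span = tracer.startSpan("queryPhase");
        try {
            return query.get();
        } finally {
            span.end(); // always end the span, even when the query throws
        }
    }
}
```

A test can then construct `new QueryPhaseRunner(NoopTracerService.INSTANCE)` when tracing is irrelevant, or pass a recording implementation to check that spans are always ended, without any static state shared between tests.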

Gaganjuneja commented 1 year ago

Agreed @backslasht.

rishabhmaurya commented 1 year ago

I have added some framework requirements for Span context propagation which needs to be incorporated in ~StartSpan~ StartTrace API. https://github.com/opensearch-project/OpenSearch/issues/7351 It references code changes and documentation for the same.

suranjay commented 1 year ago

@reta I am teaming up with @Gaganjuneja on this change and have raised a draft PR. Please review when you get a chance.