camunda / camunda

Process Orchestration Framework
https://camunda.com/platform/
3.35k stars 612 forks source link

Distributed Tracing #11417

Closed npepinpe closed 1 year ago

npepinpe commented 1 year ago

Description

This issue is mostly to document the outcome of our hack week where we set up distributed tracing in order to trace commands in the system.

This is not an issue which will describe what we should do, but rather what we did, open questions, etc. We should refer to it again when we're approaching this as a feature.

npepinpe commented 1 year ago

We decided to trace all commands in the system. We tried two different approaches:

  1. Hierarchical: a user sends a command which starts a trace, it continues in the gateway, then in the broker, etc. Every follow up command is a child of the previous command, resulting in a cascade that is interrupted only when we reach a wait state. In this way, almost all commands are triggered by an external client.
  2. Isolated: a user sends a command, which starts a trace, continues in the gateway, and finishes as soon as it's processed and the response is sent back. Follow up commands are traced individually, separately: the trace starts when the command is created (but not written), and finishes when the command is processed. Span links are used to denote causality between commands.

Our conclusion is the second approach is best for us, as we're mostly interested in tracing commands. While it would be great to trace a complete process instance lifecycle, neither approach allows this easily (not without keep span contexts in the state). The flattened approach should anyway provide us with all the relevant information we need.

npepinpe commented 1 year ago

In order to propagate context across all boundaries (both threads and processes), our approach was to use the built-in TextMap propagator for the gRPC part, and write our own custom serialization code in SBE for internal usage.

The SBE stuff was written in its own schema, though we did not do baggage propagation yet.

otel.xml ```xml ```
SbeSpanContextReader.java ```java /* * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under * one or more contributor license agreements. See the NOTICE file distributed * with this work for additional information regarding copyright ownership. * Licensed under the Zeebe Community License 1.1. You may not use this file * except in compliance with the Zeebe Community License 1.1. */ package io.camunda.zeebe.protocol.impl.otel; import io.camunda.zeebe.protocol.otel.MessageHeaderDecoder; import io.camunda.zeebe.protocol.otel.SpanContextDecoder; import io.camunda.zeebe.util.buffer.BufferReader; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanId; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; import io.opentelemetry.api.trace.TraceState; import org.agrona.DirectBuffer; public final class SpanContextReader implements BufferReader { private final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder(); private final SpanContextDecoder bodyDecoder = new SpanContextDecoder(); private SpanContext context; @Override public void wrap(final DirectBuffer buffer, final int offset, final int length) { bodyDecoder.wrapAndApplyHeader(buffer, offset, headerDecoder); final var traceIdBytes = new byte[SpanContextDecoder.traceIdLength()]; bodyDecoder.getTraceId(traceIdBytes, 0); final var traceId = TraceId.fromBytes(traceIdBytes); final var spanId = SpanId.fromLong(bodyDecoder.spanId()); final var traceFlags = TraceFlags.fromByte(bodyDecoder.traceFlags()); final var traceState = decodeTraceState(); // TODO: we assume that deserialization means we're tracing a remote trace, which is not // necessarily true, but for now it's simpler to assume this context = SpanContext.createFromRemoteParent(traceId, spanId, traceFlags, traceState); } public SpanContext context() { return context; } private TraceState decodeTraceState() { final var builder = TraceState.builder(); var stateDecoder = bodyDecoder.traceState(); while (stateDecoder.hasNext()) { builder.put(stateDecoder.key(), stateDecoder.value()); stateDecoder = stateDecoder.next(); } return builder.build(); } } ```
SbeSpanContextWriter.java ```java /* * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under * one or more contributor license agreements. See the NOTICE file distributed * with this work for additional information regarding copyright ownership. * Licensed under the Zeebe Community License 1.1. You may not use this file * except in compliance with the Zeebe Community License 1.1. */ package io.camunda.zeebe.protocol.impl.otel; import io.camunda.zeebe.protocol.otel.MessageHeaderEncoder; import io.camunda.zeebe.protocol.otel.SpanContextEncoder; import io.camunda.zeebe.protocol.otel.SpanContextEncoder.TraceStateEncoder; import io.camunda.zeebe.util.buffer.BufferWriter; import io.opentelemetry.api.internal.OtelEncodingUtils; import io.opentelemetry.api.trace.SpanContext; import java.nio.charset.StandardCharsets; import org.agrona.MutableDirectBuffer; import org.agrona.collections.MutableInteger; public final class SpanContextWriter implements BufferWriter { private final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder(); private final SpanContextEncoder bodyEncoder = new SpanContextEncoder(); private SpanContext context; private int length = headerEncoder.encodedLength() + bodyEncoder.sbeBlockLength(); public SpanContextWriter context(final SpanContext context) { this.context = context; length = headerEncoder.encodedLength() + bodyEncoder.sbeBlockLength() + TraceStateEncoder.sbeHeaderSize() + computeTraceStateLength(); return this; } public SpanContext context() { return context; } @Override public int getLength() { return length; } @Override public void write(final MutableDirectBuffer buffer, final int offset) { bodyEncoder .wrapAndApplyHeader(buffer, offset, headerEncoder) .putTraceId(context.getTraceIdBytes(), 0) .spanId(OtelEncodingUtils.longFromBase16String(context.getSpanId(), 0)) .traceFlags(context.getTraceFlags().asByte()); final var traceState = context.getTraceState(); final var stateEncoder = bodyEncoder.traceStateCount(traceState.size()); traceState.forEach((key, value) -> stateEncoder.key(key).value(value).next()); } private int computeTraceStateLength() { if (context == null || context.getTraceState() == null || context.getTraceState().isEmpty()) { return 0; } final var state = context.getTraceState(); final var length = new MutableInteger(0); state.forEach((key, value) -> computeTraceStateEntryLength(length, key, value)); return length.get(); } private void computeTraceStateEntryLength( final MutableInteger length, final String key, final String value) { final int keyLength = key.getBytes(StandardCharsets.US_ASCII).length; final var valueLength = value.getBytes(StandardCharsets.US_ASCII).length; length.addAndGet( TraceStateEncoder.sbeBlockLength() + TraceStateEncoder.keyHeaderLength() + keyLength + TraceStateEncoder.valueHeaderLength() + valueLength); } } ```

The span context would then be serialized from the gateway to the broker as part of the ExecuteCommandRequest (this could also be applied to other such commands by writing the span context as a nested varDataEncoding property). For cross thread boundaries, we would serialize it then on the command in its RecordMetadata, again as a nested field.

Note One caveat, in many places we reuse RecordMetadata instances, so be careful when passing the span context directly from it and make sure it's immutable!

npepinpe commented 1 year ago

Adding the span context to the record metadata allowed us to quickly deal with batching/aggregation of commands at certain points. For example, in the Raft thread, when appending or replicating an entry, you could quickly check if the data writer (e.g. BufferWriter) of the application entry is an instance of SbeContextProvider, then iterate of these and create a fresh span for each. Something like:

    final List<Span> spans = new ArrayList<>();
    if (data instanceof SpanContextProvider provider) {
      provider
          .spanContexts()
          .forEach(
              sc -> {
                final var spanBuilder =
                    tracer.spanBuilder("appendEntry").setSpanKind(SpanKind.SERVER);
                if (sc.isValid()) {
                  final var parentContext = Context.current().with(Span.wrap(sc));
                  spanBuilder.setParent(parentContext);
                  spanBuilder.setAttribute("partitionId", String.valueOf(raft.getPartitionId()));

                  spans.add(spanBuilder.startSpan());
                }
              });
    }

    raft.getThreadContext()
        .execute(
            () -> {
              try {
                safeAppendEntry(
                    new UnserializedApplicationEntry(lowestPosition, highestPosition, data),
                    appendListener,
                    spans);
              } finally {
                CloseHelper.quietCloseAll(spans.stream().map(s -> (AutoCloseable) s::end).toList());
              }
            });

This is quite ugly, and I hope we can find a more convenient way of doing it, but I think the concept will be similar (i.e. iterate over a bunch of aggregated span contexts, and create a span for each, then close all at the end).

npepinpe commented 1 year ago

Open questions would be:

  1. Find proper root trace points (e.g. timer trigger commands, IPC stuff, etc.) that aren't user or follow up commands
  2. Should we propagate baggage?
  3. Find interesting span points
  4. Find convenient API to deal with batching/aggregation
npepinpe commented 1 year ago

Closing, we can open it again when we're working on this. @megglos feel free to add anything I forgot.