openzipkin / brave

Java distributed tracing implementation compatible with Zipkin backend services.
Apache License 2.0

Kotlin coroutine context propagation #820

Open codefromthecrypt opened 6 years ago

codefromthecrypt commented 6 years ago

At LINE, Xu JunJian noted that it may be tricky to propagate async contexts through coroutines. We should investigate this.

https://kotlinlang.org/docs/reference/coroutines-overview.html

shiraji commented 6 years ago

We have this problem. Is there any workaround for this?

codefromthecrypt commented 6 years ago

@shiraji this problem was described in the abstract. Can you share example code that you are using so that we can get closer to a fix? cc @zeagord

shiraji commented 6 years ago

We are using this library through org.springframework.cloud:spring-cloud-gcp-starter-trace. Let me create a sample project that uses only Brave.

codefromthecrypt commented 6 years ago

cool. you can do a variant of this if you want https://github.com/openzipkin/brave-webmvc-example

shiraji commented 6 years ago

I tried, but I kept getting this error: The parameters 'mainClass' for goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java are missing or invalid

Since I'm not familiar with this project, I could not fix it.

Let me create a Spring project with Spring Initializr instead: https://start.spring.io/

shiraji commented 6 years ago

https://github.com/shiraji/brave-coroutine

Here is the coroutine code with TracingConfiguration. Could you verify this is what you want me to do?

codefromthecrypt commented 6 years ago

OK, thanks. So in this case, you want the code inside GlobalScope.launch to have the same span ID, I guess?

shiraji commented 6 years ago

Yes. But since it runs on a different thread, the span ID is different.

codefromthecrypt commented 6 years ago

Some other related info on coroutines, for whoever addresses this: https://github.com/square/retrofit/pull/2886/

For the specific use case, we need to figure out any hooks possible for GlobalScope.launch, such that doing launch is the same as explicitly saving off currentContext.get() and re-applying it with currentContext.newScope before the work is run
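A minimal sketch of the manual save-and-re-apply described above, assuming Brave and kotlinx.coroutines are on the classpath. The helper name traceIdSeenByWorker is hypothetical, and async on Dispatchers.Default stands in for GlobalScope.launch so the result can be returned; it is an illustration, not a proposed API.

```kotlin
import brave.propagation.CurrentTraceContext
import brave.propagation.ThreadLocalCurrentTraceContext
import brave.propagation.TraceContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs work on a worker thread and reports the trace ID visible there.
suspend fun traceIdSeenByWorker(currentTraceContext: CurrentTraceContext): String? = coroutineScope {
    // Save off the caller's context before the work moves to another thread.
    val captured: TraceContext? = currentTraceContext.get()
    async(Dispatchers.Default) {
        // Re-apply it on the worker thread; use {} closes the scope when the block ends.
        currentTraceContext.maybeScope(captured).use {
            currentTraceContext.get()?.traceIdString()
        }
    }.await()
}

fun main(): Unit = runBlocking {
    val currentTraceContext = ThreadLocalCurrentTraceContext.create()
    // Arbitrary IDs, for illustration only.
    val parent = TraceContext.newBuilder().traceId(1L).spanId(2L).build()

    currentTraceContext.newScope(parent).use {
        // True when the worker saw the same trace ID as the caller.
        println(traceIdSeenByWorker(currentTraceContext) == parent.traceIdString())
    }
}
```

This only holds while the block inside the scope does not suspend: after a suspension point the coroutine may resume on a different thread, where the scope was never applied. That gap is why a hook into the coroutine machinery is needed.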

essh commented 5 years ago

The cleanest way to handle this would be with ThreadContextElement. However, I'm currently unsure how to apply this to Brave, mostly around how to close() the scope returned by newScope when implementing this interface.

Further information is available at https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/coroutine-context-and-dispatchers.md#thread-local-data

An implementation for MDC is available at https://github.com/Kotlin/kotlinx.coroutines/blob/master/integration/kotlinx-coroutines-slf4j/src/MDCContext.kt which could be a useful reference. One for gRPC Context also exists at https://github.com/marcoferrer/kroto-plus/blob/master/kroto-plus-coroutines/src/main/kotlin/com/github/marcoferrer/krotoplus/coroutines/GrpcContextElement.kt
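For readers unfamiliar with the interface, here is a minimal sketch of the ThreadContextElement contract using a plain ThreadLocal rather than Brave. The names currentId and IdElement are hypothetical. The point it illustrates is that whatever updateThreadContext returns is handed back to restoreThreadContext, so in principle that returned state could be the object that needs closing.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for a tracer's thread-local state.
val currentId = ThreadLocal<String?>()

class IdElement(private val id: String) :
    ThreadContextElement<String?>, AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<IdElement>

    // Called each time the coroutine starts or resumes on a thread.
    override fun updateThreadContext(context: CoroutineContext): String? {
        val previous = currentId.get()
        currentId.set(id)
        return previous // handed back to restoreThreadContext as oldState
    }

    // Called when the coroutine suspends or completes on that thread.
    override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
        currentId.set(oldState)
    }
}

fun main(): Unit = runBlocking {
    withContext(Dispatchers.Default + IdElement("abc")) {
        println(currentId.get()) // prints "abc"
    }
    println(currentId.get()) // prints "null": the calling thread was never modified
}
```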

codefromthecrypt commented 5 years ago

I will try this today

codefromthecrypt commented 5 years ago

Example which breaks on the /async endpoint:

@SpringBootApplication
@Controller
open class Demo @Autowired constructor(val ctx: CurrentTraceContext) {

  @GetMapping("/")
  fun sync(): ResponseEntity<String> {
    return ResponseEntity.ok(ctx.get().traceIdString());
  }

  @GetMapping("/async")
  fun async() = runBlocking {
    GlobalScope.async {
      ResponseEntity.ok(ctx.get().traceIdString());
    }.await()
  }
}

fun main(args: Array<String>) {
  runApplication<Demo>(*args)
}

ackintosh commented 5 years ago

I've added a TracingContextElement, which was inspired by MDCContext.kt. It seems to work for now, but I'm not sure whether it is correct. (I'm not familiar with Java/Kotlin/etc.)

    @GetMapping("/async")
    fun async() = runBlocking {
-        GlobalScope.async {
+        GlobalScope.async(TracingContextElement()) {
            ResponseEntity.ok(ctx.get().traceIdString())
        }.await()
    }

public typealias TraceContextState = TraceContext?

class TracingContextElement(
        public val contextState: TraceContextState = Tracing.current().currentTraceContext().get()
) : ThreadContextElement<TraceContextState>, AbstractCoroutineContextElement(Key) {

    companion object Key : CoroutineContext.Key<TracingContextElement>

    override fun updateThreadContext(context: CoroutineContext): TraceContextState {
        val oldState = Tracing.current().currentTraceContext().get()
        setCurrent(contextState)
        return oldState
    }

    /** @suppress */
    override fun restoreThreadContext(context: CoroutineContext, oldState: TraceContextState) {
        setCurrent(oldState)
    }

    /** @suppress */
    private fun setCurrent(contextState: TraceContextState) {
        if (contextState == null) {
            // TODO?
            // we should do something?
        } else {
            Tracing.current().currentTraceContext().newScope(contextState)
        }
    }
}

codefromthecrypt commented 5 years ago

@ackintosh thank you. I started thinking along these lines, too. The problem is that this assumes a thread local is behind the impl, which is almost always the case, but might not be. I'm not saying this isn't the end result, but I have been thinking about the problem more. Maybe there's a way to hook in more directly and break the abstraction less (e.g. here the scope abstraction is broken, since every opened scope should always be closed).

codefromthecrypt commented 5 years ago

@ackintosh do you know some way to globally register TracingContextElement so people don't need to add this to their code explicitly?

ackintosh commented 5 years ago

Hmm... I'm sorry, I don't know. 🤔

ackintosh commented 5 years ago

Related issue: Work with ThreadLocal-sensitive Components · Issue #119 · Kotlin/kotlinx.coroutines

codefromthecrypt commented 5 years ago

very nice link

codefromthecrypt commented 5 years ago

I think this is the way to do it.. can you please verify?

import brave.propagation.CurrentTraceContext
import brave.propagation.CurrentTraceContext.Scope
import brave.propagation.TraceContext
import kotlinx.coroutines.ThreadContextElement
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

class TracingContextElement(
  val currentTraceContext: CurrentTraceContext,
  // get() returns null when no span is in scope, so the type must be nullable
  val initial: TraceContext? = currentTraceContext.get()
) : ThreadContextElement<Scope>, AbstractCoroutineContextElement(Key) {
  companion object Key : CoroutineContext.Key<TracingContextElement>

  // Runs each time the coroutine starts or resumes on a thread
  override fun updateThreadContext(context: CoroutineContext): Scope {
    return currentTraceContext.maybeScope(initial)
  }

  // Runs when the coroutine suspends or completes; closes the scope opened above
  override fun restoreThreadContext(context: CoroutineContext, scope: Scope) {
    scope.close()
  }
}

ackintosh commented 5 years ago

@adriancole Thank you!

I've tested the TracingContextElement with the following application.

Sample application

package io.github.ackintosh.sleuthtest.controller

import brave.Tracing
import brave.propagation.CurrentTraceContext
import io.github.ackintosh.sleuthtest.TracingContextElement
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.reactive.function.client.WebClient

@Controller
class GreetingController(val builder: WebClient.Builder, val ctx: CurrentTraceContext) {
    val logger = LoggerFactory.getLogger(GreetingController::class.java)
    val client = builder.build()

    fun sendRequest() {
        logger.info("------> WebClient#get")

        client.get()
                .uri("http://127.0.0.1:9999/")
                .retrieve()
                .bodyToMono(String::class.java)
                .subscribe { logger.info("(response)") }

        logger.info("<------ WebClient#get")
    }

    @GetMapping("/hello")
    fun hello() = runBlocking {
        logger.info("<Start>")

        val deferred1 = GlobalScope.async(TracingContextElement(Tracing.current().currentTraceContext())) {
            logger.info("---> sendRquest (1)")
            sendRequest()
            logger.info("<--- sendRquest (1)")
        }
        val deferred2 = GlobalScope.async(TracingContextElement(Tracing.current().currentTraceContext())) {
            logger.info("---> sendRquest (2)")
            sendRequest()
            logger.info("<--- sendRquest (2)")
        }

        deferred1.await()
        deferred2.await()
        logger.info("<End>")
    }
}

Logs

curl http://localhost:8080/hello

2018-12-16 20:13:24.754  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [ctor-http-nio-2] i.g.a.s.controller.GreetingController    : <Start>
2018-12-16 20:13:24.760  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : ---> sendRquest (1)
2018-12-16 20:13:24.760  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : ---> sendRquest (2)
2018-12-16 20:13:24.760  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : ------> WebClient#get
2018-12-16 20:13:24.760  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : ------> WebClient#get
2018-12-16 20:13:24.829  INFO [-,cde2d5f80b5efa52,989a3ff276996021,false] 54048 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : <------ WebClient#get
2018-12-16 20:13:24.829  INFO [-,cde2d5f80b5efa52,3d9843e9bc53ab25,false] 54048 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : <------ WebClient#get
2018-12-16 20:13:24.829  INFO [-,cde2d5f80b5efa52,989a3ff276996021,false] 54048 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : <--- sendRquest (1)
2018-12-16 20:13:24.829  INFO [-,cde2d5f80b5efa52,3d9843e9bc53ab25,false] 54048 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : <--- sendRquest (2)
2018-12-16 20:13:24.833  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [ctor-http-nio-2] i.g.a.s.controller.GreetingController    : <End>
2018-12-16 20:13:24.911  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [ctor-http-nio-1] i.g.a.s.controller.GreetingController    : (response)
2018-12-16 20:13:24.941  INFO [-,cde2d5f80b5efa52,cde2d5f80b5efa52,false] 54048 --- [ctor-http-nio-2] i.g.a.s.controller.GreetingController    : (response)

codefromthecrypt commented 5 years ago

@ackintosh can you use a different http client? I want to make sure there's no bug in the WebClient instrumentation which might be leaking its state. Not saying there is, just it is possible.

codefromthecrypt commented 5 years ago

PS: use TracingContextElement(ctx), not TracingContextElement(Tracing.current().currentTraceContext()), though this should have nothing to do with the problem.

ackintosh commented 5 years ago

(Updated)

@adriancole I've checked again with Fuel.

Diffs from the sample application

    fun sendRequest() {
        logger.info("------> WebClient#get")

+ //        client.get()
+ //                .uri("http://127.0.0.1:9999/")
+ //                .retrieve()
+ //                .bodyToMono(String::class.java)
+ //                .subscribe { logger.info("(response)") }

+       Fuel.get("http://127.0.0.1:9999/").responseString { request, response, result ->
+           when (result) {
+               is Result.Failure -> {
+                   val ex = result.getException()
+                   logger.info("(response)")
+               }
+               is Result.Success -> {
+                   val data = result.get()
+                   logger.info("(response)")
+               }
+           }
+       }

        logger.info("<------ WebClient#get")
    }

(Used TracingContextElement(ctx) instead of TracingContextElement(Tracing.current().currentTraceContext()) )

Logs

2018-12-17 13:14:06.159  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [ctor-http-nio-2] i.g.a.s.controller.GreetingController    : <Start>
2018-12-17 13:14:06.159  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : ---> sendRquest (2)
2018-12-17 13:14:06.159  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : ---> sendRquest (1)
2018-12-17 13:14:06.159  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : ------> WebClient#get
2018-12-17 13:14:06.159  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : ------> WebClient#get
2018-12-17 13:14:06.160  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : <------ WebClient#get
2018-12-17 13:14:06.160  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : <------ WebClient#get
2018-12-17 13:14:06.160  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-2] i.g.a.s.controller.GreetingController    : <--- sendRquest (2)
2018-12-17 13:14:06.160  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [atcher-worker-1] i.g.a.s.controller.GreetingController    : <--- sendRquest (1)
2018-12-17 13:14:06.160  INFO [-,860570cdd0f85989,860570cdd0f85989,false] 17396 --- [ctor-http-nio-2] i.g.a.s.controller.GreetingController    : <End>
2018-12-17 13:14:06.171  INFO [-,,,] 17396 --- [      Thread-38] i.g.a.s.controller.GreetingController    : (response)
2018-12-17 13:14:06.181  INFO [-,,,] 17396 --- [      Thread-40] i.g.a.s.controller.GreetingController    : (response)

The last two lines are odd: both the trace ID and the span ID are missing. 🤔

2018-12-17 13:14:06.171  INFO [-,,,] 17396 --- [      Thread-38] i.g.a.s.controller.GreetingController    : (response)
2018-12-17 13:14:06.181  INFO [-,,,] 17396 --- [      Thread-40] i.g.a.s.controller.GreetingController    : (response)

ackintosh commented 5 years ago

Arf, sorry the test above was insufficient... I'll update the comment.

ackintosh commented 5 years ago

Updated https://github.com/openzipkin/brave/issues/820#issuecomment-447716628

codefromthecrypt commented 5 years ago

could possibly be a bug. worth trying to isolate into a non kotlin test in sleuth.

I will be offline after tomorrow for holiday fyi

this feature can go into brave after tests etc but people can paste for now as it is small code. we have people pasting hystrix for same reason.

ackintosh commented 5 years ago

Yeah, I'll paste the code to avoid the issue for now.

Enjoy your holiday. 😉

shiraji commented 5 years ago

FYI

We are already using other CoroutineContext like this.

withContext(context) {
   foo()
}

To enable Brave with TracingContextElement, we did this:

withContext(context + TracingContextElement(Tracing.current().currentTraceContext())) {
    foo()
}

This works perfectly.

Thanks guys!

codefromthecrypt commented 5 years ago

@shiraji question: do you need to reference like Tracing.current().currentTraceContext() or is it just that you have no handy reference to CurrentTraceContext and doing so with the current tracing component is more convenient?

shiraji commented 5 years ago

We don't have handy reference to the context object.

TobiasBales commented 5 years ago

Is there any update on this/a preferred solution?

codefromthecrypt commented 5 years ago

Last I heard.. https://github.com/openzipkin/brave/issues/820#issuecomment-447614394 works for @shiraji and unknown for @ackintosh as it is not clear on the latter whether instrumentation was of concern or the context impl. @TobiasBales you can try to be a tie breaker? paste the code and try? It isn't much code... anyway next step after there's a clear yes/no on this impl would be to add a directory for context-kotlin with tests

powturns commented 5 years ago

One problem with the above solution is the following:

  1. Coroutine executing
  2. A new scoped span is created
  3. Coroutine suspended
  4. Coroutine resumed

When the coroutine resumes, rather than applying the context of the scoped span that was created in (2) it will apply the original context.

The following appears to pass my unit tests:

typealias TraceContextHolder = CurrentTraceContext
typealias TraceContextState = TraceContext?

class TracingContextElement(
        private val traceContextHolder: TraceContextHolder = Tracing.current().currentTraceContext(),
        private var context: TraceContextState = traceContextHolder.get()
) : ThreadContextElement<Scope>, AbstractCoroutineContextElement(Key) {
    // The key must name this class; the original referenced an undefined BraveTracingContextElement
    companion object Key : CoroutineContext.Key<TracingContextElement>

    override fun updateThreadContext(context: CoroutineContext): Scope {
        return traceContextHolder.maybeScope(this.context)
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: Scope) {
        // check to see if, during coroutine execution, the context was updated
        traceContextHolder.get()?.let {
            if (it != this.context) {
                this.context = it
            }
        }

        oldState.close()
    }
}

anuraaga commented 5 years ago

I don't know if this applies to Kotlin, but just in case: in Armeria we use a custom implementation of CurrentTraceContext which directly manipulates Armeria's request context for state. Previously we used the approach of remounting the default CurrentTraceContext when activating a request context, but this method is much simpler and has fewer corner cases to work through.

https://github.com/line/armeria/blob/master/brave/src/main/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContext.java

I would imagine something similar could happen for Kotlin, where a CurrentTraceContext implementation directly stores into the coroutine's context but don't know enough about Kotlin to know details. Does this sound reasonable to anyone?

codefromthecrypt commented 4 years ago

My guess is we need to focus on this again, as people have a lot more experience with kotlin since when this was first opened. Also docs seem good on integration options. https://kotlinlang.org/docs/reference/coroutines/coroutine-context-and-dispatchers.html#thread-local-data

bjartek commented 4 years ago

I played around with this and pinged on gitter.im triggering the post @adriancole did above.

After trying the code above, I found that I did not really need it to propagate traces correctly. However, I did need to add MDCContext to propagate my own MDC field, User, correctly. Even so, when I log as part of a Reactor chain (that is, in a WebClient call before I return await), it does not log my MDC context, although it does log the correct traces and spans from Brave.

My code is here: https://github.com/bjartek/webflux-playground

To test, start up the application and run http :8080/auth/bar Authorization:"Bearer foo". The result of that run against (cb97de9) is at https://github.com/bjartek/webflux-playground/blob/master/after-mdccontext.log

Note that I use a custom logback.xml so my log output is not the same as the above mentions. An example with the standard logback format is: (irrelevant lines removed)

2020-02-29 15:49:11.588 INFO [,a7fbc6f4d74e58b0,a7fbc6f4d74e58b0,false] 4611 --- [oundedElastic-1] o.b.w.WebfluxRefappApplication : Auth bar begin
2020-02-29 15:49:11.736 INFO [,a7fbc6f4d74e58b0,fa2f1e55f4c59245,false] 4611 --- [oundedElastic-1] o.b.w.WebfluxRefappApplication : {authfoo=bar}
2020-02-29 15:49:11.817 INFO [,a7fbc6f4d74e58b0,a7fbc6f4d74e58b0,false] 4611 --- [or-http-epoll-4] o.b.w.WebfluxRefappApplication : Next
2020-02-29 15:49:11.819 INFO [,a7fbc6f4d74e58b0,a7fbc6f4d74e58b0,false] 4611 --- [or-http-epoll-4] o.b.w.WebfluxRefappApplication : Auth bar ends

bjartek commented 4 years ago

For the record I just tried to remove spring-cloud-sleuth from my example and the pattern is still the same for me then.

bjartek commented 4 years ago

My example https://github.com/bjartek/webflux-playground now works the way I want it. The comment above is resolved in the latest commit.

codefromthecrypt commented 4 years ago

@bjartek thanks again for keeping us posted with your research!

wlezzar commented 4 years ago

Hi @adriancole : ) . I just tested the coroutine context element you suggested here: https://github.com/openzipkin/brave/issues/820#issuecomment-447614394

It works perfectly for me when I use the CurrentTraceContext.Default.create(). My traces are perfectly well formed when used inside coroutines.

However it doesn't work correctly when I use CurrentTraceContext.Default.inheritable(). I guess this is because it doesn't reuse the same thread local variable?

Anyway, thanks a lot for the snippet. If you want me to do some tests, do not hesitate. Any chance this would be merged into the main code base soon?

codefromthecrypt commented 4 years ago

@wlezzar if you are up to it, make a pull request for a new module in the context directory. There will be some other tasks, like a polished README etc, but I'm sure everyone here will :heart: you for it!

wlezzar commented 4 years ago

I would love to contribute. I will try to find the time for it : )

asarkar commented 4 years ago

I tried the shown TracingContextElement for gRPC, and it didn't work.

ServerInterceptor:

return runBlocking {
    withContext(
        Dispatchers.Unconfined + TracingContextElement(
            Tracing.current().currentTraceContext()
        )
    ) {
        Contexts.interceptCall(ctx.withValue(AppGlobal.USER_ID_CONTEXT_KEY, userId), call, headers, next)
    }
}

The trace and the span are empty in log statements in the gRPC service. Nothing changes if I replace Tracing.current().currentTraceContext() with tracing.currentTraceContext(), where tracing is autowired.

jiraguha commented 3 years ago

The approach suggested in @codefromthecrypt's comment worked for me. @asarkar, I'm not sure what you are trying to do with gRPC. I use the LogNet/grpc-spring-boot-starter implementation. For the server, which extends grpc-kotlin coroutines, I register a bean using a CoroutineContextServerInterceptor with a TracingContextElement:

    @Bean
    @GRpcGlobalInterceptor
    fun coroutineTraceInterceptor(): ServerInterceptor {
        return object : CoroutineContextServerInterceptor() {
            override fun coroutineContext(
                call: ServerCall<*, *>,
                headers: Metadata,
            ): CoroutineContext {
                return TracingContextElement(Tracing.current().currentTraceContext())
            }
        }
    }

For the client, I use the SpringAwareManagedChannelBuilder bean!

asarkar commented 3 years ago

@jiraguha Happy to hear that it worked for you, although that probably means you've a use case that doesn't do one or all of the following:

  1. Modify server context in a ServerInterceptor.
  2. Launch user coroutines, perhaps within a new scope.
  3. Use ClientInterceptor.
  4. Make non-gRPC calls using libraries that launch their own coroutines (like Ktor HTTP client, or a DB call).

Unless all of the above cases work in tandem, it can't be said official support for Kotlin gRPC has arrived.

jiraguha commented 3 years ago

@asarkar, Your supposition is correct, I decided to use coroutines within a narrow scope and interface it with project-reactor by default. I favor the simplicity of a functional code.

Not sure that all those points are related to this current issue only. Maybe you can create a small project with all problems, not sure I will have the answer to your problems but it will certainly be interesting.

asarkar commented 3 years ago

@jiraguha https://github.com/openzipkin/zipkin-support/issues/40#issuecomment-708070670 said I interrupted someone's breakfast by reporting the issue. Since the beauty of OSS is that there is always more than one player in the game, I decided to spend my time working with OpenTelemetry, and have no interest in creating a project to try out Brave. I responded to your comment because you mentioned me as if you had solved the issue I reported, when you didn't at all, since your use case is very different and most likely simpler.

ilya40umov commented 11 months ago

For anybody who is using Spring Boot / Micrometer Tracing (with a bridge to Brave) the topic of traceId propagation in coroutines is also being discussed here: https://github.com/micrometer-metrics/tracing/issues/174

Micrometer basically provides an implementation of ThreadContextElement for this case, which is similar to TracingContextElement posted in this thread.