reactor / reactor-kotlin-extensions

Reactor Kotlin Support
Apache License 2.0
137 stars 21 forks source link

Apply contextCapture automatically in `await` function #59

Open be-hase opened 11 months ago

be-hase commented 11 months ago

Motivation

This is a similar issue. https://github.com/reactor/reactor-core/issues/3406

For users using kotlin coroutine, ThreadContextElement is used to propagate MDC, etc.

The block operator is supported by Hooks.automaticContextPropagation, but not the await function. Therefore, we are forced to write it this way.

// e.g. WebClient

webClient.post()
    .uri("...")
    // omitted
    .contextCapture()
    .awaitSingle()

It would be great if the await function is also supported, like the block operator.

Reproduction code

I have created a sample project. https://github.com/be-hase/context-propagation-report-202309/tree/main/coroutines-issue

The code is in one file. https://github.com/be-hase/context-propagation-report-202309/blob/main/coroutines-issue/src/main/kotlin/example/CoroutinesIssueApplication.kt

You can start it with the following command.

./gradlew :coroutines-issue:bootRun 

Then, request with curl.

curl localhost:8080

The log shows that the context is not propagating. (see last log)

Controller. MDC={traceId=651e4b5c7f37de9f9425f6f2b46edc94, spanId=9425f6f2b46edc94}, myThreadLocal=d578c95d-5bed-471d-8571-d45493cd9298
Request. MDC={traceId=651e4b5c7f37de9f9425f6f2b46edc94, spanId=9425f6f2b46edc94}, myThreadLocal=d578c95d-5bed-471d-8571-d45493cd9298
Response. MDC={traceId=651e4b5c7f37de9f9425f6f2b46edc94, spanId=bc0acce7efb692e0}, myThreadLocal=null
doOnSuccess. MDC={}, myThreadLocal=null

Desired solution

The await function is implemented on the coroutine side, maybe I should make a request on the coroutine side. https://github.com/Kotlin/kotlinx.coroutines/blob/1ccc96890d38be52d1bdcd31af5befb54804aa05/reactive/kotlinx-coroutines-reactor/src/Mono.kt#L46

But, I found that the following workaround can be done using the ContextInjector mechanism provided by the coroutines side.

package example

import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.reactive.ContextInjector
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import kotlin.coroutines.CoroutineContext

@OptIn(InternalCoroutinesApi::class)
class ContextCaptureInjector : ContextInjector {
    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
        return when (publisher) {
            is Mono -> publisher.contextCapture()
            is Flux -> publisher.contextCapture()
            else -> publisher
        }
    }
}

If this method is acceptable, I will send you a PR.

Considered alternatives

Additional context

I originally submitted my request here. https://github.com/reactor/reactor-core/issues/3563

be-hase commented 3 months ago

Can someone take a look at this issue?

chemicL commented 3 months ago

Hi. Thanks for providing the background and suggestions. I am no Kotlin expert by any means, so I can't speak whether the ContextInjector mechanism combined with @OptIn is a desired one.

For the context-propagation part, note that in reactor-core, before applying the automatic capture, a check whether automatic context propagation is enabled is issued, e.g. in Flux#toIterable blocking method.

For one, the API to make the necessary checks is not public. I imagine it could be worked-around by creating reactor.core.publisher package in reactor-kotlin-extensions providing the necessary access, however now these are strictly coupled -> any change in reactor-core's package-private API could break reactor-kotlin-extensions and that is undesired. Users are free to use varying versions of the libraries and assuming they maintain compatiblity. What does that leave us? Either of these options:

I don't know at this point the answers to the above questions but it just doesn't feel like a straightforward path to resolution. @be-hase If you'd like to make a contribution in this direction, I'd expect the proposal to consist of the answers to the above, a "considered alternatives" to proposed design section, "why would we not want this feature" evaluation and consideration of related work. This one comes to mind: https://github.com/micrometer-metrics/tracing/issues/174 and I can't tell how related that one is, but it feels it is in the same area and has not been resolved for quite some time. Thanks in advance for considering and addressing these remarks!