spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

@Scheduled task instrumentation does not work for Kotlin suspend functions #32165

Closed pwestlin closed 1 month ago

pwestlin commented 7 months ago

The Spring docs for scheduled task instrumentation state:

An Observation is created for each execution of an @Scheduled task.

This function gets an automatic observation:

@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
fun nonSuspendable() {
    logger.info("Not suspendable")
}

but this suspend function does not:

@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
suspend fun suspendable() {
    logger.info("Suspendable")
}

I use Spring Boot 3.2.2 and I've also tried 3.2.3-SNAPSHOT and 3.3.0-M1.

build.gradle.kts (shortened):

plugins {
    id("org.springframework.boot") version "3.2.2"
    id("io.spring.dependency-management") version "1.1.4"
    kotlin("jvm") version "1.9.22"
    kotlin("plugin.spring") version "1.9.22"
}

java {
    sourceCompatibility = JavaVersion.VERSION_21
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

    implementation("io.micrometer:micrometer-tracing")
    implementation("io.micrometer:micrometer-tracing-bridge-brave")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
}

Application code:

@SpringBootApplication
@EnableScheduling
class Application

fun main(args: Array<String>) {
    Hooks.enableAutomaticContextPropagation()
    runApplication<Application>(*args)
}

@Service
class SchedulingService {

    private val logger: Logger = LoggerFactory.getLogger(this.javaClass)

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    fun nonSuspendable() {
        logger.info("Not suspendable")
    }

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    suspend fun suspendable() {
        logger.info("Suspendable")
    }
}

When I run the application I get the following output:

2024-01-31T07:33:26.925+01:00  INFO [65b9e9b65a4ecd1702feecf2dbdd6be4,02feecf2dbdd6be4] 926648 --- [   scheduling-1] [65b9e9b65a4ecd1702feecf2dbdd6be4-02feecf2dbdd6be4] n.w.s.SchedulingService                  : Not suspendable
2024-01-31T07:33:26.946+01:00  INFO [,] 926648 --- [   scheduling-1] [                                                 ] n.w.s.SchedulingService                  : Suspendable

Here we can see from the "65b9e9b65a4ecd1702feecf2dbdd6be4,02feecf2dbdd6be4" prefix that the function "nonSuspendable" gets an observation but "suspendable" doesn't.

Regards Peter

bclozel commented 7 months ago

@sdeleuze and I investigated this issue and found that it is not a simple Spring Framework issue, but rather a broader problem with Kotlin Coroutines and Observability. This issue is explored in micrometer-metrics/tracing#174.

How this works with plain Spring MVC

With Spring MVC, when an observation is created, we can then use it to open a scope around some code execution. The opening of this scope is handled by the tracing infrastructure and sets up the relevant ThreadLocal and MDC values. The logging statements, when executed, have all the information in context and you can see traceId and spanId.
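To make the scope mechanism concrete, here is a minimal Kotlin sketch that uses only the standard library. TraceHolder, TraceScope and log are hypothetical stand-ins, not Micrometer or SLF4J APIs, and the MDC handling done by the real tracing infrastructure is left out. It only illustrates that a value set when the scope opens is visible to code running on the same thread until the scope closes.

```kotlin
// Hypothetical holder standing in for the tracing ThreadLocal.
object TraceHolder {
    val current = ThreadLocal<String?>()
}

// Hypothetical scope: sets the ThreadLocal on open, restores the previous value on close.
class TraceScope(traceId: String) : AutoCloseable {
    private val previous: String? = TraceHolder.current.get()

    init {
        TraceHolder.current.set(traceId)
    }

    override fun close() {
        if (previous == null) TraceHolder.current.remove() else TraceHolder.current.set(previous)
    }
}

// Stand-in for a logging statement that reads the trace id from the current thread.
fun log(message: String): String = "[${TraceHolder.current.get() ?: ""}] $message"

fun main() {
    println(log("before"))                           // [] before
    TraceScope("65b9e9b6").use {
        println(log("inside"))                       // [65b9e9b6] inside
    }
    println(log("after"))                            // [] after
}
```

This works only as long as the logging statement runs on the thread that opened the scope, which is the assumption that breaks down with Reactor and coroutines.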

How this works with Reactor Mono and Flux

With Reactor, work can be scheduled on any worker thread so this is not as straightforward. Reactor uses the Micrometer context-propagation library to propagate the information across contexts. Here, our instrumentation puts the current observation in the Reactor ContextView under the ObservationThreadLocalAccessor.KEY key. If the context propagation feature is enabled (typically in Spring Boot with spring.reactor.context-propagation=auto), then Reactor uses the registered context propagation ThreadLocalAccessor to get the information from the ContextView and temporarily restore it as a ThreadLocal in the current scope. The ObservationThreadLocalAccessor takes care of opening and closing the observation scope, which sets up the MDC as expected.

With Kotlin Coroutines

In this case, the reactor context is propagated by the org.jetbrains.kotlinx:kotlinx-coroutines-reactor library automatically, but under a specific key. Unlike Reactor's ContextView, keys are not String-based but rather must implement the CoroutineContext.Key interface. This means that you can get the entire Reactor context from the current coroutine context and get values from it.
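As a small illustration of the key model described above, the following sketch uses only the Kotlin standard library. TraceIdElement is a hypothetical element invented for this example; ReactorContext from kotlinx-coroutines-reactor follows the same companion-object-as-key pattern, which is why context[ReactorContext] works as a lookup.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical context element; its companion object is the typed key.
class TraceIdElement(val traceId: String) : AbstractCoroutineContextElement(TraceIdElement) {
    companion object Key : CoroutineContext.Key<TraceIdElement>
}

fun main() {
    val context: CoroutineContext = EmptyCoroutineContext + TraceIdElement("abc123")

    // Lookup goes through the key object and returns the typed element (or null).
    println(context[TraceIdElement]?.traceId)      // abc123
    println(EmptyCoroutineContext[TraceIdElement]) // null
}
```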

But unlike Reactor, there is no automatic integration with context propagation, ThreadLocals or the MDC. In fact, it seems Kotlin Coroutines expect users to directly interact with the context to get values, or compose with a CoroutineContext implementation that implements ThreadContextElement to set and restore context values.
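For readers unfamiliar with ThreadContextElement, here is a minimal sketch of the mechanism, assuming kotlinx-coroutines-core is on the classpath. The currentTraceId ThreadLocal and the TraceIdContextElement class are hypothetical names for this example and are not part of Micrometer or Spring. The element installs a value on whichever thread resumes the coroutine and restores the previous value when the coroutine leaves that thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical ThreadLocal standing in for tracing or MDC state.
val currentTraceId = ThreadLocal<String?>()

class TraceIdContextElement(private val traceId: String) :
    ThreadContextElement<String?>, AbstractCoroutineContextElement(Key) {

    companion object Key : CoroutineContext.Key<TraceIdContextElement>

    // Called on the thread that is about to run the coroutine: install the value
    // and return the previous one so it can be restored later.
    override fun updateThreadContext(context: CoroutineContext): String? {
        val previous = currentTraceId.get()
        currentTraceId.set(traceId)
        return previous
    }

    // Called when the coroutine suspends or completes on that thread.
    override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
        currentTraceId.set(oldState)
    }
}

fun main() = runBlocking {
    withContext(Dispatchers.Default + TraceIdContextElement("abc123")) {
        println("inside: ${currentTraceId.get()}")   // inside: abc123
    }
    println("outside: ${currentTraceId.get()}")      // outside: null
}
```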

I have managed to get this working with a custom function that leverages existing Micrometer infrastructure:

import io.micrometer.core.instrument.kotlin.asContextElement
import io.micrometer.observation.Observation
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.withContext
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import reactor.util.context.ContextView
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

@Component
class SchedulingService {

    companion object {
        val logger = LoggerFactory.getLogger(SchedulingService::class.java.name)
    }

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    suspend fun suspendable() {
        withContext(observationContext(coroutineContext)) {
            logger.info("Suspendable")
        }
    }

    fun observationContext(context: CoroutineContext) : CoroutineContext {
        // get the Reactor context from the CoroutineContext
        val contextView = context[ReactorContext]!!.context as ContextView
        // this context contains the current observation under this well known key
        // because the @Scheduled instrumentation contributed it
        val observation = contextView.get(ObservationThreadLocalAccessor.KEY) as Observation
        // we can then use this Micrometer context to wrap the execution
        // the observation scope and MDC will be taken care of
        return observation.observationRegistry.asContextElement()
    }
}

This is by no means the solution we're advertising and we're not sure how to tackle this problem at this point. I'm personally not convinced that we should restore ThreadLocals and MDC transparently for developers, as the kotlinx library does not do this in the first place.

I'm leaving this issue open for now because we might want to revisit the Coroutine to Publisher arrangement in this case, in order to pass a custom CoroutineContext. But this issue should be mainly discussed and tackled with the Micrometer team.

bclozel commented 7 months ago

I think I've refined a solution that could be contributed to the context-propagation project.

This ThreadContextElement will help Kotlin apps to lift the ReactorContext from the CoroutineContext if it's there, or just capture the context from the local ThreadLocals:

package io.micrometer.context

import io.micrometer.context.ContextRegistry
import io.micrometer.context.ContextSnapshot
import io.micrometer.context.ContextSnapshotFactory
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.reactor.ReactorContext
import reactor.util.context.ContextView
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

class PropagationContextElement(private val context: CoroutineContext) : ThreadContextElement<ContextSnapshot.Scope>,
    AbstractCoroutineContextElement(Key) {

    public companion object Key : CoroutineContext.Key<PropagationContextElement>

    val contextSnapshot: ContextSnapshot
        get() {
            val contextView: ContextView? = context[ReactorContext]?.context
            val contextSnapshotFactory =
                ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build()
            if (contextView != null) {
                return contextSnapshotFactory.captureFrom(contextView)
            }
            return contextSnapshotFactory.captureAll()
        }

    override fun restoreThreadContext(context: CoroutineContext, scope: ContextSnapshot.Scope) {
        scope.close()
    }

    override fun updateThreadContext(context: CoroutineContext): ContextSnapshot.Scope {
        return contextSnapshot.setThreadLocals()
    }
}

Using it in the application leverages the context-propagation ThreadLocalAccessors as Reactor does and does not need to depend on Micrometer Observation or MDC.

    @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
    suspend fun suspendable() {
        withContext(PropagationContextElement(coroutineContext)) {
            logger.info("Suspendable")
        }
    }

WDYT @sdeleuze ?

sdeleuze commented 7 months ago

I think I like the direction taken by this proposal, as Spring support for Coroutines is tightly linked to the Reactive support, and the scope of this issue is probably wider than scheduling support and could potentially solve what has been discussed for months in https://github.com/micrometer-metrics/tracing/issues/174.

But we should experiment and discuss where and how this potential feature should be provided and integrated. To make this feature usable, I think we should find a way to configure it automatically.

As proposed by @antechrestos in this comment, I am wondering if we could use this kind of facility to provide that support seamlessly at Spring level.

For example in org.springframework.web.reactive.result.method.InvocableHandlerMethod, should we change:

if (coroutineContext == null) {
    return CoroutinesUtils.invokeSuspendingFunction(method, target, args);
}
else {
    return CoroutinesUtils.invokeSuspendingFunction((CoroutineContext) coroutineContext, method, target, args);
}

To something like:

if (coroutineContext == null) {
    return CoroutinesUtils.invokeSuspendingFunction(new PropagationContextElement(Dispatchers.getUnconfined()), method, target, args);
}
else {
    return CoroutinesUtils.invokeSuspendingFunction(new PropagationContextElement((CoroutineContext) coroutineContext), method, target, args);
}

and do the same for ScheduledAnnotationReactiveSupport?

IMO that could provide the level of integration people expect from Spring and would be consistent with what we do on Reactive side.

If provided at context-propagation level, we should make sure this will be provided in a dependency we can use in Spring Java code (potentially using a defensive classpath check and a nested class to ensure this remains optional).

Any thoughts?

ilya40umov commented 6 months ago

@sdeleuze btw, I guess you meant to write something like this (you need to use plus operation to "mix in" the new element):

var context = coroutineContext != null ? (CoroutineContext) coroutineContext : Dispatchers.getUnconfined();
return CoroutinesUtils.invokeSuspendingFunction(context.plus(new PropagationContextElement(context)), method, target, args);
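The difference between passing the element alone and mixing it in with plus can be shown with a standard-library-only sketch. FakeDispatcher and FakePropagation are hypothetical stand-ins for Dispatchers.Unconfined and PropagationContextElement; they only demonstrate how CoroutineContext composition behaves.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for a dispatcher element.
class FakeDispatcher : AbstractCoroutineContextElement(FakeDispatcher) {
    companion object Key : CoroutineContext.Key<FakeDispatcher>
}

// Hypothetical stand-in for PropagationContextElement: it keeps a reference
// to another context but is itself only a single element.
class FakePropagation(val captured: CoroutineContext) : AbstractCoroutineContextElement(FakePropagation) {
    companion object Key : CoroutineContext.Key<FakePropagation>
}

fun main() {
    val base: CoroutineContext = FakeDispatcher()

    // Using the element on its own: the dispatcher is not part of the resulting context.
    val alone: CoroutineContext = FakePropagation(base)
    println(alone[FakeDispatcher] != null)      // false

    // Mixing it in with plus: both elements are present.
    val combined: CoroutineContext = base + FakePropagation(base)
    println(combined[FakeDispatcher] != null)   // true
    println(combined[FakePropagation] != null)  // true
}
```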

Either way, I would totally love having something like PropagationContextElement at our disposal, as right now the only option to propagate the observation (and thus the trace) is via KotlinObservationContextElement from Micrometer Observation library.

However, KotlinObservationContextElement only interacts with ObservationThreadLocalAccessor, and thus does nothing to handle ObservationAwareSpanThreadLocalAccessor from Micrometer Tracing.

If Spring would then add this element to the context in all cases when suspending Kotlin code is called (i.e. WebFlux Controller methods, coRouter, @Scheduled etc.) it would hopefully take care of all ThreadLocalAccessors at once.

I am wondering if KotlinObservationContextElement would also work for the cases when a child observation / span etc. is created and needs to be put into the scope. E.g. I'm currently trying to add some more Kotlin-friendly API to Observation in https://github.com/micrometer-metrics/micrometer/pull/4823, and one of the main use-cases for this API is to be able to create a nested observation (e.g. to have a new span to demarcate a section of the code).


Edit: I realized that this context element may need to be added earlier, as something like CoWebFilter would already require it to be in the context.

And as a side note, it looks like there are multiple places in Spring Framework that each create "an initial" coroutine context, which right now happens to consist of just Dispatchers.Unconfined. I am wondering if this should also be abstracted away, so that all of these places just call this abstraction to get an initial context (if none was created so far in any of the filters etc.).

bclozel commented 6 months ago

I have discussed this with the Micrometer team and finding a proper home for PropagationContextElement is not that easy. This class depends on io.micrometer:context-propagation and org.jetbrains.kotlinx:kotlinx-coroutines-reactor (which itself depends on kotlin and io.projectreactor:reactor-core).

grassehh commented 3 months ago

Hello, I seem to have found another context propagation issue when using proxied beans. For example, suppose you have the following controller:

@GetMapping("test")
@ResponseStatus(OK)
suspend fun test() {
    CoroutineScope(SupervisorJob()).launch(observationRegistry.asContextElement()) {
        proxiedService.doSomethingSuspend()
    }
}

And proxiedService bean is a simple service that just makes a WebClient call:

class ProxiedServiceImpl(private val webClient: WebClient) : ProxiedService {
    override suspend fun doSomethingSuspend() {
        webClient.get()
            .uri("https://www.google.fr")
            .awaitExchange {
                println(MDC.getCopyOfContextMap())
            }
    }
}

and it is proxied using AutoProxyCreator:

@Bean
fun autoProxyCreator() = BeanNameAutoProxyCreator().apply {
    setBeanNames("proxiedService")
    setInterceptorNames("dummyInterceptor")
}

@Bean
fun dummyInterceptor(observationRegistry: ObservationRegistry) = MethodInterceptor { invocation -> invocation.proceed() }

Then the MDC context will be empty in the awaitExchange. To make it work, I need to pass the whole parent coroutine context to the child coroutine instead of just observationRegistry.asContextElement().

If the service is not a proxy, it works fine. Also if the service is not called inside the coroutine, but rather directly inside the controller method, it works with the proxy too.

So I wonder if there is not an issue with Spring AOP with observability.

ilya40umov commented 3 months ago

@grassehh I'm just curious if this is because CoroutineScope(SupervisorJob()).launch() will basically launch a new coroutine detached from the one created for handling the HTTP request, which potentially would mean that handling of the HTTP request can finish before the code in the launch block ever has a chance to execute (and thus the associated observation is potentially stopped by that point).

Could you try grabbing the job returned by the launch call and calling join() on it?

grassehh commented 3 months ago

Doesn't seem to work. You can check out my sample here.

The thing is that if you use the nonProxiedService instead of proxiedService and call the /aop route, it works (the MDC context will be preserved).

Alternatively like I said, if you pass the parent coroutineContext to the new coroutine, it works too. Then maybe it makes more sense than just re-transforming the ObservationRegistry as context element, as it is already done in the web filter.

bclozel commented 1 month ago

I'm closing this issue for now, in favor of https://github.com/Kotlin/kotlinx.coroutines/issues/4187. At this point I think we should explore an org.jetbrains.kotlinx:kotlinx-coroutines-reactor integration, as Spring Framework would only serve a fraction of the Kotlin community for this need.

We can reopen this issue if we need to reconsider.