open-telemetry / opentelemetry-java

OpenTelemetry Java SDK
https://opentelemetry.io
Apache License 2.0

All spans are broken if R2dbcEntityTemplate or CoroutineCrudRepository was used in the code #5543

Closed yuliamitttovahuspy closed 1 year ago

yuliamitttovahuspy commented 1 year ago

Description:

We have a project that uses GraphQL, Spring, and Kotlin coroutines. For DB interactions we use org.springframework.data.r2dbc.core.R2dbcEntityTemplate, which can return either a coroutine Flow or a Reactor Flux/Mono. If the repository returns the Flow from r2dbcEntityTemplate, the trace looks like this:

override fun findAll(): Flow<SampleItemEntity> {
    return r2dbcEntityTemplate.select(SampleItemEntity::class.java).flow()
}
sample_broken_trace

If we return a List instead, the trace is different:

override fun findAll(): List<SampleItemEntity> {
    // or we can return empty list, the effect will be the same
    return runBlocking(Dispatchers.IO) {
        r2dbcEntityTemplate.select(SampleItemEntity::class.java).all().collectList().block()!!
    }
}
sample_correct_trace

Steps to reproduce

Gradle dependencies:

implementation("io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:1.24.0")
implementation("io.opentelemetry:opentelemetry-extension-kotlin:1.27.0")
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-amqp")
implementation("org.springframework.boot:spring-boot-starter-data-jdbc") // Needed for Flyway
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springdoc:springdoc-openapi-webflux-ui:1.6.15")
implementation("com.expediagroup:graphql-kotlin-spring-server:6.4.0")
implementation("com.expediagroup:graphql-kotlin-schema-generator:6.4.0")
implementation("com.graphql-java:graphql-java-extended-scalars:20.0")
implementation("org.postgresql:r2dbc-postgresql")
implementation("org.springframework.boot:spring-boot-starter-security")
developmentOnly("org.springframework.boot:spring-boot-devtools")
implementation("com.apollographql.apollo3:apollo-runtime:3.8.0") 

Configuration:

import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class CoroutineConfiguration {
    private val applicationCoroutineScope = CoroutineScope(
        SupervisorJob() + Dispatchers.IO + Context.current().asContextElement(),
    )
    @Bean
    fun applicationScope(): CoroutineScope = applicationCoroutineScope
}

We will create a simple chain: SampleItemFetcher (GraphQL) -> SampleItemService (Spring) -> EntityRepository (Spring) -> DBRepository (uses R2dbcEntityTemplate). The code is shown below:

schema.graphql:

type SampleItem {
  id: Long
  text: String
}

type Query {
  sampleItems: [SampleItem!]!
}

SampleItemFetcher:

import com.expediagroup.graphql.server.operations.Query
import ###.SampleItemService
import ###.SampleItemResponse
import io.opentelemetry.instrumentation.annotations.WithSpan
import org.springframework.security.access.prepost.PreAuthorize
import org.springframework.stereotype.Component

@Component
@PreAuthorize("isAuthenticated()")
class SampleItemFetcher(private val sampleItemService: SampleItemService) : Query {

    @WithSpan
    suspend fun sampleItems(): List<SampleItemResponse> {
        return sampleItemService.sampleItems()
    }
}

SampleItemService:

import ###.SampleItemResponse
import io.opentelemetry.instrumentation.annotations.WithSpan

interface SampleItemService {

    @WithSpan
    suspend fun sampleItems(): List<SampleItemResponse>
}

SampleItemServiceImpl:

import ###.EntityRepository
import ###.SampleItemService
import ###.SampleItemResponse
import io.opentelemetry.instrumentation.annotations.WithSpan
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class SampleItemServiceImpl(
    private val repo: EntityRepository,
) : SampleItemService {

    @WithSpan
    override suspend fun sampleItems(): List<SampleItemResponse> {
        return repo.getAll().map { it.toSampleItem() }
    }

    companion object {
        val logger: Logger = LoggerFactory.getLogger(SampleItemServiceImpl::class.java)
    }
}

EntityRepository:

import ###.SampleItemResponse
import io.opentelemetry.instrumentation.annotations.WithSpan

interface EntityRepository {
    @WithSpan
    suspend fun getAll(): List<SampleItemResponse>
}

EntityRepositoryImpl:

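import io.opentelemetry.instrumentation.annotations.WithSpan
import org.springframework.stereotype.Repository

@Repository
class EntityRepositoryImpl(
    private val dbRepo: DBRepository
) : EntityRepository {

    @WithSpan
    override suspend fun getAll(): List<SampleItemResponse> {
        return dbRepo.findAll().map { it.toDto() }
    }
}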

DBRepository:

interface DBRepository {

    @WithSpan
    suspend fun findAll(): List<SampleItemEntity>
}

DBRepositoryImpl:

import ###.SampleItemEntity
import io.opentelemetry.instrumentation.annotations.WithSpan
import io.r2dbc.postgresql.codec.Json
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.slf4j.LoggerFactory
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
import org.springframework.data.r2dbc.core.flow
import org.springframework.data.relational.core.query.Criteria
import org.springframework.data.relational.core.query.Query
import org.springframework.r2dbc.core.awaitRowsUpdated
import org.springframework.stereotype.Repository

@Repository
class DBRepositoryImpl(
    private val r2dbcEntityTemplate: R2dbcEntityTemplate,
) : DBRepository {

    @WithSpan
    override suspend fun findAll(): Flow<SampleItemEntity> {
        return r2dbcEntityTemplate.select(SampleItemEntity::class.java).flow()
    }
}

What did you expect to see? Spans should be nested under the correct parent spans.

What did you see instead? In the broken trace, RepoImpl.getAll is shown as a separate span, but it should be part of the parent span.

sample_broken_trace_highlighted

What version and what artefacts are you using?

implementation("io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:1.24.0")
implementation("io.opentelemetry:opentelemetry-extension-kotlin:1.27.0")
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-amqp")
implementation("org.springframework.boot:spring-boot-starter-data-jdbc") // Needed for Flyway
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springdoc:springdoc-openapi-webflux-ui:1.6.15")
implementation("com.expediagroup:graphql-kotlin-spring-server:6.4.0")
implementation("com.expediagroup:graphql-kotlin-schema-generator:6.4.0")
implementation("com.graphql-java:graphql-java-extended-scalars:20.0")
implementation("org.postgresql:r2dbc-postgresql")
implementation("org.springframework.boot:spring-boot-starter-security")
developmentOnly("org.springframework.boot:spring-boot-devtools")
implementation("com.apollographql.apollo3:apollo-runtime:3.8.0") 

Environment: macOS, tested locally


andydenk commented 1 year ago

+1 The same issue is affecting me as well.

mayur-bavisia commented 1 year ago

💯 Experiencing same thing with coroutines.

yuliamitttovahuspy commented 1 year ago

The same problem happens when CoroutineCrudRepository is used, for example with the following code:

@Repository
interface SampleITemCoroutineCrudRepository : CoroutineCrudRepository<SampleItemEntity, UUID>

yuliamitttovahuspy commented 1 year ago

I found a way to fix the spans, but it is not a good one:

  1. Switch off the annotation instrumentation (it looks like the annotations are not tracking things properly) and create spans manually. To do that, add a new dependency:

implementation("io.opentelemetry:opentelemetry-extension-kotlin:1.27.0")

And set the environment variables listed below:

OTEL_INSTRUMENTATION_GRAPHQL_JAVA_ENABLED=false;
OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED=true;
OTEL_INSTRUMENTATION_OPENTELEMETRY_INSTRUMENTATION_ANNOTATIONS_ENABLED=false;
OTEL_INSTRUMENTATION_SPRING_WEBFLUX_ENABLED=false;
  2. Introduce a function to wrap every call:
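import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanBuilder
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.withContext

suspend fun <Result> withFixedSpan(
    name: String,
    parameters: (SpanBuilder.() -> Unit)? = null,
    parentSpan: Span? = null,
    block: suspend (span: Span?) -> Result,
): Result {
    val tracer = GlobalOpenTelemetry.getTracer("com.example", "0.0.0")
    // Use the explicitly passed parent, or fall back to the span that is current right now.
    val parentSpanToSet = parentSpan ?: Span.current()
    val childSpan: Span = tracer.spanBuilder(name).run {
        if (parameters != null)
            parameters()
        coroutineContext[CoroutineName]?.let {
            setAttribute("coroutine.name", it.name)
        }
        setParent(Context.current().with(parentSpanToSet))
        startSpan()
    }

    // Carry the child span in the coroutine context instead of a thread-local makeCurrent() scope,
    // so that it is still the current span if the block suspends and resumes on another thread.
    return withContext(Context.current().with(childSpan).asContextElement()) {
        try {
            block(childSpan)
        } catch (throwable: Throwable) {
            childSpan.setStatus(StatusCode.ERROR)
            childSpan.recordException(throwable)
            throw throwable
        } finally {
            childSpan.end()
        }
    }
}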
  3. Wrap every call you need:
        return withFixedSpan(
            name = "SampleEntityRepositoryImpl.findAllBy",
        ) {
            r2dbcEntityTemplate.select(SampleEntity::class.java)
                .matching(filter.toQuery(sorting))
                .flow().toList()
        }

jkwatson commented 1 year ago

There are (at least) three places where things could be going wrong, only one of which would apply to this repository's codebase:

1) Our coroutine context propagation code has a bug in it (that lives here: https://github.com/open-telemetry/opentelemetry-java/tree/main/extensions/kotlin)
2) There are issues with the OTel-based instrumentation
3) There are issues with the Spring code that make things break.

It would be helpful if you could narrow things down a bit on this, since only option 1) would apply to this codebase.

YuliaMittova commented 1 year ago

Hello @jkwatson, I have created a sample project. Please find it here: https://github.com/YuliaMittova/opentelemetry_issue_5543

I think the problem is in how OpenTelemetry processes @WithSpan spans when a Flow emits more than one item with a delay (a typical situation when working with a database). If you fetch a specific student (the Flow emits only one item), you will see the following picture:

with_flow_only_one_item_emitted

But if you have a coroutine Flow that emits more than one item, you will see spans that don't have the correct parents and are measured incorrectly:

with_flow_many_items
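
As a rough illustration of why a lazily produced stream can be awkward for span timing (an assumption about one possible factor, not a confirmed diagnosis of this issue), the sketch below uses only the Kotlin standard library. `withFakeSpan`, `events`, and `findAllLazy` are hypothetical names, and a `Sequence` stands in for a cold `Flow`; no OpenTelemetry API is involved. It shows that a wrapper around a function returning a lazy stream finishes before any item is produced.

```kotlin
// Hypothetical stand-in for a tracer: records when a "span" starts and ends.
val events = mutableListOf<String>()

// Hypothetical helper: wraps a block the way a method-level span would.
fun <T> withFakeSpan(name: String, block: () -> T): T {
    events += "start $name"
    try {
        return block()
    } finally {
        events += "end $name"
    }
}

// A lazy Sequence is used here as a stand-in for a cold Flow:
// nothing inside the builder runs until the caller iterates.
fun findAllLazy(): Sequence<Int> = withFakeSpan("findAll") {
    sequence {
        for (i in 1..3) {
            events += "emit $i"
            yield(i)
        }
    }
}

fun main() {
    val items = findAllLazy().toList() // iteration happens after findAllLazy() has returned
    println(items)
    println(events)
}
```

Running `main` prints `[1, 2, 3]` and then `[start findAll, end findAll, emit 1, emit 2, emit 3]`: the wrapper has already closed when the first item is produced. Whether the @WithSpan instrumentation behaves this way for Flow is not established in this thread.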

jkwatson commented 1 year ago

That sounds like it's an instrumentation issue, then, and not an issue for the core libraries. Can you open an issue in the instrumentation repository for this?

YuliaMittova commented 1 year ago

@jkwatson thank you for the reply. Is https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues the correct place to create an issue?

YuliaMittova commented 1 year ago

Created https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/8812

trask commented 1 year ago

Closing now that the issue has been moved over to opentelemetry-java-instrumentation.