reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Getting OOM with Mono.cache: trapped in an endless cycle #3855

Closed: BarexaS closed this issue 2 months ago.

BarexaS commented 2 months ago

Expected Behavior

Building the stack trace should not cause an OOM or an endless cycle.

Actual Behavior

The method reactor.core.publisher.FluxOnAssembly.OnAssemblyException#findPathToLeaves never terminates.

Steps to Reproduce

I still don't understand how I got stuck in it. So far, all I have is a dump of the discovered graph: [image: photo_2024-07-22_14-36-18]

Possible Solution

I guess that if we reach a node that has already been visited, we can stop building the trace.
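The suggestion above amounts to a depth-first walk guarded by a visited set. Below is a minimal Kotlin sketch on a hypothetical graph. Node, PathToLeaf, findPathsToLeaves and cyclicExample are illustrative names, not Reactor's internal classes, and this is not how reactor-core actually implements findPathToLeaves. The sketch shows that the walk terminates even when a back-edge closes a cycle.

```kotlin
// Hypothetical graph model for illustration only; NOT Reactor's internals.
// Node is a plain class (not a data class), so equality is by identity and
// hashCode() does not recurse into a cyclic children list.
class Node(val name: String) {
    val children = mutableListOf<Node>()
}

// One root-to-leaf path. truncated = true means the walk stopped early
// because it reached a node that had already been visited.
data class PathToLeaf(val nodes: List<String>, val truncated: Boolean)

fun findPathsToLeaves(root: Node): List<PathToLeaf> {
    val paths = mutableListOf<PathToLeaf>()
    val visited = HashSet<Node>()

    fun walk(node: Node, prefix: List<String>) {
        val path = prefix + node.name
        if (!visited.add(node)) {
            // Seen before (a cycle or a shared subtree): end this branch
            // here instead of descending into it again.
            paths += PathToLeaf(path, truncated = true)
            return
        }
        if (node.children.isEmpty()) {
            paths += PathToLeaf(path, truncated = false)
        } else {
            for (child in node.children) walk(child, path)
        }
    }

    walk(root, emptyList())
    return paths
}

// Builds a -> b -> c -> a (a cycle) plus a leaf edge a -> d.
fun cyclicExample(): Node {
    val a = Node("a")
    val b = Node("b")
    val c = Node("c")
    val d = Node("d")
    a.children += b
    a.children += d
    b.children += c
    c.children += a // back-edge that closes the cycle
    return a
}
```

For cyclicExample(), the walk returns two paths: a, b, c, a (truncated at the revisited root) and a, d. A global visited set also caps the work spent on shared subtrees, such as one cached source reached from two chains, but it reports the shared part only once. Tracking only the nodes on the current path would keep every distinct path, but it would not bound how many paths are produced.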

chemicL commented 2 months ago

Can you share the code which leads to the problem? Without a reproducer we won't be able to help.

BarexaS commented 2 months ago

Basically, I have a gRPC server with an interceptor:

class AuthInterceptor : ServerInterceptor {

    override fun <ReqT : Any?, RespT : Any?> interceptCall(
        serverCall: ServerCall<ReqT, RespT>,
        metadata: Metadata?,
        handler: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {

        val cachedSessionContext = getSessionContext(metadata, clientIpAddress).cache()
        return Context.current().withValue(SESSION_CTX_KEY, cachedSessionContext)
            .let { context ->
                Contexts.interceptCall(context, serverCall, metadata, handler)
            }
    }

    companion object {
        val SESSION_CTX_KEY: Context.Key<Mono<SessionContext>> = Context.key("userSession")
    }
}

Later, before the gRPC service implementation is called, an aspect runs:

private fun <Req : GeneratedMessageV3, Resp : Any> innerCheckAccess(
    pjp: ProceedingJoinPoint,
    annotation: CheckAccess,
    request: Req,
    responseObserver: StreamObserver<Resp>
): Any? {
    try {
        val session = AuthInterceptor.SESSION_CTX_KEY.get().block()!!

        // We use block() here because Spring AOP doesn't provide a way to
        // proceed with the target method invocation from another thread.
        // Some internal business logic with DB calls happens here.
        val hasAccess = session.hasAccess(annotation, request).block()!!

        if (hasAccess) {
            return session.executeInScope {
                pjp.proceed()
            }
        } else {
            LOG.info("Permission denied. Method: {}, request: {}, session: {}.", pjp, request, session)
            responseObserver.onError(StatusRuntimeException(Status.PERMISSION_DENIED))
        }
    } catch (t: Throwable) {
        LOG.error("Internal exception during check access!", t)
        responseObserver.onError(StatusRuntimeException(Status.INTERNAL))
    }

    return Unit
}

private fun SessionContext.executeInScope(f: () -> Any?): Any? {
    try {
        CurrentSession.put(this)
        return f()
    } finally {
        CurrentSession.release()
    }
}

object CurrentSession {
    const val SESSION_CONTEXT_KEY = "SESSION_CONTEXT_KEY"
    private val holder: ThreadLocal<SessionContext> = ThreadLocal()

    fun put(sessionContext: SessionContext) {
        holder.set(sessionContext)
    }

    fun get(): SessionContext {
        return holder.get()
    }

    fun getSession(): Mono<SessionContext> {
        return Mono.deferContextual { context -> context.toMono() }
            .flatMap { it.get(SESSION_CONTEXT_KEY) as Mono<SessionContext> }
    }

    fun release() {
        holder.remove()
    }
}
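CurrentSession keeps the session in a ThreadLocal. A value set with put is therefore visible only on the thread that called it, and only until release. The sketch below reproduces the same put/try/finally shape as executeInScope using plain JDK classes. Session, SessionHolder and readFromAnotherThread are hypothetical names, not the reporter's actual types. The sketch shows that a task running on another thread sees null. The same would presumably apply to work that a Reactor operator runs on a different scheduler thread, unless some context-propagation mechanism is set up.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for SessionContext; only equality matters here.
data class Session(val user: String)

// Hypothetical holder with the same put / try / finally-remove shape as
// CurrentSession combined with executeInScope.
object SessionHolder {
    private val holder = ThreadLocal<Session>()

    fun get(): Session? = holder.get()

    fun <T> withSession(session: Session, block: () -> T): T {
        holder.set(session)
        try {
            return block()
        } finally {
            holder.remove() // don't leak the session to the next task on this thread
        }
    }
}

// Reads the holder from a different thread while the session is in scope
// on the calling thread. The result is null because ThreadLocal values do
// not follow work that hops threads.
fun readFromAnotherThread(): Session? =
    SessionHolder.withSession(Session("alice")) {
        val pool = Executors.newSingleThreadExecutor()
        try {
            pool.submit(Callable { SessionHolder.get() }).get(1, TimeUnit.SECONDS)
        } finally {
            pool.shutdown()
        }
    }
```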

Then, within the gRPC service, I access the publisher stored in the thread local to work with the session. For example:

class GrpcPingPongService(
    private val subscriptionInvalidationService: SubscriptionInvalidationService
) : ReactorPingPongServiceGrpc.PingPongServiceImplBase() {

    override fun pingPongStream(request: Flux<Ping>): Flux<Pong> {
        val sessionContainer = AtomicReference<SessionContext>()
        val callSampler = CallSampler(SESSION_EXTENSION_SAMPLE_TIMESPAN)
        return request.flatMap { pingRequest ->
            CurrentSession.getSession()
                .doOnNext { session -> sessionContainer.set(session) }
                .map { pingRequest }
        }.map { ping ->
            Pong.newBuilder()
                .setPingTimestamp(ping.timestamp)
                .setServerTimestamp(Instant.now().asProtoTimestamp())
                .build()
        }
            .doOnNext { callSampler.call(sessionContainer.get()::extendSession) }
            .doOnError { LOG.error("Error on ping-pong", it) }
    }
}

chemicL commented 2 months ago

I'd appreciate something I can run and debug that triggers the OOM: a zip, a repository, or a failing test case I can paste into my IDE. Ideally it would be in Java as well. The above gives me no hint of where the problem happens. The gRPC library you are using could be doing something unexpected, and all sorts of other issues could be hidden in your custom logic as well.

BarexaS commented 2 months ago

I understand, but if I could reproduce this issue, I guess I could solve it on my side. The problem is that I can't understand why this is happening. How is it even possible to create a cyclic stack trace while building a Reactor chain?

chemicL commented 2 months ago

There must be some recursive chaining, e.g. you return a session Mono in a flatMap while that session Mono was already part of an earlier stage. I'm closing the issue for now, as it is more a question than a bug report. Please use Stack Overflow or Gitter for questions. Should you discover that there is in fact a reproducible bug in reactor-core, please feel free to reopen or create a new issue.