reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Problem with context propagation across Mono <-> CompletableFuture boundary #3851

Closed msosnicki closed 4 months ago

msosnicki commented 4 months ago

We're using Mono to carry the context during HTTP request processing. Most of our pipeline uses Mono, but some parts use CompletableFuture (for legacy reasons, let's say). Both sides of the processing chain need to access context values. Unfortunately, the context is not readable from the part of the chain that is converted to a CompletableFuture.

Expected Behavior

The Context is readable, as it is with withCtx(Mono.deferContextual(ctx => getCtx)).block().

Actual Behavior

The reproduction below raises java.util.NoSuchElementException: Context is empty.

Steps to Reproduce

Reproduction below (using scala-cli):

//> using scala "3.3"
//> using dep "io.projectreactor:reactor-core:3.6.8"
//> using dep "io.micrometer:context-propagation:1.1.1"

import reactor.core.publisher.Hooks
import reactor.core.publisher.Mono

Hooks.enableAutomaticContextPropagation()

val key = "foo"

def getCtx = Mono.deferContextual{ctx => 
  val result = ctx.get[String](key)
  println(result)
  Mono.just(result)
}

def withCtx[A](m : Mono[A]) = m.contextWrite(_.put(key, "bar"))

withCtx(Mono.deferContextual(ctx => Mono.fromFuture(getCtx.toFuture()))).block()

I tried various hints mentioned in threads that looked relevant:

https://github.com/reactor/reactor-core/issues/3357
https://github.com/reactor/reactor-core/issues/3366

Unfortunately, without luck.

I would appreciate any hints on how to solve the above problem. Thanks.

chemicL commented 4 months ago

Hey @msosnicki! First off, I'm not well versed in Scala, so I might be missing something from your example (please note reactor-core is a Java project). What you are highlighting here is that there is no Context propagation for CompletableFuture.

The way you structure your code somewhat hides the fact that there is a disconnect between two reactive chains. Calling toFuture() on the upstream chain makes anything above that CompletableFuture unaware of any Context that lies downstream of fromFuture(). There is a strong type mismatch here: these chains are not connected in a reactive fashion. Instead, a mapping between the completion signals of the JDK CompletableFuture stack is made on your behalf.

Context is a unique Project Reactor feature and is available in Flux and Mono.

We don't have any means in CompletableFuture to transport that explicitly. That's an inherent property of the JDK's design of these classes. Unless you know of some explicit metadata propagation mechanism in that stack, there's not much we can do. One option is to propagate the contextual metadata along with the values, e.g. using a Pair<V, CTX>. I used the term "explicit" purposefully; let's now switch to the "implicit" propagation mechanisms.
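As a rough illustration of the "explicit" option mentioned above (carrying the metadata alongside the value), here is a minimal JDK-only sketch. The names WithContext, legacyStep and run are hypothetical, and using a Map<String, String> as the CTX type is an assumption made for brevity. It does not use Reactor at all; it only shows the shape of passing the metadata through a CompletableFuture step by hand.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ExplicitContextExample {

    // Hypothetical carrier: a value plus the contextual metadata that travels with it.
    static final class WithContext<V> {
        final V value;
        final Map<String, String> context;

        WithContext(V value, Map<String, String> context) {
            this.value = value;
            this.context = context;
        }
    }

    // Stand-in for a legacy CompletableFuture-based step. It reads "foo" from the
    // carried metadata and hands the same metadata on together with its result.
    static CompletableFuture<WithContext<String>> legacyStep(WithContext<String> in) {
        return CompletableFuture.supplyAsync(() -> {
            String foo = in.context.get("foo");
            return new WithContext<String>(in.value + ":" + foo, in.context);
        });
    }

    // Builds the initial pair, runs the step, and unwraps the plain value at the end.
    static String run() {
        WithContext<String> start =
                new WithContext<String>("request", Map.of("foo", "bar"));
        return legacyStep(start).thenApply(out -> out.value).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "request:bar"
    }
}
```

The trade-off of this approach is that every CompletableFuture-based step has to accept and return the carrier type, which is why it counts as explicit propagation.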

What I also notice is that you used the automatic context propagation feature. It aids in propagating between Context and ThreadLocal values. Please consult the context-propagation library to understand how you can register a ThreadLocalAccessor and restore ThreadLocal state to be available in your Future if that's your intent. However, don't expect Context to be captured in your deferContextual lambda. Reactor provides an empty Context every time it subscribes using a CompletableFuture as it's a brand new chain at that point.

Just as an experiment, consider the following change:

- withCtx(Mono.deferContextual(ctx => Mono.fromFuture(getCtx.toFuture()))).block()
+ withCtx(Mono.deferContextual(ctx => Mono.fromFuture(getCtx.contextWrite(ctx).toFuture()))).block()

I believe your artificial code would now work, but I assume you're more interested in accessing the contextual data inside the asynchronous execution points that use the CompletableFuture/CompletionStage stack.

With the above, I consider this issue more of a question suitable for StackOverflow/Gitter and am closing it. I hope this comment helps you find the solution to your particular use case.