monix / monix

Asynchronous, Reactive Programming for Scala and Scala.js.
https://monix.io
Apache License 2.0

Locals lost when using akka http client #1009

Open nrktkt opened 5 years ago

nrktkt commented 5 years ago

I haven't had time (or knowledge) to look deeply into this, maybe someone else has insight.

Monix Locals are not propagated across invocations of akka's http client. Passing the tracing scheduler into the creation of the actor system should ensure it's used everywhere (and indeed that does work when using the akka http server). Maybe there's a bug in TracingScheduler, or maybe there's some actor pattern that breaks future chaining (but I can't think of what it would be). Some code to reproduce is below.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import monix.execution.misc.Local
import monix.execution.schedulers.TracingScheduler
import akka.http.scaladsl.client.RequestBuilding._

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random

object Test extends App {
  implicit val system = ActorSystem("default", defaultExecutionContext = Some(TracingScheduler(ExecutionContext.global)))
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val local = Local("missing")

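  Await.result(
    Local.isolate {
      local := Random.nextInt().toString
      println(local()) // the value set on the line above
      val result =
        Future(())
          .andThen { case _ => println(local()) } // callback scheduled through the TracingScheduler
          .flatMap(_ => Http().singleRequest(Get("http://example.com")))
      result.onComplete(_ => println(local())) // the local is lost here (the reported problem)
      result
    },
    Duration.Inf
  )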
}

nrktkt commented 5 years ago

So the reason this happens with the akka http client is that singleRequest creates a promise which is sent in a message to an actor. The actor completes the promise, but since the message is received via a queue, the local context is not carried over. This doesn't seem like something that can be fixed in monix, and it seems inevitable (slick's db.run also returns a future but doesn't take an execution context). I've got a workaround:

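  import monix.execution.misc.Local
  import monix.execution.schedulers.TrampolineExecutionContext
  import scala.concurrent.Future

  // Captures the caller's Local context and restores it when the future completes.
  def propagate[A](future: Future[A]): Future[A] = {
    val ctx = Local.getContext()
    future.transform { a =>
      Local.setContext(ctx)
      a
    }(TrampolineExecutionContext.immediate) // runs on the completing thread, without an extra hop
  }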

Maybe adding something like this to the documentation and/or codebase will help users be aware that a manual step is needed to cover certain parts of code.
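
For anyone who wants to try the workaround without pulling in akka, here is a minimal sketch. It assumes Monix 3.x on the classpath. `PropagateSketch`, `foreignCall` and `observe` are hypothetical names made up for this example, and `foreignCall` is only a stand-in for an actor-backed call like singleRequest: it completes a promise from a thread that never saw the caller's context. The latch makes sure the callback is registered before the promise completes, which is the case where the context gets lost.

```scala
import java.util.concurrent.CountDownLatch

import monix.execution.misc.Local
import monix.execution.schedulers.{TracingScheduler, TrampolineExecutionContext}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

object PropagateSketch {
  implicit val ec: ExecutionContext = TracingScheduler(ExecutionContext.global)

  val local = Local("missing")

  // The workaround from the comment above.
  def propagate[A](future: Future[A]): Future[A] = {
    val ctx = Local.getContext()
    future.transform { a =>
      Local.setContext(ctx)
      a
    }(TrampolineExecutionContext.immediate)
  }

  // Hypothetical stand-in for an actor-backed client: the promise is completed
  // on a separate thread once `release` is counted down.
  def foreignCall(release: CountDownLatch): Future[Unit] = {
    val p = Promise[Unit]()
    val t = new Thread(() => { release.await(); p.success(()); () })
    t.setDaemon(true)
    t.start()
    p.future
  }

  // Sets the local, runs the foreign call through `wrap`, and returns the value
  // of the local as seen by a callback chained after the call.
  def observe(wrap: Future[Unit] => Future[Unit]): String = {
    local := "request-42"
    val release = new CountDownLatch(1)
    val seen = wrap(foreignCall(release)).map(_ => local())
    release.countDown() // complete the promise only after the callback is registered
    Await.result(seen, 5.seconds)
  }
}
```

With this setup, `PropagateSketch.observe(identity)` is expected to return the default `"missing"`, while `PropagateSketch.observe(PropagateSketch.propagate)` is expected to return `"request-42"`. The only place `propagate` has to be applied is the boundary where a future comes back from code that completes it on its own threads.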

Avasil commented 5 years ago

Thanks for the investigation, @kag0. Yeah, I'm afraid we are not able to do anything about cases like this. There was an idea about a quarantine operation earlier. Seems like we should add it.

Would be nice to be able to point out where exactly it needs to be used.

cc @oleg-py @alexandru

oleg-py commented 5 years ago

@kag0 this looks very much like what isolate does for Future (impl here). Can you check if you can use Local.isolate as a drop-in replacement for your propagate function?

The issue here is also that the actor model is at odds with having something like a thread-local, fiber-local, or whatever you call it, since you don't have a logical thread of execution with actors. That part might leak from anything akka-based that you call (like the http client), but probably won't leak from things that call you (like the http server).
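
To make that concrete, here is a rough sketch that uses only the standard library, not akka or Monix. `MailboxSketch`, `MailboxWorker` and `ask` are hypothetical names; the worker thread draining a queue is a crude stand-in for an actor, and a plain `ThreadLocal` stands in for a local. The sender's value never reaches the worker, because the only thing that crosses the queue is the message itself.

```scala
import java.util.concurrent.LinkedBlockingQueue

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}

object MailboxSketch {
  // A plain thread-local standing in for a "local" such as a request ID.
  val requestId: ThreadLocal[String] =
    ThreadLocal.withInitial[String](() => "missing")

  // Hypothetical one-actor "system": a worker thread draining a mailbox of promises.
  final class MailboxWorker {
    private val mailbox = new LinkedBlockingQueue[Promise[String]]()

    private val thread = new Thread(() =>
      while (true) {
        // The promise is completed with whatever the worker thread sees,
        // which is not what the sender had set.
        mailbox.take().success(requestId.get())
      })
    thread.setDaemon(true)
    thread.start()

    // Sends a promise through the mailbox, much like singleRequest does.
    def ask(): Future[String] = {
      val reply = Promise[String]()
      mailbox.put(reply)
      reply.future
    }
  }
}
```

Setting `MailboxSketch.requestId.set("request-42")` on the calling thread and then awaiting `new MailboxSketch.MailboxWorker().ask()` yields `"missing"`: the worker thread only ever sees its own initial value.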

Anyway, documentation on TaskLocal needs to be improved for sure.

nrktkt commented 5 years ago

Yes, propagate is just a simplified version of isolate for Future, and isolate looks to work in its place.

It's become clear to me that the Twitter-style local is inherently incompatible with anything based on message passing. But akka actually has local context propagation, used by Lightbend Telemetry, which I imagine places the local context in some metadata of the messages between actors. I don't have experience with it, but it would be awesome if there were a way to connect monix locals with what akka has.
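
To illustrate what I mean by metadata on the messages, here is a rough sketch with only the standard library. This is a guess at the general idea, not how Lightbend Telemetry or akka actually implement it. `EnvelopeSketch`, `Envelope` and `ContextAwareWorker` are hypothetical names, and a plain `ThreadLocal` stands in for a monix local.

```scala
import java.util.concurrent.LinkedBlockingQueue

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}

object EnvelopeSketch {
  // A plain thread-local standing in for a "local" such as a request ID.
  val requestId: ThreadLocal[String] =
    ThreadLocal.withInitial[String](() => "missing")

  // Hypothetical envelope: the payload plus the sender's context, captured at send time.
  final case class Envelope(context: String, reply: Promise[String])

  final class ContextAwareWorker {
    private val mailbox = new LinkedBlockingQueue[Envelope]()

    private val thread = new Thread(() =>
      while (true) {
        val env = mailbox.take()
        requestId.set(env.context) // restore the sender's context before handling
        try env.reply.success(requestId.get())
        finally requestId.remove() // do not leak it into the next message
      })
    thread.setDaemon(true)
    thread.start()

    // Captures the caller's context on the calling thread and ships it with the message.
    def ask(): Future[String] = {
      val reply = Promise[String]()
      mailbox.put(Envelope(requestId.get(), reply))
      reply.future
    }
  }
}
```

After `EnvelopeSketch.requestId.set("request-42")` on the calling thread, awaiting `new EnvelopeSketch.ContextAwareWorker().ask()` yields `"request-42"`, because the context travels inside the envelope instead of relying on the thread.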

But even if that were possible, there still would be a need for something like propagate/quarantine to handle outliers like slick and promise/callback based adapters for java libraries.

oleg-py commented 5 years ago

@kag0 isolate should work for these uses: you continue with what you had before, and any 3rd-party code or parallel process gets a copy for its own use. That was the general idea behind it.