armanbilge / bayou

An enhanced Trace for IO
Apache License 2.0

Unexpected behavior with merging a traced Stream with another Stream #1

Open ryanmiville opened 2 years ago

ryanmiville commented 2 years ago

Merging a stream (.merge, .mergeHaltBoth, etc.) that uses bayou for tracing with another stream breaks the usage of child spans.

I know this project is just experimental, but I am also following the discussion of adding similar functionality to natchez, so I figured I would share anyway. I have not pulled down and tested Ross' branch to verify the same behavior.

Merging a stream that uses bayou with another stream (even an untraced one) causes the underlying IOLocal to not properly update its state. I see no errors, but when I step through the debugger, the IOLocal will call set with the new Span[IO], yet the next get still returns the default span. As a result, every Trace[F] method acts upon the original root span used when instantiating the IOTrace. If the same stream is not merged with another, you can create and nest child spans as you would expect.

Here is an example of a single stream with its tracing output, which matches my expectations. (scala-cli https://gist.github.com/ryanmiville/6e4d756eceac70f9919c1de859e48f5a)

And here is the exact same stream merged with another stream, along with its traces. (scala-cli https://gist.github.com/ryanmiville/71d06f76dfe7c62ab96f342f5bed72b5)

Unfortunately, I don't know enough about the inner workings of fs2 or IOLocal to provide any insight.

I don't expect you to really do anything about it, but I just wanted to bring it up in case the same behavior ends up making it into natchez, so it can at least be documented.

armanbilge commented 2 years ago

Yes, no worries, thank you very much for trying this out and writing this up! Pinging @rossabaker and @systemfw so they can have a look as well.

rossabaker commented 2 years ago

Thanks! I've been a little queasy about Trace[IO], and this makes me much more queasy. IOLocal values are visible on the fiber we set them, and are passed to any fibers we start. But they don't magically make it through more complicated types. If the stream interpreter hops fibers between these two lines, "child" will not be created from span (aka "new_root"), but from whatever trace was initialized with ("root"):

            _    <- Stream.resource(trace.spanR(span))
            _    <- Stream.resource(trace.spanR("child"))

I don't really know what to do about this.
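
A minimal sketch of the fiber-local behaviour described above, using cats-effect's IOLocal directly rather than bayou's Trace. The object name, the string values, and the pinned cats-effect version are assumptions for illustration only. A fiber started after the set inherits "new_root", but its own set never reaches the parent:

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"

import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

object IOLocalForkDemo {
  // Yields (what the forked fiber read, what the parent reads afterwards).
  val program: IO[(String, String)] =
    for {
      local        <- IOLocal("root")
      _            <- local.set("new_root")
      // The forked fiber starts with a copy of the parent's current state,
      // reads it, then overwrites its own copy.
      fiber        <- (local.get <* local.set("child")).start
      seenByChild  <- fiber.joinWithNever
      // The child's write stayed in the child's copy.
      seenByParent <- local.get
    } yield (seenByChild, seenByParent)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // (new_root,new_root)
}
```

If the stream interpreter runs the second spanR on a different fiber than the first, the same copy semantics would explain why "child" is created from the initial span.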

SystemFw commented 2 years ago

Generally speaking I'm queasy on *Local because of this type of issue, but also queasy on the alternative (tagless + Kleisli) because it raises the complexity floor so high for non-core business logic like tracing that, for me, it's unjustifiable outside of highly skilled teams. Now, if we're lucky, this problem might be specific to merge, but I suspect Ross' general point will hold.

SystemFw commented 2 years ago

For context, all concurrent combinators in fs2 follow a roughly similar pattern:

The fact that the different streams end up running in different fibers doesn't bode too well for IOLocal propagation, but I haven't done a full analysis on the issue. It'd also be interesting to check what happens with Resource, although the situation there is both simpler and offers more tools for control, like allocatedCase.

rossabaker commented 2 years ago

Tracing resources with IOLocal seems ... pretty good? I posted a full gist, but the interesting bits are:

    trace.use { implicit trace =>
      def io(name: String): IO[Unit] =
        trace.span(name)(
          IO(Thread.currentThread.getName)
            .flatMap(t => trace.put("thread" -> t))
        )

      val acquire = trace.span("acquire")(IO.unit)
      val release = trace.span("release")(IO.unit)
      val r = trace.spanR("resource") >> Resource.make(acquire)(_ => release)
      r.use(_ => trace.span("use")(IO.race(io("a"), io("b")).void))
    }

acquire, release, and use are children of resource. a and b are raced on different threads, but still are children of use.


The worst I've been able to do is this intentionally stupid thing, where we allocate resources and release them in an unnatural order:

    trace.use { implicit trace =>
      for {
        a <- trace.spanR("a").allocated
        b <- trace.spanR("b").allocated
        _ <- trace.put("1" -> "1")
        _ <- a._2
        _ <- trace.put("2" -> "2")
        _ <- b._2
        _ <- trace.put("3" -> "3")
      } yield ()
    }

a and 2 end up under root. 1, 3, and b all vanish. I clearly voided the warranty here, but I left trace in a horrible state.
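
To make the out-of-order release easier to follow, here is a toy model in plain Scala. It is an assumption about the bookkeeping (each acquire remembers the previous ambient span and each release restores it), not bayou's actual code, and every name in it is hypothetical. It only tracks which span each put is sent to and whether that span is still open at that moment:

```scala
object AmbientSpanModel {
  // Toy model, not bayou's code: one mutable "ambient" slot plus the set of open spans.
  final class Tracer {
    var ambient: String = "root"
    var open: Set[String] = Set("root")
    // (field, span it was sent to, was that span still open?)
    var puts: List[(String, String, Boolean)] = Nil

    // Acquire installs the new span and returns a release action that
    // closes it and restores whatever was ambient at acquire time.
    def acquire(name: String): () => Unit = {
      val previous = ambient
      ambient = name
      open += name
      () => { open -= name; ambient = previous }
    }

    def put(field: String): Unit =
      puts = puts :+ ((field, ambient, open(ambient)))
  }

  def run(): List[(String, String, Boolean)] = {
    val t = new Tracer
    val releaseA = t.acquire("a")
    val releaseB = t.acquire("b")
    t.put("1")
    releaseA() // out of order: restores "root" while b is still open
    t.put("2")
    releaseB() // restores "a", which is already closed
    t.put("3")
    t.puts
  }

  def main(args: Array[String]): Unit =
    run().foreach(println) // (1,b,true), (2,root,true), (3,a,false)
}
```

In this model "2" is sent to root and "3" is sent to the already-closed a, which is consistent with the output described above. The model says nothing about how finished spans are reported, so it does not by itself explain why b disappears.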

armanbilge commented 2 years ago

I've also been experimenting, to see how hard it is to break Stream tracing. So far I've minified down to https://github.com/armanbilge/bayou/commit/2ca914e211715bac59073fea281535663dcb3204 which basically inserts a Queue between two Streams.

    def run: IO[Unit] = {
      trace.use { implicit trace =>
        foo(stream).compile.drain
      }
    }

    def foo[F[_], O](
      s: Stream[F, O]
    )(implicit F: Concurrent[F]): Stream[F, O] =
      Stream.eval(Queue.unbounded[F, Option[O]]).flatMap { q =>
        Stream.eval(s.enqueueNoneTerminated(q).compile.drain.start).flatMap { _ =>
          Stream.repeatEval[F, Option[O]](q.take).unNoneTerminate
        }
      }

    val entryPoint = Log.entryPoint[IO]("example")

    val trace = entryPoint.root("root").flatMap(r => Resource.eval(Trace.ioTrace(r)))

    def stream(implicit trace: IOTrace): Stream[IO, Unit] = {
      Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .covary[IO]
        .chunkN(3)
        .flatMap { chunk =>
          for {
            _ <- Stream.resource(trace.spanR("green kangaroo"))
            _ <- Stream.resource(trace.spanR("child"))
            _ <- Stream.eval(trace.put("vals" -> chunk.toList.mkString(",")))
            _ <- Stream.sleep[IO](1.second)
          } yield ()
        }
    }

Gives:

{
  "name" : "root",
  "service" : "example",
  "timestamp" : "2022-03-09T01:53:38.047505Z",
  "duration_ms" : 4523,
  "trace.span_id" : "fec8a9f6-9c78-43e1-9de5-e909406f5819",
  "trace.parent_id" : null,
  "trace.trace_id" : "b3396a93-4342-452e-8d53-88653d9e2bed",
  "exit.case" : "succeeded",
  "vals" : "10",
  "children" : [
    {
      "name" : "child",
      "service" : "example",
      "timestamp" : "2022-03-09T01:53:38.475466Z",
      "duration_ms" : 1036,
      "trace.span_id" : "b3b5586e-1573-418f-aaf6-ec249150acdb",
      "trace.parent_id" : "b3396a93-4342-452e-8d53-88653d9e2bed",
      "trace.trace_id" : "b3396a93-4342-452e-8d53-88653d9e2bed",
      "exit.case" : "succeeded",
      "children" : [
      ]
    },
    {
      "name" : "green kangaroo",
      "service" : "example",
      "timestamp" : "2022-03-09T01:53:38.466419Z",
      "duration_ms" : 1074,
      "trace.span_id" : "a0965406-9196-4671-940b-dea74a9e0e8d",
      "trace.parent_id" : "b3396a93-4342-452e-8d53-88653d9e2bed",
      "trace.trace_id" : "b3396a93-4342-452e-8d53-88653d9e2bed",
      "exit.case" : "succeeded",
      "children" : [
      ]
    },
    ...
  ]
}

So sadly, child is not nested under green kangaroo. I really cannot understand why.

armanbilge commented 2 years ago

Ok, minimized down to noneTerminate ...

def run: IO[Unit] = {
  trace.use { implicit trace =>
    stream.noneTerminate.compile.drain
  }
}

Update:

Yikes. Minimized to this?

def run: IO[Unit] = {
  trace.use { implicit trace =>
    stream.map(_ => ()).compile.drain
  }
}

rossabaker commented 2 years ago

stream.map(identity).compile.drain

rossabaker commented 2 years ago

It calls this:

  private[fs2] def mapOutput[F[_], O, P](s: Stream[F, O], f: O => P): Pull[F, P, Unit] =
    interruptScope(mapOutputNoScope(s, f))

And when I reimplement mapOutputNoScope, it's fine. interruptScope sounds ominous.

rossabaker commented 2 years ago

stream.interruptScope.compile.drain does it. I can confirm that it's releasing resources in the reverse order they're acquired, but the child is not seeing the same IOLocal state as the parent. I suspect it's in InterruptionContext, but I'm not familiar enough to have an idea what can be done about it.

armanbilge commented 2 years ago

Thanks. Minimized to this:

//> using scala "3.1.1"
//> using lib "co.fs2::fs2-core:3.2.5"

import cats.effect._
import fs2._

object Bayou extends IOApp.Simple:
  // def run = s.compile.lastOrError.flatMap(IO.println) // List(2, 1)
  def run = s.interruptScope.compile.lastOrError.flatMap(IO.println) // List()

  def s = Stream.eval(IOLocal(List.empty[Int])).flatMap { local =>
    Stream.eval(local.update(1 :: _)).flatMap { _ =>
      Stream.eval(local.update(2 :: _)).flatMap { _ =>
        Stream.eval(local.get)
      }
    }
  }

armanbilge commented 2 years ago

Ok, I think we more-or-less got to the bottom of it in https://github.com/typelevel/fs2/issues/2842#issuecomment-1062859215. Whenever interruption is involved, so are multiple fibers. And whenever multiple fibers are involved, you are liable to lose these sorts of modifications to fiber-local state once the fiber is completed/discarded.
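
The same effect can be sketched without fs2 at all. The example below assumes that cats-effect's timeout is implemented by racing the effect against a sleep, so the effect runs on a forked fiber; the object name and the pinned version are illustrative assumptions. Under that assumption, an update made inside the timed-out region does not survive it:

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"

import scala.concurrent.duration._
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

object IOLocalTimeoutDemo {
  // Yields (state after a plain update, state after an update wrapped in timeout).
  val program: IO[(List[Int], List[Int])] =
    for {
      local  <- IOLocal(List.empty[Int])
      _      <- local.update(1 :: _)
      before <- local.get
      // Assumption: timeout races the effect against a sleep, so this update
      // happens on a forked fiber and is discarded along with that fiber.
      _      <- local.update(2 :: _).timeout(1.second)
      after  <- local.get
    } yield (before, after)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync())
}
```

No explicit start appears anywhere in the program, which is what makes this kind of loss easy to miss.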

armanbilge commented 2 years ago

It looks like there's some hope to fix this in fs2 :) but, what it means for Bayou is that it would probably specifically need a spanS(...): Stream[F, Unit]-style method to make the magic invocations on the Stream.

ryanmiville commented 2 years ago

it would probably specifically need a spanS(...): Stream[F, Unit]-style method

FWIW I have already been adding a spanS as a convenience method in my project. To echo @SystemFw's sentiment, tracing should be a frictionless addition to our codebase, so I personally welcome specific methods for specific use cases. :)

rossabaker commented 2 years ago

What region of the stream would have to be non-interruptible? The spanS effects that set the local state, or everything inside spanS?

      def spanS[F[_]: MonadCancelThrow: Trace](name: String) =
        // plus future magic
        Stream.resource(Trace[F].spanR(name))

      trace.use { implicit trace =>
        for {
          d  <- Deferred[IO, Either[Throwable, Unit]]
          s   = Stream(1).repeat.take(100000)
          s2  = s ++ Stream.eval_(d.complete(Either.unit)) ++ s
          x  <- (spanS[IO]("stream") >> s2).interruptWhen(d).compile.foldMonoid
          _  <- IO.println(x)
        } yield x
      }

Can x be less than 200000?

armanbilge commented 2 years ago

What region of the stream would have to be non-interruptible?

Anytime you want to set local state, and have it be visible to the subsequent stream. Subsequent reading is fine because forked fibers clone the local state of the parent fiber, but any modifications made on forked fibers cannot be seen on the parent fiber.

rossabaker commented 2 years ago

And modifications to the parent fiber aren't propagated to children after the fork. So this works well if:

  1. Release of spanS happens on the same fiber as spanS.
  2. Fibers forked by spanS don't continue after spanS. (Even this is fine for creating new spans, but it's not fine for adding more tags to the stream's span.)
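
The other direction can be sketched with plain cats-effect as well. This is an illustration, not bayou code: the names and the pinned version are assumptions, and a Deferred is used only to make sure the child reads after the parent has written.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"

import cats.effect.{Deferred, IO, IOLocal}
import cats.effect.unsafe.implicits.global

object IOLocalAfterForkDemo {
  // Yields (what the child reads after the parent's write, what the parent reads).
  val program: IO[(String, String)] =
    for {
      local        <- IOLocal("root")
      gate         <- Deferred[IO, Unit]
      // The child copies the parent's state at fork time, then waits before reading.
      fiber        <- (gate.get *> local.get).start
      _            <- local.set("new_root") // happens after the fork
      _            <- gate.complete(())
      seenByChild  <- fiber.joinWithNever
      seenByParent <- local.get
    } yield (seenByChild, seenByParent)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync()) // (root,new_root)
}
```

A fiber that outlives the span it was forked under keeps working with the snapshot it took at fork time, which is why condition 2 matters for adding tags.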

armanbilge commented 2 years ago

Re (1): I wonder if we can have the release step check to see if the span currently in the fiber is actually the one that it installed. If not, maybe better not to touch it? Who knows!

Re (2): yeah, this is a tricky one. If you follow good practices by closely associating spanR with fiber scopes, e.g. Supervisor, then I think that takes the worst of the bite away.

rossabaker commented 2 years ago

Not releasing in a state mismatch makes things mildly worse. Not only do we end on a different span, but we leak a span:

Status quo:

| | ambient | a open | b open |
|---|---|---|---|
| Init | root | 🚫 | 🚫 |
| acquire a | a | | 🚫 |
| acquire b | b | | |
| release a | root | 🚫 | |
| release b | a ⚠️ | 🚫 | 🚫 |

If we :

| | ambient | a open | b open |
|---|---|---|---|
| Init | root | 🚫 | 🚫 |
| acquire a | a | | 🚫 |
| acquire b | b | | |
| release a | b | ✅ ⚠️ | |
| release b | a ⚠️ | ✅ ⚠️ | 🚫 |

armanbilge commented 2 years ago

I love your tables! :)

I could be misunderstanding, but the span in question should definitely still close. But, while doing so it shouldn't change the ambient span if it didn't install the one that's already there. But then if I read your table correctly I think we end up in the same final state anyway.

rossabaker commented 2 years ago

Oh, right. If we close it without resetting it, the only difference from the status quo is that b, not root, is ambient after release a. The end state is indeed the same.
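
The two policies can be compared with a small plain-Scala model. It is a sketch under the assumption that each acquire remembers the previous ambient span and each release restores it; the guarded variant restores only when the releasing span is still the ambient one. All names are hypothetical and none of this is bayou's implementation:

```scala
object ReleasePolicyModel {
  // Returns the ambient span after each step of:
  // init, acquire a, acquire b, release a, release b.
  def ambientHistory(guarded: Boolean): List[String] = {
    var ambient = "root"
    var history = List(ambient)

    def acquire(name: String): () => Unit = {
      val previous = ambient
      ambient = name
      history :+= ambient
      () => {
        // Guarded: only restore if this span is still the ambient one.
        if (!guarded || ambient == name) ambient = previous
        history :+= ambient
      }
    }

    val releaseA = acquire("a")
    val releaseB = acquire("b")
    releaseA()
    releaseB()
    history
  }

  def main(args: Array[String]): Unit = {
    println(ambientHistory(guarded = false)) // List(root, a, b, root, a)
    println(ambientHistory(guarded = true))  // List(root, a, b, b, a)
  }
}
```

Both runs end with a as the ambient span; they differ only in the step after release a.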

armanbilge commented 2 years ago

the only difference from the status quo is that b, not root, is ambient after release a

IMO that's probably slightly better. But at this point you are in undefined-behaviour-ish territory anyway.

rossabaker commented 2 years ago

I would err on the side that has an open span as the ambient span, so fields have somewhere to go. But that's not a distinction here. And if there are lots of bs, making sure stray fields end up on b instead of root prevents overwrites. I think that is probably the best we can do.

rossabaker commented 2 years ago

The definition in https://github.com/tpolecat/natchez/pull/526/commits/e2afc8515a19e7f7650a76b85e806bf468b7b092 works, albeit with a slightly different API. new_root spans the outer stream, and chunk is spanned by child:

    Trace[Stream[IO, *]].span("new_root")(
      Stream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .covary[IO]
        .chunkN(3)
        .flatMap { chunk =>
          Trace[Stream[IO, *]].span("child")(
            Stream.eval(trace.put("vals" -> chunk.toList.mkString(","))) >>
            Stream.sleep[IO](1.second)
          )
        }
    )

Full frankengist, which is @ryanmiville's example and my Trace.

armanbilge commented 2 years ago

I had to add //> using plugin "org.typelevel:::kind-projector:0.13.2" to get it to compile. Having a look now :)

rossabaker commented 2 years ago

Oh, I couldn't find that syntax and have been adding that as a command line argument. Yay.

I have an API design comment on this, but it may be more germane to the Natchez PR.

armanbilge commented 2 years ago

So, is the problem with Resource the use? It's not obvious to me what the exact equivalent of use is for Stream. If we didn't call use with a function, but instead used it more like compile, then wouldn't the issue of the ambient span not covering use go away?

rossabaker commented 2 years ago

An entire stream can be traced because we can exchange its untraced effects F for traced effects F via translate. We can similarly exchange the effects of acquiring and releasing a resource. But there is no hook to exchange the output of use. The f in fold would need to be exchanged. We can't add an Exchange to the Resource algebra because the ADT is a public API. It is thus up to the user of the resource to apply the F ~> F themselves... if they can even see it.
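
As a rough illustration of exchanging a stream's effects via translate, the sketch below wraps every effect of a stream with a natural transformation. The transformation here is a hypothetical stand-in that only records a marker in a Ref; a real tracer would run the effect inside a span instead. The object name, the marker strings, and the pinned fs2 version are assumptions:

```scala
//> using dep "co.fs2::fs2-core:3.10.2"

import cats.~>
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object TranslateDemo {
  // Hypothetical stand-in for "run this effect inside a span":
  // it records a marker before running the original effect.
  def wrapped(log: Ref[IO, List[String]]): IO ~> IO =
    new (IO ~> IO) {
      def apply[A](fa: IO[A]): IO[A] = log.update(_ :+ "enter") *> fa
    }

  val program: IO[List[String]] =
    for {
      log    <- Ref.of[IO, List[String]](Nil)
      stream  = Stream.eval(log.update(_ :+ "effect"))
      // translate passes the stream's effects through the natural transformation.
      _      <- stream.translate(wrapped(log)).compile.drain
      out    <- log.get
    } yield out

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync())
}
```

Resource has no comparable hook for the function passed to use, which is the gap described above.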

armanbilge commented 2 years ago

Oh, now I think I understand your other comment about how it's easier for Stream. Thanks, I think I'm slowly catching up.

This is the second time Resource being a public ADT has been annoying. I asked about it before and my understanding was that the design choice was made so fs2 could interpret it. I think Resource#allocatedCase covers that use case now, but we can't revoke it anymore :(

rossabaker commented 2 years ago

When we trace a resource through the lens of Trace[Resource[IO, *]].span, it's more intuitive. Acquisition and release are part of the resource, and use happens on the outside.

    trace.use { implicit trace =>
      Trace[Resource[IO, *]].span("child")(
        Resource.make(
          trace.span("acquire")(IO.unit))(
          _ => trace.span("release")(IO.unit))
      ).surround(trace.span("use")(IO.unit))
    }

As a practical matter, it would prevent an http4s tracer from tracing anything about the body that it didn't know from the headers.

I know less about skunk. The Session interface is full of Resources, but tracing is deeply embedded. A better comparison to http4s might be that tracing doesn't happen through a middleware, but is baked directly into the backend.

rossabaker commented 2 years ago

allocatedCase is another way to view the problem: where in F[(B, ExitCase => F[Unit])] are we going to put the code that can exchange the output of a B => F[C] to be provided later?

ryanmiville commented 2 years ago

Just figured I'd share that we've been using a custom Trace at work for the last few months that is essentially a combination of bayou and @rossabaker's F ~> F implementation and it's been working well for us.

I've open sourced it here. I don't intend to promote yet another tracing library. I'd much rather get these sorts of things added to bayou or natchez itself.

armanbilge commented 2 years ago

@ryanmiville that's great, thanks for trying it out in the wild and sharing your implementation! I agree about upstreaming, ideally to Natchez itself. Towards that you should definitely add a similar comment to one of Ross's PRs if you haven't yet.

Bayou itself was only started as an experiment, but if you consider it a "step up" I'd be very very happy to accept a PR here with your changes :)

rossabaker commented 2 years ago

Shameless plug: otel4s is on the verge of being a Typelevel project, and probably where I'll be putting most of my tracing efforts into the future. (That said, I started with Metrics. There's no tracing at all yet, so that's a future answer.)

ryanmiville commented 2 years ago

Thanks for the feedback @armanbilge. I'll probably whip up a PR in the next week or two.

I'm excited to eventually try out otel4s :)