typelevel / cats-tagless

Library of utilities for tagless final encoded algebras
https://typelevel.org/cats-tagless/
Apache License 2.0

Does Aspect need to be a tagless algebra? #412

Open ValdemarGr opened 1 year ago

ValdemarGr commented 1 year ago

I have been trying out https://github.com/Dwolla/natchez-tagless/tree/main for tracing and have been really interested in this idea of typeclass AOP. Unfortunately, I cannot see how an implementation of Aspect that works on tagless algebras can handle an algebra whose methods do not have F as the outermost type constructor. A Stream[Instrumentation[F, *], *] doesn't make much sense, and the aspect can't exactly change the return signature into Instrumentation[Stream[F, *], *].

However, what about reasoning about type constructors with typeclasses as well? Consider the following implementation of an aspect that allows any structure that embeds an F. As an example, tracing streams becomes trivial:

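// Assumes Scala 2.13 with the kind-projector plugin (for the `*` syntax).
import cats.{~>, Applicative, Eval, Id}
import cats.effect.MonadCancelThrow
import cats.tagless.Trivial
import natchez.Trace

trait Aspect[A, Dom[_], Cod[_]] {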
  def weave(a: A)(ak: Aspect.Weave[Dom, Cod, *] ~> Id): A
}

object Aspect {
  trait Advice[F[_], G[_]] {
    type A
    def name: String
    def target: F[A]
    implicit def instance: G[A]
  }

  object Advice {
    type Aux[F[_], G[_], A0] = Advice[F, G] { type A = A0 }

    def apply[F[_], G[_], A](
      name: String,
      target: F[A]
    )(implicit instance: G[A]): Aux[F, G, A] = {
      type A0 = A
      val name0 = name
      val target0 = target
      val instance0 = instance
      new Advice[F, G] {
        type A = A0
        val name = name0
        val target = target0
        val instance = instance0
      }
    }
  }

  case class Weave[Dom[_], Cod[_], A](
    algebraName: String,
    domain: List[List[Advice[Eval, Dom]]],
    codomain: Advice.Aux[Id, Cod, A]
  )
}

trait Effect[A] {
  type F[_]
  type U
  def ev: F[U] =:= A
}

object Effect {
  type Aux[F0[_], U0] = Effect[F0[U0]] {
    type F[x] = F0[x]
    type U = U0
  }
}

implicit def effectForAnyKind[F[_], A]: Effect.Aux[F, A] = {
  type F0[B] = F[B]
  new Effect[F[A]] {
    type F[C] = F0[C]
    type U = A
    // Explicit type so that `implicitly` has an expected type to resolve against.
    val ev: F0[A] =:= F0[A] = implicitly
  }
}

trait TraceableEffect[A] {
  val effect: Effect[A]

  def trace: Trace[effect.F]
  def F: Applicative[effect.F]
}

object TraceableEffect {
  type Aux[F0[_], A0] = TraceableEffect[F0[A0]] {
    val effect: Effect.Aux[F0, A0]
  }
}

implicit def traceableEffectForAnyTraceableEffect[F[_]: Trace, A](implicit
  effect: Effect.Aux[F, A],
  F0: Applicative[F]
): TraceableEffect.Aux[F, A] = {
  val effect0 = effect
  new TraceableEffect[F[A]] {
    val effect = effect0

    def trace = Trace[F]
    def F = F0
  }
}

def applyTracingAspect: Aspect.Weave[Trivial, TraceableEffect, *] ~> Id =
  new (Aspect.Weave[Trivial, TraceableEffect, *] ~> Id) {
    override def apply[A](fa: Aspect.Weave[Trivial, TraceableEffect, A]): Id[A] = {
      val inst = fa.codomain.instance
      val T: Trace[inst.effect.F] = inst.trace
      val ev = inst.effect.ev
      ev {
        T.span(s"${fa.algebraName}.${fa.codomain.name}") {
          ev.flip(fa.codomain.target)
        }
      }
    }
  }

trait MyAlg[F[_]] {
  def getData(id: String): F[String]

  def getDataStream(id: String): fs2.Stream[F, String]
}

def aspectForMyAlg[F[_]: Trace: MonadCancelThrow]: Aspect[MyAlg[F], Trivial, TraceableEffect] = {
  def aspectConstructionMacro[Dom[_], Cod[_]](implicit
    d1: Dom[String],
    c1: Cod[F[String]],
    c2: Cod[fs2.Stream[F, String]]
  ) = new Aspect[MyAlg[F], Dom, Cod] {
    override def weave(a: MyAlg[F])(ak: Aspect.Weave[Dom, Cod, *] ~> Id): MyAlg[F] =
      new MyAlg[F] {
        override def getData(id: String): F[String] =
          ak {
            Aspect.Weave(
              "MyAlg",
              List(List(Aspect.Advice("id", Eval.now(id)))),
              Aspect.Advice[Id, Cod, F[String]]("getData", a.getData(id))
            )
          }

        override def getDataStream(id: String): fs2.Stream[F, String] =
          ak {
            Aspect.Weave(
              "MyAlg",
              List(List(Aspect.Advice("id", Eval.now(id)))),
              Aspect.Advice[Id, Cod, fs2.Stream[F, String]]("getDataStream", a.getDataStream(id))
            )
          }
      }
  }

  aspectConstructionMacro[Trivial, TraceableEffect]
}

def traceMyAlg[F[_]: Trace: MonadCancelThrow](alg: MyAlg[F]): MyAlg[F] =
  aspectForMyAlg[F].weave(alg)(applyTracingAspect)

joroKr21 commented 1 year ago

It's an interesting idea but I think the definition of Aspect is too restrictive:

trait Aspect[A, Dom[_], Cod[_]] {
  def weave(a: A)(ak: Aspect.Weave[Dom, Cod, *] ~> Id): A
}

It doesn't allow us to change the return type. So we're only able to perform side effects (like tracing). Currently AOP doesn't work when the interface has mixed return types because of the issues you mentioned.

I wonder if splitting the interface into streaming / non-streaming parts might be a good alternative? Then you would have Alg[F], Alg[Stream[F, *]] and Alg[Instrumentation[Stream[F, *], *]] But I realize that's not common practice so we need some ideas to improve the situation.

ValdemarGr commented 1 year ago

> It's an interesting idea but I think the definition of Aspect is too restrictive:
>
> trait Aspect[A, Dom[_], Cod[_]] {
>   def weave(a: A)(ak: Aspect.Weave[Dom, Cod, *] ~> Id): A
> }
>
> It doesn't allow us to change the return type. So we're only able to perform side effects (like tracing). Currently AOP doesn't work when the interface has mixed return types because of the issues you mentioned.

Ah yes, I see. I have been thinking a bit, and maybe weave could instead "provide" the weaving data; see the formulation at the bottom.

> I wonder if splitting the interface into streaming / non-streaming parts might be a good alternative? Then you would have Alg[F], Alg[Stream[F, *]] and Alg[Instrumentation[Stream[F, *], *]] But I realize that's not common practice so we need some ideas to improve the situation.

My particular use case is adding tracing to gRPC services, where the interface is code-generated. As such, splitting it up is unfortunately out of my control.

Update

This formulation is not there yet either, since translating the stream is not the preferred semantics. Translating by itself implies the Stream[Instrumentation[F, *], *] from the original description.

Formulation

Consider the following formulation, where all that weave actually does is run a Kleisli. We can then use natural transformations and a FunctorK instance to modify the algebra however we want:

trait Aspect[Alg[_[_]], Dom[_]] {
  type G[F[_], A] = Kleisli[F, Aspect.Weave[Dom], A]
  def weave[F[_]](a: Alg[G[F, *]]): Alg[F]
}

trait MyAlg[F[_]] {
  def getData(id: String): F[String]

  def getDataStream(id: String): fs2.Stream[F, String]
}

def aspectConstructionMacro[Dom[_]](implicit
  d1: Dom[String],
  fk: FunctorK[fs2.Stream[*[_], String]]
): Aspect[MyAlg, Dom] = new Aspect[MyAlg, Dom] {
  override def weave[F[_]](a: MyAlg[G[F, *]]): MyAlg[F] =
    new MyAlg[F] {
      override def getData(id: String): F[String] =
        a.getData(id).run {
          Aspect.Weave(
            "MyAlg",
            List(List(Aspect.Advice("id", Eval.now(id)))),
            "getData"
          )
        }

      override def getDataStream(id: String): fs2.Stream[F, String] =
        fk.mapK(a.getDataStream(id)) {
          Kleisli.applyK[F, Aspect.Weave[Dom]] {
            Aspect.Weave[Dom](
              "MyAlg",
              List(List(Aspect.Advice("id", Eval.now(id)))),
              "getDataStream"
            )
          }
        }
    }
}

def traceK[F[_]: Trace, Dom[_]]: F ~> Kleisli[F, Aspect.Weave[Dom], *] =
  new (F ~> Kleisli[F, Aspect.Weave[Dom], *]) {
    override def apply[A](fa: F[A]): Kleisli[F, Aspect.Weave[Dom], A] =
      Kleisli { weave =>
        Trace[F].span(s"${weave.algebraName}.${weave.codomainName}") {
          fa
        }
      }
  }

def example[F[_]: Trace] = {
  implicit def functorKForStream[A] = new FunctorK[fs2.Stream[*[_], A]] {
    override def mapK[F0[_], G[_]](af: fs2.Stream[F0, A])(fk: F0 ~> G): fs2.Stream[G, A] =
      af.translate(fk)
  }
  val alg: MyAlg[F] = ???

  implicit val functorK = Derive.functorK[MyAlg]
  import cats.tagless.implicits._

  val o: MyAlg[F] = aspectConstructionMacro[Trivial].weave(alg.mapK(traceK[F, Trivial]))

  val liftToInstrumentation =
    alg
      .mapK(traceK[F, Trivial])
      .mapK(new (Kleisli[F, Aspect.Weave[Trivial], *] ~> Kleisli[Instrumentation[F, *], Aspect.Weave[Trivial], *]) {
        override def apply[A](
          fa: Kleisli[F, Aspect.Weave[Trivial], A]
        ): Kleisli[Instrumentation[F, *], Aspect.Weave[Trivial], A] =
          Kleisli { weave =>
            Instrumentation(fa.run(weave), weave.algebraName, weave.codomainName)
          }
      })

  val o2: MyAlg[Instrumentation[F, *]] = aspectConstructionMacro[Trivial].weave(liftToInstrumentation)
}

object Aspect {
  trait Advice[F[_], G[_]] {
    type A
    def name: String
    def target: F[A]
    implicit def instance: G[A]
  }

  object Advice {
    type Aux[F[_], G[_], A0] = Advice[F, G] { type A = A0 }

    def apply[F[_], G[_], A](
      name: String,
      target: F[A]
    )(implicit instance: G[A]): Aux[F, G, A] = {
      type A0 = A
      val name0 = name
      val target0 = target
      val instance0 = instance
      new Advice[F, G] {
        type A = A0
        val name = name0
        val target = target0
        val instance = instance0
      }
    }
  }

  case class Weave[Dom[_]](
    algebraName: String,
    domain: List[List[Advice[Eval, Dom]]],
    codomainName: String
  )
}
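
To make the Kleisli idea above concrete, here is a minimal sketch that uses only cats-core. It is not the cats-tagless API: CallInfo, Logged, Pending, Greeter, annotate, mapK and weave are all hypothetical names for illustration, and a Writer log stands in for a real tracer. The transformation defers each effect until the method name is known, and weave is the only place that supplies it.

```scala
import cats.~>
import cats.data.{Kleisli, Writer}

object KleisliWeaveSketch {
  // Hypothetical, simplified stand-in for Aspect.Weave: only the two names.
  final case class CallInfo(algebraName: String, methodName: String)

  // Writer plays the role of a traced effect; the log plays the role of spans.
  type Logged[A]  = Writer[Vector[String], A]
  type Pending[A] = Kleisli[Logged, CallInfo, A]

  trait Greeter[F[_]] {
    def greet(name: String): F[String]
  }

  // Plays the role of traceK: waits for CallInfo, then records it before the effect.
  val annotate: Logged ~> Pending = new (Logged ~> Pending) {
    def apply[A](fa: Logged[A]): Pending[A] =
      Kleisli[Logged, CallInfo, A] { info =>
        Writer.tell(Vector(s"${info.algebraName}.${info.methodName}")).flatMap(_ => fa)
      }
  }

  // Hand-written equivalent of what a derived FunctorK instance would provide.
  def mapK[F[_], G[_]](alg: Greeter[F])(fk: F ~> G): Greeter[G] =
    new Greeter[G] {
      def greet(name: String): G[String] = fk(alg.greet(name))
    }

  // Plays the role of weave: runs each Kleisli with the method's own CallInfo.
  def weave(alg: Greeter[Pending]): Greeter[Logged] =
    new Greeter[Logged] {
      def greet(name: String): Logged[String] =
        alg.greet(name).run(CallInfo("Greeter", "greet"))
    }

  val base: Greeter[Logged] = new Greeter[Logged] {
    def greet(name: String): Logged[String] =
      Writer.value[Vector[String], String](s"hello, $name")
  }

  val woven: Greeter[Logged] = weave(mapK(base)(annotate))
}
```

Calling KleisliWeaveSketch.woven.greet("Ada").run returns the log together with the result, so the recorded entry can be inspected next to the greeting. The design point is that annotate knows nothing about method names; weave alone provides them.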

bpholt commented 1 year ago

@ValdemarGr Can you share what you expect the traces to look like for those gRPC streams? I haven't really engaged with tracing on streams because I don't totally understand how it should work. (Typically I'll wrap tracing around the compiled stream effect instead, but I know that's not a perfect solution either.)

ValdemarGr commented 1 year ago

> @ValdemarGr Can you share what you expect the traces to look like for those gRPC streams? I haven't really engaged with tracing on streams because I don't totally understand how it should work. (Typically I'll wrap tracing around the compiled stream effect instead, but I know that's not a perfect solution either.)

It requires Trace to provide a span as a Resource[F, F ~> F], which can then be lifted into a Stream[F, F ~> F]; you can then _.flatMap(fk => fb.translate(fk)).

A recent version of natchez implemented this feature in core: https://github.com/tpolecat/natchez/blob/27ebf05927652cca61f0187e3735865e2f75fa0e/modules/core/shared/src/main/scala/Trace.scala#L524

def streamDataFromDB[F[_]: Trace: MonadCancelThrow](id: String): fs2.Stream[F, String] =
  Trace[fs2.Stream[F, *]].span("alg.streamDataFromDB") {
    getDBStream(id)
  }
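
The lifting step described above can be sketched independently of natchez, assuming cats-effect 3 and fs2 3. Here spanStream and recordingSpan are hypothetical names: recordingSpan is a stand-in for a tracer's span resource that only records when it opens and closes, and it hands back an identity transformation where a real tracer would install the span.

```scala
import cats.~>
import cats.effect.{IO, MonadCancelThrow, Ref, Resource}
import fs2.Stream

object SpanStreamSketch {
  // Lift the span resource into the stream, then run every effect of `fb`
  // through the transformation the resource yields.
  def spanStream[F[_]: MonadCancelThrow, A](span: Resource[F, F ~> F])(fb: Stream[F, A]): Stream[F, A] =
    Stream.resource(span).flatMap(fk => fb.translate(fk))

  // Hypothetical stand-in for a real span: logs open/close, transforms nothing.
  def recordingSpan(name: String, log: Ref[IO, Vector[String]]): Resource[IO, IO ~> IO] =
    Resource
      .make(log.update(_ :+ s"open $name"))(_ => log.update(_ :+ s"close $name"))
      .map { _ =>
        new (IO ~> IO) {
          def apply[A](fa: IO[A]): IO[A] = fa
        }
      }

  // Runs a small finite stream inside the span and returns output plus log.
  val program: IO[(List[Int], Vector[String])] =
    for {
      log     <- Ref.of[IO, Vector[String]](Vector.empty)
      out     <- spanStream(recordingSpan("alg.streamDataFromDB", log))(Stream(1, 2, 3).covary[IO]).compile.toList
      entries <- log.get
    } yield (out, entries)
}
```

Running SpanStreamSketch.program shows the span being opened before the first element and closed once the stream has finished, which is the scoping the Resource-based approach relies on.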

joroKr21 commented 1 year ago

Hmm how would that work for infinite streams though?

ValdemarGr commented 1 year ago

> Hmm how would that work for infinite streams though?

I'm not entirely sure how to answer this, so I'll just do a little write-up.

A Span is submitted asynchronously when its scope ends (exposed in natchez via Resource). A Span references its parent, so it may be submitted even though its parent has not completed.

For instance consider the following example:

Trace[fs2.Stream[F, *]].span("example") {
  infStream.evalMap(x => Trace[F].span("doingSomething")(doSomething(x)))
}

Throughout the application's lifetime, many spans named "doingSomething" may be allocated and released, and they all point to the same "example" span. Whenever the application is cancelled, the release effect is invoked for the "example" span, which is then submitted with its respective duration.
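
The lifetime ordering can be simulated without a tracer, assuming cats-effect 3 and fs2 3. This is only a sketch: parentSpan, childSpan and run are hypothetical names, a Ref-backed log stands in for span submission, and take(n) stands in for the cancellation that eventually ends an otherwise infinite stream.

```scala
import cats.effect.{IO, Ref, Resource}
import fs2.Stream

object InfiniteSpanSketch {
  // Stand-in for the long-lived "example" span: it closes only on release.
  def parentSpan(log: Ref[IO, Vector[String]]): Resource[IO, Unit] =
    Resource.make(log.update(_ :+ "open example"))(_ => log.update(_ :+ "close example"))

  // Stand-in for a short-lived child span that is submitted as soon as it ends.
  def childSpan(log: Ref[IO, Vector[String]], x: Int): IO[Int] =
    log.update(_ :+ s"doingSomething($x) submitted").as(x)

  // An infinite stream inside the parent span, stopped after n elements.
  def run(n: Int): IO[Vector[String]] =
    for {
      log <- Ref.of[IO, Vector[String]](Vector.empty)
      _ <- Stream
             .resource(parentSpan(log))
             .flatMap(_ => Stream.iterate(0)(_ + 1).covary[IO].evalMap(x => childSpan(log, x)))
             .take(n.toLong)
             .compile
             .drain
      entries <- log.get
    } yield entries
}
```

In the log returned by InfiniteSpanSketch.run(3), every child entry appears between the parent's open and close entries: children are recorded while the parent is still open, and the parent closes only once the stream is terminated.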

Consider the following real example from an infinite stream (a websocket): [image]