levkhomich / akka-tracing

A distributed tracing extension for Akka. Provides integration with Play framework, Spray and Akka HTTP.
Other
307 stars 32 forks source link

A traced message cannot be a child of itself #85

Closed reuben-sutton closed 7 years ago

reuben-sutton commented 8 years ago

In my planned use of Zipkin, I want to forward the same message along to multiple actors, e.g. through a "routing" actor and then on to a bunch of "controller" actors.

My understanding is that this requires marking the message as a child of itself, but doing so doesn't successfully create spans when the trace is sent to Zipkin.

I have adapted the TraceHierarchy sample to reflect this:

package sample

import java.util
import java.util.UUID
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

import akka.actor.{ActorSystem, Props, ActorRef, Actor}
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import com.github.levkhomich.akka.tracing.{ActorTracing, TracingSupport}
import com.typesafe.config.ConfigFactory

/**
 * Request sent to [[RequestHandler]] from outside the actor hierarchy.
 * Mixes in `TracingSupport` so it can be passed to the `trace` calls used in this sample.
 *
 * @param headers key/value pairs; [[RequestHandler]] records each one onto the trace
 * @param payload request body; [[DelegateActor]] echoes it back in its greeting
 */
final case class ExternalRequest(headers: util.Map[String, String], payload: String) extends TracingSupport
/**
 * Reply that [[RequestHandler]] pipes back to the original sender.
 *
 * @param responseCode numeric status (this sample uses 200 and 500)
 * @param payload      response body
 */
final case class ExternalResponse(responseCode: Int, payload: String)
/**
 * Traceable request type for handler-to-delegate traffic.
 * NOTE(review): not constructed anywhere in this adapted sample as shown — the
 * [[ExternalRequest]] itself is forwarded to the delegate instead.
 *
 * @param payload request body
 */
final case class InternalRequest(payload: String) extends TracingSupport
/**
 * Reply sent by [[DelegateActor]] to [[RequestHandler]]; also used by the handler as the
 * fallback value when the delegated call fails.
 *
 * @param responseCode numeric status (this sample uses 200 and 500)
 * @param payload      response body
 */
final case class InternalResponse(responseCode: Int, payload: String)

/**
 * Entry-point actor of the sample.
 *
 * For every [[ExternalRequest]] it samples the message, records the request headers as
 * key/value annotations, forwards the very same message to a [[DelegateActor]] child and
 * pipes the resulting [[ExternalResponse]] back to the original sender.
 */
class RequestHandler extends Actor with ActorTracing {
  val child: ActorRef = context.actorOf(Props[DelegateActor])
  implicit val askTimeout: Timeout = 100.milliseconds

  override def receive: Receive = {
    case request @ ExternalRequest(headers, _) =>
      println(s"RequestHandler received $request")

      // Start tracing this request, using the actor's simple class name as the second argument.
      trace.sample(request, getClass.getSimpleName)

      // Attach every request header to the trace as a key/value annotation.
      headers.foreach { entry =>
        trace.recordKeyValue(request, entry._1, entry._2)
      }

      // Forward the same message instance, marked as a child of itself.
      val delegated = child ? request.asChildOf(request)

      // On failure (e.g. the ask timing out) record the exception and fall back to a 500.
      val recovered = delegated.recover {
        case e: Exception =>
          trace.record(request, e)
          InternalResponse(500, "")
      }

      // Translate the internal reply into the external one; marking it as the response
      // to the request closes the trace.
      val answered = recovered.map {
        case InternalResponse(code, body) =>
          ExternalResponse(code, s"$body!").asResponseTo(request)
      }

      answered.pipeTo(sender)
  }
}

/**
 * Child actor that handles the [[ExternalRequest]] forwarded by [[RequestHandler]].
 *
 * It samples the message under its own class name, simulates work by sleeping, and
 * replies with an [[InternalResponse]] marked as the response to the request.
 */
class DelegateActor extends Actor with ActorTracing {
  override def receive: Receive = {
    case request @ ExternalRequest(_, body) =>
      println(s"DelegateActor received $request")
      trace.sample(request, getClass.getSimpleName)

      // Simulated computation: blocks for a random 0-199 ms, which sometimes exceeds the
      // caller's 100 ms ask timeout and so triggers its recover block.
      Thread.sleep(Random.nextInt(200))

      val reply = InternalResponse(200, s"Hello, $body")
      sender ! reply.asResponseTo(request)
  }
}

// create actor system and send messages every second
/**
 * Sample entry point: starts the actor system, creates a [[RequestHandler]] and asks it
 * a fresh [[ExternalRequest]] once per second after an initial three-second delay.
 */
object TraceHierarchy extends App {
  implicit val askTimeout: Timeout = 500.milliseconds

  /** A new random UUID rendered as a string; evaluated afresh on every call. */
  def random: String = UUID.randomUUID().toString

  val system: ActorSystem =
    ActorSystem.create("TraceHierarchy", ConfigFactory.load("application"))
  val handler: ActorRef = system.actorOf(Props[RequestHandler])

  system.scheduler.schedule(3.seconds, 1.second) {
    // Headers are generated before the payload, each from its own random UUID.
    val headers: util.Map[String, String] = Map("userAgent" -> random)
    val request = ExternalRequest(headers, random)
    // The reply future is intentionally ignored; the sample only generates traffic.
    handler ? request
  }

  system.awaitTermination()
}
levkhomich commented 8 years ago

You can use annotations to mark the moments when a message is actually routed to or received by a specific actor. So, when the message is created or first reaches the system, you can sample it and then use trace.record calls in the child actors.