timzaak / blog

8 stars 1 forks source link

如何规避 akka ask timeout 的小陷阱 #69

Closed timzaak closed 3 years ago

timzaak commented 3 years ago

我一直把 actor 当成理想的并行处理工具,可以通过设置Router 的size,限制并行数, 但今天遇到的问题,令我产生了动摇(主要是解决不了)。实例代码如下:

//TestActor.scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl._
import com.strategytech.TestActor.{M1, Message}

class TestActor(context: ActorContext[Message]) extends AbstractBehavior[Message](context) {

  override def onMessage(msg: Message): Behavior[Message] = {
    val m = msg.asInstanceOf[M1]
    println(s"come: ${m.number}")
    if (m.number == 0) {
      //当消息 == 0 的时候,会block 很长时间。
      Thread.sleep(1000 * 100)
      throw new Exception(s"fuck error: ${m.number}")
    }
    m.ref ! m.num
    Behaviors.same
  }
}

object TestActor {
  sealed trait Message
  case class M1(number: Int, ref:ActorRef[Int]) extends Message
}

//DI.scala

  lazy val testActorRef: ActorRef[Message] = actorSystem.spawn(
    Routers
      .pool(3)(
        Behaviors
          .supervise[Message](Behaviors.setup(context => new TestActor(context)))
          .onFailure[Exception](SupervisorStrategy.resume)
      ),
    "testActor"
  )

  import scala.concurrent.duration._

  actorSystem.scheduler.scheduleOnce(2.seconds) {
    for (index <- 0 to 100) {
      testActorRef.ask[Int](ref=> M1(index,ref))(2.seconds, executionContext)
    }
  }(executionContext)

上面的代码中,会有3分之一的消息会因为 TestActor 的阻塞,而得不到及时处理,产生一长串的 Timeout。

虽然可以通过 设置 Dispatcher 为 fork-join-pool 来缓解上述问题,但更好的解决方案应该是当 actor 执行消息时间过长的时候,被杀死,然后创建新的,继续执行。

目前尚未找到更好的方案。

timzaak commented 3 years ago

这个问题,需要看任务是否可干掉吧,有些就是不行。那么就只能上akka自带的线程监控,报警,重启。 对于可以干掉的,可以 Awaits + Cancelable Future 来解决,虽不够优雅,但是管用。