timzaak / blog

8 stars 1 forks source link

DAG 任务编排 #107

Closed timzaak closed 8 months ago

timzaak commented 8 months ago

先简单做一个任务抽象和组合函数处理

/** A unit of work that turns an input of type `Param` into a result of type `Resp`.
  *
  * The trait has a single abstract method, so an instance can be written as a
  * function literal, e.g. `val inc: Task[Int, Int] = (x: Int) => x + 1`.
  *
  * @tparam Param type of the value the task is run with
  * @tparam Resp  type of the value the task produces
  */
trait Task[Param, Resp] {

  /** Runs the task on `p` and returns its result. */
  def run(p: Param): Resp
}

/** A [[Task]] whose result is delivered as a `Future[Resp]`.
  *
  * The trait declares an `ExecutionContext` context parameter; nothing in this
  * declaration uses it, and the trait adds no members of its own.
  *
  * @tparam Param type of the value the task is run with
  * @tparam Resp  type of the value the returned future completes with
  */
trait FutureTask[Param, Resp](using ExecutionContext) extends Task[Param, Future[Resp]]

/** Sequential composition of two tasks: the result of `task1` becomes the input of `task2`.
  *
  * @param task1 task that is run first, on the original input
  * @param task2 task that is run second, on the result of `task1`
  * @tparam P  input type of the composed task
  * @tparam R  intermediate type handed from `task1` to `task2`
  * @tparam R2 result type of the composed task
  */
case class LinkTask[P, R, R2](task1: Task[P, R], task2: Task[R, R2]) extends Task[P, R2] {

  /** Runs `task1` on `p`, then runs `task2` on what `task1` produced. */
  override def run(p: P): R2 = {
    val intermediate: R = task1.run(p)
    task2.run(intermediate)
  }
}

/** Fallback composition of two `Either`-returning tasks.
  *
  * `task1` is tried first; if it yields a `Right`, that value is the result.
  * If it yields a `Left`, the error is discarded and the result of `task2` is
  * returned instead. `task2` is only run in that second case.
  *
  * @param task1 the task that is tried first
  * @param task2 the fallback task, run only when `task1` returns a `Left`
  * @tparam P  input type of `task1`
  * @tparam P2 input type of `task2`
  * @tparam E  common error type of the combined result
  * @tparam E1 error type of `task1`
  * @tparam E2 error type of `task2`
  * @tparam R  common success type of the combined result
  * @tparam R1 success type of `task1`
  * @tparam R2 success type of `task2`
  */
case class EitherTask[P, P2, E, E1 <: E, E2 <: E, R, R1 <: R, R2 <: R](
    task1: Task[P, Either[E1, R1]],
    task2: Task[P2, Either[E2, R2]]
) extends Task[(P, P2), Either[E, R]] {

  /** Runs `task1` on the first tuple element and falls back to `task2` on the second. */
  override def run(p: (P, P2)): Either[E, R] = {
    val (primaryInput, fallbackInput) = p
    task1.run(primaryInput).fold[Either[E, R]](
      _ => task2.run(fallbackInput), // the first task's error is dropped; only the fallback's outcome is reported
      value => Right(value)
    )
  }
}

extension [P, R, E](task: Task[P, Either[E, R]]) {

  /** Combines this task with a fallback `t` into an [[EitherTask]].
    *
    * The combined task takes a pair of inputs, one for this task and one for `t`.
    *
    * @param t the fallback task; its error and success types must be subtypes of this task's
    * @return a task over `(P, P2)` producing `Either[E, R]`
    */
  def or[P2, E2 <: E, R2 <: R](t: Task[P2, Either[E2, R2]]): Task[(P, P2), Either[E, R]] =
    EitherTask[P, P2, E, E, E2, R, R, R2](task, t)
}

extension [P, R](task: Task[P, R]) {

  /** Sequential composition: runs this task, then feeds its result into `t2`.
    *
    * Builds a [[LinkTask]], the sequential combinator defined alongside [[Task]].
    *
    * @param t2 the task to run on this task's result
    * @return a task from `P` to `Resp`
    */
  @targetName("next")
  def ~>[Resp](t2: Task[R, Resp]): Task[P, Resp] =
    LinkTask(task, t2)

  /** Lifts this task into one that always succeeds with `Right(result)`.
    *
    * @tparam E the error type of the resulting `Either`; no `Left` is ever produced here
    */
  def toEither[E]: Task[P, Either[E, R]] = (p: P) => Right(task.run(p))

  /** Lifts this task into one returning an already-completed `Future`.
    *
    * The underlying task is run synchronously when the returned task is run;
    * an exception thrown by it propagates to the caller rather than becoming a
    * failed `Future`.
    */
  def toFuture: Task[P, Future[R]] = (p: P) => Future.successful(task.run(p))
}

至于 DAG,就是将上述的 EitherTask、LinkTask 作为 DAG 的构造 API,再补充一个无环图校验。具体执行时,需要再注入一个 Runtime,由该 Runtime 负责调度、ExecutionContext、结果存储以及执行跟踪统计。