www1350 / javaweb

http://www1350.github.io/
31 stars 5 forks source link

Akka #117

Open www1350 opened 7 years ago

www1350 commented 7 years ago

介绍

Actor 模型由 Carl Hewitt 于上世纪70年代早期提出,目的是为了解决分布式编程中一系列的编程问题。Actor 的要点包括:Actor 是一个个相互之间独立的实体; Actor 可以通过消息来通信,一个 Actor 收到其他Actor的信息后,可以根据需要作出各种相应反应;消息的类型可以是任意的,消息的内容也可以是任意的;当一个 Actor 收到多个消息时,它先建立一个消息队列,将接收到的消息就放入队列,每次从队列中取出一个消息体进行处理。
  Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Actor 模型应用。Akka 使得开发人员可以更轻松地开发具有容错性、可扩展性和跨平台的并发程序,在工业界得到了广泛应用。Akka能够给应用程序带来的几个重要的特性是:

入门

public class MyActor extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    static Props props() {
        return Props.create(MyActor.class, () -> new MyActor());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, s -> {
                    log.info("Received String message: {}", s);
                })
                .matchAny(o -> log.info("received unknown message"))
                .build();
    }

}
  @Test
    public void actor(){
        final ActorSystem system = ActorSystem.create("helloakka");
        try {
            final ActorRef howdyGreeter =
                    system.actorOf(MyActor.props());
            howdyGreeter.tell("2",ActorRef.noSender());
            howdyGreeter.tell(1,ActorRef.noSender());
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            system.terminate();
        }
    }

ActorSystem

ActorSystem主要有以下三个功能:

  1. Akka中Actor的组织是一种树形结构
  2. 每个Actor都有父级,有可能有子级当然也可能没有
  3. 父级Actor给其子级Actor分配资源,任务,并管理其的生命状态(监管和监控)

Actor引用,路径和地址

Actor引用是ActorRef的子类,每个Actor有唯一的ActorRef,Actor引用可以看成是Actor的代理,与Actor打交道都需要通过Actor引用,Actor引用可以帮对应Actor发送消息,也可以接收消息,向Actor发送消息其实是将消息发送到Actor对应的引用上,再由它将消息投寄到具体Actor的信箱中,所以ActorRef在整个Actor系统是一个非常重要的角色。

@Data
@AllArgsConstructor
public class Message {
    protected String content;
}

public class Meeting extends Message{
    public Meeting(String content) {
        super(content);
    }
}
public class Done extends Message{
    public Done(String content) {
        super(content);
    }
}

public class DoAction extends Message{
    public DoAction(String content) {
        super(content);
    }
}

@Data
public class Confirm extends Message{
    private ActorPath actorPath;

    public Confirm(String content, ActorPath actorPath) {
        super(content);
        this.actorPath = actorPath;
    }
}

public class Business extends Message{
    public Business(String content) {
        super(content);
    }
}
        final ActorSystem system = ActorSystem.create("company");
        ActorRef bossRef = system.actorOf(BossActor.props(),"boss");
        bossRef.tell(new Business("二手车市场"),ActorRef.noSender());
public class BossActor extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    private int taskCount = 0;

    public static Props props() {
        return Props.create(BossActor.class, () -> new BossActor());
    }

    Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Business.class, b -> {
                    log.info("看准了商机!行动!");
                    log.info("我们是 {} 公司",self().path().address());
                    List managers = new ArrayList<>();
                    ActorRef managerActorRef = context().actorOf(ManagerActor.props(), "managerA"); //这里我们召唤3个主管
                    ActorRef managerActorRef2 = context().actorOf(ManagerActor.props(), "managerB"); //这里我们召唤3个主管
                    ActorRef managerActorRef3 = context().actorOf(ManagerActor.props(), "managerC"); //这里我们召唤3个主管
                    managers.add(managerActorRef);
                    managers.add(managerActorRef2);
                    managers.add(managerActorRef3);
                    Iterator<ActorRef> iterator = managers.iterator();
                    while(iterator.hasNext()){
                        ActorRef managerActor = iterator.next();
                        CompletableFuture<Object> future =
                                ask(managerActor, new Meeting("开会讨论一下"), t)
                                        .toCompletableFuture();
                        CompletableFuture<Confirm> transformed = CompletableFuture.allOf(future)
                                    .thenApply(v -> {
                                        Confirm x = (Confirm) future.join();
                                        return x;
                                    });
                        Confirm confirm =  transformed.get();
                        log.info("{} ---{}",confirm.getContent(),confirm.getActorPath().parent().toString());
                        ActorSelection manager = context().actorSelection(confirm.getActorPath());
                        manager.tell(new DoAction("展开业务"),self());
                    }
                })
                .match(Done.class , d -> {
                    log.info("? {}",d.getContent());
                    taskCount++;
                    if (taskCount == 3) {
                        log.info("项目做完了,涨工资!");
                        context().system().terminate();
                    }
                })
            .build();
    }
}
public class ManagerActor extends AbstractActor{
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    public static Props props() {
        return Props.create(ManagerActor.class, () -> new ManagerActor());
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Meeting.class, m->{
                    log.info(" {} 加入会议",self().path());
                    log.info("老板说 {}",m.getContent());
                        sender().tell(
                                new Confirm("老板,我知道了",self().path()),
                                self());
                })
                .match(DoAction.class,d->{
                    log.info("分配工作");
                    ActorRef workerActorRef = context().actorOf(WorkerActor.props(), "worker");
                    workerActorRef.forward(d,context());
                })
                .build();
    }
}
public class ManagerActor extends AbstractActor{
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    public static Props props() {
        return Props.create(ManagerActor.class, () -> new ManagerActor());
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Meeting.class, m->{
                    log.info(" {} 加入会议",self().path());
                    log.info("老板说 {}",m.getContent());
                        sender().tell(
                                new Confirm("老板,我知道了",self().path()),
                                self());
                })
                .match(DoAction.class,d->{
                    log.info("分配工作");
                    ActorRef workerActorRef = context().actorOf(WorkerActor.props(), "worker");
                    workerActorRef.forward(d,context());
                })
                .build();
    }
}

image

复杂一点的操作: https://scala.cool/2017/04/learning-akka-2/

监管

监管者

image

一个actor系统在其创建过程中至少要启动三个actor,如上图所示,下面来说说这三个Actor的功能:

1./: 根监管者

顾名思义,它是一个老大,它监管着ActorSystem中所有的顶级Actor,顶级Actor有以下几种:

跟我们平常打交道最多的就是/user,它是我们在程序中用ActorSystem.actorOf创建的actor的监管者,下面的容错我们重点关心的就是它下面的失败处理,其他几种顶级Actor具体功能定义已经给出,有兴趣的也可以去了解一下。

根监管者监管着所有顶级Actor,对它们的各种失败情况进行处理,一般来说如果错误要上升到根监管者,整个系统就会停止。

2./user: 顶级actor监管者 上面已经讲过/user是所有由用户创建的顶级actor的监管者,即用ActorSystem.actorOf创建的actor,我们可以自己制定相应的监管策略,但由于它是actor系统启动时就产生的,所以我们需要在相应的配置文件里配置,具体的配置可以参考这里Akka配置 https://doc.akka.io/docs/akka/current/scala/general/configuration.html

3./system: 系统监管者

/system所有由系统创建的顶级actor的监管者,比如Akka中的日志监听器,因为在Akka中日志本身也是用Actor实现的,/system的监管策略如下:对收到的除ActorInitializationException和ActorKilledException之外的所有Exception无限地执行重启,当然这也会终止其所有子actor。所有其他Throwable被上升到根监管者,然后整个actor系统将会关闭。

用户创建的普通actor的监管:

Actor系统的组织结构,是一种树形结构,其实这种结构对actor的监管是非常有利的,Akka实现的是一种叫“父监管”的形式,每一个被创建的actor都由其父亲所监管,这种限制使得actor的监管结构隐式符合其树形结构,所以我们可以得出一个结论:

一个被创建的Actor肯定是一个被监管者,也可能是一个监管者,它监管着它的子级Actor

监管策略

上面我们对ActorSystem中的监管角色有了一定的了解,那么到底是如何制定相应的监管策略呢?Akka中有以下4种策略:

https://scala.cool/2017/05/learning-akka-3/

Akka共享内存

通过通讯来实现共享内存,而不是用共享内存来实现通讯

Mailbox

Mailbox在Actor模型是一个很重要的概念,我们都知道向一个Actor发送的消息首先都会被存储到它所对应的Mailbox中,那么我们先来看看MailBox的定义结构

private[akka] abstract class Mailbox(val messageQueue: MessageQueue) 
 extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {}

很清晰Mailbox内部维护了一个messageQueue这样的消息队列,并继承了Scala自身定义的ForkJoinTask任务执行类和我们很熟悉的Runnable接口,由此可以看出,Mailbox底层还是利用Java中的线程进行处理的。那么我们先来看看它的run方法:

override final def run(): Unit = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }
@inline
  final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)

@inline
  final def isClosed: Boolean = currentStatus == Closed

@volatile
  protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
@tailrec private final def processMailbox(
    left:       Int  = java.lang.Math.max(dispatcher.throughput, 1),
    deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
    if (shouldProcessMessage) {
      val next = dequeue()  //去出下一条消息
      if (next ne null) {
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor invoke next
        if (Thread.interrupted())
          throw new InterruptedException("Interrupted while processing actor messages")
        processAllSystemMessages()
        if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
          processMailbox(left - 1, deadlineNs) //递归处理下一条消息
      }
    }

Actor是如何保证串行处理消息的

@tailrec
  final def setAsScheduled(): Boolean = {  //是否有线程正在调度执行该MailBox的任务
    val s = currentStatus
    /*
     * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
     * Scheduled bit already set.
     */
    if ((s & shouldScheduleMask) != Open) false
    else updateStatus(s, s | Scheduled) || setAsScheduled()
  }

当已有线程在执行返回false,若没有则去更改状态为以调度,直到被其他线程抢占或者更改成功,其中updateStatus()是线程安全的,我们可以看一下它的实现,是一个CAS操作

@inline
  protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
    Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)

Akka persistence的核心架构

https://scala.cool/2017/07/learning-akka-7/

CQRS

https://scala.cool/2017/08/learning-akka-8/

配置

使用Akka可以不用任何配置,Akka提供了明智的默认配置。为了适应特别的运行环境,修改默认行为,你可能需要修改:

Akka的所有配置信息装在 ActorSystem的实例中, 或者换个说法, 从外界看来, ActorSystem 是配置信息的唯一消费者. 在构造一个actor系统时,你可以传进来一个 Config object,如果不传,就相当于传进来 ConfigFactory.load() (使用正确的classloader). 这意味着将会读取classpath根目录下的所有application.conf, application.json and application.properties这些文件—请参阅之前推荐的文档以了解细节. 然后actor系统会合并classpath根目录下的 reference.conf 来组成其内部使用的缺省配置

如果你编写的是一个Akka应用,把配置放在classpath根目录下的 application.conf 中. 如果你编写的是一个基于Akka的库,把配置放在jar包根目录下的 reference.conf 中.

Akka会读取所有jar包的reference.conf配置,所以如果你把多个jar包合并成一个jar,那么你也必须合并这些reference.conf,否则默认配置会丢失,导致Akka不能正常工作

    final ActorSystem system = ActorSystem.create("CalculatorSystem",
        ConfigFactory.load(("calculator")));

Akka Cluster

一些相同的ActorSystem的组合,它们具有着相同的功能,我们需要执行的任务可以随机的分配到目前可用的ActorSystem上,这点跟Nginx的负载均衡很类似,根据算法和配置将请求转发给运行正常的服务器去,Akka集群的表现形式也是这样,当然它背后的理论基础是基于gossip协议的,目前很多分布式的数据库的数据同步都采用这个协议。

Seed Nodes

Seed Nodes可以看过是种子节点或者原始节点,它的一个主要作用用于可以自动接收新加入集群的节点的信息,并与之通信,使用方式可以用配置文件或者运行时指定,推荐使用配置文件方式,比如:

akka.cluster.seed-nodes = [
  "akka.tcp://ClusterSystem@host1:2552",
  "akka.tcp://ClusterSystem@host2:2552"]

seed-nodes列表中的第一个节点会集群启动的时候初始化,而其他节点则是在有需要时再初始化。

当然你也可以不指定seed nodes,但你可以需要手动或者在程序中写相关逻辑让相应的节点加入集群,具体使用方式可参考官方文档。

Cluster Events

Cluster Events字面意思是集群事件,那么这是什么意思呢?其实它代表着是一个节点的各种状态和操作,举个例子,假设你在打一局王者5v5的游戏,那么你可以把十个人看成一个集群,我们每个人都是一个节点,我们的任何操作和状态都能被整个系统捕获到,比如A杀了B、A超神了,A离开了游戏,A重新连接了游戏等等,这些状态和操作在Cluster Events中就相当于节点之于集群,那么它具体是怎么使用的呢?

首先我们必须将节点注册到集群中,或者说节点订阅了某个集群,我们可以这么做: cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);

cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), 
    MemberEvent.class, UnreachableMember.class);

从上面的代码我们可以看到有一个MemberEvent的概念,这个其实就是每个成员所可能拥有的events,那么一个成员在它的生命周期中有以下的events

状态说明:

demo

akka {
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
  }
}

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
public class SimpleClusterListener extends AbstractActor {
    LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    Cluster cluster = Cluster.get(getContext().getSystem());

    @Override
    public void preStart() throws Exception {
        cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
                ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class);
    }

    @Override
    public void postStop() throws Exception {
        cluster.unsubscribe(getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(ClusterEvent.MemberUp.class, mUp -> {
                    log.info("Member is Up: {}", mUp.member());
                })
                .match(ClusterEvent.UnreachableMember.class, mUnreachable -> {
                    log.info("Member detected as unreachable: {}", mUnreachable.member());
                })
                .match(ClusterEvent.MemberRemoved.class, mRemoved -> {
                    log.info("Member is Removed: {}", mRemoved.member());
                })
                .match(ClusterEvent.MemberEvent.class, message -> {
                    // ignore
                })
                .build();
    }
}

文本转化example https://github.com/akka/akka-samples/tree/2.5/akka-sample-cluster-java/src/main/java/sample/cluster/transformation

www1350 commented 7 years ago

aa

www1350 commented 7 years ago

fds