博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Akka:基于Actor的并发解决方案
阅读量:3935 次
发布时间:2019-05-23

本文共 25855 字,大约阅读时间需要 86 分钟。

Actor模型

在面向对象编程中,一个对象可以访问或修改另一个对象的值,在高并发情况下,由于机器性能的瓶颈,当有多个对象对同一竞争资源进行操作时,可能会出现数据错误的问题(即实际读取的数据不是预期数据,而是前面阶段到这一阶段未修改完成的数据)。Actor模型对此进行了修改,它不是直接对对象进行操作,而是通过消息传递的方式与外界进行交互。如图所示:1591594965284

Actor一次只接收处理一个消息,未处理消息会被放入队列等待处理。

Actor有几个重要概念:

  • Actor:处理消息并修改内部状态的工作节点。
  • 消息:用于在多个Actor之前通信的数据。
  • 消息传递:一种开发模式,通过传递消息来触发行为。
  • 邮箱地址:消息传递的目标地址,在Actor空闲时会从该地址获取消息。
  • 邮箱:存储多个未处理消息的队列。
  • Actor系统:由Actor集合、邮箱地址、邮箱和配置等组成的系统。

在一个应用中,所有Actor组成了ActorSystem(Actor系统),它是一个层级结构,除顶级Actor外所有Actor都有一个父Actor,当子Actor在处理消息时出现异常情况,父Actor可以通过预先指定的方式来处理子Actor,处理方式有:恢复子Actor、重启子Actor、停止子Actor以及扩大化失败。在ActorSystem创建时,默认会启动三个Actor。

所有Actor都有自己的生命周期,Akka提供了对应的函数来响应不同的生命周期。常见的操作是构建一个Actor来处理其他Actor死亡时传递的消息,这个Actor也被称为Death Wath

因为Actor是通过消息进行通信的,所以对于其他Actor是在本地还是在远程它都不在乎,Actor仅仅操作它的引用。

粗略了解了Actor的相关知识后,接下来,我们就开始Akka的学习。

我的环境为:

操作系统:Windows10

jdk版本:jdk11

Akka依赖:

com.typesafe.akka
akka-actor_2.13
2.6.0

Hello Akka

Akka是一个用于高并发、分布式、弹性伸缩场景下的消息驱动应用开发框架,它基于Actor模型,为开发者提供了消息控制状态的开发思想。前面提到,Actor通过操作模型的引用来控制内部状态的变化,Akka对此的实现是ActorRef对象,Akka通过ActorSystem.create()方法获取Actor系统,然后调用API得到指定Actor的引用,通过引用发送消息来与其它Actor通信。

首先,让我们来尝试一下Akka版的Hello World

public class TestAkka{
static ExecutorService threadPool = Executors.newFixedThreadPool(10000); public static void test(){
Demo demo = new Demo(); for (int i = 0; i < 100000; i++) {
threadPool.execute(new TestAkkaThread(demo)); } } public static void testAkka(){
// 获取Actor系统 ActorSystem sys = ActorSystem.create(); // 获取指定Actor ActorRef ref = sys.actorOf(Props.create(AkkaDemo.class), "startActor"); for (int i = 0; i < 100000; i++) {
threadPool.execute(new TestAkkaThread(ref)); } } public static void main(String[] args) {
// test(); testAkka(); }}class AkkaDemo extends UntypedAbstractActor {
private static int cnt = 1; public void onReceive(Object message){
// 当Actor接收到消息时,自动调用此方法 System.out.println(String.format("第:%d次接收消息", cnt++)); }}class Demo {
private static int cnt = 1; public void tell(){
System.out.println(String.format("第:%d次接收消息", cnt++)); }}class TestAkkaThread implements Runnable{
private Object ref; public TestAkkaThread(Object ref) {
this.ref = ref; } @Override public void run() {
if (ref instanceof ActorRef) // 通过ActorRef向对应Actor传送消息 ((ActorRef)ref).tell("", ActorRef.noSender()); else ((Demo)ref).tell(); }}

getSelf():获取当前Actor的引用

getSender():返回当前Actor接收的消息的发送者的引用,比如如果Actor A向Actor B发送消息,则当B调用getSender()时,它将返回Actor A的引用。在这里可以简单理解为:返回来发送回应消息的目标的引用。

Akka提供两种发送消息的机制,分别为tellask,两者的主要区别有:

  1. tell为同步发送,ask为异步发送。
  2. tell无返回值,ask可获取发送后的结果。

ask的应用如下:

public class StartAkka extends UntypedAbstractActor {
@Override public void onReceive(Object message){
System.out.println("接收消息:" + message); getSender().tell("返回消息", getSelf()); } public static void main(String[] args) {
ActorSystem sys = ActorSystem.create(); ActorRef ref = sys.actorOf(Props.create(StartAkka.class), "startAkka"); ref.tell("Hello Akka!", ActorRef.noSender()); Timeout timeout = new Timeout(10, TimeUnit.SECONDS); Future akka_ask = Patterns.ask(ref, "Akka Ask", timeout); System.out.println("ask..."); akka_ask.onComplete(new Function1
, Object>() {
@Override public Object apply(Try v1) {
// 获取回复成功的处理逻辑 if (v1.isSuccess()) System.out.println("发送成功,收到消息:" + v1.get()); // 获取回复失败的处理逻辑 if (v1.isFailure()) System.out.println("发送失败:" + v1.get()); return null; } }, sys.dispatcher()); System.out.println("continue..."); }}

Patterns.ask 方法会异步执行,假如Actor返回消息超时了,会产生一个akka.pattern.AskTimeoutException

sys.dispatcher():返回当前Akka的消息分发器,该内容到后面会讲到。

Actor查找

Actor模型中我们了解到一个Actor系统其实就是一棵树,每一个Actor都是一个节点,对于已存在的Actor,我们可以通过路径(当前路径/绝对路径)来查找:

public class SearchAkka extends UntypedAbstractActor {
private ActorRef target = getContext().actorOf(Props.create(Target.class), "targetActor"); @Override public void onReceive(Object message) throws Throwable, Throwable {
if (message instanceof String) {
if ("find".equals(message)){
/* LookupActor在收到"find"消息后,会通过ActorContext查找出ActorSelection对象. ActorSelection发送Identify时,需要指定一个messageId(用来区分Actor), 消息发送后,当前Actor会收到一个ActorIdentity,可以通过ActorIdentity.getRef() 方法来获取指定的ActorRef */ ActorSelection targetActor = getContext().actorSelection("targetActor"); // 异步查找Actor Timeout timeout = new Timeout(10, TimeUnit.SECONDS); Future find = Patterns.ask(targetActor, "find", timeout); find.onComplete(new Function1
, Object>() {
@Override public Object apply(Try v1) {
if (v1.isSuccess()) targetActor.tell(new Identify("A001"), getSelf()); if (v1.isFailure()) System.out.println("查找失败"); return null; } }, getContext().dispatcher()); } } else if (message instanceof ActorIdentity){
ActorIdentity actorIdentity = (ActorIdentity) message; if (actorIdentity.correlationId().equals("A001")) {
Optional
ref = actorIdentity.getActorRef(); if (!ref.isEmpty()){
System.out.println("ActorIdentity is:" + actorIdentity.correlationId() + " " + ref); ref.get().tell("hello target", getSelf()); } } } else unhandled(message); } public static void main(String[] args) {
ActorSystem system = ActorSystem.create("sys"); ActorRef actorRef = system.actorOf(Props.create(SearchAkka.class), "askActorDemo"); Timeout timeout = new Timeout(10, TimeUnit.MINUTES); Future
akka_ask = Patterns.ask(actorRef, "find", timeout); akka_ask.onComplete(new Function1
, Object>() { @Override public Object apply(Try v1) { if (v1.isSuccess()){ System.out.println("收到消息:" + v1.get()); } else if (v1.isFailure()){ System.out.println("湖获取消息失败"); } return null; } }, system.dispatcher()); }}/** * 被查找的对象 */class Target extends UntypedAbstractActor{ @Override public void onReceive(Object message) throws Throwable, Throwable { System.out.println("target actor reveive: " + message); }}

Actor生命周期

Actor在运行时中会经历不同的阶段,有创建并启动、恢复运行、重启、停止。Akka针对Actor不同的状态提供了对应的响应API:

preStart():启动前。

aroundPreStart():可以覆盖preStart()方法,默认情况下调用preStart()。

preRestart():重启前。(将被废弃)

aroundPreResrat():可以覆盖preRestart()方法,默认情况下调用preRestart()。

postRestart():重启后。

aroundPostRestart():可以覆盖postRestart()方法,默认情况下调用postRestart()。

aroundPostStop():可以覆盖postStop()方法,默认情况下调用postStop()。

postStop():停止后。

对于Actor的停止有三种方法:

  1. 调用ActorSystem或getContext()的stop方法:

    sys.stop(ref);

  2. 给Actor发送一个PoisonPill (毒丸)消息:

    ref.tell(PoisonPill.getInstance(), ActorRef.noSender());

  3. 给Actor 发送一个Kill 的消息, 此时会抛出ActorKilledException 异常:

    ref.tell(Kill.getInstance(), ActorRef.noSender());

当Actor停止时,它会执行以下流程:

  1. 在完全停止前处理完正在处理的消息,并不处理后续的消息,挂起消息队列。
  2. 给所有子级Actor发送终止指令,当所有子级Actor都停止后,再停掉子级。停止后会调用postStop()方法。
  3. 向生命周期监控者发送Terminated消息。

Actor行为切换

当我们处理业务时,可能需要针对不同的消息采用不同的处理逻辑。我们可以将多个状态的处理过程封装为对应组件,然后进行组装。在Akka中,提供了Producer实现,它有两个方法:become(切换为某个行为),unbecome(切换为上一个行为)。实例如下:

public class StateAkka extends UntypedAbstractActor {
private PartialFunction
procedure1 = new PartialFunction<>() {
@Override public BoxedUnit apply(Object param) {
System.out.println(param); if ("break".equals(param)) getContext().unbecome(); else System.out.println("state1:" + param); return null; } @Override public boolean isDefinedAt(Object x) {
return true; } }; private PartialFunction
procedure2 = new PartialFunction<>() {
@Override public BoxedUnit apply(Object param) {
System.out.println(param); if ("break".equals(param)) getContext().unbecome(); else System.out.println("state2:" + param); return null; } @Override public boolean isDefinedAt(Object x) {
return true; } }; @Override public void onReceive(Object message) {
/* 当Procedue执行了unbecome方法后,计算流程会重新进入onReceive内。 当调用了一次become之后,新Producre的代码逻辑会被保存进一个执行栈中, 此时可以通过调用UNbecome来返回到上一个Procure。你也可以在become方法中 传递第二个参数为false来表示不存储当前行为。 */ System.out.println("开始执行模式:" + message); if ("1".equals(message)) getContext().become(procedure1); if ("2".equals(message)) getContext().become(procedure2); } public static void main(String[] args) {
ActorSystem sys = ActorSystem.create("sys"); ActorRef ref = sys.actorOf(Props.create(StateAkka.class), "statActor"); ref.tell("1", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("break", ActorRef.noSender()); ref.tell("2", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); ref.tell("nihao", ActorRef.noSender()); }}

Actor容错处理

Actor系统采用"父监督"的模式进行管理,即父Actor会监督子Actor的异常情况,然后根据默认或者预设的处理逻辑来确定到底是该恢复Actor 、停止Actor 、重启Actor还是把错误提交到父级。

Akka 提供了两种监督策略:

  • One-For-One Strategy (默认监督策略):当一个子Actor出现异常时,只对该Actor 做处理。
  • All-For-One Stratw,当一个子Actor出现异常时,对所有Actor 都做
    处理。

当程序中没有显式指定策略时,会启动一个默认策略,该策略遵循下列规则:

  1. 当抛出ActorlnitializationExceptionActorKilledException时,会终止子Actor。
  2. 当抛出Exception时,会重启子Actor。
  3. 抛出其他类型的Throwable异常时, 会上溯到父级。

自定义容错策略:

public class StrategyAkka extends UntypedAbstractActor {
// 定义监督策略 private SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.apply("1 minute"), new Function
() {
@Override public SupervisorStrategy.Directive apply(Throwable err) throws Exception {
if (err instanceof IOException) {
System.out.println("-----------IOException-----------"); return SupervisorStrategy.resume(); // 恢复运行 } else if (err instanceof IndexOutOfBoundsException) {
System.out.println("-----------IndexOutOfBoundsException-----------"); return SupervisorStrategy.restart(); // 重启 } else if (err instanceof SQLException) {
System.out.println("-----------SQLException-----------"); return SupervisorStrategy.stop(); // 停止 } else {
System.out.println("-----------UnkownException-----------"); return SupervisorStrategy.escalate(); // 升级失败 } } }); @Override public SupervisorStrategy supervisorStrategy() {
return strategy; } @Override public void preStart() throws Exception {
ActorRef ref = getContext().actorOf(Props.create(WorkActor.class), "workActor"); // 监控生命周期 getContext().watch(ref); ref.tell("Hello", ActorRef.noSender()); ref.tell(new IOException(), ActorRef.noSender()); ref.tell(new IndexOutOfBoundsException(), ActorRef.noSender()); Timeout timeout = new Timeout(10, TimeUnit.SECONDS); Future
akka_ask = Patterns.ask(ref, "getValue", timeout); System.out.println("ask..."); akka_ask.onComplete(new Function1
, Object>() {
@Override public Object apply(Try v1) {
if (v1.isSuccess()) System.out.println("发送成功,收到消息:" + v1.get()); if (v1.isFailure()) System.out.println("发送失败:" + v1.get()); return null; } }, getContext().dispatcher()); System.out.println("continue..."); super.preStart(); } @Override public void onReceive(Object message) throws Throwable, Throwable {
if (message instanceof Terminated) System.out.println(((Terminated)message).getActor() + "已经停止"); else System.out.println("stateCount:" + message); } public static void main(String[] args) {
ActorSystem sys = ActorSystem.create("sys"); ActorRef ref = sys.actorOf(Props.create(StrategyAkka.class), "strategyActor"); }}class WorkActor extends UntypedAbstractActor {
private int state = 1; // 状态参数 @Override public void preStart() throws Exception, Exception {
System.out.println("start, state is:" + state++); super.preStart(); } @Override public void postStop() throws Exception {
System.out.println("stop"); super.postStop(); } @Override public void postRestart(Throwable reason) throws Exception {
System.out.println("postRestart"); super.postRestart(reason); } @Override public void onReceive(Object message) throws Exception {
// 模拟计算任务 this.state++; System.out.println("message:" + message); if (message instanceof Exception) throw (Exception) message; else if ("getValue".equals(message)) getSender().tell(state, getSelf()); else unhandled(message); }}

OneForOneStrategy对象需要三个参数:

  1. maxNrOfRetries:指定时间内的最大重启次数。
  2. withinTimeRange:指定时间大小。
  3. decider:接收一个Function对象,通过apply方法返回监督指令:
    • SupervisorStrategy.resume() :恢复运行
    • SupervisorStrategy.restart(): 重启
    • SupervisorStrategy.stop():停止
    • SupervisorStrategy.escalate():升级失败

Actor熔断机制

在分布环境下,可能由于网络或其它问题导致系统级联失败,为了防止系统不断重试而造成资源大量耗费,Actor提供了熔断机制。即当尝试指定次数后仍然失败,则反馈错误信息。熔断机制有以下三种状态:

  • Closed:正常情况下,熔断是关闭状态,当调用超过配置的等待时间,就增加一次失败计数,成功则重置计数。当失败达到指定次数时,会进入Open状态。
  • Open:调用者抛出CircuitBreakerOpenException错误,并在指定时间(resetTimeout)过后,进入Half-Open状态。
  • Half-Open:进入Half-Open状态后,会尝试执行第一次调用,如果第一次调用成功,则返回Close状态,否则进入Open状态,并等待下一个重启时间。
public class CricuitBreakAkka extends UntypedAbstractActor {
private ActorRef workChild; private static SupervisorStrategy strategy = new OneForOneStrategy(20, Duration.ofMinutes(1), new Function
() {
@Override public SupervisorStrategy.Directive apply(Throwable param) throws Exception {
// 直接恢复运行 return SupervisorStrategy.resume(); } }); @Override public SupervisorStrategy supervisorStrategy() {
return strategy; } @Override public void preStart() throws Exception {
super.preStart(); workChild = getContext().actorOf(Props.create(CricuitWorkActor.class), "workActor"); } @Override public void onReceive(Object message) throws Throwable {
workChild.tell(message, getSender()); } public static void main(String[] args) {
ActorSystem sys = ActorSystem.create(); ActorRef ref = sys.actorOf(Props.create(CricuitBreakAkka.class), "cricuitBreakActor"); for (int i = 0; i < 15; i++) {
ref.tell("block Hello " + i, ActorRef.noSender()); } }}class CricuitWorkActor extends UntypedAbstractActor{
private CircuitBreaker breaker; @Override public void preStart() throws Exception {
super.preStart(); /** * 在启动阶段创建CircuitBreaker对象,当向CricuitWorkActor发送"block"开头的字符串信息后,会阻塞3s,来触发超时并计数一次。 * 当计数达到3次后,CircuitBreaker会处于Open状态,触发onOpen函数。5s后会进入Half-Open状态,此时调用onHalfOpen函数。 * 然后继续发送消息,如果消息被成功处理,那么CircuitBreaker进入Closed状态,并调用onClose函数,否则又进入Open状态。 */ this.breaker = new CircuitBreaker(getContext().dispatcher(), getContext().system().scheduler(), 3, Duration.ofSeconds(2), Duration.ofSeconds(5)).onOpen(new Function0
() {
@Override public BoxedUnit apply() {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak 开启"); return null; } }).onHalfOpen(new Function0
() {
@Override public BoxedUnit apply() {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak 半开启"); return null; } }).onClose(new Function0
() {
@Override public BoxedUnit apply() {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date()) + "--> Acator CircuitBreak 关闭"); return null; } }); } @Override public void onReceive(Object message) throws Throwable {
if (message instanceof String) {
String msg = (String) message; if (msg.startsWith("block")) {
getSender().tell(breaker.callWithCircuitBreaker(new Callable() {
@Override public String call() throws Exception {
System.out.println("msg is: " + msg); Thread.sleep(3000); return null; } }), getSelf()); } } }}

自定义

定制Dispatcher

在Akka 中, Actor的消息通信和任务执行建立在一个完全透明的调度机制之上,它屏蔽了底层线程池的使用,暴露出了一个消息分发器作为线程的调度,这个消息分发器就是Dispatcher。

与Dispatcher相关联的为Executor,它为Dispatcher提供了执行异步任务的策略,Executor有两种类型:

  • thread-pool-executor:基于工作队列的线程池。
  • fork-join-executor:类似java的Fork/Join思想,基于工作窃取的线程池,是Akka的默认选项。

自定义Dispatcher

在类路径下创建application.conf文件,在里面写上:

my-forkjoin-dispatcher{ # 自定义的Dispatcher的名字    type = Dispatcher # dispatcher 类型    executor = "fork-join-executor"    fork-join-executor { # 配置forkjoin线程池        parallelism-min = 3 # 最小并发线程数        parallelism-factor = 3.0 # 并发因子        parallelism-max = 16 # 最大并发数    }    throughput = 1executor # 对于一个Actor,某个线程在处理下一个Actor之前能处理的最大消息数}my-pinned-dispatcher{    executor = "thread-pool-executor"    type  = PinnedDispatcher # PinnedDispatcher是另一种Dispatcher,为每个Actor提供只有一个线程的线程池}

parallelism-factor作为线程池的并发因子,影响线程池的最大 可用线程数:最大线程数=处理器个数*并发因子。

type包括:

  • Dispatcher:基于事件的调度器,将一组Actor绑定到线程池。

  • PinnedDispatcher:为每个Actor 提供只有一个线程的线程池。

  • CallingThreadDispatcher:不创建执行线程,当前线程执行Actor调用。

在代码中使用自己指定的Dispatcher:

// 在创建Actor引用时,指定策略ActorRef ref = sys.actorOf(Props.create(CustomActorDemo.class).                withDispatcher("my-forkjoin-dispatcher"), "customActor");

定制MailBox

我们知道,当我们给一个Actor发送消息时,并不是直接传递给它,而是将消息发送到"邮箱",由邮箱来进行调度,决定什么时候发送。Actor的邮箱其实就是一个消息队列,默认遵循先进先出的原则。当然,我们基于某些场景自定义它。

邮箱分为有界(Unbounded)和无界(Unbounded),Actor默认 采用UnboundedMailbox。

UnboundedMailbox一个基于链表的队列结构,元素从队尾入队,从队首出队,同时它使用CAS保证多线程下的安全,保证了性能的高效。

Akka包含了很多自定义的邮箱,主要有:

邮箱 说明 是否阻塞 是否有界
UnboundedMailbox 基于ConcurrentLinkedQueue实现,先进先出。 N N
SingleConsumerOnlyUnboundedMailbox 多生产者——单消费者模式(比ConcurrentLinkedQueue慢)。 Y N
NonBlockingBoundedMailbox 多生产者——单消费者模式,直接将多余消息废弃,所以不需要mailbox-push-timeout-time N Y
UnboundedControlAwareMailbox 优先发送实现ControlMessage的控制消息。 N N
UnboundedPriorityMailbox 允许对其内容进行优先排序。扩展这个类并在构造函数中提供比较器。 N N
UnboundedStablePriorityMailbox 与UnboundedPriorityMailbox类似,但它保留同等优先级邮件的顺序。 N N
BoundedMailbox 参与者使用的默认有界邮箱类型。 Z N
BoundedPriorityMailbox 允许对其内容进行优先排序的有界邮箱。 Z Y
BoundedStablePriorityMailbox 允许对其内容进行优先排序,保留同等优先级邮件的顺序的有界邮箱。 Z Y
BoundedControlAwareMailbox 优先发送实现ControlMessage控制消息的有界有偶像。 Z Y

Z表示:当mailbox-push-timeout-time非0 时,可能会阻塞, 反之则不会。

首先,我们要创建一个自定义的邮箱类:

class CustomEmail extends UnboundedStablePriorityMailbox {
public CustomEmail(ActorSystem.Settings settings, Config config){
/* 返回值越小,优先级越高 */ super(new PriorityGenerator() {
@Override public int gen(Object message) {
if (message instanceof String) {
String msg = (String) message; if (msg.startsWith("张")) return 0; if (msg.startsWith("李")) return 1; if (msg.startsWith("王")) return 2; } return 3; } }); }}

然后,修改application.conf文件内容:

my-mailbox{    mailbox-type = "cn.bigkai.akka.CustomEmail" # 绑定邮箱    mailbox capacity = 1000 # 邮箱容量    mailbox-push-timeout-time = 10s # 入队超时时间(对于有界邮箱)}

测试时,将自定义邮箱关联起来:

ActorRef ref = sys.actorOf(Props.create(CustomEmailActorDemo.class).withMailbox("my-mailbox"));

你也可以在配置文件的dispatcher中配置邮箱,然后在代码中直接关联dispatcher:

my-forkjoin-dispatcher{    mailbox-type = "cn.bigkai.akka.CustomEmail" # 绑定邮箱}

或者直接在你的Actor上继承对应的邮箱接口:

// 给该Actor发送的ControlMessage消息将会被优先处理public class CustomMailBoxAkka extends UntypedAbstractActor implements RequiresMessageQueue
{
@Override public void onReceive(Object message) throws Throwable {
System.out.println(message); }}

邮箱队列接口与类型对应:

接口 类型
UnboundedMessageQueueSemantics UnboundedMailbox
BoundedMessageQueueSemantics BoundedMailbox
DequeBasedMessageQueueSemantics UnboundedDequeBasedMailbox
UnboundedDequeBasedMessageQueueSemantics UnboundedDequeBasedMailbox
BoundedDequeBasedMessageQueueSemantics BoundedDequeBasedMailbox
MultipleConsumerSemantics UnboundedMailbox
AwareMessageQueueSemantics UnboundedControlAwareMailbox
UnboundedControlAwareMessageQueueSemantics UnboundedControlAwareMailbox
BoundedControlAwareMessageQueueSemantics BoundedControlAwareMailbox
LoggerMessageQueueSemantics LoggerMailboxType

在上面我们通过继承对应的邮箱类型的接口来自定义邮箱,但是如果有的时候我们想自己修改队列的实现或者完成更细粒度的操作该怎么办呢?这时候就需要实现它们的父类接口:MessageQueue:

class BusinessMsgQueue implements MessageQueue{
private Queue
queue = new ConcurrentLinkedDeque<>(); @Override public void enqueue(ActorRef receiver, Envelope handle) {
queue.offer(handle); } @Override public Envelope dequeue() {
return queue.poll(); } @Override public int numberOfMessages() {
return queue.size(); } @Override public boolean hasMessages() {
return !queue.isEmpty(); } @Override public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
for (Envelope ev : queue) deadLetters.enqueue(owner, ev); }}class BusinessMailBoxType implements MailboxType, ProducesMessageQueue
{
// 必须要构建一个带Settings、Config的构造参数 public BusinessMailBoxType(ActorSystem.Settings settings, Config config) {
} // 指定自定义的队列 @Override public MessageQueue create(Option
owner, Option
system) {
return new BusinessMsgQueue(); }}

Actor消息路由

Akka除了为简单的消息传递提供了tell、ask等API,同时通过路由的方式也为轮询、广播等复杂消息投递逻辑实现了对应方法。下面,就让我们尝试一下Akka的路由机制。

在进行代码编写前,我们先理解两个概念:RouterRoute

  • Router:路由器,将消息进行转发。
  • Routee:路由目标,从路由器转发的消息最后会进行对应的路由目标。

下面是一段简单的测试代码:

public class RouteAkka extends UntypedAbstractActor{
private Router router; @Override public void preStart() throws Exception {
super.preStart(); ArrayList
list = new ArrayList<>(); for (int i = 0; i < 2; i++) list.add(new ActorRefRoutee(getContext().actorOf(Props.create(RouteeActor.class), "routeeActor" + i))); router = new Router(new RoundRobinRoutingLogic(), list); // 轮询投递 } @Override public void onReceive(Object message) throws Throwable {
router.route(message, getSender()); } public static void main(String[] args) {
ActorSystem sys = ActorSystem.create(); ActorRef ref = sys.actorOf(Props.create(RouteAkka.class), "routeActor"); for (int i = 0; i < 10; i++) {
ref.tell("Hello " + i, ActorRef.noSender()); } }}class RouteeActor extends UntypedAbstractActor {
@Override public void onReceive(Object message) throws Throwable {
System.out.println(getSelf() + "——>" + message); }}

RoundRobinRoutingLogic继承了RoutingLogic,并重写了它的select方法,通过一个AtomicLong原子类递增并求余,来获取当前routee的下标。

private final AtomicLong next = new AtomicLong();public Routee select(final Object message, final IndexedSeq
routees) {
Object var10000; if (routees.nonEmpty()) {
int size = routees.size(); int index = (int)(this.next().getAndIncrement() % (long)size); var10000 = (Routee)routees.apply(index < 0 ? size + index : index); } else {
var10000 = .MODULE$; } return (Routee)var10000;}

在Router的route方法中,通过send发送消息。

public void route(final Object message, final ActorRef sender) {
BoxedUnit var3; if (message instanceof Broadcast) {
Broadcast var5 = (Broadcast)message; Object msg = var5.message(); // 发送消息 (new SeveralRoutees(this.routees())).send(msg, sender); var3 = BoxedUnit.UNIT; } else {
this.send(this.logic().select(message, this.routees()), message, sender); var3 = BoxedUnit.UNIT; }}

内置路由类型

路由 说明 支持pool 支持group
RoundRobinRouting 轮询发送消息。 Y Y
RandomRouting 随机发送消息 。 Y Y
BalancingRouting 动态分配Routee的任务,努力达到执行时间的平衡。 Y N
SmallestMailbox 尝试给消息较少的非挂起的Routee 发送消息。 Y N
Broadcast 会以广播的形式发送给所有Routee。 Y Y
ScatterGatherFirstComp leted 将消息发送给所有Routee ,并等待一个最快回复(如果指定时间内没有回复,则抛出一个异常)。 Y Y
TailChopping 首先发送消息给一个随机选取的Routee ,一段时间时间后发给第二个随机选取的Routee ,直到获取回复。 Y Y
ConsistentHashing 使用一致性Hash算法来选择Routee 。 Y Y

一个路由Actor有两种模式:

  • pool:路由器Actor 会创建子Actor 作为其Routee 并对其监督和监控,当某个Routee 终止时将其移除出去,如上面实例所示。

  • group:可以将Routee 的生产方式放在外部(即在conf文件中设置),然后路由器Actor 通过路径( path )对这些目标进行消息发送。

    my-dispatcher{    executor = "thread-pool-executor"    type  = PinnedDispatcher    router = boradcast-group    routees.path = ["/default/user/routeActor/routeeActor1", "/default/user/routeActor/routeeActor2"]}

转载地址:http://wjqgn.baihongyu.com/

你可能感兴趣的文章
EXPORT_SYMBOL() 错误--warning: type defaults to 'int' in declaration of 'EXPORT_SYMBOL'
查看>>
Qt 使用 QSettings 读写ini文件
查看>>
Uboot LCD 添加进度条功能
查看>>
Git diff 使用 vimdiff 对比差异
查看>>
使用debugfs来调试内核
查看>>
Qt4 程序 QWS 启动参数详解
查看>>
QT 支持鼠标和触摸屏输入
查看>>
svn diff 使用 vimdiff 对比差异
查看>>
使用 vimdiff 比较文件的技巧
查看>>
Ubuntu 安装 SSH 服务
查看>>
Git commit 设置提交日志编辑器
查看>>
Git config 配置文件详解
查看>>
Git config alias 设置命令别名
查看>>
Git 追踪内容详解
查看>>
Git 打包文件详解
查看>>
C 数据对齐算法
查看>>
Openssl 移植
查看>>
wireless tools 移植和使用
查看>>
libnl1.1.4 移植
查看>>
udhcpc 移植和使用
查看>>