Akka Dispatchers和Routers

举报
yd_221104950 发表于 2020/12/05 01:05:44 2020/12/05
【摘要】 Akka Dispatcher是维持Akka Actor动作的核心组件,是整个Akka框架的引擎。它是基于Java的Executor框架来实现的。Dispatcher控制和协调消息并将其分发给运行在底层线程上的Actor,由它来负责调度资源的优化,并保证任务以最快的速度执行。 Akka的高稳定性是建立在“Let It Crash”模型之上的,该模型是基于Supervi...

Akka Dispatcher是维持Akka Actor动作的核心组件,是整个Akka框架的引擎。它是基于Java的Executor框架来实现的。Dispatcher控制和协调消息并将其分发给运行在底层线程上的Actor,由它来负责调度资源的优化,并保证任务以最快的速度执行。

Akka的高稳定性是建立在“Let It Crash”模型之上的,该模型是基于Supervision和Monitoring实现的。通过定义Supervision和监管策略,实现系统异常处理。

Akka为了保证事务的一致,引入了STM的概念。STM使用的是“乐观锁”,执行临界区代码后,会检测是否产生冲突,如果产生冲突,将回滚修改,重新执行临界区代码。

Akka中,Dispatcher基于Java Executor框架来实现,提供了异步执行任务的能力。Executor是基于生产者——消费者模型来构建的。这意味着任务的提交和任务的执行是在不同的线程中隔离执行的,即提交任务的线程与执行任务的线程是不同的。

Executor框架有两个重要实现:
ThreadPoolExecutor:该实现从预定义的线程池中选取线程来执行任务。
ForkJoinPool:使用相同的线程池模型,提供了工作窃取的支持。

Dispatcher运行在线程之上,负责分发其邮箱里面的Actors和Messages到executor中的线程上运行。在Akka中,提供了4种类型的Dispatcher:

  • Dispatcher
  • Pinned Dispatcher
  • Balancing Dispatcher
  • Calling Thread Dispatcher
    对应的也有4种默认的邮箱:
  • Unbounded mailbox
  • Bounded mailbox
  • Unbounded priority mailbox
  • Bounded priority mailbox

为Actor指定派发器

一般Actor都会有缺省的派发器,如果要指定派发器,要做两件事:
1)在实例化Actor时,指定派发器:

  val myActor = context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"),"myActor")

  
 
  • 1

2)创建Actor时,使用withDispatcher指定派发器,如my-dispatcher,然后在applicaction.conf配置文件中配置派发器
使用Dispatcher派发器

my-dispatcher{
  # Dispatcher是基于事件的派发器名称
  type = Dispatcher
  # 使用何种ExecutionService executor = "fork-join-executor"
  # 配置fork join池
  fork-join-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10
  }
  # throughput定义了线程切换到另一个Actor之前处理的消息数上限
  # 设置为1表示尽可能公平
  throughput = 100
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

使用PinnedDispatcher派发器

my-dispatcher{
  # Dispatcher是基于事件的派发器名称
  type = PinnedDispatcher
  # 使用何种ExecutionService
  executor = "thread-pool-executor"
  # 配置fork join池
  thread-pool-executor{ # 容纳基于倍数的并行数的线程数下限 parallelism-min = 2 # 并行数(线程)(CPU核数*2) parallelism-factor = 2.0 # 容纳基于倍数的并行数量的线程数上限 parallelism-max = 10
  }
  # throughput定义了线程切换到另一个Actor之前处理的消息数上限
  # 设置为1表示尽可能公平
  throughput = 100
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

不同派发器的介绍

  • Dispatcher
    Dispatcher是Akka中默认的派发器,它是基于事件的分发器,该派发器绑定一组Actor到线程池中。该派发器有如下特点:
    1)每一个Actor都有自己的邮箱
    2)该派发器都可以被任意数量的Actor共享
    3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持
    4)该派发器是非阻塞的。

  • Balancing Dispatcher
    该派发器是基于事件的分发器,它会将任务比较多的Actor的任务重新分发到比较闲的Actor上运行。该派发器有如下特点:
    1)所有Actor共用一个邮箱
    2)该派发器只能被同一种类型的Actor共享
    3)该派发器可以由ThreadPoolExecutor或ForkJoinPool提供支持

  • Pinned Dispatcher
    该派发器为每一个Actor提供一个单一的、专用的线程。这种做法在I/O操作或者长时间运行的计算中很有用。该派发器有如下特点:
    1)每一个Actor都有自己的邮箱
    2)每一个Actor都有专用的线程,该线程不能和其他Actor共享
    3)该派发器有一个Executor线程池
    4)该派发器在阻塞上进行了优化,如:如果程序正在进行I/O操作,那么这个Actor将会等到任务执行完成。这种阻塞型的操作在性能上要比默认的Dispatcher要好。

  • Calling Thread Dispatcher
    该派发器主要用于测试,并且在当前线程运行任务,不会创建新线程,该派发器有如下特点:
    1)每一个Actor都有自己的邮箱
    2)该派发器都可以被任意数量的Actor共享
    3)该派发器由调用线程支持

邮箱

邮箱用于保存接收的消息,在Akka中除使用BalancingDispather分发器的Actor以外,每个Actor都拥有自己的邮箱。使用同一个BalancingDispather的所有Actor共享同一个邮箱实例。

邮箱是基于Java concurrent中的队列来实现的,它有如下特点:
1)阻塞队列,直到队列空间可用,或者队列中有可用元素
2)有界队列,它的大小是被限制的

缺省的邮箱实现

  • UnboundedMailbox
    底层是一个java.util.concurrent.ConcurrentLinkedQueue
    是否阻塞:No
    是否有界:No
  • BoundedMailbox
    底层是一个java.util.concurrent.LinkedBlockingQueue
    是否阻塞:Yes
    是否有界:Yes
  • UnboundedPriorityMailbox
    底层是一个java.util.concurrent.PriorityBlockingQueue
    是否阻塞:Yes
    是否有界:No
  • BoundedPriorityMailbox
    底层是一个java.util.PriorityBlockingQueue
    是否阻塞:Yes
    是否有界:Yes
    还有一些缺省的持久邮箱

Router

当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees

Akka定义好的一些Router:

  • akka.routing.RoundRobinRouter:轮转路由器将消息按照轮转顺序发送给routers
  • akka.routing.RandomRouter:随机路由器随机选择一个router,并将消息发送给这个router
  • akka.routing.SmallestMailboxRouter:最小邮箱路由器会在routers中选择邮箱里信息最少的router,然后把消息发送给它。
  • akka.routing.BroadcastRouter:广播路由器将相同的消息发送给所有的routers
  • akka.routing.ScatterGatherFirstCompletedRouter:敏捷路由器先将消息广播到所有routers,返回最先完成任务的router的结果给调用者。

路由器的使用

  • RoundRobinPool 和 RoundRobinGroupRouter对routees使用轮询机制
  • RandomPool 和 RandomGroupRouter随机选择routees发送消息
  • BalancingPool尝试从繁忙的routee重新分配任务到空闲routee,所有的routee共享一个mailbox
  • SmallestMailboxPoolRouter创建的所有routees中谁邮箱中的消息最少发给谁
  • BroadcastPool 和 BroadcastGroup广播的路由器将接收到的消息转发到它所有的routee。
  • ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup将消息发送给所有的routees,然后等待到收到第一个回复,将结果发送回原始发送者。其他的回复将被丢弃
  • TailChoppingPool 和 TailChoppingGroup将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。此Router的目标是通过查询到多个routee来减少延迟,假设其他的actor可能比第一个actor更快响应。
  • ConsistentHashingPool 和 ConsistentHashingGroup对消息使用一致性哈希(consistent hashing)选择routee
    有三种方式定义哪些数据作为一致性哈希键
    定义路由的hashMapping,将传入的消息映射到它们一致哈希键。这使决策对发送者透明。·
    这些消息可能会实现ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。·
    消息可以被包装在一个ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。

路由器的使用要先创建路由器后使用。 AKKA的路由由router和众多的routees组成,router和routees都是actor.Router即路由,是负责负载均衡和路由的抽象,有两种方法来创建router:
1.Actor Group
2.Actor Pool
当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees

根据不同的情况需要,Akka提供了几种路由策略。当然也可以创建自己的路由及策略。Akka提供的路由策略如下:

  • akka.routing.RoundRobinRoutingLogic 轮询
  • akka.routing.RandomRoutingLogic 随机
  • akka.routing.SmallestMailboxRoutingLogic 空闲
  • akka.routing.BroadcastRoutingLogic 广播
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
  • akka.routing.TailChoppingRoutingLogic 尾部断续
  • akka.routing.ConsistentHashingRoutingLogic 一致性哈希

创建Router Actor

创建router actor 有两种方式:

  1. Pool(池)——routees都是router 的子actor,如果routees终止,router将把它们移除
  2. Group(群组)——routees都创建在router的外部,router通过使用actor来选择将消息发送到指定路径,但不监管routees是否终止。Router actor 向 routees 发送消息,与向普通actor发送消息一样通过其ActorRef。Router actor 不会改变消息的发送人,routees 回复消息时发送回原始发件人,而不是Router actor。

Pool(池)可以通过配置并使用代码在配置中获取的方法来实现 (例如创建一个轮询Router向5个routees发送消息)

Group(群组)有时我们需要单独地创建routees,然后提供一个Router来供其使用。可以通过将routees的路径传递给Router的配置,消息将通过ActorSelection来发送到这些路径。
有两种方式创建路由器:
Pool(池)

import akka.actor._
import akka.routing.{ActorRefRoutee, FromConfig, RoundRobinGroup, RoundRobinPool, RoundRobinRoutingLogic, Router}
object HelloScala {
  def main(args: Array[String]): Unit = { // 创建router val _system = ActorSystem("testRouter") // 通知代码来实现路由器 val hahaRouter = _system.actorOf(RoundRobinPool(5).props(Props[WorkerRoutee]),"router111") hahaRouter ! RouteeMsg(333) val myRouter = _system.actorOf(Props[WorkerRoutee].withRouter(RoundRobinPool(nrOfInstances = 5))) myRouter ! RouteeMsg(22) val masterRouter = _system.actorOf(Props[MasterRouter],"masterRouter") masterRouter ! RouteeMsg(100)
  }
}
class MasterRouter extends Actor{
  var masterRouter = { val routees = Vector.fill(3){ val r = context.actorOf(Props[WorkerRoutee]) context watch r ActorRefRoutee(r) } Router(RoundRobinRoutingLogic(),routees)
  }
  override def receive: Receive = { case w: RouteeMsg => masterRouter.route(w,sender()) case Terminated(a) => masterRouter = masterRouter.removeRoutee(a) val r = context.actorOf(Props[WorkerRoutee]) context watch r masterRouter = masterRouter.addRoutee(r) }
}

// 定义routee对应的actor类型
case class RouteeMsg(s: Int)

class WorkerRoutee extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}")
  }
}

class WorkerRoutee2 extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#@@@@@$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}")
  }
}

class Cale extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}")
  }
} 
  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

Group(群组)

import akka.actor._
import akka.routing.{ RoundRobinGroup}
object HelloScala {
  def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! RouteeMsg(13333)
 }
}

class TestActor extends  Actor{
  val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
  val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
  val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
  val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
  override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case _ =>
  }
}

// 定义routee对应的actor类型
case class RouteeMsg(s: Int)

class WorkerRoutee extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case _ => println(s"${self.path}")
  }
}


class Cale extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case _ => println(s"${self.path}")
  }
} 
  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

特殊消息
Broadcast消息用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。
PoisonPill消息无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。但仍然能影响到routees,因为Router停止时它的子actor也会停止,就可能会造成消息未处理。因此我们可以将PoisonPill包装到Broadcast消息中。这样Router所管理的所有routees将会处理完消息后再处理PoisonPill并停止。
Kill消息当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。Router将抛出ActorKilledException并失败,然后Router根据监管的策略,被恢复、重启或终止。Router的子routee也将被暂停,也受Router监管的影响,但是独立在Router外部创建的routee将不会被影响。

import akka.actor._
import akka.routing.{Broadcast, RoundRobinGroup}
object HelloScala {
  def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val tActor = _system.actorOf(Props[TestActor],"testActor") tActor ! PoisonPill
 }
} class TestActor extends  Actor{
  val routee1 = context.actorOf(Props[WorkerRoutee],"w1")
  val routee2 = context.actorOf(Props[WorkerRoutee],"w2")
  val routee3 = context.actorOf(Props[WorkerRoutee],"w3")
  val paths: Array[String] = Array(routee1.path.toString,routee2.path.toString,routee3.path.toString) val testRouter = context.actorOf(RoundRobinGroup(paths).props(),"testRouter")
  override def receive = { case RouteeMsg(s) => testRouter ! RouteeMsg(s) case RouteeBroadcast => testRouter ! Broadcast // 用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。 case Broadcast => println("TestActor receive a broadcast message") case Kill => testRouter ! Kill// 当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。 case PoisonPill => testRouter ! PoisonPill // 无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。 case _ =>
  }
}

// 定义routee对应的actor类型
case class RouteeMsg(s: Int)
// 定义广播信息
case object RouteeBroadcast

class WorkerRoutee extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}")
  }
}


class Cale extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}")
  }
} 
  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

远程部署Router

既可以创建本地actor来作为Router,也可以命令Router在任一远程主机上部署子actor。需要将路由配置放在RemoteRouterConfig下,在远程部署的路径类中要添加akka-remote模块:

import akka.actor._
import akka.remote.routing.{RemoteRouterConfig}
import akka.routing.{Broadcast,RoundRobinPool}
object HelloScala {
  def main(args: Array[String]): Unit = { val _system = ActorSystem("AkkaTestActor") val addresses = Seq( Address("akka.tcp","remotesys","otherhost",6666), AddressFromURIString("akka.tcp://othersys@anotherhost:6666") ) // WorkerRoutee 路由部署到远程的主机上 val routerRemote = _system.actorOf(RemoteRouterConfig(RoundRobinPool(5),addresses).props(Props[WorkerRoutee]))
 }
}

// 定义routee对应的actor类型
case class RouteeMsg(s: Int)
class WorkerRoutee extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} mesage#$s") val caleActor = context.actorOf(Props[Cale]) caleActor ! RouteeMsg(s) case Broadcast => println("WorkerRoutee receive a broadcast message") case Kill => println("WorkerRoutee receive a Kill message") case PoisonPill => println("WorkerRoutee receive a PoisonPill message") case _ => println(s"${self.path}")
  }
}
class Cale extends Actor{
  override def receive: Receive = { case RouteeMsg(s) => println(s"${self.path} message#$s") case Broadcast => println("Cale receive a broadcast message") case _ => println(s"${self.path}")
  }
} 
  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

文章来源: blog.csdn.net,作者:WongKyunban,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/weixin_40763897/article/details/93516694

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。