Akka Actor API介绍
Actor的行为是通过Actor中的receive方法来实现的。如果当前Actor的行为与收到的消息不匹配,就会调用unhandled(unhandled的实现是向Actor 系统的事件流中发布)akka.actor.UnhandledMessage(message,sender,recipient)
Actor trait中包括下列属性、方法:
- 成员变量self:代表本Actor的ActorRef
- 成员变量sender:代表最近收到的消息的发送Actor
- 成员方法supervisorStrategy:用户可重写它来定义对子Actor的监管策略。
- 隐式成员变量context:暴露Actor和当前消息的上下文信息,如:
1)用于创建子Actor的工厂方法actorOf
2)Actor所属的系统
3)父监管者
4)所监管的子Actor
5)生命周期监控
6)hotswap行为栈
Actor生命周期调用的回调函数如下:
// 在启动Actor之前调用
def preStart(){}
// 在重启之前调用
def preRestart(reason:Throwable,message: Option[Any]){ // 递归停止子Actor context.children foeach (context.stop(_)) // 调用postStop方法释放资源 postStop()
}
// 在重启之后调用
def postRestart(reason:Throwable){ // 调用preStart方法做启动准备 preStart()
}
// 在Actor停止后调用,清理释放暂用资源
def postStop(){}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
使用DeathWatch进行生命周期监控
为了在其他 Actor结束时收到通知,Actor可以将自己注册为其他Actor在终止所发布的Terminated消息的接收者。这个服务是由Actor系统的DeathWatch组件提供的。
下例中MyWatchActor继承Actor,在MyWatchActor内部使用context的actorOf工厂方法,创建一个Child Actor,并通过context的watch方法将MyWatchActor注册到child Actor,完成对child Actor的监控注册。当child Actor终止时,MyWatchActor将收到child Actor发出的Terminated消息,并作相应的处理。
import akka.actor._
object HelloScala {
def main(args: Array[String]): Unit = { // 创建名为HelloAkka的ActorSystem val _system = ActorSystem("HelloAkka") // 使用ActorSystem的ActorOf工厂方法创建Actor val myWatchActor = _system.actorOf(Props[MyWatchActor],"watchActor") // 向MyWatchActor发消息 myWatchActor ! "kill" }
}
class MyWatchActor extends Actor{
val child = context.actorOf(Props.empty,"child")
// 通过context的watch方法将MyWatchActor注册到child Actor,完成对child Actor的监控注册
context.watch(child)
var lastSender = context.system.deadLetters
println(lastSender)
override def receive: Receive = { case "kill" => // 停止child context.stop(child) lastSender = sender() // 收到监控child发出的Terminated消息 case Terminated(child) => println("receive child Terminated message") lastSender ! "finished!!!"
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
Hook函数的调用
1.启动Hook
启动Actor后,preStart会被立即执行。因此,我们可以在preStart方法中做相应的准备工作,或注册启动其他的Actor。
2.重启Hook
所有的Actor都是被监管的,并且以某种失败处理策略与另一个Actor连接在一起。如果在处理一个消息时抛出异常,Actor将被重启。
1)要被重启的Actor的preRestart方法将被调用,重启的Actor携带着导致重启的异常和触发异常的消息。如果重启不是因为消息的处理而发生的,那么携带的消息为None,如:一个监管者没有处理某个异常继而被它自己的监管者重启时,携带的消息为None。preRestart是用来完成清理、准备移交给新的Actor实例的最佳位置。它的缺省实现的是终止所有的子Actor并调用postStop
2)调用 actorOf工厂方法创建新的实例。
3)新的Actor的postRestart方法被调用 ,携带着导致重启的异常信息。Actor的重启会替换掉原来的Actor对象;重启不影响邮箱的内容,所以对消息的处理将在postRestart回调函数返回后继续,触发异常的消息不会被重新接收。在Actor重启过程中所有发送到该Actor的消息将像平常一样被放进邮箱队列中。
3.终止Hook
一个Actor终止后,它的postStop回调函数将被调用,postStop方法可以用来取消该 Actor在其他服务中的注册。这个回调函数保证该 Actor消息队列被禁止后才会运行,之后发给该 Actor的消息将被重定向到ActorSystem的deadLetters中。
查找Actor
每个Actor拥有一个唯一的逻辑路径,此路径是由从Actor系统的根开始的父子链构成的。同时,它还拥有一个物理路径 ,如果监管链包含远程监管者,那么此路径可能会与逻辑路径不同。这些路径用来在系统查找 Actor,如当收到一个远程消息时查找收件者。Actor可以通过指定绝对或相对路径(逻辑的或物理的)来查找其他的Actor并随结果获取ActorRef。
// 查找绝对路径
context.actorFor("/user/serviceA/aggregator")
// 查找相对路径,查找同一个父监管者下的兄弟
context.actorFor("../joe")
- 1
- 2
- 3
- 4
- 5
路径被解释为一个java.net.URI,以“/”分隔。如果路径以"/“开始,表示一个绝对路径,从根监管者("/user"的父亲)开始查找,否则会从当前Actor开始。如果某一个路径段为”…“,会找到当前所遍历到的Actor的上一级,否则会向下一级寻找具有该名字的子Actor。注意:Actor路径中的”…“总是表示逻辑结构,即它的监管者。如果查找的路径不存在,会返回一个特殊的Actor引用,它的行为与Actor系统的死信队列类似,但是保留其身份。如果开启了远程调用,则远程Actor地址也可以被查找,如:
context.actorFor("akka://app@otherhost:8888/user/service")
- 1
消息的不可变性
消息可以是任何类型的对象,但必须是不可变的。目前 ,Scala还无法强制不可变性。所以要大家自觉遵守这一约定。不可变的原始类型:String、Int、Boolean,除它们之外,推荐的做法就是使用Scala case class,它们是不可变的,并与接收的模式匹配配合得非常好,后面的例子中用了很多这种推荐的做法。其他适合做消息的类型:scala.Tuple2、scala.List、scala.Map。
发送消息
向Actor发消息的方式:
1)!:fire-and-forget,异步发送一条消息并立即返回,也称为tell。
2)?:异步发送一条消息并返回一个Future代表一个可能的回应,也称为ask。
每一个消息发送者分别保证自己消息的次序。
tell:「推荐」不会阻塞等待消息,拥有最好的并发性和可扩展性。如果是在一个Actor中调用,那么发送方的Actor引用会被隐式地作为消息的sender: ActorRef成员一起发送,目的Actor就可以用sender向原Actor发送回应,使用方式:sender ! replyMsg。如果不是从Actor实例发送的,sender成员默认为deadLetters Actor的引用。
ask:即包含Actor也包含Future,它作为一种使用模式,而不是ActorRef的方法。
示例:
import java.util.concurrent.TimeUnit
import akka.actor._
import akka.util.Timeout
import scala.concurrent.Future
import akka.pattern.{ask, pipe}
import scala.concurrent.ExecutionContext.Implicits.global
case object Request
case class Result(x: Int, s: String, d: Double)
class ActorA extends Actor{
override def receive: Receive = { case Request => println("ActorA") sender ! 25 // 给发送者返回整数25
}
}
class ActorB extends Actor{
override def receive: Receive = { case Request => println("ActorB") sender ! "Actor BB" // 给发送者返回字符串Actor BB
}
}
class ActorC extends Actor{
override def receive: Receive = { case Request => println("ActorC") sender ! 7500.0 // 给发送者返回浮点类型7500.0
}
}
class ActorD extends Actor{
override def receive: Receive = { case msg => println("ActorD") println(s"Request#${msg.toString}") // 打印Actor收到的消息
}
}
class TestActor extends Actor{ // 创建ActorA
val actorA = context.actorOf(Props[ActorA],"ActorA")
// 创建ActorB
val actorB = context.actorOf(Props[ActorB],"ActorB")
// 创建ActorC
val actorC = context.actorOf(Props[ActorC],"ActorC")
// 创建ActorD
val actorD = context.actorOf(Props[ActorD],"ActorD")
// 隐式参数,'?'操作会用到
implicit val timeout = Timeout(10,TimeUnit.SECONDS)
def f():Future[Result] = { for{ x <- ask(actorA,Request) mapTo manifest[Int] // 直接调用 s <- actorB ask Request mapTo manifest[String]// 隐式转换调用 d <- actorC ? Request mapTo manifest[Double]// 通过符号名称调用,用到超时隐式参数 }yield Result(x, s, d)
} override def receive: Receive = { case msg => println(s"receive msg:${msg}") f pipeTo actorD // 调用f方法,f方法返回Future,调用pipeTo方法,将消息转到actorD }
}
class MasterActor extends Actor{ val testActor = context.actorOf(Props[TestActor],"TestActor")
testActor ! "go"
override def receive: Receive = { case msg => println(s"masterActor receive msg:$msg")
}
}
object HelloScala { def main(args: Array[String]): Unit = { // 创建名为HelloAkka的ActorSystem val _system = ActorSystem("HelloAkka") // 使用ActorSystem的actorOf工厂方法创建MasterActor val masterActor = _system.actorOf(Props[MasterActor],"masterActor") Thread.sleep(60000) // 主线程睡眠60秒,等待计算结束后退出 _system.terminate()// 关闭ActorSystem
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
ask 产生Future,3个Future通过for语法组合成一个新的Future。然后pipeTo在Future上安装一个onComplete处理器来完成将收集到的Result发送到其他Actor的动作。使用ask会像tell一样发送消息给接收方,接收方必须通过sender ! reply发送回应来为返回的Future填充数据。因为ask操作包括创建一个内部Actor来处理回应,所以必须为这个内部Actor指定一个超时期限,过了超时期限,内部Actor将被销毁以防止内存泄露。如果要以异常来填充Future,需要发送一个Failure消息给发送方。这个操作不会在Actor处理消息发生异常时自动完成。可以使用try-catch方式来处理请求异常,把消息处理放在try块中,在捕捉到异常后,发送一个Failure消息给发送方:
try{ val result = doSomeOperation() sender ! result }catch { case e:Exception => sender ! Failure(e) throw e }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
转发消息
可以将消息从一个Actor转发给另一个。虽然经过了一个”中转“,但最初的发送者的地址和引用将保持不变。当实现功能类似路由器、负载均衡器、备份等的Actor时,这一特性会很有用,用法是:oneActor.forward(message),即将message消息转发给oneActor。
接收消息
Actor必须实现receive方法来接收消息,receive方法的定义 :
def receive: PartialFunction[Any, Unit]
- 1
在接收消息时,如果在一段时间内没有收到消息,可以使用超时机制。要检测这种超时必须设置receiveTimeout属性:
class MyActor extends Actor{
context.setReceiveTimeout(new FiniteDuration(3000,TimeUnit.MILLISECONDS))// context设置超时时间
override def receive: Receive = { case "Hello" => println("hi") case ReceiveTimeout => throw new RuntimeException("received timeout")
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
上述代码设置了超时时间为3秒,如果3秒还未收到消息,receive方法中的case ReceiveTimeout分支将会被匹配到,将抛出一个异常。
回应消息
从前面的示例代码中, 可知向消息发送者反馈消息时,使用sender,sender代表了消息的发送方,当Actor通过ask或tell方法发送消息的时候,将自己作为一个隐式参数,发送到消息接收方那里,sender在消息接收方那里代表消息发送者,因此可以用sender ! replyMsg的方式给消息发送者回应消息。甚至可以将这个sender引用保存起来将来再做回应。如果没有sender(不是从Actor发送的消息或者没有Future上下文),那么sender默认为deadLetters"死信“Actor的引用。
终止Actor
在某些情况下,需要终止Actor并重新启动,以使Actor状态一致。可以通过调用ActorRefFactory或ActorContext或ActorSystem的stop方法来终止一个Actor。通常context用来终止子Actor,而system用来终止顶级Actor。实际的终止操作是异步执行的,因此stop可能在Actor被弹终止之前返回。如果当前有正在处理的消息,对该消息的处理将在Actor被终止之前完成,邮箱中的后续消息将不会被处理,默认情况下这些消息会被送到ActorSystem的死信中,但这也取决于邮箱的实现。
Actor终止分两步:
1)Actor停止对邮箱的处理,向所有子Actor发送终止命令,然后处理子Actor的终止消息,直到所有的子Actor都完成终止。
2)终止自己(调用postStop,销毁邮箱,向DeathWatch发布Terminated,通知其监管者)。这个过程保证Actor系统中的子树以一种有序的方式终止,将终止命令传播到叶子结点并收集它们回送的确认消息。如果其中某个Actor没有响应(如由于处理消息用了太长时间,以至于没有收到终止命令),整个过程将会被阻塞。
_system.terminate()关闭ActorSystem时,系统根监管Actor会被终止。postStop回调函数是在Actor被完全终止以后调用的,以释放占用的资源。
除了使用stop方法终止Actor外,还可以向Actor发送akka.actor.PoisonPill消息,这个消息处理完成后,Actor会被终止。PoisonPill与普通消息一样被放进队列,因此会在已经进入队列的其他消息之后被执行。
杀死Actor
可以通过发送Kill消息来杀死Actor,这将会使用正规的监管语义杀死Actor,如myActor ! Kill。
Kill、stop、PoisonPill的区别:
- stop方法和PoisonPill消息都会终止Actor的执行,并且停止消息队列。stop和PoisonPill操作会向子Actor发送终止消息,并等待它们的终止反馈,待所有的子Actor都终止后,调用回调函数postStop清理资源。stop与PoisonPill的区别是:调用stop方法会等待正在处理的消息处理完成,而之后的消息则置之不管;而发送PoisonPill消息,该消息将以普通消息的方式进入消息队列,等待处理,在消息队列中,PoisonPill消息之前的消息都将会得到处理。
- Kill消息将会使Actor抛出ActorKilledException异常,而处理该异常将会使用到监管机制,因此这里的处理完全依赖于定义的监管策略。在默认情况下,会停止Actor,并保存消息队列,待Actor重启之时,除了引发异常的消息之外,其余消息将得到恢复。
不同类型的Actor
Akka中的Actor分为有类型Actor、普通Actor。有类型Actor是Active Objects模式的一种,这种模式的主要思想是将方法的调用和执行分离,使用Actor的实现更清晰、更简洁。为了将方法的执行从方法的调用中分离,必须将方法的执行和方法的调用放置到隔离的线程中去,那么Actor就可以并行、异步地获取对象状态。
有类型Actor由两部分组成:一个公开的接口和一个实现。对普通Actor来说,拥有一个外部API(公开接口的实例)来将方法调用异步地委托给其实现的私有实例。
有类型Actor相对于普通Actor来说的优势在于:有类型Actor拥有静态的契约,不需要定义自己的消息。
有类型Actor相对于普通Actor来说的劣势在于:对能做什么和不能做什么进行了一些限制,如不能使用become/unbecome。
有类型Actor是使用JDK Proxies实现的,JDK Proxies提供了非常简单的API来拦截方法调用。
Active Object模式为了实现分离方法的调用和方法的执行,使用了代理模式,将接口与实现进行分离,思想就是在相互隔离的线程中分别运行代理和实现:
1)运行时,Client调用Proxy对象并执行方法
2)Proxy将方法调用转换成成对的Schedular或者Invocation Handler的请求。Schedular或者Invocation Handler将会拦截请求。
3)Schedular或者Invocation Handler将方法请求放入队列
4)监控队列,执行可执行的方法
5)Schedular或者Invocation Handler分发请求到具体实现方法的对象上执行。
6)具体对象执行方法,给Client端返回Future结果。
创建一个有类型Actor需要一个或多个接口,以及一个实现。创建有类型Actor最简单的方法是:
val myTypedActor:Trait = TypedActor(system).typedActorOf(TypedProps[Impl]())
- 1
关于接口方法,做以下说明:
- 如果方法返回void类型,该方法的调用就像有类型Actor中的tell一样,属于“fire and forget”
- 如果方法返回Option类型,该方法将会一直阻塞,直到结果的返回,如果在设置的超时时间内,还没有返回,方法将停止并返回None。
- 如果方法返回Future类型,该方法的调用就像有类型Actor中的ask一样,不会阻塞,会立即返回一个Future
- 方法返回其他类型,该方法将会一直阻塞,直到结果返回,一直到超时。
实现有类型Actor的示例:
import java.util.concurrent.TimeUnit
import akka.actor._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, Future}
// 定义接口
trait Cal{
// 求和
def add(x: Int, y: Int):Unit
// 求积
def multi:Future[Int]
}
// 实现接口
class Calculate(var length: Int, var width: Int) extends Cal{
override def add(x: Int, y: Int): Unit = { // 没有返回值,这种方法相当于tell调用 this.length + x + this.width + y
} override def multi: Future[Int] = { // 返回值为Future,这种方法相当于ask调用 println("wait before multi") Thread.sleep(2000) Future.successful(length * width)
}
}
object HelloScala { def main(args: Array[String]): Unit = { // 创建名为myActorSystem的ActorSystem val _system = ActorSystem("myActorSystem") // 使用TypedActor的teypedActorOf工厂方法实例化出Calculate对象 val calculator: Cal = TypedActor(_system).typedActorOf(TypedProps(classOf[Cal],new Calculate(20,30))) // fire and forget 类似tell val future1 = calculator.add(3,4) println(future1) // send and receive 类似ask val future2 = calculator.multi val result2 = Await.result(future2,new FiniteDuration(5,TimeUnit.SECONDS)) println(result2) _system.terminate() }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
TypedActorOf方法的调用,返回一个Calculate动态代理实例。Akka中的有类型Actor,将异步的调用和执行封装在方法中,在代码层保证了顺序执行。Active Objects设计模式包含6种元素:
1)代理:提供了面向客户端的带有公开方法的接口
2)接口:定义了到active object的请求方法(业务代码提供)
3)来自客户端的序列等待请求
4)调度器:决定接下来执行哪个请求
5)active object方法的实现类(业务代码提供)
6)一个回调或变量,以让客户端接收结果
终止有类型Actor
由于有类型Actor底层还是Akka Actor,所以在不需要的时候要终止它,以释放资源,通常有两种终止方法:
TypedActor(_system).stop(calculator) 或 TypedActor(_system).poisonPill(calculator)// 异步终止与指定的代理关联的有类型Actor
- 1
- 2
- 3
方法派发语义
- 返回类型为void的方法时,Unit工具会以fire-and-forget语义进行派发,与ActorRef.tell完全一致。
- 返回类型为akka.dispatch.Future[ _ ]的方法,会以Send-Request-Reply语义进行派发,与ActorRef.ask完全一致。
- 返回类型为scala.Option[ _ ]和akka.japi.Option<?>的方法,会以Send-Request-Reply语义进行派发,会阻塞等待应答。如果在超时时限内没有应答,则返回None;否则,返回包含结果的scala.Some或akka.japi.Some。在这个调用中发生的异常将被重新抛出。
- 任何其他类型的值将以Send-Request-Reply语义进行派发,会阻塞等待应答。如果超时,会抛出java.util.concurrent.TimeoutException,如果发生异常,则将异常重新抛出。
Become/Unbecome
Akka支持在运行对Actor消息循环进行实时替换,即消息处理的HotSwap。其实现原理是通过become和unbecome在运行时动态替换消息处理的代码。become要求用一个PartialFunction[Any, Unit]参数作为新的消息处理实现,被替换的代码被存在一个栈中,可以通过push和pop替换。
示例代码:
import java.util.concurrent.TimeUnit
import akka.actor._
import akka.event.Logging
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, Future}
case object HotSwap
class HotSwapperActor extends Actor{
import context._
val log = Logging(system,this)
override def receive: Receive = { case HotSwap => log.info("Hi")// 打印 // 调用become,此时处理邮符中处理消息的代码变成become块中的代码 become{ case HotSwap => log.info("Hello!") // 打印 unbecome()// 调用unbecome,此时处理邮箱中处理消息的代码变成become块外面的代码 } }
}
object HelloScala { def main(args: Array[String]): Unit = { // val _system = ActorSystem("HotSwapperSystem") val swap = _system.actorOf(Props[HotSwapperActor],"hotSwapperActor") swap ! HotSwap swap ! HotSwap swap ! HotSwap swap ! HotSwap }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
上面的代码使用become/unbecome对处理消息的代码进行替换,HotSwapperActor收到第一个HotSwap消息后,打印“Hi”,然后调用become,当HotSwapperActor收到第二个HotSwap消息时,将执行become中的代码,打印“Hello!”,紧接着调用了unbecome,使用处理消息的代码回到初始,打印“Hi”。
以上就是Akka Actor API的相关内容,谢谢阅读!
文章来源: blog.csdn.net,作者:WongKyunban,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/weixin_40763897/article/details/93243711
- 点赞
- 收藏
- 关注作者
评论(0)