17_Scala_Akka_Actor上

举报
alexsully 发表于 2021/05/14 15:49:14 2021/05/14
【摘要】 Akka 中Actor 模型是一种处理并发问题的解决方案Actor模型工作机制说明1 处理并发问题关键是要保证共享数据的一致性和正确性,因为程序是多线程时,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染但是当对关键代码加入同步条件synchronized 后,实际上大并发就会阻塞在这段代码,对程序效率有影响2 若是用单线程处理,不会有数据一致性的问题,但是系统的性能又不能保...

Akka 中Actor 模型是一种处理并发问题的解决方案

Actor模型工作机制说明

1 处理并发问题关键是要保证共享数据的一致性和正确性,因为程序是多线程时,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染
但是当对关键代码加入同步条件synchronized 后,实际上大并发就会阻塞在这段代码,对程序效率有影响
2 若是用单线程处理,不会有数据一致性的问题,但是系统的性能又不能保证。
3 Actor 模型是一种处理并发问题的解决方案

4 Actor 之间只能用消息进行通信,给 Actor发消息,消息是有顺序的(消息队列),只需要将消息投寄的相应的邮箱即可。
5 怎么处理消息是由接收消息的Actor决定的,发送消息Actor可以等待回复,也可以异步处理【类似ajax异步】
6 ActorSystem 的职责是负责创建并管理其创建的 Actor, ActorSystem 是单例的(ActorSystem是一个工厂,专门创建Actor),一个 JVM 进程中有一个即可,而 Acotr 是可以有多个的

7 Actor模型是对并发模型进行了更高的抽象。
8 Actor模型是异步、非阻塞、高性能的事件驱动编程模型
9 Actor模型是轻量级事件处理(1GB 内存可容纳百万级别个 Actor),因此处理大并发性能高.


Actor模型工作机制

1 ActorySystem创建Actor
2 ActorRef:可以理解成是Actor的代理或者引用。
 (注:消息是通过ActorRef来发送,而不能通过Actor 发送消息,通过哪个ActorRef 发消息,就表示把该消息发给哪个Actor)
3 消息发送到Dispatcher Message (消息分发器),它得到消息后,会将消息进行分发到对应的MailBox。
(注: Dispatcher Message 可以理解成是一个线程池, MailBox 可以理解成是消息队列,可以缓冲多个消息,遵守FIFO 先进先出原则)
4 Actor 可以通过 receive方法来获取消息,然后进行处理。

Actor间传递消息机制

1 每一个消息就是一个Message对象。Message 继承了Runable, 因为Message就是线程类。
2 编程时只需要编写Actor就可以了,其它的交给Actor模型完成即可。
3  A Actor要给B Actor 发送消息,那么A Actor 要先拿到(也称为持有) B Actor 的 代理对象ActorRef 才能发送消息
4  每个actor有个mailbox(封装不可见)且实现了runnable 接口一直在运行,等待其他人给发消息


//1. 当我们继承Actor后,就是一个Actor,核心方法receive 方法重写
class HelloActor extends  Actor{
  //1. receive方法,会被该Actor的MailBox(实现了Runnable接口)调用
  //2. 当该Actor的MailBox 接收到消息,就会调用 receive
  //3. type Receive = PartialFunction[Any, Unit]
  override def receive: Actor.Receive = {
    case "hello" =>  println("收到hello, 回应hello too:)")
    case "ok" => println("收到ok, 回应ok too:)")
    case "exit" => {
      println("接收到exit指令,退出系统")
      context.stop(self) //停止actoref  (停止mailbox) // self 就是自己
      context.system.terminate()//退出actorsystem (停止 actor )
    }
    case _ => println("匹配不到")
  }
}

object  HelloActorDemo{
  //先创建一个ActorSystem, 专门用于创建Actor
  //1. ActorSystem apply方法返回一个 ActorSystem;
  val actoryFactory = ActorSystem("ActoryFactory")

  //2. 创建一个Actor的同时,返回Actor的ActorRef 说明
  //(1) Props[SayHelloActor] 创建了一个 SayHelloActor实例,使用反射 ,也可以使用 new 方式
  // Props[SayHelloActor] 该实例被 ActorSystem 接管了
  //(2) "alexHelloActor" 给actor取名
  //(3) sayHelloActorRef: ActorRef 就是 Props[SayHelloActor] 的ActorRef
  //(4) 创建的SayHelloActor(Props[SayHelloActor]) 实例被ActorSystm接管
  //(5) ActorRef 仅仅是与SayHelloActor(Props[SayHelloActor]) 关联起来
  private val alexHelloActor1: ActorRef = actoryFactory.actorOf(Props[HelloActor], "alexHelloActor")
  def main(args: Array[String]): Unit = {
    alexHelloActor1 ! "hello"
    alexHelloActor1 ! "ok"
    alexHelloActor1 ! "exit"
  }
}
Actor自我通讯机制原理图
当程序执行 aActorRef = actorFactory.actorOf(Props[AActor], "aActor") ,会完成如下任务 

1 actorFactory 是 ActorSystem("ActorFactory") 这样创建的。
2 这里的 Props[AActor] 会使用反射机制,创建一个AActor 对象,如果是actorFactory.actorOf(Props(new AActor(bActorRef)), "aActorRef") 形式,就是使用new 的方式创建一个AActor对象, 注意Props() 是小括号。
3 会创建一个AActor 对象的代理对象 aActorRef , 使用aActorRef 才能发送消息
4 会在底层创建 Dispather Message ,是一个线程池,用于分发消息, 消息是发送到对应的Actor的 MailBox
5 会在底层创建AActor 的MailBox 对象,该对象是一个队列,可接收Dispatcher Message 发送的消息
6 MailBox 实现了Runnable 接口,是一个线程,一直运行并调用Actor的receive 方法,因此当Dispather 发送消息到MailBox时,Actor 在receive 方法就可以得到信息.
7 aActorRef !  "hello", 表示把hello消息发送到A Actor 的mailbox (通过Dispatcher Message 转发)

object ActorsGame {
  def main(args: Array[String]): Unit = {
    //创建ActorSystem
    val actorFactory = ActorSystem("actorFactory")

    // 创建 bobref
    val bobActoref = actorFactory.actorOf(Props[BobActor], "bobActor")

    // 创建 alexref
    val alexActoref = actorFactory.actorOf(Props(new AlexActor(bobActoref)), "alexActor")

    alexActoref ! "start"
  }
}


// 构建  AlexActor 重写receive时候,传入bobref ,这样 alex才能给Bob 发信
class AlexActor (actorref: ActorRef)  extends  Actor {
  val bobref: ActorRef = actorref
  override def receive: Receive = {
    case "start" => {
      println("给自己发消息,开始沟通Bob")
      self ! "hello"
    }
    case "hello" => {
      println("准备给Bob 发消息 ")  // 必须持有bobref
      Thread.sleep(1000)
      bobref ! "hello Bob san"   // 发消息
    }
    case  "hello alex san" =>{
      println("收到Bob 发的消息")
      self !  "hello"
    }
  }
}


class BobActor extends  Actor {
  override def receive: Receive = {
    case "hello Bob san" => {
      println("接到 alex 发的消息")
      Thread.sleep(1000)
      sender() ! "hello alex san" //通过sender() 可以获取到发现消息的actor的ref
    }
  }
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。