Akka-事件驱动新选择
在高并发场景解决方案中,多从线程角度出发,以解决线程安全问题,锁范围又需要多业务场景考虑,何时上锁,何时解锁,何时自动过期等,而事件驱动是从执行什么操作驱动的,在软件系统的设计层面,两者关联性不大,一个强调安全,一个强调策略,那么有没有两者结合解决并发编程难的事件驱动解决方案呢?带着场景解决方案我们走进Akka。
什么是Akka
官网:https://guobinhit.github.io/akka-guide/
Akka 是一个用 Scala 编写的库,用于在 JVM 平台上简化编写具有可容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用,其同时提供了Java 和 Scala 的开发接口。Akka 允许我们专注于满足业务需求,而不是编写初级代码。在 Akka 中,Actor 之间通信的唯一机制就是消息传递。Akka 对 Actor 模型的使用提供了一个抽象级别,使得编写正确的并发、并行和分布式系统更加容易。Actor 模型贯穿了整个 Akka 库,为我们提供了一致的理解和使用它们的方法。
Actor 模型 VS Reactor模型
以Netty的Reactor模型为例(redis同理),本身的Reactor模型即是从事件驱动(NeetyEventLoop)的设计模式,Netty从io角度出发,分发请求,以Reactor对象分发调用链接,结合线程池以此提高多线程高并发的吞吐量。
Actor模型
而Akka的Actor模型重在消息传递,但是第一个特性仍然是事件驱动模型。注意这个多次出现的词,说明Akka的侧重点在于事件驱动
事件驱动模型:
Event-driven model
,Actor 通过响应消息来执行工作。Actor 之间的通信是异步的,允许 Actor 发送消息并继续自己的工作,而不是阻塞等待响应。强隔离原则:
Strong isolation principles
,与 Java 中的常规对象不同,Actor 在调用的方法方面,没有一个公共 API。相反,它的公共 API 是通过 Actor 处理的消息来定义的。这可以防止 Actor 之间共享状态;观察另一个 Actor 状态的唯一方法是向其发送请求状态的消息。位置透明:
Location transparency
,系统通过工厂方法构造 Actor 并返回对实例的引用。因为位置无关紧要,所以 Actor 实例可以启动、停止、移动和重新启动,以向上和向下扩展以及从意外故障中恢复。轻量级:
Lightweight
,每个实例只消耗几百个字节,这实际上允许数百万并发 Actor 存在于一个应用程序中。
第一个Akka的java程序
在官网下创建第一个Akkademo,点击网站-->create a project for me即可
https://developer.lightbend.com/start/?group=akka&project=akka-quickstart-java
Greet
:向Greeter
执行问候的指令;Greeted
:Greeter
用来确认问候发生时回复的消息;SayHello
:GreeterMain
开始执行问候进程的指令;
这样看的话不如直接进入test
@Testpublic void testGreeterActorSendingOfGreeting() { //testKit为单元测试提前加入SpringBean而准备的对象 TestProbe<Greeter.Greeted> testProbe = testKit.createTestProbe(); ActorRef<Greeter.Greet> underTest = testKit.spawn(Greeter.create(), "greeter"); 得到Greet问候(tell)回复消息对象 underTest.tell(new Greeter.Greet("Charles", testProbe.getRef())); //发送消息 testProbe.expectMessage(new Greeter.Greeted("Charles", underTest));}
在哪接收消息?
com.example.Greeter#createReceive在本类的方法中我们记录日志
@Override public Receive<Greet> createReceive() { log.info("在此接收....."); return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build(); } private Behavior<Greet> onGreet(Greet command) { getContext().getLog().info("Hello {}!", command.whom); //#greeter-send-message command.replyTo.tell(new Greeted(command.whom, getContext().getSelf())); //#greeter-send-message return this; }
控制台输出
[2023-03-07 14:49:45,909] [INFO] [akka.event.slf4j.Slf4jLogger] [AkkaQuickstartTest-akka.actor.default-dispatcher-3] [] - Slf4jLogger started[2023-03-07 14:49:46,119] [INFO] [com.example.Greeter] [AkkaQuickstartTest-akka.actor.default-dispatcher-5] [] - 在此接收.....[2023-03-07 14:49:46,123] [INFO] [com.example.Greeter] [AkkaQuickstartTest-akka.actor.default-dispatcher-5] [akka://AkkaQuickstartTest/user/greeter] - Hello Charles![2023-03-07 14:49:46,226] [INFO] [akka.actor.CoordinatedShutdown] [AkkaQuickstartTest-akka.actor.default-dispatcher-3] [CoordinatedShutdown(akka://AkkaQuickstartTest)] - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]Disconnected from the target VM, address: '127.0.0.1:64809', transport: 'socket'
这个消息的发送接收,由消息接收方去处理业务逻辑的方式,与MQ中间件思路无疑,而我们应用Akka时应该考虑到应用场景,既然应用于事件驱动,那么其处理的消息必须要考虑到非实时性场景的。即最终一致性的消息数据适配
集成Springboot
1.引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope></dependency><dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-slf4j_2.11</artifactId> <version>2.5.16</version></dependency>
2.Actor生产者
public class ActorProducer implements IndirectActorProducer { private ApplicationContext context; private String beanName;
public ActorProducer(ApplicationContext context,String beanName){ this.context=context; this.beanName=beanName; }
@Override public Actor produce() { return (Actor) context.getBean(beanName); }
@Override public Class<? extends Actor> actorClass() { return (Class<? extends Actor>) context.getType(beanName); }}
3.构造Props创建ActorRef
public class SpringExt implements Extension { private ApplicationContext context;
public void init(ApplicationContext context) { System.out.println("applicationContext初始化..."); this.context = context; } public Props create(String beanName) { return Props.create(ActorProducer.class, this.context, beanName); }}
4.创建Provider继承AbstractExtensionId
public class SpringExtProvider extends AbstractExtensionId<SpringExt> { private static SpringExtProvider provider = new SpringExtProvider(); public static SpringExtProvider getInstance() { return provider; } @Override public SpringExt createExtension(ExtendedActorSystem extendedActorSystem) { return new SpringExt(); }}
5.初始化ActorSystem
@Configurationpublic class ScanConfig { private final ApplicationContext context; @Autowired public ScanConfig(ApplicationContext context) { this.context = context; } @Bean public ActorSystem createSystem() { ActorSystem system = ActorSystem.create("system"); SpringExtProvider.getInstance().get(system).init(context); return system; }}
6.消息的接收者
@Component@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public class TestActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder().matchAny(o -> { System.out.println("接受到消息:" + o); }).build(); }}
7.单元测试
@SpringBootTest@RunWith(SpringRunner.class)public class LeetcodeApplicationTests {
@Autowired private ActorSystem actorSystem;
@Test public void contextLoads() { ActorRef ref = actorSystem.actorOf(SpringExtProvider.getInstance().get(actorSystem).create("testActor"), "testActor"); ref.tell("Hello,Akka", ActorRef.noSender()); } }
End to End Argument
在End to End Argument论文中指出:
端到端的可靠通信,只能通过通信两端的application层来保证,而中间件(比如SQS, Kinesis, ActiveMQ, 到更底层Netty乃至TCP)只能提高效率,而无法保证通信的可靠性
这里将消息传递的安全保证提升到不可企及的高度,当消息接收到ack之后,仍不能保证此条消息的读取者为本人,为了保证消息为本人接收,还需加入令牌/口令来实现密文的反编译。只要中间件都做不到可靠通信,如果我自己理解的有问题,那么对于中间件来说就失去了原本的意义,中间件本身也不是解决可靠性问题,主要解决的是分布式环境下数据传输、数据访问、应用调度、系统构建和系统集成、流程管理等问题。从全局可靠性来考虑,就要从消息的发送,收,传递,确认等流程来确认,从业务角度出发,而不是强调中间件的不可靠性,而且万事都有不可确定性,如果真的提升到如此角度,那么确实不用开发了。
参考博文:
EDAhttps://softobiz.com/understanding-the-event-driven-architecture/AKKAhttps://doc.akka.io/docs/akka/current/typed/guide/tutorial_1.htmlEnd to End Argumenthttps://zhuanlan.zhihu.com/p/55311553
- 点赞
- 收藏
- 关注作者
评论(0)