发布订阅模式及其 在 Rust 语言中的使用
作者:李俊才 (jcLee95):https://blog.csdn.net/qq_28550263?spm=1001.2101.3001.5343
邮箱 :291148484@163.com
本文地址:https://blog.csdn.net/qq_28550263/article/details/130877457
【介绍】:本文介绍发布订阅模式的相关思想,以及第三方模块 EventEmitter 的使用。
推荐阅读:
《发布订阅模式原理及其应用(多种语言实现)》 这篇是我早前的博客,里面使用了 Powershell、Dart、Python、TypeScript 讲解或实现了一个 EventEmitter 对象。但是当时还没有考虑使用 Rust。本文多数内容直接来源于该博文,主要是将语言替换成了 Rust。
@[TOC](
)
1. 引例:从我的一个经历说起
1.1 从 订阅 到 发布
记得一九年的时候我刚刚来到深圳工作,众所周知那时候还没有爆发 新冠疫情,身边的同事们组队去香港购物是常有的事情。但是那会儿我还没有办理港澳通行证,于是年底回老家的时候去当地政务中心办理了。
办证是需要时间的,万万没想到的是二零年春节前夕——新冠疫情爆发了。当我回到深圳后的某一天接到老家政务中心的电话,通知我由于疫情的原因,通信证的办理已经被暂停了并且什么时候恢复办理还不能确定,如果愿意等待,则需要到恢复办证的时候,再通知我们。
—— 这就是一个 发布-订阅模式的典型例子。
发布-订阅 模式 模式中的多方可以分为两类,一类是消息的 发布者,另外一类是消息的 订阅者。在上面的案例中,政务中心的工作人员就是 发布者,当我表示愿意等到恢复通信证办理时,我就 向发布者订阅了 恢复办理的通知(消息),因此我时消息的 订阅者。
这样有什么好处呢:
- 对于我(订阅者)来说,不需要每隔几天就打电话到政务服务中心(发布者)去询问是否恢复办理的消息;
- 对于政务服务中心(发布者)同样也不需要每天回答相同的问题——毕竟何时恢复办理他们也不能确定。
- 一旦恢复办理,政务服务中心(发布者)可以一次性地通知所有和我一样地广大订阅者。
看到了吗——相比于我们去轮询以获取消息,改用发布-订阅 模式 同时节省了我们双方地时间!
多么棒地思想!——运用于程序设计中岂不秒哉?
1.2 如果我不想继续订阅了
有一种情况也是非常常见的,那就是我不愿意继续等待消息了,也有可能是这个消息对我来说已经不重要了。这时我不再希望继续收到来自发布者的恢复办理通知,那就需要 退订。
还记得吗——当我们订阅的时候,是将我们的订阅意愿登记在发布者那边的,这样就能实现发布者在适当的时候通过查询 所有的登记记录 然后逐一通知。
因此如果一旦有用户需要退订,其实很简单,只需要订阅者在他们所登记订阅的“订阅者登陆表”中将订阅信息删除掉即可,这样下一次广播通知的时候就不会再将消息发送给退订的用户。
2. 发布-订阅 的 实践、应用、思考
2.1 实践:用 Rust 来复现上面的场景
如果现在你好像明白 发布-订阅模式 的基本思想了——那么就请成热打铁,跟着我用程序来模拟一下证件办理的情景。
use std::collections::HashSet;
#[derive(Eq, Hash, PartialEq, Clone)]
struct Subscriber {
name: String,
}
struct Publisher {
subscribers: HashSet<Subscriber>,
name: String,
}
impl Publisher {
fn new(name: &str) -> Publisher {
Publisher {
subscribers: HashSet::new(),
name: name.to_string(),
}
}
fn add_subscriber(&mut self, subscriber: &Subscriber) {
self.subscribers.insert(subscriber.clone());
}
fn remove_subscriber(&mut self, subscriber: &Subscriber) {
self.subscribers.remove(subscriber);
println!("\n=> {} 已取消订阅。\n", subscriber.name);
}
fn notify_all(&self, arg: &str) {
for subscriber in &self.subscribers {
subscriber.notify(self, arg);
}
}
}
impl Subscriber {
fn new(name: &str) -> Subscriber {
Subscriber {
name: name.to_string(),
}
}
fn notify(&self, publisher: &Publisher, arg: &str) {
println!(
"\"{}\"(订阅者) 收到的通知来自 \"{}\"(发布者)的通知: {}",
self.name, publisher.name, arg
);
}
}
fn main() {
let mut publisher = Publisher::new("政务服务中心");
let jack_lee = Subscriber::new("jackLee");
let jack_ma = Subscriber::new("jackMa");
publisher.add_subscriber(&jack_lee);
publisher.add_subscriber(&jack_ma);
println!("------- 第一次发布消息 -------");
publisher.notify_all("[通知] 恢复证件办理!");
// 用户 jackMa 取消订阅
publisher.remove_subscriber(&jack_ma);
println!("------- 第二次发布消息 -------");
publisher.notify_all("[通知] 恢复证件办理!");
}
运行该脚本,输出如下:
------- 第一次发布消息 -------
"jackLee"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通知] 恢复证件办理!
"jackMa"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通 知] 恢复证件办理!
=> jackMa 已取消订阅。
------- 第二次发布消息 -------
"jackLee"(订阅者) 收到的通知来自 "政务服务中心"(发布者)的通知: [通知] 恢复证件办理!
可以看到订阅者有两个订阅者实例:jackLee
(本人)、jackMa
(可能是阿里出来的)共同订阅了证件办理信息。
政务服务中心(发布者)第一次发布恢复通知的时候,jackLee
和 jackMa
这两个同学都订阅了消息,因此都受到了来自该中心的通知。后来,jackMa
可能由于已经派小弟火速前往该中心取走了他的证件不需要继续订阅了,于是该中心的工作人(发布者)员调用publisher.remove_subscriber(jackMa)
从该中心的订阅者记录表中移除了 jackMa
的订阅记录。
于是,到了该中心第二次发布消息的时候,jackMa
已经不会再收到恢复证件办理消息,而 jackLee
还可以接收到恢复证件办理的消息。
3. 通用型发布者对象的改进
3.1 从 Subscriber 的服务员 到 事件的发布者
在阅读本小节前请读者先自己尝试回答这个问题:Subscriber 类真的有必要实现吗?
在我们上面的代码中,Subscriber 类 实现了几乎唯一一个有用的方法:update
,它的作用却是给 Publisher 类的 notifyAll
方法进行调用。
从现实生活中给一个解释:
notifyAll
是消息的发布这发布消息的工具,update
是订阅用户接受到的新的定制化消息,比如同样是订阅了售房信息,但是由于不同类别的购房者订阅时所选定的楼层、大小等参数不一样,则这些不同订阅者接收到的发布结果不一样——也就表面原始的消息需要为不同的订阅者做一些 定制化 处理。在之前的代码实现中,这个消息的定制化工作就是使用 Subscriber 类的 update
方法实现的。
很显然,上表面的代码要真正实现定制化,往往不仅是参数值的不同,可能对参数的处理也不一样。因此仅仅依赖参数data
是不合理的。因此我们大概是需要写多个仅仅 update
方法的实现不同的 Subscriber 类——这不太好。略好一点的办法是,让 update
方法接受的不是单纯的数据 data
,而是一个 回调函数 传入 update
方法中。
先不着急修改我们的代码。对于发布者来说,似乎可以提供 更加周到 的服务——直接登记好订阅着的定制化需求处理方式,使用订阅者要求的处理方式处理好定制的消息后,直接告诉订阅者。——因此 update
这个接受表示用户定制化需求处理方式的方法可以直接合并到 发布者那边。
于是 Subscriber 类 就不需要了,现在我们只需要更新一下我们的 Publisher。更新的思路是这样的:
- 添加订阅者时(
Publisher.addSubscriber
)不仅需要记录订阅者名字,还要记录一个对应的响应函数用以消息发布后给订阅者提供定制化服务。
从 Publisher 看,需要登记的内容又多了一些。不过好在 订阅者名称(认为是唯一标识符)和 与之对于的服务(回调函数),是对应的关系,既可以一对一,也可以一对多(表示这个订阅者需要多个定制化服务)。因此我们将 Publisher 的 “记录本”改成下面的类型:
HashMap<String, Vec<EventCallback>>
这个映射(Map)的 key 就表示 订阅者名称,而value 部分是一组函数,表示该订阅者需要的各种服务。
另外,到了这里,对于 Publisher来说,添加订阅者就转化为了 为订阅者订阅各种定制化服务 。同时反过来看,对于某个具体的订阅者 Subscriber,一旦它的服务定制数组 (Function[]
)为空数组,表明他已经没有任何订阅,也不再需要接收发布者的任何消息了。
因此先前我们使用的方法名addSubscriber
不适用了,从含以上换成 addListener
似乎更加合适。
为什么呢? 我们接下来对此做进一步说明。
一直以来,我们聚焦点都在于 发布者 和 订阅者,而忽略了 引起发布者发布的事件。 这个方法接受两个参数,一个是用户名,一个是为用户新增的回调。同时必须指出的是,这个 回调 往往是需要再其调用时接受一些数据的,比如由发布者发布的某些原始数据,他们就像是时时刻刻地 监听着、守候着 发布者 发布一个事件,一旦这个 事件/消息 被发布,就 完成消息发布后为 订阅者 所提供地服务。
换一下思路,我们接着把聚焦点转移到 事件 上来。
其实从现实中看,同一个事件发生,可能意味着可能需 要干很多事,既可以 服务更多的订阅者,也可以干其它任何的工作——我们一味地想着在发布者处登记订阅者的id然后完成订阅者的需求,那么 没有区别为何事件
而需要去发布这些消息!
更好的做法是 不再记录订阅者,而是记录为什么要发布消息给订阅者——也就是记录 事件。这样我们就可以在同一个事件发生的时候,通过一系列的属于该事件函数(可能一个或多个回调函数服务于同一个订阅者),完成该事件的响应,也就是回调函数们。
从这个意义上看,我们所关注所谓的 订阅者 可以看作 一个事件发布后,发布者需要调用的一组函数。而所谓 发布,实际上就是调用这组函数以 完成事件(的回调)。因此我们接下来该用 listener
表示监听事件以待执行的回调函数, event
表示事件名, emit
表示这个事件发生后需要由发布者调用函数的过程。
至此,我们的 Publisher 从一个 Subscriber 的服务员 转型成为了职业 事件的管理者,不妨给它改个名——EventEmitter。
现在我们实现一个最基础的 EventEmitter 对象:
use std::collections::HashMap;
type EventCallback = Box<dyn Fn()>;
struct EventEmitter {
/// 事件-监听器数组容器
_events: HashMap<String, Vec<EventCallback>>,
}
impl EventEmitter {
fn new() -> EventEmitter {
EventEmitter {
events: HashMap::new(),
}
}
/// 添加事件监听器,监听器是一个回调函数,表示用户订阅的具体服务
fn add_listener(&mut self, event: &str, callback: EventCallback) {
let callbacks = self._events.entry(event.to_string()).or_insert(Vec::new());
callbacks.push(callback);
}
/// 移除事件监听器:相当于用户取消订阅
fn remove_listener(&mut self, event: &str, callback: &EventCallback) {
if let Some(callbacks) = self._events.get_mut(event) {
callbacks.retain(|cb| cb != callback);
}
}
/// 触发事件:相当于发布消息或服务,也就是事件发生时,将订阅者订阅的服务一一为订阅者执行
fn emit(&self, event: &str) {
if let Some(callbacks) = self._events.get(event) {
for callback in callbacks {
callback();
}
}
}
}
3.2 一个比较初步的功能增强
不过很多时候我们还不满足于此,比如能够限制监听器的数量。从现实生活中打个比方,就像我们只服务一定数量的客户,一旦订满,由于资源有限,不再接收其它订阅。更贴切实际地说,就像节假日你去旅游景区地酒店订房间,对于酒店来说,一旦所有房间都预定满了,就不再接收新的订阅了——除非,有已经订阅地客人退订了它们先前已经预定地房间。
实现这样一个功能,只需要一个变量 _max_listeners
作为最大监听器数量的控制变量。在外部相应的我们需要允许用户修改和读取该变量的值,因此还要提供 set_max_listeners
和 get_max_listeners
两个方法。
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type EventCallback = Arc<dyn Fn() + Send + Sync>;
pub struct EventEmitter {
_events: Mutex<HashMap<String, Vec<EventCallback>>>,
_max_listeners: usize,
}
impl EventEmitter {
pub fn new() -> Self {
EventEmitter {
_events: Mutex::new(HashMap::new()),
_max_listeners: usize::MAX,
}
}
/// 设置最大监听器数量
/// Set the maximum number of listeners
pub fn set_max_listeners(&mut self, max_listeners: usize) {
self._max_listeners = max_listeners;
}
/// 获取最大监听器数量
pub fn get_max_listeners(&self) -> usize {
self._max_listeners
}
/// 添加事件监听器
pub fn add_listener(&self, event: &str, callback: EventCallback) {
let mut events = self._events.lock().unwrap();
let callbacks = events.entry(event.to_string()).or_insert(Vec::new());
callbacks.push(callback);
}
/// 移除事件监听器
pub fn remove_listener(&self, event: &str, callback: &EventCallback) {
let mut events = self._events.lock().unwrap();
if let Some(callbacks) = events.get_mut(event) {
callbacks.retain(|cb| !Arc::ptr_eq(cb, callback));
}
}
/// 触发事件
pub fn emit(&self, event: &str) {
let events = self._events.lock().unwrap();
if let Some(callbacks) = events.get(event) {
for callback in callbacks {
let callback_clone = callback.clone();
// Spawn a new thread to run each callback asynchronously
std::thread::spawn(move || {
(*callback_clone)();
});
}
}
}
}
4. 使用现成的第三方模块:EventEmitter
4.1 EventEmitter 的安装
你可以直接使用 cargo
包管理器安装 EventEmitter:
cargo add EventEmitter
4.2 在你的项目中使用 EventEmitter
以下是一段包括引入 EventEmitter 和使用的例子:
use std::sync::{Arc};
use EventEmitter::EventEmitter;
fn main() {
let emitter = EventEmitter::new();
let callback1 = Arc::new(|| println!("[event1 emitted]: The first callback of event1 has been called."));
let callback2 = Arc::new(|| println!("[event1 emitted]: The second callback of event1 has been called."));
let callback3 = Arc::new(|| println!("[event2 emitted]: The only one callbask of event2 has been called."));
// Add event listener
emitter.on("event1", callback1);
emitter.on("event1", callback2);
emitter.on("event2", callback3);
let ct1 = emitter.listener_count("event1");
let ct2 = emitter.listener_count("event2");
println!("Number of Listeners for event1 is: {ct1}, \nNumber of Listeners for event2 is: {ct2}");
emitter.emit("event1"); // Emit event1
emitter.emit("event2"); // Emit event1
}
运行项目:
cargo run
可以看到控制台打印结果:
Number of Listeners for event1 is: 2,
Number of Listeners for event2 is: 1
4.3 EventEmitter 实例上的方法
4.3.1 set_max_listeners 方法
pub fn set_max_listeners(&mut self, max_listeners: usize)
设置最大监听器数量。
4.3.2 set_max_listeners 方法
pub fn get_max_listeners(&self) -> usize
获取最大监听器数量。
4.3.3 on 方法
pub fn on(&self, event: &str, callback: Arc<dyn Fn() + Send + Sync>)
添加事件监听器。
4.3.4 add_listener 方法
pub fn add_listener(&self, event: &str, callback: Arc<dyn Fn() + Send + Sync>)
添加事件监听器,是 on 方法的别名。
4.3.5 off 方法
pub fn off(&self, event: &str, callback: &Arc<dyn Fn() + Send + Sync>)
移除事件监听器。
4.3.6 remove_listener 方法
pub fn remove_listener(
&self,
event: &str,
callback: &Arc<dyn Fn() + Send + Sync>
)
移除事件监听器,是 off 方法的别名。
4.3.7 emit 方法
pub fn emit(&self, event: &str)
触发事件。
触发相当于“发布-订阅”模式中的“发布”,一但某个事件被触发,该事件对应得所有监听器函数都会被执行。监听器就相当于“订阅者”。
4.3.8 remove_all_listeners 方法
pub fn remove_all_listeners(&self, event: &str)
移除所有事件的所有监听器。
4.3.9 prepend_listener 方法
pub fn prepend_listener(
&self,
event: &str,
callback: Arc<dyn Fn() + Send + Sync>
)
从指定事件监听器向量的前方插入新的监听器。该方法与使用 on
、add_listener
方法添加新的监听器时,插入监听器向量的方向相反。
4.3.10 listeners 方法
pub fn listeners(&self, event: &str) -> Vec<Arc<dyn Fn() + Send + Sync>>
获取指定事件的监听器。
4.3.11 listener_count 方法
pub fn listener_count(&self, event: &str) -> usize
获取指定事件的监听器数量。
- 点赞
- 收藏
- 关注作者
评论(0)