原生实现异步处理利器 —— Observable

举报
掘金安东尼 发表于 2022/09/16 17:00:23 2022/09/16
【摘要】 本篇带来用原生实现 Observable,一探内部究竟!!实现步骤解析如下:Observable 应该是一个 Subject 新对象类型;并且 Subject 对象有subscribe和next两个函数;subscribe 由 observers (观察者)调用,来订阅可观察的数据流;next由 Subject owner 调用,实现推送/发布新数据;Subject owner 应该知道 o...

本篇带来用原生实现 Observable,一探内部究竟!!

实现步骤解析如下:

  1. Observable 应该是一个 Subject 新对象类型;
  2. 并且 Subject 对象有subscribenext两个函数;
  3. subscribe 由 observers (观察者)调用,来订阅可观察的数据流;
  4. nextSubject owner 调用,实现推送/发布新数据;
  5. Subject owner 应该知道 observers 何时监听数据生效;
  6. 另外,Subject owner 也应该知道 observers 何时不再监听了;
  7. 再看 observer,它也应该可以随时被取消;
  8. 可以定义一个新的对象,Subscription
  9. Subscription 包含 unsubscribe 函数;
  10. 每个 observer 想要停止监听来自 Subject 的数据流时,可以调用unsubscribe实现;

有了以上思路之后,我们可以进一步来看代码实现~


代码实现:

Subscription

let Subscription = function(handlerId, unsubscribeNotificationCallback) {
	let self = this;

	self.unsubscribe = () => {
		if(unsubscribeNotificationCallback) {
			unsubscribeNotificationCallback(handlerId);
		}
	};
	
	return self;
};

注意:Subscription 仅在 unsubscribe 被调用时,通知 Subject


再看 Subject 完整代码实现:

let Subject = function(subscribersStateChangeNotificationCallback) {
	let self = this;
	
	let handlers = {};
	
	Object.defineProperty(self, "subscribersFound", {
		get() {
			let found = false;
			
			for(const prop in handlers) {
				if(handlers.hasOwnProperty(prop)) {
					found = true;
					break;
				}
			}
			
			return found;
		}
	});
	
	Object.defineProperty(self, "subscribersCount", {
		get() {
			let count = 0;
			
			for(const prop in handlers) {
				if(handlers.hasOwnProperty(prop)) {
					count++;
				}
			}
			
			return count;
		}
	});
	
	let unsubscribeNotificationCallback = (handlerId) => {
		if(handlerId && handlerId !== '' && handlers.hasOwnProperty(handlerId)) {
			delete handlers[handlerId];
			
			if(subscribersStateChangeNotificationCallback && !self.subscribersFound) {
				subscribersStateChangeNotificationCallback(false);
			}
		}
	};
	
	self.subscribe = (handler) => {
		let handlerId = createGuid();
		handlers[handlerId] = handler;
		
		if(subscribersStateChangeNotificationCallback && self.subscribersCount === 1) {
			subscribersStateChangeNotificationCallback(true);
		}
		
		return new Subscription(handlerId, unsubscribeNotificationCallback);
	};
	
	self.next = (data) => {
		for(const handlerId in handlers) {
			handlers[handlerId](data);
		}
	};
	
	return self;
};

let createGuid = function() {  
   return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {  
      var r = Math.random()*16|0, v = c === 'x' ? r : (r&0x3|0x8);  
      return v.toString(16);  
   });  
};

Subject Owner 实现:

.
.
.
let subscribersStateChangeNotificationCallback = (subscriberFound) => {
	if(!subscriberFound && isNowWatching) {
		stopWatching();
		isNowWatching = false;
	} else if(subscriberFound && !isNowWatching) {
		startWatching();
	}
};

self.data = new Subject(subscribersStateChangeNotificationCallback);
.
.
.
self.data.next(self.snapshot.data);
.
.
.

Observer 实现:

.
.
.
const dashboardServiceSubscription = myDashboardService.data.subscribe((data) => {
	...
});
.
.
.
dashboardServiceSubscription.unsubscribe();
.
.
.

小结:我们可以看到实现关键是 Subject 对象,更重要的是 发布订阅 的过程!当然,也不能忘了 取消订阅 的功能;

发布和订阅模式来处理异步可以忽视掉时间这个维度,就是不用管时间上的先后,就保证了顺序!这一点,在前面一篇函数式编程中也讲过:《XDM,JS如何函数式编程?看这就够了!(六)》 —— 减少时间状态!

不得不说,都是相通的~~ Σ(⊙▽⊙"a


OK,就是这样!这就是用原生模拟 Observable 的实现过程!

撰文不易,点赞鼓励👍👍👍👍👍👍

我是掘金安东尼,公众号同名,日拱一卒、日掘一金,再会~

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。