反应式编程与RxPY
一、反应式编程(Reactive Programming)
1、简介
反应式编程是一种编程思想、编程方式,是为了简化并发编程而出现的。与传统的处理方式相比,它能够基于数据流中的事件进行反应处理。例如:a+b=c的场景,在传统编程方式下如果a、b发生变化,那么我们需要重新计算a+b来得到c的新值。而反应式编程中,我们不需要重新计算,a、b的变化事件会触发c的值自动更新。这种方式类似于我们在消息中间件中常见的发布/订阅模式。由流发布事件,而我们的代码逻辑作为订阅方基于事件进行处理,并且是异步处理的。
反应式编程中,最基本的处理单元是事件流(事件流是不可变的,对流进行操作只会返回新的流)中的事件。流中的事件包括正常事件(对象代表的数据、数据流结束标识)和异常事件(异常对象,例如Exception)。同时,只有当订阅者第一次发布者,发布者发布的事件流才会被消费,后续的订阅者只能从订阅点开始消费,但是我们可以通过背压、流控等方式控制消费。
2、反应式编程与函数式编程(Functional Programming)
函数式编程中的函数指的并不是编程语言中的函数(或方法),它指的是数学意义上的函数,即映射关系(如:y = f(x)),就是 y 和 x 的对应关系。函数式编程关心数据的映射。由于变量值是不可变的,对于值的操作并不是修改原来的值,而是修改新产生的值,原来的值保持不便。因此,相对于函数式编程来说,反应式编程基于消息驱动,在响应性、弹性等方面更好。
二、RxPY
RxPY是一个使用可观察集合和查询运算函数实现的基于异步和事件的python库,是reactiveX的python语言实现。
1、ReactiveX
Reactive Extensions 是一系列基于异步和事件程序构成的python库,它扩展了观察者模式来支持时间/事件序列,添加操作符,来允许开发人员在从低级线程/同步/线程安全/同步数据结构和非阻塞IO等进行抽象时,能够声明式地构成序列。ReactiveX主要包括Observalbe、Operators、Single、Subject、Scheduler五个功能模块,开发者使用Obsservables表示异步数据流,使用Operators查询异步数据流,使用Schedulers在数据/事件流中参数同步。
它有时会被称为“函数式反应式编程“,但这是一个误称。ReactiveX可能是函数式的,也可能是反应式的,但是”函数式反应式编程“是全然不同的。一个主要的区别就是”函数式反应式编程“操作的是随时间不断变化的数值,而ReactiveX操作的是时间上离散地发送的数据。
2、RxPY简介
使用Rx可以表示多个异步数据流(来自各种来源,例如股票报价,推文,计算机事件,Web服务请求等),并使用Observer对象订阅事件流。每当发生事件时,Observable都会通知订阅的Observer实例。同时也可以在源Observable和使用者Observer之间进行各种转换。 由于可观察序列是数据流,因此可以使用标准查询运算符来查询它们,这些查询运算符实现为可以与管道运算符链接的函数。因此,使用这些运算符可以轻松地对多个事件进行过滤,映射,缩小,组合和执行基于时间的操作。此外,还有许多其他特定于响应式流的运算符,可用来编写功能强大的查询。取消,异常和同步也可以通过使用专用运算符来正常处理。
3、使用
通过下面用例来介绍RxPY的使用:”
(1)创建Observable
from rx import create def push_five_strings(observer, scheduler): observer.on_next("Alpha") observer.on_next("Beta") observer.on_next("Gamma") observer.on_next("Delta") observer.on_next("Epsilon") observer.on_completed() source = create(push_five_strings) source.subscribe( on_next = lambda i: print("Received {0}".format(i)), on_error = lambda e: print("Error Occurred: {0}".format(e)), on_completed = lambda: print("Done!"),)
(2)Observable factories
from rx import of source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") source.subscribe( on_next = lambda i: print("Received {0}".format(i)), on_error = lambda e: print("Error Occurred: {0}".format(e)), on_completed = lambda: print("Done!"),)
(3)Operators
from rx import of, operators as op source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") composed = source.pipe( op.map(lambda s: len(s)), op.filter(lambda i: i >= 5)) composed.subscribe(lambda value: print("Received {0}".format(value)))
输出:
Received 5 Received 5 Received 5 Received 7
(4)定制Operator
import rx def lowercase(): def _lowercase(source): def subscribe(observer, scheduler = None): def on_next(value): observer.on_next(value.lower()) return source.subscribe( on_next, observer.on_error, observer.on_completed, scheduler) return rx.create(subscribe) return _lowercase rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe( lowercase() ).subscribe(lambda value: print("Received {0}".format(value)))
输出:
Received alpha Received beta Received gamma Received delta Received epsilon
(5)同步
import multiprocessing import random import time from threading import current_thread import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as ops def intense_calculation(value): # sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation time.sleep(random.randint(5, 20) * 0.1) return value # calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads optimal_thread_count = multiprocessing.cpu_count() pool_scheduler = ThreadPoolScheduler(optimal_thread_count) # Create Process 1 rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe( ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler) ).subscribe( on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)), on_error=lambda e: print(e), on_completed=lambda: print("PROCESS 1 done!"), ) # Create Process 2 rx.range(1, 10).pipe( ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler) ).subscribe( on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)), on_error=lambda e: print(e), on_completed=lambda: print("PROCESS 2 done!"), ) # Create Process 3, which is infinite rx.interval(1).pipe( ops.map(lambda i: i * 100), ops.observe_on(pool_scheduler), ops.map(lambda s: intense_calculation(s)),).subscribe( on_next=lambda i: print("PROCESS 3: {0} {1}".format(current_thread().name, i)), on_error=lambda e: print(e),)input("Press any key to exit\n")
————————————————
参考:
(1)https://blog.csdn.net/li_xiao_dai/article/details/80841642
(2)https://rxpy.readthedocs.io/en/latest/rationale.html
- 点赞
- 收藏
- 关注作者
评论(0)