反应式编程与RxPY

举报
Jiang Yuan 发表于 2019/12/30 17:25:28 2019/12/30
【摘要】 一、反应式编程(Reactive Programming)1、简介 反应式编程是一种编程思想、编程方式,是为了简化并发编程而出现的。与传统的处理方式相比,它能够基于数据流中的事件进行反应处理。例如:a+b=c的场景,在传统编程方式下如果a、b发生变化,那么我们需要重新计算a+b来得到c的新值。而反应式编程中,我们不需要重新计算,a、b的变化事件会触发c的值自动更新。这种方式类似于...

一、反应式编程(Reactive Programming)

1、简介   

    反应式编程是一种编程思想、编程方式,是为了简化并发编程而出现的。与传统的处理方式相比,它能够基于数据流中的事件进行反应处理。例如:a+b=c的场景,在传统编程方式下如果a、b发生变化,那么我们需要重新计算a+b来得到c的新值。而反应式编程中,我们不需要重新计算,a、b的变化事件会触发c的值自动更新。这种方式类似于我们在消息中间件中常见的发布/订阅模式。由流发布事件,而我们的代码逻辑作为订阅方基于事件进行处理,并且是异步处理的。

    

    反应式编程中,最基本的处理单元是事件流(事件流是不可变的,对流进行操作只会返回新的流)中的事件。流中的事件包括正常事件(对象代表的数据、数据流结束标识)和异常事件(异常对象,例如Exception)。同时,只有当订阅者第一次发布者,发布者发布的事件流才会被消费,后续的订阅者只能从订阅点开始消费,但是我们可以通过背压、流控等方式控制消费。


2、反应式编程与函数式编程(Functional Programming)

    函数式编程中的函数指的并不是编程语言中的函数(或方法),它指的是数学意义上的函数,即映射关系(如:y = f(x)),就是 y 和 x 的对应关系。函数式编程关心数据的映射。由于变量值是不可变的,对于值的操作并不是修改原来的值,而是修改新产生的值,原来的值保持不便。因此,相对于函数式编程来说,反应式编程基于消息驱动,在响应性、弹性等方面更好。


二、RxPY

    RxPY是一个使用可观察集合和查询运算函数实现的基于异步和事件的python库,是reactiveX的python语言实现。

1、ReactiveX

    Reactive Extensions 是一系列基于异步和事件程序构成的python库,它扩展了观察者模式来支持时间/事件序列,添加操作符,来允许开发人员在从低级线程/同步/线程安全/同步数据结构和非阻塞IO等进行抽象时,能够声明式地构成序列。ReactiveX主要包括Observalbe、Operators、Single、Subject、Scheduler五个功能模块,开发者使用Obsservables表示异步数据流,使用Operators查询异步数据流,使用Schedulers在数据/事件流中参数同步。

    它有时会被称为“函数式反应式编程“,但这是一个误称。ReactiveX可能是函数式的,也可能是反应式的,但是”函数式反应式编程“是全然不同的。一个主要的区别就是”函数式反应式编程“操作的是随时间不断变化的数值,而ReactiveX操作的是时间上离散地发送的数据。


2、RxPY简介

    使用Rx可以表示多个异步数据流(来自各种来源,例如股票报价,推文,计算机事件,Web服务请求等),并使用Observer对象订阅事件流。每当发生事件时,Observable都会通知订阅的Observer实例。同时也可以在源Observable和使用者Observer之间进行各种转换。 由于可观察序列是数据流,因此可以使用标准查询运算符来查询它们,这些查询运算符实现为可以与管道运算符链接的函数。因此,使用这些运算符可以轻松地对多个事件进行过滤,映射,缩小,组合和执行基于时间的操作。此外,还有许多其他特定于响应式流的运算符,可用来编写功能强大的查询。取消,异常和同步也可以通过使用专用运算符来正常处理。


3、使用

通过下面用例来介绍RxPY的使用:”

(1)创建Observable

from rx import create
def push_five_strings(observer, scheduler):
    observer.on_next("Alpha")
    observer.on_next("Beta")
    observer.on_next("Gamma")
    observer.on_next("Delta")
    observer.on_next("Epsilon")
    observer.on_completed()
    
source = create(push_five_strings)
source.subscribe(
    on_next = lambda i: print("Received {0}".format(i)),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),)

(2)Observable factories

from rx import of

source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

source.subscribe(
    on_next = lambda i: print("Received {0}".format(i)),
    on_error = lambda e: print("Error Occurred: {0}".format(e)),
    on_completed = lambda: print("Done!"),)

(3)Operators

from rx import of, operators as op
source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
composed = source.pipe(
    op.map(lambda s: len(s)),
    op.filter(lambda i: i >= 5))
    
composed.subscribe(lambda value: print("Received {0}".format(value)))

输出:

Received 5
Received 5
Received 5
Received 7

(4)定制Operator

import rx
def lowercase():
    def _lowercase(source):
        def subscribe(observer, scheduler = None):
            def on_next(value):
                observer.on_next(value.lower())

            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler)
        return rx.create(subscribe)
    return _lowercase
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
        lowercase()
     ).subscribe(lambda value: print("Received {0}".format(value)))

输出:

Received alpha
Received beta
Received gamma
Received delta
Received epsilon

(5)同步

import multiprocessing
import random
import time
from threading import current_thread

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops

def intense_calculation(value):
    # sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
    time.sleep(random.randint(5, 20) * 0.1)
    return value
    
# calculate number of CPU's, then create a ThreadPoolScheduler with that number of threads
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

# Create Process 1
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
    ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler)
    ).subscribe(
    on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
    on_error=lambda e: print(e),
    on_completed=lambda: print("PROCESS 1 done!"),
)

# Create Process 2
rx.range(1, 10).pipe(
    ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler)
    ).subscribe(
    on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)),
    on_error=lambda e: print(e),
    on_completed=lambda: print("PROCESS 2 done!"),
)
# Create Process 3, which is infinite
rx.interval(1).pipe(
    ops.map(lambda i: i * 100),
    ops.observe_on(pool_scheduler),
    ops.map(lambda s: intense_calculation(s)),).subscribe(
    on_next=lambda i: print("PROCESS 3: {0} {1}".format(current_thread().name, i)),
    on_error=lambda e: print(e),)input("Press any key to exit\n")



————————————————

参考:

(1)https://blog.csdn.net/li_xiao_dai/article/details/80841642

(2)https://rxpy.readthedocs.io/en/latest/rationale.html





【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。