RxJava的操作符

举报
周杰伦本人 发表于 2022/07/27 12:58:40 2022/07/27
【摘要】 RxJava的操作符 创建型操作符interval操作符是按照时间间隔来进行输出Observable.interval(10, TimeUnit.MILLISECONDS) .subscribe(aLong -> log.info(aLong.toString()));这段代码每隔10毫秒输出一次defer操作符是延迟创建,当有观察者订阅的时候才会输出消息Ob...

RxJava的操作符

创建型操作符

interval操作符是按照时间间隔来进行输出

Observable.interval(10, TimeUnit.MILLISECONDS)
                .subscribe(aLong -> log.info(aLong.toString()));

这段代码每隔10毫秒输出一次

defer操作符是延迟创建,当有观察者订阅的时候才会输出消息

Observable observable = Observable.just(foo.get());
       
Observable dObservable = Observable.defer(() -> Observable.just(foo.get()));

just操作符用来创建一个主题,并将参数弹出。

Observable.just("hello world" )
                .subscribe(s -> log.info("just string->" + s));

from操作符是以数组作为输入,创建主题对象,并将数组的元素一个一个输出


        String[] items = {"b", "c", "d", "e"};
        Observable.from(items)
                .subscribe(s -> log.info("just string->" + s));

range是整数范围作为输入,包括的区间的上限和下限

bservable.range(1, 8)
                .subscribe(i -> log.info("just int->" + i));

这行代码会输出1到8的所有整数

转换操作符

map操作符是转换的方法,接元素进行转换后弹出

这段代码是将所有元素乘以4之后输出

Observable.range(1, 4)
                .map(i -> i *i)
                .subscribe(i -> log.info(i.toString())); 

scan操作符是将每个数据累积,它会将上一个项的数据累积作为下一项的输入数据。

flatMap操作符是将元素变成一个新的主题后输出

Observable.range(1, 4)
                .flatMap(i -> Observable.range(1, i).toList())
                .subscribe(list -> log.info(list.toString()));

这段代码输出结果为四个数组

过滤型操作符

过滤型操作符顾名思义就是对结果进行过滤

Filter操作符

这段代码输出的是能被5整除的数

   Observable.range(1, 20)
                .filter(integer -> integer%5==0)
                .subscribe(i -> log.info("filter int->" + i));         

distinct是对消息重复数据过滤,已经发出去的元素不再发出

Observable.just("apple", "pair", "banana", "apple", "pair")
                .distinct()  //使用distinct过滤重复元素
                .subscribe(s -> log.info("distinct s->" + s));
           

输出结果为apple pair 和banana

聚合操作符

count操作符就是数据项进行统计,然后输出

String[] items = {"one", "two", "three","fore"};
        Integer count = Observable
                .from(items)
                .count()
                .toBlocking().single();
        log.info("计数的结果为 {}",count);

输出结果为4,Observable.toBlocking()是返回BlockingObservable阻塞实例,然后single()方法是阻塞当前线程,直到输出唯一的一个元素,如果有多个就会抛出异常。

reduce操作符和scan操作符差不多,scan操作符每次都要输出结果,reduce操作符只会输出最后的结果

其他操作符

take操作符是挑选前n个元素,skip操作符是跳过前n个元素

window操作符是按照固定数量n进行分组

List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);

Observable.from(srcList)
        .window(3)
        .flatMap(o -> o.toList())
        .subscribe(list -> log.info(list.toString()));

window(3)就是对数组按3个一组进行分组,window方法还可以有两个参数,
window(3, 1)就是按照3个一组,间隔为1

HystrixCommand

HystrixCommand用来封装RPC的调用,它有异步执行能力和同步执行能力,先进行缓存是否命中,如果启用了缓存,就可以使用缓存响应请求,、再判断熔断器是否打开,如果熔断开启了直接调用HystrixCommand的getFallback()方法进行服务降级处理,如果熔断器没有开启就判断线程池是否满了,然后没有满就开始执行run方法,如果满了同样执行getFallback()方法进行服务降级处理,自定义HystrixCommand类的时候可以重写run()方法和getFallback()方法,当出现错误的时候执行getFallback()方法

Hystrix的健康统计滑动窗口的执行过程是首先HystrixCommand的执行结果会被弹出,然后桶计数流会按照固定时间长度划分滚动窗口,然后按照执行结果进行累积统计

总结

RxJava的操作符有创建型操作符、转换操作符和过滤型操作符,然后分别对具体的方法进行了介绍,有聚合操作符和其他操作符包括window操作符,hystrix利用了window操作符进行健康统计等等,HystrixCommand是重要的一个类,主要实现它的run方法和getFallback()。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。