响应式编程(Reactive Programming with Reactor)!

举报
喵手 发表于 2026/01/15 17:49:54 2026/01/15
【摘要】 开篇语哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛  今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。  我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,...

开篇语

哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛

  今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。

  我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,希望以这种方式帮助到更多的初学者或者想入门的小伙伴们,同时也能对自己的技术进行沉淀,加以复盘,查缺补漏。

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦。三连即是对作者我写作道路上最好的鼓励与支持!

1) 响应式流(Reactive Streams)与背压(Backpressure)简介

  • 目的:在异步/流式场景中以非阻塞方式传输数据,并且让消费者能对生产者“申请”它能处理的数量(背压),从而避免 OOM / 线程耗尽。Reactive Streams 是 JVM 上的规范,定义了 Publisher/Subscriber/Subscription/Processor 等契约。
  • 核心行为:消费者通过 Subscription.request(n) 控制速率;生产者必须尊重请求量(否则会违背规范并产生资源问题)。常见误区是“认为背压自动解决一切” —— 实际上上游要支持或使用限速/缓冲策略。

2) Reactor(Mono / Flux)基础与常用操作符(关键模式)

  • Mono<T>:0…1 个元素流;Flux<T>:0…N 个元素流。Reactor 是对 Reactive Streams 的实现并提供大量组合操作符。

  • 创建方式Mono.just(..), Mono.empty(), Mono.fromCallable(...), Flux.just(...), Flux.fromIterable(...), Flux.create(...)

  • 变换/组合基础

    • map:同步变换单元素
    • flatMap:异步/并发地把元素映射为 Publisher(常用于 I/O 调用)
    • concatMap:保证顺序(串行地等待上一个完成)
    • zip/merge/concat:多流合并与配对
    • switchMap:切换到最新的 inner publisher(典型用于用户输入的“防抖”)
  • 常见组合模式

    • “串联异步步骤”:Mono.just(x).flatMap(this::step1).flatMap(this::step2)
    • “并行等待多源结果”:Mono.zip(m1, m2, (a,b)->combine)
    • “批量流处理并发限制”:flux.flatMap(f, concurrency)(不要把并发设置得太大)
  • 注意flatMap 默认并行(无序)行为可能导致不可预期的并发副作用;想保序用 concatMap

3) 把阻塞 API 转为非阻塞(实用策略)

现实中最多见的挑战是“第三方库/旧代码是阻塞的” —— 需要把它们包成不会阻塞 Reactor 的非阻塞路径:

  • 1) 最优:使用原生异步/非阻塞实现(例如使用 R2DBC 而不是 JDBC,使用 Reactor Netty / WebClient 而不是 Apache blocking 客户端)。

  • 2) 若无法改库:把阻塞调用移到专用阻塞线程池

    • Mono.fromCallable(() -> blockingCall()).subscribeOn(Schedulers.boundedElastic()) —— 用 boundedElastic() 处理短期阻塞 I/O(它会弹性扩容但有上限)。
    • 切记不要用 Schedulers.parallel()(用于 CPU 任务且标记为非阻塞线程),也不要把阻塞放到 Netty IO 事件线程。
  • 3) 对大批阻塞任务用队列/限流:把请求变成有界队列并在有限并发下调度,避免瞬时并发爆发。

  • 4) 资源池化:对于必须 blocking 的资源(例如 legacy 客户端),用连接池并在 boundedElastic 上执行短时调用。

  • 5) 检测阻塞:在开发/CI 中使用 BlockHound 自动检测非阻塞线程上的阻塞调用(见调试部分)。

4) 调试、测试与错误处理模式

  • 错误处理

    • onErrorResume(e -> fallbackMono):优雅降级。
    • onErrorMap:转换异常类型(便于上层统一处理)。
    • retryWhen(Retry.backoff(...)):带退避的重试(注意幂等性)。
  • 上下文(Context):Reactor 支持 Context(不同于线程局部变量),用于传递请求级别数据(如 traceId、auth info)。使用 contextWriteMono.deferContextual 操作。

  • 调试技巧

    • checkpoint("where")doOnEach/log() 帮助定位堆栈与事件。
    • Hooks.onOperatorDebug()(仅在开发环境)可以增强异常堆栈信息(代价是性能)。
    • BlockHound:在测试/开发时安装 BlockHound 来立刻暴露在非阻塞线程上调用阻塞 API 的位置。
  • 测试:用 reactor-testStepVerifier 做流行为校验(事件顺序、超时、error 分支等)。(单元测试尽量用 StepVerifier 而非 .block(),以保持非阻塞思维)。

  • 性能调试:把 Reactor 事件(订阅/取消/请求)与应用日志、线程堆栈对齐;观察 CPU、GC、Netty 事件循环线程是否被阻塞。

5) 实战:用 Reactor(Spring WebFlux) + R2DBC 构建响应式端点(示例)

下面示例以 Spring WebFlux + Spring Data R2DBC + PostgreSQL 为例(只展示关键类) —— 若你要我生成完整 Maven 项目我可以直接输出可运行的工程。

关键依赖(pom.xml)示例

<!-- spring-boot-starter-webflux, data-r2dbc, r2dbc-postgresql, reactor-test, blockhound -->

(实际依赖我可按你要的 Spring Boot / Java 版本列出)

实体 + repository(Reactive)

@Data
@NoArgsConstructor
@AllArgsConstructor
@Table("users")
public class User {
    @Id private Long id;
    private String name;
    private String email;
}

public interface UserRepository extends ReactiveCrudRepository<User, Long> {
    Flux<User> findByName(String name);
}

(Spring Data R2DBC 提供 reactive repository 支持;也可以用 DatabaseClient/R2dbcEntityTemplate 进行更复杂的 SQL 控制)。

Service(保持非阻塞)

@Service
public class UserService {
    private final UserRepository repo;
    public UserService(UserRepository repo) { this.repo = repo; }

    public Mono<User> createUser(User u){
        // validate -> save (reactive)
        return repo.save(u)
                   .onErrorMap(e -> new RuntimeException("db-fail", e));
    }

    public Flux<User> searchByName(String name){
        return repo.findByName(name)
                   .filter(u -> u.getEmail() != null);
    }
}

Controller(WebFlux)

@RestController
@RequestMapping("/api/users")
public class UserController {
    private final UserService svc;
    public UserController(UserService svc){ this.svc = svc; }

    @PostMapping
    public Mono<ResponseEntity<User>> create(@RequestBody User u){
        return svc.createUser(u)
                  .map(saved -> ResponseEntity.created(URI.create("/api/users/" + saved.getId()))
                                               .body(saved));
    }

    @GetMapping(produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<User> streamByName(@RequestParam String name){
        // Backpressure is automatic via reactive stream protocol.
        return svc.searchByName(name);
    }
}

运维要点

  • 在 Netty 的事件循环线程里绝不能执行阻塞 DB/IO;使用 R2DBC 能把 DB 调用本身做到非阻塞(避免切换到 boundedElastic)。
  • 若不得不调用阻塞 lib(极少情况),Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic())

6) 实战练习建议(进阶可重复实验)

  1. 小练习 A(背压观察):写一个生产者 Flux.interval(...) 以极快频率发射数据,消费端 flatMap 到一个慢速的 Mono.delay,观察如何通过 request(n)/limitRate()onBackpressureBuffer() 控制流量。
  2. 小练习 B(阻塞检测):把一个小 WebFlux 服务引入 BlockHound.install(),故意在 controller 中做 Thread.sleep(100),运行测试观察 BlockHound 报错。
  3. 小练习 C(R2DBC 联动):写一个端点,查询 DB(R2DBC)并用 StepVerifier 在测试里验证数据与 error paths。推荐在 CI 中把 StepVerifier 与 BlockHound 一起跑。
  4. 性能对比:用 wrk 或自写的负载脚本对比同一功能用 blocking(Tomcat + JDBC)与 non-blocking(WebFlux + R2DBC)在高并发下的吞吐与 p99 延迟曲线。

7) 常见陷阱与防护清单

  • 把阻塞工作放到 Netty/parallel 线程 → 线程池耗尽、响应阻塞。防护:BlockHound + code review + 把阻塞放 boundedElastic。
  • 误用 block() / blockFirst() 在生产代码(会把异步流变同步,阻塞线程)
  • 滥用 flatMap 高并发 导致 DB/下游被打爆 —— 使用 flatMap(func, concurrency) 或限流/信号量(Semaphore)保护下游。
  • 忽视 Context 的传递(例如日志 traceId 丢失)——使用 Reactor 的 Context API 正确传播。
  • 盲目缓存 reactive sequence 的 Flux:如果缓存的是热流/共享流(share()),需要考虑订阅/取消语义与内存使用。
  • 错误的 Scheduler 选择Schedulers.parallel() 用于 CPU bound;boundedElastic() 用于阻塞 I/O;不要把 blocking call 放到 event-loop。

8) 推荐阅读(入门 → 进阶)

  • Project Reactor 官方参考指南(核心 API、操作符表)和 operator reference。
  • Reactive Streams 规范(理解订阅/请求/取消语义)。
  • Spring WebFlux 文档(Reactor 与 WebFlux 集成、非阻塞服务器注意点)。
  • R2DBC 官方站点与驱动页(如何将关系 DB 做成非阻塞访问)。
  • BlockHound(检测开发/CI 阶段的阻塞调用)。

… …

文末

好啦,以上就是我这期的全部内容,如果有任何疑问,欢迎下方留言哦,咱们下期见。

… …

学习不分先后,知识不分多少;事无巨细,当以虚心求教;三人行,必有我师焉!!!

wished for you successed !!!


⭐️若喜欢我,就请关注我叭。

⭐️若对您有用,就请点赞叭。
⭐️若有疑问,就请评论留言告诉我叭。


版权声明:本文由作者原创,转载请注明出处,谢谢支持!

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。