响应式编程(Reactive Programming with Reactor)!
开篇语
哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛
今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。
我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出,希望以这种方式帮助到更多的初学者或者想入门的小伙伴们,同时也能对自己的技术进行沉淀,加以复盘,查缺补漏。
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦。三连即是对作者我写作道路上最好的鼓励与支持!
1) 响应式流(Reactive Streams)与背压(Backpressure)简介
- 目的:在异步/流式场景中以非阻塞方式传输数据,并且让消费者能对生产者“申请”它能处理的数量(背压),从而避免 OOM / 线程耗尽。Reactive Streams 是 JVM 上的规范,定义了
Publisher/Subscriber/Subscription/Processor等契约。 - 核心行为:消费者通过
Subscription.request(n)控制速率;生产者必须尊重请求量(否则会违背规范并产生资源问题)。常见误区是“认为背压自动解决一切” —— 实际上上游要支持或使用限速/缓冲策略。
2) Reactor(Mono / Flux)基础与常用操作符(关键模式)
-
Mono<T>:0…1 个元素流;Flux<T>:0…N 个元素流。Reactor 是对 Reactive Streams 的实现并提供大量组合操作符。 -
创建方式:
Mono.just(..),Mono.empty(),Mono.fromCallable(...),Flux.just(...),Flux.fromIterable(...),Flux.create(...)。 -
变换/组合基础:
map:同步变换单元素flatMap:异步/并发地把元素映射为 Publisher(常用于 I/O 调用)concatMap:保证顺序(串行地等待上一个完成)zip/merge/concat:多流合并与配对switchMap:切换到最新的 inner publisher(典型用于用户输入的“防抖”)
-
常见组合模式:
- “串联异步步骤”:
Mono.just(x).flatMap(this::step1).flatMap(this::step2) - “并行等待多源结果”:
Mono.zip(m1, m2, (a,b)->combine) - “批量流处理并发限制”:
flux.flatMap(f, concurrency)(不要把并发设置得太大)
- “串联异步步骤”:
-
注意:
flatMap默认并行(无序)行为可能导致不可预期的并发副作用;想保序用concatMap。
3) 把阻塞 API 转为非阻塞(实用策略)
现实中最多见的挑战是“第三方库/旧代码是阻塞的” —— 需要把它们包成不会阻塞 Reactor 的非阻塞路径:
-
1) 最优:使用原生异步/非阻塞实现(例如使用 R2DBC 而不是 JDBC,使用 Reactor Netty / WebClient 而不是 Apache blocking 客户端)。
-
2) 若无法改库:把阻塞调用移到专用阻塞线程池
Mono.fromCallable(() -> blockingCall()).subscribeOn(Schedulers.boundedElastic())—— 用boundedElastic()处理短期阻塞 I/O(它会弹性扩容但有上限)。- 切记不要用
Schedulers.parallel()(用于 CPU 任务且标记为非阻塞线程),也不要把阻塞放到 Netty IO 事件线程。
-
3) 对大批阻塞任务用队列/限流:把请求变成有界队列并在有限并发下调度,避免瞬时并发爆发。
-
4) 资源池化:对于必须 blocking 的资源(例如 legacy 客户端),用连接池并在 boundedElastic 上执行短时调用。
-
5) 检测阻塞:在开发/CI 中使用 BlockHound 自动检测非阻塞线程上的阻塞调用(见调试部分)。
4) 调试、测试与错误处理模式
-
错误处理:
onErrorResume(e -> fallbackMono):优雅降级。onErrorMap:转换异常类型(便于上层统一处理)。retryWhen(Retry.backoff(...)):带退避的重试(注意幂等性)。
-
上下文(Context):Reactor 支持
Context(不同于线程局部变量),用于传递请求级别数据(如 traceId、auth info)。使用contextWrite与Mono.deferContextual操作。 -
调试技巧:
checkpoint("where")与doOnEach/log()帮助定位堆栈与事件。Hooks.onOperatorDebug()(仅在开发环境)可以增强异常堆栈信息(代价是性能)。- BlockHound:在测试/开发时安装 BlockHound 来立刻暴露在非阻塞线程上调用阻塞 API 的位置。
-
测试:用
reactor-test的StepVerifier做流行为校验(事件顺序、超时、error 分支等)。(单元测试尽量用 StepVerifier 而非.block(),以保持非阻塞思维)。 -
性能调试:把 Reactor 事件(订阅/取消/请求)与应用日志、线程堆栈对齐;观察 CPU、GC、Netty 事件循环线程是否被阻塞。
5) 实战:用 Reactor(Spring WebFlux) + R2DBC 构建响应式端点(示例)
下面示例以 Spring WebFlux + Spring Data R2DBC + PostgreSQL 为例(只展示关键类) —— 若你要我生成完整 Maven 项目我可以直接输出可运行的工程。
关键依赖(pom.xml)示例
<!-- spring-boot-starter-webflux, data-r2dbc, r2dbc-postgresql, reactor-test, blockhound -->
(实际依赖我可按你要的 Spring Boot / Java 版本列出)
实体 + repository(Reactive)
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table("users")
public class User {
@Id private Long id;
private String name;
private String email;
}
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByName(String name);
}
(Spring Data R2DBC 提供 reactive repository 支持;也可以用 DatabaseClient/R2dbcEntityTemplate 进行更复杂的 SQL 控制)。
Service(保持非阻塞)
@Service
public class UserService {
private final UserRepository repo;
public UserService(UserRepository repo) { this.repo = repo; }
public Mono<User> createUser(User u){
// validate -> save (reactive)
return repo.save(u)
.onErrorMap(e -> new RuntimeException("db-fail", e));
}
public Flux<User> searchByName(String name){
return repo.findByName(name)
.filter(u -> u.getEmail() != null);
}
}
Controller(WebFlux)
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService svc;
public UserController(UserService svc){ this.svc = svc; }
@PostMapping
public Mono<ResponseEntity<User>> create(@RequestBody User u){
return svc.createUser(u)
.map(saved -> ResponseEntity.created(URI.create("/api/users/" + saved.getId()))
.body(saved));
}
@GetMapping(produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<User> streamByName(@RequestParam String name){
// Backpressure is automatic via reactive stream protocol.
return svc.searchByName(name);
}
}
运维要点:
- 在 Netty 的事件循环线程里绝不能执行阻塞 DB/IO;使用 R2DBC 能把 DB 调用本身做到非阻塞(避免切换到 boundedElastic)。
- 若不得不调用阻塞 lib(极少情况),
Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic())。
6) 实战练习建议(进阶可重复实验)
- 小练习 A(背压观察):写一个生产者
Flux.interval(...)以极快频率发射数据,消费端flatMap到一个慢速的Mono.delay,观察如何通过request(n)/limitRate()或onBackpressureBuffer()控制流量。 - 小练习 B(阻塞检测):把一个小 WebFlux 服务引入
BlockHound.install(),故意在 controller 中做Thread.sleep(100),运行测试观察 BlockHound 报错。 - 小练习 C(R2DBC 联动):写一个端点,查询 DB(R2DBC)并用
StepVerifier在测试里验证数据与 error paths。推荐在 CI 中把 StepVerifier 与 BlockHound 一起跑。 - 性能对比:用
wrk或自写的负载脚本对比同一功能用 blocking(Tomcat + JDBC)与 non-blocking(WebFlux + R2DBC)在高并发下的吞吐与 p99 延迟曲线。
7) 常见陷阱与防护清单
- 把阻塞工作放到 Netty/parallel 线程 → 线程池耗尽、响应阻塞。防护:BlockHound + code review + 把阻塞放 boundedElastic。
- 误用
block()/blockFirst()在生产代码(会把异步流变同步,阻塞线程) - 滥用
flatMap高并发 导致 DB/下游被打爆 —— 使用flatMap(func, concurrency)或限流/信号量(Semaphore)保护下游。 - 忽视 Context 的传递(例如日志 traceId 丢失)——使用 Reactor 的 Context API 正确传播。
- 盲目缓存 reactive sequence 的
Flux:如果缓存的是热流/共享流(share()),需要考虑订阅/取消语义与内存使用。 - 错误的 Scheduler 选择:
Schedulers.parallel()用于 CPU bound;boundedElastic()用于阻塞 I/O;不要把 blocking call 放到 event-loop。
8) 推荐阅读(入门 → 进阶)
- Project Reactor 官方参考指南(核心 API、操作符表)和 operator reference。
- Reactive Streams 规范(理解订阅/请求/取消语义)。
- Spring WebFlux 文档(Reactor 与 WebFlux 集成、非阻塞服务器注意点)。
- R2DBC 官方站点与驱动页(如何将关系 DB 做成非阻塞访问)。
- BlockHound(检测开发/CI 阶段的阻塞调用)。
… …
文末
好啦,以上就是我这期的全部内容,如果有任何疑问,欢迎下方留言哦,咱们下期见。
… …
学习不分先后,知识不分多少;事无巨细,当以虚心求教;三人行,必有我师焉!!!
wished for you successed !!!
⭐️若喜欢我,就请关注我叭。
⭐️若对您有用,就请点赞叭。
⭐️若有疑问,就请评论留言告诉我叭。
版权声明:本文由作者原创,转载请注明出处,谢谢支持!
- 点赞
- 收藏
- 关注作者
评论(0)