初探webflux
WebFlux 是 基于 Reactive Streams 的异步非阻塞模型,使用事件循环(Event Loop)处理请求,通过少量线程(如 CPU 核数)处理高并发流量,适合处理大量长延迟请求(如微服务调用、数据库 IO)。
# 编程方式
WebFlux 提供两种编程方式:
- 注解模式
@Controller
,但返回值类型为Mono<T>
或Flux<T>
- 函数式路由:通过
RouterFunction
和HandlerFunction
定义路由
WebFlux 需要全链路响应式支持
# 适用场景
- 高并发(如 10k+ QPS)
- 延迟敏感型操作(如频繁的微服务调用)
- 非阻塞 IO 操作(如响应式数据库访问)
# 集成方式
加载依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
同时引入 WebFlux 和 Web两个 Starter 时,Spring Boot 优先启用 WebFlux(响应式栈),禁用 Spring MVC。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
代码示例
application.yaml
server:
port: 8081
import lombok.Data;
@Data
public class Result<T> {
private int code;
private T data;
private String message;
public static <T> Result<T> success(T data) {
Result<T> result = new Result<>();
result.code = 200;
result.message = "操作成功";
result.data = data;
return result;
}
public static <T> Result<T> fail() {
Result<T> result = new Result<>();
result.code = 500;
result.message = "操作失败";
return result;
}
}
package com.haveways.learnjava.controller;
import com.haveways.learnjava.domain.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.stream.Stream;
@Slf4j
@RestController
@RequestMapping(value = "/test")
public class TestController {
private int count = 0;
@GetMapping("/")
public Flux<String> test() {
Result<Object> objectResult = new Result<>();
objectResult.setCode(200);
objectResult.setData("操作成功");
objectResult.setMessage("success");
return Flux.create(fluxSink -> {
// 比如这里我们创建了 10 个对象,然后添加到 fluxSink 里
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
log.info("输出:{}",i);
fluxSink.next("success\n");
}
// 结束之后调用完结方法
fluxSink.complete();
});
}
/**
* 已 event-stream 的形式返回
*/
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream() {
return Flux.interval(Duration.ofSeconds(1)).map(l -> "Message " + (count++));
}
@GetMapping("/type")
public Stream<String> type() {
return Stream.of("科技", "旅游");
}
@GetMapping("/list")
public Flux<Result<String>> list() {
return Flux.create(fluxSink -> {
// 比如这里我们创建了 10 个对象,然后添加到 fluxSink 里
for (int i = 0; i < 10; i++) {
// 加入到 sink
fluxSink.next(Result.success("result"+i));
}
// 结束之后调用完结方法
fluxSink.complete();
});
}
@GetMapping("/info")
public Result<String> info() {
return Result.success("测试成功");
}
@PostMapping("/save")
public Mono<Result<String>> save() {
return Mono.just(Result.success("保存成功"));
}
@GetMapping("/get")
public Mono<Result<String>> get() {
// 直接返回对象,也可以使用 create() 方法
return Mono.create(callback -> {
// 成功的时候返回的结果,success() 方法有一个带参数,一个不带参数
// 另外还有 error() 方法,在异常的情况下返回的结果
callback.success(Result.success(null));
});
}
}
@Configuration
public class GreetingRouter {
@Component
public static class GreetingHandler {
public Mono<ServerResponse> hello(ServerRequest request) {
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue("Hello, Spring WebFlux!"));
}
}
@Bean
public RouterFunction<ServerResponse> route(GreetingHandler greetingHandler) {
return RouterFunctions.route(
RequestPredicates.GET("/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), greetingHandler::hello
);
}
}
# Flux 和 Mono 用法
简单示例
Flux<String> flux = Flux.just("a", "b", "c").cache();
//需注意流只能被消费一次
flux.subscribe(System.out::println);
//如果要多次消费需要 .cache();
Flux<String> flux2 = Flux.just("a", "b", "c").cache();
flux2.subscribe(System.out::println);
flux2.subscribe(System.out::println);
//消费并打印最终结果
Flux.just("a", "b", "c").subscribe(System.out::println, System.err::println, () -> System.out.println("Done"));
//异常
Flux.error(new RuntimeException("执行异常"))
.subscribe(System.out::println, e->System.err.println("失败:"+e.getMessage()), () -> System.out.println("Done"));
# 创建 Flux/Mono
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
从值创建 | Flux.just("a", "b", "c") | Mono.just("foo") | 从已知值创建序列 |
从集合创建 | Flux.fromIterable(List.of(1, 2, 3)) | Mono.justOrEmpty(Optional.of(1)) | 从集合或 Optional 转换 |
从数组创建 | Flux.fromArray(new String[]{"a", "b"}) | - | 数组转为 Flux |
从流创建 | Flux.fromStream(Stream.of(1, 2, 3)) | - | 需注意流只能被消费一次 |
空序列 | Flux.empty() | Mono.empty() | 创建空的响应式流 |
错误序列 | Flux.error(new RuntimeException()) | Mono.error(new RuntimeException()) | 直接触发错误 |
范围生成 | Flux.range(1, 5) | - | 生成整数序列(1, 2, 3, 4, 5),第一次参数为初始值,第二个参数为生成数量 |
延迟生成 | Flux.interval(Duration.ofSeconds(1)) | - | 每隔 1 秒生成递增数字(0, 1, 2…) |
# 转换与处理数据
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
合并流(merge) | Flux.merge(flux1, flux2) | - | 合并多个流的数据(按时间顺序) |
连接流(concat) | Flux.concat(flux1, flux2) | Mono.concat(mono1, mono2) | 按顺序连接流(先消费 flux1) |
映射(map) | flux.map(s -> s.toUpperCase()) | mono.map(i -> i * 2) | 同步转换每个元素 return Flux<V> |
异步映射(flatMap) | flux.flatMap(s -> Mono.just(s + "!")) | mono.flatMap(i -> Flux.range(1, i)) | 异步展开为新的流,return Mono<R> |
过滤(filter) | flux.filter(s -> s.length() > 3) | - | 只保留满足条件的元素 |
去重(distinct) | flux.distinct() | - | 去除重复元素 |
缓冲(buffer) | flux.buffer(3) | - | 每 3 个元素聚合为 List,return Flux<List<T>> |
窗口(window) | flux.window(2) | - | 每 2 个元素拆分为新的 Flux, return Flux<Flux<T>> |
减少(reduce) | flux.reduce(0, (a, b) -> a + b) | - | 聚合所有元素(类似 Stream.reduce),p1为初始值,p2为计算函数 return Mono<A> |
收集(collect) | flux.collectList() | - | 将 Flux 转为 return Mono<List> |
# 异步与调度
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
设置运行线程 | flux.subscribeOn(Schedulers.parallel()) | 同左 | 无论在流的哪个位置调用,都会影响整个流的上游 |
切换线程 | flux.publishOn(Schedulers.boundedElastic()) | 同左 | 可以在流的任意位置调用,每次调用都会影响后续的操作符 |
# 错误处理
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
捕获错误(onErrorResume) | flux.onErrorResume(e -> Flux.just("fallback")) | mono.onErrorResume(e -> Mono.just(0)) | 发生错误时切换到一个备用流 |
返回默认值(onErrorReturn) | flux.onErrorReturn("default") | mono.onErrorReturn(0) | 发生错误时返回静态值 |
重试(retry) | flux.retry(3) | mono.retry(2) | 失败后重试 N 次 |
超时(timeout) | flux.timeout(Duration.ofSeconds(5)) | mono.timeout(Duration.ofSeconds(5)) | 超时后触发错误 |
# 组合与依赖
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
依赖另一个 Mono(then) | - | mono1.then(mono2) | 忽略 mono1 结果,执行 mono2 |
等待多个 Mono(zip) | - | Mono.zip(mono1, mono2, (a, b) -> a + b) | 合并多个 Mono 的结果 |
首个完成(first) | Flux.first(flux1, flux2) | - | 选择第一个发出数据的流 |
条件触发(switchIfEmpty) | - | mono.switchIfEmpty(Mono.just(0)) | 如果为空则切换到一个备用 Mono |
# 流控制与背压
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
限流(limitRate) | flux.limitRate(10) | - | 限制每批次请求的元素数量(背压) |
采样(sample) | flux.sample(Duration.ofMillis(300)) | - | 每隔 300ms 发射一个最新元素 |
跳过(skip) | flux.skip(3) | - | 跳过前 N 个元素 |
取前 N 个(take) | flux.take(5) | mono.take(Duration.ofSeconds(1)) | 只取前 N 个元素或限时内的数据 |
# 资源管理
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
资源释放(using) | Flux.using(() -> resource, r -> Flux.just(r), r -> r.close()) | 同左 | 自动管理资源(类似 try-with-resources) |
# 调试与测试
操作 | Flux 示例 | Mono 示例 | 说明 |
---|---|---|---|
日志(log) | flux.log() | mono.log() | 打印流的事件(订阅、数据、错误) |
缓存(cache) | flux.cache() | mono.cache() | 缓存已发出的数据供后续重用 |
阻塞获取(block) | flux.blockFirst() | mono.block() | 测试专用,阻塞直到数据到达 |
# 使用场景总结
场景 | 推荐类型 | 示例 |
---|---|---|
返回单个值(如 REST API) | Mono | Mono<User> getUser() |
返回集合或流式数据 | Flux | Flux<User> getAllUsers() |
实时事件推送(SSE) | Flux | @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Event> |
异步组合多个操作 | Mono.zip | Mono.zip(mono1, mono2).map(tuple -> ...) |
上次更新: 2025/04/25, 10:46:05