闲碎记事本 闲碎记事本
首页
  • JAVA
  • Cloudflare
  • 学完再改一遍UI
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

YAN

我要偷偷记录...
首页
  • JAVA
  • Cloudflare
  • 学完再改一遍UI
友链
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • java

    • SpringBoot

    • SpringSecurity

    • MybatisPlus

    • Netty

    • sip

    • 其他

      • MDC 使用
      • 位运算
      • RedisMQ实现
      • 自定义枚举序列化
      • Mybatis使用自定义枚举
      • Jackson反序列化泛型注意点
      • 敏感词过滤算法
      • 线程
      • 并发学习
      • jni使用
      • 关于注释
      • 为什么一个Byte用两个16进制表示
      • JAVA获取系统信息
      • 对extends和super的理解
      • JAVA系统API
      • java探针初探
      • JAVA获取USB信息
      • HashMap初探
      • JAVA远程调试
      • 初探webflux
        • Flux 和 Mono 用法
          • 创建 Flux/Mono
          • 转换与处理数据
          • 异步与调度
          • 错误处理
          • 组合与依赖
          • 流控制与背压
          • 资源管理
          • 调试与测试
          • 使用场景总结
      • SSE示例
  • linux

  • docker

  • redis

  • nginx

  • mysql

  • 其他

  • 环境搭建

  • 知识库
  • java
  • 其他
YAN
2025-04-17
目录

初探webflux

WebFlux 是 基于 Reactive Streams 的异步非阻塞模型,使用事件循环(Event Loop)处理请求,通过少量线程(如 CPU 核数)处理高并发流量,适合处理大量长延迟请求(如微服务调用、数据库 IO)。

# 编程方式

WebFlux 提供两种编程方式:

  • 注解模式 @Controller,但返回值类型为 Mono<T> 或 Flux<T>
  • 函数式路由:通过 RouterFunction 和 HandlerFunction 定义路由

WebFlux 需要全链路响应式支持

# 适用场景

  • 高并发(如 10k+ QPS)
  • 延迟敏感型操作(如频繁的微服务调用)
  • 非阻塞 IO 操作(如响应式数据库访问)

# 集成方式

加载依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

同时引入 WebFlux 和 Web两个 Starter 时,Spring Boot 优先启用 WebFlux(响应式栈),禁用 Spring MVC。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
代码示例

application.yaml

server:
  port: 8081
import lombok.Data;

@Data
public class Result<T> {
    private int code;
    private T data;
    private String  message;

    public static <T> Result<T> success(T data) {
        Result<T> result = new Result<>();
        result.code = 200;
        result.message = "操作成功";
        result.data = data;
        return  result;
    }
    public static <T> Result<T> fail() {
        Result<T> result = new Result<>();
        result.code = 500;
        result.message = "操作失败";
        return  result;
    }
}

package com.haveways.learnjava.controller;

import com.haveways.learnjava.domain.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.stream.Stream;

@Slf4j
@RestController
@RequestMapping(value = "/test")
public class TestController {

    private int count = 0;

    @GetMapping("/")
    public Flux<String> test() {
        Result<Object> objectResult = new Result<>();
        objectResult.setCode(200);
        objectResult.setData("操作成功");
        objectResult.setMessage("success");
        return Flux.create(fluxSink -> {
            // 比如这里我们创建了 10 个对象,然后添加到 fluxSink 里
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                }
                log.info("输出:{}",i);
                fluxSink.next("success\n");
            }
            // 结束之后调用完结方法
            fluxSink.complete();
        });
    }

    /**
     * 已 event-stream 的形式返回
     */
    @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> stream() {
        return Flux.interval(Duration.ofSeconds(1)).map(l -> "Message " + (count++));
    }

    @GetMapping("/type")
    public Stream<String> type() {
        return  Stream.of("科技", "旅游");
    }

    @GetMapping("/list")
    public Flux<Result<String>> list() {
        return Flux.create(fluxSink -> {
            // 比如这里我们创建了 10 个对象,然后添加到 fluxSink 里
            for (int i = 0; i < 10; i++) {
                // 加入到 sink
                fluxSink.next(Result.success("result"+i));
            }
            // 结束之后调用完结方法
            fluxSink.complete();
        });
    }

    @GetMapping("/info")
    public Result<String> info() {
        return Result.success("测试成功");
    }

    @PostMapping("/save")
    public Mono<Result<String>> save() {
        return Mono.just(Result.success("保存成功"));
    }

    @GetMapping("/get")
    public Mono<Result<String>> get() {
        // 直接返回对象,也可以使用 create() 方法
        return Mono.create(callback -> {
            // 成功的时候返回的结果,success() 方法有一个带参数,一个不带参数
            // 另外还有 error() 方法,在异常的情况下返回的结果
            callback.success(Result.success(null));
        });
    }

}
@Configuration
public class GreetingRouter {

    @Component
    public static class GreetingHandler {
        public Mono<ServerResponse> hello(ServerRequest request) {
            return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
                    .body(BodyInserters.fromValue("Hello, Spring WebFlux!"));
        }
    }

    @Bean
    public RouterFunction<ServerResponse> route(GreetingHandler greetingHandler) {
        return RouterFunctions.route(
                RequestPredicates.GET("/hello").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), greetingHandler::hello
        );
    }
}

# Flux 和 Mono 用法

简单示例

Flux<String> flux = Flux.just("a", "b", "c").cache();
//需注意流只能被消费一次
flux.subscribe(System.out::println);

//如果要多次消费需要 .cache();
Flux<String> flux2 = Flux.just("a", "b", "c").cache();
flux2.subscribe(System.out::println);
flux2.subscribe(System.out::println);

//消费并打印最终结果
Flux.just("a", "b", "c").subscribe(System.out::println, System.err::println, () -> System.out.println("Done"));

//异常
Flux.error(new RuntimeException("执行异常"))
 .subscribe(System.out::println, e->System.err.println("失败:"+e.getMessage()), () -> System.out.println("Done"));

# 创建 Flux/Mono

操作 Flux 示例 Mono 示例 说明
从值创建 Flux.just("a", "b", "c") Mono.just("foo") 从已知值创建序列
从集合创建 Flux.fromIterable(List.of(1, 2, 3)) Mono.justOrEmpty(Optional.of(1)) 从集合或 Optional 转换
从数组创建 Flux.fromArray(new String[]{"a", "b"}) - 数组转为 Flux
从流创建 Flux.fromStream(Stream.of(1, 2, 3)) - 需注意流只能被消费一次
空序列 Flux.empty() Mono.empty() 创建空的响应式流
错误序列 Flux.error(new RuntimeException()) Mono.error(new RuntimeException()) 直接触发错误
范围生成 Flux.range(1, 5) - 生成整数序列(1, 2, 3, 4, 5),第一次参数为初始值,第二个参数为生成数量
延迟生成 Flux.interval(Duration.ofSeconds(1)) - 每隔 1 秒生成递增数字(0, 1, 2…)

# 转换与处理数据

操作 Flux 示例 Mono 示例 说明
合并流(merge) Flux.merge(flux1, flux2) - 合并多个流的数据(按时间顺序)
连接流(concat) Flux.concat(flux1, flux2) Mono.concat(mono1, mono2) 按顺序连接流(先消费 flux1)
映射(map) flux.map(s -> s.toUpperCase()) mono.map(i -> i * 2) 同步转换每个元素 return Flux<V>
异步映射(flatMap) flux.flatMap(s -> Mono.just(s + "!")) mono.flatMap(i -> Flux.range(1, i)) 异步展开为新的流,return Mono<R>
过滤(filter) flux.filter(s -> s.length() > 3) - 只保留满足条件的元素
去重(distinct) flux.distinct() - 去除重复元素
缓冲(buffer) flux.buffer(3) - 每 3 个元素聚合为 List,return Flux<List<T>>
窗口(window) flux.window(2) - 每 2 个元素拆分为新的 Flux, return Flux<Flux<T>>
减少(reduce) flux.reduce(0, (a, b) -> a + b) - 聚合所有元素(类似 Stream.reduce),p1为初始值,p2为计算函数 return Mono<A>
收集(collect) flux.collectList() - 将 Flux 转为 return Mono<List>

# 异步与调度

操作 Flux 示例 Mono 示例 说明
设置运行线程 flux.subscribeOn(Schedulers.parallel()) 同左 无论在流的哪个位置调用,都会影响整个流的上游
切换线程 flux.publishOn(Schedulers.boundedElastic()) 同左 可以在流的任意位置调用,每次调用都会影响后续的操作符

# 错误处理

操作 Flux 示例 Mono 示例 说明
捕获错误(onErrorResume) flux.onErrorResume(e -> Flux.just("fallback")) mono.onErrorResume(e -> Mono.just(0)) 发生错误时切换到一个备用流
返回默认值(onErrorReturn) flux.onErrorReturn("default") mono.onErrorReturn(0) 发生错误时返回静态值
重试(retry) flux.retry(3) mono.retry(2) 失败后重试 N 次
超时(timeout) flux.timeout(Duration.ofSeconds(5)) mono.timeout(Duration.ofSeconds(5)) 超时后触发错误

# 组合与依赖

操作 Flux 示例 Mono 示例 说明
依赖另一个 Mono(then) - mono1.then(mono2) 忽略 mono1 结果,执行 mono2
等待多个 Mono(zip) - Mono.zip(mono1, mono2, (a, b) -> a + b) 合并多个 Mono 的结果
首个完成(first) Flux.first(flux1, flux2) - 选择第一个发出数据的流
条件触发(switchIfEmpty) - mono.switchIfEmpty(Mono.just(0)) 如果为空则切换到一个备用 Mono

# 流控制与背压

操作 Flux 示例 Mono 示例 说明
限流(limitRate) flux.limitRate(10) - 限制每批次请求的元素数量(背压)
采样(sample) flux.sample(Duration.ofMillis(300)) - 每隔 300ms 发射一个最新元素
跳过(skip) flux.skip(3) - 跳过前 N 个元素
取前 N 个(take) flux.take(5) mono.take(Duration.ofSeconds(1)) 只取前 N 个元素或限时内的数据

# 资源管理

操作 Flux 示例 Mono 示例 说明
资源释放(using) Flux.using(() -> resource, r -> Flux.just(r), r -> r.close()) 同左 自动管理资源(类似 try-with-resources)

# 调试与测试

操作 Flux 示例 Mono 示例 说明
日志(log) flux.log() mono.log() 打印流的事件(订阅、数据、错误)
缓存(cache) flux.cache() mono.cache() 缓存已发出的数据供后续重用
阻塞获取(block) flux.blockFirst() mono.block() 测试专用,阻塞直到数据到达

# 使用场景总结

场景 推荐类型 示例
返回单个值(如 REST API) Mono Mono<User> getUser()
返回集合或流式数据 Flux Flux<User> getAllUsers()
实时事件推送(SSE) Flux @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Event>
异步组合多个操作 Mono.zip Mono.zip(mono1, mono2).map(tuple -> ...)
上次更新: 2025/05/14, 01:34:05
JAVA远程调试
SSE示例

← JAVA远程调试 SSE示例→

最近更新
01
Caddy操作指南
04-25
02
Swap空间
04-22
03
Alist使用
04-21
更多文章>
Theme by Vdoing | Copyright © 2022-2025 YAN | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式