当前位置 : 首页 » 文章分类 :  开发  »  Reactor

Reactor

Reactor相关笔记

https://projectreactor.io/

Flux 和 Mono

Flux 和 Mono 是 Reactor 中的两个基本概念。
Flux 表示的是包含 0 到 N 个元素的异步序列。
在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()onError()会被调用。

Mono 表示的是包含 0 或者 1 个元素的异步序列。
该序列中同样可以包含与 Flux 相同的三种类型的消息通知。

Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long>对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

Flux

通过Flux静态方法创建Flux

just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
empty():创建一个不包含任何元素,只发布结束消息的序列。
error(Throwable error):创建一个只包含错误消息的序列。
never():创建一个不包含任何消息通知的序列。
range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[]{1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);

subscribe() 消息处理

当需要处理 Flux 或 Mono 中的消息时,如之前的代码清单所示,可以通过 subscribe 方法来添加相应的订阅逻辑。
在调用 subscribe 方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .subscribe(System.out::println, System.err::println);

使用 Reactor 进行反应式编程
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html

上一篇 Spring-WebFlux

下一篇 Apache-Shiro

阅读
评论
667
阅读预计2分钟
创建日期 2019-07-01
修改日期 2019-07-03
类别
标签

页面信息

location:
protocol:
host:
hostname:
origin:
pathname:
href:
document:
referrer:
navigator:
platform:
userAgent:

评论