响应式编程简介
响应式编程简介
Reactor文档:Reactor 3 Reference Guide (projectreactor.io)
反应式编程范式通常以面向对象语言的形式呈现,作为观察者设计模式的扩展。您还可以将主要反应流模式与熟悉的迭代器设计模式进行比较,因为所有这些库中的 Iterable
- Iterator
对都具有对偶性。一个主要区别是,迭代器是基于拉式的,而反应流是基于推式的。
使用迭代器是一种命令式编程模式,即使访问值的方法完全由 Iterable
负责。事实上,由开发人员选择何时访问序列中的 next()
项。在反应式流中,上述对的等效项是 Publisher-Subscriber
。但正是 Publisher
通知订阅者新的可用值,而这个推送方面是反应的关键。此外,应用于推送值的操作是声明式表达的,而不是命令式的:程序员表达计算的逻辑,而不是描述其精确的控制流。
除了推送值之外,还以明确定义的方式涵盖了错误处理和完成方面。 Publisher
可以将新值推送到其 Subscriber
(通过调用 onNext
),但也可以发出错误信号(通过调用 onError
)或完成(通过调用 onComplete
)。错误和完成都会终止序列。这可以总结如下:
onNext x 0..N [onError | onComplete]
这种方法非常灵活。该模式支持没有值、一个值或 n 个值(包括无限序列的值,例如时钟的连续滴答声)的用例。
为什么需要响应式编程
阻塞可能会造成浪费
现代应用程序可以覆盖大量并发用户,尽管现代硬件的功能不断提高,但现代软件的性能仍然是一个关键问题。
一般来说,有两种方法可以提高程序的性能:
- 并行化以使用更多线程和更多硬件资源。
- 寻求提高现有资源使用效率。
异步来拯救?
前面提到的第二种方法,寻求更高的效率,可以解决资源浪费问题。通过编写异步、非阻塞代码,您可以让执行切换到另一个使用相同底层资源的活动任务,并在异步处理完成后返回到当前进程。
但是如何在 JVM 上生成异步代码呢? Java 提供了两种异步编程模型:
- Callbacks:异步方法没有返回值,但采用额外的
callback
参数(lambda 或匿名类),当结果可用时调用该参数。一个众所周知的例子是 Swing 的EventListener
层次结构。 - Futures:异步方法立即返回
Future<T>
。异步进程计算T
值,但Future
对象包装对其的访问。该值不会立即可用,可以轮询该对象,直到该值可用。例如,运行Callable<T>
任务的ExecutorService
使用Future
对象。
这些技术足够好吗?并非适用于所有用例,并且这两种方法都有局限性。
Callbacks的问题-回调地狱
回调很难组合在一起,很快就会导致代码难以阅读和维护(称为“回调地狱”(Callback Hell))。
- 对于复杂的流程,您需要从回调中执行回调,而回调本身又嵌套在另一个回调中,等等。这种混乱被称为“回调地狱”。
考虑一个示例:在用户界面上显示用户最喜欢的五个,或者在她没有最喜欢的情况下显示建议。这通过三个服务(一个提供最喜欢的 ID,第二个获取最喜欢的详细信息,第三个提供包含详细信息的建议),如下所示:
userService.getFavorites(userId, new Callback<List<String>>() {
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
与回调代码等效的 Reactor 代码示例:
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
如果您想确保在 800 毫秒内检索到最喜爱的 ID,或者如果需要更长的时间,则从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在 Reactor 中,它变得就像在链中添加 timeout 运算符一样简单,如下所示:
具有超时和回退功能的 Reactor 代码示例:
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800))
.onErrorResume(cacheService.cachedFavoritesFor(userId))
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
Futures的问题
Future
对象比回调好一点,但尽管 CompletableFuture
在 Java 8 中带来了改进,但它们在组合方面仍然表现不佳。将多个 Future
对象编排在一起是可行的,但并不容易。此外, Future
还有其他问题:
- 通过调用
get()
方法,很容易导致Future
对象出现另一种阻塞情况。 - 它们不支持惰性计算。
- 它们缺乏对多个值和高级错误处理的支持。
考虑另一个例子:我们得到一个 ID 列表,我们想要从中获取名称和统计信息,并将它们成对地组合起来,所有这些都是异步的。以下示例使用 CompletableFuture
类型的列表执行此操作:
CompletableFuture
组合示例:
CompletableFuture<List<String>> ids = ifhIds();
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> {
CompletableFuture<String> nameTask = ifhName(i);
CompletableFuture<Integer> statTask = ifhStat(i);
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> results = result.join();
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
由于 Reactor 有更多开箱即用的组合运算符,因此可以简化此过程,如下所示:
与Futures代码等效的 Reactor 代码示例:
Flux<String> ids = ifhrIds();
Flux<String> combinations =
ids.flatMap(id -> {
Mono<String> nameTask = ifhrName(id);
Mono<Integer> statTask = ifhrStat(id);
return nameTask.zipWith(statTask,
(name, stat) -> "Name " + name + " has stats " + stat);
});
Mono<List<String>> result = combinations.collectList();
List<String> results = result.block();
assertThat(results).containsExactly(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
使用回调和 Future 对象的危险是相似的,这正是反应式编程通过 Publisher-Subscriber 对解决的问题。
从命令式编程到反应式编程
反应式库(例如 Reactor)旨在解决 JVM 上“经典”异步方法的这些缺点,同时还关注一些其他方面:
- 可组合性(Composability)和可读性(readability)
- 数据作为一个流(flow),通过丰富的运算符词汇进行操作(operators)
- 在您订阅(subscribe)之前什么都不会发生
- 背压(Backpressure)或消费者向生产者发出排放率过高信号的能力
- 与并发无关的高水平但高价值的抽象(High level but high value abstraction that is concurrency-agnostic)
可组合性(Composability)和可读性(readability)
“可组合性”是指编排多个异步任务的能力,其中我们使用先前任务的结果将输入提供给后续任务。或者,我们可以以分叉连接方式运行多个任务。此外,我们可以将异步任务作为更高级别系统中的离散组件进行重用。
编排任务的能力与代码的可读性和可维护性紧密耦合。随着异步进程层的数量和复杂性的增加,编写和读取代码变得越来越困难。
正如我们所看到的,回调模型很简单,但其主要缺点之一是,对于复杂的流程,您需要从回调中执行回调,而回调本身又嵌套在另一个回调中,等等。这种混乱被称为“回调地狱”。
Reactor 提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套被最小化)。
装配线类比
您可以将反应式应用程序处理的数据视为在装配线上移动。 Reactor既是传送带,又是工作站。原材料从源头(原始的 Publisher )流出,最终成为成品,准备推向消费者(或 Subscriber )。原材料可以经历各种转变和其他中间步骤,或者成为将中间件聚合在一起的更大装配线的一部分。如果某一点出现故障或堵塞(也许装箱产品需要花费不成比例的长时间),受影响的工作站可以向上游发出信号,以限制原材料的流动。
操作符
在 Reactor 中,操作符是我们装配类比中的工作站。每个运算符都会向 Publisher
添加行为,并将上一步的 Publisher
包装到一个新实例中。整个链因此被链接起来,使得数据源自第一个 Publisher
并沿着链向下移动,并由每个链接进行转换。最终, Subscriber
完成了该过程。请记住,在 Subscriber
订阅 Publisher
之前不会发生任何事情,我们很快就会看到。
- 了解运算符创建新实例可以帮助您避免常见错误,这种错误会让您相信您在链中使用的运算符没有被应用。Reactor Operator是装饰器。它们返回一个包装了源序列并添加行为的不同实例。这就是为什么使用操作符的首选方式是链接调用的原因。
虽然反应式流规范根本没有指定运算符,但反应式库(例如 Reactor)的最佳附加值之一是它们提供的丰富的运算符词汇表。这些涵盖了很多领域,从简单的转换和过滤到复杂的编排和错误处理。
subscribe()
之前什么都不会发生
在你 在 Reactor 中,当您编写 Publisher 链时,默认情况下数据不会开始注入其中。事实上,您创建的是异步流程的抽象描述(这有助于提高可重用性和组合性)。
通过订阅行为,您将 Publisher 绑定到 Subscriber ,这会触发整个链中的数据流。这是通过来自 Subscriber 的单个 request 信号在内部实现的,该信号向上游传播,一直返回到源 Publisher 。
背压(Backpressure)
向上游传播信号也用于实现反压,我们在装配线类比中将其描述为当工作站处理速度比上游工作站慢时沿生产线发送的反馈信号。
Reactive Streams 规范定义的真实机制非常接近这个类比:订阅者可以在无界模式下工作,让源以最快的可实现速率推送所有数据,或者可以使用 request 机制来向源发出信号,表明它已准备好处理最多 n 个元素。
中间操作员还可以更改传输中的请求。想象一个 buffer 运算符将元素按十个为一组进行分组。如果订阅者请求一个buffer,则源产生十个元素是可以接受的。一些运算符还实现预取策略,这可以避免 request(1) 往返,并且如果在请求之前生成元素的成本不是太高,那么这是有益的。
这将推模型转换为推拉混合模型,其中下游可以从上游拉取 n 个元素(如果这些元素随时可用)。但如果元素还没有准备好,它们在生产时就会被上游推送。
Hot vs Cold
Rx 反应库家族区分了两大类反应序列:热反应序列和冷反应序列。这种区别主要与反应流如何对订阅者做出反应有关:
- 对于每个 Subscriber ,冷序列都会重新开始,包括在数据源处。例如,如果源包装 HTTP 调用,则会为每个订阅发出新的 HTTP 请求。
- 对于每个
Subscriber
,热序列不会从头开始。相反,迟到的订阅者会收到订阅后发出的信号。但请注意,某些热反应流可以全部或部分缓存或重放排放历史记录。
For more information on hot vs cold in the context of Reactor, see this reactor-specific section.
版权申明
本站点所有内容,版权均归https://wenchao.ren所有,除非明确授权,否则禁止一切形式的转载协议
打赏
