介绍
Reactive Streams 是一种异步数据处理标准,以流式方式进行非阻塞背压处理。从Java 9开始,它们已成为JDK的一部分,以 java.util.concurrent.Flow.*
的形式提供接口。
拥有这些接口可能会引诱你编写自己的实现。尽管这看起来很惊人,但这并不是JDK中它们存在的原因。
在本文中,我将描述反应式流处理的基本概念,并展示如何(不)使用JDK 9+中包含的API。此外,我们将考虑JDK的反应式流支持未来可能走向的方向。
流处理回顾
在广义的流处理架构中,你可能已经看过至少一次,可以列举出一些主要概念:
- 数据源,有时称为 生产者,
- 数据的目的地,有时称为 消费者,
- 一个或多个 处理阶段 对数据进行操作。
在这样的管道中,数据从生产者流经处理阶段到消费者:
广义流处理架构
现在,如果你考虑到组件可能具有不同的处理速度,那么有两种可能的情况:
-
如果下游(即接收数据的组件)比上游(发送数据的组件)快,则一切正常,因为管道应该正常工作。
-
但是,如果上游更快,则下游将被大量数据淹没,事情开始变得更糟。在后一种情况下,有几种处理多余数据的策略:
-
缓冲区 - 但是缓冲区容量有限,迟早会耗尽内存。
-
放弃 - 但是这样会丢失数据(通常不是所期望的,但在某些情况下可能是有意义的 - 例如,这是网络硬件经常做的)。
-
阻塞,直到消费者处理完成 - 但这可能会导致整个管道变慢。 处理这些不同处理能力的首选方式是一种称为 背压 的技术 - 它归结为较慢的消费者从更快的生产者请求给定数量的数据 - 但只有消费者在那时能够处理的数量。
回到流式处理图,你可以将背压视为一种特殊类型的信号数据,与正在处理的常规数据相比,流动方向相反:
带背压的流式处理
然而,并不是每个具有背压的流处理管道都必然是反应式的。
反应式流
反应式流的关键概念是以异步和非阻塞的方式处理无限数据流,以便可以并行使用计算资源(例如CPU核或网络主机)。
使流反应式的三个关键因素是:
- 数据以异步方式处理,
- 背压机制是非阻塞的,
- 下游比上游慢的事实在领域模型中以某种方式表示。
最后一个示例包括 Twitter 流式 API,如果消费太慢,你可能会被断开连接,或者 Akka Streams 中的内置阶段之一 - <a class="af oo" href="https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/conflate.html" rel="noopener ugc nofollow" target="_blank">conflate</a>
- 它允许你明确计划缓慢的下游。
JDK 中的反应式流支持
从版本9开始,反应式流接口 - 以前作为 单独的库 可用 - 已成为 java.util.concurrent.Flow
类的一部分。
这四个接口乍一看似乎非常简单:
Publisher<T>
负责发布类型为T
的元素,并提供一个subscribe
方法,以供订阅者连接到它,Subscriber<T>
连接到Publisher
,通过onSubscribe
接收确认,然后通过onNext
回调接收数据和通过onError
和onComplete
接收其他信号,Subscription
表示Publisher
和Subscriber
之间的链接,并允许使用request
进行背压或使用cancel
终止链接,Processor
将Publisher
和Subscriber
的功能结合在一个接口中。# 好了,让我们开始写代码吧!
这么简单的接口可能会引起你的兴趣去实现它们。例如,你可以编写一个Publisher
的简单实现,用于发布任意整数的迭代器:
然后你可以尝试使用一些虚拟的订阅者来运行它,只需打印接收到的数据:
如果你运行它并检查输出,应该会产生:
item = [1]
item = [2]
item = [3]
item = [4]
item = [5]
item = [6]
item = [7]
item = [8]
item = [9]
item = [10]
complete
所以它能正常工作,对吗?看起来是这样的,但你可能有一种直觉,觉得有些东西还缺失。例如,发布者不会根据任何需求发出元素,而是立即将它们全部发送到下游。
事实证明,有一种方法可以证明这种简单的实现远非正确。这可以通过运行一些来自Reactive Streams TCK的测试来实现。TCK(或技术兼容性套件)只是一个测试框架,用于验证反应式组件的实现是否正确,即组件之间的正确交互。它的目标是确保所有自定义反应式流实现都可以平稳地工作在一起 - 通过抽象接口连接 - 同时正确执行所有数据传输、信号和背压。
要为SimplePublisher
创建一个测试用例,你需要在构建定义中添加一个适当的依赖项,并扩展TCK的FlowPublisherVerification
:
运行朴素发布者的测试用例后,你会发现它确实存在一些问题:
针对SimplePublisher运行TCK发布者测试的结果
实际上,只有一个测试用例通过了,其他所有测试都存在问题。这清楚地表明了这种简单的实现并不是正确的。
测试用例名称中的数字是指反应式流规范中的相应条目,你可以在其中进一步探索这些要求背后的概念。
事实证明,大多数问题可以通过几个小的更改来消除,即:
- 引入一个
Subscription
的实现,将发布者与其订阅者连接起来,根据需求发出元素 - 添加一些基本的错误处理,
- 在订阅中添加一些简单的状态,以正确处理终止。
有关详细信息,请查看示例代码的提交历史记录。
然而,最终你会遇到一些问题变得不太简单且难以解决。
由于实现是同步的,存在一个问题,即订阅的request()
调用订阅者的onNext()
导致无限递归,订阅者又调用request()
,以此类推。
另一个严重的问题与处理无限需求有关(即订阅者请求Long.MAX_VALUE
元素,可能多次请求)。如果你在这里不小心,可能会产生太多线程或溢出一些long
值,其中你可能存储了累积的需求。
不要在家尝试这个
上面的示例的底线是,反应式组件真的不容易正确地实现。因此,除非你正在撰写另一个反应式流实现,否则你真的不应该自己实现它们,而是使用已经经过TCK验证的现有实现。
如果你决定自己编写实现,一定要了解规范的所有细节,并记得针对你的代码运行TCK。
新接口的目的
你可能会问,那么这些接口的目的是什么呢?它们被包含在JDK中的实际目标是提供所谓的服务提供者接口(或SPI)层。这最终应该作为不同具有反应式和流特性的组件的统一层,但可能会公开它们自己的自定义API,因此无法与其他类似实现进行互操作。
另一个同样重要的目标是将JDK的未来发展引向正确的方向,导致现有的流抽象(已经存在于JDK中并广泛使用)使用一些共同的接口 - 再次提高互操作性。
现有的流抽象
那么在JDK中已经有哪些流抽象了(其中“流”意味着逐块处理大量、可能是无限的数据,而不是事先将所有数据读入内存)?它们包括:
java.io.InputStream
/OutputStream
java.util.Iterator
java.nio.channels.*
javax.servlet.ReadListener
/WriteListener
java.sql.ResultSet
java.util.Stream
java.util.concurrent.Flow.*
尽管上述所有抽象都公开了某种流式行为,但它们缺少一个公共API,可以让你轻松地连接它们,例如使用Publisher
从一个文件中读取数据,并使用Subscriber
将其写入另一个文件中。
拥有这样的统一层的优势是可以使用单个调用:
publisher.subscribe(subscriber)
来处理反应式流处理的所有隐藏复杂性(如背压和信号)。
迈向理想的世界
如果让各种抽象使用共同的接口,可能会产生什么结果?让我们看几个例子。
最小操作集
JDK中当前的反应式流支持仅限于前面描述的四个接口。如果你以前使用过某些反应式库 - Akka Streams、RxJava或Project Reactor,你就会意识到它们的强大之处在于各种流组合器(如map
或filter
等最简单的组合器)可直接使用。这些组合器在JDK中缺失,尽管你可能期望至少有几个组合器可用。为了解决这个问题,Lightbend提出了一个Reactive Streams Utilities的POC,这是一个内置基本操作的库,可以提供更复杂的插件作为现有实现的委托,并通过JVM系统参数指定,例如:
-Djava.flow.provider=akka
HTTP
如何以响应式方式通过HTTP接收上传的文件并将其上传到其他地方?
自Servlet版本3.1开始,就有异步Servlet IO。此外,从JDK 9开始,还有一个新的HTTP客户端(在Java 9/10中是在jdk.incubating.http
模块中,但从Java 11开始被认为是稳定的)。除了更好的API外,新的客户端还支持Reactive Streams作为输入/输出。其中之一是它提供了一个POST(Publisher<ByteBuffer>)
方法。
现在,如果HttpServletRequest
提供了一个发布者来公开请求正文,那么上传接收到的文件将变为:
POST(BodyPublisher.fromPublisher(req.getPublisher())
通过使用那一行代码,所有的反应特性都在内部实现。
数据库访问
当涉及以反应式方式通用访问关系数据库时,异步数据库访问API(ADBA)带来了一些希望,不幸的是,它到目前为止还没有进入JDK。
还有R2DBC——_一种将反应式编程API带到关系数据存储中的努力。它目前支持H2和Postgres,并且与Spring Data JPA很好地协作,这可能是帮助扩大采用的优势。
然后还有一些供应商特定的异步驱动程序。但我们仍然缺少一个完美的解决方案,可以让你执行类似于:
Publisher<User> users = entityManager
.createQuery("select u from users")
.getResultPublisher()
这基本上是一个普通的JPA调用,只是使用了一个用户的Publisher
而不是一个List
。
这仍然不是现实
再次提醒你——上述示例是对未来的展望,它们还没有实现。JDK和生态系统将朝哪个方向发展是时间和社区努力的问题。
统一层的实际用途
虽然HTTP和数据库的统一还没有实现,但已经可以使用在JDK中找到的统一接口实际连接各种Reactive Streams实现。
在这个例子中,我将使用Project Reactor的Flux
作为发布者,Akka Streams的Flow
作为处理器,RxJava作为订阅者。注意:下面的示例代码使用Java 10 var
,因此,如果你打算自己尝试,请确保有适当的JDK。
看一下main
,你会发现有三个组件构成了管道:reactorPublisher
,akkaStreamsProcessor
和Flowable
(它打印到标准输出)。
当你查看工厂方法的返回类型时,你会注意到它们仅仅是常见的Reactive Streams接口(Publisher<Long>
和Processor<Long, Long>
),它们用于无缝连接不同的实现。
此外,正如你所看到的,各种库不会直接返回统一类型(即它们在内部使用不同的类型层次结构),但它们需要一些粘合代码,将其内部类型转换为来自java.util.concurrent.Flow.*
的类型,如JdkFlowAdapter
或JavaFlowSupport
。
最后但并非最不重要的是,你可以在流引擎的内部暴露不同库之间的差异。虽然Project Reactor倾向于完全隐藏内部,但另一方面,Akka Streams要求你显式地定义一个材料化器——流水线的运行时。
总结
以下是本文的几个关键要点:
- JDK中的Reactive Streams支持不是规范的完整实现,而仅仅是共同的接口,
- 这些接口是作为SPI(服务提供程序接口)提供的——不同的Reactive Streams实现的统一层,
- 自己实现接口并不容易,也不推荐,除非你正在创建一些新的库;如果你决定实现它们,请确保TCK中的所有测试都是绿色的——这给你带来了良好的机会,使你的库能够与其他反应式组件平稳工作。
如果你想尝试TCK和SimplePublisher
示例,代码可在我的GitHub上找到:
如果你对深入了解Reactive Streams实现感兴趣,我真诚地推荐Advanced Reactive Java博客。
译自:https://blog.softwaremill.com/how-not-to-use-reactive-streams-in-java-9-7a39ea9c2cb3
评论(0)