首页
Preview

如何(不)在Java 9+中使用响应式流

介绍

Reactive Streams 是一种异步数据处理标准,以流式方式进行非阻塞背压处理。从Java 9开始,它们已成为JDK的一部分,以 java.util.concurrent.Flow.* 的形式提供接口。

拥有这些接口可能会引诱你编写自己的实现。尽管这看起来很惊人,但这并不是JDK中它们存在的原因。

在本文中,我将描述反应式流处理的基本概念,并展示如何(不)使用JDK 9+中包含的API。此外,我们将考虑JDK的反应式流支持未来可能走向的方向。

流处理回顾

在广义的流处理架构中,你可能已经看过至少一次,可以列举出一些主要概念:

  • 数据源,有时称为 生产者
  • 数据的目的地,有时称为 消费者
  • 一个或多个 处理阶段 对数据进行操作。

在这样的管道中,数据从生产者流经处理阶段到消费者:

广义流处理架构

现在,如果你考虑到组件可能具有不同的处理速度,那么有两种可能的情况:

  • 如果下游(即接收数据的组件)比上游(发送数据的组件)快,则一切正常,因为管道应该正常工作。

  • 但是,如果上游更快,则下游将被大量数据淹没,事情开始变得更糟。在后一种情况下,有几种处理多余数据的策略:

  • 缓冲区 - 但是缓冲区容量有限,迟早会耗尽内存。

  • 放弃 - 但是这样会丢失数据(通常不是所期望的,但在某些情况下可能是有意义的 - 例如,这是网络硬件经常做的)。

  • 阻塞,直到消费者处理完成 - 但这可能会导致整个管道变慢。 处理这些不同处理能力的首选方式是一种称为 背压 的技术 - 它归结为较慢的消费者从更快的生产者请求给定数量的数据 - 但只有消费者在那时能够处理的数量。

回到流式处理图,你可以将背压视为一种特殊类型的信号数据,与正在处理的常规数据相比,流动方向相反:

带背压的流式处理

然而,并不是每个具有背压的流处理管道都必然是反应式的。

反应式流

反应式流的关键概念是以异步和非阻塞的方式处理无限数据流,以便可以并行使用计算资源(例如CPU核或网络主机)。

使流反应式的三个关键因素是:

  • 数据以异步方式处理,
  • 背压机制是非阻塞的,
  • 下游比上游慢的事实在领域模型中以某种方式表示。

最后一个示例包括 Twitter 流式 API,如果消费太慢,你可能会被断开连接,或者 Akka Streams 中的内置阶段之一 - <a class="af oo" href="https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/conflate.html" rel="noopener ugc nofollow" target="_blank">conflate</a> - 它允许你明确计划缓慢的下游。

JDK 中的反应式流支持

从版本9开始,反应式流接口 - 以前作为 单独的库 可用 - 已成为 java.util.concurrent.Flow 类的一部分。

这四个接口乍一看似乎非常简单:

  • Publisher&lt;T&gt; 负责发布类型为 T 的元素,并提供一个 subscribe 方法,以供订阅者连接到它,
  • Subscriber&lt;T&gt; 连接到 Publisher,通过 onSubscribe 接收确认,然后通过 onNext 回调接收数据和通过 onErroronComplete 接收其他信号,
  • Subscription 表示 PublisherSubscriber 之间的链接,并允许使用 request 进行背压或使用 cancel 终止链接,
  • ProcessorPublisherSubscriber 的功能结合在一个接口中。# 好了,让我们开始写代码吧!

这么简单的接口可能会引起你的兴趣去实现它们。例如,你可以编写一个Publisher的简单实现,用于发布任意整数的迭代器:

然后你可以尝试使用一些虚拟的订阅者来运行它,只需打印接收到的数据:

如果你运行它并检查输出,应该会产生:

item = [1]
item = [2]
item = [3]
item = [4]
item = [5]
item = [6]
item = [7]
item = [8]
item = [9]
item = [10]
complete

所以它能正常工作,对吗?看起来是这样的,但你可能有一种直觉,觉得有些东西还缺失。例如,发布者不会根据任何需求发出元素,而是立即将它们全部发送到下游。

事实证明,有一种方法可以证明这种简单的实现远非正确。这可以通过运行一些来自Reactive Streams TCK的测试来实现。TCK(或技术兼容性套件)只是一个测试框架,用于验证反应式组件的实现是否正确,即组件之间的正确交互。它的目标是确保所有自定义反应式流实现都可以平稳地工作在一起 - 通过抽象接口连接 - 同时正确执行所有数据传输、信号和背压。

要为SimplePublisher创建一个测试用例,你需要在构建定义中添加一个适当的依赖项,并扩展TCK的FlowPublisherVerification

运行朴素发布者的测试用例后,你会发现它确实存在一些问题:

针对SimplePublisher运行TCK发布者测试的结果

实际上,只有一个测试用例通过了,其他所有测试都存在问题。这清楚地表明了这种简单的实现并不是正确的。

测试用例名称中的数字是指反应式流规范中的相应条目,你可以在其中进一步探索这些要求背后的概念。

事实证明,大多数问题可以通过几个小的更改来消除,即:

  • 引入一个Subscription的实现,将发布者与其订阅者连接起来,根据需求发出元素
  • 添加一些基本的错误处理,
  • 在订阅中添加一些简单的状态,以正确处理终止。

有关详细信息,请查看示例代码的提交历史记录

然而,最终你会遇到一些问题变得不太简单且难以解决。

由于实现是同步的,存在一个问题,即订阅的request()调用订阅者的onNext()导致无限递归,订阅者又调用request(),以此类推。

另一个严重的问题与处理无限需求有关(即订阅者请求Long.MAX_VALUE元素,可能多次请求)。如果你在这里不小心,可能会产生太多线程或溢出一些long值,其中你可能存储了累积的需求。

不要在家尝试这个

上面的示例的底线是,反应式组件真的不容易正确地实现。因此,除非你正在撰写另一个反应式流实现,否则你真的不应该自己实现它们,而是使用已经经过TCK验证的现有实现。

如果你决定自己编写实现,一定要了解规范的所有细节,并记得针对你的代码运行TCK。

新接口的目的

你可能会问,那么这些接口的目的是什么呢?它们被包含在JDK中的实际目标是提供所谓的服务提供者接口(或SPI)层。这最终应该作为不同具有反应式和流特性的组件的统一层,但可能会公开它们自己的自定义API,因此无法与其他类似实现进行互操作。

另一个同样重要的目标是将JDK的未来发展引向正确的方向,导致现有的流抽象(已经存在于JDK中并广泛使用)使用一些共同的接口 - 再次提高互操作性。

现有的流抽象

那么在JDK中已经有哪些流抽象了(其中“流”意味着逐块处理大量、可能是无限的数据,而不是事先将所有数据读入内存)?它们包括:

  • java.io.InputStream / OutputStream
  • java.util.Iterator
  • java.nio.channels.*
  • javax.servlet.ReadListener / WriteListener
  • java.sql.ResultSet
  • java.util.Stream
  • java.util.concurrent.Flow.*

尽管上述所有抽象都公开了某种流式行为,但它们缺少一个公共API,可以让你轻松地连接它们,例如使用Publisher从一个文件中读取数据,并使用Subscriber将其写入另一个文件中。

拥有这样的统一层的优势是可以使用单个调用:

publisher.subscribe(subscriber)

来处理反应式流处理的所有隐藏复杂性(如背压和信号)。

迈向理想的世界

如果让各种抽象使用共同的接口,可能会产生什么结果?让我们看几个例子。

最小操作集

JDK中当前的反应式流支持仅限于前面描述的四个接口。如果你以前使用过某些反应式库 - Akka StreamsRxJavaProject Reactor,你就会意识到它们的强大之处在于各种流组合器(如mapfilter等最简单的组合器)可直接使用。这些组合器在JDK中缺失,尽管你可能期望至少有几个组合器可用。为了解决这个问题,Lightbend提出了一个Reactive Streams Utilities的POC,这是一个内置基本操作的库,可以提供更复杂的插件作为现有实现的委托,并通过JVM系统参数指定,例如:

-Djava.flow.provider=akka

HTTP

如何以响应式方式通过HTTP接收上传的文件并将其上传到其他地方?

自Servlet版本3.1开始,就有异步Servlet IO。此外,从JDK 9开始,还有一个新的HTTP客户端(在Java 9/10中是在jdk.incubating.http模块中,但从Java 11开始被认为是稳定的)。除了更好的API外,新的客户端还支持Reactive Streams作为输入/输出。其中之一是它提供了一个POST(Publisher&lt;ByteBuffer&gt;)方法。

现在,如果HttpServletRequest提供了一个发布者来公开请求正文,那么上传接收到的文件将变为:

POST(BodyPublisher.fromPublisher(req.getPublisher())

通过使用那一行代码,所有的反应特性都在内部实现。

数据库访问

当涉及以反应式方式通用访问关系数据库时,异步数据库访问API(ADBA)带来了一些希望,不幸的是,它到目前为止还没有进入JDK。

还有R2DBC——_一种将反应式编程API带到关系数据存储中的努力。它目前支持H2和Postgres,并且与Spring Data JPA很好地协作,这可能是帮助扩大采用的优势。

然后还有一些供应商特定的异步驱动程序。但我们仍然缺少一个完美的解决方案,可以让你执行类似于:

Publisher<User> users = entityManager
    .createQuery("select u from users")
    .getResultPublisher()

这基本上是一个普通的JPA调用,只是使用了一个用户的Publisher而不是一个List

这仍然不是现实

再次提醒你——上述示例是对未来的展望,它们还没有实现。JDK和生态系统将朝哪个方向发展是时间和社区努力的问题。

统一层的实际用途

虽然HTTP和数据库的统一还没有实现,但已经可以使用在JDK中找到的统一接口实际连接各种Reactive Streams实现。

在这个例子中,我将使用Project Reactor的Flux作为发布者,Akka Streams的Flow作为处理器,RxJava作为订阅者。注意:下面的示例代码使用Java 10 var,因此,如果你打算自己尝试,请确保有适当的JDK。

看一下main,你会发现有三个组件构成了管道:reactorPublisherakkaStreamsProcessorFlowable(它打印到标准输出)。

当你查看工厂方法的返回类型时,你会注意到它们仅仅是常见的Reactive Streams接口(Publisher&lt;Long&gt;Processor&lt;Long, Long&gt;),它们用于无缝连接不同的实现。

此外,正如你所看到的,各种库不会直接返回统一类型(即它们在内部使用不同的类型层次结构),但它们需要一些粘合代码,将其内部类型转换为来自java.util.concurrent.Flow.*的类型,如JdkFlowAdapterJavaFlowSupport

最后但并非最不重要的是,你可以在流引擎的内部暴露不同库之间的差异。虽然Project Reactor倾向于完全隐藏内部,但另一方面,Akka Streams要求你显式地定义一个材料化器——流水线的运行时。

总结

以下是本文的几个关键要点:

  • JDK中的Reactive Streams支持不是规范的完整实现,而仅仅是共同的接口,
  • 这些接口是作为SPI(服务提供程序接口)提供的——不同的Reactive Streams实现的统一层,
  • 自己实现接口并不容易,也不推荐,除非你正在创建一些新的库;如果你决定实现它们,请确保TCK中的所有测试都是绿色的——这给你带来了良好的机会,使你的库能够与其他反应式组件平稳工作。

如果你想尝试TCK和SimplePublisher示例,代码可在我的GitHub上找到:

rucek/reactive-streams-java9

如果你对深入了解Reactive Streams实现感兴趣,我真诚地推荐Advanced Reactive Java博客。

译自:https://blog.softwaremill.com/how-not-to-use-reactive-streams-in-java-9-7a39ea9c2cb3

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
阿波
The minute I see you, I want your clothes gone!

评论(0)

添加评论