Pub/Sub 的 Pulsar 连接器
本篇博客将介绍如何为 Google Cloud Pub/Sub 开发 Apache Pulsar 连接器。它包含支持最基本数据中继场景的可用代码,提供了在 GitHub 上构建和测试的自动化框架,并讨论了在每个步骤中需要考虑的问题。如果你对 Pub/Sub 和使用 Pulsar IO 开发 Pulsar 连接器都是新手,那么这篇博客将是一个很好的起点。
Pub/Sub sink 连接器
在开始编写 source connector 之前,我们先编写 sink connector,这样你就有机会熟悉如何将消息发送到 Pub/Sub,因为它是两者中较为简单的一个。你可以拆解官方 Pub/Sub Java 快速入门示例,并将发布者客户端创建和关闭代码以及发布调用放置在 Sink 接口实现的 open()、write() 和 close() 方法中。
此示例在 Pub/Sub 发布者中启用了批处理。如果你已经熟悉 Pulsar 中的批处理,那么这不是什么新鲜事物。设置每批最大消息数可以实现更高的吞吐量。以下是 Pub/Sub 发布者客户端上的其他自定义设置及其在 Pulsar 中的等效项:
- Pub/Sub 发布者排序键与 Pulsar 消息或分区键
- Pub/Sub 模式与 Pulsar Schema
将这些自定义设置添加到你的 sink 连接器中基本上需要两个步骤:首先,在你的 sink 配置类中实现它们,如下所示;然后,从 sink 连接器类中调用它们,如上所示。
Pub/Sub source 连接器
在高层次上,source 连接器代码与 sink 连接器代码几乎相同。我们仍然在 Source 接口实现的 open() 和 close() 方法中创建和关闭 Pub/Sub 订阅客户端。但是,我们使用 LinkedBlockingQueue 来暂时存储从 Pub/Sub 订阅接收到的消息。在 Source 接口实现中,read() 方法简单地弹出队列中最旧的消息并将其转发到 Pulsar 主题。还有一个类,将 Pub/Sub 消息转换为 Pulsar 记录。
使用 LinkedBlockingQueue 是一种简单的实现方法,如果程序在消息被确认并在成功写入 Pulsar 之前失败,它可能会导致消息丢失。最好让 LinkedBlockingQueue 包含消息本身和 AckReplyConsumer 对,只有在成功写入 Pulsar 后才调用 ack()。(感谢Kamal Aboul-Hosn)
除了执行简单的消息转发之外,此示例还展示了如何在 Pub/Sub 订阅客户端中启用流量控制和并发控制。流量控制确保没有单个订阅者被压垮或饿死。并发控制允许打开多个 gRPC 双向流以拉取消息,即 setParallelPullCount(),并使用处理消息的线程的多个线程回调函数,即 setExecutorThreadCount()。
该示例允许用户输入前者,并为演示目的硬编码后者。通常情况下,你应该让用户根据其消息处理的 CPU 绑定情况和其处理传入消息量的能力来决定这些值。
构建
这里的连接器使用 Apache Pulsar 2.11.x
和 Java 17
。要将它们打包为 NAR,你可以将以下build
插件添加到你的 pom.xml
:
然后,需要创建一个 META-INF/services/pulsar-io.yaml
文件,指定这些连接器参数:
在我的情况下,运行 mvn clean package -DskipTests=true
将创建一个大小约为 84 MB
的 target/tz-pulsar-io-1.0-SNAPSHOT.nar
。
测试
我的repo中的 README
文件介绍了如何在本地独立的 Pulsar 集群上运行连接器。在开发它们时,这是我测试它们的方法。但是,这不是一种高效的测试方法,因为它涉及许多设置步骤。使用 Cloud Build 启动本地独立的 Pulsar 集群可以节省大量时间。
这个 Dockerfile
使用预安装了 Java 17
和 Maven 的 Maven 镜像。它还安装了 wget
,用于下载 Pulsar。
这个 cloudbuild.yaml
被分解成两个步骤。步骤#1 pulsar-standalone
构建了在 Dockerfile
中定义的 Docker 镜像,并将镜像上传到 Artifact Registry。从那里,步骤#2 run-tests
启动独立的 Pulsar
集群,为连接器打包 NAR
,然后使用 Pulsar CLI 工具单独运行 sink 和 source 集成测试。承认,需要一些努力才能将一切做对。以下是脚本中出现一些奇怪选择的原因:
sleep 20
是必要的,因为healthcheck
命令不会立即返回ok
。在此处失败将导致测试的其余部分失败。- 从
/workspace
(Cloud Build 提供的默认卷)将nar
复制到${PULSAR_HOME}
也是必要的。将--archive
指向/workspace
中打包的nar
会返回错误,例如Sink/Source doesn't exist
。 - 在启动
mvn clean test
时使用&
运算符在后台运行 sink 和 source 连接器,以确保运行中的 Pulsar 集群可用于集成测试。
我设置了一个 Cloud Build 触发器,以在每次对 main
分支进行新提交时运行集成测试。还有其他触发器可用,例如对不同分支的推送、具有特定标签的提交或拉取请求。
这是我的 Cloud Build 历史记录的屏幕截图。faf6afb7
构建是由我最新的提交触发的,用于更新 README.md
。
本文所有可用的工作代码都可以在 GitHub 上找到。请注意,尽管连接器按预期工作,但它们并不适用于生产环境,也没有发布。这是一个简单的实现,旨在学习如何为 Pub/Sub 开发 Pulsar 连接器。使用 Cloud Build 进行的 CI/CD 设置在你的 Pulsar 连接器的官方开发和发布周期中都非常适用,我强烈推荐你使用它们。希望你会发现这篇博客有用,并且如果有问题,请随时联系。
译自:https://medium.com/google-cloud/develop-pulsar-connectors-for-pub-sub-7c6cd4499877
评论(0)