首页
Preview

开发 Pulsar 连接器用于发布/订阅

Pub/Sub 的 Pulsar 连接器

本篇博客将介绍如何为 Google Cloud Pub/Sub 开发 Apache Pulsar 连接器。它包含支持最基本数据中继场景的可用代码,提供了在 GitHub 上构建和测试的自动化框架,并讨论了在每个步骤中需要考虑的问题。如果你对 Pub/Sub 和使用 Pulsar IO 开发 Pulsar 连接器都是新手,那么这篇博客将是一个很好的起点。

Pub/Sub sink 连接器

在开始编写 source connector 之前,我们先编写 sink connector,这样你就有机会熟悉如何将消息发送到 Pub/Sub,因为它是两者中较为简单的一个。你可以拆解官方 Pub/Sub Java 快速入门示例,并将发布者客户端创建和关闭代码以及发布调用放置在 Sink 接口实现的 open()、write() 和 close() 方法中。

此示例在 Pub/Sub 发布者中启用了批处理。如果你已经熟悉 Pulsar 中的批处理,那么这不是什么新鲜事物。设置每批最大消息数可以实现更高的吞吐量。以下是 Pub/Sub 发布者客户端上的其他自定义设置及其在 Pulsar 中的等效项:

将这些自定义设置添加到你的 sink 连接器中基本上需要两个步骤:首先,在你的 sink 配置类中实现它们,如下所示;然后,从 sink 连接器类中调用它们,如上所示。

Pub/Sub source 连接器

在高层次上,source 连接器代码与 sink 连接器代码几乎相同。我们仍然在 Source 接口实现的 open() 和 close() 方法中创建和关闭 Pub/Sub 订阅客户端。但是,我们使用 LinkedBlockingQueue 来暂时存储从 Pub/Sub 订阅接收到的消息。在 Source 接口实现中,read() 方法简单地弹出队列中最旧的消息并将其转发到 Pulsar 主题。还有一个类,将 Pub/Sub 消息转换为 Pulsar 记录。

使用 LinkedBlockingQueue 是一种简单的实现方法,如果程序在消息被确认并在成功写入 Pulsar 之前失败,它可能会导致消息丢失。最好让 LinkedBlockingQueue 包含消息本身和 AckReplyConsumer 对,只有在成功写入 Pulsar 后才调用 ack()。(感谢Kamal Aboul-Hosn

除了执行简单的消息转发之外,此示例还展示了如何在 Pub/Sub 订阅客户端中启用流量控制并发控制。流量控制确保没有单个订阅者被压垮或饿死。并发控制允许打开多个 gRPC 双向流以拉取消息,即 setParallelPullCount(),并使用处理消息的线程的多个线程回调函数,即 setExecutorThreadCount()。

该示例允许用户输入前者,并为演示目的硬编码后者。通常情况下,你应该让用户根据其消息处理的 CPU 绑定情况和其处理传入消息量的能力来决定这些值。

还可以实现其他订阅者功能,例如仅一次传递过滤

构建

这里的连接器使用 Apache Pulsar 2.11.x 和 Java 17。要将它们打包为 NAR,你可以将以下build插件添加到你的 pom.xml

然后,需要创建一个 META-INF/services/pulsar-io.yaml 文件,指定这些连接器参数:

在我的情况下,运行 mvn clean package -DskipTests=true 将创建一个大小约为 84 MBtarget/tz-pulsar-io-1.0-SNAPSHOT.nar

测试

我的repo中的 README 文件介绍了如何在本地独立的 Pulsar 集群上运行连接器。在开发它们时,这是我测试它们的方法。但是,这不是一种高效的测试方法,因为它涉及许多设置步骤。使用 Cloud Build 启动本地独立的 Pulsar 集群可以节省大量时间。

这个 Dockerfile 使用预安装了 Java 17 和 Maven 的 Maven 镜像。它还安装了 wget,用于下载 Pulsar。

这个 cloudbuild.yaml 被分解成两个步骤。步骤#1 pulsar-standalone 构建了在 Dockerfile 中定义的 Docker 镜像,并将镜像上传到 Artifact Registry。从那里,步骤#2 run-tests 启动独立的 Pulsar 集群,为连接器打包 NAR,然后使用 Pulsar CLI 工具单独运行 sink 和 source 集成测试。承认,需要一些努力才能将一切做对。以下是脚本中出现一些奇怪选择的原因:

  • sleep 20 是必要的,因为 healthcheck 命令不会立即返回 ok。在此处失败将导致测试的其余部分失败。
  • /workspace(Cloud Build 提供的默认卷)将 nar 复制到 ${PULSAR_HOME} 也是必要的。将 --archive 指向 /workspace 中打包的 nar 会返回错误,例如 Sink/Source doesn't exist
  • 在启动 mvn clean test 时使用 & 运算符在后台运行 sink 和 source 连接器,以确保运行中的 Pulsar 集群可用于集成测试。

我设置了一个 Cloud Build 触发器,以在每次对 main 分支进行新提交时运行集成测试。还有其他触发器可用,例如对不同分支的推送、具有特定标签的提交或拉取请求。

这是我的 Cloud Build 历史记录的屏幕截图。faf6afb7 构建是由我最新的提交触发的,用于更新 README.md

本文所有可用的工作代码都可以在 GitHub 上找到。请注意,尽管连接器按预期工作,但它们并不适用于生产环境,也没有发布。这是一个简单的实现,旨在学习如何为 Pub/Sub 开发 Pulsar 连接器。使用 Cloud Build 进行的 CI/CD 设置在你的 Pulsar 连接器的官方开发和发布周期中都非常适用,我强烈推荐你使用它们。希望你会发现这篇博客有用,并且如果有问题,请随时联系。

译自:https://medium.com/google-cloud/develop-pulsar-connectors-for-pub-sub-7c6cd4499877

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
阿波
The minute I see you, I want your clothes gone!

评论(0)

添加评论