首页
Preview

如何使用Symfony Messenger批量处理消息。

一个有用的功能解释。

Symfony Messenger 5.4 在2021年末 引入了 批量处理消息。

好处是显而易见的——你可以一次处理多个消息,而不是一个一个处理,这在某些情况下更加有用和高效(比如将一批文档通过一个HTTP请求送到外部搜索引擎(如Solr),而不是每个消息都发起单独的请求)。

不幸的是,在Messenger的 文档 中,除了我上面链接的公告之外,没有更多信息。在Stack Overflow上有一个未回答的 问题,在Symfony的GitHub页面上有一个未解决的 问题,还有一些相关的 Slack频道

让我们更详细地了解这个功能。如果你只是想使用它,你可以直接跳到 总结

Messenger组件

我假设你已经将symfony/messenger组件添加到你的项目中,并且对它的工作原理有基本的了解。

以下是Messenger组件中与批量处理消息相关的部分:

  • \Symfony\Component\Messenger\Worker
  • \Symfony\Component\Messenger\Middleware\HandleMessageMiddleware
  • \Symfony\Component\Messenger\Handler\BatchHandlerInterface
  • \Symfony\Component\Messenger\Handler\BatchHandlerTrait
  • \Symfony\Component\Messenger\Handler\Acknowledger
  • \Symfony\Component\Messenger\Stamp\AckStamp
  • \Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp

我不会详细讲解所有细节,但如果你好奇的话,可以自己看看代码。

示例分析

让我们更仔细地看一下公告中给出的示例:

处理程序类实现了BatchHandlerInterface接口,该接口仅声明了一个公共方法flush¹

/**
 * Flushes any pending buffers.
 *
 * @param bool $force Whether flushing is required
 */
 public function flush(bool $force): void;

接下来是BatchHandlerTrait特性,它实现了接口和一些私有方法来排队和处理消息。

然后是实际的__invoke方法,这个方法由中间件调用来处理消息MyMessage,它还有另一个(可选的)类型为Acknowledger的参数。

__invoke方法调用了私有的handle方法(默认实现在特性中),并传入给定的消息MyMessageAcknowledger实例。

然后handle方法要么将消息排队,要么刷新队列(调用处理程序的process方法),如果达到了排队消息的定义数量或没有给定Acknowledger实例。

Acknowledger

有趣的部分是__invoke方法的Acknowledger参数。为了理解这里发生了什么(即这个参数来自哪里),请看\Symfony\Component\Messenger\Middleware\HandleMessageMiddleware::handle方法,该方法通过处理程序描述符(你的处理程序实现了BatchHandlerInterface接口)将你的处理程序识别为批处理程序,并且如果消息的信封上有AckStamp(稍后会详细介绍),则会实例化一个新的Acknowledger并将其作为第二个参数传递给处理程序调用:

因此,每当批处理程序不确认消息(由于异常;请参见上面公告示例中带有$ack->nack($e)行的部分),异常就会被包装在HandlerFailedException中,这可能会导致工作程序的新交付尝试(重试)。

当消息通过worker分派到消息总线时,\Symfony\Component\Messenger\Worker::handleMessage方法会自动将AckStamp添加到消息的信封中。

因此,这意味着AckStamp仅添加到异步传输的消息中!通过同步传输(sync://)传输的消息总是立即处理(不是批处理)。

批量处理

让我们回到处理程序。

如果$ack不为null,则handle方法必须返回批处理中待处理消息的数量;否则,将在HandleMessageMiddleware中抛出LogicException异常。特性为你实现了此方法。

如果没有传递Acknowledger实例,则消息将立即处理(强制刷新堆栈)。

否则,消息将被推送到内部队列(实际上是一个数组),并调用shouldFlush方法来决定是否已达到阈值(默认值为10)。如果是,则刷新队列,并通过处理程序的process方法处理批次。

要定义另一个限制,你可以简单地添加一个像这样的方法

private function shouldFlush(): bool
{
    return 15 <= \count($this->jobs);
}

在你的处理程序²中。

正如之前所说,如果达到了定义的批量大小,则处理程序的process方法将被调用,并将排队的作业作为参数传递。该参数是一个类似于

[
    [
        0: MyMessage,
        1: Acknowledger
    ],    
    [
        0: MyMessage,
        1: Acknowledger
    ],
]

的数组,因此你可以像这样迭代它:ack 方法的参数仅适用于期望返回值的同步命令总线。我更喜欢不返回值,所以我们只需将消息本身用作参数。

简而言之

简而言之,要批量处理消息,你需要执行以下操作:

  • 创建消息和处理程序
  • 为消息和处理程序配置异步传输
  • 为处理程序实现 BatchHandlerInterface
  • 使用 BatchHandlerTrait(或自己实现方法)
  • 在处理程序的 __invoke 方法的第二个参数中期望一个 Acknowledger 实例
  • 覆盖 trait 的 shouldFlush 方法以设置自定义批量大小(默认值 = 10)
  • 在处理程序中实现一个 process 方法
  • 处理作业并通过 acknack 处理可能的异常
  • 如果你不希望处理程序返回结果,只需将作业的消息设置为 ack 参数

工作示例

我已经在 GitHub 上创建了一个完整的 工作示例

我希望我已经正确描述了一切,如果没有,请留下评论,我将很乐意调整文章。感谢阅读,祝你使用 Symfony 进行批处理!

[1] 有趣的是,flush 方法的 $force 布尔变量在 trait 中从未被使用过。但也许这是一种选项,如果你自己实现 flush 方法。

[2] 请记住,你可以随时覆盖 trait 的方法。使用 trait 的类中实现的方法优先级更高。

译自:https://wolfgang-klinger.medium.com/how-to-handle-messages-in-batches-with-symfony-messenger-c91b5aa1c8b1

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
anko
宽以待人处事,严于律己修身。

评论(0)

添加评论