一个有用的功能解释。
Symfony Messenger 5.4 在2021年末 引入了 批量处理消息。
好处是显而易见的——你可以一次处理多个消息,而不是一个一个处理,这在某些情况下更加有用和高效(比如将一批文档通过一个HTTP请求送到外部搜索引擎(如Solr),而不是每个消息都发起单独的请求)。
不幸的是,在Messenger的 文档 中,除了我上面链接的公告之外,没有更多信息。在Stack Overflow上有一个未回答的 问题,在Symfony的GitHub页面上有一个未解决的 问题,还有一些相关的 Slack频道。
让我们更详细地了解这个功能。如果你只是想使用它,你可以直接跳到 总结。
Messenger组件
我假设你已经将symfony/messenger
组件添加到你的项目中,并且对它的工作原理有基本的了解。
以下是Messenger组件中与批量处理消息相关的部分:
\Symfony\Component\Messenger\Worker
\Symfony\Component\Messenger\Middleware\HandleMessageMiddleware
\Symfony\Component\Messenger\Handler\BatchHandlerInterface
\Symfony\Component\Messenger\Handler\BatchHandlerTrait
\Symfony\Component\Messenger\Handler\Acknowledger
\Symfony\Component\Messenger\Stamp\AckStamp
\Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp
我不会详细讲解所有细节,但如果你好奇的话,可以自己看看代码。
示例分析
让我们更仔细地看一下公告中给出的示例:
处理程序类实现了BatchHandlerInterface
接口,该接口仅声明了一个公共方法flush
¹
/**
* Flushes any pending buffers.
*
* @param bool $force Whether flushing is required
*/
public function flush(bool $force): void;
接下来是BatchHandlerTrait
特性,它实现了接口和一些私有方法来排队和处理消息。
然后是实际的__invoke
方法,这个方法由中间件调用来处理消息MyMessage
,它还有另一个(可选的)类型为Acknowledger
的参数。
__invoke
方法调用了私有的handle
方法(默认实现在特性中),并传入给定的消息MyMessage
和Acknowledger
实例。
然后handle
方法要么将消息排队,要么刷新队列(调用处理程序的process
方法),如果达到了排队消息的定义数量或没有给定Acknowledger
实例。
Acknowledger
有趣的部分是__invoke
方法的Acknowledger
参数。为了理解这里发生了什么(即这个参数来自哪里),请看\Symfony\Component\Messenger\Middleware\HandleMessageMiddleware::handle
方法,该方法通过处理程序描述符(你的处理程序实现了BatchHandlerInterface
接口)将你的处理程序识别为批处理程序,并且如果消息的信封上有AckStamp
(稍后会详细介绍),则会实例化一个新的Acknowledger
并将其作为第二个参数传递给处理程序调用:
因此,每当批处理程序不确认消息(由于异常;请参见上面公告示例中带有$ack->nack($e)
行的部分),异常就会被包装在HandlerFailedException
中,这可能会导致工作程序的新交付尝试(重试)。
当消息通过worker分派到消息总线时,\Symfony\Component\Messenger\Worker::handleMessage
方法会自动将AckStamp
添加到消息的信封中。
因此,这意味着AckStamp
仅添加到异步传输的消息中!通过同步传输(sync://
)传输的消息总是立即处理(不是批处理)。
批量处理
让我们回到处理程序。
如果$ack
不为null,则handle
方法必须返回批处理中待处理消息的数量;否则,将在HandleMessageMiddleware
中抛出LogicException
异常。特性为你实现了此方法。
如果没有传递Acknowledger
实例,则消息将立即处理(强制刷新堆栈)。
否则,消息将被推送到内部队列(实际上是一个数组),并调用shouldFlush
方法来决定是否已达到阈值(默认值为10)。如果是,则刷新队列,并通过处理程序的process
方法处理批次。
要定义另一个限制,你可以简单地添加一个像这样的方法
private function shouldFlush(): bool
{
return 15 <= \count($this->jobs);
}
在你的处理程序²中。
正如之前所说,如果达到了定义的批量大小,则处理程序的process
方法将被调用,并将排队的作业作为参数传递。该参数是一个类似于
[
[
0: MyMessage,
1: Acknowledger
],
[
0: MyMessage,
1: Acknowledger
],
]
的数组,因此你可以像这样迭代它:ack
方法的参数仅适用于期望返回值的同步命令总线。我更喜欢不返回值,所以我们只需将消息本身用作参数。
简而言之
简而言之,要批量处理消息,你需要执行以下操作:
- 创建消息和处理程序
- 为消息和处理程序配置异步传输
- 为处理程序实现
BatchHandlerInterface
- 使用
BatchHandlerTrait
(或自己实现方法) - 在处理程序的
__invoke
方法的第二个参数中期望一个Acknowledger
实例 - 覆盖 trait 的
shouldFlush
方法以设置自定义批量大小(默认值 = 10) - 在处理程序中实现一个
process
方法 - 处理作业并通过
ack
或nack
处理可能的异常 - 如果你不希望处理程序返回结果,只需将作业的消息设置为
ack
参数
工作示例
我已经在 GitHub 上创建了一个完整的 工作示例。
我希望我已经正确描述了一切,如果没有,请留下评论,我将很乐意调整文章。感谢阅读,祝你使用 Symfony 进行批处理!
[1] 有趣的是,flush
方法的 $force
布尔变量在 trait 中从未被使用过。但也许这是一种选项,如果你自己实现 flush
方法。
[2] 请记住,你可以随时覆盖 trait 的方法。使用 trait 的类中实现的方法优先级更高。
评论(0)