管道和筛选器模式
将一个执行复杂处理的任务分解为一系列可重复使用的单个元素。 这样做可以单独部署和缩放执行处理的任务元素,从而提高性能、可伸缩性和可重用性。
上下文和问题
应用程序可对其处理的信息执行复杂性不一的各种任务。 实施应用程序的一种直接但不灵活的方法是将这种处理当作单一模块来执行。 但是,这种方法可能会降低重构代码、优化代码或在应用程序的其他位置需要相同处理的部分时重复使用代码的机会。 下图说明了使用单一方法处理数据时出现的问题。 应用程序接收并处理来自两个源的数据。 由单个模块处理来自每个源的数据,执行一系列的任务来转换这些数据,然后再将结果传递到应用程序的业务逻辑。 单个模块执行的某些任务在功能上很相似,但这些模块已经过单独设计。 实现任务的代码紧密组合在模块中。 开发期间没有考虑到重用和可伸缩性。 但是,随着业务需求的更新,由每个模块执行的处理任务或每个任务的部署要求都可能会更改。 某些任务可能是计算密集型的,最好是在功能强大的硬件上运行。 其他任务可能不需要此类昂贵的资源。 此外,将来可能还需要额外的处理,处理执行任务的顺序也可能更改。 我们需要一种解决方案,它既能解决这些问题,又能增加代码重用的可能性。
解决方案
将每个流所需的处理分解为一组单独的组件(或筛选器),每个组件执行一项任务。 要实现每个组件接收和发送的数据的标准格式,这些筛选器可以组合到管道中。 这样做可避免代码重复,并可在处理要求发生更改的情况下,轻松删除、替换或集成其他。 下图显示了使用管道和筛选器实现的解决方案: 处理单个请求所花的时间取决于管道中最慢筛选器的速度。 一个或多个筛选器可能是瓶颈,尤其是在特定数据源的流中出现大量请求时。 管道结构的主要优点是,它提供了为速度缓慢的筛选器运行并行实例的机会,使系统能够分散负载并提高吞吐量。 构成管道的筛选器可以在不同的计算机上运行,使其能够独立缩放并充分利用许多云环境提供的弹性。 计算密集型筛选器可以在高性能硬件上运行,而其他要求较低的筛选器可以在成本较低的商用硬件上托管。 筛选器甚至不需要位于同一个数据中心或地理位置,这样管道中的每个元素都可以在靠近自己所需资源的环境中运行。 下图显示了应用于来源 1 的数据管道的示例: 如果筛选器的输入和输出采用流结构,则可以为每个筛选器并行执行处理。 管道中的第一个筛选器可以启动其工作并输出其结果,然后在第一个筛选器完成工作之前,按顺序将结果直接传递到下一筛选器。 此模型的另一个好处是可以提供复原能力。 如果某一筛选器失败或运行它的计算机不再可用,管道可以重新计划执行筛选器的工作,并将这项工作定向到组件的另一个实例。 一个筛选器失败不一定就会导致整个管道失败。 将管道和筛选器模式与补偿事务模式结合使用,这是实现分布式事务的另一种方法。 可以将分布式事务分解为单独的、可补偿的任务,每个任务都可以通过筛选器来实现,而筛选器还能实现补偿事务模式。 可以将管道中的筛选器作为单独的托管任务来实现,在靠近它们所维护数据的位置运行。
问题和注意事项
在决定如何实现此模式时,请考虑以下几点:
- 复杂性。 此模式提供的较高灵活性也可能会引入复杂性,尤其是管道中的筛选器分布在不同的服务器上时。
- 可靠性。 使用基础结构来确保管道中的筛选器之间流动的数据不会丢失。
- 幂等性。 如果管道中的筛选器在接收消息后失败,并且将工作重新安排给筛选器的另一个实例,则部分工作可能已经完成。 如果这项工作仅更新全局状态的一些方面(例如存储在数据库中的信息),则可以重复一次更新。 如果筛选器在将其结果发布到管道中的下一个筛选器后失败,但在指示它已成功完成工作之前,则可能会发生类似的问题。 在这些情况下,相同的工作可能由筛选器的另一个实例重复执行,导致相同的结果发布两次。 此方案可能会导致管道中的后续筛选器对相同数据处理两次。 因此,应将管道中的筛选器设计为幂等。 有关详细信息,请参阅 Jonathan Oliver 博客中的 Idempotency Patterns(幂等模式)。
- 重复消息。 如果管道中的筛选器在将消息发布到管道的下一阶段后失败,则可能会运行筛选器的另一个实例,并且它会将相同消息的副本发布到管道。 此方案可能会导致将相同消息的两个实例传递到下一个筛选器。 为了避免此问题,管道应该检测和消除重复的消息。 备注 如果通过使用消息队列(例如 Azure 服务总线队列)实现管道,则消息队列基础结构可能会提供自动重复消息检测和删除功能。
- 上下文和状态。 在管道中,每个筛选器基本上是分开运行的,不应对它的调用方式做任何假设。 因此,每个筛选器应提供足够的上下文来执行其工作。 此上下文可以包括大量状态信息。
何时使用此模式
在以下情况下使用此模式:
- 应用程序所需的处理可以轻松分解为一组独立的步骤。
- 应用程序执行的处理步骤具有不同的可伸缩性要求。 备注 可以将应一起缩放的筛选器分组到同一进程中。 有关详细信息,请参阅计算资源整合模式。
- 需要具备一定的灵活性,以便能够对应用程序执行的处理步骤重新排序;或启用添加和删除步骤的功能。
- 系统可以从将不同步骤的处理能力分配给不同的服务器中获益。
- 需要一种可靠的解决方案,在处理数据的同时,尽可能降低步骤失败带来的影响。
在以下情况下,此模式可能不起作用:
- 应用程序执行的处理步骤不是独立的,或者它们必须作为单个事务的一部分一起执行。
- 步骤所需的上下文或状态信息量使得该方法效率低下。 它可能能够将状态信息保留到数据库,但如果数据库的额外负载导致过度争用资源,请勿使用此策略。
示例
可以使用一系列消息队列来提供实现管道所需的基础结构。 初始消息队列接收未处理的消息。 作为筛选器任务实现的组件侦听此队列中的消息,执行其工作,然后将转换的消息发布到序列中的下一个队列。 另一个筛选器任务可以侦听此队列中的消息,处理它们,将结果发布到另一个队列等,直到完全转换的数据出现在队列中的最终消息中。 下图演示了一个使用消息队列的管道:
如果在 Azure 上构建解决方案,则可以使用服务总线队列来提供可靠的可缩放排队机制。 以下 C# 代码中所示的 ServiceBusPipeFilter
类演示了如何实现从队列接收输入消息的筛选器,处理这些消息以及将结果发布到另一队列。
备注
ServiceBusPipeFilter
类在可从 GitHub 获取的 PipesAndFilters.Shared 项目中定义。
public class ServiceBusPipeFilter
{
...
private readonly string inQueuePath;
private readonly string outQueuePath;
...
private QueueClient inQueue;
private QueueClient outQueue;
...
public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
{
...
this.inQueuePath = inQueuePath;
this.outQueuePath = outQueuePath;
}
public void Start()
{
...
// Create the outbound filter queue if it doesn't exist.
...
this.outQueue = QueueClient.CreateFromConnectionString(...);
...
// Create the inbound and outbound queue clients.
this.inQueue = QueueClient.CreateFromConnectionString(...);
}
public void OnPipeFilterMessageAsync(
Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
{
...
this.inQueue.OnMessageAsync(
async (msg) =>
{
...
// Process the filter and send the output to the
// next queue in the pipeline.
var outMessage = await asyncFilterTask(msg);
// Send the message from the filter processor
// to the next queue in the pipeline.
if (outQueue != null)
{
await outQueue.SendAsync(outMessage);
}
// Note: There's a chance that the same message could be sent twice
// or that a message could be processed by an upstream or downstream
// filter at the same time.
// This would happen in a situation where processing of a message was
// completed, it was sent to the next pipe/queue, and it then failed
// to complete when using the PeekLock method.
// In a real-world implementation, you should consider idempotent message
// processing and concurrency.
},
options);
}
public async Task Close(TimeSpan timespan)
{
// Pause the processing threads.
this.pauseProcessingEvent.Reset();
// There's no clean approach for waiting for the threads to complete
// the processing. This example simply stops any new processing, waits
// for the existing thread to complete, closes the message pump,
// and finally returns.
Thread.Sleep(timespan);
this.inQueue.Close();
...
}
...
}
ServiceBusPipeFilter
类中的 Start
方法连接到一对输入和输出队列,Close
方法从输入队列中断开连接。 OnPipeFilterMessageAsync
方法执行实际的消息处理,此方法的 asyncFilterTask
参数指定要执行的处理。 OnPipeFilterMessageAsync
方法等待输入队列中的传入消息,在每个消息到达时对其运行 asyncFilterTask
参数指定的代码,并将结果发布到输出队列。 队列由构造函数指定。
示例解决方案在一组辅助角色中实现筛选器。 每个辅助角色都可以单独缩放,具体取决于它执行的业务处理的复杂性或进行处理所需的资源。 此外,可以并行运行每个辅助角色的多个实例以提高吞吐量。
以下代码显示名为 PipeFilterARoleEntry
的 Azure 辅助角色,已在示例解决方案的 PipeFilterA 项目中定义。
public class PipeFilterARoleEntry : RoleEntryPoint
{
...
private ServiceBusPipeFilter pipeFilterA;
public override bool OnStart()
{
...
this.pipeFilterA = new ServiceBusPipeFilter(
...,
Constants.QueueAPath,
Constants.QueueBPath);
this.pipeFilterA.Start();
...
}
public override void Run()
{
this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
{
// Clone the message and update it.
// Properties set by the broker (Deliver count, enqueue time, ...)
// aren't cloned and must be copied over if required.
var newMsg = msg.Clone();
await Task.Delay(500); // DOING WORK
Trace.TraceInformation("Filter A processed message:{0} at {1}",
msg.MessageId, DateTime.UtcNow);
newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");
return newMsg;
});
...
}
...
}
此角色包含 ServiceBusPipeFilter
对象。 该角色中的 OnStart
方法连接到接收输入消息和发布输出消息的队列。 (队列名称已在 Constants
类中定义)。Run
方法调用 OnPipeFilterMessageAsync
方法来对收到的每条消息执行处理。 (在本例中,通过等待一小段时间来模拟处理)。处理完成时,构造包含结果的新消息(在这种情况下,输入消息中已添加自定义属性),并且该消息已被发布到输出队列。
示例代码包含另一个名为 PipeFilterBRoleEntry
的辅助角色。 它位于 PipeFilterB 项目中。 此角色与 PipeFilterARoleEntry
类似,但是它在 Run
方法中执行不同的处理。 在示例解决方案中,这两个角色组合构成一个管道。 PipeFilterARoleEntry
角色的输出队列是 PipeFilterBRoleEntry
角色的输入队列。
示例解决方案还提供了两个名为 InitialSenderRoleEntry
(在 InitialSender 项目中)和 FinalReceiverRoleEntry
(在 FinalReceiver 项目中)的其他角色。 InitialSenderRoleEntry
角色在管道中提供初始消息。 OnStart
方法连接到单个队列,Run
方法将方法发布到此队列。 此队列是 PipeFilterARoleEntry
角色使用的输入队列,因此向其发布消息将使该消息由 PipeFilterARoleEntry
角色接收和处理。 然后,处理后的消息将通过 PipeFilterBRoleEntry
角色。
FinalReceiveRoleEntry
角色的输入队列是 PipeFilterBRoleEntry
角色的输出队列。 FinalReceiveRoleEntry
角色中的 Run
方法(如以下代码所示)接收消息并执行一些最终处理。 然后,它将管道中的筛选器添加的自定义属性值写入到跟踪输出。
public class FinalReceiverRoleEntry : RoleEntryPoint
{
...
// Final queue/pipe in the pipeline to process data from.
private ServiceBusPipeFilter queueFinal;
public override bool OnStart()
{
...
// Set up the queue.
this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
this.queueFinal.Start();
...
}
public override void Run()
{
this.queueFinal.OnPipeFilterMessageAsync(
async (msg) =>
{
await Task.Delay(500); // DOING WORK
// The pipeline message was received.
Trace.TraceInformation(
"Pipeline Message Complete - FilterA:{0} FilterB:{1}",
msg.Properties[Constants.FilterAMessageKey],
msg.Properties[Constants.FilterBMessageKey]);
return null;
});
...
}
...
}
后续步骤
实现此模式时,你可能会发现以下资源很有用:
- GitHub 上演示此模式的示例
- Jonathan Oliver 博客中的幂等模式
相关资源
实现此模式时,以下模式也可能有用: