优先级队列模式
为发送到服务的请求确定优先级,以便高优先级请求能够得到比低优先级请求更快速地接收和处理。 在向各个客户端提供不同服务级别保障的应用程序中,此模式非常有用。
上下文和问题
应用程序可以将特定任务委托给其他服务,例如执行后台处理,或与其他应用程序、服务集成。 在云中,消息队列通常用于将任务委托给后台处理。 在许多情况下,服务接收请求的顺序并不重要。 但在某些情况下,必须确定特定请求的优先级。 相对于应用程序以前发送的优先级较低的请求,这些请求应先于得到处理。
解决方案
队列通常是先入先出 (FIFO) 结构,使用者通常是按消息发布到队列的相同顺序接收的。 但是,某些消息队列支持优先级消息传送。 发布消息的应用程序可以分配优先级。 队列中的消息会自动重新排序,以便优先级较高的消息先于优先级较低的消息被接收。 下图演示了此过程: 备注 大多数消息队列实现都支持多个使用者。 (请参阅使用者竞争模式)使用者进程数可根据需要进行纵向扩展和缩减。 在不支持基于优先级的消息队列的系统中,替代解决方法是将每个优先级的消息保持一个单独队列。 应用程序负责将消息发布到相应的队列。 每个队列可以有单独的使用者池。 与优先级较低的队列相比,优先级较高的队列可以有更大的使用者池,这些使用者在速度更快的硬件上运行。 下图显示了对每个优先级使用单独的消息队列: 此策略的变体是实现单个使用者池,这些使用者首先检查高优先级队列中是否有消息,然后再从优先级较低的队列中提取消息。 使用单个使用者进程池的解决方案与使用多个队列的解决方案存在一些语义上的差异:前者使用单个队列支持具有不同优先级的消息,或使用多个队列,每个队列处理一种优先级的消息;而后者对每个队列使用一个单独池。 在单个池方法中,优先级较高的消息始终先于优先级较低的消息得到接收和处理。 理论上,低优先级消息可能会被不断取代,并且可能永远得不到处理。 在多个池方法中,始终都会处理优先级较低的消息,但不会像优先级较高的消息那样快(具体要取决于它们可用的池和资源的相对大小)。 使用优先级排队机制可以提供以下好处:
- 它能让应用程序满足需要确定可用性或性能优先级的业务需求,比如为不同客户群提供不同级别的服务。
- 它有助于最大程度地减少运营成本。 如果使用单队列方法,可以根据需要按比例缩减使用者数量。 仍然会先处理高优先级的消息(但速度可能较慢),较低优先级的消息可能会延迟更长时间。 如果实施多消息队列方法,并对每个队列使用单独的使用者池,则可减少优先级较低队列的使用者池。 甚至可通过阻止侦听这些队列消息的所有使用者来暂停处理某些优先级极低的队列。
- 多个消息队列方法可以基于处理要求为消息分区,从而帮助最大程度地提高应用程序性能和可伸缩性。 例如,可以确定关键任务的优先级,使其由立即运行的接收器处理,而不太重要的后台任务可由计划在相对空闲期间运行的接收器处理。
注意事项
在决定如何实现此模式时,请考虑以下几点:
- 在解决方案的上下文中定义优先级。 例如,高优先级消息可定义为应在 10 秒内处理的消息。 确定处理高优先级项目的要求,以及为满足这些条件而需要分配的其他资源。
- 决定是否必须在任何低优先级项目之前处理所有的高优先级项目。 如果消息由单个使用者池处理,则需要提供这样一种机制:如果优先级较高的消息进入队列,该机制可以取代和暂停正在处理低优先级消息的任务。
- 在多队列方法中,使用单个使用者进程池侦听所有队列,而不是每个队列都有专用的使用者池时,使用者必须应用一种算法,以确保始终都先为较高优先级队列中的消息提供服务,之后才是较低优先级队列中的消息。
- 监控高优先级和低优先级队列的处理速度,确保这些队列中的消息按照预期速度进行处理。
- 如果需要保证低优先级的消息得到处理,可实施具有多个使用者池的多消息队列方法。 或者,在支持消息优先级的队列中,可以随时间的推移动态提高已排队消息的优先级。 但是,这种方法取决于提供上述功能的消息队列。
- 对于具有少量明确定义的优先级的系统,建议使用基于消息优先级的单独队列策略。
- 系统可以在逻辑上确定消息优先级。 例如,可以将消息指定为“付费客户”或“非付费客户”,而不是采用明确的高优先级和低优先级消息。然后,系统可以分配更多资源来处理付费客户的消息。
- 检查队列中是否有消息时,可能会产生财务和处理成本。 例如,每次发布或检索消息时,以及每次查询队列中的消息时,某些商业消息传递系统都会收取少量费用。 上述成本会在检查多个队列时增加。
- 可以基于池所服务的队列长度动态调整使用者池的大小。 有关详细信息,请参阅自动缩放指南。
何时使用此模式
此模式在以下情况非常有用:
- 系统必须处理具有不同优先级的多个任务。
- 应为不同的用户或租户提供不同优先级。
示例
Azure 不提供本身支持通过排序自动确定消息优先级的排队机制。 但提供 Azure 服务总线主题、支持提供消息筛选的排队机制的服务总线订阅以及一系列灵活功能,这使得 Azure 成为大多数优先级队列实现的理想选择。
Azure 解决方案可以实施服务总线主题,应用程序可将消息发布到该主题,就像将消息发布到队列一样。 消息可以包含应用程序定义的自定义属性形式的元数据。 服务总线订阅可以与该主题相关联,并且这些订阅可以基于属性筛选消息。 应用程序将消息发送到主题时,消息被定向到可由使用者读取的相应订阅中。 使用者进程可以使用与消息队列相同的语义从订阅中检索消息。 (订阅是一个逻辑队列。)下图显示如何使用服务总线主题和订阅实施优先级队列:
在上图中,应用程序创建了多条消息,并在每条消息中分配一个名为 Priority
的自定义属性。 Priority
的值为 High
或 Low
。 应用程序将这些消息发布到主题。 主题包含两个相关联的订阅,这些订阅基于 Priority
属性筛选消息。 一个订阅接受 Priority
属性设置为 High
的消息。 另一个订阅接受 Priority
属性设置为 Low
的消息。 使用者池从每个订阅中读取消息。 高优先级的订阅具有更大的池,与低优先级池的计算机相比,这些高优先级池中的使用者可能在功能更强大、可用资源更多的计算机上运行。
本示例中指定的高优先级和低优先级消息没有什么特别的。 它们只是每个消息中指定为属性的标签, 用于将消息定向到特定订阅。 如果需要其他优先级,相对容易的做法是创建更多订阅和使用者进程池来处理这些优先级。
GitHub 上的 PriorityQueue 解决方案基于此方法。 此解决方案包含名为 PriorityQueueConsumerHigh
和 PriorityQueueConsumerLow
的 Azure 函数项目。 这些 Azure Functions 项目通过触发器和绑定来与服务总线集成。 它们连接到 ServiceBusTrigger
中定义的不同订阅,并对传入消息做出响应。
public static class PriorityQueueConsumerHighFn
{
[FunctionName("HighPriorityQueueConsumerFunction")]
public static void Run(
[ServiceBusTrigger("messages", "highPriority", Connection = "ServiceBusConnection")]string highPriorityMessage,
ILogger log)
{
log.LogInformation($"C# ServiceBus topic trigger function processed message: {highPriorityMessage}");
}
}
管理员可以配置 Azure 应用服务上的函数可以横向扩展到的实例数。 方法是在 Azure 门户中配置“强制实施横向扩展限制”选项,为每个函数设置横向扩展上限。 通常,PriorityQueueConsumerHigh
函数的实例数需要多于 PriorityQueueConsumerLow
函数。 此配置可确保从队列中读取高优先级消息的速度比低优先级消息更快。
另一个项目 PriorityQueueSender
包含一个时间触发的 Azure 函数,该函数配置为每 30 秒运行一次。 此函数通过输出绑定与服务总线集成,并将低优先级和高优先级消息批量发送到 IAsyncCollector
对象。 当该函数将消息发布到与 PriorityQueueConsumerHigh
和 PriorityQueueConsumerLow
函数使用的订阅相关联的主题时,它通过使用 Priority
自定义属性指定优先级,如下所示:
public static class PriorityQueueSenderFn
{
[FunctionName("PriorityQueueSenderFunction")]
public static async Task Run(
[TimerTrigger("0,30 * * * * *")] TimerInfo myTimer,
[ServiceBus("messages", Connection = "ServiceBusConnection")] IAsyncCollector<ServiceBusMessage> collector)
{
for (int i = 0; i < 10; i++)
{
var messageId = Guid.NewGuid().ToString();
var lpMessage = new ServiceBusMessage() { MessageId = messageId };
lpMessage.ApplicationProperties["Priority"] = Priority.Low;
lpMessage.Body = BinaryData.FromString($"Low priority message with Id: {messageId}");
await collector.AddAsync(lpMessage);
messageId = Guid.NewGuid().ToString();
var hpMessage = new ServiceBusMessage() { MessageId = messageId };
hpMessage.ApplicationProperties["Priority"] = Priority.High;
hpMessage.Body = BinaryData.FromString($"High priority message with Id: {messageId}");
await collector.AddAsync(hpMessage);
}
}
}
后续步骤
实现此模式时,以下资源可能有用:
- GitHub 上演示此模式的示例。
- 异步消息传递入门。 处理请求的使用者服务可能需要将回复发送到发布请求的应用程序的实例。 本文提供可用于实现请求/响应消息传递的策略的相关信息。
- 自动缩放指南。 有时可以根据队列长度,扩展处理队列的使用者进程池的大小。 此策略可以帮助提高性能,尤其适用于处理高优先级消息的池。
相关资源
实现此模式时,以下模式可能有用: