异步请求-答复模式
在后端处理需要是异步处理但前端仍需要明确响应的情况下,将后端处理与前端主机分离。
上下文和问题
在新式应用程序开发中,客户端应用程序(通常是在 Web 客户端(浏览器)中运行的代码)依赖远程 API 来提供业务逻辑和组合功能是很正常的。 这些 API 可能与应用程序直接相关,也可能是由第三方提供的共享服务。 通常,这些 API 调用通过 HTTP(S) 协议进行并遵循 REST 语义。 在大多数情况下,客户端应用程序的 API 旨在快速响应,大约 100 毫秒或更短。 许多因素都会影响响应延迟,包括:
- 应用程序的托管堆栈。
- 安全组件。
- 调用方和后端的相对地理位置。
- 网络基础结构。
- 当前负载。
- 请求有效负载的大小。
- 处理队列长度。
- 后端处理请求的时间。
这些因素中的任何一个都会增加响应的延迟。 可以通过横向扩展后端来缓解某些问题。 其他因素(例如网络基础结构)在很大程度上不受应用程序开发人员的控制。 大多数 API 可以快速响应,以便使响应通过同一连接返回。 应用程序代码可以非阻塞方式进行同步 API 调用,从而提供异步处理的外观,建议用于 I/O 相关操作。 然而,在某些情况下,后端完成的工作可能是长时间运行的(以秒为单位),或者可能是在几分钟甚至几小时内执行的后台进程。 在这种情况下,在响应请求之前等待作业完成是不可行的。 对于任何同步请求-回复模式,这种情况都是一个潜在的问题。 一些体系结构使用消息代理来分离请求和响应阶段,从而解决此问题。 此分离通常通过使用基于队列的负载均衡模式来实现。 此分离可以允许客户端进程和后端 API 进行独立扩展。 但是当客户端需要成功通知时,此分离也会带来额外的复杂性,因为此步骤必须是异步操作。 针对客户端应用程序讨论的许多相同注意事项也适用于分布式系统(例如,在微服务体系结构中)中的服务器到服务器 REST API 调用。
解决方案
此问题的一种解决方案是使用 HTTP 轮询。 轮询对于客户端代码很有用,因为它很难提供回调终结点或使用长时间运行的连接。 即使可以进行回调,所需的额外库和服务有时也会增加太多额外的复杂性。
- 客户端应用程序对 API 进行同步调用,从而在后端触发长时间运行的操作。
- API 会尽快地进行同步响应。 它返回 HTTP 202(已接受)状态代码,确认已收到要处理的请求。 备注 API 应在启动长时间运行的进程之前,同时验证请求和要执行的操作。 如果请求无效,请立即回复 HTTP 400(错误请求)等错误代码。
- 响应包含指向终结点的位置引用,客户端可以轮询该终结点以检查长时间运行操作的结果。
- API 将处理卸载到另一个组件,例如消息队列。
- 对于状态终结点的每次成功调用,它都会返回 HTTP 200。 当工作仍处于待处理状态时,状态终结点会返回指示工作仍在进行中的资源。 工作完成后,状态终结点可以返回指示已完成的资源,也可以重定向到其他资源 URL。 例如,如果异步操作创建了新资源,状态终结点将重定向到该资源的 URL。
下图显示了一个典型的流:
- 客户端发送请求并接收 HTTP 202(已接受)响应。
- 客户端向状态终结点发送 HTTP GET 请求。 由于工作仍处于待处理状态,因此该调用返回 HTTP 200。
- 在某个时刻,工作完成,状态终结点返回 302(已找到)并重定向到资源。
- 客户端从指定的 URL 提取资源。
问题和注意事项
-
可以借助许多可能的方法,通过 HTTP 实现此模式,但并非所有上游服务都具有相同的语义。 例如,当远程进程尚未完成时,大多数服务不会从 GET 方法返回 HTTP 202 响应。 遵循纯 REST 语义,它们应返回 HTTP 404(未找到)。 如果你认为调用的结果尚不存在,则此响应是有意义的。
-
HTTP 202 响应应指示客户端应轮询响应的位置和频率。 它应该具有以下附加标头:
标头 说明 说明 位置 客户端应轮询响应状态的 URL。 如果该位置需要访问控制,则该 URL 可以是带有附属密钥模式的 SAS 令牌。 当响应轮询需要卸载到另一个后端时,附属密钥模式也有效 Retry-After 估计处理何时完成 此标头旨在防止轮询客户端因重试而导致后端瘫痪。 -
可能需要使用处理代理或外观来操作响应标头或有效负载,具体取决于使用的基础服务。
-
如果状态终结点在完成时重定向,则 HTTP 302 或 HTTP 303 都是合适的返回代码,具体取决于你支持的确切语义。
-
成功处理后,由 Location 标头指定的资源应返回适当的 HTTP 响应代码,例如 200(正常)、201(已创建)或 204(无内容)。
-
如果在处理过程中发生错误,请将错误保留在 Location 标头中所述的资源 URL 中,并应从该资源向客户端返回适当的响应代码(4xx 代码)。
-
并非所有解决方案都将以相同的方式实现此模式,某些服务将包含附加或备用标头。 例如,Azure 资源管理器使用此模式的修改变体。 有关详细信息,请参阅 Azure 资源管理器异步操作。
-
旧版客户端可能不支持此模式。 在这种情况下,可能需要在异步 API 上放置外观,以对原始客户端隐藏异步处理。 例如,Azure 逻辑应用以原生方式支持此模式,可用作异步 API 和进行同步调用的客户端之间的集成层。 请参阅使用 Webhook 操作模式执行长时间运行任务。
-
在某些情况下,可能希望为客户端提供一种方法来取消长时间运行的请求。 在这种情况下,后端服务必须支持某种形式的取消指令。
何时使用此模式
在以下情况下可使用此模式:
- 客户端代码(例如浏览器应用程序),它很难提供回调终结点,或者使用长时间运行的连接会增加太多额外的复杂性。
- 只有 HTTP 协议可用且返回服务因客户端上的防火墙限制而导致无法触发回调操作的服务调用。
- 需要与不支持新式回调技术(如 WebSocket 或 Webhook)的旧版体系结构集成的服务调用。
在以下情况下,此模式可能不适用:
- 可改为使用为异步通知构建的服务,例如 Azure 事件网格。
- 响应必须实时流式传输到客户端。
- 客户端需要收集很多结果,接收这些结果的延迟很重要。 请考虑改为使用服务总线模式。
- 可以使用服务器端的持久性网络连接,例如 WebSockets 或 SignalR。 这些服务可用于通知调用方结果。
- 网络设计允许打开端口以接收异步回调或 Webhook。
示例
以下代码显示使用 Azure Functions 实现此模式的应用程序的摘录。 解决方案中包含三个函数:
- 异步 API 终结点。
- 状态终结点。
- 一个后端函数,它接受排队的工作项并执行这些项。
GitHub 上提供此示例。
AsyncProcessingWorkAcceptor 函数
AsyncProcessingWorkAcceptor
函数实现一个终结点,该终结点接受来自客户端应用程序的工作并将其放入队列中以进行处理。
- 该函数生成请求 ID 并将其作为元数据添加到队列消息中。
- HTTP 响应包括指向状态终结点的位置标头。 请求 ID 是 URL 路径的一部分。
public static class AsyncProcessingWorkAcceptor
{
[FunctionName("AsyncProcessingWorkAcceptor")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = null)] CustomerPOCO customer,
[ServiceBus("outqueue", Connection = "ServiceBusConnectionAppSetting")] IAsyncCollector<ServiceBusMessage> OutMessages,
ILogger log)
{
if (String.IsNullOrEmpty(customer.id) || string.IsNullOrEmpty(customer.customername))
{
return new BadRequestResult();
}
string reqid = Guid.NewGuid().ToString();
string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{reqid}";
var messagePayload = JsonConvert.SerializeObject(customer);
var message = new ServiceBusMessage(messagePayload);
message.ApplicationProperties.Add("RequestGUID", reqid);
message.ApplicationProperties.Add("RequestSubmittedAt", DateTime.Now);
message.ApplicationProperties.Add("RequestStatusURL", rqs);
await OutMessages.AddAsync(message);
return new AcceptedResult(rqs, $"Request Accepted for Processing{Environment.NewLine}ProxyStatus: {rqs}");
}
}
AsyncProcessingBackgroundWorker 函数
AsyncProcessingBackgroundWorker
函数从队列中选取操作,根据消息有效负载执行一些工作,并将结果写入存储帐户。
public static class AsyncProcessingBackgroundWorker
{
[FunctionName("AsyncProcessingBackgroundWorker")]
public static async Task RunAsync(
[ServiceBusTrigger("outqueue", Connection = "ServiceBusConnectionAppSetting")] BinaryData customer,
IDictionary<string, object> applicationProperties,
[Blob("data", FileAccess.ReadWrite, Connection = "StorageConnectionAppSetting")] BlobContainerClient inputContainer,
ILogger log)
{
// Perform an actual action against the blob data source for the async readers to be able to check against.
// This is where your actual service worker processing will be performed
var id = applicationProperties["RequestGUID"] as string;
BlobClient blob = inputContainer.GetBlobClient($"{id}.blobdata");
// Now write the results to blob storage.
await blob.UploadAsync(customer);
}
}
AsyncOperationStatusChecker 函数
该 AsyncOperationStatusChecker
函数实现状态终结点。 该函数首先检查请求是否已完成
- 如果请求已完成,该函数要么将附属密钥返回到响应,要么立即将调用重定向到附属密钥 URL。
- 如果请求仍处于挂起状态,则应返回 200 代码,包括当前状态。
public static class AsyncOperationStatusChecker
{
[FunctionName("AsyncOperationStatusChecker")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "RequestStatus/{thisGUID}")] HttpRequest req,
[Blob("data/{thisGuid}.blobdata", FileAccess.Read, Connection = "StorageConnectionAppSetting")] BlockBlobClient inputBlob, string thisGUID,
ILogger log)
{
OnCompleteEnum OnComplete = Enum.Parse<OnCompleteEnum>(req.Query["OnComplete"].FirstOrDefault() ?? "Redirect");
OnPendingEnum OnPending = Enum.Parse<OnPendingEnum>(req.Query["OnPending"].FirstOrDefault() ?? "OK");
log.LogInformation($"C# HTTP trigger function processed a request for status on {thisGUID} - OnComplete {OnComplete} - OnPending {OnPending}");
// Check to see if the blob is present
if (await inputBlob.ExistsAsync())
{
// If it's present, depending on the value of the optional "OnComplete" parameter choose what to do.
return await OnCompleted(OnComplete, inputBlob, thisGUID);
}
else
{
// If it's NOT present, then we need to back off. Depending on the value of the optional "OnPending" parameter, choose what to do.
string rqs = $"http://{Environment.GetEnvironmentVariable("WEBSITE_HOSTNAME")}/api/RequestStatus/{thisGUID}";
switch (OnPending)
{
case OnPendingEnum.OK:
{
// Return an HTTP 200 status code.
return new OkObjectResult(new { status = "In progress", Location = rqs });
}
case OnPendingEnum.Synchronous:
{
// Back off and retry. Time out if the backoff period hits one minute.
int backoff = 250;
while (!await inputBlob.ExistsAsync() && backoff < 64000)
{
log.LogInformation($"Synchronous mode {thisGUID}.blob - retrying in {backoff} ms");
backoff = backoff * 2;
await Task.Delay(backoff);
}
if (await inputBlob.ExistsAsync())
{
log.LogInformation($"Synchronous Redirect mode {thisGUID}.blob - completed after {backoff} ms");
return await OnCompleted(OnComplete, inputBlob, thisGUID);
}
else
{
log.LogInformation($"Synchronous mode {thisGUID}.blob - NOT FOUND after timeout {backoff} ms");
return new NotFoundResult();
}
}
default:
{
throw new InvalidOperationException($"Unexpected value: {OnPending}");
}
}
}
}
private static async Task<IActionResult> OnCompleted(OnCompleteEnum OnComplete, BlockBlobClient inputBlob, string thisGUID)
{
switch (OnComplete)
{
case OnCompleteEnum.Redirect:
{
// Redirect to the SAS URI to blob storage
return new RedirectResult(inputBlob.GenerateSASURI());
}
case OnCompleteEnum.Stream:
{
// Download the file and return it directly to the caller.
// For larger files, use a stream to minimize RAM usage.
return new OkObjectResult(await inputBlob.DownloadContentAsync());
}
default:
{
throw new InvalidOperationException($"Unexpected value: {OnComplete}");
}
}
}
}
public enum OnCompleteEnum
{
Redirect,
Stream
}
public enum OnPendingEnum
{
OK,
Synchronous
}
后续步骤
实现此模式时,以下信息可能相关:
- Azure 逻辑应用 - 使用轮询操作模式执行长时间运行的任务。
- 有关设计 Web API 时的一般最佳做法,请参阅 Web API 设计。