在该篇文章中,我们将使用 github.com/hibiken/asynq
包将任务加入 Redis 队列,并通过 github.com/robfig/cron
包创建定时任务调度器。本指南将逐步介绍如何设置任务队列、安排定时任务以及实现优雅的程序关闭。
初始化模块
首先,为项目创建一个新的 Go 模块:
go mod init learn_queue_and_cron
创建 cron.go
文件
cron.go
文件负责在特定的时间间隔内调度和运行任务。以下是实现代码:
package main
import (
"fmt"
"log"
"time"
"github.com/robfig/cron/v3"
)
func runCron(c *cron.Cron) {
// 每分钟执行一次任务
_, err := c.AddFunc("@every 1m", func() {
fmt.Printf("每分钟执行任务: %v\n", time.Now().Local())
})
if err != nil {
log.Fatal(err)
}
// 启动定时调度器
c.Start()
log.Println("定时任务调度器已启动")
// 保持主协程运行
select {}
}
这段代码每分钟执行一次任务,并确保程序持续运行以支持调度器工作。
创建 queue.go
文件
queue.go
文件管理任务队列的处理,代码如下:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/hibiken/asynq"
)
func runQueue(server *asynq.Server) {
mux := asynq.NewServeMux()
mux.HandleFunc("send_email", emailHandler)
mux.HandleFunc("generate_report", reportHandler)
if err := server.Run(mux); err != nil {
log.Fatalf("Asynq 服务器运行失败: %v", err)
}
}
func emailHandler(ctx context.Context, task *asynq.Task) error {
var payload struct {
To string `json:"to"`
}
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务载荷失败: %w", err)
}
fmt.Printf("发送邮件至: %s\n", payload.To)
return nil
}
func reportHandler(ctx context.Context, task *asynq.Task) error {
var payload struct {
ReportID int `json:"report_id"`
}
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
return fmt.Errorf("解析任务载荷失败: %w", err)
}
fmt.Printf("生成报告 ID: %d\n", payload.ReportID)
return nil
}
解释:
emailHandler
和reportHandler
负责解析任务的载荷并执行相应操作。- 任务队列通过 Asynq 定义和处理,如
send_email
和generate_report
。
创建 router.go
文件
router.go
文件定义 HTTP 端点,用于向队列添加任务:
package main
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"github.com/hibiken/asynq"
)
func setupRouter(client *asynq.Client) *gin.Engine {
r := gin.Default()
r.POST("/enqueue/email", func(c *gin.Context) {
var payload struct {
To string `json:"to"`
}
if err := c.ShouldBindJSON(&payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "请求体无效"})
return
}
jsonPayload, err := json.Marshal(payload)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "载荷序列化失败"})
return
}
task := asynq.NewTask("send_email", jsonPayload)
_, err = client.Enqueue(task)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "任务加入队列失败"})
return
}
c.JSON(http.StatusOK, gin.H{"message": "邮件任务已加入队列"})
})
r.POST("/enqueue/report", func(c *gin.Context) {
var payload struct {
ReportID int `json:"report_id"`
}
if err := c.ShouldBindJSON(&payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "请求体无效"})
return
}
jsonPayload, err := json.Marshal(payload)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "载荷序列化失败"})
return
}
task := asynq.NewTask("generate_report", jsonPayload)
_, err = client.Enqueue(task)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "任务加入队列失败"})
return
}
c.JSON(http.StatusOK, gin.H{"message": "报告任务已加入队列"})
})
return r
}
这段代码通过 Gin 框架提供两个 HTTP 端点来向任务队列添加任务。
创建 main.go
文件
main.go
文件将所有模块整合:
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/hibiken/asynq"
"github.com/robfig/cron/v3"
)
func main() {
c := cron.New()
server := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{
Concurrency: 10,
},
)
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()
router := setupRouter(client)
httpServer := &http.Server{
Addr: ":8080",
Handler: router,
}
ctx, stop := context.WithCancel(context.Background())
defer stop()
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
go runQueue(server)
go runCron(c)
go func() {
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("HTTP 服务器运行失败: %v", err)
}
}()
appShutdown(ctx, httpServer, c, server, quit)
}
func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
<-quit
log.Println("正在优雅关闭...")
httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
defer httpCancel()
if err := httpServer.Shutdown(httpCtx); err != nil {
log.Printf("HTTP 服务器关闭错误: %v", err)
}
server.Shutdown()
c.Stop()
log.Println("应用已停止")
}
安装依赖
go mod tidy
构建与运行应用
go build -o run *.go && ./run
测试应用
访问以下端点以将任务加入队列:
http://localhost:8080/enqueue/email
http://localhost:8080/enqueue/report
终端中可查看任务执行日志。
评论(0)