首页
Preview

使用Go 实现Redis 任务队列和定时任务调度

在该篇文章中,我们将使用 github.com/hibiken/asynq 包将任务加入 Redis 队列,并通过 github.com/robfig/cron 包创建定时任务调度器。本指南将逐步介绍如何设置任务队列、安排定时任务以及实现优雅的程序关闭。


初始化模块

首先,为项目创建一个新的 Go 模块:

go mod init learn_queue_and_cron

创建 cron.go 文件

cron.go 文件负责在特定的时间间隔内调度和运行任务。以下是实现代码:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

func runCron(c *cron.Cron) {
    // 每分钟执行一次任务
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("每分钟执行任务: %v\n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    // 启动定时调度器
    c.Start()
    log.Println("定时任务调度器已启动")

    // 保持主协程运行
    select {}
}

这段代码每分钟执行一次任务,并确保程序持续运行以支持调度器工作。


创建 queue.go 文件

queue.go 文件管理任务队列的处理,代码如下:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func runQueue(server *asynq.Server) {
    mux := asynq.NewServeMux()
    mux.HandleFunc("send_email", emailHandler)
    mux.HandleFunc("generate_report", reportHandler)

    if err := server.Run(mux); err != nil {
        log.Fatalf("Asynq 服务器运行失败: %v", err)
    }
}

func emailHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        To string `json:"to"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("解析任务载荷失败: %w", err)
    }
    fmt.Printf("发送邮件至: %s\n", payload.To)
    return nil
}

func reportHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        ReportID int `json:"report_id"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("解析任务载荷失败: %w", err)
    }
    fmt.Printf("生成报告 ID: %d\n", payload.ReportID)
    return nil
}

解释:

  • emailHandlerreportHandler 负责解析任务的载荷并执行相应操作。
  • 任务队列通过 Asynq 定义和处理,如 send_emailgenerate_report

创建 router.go 文件

router.go 文件定义 HTTP 端点,用于向队列添加任务:

package main

import (
    "encoding/json"
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/hibiken/asynq"
)

func setupRouter(client *asynq.Client) *gin.Engine {
    r := gin.Default()

    r.POST("/enqueue/email", func(c *gin.Context) {
        var payload struct {
            To string `json:"to"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "请求体无效"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "载荷序列化失败"})
            return
        }

        task := asynq.NewTask("send_email", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "任务加入队列失败"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "邮件任务已加入队列"})
    })

    r.POST("/enqueue/report", func(c *gin.Context) {
        var payload struct {
            ReportID int `json:"report_id"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "请求体无效"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "载荷序列化失败"})
            return
        }

        task := asynq.NewTask("generate_report", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "任务加入队列失败"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "报告任务已加入队列"})
    })

    return r
}

这段代码通过 Gin 框架提供两个 HTTP 端点来向任务队列添加任务。


创建 main.go 文件

main.go 文件将所有模块整合:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hibiken/asynq"
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    server := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,
        },
    )

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    router := setupRouter(client)

    httpServer := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    ctx, stop := context.WithCancel(context.Background())
    defer stop()
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    go runQueue(server)
    go runCron(c)
    go func() {
        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("HTTP 服务器运行失败: %v", err)
        }
    }()

    appShutdown(ctx, httpServer, c, server, quit)
}

func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
    <-quit
    log.Println("正在优雅关闭...")

    httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
    defer httpCancel()
    if err := httpServer.Shutdown(httpCtx); err != nil {
        log.Printf("HTTP 服务器关闭错误: %v", err)
    }

    server.Shutdown()
    c.Stop()

    log.Println("应用已停止")
}

安装依赖

go mod tidy

构建与运行应用

go build -o run *.go && ./run

测试应用

访问以下端点以将任务加入队列:

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report

终端中可查看任务执行日志。

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
norvia
爱读书

评论(0)

添加评论