XuLaLa.Tech

首页客户端下载Windows 使用V2Ray 教程SSR 教程Clash 教程

Golang 如何实现延迟队列?

2025.04.09

在高并发和异步处理场景中,延迟队列(Delayed Queue)是一种常见的解决方案。它允许任务在指定时间后执行,而不是立即执行。在 Golang 中,我们可以使用多种方式来实现延迟队列,本文将介绍3种常见的实现方式。

文章目录

  • 1 一、使用 time.AfterFunc 实现简单延迟队列
    • 1.1 适用场景
  • 2 二、使用 time.Timer + heap 实现优先级队列
    • 2.1 实现方法
    • 2.2 适用场景
  • 3 三、使用 Redis 的 zset 实现分布式延迟队列
    • 3.1 实现方法
    • 3.2 适用场景
  • 4 三、总结

一、使用 time.AfterFunc 实现简单延迟队列

time.AfterFunc 可以在指定时间后执行一个函数,这是实现延迟任务最简单的方法。
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("任务开始时间:", time.Now())
// 延迟 5 秒执行
time.AfterFunc(5*time.Second, func() {
fmt.Println("执行任务时间:", time.Now())
})
// 防止主 Goroutine 退出
time.Sleep(6 * time.Second)
}

适用场景

  • 适用于简单的延迟任务,任务量不大时使用。
缺点
  • 任务无法持久化,程序重启后会丢失。
  • 无法方便地管理和取消任务。

二、使用 time.Timer + heap 实现优先级队列

如果需要管理多个延迟任务,并且可能会动态增加或取消任务,可以使用 heap 实现一个优先级队列,让最早需要执行的任务在前面。

实现方法

  • 使用 container/heap 实现最小堆,堆顶存放最近需要执行的任务。
  • 使用 time.Timer 监听堆顶任务的执行时间。
  • 定时轮询任务队列,执行到期任务。
package main

import (

"container/heap"

"fmt"

"sync"

"time"

)

// 任务结构体

type Task struct {

execTime time.Time // 任务执行时间

fn func() // 任务函数

index int // 在堆中的索引

}

// 任务队列(最小堆)

type TaskQueue []*Task

func (tq TaskQueue) Len() int { return len(tq) }

func (tq TaskQueue) Less(i, j int) bool { return tq[i].execTime.Before(tq[j].execTime) }

func (tq TaskQueue) Swap(i, j int) {

tq[i], tq[j] = tq[j], tq[i]

tq[i].index, tq[j].index = i, j

}

func (tq *TaskQueue) Push(x interface{}) {

n := len(*tq)

task := x.(*Task)

task.index = n

*tq = append(*tq, task)

}

func (tq *TaskQueue) Pop() interface{} {

old := *tq

n := len(old)

task := old[n-1]

*tq = old[0 : n-1]

return task

}

// 延迟队列管理器

type DelayQueue struct {

taskQueue TaskQueue

timer *time.Timer

mu sync.Mutex

}

func NewDelayQueue() *DelayQueue {

dq := &DelayQueue{

taskQueue: TaskQueue{},

}

heap.Init(&dq.taskQueue)

return dq

}

func (dq *DelayQueue) PushTask(delay time.Duration, fn func()) {

dq.mu.Lock()

defer dq.mu.Unlock()

task := &Task{

execTime: time.Now().Add(delay),

fn: fn,

}

heap.Push(&dq.taskQueue, task)

// 如果新任务比当前队列中的任务更早执行,则重置 Timer

if len(dq.taskQueue) == 1 || task.index == 0 {

dq.resetTimer()

}

}

func (dq *DelayQueue) resetTimer() {

if len(dq.taskQueue) == 0 {

return

}

task := dq.taskQueue[0]

delay := time.Until(task.execTime)

if dq.timer != nil {

dq.timer.Stop()

}

dq.timer = time.AfterFunc(delay, dq.runTask)

}

func (dq *DelayQueue) runTask() {

dq.mu.Lock()

defer dq.mu.Unlock()

if len(dq.taskQueue) == 0 {

return

}

task := heap.Pop(&dq.taskQueue).(*Task)

go task.fn()

if len(dq.taskQueue) > 0 {

dq.resetTimer()

}

}

func main() {

dq := NewDelayQueue()

// 添加多个任务

dq.PushTask(3*time.Second, func() { fmt.Println("任务 1 执行时间:", time.Now()) })

dq.PushTask(1*time.Second, func() { fmt.Println("任务 2 执行时间:", time.Now()) })

dq.PushTask(5*time.Second, func() { fmt.Println("任务 3 执行时间:", time.Now()) })

// 保持程序运行

time.Sleep(6 * time.Second)

}

适用场景

  • 适用于多个延迟任务,任务可动态增加、删除,且需要持久化管理的场景。
优点
  • 任务按照执行时间排序,按需执行,避免资源浪费。
  • 适用于高并发场景,可以管理大量任务。
缺点
  • 任务未持久化,程序崩溃后会丢失。

三、使用 Redis 的 zset 实现分布式延迟队列

如果任务需要持久化,并且支持分布式架构,可以使用 Redis 的 zset(有序集合)来存储任务,并使用定时轮询的方式执行到期任务。

实现方法

  1. 使用 Redis zadd 命令存储任务,score 值为执行时间的时间戳。
  2. 定期使用 zrangebyscore 查询到期任务并执行。
  3. 执行后删除任务。
package main

import (

"fmt"

"time"

"github.com/go-redis/redis/v8"

"golang.org/x/net/context"

)

var ctx = context.Background()

var rdb = redis.NewClient(&redis.Options{

Addr: "localhost:6379",

})

func addTask(taskID string, delay time.Duration) {

execTime := time.Now().Add(delay).Unix()

rdb.ZAdd(ctx, "delay_queue", &redis.Z{Score: float64(execTime), Member: taskID})

}

func processTask() {

for {

now := time.Now().Unix()

tasks, _ := rdb.ZRangeByScore(ctx, "delay_queue", &redis.ZRangeBy{

Min: "0", Max: fmt.Sprintf("%d", now),

}).Result()

for _, task := range tasks {

fmt.Println("执行任务:", task, "时间:", time.Now())

rdb.ZRem(ctx, "delay_queue", task)

}

time.Sleep(1 * time.Second)

}

}

func main() {

go processTask()

addTask("任务1", 3*time.Second)

addTask("任务2", 1*time.Second)

addTask("任务3", 5*time.Second)

time.Sleep(6 * time.Second)

}

适用场景

  • 适用于分布式环境,需要持久化任务队列。
优点
  • 任务不会丢失,支持持久化存储。
  • 可扩展性强,适用于分布式任务队列。
缺点
  • 需要 Redis 依赖。
  • 轮询方式可能会带来一定的延迟。

三、总结

time.AfterFunc简单任务实现简单无法管理多个任务
heap + time.Timer多任务管理任务按时间排序,低资源占用任务未持久化
Redis zset分布式、持久化任务可持久化,支持集群需要 Redis,轮询可能有延迟

选择合适的实现方式,能帮助你更高效地管理延迟任务。

© 2010-2022 XuLaLa 保留所有权利 本站由 WordPress 强力驱动
请求次数:69 次,加载用时:0.665 秒,内存占用:32.19 MB