在高并发和异步处理场景中,延迟队列(Delayed Queue)是一种常见的解决方案。它允许任务在指定时间后执行,而不是立即执行。在 Golang 中,我们可以使用多种方式来实现延迟队列,本文将介绍3种常见的实现方式。
文章目录
time.AfterFunc
实现简单延迟队列time.AfterFunc
可以在指定时间后执行一个函数,这是实现延迟任务最简单的方法。package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("任务开始时间:", time.Now())
// 延迟 5 秒执行
time.AfterFunc(5*time.Second, func() {
fmt.Println("执行任务时间:", time.Now())
})
// 防止主 Goroutine 退出
time.Sleep(6 * time.Second)
}
time.Timer
+ heap
实现优先级队列heap
实现一个优先级队列,让最早需要执行的任务在前面。container/heap
实现最小堆,堆顶存放最近需要执行的任务。time.Timer
监听堆顶任务的执行时间。package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
// 任务结构体
type Task struct {
execTime time.Time // 任务执行时间
fn func() // 任务函数
index int // 在堆中的索引
}
// 任务队列(最小堆)
type TaskQueue []*Task
func (tq TaskQueue) Len() int { return len(tq) }
func (tq TaskQueue) Less(i, j int) bool { return tq[i].execTime.Before(tq[j].execTime) }
func (tq TaskQueue) Swap(i, j int) {
tq[i], tq[j] = tq[j], tq[i]
tq[i].index, tq[j].index = i, j
}
func (tq *TaskQueue) Push(x interface{}) {
n := len(*tq)
task := x.(*Task)
task.index = n
*tq = append(*tq, task)
}
func (tq *TaskQueue) Pop() interface{} {
old := *tq
n := len(old)
task := old[n-1]
*tq = old[0 : n-1]
return task
}
// 延迟队列管理器
type DelayQueue struct {
taskQueue TaskQueue
timer *time.Timer
mu sync.Mutex
}
func NewDelayQueue() *DelayQueue {
dq := &DelayQueue{
taskQueue: TaskQueue{},
}
heap.Init(&dq.taskQueue)
return dq
}
func (dq *DelayQueue) PushTask(delay time.Duration, fn func()) {
dq.mu.Lock()
defer dq.mu.Unlock()
task := &Task{
execTime: time.Now().Add(delay),
fn: fn,
}
heap.Push(&dq.taskQueue, task)
// 如果新任务比当前队列中的任务更早执行,则重置 Timer
if len(dq.taskQueue) == 1 || task.index == 0 {
dq.resetTimer()
}
}
func (dq *DelayQueue) resetTimer() {
if len(dq.taskQueue) == 0 {
return
}
task := dq.taskQueue[0]
delay := time.Until(task.execTime)
if dq.timer != nil {
dq.timer.Stop()
}
dq.timer = time.AfterFunc(delay, dq.runTask)
}
func (dq *DelayQueue) runTask() {
dq.mu.Lock()
defer dq.mu.Unlock()
if len(dq.taskQueue) == 0 {
return
}
task := heap.Pop(&dq.taskQueue).(*Task)
go task.fn()
if len(dq.taskQueue) > 0 {
dq.resetTimer()
}
}
func main() {
dq := NewDelayQueue()
// 添加多个任务
dq.PushTask(3*time.Second, func() { fmt.Println("任务 1 执行时间:", time.Now()) })
dq.PushTask(1*time.Second, func() { fmt.Println("任务 2 执行时间:", time.Now()) })
dq.PushTask(5*time.Second, func() { fmt.Println("任务 3 执行时间:", time.Now()) })
// 保持程序运行
time.Sleep(6 * time.Second)
}
zset
实现分布式延迟队列zset
(有序集合)来存储任务,并使用定时轮询的方式执行到期任务。zadd
命令存储任务,score
值为执行时间的时间戳。zrangebyscore
查询到期任务并执行。package main
import (
"fmt"
"time"
"github.com/go-redis/redis/v8"
"golang.org/x/net/context"
)
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
func addTask(taskID string, delay time.Duration) {
execTime := time.Now().Add(delay).Unix()
rdb.ZAdd(ctx, "delay_queue", &redis.Z{Score: float64(execTime), Member: taskID})
}
func processTask() {
for {
now := time.Now().Unix()
tasks, _ := rdb.ZRangeByScore(ctx, "delay_queue", &redis.ZRangeBy{
Min: "0", Max: fmt.Sprintf("%d", now),
}).Result()
for _, task := range tasks {
fmt.Println("执行任务:", task, "时间:", time.Now())
rdb.ZRem(ctx, "delay_queue", task)
}
time.Sleep(1 * time.Second)
}
}
func main() {
go processTask()
addTask("任务1", 3*time.Second)
addTask("任务2", 1*time.Second)
addTask("任务3", 5*time.Second)
time.Sleep(6 * time.Second)
}
time.AfterFunc | 简单任务 | 实现简单 | 无法管理多个任务 |
heap + time.Timer | 多任务管理 | 任务按时间排序,低资源占用 | 任务未持久化 |
Redis zset | 分布式、持久化 | 任务可持久化,支持集群 | 需要 Redis,轮询可能有延迟 |
选择合适的实现方式,能帮助你更高效地管理延迟任务。