什么是任务队列 ?
消息队列(Message Queue),一般来说知道的人不少。比如常见的:kafka、Rabbitmq、RocketMQ 等。
任务队列(Task Queue),听说过这个概念的人不会太多,清楚它的概念的人怕是更少。
这两个概念是有关系的,他们是怎样的关系呢?任务队列(Task Queue)是消息队列(Message Queue)的超集。任务队列是构建在消息队列之上的。消息队列是任务队列的一部分。
1. 使用 先看一遍:https://github.com/hibiken/asynq/wiki/Getting-Started
在 asynq
中,工作单元封装在称为 Task
的类型中,它概念上具有两个字段:Type
和 Payload
。
1 2 3 4 5 func (t *Task) Type () string func (t *Task) Payload () [] byte
1.1 生产 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 type EmailTaskPayload struct { UserID int } func main () { client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379" }) payload, err := json.Marshal(EmailTaskPayload{UserID: 42 }) if err != nil { log.Fatal(err) } t1 := asynq.NewTask("email:welcome" , payload) t2 := asynq.NewTask("email:reminder" , payload) info, err := client.Enqueue(t1) if err != nil { log.Fatal(err) } log.Printf(" [*] 成功将任务加入队列:%+v" , info) info, err = client.Enqueue(t2, asynq.ProcessIn(24 *time.Hour)) if err != nil { log.Fatal(err) } log.Printf(" [*] 成功将任务加入队列:%+v" , info) }
1.2 消费 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func main () { srv := asynq.NewServe( asynq.RedisClientOpt{Addr: "localhost:6379" }, asynq.Config{Concurrency: 10 }, ) if err := srv.Run(asynq.HandlerFunc(handler)); err != nil { log.Fatal(err) } } func handler (ctx context.Context, t *asynq.Task) error { switch t.Type() { case "email:welcome" : var p EmailTaskPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return err } log.Printf(" [*] 给用户 %d 发送欢迎邮件" , p.UserID) case "email:reminder" : var p EmailTaskPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return err } log.Printf(" [*] 给用户 %d 发送提醒邮件" , p.UserID) default : return fmt.Errorf("意外的任务类型:%s" , t.Type()) } return nil }
2. 概念 2.1 task 生命周期 参考:https://github.com/hibiken/asynq/wiki/Life-of-a-Task
当您将任务加入队列时,asynq内部管理任务,以确保在指定时间调用处理程序处理任务。在此过程中,任务可能经历不同的生命周期状态。
以下是不同生命周期状态的列表:
Scheduled :任务正在等待未来处理(仅适用于具有ProcessAt
或ProcessIn
选项的任务)。Pending :任务已准备好进行处理,并将由一个空闲的工作器接收。Active :任务正在被工作器处理(即处理程序正在处理该任务)。Retry :工作器无法处理任务,任务正在等待将来重试。Archived :任务达到最大重试次数,并存储在归档中以供手动检查。Completed :任务已成功处理,并保留到保留时间到期为止(仅适用于具有Retention
选项的任务)。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 +-------------+ +--------------+ +--------------+ +-------------+ | | | | | | Success | | | Scheduled |----------->| Pending |--------->| Active |---------> | Completed | | (Optional) | | | | | | (Optional) | +-------------+ +--------------+ +--------------+ +-------------+ ^ | | | | | Deletion | | Failed | | | V | | | | +------+-------+ | +--------------+ | | | | | | Retry |<--------------+------->| Archived | | | | | +--------------+ +--------------+
一旦一个任务耗尽它的重试次数,任务将转为Archived 状态,并且将不会再次重试(您仍然可以使用 CLI 或 WebUI 工具手动运行存档的任务)。
3. 监控 3.1 web ui 参考:https://github.com/hibiken/asynqmon
1 ./asynqmon --redis-url=redis://:@127.0.01:6379
3.2 asynq cli 参考:https://github.com/hibiken/asynq/blob/master/tools/asynq/README.md
1 go install github.com/hibiken/asynq/tools/asynq@latest
By default, asynq
will try to read config file located in $HOME/.asynq.(yml|json)
. You can specify the file location via --config
flag. Config file example:
1 2 3 uri: 127.0.0.1:6379 db: 2 password: mypassword
This will set the default values for --uri
, --db
, and --password
flags.
1 asynq task list --queue=local.abc:default --state=archived
4. 参考教程