0%

asynq任务队列的使用

什么是任务队列 ?

消息队列(Message Queue),一般来说知道的人不少。比如常见的:kafka、Rabbitmq、RocketMQ 等。

任务队列(Task Queue),听说过这个概念的人不会太多,清楚它的概念的人怕是更少。

这两个概念是有关系的,他们是怎样的关系呢?任务队列(Task Queue)是消息队列(Message Queue)的超集。任务队列是构建在消息队列之上的。消息队列是任务队列的一部分。

1. 使用

先看一遍:https://github.com/hibiken/asynq/wiki/Getting-Started

asynq 中,工作单元封装在称为 Task 的类型中,它概念上具有两个字段:TypePayload

1
2
3
4
5
// Type是一个字符串值,用于指示任务的类型。
func (t *Task) Type() string

// Payload是任务执行所需的数据。
func (t *Task) Payload() []byte

1.1 生产

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 与电子邮件相关任务的有效负载。
type EmailTaskPayload struct {
// 电子邮件接收者的ID。
UserID int
}

// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

// 创建带有类型名称和有效负载的任务。
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)

// 立即处理任务。
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] 成功将任务加入队列:%+v", info)

// 在24小时后处理任务。
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] 成功将任务加入队列:%+v", info)
}

1.2 消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// workers.go
func main() {
srv := asynq.NewServe(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)

// 使用asynq.HandlerFunc适配器来处理函数
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}


func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 给用户 %d 发送欢迎邮件", p.UserID)

case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] 给用户 %d 发送提醒邮件", p.UserID)

default:
return fmt.Errorf("意外的任务类型:%s", t.Type())
}
return nil
}


2. 概念

2.1 task 生命周期

参考:https://github.com/hibiken/asynq/wiki/Life-of-a-Task

当您将任务加入队列时,asynq内部管理任务,以确保在指定时间调用处理程序处理任务。在此过程中,任务可能经历不同的生命周期状态。

以下是不同生命周期状态的列表:

  • Scheduled:任务正在等待未来处理(仅适用于具有ProcessAtProcessIn选项的任务)。
  • Pending:任务已准备好进行处理,并将由一个空闲的工作器接收。
  • Active:任务正在被工作器处理(即处理程序正在处理该任务)。
  • Retry:工作器无法处理任务,任务正在等待将来重试。
  • Archived:任务达到最大重试次数,并存储在归档中以供手动检查。
  • Completed:任务已成功处理,并保留到保留时间到期为止(仅适用于具有Retention选项的任务)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+-------------+            +--------------+          +--------------+           +-------------+
| | | | | | Success | |
| Scheduled |----------->| Pending |--------->| Active |---------> | Completed |
| (Optional) | | | | | | (Optional) |
+-------------+ +--------------+ +--------------+ +-------------+
^ | |
| | | Deletion
| | Failed |
| | V
| |
| |
+------+-------+ | +--------------+
| | | | |
| Retry |<--------------+------->| Archived |
| | | |
+--------------+ +--------------+

一旦一个任务耗尽它的重试次数,任务将转为Archived状态,并且将不会再次重试(您仍然可以使用 CLI 或 WebUI 工具手动运行存档的任务)。

3. 监控

3.1 web ui

参考:https://github.com/hibiken/asynqmon

1
./asynqmon --redis-url=redis://:@127.0.01:6379

3.2 asynq cli

参考:https://github.com/hibiken/asynq/blob/master/tools/asynq/README.md

1
go install github.com/hibiken/asynq/tools/asynq@latest

By default, asynq will try to read config file located in $HOME/.asynq.(yml|json). You can specify the file location via --config flag. Config file example:

1
2
3
uri: 127.0.0.1:6379
db: 2
password: mypassword

This will set the default values for --uri, --db, and --password flags.

1
asynq task list --queue=local.abc:default --state=archived

4. 参考教程

可以加首页作者微信,咨询相关问题!