大家好,我是渔夫子。
今天给大家推荐一个基于redis
实现的简单、可靠且高效的分布式任务队列:asynq。该队列出自谷歌员工Ken Hibino。
项目的开源地址:https://github.com/hibiken/asynq ,星标star 6.1k,目前有29位贡献者。
基于redis的集群,支持哨兵模式。因此具备了存储可横向扩展及高可用性。以下是asynq运行的顶层设计原理:
以下是该包的架构图:
接下来,我们通过基于async包来编写一个客户端程序和一个服务端程序,用于发送消息和消费消息。
通过go get命令安装包
go get -u github.com/hibiken/asynq
首先,我们来看一下在使用该包时要用到的两个核心数据类型,
asynq
包使用Redis作为消息代理,我们称之为message broker
。在和redis建立连接时,通过该类型来自定和redis建立连接的一些属性。比如redis的地址、密码等。
redisConnOpt := async.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}
在asynq
中,一个工作单元被包装成一个Task
,在Task
类型中有两个字段:Type
和Payload
。
// Type是一个字符串类型,代表该任务的类型.
func (t *Task) Type() string
// Payload 是消息体.
func (t *Task) Payload() []byte
往队列中发送消息包括3步:和redis建立连接、构建任务Task、发送Task到redis。
通过NewClient
函数和redis
建立连接
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
在和redis建立连接时,通过asynq.RedisClusterClientOpt
结构体配置集群。如下:
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
同样,在消费侧和redis建立连接的时候也采用集群模式。
为了支持高可用性,在出现故障时redis服务能够起到自动恢复的目的,asynq包还支持redis的哨兵模式。如下:
var redis = &asynq.RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
}
client := asynq.NewClient(redis)
通过NewTask构建任务。但在构建任务之前,需要先根据自己的业务定义消息体的结构。这里我们定义了一个EmailTaskPayload
的消息体结构,被消费时用于业务处理的数据。
type EmailTaskPayload struct {
// ID for the email recipient.
UserID int
}
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
//构建了2个任务,一个任务类型为"email:welcome"
t1 := asynq.NewTask("email:welcome", payload)
//构建任务,任务类型是 "email:reminder"
t2 := asynq.NewTask("email:reminder", payload)
通过Enqueue
函数就能将任务发送至队列,并且会被立即消费。
info, err := client.Enqueue(t1)
有时候,我们不希望任务立即被消费,而是等待一定的时间后再被消费,也就是常说的延时消费或延时队列。在asynq包中有两种方法可以实现。
通过ProcessIn
函数指定一个时间段,如下,代表是在24小时后再被消费。
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
通过ProcessAt
函数指定一个具体的时间点,如下,也是代表在24小时后再被消费。
info, err = client.Enqueue(t2, asynq.ProcessAt(time.Now().Add(24*time.Hour))
通过Retention
函数指定一个时间段,代表当任务被处理完后,还可以保留一定的时间。如下是当task3被成功消费后再保留2个小时。
client.Enqueue(task3, asynq.Retention(2*time.Hour))
通过Queue
选项函数指定将消息发送至哪个队列。如下,将消息发送至redis的名为“high”的队列中。
client.Enqueue(t2, asynq.Queue("high"))
要消费redis队列中的任务,首先要和redis建立连接,我们称之为server。通过以下方式和redis服务实例以及要订阅的队列进行关联。
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
)
// NOTE: We'll cover what this `handler` is in the section below.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
比如,我们要消费redis中的名为"high"、"slow"两个队列,那么通过asynq.Config结构体指定即可。如下:
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{
Queues: map[string]int{
"high": 6,
"slow": 4,
}
},
)
队列中的6和4代表两个队列的优先级别。
在NewServer中,通过asynq.Config
结构体中的Concurrency
字段,可以指定启动的worker数量。多个worker可以并发的消费队列中的任务。如下,代表启动10个worker。如果不指定该配置参数,默认启动的worker数量是服务器的CPU个数。
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"high": 6,
"slow": 4,
}
},
)
从redis队列中获取到任务后应该如何被处理呢?就是在执行server的Run方法时指定一个Handler
,该Handler
是一个接口类型,需要实现如下接口:
type Handler interface {
ProcessTask(context.Context, *Task) error
}
接下来,我们定义一个实现了该接口的类型handler,在该handler中执行具体的业务处理。在该handler中,根据接收到的任务类型做不同的处理。
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
default:
return fmt.Errorf("unexpected task type: %s", t.Type())
}
return nil
}
然后,启动Server,指定该handler。如下:
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Use asynq.HandlerFunc adapter for a handler function
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
上面的任务消费的handler的代码可读性比较低。在asynq包中还提供了一个Mux
类型,用于进行注册任务类型和对应的消费逻辑的功能。如下:
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
在该代码中,通过mux.HandleFunc
函数分别注册了任务类型email:welcome
的任务的处理逻辑是sendWelcomeEmail
函数;任务类型是email:reminder
的任务的处理逻辑是sendReminderEmail
函数。
接下来我们再看两个处理函数的定义。如下:
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
return nil
}
asynq包还支持批量处理任务。即在入队上将多个任务发送到相同的组和相同的队列。在消费的时候就可以通过指定的聚合函数将多个任务聚合成一个任务,再发送给任务处理器进行处理。
当达到以下两个中的任意一个条件时,就可以进行聚合并发送给任务处理器:
在发送任务时,通过Group函数指定任务所属组:
// Enqueue three tasks to the same group.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
在进行消费启动服务时,可以指定聚合相关的函数选项。如下:
// This function is used to aggregate multiple tasks into one.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... Your logic to aggregate the given tasks and return the aggregated task.
// ... Use NewTask(typename, payload, opts...) to create a new task and set options if needed.
// ... (Note) Queue option will be ignored and the aggregated task will always be enqueued to the same queue the group belonged.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate), //指定聚合函数为aggregate
GroupMaxDelay: 10 * time.Minute, //最晚10分钟聚合一次
GroupGracePeriod: 2 * time.Minute, //每2分钟聚合一次
GroupMaxSize: 20, // 每20个任务聚合一次
Queues: map[string]int{"notifications": 1},
},
)
// 相同组的聚合函数,最终将多个任务组成一个任务,发送个任务处理器
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("Aggregating %d tasks from group %q", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregated-task", []byte(b.String()))
}
一个完善的队列系统,监控是必不可少的。asynq包还配备了两种形式的监控:webUI和命令行工具。
webUI监控是通过开源的asynqmon包实现的,地址是https://github.com/hibiken/asynqmon。如下图所示:
image.png
image.png
另外一种就是命令行监控。通过go install安装asynq工具,如下:
go install github.com/hibiken/asynq/tools/asynq
然后在终端输入命令 asynq dash
启动 命令行的dashboard。效果如下:
好了,以上就是给大家分享的基于redis的分布式队列包。更多功能请查看文档:https://github.com/hibiken/asynq/wiki
特别说明:你的关注,是我写下去的最大动力。点击下方公众号卡片,直接关注。关注送《100个go常见的错误》pdf文档、经典go学习资料。