Grafana是监控领域比较出名的开源可视化套件,笔者最近在阅读grafana后台源码,里面有很多值得我们学习借鉴的地方,这里通过文章记录下来。
一、服务注册及初始化
grafana把各种功能抽象成一个个逻辑service,比如登录是loginService,对数据库的操作是sqlstorageService,暴露API给前端访问的是HTTPService,这些service在程序起来时会完成NewService实例创建及初始化。这里有个技巧,这些service都实现了BackgroundService这个interface,BackgroundService内部定义了Init()、Run()、Stop()等函数,这些service会在包所在的init函数里通过Register的方式进行统一的注册,注册好后在主函数里统一执行service的Init()、Run(),进而完成每个service的初始化与启动。
// pkg/services/auth/auth_token.go
package auth
const ServiceName = "UserAuthTokenService"
func init() {
registry.Register(®istry.Descriptor{
Name: ServiceName,
Instance: &UserAuthTokenService{},
InitPriority: registry.Medium,
})
}
grafana提供了专门用于service注册的函数,registry.Register(),实际的注册逻辑也非常的简单,即放到一个service数组里
// pkg/registry/registry.go
package registry
type Descriptor struct {
Name string
Instance Service
InitPriority Priority
}
var services []*Descriptor
func Register(descriptor *Descriptor) {
if descriptor == nil {
return
}
// Overwrite any existing equivalent service
for i, svc := range services {
if svc.Name == descriptor.Name {
services[i] = descriptor
return
}
}
services = append(services, descriptor)
}
这里的service都实现了BackgroundService接口,该接口里有Init()、Run()等函数,注册后就可以在主函数里遍历执行每个service里的Init()和Run()。这里service还能加上权重,主函数在执行前先按权重sort排序后按权重的大小先后执行,这里可以解决service间依赖需要顺序启动的问题。
这种service初始化和启动方式在golang开发中经常使用,流程如下:
二、依赖注入进行成员变量初始化
通过上面可以看到service的实例创建是在各自的init函数里,但这里的实例创建只初始化了实例里很少的参数,比如像HTTPService struct里成员参数非常的多,但在init函数里并没有显式初始化这些成员变量,那么其他成员变量是怎么完成初始化的呢?
可以看到HTTPServer struct里有很多的inject tag,没错,这里就是通过依赖注入的方式最终完成实例的初始化,这里的依赖注入使用的是facebook的inject库(https://github.com/facebookarchive/inject)。有兴趣的同学可以看下,代码量比较短,主要用到了golang的反射机制。
依赖注入对于复杂对象的初始化,也是常用的写法,如果通过手工的方式来显式初始化,无疑是非常繁琐的。众所周知,golang反射是比较耗费性能的,但这里只是服务启动实例初始化时一次使用,后面没有用到,因此这里的性能损耗是可以接受的。
inject的使用方法比较简单,代码如下:
func BuildServiceGraph(objs []interface{}, services []*Descriptor) error {
if services == nil {
services = GetServices()
}
for _, service := range services {
objs = append(objs, service.Instance)
}
serviceGraph := inject.Graph{}
// Provide services and their dependencies to the graph.
for _, obj := range objs {
if err := serviceGraph.Provide(&inject.Object{Value: obj}); err != nil {
return fmt.Errorf("failed to provide object to the graph: %w", err)
}
}
// Resolve services and their dependencies.
if err := serviceGraph.Populate(); err != nil {
return fmt.Errorf("failed to populate service dependencies: %w", err)
}
if err := service.Instance.Init(); err != nil {
return fmt.Errorf("service init failed: %w", err)
}
}
return nil
}
基本的写法大概是这样,需要注意的是,使用Inject tag的成员变量首字母需要大写,这也好理解,因为这里使用的是golang反射机制,因此成员变量首字母必须大写。另外还有一个需要注意的,一个struct里同种类型的Inject tag只能有一个,这也好理解,反射是通过type找到对应的成员变量的,若同个tyep的成员变量有多个,则无法对应上。
三、消息总线
grafana内部服务间的通讯(service间的函数调用)是通过消息总线(也叫bus)进行的。grafana内部定义了三种类型的消息:事件(event)、命令(cmd)、查询(query),以上三种消息均通过总线进行投递(dispatch)。总线投递消息并非异步,而是同步的。
以grafana查询用户信息为例, 比较常见的写法,loginService内部有一个queryUserInfo函数,需要查询用户信息的地方则调用queryUserInfo函数
func (ls *loginService) queryUserInfo (userName string) (*UserInfo, error) {
// 查询逻辑
}
因为这里的queryUserInfo是属于loginService的,那么需要用到queryUserInfo的地方都需要有实例化的loginService,总体来说具有一定的耦合度。
而使用消息总线逻辑会比较清晰,耦合度也会比较低,这里还是以查询用户信息为例:
// src/services/loginService.go
import (
"demo/bus"
)
type loginService struct {
// loginService
}
func (ls *loginService) Init() {
bus.AddHandler("login", queryUserInfo)
// other handler...
}
// queryUserInfo 查询用户信息
func (ls *loginService) queryUserInfo(query *models.UserInfoQuery) error {
// 主逻辑
result, err := ls.queryByUserName(query.UserName)
if err != nil {
return err
}
query.Result = result // 查询结果放到query结构体里
return nil
}
// src/models/userInfo.go
// UserInfoQuery 查询
type UserInfoQuery struct {
UserName string
Result *UserInfo // 查询结果
}
// UserInfo 用户信息
type UserInfo struct {
UserName string
Password string
OrgID int64
OrgName string
// other ...
}
// src/bus/bus.go
type HandlerFunc interface{}
var handlers = make(map[string]HandlerFunc)
// AddHandler 把handler加入到消息总线
func AddHandler(handler HandlerFunc) {
handlerType := reflect.TypeOf(handler)
queryTypeName := handlerType.In(0).Elem().Name()
b.handlers[queryTypeName] = handler
}
type Msg interface {}
// Dispatch dispatch msg to the bus
func Dispatch(msg Msg) error {
var msgName = reflect.TypeOf(msg).Elem().Name()
handler := b.handlersWithCtx[msgName]
var params = []reflect.Value{}
params = append(params, reflect.ValueOf(msg))
ret := reflect.ValueOf(handler).Call(params)
err := ret[0].Interface()
if err == nil {
return nil
}
return err.(error)
}
// DispatchCtx 带ctx的dispatch
func DispatchCtx(ctx context.Context, msg Msg) error {
// ...
}
整体的流程如下所示,由此可见,grafana使用的消息总线并非实际意义上的消息总线,这里是一种设计模式的写法,使用到了观察者模式。
grafana消息总线使用了golang反射机制,对性能有一定的影响,但消息总线的写法是可以学习的并且经常使用到。
这里以笔者开发的拨测项目为例,在实际的需求中,需要开发一个后台管理工具,可以对拨测任务的标签和计费情况进行增删改,并且希望后面还可以在这个工具上添加别的功能,比如任务迁移等等。
// main.go
func main() {
// 计费管理工具
billingService := billing.NewBillingService(database, cfg)
billingService.Init()
// tag标签管理工具
tagService := tag.NewTagService(database, cfg.Tag)
tagService.Init()
tagCmd := flag.String("tag", "", "Tag标签管理工具")
billingCmd := flag.String("billing", "", "计费管理工具")
flag.Parse()
if *tagCmd != "" {
if err := bus.Dispatch(context.Background(), &models.Command{
ServiceType: "tag",
Cmd: *tagCmd,
Args: flag.Args(),
}); err != nil {
fmt.Println("exec tag cmd failed, errMsg:", err)
os.Exit(1)
}
}
if *billingCmd != "" {
if err := bus.Dispatch(context.Background(), &models.Command{
ServiceType: "billing",
Cmd: *billingCmd,
Args: flag.Args(),
}); err != nil {
fmt.Println("exec billing cmd failed, errMsg:", err)
os.Exit(1)
}
}
}
// bus.go
package bus
import (
"context"
"fmt"
)
type HandlerFunc func(ctx context.Context, args []string) error
var handlers map[string]HandlerFunc
// Dispatch 执行命令
func Dispatch(ctx context.Context, cmd *models.Command) error {
key := fmt.Sprintf("%s_%s", cmd.ServiceType, cmd.Cmd)
handler := handlers[key]
return handler(ctx, cmd.Args)
}
// AddHandler 加入handler
func AddHandler(serviceType string, cmd string, handler HandlerFunc) {
key := fmt.Sprintf("%s_%s", serviceType, cmd)
handlers[key] = handler
}
func init() {
handlers = make(map[string]HandlerFunc)
}
// tag.go
// 标签管理
// TagService tag标签service
type TagService struct {
// ...
}
// NewTagService 新建service
func NewTagService(db db.DB, cfg cfg.TagConfiguration) *TagService {
// ...
}
// Init 初始化,注册处理handler
func (ts *TagService) Init() {
bus.AddHandler(serviceType, "list", ts.list)
bus.AddHandler(serviceType, "delete", ts.delete)
bus.AddHandler(serviceType, "update", ts.update)
}
// list 列出任务标签
func (ts *TagService) list(ctx context.Context, args []string) error {
// ...
}
// delete 删除任务标签
func (ts *TagService) delete(ctx context.Context, args []string) error {
// ...
}
// update 更新标签
func (ts *TagService) update(ctx context.Context, args []string) error {
// ...
}
以上是grafana源码分析的一部分,主要集中在框架这块,后面笔者会对grafana应用层面(比如登录校验、session)也做一次分析。