本文将从docker(1.12.6)源码的角度分析docker daemon怎么将容器的日志收集出来并通过配置的log-driver发送出去,并结合示例介绍了好雨云帮中实现的一个zmq-loger。阅读本文,你也可以实现适合自己业务场景的log-driver。
本文适合能够阅读和编写golang代码的同学。 (1)首先你需要认知以下几个关键词:
(2)你需要知道关于进程产生日志的形式:
进程产生日志有两类输出方式,一类是写入到文件中。另一类是直接写到stdout或者stderr,例如php的echo
python的print
golang的fmt.Println("")
等等。
(3)是否知道docker-daemon与运行中container的关系?
一个container就是一个特殊的进程,它是由docker daemon创建并启动,因此container是docker daemon的子进程。由docker daemon守护和管理。因此container的stdout能够被docker daemon获取到。基于此理论,我们来分析docker daemon相关代码。
# /container/container.go:62
type CommonContainer struct{
StreamConfig *stream.Config
...
}
# /container/stream/streams.go:26
type Config struct {
sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
}
找到如上所示对应的代码,显示了每一个container实例都有几个属性stdout,stderr,stdin,以及管道stdinPipe。这里说下stdinPipe,当容器使用-i参数启动时标准输入将被运行,daemon将能够使用此管道向容器内写入标准输入。
我们试想以上图例,如果是你,你怎么实现日志收集转发?
# /container/container.go:312func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {
c, err := logger.GetLogDriver(cfg.Type) if err != nil { return nil, fmt.Errorf("Failed to get logging factory: %v", err)
}
ctx := logger.Context{
Config: cfg.Config,
ContainerID: container.ID,
ContainerName: container.Name,
ContainerEntrypoint: container.Path,
ContainerArgs: container.Args,
ContainerImageID: container.ImageID.String(),
ContainerImageName: container.Config.Image,
ContainerCreated: container.Created,
ContainerEnv: container.Config.Env,
ContainerLabels: container.Config.Labels,
DaemonName: "docker",
} // Set logging file for "json-logger"
if cfg.Type == jsonfilelog.Name {
ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID)) if err != nil { return nil, err
}
} return c(ctx)
}
#/container/container.go:978func (container *Container) startLogging() error { if container.HostConfig.LogConfig.Type == "none" { return nil // do not start logging routines
}
l, err := container.StartLogger(container.HostConfig.LogConfig) if err != nil { return fmt.Errorf("Failed to initialize logging driver: %v", err)
}
copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
container.LogCopier = copier
copier.Run()
container.LogDriver = l // set LogPath field only for json-file logdriver
if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
container.LogPath = jl.LogPath()
} return nil}
第一个方法是为container查找log-driver。首先根据容器配置的log-driver类别调用:logger.GetLogDriver(cfg.Type)
返回一个方法类型:
/daemon/logger/factory.go:9
type Creator func(Context) (Logger, error)
实质就是从工厂类注册的logdriver插件去查找,具体源码下文分析。获取到c方法后构建调用参数具体就是容器的一些信息。然后使用调用c方法返回driver。driver是个接口类型,我们看看有哪些方法:
# /daemon/logger/logger.go:61type Logger interface {
Log(*Message) error
Name() string
Close() error
}
很简单的三个方法,也很容易理解,Log()
发送日志消息到driver,Close()
进行关闭操作(根据不同实现)。
也就是说我们自己实现一个logdriver,只需要实现如上三个方法,然后注册到logger工厂类中即可。下面我们来看/daemon/logger/factory.go
第二个方法就是处理日志了,获取到日志driver,在创建一个Copier
,顾名思义就是复制日志,分别从stdout 和stderr复制到logger driver。下面看看具体关键实现:
#/daemon/logger/copir.go:41func (c *Copier) copySrc(name string, src io.Reader) { defer c.copyJobs.Done()
reader := bufio.NewReader(src) for { select { case <-c.closed: return
default:
line, err := reader.ReadBytes('\n')
line = bytes.TrimSuffix(line, []byte{'\n'}) // ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 { if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
}
} if err != nil { if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
} return
}
}
}
}
每读取一行数据,构建一个消息,调用logdriver的log方法发送到driver处理。
位于/daemon/logger/factory.go
的源码实现即时日志driver的注册器,其中几个重要的方法(上文已经提到一个):
# /daemon/logger/factory.go:21func (lf *logdriverFactory) register(name string, c Creator) error { if lf.driverRegistered(name) { return fmt.Errorf("logger: log driver named '%s' is already registered", name)
}
lf.m.Lock()
lf.registry[name] = c
lf.m.Unlock() return nil}
# /daemon/logger/factory.go:39func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {
lf.m.Lock() defer lf.m.Unlock() if _, ok := lf.optValidator[name]; ok { return fmt.Errorf("logger: log validator named '%s' is already registered", name)
}
lf.optValidator[name] = l return nil}
看起来很简单,就是将一个Creator
方法类型添加到一个map结构中,将LogOptValidator
添加到另一个map这里注意加锁的操作。
#/daemon/logger/factory.go:13
type LogOptValidator func(cfg map[string]string) error
这个主要是验证driver的参数 ,dockerd和docker启动参数中有:--log-opt
上文已经完整分析了docker daemon管理logdriver和处理日志的整个流程。相信你已经比较明白了。下面我们以zmq-driver为例讲讲我们怎么实现自己的driver。直接接收容器的日志。
上文我们已经谈了一个log-driver需要实现的几个方法。
我们可以看看位于/daemon/logger
目录下的已有的driver的实现,例如fluentd
,awslogs
等。
下面我们来分析zmq-driver具体的代码:
//定义一个struct,这里包含一个zmq套接字type ZmqLogger struct {
writer *zmq.Socket
containerId string
tenantId string
serviceId string
felock sync.Mutex
}//定义init方法调用logger注册器的方法注册当前driver//和参数验证方法。func init() { if err := logger.RegisterLogDriver(name, New); err != nil {
logrus.Fatal(err)
} if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
logrus.Fatal(err)
}
}//实现一个上文提到的Creator方法注册logdriver.//这里新建一个zmq套接字构建一个实例func New(ctx logger.Context) (logger.Logger, error) {
zmqaddress := ctx.Config[zmqAddress]
puber, err := zmq.NewSocket(zmq.PUB) if err != nil { return nil, err
} var (
env = make(map[string]string)
tenantId string
serviceId string
) for _, pair := range ctx.ContainerEnv {
p := strings.SplitN(pair, "=", 2) //logrus.Errorf("ContainerEnv pair: %s", pair)
if len(p) == 2 {
key := p[0]
value := p[1]
env[key] = value
}
}
tenantId = env["TENANT_ID"]
serviceId = env["SERVICE_ID"] if tenantId == "" {
tenantId = "default"
} if serviceId == "" {
serviceId = "default"
}
puber.Connect(zmqaddress) return &ZmqLogger{
writer: puber,
containerId: ctx.ID(),
tenantId: tenantId,
serviceId: serviceId,
felock: sync.Mutex{},
}, nil}//实现Log方法,这里使用zmq socket发送日志消息//这里必须注意,zmq socket是线程不安全的,我们知道//本方法可能被两个线程(复制stdout和肤质stderr)调用//必须使用锁保证线程安全。否则会发生错误。func (s *ZmqLogger) Log(msg *logger.Message) error {
s.felock.Lock() defer s.felock.Unlock()
s.writer.Send(s.tenantId, zmq.SNDMORE)
s.writer.Send(s.serviceId, zmq.SNDMORE) if msg.Source == "stderr" {
s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
} else {
s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
} return nil}//实现Close方法,这里用来关闭zmq socket。//同样注意线程安全,调用此方法的是容器关闭协程。func (s *ZmqLogger) Close() error {
s.felock.Lock() defer s.felock.Unlock() if s.writer != nil { return s.writer.Close()
} return nil}func (s *ZmqLogger) Name() string { return name
}//验证参数的方法,我们使用参数传入zmq pub的地址。func ValidateLogOpt(cfg map[string]string) error { for key := range cfg { switch key { case zmqAddress: default: return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
}
} if cfg[zmqAddress] == "" { return fmt.Errorf("must specify a value for log opt '%s'", zmqAddress)
} return nil}
多研究源码可以方便我们理解docker的工作原理。今天我们分析了日志部分。希望读者对这部分功能能够理解得更清晰。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有