源码地址:https://github.com/k8scat/docker-log-driver-tencent-cls
在现代云原生架构中,容器化应用已经成为主流部署方式。随着容器数量的快速增长,如何高效地收集、存储和分析容器日志成为了一个关键挑战。传统的日志收集方式往往存在以下问题:
为了解决这些问题,我们开发了一个专门的 Docker 日志驱动,将容器日志直接发送到腾讯云的 CLS(Cloud Log Service)日志服务。这个驱动实现了与 Docker 日志系统的深度集成,提供了高性能、可靠的日志传输能力。
该日志驱动采用了模块化的设计架构,主要包含以下几个核心组件:
这种分层架构确保了代码的可维护性和可扩展性,每个模块都有明确的职责边界。
项目定义了多个关键的数据结构来支持日志驱动的功能:
type Driver struct {
streams map[string]\*logStream
containerStreams map[string]\*logStream
mu sync.RWMutex
fs fileSystem
newTencentCLSLogger newTencentCLSLoggerFunc
processLogs func(stream \*logStream)
logger \*zap.Logger
}
type TencentCLSLogger struct {
client client
formatter \*messageFormatter
cfg \*loggerConfig
buffer chan string
mu sync.Mutex
partialLogsBuffer \*partialLogBuffer
wg sync.WaitGroup
closed chan struct{}
logger \*zap.Logger
}
这些数据结构的设计充分考虑了并发安全性和资源管理,确保了在高并发场景下的稳定运行。
日志驱动的核心功能是管理容器的日志流。每个容器启动时,驱动会创建一个独立的日志流来处理该容器的所有日志输出:
func (d \*Driver) StartLogging(streamPath string, containerDetails \*ContainerDetails) (stream \*logStream, err error) {
d.logger.Info("starting logging", zap.String("stream\_path", streamPath), zap.Any("container\_details", containerDetails))
d.mu.RLock()
if \_, ok := d.streams[streamPath]; ok {
d.mu.RUnlock()
return nil, errors.New("already logging")
}
d.mu.RUnlock()
name := "container:" + containerDetails.ContainerName
stream = &logStream{
streamPath: streamPath,
containerDetails: containerDetails,
logger: d.logger.Named(name),
fs: d.fs,
stop: make(chan struct{}),
}
// 初始化日志流
if err := d.initializeStream(stream); err != nil {
return nil, err
}
// 启动日志处理协程
go d.processLogs(stream)
return stream, nil
}
这种设计确保了每个容器的日志都能被独立处理,避免了不同容器之间的日志混淆。
日志处理采用了异步非阻塞的设计模式,确保不会影响容器的正常运行:
func (d \*Driver) defaultProcessLogs(stream \*logStream, processedNotifier chan<- struct{}) {
defer func() {
if err := stream.Close(); err != nil {
d.logger.Error("failed to close stream", zap.Error(err))
}
}()
logs := NewLogs(stream)
for logs.Next() {
select {
case <-stream.stop:
return
default:
}
entry := logs.Log()
log := &logger.Message{
Line: entry.GetLine(),
Source: entry.GetSource(),
Timestamp: time.Unix(0, entry.GetTimeNano()),
PLogMetaData: partialLog,
}
// 发送到腾讯云 CLS
if err := stream.tencentCLSLogger.Log(log); err != nil {
stream.logger.Error("failed to log to tencent cls logger", zap.Error(err))
}
// 可选:保存到本地文件
if stream.jsonLogger != nil {
if err := stream.jsonLogger.Log(log); err != nil {
stream.logger.Error("failed to log to json logger", zap.Error(err))
}
}
}
}
这种设计确保了日志处理的可靠性和性能,即使在网络不稳定的情况下也能保证日志的完整性。
驱动支持灵活的日志格式配置,用户可以通过模板来自定义日志的输出格式:
type messageFormatter struct {
template \*fasttemplate.Template
containerDetails \*ContainerDetails
attrs map[string]string
}
func (f \*messageFormatter) tagFunc(msg \*logger.Message) fasttemplate.TagFunc {
return func(w io.Writer, tag string) (int, error) {
switch tag {
case "log":
return w.Write(msg.Line)
case "timestamp":
return w.Write([]byte(msg.Timestamp.UTC().Format(time.RFC3339)))
case "container\_id":
return w.Write([]byte(f.containerDetails.ID()))
case "container\_name":
return w.Write([]byte(f.containerDetails.Name()))
case "image\_name":
return w.Write([]byte(f.containerDetails.ImageName()))
case "daemon\_name":
return w.Write([]byte(f.containerDetails.DaemonName))
}
if value, ok := f.attrs[tag]; ok {
return w.Write([]byte(value))
}
return 0, fmt.Errorf("%w: %s", errUnknownTag, tag)
}
}
支持的模板标签包括:
{log}
:原始日志内容{timestamp}
:日志时间戳{container\_id}
:容器ID{container\_name}
:容器名称{image\_name}
:镜像名称{daemon\_name}
:Docker 守护进程名称驱动使用腾讯云官方提供的 SDK 来实现与 CLS 服务的集成:
type Client struct {
logger \*zap.Logger
cfg ClientConfig
producer \*tencentcloud\_cls\_sdk\_go.AsyncProducerClient
callback \*clsCallback
}
func (c \*Client) SendMessage(text string) error {
addLogMap := map[string]string{}
if err := json.Unmarshal([]byte(text), &addLogMap); err != nil {
c.logger.Debug("failed to unmarshal log", zap.String("log", text), zap.Error(err))
addLogMap["content"] = text
}
// 添加实例信息
if c.cfg.InstanceInfo != "" {
instanceInfo := map[string]string{}
if err := json.Unmarshal([]byte(c.cfg.InstanceInfo), &instanceInfo); err != nil {
addLogMap["instance"] = c.cfg.InstanceInfo
} else {
for k, v := range instanceInfo {
addLogMap["\_\_instance\_\_."+k] = v
}
}
}
// 添加容器详情
if len(c.cfg.AppendContainerDetailsKeys) > 0 {
for \_, k := range c.cfg.AppendContainerDetailsKeys {
switch k {
case "container\_id":
addLogMap["\_\_container\_details\_\_.container\_id"] = c.cfg.ContainerDetails.ContainerID
case "container\_name":
addLogMap["\_\_container\_details\_\_.container\_name"] = c.cfg.ContainerDetails.ContainerName
// ... 其他字段
}
}
}
log := tencentcloud\_cls\_sdk\_go.NewCLSLog(time.Now().Unix(), addLogMap)
err := c.producer.SendLog(c.cfg.TopicID, log, c.callback)
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
return nil
}
这种设计确保了日志能够以结构化的形式发送到 CLS,便于后续的查询和分析。
驱动支持丰富的配置参数,满足不同场景的需求:
endpoint
:腾讯云 CLS 服务端点secret\_id
:腾讯云 API 密钥 IDsecret\_key
:腾讯云 API 密钥topic\_id
:CLS 主题 IDtemplate
:日志格式模板filter-regex
:日志过滤正则表达式retries
:重试次数timeout
:请求超时时间no-file
:是否禁用本地文件存储keep-file
:容器停止后是否保留日志文件驱动实现了完善的配置解析和验证机制:
func parseLoggerConfig(containerDetails \*ContainerDetails) (\*loggerConfig, error) {
clientConfig, err := parseClientConfig(containerDetails)
if err != nil {
return nil, fmt.Errorf("failed to parse client config: %w", err)
}
attrs, err := containerDetails.ExtraAttributes(nil)
if err != nil {
return nil, fmt.Errorf("failed to parse extra attributes: %w", err)
}
cfg := defaultLoggerConfig
cfg.ClientConfig = clientConfig
cfg.Attrs = attrs
// 解析模板配置
if template, ok := containerDetails.Config[cfgTemplateKey]; ok {
cfg.Template = template
}
// 解析过滤正则表达式
if filterRegex, ok := containerDetails.Config[cfgFilterRegexKey]; ok {
cfg.FilterRegex, err = regexp.Compile(filterRegex)
if err != nil {
return nil, fmt.Errorf("failed to parse %q option: %w", cfgFilterRegexKey, err)
}
}
if err := cfg.Validate(containerDetails.Config); err != nil {
return nil, err
}
return &cfg, nil
}
这种设计确保了配置的正确性和一致性,避免了因配置错误导致的问题。
驱动实现了完善的重试机制,确保在网络不稳定的情况下能够可靠地发送日志:
type ClientConfig struct {
Endpoint string
SecretID string
SecretKey string
TopicID string
InstanceInfo string
AppendContainerDetailsKeys []string
ContainerDetails \*ContainerDetails
// 重试配置
Retries int
// 超时配置
Timeout time.Duration
}
对于跨多个日志条目的长日志,驱动实现了部分日志的缓冲和组装机制:
type partialLogBuffer struct {
logs map[string]\*logger.Message
mu sync.Mutex
}
func (b \*partialLogBuffer) Append(log \*logger.Message) (\*logger.Message, bool) {
if log.PLogMetaData == nil {
panic("log must be partial")
}
b.mu.Lock()
defer b.mu.Unlock()
plog, exists := b.logs[log.PLogMetaData.ID]
if !exists {
plog = new(logger.Message)
\*plog = \*log
b.logs[plog.PLogMetaData.ID] = plog
plog.Line = make([]byte, 0, 16\*1024)
plog.PLogMetaData = nil
}
plog.Line = append(plog.Line, log.Line...)
if log.PLogMetaData.Last {
delete(b.logs, log.PLogMetaData.ID)
return plog, true
}
return nil, false
}
这种设计确保了长日志的完整性,避免了日志被截断的问题。
驱动采用了异步处理模式,确保日志发送不会阻塞容器的正常运行:
func (l \*TencentCLSLogger) Log(log \*logger.Message) error {
if l.isClosed() {
return errLoggerClosed
}
if log.PLogMetaData != nil {
assembledLog, last := l.partialLogsBuffer.Append(log)
if !last {
return nil
}
\*log = \*assembledLog
}
if l.cfg.FilterRegex != nil && !l.cfg.FilterRegex.Match(log.Line) {
l.logger.Debug("message is filtered out by regex", zap.String("regex", l.cfg.FilterRegex.String()))
return nil
}
text := l.formatter.Format(log)
l.send(text)
return nil
}
驱动支持灵活的日志级别控制,便于调试和监控:
func newLogger(env string, logLevel string) (\*zap.Logger, error) {
var cfg zap.Config
if env == "production" {
cfg = zap.NewProductionConfig()
} else {
cfg = zap.NewDevelopmentConfig()
}
var err error
cfg.Level, err = zap.ParseAtomicLevel(logLevel)
if err != nil {
return nil, fmt.Errorf("failed to parse log level: %w", err)
}
cfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
return cfg.Build()
}
驱动以 Docker 插件的形式提供,安装过程简单:
# 安装插件
docker plugin install k8scat/docker-log-driver-tencent-cls:latest \
--alias tencent-cls \
--grant-all-permissions
在运行容器时指定日志驱动:
docker run --log-driver=tencent-cls \
--log-opt endpoint="<endpoint>" \
--log-opt secret\_id="<secret\_id>" \
--log-opt secret\_key="<secret\_key>" \
--log-opt topic\_id="<topic\_id>" \
--log-opt template="{container\_name}: {log}" \
your\_image
在 Docker 守护进程级别配置默认日志驱动:
{
"log-driver": "tencent-cls",
"log-opts": {
"endpoint": "<endpoint>",
"secret\_id": "<secret\_id>",
"secret\_key": "<secret\_key>",
"topic\_id": "<topic\_id>"
}
}
这个 Docker 日志驱动项目展示了如何构建一个企业级的日志收集解决方案。通过深度集成 Docker 的日志系统,我们实现了高性能、可靠的日志传输能力。项目的主要特点包括:
这个项目为容器化应用的日志管理提供了一个完整的解决方案,能够满足企业级应用的各种需求。未来可以考虑添加更多功能,如日志压缩、加密传输、多租户支持等,进一步提升产品的竞争力。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。