首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >运维实战来了!如何构建适用于 YashanDB 的 Prometheus Exporter

运维实战来了!如何构建适用于 YashanDB 的 Prometheus Exporter

作者头像
用户10349277
发布2025-02-21 11:58:04
发布2025-02-21 11:58:04
2400
举报

前言

小崖又收到用户投稿啦。今天分享的是构建 YashanDB Exporter 的核心设计理念和关键方法,希望也能为你的运维实战加分!

背景

在数据库运维工作中,数据库监控是至关重要的一环。常见的数据库监控方案有:Prometheus+Grafana、Zabbix、Nagios 等。其中,Prometheus+Grafana 是目前业界较为流行的监控解决方案之一,广泛应用于:实时资源监控、监控结果可视化、资源异常告警、监控数据分析等场景。该方案中包含几个关键组件,其核心功能的简要说明如下:

根据上述核心组件的工作原理,我们可以分析出一个完整的监控流程:

  1. Exporter 负责收集特定的监控指标,并且按特定的格式组织结果;
  2. Prometheus Server 定时抓取 Exporter 提供的数据,解析数据并存储;
  3. 同时,如果指标触发告警条件,Prometheus Server 会向 Alert Manager 推送告警信息;
  4. Alert Manager 收到推送后对告警进行相应处理,例如:使用短信、邮件等方式通知运维人员;
  5. Grafana Server 接入 Prometheus Server 作为数据源,将查询到的监控数据进行可视化展示。

综上所述,本方案中实现 YashanDB 监控的关键点在于 ------ 实现适用于 YashanDB 的 Prometheus Exporter,后续我们称其为:YashanDB Exporter。至于其他组件,均为通用组件,使用官方发布的稳定软件包即可。下面就让我们一起探讨,如何构建 YashanDB Exporter。

功能设计

1 整体架构设计

基于 Prometheus+Grafana 监控 YashanDB,整体架构设计图如下。其中 YashanDB Exporter 可以抓取多个数据库实例的监控指标。Prometheus Server 通过拉取 YashanDB Exporter 的 Metrics 接口,获取指标数据,并将数据存储到其后台时序数据库 TSDB 中。其他组件的工作流程在上文的背景中已经说明,在此不再赘述。

2 YashanDB Exporter 架构设计

YashanDB Exporter 的功能还是比较简单的,核心工作主要分成三大步骤:采集数据、包装数据以及分享数据。

1. 采集数据:主要是借助协程池(Goroutines)并发使用 SQL 查询数据库的指标信息;

2. 包装数据:把数据库的指标结果转换成 Prometheus Server 能够解析的数据格式(Prometheus Metrics);

3. 分享数据:对外提供 HTTP 服务,外部服务通过请求 Metrics 接口就能够获取 Exporter 采集到的数据。

此外,为了灵活控制 YashanDB Exporter 的行为,我们需要两个配置文件:指标配置文件(Metric Config)和崖山数据库实例配置文件(DB Instance Config)。

1. 指标配置文件:定义了用于查询数据的 SQL,后续用户也可以通过编辑该文件来控制需要采集的指标;

  1. 崖山数据库实例配置文件:主要用于定义数据库实例的基本信息和连接信息,控制需要采集哪些数据库实例的信息。整体架构图如下:

背景与挑战

我们可以采用 Go 语言编码,利用官方提供的 github.com/prometheus/client_golang 开发包,快速构建 YashanDB Exporter。

1. 程序入口

在程序入口中,我们定义了一个名为 yashandb_exporter 的命令行工具。启动程序时,核心工作流程如下:

1. 首先执行必要的初始化工作;

2. 其次根据配置,创建一个 exporter 实例;

3. 通过 prometheus.MustRegister (exporter) 注册实例;

4. 定义 HTTP 服务,将指标处理逻辑绑定到 Mertics 接口;

  1. 启动 HTTP 服务,对外提供 Mertics 接口。 const servername = "yashandb_exporter" var ( httpsConfig = kingpinflag.AddFlags(kingpin.CommandLine, ":9100") pprofAddr = kingpin.Flag( "web.pprof-address", "Address to listen on for pprof debug. (env: YAS_EXPORTER_PPROF_ADDRESS)"). Envar("YAS_EXPORTER_PPROF_ADDRESS").String() metricsURL = kingpin.Flag( "web.telemetry-path", "Telemetry path under which to expose metrics. (env: YAS_EXPORTER_TELEMETRY_PATH)"). Default("/metrics").Envar("YAS_EXPORTER_TELEMETRY_PATH").String() metricsConf = kingpin.Flag( "yashandb.metrics", "File with metrics in a YAML file. (env: YAS_EXPORTER_METRICS)"). Default("yashandb-metrics.yml").Envar("YAS_EXPORTER_METRICS").String() databaseTargets = kingpin.Flag( "yashandb.targets", "File with database targets in a YAML file. (env: YAS_EXPORTER_TARGETS)"). Default("yashandb-targets.yml").Envar("YAS_EXPORTER_TARGETS").String() scrapeTimeout = kingpin.Flag( "scrape.timeout", "Scrape timeout (in seconds). (env: YAS_EXPORTER_SCRAPE_TIMEOUT)"). Default("15").Envar("YAS_EXPORTER_SCRAPE_TIMEOUT").Uint() scrapeMaxConcurrency = kingpin.Flag( "scrape.max-concurrency", "Maximum number of concurrent scrape tasks at the same time. (env: YAS_EXPORTER_SCRAPE_MAX_CONCURRENCY)"). Default("512").Envar("YAS_EXPORTER_SCRAPE_MAX_CONCURRENCY").Uint() logLevel = kingpin.Flag( "log.level", "Log level of YashanDB Exporter. One of: [debug, info, warn, error]. (env: YAS_EXPORTER_LOG_LEVEL)"). Default("info").Envar("YAS_EXPORTER_LOG_LEVEL").String() maxOpenConns = kingpin.Flag( "max.open.conns", "Max open connections of database. (env: YAS_EXPORTER_MAX_OPEN_CONNS)"). Default("3").Envar("YAS_EXPORTER_MAX_OPEN_CONNS").Uint() ) func init() { commons.InitBasePath() commons.InitAuth() } func generateResponse(code int, message string) []byte { res := &struct { Code int json:"code" Message string json:"message" }{Code: code, Message: message} data, _ := json.Marshal(res) return data } func main() { kingpin.Version(fmt.Sprintf("%s-%s", servername, commons.Version)) kingpin.HelpFlag.Short('h') kingpin.Parse() initLog(*logLevel) log.Sugar.Infof("Starting %s-%s", servername, commons.Version) exporterOpts := []collector.ExporterOpt{ collector.WithScrapeMaxConcurrency(*scrapeMaxConcurrency), collector.WithScrapeTimeout(*scrapeTimeout), collector.WithMaxOpenConns(*maxOpenConns), } exporter, err := collector.NewExporter(*databaseTargets, *metricsConf, exporterOpts...) if err != nil { log.Sugar.Fatalf("load configs failed: %s", err) } prometheus.MustRegister(exporter) logger, _ := zap.NewStdLogAt(log.Logger, zap.ErrorLevel) opts := promhttp.HandlerOpts{ ErrorLog: logger, ErrorHandling: promhttp.ContinueOnError, } mux := http.NewServeMux() mux.Handle(*metricsURL, promhttp.HandlerFor(prometheus.DefaultGatherer, opts)) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=UTF-8") _, _ = w.Write(generateServerHome(*metricsURL)) }) if *pprofAddr != "" { go debug.StartHTTPDebuger(*pprofAddr) } server := &http.Server{ Handler: mux, ReadHeaderTimeout: 32 * time.Second, } promlogger := promlog.New(&promlog.Config{}) if err := web.ListenAndServe(server, httpsConfig, promlogger); err != nil { log.Sugar.Fatal("Error running HTTP server:", err) } }

2. Collector 接口

在 3.1 小节中,我们通过 prometheus.MustRegister (exporter) 注册实例后,再通过 mux.Handle (*metricsURL, promhttp.HandlerFor (prometheus.DefaultGatherer, opts)) 就可以完成 Metrics 接口的构建。其中的关键在于如何实现 exporter 实例。通过查看 prometheus.MustRegister 函数,发现其参数是一个 Collector 接口,那么我们的 exporter 实例只需要实现 Collector 接口即可。接口定义如下:

代码语言:javascript
复制
type Collector interface {
  Describe(chan<- *Desc)
  Collect(chan<- Metric)
}

接口有两个函数:Describe 和 Collect。Describe 会在 prometheus.MustRegister 时调用,执行一次标准的采集动作。而 Collect 则会在收到 Metrics 接口请求的时候调用,执行一次标准的采集动作。从 Exporter 的功能上来说,关键点在于 Collect,而 Describe 就算不做任何事情也不会影响整体的功能。所以我们可以聚焦 Collect 函数,代码内容如下:

代码语言:javascript
复制
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {}

func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
  e.mu.RLock()
  yashandbList := e.originalYashandbList.DeepCopy()
  metricList := e.originalMetricList.DeepCopy()
  e.mu.RUnlock()

  e.scrape(ch, yashandbList, metricList)
  e.sendInstanceDisconnected(ch, yashandbList)
  ch <- e.duration
  ch <- e.totalScrapes
  ch <- e.success
  e.scrapeErrors.Collect(ch)
  ch <- e.up
}

可以看到,关键在于 e.scrape (ch, yashandbList, metricList)。传入的参数有三个:指标信息接收通道(ch)、YashanDB 实例列表(yashandbList)以及采集的指标列表(metricList)。展开分析该函数:

代码语言:javascript
复制
func (e *Exporter) scrape(ch chan<- prometheus.Metric, yashandbList *YashanDBList, metricList *MetricList) {
  e.reset()
  e.totalScrapes.Inc()
  e.up.Set(1)
  defer func(begun time.Time) {
    s := time.Since(begun).Seconds()
    e.duration.Set(s)
    log.Sugar.Debugf("scrape total time: %0.4fs", s)
  }(time.Now())

  scrapeSuccess := true
  swg := sizedwaitgroup.New(int(e.scrapeMaxConcurrency))
  for _, yashandb := range yashandbList.YashanDBs {
    for _, i := range yashandb.ScrapeInstances {
            for _, m := range metricList.Metrics {
                swg.Add()
                go e.scrapeWithMetric(&swg, ch, yashandb, i, m, &scrapeSuccess)
            }
    }
  }
  swg.Wait()

  if scrapeSuccess {
    log.Sugar.Debug("scrape all metrics successfully")
    e.success.Set(1)
  }
}


func (e *Exporter) scrapeWithMetric(
  swg *sizedwaitgroup.SizedWaitGroup,
  ch chan<- prometheus.Metric,
  yashandb *YashanDB,
  scrapeInstance *ScrapeInstance,
  metric *YasMetric,
  scrapeSuccess *bool,
) {
  defer swg.Done()
  start := time.Now()
  if err := metric.Scrape(yashandb, scrapeInstance, ch, e); err != nil {
    *scrapeSuccess = false
    log.Sugar.Errorf("%0.4fs %s failed to scrape <%s>: %s", time.Since(start).Seconds(), scrapeInstance.Mark(), metric.Name, err)
    return
  }
  if len(metric.SubMetrics) == 1 {
    log.Sugar.Debugf("%0.4fs %s succeeded to scrape <%s>", time.Since(start).Seconds(), scrapeInstance.Mark(), metric.Name)
  } else {
    for _, sub := range metric.SubMetrics {
      log.Sugar.Debugf("%0.4fs %s succeeded to scrape <%s_%s>", time.Since(start).Seconds(), scrapeInstance.Mark(), metric.Name, sub.ColName)
    }
  }
}

上述代码中,采用了一个拥有最大并发数控制的协程池,对于每个数据库下每个实例的每个指标,都使用协程去并发采集数据。由此可见,一次采集所需的时间就是所有协程中耗时最长的协程的采集时间。根据代码我们可以看到,具体的采集操作是由 metric.Scrape (yashandb, scrapeInstance, ch, e) 函数执行的。查看函数的具体内容如下:

代码语言:javascript
复制
func (m *YasMetric) Scrape(yashandb *YashanDB, i *ScrapeInstance, ch chan<- prometheus.Metric, e *Exporter) error {
  if i.genQuery(m) == "" {
    log.Sugar.Debugf("%s, skip empty query", m.Name)
    return nil
  }
  if err := i.Query(yashandb, m, ch, e.scrapeTimeout); err != nil {
    i.scrapeFailedOnce = true
    return err
  }
  i.scrapeSuccessOnce = true
  return nil
}

可以看到,核心功能在于 i.Query (yashandb, m, ch, e.scrapeTimeout),查看 Query 函数的相关内容如下所示。在该函数中,主要是使用 YashanDB 的 Go 语言驱动,通过 SQL 查询数据库信息,并把它转换成 Prometheus Metric 格式。最后把采集的指标通过 i.exposeMetrics (yashandb, ch, prometheusMetrics) 发送到指标信息接收通道。其中还引入了超时机制,如果本次查询达到最大超时时间,则终止查询,指标采集失败,协程超时返回。这样可以有效控制 Metrics 接口的返回时间,避免接口长时间无法返回数据。

代码语言:javascript
复制
func (i *ScrapeInstance) Query(yashandb *YashanDB, m *YasMetric, ch chan<- prometheus.Metric, timeout uint) error {
  var prometheusMetrics []prometheus.Metric
  errCh := make(chan error)
  done := make(chan struct{})
  defer close(done)
  go func() {
    defer func() {
      if err := recover(); err != nil {
        log.Sugar.Errorf("recover error: %v", err)
        log.Sugar.Warnf("debug stack warn: %s", string(debug.Stack()))
      }
    }()
    var err error
    defer func() {
      select {
      case <-done:
      case errCh <- err:
      }
      close(errCh)
    }()

    query := i.genQuery(m)
    if query == "" {
      return
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
    defer cancel()
    rows, err := i.db.QueryContext(ctx, query)
    if err != nil {
      return
    }
    defer rows.Close()
    err = rows.Err()
    if err != nil {
      return
    }
    cols, err := rows.Columns()
    if err != nil {
      return
    }

    result := []map[string]string{}
    for rows.Next() {
      columnData := make([]interface{}, len(cols))
      scanArgs := make([]interface{}, len(cols))
      for i := range columnData {
        scanArgs[i] = &columnData[i]
      }
      err = rows.Scan(scanArgs...)
      if err != nil {
        return
      }
      row := make(map[string]string)
      for i, colName := range cols {
        if columnData[i] == nil {
          columnData[i] = ""
        }
        row[strings.ToLower(colName)] = fmt.Sprintf("%v", columnData[i])
      }
      result = append(result, row)
    }

    for _, row := range result {
      metrics, err := m.genPrometheusMetrics(yashandb, i, row)
      if err != nil {
        return
      }
      prometheusMetrics = append(prometheusMetrics, metrics...)
    }
  }()
  select {
  case err := <-errCh:
    if err != nil {
      return err
    }
    i.exposeMetrics(yashandb, ch, prometheusMetrics)
    return nil
  case <-time.After(time.Second * time.Duration(timeout)):
    return errors.New("query timeout")
  }
}

至此,我们已经完整个 Exporter 的核工作原理介绍:通过配置文件控制需要监控的指标以及数据库实例;再通过实现 Collector 接口,利用 prometheus.MustRegister (exporter) 注册完成后,通过 promhttp.HandlerFor (prometheus.DefaultGatherer, opts) 快速构建一个 Metrics 接口处理器;最后将处理器绑定到 HTTP 服务上,并启动服务即可。

3. 配置文件说明

配置文件其实就是一些数据结构的设计和简单的数据输入,能够获取到需要的数据即可。本方案中设计的配置文件示例如下:

指标配置文件示例如下,在配置文件中我们定义了数据库启动时间和数据库最大会话数两个简单指标。实际使用时,可以通过配置文件内置所需的监控指标,也可以后续增加自定义指标。

代码语言:javascript
复制
metrics:
  - name: uptime
    query: select EXTRACT(DAY FROM (sysdate - startup_time)) * 60 * 60 * 24 +
      EXTRACT(HOUR FROM (sysdate - startup_time)) * 60 * 60 +
      EXTRACT(MINUTE FROM (sysdate - startup_time)) * 60 +
      EXTRACT(SECOND FROM (sysdate - startup_time)) AS uptime
      from v$instance
    sub_metrics:
      - col: uptime
        type: gauge
        description: Uptime of the database instance (in seconds).
  - name: max_sessions
    query: select value as max_sessions from sys.v$parameter where name = 'MAX_SESSIONS'
    sub_metrics:
      - col: max_sessions
        type: gauge
        description: Maximum number of database sessions.

数据库实例配置文件示例如下,在配置文件中我们定义了一个名为 yasdb 的数据库,其拥有两个数据库实例:

代码语言:javascript
复制
targets:
    - type: SE
      name: yasdb
      pkgVersion: 23.2.0.13
      nodes:
        - id: 1-1
          type: yasdn
          group: "1"
          name: instance1
          connection:
            ip: 127.0.0.1
            port: 1688
        - id: 1-2
          type: yasdn
          group: "1"
          name: instance2
          connection:
            ip: 127.0.0.1
            port: 1690

结果展示

1. 首先部署 YashanDB,我们快速部署一个单机单节点数据库,实例的监听地址设置为:127.0.0.1:1688。

2. 然后编辑数据库实例配置文件,设置好数据库地址等信息。接着在 9100 端口启动 YashanDB Exporter,此时访问 Metrics 接口已经可以看到采集到的数据库信息:

3. 随后,我们可以通过 docker 快速拉起 Prometheus Server 和 Grafana Server。先拉取 docker 镜像:

代码语言:javascript
复制
docker pull prom/prometheus
docker pull grafana/grafana

4. 启动 Prometheus Server:

代码语言:javascript
复制
# 生成配置文件
mkdir /opt/prometheus
cd /opt/prometheus/
vim prometheus.yml
# 键入以下内容并保存
global:
  scrape_interval:     60s
  evaluation_interval: 60s
 
scrape_configs:
  - job_name: prometheus
    static_configs:
      - targets: ['localhost:9090'] # Prometheus服务本身
        labels:
          instance: prometheus
 
  - job_name: yashandb_exporter
    static_configs:
      - targets: ['127.0.0.1:9100'] # YashanDB Exporter服务
        labels:
          instance: localhost
# 在9090端口启动服务
docker run  -d \
  -p 9090:9090 \
  -v /opt/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml  \
  prom/prometheus

5. 启动 Grafana Server:

代码语言:javascript
复制
# 新建空文件夹grafana-storage
mkdir /opt/grafana-storage
# 设置权限,测试环境设置777,比较简单粗暴!
chmod 777 -R /opt/grafana-storage
# 在3000端口启动服务
docker run -d \
  -p 3000:3000 \
  --name=grafana \
  -v /opt/grafana-storage:/var/lib/grafana \
  grafana/grafana

6. 访问 3000 端口登录 Grafana Server,初始用户名和密码均为:admin。

7. 配置 Grafana Server 的数据源,选择 Prometheus。根据我们前面的配置,其在 9090 端口运行:

8. 添加 Dashboard,以监控数据库实例的启动时间(yashandb_uptime)为例:

以上就是我通过第三方开发包快速搭建 YashanDB Exporter 的实战记录啦。YashanDB 作为近年来关系数据库领域的后起之秀,其数据库生态也在不断蓬勃发展中。欢迎小伙伴们一起来探讨学习,共同摸索更多运维的高效方法。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 背景
  • 功能设计
    • 1 整体架构设计
    • 2 YashanDB Exporter 架构设计
  • 背景与挑战
    • 1. 程序入口
    • 2. Collector 接口
    • 3. 配置文件说明
  • 结果展示
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档