对于PostgreSQL的监控,行业里多多少少还是有不少的开源方案可用的,基本上拿过来修修补补就能跑起来。但是对于Greenplum集群的监控方案比较少,有的同学会说GPCC也可以啊,我有以下的几点考虑。
1)GPCC的监控还是缺少一些系统层的监控细节,比如缺少每个数据segment节点网卡的流量,比如每个数据segment节点的IO负载历史,监控历史的区间范围过大等,同时一般的监控我们可能会跑偏方向,更多是在Master上面监控,但是显然这样是缺少一些真实信息的。
2)GPCC的功能会有独立的数据档案库gpperfmon,程序交互是一个独立的客户端
3)GPCC偏重于集群管理,还有防火墙变更等系统管理功能
4)对于监控选项不能够灵活扩展
我的核心需求是关于Greenplum的监控,在监控方面可以实现扩展,所以也算是做了一个尝试。
这个过程也碰到几个问题。
1)数据字典的选取,PG和GP的大部分数据字典是兼容的,但是GP体系有自己独有的数据字典,比如重要的gp_segment_configuration,整体的信息主要是通过专有的数据字典完成的
2)关于psycopg2版本的兼容性,驱动psycopg2的版本对于Python的版本比较敏感,导致会有一些比较尴尬的情况,比如GP自带的Python驱动是3.6.9,而操作系统层默认的是3.6.3左右,结果不同的用户调用就会有完全不同的结果。补充操作是对.bash_profile刻意做区分,做不同的配置选项。
3)监控脚本的补充和完善,目前这块的脚本是比较少的,所以也就自己在PG的基础上开始做一些改动。
整个改造的过程整体还是比较顺利的,能够基本实现集群信息概览,系统资源概览。
对数据segment节点的系统资源使用取平均值,去除毛刺,这样能够看到一个整体的使用情况。
对网卡流量进行聚合,可以看到整个集群的吞吐量情况。
关于监控的脚本,是在TCollector中PostgreSQL的脚本基础上改进的。对于单机多实例,同事也帮忙在统一接入的地方做了整合,在一个独立的配置文件中,使用的是一个JSON配置,
{"ip":"xxxxx","port":5432,"idc":"xxxx","service":"xxxxx"}
监控脚本内容如下:
import sys
import os
import time
import socket
import errno
try:
import psycopg2
except ImportError:
psycopg2 = None # handled in main()
COLLECTION_INTERVAL = 15 # seconds
CONNECT_TIMEOUT = 2 # seconds
SCAN_INTERVAL = 300
from collectors.lib import utils
from collectors.etc import greenplumconf
# Directories under which to search socket files
SEARCH_DIRS = frozenset([
"/var/run/postgresql", # Debian default
"/var/pgsql_socket", # MacOS default
"/usr/local/var/postgres", # custom compilation
"/tmp", # custom compilation
])
def now():
return int(time.time())
def find_sockdir():
"""Returns a path to PostgreSQL socket file to monitor."""
for dir in SEARCH_DIRS:
for dirpath, dirnames, dirfiles in os.walk(dir, followlinks=True):
for name in dirfiles:
# ensure selection of PostgreSQL socket only
if (utils.is_sockfile(os.path.join(dirpath, name))
and "PGSQL" in name):
return (dirpath)
def postgres_connect(host, port):
"""Connects to the PostgreSQL server using the specified socket file."""
user, password = greenplumconf.get_user_password()
try:
return psycopg2.connect("host='%s' port='%s' user='%s' password='%s' "
"connect_timeout='%s' dbname=postgres"
% (host, port, user, password,
CONNECT_TIMEOUT))
except (EnvironmentError, EOFError, RuntimeError), e:
utils.err("Couldn't connect to DB :%s" % (e))
def collect(db, tags_str):
"""
Collects and prints stats.
Here we collect only general info, for full list of data for collection
see http://www.postgresql.org/docs/9.2/static/monitoring-stats.html
"""
try:
cursor = db.cursor()
# general statics
cursor.execute("SELECT pg_stat_database.*"
" FROM pg_database JOIN"
" pg_stat_database ON pg_database.datname ="
" pg_stat_database.datname WHERE pg_stat_database.datname"
" NOT IN ('template0', 'template1', 'postgres')")
ts = time.time()
stats = cursor.fetchall()
# datid | datname | numbackends | xact_commit | xact_rollback | blks_read | blks_hit | tup_returned | tup_fetched | tup_inserted | tup_updated | tup_deleted | conflicts | temp_files | temp_bytes | deadlocks | blk_read_time | blk_write_time | stats_reset | size
result = {}
for stat in stats:
database = stat[1]
result[database] = stat
for database in result:
for i in range(2, len(cursor.description)):
metric = cursor.description[i][0]
value = result[database][i]
try:
if metric in ("stats_reset"):
continue
print ("greenplum.%s %i %s database=%s %s"
% (metric, ts, value, database, tags_str))
except:
utils.err("got here")
continue
# DB connections
cursor.execute("SELECT datname, count(datname) FROM pg_stat_activity"
" GROUP BY datname")
ts = time.time()
dbconnections = cursor.fetchall()
for database, connection in dbconnections:
print ("greenplum.dbconnections %i %s database=%s %s"
% (ts, connection, database, tags_str))
# DB segment status
cursor.execute("select role,count(*) from gp_segment_configuration "
" group by role")
ts = time.time()
segment_roles = cursor.fetchall()
for segment_role, role_no in segment_roles:
print ("greenplum.segment_roles %i %s segment_role=%s %s"
% (ts, role_no, segment_role, tags_str))
cursor.execute("select status,count(*) from gp_segment_configuration "
" group by status")
ts = time.time()
segments_status = cursor.fetchall()
for segment_status, status_no in segments_status:
print ("greenplum.segment_status %i %s segment_role=%s %s"
% (ts, status_no, segment_status, tags_str))
cursor.execute("select count(*) from gp_segment_configuration where status='d' ")
ts = time.time()
dead_segments_no = cursor.fetchone()
for dead_segemnt_no in dead_segments_no:
print ("greenplum.dead_segement_no %i %s %s"
% (ts, dead_segemnt_no, tags_str))
except (EnvironmentError, EOFError, RuntimeError,), e:
if isinstance(e, IOError) and e[0] == errno.EPIPE:
# exit on a broken pipe. There is no point in continuing
# because no one will read our stdout anyway.
return 2
utils.err("error: failed to collect data: %s" % e)
def scan_for_instances():
try:
f = open("/etc/tcollector/greenplum_tags.conf")
except IOError, e:
tils.err("error: can't open /etc/tcollector/tags.conf : %s" % e)
return None
f.seek(0)
out = {}
for each_data in f:
tags = eval(each_data)
port = tags["port"]
# tags_str = "endpoint=%s port=%s idc=%s service=%s cluster_type=%s db_soft=%s db_engins=%s" % (tags["ip"], tags["port"], tags["idc"], tags["service"], tags["cluster_type"], tags["db_soft"], tags["db_engine"])
tags_str = "endpoint=%s port=%s idc=%s service=%s" % (tags["ip"], tags["port"], tags["idc"], tags["service"])
out[port] = {}
out[port]["tags_map"] = tags
out[port]["tags_str"] = tags_str
return out
def main(args):
"""Collects and dumps stats from a PostgreSQL server."""
if psycopg2 is None:
utils.err("error: Python module 'psycopg2' is missing")
return 13 # Ask tcollector to not respawn us
# sockdir = find_sockdir()
# if not sockdir: # Nothing to monitor
# utils.err("error: Can't find postgresql socket file")
# return 13 # Ask tcollector to not respawn us
last_scan = now()
instances = scan_for_instances()
if not len(instances):
return 13
while True:
ts = now()
if ts - last_scan >= SCAN_INTERVAL:
instances = scan_for_instances()
last_scan = ts
for port in instances:
tags_str = instances[port]["tags_str"]
tags_map = instances[port]["tags_map"]
db = postgres_connect(tags_map["ip"], tags_map["port"])
collect(db, tags_str)
sys.stdout.flush()
time.sleep(COLLECTION_INTERVAL)
if __name__ == "__main__":
sys.stdin.close()
sys.exit(main(sys.argv))
剩下的就是不断的调试和修正,直到看起来还像回事。