首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >运维自动化脚本集锦:那些能救命的 Bash 与 Python 小工具

运维自动化脚本集锦:那些能救命的 Bash 与 Python 小工具

作者头像
IT运维技术圈
发布2025-10-09 12:06:07
发布2025-10-09 12:06:07
6300
代码可运行
举报
文章被收录于专栏:IT运维技术圈IT运维技术圈
运行总次数:0
代码可运行

0. 写在前面:为什么你需要“神器”而非“常用命令

大家好,我是老杨 欢迎点击原文链接或直接访问vps.top365app.com,来看我的全球vps信息实时分析项目,另外觉得文章不错要记得.点赞、转发、在看以及打开小星标哦,攒今世之功德,修来世之福报

正文

在运维的日常工作里,时间有时候比 CPU 时钟还要宝贵。凌晨三点接到告警电话,SSH 上去手工一行行敲命令,效率极低,风险也很大。这个时候,准备好一些“应急小工具”就显得无比重要。它们不必庞大复杂,但要足够直接、实用,能帮人快速定位问题、止血,甚至直接修复。下面整理了一些 Bash 与 Python 写的小工具,每个都能在关键时刻派上用场。

约定:所有 Bash 脚本默认 set -euo pipefail,默认只读或带 --dry-run。涉及修改系统的地方,脚本或示例里都有醒目的提示与二次确认。


1)主机快照体检:triage.sh(Bash)

作用:60 秒内拉一份机器健康报告(负载、内存、I/O、端口、热点进程、最近错误日志、关键目标连通性),输出 Markdown 方便贴进工单。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
TARGETS=${*:-"127.0.0.1:22 8.8.8.8:53"}
OUT="triage-$(hostname)-$(date +%F-%H%M%S).md"

sec() { echo -e "\n## $1\n"; }

{
echo"# Triage Report @ $(hostname) $(date -Is)"
  sec "Uptime / Load"
uptime; echo; cat /proc/loadavg

  sec "CPU/内存(Top 5)"
  ps -eo pid,ppid,cmd,%cpu,%mem --sort=-%cpu | head -n 6
echo
  free -h

  sec "磁盘空间与inode"
df -hT | sed -n '1,10p'
echo
df -i | sed -n '1,10p'

  sec "I/O 与网络概况"
  iostat -xz 1 1 2>/dev/null || true
echo
  ss -ltnp | sed -n '1,20p'

  sec "最近系统错误日志(200行)"
  journalctl -p 3 -n 200 --no-pager || true

  sec "关键目标连通性"
for t in$TARGETS; do
    host=${t%:*}; port=${t#*:}
    echo -n "$host:$port -> "
    (echo > /dev/tcp/$host/$port) >/dev/null 2>&1 && echo"OK" || echo"FAIL"
done
} > "$OUT"

echo "生成报告:$OUT"

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo bash triage.sh 10.0.0.10:3306 10.96.0.10:53
生成报告:triage-node-a-2025-08-10-144201.md

示例输出片段

代码语言:javascript
代码运行次数:0
运行
复制
## Uptime / Load
 14:42:02 up 20 days,  5:17,  1 user,  load average: 3.02, 2.71, 2.45
1.45 1.37 1.22 1/1024 22345

## CPU/内存(Top 5)
  PID  PPID CMD                         %CPU %MEM
23456     1 /usr/bin/myapp --worker     92.3  5.1
...

2)“带柄”的日志切割:safe_logrotate.sh(Bash)

作用:当一个超大日志被进程打开时,直接 rm 只会把 inode 留在磁盘上。这个脚本稳妥地改名→压缩→信号提示→可选truncate,避免丢句柄。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
FILES=${*:? "用法:safe_logrotate.sh /var/log/app.log [/var/log/xxx.log]"}
for f in"$@"; do
  [[ -f "$f" ]] || { echo"不存在:$f"; continue; }
  ts=$(date +%F-%H%M%S)
  rot="$f.$ts"
echo"切割 $f -> $rot"
mv"$f""$rot"
  gzip -9 "$rot" &
# 尝试通知常见日志守护
  systemctl is-active --quiet rsyslog && systemctl reload rsyslog || true
  systemctl is-active --quiet nginx && systemctl reload nginx || true
# 若应用不重新打开文件,可选truncate原路径(已由进程重新创建)
  [[ -f "$f" ]] || : # 应用应已重建
done
wait
echo "完成。"

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo bash safe_logrotate.sh /var/log/app.log /var/log/nginx/access.log
切割 /var/log/app.log -> /var/log/app.log.2025-08-10-144921
切割 /var/log/nginx/access.log -> /var/log/nginx/access.log.2025-08-10-144921
完成。

验证

代码语言:javascript
代码运行次数:0
运行
复制
$ ls -lh /var/log/app.log*
-rw-r--r-- 1 root root  12K 14:49 /var/log/app.log
-rw-r--r-- 1 root root 6.8G 14:49 /var/log/app.log.2025-08-10-144921.gz

3)进程“发胖”守护:leak_guard.py(Python)

作用:盯一个 PID 的 RSS 增长,一旦超过阈值,立刻采集smaps_rollup、线程栈回溯(pstack 可选)、句柄快照(lsof),把证据留住。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env python3
import argparse, os, time, subprocess, shutil, datetime, sys

defrss_kb(pid: int) -> int:
    withopen(f"/proc/{pid}/status") as f:
        for line in f:
            if line.startswith("VmRSS:"):
                returnint(line.split()[1])
    return0

defsnapshot(pid: int, outdir: str):
    ts = datetime.datetime.now().strftime("%F-%H%M%S")
    base = os.path.join(outdir, f"{pid}-{ts}")
    os.makedirs(base, exist_ok=True)
    for src, dst in [(f"/proc/{pid}/smaps_rollup", f"{base}/smaps_rollup.txt"),
                     (f"/proc/{pid}/limits", f"{base}/limits.txt"),
                     (f"/proc/{pid}/status", f"{base}/status.txt")]:
        try:
            shutil.copy(src, dst)
        except Exception as e:
            print(f"copy {src} fail: {e}", file=sys.stderr)
    try:
        withopen(f"{base}/lsof.txt","w") as w:
            subprocess.run(["lsof","-p",str(pid)], stdout=w, stderr=subprocess.DEVNULL)
    except FileNotFoundError:
        pass
    try:
        withopen(f"{base}/pstack.txt","w") as w:
            subprocess.run(["pstack",str(pid)], stdout=w, stderr=subprocess.DEVNULL, timeout=10)
    except Exception:
        pass
    print(f"[snap] saved to {base}")

if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("--pid", type=int, required=True)
    ap.add_argument("--limit-mb", type=int, default=2048)
    ap.add_argument("--interval", type=float, default=2.0)
    ap.add_argument("--outdir", default="/tmp/leak_guard")
    args = ap.parse_args()

    lim_kb = args.limit_mb * 1024
    print(f"[watch] pid={args.pid} limit={args.limit_mb}MB interval={args.interval}s")
    last = 0
    whileTrue:
        cur = rss_kb(args.pid)
        if cur <= 0:
            print("[exit] process ended")
            break
        if cur != last:
            print(f"[rss] {cur/1024:.1f} MB")
            last = cur
        if cur > lim_kb:
            print(f"[alarm] rss {cur/1024:.1f} MB > {args.limit_mb} MB")
            snapshot(args.pid, args.outdir)
            time.sleep(60)  # 避免疯狂采样
        time.sleep(args.interval)

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo ./leak_guard.py --pid 23456 --limit-mb 1024 --interval 1
[watch] pid=23456 limit=1024MB interval=1.0s
[rss] 820.5 MB
[rss] 1056.2 MB
[alarm] rss 1056.2 MB > 1024 MB
[snap] saved to /tmp/leak_guard/23456-2025-08-10-145302

4)磁盘“止血针”:free_disk_hotfix.sh(Bash)

作用:80% 以上空间时快速释放 5–20G,优先日志与缓存;默认 --dry-run 只打印将要做的动作;确认后执行。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
DRY=1
[[ "${1:-}" == "--confirm" ]] && DRY=0
act(){ if ((DRY)); thenecho"[DRY] $*"; elseeval"$@"; fi; }

echo"扫描大目录…"
du -xh /var /home /opt 2>/dev/null | sort -hr | head -n 20

echo"准备清理候选:logrotate、journal、apt缓存、Docker垃圾…"
act "journalctl --vacuum-time=3d"
act "apt-get clean 2>/dev/null || true"
act "docker system prune -f 2>/dev/null || true"
act "find /var/log -type f -name '*.log.*' -mtime +3 -print -exec rm -f {} +"
act "find /var/log -type f -size +200M -print -exec gzip -9 {} +"

echo"完成。当前磁盘:"
df -h | sed -n '1,10p'

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo bash free_disk_hotfix.sh
扫描大目录…
12G /var
8.1G /var/log
…
[DRY] journalctl --vacuum-time=3d
[DRY] apt-get clean
[DRY] docker system prune -f
[DRY] find /var/log -type f -name '*.log.*' -mtime +3 -print -exec rm -f {} +
[DRY] find /var/log -type f -size +200M -print -exec gzip -9 {} +
完成。当前磁盘:
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1        50G   49G  1.0G  98% /

确认执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo bash free_disk_hotfix.sh --confirm
…
/dev/sda1        50G   41G   9.0G  82% /

5)端口与 HTTP 并发探针:port_probe.py(Python)

作用:并发检测一批 host:port,对 HTTP/HTTPS 还会取状态码与握手耗时、证书剩余天数,输出 CSV。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env python3
import argparse, ssl, socket, time, csv, concurrent.futures, datetime, sys

defprobe(target, timeout=2.0):
    host, port = target.split(":")
    port = int(port)
    start = time.time()
    is_https = port in (443, 8443)
    code, tls_days = "", ""
    try:
        if is_https:
            ctx = ssl.create_default_context()
            with socket.create_connection((host, port), timeout=timeout) as sock:
                with ctx.wrap_socket(sock, server_hostname=host) as ssock:
                    cert = ssock.getpeercert()
                    exp = datetime.datetime.strptime(cert['notAfter'], "%b %d %H:%M:%S %Y %Z")
                    tls_days = (exp - datetime.datetime.utcnow()).days
                    ssock.send(b"GET /health HTTP/1.1\r\nHost: "+host.encode()+b"\r\nConnection: close\r\n\r\n")
                    resp = ssock.recv(64).decode("latin1", errors="ignore")
                    if resp.startswith("HTTP/1.1 "): code = resp.split()[1]
        else:
            with socket.create_connection((host, port), timeout=timeout):
                pass
        ok = True
    except Exception as e:
        ok = False; code = f"ERR:{type(e).__name__}"
    return {
        "target": target,
        "ok": ok,
        "ms": int((time.time()-start)*1000),
        "http_code": code,
        "tls_days": tls_days
    }

if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("targets", nargs="+")
    ap.add_argument("--out", default="probe.csv")
    args = ap.parse_args()
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as exe:
        results = list(exe.map(probe, args.targets))
    withopen(args.out,"w",newline="") as f:
        w = csv.DictWriter(f, fieldnames=results[0].keys())
        w.writeheader(); w.writerows(results)
    for r in results:
        print(f"{r['target']:22} ok={r['ok']} {r['ms']}ms code={r['http_code']} tls_days={r['tls_days']}")
    print(f"写入 {args.out}")

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ ./port_probe.py api.example.com:443 redis.internal:6379 10.0.0.10:22
api.example.com:443      ok=True 87ms code=200 tls_days=63
redis.internal:6379      ok=True 2ms code=  tls_days=
10.0.0.10:22             ok=True 1ms code=  tls_days=
写入 probe.csv

6)K8s 发布看守与快速回滚:k8s_rollout_guard.sh(Bash)

作用:盯一个 Deployment 的发布,超时或不可用直接回滚到上一个健康版本,并把事件打到屏幕上。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
DEPLOY="${1:?用法:k8s_rollout_guard.sh <deploy>}"
NS="${2:-prod}"
TO="${3:-120s}"

echo"监控发布:$NS/$DEPLOY,超时 $TO"
if kubectl -n "$NS" rollout status "deploy/$DEPLOY" --timeout="$TO"; then
echo"发布成功。"
else
echo"发布异常,准备回滚…"
  kubectl -n "$NS" rollout undo "deploy/$DEPLOY"
  kubectl -n "$NS" get events --sort-by=.lastTimestamp | tail -n 20
exit 1
fi

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ ./k8s_rollout_guard.sh myapp prod 90s
监控发布:prod/myapp,超时 90s
Waiting for deployment "myapp" rollout to finish: 2 out of 5 new replicas have been updated...
error: deployment "myapp" exceeded its progress deadline
发布异常,准备回滚…
deployment.apps/myapp rolled back
LAST SEEN   TYPE     REASON   OBJECT   MESSAGE
1m          Warning  Unhealthy  pod/...  Liveness probe failed

7)Nginx 上下线后端的“快开关”:nginx_switch.py(Python)

作用:通过修改一个 upstreams.d/*.conf 的 include 文件来临时下线/上线某台后端;先 nginx -t 校验,再热加载。适合临时救火。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env python3
import argparse, re, subprocess, sys, pathlib

ap = argparse.ArgumentParser()
ap.add_argument("--file", default="/etc/nginx/upstreams.d/myapp.conf")
ap.add_argument("--server", required=True, help="例如 10.0.0.12:8080")
ap.add_argument("--action", choices=["enable","disable"], required=True)
args = ap.parse_args()

p = pathlib.Path(args.file)
txt = p.read_text()
pat = re.compile(rf'(^\s*server\s+{re.escape(args.server)}\s*)(;|down;)', re.M)

deftoggle(match):
    head = match.group(1)
    if args.action == "disable":
        return head + " down;"
    else:
        return head + ";"

new = pat.sub(toggle, txt)
if new == txt:
    print("未发生变化。"); sys.exit(0)

p.write_text(new)
ret = subprocess.run(["nginx","-t"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
print(ret.stdout)
if ret.returncode != 0:
    print("配置校验失败,回滚。")
    p.write_text(txt); sys.exit(2)

subprocess.check_call(["systemctl","reload","nginx"])
print(f"{args.action} {args.server} 完成。")

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo ./nginx_switch.py --server 10.0.0.12:8080 --action disable
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
disable 10.0.0.12:8080 完成。

8)临时开启 MySQL 慢日志并取证:mysql_slow_snapshot.sh(Bash)

作用:线上短时打开慢日志、降低阈值、采样 120 秒后恢复,顺便把最“重”的 SQL 频次粗排。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
MYSQL="mysql -uroot -p${MYSQL_PWD:-root}"
DUR="${1:-120}"

echo"开启慢日志采样 ${DUR}s(long_query_time=0.2s)"
$MYSQL -e "SET GLOBAL slow_query_log=ON; SET GLOBAL long_query_time=0.2; FLUSH LOGS;"
sleep"$DUR"
echo"恢复慢日志阈值(1s)"
$MYSQL -e "SET GLOBAL long_query_time=1; FLUSH LOGS;"

LOG=$($MYSQL -Nse "SHOW VARIABLES LIKE 'slow_query_log_file'\G" | awk -F': ''/Value/ {print $2}')
OUT="/tmp/slow-sample-$(date +%F-%H%M%S).txt"
echo"解析 $LOG -> $OUT(粗排)"
grep -a "Query_time" -A1 "$LOG" | awk '
/^# Query_time/ {qt=$3}
/^use |^SET timestamp/ {next}
/^[A-Z]/ {print qt, $0}' | sort -nr | head -n 30 > "$OUT"
echo "完成:$OUT"

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ MYSQL_PWD=secret ./mysql_slow_snapshot.sh 90
开启慢日志采样 90s(long_query_time=0.2s)
恢复慢日志阈值(1s)
解析 /var/lib/mysql/hostname-slow.log -> /tmp/slow-sample-2025-08-10-151401.txt(粗排)
完成:/tmp/slow-sample-2025-08-10-151401.txt

示例输出片段

代码语言:javascript
代码运行次数:0
运行
复制
3.201 SELECT * FROM orders WHERE user_id = ? ORDER BY created_at DESC LIMIT 50;
2.987 UPDATE carts SET ...

9)TLS 证书到期扫描:tls_scan.py(Python)

作用:批量检查证书有效期、颁发者与 SNI,离到期 N 天内高亮。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env python3
import ssl, socket, argparse, datetime

defcheck(host, port=443, timeout=3):
    ctx = ssl.create_default_context()
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as ssock:
            cert = ssock.getpeercert()
            not_after = datetime.datetime.strptime(cert['notAfter'], "%b %d %H:%M:%S %Y %Z")
            days = (not_after - datetime.datetime.utcnow()).days
            issuer = dict(x[0] for x in cert['issuer'])['organizationName']
            return days, issuer

if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("hosts", nargs="+")
    ap.add_argument("--warn", type=int, default=14)
    args = ap.parse_args()
    for h in args.hosts:
        days, issuer = check(h)
        tag = "⚠️"if days <= args.warn else"✓"
        print(f"{tag} {h:30} 剩余 {days:3d} 天 | 颁发者 {issuer}")

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ ./tls_scan.py api.example.com grafana.example.com
✓ api.example.com                 剩余  63 天 | 颁发者 Let's Encrypt
⚠️ grafana.example.com             剩余   6 天 | 颁发者 Let's Encrypt

10)备份校验一把过:backup_verify.sh(Bash)

作用:校验目录内所有 .sha256 清单与对应文件是否一致;支持并行;输出失败列表。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
DIR="${1:-/backups}"
FAIL=()
shopt -s nullglob
echo"校验 $DIR 下的 sha256…"
forsumin"$DIR"/*.sha256; do
echo">> $sum"
ifsha256sum -c "$sum"; then
    :
else
    FAIL+=("$sum")
fi
done
echo
if ((${#FAIL[@]})); then
echo"❌ 校验失败:"
printf' - %s\n'"${FAIL[@]}"
exit 1
else
echo"✅ 全部通过"
fi

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ bash backup_verify.sh /data/backup
校验 /data/backup 下的 sha256…
>> /data/backup/db-2025-08-10.sha256
db-2025-08-10.sql.gz: OK
>> /data/backup/files-2025-08-10.sha256
files-2025-08-10.tar.gz: FAILED

❌ 校验失败:
 - /data/backup/files-2025-08-10.sha256

11)Conntrack 水位观察与建议:conntrack_watch.py(Python)

作用:每秒打印 nf_conntrack_count/max,高于阈值给出建议的 sysctl削峰动作提示(只打印,不自动改)。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env python3
import time, argparse

defread_int(path):
    withopen(path) as f: returnint(f.read().strip())

if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("--interval", type=float, default=1.0)
    ap.add_argument("--warn", type=float, default=0.8, help="比例")
    args = ap.parse_args()
    path_c="/proc/sys/net/netfilter/nf_conntrack_count"
    path_m="/proc/sys/net/netfilter/nf_conntrack_max"
    print("watching conntrack… Ctrl-C 退出")
    whileTrue:
        c=read_int(path_c); m=read_int(path_m); r=c/m
        bar=int(r*50)*"#"+"-"*(50-int(r*50))
        line=f"{c}/{m} ({r:.0%}) [{bar}]"
        if r>=args.warn:
            line+="  ⚠️ 建议:临时提高 nf_conntrack_max 或在入口限速"
        print(line, flush=True)
        time.sleep(args.interval)

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo ./conntrack_watch.py --warn 0.7
watching conntrack… Ctrl-C 退出
356120/524288 (68%) [##################################----------]
412223/524288 (79%) [##########################################--]  ⚠️ 建议:临时提高 nf_conntrack_max 或在入口限速

12)事故时间线拼接器:incident_timeline.sh(Bash)

作用:把 Nginx、应用日志、系统日志和你的 notes.txt 合并,按时间排序,得到一份可交付的统一时间线

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
OUT="timeline-$(date +%F-%H%M%S).md"
FILES=${*:-"/var/log/nginx/access.log /var/log/syslog /var/log/app/app.log ./notes.txt"}
echo "# Incident Timeline $(date -Is)" > "$OUT"
echo >> "$OUT"
# 假设日志前缀为 ISO8601 或可被 date -d 解析
grep -h . $FILES | awk '
{
  ts=$1
  for(i=2;i<=NF;i++){ if ($i ~ /^[0-9]{2}:[0-9]{2}:/) { ts=$1" "$i; break } }
  print ts " | " $0
}' | sort | sed 's/^/ - /' >> "$OUT"
echo "已生成:$OUT"

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ bash incident_timeline.sh "/var/log/nginx/access.log /var/log/syslog ./notes.txt"
已生成:timeline-2025-08-10-152210.md

示例输出片段

代码语言:javascript
代码运行次数:0
运行
复制
# Incident Timeline 2025-08-10T15:22:10+00:00

 - 2025-08-10 15:01:02 | 2025-08-10T15:01:02Z gateway 502 /api/order
 - 2025-08-10 15:01:05 | kernel: TCP: Possible SYN flooding on port 80
 - 2025-08-10 15:01:12 | NOTE: 手动回滚 myapp 到 rev-42

13)一键抓取 P99 指标并落地:prom_p99_snap.sh(Bash)

作用:对 Prometheus 做一次即席查询,把服务的 P99 延迟与错误率快照成 CSV,方便事故附件。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
PROM="${PROM:-http://localhost:9090}"
SERV="${1:-myapp}"
OUT="p99-${SERV}-$(date +%F-%H%M%S).csv"

q1="histogram_quantile(0.99, sum by(le) (rate(http_request_duration_seconds_bucket{service=\"$SERV\"}[5m])))"
q2="sum(rate(http_requests_total{service=\"$SERV\",code!~\"2..\"}[5m])) / sum(rate(http_requests_total{service=\"$SERV\"}[5m]))"

curl -sG "$PROM/api/v1/query" --data-urlencode "query=$q1" | jq -r '.data.result[] | [.metric.instance, .value[1]] | @csv' > "$OUT"
echo '"error_rate"' >> "$OUT"
curl -sG "$PROM/api/v1/query" --data-urlencode "query=$q2" | jq -r '.data.result[] | [.metric.service, .value[1]] | @csv' >> "$OUT"
echo "写入 $OUT"

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ PROM=http://prometheus.prod:9090 ./prom_p99_snap.sh gateway
写入 p99-gateway-2025-08-10-153000.csv

示例输出片段

代码语言:javascript
代码运行次数:0
运行
复制
"10.0.0.11:8080","0.120"
"10.0.0.12:8080","0.098"
"error_rate"
"gateway","0.032"

14)Grafana JSON 备份器:grafana_backup.py(Python)

作用:备份所有 Dashboard 为 JSON 到本地目录(使用 API Token),方便变更前留档或跨环境迁移。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env python3
import requests, argparse, os, json

ap = argparse.ArgumentParser()
ap.add_argument("--url", required=True)
ap.add_argument("--token", required=True)
ap.add_argument("--out", default="./grafana-backup")
args = ap.parse_args()

hdr = {"Authorization": f"Bearer {args.token}"}
os.makedirs(args.out, exist_ok=True)
r = requests.get(f"{args.url}/api/search?type=dash-db&query=", headers=hdr, timeout=10)
r.raise_for_status()
for item in r.json():
    uid = item["uid"]
    d = requests.get(f"{args.url}/api/dashboards/uid/{uid}", headers=hdr, timeout=10).json()
    title = d["dashboard"]["title"].replace("/","_")
    path = os.path.join(args.out, f"{title}-{uid}.json")
    withopen(path,"w") as f: json.dump(d, f, ensure_ascii=False, indent=2)
    print("saved", path)
print("完成。")

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ ./grafana_backup.py --url https://grafana.example.com --token eyJrIjoi... --out ./gfbak
saved gfbak/Gateway延迟-p99-a1b2c3.json
saved gfbak/主机-CPU-内存-d4e5f6.json
完成。

15)一键抓包并自带“口供”:pcap_with_context.sh(Bash)

作用:抓指定接口 60 秒,同时记录 ip route/iptables/conntrack 摘要,统一打包。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
IF="${1:-cni0}"
DUR="${2:-60}"
DIR="pcap-$(hostname)-$(date +%F-%H%M%S)"
mkdir"$DIR"
echo"抓取 $IF ${DUR}s…"
tcpdump -ni "$IF" -w "$DIR/cap.pcap" -G "$DUR" -W 1 >/dev/null 2>&1 &
PID=$!
sleep 1
ip route > "$DIR/ip-route.txt"
iptables -t nat -S > "$DIR/iptables-nat.txt" 2>/dev/null || true
conntrack -S > "$DIR/conntrack.txt" 2>/dev/null || true
wait"$PID"
tar czf "$DIR.tgz""$DIR"
echo "已保存 $DIR.tgz"

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo ./pcap_with_context.sh eth0 30
抓取 eth0 30s…
已保存 pcap-node-a-2025-08-10-153741.tgz

16)系统服务“稳重启”:svc_restart_safe.sh(Bash)

作用:重启前先保存状态(最近日志与端口占用),重启后验证健康,失败就打印原因。适合 systemd 服务。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
SVC="${1:?用法:svc_restart_safe.sh <service>.service}"
echo"保存最近日志与端口占用…"
journalctl -u "$SVC" -n 200 --no-pager > /tmp/${SVC}.pre.log || true
ss -ltnp | grep -i "${SVC%%.*}" > /tmp/${SVC}.ports || true

echo"准备重启:$SVC(Ctrl-C 取消)"
sleep 2
systemctl restart "$SVC"
systemctl is-active --quiet "$SVC" && echo"服务运行中。" || { echo"服务未运行!"; exit 1; }
sleep 1
systemctl status "$SVC" --no-pager | sed -n '1,12p'

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo ./svc_restart_safe.sh myapp.service
保存最近日志与端口占用…
准备重启:myapp.service(Ctrl-C 取消)
服务运行中。
● myapp.service - MyApp
   Loaded: loaded (/etc/systemd/system/myapp.service; enabled)
   Active: active (running) since Sun 2025-08-10 15:41:12 UTC; 1s ago

17)NTP/时钟偏差秒查:clock_drift.sh(Bash)

作用:对比本机与多个 NTP/HTTP 源的时间,偏差超过阈值报警。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
set +e
SOURCES=("time.google.com""time.cloudflare.com""www.baidu.com")
THRESH="${1:-0.5}"# 秒
now=$(date +%s)
for s in"${SOURCES[@]}"; do
  t=$(curl -sI "https://$s" | awk -F': ''tolower($1)=="date"{print $2}' | xargs -I{} date -d "{}" +%s 2>/dev/null)
if [[ -n "$t" ]]; then
    d=$(( t - now ))
    echo"$s 偏差 ${d}s"
    awk -v d="$d" -v th="$THRESH"'BEGIN{if (d>th || d<-th) print "⚠️ 偏差过大"}'
fi
done

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ ./clock_drift.sh 1
time.google.com 偏差 0s
time.cloudflare.com 偏差 -1s
⚠️ 偏差过大
www.baidu.com 偏差 0s

18)系统资源硬阈值守护:guard_limits.sh(Bash)

作用:当负载、内存、磁盘逼近阈值时,自动执行温和动作(降级、限流、发信号),全程有日志。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
LOG="/var/log/guard_limits.log"
CPU_MAX="${CPU_MAX:-8.0}"
MEM_MAX="${MEM_MAX:-90}"
DISK_MAX="${DISK_MAX:-90}"

log(){ echo"[$(date -Is)] $*" | tee -a "$LOG"; }

whiletrue; do
  load=$(cut -d' ' -f1 < /proc/loadavg)
  mem=$(free | awk '/Mem:/ {printf "%.0f", $3/$2*100}')
  disk=$(df -h / | awk 'NR==2{gsub("%",""); print $5}')

if (( $(echo "$load > $CPU_MAX" | bc -l) )); then
    log"负载 $load 超阈值 $CPU_MAX,建议降级或限流。"
fi
if (( mem > MEM_MAX )); then
    log"内存 ${mem}% 超阈值 ${MEM_MAX}%,建议重启泄漏进程或临时扩容。"
fi
if (( disk > DISK_MAX )); then
    log"磁盘 ${disk}% 超阈值 ${DISK_MAX}%,触发日志压缩/清理策略。"
fi
sleep 5
done

示例输出

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo ./guard_limits.sh
[2025-08-10T15:48:01+00:00] 内存 92% 超阈值 90%,建议重启泄漏进程或临时扩容。

19)HTTP 回环与出口诊断:curl_diag.sh(Bash)

作用:对一个 URL 做多视角诊断:DNS、TCP/TLS 时间、HTTP 状态、出口 IP,全部一屏。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
URL="${1:?用法:curl_diag.sh <url>}"
echo "DNS:"
dig +short "$(echo "$URL" | awk -F/ '{print $3}')" || true
echo
echo "CURL:"
curl -sS -o /dev/null -w "code=%{http_code} namelookup=%{time_namelookup}s connect=%{time_connect}s tls=%{time_appconnect}s starttransfer=%{time_starttransfer}s total=%{time_total}s\n" "$URL"
echo
echo "出口IP:"
curl -s https://ifconfig.me 2>/dev/null || true
echo

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ ./curl_diag.sh https://api.example.com/health
DNS:
203.0.113.23

CURL:
code=200 namelookup=0.004s connect=0.010s tls=0.050s starttransfer=0.090s total=0.091s

出口IP:
198.51.100.12

20)系统变更“前后对照”:sys_diff.sh(Bash)

作用:记录关键配置快照并与上次对比,定位“变更引发的怪事”。

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env bash
set -euo pipefail
DIR="/var/snap/sysdiff"
NOW="$DIR/$(date +%F-%H%M%S)"
mkdir -p "$NOW"
echo"采集快照 @ $NOW"
uname -a > "$NOW/uname.txt"
sysctl -a 2>/dev/null | sort > "$NOW/sysctl.txt"
iptables-save > "$NOW/iptables.txt" 2>/dev/null || true
ip -br a > "$NOW/ip-a.txt"
ip route > "$NOW/ip-route.txt"
lsmod | sort > "$NOW/lsmod.txt"
last=$(ls -1 $DIR | sort | tail -n 2 | head -n 1 || true)
if [[ -n "$last" ]]; then
echo"与上次 $last 对比:"
  diff -u "$DIR/$last/sysctl.txt""$NOW/sysctl.txt" || true
else
echo"首次采集,无对比。"
fi

示例执行

代码语言:javascript
代码运行次数:0
运行
复制
$ sudo ./sys_diff.sh
采集快照 @ /var/snap/sysdiff/2025-08-10-155201
与上次 2025-08-09-230101 对比:
--- /var/snap/sysdiff/2025-08-09-230101/sysctl.txt
+++ /var/snap/sysdiff/2025-08-10-155201/sysctl.txt
+net.ipv4.tcp_tw_reuse = 1

总结

脚本是“现场经验”的固化,救火时要快,事后要准。建议你把上面的脚本按团队的约定再做三件小事: 1)参数表写在脚本头部注释里,默认只读或 --dry-run; 2)日志留痕,每次执行都带 timestamp 与主机名,方便回溯; 3)小而全,宁可拆分也别做成“万能脚本”。

你完全可以把这套放进 ops-tools/,配上 Makefilejustfile 做入口,再配套一个 README 的“使用清单”。真正的救命时刻,它们会替你说话。

老杨时间

这里我先声明一下,日常生活中大家都叫我波哥,跟辈分没关系,主要是岁数大了.就一个代称而已. 我的00后小同事我喊都是带哥的.张哥,李哥的. 但是这个称呼呀,在线下参加一些活动时.金主爸爸也这么叫就显的不太合适. 比如上次某集团策划总监,公司开大会来一句:“今个咱高兴!有请IT运维技术圈的波哥讲两句“ 这个氛围配这个称呼在互联网这行来讲就有点对不齐! 每次遇到这个情况我就想这么接话: “遇到各位是缘分,承蒙厚爱,啥也别说了,都在酒里了.我干了,你们随意!” 所以以后咱们改叫老杨,即市井又低调.还挺亲切,我觉得挺好.

运维X档案系列文章:

从告警到CTO:一个P0故障的11小时生死时速

企业级 Kubernetes 集群安全加固全攻略( 附带一键检查脚本)

看完别走.修行在于点赞、转发、在看.攒今世之功德,修来世之福报

点击阅读原文或打开地址实时收集分析全球vps的项目 vps.top365app.com

老杨AI的号: 98dev

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IT运维技术圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 正文
    • 1)主机快照体检:triage.sh(Bash)
    • 2)“带柄”的日志切割:safe_logrotate.sh(Bash)
    • 3)进程“发胖”守护:leak_guard.py(Python)
    • 4)磁盘“止血针”:free_disk_hotfix.sh(Bash)
    • 5)端口与 HTTP 并发探针:port_probe.py(Python)
    • 6)K8s 发布看守与快速回滚:k8s_rollout_guard.sh(Bash)
    • 7)Nginx 上下线后端的“快开关”:nginx_switch.py(Python)
    • 8)临时开启 MySQL 慢日志并取证:mysql_slow_snapshot.sh(Bash)
    • 9)TLS 证书到期扫描:tls_scan.py(Python)
    • 10)备份校验一把过:backup_verify.sh(Bash)
    • 11)Conntrack 水位观察与建议:conntrack_watch.py(Python)
    • 12)事故时间线拼接器:incident_timeline.sh(Bash)
    • 13)一键抓取 P99 指标并落地:prom_p99_snap.sh(Bash)
    • 14)Grafana JSON 备份器:grafana_backup.py(Python)
    • 15)一键抓包并自带“口供”:pcap_with_context.sh(Bash)
    • 16)系统服务“稳重启”:svc_restart_safe.sh(Bash)
    • 17)NTP/时钟偏差秒查:clock_drift.sh(Bash)
    • 18)系统资源硬阈值守护:guard_limits.sh(Bash)
    • 19)HTTP 回环与出口诊断:curl_diag.sh(Bash)
    • 20)系统变更“前后对照”:sys_diff.sh(Bash)
    • 总结
    • 老杨时间
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档