[exporter] [Vector] [OTel GW] [OpenObserve]
NE ─┐ ┌─────────┐ ┌──────────┐ ┌─────────────┐
PE ─┼── metrics/logs ────> │ Vector │ ───> │ OTel GW │ ──> │ OO │
DF ─┤ └─────────┘ └──────────┘ └─────────────┘
LG ─┘
(近线窗口 ETL: 对齐=1m · 延迟=2m)
│
▼
┌──────────────────────────────────────┐
IaC/Cloud ────────────>│ │
│ ObserveBridge (ETL 任务) │
Ansible ───────────>│ • ETL 窗口聚合 / oo_locator │
│ • 拓扑 (IaC/Ansible) │
OO 明细(OO→OB) ───────>│ • AGE 10 分钟活跃调用图刷新 │
└──────────────────────────────────────┘
┌─────────────────────────────── Postgres 套件 ───────────────────────────────┐
│ PG_JSONB │ PG Aggregates (Timescale) │ PG Vector │ AGE │
│ (oo_locator/events) │ (metric_1m / call_5m / log_5m)│ (pgvector) │ Graph │
└───────────────┬────────┴───────────────┬──────────────┴─────────────┬────────┘
│ │ │
│ │ │
▼ ▼ ▼
[ llm-ops-agent / 应用消费(查询/检索/推理) ]
目标:在单二进制 Go 编排器内,完成三条主干 ETL 闭环:
metric_1m
、service_call_5m
、log_pattern_5m
,维护 log_pattern
指纹库与 oo_locator
回查线索。service_call_5m
为源,刷新 AGE 图中 10 分钟活跃服务级调用边 (:Resource)-[:CALLS]->(:Resource)
。topo_edge_time
(开/关区间),可选同步到 AGE 作为 :STRUCT
边。调度特性:窗口对齐 + 延迟容忍 + DAG 依赖 + 幂等 Upsert + 分片多租户。
可靠性:PG 唯一索引保证一次性;指数退避重试;失败熔断;事件补数回放。
etl/
├─ cmd/etl/ # 二进制入口/CLI
│ └─ main.go
├─ pkg/
│ ├─ scheduler/ # 调度器(窗口计算/派发)
│ ├─ runner/ # Worker 池 + 执行/重试/回调
│ ├─ registry/ # Job 接口 + 注册中心 + DAG
│ ├─ store/ # 状态/一次性保证/简易队列(PG)
│ ├─ window/ # 时间对齐/窗口工具
│ ├─ events/ # 事件入口(HTTP/CloudEvents风格)
│ ├─ oo/ # OO 读取(S3 客户端 + 查询 API 适配)
│ ├─ agg/ # 聚合器(指标1m / 调用5m / 指纹5m)
│ ├─ patterns/ # 日志指纹挖掘(Drain / RE2)
│ ├─ iac/ # IaC/Cloud 归一(TF/Pulumi/aws/aliyun…)
│ ├─ ansible/ # Playbook/Inventory 解析与依赖抽取
│ └─ pgw/ # PG 写入器(COPY 批量 + 幂等 upsert)
├─ jobs/
│ ├─ ooagg.go # OO → metric_1m / call_5m / log_5m / locator
│ ├─ age_refresh.go # 近10分钟活跃调用图刷新(依赖 ooagg)
│ ├─ topo_iac.go # IaC/Cloud → topo_edge_time(时态差分)
│ └─ topo_ansible.go # Ansible → topo_edge_time(时态差分)
├─ sql/
│ └─ age_refresh.sql # AGE 刷新 SQL
└─ configs/
└─ etl.yaml # 调度/并发/延迟 配置
下面是模块—接口—数据源/目标—窗口/键—幂等的一览表(开发/联调用)。
模块 | API/入口 | SRC(输入) | DEST(输出) | 窗口/键 | 幂等/唯一约束 | 备注 |
---|---|---|---|---|---|---|
pkg/oo | Stream(ctx, tenant, w, fn) | OO(S3 分区或查询 API)logs/metrics/traces | 回调 oo.Record | w=[From,To) | — | 统一时间/URN;并发读取 |
pkg/agg | Feed(rec) / Drain() | oo.Record | Metrics1m / Calls5m / LogPatterns5m / PatternsUpsert / Locators | 1m/5m | — | 内存桶聚合、指纹抽取 |
pkg/pgw | Flush(ctx, tenant, w, out) | 聚合结果 out | metric_1m、service_call_5m、log_pattern_5m、log_pattern、oo_locator、dim_resource | 1m/5m 主键 | ON CONFLICT DO UPDATE;oo_locator 唯一 | PG 批量 COPY + Upsert |
jobs/ooagg | Run(ctx, tenant, w) | pkg/oo → pkg/agg | pkg/pgw.Flush | Align=1m;Delay=2m | 由目标表主键保证 | 成功后触发 age-refresh |
sql/age_refresh.sql | cypher('ops', ...) | service_call_5m 近10分钟 | AGE 图 CALLS 边 | 10 分钟 | MERGE 唯一匹配 | e.last_seen/rps/err/p95 |
jobs/age_refresh | Run(ctx, tenant, w) | service_call_5m | 执行 SQL | Align=5m | — | After()="oo-agg" |
pkg/iac | Discover(ctx, tenant) | TF/Pulumi/Cloud API | 边集合 []Edge | — | — | 构造 URN、relation |
pkg/ansible | ExtractDeps(ctx, tenant) | inventory/group_vars/roles | 边集合 []Edge | — | — | 解析 upstream/连接串 |
pkg/pgw | UpsertTopoEdges(ctx, tenant, edges) | iac/ansible 边 | topo_edge_time(时态) | valid tstzrange | PK(tenant,src,dst,rel,valid) | 差分开/关区间 |
jobs/topo_iac | Run(ctx, tenant, w) | pkg/iac.Discover | topo_edge_time | Align=15m | — | 结构拓扑 |
jobs/topo_ansible | Run(ctx, tenant, w) | pkg/ansible.ExtractDeps | topo_edge_time | Align=1h | — | 应用依赖拓扑 |
pkg/events | /events/enqueue | CloudEvents/HTTP | etl_job_run 状态置 queued | 任意窗口 | ux_job_once | 手动补数/回放 |
pkg/store | EnqueueOnce/Mark* | — | etl_job_run | job/tenant/window | ux_job_once | 一次性保证/队列 |
pkg/scheduler | Tick() | dim_tenant & etl_job_run | 入队窗口 | Align/Delay/Lookback | — | 动态加载配置 |
相关 PG 表(12 + ETL 元数据)
dim_tenant
、dim_resource
oo_locator
metric_1m
(1m)、service_call_5m
(5m)、log_pattern_5m
(5m)log_pattern
topo_edge_time
kb_doc
、kb_chunk
(向量)event_envelope
、evidence_link
etl_job_run
、etl_job_circuit
ops
图(Resource
、CALLS
)upper = floor(now - Delay, Align)
,从上次成功窗口末尾或 initial_lookback
起步。log_pattern
以 pattern hash 或唯一键 upsert;oo_locator
唯一组合避免重复。urn:k8s:svc:<ns>/<name>
/ urn:host:<host>
/ urn:db:postgres:<cluster>/<db>
,并缓存 resource_id
。service_call_5m
与 topo_edge_time
。tstzrange
开/关区间维护时态;当边消失时关闭上次开区间。每个任务包含:name / 描述 / 测试-验证。可直接拆成 Codex 指令执行(create-or-update-files / patch),或作为 PR checklist。
pkg/pgw.Flush
(COPY + Upsert 批量写)oo_locator
写入并回填 sample_ref
;metric_1m
/service_call_5m
/log_pattern
/log_pattern_5m
批量写入,全部 ON CONFLICT DO UPDATE
。etl_job_run
状态转为 success
;冲突率 < 5%。sql/age_refresh.sql
的程序化执行jobs/age_refresh.go
读取并执行 sql/age_refresh.sql
,或用参数化 SQL 内嵌实现。service_call_5m
行,运行 age-refresh
,检查 AGE 中 CALLS
边的 last_seen/rps/err_rate/p95
。pkg/oo.Stream
的 MOCK 与真实 S3/API 适配--mock
开关。dataset/yyyy=.../hh=.../mm=...
列表对象;API:按时间窗查询。--mock
模式 1m 生成 5k 日志、500 指标点、1k span,端到端落库 < 3s/窗口 pkg/agg
聚合正确性avg/max/p95
、调用 5m rps/err/p50/p95
、日志指纹 5m 计数;接入 patterns.MineTemplate
。pkg/patterns
指纹抽取(Drain/RE2)fingerprint
、severity
;可配置忽略字段。count_error
正确累积。pkg/iac.Discover
+ pkg/ansible.ExtractDeps
DEPENDS_ON
边。topo_edge_time
新增开区间;删除节点后再次运行,旧边区间关闭。T7 — pkg/pgw.UpsertTopoEdges
时态差分
E_now
与当前开区间 E_prev
的集合差;新增插入开区间,消失关闭区间。valid
上界从无穷变为 now()
。pkg/events
事件补数接口POST /events/enqueue
支持 {job, tenant_id, from, to}
回放窗口;写入 etl_job_run
为 queued
并入队。pkg/config
已有;在 scheduler
注入 Cfg
并使用 tenants.initial_lookback.oo-agg
等。etl_job_run
记录后启动,观察从 initial_lookback
开始补跑。/metrics
或写回 OO);记录窗口滞后。window_lag_seconds
< Delay + Align
;失败重试曲线可见。SELECT metric, count(*), min(bucket), max(bucket)
FROM metric_1m
GROUP BY metric ORDER BY count(*) DESC LIMIT 10;
WITH active AS (
SELECT src_resource_id, dst_resource_id
FROM service_call_5m
WHERE bucket >= now() - interval '10 minutes'
GROUP BY 1,2
)
SELECT count(*) AS edges_in_source FROM active;
-- 再在 AGE 里查同样规模(CALLS 边数量),两者应近似一致(考虑租户过滤)
-- 当前有效边
SELECT count(*) FROM topo_edge_time WHERE upper_inf(valid);
-- 历史边(已关闭)
SELECT count(*) FROM topo_edge_time WHERE NOT upper_inf(valid);
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。