日志结构化-分隔符

最近更新时间:2024-10-21 14:25:52

我的收藏

场景描述

场景1:小王的日志使用 | 进行分隔,现需要在其中提取结构化的字段。
场景2:小王将 Flink 任务运行的日志,以单行文本采集到日志服务(Cloud Log Service,CLS)。其中有一段 JSON,是 Flink 返回的包,小王想将这个 JSON 结构化。

场景1

原始日志

{
"message": "2021-12-09 11:34:28.279|INFO|605c643e29e4|BIN--Python|192.168.1.1"
}

加工语句

//使用|作为分隔符(竖线需要转义),从message中提取time,loglevel,taskId,ProcessName,ip字段.
ext_sepstr("message","time,loglevel,taskId,ProcessName,ip",sep="\\|")
//丢弃message
fields_drop("message")

加工结果

{
"ProcessName":"BIN--PYTHON",
"ip":"192.168.1.1",
"loglevel":"INFO",
"taskId":"605c643e29e4",
"time":"2021-12-09 11:34:28.279"
}

场景2

原始日志

{
"regex": "2021-12-02 14:33:35.022 [1] INFO org.apache.Load - Response:status: 200, resp msg: OK, resp content: { \\"TxnId\\": 58322, \\"Label\\": \\"flink_connector_20211202_1de749d8c80015a8\\", \\"Status\\": \\"Success\\", \\"Message\\": \\"OK\\", \\"TotalRows\\": 1, \\"LoadedRows\\": 1, \\"FilteredRows\\": 0, \\"CommitAndPublishTimeMs\\": 16}"
}

DSL 加工函数

//使用逗号作为分隔符,将日志切成三段
ext_sepstr("regex", "f1, f2, f3", sep=",")
//仅保留f3字段:即resp content:及其后续的字符
fields_drop("regex","f1","f2")
//使用冒号来切割f3字段值,仅保留{}和括号内的字符,存至resp_content字段,该字段是一个json,是flink返回的消息包
ext_sepstr("f3", "f1,resp_content", sep=":")
fields_drop("f1","f3")
//对resp_content字段结构化
ext_json("resp_content", prefix="")
//丢弃原字段
fields_drop("resp_content")

加工结果

{
"CommitAndPublishTimeMs":"16",
"FilteredRows":"0",
"Label":"flink_connector_20211202_1de749d8c80015a8",
"LoadedRows":"1",
"Message":"OK",
"Status":"Success",
"TotalRows":"1",
"TxnId":"58322"
}