场景描述
场景1:小王的日志使用 | 进行分隔,现需要在其中提取结构化的字段。
场景2:小王将 Flink 任务运行的日志,以单行文本采集到日志服务(Cloud Log Service,CLS)。其中有一段 JSON,是 Flink 返回的包,小王想将这个 JSON 结构化。
场景1
原始日志
{"message": "2021-12-09 11:34:28.279|INFO|605c643e29e4|BIN--Python|192.168.1.1"}
加工语句
//使用|作为分隔符(竖线需要转义),从message中提取time,loglevel,taskId,ProcessName,ip字段.ext_sepstr("message","time,loglevel,taskId,ProcessName,ip",sep="\\|")//丢弃messagefields_drop("message")
加工结果
{"ProcessName":"BIN--PYTHON","ip":"192.168.1.1","loglevel":"INFO","taskId":"605c643e29e4","time":"2021-12-09 11:34:28.279"}
场景2
原始日志
{"regex": "2021-12-02 14:33:35.022 [1] INFO org.apache.Load - Response:status: 200, resp msg: OK, resp content: { \\"TxnId\\": 58322, \\"Label\\": \\"flink_connector_20211202_1de749d8c80015a8\\", \\"Status\\": \\"Success\\", \\"Message\\": \\"OK\\", \\"TotalRows\\": 1, \\"LoadedRows\\": 1, \\"FilteredRows\\": 0, \\"CommitAndPublishTimeMs\\": 16}"}
DSL 加工函数
//使用逗号作为分隔符,将日志切成三段ext_sepstr("regex", "f1, f2, f3", sep=",")//仅保留f3字段:即resp content:及其后续的字符fields_drop("regex","f1","f2")//使用冒号来切割f3字段值,仅保留{}和括号内的字符,存至resp_content字段,该字段是一个json,是flink返回的消息包ext_sepstr("f3", "f1,resp_content", sep=":")fields_drop("f1","f3")//对resp_content字段结构化ext_json("resp_content", prefix="")//丢弃原字段fields_drop("resp_content")
加工结果
{"CommitAndPublishTimeMs":"16","FilteredRows":"0","Label":"flink_connector_20211202_1de749d8c80015a8","LoadedRows":"1","Message":"OK","Status":"Success","TotalRows":"1","TxnId":"58322"}