Logstash 语句参照表

最近更新时间:2024-11-07 09:37:32

我的收藏
部分 Logstash 语句,在数据加工中对应的函数如下表。
场景
Logstash
数据加工
重命名字段
mutate
mutate {
rename => {"old_field_name" => "new_field_name"
} }
fields_rename("old_field_name","new_field_name" )
删除字段
mutate {
remove_field => ["password_hash"]
}
fields_drop("password_hash")
更新字段值
mutate {
update => {"status_code" => "Not Found"
} } //将status_code更新为Not Found
fields_set("status_code", "Not Found")
提取键值- Grok
grok
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:level} " }}
ext_grok("message",grok="%{TIMESTAMP_ISO8601:time} %{DATA:level}")
提取键值- 分隔符
split
mutate {
split => { "message" => "|" }
add_field => {
"time" => "%{[message][0]}"
"level" => "%{[message][1]}"
"taskId" => "%{[message][2]}"
"ProcessName" => "%{[message][3]}"
"ip" => "%{[message][4]}"
}
remove_field => ["message"]
}
ext_sepstr("message","time,loglevel,taskId,ProcessName,ip",sep="\\|")
fields_drop("message")
提取键值-JSON
json
json {
source => "message"
target => "parsed_data"
}
ext_json("message")
删除日志
drop
if [status] == 404 { //如果status=404
drop { } //删除日志
}
log_drop( 删除日志
op_eq(v("status"),404)// 当status的值=404
)
逻辑判断
if else
if [log] //如果存在log字段
if "Cost" in [message] //message字段值有"Cost"字符时
t_if(has_field("log"))//如果存在log字段
t_if (
str_exist(v(message), "Cost", ignore_upper=True)
)//message字段值有"Cost"字符时
or , and
if "Cost" in [message] or "cost" in [message]
op_or(
str_exist(v(message), "Cost", ignore_upper=False),
str_exist(v(message), "cost", ignore_upper=False)
)
分发日志至多个sink(目标)
output

if [container] == "scm-pfc" {
elasticsearch {
hosts => ["xx.xx.x.xxx:9200"]
index => "p-k8s"
}

} else {
elasticsearch {
hosts => ["xx.xx.x.xx:9200"]
index => "p-container"
}}
op_str_eq(v("container"),"scm-pfc"),
log_output("p-k8s"), //if 分支
log_output("p-container")//else分支
)