https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-jdbc.html#plugins-inputs-jdbc
use_column_value => true
tracking_column => "tracking_time"
# 这两行配置是一起用的,当 use_column_value 取值为true 时,sql_last_value 会跟踪 tracking_column 指定的字段的值,这里指定的是 "tracking_time" 这个字段
# 当 tracking_time 为 date类型,还需要指定
tracking_column_type => "timestamp" #因为该参数默认为 "numeric"
# 当 use_column_value 取值为 false 时,sql_last_value 默认取值为 last_run_metadata_path 中记录 Path to file with last run time
last_run_metadata_path => "/usr/local/service/logstash/temp/.my-pipeline-sql_last_value.yml"
# 手动指定该值需要赋予正确的值,文件必须是 yml格式,且父级目录必须存在,例如
"/usr/local/service/logstash/temp/.my-pipeline-sql_last_value.yml"
NULL
jdbc_default_timezone => "Asia/Shanghai"
jdbc_page_size => 1000
xxx?serverTimezone=Asia/Shanghai
tracking_column
追踪字段在当前写入了一个历史的值,具体场景可以理解为 logstash 在 2024-01-01T01:00:00.000+0800
已经调度过一次增量任务并且更新了 tracking_column
的值,但是MySQL在 2024-01-01T02:00:00.000+0800
写入了一条时间为 2023-12-31T14:25:36.000+0800
的数据(该场景我们称之为离线任务),这个时候就会出现这1条历史数据始终无法被 logstash 同步到 ES 的情况。因此我们就需要在 input schedule 避开离线任务调度的时间,否则可能会出现丢数问题,如 schedule => "*/5 9-23 * * *"
input {
jdbc {
jdbc_driver_library => "/usr/local/service/logstash/extended-files/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://mysql:3306/database?serverTimezone=Asia/Shanghai"
jdbc_user => "my_table"
jdbc_password => "xxxxxx"
jdbc_default_timezone => "Asia/Shanghai"
jdbc_paging_enabled => true
jdbc_page_size => 1000
jdbc_validate_connection => true
jdbc_validation_timeout => 3600
use_column_value => true
tracking_column => "update_time"
tracking_column_type => "timestamp"
# 定时调度,避开凌晨离线任务调度时间
schedule => "*/5 9-23 * * *"
statement => "SELECT * FROM my_table WHERE update_time > :sql_last_value"
last_run_metadata_path => "/usr/local/service/logstash/extended-files/.pro_mysql_2_es_last_run.yml"
}
}
filter {
mutate {
remove_field => ["@version", "@timestamp"]
convert => {
"id" => "string"
}
}
}
output {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "idx"
document_id => "%{id}"
user => "elastic"
password => "xxxxxx"
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。