前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >logstash增量同步MySQL关于sql_last_value取值失败的问题

logstash增量同步MySQL关于sql_last_value取值失败的问题

原创
作者头像
WeldonWang
修改2024-12-02 14:39:57
修改2024-12-02 14:39:57
1.1K0
举报
文章被收录于专栏:腾讯云大数据与AI专家服务

官网

https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-jdbc.html#plugins-inputs-jdbc


常见问题

代码语言:shell
复制
use_column_value => true
tracking_column => "tracking_time"
# 这两行配置是一起用的,当 use_column_value 取值为true 时,sql_last_value 会跟踪 tracking_column 指定的字段的值,这里指定的是 "tracking_time" 这个字段
# 当 tracking_time 为 date类型,还需要指定
tracking_column_type => "timestamp" #因为该参数默认为 "numeric"
代码语言:shell
复制
# 当 use_column_value 取值为 false 时,sql_last_value 默认取值为 last_run_metadata_path 中记录 Path to file with last run time
last_run_metadata_path => "/usr/local/service/logstash/temp/.my-pipeline-sql_last_value.yml"
# 手动指定该值需要赋予正确的值,文件必须是 yml格式,且父级目录必须存在,例如
"/usr/local/service/logstash/temp/.my-pipeline-sql_last_value.yml"

必要项检查

  1. 时间字段不能为 NULL
  2. input jdbc时区必须指定 jdbc_default_timezone => "Asia/Shanghai"
  3. jdbc_page_size不能过大,建议 jdbc_page_size => 1000
  4. jdbc_connection_string 最好也指定一下时区,如 xxx?serverTimezone=Asia/Shanghai
  5. 若上游 MySQL 数据更新有离线任务,比如 tracking_column 追踪字段在当前写入了一个历史的值,具体场景可以理解为 logstash 在 2024-01-01T01:00:00.000+0800 已经调度过一次增量任务并且更新了 tracking_column 的值,但是MySQL在 2024-01-01T02:00:00.000+0800 写入了一条时间为 2023-12-31T14:25:36.000+0800 的数据(该场景我们称之为离线任务),这个时候就会出现这1条历史数据始终无法被 logstash 同步到 ES 的情况。因此我们就需要在 input schedule 避开离线任务调度的时间,否则可能会出现丢数问题,如 schedule => "*/5 9-23 * * *"

贴一个线上用户的例子:该用户使用 logstash 实时将上游 MySQL 数据增量同步到 ES,但因上游 MySQL 的数据也并非真正数据源,存在每天凌晨跑批从其他地方同步数据的情况。

代码语言:yaml
复制
input {
  jdbc {
    jdbc_driver_library => "/usr/local/service/logstash/extended-files/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/database?serverTimezone=Asia/Shanghai"
    jdbc_user => "my_table"
    jdbc_password => "xxxxxx"
    jdbc_default_timezone => "Asia/Shanghai"
    jdbc_paging_enabled => true
    jdbc_page_size => 1000
    jdbc_validate_connection => true
    jdbc_validation_timeout => 3600

    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"

    # 定时调度,避开凌晨离线任务调度时间
    schedule => "*/5 9-23 * * *"

    statement => "SELECT * FROM my_table WHERE update_time > :sql_last_value"

    last_run_metadata_path => "/usr/local/service/logstash/extended-files/.pro_mysql_2_es_last_run.yml"
  }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
    convert => {
      "id" => "string"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "idx"
    document_id => "%{id}"
    user => "elastic"
    password => "xxxxxx"
  }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 官网
  • 常见问题
  • 必要项检查
  • 贴一个线上用户的例子:该用户使用 logstash 实时将上游 MySQL 数据增量同步到 ES,但因上游 MySQL 的数据也并非真正数据源,存在每天凌晨跑批从其他地方同步数据的情况。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档