Elasticsearch 6.3 发布SQL模块作为C-Pack的一部分使用
Dev Tools
- console
查看
POST /_xpack/sql?format=txt
{
"query": "SHOW tables"
}
# 查询有哪些表
POST /_xpack/sql?format=txt
{
"query": "show functions "
}
# 查询支持的函数
POST /_xpack/sql?format=txt
{
"query": " select count(1) as \"统计行\" from \"browser_req-*\" "
}
# 转义特殊字符
/usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
/usr/share/logstash/bin/logstash-plugin install logstash-output-elasticsearch
wget -P /usr/local/src/ https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip /usr/local/src/mysql-connector-java-5.1.46.zip -d /usr/local/
# 下载解压
/etc/logstash/conf.d/sql.indexer.conf
input {
jdbc {
jdbc_driver_library => "/usr/local/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_default_timezone =>"Asia/Shanghai"
# 分页 limit
jdbc_paging_enabled => true
jdbc_page_size => "5000"
jdbc_connection_string => "jdbc:mysql://172.16.140.xxx:3306/gateway"
jdbc_user => "root"
jdbc_password => "xxxxxxx"
# 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年) 5分钟频率: */5 * * * *
schedule => "* * * * *"
statement => "SELECT * FROM browser_req_log WHERE gmt_create >= :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "gmt_create"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来
last_run_metadata_path => "/etc/logstash/conf.d/syncpoint_table"
record_last_run => true
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
#是否将 字段(column) 名称转小写
lowercase_column_names => false
}
}
filter {
mutate {
add_field => {"temp_ts" => "%{gmt_create}"}
}
#添加数据库字段作为 timestamp
# timestamp
date {
match => ["temp_ts","ISO8601"]
remove_field => ["temp_ts"]
timezone => "Asia/Shanghai"
}
output {
elasticsearch {
hosts => ["172.16.140.120", "172.16.140.121", "172.16.140.122"]
index => "browser_req-log"
document_id => "%{id}"
}
}
cat /etc/logstash/conf.d/syncpoint_table
2019-01-07
jdbc_driver_library
: jdbc mysql 驱动的路径,在上一步中已经下载jdbc_driver_class
: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了jdbc_connection_string
: mysql 地址jdbc_user
: mysql 用户jdbc_password
: mysql 密码schedule
: 执行 sql 时机,类似 crontab 的调度statement
: 要执行的 sql,以 ":" 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value 是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量use_column_value
: 使用递增列的值tracking_column_type
: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型tracking_column
: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamplast_run_metadata_path
: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改hosts
: es 集群地址user
: es 用户名password
: es 密码index
: 导入到 es 中的 index 名,这里我直接设置成了 mysql 表的名字document_id
: 导入到 es 中的文档 id,这个需要设置成主键,否则同一条记录更新后在 es 中会出现两条记录,%{id} 表示引用 mysql 表中 id 字段的值config/pipelines.yml
- pipeline.id: table1
path.config: "config/sync_table1.cfg"
- pipeline.id: table2
path.config: "config/sync_table2.cfg"