Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >ELK实时日志分析平台环境部署--完整记录

ELK实时日志分析平台环境部署--完整记录

作者头像
洗尽了浮华
发布于 2018-01-22 09:49:29
发布于 2018-01-22 09:49:29
2.2K00
代码可运行
举报
文章被收录于专栏:散尽浮华散尽浮华
运行总次数:0
代码可运行

在日常运维工作中,对于系统和业务日志的处理尤为重要。今天,在这里分享一下自己部署的ELK(+Redis)-开源实时日志分析平台的记录过程(仅依据本人的实际操作为例说明,如有误述,敬请指出)~

一、概念介绍 日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因。经常分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误。 通常,日志被分散的储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。当务之急我们使用集中化的日志管理,例如:开源的syslog,将所有服务器上的日志收集汇总。

集中化管理日志后,日志的统计和检索又成为一件比较麻烦的事情,一般我们使用grep、awk和wc等Linux命令能实现检索和统计,但是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法难免有点力不从心。

开源实时日志分析ELK平台能够完美的解决我们上述的问题,ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成: 1)ElasticSearch是一个基于Lucene的开源分布式搜索服务器。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是第二流行的企业搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。 在elasticsearch中,所有节点的数据是均等的。 2)Logstash是一个完全开源的工具,他可以对你的日志进行收集、过滤、分析,并将其存储供以后使用(如,搜索),您可以使用它。说到搜索,logstash带有一个web界面,搜索和展示所有日志。 3)Kibana 是一个基于浏览器页面的Elasticsearch前端展示工具,也是一个开源和免费的工具,它Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。

ELK工作原理展示图:

如上图:Logstash收集AppServer产生的Log,并存放到ElasticSearch集群中,而Kibana则从ES集群中查询数据生成图表,再返回给Browser。

二、ELK环境部署

(0)基础环境介绍

系统: Centos7.1 防火墙: 关闭 Sellinux: 关闭

机器环境: 两台 elk-node1: 192.168.1.160       #master机器 elk-node2:192.168.1.161      #slave机器

注明: master-slave模式: master收集到日志后,会把一部分数据碎片到salve上(随机的一部分数据);同时,master和slave又都会各自做副本,并把副本放到对方机器上,这样就保证了数据不会丢失。 如果master宕机了,那么客户端在日志采集配置中将elasticsearch主机指向改为slave,就可以保证ELK日志的正常采集和web展示。

========================================================================================= 由于elk-node1和elk-node2两台是虚拟机,没有外网ip,所以访问需要通过宿主机进行代理转发实现。

有以下两种转发设置:(任选其一)

通过访问宿主机的19200,19201端口分别转发到elk-node1,elk-node2的9200端口 通过访问宿主机的15601端口转发到elk-node1的5601端口

宿主机:112.110.115.10(内网ip为192.168.1.7)  (为了不让线上的真实ip暴露,这里任意给了一个ip做记录)

a)通过宿主机的haproxy服务进行代理转发,如下是宿主机上的代理配置:

[root@kvm-server conf]# pwd /usr/local/haproxy/conf [root@kvm-server conf]# cat haproxy.cfg .......... .......... listen node1-9200 0.0.0.0:19200 mode tcp option tcplog balance roundrobin server 192.168.1.160 192.168.1.160:9200 weight 1 check inter 1s rise 2 fall 2

listen node2-9200 0.0.0.0:19201 mode tcp option tcplog balance roundrobin server 192.168.1.161 192.168.1.161:9200 weight 1 check inter 1s rise 2 fall 2

listen node1-5601 0.0.0.0:15601 mode tcp option tcplog balance roundrobin server 192.168.1.160 192.168.1.160:5601 weight 1 check inter 1s rise 2 fall 2

重启haproxy服务 [root@kvm-server conf]# /etc/init.d/haproxy restart

设置宿主机防火墙 [root@kvm-server conf]# cat /etc/sysconfig/iptables ......... -A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT

[root@kvm-server conf]# /etc/init.d/iptables restart

b)通过宿主机的NAT端口转发实现

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19200 -j DNAT --to-destination 192.168.1.160:9200 [root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7 [root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19201 -j DNAT --to-destination 192.168.1.161:9200 [root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.161/32 -p tcp -m tcp --sport 9200 -j SNAT --to-source 192.168.1.7 [root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 15601 -j DNAT --to-destination 192.168.1.160:5601 [root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp --sport 5601 -j SNAT --to-source 192.168.1.7 [root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT

[root@kvm-server conf]# service iptables save [root@kvm-server conf]# service iptables restart

提醒一点: nat端口转发设置成功后,/etc/sysconfig/iptables文件里要注释掉下面两行!不然nat转发会有问题!一般如上面在nat转发规则设置好并save和restart防火墙之后就会自动在/etc/sysconfig/iptables文件里删除掉下面两行内容了。 [root@kvm-server conf]# vim /etc/sysconfig/iptables .......... #-A INPUT -j REJECT --reject-with icmp-host-prohibited #-A FORWARD -j REJECT --reject-with icmp-host-prohibited [root@linux-node1 ~]# service iptables restart

=========================================================================================

(1)Elasticsearch安装配置

基础环境安装(elk-node1和elk-node2同时操作)

1)下载并安装GPG Key [root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum仓库 [root@elk-node1 ~]# vim /etc/yum.repos.d/elasticsearch.repo [elasticsearch-2.x] name=Elasticsearch repository for 2.x packages baseurl=http://packages.elastic.co/elasticsearch/2.x/centos gpgcheck=1 gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch enabled=1

3)安装elasticsearch [root@elk-node1 ~]# yum install -y elasticsearch

4)安装相关测试软件 #提前先下载安装epel源:epel-release-latest-7.noarch.rpm,否则yum会报错:No Package..... [root@elk-node1 ~]# wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm [root@elk-node1 ~]# rpm -ivh epel-release-latest-7.noarch.rpm #安装Redis [root@elk-node1 ~]# yum install -y redis #安装Nginx [root@elk-node1 ~]# yum install -y nginx #安装java [root@elk-node1 ~]# yum install -y java

安装完java后,检测 [root@elk-node1 ~]# java -version openjdk version "1.8.0_102" OpenJDK Runtime Environment (build 1.8.0_102-b14) OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)

配置部署(下面先进行elk-node1的配置)

1)配置修改配置文件 [root@elk-node1 ~]# mkdir -p /data/es-data [root@elk-node1 ~]# vim /etc/elasticsearch/elasticsearch.yml                               【将里面内容情况,配置下面内容】 cluster.name: huanqiu                            # 组名(同一个组,组名必须一致) node.name: elk-node1                            # 节点名称,建议和主机名一致 path.data: /data/es-data                         # 数据存放的路径 path.logs: /var/log/elasticsearch/             # 日志存放的路径 bootstrap.mlockall: true                         # 锁住内存,不被使用到交换分区去 network.host: 0.0.0.0                            # 网络设置 http.port: 9200                                    # 端口

2)启动并查看 [root@elk-node1 ~]# chown -R elasticsearch.elasticsearch /data/ [root@elk-node1 ~]# systemctl start elasticsearch [root@elk-node1 ~]# systemctl status elasticsearch CGroup: /system.slice/elasticsearch.service └─3005 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSI...

注意:上面可以看出elasticsearch设置的内存最小256m,最大1g

[root@linux-node1 src]# netstat -antlp |egrep "9200|9300" tcp6 0 0 :::9200 :::* LISTEN 3005/java tcp6 0 0 :::9300 :::* LISTEN 3005/java

然后通过web访问(访问的浏览器最好用google浏览器)

http://112.110.115.10:19200/

3)通过命令的方式查看数据(在112.110.115.10宿主机或其他外网服务器上查看,如下) [root@kvm-server src]# curl -i -XGET 'http://192.168.1.160:9200/_count?pretty' -d '{"query":{"match_all":{}}}' HTTP/1.1 200 OK Content-Type: application/json; charset=UTF-8 Content-Length: 95

{ "count" : 0, "_shards" : { "total" : 0, "successful" : 0, "failed" : 0 } }

这样感觉用命令来查看,特别的不爽。

4)接下来安装插件,使用插件进行查看~  (下面两个插件要在elk-node1和elk-node2上都要安装) 4.1)安装head插件 ---------------------------------------------------------------------------------------------------- a)插件安装方法一 [root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head

b)插件安装方法二 首先下载head插件,下载到/usr/loca/src目录下 下载地址:https://github.com/mobz/elasticsearch-head

---------------------------------------------------------------- head插件包百度云盘下载:https://pan.baidu.com/s/1boBE0qj 提取密码:ifj7 ----------------------------------------------------------------

[root@elk-node1 src]# unzip elasticsearch-head-master.zip [root@elk-node1 src]# ls elasticsearch-head-master elasticsearch-head-master.zip

在/usr/share/elasticsearch/plugins目录下创建head目录 然后将上面下载的elasticsearch-head-master.zip解压后的文件都移到/usr/share/elasticsearch/plugins/head下 接着重启elasticsearch服务即可! [root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/ [root@elk-node1 plugins]# mkdir head [root@elk-node1 plugins]# ls head [root@elk-node1 plugins]# cd head [root@elk-node1 head]# cp -r /usr/local/src/elasticsearch-head-master/* ./ [root@elk-node1 head]# pwd /usr/share/elasticsearch/plugins/head

[root@elk-node1 head]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins [root@elk-node1 head]# ll total 40 -rw-r--r--. 1 elasticsearch elasticsearch 104 Sep 28 01:57 elasticsearch-head.sublime-project -rw-r--r--. 1 elasticsearch elasticsearch 2171 Sep 28 01:57 Gruntfile.js -rw-r--r--. 1 elasticsearch elasticsearch 3482 Sep 28 01:57 grunt_fileSets.js -rw-r--r--. 1 elasticsearch elasticsearch 1085 Sep 28 01:57 index.html -rw-r--r--. 1 elasticsearch elasticsearch 559 Sep 28 01:57 LICENCE -rw-r--r--. 1 elasticsearch elasticsearch 795 Sep 28 01:57 package.json -rw-r--r--. 1 elasticsearch elasticsearch 100 Sep 28 01:57 plugin-descriptor.properties -rw-r--r--. 1 elasticsearch elasticsearch 5211 Sep 28 01:57 README.textile drwxr-xr-x. 5 elasticsearch elasticsearch 4096 Sep 28 01:57 _site drwxr-xr-x. 4 elasticsearch elasticsearch 29 Sep 28 01:57 src drwxr-xr-x. 4 elasticsearch elasticsearch 66 Sep 28 01:57 test

[root@elk-node1 _site]# systemctl restart elasticsearch -----------------------------------------------------------------------------------------------------

插件访问(最好提前将elk-node2节点的配置和插件都安装后,再来进行访问和数据插入测试) http://112.110.115.10:19200/_plugin/head/

先插入数据实例,测试下 如下:打开”复合查询“,在POST选项下,任意输入如/index-demo/test,然后在下面输入数据(注意内容之间换行的逗号不要漏掉); 数据输入好之后(如下输入wangshibo;hello world内容),下面点击”验证JSON“->”提交请求“,提交成功后,观察右栏里出现的信息:有index,type,version等信息,failed:0(成功消息)

再查看测试实例,如下: "复合查询"下,选择GET选项,在/index-demo/test/后面输入上面POST结果中的id号,不输入内容,即{}括号里为空! 然后点击”验证JSON“->"提交请求",观察右栏内就有了上面插入的数据了(即wangshibo,hello world)

打开"基本查询",查看下数据,如下,即可查询到上面插入的数据:

打开“数据浏览”,也能查看到插入的数据:

如下:一定要提前在elk-node2节点上也完成配置(配置内容在下面提到),否则上面插入数据后,集群状态会呈现黄色yellow状态,elk-node2完成配置加入到集群里后就会恢复到正常的绿色状态。

4.2)安装kopf监控插件 --------------------------------------------------------------------------------------------------------------------

a)监控插件安装方法一

[root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf

b)监控插件安装方法二

首先下载监控插件kopf,下载到/usr/loca/src目录下 下载地址:https://github.com/lmenezes/elasticsearch-kopf

---------------------------------------------------------------- kopf插件包百度云盘下载:https://pan.baidu.com/s/1qYixSL2 提取密码:ya4t ----------------------------------------------------------------

[root@elk-node1 src]# unzip elasticsearch-kopf-master.zip [root@elk-node1 src]# ls elasticsearch-kopf-master elasticsearch-kopf-master.zip

在/usr/share/elasticsearch/plugins目录下创建kopf目录 然后将上面下载的elasticsearch-kopf-master.zip解压后的文件都移到/usr/share/elasticsearch/plugins/kopf下 接着重启elasticsearch服务即可! [root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/ [root@elk-node1 plugins]# mkdir kopf [root@elk-node1 plugins]# cd kopf [root@elk-node1 kopf]# cp -r /usr/local/src/elasticsearch-kopf-master/* ./ [root@elk-node1 kopf]# pwd /usr/share/elasticsearch/plugins/kopf

[root@elk-node1 kopf]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins [root@elk-node1 kopf]# ll total 40 -rw-r--r--. 1 elasticsearch elasticsearch 237 Sep 28 16:28 CHANGELOG.md drwxr-xr-x. 2 elasticsearch elasticsearch 22 Sep 28 16:28 dataset drwxr-xr-x. 2 elasticsearch elasticsearch 73 Sep 28 16:28 docker -rw-r--r--. 1 elasticsearch elasticsearch 4315 Sep 28 16:28 Gruntfile.js drwxr-xr-x. 2 elasticsearch elasticsearch 4096 Sep 28 16:28 imgs -rw-r--r--. 1 elasticsearch elasticsearch 1083 Sep 28 16:28 LICENSE -rw-r--r--. 1 elasticsearch elasticsearch 1276 Sep 28 16:28 package.json -rw-r--r--. 1 elasticsearch elasticsearch 102 Sep 28 16:28 plugin-descriptor.properties -rw-r--r--. 1 elasticsearch elasticsearch 3165 Sep 28 16:28 README.md drwxr-xr-x. 6 elasticsearch elasticsearch 4096 Sep 28 16:28 _site drwxr-xr-x. 4 elasticsearch elasticsearch 27 Sep 28 16:28 src drwxr-xr-x. 4 elasticsearch elasticsearch 4096 Sep 28 16:28 tests

[root@elk-node1 _site]# systemctl restart elasticsearch

-----------------------------------------------------------------------------------------------------

访问插件:(如下,同样要提前安装好elk-node2节点上的插件,否则访问时会出现集群节点为黄色的yellow告警状态)

http://112.110.115.10:19200/_plugin/kopf/#!/cluster

************************************************************************* 下面进行节点elk-node2的配置  (如上的两个插件也在elk-node2上同样安装)

注释:其实两个的安装配置基本上是一样的。

[root@elk-node2 src]# mkdir -p /data/es-data [root@elk-node2 ~]# cat /etc/elasticsearch/elasticsearch.yml cluster.name: huanqiu node.name: elk-node2 path.data: /data/es-data path.logs: /var/log/elasticsearch/ bootstrap.mlockall: true network.host: 0.0.0.0 http.port: 9200 discovery.zen.ping.multicast.enabled: false discovery.zen.ping.unicast.hosts: ["192.168.1.160", "192.168.1.161"]

# 修改权限配置 [root@elk-node2 src]# chown -R elasticsearch.elasticsearch /data/

# 启动服务 [root@elk-node2 src]# systemctl start elasticsearch [root@elk-node2 src]# systemctl status elasticsearch ● elasticsearch.service - Elasticsearch Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled; vendor preset: disabled) Active: active (running) since Wed 2016-09-28 16:49:41 CST; 1 weeks 3 days ago Docs: http://www.elastic.co Process: 17798 ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec (code=exited, status=0/SUCCESS) Main PID: 17800 (java) CGroup: /system.slice/elasticsearch.service └─17800 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFra...

Oct 09 13:42:22 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:22,295][WARN ][transport ] [elk-node2] Transport res...943817] Oct 09 13:42:23 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:23,111][WARN ][transport ] [elk-node2] Transport res...943846] ................ ................

# 查看端口 [root@elk-node2 src]# netstat -antlp|egrep "9200|9300" tcp6 0 0 :::9200 :::* LISTEN 2928/java tcp6 0 0 :::9300 :::* LISTEN 2928/java tcp6 0 0 127.0.0.1:48200 127.0.0.1:9300 TIME_WAIT - tcp6 0 0 ::1:41892 ::1:9300 TIME_WAIT - *************************************************************************

通过命令的方式查看elk-node2数据(在112.110.115.10宿主机或其他外网服务器上查看,如下) [root@kvm-server ~]# curl -i -XGET 'http://192.168.1.161:9200/_count?pretty' -d '{"query":{"match_all":{}}}' HTTP/1.1 200 OK Content-Type: application/json; charset=UTF-8 Content-Length: 95

{ "count" : 1, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }

然后通过web访问elk-node2 http://112.110.115.10:19201/

访问两个插件: http://112.110.115.10:19201/_plugin/head/ http://112.110.115.10:19201/_plugin/kopf/#!/cluster

 (2)Logstash安装配置(这个在客户机上是要安装的。elk-node1和elk-node2都安装)

基础环境安装(客户端安装logstash,收集到的数据写入到elasticsearch里,就可以登陆logstash界面查看到了

1)下载并安装GPG Key [root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum仓库 [root@hadoop-node1 ~]# vim /etc/yum.repos.d/logstash.repo [logstash-2.1] name=Logstash repository for 2.1.x packages baseurl=http://packages.elastic.co/logstash/2.1/centos gpgcheck=1 gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch enabled=1

3)安装logstash [root@elk-node1 ~]# yum install -y logstash

4)logstash启动 [root@elk-node1 ~]# systemctl start elasticsearch [root@elk-node1 ~]# systemctl status elasticsearch ● elasticsearch.service - Elasticsearch Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled) Active: active (running) since Mon 2016-11-07 18:33:28 CST; 3 days ago Docs: http://www.elastic.co Main PID: 8275 (java) CGroup: /system.slice/elasticsearch.service └─8275 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFrac... .......... ..........

数据的测试

1)基本的输入输出 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }' Settings: Default filter workers: 1 Logstash startup completed hello                                                                                     #输入这个 2016-11-11T06:41:07.690Z elk-node1 hello                        #输出这个 wangshibo                                                                            #输入这个 2016-11-11T06:41:10.608Z elk-node1 wangshibo               #输出这个

2)使用rubydebug详细输出 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }' Settings: Default filter workers: 1 Logstash startup completed hello   #输入这个 {                                                                                         #输出下面信息            "message" => "hello",            "@version" => "1",       "@timestamp" => "2016-11-11T06:44:06.711Z",                   "host" => "elk-node1" } wangshibo #输入这个 {                                                                                       #输出下面信息          "message" => "wangshibo",         "@version" => "1",    "@timestamp" => "2016-11-11T06:44:11.270Z",                "host" => "elk-node1" }

3) 把内容写到elasticsearch中 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} }' Settings: Default filter workers: 1 Logstash startup completed                       #输入下面的测试数据 123456 wangshibo huanqiu hahaha

使用rubydebug和写到elasticsearch中的区别:其实就在于后面标准输出的区别,前者使用codec;后者使用elasticsearch

写到elasticsearch中在logstash中查看,如下图: 注意: master收集到日志后,会把一部分数据碎片到salve上(随机的一部分数据),master和slave又都会各自做副本,并把副本放到对方机器上,这样就保证了数据不会丢失。 如下,master收集到的数据放到了自己的第1,3分片上,其他的放到了slave的第0,2,4分片上。

4)即写到elasticsearch中又写在文件中一份 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.160:9200"]} stdout{ codec => rubydebug}}' Settings: Default filter workers: 1 Logstash startup completed huanqiupc {            "message" => "huanqiupc",           "@version" => "1",      "@timestamp" => "2016-11-11T07:27:42.012Z",                  "host" => "elk-node1" } wangshiboqun {          "message" => "wangshiboqun",         "@version" => "1",    "@timestamp" => "2016-11-11T07:27:55.396Z",                "host" => "elk-node1" }

以上文本可以长期保留、操作简单、压缩比大。下面登陆elasticsearch界面中查看;

 logstash的配置和文件的编写

1)logstash的配置 简单的配置方式: [root@elk-node1 ~]# vim /etc/logstash/conf.d/01-logstash.conf input { stdin { } } output {         elasticsearch { hosts => ["192.168.1.160:9200"]}         stdout { codec => rubydebug } }

它的执行: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/01-logstash.conf Settings: Default filter workers: 1 Logstash startup completed beijing                                                #输入内容 {                                                       #输出下面信息              "message" => "beijing",             "@version" => "1",        "@timestamp" => "2016-11-11T07:41:48.401Z",                    "host" => "elk-node1" }

-------------------------------------------------------------------------------------------------- 参考内容: https://www.elastic.co/guide/en/logstash/current/configuration.html https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html --------------------------------------------------------------------------------------------------

2)收集系统日志

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# vim  file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
}

output {
    elasticsearch {
       hosts => ["192.168.1.160:9200"]
       index => "system-%{+YYYY.MM.dd}"
    }
}

执行上面日志信息的收集,如下,这个命令会一直在执行中,表示日志在监控收集中;如果中断,就表示日志不在收集!所以需要放在后台执行~ [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陆elasticsearch界面,查看本机系统日志的信息:

-------------------------------------------------------------------------------------------------- 参考内容: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html --------------------------------------------------------------------------------------------------

3)收集java日志,其中包含上面讲到的日志收集

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# vim  file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
}

input {
    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
}

注意: 如果你的日志中有type字段 那你就不能在conf文件中使用type

执行如下命令收集: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陆elasticsearch界面,查看数据:

-------------------------------------------------------------------------------------------------- 参考内容: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html --------------------------------------------------------------------------------------------------

--------------- 有个问题: 每个报错都给收集成一行了,不是按照一个报错,一个事件模块收集的。

下面将行换成事件的方式展示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# vim multiline.conf
input {
    stdin {
       codec => multiline {
          pattern => "^\["
          negate => true
          what => "previous"
        }
    }
}
output {
    stdout {
      codec => "rubydebug"
     }  
}

执行命令:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f multiline.conf 
Settings: Default filter workers: 1
Logstash startup completed
123
456
[123
{
    "@timestamp" => "2016-11-11T09:28:56.824Z",
       "message" => "123\n456",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "elk-node1"
}
123]
[456]
{
    "@timestamp" => "2016-11-11T09:29:09.043Z",
       "message" => "[123\n123]",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "elk-node1"
}

在没有遇到[的时候,系统不会收集,只有遇见[的时候,才算是一个事件,才收集起来。 -------------------------------------------------------------------------------------------------- 参考内容 https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html --------------------------------------------------------------------------------------------------

(3)Kibana安装配置

1)kibana的安装: [root@elk-node1 ~]# cd /usr/local/src [root@elk-node1 src]# wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz [root@elk-node1 src]# tar zxf kibana-4.3.1-linux-x64.tar.gz [root@elk-node1 src]# mv kibana-4.3.1-linux-x64 /usr/local/ [root@elk-node1 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana

2)修改配置文件: [root@elk-node1 config]# pwd /usr/local/kibana/config [root@elk-node1 config]# cp kibana.yml kibana.yml.bak [root@elk-node1 config]# vim kibana.yml server.port: 5601 server.host: "0.0.0.0" elasticsearch.url: "http://192.168.1.160:9200" kibana.index: ".kibana"

因为他一直运行在前台,要么选择开一个窗口,要么选择使用screen。 安装并使用screen启动kibana: [root@elk-node1 ~]# yum -y install screen [root@elk-node1 ~]# screen                          #这样就另开启了一个终端窗口 [root@elk-node1 ~]# /usr/local/kibana/bin/kibana log [18:23:19.867] [info][status][plugin:kibana] Status changed from uninitialized to green - Ready log [18:23:19.911] [info][status][plugin:elasticsearch] Status changed from uninitialized to yellow - Waiting for Elasticsearch log [18:23:19.941] [info][status][plugin:kbn_vislib_vis_types] Status changed from uninitialized to green - Ready log [18:23:19.953] [info][status][plugin:markdown_vis] Status changed from uninitialized to green - Ready log [18:23:19.963] [info][status][plugin:metric_vis] Status changed from uninitialized to green - Ready log [18:23:19.995] [info][status][plugin:spyModes] Status changed from uninitialized to green - Ready log [18:23:20.004] [info][status][plugin:statusPage] Status changed from uninitialized to green - Ready log [18:23:20.010] [info][status][plugin:table_vis] Status changed from uninitialized to green - Ready

然后按ctrl+a+d组合键,这样在上面另启的screen屏里启动的kibana服务就一直运行在前台了.... [root@elk-node1 ~]# screen -ls There is a screen on: 15041.pts-0.elk-node1 (Detached) 1 Socket in /var/run/screen/S-root.

(3)访问kibana:http://112.110.115.10:15601/ 如下,如果是添加上面设置的java日志收集信息,则在下面填写es-error*;如果是添加上面设置的系统日志信息system*,以此类型(可以从logstash界面看到日志收集项)

 然后点击上面的Discover,在Discover中查看:

查看日志登陆,需要点击“Discover”-->"message",点击它后面的“add” 注意: 需要右边查看日志内容时带什么属性,就在左边点击相应属性后面的“add” 如下图,添加了message和path的属性:

这样,右边显示的日志内容的属性就带了message和path

点击右边日志内容属性后面隐藏的<<,就可将内容向前缩进

添加新的日志采集项,点击Settings->+Add New,比如添加system系统日志。注意后面的*不要忘了。

删除kibana里的日志采集项,如下,点击删除图标即可。

如果打开kibana查看日志,发现没有日志内容,出现“No results found”,如下图所示,这说明要查看的日志在当前时间没有日志信息输出,可以点击右上角的时间钟来调试日志信息的查看。

4)收集nginx的访问日志

修改nginx的配置文件,分别在nginx.conf的http和server配置区域添加下面内容:

##### http 标签中           log_format json '{"@timestamp":"$time_iso8601",'                            '"@version":"1",'                            '"client":"$remote_addr",'                            '"url":"$uri",'                            '"status":"$status",'                            '"domain":"$host",'                            '"host":"$server_addr",'                            '"size":$body_bytes_sent,'                            '"responsetime":$request_time,'                            '"referer": "$http_referer",'                            '"ua": "$http_user_agent"' '}'; ##### server标签中             access_log /var/log/nginx/access_json.log json;

截图如下:

启动nginx服务:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# systemctl start nginx
[root@elk-node1 ~]# systemctl status nginx
● nginx.service - The nginx HTTP and reverse proxy server
   Loaded: loaded (/usr/lib/systemd/system/nginx.service; disabled; vendor preset: disabled)
   Active: active (running) since Fri 2016-11-11 19:06:55 CST; 3s ago
  Process: 15119 ExecStart=/usr/sbin/nginx (code=exited, status=0/SUCCESS)
  Process: 15116 ExecStartPre=/usr/sbin/nginx -t (code=exited, status=0/SUCCESS)
  Process: 15114 ExecStartPre=/usr/bin/rm -f /run/nginx.pid (code=exited, status=0/SUCCESS)
 Main PID: 15122 (nginx)
   CGroup: /system.slice/nginx.service
           ├─15122 nginx: master process /usr/sbin/nginx
           ├─15123 nginx: worker process
           └─15124 nginx: worker process

Nov 11 19:06:54 elk-node1 systemd[1]: Starting The nginx HTTP and reverse proxy server...
Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: configuration file /etc/nginx/nginx.conf test is successful
Nov 11 19:06:55 elk-node1 systemd[1]: Started The nginx HTTP and reverse proxy server.

编写收集文件 这次使用json的方式收集:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# vim json.conf 
input {
   file {
      path => "/var/log/nginx/access_json.log"
      codec => "json"
   }
}

output {
   stdout {
      codec => "rubydebug"
   }
}

启动日志收集程序: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf        #或加个&放在后台执行

访问nginx页面(在elk-node1的宿主机上执行访问页面的命令:curl http://192.168.1.160)就会出现以下内容:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf
Settings: Default filter workers: 1
Logstash startup completed
{
      "@timestamp" => "2016-11-11T11:10:53.000Z",
        "@version" => "1",
          "client" => "192.168.1.7",
             "url" => "/index.html",
          "status" => "200",
          "domain" => "192.168.1.160",
            "host" => "192.168.1.160",
            "size" => 3700,
    "responsetime" => 0.0,
         "referer" => "-",
              "ua" => "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2",
            "path" => "/var/log/nginx/access_json.log"
}

注意: 上面的json.conf配置只是将nginx日志输出,还没有输入到elasticsearch里,所以这个时候在elasticsearch界面里是采集不到nginx日志的。

需要配置一下,将nginx日志输入到elasticsearch中,将其汇总到总文件file.conf里,如下也将nginx-log日志输入到elasticserach里:(后续就可以只用这个汇总文件,把要追加的日志汇总到这个总文件里即可)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# cat file.conf 
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }

    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
}

可以加上--configtest参数,测试下配置文件是否有语法错误或配置不当的地方,这个很重要!! [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest Configuration OK

然后接着执行logstash命令(由于上面已经将这个执行命令放到了后台,所以这里其实不用执行,也可以先kill之前的,再放后台执行),然后可以再访问nginx界面测试下 [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陆elasticsearch界面查看:

 将nginx日志整合到kibana界面里,如下:

5)收集系统日志

编写收集文件并执行。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# cat syslog.conf
input {
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
}

output {
    stdout {
        codec => "rubydebug"
    }
}

对上面的采集文件进行执行: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf

重新开启一个窗口,查看服务是否启动: [root@elk-node1 ~]# netstat -ntlp|grep 514 tcp6 0 0 192.168.1.160:514 :::* LISTEN 17842/java [root@elk-node1 ~]# vim /etc/rsyslog.conf #*.* @@remote-host:514                                                           【在此行下面添加如下内容】 *.* @@192.168.1.160:514

[root@elk-node1 ~]# systemctl restart rsyslog

回到原来的窗口(即上面采集文件的执行终端),就会出现数据:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf
Settings: Default filter workers: 1
Logstash startup completed
{
           "message" => "Stopping System Logging Service...\n",
          "@version" => "1",
        "@timestamp" => "2016-11-13T10:35:30.000Z",
              "type" => "system-syslog",
              "host" => "192.168.1.160",
          "priority" => 30,
         "timestamp" => "Nov 13 18:35:30",
         "logsource" => "elk-node1",
           "program" => "systemd",
          "severity" => 6,
          "facility" => 3,
    "facility_label" => "system",
    "severity_label" => "Informational"
}
........
........

再次添加到总文件file.conf中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# cat file.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }

    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error" 
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "system-syslog"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-syslog-%{+YYYY.MM.dd}"
        }
    }
}

执行总文件(先测试下总文件配置是否有误,然后先kill之前在后台启动的file.conf文件,再次执行): [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest Configuration OK [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

测试: 向日志中添加数据,看elasticsearch和kibana的变化: [root@elk-node1 ~]# logger "hehehehehehe1" [root@elk-node1 ~]# logger "hehehehehehe2" [root@elk-node1 ~]# logger "hehehehehehe3" [root@elk-node1 ~]# logger "hehehehehehe4" [root@elk-node1 ~]# logger "hehehehehehe5"

添加到kibana界面中:

6)TCP日志的收集

编写日志收集文件,并执行:(有需要的话,可以将下面收集文件的配置汇总到上面的总文件file.conf里,进而输入到elasticsearch界面里和kibana里查看) [root@elk-node1 ~]# cat tcp.conf input { tcp { host => "192.168.1.160" port => "6666" } } output { stdout { codec => "rubydebug" } }

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf

开启另外一个窗口,测试一(安装nc命令:yum install -y nc): [root@elk-node1 ~]# nc 192.168.1.160 6666 </etc/resolv.conf

回到原来的窗口(即上面采集文件的执行终端),就会出现数据: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf Settings: Default filter workers: 1 Logstash startup completed {         "message" => "",        "@version" => "1",    "@timestamp" => "2016-11-13T11:01:15.280Z",               "host" => "192.168.1.160",               "port" => 49743 }

测试二: [root@elk-node1 ~]# echo "hehe" | nc 192.168.1.160 6666 [root@elk-node1 ~]# echo "hehe" > /dev/tcp/192.168.1.160/6666

回到之前的执行端口,在去查看,就会显示出来:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf 
Settings: Default filter workers: 1
Logstash startup completed.......
{
       "message" => "hehe",
      "@version" => "1",
    "@timestamp" => "2016-11-13T11:39:58.263Z",
          "host" => "192.168.1.160",
          "port" => 53432
}
{
       "message" => "hehe",
      "@version" => "1",
    "@timestamp" => "2016-11-13T11:40:13.458Z",
          "host" => "192.168.1.160",
          "port" => 53457
}

7)使用filter 编写文件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# cat grok.conf
input {
    stdin{}
}
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output {
    stdout{
        codec => "rubydebug"
    }
}

执行检测:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f grok.conf 
Settings: Default filter workers: 1
Logstash startup completed
55.3.244.1 GET /index.html 15824 0.043                    #输入这个,下面就会自动形成字典的形式
{
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
      "@version" => "1",
    "@timestamp" => "2016-11-13T11:45:47.882Z",
          "host" => "elk-node1",
        "client" => "55.3.244.1",
        "method" => "GET",
       "request" => "/index.html",
         "bytes" => "15824",
      "duration" => "0.043"
}

其实上面使用的那些变量在程序中都有定义:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# cd /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/
[root@elk-node1 patterns]# ls
aws     bro   firewalls      haproxy  junos         mcollective           mongodb  postgresql  redis
bacula  exim  grok-patterns  java     linux-syslog  mcollective-patterns  nagios   rails       ruby
[root@elk-node1 patterns]# cat grok-patterns
filter {
      # drop sleep events
    grok {
        match => { "message" =>"SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
     if "sleep_drop" in [tags] {
        drop {}
     }
     grok {
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
      }
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
}

8)mysql慢查询

收集文件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# cat mysql-slow.conf
input {
    file {
        path => "/root/slow.log"
        type => "mysql-slowlog"
        codec => multiline {
            pattern => "^# User@Host"
            negate => true
            what => "previous"
        }
    }
}

filter {
      # drop sleep events
    grok {
        match => { "message" =>"SELECT SLEEP" }
        add_tag => [ "sleep_drop" ]
        tag_on_failure => [] # prevent default _grokparsefailure tag on real records
      }
     if "sleep_drop" in [tags] {
        drop {}
     }
     grok {
        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
      }
      date {
        match => [ "timestamp", "UNIX" ]
        remove_field => [ "timestamp" ]
      }
}


output {
    stdout {
       codec =>"rubydebug"
    }
}

执行检测: 上面需要的/root/slow.log是自己上传的,然后自己插入数据保存后,会显示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f mysql-slow.conf
Settings: Default filter workers: 1
Logstash startup completed
{
    "@timestamp" => "2016-11-14T06:53:54.100Z",
       "message" => "# Time: 161114 11:05:18",
      "@version" => "1",
          "path" => "/root/slow.log",
          "host" => "elk-node1",
          "type" => "mysql-slowlog",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}
{
    "@timestamp" => "2016-11-14T06:53:54.105Z",
       "message" => "# User@Host: test[test] @  [124.65.197.154]\n# Query_time: 1.725889  Lock_time: 0.000430 Rows_sent: 0  Rows_examined: 0\nuse test_zh_o2o_db;\nSET timestamp=1479092718;\nSELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema='test_zh_o2o_db' AND BINARY event_object_table='customer';\n# Time: 161114 12:10:30",
      "@version" => "1",
          "tags" => [
        [0] "multiline",
        [1] "_grokparsefailure"
    ],
          "path" => "/root/slow.log",
          "host" => "elk-node1",
          "type" => "mysql-slowlog"
}
.........
.........

---------------------------------------------------------------------------------------------------------------------------------- 接下来描述会遇见到的一个问题: 一旦我们的elasticsearch出现问题,就不能进行日志采集处理了! 这种情况下该怎么办呢?

解决方案; 可以在client和elasticsearch之间添加一个中间件作为缓存,先将采集到的日志内容写到中间件上,然后再从中间件输入到elasticsearch中。 这就完美的解决了上述的问题了。

(4)ELK中使用redis作为中间件,缓存日志采集内容

1)redis的配置和启动

[root@elk-node1 ~]# vim /etc/redis.conf               #修改下面两行内容 daemonize yes bind 192.168.1.160 [root@elk-node1 ~]# systemctl start redis [root@elk-node1 ~]# lsof -i:6379 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME redis-ser 19474 redis 4u IPv4 1344465 0t0 TCP elk-node1:6379 (LISTEN) [root@elk-node1 ~]# redis-cli -h 192.168.1.160 192.168.1.160:6379> info # Server redis_version:2.8.19 .......

2)编写从Client端收集数据的文件

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# vim redis-out.conf
input {
   stdin {}
}

output {
   redis {
      host => "192.168.1.160"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "demo"
   }
}

3)执行收集数据的文件,并输入数据hello redis 

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf Settings: Default filter workers: 1 Logstash startup completed             #下面输入数据hello redis hello redis

4)在redis中查看数据

[root@elk-node1 ~]# redis-cli -h 192.168.1.160 192.168.1.160:6379> info # Server ....... ....... # Keyspace db6:keys=1,expires=0,avg_ttl=0                   #在最下面一行,显示是db6 192.168.1.160:6379> select 6 OK 192.168.1.160:6379[6]> keys * 1) "demo" 192.168.1.160:6379[6]> LINDEX demo -1 "{\"message\":\"hello redis\",\"@version\":\"1\",\"@timestamp\":\"2016-11-14T08:04:25.981Z\",\"host\":\"elk-node1\"}"

5)继续随便写点数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf 
Settings: Default filter workers: 1
Logstash startup completed
hello redis
123456
asdf
ert
wang
shi
bo
guohuihui
as
we
r
g

asdfjkdfsak
5423wer
34rt3
6y
7uj
u
io9
sdjfhsdk890
huanqiu
huanqiuchain
hqsb
asda    

6)在redis中查看

在redis中查看长度: [root@elk-node1 ~]# redis-cli -h 192.168.1.160 192.168.1.160:6379> info # Server redis_version:2.8.19 ....... ....... # Keyspace db6:keys=1,expires=0,avg_ttl=0      #显示是db6 192.168.1.160:6379> select 6 OK 192.168.1.160:6379[6]> keys * 1) "demo" 192.168.1.160:6379[6]> LLEN demo (integer) 24

7)将redis中的内容写到ES中

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# vim redis-in.conf
input { 
    redis {
      host => "192.168.1.160"
      port => "6379"
      db => "6"
      data_type => "list"
      key => "demo"
   }
}

output {
    elasticsearch {
      hosts => ["192.168.1.160:9200"]
      index => "redis-in-%{+YYYY.MM.dd}"
    }
}

执行: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf --configtest Configuration OK [root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf &

在redis中查看,发现数据已被读出: 192.168.1.160:6379[6]> LLEN demo (integer) 0

登陆elasticsearch界面查看:

8)接着,将收集到的所有日志写入到redis中。这了重新定义一个添加redis缓存后的总文件shipper.conf。(可以将之前执行的总文件file.conf停掉)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node1 ~]# vim shipper.conf
input {
    file {
      path => "/var/log/messages"
      type => "system"
      start_position => "beginning"
    }
 
    file {
       path => "/var/log/elasticsearch/huanqiu.log"
       type => "es-error"
       start_position => "beginning"
       codec => multiline {
           pattern => "^\["
           negate => true
           what => "previous"
       }
    }
    file {
       path => "/var/log/nginx/access_json.log"
       codec => json
       start_position => "beginning"
       type => "nginx-log"
    }
    syslog {
        type => "system-syslog"
        host => "192.168.1.160"
        port => "514"
    }
 
}
 
 
output {
   if [type] == "system"{
     redis {
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "system"
     }
   }
 
    if [type] == "es-error"{
      redis {
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "demo"
        }
     }
    if [type] == "nginx-log"{    
       redis {
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "nginx-log"
       }
    }
    if [type] == "system-syslog"{
       redis {
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "system-syslog"
       }    
     }
}

执行上面的文件(提前将上面之前启动的file.conf文件的执行给结束掉!) [root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf --configtest Configuration OK [root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf Settings: Default filter workers: 1 Logstash startup completed

在redis中查看: [root@elk-node1 ~]# redis-cli -h 192.168.1.160 192.168.1.160:6379> info # Server redis_version:2.8.19 ....... ....... # Keyspace db6:keys=1,expires=0,avg_ttl=0                      #显示是db6 192.168.1.160:6379> select 6 OK 192.168.1.160:6379[6]> keys * 1) "demo" 2) "system" 192.168.1.160:6379[6]> keys * 1) "nginx-log" 2) "demo" 3) "system"

另开一个窗口,添加点日志: [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423"

又会增加日志: 192.168.1.160:6379[6]> keys * 1) "system-syslog" 2) "nginx-log" 3) "demo" 4) "system"

其实可以在任意的一台ES中将数据从redis读取到ES中。 下面咱们在elk-node2节点,将数据从redis读取到ES中:

编写文件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@elk-node2 ~]# cat file.conf
input {
     redis {
        type => "system"
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "system"
     }

      redis {
        type => "es-error"
        host => "192.168.1.160"
        port => "6379"
        db => "6"
        data_type => "list"
        key => "es-error"
        }
       redis {
          type => "nginx-log"
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "nginx-log"
       }
       redis {
          type => "system-syslog"
          host => "192.168.1.160"
          port => "6379"
          db => "6"
          data_type => "list"
          key => "system-syslog"
       }    

}


output {

    if [type] == "system"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-%{+YYYY.MM.dd}"
        }
    }

    if [type] == "es-error"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "es-error-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "nginx-log"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "nignx-log-%{+YYYY.MM.dd}"
        }
    }
    if [type] == "system-syslog"{
        elasticsearch {
           hosts => ["192.168.1.160:9200"]
           index => "system-syslog-%{+YYYY.MM.dd}"
        }
    }
}

执行: [root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf --configtest Configuration OK [root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf &

去redis中检查,发现数据已经被读出到elasticsearch中了。 192.168.1.160:6379[6]> keys * (empty list or set)

同时登陆logstash和kibana看,发现可以正常收集到日志了。

可以执行这个 去查看nginx日志 [root@elk-node1 ~]# ab -n10000 -c1 http://192.168.1.160/

也可以启动多个redis写到ES中,具体根据自己的实际情况而定。

以上即是本人部署ELK环境的整个操作记录,在此留笔,希望能帮助到一些朋友,如有问题,敬请提出~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016-09-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
一线运维 DBA 五年经验常用 SQL 大全(二)
本文 SQL 及相关命令均是在运维工作中总结整理而成的,对于运维 DBA 来说可提高很大的工作效率,值得收藏。当然如果你全部能够背下来那就很牛逼了,如果不能,还是建议收藏下来慢慢看,每条 SQL 的使用频率都很高,肯定能够帮助到你。
JiekeXu之路
2021/03/15
9920
一线运维 DBA 五年经验常用 SQL 大全(二)
开发人员必学的几点 SQL 优化点
博主负责的项目主要采用阿里云数据库MySQL,最近频繁出现慢SQL告警,执行时间最长的竟然高达5分钟。导出日志后分析,主要原因竟然是没有命中索引和没有分页处理。其实这是非常低级的错误,我不禁后背一凉,团队成员的技术水平亟待提高啊。改造这些SQL的过程中,总结了一些经验分享给大家,如果有错误欢迎批评指正。
龙哥
2020/03/24
8130
基于Hadoop生态圈的数据仓库实践 —— 进阶技术(十)
十、杂项维度 本节讨论杂项维度。简单地说,杂项维度就是一种包含的数据具有很少可能值的维度。例如销售订单,它可能有很多离散数据(yes-no这种类型的值),如
用户1148526
2019/05/25
3510
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/04/04
6.7K0
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
ORACLE常用性能监控SQL【一】
kill session: 执行 alter system kill session ‘761,876’(sid 为 761);
小小工匠
2021/08/16
2.9K0
一文学完所有的Hive Sql(两万字最全详解)
lateral view用于和split、explode等UDTF一起使用的,能将一行数据拆分成多行数据,在此基础上可以对拆分的数据进行聚合,lateral view首先为原始表的每行调用UDTF,UDTF会把一行拆分成一行或者多行,lateral view在把结果组合,产生一个支持别名表的虚拟表。
五分钟学大数据
2021/04/02
3.4K0
流数据湖平台Apache Paimon(二)集成 Flink 引擎
Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本课程使用Flink 1.17.0。
Maynor
2023/07/31
3.6K0
流数据湖平台Apache Paimon(二)集成 Flink 引擎
Apache Doris 支持 Arrow Flight SQL 协议,数据传输效率实现百倍飞跃
近年来,随着数据科学、数据湖分析等场景的兴起,对数据读取和传输速度提出更高的要求。而 JDBC/ODBC 作为与数据库交互的主流标准,在应对大规模数据读取和传输时显得力不从心,无法满足高性能、低延迟等数据处理需求。为提供更高效的数据传输方案,Apache Doris 在 2.1 版本中基于 Arrow Flight SQL 协议实现了高速数据传输链路,使得数据传输性能实现百倍飞跃。
SelectDB技术团队
2024/04/07
8680
Greenplum 实时数据仓库实践(7)——维度表技术
前面章节中,我们实现了实时多维数据仓库的基本功能,如使用Canal和Kafka实现实时数据同步,定义Greenplum rule执行实时数据装载逻辑等。本篇将继续讨论常见的维度表技术。
用户1148526
2022/01/06
2.7K0
Greenplum 实时数据仓库实践(7)——维度表技术
使用OGG 21c迁移Oracle 12c到MySQL 8.0并配置实时同步
OGG有传统的经典架构,也有最新的微服务,2个都可以远程捕获和应用数据,对数据库服务器是0侵入,而传统的经典架构是纯命令行模式,最新的微服务架构是图形化界面操作,几乎所有操作都可以在界面进行。相关文章可以参考:
AiDBA宝典
2023/04/26
1.5K0
使用OGG 21c迁移Oracle 12c到MySQL 8.0并配置实时同步
Oracle RAC基本维护指令
所有实例和服务的状态 $ srvctl status database -d orcl Instance orcl1 is running on node linux1 Instance orcl2 is running on node linux2 单个实例的状态 $ srvctl status instance -d orcl -i orcl2 Instance orcl2 is running on node linux2 在数据库全局命名服务的状态 $ srvctl status serv
猿人谷
2018/01/17
1.2K0
查询性能提升3倍!Apache Hudi 查询优化了解下?
从 Hudi 0.10.0版本开始,我们很高兴推出在数据库领域中称为 Z-Order和 Hilbert 空间填充曲线的高级数据布局优化技术的支持。
ApacheHudi
2022/01/04
1.8K0
查询性能提升3倍!Apache Hudi 查询优化了解下?
SQL优化案例解析:MINUS改写为标量子查询后提升5倍,但还可以再快近百倍
* GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。 写在前面(By老叶)从GreatSQL 8.0.32-25版本开始支持Rapid引擎,该引擎使得GreatSQL能满足联机分析(OLAP)查询请求。老叶尝试利用Rapid引擎优化本案例,结果是相当可喜的,对比如下: -SQL执行耗时(秒)Rows_examinedRead_keyRead_nextRead_rnd_nextTmp_tablesTmp_disk_tablesTmp_table_sizesInnoDB_pages_distinct原始SQL9.943230200036368909070210877403112372448181标量子查询改写优化后2.294906728836617308100036382011284368182走Rapid引擎0.11763300001090000 上述测试结果表明:
老叶茶馆
2024/04/03
2630
SQL优化案例解析:MINUS改写为标量子查询后提升5倍,但还可以再快近百倍
最强最全面的Hive SQL开发指南,超四万字全面解析!
hive -S -e 'select table_cloum from table' -S,终端上的输出不会有mapreduce的进度,执行完毕,只会把查询结果输出到终端上。
五分钟学大数据
2021/12/02
8.4K0
最强最全面的Hive SQL开发指南,超四万字全面解析!
腾讯技术团队出品的《面向开发人员梳理的代码安全指南-Go安全指南》
使用"net/http"下的方法http.Get(url)、http.Post(url, contentType, body)、http.Head(url)、http.PostForm(url, data)、http.Do(req)时,如变量值外部可控(指从参数中动态获取),应对请求目标进行严格的安全校验。
公众号: 云原生生态圈
2021/09/26
1.4K0
大数据技术之_27_电商平台数据分析项目_03_项目概述 + 项目主体架构 + 模拟业务数据源 + 程序框架解析 + 需求解析 + 项目总结
1、user_visit_action user_visit_action 表,存放网站或者 APP 每天的点击流数据。通俗地讲,就是用户对 网站/APP 每点击一下,就会产生一条存放在这个表里面的数据。
黑泽君
2019/06/14
3.9K0
大数据技术之_27_电商平台数据分析项目_03_项目概述 + 项目主体架构 + 模拟业务数据源 + 程序框架解析 + 需求解析 + 项目总结
Greenplum 实时数据仓库实践(8)——事实表技术
上一篇里介绍了几种基本的维度表技术,并用示例演示了每种技术的实现过程。本篇说明多维数据仓库中常见的事实表技术。我们将讲述五种基本事实表扩展技术,分别是周期快照、累积快照、无事实的事实表、迟到的事实和累积度量。和讨论维度表一样,也会从概念开始认识这些技术,继而给出常见的使用场景,最后以销售订单数据仓库为例,给出实现代码和测试过程。
用户1148526
2022/04/13
1.9K0
Greenplum 实时数据仓库实践(8)——事实表技术
数据仓库开发 SQL 使用技巧总结
作者:dcguo 使用 sql 做数仓开发有一段时间了,现做一下梳理复盘,主要内容包括 sql 语法、特性、函数、优化、特殊业务表实现等。 mysql 数据结构 常用 innodb 存储为 B+ 树 特点 多路平衡树,m 个子树中间节点就包含 m 个元素,一个中间节点是一个 page(磁盘页) 默认 16 kb; 子节点保存了全部得元素,父节点得元素是子节点的最大或者最小元素,而且依然是有序得; 节点元素有序,叶子节点双向有序,便于排序和范围查询。 优势 平衡查找树,logn 级别 crud; 单一节点比二
腾讯技术工程官方号
2022/07/19
3.5K0
数据仓库开发 SQL 使用技巧总结
salesforce的功能_salesforce开发
161、【String.format escape curly braces – 转义花括号】:
全栈程序员站长
2022/11/01
7.4K0
salesforce的功能_salesforce开发
基于Hadoop生态圈的数据仓库实践 —— 进阶技术
五、快照 前面实验说明了处理维度的扩展。本节讨论两种事实表的扩展技术。 有些用户,尤其是管理者,经常要看某个特定时间点的数据。也就是说,他们需要数据的快照。周期快照和累积快照是两种常用的事实表扩展技术。 周期快照是在一个给定的时间对事实表进行一段时期的总计。例如,一个月销售订单周期快照汇总每个月底时总的销售订单金额。 累积快照用于跟踪事实表的变化。例如,数据仓库可能需要累积(存储)销售订单从下订单的时间开始,到订单中的商品被打包、运输和到达的各阶段的时间点数据来跟踪订单生命周期的进展情况。用户可能要取得在某个给定时间点,销售订单处理状态的累积快照。 下面说明周期快照和累积快照的细节问题。 1. 周期快照 下面以销售订单的月底汇总为例说明如何实现一个周期快照。 首先需要添加一个新的事实表。下图中的模式显示了一个名为month_end_sales_order_fact的新事实表。
用户1148526
2019/05/25
7010
推荐阅读
相关推荐
一线运维 DBA 五年经验常用 SQL 大全(二)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验