Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >背景介绍

背景介绍

作者头像
钢铁知识库
发布于 2022-08-20 01:07:25
发布于 2022-08-20 01:07:25
72700
代码可运行
举报
文章被收录于专栏:python爬虫教程python爬虫教程
运行总次数:0
代码可运行

背景介绍

最近工作涉及几台新服务器的日志需要接入ELK系统,配置思路如下:

使用Filebeat收集本地日志数据,Filebeat监视日志目录或特定的日志文件,再发送到消息队列到kafka,然后logstash去获取消费,利用filter功能过滤分析,最终存储到elasticsearch中。

filebeat和flume都具有日志收集功能,不过filebeat更轻量,使用go语言编写占用资源更少,可以有很高的并发,带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化;flume使用java开发,需要安装java环境,相对会比较重。

当然两者也存在区别:Filebeat收集数据的速度大于写入速度的时候可能出现数据丢失的现象,而flume会在收集数据和写入数据之间做出调整,保证能在两者之间提供一种平稳的数据状态。可以实时的将分析数据并将数据保存在数据库或者其他系统中,不会出现数据丢失的现象。

以下仅记录配置过程及常见的几种排错命令,安装篇会独立一篇做详细介绍。

配置信息

filebeat配置

我是直接yum install filebeat一键安装的,这里不做具体讲解官网有详细介绍:

https://www.elastic.co/guide/en/beats/filebeat/current/index.html

安装完成后我们以配置采集/var/log/messages为例,配置如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# egrep -v '#|^$' /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
  hosts: ["10.114.102.30:9092", "10.114.102.31:9092", "10.114.102.32:9092", "10.114.102.33:9092", "10.114.102.34:9092"]
  topic: T621_messages
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

有几个参数需要注意的:

paths表示需要提取的日志的路径,将日志输出到kafka中,创建topic

  • required_acks

0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。

1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。

-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。 三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。

  • json.keys_under_root: true
  • json.add_error_key: true
  • json.message_key: log

这三行是识别json格式日志的配置,若日志格式不为json格式,需要注释掉,否则收集到的日志为filebeat的错误日志。

kafka配置

kafka原来已经安装并配置好了,这里不再说明具体安装过程,后续会出一篇ELK完整搭建过程。

这里不做重点讲解,可直接查官网:https://kafka.apache.org/documentation/#quickstart

因为有5台配合zookeeper做了集群,选其中一台配置如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# egrep -v '#|^$' /home/kafka/kafka/config/server.properties
broker.id=1		#按顺序写,不要乱
listeners=PLAINTEXT://0.0.0.0:9092		 #自己的ip
advertised.listeners=PLAINTEXT://10.114.102.30:9092
num.network.threads=24
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.114.102.30:2181,10.114.102.31:2181,10.114.102.32:2181,10.114.102.33:2181,10.114.102.34:2181
zookeeper.connection.timeout.ms=6000
auto.create.topics.enable=false
group.initial.rebalance.delay.ms=0

注意:每台服务器除broker.id需要修改之外,其他属性保持一致。

logstash配置

logstash安装也是直接参考官网就可以了

https://www.elastic.co/guide/en/logstash/7.x/index.html

不过有个地方要注意,kafka和logstash的版本兼容问题,以下是kafka使用的版本:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
find /home/kafka/kafka/libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'   
kafka_2.11-1.1.0.jar

通过查找rpm包可以看到logstash用的是7.8.0

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/home/cxhchusr/logstash-7.8.0.rpm

conf.d目录下配置消费messages的文件如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# egrep -v '#|^$' /etc/logstash/conf.d/T621_messages.conf 
input {
  kafka {
    bootstrap_servers => "10.114.102.30:9092,10.114.102.31:9092,10.114.102.32:9092,10.114.102.33:9092,10.114.102.34:9092"
    client_id => "T621_messages"
    group_id => "T621_messages"
    auto_offset_reset => "latest"
    consumer_threads => 10
    decorate_events => true
    topics => ["T621_messages"]
    decorate_events => true
    type => syslog
    }
}
filter{
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
  date {
    match => [ "logdate", "YYYY-MM-dd HH:mm:ss.SSS" ]
    target => "@timestamp"
    timezone =>"+00:00"
  }
  mutate{
    remove_field => "logdate"
  }
}
output {
  elasticsearch {
    hosts => ["10.114.102.30:9200", "10.114.102.31:9200", "10.114.102.32:9200", "10.114.102.33:9200", "10.114.102.34:9200"]
    index => "t621_messages-%{+YYYY.MM.dd}"
    user => caixun
    password => "******()90"
    }
}

注意:es索引需要全部为小写。

最后启动即可,并加入开机自启动/etc/rc.local

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ > /dev/null 2>&1 &

kibana创建索引

logstash配置完成后即可在kibana创建索引

创建完成效果如下,表示接入成功:

常用排查命令

配置过程中除了在kafka上创建topic,还需查询topic是否创建成功、消费情况、以及消息处理情况。

以及es是否正常入库并创建了索引。下面列出几个ELK运维常用命令。

kafka常用运维指令

  • 查询当前topic列表

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

  • 创建topic

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 10 --replication-factor 1 --topic T621_messages

  • topic描述(某topic详细信息)

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic T621_messages

  • topic消费情况(测试消息是否正常生产)

发送:/home/kafka/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic T621_messages

接收:/home/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic T621_messages --from-beginning

  • 查看topic堆积情况

/home/kafka/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list ##查看组列表

/home/kafka/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group T621_messages ##偏移量

/data/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic T621_messages --from-beginning

ES常用运维指令

  • 查看ES集群健康情况

curl -u caixxx:"CAIxxx()90" '10.114.102.30:9200/_cluster/health'

  • 查看索引存储情况

curl -u caixxx:"CAIxxx()90" '10.114.102.30:9200/_cat/indices?v'

  • 查看帮助命令
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# curl -u caixun:"CAIXUN()90" '10.114.102.30:9200/_cat'
=^.^=
/_cat/allocation
/_cat/shards
/_cat/shards/{index}
/_cat/master
/_cat/nodes
/_cat/tasks
/_cat/indices
/_cat/indices/{index}
/_cat/segments
/_cat/segments/{index}
/_cat/count
/_cat/count/{index}
/_cat/recovery
/_cat/recovery/{index}
/_cat/health
/_cat/pending_tasks
/_cat/aliases
/_cat/aliases/{alias}
/_cat/thread_pool
/_cat/thread_pool/{thread_pools}
/_cat/plugins
/_cat/fielddata
/_cat/fielddata/{fields}
/_cat/nodeattrs
/_cat/repositories
/_cat/snapshots/{repository}
/_cat/templates
/_cat/transforms
/_cat/transforms/{transform_id}

过滤索引查看消息是否成功存储在es,有的话代表配置成功。

---- 钢铁 648403020@qq.com 2021.08.20

参考鸣谢

官方kafka:http://kafka.apache.org/

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-08-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
ELK+FileBeat+Kafka分布式系统搭建图文教程
filebeat收集需要提取的日志文件,将日志文件转存到kafka集群中,logstash处理kafka日志,格式化处理,并将日志输出到elasticsearch中,前台页面通过kibana展示日志。
王知无-import_bigdata
2020/08/20
2K0
ELK+FileBeat+Kafka分布式系统搭建图文教程
ELK 日志分析系统整合 KafKa Zookeeper 集群
kafka/zookeeper 集群配置请参考公众号ELK专栏《KafKa 工作原理 && 集群部署(一)》的文章。
Kevin song
2020/04/27
1.1K0
数据监控ElasticStack全家桶之容器化部署
/usr/share/logstash/pipeline/logstash.conf
蒋老湿
2020/12/24
9530
数据监控ElasticStack全家桶之容器化部署
进击消息中间件系列(三):Kafka 中 shell 命令使用
注:partitions指定topic分区数,replication-factor指定topic每个分区的副本数。
民工哥
2023/08/22
5400
进击消息中间件系列(三):Kafka 中 shell 命令使用
ELK+filebeat+kafka+zooKeeper搭建(单机版)
关于elk的配置参考我之前的一篇文章,不在累述: elk安装地址: https://jjlu521016.github.io/2018/05/01/springboot-logback-log4j-elk.html#2-elk%E9%85%8D%E7%BD%AE
日薪月亿
2019/05/14
1.3K0
海量日志归集与分析:ELK集群搭建
ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana,也可以指elk技术栈,包含一系列的组件。
品茗IT
2019/09/12
1.9K0
logstash kafka filebeat zabbix
logstash 启动多个conf 文件进行日志处理时,默认不是每个配置文件独立运行,而是作为一个整体,每个input会匹配所有的filter,然后匹配所有的output,这时就会导致数据被错误的处理以及发送到错误的地方;利用tags字段进行字段匹配避免数据被错误的处理。
Kevin song
2021/03/08
1.2K0
logstash kafka filebeat zabbix
【日志架构】ELK Stack + Kafka 端到端练习
在前一章中,我们已经学习了如何从头到尾地配置ELK堆栈。这样的配置能够支持大多数用例。然而,对于一个无限扩展的生产环境,瓶颈仍然存在:
架构师研究会
2020/08/12
5380
【日志架构】ELK Stack + Kafka 端到端练习
Kafka 常用运维脚本
集群管理 (1)启动 broker $ bin/kafka-server-start.sh daemon <path>/server.properties (2)关闭 broker $ bin/kafka-server-stop.sh topic 管理 kafka-topics.sh 脚本 # 创建主题 $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 64 --replication-factor 3 --topi
张乘辉
2020/08/27
1.4K0
CentOS 7.6 部署ELK日志分析系统步骤
记录在CentOS 7.6下部署ELK日志分析系统的过程步骤,希望对大家有所帮助。
星哥玩云
2022/07/28
5070
不背锅运维:享一个具有高可用性和可伸缩性的ELK架构实战案例
在logstash01主机上配置logstash,使其能够消费kafka集群a中主题为"wordpress-nginx-log"的消息。
不背锅运维
2023/03/16
6540
不背锅运维:享一个具有高可用性和可伸缩性的ELK架构实战案例
kafka集群及与springboot集成
在#192.168.1.128服务器上生产者控制台输入:hello kafka进行测试
全栈程序员站长
2022/08/10
9280
kafka集群及与springboot集成
ELK+Kafka学习笔记之搭建ELK+Kafka日志收集系统集群
关于如何搭建ELK部分,请参考这篇文章,https://www.cnblogs.com/JetpropelledSnake/p/9893566.html。
Jetpropelledsnake21
2018/12/24
9K0
Apache Kafka-初体验Kafka(02)-Centos7下搭建单节点kafka_配置参数详解_基本命令实操
由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK.
小小工匠
2021/08/17
5380
2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS
参考:https://blog.csdn.net/m0_38139250/article/details/121155903
用户2225445
2022/11/12
3510
2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS
Kafka 常用工具脚本总结
JMX 的全称为 Java Management Extensions。顾名思义,是管理 Java 的一种扩展,通过 JMX 可以方便我们监控 Kafka 的内存,线程,CPU 的使用情况,以及生产和消费消息的指标。
Se7en258
2021/10/09
1.1K0
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
随着时间的积累,日志数据会越来越多,当你需要查看并分析庞杂的日志数据时,可通过 Filebeat+Kafka+Logstash+Elasticsearch 采集日志数据到Elasticsearch(简称ES)中,并通过 Kibana 进行可视化展示与分析。
高楼Zee
2021/09/23
2.1K0
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
Kafka的定义和安装与配置
定义:Kafka是一个基于zookeeper协调的分布式、多副本的(replica)、支持分区的(partition)系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写的项目。
Java廖志伟
2022/03/07
5850
Kafka的定义和安装与配置
docker安装ELK详细步骤
ELK主要由ElasticSearch、Logstash和Kibana三个开源软件组成。
小炜同学
2022/09/23
2.5K0
基于Kafka+ELK搭建海量日志平台
早在传统的单体应用时代,查看日志大都通过SSH客户端登服务器去看,使用较多的命令就是 less 或者 tail。如果服务部署了好几台,就要分别登录到这几台机器上看,等到了分布式和微服务架构流行时代,一个从APP或H5发起的请求除了需要登陆服务器去排查日志,往往还会经过MQ和RPC调用远程到了别的主机继续处理,开发人员定位问题可能还需要根据TraceID或者业务唯一主键去跟踪服务的链路日志,基于传统SSH方式登陆主机查看日志的方式就像图中排查线路的工人一样困难,线上服务器几十上百之多,出了问题难以快速响应,因此需要高效、实时的日志存储和检索平台,ELK就提供这样一套解决方案。
王知无-import_bigdata
2019/07/29
9.2K0
基于Kafka+ELK搭建海量日志平台
相关推荐
ELK+FileBeat+Kafka分布式系统搭建图文教程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验