Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >背景介绍

背景介绍

作者头像
钢铁知识库
发布于 2022-08-20 01:07:25
发布于 2022-08-20 01:07:25
72700
代码可运行
举报
文章被收录于专栏:python爬虫教程python爬虫教程
运行总次数:0
代码可运行

背景介绍

最近工作涉及几台新服务器的日志需要接入ELK系统,配置思路如下:

使用Filebeat收集本地日志数据,Filebeat监视日志目录或特定的日志文件,再发送到消息队列到kafka,然后logstash去获取消费,利用filter功能过滤分析,最终存储到elasticsearch中。

filebeat和flume都具有日志收集功能,不过filebeat更轻量,使用go语言编写占用资源更少,可以有很高的并发,带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化;flume使用java开发,需要安装java环境,相对会比较重。

当然两者也存在区别:Filebeat收集数据的速度大于写入速度的时候可能出现数据丢失的现象,而flume会在收集数据和写入数据之间做出调整,保证能在两者之间提供一种平稳的数据状态。可以实时的将分析数据并将数据保存在数据库或者其他系统中,不会出现数据丢失的现象。

以下仅记录配置过程及常见的几种排错命令,安装篇会独立一篇做详细介绍。

配置信息

filebeat配置

我是直接yum install filebeat一键安装的,这里不做具体讲解官网有详细介绍:

https://www.elastic.co/guide/en/beats/filebeat/current/index.html

安装完成后我们以配置采集/var/log/messages为例,配置如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# egrep -v '#|^$' /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
  hosts: ["10.114.102.30:9092", "10.114.102.31:9092", "10.114.102.32:9092", "10.114.102.33:9092", "10.114.102.34:9092"]
  topic: T621_messages
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

有几个参数需要注意的:

paths表示需要提取的日志的路径,将日志输出到kafka中,创建topic

  • required_acks

0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。

1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。

-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。 三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。

  • json.keys_under_root: true
  • json.add_error_key: true
  • json.message_key: log

这三行是识别json格式日志的配置,若日志格式不为json格式,需要注释掉,否则收集到的日志为filebeat的错误日志。

kafka配置

kafka原来已经安装并配置好了,这里不再说明具体安装过程,后续会出一篇ELK完整搭建过程。

这里不做重点讲解,可直接查官网:https://kafka.apache.org/documentation/#quickstart

因为有5台配合zookeeper做了集群,选其中一台配置如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# egrep -v '#|^$' /home/kafka/kafka/config/server.properties
broker.id=1		#按顺序写,不要乱
listeners=PLAINTEXT://0.0.0.0:9092		 #自己的ip
advertised.listeners=PLAINTEXT://10.114.102.30:9092
num.network.threads=24
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.114.102.30:2181,10.114.102.31:2181,10.114.102.32:2181,10.114.102.33:2181,10.114.102.34:2181
zookeeper.connection.timeout.ms=6000
auto.create.topics.enable=false
group.initial.rebalance.delay.ms=0

注意:每台服务器除broker.id需要修改之外,其他属性保持一致。

logstash配置

logstash安装也是直接参考官网就可以了

https://www.elastic.co/guide/en/logstash/7.x/index.html

不过有个地方要注意,kafka和logstash的版本兼容问题,以下是kafka使用的版本:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
find /home/kafka/kafka/libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'   
kafka_2.11-1.1.0.jar

通过查找rpm包可以看到logstash用的是7.8.0

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/home/cxhchusr/logstash-7.8.0.rpm

conf.d目录下配置消费messages的文件如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# egrep -v '#|^$' /etc/logstash/conf.d/T621_messages.conf 
input {
  kafka {
    bootstrap_servers => "10.114.102.30:9092,10.114.102.31:9092,10.114.102.32:9092,10.114.102.33:9092,10.114.102.34:9092"
    client_id => "T621_messages"
    group_id => "T621_messages"
    auto_offset_reset => "latest"
    consumer_threads => 10
    decorate_events => true
    topics => ["T621_messages"]
    decorate_events => true
    type => syslog
    }
}
filter{
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
  date {
    match => [ "logdate", "YYYY-MM-dd HH:mm:ss.SSS" ]
    target => "@timestamp"
    timezone =>"+00:00"
  }
  mutate{
    remove_field => "logdate"
  }
}
output {
  elasticsearch {
    hosts => ["10.114.102.30:9200", "10.114.102.31:9200", "10.114.102.32:9200", "10.114.102.33:9200", "10.114.102.34:9200"]
    index => "t621_messages-%{+YYYY.MM.dd}"
    user => caixun
    password => "******()90"
    }
}

注意:es索引需要全部为小写。

最后启动即可,并加入开机自启动/etc/rc.local

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ > /dev/null 2>&1 &

kibana创建索引

logstash配置完成后即可在kibana创建索引

创建完成效果如下,表示接入成功:

常用排查命令

配置过程中除了在kafka上创建topic,还需查询topic是否创建成功、消费情况、以及消息处理情况。

以及es是否正常入库并创建了索引。下面列出几个ELK运维常用命令。

kafka常用运维指令

  • 查询当前topic列表

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

  • 创建topic

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --partitions 10 --replication-factor 1 --topic T621_messages

  • topic描述(某topic详细信息)

/home/kafka/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic T621_messages

  • topic消费情况(测试消息是否正常生产)

发送:/home/kafka/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic T621_messages

接收:/home/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic T621_messages --from-beginning

  • 查看topic堆积情况

/home/kafka/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list ##查看组列表

/home/kafka/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group T621_messages ##偏移量

/data/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic T621_messages --from-beginning

ES常用运维指令

  • 查看ES集群健康情况

curl -u caixxx:"CAIxxx()90" '10.114.102.30:9200/_cluster/health'

  • 查看索引存储情况

curl -u caixxx:"CAIxxx()90" '10.114.102.30:9200/_cat/indices?v'

  • 查看帮助命令
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# curl -u caixun:"CAIXUN()90" '10.114.102.30:9200/_cat'
=^.^=
/_cat/allocation
/_cat/shards
/_cat/shards/{index}
/_cat/master
/_cat/nodes
/_cat/tasks
/_cat/indices
/_cat/indices/{index}
/_cat/segments
/_cat/segments/{index}
/_cat/count
/_cat/count/{index}
/_cat/recovery
/_cat/recovery/{index}
/_cat/health
/_cat/pending_tasks
/_cat/aliases
/_cat/aliases/{alias}
/_cat/thread_pool
/_cat/thread_pool/{thread_pools}
/_cat/plugins
/_cat/fielddata
/_cat/fielddata/{fields}
/_cat/nodeattrs
/_cat/repositories
/_cat/snapshots/{repository}
/_cat/templates
/_cat/transforms
/_cat/transforms/{transform_id}

过滤索引查看消息是否成功存储在es,有的话代表配置成功。

---- 钢铁 648403020@qq.com 2021.08.20

参考鸣谢

官方kafka:http://kafka.apache.org/

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-08-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
二叉树经典题题解(超全题目)(力扣)
题目链接:https://leetcode.cn/problems/binary-tree-preorder-traversal/
用户11039529
2024/03/25
3290
二叉树:看看这些树的最大深度
「精简之后的代码根本看不出是哪种遍历方式,也看不出递归三部曲的步骤,所以如果对二叉树的操作还不熟练,尽量不要直接照着精简代码来学。」
代码随想录
2020/10/10
1.6K0
二叉树:看看这些树的最大深度
【代码随想录】二刷-二叉树
二叉树中章节中,相对于迭代,递归有时候会更好理解,部分题用到了马上要刷的回溯算法。
半生瓜的blog
2023/05/13
8500
【代码随想录】二刷-二叉树
九十五、二叉树的递归和非递归的遍历算法模板
刷Leetcode,需要知道一定的算法模板,本次先总结下二叉树的递归和非递归的遍历算法模板。
润森
2022/08/18
4610
二叉树:我有多少个节点?
如果之前两篇二叉树:看看这些树的最大深度, 二叉树:看看这些树的最小深度都认真看了的话,这道题目可以分分钟刷掉了,愉快过节!
代码随想录
2020/10/10
1.2K0
二叉树:我有多少个节点?
二叉树:看看这些树的最小深度
遍历顺序上依然是后序遍历(因为要比较递归返回之后的结果),但在处理中间节点的逻辑上,最大深度很容易理解,最小深度可有一个误区,如图:
代码随想录
2020/10/10
5410
二叉树:看看这些树的最小深度
二叉树问题(三)-LeetCode 669、951、662、199、538、236(中序,层次遍历)
给定一个二叉搜索树,同时给定最小边界L 和最大边界 R。通过修剪二叉搜索树,使得所有节点的值在[L, R]中 (R>=L) 。你可能需要改变树的根节点,所以结果应当返回修剪好的二叉搜索树的新的根节点。
算法工程师之路
2019/12/24
6380
剑指offer 把二叉树打印成多行
题目描述 从上到下按层打印二叉树,同一层结点从左至右输出。每一层输出一行。 【思路】使用队列实现二叉树的层次遍历。 /* struct TreeNode { int val; struct TreeNode *left; struct TreeNode *right; TreeNode(int x) : val(x), left(NULL), right(NULL) { } }; */ class Solution { public:
week
2019/04/01
2730
二叉树:你真的会翻转二叉树么?
这道题目背后有一个让程序员心酸的故事,听说 Homebrew的作者Max Howell,就是因为没在白板上写出翻转二叉树,最后被Google拒绝了。(真假不做判断,权当一个乐子哈)
代码随想录
2020/10/10
1.7K0
二叉树:你真的会翻转二叉树么?
二叉树的最大深度,最小深度两种解法(C++)
若想看更详细的二叉树相关题目,请移步:二叉树经典题题解(超全题目)(力扣)-CSDN博客
用户11039529
2024/03/25
1180
二叉树构建与遍历-LeetCode 103、108、109(二叉树的构建,层次遍历)
这道题目依然是层次遍历的应用,与剑指Offer中的"之字形打印二叉树"是一样的,根据层次遍历的思路,可以将每一层压入到数组中,当层数为奇数的话,从而将该层的数据压入数组中,并进行反转!如果是偶数的话,则保持不变!
算法工程师之路
2019/11/04
5270
N叉树问题-LeetCode 429、589、590、105、106(构建二叉树,N叉树遍历)
给定一个 N 叉树,返回其节点值的层序遍历。(即从左到右,逐层遍历)。 例如,给定一个 3叉树 :
算法工程师之路
2019/11/26
1.2K0
N叉树问题-LeetCode 429、589、590、105、106(构建二叉树,N叉树遍历)
69. 二叉树的层次遍历层次遍历+queue
给出一棵二叉树,返回其节点值的层次遍历(逐层从左往右访问) 样例 给一棵二叉树 {3,9,20,#,#,15,7} :
和蔼的zhxing
2018/09/04
1K0
二叉树:我的左下角的值是多少?
本地要找出树的最后一行找到最左边的值。此时大家应该想起用层序遍历是非常简单的了,反而用递归的话会比较难一点。
代码随想录
2020/10/10
4410
二叉树:我的左下角的值是多少?
LeetCode | 102.二叉树的层次遍历
上面的题就是 二叉树的层次遍历 题目的截图,同时 LeetCode 会根据选择的语言给出一个类的定义或者函数的定义,然后在其中实现 二叉树的层次遍历 的解题过程。这次我使用 C++ 语言来进行完成。
码农UP2U
2020/08/26
4660
二叉树及其三种遍历[通俗易懂]
<3>.若二叉树按照从上到下从左到右依次编号,则若某节点编号为k,则其左右子树根节点编号分别为2k和2k+1;
全栈程序员站长
2022/08/23
1.1K0
二叉树及其三种遍历[通俗易懂]
70. 二叉树的层次遍历 II借助stack
给出一棵二叉树,返回其节点值从底向上的层次序遍历(按从叶节点所在层到根节点所在的层遍历,然后逐层从左往右遍历)。 原题见这里 这是另外一个题的扩展,见69题,这个题目要求层次遍历二叉树,并且把每一层放入一个vector,整体返回一个vector。 这个题要求是最底层的放在最前面。
和蔼的zhxing
2018/09/04
4660
70. 二叉树的层次遍历 II借助stack
LeetCode | 107.二叉树的层次遍历2
这次来写一下 LeetCode 的第 107 题,二叉树的层次遍历2。
码农UP2U
2020/10/29
3380
LeetCode | 107.二叉树的层次遍历2
【算法】使数组有序的最小交换次数
相关参考: 数组排序 使得交换次数最少 ,该文章中代码出现了一处错误,看起来作者好像很长时间没有更新了,在此纠正下。 TsReaper-6235. 逐层排序二叉树所需的最少操作数目,参考该题解的评论区的作者解答,进行纠正。 贪心思想,每一步使得对应元素放到它该放的位置。 先将要排序的数组复制一份,然后将其排序,使用哈希表记录排序后的数组对应元素与其对应下标。 遍历原数组与排序后的数组,如果对应下标不相等,则根据哈希表记录该元素的下标进行交换。 int getMinSwap(vect
半生瓜的blog
2023/05/13
5070
【算法】使数组有序的最小交换次数
二叉树常见算法总结和C++实现
DFS深度搜索(从上到下)和分治法区别:前者一般将最终结果通过引用参数传入,或者一般递归返回结果最终合并
evenleo
2020/08/21
1K0
推荐阅读
相关推荐
二叉树经典题题解(超全题目)(力扣)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验