一、日志收集及告警项目背景
近来安全测试项目较少,想着把安全设备、nginx日志收集起来并告警, 话不多说,直接说重点,搭建背景:
1. 日志源:安全设备日志(Imperva WAF、绿盟WAF、paloalto防火墙)、nginx日志等;
2. 日志分析开源软件:ELK,告警插件:Sentinl 或elastalert,告警方式:钉钉和邮件;
3. 安全设备日志->logstash->es,nginx日志由于其他部门已有一份(flume->kafka)我们通过kafka->logstash->es再输出一份,其中logstash的正则过滤规则需要配置正确,不然比较消耗性能,建议写之前使用grokdebug先测试好再放入配置文件;
4. 搭建系统:centos 7 , JDK 1.8, Python 2.7
5. ELK统一版本为5.5.2
Es:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz
Kibana: https://artifacts.elastic.co/downloads/kibana/kibana-5.5.2-linux-x86_64.tar.gz
Logstash: https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.tar.gz
测试链接Grokdebug:http://grokdebug.herokuapp.com
二、安全设备日志收集
2.1 Imperva WAF配置
策略->操作集->新增日志告警规则
我这边没有用设备自身的一些日志规则,而是根据手册自定义了一些需要的日志字段,可在自定义策略的消息填入如下字段:
参数说明:告警开始时间、告警ID、事件ID、事情数量、告警级别…. HTTP返回码、触发告警字符串、响应动作、响应时间、响应大小、http包头的值,中间省略的部分请自行查看手册。其实Imperva WAF的总日志字段数不少于一两百个,单从这一点可以看出确实好于国产WAF太多。
针对Imperva WAF的logstash配置如下:
说明:其中geoip为elk自带插件,可以解析ip归属地,比如ip归属的国家及城市,在仪表盘配置“地图炮”装X、查看攻击源地理位置的时候有点用,
2.2 绿盟WAF配置
日志报表->日志管理配置->Syslog配置&日志发生参数
针对绿盟WAF的logstash配置如下:
input和output参照imperva waf,贴出最要的grok部分,如下:
2.3 paloalto防火墙配置
(6.1版本,其他版本可能会有点差异)
新建syslog服务器->日志转发,具体看截图
针对PA的logstash配置如下:
input和output参照imperva waf,贴出最要的grok部分,如下:
贴两张最终的效果图:
三、Nginx日志收集
由于nginx日志已经被其他大数据部门收集过一遍了,为避免重复读取,我们从其他部门的kafka拉取过来即可,这里说一下nginx收集的方式,flume->kafka 示例配置方式如下:
Flume配置如下
配置扫描日志文件
log_analysis_test.conf配置文件
a1.sources=s1 #可以理解为输入端,定义名称为s1
a1.channels=c1 #传输频道,类似队列,定义为c1,设置为内存模式
a1.sinks=k1 #可以理解为输出端,定义为sk1
#source配置
a1.sources.s1.command=tail -F/data/log/nginx/crf_crm.access.log
#channel配置
kafka配置
kafka下载
配置zookeeper,根据机器状况更改jvm 内存设置
vimbin/zookeeper-server-start.sh
配置kafka
启动zookeeper
启动kafka
创建应用服务器的topic
分区及副本以自己的情况而定
查看Topic是否创建成功
启动kafka生产者
启动kafka消费者
logstash 配置文件
四、安全告警
安全告警可以是Sentinl或 elastalert,Sentinl是kibana插件,可以集成到kibana内图形化展示,但是写规则时需要对JS较熟悉,elastalert 是es的插件,不支持集成到kibana界面进行图形化展示。下面分别对这2个插件进行安装及配置说明。
4.1 Sentinl
安装如下:
5.4sentinl-v5.4.1.zip
安装后,以IP请求频次告警设置为例:
每2分钟去es查询一次
针对需要监控的IP字段
Input的body内容即es查询语法
编写过滤条件
如果想对状态码做监控,参考如下:
Action里面添加钉钉群机器人的webhook
钉钉报警如下
钉钉的接口文档链接:
https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.karFPe&treeId=257&articleId=105735&docType=1
邮件告警设置如下:
4.2 elastalert
前置条件:
JDK 1.8
python 2.7
easy_install -U setuptools (最新setuptools-39.2.0)
yum install gcc
yum install python-devel
yum install libffi-devel
安装
下载https://github.com/xuyaoqiang/elastalert-dingtalk-plugin
elastalert-dingtalk-plugin中的elastalert_modules复制到elastalert下
创建索引
输入 es的IP 及 es的端口,其余根据自己的环境写,一般默认即可
配置config.yaml,以下为关键配置信息:
rules_folder: example_rules #指定rule的目录
run_every:
minutes: 1 #每一分钟去探测一次
buffer_time:
minutes: 15 #缓存15分钟
es_host: x.x.x.x
es_port: 9200
writeback_index: elastalert_status #创建的告警索引
alert_time_limit:
days: 2 #失败重试的时间限制
下面先以钉钉告警为例:
在example_rules里面新建钉钉告警配置文件,内容如下
es_host: x.x.x.x
es_port: 9200
name: xxx安全告警
type: cardinality
index: nsfocuswaf_syslog
cardinality_field: src_ip
max_cardinality: 30
timeframe:
minutes: 5 #单IP 5分钟内访问超过30次就会告警
aggregation_key: src_ip
summary_table_fields:
src_ip
dst_ip
dst_port
Attack_types
stat_time
alert:
-“elastalert_modules.dingtalk_alert.DingTalkAlerter”
dingtalk_webhook: “your webhook”
dingtalk_msgtype: text
群机器人的配置比较简单,自己搜索一下即可
注意如果告警匹配了N条,却只发出1条告警,修改elastalert.py代码,在856行后面增加如下代码:
修改dingtalk_alert.py的代码,增加如下内容:
如果没有过滤除自定义的字段,只有message字段的话,可以新增如下代码:
最终效果图如下:
邮件的rule配置文件如下:
邮件告警最终效果如下:
五、总结
相关进程运行命令如下:
后台启动es
后台启动logstash
后台启动kibana
启动flume
启动kafka
后台启动elastalert
常见的告警策略除了来自安全设备的正则之外,大量的IP请求、错误状态码、nginx的request请求中包含的特征码也都是常见的告警规则。
参考链接:
http://www.freebuf.com/articles/others-articles/161905.html
http://www.freebuf.com/articles/web/160254.html
*本文原创作者:hackerbaba,本文属FreeBuf原创奖励计划,未经许可禁止转载
领取专属 10元无门槛券
私享最新 技术干货