大家好,又见面了,我是你们的朋友全栈君。
# -*- coding: utf-8 -*-
import pyshark
from scapy.all import *
import matplotlib.pyplot as plt
# 读取pcap文件
packets = pyshark.FileCapture("./net_package.pcap")
def protocal(packets):
"""
制作流量协议类型直方图
:param packets: 读取的pcap文件数据
"""
# 新建空字典
dict = {}
for packet in packets:
if packet.highest_layer not in dict.keys():
dict[packet.highest_layer] = 1
else:
dict[packet.highest_layer] += 1
# print(dict)
keys = dict.keys()
values = dict.values()
plt.figure(figsize=(8, 20), dpi=80)
plt.bar(keys, values)
plt.xticks(rotation=45)
plt.xlabel('protocal')
plt.ylabel('amount')
plt.title('the amounts of all protocals')
plt.show()
# print(proto_sum)
def graph_size(packets):
"""
作流量大小时序图
:param packets: 读取的pcap文件数据
"""
time_stamps = []
print("正在统计中。。。")
for packet in packets:
# print(int(float(packet.sniff_timestamp)))
time_stamps.append(int(float(packet.sniff_timestamp)))
# print(time_stamps)
print("统计完成!")
d = int(float(input("请输入时间间隔(单位:分钟):")) * 60)
# d = 30 #半分钟
num_bins = (max(time_stamps) - min(time_stamps)) // d
step = len(time_stamps) // num_bins
time_labels = [time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i)) for i in time_stamps[::step]]
# 新建20*8英寸图形,分辨率为80
plt.figure(figsize=(20, 8), dpi=80)
# X轴分布数据以及num_bins条柱状图
plt.hist(time_stamps, num_bins)
# 标签旋转角度45
plt.xticks(range(min(time_stamps), max(time_stamps) + d, d), time_labels, rotation=45)
# plt.xticks(range(min(time_stamps),max(time_stamps)+d,d),rotation = 45)
plt.xlabel("timestamp")
plt.ylabel("amount")
plt.title("amount of per " + str(d) + " s")
plt.show()
def filter(packets):
"""
显示过滤器
:param packets: 读取的pcap文件数据
"""
protocal = input("请输入协议类型:")
begin_time = input("请输入开始时间(Example:2019-09-09 10:58:42):")
end_time = input("请输入结束时间(Example:2019-09-09 11:40:00):")
length = int(input("请输入最大长度限制(单位:字节):"))
# time.strptime把固定格式时间转换为时间元组
array_begin_time = time.strptime(begin_time, "%Y-%m-%d %H:%M:%S")
# time.mktime把时间元组转换为以秒表示的时间
begin_time_stamp = float(time.mktime(array_begin_time))
# print("begin_time_stamp:"+str(begin_time_stamp))
array_end_time = time.strptime(end_time, "%Y-%m-%d %H:%M:%S")
end_time_stamp = float(time.mktime(array_end_time))
# print("end_time_stamp:"+str(end_time_stamp))
packlist = []
for packet in packets:
# sniff_timestamp获取开始嗅探的时间戳
time_stamp = float(packet.sniff_timestamp)
# 获取数据包的捕获长度
size = float(packet.captured_length)
if packet.highest_layer == protocal and time_stamp > begin_time_stamp and time_stamp < end_time_stamp and size <= length:
print(packet)
packlist.append(packet)
print("过滤出的数据包个数为 %s" % len(packlist))
# 调用函数进行操作
protocal(packets)
graph_size(packets)
filter(packets)
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/130923.html原文链接:https://javaforall.cn