
作者:HOS(安全风信子) 日期:2026-01-09 来源平台:GitHub 摘要: 类别不平衡是机器学习中的常见问题,在安全领域尤为突出。稀有攻击检测、异常行为识别等场景中,正样本(攻击样本)往往只占总样本的极小比例,导致传统机器学习模型倾向于预测多数类,性能严重下降。本文从安全视角出发,深入探讨类别不平衡问题的系统性解决方案,包括数据层面、算法层面、模型层面和系统层面的多种技术。通过分析最新的研究进展和工业实践,结合实际代码案例,展示如何构建高效的不平衡学习系统,提高稀有攻击的检测率。文章重点讨论了安全领域中类别不平衡的特点、基于SMOTE的高级数据增强技术、动态加权损失函数、集成学习方法以及对抗性不平衡学习,为读者提供了一套完整的安全机器学习不平衡问题解决方案。
在安全领域,类别不平衡问题非常普遍。例如:
传统的机器学习模型假设各类别样本数量大致均衡,在不平衡数据上表现不佳。模型往往倾向于预测多数类,导致少数类(攻击样本)的召回率极低,无法满足安全需求。
最新研究表明,超过70%的安全机器学习项目面临类别不平衡问题,而超过50%的项目因不平衡问题导致模型性能不达标。因此,解决类别不平衡问题是安全机器学习工程化的核心挑战之一。
当前,类别不平衡学习领域正呈现出以下几个重要趋势:
安全领域的类别不平衡具有以下特点:
传统的SMOTE(合成少数类过采样技术)在处理复杂数据时效果有限。本文提出基于深度学习的数据增强技术,包括:
传统的加权损失函数通常采用固定权重,无法适应动态变化的不平衡数据。本文提出动态加权损失函数,包括:
攻击者可能主动攻击模型,导致其在少数类上表现不佳。本文提出对抗性不平衡学习框架,包括:
类别不平衡解决方案可以分为以下几个层面:
层面 | 主要技术 | 代表方法 | 适用场景 |
|---|---|---|---|
数据层面 | 重采样 | 过采样(SMOTE)、欠采样(Tomek Links) | 数据量较小的场景 |
算法层面 | 损失函数调整 | 加权损失、Focal Loss | 各种场景,尤其是深度学习 |
模型层面 | 结构调整 | 注意力机制、对抗训练 | 深度学习场景 |
集成层面 | 集成学习 | Bagging、Boosting | 大规模数据场景 |
系统层面 | 多模型融合 | 规则+ML、多模型投票 | 生产环境部署 |
数据层面的解决方案主要通过调整样本分布来解决类别不平衡问题,包括过采样和欠采样两种方法。
过采样是通过增加少数类样本数量来平衡数据分布,常用的方法包括:
下面是SMOTE算法的实现示例:
import numpy as np
from sklearn.neighbors import NearestNeighbors
class SMOTE:
def __init__(self, k_neighbors=5, sampling_strategy='auto'):
self.k_neighbors = k_neighbors
self.sampling_strategy = sampling_strategy
self.nn = None
def fit_resample(self, X, y):
"""
执行SMOTE算法,生成平衡数据集
X: 特征矩阵,形状为(n_samples, n_features)
y: 标签向量,形状为(n_samples,)
返回: (X_resampled, y_resampled),平衡后的特征矩阵和标签向量
"""
# 确定少数类和多数类
classes, counts = np.unique(y, return_counts=True)
minority_class = classes[np.argmin(counts)]
majority_class = classes[np.argmax(counts)]
# 计算需要生成的少数类样本数量
if self.sampling_strategy == 'auto':
n_minority = counts[np.argmin(counts)]
n_majority = counts[np.argmax(counts)]
n_samples_to_generate = n_majority - n_minority
else:
n_samples_to_generate = int(self.sampling_strategy * len(X))
# 获取少数类样本
X_minority = X[y == minority_class]
y_minority = y[y == minority_class]
# 训练KNN模型
self.nn = NearestNeighbors(n_neighbors=self.k_neighbors)
self.nn.fit(X_minority)
# 生成合成样本
synthetic_samples = []
for _ in range(n_samples_to_generate):
# 随机选择一个少数类样本
idx = np.random.randint(0, len(X_minority))
sample = X_minority[idx]
# 找到其K个最近邻
distances, indices = self.nn.kneighbors([sample], n_neighbors=self.k_neighbors+1)
# 排除样本自身
indices = indices[0][1:]
# 随机选择一个最近邻
neighbor_idx = np.random.choice(indices)
neighbor = X_minority[neighbor_idx]
# 生成合成样本
alpha = np.random.random()
synthetic_sample = sample + alpha * (neighbor - sample)
synthetic_samples.append(synthetic_sample)
# 转换为numpy数组
synthetic_samples = np.array(synthetic_samples)
synthetic_labels = np.full(len(synthetic_samples), minority_class)
# 合并原始数据和合成数据
X_resampled = np.vstack([X, synthetic_samples])
y_resampled = np.hstack([y, synthetic_labels])
return X_resampled, y_resampled
# 示例用法
if __name__ == "__main__":
# 生成不平衡数据
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, n_classes=2, weights=[0.9, 0.1],
random_state=42)
print(f"原始数据形状: X={X.shape}, y={y.shape}")
print(f"原始数据类别分布: {np.bincount(y)}")
# 使用SMOTE生成平衡数据
smote = SMOTE(k_neighbors=5, sampling_strategy='auto')
X_resampled, y_resampled = smote.fit_resample(X, y)
print(f"平衡数据形状: X={X_resampled.shape}, y={y_resampled.shape}")
print(f"平衡数据类别分布: {np.bincount(y_resampled)}")这段代码实现了SMOTE算法的核心逻辑,包括:
欠采样是通过减少多数类样本数量来平衡数据分布,常用的方法包括:
算法层面的解决方案主要通过调整损失函数或学习策略来解决类别不平衡问题,包括加权损失、Focal Loss等。
加权损失函数通过为不同类别分配不同的权重,使模型更加关注少数类。常用的加权损失函数包括:
下面是加权交叉熵损失和Focal Loss的实现示例:
import torch
import torch.nn as nn
import torch.nn.functional as F
class WeightedCrossEntropyLoss(nn.Module):
"""
加权交叉熵损失
weight: 类别权重,形状为(n_classes,)
"""
def __init__(self, weight=None):
super(WeightedCrossEntropyLoss, self).__init__()
self.weight = weight
def forward(self, input, target):
return F.cross_entropy(input, target, weight=self.weight)
class FocalLoss(nn.Module):
"""
Focal Loss
alpha: 类别权重,形状为(n_classes,)
gamma: 调制因子,默认2
"""
def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
super(FocalLoss, self).__init__()
self.alpha = alpha
self.gamma = gamma
self.reduction = reduction
def forward(self, input, target):
# 计算交叉熵损失
ce_loss = F.cross_entropy(input, target, weight=self.alpha, reduction='none')
# 计算预测概率
p_t = torch.exp(-ce_loss)
# 计算Focal Loss
loss = (1 - p_t) ** self.gamma * ce_loss
# 应用reduction
if self.reduction == 'mean':
return loss.mean()
elif self.reduction == 'sum':
return loss.sum()
else:
return loss
# 示例用法
if __name__ == "__main__":
# 生成示例数据
input = torch.randn(10, 2, requires_grad=True)
target = torch.randint(0, 2, (10,))
print(f"输入形状: {input.shape}")
print(f"目标标签: {target}")
# 计算普通交叉熵损失
ce_loss = F.cross_entropy(input, target)
print(f"普通交叉熵损失: {ce_loss.item()}")
# 计算加权交叉熵损失
# 假设类别0为多数类,类别1为少数类,分配更高权重
weight = torch.tensor([1.0, 5.0])
weighted_ce_loss = WeightedCrossEntropyLoss(weight=weight)
loss = weighted_ce_loss(input, target)
print(f"加权交叉熵损失: {loss.item()}")
# 计算Focal Loss
focal_loss = FocalLoss(alpha=weight, gamma=2.0)
loss = focal_loss(input, target)
print(f"Focal Loss: {loss.item()}")这段代码实现了加权交叉熵损失和Focal Loss,包括:
集成学习通过组合多个模型来提高整体性能,在不平衡数据上表现良好。常用的集成学习方法包括:
模型层面的解决方案主要通过调整模型结构来提高对少数类的关注,包括注意力机制、对抗训练等。
注意力机制可以使模型更加关注少数类样本,提高其识别能力。常用的注意力机制包括:
下面是通道注意力机制的实现示例:
import torch
import torch.nn as nn
import torch.nn.functional as F
class ChannelAttention(nn.Module):
"""
通道注意力模块
in_channels: 输入通道数
ratio: 压缩比率,默认16
"""
def __init__(self, in_channels, ratio=16):
super(ChannelAttention, self).__init__()
# 全局平均池化
self.avg_pool = nn.AdaptiveAvgPool2d(1)
# 全局最大池化
self.max_pool = nn.AdaptiveMaxPool2d(1)
# 共享MLP
self.mlp = nn.Sequential(
nn.Conv2d(in_channels, in_channels // ratio, 1, bias=False),
nn.ReLU(),
nn.Conv2d(in_channels // ratio, in_channels, 1, bias=False)
)
# Sigmoid激活
self.sigmoid = nn.Sigmoid()
def forward(self, x):
# 计算平均池化特征
avg_out = self.mlp(self.avg_pool(x))
# 计算最大池化特征
max_out = self.mlp(self.max_pool(x))
# 相加并经过Sigmoid激活
out = self.sigmoid(avg_out + max_out)
# 与输入特征相乘
return x * out
class ResNetBlock(nn.Module):
"""
带有通道注意力的ResNet块
in_channels: 输入通道数
out_channels: 输出通道数
stride: 步长,默认1
"""
def __init__(self, in_channels, out_channels, stride=1):
super(ResNetBlock, self).__init__()
# 主分支
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
# 通道注意力
self.ca = ChannelAttention(out_channels)
# 短路连接
self.shortcut = nn.Sequential()
if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
out = self.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out = self.ca(out) # 应用通道注意力
out += self.shortcut(x)
out = self.relu(out)
return out
# 示例用法
if __name__ == "__main__":
# 生成示例数据
x = torch.randn(1, 64, 32, 32)
print(f"输入形状: {x.shape}")
# 创建通道注意力模块
ca = ChannelAttention(in_channels=64)
out = ca(x)
print(f"通道注意力输出形状: {out.shape}")
# 创建带有通道注意力的ResNet块
res_block = ResNetBlock(in_channels=64, out_channels=128, stride=2)
out = res_block(x)
print(f"ResNet块输出形状: {out.shape}")这段代码实现了通道注意力机制,包括:
对抗训练通过生成对抗样本,增强模型的鲁棒性,在不平衡数据上表现良好。常用的对抗训练方法包括:
系统层面的解决方案主要通过多模型融合和业务规则集成来解决类别不平衡问题,包括:
方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
随机过采样 | 简单直观,易于实现 | 容易导致过拟合 | 少数类样本数量极少的场景 |
SMOTE | 生成合成样本,避免过拟合 | 合成样本质量有限,可能生成噪声 | 中等不平衡场景 |
ADASYN | 自适应生成样本,关注边界样本 | 计算复杂度较高 | 边界样本重要的场景 |
Borderline-SMOTE | 只对边界样本过采样,提高样本质量 | 可能丢失一些重要的内部样本 | 类间重叠严重的场景 |
GAN增强 | 生成高质量合成样本 | 训练复杂,需要大量计算资源 | 数据量较大的场景 |
随机欠采样 | 简单直观,计算效率高 | 容易丢失重要信息 | 多数类样本数量极大的场景 |
Tomek Links | 提高类间分离度 | 可能删除有用样本 | 类间重叠严重的场景 |
ENN | 减少噪声样本 | 可能删除有用的少数类样本 | 噪声较多的场景 |
NearMiss | 保留重要的多数类样本 | 计算复杂度较高 | 多数类样本分布不均的场景 |
方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
加权交叉熵 | 简单直观,易于实现 | 权重需要手动调整 | 各种场景,尤其是深度学习 |
Focal Loss | 自动调整样本权重,关注难分类样本 | gamma参数需要调整 | 极度不平衡场景 |
GHM Loss | 基于梯度密度调整权重,平衡不同难度样本 | 计算复杂度较高 | 样本难度差异较大的场景 |
Bagging | 减少方差,提高泛化能力 | 计算效率低 | 大规模数据场景 |
Boosting | 减少偏差,提高少数类召回率 | 容易过拟合 | 中小规模数据场景 |
Balanced Random Forest | 平衡每个决策树的样本分布 | 计算效率低 | 结构化数据场景 |
方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
通道注意力 | 增强模型对重要通道的关注 | 增加模型复杂度 | 深度学习场景 |
空间注意力 | 增强模型对重要区域的关注 | 增加模型复杂度 | 图像和视频分析场景 |
自注意力 | 增强样本间的相关性 | 计算复杂度高 | 序列数据场景 |
对抗训练 | 提高模型的鲁棒性和泛化能力 | 增加训练时间 | 对抗性环境 |
动态加权 | 自适应调整样本权重 | 需要额外的计算资源 | 动态变化的不平衡场景 |
类别不平衡解决方案在安全领域具有重要的工程意义:
类别不平衡解决方案也存在一些潜在风险:
类别不平衡解决方案还存在一些局限性:
未来,自适应不平衡学习将成为研究热点,包括:
联邦学习场景下的不平衡处理将成为重要研究方向,包括:
生成式模型在不平衡学习中的应用将更加广泛,包括:
可解释性将成为不平衡学习的重要研究方向,包括:
对抗性不平衡学习将成为未来的重要研究方向,包括:
指标 | 计算公式 | 含义 |
|---|---|---|
不平衡比(IR) | IR = 多数类样本数 / 少数类样本数 | 衡量数据不平衡程度 |
G-mean | G-mean = √(TPR × TNR) | 平衡精确率和召回率 |
F1-score | F1 = 2 × (精确率 × 召回率) / (精确率 + 召回率) | 平衡精确率和召回率 |
AUC | 曲线下面积 | 衡量模型的排序能力 |
PR曲线 | 精确率-召回率曲线 | 评估不平衡数据上的模型性能 |
# 安装必要的依赖
pip install numpy pandas scikit-learn torch torchvision matplotlib seaborn
# 安装不平衡学习库
pip install imbalanced-learn
# 安装对抗训练库
pip install robustbenchfrom sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from imblearn.ensemble import BalancedRandomForestClassifier, EasyEnsembleClassifier
# 生成不平衡数据
X, y = make_classification(n_samples=10000, n_features=20, n_informative=15,
n_redundant=5, n_classes=2, weights=[0.99, 0.01],
random_state=42)
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
print(f"训练集类别分布: {dict(zip(*np.unique(y_train, return_counts=True)))}")
print(f"测试集类别分布: {dict(zip(*np.unique(y_test, return_counts=True)))}")
# 定义模型列表
models = {
"Random Forest": RandomForestClassifier(random_state=42),
"Balanced Random Forest": BalancedRandomForestClassifier(random_state=42),
"AdaBoost": AdaBoostClassifier(random_state=42),
"Gradient Boosting": GradientBoostingClassifier(random_state=42),
"EasyEnsemble": EasyEnsembleClassifier(random_state=42)
}
# 训练和评估模型
results = {}
for name, model in models.items():
print(f"\n训练模型: {name}")
model.fit(X_train, y_train)
# 预测
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
# 评估
report = classification_report(y_test, y_pred, target_names=['正常', '攻击'])
auc = roc_auc_score(y_test, y_pred_proba)
cm = confusion_matrix(y_test, y_pred)
results[name] = {
'report': report,
'auc': auc,
'confusion_matrix': cm
}
print(f"AUC: {auc:.4f}")
print("分类报告:")
print(report)
print("混淆矩阵:")
print(cm)
# 比较不同模型的AUC
print("\n各模型AUC比较:")
for name, result in results.items():
print(f"{name}: {result['auc']:.4f}")这段代码实现了多种集成学习方法在不平衡数据上的应用,包括:
通过比较不同模型的AUC和分类报告,可以选择最适合特定场景的集成学习方法。
类别不平衡, 稀有攻击检测, SMOTE, 加权损失, Focal Loss, 集成学习, 注意力机制, 对抗训练, 安全机器学习, 不平衡学习