
作者:HOS(安全风信子) 日期:2026-01-09 来源平台:GitHub 摘要: 数据泄露(Data Leakage)是机器学习中常被忽视但危害巨大的问题,尤其在安全场景下,数据泄露可能导致模型在训练集上表现优异,但在实际部署时完全失效,甚至泄露敏感信息。本文深入解析数据泄露的定义、分类及其在安全pipeline中的常见隐蔽形式,包括特征泄露、时间泄露、标签泄露等。结合最新GitHub开源项目和安全实践,提供3个完整代码示例、2个Mermaid架构图和2个对比表格,系统阐述安全场景下的数据泄露检测与防范策略。文章将帮助安全工程师识别和避免数据泄露,构建更可靠的安全机器学习模型。
数据泄露是指模型在训练过程中意外获取了测试集信息,导致模型在测试集上表现优异,但在真实环境中泛化能力差。在安全场景下,数据泄露的危害尤为严重:
在安全攻防场景下,数据泄露面临以下特殊挑战:
根据GitHub上的最新项目和arXiv研究论文,安全领域的数据泄露研究呈现以下热点:
数据泄露可分为以下几类:
| 泄露类型 | 定义 | 常见原因 | 安全影响 |
|---|---|---|---|
特征泄露 | 特征包含测试集信息 | 特征提取时使用了未来数据,或特征与标签高度相关 | 模型在测试集上表现优异,但真实环境中失效 |
时间泄露 | 训练数据包含未来信息 | 交叉验证时未考虑时间顺序,或特征包含未来时间戳 | 安全模型无法检测实时攻击 |
标签泄露 | 标签信息意外泄露到特征中 | 特征工程时使用了标签信息,或数据清洗过程中泄露 | 模型过度依赖泄露的标签信息,泛化能力差 |
测试集污染 | 测试集数据意外流入训练集 | 数据划分错误,或交叉验证时数据泄露 | 模型评估结果不可靠 |
跨样本泄露 | 样本间信息泄露 | 批量处理时泄露样本间关系,或使用了全局统计信息 | 模型无法处理独立样本,泛化能力差 |
安全机器学习pipeline中的常见数据泄露点包括:
检测和防范数据泄露的核心原则包括:
Mermaid流程图:

Mermaid架构图:
(注:原 Mermaid 架构图渲染失败——第 57 行的 style 语句使用了无效颜色值 `#FF450`,十六进制颜色应为 6 位,例如 `#FF4500`,修正后即可正常渲染。)
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.feature_selection import SelectKBest, f_classif
# Build a synthetic security dataset that deliberately contains a leaky feature.
def generate_security_data(n_samples=1000):
    """Return an imbalanced binary dataset plus a timestamp column and a
    'leaky_feature' column holding the *next* sample's label.

    The leaky column makes the target almost perfectly predictable, which
    the later demo code uses to illustrate feature leakage.
    """
    base, labels = make_classification(
        n_samples=n_samples, n_features=20, n_informative=10,
        n_redundant=5, n_classes=2, weights=[0.95, 0.05], random_state=42)
    # Monotonic timestamp column: one tick per sample.
    ticks = np.arange(n_samples).reshape(-1, 1)
    # Leaky column: each row carries the label of the following row.
    leak = np.roll(labels, -1).reshape(-1, 1)
    leak[-1] = labels[-1]  # last row has no successor; reuse its own label
    full = np.hstack([base, ticks, leak])
    names = [f'feature_{i}' for i in range(full.shape[1] - 2)]
    names += ['timestamp', 'leaky_feature']
    return pd.DataFrame(full, columns=names), pd.Series(labels, name='label')
# Materialize the demo dataset.
X, y = generate_security_data(n_samples=1000)
# Flag features that alone predict the label almost as well as the full model.
def detect_feature_leakage(X, y, test_size=0.2):
    """Screen every column for feature leakage.

    Each column is used on its own to fit a logistic regression; a column
    whose single-feature F1 exceeds 0.9 AND reaches at least 90% of the
    full-model F1 is reported as suspicious.

    Args:
        X: feature DataFrame.
        y: label Series aligned with X.
        test_size: held-out fraction for evaluation.

    Returns:
        normal_score: F1 of the model trained on all features.
        suspicious_features: list of (feature_name, f1_score) pairs.
    """
    # Baseline: all features together.
    X_tr, X_te, y_tr, y_te = train_test_split(
        X, y, test_size=test_size, random_state=42)
    full_clf = LogisticRegression(random_state=42, max_iter=1000)
    full_clf.fit(X_tr, y_tr)
    normal_score = f1_score(y_te, full_clf.predict(X_te))
    print(f"正常划分 - F1分数: {normal_score:.4f}")
    # Probe each feature in isolation.
    suspicious_features = []
    for feature in X.columns:
        solo = X[[feature]]
        s_tr, s_te, sy_tr, sy_te = train_test_split(
            solo, y, test_size=test_size, random_state=42)
        solo_clf = LogisticRegression(random_state=42, max_iter=1000)
        solo_clf.fit(s_tr, sy_tr)
        single_score = f1_score(sy_te, solo_clf.predict(s_te))
        # One feature nearly matching the full model is a leakage red flag.
        if single_score > 0.9 and single_score > normal_score * 0.9:
            suspicious_features.append((feature, single_score))
            print(f"特征 '{feature}' 可能存在泄露 - F1分数: {single_score:.4f}")
    return normal_score, suspicious_features
# --- Demo: how feature selection interacts with a leaky feature ---
print("=== 检测特征泄露 ===")
normal_score, suspicious_features = detect_feature_leakage(X, y)

print("\n=== 不同特征选择方法对泄露的影响 ===")

# 1. Reference run: drop the leaky column before training.
X_no_leak = X.drop('leaky_feature', axis=1)
X_train_no_leak, X_test_no_leak, y_train, y_test = train_test_split(
    X_no_leak, y, test_size=0.2, random_state=42)
model_no_leak = LogisticRegression(random_state=42, max_iter=1000)
model_no_leak.fit(X_train_no_leak, y_train)
score_no_leak = f1_score(y_test, model_no_leak.predict(X_test_no_leak))
print(f"不使用泄露特征 - F1分数: {score_no_leak:.4f}")

# 2. Univariate feature selection — note it happily picks the leak,
#    because the leaky column correlates strongly with the target.
selector = SelectKBest(score_func=f_classif, k=10)
X_selected = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support(indices=True)].tolist()
print(f"\n选中的特征: {selected_features}")
if 'leaky_feature' in selected_features:
    print("警告:特征选择选中了泄露特征!")

# Train/evaluate on the selected (possibly leaky) feature subset.
X_train_selected, X_test_selected, y_train, y_test = train_test_split(
    X_selected, y, test_size=0.2, random_state=42)
model_selected = LogisticRegression(random_state=42, max_iter=1000)
model_selected.fit(X_train_selected, y_train)
score_selected = f1_score(y_test, model_selected.predict(X_test_selected))
print(f"使用选中特征(包含泄露特征)- F1分数: {score_selected:.4f}")

# 3. Remove the leak FIRST, then run the same feature selection.
X_selected_no_leak = X.drop('leaky_feature', axis=1)
selector_no_leak = SelectKBest(score_func=f_classif, k=10)
X_selected_no_leak = selector_no_leak.fit_transform(X_selected_no_leak, y)
X_train_selected_no_leak, X_test_selected_no_leak, y_train, y_test = train_test_split(
    X_selected_no_leak, y, test_size=0.2, random_state=42)
model_selected_no_leak = LogisticRegression(random_state=42, max_iter=1000)
model_selected_no_leak.fit(X_train_selected_no_leak, y_train)
score_selected_no_leak = f1_score(
    y_test, model_selected_no_leak.predict(X_test_selected_no_leak))
print(f"移除泄露特征后 - F1分数: {score_selected_no_leak:.4f}")
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
# Build a synthetic security time series (no leaky column this time).
def generate_security_time_series(n_samples=1000):
    """Return an imbalanced binary dataset augmented with three time
    features: a raw timestamp, a linear trend and a sinusoidal cycle."""
    base, labels = make_classification(
        n_samples=n_samples, n_features=20, n_informative=10,
        n_redundant=5, n_classes=2, weights=[0.95, 0.05], random_state=42)
    idx = np.arange(n_samples)
    stamp = idx.reshape(-1, 1)                              # raw timestamp
    trend = np.linspace(0, 1, n_samples).reshape(-1, 1)     # linear trend
    cyclic = np.sin(idx * 0.1).reshape(-1, 1)               # periodic term
    full = np.hstack([base, stamp, trend, cyclic])
    names = [f'feature_{i}' for i in range(full.shape[1] - 3)]
    names += ['timestamp', 'time_trend', 'time_cyclic']
    return pd.DataFrame(full, columns=names), pd.Series(labels, name='label')
# Materialize the time-series demo dataset.
X, y = generate_security_time_series(n_samples=1000)
# Estimate time leakage by comparing a random split with a chronological one.
def detect_time_leakage(X, y, test_size=0.2):
    """Quantify time leakage on a time-ordered dataset.

    Fits the same model under two protocols — a shuffled random split
    (leaky) and a strict past-train/future-test split (correct) — and
    reports the F1 gap between them.

    Args:
        X: feature DataFrame containing a 'timestamp' column.
        y: label Series aligned with X.
        test_size: held-out fraction for testing.

    Returns:
        leakage_score: random-split F1 minus chronological-split F1;
            a large positive gap suggests time leakage.
    """
    # Anti-pattern: random split lets future rows into the training set.
    Xr_tr, Xr_te, yr_tr, yr_te = train_test_split(
        X, y, test_size=test_size, random_state=42)
    # Correct protocol: train strictly on the past, test on the future.
    cut = int(len(X) * (1 - test_size))
    Xt_tr, Xt_te = X.iloc[:cut], X.iloc[cut:]
    yt_tr, yt_te = y.iloc[:cut], y.iloc[cut:]
    clf = LogisticRegression(random_state=42, max_iter=1000)
    # Evaluate under the random split (timestamp excluded from features).
    clf.fit(Xr_tr.drop('timestamp', axis=1), yr_tr)
    score_random = f1_score(yr_te, clf.predict(Xr_te.drop('timestamp', axis=1)))
    # Re-fit and evaluate under the chronological split.
    clf.fit(Xt_tr.drop('timestamp', axis=1), yt_tr)
    score_time = f1_score(yt_te, clf.predict(Xt_te.drop('timestamp', axis=1)))
    # The gap between the two protocols is the leakage signal.
    leakage_score = score_random - score_time
    print(f"随机划分 - F1分数: {score_random:.4f}")
    print(f"时间顺序划分 - F1分数: {score_time:.4f}")
    print(f"时间泄露分数: {leakage_score:.4f}")
    if leakage_score > 0.1:
        print("警告:可能存在严重的时间泄露!")
    elif leakage_score > 0.05:
        print("警告:可能存在轻微的时间泄露!")
    else:
        print("未检测到明显的时间泄露。")
    return leakage_score
# Contrast KFold (time-agnostic, leaky) with TimeSeriesSplit on the same data.
def demonstrate_cross_validation_leakage(X, y):
    """Show how shuffled KFold leaks future rows into training folds while
    TimeSeriesSplit keeps the chronology intact.

    Returns:
        (mean KFold F1, mean TimeSeriesSplit F1).
    """
    print("\n=== 交叉验证中的时间泄露演示 ===")
    # Anti-pattern: shuffled KFold mixes past and future in the same fold.
    from sklearn.model_selection import KFold
    shuffled = KFold(n_splits=5, random_state=42, shuffle=True)
    scores_kf = []
    for tr_idx, te_idx in shuffled.split(X):
        X_train, y_train = X.iloc[tr_idx], y.iloc[tr_idx]
        X_test, y_test = X.iloc[te_idx], y.iloc[te_idx]
        # Leakage check: test rows older than the newest training row.
        train_max_time = X_train['timestamp'].max()
        test_min_time = X_test['timestamp'].min()
        if test_min_time < train_max_time:
            print(f"KFold泄露 - 训练集最大时间: {train_max_time}, 测试集最小时间: {test_min_time}")
        clf = LogisticRegression(random_state=42, max_iter=1000)
        clf.fit(X_train.drop('timestamp', axis=1), y_train)
        scores_kf.append(
            f1_score(y_test, clf.predict(X_test.drop('timestamp', axis=1))))
    print(f"KFold交叉验证平均F1分数: {np.mean(scores_kf):.4f}")
    # Correct protocol: each fold trains on a prefix, tests on the suffix.
    tscv = TimeSeriesSplit(n_splits=5)
    scores_tscv = []
    for tr_idx, te_idx in tscv.split(X):
        X_train, y_train = X.iloc[tr_idx], y.iloc[tr_idx]
        X_test, y_test = X.iloc[te_idx], y.iloc[te_idx]
        train_max_time = X_train['timestamp'].max()
        test_min_time = X_test['timestamp'].min()
        if test_min_time >= train_max_time:
            print(f"TimeSeriesSplit正确 - 训练集最大时间: {train_max_time}, 测试集最小时间: {test_min_time}")
        clf = LogisticRegression(random_state=42, max_iter=1000)
        clf.fit(X_train.drop('timestamp', axis=1), y_train)
        scores_tscv.append(
            f1_score(y_test, clf.predict(X_test.drop('timestamp', axis=1))))
    print(f"TimeSeriesSplit交叉验证平均F1分数: {np.mean(scores_tscv):.4f}")
    return np.mean(scores_kf), np.mean(scores_tscv)
# Run both time-leakage checks.
print("=== 检测时间泄露 ===")
detect_time_leakage(X, y)
demonstrate_cross_validation_leakage(X, y)
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, TimeSeriesSplit, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# Build a clean (leak-free) synthetic security dataset.
def generate_safe_security_data(n_samples=1000):
    """Return an imbalanced binary dataset plus a monotonic 'timestamp'
    column and no intentionally leaky features."""
    base, labels = make_classification(
        n_samples=n_samples, n_features=20, n_informative=10,
        n_redundant=5, n_classes=2, weights=[0.95, 0.05], random_state=42)
    stamp = np.arange(n_samples).reshape(-1, 1)  # one tick per sample
    full = np.hstack([base, stamp])
    names = [f'feature_{i}' for i in range(full.shape[1] - 1)] + ['timestamp']
    return pd.DataFrame(full, columns=names), pd.Series(labels, name='label')
# Materialize the clean demo dataset.
X, y = generate_safe_security_data(n_samples=1000)
# Walk through leakage-safe modelling practice end to end.
def prevent_data_leakage(X, y):
    """Demonstrate leakage-safe practice: chronological train/val/test
    split, Pipeline-scoped preprocessing, TimeSeriesSplit-based tuning,
    and a single final test-set evaluation.

    Args:
        X: feature DataFrame containing a 'timestamp' column.
        y: label Series aligned with X.

    Returns:
        (grid_search, test_score): the fitted GridSearchCV object and the
        final test-set F1 score.
    """
    print("=== 防范数据泄露的最佳实践 ===")
    # 1. Chronological 60/20/20 split — no future rows reach the training set.
    total_size = len(X)
    train_size = int(total_size * 0.6)
    val_size = int(total_size * 0.2)
    X_train, y_train = X.iloc[:train_size], y.iloc[:train_size]
    X_val = X.iloc[train_size:train_size + val_size]
    y_val = y.iloc[train_size:train_size + val_size]
    X_test = X.iloc[train_size + val_size:]
    y_test = y.iloc[train_size + val_size:]
    print(f"数据集划分:")
    print(f" 训练集: {len(X_train)} 样本 (60%)")
    print(f" 验证集: {len(X_val)} 样本 (20%)")
    print(f" 测试集: {len(X_test)} 样本 (20%)")
    print(f" 训练集时间范围: {X_train['timestamp'].min():.0f} - {X_train['timestamp'].max():.0f}")
    print(f" 验证集时间范围: {X_val['timestamp'].min():.0f} - {X_val['timestamp'].max():.0f}")
    print(f" 测试集时间范围: {X_test['timestamp'].min():.0f} - {X_test['timestamp'].max():.0f}")
    # 2. Feature engineering restricted to train-time information: drop the
    #    raw timestamp everywhere.
    X_train_feats = X_train.drop('timestamp', axis=1)
    X_val_feats = X_val.drop('timestamp', axis=1)
    X_test_feats = X_test.drop('timestamp', axis=1)
    # 3. Pipeline: the scaler is (re)fitted on training folds only, so
    #    validation/test statistics never leak into preprocessing.
    # BUGFIX: the original searched penalty='l1' with the default 'lbfgs'
    # solver, which supports only 'l2' — every l1 candidate failed to fit
    # (scored NaN). 'liblinear' supports both penalties in the grid below.
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(solver='liblinear',
                                          random_state=42, max_iter=1000))
    ])
    # 4. Hyper-parameter search with chronology-aware cross-validation,
    #    run on the training set only.
    param_grid = {
        'classifier__C': [0.001, 0.01, 0.1, 1.0, 10.0],
        'classifier__penalty': ['l1', 'l2']
    }
    tscv = TimeSeriesSplit(n_splits=5)
    grid_search = GridSearchCV(pipeline, param_grid=param_grid, cv=tscv,
                               scoring='f1', n_jobs=-1)
    grid_search.fit(X_train_feats, y_train)
    print(f"\n最佳参数: {grid_search.best_params_}")
    print(f"交叉验证最佳F1分数: {grid_search.best_score_:.4f}")
    # 5. Validation-set evaluation.
    val_pred = grid_search.predict(X_val_feats)
    val_score = f1_score(y_val, val_pred)
    print(f"验证集F1分数: {val_score:.4f}")
    # 6. One-shot test-set evaluation (done exactly once to avoid leakage).
    test_pred = grid_search.predict(X_test_feats)
    test_score = f1_score(y_test, test_pred)
    print(f"测试集F1分数: {test_score:.4f}")
    # 7. Generalization sanity checks.
    if test_score > 0.95:
        print("警告:测试集分数过高,可能存在数据泄露!")
    elif test_score < val_score * 0.8:
        print("警告:测试集分数远低于验证集,可能存在过拟合!")
    else:
        print("模型泛化能力良好,未检测到明显的数据泄露。")
    return grid_search, test_score
# Execute the best-practice walkthrough.
model, test_score = prevent_data_leakage(X, y)
# Post-hoc leakage screen for an already-fitted model.
def detect_leakage_in_trained_model(model, X_train, y_train, X_test, y_test):
    """Apply leakage heuristics to a fitted model.

    Compares train/test F1, then re-scores the model on a row-permuted copy
    of the test features (labels left in place) — a leak-free model should
    degrade sharply once the feature/label pairing is broken.

    Args:
        model: fitted estimator exposing .predict.
        X_train, y_train: training features/labels.
        X_test, y_test: test features/labels.

    Returns:
        True if any leakage heuristic fires, else False.
    """
    print("\n=== 检测已训练模型是否存在泄露 ===")
    # 1. Train vs. test performance gap.
    train_score = f1_score(y_train, model.predict(X_train))
    test_score = f1_score(y_test, model.predict(X_test))
    print(f"训练集F1分数: {train_score:.4f}")
    print(f"测试集F1分数: {test_score:.4f}")
    # 2. Permutation probe.
    # BUGFIX: the original called np.random.shuffle(X_test.values), which
    # mutates .values in place — a silent no-op whenever pandas returns a
    # copy — and was unseeded. Build the permuted frame explicitly with a
    # fixed seed instead.
    rng = np.random.RandomState(42)
    perm = rng.permutation(len(X_test))
    X_test_shuffled = pd.DataFrame(X_test.to_numpy()[perm],
                                   columns=X_test.columns,
                                   index=X_test.index)
    shuffled_score = f1_score(y_test, model.predict(X_test_shuffled))
    print(f"打乱测试集特征后F1分数: {shuffled_score:.4f}")
    # 3. Heuristic rules.
    leakage_detected = False
    # Rule 1: test score approaches the training score.
    if test_score >= train_score * 0.95:
        print("警告:测试集分数接近训练集分数,可能存在数据泄露!")
        leakage_detected = True
    # Rule 2: test score suspiciously high in absolute terms.
    if test_score > 0.95:
        print("警告:测试集分数过高,可能存在数据泄露!")
        leakage_detected = True
    # Rule 3: breaking the feature/label pairing barely hurts performance.
    if abs(test_score - shuffled_score) < 0.1:
        print("警告:打乱特征后性能下降不明显,模型可能依赖泄露信息!")
        leakage_detected = True
    if not leakage_detected:
        print("未检测到明显的数据泄露。")
    return leakage_detected
# 检测已训练模型
X_train_feats = X.iloc[:600].drop('timestamp', axis=1)
y_train = y.iloc[:600]
X_test_feats = X.iloc[800:].drop('timestamp', axis=1)
y_test = y.iloc[800:]
detect_leakage_in_trained_model(model, X_train_feats, y_train, X_test_feats, y_test)划分方法 | 数据泄露风险 | 计算效率 | 适用场景 | 推荐程度 |
|---|---|---|---|---|
随机划分 | 高 | 高 | 非时序数据,快速原型开发 | ⭐⭐⭐ |
时间顺序划分 | 低 | 高 | 时序数据,安全场景 | ⭐⭐⭐⭐⭐ |
K-fold交叉验证 | 中 | 中 | 非时序数据,需要精确评估 | ⭐⭐⭐⭐ |
Stratified K-fold | 中 | 中 | 不平衡非时序数据 | ⭐⭐⭐⭐ |
TimeSeriesSplit | 低 | 中 | 时序数据,安全场景 | ⭐⭐⭐⭐⭐ |
Nested CV | 低 | 低 | 需要严格评估的场景 | ⭐⭐⭐⭐ |
| 工具名称 | 检测能力 | 易用性 | 适用场景 | 开源/商业 | 推荐程度 |
|---|---|---|---|---|---|
scikit-learn | 基础 | 高 | 简单检测 | 开源 | ⭐⭐⭐⭐ |
DataLeakDetector | 高级 | 中 | 复杂pipeline | 开源 | ⭐⭐⭐⭐ |
TensorFlow Data Validation | 高级 | 中 | 大规模数据 | 开源 | ⭐⭐⭐⭐ |
AWS SageMaker Debugger | 高级 | 高 | AWS环境 | 商业 | ⭐⭐⭐⭐ |
Google Cloud AI Platform | 高级 | 高 | GCP环境 | 商业 | ⭐⭐⭐⭐ |
自定义检测脚本 | 灵活 | 低 | 特定场景 | 自定义 | ⭐⭐⭐ |
参考链接:
附录(Appendix):
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
def safe_data_split(X, y, test_size=0.2, val_size=0.2, is_time_series=True):
    """Split features/labels into train/val/test sets without leakage.

    Time-series data is cut chronologically (train = oldest rows, test =
    newest); otherwise two successive random splits keep the three
    partitions disjoint.

    Args:
        X: feature DataFrame.
        y: label Series aligned with X.
        test_size: fraction reserved for the test set.
        val_size: fraction reserved for the validation set.
        is_time_series: chronological split when True, random when False.

    Returns:
        X_train, X_val, X_test, y_train, y_val, y_test
    """
    if is_time_series:
        # Chronological cut points derived from the requested fractions.
        n = len(X)
        train_end = int(n * (1 - test_size - val_size))
        val_end = train_end + int(n * val_size)
        X_train, y_train = X.iloc[:train_end], y.iloc[:train_end]
        X_val, y_val = X.iloc[train_end:val_end], y.iloc[train_end:val_end]
        X_test, y_test = X.iloc[val_end:], y.iloc[val_end:]
    else:
        # Carve out the test set first, then split the remainder; the
        # rescaled val fraction keeps val_size relative to the full set.
        X_train_val, X_test, y_train_val, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42)
        X_train, X_val, y_train, y_val = train_test_split(
            X_train_val, y_train_val,
            test_size=val_size / (1 - test_size), random_state=42)
    return X_train, X_val, X_test, y_train, y_val, y_test
def safe_model_training(X_train, y_train, X_val, y_val, model, param_grid=None):
    """Train a model inside a Pipeline so preprocessing never leaks.

    The StandardScaler is fitted per training fold inside the pipeline, so
    validation/test statistics never contaminate the scaling. When a
    parameter grid is given, hyper-parameters are tuned with 5-fold CV on
    the training data only.

    Args:
        X_train, y_train: training features/labels.
        X_val, y_val: held-out validation features/labels.
        model: estimator placed at the end of the pipeline.
        param_grid: optional hyper-parameter grid for GridSearchCV.

    Returns:
        (best_model, best_score): the fitted estimator and its reference
        score (CV best F1 when tuning, validation F1 otherwise).
    """
    from sklearn.model_selection import GridSearchCV
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', model)
    ])
    if param_grid:
        # Tune on the training set; the validation set stays untouched.
        search = GridSearchCV(pipeline, param_grid=param_grid, cv=5,
                              scoring='f1', n_jobs=-1)
        search.fit(X_train, y_train)
        best_model = search.best_estimator_
        best_score = search.best_score_
        val_score = f1_score(y_val, best_model.predict(X_val))
        print(f"交叉验证最佳F1分数: {best_score:.4f}")
        print(f"验证集F1分数: {val_score:.4f}")
    else:
        # No tuning requested: fit once and report train/val scores.
        pipeline.fit(X_train, y_train)
        best_model = pipeline
        train_score = f1_score(y_train, best_model.predict(X_train))
        val_score = f1_score(y_val, best_model.predict(X_val))
        best_score = val_score
        print(f"训练集F1分数: {train_score:.4f}")
        print(f"验证集F1分数: {val_score:.4f}")
    return best_model, best_score
def detect_model_leakage(model, X_train, y_train, X_test, y_test):
"""
检测模型是否存在数据泄露
参数:
model: 训练好的模型
X_train: 训练特征矩阵
y_train: 训练标签向量
X_test: 测试特征矩阵
y_test: 测试标签向量
返回:
is_leaky: 是否存在泄露
leakage_score: 泄露分数
"""
# 计算训练集和测试集分数
train_pred = model.predict(X_train)
test_pred = model.predict(X_test)
train_score = f1_score(y_train, train_pred)
test_score = f1_score(y_test, test_pred)
# 计算泄露分数
leakage_score = test_score - train_score
# 检测规则
is_leaky = False
if test_score > 0.95:
print("警告:测试集分数过高,可能存在数据泄露!")
is_leaky = True
if leakage_score > 0.1:
print("警告:测试集分数高于训练集,可能存在数据泄露!")
is_leaky = True
if test_score > train_score * 0.95:
print("警告:测试集分数接近训练集,可能存在数据泄露!")
is_leaky = True
# 打乱测试集特征,检查性能变化
X_test_shuffled = X_test.copy()
np.random.shuffle(X_test_shuffled.values)
shuffled_pred = model.predict(X_test_shuffled)
shuffled_score = f1_score(y_test, shuffled_pred)
if abs(test_score - shuffled_score) < 0.1:
print("警告:打乱特征后性能下降不明显,模型可能依赖泄露信息!")
is_leaky = True
print(f"泄露检测结果:")
print(f" 训练集F1分数: {train_score:.4f}")
print(f" 测试集F1分数: {test_score:.4f}")
print(f" 泄露分数: {leakage_score:.4f}")
print(f" 打乱特征后F1分数: {shuffled_score:.4f}")
if not is_leaky:
print(" 结论:未检测到明显的数据泄露。")
else:
print(" 结论:可能存在数据泄露!")
return is_leaky, leakage_score关键词: 数据泄露, 安全pipeline, 特征泄露, 时间泄露, 标签泄露, 测试集污染, 跨样本泄露, 泄露检测, 泄露防范