在数据科学和统计学中,我们经常听到"相关性不等于因果性"的告诫。但如果我们真正关心的是因果效应——某个干预或处理如何改变结果——那么我们需要因果推断的方法。
在我们深入探讨因果推断之前,必须明确区分相关性和因果性这两个基本概念。这一区分是理解因果推断方法论的基石。
特征 | 相关性 | 因果性 |
|---|---|---|
定义 | 两个变量之间的统计关联 | 一个变量变化引起另一个变量变化的机制 |
数学表示 | P(Y|X) ≠ P(Y) | P(Y|do(X)) ≠ P(Y) |
方向性 | 可以是双向的 | 通常是单向的(原因→结果) |
混淆因素 | 可能受第三方变量影响 | 需要控制所有混淆变量 |
证据要求 | 统计关联即可 | 需要排除其他解释 |
考虑一个经典案例:冰淇淋销量和溺水事故之间存在正相关关系。当冰淇淋销量高时,溺水事故也更频繁。这是否意味着吃冰淇淋会导致溺水?
当然不是。这里存在一个混淆变量——温度(或季节)。天气炎热时,更多人购买冰淇淋,同时更多人去游泳,从而增加了溺水风险。一旦我们控制温度因素,冰淇淋和溺水之间的伪相关就会消失。

上图展示了混淆变量如何创建伪相关。虚线表示虚假关联,实线表示真实因果路径。
混淆偏差:第三方变量同时影响原因和结果变量
反向因果:结果变量反而影响原因变量
选择偏差:样本选择过程与研究对象相关
纯粹巧合:变量间偶然出现的统计关联
因果推断要回答的问题是:如果我们改变X,Y会如何变化?这比仅仅知道X和Y相关要困难得多,因为它需要反事实推理——我们需要知道在X不同的情况下,Y会发生什么。
在数学上,我们关心的是干预分布P(Y|do(X)),而不是观察分布P(Y|X)。这里的do算子表示主动干预而非被动观察,这是因果推断的核心概念。
因果推断建立在形式化的框架之上,这些框架帮助我们清晰地定义因果问题并提供解决方案。本节将介绍潜在结果框架和因果图模型这两个核心框架。
潜在结果框架,也称为Rubin因果模型,是因果推断中最常用的框架之一。其核心思想是为每个个体定义两个潜在结果:接受处理时的结果Y₁和未接受处理时的结果Y₀。
基本概念:
根本问题: 对于任何个体,我们只能观察到一个潜在结果(要么Y₁,要么Y₀),另一个是反事实的。这被称为因果推断的根本问题。
import numpy as np
import pandas as pd
class PotentialOutcomesFramework:
def __init__(self, n_units=1000):
self.n_units = n_units
self.data = None
def generate_data(self, baseline_y0=50, treatment_effect=10, heterogeneity=5):
"""生成模拟潜在结果数据"""
np.random.seed(42)
# 生成基线特征
age = np.random.normal(45, 15, self.n_units)
education = np.random.normal(12, 3, self.n_units)
# 生成潜在结果
base_y0 = baseline_y0 + 0.5*age + 1.5*education + np.random.normal(0, 10, self.n_units)
indiv_te = treatment_effect + heterogeneity*(education - 12)/3 + np.random.normal(0, 3, self.n_units)
y0 = base_y0
y1 = base_y0 + indiv_te
# 基于倾向得分分配处理
propensity = 1 / (1 + np.exp(-(age - 45)/15 - (education - 12)/3))
treatment = np.random.binomial(1, propensity)
# 观察结果
observed_y = np.where(treatment == 1, y1, y0)
self.data = pd.DataFrame({
'age': age,
'education': education,
'y0': y0,
'y1': y1,
'treatment': treatment,
'observed_y': observed_y,
'true_ite': y1 - y0
})
return self.data
def calculate_naive_ate(self):
"""计算简单的ATE估计(可能有偏差)"""
if self.data is None:
self.generate_data()
treated_mean = self.data[self.data['treatment'] == 1]['observed_y'].mean()
control_mean = self.data[self.data['treatment'] == 0]['observed_y'].mean()
return treated_mean - control_mean
def calculate_true_ate(self):
"""计算真实的ATE"""
if self.data is None:
self.generate_data()
return self.data['true_ite'].mean()
# 示例使用
framework = PotentialOutcomesFramework()
data = framework.generate_data()
naive_ate = framework.calculate_naive_ate()
true_ate = framework.calculate_true_ate()
print(f"简单ATE估计: {naive_ate:.2f}")
print(f"真实ATE: {true_ate:.2f}")
print(f"估计偏差: {naive_ate - true_ate:.2f}")因果图模型使用图形表示变量间的因果关系,帮助我们识别混淆变量并选择合适的因果推断策略。
基本元素:
d-分离准则: 判断两个变量在给定条件集下是否条件独立
import networkx as nx
import matplotlib.pyplot as plt
class CausalGraph:
def __init__(self):
self.graph = nx.DiGraph()
def add_edges(self, edges):
"""添加边到因果图"""
self.graph.add_edges_from(edges)
def draw_graph(self):
"""绘制因果图"""
plt.figure(figsize=(10, 8))
pos = nx.spring_layout(self.graph)
nx.draw(self.graph, pos, with_labels=True, node_size=3000,
node_color='skyblue', font_size=15, arrowsize=20)
plt.title("因果图模型", fontsize=20)
plt.show()
def get_backdoor_paths(self, treatment, outcome):
"""获取后门路径"""
backdoor_paths = []
# 查找所有连接treatment和outcome的路径
all_paths = nx.all_simple_paths(self.graph, treatment, outcome)
for path in all_paths:
# 检查是否是后门路径(以指向treatment的箭头开始)
if len(path) >= 2 and (path[1], path[0]) in self.graph.edges:
backdoor_paths.append(path)
return backdoor_paths
def get_confounders(self, treatment, outcome):
"""识别混淆变量"""
backdoor_paths = self.get_backdoor_paths(treatment, outcome)
confounders = set()
for path in backdoor_paths:
# 后门路径上的变量(除了treatment和outcome)都是潜在的混淆变量
confounders.update(path[1:-1])
return confounders
# 创建因果图示例
cg = CausalGraph()
edges = [('年龄', '治疗'), ('年龄', '健康结果'), ('教育', '治疗'),
('教育', '健康结果'), ('治疗', '健康结果')]
cg.add_edges(edges)
cg.draw_graph()
print("后门路径:", cg.get_backdoor_paths('治疗', '健康结果'))
print("混淆变量:", cg.get_confounders('治疗', '健康结果'))
随机对照试验(RCT)被视为因果推断的黄金标准,因为它通过随机化处理分配,确保处理组和对照组在所有观测和未观测特征上平均可比性。
随机化的核心优势在于它消除了混淆偏差。通过随机分配处理,我们确保处理状态与任何潜在混淆因素无关,无论是观测到的还是未观测到的。
随机化的性质:
尽管随机实验在理论上是理想的,但在实践中面临多种挑战:
挑战 | 描述 | 解决方案 |
|---|---|---|
伦理问题 | 某些处理可能有害或不道德 | 使用观察性研究替代 |
成本高昂 | 随机实验通常昂贵且耗时 | 精心设计减少样本量 |
实践可行性 | 某些处理难以随机分配 | 部分随机化或自然实验 |
外部有效性 | 实验结果可能无法推广到更大群体 | 确保样本代表性 |
非遵守问题 | 参与者可能不遵守分配的处理 | 使用工具变量方法 |
让我们通过模拟一个随机实验来展示其如何提供无偏的因果效应估计。
class RandomizedExperiment:
def __init__(self, n_units=1000, true_effect=5):
self.n_units = n_units
self.true_effect = true_effect
self.data = None
def generate_data(self):
"""生成模拟数据"""
np.random.seed(42)
# 生成基线特征
age = np.random.normal(45, 15, self.n_units)
education = np.random.normal(12, 3, self.n_units)
baseline_health = np.random.normal(50, 10, self.n_units)
# 随机分配处理
treatment = np.random.binomial(1, 0.5, self.n_units)
# 生成结果(处理效应为true_effect)
y0 = baseline_health + 0.3*age + 0.5*education + np.random.normal(0, 5, self.n_units)
y1 = y0 + self.true_effect
observed_y = np.where(treatment == 1, y1, y0)
self.data = pd.DataFrame({
'age': age,
'education': education,
'baseline_health': baseline_health,
'treatment': treatment,
'y0': y0,
'y1': y1,
'observed_y': observed_y
})
return self.data
def check_balance(self):
"""检查随机化后的平衡性"""
if self.data is None:
self.generate_data()
balance_table = pd.DataFrame()
for covariate in ['age', 'education', 'baseline_health']:
treated_mean = self.data[self.data['treatment'] == 1][covariate].mean()
control_mean = self.data[self.data['treatment'] == 0][covariate].mean()
difference = treated_mean - control_mean
p_value = self._calculate_p_value(
self.data[self.data['treatment'] == 1][covariate],
self.data[self.data['treatment'] == 0][covariate]
)
balance_table[covariate] = [treated_mean, control_mean, difference, p_value]
balance_table.index = ['处理组均值', '对照组均值', '差异', 'p值']
return balance_table.round(3)
def _calculate_p_value(self, group1, group2):
"""计算两组差异的p值"""
from scipy.stats import ttest_ind
_, p_value = ttest_ind(group1, group2)
return p_value
def estimate_ate(self):
"""估计平均处理效应"""
if self.data is None:
self.generate_data()
treated_mean = self.data[self.data['treatment'] == 1]['observed_y'].mean()
control_mean = self.data[self.data['treatment'] == 0]['observed_y'].mean()
ate = treated_mean - control_mean
ate_se = self._calculate_ate_se()
return ate, ate_se, (ate - 1.96*ate_se, ate + 1.96*ate_se)
def _calculate_ate_se(self):
"""计算ATE的标准误"""
treated_var = self.data[self.data['treatment'] == 1]['observed_y'].var()
control_var = self.data[self.data['treatment'] == 0]['observed_y'].var()
n_treated = len(self.data[self.data['treatment'] == 1])
n_control = len(self.data[self.data['treatment'] == 0])
se = np.sqrt(treated_var/n_treated + control_var/n_control)
return se
# 运行随机实验模拟
experiment = RandomizedExperiment(n_units=2000, true_effect=5)
data = experiment.generate_data()
print("平衡性检查:")
print(experiment.check_balance())
ate, ate_se, ci = experiment.estimate_ate()
print(f"\nATE估计: {ate:.3f}")
print(f"标准误: {ate_se:.3f}")
print(f"95%置信区间: ({ci[0]:.3f}, {ci[1]:.3f})")除了简单的均值比较,随机实验还可以使用回归分析来提高估计精度:
import statsmodels.api as sm
import statsmodels.formula.api as smf
def analyze_experiment_regression(data):
"""使用回归分析实验数据"""
# 简单均值比较模型
model1 = smf.ols('observed_y ~ treatment', data=data).fit()
# 调整基线协变量提高精度
model2 = smf.ols('observed_y ~ treatment + age + education + baseline_health', data=data).fit()
# 包含交互项(处理效应异质性)
model3 = smf.ols('observed_y ~ treatment * (age + education)', data=data).fit()
return model1, model2, model3
# 分析实验数据
model1, model2, model3 = analyze_experiment_regression(data)
print("简单模型:")
print(f"ATE估计: {model1.params['treatment']:.3f}, p值: {model1.pvalues['treatment']:.4f}")
print("\n调整协变量模型:")
print(f"ATE估计: {model2.params['treatment']:.3f}, p值: {model2.pvalues['treatment']:.4f}")
print("\n异质性处理效应模型:")
print(f"平均ATE: {model3.params['treatment']:.3f}, p值: {model3.pvalues['treatment']:.4f}")
当随机实验不可行时,观察性研究成为因果推断的主要方法。本节介绍几种主要的观察性因果推断方法,包括匹配、加权和双重差分等。
倾向得分匹配通过模拟随机实验的条件,创建处理组和对照组的可比样本。
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors
class PropensityScoreMatcher:
def __init__(self):
self.ps_model = None
self.matched_data = None
def estimate_propensity_scores(self, X, treatment):
"""估计倾向得分"""
self.ps_model = LogisticRegression()
self.ps_model.fit(X, treatment)
propensity_scores = self.ps_model.predict_proba(X)[:, 1]
return propensity_scores
def match(self, data, covariates, treatment_var, outcome_var, caliper=0.1):
"""执行倾向得分匹配"""
X = data[covariates]
treatment = data[treatment_var]
# 估计倾向得分
data['propensity_score'] = self.estimate_propensity_scores(X, treatment)
# 分离处理组和对照组
treated = data[data[treatment_var] == 1]
control = data[data[treatment_var] == 0]
# 使用最近邻匹配
nbrs = NearestNeighbors(n_neighbors=1, metric='euclidean')
nbrs.fit(control[['propensity_score']])
distances, indices = nbrs.kneighbors(treated[['propensity_score']])
# 应用卡钳值
matched_indices = []
for i, (dist, idx) in enumerate(zip(distances, indices)):
if dist < caliper:
matched_indices.append((treated.index[i], control.index[idx[0]]))
# 创建匹配后的数据集
treated_matched = data.loc[[pair[0] for pair in matched_indices]]
control_matched = data.loc[[pair[1] for pair in matched_indices]]
self.matched_data = pd.concat([treated_matched, control_matched])
return self.matched_data
def estimate_ate(self):
"""估计匹配后的ATE"""
if self.matched_data is None:
raise ValueError("必须先执行匹配")
treated_mean = self.matched_data[self.matched_data['treatment'] == 1]['observed_y'].mean()
control_mean = self.matched_data[self.matched_data['treatment'] == 0]['observed_y'].mean()
return treated_mean - control_mean
# 使用PSM示例
# 首先创建有混淆的观察性数据
np.random.seed(42)
n_units = 2000
# 生成混淆变量
age = np.random.normal(45, 15, n_units)
education = np.random.normal(12, 3, n_units)
# 基于混淆变量生成处理分配(非随机)
propensity = 1 / (1 + np.exp(-(age - 45)/15 - (education - 12)/3))
treatment = np.random.binomial(1, propensity)
# 生成结果(真实效应为5)
y0 = 50 + 0.5*age + 1.5*education + np.random.normal(0, 10, n_units)
y1 = y0 + 5
observed_y = np.where(treatment == 1, y1, y0)
obs_data = pd.DataFrame({
'age': age,
'education': education,
'treatment': treatment,
'observed_y': observed_y
})
# 执行PSM
matcher = PropensityScoreMatcher()
matched_data = matcher.match(obs_data, ['age', 'education'], 'treatment', 'observed_y')
naive_ate = obs_data[obs_data['treatment'] == 1]['observed_y'].mean() - obs_data[obs_data['treatment'] == 0]['observed_y'].mean()
psm_ate = matcher.estimate_ate()
print(f"简单比较ATE: {naive_ate:.3f}")
print(f"PSM调整后ATE: {psm_ate:.3f}")
print(f"真实ATE: 5.000")逆概率处理加权通过加权创建一个人工群体,其中处理分配与混淆变量独立。
class IPTWEstimator:
def __init__(self):
self.ps_model = None
def estimate_ate(self, data, covariates, treatment_var, outcome_var):
"""使用IPTW估计ATE"""
X = data[covariates]
treatment = data[treatment_var]
# 估计倾向得分
self.ps_model = LogisticRegression()
self.ps_model.fit(X, treatment)
propensity_scores = self.ps_model.predict_proba(X)[:, 1]
# 计算权重
weights = np.where(treatment == 1,
1/propensity_scores,
1/(1-propensity_scores))
# 加权估计ATE
weighted_treated = np.sum(weights * treatment * data[outcome_var]) / np.sum(weights * treatment)
weighted_control = np.sum(weights * (1-treatment) * data[outcome_var]) / np.sum(weights * (1-treatment))
ate = weighted_treated - weighted_control
return ate, weights
# 使用IPTW示例
iptw = IPTWEstimator()
ate_iptw, weights = iptw.estimate_ate(obs_data, ['age', 'education'], 'treatment', 'observed_y')
print(f"IPTW估计ATE: {ate_iptw:.3f}")双重差分法适用于面板数据,通过比较处理组和对照组在处理前后的变化差异来估计因果效应。
class DifferenceInDifferences:
def __init__(self):
pass
def generate_panel_data(self, n_units=1000, n_periods=5, true_effect=3):
"""生成面板数据"""
np.random.seed(42)
# 生成单位固定效应和时间固定效应
unit_effects = np.random.normal(0, 5, n_units)
time_effects = np.random.normal(0, 2, n_periods)
# 生成处理分配(某些单位在某个时间点后接受处理)
treatment_unit = np.random.binomial(1, 0.5, n_units)
treatment_time = n_periods // 2 # 中间时间点引入处理
data = []
for i in range(n_units):
for t in range(n_periods):
# 基础结果
base_outcome = 50 + unit_effects[i] + time_effects[t]
# 处理效应(处理后且处理组)
if t >= treatment_time and treatment_unit[i] == 1:
treatment_effect = true_effect
else:
treatment_effect = 0
outcome = base_outcome + treatment_effect + np.random.normal(0, 3)
data.append({
'unit': i,
'time': t,
'treatment_unit': treatment_unit[i],
'post_treatment': 1 if t >= treatment_time else 0,
'outcome': outcome
})
return pd.DataFrame(data)
def estimate_did(self, data):
"""估计DID"""
# 分组计算前后均值
pre_treatment = data[data['post_treatment'] == 0].groupby('treatment_unit')['outcome'].mean()
post_treatment = data[data['post_treatment'] == 1].groupby('treatment_unit')['outcome'].mean()
# 计算DID
diff_treatment = post_treatment[1] - pre_treatment[1]
diff_control = post_treatment[0] - pre_treatment[0]
did_estimate = diff_treatment - diff_control
return did_estimate
# 使用DID示例
did = DifferenceInDifferences()
panel_data = did.generate_panel_data()
did_estimate = did.estimate_did(panel_data)
print(f"DID估计效应: {did_estimate:.3f}")
print(f"真实效应: 3.000")工具变量法使用一个与处理相关但与结果无关的变量(工具变量)来估计因果效应。
class InstrumentalVariable:
def __init__(self):
pass
def generate_iv_data(self, n_units=1000, true_effect=4):
"""生成工具变量数据"""
np.random.seed(42)
# 生成工具变量(与处理相关但与结果无关)
instrument = np.random.normal(0, 1, n_units)
# 生成混淆变量
confounder = np.random.normal(0, 2, n_units)
# 生成处理(受工具变量和混淆变量影响)
treatment = 0.8 * instrument + 0.5 * confounder + np.random.normal(0, 1, n_units)
treatment = (treatment > 0).astype(int)
# 生成结果(受处理和混淆变量影响)
outcome = true_effect * treatment + 1.5 * confounder + np.random.normal(0, 2, n_units)
return pd.DataFrame({
'instrument': instrument,
'confounder': confounder,
'treatment': treatment,
'outcome': outcome
})
def estimate_iv(self, data):
"""使用2SLS估计IV"""
from sklearn.linear_model import LinearRegression
# 第一阶段:工具变量预测处理
stage1 = LinearRegression()
stage1.fit(data[['instrument']], data['treatment'])
treatment_pred = stage1.predict(data[['instrument']])
# 第二阶段:预测的处理预测结果
stage2 = LinearRegression()
stage2.fit(treatment_pred.reshape(-1, 1), data['outcome'])
iv_estimate = stage2.coef_[0]
return iv_estimate
# 使用IV示例
iv = InstrumentalVariable()
iv_data = iv.generate_iv_data()
iv_estimate = iv.estimate_iv(iv_data)
naive_estimate = LinearRegression().fit(iv_data[['treatment']], iv_data['outcome']).coef_[0]
print(f"简单回归估计: {naive_estimate:.3f}")
print(f"IV估计: {iv_estimate:.3f}")
print(f"真实效应: 4.000")
教育对收入的影响是社会科学中一个经典的因果推断问题。我们将使用美国国民纵向调查(NLSY)数据的模拟版本来演示如何应用因果推断方法。
我们想回答的问题是:高等教育(大学学位)是否真的能提高个人收入?如果是,提高多少?
挑战:
我们首先生成一个模拟数据集,包含观察到的和未观察到的混淆变量:
class EducationIncomeData:
def __init__(self, n_units=5000):
self.n_units = n_units
self.data = None
def generate_data(self, true_effect=0.25):
"""生成教育-收入模拟数据"""
np.random.seed(42)
# 观测到的变量
family_income = np.random.lognormal(10.5, 0.8, self.n_units)
parental_education = np.random.normal(14, 3, self.n_units)
cognitive_ability = np.random.normal(100, 15, self.n_units)
# 未观测到的变量(动机、 ambition)
motivation = np.random.normal(0, 1, self.n_units)
# 教育选择模型(受观测和未观测变量影响)
education_propensity = (
0.3 * (family_income - np.mean(family_income))/np.std(family_income) +
0.4 * (parental_education - 14)/3 +
0.5 * (cognitive_ability - 100)/15 +
0.6 * motivation +
np.random.normal(0, 1, self.n_units)
)
college = (education_propensity > 0).astype(int)
# 收入模型(真实教育效应为true_effect)
log_income = (
10.0 +
true_effect * college +
0.2 * (family_income - np.mean(family_income))/np.std(family_income) +
0.3 * (parental_education - 14)/3 +
0.4 * (cognitive_ability - 100)/15 +
0.5 * motivation +
np.random.normal(0, 0.5, self.n_units)
)
income = np.exp(log_income)
self.data = pd.DataFrame({
'family_income': family_income,
'parental_education': parental_education,
'cognitive_ability': cognitive_ability,
'motivation': motivation, # 注意:在实际分析中这个变量是未观测到的
'college': college,
'income': income,
'log_income': log_income
})
return self.data
def naive_analysis(self):
"""简单比较大学毕业生和非大学毕业生的收入"""
if self.data is None:
self.generate_data()
college_income = self.data[self.data['college'] == 1]['income'].mean()
no_college_income = self.data[self.data['college'] == 0]['income'].mean()
income_diff = college_income - no_college_income
ratio = college_income / no_college_income
return income_diff, ratio
# 生成数据
education_data = EducationIncomeData(n_units=10000)
data = education_data.generate_data(true_effect=0.25) # 真实效应:教育使收入增加25%
# 简单比较
income_diff, ratio = education_data.naive_analysis()
print(f"大学毕业生平均收入: {data[data['college'] == 1]['income'].mean():.2f}")
print(f"非大学毕业生平均收入: {data[data['college'] == 0]['income'].mean():.2f}")
print(f"收入差异: {income_diff:.2f}")
print(f"收入比率: {ratio:.3f}")现在我们将应用前面介绍的各种因果推断方法来估计教育的真实效应。
def regression_adjustment(data):
"""使用回归调整控制观测到的混淆变量"""
# 只控制观测到的变量
model = smf.ols('log_income ~ college + family_income + parental_education + cognitive_ability',
data=data).fit()
# 指数转换获取收入比率
income_ratio = np.exp(model.params['college'])
return model.params['college'], income_ratio
# 回归调整
coef, ratio = regression_adjustment(data)
print(f"回归调整估计的教育效应 (log): {coef:.4f}")
print(f"收入比率: {ratio:.4f}")def propensity_score_matching_analysis(data):
"""使用PSM估计教育效应"""
matcher = PropensityScoreMatcher()
# 使用观测到的变量进行匹配
matched_data = matcher.match(
data,
['family_income', 'parental_education', 'cognitive_ability'],
'college',
'log_income'
)
# 计算匹配后的效应
college_mean = matched_data[matched_data['college'] == 1]['log_income'].mean()
no_college_mean = matched_data[matched_data['college'] == 0]['log_income'].mean()
effect = college_mean - no_college_mean
income_ratio = np.exp(effect)
return effect, income_ratio
# PSM分析
effect_psm, ratio_psm = propensity_score_matching_analysis(data)
print(f"PSM估计的教育效应 (log): {effect_psm:.4f}")
print(f"收入比率: {ratio_psm:.4f}")对于教育的影响,一个常见的工具变量是接近大学的地理距离:
def instrumental_variable_analysis(data):
"""使用工具变量法估计教育效应"""
# 添加工具变量:距离最近大学的距离(公里)
# 假设:距离影响上大学决策但不直接影响收入
np.random.seed(42)
data['distance_to_college'] = np.random.exponential(20, len(data))
# 工具变量与教育的关系
data['distance_effect'] = -0.02 * data['distance_to_college'] + np.random.normal(0, 0.5, len(data))
data['college_iv'] = (data['education_propensity'] + data['distance_effect'] > 0).astype(int)
# 2SLS回归
# 第一阶段:工具变量预测处理
stage1 = LinearRegression()
stage1.fit(data[['distance_to_college']], data['college'])
data['college_pred'] = stage1.predict(data[['distance_to_college']])
# 第二阶段:预测的处理预测结果
stage2 = LinearRegression()
stage2.fit(data[['college_pred']], data['log_income'])
iv_effect = stage2.coef_[0]
income_ratio = np.exp(iv_effect)
return iv_effect, income_ratio
# IV分析(注意:这里使用了简化的工具变量模拟)
iv_effect, iv_ratio = instrumental_variable_analysis(data.copy())
print(f"IV估计的教育效应 (log): {iv_effect:.4f}")
print(f"收入比率: {iv_ratio:.4f}")现在让我们比较各种方法的估计结果:
methods = ['简单比较', '回归调整', 'PSM', 'IV']
effects_log = [np.log(ratio), coef, effect_psm, iv_effect]
effects_ratio = [ratio, np.exp(coef), ratio_psm, iv_ratio]
results_df = pd.DataFrame({
'方法': methods,
'log收入效应': effects_log,
'收入比率': effects_ratio
})
print("各种方法的估计结果比较:")
print(results_df.round(4))
print(f"\n真实效应: log收入增加0.2500, 收入比率{np.exp(0.25):.4f}")
在本节中,我们将构建一个完整的因果推断管道,从数据准备到效应估计,并展示如何将其部署为可重用的Python包。
我们创建一个完整的因果推断管道,包含数据检查、预处理、多种分析方法和结果可视化。
class CausalInferencePipeline:
def __init__(self, data, treatment_var, outcome_var, covariates=None):
self.data = data.copy()
self.treatment_var = treatment_var
self.outcome_var = outcome_var
self.covariates = covariates or []
self.results = {}
def check_balance(self):
"""检查处理组和对照组的平衡性"""
balance_table = pd.DataFrame()
for covariate in self.covariates:
treated_mean = self.data[self.data[self.treatment_var] == 1][covariate].mean()
control_mean = self.data[self.data[self.treatment_var] == 0][covariate].mean()
difference = treated_mean - control_mean
p_value = self._calculate_p_value(
self.data[self.data[self.treatment_var] == 1][covariate],
self.data[self.data[self.treatment_var] == 0][covariate]
)
balance_table[covariate] = [treated_mean, control_mean, difference, p_value]
balance_table.index = ['处理组均值', '对照组均值', '差异', 'p值']
return balance_table.round(3)
def _calculate_p_value(self, group1, group2):
"""计算两组差异的p值"""
from scipy.stats import ttest_ind
_, p_value = ttest_ind(group1, group2)
return p_value
def run_naive_analysis(self):
"""运行简单比较分析"""
treated_mean = self.data[self.data[self.treatment_var] == 1][self.outcome_var].mean()
control_mean = self.data[self.data[self.treatment_var] == 0][self.outcome_var].mean()
effect = treated_mean - control_mean
self.results['naive'] = effect
return effect
def run_regression_adjustment(self):
"""运行回归调整分析"""
if not self.covariates:
raise ValueError("需要指定协变量")
formula = f"{self.outcome_var} ~ {self.treatment_var} + " + " + ".join(self.covariates)
model = smf.ols(formula, data=self.data).fit()
effect = model.params[self.treatment_var]
self.results['regression'] = effect
return effect, model
def run_propensity_score_matching(self, caliper=0.1):
"""运行倾向得分匹配"""
if not self.covariates:
raise ValueError("需要指定协变量")
matcher = PropensityScoreMatcher()
matched_data = matcher.match(
self.data, self.covariates, self.treatment_var, self.outcome_var, caliper
)
treated_mean = matched_data[matched_data[self.treatment_var] == 1][self.outcome_var].mean()
control_mean = matched_data[matched_data[self.treatment_var] == 0][self.outcome_var].mean()
effect = treated_mean - control_mean
self.results['psm'] = effect
self.matched_data = matched_data
return effect, matched_data
def run_iptw(self):
"""运行逆概率处理加权"""
if not self.covariates:
raise ValueError("需要指定协变量")
iptw = IPTWEstimator()
effect, weights = iptw.estimate_ate(self.data, self.covariates, self.treatment_var, self.outcome_var)
self.results['iptw'] = effect
self.weights = weights
return effect, weights
def run_all_methods(self):
"""运行所有分析方法"""
results = {}
results['naive'] = self.run_naive_analysis()
results['regression'], _ = self.run_regression_adjustment()
results['psm'], _ = self.run_propensity_score_matching()
results['iptw'], _ = self.run_iptw()
return results
def plot_results(self, true_effect=None):
"""可视化结果比较"""
methods = list(self.results.keys())
effects = list(self.results.values())
plt.figure(figsize=(10, 6))
bars = plt.bar(methods, effects, alpha=0.7)
# 添加真实效应线(如果已知)
if true_effect is not None:
plt.axhline(y=true_effect, color='r', linestyle='--', label=f'真实效应: {true_effect:.3f}')
plt.ylabel('处理效应估计')
plt.title('不同方法的因果效应估计比较')
plt.legend()
# 在柱子上添加数值标签
for bar, effect in zip(bars, effects):
plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{effect:.3f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 使用管道示例
pipeline = CausalInferencePipeline(
data=obs_data,
treatment_var='treatment',
outcome_var='observed_y',
covariates=['age', 'education']
)
print("平衡性检查:")
print(pipeline.check_balance())
print("\n运行各种分析方法...")
results = pipeline.run_all_methods()
print("\n效应估计结果:")
for method, effect in results.items():
print(f"{method}: {effect:.3f}")
print(f"真实效应: 5.000")
# 可视化结果
pipeline.plot_results(true_effect=5.0)我们可以将上述因果推断方法打包成一个可重用的Python库:
causalib/
│
├── __init__.py
├── data.py # 数据生成和处理
├── methods.py # 因果推断方法
├── utils.py # 工具函数
├── visualization.py # 可视化功能
└── pipeline.py # 分析管道setup.py:
from setuptools import setup, find_packages
setup(
name="causalib",
version="0.1.0",
description="A causal inference library for Python",
author="Your Name",
author_email="your.email@example.com",
packages=find_packages(),
install_requires=[
"numpy>=1.20.0",
"pandas>=1.3.0",
"scipy>=1.7.0",
"statsmodels>=0.13.0",
"scikit-learn>=1.0.0",
"matplotlib>=3.5.0",
"seaborn>=0.11.0",
"networkx>=2.6.0"
],
python_requires=">=3.8",
)我们还可以创建一个Jupyter Notebook示例,展示如何使用这个库:
# example_usage.ipynb
import causalib as cl
import numpy as np
import pandas as pd
# 生成模拟数据
data_generator = cl.DataGenerator()
data = data_generator.generate_observational_data(
n_units=2000,
true_effect=5.0,
confounding_strength=0.7
)
# 初始化管道
pipeline = cl.CausalPipeline(
data=data,
treatment_var='treatment',
outcome_var='outcome',
covariates=['age', 'education', 'income']
)
# 检查平衡性
print("平衡性检查:")
print(pipeline.check_balance())
# 运行分析
results = pipeline.run_all_methods()
# 可视化结果
pipeline.plot_results(true_effect=5.0)
# 生成报告
report = pipeline.generate_report()
report.save('causal_analysis_report.html')为了确保代码质量,我们还需要编写自动化测试:
# tests/test_causalib.py
import unittest
import numpy as np
from causalib import DataGenerator, CausalPipeline
class TestCausalLib(unittest.TestCase):
def setUp(self):
self.data_generator = DataGenerator()
def test_data_generation(self):
"""测试数据生成"""
data = self.data_generator.generate_observational_data(
n_units=1000, true_effect=5.0
)
self.assertIn('treatment', data.columns)
self.assertIn('outcome', data.columns)
self.assertEqual(len(data), 1000)
def test_pipeline(self):
"""测试因果管道"""
data = self.data_generator.generate_observational_data(
n_units=500, true_effect=3.0
)
pipeline = CausalPipeline(
data=data,
treatment_var='treatment',
outcome_var='outcome',
covariates=['covariate1', 'covariate2']
)
results = pipeline.run_all_methods()
self.assertIn('naive', results)
self.assertIn('regression', results)
self.assertIn('psm', results)
self.assertIn('iptw', results)
# 检查所有方法都返回数值结果
for method, effect in results.items():
self.assertIsInstance(effect, (int, float, np.number))
if __name__ == '__main__':
unittest.main()
尽管因果推断方法强大,但它们都依赖于某些假设和条件,这些假设在实际应用中可能难以满足。了解这些挑战和局限对于正确解释因果推断结果至关重要。
挑战 | 描述 | 影响 |
|---|---|---|
未测混淆 | 未能测量所有相关混淆变量 | 估计偏差,因果结论无效 |
模型误设 | 错误的函数形式或模型假设 | 有偏或不一致的估计 |
重叠 violation | 处理组和对照组缺乏共同支持 | 外推不可靠,估计不稳定 |
测量误差 | 变量测量不准确 | 衰减偏倚,低估真实效应 |
选择偏差 | 样本选择与处理或结果相关 | 样本不具有代表性,结论不推广 |
敏感性分析评估因果结论对未测混淆的稳健性。下面实现一个基本的敏感性分析:
class SensitivityAnalysis:
def __init__(self):
pass
def rosenbaum_sensitivity(self, effect_estimate, gamma_range=np.arange(1, 2.1, 0.1)):
"""
Rosenbaum敏感性分析:评估结论对隐藏偏倚的稳健性
参数:
effect_estimate: 观察到的效应估计
gamma_range: 隐藏偏倚的强度范围(Γ)
返回:
不同Γ水平下的p值范围
"""
from scipy.stats import norm
results = []
for gamma in gamma_range:
# 计算隐藏偏倚下的p值范围
log_gamma = np.log(gamma)
se = effect_estimate / norm.ppf(0.975) # 近似标准误
# 计算p值边界
p_lower = 2 * (1 - norm.cdf(abs(effect_estimate)/se + log_gamma/se))
p_upper = 2 * (1 - norm.cdf(abs(effect_estimate)/se - log_gamma/se))
results.append({
'gamma': gamma,
'p_lower': p_lower,
'p_upper': p_upper,
'significant_lower': p_lower < 0.05,
'significant_upper': p_upper < 0.05
})
return pd.DataFrame(results)
def plot_sensitivity(self, sensitivity_results):
"""绘制敏感性分析图"""
plt.figure(figsize=(10, 6))
plt.plot(sensitivity_results['gamma'], sensitivity_results['p_lower'],
label='p值下限', marker='o')
plt.plot(sensitivity_results['gamma'], sensitivity_results['p_upper'],
label='p值上限', marker='o')
plt.axhline(y=0.05, color='r', linestyle='--', label='显著性阈值 (0.05)')
plt.xlabel('隐藏偏倚强度 (Γ)')
plt.ylabel('p值')
plt.title('Rosenbaum敏感性分析')
plt.legend()
plt.grid(True)
# 找到结论变得不显著的Γ值
critical_gamma = sensitivity_results[
sensitivity_results['significant_upper'] == False
]['gamma'].min()
if not np.isnan(critical_gamma):
plt.axvline(x=critical_gamma, color='g', linestyle=':',
label=f'临界值: Γ={critical_gamma:.2f}')
plt.legend()
plt.tight_layout()
plt.show()
return critical_gamma
# 敏感性分析示例
sensitivity = SensitivityAnalysis()
sensitivity_results = sensitivity.rosenbaum_sensitivity(effect_estimate=4.5)
critical_gamma = sensitivity.plot_sensitivity(sensitivity_results)
print(f"结论保持显著的最大隐藏偏倚: Γ={critical_gamma:.2f}")不同的因果推断方法依赖于不同的假设。以下是主要假设及其检验方法:
1I. 条件独立假设(倾向得分方法)
为了应对因果推断的挑战,我们提出以下实践建议:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。