首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >[数据分析]因果推断入门:相关性≠因果性,那什么才是?

[数据分析]因果推断入门:相关性≠因果性,那什么才是?

原创
作者头像
二一年冬末
发布2025-09-22 17:37:28
发布2025-09-22 17:37:28
5840
举报
文章被收录于专栏:数据分析数据分析

在数据科学和统计学中,我们经常听到"相关性不等于因果性"的告诫。但如果我们真正关心的是因果效应——某个干预或处理如何改变结果——那么我们需要因果推断的方法。

I. 相关性 vs 因果性:根本区别

在我们深入探讨因果推断之前,必须明确区分相关性和因果性这两个基本概念。这一区分是理解因果推断方法论的基石。

定义与区别

特征

相关性

因果性

定义

两个变量之间的统计关联

一个变量变化引起另一个变量变化的机制

数学表示

P(Y|X) ≠ P(Y)

P(Y|do(X)) ≠ P(Y)

方向性

可以是双向的

通常是单向的(原因→结果)

混淆因素

可能受第三方变量影响

需要控制所有混淆变量

证据要求

统计关联即可

需要排除其他解释

经典示例:冰淇淋销量与溺水事件

考虑一个经典案例:冰淇淋销量和溺水事故之间存在正相关关系。当冰淇淋销量高时,溺水事故也更频繁。这是否意味着吃冰淇淋会导致溺水?

当然不是。这里存在一个混淆变量——温度(或季节)。天气炎热时,更多人购买冰淇淋,同时更多人去游泳,从而增加了溺水风险。一旦我们控制温度因素,冰淇淋和溺水之间的伪相关就会消失。

上图展示了混淆变量如何创建伪相关。虚线表示虚假关联,实线表示真实因果路径。

为什么相关性不等于因果性?

混淆偏差:第三方变量同时影响原因和结果变量

反向因果:结果变量反而影响原因变量

选择偏差:样本选择过程与研究对象相关

纯粹巧合:变量间偶然出现的统计关联

因果推断的核心问题

因果推断要回答的问题是:如果我们改变X,Y会如何变化?这比仅仅知道X和Y相关要困难得多,因为它需要反事实推理——我们需要知道在X不同的情况下,Y会发生什么。

在数学上,我们关心的是干预分布P(Y|do(X)),而不是观察分布P(Y|X)。这里的do算子表示主动干预而非被动观察,这是因果推断的核心概念。


II. 因果推断的基本框架

因果推断建立在形式化的框架之上,这些框架帮助我们清晰地定义因果问题并提供解决方案。本节将介绍潜在结果框架和因果图模型这两个核心框架。

潜在结果框架(Rubin因果模型)

潜在结果框架,也称为Rubin因果模型,是因果推断中最常用的框架之一。其核心思想是为每个个体定义两个潜在结果:接受处理时的结果Y₁和未接受处理时的结果Y₀。

基本概念:

  • 个体处理效应:ITE = Y₁ - Y₀
  • 平均处理效应:ATE = EY₁ - Y₀
  • 处理组平均处理效应:ATT = EY₁ - Y₀ | T = 1

根本问题: 对于任何个体,我们只能观察到一个潜在结果(要么Y₁,要么Y₀),另一个是反事实的。这被称为因果推断的根本问题。

代码语言:python
复制
import numpy as np
import pandas as pd

class PotentialOutcomesFramework:
    def __init__(self, n_units=1000):
        self.n_units = n_units
        self.data = None
        
    def generate_data(self, baseline_y0=50, treatment_effect=10, heterogeneity=5):
        """生成模拟潜在结果数据"""
        np.random.seed(42)
        
        # 生成基线特征
        age = np.random.normal(45, 15, self.n_units)
        education = np.random.normal(12, 3, self.n_units)
        
        # 生成潜在结果
        base_y0 = baseline_y0 + 0.5*age + 1.5*education + np.random.normal(0, 10, self.n_units)
        indiv_te = treatment_effect + heterogeneity*(education - 12)/3 + np.random.normal(0, 3, self.n_units)
        
        y0 = base_y0
        y1 = base_y0 + indiv_te
        
        # 基于倾向得分分配处理
        propensity = 1 / (1 + np.exp(-(age - 45)/15 - (education - 12)/3))
        treatment = np.random.binomial(1, propensity)
        
        # 观察结果
        observed_y = np.where(treatment == 1, y1, y0)
        
        self.data = pd.DataFrame({
            'age': age,
            'education': education,
            'y0': y0,
            'y1': y1,
            'treatment': treatment,
            'observed_y': observed_y,
            'true_ite': y1 - y0
        })
        
        return self.data
    
    def calculate_naive_ate(self):
        """计算简单的ATE估计(可能有偏差)"""
        if self.data is None:
            self.generate_data()
            
        treated_mean = self.data[self.data['treatment'] == 1]['observed_y'].mean()
        control_mean = self.data[self.data['treatment'] == 0]['observed_y'].mean()
        
        return treated_mean - control_mean
    
    def calculate_true_ate(self):
        """计算真实的ATE"""
        if self.data is None:
            self.generate_data()
            
        return self.data['true_ite'].mean()

# 示例使用
framework = PotentialOutcomesFramework()
data = framework.generate_data()

naive_ate = framework.calculate_naive_ate()
true_ate = framework.calculate_true_ate()

print(f"简单ATE估计: {naive_ate:.2f}")
print(f"真实ATE: {true_ate:.2f}")
print(f"估计偏差: {naive_ate - true_ate:.2f}")

因果图模型

因果图模型使用图形表示变量间的因果关系,帮助我们识别混淆变量并选择合适的因果推断策略。

基本元素:

  • 节点:表示变量
  • 边:表示因果关系
  • 路径:连接节点的边序列

d-分离准则: 判断两个变量在给定条件集下是否条件独立

代码语言:python
复制
import networkx as nx
import matplotlib.pyplot as plt

class CausalGraph:
    def __init__(self):
        self.graph = nx.DiGraph()
        
    def add_edges(self, edges):
        """添加边到因果图"""
        self.graph.add_edges_from(edges)
        
    def draw_graph(self):
        """绘制因果图"""
        plt.figure(figsize=(10, 8))
        pos = nx.spring_layout(self.graph)
        nx.draw(self.graph, pos, with_labels=True, node_size=3000, 
                node_color='skyblue', font_size=15, arrowsize=20)
        plt.title("因果图模型", fontsize=20)
        plt.show()
        
    def get_backdoor_paths(self, treatment, outcome):
        """获取后门路径"""
        backdoor_paths = []
        
        # 查找所有连接treatment和outcome的路径
        all_paths = nx.all_simple_paths(self.graph, treatment, outcome)
        
        for path in all_paths:
            # 检查是否是后门路径(以指向treatment的箭头开始)
            if len(path) >= 2 and (path[1], path[0]) in self.graph.edges:
                backdoor_paths.append(path)
                
        return backdoor_paths
    
    def get_confounders(self, treatment, outcome):
        """识别混淆变量"""
        backdoor_paths = self.get_backdoor_paths(treatment, outcome)
        confounders = set()
        
        for path in backdoor_paths:
            # 后门路径上的变量(除了treatment和outcome)都是潜在的混淆变量
            confounders.update(path[1:-1])
            
        return confounders

# 创建因果图示例
cg = CausalGraph()
edges = [('年龄', '治疗'), ('年龄', '健康结果'), ('教育', '治疗'), 
         ('教育', '健康结果'), ('治疗', '健康结果')]
cg.add_edges(edges)

cg.draw_graph()

print("后门路径:", cg.get_backdoor_paths('治疗', '健康结果'))
print("混淆变量:", cg.get_confounders('治疗', '健康结果'))

III. 随机实验:因果推断的黄金标准

随机对照试验(RCT)被视为因果推断的黄金标准,因为它通过随机化处理分配,确保处理组和对照组在所有观测和未观测特征上平均可比性。

随机化的力量

随机化的核心优势在于它消除了混淆偏差。通过随机分配处理,我们确保处理状态与任何潜在混淆因素无关,无论是观测到的还是未观测到的。

随机化的性质:

  1. 平衡性:处理组和对照组在基线特征上平均相似
  2. 无混淆:处理分配与潜在结果独立
  3. 可交换性:处理组和对照组可以互换而不改变结果分布

随机实验的实践考虑

尽管随机实验在理论上是理想的,但在实践中面临多种挑战:

挑战

描述

解决方案

伦理问题

某些处理可能有害或不道德

使用观察性研究替代

成本高昂

随机实验通常昂贵且耗时

精心设计减少样本量

实践可行性

某些处理难以随机分配

部分随机化或自然实验

外部有效性

实验结果可能无法推广到更大群体

确保样本代表性

非遵守问题

参与者可能不遵守分配的处理

使用工具变量方法

随机实验的Python模拟

让我们通过模拟一个随机实验来展示其如何提供无偏的因果效应估计。

代码语言:python
复制
class RandomizedExperiment:
    def __init__(self, n_units=1000, true_effect=5):
        self.n_units = n_units
        self.true_effect = true_effect
        self.data = None
        
    def generate_data(self):
        """生成模拟数据"""
        np.random.seed(42)
        
        # 生成基线特征
        age = np.random.normal(45, 15, self.n_units)
        education = np.random.normal(12, 3, self.n_units)
        baseline_health = np.random.normal(50, 10, self.n_units)
        
        # 随机分配处理
        treatment = np.random.binomial(1, 0.5, self.n_units)
        
        # 生成结果(处理效应为true_effect)
        y0 = baseline_health + 0.3*age + 0.5*education + np.random.normal(0, 5, self.n_units)
        y1 = y0 + self.true_effect
        observed_y = np.where(treatment == 1, y1, y0)
        
        self.data = pd.DataFrame({
            'age': age,
            'education': education,
            'baseline_health': baseline_health,
            'treatment': treatment,
            'y0': y0,
            'y1': y1,
            'observed_y': observed_y
        })
        
        return self.data
    
    def check_balance(self):
        """检查随机化后的平衡性"""
        if self.data is None:
            self.generate_data()
            
        balance_table = pd.DataFrame()
        
        for covariate in ['age', 'education', 'baseline_health']:
            treated_mean = self.data[self.data['treatment'] == 1][covariate].mean()
            control_mean = self.data[self.data['treatment'] == 0][covariate].mean()
            difference = treated_mean - control_mean
            p_value = self._calculate_p_value(
                self.data[self.data['treatment'] == 1][covariate],
                self.data[self.data['treatment'] == 0][covariate]
            )
            
            balance_table[covariate] = [treated_mean, control_mean, difference, p_value]
            
        balance_table.index = ['处理组均值', '对照组均值', '差异', 'p值']
        
        return balance_table.round(3)
    
    def _calculate_p_value(self, group1, group2):
        """计算两组差异的p值"""
        from scipy.stats import ttest_ind
        _, p_value = ttest_ind(group1, group2)
        return p_value
    
    def estimate_ate(self):
        """估计平均处理效应"""
        if self.data is None:
            self.generate_data()
            
        treated_mean = self.data[self.data['treatment'] == 1]['observed_y'].mean()
        control_mean = self.data[self.data['treatment'] == 0]['observed_y'].mean()
        
        ate = treated_mean - control_mean
        ate_se = self._calculate_ate_se()
        
        return ate, ate_se, (ate - 1.96*ate_se, ate + 1.96*ate_se)
    
    def _calculate_ate_se(self):
        """计算ATE的标准误"""
        treated_var = self.data[self.data['treatment'] == 1]['observed_y'].var()
        control_var = self.data[self.data['treatment'] == 0]['observed_y'].var()
        n_treated = len(self.data[self.data['treatment'] == 1])
        n_control = len(self.data[self.data['treatment'] == 0])
        
        se = np.sqrt(treated_var/n_treated + control_var/n_control)
        return se

# 运行随机实验模拟
experiment = RandomizedExperiment(n_units=2000, true_effect=5)
data = experiment.generate_data()

print("平衡性检查:")
print(experiment.check_balance())

ate, ate_se, ci = experiment.estimate_ate()
print(f"\nATE估计: {ate:.3f}")
print(f"标准误: {ate_se:.3f}")
print(f"95%置信区间: ({ci[0]:.3f}, {ci[1]:.3f})")

随机实验的分析方法

除了简单的均值比较,随机实验还可以使用回归分析来提高估计精度:

代码语言:python
复制
import statsmodels.api as sm
import statsmodels.formula.api as smf

def analyze_experiment_regression(data):
    """使用回归分析实验数据"""
    # 简单均值比较模型
    model1 = smf.ols('observed_y ~ treatment', data=data).fit()
    
    # 调整基线协变量提高精度
    model2 = smf.ols('observed_y ~ treatment + age + education + baseline_health', data=data).fit()
    
    # 包含交互项(处理效应异质性)
    model3 = smf.ols('observed_y ~ treatment * (age + education)', data=data).fit()
    
    return model1, model2, model3

# 分析实验数据
model1, model2, model3 = analyze_experiment_regression(data)

print("简单模型:")
print(f"ATE估计: {model1.params['treatment']:.3f}, p值: {model1.pvalues['treatment']:.4f}")

print("\n调整协变量模型:")
print(f"ATE估计: {model2.params['treatment']:.3f}, p值: {model2.pvalues['treatment']:.4f}")

print("\n异质性处理效应模型:")
print(f"平均ATE: {model3.params['treatment']:.3f}, p值: {model3.pvalues['treatment']:.4f}")

IV. 观察性研究中的因果推断方法

当随机实验不可行时,观察性研究成为因果推断的主要方法。本节介绍几种主要的观察性因果推断方法,包括匹配、加权和双重差分等。

1. 倾向得分匹配(PSM)

倾向得分匹配通过模拟随机实验的条件,创建处理组和对照组的可比样本。

代码语言:python
复制
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors

class PropensityScoreMatcher:
    def __init__(self):
        self.ps_model = None
        self.matched_data = None
        
    def estimate_propensity_scores(self, X, treatment):
        """估计倾向得分"""
        self.ps_model = LogisticRegression()
        self.ps_model.fit(X, treatment)
        propensity_scores = self.ps_model.predict_proba(X)[:, 1]
        return propensity_scores
    
    def match(self, data, covariates, treatment_var, outcome_var, caliper=0.1):
        """执行倾向得分匹配"""
        X = data[covariates]
        treatment = data[treatment_var]
        
        # 估计倾向得分
        data['propensity_score'] = self.estimate_propensity_scores(X, treatment)
        
        # 分离处理组和对照组
        treated = data[data[treatment_var] == 1]
        control = data[data[treatment_var] == 0]
        
        # 使用最近邻匹配
        nbrs = NearestNeighbors(n_neighbors=1, metric='euclidean')
        nbrs.fit(control[['propensity_score']])
        
        distances, indices = nbrs.kneighbors(treated[['propensity_score']])
        
        # 应用卡钳值
        matched_indices = []
        for i, (dist, idx) in enumerate(zip(distances, indices)):
            if dist < caliper:
                matched_indices.append((treated.index[i], control.index[idx[0]]))
        
        # 创建匹配后的数据集
        treated_matched = data.loc[[pair[0] for pair in matched_indices]]
        control_matched = data.loc[[pair[1] for pair in matched_indices]]
        
        self.matched_data = pd.concat([treated_matched, control_matched])
        
        return self.matched_data
    
    def estimate_ate(self):
        """估计匹配后的ATE"""
        if self.matched_data is None:
            raise ValueError("必须先执行匹配")
            
        treated_mean = self.matched_data[self.matched_data['treatment'] == 1]['observed_y'].mean()
        control_mean = self.matched_data[self.matched_data['treatment'] == 0]['observed_y'].mean()
        
        return treated_mean - control_mean

# 使用PSM示例
# 首先创建有混淆的观察性数据
np.random.seed(42)
n_units = 2000

# 生成混淆变量
age = np.random.normal(45, 15, n_units)
education = np.random.normal(12, 3, n_units)

# 基于混淆变量生成处理分配(非随机)
propensity = 1 / (1 + np.exp(-(age - 45)/15 - (education - 12)/3))
treatment = np.random.binomial(1, propensity)

# 生成结果(真实效应为5)
y0 = 50 + 0.5*age + 1.5*education + np.random.normal(0, 10, n_units)
y1 = y0 + 5
observed_y = np.where(treatment == 1, y1, y0)

obs_data = pd.DataFrame({
    'age': age,
    'education': education,
    'treatment': treatment,
    'observed_y': observed_y
})

# 执行PSM
matcher = PropensityScoreMatcher()
matched_data = matcher.match(obs_data, ['age', 'education'], 'treatment', 'observed_y')

naive_ate = obs_data[obs_data['treatment'] == 1]['observed_y'].mean() - obs_data[obs_data['treatment'] == 0]['observed_y'].mean()
psm_ate = matcher.estimate_ate()

print(f"简单比较ATE: {naive_ate:.3f}")
print(f"PSM调整后ATE: {psm_ate:.3f}")
print(f"真实ATE: 5.000")

2. 逆概率处理加权(IPTW)

逆概率处理加权通过加权创建一个人工群体,其中处理分配与混淆变量独立。

代码语言:python
复制
class IPTWEstimator:
    def __init__(self):
        self.ps_model = None
        
    def estimate_ate(self, data, covariates, treatment_var, outcome_var):
        """使用IPTW估计ATE"""
        X = data[covariates]
        treatment = data[treatment_var]
        
        # 估计倾向得分
        self.ps_model = LogisticRegression()
        self.ps_model.fit(X, treatment)
        propensity_scores = self.ps_model.predict_proba(X)[:, 1]
        
        # 计算权重
        weights = np.where(treatment == 1, 
                          1/propensity_scores, 
                          1/(1-propensity_scores))
        
        # 加权估计ATE
        weighted_treated = np.sum(weights * treatment * data[outcome_var]) / np.sum(weights * treatment)
        weighted_control = np.sum(weights * (1-treatment) * data[outcome_var]) / np.sum(weights * (1-treatment))
        
        ate = weighted_treated - weighted_control
        
        return ate, weights

# 使用IPTW示例
iptw = IPTWEstimator()
ate_iptw, weights = iptw.estimate_ate(obs_data, ['age', 'education'], 'treatment', 'observed_y')

print(f"IPTW估计ATE: {ate_iptw:.3f}")

3. 双重差分法(DID)

双重差分法适用于面板数据,通过比较处理组和对照组在处理前后的变化差异来估计因果效应。

代码语言:python
复制
class DifferenceInDifferences:
    def __init__(self):
        pass
        
    def generate_panel_data(self, n_units=1000, n_periods=5, true_effect=3):
        """生成面板数据"""
        np.random.seed(42)
        
        # 生成单位固定效应和时间固定效应
        unit_effects = np.random.normal(0, 5, n_units)
        time_effects = np.random.normal(0, 2, n_periods)
        
        # 生成处理分配(某些单位在某个时间点后接受处理)
        treatment_unit = np.random.binomial(1, 0.5, n_units)
        treatment_time = n_periods // 2  # 中间时间点引入处理
        
        data = []
        for i in range(n_units):
            for t in range(n_periods):
                # 基础结果
                base_outcome = 50 + unit_effects[i] + time_effects[t]
                
                # 处理效应(处理后且处理组)
                if t >= treatment_time and treatment_unit[i] == 1:
                    treatment_effect = true_effect
                else:
                    treatment_effect = 0
                
                outcome = base_outcome + treatment_effect + np.random.normal(0, 3)
                
                data.append({
                    'unit': i,
                    'time': t,
                    'treatment_unit': treatment_unit[i],
                    'post_treatment': 1 if t >= treatment_time else 0,
                    'outcome': outcome
                })
                
        return pd.DataFrame(data)
    
    def estimate_did(self, data):
        """估计DID"""
        # 分组计算前后均值
        pre_treatment = data[data['post_treatment'] == 0].groupby('treatment_unit')['outcome'].mean()
        post_treatment = data[data['post_treatment'] == 1].groupby('treatment_unit')['outcome'].mean()
        
        # 计算DID
        diff_treatment = post_treatment[1] - pre_treatment[1]
        diff_control = post_treatment[0] - pre_treatment[0]
        
        did_estimate = diff_treatment - diff_control
        
        return did_estimate

# 使用DID示例
did = DifferenceInDifferences()
panel_data = did.generate_panel_data()
did_estimate = did.estimate_did(panel_data)

print(f"DID估计效应: {did_estimate:.3f}")
print(f"真实效应: 3.000")

4. 工具变量法(IV)

工具变量法使用一个与处理相关但与结果无关的变量(工具变量)来估计因果效应。

代码语言:python
复制
class InstrumentalVariable:
    def __init__(self):
        pass
        
    def generate_iv_data(self, n_units=1000, true_effect=4):
        """生成工具变量数据"""
        np.random.seed(42)
        
        # 生成工具变量(与处理相关但与结果无关)
        instrument = np.random.normal(0, 1, n_units)
        
        # 生成混淆变量
        confounder = np.random.normal(0, 2, n_units)
        
        # 生成处理(受工具变量和混淆变量影响)
        treatment = 0.8 * instrument + 0.5 * confounder + np.random.normal(0, 1, n_units)
        treatment = (treatment > 0).astype(int)
        
        # 生成结果(受处理和混淆变量影响)
        outcome = true_effect * treatment + 1.5 * confounder + np.random.normal(0, 2, n_units)
        
        return pd.DataFrame({
            'instrument': instrument,
            'confounder': confounder,
            'treatment': treatment,
            'outcome': outcome
        })
    
    def estimate_iv(self, data):
        """使用2SLS估计IV"""
        from sklearn.linear_model import LinearRegression
        
        # 第一阶段:工具变量预测处理
        stage1 = LinearRegression()
        stage1.fit(data[['instrument']], data['treatment'])
        treatment_pred = stage1.predict(data[['instrument']])
        
        # 第二阶段:预测的处理预测结果
        stage2 = LinearRegression()
        stage2.fit(treatment_pred.reshape(-1, 1), data['outcome'])
        
        iv_estimate = stage2.coef_[0]
        
        return iv_estimate

# 使用IV示例
iv = InstrumentalVariable()
iv_data = iv.generate_iv_data()
iv_estimate = iv.estimate_iv(iv_data)

naive_estimate = LinearRegression().fit(iv_data[['treatment']], iv_data['outcome']).coef_[0]

print(f"简单回归估计: {naive_estimate:.3f}")
print(f"IV估计: {iv_estimate:.3f}")
print(f"真实效应: 4.000")

V. 案例研究:教育对收入的影响

教育对收入的影响是社会科学中一个经典的因果推断问题。我们将使用美国国民纵向调查(NLSY)数据的模拟版本来演示如何应用因果推断方法。

问题背景

我们想回答的问题是:高等教育(大学学位)是否真的能提高个人收入?如果是,提高多少?

挑战:

  • 自我选择:更聪明、更有动力的个体更可能上大学并获得高收入
  • 混淆变量:家庭背景、能力、动机等同时影响教育选择和收入
  • 测量误差:能力等关键变量难以测量

数据生成

我们首先生成一个模拟数据集,包含观察到的和未观察到的混淆变量:

代码语言:python
复制
class EducationIncomeData:
    def __init__(self, n_units=5000):
        self.n_units = n_units
        self.data = None
        
    def generate_data(self, true_effect=0.25):
        """生成教育-收入模拟数据"""
        np.random.seed(42)
        
        # 观测到的变量
        family_income = np.random.lognormal(10.5, 0.8, self.n_units)
        parental_education = np.random.normal(14, 3, self.n_units)
        cognitive_ability = np.random.normal(100, 15, self.n_units)
        
        # 未观测到的变量(动机、 ambition)
        motivation = np.random.normal(0, 1, self.n_units)
        
        # 教育选择模型(受观测和未观测变量影响)
        education_propensity = (
            0.3 * (family_income - np.mean(family_income))/np.std(family_income) +
            0.4 * (parental_education - 14)/3 +
            0.5 * (cognitive_ability - 100)/15 +
            0.6 * motivation +
            np.random.normal(0, 1, self.n_units)
        )
        
        college = (education_propensity > 0).astype(int)
        
        # 收入模型(真实教育效应为true_effect)
        log_income = (
            10.0 + 
            true_effect * college +
            0.2 * (family_income - np.mean(family_income))/np.std(family_income) +
            0.3 * (parental_education - 14)/3 +
            0.4 * (cognitive_ability - 100)/15 +
            0.5 * motivation +
            np.random.normal(0, 0.5, self.n_units)
        )
        
        income = np.exp(log_income)
        
        self.data = pd.DataFrame({
            'family_income': family_income,
            'parental_education': parental_education,
            'cognitive_ability': cognitive_ability,
            'motivation': motivation,  # 注意:在实际分析中这个变量是未观测到的
            'college': college,
            'income': income,
            'log_income': log_income
        })
        
        return self.data
    
    def naive_analysis(self):
        """简单比较大学毕业生和非大学毕业生的收入"""
        if self.data is None:
            self.generate_data()
            
        college_income = self.data[self.data['college'] == 1]['income'].mean()
        no_college_income = self.data[self.data['college'] == 0]['income'].mean()
        
        income_diff = college_income - no_college_income
        ratio = college_income / no_college_income
        
        return income_diff, ratio

# 生成数据
education_data = EducationIncomeData(n_units=10000)
data = education_data.generate_data(true_effect=0.25)  # 真实效应:教育使收入增加25%

# 简单比较
income_diff, ratio = education_data.naive_analysis()
print(f"大学毕业生平均收入: {data[data['college'] == 1]['income'].mean():.2f}")
print(f"非大学毕业生平均收入: {data[data['college'] == 0]['income'].mean():.2f}")
print(f"收入差异: {income_diff:.2f}")
print(f"收入比率: {ratio:.3f}")

应用因果推断方法

现在我们将应用前面介绍的各种因果推断方法来估计教育的真实效应。

1. 回归调整
代码语言:python
复制
def regression_adjustment(data):
    """使用回归调整控制观测到的混淆变量"""
    # 只控制观测到的变量
    model = smf.ols('log_income ~ college + family_income + parental_education + cognitive_ability', 
                   data=data).fit()
    
    # 指数转换获取收入比率
    income_ratio = np.exp(model.params['college'])
    
    return model.params['college'], income_ratio

# 回归调整
coef, ratio = regression_adjustment(data)
print(f"回归调整估计的教育效应 (log): {coef:.4f}")
print(f"收入比率: {ratio:.4f}")
2. 倾向得分匹配
代码语言:python
复制
def propensity_score_matching_analysis(data):
    """使用PSM估计教育效应"""
    matcher = PropensityScoreMatcher()
    
    # 使用观测到的变量进行匹配
    matched_data = matcher.match(
        data, 
        ['family_income', 'parental_education', 'cognitive_ability'],
        'college', 
        'log_income'
    )
    
    # 计算匹配后的效应
    college_mean = matched_data[matched_data['college'] == 1]['log_income'].mean()
    no_college_mean = matched_data[matched_data['college'] == 0]['log_income'].mean()
    
    effect = college_mean - no_college_mean
    income_ratio = np.exp(effect)
    
    return effect, income_ratio

# PSM分析
effect_psm, ratio_psm = propensity_score_matching_analysis(data)
print(f"PSM估计的教育效应 (log): {effect_psm:.4f}")
print(f"收入比率: {ratio_psm:.4f}")
3. 工具变量法

对于教育的影响,一个常见的工具变量是接近大学的地理距离:

代码语言:python
复制
def instrumental_variable_analysis(data):
    """使用工具变量法估计教育效应"""
    # 添加工具变量:距离最近大学的距离(公里)
    # 假设:距离影响上大学决策但不直接影响收入
    np.random.seed(42)
    data['distance_to_college'] = np.random.exponential(20, len(data))
    
    # 工具变量与教育的关系
    data['distance_effect'] = -0.02 * data['distance_to_college'] + np.random.normal(0, 0.5, len(data))
    data['college_iv'] = (data['education_propensity'] + data['distance_effect'] > 0).astype(int)
    
    # 2SLS回归
    # 第一阶段:工具变量预测处理
    stage1 = LinearRegression()
    stage1.fit(data[['distance_to_college']], data['college'])
    data['college_pred'] = stage1.predict(data[['distance_to_college']])
    
    # 第二阶段:预测的处理预测结果
    stage2 = LinearRegression()
    stage2.fit(data[['college_pred']], data['log_income'])
    
    iv_effect = stage2.coef_[0]
    income_ratio = np.exp(iv_effect)
    
    return iv_effect, income_ratio

# IV分析(注意:这里使用了简化的工具变量模拟)
iv_effect, iv_ratio = instrumental_variable_analysis(data.copy())
print(f"IV估计的教育效应 (log): {iv_effect:.4f}")
print(f"收入比率: {iv_ratio:.4f}")

结果比较与解释

现在让我们比较各种方法的估计结果:

代码语言:python
复制
methods = ['简单比较', '回归调整', 'PSM', 'IV']
effects_log = [np.log(ratio), coef, effect_psm, iv_effect]
effects_ratio = [ratio, np.exp(coef), ratio_psm, iv_ratio]

results_df = pd.DataFrame({
    '方法': methods,
    'log收入效应': effects_log,
    '收入比率': effects_ratio
})

print("各种方法的估计结果比较:")
print(results_df.round(4))
print(f"\n真实效应: log收入增加0.2500, 收入比率{np.exp(0.25):.4f}")

VI. 代码实现与部署

在本节中,我们将构建一个完整的因果推断管道,从数据准备到效应估计,并展示如何将其部署为可重用的Python包。

因果推断管道

我们创建一个完整的因果推断管道,包含数据检查、预处理、多种分析方法和结果可视化。

代码语言:python
复制
class CausalInferencePipeline:
    def __init__(self, data, treatment_var, outcome_var, covariates=None):
        self.data = data.copy()
        self.treatment_var = treatment_var
        self.outcome_var = outcome_var
        self.covariates = covariates or []
        self.results = {}
        
    def check_balance(self):
        """检查处理组和对照组的平衡性"""
        balance_table = pd.DataFrame()
        
        for covariate in self.covariates:
            treated_mean = self.data[self.data[self.treatment_var] == 1][covariate].mean()
            control_mean = self.data[self.data[self.treatment_var] == 0][covariate].mean()
            difference = treated_mean - control_mean
            p_value = self._calculate_p_value(
                self.data[self.data[self.treatment_var] == 1][covariate],
                self.data[self.data[self.treatment_var] == 0][covariate]
            )
            
            balance_table[covariate] = [treated_mean, control_mean, difference, p_value]
            
        balance_table.index = ['处理组均值', '对照组均值', '差异', 'p值']
        
        return balance_table.round(3)
    
    def _calculate_p_value(self, group1, group2):
        """计算两组差异的p值"""
        from scipy.stats import ttest_ind
        _, p_value = ttest_ind(group1, group2)
        return p_value
    
    def run_naive_analysis(self):
        """运行简单比较分析"""
        treated_mean = self.data[self.data[self.treatment_var] == 1][self.outcome_var].mean()
        control_mean = self.data[self.data[self.treatment_var] == 0][self.outcome_var].mean()
        
        effect = treated_mean - control_mean
        self.results['naive'] = effect
        
        return effect
    
    def run_regression_adjustment(self):
        """运行回归调整分析"""
        if not self.covariates:
            raise ValueError("需要指定协变量")
            
        formula = f"{self.outcome_var} ~ {self.treatment_var} + " + " + ".join(self.covariates)
        model = smf.ols(formula, data=self.data).fit()
        
        effect = model.params[self.treatment_var]
        self.results['regression'] = effect
        
        return effect, model
    
    def run_propensity_score_matching(self, caliper=0.1):
        """运行倾向得分匹配"""
        if not self.covariates:
            raise ValueError("需要指定协变量")
            
        matcher = PropensityScoreMatcher()
        matched_data = matcher.match(
            self.data, self.covariates, self.treatment_var, self.outcome_var, caliper
        )
        
        treated_mean = matched_data[matched_data[self.treatment_var] == 1][self.outcome_var].mean()
        control_mean = matched_data[matched_data[self.treatment_var] == 0][self.outcome_var].mean()
        
        effect = treated_mean - control_mean
        self.results['psm'] = effect
        self.matched_data = matched_data
        
        return effect, matched_data
    
    def run_iptw(self):
        """运行逆概率处理加权"""
        if not self.covariates:
            raise ValueError("需要指定协变量")
            
        iptw = IPTWEstimator()
        effect, weights = iptw.estimate_ate(self.data, self.covariates, self.treatment_var, self.outcome_var)
        
        self.results['iptw'] = effect
        self.weights = weights
        
        return effect, weights
    
    def run_all_methods(self):
        """运行所有分析方法"""
        results = {}
        
        results['naive'] = self.run_naive_analysis()
        results['regression'], _ = self.run_regression_adjustment()
        results['psm'], _ = self.run_propensity_score_matching()
        results['iptw'], _ = self.run_iptw()
        
        return results
    
    def plot_results(self, true_effect=None):
        """可视化结果比较"""
        methods = list(self.results.keys())
        effects = list(self.results.values())
        
        plt.figure(figsize=(10, 6))
        bars = plt.bar(methods, effects, alpha=0.7)
        
        # 添加真实效应线(如果已知)
        if true_effect is not None:
            plt.axhline(y=true_effect, color='r', linestyle='--', label=f'真实效应: {true_effect:.3f}')
        
        plt.ylabel('处理效应估计')
        plt.title('不同方法的因果效应估计比较')
        plt.legend()
        
        # 在柱子上添加数值标签
        for bar, effect in zip(bars, effects):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01, 
                    f'{effect:.3f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()

# 使用管道示例
pipeline = CausalInferencePipeline(
    data=obs_data,
    treatment_var='treatment',
    outcome_var='observed_y',
    covariates=['age', 'education']
)

print("平衡性检查:")
print(pipeline.check_balance())

print("\n运行各种分析方法...")
results = pipeline.run_all_methods()

print("\n效应估计结果:")
for method, effect in results.items():
    print(f"{method}: {effect:.3f}")

print(f"真实效应: 5.000")

# 可视化结果
pipeline.plot_results(true_effect=5.0)

部署为Python包

我们可以将上述因果推断方法打包成一个可重用的Python库:

代码语言:shell
复制
causalib/
│
├── __init__.py
├── data.py          # 数据生成和处理
├── methods.py       # 因果推断方法
├── utils.py         # 工具函数
├── visualization.py # 可视化功能
└── pipeline.py      # 分析管道

setup.py:

代码语言:python
复制
from setuptools import setup, find_packages

setup(
    name="causalib",
    version="0.1.0",
    description="A causal inference library for Python",
    author="Your Name",
    author_email="your.email@example.com",
    packages=find_packages(),
    install_requires=[
        "numpy>=1.20.0",
        "pandas>=1.3.0",
        "scipy>=1.7.0",
        "statsmodels>=0.13.0",
        "scikit-learn>=1.0.0",
        "matplotlib>=3.5.0",
        "seaborn>=0.11.0",
        "networkx>=2.6.0"
    ],
    python_requires=">=3.8",
)

创建示例Notebook

我们还可以创建一个Jupyter Notebook示例,展示如何使用这个库:

代码语言:python
复制
# example_usage.ipynb
import causalib as cl
import numpy as np
import pandas as pd

# 生成模拟数据
data_generator = cl.DataGenerator()
data = data_generator.generate_observational_data(
    n_units=2000, 
    true_effect=5.0,
    confounding_strength=0.7
)

# 初始化管道
pipeline = cl.CausalPipeline(
    data=data,
    treatment_var='treatment',
    outcome_var='outcome',
    covariates=['age', 'education', 'income']
)

# 检查平衡性
print("平衡性检查:")
print(pipeline.check_balance())

# 运行分析
results = pipeline.run_all_methods()

# 可视化结果
pipeline.plot_results(true_effect=5.0)

# 生成报告
report = pipeline.generate_report()
report.save('causal_analysis_report.html')

自动化测试

为了确保代码质量,我们还需要编写自动化测试:

代码语言:python
复制
# tests/test_causalib.py
import unittest
import numpy as np
from causalib import DataGenerator, CausalPipeline

class TestCausalLib(unittest.TestCase):
    def setUp(self):
        self.data_generator = DataGenerator()
        
    def test_data_generation(self):
        """测试数据生成"""
        data = self.data_generator.generate_observational_data(
            n_units=1000, true_effect=5.0
        )
        
        self.assertIn('treatment', data.columns)
        self.assertIn('outcome', data.columns)
        self.assertEqual(len(data), 1000)
        
    def test_pipeline(self):
        """测试因果管道"""
        data = self.data_generator.generate_observational_data(
            n_units=500, true_effect=3.0
        )
        
        pipeline = CausalPipeline(
            data=data,
            treatment_var='treatment',
            outcome_var='outcome',
            covariates=['covariate1', 'covariate2']
        )
        
        results = pipeline.run_all_methods()
        
        self.assertIn('naive', results)
        self.assertIn('regression', results)
        self.assertIn('psm', results)
        self.assertIn('iptw', results)
        
        # 检查所有方法都返回数值结果
        for method, effect in results.items():
            self.assertIsInstance(effect, (int, float, np.number))

if __name__ == '__main__':
    unittest.main()

VII. 因果推断的挑战与局限

尽管因果推断方法强大,但它们都依赖于某些假设和条件,这些假设在实际应用中可能难以满足。了解这些挑战和局限对于正确解释因果推断结果至关重要。

主要挑战

挑战

描述

影响

未测混淆

未能测量所有相关混淆变量

估计偏差,因果结论无效

模型误设

错误的函数形式或模型假设

有偏或不一致的估计

重叠 violation

处理组和对照组缺乏共同支持

外推不可靠,估计不稳定

测量误差

变量测量不准确

衰减偏倚,低估真实效应

选择偏差

样本选择与处理或结果相关

样本不具有代表性,结论不推广

敏感性分析

敏感性分析评估因果结论对未测混淆的稳健性。下面实现一个基本的敏感性分析:

代码语言:python
复制
class SensitivityAnalysis:
    def __init__(self):
        pass
        
    def rosenbaum_sensitivity(self, effect_estimate, gamma_range=np.arange(1, 2.1, 0.1)):
        """
        Rosenbaum敏感性分析:评估结论对隐藏偏倚的稳健性
        
       参数:
        effect_estimate: 观察到的效应估计
        gamma_range: 隐藏偏倚的强度范围(Γ)
        
       返回:
        不同Γ水平下的p值范围
        """
        from scipy.stats import norm
        
        results = []
        
        for gamma in gamma_range:
            # 计算隐藏偏倚下的p值范围
            log_gamma = np.log(gamma)
            se = effect_estimate / norm.ppf(0.975)  # 近似标准误
            
            # 计算p值边界
            p_lower = 2 * (1 - norm.cdf(abs(effect_estimate)/se + log_gamma/se))
            p_upper = 2 * (1 - norm.cdf(abs(effect_estimate)/se - log_gamma/se))
            
            results.append({
                'gamma': gamma,
                'p_lower': p_lower,
                'p_upper': p_upper,
                'significant_lower': p_lower < 0.05,
                'significant_upper': p_upper < 0.05
            })
            
        return pd.DataFrame(results)
    
    def plot_sensitivity(self, sensitivity_results):
        """绘制敏感性分析图"""
        plt.figure(figsize=(10, 6))
        
        plt.plot(sensitivity_results['gamma'], sensitivity_results['p_lower'], 
                label='p值下限', marker='o')
        plt.plot(sensitivity_results['gamma'], sensitivity_results['p_upper'], 
                label='p值上限', marker='o')
        
        plt.axhline(y=0.05, color='r', linestyle='--', label='显著性阈值 (0.05)')
        
        plt.xlabel('隐藏偏倚强度 (Γ)')
        plt.ylabel('p值')
        plt.title('Rosenbaum敏感性分析')
        plt.legend()
        plt.grid(True)
        
        # 找到结论变得不显著的Γ值
        critical_gamma = sensitivity_results[
            sensitivity_results['significant_upper'] == False
        ]['gamma'].min()
        
        if not np.isnan(critical_gamma):
            plt.axvline(x=critical_gamma, color='g', linestyle=':', 
                       label=f'临界值: Γ={critical_gamma:.2f}')
            plt.legend()
        
        plt.tight_layout()
        plt.show()
        
        return critical_gamma

# 敏感性分析示例
sensitivity = SensitivityAnalysis()
sensitivity_results = sensitivity.rosenbaum_sensitivity(effect_estimate=4.5)
critical_gamma = sensitivity.plot_sensitivity(sensitivity_results)

print(f"结论保持显著的最大隐藏偏倚: Γ={critical_gamma:.2f}")

常见假设及其检验

不同的因果推断方法依赖于不同的假设。以下是主要假设及其检验方法:

1I. 条件独立假设(倾向得分方法)

  • 检验:平衡性检查,标准化差异
  1. 平行趋势假设(双重差分法)
    • 检验:事件研究图,预处理期趋势检验
  2. 排他性约束(工具变量法)
    • 检验:过度识别检验, placebo检验
  3. 一致性假设(所有方法)
    • 检验:处理异质性分析,机制检验

实践建议

为了应对因果推断的挑战,我们提出以下实践建议:

  1. 透明报告所有假设
    • 明确说明分析依赖的假设
    • 报告假设检验的结果
  2. 进行全面的敏感性分析
    • 评估结论对未测混淆的稳健性
    • 尝试不同的模型设定和估计方法
  3. 三角验证
    • 使用多种方法估计同一效应
    • 比较不同方法的结果
  4. 谨慎解释结果
    • 承认因果推断的局限性
    • 避免过度宣称因果结论
  5. 优先考虑研究设计
    • 尽可能使用随机实验或自然实验设计
    • 在数据收集前考虑因果推断需求

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • I. 相关性 vs 因果性:根本区别
    • 定义与区别
    • 经典示例:冰淇淋销量与溺水事件
    • 为什么相关性不等于因果性?
    • 因果推断的核心问题
  • II. 因果推断的基本框架
    • 潜在结果框架(Rubin因果模型)
    • 因果图模型
  • III. 随机实验:因果推断的黄金标准
    • 随机化的力量
    • 随机实验的实践考虑
    • 随机实验的Python模拟
    • 随机实验的分析方法
  • IV. 观察性研究中的因果推断方法
    • 1. 倾向得分匹配(PSM)
    • 2. 逆概率处理加权(IPTW)
    • 3. 双重差分法(DID)
    • 4. 工具变量法(IV)
  • V. 案例研究:教育对收入的影响
    • 问题背景
    • 数据生成
    • 应用因果推断方法
      • 1. 回归调整
      • 2. 倾向得分匹配
      • 3. 工具变量法
    • 结果比较与解释
  • VI. 代码实现与部署
    • 因果推断管道
    • 部署为Python包
    • 创建示例Notebook
    • 自动化测试
  • VII. 因果推断的挑战与局限
    • 主要挑战
    • 敏感性分析
    • 常见假设及其检验
    • 实践建议
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档