首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >[数据分析]合成控制法:当没有完美对照组时该怎么办?

[数据分析]合成控制法:当没有完美对照组时该怎么办?

原创
作者头像
二一年冬末
发布2025-09-23 13:11:59
发布2025-09-23 13:11:59
2610
举报
文章被收录于专栏:数据分析数据分析

I. 引言:单一处理单元的因果推断难题

在现实世界的政策评估中,我们经常遇到这样的情况:一项政策只在某个地区、某个企业或某个特定群体中实施,而我们需要评估这项政策的效果。这种"单一处理单元"的问题给传统的因果推断方法带来了巨大挑战。

传统方法的局限性

方法

在处理单一单元时的局限性

随机对照试验

需要将单元随机分配到处理组和对照组,不适用于单一处理单元

双重差分法

需要多个处理单元和多个控制单元,满足平行趋势假设

断点回归

需要明确的断点和处理变量连续性,适用场景有限

匹配方法

需要多个处理单元寻找匹配对象

真实世界中的单一处理单元问题

在实际政策评估中,单一处理单元的情况比比皆是:

地区性政策:某个州或国家实施独特政策(如加州的烟草控制法案)

企业决策:某家公司实施特定战略(如特斯拉的电动汽车策略)

社会运动:某个社区发起的社会改革运动

国际事件:某个国家经历的特殊事件(如东日本大地震后的核政策变化)

合成控制法的诞生

合成控制法由Abadie和Gardeazabal(2003)提出,最初用于评估西班牙巴斯克地区恐怖主义活动对经济增长的影响。随后,Abadie、Diamond和Hainmueller(2010)将其发展为成熟的因果推断方法,应用于加州烟草控制政策的评估。


II. 合成控制法的基本思想

合成控制法的核心思想非常直观且巧妙:当没有一个完美的对照组时,我们可以通过加权组合多个控制单元来构建一个"合成控制组",这个合成控制组在处理前的特征和趋势上与处理单元尽可能相似。

合成控制法的直观理解

想象一下,我们要评估加州某项政策的效果。理想的对照组应该是"没有实施该政策的加州",但这在现实中不存在。合成控制法的解决方案是:寻找其他州的加权组合,使得这个"合成加州"在政策实施前的经济、人口、社会特征等方面与真实加州高度相似。

合成控制法的优势

与传统方法相比,合成控制法具有独特优势:

特征

合成控制法

传统方法

处理单元数量

可处理单个处理单元

需要多个处理单元

对照组构建

数据驱动,透明

常常主观选择

趋势匹配

强调处理前趋势相似性

可能忽略趋势匹配

外推问题

限制在捐赠池范围内

可能过度外推

合成控制法的适用条件

合成控制法并非万能钥匙,它需要满足一定的条件:

  1. 处理前时期足够长:需要有足够多的预处理期数据来估计权重
  2. 捐赠池规模适当:控制单元数量既不能太少(缺乏选择),也不能太多(过度拟合)
  3. 处理效应明显:政策效果应该足够大,能够从随机波动中区分出来
  4. 无传染效应:处理效应不会扩散到控制单元

III. 合成控制法的数学模型

合成控制法建立在严谨的数学框架之上,通过优化算法寻找最优的权重组合。本节将深入探讨合成控制法的数学模型和估计过程。

基本设定

假设我们有J+1个单元,其中第一个单元(j=1)是处理单元,其余J个单元(j=2,...,J+1)是控制单元。观察期分为预处理期(t=1,...,T₀)和处理后期(t=T₀+1,...,T)。

令:

  • Y_{jt} :单元j在时期t的结果
  • Y_{jt}^N :未接受处理时的潜在结果
  • Y_{jt}^I :接受处理时的潜在结果
  • D_{jt} :处理指示变量(处理后为1,否则为0)

则观察结果可表示为:

Y_{jt} = Y_{jt}^N + \alpha_{jt}D_{jt}

其中\alpha_{jt} = Y_{jt}^I - Y_{jt}^N 是处理效应。

合成控制组的构建

合成控制组是控制单元的加权组合,权重向量W = (w_2, ..., w_{J+1}) 满足:

  • w_j \geq 0 对于所有j
  • \sum_{j=2}^{J+1} w_j = 1

合成控制组在时期t的结果为:

Y_{1t}^N = \sum_{j=2}^{J+1} w_j Y_{jt}

权重选择准则

权重的选择基于预处理期的匹配质量。令X_1 是处理单元的预测变量向量,X_0 是控制单元的预测变量矩阵(每列对应一个控制单元)。我们选择权重W使得:

\min_W \|X_1 - X_0W\|_V = \sqrt{(X_1 - X_0W)'V(X_1 - X_0W)}

其中V是一个对角矩阵,表示各预测变量的相对重要性。

预测变量的选择

预测变量通常包括两类:

  1. 预处理期的结果变量值
  2. 影响结果的其他协变量
代码语言:python
复制
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import matplotlib.pyplot as plt

class SyntheticControl:
    def __init__(self):
        self.weights = None
        self.v_matrix = None
        self.synthetic_control = None
        
    def fit(self, treated_pre, control_pre, predictors=None, V=None):
        """
        拟合合成控制模型
        
        参数:
        treated_pre: 处理单元预处理期数据 (T0,)
        control_pre: 控制单元预处理期数据 (T0, J)
        predictors: 预测变量矩阵 (K, J+1),第一列是处理单元
        V: 预测变量权重矩阵,如果为None则自动选择
        """
        self.T0 = treated_pre.shape[0]
        self.J = control_pre.shape[1]
        
        # 如果没有提供预测变量,使用预处理期结果变量的均值
        if predictors is None:
            predictors = np.vstack([treated_pre.mean(), control_pre.mean(axis=0)]).T
            predictors = np.vstack([predictors, treated_pre[-3:].mean(), control_pre[-3:,:].mean(axis=0)]).T
        
        # 分离处理单元和控制单元的预测变量
        X1 = predictors[:, 0]  # 处理单元预测变量
        X0 = predictors[:, 1:] # 控制单元预测变量
        
        # 如果没有提供V矩阵,使用均等权重或基于回归的权重
        if V is None:
            # 简单方法:使用均等权重
            V = np.diag(np.ones(len(X1)) / len(X1))
        
        self.V = V
        
        # 定义优化问题
        def objective(W):
            return np.sqrt((X1 - X0 @ W).T @ V @ (X1 - X0 @ W))
        
        # 约束条件:权重非负且和为1
        constraints = ({'type': 'eq', 'fun': lambda W: np.sum(W) - 1})
        bounds = [(0, 1) for _ in range(self.J)]
        
        # 初始值:均等权重
        W0 = np.ones(self.J) / self.J
        
        # 优化
        result = minimize(objective, W0, method='SLSQP', 
                         bounds=bounds, constraints=constraints)
        
        if result.success:
            self.weights = result.x
        else:
            raise ValueError("权重优化失败")
        
        return self
    
    def predict(self, control_post):
        """预测处理后的合成控制结果"""
        if self.weights is None:
            raise ValueError("需要先拟合模型")
        
        self.synthetic_control = control_post @ self.weights
        return self.synthetic_control
    
    def calculate_effect(self, treated_post, control_post):
        """计算处理效应"""
        synthetic_post = self.predict(control_post)
        effect = treated_post - synthetic_post
        return effect
    
    def plot_results(self, treated_pre, treated_post, control_pre, control_post, event_time=None):
        """绘制合成控制结果"""
        if event_time is None:
            event_time = list(range(-len(treated_pre), len(treated_post)))
        
        # 计算合成控制组在整个时期的轨迹
        synthetic_pre = control_pre @ self.weights
        synthetic_post = control_post @ self.weights
        
        # 合并预处理期和处理后期
        treated_full = np.concatenate([treated_pre, treated_post])
        synthetic_full = np.concatenate([synthetic_pre, synthetic_post])
        
        plt.figure(figsize=(12, 6))
        plt.plot(event_time, treated_full, 'b-', linewidth=2, label='处理单元')
        plt.plot(event_time, synthetic_full, 'r--', linewidth=2, label='合成控制组')
        plt.axvline(x=0, color='gray', linestyle='-', alpha=0.7, label='政策实施')
        plt.xlabel('时间(相对政策实施)')
        plt.ylabel('结果变量')
        plt.title('合成控制法分析结果')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()
        
        # 绘制处理效应
        effect = treated_full - synthetic_full
        plt.figure(figsize=(12, 4))
        plt.bar(event_time, effect, alpha=0.7, color='orange')
        plt.axhline(y=0, color='gray', linestyle='-', alpha=0.7)
        plt.axvline(x=0, color='gray', linestyle='-', alpha=0.7)
        plt.xlabel('时间(相对政策实施)')
        plt.ylabel('处理效应')
        plt.title('处理效应随时间变化')
        plt.grid(True, alpha=0.3)
        plt.show()
        
        return effect

# 示例使用
def example_synthetic_control():
    """合成控制法示例"""
    np.random.seed(42)
    
    # 生成模拟数据
    T0 = 20  # 预处理期
    T1 = 10  # 处理后期
    J = 20   # 控制单元数量
    
    # 生成共同趋势和个体固定效应
    common_trend = np.linspace(0, 10, T0 + T1)
    unit_effects = np.random.normal(0, 2, J + 1)
    
    # 生成预处理期数据
    control_pre = np.zeros((T0, J))
    for j in range(J):
        control_pre[:, j] = common_trend[:T0] + unit_effects[j+1] + np.random.normal(0, 0.5, T0)
    
    treated_pre = common_trend[:T0] + unit_effects[0] + np.random.normal(0, 0.5, T0)
    
    # 生成处理后期数据(真实效应为2)
    control_post = np.zeros((T1, J))
    for j in range(J):
        control_post[:, j] = common_trend[T0:] + unit_effects[j+1] + np.random.normal(0, 0.5, T1)
    
    treated_post = common_trend[T0:] + unit_effects[0] + 2 + np.random.normal(0, 0.5, T1)  # 真实效应为2
    
    # 应用合成控制法
    scm = SyntheticControl()
    scm.fit(treated_pre, control_pre)
    
    effect = scm.calculate_effect(treated_post, control_post)
    avg_effect = np.mean(effect)
    
    print(f"估计的平均处理效应: {avg_effect:.3f}")
    print(f"真实处理效应: 2.000")
    print(f"估计偏差: {avg_effect - 2:.3f}")
    
    # 绘制结果
    event_time = list(range(-T0, T1))
    scm.plot_results(treated_pre, treated_post, control_pre, control_post, event_time)
    
    return scm, effect

# 运行示例
scm, effect = example_synthetic_control()

IV. 案例研究:加州烟草控制政策评估

在本节中,我们将重现Abadie、Diamond和Hainmueller(2010)的经典研究,评估加州1988年通过的99号提案(烟草控制法案)对烟草消费的影响。这个案例是合成控制法最著名的应用之一。

研究背景

1988年,加州通过了99号提案,大幅提高烟草税并将部分税收用于烟草控制项目。这项政策是美国历史上最严格的烟草控制措施之一。我们需要评估这项政策是否有效降低了加州的烟草消费量。

数据准备

我们使用与原始研究类似的数据,包括:

处理单元:加利福尼亚州

控制单元:其他38个州(排除有类似政策的州)

结果变量:人均烟草消费量(包/人/年)

预测变量

  • 预处理期的人均烟草消费量
  • 人均收入
  • 15-24岁人口比例
  • 啤酒消费量
  • 烟草价格

合成控制分析

现在我们对加州的烟草控制政策进行评估:

代码语言:python
复制
def run_california_analysis():
    """运行加州烟草政策评估"""
    # 准备数据
    study = CaliforniaTobaccoStudy()
    scm_data = study.prepare_scm_data()
    
    # 应用合成控制法
    scm = SyntheticControl()
    
    # 使用特征矩阵作为预测变量
    scm.fit(scm_data['treated_pre'], 
            scm_data['control_pre'],
            predictors=np.vstack([scm_data['X1'], scm_data['X0']]).T)
    
    # 计算处理效应
    effect = scm.calculate_effect(scm_data['treated_post'], 
                                 scm_data['control_post'])
    
    # 绘制结果
    years = study.pre_period + study.post_period
    event_time = [year - study.data['treatment_year'] for year in years]
    
    effect_plot = scm.plot_results(scm_data['treated_pre'], 
                                  scm_data['treated_post'],
                                  scm_data['control_pre'], 
                                  scm_data['control_post'],
                                  event_time)
    
    # 计算平均处理效应和累计效应
    avg_effect = np.mean(effect)
    cumulative_effect = np.sum(effect)
    
    print("加州烟草控制政策评估结果:")
    print(f"平均年度处理效应: {avg_effect:.2f} 包/人/年")
    print(f"累计处理效应 (1989-2000): {cumulative_effect:.2f} 包/人")
    print(f"相当于平均减少: {avg_effect/scm_data['treated_pre'][-1]*100:.1f}%")
    
    # 显示权重分布
    print("\n合成控制组权重分布:")
    weights_df = pd.DataFrame({
        'State': study.control_states,
        'Weight': scm.weights
    }).sort_values('Weight', ascending=False)
    
    print(weights_df.head(10))  # 显示前10个最重要控制州
    
    return scm, effect, weights_df

# 运行分析
scm_ca, effect_ca, weights_df = run_california_analysis()

结果解释与政策含义

加州烟草控制政策评估的结果显示:

政策效果显著:99号提案实施后,加州的人均烟草消费量显著下降

效果持续增强:政策效果随时间推移而增强,显示长期影响

合成控制组质量高:预处理期加州与合成加州高度吻合,验证了方法的有效性

这一研究为烟草控制政策提供了有力证据,表明提高烟草税和资助控制项目可以有效减少烟草消费。


V. Python代码实现与部署

在本节中,我们将构建一个完整的、可重用的合成控制法Python包,包含数据预处理、模型拟合、效果评估和可视化等完整功能。

完整的合成控制法实现

我们创建一个功能完整的SyntheticControl类,包含所有必要的方法:

代码语言:python
复制
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import seaborn as sns

class AdvancedSyntheticControl:
    def __init__(self, treated_unit, donor_pool, time_index=None):
        """
        初始化高级合成控制模型
        
        参数:
        treated_unit: 处理单元名称或标识
        donor_pool: 捐赠池单元列表
        time_index: 时间索引
        """
        self.treated_unit = treated_unit
        self.donor_pool = donor_pool
        self.time_index = time_index
        self.weights = None
        self.V_matrix = None
        self.prediction = None
        self.effect = None
        
    def fit(self, data, treatment_time, predictors=None, 
            V_method='equal', custom_V=None, optimizer_options=None):
        """
        拟合合成控制模型
        
        参数:
        data: 面板数据,索引为时间,列为单元
        treatment_time: 处理开始时间
        predictors: 预测变量,如果为None则使用预处理期数据
        V_method: V矩阵选择方法 ('equal', 'regression', 'custom')
        custom_V: 自定义V矩阵
        optimizer_options: 优化器选项
        """
        self.data = data
        self.treatment_time = treatment_time
        self.pre_period = data.index[data.index < treatment_time]
        self.post_period = data.index[data.index >= treatment_time]
        
        if len(self.pre_period) == 0:
            raise ValueError("预处理期数据不足")
        
        # 准备数据
        self._prepare_data(predictors)
        
        # 选择V矩阵
        self._choose_V_matrix(V_method, custom_V)
        
        # 优化权重
        self._optimize_weights(optimizer_options)
        
        return self
    
    def _prepare_data(self, predictors):
        """准备数据"""
        # 处理单元数据
        self.y_treated_pre = self.data.loc[self.pre_period, self.treated_unit].values
        self.y_treated_post = self.data.loc[self.post_period, self.treated_unit].values
        
        # 控制单元数据
        self.y_donor_pre = self.data.loc[self.pre_period, self.donor_pool].values
        self.y_donor_post = self.data.loc[self.post_period, self.donor_pool].values
        
        # 预测变量
        if predictors is None:
            # 使用预处理期数据的各种统计量作为预测变量
            pre_means = self.data.loc[self.pre_period].mean()
            pre_early = self.data.loc[self.pre_period[:len(self.pre_period)//2]].mean()
            pre_late = self.data.loc[self.pre_period[len(self.pre_period)//2:]].mean()
            pre_trend = np.polyfit(range(len(self.pre_period)), 
                                  self.data.loc[self.pre_period, self.treated_unit], 1)[0]
            
            self.X1 = np.array([pre_means[self.treated_unit], 
                              pre_early[self.treated_unit],
                              pre_late[self.treated_unit], pre_trend])
            
            self.X0 = np.vstack([
                pre_means[self.donor_pool].values,
                pre_early[self.donor_pool].values,
                pre_late[self.donor_pool].values,
                [np.polyfit(range(len(self.pre_period)), 
                           self.data.loc[self.pre_period, donor], 1)[0] 
                 for donor in self.donor_pool]
            ])
        else:
            # 使用提供的预测变量
            self.X1 = predictors.loc[self.treated_unit].values
            self.X0 = predictors.loc[self.donor_pool].values.T
    
    def _choose_V_matrix(self, method, custom_V):
        """选择V矩阵"""
        if method == 'equal':
            # 均等权重
            self.V_matrix = np.diag(np.ones(len(self.X1)) / len(self.X1))
        
        elif method == 'regression':
            # 基于回归的权重(预测变量对结果的影响)
            # 简化实现:使用预处理期结果与预测变量的相关性
            correlations = []
            for i in range(len(self.X1)):
                corr = np.corrcoef(self.X0[i], self.y_donor_pre.mean(axis=0))[0, 1]
                correlations.append(abs(corr) if not np.isnan(corr) else 0)
            
            weights = np.array(correlations)
            if weights.sum() > 0:
                weights = weights / weights.sum()
            else:
                weights = np.ones(len(self.X1)) / len(self.X1)
            
            self.V_matrix = np.diag(weights)
        
        elif method == 'custom' and custom_V is not None:
            self.V_matrix = custom_V
        else:
            raise ValueError("不支持的V矩阵选择方法")
    
    def _optimize_weights(self, options):
        """优化权重"""
        if options is None:
            options = {'method': 'SLSQP', 'options': {'ftol': 1e-9}}
        
        # 目标函数
        def objective(W):
            return np.sqrt((self.X1 - self.X0 @ W).T @ self.V_matrix @ (self.X1 - self.X0 @ W))
        
        # 约束:权重非负且和为1
        constraints = ({'type': 'eq', 'fun': lambda W: np.sum(W) - 1})
        bounds = [(0, 1) for _ in range(len(self.donor_pool))]
        
        # 初始值
        W0 = np.ones(len(self.donor_pool)) / len(self.donor_pool)
        
        # 优化
        result = minimize(objective, W0, method=options.get('method', 'SLSQP'),
                         bounds=bounds, constraints=constraints,
                         options=options.get('options', {}))
        
        if result.success:
            self.weights = result.x
            self.optimization_result = result
        else:
            raise ValueError(f"权重优化失败: {result.message}")
    
    def predict(self, period='all'):
        """预测合成控制结果"""
        if self.weights is None:
            raise ValueError("需要先拟合模型")
        
        if period == 'all':
            donor_data = self.data[self.donor_pool].values
            self.prediction = donor_data @ self.weights
        elif period == 'pre':
            self.prediction_pre = self.y_donor_pre @ self.weights
            return self.prediction_pre
        elif period == 'post':
            self.prediction_post = self.y_donor_post @ self.weights
            return self.prediction_post
        else:
            raise ValueError("period参数应为 'all', 'pre' 或 'post'")
    
    def calculate_effect(self):
        """计算处理效应"""
        if self.prediction is None:
            self.predict('all')
        
        treated = self.data[self.treated_unit].values
        self.effect = treated - self.prediction
        
        # 分离预处理期和处理后期效应
        pre_idx = len(self.pre_period)
        self.effect_pre = self.effect[:pre_idx]
        self.effect_post = self.effect[pre_idx:]
        
        return self.effect
    
    def inference_placebo(self, n_permutations=1000):
        """Placebo检验:置换检验"""
        placebo_effects = []
        
        # 将处理单元也加入捐赠池进行Placebo检验
        all_units = [self.treated_unit] + self.donor_pool
        
        for unit in all_units:
            # 将当前单元作为假想的处理单元
            placebo_donor = [u for u in all_units if u != unit]
            
            try:
                # 拟合Placebo模型
                placebo_sc = AdvancedSyntheticControl(unit, placebo_donor, self.time_index)
                placebo_sc.fit(self.data, self.treatment_time, 
                              V_method='equal', optimizer_options={'method': 'SLSQP'})
                
                placebo_effect = placebo_sc.calculate_effect()
                placebo_post_effect = placebo_sc.effect_post
                
                if len(placebo_post_effect) > 0:
                    placebo_effects.append({
                        'unit': unit,
                        'effect': np.mean(placebo_post_effect),
                        'rmse_pre': np.sqrt(np.mean(placebo_sc.effect_pre**2))
                    })
            except:
                continue
        
        self.placebo_results = pd.DataFrame(placebo_effects)
        return self.placebo_results
    
    def plot_diagnostic(self, figsize=(15, 10)):
        """绘制诊断图"""
        fig = plt.figure(figsize=figsize)
        gs = GridSpec(3, 2, figure=fig)
        
        # 1. 趋势比较图
        ax1 = fig.add_subplot(gs[0, :])
        self._plot_trend_comparison(ax1)
        
        # 2. 处理效应图
        ax2 = fig.add_subplot(gs[1, 0])
        self._plot_effect(ax2)
        
        # 3. 权重分布图
        ax3 = fig.add_subplot(gs[1, 1])
        self._plot_weights(ax3)
        
        # 4. Placebo检验图
        if hasattr(self, 'placebo_results'):
            ax4 = fig.add_subplot(gs[2, 0])
            self._plot_placebo(ax4)
        
        # 5. 预测变量平衡图
        ax5 = fig.add_subplot(gs[2, 1])
        self._plot_predictor_balance(ax5)
        
        plt.tight_layout()
        plt.show()
    
    def _plot_trend_comparison(self, ax):
        """绘制趋势比较图"""
        time_idx = range(len(self.data))
        
        ax.plot(time_idx, self.data[self.treated_unit].values, 
               'b-', linewidth=2, label=f'处理单元 ({self.treated_unit})')
        ax.plot(time_idx, self.prediction, 'r--', linewidth=2, label='合成控制组')
        
        # 标记处理时间
        treatment_idx = len(self.pre_period)
        ax.axvline(x=treatment_idx, color='gray', linestyle='-', alpha=0.7)
        ax.text(treatment_idx, ax.get_ylim()[0], '政策实施', 
               ha='center', va='bottom', rotation=90)
        
        ax.set_xlabel('时间')
        ax.set_ylabel('结果变量')
        ax.set_title('处理单元 vs 合成控制组')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    def _plot_effect(self, ax):
        """绘制处理效应图"""
        time_idx = range(len(self.effect))
        
        ax.bar(time_idx, self.effect, alpha=0.7, color='orange')
        ax.axhline(y=0, color='gray', linestyle='-')
        ax.axvline(x=len(self.pre_period), color='gray', linestyle='-')
        
        ax.set_xlabel('时间')
        ax.set_ylabel('处理效应')
        ax.set_title('处理效应随时间变化')
        ax.grid(True, alpha=0.3)
    
    def _plot_weights(self, ax):
        """绘制权重分布图"""
        # 只显示权重大于0.01的单元
        significant_weights = [(i, w) for i, w in enumerate(self.weights) if w > 0.01]
        if not significant_weights:
            significant_weights = [(i, w) for i, w in enumerate(self.weights) if w > 0]
        
        indices, weights = zip(*significant_weights)
        units = [self.donor_pool[i] for i in indices]
        
        ax.barh(units, weights, color='skyblue')
        ax.set_xlabel('权重')
        ax.set_title('合成控制组权重分布')
        ax.grid(True, alpha=0.3)
    
    def _plot_placebo(self, ax):
        """绘制Placebo检验图"""
        if not hasattr(self, 'placebo_results'):
            return
        
        # 真实处理效应的位置
        real_effect = np.mean(self.effect_post)
        
        ax.hist(self.placebo_results['effect'], bins=20, alpha=0.7, color='lightblue')
        ax.axvline(x=real_effect, color='red', linestyle='--', linewidth=2, 
                  label='真实处理效应')
        ax.set_xlabel('Placebo处理效应')
        ax.set_ylabel('频数')
        ax.set_title('Placebo检验')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    def _plot_predictor_balance(self, ax):
        """绘制预测变量平衡图"""
        # 计算处理单元和合成控制组的预测变量值
        treated_predictors = self.X1
        synthetic_predictors = self.X0 @ self.weights
        
        predictors = [f'Predictor {i+1}' for i in range(len(treated_predictors))]
        
        x = np.arange(len(predictors))
        width = 0.35
        
        ax.bar(x - width/2, treated_predictors, width, label='处理单元', alpha=0.7)
        ax.bar(x + width/2, synthetic_predictors, width, label='合成控制组', alpha=0.7)
        
        ax.set_xlabel('预测变量')
        ax.set_ylabel('值')
        ax.set_title('预测变量平衡性')
        ax.set_xticks(x)
        ax.set_xticklabels(predictors, rotation=45)
        ax.legend()
        ax.grid(True, alpha=0.3)

# 使用示例
def example_advanced_scm():
    """高级合成控制法示例"""
    # 生成模拟数据
    np.random.seed(42)
    n_time = 30
    n_units = 20
    
    # 创建时间索引
    time_index = pd.date_range('1990-01-01', periods=n_time, freq='Y')
    
    # 生成单元名称
    units = [f'Unit_{i}' for i in range(n_units)]
    treated_unit = 'Unit_0'
    donor_pool = units[1:]
    
    # 生成数据
    data = pd.DataFrame(index=time_index, columns=units)
    
    # 共同趋势
    common_trend = np.linspace(100, 120, n_time)
    
    for i, unit in enumerate(units):
        # 单元特定特征
        unit_effect = np.random.normal(0, 5)
        unit_trend = common_trend + unit_effect + np.random.normal(0, 2, n_time)
        
        # 对处理单元添加处理效应
        if unit == treated_unit:
            treatment_time = '2000-01-01'
            treatment_idx = time_index.get_loc(treatment_time)
            unit_trend[treatment_idx:] += np.linspace(-5, -15, n_time - treatment_idx)
        
        data[unit] = unit_trend
    
    # 应用高级合成控制法
    scm = AdvancedSyntheticControl(treated_unit, donor_pool, time_index)
    scm.fit(data, treatment_time, V_method='equal')
    
    # 计算效应
    effect = scm.calculate_effect()
    
    # Placebo检验
    placebo_results = scm.inference_placebo(n_permutations=100)
    
    # 绘制诊断图
    scm.plot_diagnostic()
    
    # 计算统计显著性
    real_effect = np.mean(scm.effect_post)
    placebo_effects = placebo_results['effect'].values
    p_value = np.mean(np.abs(placebo_effects) >= np.abs(real_effect))
    
    print(f"真实处理效应: {real_effect:.3f}")
    print(f"Placebo检验p值: {p_value:.3f}")
    print(f"预处理期RMSE: {np.sqrt(np.mean(scm.effect_pre**2)):.3f}")
    
    return scm, effect, placebo_results

# 运行示例
scm_advanced, effect_advanced, placebo_results = example_advanced_scm()

部署为Python包

我们将合成控制法实现打包为可安装的Python包:

项目结构:

代码语言:shell
复制
synthetic-control/
│
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── scm.py              # 主要合成控制法实现
│   ├── inference.py        # 统计推断方法
│   └── diagnostics.py      # 诊断工具
├── utils/
│   ├── __init__.py
│   ├── data_utils.py       # 数据工具
│   └── visualization.py    # 可视化工具
├── examples/
│   ├── __init__.py
│   ├── california_tobacco.py  # 加州烟草案例
│   └── german_reunification.py # 德国统一案例
└── tests/
    ├── __init__.py
    └── test_scm.py         # 单元测试

setup.py配置:

代码语言:python
复制
from setuptools import setup, find_packages

setup(
    name="synthetic-control",
    version="0.1.0",
    description="A comprehensive Python implementation of Synthetic Control Method",
    author="Your Name",
    author_email="your.email@example.com",
    packages=find_packages(),
    install_requires=[
        "numpy>=1.20.0",
        "pandas>=1.3.0",
        "scipy>=1.7.0",
        "matplotlib>=3.5.0",
        "seaborn>=0.11.0",
    ],
    python_requires=">=3.8",
)

使用示例:

代码语言:python
复制
from synthetic_control import AdvancedSyntheticControl
from synthetic_control.utils.visualization import plot_scm_diagnostics

# 加载数据
data = pd.read_csv('policy_data.csv', index_col=0)

# 创建合成控制模型
scm = AdvancedSyntheticControl(
    treated_unit='California',
    donor_pool=['New York', 'Texas', 'Florida', 'Illinois', 'Ohio'],
    time_index=data.index
)

# 拟合模型
scm.fit(data, treatment_time='2005-01-01', V_method='regression')

# 进行统计推断
placebo_results = scm.inference_placebo(n_permutations=1000)

# 生成诊断报告
plot_scm_diagnostics(scm, save_path='scm_diagnostic.png')

通过这个完整的实现,研究人员可以方便地应用合成控制法进行政策评估和因果推断,同时获得全面的诊断和统计推断结果。


VI. 合成控制法的统计推断

合成控制法的统计推断具有挑战性,因为传统的渐近理论在此不适用(处理单元只有一个)。本节将介绍合成控制法中常用的统计推断方法,包括Placebo检验、排序检验和交叉验证等。

Placebo检验

Placebo检验(也称为置换检验)是合成控制法中最常用的推断方法。其基本思想是将处理状态随机分配给捐赠池中的其他单元,观察"虚假"处理效应的分布。

代码语言:python
复制
def placebo_test(scm, data, treatment_time, n_permutations=1000):
    """
    执行Placebo检验
    
    参数:
    scm: 合成控制模型实例
    data: 面板数据
    treatment_time: 处理时间
    n_permutations: 置换次数
    
    返回:
    placebo_distribution: Placebo效应分布
    p_value: 统计显著性p值
    """
    # 真实处理效应
    real_effect = np.mean(scm.effect_post)
    
    # 所有单元(包括处理单元)
    all_units = [scm.treated_unit] + scm.donor_pool
    
    placebo_effects = []
    placebo_rmspe = []  # 预处理期均方预测误差
    
    for _ in range(n_permutations):
        # 随机选择一个单元作为假想的处理单元
        placebo_treated = np.random.choice(all_units)
        placebo_donor = [u for u in all_units if u != placebo_treated]
        
        try:
            # 拟合Placebo模型
            placebo_model = AdvancedSyntheticControl(placebo_treated, placebo_donor)
            placebo_model.fit(data, treatment_time, V_method='equal')
            placebo_effect = placebo_model.calculate_effect()
            
            # 只保留成功的拟合
            if len(placebo_model.effect_post) > 0:
                placebo_post_effect = np.mean(placebo_model.effect_post)
                placebo_pre_rmse = np.sqrt(np.mean(placebo_model.effect_pre**2))
                
                placebo_effects.append(placebo_post_effect)
                placebo_rmspe.append(placebo_pre_rmse)
                
        except:
            continue
    
    # 计算p值(双边检验)
    extreme_count = sum(np.abs(placebo_effects) >= np.abs(real_effect))
    p_value = extreme_count / len(placebo_effects) if len(placebo_effects) > 0 else 1.0
    
    return {
        'real_effect': real_effect,
        'placebo_effects': np.array(placebo_effects),
        'placebo_rmspe': np.array(placebo_rmspe),
        'p_value': p_value,
        'n_successful': len(placebo_effects)
    }

def plot_placebo_results(placebo_results, figsize=(12, 5)):
    """绘制Placebo检验结果"""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
    
    # Placebo效应分布
    ax1.hist(placebo_results['placebo_effects'], bins=20, alpha=0.7, color='lightblue')
    ax1.axvline(x=placebo_results['real_effect'], color='red', linestyle='--', 
                linewidth=2, label=f'真实效应: {placebo_results["real_effect"]:.3f}')
    ax1.set_xlabel('Placebo处理效应')
    ax1.set_ylabel('频数')
    ax1.set_title('Placebo效应分布')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # 效应 vs 预处理期拟合质量
    ax2.scatter(placebo_results['placebo_rmspe'], placebo_results['placebo_effects'], 
               alpha=0.6, color='blue')
    ax2.axhline(y=0, color='gray', linestyle='-', alpha=0.7)
    ax2.set_xlabel('预处理期RMSPE')
    ax2.set_ylabel('Placebo处理效应')
    ax2.set_title('效应大小 vs 拟合质量')
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print(f"真实处理效应: {placebo_results['real_effect']:.3f}")
    print(f"Placebo检验p值: {placebo_results['p_value']:.3f}")
    print(f"成功Placebo检验次数: {placebo_results['n_successful']}")

# 使用示例
placebo_results = placebo_test(scm_advanced, data, '2000-01-01', n_permutations=500)
plot_placebo_results(placebo_results)

排序检验(Rank Test)

排序检验通过比较真实处理效应在Placebo效应分布中的位置来评估统计显著性:

代码语言:python
复制
def rank_test(placebo_results, alternative='two-sided'):
    """
    排序检验
    
    参数:
    placebo_results: Placebo检验结果
    alternative: 检验类型 ('two-sided', 'less', 'greater')
    
    返回:
    rank: 真实效应在分布中的排名
    p_value: 排序检验p值
    """
    real_effect = placebo_results['real_effect']
    placebo_effects = placebo_results['placebo_effects']
    
    if len(placebo_effects) == 0:
        return None, 1.0
    
    # 合并真实效应和Placebo效应
    all_effects = np.concatenate([[real_effect], placebo_effects])
    
    # 计算排名(从大到小)
    ranks = len(all_effects) - np.argsort(np.argsort(np.abs(all_effects))) if alternative == 'two-sided' else \
            len(all_effects) - np.argsort(np.argsort(all_effects))
    
    real_rank = ranks[0]  # 真实效应的排名
    
    # 计算p值
    if alternative == 'two-sided':
        p_value = (real_rank) / len(all_effects)
    elif alternative == 'greater':
        p_value = (len(all_effects) - real_rank + 1) / len(all_effects)
    elif alternative == 'less':
        p_value = real_rank / len(all_effects)
    else:
        raise ValueError("alternative参数应为 'two-sided', 'less' 或 'greater'")
    
    return real_rank, p_value

# 使用示例
rank, p_value_rank = rank_test(placebo_results, alternative='two-sided')
print(f"真实效应排名: {rank}/{len(placebo_results['placebo_effects']) + 1}")
print(f"排序检验p值: {p_value_rank:.3f}")

拟合优度评估

预处理期的拟合优度是评估合成控制法有效性的重要指标:

代码语言:python
复制
def evaluate_fit_quality(scm, threshold=0.1):
    """
    评估拟合优度
    
    参数:
    scm: 合成控制模型
    threshold: RMSPE阈值
    
    返回:
    fit_metrics: 拟合优度指标
    """
    # 计算预处理期均方预测误差(RMSPE)
    rmspe_pre = np.sqrt(np.mean(scm.effect_pre**2))
    
    # 计算R²
    ss_total = np.sum((scm.y_treated_pre - np.mean(scm.y_treated_pre))**2)
    ss_residual = np.sum(scm.effect_pre**2)
    r_squared = 1 - (ss_residual / ss_total) if ss_total > 0 else 0
    
    # 计算平均绝对误差(MAE)
    mae_pre = np.mean(np.abs(scm.effect_pre))
    
    # 标准化处理效应(相对于预处理期波动)
    pre_std = np.std(scm.y_treated_pre)
    standardized_effect = np.mean(scm.effect_post) / pre_std if pre_std > 0 else 0
    
    fit_metrics = {
        'rmspe_pre': rmspe_pre,
        'r_squared': r_squared,
        'mae_pre': mae_pre,
        'standardized_effect': standardized_effect,
        'good_fit': rmspe_pre < threshold
    }
    
    return fit_metrics

# 使用示例
fit_metrics = evaluate_fit_quality(scm_advanced)
print("拟合优度评估:")
for metric, value in fit_metrics.items():
    if metric != 'good_fit':
        print(f"{metric}: {value:.3f}")
print(f"拟合质量良好: {fit_metrics['good_fit']}")

敏感性分析

敏感性分析评估结论对模型设定的稳健性:

代码语言:python
复制
def sensitivity_analysis(scm, data, treatment_time, variations):
    """
    敏感性分析
    
    参数:
    scm: 基础模型
    data: 数据
    treatment_time: 处理时间
    variations: 敏感性分析变体列表
    
    返回:
    sensitivity_results: 敏感性分析结果
    """
    sensitivity_results = []
    
    base_effect = np.mean(scm.effect_post)
    
    for variation in variations:
        try:
            # 创建变体模型
            if variation['type'] == 'donor_pool':
                # 改变捐赠池
                donor_pool_var = variation['donor_pool']
                scm_var = AdvancedSyntheticControl(scm.treated_unit, donor_pool_var)
                scm_var.fit(data, treatment_time, V_method='equal')
                
            elif variation['type'] == 'V_matrix':
                # 改变V矩阵
                scm_var = AdvancedSyntheticControl(scm.treated_unit, scm.donor_pool)
                scm_var.fit(data, treatment_time, V_method=variation['method'])
                
            elif variation['type'] == 'predictors':
                # 改变预测变量
                scm_var = AdvancedSyntheticControl(scm.treated_unit, scm.donor_pool)
                # 这里需要实现特定的预测变量选择
                pass
                
            else:
                continue
            
            effect_var = np.mean(scm_var.effect_post)
            rmspe_pre_var = np.sqrt(np.mean(scm_var.effect_pre**2))
            
            sensitivity_results.append({
                'variation': variation['name'],
                'effect': effect_var,
                'rmspe_pre': rmspe_pre_var,
                'difference': effect_var - base_effect,
                'relative_diff': (effect_var - base_effect) / abs(base_effect) if base_effect != 0 else 0
            })
            
        except Exception as e:
            print(f"变体 {variation['name']} 失败: {str(e)}")
            continue
    
    return pd.DataFrame(sensitivity_results)

# 敏感性分析示例
variations = [
    {'type': 'V_matrix', 'method': 'equal', 'name': '均等权重V矩阵'},
    {'type': 'V_matrix', 'method': 'regression', 'name': '回归权重V矩阵'},
    {'type': 'donor_pool', 'donor_pool': scm_advanced.donor_pool[:10], 'name': '前10个捐赠单元'},
    {'type': 'donor_pool', 'donor_pool': scm_advanced.donor_pool[5:15], 'name': '中间10个捐赠单元'}
]

sensitivity_results = sensitivity_analysis(scm_advanced, data, '2000-01-01', variations)
print("敏感性分析结果:")
print(sensitivity_results.round(4))

通过综合使用这些统计推断方法,研究人员可以更全面地评估合成控制法结果的可靠性和稳健性,为政策评估提供更科学的依据。


VII. 合成控制法的扩展与变体

基础合成控制法有许多扩展和变体,这些方法针对SCM的不同局限性提供了改进方案。本节将介绍几种重要的SCM扩展方法。

1. 广义合成控制法

广义合成控制法(Generalized Synthetic Control)允许处理多个处理单元和多个处理期,扩展了SCM的应用范围。

代码语言:python
复制
class GeneralizedSyntheticControl:
    def __init__(self):
        self.weights = None
        self.unit_effects = None
        self.time_effects = None
        
    def fit(self, data, treatment_mask, factors=2, max_iter=100):
        """
        拟合广义合成控制模型
        
        参数:
        data: 面板数据 (时间 × 单元)
        treatment_mask: 处理指示矩阵 (时间 × 单元)
        factors: 潜在因子数量
        max_iter: 最大迭代次数
        """
        from sklearn.decomposition import NMF
        import numpy as np
        
        self.data = data.values
        self.treatment_mask = treatment_mask.values
        self.n_time, self.n_units = self.data.shape
        
        # 初始化因子和权重
        np.random.seed(42)
        self.unit_effects = np.random.normal(0, 1, (self.n_units, factors))
        self.time_effects = np.random.normal(0, 1, (self.n_time, factors))
        
        # 迭代优化(简化实现)
        for iteration in range(max_iter):
            # 更新单元效应(固定时间效应)
            for i in range(self.n_units):
                # 只使用未处理期的数据
                control_periods = ~self.treatment_mask[:, i]
                if np.sum(control_periods) > 0:
                    X = self.time_effects[control_periods]
                    y = self.data[control_periods, i]
                    self.unit_effects[i] = np.linalg.lstsq(X, y, rcond=None)[0]
            
            # 更新时间效应(固定单元效应)
            for t in range(self.n_time):
                # 只使用未处理单元的数据
                control_units = ~self.treatment_mask[t, :]
                if np.sum(control_units) > 0:
                    X = self.unit_effects[control_units]
                    y = self.data[t, control_units]
                    self.time_effects[t] = np.linalg.lstsq(X, y, rcond=None)[0]
        
        # 预测反事实结果
        self.predicted = self.time_effects @ self.unit_effects.T
        self.effect = self.data - self.predicted
        
        return self
    
    def calculate_effect(self, units=None, periods=None):
        """计算处理效应"""
        if units is None and periods is None:
            return self.effect
        
        effect_subset = self.effect.copy()
        if units is not None:
            effect_subset = effect_subset[:, units]
        if periods is not None:
            effect_subset = effect_subset[periods, :]
        
        return effect_subset

# 使用示例(简化)
def example_gscm():
    """广义合成控制法示例"""
    np.random.seed(42)
    n_time, n_units = 30, 15
    
    # 生成数据
    data = np.random.normal(0, 1, (n_time, n_units))
    
    # 添加共同趋势和个体效应
    common_trend = np.linspace(0, 2, n_time)
    for i in range(n_units):
        data[:, i] += common_trend + np.random.normal(0, 0.5)
    
    # 创建处理掩码(部分单元在后期接受处理)
    treatment_mask = np.zeros((n_time, n_units), dtype=bool)
    treatment_mask[20:, :5] = True  # 最后5个单元在后期接受处理
    
    # 添加处理效应
    data[treatment_mask] += -1.5  # 处理效应为-1.5
    
    # 应用广义合成控制法
    gscm = GeneralizedSyntheticControl()
    gscm.fit(pd.DataFrame(data), pd.DataFrame(treatment_mask), factors=2)
    
    # 计算平均处理效应
    effect = gscm.calculate_effect()
    avg_effect = np.mean(effect[treatment_mask])
    
    print(f"估计的平均处理效应: {avg_effect:.3f}")
    print(f"真实处理效应: -1.500")
    
    return gscm, effect

gscm, effect_gscm = example_gscm()

2. 矩阵补全方法

矩阵补全方法将合成控制法视为矩阵补全问题,使用核范数最小化等技术:

代码语言:python
复制
class MatrixCompletionSCM:
    def __init__(self, lambda_param=1.0):
        self.lambda_param = lambda_param
        self.completed_matrix = None
        
    def fit(self, data, treatment_mask, max_iter=1000, tol=1e-6):
        """
        矩阵补全合成控制法
        
        参数:
        data: 面板数据
        treatment_mask: 处理指示矩阵
        lambda_param: 正则化参数
        max_iter: 最大迭代次数
        tol: 收敛容忍度
        """
        from scipy.linalg import svd
        import numpy as np
        
        Y = data.values.copy()
        M = treatment_mask.values  # 1表示缺失(处理期),0表示观测
        
        # 初始化
        X = Y.copy()
        X[M] = 0  # 初始时用0填充缺失值
        
        for iteration in range(max_iter):
            # SVD分解
            U, s, Vt = svd(X, full_matrices=False)
            
            # 软阈值收缩
            s_shrink = np.maximum(s - self.lambda_param, 0)
            
            # 更新矩阵
            X_new = U @ np.diag(s_shrink) @ Vt
            
            # 保持观测值不变
            X_new[~M] = Y[~M]
            
            # 检查收敛
            if np.linalg.norm(X_new - X) < tol:
                break
                
            X = X_new
        
        self.completed_matrix = X
        self.effect = Y - X
        
        return self

# 使用示例
def example_matrix_completion():
    """矩阵补全SCM示例"""
    np.random.seed(42)
    n_time, n_units = 25, 10
    
    # 生成低秩数据
    rank = 3
    U = np.random.normal(0, 1, (n_time, rank))
    V = np.random.normal(0, 1, (n_units, rank))
    data = U @ V.T + np.random.normal(0, 0.5, (n_time, n_units))
    
    # 创建处理
    treatment_mask = np.zeros((n_time, n_units), dtype=bool)
    treatment_mask[15:, 0] = True  # 第一个单元在后期接受处理
    data[treatment_mask] += -2.0  # 添加处理效应
    
    # 应用矩阵补全方法
    mc_scm = MatrixCompletionSCM(lambda_param=0.5)
    mc_scm.fit(pd.DataFrame(data), pd.DataFrame(treatment_mask))
    
    # 计算处理效应
    effect = mc_scm.effect
    avg_effect = np.mean(effect[treatment_mask])
    
    print(f"矩阵补全SCM估计效应: {avg_effect:.3f}")
    print(f"真实处理效应: -2.000")
    
    return mc_scm, effect

mc_scm, effect_mc = example_matrix_completion()

3. 贝叶斯合成控制法

贝叶斯方法为合成控制法提供概率解释和不确定性量化:

代码语言:python
复制
class BayesianSyntheticControl:
    def __init__(self, n_samples=1000, warmup=500):
        self.n_samples = n_samples
        self.warmup = warmup
        self.samples = None
        
    def fit(self, treated_pre, control_pre, treated_post, control_post):
        """
        贝叶斯合成控制法(简化实现)
        
        注意:完整实现需要使用PyMC3或Stan
        """
        import numpy as np
        from scipy.optimize import minimize
        
        # 简化实现:使用Bootstrap近似贝叶斯后验
        n_pre = len(treated_pre)
        n_post = len(treated_post)
        n_donor = control_pre.shape[1]
        
        # Bootstrap采样
        bootstrap_weights = []
        bootstrap_effects = []
        
        for _ in range(self.n_samples + self.warmup):
            # 对预处理期进行Bootstrap重采样
            indices = np.random.choice(n_pre, n_pre, replace=True)
            treated_boot = treated_pre[indices]
            control_boot = control_pre[indices, :]
            
            try:
                # 估计权重
                def objective(W):
                    return np.mean((treated_boot - control_boot @ W)**2)
                
                constraints = ({'type': 'eq', 'fun': lambda W: np.sum(W) - 1})
                bounds = [(0, 1) for _ in range(n_donor)]
                W0 = np.ones(n_donor) / n_donor
                
                result = minimize(objective, W0, method='SLSQP', 
                                 bounds=bounds, constraints=constraints)
                
                if result.success:
                    weights = result.x
                    # 预测处理后期
                    synthetic_post = control_post @ weights
                    effect = treated_post - synthetic_post
                    
                    bootstrap_weights.append(weights)
                    bootstrap_effects.append(effect)
                    
            except:
                continue
        
        # 丢弃warmup期
        self.samples = {
            'weights': np.array(bootstrap_weights[self.warmup:]),
            'effects': np.array(bootstrap_effects[self.warmup:])
        }
        
        return self
    
    def summary(self):
        """后验总结"""
        if self.samples is None:
            raise ValueError("需要先拟合模型")
        
        effect_samples = self.samples['effects']
        avg_effects = effect_samples.mean(axis=1)
        
        summary = {
            'mean_effect': np.mean(avg_effects),
            'effect_sd': np.std(avg_effects),
            'effect_ci_95': np.percentile(avg_effects, [2.5, 97.5]),
            'prob_positive': np.mean(avg_effects > 0),
            'prob_negative': np.mean(avg_effects < 0)
        }
        
        return summary

# 使用示例
def example_bayesian_scm():
    """贝叶斯SCM示例"""
    np.random.seed(42)
    
    # 生成数据
    n_pre, n_post, n_donor = 20, 10, 15
    treated_pre = np.cumsum(np.random.normal(0, 0.5, n_pre)) + 10
    control_pre = np.zeros((n_pre, n_donor))
    
    for j in range(n_donor):
        control_pre[:, j] = np.cumsum(np.random.normal(0, 0.5, n_pre)) + 10 + np.random.normal(0, 1)
    
    treated_post = np.cumsum(np.random.normal(0, 0.5, n_post)) + 10 - 1.5  # 处理效应-1.5
    control_post = np.zeros((n_post, n_donor))
    
    for j in range(n_donor):
        control_post[:, j] = np.cumsum(np.random.normal(0, 0.5, n_post)) + 10 + np.random.normal(0, 1)
    
    # 应用贝叶斯SCM
    bayesian_scm = BayesianSyntheticControl(n_samples=500, warmup=100)
    bayesian_scm.fit(treated_pre, control_pre, treated_post, control_post)
    
    summary = bayesian_scm.summary()
    
    print("贝叶斯合成控制法结果:")
    print(f"平均处理效应: {summary['mean_effect']:.3f}")
    print(f"95%置信区间: ({summary['effect_ci_95'][0]:.3f}, {summary['effect_ci_95'][1]:.3f})")
    print(f"效应为负的概率: {summary['prob_negative']:.3f}")
    
    return bayesian_scm, summary

bayesian_scm, summary_bayesian = example_bayesian_scm()

4. 异质性处理效应合成控制法

当处理效应可能随时间和单元特征变化时,可以使用异质性处理效应模型:

代码语言:python
复制
class HeterogeneousEffectSCM:
    def __init__(self):
        self.weights = None
        self.effect_model = None
        
    def fit(self, data, treatment_time, unit_covariates=None):
        """
        异质性处理效应合成控制法
        
        参数:
        data: 面板数据
        treatment_time: 处理时间
        unit_covariates: 单元特征,用于建模效应异质性
        """
        from sklearn.linear_model import LinearRegression
        import numpy as np
        
        self.data = data
        self.treatment_time = treatment_time
        self.pre_period = data.index[data.index < treatment_time]
        self.post_period = data.index[data.index >= treatment_time]
        
        # 第一步:标准合成控制法
        treated_unit = data.columns[0]  # 假设第一列是处理单元
        donor_pool = data.columns[1:]
        
        scm = AdvancedSyntheticControl(treated_unit, donor_pool)
        scm.fit(data, treatment_time)
        scm_effect = scm.calculate_effect()
        
        # 第二步:建模效应异质性(如果有单元特征)
        if unit_covariates is not None and len(self.post_period) > 1:
            # 使用处理后期效应与单元特征的关系
            post_effects = scm_effect[len(self.pre_period):]
            
            # 平均处理效应作为基线
            avg_effect = np.mean(post_effects)
            
            # 如果有多个处理后期,可以建模效应随时间的变化
            if len(post_effects) > 2:
                time_since_treatment = np.arange(len(post_effects))
                
                # 简单线性趋势模型
                effect_trend = LinearRegression()
                effect_trend.fit(time_since_treatment.reshape(-1, 1), post_effects)
                
                self.effect_model = {
                    'type': 'linear_trend',
                    'intercept': effect_trend.intercept_,
                    'slope': effect_trend.coef_[0],
                    'avg_effect': avg_effect
                }
            else:
                self.effect_model = {
                    'type': 'constant',
                    'avg_effect': avg_effect
                }
        else:
            self.effect_model = {
                'type': 'constant',
                'avg_effect': np.mean(scm_effect[len(self.pre_period):])
            }
        
        self.weights = scm.weights
        self.effect = scm_effect
        
        return self
    
    def predict_effect(self, time_points=None, unit_features=None):
        """预测处理效应"""
        if self.effect_model is None:
            raise ValueError("需要先拟合模型")
        
        if self.effect_model['type'] == 'constant':
            return np.full_like(time_points, self.effect_model['avg_effect'], dtype=float)
        
        elif self.effect_model['type'] == 'linear_trend':
            effects = (self.effect_model['intercept'] + 
                      self.effect_model['slope'] * time_points)
            return effects
        
        else:
            raise ValueError("不支持的效应模型类型")

# 使用示例
def example_heterogeneous_scm():
    """异质性效应SCM示例"""
    np.random.seed(42)
    n_time, n_units = 30, 10
    
    # 生成数据
    time_index = pd.date_range('2000-01-01', periods=n_time, freq='Y')
    data = pd.DataFrame(index=time_index)
    
    # 生成单元数据
    for i in range(n_units):
        trend = np.linspace(100, 120, n_time) + np.random.normal(0, 5, n_time)
        data[f'Unit_{i}'] = trend
    
    # 添加处理效应(随时间增强)
    treatment_time = '2010-01-01'
    treatment_idx = time_index.get_loc(treatment_time)
    
    # 处理效应随时间线性增长
    for i in range(treatment_idx, n_time):
        effect = -2 - 0.5 * (i - treatment_idx)  # 从-2开始,每期减少0.5
        data.iloc[i, 0] += effect
    
    # 应用异质性效应SCM
    hescm = HeterogeneousEffectSCM()
    hescm.fit(data, treatment_time)
    
    # 预测未来效应
    future_periods = 5
    future_time_points = np.arange(len(hescm.post_period), 
                                  len(hescm.post_period) + future_periods)
    future_effects = hescm.predict_effect(future_time_points)
    
    print("异质性处理效应SCM结果:")
    print(f"当前平均效应: {hescm.effect_model['avg_effect']:.3f}")
    print(f"预测未来效应: {future_effects}")
    
    return hescm, future_effects

hescm, future_effects = example_heterogeneous_scm()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • I. 引言:单一处理单元的因果推断难题
    • 传统方法的局限性
    • 真实世界中的单一处理单元问题
    • 合成控制法的诞生
  • II. 合成控制法的基本思想
    • 合成控制法的直观理解
    • 合成控制法的优势
    • 合成控制法的适用条件
  • III. 合成控制法的数学模型
    • 基本设定
    • 合成控制组的构建
    • 权重选择准则
    • 预测变量的选择
  • IV. 案例研究:加州烟草控制政策评估
    • 研究背景
    • 数据准备
    • 合成控制分析
    • 结果解释与政策含义
  • V. Python代码实现与部署
    • 完整的合成控制法实现
    • 部署为Python包
  • VI. 合成控制法的统计推断
    • Placebo检验
    • 排序检验(Rank Test)
    • 拟合优度评估
    • 敏感性分析
  • VII. 合成控制法的扩展与变体
    • 1. 广义合成控制法
    • 2. 矩阵补全方法
    • 3. 贝叶斯合成控制法
    • 4. 异质性处理效应合成控制法
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档