在现实世界的政策评估中,我们经常遇到这样的情况:一项政策只在某个地区、某个企业或某个特定群体中实施,而我们需要评估这项政策的效果。这种"单一处理单元"的问题给传统的因果推断方法带来了巨大挑战。
方法 | 在处理单一单元时的局限性 |
|---|---|
随机对照试验 | 需要将单元随机分配到处理组和对照组,不适用于单一处理单元 |
双重差分法 | 需要多个处理单元和多个控制单元,满足平行趋势假设 |
断点回归 | 需要明确的断点和处理变量连续性,适用场景有限 |
匹配方法 | 需要多个处理单元寻找匹配对象 |
在实际政策评估中,单一处理单元的情况比比皆是:
地区性政策:某个州或国家实施独特政策(如加州的烟草控制法案)
企业决策:某家公司实施特定战略(如特斯拉的电动汽车策略)
社会运动:某个社区发起的社会改革运动
国际事件:某个国家经历的特殊事件(如东日本大地震后的核政策变化)
合成控制法由Abadie和Gardeazabal(2003)提出,最初用于评估西班牙巴斯克地区恐怖主义活动对经济增长的影响。随后,Abadie、Diamond和Hainmueller(2010)将其发展为成熟的因果推断方法,应用于加州烟草控制政策的评估。

合成控制法的核心思想非常直观且巧妙:当没有一个完美的对照组时,我们可以通过加权组合多个控制单元来构建一个"合成控制组",这个合成控制组在处理前的特征和趋势上与处理单元尽可能相似。
想象一下,我们要评估加州某项政策的效果。理想的对照组应该是"没有实施该政策的加州",但这在现实中不存在。合成控制法的解决方案是:寻找其他州的加权组合,使得这个"合成加州"在政策实施前的经济、人口、社会特征等方面与真实加州高度相似。
与传统方法相比,合成控制法具有独特优势:
特征 | 合成控制法 | 传统方法 |
|---|---|---|
处理单元数量 | 可处理单个处理单元 | 需要多个处理单元 |
对照组构建 | 数据驱动,透明 | 常常主观选择 |
趋势匹配 | 强调处理前趋势相似性 | 可能忽略趋势匹配 |
外推问题 | 限制在捐赠池范围内 | 可能过度外推 |
合成控制法并非万能钥匙,它需要满足一定的条件:

合成控制法建立在严谨的数学框架之上,通过优化算法寻找最优的权重组合。本节将深入探讨合成控制法的数学模型和估计过程。
假设我们有J+1个单元,其中第一个单元(j=1)是处理单元,其余J个单元(j=2,...,J+1)是控制单元。观察期分为预处理期(t=1,...,T₀)和处理后期(t=T₀+1,...,T)。
令:
则观察结果可表示为:
Y_{jt} = Y_{jt}^N + \alpha_{jt}D_{jt}
其中\alpha_{jt} = Y_{jt}^I - Y_{jt}^N 是处理效应。
合成控制组是控制单元的加权组合,权重向量W = (w_2, ..., w_{J+1}) 满足:
合成控制组在时期t的结果为:
Y_{1t}^N = \sum_{j=2}^{J+1} w_j Y_{jt}
权重的选择基于预处理期的匹配质量。令X_1 是处理单元的预测变量向量,X_0 是控制单元的预测变量矩阵(每列对应一个控制单元)。我们选择权重W使得:
\min_W \|X_1 - X_0W\|_V = \sqrt{(X_1 - X_0W)'V(X_1 - X_0W)}
其中V是一个对角矩阵,表示各预测变量的相对重要性。
预测变量通常包括两类:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import matplotlib.pyplot as plt
class SyntheticControl:
def __init__(self):
self.weights = None
self.v_matrix = None
self.synthetic_control = None
def fit(self, treated_pre, control_pre, predictors=None, V=None):
"""
拟合合成控制模型
参数:
treated_pre: 处理单元预处理期数据 (T0,)
control_pre: 控制单元预处理期数据 (T0, J)
predictors: 预测变量矩阵 (K, J+1),第一列是处理单元
V: 预测变量权重矩阵,如果为None则自动选择
"""
self.T0 = treated_pre.shape[0]
self.J = control_pre.shape[1]
# 如果没有提供预测变量,使用预处理期结果变量的均值
if predictors is None:
predictors = np.vstack([treated_pre.mean(), control_pre.mean(axis=0)]).T
predictors = np.vstack([predictors, treated_pre[-3:].mean(), control_pre[-3:,:].mean(axis=0)]).T
# 分离处理单元和控制单元的预测变量
X1 = predictors[:, 0] # 处理单元预测变量
X0 = predictors[:, 1:] # 控制单元预测变量
# 如果没有提供V矩阵,使用均等权重或基于回归的权重
if V is None:
# 简单方法:使用均等权重
V = np.diag(np.ones(len(X1)) / len(X1))
self.V = V
# 定义优化问题
def objective(W):
return np.sqrt((X1 - X0 @ W).T @ V @ (X1 - X0 @ W))
# 约束条件:权重非负且和为1
constraints = ({'type': 'eq', 'fun': lambda W: np.sum(W) - 1})
bounds = [(0, 1) for _ in range(self.J)]
# 初始值:均等权重
W0 = np.ones(self.J) / self.J
# 优化
result = minimize(objective, W0, method='SLSQP',
bounds=bounds, constraints=constraints)
if result.success:
self.weights = result.x
else:
raise ValueError("权重优化失败")
return self
def predict(self, control_post):
"""预测处理后的合成控制结果"""
if self.weights is None:
raise ValueError("需要先拟合模型")
self.synthetic_control = control_post @ self.weights
return self.synthetic_control
def calculate_effect(self, treated_post, control_post):
"""计算处理效应"""
synthetic_post = self.predict(control_post)
effect = treated_post - synthetic_post
return effect
def plot_results(self, treated_pre, treated_post, control_pre, control_post, event_time=None):
"""绘制合成控制结果"""
if event_time is None:
event_time = list(range(-len(treated_pre), len(treated_post)))
# 计算合成控制组在整个时期的轨迹
synthetic_pre = control_pre @ self.weights
synthetic_post = control_post @ self.weights
# 合并预处理期和处理后期
treated_full = np.concatenate([treated_pre, treated_post])
synthetic_full = np.concatenate([synthetic_pre, synthetic_post])
plt.figure(figsize=(12, 6))
plt.plot(event_time, treated_full, 'b-', linewidth=2, label='处理单元')
plt.plot(event_time, synthetic_full, 'r--', linewidth=2, label='合成控制组')
plt.axvline(x=0, color='gray', linestyle='-', alpha=0.7, label='政策实施')
plt.xlabel('时间(相对政策实施)')
plt.ylabel('结果变量')
plt.title('合成控制法分析结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 绘制处理效应
effect = treated_full - synthetic_full
plt.figure(figsize=(12, 4))
plt.bar(event_time, effect, alpha=0.7, color='orange')
plt.axhline(y=0, color='gray', linestyle='-', alpha=0.7)
plt.axvline(x=0, color='gray', linestyle='-', alpha=0.7)
plt.xlabel('时间(相对政策实施)')
plt.ylabel('处理效应')
plt.title('处理效应随时间变化')
plt.grid(True, alpha=0.3)
plt.show()
return effect
# 示例使用
def example_synthetic_control():
"""合成控制法示例"""
np.random.seed(42)
# 生成模拟数据
T0 = 20 # 预处理期
T1 = 10 # 处理后期
J = 20 # 控制单元数量
# 生成共同趋势和个体固定效应
common_trend = np.linspace(0, 10, T0 + T1)
unit_effects = np.random.normal(0, 2, J + 1)
# 生成预处理期数据
control_pre = np.zeros((T0, J))
for j in range(J):
control_pre[:, j] = common_trend[:T0] + unit_effects[j+1] + np.random.normal(0, 0.5, T0)
treated_pre = common_trend[:T0] + unit_effects[0] + np.random.normal(0, 0.5, T0)
# 生成处理后期数据(真实效应为2)
control_post = np.zeros((T1, J))
for j in range(J):
control_post[:, j] = common_trend[T0:] + unit_effects[j+1] + np.random.normal(0, 0.5, T1)
treated_post = common_trend[T0:] + unit_effects[0] + 2 + np.random.normal(0, 0.5, T1) # 真实效应为2
# 应用合成控制法
scm = SyntheticControl()
scm.fit(treated_pre, control_pre)
effect = scm.calculate_effect(treated_post, control_post)
avg_effect = np.mean(effect)
print(f"估计的平均处理效应: {avg_effect:.3f}")
print(f"真实处理效应: 2.000")
print(f"估计偏差: {avg_effect - 2:.3f}")
# 绘制结果
event_time = list(range(-T0, T1))
scm.plot_results(treated_pre, treated_post, control_pre, control_post, event_time)
return scm, effect
# 运行示例
scm, effect = example_synthetic_control()
在本节中,我们将重现Abadie、Diamond和Hainmueller(2010)的经典研究,评估加州1988年通过的99号提案(烟草控制法案)对烟草消费的影响。这个案例是合成控制法最著名的应用之一。
1988年,加州通过了99号提案,大幅提高烟草税并将部分税收用于烟草控制项目。这项政策是美国历史上最严格的烟草控制措施之一。我们需要评估这项政策是否有效降低了加州的烟草消费量。
我们使用与原始研究类似的数据,包括:
处理单元:加利福尼亚州
控制单元:其他38个州(排除有类似政策的州)
结果变量:人均烟草消费量(包/人/年)
预测变量:
现在我们对加州的烟草控制政策进行评估:
def run_california_analysis():
"""运行加州烟草政策评估"""
# 准备数据
study = CaliforniaTobaccoStudy()
scm_data = study.prepare_scm_data()
# 应用合成控制法
scm = SyntheticControl()
# 使用特征矩阵作为预测变量
scm.fit(scm_data['treated_pre'],
scm_data['control_pre'],
predictors=np.vstack([scm_data['X1'], scm_data['X0']]).T)
# 计算处理效应
effect = scm.calculate_effect(scm_data['treated_post'],
scm_data['control_post'])
# 绘制结果
years = study.pre_period + study.post_period
event_time = [year - study.data['treatment_year'] for year in years]
effect_plot = scm.plot_results(scm_data['treated_pre'],
scm_data['treated_post'],
scm_data['control_pre'],
scm_data['control_post'],
event_time)
# 计算平均处理效应和累计效应
avg_effect = np.mean(effect)
cumulative_effect = np.sum(effect)
print("加州烟草控制政策评估结果:")
print(f"平均年度处理效应: {avg_effect:.2f} 包/人/年")
print(f"累计处理效应 (1989-2000): {cumulative_effect:.2f} 包/人")
print(f"相当于平均减少: {avg_effect/scm_data['treated_pre'][-1]*100:.1f}%")
# 显示权重分布
print("\n合成控制组权重分布:")
weights_df = pd.DataFrame({
'State': study.control_states,
'Weight': scm.weights
}).sort_values('Weight', ascending=False)
print(weights_df.head(10)) # 显示前10个最重要控制州
return scm, effect, weights_df
# 运行分析
scm_ca, effect_ca, weights_df = run_california_analysis()加州烟草控制政策评估的结果显示:
政策效果显著:99号提案实施后,加州的人均烟草消费量显著下降
效果持续增强:政策效果随时间推移而增强,显示长期影响
合成控制组质量高:预处理期加州与合成加州高度吻合,验证了方法的有效性
这一研究为烟草控制政策提供了有力证据,表明提高烟草税和资助控制项目可以有效减少烟草消费。

在本节中,我们将构建一个完整的、可重用的合成控制法Python包,包含数据预处理、模型拟合、效果评估和可视化等完整功能。
我们创建一个功能完整的SyntheticControl类,包含所有必要的方法:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import seaborn as sns
class AdvancedSyntheticControl:
def __init__(self, treated_unit, donor_pool, time_index=None):
"""
初始化高级合成控制模型
参数:
treated_unit: 处理单元名称或标识
donor_pool: 捐赠池单元列表
time_index: 时间索引
"""
self.treated_unit = treated_unit
self.donor_pool = donor_pool
self.time_index = time_index
self.weights = None
self.V_matrix = None
self.prediction = None
self.effect = None
def fit(self, data, treatment_time, predictors=None,
V_method='equal', custom_V=None, optimizer_options=None):
"""
拟合合成控制模型
参数:
data: 面板数据,索引为时间,列为单元
treatment_time: 处理开始时间
predictors: 预测变量,如果为None则使用预处理期数据
V_method: V矩阵选择方法 ('equal', 'regression', 'custom')
custom_V: 自定义V矩阵
optimizer_options: 优化器选项
"""
self.data = data
self.treatment_time = treatment_time
self.pre_period = data.index[data.index < treatment_time]
self.post_period = data.index[data.index >= treatment_time]
if len(self.pre_period) == 0:
raise ValueError("预处理期数据不足")
# 准备数据
self._prepare_data(predictors)
# 选择V矩阵
self._choose_V_matrix(V_method, custom_V)
# 优化权重
self._optimize_weights(optimizer_options)
return self
def _prepare_data(self, predictors):
"""准备数据"""
# 处理单元数据
self.y_treated_pre = self.data.loc[self.pre_period, self.treated_unit].values
self.y_treated_post = self.data.loc[self.post_period, self.treated_unit].values
# 控制单元数据
self.y_donor_pre = self.data.loc[self.pre_period, self.donor_pool].values
self.y_donor_post = self.data.loc[self.post_period, self.donor_pool].values
# 预测变量
if predictors is None:
# 使用预处理期数据的各种统计量作为预测变量
pre_means = self.data.loc[self.pre_period].mean()
pre_early = self.data.loc[self.pre_period[:len(self.pre_period)//2]].mean()
pre_late = self.data.loc[self.pre_period[len(self.pre_period)//2:]].mean()
pre_trend = np.polyfit(range(len(self.pre_period)),
self.data.loc[self.pre_period, self.treated_unit], 1)[0]
self.X1 = np.array([pre_means[self.treated_unit],
pre_early[self.treated_unit],
pre_late[self.treated_unit], pre_trend])
self.X0 = np.vstack([
pre_means[self.donor_pool].values,
pre_early[self.donor_pool].values,
pre_late[self.donor_pool].values,
[np.polyfit(range(len(self.pre_period)),
self.data.loc[self.pre_period, donor], 1)[0]
for donor in self.donor_pool]
])
else:
# 使用提供的预测变量
self.X1 = predictors.loc[self.treated_unit].values
self.X0 = predictors.loc[self.donor_pool].values.T
def _choose_V_matrix(self, method, custom_V):
"""选择V矩阵"""
if method == 'equal':
# 均等权重
self.V_matrix = np.diag(np.ones(len(self.X1)) / len(self.X1))
elif method == 'regression':
# 基于回归的权重(预测变量对结果的影响)
# 简化实现:使用预处理期结果与预测变量的相关性
correlations = []
for i in range(len(self.X1)):
corr = np.corrcoef(self.X0[i], self.y_donor_pre.mean(axis=0))[0, 1]
correlations.append(abs(corr) if not np.isnan(corr) else 0)
weights = np.array(correlations)
if weights.sum() > 0:
weights = weights / weights.sum()
else:
weights = np.ones(len(self.X1)) / len(self.X1)
self.V_matrix = np.diag(weights)
elif method == 'custom' and custom_V is not None:
self.V_matrix = custom_V
else:
raise ValueError("不支持的V矩阵选择方法")
def _optimize_weights(self, options):
"""优化权重"""
if options is None:
options = {'method': 'SLSQP', 'options': {'ftol': 1e-9}}
# 目标函数
def objective(W):
return np.sqrt((self.X1 - self.X0 @ W).T @ self.V_matrix @ (self.X1 - self.X0 @ W))
# 约束:权重非负且和为1
constraints = ({'type': 'eq', 'fun': lambda W: np.sum(W) - 1})
bounds = [(0, 1) for _ in range(len(self.donor_pool))]
# 初始值
W0 = np.ones(len(self.donor_pool)) / len(self.donor_pool)
# 优化
result = minimize(objective, W0, method=options.get('method', 'SLSQP'),
bounds=bounds, constraints=constraints,
options=options.get('options', {}))
if result.success:
self.weights = result.x
self.optimization_result = result
else:
raise ValueError(f"权重优化失败: {result.message}")
def predict(self, period='all'):
"""预测合成控制结果"""
if self.weights is None:
raise ValueError("需要先拟合模型")
if period == 'all':
donor_data = self.data[self.donor_pool].values
self.prediction = donor_data @ self.weights
elif period == 'pre':
self.prediction_pre = self.y_donor_pre @ self.weights
return self.prediction_pre
elif period == 'post':
self.prediction_post = self.y_donor_post @ self.weights
return self.prediction_post
else:
raise ValueError("period参数应为 'all', 'pre' 或 'post'")
def calculate_effect(self):
"""计算处理效应"""
if self.prediction is None:
self.predict('all')
treated = self.data[self.treated_unit].values
self.effect = treated - self.prediction
# 分离预处理期和处理后期效应
pre_idx = len(self.pre_period)
self.effect_pre = self.effect[:pre_idx]
self.effect_post = self.effect[pre_idx:]
return self.effect
def inference_placebo(self, n_permutations=1000):
"""Placebo检验:置换检验"""
placebo_effects = []
# 将处理单元也加入捐赠池进行Placebo检验
all_units = [self.treated_unit] + self.donor_pool
for unit in all_units:
# 将当前单元作为假想的处理单元
placebo_donor = [u for u in all_units if u != unit]
try:
# 拟合Placebo模型
placebo_sc = AdvancedSyntheticControl(unit, placebo_donor, self.time_index)
placebo_sc.fit(self.data, self.treatment_time,
V_method='equal', optimizer_options={'method': 'SLSQP'})
placebo_effect = placebo_sc.calculate_effect()
placebo_post_effect = placebo_sc.effect_post
if len(placebo_post_effect) > 0:
placebo_effects.append({
'unit': unit,
'effect': np.mean(placebo_post_effect),
'rmse_pre': np.sqrt(np.mean(placebo_sc.effect_pre**2))
})
except:
continue
self.placebo_results = pd.DataFrame(placebo_effects)
return self.placebo_results
def plot_diagnostic(self, figsize=(15, 10)):
"""绘制诊断图"""
fig = plt.figure(figsize=figsize)
gs = GridSpec(3, 2, figure=fig)
# 1. 趋势比较图
ax1 = fig.add_subplot(gs[0, :])
self._plot_trend_comparison(ax1)
# 2. 处理效应图
ax2 = fig.add_subplot(gs[1, 0])
self._plot_effect(ax2)
# 3. 权重分布图
ax3 = fig.add_subplot(gs[1, 1])
self._plot_weights(ax3)
# 4. Placebo检验图
if hasattr(self, 'placebo_results'):
ax4 = fig.add_subplot(gs[2, 0])
self._plot_placebo(ax4)
# 5. 预测变量平衡图
ax5 = fig.add_subplot(gs[2, 1])
self._plot_predictor_balance(ax5)
plt.tight_layout()
plt.show()
def _plot_trend_comparison(self, ax):
"""绘制趋势比较图"""
time_idx = range(len(self.data))
ax.plot(time_idx, self.data[self.treated_unit].values,
'b-', linewidth=2, label=f'处理单元 ({self.treated_unit})')
ax.plot(time_idx, self.prediction, 'r--', linewidth=2, label='合成控制组')
# 标记处理时间
treatment_idx = len(self.pre_period)
ax.axvline(x=treatment_idx, color='gray', linestyle='-', alpha=0.7)
ax.text(treatment_idx, ax.get_ylim()[0], '政策实施',
ha='center', va='bottom', rotation=90)
ax.set_xlabel('时间')
ax.set_ylabel('结果变量')
ax.set_title('处理单元 vs 合成控制组')
ax.legend()
ax.grid(True, alpha=0.3)
def _plot_effect(self, ax):
"""绘制处理效应图"""
time_idx = range(len(self.effect))
ax.bar(time_idx, self.effect, alpha=0.7, color='orange')
ax.axhline(y=0, color='gray', linestyle='-')
ax.axvline(x=len(self.pre_period), color='gray', linestyle='-')
ax.set_xlabel('时间')
ax.set_ylabel('处理效应')
ax.set_title('处理效应随时间变化')
ax.grid(True, alpha=0.3)
def _plot_weights(self, ax):
"""绘制权重分布图"""
# 只显示权重大于0.01的单元
significant_weights = [(i, w) for i, w in enumerate(self.weights) if w > 0.01]
if not significant_weights:
significant_weights = [(i, w) for i, w in enumerate(self.weights) if w > 0]
indices, weights = zip(*significant_weights)
units = [self.donor_pool[i] for i in indices]
ax.barh(units, weights, color='skyblue')
ax.set_xlabel('权重')
ax.set_title('合成控制组权重分布')
ax.grid(True, alpha=0.3)
def _plot_placebo(self, ax):
"""绘制Placebo检验图"""
if not hasattr(self, 'placebo_results'):
return
# 真实处理效应的位置
real_effect = np.mean(self.effect_post)
ax.hist(self.placebo_results['effect'], bins=20, alpha=0.7, color='lightblue')
ax.axvline(x=real_effect, color='red', linestyle='--', linewidth=2,
label='真实处理效应')
ax.set_xlabel('Placebo处理效应')
ax.set_ylabel('频数')
ax.set_title('Placebo检验')
ax.legend()
ax.grid(True, alpha=0.3)
def _plot_predictor_balance(self, ax):
"""绘制预测变量平衡图"""
# 计算处理单元和合成控制组的预测变量值
treated_predictors = self.X1
synthetic_predictors = self.X0 @ self.weights
predictors = [f'Predictor {i+1}' for i in range(len(treated_predictors))]
x = np.arange(len(predictors))
width = 0.35
ax.bar(x - width/2, treated_predictors, width, label='处理单元', alpha=0.7)
ax.bar(x + width/2, synthetic_predictors, width, label='合成控制组', alpha=0.7)
ax.set_xlabel('预测变量')
ax.set_ylabel('值')
ax.set_title('预测变量平衡性')
ax.set_xticks(x)
ax.set_xticklabels(predictors, rotation=45)
ax.legend()
ax.grid(True, alpha=0.3)
# 使用示例
def example_advanced_scm():
"""高级合成控制法示例"""
# 生成模拟数据
np.random.seed(42)
n_time = 30
n_units = 20
# 创建时间索引
time_index = pd.date_range('1990-01-01', periods=n_time, freq='Y')
# 生成单元名称
units = [f'Unit_{i}' for i in range(n_units)]
treated_unit = 'Unit_0'
donor_pool = units[1:]
# 生成数据
data = pd.DataFrame(index=time_index, columns=units)
# 共同趋势
common_trend = np.linspace(100, 120, n_time)
for i, unit in enumerate(units):
# 单元特定特征
unit_effect = np.random.normal(0, 5)
unit_trend = common_trend + unit_effect + np.random.normal(0, 2, n_time)
# 对处理单元添加处理效应
if unit == treated_unit:
treatment_time = '2000-01-01'
treatment_idx = time_index.get_loc(treatment_time)
unit_trend[treatment_idx:] += np.linspace(-5, -15, n_time - treatment_idx)
data[unit] = unit_trend
# 应用高级合成控制法
scm = AdvancedSyntheticControl(treated_unit, donor_pool, time_index)
scm.fit(data, treatment_time, V_method='equal')
# 计算效应
effect = scm.calculate_effect()
# Placebo检验
placebo_results = scm.inference_placebo(n_permutations=100)
# 绘制诊断图
scm.plot_diagnostic()
# 计算统计显著性
real_effect = np.mean(scm.effect_post)
placebo_effects = placebo_results['effect'].values
p_value = np.mean(np.abs(placebo_effects) >= np.abs(real_effect))
print(f"真实处理效应: {real_effect:.3f}")
print(f"Placebo检验p值: {p_value:.3f}")
print(f"预处理期RMSE: {np.sqrt(np.mean(scm.effect_pre**2)):.3f}")
return scm, effect, placebo_results
# 运行示例
scm_advanced, effect_advanced, placebo_results = example_advanced_scm()我们将合成控制法实现打包为可安装的Python包:
项目结构:
synthetic-control/
│
├── __init__.py
├── core/
│ ├── __init__.py
│ ├── scm.py # 主要合成控制法实现
│ ├── inference.py # 统计推断方法
│ └── diagnostics.py # 诊断工具
├── utils/
│ ├── __init__.py
│ ├── data_utils.py # 数据工具
│ └── visualization.py # 可视化工具
├── examples/
│ ├── __init__.py
│ ├── california_tobacco.py # 加州烟草案例
│ └── german_reunification.py # 德国统一案例
└── tests/
├── __init__.py
└── test_scm.py # 单元测试setup.py配置:
from setuptools import setup, find_packages
setup(
name="synthetic-control",
version="0.1.0",
description="A comprehensive Python implementation of Synthetic Control Method",
author="Your Name",
author_email="your.email@example.com",
packages=find_packages(),
install_requires=[
"numpy>=1.20.0",
"pandas>=1.3.0",
"scipy>=1.7.0",
"matplotlib>=3.5.0",
"seaborn>=0.11.0",
],
python_requires=">=3.8",
)使用示例:
from synthetic_control import AdvancedSyntheticControl
from synthetic_control.utils.visualization import plot_scm_diagnostics
# 加载数据
data = pd.read_csv('policy_data.csv', index_col=0)
# 创建合成控制模型
scm = AdvancedSyntheticControl(
treated_unit='California',
donor_pool=['New York', 'Texas', 'Florida', 'Illinois', 'Ohio'],
time_index=data.index
)
# 拟合模型
scm.fit(data, treatment_time='2005-01-01', V_method='regression')
# 进行统计推断
placebo_results = scm.inference_placebo(n_permutations=1000)
# 生成诊断报告
plot_scm_diagnostics(scm, save_path='scm_diagnostic.png')通过这个完整的实现,研究人员可以方便地应用合成控制法进行政策评估和因果推断,同时获得全面的诊断和统计推断结果。

合成控制法的统计推断具有挑战性,因为传统的渐近理论在此不适用(处理单元只有一个)。本节将介绍合成控制法中常用的统计推断方法,包括Placebo检验、排序检验和交叉验证等。
Placebo检验(也称为置换检验)是合成控制法中最常用的推断方法。其基本思想是将处理状态随机分配给捐赠池中的其他单元,观察"虚假"处理效应的分布。
def placebo_test(scm, data, treatment_time, n_permutations=1000):
"""
执行Placebo检验
参数:
scm: 合成控制模型实例
data: 面板数据
treatment_time: 处理时间
n_permutations: 置换次数
返回:
placebo_distribution: Placebo效应分布
p_value: 统计显著性p值
"""
# 真实处理效应
real_effect = np.mean(scm.effect_post)
# 所有单元(包括处理单元)
all_units = [scm.treated_unit] + scm.donor_pool
placebo_effects = []
placebo_rmspe = [] # 预处理期均方预测误差
for _ in range(n_permutations):
# 随机选择一个单元作为假想的处理单元
placebo_treated = np.random.choice(all_units)
placebo_donor = [u for u in all_units if u != placebo_treated]
try:
# 拟合Placebo模型
placebo_model = AdvancedSyntheticControl(placebo_treated, placebo_donor)
placebo_model.fit(data, treatment_time, V_method='equal')
placebo_effect = placebo_model.calculate_effect()
# 只保留成功的拟合
if len(placebo_model.effect_post) > 0:
placebo_post_effect = np.mean(placebo_model.effect_post)
placebo_pre_rmse = np.sqrt(np.mean(placebo_model.effect_pre**2))
placebo_effects.append(placebo_post_effect)
placebo_rmspe.append(placebo_pre_rmse)
except:
continue
# 计算p值(双边检验)
extreme_count = sum(np.abs(placebo_effects) >= np.abs(real_effect))
p_value = extreme_count / len(placebo_effects) if len(placebo_effects) > 0 else 1.0
return {
'real_effect': real_effect,
'placebo_effects': np.array(placebo_effects),
'placebo_rmspe': np.array(placebo_rmspe),
'p_value': p_value,
'n_successful': len(placebo_effects)
}
def plot_placebo_results(placebo_results, figsize=(12, 5)):
"""绘制Placebo检验结果"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
# Placebo效应分布
ax1.hist(placebo_results['placebo_effects'], bins=20, alpha=0.7, color='lightblue')
ax1.axvline(x=placebo_results['real_effect'], color='red', linestyle='--',
linewidth=2, label=f'真实效应: {placebo_results["real_effect"]:.3f}')
ax1.set_xlabel('Placebo处理效应')
ax1.set_ylabel('频数')
ax1.set_title('Placebo效应分布')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 效应 vs 预处理期拟合质量
ax2.scatter(placebo_results['placebo_rmspe'], placebo_results['placebo_effects'],
alpha=0.6, color='blue')
ax2.axhline(y=0, color='gray', linestyle='-', alpha=0.7)
ax2.set_xlabel('预处理期RMSPE')
ax2.set_ylabel('Placebo处理效应')
ax2.set_title('效应大小 vs 拟合质量')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print(f"真实处理效应: {placebo_results['real_effect']:.3f}")
print(f"Placebo检验p值: {placebo_results['p_value']:.3f}")
print(f"成功Placebo检验次数: {placebo_results['n_successful']}")
# 使用示例
placebo_results = placebo_test(scm_advanced, data, '2000-01-01', n_permutations=500)
plot_placebo_results(placebo_results)排序检验通过比较真实处理效应在Placebo效应分布中的位置来评估统计显著性:
def rank_test(placebo_results, alternative='two-sided'):
"""
排序检验
参数:
placebo_results: Placebo检验结果
alternative: 检验类型 ('two-sided', 'less', 'greater')
返回:
rank: 真实效应在分布中的排名
p_value: 排序检验p值
"""
real_effect = placebo_results['real_effect']
placebo_effects = placebo_results['placebo_effects']
if len(placebo_effects) == 0:
return None, 1.0
# 合并真实效应和Placebo效应
all_effects = np.concatenate([[real_effect], placebo_effects])
# 计算排名(从大到小)
ranks = len(all_effects) - np.argsort(np.argsort(np.abs(all_effects))) if alternative == 'two-sided' else \
len(all_effects) - np.argsort(np.argsort(all_effects))
real_rank = ranks[0] # 真实效应的排名
# 计算p值
if alternative == 'two-sided':
p_value = (real_rank) / len(all_effects)
elif alternative == 'greater':
p_value = (len(all_effects) - real_rank + 1) / len(all_effects)
elif alternative == 'less':
p_value = real_rank / len(all_effects)
else:
raise ValueError("alternative参数应为 'two-sided', 'less' 或 'greater'")
return real_rank, p_value
# 使用示例
rank, p_value_rank = rank_test(placebo_results, alternative='two-sided')
print(f"真实效应排名: {rank}/{len(placebo_results['placebo_effects']) + 1}")
print(f"排序检验p值: {p_value_rank:.3f}")预处理期的拟合优度是评估合成控制法有效性的重要指标:
def evaluate_fit_quality(scm, threshold=0.1):
"""
评估拟合优度
参数:
scm: 合成控制模型
threshold: RMSPE阈值
返回:
fit_metrics: 拟合优度指标
"""
# 计算预处理期均方预测误差(RMSPE)
rmspe_pre = np.sqrt(np.mean(scm.effect_pre**2))
# 计算R²
ss_total = np.sum((scm.y_treated_pre - np.mean(scm.y_treated_pre))**2)
ss_residual = np.sum(scm.effect_pre**2)
r_squared = 1 - (ss_residual / ss_total) if ss_total > 0 else 0
# 计算平均绝对误差(MAE)
mae_pre = np.mean(np.abs(scm.effect_pre))
# 标准化处理效应(相对于预处理期波动)
pre_std = np.std(scm.y_treated_pre)
standardized_effect = np.mean(scm.effect_post) / pre_std if pre_std > 0 else 0
fit_metrics = {
'rmspe_pre': rmspe_pre,
'r_squared': r_squared,
'mae_pre': mae_pre,
'standardized_effect': standardized_effect,
'good_fit': rmspe_pre < threshold
}
return fit_metrics
# 使用示例
fit_metrics = evaluate_fit_quality(scm_advanced)
print("拟合优度评估:")
for metric, value in fit_metrics.items():
if metric != 'good_fit':
print(f"{metric}: {value:.3f}")
print(f"拟合质量良好: {fit_metrics['good_fit']}")敏感性分析评估结论对模型设定的稳健性:
def sensitivity_analysis(scm, data, treatment_time, variations):
"""
敏感性分析
参数:
scm: 基础模型
data: 数据
treatment_time: 处理时间
variations: 敏感性分析变体列表
返回:
sensitivity_results: 敏感性分析结果
"""
sensitivity_results = []
base_effect = np.mean(scm.effect_post)
for variation in variations:
try:
# 创建变体模型
if variation['type'] == 'donor_pool':
# 改变捐赠池
donor_pool_var = variation['donor_pool']
scm_var = AdvancedSyntheticControl(scm.treated_unit, donor_pool_var)
scm_var.fit(data, treatment_time, V_method='equal')
elif variation['type'] == 'V_matrix':
# 改变V矩阵
scm_var = AdvancedSyntheticControl(scm.treated_unit, scm.donor_pool)
scm_var.fit(data, treatment_time, V_method=variation['method'])
elif variation['type'] == 'predictors':
# 改变预测变量
scm_var = AdvancedSyntheticControl(scm.treated_unit, scm.donor_pool)
# 这里需要实现特定的预测变量选择
pass
else:
continue
effect_var = np.mean(scm_var.effect_post)
rmspe_pre_var = np.sqrt(np.mean(scm_var.effect_pre**2))
sensitivity_results.append({
'variation': variation['name'],
'effect': effect_var,
'rmspe_pre': rmspe_pre_var,
'difference': effect_var - base_effect,
'relative_diff': (effect_var - base_effect) / abs(base_effect) if base_effect != 0 else 0
})
except Exception as e:
print(f"变体 {variation['name']} 失败: {str(e)}")
continue
return pd.DataFrame(sensitivity_results)
# 敏感性分析示例
variations = [
{'type': 'V_matrix', 'method': 'equal', 'name': '均等权重V矩阵'},
{'type': 'V_matrix', 'method': 'regression', 'name': '回归权重V矩阵'},
{'type': 'donor_pool', 'donor_pool': scm_advanced.donor_pool[:10], 'name': '前10个捐赠单元'},
{'type': 'donor_pool', 'donor_pool': scm_advanced.donor_pool[5:15], 'name': '中间10个捐赠单元'}
]
sensitivity_results = sensitivity_analysis(scm_advanced, data, '2000-01-01', variations)
print("敏感性分析结果:")
print(sensitivity_results.round(4))
通过综合使用这些统计推断方法,研究人员可以更全面地评估合成控制法结果的可靠性和稳健性,为政策评估提供更科学的依据。
基础合成控制法有许多扩展和变体,这些方法针对SCM的不同局限性提供了改进方案。本节将介绍几种重要的SCM扩展方法。
广义合成控制法(Generalized Synthetic Control)允许处理多个处理单元和多个处理期,扩展了SCM的应用范围。
class GeneralizedSyntheticControl:
def __init__(self):
self.weights = None
self.unit_effects = None
self.time_effects = None
def fit(self, data, treatment_mask, factors=2, max_iter=100):
"""
拟合广义合成控制模型
参数:
data: 面板数据 (时间 × 单元)
treatment_mask: 处理指示矩阵 (时间 × 单元)
factors: 潜在因子数量
max_iter: 最大迭代次数
"""
from sklearn.decomposition import NMF
import numpy as np
self.data = data.values
self.treatment_mask = treatment_mask.values
self.n_time, self.n_units = self.data.shape
# 初始化因子和权重
np.random.seed(42)
self.unit_effects = np.random.normal(0, 1, (self.n_units, factors))
self.time_effects = np.random.normal(0, 1, (self.n_time, factors))
# 迭代优化(简化实现)
for iteration in range(max_iter):
# 更新单元效应(固定时间效应)
for i in range(self.n_units):
# 只使用未处理期的数据
control_periods = ~self.treatment_mask[:, i]
if np.sum(control_periods) > 0:
X = self.time_effects[control_periods]
y = self.data[control_periods, i]
self.unit_effects[i] = np.linalg.lstsq(X, y, rcond=None)[0]
# 更新时间效应(固定单元效应)
for t in range(self.n_time):
# 只使用未处理单元的数据
control_units = ~self.treatment_mask[t, :]
if np.sum(control_units) > 0:
X = self.unit_effects[control_units]
y = self.data[t, control_units]
self.time_effects[t] = np.linalg.lstsq(X, y, rcond=None)[0]
# 预测反事实结果
self.predicted = self.time_effects @ self.unit_effects.T
self.effect = self.data - self.predicted
return self
def calculate_effect(self, units=None, periods=None):
"""计算处理效应"""
if units is None and periods is None:
return self.effect
effect_subset = self.effect.copy()
if units is not None:
effect_subset = effect_subset[:, units]
if periods is not None:
effect_subset = effect_subset[periods, :]
return effect_subset
# 使用示例(简化)
def example_gscm():
"""广义合成控制法示例"""
np.random.seed(42)
n_time, n_units = 30, 15
# 生成数据
data = np.random.normal(0, 1, (n_time, n_units))
# 添加共同趋势和个体效应
common_trend = np.linspace(0, 2, n_time)
for i in range(n_units):
data[:, i] += common_trend + np.random.normal(0, 0.5)
# 创建处理掩码(部分单元在后期接受处理)
treatment_mask = np.zeros((n_time, n_units), dtype=bool)
treatment_mask[20:, :5] = True # 最后5个单元在后期接受处理
# 添加处理效应
data[treatment_mask] += -1.5 # 处理效应为-1.5
# 应用广义合成控制法
gscm = GeneralizedSyntheticControl()
gscm.fit(pd.DataFrame(data), pd.DataFrame(treatment_mask), factors=2)
# 计算平均处理效应
effect = gscm.calculate_effect()
avg_effect = np.mean(effect[treatment_mask])
print(f"估计的平均处理效应: {avg_effect:.3f}")
print(f"真实处理效应: -1.500")
return gscm, effect
gscm, effect_gscm = example_gscm()矩阵补全方法将合成控制法视为矩阵补全问题,使用核范数最小化等技术:
class MatrixCompletionSCM:
def __init__(self, lambda_param=1.0):
self.lambda_param = lambda_param
self.completed_matrix = None
def fit(self, data, treatment_mask, max_iter=1000, tol=1e-6):
"""
矩阵补全合成控制法
参数:
data: 面板数据
treatment_mask: 处理指示矩阵
lambda_param: 正则化参数
max_iter: 最大迭代次数
tol: 收敛容忍度
"""
from scipy.linalg import svd
import numpy as np
Y = data.values.copy()
M = treatment_mask.values # 1表示缺失(处理期),0表示观测
# 初始化
X = Y.copy()
X[M] = 0 # 初始时用0填充缺失值
for iteration in range(max_iter):
# SVD分解
U, s, Vt = svd(X, full_matrices=False)
# 软阈值收缩
s_shrink = np.maximum(s - self.lambda_param, 0)
# 更新矩阵
X_new = U @ np.diag(s_shrink) @ Vt
# 保持观测值不变
X_new[~M] = Y[~M]
# 检查收敛
if np.linalg.norm(X_new - X) < tol:
break
X = X_new
self.completed_matrix = X
self.effect = Y - X
return self
# 使用示例
def example_matrix_completion():
"""矩阵补全SCM示例"""
np.random.seed(42)
n_time, n_units = 25, 10
# 生成低秩数据
rank = 3
U = np.random.normal(0, 1, (n_time, rank))
V = np.random.normal(0, 1, (n_units, rank))
data = U @ V.T + np.random.normal(0, 0.5, (n_time, n_units))
# 创建处理
treatment_mask = np.zeros((n_time, n_units), dtype=bool)
treatment_mask[15:, 0] = True # 第一个单元在后期接受处理
data[treatment_mask] += -2.0 # 添加处理效应
# 应用矩阵补全方法
mc_scm = MatrixCompletionSCM(lambda_param=0.5)
mc_scm.fit(pd.DataFrame(data), pd.DataFrame(treatment_mask))
# 计算处理效应
effect = mc_scm.effect
avg_effect = np.mean(effect[treatment_mask])
print(f"矩阵补全SCM估计效应: {avg_effect:.3f}")
print(f"真实处理效应: -2.000")
return mc_scm, effect
mc_scm, effect_mc = example_matrix_completion()贝叶斯方法为合成控制法提供概率解释和不确定性量化:
class BayesianSyntheticControl:
def __init__(self, n_samples=1000, warmup=500):
self.n_samples = n_samples
self.warmup = warmup
self.samples = None
def fit(self, treated_pre, control_pre, treated_post, control_post):
"""
贝叶斯合成控制法(简化实现)
注意:完整实现需要使用PyMC3或Stan
"""
import numpy as np
from scipy.optimize import minimize
# 简化实现:使用Bootstrap近似贝叶斯后验
n_pre = len(treated_pre)
n_post = len(treated_post)
n_donor = control_pre.shape[1]
# Bootstrap采样
bootstrap_weights = []
bootstrap_effects = []
for _ in range(self.n_samples + self.warmup):
# 对预处理期进行Bootstrap重采样
indices = np.random.choice(n_pre, n_pre, replace=True)
treated_boot = treated_pre[indices]
control_boot = control_pre[indices, :]
try:
# 估计权重
def objective(W):
return np.mean((treated_boot - control_boot @ W)**2)
constraints = ({'type': 'eq', 'fun': lambda W: np.sum(W) - 1})
bounds = [(0, 1) for _ in range(n_donor)]
W0 = np.ones(n_donor) / n_donor
result = minimize(objective, W0, method='SLSQP',
bounds=bounds, constraints=constraints)
if result.success:
weights = result.x
# 预测处理后期
synthetic_post = control_post @ weights
effect = treated_post - synthetic_post
bootstrap_weights.append(weights)
bootstrap_effects.append(effect)
except:
continue
# 丢弃warmup期
self.samples = {
'weights': np.array(bootstrap_weights[self.warmup:]),
'effects': np.array(bootstrap_effects[self.warmup:])
}
return self
def summary(self):
"""后验总结"""
if self.samples is None:
raise ValueError("需要先拟合模型")
effect_samples = self.samples['effects']
avg_effects = effect_samples.mean(axis=1)
summary = {
'mean_effect': np.mean(avg_effects),
'effect_sd': np.std(avg_effects),
'effect_ci_95': np.percentile(avg_effects, [2.5, 97.5]),
'prob_positive': np.mean(avg_effects > 0),
'prob_negative': np.mean(avg_effects < 0)
}
return summary
# 使用示例
def example_bayesian_scm():
"""贝叶斯SCM示例"""
np.random.seed(42)
# 生成数据
n_pre, n_post, n_donor = 20, 10, 15
treated_pre = np.cumsum(np.random.normal(0, 0.5, n_pre)) + 10
control_pre = np.zeros((n_pre, n_donor))
for j in range(n_donor):
control_pre[:, j] = np.cumsum(np.random.normal(0, 0.5, n_pre)) + 10 + np.random.normal(0, 1)
treated_post = np.cumsum(np.random.normal(0, 0.5, n_post)) + 10 - 1.5 # 处理效应-1.5
control_post = np.zeros((n_post, n_donor))
for j in range(n_donor):
control_post[:, j] = np.cumsum(np.random.normal(0, 0.5, n_post)) + 10 + np.random.normal(0, 1)
# 应用贝叶斯SCM
bayesian_scm = BayesianSyntheticControl(n_samples=500, warmup=100)
bayesian_scm.fit(treated_pre, control_pre, treated_post, control_post)
summary = bayesian_scm.summary()
print("贝叶斯合成控制法结果:")
print(f"平均处理效应: {summary['mean_effect']:.3f}")
print(f"95%置信区间: ({summary['effect_ci_95'][0]:.3f}, {summary['effect_ci_95'][1]:.3f})")
print(f"效应为负的概率: {summary['prob_negative']:.3f}")
return bayesian_scm, summary
bayesian_scm, summary_bayesian = example_bayesian_scm()当处理效应可能随时间和单元特征变化时,可以使用异质性处理效应模型:
class HeterogeneousEffectSCM:
def __init__(self):
self.weights = None
self.effect_model = None
def fit(self, data, treatment_time, unit_covariates=None):
"""
异质性处理效应合成控制法
参数:
data: 面板数据
treatment_time: 处理时间
unit_covariates: 单元特征,用于建模效应异质性
"""
from sklearn.linear_model import LinearRegression
import numpy as np
self.data = data
self.treatment_time = treatment_time
self.pre_period = data.index[data.index < treatment_time]
self.post_period = data.index[data.index >= treatment_time]
# 第一步:标准合成控制法
treated_unit = data.columns[0] # 假设第一列是处理单元
donor_pool = data.columns[1:]
scm = AdvancedSyntheticControl(treated_unit, donor_pool)
scm.fit(data, treatment_time)
scm_effect = scm.calculate_effect()
# 第二步:建模效应异质性(如果有单元特征)
if unit_covariates is not None and len(self.post_period) > 1:
# 使用处理后期效应与单元特征的关系
post_effects = scm_effect[len(self.pre_period):]
# 平均处理效应作为基线
avg_effect = np.mean(post_effects)
# 如果有多个处理后期,可以建模效应随时间的变化
if len(post_effects) > 2:
time_since_treatment = np.arange(len(post_effects))
# 简单线性趋势模型
effect_trend = LinearRegression()
effect_trend.fit(time_since_treatment.reshape(-1, 1), post_effects)
self.effect_model = {
'type': 'linear_trend',
'intercept': effect_trend.intercept_,
'slope': effect_trend.coef_[0],
'avg_effect': avg_effect
}
else:
self.effect_model = {
'type': 'constant',
'avg_effect': avg_effect
}
else:
self.effect_model = {
'type': 'constant',
'avg_effect': np.mean(scm_effect[len(self.pre_period):])
}
self.weights = scm.weights
self.effect = scm_effect
return self
def predict_effect(self, time_points=None, unit_features=None):
"""预测处理效应"""
if self.effect_model is None:
raise ValueError("需要先拟合模型")
if self.effect_model['type'] == 'constant':
return np.full_like(time_points, self.effect_model['avg_effect'], dtype=float)
elif self.effect_model['type'] == 'linear_trend':
effects = (self.effect_model['intercept'] +
self.effect_model['slope'] * time_points)
return effects
else:
raise ValueError("不支持的效应模型类型")
# 使用示例
def example_heterogeneous_scm():
"""异质性效应SCM示例"""
np.random.seed(42)
n_time, n_units = 30, 10
# 生成数据
time_index = pd.date_range('2000-01-01', periods=n_time, freq='Y')
data = pd.DataFrame(index=time_index)
# 生成单元数据
for i in range(n_units):
trend = np.linspace(100, 120, n_time) + np.random.normal(0, 5, n_time)
data[f'Unit_{i}'] = trend
# 添加处理效应(随时间增强)
treatment_time = '2010-01-01'
treatment_idx = time_index.get_loc(treatment_time)
# 处理效应随时间线性增长
for i in range(treatment_idx, n_time):
effect = -2 - 0.5 * (i - treatment_idx) # 从-2开始,每期减少0.5
data.iloc[i, 0] += effect
# 应用异质性效应SCM
hescm = HeterogeneousEffectSCM()
hescm.fit(data, treatment_time)
# 预测未来效应
future_periods = 5
future_time_points = np.arange(len(hescm.post_period),
len(hescm.post_period) + future_periods)
future_effects = hescm.predict_effect(future_time_points)
print("异质性处理效应SCM结果:")
print(f"当前平均效应: {hescm.effect_model['avg_effect']:.3f}")
print(f"预测未来效应: {future_effects}")
return hescm, future_effects
hescm, future_effects = example_heterogeneous_scm()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。