准实验设计的实战要点:自然实验的识别与验证
一、引言:当随机实验不可行时——准实验设计的力量
在因果推断的理想世界中,随机对照试验(RCT)是识别因果关系的黄金标准。然而,经济学、社会学、公共政策等领域中,研究者常常面临伦理限制、成本约束或现实不可行性——我们无法随机分配“是否上大学”、“是否实施房产税”或“是否经历地震”。此时,**准实验设计(Quasi-Experimental Design)**成为连接相关性与因果性的桥梁。
自然实验(Natural Experiment)是准实验设计皇冠上的明珠,它利用外生的政策冲击、制度断点或偶然事件,创造出“拟随机”的变异。例如:
- 教育领域:义务教育法修订导致的出生日期断点
- 医疗领域:医保报销门槛的年龄断点(如65岁 Medicare)
- 环境领域:风速超标引发工厂停产的监管规则
二、理论基础:因果识别策略总览
2.1 识别策略的选择:何时用何种方法
| 策略类型 | 核心假设 | 变异来源 | 数据要求 | 效力强度 | 典型陷阱 |
|---|---|---|---|---|---|
| RDD | 连续性+局部随机 | 临界值规则 | 靠近断点密集数据 | ⭐⭐⭐⭐⭐ | 操纵检验、带宽选择 |
| DID | 平行趋势+共同冲击 | 时间+组间差异 | 面板数据(时期≥2) | ⭐⭐⭐⭐ | 溢出效应、预期效应 |
| IV | 排他性约束+相关性 | 工具变量 | 工具+第一阶段强F | ⭐⭐⭐ | 弱工具、排他性违反 |
| SCM | 因子模型+权重稀疏 | 单一处理单元 | 长期面板+多个对照 | ⭐⭐⭐⭐ | 外推性、安慰剂检验 |
2.2 共同识别基础:潜在结果框架下的可识别性
所有准实验方法都基于潜在结果框架和关键可识别假设:
I. 条件独立假设(Conditional Independence)
在RDD中,此假设在断点附近近似成立;在DID中,通过差分消除不随时间变化的混淆因子。
II. 共同支撑(Common Support)
确保每个单元都有接受处理和对照的可能性。
III. 稳定单元处理值假设(SUTVA)
一个单元的潜在结果不受其他单元处理状态影响。在存在溢出效应时(如邻居接受政策),此假设被违反。
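上述假设可以用潜在结果记号统一表述(以下为通用记号约定,便于后文对照,非原文公式):
$$
\begin{aligned}
&\tau_i = Y_i(1) - Y_i(0) && \text{(个体处理效应,两个潜在结果不可同时观测)}\\
&\mathrm{ATT} = E\left[\,Y_i(1) - Y_i(0) \mid D_i = 1\,\right] && \text{(处理组平均处理效应)}\\
&\bigl(Y_i(0),\,Y_i(1)\bigr) \perp D_i \mid X_i && \text{(条件独立)}\\
&0 < P(D_i = 1 \mid X_i) < 1 && \text{(共同支撑)}
\end{aligned}
$$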
三、断点回归设计(RDD):规则驱动的识别
3.1 方法原理:在临界值处捕捉因果
RDD利用确定性分配规则创造的间断性。假设奖学金按考试分数≥60授予,则59分和60分的学生在能力上几乎随机,但获得奖学金的概率从0跃升至1。这种已知规则+连续驱动变量的结构使得断点附近的因果识别类似于局部随机化。
核心假设:
- I. 连续性:潜在结果 $E[Y_i(1) \mid X_i = x]$ 与 $E[Y_i(0) \mid X_i = x]$ 在断点 $c$ 处连续
- II. 无精确操纵:驱动变量 $X_i$ 不能被精确控制以越过断点
- III. 局部随机化:在带宽 $h$ 内,处理分配 $D_i = \mathbb{1}(X_i \geq c)$ 近似随机
估计方程(局部线性形式):
$$Y_i = \alpha + \tau D_i + \beta_1 (X_i - c) + \beta_2 D_i (X_i - c) + \varepsilon_i, \quad |X_i - c| \leq h$$
其中 $\tau$ 即断点处的(局部)平均处理效应。
3.2 代码实战:高考录取分数线RDD
场景:某省一本线(550分)对大学入学率及长期收入的因果影响
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
import seaborn as sns
from sklearn.linear_model import RidgeCV
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体支持
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# I. 数据生成:模拟高考分数与录取数据
def generate_rdd_data(n=5000, cutoff=550, bandwidth=100):
"""
生成RDD模拟数据
模拟高考分数从450-650,一本线550分
假设分数越高越可能录取,但一本线处存在跳跃
"""
np.random.seed(42)
# 驱动变量:高考分数(连续)
# 在断点附近增加采样密度
scores_below = np.random.uniform(cutoff - bandwidth, cutoff, int(n/2))
scores_above = np.random.uniform(cutoff, cutoff + bandwidth, int(n/2))
scores = np.concatenate([scores_below, scores_above])
# 添加微小测量误差(确保无精确操纵)
scores += np.random.normal(0, 0.5, n)
# 处理变量:是否超过一本线
above_cutoff = (scores >= cutoff).astype(int)
# 结果变量1:是否被一本院校录取
# 连续性部分 + 断点处的跳跃
    # 断点左侧约35%、右侧约70%(跳跃35个百分点),与下文图形解读一致
    admission_prob = 0.35 + 0.002 * (scores - cutoff) + 0.35 * above_cutoff
    admission_prob = np.clip(admission_prob, 0, 1)
admission = np.random.binomial(1, admission_prob, n)
# 结果变量2:十年后年收入(万)
# 基础收入 + 分数效应 + 录取的因果效应
base_income = 5 + 0.03 * scores
causal_effect = 8 * admission # 录取使收入增加8万
income = base_income + causal_effect + np.random.normal(0, 2, n)
# 其他协变量
urban = np.random.binomial(1, 0.5 + 0.001 * scores, n)
father_edu = np.random.normal(12, 3, n) + 0.01 * scores
df = pd.DataFrame({
'score': scores,
'above_cutoff': above_cutoff,
'admission': admission,
'income': income,
'score_centered': scores - cutoff, # 中心化驱动变量
'urban': urban,
'father_edu': father_edu
})
return df
# 生成数据
df_rdd = generate_rdd_data(n=8000, cutoff=550, bandwidth=120)
print("数据生成完成!")
print(df_rdd[['score', 'above_cutoff', 'admission', 'income']].describe().round(2))
代码解释:
- 驱动变量设计:在断点两侧对称采样,确保局部有足够的样本
- 连续性处理:scores 从450-650连续分布,但 above_cutoff 在550处发生跳跃
- 跳跃幅度设置:0.35 * above_cutoff 模拟了一本线对录取概率的因果效应(35个百分点)
- 双重结果:同时生成录取(二元)和收入(连续)结果,展示RDD的灵活性
II. 图形分析:可视化断点
# 创建可视化函数
def plot_rdd_discontinuity(df, cutoff=550, outcome='admission',
bandwidth=80, bin_width=5):
"""
RDD可视化:展示断点处的跳跃
"""
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# 1. 散点图+局部拟合
ax1 = axes[0]
# 在带宽内绘制
df_plot = df[(df['score'] >= cutoff - bandwidth) &
(df['score'] <= cutoff + bandwidth)]
# 散点(随机采样避免过度绘制)
sample_idx = np.random.choice(df_plot.index, min(500, len(df_plot)), replace=False)
ax1.scatter(df_plot.loc[sample_idx, 'score'],
df_plot.loc[sample_idx, outcome],
alpha=0.3, s=10, color='gray', label='观测数据')
# 断点垂直线
    ax1.axvline(x=cutoff, color='red', linestyle='--', linewidth=2,
                label=f'断点 (c={cutoff})')
# 局部多项式拟合(两侧分别)
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
# 左侧
left = df_plot[df_plot['score'] < cutoff]
X_left = left[['score_centered']].values
y_left = left[outcome].values
if len(left) > 50:
poly_left = PolynomialFeatures(degree=1)
X_left_poly = poly_left.fit_transform(X_left)
model_left = LinearRegression().fit(X_left_poly, y_left)
score_range_left = np.linspace(-bandwidth, -0.5, 100)
X_pred_left = poly_left.transform(score_range_left.reshape(-1, 1))
pred_left = model_left.predict(X_pred_left)
ax1.plot(score_range_left + cutoff, pred_left,
'blue', linewidth=2, label='左侧拟合')
# 右侧
right = df_plot[df_plot['score'] >= cutoff]
X_right = right[['score_centered']].values
y_right = right[outcome].values
if len(right) > 50:
poly_right = PolynomialFeatures(degree=1)
X_right_poly = poly_right.fit_transform(X_right)
model_right = LinearRegression().fit(X_right_poly, y_right)
score_range_right = np.linspace(0.5, bandwidth, 100)
X_pred_right = poly_right.transform(score_range_right.reshape(-1, 1))
pred_right = model_right.predict(X_pred_right)
ax1.plot(score_range_right + cutoff, pred_right,
'green', linewidth=2, label='右侧拟合')
ax1.set_xlabel('高考分数', fontsize=12)
ax1.set_ylabel('录取概率' if outcome == 'admission' else '年收入(万)',
fontsize=12)
ax1.set_title(f'RDD可视化:{outcome}的断点效应', fontsize=14, fontweight='bold')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 直方图:驱动变量分布(检验操纵)
ax2 = axes[1]
bins = np.arange(cutoff - bandwidth, cutoff + bandwidth + bin_width, bin_width)
ax2.hist(df_plot[df_plot['score'] < cutoff]['score'],
bins=bins, alpha=0.7, color='skyblue',
edgecolor='black', label='低于一本线')
ax2.hist(df_plot[df_plot['score'] >= cutoff]['score'],
bins=bins, alpha=0.7, color='lightcoral',
edgecolor='black', label='高于一本线')
ax2.axvline(x=cutoff, color='red', linestyle='--', linewidth=2)
ax2.set_xlabel('高考分数', fontsize=12)
ax2.set_ylabel('频数', fontsize=12)
ax2.set_title('驱动变量分布:检验分数操纵', fontsize=14, fontweight='bold')
ax2.legend()
plt.tight_layout()
plt.savefig(f'rdd_analysis_{outcome}.png', dpi=300, bbox_inches='tight')
plt.show()
# 可视化两个结果变量
plot_rdd_discontinuity(df_rdd, outcome='admission')
plot_rdd_discontinuity(df_rdd, outcome='income', bandwidth=100)
图形解读:
- 左图:清晰展示550分处的跳跃:录取概率从约35%跃升至70%;平均收入在断点处约跳升2.8万(录取效应8万 × 录取概率跳跃0.35,即ITT口径)
- 右图:检查驱动变量分布的连续性,若在550分处出现"堆积"现象,则暗示分数操纵
III. 参数估计:局部多项式回归
def rdd_estimate(df, cutoff=550, bandwidth=60, polynomial_order=1,
outcome='income'):
"""
局部多项式回归估计RDD效应
参数:
- bandwidth: 带宽,决定使用断点多宽范围内的数据
- polynomial_order: 多项式阶数,1=线性,2=二次
- outcome: 结果变量名
"""
# I. 带宽选择(数据驱动)
# 使用IK最优带宽(Imbens & Kalyanaraman 2012)
def optimal_bandwidth_ik(data, cutoff):
"""
简化的IK带宽选择(实际应使用更复杂的实现)
"""
# 计算两侧标准差和密度
left = data[data['score'] < cutoff]
right = data[data['score'] >= cutoff]
if len(left) < 50 or len(right) < 50:
return 60 # 默认带宽
# 简单的启发式:选择两侧各有至少200个观测的带宽
for h in [30, 40, 50, 60, 80, 100]:
n_left = np.sum((data['score'] >= cutoff - h) &
(data['score'] < cutoff))
n_right = np.sum((data['score'] >= cutoff) &
(data['score'] <= cutoff + h))
if n_left >= 200 and n_right >= 200:
return h
return 100
# 使用指定带宽或数据驱动
if bandwidth == 'optimal':
bandwidth = optimal_bandwidth_ik(df, cutoff)
print(f"使用IK最优带宽: {bandwidth}")
# II. 限制在带宽内
df_local = df[(df['score'] >= cutoff - bandwidth) &
(df['score'] <= cutoff + bandwidth)].copy()
df_local['treat'] = (df_local['score'] >= cutoff).astype(int)
df_local['score_c'] = df_local['score'] - cutoff
# III. 多项式设计矩阵
# 交互项:treat * poly(score_c)
    if polynomial_order == 0:
        # 0阶:仅截距列;保持二维形状,否则后续按shape[1]拼接会报错
        X_left = np.ones((len(df_local[df_local['treat'] == 0]), 1))
        X_right = np.ones((len(df_local[df_local['treat'] == 1]), 1))
else:
poly = PolynomialFeatures(degree=polynomial_order, include_bias=False)
# 左侧(控制组)
X_left_raw = df_local[df_local['treat']==0][['score_c']].values
X_left_poly = poly.fit_transform(X_left_raw)
X_left = np.column_stack([np.ones(len(X_left_poly)), X_left_poly])
# 右侧(处理组)
X_right_raw = df_local[df_local['treat']==1][['score_c']].values
X_right_poly = poly.fit_transform(X_right_raw)
X_right = np.column_stack([np.ones(len(X_right_poly)), X_right_poly])
# 合并设计矩阵(RDD标准做法:允许两侧斜率不同)
# 实际模型:Y = α + β1*score_c + β2*treat + β3*score_c*treat
# 效应 = β2(在score_c=0处)
X = np.zeros((len(df_local), 2 * (polynomial_order + 1)))
# 左侧
left_idx = df_local['treat'] == 0
X[left_idx, :X_left.shape[1]] = X_left
# 右侧
right_idx = df_local['treat'] == 1
X[right_idx, X_left.shape[1]:] = X_right
# IV. 估计模型
y = df_local[outcome].values
# 使用Ridge回归防止过拟合
model = RidgeCV(alphas=np.logspace(-6, 6, 50), cv=5)
model.fit(X, y)
# V. 效应估计
# 在断点处的预测差异
# 左侧点:score_c=0, treat=0
X_left_point = np.zeros((1, X.shape[1]))
X_left_point[0, 0] = 1 # 截距
# 右侧点:score_c=0, treat=1
X_right_point = np.zeros((1, X.shape[1]))
X_right_point[0, X_left.shape[1]] = 1 # 右侧截距
pred_left = model.predict(X_left_point)[0]
pred_right = model.predict(X_right_point)[0]
treatment_effect = pred_right - pred_left
    # VI. 标准误(简化版:按两侧样本均值差的方差公式近似;
    # 正式研究应使用异方差/聚类稳健标准误)
    residuals = y - model.predict(X)
    # 计算断点两侧的残差方差
var_left = residuals[left_idx].var() if np.sum(left_idx) > 1 else 1
var_right = residuals[right_idx].var() if np.sum(right_idx) > 1 else 1
n_left = np.sum(left_idx)
n_right = np.sum(right_idx)
se_effect = np.sqrt(var_left/n_left + var_right/n_right)
# VII. 带宽敏感性分析
bandwidths = [30, 40, 50, 60, 80, 100, 120]
sensitivity = []
for h in bandwidths:
try:
df_h = df[(df['score'] >= cutoff - h) &
(df['score'] <= cutoff + h)]
if len(df_h) > 100:
# 简化估计:均值差
left_mean = df_h[df_h['score'] < cutoff][outcome].mean()
right_mean = df_h[df_h['score'] >= cutoff][outcome].mean()
sensitivity.append({
'bandwidth': h,
'effect': right_mean - left_mean,
'n_obs': len(df_h)
})
except:
continue
sensitivity_df = pd.DataFrame(sensitivity)
return {
'effect': treatment_effect,
'se': se_effect,
'ci': [treatment_effect - 1.96*se_effect,
treatment_effect + 1.96*se_effect],
'n_left': n_left,
'n_right': n_right,
'bandwidth': bandwidth,
'sensitivity': sensitivity_df,
'model': model,
'df_local': df_local
}
# 估计收入效应
result_income = rdd_estimate(df_rdd, outcome='income', bandwidth=60,
polynomial_order=1)
result_admission = rdd_estimate(df_rdd, outcome='admission', bandwidth=50,
polynomial_order=1)
print("\n" + "="*60)
print("RDD估计结果:高考一本线的因果效应")
print("="*60)
print(f"\n【结果变量:未来年收入】")
print(f"效应估计: {result_income['effect']:.3f} 万元")
print(f"标准误: {result_income['se']:.3f}")
print(f"95%置信区间: [{result_income['ci'][0]:.3f}, {result_income['ci'][1]:.3f}]")
print(f"左侧样本: {result_income['n_left']}, 右侧样本: {result_income['n_right']}")
print(f"使用带宽: ±{result_income['bandwidth']}分")
print(f"\n【结果变量:一本录取】")
print(f"效应估计: {result_admission['effect']:.3f}")
print(f"95%置信区间: [{result_admission['ci'][0]:.3f}, {result_admission['ci'][1]:.3f}]")
IV. 稳健性检验:带宽与多项式敏感性
def rdd_robustness_checks(df, outcome='income'):
"""
RDD稳健性检验体系
"""
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
# I. 带宽敏感性分析
ax1 = axes[0, 0]
bandwidths = [40, 50, 60, 70, 80, 100, 120]
effects = []
cis = []
for h in bandwidths:
res = rdd_estimate(df, outcome=outcome, bandwidth=h, polynomial_order=1)
effects.append(res['effect'])
cis.append(res['ci'])
effects = np.array(effects)
cis = np.array(cis)
ax1.plot(bandwidths, effects, 'o-', color='blue', linewidth=2,
markersize=8, label='点估计')
ax1.fill_between(bandwidths, cis[:, 0], cis[:, 1],
color='blue', alpha=0.2, label='95%置信区间')
ax1.axhline(y=0, color='red', linestyle='--', alpha=0.5)
ax1.set_xlabel('带宽', fontsize=12)
ax1.set_ylabel('效应估计', fontsize=12)
ax1.set_title('稳健性检验:带宽敏感性', fontsize=14, fontweight='bold')
ax1.legend()
ax1.grid(True, alpha=0.3)
# II. 多项式阶数敏感性
ax2 = axes[0, 1]
orders = [0, 1, 2, 3]
effects_poly = []
cis_poly = []
for order in orders:
res = rdd_estimate(df, outcome=outcome, bandwidth=60,
polynomial_order=order)
effects_poly.append(res['effect'])
cis_poly.append(res['ci'])
effects_poly = np.array(effects_poly)
cis_poly = np.array(cis_poly)
ax2.plot(orders, effects_poly, 's-', color='green', linewidth=2,
markersize=8, label='点估计')
ax2.fill_between(orders, cis_poly[:, 0], cis_poly[:, 1],
color='green', alpha=0.2, label='95%置信区间')
ax2.axhline(y=0, color='red', linestyle='--', alpha=0.5)
ax2.set_xlabel('多项式阶数', fontsize=12)
ax2.set_ylabel('效应估计', fontsize=12)
ax2.set_title('稳健性检验:多项式敏感性', fontsize=14, fontweight='bold')
ax2.set_xticks(orders)
ax2.legend()
ax2.grid(True, alpha=0.3)
# III. 安慰剂检验:虚假断点
ax3 = axes[1, 0]
placebo_cutoffs = [530, 540, 560, 570]
placebo_effects = []
for c_fake in placebo_cutoffs:
# 在虚假断点处估计效应
df_fake = df.copy()
df_fake['above_fake'] = (df_fake['score'] >= c_fake).astype(int)
# 限制在原带宽内
df_fake_local = df_fake[
(df_fake['score'] >= 550 - 60) &
(df_fake['score'] <= 550 + 60)
]
if len(df_fake_local) > 100:
left_mean = df_fake_local[df_fake_local['score'] < c_fake][outcome].mean()
right_mean = df_fake_local[df_fake_local['score'] >= c_fake][outcome].mean()
placebo_effects.append(right_mean - left_mean)
else:
placebo_effects.append(np.nan)
ax3.plot(placebo_cutoffs, placebo_effects, 'D-', color='orange',
linewidth=2, markersize=8, label='安慰剂效应')
ax3.axvline(x=550, color='red', linestyle='--', alpha=0.7, label='真实断点')
ax3.axhline(y=0, color='gray', linestyle='-', alpha=0.5)
ax3.set_xlabel('安慰剂断点位置', fontsize=12)
ax3.set_ylabel('估计效应', fontsize=12)
ax3.set_title('安慰剂检验:虚假断点', fontsize=14, fontweight='bold')
ax3.legend()
ax3.grid(True, alpha=0.3)
# IV. 驱动变量分布检验(McCrary检验)
ax4 = axes[1, 1]
# 简单直方图检验
bins = np.arange(530, 571, 2)
hist, bin_edges = np.histogram(df['score'], bins=bins)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
# 标记断点所在bin
cutoff_idx = np.argmin(np.abs(bin_centers - 550))
ax4.bar(bin_centers, hist, width=1.8, color='skyblue',
edgecolor='black', alpha=0.7)
ax4.bar(bin_centers[cutoff_idx], hist[cutoff_idx],
width=1.8, color='red', alpha=0.8, label='断点处')
ax4.axvline(x=550, color='red', linestyle='--', linewidth=2)
ax4.set_xlabel('高考分数', fontsize=12)
ax4.set_ylabel('观测频数', fontsize=12)
ax4.set_title('McCrary检验:断点处密度连续性', fontsize=14, fontweight='bold')
ax4.legend()
# 简单的t检验:断点左右bin的差异
if cutoff_idx > 0 and cutoff_idx < len(hist) - 1:
left_bin = hist[cutoff_idx - 1]
right_bin = hist[cutoff_idx]
# Poisson检验(计数数据)
z_stat = (right_bin - left_bin) / np.sqrt(right_bin + left_bin + 1e-10)
p_val = 2 * (1 - stats.norm.cdf(abs(z_stat)))
ax4.text(550, max(hist) * 0.8,
f'z={z_stat:.2f}\np={p_val:.3f}',
ha='center', fontsize=10,
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.savefig('rdd_robustness.png', dpi=300, bbox_inches='tight')
plt.show()
# 执行稳健性检验
rdd_robustness_checks(df_rdd, outcome='income')
稳健性检验解读:
- 带宽敏感性:若效应估计在不同带宽下符号一致、幅度稳定,则结果可靠
- 多项式阶数:若在0-3阶间效应核心结论不变,说明模型设定不敏感
- 安慰剂检验:在虚假断点处效应应接近0,否则存在系统性偏误
- McCrary检验:p值较大(如>0.1)时无法拒绝密度连续的原假设,即未发现精确操纵的证据(注意:不拒绝原假设并不等于证明无操纵)
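直方图对比只是粗检验。下面给出一个更接近McCrary思路的最小草图:对断点两侧的分箱频数分别做线性拟合并外推到断点处,比较密度是否跳跃。分箱宽度、带宽与泊松近似均为本文的简化假设,正式研究建议使用如R的rddensity包等成熟实现:
def simple_density_jump_test(scores, cutoff, bin_width=2.0, bandwidth=40):
    """McCrary式密度检验草图:两侧分箱频数线性外推到断点,比较跳跃"""
    bins = np.arange(cutoff - bandwidth, cutoff + bandwidth + bin_width, bin_width)
    counts, edges = np.histogram(scores, bins=bins)
    centers = (edges[:-1] + edges[1:]) / 2 - cutoff  # 以断点为原点
    left, right = centers < 0, centers >= 0
    # 两侧分别做一阶多项式拟合,外推到断点处(centers=0)
    f_left = np.polyval(np.polyfit(centers[left], counts[left], 1), 0.0)
    f_right = np.polyval(np.polyfit(centers[right], counts[right], 1), 0.0)
    # 泊松近似下的z统计量(计数数据)
    z = (f_right - f_left) / np.sqrt(max(f_left + f_right, 1e-10))
    p = 2 * (1 - stats.norm.cdf(abs(z)))
    return f_left, f_right, z, p

fl, fr, z, p = simple_density_jump_test(df_rdd['score'].values, cutoff=550)
print(f"断点处密度外推: 左={fl:.1f}, 右={fr:.1f}, z={z:.2f}, p={p:.3f}")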
V. 高级检验:驱动变量与协变量的平衡性
def covariate_balance_test(df, covariates=['urban', 'father_edu'],
cutoff=550, bandwidth=60):
"""
协变量平衡性检验:断点两侧协变量是否连续
若不连续,则暗示选择偏误
"""
df_local = df[(df['score'] >= cutoff - bandwidth) &
(df['score'] <= cutoff + bandwidth)]
left = df_local[df_local['score'] < cutoff]
right = df_local[df_local['score'] >= cutoff]
balance_results = []
print("\n" + "="*60)
print("协变量平衡性检验(安慰剂断点)")
print("="*60)
print(f"协变量\t\t左侧均值\t右侧均值\t差异\t\tt值\t\tp值")
print("-"*80)
for cov in covariates:
left_mean = left[cov].mean()
right_mean = right[cov].mean()
diff = right_mean - left_mean
# t检验
t_stat, p_val = stats.ttest_ind(right[cov].values, left[cov].values)
print(f"{cov}\t\t{left_mean:.3f}\t\t{right_mean:.3f}\t\t{diff:.3f}\t\t"
f"{t_stat:.3f}\t\t{p_val:.3f}")
balance_results.append({
'covariate': cov,
'left_mean': left_mean,
'right_mean': right_mean,
'diff': diff,
't_stat': t_stat,
'p_value': p_val
})
return pd.DataFrame(balance_results)
# 执行平衡性检验
balance_df = covariate_balance_test(df_rdd)
四、双重差分法(DID):时间维度的识别
4.1 方法原理:双重差分消除不可观测混淆
DID利用面板数据的双重维度差异:组间差异 + 时间差异。通过比较处理组在干预前后变化与对照组同期变化的差异,消除不随时间变化的组间固定效应和同时影响所有组的时间效应。
基础DID模型:
$$Y_{it} = \beta_0 + \beta_1\,\mathrm{Treat}_i + \beta_2\,\mathrm{Post}_t + \delta\,(\mathrm{Treat}_i \times \mathrm{Post}_t) + \varepsilon_{it}$$
其中 $\delta$ 是ATT(处理组的平均处理效应)。
核心假设:
- I. 平行趋势:若无干预,处理组与对照组的结果变化趋势相同
- II. 无预期效应:处理组不会在政策实施前改变行为
- III. SUTVA:不存在溢出效应
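在进入4.2的完整代码之前,先看一个最小的2×2手工计算草图:$\delta$ 就是“双重均值差分”,示例中的四个组-期均值为虚构数字:
# 2x2 DID手工计算:δ = (处理组后-前) - (对照组后-前)
toy = pd.DataFrame({
    'treated': [0, 0, 1, 1],
    'post':    [0, 1, 0, 1],
    'y':       [10.0, 12.0, 11.0, 9.0],  # 四个组-期的均值(虚构)
})
m = toy.set_index(['treated', 'post'])['y']
delta = (m.loc[(1, 1)] - m.loc[(1, 0)]) - (m.loc[(0, 1)] - m.loc[(0, 0)])
print(f"DID估计 δ = {delta:.1f}")  # (9-11) - (12-10) = -4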
4.2 代码实战:最低工资政策对就业的影响
场景:某市2018年上调最低工资,评估对餐饮业就业的影响
# I. 数据生成:面板数据
def generate_did_data(n_firms=5000, n_periods=48, treatment_period=24):
"""
生成DID模拟数据
n_firms: 企业数量
n_periods: 时期数(月度)
treatment_period: 政策实施期
"""
np.random.seed(42)
# 企业特征(时不变)
firm_size = np.random.normal(50, 20, n_firms) # 基线员工数
region = np.random.binomial(1, 0.5, n_firms) # 0=低影响区, 1=高影响区
# 处理分配:高影响区企业更可能受政策影响
# 处理概率依赖于region和size
treatment_prob = 0.3 + 0.4 * region + 0.001 * (firm_size - 50)
treatment_prob = np.clip(treatment_prob, 0.1, 0.8)
treated = np.random.binomial(1, treatment_prob, n_firms)
# 创建面板数据
data_list = []
for firm in range(n_firms):
# 企业特定的趋势和季节效应
trend = np.random.normal(0.01, 0.005) # 月度增长趋势
season = np.random.normal(5, 2) # 季节效应幅度
for period in range(n_periods):
# 时间虚拟变量
post = int(period >= treatment_period)
time_trend = period * trend
# 季节效应(正弦波)
seasonal_effect = season * np.sin(2 * np.pi * period / 12)
# 行业冲击(共同时间效应)
industry_shock = 2 * np.sin(2 * np.pi * period / 36) # 3年周期
# 基线就业(企业固定效应 + 时间效应)
baseline_employment = firm_size[firm] + time_trend + \
seasonal_effect + industry_shock
# 处理效应:最低工资政策(仅对受处理企业在post=1时)
# 负向效应,且大企业和高影响区影响更大
if treated[firm] == 1 and post == 1:
treatment_effect = -3 - 0.05 * firm_size[firm] + \
2 * region[firm] # 高影响区缓解效应
else:
treatment_effect = 0
# 结果:就业人数
employment = baseline_employment + treatment_effect + \
np.random.normal(0, 1.5)
# 避免负值
employment = max(10, employment)
data_list.append({
'firm_id': firm,
'period': period,
'employment': employment,
'treated': treated[firm],
'post': post,
'treat_post': treated[firm] * post,
'region': region[firm],
'firm_size': firm_size[firm],
'time_trend': time_trend,
'seasonal': seasonal_effect
})
df_did = pd.DataFrame(data_list)
# 添加时间固定效应
df_did['year'] = df_did['period'] // 12 + 2016
df_did['month'] = df_did['period'] % 12
return df_did, treated, treatment_period
# 生成数据
df_did, treated_firms, treat_period = generate_did_data()
print("DID数据生成完成!")
print(f"企业数: {df_did['firm_id'].nunique()}")
print(f"总观测: {len(df_did)}")
print(f"\n处理组企业数: {treated_firms.sum()}")
print(f"对照组企业数: {(treated_firms == 0).sum()}")
print(f"政策实施期: 2016+{treat_period//12}年")
数据生成逻辑详解:
- 企业异质性:firm_size 和 region 作为不随时间变化的企业固定效应
- 时间效应:包含线性趋势、季节波动和行业周期冲击
- 处理效应异质性:大企业(规模>50)的负向效应更强,但高影响区(region=1)因经济基础好而缓解部分负面效应
- 平行趋势设定:处理组和对照组的 time_trend 和 seasonal_effect 来自同一分布,满足平行趋势前提
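进入正式检验前,可先画出两组的平均就业走势做目检(一个简单草图,沿用上文的 df_did 与 treat_period):
# 目检平行趋势:政策前两组均值走势应大致平行
grp = df_did.groupby(['period', 'treated'])['employment'].mean().unstack()
plt.figure(figsize=(10, 5))
plt.plot(grp.index, grp[0], 's--', color='gray', label='对照组')
plt.plot(grp.index, grp[1], 'o-', color='steelblue', label='处理组')
plt.axvline(x=treat_period, color='black', linestyle=':', label='政策实施')
plt.xlabel('期'); plt.ylabel('平均就业人数')
plt.legend(); plt.grid(alpha=0.3)
plt.show()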
II. 平行趋势检验:DID的基石
def parallel_trend_test(df, outcome='employment',
treatment_period=24, n_pre_periods=12):
"""
平行趋势检验与事件研究法
步骤:
1. 估计动态DID模型
2. 检验政策前系数是否显著不为0
3. 可视化事件研究图
"""
# I. 准备相对时间变量
df = df.copy()
df['relative_time'] = df['period'] - treatment_period
# 只保留政策前后若干期
df_plot = df[(df['relative_time'] >= -n_pre_periods) &
(df['relative_time'] <= 12)].copy()
# 创建时间虚拟变量(政策前为负,政策后为正)
time_dummies = pd.get_dummies(df_plot['relative_time'],
prefix='t', prefix_sep='')
# II. 动态DID模型
# Y_it = α_i + λ_t + Σ_k β_k * treat * I(relative_time=k) + ε_it
# 企业固定效应(通过去均值或虚拟变量)
firm_dummies = pd.get_dummies(df_plot['firm_id'], prefix='firm')
# 时间固定效应
period_dummies = pd.get_dummies(df_plot['period'], prefix='period')
# 交互项:处理组 × 相对时间
# 排除政策当期(relative_time=0)作为基准
interaction_terms = []
for col in time_dummies.columns:
time_val = int(col.replace('t', ''))
if time_val != 0: # 排除当期
interaction = df_plot['treated'].values * time_dummies[col].values
interaction_terms.append(interaction)
    # III. 构建设计矩阵
    # 因变量
    y = df_plot[outcome].values
    # 自变量:企业FE + 时间FE + 交互项
    # 企业虚拟变量有数千列,用稀疏矩阵拼接,避免稠密矩阵撑爆内存
    from scipy import sparse
    X = sparse.hstack([
        sparse.csr_matrix(firm_dummies.values.astype(float)),
        sparse.csr_matrix(period_dummies.values.astype(float)),
        sparse.csr_matrix(np.column_stack(interaction_terms).astype(float))
    ]).tocsr()
    # 为避免虚拟变量陷阱,这里依赖Ridge的正则化处理完全共线
    # IV. 估计(使用Ridge处理高维)
    from sklearn.linear_model import RidgeCV
    model = RidgeCV(alphas=np.logspace(-3, 3, 50), cv=5)
    model.fit(X, y)
    # V. 提取动态系数
    # 注意:列数必须按样本窗口(df_plot)计数,而非完整面板
    n_firms = firm_dummies.shape[1]
    n_periods = period_dummies.shape[1]
    # 交互项系数位于设计矩阵末尾
    coef_start = n_firms + n_periods
    coefs = model.coef_[coef_start:]
# 构建结果DataFrame
time_values = [int(col.replace('t', '')) for col in time_dummies.columns
if int(col.replace('t', '')) != 0]
results = pd.DataFrame({
'relative_time': time_values,
'coefficient': coefs,
'period_type': ['post' if t >= 0 else 'pre' for t in time_values]
})
# 计算标准误(简化版,实际应使用聚类稳健标准误)
y_pred = model.predict(X)
residuals = y - y_pred
mse = np.mean(residuals**2)
# 简化的标准误(假设独立)
# 实际应使用linearmodels库进行聚类
ses = np.sqrt(mse / len(y)) * np.ones(len(coefs))
results['se'] = ses
results['ci_lower'] = results['coefficient'] - 1.96 * results['se']
results['ci_upper'] = results['coefficient'] + 1.96 * results['se']
# VI. 政策前系数联合检验
pre_results = results[results['period_type'] == 'pre']
    # 联合检验(近似):各期t统计量平方和 ~ χ²(自由度 = 政策前期数)
f_stat = np.sum((pre_results['coefficient'] / pre_results['se'])**2)
p_value_f = 1 - stats.chi2.cdf(f_stat, len(pre_results))
print("\n" + "="*60)
print("平行趋势检验:政策前系数联合检验")
print("="*60)
print(f"F统计量: {f_stat:.3f}")
print(f"p值: {p_value_f:.3f}")
print(f"结论: {'通过' if p_value_f > 0.05 else '失败'}平行趋势检验")
# VII. 可视化事件研究
fig, ax = plt.subplots(figsize=(14, 8))
# 政策前(预期系数为0)
pre_data = results[results['period_type'] == 'pre']
ax.errorbar(pre_data['relative_time'], pre_data['coefficient'],
yerr=1.96*pre_data['se'], fmt='o-', color='blue',
capsize=5, capthick=2, markersize=8,
label='政策前(平行趋势检验)')
# 政策后
post_data = results[results['period_type'] == 'post']
ax.errorbar(post_data['relative_time'], post_data['coefficient'],
yerr=1.96*post_data['se'], fmt='s-', color='red',
capsize=5, capthick=2, markersize=8,
label='政策后(动态效应)')
# 参考线
ax.axhline(y=0, color='black', linestyle='--', alpha=0.5)
ax.axvline(x=-0.5, color='gray', linestyle=':', alpha=0.7,
label='政策实施期')
ax.axvline(x=0.5, color='gray', linestyle=':', alpha=0.7)
ax.set_xlabel('相对政策实施时间(期)', fontsize=13)
ax.set_ylabel('处理效应系数', fontsize=13)
ax.set_title('事件研究法:平行趋势与动态效应', fontsize=16, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
# 添加显著性标记
for _, row in results.iterrows():
if row['period_type'] == 'post':
if abs(row['coefficient'] / row['se']) > 1.96:
ax.text(row['relative_time'], row['ci_upper'] + 0.2,
'**', ha='center', fontsize=12, color='red')
plt.tight_layout()
plt.savefig('did_event_study.png', dpi=300, bbox_inches='tight')
plt.show()
return results
# 执行平行趋势检验
event_results = parallel_trend_test(df_did, outcome='employment',
treatment_period=treat_period)
事件研究图解读:
- 政策前:系数应接近0且置信区间包含0,表明处理组和对照组趋势平行
- 政策后:系数显著为负,表明最低工资政策降低了就业
- 动态模式:若政策效果逐期变化,可检验滞后效应和调整过程
III. 基准DID估计
def did_baseline_estimate(df, outcome='employment',
cluster_var='firm_id'):
"""
基准DID估计
模型:Y_it = β0 + β1*treat_i + β2*post_t + δ*(treat_i*post_t) + ε_it
重要:必须使用聚类稳健标准误(企业层面聚类)
这里使用linearmodels库实现
"""
from linearmodels.panel import PanelOLS
# 设置索引,创建面板数据结构
df_panel = df.set_index(['firm_id', 'period'])
# 添加常数项
df_panel['const'] = 1
# 变量准备
y = df_panel[outcome]
X = df_panel[['const', 'treated', 'post', 'treat_post']]
# 执行PanelOLS
model = PanelOLS(y, X, entity_effects=False, time_effects=False)
results = model.fit(cov_type='clustered', cluster_entity=True)
# 提取关键结果
effect = results.params['treat_post']
se = results.std_errors['treat_post']
ci = results.conf_int().loc['treat_post'].values
r_squared = results.rsquared
print("\n" + "="*60)
print("基准DID估计结果")
print("="*60)
print(results.summary)
# 可视化系数
fig, ax = plt.subplots(figsize=(12, 6))
coef_names = ['treated', 'post', 'treat_post']
coef_values = [results.params[name] for name in coef_names]
se_values = [results.std_errors[name] for name in coef_names]
# 创建柱状图
bars = ax.bar(coef_names, coef_values,
color=['skyblue', 'lightgreen', 'salmon'],
alpha=0.8, edgecolor='black')
# 添加置信区间
for i, (coef, se) in enumerate(zip(coef_values, se_values)):
ax.errorbar(i, coef, yerr=1.96*se, fmt='none', color='black',
capsize=5, capthick=2)
ax.axhline(y=0, color='black', linestyle='--', alpha=0.5)
ax.set_ylabel('系数估计', fontsize=13)
ax.set_title('DID模型系数与置信区间', fontsize=16, fontweight='bold')
# 添加显著性标记
for i, (coef, se) in enumerate(zip(coef_values, se_values)):
t_stat = coef / se
if abs(t_stat) > 2.58:
ax.text(i, coef + 2*se + 0.5, '***', ha='center', fontsize=14)
elif abs(t_stat) > 1.96:
ax.text(i, coef + 2*se + 0.5, '**', ha='center', fontsize=14)
elif abs(t_stat) > 1.65:
ax.text(i, coef + 2*se + 0.5, '*', ha='center', fontsize=14)
plt.tight_layout()
plt.savefig('did_coefficients.png', dpi=300, bbox_inches='tight')
plt.show()
return {
'effect': effect,
'se': se,
'ci': ci,
'r_squared': r_squared,
'results': results
}
# 执行基准DID
did_results = did_baseline_estimate(df_did)
print(f"\n核心结果:最低工资政策的就业效应")
print(f"估计效应: {did_results['effect']:.3f} 人")
print(f"标准误: {did_results['se']:.3f}")
print(f"95%CI: [{did_results['ci'][0]:.3f}, {did_results['ci'][1]:.3f}]")
DID结果解读:
- treat_post系数:核心关注的处理效应,负值表明政策降低就业
- treated系数:处理组与对照组的固有差异(水平差异)
- post系数:政策实施后所有企业的共同时间效应
- 聚类稳健标准误:消除企业内序列相关带来的标准误低估
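作为对聚类标准误的直观核对,下面给出一个企业层面block bootstrap的最小草图:整企业重抽样后,用2×2均值差分重算效应,其经验标准差即聚类标准误的一个近似(n_boot等参数为本文假设,正式结果仍以linearmodels的聚类协方差为准):
def did_cluster_bootstrap(df, outcome='employment', n_boot=100, seed=0):
    """企业层面block bootstrap:整企业重抽样,重算2x2均值差分"""
    rng = np.random.default_rng(seed)
    firms = df['firm_id'].unique()
    grouped = {f: g for f, g in df.groupby('firm_id')}  # 预分组,加速重抽样
    effects = []
    for _ in range(n_boot):
        sample_firms = rng.choice(firms, size=len(firms), replace=True)
        boot = pd.concat([grouped[f] for f in sample_firms])
        m = boot.groupby(['treated', 'post'])[outcome].mean()
        effects.append((m.loc[(1, 1)] - m.loc[(1, 0)]) -
                       (m.loc[(0, 1)] - m.loc[(0, 0)]))
    return np.std(effects, ddof=1)

print(f"聚类bootstrap标准误(近似): {did_cluster_bootstrap(df_did):.3f}")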
IV. 高级DID:交错DID与Bacon分解
def staggered_did_analysis(df, outcome='employment'):
"""
处理交错DID问题(不同企业政策时间不同)
使用Bacon分解检验处理效应异质性
"""
# 这里简化:假设所有处理组在同一时间接受政策
print("标准DID分析完成。对于交错DID,建议使用...")
# 提供理论框架
return """
交错DID的Bacon分解框架:
Callaway & Sant'Anna (2021) 方法:
1. 对每个处理时间点分别估计ATT
2. 权重平均得到整体ATT
3. 检验处理时间异质性
关键假设:无携带效应(No Carryover Effects)
即早期处理不影响后期处理组的潜在结果
    Python实现:
    - 调用R的did包(Callaway & Sant'Anna 官方实现,可经rpy2)
    - 手动实现:分组回归+加权平均(见下方草图)
"""
# 显示理论
print(staggered_did_analysis())
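下面给出“分组回归+加权平均”思路的手动草图。注意:first_treat 列、以从未处理组为对照、以 g-1 期为基期等设定均为本文的简化假设;正式研究请使用 Callaway & Sant'Anna 的官方实现:
def group_time_att(df, outcome='employment'):
    """按处理时点g估计ATT(g,t):组内相对基期(g-1)的变化减去从未处理组的同期变化"""
    never = df[df['first_treat'] == 0]
    atts = []
    for g in sorted(df.loc[df['first_treat'] > 0, 'first_treat'].unique()):
        cohort = df[df['first_treat'] == g]
        base_t = g - 1  # 基期:处理前最后一期
        for t in sorted(df['period'].unique()):
            if t < g:
                continue
            d_treat = (cohort.loc[cohort['period'] == t, outcome].mean()
                       - cohort.loc[cohort['period'] == base_t, outcome].mean())
            d_ctrl = (never.loc[never['period'] == t, outcome].mean()
                      - never.loc[never['period'] == base_t, outcome].mean())
            atts.append({'group': g, 'period': t, 'att': d_treat - d_ctrl,
                         'n': cohort['firm_id'].nunique()})
    res = pd.DataFrame(atts)
    overall = np.average(res['att'], weights=res['n'])  # 组规模加权
    return res, overall

# 本文模拟数据只有单一处理时点,构造 first_treat 列后即可调用
df_did['first_treat'] = np.where(df_did['treated'] == 1, treat_period, 0)
gt_res, overall_att = group_time_att(df_did)
print(f"整体ATT(组规模加权): {overall_att:.3f}")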
五、工具变量法(IV):寻找外生变异
5.1 方法原理:当处理变量内生时
当处理变量与误差项相关时(内生性问题),工具变量提供了解决方案。有效的工具必须满足:
- I. 相关性:$\mathrm{Cov}(Z_i, D_i) \neq 0$(第一阶段显著)
- II. 排他性约束:$\mathrm{Cov}(Z_i, \varepsilon_i) = 0$(工具仅通过 $D$ 影响 $Y$)
- III. 单调性:工具对处理的影响方向对所有个体一致(无“违抗者”)
两阶段最小二乘(2SLS):
第一阶段:$D_i = \pi_0 + \pi_1 Z_i + \nu_i$
第二阶段:$Y_i = \beta_0 + \beta_1 \hat{D}_i + \varepsilon_i$
IV估计量(单一工具的Wald形式):
$$\hat{\beta}_{IV} = \frac{\mathrm{Cov}(Z_i, Y_i)}{\mathrm{Cov}(Z_i, D_i)}$$
5.2 代码实战:教育回报率估计
场景:使用义务教育法作为教育的工具,估计教育对收入的因果影响
# I. 数据生成:教育内生性问题
def generate_iv_data(n=10000):
"""
生成IV模拟数据
结构:
- 教育年限 Ed: 受能力(ability)和家庭背景(fam_bg)影响
- 工具变量 Z: 义务教育法实施强度
- 结果变量 Y: 收入,受能力直接影响(内生性来源)
"""
np.random.seed(42)
# I. 外生变量
ability = np.random.normal(0, 1, n) # 不可观测的能力
fam_bg = np.random.normal(12, 3, n) # 家庭背景(父母教育年限)
region = np.random.binomial(1, 0.5, n) # 地区(0=农村/执法弱,1=城市/执法强)
# II. 工具变量:义务教育法强度
# 在城市地区,法律强制要求至少9年教育
# 在农村地区,执行力度较弱
law_strength = 0.5 + 0.5 * region + np.random.normal(0, 0.2, n)
law_strength = np.clip(law_strength, 0, 1)
# Z是离散的(0,1),模拟是否处于强执法区域
Z = (law_strength > 0.7).astype(int)
# III. 内生处理变量:教育年限
# 教育受能力、家庭背景和工具变量共同影响
# 内生性:能力与收入直接相关
education = 9 + 0.5 * ability + 0.8 * fam_bg + 2.5 * Z + \
np.random.normal(0, 1.5, n)
education = np.clip(education, 6, 16) # 限制范围
# IV. 结果变量:年收入(万)
# 真实因果效应:每多一年教育增加1.5万收入
# 但能力也直接影响收入(遗漏变量偏误)
income = 5 + 1.5 * education + 2 * ability + np.random.normal(0, 2, n)
# V. 观测数据(无法观测ability)
df_iv = pd.DataFrame({
'education': education,
'income': income,
'Z': Z,
'region': region,
'fam_bg': fam_bg,
'ability': ability # 仅用于验证
})
return df_iv, ability
# 生成数据
df_iv, true_ability = generate_iv_data()
print("IV数据生成完成!")
print(df_iv[['education', 'income', 'Z', 'region']].describe().round(2))
# II. 展示内生性:OLS估计偏误
from sklearn.linear_model import LinearRegression
# 错误做法:OLS忽略能力变量
ols_model = LinearRegression()
ols_model.fit(df_iv[['education']], df_iv['income'])
ols_coef = ols_model.coef_[0]
print(f"\n比较真实因果效应与OLS估计")
print(f"真实因果效应: 1.5")
print(f"OLS估计效应: {ols_coef:.3f}")
print(f"偏误大小: {ols_coef - 1.5:.3f} (向上偏误)")
print("原因:遗漏了能力变量,能力与教育正相关")
内生性来源说明:
- 遗漏变量偏误:能力同时影响教育和收入
- 测量误差:教育年限报告误差导致衰减偏误
- 双向因果:高收入者更可能继续教育
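在实现完整2SLS之前,可先用上一节的Wald形式快速核对效应的方向与量级(最小草图,直接计算两个样本协方差之比):
# Wald估计量:单一工具时 IV = Cov(Z,Y) / Cov(Z,D)
cov_zy = np.cov(df_iv['Z'], df_iv['income'])[0, 1]
cov_zd = np.cov(df_iv['Z'], df_iv['education'])[0, 1]
print(f"Wald估计: {cov_zy / cov_zd:.3f}(真实值1.5,OLS约{ols_coef:.3f})")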
III. 两阶段最小二乘(2SLS)实现
def two_stage_least_squares(df, outcome='income', treatment='education',
instrument='Z', covariates=['fam_bg', 'region'],
robust=True):
"""
手动实现2SLS
"""
from sklearn.linear_model import LinearRegression
n = len(df)
# I. 第一阶段:教育 = f(工具变量, 协变量)
print("\n" + "="*60)
print("第一阶段回归:教育对工具变量")
print("="*60)
X_first = df[[instrument] + covariates].values
y_first = df[treatment].values
# 添加常数项
X_first_with_const = np.column_stack([np.ones(n), X_first])
model_first = LinearRegression()
model_first.fit(X_first_with_const, y_first)
# 预测值(拟合的教育)
education_hat = model_first.predict(X_first_with_const)
# 第一阶段统计量
# F统计量(检验工具相关性)
y_first_mean = y_first.mean()
ss_total = np.sum((y_first - y_first_mean)**2)
ss_residual = np.sum((y_first - education_hat)**2)
ss_explained = ss_total - ss_residual
    # 简化的总体回归F统计量(严格的弱工具检验应使用仅针对工具的偏F,
    # 并配合异方差稳健标准误)
    k_reg = 1 + len(covariates)  # 回归元个数:工具 + 协变量
    f_stat = (ss_explained / k_reg) / (ss_residual / (n - k_reg - 1))
    print(f"工具变量系数: {model_first.coef_[1]:.3f}")
    print(f"F统计量: {f_stat:.3f}")
    print(f"F检验p值: {1 - stats.f.cdf(f_stat, k_reg, n - k_reg - 1):.4f}")
print(f"R²: {1 - ss_residual/ss_total:.3f}")
# 检查弱工具(经验规则:F > 10)
if f_stat > 10:
print("✓ 通过弱工具检验(F > 10)")
else:
print("✗ 弱工具警告!F统计量过低")
# II. 第二阶段:收入 = f(拟合的教育, 协变量)
print("\n" + "="*60)
print("第二阶段回归:收入对教育(2SLS)")
print("="*60)
X_second = df[covariates].values
X_second_with_const = np.column_stack([np.ones(n), education_hat, X_second])
y_second = df[outcome].values
model_second = LinearRegression()
model_second.fit(X_second_with_const, y_second)
# 2SLS系数
iv_coef = model_second.coef_[1]
print(f"2SLS估计效应: {iv_coef:.3f}")
# III. 标准误计算(简化版)
# 实际应使用GMM或聚类稳健
residuals = y_second - model_second.predict(X_second_with_const)
mse = np.mean(residuals**2)
# 教育系数的方差(简化)
# 使用X'X矩阵的逆的对角线元素
xtx_inv = np.linalg.inv(X_second_with_const.T @ X_second_with_const)
var_coef = mse * xtx_inv[1, 1] # 教育系数的方差
iv_se = np.sqrt(var_coef)
ci = [iv_coef - 1.96 * iv_se, iv_coef + 1.96 * iv_se]
print(f"标准误: {iv_se:.3f}")
print(f"95%CI: [{ci[0]:.3f}, {ci[1]:.3f}]")
# IV. 对比OLS
ols_model = LinearRegression()
ols_model.fit(df[covariates + [treatment]], df[outcome])
ols_coef = ols_model.coef_[-1] # 最后一个系数是教育
return {
'iv_coef': iv_coef,
'iv_se': iv_se,
'iv_ci': ci,
'ols_coef': ols_coef,
'f_stat': f_stat,
'first_stage_model': model_first,
'second_stage_model': model_second
}
# 执行2SLS估计
iv_results = two_stage_least_squares(df_iv)
print("\n" + "="*60)
print("效应估计对比")
print("="*60)
print(f"真实因果效应: 1.500")
print(f"OLS估计: {iv_results['ols_coef']:.3f} (向上偏误)")
print(f"2SLS估计: {iv_results['iv_coef']:.3f} (接近真值)")
print(f"偏误减少: {abs(iv_results['ols_coef'] - 1.5) - abs(iv_results['iv_coef'] - 1.5):.3f}")
2SLS结果解读:
- 第一阶段F统计量:必须大于10(经验法则),否则为弱工具
- IV估计量:应接近真实值1.5,尽管标准误通常大于OLS
- 偏误校正:IV消除了能力带来的向上偏误
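手动2SLS的标准误只是简化版;可以用linearmodels(前文DID部分已使用该库)的IV2SLS交叉验证点估计,并获得稳健标准误。一个最小草图(该库接口形如 IV2SLS(因变量, 外生变量, 内生变量, 工具变量)):
from linearmodels.iv import IV2SLS
import statsmodels.api as sm  # 仅用于添加常数项

exog = sm.add_constant(df_iv[['fam_bg', 'region']])
iv_fit = IV2SLS(df_iv['income'], exog, df_iv['education'],
                df_iv['Z']).fit(cov_type='robust')
print(f"IV2SLS估计: {iv_fit.params['education']:.3f} "
      f"(稳健SE: {iv_fit.std_errors['education']:.3f})")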
IV. 过度识别检验(Sargan检验)
def sargan_overid_test(df, instruments=['Z', 'region_rf'],
outcome='income', treatment='education',
covariates=['fam_bg']):
"""
Sargan过度识别检验(多个工具变量时)
H0: 所有工具变量满足排他性约束
"""
n = len(df)
k = len(instruments) # 工具变量数量
m = 1 # 内生变量数量
if k <= m:
print("工具变量数 ≤ 内生变量数,无法进行过度识别检验")
return None
print("\n" + "="*60)
print("Sargan过度识别检验")
print("="*60)
# 2SLS估计
# 第一阶段(多变量)
X_first = df[instruments + covariates].values
y_first = df[treatment].values
X_first_const = np.column_stack([np.ones(n), X_first])
model_first = LinearRegression()
model_first.fit(X_first_const, y_first)
education_hat = model_first.predict(X_first_const)
# 第二阶段
X_second = df[covariates].values
X_second_const = np.column_stack([np.ones(n), education_hat, X_second])
y_second = df[outcome].values
model_second = LinearRegression()
model_second.fit(X_second_const, y_second)
# 第二阶段残差
residuals_2sls = y_second - model_second.predict(X_second_const)
# 用残差对所有外生变量回归(包括工具)
X_all = df[instruments + covariates].values
X_all_const = np.column_stack([np.ones(n), X_all])
model_resid = LinearRegression()
model_resid.fit(X_all_const, residuals_2sls)
# R²
ss_resid = np.sum((residuals_2sls - model_resid.predict(X_all_const))**2)
ss_total = np.sum((residuals_2sls - residuals_2sls.mean())**2)
r_squared = 1 - ss_resid/ss_total
# Sargan统计量
# 由于计算复杂,简化版
sargan_stat = n * r_squared
df_overid = k - m
p_value = 1 - stats.chi2.cdf(sargan_stat, df_overid)
print(f"Sargan统计量: {sargan_stat:.3f}")
print(f"自由度: {df_overid}")
print(f"p值: {p_value:.4f}")
print(f"结论: {'通过检验' if p_value > 0.05 else '拒绝H0,存在无效工具'}")
return {
'sargan_stat': sargan_stat,
'df': df_overid,
'p_value': p_value
}
# 创建第二个工具变量:地区义务教育执法强度
df_iv['region_rf'] = (df_iv['region'] * df_iv['Z'] +
np.random.binomial(1, 0.3, len(df_iv)))
# 执行过度识别检验
sargan_result = sargan_overid_test(df_iv,
instruments=['Z', 'region_rf'],
covariates=['fam_bg'])
六、合成控制法(SCM):单一处理单元的反事实
6.1 方法原理:数据驱动的权重合成
当只有一个处理单元(如一个州实施政策,其他州为对照),且处理单元的特征独特时,传统DID失效。SCM通过对照单元的加权平均构建一个"合成对照",使其在干预前完美匹配处理单元的特征。
优化问题:
$$W^* = \arg\min_{W}\;(X_1 - X_0 W)^{\top} V\,(X_1 - X_0 W)$$
约束条件:
- $w_j \geq 0$(非负权重)
- $\sum_j w_j = 1$(权重和为1)
- $X_0 W \approx X_1$(协变量匹配,$X_1$、$X_0$ 分别为处理单元与对照单元的干预前特征)
6.2 代码实战:加州烟草控制法案效果评估
场景:1988年加州通过Proposition 99(烟草控制法案),评估其对人均烟草消费的影响
# I. 数据生成:38州1970-2000年烟草消费面板
def generate_scm_data():
"""
生成SCM模拟数据
模仿Abadie et al. (2010) 加州烟草控制案例
"""
np.random.seed(42)
# 38个州,31年(1970-2000)
n_states = 38
n_years = 31
years = np.arange(1970, 2001)
# 加州为处理单元(第0个)
treated_state = 0
# 干预年份:1988(Proposition 99)
intervention_year = 1988
# 各州特征:烟草价格敏感度、收入水平、人口构成等
state_chars = np.random.normal(0, 1, (n_states, 5))
# 加州特征:更高的价格敏感度
state_chars[treated_state, 0] = 1.5 # price_sensitivity
# 生成时间序列
data_list = []
for state in range(n_states):
# 州特定的趋势
state_trend = np.random.normal(0, 0.5) + 0.1 * state_chars[state, 1]
# 共同时间效应(反烟运动趋势)
common_trend = -0.5 * (years - 1970) + 0.02 * (years - 1970)**2
for idx, year in enumerate(years):
# 基线消费(随时间递减)
base_consumption = 120 + state_trend * (year - 1970) + common_trend[idx] + \
np.random.normal(0, 3)
# 处理效应:加州在1988年后
treatment_effect = 0
if state == treated_state and year >= intervention_year:
# 法案使消费下降15包/人,但效果逐年递减
years_since = year - intervention_year
treatment_effect = -15 * 0.9**years_since
# 观测消费
consumption = base_consumption + treatment_effect + \
np.random.normal(0, 2)
# 协变量(预测变量)
# 经济特征:收入、教育、年龄结构
economy = 50 + 0.5 * (year - 1970) + state_chars[state, 2] * 10
data_list.append({
'state': state,
'year': year,
'consumption': consumption,
'treated': 1 if state == treated_state else 0,
'post': 1 if year >= intervention_year else 0,
'state_id': f'State_{state}',
'treatment_effect': treatment_effect,
'economy': economy,
'age_structure': state_chars[state, 3],
'education': state_chars[state, 4]
})
df_scm = pd.DataFrame(data_list)
# 标记干预前后
df_scm['period'] = df_scm['year'].apply(
lambda x: 'pre' if x < intervention_year else 'post'
)
return df_scm, treated_state, intervention_year
# 生成数据
df_scm, ca_state, intervene_year = generate_scm_data()
print("SCM数据生成完成!")
print(f"处理州: {ca_state}")
print(f"干预年份: {intervene_year}")
print(df_scm[['state', 'year', 'consumption', 'treatment_effect']].head(10))
数据生成逻辑:
- 加州特定效应:更高的价格敏感度,使法案效果更显著
- 衰减效应:模拟政策效果随时间减弱
- 协变量:包含经济、人口结构特征,用于权重优化
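在构建合成控制之前,可先目检干预前的走势(简单草图:加州对比对照州的简单平均;正式对比应使用下文优化权重得到的合成序列):
# 干预前走势目检:加州 vs 对照州简单平均
ca_series = df_scm[df_scm['state'] == ca_state].groupby('year')['consumption'].mean()
ctrl_avg = df_scm[df_scm['state'] != ca_state].groupby('year')['consumption'].mean()
plt.figure(figsize=(10, 5))
plt.plot(ca_series.index, ca_series.values, 'o-', label='加州')
plt.plot(ctrl_avg.index, ctrl_avg.values, 's--', label='对照州简单平均')
plt.axvline(x=intervene_year, color='black', linestyle=':', label='Proposition 99')
plt.xlabel('年份'); plt.ylabel('人均烟草消费(包/年)')
plt.legend(); plt.grid(alpha=0.3)
plt.show()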
II. 合成控制构建
from scipy.optimize import minimize
import warnings
def synthetic_control(df, treated_state=0, outcome='consumption',
intervention_year=1988, predictors=['economy',
'age_structure', 'education']):
"""
实现合成控制法
步骤:
1. 准备干预前数据
2. 定义损失函数(拟合优度)
3. 优化权重
4. 构建合成控制序列
"""
# I. 数据准备
states = df['state'].unique()
control_states = [s for s in states if s != treated_state]
# 干预前数据
pre_treatment = df[df['year'] < intervention_year].copy()
# II. 定义损失函数
def loss_function(weights):
"""
权重w的损失函数
目标:最小化合成州与加州在干预前的差异
"""
# 约束:权重和为1,非负
if np.abs(np.sum(weights) - 1) > 1e-6 or np.any(weights < 0):
return 1e10
# 结果变量拟合:干预前各期
treated_outcome = pre_treatment[
pre_treatment['state'] == treated_state
][outcome].values
synthetic_outcome = np.zeros(len(treated_outcome))
for i, state in enumerate(control_states):
state_outcome = pre_treatment[
pre_treatment['state'] == state
][outcome].values
synthetic_outcome += weights[i] * state_outcome
# 均方误差
mse = np.mean((treated_outcome - synthetic_outcome)**2)
# 添加协变量匹配惩罚项
# 计算各州协变量均值(干预前)
treated_predictors = pre_treatment[
pre_treatment['state'] == treated_state
][predictors].mean().values
synthetic_predictors = np.zeros(len(predictors))
for i, state in enumerate(control_states):
state_predictors = pre_treatment[
pre_treatment['state'] == state
][predictors].mean().values
synthetic_predictors += weights[i] * state_predictors
predictor_mse = np.mean((treated_predictors - synthetic_predictors)**2)
# 总损失(可调权重)
total_loss = mse + 100 * predictor_mse
return total_loss
# III. 优化权重
# 初始权重:均匀分布
n_controls = len(control_states)
w0 = np.ones(n_controls) / n_controls
# 约束:权重和为1,非负
constraints = [
{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
]
bounds = [(0, 1) for _ in range(n_controls)]
# 执行优化
with warnings.catch_warnings():
warnings.simplefilter("ignore")
result = minimize(
fun=loss_function,
x0=w0,
method='SLSQP',
bounds=bounds,
constraints=constraints,
options={'maxiter': 1000, 'ftol': 1e-10}
)
    if result.success:
        weights = result.x
        print("\n✓ 权重优化成功!")
    else:
        print("✗ 权重优化失败,退回均匀权重")
        weights = w0
    # 展示权重分布(无论优化是否成功,weights_df都需定义,因为后面要返回)
    weights_df = pd.DataFrame({
        'state': [f'State_{s}' for s in control_states],
        'weight': weights
    }).sort_values('weight', ascending=False)
    print("\n合成控制权重分配(前5名):")
    print(weights_df.head().to_string(index=False))
    # 展示零权重数量
    zero_weights = np.sum(weights < 0.01)
    print(f"\n零权重州数量: {zero_weights}/{n_controls}")
    # IV. 构建合成控制序列
    # 注意:此处必须使用函数参数treated_state/intervention_year,
    # 而非全局变量,否则下文的安慰剂循环在更换处理州时会出错
    # 干预前
    treated_pre = df[
        (df['state'] == treated_state) &
        (df['year'] < intervention_year)
    ].set_index('year')[outcome]
    synthetic_pre = None
    for i, state in enumerate(control_states):
        state_pre = df[
            (df['state'] == state) &
            (df['year'] < intervention_year)
        ].set_index('year')[outcome]
        synthetic_pre = (weights[i] * state_pre if synthetic_pre is None
                         else synthetic_pre + weights[i] * state_pre)
    # 干预后
    synthetic_post = None
    for i, state in enumerate(control_states):
        state_post = df[
            (df['state'] == state) &
            (df['year'] >= intervention_year)
        ].set_index('year')[outcome]
        synthetic_post = (weights[i] * state_post if synthetic_post is None
                          else synthetic_post + weights[i] * state_post)
# 完整合成序列
synthetic_full = pd.concat([synthetic_pre, synthetic_post])
return {
'weights': weights,
'weights_df': weights_df,
'treated_pre': treated_pre,
'synthetic_pre': synthetic_pre,
'synthetic_post': synthetic_post,
'synthetic_full': synthetic_full
}
# 执行合成控制
scm_results = synthetic_control(df_scm, treated_state=ca_state,
outcome='consumption',
intervention_year=intervene_year)
print("\n合成控制构建完成!")
III. 效应估计与可视化
def scm_effect_estimation(df, scm_results, treated_state=0,
intervention_year=1988, outcome='consumption'):
"""
SCM效应估计与图形展示
"""
# I. 提取数据
full_years = df['year'].unique()
treated_series = df[
df['state'] == treated_state
].set_index('year')[outcome]
synthetic_series = scm_results['synthetic_full']
# II. 计算效应
effect_series = treated_series - synthetic_series
# 干预前均方根误差(拟合优度)
pre_periods = full_years < intervention_year
pre_treated = treated_series[pre_periods]
pre_synthetic = synthetic_series[pre_periods]
rmse_pre = np.sqrt(np.mean((pre_treated - pre_synthetic)**2))
mape_pre = np.mean(np.abs((pre_treated - pre_synthetic) / pre_treated)) * 100
print("\n" + "="*60)
print("SCM拟合优度(干预前)")
print("="*60)
print(f"RMSE: {rmse_pre:.2f} 包/人")
print(f"MAPE: {mape_pre:.1f}%")
if mape_pre < 5:
print("✓ 拟合优度优秀(MAPE < 5%)")
elif mape_pre < 10:
print("✓ 拟合优度良好(MAPE < 10%)")
else:
print("⚠ 拟合优度较差,结果需谨慎")
# III. 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 12))
# 1. 主要图形:实际 vs 合成
ax1 = axes[0]
ax1.plot(full_years, treated_series.values, 'o-', color='blue',
linewidth=2, markersize=6, label='加州实际')
ax1.plot(full_years, synthetic_series.values, 's--', color='red',
linewidth=2, markersize=5, label='合成加州')
# 干预线
ax1.axvline(x=intervention_year, color='black', linestyle=':',
linewidth=2, alpha=0.8)
ax1.text(intervention_year + 0.5, treated_series.max() * 0.9,
'Proposition 99\n实施', fontsize=11,
bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.5))
ax1.set_xlabel('年份', fontsize=12)
ax1.set_ylabel('人均烟草消费(包/年)', fontsize=12)
ax1.set_title('合成控制法:加州烟草控制法案效果', fontsize=16, fontweight='bold')
ax1.legend(fontsize=11)
ax1.grid(True, alpha=0.3)
# 2. 效应图
ax2 = axes[1]
ax2.plot(full_years, effect_series.values, 'o-', color='purple',
linewidth=2, markersize=6, label='因果效应')
ax2.axhline(y=0, color='black', linestyle='--', alpha=0.5)
ax2.axvline(x=intervention_year, color='black', linestyle=':',
linewidth=2, alpha=0.8)
# 计算干预后平均效应
post_effect_mean = effect_series[~pre_periods].mean()
ax2.text(intervention_year + 2, post_effect_mean + 5,
f'平均效应: {post_effect_mean:.1f} 包/人',
fontsize=12, fontweight='bold',
bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.7))
ax2.set_xlabel('年份', fontsize=12)
ax2.set_ylabel('处理效应(加州 - 合成加州)', fontsize=12)
ax2.set_title('估计处理效应', fontsize=16, fontweight='bold')
ax2.legend(fontsize=11)
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('scm_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
# IV. 统计推断:安慰剂检验
print("\n" + "="*60)
print("安慰剂检验:验证SCM显著性")
print("="*60)
    # 对每个对照州进行安慰剂检验
    control_states = [s for s in df['state'].unique() if s != treated_state]
    placebo_effects = {}
    for state in control_states:
        # 假装该州是处理州
        placebo_result = synthetic_control(
            df, treated_state=state, outcome='consumption',
            intervention_year=intervention_year
        )
# 计算该州的干预后效应
placebo_series = df[
df['state'] == state
].set_index('year')[outcome]
placebo_synthetic = placebo_result['synthetic_full']
placebo_effect = (placebo_series - placebo_synthetic)[~pre_periods].mean()
placebo_effects[state] = placebo_effect
# 将加州真实效应与安慰剂分布比较
ca_effect = effect_series[~pre_periods].mean()
placebo_effects['California'] = ca_effect
    # 按效应绝对值排序,查看加州的排名(效应为负,故比较绝对幅度)
    effect_rank = sorted(placebo_effects.items(),
                         key=lambda x: abs(x[1]), reverse=True)
ca_rank = [i for i, (s, e) in enumerate(effect_rank)
if s == 'California'][0] + 1
# p值近似
p_value = ca_rank / len(effect_rank)
print(f"加州效应({ca_effect:.2f})在{len(effect_rank)}个州中的排名: {ca_rank}")
print(f"近似p值: {p_value:.3f}")
if p_value < 0.05:
print("✓ 效应显著(安慰剂检验通过)")
else:
print("⚠ 效应不显著")
# 可视化安慰剂分布
fig, ax = plt.subplots(figsize=(10, 6))
placebo_values = [e for s, e in effect_rank if s != 'California']
ca_value = ca_effect
ax.hist(placebo_values, bins=20, alpha=0.7, color='skyblue',
edgecolor='black', label='安慰剂效应')
ax.axvline(x=ca_value, color='red', linewidth=3,
label=f'加州真实效应 ({ca_value:.1f})')
ax.axvline(x=np.mean(placebo_values), color='orange', linestyle='--',
label=f'安慰剂均值 ({np.mean(placebo_values):.1f})')
ax.set_xlabel('干预后平均处理效应(包/人)', fontsize=12)
ax.set_ylabel('频数', fontsize=12)
ax.set_title('安慰剂检验:加州vs其他州', fontsize=16, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('scm_placebo.png', dpi=300, bbox_inches='tight')
plt.show()
return {
'effect_series': effect_series,
'post_effect_mean': post_effect_mean,
'placebo_effects': placebo_effects,
'p_value': p_value,
'rmse_pre': rmse_pre
}
# 执行效应估计
scm_effect_results = scm_effect_estimation(
df_scm, scm_results, treated_state=ca_state,
intervention_year=intervene_year
)
SCM结果解读:
- 拟合优度:MAPE<5%表示合成加州与真实加州在干预前高度相似
- 效应估计:干预后加州实际值低于合成值,表明政策有效
- 安慰剂检验:加州效应(按绝对幅度)在全部38个州的安慰剂分布中的排名(如第1/38)给出近似p值
- 稳健性:权重集中在少数几个州,符合"稀疏权重"原则
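另一个常用的推断统计量是干预后/干预前RMSPE之比(Abadie et al. 2010):比值越大,说明干预后的偏离相对于干预前的拟合误差越突出。下面用已返回的结果粗算处理州的该比值(示意草图;完整推断需对每个安慰剂州计算同一比值并比较排名):
effect_series = scm_effect_results['effect_series']
rmspe_pre = scm_effect_results['rmse_pre']
post_mask = effect_series.index >= intervene_year
rmspe_post = np.sqrt(np.mean(effect_series[post_mask] ** 2))
print(f"RMSPE比值(干预后/干预前): {rmspe_post / rmspe_pre:.2f}")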