断点回归设计的实施要点与注意事项
【摘要】 I. 断点回归设计的基本原理与理论基础 RDD的核心直觉断点回归设计的基本思想相当直观:当处理分配完全或部分依赖于某个连续变量(称为运行变量或分配变量)是否超过某个特定阈值时,我们可以比较阈值两侧个体的结果差异,从而估计处理的因果效应。 实例分析:奖学金对学业表现的影响考虑一个经典的教育经济学问题:奖学金是否真的提高了学生的学业表现?假设某大学规定,高考分数达到600分的学生自动获得奖学金...
I. 断点回归设计的基本原理与理论基础
RDD的核心直觉
断点回归设计的基本思想相当直观:当处理分配完全或部分依赖于某个连续变量(称为运行变量或分配变量)是否超过某个特定阈值时,我们可以比较阈值两侧个体的结果差异,从而估计处理的因果效应。
实例分析:奖学金对学业表现的影响
考虑一个经典的教育经济学问题:奖学金是否真的提高了学生的学业表现?假设某大学规定,高考分数达到600分的学生自动获得奖学金,而低于600分的学生则没有。在这种情况下:
- 运行变量:高考分数
- 阈值:600分
- 处理:获得奖学金
- 结果变量:大学GPA
通过比较高考分数在600分附近(如595-605分)学生的大学GPA,我们可以估计奖学金对学业表现的因果效应。
数学模型框架
在精确断点回归设计中,处理分配规则可以表示为:
$$D_i = \mathbf{1}\{X_i \geq c\}$$ 其中 $X_i$ 是运行变量,$c$ 是阈值,$D_i$ 是处理指示变量。
我们感兴趣的处理效应可以通过比较阈值两侧的结果变量极限值来估计: $$\tau_{SRD} = \lim_{x \downarrow c} E[Y_i \mid X_i = x] - \lim_{x \uparrow c} E[Y_i \mid X_i = x]$$
RDD与其他因果推断方法的比较
方法 | 识别策略 | 关键假设 | 适用场景 |
---|---|---|---|
断点回归设计 | 阈值附近的局部比较 | 运行变量不被精确操控 | 基于明确规则的处理分配 |
随机对照试验 | 随机化分配 | 随机化成功执行 | 可进行随机化的场景 |
双重差分法 | 处理组和对照组的时间趋势比较 | 平行趋势假设 | 面板数据,多期处理 |
工具变量法 | 外生变量影响处理但不直接影响结果 | 排他性约束 | 存在有效的工具变量 |
匹配方法 | 基于可观测变量的相似个体匹配 | 无不可观测混杂 | 横截面数据 |
II. 清晰断点与模糊断点的区别
清晰断点回归
在清晰断点(Sharp RD)设计中,处理分配完全由运行变量是否超过阈值决定:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
import statsmodels.formula.api as smf
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
import warnings
warnings.filterwarnings('ignore')
class SharpRDAnalysis:
    """Sharp regression-discontinuity (RD) analysis on simulated data.

    Typical workflow::

        rd = SharpRDAnalysis()
        rd.generate_sharp_rd_data(...)
        rd.visualize_sharp_rd(...)
        rd.estimate_sharp_rd_effect(...)
    """

    def __init__(self):
        self.data = None      # DataFrame filled by generate_sharp_rd_data()
        self.results = {}     # estimation output keyed by design name

    def generate_sharp_rd_data(self, n=1000, threshold=0, true_effect=0.5,
                               noise_sd=0.5, nonlinear=False):
        """Simulate data for a sharp RD design.

        Parameters
        ----------
        n : int
            Number of observations.
        threshold : float
            Cutoff of the running variable; treatment is assigned iff
            running_var >= threshold.
        true_effect : float
            True treatment effect added above the cutoff.
        noise_sd : float
            Standard deviation of the Gaussian outcome noise.
        nonlinear : bool
            If True, the baseline outcome is quadratic in the running
            variable.

        Returns
        -------
        pandas.DataFrame
            Columns: running_var, treatment, Y, above_threshold.
        """
        np.random.seed(42)  # fixed seed for reproducibility
        # Running variable, uniform around the cutoff.
        running_var = np.random.uniform(-2, 2, n)
        # Sharp design: crossing the cutoff fully determines treatment.
        treatment = (running_var >= threshold).astype(int)
        if nonlinear:
            baseline = 2 + 0.3 * running_var - 0.1 * running_var**2
        else:
            baseline = 2 + 0.3 * running_var
        # Outcome = baseline + jump at the cutoff + noise.
        Y = baseline + true_effect * treatment + np.random.normal(0, noise_sd, n)
        self.data = pd.DataFrame({
            'running_var': running_var,
            'treatment': treatment,
            'Y': Y,
            'above_threshold': (running_var >= threshold).astype(int)
        })
        return self.data

    def visualize_sharp_rd(self, bandwidth=None):
        """Scatter plot of the full design plus a violin plot of the
        outcome within `bandwidth` of the cutoff.

        Returns the matplotlib Figure; raises ValueError if no data has
        been generated yet.
        """
        if self.data is None:
            raise ValueError("请先生成数据")
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        # Full-sample scatter, colored by treatment status.
        ax1.scatter(self.data['running_var'], self.data['Y'],
                    c=self.data['treatment'], alpha=0.6, cmap='coolwarm')
        ax1.axvline(x=0, color='red', linestyle='--', linewidth=2,
                    label='阈值 (x=0)')
        ax1.set_xlabel('运行变量')
        ax1.set_ylabel('结果变量')
        ax1.set_title('清晰断点回归: 数据散点图')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        # Outcome distribution on each side, restricted to the bandwidth.
        if bandwidth is None:
            bandwidth = 1.0
        subset = self.data[np.abs(self.data['running_var']) <= bandwidth]
        sns.violinplot(x='above_threshold', y='Y', data=subset, ax=ax2)
        ax2.set_xlabel('是否超过阈值')
        ax2.set_ylabel('结果变量')
        ax2.set_title(f'阈值附近{bandwidth}范围内的分布比较')
        ax2.set_xticklabels(['低于阈值', '高于阈值'])
        plt.tight_layout()
        plt.show()
        return fig

    def estimate_sharp_rd_effect(self, bandwidth=1.0, polynomial_order=1,
                                 threshold=0):
        """Estimate the sharp-RD treatment effect by local polynomial OLS.

        Parameters
        ----------
        bandwidth : float
            Half-width of the estimation window around ``threshold``.
        polynomial_order : int
            Order of the polynomial in the centered running variable.
        threshold : float
            Cutoff used for windowing/centering.  New parameter; the
            default of 0 reproduces the previous hard-coded behavior.

        Returns
        -------
        dict with the fitted model, effect, SE, t-stat, p-value, etc.
        """
        if self.data is None:
            raise ValueError("请先生成数据")
        # Keep only observations within the bandwidth of the cutoff.
        subset = self.data[
            np.abs(self.data['running_var'] - threshold) <= bandwidth
        ].copy()
        # FIX: actually center the running variable at the cutoff — the
        # old code copied it unchanged, so "running_centered" was wrong
        # for any nonzero threshold.
        subset['running_centered'] = subset['running_var'] - threshold
        if polynomial_order == 1:
            formula = 'Y ~ treatment + running_centered'
        else:
            poly_terms = [f'I(running_centered**{order})'
                          for order in range(1, polynomial_order + 1)]
            formula = 'Y ~ treatment + ' + ' + '.join(poly_terms)
        model = smf.ols(formula, data=subset).fit()
        treatment_effect = model.params.get('treatment', 0)
        treatment_se = model.bse.get('treatment', 0)
        self.results['sharp_rd'] = {
            'model': model,
            'treatment_effect': treatment_effect,
            'standard_error': treatment_se,
            # Guard against a zero SE (degenerate fit) instead of crashing.
            't_statistic': treatment_effect / treatment_se if treatment_se else np.nan,
            'p_value': model.pvalues.get('treatment', 1),
            'bandwidth': bandwidth,
            'polynomial_order': polynomial_order,
            'n_observations': len(subset)
        }
        return self.results['sharp_rd']
# --- Sharp RD example -------------------------------------------------
# Build the simulated dataset, plot it, then estimate the discontinuity
# with linear and quadratic local polynomials.  `sharp_data` is reused
# by later example sections.
sharp_rd = SharpRDAnalysis()
sharp_data = sharp_rd.generate_sharp_rd_data(n=800, true_effect=0.8, nonlinear=False)
sharp_rd.visualize_sharp_rd(bandwidth=1.0)
# Estimate the treatment effect under two polynomial specifications.
results_linear = sharp_rd.estimate_sharp_rd_effect(bandwidth=1.0, polynomial_order=1)
results_quadratic = sharp_rd.estimate_sharp_rd_effect(bandwidth=1.0, polynomial_order=2)
print("清晰断点回归结果:")
print(f"线性模型估计效应: {results_linear['treatment_effect']:.3f} (p值: {results_linear['p_value']:.3f})")
print(f"二次模型估计效应: {results_quadratic['treatment_effect']:.3f} (p值: {results_quadratic['p_value']:.3f})")
模糊断点回归
在模糊断点(Fuzzy RD)设计中,运行变量超过阈值只是影响处理概率,而非完全决定处理状态:
class FuzzyRDAnalysis:
    """Fuzzy regression-discontinuity analysis.

    Crossing the cutoff shifts the treatment *probability* (imperfect
    compliance); the effect is estimated by a manual two-stage procedure
    with eligibility as the instrument.
    """

    def __init__(self):
        self.data = None      # DataFrame filled by generate_fuzzy_rd_data()
        self.results = {}     # estimation output keyed by design name

    def generate_fuzzy_rd_data(self, n=1000, threshold=0, first_stage_effect=0.6,
                               true_effect=0.5, noise_sd=0.5, compliance_rate=0.7):
        """Simulate fuzzy-RD data.

        Parameters
        ----------
        n, threshold, true_effect, noise_sd : as in the sharp design.
        first_stage_effect : float
            Jump in treatment probability at the cutoff (first stage).
        compliance_rate : float
            NOTE(review): accepted for API compatibility but currently
            unused — compliance is implied by the base probability plus
            the first-stage jump.  TODO: wire it into the assignment
            mechanism or remove it.

        Returns
        -------
        pandas.DataFrame with running_var, eligibility, treatment_prob,
        treatment, Y, above_threshold.
        """
        np.random.seed(42)  # reproducible draws
        running_var = np.random.uniform(-2, 2, n)
        # Eligibility (the instrument) is deterministic at the cutoff.
        eligibility = (running_var >= threshold).astype(int)
        # Imperfect compliance: some eligible units stay untreated and
        # some ineligible units get treated.
        base_treatment_prob = 0.3   # treatment probability below the cutoff
        first_stage_jump = first_stage_effect
        treatment_prob = (base_treatment_prob +
                          first_stage_jump * eligibility +
                          np.random.normal(0, 0.1, n))
        treatment_prob = np.clip(treatment_prob, 0, 1)  # keep valid probabilities
        treatment = np.random.binomial(1, treatment_prob)
        baseline = 2 + 0.3 * running_var
        Y = (baseline + true_effect * treatment +
             np.random.normal(0, noise_sd, n))
        self.data = pd.DataFrame({
            'running_var': running_var,
            'eligibility': eligibility,
            'treatment_prob': treatment_prob,
            'treatment': treatment,
            'Y': Y,
            'above_threshold': (running_var >= threshold).astype(int)
        })
        return self.data

    def visualize_fuzzy_rd(self):
        """Three panels: outcome, treatment probability, and realized
        treatment against the running variable.  Returns the Figure."""
        if self.data is None:
            raise ValueError("请先生成数据")
        fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 6))
        # Outcome scatter, colored by realized treatment.
        ax1.scatter(self.data['running_var'], self.data['Y'],
                    c=self.data['treatment'], alpha=0.6, cmap='coolwarm')
        ax1.axvline(x=0, color='red', linestyle='--', linewidth=2)
        ax1.set_xlabel('运行变量')
        ax1.set_ylabel('结果变量')
        ax1.set_title('模糊断点回归: 结果变量')
        ax1.grid(True, alpha=0.3)
        # Treatment probability: should jump (not 0 -> 1) at the cutoff.
        ax2.scatter(self.data['running_var'], self.data['treatment_prob'],
                    alpha=0.6, color='green')
        ax2.axvline(x=0, color='red', linestyle='--', linewidth=2)
        ax2.set_xlabel('运行变量')
        ax2.set_ylabel('处理概率')
        ax2.set_title('模糊断点回归: 处理概率')
        ax2.grid(True, alpha=0.3)
        # Realized (binary) treatment status.
        ax3.scatter(self.data['running_var'], self.data['treatment'],
                    alpha=0.6, color='purple')
        ax3.axvline(x=0, color='red', linestyle='--', linewidth=2)
        ax3.set_xlabel('运行变量')
        ax3.set_ylabel('实际处理状态')
        ax3.set_title('模糊断点回归: 实际处理')
        ax3.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        return fig

    def estimate_fuzzy_rd_effect(self, bandwidth=1.0, polynomial_order=1,
                                 threshold=0):
        """Two-stage estimate of the fuzzy-RD (LATE) effect.

        Stage 1 regresses treatment on eligibility (the instrument);
        stage 2 regresses the outcome on the stage-1 fitted values.
        NOTE: the reported SEs are not corrected for the generated
        regressor — use a dedicated IV/2SLS routine in real work.

        `threshold` is a new parameter; its default of 0 reproduces the
        previous hard-coded behavior.
        """
        if self.data is None:
            raise ValueError("请先生成数据")
        subset = self.data[
            np.abs(self.data['running_var'] - threshold) <= bandwidth
        ].copy()
        # FIX: center at the cutoff (previously just a plain copy).
        subset['running_centered'] = subset['running_var'] - threshold
        # First stage: eligibility instruments for treatment.
        first_stage_formula = 'treatment ~ eligibility + running_centered'
        if polynomial_order > 1:
            for order in range(2, polynomial_order + 1):
                first_stage_formula += f' + I(running_centered**{order})'
        first_stage = smf.ols(first_stage_formula, data=subset).fit()
        subset['treatment_hat'] = first_stage.fittedvalues
        # Second stage on the fitted treatment probabilities.
        second_stage_formula = 'Y ~ treatment_hat + running_centered'
        if polynomial_order > 1:
            for order in range(2, polynomial_order + 1):
                second_stage_formula += f' + I(running_centered**{order})'
        second_stage = smf.ols(second_stage_formula, data=subset).fit()
        treatment_effect = second_stage.params.get('treatment_hat', 0)
        treatment_se = second_stage.bse.get('treatment_hat', 0)
        # First-stage F statistic: the usual weak-instrument diagnostic.
        first_stage_f = first_stage.fvalue
        self.results['fuzzy_rd'] = {
            'first_stage': first_stage,
            'second_stage': second_stage,
            'treatment_effect': treatment_effect,
            'standard_error': treatment_se,
            # Guard against a zero SE instead of dividing by zero.
            't_statistic': treatment_effect / treatment_se if treatment_se else np.nan,
            'p_value': second_stage.pvalues.get('treatment_hat', 1),
            'first_stage_f': first_stage_f,
            'bandwidth': bandwidth,
            'polynomial_order': polynomial_order
        }
        return self.results['fuzzy_rd']
# --- Fuzzy RD example -------------------------------------------------
# Simulate imperfect compliance, draw the three diagnostic panels, then
# run the two-stage estimate and report instrument strength (F > 10 is
# the conventional rule of thumb for a strong instrument).
fuzzy_rd = FuzzyRDAnalysis()
fuzzy_data = fuzzy_rd.generate_fuzzy_rd_data(n=1000, first_stage_effect=0.4,
                                             true_effect=0.6, compliance_rate=0.7)
fuzzy_rd.visualize_fuzzy_rd()
fuzzy_results = fuzzy_rd.estimate_fuzzy_rd_effect(bandwidth=1.0, polynomial_order=1)
print("模糊断点回归结果:")
print(f"处理效应估计: {fuzzy_results['treatment_effect']:.3f}")
print(f"标准误: {fuzzy_results['standard_error']:.3f}")
print(f"第一阶段F统计量: {fuzzy_results['first_stage_f']:.1f}")
print(f"工具变量强度: {'强' if fuzzy_results['first_stage_f'] > 10 else '弱'}")
清晰vs模糊断点回归比较
(注:原文此处为清晰断点与模糊断点的对比示意图:清晰断点下处理分配规则为 D = 1{X ≥ c},处理完全由阈值决定;模糊断点下越过阈值仅改变处理概率,需以"资格"作为工具变量识别局部平均处理效应。)
III. 带宽选择与多项式阶数确定
带宽选择的重要性
带宽选择是断点回归设计中最关键的决策之一。带宽太小会导致估计方差过大,带宽太大则会引入偏误。
class BandwidthSelection:
    """Bandwidth-selection utilities for RD estimation.

    `data` must contain 'running_var' and 'Y' columns.  Results of each
    method are cached in ``self.bandwidth_results``.
    """

    def __init__(self, data):
        self.data = data
        self.bandwidth_results = {}

    def mse_optimal_bandwidth(self, threshold=0, max_bandwidth=2.0,
                              n_bandwidths=50, polynomial_order=1):
        """Pick the bandwidth minimizing 5-fold cross-validated MSE.

        Grid-searches `n_bandwidths` values in [0.1, max_bandwidth]; a
        bandwidth leaving fewer than 20 in-window observations is scored
        as infinite MSE so it can never win.
        """
        from sklearn.model_selection import cross_val_score
        from sklearn.linear_model import LinearRegression
        bandwidths = np.linspace(0.1, max_bandwidth, n_bandwidths)
        mse_scores = []
        for bw in bandwidths:
            subset = self.data[
                np.abs(self.data['running_var'] - threshold) <= bw
            ].copy()
            if len(subset) < 20:  # too small to cross-validate reliably
                mse_scores.append(np.inf)
                continue
            subset['running_centered'] = subset['running_var'] - threshold
            subset['treatment'] = (subset['running_var'] >= threshold).astype(int)
            X = subset[['treatment', 'running_centered']].copy()
            if polynomial_order > 1:
                # Expand the centered running variable into polynomial terms.
                poly = PolynomialFeatures(degree=polynomial_order, include_bias=False)
                X_poly = poly.fit_transform(X[['running_centered']])
                X = np.column_stack([X['treatment'].values, X_poly])
            else:
                X = X.values
            y = subset['Y'].values
            # Cross-validated out-of-sample MSE for this bandwidth.
            model = LinearRegression()
            scores = cross_val_score(model, X, y, cv=5,
                                     scoring='neg_mean_squared_error')
            mse_scores.append(-np.mean(scores))
        optimal_idx = np.argmin(mse_scores)
        optimal_bandwidth = bandwidths[optimal_idx]
        self.bandwidth_results['mse_optimal'] = {
            'bandwidths': bandwidths,
            'mse_scores': mse_scores,
            'optimal_bandwidth': optimal_bandwidth,
            'optimal_mse': mse_scores[optimal_idx]
        }
        return optimal_bandwidth

    def ik_bandwidth(self, threshold=0):
        """Very rough stand-in for the Imbens-Kalyanaraman bandwidth.

        Only the n^(-1/5) rate of the IK rule is kept; the real rule
        also uses side-specific variances and curvature estimates — use
        the `rdrobust` package for serious applications.  `threshold` is
        accepted for interface symmetry but does not enter the formula.
        (The previous version computed side variances and then ignored
        them; that dead code has been removed.)
        """
        n = len(self.data)
        h_opt = n**(-1/5) * 2.5  # reference constant, not data-driven
        self.bandwidth_results['ik_bandwidth'] = h_opt
        return h_opt

    def sensitivity_analysis_bandwidth(self, threshold=0, polynomial_order=1):
        """Re-estimate the RD effect over a fixed grid of bandwidths.

        Returns (bandwidths, effects, standard_errors); entries are NaN
        when a bandwidth leaves fewer than 20 observations.
        """
        bandwidths = [0.3, 0.5, 0.8, 1.0, 1.2, 1.5]
        effects = []
        standard_errors = []
        for bw in bandwidths:
            subset = self.data[
                np.abs(self.data['running_var'] - threshold) <= bw
            ].copy()
            if len(subset) < 20:
                effects.append(np.nan)
                standard_errors.append(np.nan)
                continue
            subset['running_centered'] = subset['running_var'] - threshold
            subset['treatment'] = (subset['running_var'] >= threshold).astype(int)
            formula = 'Y ~ treatment + running_centered'
            if polynomial_order > 1:
                for order in range(2, polynomial_order + 1):
                    formula += f' + I(running_centered**{order})'
            model = smf.ols(formula, data=subset).fit()
            effects.append(model.params.get('treatment', 0))
            standard_errors.append(model.bse.get('treatment', 0))
        self.bandwidth_results['sensitivity'] = {
            'bandwidths': bandwidths,
            'effects': effects,
            'standard_errors': standard_errors
        }
        return bandwidths, effects, standard_errors

    def visualize_bandwidth_selection(self):
        """Plot the CV-MSE curve and the bandwidth-sensitivity band, then
        print a textual summary of whichever analyses were run."""
        if not self.bandwidth_results:
            raise ValueError("请先进行带宽选择分析")
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        # MSE-optimal bandwidth curve.
        if 'mse_optimal' in self.bandwidth_results:
            mse_result = self.bandwidth_results['mse_optimal']
            ax1.plot(mse_result['bandwidths'], mse_result['mse_scores'],
                     'o-', linewidth=2)
            ax1.axvline(x=mse_result['optimal_bandwidth'], color='red',
                        linestyle='--', label=f'最优带宽: {mse_result["optimal_bandwidth"]:.2f}')
            ax1.set_xlabel('带宽')
            ax1.set_ylabel('交叉验证MSE')
            ax1.set_title('MSE最优带宽选择')
            ax1.legend()
            ax1.grid(True, alpha=0.3)
        # Effect estimates ±1.96 SE across bandwidths.
        if 'sensitivity' in self.bandwidth_results:
            sens_result = self.bandwidth_results['sensitivity']
            effects = np.array(sens_result['effects'])
            ses = np.array(sens_result['standard_errors'])
            bandwidths = sens_result['bandwidths']
            # Drop bandwidths where estimation was skipped (NaN).
            valid_mask = ~np.isnan(effects)
            effects = effects[valid_mask]
            ses = ses[valid_mask]
            bandwidths = np.array(bandwidths)[valid_mask]
            ax2.errorbar(bandwidths, effects, yerr=1.96*ses,
                         fmt='o-', capsize=5, linewidth=2)
            ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
            ax2.set_xlabel('带宽')
            ax2.set_ylabel('处理效应估计')
            ax2.set_title('带宽敏感性分析')
            ax2.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        # Console summary.
        if 'mse_optimal' in self.bandwidth_results:
            mse_result = self.bandwidth_results['mse_optimal']
            print(f"MSE最优带宽: {mse_result['optimal_bandwidth']:.3f}")
        if 'ik_bandwidth' in self.bandwidth_results:
            ik_bw = self.bandwidth_results['ik_bandwidth']
            print(f"IK带宽: {ik_bw:.3f}")
        if 'sensitivity' in self.bandwidth_results:
            sens_result = self.bandwidth_results['sensitivity']
            print("\n带宽敏感性分析:")
            for bw, effect, se in zip(sens_result['bandwidths'],
                                      sens_result['effects'],
                                      sens_result['standard_errors']):
                if not np.isnan(effect):
                    print(f" 带宽 {bw}: 效应={effect:.3f}, 标准误={se:.3f}")
# --- Bandwidth-selection example --------------------------------------
# Reuses `sharp_data` from the sharp-RD example above.
bandwidth_selector = BandwidthSelection(sharp_data)
# Compare a cross-validated MSE search with the (simplified)
# Imbens-Kalyanaraman rule, then check how sensitive the estimated
# effect is to the bandwidth choice.
mse_bw = bandwidth_selector.mse_optimal_bandwidth()
ik_bw = bandwidth_selector.ik_bandwidth()
bw_range, effects, ses = bandwidth_selector.sensitivity_analysis_bandwidth()
bandwidth_selector.visualize_bandwidth_selection()
多项式阶数选择
class PolynomialSelection:
    """Compare local-polynomial specifications for the RD regression.

    Fits the RD model at several polynomial orders within one bandwidth
    and picks a preferred order via information criteria.
    """

    def __init__(self, data):
        self.data = data
        self.poly_results = {}

    def compare_polynomial_orders(self, threshold=0, bandwidth=1.0, max_order=4):
        """Fit orders 1..max_order within the bandwidth window and
        collect effect estimates plus fit statistics for each."""
        window = self.data[
            np.abs(self.data['running_var'] - threshold) <= bandwidth
        ].copy()
        window['running_centered'] = window['running_var'] - threshold
        window['treatment'] = (window['running_var'] >= threshold).astype(int)
        fitted = {}
        for degree in range(1, max_order + 1):
            # Assemble 'Y ~ treatment + running_centered [+ higher powers]'.
            terms = ['treatment', 'running_centered'] + [
                f'I(running_centered**{p})' for p in range(2, degree + 1)
            ]
            est = smf.ols('Y ~ ' + ' + '.join(terms), data=window).fit()
            fitted[degree] = {
                'treatment_effect': est.params.get('treatment', 0),
                'standard_error': est.bse.get('treatment', 0),
                'aic': est.aic,
                'bic': est.bic,
                'rsquared_adj': est.rsquared_adj,
                'n_params': len(est.params)
            }
        self.poly_results = fitted
        return fitted

    def visualize_polynomial_comparison(self):
        """Plot effect estimates and AIC/BIC per order; print and return
        the BIC-preferred polynomial order."""
        if not self.poly_results:
            raise ValueError("请先比较多项式阶数")
        degrees = list(self.poly_results.keys())
        effects = [self.poly_results[d]['treatment_effect'] for d in degrees]
        aic_vals = [self.poly_results[d]['aic'] for d in degrees]
        bic_vals = [self.poly_results[d]['bic'] for d in degrees]
        fig, (left_ax, right_ax) = plt.subplots(1, 2, figsize=(15, 6))
        # Effect estimate per polynomial order, annotated with its value.
        left_ax.bar(degrees, effects, alpha=0.7, color='skyblue')
        left_ax.set_xlabel('多项式阶数')
        left_ax.set_ylabel('处理效应估计')
        left_ax.set_title('不同多项式阶数的处理效应估计')
        left_ax.set_xticks(degrees)
        for deg, eff in zip(degrees, effects):
            left_ax.text(deg, eff, f'{eff:.3f}', ha='center', va='bottom')
        # Information criteria across orders.
        right_ax.plot(degrees, aic_vals, 'o-', label='AIC', linewidth=2)
        right_ax.plot(degrees, bic_vals, 's-', label='BIC', linewidth=2)
        right_ax.set_xlabel('多项式阶数')
        right_ax.set_ylabel('信息准则值')
        right_ax.set_title('模型选择: AIC vs BIC')
        right_ax.legend()
        right_ax.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        # Order minimizing each criterion; BIC favors parsimony.
        best_aic_order = min(self.poly_results,
                             key=lambda d: self.poly_results[d]['aic'])
        best_bic_order = min(self.poly_results,
                             key=lambda d: self.poly_results[d]['bic'])
        print("多项式阶数选择结果:")
        print(f"AIC最优阶数: {best_aic_order}")
        print(f"BIC最优阶数: {best_bic_order}")
        print(f"推荐阶数: {best_bic_order} (BIC更倾向于简约模型)")
        return best_bic_order
# --- Polynomial-order selection example -------------------------------
# Compare orders 1..4 on the sharp-RD data and keep the BIC-preferred one.
poly_selector = PolynomialSelection(sharp_data)
poly_results = poly_selector.compare_polynomial_orders(max_order=4)
best_order = poly_selector.visualize_polynomial_comparison()
带宽与多项式选择策略
IV. 假设检验与有效性检查
连续性假设检验
RDD有效性的核心假设是:除了处理状态外,其他所有变量在阈值处应该是连续的。
class ValidityTests:
    """Falsification / validity checks for an RD design.

    Implements covariate-balance t-tests, a simplified McCrary-style
    density-continuity check, placebo-cutoff tests, and a summary
    visualization.  Results accumulate in ``self.validity_results``.
    """

    def __init__(self, data):
        # `data` must contain a 'running_var' column plus any columns
        # referenced by the individual tests.
        self.data = data
        self.validity_results = {}

    def test_covariate_balance(self, covariates, threshold=0, bandwidth=1.0):
        """Compare covariate means on both sides of the cutoff.

        Uses a two-sample t-test within the bandwidth window; a
        covariate counts as "balanced" when the difference is
        insignificant at the 5% level.  Unknown column names are
        silently skipped.
        """
        from scipy.stats import ttest_ind
        subset = self.data[
            np.abs(self.data['running_var'] - threshold) <= bandwidth
        ].copy()
        balance_results = {}
        for covariate in covariates:
            if covariate not in subset.columns:
                continue  # skip names not present in the data
            left_vals = subset[subset['running_var'] < threshold][covariate]
            right_vals = subset[subset['running_var'] >= threshold][covariate]
            left_mean = left_vals.mean()
            right_mean = right_vals.mean()
            t_stat, p_value = ttest_ind(left_vals, right_vals,
                                        nan_policy='omit')
            balance_results[covariate] = {
                'left_mean': left_mean,
                'right_mean': right_mean,
                'difference': right_mean - left_mean,
                't_statistic': t_stat,
                'p_value': p_value,
                'balanced': p_value > 0.05
            }
        self.validity_results['covariate_balance'] = balance_results
        return balance_results

    def test_density_continuity(self, threshold=0, n_bins=20):
        """Simplified McCrary-style check for density continuity.

        Combines a two-sample KS test of the running variable on the two
        sides with a comparison of the two histogram bins adjacent to
        the cutoff.  Use a dedicated McCrary implementation for real
        applications.
        """
        from scipy.stats import kstest
        left_data = self.data[self.data['running_var'] < threshold]['running_var']
        right_data = self.data[self.data['running_var'] >= threshold]['running_var']
        # Array second argument triggers scipy's two-sample KS variant.
        ks_stat, ks_pvalue = kstest(left_data, right_data)
        # Density of the single bin immediately left/right of the cutoff.
        bin_width = (self.data['running_var'].max() -
                     self.data['running_var'].min()) / n_bins
        threshold_bin_left = self.data[
            (self.data['running_var'] >= threshold - bin_width) &
            (self.data['running_var'] < threshold)  # FIX: was `self_data` (NameError)
        ]
        threshold_bin_right = self.data[
            (self.data['running_var'] >= threshold) &
            (self.data['running_var'] < threshold + bin_width)
        ]
        density_left = len(threshold_bin_left) / len(self.data)
        density_right = len(threshold_bin_right) / len(self.data)
        density_ratio = density_right / density_left if density_left > 0 else np.inf
        density_test = {
            'ks_statistic': ks_stat,
            'ks_pvalue': ks_pvalue,
            'density_left': density_left,
            'density_right': density_right,
            'density_ratio': density_ratio,
            # Heuristic: distributions differ or density jumps ~10%+.
            'manipulation_suspected': ks_pvalue < 0.05 or abs(np.log(density_ratio)) > 0.1
        }
        self.validity_results['density_test'] = density_test
        return density_test

    def placebo_tests(self, outcome_var, fake_thresholds, bandwidth=1.0):
        """Estimate "effects" at fake cutoffs; all should be null.

        Cutoffs with fewer than 20 in-window observations are skipped.
        """
        placebo_results = {}
        for fake_threshold in fake_thresholds:
            subset = self.data[
                np.abs(self.data['running_var'] - fake_threshold) <= bandwidth
            ].copy()
            if len(subset) < 20:
                continue  # too few observations for a meaningful fit
            subset['running_centered'] = subset['running_var'] - fake_threshold
            subset['fake_treatment'] = (subset['running_var'] >= fake_threshold).astype(int)
            model = smf.ols(f'{outcome_var} ~ fake_treatment + running_centered',
                            data=subset).fit()
            placebo_effect = model.params.get('fake_treatment', 0)
            placebo_pvalue = model.pvalues.get('fake_treatment', 1)
            placebo_results[fake_threshold] = {
                'effect': placebo_effect,
                'p_value': placebo_pvalue,
                'significant': placebo_pvalue < 0.05
            }
        self.validity_results['placebo_tests'] = placebo_results
        return placebo_results

    def visualize_validity_checks(self, covariates):
        """Plot a 2x2 summary of the stored checks and print a recap.

        `covariates` is accepted for API compatibility; the plotted
        covariate list comes from the stored balance results.
        """
        if not self.validity_results:
            raise ValueError("请先进行有效性检验")
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        # 1. Covariate-balance p-values (green = balanced).
        if 'covariate_balance' in self.validity_results:
            balance_results = self.validity_results['covariate_balance']
            covariates_list = list(balance_results.keys())
            p_values = [balance_results[cov]['p_value'] for cov in covariates_list]
            colors = ['green' if p > 0.05 else 'red' for p in p_values]
            bars = ax1.bar(covariates_list, p_values, color=colors, alpha=0.7)
            ax1.axhline(y=0.05, color='red', linestyle='--', label='显著性阈值 (0.05)')
            ax1.set_ylabel('p值')
            ax1.set_title('协变量平衡性检验')
            ax1.set_xticklabels(covariates_list, rotation=45)
            ax1.legend()
            for bar, p_val in zip(bars, p_values):
                ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                         f'{p_val:.3f}', ha='center', va='bottom')
        # 2. Running-variable histograms on each side of the cutoff.
        if 'density_test' in self.validity_results:
            density_test = self.validity_results['density_test']
            ax2.hist(self.data[self.data['running_var'] < 0]['running_var'],
                     bins=20, alpha=0.7, label='低于阈值', density=True)
            ax2.hist(self.data[self.data['running_var'] >= 0]['running_var'],
                     bins=20, alpha=0.7, label='高于阈值', density=True)
            ax2.axvline(x=0, color='red', linestyle='--', linewidth=2)
            ax2.set_xlabel('运行变量')
            ax2.set_ylabel('密度')
            ax2.set_title('运行变量分布')
            ax2.legend()
        # 3. Placebo effects (red = spuriously significant).
        if 'placebo_tests' in self.validity_results:
            placebo_results = self.validity_results['placebo_tests']
            thresholds = list(placebo_results.keys())
            effects = [placebo_results[t]['effect'] for t in thresholds]
            significant = [placebo_results[t]['significant'] for t in thresholds]
            colors = ['red' if sig else 'blue' for sig in significant]
            ax3.scatter(thresholds, effects, c=colors, s=60, alpha=0.7)
            ax3.axhline(y=0, color='black', linestyle='-', alpha=0.3)
            ax3.axvline(x=0, color='red', linestyle='--', linewidth=2,
                        label='真实阈值')
            ax3.set_xlabel('安慰剂阈值')
            ax3.set_ylabel('安慰剂效应')
            ax3.set_title('安慰剂检验')
            ax3.legend()
            ax3.grid(True, alpha=0.3)
        # 4. Pass/fail overview across whichever tests were run.
        tests_passed = 0
        total_tests = 0
        test_names = []
        statuses = []
        if 'covariate_balance' in self.validity_results:
            balance_results = self.validity_results['covariate_balance']
            balance_passed = sum(1 for r in balance_results.values() if r['balanced'])
            total_balance = len(balance_results)
            tests_passed += balance_passed
            total_tests += total_balance
            test_names.append('协变量平衡')
            statuses.append(balance_passed / total_balance)
        if 'density_test' in self.validity_results:
            density_test = self.validity_results['density_test']
            density_passed = not density_test['manipulation_suspected']
            tests_passed += 1 if density_passed else 0
            total_tests += 1
            test_names.append('密度连续性')
            statuses.append(1.0 if density_passed else 0.0)
        if test_names:
            colors = ['green' if s > 0.5 else 'red' for s in statuses]
            ax4.bar(test_names, statuses, color=colors, alpha=0.7)
            ax4.set_ylabel('通过比例')
            ax4.set_title('有效性检验总结')
            ax4.set_ylim(0, 1)
        plt.tight_layout()
        plt.show()
        # Console recap.
        print("有效性检验总结:")
        if 'covariate_balance' in self.validity_results:
            balance_results = self.validity_results['covariate_balance']
            balance_passed = sum(1 for r in balance_results.values() if r['balanced'])
            print(f"协变量平衡: {balance_passed}/{len(balance_results)} 个变量通过检验")
        if 'density_test' in self.validity_results:
            density_test = self.validity_results['density_test']
            density_status = "通过" if not density_test['manipulation_suspected'] else "未通过"
            print(f"密度连续性检验: {density_status}")
        if 'placebo_tests' in self.validity_results:
            placebo_results = self.validity_results['placebo_tests']
            placebo_significant = sum(1 for r in placebo_results.values() if r['significant'])
            print(f"安慰剂检验: {placebo_significant}/{len(placebo_results)} 个阈值显著")
        # FIX: avoid ZeroDivisionError when only placebo tests were run.
        overall_status = "通过" if total_tests and tests_passed / total_tests > 0.7 else "未通过"
        print(f"总体有效性: {overall_status} ({tests_passed}/{total_tests} 项检验通过)")
# --- Validity-check example -------------------------------------------
# Add two placebo covariates (pure noise, so they should be balanced at
# the cutoff) before running the checks.
sharp_data['covariate1'] = np.random.normal(0, 1, len(sharp_data))
sharp_data['covariate2'] = np.random.normal(0, 1, len(sharp_data))
validity_checker = ValidityTests(sharp_data)
# Run covariate-balance, density-continuity and placebo-cutoff tests.
covariate_balance = validity_checker.test_covariate_balance(
    ['covariate1', 'covariate2'], threshold=0, bandwidth=1.0
)
density_test = validity_checker.test_density_continuity(threshold=0)
placebo_tests = validity_checker.placebo_tests(
    'Y', fake_thresholds=[-1.0, -0.5, 0.5, 1.0], bandwidth=0.8
)
validity_checker.visualize_validity_checks(['covariate1', 'covariate2'])
RDD有效性检验框架
V. 实例分析:班级规模对学生成绩的影响
研究背景与数据生成
让我们考虑一个教育经济学的经典问题:班级规模对学生成绩的影响。许多学校使用入学人数阈值来决定是否分班,这为RDD分析提供了自然实验。
class ClassSizeRDAnalysis:
    """RD case study: effect of class size on test scores.

    A Maimonides-rule style setting: schools whose enrollment exceeds
    25 must split into two classes, halving class size.  Enrollment is
    the running variable.
    """

    def __init__(self):
        self.data = None      # DataFrame filled by generate_class_size_data()
        self.results = {}     # outputs of comprehensive_analysis()

    def generate_class_size_data(self, n_schools=500, max_enrollment=50):
        """Simulate school-level data.

        Returns a DataFrame with enrollment, class_size, test_score,
        above_threshold, school_quality, student_background, and the
        enrollment centered at the cutoff ('running_var').
        """
        np.random.seed(42)  # reproducible draws
        enrollment = np.random.randint(1, max_enrollment + 1, n_schools)
        threshold = 25  # enrollment above this forces a class split
        # One class up to the cutoff, two half-size classes above it.
        class_size = np.where(enrollment <= threshold,
                              enrollment,
                              enrollment / 2)
        # Baseline score drifts mildly with enrollment.
        base_score = 70 + 0.1 * enrollment
        # Smaller classes help: effect measured relative to 20 students.
        class_size_effect = -0.3 * (class_size - 20)
        school_quality = np.random.normal(0, 5, n_schools)
        student_background = np.random.normal(0, 3, n_schools)
        test_score = (base_score + class_size_effect +
                      school_quality + student_background +
                      np.random.normal(0, 2, n_schools))
        self.data = pd.DataFrame({
            'enrollment': enrollment,
            'class_size': class_size,
            'test_score': test_score,
            'above_threshold': (enrollment > threshold).astype(int),
            'school_quality': school_quality,
            'student_background': student_background,
            'running_var': enrollment - threshold  # centered running variable
        })
        return self.data

    def comprehensive_analysis(self):
        """Run the full pipeline: describe, plot, estimate, validate,
        and derive policy implications.  Results land in self.results."""
        if self.data is None:
            raise ValueError("请先生成数据")
        desc_stats = self.data.describe()
        self._visualize_class_size_effects()
        rd_results = self._estimate_rd_effect()
        validity_results = self._run_validity_checks()
        policy_implications = self._analyze_policy_implications()
        self.results.update({
            'descriptive_stats': desc_stats,
            'rd_estimates': rd_results,
            'validity_checks': validity_results,
            'policy_implications': policy_implications
        })
        return self.results

    def _visualize_class_size_effects(self):
        """2x2 figure: raw scatter, class-size scatter, side-by-side
        local-linear fits, and bandwidth sensitivity."""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        # 1. Enrollment vs score, colored by cutoff side.
        ax1.scatter(self.data['enrollment'], self.data['test_score'],
                    alpha=0.6, c=self.data['above_threshold'], cmap='coolwarm')
        ax1.axvline(x=25, color='red', linestyle='--', linewidth=2)
        ax1.set_xlabel('入学人数')
        ax1.set_ylabel('测试成绩')
        ax1.set_title('入学人数与测试成绩关系')
        ax1.grid(True, alpha=0.3)
        # 2. Class size vs score.
        ax2.scatter(self.data['class_size'], self.data['test_score'],
                    alpha=0.6, color='green')
        ax2.set_xlabel('班级规模')
        ax2.set_ylabel('测试成绩')
        ax2.set_title('班级规模与测试成绩关系')
        ax2.grid(True, alpha=0.3)
        # 3. Separate first-order fits left/right of the cutoff.
        bandwidth = 10
        subset = self.data[np.abs(self.data['running_var']) <= bandwidth]
        left_data = subset[subset['running_var'] <= 0]
        right_data = subset[subset['running_var'] >= 0]
        if len(left_data) > 0:
            left_fit = np.polyfit(left_data['running_var'], left_data['test_score'], 1)
            x_left = np.linspace(-bandwidth, 0, 50)
            y_left = np.polyval(left_fit, x_left)
            ax3.plot(x_left + 25, y_left, 'b-', linewidth=2, label='阈值左侧')
        if len(right_data) > 0:
            right_fit = np.polyfit(right_data['running_var'], right_data['test_score'], 1)
            x_right = np.linspace(0, bandwidth, 50)
            y_right = np.polyval(right_fit, x_right)
            ax3.plot(x_right + 25, y_right, 'r-', linewidth=2, label='阈值右侧')
        ax3.axvline(x=25, color='red', linestyle='--', linewidth=2)
        ax3.set_xlabel('入学人数')
        ax3.set_ylabel('测试成绩')
        ax3.set_title('局部线性拟合')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
        # 4. Estimated jump at several bandwidths.
        bandwidths = [5, 8, 10, 12, 15]
        effects = []
        for bw in bandwidths:
            subset_bw = self.data[np.abs(self.data['running_var']) <= bw]
            if len(subset_bw) > 10:
                model = smf.ols('test_score ~ above_threshold + running_var',
                                data=subset_bw).fit()
                effects.append(model.params.get('above_threshold', 0))
            else:
                effects.append(np.nan)
        ax4.plot(bandwidths, effects, 'o-', linewidth=2)
        ax4.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        ax4.set_xlabel('带宽')
        ax4.set_ylabel('处理效应估计')
        ax4.set_title('带宽敏感性分析')
        ax4.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        return fig

    def _estimate_rd_effect(self):
        """Linear and quadratic RD estimates at several bandwidths.

        Bandwidths leaving fewer than 20 observations are skipped.
        """
        results = {}
        bandwidths = [8, 10, 12]
        for bw in bandwidths:
            subset = self.data[np.abs(self.data['running_var']) <= bw]
            if len(subset) < 20:
                continue
            linear_model = smf.ols('test_score ~ above_threshold + running_var',
                                   data=subset).fit()
            quadratic_model = smf.ols('test_score ~ above_threshold + running_var + I(running_var**2)',
                                      data=subset).fit()
            results[bw] = {
                'linear_effect': linear_model.params.get('above_threshold', 0),
                'linear_se': linear_model.bse.get('above_threshold', 0),
                'quadratic_effect': quadratic_model.params.get('above_threshold', 0),
                'quadratic_se': quadratic_model.bse.get('above_threshold', 0),
                'n_observations': len(subset)
            }
        return results

    def _run_validity_checks(self):
        """Delegate balance/density/placebo checks to ValidityTests."""
        validity_checker = ValidityTests(self.data)
        covariate_balance = validity_checker.test_covariate_balance(
            ['school_quality', 'student_background'],
            threshold=0, bandwidth=10
        )
        density_test = validity_checker.test_density_continuity(threshold=0)
        placebo_tests = validity_checker.placebo_tests(
            'test_score', [-15, -10, 10, 15], bandwidth=8
        )
        return {
            'covariate_balance': covariate_balance,
            'density_test': density_test,
            'placebo_tests': placebo_tests
        }

    def _analyze_policy_implications(self):
        """Translate the RD estimate into a toy cost-benefit verdict.

        All monetary constants are illustrative placeholders.
        """
        optimal_bw = 10  # reference bandwidth for the headline estimate
        subset = self.data[np.abs(self.data['running_var']) <= optimal_bw]
        model = smf.ols('test_score ~ above_threshold + running_var', data=subset).fit()
        effect_size = model.params.get('above_threshold', 0)
        effect_se = model.bse.get('above_threshold', 0)
        # Schools just above the cutoff are the ones the policy touches.
        affected_schools = len(self.data[
            (self.data['enrollment'] > 25) & (self.data['enrollment'] <= 35)
        ])
        # Simplified cost-benefit arithmetic with placeholder prices.
        cost_per_student = 1000
        benefit_per_score_point = 500
        total_cost = affected_schools * 25 * cost_per_student
        total_benefit = affected_schools * 25 * effect_size * benefit_per_score_point
        return {
            'effect_size': effect_size,
            'effect_se': effect_se,
            'affected_schools': affected_schools,
            'cost_benefit_ratio': total_benefit / total_cost if total_cost > 0 else 0,
            'policy_recommendation': '推荐' if effect_size > 0 and total_benefit > total_cost else '不推荐'
        }

    def generate_final_report(self):
        """Print the headline estimate, policy verdict and validity recap.

        Runs comprehensive_analysis() first if it has not been run yet.
        """
        if not self.results:
            self.comprehensive_analysis()
        print("=" * 70)
        print("班级规模对学生成绩影响的断点回归分析报告")
        print("=" * 70)
        policy_impl = self.results['policy_implications']
        print(f"\n主要发现:")
        print(f"班级规模减小的平均效应: {policy_impl['effect_size']:.2f} 分")
        print(f"标准误差: {policy_impl['effect_se']:.2f}")
        print(f"统计显著性: {'显著' if abs(policy_impl['effect_size']/policy_impl['effect_se']) > 1.96 else '不显著'}")
        print(f"\n政策含义:")
        print(f"受影响学校数量: {policy_impl['affected_schools']}")
        print(f"成本效益比: {policy_impl['cost_benefit_ratio']:.2f}")
        print(f"政策建议: {policy_impl['policy_recommendation']}实施分班政策")
        validity = self.results['validity_checks']
        print(f"\n有效性检验:")
        if 'covariate_balance' in validity:
            balance_passed = sum(1 for r in validity['covariate_balance'].values() if r['balanced'])
            print(f"协变量平衡: {balance_passed}/{len(validity['covariate_balance'])} 通过")
        if 'density_test' in validity:
            density_ok = not validity['density_test']['manipulation_suspected']
            print(f"密度连续性: {'通过' if density_ok else '未通过'}")
        print(f"\n结论:")
        if (policy_impl['policy_recommendation'] == '推荐' and
                abs(policy_impl['effect_size']/policy_impl['effect_se']) > 1.96):
            print("基于断点回归分析,班级规模减小对提高学生成绩有显著正面影响,建议实施相关政策。")
        else:
            print("当前证据不足以支持班级规模减小政策,建议进一步研究或试点实施。")
# --- Run the class-size case study ------------------------------------
# Generates the data, runs the full analysis pipeline (plots, estimates,
# validity checks, policy implications), then prints the report.
class_size_analysis = ClassSizeRDAnalysis()
class_size_data = class_size_analysis.generate_class_size_data(n_schools=800)
results = class_size_analysis.comprehensive_analysis()
class_size_analysis.generate_final_report()
教育政策RDD分析框架
VI. 常见陷阱与注意事项
RDD实施中的常见问题
class CommonPitfalls:
    """Static diagnostics for common RD pitfalls: running-variable
    manipulation, low statistical power, inadequate bandwidth — plus a
    printed checklist report combining them."""

    @staticmethod
    def check_manipulation_signs(data, running_var, threshold, bandwidth=None):
        """Compare observation counts in the bin just left/right of the
        cutoff; a large log-ratio hints at manipulation (heaping)."""
        if bandwidth is None:
            # Default: one tenth of the running variable's range.
            bandwidth = (data[running_var].max() - data[running_var].min()) / 10
        left_density = len(data[
            (data[running_var] >= threshold - bandwidth) &
            (data[running_var] < threshold)
        ])
        right_density = len(data[
            (data[running_var] >= threshold) &
            (data[running_var] < threshold + bandwidth)
        ])
        density_ratio = right_density / left_density if left_density > 0 else np.inf
        return {
            'left_density': left_density,
            'right_density': right_density,
            'density_ratio': density_ratio,
            # |log ratio| > 0.5 is a rough rule of thumb, not a formal test.
            'manipulation_suspected': abs(np.log(density_ratio)) > 0.5
        }

    @staticmethod
    def assess_power(sample_size, effect_size, alpha=0.05):
        """Back-of-the-envelope power to detect `effect_size`.

        Assumes t ~= effect * sqrt(n/4); use a proper power analysis for
        anything decision-critical.
        """
        expected_t_stat = effect_size * np.sqrt(sample_size / 4)
        from scipy import stats
        critical_value = stats.norm.ppf(1 - alpha/2)
        power = 1 - stats.norm.cdf(critical_value - expected_t_stat)
        return {
            'sample_size': sample_size,
            'effect_size': effect_size,
            'expected_t_stat': expected_t_stat,
            'statistical_power': power,
            'adequate_power': power >= 0.8  # conventional 80% target
        }

    @staticmethod
    def check_bandwidth_adequacy(data, running_var, threshold, min_bandwidth=0.5):
        """IQR-based heuristic for whether a usable bandwidth exists.

        `threshold` is accepted for interface symmetry but is not used
        by the heuristic.
        """
        # Renamed from `stats` to avoid shadowing the scipy module name
        # used elsewhere in this class.
        desc = data[running_var].describe()
        iqr = desc['75%'] - desc['25%']
        # Reference bandwidth shrinking at the n^(-1/5) rate.
        suggested_bandwidth = 2 * iqr / (len(data)**(1/5))
        adequacy = {
            'current_iqr': iqr,
            'suggested_bandwidth': suggested_bandwidth,
            'adequate_bandwidth': suggested_bandwidth >= min_bandwidth,
            'recommendation': '增加样本量' if suggested_bandwidth < min_bandwidth else '带宽充足'
        }
        return adequacy

    @staticmethod
    def generate_pitfall_report(data, running_var, threshold, outcome_var):
        """Print the three pitfall checks plus overall advice.

        NOTE(review): `outcome_var` is accepted but currently unused by
        any of the checks; kept for interface stability.
        """
        print("=" * 60)
        print("RDD常见陷阱检查报告")
        print("=" * 60)
        # 1. Manipulation / heaping at the cutoff.
        manipulation_check = CommonPitfalls.check_manipulation_signs(
            data, running_var, threshold
        )
        print(f"\n1. 运行变量操纵检查:")
        print(f" 阈值左侧密度: {manipulation_check['left_density']}")
        print(f" 阈值右侧密度: {manipulation_check['right_density']}")
        print(f" 密度比率: {manipulation_check['density_ratio']:.2f}")
        print(f" 操纵嫌疑: {'是' if manipulation_check['manipulation_suspected'] else '否'}")
        # 2. Power within a unit bandwidth of the cutoff.
        subset_size = len(data[np.abs(data[running_var] - threshold) <= 1.0])
        power_check = CommonPitfalls.assess_power(subset_size, effect_size=0.5)
        print(f"\n2. 统计功效评估:")
        print(f" 带宽内样本量: {subset_size}")
        print(f" 预期统计功效: {power_check['statistical_power']:.3f}")
        print(f" 功效是否充足: {'是' if power_check['adequate_power'] else '否'}")
        # 3. Is a usable bandwidth available at all?
        bandwidth_check = CommonPitfalls.check_bandwidth_adequacy(
            data, running_var, threshold
        )
        print(f"\n3. 带宽充足性检查:")
        print(f" 数据IQR: {bandwidth_check['current_iqr']:.2f}")
        print(f" 建议带宽: {bandwidth_check['suggested_bandwidth']:.2f}")
        print(f" 带宽是否充足: {'是' if bandwidth_check['adequate_bandwidth'] else '否'}")
        # 4. Combined advice.
        print(f"\n4. 通用建议:")
        if manipulation_check['manipulation_suspected']:
            print(" ⚠️ 发现操纵迹象,建议进行更严格的McCrary检验")
        if not power_check['adequate_power']:
            print(" ⚠️ 统计功效不足,考虑增加样本量或扩大带宽")
        if not bandwidth_check['adequate_bandwidth']:
            print(" ⚠️ 带宽可能不足,谨慎解释结果")
        if (not manipulation_check['manipulation_suspected'] and
                power_check['adequate_power'] and
                bandwidth_check['adequate_bandwidth']):
            print(" ✅ 未发现明显问题,RDD设计适用")
# --- Pitfall-check example --------------------------------------------
# Runs the manipulation / power / bandwidth checklist on the sharp-RD
# data.  (The methods are static, so calling via an instance works too.)
pitfall_checker = CommonPitfalls()
pitfall_report = pitfall_checker.generate_pitfall_report(
    sharp_data, 'running_var', 0, 'Y'
)
RDD实施检查清单
检查项目 | 具体内容 | 通过标准 | 补救措施 |
---|---|---|---|
运行变量操纵 | McCrary密度检验 | p > 0.05, 密度比率接近1 | 使用模糊RDD或寻找其他识别策略 |
协变量平衡 | 阈值两侧协变量比较 | p > 0.05, 标准化差异<0.1 | 控制不平衡协变量或改变带宽 |
带宽选择 | 交叉验证或IK方法 | 估计结果对带宽选择不敏感 | 进行带宽敏感性分析 |
函数形式 | 不同多项式阶数比较 | AIC/BIC支持,结果稳健 | 使用非参数方法 |
统计功效 | 样本量和效应大小评估 | 功效 > 0.8 | 增加样本量或合并数据 |
外部有效性 | 局部平均处理效应解释 | 明确说明适用范围 | 多阈值分析或异质性检验 |
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)