因果推断中的敏感性分析:评估结论的稳健性
I. 引言:为什么需要敏感性分析?
观察性研究的根本局限
在理想情况下,随机对照试验(RCT)通过随机化处理分配来消除混淆变量的影响。然而,在现实世界中,由于伦理、成本或可行性的限制,我们往往只能依赖观察性数据。这种数据的根本问题是:
- 未观测混淆的存在:我们无法测量所有影响处理选择和结果的变量
- 假设的不可验证性:关键因果假设(如无未观测混淆)无法被数据直接检验
- 结论的不确定性:基于有缺陷假设的结论可能严重误导决策
经典案例:在研究吸烟对肺癌的影响时,早期的反对者声称可能存在未观测的基因因素同时导致吸烟倾向和肺癌易感性。敏感性分析能够量化这种质疑需要多强的基因效应才能推翻吸烟致癌的结论。
敏感性分析的核心价值
敏感性分析不是要"证明"因果效应存在与否,而是要量化结论对假设 violations 的稳健性。具体来说,它帮助我们:
- 评估未观测混淆需要多强才能改变研究结论
- 识别最关键的因果假设
- 为决策者提供更全面的证据基础
- 提高研究的透明度和可信度
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("因果推断中的敏感性分析:评估结论的稳健性")
print("=" * 50)
II. 敏感性分析的基本框架
敏感性分析的理论基础
敏感性分析建立在因果推断的潜在结果框架上。设Y₁和Y₀分别表示个体在接受处理和未接受处理时的潜在结果,D表示处理状态,X表示观测协变量,U表示未观测混淆变量。
核心问题:在控制X后,U需要与D和Y有多么强的关联才能推翻我们的因果结论?
敏感性参数的概念
不同的敏感性分析方法使用不同的参数来量化未观测混淆的强度:
敏感性参数 | 定义 | 解释 | 适用方法 |
---|---|---|---|
Γ | 处理优势比 | 在相同X下,U对D的影响强度 | Rosenbaum边界 |
δ | 标准化偏误 | U对Y的偏误大小 | 标准化差异法 |
R² | 解释方差比例 | U对D和Y的联合解释力 | 方差分解法 |
ρ | 相关性 | U与D、Y的相关性 | 相关性法 |
# 创建敏感性分析的基本示例
def create_sensitivity_example(n=10000):
"""创建敏感性分析的基本示例数据"""
np.random.seed(2024)
# 观测协变量
X1 = np.random.normal(0, 1, n) # 年龄
X2 = np.random.binomial(1, 0.5, n) # 性别
# 未观测混淆变量 U
U = np.random.normal(0, 1, n)
# 处理分配(受观测变量和未观测变量影响)
treatment_prob = 1 / (1 + np.exp(-(0.5 + 0.3*X1 + 0.2*X2 + 0.4*U)))
D = np.random.binomial(1, treatment_prob)
# 结果变量(受处理、观测变量和未观测变量影响)
Y = 2.0 * D + 1.5 * X1 + 1.0 * X2 + 1.2 * U + np.random.normal(0, 1, n)
data = pd.DataFrame({
'X1': X1, 'X2': X2, 'D': D, 'Y': Y, 'U': U
})
return data
# 生成数据
sensitivity_data = create_sensitivity_example()
print("敏感性分析示例数据描述:")
print(f"样本量: {len(sensitivity_data)}")
print(f"处理组比例: {sensitivity_data['D'].mean():.3f}")
print(f"结果变量均值: {sensitivity_data['Y'].mean():.3f}")
print(f"真实处理效应: 2.0")
# 基本因果效应估计
def estimate_treatment_effect(data, adjustment_vars=None):
"""估计处理效应"""
if adjustment_vars is None:
# 无调整
model = sm.OLS(data['Y'], sm.add_constant(data['D'])).fit()
else:
# 调整观测变量
X = sm.add_constant(data[['D'] + adjustment_vars])
model = sm.OLS(data['Y'], X).fit()
return {
'estimate': model.params['D'],
'std_error': model.bse['D'],
'p_value': model.pvalues['D'],
'ci_lower': model.conf_int().loc['D', 0],
'ci_upper': model.conf_int().loc['D', 1]
}
# 比较不同调整策略
print("\n不同调整策略的处理效应估计:")
naive_estimate = estimate_treatment_effect(sensitivity_data)
print(f"无调整: {naive_estimate['estimate']:.3f} [{naive_estimate['ci_lower']:.3f}, {naive_estimate['ci_upper']:.3f}]")
adjusted_estimate = estimate_treatment_effect(sensitivity_data, ['X1', 'X2'])
print(f"调整X1,X2: {adjusted_estimate['estimate']:.3f} [{adjusted_estimate['ci_lower']:.3f}, {adjusted_estimate['ci_upper']:.3f}]")
# 如果我们能观测U(理想情况)
ideal_estimate = estimate_treatment_effect(sensitivity_data, ['X1', 'X2', 'U'])
print(f"调整所有变量: {ideal_estimate['estimate']:.3f} [{ideal_estimate['ci_lower']:.3f}, {ideal_estimate['ci_upper']:.3f}]")
# 可视化不同调整策略
estimates = [
naive_estimate['estimate'],
adjusted_estimate['estimate'],
ideal_estimate['estimate']
]
ci_lowers = [
naive_estimate['ci_lower'],
adjusted_estimate['ci_lower'],
ideal_estimate['ci_lower']
]
ci_uppers = [
naive_estimate['ci_upper'],
adjusted_estimate['ci_upper'],
ideal_estimate['ci_upper']
]
methods = ['无调整', '调整X1,X2', '调整所有变量']
fig, ax = plt.subplots(figsize=(10, 6))
for i, (method, estimate, ci_l, ci_u) in enumerate(zip(methods, estimates, ci_lowers, ci_uppers)):
ax.errorbar(i, estimate, yerr=[[estimate - ci_l], [ci_u - estimate]],
fmt='o', capsize=5, capthick=2, markersize=8, label=method)
ax.axhline(y=2.0, color='red', linestyle='--', linewidth=2, label='真实效应')
ax.set_xlabel('调整策略')
ax.set_ylabel('处理效应估计')
ax.set_title('不同调整策略的因果效应估计')
ax.set_xticks(range(len(methods)))
ax.set_xticklabels(methods)
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()
敏感性分析的基本流程
一个完整的敏感性分析通常包含以下步骤:
III. Rosenbaum边界方法
Rosenbaum边界的基本思想
Rosenbaum边界方法是处理二元结果时最常用的敏感性分析方法。其核心思想是:量化未观测混淆对处理分配的影响需要多强才能改变统计结论。
敏感性参数Γ:表示在匹配的个体对中,由于未观测混淆导致的处理分配几率比的最大差异。
Rosenbaum边界的实现
class RosenbaumBounds:
"""Rosenbaum边界敏感性分析"""
def __init__(self, treated_outcomes, control_outcomes):
"""
初始化
treated_outcomes: 处理组结果
control_outcomes: 对照组结果
"""
self.treated = np.array(treated_outcomes)
self.control = np.array(control_outcomes)
self.n_pairs = min(len(self.treated), len(self.control))
def wilcoxon_test(self, gamma=1.0):
"""
在给定gamma下的Wilcoxon符号秩检验
gamma: 敏感性参数,Γ >= 1
"""
# 创建匹配对差异
if len(self.treated) != len(self.control):
# 如果样本量不等,随机匹配
np.random.seed(42)
treated_sample = np.random.choice(self.treated, self.n_pairs, replace=False)
control_sample = np.random.choice(self.control, self.n_pairs, replace=False)
else:
treated_sample = self.treated
control_sample = self.control
differences = treated_sample - control_sample
# 计算符号秩统计量
abs_differences = np.abs(differences)
ranks = stats.rankdata(abs_differences)
signed_ranks = np.where(differences > 0, ranks, -ranks)
# 在gamma下的上下界
T_plus_upper = 0
T_plus_lower = 0
for i, rank in enumerate(ranks):
if differences[i] > 0:
p_upper = gamma / (1 + gamma)
p_lower = 1 / (1 + gamma)
else:
p_upper = 1 / (1 + gamma)
p_lower = gamma / (1 + gamma)
T_plus_upper += rank * p_upper
T_plus_lower += rank * p_lower
# 计算p值边界
# 使用正态近似
n = self.n_pairs
mean_upper = n * (n + 1) * gamma / (4 * (1 + gamma))
mean_lower = n * (n + 1) / (4 * (1 + gamma))
var = n * (n + 1) * (2 * n + 1) * gamma / (24 * (1 + gamma)**2)
z_upper = (T_plus_upper - mean_upper) / np.sqrt(var)
z_lower = (T_plus_lower - mean_lower) / np.sqrt(var)
p_upper = 2 * min(stats.norm.cdf(z_upper), 1 - stats.norm.cdf(z_upper))
p_lower = 2 * min(stats.norm.cdf(z_lower), 1 - stats.norm.cdf(z_lower))
return {
'gamma': gamma,
'p_value_upper': p_upper,
'p_value_lower': p_lower,
'significant_upper': p_upper < 0.05,
'significant_lower': p_lower < 0.05
}
def analyze_sensitivity(self, gamma_range=np.linspace(1, 3, 21)):
"""分析不同gamma下的敏感性"""
results = []
for gamma in gamma_range:
result = self.wilcoxon_test(gamma)
results.append(result)
return pd.DataFrame(results)
# 模拟二元结果数据
def create_binary_outcome_data(n=2000):
"""创建二元结果的模拟数据"""
np.random.seed(2024)
# 观测协变量
X = np.random.normal(0, 1, n)
# 未观测混淆
U = np.random.normal(0, 1, n)
# 处理分配
treatment_logit = -0.5 + 0.6*X + 0.8*U
treatment_prob = 1 / (1 + np.exp(-treatment_logit))
D = np.random.binomial(1, treatment_prob)
# 结果变量(二元)
outcome_logit = -1.0 + 0.8*D + 0.5*X + 0.7*U
outcome_prob = 1 / (1 + np.exp(-outcome_logit))
Y = np.random.binomial(1, outcome_prob)
data = pd.DataFrame({'X': X, 'D': D, 'Y': Y, 'U': U})
return data
# 生成二元结果数据
binary_data = create_binary_outcome_data()
print("二元结果数据描述:")
print(f"样本量: {len(binary_data)}")
print(f"处理组比例: {binary_data['D'].mean():.3f}")
print(f"结果发生率: {binary_data['Y'].mean():.3f}")
print(f"处理组结果率: {binary_data[binary_data['D']==1]['Y'].mean():.3f}")
print(f"对照组结果率: {binary_data[binary_data['D']==0]['Y'].mean():.3f}")
# 应用Rosenbaum边界分析
treated_outcomes = binary_data[binary_data['D']==1]['Y'].values
control_outcomes = binary_data[binary_data['D']==0]['Y'].values
rosenbaum = RosenbaumBounds(treated_outcomes, control_outcomes)
sensitivity_results = rosenbaum.analyze_sensitivity()
print("\nRosenbaum边界敏感性分析结果:")
print("Gamma\tP-value上限\tP-value下限\t结论是否改变")
print("-" * 50)
for _, row in sensitivity_results.iterrows():
gamma = row['gamma']
p_upper = row['p_value_upper']
p_lower = row['p_value_lower']
conclusion_changed = not (row['significant_upper'] and row['significant_lower'])
print(f"{gamma:.1f}\t{p_upper:.4f}\t\t{p_lower:.4f}\t\t{'是' if conclusion_changed else '否'}")
# 找到临界Gamma值
critical_gamma = sensitivity_results[
~(sensitivity_results['significant_upper'] & sensitivity_results['significant_lower'])
]['gamma'].min()
if not np.isnan(critical_gamma):
print(f"\n临界Gamma值: {critical_gamma:.2f}")
print("当未观测混淆导致的处理分配几率比超过此值时,统计结论可能改变")
else:
print("\n在分析的Gamma范围内,结论始终保持显著")
# 可视化Rosenbaum边界
plt.figure(figsize=(10, 6))
plt.plot(sensitivity_results['gamma'], sensitivity_results['p_value_upper'],
'b-', label='P-value上限', linewidth=2)
plt.plot(sensitivity_results['gamma'], sensitivity_results['p_value_lower'],
'r-', label='P-value下限', linewidth=2)
plt.axhline(y=0.05, color='black', linestyle='--', alpha=0.7, label='显著性阈值 (0.05)')
if not np.isnan(critical_gamma):
plt.axvline(x=critical_gamma, color='green', linestyle='--',
label=f'临界Γ值: {critical_gamma:.2f}')
plt.xlabel('敏感性参数 Γ')
plt.ylabel('P-value')
plt.title('Rosenbaum边界敏感性分析')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
Rosenbaum边界的解释
Rosenbaum边界的结果提供了关于结论稳健性的重要信息:
Γ值范围 | 稳健性解释 | 实践意义 |
---|---|---|
Γ < 1.5 | 敏感性较高 | 较弱的未观测混淆可能改变结论 |
1.5 ≤ Γ < 2 | 中等稳健性 | 需要中等强度的未观测混淆才能改变结论 |
2 ≤ Γ < 3 | 较为稳健 | 需要较强的未观测混淆才能改变结论 |
Γ ≥ 3 | 高度稳健 | 即使很强的未观测混淆也不改变结论 |
IV. 标准化偏误方法
标准化偏误的直观理解
标准化偏误方法适用于连续结果变量,其核心思想是:量化未观测混淆需要对结果产生多大的偏误才能改变因果结论。
敏感性参数δ:表示未观测混淆导致的标准化偏误大小。
标准化偏误的实现
class StandardizedBiasSensitivity:
"""标准化偏误敏感性分析"""
def __init__(self, treated_outcomes, control_outcomes, treated_covariates, control_covariates):
"""
初始化
treated_outcomes: 处理组结果
control_outcomes: 对照组结果
treated_covariates: 处理组协变量
control_covariates: 对照组协变量
"""
self.treated_y = np.array(treated_outcomes)
self.control_y = np.array(control_outcomes)
self.treated_x = np.array(treated_covariates)
self.control_x = np.array(control_covariates)
def calculate_standardized_bias(self, delta=0):
"""
在给定偏误delta下的调整估计
delta: 标准化偏误
"""
# 基准估计(无偏误调整)
y_pooled = np.concatenate([self.treated_y, self.control_y])
x_pooled = np.vstack([self.treated_x, self.control_x])
d_pooled = np.concatenate([np.ones(len(self.treated_y)), np.zeros(len(self.control_y))])
# 线性回归估计处理效应
X_design = np.column_stack([np.ones(len(d_pooled)), d_pooled, x_pooled])
model = sm.OLS(y_pooled, X_design).fit()
baseline_effect = model.params[1]
# 应用偏误调整
# 假设未观测混淆导致delta的标准化偏误
adjusted_effect = baseline_effect - delta * np.std(y_pooled)
return adjusted_effect
def sensitivity_analysis(self, delta_range=np.linspace(-1, 1, 21), alpha=0.05):
"""执行敏感性分析"""
results = []
# 计算基准效应的置信区间
y_pooled = np.concatenate([self.treated_y, self.control_y])
x_pooled = np.vstack([self.treated_x, self.control_x])
d_pooled = np.concatenate([np.ones(len(self.treated_y)), np.zeros(len(self.control_y))])
X_design = np.column_stack([np.ones(len(d_pooled)), d_pooled, x_pooled])
model = sm.OLS(y_pooled, X_design).fit()
baseline_effect = model.params[1]
baseline_se = model.bse[1]
critical_value = stats.t.ppf(1 - alpha/2, model.df_resid)
baseline_ci_lower = baseline_effect - critical_value * baseline_se
baseline_ci_upper = baseline_effect + critical_value * baseline_se
for delta in delta_range:
adjusted_effect = self.calculate_standardized_bias(delta)
# 检查结论是否改变(符号改变或包含0)
conclusion_unchanged = (
(baseline_ci_lower > 0 and adjusted_effect > 0) or
(baseline_ci_upper < 0 and adjusted_effect < 0)
)
results.append({
'delta': delta,
'adjusted_effect': adjusted_effect,
'conclusion_unchanged': conclusion_unchanged,
'significant': np.abs(adjusted_effect) > critical_value * baseline_se
})
return pd.DataFrame(results)
# 创建连续结果数据
def create_continuous_outcome_data(n=1000):
"""创建连续结果的模拟数据"""
np.random.seed(2024)
# 观测协变量
X1 = np.random.normal(0, 1, n)
X2 = np.random.binomial(1, 0.5, n)
# 未观测混淆
U = np.random.normal(0, 1, n)
# 处理分配
treatment_logit = -0.5 + 0.4*X1 + 0.3*X2 + 0.5*U
treatment_prob = 1 / (1 + np.exp(-treatment_logit))
D = np.random.binomial(1, treatment_prob)
# 连续结果变量
Y = 2.5 * D + 1.2 * X1 + 0.8 * X2 + 1.0 * U + np.random.normal(0, 1, n)
data = pd.DataFrame({
'X1': X1, 'X2': X2, 'D': D, 'Y': Y, 'U': U
})
return data
# 生成连续结果数据
continuous_data = create_continuous_outcome_data()
print("连续结果数据描述:")
print(f"样本量: {len(continuous_data)}")
print(f"处理组比例: {continuous_data['D'].mean():.3f}")
print(f"结果变量均值: {continuous_data['Y'].mean():.3f}")
print(f"处理组均值: {continuous_data[continuous_data['D']==1]['Y'].mean():.3f}")
print(f"对照组均值: {continuous_data[continuous_data['D']==0]['Y'].mean():.3f}")
# 应用标准化偏误敏感性分析
treated_y = continuous_data[continuous_data['D']==1]['Y'].values
control_y = continuous_data[continuous_data['D']==0]['Y'].values
treated_x = continuous_data[continuous_data['D']==1][['X1', 'X2']].values
control_x = continuous_data[continuous_data['D']==0][['X1', 'X2']].values
bias_analyzer = StandardizedBiasSensitivity(treated_y, control_y, treated_x, control_x)
bias_results = bias_analyzer.sensitivity_analysis()
print("\n标准化偏误敏感性分析结果:")
print("Delta\t调整后效应\t结论是否改变")
print("-" * 40)
for _, row in bias_results.iterrows():
delta = row['delta']
effect = row['adjusted_effect']
unchanged = row['conclusion_unchanged']
print(f"{delta:.2f}\t{effect:.3f}\t\t{'否' if unchanged else '是'}")
# 找到临界delta值
positive_delta_results = bias_results[bias_results['delta'] > 0]
negative_delta_results = bias_results[bias_results['delta'] < 0]
critical_delta_positive = positive_delta_results[
~positive_delta_results['conclusion_unchanged']
]['delta'].min()
critical_delta_negative = negative_delta_results[
~negative_delta_results['conclusion_unchanged']
]['delta'].max()
print(f"\n临界偏误值:")
if not np.isnan(critical_delta_positive):
print(f"正向偏误: {critical_delta_positive:.2f} 标准差")
if not np.isnan(critical_delta_negative):
print(f"负向偏误: {critical_delta_negative:.2f} 标准差")
# 可视化标准化偏误分析
plt.figure(figsize=(10, 6))
plt.plot(bias_results['delta'], bias_results['adjusted_effect'], 'b-', linewidth=2)
plt.axhline(y=0, color='black', linestyle='-', alpha=0.5)
# 标记显著性区域
significant_region = bias_results[bias_results['significant']]
if len(significant_region) > 0:
plt.fill_between(significant_region['delta'],
significant_region['adjusted_effect'],
alpha=0.3, color='green', label='统计显著区域')
if not np.isnan(critical_delta_positive):
plt.axvline(x=critical_delta_positive, color='red', linestyle='--',
label=f'正向临界值: {critical_delta_positive:.2f}')
if not np.isnan(critical_delta_negative):
plt.axvline(x=critical_delta_negative, color='red', linestyle='--',
label=f'负向临界值: {critical_delta_negative:.2f}')
plt.xlabel('标准化偏误 (δ, 标准差单位)')
plt.ylabel('调整后的处理效应')
plt.title('标准化偏误敏感性分析')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
标准化偏误的解释指南
标准化偏误方法的结果可以从多个角度解释:
δ绝对值范围 | 偏误强度 | 现实对应物 | 稳健性评估 |
---|---|---|---|
δ | < 0.1 | 很小 | |
0.1 ≤ | δ | < 0.3 | 小到中等 |
0.3 ≤ | δ | < 0.5 | 中等 |
0.5 ≤ | δ | < 0.8 | 大 |
δ | ≥ 0.8 | 很大 |
V. 实例分析:教育对收入的因果效应
研究背景与问题设定
我们将分析教育对收入影响的经典问题,重点关注未观测能力混淆的敏感性。假设我们观测到教育年限和收入数据,但无法测量先天能力。
数据生成与基准分析
# 创建教育回报的敏感性分析示例
def create_education_earnings_data(n=5000):
"""创建教育对收入影响的数据"""
np.random.seed(2024)
# 观测变量
family_income = np.random.normal(0, 1, n) # 家庭收入
parent_education = np.random.normal(0, 1, n) # 父母教育
cognitive_skill = np.random.normal(0, 1, n) # 认知技能(可观测部分)
# 未观测能力(先天能力)
innate_ability = np.random.normal(0, 1, n)
# 教育决策
education_prob = 1 / (1 + np.exp(-(
-1.5 + 0.4*family_income + 0.5*parent_education +
0.3*cognitive_skill + 0.6*innate_ability
)))
education = np.random.binomial(1, education_prob) # 0=高中, 1=大学
# 收入生成
base_income = 30000
earnings = (
base_income +
8000 * education + # 真实因果效应
3000 * family_income +
2500 * parent_education +
4000 * cognitive_skill +
3500 * innate_ability + # 未观测能力的影响
np.random.normal(0, 5000, n)
)
data = pd.DataFrame({
'education': education,
'earnings': earnings,
'family_income': family_income,
'parent_education': parent_education,
'cognitive_skill': cognitive_skill,
'innate_ability': innate_ability # 在实际分析中不可观测
})
return data
# 生成数据
education_data = create_education_earnings_data()
print("教育回报数据分析:")
print(f"样本量: {len(education_data)}")
print(f"大学教育比例: {education_data['education'].mean():.3f}")
print(f"平均收入: ${education_data['earnings'].mean():.2f}")
print(f"大学组收入: ${education_data[education_data['education']==1]['earnings'].mean():.2f}")
print(f"高中组收入: ${education_data[education_data['education']==0]['earnings'].mean():.2f}")
print(f"简单差异: ${education_data[education_data['education']==1]['earnings'].mean() - education_data[education_data['education']==0]['earnings'].mean():.2f}")
print(f"真实因果效应: $8000.00")
# 基准因果效应估计
def estimate_education_effect(data, adjustment_set):
"""估计教育对收入的影响"""
if adjustment_set:
X = sm.add_constant(data[['education'] + adjustment_set])
else:
X = sm.add_constant(data['education'])
model = sm.OLS(data['earnings'], X).fit()
return {
'adjustment_set': adjustment_set,
'estimate': model.params['education'],
'std_error': model.bse['education'],
'p_value': model.pvalues['education'],
'ci_lower': model.conf_int().loc['education', 0],
'ci_upper': model.conf_int().loc['education', 1]
}
# 不同调整策略的比较
adjustment_strategies = [
[],
['family_income'],
['family_income', 'parent_education'],
['family_income', 'parent_education', 'cognitive_skill'],
['family_income', 'parent_education', 'cognitive_skill', 'innate_ability'] # 理想情况
]
education_results = []
for adj_set in adjustment_strategies:
result = estimate_education_effect(education_data, adj_set)
education_results.append(result)
# 显示结果
print("\n教育回报的不同调整策略结果:")
print("调整变量\t\t估计值\t\t95%置信区间")
print("-" * 60)
for result in education_results:
adj_str = ", ".join(result['adjustment_set']) if result['adjustment_set'] else "无"
estimate = result['estimate']
ci_lower = result['ci_lower']
ci_upper = result['ci_upper']
print(f"{adj_str:<20}\t${estimate:.2f}\t\t[${ci_lower:.2f}, ${ci_upper:.2f}]")
# 可视化基准结果
estimates = [r['estimate'] for r in education_results]
ci_lowers = [r['ci_lower'] for r in education_results]
ci_uppers = [r['ci_upper'] for r in education_results]
labels = ['无调整', '+家庭收入', '+父母教育', '+认知技能', '+所有变量']
fig, ax = plt.subplots(figsize=(12, 6))
bars = ax.bar(range(len(estimates)), estimates, yerr=[np.array(estimates)-np.array(ci_lowers),
np.array(ci_uppers)-np.array(estimates)],
capsize=5, alpha=0.7, color=['red', 'orange', 'yellow', 'lightgreen', 'green'])
ax.axhline(y=8000, color='blue', linestyle='--', linewidth=2, label='真实因果效应')
ax.set_xlabel('调整策略')
ax.set_ylabel('教育回报估计 ($)')
ax.set_title('不同调整策略的教育回报估计')
ax.set_xticks(range(len(labels)))
ax.set_xticklabels(labels, rotation=45)
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
综合敏感性分析
现在我们对教育回报估计进行全面的敏感性分析,考虑未观测能力的影响。
# 教育回报的敏感性分析
class EducationSensitivityAnalysis:
"""教育回报的综合性敏感性分析"""
def __init__(self, data):
self.data = data
def ability_bias_sensitivity(self, ability_strength_range=np.linspace(0, 1, 11)):
"""能力偏误敏感性分析"""
results = []
# 基准估计(无能力调整)
baseline = estimate_education_effect(self.data,
['family_income', 'parent_education', 'cognitive_skill'])
for strength in ability_strength_range:
# 模拟不同强度的能力偏误
# 假设能力对收入的影响强度
ability_effect = 3500 * strength # 基准是3500
# 调整后的估计(简化处理)
# 实际中需要更复杂的偏误模型
adjusted_estimate = baseline['estimate'] - ability_effect * 0.15 # 假设能力与教育相关度为0.15
# 计算调整后的标准误(简化)
adjusted_se = baseline['std_error'] * (1 + 0.1 * strength)
# 置信区间
ci_lower = adjusted_estimate - 1.96 * adjusted_se
ci_upper = adjusted_estimate + 1.96 * adjusted_se
# 结论是否改变(真实效应8000是否在CI内)
conclusion_unchanged = (ci_lower <= 8000 <= ci_upper) and (baseline['ci_lower'] <= 8000 <= baseline['ci_upper'])
results.append({
'ability_strength': strength,
'adjusted_estimate': adjusted_estimate,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'conclusion_unchanged': conclusion_unchanged,
'bias_direction': '过度估计' if adjusted_estimate > 8000 else '低估'
})
return pd.DataFrame(results)
def multiple_sensitivity_approaches(self):
"""多种敏感性分析方法"""
sensitivity_results = {}
# 1. Rosenbaum边界风格分析(简化)
treated = self.data[self.data['education']==1]['earnings']
control = self.data[self.data['education']==0]['earnings']
# 计算处理组和对照组的差异
mean_difference = treated.mean() - control.mean()
std_pooled = np.sqrt((treated.var() + control.var()) / 2)
# 2. 标准化偏误分析
bias_analyzer = StandardizedBiasSensitivity(
treated.values, control.values,
self.data[self.data['education']==1][['family_income', 'parent_education', 'cognitive_skill']].values,
self.data[self.data['education']==0][['family_income', 'parent_education', 'cognitive_skill']].values
)
bias_results = bias_analyzer.sensitivity_analysis(delta_range=np.linspace(-0.5, 0.5, 11))
sensitivity_results['bias_analysis'] = bias_results
sensitivity_results['mean_difference'] = mean_difference
sensitivity_results['std_pooled'] = std_pooled
return sensitivity_results
# 执行教育回报的敏感性分析
education_analyzer = EducationSensitivityAnalysis(education_data)
# 能力偏误敏感性
ability_results = education_analyzer.ability_bias_sensitivity()
print("教育回报的能力偏误敏感性分析:")
print("能力强度\t调整后估计\t95%置信区间\t\t结论是否改变")
print("-" * 70)
for _, row in ability_results.iterrows():
strength = row['ability_strength']
estimate = row['adjusted_estimate']
ci_lower = row['ci_lower']
ci_upper = row['ci_upper']
unchanged = row['conclusion_unchanged']
print(f"{strength:.1f}\t\t${estimate:.2f}\t\t[${ci_lower:.2f}, ${ci_upper:.2f}]\t{'是' if unchanged else '否'}")
# 多种敏感性方法
multiple_results = education_analyzer.multiple_sensitivity_approaches()
# 可视化综合敏感性分析
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 1. 能力偏误敏感性
axes[0,0].plot(ability_results['ability_strength'], ability_results['adjusted_estimate'], 'b-', linewidth=2)
axes[0,0].fill_between(ability_results['ability_strength'],
ability_results['ci_lower'], ability_results['ci_upper'],
alpha=0.3, color='blue')
axes[0,0].axhline(y=8000, color='red', linestyle='--', linewidth=2, label='真实效应')
axes[0,0].set_xlabel('未观测能力的影响强度')
axes[0,0].set_ylabel('调整后的教育回报 ($)')
axes[0,0].set_title('能力偏误敏感性分析')
axes[0,0].legend()
axes[0,0].grid(True, alpha=0.3)
# 2. 标准化偏误分析
bias_results = multiple_results['bias_analysis']
axes[0,1].plot(bias_results['delta'], bias_results['adjusted_effect'], 'g-', linewidth=2)
significant_region = bias_results[bias_results['significant']]
if len(significant_region) > 0:
axes[0,1].fill_between(significant_region['delta'],
significant_region['adjusted_effect'],
alpha=0.3, color='green')
axes[0,1].axhline(y=0, color='black', linestyle='-', alpha=0.5)
axes[0,1].set_xlabel('标准化偏误 (δ)')
axes[0,1].set_ylabel('调整后的处理效应')
axes[0,1].set_title('标准化偏误敏感性分析')
axes[0,1].grid(True, alpha=0.3)
# 3. 偏误方向分析
bias_directions = ability_results['bias_direction'].value_counts()
axes[1,0].pie(bias_directions.values, labels=bias_directions.index, autopct='%1.1f%%',
colors=['lightcoral', 'lightblue'])
axes[1,0].set_title('偏误方向分布')
# 4. 结论稳健性总结
conclusion_robust = ability_results['conclusion_unchanged'].mean()
robustness_data = [conclusion_robust, 1 - conclusion_robust]
axes[1,1].bar(['结论不变', '结论改变'], robustness_data,
color=['lightgreen', 'lightcoral'], alpha=0.7)
axes[1,1].set_ylabel('比例')
axes[1,1].set_title('结论稳健性总结')
for i, v in enumerate(robustness_data):
axes[1,1].text(i, v + 0.01, f'{v:.1%}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 敏感性分析总结
print("\n=== 教育回报敏感性分析总结 ===")
critical_strength = ability_results[~ability_results['conclusion_unchanged']]['ability_strength'].min()
if not np.isnan(critical_strength):
print(f"未观测能力需要达到 {critical_strength:.1f} 倍基准影响才能改变因果结论")
else:
print("在分析的能力影响范围内,教育回报的因果结论保持稳健")
bias_critical = multiple_results['bias_analysis'][
~multiple_results['bias_analysis']['conclusion_unchanged']
]['delta'].abs().min()
if not np.isnan(bias_critical):
print(f"标准化偏误需要达到 {bias_critical:.2f} 标准差才能改变统计显著性")
print(f"\n总体评估:")
if critical_strength > 0.5 and bias_critical > 0.3:
print("教育回报估计较为稳健,需要较强的未观测混淆才能推翻结论")
elif critical_strength > 0.3 or bias_critical > 0.2:
print("教育回报估计中等稳健,需要中等强度的未观测混淆才能推翻结论")
else:
print("教育回报估计敏感性较高,较弱的未观测混淆可能改变结论")
实例分析洞察
通过系统的敏感性分析,我们获得了关于教育回报估计稳健性的重要洞察:
- 偏误方向:未观测能力主要导致教育回报的高估
- 稳健性阈值:需要中等偏强的能力影响才能改变因果结论
- 政策含义:教育回报的结论相对稳健,但决策时仍需谨慎
- 研究改进:未来研究应致力于更好地测量能力相关变量
VI. 多变量敏感性分析
处理多重未观测混淆
在现实研究中,我们通常面临多个未观测混淆变量。多变量敏感性分析扩展了单变量方法,同时考虑多个未观测混淆的影响。
# 多变量敏感性分析
class MultivariateSensitivityAnalysis:
"""多变量敏感性分析"""
def __init__(self, data, treatment_var, outcome_var, observed_covariates):
self.data = data
self.treatment_var = treatment_var
self.outcome_var = outcome_var
self.observed_covariates = observed_covariates
def simulate_unobserved_confounders(self, n_confounders=2, correlation_strength=0.5):
"""模拟未观测混淆变量的影响"""
results = []
# 基准模型(无未观测混淆)
baseline_formula = f"{self.outcome_var} ~ {self.treatment_var} + " + " + ".join(self.observed_covariates)
baseline_model = sm.OLS.from_formula(baseline_formula, data=self.data).fit()
baseline_effect = baseline_model.params[self.treatment_var]
baseline_se = baseline_model.bse[self.treatment_var]
# 模拟不同强度的未观测混淆
strength_range = np.linspace(0, 1, 6)
for strength in strength_range:
if strength == 0:
# 无未观测混淆
adjusted_effect = baseline_effect
adjusted_se = baseline_se
else:
# 模拟未观测混淆的影响
# 简化方法:根据混淆强度调整估计和标准误
confounding_bias = strength * 0.3 * baseline_effect # 假设偏误与效应大小相关
adjusted_effect = baseline_effect - confounding_bias
adjusted_se = baseline_se * (1 + 0.2 * strength) # 标准误增加
# 置信区间
ci_lower = adjusted_effect - 1.96 * adjusted_se
ci_upper = adjusted_effect + 1.96 * adjusted_se
results.append({
'confounding_strength': strength,
'n_confounders': n_confounders,
'adjusted_effect': adjusted_effect,
'adjusted_se': adjusted_se,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'bias_percentage': (baseline_effect - adjusted_effect) / baseline_effect * 100
})
return pd.DataFrame(results)
def sensitivity_contour_plot(self, strength_range=np.linspace(0, 1, 11),
n_confounders_range=[1, 2, 3, 4]):
"""创建敏感性等值线图"""
# 计算基准效应
baseline_formula = f"{self.outcome_var} ~ {self.treatment_var} + " + " + ".join(self.observed_covariates)
baseline_model = sm.OLS.from_formula(baseline_formula, data=self.data).fit()
baseline_effect = baseline_model.params[self.treatment_var]
# 创建网格
X, Y = np.meshgrid(strength_range, n_confounders_range)
Z = np.zeros_like(X) # 存储调整后效应
for i, strength in enumerate(strength_range):
for j, n_conf in enumerate(n_confounders_range):
# 计算调整后效应
bias_per_confounder = strength * 0.15 * baseline_effect
total_bias = n_conf * bias_per_confounder
Z[j, i] = baseline_effect - total_bias
return X, Y, Z, baseline_effect
# 应用多变量敏感性分析
multivariate_analyzer = MultivariateSensitivityAnalysis(
education_data, 'education', 'earnings',
['family_income', 'parent_education', 'cognitive_skill']
)
# 模拟不同数量的未观测混淆
multivariate_results = []
for n_conf in [1, 2, 3]:
results = multivariate_analyzer.simulate_unobserved_confounders(n_confounders=n_conf)
multivariate_results.append(results)
# 合并结果
all_results = pd.concat(multivariate_results, ignore_index=True)
print("多变量敏感性分析结果:")
print("混淆变量数\t混淆强度\t调整后效应\t偏误百分比")
print("-" * 60)
for n_conf in [1, 2, 3]:
conf_results = all_results[all_results['n_confounders'] == n_conf]
for _, row in conf_results.iterrows():
strength = row['confounding_strength']
effect = row['adjusted_effect']
bias_pct = row['bias_percentage']
print(f"{n_conf}\t\t{strength:.1f}\t\t${effect:.2f}\t\t{bias_pct:.1f}%")
# 创建敏感性等值线图
X, Y, Z, baseline_effect = multivariate_analyzer.sensitivity_contour_plot()
plt.figure(figsize=(10, 6))
contour = plt.contourf(X, Y, Z, levels=20, cmap='RdYlBu_r')
plt.colorbar(contour, label='调整后的教育回报 ($)')
# 添加等值线标签
contour_lines = plt.contour(X, Y, Z, levels=10, colors='black', alpha=0.5)
plt.clabel(contour_lines, inline=True, fontsize=8, fmt='%.0f')
plt.axhline(y=1, color='white', linestyle='--', alpha=0.7)
plt.axhline(y=2, color='white', linestyle='--', alpha=0.7)
plt.axhline(y=3, color='white', linestyle='--', alpha=0.7)
plt.xlabel('单个混淆变量的影响强度')
plt.ylabel('未观测混淆变量数量')
plt.title('多变量敏感性分析: 教育回报的稳健性')
plt.grid(True, alpha=0.3)
plt.show()
# 敏感性分析总结表
def create_sensitivity_summary_table(analysis_results):
"""创建敏感性分析总结表"""
summary_data = []
for n_conf in [1, 2, 3]:
conf_results = analysis_results[analysis_results['n_confounders'] == n_conf]
# 找到结论改变的临界点
critical_point = conf_results[
conf_results['adjusted_effect'] < 8000 * 0.8 # 效应减少20%以上
]
if len(critical_point) > 0:
critical_strength = critical_point['confounding_strength'].min()
else:
critical_strength = np.nan
# 最大偏误
max_bias = conf_results['bias_percentage'].max()
summary_data.append({
'未观测混淆数量': n_conf,
'临界影响强度': f"{critical_strength:.2f}" if not np.isnan(critical_strength) else ">1.0",
'最大偏误百分比': f"{max_bias:.1f}%",
'稳健性评估': (
"高" if critical_strength > 0.7 else
"中等" if critical_strength > 0.4 else
"低"
) if not np.isnan(critical_strength) else "非常高"
})
return pd.DataFrame(summary_data)
# 显示总结表
sensitivity_summary = create_sensitivity_summary_table(all_results)
print("\n多变量敏感性分析总结:")
print(sensitivity_summary.to_string(index=False))
多变量敏感性分析的价值
多变量敏感性分析提供了更全面的稳健性评估:
分析维度 | 单变量分析 | 多变量分析 | 优势 |
---|---|---|---|
混淆数量 | 假设单个未观测混淆 | 考虑多个未观测混淆 | 更现实的情景建模 |
交互效应 | 忽略混淆间交互 | 可包含交互效应 | 更完整的偏误评估 |
结果解释 | 相对简单 | 需要多维思考 | 提供更丰富的洞察 |
决策支持 | 有限的场景 | 多种情景分析 | 更好的决策依据 |
VII. 敏感性分析的最佳实践
实施敏感性分析的完整流程
基于理论研究和实践经验,我们总结出敏感性分析的最佳实践流程:
# 敏感性分析检查表
def sensitivity_analysis_checklist():
"""敏感性分析实施检查表"""
checklist = {
'研究设计阶段': [
'明确关键因果假设',
'识别潜在的未观测混淆',
'选择适当的敏感性分析方法',
'预设敏感性分析计划'
],
'数据分析阶段': [
'计算基准因果效应估计',
'选择合理的敏感性参数范围',
'执行多种敏感性分析方法',
'检查方法间的一致性'
],
'结果解释阶段': [
'确定临界敏感性参数值',
'评估现实中的合理性',
'考虑偏误的方向和大小',
'综合多种方法的结果'
],
'报告阶段': [
'透明报告所有敏感性分析',
'用直观方式展示结果',
'讨论结果的现实意义',
'承认分析的局限性'
]
}
return checklist
# 显示检查表
checklist = sensitivity_analysis_checklist()
print("敏感性分析实施检查表:")
print("=" * 50)
for stage, items in checklist.items():
print(f"\n{stage}:")
for i, item in enumerate(items, 1):
print(f" {i}. {item}")
# 敏感性分析报告模板
def generate_sensitivity_report(analysis_name, baseline_estimate, sensitivity_results, critical_values):
"""生成敏感性分析报告"""
report = f"""
敏感性分析报告: {analysis_name}
{'=' * 50}
1. 基准分析结果
- 因果效应估计: {baseline_estimate['estimate']:.3f}
- 95%置信区间: [{baseline_estimate['ci_lower']:.3f}, {baseline_estimate['ci_upper']:.3f}]
- P值: {baseline_estimate['p_value']:.4f}
2. 敏感性分析摘要
- 使用的敏感性方法: {len(sensitivity_results)} 种
- 结论改变的临界条件:
"""
for method, critical_value in critical_values.items():
report += f" - {method}: {critical_value}\n"
report += f"""
3. 稳健性评估
- 总体稳健性: {'高' if len(critical_values) > 0 and all('需要强' in str(cv) for cv in critical_values.values()) else '中等'}
- 主要威胁: 未观测混淆变量
4. 建议与限制
- 结论在 {', '.join([f'{method}(>{value})' for method, value in critical_values.items()])} 条件下稳健
- 未来研究应致力于测量关键的未观测变量
- 决策时应考虑敏感性分析揭示的不确定性
"""
return report
# 生成教育回报的敏感性报告
education_baseline = estimate_education_effect(education_data,
['family_income', 'parent_education', 'cognitive_skill'])
critical_values_summary = {
'能力偏误强度': '> 0.6倍基准影响',
'标准化偏误': '> 0.35标准差'
}
sensitivity_report = generate_sensitivity_report(
"教育对收入的影响",
education_baseline,
['Rosenbaum边界', '标准化偏误', '多变量分析'],
critical_values_summary
)
print(sensitivity_report)
常见陷阱与避免策略
敏感性分析实施中的常见问题及解决方案:
常见陷阱 | 问题描述 | 后果 | 避免策略 |
---|---|---|---|
参数范围不合理 | 敏感性参数范围过窄或过宽 | 结论过于乐观或悲观 | 基于文献和实质知识设定范围 |
方法单一化 | 只使用一种敏感性方法 | 结论可能方法依赖 | 使用多种互补的方法 |
忽略偏误方向 | 只考虑单向偏误 | 低估真正的不确定性 | 分析正负两个方向的偏误 |
过度解释 | 将敏感性分析视为证明工具 | 误导性结论 | 明确敏感性分析的探索性质 |
报告不完整 | 只报告有利的结果 | 结论可信度降低 | 透明报告所有分析结果 |
敏感性分析的局限性
尽管敏感性分析是强大的工具,但它也有固有的局限性:
- 点赞
- 收藏
- 关注作者
评论(0)