观察性研究中的因果推断最佳实践总结

举报
数字扫地僧 发表于 2025/09/30 23:40:02 2025/09/30
【摘要】 观察性研究挑战混杂偏差选择偏差测量误差因果推断方法倾向得分方法工具变量法回归断点设计双重差分法因果效应估计有效性验证可靠因果结论 I. 因果推断的理论基础与核心概念 1.1 潜在结果框架潜在结果框架(Potential Outcomes Framework),又称Rubin因果模型,是现代因果推断的基石。该框架定义了个体在接受处理和不接受处理两种情况下的潜在结果。设Yi(1)Y_i(1)Yi...
观察性研究挑战
混杂偏差
选择偏差
测量误差
因果推断方法
倾向得分方法
工具变量法
回归断点设计
双重差分法
因果效应估计
有效性验证
可靠因果结论

I. 因果推断的理论基础与核心概念

1.1 潜在结果框架

潜在结果框架(Potential Outcomes Framework),又称Rubin因果模型,是现代因果推断的基石。该框架定义了个体在接受处理和不接受处理两种情况下的潜在结果。

Yi(1)Y_i(1)表示个体ii接受处理的潜在结果,Yi(0)Y_i(0)表示未接受处理的潜在结果。个体因果效应定义为:

τi=Yi(1)Yi(0)\tau_i = Y_i(1) - Y_i(0)

然而,根本性问题(Fundamental Problem of Causal Inference)在于我们只能观察到一个潜在结果,另一个是反事实的。

1.2 平均处理效应与识别假设

在无法获知个体因果效应的情况下,我们通常关注平均处理效应(Average Treatment Effect, ATE):

ATE=E[Y(1)Y(0)]ATE = E[Y(1) - Y(0)]

为了实现ATE的识别,需要以下关键假设:

假设名称 数学表述 实质含义
条件独立假设 (Y(1),Y(0))TX(Y(1), Y(0)) \perp T \mid X 给定协变量X,处理分配与潜在结果独立
重叠假设 0<P(T=1X)<10 < P(T=1 \mid X) < 1 每个个体都有接受处理和不处理的可能
一致性假设 Y=TY(1)+(1T)Y(0)Y = T \cdot Y(1) + (1-T) \cdot Y(0) 观察结果等于对应处理状态下的潜在结果
无干扰假设 Yi(T1,...,Tn)=Yi(Ti)Y_i(T_1, ..., T_n) = Y_i(T_i) 个体的结果不受其他个体处理状态的影响

1.3 因果图与结构因果模型

因果图提供了表示变量间因果关系的直观工具。在因果图中,节点表示变量,有向边表示因果关系。通过因果图可以系统分析混杂因素并指导分析方法的选择。

结构因果模型(Structural Causal Models, SCM)为因果关系提供了数学形式化表达:

Y=f(T,X,UY)Y = f(T, X, U_Y)

T=g(X,UT)T = g(X, U_T)

其中UYU_YUTU_T为未观测变量。

因果推断理论基础
核心识别假设
无混杂
条件独立
重叠
共同支持
一致性
结果明确
无干扰
SUTVA
识别假设
潜在结果框架
因果图模型
结构因果模型

II. 因果推断方法体系

2.1 主要方法分类与比较

观察性研究中常用的因果推断方法可根据其原理和应用场景分为以下几类:

方法类别 代表方法 适用场景 关键假设
回归调整 多元线性回归、逻辑回归 线性关系,无严重混杂 线性函数形式正确
倾向得分方法 倾向得分匹配、加权、分层 中等维度混杂 倾向得分模型正确
双重稳健方法 增强逆概率加权 模型双重保护 倾向得分或结果模型之一正确
工具变量法 2SLS、GMM 存在未观测混杂 有效的工具变量
断点回归设计 精确断点、模糊断点 处理分配有明确断点 断点附近近似随机
双重差分法 标准DID、事件研究法 面板数据,处理时点不同 平行趋势假设

2.2 方法选择指南

选择适当的因果推断方法需考虑数据特征和研究问题:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import statsmodels.api as sm
import warnings
warnings.filterwarnings('ignore')

# 创建方法选择辅助函数
def method_selection_guide(data_type, confounding_type, design_type):
    """
    根据研究特征推荐因果推断方法
    """
    recommendations = []
    
    # 基于数据类型的推荐
    if data_type == "横截面":
        recommendations.append("倾向得分方法")
        recommendations.append("回归调整")
        recommendations.append("工具变量法(如可用)")
    elif data_type == "面板数据":
        recommendations.append("双重差分法")
        recommendations.append("固定效应模型")
    
    # 基于混杂类型的推荐
    if confounding_type == "观测混杂":
        recommendations.append("倾向得分加权")
        recommendations.append("匹配方法")
    elif confounding_type == "未观测混杂":
        recommendations.append("工具变量法")
        recommendations.append("固定效应模型(面板数据)")
    
    # 基于设计类型的推荐
    if design_type == "自然实验":
        recommendations.append("断点回归设计")
        recommendations.append("工具变量法")
    
    # 去重并排序
    recommendations = sorted(list(set(recommendations)))
    
    return recommendations

# 测试方法选择指南
print("=== 因果推断方法选择指南 ===")
test_cases = [
    {"data_type": "横截面", "confounding_type": "观测混杂", "design_type": "标准观察"},
    {"data_type": "面板数据", "confounding_type": "未观测混杂", "design_type": "自然实验"}
]

for i, case in enumerate(test_cases):
    recs = method_selection_guide(**case)
    print(f"案例 {i+1}: {case}")
    print(f"推荐方法: {', '.join(recs)}\n")

III. 实例分析:高等教育对收入的影响

3.1 研究背景与数据生成

我们考虑一个经典问题:完成大学教育对个人收入的影响。由于大学教育不是随机分配的,直接比较大学毕业生和高中毕业生的收入会产生严重偏差(能力偏差、家庭背景偏差等)。

# 设置随机种子确保结果可重现
np.random.seed(2024)

# 生成模拟数据
n = 5000  # 样本量

# 生成混杂变量(观测和未观测)
ability = np.random.normal(0, 1, n)  # 能力(标准化)
family_income = np.random.normal(0, 1, n)  # 家庭收入(标准化)
male = np.random.binomial(1, 0.5, n)  # 性别
urban = np.random.binomial(1, 0.6, n)  # 城市居住

# 未观测混杂(实践中不可得)
motivation = np.random.normal(0, 1, n)

# 生成大学教育处理变量(非随机分配)
# 大学入学受能力、家庭收入、动机等因素影响
college_prob = 1 / (1 + np.exp(-(0.8 * ability + 0.6 * family_income + 0.5 * motivation + 0.3 * urban - 0.5)))
college = np.random.binomial(1, college_prob, n)

# 生成收入结果(受教育、能力、家庭背景等影响)
# 真实因果效应:大学教育使年收入增加 15,000元
true_effect = 15000

income = (
    30000 +  # 基础收入
    true_effect * college +  # 教育因果效应
    8000 * ability +  # 能力对收入的影响
    5000 * family_income +  # 家庭背景的影响
    4000 * male +  # 性别差异
    3000 * urban +  # 城乡差异
    6000 * motivation +  # 动机的影响
    np.random.normal(0, 5000, n)  # 随机误差
)

# 创建数据集
data = pd.DataFrame({
    'college': college,
    'income': income,
    'ability': ability,
    'family_income': family_income,
    'male': male,
    'urban': urban,
    'motivation': motivation  # 在实践中通常不可观测
})

print("=== 数据生成完成 ===")
print(f"样本量: {len(data)}")
print(f"大学生比例: {data['college'].mean():.3f}")
print(f"大学生平均收入: {data[data['college'] == 1]['income'].mean():.2f}")
print(f"高中生平均收入: {data[data['college'] == 0]['income'].mean():.2f}")
print(f"原始差异: {data[data['college'] == 1]['income'].mean() - data[data['college'] == 0]['income'].mean():.2f}")
print(f"真实因果效应: {true_effect:.2f}")

3.2 简单比较的偏差

首先展示简单比较大学毕业生和高中毕业生收入的偏差:

# 简单比较分析
simple_diff = data[data['college'] == 1]['income'].mean() - data[data['college'] == 0]['income'].mean()
bias = simple_diff - true_effect

print("=== 简单比较分析 ===")
print(f"大学生平均收入: {data[data['college'] == 1]['income'].mean():.2f}")
print(f"高中生平均收入: {data[data['college'] == 0]['income'].mean():.2f}")
print(f"简单收入差异: {simple_diff:.2f}")
print(f"真实因果效应: {true_effect:.2f}")
print(f"估计偏差: {bias:.2f}")
print(f"偏差百分比: {(bias/true_effect)*100:.1f}%")

# 可视化混杂因素分布差异
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 能力分布比较
axes[0, 0].hist(data[data['college'] == 0]['ability'], alpha=0.7, label='高中', bins=30, color='skyblue')
axes[0, 0].hist(data[data['college'] == 1]['ability'], alpha=0.7, label='大学', bins=30, color='coral')
axes[0, 0].set_xlabel('能力')
axes[0, 0].set_ylabel('频数')
axes[0, 0].set_title('能力分布比较')
axes[0, 0].legend()

# 家庭收入分布比较
axes[0, 1].hist(data[data['college'] == 0]['family_income'], alpha=0.7, label='高中', bins=30, color='skyblue')
axes[0, 1].hist(data[data['college'] == 1]['family_income'], alpha=0.7, label='大学', bins=30, color='coral')
axes[0, 1].set_xlabel('家庭收入')
axes[0, 1].set_ylabel('频数')
axes[0, 1].set_title('家庭收入分布比较')
axes[0, 1].legend()

# 动机分布比较
axes[1, 0].hist(data[data['college'] == 0]['motivation'], alpha=0.7, label='高中', bins=30, color='skyblue')
axes[1, 0].hist(data[data['college'] == 1]['motivation'], alpha=0.7, label='大学', bins=30, color='coral')
axes[1, 0].set_xlabel('动机')
axes[1, 0].set_ylabel('频数')
axes[1, 0].set_title('动机分布比较(未观测混杂)')
axes[1, 0].legend()

# 城乡分布比较
urban_comparison = pd.crosstab(data['college'], data['urban'], normalize='index')
urban_comparison.plot(kind='bar', ax=axes[1, 1], color=['lightblue', 'darkblue'])
axes[1, 1].set_xlabel('教育水平')
axes[1, 1].set_ylabel('比例')
axes[1, 1].set_title('城乡分布比较')
axes[1, 1].legend(['农村', '城市'])
axes[1, 1].set_xticklabels(['高中', '大学'], rotation=0)

plt.tight_layout()
plt.show()
大学教育
个人收入
个人能力
家庭背景
个人动机
性别
居住地

IV. 倾向得分方法实践

4.1 倾向得分估计与平衡检查

倾向得分(Propensity Score)定义为给定协变量情况下接受处理的条件概率:

e(X)=P(T=1X)e(X) = P(T=1|X)

我们使用逻辑回归估计倾向得分,并检查协变量平衡:

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, roc_auc_score

# 准备协变量矩阵(不包括未观测的动机变量)
covariates = ['ability', 'family_income', 'male', 'urban']
X_ps = data[covariates]
T = data['college']

# 估计倾向得分
ps_model = LogisticRegression(random_state=42)
ps_model.fit(X_ps, T)
data['propensity_score'] = ps_model.predict_proba(X_ps)[:, 1]

print("=== 倾向得分模型评估 ===")
print(f"模型AUC: {roc_auc_score(T, data['propensity_score']):.4f}")
print(f"预测准确率: {accuracy_score(T, ps_model.predict(X_ps)):.4f}")

# 倾向得分分布可视化
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.hist(data[data['college'] == 0]['propensity_score'], 
         alpha=0.7, label='高中', bins=30, color='skyblue')
plt.hist(data[data['college'] == 1]['propensity_score'], 
         alpha=0.7, label='大学', bins=30, color='coral')
plt.xlabel('倾向得分')
plt.ylabel('频数')
plt.title('倾向得分分布')
plt.legend()

plt.subplot(1, 2, 2)
# 共同支持区域检查
min_ps = data['propensity_score'].min()
max_ps = data['propensity_score'].max()
x_range = np.linspace(min_ps, max_ps, 100)

treatment_density = []
control_density = []

for x in x_range:
    treatment_density.append(np.sum((data['college'] == 1) & 
                                   (data['propensity_score'] >= x - 0.01) & 
                                   (data['propensity_score'] < x + 0.01)))
    control_density.append(np.sum((data['college'] == 0) & 
                                 (data['propensity_score'] >= x - 0.01) & 
                                 (data['propensity_score'] < x + 0.01)))

plt.plot(x_range, treatment_density, label='大学', color='coral')
plt.plot(x_range, control_density, label='高中', color='skyblue')
plt.xlabel('倾向得分')
plt.ylabel('密度')
plt.title('共同支持区域检查')
plt.legend()

plt.tight_layout()
plt.show()

# 协变量平衡检查
def check_balance(data, covariates, weight_col=None):
    """
    检查协变量在处理组和对照组之间的平衡
    """
    balance_results = []
    
    for covar in covariates:
        if weight_col is None:
            # 未加权的情况
            treat_mean = data[data['college'] == 1][covar].mean()
            control_mean = data[data['college'] == 0][covar].mean()
            treat_std = data[data['college'] == 1][covar].std()
            control_std = data[data['college'] == 0][covar].std()
        else:
            # 加权的情况
            treat_weights = data[data['college'] == 1][weight_col]
            control_weights = data[data['college'] == 0][weight_col]
            
            treat_mean = np.average(data[data['college'] == 1][covar], weights=treat_weights)
            control_mean = np.average(data[data['college'] == 0][covar], weights=control_weights)
            
            treat_var = np.cov(data[data['college'] == 1][covar], aweights=treat_weights)
            control_var = np.cov(data[data['college'] == 0][covar], aweights=control_weights)
            treat_std = np.sqrt(treat_var)
            control_std = np.sqrt(control_var)
        
        # 标准化均值差异
        pooled_std = np.sqrt((treat_std**2 + control_std**2) / 2)
        smd = abs(treat_mean - control_mean) / pooled_std
        
        balance_results.append({
            '变量': covar,
            '处理组均值': treat_mean,
            '对照组均值': control_mean,
            '标准化差异': smd
        })
    
    return pd.DataFrame(balance_results)

# 检查加权前的平衡性
balance_before = check_balance(data, covariates)
print("\n=== 加权前协变量平衡性 ===")
print(balance_before.round(4))

4.2 逆概率加权处理效应估计

逆概率加权(Inverse Probability Weighting, IPW)通过加权使得处理组和对照组在协变量分布上相似:

# 计算逆概率权重
data['ipw'] = np.where(data['college'] == 1, 
                       1 / data['propensity_score'], 
                       1 / (1 - data['propensity_score']))

# 标准化权重(稳定估计)
data['ipw_stabilized'] = np.where(data['college'] == 1,
                                 data['propensity_score'].mean() / data['propensity_score'],
                                 (1 - data['propensity_score'].mean()) / (1 - data['propensity_score']))

# 检查加权后的平衡性
balance_after_ipw = check_balance(data, covariates, weight_col='ipw_stabilized')
print("\n=== 逆概率加权后协变量平衡性 ===")
print(balance_after_ipw.round(4))

# IPW估计平均处理效应
def ipw_ate(data, outcome, treatment, weight_col):
    """
    使用逆概率加权估计平均处理效应
    """
    weighted_treat = np.average(data[data[treatment] == 1][outcome], 
                               weights=data[data[treatment] == 1][weight_col])
    weighted_control = np.average(data[data[treatment] == 0][outcome], 
                                 weights=data[data[treatment] == 0][weight_col])
    
    ate = weighted_treat - weighted_control
    
    # 计算标准误(简化版)
    n_treat = data[treatment].sum()
    n_control = len(data) - n_treat
    
    var_treat = np.var(data[data[treatment] == 1][outcome]) / n_treat
    var_control = np.var(data[data[treatment] == 0][outcome]) / n_control
    se = np.sqrt(var_treat + var_control)
    
    return ate, se

# 估计ATE
ipw_ate_est, ipw_se = ipw_ate(data, 'income', 'college', 'ipw_stabilized')
ipw_ci_lower = ipw_ate_est - 1.96 * ipw_se
ipw_ci_upper = ipw_ate_est + 1.96 * ipw_se

print(f"\n=== 逆概率加权ATE估计 ===")
print(f"ATE估计值: {ipw_ate_est:.2f}")
print(f"标准误: {ipw_se:.2f}")
print(f"95%置信区间: [{ipw_ci_lower:.2f}, {ipw_ci_upper:.2f}]")
print(f"真实ATE: {true_effect:.2f}")

4.3 倾向得分匹配

倾向得分匹配(Propensity Score Matching)将处理组个体与倾向得分相似的对照组个体进行匹配:

from sklearn.neighbors import NearestNeighbors

def propensity_score_matching(data, treatment_col, ps_col, caliper=0.1):
    """
    实施倾向得分匹配
    """
    treated = data[data[treatment_col] == 1].copy()
    control = data[data[treatment_col] == 0].copy()
    
    # 使用最近邻匹配
    nbrs = NearestNeighbors(n_neighbors=1, metric='euclidean')
    nbrs.fit(control[[ps_col]].values)
    
    distances, indices = nbrs.kneighbors(treated[[ps_col]].values)
    
    # 应用卡钳值
    matched_pairs = []
    for i, (dist, idx) in enumerate(zip(distances, indices)):
        if dist[0] <= caliper:
            treated_idx = treated.index[i]
            control_idx = control.index[idx[0]]
            matched_pairs.append((treated_idx, control_idx, dist[0]))
    
    # 创建匹配后数据集
    matched_treated_indices = [pair[0] for pair in matched_pairs]
    matched_control_indices = [pair[1] for pair in matched_pairs]
    
    matched_data = pd.concat([
        data.loc[matched_treated_indices],
        data.loc[matched_control_indices]
    ])
    
    return matched_data, matched_pairs

# 执行倾向得分匹配
matched_data, pairs = propensity_score_matching(data, 'college', 'propensity_score', caliper=0.05)

print(f"\n=== 倾向得分匹配结果 ===")
print(f"原始处理组样本数: {data['college'].sum()}")
print(f"匹配后处理组样本数: {matched_data['college'].sum()}")
print(f"匹配率: {matched_data['college'].sum() / data['college'].sum():.3f}")

# 匹配后平衡性检查
balance_after_matching = check_balance(matched_data, covariates)
print("\n=== 匹配后协变量平衡性 ===")
print(balance_after_matching.round(4))

# 匹配后ATE估计
matched_ate = (matched_data[matched_data['college'] == 1]['income'].mean() - 
               matched_data[matched_data['college'] == 0]['income'].mean())

print(f"\n=== 匹配后ATE估计 ===")
print(f"ATE估计值: {matched_ate:.2f}")
print(f"真实ATE: {true_effect:.2f}")

# 可视化匹配效果
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# 匹配前后能力分布比较
axes[0].hist(data[data['college'] == 0]['ability'], alpha=0.5, label='高中-匹配前', bins=20, color='lightblue')
axes[0].hist(data[data['college'] == 1]['ability'], alpha=0.5, label='大学-匹配前', bins=20, color='lightcoral')
axes[0].hist(matched_data[matched_data['college'] == 0]['ability'], alpha=0.8, 
            label='高中-匹配后', bins=20, color='blue', histtype='step', linewidth=2)
axes[0].hist(matched_data[matched_data['college'] == 1]['ability'], alpha=0.8, 
            label='大学-匹配后', bins=20, color='red', histtype='step', linewidth=2)
axes[0].set_xlabel('能力')
axes[0].set_ylabel('频数')
axes[0].set_title('匹配前后能力分布比较')
axes[0].legend()

# 标准化均值差异比较
balance_comparison = pd.merge(balance_before, balance_after_matching, on='变量', suffixes=('_前', '_后'))
variables = balance_comparison['变量']
smd_before = balance_comparison['标准化差异_前']
smd_after = balance_comparison['标准化差异_后']

x = np.arange(len(variables))
width = 0.35

axes[1].bar(x - width/2, smd_before, width, label='匹配前', color='lightblue')
axes[1].bar(x + width/2, smd_after, width, label='匹配后', color='lightcoral')
axes[1].set_xlabel('变量')
axes[1].set_ylabel('标准化均值差异')
axes[1].set_title('匹配前后标准化均值差异比较')
axes[1].set_xticks(x)
axes[1].set_xticklabels(variables)
axes[1].legend()
axes[1].axhline(y=0.1, color='red', linestyle='--', alpha=0.7, label='平衡阈值 (0.1)')

plt.tight_layout()
plt.show()
原始数据
估计倾向得分
选择PS方法
逆概率加权
倾向得分匹配
倾向得分分层
计算IPW权重
最近邻匹配
按PS分层
加权平衡检查
匹配平衡检查
分层平衡检查
IPW ATE估计
匹配ATE估计
分层ATE估计
因果效应估计
敏感性分析
稳健性结论

V. 双重稳健估计与高级方法

5.1 双重稳健估计

双重稳健估计(Doubly Robust Estimation)结合了倾向得分模型和结果回归模型,只要其中一个模型正确,就能得到一致的估计:

from sklearn.linear_model import LinearRegression

def doubly_robust_estimation(data, outcome, treatment, covariates, ps_col):
    """
    双重稳健估计
    """
    # 拟合结果回归模型
    # 处理组结果模型
    treated_data = data[data[treatment] == 1]
    outcome_model_treated = LinearRegression()
    outcome_model_treated.fit(treated_data[covariates], treated_data[outcome])
    
    # 对照组结果模型
    control_data = data[data[treatment] == 0]
    outcome_model_control = LinearRegression()
    outcome_model_control.fit(control_data[covariates], control_data[outcome])
    
    # 预测潜在结果
    data['Y1_pred'] = outcome_model_treated.predict(data[covariates])
    data['Y0_pred'] = outcome_model_control.predict(data[covariates])
    
    # 双重稳健估计量
    dr_estimates = []
    
    for i, row in data.iterrows():
        if row[treatment] == 1:
            dr_est = (row[outcome] - row['Y1_pred']) / row[ps_col] + row['Y1_pred']
        else:
            dr_est = row['Y0_pred'] - (row[outcome] - row['Y0_pred']) / (1 - row[ps_col])
        dr_estimates.append(dr_est)
    
    data['dr_estimate'] = dr_estimates
    
    # 计算ATE
    dr_ate = np.mean(data['dr_estimate'])
    
    return dr_ate, data

# 执行双重稳健估计
dr_ate, dr_data = doubly_robust_estimation(data, 'income', 'college', covariates, 'propensity_score')

print("=== 双重稳健估计结果 ===")
print(f"DR ATE估计值: {dr_ate:.2f}")
print(f"真实ATE: {true_effect:.2f}")

# 比较不同方法的估计结果
methods_comparison = pd.DataFrame({
    '方法': ['简单比较', '逆概率加权', '倾向得分匹配', '双重稳健'],
    'ATE估计': [simple_diff, ipw_ate_est, matched_ate, dr_ate],
    '偏差': [simple_diff - true_effect, ipw_ate_est - true_effect, 
            matched_ate - true_effect, dr_ate - true_effect],
    '偏差百分比': [(simple_diff - true_effect)/true_effect*100, 
                 (ipw_ate_est - true_effect)/true_effect*100,
                 (matched_ate - true_effect)/true_effect*100,
                 (dr_ate - true_effect)/true_effect*100]
})

print("\n=== 方法比较 ===")
print(methods_comparison.round(2))

5.2 机器学习在因果推断中的应用

现代因果推断越来越多地利用机器学习方法处理高维数据和复杂函数形式:

from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.model_selection import cross_val_score

def ml_doubly_robust(data, outcome, treatment, covariates, ps_col):
    """
    使用机器学习的双重稳健估计
    """
    # 使用梯度提升树拟合结果模型
    treated_data = data[data[treatment] == 1]
    control_data = data[data[treatment] == 0]
    
    # 处理组结果模型
    outcome_model_treated = GradientBoostingRegressor(random_state=42, n_estimators=100)
    outcome_model_treated.fit(treated_data[covariates], treated_data[outcome])
    
    # 对照组结果模型
    outcome_model_control = GradientBoostingRegressor(random_state=42, n_estimators=100)
    outcome_model_control.fit(control_data[covariates], control_data[outcome])
    
    # 预测潜在结果
    data['Y1_pred_ml'] = outcome_model_treated.predict(data[covariates])
    data['Y0_pred_ml'] = outcome_model_control.predict(data[covariates])
    
    # 交叉验证评估模型性能
    cv_treated = cross_val_score(outcome_model_treated, treated_data[covariates], 
                                treated_data[outcome], cv=5, scoring='r2')
    cv_control = cross_val_score(outcome_model_control, control_data[covariates], 
                                control_data[outcome], cv=5, scoring='r2')
    
    print(f"处理组结果模型R² (CV): {cv_treated.mean():.4f}{cv_treated.std():.4f})")
    print(f"对照组结果模型R² (CV): {cv_control.mean():.4f}{cv_control.std():.4f})")
    
    # 双重稳健估计
    dr_estimates_ml = []
    
    for i, row in data.iterrows():
        if row[treatment] == 1:
            dr_est = (row[outcome] - row['Y1_pred_ml']) / row[ps_col] + row['Y1_pred_ml']
        else:
            dr_est = row['Y0_pred_ml'] - (row[outcome] - row['Y0_pred_ml']) / (1 - row[ps_col])
        dr_estimates_ml.append(dr_est)
    
    data['dr_estimate_ml'] = dr_estimates_ml
    
    # 计算ATE
    dr_ate_ml = np.mean(data['dr_estimate_ml'])
    
    return dr_ate_ml, data

# 执行机器学习双重稳健估计
dr_ate_ml, ml_data = ml_doubly_robust(data, 'income', 'college', covariates, 'propensity_score')

print("\n=== 机器学习双重稳健估计 ===")
print(f"ML-DR ATE估计值: {dr_ate_ml:.2f}")
print(f"真实ATE: {true_effect:.2f}")

# 添加到方法比较
ml_comparison = pd.DataFrame({
    '方法': ['机器学习双重稳健'],
    'ATE估计': [dr_ate_ml],
    '偏差': [dr_ate_ml - true_effect],
    '偏差百分比': [(dr_ate_ml - true_effect)/true_effect*100]
})

methods_comparison = pd.concat([methods_comparison, ml_comparison], ignore_index=True)
print("\n=== 更新后的方法比较 ===")
print(methods_comparison.round(2))

VI. 敏感性分析与稳健性检验

6.1 未观测混杂的敏感性分析

敏感性分析评估结论对未观测混杂的稳健性:

def sensitivity_analysis_rosenbaum(data, outcome, treatment, ps_col, gamma_range=[1, 2, 3, 4, 5]):
    """
    Rosenbaum敏感性分析:评估结论对隐藏偏差的敏感性
    """
    from scipy import stats
    
    # 匹配后的数据(简化处理,使用全部数据)
    treated = data[data[treatment] == 1]
    control = data[data[treatment] == 0]
    
    results = []
    
    for gamma in gamma_range:
        # 计算敏感性分析界限
        # 这里使用简化的Rosenbaum界限计算
        n_treated = len(treated)
        n_control = len(control)
        
        # 计算处理组和对照组的平均结果差异
        mean_diff = treated[outcome].mean() - control[outcome].mean()
        
        # 简化敏感性界限(实际应用应使用更精确的方法)
        upper_bound = mean_diff * gamma
        lower_bound = mean_diff / gamma
        
        results.append({
            'Gamma': gamma,
            '原始效应': mean_diff,
            '敏感性上界': upper_bound,
            '敏感性下界': lower_bound,
            '结论稳健': lower_bound > 0  # 假设我们关心正效应
        })
    
    return pd.DataFrame(results)

# 执行敏感性分析
sensitivity_results = sensitivity_analysis_rosenbaum(data, 'income', 'college', 'propensity_score')
print("=== Rosenbaum敏感性分析 ===")
print(sensitivity_results.round(2))

def sensitivity_analysis_ebalance(data, outcome, treatment, observed_covariates, unobserved_confounding=0.5):
    """
    评估需要多大的未观测混杂才能改变结论
    """
    # 计算观测混杂的联合解释力
    X = data[observed_covariates]
    y = data[outcome]
    
    # 拟合包含协变量的模型
    X_with_const = sm.add_constant(X)
    model = sm.OLS(y, X_with_const).fit()
    r2_observed = model.rsquared
    
    # 计算处理效应的t统计量
    t_observed = (methods_comparison[methods_comparison['方法'] == '双重稳健']['ATE估计'].values[0] / 
                  methods_comparison[methods_comparison['方法'] == '双重稳健']['ATE估计'].values[0].std())
    
    # 简化的敏感性分析(Frank, 2000方法)
    # 需要改变结论的未观测混杂强度
    threshold = (t_observed**2 / (t_observed**2 + len(data) - len(observed_covariates) - 1)) * (1 - r2_observed)
    
    return {
        '观测变量R²': r2_observed,
        '处理效应t值': t_observed,
        '改变结论所需的未观测混杂R²': threshold,
        '结论稳健性': '稳健' if threshold > unobserved_confounding else '敏感'
    }

# 执行e值敏感性分析
evalue_analysis = sensitivity_analysis_ebalance(data, 'income', 'college', covariates)
print("\n=== E值敏感性分析 ===")
for key, value in evalue_analysis.items():
    print(f"{key}: {value}")

6.2 安慰剂检验与证伪检验

def placebo_test(data, outcome, treatment, covariates, n_simulations=1000):
    """
    安慰剂检验:在已知无效应的变量上测试方法
    """
    # 生成安慰剂处理变量(与真实处理无关)
    np.random.seed(42)
    placebo_effects = []
    
    for i in range(n_simulations):
        placebo_treatment = np.random.binomial(1, 0.5, len(data))
        
        # 使用相同的因果推断方法
        placebo_data = data.copy()
        placebo_data['placebo_treatment'] = placebo_treatment
        
        # 估计倾向得分
        ps_model_placebo = LogisticRegression(random_state=42)
        ps_model_placebo.fit(placebo_data[covariates], placebo_treatment)
        placebo_data['ps_placebo'] = ps_model_placebo.predict_proba(placebo_data[covariates])[:, 1]
        
        # 逆概率加权
        placebo_data['ipw_placebo'] = np.where(placebo_data['placebo_treatment'] == 1, 
                                             1 / placebo_data['ps_placebo'], 
                                             1 / (1 - placebo_data['ps_placebo']))
        
        # 标准化权重
        ps_mean = placebo_data['ps_placebo'].mean()
        placebo_data['ipw_stabilized_placebo'] = np.where(
            placebo_data['placebo_treatment'] == 1,
            ps_mean / placebo_data['ps_placebo'],
            (1 - ps_mean) / (1 - placebo_data['ps_placebo'])
        )
        
        # 估计安慰剂效应
        weighted_treat = np.average(placebo_data[placebo_data['placebo_treatment'] == 1][outcome], 
                                   weights=placebo_data[placebo_data['placebo_treatment'] == 1]['ipw_stabilized_placebo'])
        weighted_control = np.average(placebo_data[placebo_data['placebo_treatment'] == 0][outcome], 
                                     weights=placebo_data[placebo_data['placebo_treatment'] == 0]['ipw_stabilized_placebo'])
        
        placebo_effect = weighted_treat - weighted_control
        placebo_effects.append(placebo_effect)
    
    # 计算真实效应在安慰剂分布中的p值
    real_effect = ipw_ate_est
    p_value = np.mean(np.abs(placebo_effects) >= np.abs(real_effect))
    
    return placebo_effects, real_effect, p_value

# 执行安慰剂检验
placebo_effects, real_effect, placebo_p = placebo_test(data, 'income', 'college', covariates, n_simulations=500)

print("=== 安慰剂检验结果 ===")
print(f"真实效应: {real_effect:.2f}")
print(f"安慰剂效应均值: {np.mean(placebo_effects):.2f}")
print(f"安慰剂效应标准差: {np.std(placebo_effects):.2f}")
print(f"安慰剂检验p值: {placebo_p:.4f}")

# 可视化安慰剂检验
plt.figure(figsize=(10, 6))
plt.hist(placebo_effects, bins=50, alpha=0.7, color='gray', label='安慰剂效应分布')
plt.axvline(x=real_effect, color='red', linewidth=2, label=f'真实效应: {real_effect:.2f}')
plt.axvline(x=np.mean(placebo_effects), color='blue', linestyle='--', 
           label=f'安慰剂均值: {np.mean(placebo_effects):.2f}')
plt.xlabel('处理效应')
plt.ylabel('频数')
plt.title('安慰剂检验:真实效应在安慰剂分布中的位置')
plt.legend()
plt.show()
敏感性分析
Rosenbaum界限分析
E值分析
安慰剂检验
评估隐藏偏差
评估未观测混杂强度
验证方法有效性
结论稳健性判断
稳健结论
敏感结论

VII. 最佳实践总结与实施建议

基于我们的分析和实践,我们总结出观察性研究中因果推断的完整工作流程和最佳实践:

7.1 完整工作流程

阶段 核心任务 关键产出
研究设计 明确因果问题,识别处理变量和结果变量 因果图,识别假设清单
数据准备 收集所有相关协变量,处理缺失值 清理后的数据集,变量描述
探索性分析 检查重叠性,评估预处理平衡 平衡性表格,倾向得分分布图
方法实施 选择并实施适当的因果推断方法 倾向得分模型,处理效应估计
平衡验证 检查加权/匹配后协变量平衡 标准化差异比较,平衡诊断图
敏感性分析 评估未观测混杂的影响 敏感性参数,稳健性结论
结果解释 结合领域知识解释因果效应 效应大小,实际意义评估

7.2 实施要点检查表

def best_practices_checklist():
    """
    因果推断最佳实践检查表
    """
    checklist = {
        "研究设计": [
            "是否明确了因果问题?",
            "是否绘制了因果图?",
            "是否列出了所有识别假设?",
            "是否考虑了所有相关混杂因素?"
        ],
        "数据质量": [
            "样本量是否充足?",
            "处理变量定义是否清晰?",
            "结果变量测量是否可靠?",
            "协变量测量是否在处理前?"
        ],
        "分析方法": [
            "是否检查了重叠假设?",
            "是否选择了适当的因果推断方法?",
            "是否验证了模型假设?",
            "是否进行了平衡性检查?"
        ],
        "稳健性检验": [
            "是否进行了敏感性分析?",
            "是否使用了多种估计方法?",
            "是否进行了安慰剂检验?",
            "是否评估了极端值影响?"
        ],
        "结果报告": [
            "是否报告了效应大小和置信区间?",
            "是否讨论了局限性?",
            "是否评估了实际意义?",
            "结论是否谨慎?"
        ]
    }
    
    return checklist

# 生成检查表
checklist = best_practices_checklist()
print("=== 因果推断最佳实践检查表 ===")
for category, items in checklist.items():
    print(f"\n{category}:")
    for item in items:
        print(f"  □ {item}")

7.3 常见陷阱与规避策略

常见陷阱 表现特征 规避策略
遗忘变量偏差 忽略重要混杂因素 绘制完整因果图,进行敏感性分析
选择偏差 样本缺失与处理相关 使用逆概率加权,评估缺失机制
测量误差 变量测量不准确 使用验证数据,进行测量误差校正
模型误设 函数形式不正确 使用机器学习方法,进行模型诊断
过度控制偏差 控制中介变量 基于因果图区分混杂和中介
选择性报告 只报告显著结果 预注册分析计划,报告所有分析
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。