倾向得分匹配全流程详解与实战

举报
数字扫地僧 发表于 2025/09/30 17:55:15 2025/09/30
【摘要】 I. 引言:为什么需要倾向得分匹配 观察性研究中的因果推断挑战在医学、经济学、社会科学等领域,研究者经常面临一个根本性问题:如何从非实验数据中得出因果结论?与随机对照试验不同,观察性研究中处理组和对照组往往在基线特征上存在系统性差异,这导致简单的组间比较会产生有偏估计。举个例子:假设我们想研究大学教育对个人收入的影响。上大学的人和不上大学的人在能力、家庭背景、动机等方面可能存在显著差异。如...

I. 引言:为什么需要倾向得分匹配

观察性研究中的因果推断挑战

在医学、经济学、社会科学等领域,研究者经常面临一个根本性问题:如何从非实验数据中得出因果结论?与随机对照试验不同,观察性研究中处理组和对照组往往在基线特征上存在系统性差异,这导致简单的组间比较会产生有偏估计。

举个例子:假设我们想研究大学教育对个人收入的影响。上大学的人和不上大学的人在能力、家庭背景、动机等方面可能存在显著差异。如果直接比较两组人的收入,我们无法确定收入差异是教育带来的,还是这些前置差异造成的。

倾向得分匹配的基本思想

倾向得分匹配由Rosenbaum和Rubin在1983年提出,其核心思想是通过模拟随机化来创建可比较的组。具体来说,PSM通过以下步骤实现:

  1. 为每个个体估计接受处理的概率(倾向得分)
  2. 基于倾向得分匹配处理组和对照组个体
  3. 在匹配后的样本中比较结果变量
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import NearestNeighbors
import statsmodels.api as sm
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

print("倾向得分匹配全流程详解与实战")
print("=" * 50)

II. 理论基础:倾向得分的统计基础

倾向得分的定义与性质

倾向得分定义为在给定协变量X的条件下,个体接受处理的条件概率:

e(X) = P(D=1|X)

其中D是处理指示变量(1表示处理组,0表示对照组),X是观测到的协变量。

Rosenbaum和Rubin证明了倾向得分的关键性质:如果满足强可忽略处理分配假设,那么在给定倾向得分的条件下,处理分配与潜在结果独立:

(Y₁, Y₀) ⫫ D | e(X)

这意味着,一旦我们控制了倾向得分,处理组和对照组在协变量分布上应该是相似的,就像随机化实验一样。

倾向得分匹配的假设

成功的PSM分析依赖于三个关键假设:

假设名称 数学表达 实际含义 验证方法
条件独立性 (Y₁, Y₀) ⫫ D | X 给定协变量后,处理分配与潜在结果独立 理论论证,无法完全检验
共同支持 0 < P(D=1|X) < 1 每个个体都有接受和不接受处理的可能性 检查倾向得分分布重叠
平衡性 X ⫫ D | e(X) 给定倾向得分后,协变量在处理组间平衡 平衡性检验

倾向得分匹配的优势与局限

优势

  • 降低维度:将多变量匹配简化为单变量匹配
  • 直观易懂:基于概率概念,易于解释
  • 灵活性:可与多种匹配算法结合

局限

  • 只能控制观测到的混淆变量
  • 对模型设定敏感
  • 可能损失样本量
倾向得分匹配
三大核心假设
条件独立性假设
共同支持假设
平衡性假设
无法检验
依赖理论
可视化检查
倾向得分分布
统计检验
标准化差异
敏感性分析
修剪样本
调整模型
有效因果推断

III. 数据准备与探索性分析

实例背景:职业培训项目效果评估

假设我们正在评估一个职业培训项目对参与者收入的影响。我们有一个包含2000个个体的数据集,其中500人参加了培训项目(处理组),1500人未参加(对照组)。

我们观测到的协变量包括:

  • 年龄、性别、教育年限
  • 之前的工作经验、之前收入
  • 居住地区、家庭背景等
# 模拟职业培训项目数据
np.random.seed(2024)
n_total = 2000
n_treated = 500

# 生成协变量
data = pd.DataFrame({
    'age': np.random.normal(35, 8, n_total),  # 年龄
    'gender': np.random.binomial(1, 0.45, n_total),  # 性别,1=女性,0=男性
    'education': np.random.poisson(14, n_total),  # 教育年限
    'previous_income': np.random.normal(30000, 8000, n_total),  # 之前收入
    'experience': np.random.normal(5, 3, n_total),  # 工作经验
    'urban': np.random.binomial(1, 0.6, n_total),  # 是否城市
    'family_support': np.random.normal(0, 1, n_total)  # 家庭支持程度
})

# 生成真实的倾向得分模型
# 培训参与与年龄、教育、之前收入等相关
X_ps = np.column_stack([
    data['age'], data['gender'], data['education'], 
    data['previous_income'], data['experience'],
    data['urban'], data['family_support']
])
X_ps = sm.add_constant(X_ps)

# 真实倾向得分模型参数
true_beta = np.array([-2.5, 0.02, -0.3, 0.15, -0.00001, 0.1, 0.4, 0.5])
true_ps = 1 / (1 + np.exp(-X_ps @ true_beta))

# 生成处理分配
treatment = np.random.binomial(1, true_ps)

# 确保处理组有500人
while np.sum(treatment) != n_treated:
    treatment = np.random.binomial(1, true_ps)

data['treatment'] = treatment
data['true_ps'] = true_ps

# 生成结果变量(收入)
# 真实处理效应:培训使收入增加8000元
base_income = 20000 + 500 * data['age'] + 3000 * data['education'] + 0.7 * data['previous_income']
treatment_effect = 8000  # 真实处理效应
noise = np.random.normal(0, 5000, n_total)

data['income'] = base_income + treatment_effect * data['treatment'] + noise

print("数据概览:")
print(f"总样本量: {len(data)}")
print(f"处理组: {sum(data['treatment'])}人")
print(f"对照组: {sum(1 - data['treatment'])}人")
print(f"\n处理组平均收入: {data[data['treatment']==1]['income'].mean():.2f}元")
print(f"对照组平均收入: {data[data['treatment']==0]['income'].mean():.2f}元")
print(f"简单差异: {data[data['treatment']==1]['income'].mean() - data[data['treatment']==0]['income'].mean():.2f}元")
print(f"真实处理效应: {treatment_effect}元")

# 基本统计描述
print("\n协变量描述统计:")
print(data.describe())

探索性分析:处理组与对照组的基线差异

在进行匹配前,我们需要了解处理组和对照组在基线特征上的差异。

# 探索性分析:基线特征比较
def compare_groups(data, treatment_col, covariates):
    """比较处理组和对照组的基线特征"""
    results = []
    
    for cov in covariates:
        treated = data[data[treatment_col] == 1][cov]
        control = data[data[treatment_col] == 0][cov]
        
        # 均值差异
        mean_diff = treated.mean() - control.mean()
        
        # t检验
        t_stat, p_value = stats.ttest_ind(treated, control)
        
        # 标准化均值差异 (SMD)
        pooled_std = np.sqrt((treated.std()**2 + control.std()**2) / 2)
        smd = mean_diff / pooled_std if pooled_std != 0 else 0
        
        results.append({
            '变量': cov,
            '处理组均值': treated.mean(),
            '对照组均值': control.mean(),
            '均值差异': mean_diff,
            '标准化差异': smd,
            'p值': p_value
        })
    
    return pd.DataFrame(results)

# 比较基线特征
covariates = ['age', 'gender', 'education', 'previous_income', 'experience', 'urban', 'family_support']
baseline_comparison = compare_groups(data, 'treatment', covariates)

print("处理组与对照组基线特征比较:")
print(baseline_comparison.round(4))

# 可视化基线差异
fig, axes = plt.subplots(2, 4, figsize=(20, 10))
axes = axes.ravel()

for i, cov in enumerate(covariates):
    # 分布图
    data[data['treatment'] == 0][cov].hist(alpha=0.7, label='对照组', bins=20, ax=axes[i])
    data[data['treatment'] == 1][cov].hist(alpha=0.7, label='处理组', bins=20, ax=axes[i])
    axes[i].set_title(f'{cov}\nSMD: {baseline_comparison.iloc[i]["标准化差异"]:.3f}')
    axes[i].legend()

# SMD可视化
axes[7].barh(range(len(covariates)), baseline_comparison['标准化差异'])
axes[7].set_yticks(range(len(covariates)))
axes[7].set_yticklabels(covariates)
axes[7].axvline(x=0, color='black', linestyle='-', alpha=0.3)
axes[7].axvline(x=0.1, color='red', linestyle='--', alpha=0.5, label='平衡阈值 (0.1)')
axes[7].set_xlabel('标准化均值差异 (SMD)')
axes[7].set_title('基线协变量平衡性')
axes[7].legend()

plt.tight_layout()
plt.show()

探索性分析显示,处理组和对照组在多个协变量上存在显著差异,这表明简单的组间比较会产生有偏估计。

不平衡
平衡
开始: 数据准备
生成模拟数据
探索性分析
检查基线平衡
需要进行倾向得分匹配
可直接比较
计算倾向得分
简单组间比较
执行匹配算法
评估匹配效果
平衡性是否达标?
估计处理效应
结果解释
结束

IV. 倾向得分估计

倾向得分模型设定

倾向得分通常使用逻辑回归模型估计,但也可以使用其他机器学习方法。模型设定的关键是要包含所有与处理分配和结果变量相关的协变量。

# 倾向得分估计
def estimate_propensity_score(data, treatment_col, covariates, method='logistic', **kwargs):
    """
    估计倾向得分
    
    参数:
    - data: 数据框
    - treatment_col: 处理变量列名
    - covariates: 协变量列表
    - method: 估计方法 ('logistic', 'gbm')
    
    返回:
    - data_with_ps: 包含倾向得分的数据
    - model: 拟合的模型
    """
    
    X = data[covariates]
    y = data[treatment_col]
    
    if method == 'logistic':
        # 逻辑回归
        X_with_const = sm.add_constant(X)
        model = sm.Logit(y, X_with_const).fit(disp=False)
        propensity_scores = model.predict(X_with_const)
        
    elif method == 'gbm':
        # 梯度提升机
        gbm = GradientBoostingClassifier(**kwargs)
        gbm.fit(X, y)
        propensity_scores = gbm.predict_proba(X)[:, 1]
        model = gbm
    else:
        raise ValueError("方法必须是 'logistic' 或 'gbm'")
    
    data_with_ps = data.copy()
    data_with_ps['propensity_score'] = propensity_scores
    
    return data_with_ps, model

# 使用逻辑回归估计倾向得分
data_ps, ps_model_logit = estimate_propensity_score(
    data, 'treatment', covariates, method='logistic'
)

# 使用GBM估计倾向得分(作为比较)
data_ps_gbm, ps_model_gbm = estimate_propensity_score(
    data, 'treatment', covariates, method='gbm',
    n_estimators=100, max_depth=3, random_state=42
)

print("倾向得分模型结果 (逻辑回归):")
if hasattr(ps_model_logit, 'summary'):
    print(ps_model_logit.summary())

# 比较两种方法的倾向得分
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# 倾向得分分布
axes[0].hist(data_ps[data_ps['treatment']==0]['propensity_score'], 
             alpha=0.7, label='对照组', bins=20, density=True)
axes[0].hist(data_ps[data_ps['treatment']==1]['propensity_score'], 
             alpha=0.7, label='处理组', bins=20, density=True)
axes[0].set_xlabel('倾向得分 (逻辑回归)')
axes[0].set_ylabel('密度')
axes[0].set_title('倾向得分分布 - 逻辑回归')
axes[0].legend()

axes[1].hist(data_ps_gbm[data_ps_gbm['treatment']==0]['propensity_score'], 
             alpha=0.7, label='对照组', bins=20, density=True)
axes[1].hist(data_ps_gbm[data_ps_gbm['treatment']==1]['propensity_score'], 
             alpha=0.7, label='处理组', bins=20, density=True)
axes[1].set_xlabel('倾向得分 (GBM)')
axes[1].set_ylabel('密度')
axes[1].set_title('倾向得分分布 - GBM')
axes[1].legend()

# 两种方法比较
axes[2].scatter(data_ps['propensity_score'], data_ps_gbm['propensity_score'], 
                alpha=0.5, c=data_ps['treatment'], cmap='viridis')
axes[2].set_xlabel('逻辑回归倾向得分')
axes[2].set_ylabel('GBM倾向得分')
axes[2].set_title('两种方法倾向得分比较')
axes[2].plot([0, 1], [0, 1], 'r--', alpha=0.5)

plt.tight_layout()
plt.show()

# 评估倾向得分模型
from sklearn.metrics import roc_auc_score, accuracy_score

y_true = data_ps['treatment']
y_pred_logit = data_ps['propensity_score']
y_pred_gbm = data_ps_gbm['propensity_score']

print("\n倾向得分模型评估:")
print(f"逻辑回归 - AUC: {roc_auc_score(y_true, y_pred_logit):.4f}")
print(f"GBM - AUC: {roc_auc_score(y_true, y_pred_gbm):.4f}")
print(f"逻辑回归 - 准确率: {accuracy_score(y_true, y_pred_logit > 0.5):.4f}")
print(f"GBM - 准确率: {accuracy_score(y_true, y_pred_gbm > 0.5):.4f}")

共同支持区域检查

在进行匹配前,我们需要确保处理组和对照组的倾向得分分布有足够的重叠区域。

# 共同支持区域分析
def check_common_support(data, treatment_col, ps_col):
    """检查共同支持区域"""
    treated_ps = data[data[treatment_col] == 1][ps_col]
    control_ps = data[data[treatment_col] == 0][ps_col]
    
    # 计算共同支持区域
    min_treated, max_treated = treated_ps.min(), treated_ps.max()
    min_control, max_control = control_ps.min(), control_ps.max()
    
    common_min = max(min_treated, min_control)
    common_max = min(max_treated, max_control)
    
    # 识别在共同支持区域外的样本
    data_out = data.copy()
    data_out['in_common_support'] = (
        (data_out[ps_col] >= common_min) & 
        (data_out[ps_col] <= common_max)
    )
    
    support_info = {
        'common_min': common_min,
        'common_max': common_max,
        'n_treated_in_support': sum(data_out[data_out[treatment_col]==1]['in_common_support']),
        'n_control_in_support': sum(data_out[data_out[treatment_col]==0]['in_common_support']),
        'n_treated_total': sum(data_out[treatment_col]==1),
        'n_control_total': sum(data_out[treatment_col]==0)
    }
    
    return data_out, support_info

# 检查共同支持区域
data_support, support_info = check_common_support(data_ps, 'treatment', 'propensity_score')

print("\n共同支持区域分析:")
print(f"倾向得分重叠范围: [{support_info['common_min']:.4f}, {support_info['common_max']:.4f}]")
print(f"处理组在共同支持区域内: {support_info['n_treated_in_support']}/{support_info['n_treated_total']} "
      f"({support_info['n_treated_in_support']/support_info['n_treated_total']*100:.1f}%)")
print(f"对照组在共同支持区域内: {support_info['n_control_in_support']}/{support_info['n_control_total']} "
      f"({support_info['n_control_in_support']/support_info['n_control_total']*100:.1f}%)")

# 可视化共同支持区域
plt.figure(figsize=(10, 6))

# 密度图
treated_ps = data_ps[data_ps['treatment']==1]['propensity_score']
control_ps = data_ps[data_ps['treatment']==0]['propensity_score']

plt.hist(treated_ps, alpha=0.7, label='处理组', bins=30, density=True, color='red')
plt.hist(control_ps, alpha=0.7, label='对照组', bins=30, density=True, color='blue')

# 标记共同支持区域
plt.axvline(support_info['common_min'], color='black', linestyle='--', 
            label=f'共同支持区域下限: {support_info["common_min"]:.3f}')
plt.axvline(support_info['common_max'], color='black', linestyle='--', 
            label=f'共同支持区域上限: {support_info["common_max"]:.3f}')

plt.xlabel('倾向得分')
plt.ylabel('密度')
plt.title('倾向得分分布与共同支持区域')
plt.legend()
plt.show()

共同支持区域分析帮助我们确定哪些样本可以用于匹配。通常我们会排除倾向得分极端(接近0或1)的样本,因为这些样本缺乏合适的匹配对象。

65%20%10%5%倾向得分估计方法比较逻辑回归梯度提升机随机森林神经网络

V. 匹配算法实现

多种匹配方法实现

倾向得分匹配有多种算法,每种都有其优缺点。我们将实现几种常用的匹配方法。

# 匹配算法实现
class PropensityScoreMatcher:
    """倾向得分匹配器"""
    
    def __init__(self, caliper=0.2, caliper_method='std', replace=False):
        """
        参数:
        - caliper: 卡尺值
        - caliper_method: 卡尺计算方法 ('std', 'absolute')
        - replace: 是否允许重复匹配
        """
        self.caliper = caliper
        self.caliper_method = caliper_method
        self.replace = replace
        
    def nearest_neighbor_match(self, data, treatment_col, ps_col, outcome_col):
        """最近邻匹配"""
        treated = data[data[treatment_col] == 1].copy()
        control = data[data[treatment_col] == 0].copy()
        
        # 计算卡尺
        if self.caliper_method == 'std':
            ps_std = data[ps_col].std()
            caliper_value = self.caliper * ps_std
        else:
            caliper_value = self.caliper
            
        # 最近邻匹配
        nbrs = NearestNeighbors(n_neighbors=1, metric='euclidean')
        nbrs.fit(control[[ps_col]])
        
        distances, indices = nbrs.kneighbors(treated[[ps_col]])
        
        # 应用卡尺并创建匹配对
        matches = []
        match_ids = []
        
        for i, (dist, idx) in enumerate(zip(distances, indices)):
            if dist[0] <= caliper_value:
                treated_row = treated.iloc[i].copy()
                control_row = control.iloc[idx[0]].copy()
                
                match_id = len(match_ids)
                treated_row['match_id'] = match_id
                control_row['match_id'] = match_id
                treated_row['distance'] = dist[0]
                control_row['distance'] = dist[0]
                
                matches.extend([treated_row, control_row])
                match_ids.append(match_id)
                
                # 如果不允许重复匹配,则移除已匹配的对照
                if not self.replace:
                    control = control.drop(control.index[idx[0]])
                    if len(control) > 0:
                        nbrs.fit(control[[ps_col]])
                    else:
                        break
        
        matched_data = pd.DataFrame(matches)
        
        # 计算匹配后的处理效应
        if len(matched_data) > 0:
            treated_matched = matched_data[matched_data[treatment_col] == 1][outcome_col]
            control_matched = matched_data[matched_data[treatment_col] == 0][outcome_col]
            ate = treated_matched.mean() - control_matched.mean()
        else:
            ate = np.nan
            
        match_info = {
            'n_treated_matched': len(treated_matched) if len(matched_data) > 0 else 0,
            'n_control_matched': len(control_matched) if len(matched_data) > 0 else 0,
            'ate': ate,
            'matching_ratio': len(treated_matched) / len(treated) if len(treated) > 0 else 0
        }
        
        return matched_data, match_info
    
    def radius_match(self, data, treatment_col, ps_col, outcome_col):
        """半径匹配"""
        treated = data[data[treatment_col] == 1].copy()
        control = data[data[treatment_col] == 0].copy()
        
        # 计算半径
        if self.caliper_method == 'std':
            ps_std = data[ps_col].std()
            radius = self.caliper * ps_std
        else:
            radius = self.caliper
            
        matches = []
        match_id = 0
        
        for i, treated_row in treated.iterrows():
            # 找到在半径内的所有对照
            distances = np.abs(control[ps_col] - treated_row[ps_col])
            within_radius = control[distances <= radius]
            
            if len(within_radius) > 0:
                # 随机选择一个对照(如果不允许重复匹配)
                if not self.replace and len(within_radius) > 0:
                    control_match = within_radius.sample(1).iloc[0]
                    control = control.drop(control_match.name)
                elif len(within_radius) > 0:
                    control_match = within_radius.sample(1).iloc[0]
                else:
                    continue
                    
                # 创建匹配对
                treated_match = treated_row.copy()
                distance = np.abs(control_match[ps_col] - treated_match[ps_col])
                
                treated_match['match_id'] = match_id
                control_match['match_id'] = match_id
                treated_match['distance'] = distance
                control_match['distance'] = distance
                
                matches.extend([treated_match, control_match])
                match_id += 1
        
        matched_data = pd.DataFrame(matches) if matches else pd.DataFrame()
        
        # 计算处理效应
        if len(matched_data) > 0:
            treated_matched = matched_data[matched_data[treatment_col] == 1][outcome_col]
            control_matched = matched_data[matched_data[treatment_col] == 0][outcome_col]
            ate = treated_matched.mean() - control_matched.mean()
        else:
            ate = np.nan
            
        match_info = {
            'n_treated_matched': len(treated_matched) if len(matched_data) > 0 else 0,
            'n_control_matched': len(control_matched) if len(matched_data) > 0 else 0,
            'ate': ate,
            'matching_ratio': len(treated_matched) / len(treated) if len(treated) > 0 else 0
        }
        
        return matched_data, match_info
    
    def kernel_match(self, data, treatment_col, ps_col, outcome_col, bandwidth=0.06):
        """核匹配"""
        treated = data[data[treatment_col] == 1].copy()
        control = data[data[treatment_col] == 0].copy()
        
        # 为每个处理组个体计算加权结果
        counterfactuals = []
        
        for i, treated_row in treated.iterrows():
            # 计算核权重
            distances = np.abs(control[ps_col] - treated_row[ps_col])
            weights = np.exp(-0.5 * (distances / bandwidth) ** 2)
            weights = weights / np.sum(weights)  # 标准化权重
            
            # 计算加权平均结果
            weighted_outcome = np.sum(control[outcome_col] * weights)
            counterfactuals.append(weighted_outcome)
        
        # 计算处理效应
        if len(counterfactuals) > 0:
            actual_outcomes = treated[outcome_col].values
            treatment_effects = actual_outcomes - counterfactuals
            ate = np.mean(treatment_effects)
        else:
            ate = np.nan
            
        match_info = {
            'n_treated_matched': len(treated),
            'n_control_used': len(control),
            'ate': ate,
            'individual_te': treatment_effects if len(counterfactuals) > 0 else []
        }
        
        # 核匹配不产生具体的匹配对数据框
        return None, match_info

# 应用不同的匹配方法
print("应用不同匹配方法...")

# 1. 最近邻匹配
matcher_nn = PropensityScoreMatcher(caliper=0.2, caliper_method='std', replace=False)
matched_nn, info_nn = matcher_nn.nearest_neighbor_match(
    data_support[data_support['in_common_support']],  # 只在共同支持区域内匹配
    'treatment', 'propensity_score', 'income'
)

# 2. 半径匹配
matcher_radius = PropensityScoreMatcher(caliper=0.1, caliper_method='std', replace=False)
matched_radius, info_radius = matcher_radius.radius_match(
    data_support[data_support['in_common_support']],
    'treatment', 'propensity_score', 'income'
)

# 3. 核匹配
matcher_kernel = PropensityScoreMatcher()
_, info_kernel = matcher_kernel.kernel_match(
    data_support[data_support['in_common_support']],
    'treatment', 'propensity_score', 'income'
)

# 比较匹配结果
matching_results = pd.DataFrame({
    '匹配方法': ['最近邻匹配', '半径匹配', '核匹配'],
    '匹配的处理组数': [info_nn['n_treated_matched'], info_radius['n_treated_matched'], info_kernel['n_treated_matched']],
    '使用的对照组数': [info_nn['n_control_matched'], info_radius['n_control_matched'], info_kernel['n_control_used']],
    '匹配率': [info_nn['matching_ratio'], info_radius['matching_ratio'], 1.0],
    'ATE估计': [info_nn['ate'], info_radius['ate'], info_kernel['ate']]
})

print("\n不同匹配方法结果比较:")
print(matching_results.round(4))

# 可视化匹配效果
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 原始倾向得分分布
axes[0,0].hist(data_ps[data_ps['treatment']==0]['propensity_score'], 
               alpha=0.7, label='对照组', bins=20, density=True)
axes[0,0].hist(data_ps[data_ps['treatment']==1]['propensity_score'], 
               alpha=0.7, label='处理组', bins=20, density=True)
axes[0,0].set_xlabel('倾向得分')
axes[0,0].set_ylabel('密度')
axes[0,0].set_title('原始倾向得分分布')
axes[0,0].legend()

# 最近邻匹配后分布
if len(matched_nn) > 0:
    axes[0,1].hist(matched_nn[matched_nn['treatment']==0]['propensity_score'], 
                   alpha=0.7, label='对照组', bins=20, density=True)
    axes[0,1].hist(matched_nn[matched_nn['treatment']==1]['propensity_score'], 
                   alpha=0.7, label='处理组', bins=20, density=True)
    axes[0,1].set_xlabel('倾向得分')
    axes[0,1].set_ylabel('密度')
    axes[0,1].set_title('最近邻匹配后倾向得分分布')
    axes[0,1].legend()

# 匹配距离分布
if len(matched_nn) > 0:
    axes[1,0].hist(matched_nn['distance'], bins=30, alpha=0.7)
    axes[1,0].set_xlabel('匹配距离')
    axes[1,0].set_ylabel('频数')
    axes[1,0].set_title('匹配距离分布')

# ATE比较
methods = ['原始差异', '最近邻匹配', '半径匹配', '核匹配', '真实效应']
ate_values = [
    data[data['treatment']==1]['income'].mean() - data[data['treatment']==0]['income'].mean(),
    info_nn['ate'],
    info_radius['ate'],
    info_kernel['ate'],
    treatment_effect
]

colors = ['red', 'orange', 'orange', 'orange', 'green']
axes[1,1].barh(methods, ate_values, color=colors, alpha=0.7)
axes[1,1].set_xlabel('处理效应估计 (元)')
axes[1,1].set_title('不同方法的处理效应估计')
axes[1,1].grid(axis='x', alpha=0.3)

plt.tight_layout()
plt.show()

匹配方法选择指南

不同的匹配方法适用于不同的场景:

匹配方法 优点 缺点 适用场景
最近邻匹配 简单直观,保留最多信息 可能匹配质量不一,对卡尺敏感 样本量充足,追求精确匹配
半径匹配 确保匹配质量,更稳健 可能损失部分处理组样本 对匹配质量要求高
核匹配 使用所有信息,效率高 结果不易解释,带宽选择敏感 大样本,关注平均效应
分层匹配 简单易实现 分层数选择主观 探索性分析,初步匹配
大样本
小样本
中等
选择匹配方法
样本量大小?
核匹配/半径匹配
最近邻匹配
对匹配质量要求?
允许重复匹配?
半径匹配
核匹配
有放回最近邻
无放回最近邻
设置合适卡尺
选择适当带宽
计算匹配权重
执行匹配

VI. 平衡性检验与匹配质量评估

匹配后,我们必须检验匹配质量,确保处理组和对照组在协变量上达到平衡。

# 平衡性检验函数
def assess_balance(original_data, matched_data, treatment_col, covariates, ps_col=None):
    """评估匹配前后的平衡性"""
    
    balance_results = []
    
    for data_type, data_dict in [('匹配前', original_data), ('匹配后', matched_data)]:
        if len(data_dict) == 0:
            continue
            
        for cov in covariates:
            treated = data_dict[data_dict[treatment_col] == 1][cov]
            control = data_dict[data_dict[treatment_col] == 0][cov]
            
            # 基本统计量
            treated_mean = treated.mean()
            control_mean = control.mean()
            mean_diff = treated_mean - control_mean
            
            # 标准化均值差异
            pooled_std = np.sqrt((treated.std()**2 + control.std()**2) / 2)
            smd = mean_diff / pooled_std if pooled_std != 0 else 0
            
            # 方差比
            var_ratio = treated.var() / control.var() if control.var() != 0 else np.inf
            
            # t检验
            if len(treated) > 1 and len(control) > 1:
                t_stat, p_value = stats.ttest_ind(treated, control)
            else:
                t_stat, p_value = np.nan, np.nan
            
            balance_results.append({
                '阶段': data_type,
                '变量': cov,
                '处理组均值': treated_mean,
                '对照组均值': control_mean,
                '均值差异': mean_diff,
                '标准化差异': smd,
                '方差比': var_ratio,
                'p值': p_value
            })
    
    balance_df = pd.DataFrame(balance_results)
    
    # 计算总体平衡性指标
    overall_balance = balance_df.groupby('阶段').agg({
        '标准化差异': [lambda x: np.mean(np.abs(x)), lambda x: np.max(np.abs(x))],
        'p值': lambda x: np.mean(x < 0.05)
    }).round(4)
    
    overall_balance.columns = ['平均绝对SMD', '最大绝对SMD', '显著比例']
    
    return balance_df, overall_balance

# 执行平衡性检验(使用最近邻匹配结果)
if len(matched_nn) > 0:
    balance_details, balance_overall = assess_balance(
        data, matched_nn, 'treatment', covariates
    )
    
    print("平衡性检验详情:")
    print(balance_details.round(4))
    
    print("\n总体平衡性指标:")
    print(balance_overall)

# 可视化平衡性改善
if len(matched_nn) > 0:
    # 提取匹配前后的SMD
    smd_before = balance_details[balance_details['阶段'] == '匹配前']['标准化差异'].values
    smd_after = balance_details[balance_details['阶段'] == '匹配后']['标准化差异'].values
    
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    
    # SMD比较图
    max_smd = max(np.max(np.abs(smd_before)), np.max(np.abs(smd_after)))
    axes[0].scatter(smd_before, smd_after, s=60, alpha=0.7)
    for i, cov in enumerate(covariates):
        axes[0].annotate(cov, (smd_before[i], smd_after[i]), 
                        xytext=(5, 5), textcoords='offset points', fontsize=9)
    axes[0].axhline(y=0, color='black', linestyle='-', alpha=0.3)
    axes[0].axvline(x=0, color='black', linestyle='-', alpha=0.3)
    axes[0].axhline(y=0.1, color='red', linestyle='--', alpha=0.5, label='平衡阈值 (0.1)')
    axes[0].axhline(y=-0.1, color='red', linestyle='--', alpha=0.5)
    axes[0].axvline(x=0.1, color='red', linestyle='--', alpha=0.5)
    axes[0].axvline(x=-0.1, color='red', linestyle='--', alpha=0.5)
    axes[0].set_xlim(-max_smd*1.1, max_smd*1.1)
    axes[0].set_ylim(-max_smd*1.1, max_smd*1.1)
    axes[0].set_xlabel('匹配前标准化差异')
    axes[0].set_ylabel('匹配后标准化差异')
    axes[0].set_title('匹配前后平衡性比较')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # SMD改善图
    x_pos = np.arange(len(covariates))
    width = 0.35
    
    axes[1].bar(x_pos - width/2, np.abs(smd_before), width, label='匹配前', alpha=0.7)
    axes[1].bar(x_pos + width/2, np.abs(smd_after), width, label='匹配后', alpha=0.7)
    axes[1].axhline(y=0.1, color='red', linestyle='--', label='平衡阈值 (0.1)')
    axes[1].set_xlabel('协变量')
    axes[1].set_ylabel('绝对标准化差异')
    axes[1].set_title('匹配前后绝对标准化差异比较')
    axes[1].set_xticks(x_pos)
    axes[1].set_xticklabels(covariates, rotation=45)
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# 匹配质量综合评估
def evaluate_matching_quality(original_data, matched_data, treatment_col, covariates):
    """综合评估匹配质量"""
    
    quality_metrics = {}
    
    # 1. 样本保留率
    n_treated_original = sum(original_data[treatment_col] == 1)
    n_treated_matched = sum(matched_data[treatment_col] == 1) if len(matched_data) > 0 else 0
    quality_metrics['处理组保留率'] = n_treated_matched / n_treated_original
    
    # 2. 平衡性改善
    balance_before, _ = assess_balance(original_data, original_data, treatment_col, covariates)
    balance_after, _ = assess_balance(original_data, matched_data, treatment_col, covariates)
    
    smd_before = balance_before[balance_before['阶段'] == '匹配前']['标准化差异'].abs().mean()
    smd_after = balance_after[balance_after['阶段'] == '匹配后']['标准化差异'].abs().mean()
    quality_metrics['SMD改善比例'] = (smd_before - smd_after) / smd_before
    
    # 3. 平衡变量比例
    n_balanced_before = sum(balance_before[balance_before['阶段'] == '匹配前']['标准化差异'].abs() < 0.1)
    n_balanced_after = sum(balance_after[balance_after['阶段'] == '匹配后']['标准化差异'].abs() < 0.1)
    quality_metrics['平衡变量比例'] = n_balanced_after / len(covariates)
    
    # 4. 方差比在0.5-2之间的变量比例
    var_ratios_after = balance_after[balance_after['阶段'] == '匹配后']['方差比']
    n_good_var = sum((var_ratios_after >= 0.5) & (var_ratios_after <= 2))
    quality_metrics['良好方差比比例'] = n_good_var / len(covariates)
    
    return quality_metrics

# 评估匹配质量
if len(matched_nn) > 0:
    quality_metrics = evaluate_matching_quality(data, matched_nn, 'treatment', covariates)
    
    print("\n匹配质量综合评估:")
    for metric, value in quality_metrics.items():
        print(f"{metric}: {value:.4f}")
    
    # 质量评分
    quality_score = (
        quality_metrics['处理组保留率'] * 0.3 +
        quality_metrics['SMD改善比例'] * 0.3 +
        quality_metrics['平衡变量比例'] * 0.2 +
        quality_metrics['良好方差比比例'] * 0.2
    )
    print(f"\n综合质量评分: {quality_score:.4f}")

平衡性检验是PSM流程中至关重要的一步,它确保我们的匹配确实改善了组间可比性。

SMD < 0.1
SMD >= 0.1
修改卡尺/算法
匹配前平衡性检验
执行匹配
匹配后平衡性检验
平衡性达标
平衡性不达标
调整匹配参数
估计处理效应

VII. 处理效应估计与统计推断

在确认匹配质量良好后,我们可以估计处理效应并进行统计推断。

# 处理效应估计与推断
def estimate_treatment_effect(matched_data, treatment_col, outcome_col, method='paired_t'):
    """
    估计处理效应并进行统计推断
    
    参数:
    - matched_data: 匹配后的数据
    - treatment_col: 处理变量列名
    - outcome_col: 结果变量列名
    - method: 推断方法 ('paired_t', 'regression', 'bootstrap')
    """
    
    if len(matched_data) == 0:
        return {'ate': np.nan, 'se': np.nan, 'ci_lower': np.nan, 'ci_upper': np.nan, 'p_value': np.nan}
    
    treated = matched_data[matched_data[treatment_col] == 1]
    control = matched_data[matched_data[treatment_col] == 0]
    
    # 确保有匹配对
    if 'match_id' not in matched_data.columns or len(treated) != len(control):
        # 如果没有匹配对信息,使用独立样本t检验
        ate = treated[outcome_col].mean() - control[outcome_col].mean()
        t_stat, p_value = stats.ttest_ind(treated[outcome_col], control[outcome_col])
        se = ate / t_stat if t_stat != 0 else np.nan
    else:
        # 按匹配对计算差异
        matched_pairs = matched_data.groupby('match_id')
        differences = []
        
        for match_id, pair in matched_pairs:
            if len(pair) == 2:  # 确保是完整的匹配对
                treated_outcome = pair[pair[treatment_col] == 1][outcome_col].iloc[0]
                control_outcome = pair[pair[treatment_col] == 0][outcome_col].iloc[0]
                differences.append(treated_outcome - control_outcome)
        
        differences = np.array(differences)
        ate = np.mean(differences)
        
        # 配对t检验
        if len(differences) > 1:
            t_stat, p_value = stats.ttest_1samp(differences, 0)
            se = np.std(differences, ddof=1) / np.sqrt(len(differences))
        else:
            t_stat, p_value, se = np.nan, np.nan, np.nan
    
    # 计算置信区间
    if not np.isnan(ate) and not np.isnan(se):
        ci_lower = ate - 1.96 * se
        ci_upper = ate + 1.96 * se
    else:
        ci_lower, ci_upper = np.nan, np.nan
    
    return {
        'ate': ate,
        'se': se,
        'ci_lower': ci_lower,
        'ci_upper': ci_upper,
        'p_value': p_value,
        'n_pairs': len(treated) if 'match_id' not in matched_data.columns else len(matched_pairs)
    }

# 自助法标准误
def bootstrap_ate(data, treatment_col, outcome_col, ps_col, n_bootstrap=1000):
    """使用自助法计算ATE的标准误"""
    
    ate_bootstrap = []
    
    for i in range(n_bootstrap):
        # 自助重抽样
        bootstrap_sample = data.sample(n=len(data), replace=True)
        
        # 在重抽样样本上执行匹配
        matcher = PropensityScoreMatcher(caliper=0.2, caliper_method='std', replace=False)
        matched_bootstrap, _ = matcher.nearest_neighbor_match(
            bootstrap_sample, treatment_col, ps_col, outcome_col
        )
        
        # 计算ATE
        if len(matched_bootstrap) > 0:
            treated = matched_bootstrap[matched_bootstrap[treatment_col] == 1][outcome_col]
            control = matched_bootstrap[matched_bootstrap[treatment_col] == 0][outcome_col]
            ate_bootstrap.append(treated.mean() - control.mean())
    
    ate_bootstrap = np.array(ate_bootstrap)
    
    return {
        'ate_bootstrap_mean': np.mean(ate_bootstrap),
        'ate_bootstrap_se': np.std(ate_bootstrap, ddof=1),
        'ci_bootstrap_lower': np.percentile(ate_bootstrap, 2.5),
        'ci_bootstrap_upper': np.percentile(ate_bootstrap, 97.5)
    }

# 估计处理效应
if len(matched_nn) > 0:
    # 使用配对t检验
    te_result = estimate_treatment_effect(matched_nn, 'treatment', 'income', 'paired_t')
    
    # 使用自助法
    bootstrap_result = bootstrap_ate(
        data_support[data_support['in_common_support']], 
        'treatment', 'income', 'propensity_score', n_bootstrap=500
    )
    
    print("处理效应估计结果:")
    print(f"ATE (配对t检验): {te_result['ate']:.2f}元")
    print(f"标准误: {te_result['se']:.2f}")
    print(f"95%置信区间: [{te_result['ci_lower']:.2f}, {te_result['ci_upper']:.2f}]")
    print(f"p值: {te_result['p_value']:.4f}")
    print(f"匹配对数: {te_result['n_pairs']}")
    
    print(f"\nATE (自助法): {bootstrap_result['ate_bootstrap_mean']:.2f}元")
    print(f"自助标准误: {bootstrap_result['ate_bootstrap_se']:.2f}")
    print(f"自助95%置信区间: [{bootstrap_result['ci_bootstrap_lower']:.2f}, {bootstrap_result['ci_boostrap_upper']:.2f}]")

# 异质性处理效应分析
def analyze_heterogeneous_te(matched_data, treatment_col, outcome_col, subgroup_vars):
    """分析异质性处理效应"""
    
    results = {}
    
    for subgroup_var in subgroup_vars:
        if subgroup_var not in matched_data.columns:
            continue
            
        # 按子组分析
        unique_values = matched_data[subgroup_var].unique()
        
        subgroup_results = []
        for value in unique_values:
            subgroup_data = matched_data[matched_data[subgroup_var] == value]
            
            if len(subgroup_data) > 0:
                te_subgroup = estimate_treatment_effect(subgroup_data, treatment_col, outcome_col)
                te_subgroup['subgroup_value'] = value
                te_subgroup['n_obs'] = len(subgroup_data)
                subgroup_results.append(te_subgroup)
        
        results[subgroup_var] = pd.DataFrame(subgroup_results)
    
    return results

# 异质性分析
if len(matched_nn) > 0:
    subgroup_vars = ['gender', 'urban', 'education_group']
    
    # 创建教育分组
    matched_nn['education_group'] = pd.cut(matched_nn['education'], 
                                         bins=[0, 12, 16, 20], 
                                         labels=['高中及以下', '大学', '研究生'])
    
    hetero_results = analyze_heterogeneous_te(matched_nn, 'treatment', 'income', subgroup_vars)
    
    print("\n异质性处理效应分析:")
    for var, result_df in hetero_results.items():
        print(f"\n按 {var} 分组:")
        if len(result_df) > 0:
            display_cols = ['subgroup_value', 'ate', 'ci_lower', 'ci_upper', 'p_value', 'n_obs']
            available_cols = [col for col in display_cols if col in result_df.columns]
            print(result_df[available_cols].round(4))

# 可视化处理效应
if len(matched_nn) > 0:
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    
    # 1. 主要ATE估计
    methods = ['简单比较', 'PSM-配对t', 'PSM-自助法', '真实效应']
    ate_values = [
        data[data['treatment']==1]['income'].mean() - data[data['treatment']==0]['income'].mean(),
        te_result['ate'],
        bootstrap_result['ate_bootstrap_mean'],
        treatment_effect
    ]
    ci_lowers = [np.nan, te_result['ci_lower'], bootstrap_result['ci_bootstrap_lower'], np.nan]
    ci_uppers = [np.nan, te_result['ci_upper'], bootstrap_result['ci_bootstrap_upper'], np.nan]
    
    colors = ['red', 'orange', 'orange', 'green']
    
    for i, (method, ate, color) in enumerate(zip(methods, ate_values, colors)):
        axes[0].barh(i, ate, color=color, alpha=0.7, label=method)
        if not np.isnan(ci_lowers[i]) and not np.isnan(ci_uppers[i]):
            axes[0].errorbar(ate, i, xerr=[[ate - ci_lowers[i]], [ci_uppers[i] - ate]], 
                           fmt='none', color='black', capsize=5)
    
    axes[0].axvline(x=treatment_effect, color='green', linestyle='--', alpha=0.7, label='真实效应')
    axes[0].set_yticks(range(len(methods)))
    axes[0].set_yticklabels(methods)
    axes[0].set_xlabel('处理效应估计 (元)')
    axes[0].set_title('不同方法的处理效应估计比较')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 2. 异质性分析 - 性别
    if 'gender' in hetero_results and len(hetero_results['gender']) > 0:
        gender_results = hetero_results['gender']
        gender_labels = ['男性', '女性']
        gender_ates = gender_results['ate'].values
        
        axes[1].bar(gender_labels, gender_ates, color=['blue', 'pink'], alpha=0.7)
        for i, (ate, ci_l, ci_u) in enumerate(zip(gender_results['ate'], 
                                                gender_results['ci_lower'], 
                                                gender_results['ci_upper'])):
            axes[1].errorbar(i, ate, yerr=[[ate - ci_l], [ci_u - ate]], 
                           fmt='none', color='black', capsize=5)
        
        axes[1].axhline(y=te_result['ate'], color='red', linestyle='--', alpha=0.7, label='总体ATE')
        axes[1].set_ylabel('处理效应 (元)')
        axes[1].set_title('按性别的异质性处理效应')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
    
    # 3. 异质性分析 - 教育
    if 'education_group' in hetero_results and len(hetero_results['education_group']) > 0:
        edu_results = hetero_results['education_group']
        edu_labels = edu_results['subgroup_value'].values
        edu_ates = edu_results['ate'].values
        
        axes[2].bar(edu_labels, edu_ates, alpha=0.7)
        for i, (ate, ci_l, ci_u) in enumerate(zip(edu_results['ate'], 
                                                edu_results['ci_lower'], 
                                                edu_results['ci_upper'])):
            axes[2].errorbar(i, ate, yerr=[[ate - ci_l], [ci_u - ate]], 
                           fmt='none', color='black', capsize=5)
        
        axes[2].axhline(y=te_result['ate'], color='red', linestyle='--', alpha=0.7, label='总体ATE')
        axes[2].set_ylabel('处理效应 (元)')
        axes[2].set_title('按教育程度的异质性处理效应')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

处理效应估计显示,经过倾向得分匹配后,我们得到了更接近真实处理效应的估计值,这表明PSM方法有效地减少了选择偏差。

45%25%20%10%处理效应估计方法使用频率配对t检验回归调整自助法加权最小二乘

VIII. 敏感性分析

敏感性分析用于评估我们的因果结论对未观测混淆变量的稳健性。

# 敏感性分析
def rosenbaum_sensitivity_analysis(matched_data, treatment_col, outcome_col, gamma_range=np.linspace(1, 3, 21)):
    """
    Rosenbaum敏感性分析
    评估结论对隐藏偏差的敏感性
    
    参数:
    - gamma:  odds of treatment ratio due to unobserved confounders
    """
    
    if len(matched_data) == 0 or 'match_id' not in matched_data.columns:
        return pd.DataFrame()
    
    # 按匹配对计算差异
    matched_pairs = matched_data.groupby('match_id')
    differences = []
    
    for match_id, pair in matched_pairs:
        if len(pair) == 2:
            treated_outcome = pair[pair[treatment_col] == 1][outcome_col].iloc[0]
            control_outcome = pair[pair[treatment_col] == 0][outcome_col].iloc[0]
            differences.append(treated_outcome - control_outcome)
    
    differences = np.array(differences)
    n_pairs = len(differences)
    
    if n_pairs == 0:
        return pd.DataFrame()
    
    # 符号秩检验的敏感性分析
    sensitivity_results = []
    
    for gamma in gamma_range:
        # 计算在隐藏偏差下的p值范围
        # 这里使用简化的方法,实际Rosenbaum界限计算更复杂
        positive_diff = sum(differences > 0)
        negative_diff = sum(differences < 0)
        
        # 简化计算:假设隐藏偏差会改变一定比例的对
        max_favor_treatment = positive_diff + min(negative_diff, int(n_pairs * (gamma-1)/gamma))
        min_favor_treatment = max(0, positive_diff - int(n_pairs * (gamma-1)))
        
        # 二项检验p值
        p_max = stats.binom_test(max_favor_treatment, n_pairs, 0.5, alternative='greater')
        p_min = stats.binom_test(min_favor_treatment, n_pairs, 0.5, alternative='greater')
        
        sensitivity_results.append({
            'gamma': gamma,
            'p_value_upper': p_max,
            'p_value_lower': p_min,
            'significant_upper': p_max < 0.05,
            'significant_lower': p_min < 0.05
        })
    
    return pd.DataFrame(sensitivity_results)

# 执行敏感性分析
if len(matched_nn) > 0:
    sensitivity_df = rosenbaum_sensitivity_analysis(matched_nn, 'treatment', 'income')
    
    if len(sensitivity_df) > 0:
        print("Rosenbaum敏感性分析结果:")
        print(sensitivity_df.round(4))
        
        # 找到结论变得不显著的gamma值
        critical_gamma = sensitivity_df[sensitivity_df['significant_lower'] == False]['gamma'].min()
        if not np.isnan(critical_gamma):
            print(f"\n结论变得不稳健的临界Gamma值: {critical_gamma:.2f}")
            print("Gamma表示由于未观测混淆变量导致的处理分配几率比")
        else:
            print("\n在分析的Gamma范围内,结论始终保持稳健")

# 可视化敏感性分析
if len(sensitivity_df) > 0:
    plt.figure(figsize=(10, 6))
    
    plt.plot(sensitivity_df['gamma'], sensitivity_df['p_value_lower'], 
             'b-', label='p值下限', linewidth=2)
    plt.plot(sensitivity_df['gamma'], sensitivity_df['p_value_upper'], 
             'r-', label='p值上限', linewidth=2)
    plt.axhline(y=0.05, color='black', linestyle='--', alpha=0.7, label='显著性阈值 (0.05)')
    
    if not np.isnan(critical_gamma):
        plt.axvline(x=critical_gamma, color='green', linestyle='--', 
                   label=f'临界Gamma: {critical_gamma:.2f}')
    
    plt.xlabel('Gamma (未观测混淆的强度)')
    plt.ylabel('p值')
    plt.title('Rosenbaum敏感性分析')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# 其他敏感性分析:未观测混淆的影响
def assess_unobserved_confounding(ate_estimate, confounding_strength):
    """评估未观测混淆对处理效应估计的潜在影响"""
    
    results = []
    
    for strength in confounding_strength:
        # 假设未观测混淆与处理和结果的关系强度
        # 这里进行简化计算
        bias_direction = np.random.choice([-1, 1])  # 混淆的方向不确定
        bias_magnitude = strength * ate_estimate  # 偏差大小
        
        adjusted_ate = ate_estimate - bias_direction * bias_magnitude
        
        results.append({
            'confounding_strength': strength,
            'bias_direction': bias_direction,
            'bias_magnitude': bias_magnitude,
            'adjusted_ate': adjusted_ate,
            'bias_percentage': bias_magnitude / ate_estimate * 100
        })
    
    return pd.DataFrame(results)

# 评估未观测混淆的影响
confounding_strengths = [0.1, 0.2, 0.3, 0.4, 0.5]
if len(matched_nn) > 0 and not np.isnan(te_result['ate']):
    confounding_df = assess_unobserved_confounding(te_result['ate'], confounding_strengths)
    
    print("\n未观测混淆敏感性分析:")
    print(confounding_df.round(4))

# 综合敏感性报告
def generate_sensitivity_report(matched_data, treatment_col, outcome_col, ate_estimate):
    """生成敏感性分析综合报告"""
    
    report = {
        '样本量': {
            '总匹配对': len(matched_data) // 2 if len(matched_data) > 0 else 0,
            '处理组': sum(matched_data[treatment_col] == 1) if len(matched_data) > 0 else 0,
            '对照组': sum(matched_data[treatment_col] == 0) if len(matched_data) > 0 else 0
        },
        '处理效应': {
            'ATE估计': ate_estimate,
            '95%CI': f"[{te_result['ci_lower']:.2f}, {te_result['ci_upper']:.2f}]" if 'ci_lower' in te_result else "N/A",
            'p值': te_result['p_value'] if 'p_value' in te_result else np.nan
        },
        '敏感性': {
            'Rosenbaum临界Gamma': critical_gamma if 'critical_gamma' in locals() else "N/A",
            '结论稳健性': '稳健' if ('critical_gamma' in locals() and critical_gamma > 1.5) else '需要谨慎解释'
        }
    }
    
    return report

# 生成敏感性报告
if len(matched_nn) > 0:
    sensitivity_report = generate_sensitivity_report(matched_nn, 'treatment', 'income', te_result['ate'])
    
    print("\n=== 敏感性分析综合报告 ===")
    for category, metrics in sensitivity_report.items():
        print(f"\n{category}:")
        for metric, value in metrics.items():
            print(f"  {metric}: {value}")

敏感性分析帮助我们理解研究结论对未观测混淆变量的稳健性,这是观察性研究中至关重要的一步。

敏感性分析
Rosenbaum边界法
未观测混淆模拟
安慰剂检验
评估隐藏偏差影响
量化混淆强度
验证模型设定
结论稳健性判断
强稳健性
中等稳健性
弱稳健性
结论可信
结论需谨慎
结论不可靠
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。