倾向得分匹配全流程详解与实战
I. 引言:为什么需要倾向得分匹配
观察性研究中的因果推断挑战
在医学、经济学、社会科学等领域,研究者经常面临一个根本性问题:如何从非实验数据中得出因果结论?与随机对照试验不同,观察性研究中处理组和对照组往往在基线特征上存在系统性差异,这导致简单的组间比较会产生有偏估计。
举个例子:假设我们想研究大学教育对个人收入的影响。上大学的人和不上大学的人在能力、家庭背景、动机等方面可能存在显著差异。如果直接比较两组人的收入,我们无法确定收入差异是教育带来的,还是这些前置差异造成的。
倾向得分匹配的基本思想
倾向得分匹配(Propensity Score Matching, PSM)由Rosenbaum和Rubin于1983年提出,其核心思想是通过模拟随机化来构造可比较的组。具体来说,PSM通过以下步骤实现:
- 为每个个体估计接受处理的概率(倾向得分)
- 基于倾向得分匹配处理组和对照组个体
- 在匹配后的样本中比较结果变量
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import NearestNeighbors
import statsmodels.api as sm
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("倾向得分匹配全流程详解与实战")
print("=" * 50)
II. 理论基础:倾向得分的统计基础
倾向得分的定义与性质
倾向得分定义为在给定协变量X的条件下,个体接受处理的条件概率:
e(X) = P(D=1|X)
其中D是处理指示变量(1表示处理组,0表示对照组),X是观测到的协变量。
Rosenbaum和Rubin证明了倾向得分的关键性质:如果满足强可忽略处理分配假设,那么在给定倾向得分的条件下,处理分配与潜在结果独立:
(Y₁, Y₀) ⫫ D | e(X)
这意味着,一旦我们控制了倾向得分,处理组和对照组在协变量分布上应该是相似的,就像随机化实验一样。
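下面用一个独立的小型模拟粗略演示这一平衡性性质:按估计的倾向得分分层后,各层内处理组与对照组的协变量均值应当接近(示例中的变量和参数均为示意,与后文案例数据无关)。
# 平衡性性质的最小演示(独立示意)
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)
n = 5000
x = rng.normal(size=(n, 2))                                   # 两个混杂变量
p_treat = 1 / (1 + np.exp(-(0.8 * x[:, 0] - 0.5 * x[:, 1])))  # 真实处理概率
d = rng.binomial(1, p_treat)                                  # 处理分配

ps_hat = LogisticRegression().fit(x, d).predict_proba(x)[:, 1]       # 估计倾向得分
demo = pd.DataFrame({'x1': x[:, 0], 'd': d,
                     'stratum': pd.qcut(ps_hat, 5, labels=False)})   # 按倾向得分五等分层

# 未分层时x1的组间均值差较大;各层内的差异应明显缩小,对应 X ⫫ D | e(X)
overall_gap = demo[demo['d'] == 1]['x1'].mean() - demo[demo['d'] == 0]['x1'].mean()
print(f"整体x1组间均值差: {overall_gap:.3f}")
print(demo.groupby(['stratum', 'd'])['x1'].mean().unstack().round(3))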
倾向得分匹配的假设
成功的PSM分析依赖于三个关键假设:
假设名称 | 数学表达 | 实际含义 | 验证方法 |
---|---|---|---|
条件独立性 | (Y₁, Y₀) ⫫ D \| X | 给定协变量后,处理分配与潜在结果独立 | 理论论证,无法完全检验 |
共同支持 | 0 < P(D=1\|X) < 1 | 每个个体都有接受和不接受处理的可能性 | 检查倾向得分分布重叠 |
平衡性 | X ⫫ D \| e(X) | 给定倾向得分后,协变量在处理组间平衡 | 平衡性检验 |
倾向得分匹配的优势与局限
优势:
- 降低维度:将多变量匹配简化为单变量匹配
- 直观易懂:基于概率概念,易于解释
- 灵活性:可与多种匹配算法结合
局限:
- 只能控制观测到的混淆变量
- 对模型设定敏感
- 可能损失样本量
III. 数据准备与探索性分析
实例背景:职业培训项目效果评估
假设我们正在评估一个职业培训项目对参与者收入的影响。我们有一个包含2000个个体的数据集,其中500人参加了培训项目(处理组),1500人未参加(对照组)。
我们观测到的协变量包括:
- 年龄、性别、教育年限
- 之前的工作经验、之前收入
- 居住地区、家庭背景等
# 模拟职业培训项目数据
np.random.seed(2024)
n_total = 2000
n_treated = 500
# 生成协变量
data = pd.DataFrame({
'age': np.random.normal(35, 8, n_total), # 年龄
'gender': np.random.binomial(1, 0.45, n_total), # 性别,1=女性,0=男性
'education': np.random.poisson(14, n_total), # 教育年限
'previous_income': np.random.normal(30000, 8000, n_total), # 之前收入
'experience': np.random.normal(5, 3, n_total), # 工作经验
'urban': np.random.binomial(1, 0.6, n_total), # 是否城市
'family_support': np.random.normal(0, 1, n_total) # 家庭支持程度
})
# 生成真实的倾向得分模型
# 培训参与与年龄、教育、之前收入等相关
X_ps = np.column_stack([
data['age'], data['gender'], data['education'],
data['previous_income'], data['experience'],
data['urban'], data['family_support']
])
X_ps = sm.add_constant(X_ps)
# 真实倾向得分模型参数
true_beta = np.array([-2.5, 0.02, -0.3, 0.15, -0.00001, 0.1, 0.4, 0.5])
true_ps = 1 / (1 + np.exp(-X_ps @ true_beta))
# 生成处理分配:按真实倾向得分加权、不放回地抽取恰好500名受训者
# (倾向得分越高越可能被抽中,既保留混杂结构,又保证处理组规模固定为500人)
treatment = np.zeros(n_total, dtype=int)
treated_idx = np.random.choice(n_total, size=n_treated, replace=False, p=true_ps / true_ps.sum())
treatment[treated_idx] = 1
data['treatment'] = treatment
data['true_ps'] = true_ps
# 生成结果变量(收入)
# 真实处理效应:培训使收入增加8000元
base_income = 20000 + 500 * data['age'] + 3000 * data['education'] + 0.7 * data['previous_income']
treatment_effect = 8000 # 真实处理效应
noise = np.random.normal(0, 5000, n_total)
data['income'] = base_income + treatment_effect * data['treatment'] + noise
print("数据概览:")
print(f"总样本量: {len(data)}")
print(f"处理组: {sum(data['treatment'])}人")
print(f"对照组: {sum(1 - data['treatment'])}人")
print(f"\n处理组平均收入: {data[data['treatment']==1]['income'].mean():.2f}元")
print(f"对照组平均收入: {data[data['treatment']==0]['income'].mean():.2f}元")
print(f"简单差异: {data[data['treatment']==1]['income'].mean() - data[data['treatment']==0]['income'].mean():.2f}元")
print(f"真实处理效应: {treatment_effect}元")
# 基本统计描述
print("\n协变量描述统计:")
print(data.describe())
探索性分析:处理组与对照组的基线差异
在进行匹配前,我们需要了解处理组和对照组在基线特征上的差异。
# 探索性分析:基线特征比较
def compare_groups(data, treatment_col, covariates):
"""比较处理组和对照组的基线特征"""
results = []
for cov in covariates:
treated = data[data[treatment_col] == 1][cov]
control = data[data[treatment_col] == 0][cov]
# 均值差异
mean_diff = treated.mean() - control.mean()
# t检验
t_stat, p_value = stats.ttest_ind(treated, control)
# 标准化均值差异 (SMD)
pooled_std = np.sqrt((treated.std()**2 + control.std()**2) / 2)
smd = mean_diff / pooled_std if pooled_std != 0 else 0
results.append({
'变量': cov,
'处理组均值': treated.mean(),
'对照组均值': control.mean(),
'均值差异': mean_diff,
'标准化差异': smd,
'p值': p_value
})
return pd.DataFrame(results)
# 比较基线特征
covariates = ['age', 'gender', 'education', 'previous_income', 'experience', 'urban', 'family_support']
baseline_comparison = compare_groups(data, 'treatment', covariates)
print("处理组与对照组基线特征比较:")
print(baseline_comparison.round(4))
# 可视化基线差异
fig, axes = plt.subplots(2, 4, figsize=(20, 10))
axes = axes.ravel()
for i, cov in enumerate(covariates):
# 分布图
data[data['treatment'] == 0][cov].hist(alpha=0.7, label='对照组', bins=20, ax=axes[i])
data[data['treatment'] == 1][cov].hist(alpha=0.7, label='处理组', bins=20, ax=axes[i])
axes[i].set_title(f'{cov}\nSMD: {baseline_comparison.iloc[i]["标准化差异"]:.3f}')
axes[i].legend()
# SMD可视化
axes[7].barh(range(len(covariates)), baseline_comparison['标准化差异'])
axes[7].set_yticks(range(len(covariates)))
axes[7].set_yticklabels(covariates)
axes[7].axvline(x=0, color='black', linestyle='-', alpha=0.3)
axes[7].axvline(x=0.1, color='red', linestyle='--', alpha=0.5, label='平衡阈值 (0.1)')
axes[7].set_xlabel('标准化均值差异 (SMD)')
axes[7].set_title('基线协变量平衡性')
axes[7].legend()
plt.tight_layout()
plt.show()
探索性分析显示,处理组和对照组在多个协变量上存在显著差异,这表明简单的组间比较会产生有偏估计。
IV. 倾向得分估计
倾向得分模型设定
倾向得分通常使用逻辑回归模型估计,也可以使用梯度提升等机器学习方法。模型设定的关键是纳入同时影响处理分配与结果变量的混杂变量,以及结果变量的重要预测变量,而不应纳入受处理影响的变量;由于估计结果对模型设定较为敏感,可以像本节末尾的示例那样比较不同设定下的倾向得分是否稳定。
# 倾向得分估计
def estimate_propensity_score(data, treatment_col, covariates, method='logistic', **kwargs):
"""
估计倾向得分
参数:
- data: 数据框
- treatment_col: 处理变量列名
- covariates: 协变量列表
- method: 估计方法 ('logistic', 'gbm')
返回:
- data_with_ps: 包含倾向得分的数据
- model: 拟合的模型
"""
X = data[covariates]
y = data[treatment_col]
if method == 'logistic':
# 逻辑回归
X_with_const = sm.add_constant(X)
model = sm.Logit(y, X_with_const).fit(disp=False)
propensity_scores = model.predict(X_with_const)
elif method == 'gbm':
# 梯度提升机
gbm = GradientBoostingClassifier(**kwargs)
gbm.fit(X, y)
propensity_scores = gbm.predict_proba(X)[:, 1]
model = gbm
else:
raise ValueError("方法必须是 'logistic' 或 'gbm'")
data_with_ps = data.copy()
data_with_ps['propensity_score'] = propensity_scores
return data_with_ps, model
# 使用逻辑回归估计倾向得分
data_ps, ps_model_logit = estimate_propensity_score(
data, 'treatment', covariates, method='logistic'
)
# 使用GBM估计倾向得分(作为比较)
data_ps_gbm, ps_model_gbm = estimate_propensity_score(
data, 'treatment', covariates, method='gbm',
n_estimators=100, max_depth=3, random_state=42
)
print("倾向得分模型结果 (逻辑回归):")
if hasattr(ps_model_logit, 'summary'):
print(ps_model_logit.summary())
# 比较两种方法的倾向得分
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# 倾向得分分布
axes[0].hist(data_ps[data_ps['treatment']==0]['propensity_score'],
alpha=0.7, label='对照组', bins=20, density=True)
axes[0].hist(data_ps[data_ps['treatment']==1]['propensity_score'],
alpha=0.7, label='处理组', bins=20, density=True)
axes[0].set_xlabel('倾向得分 (逻辑回归)')
axes[0].set_ylabel('密度')
axes[0].set_title('倾向得分分布 - 逻辑回归')
axes[0].legend()
axes[1].hist(data_ps_gbm[data_ps_gbm['treatment']==0]['propensity_score'],
alpha=0.7, label='对照组', bins=20, density=True)
axes[1].hist(data_ps_gbm[data_ps_gbm['treatment']==1]['propensity_score'],
alpha=0.7, label='处理组', bins=20, density=True)
axes[1].set_xlabel('倾向得分 (GBM)')
axes[1].set_ylabel('密度')
axes[1].set_title('倾向得分分布 - GBM')
axes[1].legend()
# 两种方法比较
axes[2].scatter(data_ps['propensity_score'], data_ps_gbm['propensity_score'],
alpha=0.5, c=data_ps['treatment'], cmap='viridis')
axes[2].set_xlabel('逻辑回归倾向得分')
axes[2].set_ylabel('GBM倾向得分')
axes[2].set_title('两种方法倾向得分比较')
axes[2].plot([0, 1], [0, 1], 'r--', alpha=0.5)
plt.tight_layout()
plt.show()
# 评估倾向得分模型
from sklearn.metrics import roc_auc_score, accuracy_score
y_true = data_ps['treatment']
y_pred_logit = data_ps['propensity_score']
y_pred_gbm = data_ps_gbm['propensity_score']
print("\n倾向得分模型评估:")
print(f"逻辑回归 - AUC: {roc_auc_score(y_true, y_pred_logit):.4f}")
print(f"GBM - AUC: {roc_auc_score(y_true, y_pred_gbm):.4f}")
print(f"逻辑回归 - 准确率: {accuracy_score(y_true, y_pred_logit > 0.5):.4f}")
print(f"GBM - 准确率: {accuracy_score(y_true, y_pred_gbm > 0.5):.4f}")
共同支持区域检查
在进行匹配前,我们需要确保处理组和对照组的倾向得分分布有足够的重叠区域。
# 共同支持区域分析
def check_common_support(data, treatment_col, ps_col):
"""检查共同支持区域"""
treated_ps = data[data[treatment_col] == 1][ps_col]
control_ps = data[data[treatment_col] == 0][ps_col]
# 计算共同支持区域
min_treated, max_treated = treated_ps.min(), treated_ps.max()
min_control, max_control = control_ps.min(), control_ps.max()
common_min = max(min_treated, min_control)
common_max = min(max_treated, max_control)
# 识别在共同支持区域外的样本
data_out = data.copy()
data_out['in_common_support'] = (
(data_out[ps_col] >= common_min) &
(data_out[ps_col] <= common_max)
)
support_info = {
'common_min': common_min,
'common_max': common_max,
'n_treated_in_support': sum(data_out[data_out[treatment_col]==1]['in_common_support']),
'n_control_in_support': sum(data_out[data_out[treatment_col]==0]['in_common_support']),
'n_treated_total': sum(data_out[treatment_col]==1),
'n_control_total': sum(data_out[treatment_col]==0)
}
return data_out, support_info
# 检查共同支持区域
data_support, support_info = check_common_support(data_ps, 'treatment', 'propensity_score')
print("\n共同支持区域分析:")
print(f"倾向得分重叠范围: [{support_info['common_min']:.4f}, {support_info['common_max']:.4f}]")
print(f"处理组在共同支持区域内: {support_info['n_treated_in_support']}/{support_info['n_treated_total']} "
f"({support_info['n_treated_in_support']/support_info['n_treated_total']*100:.1f}%)")
print(f"对照组在共同支持区域内: {support_info['n_control_in_support']}/{support_info['n_control_total']} "
f"({support_info['n_control_in_support']/support_info['n_control_total']*100:.1f}%)")
# 可视化共同支持区域
plt.figure(figsize=(10, 6))
# 密度图
treated_ps = data_ps[data_ps['treatment']==1]['propensity_score']
control_ps = data_ps[data_ps['treatment']==0]['propensity_score']
plt.hist(treated_ps, alpha=0.7, label='处理组', bins=30, density=True, color='red')
plt.hist(control_ps, alpha=0.7, label='对照组', bins=30, density=True, color='blue')
# 标记共同支持区域
plt.axvline(support_info['common_min'], color='black', linestyle='--',
label=f'共同支持区域下限: {support_info["common_min"]:.3f}')
plt.axvline(support_info['common_max'], color='black', linestyle='--',
label=f'共同支持区域上限: {support_info["common_max"]:.3f}')
plt.xlabel('倾向得分')
plt.ylabel('密度')
plt.title('倾向得分分布与共同支持区域')
plt.legend()
plt.show()
共同支持区域分析帮助我们确定哪些样本可以用于匹配。通常我们会排除倾向得分极端(接近0或1)的样本,因为这些样本缺乏合适的匹配对象。
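除了取两组倾向得分范围的交集,实践中也常按固定阈值直接截断极端得分。下面是一个按 [0.05, 0.95] 截断的简单示意(该阈值只是常见的经验取值,并非唯一标准)。
# 按固定阈值截断极端倾向得分(示意;0.05/0.95为常见经验阈值,可按需调整)
trim_low, trim_high = 0.05, 0.95
data_trimmed = data_ps[
    (data_ps['propensity_score'] >= trim_low) &
    (data_ps['propensity_score'] <= trim_high)
].copy()
print(f"截断前样本量: {len(data_ps)}, 截断后样本量: {len(data_trimmed)}")
print(f"截断后处理组: {data_trimmed['treatment'].sum()}人, 对照组: {(data_trimmed['treatment'] == 0).sum()}人")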
V. 匹配算法实现
多种匹配方法实现
倾向得分匹配有多种算法,每种都有其优缺点。我们将实现几种常用的匹配方法。
# 匹配算法实现
class PropensityScoreMatcher:
"""倾向得分匹配器"""
def __init__(self, caliper=0.2, caliper_method='std', replace=False):
"""
参数:
- caliper: 卡尺值
- caliper_method: 卡尺计算方法 ('std', 'absolute')
- replace: 是否允许重复匹配
"""
self.caliper = caliper
self.caliper_method = caliper_method
self.replace = replace
def nearest_neighbor_match(self, data, treatment_col, ps_col, outcome_col):
"""最近邻匹配"""
treated = data[data[treatment_col] == 1].copy()
control = data[data[treatment_col] == 0].copy()
# 计算卡尺
if self.caliper_method == 'std':
ps_std = data[ps_col].std()
caliper_value = self.caliper * ps_std
else:
caliper_value = self.caliper
        # 最近邻匹配:不放回匹配时,每匹配一个处理个体都在剩余对照中重新检索最近邻,
        # 避免预先计算的邻居索引在移除对照后失效
        matches = []
        match_ids = []
        for i in range(len(treated)):
            if len(control) == 0:
                break
            nbrs = NearestNeighbors(n_neighbors=1, metric='euclidean')
            nbrs.fit(control[[ps_col]])
            dist_arr, idx_arr = nbrs.kneighbors(treated.iloc[[i]][[ps_col]])
            dist, idx = dist_arr[0][0], idx_arr[0][0]
            # 应用卡尺并创建匹配对
            if dist <= caliper_value:
                treated_row = treated.iloc[i].copy()
                control_row = control.iloc[idx].copy()
                match_id = len(match_ids)
                treated_row['match_id'] = match_id
                control_row['match_id'] = match_id
                treated_row['distance'] = dist
                control_row['distance'] = dist
                matches.extend([treated_row, control_row])
                match_ids.append(match_id)
                # 如果不允许重复匹配,则从候选对照中移除已匹配的个体
                if not self.replace:
                    control = control.drop(control.index[idx])
matched_data = pd.DataFrame(matches)
# 计算匹配后的处理效应
if len(matched_data) > 0:
treated_matched = matched_data[matched_data[treatment_col] == 1][outcome_col]
control_matched = matched_data[matched_data[treatment_col] == 0][outcome_col]
ate = treated_matched.mean() - control_matched.mean()
else:
ate = np.nan
        n_treated_matched = len(treated_matched) if len(matched_data) > 0 else 0
        match_info = {
            'n_treated_matched': n_treated_matched,
            'n_control_matched': len(control_matched) if len(matched_data) > 0 else 0,
            'ate': ate,
            'matching_ratio': n_treated_matched / len(treated) if len(treated) > 0 else 0
        }
return matched_data, match_info
def radius_match(self, data, treatment_col, ps_col, outcome_col):
"""半径匹配"""
treated = data[data[treatment_col] == 1].copy()
control = data[data[treatment_col] == 0].copy()
# 计算半径
if self.caliper_method == 'std':
ps_std = data[ps_col].std()
radius = self.caliper * ps_std
else:
radius = self.caliper
matches = []
match_id = 0
for i, treated_row in treated.iterrows():
# 找到在半径内的所有对照
distances = np.abs(control[ps_col] - treated_row[ps_col])
within_radius = control[distances <= radius]
if len(within_radius) > 0:
                # 从半径内随机抽取一个对照作为匹配(简化的1:1半径匹配)
                control_match = within_radius.sample(1).iloc[0]
                # 不允许重复匹配时,将已用过的对照从候选集中移除
                if not self.replace:
                    control = control.drop(control_match.name)
# 创建匹配对
treated_match = treated_row.copy()
distance = np.abs(control_match[ps_col] - treated_match[ps_col])
treated_match['match_id'] = match_id
control_match['match_id'] = match_id
treated_match['distance'] = distance
control_match['distance'] = distance
matches.extend([treated_match, control_match])
match_id += 1
matched_data = pd.DataFrame(matches) if matches else pd.DataFrame()
# 计算处理效应
if len(matched_data) > 0:
treated_matched = matched_data[matched_data[treatment_col] == 1][outcome_col]
control_matched = matched_data[matched_data[treatment_col] == 0][outcome_col]
ate = treated_matched.mean() - control_matched.mean()
else:
ate = np.nan
        n_treated_matched = len(treated_matched) if len(matched_data) > 0 else 0
        match_info = {
            'n_treated_matched': n_treated_matched,
            'n_control_matched': len(control_matched) if len(matched_data) > 0 else 0,
            'ate': ate,
            'matching_ratio': n_treated_matched / len(treated) if len(treated) > 0 else 0
        }
return matched_data, match_info
def kernel_match(self, data, treatment_col, ps_col, outcome_col, bandwidth=0.06):
"""核匹配"""
treated = data[data[treatment_col] == 1].copy()
control = data[data[treatment_col] == 0].copy()
# 为每个处理组个体计算加权结果
counterfactuals = []
for i, treated_row in treated.iterrows():
# 计算核权重
distances = np.abs(control[ps_col] - treated_row[ps_col])
weights = np.exp(-0.5 * (distances / bandwidth) ** 2)
weights = weights / np.sum(weights) # 标准化权重
# 计算加权平均结果
weighted_outcome = np.sum(control[outcome_col] * weights)
counterfactuals.append(weighted_outcome)
# 计算处理效应
if len(counterfactuals) > 0:
actual_outcomes = treated[outcome_col].values
treatment_effects = actual_outcomes - counterfactuals
ate = np.mean(treatment_effects)
else:
ate = np.nan
match_info = {
'n_treated_matched': len(treated),
'n_control_used': len(control),
'ate': ate,
'individual_te': treatment_effects if len(counterfactuals) > 0 else []
}
# 核匹配不产生具体的匹配对数据框
return None, match_info
# 应用不同的匹配方法
print("应用不同匹配方法...")
# 1. 最近邻匹配
matcher_nn = PropensityScoreMatcher(caliper=0.2, caliper_method='std', replace=False)
matched_nn, info_nn = matcher_nn.nearest_neighbor_match(
data_support[data_support['in_common_support']], # 只在共同支持区域内匹配
'treatment', 'propensity_score', 'income'
)
# 2. 半径匹配
matcher_radius = PropensityScoreMatcher(caliper=0.1, caliper_method='std', replace=False)
matched_radius, info_radius = matcher_radius.radius_match(
data_support[data_support['in_common_support']],
'treatment', 'propensity_score', 'income'
)
# 3. 核匹配
matcher_kernel = PropensityScoreMatcher()
_, info_kernel = matcher_kernel.kernel_match(
data_support[data_support['in_common_support']],
'treatment', 'propensity_score', 'income'
)
# 比较匹配结果
matching_results = pd.DataFrame({
'匹配方法': ['最近邻匹配', '半径匹配', '核匹配'],
'匹配的处理组数': [info_nn['n_treated_matched'], info_radius['n_treated_matched'], info_kernel['n_treated_matched']],
'使用的对照组数': [info_nn['n_control_matched'], info_radius['n_control_matched'], info_kernel['n_control_used']],
'匹配率': [info_nn['matching_ratio'], info_radius['matching_ratio'], 1.0],
'ATE估计': [info_nn['ate'], info_radius['ate'], info_kernel['ate']]
})
print("\n不同匹配方法结果比较:")
print(matching_results.round(4))
# 可视化匹配效果
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# 原始倾向得分分布
axes[0,0].hist(data_ps[data_ps['treatment']==0]['propensity_score'],
alpha=0.7, label='对照组', bins=20, density=True)
axes[0,0].hist(data_ps[data_ps['treatment']==1]['propensity_score'],
alpha=0.7, label='处理组', bins=20, density=True)
axes[0,0].set_xlabel('倾向得分')
axes[0,0].set_ylabel('密度')
axes[0,0].set_title('原始倾向得分分布')
axes[0,0].legend()
# 最近邻匹配后分布
if len(matched_nn) > 0:
axes[0,1].hist(matched_nn[matched_nn['treatment']==0]['propensity_score'],
alpha=0.7, label='对照组', bins=20, density=True)
axes[0,1].hist(matched_nn[matched_nn['treatment']==1]['propensity_score'],
alpha=0.7, label='处理组', bins=20, density=True)
axes[0,1].set_xlabel('倾向得分')
axes[0,1].set_ylabel('密度')
axes[0,1].set_title('最近邻匹配后倾向得分分布')
axes[0,1].legend()
# 匹配距离分布
if len(matched_nn) > 0:
axes[1,0].hist(matched_nn['distance'], bins=30, alpha=0.7)
axes[1,0].set_xlabel('匹配距离')
axes[1,0].set_ylabel('频数')
axes[1,0].set_title('匹配距离分布')
# ATE比较
methods = ['原始差异', '最近邻匹配', '半径匹配', '核匹配', '真实效应']
ate_values = [
data[data['treatment']==1]['income'].mean() - data[data['treatment']==0]['income'].mean(),
info_nn['ate'],
info_radius['ate'],
info_kernel['ate'],
treatment_effect
]
colors = ['red', 'orange', 'orange', 'orange', 'green']
axes[1,1].barh(methods, ate_values, color=colors, alpha=0.7)
axes[1,1].set_xlabel('处理效应估计 (元)')
axes[1,1].set_title('不同方法的处理效应估计')
axes[1,1].grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()
匹配方法选择指南
不同的匹配方法适用于不同的场景:
匹配方法 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
最近邻匹配 | 简单直观,保留最多信息 | 可能匹配质量不一,对卡尺敏感 | 样本量充足,追求精确匹配 |
半径匹配 | 确保匹配质量,更稳健 | 可能损失部分处理组样本 | 对匹配质量要求高 |
核匹配 | 使用所有信息,效率高 | 结果不易解释,带宽选择敏感 | 大样本,关注平均效应 |
分层匹配 | 简单易实现 | 分层数选择主观 | 探索性分析,初步匹配 |
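表中提到的分层匹配未包含在上面的匹配器中。下面给出一个按倾向得分五等分层、在层内求均值差后按处理组样本数加权汇总的简单草图(分层数取5只是常见做法,函数stratification_match为示意实现)。
# 分层匹配(subclassification)的简单草图:层内比较、按处理组样本数加权汇总
def stratification_match(data, treatment_col, ps_col, outcome_col, n_strata=5):
    df = data.copy()
    df['stratum'] = pd.qcut(df[ps_col], n_strata, labels=False, duplicates='drop')
    stratum_effects, weights = [], []
    for _, grp in df.groupby('stratum'):
        treated = grp[grp[treatment_col] == 1][outcome_col]
        control = grp[grp[treatment_col] == 0][outcome_col]
        if len(treated) > 0 and len(control) > 0:  # 跳过缺少处理组或对照组的层
            stratum_effects.append(treated.mean() - control.mean())
            weights.append(len(treated))           # 按处理组样本数加权
    return np.average(stratum_effects, weights=weights) if stratum_effects else np.nan

ate_strat = stratification_match(
    data_support[data_support['in_common_support']],
    'treatment', 'propensity_score', 'income'
)
print(f"分层匹配的处理效应估计: {ate_strat:.2f}元")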
VI. 平衡性检验与匹配质量评估
匹配后,我们必须检验匹配质量,确保处理组和对照组在协变量上达到平衡。
# 平衡性检验函数
def assess_balance(original_data, matched_data, treatment_col, covariates, ps_col=None):
"""评估匹配前后的平衡性"""
balance_results = []
for data_type, data_dict in [('匹配前', original_data), ('匹配后', matched_data)]:
if len(data_dict) == 0:
continue
for cov in covariates:
treated = data_dict[data_dict[treatment_col] == 1][cov]
control = data_dict[data_dict[treatment_col] == 0][cov]
# 基本统计量
treated_mean = treated.mean()
control_mean = control.mean()
mean_diff = treated_mean - control_mean
# 标准化均值差异
pooled_std = np.sqrt((treated.std()**2 + control.std()**2) / 2)
smd = mean_diff / pooled_std if pooled_std != 0 else 0
# 方差比
var_ratio = treated.var() / control.var() if control.var() != 0 else np.inf
# t检验
if len(treated) > 1 and len(control) > 1:
t_stat, p_value = stats.ttest_ind(treated, control)
else:
t_stat, p_value = np.nan, np.nan
balance_results.append({
'阶段': data_type,
'变量': cov,
'处理组均值': treated_mean,
'对照组均值': control_mean,
'均值差异': mean_diff,
'标准化差异': smd,
'方差比': var_ratio,
'p值': p_value
})
balance_df = pd.DataFrame(balance_results)
# 计算总体平衡性指标
overall_balance = balance_df.groupby('阶段').agg({
'标准化差异': [lambda x: np.mean(np.abs(x)), lambda x: np.max(np.abs(x))],
'p值': lambda x: np.mean(x < 0.05)
}).round(4)
overall_balance.columns = ['平均绝对SMD', '最大绝对SMD', '显著比例']
return balance_df, overall_balance
# 执行平衡性检验(使用最近邻匹配结果)
if len(matched_nn) > 0:
balance_details, balance_overall = assess_balance(
data, matched_nn, 'treatment', covariates
)
print("平衡性检验详情:")
print(balance_details.round(4))
print("\n总体平衡性指标:")
print(balance_overall)
# 可视化平衡性改善
if len(matched_nn) > 0:
# 提取匹配前后的SMD
smd_before = balance_details[balance_details['阶段'] == '匹配前']['标准化差异'].values
smd_after = balance_details[balance_details['阶段'] == '匹配后']['标准化差异'].values
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# SMD比较图
max_smd = max(np.max(np.abs(smd_before)), np.max(np.abs(smd_after)))
axes[0].scatter(smd_before, smd_after, s=60, alpha=0.7)
for i, cov in enumerate(covariates):
axes[0].annotate(cov, (smd_before[i], smd_after[i]),
xytext=(5, 5), textcoords='offset points', fontsize=9)
axes[0].axhline(y=0, color='black', linestyle='-', alpha=0.3)
axes[0].axvline(x=0, color='black', linestyle='-', alpha=0.3)
axes[0].axhline(y=0.1, color='red', linestyle='--', alpha=0.5, label='平衡阈值 (0.1)')
axes[0].axhline(y=-0.1, color='red', linestyle='--', alpha=0.5)
axes[0].axvline(x=0.1, color='red', linestyle='--', alpha=0.5)
axes[0].axvline(x=-0.1, color='red', linestyle='--', alpha=0.5)
axes[0].set_xlim(-max_smd*1.1, max_smd*1.1)
axes[0].set_ylim(-max_smd*1.1, max_smd*1.1)
axes[0].set_xlabel('匹配前标准化差异')
axes[0].set_ylabel('匹配后标准化差异')
axes[0].set_title('匹配前后平衡性比较')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# SMD改善图
x_pos = np.arange(len(covariates))
width = 0.35
axes[1].bar(x_pos - width/2, np.abs(smd_before), width, label='匹配前', alpha=0.7)
axes[1].bar(x_pos + width/2, np.abs(smd_after), width, label='匹配后', alpha=0.7)
axes[1].axhline(y=0.1, color='red', linestyle='--', label='平衡阈值 (0.1)')
axes[1].set_xlabel('协变量')
axes[1].set_ylabel('绝对标准化差异')
axes[1].set_title('匹配前后绝对标准化差异比较')
axes[1].set_xticks(x_pos)
axes[1].set_xticklabels(covariates, rotation=45)
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 匹配质量综合评估
def evaluate_matching_quality(original_data, matched_data, treatment_col, covariates):
"""综合评估匹配质量"""
quality_metrics = {}
# 1. 样本保留率
n_treated_original = sum(original_data[treatment_col] == 1)
n_treated_matched = sum(matched_data[treatment_col] == 1) if len(matched_data) > 0 else 0
quality_metrics['处理组保留率'] = n_treated_matched / n_treated_original
# 2. 平衡性改善
balance_before, _ = assess_balance(original_data, original_data, treatment_col, covariates)
balance_after, _ = assess_balance(original_data, matched_data, treatment_col, covariates)
smd_before = balance_before[balance_before['阶段'] == '匹配前']['标准化差异'].abs().mean()
smd_after = balance_after[balance_after['阶段'] == '匹配后']['标准化差异'].abs().mean()
quality_metrics['SMD改善比例'] = (smd_before - smd_after) / smd_before
# 3. 平衡变量比例
n_balanced_before = sum(balance_before[balance_before['阶段'] == '匹配前']['标准化差异'].abs() < 0.1)
n_balanced_after = sum(balance_after[balance_after['阶段'] == '匹配后']['标准化差异'].abs() < 0.1)
quality_metrics['平衡变量比例'] = n_balanced_after / len(covariates)
# 4. 方差比在0.5-2之间的变量比例
var_ratios_after = balance_after[balance_after['阶段'] == '匹配后']['方差比']
n_good_var = sum((var_ratios_after >= 0.5) & (var_ratios_after <= 2))
quality_metrics['良好方差比比例'] = n_good_var / len(covariates)
return quality_metrics
# 评估匹配质量
if len(matched_nn) > 0:
quality_metrics = evaluate_matching_quality(data, matched_nn, 'treatment', covariates)
print("\n匹配质量综合评估:")
for metric, value in quality_metrics.items():
print(f"{metric}: {value:.4f}")
# 质量评分
quality_score = (
quality_metrics['处理组保留率'] * 0.3 +
quality_metrics['SMD改善比例'] * 0.3 +
quality_metrics['平衡变量比例'] * 0.2 +
quality_metrics['良好方差比比例'] * 0.2
)
print(f"\n综合质量评分: {quality_score:.4f}")
平衡性检验是PSM流程中至关重要的一步,它确保我们的匹配确实改善了组间可比性。
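若平衡性检验不达标,常见做法是收紧卡尺、更换匹配方式或重新设定倾向得分模型。下面是一个对不同卡尺值粗略比较匹配率与匹配后平均|SMD|的示意(卡尺取值仅为示例)。
# 不同卡尺值下匹配率与平衡性的粗略比较(示意,供平衡性不达标时调参参考)
for caliper in [0.05, 0.1, 0.2, 0.3]:
    matcher_c = PropensityScoreMatcher(caliper=caliper, caliper_method='std', replace=False)
    matched_c, info_c = matcher_c.nearest_neighbor_match(
        data_support[data_support['in_common_support']],
        'treatment', 'propensity_score', 'income'
    )
    if len(matched_c) > 0:
        details_c, _ = assess_balance(data, matched_c, 'treatment', covariates)
        mean_abs_smd = details_c[details_c['阶段'] == '匹配后']['标准化差异'].abs().mean()
    else:
        mean_abs_smd = np.nan
    print(f"卡尺={caliper}: 匹配率={info_c['matching_ratio']:.3f}, 匹配后平均|SMD|={mean_abs_smd:.4f}")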
VII. 处理效应估计与统计推断
在确认匹配质量良好后,我们可以估计处理效应并进行统计推断。需要说明的是,以处理组个体为基准的1:1匹配严格来说估计的是处理组平均处理效应(ATT);本例模拟的处理效应对所有个体相同,ATT与ATE一致,因此下文沿用ATE的记法。
# 处理效应估计与推断
def estimate_treatment_effect(matched_data, treatment_col, outcome_col, method='paired_t'):
"""
估计处理效应并进行统计推断
参数:
- matched_data: 匹配后的数据
- treatment_col: 处理变量列名
- outcome_col: 结果变量列名
- method: 推断方法 ('paired_t', 'regression', 'bootstrap')
"""
if len(matched_data) == 0:
return {'ate': np.nan, 'se': np.nan, 'ci_lower': np.nan, 'ci_upper': np.nan, 'p_value': np.nan}
treated = matched_data[matched_data[treatment_col] == 1]
control = matched_data[matched_data[treatment_col] == 0]
# 确保有匹配对
if 'match_id' not in matched_data.columns or len(treated) != len(control):
# 如果没有匹配对信息,使用独立样本t检验
ate = treated[outcome_col].mean() - control[outcome_col].mean()
t_stat, p_value = stats.ttest_ind(treated[outcome_col], control[outcome_col])
se = ate / t_stat if t_stat != 0 else np.nan
else:
# 按匹配对计算差异
matched_pairs = matched_data.groupby('match_id')
differences = []
for match_id, pair in matched_pairs:
if len(pair) == 2: # 确保是完整的匹配对
treated_outcome = pair[pair[treatment_col] == 1][outcome_col].iloc[0]
control_outcome = pair[pair[treatment_col] == 0][outcome_col].iloc[0]
differences.append(treated_outcome - control_outcome)
differences = np.array(differences)
ate = np.mean(differences)
# 配对t检验
if len(differences) > 1:
t_stat, p_value = stats.ttest_1samp(differences, 0)
se = np.std(differences, ddof=1) / np.sqrt(len(differences))
else:
t_stat, p_value, se = np.nan, np.nan, np.nan
# 计算置信区间
if not np.isnan(ate) and not np.isnan(se):
ci_lower = ate - 1.96 * se
ci_upper = ate + 1.96 * se
else:
ci_lower, ci_upper = np.nan, np.nan
return {
'ate': ate,
'se': se,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'p_value': p_value,
'n_pairs': len(treated) if 'match_id' not in matched_data.columns else len(matched_pairs)
}
# 自助法标准误
def bootstrap_ate(data, treatment_col, outcome_col, ps_col, n_bootstrap=1000):
"""使用自助法计算ATE的标准误"""
ate_bootstrap = []
for i in range(n_bootstrap):
        # 自助重抽样(重置索引,避免重复的行标签影响后续按标签删除对照的操作)
        bootstrap_sample = data.sample(n=len(data), replace=True).reset_index(drop=True)
# 在重抽样样本上执行匹配
matcher = PropensityScoreMatcher(caliper=0.2, caliper_method='std', replace=False)
matched_bootstrap, _ = matcher.nearest_neighbor_match(
bootstrap_sample, treatment_col, ps_col, outcome_col
)
# 计算ATE
if len(matched_bootstrap) > 0:
treated = matched_bootstrap[matched_bootstrap[treatment_col] == 1][outcome_col]
control = matched_bootstrap[matched_bootstrap[treatment_col] == 0][outcome_col]
ate_bootstrap.append(treated.mean() - control.mean())
ate_bootstrap = np.array(ate_bootstrap)
return {
'ate_bootstrap_mean': np.mean(ate_bootstrap),
'ate_bootstrap_se': np.std(ate_bootstrap, ddof=1),
'ci_bootstrap_lower': np.percentile(ate_bootstrap, 2.5),
'ci_bootstrap_upper': np.percentile(ate_bootstrap, 97.5)
}
# 估计处理效应
if len(matched_nn) > 0:
# 使用配对t检验
te_result = estimate_treatment_effect(matched_nn, 'treatment', 'income', 'paired_t')
# 使用自助法
bootstrap_result = bootstrap_ate(
data_support[data_support['in_common_support']],
'treatment', 'income', 'propensity_score', n_bootstrap=500
)
print("处理效应估计结果:")
print(f"ATE (配对t检验): {te_result['ate']:.2f}元")
print(f"标准误: {te_result['se']:.2f}")
print(f"95%置信区间: [{te_result['ci_lower']:.2f}, {te_result['ci_upper']:.2f}]")
print(f"p值: {te_result['p_value']:.4f}")
print(f"匹配对数: {te_result['n_pairs']}")
print(f"\nATE (自助法): {bootstrap_result['ate_bootstrap_mean']:.2f}元")
print(f"自助标准误: {bootstrap_result['ate_bootstrap_se']:.2f}")
print(f"自助95%置信区间: [{bootstrap_result['ci_bootstrap_lower']:.2f}, {bootstrap_result['ci_boostrap_upper']:.2f}]")
# 异质性处理效应分析
def analyze_heterogeneous_te(matched_data, treatment_col, outcome_col, subgroup_vars):
"""分析异质性处理效应"""
results = {}
for subgroup_var in subgroup_vars:
if subgroup_var not in matched_data.columns:
continue
# 按子组分析
unique_values = matched_data[subgroup_var].unique()
subgroup_results = []
for value in unique_values:
subgroup_data = matched_data[matched_data[subgroup_var] == value]
if len(subgroup_data) > 0:
te_subgroup = estimate_treatment_effect(subgroup_data, treatment_col, outcome_col)
te_subgroup['subgroup_value'] = value
te_subgroup['n_obs'] = len(subgroup_data)
subgroup_results.append(te_subgroup)
results[subgroup_var] = pd.DataFrame(subgroup_results)
return results
# 异质性分析
if len(matched_nn) > 0:
subgroup_vars = ['gender', 'urban', 'education_group']
    # 创建教育分组(上界设为np.inf,避免教育年限超过20的个体落在分组之外而变成缺失)
    matched_nn['education_group'] = pd.cut(matched_nn['education'],
                                           bins=[0, 12, 16, np.inf],
                                           labels=['高中及以下', '大学', '研究生'])
hetero_results = analyze_heterogeneous_te(matched_nn, 'treatment', 'income', subgroup_vars)
print("\n异质性处理效应分析:")
for var, result_df in hetero_results.items():
print(f"\n按 {var} 分组:")
if len(result_df) > 0:
display_cols = ['subgroup_value', 'ate', 'ci_lower', 'ci_upper', 'p_value', 'n_obs']
available_cols = [col for col in display_cols if col in result_df.columns]
print(result_df[available_cols].round(4))
# 可视化处理效应
if len(matched_nn) > 0:
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 1. 主要ATE估计
methods = ['简单比较', 'PSM-配对t', 'PSM-自助法', '真实效应']
ate_values = [
data[data['treatment']==1]['income'].mean() - data[data['treatment']==0]['income'].mean(),
te_result['ate'],
bootstrap_result['ate_bootstrap_mean'],
treatment_effect
]
ci_lowers = [np.nan, te_result['ci_lower'], bootstrap_result['ci_bootstrap_lower'], np.nan]
ci_uppers = [np.nan, te_result['ci_upper'], bootstrap_result['ci_bootstrap_upper'], np.nan]
colors = ['red', 'orange', 'orange', 'green']
for i, (method, ate, color) in enumerate(zip(methods, ate_values, colors)):
axes[0].barh(i, ate, color=color, alpha=0.7, label=method)
if not np.isnan(ci_lowers[i]) and not np.isnan(ci_uppers[i]):
axes[0].errorbar(ate, i, xerr=[[ate - ci_lowers[i]], [ci_uppers[i] - ate]],
fmt='none', color='black', capsize=5)
axes[0].axvline(x=treatment_effect, color='green', linestyle='--', alpha=0.7, label='真实效应')
axes[0].set_yticks(range(len(methods)))
axes[0].set_yticklabels(methods)
axes[0].set_xlabel('处理效应估计 (元)')
axes[0].set_title('不同方法的处理效应估计比较')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 2. 异质性分析 - 性别
    if 'gender' in hetero_results and len(hetero_results['gender']) > 0:
        # 按subgroup_value排序并据此生成标签,避免依赖unique()的出现顺序
        gender_results = hetero_results['gender'].sort_values('subgroup_value').reset_index(drop=True)
        gender_labels = ['女性' if v == 1 else '男性' for v in gender_results['subgroup_value']]
        gender_ates = gender_results['ate'].values
        axes[1].bar(gender_labels, gender_ates, color=['blue', 'pink'][:len(gender_labels)], alpha=0.7)
for i, (ate, ci_l, ci_u) in enumerate(zip(gender_results['ate'],
gender_results['ci_lower'],
gender_results['ci_upper'])):
axes[1].errorbar(i, ate, yerr=[[ate - ci_l], [ci_u - ate]],
fmt='none', color='black', capsize=5)
axes[1].axhline(y=te_result['ate'], color='red', linestyle='--', alpha=0.7, label='总体ATE')
axes[1].set_ylabel('处理效应 (元)')
axes[1].set_title('按性别的异质性处理效应')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 3. 异质性分析 - 教育
if 'education_group' in hetero_results and len(hetero_results['education_group']) > 0:
edu_results = hetero_results['education_group']
edu_labels = edu_results['subgroup_value'].values
edu_ates = edu_results['ate'].values
axes[2].bar(edu_labels, edu_ates, alpha=0.7)
for i, (ate, ci_l, ci_u) in enumerate(zip(edu_results['ate'],
edu_results['ci_lower'],
edu_results['ci_upper'])):
axes[2].errorbar(i, ate, yerr=[[ate - ci_l], [ci_u - ate]],
fmt='none', color='black', capsize=5)
axes[2].axhline(y=te_result['ate'], color='red', linestyle='--', alpha=0.7, label='总体ATE')
axes[2].set_ylabel('处理效应 (元)')
axes[2].set_title('按教育程度的异质性处理效应')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
处理效应估计显示,经过倾向得分匹配后,我们得到了更接近真实处理效应的估计值,这表明PSM方法有效地减少了选择偏差。
VIII. 敏感性分析
敏感性分析用于评估我们的因果结论对未观测混淆变量的稳健性。
# 敏感性分析
def rosenbaum_sensitivity_analysis(matched_data, treatment_col, outcome_col, gamma_range=np.linspace(1, 3, 21)):
"""
Rosenbaum敏感性分析
评估结论对隐藏偏差的敏感性
参数:
    - gamma_range: 未观测混淆导致的处理分配几率比(Gamma)的取值范围
"""
if len(matched_data) == 0 or 'match_id' not in matched_data.columns:
return pd.DataFrame()
# 按匹配对计算差异
matched_pairs = matched_data.groupby('match_id')
differences = []
for match_id, pair in matched_pairs:
if len(pair) == 2:
treated_outcome = pair[pair[treatment_col] == 1][outcome_col].iloc[0]
control_outcome = pair[pair[treatment_col] == 0][outcome_col].iloc[0]
differences.append(treated_outcome - control_outcome)
differences = np.array(differences)
n_pairs = len(differences)
if n_pairs == 0:
return pd.DataFrame()
    # 符号检验版本的Rosenbaum界限(比完整的Wilcoxon符号秩界限更简单,思想一致)
    # 在隐藏偏差强度Gamma下,若处理其实无效,单个匹配对出现"处理组结果更高"的概率
    # 被界定在 [1/(1+Gamma), Gamma/(1+Gamma)] 之间,据此得到检验p值的上下界
    sensitivity_results = []
    t_plus = int(np.sum(differences > 0))  # 正差值匹配对数(符号检验统计量)
    for gamma in gamma_range:
        p_upper = stats.binomtest(t_plus, n_pairs, gamma / (1 + gamma), alternative='greater').pvalue
        p_lower = stats.binomtest(t_plus, n_pairs, 1 / (1 + gamma), alternative='greater').pvalue
        sensitivity_results.append({
            'gamma': gamma,
            'p_value_upper': p_upper,   # 最不利情形(最大)p值,决定结论是否稳健
            'p_value_lower': p_lower,   # 最有利情形(最小)p值
            'significant_upper': p_upper < 0.05,
            'significant_lower': p_lower < 0.05
        })
return pd.DataFrame(sensitivity_results)
# 执行敏感性分析
if len(matched_nn) > 0:
sensitivity_df = rosenbaum_sensitivity_analysis(matched_nn, 'treatment', 'income')
if len(sensitivity_df) > 0:
print("Rosenbaum敏感性分析结果:")
print(sensitivity_df.round(4))
        # 找到最坏情形p值(上界)超过0.05、结论不再显著的最小Gamma
        critical_gamma = sensitivity_df[~sensitivity_df['significant_upper']]['gamma'].min()
if not np.isnan(critical_gamma):
print(f"\n结论变得不稳健的临界Gamma值: {critical_gamma:.2f}")
print("Gamma表示由于未观测混淆变量导致的处理分配几率比")
else:
print("\n在分析的Gamma范围内,结论始终保持稳健")
# 可视化敏感性分析
if len(sensitivity_df) > 0:
plt.figure(figsize=(10, 6))
plt.plot(sensitivity_df['gamma'], sensitivity_df['p_value_lower'],
'b-', label='p值下限', linewidth=2)
plt.plot(sensitivity_df['gamma'], sensitivity_df['p_value_upper'],
'r-', label='p值上限', linewidth=2)
plt.axhline(y=0.05, color='black', linestyle='--', alpha=0.7, label='显著性阈值 (0.05)')
if not np.isnan(critical_gamma):
plt.axvline(x=critical_gamma, color='green', linestyle='--',
label=f'临界Gamma: {critical_gamma:.2f}')
plt.xlabel('Gamma (未观测混淆的强度)')
plt.ylabel('p值')
plt.title('Rosenbaum敏感性分析')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 其他敏感性分析:未观测混淆的影响
def assess_unobserved_confounding(ate_estimate, confounding_strength):
"""评估未观测混淆对处理效应估计的潜在影响"""
results = []
for strength in confounding_strength:
        # 简化计算:假设未观测混淆造成的偏差与处理效应成比例,且方向未知,
        # 因此同时报告向下与向上修正后的估计
        bias_magnitude = strength * ate_estimate
        results.append({
            'confounding_strength': strength,
            'bias_magnitude': bias_magnitude,
            'adjusted_ate_lower': ate_estimate - bias_magnitude,
            'adjusted_ate_upper': ate_estimate + bias_magnitude,
            'bias_percentage': bias_magnitude / abs(ate_estimate) * 100
        })
return pd.DataFrame(results)
# 评估未观测混淆的影响
confounding_strengths = [0.1, 0.2, 0.3, 0.4, 0.5]
if len(matched_nn) > 0 and not np.isnan(te_result['ate']):
confounding_df = assess_unobserved_confounding(te_result['ate'], confounding_strengths)
print("\n未观测混淆敏感性分析:")
print(confounding_df.round(4))
# 综合敏感性报告
def generate_sensitivity_report(matched_data, treatment_col, outcome_col, ate_estimate):
"""生成敏感性分析综合报告"""
report = {
'样本量': {
'总匹配对': len(matched_data) // 2 if len(matched_data) > 0 else 0,
'处理组': sum(matched_data[treatment_col] == 1) if len(matched_data) > 0 else 0,
'对照组': sum(matched_data[treatment_col] == 0) if len(matched_data) > 0 else 0
},
'处理效应': {
'ATE估计': ate_estimate,
'95%CI': f"[{te_result['ci_lower']:.2f}, {te_result['ci_upper']:.2f}]" if 'ci_lower' in te_result else "N/A",
'p值': te_result['p_value'] if 'p_value' in te_result else np.nan
},
'敏感性': {
            # critical_gamma 定义在模块层,函数内需用globals()检查;缺失或为NaN时给出保守描述
            'Rosenbaum临界Gamma': critical_gamma if ('critical_gamma' in globals() and not np.isnan(critical_gamma)) else "N/A",
            '结论稳健性': '稳健' if ('critical_gamma' in globals() and (np.isnan(critical_gamma) or critical_gamma > 1.5)) else '需要谨慎解释'
}
}
return report
# 生成敏感性报告
if len(matched_nn) > 0:
sensitivity_report = generate_sensitivity_report(matched_nn, 'treatment', 'income', te_result['ate'])
print("\n=== 敏感性分析综合报告 ===")
for category, metrics in sensitivity_report.items():
print(f"\n{category}:")
for metric, value in metrics.items():
print(f" {metric}: {value}")
敏感性分析帮助我们理解研究结论对未观测混淆变量的稳健性,这是观察性研究中至关重要的一步。