因果推断基础:从相关到因果的思维转变
在数据科学和统计学领域,我们经常面临一个根本性的挑战:如何从观察到的相关性中推断出因果关系?这个问题看似简单,实则深奥。本文将通过理论讲解、实例分析和代码实现,带你完成从相关思维到因果思维的转变。
I. 引言:为什么相关性不等于因果性
在开始技术讨论前,让我们先明确一个基本原则:相关性不等于因果性。这是因果推断领域的首要戒律,也是我们思维转变的起点。
相关与因果的根本区别
想象一个经典的例子:冰淇淋销量与溺水事件之间存在正相关关系。当冰淇淋销量增加时,溺水事件也往往增多。这是否意味着吃冰淇淋会导致溺水?显然不是。这里存在一个混淆变量——气温。高温天气既促使人们购买更多冰淇淋,也促使更多人游泳,从而增加了溺水风险。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
# 模拟数据:气温、冰淇淋销量和溺水事件
np.random.seed(42)
days = 100
temperature = np.random.normal(25, 5, days) # 平均温度25度
# 冰淇淋销量与气温正相关
ice_cream_sales = 50 + 2 * temperature + np.random.normal(0, 10, days)
# 溺水事件与气温正相关
drowning_incidents = 5 + 0.3 * temperature + np.random.normal(0, 2, days)
# 创建数据框
data = pd.DataFrame({
'temperature': temperature,
'ice_cream_sales': ice_cream_sales,
'drowning_incidents': drowning_incidents
})
# 计算相关系数
correlation = data['ice_cream_sales'].corr(data['drowning_incidents'])
print(f"冰淇淋销量与溺水事件的相关系数: {correlation:.3f}")
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 散点图
axes[0].scatter(data['ice_cream_sales'], data['drowning_incidents'], alpha=0.7)
axes[0].set_xlabel('冰淇淋销量')
axes[0].set_ylabel('溺水事件')
axes[0].set_title(f'冰淇淋销量 vs 溺水事件 (r = {correlation:.3f})')
# 分别对温度作图
colors = data['temperature']
scatter = axes[1].scatter(data['ice_cream_sales'], data['drowning_incidents'],
c=colors, cmap='viridis', alpha=0.7)
axes[1].set_xlabel('冰淇淋销量')
axes[1].set_ylabel('溺水事件')
axes[1].set_title('按温度着色: 冰淇淋销量 vs 溺水事件')
plt.colorbar(scatter, ax=axes[1], label='温度')
plt.tight_layout()
plt.show()
这段代码模拟了冰淇淋销量、气温和溺水事件之间的关系。当我们仅看冰淇淋销量和溺水事件时,会发现明显的正相关。但当我们引入温度变量后,真正的因果关系变得清晰:温度是影响两者的共同原因。
因果推断的基本问题
因果推断的核心挑战在于我们永远无法同时观察到一个单元的两种潜在结果。对于一个特定的个体,我们只能观察到他们在接受干预或不接受干预中的一种状态,而无法知道另一种状态会是什么样子。这就是著名的因果推断基本问题。
II. 因果推断的框架与符号
要建立因果思维,我们需要一个清晰的框架和符号系统。目前最主流的是Rubin因果模型(也称为潜在结果框架)和Pearl的因果图模型。
潜在结果框架
在潜在结果框架中,对于每个个体i,我们定义:
- Y₁ᵢ:个体i接受干预时的潜在结果
- Y₀ᵢ:个体i未接受干预时的潜在结果
- Dᵢ:个体i实际接受的干预状态(1表示接受,0表示未接受)
个体i的因果效应定义为:τᵢ = Y₁ᵢ - Y₀ᵢ
但我们只能观察到:Yᵢ = DᵢY₁ᵢ + (1-Dᵢ)Y₀ᵢ
符号 | 含义 | 观察情况 |
---|---|---|
Y₁ᵢ | 个体i接受干预时的潜在结果 | 仅当Dᵢ=1时可观察 |
Y₀ᵢ | 个体i未接受干预时的潜在结果 | 仅当Dᵢ=0时可观察 |
τᵢ | 个体处理效应 | 永远不可直接观察 |
Yᵢ | 实际观察到的结果 | 总是可观察 |
平均处理效应(ATE)
由于个体处理效应τᵢ不可直接观察,我们通常关注平均处理效应:
ATE = E[Y₁ - Y₀] = E[Y₁] - E[Y₀]
但在观察性研究中,简单的均值比较E[Y|D=1] - E[Y|D=0]往往不等于ATE,因为可能存在选择偏差。
# 模拟数据:展示选择偏差的问题
np.random.seed(123)
n = 1000
# 模拟能力变量(混淆变量)
ability = np.random.normal(0, 1, n)
# 干预分配与能力相关(能力高的人更可能接受培训)
D = np.random.binomial(1, 1/(1+np.exp(-ability))) # 逻辑函数
# 潜在结果:结果与能力正相关
Y0 = ability + np.random.normal(0, 0.5, n) # 未接受干预的结果
Y1 = Y0 + 0.5 + np.random.normal(0, 0.5, n) # 接受干预的结果,真实效应为0.5
# 观察到的结果
Y_obs = D * Y1 + (1 - D) * Y0
# 创建数据框
data_bias = pd.DataFrame({
'ability': ability,
'D': D,
'Y0': Y0,
'Y1': Y1,
'Y_obs': Y_obs
})
# 计算简单均值差异
naive_ate = data_bias[data_bias['D'] == 1]['Y_obs'].mean() - data_bias[data_bias['D'] == 0]['Y_obs'].mean()
true_ate = (Y1 - Y0).mean()
print(f"简单均值差异: {naive_ate:.3f}")
print(f"真实平均处理效应: {true_ate:.3f}")
print(f"偏差: {naive_ate - true_ate:.3f}")
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 按干预状态显示能力分布
axes[0].hist(data_bias[data_bias['D'] == 0]['ability'], alpha=0.7, label='未接受干预 (D=0)', bins=20)
axes[0].hist(data_bias[data_bias['D'] == 1]['ability'], alpha=0.7, label='接受干预 (D=1)', bins=20)
axes[0].set_xlabel('能力')
axes[0].set_ylabel('频数')
axes[0].set_title('干预组和对照组的能力分布')
axes[0].legend()
# 按干预状态显示结果分布
axes[1].hist(data_bias[data_bias['D'] == 0]['Y_obs'], alpha=0.7, label='未接受干预 (D=0)', bins=20)
axes[1].hist(data_bias[data_bias['D'] == 1]['Y_obs'], alpha=0.7, label='接受干预 (D=1)', bins=20)
axes[1].set_xlabel('观察结果 Y')
axes[1].set_ylabel('频数')
axes[1].set_title('干预组和对照组的结果分布')
axes[1].legend()
plt.tight_layout()
plt.show()
这段代码展示了选择偏差的问题。由于能力高的人更可能接受培训,简单比较培训组和未培训组的结果会高估培训效果,因为两组人在培训前就存在系统性差异。
III. 因果图与d-分离
因果图是表示变量间因果关系的强大工具。在因果图中,节点代表变量,有向边代表因果关系。d-分离是判断两个变量在给定条件集下是否独立的重要概念。
因果图的基本结构
因果图中有三种基本结构:
- 链式结构:A → B → C
- 叉式结构:A ← B → C
- 对撞结构:A → B ← C
# 可视化不同的因果结构
import networkx as nx
# 创建图形
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
# 链式结构: A -> B -> C
G_chain = nx.DiGraph()
G_chain.add_edges_from([('A', 'B'), ('B', 'C')])
pos_chain = {'A': (0, 0), 'B': (1, 0), 'C': (2, 0)}
nx.draw(G_chain, pos_chain, with_labels=True, node_size=1500, node_color='lightblue',
arrowsize=20, ax=axes[0])
axes[0].set_title('链式结构: A → B → C')
# 叉式结构: A <- B -> C
G_fork = nx.DiGraph()
G_fork.add_edges_from([('B', 'A'), ('B', 'C')])
pos_fork = {'A': (0, 0), 'B': (1, 0), 'C': (2, 0)}
nx.draw(G_fork, pos_fork, with_labels=True, node_size=1500, node_color='lightgreen',
arrowsize=20, ax=axes[1])
axes[1].set_title('叉式结构: A ← B → C')
# 对撞结构: A -> B <- C
G_collider = nx.DiGraph()
G_collider.add_edges_from([('A', 'B'), ('C', 'B')])
pos_collider = {'A': (0, 0), 'B': (1, 0), 'C': (2, 0)}
nx.draw(G_collider, pos_collider, with_labels=True, node_size=1500, node_color='lightcoral',
arrowsize=20, ax=axes[2])
axes[2].set_title('对撞结构: A → B ← C')
plt.tight_layout()
plt.show()
d-分离准则
d-分离是判断在给定条件集Z下,两个变量X和Y是否独立的图形准则:
- 在链式结构A → B → C中,A和C相关,但以B为条件时,A和C独立
- 在叉式结构A ← B → C中,A和C相关,但以B为条件时,A和C独立
- 在对撞结构A → B ← C中,A和C原本独立,但以B或B的后代为条件时,A和C可能相关
# 模拟三种结构下的条件独立性
np.random.seed(42)
n = 1000
# 链式结构: A -> B -> C
A_chain = np.random.normal(0, 1, n)
B_chain = A_chain + np.random.normal(0, 0.5, n)
C_chain = B_chain + np.random.normal(0, 0.5, n)
# 叉式结构: A <- B -> C
B_fork = np.random.normal(0, 1, n)
A_fork = B_fork + np.random.normal(0, 0.5, n)
C_fork = B_fork + np.random.normal(0, 0.5, n)
# 对撞结构: A -> B <- C
A_collider = np.random.normal(0, 1, n)
C_collider = np.random.normal(0, 1, n)
B_collider = A_collider + C_collider + np.random.normal(0, 0.5, n)
# 计算相关系数
corr_chain_uncond = np.corrcoef(A_chain, C_chain)[0,1]
corr_chain_cond = partial_corr(A_chain, C_chain, [B_chain]) # 需要实现偏相关
corr_fork_uncond = np.corrcoef(A_fork, C_fork)[0,1]
corr_fork_cond = partial_corr(A_fork, C_fork, [B_fork])
corr_collider_uncond = np.corrcoef(A_collider, C_collider)[0,1]
corr_collider_cond = partial_corr(A_collider, C_collider, [B_collider])
# 实现偏相关函数
def partial_corr(x, y, z_list):
"""计算x和y在控制z_list中变量后的偏相关系数"""
# 为x和y对z_list回归并取残差
X = np.column_stack([np.ones(len(x))] + [z for z in z_list])
beta_x = np.linalg.lstsq(X, x, rcond=None)[0]
resid_x = x - X @ beta_x
beta_y = np.linalg.lstsq(X, y, rcond=None)[0]
resid_y = y - X @ beta_y
return np.corrcoef(resid_x, resid_y)[0,1]
# 重新计算条件相关
corr_chain_cond = partial_corr(A_chain, C_chain, [B_chain])
corr_fork_cond = partial_corr(A_fork, C_fork, [B_fork])
corr_collider_cond = partial_corr(A_collider, C_collider, [B_collider])
# 创建结果显示表
structure_data = {
'结构类型': ['链式 A→B→C', '叉式 A←B→C', '对撞 A→B←C'],
'无条件相关': [f"{corr_chain_uncond:.3f}", f"{corr_fork_uncond:.3f}", f"{corr_collider_uncond:.3f}"],
'条件相关(以B为条件)': [f"{corr_chain_cond:.3f}", f"{corr_fork_cond:.3f}", f"{corr_collider_cond:.3f}"],
'd-分离预测': ['相关变独立', '相关变独立', '独立变相关']
}
structure_df = pd.DataFrame(structure_data)
print("不同因果结构下的相关性模式:")
print(structure_df)
这个模拟验证了d-分离准则:在链式和叉式结构中,以中间变量为条件会使原本相关的变量变得独立;而在对撞结构中,以中间变量为条件会使原本独立的变量变得相关。
IV. ignorability与因果识别
要识别因果效应,我们需要一些假设。最重要的假设之一是ignorability(可忽略性)或无混淆假设。
ignorability假设
ignorability假设表明,在给定协变量X的条件下,干预分配D与潜在结果独立:
(Y₁, Y₀) ⫫ D | X
这意味着,一旦我们控制了相关的协变量X,干预分配就像是随机的。
假设名称 | 数学表达 | 直观解释 |
---|---|---|
可忽略性 | (Y₁, Y₀) ⫫ D | X | 给定X后,干预分配与潜在结果独立 |
一致性 | Y = D·Y₁ + (1-D)·Y₀ | 观察到的结果等于相应潜在结果 |
正值 | 0 < P(D=1|X) < 1 | 每个单元都有接受和不接受干预的可能 |
倾向得分与匹配方法
当ignorability成立时,我们可以使用各种方法估计因果效应。其中最流行的是基于倾向得分的方法。
倾向得分定义为:e(X) = P(D=1|X)
根据Rosenbaum和Rubin的理论,在ignorability假设下:
(Y₁, Y₀) ⫫ D | e(X)
这意味着我们可以基于倾向得分而不是整个X来调整选择偏差。
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import NearestNeighbors
# 模拟更复杂的数据
np.random.seed(123)
n = 2000
# 生成多个协变量
X1 = np.random.normal(0, 1, n) # 年龄
X2 = np.random.normal(0, 1, n) # 收入
X3 = np.random.binomial(1, 0.5, n) # 性别
# 真实的倾向得分模型
true_ps = 1 / (1 + np.exp(-(0.5 + 0.8*X1 + 0.6*X2 - 0.4*X3)))
D = np.random.binomial(1, true_ps)
# 潜在结果模型
Y0 = 20 + 3*X1 + 2*X2 + 5*X3 + np.random.normal(0, 2, n)
Y1 = Y0 + 5 + 1.5*X1 + np.random.normal(0, 2, n) # 异质处理效应
Y_obs = D * Y1 + (1 - D) * Y0
# 创建数据框
data_ps = pd.DataFrame({
'X1': X1, 'X2': X2, 'X3': X3,
'D': D, 'Y_obs': Y_obs,
'true_ps': true_ps
})
print("数据摘要:")
print(f"干预组样本数: {sum(D)}")
print(f"对照组样本数: {sum(1-D)}")
print(f"真实ATE: {(Y1 - Y0).mean():.3f}")
# 估计倾向得分
X_features = data_ps[['X1', 'X2', 'X3']]
ps_model = LogisticRegression()
ps_model.fit(X_features, D)
data_ps['est_ps'] = ps_model.predict_proba(X_features)[:, 1]
# 可视化倾向得分分布
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 倾向得分分布
axes[0].hist(data_ps[data_ps['D'] == 0]['est_ps'], alpha=0.7, label='对照组 (D=0)', bins=20)
axes[0].hist(data_ps[data_ps['D'] == 1]['est_ps'], alpha=0.7, label='干预组 (D=1)', bins=20)
axes[0].set_xlabel('倾向得分')
axes[0].set_ylabel('频数')
axes[0].set_title('干预组和对照组的倾向得分分布')
axes[0].legend()
# 共同支持区域
ps_range = np.linspace(0, 1, 100)
control_density = np.array([np.mean(data_ps[data_ps['D'] == 0]['est_ps'] > p) for p in ps_range])
treatment_density = np.array([np.mean(data_ps[data_ps['D'] == 1]['est_ps'] > p) for p in ps_range])
axes[1].plot(ps_range, control_density, label='对照组')
axes[1].plot(ps_range, treatment_density, label='干预组')
axes[1].set_xlabel('倾向得分阈值')
axes[1].set_ylabel('超过阈值的比例')
axes[1].set_title('共同支持区域')
axes[1].legend()
plt.tight_layout()
plt.show()
倾向得分匹配
现在我们将使用估计的倾向得分进行匹配:
# 倾向得分匹配实现
def propensity_score_matching(data, treatment_col='D', ps_col='est_ps', outcome_col='Y_obs', caliper=0.1):
"""
执行倾向得分匹配
参数:
- data: 包含倾向得分和结果的数据框
- treatment_col: 干预指示变量列名
- ps_col: 倾向得分列名
- outcome_col: 结果变量列名
- caliper: 匹配的最大允许倾向得分差异
返回:
- matched_data: 匹配后的数据
- ate: 估计的ATE
"""
treatment = data[data[treatment_col] == 1]
control = data[data[treatment_col] == 0]
# 使用最近邻匹配
nbrs = NearestNeighbors(n_neighbors=1, metric='euclidean')
nbrs.fit(control[[ps_col]])
distances, indices = nbrs.kneighbors(treatment[[ps_col]])
# 应用caliper
matches = []
for i, (dist, idx) in enumerate(zip(distances, indices)):
if dist[0] <= caliper:
treated_row = treatment.iloc[i].copy()
control_row = control.iloc[idx[0]].copy()
# 添加匹配对标识
treated_row['match_id'] = i
control_row['match_id'] = i
matches.extend([treated_row, control_row])
matched_data = pd.DataFrame(matches)
# 计算匹配后的ATE
matched_treatment = matched_data[matched_data[treatment_col] == 1][outcome_col]
matched_control = matched_data[matched_data[treatment_col] == 0][outcome_col]
ate = matched_treatment.mean() - matched_control.mean()
return matched_data, ate
# 执行匹配
matched_data, ate_matched = propensity_score_matching(data_ps)
print(f"匹配前样本数: {len(data_ps)}")
print(f"匹配后样本数: {len(matched_data)}")
print(f"匹配后ATE估计: {ate_matched:.3f}")
# 评估匹配质量 - 标准化均值差异
def calculate_smd(data, treatment_col, covariates):
"""计算标准化均值差异"""
smd_results = {}
treatment = data[data[treatment_col] == 1]
control = data[data[treatment_col] == 0]
for cov in covariates:
treat_mean = treatment[cov].mean()
control_mean = control[cov].mean()
treat_std = treatment[cov].std()
control_std = control[cov].std()
pooled_std = np.sqrt((treat_std**2 + control_std**2) / 2)
smd = (treat_mean - control_mean) / pooled_std
smd_results[cov] = smd
return smd_results
# 比较匹配前后的平衡性
covariates = ['X1', 'X2', 'X3']
smd_before = calculate_smd(data_ps, 'D', covariates)
smd_after = calculate_smd(matched_data, 'D', covariates)
balance_df = pd.DataFrame({
'协变量': covariates,
'匹配前SMD': [smd_before[c] for c in covariates],
'匹配后SMD': [smd_after[c] for c in covariates]
})
print("\n匹配前后平衡性比较 (标准化均值差异):")
print(balance_df)
倾向得分匹配通过创建干预组和对照组之间协变量平衡的数据子集,帮助我们更准确地估计因果效应。
V. 工具变量方法
当ignorability假设不成立时(即存在未观测的混淆变量),我们可以使用工具变量方法。工具变量Z需要满足三个条件:
- 相关性:Z与干预D相关
- 排他性约束:Z仅通过D影响结果Y
- 独立性:Z与未观测的混淆变量独立
工具变量模拟
# 模拟工具变量场景
np.random.seed(123)
n = 5000
# 未观测的混淆变量
U = np.random.normal(0, 1, n)
# 工具变量 Z (满足独立性)
Z = np.random.binomial(1, 0.5, n)
# 干预 D 与 Z 和 U 相关
D = np.random.binomial(1, 1/(1+np.exp(-(0.5 + 1.0*Z + 1.0*U))))
# 结果 Y 与 D 和 U 相关,但与 Z 不直接相关
Y = 10 + 2.0*D + 1.5*U + np.random.normal(0, 1, n)
# 创建数据框
data_iv = pd.DataFrame({
'Z': Z, 'D': D, 'Y': Y, 'U': U
})
print("工具变量数据摘要:")
print(data_iv.describe())
# 可视化工具变量的关系
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Z 与 D 的关系
z_d_corr = np.corrcoef(Z, D)[0,1]
axes[0].scatter(Z, D, alpha=0.1)
axes[0].set_xlabel('工具变量 Z')
axes[0].set_ylabel('干预 D')
axes[0].set_title(f'Z 与 D 的相关性: {z_d_corr:.3f}')
# D 与 Y 的关系
d_y_corr = np.corrcoef(D, Y)[0,1]
axes[1].scatter(D, Y, alpha=0.1)
axes[1].set_xlabel('干预 D')
axes[1].set_ylabel('结果 Y')
axes[1].set_title(f'D 与 Y 的相关性: {d_y_corr:.3f}')
# Z 与 Y 的关系
z_y_corr = np.corrcoef(Z, Y)[0,1]
axes[2].scatter(Z, Y, alpha=0.1)
axes[2].set_xlabel('工具变量 Z')
axes[2].set_ylabel('结果 Y')
axes[2].set_title(f'Z 与 Y 的相关性: {z_y_corr:.3f}')
plt.tight_layout()
plt.show()
两阶段最小二乘法
工具变量最常用的估计方法是两阶段最小二乘法:
第一阶段:用工具变量Z预测干预D
D = α₀ + α₁Z + ε
第二阶段:用预测的干预D̂预测结果Y
Y = β₀ + β₁D̂ + η
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
# 两阶段最小二乘法实现
def two_stage_least_squares(Z, D, Y):
"""
执行两阶段最小二乘法
参数:
- Z: 工具变量
- D: 干预变量
- Y: 结果变量
返回:
- iv_estimate: 工具变量估计值
- first_stage: 第一阶段结果
- second_stage: 第二阶段结果
"""
# 第一阶段: Z -> D
Z_const = sm.add_constant(Z)
first_stage = sm.OLS(D, Z_const).fit()
D_hat = first_stage.predict(Z_const)
# 第二阶段: D_hat -> Y
D_hat_const = sm.add_constant(D_hat)
second_stage = sm.OLS(Y, D_hat_const).fit()
iv_estimate = second_stage.params[1]
return iv_estimate, first_stage, second_stage
# 应用2SLS
iv_estimate, first_stage, second_stage = two_stage_least_squares(
data_iv['Z'], data_iv['D'], data_iv['Y']
)
# 对比不同估计方法
naive_estimate = sm.OLS(data_iv['Y'], sm.add_constant(data_iv['D'])).fit().params[1]
true_effect = 2.0 # 我们模拟中设定的真实效应
print("不同方法的处理效应估计:")
print(f"真实效应: {true_effect:.3f}")
print(f"简单回归估计: {naive_estimate:.3f}")
print(f"工具变量估计: {iv_estimate:.3f}")
# 展示第一阶段和第二阶段结果
print("\n第一阶段回归结果 (Z -> D):")
print(f"工具变量系数: {first_stage.params[1]:.3f}")
print(f"工具变量t值: {first_stage.tvalues[1]:.3f}")
print("\n第二阶段回归结果 (D_hat -> Y):")
print(f"处理效应估计: {second_stage.params[1]:.3f}")
print(f"处理效应标准误: {second_stage.bse[1]:.3f}")
# 可视化2SLS过程
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# 第一阶段: Z 与 D
axes[0].scatter(data_iv['Z'], data_iv['D'], alpha=0.3, label='实际数据')
z_vals = np.array([0, 1])
d_pred = first_stage.params[0] + first_stage.params[1] * z_vals
axes[0].plot(z_vals, d_pred, 'r-', linewidth=2, label='第一阶段拟合')
axes[0].set_xlabel('工具变量 Z')
axes[0].set_ylabel('干预 D')
axes[0].set_title('第一阶段: Z → D')
axes[0].legend()
# 第二阶段: D_hat 与 Y
axes[1].scatter(D_hat, data_iv['Y'], alpha=0.3, label='预测干预 vs 结果')
d_hat_vals = np.linspace(D_hat.min(), D_hat.max(), 100)
y_pred = second_stage.params[0] + second_stage.params[1] * d_hat_vals
axes[1].plot(d_hat_vals, y_pred, 'r-', linewidth=2, label='第二阶段拟合')
axes[1].set_xlabel('预测干预 D_hat')
axes[1].set_ylabel('结果 Y')
axes[1].set_title('第二阶段: D_hat → Y')
axes[1].legend()
plt.tight_layout()
plt.show()
工具变量方法通过利用仅通过干预影响结果的变量,为我们提供了在存在未观测混淆时估计因果效应的有力工具。
VI. 实例分析:教育对收入的影响
现在让我们应用前面学到的因果推断方法来分析一个实际问题:教育对收入的影响。这是一个经典的因果推断问题,因为存在许多混淆变量(如能力、家庭背景等)。
数据生成与问题设定
# 模拟教育对收入影响的数据
np.random.seed(123)
n = 10000
# 模拟个体特征
ability = np.random.normal(0, 1, n) # 能力(通常不可观测)
family_income = np.random.normal(0, 1, n) # 家庭收入
parent_education = np.random.normal(0, 1, n) # 父母教育程度
urban = np.random.binomial(1, 0.6, n) # 是否城市地区
# 工具变量: 学校到家的距离(满足工具变量假设)
distance = np.random.exponential(2, n) # 距离(公里)
# 教育决策与能力、家庭背景、距离相关
education_prob = 1 / (1 + np.exp(-(
-0.5 + 0.8*ability + 0.6*family_income + 0.5*parent_education - 0.3*distance + 0.4*urban
)))
education = np.random.binomial(1, education_prob) # 1=高等教育, 0=中等教育
# 收入生成过程
# 真实教育效应: 高等教育比中等教育平均高5000元
# 但能力也影响收入(混淆)
base_income = 20000 # 基本收入
education_effect = 5000 # 真实教育效应
income = (base_income +
education_effect * education +
3000 * ability + # 能力对收入的影响
2000 * family_income +
1500 * parent_education +
1000 * urban +
np.random.normal(0, 3000, n))
# 创建数据框
education_data = pd.DataFrame({
'ability': ability,
'family_income': family_income,
'parent_education': parent_education,
'urban': urban,
'distance': distance,
'education': education,
'income': income
})
print("教育对收入影响数据摘要:")
print(f"高等教育比例: {education.mean():.3f}")
print(f"高等教育组平均收入: {education_data[education_data['education'] == 1]['income'].mean():.2f}")
print(f"中等教育组平均收入: {education_data[education_data['education'] == 0]['income'].mean():.2f}")
print(f"简单收入差异: {education_data[education_data['education'] == 1]['income'].mean() - education_data[education_data['education'] == 0]['income'].mean():.2f}")
print(f"真实教育效应: {education_effect:.2f}")
应用不同因果推断方法
现在我们将应用前面讨论的各种因果推断方法来估计教育对收入的影响:
# 方法1: 简单比较(有偏)
simple_effect = (education_data[education_data['education'] == 1]['income'].mean() -
education_data[education_data['education'] == 0]['income'].mean())
# 方法2: 多元回归调整
X_adjust = education_data[['family_income', 'parent_education', 'urban']]
X_adjust = sm.add_constant(X_adjust)
y_income = education_data['income']
regression_model = sm.OLS(y_income, sm.add_constant(
pd.DataFrame({
'education': education_data['education'],
'family_income': education_data['family_income'],
'parent_education': education_data['parent_education'],
'urban': education_data['urban']
})
)).fit()
regression_effect = regression_model.params['education']
# 方法3: 倾向得分匹配
# 估计倾向得分
ps_features = education_data[['family_income', 'parent_education', 'urban']]
ps_model_edu = LogisticRegression()
ps_model_edu.fit(ps_features, education_data['education'])
education_data['ps_education'] = ps_model_edu.predict_proba(ps_features)[:, 1]
# 执行匹配
matched_edu, ps_effect = propensity_score_matching(education_data, 'education', 'ps_education', 'income')
# 方法4: 工具变量(使用距离作为工具)
iv_effect_edu, first_stage_edu, second_stage_edu = two_stage_least_squares(
education_data['distance'], education_data['education'], education_data['income']
)
# 汇总结果
results_comparison = pd.DataFrame({
'方法': ['简单比较', '多元回归', '倾向得分匹配', '工具变量', '真实效应'],
'估计效应': [simple_effect, regression_effect, ps_effect, iv_effect_edu, education_effect],
'偏差': [simple_effect - education_effect,
regression_effect - education_effect,
ps_effect - education_effect,
iv_effect_edu - education_effect, 0]
})
print("不同方法的教育效应估计比较:")
print(results_comparison.round(2))
# 可视化比较
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# 效应估计比较
methods = results_comparison['方法']
effects = results_comparison['估计效应']
colors = ['red' if '简单' in method else 'blue' if method == '真实效应' else 'orange' for method in methods]
axes[0].barh(methods, effects, color=colors, alpha=0.7)
axes[0].axvline(x=education_effect, color='blue', linestyle='--', alpha=0.7, label='真实效应')
axes[0].set_xlabel('教育效应估计 (元)')
axes[0].set_title('不同方法的教育效应估计')
axes[0].legend()
# 偏差比较
biases = results_comparison['偏差'][:-1] # 排除真实效应
method_biases = methods[:-1]
axes[1].barh(method_biases, biases, color=['red', 'orange', 'orange', 'green'], alpha=0.7)
axes[1].axvline(x=0, color='blue', linestyle='--', alpha=0.7)
axes[1].set_xlabel('估计偏差 (元)')
axes[1].set_title('各方法估计偏差')
plt.tight_layout()
plt.show()
方法效果评估
让我们进一步评估每种方法的性能:
# 评估不同方法的性能
def evaluate_balance(data, treatment_col, covariates):
"""评估协变量平衡性"""
balance_results = {}
for method_name, method_data in data.items():
smd_results = calculate_smd(method_data, treatment_col, covariates)
mean_abs_smd = np.mean(np.abs(list(smd_results.values())))
balance_results[method_name] = mean_abs_smd
return balance_results
# 准备不同方法的数据
method_data = {
'原始数据': education_data,
'匹配后数据': matched_edu
}
covariates_edu = ['family_income', 'parent_education', 'urban']
balance_results = evaluate_balance(method_data, 'education', covariates_edu)
print("协变量平衡性评估 (平均绝对SMD):")
for method, balance in balance_results.items():
print(f"{method}: {balance:.4f}")
# 敏感性分析
def sensitivity_analysis(base_effect, confounding_strength):
"""
进行简单的敏感性分析
评估未观测混淆可能带来的偏差
"""
potential_biases = []
for strength in confounding_strength:
# 假设能力与教育和收入的关系强度
# 这里简化处理,实际敏感性分析更复杂
bias = strength * 3000 # 能力对收入的影响
adjusted_effect = base_effect - bias
potential_biases.append((strength, bias, adjusted_effect))
return pd.DataFrame(potential_biases,
columns=['混淆强度', '估计偏差', '调整后效应'])
# 进行敏感性分析
confounding_strengths = [0.1, 0.2, 0.3, 0.4, 0.5]
sensitivity_results = sensitivity_analysis(regression_effect, confounding_strengths)
print("\n敏感性分析结果 (多元回归方法):")
print(sensitivity_results)
# 可视化敏感性分析
plt.figure(figsize=(10, 6))
plt.plot(sensitivity_results['混淆强度'], sensitivity_results['调整后效应'],
'o-', linewidth=2, markersize=8)
plt.axhline(y=education_effect, color='red', linestyle='--', label='真实效应')
plt.axhline(y=regression_effect, color='blue', linestyle='--', label='原始估计')
plt.xlabel('未观测混淆的假设强度')
plt.ylabel('调整后的教育效应估计')
plt.title('敏感性分析: 未观测混淆对估计的影响')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
这个实例分析展示了如何应用不同的因果推断方法来解决实际问题。我们可以看到:
- 简单比较严重高估教育效应,因为它忽略了能力等混淆变量
- 多元回归和倾向得分匹配在一定程度上减少了偏差,但仍有残余混淆
- 工具变量方法提供了最接近真实效应的估计,但依赖于工具变量的有效性假设
- 点赞
- 收藏
- 关注作者
评论(0)