An Introduction to Causal Graphs and Structural Causal Models
I. Introduction: From Correlation to Causation
Why do we need causal graphs?
In statistics and data analysis we are constantly told that "correlation is not causation". A deeper question, however, is: how do we know when a correlation can be interpreted causally? Traditional statistical methods tend to stop at analyzing associations between variables, whereas causal graphs give us a systematic framework for expressing and testing causal assumptions.
A classic example: observational data show that ice cream sales are positively correlated with drowning deaths. Does this mean eating ice cream causes drowning? Obviously not. There is a confounding variable, temperature: hot weather leads people to buy ice cream and also to swim more, which increases the risk of drowning. A causal graph can express this relationship clearly.
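This confounding pattern is easy to reproduce in a few lines of simulation (the coefficients below are made up purely for illustration, not estimates from real data): the raw correlation between ice cream sales and drownings is strong, but it essentially vanishes once temperature is held fixed.
# Spurious correlation induced by a common cause (temperature).
import numpy as np

rng = np.random.default_rng(0)
n = 5000
temperature = rng.normal(25, 5, n)
ice_cream = 2.0 * temperature + rng.normal(0, 5, n)
drownings = 0.3 * temperature + rng.normal(0, 2, n)

print("corr(ice_cream, drownings):",
      round(np.corrcoef(ice_cream, drownings)[0, 1], 3))

# Partial correlation given temperature: correlate the residuals
# after regressing each variable on temperature.
resid_ic = ice_cream - np.polyval(np.polyfit(temperature, ice_cream, 1), temperature)
resid_dr = drownings - np.polyval(np.polyfit(temperature, drownings, 1), temperature)
print("partial corr given temperature:",
      round(np.corrcoef(resid_ic, resid_dr)[0, 1], 3))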
Why structural causal models are revolutionary
The structural causal model (SCM) framework proposed by Judea Pearl has been hailed as a "revolution in causal inference" because it:
- provides a clear language for expressing causal assumptions
- formalizes the mathematical foundations of causal questions
- yields systematic algorithms for causal inference
- unifies causal thinking across statistics, epidemiology, economics, and other fields
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm
import warnings
warnings.filterwarnings('ignore')
# Font settings (SimHei kept for CJK glyph support; harmless with English labels)
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("An Introduction to Causal Graphs and Structural Causal Models")
print("=" * 50)
II. Basic Concepts and Notation of Causal Graphs
Graph-theoretic foundations
Causal graphs are built on graph theory and contain two basic elements:
- Nodes: represent variables
- Directed edges: represent direct causal relationships between variables
Three elementary structures
A causal graph is built from three elementary structures, each with its own statistical and causal properties:
# Visualize the three elementary causal structures
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
# 1. Chain: A → B → C
G_chain = nx.DiGraph()
G_chain.add_edges_from([('A', 'B'), ('B', 'C')])
pos_chain = {'A': (0, 0), 'B': (1, 0), 'C': (2, 0)}
nx.draw(G_chain, pos_chain, with_labels=True, node_size=1500,
        node_color='lightblue', arrowsize=20, ax=axes[0])
axes[0].set_title('Chain: A → B → C')
# 2. Fork: A ← B → C
G_fork = nx.DiGraph()
G_fork.add_edges_from([('B', 'A'), ('B', 'C')])
pos_fork = {'A': (0, 0), 'B': (1, 0), 'C': (2, 0)}
nx.draw(G_fork, pos_fork, with_labels=True, node_size=1500,
        node_color='lightgreen', arrowsize=20, ax=axes[1])
axes[1].set_title('Fork: A ← B → C')
# 3. Collider: A → B ← C
G_collider = nx.DiGraph()
G_collider.add_edges_from([('A', 'B'), ('C', 'B')])
pos_collider = {'A': (0, 0), 'B': (1, 0), 'C': (2, 0)}
nx.draw(G_collider, pos_collider, with_labels=True, node_size=1500,
        node_color='lightcoral', arrowsize=20, ax=axes[2])
axes[2].set_title('Collider: A → B ← C')
plt.tight_layout()
plt.show()
The d-separation criterion
d-separation is the core concept of causal graph theory: it tells us, purely from the graph, whether two variables must be independent given a set of conditioning variables.
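Before verifying this with simulated data, we can query d-separation directly on the three graphs defined above. A minimal sketch, assuming a networkx version that provides nx.d_separated (2.8 and later; newer releases rename it to nx.is_d_separator):
# d-separation queries on the three graphs built earlier
for name, G in [('chain', G_chain), ('fork', G_fork), ('collider', G_collider)]:
    marginal = nx.d_separated(G, {'A'}, {'C'}, set())   # A d-separated from C?
    given_b = nx.d_separated(G, {'A'}, {'C'}, {'B'})     # ... given B?
    print(f"{name:>8}  A _||_ C: {marginal}   A _||_ C | B: {given_b}")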
# Simulate conditional (in)dependence under the three structures
def simulate_causal_structures(n=1000):
    """Simulate numerical examples of the three elementary structures"""
    np.random.seed(42)
    # Chain: A → B → C
    A_chain = np.random.normal(0, 1, n)
    B_chain = 0.5 * A_chain + np.random.normal(0, 0.5, n)
    C_chain = 0.5 * B_chain + np.random.normal(0, 0.5, n)
    # Fork: A ← B → C
    B_fork = np.random.normal(0, 1, n)
    A_fork = 0.5 * B_fork + np.random.normal(0, 0.5, n)
    C_fork = 0.5 * B_fork + np.random.normal(0, 0.5, n)
    # Collider: A → B ← C
    A_collider = np.random.normal(0, 1, n)
    C_collider = np.random.normal(0, 1, n)
    B_collider = 0.5 * A_collider + 0.5 * C_collider + np.random.normal(0, 0.5, n)
    return {
        'chain': (A_chain, B_chain, C_chain),
        'fork': (A_fork, B_fork, C_fork),
        'collider': (A_collider, B_collider, C_collider)
    }

# Measure (conditional) association
def check_conditional_independence(X, Y, Z=None):
    """Return the (partial) correlation between X and Y, optionally given Z"""
    if Z is None:
        # Unconditional correlation
        return np.corrcoef(X, Y)[0, 1]
    else:
        # Conditional (partial) correlation:
        # regress X and Y on Z, then correlate the residuals
        if Z.ndim == 1:
            Z = Z.reshape(-1, 1)
        model_X = LinearRegression().fit(Z, X)
        resid_X = X - model_X.predict(Z)
        model_Y = LinearRegression().fit(Z, Y)
        resid_Y = Y - model_Y.predict(Z)
        return np.corrcoef(resid_X, resid_Y)[0, 1]
# Run the simulation
data = simulate_causal_structures()
# Correlations under each scenario
results = []
# Chain
corr_chain_uncond = check_conditional_independence(data['chain'][0], data['chain'][2])
corr_chain_cond = check_conditional_independence(data['chain'][0], data['chain'][2], data['chain'][1])
results.append(['Chain A→B→C', corr_chain_uncond, corr_chain_cond, 'dependent → independent'])
# Fork
corr_fork_uncond = check_conditional_independence(data['fork'][0], data['fork'][2])
corr_fork_cond = check_conditional_independence(data['fork'][0], data['fork'][2], data['fork'][1])
results.append(['Fork A←B→C', corr_fork_uncond, corr_fork_cond, 'dependent → independent'])
# Collider
corr_collider_uncond = check_conditional_independence(data['collider'][0], data['collider'][2])
corr_collider_cond = check_conditional_independence(data['collider'][0], data['collider'][2], data['collider'][1])
results.append(['Collider A→B←C', corr_collider_uncond, corr_collider_cond, 'independent → dependent'])
# Display the results
results_df = pd.DataFrame(results,
                          columns=['Structure', 'Unconditional corr', 'Conditional corr (given B)', 'd-separation prediction'])
print("Conditional-independence check for the three elementary structures:")
print(results_df.round(4))
# Visualize the correlation patterns
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
structures = ['Chain', 'Fork', 'Collider']
unconditional = [corr_chain_uncond, corr_fork_uncond, corr_collider_uncond]
conditional = [corr_chain_cond, corr_fork_cond, corr_collider_cond]
x = np.arange(len(structures))
width = 0.35
axes[0].bar(x - width/2, unconditional, width, label='Unconditional corr', alpha=0.7)
axes[0].bar(x + width/2, conditional, width, label='Conditional corr (given B)', alpha=0.7)
axes[0].set_xlabel('Structure')
axes[0].set_ylabel('Correlation coefficient')
axes[0].set_title('Conditional-independence patterns of the three structures')
axes[0].set_xticks(x)
axes[0].set_xticklabels(structures)
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# Scatter plot for the chain
axes[1].scatter(data['chain'][0], data['chain'][2], alpha=0.5)
axes[1].set_xlabel('A')
axes[1].set_ylabel('C')
axes[1].set_title(f'Chain: A and C are correlated (r={corr_chain_uncond:.3f})')
axes[1].grid(True, alpha=0.3)
# Scatter plot for the collider
axes[2].scatter(data['collider'][0], data['collider'][2], alpha=0.5)
axes[2].set_xlabel('A')
axes[2].set_ylabel('C')
axes[2].set_title(f'Collider: A and C are independent (r={corr_collider_uncond:.3f})')
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
Summary of core causal-graph concepts
Concept | Definition | Causal meaning | Example |
---|---|---|---|
Node | A variable in the graph | An entity in the causal mechanism | education, income, ability |
Directed edge | A causal relation between variables | A direct causal effect | education → income |
Path | A sequence of connected edges | A causal or non-causal route | education → skills → income |
d-separation | A graphical separation criterion | Graphical test of conditional independence | conditioning on an intermediate variable |
Collider | A node with two arrowheads pointing into it | A special case for conditional independence | education → job ← family background |
III. Mathematical Foundations of Structural Causal Models
Structural equations
A structural causal model expresses causal relations through structural equations, each of which represents one causal mechanism:
X = fₓ(Pa(X), εₓ)
where Pa(X) denotes the parents of X (its direct causes) and εₓ is an exogenous noise term (factors not explained by the model).
# Structural causal model implementation
class StructuralCausalModel:
    """A simple structural causal model (SCM)"""
    def __init__(self):
        self.equations = {}
        self.variables = {}

    def add_equation(self, variable, equation_func, parents):
        """
        Add a structural equation.
        Parameters:
        - variable: the endogenous variable being defined
        - equation_func: the mechanism f(pa1, pa2, ..., noise)
        - parents: list of parent variables
        """
        self.equations[variable] = {
            'function': equation_func,
            'parents': parents
        }

    def generate_data(self, n_samples, noise_distributions):
        """
        Generate data from the SCM.
        Parameters:
        - n_samples: number of samples
        - noise_distributions: dict mapping variable -> noise sampler
        """
        data = {}
        # Make sure parents (including exogenous variables) are generated first
        sorted_vars = self.topological_sort()
        for var in sorted_vars:
            if var in self.equations:
                eq_info = self.equations[var]
                parents_data = [data[parent] for parent in eq_info['parents']]
                # Draw the noise term
                noise = noise_distributions[var](n_samples)
                # Apply the structural equation
                data[var] = eq_info['function'](*parents_data, noise)
            else:
                # Exogenous variable
                data[var] = noise_distributions[var](n_samples)
        return pd.DataFrame(data)

    def topological_sort(self):
        """Topological order: parents come before their children"""
        visited = set()
        stack = []
        def visit(variable):
            if variable not in visited:
                visited.add(variable)
                if variable in self.equations:
                    for parent in self.equations[variable]['parents']:
                        visit(parent)
                stack.append(variable)
        for variable in list(self.equations.keys()):
            visit(variable)
        return stack
# Build an education-income SCM example
def create_education_income_scm():
    """Create a structural causal model for education and income"""
    scm = StructuralCausalModel()
    # Structural equations:
    # ability     (exogenous)
    # family_bg   (exogenous)
    # education = f(ability, family_bg, ε_education)
    # income    = f(education, ability, ε_income)
    def education_eq(ability, family_bg, noise):
        return 10 + 0.3 * ability + 0.4 * family_bg + noise
    def income_eq(education, ability, noise):
        return 20 + 0.5 * education + 0.3 * ability + noise
    # Register the equations
    scm.add_equation('education', education_eq, ['ability', 'family_bg'])
    scm.add_equation('income', income_eq, ['education', 'ability'])
    return scm

# Generate data
scm = create_education_income_scm()
# Noise distributions
noise_dists = {
    'ability': lambda n: np.random.normal(0, 1, n),
    'family_bg': lambda n: np.random.normal(0, 1, n),
    'education': lambda n: np.random.normal(0, 0.5, n),
    'income': lambda n: np.random.normal(0, 1, n)
}
# Draw a sample
n_samples = 1000
data = scm.generate_data(n_samples, noise_dists)
print("Data generated by the structural causal model:")
print(data.describe().round(3))
# Visualize the relationships in the SCM-generated data
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# Education vs income
axes[0,0].scatter(data['education'], data['income'], alpha=0.5)
z = np.polyfit(data['education'], data['income'], 1)
p = np.poly1d(z)
axes[0,0].plot(data['education'], p(data['education']), "r-", linewidth=2)
axes[0,0].set_xlabel('Years of education')
axes[0,0].set_ylabel('Income')
axes[0,0].set_title('Education vs income')
axes[0,0].text(0.05, 0.95, f'correlation: {np.corrcoef(data["education"], data["income"])[0,1]:.3f}',
               transform=axes[0,0].transAxes, fontsize=10, verticalalignment='top',
               bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
# Ability vs education
axes[0,1].scatter(data['ability'], data['education'], alpha=0.5)
z = np.polyfit(data['ability'], data['education'], 1)
p = np.poly1d(z)
axes[0,1].plot(data['ability'], p(data['ability']), "r-", linewidth=2)
axes[0,1].set_xlabel('Ability')
axes[0,1].set_ylabel('Years of education')
axes[0,1].set_title('Ability vs education')
axes[0,1].text(0.05, 0.95, f'correlation: {np.corrcoef(data["ability"], data["education"])[0,1]:.3f}',
               transform=axes[0,1].transAxes, fontsize=10, verticalalignment='top',
               bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
# Ability vs income
axes[1,0].scatter(data['ability'], data['income'], alpha=0.5)
z = np.polyfit(data['ability'], data['income'], 1)
p = np.poly1d(z)
axes[1,0].plot(data['ability'], p(data['ability']), "r-", linewidth=2)
axes[1,0].set_xlabel('Ability')
axes[1,0].set_ylabel('Income')
axes[1,0].set_title('Ability vs income')
axes[1,0].text(0.05, 0.95, f'correlation: {np.corrcoef(data["ability"], data["income"])[0,1]:.3f}',
               transform=axes[1,0].transAxes, fontsize=10, verticalalignment='top',
               bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
# Family background vs education
axes[1,1].scatter(data['family_bg'], data['education'], alpha=0.5)
z = np.polyfit(data['family_bg'], data['education'], 1)
p = np.poly1d(z)
axes[1,1].plot(data['family_bg'], p(data['family_bg']), "r-", linewidth=2)
axes[1,1].set_xlabel('Family background')
axes[1,1].set_ylabel('Years of education')
axes[1,1].set_title('Family background vs education')
axes[1,1].text(0.05, 0.95, f'correlation: {np.corrcoef(data["family_bg"], data["education"])[0,1]:.3f}',
               transform=axes[1,1].transAxes, fontsize=10, verticalalignment='top',
               bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
plt.tight_layout()
plt.show()
Interventions and the do-operator
In the SCM framework, intervening (the do-operator) is fundamentally different from observing:
- P(Y|X=x): the distribution of Y when we observe X = x
- P(Y|do(X=x)): the distribution of Y when we intervene and set X = x
# Simulating an intervention
def simulate_intervention(scm, intervention_var, intervention_value, n_samples, noise_dists):
    """Simulate the effect of do(intervention_var = intervention_value)"""
    intervention_data = {}
    sorted_vars = scm.topological_sort()
    for var in sorted_vars:
        if var == intervention_var:
            # Intervention: clamp the variable to the chosen value
            intervention_data[var] = np.full(n_samples, intervention_value)
        elif var in scm.equations:
            eq_info = scm.equations[var]
            parents_data = [intervention_data[parent] for parent in eq_info['parents']]
            noise = noise_dists[var](n_samples)
            intervention_data[var] = eq_info['function'](*parents_data, noise)
        else:
            # Exogenous variable
            intervention_data[var] = noise_dists[var](n_samples)
    return pd.DataFrame(intervention_data)
# Compare observation with intervention
print("Observation vs intervention:")
# Observed association
observed_effect = np.corrcoef(data['education'], data['income'])[0,1]
print(f"Observed education-income correlation: {observed_effect:.3f}")
# Interventional effects: set education to fixed levels
intervention_effects = []
education_levels = [10, 12, 14, 16]  # intervention levels for education
for edu_level in education_levels:
    intervention_data = simulate_intervention(scm, 'education', edu_level, 500, noise_dists)
    avg_income = intervention_data['income'].mean()
    intervention_effects.append(avg_income)
    print(f"Average income under do(education={edu_level}): {avg_income:.2f}")
# Visualize the interventional effect
plt.figure(figsize=(10, 6))
plt.plot(education_levels, intervention_effects, 'bo-', linewidth=2, markersize=8, label='Interventional effect')
plt.xlabel('Years of education (set by intervention)')
plt.ylabel('Average income')
plt.title('Causal effect of education on income: intervention analysis')
plt.grid(True, alpha=0.3)
# Overlay the observational relationship
observed_fit = np.polyfit(data['education'], data['income'], 1)
observed_line = np.poly1d(observed_fit)
edu_range = np.linspace(data['education'].min(), data['education'].max(), 100)
plt.plot(edu_range, observed_line(edu_range), 'r--', alpha=0.7, label='Observational relationship')
plt.legend()
plt.show()
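The observational slope is steeper than the interventional one because ability confounds education and income. As a quick check (a sketch using the sample already generated from this SCM), adjusting for the confounder ability, which forms a valid backdoor adjustment set here, recovers the structural coefficient of 0.5:
# Backdoor adjustment on the observational sample from the SCM above
X_naive = sm.add_constant(data[['education']])
naive_slope = sm.OLS(data['income'], X_naive).fit().params['education']

X_adj = sm.add_constant(data[['education', 'ability']])
adjusted_slope = sm.OLS(data['income'], X_adj).fit().params['education']

print(f"Naive slope (income ~ education):        {naive_slope:.3f}")
print(f"Backdoor-adjusted slope (given ability): {adjusted_slope:.3f}")
print("Structural coefficient in the SCM:       0.500")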
Core components of an SCM
A structural causal model has three core components, plus the notion of intervention:
Component | Notation | Meaning | Example |
---|---|---|---|
Exogenous variables | U | Factors outside the model, left unexplained | innate talent, random shocks |
Endogenous variables | V | Variables determined inside the model | education, income, employment |
Structural equations | fᵢ | Functional relations among variables | income = f(education, ability, ε) |
Intervention | do(X=x) | Setting a variable from outside the model | a compulsory schooling policy |
IV. Building and Using Causal Graphs
Principles for building causal graphs
Building a useful causal graph requires following a few principles:
- Driven by substantive knowledge: base the graph on domain expertise, not on data mining
- Parsimony: include every relevant variable, but avoid unnecessary complexity
- Causal ordering: respect the temporal and logical order of the variables
- Completeness check: make sure all important confounders are included
# Causal graph construction tool
class CausalGraphBuilder:
    """Helper for building and inspecting causal graphs"""
    def __init__(self):
        self.graph = nx.DiGraph()
        self.variable_types = {}  # variable roles: treatment, outcome, confounder, mediator, ...

    def add_variable(self, variable, var_type='other'):
        """Add a variable (node)"""
        self.graph.add_node(variable)
        self.variable_types[variable] = var_type

    def add_causal_edge(self, cause, effect):
        """Add a causal edge"""
        self.graph.add_edge(cause, effect)

    def visualize_graph(self, title="Causal graph"):
        """Visualize the causal graph"""
        plt.figure(figsize=(10, 8))
        # Color the nodes by role
        node_colors = []
        color_map = {
            'treatment': 'lightcoral',
            'outcome': 'lightgreen',
            'confounder': 'lightblue',
            'mediator': 'lightyellow',
            'other': 'lightgray'
        }
        for node in self.graph.nodes():
            node_colors.append(color_map.get(self.variable_types[node], 'lightgray'))
        pos = nx.spring_layout(self.graph, seed=42)
        nx.draw(self.graph, pos, with_labels=True, node_color=node_colors,
                node_size=2000, arrowsize=20, font_size=10,
                edge_color='gray', alpha=0.7)
        # Legend
        legend_elements = []
        for var_type, color in color_map.items():
            legend_elements.append(plt.Line2D([0], [0], marker='o', color='w',
                                              markerfacecolor=color, markersize=10, label=var_type))
        plt.legend(handles=legend_elements, loc='upper left')
        plt.title(title)
        plt.show()
    def get_backdoor_paths(self, treatment, outcome):
        """Find backdoor paths: paths that start with an edge pointing into the treatment"""
        paths = []
        # Search the undirected skeleton, since backdoor paths go against edge directions
        undirected = self.graph.to_undirected()
        for path in nx.all_simple_paths(undirected, source=treatment, target=outcome):
            # A backdoor path has at least two edges and its first edge points into the treatment
            if len(path) >= 3 and self.graph.has_edge(path[1], path[0]):
                paths.append(path)
        return paths

    def find_confounders(self, treatment, outcome):
        """Heuristically find confounding variables"""
        confounders = set()
        # Common causes of treatment and outcome
        treatment_parents = set(self.graph.predecessors(treatment))
        outcome_parents = set(self.graph.predecessors(outcome))
        common_causes = treatment_parents.intersection(outcome_parents)
        # Variables lying on backdoor paths
        backdoor_paths = self.get_backdoor_paths(treatment, outcome)
        for path in backdoor_paths:
            confounders.update(path[1:-1])  # exclude treatment and outcome themselves
        # Never treat descendants of the treatment (mediators, colliders) as confounders
        descendants = nx.descendants(self.graph, treatment)
        return list(confounders.union(common_causes) - descendants)

    def suggest_adjustment_sets(self, treatment, outcome):
        """Suggest an adjustment set aimed at satisfying the backdoor criterion"""
        confounders = self.find_confounders(treatment, outcome)
        # Basic backdoor adjustment set
        adjustment_sets = []
        if confounders:
            adjustment_sets.append(confounders)
        return adjustment_sets
# Build the education-income causal graph example
builder = CausalGraphBuilder()
# Add the variables
variables = [
    ('ability', 'confounder'),
    ('family_bg', 'confounder'),
    ('education', 'treatment'),
    ('skills', 'mediator'),
    ('network', 'mediator'),
    ('income', 'outcome')
]
for var, var_type in variables:
    builder.add_variable(var, var_type)
# Add the causal edges
causal_edges = [
    ('ability', 'education'),
    ('ability', 'skills'),
    ('ability', 'income'),
    ('family_bg', 'education'),
    ('family_bg', 'network'),
    ('education', 'skills'),
    ('education', 'network'),
    ('education', 'income'),
    ('skills', 'income'),
    ('network', 'income')
]
for cause, effect in causal_edges:
    builder.add_causal_edge(cause, effect)
# Visualize the graph
builder.visualize_graph("Causal graph for the effect of education on income")
# Analyze the graph
treatment = 'education'
outcome = 'income'
print(f"Analyzing the causal effect {treatment} → {outcome}:")
confounders = builder.find_confounders(treatment, outcome)
print(f"Confounders found: {confounders}")
adjustment_sets = builder.suggest_adjustment_sets(treatment, outcome)
print(f"Suggested adjustment sets: {adjustment_sets}")
backdoor_paths = builder.get_backdoor_paths(treatment, outcome)
print(f"Backdoor paths: {backdoor_paths}")
The backdoor and front-door criteria
Causal inference offers two key identification criteria:
# Backdoor criterion check (simplified)
def backdoor_criterion(builder, treatment, outcome, adjustment_set):
    """
    Check whether an adjustment set satisfies the backdoor criterion:
    1. it blocks every backdoor path from treatment to outcome;
    2. it contains no descendant of the treatment.
    Simplified implementation: the set must contain no treatment descendants
    and must cover all confounders identified from the graph.
    """
    if set(adjustment_set) & nx.descendants(builder.graph, treatment):
        return False
    confounders = builder.find_confounders(treatment, outcome)
    return set(confounders).issubset(set(adjustment_set))
# Front-door criterion example
def demonstrate_frontdoor_criterion():
    """Demonstrate the front-door criterion"""
    # The classic smoking-tar-cancer example
    frontdoor_builder = CausalGraphBuilder()
    # Variables
    frontdoor_builder.add_variable('smoking', 'treatment')
    frontdoor_builder.add_variable('tar', 'mediator')
    frontdoor_builder.add_variable('cancer', 'outcome')
    frontdoor_builder.add_variable('genetics', 'confounder')  # unobserved confounder
    # Causal edges
    frontdoor_builder.add_causal_edge('smoking', 'tar')
    frontdoor_builder.add_causal_edge('tar', 'cancer')
    frontdoor_builder.add_causal_edge('genetics', 'smoking')
    frontdoor_builder.add_causal_edge('genetics', 'cancer')
    # Visualize
    frontdoor_builder.visualize_graph("Front-door criterion example: smoking-tar-cancer")
    print("Front-door analysis:")
    print("With an unobserved confounder (genetics), the backdoor criterion cannot be applied directly,")
    print("but the causal effect is still identified through the front-door path smoking → tar → cancer.")
    print("The front-door criterion requires:")
    print("1. The mediator (tar) intercepts all directed paths from the treatment (smoking) to the outcome (cancer)")
    print("2. There is no unblocked backdoor path from the treatment (smoking) to the mediator (tar)")
    print("3. All backdoor paths from the mediator (tar) to the outcome (cancer) are blocked by the treatment (smoking)")
    return frontdoor_builder

# Run the demonstration
frontdoor_example = demonstrate_frontdoor_criterion()
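The graph above only states the assumptions; the front-door adjustment itself can be checked numerically. Below is a minimal linear sketch with made-up coefficients (not part of the original example): the naive regression of cancer on smoking is biased by the hidden confounder, while the product of the smoking→tar coefficient and the tar→cancer coefficient (controlling for smoking) recovers the true effect.
# Front-door estimation in a linear simulation with an unobserved confounder
rng = np.random.default_rng(1)
n = 20000
genetics = rng.normal(0, 1, n)                 # unobserved confounder
smoking = 0.8 * genetics + rng.normal(0, 1, n)
tar = 0.9 * smoking + rng.normal(0, 1, n)      # fully mediates smoking -> cancer
cancer = 0.7 * tar + 0.6 * genetics + rng.normal(0, 1, n)
true_effect = 0.9 * 0.7                         # = 0.63

naive = sm.OLS(cancer, sm.add_constant(smoking)).fit().params[1]
a = sm.OLS(tar, sm.add_constant(smoking)).fit().params[1]                              # smoking -> tar
b = sm.OLS(cancer, sm.add_constant(np.column_stack([tar, smoking]))).fit().params[1]   # tar -> cancer | smoking
print(f"Naive estimate:      {naive:.3f}")
print(f"Front-door estimate: {a * b:.3f}")
print(f"True effect:         {true_effect:.3f}")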
What causal graphs are good for
Causal graphs add value at several stages of a causal analysis:
Use case | Role of the causal graph | Typical methods |
---|---|---|
Identification | Determine which causal effects are estimable | backdoor criterion, front-door criterion, instrumental variables |
Study design | Guide data collection and variable measurement | identify key confounders and mediators |
Sensitivity analysis | Assess the impact of unobserved confounding | probe the robustness of causal conclusions |
Model validation | Test the plausibility of causal assumptions | d-separation tests, conditional-independence tests |
V. Worked Example: A Causal Study of the Returns to Education
Background and research question
We now apply causal graphs to the classic question of how education affects income. A plain regression can easily produce biased estimates here, because the problem involves several confounders as well as mediators.
Building the full causal graph
# Build the full causal graph for the returns to education
def create_education_return_graph():
    """Create the full causal graph for the returns to education"""
    builder = CausalGraphBuilder()
    # All relevant variables
    variables = [
        ('genetic_ability', 'confounder'),      # genetic ability
        ('family_income', 'confounder'),        # family income
        ('parent_education', 'confounder'),     # parents' education
        ('neighborhood', 'confounder'),         # neighborhood of residence
        ('high_school_quality', 'confounder'),  # high-school quality
        ('motivation', 'confounder'),           # personal motivation
        ('education', 'treatment'),             # years of education
        ('college_quality', 'mediator'),        # college quality
        ('academic_skills', 'mediator'),        # academic skills
        ('job_skills', 'mediator'),             # job skills
        ('professional_network', 'mediator'),   # professional network
        ('signaling', 'mediator'),              # signaling effect
        ('occupation', 'mediator'),             # occupational choice
        ('work_experience', 'mediator'),        # work experience
        ('income', 'outcome')                   # income
    ]
    for var, var_type in variables:
        builder.add_variable(var, var_type)
    # Causal edges
    causal_edges = [
        # Confounders affecting education
        ('genetic_ability', 'education'),
        ('family_income', 'education'),
        ('parent_education', 'education'),
        ('neighborhood', 'education'),
        ('high_school_quality', 'education'),
        ('motivation', 'education'),
        # Confounders affecting income
        ('genetic_ability', 'income'),
        ('family_income', 'income'),
        ('parent_education', 'income'),
        ('neighborhood', 'income'),
        ('motivation', 'income'),
        # Education affecting the mediators
        ('education', 'college_quality'),
        ('education', 'academic_skills'),
        ('education', 'job_skills'),
        ('education', 'professional_network'),
        ('education', 'signaling'),
        ('education', 'occupation'),
        # Mediators affecting income
        ('college_quality', 'income'),
        ('academic_skills', 'income'),
        ('job_skills', 'income'),
        ('professional_network', 'income'),
        ('signaling', 'income'),
        ('occupation', 'income'),
        ('work_experience', 'income'),
        # Other relationships
        ('education', 'work_experience'),   # more education may delay entry into work
        ('occupation', 'work_experience'),  # occupation shapes how experience accumulates
    ]
    for cause, effect in causal_edges:
        builder.add_causal_edge(cause, effect)
    return builder
# Create the full causal graph
education_builder = create_education_return_graph()

# With this many variables we visualize the graph in layers
def visualize_layered_graph(builder, treatment, outcome):
    """Visualize the causal graph with a layered layout"""
    plt.figure(figsize=(14, 10))
    # Group variables by role
    confounders = [var for var, t in builder.variable_types.items() if t == 'confounder']
    treatments = [var for var, t in builder.variable_types.items() if t == 'treatment']
    mediators = [var for var, t in builder.variable_types.items() if t == 'mediator']
    outcomes = [var for var, t in builder.variable_types.items() if t == 'outcome']
    # Layered layout
    pos = {}
    # Confounders on the left
    for i, var in enumerate(confounders):
        pos[var] = (0, i - len(confounders)/2)
    # Treatment in the middle
    for i, var in enumerate(treatments):
        pos[var] = (2, 0)
    # Mediators above and below on the right
    for i, var in enumerate(mediators):
        row = i % 2
        col = i // 2
        pos[var] = (4 + col * 0.5, 1 - row * 2)
    # Outcome on the far right
    for i, var in enumerate(outcomes):
        pos[var] = (8, 0)
    # Draw the graph
    node_colors = []
    color_map = {
        'treatment': 'lightcoral',
        'outcome': 'lightgreen',
        'confounder': 'lightblue',
        'mediator': 'lightyellow'
    }
    for node in builder.graph.nodes():
        node_colors.append(color_map.get(builder.variable_types[node], 'lightgray'))
    nx.draw(builder.graph, pos, with_labels=True, node_color=node_colors,
            node_size=1500, arrowsize=15, font_size=8, alpha=0.8,
            edge_color='gray')
    plt.title("Full causal graph for the returns to education (layered layout)")
    plt.show()

# Layered visualization
visualize_layered_graph(education_builder, 'education', 'income')
# Identification analysis
print("Identification analysis for the returns to education:")
print("=" * 50)
confounders = education_builder.find_confounders('education', 'income')
print(f"Main confounders ({len(confounders)}):")
for conf in confounders:
    print(f"  - {conf}")
adjustment_sets = education_builder.suggest_adjustment_sets('education', 'income')
print(f"\nSuggested sufficient adjustment set: {adjustment_sets[0] if adjustment_sets else 'None'}")
print("\nIdentification challenges:")
print("1. Several confounders (e.g. genetic ability, motivation) are hard to observe")
print("2. The mediating mechanisms are complex")
print("3. Instrumental variables or natural experiments are needed to identify the causal effect")
Data generation and causal-effect estimation
Guided by the causal graph, we can generate simulated data and estimate the causal effect.
# Generate data consistent with the causal graph
def generate_education_data(n_samples=5000):
    """Generate simulated data for the returns-to-education graph"""
    np.random.seed(2024)
    data = {}
    # Exogenous variables (confounders)
    data['genetic_ability'] = np.random.normal(0, 1, n_samples)
    data['family_income'] = np.random.normal(0, 1, n_samples)
    data['parent_education'] = np.random.normal(0, 1, n_samples)
    data['neighborhood'] = np.random.normal(0, 1, n_samples)
    data['high_school_quality'] = np.random.normal(0, 1, n_samples)
    data['motivation'] = np.random.normal(0, 1, n_samples)
    # Exogenous instrument: distance to the nearest college, which affects
    # income only through education (used later for the IV estimate)
    data['distance_to_college'] = np.random.normal(0, 1, n_samples)
    # Education decision (endogenous)
    education_prob = 1 / (1 + np.exp(-(
        -2.0 +
        0.5 * data['genetic_ability'] +
        0.4 * data['family_income'] +
        0.6 * data['parent_education'] +
        0.3 * data['neighborhood'] +
        0.4 * data['high_school_quality'] +
        0.5 * data['motivation'] -
        0.5 * data['distance_to_college']
    )))
    data['education'] = np.random.binomial(1, education_prob)  # 0 = high school, 1 = college
    # Mediators
    data['college_quality'] = (
        0.7 * data['education'] +
        0.3 * data['high_school_quality'] +
        np.random.normal(0, 0.5, n_samples)
    )
    data['academic_skills'] = (
        0.6 * data['education'] +
        0.4 * data['genetic_ability'] +
        0.3 * data['motivation'] +
        np.random.normal(0, 0.5, n_samples)
    )
    data['job_skills'] = (
        0.5 * data['education'] +
        0.3 * data['academic_skills'] +
        0.2 * data['motivation'] +
        np.random.normal(0, 0.5, n_samples)
    )
    # Income (outcome)
    base_income = 30000
    data['income'] = (
        base_income +
        8000 * data['education'] +          # direct causal effect of education
        3000 * data['genetic_ability'] +
        2000 * data['family_income'] +
        1500 * data['parent_education'] +
        1000 * data['neighborhood'] +
        500 * data['motivation'] +
        2000 * data['college_quality'] +
        3000 * data['academic_skills'] +
        2500 * data['job_skills'] +
        np.random.normal(0, 5000, n_samples)
    )
    return pd.DataFrame(data)
# Generate the data
education_data = generate_education_data()

# The total causal effect of education on income implied by this data-generating
# process is the direct effect plus the indirect effects through the mediators:
# 8000 + 2000*0.7 (college_quality) + 3000*0.6 (academic_skills)
#      + 2500*(0.5 + 0.3*0.6) (job_skills) = 12900
TRUE_TOTAL_EFFECT = 8000 + 2000 * 0.7 + 3000 * 0.6 + 2500 * (0.5 + 0.3 * 0.6)

print("Summary of the simulated returns-to-education data:")
print(f"Sample size: {len(education_data)}")
print(f"Share with a college education: {education_data['education'].mean():.3f}")
print(f"Average income, college group: {education_data[education_data['education']==1]['income'].mean():.2f}")
print(f"Average income, high-school group: {education_data[education_data['education']==0]['income'].mean():.2f}")
print(f"Raw income gap: {education_data[education_data['education']==1]['income'].mean() - education_data[education_data['education']==0]['income'].mean():.2f}")
print(f"True total causal effect: {TRUE_TOTAL_EFFECT:.2f}")
# Compare estimates under different adjustment strategies
def compare_adjustment_strategies(data, treatment, outcome, confounders):
    """Compare the estimates produced by different adjustment strategies"""
    results = []
    # 1. No adjustment
    model_naive = sm.OLS(data[outcome], sm.add_constant(data[treatment])).fit()
    results.append({
        'method': 'No adjustment',
        'adjustment_set': 'none',
        'estimate': model_naive.params[treatment],
        'bias': model_naive.params[treatment] - TRUE_TOTAL_EFFECT
    })
    # 2. Partial adjustment (observable confounders only)
    observable_confounders = ['family_income', 'parent_education', 'neighborhood', 'high_school_quality']
    X_partial = sm.add_constant(data[[treatment] + observable_confounders])
    model_partial = sm.OLS(data[outcome], X_partial).fit()
    results.append({
        'method': 'Partial adjustment',
        'adjustment_set': str(observable_confounders),
        'estimate': model_partial.params[treatment],
        'bias': model_partial.params[treatment] - TRUE_TOTAL_EFFECT
    })
    # 3. Instrumental variable (two-stage least squares by hand)
    # distance_to_college was generated as an exogenous variable that affects
    # income only through education, so it can serve as an instrument
    first_stage = sm.OLS(data[treatment], sm.add_constant(data[['distance_to_college'] + observable_confounders])).fit()
    data['education_hat'] = first_stage.predict()
    second_stage = sm.OLS(data[outcome], sm.add_constant(data[['education_hat'] + observable_confounders])).fit()
    results.append({
        'method': 'Instrumental variable',
        'adjustment_set': 'distance_to_college + observable confounders',
        'estimate': second_stage.params['education_hat'],
        'bias': second_stage.params['education_hat'] - TRUE_TOTAL_EFFECT
    })
    return pd.DataFrame(results)

# Compare the methods
comparison_results = compare_adjustment_strategies(education_data, 'education', 'income', [])
print("\nEstimates under different adjustment strategies:")
print(comparison_results.round(2))
# Visualize the comparison
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# Point estimates
methods = comparison_results['method']
estimates = comparison_results['estimate']
biases = comparison_results['bias']
axes[0].bar(methods, estimates, color=['red', 'orange', 'green'], alpha=0.7)
axes[0].axhline(y=TRUE_TOTAL_EFFECT, color='blue', linestyle='--', linewidth=2, label='True total effect')
axes[0].set_ylabel('Estimated return to education')
axes[0].set_title('Causal-effect estimates by method')
axes[0].legend()
axes[0].tick_params(axis='x', rotation=45)
# Bias
axes[1].bar(methods, np.abs(biases), color=['red', 'orange', 'green'], alpha=0.7)
axes[1].set_ylabel('Absolute bias')
axes[1].set_title('Bias of the estimates')
axes[1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.show()
Insights from the worked example
The causal-graph analysis yields several important insights:
- Confounders matter: unobserved variables such as genetic ability and motivation push the naive estimate upward
- Choice of adjustment strategy: adjusting only for observable variables is not enough to remove all of the bias
- Value of instrumental variables: with a valid instrument (here, distance to college, which is exogenous and affects income only through education), the IV estimate comes much closer to the true total effect despite the unobserved confounding
- Understanding mechanisms: education affects income through several pathways (skills, networks, signaling)
VI. Causal Discovery Algorithms
From data to causal graphs
Causal graphs are usually built from prior knowledge, but causal discovery algorithms can also learn causal structure from data.
# Causal discovery implementation
class CausalDiscovery:
    """Simple causal discovery algorithms"""
    def __init__(self):
        self.learned_graph = None

    def pc_algorithm(self, data, alpha=0.05):
        """
        PC algorithm (Peter-Clark).
        Learns the skeleton of a causal graph from conditional-independence tests.
        """
        import itertools
        from sklearn.preprocessing import StandardScaler

        variables = data.columns.tolist()
        n_vars = len(variables)
        # Standardize the data
        scaler = StandardScaler()
        data_scaled = pd.DataFrame(scaler.fit_transform(data), columns=variables)
        # Start from a fully connected undirected graph
        graph = np.ones((n_vars, n_vars)) - np.eye(n_vars)
        separation_sets = [[set() for _ in range(n_vars)] for _ in range(n_vars)]
        # Phase 1: learn the skeleton (undirected graph)
        l = 0  # size of the conditioning set
        while True:
            any_edge_removed = False
            for i, j in itertools.combinations(range(n_vars), 2):
                if graph[i, j] == 1:  # edge i - j still present
                    # Neighbours of i, excluding j
                    adj_i = [k for k in range(n_vars) if graph[i, k] == 1 and k != j]
                    if len(adj_i) >= l:
                        for S in itertools.combinations(adj_i, l):
                            # Conditional-independence test
                            cond_vars = [variables[k] for k in S]
                            p_value = self.conditional_independence_test(
                                data_scaled, variables[i], variables[j], cond_vars
                            )
                            if p_value > alpha:  # independent
                                graph[i, j] = graph[j, i] = 0
                                separation_sets[i][j] = set(S)
                                separation_sets[j][i] = set(S)
                                any_edge_removed = True
                                break
            l += 1
            if not any_edge_removed or l > n_vars - 2:
                break
        # Phase 2: orient edges (collider detection)
        # Simplified implementation: only flag potential colliders; the full
        # PC algorithm applies further orientation rules
        for i, j, k in itertools.combinations(range(n_vars), 3):
            if (graph[i, j] == 1 and graph[j, k] == 1 and graph[i, k] == 0 and
                    j not in separation_sets[i][k]):
                # Potential collider i → j ← k
                pass
        # Convert to a networkx graph
        learned_graph = nx.Graph()
        for var in variables:
            learned_graph.add_node(var)
        for i in range(n_vars):
            for j in range(i+1, n_vars):
                if graph[i, j] == 1:
                    learned_graph.add_edge(variables[i], variables[j])
        self.learned_graph = learned_graph
        return learned_graph
    def conditional_independence_test(self, data, var1, var2, cond_vars):
        """Conditional-independence test based on (partial) correlation"""
        from scipy.stats import pearsonr
        if not cond_vars:
            # Unconditional correlation
            corr, p_value = pearsonr(data[var1], data[var2])
            return p_value
        else:
            # Partial correlation: regress var1 and var2 on the conditioning
            # variables, then test the correlation of the residuals
            X_cond = data[cond_vars]
            model1 = LinearRegression().fit(X_cond, data[var1])
            resid1 = data[var1] - model1.predict(X_cond)
            model2 = LinearRegression().fit(X_cond, data[var2])
            resid2 = data[var2] - model2.predict(X_cond)
            corr, p_value = pearsonr(resid1, resid2)
            return p_value

    def visualize_learned_graph(self, true_graph=None):
        """Visualize the learned graph"""
        plt.figure(figsize=(10, 8))
        if self.learned_graph:
            pos = nx.spring_layout(self.learned_graph, seed=42)
            nx.draw(self.learned_graph, pos, with_labels=True, node_color='lightblue',
                    node_size=1500, font_size=10, alpha=0.8)
            plt.title("Learned causal skeleton")
        else:
            plt.text(0.5, 0.5, "Run the PC algorithm first", ha='center', va='center', fontsize=16)
        plt.show()
# Run causal discovery on a subset of the education data
discovery_data = education_data[['genetic_ability', 'family_income', 'education', 'income']]
print("Running the causal discovery algorithm...")
discoverer = CausalDiscovery()
learned_graph = discoverer.pc_algorithm(discovery_data, alpha=0.01)
print("Edges of the learned graph:")
print(list(learned_graph.edges()))
# Visualize the learned graph
discoverer.visualize_learned_graph()
# Compare with the true structure
print("\nTrue causal structure:")
print("genetic_ability → education")
print("genetic_ability → income")
print("family_income → education")
print("family_income → income")
print("education → income")
A comparison of causal discovery methods
Different causal discovery algorithms have different strengths and weaknesses:
Algorithm | Principle | Strengths | Weaknesses | Typical use |
---|---|---|---|---|
PC algorithm | Conditional-independence tests | Few parametric assumptions, well-developed theory | Computationally heavy in high dimensions, needs large samples | Moderate dimensions, continuous variables |
LiNGAM | Exploits non-Gaussian noise | Can orient edges without interventions | Relies on the non-Gaussianity assumption | Data with non-Gaussian noise |
Causal forests | Random-forest-based effect estimation | Handles complex nonlinear relationships | Estimates effects for a known treatment rather than discovering structure | Heterogeneous treatment-effect estimation |
Bayesian networks | Probabilistic graphical models | Full uncertainty quantification | Computationally expensive, sensitive to priors | Small to moderate dimensions |
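As a concrete contrast to the PC skeleton above, here is a minimal LiNGAM sketch. It assumes the third-party lingam package (pip install lingam) is installed; DirectLiNGAM uses the non-Gaussianity of the noise to orient edges, which constraint-based skeleton learning alone cannot do.
# DirectLiNGAM on a tiny synthetic dataset with uniform (non-Gaussian) noise
import lingam

rng = np.random.default_rng(0)
n = 2000
x = rng.uniform(-1, 1, n)               # non-Gaussian exogenous cause
y = 1.5 * x + rng.uniform(-1, 1, n)     # true direction: x -> y
X_lingam = pd.DataFrame({'x': x, 'y': y})

model = lingam.DirectLiNGAM()
model.fit(X_lingam)
print("Estimated causal order:", [X_lingam.columns[i] for i in model.causal_order_])
print("Estimated adjacency matrix:\n", model.adjacency_matrix_)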
VII. Practical Guidance and Best Practices
Best practices for building causal graphs
Drawing on theory and practical experience, a few best practices for building causal graphs are summarized below:
# Causal-graph validation tool
def validate_causal_graph(builder, data, treatment, outcome):
    """Run some plausibility checks on a causal graph"""
    validation_results = {
        'warnings': [],
        'suggestions': [],
        'tests_passed': []
    }
    # 1. Check the conditional independencies implied by d-separation
    print("Running d-separation checks...")
    # Simplified example: enumerate collider structures
    colliders = []
    for node in builder.graph.nodes():
        parents = list(builder.graph.predecessors(node))
        if len(parents) >= 2:
            colliders.append((node, parents))
    if colliders:
        validation_results['tests_passed'].append(f"Found {len(colliders)} collider structures")
    # 2. Check for obviously missing variables
    potential_confounders = ['ability', 'motivation', 'family_background', 'early_life']
    missing_vars = [var for var in potential_confounders if var not in builder.graph.nodes()]
    if missing_vars:
        validation_results['warnings'].append(f"Potentially missing confounders: {missing_vars}")
    # 3. Check that the causal ordering is sensible
    treatment_descendants = list(nx.descendants(builder.graph, treatment))
    if outcome not in treatment_descendants:
        validation_results['warnings'].append("The outcome is not a descendant of the treatment; check the causal directions")
    # 4. Recommend sensitivity analyses
    validation_results['suggestions'].append("Run a sensitivity analysis for unobserved confounding")
    validation_results['suggestions'].append("Check robustness against alternative graph structures")
    return validation_results
# Validate the returns-to-education graph
validation = validate_causal_graph(education_builder, education_data, 'education', 'income')
print("Causal-graph validation results:")
print("=" * 50)
if validation['warnings']:
    print("Warnings:")
    for warning in validation['warnings']:
        print(f"  ⚠ {warning}")
if validation['tests_passed']:
    print("\nChecks passed:")
    for test in validation['tests_passed']:
        print(f"  ✓ {test}")
if validation['suggestions']:
    print("\nSuggestions:")
    for suggestion in validation['suggestions']:
        print(f"  💡 {suggestion}")
# Checklist for building causal graphs
def causal_graph_checklist():
    """A checklist for building causal graphs"""
    checklist = {
        'Theoretical grounding': [
            'Build the graph from domain knowledge and theory',
            'Spell out the causal mechanism behind each edge',
            'Consider how variables are measured and their temporal order',
            'Consult the literature and domain experts'
        ],
        'Graph structure': [
            'Include all relevant variables',
            'Set the causal directions correctly',
            'Identify every confounding path',
            'Check that collider structures are plausible'
        ],
        'Practical considerations': [
            'Whether variables are observable and measurable',
            'What data are actually available',
            'How complex the model needs to be',
            'Plan for sensitivity analyses'
        ],
        'Validation': [
            'Run d-separation checks',
            'Test the implied conditional independencies',
            'Compare alternative graph structures',
            'Validate against substantive knowledge'
        ]
    }
    return checklist

# Display the checklist
checklist = causal_graph_checklist()
print("\nChecklist for building causal graphs:")
print("=" * 50)
for category, items in checklist.items():
    print(f"\n{category}:")
    for i, item in enumerate(items, 1):
        print(f"  {i}. {item}")
Common pitfalls and how to avoid them
Common pitfalls in causal-graph analysis and strategies for avoiding them:
Pitfall | Description | Consequence | How to avoid |
---|---|---|---|
Omitted variables | Leaving out an important confounder | Biased causal estimates | Think through the theory systematically; run sensitivity analyses |
Wrong direction | Mis-specifying the direction of an edge | Completely wrong conclusions | Respect temporal order; use causal discovery as a check |
Over-control | Conditioning on mediators or colliders | Introduces bias and discards information | Understand the causal role of each variable |
Measurement error | Variables measured inaccurately | Imprecise estimates, lower power | Use reliable measurement instruments; consider measurement models |
Cyclic causation | Feedback between variables | Identification problems | Make the temporal order explicit; use dynamic models |