因果中介分析的前沿方法与实战陷阱
I. 因果中介的数学基础与识别困境
1.1 潜在结果框架下的中介定义
设处理变量,中介变量,结果变量。其潜在结果记为:
- :在干预下的中介值
- :在干预且中介为时的结果
- :自然间接效应的核心构件
因果中介效应分解(Pearl, 2001):
实例分析:在线教育平台的课程干预
某在线编程平台推出"AI辅导"功能(处理),旨在通过提升每周学习时长(中介,小时)来提高课程完成率(结果)。我们观测到:
- 实验组:启用AI辅导,平均学习时长小时,完成率
- 对照组:无AI辅导,平均学习时长小时,完成率
总效应。但关键问题是:若实验组学习时间仅增至5.2小时(对照组水平),完成率会是多少? 这就是在中的反事实嵌入。
Parse error on line 2: ...I辅导A=1] --> B[学习时长 M(1)=8.5h] C[无辅导A -----------------------^ Expecting 'SEMI', 'NEWLINE', 'SPACE', 'EOF', 'GRAPH', 'DIR', 'subgraph', 'SQS', 'SQE', 'end', 'AMP', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'ALPHA', 'COLON', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'START_LINK', 'LINK', 'STYLE', 'LINKSTYLE', 'CLASSDEF', 'CLASS', 'CLICK', 'DOWN', 'UP', 'DEFAULT', 'NUM', 'COMMA', 'MINUS', 'BRKT', 'DOT', 'PCT', 'TAGSTART', 'PUNCTUATION', 'UNICODE_TEXT', 'PLUS', 'EQUALS', 'MULT', 'UNDERSCORE', got 'PS'1.2 序列可忽略性假设:中介分析的暗礁
识别自然效应需要三个致命假设:
I. 一致性(Consistency):
实战陷阱:中介的测量误差会破坏一致性。若"学习时长"包含挂机时间,与真实潜在中介的偏差会使估计无效。
II. 无未测混淆(No Unmeasured Confounding):
实战陷阱:必须阻断和的所有后门路径。若用户"自学能力"同时影响和却未被测量,NIE被混杂。
III. 无中介-处理交互混淆(Cross-World Independence):
实战陷阱:该假设涉及跨世界(Cross-World)反事实,不可检验。在平台实验中,若算法推荐强度与用户历史行为(影响)相关,假设失效。
实例陷阱详解:电商平台的双向混淆
某电商平台测试"个性化首页"()对GMV()的影响,中介为"点击率"()。隐藏的陷阱:
| 混淆路径 | 表现形式 | 对效应估计的影响 | 检测方法 |
|---|---|---|---|
| ** C→A→M←C→Y ** | 高价值用户被算法更频繁触达,且本身购买力强 | NIE高估(点击价值虚高) | 检查关系在分层下是否稳定 |
| ** A→M→Y←U→M ** | 商品质量同时影响点击和购买,但未观测 | NIE与NDE均被混杂 | 安慰剂检验:用不可能中介的变量测试 |
| ** 中介测量误差 ** | Bot点击导致包含噪音 | 衰减偏误(NIE趋近于0) | 数据质量监控:点击-转化时序异常检测 |
II. 自然效应模型:参数化路径的精确分解
2.1 模型设定与似然估计
自然效应模型(VanderWeele, 2014)通过参数化和实现识别:
结果模型:
中介模型:
效应计算:
- 自然间接效应:
- 自然直接效应:
实战优势:在参数正确设定下,可通过乘积法或差值法直接估计,兼容传统回归软件。
实例代码:Python中实现参数化中介分析
# parametric_mediation.py
import statsmodels.api as sm
import numpy as np
import pandas as pd
from scipy.stats import norm
from typing import Tuple, Dict
class NaturalEffectModel:
"""
自然效应模型实现
功能:
1. 拟合结果模型与中介模型
2. 计算NIE, NDE及其置信区间
3. 提供假设检验工具
"""
def __init__(self, data: pd.DataFrame,
outcome_col: str,
treatment_col: str,
mediator_col: str,
covariate_cols: List[str] = None):
"""
参数:
- data: 包含A, M, Y, C的数据框
- outcome_col: 结果变量列名
- treatment_col: 处理变量列名
- mediator_col: 中介变量列名
- covariate_cols: 协变量列名列表
"""
self.data = data.copy()
self.outcome_col = outcome_col
self.treatment_col = treatment_col
self.mediator_col = mediator_col
self.covariate_cols = covariate_cols or []
# 模型结果存储
self.outcome_model = None
self.mediator_model = None
self.effects = {}
def fit(self, interaction: bool = True) -> 'NaturalEffectModel':
"""
拟合两阶段模型
参数:
- interaction: 是否包含A*M交互项
"""
# 阶段1: 中介模型 M ~ A + C
mediator_formula = f"{self.mediator_col} ~ {self.treatment_col}"
if self.covariate_cols:
mediator_formula += " + " + " + ".join(self.covariate_cols)
self.mediator_model = sm.OLS.from_formula(
mediator_formula, data=self.data
).fit()
# 阶段2: 结果模型 Y ~ A + M + A*M + C
outcome_formula = f"{self.outcome_col} ~ {self.treatment_col} + {self.mediator_col}"
if interaction:
outcome_formula += f" + {self.treatment_col}:{self.mediator_col}"
if self.covariate_cols:
outcome_formula += " + " + " + ".join(self.covariate_cols)
self.outcome_model = sm.OLS.from_formula(
outcome_formula, data=self.data
).fit()
# 计算中介效应
self._compute_effects()
return self
def _compute_effects(self):
"""解析计算NIE和NDE"""
params = self.outcome_model.params
mediator_params = self.mediator_model.params
# 获取系数
beta1 = params[self.treatment_col] # A主效应
beta2 = params[self.mediator_col] # M主效应
beta3 = params.get(f"{self.treatment_col}:{self.mediator_col}", 0) # A*M交互
gamma1 = mediator_params[self.treatment_col] # A对M的效应
# 自然间接效应
nie = beta2 * gamma1 + beta3 * gamma1
# 自然直接效应(在A=0处评估)
# NDE = β₁ + β₃ * E[M|A=0,C]
# 为简化,用样本平均替代
M0_pred = self.mediator_model.predict(
self.data.assign(**{self.treatment_col: 0})
).mean()
nde = beta1 + beta3 * M0_pred
self.effects = {
'nie': nie,
'nde': nde,
'total_effect': nie + nde,
'proportion_mediated': nie / (nie + nde) if (nie + nde) != 0 else 0
}
def bootstrap_inference(self, n_bootstraps: int = 1000,
confidence_level: float = 0.95) -> Dict:
"""
Bootstrap置信区间
实现要点:
1. 有放回重采样
2. 重新拟合两模型
3. 计算效应分布
"""
bootstrap_effects = {'nie': [], 'nde': []}
for b in range(n_bootstraps):
# 重采样
bootstrap_sample = self.data.sample(
n=len(self.data), replace=True, random_state=b
)
# 拟合
nem_boot = NaturalEffectModel(
bootstrap_sample,
self.outcome_col,
self.treatment_col,
self.mediator_col,
self.covariate_cols
)
nem_boot.fit()
bootstrap_effects['nie'].append(nem_boot.effects['nie'])
bootstrap_effects['nde'].append(nem_boot.effects['nde'])
# 计算置信区间
alpha = 1 - confidence_level
ci_lower_nie = np.percentile(bootstrap_effects['nie'], 100 * alpha / 2)
ci_upper_nie = np.percentile(bootstrap_effects['nie'], 100 * (1 - alpha / 2))
ci_lower_nde = np.percentile(bootstrap_effects['nde'], 100 * alpha / 2)
ci_upper_nde = np.percentile(bootstrap_effects['nde'], 100 * (1 - alpha / 2))
return {
'nie': {
'estimate': self.effects['nie'],
'ci_lower': ci_lower_nie,
'ci_upper': ci_upper_nie,
'p_value': np.mean(np.array(bootstrap_effects['nie']) > 0)
},
'nde': {
'estimate': self.effects['nde'],
'ci_lower': ci_lower_nde,
'ci_upper': ci_upper_nde,
'p_value': np.mean(np.array(bootstrap_effects['nde']) > 0)
}
}
def sobel_test(self) -> Dict:
"""Sobel检验(适用于无非线性交互)"""
if self.effects.get('proportion_mediated', 0) == 0:
return {"error": "Sobel test not applicable with interaction"}
# 获取标准误
beta2_se = self.outcome_model.bse[self.mediator_col]
gamma1_se = self.mediator_model.bse[self.treatment_col]
nie = self.effects['nie']
nie_se = np.sqrt(
(beta2_se * self.mediator_model.params[self.treatment_col])**2 +
(self.outcome_model.params[self.mediator_col] * gamma1_se)**2
)
z_stat = nie / nie_se
p_value = 2 * (1 - norm.cdf(abs(z_stat)))
return {
'nie_estimate': nie,
'nie_se': nie_se,
'z_statistic': z_stat,
'p_value': p_value
}
def summary(self) -> pd.DataFrame:
"""生成效应摘要表"""
return pd.DataFrame({
'Effect': ['NIE', 'NDE', 'Total Effect', 'Proportion Mediated'],
'Estimate': [self.effects['nie'], self.effects['nde'],
self.effects['total_effect'], self.effects['proportion_mediated']],
'Interpretation': [
f"中介效应: {self.effects['nie']:.3f}",
f"直接效应: {self.effects['nde']:.3f}",
f"总效应: {self.effects['total_effect']:.3f}",
f"中介比例: {self.effects['proportion_mediated']:.1%}"
]
})
# 实战案例:在线教育平台AI辅导功能
def online_education_case():
"""
模拟在线教育平台数据
样本量: 10000用户
处理: AI辅导功能 (0/1)
中介: 周学习时长 (小时)
结果: 课程完成率 (0/1)
混淆变量: 年龄、基线能力、课程难度
"""
np.random.seed(42)
n = 10000
# 生成混淆变量
age = np.random.normal(25, 5, n)
baseline_ability = np.random.normal(0, 1, n)
course_difficulty = np.random.choice(['easy', 'medium', 'hard'], n)
diff_map = {'easy': -1, 'medium': 0, 'hard': 1}
# 处理分配(非随机,存在选择偏倚)
propensity = 1 / (1 + np.exp(-(baseline_ability * 0.5 + age * 0.02)))
treatment = np.random.binomial(1, propensity)
# 中介模型: M ~ A + C
# 真实效应: gamma1 = 2.5小时
noise_m = np.random.normal(0, 1, n)
study_hours = 5 + 2.5 * treatment + 0.3 * baseline_ability - 0.1 * age + \
0.5 * np.array([diff_map[d] for d in course_difficulty]) + noise_m
# 结果模型: Y ~ A + M + A*M + C
# 真实效应: beta1=0.05, beta2=0.03, beta3=0.01
noise_y = np.random.normal(0, 0.5, n)
completion_rate = 0.3 + 0.05 * treatment + 0.03 * study_hours + \
0.01 * treatment * study_hours + \
0.1 * baseline_ability - 0.005 * age + noise_y
# 二值化完成率
completion_binary = (completion_rate > 0).astype(int)
df = pd.DataFrame({
'user_id': range(n),
'treatment': treatment,
'study_hours': study_hours,
'completion': completion_binary,
'age': age,
'baseline_ability': baseline_ability,
'course_difficulty': course_difficulty
})
# 运行自然效应模型
nem = NaturalEffectModel(
data=df,
outcome_col='completion',
treatment_col='treatment',
mediator_col='study_hours',
covariate_cols=['age', 'baseline_ability']
)
nem.fit(interaction=True)
print("=== 自然效应模型结果 ===")
print(nem.summary())
# Bootstrap推断
print("\n=== Bootstrap置信区间 (1000次) ===")
inference = nem.bootstrap_inference(n_bootstraps=1000)
for effect, stats in inference.items():
print(f"{effect}: {stats['estimate']:.3f} "
f"[{stats['ci_lower']:.3f}, {stats['ci_upper']:.3f}], "
f"p={stats['p_value']:.3f}")
# Sobel检验
print("\n=== Sobel检验 ===")
sobel = nem.sobel_test()
print(f"NIE = {sobel['nie_estimate']:.3f}, SE = {sobel['nie_se']:.3f}, "
f"Z = {sobel['z_statistic']:.2f}, p = {sobel['p_value']:.3f}")
return df, nem
if __name__ == "__main__":
df, nem = online_education_case()
** 代码详解 **:
NaturalEffectModel类封装了两阶段最小二乘的核心逻辑fit()方法中,interaction=True自动检测交互项,若存在则分解交互效应bootstrap_inference()采用** 非参数Bootstrap **,避免依赖正态假设,对偏态数据更稳健- 在线教育案例中,AI辅导通过提升学习时长带来的** 中介效应占比 **约为68%,但Bootstrap显示CI较宽,提示样本量不足
III. 双重稳健估计:机器学习时代的非参数革命
3.1 为什么要双重稳健?
参数化方法依赖模型正确设定,但在高维数据下几乎必然误设。双重稳健(Doubly Robust, DR)估计仅需结果模型或中介模型之一正确即可保持一致性:
DR估计量:
其中为结果模型,为中介模型,为倾向得分。
实战优势:
- 可用随机森林、XGBoost等非参数模型
- 通过**交叉拟合(Cross-fitting)**避免过拟合
- 提供**影响函数(Influence Function)**基础的标准误
实例分析:电商推荐算法的路径分解
某电商平台测试"图神经网络推荐"(),中介为"首页点击数"(),结果为"7日GMV"()。用户特征包含200维行为向量。传统回归无法处理非线性交互,DR方法用XGBoost拟合和,再用残差平衡估计因果路径。
# doubly_robust_mediation.py
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import KFold
from econml.utilities import cross_fit
import numpy as np
from typing import Callable, Tuple
class DoublyRobustMediation:
"""
双重稳健中介效应估计
实现特点:
1. 支持任意sklearn估计器
2. 交叉拟合避免过拟合
3. 影响函数计算标准误
"""
def __init__(self, outcome_model: Callable = None,
mediator_model: Callable = None,
propensity_model: Callable = None,
n_folds: int = 5,
random_state: int = 42):
"""
参数:
- outcome_model: 结果变量估计器
- mediator_model: 中介变量估计器
- propensity_model: 倾向得分估计器
- n_folds: 交叉拟合折数
"""
self.outcome_model = outcome_model or GradientBoostingRegressor(
n_estimators=100, max_depth=4, random_state=random_state
)
self.mediator_model = mediator_model or GradientBoostingRegressor(
n_estimators=100, max_depth=4, random_state=random_state
)
self.propensity_model = propensity_model or RandomForestRegressor(
n_estimators=50, max_depth=3, random_state=random_state
)
self.n_folds = n_folds
self.is_fitted = False
def fit(self, A: np.ndarray, M: np.ndarray, Y: np.ndarray, X: np.ndarray) -> 'DoublyRobustMediation':
"""
拟合DR估计量
A: 处理变量 (n_samples,)
M: 中介变量 (n_samples,)
Y: 结果变量 (n_samples,)
X: 协变量矩阵 (n_samples, n_features)
"""
self.n_samples = len(A)
self.A = A
self.M = M
self.Y = Y
self.X = X
# 交叉拟合索引
kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=42)
# 存放交叉拟合预测
self.M_pred_1 = np.zeros(self.n_samples)
self.M_pred_0 = np.zeros(self.n_samples)
self.Y_pred_1m1 = np.zeros(self.n_samples)
self.Y_pred_1m0 = np.zeros(self.n_samples)
self.Y_pred_0m0 = np.zeros(self.n_samples)
self.pi_pred = np.zeros(self.n_samples)
for train_idx, test_idx in kf.split(X):
# 训练集
X_train, A_train, M_train, Y_train = X[train_idx], A[train_idx], M[train_idx], Y[train_idx]
# 测试集
X_test = X[test_idx]
# 拟合中介模型
self.mediator_model.fit(
np.column_stack([A_train, X_train]), M_train
)
# 预测所有反事实中介值
A1X = np.column_stack([np.ones(len(X_test)), X_test])
A0X = np.column_stack([np.zeros(len(X_test)), X_test])
self.M_pred_1[test_idx] = self.mediator_model.predict(A1X)
self.M_pred_0[test_idx] = self.mediator_model.predict(A0X)
# 拟合结果模型
self.outcome_model.fit(
np.column_stack([A_train, M_train, X_train]), Y_train
)
# 预测所有反事实结果
# Y(1, M(1))
self.Y_pred_1m1[test_idx] = self.outcome_model.predict(
np.column_stack([np.ones(len(X_test)), self.M_pred_1[test_idx], X_test])
)
# Y(1, M(0))
self.Y_pred_1m0[test_idx] = self.outcome_model.predict(
np.column_stack([np.ones(len(X_test)), self.M_pred_0[test_idx], X_test])
)
# Y(0, M(0))
self.Y_pred_0m0[test_idx] = self.outcome_model.predict(
np.column_stack([np.zeros(len(X_test)), self.M_pred_0[test_idx], X_test])
)
# 拟合倾向得分模型
self.propensity_model.fit(X_train, A_train)
self.pi_pred[test_idx] = self.propensity_model.predict_proba(X_test)[:, 1]
self.is_fitted = True
return self
def estimate_effects(self) -> Dict[str, float]:
"""计算中介与直接效应"""
if not self.is_fitted:
raise ValueError("Must call fit() first")
# IPTW权重
weight_1 = self.A / self.pi_pred
weight_0 = (1 - self.A) / (1 - self.pi_pred)
# 自然间接效应估计
# NIE = E[Y(1,M(1)) - Y(1,M(0))]
direct_var = self.Y - self.outcome_model.predict(
np.column_stack([self.A, self.M, self.X])
)
nie_estimate = np.mean(
self.Y_pred_1m1 - self.Y_pred_1m0 +
weight_1 * direct_var
)
# 自然直接效应估计
# NDE = E[Y(1,M(0)) - Y(0,M(0))]
nde_estimate = np.mean(
self.Y_pred_1m0 - self.Y_pred_0m0 +
(weight_1 - weight_0) * direct_var
)
# 影响函数计算标准误
phi_nie = (self.Y_pred_1m1 - self.Y_pred_1m0) - nie_estimate + weight_1 * direct_var
phi_nde = (self.Y_pred_1m0 - self.Y_pred_0m0) - nde_estimate + (weight_1 - weight_0) * direct_var
nie_se = np.std(phi_nie) / np.sqrt(self.n_samples)
nde_se = np.std(phi_nde) / np.sqrt(self.n_samples)
self.effects = {
'nie': nie_estimate,
'nde': nde_estimate,
'total': nie_estimate + nde_estimate,
'nie_se': nie_se,
'nde_se': nde_se,
'nie_z': nie_estimate / nie_se,
'nde_z': nde_estimate / nde_se
}
return self.effects
def summary(self) -> pd.DataFrame:
"""生成效应摘要"""
if not self.effects:
raise ValueError("Must call estimate_effects() first")
return pd.DataFrame({
'Effect': ['NIE', 'NDE', 'Total Effect'],
'Estimate': [self.effects['nie'], self.effects['nde'], self.effects['total']],
'Std.Error': [self.effects['nie_se'], self.effects['nde_se'],
np.sqrt(self.effects['nie_se']**2 + self.effects['nde_se']**2)],
'Z-statistic': [self.effects['nie_z'], self.effects['nde_z'],
(self.effects['nie'] + self.effects['nde']) /
np.sqrt(self.effects['nie_se']**2 + self.effects['nde_se']**2)],
'p_value': [2 * (1 - norm.cdf(abs(self.effects['nie_z']))),
2 * (1 - norm.cdf(abs(self.effects['nde_z']))),
2 * (1 - norm.cdf(abs((self.effects['nie'] + self.effects['nde']) /
np.sqrt(self.effects['nie_se']**2 + self.effects['nde_se']**2))))]
})
# 电商推荐算法案例
def e-commerce_recommendation_case():
"""
模拟电商平台数据
n=50000用户, 200维行为特征
处理: 图神经网络推荐 (0/1)
中介: 首页点击次数
结果: 7日GMV (对数变换)
"""
np.random.seed(123)
n, p = 50000, 200
# 生成高维特征
X = np.random.randn(n, p)
# 添加一些相关结构
X[:, 50:60] = X[:, 0:1] + np.random.randn(n, 10) * 0.1 # 用户活跃度簇
# 处理分配(基于潜变量,非随机)
propensity = 1 / (1 + np.exp(-X[:, 0] * 0.5 + X[:, 1] * 0.3))
treatment = np.random.binomial(1, propensity, n)
# 中介模型: M ~ A + X (非线性)
# 真实: M = 5 + 3*A + tanh(X_0)*2 + sin(X_1)*1.5 + noise
mediator_noise = np.random.normal(0, 1, n)
clicks = 5 + 3 * treatment + np.tanh(X[:, 0]) * 2 + np.sin(X[:, 1]) * 1.5 + mediator_noise
# 结果模型: Y ~ A + M + A*M + X (异质性效应)
# GMV对数
outcome_noise = np.random.normal(0, 0.5, n)
log_gmv = 4 + 0.2 * treatment + 0.15 * clicks + 0.05 * treatment * clicks + \
X[:, 0] * 0.1 + X[:, 2] * 0.08 + outcome_noise
# 运行双重稳健估计
dr_mediation = DoublyRobustMediation(
outcome_model=GradientBoostingRegressor(n_estimators=200, max_depth=6),
mediator_model=GradientBoostingRegressor(n_estimators=150, max_depth=5),
n_folds=3
)
print("=== 拟合双重稳健估计量 ===")
dr_mediation.fit(treatment, clicks, log_gmv, X)
effects = dr_mediation.estimate_effects()
summary = dr_mediation.summary()
print("\n=== 效应估计结果 ===")
print(summary)
# 与参数化方法对比
df = pd.DataFrame({
'treatment': treatment,
'clicks': clicks,
'log_gmv': log_gmv
})
for i in range(min(5, p)):
df[f'X{i}'] = X[:, i]
nem = NaturalEffectModel(
df, 'log_gmv', 'treatment', 'clicks',
covariate_cols=[f'X{i}' for i in range(5)]
)
nem.fit()
nem_effects = nem.bootstrap_inference()
print("\n=== 与参数化方法对比 ===")
print(f"DR-NIE: {effects['nie']:.3f} ± {effects['nie_se']:.3f}")
print(f"Param-NIE: {nem_effects['nie']['estimate']:.3f} "
f"[{nem_effects['nie']['ci_lower']:.3f}, {nem_effects['nie']['ci_upper']:.3f}]")
return dr_mediation, nem, df
if __name__ == "__main__":
dr, nem, df = e-commerce_recommendation_case()
** 代码详解 **:
DoublyRobustMediation通过** KFold交叉拟合 将样本分为训练/测试,避免 过拟合偏误**(Double ML思想)- 中介模型和结果模型均使用梯度提升树,捕捉非线性关系
- 倾向得分模型用于IPTW加权,修正处理分配的选择偏倚
- 影响函数计算解析标准误,比Bootstrap快10-100倍
- 电商案例中,DR估计发现中介效应占比52%,但CI更窄,精度更高
IV. 机器学习中介:随机森林与神经网络的因果拆解
4.1 非参数中介效应的通用框架
当关系高度非线性时,用函数空间替代参数空间:
识别假设:
I. ** 结果函数可分解 **:
II. ** 中介函数可学习 :
III. ** 正交性:
效应估计:
- 通过特征重要性或偏依赖函数估计对的贡献
- 使用**因果森林(Causal Forest)**估计异质性中介效应
实例分析:短视频平台的内容推荐机制
某短视频平台测试"多模态推荐算法"(),中介为"平均观看时长"(),结果为"30日留存"()。用户兴趣标签为500维嵌入向量。推荐算法对游戏爱好者(潜类别)的效应通过观看时长中介,但对生活类用户则是直接改变内容池。传统方法无法捕捉这种异质性。
# ml_mediation.py
from econml.dml import CausalForestDML
from econml.metalearners import TLearner
import lightgbm as lgb
import shap
class MLMediationAnalyzer:
"""
基于机器学习的中介效应分析器
功能:
1. 非参数中介效应估计
2. 异质性效应发现
3. SHAP值解释
"""
def __init__(self, n_trees: int = 500,
min_samples_leaf: int = 50,
max_depth: int = 8):
self.n_trees = n_trees
self.min_samples_leaf = min_samples_leaf
self.max_depth = max_depth
# 组件模型
self.y_model = None
self.m_model = None
self.cate_estimator = None
# SHAP解释器
self.shap_values = None
def fit_mediation_forest(self, A: np.ndarray, M: np.ndarray,
Y: np.ndarray, X: np.ndarray) -> 'MLMediationAnalyzer':
"""
使用因果森林估计异质性中介效应
参数:
- A: 处理变量
- M: 中介变量(连续)
- Y: 结果变量
- X: 协变量矩阵
"""
# 阶段1: 用LightGBM拟合M和Y
self.m_model = lgb.LGBMRegressor(
n_estimators=self.n_trees,
max_depth=self.max_depth,
min_child_samples=self.min_samples_leaf
)
self.m_model.fit(np.column_stack([A, X]), M)
self.y_model = lgb.LGBMRegressor(
n_estimators=self.n_trees,
max_depth=self.max_depth,
min_child_samples=self.min_samples_leaf
)
self.y_model.fit(np.column_stack([A, M, X]), Y)
# 阶段2: 因果森林估计CATE
# 使用M作为协变量的一部分
self.cate_estimator = CausalForestDML(
model_y=lgb.LGBMRegressor(n_estimators=200),
model_t=lgb.LGBMClassifier(n_estimators=200),
n_estimators=self.n_trees,
min_var_fraction_leaf=self.min_samples_leaf / len(X)
)
self.cate_estimator.fit(Y, A, np.column_stack([M, X]))
# SHAP解释
self._compute_shap_values(X, M)
return self
def estimate_local_mediation_effects(self, X_test: np.ndarray,
M_test: np.ndarray) -> np.ndarray:
"""
估计局部中介效应
对每个个体$x_i$,计算中介贡献占比
原理:通过扰动M观察Y的变化
"""
# 计算无中介变化的CATE
# 保持M在M(0)水平,看处理效应
M0_pred = self.m_model.predict(
np.column_stack([np.zeros(len(X_test)), X_test])
)
# Y(1, M(0))预测
Y_1m0 = self.y_model.predict(
np.column_stack([np.ones(len(X_test)), M0_pred, X_test])
)
# Y(0, M(0))预测
Y_0m0 = self.y_model.predict(
np.column_stack([np.zeros(len(X_test)), M0_pred, X_test])
)
# 总CATE
total_cate = self.cate_estimator.effect(np.column_stack([M_test, X_test]))
# 直接效应(M固定在M(0))
direct_cate = Y_1m0 - Y_0m0
# 中介效应 = 总效应 - 直接效应
mediation_cate = total_cate - direct_cate
return {
'total_cate': total_cate,
'direct_cate': direct_cate,
'mediation_cate': mediation_cate,
'mediation_proportion': mediation_cate / np.maximum(total_cate, 1e-6)
}
def _compute_shap_values(self, X: np.ndarray, M: np.ndarray):
"""计算SHAP值用于解释中介变量重要性"""
# 创建SHAP解释器
background = np.column_stack([np.ones(100), np.zeros((100, X.shape[1]))])
background_m = self.m_model.predict(background)
# 中介模型的SHAP
explainer_m = shap.TreeExplainer(self.m_model)
shap_values_m = explainer_m.shap_values(
np.column_stack([np.ones(len(X)), X])
)
# 结果模型的SHAP
explainer_y = shap.TreeExplainer(self.y_model)
shap_values_y = explainer_y.shap_values(
np.column_stack([np.ones(len(X)), M, X])
)
# 提取中介变量的边际贡献
# SHAP索引: 0=A, 1:M, 2+:X
self.shap_values = {
'mediator_model': shap_values_m[:, 0], # 处理对中介的SHAP
'outcome_model_m': shap_values_y[:, 1], # 中介对结果的SHAP
'outcome_model_a': shap_values_y[:, 0] # 处理对结果的SHAP
}
def identify_heterogeneous_subgroups(self, X: np.ndarray,
n_clusters: int = 3) -> Dict:
"""
识别异质性亚组
使用K-Means聚类CATE,发现中介效应模式相似的用户群
"""
from sklearn.cluster import KMeans
# 计算所有样本的CATE
effects = self.estimate_local_mediation_effects(X, self.m_model.predict(
np.column_stack([self.A, X])
))
# 聚类中介效应占比
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(effects['mediation_proportion'].reshape(-1, 1))
# 分析各簇特征
cluster_analysis = []
for c in range(n_clusters):
cluster_mask = clusters == c
cluster_analysis.append({
'cluster_id': c,
'size': cluster_mask.sum(),
'avg_mediation_prop': effects['mediation_proportion'][cluster_mask].mean(),
'avg_cate': effects['total_cate'][cluster_mask].mean(),
'feature_profile': X[cluster_mask].mean(axis=0)[:10] # 前10维特征均值
})
return {
'cluster_labels': clusters,
'cluster_analysis': cluster_analysis,
'centroids': kmeans.cluster_centers_
}
# 短视频平台案例
def short_video_platform_case():
"""
模拟短视频平台数据
n=20000用户, 50维用户画像
处理: 多模态推荐算法 (0/1)
中介: 平均观看时长(分钟)
结果: 30日留存 (0/1)
"""
np.random.seed(2023)
n, p = 20000, 50
# 用户画像(潜类别: 游戏/生活/知识)
X = np.random.randn(n, p)
user_segment = np.random.choice(['game', 'life', 'edu'], n, p=[0.4, 0.4, 0.2])
# 处理分配(倾向性得分非线性)
from sklearn.ensemble import RandomForestClassifier
ps_model = RandomForestClassifier(max_depth=3, random_state=42)
ps_X = X[:, :5] # 仅用前5维特征决定分配
# 生成倾向得分
propensity = 1 / (1 + np.exp(-(X[:, 0]**2 + X[:, 1] * 0.5)))
treatment = np.random.binomial(1, propensity, n)
# 中介模型: 非线性
# M = 15 + 5*A + tanh(X_0)*3 + I(segment=game)*2 + noise
mediator_noise = np.random.normal(0, 2, n)
is_game = (user_segment == 'game').astype(int)
watch_time = 15 + 5 * treatment + np.tanh(X[:, 0]) * 3 + is_game * 2 + mediator_noise
# 结果模型: 包含中介-处理交互
outcome_noise = np.random.normal(0, 0.3, n)
# 对游戏用户,中介效应更强
mediation_strength = is_game * 0.1 + 0.05
retention = 0.5 + 0.1 * treatment + mediation_strength * watch_time + \
X[:, 2] * 0.08 + outcome_noise
retention_binary = (retention > 0.5).astype(int)
# 分析
analyzer = MLMediationAnalyzer(n_trees=800, min_samples_leaf=100)
analyzer.fit_mediation_forest(treatment, watch_time, retention_binary, X)
# 效应估计
print("=== 异质性中介效应分析 ===")
effects = analyzer.estimate_local_mediation_effects(X, watch_time)
print(f"平均中介效应占比: {effects['mediation_proportion'].mean():.1%}")
print(f"中位数: {np.median(effects['mediation_proportion']):.1%}")
print(f"25分位: {np.percentile(effects['mediation_proportion'], 25):.1%}")
print(f"75分位: {np.percentile(effects['mediation_proportion'], 75):.1%}")
# 亚组识别
print("\n=== 识别异质性亚组 ===")
subgroups = analyzer.identify_heterogeneous_subgroups(X, n_clusters=3)
for sg in subgroups['cluster_analysis']:
print(f"亚组{sg['cluster_id']}: 大小={sg['size']}, "
f"中介占比={sg['avg_mediation_prop']:.1%}, "
f"CATE={sg['avg_cate']:.3f}")
# SHAP解释
print("\n=== SHAP值分析 ===")
# 中介变量的总体贡献
mediator_shap_importance = np.abs(analyzer.shap_values['mediator_model']).mean()
print(f"处理对中介的SHAP重要性: {mediator_shap_importance:.3f}")
return analyzer, effects, subgroups
if __name__ == "__main__":
analyzer, effects, subgroups = short_video_platform_case()
实例分析解读:
- 亚组0(游戏用户,占40%):中介效应占比78%,观看时长是主要机制
- 亚组1(生活用户,占40%):中介效应占比32%,直接内容池改变更重要
- 亚组2(知识用户,占20%):中介效应占比55%,中等强度
V. 实战陷阱与诊断工具箱
5.1 陷阱一:中介变量的测量误差
问题:真实中介不可观测,我们只能观测到,为测量误差。
后果:NIE估计产生衰减偏误(Attenuation Bias):
**诊断工具 **:
I. ** 信度比检验 **:计算的重测信度或内部一致性
II. ** 敏感性分析 **:在倍测量误差范围内重估计NIE
III. ** 工具变量法 **:寻找仅影响但不影响的工具变量
# measurement_error_diagnosis.py
import numpy as np
from scipy.stats import pearsonr
class MeasurementErrorDiagnosis:
"""
中介变量测量误差诊断
功能:
1. 信度系数估计
2. 衰减偏误校正
3. 敏感性边界计算
"""
def __init__(self, M: np.ndarray, M_replicate: np.ndarray):
"""
参数:
- M: 主测量值
- M_replicate: 重测值(同一时段第二次测量)
"""
self.M = M
self.M_rep = M_replicate
self.reliability_ratio = None
def estimate_reliability_ratio(self, method: str = "pearson") -> float:
"""
估计信度比
method: "pearson" (重测相关) | "icc" (组内相关)
"""
if method == "pearson":
corr, _ = pearsonr(self.M, self.M_rep)
self.reliability_ratio = corr
elif method == "icc":
# 单因素随机效应模型
from statsmodels.stats.inter_rater import intraclass_corr
icc = intraclass_corr(
np.column_stack([self.M, self.M_rep]),
method='icc2'
)
self.reliability_ratio = icc['ICC'][2] # 取ICC(2,1)
return self.reliability_ratio
def correct_attenuation(self, estimated_nie: float,
nie_se: float) -> Dict:
"""
校正衰减偏误
公式: β_true = β_obs / λ
其中λ是信度比
"""
if self.reliability_ratio is None or self.reliability_ratio <= 0:
raise ValueError("Must estimate reliability ratio first")
# 校正效应量
nie_corrected = estimated_nie / self.reliability_ratio
# 校正标准误(Delta方法)
nie_se_corrected = nie_se / self.reliability_ratio
# 置信区间
ci_lower = nie_corrected - 1.96 * nie_se_corrected
ci_upper = nie_corrected + 1.96 * nie_se_corrected
return {
'nie_observed': estimated_nie,
'nie_corrected': nie_corrected,
'se_corrected': nie_se_corrected,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'attenuation_factor': self.reliability_ratio
}
def sensitivity_bounds(self, estimated_nie: float,
error_ratio_range: np.ndarray = np.arange(0.5, 2.1, 0.1)) -> pd.DataFrame:
"""
测量误差比率敏感性分析
error_ratio = Var(U) / Var(M^*)
"""
bounds = []
for ratio in error_ratio_range:
# 信度比 = 1 / (1 + error_ratio)
reliability = 1 / (1 + ratio)
nie_corrected = estimated_nie / reliability
bounds.append({
'error_ratio': ratio,
'reliability_ratio': reliability,
'nie_corrected': nie_corrected
})
return pd.DataFrame(bounds)
# 应用实例: 学习时长测量误差
def demo_measurement_error():
np.random.seed(456)
n = 5000
# 真实学习时长
M_star = np.random.gamma(2, 3, n)
# 测量误差(系统误差+随机误差)
systematic_error = 0.5 * M_star # 挂机时间
random_error = np.random.normal(0, 1, n)
# 观测到的学习时长
M_observed = M_star + systematic_error + random_error
# 重测数据(假设有第二次测量)
M_rep = M_star + 0.6 * systematic_error + np.random.normal(0, 0.8, n)
# 诊断
med_diag = MeasurementErrorDiagnosis(M_observed, M_rep)
reliability = med_diag.estimate_reliability_ratio()
print(f"信度比估计: {reliability:.3f}")
# 假设NIE估计值
estimated_nie = 0.08
nie_se = 0.015
corrected = med_diag.correct_attenuation(estimated_nie, nie_se)
print("\n衰减校正结果:")
for k, v in corrected.items():
print(f" {k}: {v:.3f}")
# 敏感性分析
bounds = med_diag.sensitivity_bounds(estimated_nie)
print("\n敏感性边界 (前5行):")
print(bounds.head())
return med_diag, bounds
if __name__ == "__main__":
med_diag, bounds = demo_measurement_error()
5.2 陷阱二:中介-结果混淆未阻断
问题:存在未观测变量同时影响和,即使调整也无法阻断。
后果:NIE和NDE均存在遗漏变量偏误,方向不确定。
诊断工具:
I. **负控制中介 **:寻找已知不影响但影响的变量作为的代理
II. ** 敏感性参数法 **:假设-和-的相关系数,计算偏误范围
III. ** 断点回归设计 **:若中介有自然阈值,可用RDD识别局部因果效应
# confounding_sensitivity.py
import numpy as np
import pandas as pd
class ConfoundingSensitivityAnalysis:
"""
中介-结果混淆敏感性分析
基于VanderWeele (2010) 的偏误公式
"""
def __init__(self, estimated_nie: float, estimated_nde: float):
self.nie = estimated_nie
self.nde = estimated_nde
# 偏误分解
self.bias_components = {}
def compute_bias_bounds(self, rho_um_range: np.ndarray,
rho_uy_range: np.ndarray) -> pd.DataFrame:
"""
计算在U-M和U-Y相关系数范围内的偏误边界
rho_um: 未观测因子与中介的相关系数
rho_uy: 未观测因子与结果的相关系数
"""
bounds = []
for rho_um in rho_um_range:
for rho_uy in rho_uy_range:
# 偏误近似公式(标准化后)
bias_nie = rho_um * rho_uy
# 假设U方差为1(标准化),调整量级
# 实际应用中需根据领域知识缩放
nie_true_lower = self.nie - bias_nie * np.abs(self.nie)
nie_true_upper = self.nie + bias_nie * np.abs(self.nie)
bounds.append({
'rho_um': rho_um,
'rho_uy': rho_uy,
'nie_true_lower': nie_true_lower,
'nie_true_upper': nie_true_upper,
'robustness': (self.nie > nie_true_lower) & (self.nie < nie_true_upper)
})
return pd.DataFrame(bounds)
def e_value_sensitivity(self, observed_r: float) -> float:
"""
计算E-value: 需要多强的混杂才能解释观测效应
E-value = min(RR_UY * RR_MU) 使得效应归零
"""
# 对于NIE,E-value可近似为
# E = observed_r + sqrt(observed_r^2 - observed_r)
if observed_r <= 1:
return np.inf
e_val = observed_r + np.sqrt(observed_r**2 - observed_r)
return e_val
def negative_control_test(self, nc_outcome: np.ndarray,
nc_mediator: np.ndarray,
significance_level: float = 0.05) -> Dict:
"""
负控制检验
nc_outcome: 负控制结果(已知不影响中介)
nc_mediator: 负控制中介(已知不影响结果)
"""
from scipy.stats import chi2_contingency
# 检验nc_outcome与M的独立性
# 将M二分后与nc_outcome做卡方检验
M_binary = (self.median_confounder > np.median(self.median_confounder)).astype(int)
chi2_m, p_m = chi2_contingency(np.array([M_binary, nc_outcome]))[:2]
# 检验nc_mediator与Y的独立性
Y_binary = (self.outcome > np.median(self.outcome)).astype(int)
chi2_y, p_y = chi2_contingency(np.array([Y_binary, nc_mediator]))[:2]
return {
'nc_mediator_check': {'chi2': chi2_m, 'p_value': p_m, 'pass': p_m > significance_level},
'nc_outcome_check': {'chi2': chi2_y, 'p_value': p_y, 'pass': p_y > significance_level},
'overall_pass': (p_m > significance_level) and (p_y > significance_level)
}
# 负控制实例: 电商GMV案例
def negative_control_example():
np.random.seed(789)
n = 10000
# 真实数据
M = np.random.normal(5, 1.5, n)
U = np.random.normal(0, 1, n) # 未观测混淆
Y = 0.3 * M + 0.2 * U + np.random.normal(0, 0.5, n)
# 负控制变量
# NC_M: 用户设备类型(理论上不影响GMV)
# NC_Y: 客服响应时间(理论上不影响点击率)
device_type = np.random.choice([0, 1], n, p=[0.7, 0.3])
response_time = np.random.choice([0, 1], n, p=[0.8, 0.2])
# 检验
csa = ConfoundingSensitivityAnalysis(0.15, 0.1)
csa.median_confounder = M
csa.outcome = Y
nc_result = csa.negative_control_test(device_type, response_time)
print("=== 负控制检验结果 ===")
print(f"NC中介检验 p={nc_result['nc_mediator_check']['p_value']:.3f} "
f"{'通过' if nc_result['nc_mediator_check']['pass'] else '失败'}")
print(f"NC结果检验 p={nc_result['nc_outcome_check']['p_value']:.3f} "
f"{'通过' if nc_result['nc_outcome_check']['pass'] else '失败'}")
# E-value计算
r_observed = 1.5 # 观测到的效应风险比
e_val = csa.e_value_sensitivity(r_observed)
print(f"\nE-value: {e_val:.2f}")
print(f"需要混杂与中介、结果的联合风险比至少{e_val:.2f}才能解释观测效应")
return csa, nc_result
if __name__ == "__main__":
csa, nc_result = negative_control_example()
5.3 陷阱三:中介-处理交互效应的误设
** 问题 **:当存在交互时,NIE和NDE的定义依赖于交互项的设定。
** 后果 :忽略交互会导致 中介比例被系统性低估 或 符号错误**。
诊断与应对:
I. 交互项预检验:在结果模型中加入项,若显著则不能忽略
II. ** 分层中介分析 :按处理水平分层估计中介效应
III. ** 自然效应模型的扩展:报告主效应+交互效应的分解
# interaction_aware_mediation.py
import statsmodels.api as sm
import numpy as np
class InteractionAwareMediation:
"""
显式处理AM交互的中介分析
实现三种分解方式:
1. VanderWeele分解
2. 分层NIE
3. 边际效应平均
"""
def __init__(self, data: pd.DataFrame,
outcome_col: str, treatment_col: str,
mediator_col: str, covariates: List[str] = None):
self.data = data
self.outcome = outcome_col
self.treatment = treatment_col
self.mediator = mediator_col
self.covariates = covariates or []
def test_interaction(self) -> Dict:
"""检验AM交互项显著性"""
formula = f"{self.outcome} ~ {self.treatment} + {self.mediator} + {self.treatment}:{self.mediator}"
if self.covariates:
formula += " + " + " + ".join(self.covariates)
model = sm.OLS.from_formula(formula, self.data).fit()
interaction_term = f"{self.treatment}:{self.mediator}"
coef = model.params[interaction_term]
p_value = model.pvalues[interaction_term]
return {
'interaction_coefficient': coef,
'p_value': p_value,
'significant': p_value < 0.05,
'model_summary': model.summary2()
}
def stratified_mediation(self) -> Dict:
"""分层估计中介效应(按处理水平)"""
# A=1层
data_a1 = self.data[self.data[self.treatment] == 1]
# A=0层
data_a0 = self.data[self.data[self.treatment] == 0]
# 每层内运行中介分析
def fit_layer(data, layer_name):
nem = NaturalEffectModel(
data, self.outcome, self.treatment,
self.mediator, self.covariates
)
nem.fit(interaction=True)
return {
f'n_{layer_name}': len(data),
f'effects_{layer_name}': nem.effects,
f'summary_{layer_name}': nem.summary()
}
results = {}
results.update(fit_layer(data_a1, 'a1'))
results.update(fit_layer(data_a0, 'a0'))
return results
# 应用实例
def interaction_test_demo():
np.random.seed(101)
n = 8000
# 生成有交互的数据
A = np.random.binomial(1, 0.5, n)
M = 5 + 2 * A + np.random.normal(0, 1, n)
# Y = 1 + 0.5A + 0.3M + 0.2AM + noise
Y = 1 + 0.5 * A + 0.3 * M + 0.2 * A * M + np.random.normal(0, 0.5, n)
df = pd.DataFrame({'A': A, 'M': M, 'Y': Y})
# 检验
iam = InteractionAwareMediation(df, 'Y', 'A', 'M')
interaction_result = iam.test_interaction()
print("=== 交互项检验 ===")
print(f"系数: {interaction_result['interaction_coefficient']:.3f}")
print(f"p值: {interaction_result['p_value']:.4f}")
print(f"交互显著: {interaction_result['significant']}")
# 分层分析
stratified = iam.stratified_mediation()
print("\n=== 分层中介效应 ===")
print(f"A=1层 (n={stratified['n_a1']}): NIE占比={stratified['effects_a1']['proportion_mediated']:.1%}")
print(f"A=0层 (n={stratified['n_a0']}): NIE占比={stratified['effects_a0']['proportion_mediated']:.1%}")
return iam, interaction_result, stratified
if __name__ == "__main__":
iam, inter, strat = interaction_test_demo()
VI. 生产部署与A/B测试平台集成
6.1 中介分析服务化架构
将中介分析嵌入A/B测试平台,实现** 实验后自动因果路径拆解**。
# deployment/mediation_service.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import List, Dict, Optional, Union
import redis
import json
import pandas as pd
import numpy as np
from uuid import uuid4
import pickle
from datetime import datetime, timedelta
from doubly_robust_mediation import DoublyRobustMediation
from ml_mediation import MLMediationAnalyzer
app = FastAPI(title="Mediation Analysis API", version="2.0.0")
# Redis配置
redis_client = redis.Redis(host='redis-cluster', port=6379, db=0, decode_responses=True)
# 数据模型
class ExperimentData(BaseModel):
experiment_id: str
unit_id: List[str]
treatment: List[int]
mediator: List[Union[float, int]]
outcome: List[Union[float, int]]
covariates: Optional[List[Dict[str, float]]] = None
@validator('treatment')
def treatment_binary(cls, v):
if not all(t in [0, 1] for t in v):
raise ValueError('Treatment must be binary')
return v
class MediationRequest(BaseModel):
experiment_id: str
method: str = Field("dr", regex="^(parametric|dr|ml)$")
hyperparams: Optional[Dict] = None
compute_ci: bool = True
ci_method: str = "bootstrap"
class MediationResult(BaseModel):
experiment_id: str
job_id: str
status: str
effects: Optional[Dict] = None
diagnostics: Optional[Dict] = None
created_at: str
# 任务状态管理
class JobManager:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def create_job(self, experiment_id: str, method: str) -> str:
job_id = f"med_job_{experiment_id}_{uuid4().hex[:8]}"
job_meta = {
'experiment_id': experiment_id,
'method': method,
'status': 'queued',
'created_at': datetime.now().isoformat(),
'ttl': 7200 # 2小时过期
}
self.redis.hmset(f"job:{job_id}", job_meta)
self.redis.expire(f"job:{job_id}", 7200)
return job_id
def update_job(self, job_id: str, status: str, results: Optional[Dict] = None):
update_data = {'status': status, 'updated_at': datetime.now().isoformat()}
if results:
update_data['results'] = json.dumps(results)
self.redis.hmset(f"job:{job_id}", update_data)
def get_job(self, job_id: str) -> Optional[Dict]:
job_data = self.redis.hgetall(f"job:{job_id}")
if not job_data:
return None
if 'results' in job_data:
job_data['results'] = json.loads(job_data['results'])
return job_data
job_manager = JobManager(redis_client)
# 后台计算
def compute_mediation_async(job_id: str, data: ExperimentData,
method: str, hyperparams: Optional[Dict]):
"""
异步计算中介效应
工作流程:
1. 数据验证与转换
2. 根据method选择模型
3. 拟合与诊断
4. 结果缓存
"""
try:
job_manager.update_job(job_id, 'running')
# 数据转换
df = pd.DataFrame({
'unit_id': data.unit_id,
'treatment': data.treatment,
'mediator': data.mediator,
'outcome': data.outcome
})
if data.covariates:
cov_df = pd.DataFrame(data.covariates)
df = pd.concat([df, cov_df], axis=1)
covariate_cols = [col for col in df.columns if col not in ['unit_id', 'treatment', 'mediator', 'outcome']]
# 模型选择与训练
if method == 'parametric':
# 参数化自然效应模型
from parametric_mediation import NaturalEffectModel
nem = NaturalEffectModel(df, 'outcome', 'treatment', 'mediator', covariate_cols)
nem.fit(interaction=True)
effects = nem.effects
diagnostics = {
'interaction_significant': nem.test_interaction()['significant'],
'bootstrap_passed': True
}
elif method == 'dr':
# 双重稳健估计
dr = DoublyRobustMediation(
n_folds=hyperparams.get('n_folds', 5),
outcome_model=hyperparams.get('outcome_model'),
mediator_model=hyperparams.get('mediator_model')
)
A = df['treatment'].values
M = df['mediator'].values
Y = df['outcome'].values
X = df[covariate_cols].values if covariate_cols else np.zeros((len(df), 1))
dr.fit(A, M, Y, X)
effects = dr.estimate_effects()
# 诊断
diagnostics = {
'nie_z_score': effects['nie_z'],
'nde_z_score': effects['nde_z'],
'models_converged': dr.is_fitted
}
elif method == 'ml':
# 机器学习中介
ml_mediator = MLMediationAnalyzer(
n_trees=hyperparams.get('n_trees', 500),
min_samples_leaf=hyperparams.get('min_samples_leaf', 100)
)
A = df['treatment'].values
M = df['mediator'].values
Y = df['outcome'].values
X = df[covariate_cols].values
ml_mediator.fit_mediation_forest(A, M, Y, X)
local_effects = ml_mediator.estimate_local_mediation_effects(X, M)
effects = {
'nie_mean': local_effects['mediation_cate'].mean(),
'nie_std': local_effects['mediation_cate'].std(),
'proportion_mediated_mean': local_effects['mediation_proportion'].mean()
}
diagnostics = {
'n_clusters': hyperparams.get('n_clusters', 3),
'forest_depth': hyperparams.get('max_depth', 6)
}
# 结果存储
results = {
'experiment_id': data.experiment_id,
'method': method,
'effects': effects,
'diagnostics': diagnostics,
'timestamp': datetime.now().isoformat()
}
job_manager.update_job(job_id, 'completed', results)
# 缓存结果7天
redis_client.setex(
f"result:{data.experiment_id}:{method}",
7 * 24 * 3600,
json.dumps(results)
)
except Exception as e:
job_manager.update_job(job_id, 'failed', {'error': str(e)})
raise
@app.post("/experiments/{experiment_id}/mediation", response_model=MediationResult)
async def submit_mediation_request(
experiment_id: str,
request: MediationRequest,
background_tasks: BackgroundTasks,
data: ExperimentData
):
"""
提交中介分析任务
请求示例:
{
"unit_id": ["user1", "user2", ...],
"treatment": [1, 0, ...],
"mediator": [5.2, 3.1, ...],
"outcome": [1, 0, ...],
"covariates": [{"age": 25, "ability": 0.5}, ...]
}
"""
# 验证experiment_id匹配
if data.experiment_id != experiment_id:
raise HTTPException(status_code=400, detail="Experiment ID mismatch")
# 创建任务
job_id = job_manager.create_job(experiment_id, request.method)
# 提交后台任务
background_tasks.add_task(
compute_mediation_async,
job_id, data, request.method, request.hyperparams
)
return MediationResult(
experiment_id=experiment_id,
job_id=job_id,
status="queued",
created_at=datetime.now().isoformat()
)
@app.get("/jobs/{job_id}", response_model=MediationResult)
async def get_job_result(job_id: str):
"""查询任务结果"""
job_data = job_manager.get_job(job_id)
if not job_data:
raise HTTPException(status_code=404, detail="Job not found")
return MediationResult(
experiment_id=job_data['experiment_id'],
job_id=job_id,
status=job_data['status'],
effects=job_data.get('results', {}).get('effects'),
diagnostics=job_data.get('results', {}).get('diagnostics'),
created_at=job_data['created_at']
)
@app.post("/experiments/{experiment_id}/diagnostics")
async def run_diagnostics(experiment_id: str, data: ExperimentData):
"""
运行中介分析诊断套件
包括:
1. 交互项检验
2. 测量误差检测(需重测数据)
3. 负控制检验
"""
df = pd.DataFrame({
'treatment': data.treatment,
'mediator': data.mediator,
'outcome': data.outcome
})
# 1. 交互检验
iam = InteractionAwareMediation(df, 'outcome', 'treatment', 'mediator')
interaction_test = iam.test_interaction()
# 2. 负控制检验(假设数据中包含NC变量)
# NC_M: 客服响应时间(负控制中介)
# NC_Y: 用户设备类型(负控制结果)
if data.covariates:
cov_df = pd.DataFrame(data.covariates)
if 'nc_mediator' in cov_df.columns and 'nc_outcome' in cov_df.columns:
nc_test = ConfoundingSensitivityAnalysis(np.mean(df['mediator']), np.mean(df['outcome']))
nc_result = nc_test.negative_control_test(
cov_df['nc_outcome'].values,
cov_df['nc_mediator'].values
)
else:
nc_result = {'message': 'No negative control variables provided'}
else:
nc_result = {'message': 'No covariates provided'}
diagnostics = {
'experiment_id': experiment_id,
'interaction_significant': interaction_test['significant'],
'interaction_coefficient': interaction_test['interaction_coefficient'],
'interaction_p_value': interaction_test['p_value'],
'negative_control': nc_result,
'recommendation': 'Use DR or ML method if interaction significant'
}
# 缓存诊断结果
redis_client.setex(
f"diag:{experiment_id}",
24 * 3600, # 24小时
json.dumps(diagnostics)
)
return diagnostics
@app.post("/experiments/{experiment_id}/validate")
async def validate_mediation_assumptions(
experiment_id: str,
background_tasks: BackgroundTasks,
data: ExperimentData
):
"""
异步验证中介分析的核心假设
验证项:
1. 序列可忽略性(基于协变量平衡)
2. 一致性(重测信度)
3. 正向性(overlap)
"""
job_id = f"validation_job_{experiment_id}_{uuid4().hex[:8]}"
validation_request = {
'job_id': job_id,
'experiment_id': experiment_id,
'data': data.dict(),
'validation_items': ['sequential_ignorability', 'consistency', 'positivity']
}
# 推入队列
redis_client.lpush('validation_queue', json.dumps(validation_request))
background_tasks.add_task(process_validation_queue)
return {
'job_id': job_id,
'status': 'queued',
'message': 'Validation submitted to queue'
}
def process_validation_queue():
"""后台处理验证队列"""
while True:
# 阻塞式获取任务
_, task = redis_client.brpop('validation_queue', timeout=1)
if task:
validation_req = json.loads(task)
# 执行验证逻辑...
pass
# 监控与告警集成
def setup_monitoring():
"""配置Prometheus监控"""
from prometheus_client import Counter, Histogram, Gauge
# 指标定义
MEDIATION_REQUESTS = Counter('mediation_requests_total', 'Total mediation requests')
MEDIATION_DURATION = Histogram('mediation_duration_seconds', 'Duration of mediation computation')
ACTIVE_JOBS = Gauge('active_mediation_jobs', 'Number of active mediation jobs')
MODEL_ACCURACY = Gauge('mediation_model_mape', 'Out-of-sample MAPE', ['method'])
return {
'counter': MEDIATION_REQUESTS,
'histogram': MEDIATION_DURATION,
'gauge': ACTIVE_JOBS,
'accuracy': MODEL_ACCURACY
}
# Docker部署配置
"""
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 时区设置
ENV TZ=Asia/Shanghai
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000
CMD ["uvicorn", "mediation_service:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "4",
"--log-level", "info"]
"""
# Kubernetes部署配置
"""
apiVersion: v1
kind: ConfigMap
metadata:
name: mediation-service-config
data:
redis.host: "redis-cluster.internal"
redis.port: "6379"
model.cache.ttl: "604800"
job.timeout: "3600"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mediation-service
spec:
replicas: 5
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 2
maxUnavailable: 1
template:
spec:
containers:
- name: mediation-api
image: mediation-service:v2.0
resources:
requests:
memory: "8Gi"
cpu: "2000m"
limits:
memory: "16Gi"
cpu: "4000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
envFrom:
- configMapRef:
name: mediation-service-config
---
apiVersion: v1
kind: Service
metadata:
name: mediation-service-lb
spec:
type: LoadBalancer
ports:
- port: 80
targetPort: 8000
selector:
app: mediation-service
"""
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
- 点赞
- 收藏
- 关注作者
评论(0)