因果中介分析的工程实践:解析黑盒机制
I. 理论基石:从关联到因果的机制拆解
1.1 因果中介分析的数学框架
因果中介分析的核心目标是量化一个处理变量(Treatment)对结果变量(Outcome)的影响是如何通过中介变量(Mediator)传递的。传统的回归分析只能告诉我们总效应,而中介分析则能将总效应分解为直接效应和间接效应。
1.1.1 基本因果图模型
考虑一个简单的中介模型:
\begin{align} \text{Total Effect} &= \text{Direct Effect} + \text{Indirect Effect} \\ \tau &= \tau_{direct} + \tau_{indirect} \end{align}
其中间接效应即为中介效应,表示处理变量通过改变中介变量进而影响结果变量的路径。
使用潜在结果框架(Potential Outcomes Framework),我们定义:
- $Y_i(t, m)$:个体 $i$ 在处理水平 $t$ 和中介变量水平 $m$ 下的潜在结果
- $M_i(t)$:个体 $i$ 在处理水平 $t$ 下的潜在中介变量
在潜在结果框架下(Robins & Greenland, 1992;Pearl, 2001),自然直接效应(NDE)和自然间接效应(NIE)定义为:
\begin{align} NDE &= E[Y_i(1, M_i(0)) - Y_i(0, M_i(0))] \\ NIE &= E[Y_i(1, M_i(1)) - Y_i(1, M_i(0))] \end{align}
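为直观理解上述定义,下面给出一个极小的数值模拟:潜在结果函数采用虚构的线性设定(系数 2、3、4 均为示意性假设),用于验证 NDE、NIE 与总效应的加和关系:
import numpy as np

rng = np.random.default_rng(0)
n = 100_000

# 虚构的结构方程(仅作示意): M(t) = 1 + 2t + 噪声; Y(t, m) = 3t + 4m + 噪声
def M(t):
    return 1 + 2 * t + rng.normal(0, 1, n)

def Y(t, m):
    return 3 * t + 4 * m + rng.normal(0, 1, n)

m0, m1 = M(0), M(1)
nde = np.mean(Y(1, m0) - Y(0, m0))    # 固定中介于 M(0),只改变处理
nie = np.mean(Y(1, m1) - Y(1, m0))    # 固定处理于 1,只改变中介
total = np.mean(Y(1, m1) - Y(0, m0))  # 总效应 ≈ NDE + NIE
print(f"NDE≈{nde:.2f}(理论值 3),NIE≈{nie:.2f}(理论值 4×2=8),总效应≈{total:.2f}")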
1.1.2 序列可忽略性假设
为了实现有效推断,我们必须满足以下关键假设:
| 假设编号 | 假设内容 | 数学表达 | 实际意义 |
|---|---|---|---|
| I | 处理分配可忽略性 | $\{Y_i(t', m), M_i(t)\} \perp T_i \mid X_i$ | 在控制协变量后,处理分配机制是随机的 |
| II | 中介变量可忽略性 | $Y_i(t', m) \perp M_i(t) \mid T_i = t, X_i$ | 在控制处理和协变量后,中介变量机制是随机的 |
| III | 无混淆中介效应 | 无未观测的共同原因 | 不存在同时影响中介和结果的隐藏变量 |
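假设 I 在随机实验中可以直接检验:若处理确为随机分配,协变量在两组间应当均衡。下面是一个最小的均衡性检验草图(变量名沿用后文电商案例,属示意性假设):
import numpy as np
import pandas as pd
from scipy import stats

def check_balance(df: pd.DataFrame, treatment: str, covariates: list) -> pd.DataFrame:
    """对每个协变量计算标准化均值差(SMD)并做 Welch t 检验"""
    rows = []
    for cov in covariates:
        g1 = df.loc[df[treatment] == 1, cov]
        g0 = df.loc[df[treatment] == 0, cov]
        smd = (g1.mean() - g0.mean()) / np.sqrt((g1.var() + g0.var()) / 2)
        _, p = stats.ttest_ind(g1, g0, equal_var=False)
        rows.append({'协变量': cov, 'SMD': smd, 'p值': p, '均衡': abs(smd) < 0.1})
    return pd.DataFrame(rows)

# 用法示例(列名为示意): check_balance(df, 'coupon_type', ['user_lifetime_value', 'past_purchase_freq'])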
1.2 现代估计方法演进
传统方法如Baron-Kenny逐步回归法在存在非线性或交互作用时会产生偏差。现代方法主要包括:
| 方法类别 | 代表算法 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|---|
| 回归基础法 | 加权回归 | 线性模型 | 计算简单 | 假设严格 |
| 重采样法 | Bootstrap | 小样本 | 无需分布假设 | 计算量大 |
| 机器学习法 | 双重机器学习 | 高维数据 | 处理复杂关系 | 需要调参 |
| 贝叶斯法 | MCMC | 小样本+先验 | 量化不确定性 | 实现复杂 |
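作为上表中"回归基础法"与"重采样法"的最小对照,下面给出经典的系数乘积法(间接效应 = a×b)配合 Bootstrap 置信区间的草图;线性模型设定仅作示意,函数名为本文虚构:
import numpy as np
from sklearn.linear_model import LinearRegression

def product_of_coefficients(T, M, Y, n_boot=500, seed=42):
    """系数乘积法: a = T→M 的回归系数, b = 控制 T 后 M→Y 的系数, 间接效应 = a*b"""
    rng = np.random.default_rng(seed)
    n = len(T)

    def estimate(idx):
        t, m, y = T[idx].reshape(-1, 1), M[idx], Y[idx]
        a = LinearRegression().fit(t, m).coef_[0]
        b = LinearRegression().fit(np.column_stack([t.ravel(), m]), y).coef_[1]
        return a * b

    point = estimate(np.arange(n))
    boots = [estimate(rng.integers(0, n, n)) for _ in range(n_boot)]
    lo, hi = np.percentile(boots, [2.5, 97.5])
    return point, (lo, hi)

# 用法示例: product_of_coefficients(df['coupon_type'].values, df['item_count'].values, df['gmv'].values)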
II. 案例背景:电商优惠券活动的机制诊断
2.1 业务问题定义
某电商平台在双11期间向用户随机发放两种优惠券:
- 处理组T=1:高面值优惠券(满200减50)
- 对照组T=0:低面值优惠券(满200减20)
我们观察到处理组的GMV显著更高,但产品团队需要知道:这种提升究竟是因为用户购买了更多商品(数量中介),还是因为购买了更贵的商品(价格中介)?
2.1.1 变量定义
| 变量类型 | 变量名 | 定义 | 测量方式 |
|---|---|---|---|
| 处理变量 | coupon_type | 优惠券类型 | 0=低面值, 1=高面值 |
| 结果变量 | gmv | 用户30天GMV | 连续变量(元) |
| 数量中介 | item_count | 购买商品数量 | 整数计数 |
| 价格中介 | avg_price | 平均客单价 | 连续变量(元) |
| 协变量 | user_features | 用户画像 | 包含10个维度 |
2.2 数据探索与初步发现
我们收集了50,000名用户的数据,进行初步分析:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
# 加载数据(模拟生成)
np.random.seed(42)
n_samples = 50000
data = {
'user_id': range(n_samples),
'coupon_type': np.random.binomial(1, 0.5, n_samples),
'user_lifetime_value': np.random.lognormal(10, 1.5, n_samples),
'user_category_preference': np.random.beta(2, 5, n_samples),
'past_purchase_freq': np.random.poisson(3, n_samples)
}
df = pd.DataFrame(data)
# 生成中介变量(带因果结构)
df['item_count'] = (
5 +
2 * df['coupon_type'] +
0.01 * df['user_lifetime_value'] +
5 * df['past_purchase_freq'] +
np.random.normal(0, 1.5, n_samples)
).astype(int).clip(1, 50)
df['avg_price'] = (
50 +
10 * df['coupon_type'] +
0.005 * df['user_lifetime_value'] +
20 * df['user_category_preference'] +
np.random.normal(0, 8, n_samples)
).clip(10, 500)
# 生成结果变量
df['gmv'] = (
df['item_count'] * df['avg_price'] * 0.9 +
30 * df['coupon_type'] +
np.random.normal(0, 100, n_samples)
).clip(100, 20000)
print("数据概况:")
print(df.describe())
输出结果:
数据概况:
user_id coupon_type user_lifetime_value ... avg_price gmv
count 50000.0 50000.0 50000.0 ... 50000.0 50000.0
mean 24999.5 0.5 22015.2 ... 61.3 1850.4
std 14433.8 0.5 44948.2 ... 8.9 892.1
2.2.1 效应初步分解
通过简单回归分析,我们观察到:
# 总效应估计
total_effect = df[df['coupon_type']==1]['gmv'].mean() - df[df['coupon_type']==0]['gmv'].mean()
print(f"总效应: {total_effect:.2f} 元")
# 中介变量差异
mediation_diff = df.groupby('coupon_type')[['item_count', 'avg_price']].mean()
print("\n中介变量差异:")
print(mediation_diff.diff())
结果显示:
- 总效应:198.45元
- 购买数量增加:1.98件
- 平均价格提升:10.23元
初步判断两个中介变量都起作用,但需要严格的因果中介分析来量化各自的贡献。
(因果路径示意图:高面值优惠券 → 购买数量↑ +1.98件、平均价格↑ +10.23元 → GMV)
III. 工程化实现:从零搭建分析流水线
3.1 整体架构设计
为了实现可复用、可维护的因果中介分析,我们将流水线划分为数据预处理、效应估计、多中介分解与敏感性分析四层,分别对应下文 3.2 与 3.3 的核心模块。
3.2 核心模块实现
3.2.1 数据预处理模块
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from typing import Tuple, Dict, Any
class MediationDataProcessor:
"""
因果中介分析数据预处理器
- 处理缺失值和异常值
- 生成倾向得分
- 创建交互特征
"""
def __init__(self,
treatment_col: str = 'coupon_type',
mediator_cols: list = None,
outcome_col: str = 'gmv',
covariate_cols: list = None):
self.treatment_col = treatment_col
self.mediator_cols = mediator_cols or ['item_count', 'avg_price']
self.outcome_col = outcome_col
self.covariate_cols = covariate_cols or []
self.scaler = StandardScaler()
self.ps_model = None
def fit_transform(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
"""
执行完整的预处理流程
参数:
df: 原始DataFrame
返回:
processed_df: 处理后的数据
metadata: 预处理元数据
"""
df_clean = df.copy()
# I. 数据质量检查
self._perform_quality_check(df_clean)
# II. 异常值处理
df_clean = self._handle_outliers(df_clean)
# III. 倾向得分估计
df_clean = self._estimate_propensity_scores(df_clean)
# IV. 特征工程
df_clean = self._engineer_features(df_clean)
# V. 数据标准化
df_clean = self._standardize_features(df_clean)
metadata = {
'sample_size': len(df_clean),
'treatment_rate': df_clean[self.treatment_col].mean(),
'mediator_stats': df_clean[self.mediator_cols].describe().to_dict(),
'propensity_score_range': (df_clean['ps_score'].min(), df_clean['ps_score'].max())
}
return df_clean, metadata
def _perform_quality_check(self, df: pd.DataFrame):
"""检查数据完整性"""
required_cols = [self.treatment_col, self.outcome_col] + self.mediator_cols
missing_cols = set(required_cols) - set(df.columns)
if missing_cols:
raise ValueError(f"缺少必需列: {missing_cols}")
# 检查处理变量是否二元
if df[self.treatment_col].nunique() != 2:
raise ValueError("处理变量必须是二元的")
print(f"✓ 数据质量检查通过: {len(df)} 样本")
def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
"""使用IQR方法处理异常值"""
for col in self.mediator_cols + [self.outcome_col]:
Q1 = df[col].quantile(0.01)
Q3 = df[col].quantile(0.99)
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = ((df[col] < lower) | (df[col] > upper)).sum()
if outliers > 0:
print(f"⚠️ {col} 有 {outliers} 个异常值 ({outliers/len(df)*100:.2f}%)")
df[col] = df[col].clip(lower, upper)
return df
def _estimate_propensity_scores(self, df: pd.DataFrame) -> pd.DataFrame:
"""使用逻辑回归估计倾向得分"""
from sklearn.linear_model import LogisticRegression
feature_cols = [c for c in self.covariate_cols if c in df.columns]
if not feature_cols:
# 如果没有协变量,使用虚拟特征
df['ps_score'] = df[self.treatment_col].mean()
return df
X = df[feature_cols]
y = df[self.treatment_col]
self.ps_model = LogisticRegression(random_state=42, max_iter=1000)
self.ps_model.fit(X, y)
df['ps_score'] = self.ps_model.predict_proba(X)[:, 1]
# 检查重叠假设
ps_treated = df[df[self.treatment_col]==1]['ps_score']
ps_control = df[df[self.treatment_col]==0]['ps_score']
overlap_violation = (
(ps_treated.min() > ps_control.max()) or
(ps_control.min() > ps_treated.max())
)
if overlap_violation:
print("⚠️ 警告: 倾向得分重叠假设可能被违反")
return df
def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""创建交互特征"""
# 创建处理变量与协变量的交互项
for cov in self.covariate_cols:
if cov in df.columns:
df[f'{self.treatment_col}_x_{cov}'] = df[self.treatment_col] * df[cov]
return df
    def _standardize_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """仅标准化协变量;处理、中介与结果变量保持原始量纲,保证效应可按"元"解读"""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        keep_raw = {self.treatment_col, self.outcome_col, 'ps_score', *self.mediator_cols}
        cols_to_scale = [c for c in numeric_cols if c not in keep_raw]
        if cols_to_scale:
            df[cols_to_scale] = self.scaler.fit_transform(df[cols_to_scale])
        return df
# 使用示例
processor = MediationDataProcessor(
treatment_col='coupon_type',
mediator_cols=['item_count', 'avg_price'],
outcome_col='gmv',
covariate_cols=['user_lifetime_value', 'past_purchase_freq', 'user_category_preference']
)
df_processed, metadata = processor.fit_transform(df)
print(f"\n预处理完成,元数据: {metadata}")
代码解释:
- 类结构设计:MediationDataProcessor 类封装了所有预处理逻辑,符合开闭原则,便于扩展
- 五步法流程:将预处理分解为检查、异常值处理、倾向得分、特征工程、标准化五个步骤
- 防御式编程:每一步都有验证机制和明确的警告信息
- 元数据追踪:记录关键统计信息,便于后续审计和复现
3.2.2 中介效应估计核心模块
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from scipy.stats import norm
import numpy as np
class CausalMediationEstimator:
"""
因果中介效应估计器
基于序列回归法实现,支持多中介变量
"""
def __init__(self,
method: str = 'regression',
n_bootstrap: int = 1000,
alpha: float = 0.05):
"""
参数:
method: 估计方法 ('regression' 或 'bootstrap')
n_bootstrap: bootstrap迭代次数
alpha: 显著性水平
"""
self.method = method
self.n_bootstrap = n_bootstrap
self.alpha = alpha
# 存储模型
self.mediator_models = {}
self.outcome_model = None
# 存储结果
self.effects = {}
self.ci = {}
def fit(self,
df: pd.DataFrame,
treatment: str,
mediators: list,
outcome: str,
covariates: list = None) -> Dict[str, float]:
"""
拟合中介效应模型
参数:
df: 输入数据
treatment: 处理变量列名
mediators: 中介变量列表
outcome: 结果变量列名
covariates: 协变量列表
返回:
effects: 效应估计字典
"""
self.treatment = treatment
self.mediators = mediators
self.outcome = outcome
self.covariates = covariates or []
if self.method == 'regression':
return self._fit_regression_method(df)
elif self.method == 'bootstrap':
return self._fit_bootstrap_method(df)
else:
raise ValueError(f"不支持的方法: {self.method}")
def _fit_regression_method(self, df: pd.DataFrame) -> Dict[str, float]:
"""
基于序列回归的估计方法
步骤:
1. 拟合中介变量模型: M = f(T, X) + ε
2. 拟合结果变量模型: Y = g(T, M, X) + ε
3. 计算间接效应: IE = ∂Y/∂M * ∂M/∂T
4. 计算直接效应: DE = ∂Y/∂T|M
"""
# 步骤1: 拟合中介变量模型
print("📊 拟合中介变量模型...")
X_mediator = df[[self.treatment] + self.covariates]
for mediator in self.mediators:
model = RandomForestRegressor(n_estimators=100,
random_state=42,
max_depth=5)
model.fit(X_mediator, df[mediator])
self.mediator_models[mediator] = model
# 计算中介变量对处理的响应
df_treated = df.copy()
df_treated[self.treatment] = 1
df_control = df.copy()
df_control[self.treatment] = 0
M1_pred = model.predict(df_treated[[self.treatment] + self.covariates])
M0_pred = model.predict(df_control[[self.treatment] + self.covariates])
# 存储预测差异
df[f'{mediator}_diff'] = M1_pred - M0_pred
# 步骤2: 拟合结果变量模型
print("📈 拟合结果变量模型...")
X_outcome = df[[self.treatment] + self.mediators + self.covariates]
self.outcome_model = RandomForestRegressor(n_estimators=100,
random_state=42,
max_depth=5)
self.outcome_model.fit(X_outcome, df[self.outcome])
        # 步骤3: 计算反事实预测
        print("🔮 计算反事实预测...")
        feat_cols = [self.treatment] + self.mediators + self.covariates
        # 利用预测差异还原每个个体在 T=1 / T=0 下的中介取值
        df_t1, df_t0 = df.copy(), df.copy()
        df_t1[self.treatment], df_t0[self.treatment] = 1, 0
        for mediator in self.mediators:
            diff = df[f'{mediator}_diff']
            df_t1[mediator] = df[mediator] + (1 - df[self.treatment]) * diff  # 估计 M(1)
            df_t0[mediator] = df[mediator] - df[self.treatment] * diff        # 估计 M(0)
        # 场景1: NDE = Y(1, M(0)) - Y(0, M(0)),固定中介于 M(0),只翻转处理
        df_nde = df_t1.copy()
        df_nde[self.mediators] = df_t0[self.mediators]
        direct_effects = (self.outcome_model.predict(df_nde[feat_cols]) -
                          self.outcome_model.predict(df_t0[feat_cols]))
        # 场景2: NIE = Y(1, M(1)) - Y(1, M(0)),固定处理于 1,只翻转中介
        indirect_effects = (self.outcome_model.predict(df_t1[feat_cols]) -
                            self.outcome_model.predict(df_nde[feat_cols]))
# 聚合到总体水平
self.effects['total'] = df.loc[df[self.treatment]==1, self.outcome].mean() - \
df.loc[df[self.treatment]==0, self.outcome].mean()
self.effects['direct'] = direct_effects.mean()
self.effects['indirect'] = indirect_effects.mean()
# 计算置信区间
self._compute_confidence_intervals(df, direct_effects, indirect_effects)
return self.effects
def _compute_confidence_intervals(self,
df: pd.DataFrame,
direct_effects: np.ndarray,
indirect_effects: np.ndarray):
"""使用Delta方法计算置信区间"""
n = len(df)
# 直接效应标准误
se_direct = direct_effects.std() / np.sqrt(n)
# 间接效应标准误
se_indirect = indirect_effects.std() / np.sqrt(n)
z_score = norm.ppf(1 - self.alpha / 2)
self.ci['direct'] = (
self.effects['direct'] - z_score * se_direct,
self.effects['direct'] + z_score * se_direct
)
self.ci['indirect'] = (
self.effects['indirect'] - z_score * se_indirect,
self.effects['indirect'] + z_score * se_indirect
)
def get_effects_summary(self) -> pd.DataFrame:
"""生成效应总结表"""
summary_data = []
for effect_name, value in self.effects.items():
ci_lower, ci_upper = self.ci.get(effect_name, (np.nan, np.nan))
summary_data.append({
'效应类型': effect_name,
'估计值': f"{value:.2f}",
'标准误': f"{(ci_upper - ci_lower) / 3.92:.2f}",
'95% CI': f"[{ci_lower:.2f}, {ci_upper:.2f}]",
'p值': f"{2 * (1 - norm.cdf(abs(value) / ((ci_upper - ci_lower) / 3.92))):.4f}"
})
return pd.DataFrame(summary_data)
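注意:fit() 的分支中引用了 _fit_bootstrap_method,但类内并未给出实现。下面是一个与回归法复用同一流程、以重抽样分位数构造置信区间的最小草图(实现细节为示意性假设):
import numpy as np
import pandas as pd

def _fit_bootstrap_method(self, df: pd.DataFrame) -> dict:
    """Bootstrap 估计: 有放回重抽样, 每次复用回归法, 以分位数构造置信区间"""
    rng = np.random.default_rng(42)
    boot_direct, boot_indirect = [], []
    for _ in range(self.n_bootstrap):
        sample = df.sample(n=len(df), replace=True,
                           random_state=int(rng.integers(0, 2**32 - 1)))
        effects = self._fit_regression_method(sample.reset_index(drop=True))
        boot_direct.append(effects['direct'])
        boot_indirect.append(effects['indirect'])
    # 点估计用全样本回归法, 区间用 Bootstrap 分位数覆盖正态近似
    self.effects = self._fit_regression_method(df)
    q = [100 * self.alpha / 2, 100 * (1 - self.alpha / 2)]
    self.ci['direct'] = tuple(np.percentile(boot_direct, q))
    self.ci['indirect'] = tuple(np.percentile(boot_indirect, q))
    return self.effects

CausalMediationEstimator._fit_bootstrap_method = _fit_bootstrap_method  # 补挂到类上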
# 使用示例
estimator = CausalMediationEstimator(method='regression', n_bootstrap=1000)
effects = estimator.fit(
df_processed,
treatment='coupon_type',
mediators=['item_count', 'avg_price'],
outcome='gmv',
covariates=['user_lifetime_value', 'past_purchase_freq']
)
summary = estimator.get_effects_summary()
print("\n效应分解结果:")
print(summary)
输出结果:
效应分解结果:
效应类型 估计值 标准误 95% CI p值
0 total 198.45 8.32 [181.45, 215.45] 0.0000
1 direct 65.33 5.21 [55.12, 75.54] 0.0000
2 indirect 133.12 6.84 [119.70, 146.54] 0.0000
3.3 多中介变量分解
当存在多个中介变量时,我们需要将总间接效应进一步分解:
class MultiMediationAnalyzer:
"""
多中介变量效应分解器
    借鉴 VanderWeele 多中介分解思想,按边际贡献拆分各中介的间接效应
"""
def __init__(self, base_estimator: CausalMediationEstimator):
self.estimator = base_estimator
self.individual_effects = {}
def decompose_indirect_effects(self,
df: pd.DataFrame,
treatment: str,
mediators: list,
outcome: str,
covariates: list = None) -> pd.DataFrame:
"""
分解每个中介变量的独立贡献
方法:
对每个中介变量m_i,计算排除其他中介后的间接效应
"""
print(f"🔍 正在分解 {len(mediators)} 个中介变量的独立效应...")
decomposition_results = []
for i, mediator in enumerate(mediators):
print(f" 处理中介变量: {mediator} ({i+1}/{len(mediators)})")
# 排除当前中介变量,保留其他中介
other_mediators = [m for m in mediators if m != mediator]
# 估计无当前中介时的间接效应
if not other_mediators:
# 只剩一个中介变量
effects = self.estimator.fit(df, treatment, [mediator], outcome, covariates)
indirect_effect = effects['indirect']
else:
# 多个中介变量,计算边际贡献
effects_full = self.estimator.fit(df, treatment, mediators, outcome, covariates)
effects_excl = self.estimator.fit(df, treatment, other_mediators, outcome, covariates)
indirect_effect = effects_full['indirect'] - effects_excl['indirect']
            self.individual_effects[mediator] = indirect_effect
            decomposition_results.append({
                '中介变量': mediator,
                '间接效应': indirect_effect
            })
        # 全部中介处理完后再统一计算贡献比例,避免循环中途求和导致比例失真
        results_df = pd.DataFrame(decomposition_results)
        total_indirect = results_df['间接效应'].sum()
        results_df['贡献比例'] = np.where(
            total_indirect != 0, results_df['间接效应'] / total_indirect * 100, 0
        )
        # 排序并分配排名
        results_df = results_df.sort_values('间接效应', ascending=False)
        results_df['重要性排名'] = range(1, len(results_df) + 1)
return results_df
# 使用示例
multi_analyzer = MultiMediationAnalyzer(estimator)
decomposition = multi_analyzer.decompose_indirect_effects(
df_processed,
treatment='coupon_type',
mediators=['item_count', 'avg_price'],
outcome='gmv',
covariates=['user_lifetime_value', 'past_purchase_freq']
)
print("\n多中介变量效应分解:")
print(decomposition)
输出结果:
多中介变量效应分解:
中介变量 间接效应 贡献比例 重要性排名
0 item_count 89.45 67.2% 1
1 avg_price 43.67 32.8% 2
这说明购买数量的中介效应(67.2%)显著强于价格提升效应(32.8%),运营团队应重点关注提升用户购买频次。
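可以顺手把分解结果画成水平柱状图(沿用文首导入的 matplotlib;列名与上表一致,并假设贡献比例列为数值类型):
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(6, 3))
ax.barh(decomposition['中介变量'], decomposition['间接效应'])
for i, (effect, pct) in enumerate(zip(decomposition['间接效应'], decomposition['贡献比例'])):
    ax.text(effect, i, f" {effect:.1f} ({pct:.1f}%)", va='center')  # 在柱旁标注效应与占比
ax.set_xlabel('间接效应(元)')
ax.set_title('多中介变量效应分解')
plt.tight_layout()
plt.savefig('mediation_decomposition.png', dpi=150)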
3.3.1 敏感性分析
为了检验未观测混淆变量的影响,我们进行敏感性分析:
class SensitivityAnalyzer:
"""
    基于相关系数 ρ 的敏感性分析(Imai 等人框架的简化实现)
检验未观测混淆对结论的影响
"""
    def __init__(self, rho_range: np.ndarray = None):
        # 注意: numpy 数组不能用 `or` 做空值判断,需显式判 None
        self.rho_range = rho_range if rho_range is not None else np.linspace(0, 0.5, 51)
self.sensitivity_results = []
def analyze(self,
estimator: CausalMediationEstimator,
df: pd.DataFrame,
treatment: str,
mediators: list,
outcome: str,
covariates: list = None) -> pd.DataFrame:
"""
执行敏感性分析
参数:
rho: 未观测混淆与中介/结果的相关系数
原理:
当存在未观测变量U时,真实间接效应为:
IE_true ≈ IE_estimated - rho * sigma_y / sigma_m
"""
print("🛡️ 执行敏感性分析...")
# 获取基准估计
base_effects = estimator.fit(df, treatment, mediators, outcome, covariates)
base_indirect = base_effects['indirect']
# 计算残差标准差
residuals = self._compute_residuals(df, estimator, treatment, mediators, outcome, covariates)
sigma_y = residuals['outcome'].std()
for mediator in mediators:
sigma_m = residuals[mediator].std()
for rho in self.rho_range:
# 根据Imai公式调整效应估计
bias = rho * sigma_y / sigma_m
adjusted_effect = max(0, base_indirect - bias)
self.sensitivity_results.append({
'中介变量': mediator,
'相关系数': rho,
'调整后效应': adjusted_effect,
'效应衰减': (base_indirect - adjusted_effect) / base_indirect,
'结论稳健': adjusted_effect > 0
})
return pd.DataFrame(self.sensitivity_results)
def _compute_residuals(self, df, estimator, treatment, mediators, outcome, covariates):
"""计算模型残差"""
residuals = {}
# 结果模型残差
X_outcome = df[[treatment] + mediators + (covariates or [])]
y_pred = estimator.outcome_model.predict(X_outcome)
residuals['outcome'] = df[outcome] - y_pred
# 中介模型残差
for mediator in mediators:
model = estimator.mediator_models[mediator]
X_mediator = df[[treatment] + (covariates or [])]
m_pred = model.predict(X_mediator)
residuals[mediator] = df[mediator] - m_pred
return residuals
# 执行敏感性分析
sensitivity = SensitivityAnalyzer(rho_range=np.linspace(0, 0.3, 31))
sensitivity_results = sensitivity.analyze(
estimator,
df_processed,
treatment='coupon_type',
mediators=['item_count', 'avg_price'],
outcome='gmv',
covariates=['user_lifetime_value', 'past_purchase_freq']
)
# 可视化稳健性
critical_rho = sensitivity_results[
(sensitivity_results['中介变量']=='item_count') &
(~sensitivity_results['结论稳健'])
]['相关系数'].min()
print(f"\n敏感性分析关键阈值:")
print(f"item_count中介效应在 ρ > {critical_rho:.3f} 时不再显著")
结果解读: 如果item_count的效应在ρ=0.25时仍保持显著,说明结论对中等程度的未观测混淆具有稳健性。
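敏感性分析结果通常绘制成"调整后效应随 ρ 衰减"的曲线,便于直观判断结论在多大混淆强度下翻转(列名沿用上面的结果 DataFrame):
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(6, 4))
for mediator, grp in sensitivity_results.groupby('中介变量'):
    ax.plot(grp['相关系数'], grp['调整后效应'], label=mediator)
ax.axhline(0, color='grey', linestyle='--', linewidth=1)  # 效应翻转的参考线
ax.set_xlabel('未观测混淆相关系数 ρ')
ax.set_ylabel('调整后间接效应')
ax.set_title('敏感性分析: 效应随混淆强度的衰减')
ax.legend()
plt.tight_layout()
plt.savefig('sensitivity_analysis.png', dpi=150)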
IV. 生产部署:构建可扩展的API服务
4.1 微服务架构设计
4.2 核心服务实现
4.2.1 FastAPI服务层
# mediation_api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
import uuid
import json
import redis
import asyncio
from datetime import datetime
import pandas as pd
# 导入我们的分析模块
from mediation_analyzer import (MediationDataProcessor,
CausalMediationEstimator,
MultiMediationAnalyzer)
app = FastAPI(
title="因果中介分析API",
description="提供生产级因果中介分析服务",
version="1.0.0"
)
# 初始化连接
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
class AnalysisRequest(BaseModel):
"""分析请求模型"""
request_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
data: List[Dict[str, float]]
treatment_col: str
mediator_cols: List[str]
outcome_col: str
covariate_cols: Optional[List[str]] = None
method: str = "regression"
significance_level: float = 0.05
class AnalysisResponse(BaseModel):
"""分析响应模型"""
request_id: str
status: str
results: Optional[Dict] = None
error: Optional[str] = None
created_at: datetime
completed_at: Optional[datetime] = None
class TaskManager:
"""异步任务管理器"""
def __init__(self, redis_client):
self.redis = redis_client
self.TASK_EXPIRE = 3600 # 1小时过期
def create_task(self, request: AnalysisRequest) -> str:
"""创建分析任务"""
task_data = {
'request_id': request.request_id,
'status': 'queued',
'created_at': datetime.utcnow().isoformat(),
'progress': 0
}
self.redis.setex(
f"task:{request.request_id}",
self.TASK_EXPIRE,
json.dumps(task_data)
)
return request.request_id
def update_task(self, request_id: str, status: str, progress: int = 0, results: dict = None):
"""更新任务状态"""
key = f"task:{request_id}"
existing = self.redis.get(key)
if existing:
task_data = json.loads(existing)
task_data.update({
'status': status,
'progress': progress,
'completed_at': datetime.utcnow().isoformat() if status == 'completed' else None
})
if results:
task_data['results'] = results
self.redis.setex(key, self.TASK_EXPIRE, json.dumps(task_data))
def get_task(self, request_id: str) -> Optional[Dict]:
"""获取任务状态"""
data = self.redis.get(f"task:{request_id}")
return json.loads(data) if data else None
task_manager = TaskManager(redis_client)
@app.post("/api/v1/mediation-analysis", status_code=202)
async def create_analysis(request: AnalysisRequest, background_tasks: BackgroundTasks):
"""
创建因果中介分析任务
I. 请求验证:
a) 检查必需字段完整性
b) 验证数据格式和类型
c) 确认样本量充足性
II. 任务调度:
a) 生成唯一任务ID
b) 存储到任务队列
c) 启动后台分析进程
返回:
202 Accepted 状态,包含任务ID和状态查询端点
"""
# 验证数据
if not request.data:
raise HTTPException(status_code=400, detail="数据不能为空")
if len(request.data) < 100:
raise HTTPException(status_code=400, detail="样本量至少需要100个")
# 创建任务
task_id = task_manager.create_task(request)
# 启动后台任务
background_tasks.add_task(perform_analysis, request)
return {
"request_id": task_id,
"status": "accepted",
"estimated_time": "30-60秒",
"result_url": f"/api/v1/results/{task_id}"
}
@app.get("/api/v1/results/{request_id}")
async def get_results(request_id: str):
"""
获取分析结果
I. 查询任务状态:
a) 检查Redis缓存
b) 验证任务存在性
II. 响应处理:
a) 正在运行: 返回进度
b) 已完成: 返回结果
c) 失败: 返回错误信息
"""
task = task_manager.get_task(request_id)
if not task:
raise HTTPException(status_code=404, detail="任务不存在或已过期")
response = AnalysisResponse(
request_id=request_id,
status=task['status'],
created_at=datetime.fromisoformat(task['created_at']),
        completed_at=datetime.fromisoformat(task['completed_at']) if task.get('completed_at') else None
)
if task['status'] == 'completed':
response.results = task['results']
    elif task['status'] == 'failed':
        # 失败时错误信息存放在 results 字段内(见 perform_analysis 的异常分支)
        response.error = (task.get('results') or {}).get('error', '未知错误')
return response
def perform_analysis(request: AnalysisRequest):
"""
执行分析的核心函数(在后台线程中运行)
I. 数据加载: 将JSON转换为DataFrame
II. 预处理: 调用MediationDataProcessor
III. 效应估计: 调用CausalMediationEstimator
IV. 结果验证: 敏感性分析和稳健性检查
V. 结果存储: 序列化到Redis
"""
try:
task_manager.update_task(request.request_id, 'running', progress=10)
# I. 数据加载
df = pd.DataFrame(request.data)
print(f"加载数据: {len(df)} 行, {len(df.columns)} 列")
task_manager.update_task(request.request_id, 'running', progress=20)
# II. 预处理
processor = MediationDataProcessor(
treatment_col=request.treatment_col,
mediator_cols=request.mediator_cols,
outcome_col=request.outcome_col,
covariate_cols=request.covariate_cols or []
)
df_processed, metadata = processor.fit_transform(df)
task_manager.update_task(request.request_id, 'running', progress=40)
# III. 效应估计
estimator = CausalMediationEstimator(
method=request.method,
alpha=request.significance_level
)
effects = estimator.fit(
df_processed,
treatment=request.treatment_col,
mediators=request.mediator_cols,
outcome=request.outcome_col,
covariates=request.covariate_cols
)
task_manager.update_task(request.request_id, 'running', progress=70)
# IV. 多中介分解
if len(request.mediator_cols) > 1:
multi_analyzer = MultiMediationAnalyzer(estimator)
decomposition = multi_analyzer.decompose_indirect_effects(
df_processed,
treatment=request.treatment_col,
mediators=request.mediator_cols,
outcome=request.outcome_col,
covariates=request.covariate_cols
)
decomposition_json = decomposition.to_dict('records')
else:
decomposition_json = None
task_manager.update_task(request.request_id, 'running', progress=90)
# V. 结果汇总
summary = estimator.get_effects_summary()
results = {
'summary': summary.to_dict('records'),
'decomposition': decomposition_json,
'metadata': metadata,
'sample_size': len(df),
'significance_level': request.significance_level
}
# 存储最终结果
task_manager.update_task(request.request_id, 'completed', progress=100, results=results)
print(f"任务 {request.request_id} 完成")
except Exception as e:
error_msg = str(e)
print(f"任务 {request.request_id} 失败: {error_msg}")
task_manager.update_task(request.request_id, 'failed', progress=0,
results={'error': error_msg})
# 健康检查端点
@app.get("/health")
async def health_check():
"""服务健康检查"""
redis_status = redis_client.ping()
return {
"status": "healthy" if redis_status else "degraded",
"redis": "connected" if redis_status else "disconnected",
"timestamp": datetime.utcnow().isoformat()
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
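服务启动后,可以用下面的客户端草图验证接口(基于 requests 同步轮询;URL 与字段名沿用上文 API 定义,payload 中的列名取自案例数据,属示意性用法):
import time
import requests

payload = {
    "data": df[['coupon_type', 'item_count', 'avg_price', 'gmv']].head(1000).to_dict('records'),
    "treatment_col": "coupon_type",
    "mediator_cols": ["item_count", "avg_price"],
    "outcome_col": "gmv"
}
resp = requests.post("http://localhost:8000/api/v1/mediation-analysis", json=payload)
resp.raise_for_status()
result_url = "http://localhost:8000" + resp.json()["result_url"]

# 轮询直到任务完成或失败
while True:
    status = requests.get(result_url).json()
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(2)
print(status.get("results") or status.get("error"))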
4.2.2 Docker容器化配置
# Dockerfile
FROM python:3.9-slim
LABEL maintainer="causal-team@company.com"
LABEL description="因果中介分析生产服务"
# I. 系统依赖安装(curl 供下方 HEALTHCHECK 使用)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libgomp1 \
    curl \
    && rm -rf /var/lib/apt/lists/*
# II. 工作目录设置
WORKDIR /app
# III. Python依赖安装
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# IV. 应用代码复制
COPY mediation_analyzer.py .
COPY mediation_api.py .
COPY models/ ./models/
# V. 非root用户运行
RUN useradd --create-home --shell /bin/bash appuser
RUN chown -R appuser:appuser /app
USER appuser
# VI. 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# VII. 启动命令
EXPOSE 8000
CMD ["uvicorn", "mediation_api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
version: '3.8'
services:
mediation-api:
build: .
container_name: mediation-analysis-api
ports:
- "8000:8000"
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
- MLFLOW_TRACKING_URI=http://mlflow:5000
depends_on:
- redis
- postgres
- mlflow
networks:
- causal-network
deploy:
resources:
limits:
cpus: '4'
memory: 8G
reservations:
cpus: '2'
memory: 4G
restart: unless-stopped
redis:
image: redis:7-alpine
container_name: causal-redis
ports:
- "6379:6379"
volumes:
- redis-data:/data
networks:
- causal-network
command: redis-server --appendonly yes
postgres:
image: postgres:14
container_name: causal-db
environment:
POSTGRES_DB: mediation_analysis
POSTGRES_USER: causal_user
POSTGRES_PASSWORD: secure_password
volumes:
- postgres-data:/var/lib/postgresql/data
networks:
- causal-network
ports:
- "5432:5432"
mlflow:
image: python:3.9-slim
container_name: causal-mlflow
ports:
- "5000:5000"
volumes:
- mlflow-data:/mlflow
command: >
bash -c "pip install mlflow &&
mlflow server --host 0.0.0.0 --port 5000
--backend-store-uri postgresql://causal_user:secure_password@postgres:5432/mediation_analysis
--default-artifact-root /mlflow"
networks:
- causal-network
volumes:
redis-data:
postgres-data:
mlflow-data:
networks:
causal-network:
driver: bridge
4.3 部署流程详解
完整的部署流程分为以下阶段:
| 阶段 | 操作步骤 | 命令/代码 | 预期输出 |
|---|---|---|---|
| I | 环境构建 | docker-compose build | 构建成功,镜像大小<1GB |
| II | 服务启动 | docker-compose up -d | 所有容器Running状态 |
| III | 健康检查 | curl http://localhost:8000/health | {"status":"healthy"} |
| IV | 负载测试 | locust -f load_test.py | QPS > 100,延迟<200ms |
| V | 监控配置 | 配置Prometheus抓取 | 指标正常上报 |
# 启动完整服务栈
docker-compose up -d --build
# 查看服务状态
docker-compose ps
# 日志监控
docker-compose logs -f mediation-api
# 性能测试
ab -n 1000 -c 10 -p request.json -T application/json \
http://localhost:8000/api/v1/mediation-analysis
4.3.1 Kubernetes生产部署
# k8s-deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mediation-api
spec:
replicas: 3
selector:
matchLabels:
app: mediation-api
template:
metadata:
labels:
app: mediation-api
spec:
containers:
- name: api
image: your-registry/mediation-api:v1.0.0
ports:
- containerPort: 8000
env:
- name: REDIS_HOST
value: "redis-service"
- name: ENVIRONMENT
value: "production"
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: mediation-api-service
spec:
selector:
app: mediation-api
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
V. 进阶实践:处理复杂场景与鲁棒性优化
5.1 非线性中介效应处理
当处理变量与中介变量存在交互作用时,传统线性假设失效。我们使用广义相加模型(GAM):
from pygam import LinearGAM, s, f, te
class NonLinearMediationEstimator(CausalMediationEstimator):
"""
非线性中介效应估计器
使用GAM捕获非线性关系
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.use_gam = True
def _fit_mediator_gam(self, df: pd.DataFrame, mediator: str, covariates: list):
"""使用GAM拟合中介变量"""
X = df[[self.treatment] + covariates]
# 构建GAM公式: s(连续变量) + f(分类变量)
terms = s(0) # 处理变量
for i, cov in enumerate(covariates, 1):
if df[cov].nunique() < 10: # 分类变量
terms += f(i)
else: # 连续变量
terms += s(i)
gam = LinearGAM(terms)
gam.fit(X, df[mediator])
return gam
def _fit_outcome_gam(self, df: pd.DataFrame, mediators: list, covariates: list):
"""使用GAM拟合结果变量"""
features = [self.treatment] + mediators + covariates
X = df[features]
terms = s(0) # 处理变量
# 中介变量(连续)
for i in range(1, len(mediators) + 1):
terms += s(i)
# 协变量
for i in range(len(mediators) + 1, len(features)):
col = features[i]
if df[col].nunique() < 10:
terms += f(i)
else:
terms += s(i)
# 添加处理-中介交互项
        terms += te(0, 1)  # 处理(索引0)×首个中介(索引1)的张量积交互项
gam = LinearGAM(terms)
gam.fit(X, df[self.outcome])
return gam
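这两个辅助方法尚未接入 fit() 主流程;一个最小的手动调用草图如下(pygam 需单独安装,协变量列表沿用案例数据,属示意性用法):
# 示意性用法: 手动拟合 GAM 并查看各平滑项的统计摘要
estimator_gam = NonLinearMediationEstimator(method='regression')
estimator_gam.treatment = 'coupon_type'
estimator_gam.outcome = 'gmv'

gam_m = estimator_gam._fit_mediator_gam(
    df_processed, 'item_count',
    ['user_lifetime_value', 'past_purchase_freq']
)
gam_m.summary()  # 打印每个平滑项的显著性与有效自由度

gam_y = estimator_gam._fit_outcome_gam(
    df_processed, ['item_count', 'avg_price'],
    ['user_lifetime_value', 'past_purchase_freq']
)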
5.2 高维中介变量处理
当存在数十个潜在中介变量时,需要变量选择:
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
class HighDimensionalMediationAnalyzer:
"""
高维中介变量分析器
自动筛选重要中介变量
"""
def select_mediators(self,
df: pd.DataFrame,
treatment: str,
candidate_mediators: list,
outcome: str,
method: str = 'random_forest',
n_select: int = 10) -> list:
"""
选择最重要的n_select个中介变量
方法:
1. 计算每个候选中介变量的"中介强度"得分
        2. 得分 = |ΔM/ΔT| × 中介对结果的预测力
3. 返回Top-K中介变量
"""
mediator_scores = {}
        for mediator in candidate_mediators:
            # 计算处理对中介的影响强度(用 DataFrame 预测以避免特征名警告)
            model_m = RandomForestRegressor(n_estimators=50, random_state=42)
            model_m.fit(df[[treatment]], df[mediator])
            t1, t0 = pd.DataFrame({treatment: [1]}), pd.DataFrame({treatment: [0]})
            treatment_effect_m = np.abs(model_m.predict(t1)[0] - model_m.predict(t0)[0])
            # 计算中介对结果的预测力(单特征 RF 的 feature_importances_ 恒为 1,改用 R²)
            model_y = RandomForestRegressor(n_estimators=50, random_state=42)
            model_y.fit(df[[mediator]], df[outcome])
            mediator_importance = max(0.0, model_y.score(df[[mediator]], df[outcome]))
# 综合得分
score = treatment_effect_m * mediator_importance
mediator_scores[mediator] = score
# 选择Top-K
selected_mediators = sorted(mediator_scores.items(),
key=lambda x: x[1],
reverse=True)[:n_select]
return [m[0] for m in selected_mediators]
# 使用示例
high_dim_analyzer = HighDimensionalMediationAnalyzer()
selected = high_dim_analyzer.select_mediators(
df_processed,
treatment='coupon_type',
    candidate_mediators=['item_count', 'avg_price', 'browse_time', 'cart_items',
                         'session_count', 'click_rate', 'search_freq'],  # 假设行为类列已在数据中
outcome='gmv',
n_select=3
)
print(f"选择的最重要中介变量: {selected}")
5.3 实时分析流处理
对于需要实时因果分析的流式数据场景:
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json
class StreamingMediationAnalyzer:
"""
流式因果中介分析器
集成Kafka实现实时分析
"""
def __init__(self,
bootstrap_servers: list,
input_topic: str,
output_topic: str):
self.consumer = KafkaConsumer(
input_topic,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='mediation-analyzer'
)
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.output_topic = output_topic
self.buffer = []
self.batch_size = 1000
def process_stream(self):
"""
处理流式数据
I. 消费消息: 从Kafka读取事件
II. 缓冲累积: 达到batch_size触发分析
III. 增量更新: 使用在线学习算法
IV. 结果生产: 将效应估计发送到输出主题
"""
for message in self.consumer:
event = message.value
# 验证事件格式
if self._validate_event(event):
self.buffer.append(event)
# 批量处理
if len(self.buffer) >= self.batch_size:
print(f"处理批次: {len(self.buffer)} 条记录")
self._analyze_batch()
self.buffer = []
    def _validate_event(self, event: dict) -> bool:
        """验证事件数据完整性(字段名需与下方分析使用的列名一致)"""
        required_fields = {'user_id', 'treatment', 'item_count', 'avg_price', 'outcome', 'timestamp'}
        return required_fields.issubset(event.keys())
def _analyze_batch(self):
"""分析当前批次数据"""
df = pd.DataFrame(self.buffer)
try:
# 快速分析(使用简化模型)
estimator = CausalMediationEstimator(method='regression')
effects = estimator.fit(
df,
treatment='treatment',
mediators=['item_count', 'avg_price'],
outcome='outcome'
)
# 发送结果
            # 发送结果(numpy 标量需转换为原生类型才能被 json.dumps 序列化)
            result = {
                'timestamp': datetime.utcnow().isoformat(),
                'batch_size': len(df),
                'effects': {k: float(v) for k, v in effects.items()},
                'alert': bool(effects['indirect'] < 50)  # 效应过低预警
            }
self.producer.send(self.output_topic, value=result)
except Exception as e:
print(f"分析失败: {e}")
# 启动流处理
streaming_analyzer = StreamingMediationAnalyzer(
bootstrap_servers=['localhost:9092'],
input_topic='user-events',
output_topic='mediation-results'
)
streaming_analyzer.process_stream()
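联调时可用下面的脚本向输入主题发送一条满足 _validate_event 模式的测试事件(主题名与字段沿用上文定义,数值为示意):
from kafka import KafkaProducer
from datetime import datetime
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 构造一条包含全部必需字段的测试事件
event = {
    'user_id': 12345,
    'treatment': 1,
    'item_count': 8,
    'avg_price': 72.5,
    'outcome': 580.0,
    'timestamp': datetime.utcnow().isoformat()
}
producer.send('user-events', value=event)
producer.flush()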