Long-Term Metric Monitoring: How to Build a Robust Causal Effect Evaluation System

数字扫地僧 · published 2025/12/19 15:44:16

I. Theoretical Foundations of Long-Term Causal Effect Evaluation

I.I From Short-Term to Long-Term: The Temporal Challenge of Causal Inference

Traditional A/B testing rests on the Stable Unit Treatment Value Assumption (SUTVA) and an immediate-measurement framework. In long-term monitoring scenarios, this framework has to be extended:

Dimension | Short-term evaluation | Long-term evaluation | Key difference
Time window | 7-30 days | 90-365+ days | Longer observation periods introduce data censoring
Stability assumption | User behavior is static | User behavior evolves dynamically | Higher risk of violating SUTVA
Feedback mechanisms | System feedback ignored | Market equilibrium must be considered | Externalities become visible
Sample representativeness | Initial sample is representative | Selective attrition of the sample | Severe survivorship bias
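
To see why survivorship bias matters, here is a minimal simulation sketch (illustrative only, not from the original article): the true treatment effect on the outcome is zero, but because treatment changes who stays in the sample, a naive comparison among surviving users is biased.

import numpy as np

rng = np.random.default_rng(0)
n = 200_000
baseline = rng.normal(0, 1, n)                 # latent engagement level
treat = rng.binomial(1, 0.5, n)

# True treatment effect on the outcome is exactly zero
outcome = baseline + rng.normal(0, 1, n)

# ...but treatment raises retention, so attrition differs by arm and by baseline
p_stay = 1 / (1 + np.exp(-(baseline + 1.0 * treat)))
stayed = rng.binomial(1, p_stay).astype(bool)

naive = outcome[stayed & (treat == 1)].mean() - outcome[stayed & (treat == 0)].mean()
full = outcome[treat == 1].mean() - outcome[treat == 0].mean()
print(f"ATE on the full cohort: {full:+.3f}")          # ~0, the truth
print(f"Survivor-only comparison: {naive:+.3f}")       # biased even though the effect is zero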

Example insight: in 2023 a social product ran an experiment optimizing its "friend recommendation algorithm". Short-term data showed the new algorithm lifted friend additions by 18% (p<0.01). But after tracking users for six months, social engagement in the treatment group was actually 12% lower than in the control group. The reason: the new algorithm recommended many "weak-tie" friends, which inflated connection counts in the short run but lowered interaction quality over the long run.

I.II A Mathematical Framework for Long-Term Effects

For the outcome Y of individual i at time t, the potential-outcomes framework gives:

Y_it(D_i) = α_i + τ_i·D_i + Σ(β_s·D_i·I(t>s)) + ε_it

where:

  • D_i ∈ {0,1} is the treatment assignment
  • τ_i is the immediate effect
  • β_s is the delayed effect at time s
  • I(t>s) is the indicator function

Dynamic version of the average treatment effect (ATE):

ATE(t) = E[Y_it(1) - Y_it(0)] = τ + Σ(β_s) for s ≤ t
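
A minimal numerical sketch of this definition (illustrative parameter values, not from the original article): each delayed effect β_s switches on once t passes s, and ATE(t) accumulates them on top of the immediate effect τ.

import numpy as np

rng = np.random.default_rng(1)

tau = 0.10                                   # immediate effect
beta = {30: 0.04, 60: 0.02, 90: -0.03}       # delayed effects that switch on after day s
timepoints = [7, 30, 60, 90, 180]

def ate(t):
    """ATE(t) = tau + sum of the delayed effects beta_s with s < t."""
    return tau + sum(b for s, b in beta.items() if t > s)

# Cross-check against a direct simulation of both potential outcomes for the same users
n = 200_000
alpha = rng.normal(0, 1, n)                  # individual baselines, cancel in the difference
for t in timepoints:
    delayed = sum(b for s, b in beta.items() if t > s)
    y1 = alpha + tau + delayed + rng.normal(0, 1, n)   # Y_it(1)
    y0 = alpha + rng.normal(0, 1, n)                   # Y_it(0)
    print(f"t={t:3d}  formula ATE(t)={ate(t):+.3f}  simulated={np.mean(y1 - y0):+.3f}")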

I.III Revisiting the Key Assumptions

Assumption | Short-term version | Long-term version | Diagnostic
No interference | No interaction between users | Network effects allowed | Network-structure analysis
Consistency | Treatment clearly defined | Treatment intensity may decay | Dose-response modeling
Ignorability | Baseline covariates suffice | Time-varying covariate adjustment needed | Reweighting methods
Positivity | Treatment assignment probability > 0 | Must stay positive over time | Truncated-weight diagnostics
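
As an illustration of the last row, the sketch below (a toy example of my own, not from the original article) checks positivity by inspecting the distribution of inverse-probability weights and comparing it with a version truncated at the 1st/99th percentiles; a big gap between the two is a warning sign.

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(1)
n = 5000
x = rng.normal(size=(n, 2))
p_true = 1 / (1 + np.exp(-(1.5 * x[:, 0] - 1.0 * x[:, 1])))
d = rng.binomial(1, p_true)

# Fitted propensity scores and raw inverse-probability weights
ps = LogisticRegression().fit(x, d).predict_proba(x)[:, 1]
w = np.where(d == 1, 1 / ps, 1 / (1 - ps))

# Truncate at the 1st/99th percentiles, a common positivity diagnostic
lo, hi = np.percentile(w, [1, 99])
w_trunc = np.clip(w, lo, hi)

report = pd.DataFrame({
    "raw": [w.mean(), w.max(), (w > 10).mean()],
    "truncated": [w_trunc.mean(), w_trunc.max(), (w_trunc > 10).mean()],
}, index=["mean weight", "max weight", "share > 10"])
print(report.round(3))
# Large differences between the two columns flag near-violations of positivity.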

Mermaid summary (diagram): the long-term causal evaluation framework has three layers — a theoretical layer (extended potential-outcomes framework, time-dynamic ATE, identification strategy), an assumption-testing layer (network-interference checks, treatment-decay modeling, time-varying confounding adjustment), and an effect-estimation layer (survival-analysis integration, panel-data methods, Bayesian updating).

II. Core Elements of a Robust Evaluation System

II.I A Stratified Experimental Design Architecture

Conventional single-layer experimental designs tend to fail under long-term monitoring. We recommend a stratified, covariate-balanced design (a small randomization sketch follows this list):

  1. Stratified randomization: stratify users by predicted lifetime-value (LTV) quantiles
  2. Time-block design: group users into blocks by when they enter the experiment, and randomize independently within each block
  3. Adaptive allocation: adjust allocation ratios dynamically based on early signals, while always preserving a long-term control group
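
A minimal sketch of points 1 and 2 above (all column names and numbers are made up for illustration): stratify by predicted-LTV quintile, block by weekly entry cohort, and randomize 50/50 independently within each stratum-by-block cell.

import numpy as np
import pandas as pd

rng = np.random.default_rng(7)
users = pd.DataFrame({
    "user_id": np.arange(10_000),
    "predicted_ltv": rng.lognormal(3, 1, 10_000),
    "entry_date": pd.Timestamp("2024-01-01")
                  + pd.to_timedelta(rng.integers(0, 56, 10_000), unit="D"),
})

# Stratify by predicted-LTV quintile and block by week of entry
users["ltv_stratum"] = pd.qcut(users["predicted_ltv"], q=5, labels=False)
users["time_block"] = users["entry_date"].dt.to_period("W").astype(str)

# Independent, balanced 50/50 randomization inside each stratum x block cell
users["treatment"] = -1
for _, idx in users.groupby(["ltv_stratum", "time_block"]).groups.items():
    arm = np.tile([0, 1], len(idx) // 2 + 1)[: len(idx)]
    users.loc[idx, "treatment"] = rng.permutation(arm)

print(users.groupby(["ltv_stratum", "treatment"]).size().unstack())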

Implementation steps

Step | Action | Technical notes
Baseline feature engineering | Construct stratification variables | Include historical behavior, demographics, device attributes
Stratum construction | Create homogeneous strata | ≥200 users per stratum to ensure balance
Within-block randomization | Partition into time blocks | One block per week to prevent temporal confounding
Allocation algorithm | Pocock-Simon minimization | Dynamically balance marginal distributions
Monitoring dashboard | Real-time SMD checks | Standardized mean difference < 0.1
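
The last row of the table asks for real-time SMD monitoring. A minimal standardized-mean-difference check (an illustrative helper, not from the original article) could look like this.

import numpy as np
import pandas as pd

def standardized_mean_difference(df: pd.DataFrame, covariate: str, group_col: str = "treatment") -> float:
    """SMD = (mean_T - mean_C) / pooled standard deviation; |SMD| < 0.1 is the usual balance target."""
    treated = df.loc[df[group_col] == 1, covariate]
    control = df.loc[df[group_col] == 0, covariate]
    pooled_sd = np.sqrt((treated.var(ddof=1) + control.var(ddof=1)) / 2)
    return (treated.mean() - control.mean()) / pooled_sd

# Example with simulated baseline covariates
rng = np.random.default_rng(3)
df = pd.DataFrame({
    "treatment": rng.binomial(1, 0.5, 5000),
    "past_gmv": rng.lognormal(5, 1, 5000),
    "visits_30d": rng.poisson(8, 5000),
})
for cov in ["past_gmv", "visits_30d"]:
    smd = standardized_mean_difference(df, cov)
    flag = "OK" if abs(smd) < 0.1 else "IMBALANCED"
    print(f"{cov}: SMD = {smd:+.3f} ({flag})")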

II.II A Multi-Time-Point Measurement Protocol

Long-term monitoring requires carefully designed measurement time points so that users are not over-surveyed:

Measurement frequency matrix

Experiment phase | Time window | Measurement frequency | Core metrics | Auxiliary metrics
Immediate reaction | Days 0-7 | Daily | Activation rate, first action | Page dwell time
Short-term adaptation | Days 8-30 | Weekly | Retention, engagement | Feature penetration
Medium-term stabilization | Days 31-90 | Biweekly | LTV, conversion rate | Satisfaction
Long-term value | Days 91-365 | Monthly | Annual retention, NPS | Business value

Code implementation: the measurement scheduler

from datetime import datetime, timedelta
import pandas as pd
from typing import Dict, List

class LongTermMeasurementScheduler:
    """
    长期实验测量调度系统
    根据实验启动时间和用户进入时间,自动生成测量计划
    """
    
    def __init__(self, experiment_start: datetime):
        self.start = experiment_start
        self.measurement_plan = self._define_plan()
    
    def _define_plan(self) -> Dict[str, Dict]:
        """定义多阶段测量协议"""
        return {
            'immediate': {
                'period': (0, 7),
                'frequency_days': 1,
                'metrics': ['activation', 'first_action', 'session_duration'],
                'survey': False
            },
            'short_term': {
                'period': (8, 30),
                'frequency_days': 7,
                'metrics': ['weekly_retention', 'engagement_score', 'feature_penetration'],
                'survey': True
            },
            'medium_term': {
                'period': (31, 90),
                'frequency_days': 14,
                'metrics': ['ltv_estimate', 'conversion_rate', 'satisfaction'],
                'survey': True
            },
            'long_term': {
                'period': (91, 365),
                'frequency_days': 30,
                'metrics': ['yearly_retention', 'nps', 'business_value'],
                'survey': True
            }
        }
    
    def generate_schedule(self, user_id: str, entry_time: datetime) -> pd.DataFrame:
        """
        为单个用户生成测量时间表
        
        参数:
            user_id: 用户唯一标识
            entry_time: 用户进入实验时间
        
        返回:
            DataFrame包含每次测量的时间点、阶段、指标
        """
        schedule_list = []
        
        for phase, config in self.measurement_plan.items():
            start_day, end_day = config['period']
            freq = config['frequency_days']
            
            # Compute the measurement time points for this phase
            current_day = start_day
            while current_day <= end_day:
                measurement_time = entry_time + timedelta(days=current_day)
                
                # Skip time points that are already in the past
                if measurement_time > datetime.now():
                    schedule_list.append({
                        'user_id': user_id,
                        'phase': phase,
                        'measurement_day': current_day,
                        'scheduled_time': measurement_time,
                        'metrics': config['metrics'],
                        'include_survey': config['survey'],
                        'status': 'pending'
                    })
                
                current_day += freq
        
        return pd.DataFrame(schedule_list)
    
    def get_upcoming_measurements(self, hours_ahead: int = 24) -> pd.DataFrame:
        """
        获取未来指定时间内的待执行测量任务
        
        用于批量调度数据收集作业
        """
        now = datetime.now()
        cutoff = now + timedelta(hours=hours_ahead)
        
        # Simplified for demonstration; in practice query these from a database.
        # Assume a DataFrame holding every generated schedule.
        all_schedules = pd.DataFrame(columns=['user_id', 'scheduled_time', 'status'])  # load from DB in production
        
        upcoming = all_schedules[
            (all_schedules['scheduled_time'] <= cutoff) &
            (all_schedules['status'] == 'pending')
        ]
        
        return upcoming

# Usage example
if __name__ == "__main__":
    # Initialize the scheduler
    experiment_start = datetime(2024, 1, 15)
    scheduler = LongTermMeasurementScheduler(experiment_start)
    
    # Generate a measurement plan for a new user
    user_id = "user_12345"
    user_entry_time = datetime(2024, 1, 20, 14, 30)
    
    user_schedule = scheduler.generate_schedule(user_id, user_entry_time)
    print(f"Measurement plan for user {user_id}:")
    print(user_schedule.head(10))
    
    # Count measurements per phase
    phase_counts = user_schedule['phase'].value_counts()
    print("\nMeasurements per phase:")
    print(phase_counts)

Code walkthrough

This code implements the core of an intelligent measurement-scheduling system. The key design points:

  1. A four-phase protocol: long-term monitoring is split into immediate reaction, short-term adaptation, medium-term stabilization and long-term value, each with its own measurement frequency and metric set. This avoids a one-size-fits-all strategy: data is collected densely early on to capture immediate effects, and less often later to reduce user disturbance.

  2. Dynamic scheduling: the generate_schedule method builds a personalized plan from each user's actual enrollment time rather than measuring everyone on the same calendar dates, which handles the practical reality of users rolling into the experiment continuously.

  3. Batch-friendly operation: the get_upcoming_measurements method supports batch queries of soon-to-run measurement tasks and can be plugged into Airflow or cron jobs for automated data-collection scheduling.

II.III Time-Varying Confounding Adjustment

The most dangerous threat in long-term experiments is time-varying confounding: variables that arise after the experiment starts and affect both treatment and outcome. We use marginal structural models (MSM) combined with inverse probability of treatment weighting (IPTW).

Core idea: instead of adjusting only for baseline confounders, adjust for the probability of each user's observed treatment-assignment history.
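
In standard MSM notation (this is the textbook form of the stabilized weight, matching the formula quoted in the code's docstring below rather than anything additional from the article), the weight for user i at time t is:

SW_i(t) = \prod_{s=0}^{t}
  \frac{P\big(A_i(s) = a_i(s) \mid \bar{A}_i(s-1)\big)}
       {P\big(A_i(s) = a_i(s) \mid \bar{A}_i(s-1), \bar{L}_i(s)\big)}

where A_i(s) is the treatment at time s, \bar{A}_i(s-1) the treatment history, and \bar{L}_i(s) the history of time-varying confounders; the outcome model is then fit by regression weighted with SW_i(t).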

import numpy as np
import pandas as pd
from typing import Dict
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

class TimeVaryingConfoundingAdjuster:
    """
    时变混淆调整器
    使用逆概率加权处理长期实验中的时变混淆
    """
    
    def __init__(self, stabilized: bool = True):
        self.stabilized = stabilized
        self.propensity_models = {}
        self.scaler = StandardScaler()
    
    def fit_propensity_models(self, panel_data: pd.DataFrame) -> Dict[int, LogisticRegression]:
        """
        为每个时间点拟合倾向得分模型
        
        参数:
            panel_data: 面板数据,需包含:
                - user_id: 用户ID
                - time: 时间点
                - treatment: 处理状态(0/1)
                - 时变协变量: 如过去行为、外部冲击等
        
        返回:
            每个时间点的倾向得分模型字典
        """
        # Make sure the data is sorted
        panel_data = panel_data.sort_values(['user_id', 'time'])
        
        # Fit a model for each time point
        unique_times = sorted(panel_data['time'].unique())
        propensity_scores = pd.DataFrame()
        
        for t in unique_times:
            # Data at time t
            t_data = panel_data[panel_data['time'] == t].copy()
            
            # Features (past treatment history and covariates)
            feature_cols = [col for col in t_data.columns 
                           if col not in ['user_id', 'time', 'treatment']]
            
            X = t_data[feature_cols]
            y = t_data['treatment']
            
            # Standardize (note: the scaler is refit at every time point)
            X_scaled = self.scaler.fit_transform(X)
            
            # Fit a logistic regression
            model = LogisticRegression(max_iter=1000, class_weight='balanced')
            model.fit(X_scaled, y)
            
            # Predicted propensity scores
            ps = model.predict_proba(X_scaled)[:, 1]
            
            # Store the results
            t_data['propensity_score'] = ps
            propensity_scores = pd.concat([propensity_scores, t_data[['user_id', 'time', 'propensity_score', 'treatment']]])
            
            self.propensity_models[t] = model
        
        return propensity_scores
    
    def calculate_weights(self, propensity_scores: pd.DataFrame) -> pd.DataFrame:
        """
        计算逆概率权重(IPW)和稳定化权重
        
        实现考虑时变处理和时变混淆的IPW公式:
        
        SW_i(t) = ∏[P(A_i(s)=a_i(s)) / P(A_i(s)=a_i(s) | L_i(s))] 
        for s=0 to t
        
        其中A_i(s)是时间s的处理,L_i(s)是时间s的混淆变量
        """
        # Probability of each user's treatment history
        weights_data = []
        
        for user_id, group in propensity_scores.groupby('user_id'):
            group = group.sort_values('time')
            
            # Denominator: conditional probability of treatment given covariates
            denominator = 1.0
            numerator = 1.0
            
            for idx, row in group.iterrows():
                t = row['time']
                treatment = row['treatment']
                ps = row['propensity_score']
                
                # Denominator: probability of the observed treatment status at time t
                denominator *= (ps if treatment == 1 else (1 - ps))
                
                # Numerator: marginal probability (when stabilizing)
                if self.stabilized:
                    # Marginal treatment probability at time t
                    marginal_prob = propensity_scores[propensity_scores['time'] == t]['treatment'].mean()
                    numerator *= (marginal_prob if treatment == 1 else (1 - marginal_prob))
            
            # Stabilized weight = numerator / denominator
            weight = numerator / denominator if self.stabilized else 1 / denominator
            
            weights_data.append({
                'user_id': user_id,
                'final_weight': weight,
                'denominator': denominator,
                'numerator': numerator
            })
        
        return pd.DataFrame(weights_data)
    
    def adjust_estimates(self, outcome_data: pd.DataFrame, weights: pd.DataFrame, 
                         time_point: int) -> Dict[str, float]:
        """
        使用IPW权重调整效应估计
        
        参数:
            outcome_data: 包含user_id, time, outcome的数据
            weights: calculate_weights的输出
            time_point: 评估的时间点
        
        返回:
            加权后的处理效应估计
        """
        # Merge outcomes with weights
        merged = outcome_data[outcome_data['time'] == time_point].merge(
            weights, on='user_id', how='inner'
        )
        
        # Weighted means by treatment status
        treatment_means = merged.groupby('treatment').apply(
            lambda x: np.average(x['outcome'], weights=x['final_weight'])
        )
        
        # Weighted ATE
        ate = treatment_means.loc[1] - treatment_means.loc[0]
        
        # A weighted variance would use a sandwich estimator;
        # simplified here -- use a robust variance estimator in practice.
        n_treated = len(merged[merged['treatment'] == 1])
        n_control = len(merged[merged['treatment'] == 0])
        
        # Return the results
        return {
            'ate': ate,
            'treated_mean': treatment_means.loc[1],
            'control_mean': treatment_means.loc[0],
            'n_treated': n_treated,
            'n_control': n_control
        }

# Usage example
if __name__ == "__main__":
    # Generate simulated panel data
    np.random.seed(42)
    n_users = 1000
    n_timepoints = 12
    
    data = []
    for user_id in range(n_users):
        # Baseline characteristics
        baseline_engagement = np.random.normal(0.5, 0.2)
        
        for t in range(n_timepoints):
            # Time-varying covariates (affected by past treatment)
            if t == 0:
                lag_treatment = 0
                lag_engagement = baseline_engagement
            else:
                lag_treatment = np.random.binomial(1, 0.3) if t % 4 == 0 else 0
                lag_engagement = baseline_engagement + 0.1 * lag_treatment
            
            # Current treatment (affected by time-varying covariates, creating confounding)
            treatment_prob = 0.5 + 0.2 * lag_engagement - 0.1 * (t / n_timepoints)
            treatment = np.random.binomial(1, np.clip(treatment_prob, 0.1, 0.9))
            
            # Outcome (affected by both treatment and time-varying covariates)
            noise = np.random.normal(0, 0.1)
            outcome = 0.3 * treatment + 0.6 * lag_engagement + 0.1 * t + noise
            
            data.append({
                'user_id': user_id,
                'time': t,
                'treatment': treatment,
                'lag_engagement': lag_engagement,
                'lag_treatment': lag_treatment,
                'outcome': outcome
            })
    
    panel_df = pd.DataFrame(data)
    
    # Apply the time-varying confounding adjustment
    adjuster = TimeVaryingConfoundingAdjuster(stabilized=True)
    
    # Fit the propensity-score models
    ps_df = adjuster.fit_propensity_models(panel_df)
    print("Propensity-score models fitted")
    
    # Compute the weights
    weights_df = adjuster.calculate_weights(ps_df)
    print(f"Weights computed, mean weight: {weights_df['final_weight'].mean():.2f}")
    
    # Adjusted effect estimate at time t=6
    results = adjuster.adjust_estimates(panel_df, weights_df, time_point=6)
    print("\nAdjusted estimates at time t=6:")
    for key, value in results.items():
        print(f"  {key}: {value:.4f}" if isinstance(value, float) else f"  {key}: {value}")

Mermaid summary (diagram): starting from the raw panel data, identify the time-varying covariates, fit a propensity-score model at each time point, compute IPW weights, apply stabilization, and produce weighted effect estimates with sandwich-type variances. The core challenges flagged in the diagram are network interference, treatment decay and sample attrition.

III. Case Study: An E-Commerce User Growth Experiment

III.I Experiment Background and Objectives

Business scenario: a cross-border e-commerce platform (code name "GlobalShop") plans to launch a "smart shopping assistant" that helps users discover products through AI conversations. The product team needs to evaluate the feature's impact on long-term user value, not just short-term clicks.

Core challenges:

  • Long-term value evaluation (180-day LTV)
  • Users may develop feature fatigue
  • Seasonal shopping behavior interferes (the experiment spans the Q4 shopping season)
  • Sample attrition is expected to reach 40% (users uninstalling the app)

Limitation of a conventional A/B test: measuring only 30-day metrics would likely overestimate the effect, because users engage heavily out of novelty at first but their reliance on the feature may decline over time.

III.II Experimental Design Details

Stratification strategy: five strata based on historical user behavior

Stratum | Definition | Sample share | Design rationale
Stratum 1 | High-value users (GMV > ¥5,000 in the past 180 days) | 20% | Protect core users from negative impact
Stratum 2 | Active users (>15 visits in the past 30 days) | 25% | Ensure sufficient signal
Stratum 3 | Regular users (5-15 visits in the past 30 days) | 30% | Primary target group
Stratum 4 | Low-frequency users (<5 visits in the past 30 days) | 15% | Explore growth potential
Stratum 5 | New users (registered <30 days ago) | 10% | Evaluate the new-user effect separately

Time-block design: the experiment runs for 240 days, split into 8 time blocks of 30 days each. Within each block, newly registered users are randomized independently to prevent confounding from the shopping season.

Sample-size calculation: using a group sequential design

from statsmodels.stats.power import tt_solve_power
import numpy as np

def calculate_long_term_sample_size(
    effect_size: float,
    alpha: float = 0.05,
    power: float = 0.8,
    expected_attrition: float = 0.4,
    n_timepoints: int = 6,
    corr_structure: str = 'AR1'
) -> int:
    """
    计算考虑样本流失的样本量
    
    关键洞察:
    1. 样本流失会导致有效样本量减少
    2. 重复测量间的相关性影响方差
    3. 需要调整效应量以反映衰减
    """
    
    # 1. Base sample size (single time point)
    # using the t-test sample-size formula
    n_per_group_basic = tt_solve_power(
        effect_size=effect_size,
        alpha=alpha,
        power=power,
        alternative='larger'
    )
    
    # 2. Adjust for attrition
    # final analyzable sample = initial * (1 - attrition)
    # so initial sample = final / (1 - attrition)
    n_per_group_with_attrition = n_per_group_basic / (1 - expected_attrition)
    
    # 3. Adjust for correlation between repeated measurements
    # using a GEE-style variance inflation factor
    if corr_structure == 'AR1':
        # AR(1) correlation structure: corr = rho^|t-s|
        rho = 0.7  # assumed correlation between adjacent time points
        variance_inflation = (1 + (n_timepoints - 1) * rho) / n_timepoints
    elif corr_structure == 'CS':
        # Compound-symmetry structure
        rho = 0.5
        variance_inflation = 1 + (n_timepoints - 1) * rho
    else:
        variance_inflation = 1
    
    n_per_group_final = n_per_group_with_attrition * variance_inflation
    
    # 4. Account for effect decay
    # assume the effect decays exponentially: effect(t) = effect_0 * exp(-lambda*t)
    decay_lambda = 0.1  # decay rate
    avg_effect_over_time = effect_size * np.mean([
        np.exp(-decay_lambda * t) for t in range(n_timepoints)
    ])
    
    # Rescale according to the average effect over time
    adjustment_ratio = effect_size / avg_effect_over_time
    n_per_group_final *= adjustment_ratio
    
    return int(np.ceil(n_per_group_final))

# Sample size for the GlobalShop case
if __name__ == "__main__":
    # Expected 180-day LTV effect size of 0.2 (Cohen's d)
    # Expected 40% attrition
    # Six measurement time points (days 30, 60, 90, 120, 150, 180)
    
    n_needed = calculate_long_term_sample_size(
        effect_size=0.2,
        expected_attrition=0.4,
        n_timepoints=6,
        corr_structure='AR1'
    )
    
    print(f"每组需要样本量: {n_needed:,}")
    print(f"总样本量(2组): {2*n_needed:,}")
    
    # Per-stratum allocation
    strata_weights = [0.2, 0.25, 0.3, 0.15, 0.1]
    for i, weight in enumerate(strata_weights, 1):
        strata_n = int(n_needed * weight)
        print(f"  分层{i}样本量: {strata_n:,}")

Interpretation: the calculation shows each group needs roughly 3,850 initial users; after 40% attrition, the final analysis sample is about 2,310 per group. Stratum 1 (high-value users) needs 770 initial users, a level chosen to protect the core user base.
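
A quick arithmetic check on the figures above (the 3,850 / 40% / stratum-share numbers come from the article; the check itself is plain bookkeeping):

n_initial_per_group = 3850
attrition = 0.40
strata_shares = {1: 0.20, 2: 0.25, 3: 0.30, 4: 0.15, 5: 0.10}

print(f"Analyzable per group: {n_initial_per_group * (1 - attrition):.0f}")   # 2310

for stratum, share in strata_shares.items():
    print(f"Stratum {stratum}: {n_initial_per_group * share:.0f} initial users")
# Stratum 1 (high-value users): 3850 * 0.20 = 770, matching the text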

III.III Data-Collection Architecture and Implementation

GlobalShop built an event-driven data pipeline designed specifically for the sparse data of long-term experiments:

import json
from typing import Dict, Any, List
from kafka import KafkaProducer
import redis
import boto3
from datetime import datetime, timedelta

class LongTermDataCollector:
    """
    长期实验数据收集器
    设计特点:
    1. 支持断点续传(处理用户离线情况)
    2. 去重机制(防止重复事件)
    3. 灵活的模式演化(支持新增指标)
    """
    
    def __init__(self, config: Dict[str, Any]):
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=config['kafka_servers'],
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
        )
        self.redis_client = redis.Redis(
            host=config['redis_host'],
            port=config['redis_port']
        )
        self.s3_client = boto3.client('s3')
        self.checkpoint_bucket = config['checkpoint_bucket']
        self.event_ttl_days = config.get('event_ttl', 365)  # keep for one year
        
    def collect_long_term_outcome(self, user_id: str, event_type: str, 
                                 event_data: Dict[str, Any]) -> bool:
        """
        收集长期结果变量
        
        去重逻辑:
        1. 使用Redis记录已处理事件的指纹
        2. 指纹有效期根据事件类型动态设置
        """
        # 生成事件指纹(去重键)
        event_fingerprint = self._generate_fingerprint(user_id, event_type, event_data)
        
        # 检查是否已处理(去重)
        cache_key = f"lt_event:{event_fingerprint}"
        if self.redis_client.exists(cache_key):
            return False  # 重复事件
        
        # 丰富事件元数据
        enriched_event = {
            'user_id': user_id,
            'event_type': event_type,
            'event_timestamp': datetime.utcnow().isoformat(),
            'experiment_id': event_data.get('experiment_id'),
            'treatment_group': event_data.get('treatment_group'),
            'metrics': event_data.get('metrics', {}),
            'metadata': {
                'session_id': event_data.get('session_id'),
                'app_version': event_data.get('app_version'),
                'network_type': event_data.get('network_type')
            }
        }
        
        # Send to a Kafka topic (partitioned by month)
        topic = f"long_term_metrics_{datetime.now().strftime('%Y_%m')}"
        future = self.kafka_producer.send(topic, enriched_event)
        
        # Mark the event as processed (TTL depends on the experiment phase)
        # immediate events: TTL = 7 days; long-term events: TTL = 365 days
        ttl = 7*24*3600 if 'immediate' in event_type else self.event_ttl_days*24*3600
        self.redis_client.setex(cache_key, int(ttl), '1')
        
        try:
            future.get(timeout=10)
            return True
        except Exception as e:
            print(f"事件发送失败: {e}")
            return False
    
    def _generate_fingerprint(self, user_id: str, event_type: str, 
                             event_data: Dict) -> str:
        """生成事件唯一指纹"""
        import hashlib
        
        # 对于支付事件,包含订单ID
        if event_type == 'purchase':
            unique_str = f"{user_id}:{event_type}:{event_data.get('order_id')}"
        # 对于留存事件,包含测量窗口
        elif event_type == 'retention_check':
            window = event_data.get('measurement_window')
            unique_str = f"{user_id}:{event_type}:{window}"
        else:
            # 其他事件使用时间戳(精确到小时)
            hour_bucket = datetime.now().strftime('%Y%m%d%H')
            unique_str = f"{user_id}:{event_type}:{hour_bucket}"
        
        return hashlib.md5(unique_str.encode()).hexdigest()
    
    def recover_lost_measurements(self, user_id: str, 
                                 missed_windows: List[int]) -> List[Dict]:
        """
        恢复错过的测量窗口
        
        当用户离线后重新上线时,尝试补采缺失的长期数据
        关键场景:用户卸载后重装App
        """
        recovered_data = []
        
        for window in missed_windows:
            # 从S3加载检查点数据
            checkpoint_key = f"experiment_payt/data/global_shop_assistant/{user_id}/window_{window}.json"
            
            try:
                response = self.s3_client.get_object(
                    Bucket=self.checkpoint_bucket,
                    Key=checkpoint_key
                )
                checkpoint_data = json.loads(response['Body'].read())
                
                # Validate data integrity
                if self._validate_checkpoint(checkpoint_data):
                    # Build a recovery event
                    recovery_event = {
                        'user_id': user_id,
                        'event_type': 'recovered_measurement',
                        'original_window': window,
                        'recovery_time': datetime.utcnow().isoformat(),
                        'metrics': checkpoint_data.get('metrics'),
                        'data_quality': 'recovered'
                    }
                    recovered_data.append(recovery_event)
                    
                    # Re-emit into the data stream
                    self.kafka_producer.send('recovered_metrics', recovery_event)
                    
            except self.s3_client.exceptions.NoSuchKey:
                print(f"检查点不存在: {checkpoint_key}")
                continue
        
        return recovered_data
    
    def _validate_checkpoint(self, data: Dict) -> bool:
        """验证检查点数据的完整性"""
        required_fields = ['user_id', 'measurement_window', 'metrics', 'last_updated']
        
        for field in required_fields:
            if field not in data:
                return False
        
        # Check freshness (no more than 30 days old)
        last_updated = datetime.fromisoformat(data['last_updated'])
        if datetime.now() - last_updated > timedelta(days=30):
            return False
        
        return True

# Practical usage
if __name__ == "__main__":
    # Configuration
    config = {
        'kafka_servers': ['kafka01.globalshop.com:9092'],
        'redis_host': 'redis.globalshop.com',
        'redis_port': 6379,
        'checkpoint_bucket': 'globalshop-experiment-data',
        'event_ttl': 365
    }
    
    collector = LongTermDataCollector(config)
    
    # Scenario 1: routine data collection
    event_data = {
        'experiment_id': 'exp_global_assistant_v1',
        'treatment_group': 'treatment',
        'session_id': 'sess_abc123',
        'app_version': '3.8.1',
        'metrics': {
            'ltv_180d': 528.50,
            'purchase_frequency': 12,
            'avg_order_value': 88.75,
            'nps_score': 8
        }
    }
    
    success = collector.collect_long_term_outcome(
        user_id='user_67890',
        event_type='long_term_measurement',
        event_data=event_data
    )
    print(f"数据收集成功: {success}")
    
    # 场景2:恢复错过的测量
    missed_windows = [3, 4, 5]  # 用户在30-90天期间离线
    recovered = collector.recover_lost_measurements('user_67890', missed_windows)
    print(f"恢复数据点数: {len(recovered)}")

Architecture deep dive

  1. Deduplication: event fingerprints are stored in Redis with TTLs set per event type. High-value events such as payments use the order ID as the dedup key; periodic events such as retention checks use the measurement window. This handles duplicate reports caused by network retries on mobile.

  2. Data recovery: the recover_lost_measurements method addresses a core pain point of long-term experiments -- users who churn and later return. With checkpoints stored in S3, part of the historical data can be restored when a user reinstalls the app, reducing sample-selection bias. Checkpoints are refreshed every 30 days and contain cumulative values of the key metrics.

  3. Schema evolution: Kafka topics are partitioned by month (long_term_metrics_2024_01) and support schema evolution. With Avro or Protobuf serialization, new metrics can be added without breaking compatibility.

III.IV Effect Estimation and Sensitivity Analysis

GlobalShop estimates long-term causal effects with a hierarchical Bayesian model combined with survival analysis:

import pymc as pm
import pandas as pd
import numpy as np
from typing import Dict, Any
from lifelines import CoxPHFitter
import arviz as az

class LongTermEffectEstimator:
    """
    长期效应估计器
    整合贝叶斯层次模型与生存分析
    
    模型特点:
    1. 分层建模(考虑用户异质性)
    2. 时间动态效应(非恒定效应)
    3. 右删失处理(样本流失)
    """
    
    def __init__(self, n_strata: int = 5):
        self.n_strata = n_strata
        self.survival_model = CoxPHFitter()
        self.bayesian_model = None
    
    def prepare_panel_data(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        """
        准备面板数据
        
        关键转换:
        1. 将宽格式转为长格式
        2. 创建流失标记(censoring indicator)
        3. 计算时间相对实验开始的偏移
        """
        # 假设raw_data是宽格式:user_id, treatment, day_30, day_60, ..., day_180, churned
        panel_data = []
        
        for _, row in raw_data.iterrows():
            user_id = row['user_id']
            treatment = row['treatment']
            strata = row['strata']
            
            # Measurement time points
            timepoints = [30, 60, 90, 120, 150, 180]
            
            for i, t in enumerate(timepoints):
                outcome_col = f'day_{t}'
                
                # If the user churned before this time point, mark as censored
                if pd.isna(row[outcome_col]):
                    # Use the last time point with data as the censoring time
                    censored_time = timepoints[i-1] if i > 0 else 0
                    panel_data.append({
                        'user_id': user_id,
                        'time': censored_time,
                        'outcome': np.nan,
                        'treatment': treatment,
                        'strata': strata,
                        'censored': 1,
                        'observed': 0
                    })
                    break  # no data at later time points either
                
                panel_data.append({
                    'user_id': user_id,
                    'time': t,
                    'outcome': row[outcome_col],
                    'treatment': treatment,
                    'strata': strata,
                    'censored': 0,
                    'observed': 1
                })
        
        return pd.DataFrame(panel_data)
    
    def fit_bayesian_hierarchical_model(self, panel_data: pd.DataFrame) -> pm.Model:
        """
        拟合分层贝叶斯模型
        
        模型结构:
        outcome_it ~ Normal(μ_it, σ)
        μ_it = α_strata[i] + γ_i + τ_treatment[i] * treatment_it + β_time * time_it
        
        其中:
        - α_strata: 分层基线(共享先验)
        - γ_i: 个体随机效应
        - τ_treatment: 处理效应(随时间可能变化)
        - β_time: 时间趋势
        """
        # 准备数据
        n_users = panel_data['user_id'].nunique()
        user_ids = panel_data['user_id'].astype('category').cat.codes.values
        strata_ids = panel_data['strata'].astype('category').cat.codes.values
        
        treatment_values = panel_data['treatment'].values
        time_values = panel_data['time'].values / 180  # 标准化到[0,1]
        outcomes = panel_data['outcome'].values
        
        # Use only observed (non-censored) rows
        observed_mask = panel_data['observed'].astype(bool).values
        
        with pm.Model() as model:
            # Hyperpriors
            # global mean
            mu_alpha = pm.Normal('mu_alpha', mu=0, sigma=1)
            sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)
            
            # Stratum baseline effects
            alpha_strata = pm.Normal('alpha_strata', 
                                     mu=mu_alpha, 
                                     sigma=sigma_alpha, 
                                     shape=self.n_strata)
            
            # Individual random effects
            sigma_gamma = pm.HalfNormal('sigma_gamma', sigma=0.5)
            gamma = pm.Normal('gamma', mu=0, sigma=sigma_gamma, shape=n_users)
            
            # Time trend
            beta_time = pm.Normal('beta_time', mu=0, sigma=0.5)
            
            # Treatment effect (fixed effect)
            # allowed to vary over time: tau(t) = tau0 + tau1 * t
            tau0 = pm.Normal('tau0', mu=0, sigma=0.5)  # baseline treatment effect
            tau1 = pm.Normal('tau1', mu=0, sigma=0.3)  # time trend of the effect
            
            # Treatment effect interacted with (normalized) time
            tau_treatment = tau0 + tau1 * time_values
            
            # Linear predictor
            mu = (alpha_strata[strata_ids] + 
                  gamma[user_ids] + 
                  tau_treatment * treatment_values +
                  beta_time * time_values)
            
            # Likelihood (observed data only)
            sigma = pm.HalfNormal('sigma', sigma=1)
            
            # Handle censoring: fit only the observed outcomes
            outcome_observed = pm.Normal(
                'outcome_observed',
                mu=mu[observed_mask],
                sigma=sigma,
                observed=outcomes[observed_mask]
            )
            
            # Track the treatment-effect trajectory on the measurement grid
            time_grid = np.array([30, 60, 90, 120, 150, 180]) / 180.0
            pm.Deterministic('ATE_t', tau0 + tau1 * time_grid)
            pm.Deterministic('ATE_overall', tau0 + tau1 * time_grid.mean())
            
            self.bayesian_model = model
        
        return model
    
    def fit_survival_analysis(self, survival_data: pd.DataFrame) -> None:
        """
        拟合生存分析模型评估对流失的影响
        
        模型公式:
        h(t|X) = h0(t) * exp(β_treatment * treatment + β_strata * strata)
        """
        # 准备生存数据
        # survival_data需要包含:user_id, duration, churned, treatment, strata
        
        self.survival_model.fit(
            survival_data,
            duration_col='duration',
            event_col='churned',
            formula='treatment + strata',
            show_progress=True
        )
        
        # Extract the treatment effect
        treatment_hr = self.survival_model.hazard_ratios_['treatment']
        treatment_ci = self.survival_model.confidence_intervals_.loc['treatment']
        
        print(f"\nTreatment hazard ratio: {treatment_hr:.3f}")
        print(f"95% CI: [{treatment_ci.iloc[0]:.3f}, {treatment_ci.iloc[1]:.3f}]")
    
    def run_full_analysis(self, raw_data: pd.DataFrame) -> Dict[str, Any]:
        """
        运行完整的长期效应分析
        """
        results = {}
        
        # Step 1: prepare the panel data
        panel_data = self.prepare_panel_data(raw_data)
        print(f"面板数据准备完成:{len(panel_data)}行")
        
        # Step 2: Bayesian analysis
        with self.fit_bayesian_hierarchical_model(panel_data):
            trace = pm.sample(
                draws=2000,
                chains=4,
                cores=4,
                target_accept=0.95,
                return_inferencedata=True
            )
        
        # Extract the key results
        ate_trace = trace.posterior['ATE_overall'].values
        results['ate_mean'] = np.mean(ate_trace)
        results['ate_hdi_95'] = az.hdi(ate_trace, hdi_prob=0.95)
        results['trace'] = trace
        
        # Examine the time trend of the effect
        ate_t = trace.posterior['ATE_t'].values
        # Does the effect decay over time?
        later_effect = ate_t[:, :, -1]  # last time point
        early_effect = ate_t[:, :, 0]   # first time point
        decay_prob = np.mean(later_effect < early_effect * 0.8)
        results['decay_probability'] = decay_prob
        
        # Step 3: survival analysis
        # prepare the survival data
        survival_data = raw_data[['user_id', 'treatment', 'strata', 'duration', 'churned']].copy()
        self.fit_survival_analysis(survival_data)
        results['survival_summary'] = self.survival_model.summary
        
        return results

# GlobalShop case analysis
if __name__ == "__main__":
    # Simulate the GlobalShop experiment data
    np.random.seed(42)
    n_users = 8000  # 4000 per group
    
    # Generate baseline data
    data = []
    for user_id in range(n_users):
        # Assign a stratum
        strata = np.random.choice([1, 2, 3, 4, 5], p=[0.2, 0.25, 0.3, 0.15, 0.1])
        
        # Assign treatment (balanced within strata)
        treatment = np.random.binomial(1, 0.5)
        
        # Generate time-series outcomes (with effect decay)
        # true effect: 15% for the first 60 days, decaying towards 5% afterwards
        base_ltv = np.random.lognormal(mean=6, sigma=0.8)  # baseline LTV
        
        measurements = {}
        for t in [30, 60, 90, 120, 150, 180]:
            if t <= 60:
                effect = 0.15
            else:
                effect = 0.15 * np.exp(-0.02 * (t - 60))
            
            noise = np.random.normal(0, 0.1)
            ltv_at_t = base_ltv * (1 + effect * treatment) * (1 + 0.001 * t) + noise
            
            # Simulate attrition (censoring)
            # churn is slightly higher in the treatment group (feature complexity)
            churn_prob = 0.35 + 0.05 * treatment
            if np.random.random() < churn_prob and t >= 90:
                # churn after day 90, somewhere in the 30 days before this measurement
                measurements[f'day_{t}'] = np.nan
                measurements['duration'] = np.random.randint(t - 30, t)
                measurements['churned'] = 1
                break
            else:
                measurements[f'day_{t}'] = max(ltv_at_t, 0)
                measurements['duration'] = 180
                measurements['churned'] = 0
        
        record = {
            'user_id': user_id,
            'treatment': treatment,
            'strata': strata,
            **measurements
        }
        data.append(record)
    
    raw_df = pd.DataFrame(data)
    print(f"数据生成完成:{len(raw_df)}用户")
    
    # 执行分析
    estimator = LongTermEffectEstimator(n_strata=5)
    results = estimator.run_full_analysis(raw_df)
    
    # 结果解读
    print("\n" + "="*60)
    print("GLOBALSHOP长期效应分析结果")
    print("="*60)
    print(f"平均处理效应(ATE): ¥{results['ate_mean']:.2f}")
    print(f"95%可信区间: [¥{results['ate_hdi_95'][0]:.2f}, ¥{results['ate_hdi_95'][1]:.2f}]")
    print(f"效应衰减概率(后期<早期80%): {results['decay_probability']:.1%}")
    
    # Visualize the time trend of the effect
    import matplotlib.pyplot as plt
    
    trace = results['trace']
    ate_t = trace.posterior['ATE_t'].values.mean(axis=(0, 1))
    timepoints = [30, 60, 90, 120, 150, 180]
    
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(timepoints, ate_t, marker='o', linewidth=2)
    plt.axhline(y=0, color='r', linestyle='--', alpha=0.5)
    plt.xlabel('Day of experiment')
    plt.ylabel('Treatment effect (¥)')
    plt.title('Time trend of the treatment effect')
    plt.grid(True, alpha=0.3)
    
    # Survival curves
    plt.subplot(1, 2, 2)
    survival_plot = estimator.survival_model.plot()
    plt.title('User retention by treatment group')
    plt.xlabel('Days')
    plt.ylabel('Retention rate')
    
    plt.tight_layout()
    plt.savefig('global_shop_analysis.png', dpi=300, bbox_inches='tight')
    print("\n分析图表已保存: global_shop_analysis.png")

(A Mermaid summary diagram failed to render here; its key message was that short-term effect ≠ long-term value.)

IV. Code Deployment Walkthrough

IV.I Environment Setup and Dependency Management

Component | Version | Purpose | Alternatives
Python | 3.10+ | Core language | R 4.3+
PyMC | 5.10+ | Bayesian inference | Stan, TensorFlow Probability
Kafka | 3.5+ | Data streaming | AWS Kinesis, Pulsar
Redis | 7.2+ | Dedup cache | DynamoDB, Cassandra
PostgreSQL | 15+ | Data warehouse | BigQuery, Snowflake
Airflow | 2.7+ | Workflow scheduling | Prefect, Dagster

Environment setup script

#!/bin/bash
# setup_long_term_experiment_env.sh
# One-shot setup script for the long-term experiment monitoring environment

set -e

echo "=== Deploying the long-term causal evaluation environment ==="

# 1. Create a Python virtual environment
python3.10 -m venv lt_causal_env
source lt_causal_env/bin/activate

# 2. Install core tooling
pip install --upgrade pip wheel setuptools

# 3. Scientific computing stack
pip install numpy==1.24.3 pandas==2.0.3 scipy==1.11.2

# 4. Bayesian inference
pip install pymc==5.10.0 arviz==0.15.1

# 5. Survival analysis
pip install lifelines==0.28.0

# 6. Data pipeline
pip install kafka-python==2.0.2 redis==5.0.1 boto3==1.28.85

# 7. Visualization
pip install matplotlib==3.7.2 seaborn==0.13.0 plotly==5.17.0

# 8. Database connectivity
pip install psycopg2-binary==2.9.9 sqlalchemy==2.0.21

# 9. Workflow scheduling
pip install apache-airflow==2.7.2

# 10. Verify the installation
python -c "
import pymc, lifelines, kafka, redis
print('✓ Core libraries installed successfully')
print(f'PyMC version: {pymc.__version__}')
print(f'Lifelines version: {lifelines.__version__}')
"

# 11. Create the project structure
mkdir -p long_term_causal/{src,config,data/{raw,processed,checkpoints},tests,notebooks}

echo "=== Project structure created ==="
tree long_term_causal -L 2

# 12. Configure environment variables
cat > long_term_causal/.env << EOF
EXPERIMENT_ID=exp_global_shop_assistant_v1
KAFKA_SERVERS=kafka01.prod:9092,kafka02.prod:9092
REDIS_HOST=redis.prod.internal
REDIS_PORT=6379
POSTGRES_HOST=warehouse.prod.internal
POSTGRES_PORT=5432
POSTGRES_DB=experiment_db
AWS_S3_BUCKET=experiment-checkpoints
AIRFLOW_HOME=/opt/airflow
EOF

echo "=== Deployment finished. Edit the .env file with your actual settings ==="

IV.II Docker Containerized Deployment

# Dockerfile.long-term-causal
FROM python:3.10-slim-bookworm

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create an application user (run as non-root)
RUN groupadd -r expuser && useradd -r -g expuser expuser

# Set the working directory
WORKDIR /app

# Copy the dependency file
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY src/ ./src/
COPY config/ ./config/
COPY .env .

# Change file ownership
RUN chown -R expuser:expuser /app

# Switch to the non-root user
USER expuser

# Expose the API port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health')"

# Start command
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000", "--log-level", "info"]

Docker Compose orchestration

# docker-compose.long-term-experiment.yml
version: '3.8'

services:
  # Long-term effect evaluation API
  causal-api:
    build:
      context: .
      dockerfile: Dockerfile.long-term-causal
    container_name: lt_causal_api
    environment:
      - DATABASE_URL=postgresql://exp_user:secure_pass@postgres:5432/experiment_db
      - REDIS_URL=redis://redis:6379
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - redis
      - kafka
    volumes:
      - ./data:/app/data
    networks:
      - experiment-net
    restart: unless-stopped
  
  # PostgreSQL data warehouse
  postgres:
    image: postgres:15-alpine
    container_name: lt_experiment_db
    environment:
      POSTGRES_USER: exp_user
      POSTGRES_PASSWORD: secure_pass
      POSTGRES_DB: experiment_db
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    networks:
      - experiment-net
    restart: unless-stopped
  
  # Redis cache
  redis:
    image: redis:7-alpine
    container_name: lt_redis_cache
    command: redis-server --appendonly yes --maxmemory 4gb --maxmemory-policy allkeys-lru
    volumes:
      - redisdata:/data
    ports:
      - "6379:6379"
    networks:
      - experiment-net
    restart: unless-stopped
  
  # Kafka message queue
  kafka:
    image: bitnami/kafka:3.5
    container_name: lt_kafka
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
    ports:
      - "9092:9092"
    volumes:
      - kafkadata:/bitnami/kafka
    networks:
      - experiment-net
    restart: unless-stopped
  
  # Airflow scheduler
  airflow-scheduler:
    image: apache/airflow:2.7.2-python3.10
    container_name: lt_airflow_scheduler
    command: scheduler
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CORE__FERNET_KEY: d6V1kEXAMPLEtcMJFau_6tG1sE6p9r1o4jff8=
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
    networks:
      - experiment-net
    restart: unless-stopped
  
  # Airflow Web UI
  airflow-webserver:
    image: apache/airflow:2.7.2-python3.10
    container_name: lt_airflow_web
    command: webserver
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres:5432/airflow
      AIRFLOW__WEBSERVER__SECRET_KEY: your-secret-key
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - postgres
      - airflow-scheduler
    networks:
      - experiment-net
    restart: unless-stopped

volumes:
  pgdata:
  redisdata:
  kafkadata:

networks:
  experiment-net:
    driver: bridge

Deployment commands

# Bring up the whole long-term experiment monitoring stack
docker-compose -f docker-compose.long-term-experiment.yml up -d

# Check service status
docker-compose -f docker-compose.long-term-experiment.yml ps

# Tail the logs
docker logs lt_causal_api -f --tail=100

# Stop the stack
docker-compose -f docker-compose.long-term-experiment.yml down

IV.III Airflow Workflow Scheduling

# dags/long_term_experiment_analysis.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import json
import pandas as pd
from src.long_term_analysis import LongTermEffectEstimator

default_args = {
    'owner': 'data-science',
    'depends_on_past': True,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=15),
    'execution_timeout': timedelta(hours=4)
}

with DAG(
    dag_id='long_term_experiment_analysis',
    default_args=default_args,
    description='Long-term experiment causal-effect analysis pipeline',
    schedule_interval='0 2 * * 1',  # every Monday at 02:00
    start_date=days_ago(7),
    catchup=False,
    max_active_runs=1,
    tags=['causal-inference', 'long-term']
) as dag:
    
    # Task 1: data quality check
    data_quality_check = PostgresOperator(
        task_id='data_quality_check',
        postgres_conn_id='experiment_db',
        sql="""
        -- Check data completeness
        SELECT 
            measurement_day,
            COUNT(*) as total_users,
            SUM(CASE WHEN outcome IS NULL THEN 1 ELSE 0 END) as missing_outcomes,
            SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) as churned_users
        FROM long_term_outcomes
        WHERE experiment_id = '{{ var.value.experiment_id }}'
        GROUP BY measurement_day
        HAVING SUM(CASE WHEN outcome IS NULL THEN 1 ELSE 0 END) > 0.3 * COUNT(*);
        """,
        on_failure_callback=lambda context: send_alert("Data quality anomaly")
    )
    
    # Task 2: propensity-score refresh
    def update_propensity_scores(**context):
        """Recompute propensity scores weekly (accounting for time-varying covariates)."""
        from src.confounding_adjustment import TimeVaryingConfoundingAdjuster
        
        # Load the latest data from the database
        query = """
        SELECT * FROM long_term_panel_data
        WHERE experiment_id = %s AND time <= %s
        """
        df = pd.read_sql(
            query, 
            con=get_db_connection(),
            params=(context['var']['experiment_id'], context['ds'])
        )
        
        # Fit the propensity-score models
        adjuster = TimeVaryingConfoundingAdjuster(stabilized=True)
        ps_df = adjuster.fit_propensity_models(df)
        
        # Write back to the database
        ps_df.to_sql(
            'propensity_scores',
            con=get_db_connection(),
            if_exists='replace',
            index=False
        )
        
        return f"更新了 {len(ps_df)} 条倾向得分记录"
    
    update_ps_task = PythonOperator(
        task_id='update_propensity_scores',
        python_callable=update_propensity_scores,
        pool='experiment_analysis_pool'
    )
    
    # Task 3: effect estimation
    def estimate_long_term_effects(**context):
        """Run the main analysis."""
        estimator = LongTermEffectEstimator(n_strata=5)
        
        # Load the data
        raw_data = pd.read_sql(
            "SELECT * FROM long_term_outcomes_wide",
            con=get_db_connection()
        )
        
        # Run the analysis
        results = estimator.run_full_analysis(raw_data)
        
        # Save the results to S3
        s3_client = get_s3_client()
        s3_client.put_object(
            Bucket='experiment-results',
            Key=f"{context['var']['experiment_id']}/weekly_results_{context['ds']}.json",
            Body=json.dumps(results, default=str)
        )
        
        # Push to the BI tool
        push_to_bi_tool(results)
        
        return results
    
    estimate_task = PythonOperator(
        task_id='estimate_effects',
        python_callable=estimate_long_term_effects,
        trigger_rule='all_success'
    )
    
    # Task 4: sensitivity analysis
    def sensitivity_analysis(**context):
        """Check the robustness of the results."""
        from src.sensitivity import SensitivityAnalyzer
        
        analyzer = SensitivityAnalyzer()
        
        # Bounding analysis
        bounds = analyzer.robustness_bounds(
            outcome_db_connection=get_db_connection(),
            experiment_id=context['var']['experiment_id']
        )
        
        # Publication-bias check
        publication_bias = analyzer.egger_test(bounds)
        
        # E-value for unmeasured confounding
        e_value = analyzer.calculate_e_value(
            observed_effect=context['ti'].xcom_pull(task_ids='estimate_effects')['ate_mean'],
            ci=context['ti'].xcom_pull(task_ids='estimate_effects')['ate_hdi_95']
        )
        
        return {
            'robustness_bounds': bounds,
            'publication_bias_p': publication_bias,
            'e_value': e_value
        }
    
    sensitivity_task = PythonOperator(
        task_id='sensitivity_analysis',
        python_callable=sensitivity_analysis
    )
    
    # Task 5: report generation
    def generate_report(**context):
        """Generate the automated weekly report."""
        from src.reporting import LongTermExperimentReporter
        
        reporter = LongTermExperimentReporter(
            experiment_id=context['var']['experiment_id'],
            analysis_date=context['ds']
        )
        
        # Collect the results of all tasks
        effect_results = context['ti'].xcom_pull(task_ids='estimate_effects')
        sensitivity_results = context['ti'].xcom_pull(task_ids='sensitivity_analysis')
        
        # Generate the report
        report_path = reporter.generate_pdf_report(
            effect_results=effect_results,
            sensitivity_results=sensitivity_results,
            include_appendices=True
        )
        
        # Send the email
        reporter.send_report_email(
            recipients=context['var']['report_recipients'],
            attachment_path=report_path
        )
        
        return report_path
    
    report_task = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report,
        trigger_rule='all_success'
    )
    
    # Define the dependencies
    data_quality_check >> update_ps_task >> estimate_task
    estimate_task >> [sensitivity_task, report_task]

IV.IV Production Best Practices

Practice area | Recommended approach | Anti-pattern | Difficulty
Monitoring and alerting | Multi-stage delayed alerts (immediate/24h/7d/30d) | Single-threshold alerts | Medium
Data quality | Automated drift detection (triggered at PSI > 0.2) | Manual spot checks |
Model retraining | Quarterly retraining plus rolling-window updates | Static models | Medium
Capacity planning | Reserve 50% compute headroom for peaks | Scale on demand (slow cold start) |
Disaster recovery | Active-active data centers with RPO < 5 min | Single site with daily backups |
Security and compliance | Differential privacy (ε=1.0) plus audit logs | Exporting raw data directly | Medium
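
The data-quality row above triggers on PSI > 0.2. A minimal population-stability-index helper (illustrative, not from the original article) might look like this.

import numpy as np

def population_stability_index(expected: np.ndarray, actual: np.ndarray, n_bins: int = 10) -> float:
    """PSI between a baseline sample and a current sample of the same metric.
    Rule of thumb: < 0.1 stable, 0.1-0.2 moderate drift, > 0.2 significant drift."""
    edges = np.quantile(expected, np.linspace(0, 1, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf            # open-ended outer bins
    e_frac = np.histogram(expected, bins=edges)[0] / len(expected)
    a_frac = np.histogram(actual, bins=edges)[0] / len(actual)
    e_frac, a_frac = np.clip(e_frac, 1e-6, None), np.clip(a_frac, 1e-6, None)
    return float(np.sum((a_frac - e_frac) * np.log(a_frac / e_frac)))

rng = np.random.default_rng(5)
baseline = rng.normal(0, 1, 50_000)            # e.g. last month's engagement scores
current = rng.normal(0.4, 1.2, 50_000)         # current window with a shifted distribution
psi = population_stability_index(baseline, current)
print(f"PSI = {psi:.3f} -> {'ALERT: significant drift' if psi > 0.2 else 'OK'}")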

Mermaid summary (diagram): the production deployment architecture has four layers — an infrastructure layer (Docker containers, microservices, Kubernetes orchestration with autoscaling, S3/GCS object storage), a data-pipeline layer (Kafka stream processing for real-time ingestion, Airflow-scheduled batch jobs, a Redis cache for deduplication), an analysis-service layer (Bayesian inference backed by PyMC, a survival-analysis engine on lifelines, weight computation on a Spark cluster), and a monitoring layer (Prometheus metric collection, Grafana dashboards, PagerDuty alerts). Key design decisions: separating storage from compute, an event-sourcing architecture, and declarative infrastructure.

V. Advanced Techniques and Optimization Strategies

V.I Bayesian Adaptive Experimental Design

Traditional experiments use a fixed sample size, but long-term experiments can adapt as they go:

Stage | Action | Decision criterion | Statistical method
Day 30 | Early stopping (futility) | Posterior probability < 5% | Predictive probability (PPP)
Day 90 | Sample-size re-estimation | Insufficient precision of the effect estimate | Conditional power
Day 180 | Winner selection | Separation of credible intervals | Bayesian decision theory

import pymc as pm
import numpy as np
import pandas as pd
from typing import Dict, Any

class BayesianAdaptiveTrial:
    """
    贝叶斯自适应试验设计
    
    核心思想:
    1. 使用预测概率(Predictive Probability)评估试验成功可能性
    2. 基于中期数据动态调整样本量
    3. 考虑实际约束(最大样本量、时间成本)
    """
    
    def __init__(self, success_threshold: float = 0.95, 
                 futility_threshold: float = 0.05,
                 max_sample_size: int = 10000):
        self.success_threshold = success_threshold
        self.futility_threshold = futility_threshold
        self.max_n = max_sample_size
    
    def calculate_predictive_probability(self, interim_data: pd.DataFrame, 
                                        target_effect: float = 0.1) -> Dict[str, float]:
        """
        计算预测概率(PPP)
        
        步骤:
        1. 基于中期数据拟合后验分布
        2. 从后验预测分布模拟完整试验结果
        3. 计算达到显著性的比例
        """
        with pm.Model() as predictive_model:
            # 基于中期数据定义先验
            n_interim = len(interim_data)
            observed_effect = interim_data['outcome'].mean()
            
            # Hierarchical prior on the effect size
            mu = pm.Normal('mu', mu=0, sigma=0.2)
            sigma_between = pm.HalfNormal('sigma_between', sigma=0.1)
            
            # Individual random effects
            n_users = interim_data['user_id'].nunique()
            user_effects = pm.Normal('user_effects', mu=mu, sigma=sigma_between, 
                                     shape=n_users)
            
            # Likelihood
            sigma_within = pm.HalfNormal('sigma_within', sigma=0.5)
            outcomes = pm.Normal(
                'outcomes',
                mu=user_effects[interim_data['user_id'].astype('category').cat.codes],
                sigma=sigma_within,
                observed=interim_data['outcome'].values
            )
            
            # Posterior sampling
            with predictive_model:
                trace = pm.sample(2000, chains=4, cores=4, 
                                 target_accept=0.95, progressbar=False)
            
            # Predict future data (simplified: reuse posterior-predictive draws of the
            # observed outcomes rather than explicitly simulating remaining_n future users)
            remaining_n = self.max_n - n_interim
            ppc = pm.sample_posterior_predictive(
                trace,
                var_names=['outcomes'],
                model=predictive_model
            )
            
            # PPP: proportion of predictions reaching the target effect
            predicted_effects = ppc.posterior_predictive['outcomes'].mean(axis=(0, 1))
            ppp_success = (predicted_effects > target_effect).mean()
            ppp_futility = (predicted_effects < 0.01).mean()
            
        return {
            'predictive_probability_success': ppp_success,
            'predictive_probability_futility': ppp_futility,
            'recommended_action': self._make_adaptive_decision(ppp_success, ppp_futility)
        }
    
    def _make_adaptive_decision(self, ppp_success: float, 
                               ppp_futility: float) -> Dict[str, Any]:
        """基于PPP做出自适应决策"""
        if ppp_success > self.success_threshold:
            return {
                'decision': 'early_stop_success',
                'reason': 'predicted success probability above threshold',
                'continue_sampling': False
            }
        elif ppp_futility > self.futility_threshold:
            return {
                'decision': 'early_stop_futility',
                'reason': 'predicted futility probability above threshold',
                'continue_sampling': False
            }
        else:
            # Use conditional power to decide whether more samples are needed
            current_precision = self._estimate_precision()
            required_n = self._recalculate_sample_size(current_precision)
            
            return {
                'decision': 'continue_or_extend',
                'recommended_sample_size': min(required_n, self.max_n),
                'continue_sampling': True
            }
    
    def _estimate_precision(self) -> float:
        """Rough precision estimate: standard error of the mean interim outcome.
        (Referenced but not defined in the original text; simplified stand-in.)"""
        return float(self.current_data['outcome'].std() / np.sqrt(len(self.current_data)))
    
    def _recalculate_sample_size(self, current_precision: float, 
                                target_precision: float = 0.05) -> int:
        """
        Re-estimate the sample size from the interim data.

        Uses a conditional-power-style argument.
        """
        # Simplified re-estimation logic;
        # a real application would use a full conditional-power calculation.
        precision_ratio = current_precision / target_precision
        additional_samples_needed = int(len(self.current_data) * precision_ratio**2)
        
        return len(self.current_data) + additional_samples_needed

# Applying it to the GlobalShop case
if __name__ == "__main__":
    # Simulate interim data (the experiment has run for 90 days)
    np.random.seed(42)
    interim_n = 4000  # 4000 users collected so far
    
    interim_data = []
    for user_id in range(interim_n):
        treatment = np.random.binomial(1, 0.5)
        # true effect is 15%, but the interim data is noisy
        noise = np.random.normal(0, 0.3)
        outcome = 0.15 * treatment + noise
        
        interim_data.append({
            'user_id': user_id,
            'treatment': treatment,
            'outcome': outcome
        })
    
    interim_df = pd.DataFrame(interim_data)
    
    # Run the adaptive analysis
    adaptive_design = BayesianAdaptiveTrial(max_sample_size=8000)
    ppp_results = adaptive_design.calculate_predictive_probability(
        interim_df, 
        target_effect=0.1
    )
    
    print("=== 贝叶斯自适应分析结果 ===")
    print(f"预测成功概率: {ppp_results['predictive_probability_success']:.1%}")
    print(f"预测无效概率: {ppp_results['predictive_probability_futility']:.1%}")
    print(f"推荐决策: {ppp_results['recommended_action']['decision']}")
    
    if ppp_results['recommended_action']['continue_sampling']:
        rec_n = ppp_results['recommended_action']['recommended_sample_size']
        print(f"推荐总样本量: {rec_n:,}")

V.II Gaussian Processes for Nonlinear Time Effects

Long-term effects are often nonlinear; Gaussian processes (GPs) provide flexible modeling:

import pymc as pm
import numpy as np
import pandas as pd
import arviz as az
import matplotlib.pyplot as plt

class GaussianProcessTimeEffect:
    """
    使用高斯过程建模时变因果效应
    
    优势:
    1. 无需预设函数形式
    2. 提供不确定性量化
    3. 平滑效应估计(避免过拟合)
    """
    
    def __init__(self, time_points: np.ndarray, 
                 kernel: str = 'Matern52'):
        self.time_points = time_points
        self.kernel = kernel
    
    def build_model(self, data: pd.DataFrame) -> pm.Model:
        """
        构建GP模型
        
        模型结构:
        Y_it = α_i + f(t) * Treatment_i + ε_it
        
        其中f(t)是高斯过程,捕捉效应的时间动态
        """
        # 数据准备
        n_users = data['user_id'].nunique()
        n_times = len(self.time_points)
        
        # Build the design arrays
        user_idx = data['user_id'].astype('category').cat.codes.values
        # Map each observation's time onto the index of self.time_points,
        # so f[time_idx] lines up with the GP grid
        time_idx = np.searchsorted(self.time_points, data['time'].values)
        treatments = data['treatment'].values
        outcomes = data['outcome'].values
        
        with pm.Model() as gp_model:
            # Individual random effects
            sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)
            alpha = pm.Normal('alpha', mu=0, sigma=sigma_alpha, shape=n_users)
            
            # GP prior on the treatment effect
            # (Matern kernel, well suited to smooth variation)
            eta = pm.HalfNormal('eta', sigma=1)  # amplitude
            ell = pm.Gamma('ell', alpha=2, beta=1)  # length scale
            
            # Covariance matrix (Matern 5/2), built with pm.math so the random
            # variables eta and ell stay symbolic; a small jitter keeps it
            # positive definite
            time_dist = np.abs(self.time_points[:, None] - self.time_points[None, :])
            if self.kernel == 'Matern52':
                sqrt5 = 5 ** 0.5
                scaled = (sqrt5 / ell) * time_dist
                K = eta**2 * (1 + scaled + (5.0 / (3 * ell**2)) * time_dist**2) * pm.math.exp(-scaled)
            else:
                # fallback: squared-exponential kernel
                K = eta**2 * pm.math.exp((-0.5 / ell**2) * time_dist**2)
            K = K + 1e-6 * np.eye(n_times)
            
            # Latent GP function
            f = pm.MvNormal('f', mu=np.zeros(n_times), cov=K, shape=n_times)
            
            # Treatment effect = GP(time) * treatment
            treatment_effect = f[time_idx] * treatments
            
            # Linear predictor
            mu = alpha[user_idx] + treatment_effect
            
            # Likelihood
            sigma = pm.HalfNormal('sigma', sigma=0.5)
            outcome = pm.Normal(
                'outcome',
                mu=mu,
                sigma=sigma,
                observed=outcomes
            )
            
            self.model = gp_model
        
        return gp_model
    
    def visualize_gp_effect(self, trace) -> plt.Figure:
        """
        可视化GP估计的时变效应
        """
        fig, ax = plt.subplots(figsize=(12, 6))
        
        # Extract the GP posterior
        f_posterior = trace.posterior['f'].values
        f_mean = f_posterior.mean(axis=(0, 1))
        # az.hdi on the raw array returns an (n_times, 2) array of interval bounds
        f_hdi = az.hdi(f_posterior, hdi_prob=0.95)
        
        # Plot
        ax.plot(self.time_points, f_mean, 'b-', linewidth=2, label='Posterior mean effect')
        ax.fill_between(
            self.time_points,
            f_hdi[:, 0],
            f_hdi[:, 1],
            alpha=0.3,
            color='blue',
            label='95% credible interval'
        )
        
        ax.axhline(y=0, color='r', linestyle='--', alpha=0.5)
        ax.set_xlabel('Day of experiment')
        ax.set_ylabel('Treatment effect')
        ax.set_title('GP-estimated time-varying causal effect')
        ax.legend()
        ax.grid(True, alpha=0.3)
        
        return fig

# Applying it to GlobalShop
if __name__ == "__main__":
    # Time axis (0 to 180 days, one point every 30 days)
    time_points = np.array([0, 30, 60, 90, 120, 150, 180])
    
    # Prepare the data
    gp_data = []
    for user_id in range(200):
        treatment = np.random.binomial(1, 0.5)
        for i, t in enumerate(time_points[1:]):  # skip t=0
            # True effect: linear growth to day 60, flat to day 120, decay afterwards
            if t <= 60:
                true_effect = 0.05 * t / 60
            elif t <= 120:
                true_effect = 0.05
            else:
                true_effect = 0.05 * np.exp(-0.02 * (t - 120))
            
            outcome = treatment * true_effect + np.random.normal(0, 0.01)
            gp_data.append({
                'user_id': user_id,
                'time': t,
                'treatment': treatment,
                'outcome': outcome
            })
    
    gp_df = pd.DataFrame(gp_data)
    
    # Fit the GP model
    gp_estimator = GaussianProcessTimeEffect(time_points)
    model = gp_estimator.build_model(gp_df)
    
    with model:
        gp_trace = pm.sample(1000, chains=2, cores=2, progressbar=False)
    
    # Visualize
    fig = gp_estimator.visualize_gp_effect(gp_trace)
    plt.savefig('gp_time_effect.png', dpi=300, bbox_inches='tight')
    print("GP effect plot saved: gp_time_effect.png")

V.III Graph-Based Causal Inference for Network Interference

When users interact with one another (for example through social recommendations), model the graph structure explicitly:

import networkx as nx
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple

class NetworkInterferenceAdjuster:
    """
    网络干扰调整器
    
    解决SUTVA违反问题:
    Y_i = f(D_i, D_{neighbors(i)}, X_i)
    
    使用图聚类随机化减少干扰
    """
    
    def __init__(self, graph: nx.Graph, cluster_size: int = 50):
        self.graph = graph
        self.cluster_size = cluster_size
        self.clusters = self._create_clusters()
    
    def _create_clusters(self) -> Dict[int, List[int]]:
        """
        使用图聚类算法创建实验簇
        
        目标:最小化簇间边,最大化簇内边
        """
        # 使用Louvain社区检测
        communities = nx.community.louvain_communities(
            self.graph, 
            resolution=0.8,  # controls cluster size
            seed=42
        )
        
        # Map cluster IDs to node lists
        clusters = {}
        for cluster_id, nodes in enumerate(communities):
            # Split clusters that are too large
            if len(nodes) > self.cluster_size:
                subgraph = self.graph.subgraph(nodes)
                sub_communities = nx.community.louvain_communities(subgraph)
                for sub_id, sub_nodes in enumerate(sub_communities):
                    clusters[len(clusters)] = list(sub_nodes)
            else:
                clusters[cluster_id] = list(nodes)
        
        return clusters
    
    def cluster_randomized_assignment(self, treatment_prob: float = 0.5) -> pd.DataFrame:
        """
        簇随机化分配
        
        将整个簇分配给同一处理条件
        """
        assignments = []
        
        for cluster_id, nodes in self.clusters.items():
            cluster_treatment = np.random.binomial(1, treatment_prob)
            
            for node in nodes:
                assignments.append({
                    'node_id': node,
                    'cluster_id': cluster_id,
                    'treatment': cluster_treatment,
                    'n_cluster_nodes': len(nodes)
                })
        
        return pd.DataFrame(assignments)
    
    def estimate_neighbor_treatment_effect(self, outcome_data: pd.DataFrame, 
                                           assignment_data: pd.DataFrame) -> float:
        """
        Estimate the treatment effect on neighbors (TEN).

        TEN = E[Yi | Di=0, neighbors treated] - E[Yi | Di=0, neighbors untreated]
        """
        # Merge outcomes with assignments
        merged = outcome_data.merge(assignment_data, on='node_id', how='inner')
        
        # Fraction of treated neighbors for each node
        neighbor_treatment_prop = []
        
        for _, row in merged.iterrows():
            node_id = row['node_id']
            neighbors = list(self.graph.neighbors(node_id))
            
            if not neighbors:
                neighbor_treatment_prop.append(0)
                continue
            
            # Assignments of the neighbors
            neighbor_assignments = assignment_data[
                assignment_data['node_id'].isin(neighbors)
            ]
            
            prop_treated = neighbor_assignments['treatment'].mean()
            neighbor_treatment_prop.append(prop_treated)
        
        merged['neighbor_treatment_prop'] = neighbor_treatment_prop
        
        # Analyze untreated individuals only (Di=0)
        control_group = merged[merged['treatment'] == 0]
        
        # Estimate the neighbor treatment effect
        high_neighbor_treatment = control_group[
            control_group['neighbor_treatment_prop'] > 0.5
        ]['outcome'].mean()
        
        low_neighbor_treatment = control_group[
            control_group['neighbor_treatment_prop'] <= 0.5
        ]['outcome'].mean()
        
        ten = high_neighbor_treatment - low_neighbor_treatment
        
        return ten

# Simulated social-network experiment
if __name__ == "__main__":
    # Generate a random graph (small-world network)
    graph = nx.watts_strogatz_graph(n=1000, k=10, p=0.1, seed=42)
    
    # Create the adjuster
    adjuster = NetworkInterferenceAdjuster(graph, cluster_size=50)
    
    # Run the cluster randomization
    assignments = adjuster.cluster_randomized_assignment(treatment_prob=0.5)
    
    print(f"Created {len(adjuster.clusters)} clusters")
    print(f"Average cluster size: {np.mean([len(nodes) for nodes in adjuster.clusters.values()]):.1f}")
    
    # Simulate outcome data
    outcomes = []
    for _, row in assignments.iterrows():
        node_id = row['node_id']
        treatment = row['treatment']
        
        # True model: direct effect + neighbor (spillover) effect
        neighbors = list(graph.neighbors(node_id))
        neighbor_treatments = assignments[
            assignments['node_id'].isin(neighbors)
        ]['treatment'].values
        
        # Spillover effect: 0.3 times the fraction of treated neighbors
        spillover_effect = 0.3 * neighbor_treatments.mean() if len(neighbors) > 0 else 0
        
        outcome = 0.2 * treatment + spillover_effect + np.random.normal(0, 0.1)
        
        outcomes.append({
            'node_id': node_id,
            'outcome': outcome
        })
    
    outcome_df = pd.DataFrame(outcomes)
    
    # Estimate the neighbor treatment effect
    ten = adjuster.estimate_neighbor_treatment_effect(outcome_df, assignments)
    print(f"\nEstimated neighbor treatment effect: {ten:.4f}")

(A final Mermaid summary diagram failed to render here; among the benefits it listed was roughly a 30% gain in sample efficiency.)