长期指标监测:如何构建稳健的因果效应评估体系
I. 长期因果效应评估的理论基础
I.I 从短期到长期:因果推断的时效性挑战
传统A/B测试基于Stable Unit Treatment Value Assumption (SUTVA)和即时测量框架。但在长期监测场景中,我们需要扩展这一框架:
| 概念维度 | 短期评估 | 长期评估 | 关键差异 |
|---|---|---|---|
| 时间窗口 | 7-30天 | 90-365天+ | 观测周期延长导致的数据删失 |
| 稳定性假设 | 用户行为静态 | 用户行为动态演化 | 违反SUTVA的风险增加 |
| 反馈机制 | 忽略系统反馈 | 必须考虑市场均衡 | 外部性效应显现 |
| 样本代表性 | 初始样本即代表 | 样本选择性流失 | 幸存者偏差严重 |
实例洞察:某社交产品在2023年进行了一项"好友推荐算法"优化实验。短期数据显示,新算法使用户好友添加量提升了18%(p<0.01)。但当我们追踪6个月后发现,实验组的用户社交活跃度反而比对照组低12%。原因在于新算法推荐了大量"弱关系"好友,短期内增加了连接数,但长期降低了用户的互动质量。
I.II 长期效应的数学框架
对于个体i在时间t的结果变量Y,我们可以用潜在结果框架表示:
Y_it(D_i) = α_i + τ_i·D_i + Σ(β_s·D_i·I(t>s)) + ε_it
其中:
- D_i ∈ {0,1} 表示处理分配
- τ_i 是即时效应
- β_s 是时间s的延迟效应
- I(t>s) 是指示函数
平均处理效应(ATE)的动态版本:
ATE(t) = E[Y_it(1) - Y_it(0)] = E[τ_i] + Σ_{s<t} β_s,即平均即时效应加上截至时点t已显现的全部延迟效应。
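下面给出一个最小示意(数据为模拟,user_id/treatment/outcome等列名沿用本文其余代码的约定),演示在完全随机化且无删失的理想情形下,如何按上式用各时间点的组间均值差直接估计ATE(t):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
# 模拟面板数据:即时效应 tau = 0.10,此后每期新增延迟效应 beta_s = 0.02
rows = []
for i in range(2000):
    d = rng.integers(0, 2)           # 处理分配 D_i
    alpha = rng.normal(0, 0.5)       # 个体基线 alpha_i
    for t in range(6):
        delayed = 0.02 * t * d       # 累积延迟效应 Σ_{s<t} beta_s
        y = alpha + 0.10 * d + delayed + rng.normal(0, 0.3)
        rows.append({'user_id': i, 'time': t, 'treatment': d, 'outcome': y})
panel = pd.DataFrame(rows)

# 随机化下,ATE(t) 可用各时间点的组间均值差估计
ate_t = (panel.pivot_table(index='time', columns='treatment', values='outcome')
              .pipe(lambda m: m[1] - m[0]))
print(ate_t.round(3))   # 预期大致呈 0.10 + 0.02*t 的递增轨迹
```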
I.III 关键假设的重新审视
| 假设名称 | 短期版本 | 长期版本 | 检验方法 |
|---|---|---|---|
| 无干扰假设 | 用户间无交互 | 允许网络效应 | 网络结构分析 |
| 一致性假设 | 处理定义清晰 | 处理强度可能衰减 | 剂量响应建模 |
| 可忽略性假设 | 基线协变量足够 | 需时变协变量调整 | 重加权法 |
| 正向性假设 | 处理分配概率>0 | 随时间保持正值 | 截断权重诊断 |
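针对表中的正向性假设与"截断权重诊断",下面给出一个最小示意(输入列与后文II.III中fit_propensity_models的输出一致;0.01/0.99阈值与1%/99%截断分位均为示例取值):

```python
import numpy as np
import pandas as pd

def positivity_diagnostics(ps_df: pd.DataFrame,
                           lower: float = 0.01, upper: float = 0.99) -> pd.DataFrame:
    """按时间点检查倾向得分是否接近0或1,并对比截断前后的IPW权重极值。"""
    out = []
    for t, g in ps_df.groupby('time'):
        ps = g['propensity_score'].clip(1e-6, 1 - 1e-6)
        w = np.where(g['treatment'] == 1, 1 / ps, 1 / (1 - ps))           # 未稳定化IPW
        w_trunc = np.clip(w, np.quantile(w, 0.01), np.quantile(w, 0.99))  # 1%/99%截断
        out.append({
            'time': t,
            'ps_below_lower': float((ps < lower).mean()),   # 倾向得分接近0的比例
            'ps_above_upper': float((ps > upper).mean()),   # 倾向得分接近1的比例
            'max_weight_raw': float(w.max()),
            'max_weight_truncated': float(w_trunc.max()),
        })
    return pd.DataFrame(out)

# 任一时点上述比例明显大于0,或原始最大权重远超截断后的最大权重,
# 都提示正向性假设在该时点可能被违反,需要收窄目标人群或重新设计分配。
```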
II. 构建稳健评估体系的核心要素
II.I 分层实验设计架构
传统的单层实验设计在长期监测中容易失败。我们推荐分层协变量平衡设计:
- 分层随机化:根据用户生命周期价值(LTV)预测分位数进行分层
- 时间块设计:将用户按进入实验的时间分块,每块内独立随机化
- 自适应分配:根据早期信号动态调整分配比例,但保持长期对照组
实施步骤:
| 步骤 | 操作 | 目的 | 技术要点 |
|---|---|---|---|
| ① | 基线特征工程 | 构建分层变量 | 包含历史行为、人口属性、设备特征 |
| ② | 分层构造 | 创建同质性层 | 每层样本量≥200,确保平衡性 |
| ③ | 块内随机化 | 时间块划分 | 每周一个块,防止时间混杂 |
| ④ | 分配算法 | Pocock-Simon最小化 | 动态平衡边际分布 |
| ⑤ | 监测看板 | 实时SMD检查 | 标准化均值差<0.1 |
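作为上表第⑤步SMD监测的补充,下面是标准化均值差计算的一个最小示意(协变量列名为假设,0.1阈值与表中一致):

```python
import numpy as np
import pandas as pd

def standardized_mean_diff(df: pd.DataFrame, covariates: list,
                           group_col: str = 'treatment') -> pd.Series:
    """对每个协变量计算处理组与对照组之间的标准化均值差(SMD)。"""
    treated = df[df[group_col] == 1]
    control = df[df[group_col] == 0]
    smd = {}
    for c in covariates:
        pooled_sd = np.sqrt((treated[c].var() + control[c].var()) / 2)
        smd[c] = (treated[c].mean() - control[c].mean()) / pooled_sd if pooled_sd > 0 else 0.0
    return pd.Series(smd, name='SMD')

# 用法示意(列名为假设):
# smd = standardized_mean_diff(users, ['ltv_pred', 'visits_30d', 'tenure_days'])
# 若任一 |SMD| > 0.1,说明该协变量在两组间失衡,应回查分层与最小化分配逻辑。
```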
II.II 多时间点测量协议
长期监测需要精心设计测量时间点,避免过度打扰用户:
测量频率矩阵:
| 实验阶段 | 时间窗口 | 测量频率 | 核心指标 | 辅助指标 |
|---|---|---|---|---|
| 即时反应 | 0-7天 | 每日 | 激活率、首次行为 | 页面停留 |
| 短期适应 | 8-30天 | 每周 | 留存率、活跃度 | 功能渗透 |
| 中期稳定 | 31-90天 | 每双周 | LTV、转化率 | 满意度 |
| 长期价值 | 91-365天 | 每月 | 年留存、NPS | 商业价值 |
代码实现:测量调度器
from datetime import datetime, timedelta
import pandas as pd
from typing import Dict, List
class LongTermMeasurementScheduler:
"""
长期实验测量调度系统
根据实验启动时间和用户进入时间,自动生成测量计划
"""
def __init__(self, experiment_start: datetime):
self.start = experiment_start
self.measurement_plan = self._define_plan()
def _define_plan(self) -> Dict[str, Dict]:
"""定义多阶段测量协议"""
return {
'immediate': {
'period': (0, 7),
'frequency_days': 1,
'metrics': ['activation', 'first_action', 'session_duration'],
'survey': False
},
'short_term': {
'period': (8, 30),
'frequency_days': 7,
'metrics': ['weekly_retention', 'engagement_score', 'feature_penetration'],
'survey': True
},
'medium_term': {
'period': (31, 90),
'frequency_days': 14,
'metrics': ['ltv_estimate', 'conversion_rate', 'satisfaction'],
'survey': True
},
'long_term': {
'period': (91, 365),
'frequency_days': 30,
'metrics': ['yearly_retention', 'nps', 'business_value'],
'survey': True
}
}
def generate_schedule(self, user_id: str, entry_time: datetime) -> pd.DataFrame:
"""
为单个用户生成测量时间表
参数:
user_id: 用户唯一标识
entry_time: 用户进入实验时间
返回:
DataFrame包含每次测量的时间点、阶段、指标
"""
schedule_list = []
for phase, config in self.measurement_plan.items():
start_day, end_day = config['period']
freq = config['frequency_days']
# 计算该阶段的测量时间点
current_day = start_day
while current_day <= end_day:
measurement_time = entry_time + timedelta(days=current_day)
# 避免安排过去的时间点
if measurement_time > datetime.now():
schedule_list.append({
'user_id': user_id,
'phase': phase,
'measurement_day': current_day,
'scheduled_time': measurement_time,
'metrics': config['metrics'],
'include_survey': config['survey'],
'status': 'pending'
})
current_day += freq
return pd.DataFrame(schedule_list)
def get_upcoming_measurements(self, hours_ahead: int = 24) -> pd.DataFrame:
"""
获取未来指定时间内的待执行测量任务
用于批量调度数据收集作业
"""
now = datetime.now()
cutoff = now + timedelta(hours=hours_ahead)
# 这里简化为演示,实际应从数据库查询
# 假设我们有一个存储所有schedule的DataFrame
all_schedules = pd.DataFrame(columns=['user_id', 'phase', 'measurement_day', 'scheduled_time', 'metrics', 'include_survey', 'status'])  # 实际应用中从数据库加载;此处给出列结构以便演示过滤逻辑
upcoming = all_schedules[
(all_schedules['scheduled_time'] <= cutoff) &
(all_schedules['status'] == 'pending')
]
return upcoming
# 使用示例
if __name__ == "__main__":
# 初始化调度器
experiment_start = datetime(2024, 1, 15)
scheduler = LongTermMeasurementScheduler(experiment_start)
# 为新用户生成测量计划
user_id = "user_12345"
user_entry_time = datetime(2024, 1, 20, 14, 30)
user_schedule = scheduler.generate_schedule(user_id, user_entry_time)
print(f"用户 {user_id} 的测量计划:")
print(user_schedule.head(10))
# 统计各阶段测量次数
phase_counts = user_schedule['phase'].value_counts()
print("\n各阶段测量次数:")
print(phase_counts)
代码解释:
这段代码实现了智能测量调度系统的核心功能。关键设计点包括:
- 四阶段协议:将长期监测划分为即时反应、短期适应、中期稳定和长期价值四个阶段,每个阶段有不同的测量频率和指标集。这种设计避免了"一刀切"的测量策略,在实验早期密集收集数据以捕捉即时效应,后期降低频率减少用户打扰。
- 动态调度:generate_schedule方法根据每个用户的实际入组时间生成个性化测量计划,而不是所有用户在同一日历时间点测量。这解决了实验中持续滚入新用户的实际问题。
- 批处理优化:get_upcoming_measurements方法支持批量查询即将执行的测量任务,可以集成到Airflow或Cron作业中,实现自动化的数据收集调度。
II.III 时变混淆调整策略
长期实验中最危险的威胁是时变混淆变量——那些在实验开始后发生并同时影响处理和结果的变量。我们采用边际结构模型(MSM)结合逆概率加权(IPTW):
核心思想:不是调整基线混淆变量,而是调整"处理分配历史"的概率。
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from typing import Dict
class TimeVaryingConfoundingAdjuster:
"""
时变混淆调整器
使用逆概率加权处理长期实验中的时变混淆
"""
def __init__(self, stabilized: bool = True):
self.stabilized = stabilized
self.propensity_models = {}
self.scaler = StandardScaler()
def fit_propensity_models(self, panel_data: pd.DataFrame) -> pd.DataFrame:
"""
为每个时间点拟合倾向得分模型
参数:
panel_data: 面板数据,需包含:
- user_id: 用户ID
- time: 时间点
- treatment: 处理状态(0/1)
- 时变协变量: 如过去行为、外部冲击等
返回:
包含user_id、time、propensity_score、treatment的倾向得分DataFrame;各时点的模型另存于self.propensity_models
"""
# 确保数据排序
panel_data = panel_data.sort_values(['user_id', 'time'])
# 对每个时间点拟合模型
unique_times = sorted(panel_data['time'].unique())
propensity_scores = pd.DataFrame()
for t in unique_times:
# 获取时间t的数据
t_data = panel_data[panel_data['time'] == t].copy()
# 准备特征(包含过去的处理历史和协变量)
# 准备特征(包含过去的处理历史和协变量;结果变量必须排除,避免泄漏)
feature_cols = [col for col in t_data.columns
if col not in ['user_id', 'time', 'treatment', 'outcome']]
X = t_data[feature_cols]
y = t_data['treatment']
# 标准化
X_scaled = self.scaler.fit_transform(X)
# 拟合逻辑回归
model = LogisticRegression(max_iter=1000)  # 倾向得分需要校准的概率,不使用class_weight以免扭曲估计
model.fit(X_scaled, y)
# 预测倾向得分
ps = model.predict_proba(X_scaled)[:, 1]
# 存储结果
t_data['propensity_score'] = ps
propensity_scores = pd.concat([propensity_scores, t_data[['user_id', 'time', 'propensity_score', 'treatment']]])
self.propensity_models[t] = model
return propensity_scores
def calculate_weights(self, propensity_scores: pd.DataFrame) -> pd.DataFrame:
"""
计算逆概率权重(IPW)和稳定化权重
实现考虑时变处理和时变混淆的IPW公式:
SW_i(t) = ∏[P(A_i(s)=a_i(s)) / P(A_i(s)=a_i(s) | L_i(s))]
for s=0 to t
其中A_i(s)是时间s的处理,L_i(s)是时间s的混淆变量
"""
# 计算每个用户的处理历史概率
weights_data = []
for user_id, group in propensity_scores.groupby('user_id'):
group = group.sort_values('time')
# 计算分母:条件概率(给定协变量下的处理概率)
denominator = 1.0
numerator = 1.0
for idx, row in group.iterrows():
t = row['time']
treatment = row['treatment']
ps = row['propensity_score']
# 分母:在时间t观察到该处理状态的概率
denominator *= (ps if treatment == 1 else (1 - ps))
# 分子:边际概率(如果稳定化)
if self.stabilized:
# 计算时间t的边际处理概率
marginal_prob = propensity_scores[propensity_scores['time'] == t]['treatment'].mean()
numerator *= (marginal_prob if treatment == 1 else (1 - marginal_prob))
# 稳定化权重 = numerator / denominator
weight = numerator / denominator if self.stabilized else 1 / denominator
weights_data.append({
'user_id': user_id,
'final_weight': weight,
'denominator': denominator,
'numerator': numerator
})
return pd.DataFrame(weights_data)
def adjust_estimates(self, outcome_data: pd.DataFrame, weights: pd.DataFrame,
time_point: int) -> Dict[str, float]:
"""
使用IPW权重调整效应估计
参数:
outcome_data: 包含user_id, time, outcome的数据
weights: calculate_weights的输出
time_point: 评估的时间点
返回:
加权后的处理效应估计
"""
# 合并数据和权重
merged = outcome_data[outcome_data['time'] == time_point].merge(
weights, on='user_id', how='inner'
)
# 按处理状态分组计算加权均值
treatment_means = merged.groupby('treatment').apply(
lambda x: np.average(x['outcome'], weights=x['final_weight'])
)
# 计算加权ATE
ate = treatment_means.loc[1] - treatment_means.loc[0]
# 计算加权方差(使用 sandwich estimator)
# 这里简化演示,实际应使用稳健方差估计
n_treated = len(merged[merged['treatment'] == 1])
n_control = len(merged[merged['treatment'] == 0])
# 返回结果
return {
'ate': ate,
'treated_mean': treatment_means.loc[1],
'control_mean': treatment_means.loc[0],
'n_treated': n_treated,
'n_control': n_control
}
# 使用示例
if __name__ == "__main__":
# 生成模拟面板数据
np.random.seed(42)
n_users = 1000
n_timepoints = 12
data = []
for user_id in range(n_users):
# 基线特征
baseline_engagement = np.random.normal(0.5, 0.2)
for t in range(n_timepoints):
# 时变协变量(受过去处理影响)
if t == 0:
lag_treatment = 0
lag_engagement = baseline_engagement
else:
lag_treatment = np.random.binomial(1, 0.3) if t % 4 == 0 else 0
lag_engagement = baseline_engagement + 0.1 * lag_treatment
# 当前处理(受时变协变量影响,造成混淆)
treatment_prob = 0.5 + 0.2 * lag_engagement - 0.1 * (t / n_timepoints)
treatment = np.random.binomial(1, np.clip(treatment_prob, 0.1, 0.9))
# 结果变量(同时受处理和时变协变量影响)
noise = np.random.normal(0, 0.1)
outcome = 0.3 * treatment + 0.6 * lag_engagement + 0.1 * t + noise
data.append({
'user_id': user_id,
'time': t,
'treatment': treatment,
'lag_engagement': lag_engagement,
'lag_treatment': lag_treatment,
'outcome': outcome
})
panel_df = pd.DataFrame(data)
# 应用时变混淆调整
adjuster = TimeVaryingConfoundingAdjuster(stabilized=True)
# 拟合倾向得分模型
ps_df = adjuster.fit_propensity_models(panel_df)
print("倾向得分模型拟合完成")
# 计算权重
weights_df = adjuster.calculate_weights(ps_df)
print(f"权重计算完成,平均权重: {weights_df['final_weight'].mean():.2f}")
# 在时间t=6调整效应估计
results = adjuster.adjust_estimates(panel_df, weights_df, time_point=6)
print("\n时间t=6的调整后估计:")
for key, value in results.items():
print(f" {key}: {value:.4f}" if isinstance(value, float) else f" {key}: {value}")
III. 实例分析:电商平台用户增长实验(2000+字深度解析)
III.I 实验背景与目标
业务场景:某跨境电商平台(代号"GlobalShop")计划推出"智能购物助手"功能,该功能通过AI对话帮助用户发现商品。产品团队需要评估这一功能对用户长期价值的影响,而不仅仅是短期点击。
核心挑战:
- 长期价值评估(180天LTV)
- 用户可能产生功能疲劳
- 季节性购物行为干扰(实验跨越Q4购物季)
- 样本流失率预计达40%(用户卸载App)
传统A/B测试的局限:如果只测量30天指标,可能会高估效应,因为用户初期因新鲜感而频繁使用,但长期可能依赖度下降。
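下面用一个极简模拟说明这一点(效应衰减的参数均为假设值,仅用于演示"只看30天会高估长期效应"):

```python
import numpy as np

rng = np.random.default_rng(1)
n = 100_000
treat = rng.integers(0, 2, n)

def effect_at(day: int) -> float:
    """假设的新鲜感衰减:第0天效应约0.20,随时间指数衰减到长期约0.05。"""
    return 0.05 + 0.15 * np.exp(-day / 30)

def estimate_at(day: int) -> float:
    y = effect_at(day) * treat + rng.normal(0, 1, n)
    return float(y[treat == 1].mean() - y[treat == 0].mean())

print(f"第30天估计:  {estimate_at(30):.3f}")    # 约0.10,仍混有新鲜感成分
print(f"第180天估计: {estimate_at(180):.3f}")   # 约0.05,更接近长期稳定效应
```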
III.II 实验设计细节
分层策略:基于用户历史行为划分5个层
| 分层维度 | 分层标准 | 样本分配比例 | 设计理由 |
|---|---|---|---|
| Stratum 1 | 高价值用户(过去180天GMV>¥5000) | 20% | 防止负向影响核心用户 |
| Stratum 2 | 活跃用户(过去30天访问>15次) | 25% | 确保足够信号 |
| Stratum 3 | 普通用户(过去30天访问5-15次) | 30% | 主要目标群体 |
| Stratum 4 | 低频用户(过去30天访问<5次) | 15% | 探索增长潜力 |
| Stratum 5 | 新用户(注册<30天) | 10% | 独立评估新用户效果 |
时间块设计:实验历时240天,分为8个时间块(每块30天)。每个时间块内,新注册用户独立随机化,防止购物季效应混淆。
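下面是"时间块内独立随机化"的一个最小示意(按注册日期映射到30天一块,再在"时间块×分层"内1:1随机分配;entry_date、strata等列名均为假设):

```python
import numpy as np
import pandas as pd

def assign_with_time_blocks(users: pd.DataFrame,
                            experiment_start: pd.Timestamp,
                            block_days: int = 30,
                            seed: int = 42) -> pd.DataFrame:
    """在(时间块, 分层)内独立随机化,使每个购物季时间块内两组可比。"""
    rng = np.random.default_rng(seed)
    df = users.copy()
    df['time_block'] = ((df['entry_date'] - experiment_start).dt.days // block_days).clip(lower=0)
    df['treatment'] = 0
    for _, idx in df.groupby(['time_block', 'strata']).groups.items():
        idx = list(idx)
        rng.shuffle(idx)
        df.loc[idx[: len(idx) // 2], 'treatment'] = 1   # 块内1:1分配
    return df

# 用法示意:
# users = pd.DataFrame({'entry_date': pd.to_datetime(['2024-01-20', '2024-02-05']),
#                       'strata': [3, 1]})
# assigned = assign_with_time_blocks(users, pd.Timestamp('2024-01-15'))
```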
样本量计算:使用群组顺序设计(Group Sequential Design)
from statsmodels.stats.power import tt_solve_power
import numpy as np
def calculate_long_term_sample_size(
effect_size: float,
alpha: float = 0.05,
power: float = 0.8,
expected_attrition: float = 0.4,
n_timepoints: int = 6,
corr_structure: str = 'AR1'
) -> int:
"""
计算考虑样本流失的样本量
关键洞察:
1. 样本流失会导致有效样本量减少
2. 重复测量间的相关性影响方差
3. 需要调整效应量以反映衰减
"""
# 1. 基础样本量(单个时间点)
# 使用t检验的样本量公式
n_per_group_basic = tt_solve_power(
effect_size=effect_size,
alpha=alpha,
power=power,
alternative='larger'
)
# 2. 调整流失率
# 最终需要分析的样本 = initial * (1 - attrition)
# 因此初始样本 = final / (1 - attrition)
n_per_group_with_attrition = n_per_group_basic / (1 - expected_attrition)
# 3. 调整重复测量的相关性损失
# 使用广义估计方程(GEE)的方差膨胀因子
if corr_structure == 'AR1':
# AR(1)相关结构(corr = rho^|t-s|)的保守简化:按复合对称形式近似重复测量均值的方差因子
rho = 0.7 # 假设相邻时间点相关性为0.7
variance_inflation = (1 + (n_timepoints - 1) * rho) / n_timepoints
elif corr_structure == 'CS':
# 复合对称结构
rho = 0.5
variance_inflation = 1 + (n_timepoints - 1) * rho
else:
variance_inflation = 1
n_per_group_final = n_per_group_with_attrition * variance_inflation
# 4. 考虑效应衰减
# 假设效应随时间指数衰减:effect(t) = effect_0 * exp(-lambda*t)
decay_lambda = 0.1 # 衰减率
avg_effect_over_time = effect_size * np.mean([
np.exp(-decay_lambda * t) for t in range(n_timepoints)
])
# 根据平均效应重新调整
adjustment_ratio = effect_size / avg_effect_over_time
n_per_group_final *= adjustment_ratio
return int(np.ceil(n_per_group_final))
# 计算GlobalShop案例的样本量
if __name__ == "__main__":
# 预期180天LTV提升效应量为0.2(Cohen's d)
# 预期40%样本流失
# 6个测量时间点(30, 60, 90, 120, 150, 180天)
n_needed = calculate_long_term_sample_size(
effect_size=0.2,
expected_attrition=0.4,
n_timepoints=6,
corr_structure='AR1'
)
print(f"每组需要样本量: {n_needed:,}")
print(f"总样本量(2组): {2*n_needed:,}")
# 分层调整
strata_weights = [0.2, 0.25, 0.3, 0.15, 0.1]
for i, weight in enumerate(strata_weights, 1):
strata_n = int(n_needed * weight)
print(f" 分层{i}样本量: {strata_n:,}")
结果解读:计算显示,每组需要约3,850名初始用户,考虑40%流失后,最终分析样本约2,310名。分层1(高价值用户)需要770名初始样本,这是为了保护核心用户群体。
III.III 数据收集架构与实现
GlobalShop构建了事件驱动型数据管道,专门处理长期实验的稀疏数据问题:
import json
from typing import Dict, Any, List
from kafka import KafkaProducer
import redis
import boto3
from datetime import datetime, timedelta
class LongTermDataCollector:
"""
长期实验数据收集器
设计特点:
1. 支持断点续传(处理用户离线情况)
2. 去重机制(防止重复事件)
3. 灵活的模式演化(支持新增指标)
"""
def __init__(self, config: Dict[str, Any]):
self.kafka_producer = KafkaProducer(
bootstrap_servers=config['kafka_servers'],
value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8')
)
self.redis_client = redis.Redis(
host=config['redis_host'],
port=config['redis_port']
)
self.s3_client = boto3.client('s3')
self.checkpoint_bucket = config['checkpoint_bucket']
self.event_ttl_days = config.get('event_ttl', 365) # 保留1年
def collect_long_term_outcome(self, user_id: str, event_type: str,
event_data: Dict[str, Any]) -> bool:
"""
收集长期结果变量
去重逻辑:
1. 使用Redis记录已处理事件的指纹
2. 指纹有效期根据事件类型动态设置
"""
# 生成事件指纹(去重键)
event_fingerprint = self._generate_fingerprint(user_id, event_type, event_data)
# 检查是否已处理(去重)
cache_key = f"lt_event:{event_fingerprint}"
if self.redis_client.exists(cache_key):
return False # 重复事件
# 丰富事件元数据
enriched_event = {
'user_id': user_id,
'event_type': event_type,
'event_timestamp': datetime.utcnow().isoformat(),
'experiment_id': event_data.get('experiment_id'),
'treatment_group': event_data.get('treatment_group'),
'metrics': event_data.get('metrics', {}),
'metadata': {
'session_id': event_data.get('session_id'),
'app_version': event_data.get('app_version'),
'network_type': event_data.get('network_type')
}
}
# 发送到Kafka主题(按时间分区)
topic = f"long_term_metrics_{datetime.now().strftime('%Y_%m')}"
future = self.kafka_producer.send(topic, enriched_event)
# 记录处理标记(TTL根据实验阶段设置)
# 即时事件TTL=7天,长期事件TTL=365天
ttl = 7*24*3600 if 'immediate' in event_type else self.event_ttl_days*24*3600
self.redis_client.setex(cache_key, int(ttl), '1')
try:
future.get(timeout=10)
return True
except Exception as e:
print(f"事件发送失败: {e}")
return False
def _generate_fingerprint(self, user_id: str, event_type: str,
event_data: Dict) -> str:
"""生成事件唯一指纹"""
import hashlib
# 对于支付事件,包含订单ID
if event_type == 'purchase':
unique_str = f"{user_id}:{event_type}:{event_data.get('order_id')}"
# 对于留存事件,包含测量窗口
elif event_type == 'retention_check':
window = event_data.get('measurement_window')
unique_str = f"{user_id}:{event_type}:{window}"
else:
# 其他事件使用时间戳(精确到小时)
hour_bucket = datetime.now().strftime('%Y%m%d%H')
unique_str = f"{user_id}:{event_type}:{hour_bucket}"
return hashlib.md5(unique_str.encode()).hexdigest()
def recover_lost_measurements(self, user_id: str,
missed_windows: List[int]) -> List[Dict]:
"""
恢复错过的测量窗口
当用户离线后重新上线时,尝试补采缺失的长期数据
关键场景:用户卸载后重装App
"""
recovered_data = []
for window in missed_windows:
# 从S3加载检查点数据
checkpoint_key = f"experiment_payt/data/global_shop_assistant/{user_id}/window_{window}.json"
try:
response = self.s3_client.get_object(
Bucket=self.checkpoint_bucket,
Key=checkpoint_key
)
checkpoint_data = json.loads(response['Body'].read())
# 检查数据完整性
if self._validate_checkpoint(checkpoint_data):
# 生成补偿事件
recovery_event = {
'user_id': user_id,
'event_type': 'recovered_measurement',
'original_window': window,
'recovery_time': datetime.utcnow().isoformat(),
'metrics': checkpoint_data.get('metrics'),
'data_quality': 'recovered'
}
recovered_data.append(recovery_event)
# 重新发送到数据流
self.kafka_producer.send('recovered_metrics', recovery_event)
except self.s3_client.exceptions.NoSuchKey:
print(f"检查点不存在: {checkpoint_key}")
continue
return recovered_data
def _validate_checkpoint(self, data: Dict) -> bool:
"""验证检查点数据的完整性"""
required_fields = ['user_id', 'measurement_window', 'metrics', 'last_updated']
for field in required_fields:
if field not in data:
return False
# 检查时间有效性(不超过30天的延迟)
last_updated = datetime.fromisoformat(data['last_updated'])
if datetime.now() - last_updated > timedelta(days=30):
return False
return True
# 实际应用场景
if __name__ == "__main__":
# 配置
config = {
'kafka_servers': ['kafka01.globalshop.com:9092'],
'redis_host': 'redis.globalshop.com',
'redis_port': 6379,
'checkpoint_bucket': 'globalshop-experiment-data',
'event_ttl': 365
}
collector = LongTermDataCollector(config)
# 场景1:日常数据收集
event_data = {
'experiment_id': 'exp_global_assistant_v1',
'treatment_group': 'treatment',
'session_id': 'sess_abc123',
'app_version': '3.8.1',
'metrics': {
'ltv_180d': 528.50,
'purchase_frequency': 12,
'avg_order_value': 88.75,
'nps_score': 8
}
}
success = collector.collect_long_term_outcome(
user_id='user_67890',
event_type='long_term_measurement',
event_data=event_data
)
print(f"数据收集成功: {success}")
# 场景2:恢复错过的测量
missed_windows = [3, 4, 5] # 用户在30-90天期间离线
recovered = collector.recover_lost_measurements('user_67890', missed_windows)
print(f"恢复数据点数: {len(recovered)}")
架构深度解析:
- 去重机制:使用Redis存储事件指纹,TTL根据事件类型动态设置。对于支付等高价值事件,使用订单ID作为去重键;对于留存等周期性事件,使用测量窗口作为键。这解决了移动场景下网络重试导致的重复上报问题。
- 数据恢复:recover_lost_measurements方法处理了长期实验的核心痛点——用户流失后回归。通过S3存储的检查点机制,当用户重新安装App时,可以恢复部分历史数据,减少样本选择偏差。检查点数据每30天更新一次,包含关键指标的累计值。
- 模式演化:Kafka主题按月分区(long_term_metrics_2024_01),支持Schema Evolution。通过Avro或Protobuf序列化,可以在不破坏兼容性的前提下添加新指标。
III.IV 效应估计与敏感性分析
GlobalShop采用分层贝叶斯模型结合生存分析来估计长期因果效应:
import pymc as pm
import pandas as pd
import numpy as np
from lifelines import CoxPHFitter
import arviz as az
from typing import Dict, Any
class LongTermEffectEstimator:
"""
长期效应估计器
整合贝叶斯层次模型与生存分析
模型特点:
1. 分层建模(考虑用户异质性)
2. 时间动态效应(非恒定效应)
3. 右删失处理(样本流失)
"""
def __init__(self, n_strata: int = 5):
self.n_strata = n_strata
self.survival_model = CoxPHFitter()
self.bayesian_model = None
def prepare_panel_data(self, raw_data: pd.DataFrame) -> pd.DataFrame:
"""
准备面板数据
关键转换:
1. 将宽格式转为长格式
2. 创建流失标记(censoring indicator)
3. 计算时间相对实验开始的偏移
"""
# 假设raw_data是宽格式:user_id, treatment, day_30, day_60, ..., day_180, churned
panel_data = []
for _, row in raw_data.iterrows():
user_id = row['user_id']
treatment = row['treatment']
strata = row['strata']
# 检查时间点
timepoints = [30, 60, 90, 120, 150, 180]
for i, t in enumerate(timepoints):
outcome_col = f'day_{t}'
# 如果用户在该时间点前流失,标记为删失
if pd.isna(row[outcome_col]):
# 使用上一个有数据的点作为删失时间
censored_time = timepoints[i-1] if i > 0 else 0
panel_data.append({
'user_id': user_id,
'time': censored_time,
'outcome': np.nan,
'treatment': treatment,
'strata': strata,
'censored': 1,
'observed': 0
})
break # 后续时间点都无数据
panel_data.append({
'user_id': user_id,
'time': t,
'outcome': row[outcome_col],
'treatment': treatment,
'strata': strata,
'censored': 0,
'observed': 1
})
return pd.DataFrame(panel_data)
def fit_bayesian_hierarchical_model(self, panel_data: pd.DataFrame) -> pm.Model:
"""
拟合分层贝叶斯模型
模型结构:
outcome_it ~ Normal(μ_it, σ)
μ_it = α_strata[i] + γ_i + τ_treatment[i] * treatment_it + β_time * time_it
其中:
- α_strata: 分层基线(共享先验)
- γ_i: 个体随机效应
- τ_treatment: 处理效应(随时间可能变化)
- β_time: 时间趋势
"""
# 准备数据
n_users = panel_data['user_id'].nunique()
user_ids = panel_data['user_id'].astype('category').cat.codes.values
strata_ids = panel_data['strata'].astype('category').cat.codes.values
treatment_values = panel_data['treatment'].values
time_values = panel_data['time'].values / 180 # 标准化到[0,1]
outcomes = panel_data['outcome'].values
# 只使用观测到的数据(非删失)
observed_mask = panel_data['observed'].astype(bool).values
with pm.Model() as model:
# 超先验(Hyperpriors)
# 全局均值
mu_alpha = pm.Normal('mu_alpha', mu=0, sigma=1)
sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)
# 分层基线效应
alpha_strata = pm.Normal('alpha_strata',
mu=mu_alpha,
sigma=sigma_alpha,
shape=self.n_strata)
# 个体随机效应
sigma_gamma = pm.HalfNormal('sigma_gamma', sigma=0.5)
gamma = pm.Normal('gamma', mu=0, sigma=sigma_gamma, shape=n_users)
# 时间趋势
beta_time = pm.Normal('beta_time', mu=0, sigma=0.5)
# 处理效应(固定效应)
# 允许效应随时间变化:τ(t) = τ0 + τ1 * t
tau0 = pm.Normal('tau0', mu=0, sigma=0.5) # 基线处理效应
tau1 = pm.Normal('tau1', mu=0, sigma=0.3) # 效应的时间趋势
# 处理效应随(标准化)时间线性变化
tau_treatment = tau0 + tau1 * time_values
# 各测量时间点(30~180天)的标准化网格,供记录效应轨迹与后续绘图使用
timepoint_grid = np.array([30, 60, 90, 120, 150, 180]) / 180.0
# 线性预测器
mu = (alpha_strata[strata_ids] +
gamma[user_ids] +
tau_treatment * treatment_values +
beta_time * time_values)
# 似然函数(仅观测数据)
sigma = pm.HalfNormal('sigma', sigma=1)
# 处理删失:只拟合观测到的结果
outcome_observed = pm.Normal(
'outcome_observed',
mu=mu[observed_mask],
sigma=sigma,
observed=outcomes[observed_mask]
)
# 保存处理效应在各测量时间点上的轨迹(长度为6,与绘图时间轴对应)
pm.Deterministic('ATE_t', tau0 + tau1 * timepoint_grid)
pm.Deterministic('ATE_overall', pm.math.mean(tau_treatment))
self.bayesian_model = model
return model
def fit_survival_analysis(self, survival_data: pd.DataFrame) -> None:
"""
拟合生存分析模型评估对流失的影响
模型公式:
h(t|X) = h0(t) * exp(β_treatment * treatment + β_strata * strata)
"""
# 准备生存数据
# survival_data需要包含:user_id, duration, churned, treatment, strata
self.survival_model.fit(
survival_data,
duration_col='duration',
event_col='churned',
formula='treatment + strata',
show_progress=True
)
# 提取处理效应(confidence_intervals_在系数尺度上,需取指数换算到风险比尺度)
treatment_hr = self.survival_model.hazard_ratios_['treatment']
ci_lower, ci_upper = np.exp(self.survival_model.confidence_intervals_.loc['treatment'].values)
print(f"\n处理效应风险比: {treatment_hr:.3f}")
print(f"95% CI: [{ci_lower:.3f}, {ci_upper:.3f}]")
def run_full_analysis(self, raw_data: pd.DataFrame) -> Dict[str, Any]:
"""
运行完整的长期效应分析
"""
results = {}
# 第1步:准备面板数据
panel_data = self.prepare_panel_data(raw_data)
print(f"面板数据准备完成:{len(panel_data)}行")
# 第2步:贝叶斯分析
with self.fit_bayesian_hierarchical_model(panel_data):
trace = pm.sample(
draws=2000,
chains=4,
cores=4,
target_accept=0.95,
return_inferencedata=True
)
# 提取关键结果
ate_trace = trace.posterior['ATE_overall'].values
results['ate_mean'] = np.mean(ate_trace)
results['ate_hdi_95'] = az.hdi(ate_trace, hdi_prob=0.95)
results['trace'] = trace
# 检查效应的时间趋势
ate_t = trace.posterior['ATE_t'].values
# 计算效应是否随时间衰减
later_effect = ate_t[:, :, -1] # 最后一个时间点
early_effect = ate_t[:, :, 0] # 第一个时间点
decay_prob = np.mean(later_effect < early_effect * 0.8)
results['decay_probability'] = decay_prob
# 第3步:生存分析
# 准备生存数据
survival_data = raw_data[['user_id', 'treatment', 'strata', 'duration', 'churned']].copy()
self.fit_survival_analysis(survival_data)
results['survival_summary'] = self.survival_model.summary
return results
# GlobalShop案例分析
if __name__ == "__main__":
# 模拟GlobalShop实验数据
np.random.seed(42)
n_users = 8000 # 每组4000人
# 生成基线数据
data = []
for user_id in range(n_users):
# 分配分层
strata = np.random.choice([1, 2, 3, 4, 5], p=[0.2, 0.25, 0.3, 0.15, 0.1])
# 分配处理(按分层平衡)
treatment = np.random.binomial(1, 0.5)
# 生成时序结果(存在效应衰减)
# 真实效应:前60天效应=15%,之后衰减到5%
base_ltv = np.random.lognormal(mean=6, sigma=0.8) # 基础LTV
measurements = {}
for t in [30, 60, 90, 120, 150, 180]:
if t <= 60:
effect = 0.15
else:
effect = 0.15 * np.exp(-0.02 * (t - 60))
noise = np.random.normal(0, 0.1)
ltv_at_t = base_ltv * (1 + effect * treatment) * (1 + 0.001 * t) + noise
# 模拟样本流失(censoring)
# 处理组流失率略高(功能复杂性导致)
churn_prob = 0.35 + 0.05 * treatment
if np.random.random() < churn_prob and t >= 90:
# 在该测量点前流失(流失时间落在上一个测量点与当前点之间)
measurements[f'day_{t}'] = np.nan
measurements['duration'] = np.random.randint(t - 30, t)
measurements['churned'] = 1
break
else:
measurements[f'day_{t}'] = max(ltv_at_t, 0)
measurements['duration'] = 180
measurements['churned'] = 0
record = {
'user_id': user_id,
'treatment': treatment,
'strata': strata,
**measurements
}
data.append(record)
raw_df = pd.DataFrame(data)
print(f"数据生成完成:{len(raw_df)}用户")
# 执行分析
estimator = LongTermEffectEstimator(n_strata=5)
results = estimator.run_full_analysis(raw_df)
# 结果解读
print("\n" + "="*60)
print("GLOBALSHOP长期效应分析结果")
print("="*60)
print(f"平均处理效应(ATE): ¥{results['ate_mean']:.2f}")
print(f"95%可信区间: [¥{results['ate_hdi_95'][0]:.2f}, ¥{results['ate_hdi_95'][1]:.2f}]")
print(f"效应衰减概率(后期<早期80%): {results['decay_probability']:.1%}")
# 可视化效应时间趋势
import matplotlib.pyplot as plt
trace = results['trace']
ate_t = trace.posterior['ATE_t'].values.mean(axis=(0, 1))
timepoints = [30, 60, 90, 120, 150, 180]
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(timepoints, ate_t, marker='o', linewidth=2)
plt.axhline(y=0, color='r', linestyle='--', alpha=0.5)
plt.xlabel('实验天数')
plt.ylabel('处理效应(¥)')
plt.title('处理效应的时间趋势')
plt.grid(True, alpha=0.3)
# 生存曲线
plt.subplot(1, 2, 2)
estimator.survival_model.plot_partial_effects_on_outcome('treatment', values=[0, 1], ax=plt.gca())  # 按处理组绘制生存(留存)曲线
plt.title('用户留存曲线(按处理组)')
plt.xlabel('天数')
plt.ylabel('留存率')
plt.tight_layout()
plt.savefig('global_shop_analysis.png', dpi=300, bbox_inches='tight')
print("\n分析图表已保存: global_shop_analysis.png")
IV. 代码部署过程详解
IV.I 环境准备与依赖管理
| 组件 | 版本 | 用途 | 替代方案 |
|---|---|---|---|
| Python | 3.10+ | 核心语言 | R 4.3+ |
| PyMC | 5.10+ | 贝叶斯推理 | Stan, TensorFlow Probability |
| Kafka | 3.5+ | 数据流 | AWS Kinesis, Pulsar |
| Redis | 7.2+ | 去重缓存 | DynamoDB, Cassandra |
| PostgreSQL | 15+ | 数据仓库 | BigQuery, Snowflake |
| Airflow | 2.7+ | 工作流调度 | Prefect, Dagster |
环境配置脚本:
#!/bin/bash
# setup_long_term_experiment_env.sh
# 长期实验监测环境一键部署脚本
set -e
echo "=== 长期因果效应评估环境部署 ==="
# 1. 创建Python虚拟环境
python3.10 -m venv lt_causal_env
source lt_causal_env/bin/activate
# 2. 安装核心依赖
pip install --upgrade pip wheel setuptools
# 3. 安装科学计算栈
pip install numpy==1.24.3 pandas==2.0.3 scipy==1.11.2
# 4. 安装贝叶斯推理
pip install pymc==5.10.0 arviz==0.15.1
# 5. 安装生存分析
pip install lifelines==0.28.0
# 6. 安装数据管道
pip install kafka-python==2.0.2 redis==5.0.1 boto3==1.28.85
# 7. 安装可视化
pip install matplotlib==3.7.2 seaborn==0.13.0 plotly==5.17.0
# 8. 安装数据库连接
pip install psycopg2-binary==2.9.9 sqlalchemy==2.0.21
# 9. 安装工作流调度
pip install apache-airflow==2.7.2
# 10. 验证安装
python -c "
import pymc, lifelines, kafka, redis
print('✓ 核心库安装成功')
print(f'PyMC版本: {pymc.__version__}')
print(f'Lifelines版本: {lifelines.__version__}')
"
# 11. 创建项目结构
mkdir -p long_term_causal/{src,config,data/{raw,processed,checkpoints},tests,notebooks}
echo "=== 项目结构创建完成 ==="
tree long_term_causal -L 2
# 12. 配置环境变量
cat > long_term_causal/.env << EOF
EXPERIMENT_ID=exp_global_shop_assistant_v1
KAFKA_SERVERS=kafka01.prod:9092,kafka02.prod:9092
REDIS_HOST=redis.prod.internal
REDIS_PORT=6379
POSTGRES_HOST=warehouse.prod.internal
POSTGRES_PORT=5432
POSTGRES_DB=experiment_db
AWS_S3_BUCKET=experiment-checkpoints
AIRFLOW_HOME=/opt/airflow
EOF
echo "=== 部署完成!请编辑.env文件配置实际参数 ==="
IV.II Docker容器化部署
# Dockerfile.long-term-causal
FROM python:3.10-slim-bookworm
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 创建应用用户(非root运行)
RUN groupadd -r expuser && useradd -r -g expuser expuser
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY src/ ./src/
COPY config/ ./config/
COPY .env .
# 更改文件所有者
RUN chown -R expuser:expuser /app
# 切换到非root用户
USER expuser
# 暴露API端口
EXPOSE 8000
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# 启动命令
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000", "--log-level", "info"]
Docker Compose编排:
# docker-compose.long-term-experiment.yml
version: '3.8'
services:
# 长期效应评估API
causal-api:
build:
context: .
dockerfile: Dockerfile.long-term-causal
container_name: lt_causal_api
environment:
- DATABASE_URL=postgresql://exp_user:secure_pass@postgres:5432/experiment_db
- REDIS_URL=redis://redis:6379
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
ports:
- "8000:8000"
depends_on:
- postgres
- redis
- kafka
volumes:
- ./data:/app/data
networks:
- experiment-net
restart: unless-stopped
# PostgreSQL数据仓库
postgres:
image: postgres:15-alpine
container_name: lt_experiment_db
environment:
POSTGRES_USER: exp_user
POSTGRES_PASSWORD: secure_pass
POSTGRES_DB: experiment_db
volumes:
- pgdata:/var/lib/postgresql/data
- ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
networks:
- experiment-net
restart: unless-stopped
# Redis缓存
redis:
image: redis:7-alpine
container_name: lt_redis_cache
command: redis-server --appendonly yes --maxmemory 4gb --maxmemory-policy allkeys-lru
volumes:
- redisdata:/data
ports:
- "6379:6379"
networks:
- experiment-net
restart: unless-stopped
# Kafka消息队列
kafka:
image: bitnami/kafka:3.5
container_name: lt_kafka
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
ports:
- "9092:9092"
volumes:
- kafkadata:/bitnami/kafka
networks:
- experiment-net
restart: unless-stopped
# Airflow调度器
airflow-scheduler:
image: apache/airflow:2.7.2-python3.10
container_name: lt_airflow_scheduler
command: scheduler
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY: d6V1kEXAMPLEtcMJFau_6tG1sE6p9r1o4jff8=
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
depends_on:
- postgres
networks:
- experiment-net
restart: unless-stopped
# Airflow Web UI
airflow-webserver:
image: apache/airflow:2.7.2-python3.10
container_name: lt_airflow_web
command: webserver
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@postgres:5432/airflow
AIRFLOW__WEBSERVER__SECRET_KEY: your-secret-key
ports:
- "8080:8080"
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
depends_on:
- postgres
- airflow-scheduler
networks:
- experiment-net
restart: unless-stopped
volumes:
pgdata:
redisdata:
kafkadata:
networks:
experiment-net:
driver: bridge
部署命令:
# 启动整个长期实验监测系统
docker-compose -f docker-compose.long-term-experiment.yml up -d
# 检查服务状态
docker-compose -f docker-compose.long-term-experiment.yml ps
# 查看日志
docker logs lt_causal_api -f --tail=100
# 停止系统
docker-compose -f docker-compose.long-term-experiment.yml down
IV.III Airflow工作流调度
# dags/long_term_experiment_analysis.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import json
import pandas as pd
from src.long_term_analysis import LongTermEffectEstimator
# send_alert、get_db_connection、get_s3_client、push_to_bi_tool 为项目内部工具函数,实现此处从略
default_args = {
'owner': 'data-science',
'depends_on_past': True,
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=15),
'execution_timeout': timedelta(hours=4)
}
with DAG(
dag_id='long_term_experiment_analysis',
default_args=default_args,
description='长期实验因果效应分析Pipeline',
schedule_interval='0 2 * * 1', # 每周一凌晨2点执行
start_date=days_ago(7),
catchup=False,
max_active_runs=1,
tags=['causal-inference', 'long-term']
) as dag:
# Task 1: 数据质量检查
data_quality_check = PostgresOperator(
task_id='data_quality_check',
postgres_conn_id='experiment_db',
sql="""
-- 检查数据完整性
SELECT
measurement_day,
COUNT(*) as total_users,
SUM(CASE WHEN outcome IS NULL THEN 1 ELSE 0 END) as missing_outcomes,
SUM(CASE WHEN churned = 1 THEN 1 ELSE 0 END) as churned_users
FROM long_term_outcomes
WHERE experiment_id = '{{ var.value.experiment_id }}'
GROUP BY measurement_day
HAVING SUM(CASE WHEN outcome IS NULL THEN 1 ELSE 0 END) > 0.3 * COUNT(*);
""",
on_failure_callback=lambda context: send_alert("数据质量异常")
)
# Task 2: 倾向得分更新
def update_propensity_scores(**context):
"""每周重新计算倾向得分(考虑时变协变量)"""
from src.confounding_adjustment import TimeVaryingConfoundingAdjuster
# 从数据库加载最新数据
query = """
SELECT * FROM long_term_panel_data
WHERE experiment_id = %s AND time <= %s
"""
df = pd.read_sql(
query,
con=get_db_connection(),
params=(context['var']['experiment_id'], context['ds'])
)
# 拟合倾向得分模型
adjuster = TimeVaryingConfoundingAdjuster(stabilized=True)
ps_df = adjuster.fit_propensity_models(df)
# 存回数据库
ps_df.to_sql(
'propensity_scores',
con=get_db_connection(),
if_exists='replace',
index=False
)
return f"更新了 {len(ps_df)} 条倾向得分记录"
update_ps_task = PythonOperator(
task_id='update_propensity_scores',
python_callable=update_propensity_scores,
pool='experiment_analysis_pool'
)
# Task 3: 效应估计
def estimate_long_term_effects(**context):
"""执行主分析"""
estimator = LongTermEffectEstimator(n_strata=5)
# 加载数据
raw_data = pd.read_sql(
"SELECT * FROM long_term_outcomes_wide",
con=get_db_connection()
)
# 运行分析
results = estimator.run_full_analysis(raw_data)
# 保存结果到S3
s3_client = get_s3_client()
s3_client.put_object(
Bucket='experiment-results',
Key=f"{context['var']['experiment_id']}/weekly_results_{context['ds']}.json",
Body=json.dumps(results, default=str)
)
# 推送到BI工具
push_to_bi_tool(results)
return results
estimate_task = PythonOperator(
task_id='estimate_effects',
python_callable=estimate_long_term_effects,
trigger_rule='all_success'
)
# Task 4: 敏感性分析
def sensitivity_analysis(**context):
"""检查结果的稳健性"""
from src.sensitivity import SensitivityAnalyzer
analyzer = SensitivityAnalyzer()
# 边界分析(Bounding Analysis)
bounds = analyzer.robustness_bounds(
outcome_db_connection=get_db_connection(),
experiment_id=context['var']['experiment_id']
)
# 发表偏倚检测
publication_bias = analyzer.egger_test(bounds)
# 未测量混淆的E值
e_value = analyzer.calculate_e_value(
observed_effect=context['ti'].xcom_pull(task_ids='estimate_effects')['ate_mean'],
ci=context['ti'].xcom_pull(task_ids='estimate_effects')['ate_hdi_95']
)
return {
'robustness_bounds': bounds,
'publication_bias_p': publication_bias,
'e_value': e_value
}
sensitivity_task = PythonOperator(
task_id='sensitivity_analysis',
python_callable=sensitivity_analysis
)
# Task 5: 报告生成
def generate_report(**context):
"""生成自动化周报"""
from src.reporting import LongTermExperimentReporter
reporter = LongTermExperimentReporter(
experiment_id=context['var']['experiment_id'],
analysis_date=context['ds']
)
# 加载所有任务结果
effect_results = context['ti'].xcom_pull(task_ids='estimate_effects')
sensitivity_results = context['ti'].xcom_pull(task_ids='sensitivity_analysis')
# 生成报告
report_path = reporter.generate_pdf_report(
effect_results=effect_results,
sensitivity_results=sensitivity_results,
include_appendices=True
)
# 发送邮件
reporter.send_report_email(
recipients=context['var']['report_recipients'],
attachment_path=report_path
)
return report_path
report_task = PythonOperator(
task_id='generate_report',
python_callable=generate_report,
trigger_rule='all_success'
)
# 定义依赖关系
data_quality_check >> update_ps_task >> estimate_task
estimate_task >> [sensitivity_task, report_task]
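上面DAG中的敏感性分析任务调用了项目内部的SensitivityAnalyzer(本文未给出其实现,相关函数名仅为示意)。作为参考,下面是其中"未测量混淆E值"计算思路的一个最小示意,假设效应已换算到风险比(RR)尺度,公式来自VanderWeele & Ding(2017):

```python
import numpy as np

def e_value(rr: float, ci_limit: float = None) -> dict:
    """计算E值:rr为风险比点估计,ci_limit为置信区间中靠近1的一侧置信限。"""
    def _e(r: float) -> float:
        r = max(r, 1.0 / r)                  # 保护性效应(RR<1)先取倒数
        return r + np.sqrt(r * (r - 1))
    out = {'e_value_point': _e(rr)}
    if ci_limit is not None:
        # 置信限达到或跨过1时,E值记为1:任意弱的未测量混淆即可解释结果
        crossed = (rr - 1) * (ci_limit - 1) <= 0
        out['e_value_ci'] = 1.0 if crossed else _e(ci_limit)
    return out

print(e_value(1.30, ci_limit=1.10))
# 含义:需要一个与处理、结果的关联强度(风险比尺度)均不低于该E值的未测量混淆变量,
# 才能把观察到的效应完全解释为混淆所致;E值越大,结论对未测量混淆越稳健。
```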
IV.IV 生产环境最佳实践
| 实践领域 | 推荐做法 | 反模式 | 实施难度 |
|---|---|---|---|
| 监控告警 | 多阶段延迟告警(即时/24h/7d/30d) | 单一阈值告警 | 中等 |
| 数据质量 | 自动数据漂移检测(PSI>0.2触发) | 人工抽查 | 高 |
| 模型重训练 | 季度重训练+滚动窗口更新 | 静态模型 | 中等 |
| 容量规划 | 预留50%计算资源应对峰值 | 按需扩展(冷启动慢) | 低 |
| 灾备恢复 | 双活数据中心+RPO<5min | 单点+日备份 | 高 |
| 安全合规 | 差分隐私(ε=1.0)+审计日志 | 原始数据直出 | 中等 |
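针对表中"PSI>0.2触发数据漂移告警"的做法,下面给出群体稳定性指数(PSI)计算的一个最小示意(分箱数与模拟数据均为假设):

```python
import numpy as np

def population_stability_index(expected: np.ndarray, actual: np.ndarray,
                               n_bins: int = 10) -> float:
    """按基准样本分位数分箱,计算 PSI = sum((p_a - p_e) * ln(p_a / p_e))。"""
    edges = np.quantile(expected, np.linspace(0, 1, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf                 # 两端开放,覆盖超出基准范围的新值
    p_e = np.histogram(expected, bins=edges)[0] / len(expected)
    p_a = np.histogram(actual, bins=edges)[0] / len(actual)
    p_e, p_a = np.clip(p_e, 1e-6, None), np.clip(p_a, 1e-6, None)   # 避免log(0)
    return float(np.sum((p_a - p_e) * np.log(p_a / p_e)))

# 常用经验阈值:PSI < 0.1 视为稳定,0.1~0.2 需关注,> 0.2 触发告警(与上表一致)
rng = np.random.default_rng(0)
baseline = rng.normal(0, 1, 5000)         # 实验启动时的指标分布
current = rng.normal(0.3, 1.2, 5000)      # 当前窗口的指标分布(模拟发生漂移)
print(f"PSI = {population_stability_index(baseline, current):.3f}")
```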
V. 高级技术与优化策略
V.I 贝叶斯自适应实验设计
传统实验使用固定样本量,但长期实验可以自适应调整:
| 阶段 | 动作 | 决策标准 | 统计方法 |
|---|---|---|---|
| 第30天 | 早期停止(futility) | 后验概率<5% | 预测概率(PPP) |
| 第90天 | 样本量重估 | 效应估计精度不足 | 条件功效 |
| 第180天 | 优胜组选择 | 可信区间分离 | 贝叶斯决策理论 |
import pymc as pm
import numpy as np
import pandas as pd
from typing import Dict, Any
class BayesianAdaptiveTrial:
"""
贝叶斯自适应试验设计
核心思想:
1. 使用预测概率(Predictive Probability)评估试验成功可能性
2. 基于中期数据动态调整样本量
3. 考虑实际约束(最大样本量、时间成本)
"""
def __init__(self, success_threshold: float = 0.95,
futility_threshold: float = 0.05,
max_sample_size: int = 10000):
self.success_threshold = success_threshold
self.futility_threshold = futility_threshold
self.max_n = max_sample_size
self.current_data: pd.DataFrame = None  # 保存最近一次中期分析的数据,供样本量重估使用
def calculate_predictive_probability(self, interim_data: pd.DataFrame,
target_effect: float = 0.1) -> Dict[str, float]:
"""
计算预测概率(PPP)
步骤:
1. 基于中期数据拟合后验分布
2. 从后验预测分布模拟完整试验结果
3. 计算达到显著性的比例
"""
self.current_data = interim_data  # 缓存中期数据,供后续精度估计与样本量重估
with pm.Model() as predictive_model:
# 基于中期数据定义先验
n_interim = len(interim_data)
observed_effect = interim_data['outcome'].mean()
# 效应量的分层先验
mu = pm.Normal('mu', mu=0, sigma=0.2)
sigma_between = pm.HalfNormal('sigma_between', sigma=0.1)
# 个体随机效应
n_users = interim_data['user_id'].nunique()
user_effects = pm.Normal('user_effects', mu=mu, sigma=sigma_between,
shape=n_users)
# 处理效应(试验关心的核心参数)
tau = pm.Normal('tau', mu=0, sigma=0.2)
# 似然函数:个体随机效应 + 处理效应
sigma_within = pm.HalfNormal('sigma_within', sigma=0.5)
user_codes = interim_data['user_id'].astype('category').cat.codes.values
outcomes = pm.Normal(
'outcomes',
mu=user_effects[user_codes] + tau * interim_data['treatment'].values,
sigma=sigma_within,
observed=interim_data['outcome'].values
)
# 后验采样
with predictive_model:
trace = pm.sample(2000, chains=4, cores=4,
target_accept=0.95, progressbar=False)
# 预测:此处简化为对已观测协变量做后验预测,再以两组预测均值之差近似完整试验的效应估计
remaining_n = self.max_n - n_interim  # 剩余可入组样本量(本简化示意中未直接使用)
ppc = pm.sample_posterior_predictive(
trace,
var_names=['outcomes'],
model=predictive_model
)
# 计算PPP:各后验抽样下,处理组与对照组预测均值之差达到/低于阈值的比例
pred = ppc.posterior_predictive['outcomes'].values  # 形状 (chain, draw, n_obs)
treat_mask = interim_data['treatment'].values == 1
predicted_effects = pred[..., treat_mask].mean(axis=-1) - pred[..., ~treat_mask].mean(axis=-1)
ppp_success = (predicted_effects > target_effect).mean()
ppp_futility = (predicted_effects < 0.01).mean()
return {
'predictive_probability_success': ppp_success,
'predictive_probability_futility': ppp_futility,
'recommended_action': self._make_adaptive_decision(ppp_success, ppp_futility)
}
def _make_adaptive_decision(self, ppp_success: float,
ppp_futility: float) -> Dict[str, Any]:
"""基于PPP做出自适应决策"""
if ppp_success > self.success_threshold:
return {
'decision': 'early_stop_success',
'reason': '预测成功概率超过阈值',
'continue_sampling': False
}
elif ppp_futility > self.futility_threshold:
return {
'decision': 'early_stop_futility',
'reason': '预测无效概率超过阈值',
'continue_sampling': False
}
else:
# 用当前数据组间均值差的标准误近似衡量精度,再重估样本量(条件功效思路的简化)
grouped = self.current_data.groupby('treatment')['outcome']
current_precision = float(np.sqrt((grouped.var() / grouped.count()).sum()))
required_n = self._recalculate_sample_size(current_precision)
return {
'decision': 'continue_or_extend',
'recommended_sample_size': min(required_n, self.max_n),
'continue_sampling': True
}
def _recalculate_sample_size(self, current_precision: float,
target_precision: float = 0.05) -> int:
"""
基于中期数据重新计算样本量
使用条件功效法(Conditional Power)
"""
# 简化的样本量重估逻辑
# 实际应用中应使用复杂的条件功效计算
precision_ratio = current_precision / target_precision
additional_samples_needed = int(len(self.current_data) * precision_ratio**2)
return len(self.current_data) + additional_samples_needed
# 在GlobalShop案例中应用
if __name__ == "__main__":
# 模拟中期数据(实验运行到第90天)
np.random.seed(42)
interim_n = 4000 # 已收集4000用户
interim_data = []
for user_id in range(interim_n):
treatment = np.random.binomial(1, 0.5)
# 真实效应15%,但中期数据噪声较大
noise = np.random.normal(0, 0.3)
outcome = 0.15 * treatment + noise
interim_data.append({
'user_id': user_id,
'treatment': treatment,
'outcome': outcome
})
interim_df = pd.DataFrame(interim_data)
# 执行适应性分析
adaptive_design = BayesianAdaptiveTrial(max_sample_size=8000)
ppp_results = adaptive_design.calculate_predictive_probability(
interim_df,
target_effect=0.1
)
print("=== 贝叶斯自适应分析结果 ===")
print(f"预测成功概率: {ppp_results['predictive_probability_success']:.1%}")
print(f"预测无效概率: {ppp_results['predictive_probability_futility']:.1%}")
print(f"推荐决策: {ppp_results['recommended_action']['decision']}")
if ppp_results['recommended_action']['continue_sampling']:
rec_n = ppp_results['recommended_action']['recommended_sample_size']
print(f"推荐总样本量: {rec_n:,}")
V.II 高斯过程处理非线性时间效应
长期效应往往是非线性的,高斯过程(GP)提供灵活建模:
import pymc as pm
import numpy as np
import pandas as pd
import arviz as az
import matplotlib.pyplot as plt
class GaussianProcessTimeEffect:
"""
使用高斯过程建模时变因果效应
优势:
1. 无需预设函数形式
2. 提供不确定性量化
3. 平滑效应估计(避免过拟合)
"""
def __init__(self, time_points: np.ndarray,
kernel: str = 'Matern52'):
self.time_points = time_points
self.kernel = kernel
def build_model(self, data: pd.DataFrame) -> pm.Model:
"""
构建GP模型
模型结构:
Y_it = α_i + f(t) * Treatment_i + ε_it
其中f(t)是高斯过程,捕捉效应的时间动态
"""
# 数据准备
n_users = data['user_id'].nunique()
n_times = len(self.time_points)
# 创建设计矩阵
user_idx = data['user_id'].astype('category').cat.codes.values
# 将每条观测的时间映射到 self.time_points 网格上的位置,保证与GP潜在函数f的索引对齐
time_idx = np.searchsorted(self.time_points, data['time'].values)
treatments = data['treatment'].values
outcomes = data['outcome'].values
with pm.Model() as gp_model:
# 个体随机效应
sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)
alpha = pm.Normal('alpha', mu=0, sigma=sigma_alpha, shape=n_users)
# 处理效应的GP先验
# 使用Matern核(适合捕捉平滑变化)
eta = pm.HalfNormal('eta', sigma=1) # 幅度
ell = pm.Gamma('ell', alpha=2, beta=1) # 长度尺度
# 协方差矩阵(Matern 5/2核;指数部分用pm.math.exp以兼容PyMC符号变量)
time_dist = np.abs(self.time_points[:, None] - self.time_points[None, :])
if self.kernel == 'Matern52':
K = eta**2 * (1 + np.sqrt(5)*time_dist/ell +
5*time_dist**2/(3*ell**2)) * \
pm.math.exp(-np.sqrt(5)*time_dist/ell)
K = K + 1e-6 * np.eye(n_times)  # 数值抖动项,保证协方差矩阵正定
# GP潜在函数
f = pm.MvNormal('f', mu=np.zeros(n_times), cov=K, shape=n_times)
# 处理效应 = GP(时间) * 处理
treatment_effect = f[time_idx] * treatments
# 线性预测器
mu = alpha[user_idx] + treatment_effect
# 似然
sigma = pm.HalfNormal('sigma', sigma=0.5)
outcome = pm.Normal(
'outcome',
mu=mu,
sigma=sigma,
observed=outcomes
)
self.model = gp_model
return gp_model
def visualize_gp_effect(self, trace) -> plt.Figure:
"""
可视化GP估计的时变效应
"""
fig, ax = plt.subplots(figsize=(12, 6))
# 提取GP轨迹
f_posterior = trace.posterior['f'].values
f_mean = f_posterior.mean(axis=(0, 1))
f_hdi = az.hdi(f_posterior, hdi_prob=0.95)  # 形状为(n_times, 2)的数组,便于直接切片
# 绘制
ax.plot(self.time_points, f_mean, 'b-', linewidth=2, label='平均效应')
ax.fill_between(
self.time_points,
f_hdi[:, 0],
f_hdi[:, 1],
alpha=0.3,
color='blue',
label='95%可信区间'
)
ax.axhline(y=0, color='r', linestyle='--', alpha=0.5)
ax.set_xlabel('实验天数')
ax.set_ylabel('处理效应')
ax.set_title('高斯过程估计的时变因果效应')
ax.legend()
ax.grid(True, alpha=0.3)
return fig
# 应用于GlobalShop
if __name__ == "__main__":
# 创建时间轴(0到180天,每30天一个点)
time_points = np.array([0, 30, 60, 90, 120, 150, 180])
# 准备数据
gp_data = []
for user_id in range(200):
treatment = np.random.binomial(1, 0.5)
for i, t in enumerate(time_points[1:]): # 跳过t=0
# 真实效应:0-60天线性增长,60-120天平稳,120天后衰减
if t <= 60:
true_effect = 0.05 * t / 60
elif t <= 120:
true_effect = 0.05
else:
true_effect = 0.05 * np.exp(-0.02 * (t - 120))
outcome = treatment * true_effect + np.random.normal(0, 0.01)
gp_data.append({
'user_id': user_id,
'time': t,
'treatment': treatment,
'outcome': outcome
})
gp_df = pd.DataFrame(gp_data)
# 拟合GP模型
gp_estimator = GaussianProcessTimeEffect(time_points)
model = gp_estimator.build_model(gp_df)
with model:
gp_trace = pm.sample(1000, chains=2, cores=2, progressbar=False)
# 可视化
fig = gp_estimator.visualize_gp_effect(gp_trace)
plt.savefig('gp_time_effect.png', dpi=300, bbox_inches='tight')
print("GP效应图已保存: gp_time_effect.png")
V.III 处理网络干扰的图因果推断
当存在用户间交互时(如社交推荐),使用图结构建模:
import networkx as nx
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
class NetworkInterferenceAdjuster:
"""
网络干扰调整器
解决SUTVA违反问题:
Y_i = f(D_i, D_{neighbors(i)}, X_i)
使用图聚类随机化减少干扰
"""
def __init__(self, graph: nx.Graph, cluster_size: int = 50):
self.graph = graph
self.cluster_size = cluster_size
self.clusters = self._create_clusters()
def _create_clusters(self) -> Dict[int, List[int]]:
"""
使用图聚类算法创建实验簇
目标:最小化簇间边,最大化簇内边
"""
# 使用Louvain社区检测
communities = nx.community.louvain_communities(
self.graph,
resolution=0.8, # 调节簇大小
seed=42
)
# 映射簇ID到节点列表
clusters = {}
for cluster_id, nodes in enumerate(communities):
# 如果簇太大,进一步分割
if len(nodes) > self.cluster_size:
subgraph = self.graph.subgraph(nodes)
sub_communities = nx.community.louvain_communities(subgraph)
for sub_id, sub_nodes in enumerate(sub_communities):
clusters[len(clusters)] = list(sub_nodes)
else:
clusters[len(clusters)] = list(nodes)  # 统一用递增编号,避免与拆分出的子簇键冲突
return clusters
def cluster_randomized_assignment(self, treatment_prob: float = 0.5) -> pd.DataFrame:
"""
簇随机化分配
将整个簇分配给同一处理条件
"""
assignments = []
for cluster_id, nodes in self.clusters.items():
cluster_treatment = np.random.binomial(1, treatment_prob)
for node in nodes:
assignments.append({
'node_id': node,
'cluster_id': cluster_id,
'treatment': cluster_treatment,
'n_cluster_nodes': len(nodes)
})
return pd.DataFrame(assignments)
def estimate_neighbor_treatment_effect(self, outcome_data: pd.DataFrame,
assignment_data: pd.DataFrame) -> float:
"""
估计邻居处理效应(Treatment Effect on Neighbors, TEN)
TEN = E[Yi | Di=0, 邻居被处理] - E[Yi | Di=0, 邻居未被处理]
"""
# 合并数据和分配
merged = outcome_data.merge(assignment_data, on='node_id', how='inner')
# 计算每个节点的邻居处理比例
neighbor_treatment_prop = []
for _, row in merged.iterrows():
node_id = row['node_id']
neighbors = list(self.graph.neighbors(node_id))
if not neighbors:
neighbor_treatment_prop.append(0)
continue
# 获取邻居的分配
neighbor_assignments = assignment_data[
assignment_data['node_id'].isin(neighbors)
]
prop_treated = neighbor_assignments['treatment'].mean()
neighbor_treatment_prop.append(prop_treated)
merged['neighbor_treatment_prop'] = neighbor_treatment_prop
# 仅分析未处理个体(Di=0)
control_group = merged[merged['treatment'] == 0]
# 邻居处理效应估计
high_neighbor_treatment = control_group[
control_group['neighbor_treatment_prop'] > 0.5
]['outcome'].mean()
low_neighbor_treatment = control_group[
control_group['neighbor_treatment_prop'] <= 0.5
]['outcome'].mean()
ten = high_neighbor_treatment - low_neighbor_treatment
return ten
# 模拟社交网络实验
if __name__ == "__main__":
# 生成随机图(小世界网络)
graph = nx.watts_strogatz_graph(n=1000, k=10, p=0.1, seed=42)
# 创建调整器
adjuster = NetworkInterferenceAdjuster(graph, cluster_size=50)
# 执行簇随机化
assignments = adjuster.cluster_randomized_assignment(treatment_prob=0.5)
print(f"创建 {len(adjuster.clusters)} 个簇")
print(f"平均簇大小: {np.mean([len(nodes) for nodes in adjuster.clusters.values()]):.1f}")
# 模拟结果数据
outcomes = []
for _, row in assignments.iterrows():
node_id = row['node_id']
treatment = row['treatment']
# 真实模型:直接效应 + 邻居效应
neighbors = list(graph.neighbors(node_id))
neighbor_treatments = assignments[
assignments['node_id'].isin(neighbors)
]['treatment'].values
# 干扰效应为直接效应的30%
spillover_effect = 0.3 * neighbor_treatments.mean() if len(neighbors) > 0 else 0
outcome = 0.2 * treatment + spillover_effect + np.random.normal(0, 0.1)
outcomes.append({
'node_id': node_id,
'outcome': outcome
})
outcome_df = pd.DataFrame(outcomes)
# 估计邻居处理效应
ten = adjuster.estimate_neighbor_treatment_effect(outcome_df, assignments)
print(f"\n估计的邻居处理效应: {ten:.4f}")