贝叶斯方法在A/B测试中的创新应用
【摘要】 I. 传统A/B测试的局限性与贝叶斯范式的优势 传统频率主义方法的挑战传统A/B测试基于频率主义统计框架,这种方法在实践中面临多个核心挑战:样本量刚性:必须预先确定样本量,无法灵活调整多次检验问题:无法在测试过程中随时查看结果而不增加第一类错误结果解释困难:p值和置信区间的解释常常被误解决策二元化:只能得出"显著"或"不显著"的结论,缺乏 nuance 实例分析:电商平台的决策困境某电商平...
I. 传统A/B测试的局限性与贝叶斯范式的优势
传统频率主义方法的挑战
传统A/B测试基于频率主义统计框架,这种方法在实践中面临多个核心挑战:
- 样本量刚性:必须预先确定样本量,无法灵活调整
- 多次检验问题:无法在测试过程中随时查看结果而不增加第一类错误
- 结果解释困难:p值和置信区间的解释常常被误解
- 决策二元化:只能得出"显著"或"不显著"的结论,缺乏对证据强度的细致刻画
实例分析:电商平台的决策困境
某电商平台进行支付流程优化的A/B测试,计划收集5000用户样本。当收集到3000用户时,产品经理观察到变体B的转化率似乎更高,希望提前结束测试上线新版本。但数据科学家警告说,这样做会增大假阳性风险。
团队陷入两难:
- 如果继续测试,可能错过业务机会
- 如果提前停止,可能基于噪声做出错误决策
- 无法量化当前证据的支持强度
贝叶斯方法的竞争优势
维度 | 传统方法 | 贝叶斯方法 |
---|---|---|
样本量 | 固定,需预先计算 | 灵活,支持序贯分析 |
监测频率 | 有限制,避免α膨胀 | 任意频率,无多重检验问题 |
结果解释 | 基于p值和置信区间 | 直接概率陈述,更直观 |
先验知识 | 无法纳入 | 可整合历史数据和领域知识 |
决策支持 | 二元结论 | 概率性结论,支持风险量化 |
范式转变的核心价值
II. 贝叶斯A/B测试的数学基础与核心概念
贝叶斯定理在A/B测试中的应用
贝叶斯定理为A/B测试提供了完整的概率框架:

$$P(H \mid D) = \frac{P(D \mid H)\,P(H)}{P(D)}$$

在A/B测试语境中:
- $H$:假设(如"变体B优于变体A")
- $D$:观测数据
- $P(H)$:先验概率(基于历史数据或领域知识)
- $P(D \mid H)$:似然函数(数据在假设下的概率)
- $P(H \mid D)$:后验概率(给定数据后假设的概率)
共轭先验分布的选择
对于常见的A/B测试指标,推荐使用共轭先验分布:
数据类型 | 似然分布 | 共轭先验 | 后验分布 |
---|---|---|---|
转化率 | 伯努利 | Beta分布 | Beta分布 |
点击率 | 二项 | Beta分布 | Beta分布 |
收入/价值 | 正态 | 正态-逆伽马 | 正态-逆伽马 |
计数数据 | 泊松 | Gamma分布 | Gamma分布 |
后验分布的计算
贝叶斯A/B测试的核心是通过后验分布进行推断。对于转化率测试,后验分布的计算如下:
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
from scipy import integrate
import pandas as pd
class BayesianConversionRate:
    """Bayesian conversion-rate test built on the Beta-Bernoulli conjugate model.

    Each variant's conversion rate gets a Beta(prior_alpha, prior_beta)
    prior; observed successes/failures update it in closed form to a Beta
    posterior, from which comparison probabilities, expected losses and
    credible intervals are derived.
    """

    def __init__(self, prior_alpha=1, prior_beta=1):
        """
        Initialize the Bayesian conversion-rate test.

        Parameters:
            prior_alpha: alpha parameter of the Beta prior
            prior_beta: beta parameter of the Beta prior
        """
        self.prior_alpha = prior_alpha
        self.prior_beta = prior_beta
        # Per-variant posterior state, keyed by variant name.
        self.results = {}

    def update_posterior(self, successes, trials, variant_name):
        """
        Update (overwrite) the posterior distribution for one variant.

        Parameters:
            successes: number of conversions
            trials: total number of trials
            variant_name: variant identifier

        Returns:
            (posterior_alpha, posterior_beta) of the updated Beta posterior.
        """
        # Beta-Bernoulli conjugacy: alpha accumulates successes,
        # beta accumulates failures.
        posterior_alpha = self.prior_alpha + successes
        posterior_beta = self.prior_beta + (trials - successes)
        self.results[variant_name] = {
            'successes': successes,
            'trials': trials,
            'posterior_alpha': posterior_alpha,
            'posterior_beta': posterior_beta,
            'observed_rate': successes / trials if trials > 0 else 0
        }
        return posterior_alpha, posterior_beta

    def _require_posteriors(self, *variant_names):
        """Raise ValueError unless each named variant has an updated posterior."""
        for name in variant_names:
            if name not in self.results:
                raise ValueError("必须先更新两个变体的后验分布")

    def calculate_probability_beating_control(self, treatment_name, control_name='control'):
        """
        Compute P(rate_treatment > rate_control) under the current posteriors.

        Raises:
            ValueError: if either variant's posterior has not been updated.
        """
        self._require_posteriors(treatment_name, control_name)
        treatment_alpha = self.results[treatment_name]['posterior_alpha']
        treatment_beta = self.results[treatment_name]['posterior_beta']
        control_alpha = self.results[control_name]['posterior_alpha']
        control_beta = self.results[control_name]['posterior_beta']
        # P(T > C) = integral over [0, 1] of f_T(p) * F_C(p) dp:
        # condition on the treatment rate and integrate numerically.
        probability = integrate.quad(
            lambda p: stats.beta.pdf(p, treatment_alpha, treatment_beta) *
                      stats.beta.cdf(p, control_alpha, control_beta),
            0, 1
        )[0]
        return probability

    def calculate_expected_loss(self, treatment_name, control_name='control'):
        """
        Monte-Carlo estimate of the expected loss of each possible decision.

        Returns:
            dict with 'loss_if_choose_treatment' and 'loss_if_choose_control',
            each E[max(other - chosen, 0)] under the joint posterior.

        Raises:
            ValueError: if either variant's posterior has not been updated
            (made consistent with calculate_probability_beating_control,
            which previously was the only method that validated its inputs).
        """
        self._require_posteriors(treatment_name, control_name)
        treatment_alpha = self.results[treatment_name]['posterior_alpha']
        treatment_beta = self.results[treatment_name]['posterior_beta']
        control_alpha = self.results[control_name]['posterior_alpha']
        control_beta = self.results[control_name]['posterior_beta']
        # Sample both posteriors and average the shortfall of each choice.
        samples_treatment = np.random.beta(treatment_alpha, treatment_beta, 100000)
        samples_control = np.random.beta(control_alpha, control_beta, 100000)
        loss_treatment = np.maximum(samples_control - samples_treatment, 0).mean()
        loss_control = np.maximum(samples_treatment - samples_control, 0).mean()
        return {
            'loss_if_choose_treatment': loss_treatment,
            'loss_if_choose_control': loss_control
        }

    def plot_posterior_distributions(self, variants=None):
        """
        Plot the posterior densities of the given variants (default: all),
        shading each variant's 95% credible interval.
        """
        if variants is None:
            variants = list(self.results.keys())
        plt.figure(figsize=(12, 6))
        x = np.linspace(0, 1, 1000)
        for variant in variants:
            alpha = self.results[variant]['posterior_alpha']
            beta = self.results[variant]['posterior_beta']
            observed_rate = self.results[variant]['observed_rate']
            y = stats.beta.pdf(x, alpha, beta)
            plt.plot(x, y, label=f'{variant} (观测: {observed_rate:.3f})', linewidth=2)
            # Shade the credible interval for this variant.
            hdi_low, hdi_high = self.calculate_hdi(alpha, beta)
            plt.axvspan(hdi_low, hdi_high, alpha=0.2)
        plt.xlabel('转化率')
        plt.ylabel('概率密度')
        plt.title('后验分布比较')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

    def calculate_hdi(self, alpha, beta, credible_level=0.95):
        """
        Approximate credible interval for a Beta(alpha, beta) posterior.

        NOTE: despite its name, this returns the EQUAL-TAILED interval
        estimated from posterior samples, not a true highest-density
        interval; for unimodal, roughly symmetric posteriors the two are
        close. The name is kept for interface compatibility.
        """
        samples = np.random.beta(alpha, beta, 100000)
        lower_percentile = (1 - credible_level) / 2
        upper_percentile = 1 - lower_percentile
        return np.percentile(samples, lower_percentile * 100), np.percentile(samples, upper_percentile * 100)
# Example usage: compare a simulated control and treatment group.
bayesian_test = BayesianConversionRate(prior_alpha=2, prior_beta=2)

# Simulated observations: variant -> (successes, trials).
observations = {
    'control': (120, 1000),
    'treatment': (150, 1000),
}
for variant_name, (hits, n_trials) in observations.items():
    bayesian_test.update_posterior(hits, n_trials, variant_name)

# Key decision metrics for the treatment vs the control.
prob_beating_control = bayesian_test.calculate_probability_beating_control('treatment')
expected_loss = bayesian_test.calculate_expected_loss('treatment')
print(f"实验组优于对照组的概率: {prob_beating_control:.3f}")
print(f"如果选择实验组的预期损失: {expected_loss['loss_if_choose_treatment']:.4f}")
print(f"如果选择对照组的预期损失: {expected_loss['loss_if_choose_control']:.4f}")

# Visualize the posterior densities.
bayesian_test.plot_posterior_distributions()
贝叶斯推理的核心概念
III. 贝叶斯A/B测试的完整实施框架
端到端实施流程
建立完整的贝叶斯A/B测试流程需要系统化的方法:
class BayesianABTestFramework:
    """End-to-end Bayesian A/B testing framework.

    Tracks per-variant success/trial counts, recomputes posterior comparison
    metrics after every observation, and supports sequential stopping based
    on a probability threshold plus an expected-loss (risk) threshold.
    By convention the FIRST variant in ``variants`` is the control.
    """

    def __init__(self, variants, prior_params=None, decision_threshold=0.95,
                 risk_threshold=0.01):
        """
        Initialize the test framework.

        Parameters:
            variants: list of variant names; the first entry is the control
            prior_params: per-variant Beta priors
                          ({variant: {'alpha': a, 'beta': b}})
            decision_threshold: probability threshold for declaring a winner
            risk_threshold: expected-loss threshold for an acceptable decision
        """
        self.variants = variants
        self.decision_threshold = decision_threshold
        self.risk_threshold = risk_threshold
        if prior_params is None:
            # Default: uninformative Beta(1, 1) prior for every variant.
            self.prior_params = {variant: {'alpha': 1, 'beta': 1} for variant in variants}
        else:
            self.prior_params = prior_params
        self.test_history = []
        self.current_state = {variant: {'successes': 0, 'trials': 0} for variant in variants}

    def add_observation(self, variant, success):
        """
        Record one new observation and snapshot the updated test state.

        Returns:
            the snapshot dict appended to ``test_history``.
        """
        self.current_state[variant]['trials'] += 1
        if success:
            self.current_state[variant]['successes'] += 1
        # FIX: snapshot with a per-variant copy. The original shallow
        # ``self.current_state.copy()`` shared the inner dicts, so every
        # historical snapshot silently mutated along with current_state.
        snapshot = {
            'timestamp': len(self.test_history),
            'state': {v: dict(counts) for v, counts in self.current_state.items()},
            'metrics': self._calculate_current_metrics()
        }
        self.test_history.append(snapshot)
        return snapshot

    def _calculate_current_metrics(self):
        """
        Recompute posterior comparison metrics for every treatment vs control.
        """
        metrics = {}
        bayesian_calculator = BayesianConversionRate()
        # Update each variant's posterior under its own prior.
        for variant in self.variants:
            state = self.current_state[variant]
            prior_alpha = self.prior_params[variant]['alpha']
            prior_beta = self.prior_params[variant]['beta']
            bayesian_calculator.prior_alpha = prior_alpha
            bayesian_calculator.prior_beta = prior_beta
            bayesian_calculator.update_posterior(
                state['successes'], state['trials'], variant
            )
        # Pairwise comparisons: every treatment against the control,
        # which by convention is the first variant in the list.
        control_variant = self.variants[0]
        for variant in self.variants[1:]:
            prob_beating = bayesian_calculator.calculate_probability_beating_control(
                variant, control_variant
            )
            expected_loss = bayesian_calculator.calculate_expected_loss(
                variant, control_variant
            )
            metrics[f'{variant}_vs_{control_variant}'] = {
                'probability_beating_control': prob_beating,
                'expected_loss': expected_loss['loss_if_choose_treatment'],
                'expected_gain': -expected_loss['loss_if_choose_control']
            }
        return metrics

    def check_stopping_conditions(self):
        """
        Evaluate stopping conditions against the latest snapshot's metrics.

        Returns:
            dict with per-comparison conditions, an overall textual
            recommendation, and a ``should_stop`` flag.
        """
        current_metrics = self.test_history[-1]['metrics'] if self.test_history else {}
        stopping_conditions = {}
        recommendation = "继续测试"
        for comparison, metrics in current_metrics.items():
            prob_beating = metrics['probability_beating_control']
            expected_loss = metrics['expected_loss']
            # Winner: high probability of beating control AND acceptable risk.
            if prob_beating > self.decision_threshold and expected_loss < self.risk_threshold:
                stopping_conditions[comparison] = {
                    'met': True,
                    'reason': f'概率({prob_beating:.3f}) > 阈值({self.decision_threshold}) 且 损失({expected_loss:.4f}) < 风险阈值({self.risk_threshold})',
                    'recommendation': f'选择 {comparison.split("_vs_")[0]}'
                }
                recommendation = f"停止测试: {stopping_conditions[comparison]['recommendation']}"
            # Symmetric condition: the control is clearly the better choice.
            elif prob_beating < (1 - self.decision_threshold) and expected_loss < self.risk_threshold:
                stopping_conditions[comparison] = {
                    'met': True,
                    'reason': f'概率({prob_beating:.3f}) < {1-self.decision_threshold} 且 损失({expected_loss:.4f}) < 风险阈值({self.risk_threshold})',
                    'recommendation': f'选择对照组'
                }
                recommendation = "停止测试: 选择对照组"
            else:
                stopping_conditions[comparison] = {
                    'met': False,
                    'reason': '未达到停止条件'
                }
        return {
            'stopping_conditions': stopping_conditions,
            'overall_recommendation': recommendation,
            'should_stop': any(cond['met'] for cond in stopping_conditions.values())
        }

    def run_sequential_test(self, data_stream, max_samples=10000):
        """
        Consume (variant, success) pairs, checking stop conditions each step.

        Parameters:
            data_stream: iterable of (variant, success) pairs
            max_samples: hard cap on the number of processed observations

        Returns:
            list of snapshots, one per processed observation.
        """
        results = []
        for i, (variant, success) in enumerate(data_stream):
            if i >= max_samples:
                break
            # Record the observation, then evaluate the stopping rule.
            snapshot = self.add_observation(variant, success)
            stopping_check = self.check_stopping_conditions()
            snapshot['stopping_check'] = stopping_check
            results.append(snapshot)
            print(f"样本 {i+1}: {variant} - {'成功' if success else '失败'}")
            print(f" 当前推荐: {stopping_check['overall_recommendation']}")
            if stopping_check['should_stop']:
                print("达到停止条件,结束测试")
                break
        return results

    def plot_test_progress(self):
        """
        Plot the probability trajectories and cumulative per-variant sample sizes.
        """
        if not self.test_history:
            print("尚无测试数据")
            return
        timestamps = [entry['timestamp'] for entry in self.test_history]
        # Extract each treatment's P(beats control) trajectory.
        prob_data = {}
        for variant in self.variants[1:]:
            comparison_key = f'{variant}_vs_{self.variants[0]}'
            prob_data[variant] = [
                entry['metrics'][comparison_key]['probability_beating_control']
                for entry in self.test_history
                if comparison_key in entry['metrics']
            ]
        plt.figure(figsize=(12, 8))
        # Top panel: probability trajectories with the decision thresholds.
        plt.subplot(2, 1, 1)
        for variant, probs in prob_data.items():
            plt.plot(timestamps[:len(probs)], probs, label=variant, linewidth=2)
        plt.axhline(y=self.decision_threshold, color='red', linestyle='--',
                    label=f'决策阈值 ({self.decision_threshold})')
        plt.axhline(y=1-self.decision_threshold, color='blue', linestyle='--',
                    label=f'反向阈值 ({1-self.decision_threshold})')
        plt.xlabel('样本数量')
        plt.ylabel('优于对照组的概率')
        plt.title('贝叶斯A/B测试进度 - 概率轨迹')
        plt.legend()
        plt.grid(True, alpha=0.3)
        # Bottom panel: cumulative sample sizes per variant
        # (meaningful now that snapshots are independent copies).
        plt.subplot(2, 1, 2)
        sample_sizes = []
        for entry in self.test_history:
            sizes = [entry['state'][v]['trials'] for v in self.variants]
            sample_sizes.append(sizes)
        sample_sizes = np.array(sample_sizes)
        for i, variant in enumerate(self.variants):
            plt.plot(timestamps, sample_sizes[:, i], label=variant, linewidth=2)
        plt.xlabel('样本数量')
        plt.ylabel('各变体样本量')
        plt.title('样本量累积过程')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
# Simulated data-stream generator.
def simulate_data_stream(variants, true_rates, total_samples=5000):
    """Yield (variant, success) pairs drawn from the given true rates."""
    for _ in range(total_samples):
        # One uniformly random variant per impression, then a Bernoulli draw.
        chosen = np.random.choice(variants)
        yield chosen, np.random.random() < true_rates[chosen]
# Example implementation: one control plus two treatment variants.
variants = ['control', 'treatment_A', 'treatment_B']
true_rates = {'control': 0.10, 'treatment_A': 0.12, 'treatment_B': 0.09}

# Informative prior for the control (from history), weak priors for new variants.
prior_params = {
    'control': {'alpha': 30, 'beta': 270},  # history suggests ~10% conversion
    'treatment_A': {'alpha': 1, 'beta': 1},
    'treatment_B': {'alpha': 1, 'beta': 1},
}

# Set up the sequential test with a 95% decision threshold and 0.5% risk cap.
ab_test = BayesianABTestFramework(
    variants=variants,
    prior_params=prior_params,
    decision_threshold=0.95,
    risk_threshold=0.005,
)

# Feed a simulated event stream through the test, then plot its progress.
data_stream = simulate_data_stream(variants, true_rates, 2000)
results = ab_test.run_sequential_test(data_stream)
ab_test.plot_test_progress()
实施流程可视化
IV. 高级贝叶斯方法:多臂老虎机与Thompson采样
多臂老虎机问题
在多变体测试场景中,传统的A/B/n测试需要将流量均匀分配给所有变体,这导致了统计效率低下。多臂老虎机方法通过动态流量分配来解决这个问题。
Thompson采样原理
Thompson采样是一种贝叶斯优化方法,其核心思想是根据当前后验分布随机采样,选择表现最佳的变体:
class ThompsonSamplingBandit:
    """Thompson-sampling multi-armed bandit over Beta-Bernoulli arms."""

    def __init__(self, variants, prior_alpha=1, prior_beta=1,
                 exploration_weight=1.0):
        """
        Initialize the Thompson sampler.

        Parameters:
            variants: list of variant (arm) names
            prior_alpha, prior_beta: Beta prior parameters shared by all arms
            exploration_weight: multiplier applied to both posterior
                parameters at sampling time (1.0 = plain Thompson sampling)
        """
        self.variants = variants
        self.prior_alpha = prior_alpha
        self.prior_beta = prior_beta
        self.exploration_weight = exploration_weight
        # Per-arm Beta posterior parameters.
        self.posteriors = {
            variant: {'alpha': prior_alpha, 'beta': prior_beta}
            for variant in variants
        }
        self.history = []

    def select_variant(self):
        """
        Pick the next arm to show: sample each posterior, take the argmax.

        Returns:
            (selected_variant, {variant: sampled_value}).
        """
        samples = {}
        for variant, posterior in self.posteriors.items():
            # Scaling both parameters keeps the posterior mean but changes
            # its concentration, trading exploration against exploitation.
            alpha = posterior['alpha'] * self.exploration_weight
            beta = posterior['beta'] * self.exploration_weight
            samples[variant] = np.random.beta(alpha, beta)
        selected_variant = max(samples, key=samples.get)
        return selected_variant, samples

    def update_posterior(self, variant, success):
        """
        Update the chosen arm's posterior with one Bernoulli outcome.
        """
        if success:
            self.posteriors[variant]['alpha'] += 1
        else:
            self.posteriors[variant]['beta'] += 1
        # FIX: snapshot with per-arm copies. The original shallow
        # ``self.posteriors.copy()`` shared the inner dicts, so every
        # history entry kept mutating as later updates arrived.
        self.history.append({
            'variant': variant,
            'success': success,
            'posteriors': {v: dict(p) for v, p in self.posteriors.items()}
        })

    def run_bandit_experiment(self, data_generator, n_rounds=1000):
        """
        Run the bandit loop for ``n_rounds`` simulated impressions.

        Parameters:
            data_generator: object exposing get_true_rate(variant)
            n_rounds: number of impressions to simulate

        Returns:
            (aggregate results dict, conversion-rate snapshots every 100
            rounds, selection-count snapshots every 100 rounds).
        """
        results = {
            'selections': {variant: 0 for variant in self.variants},
            'successes': {variant: 0 for variant in self.variants},
            'trials': {variant: 0 for variant in self.variants}
        }
        conversion_rates = []
        selection_counts = []
        # ``round_idx`` instead of ``round`` to avoid shadowing the builtin.
        for round_idx in range(n_rounds):
            # Choose an arm via Thompson sampling.
            selected_variant, samples = self.select_variant()
            results['selections'][selected_variant] += 1
            # Simulated outcome (would come from a real user in production).
            true_rate = data_generator.get_true_rate(selected_variant)
            success = np.random.random() < true_rate
            # Feed the outcome back into the posterior.
            self.update_posterior(selected_variant, success)
            results['trials'][selected_variant] += 1
            if success:
                results['successes'][selected_variant] += 1
            # Snapshot progress every 100 rounds.
            if round_idx % 100 == 0:
                current_rates = {
                    variant: results['successes'][variant] / max(1, results['trials'][variant])
                    for variant in self.variants
                }
                conversion_rates.append(current_rates)
                selection_counts.append(results['selections'].copy())
        return results, conversion_rates, selection_counts

    def plot_bandit_performance(self, conversion_rates, selection_counts, true_rates):
        """
        Plot convergence, traffic-allocation evolution and cumulative regret.
        """
        rounds = list(range(0, len(conversion_rates) * 100, 100))
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12))
        # Convergence of observed conversion rates toward the true rates.
        for variant in self.variants:
            rates = [cr[variant] for cr in conversion_rates]
            ax1.plot(rounds, rates, label=variant, linewidth=2)
            ax1.axhline(y=true_rates[variant], linestyle='--', alpha=0.7,
                        label=f'{variant}真实值')
        ax1.set_xlabel('轮次')
        ax1.set_ylabel('观测转化率')
        ax1.set_title('Thompson采样 - 转化率收敛')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        # Evolution of the share of traffic sent to each arm.
        for variant in self.variants:
            selections = [sc[variant] for sc in selection_counts]
            total_selections = [sum(sc.values()) for sc in selection_counts]
            proportions = [s / t for s, t in zip(selections, total_selections)]
            ax2.plot(rounds, proportions, label=variant, linewidth=2)
        ax2.set_xlabel('轮次')
        ax2.set_ylabel('选择比例')
        ax2.set_title('Thompson采样 - 流量分配演变')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        # Cumulative regret versus always playing the best arm.
        best_rate = max(true_rates.values())
        regret_data = []
        for i, cr in enumerate(conversion_rates):
            round_regret = 0
            for variant in self.variants:
                if selection_counts[i][variant] > 0:
                    regret = (best_rate - true_rates[variant]) * selection_counts[i][variant]
                    round_regret += regret
            regret_data.append(round_regret)
        cumulative_regret = np.cumsum(regret_data)
        ax3.plot(rounds, cumulative_regret, linewidth=2, color='red')
        ax3.set_xlabel('轮次')
        ax3.set_ylabel('累积后悔值')
        ax3.set_title('Thompson采样 - 累积后悔值')
        ax3.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
# Data-generator class.
class DataGenerator:
    """Lookup table of ground-truth conversion rates per variant."""

    def __init__(self, true_rates):
        # Mapping: variant name -> true conversion probability.
        self.true_rates = true_rates

    def get_true_rate(self, variant):
        """Return the true conversion rate for *variant*."""
        return self.true_rates[variant]
# Compare Thompson sampling with a traditional A/B test.
def compare_methods(true_rates, total_traffic=10000):
    """Compare Thompson sampling against a uniform-split A/B test.

    Both methods spend the same total traffic budget; regret is measured
    against always serving the best variant.

    Parameters:
        true_rates: {variant: true conversion rate}
        total_traffic: total number of impressions to spend

    Returns:
        (thompson_results, ab_results) aggregate dicts.
    """
    variants = list(true_rates.keys())
    best_variant = max(true_rates, key=true_rates.get)
    best_rate = true_rates[best_variant]
    # Thompson sampling: adaptive traffic allocation.
    ts_bandit = ThompsonSamplingBandit(variants, exploration_weight=1.0)
    data_gen = DataGenerator(true_rates)
    ts_results, ts_rates, ts_selections = ts_bandit.run_bandit_experiment(
        data_gen, total_traffic
    )
    # Traditional A/B test: uniform allocation across all variants.
    ab_results = {variant: {'successes': 0, 'trials': 0} for variant in variants}
    traffic_per_variant = total_traffic // len(variants)
    for variant in variants:
        successes = np.random.binomial(traffic_per_variant, true_rates[variant])
        ab_results[variant]['successes'] = successes
        ab_results[variant]['trials'] = traffic_per_variant
    # Efficiency metrics: conversions and regret for each method.
    ts_conversions = sum(ts_results['successes'].values())
    ab_conversions = sum(ab_results[variant]['successes'] for variant in variants)
    ts_regret = (best_rate * total_traffic) - ts_conversions
    ab_regret = (best_rate * total_traffic) - ab_conversions
    print("方法比较结果:")
    print(f"Thompson采样总转化: {ts_conversions}")
    print(f"A/B测试总转化: {ab_conversions}")
    print(f"Thompson采样后悔值: {ts_regret:.1f}")
    print(f"A/B测试后悔值: {ab_regret:.1f}")
    # FIX: guard against ZeroDivisionError when the A/B arm happens to hit
    # the best-case conversion count exactly (ab_regret == 0).
    if ab_regret != 0:
        print(f"效率提升: {(ab_regret - ts_regret) / ab_regret * 100:.1f}%")
    else:
        print("效率提升: N/A(A/B测试后悔值为0)")
    # Visual comparison of the bandit's behaviour.
    ts_bandit.plot_bandit_performance(ts_rates, ts_selections, true_rates)
    return ts_results, ab_results
# Run the comparison experiment.
true_rates = {
    'control': 0.10,
    'variant_A': 0.11,
    'variant_B': 0.09,
    'variant_C': 0.13,  # best variant
}
ts_results, ab_results = compare_methods(true_rates, 5000)
多臂老虎机架构
V. 实际案例:电商平台的贝叶斯实验系统
业务背景
某大型电商平台面临以下挑战:
- 同时运行数十个A/B测试,流量竞争激烈
- 传统测试周期长,机会成本高
- 决策缺乏不确定性量化
- 无法有效利用历史测试数据
贝叶斯实验系统设计
class BayesianExperimentSystem:
    """Bayesian experimentation system for an e-commerce platform.

    Manages multiple experiments, derives empirical-Bayes priors from
    historical results, logs events, stops experiments automatically, and
    archives completed experiments back into the historical store so later
    experiments can reuse them as priors.
    """

    def __init__(self, historical_data=None):
        # experiment_id -> experiment record dict.
        self.experiments = {}
        # objective_type -> list of historical records (seed data and/or
        # archived experiment summaries).
        self.historical_data = historical_data or {}
        self.prior_registry = {}

    def create_experiment(self, experiment_id, variants, objective_type='conversion',
                          prior_source='historical'):
        """
        Create and register a new experiment.

        Parameters:
            experiment_id: unique experiment identifier
            variants: list of variant names; the first is the control
            objective_type: metric family (used to look up historical priors)
            prior_source: 'historical' to seed priors from past experiments

        Returns:
            the newly created experiment record.
        """
        # Derive a prior from historical data when requested and available.
        historical_prior = None
        if prior_source == 'historical' and objective_type in self.historical_data:
            historical_prior = self._calculate_historical_prior(objective_type)
        if historical_prior is not None:
            # FIX: key the shared historical prior by variant name. The rest
            # of the system reads priors via prior_params[variant], so the
            # original flat {'alpha', 'beta'} dict was silently ignored and
            # every variant fell back to the Beta(1, 1) default.
            prior_params = {variant: dict(historical_prior) for variant in variants}
        else:
            # Weakly-informative default prior.
            prior_params = {variant: {'alpha': 1, 'beta': 1} for variant in variants}
        experiment = {
            'id': experiment_id,
            'variants': variants,
            'objective_type': objective_type,
            'prior_params': prior_params,
            'current_data': {v: {'successes': 0, 'trials': 0} for v in variants},
            'start_time': pd.Timestamp.now(),
            'status': 'running'
        }
        self.experiments[experiment_id] = experiment
        return experiment

    def _calculate_historical_prior(self, objective_type):
        """
        Build an empirical-Bayes Beta prior from historical records.

        FIX: accepts both plain seed records ({'successes', 'trials'}) and
        archived experiment summaries ({'results': {variant: {...}}}) — the
        original implementation raised KeyError on archived summaries.
        """
        if objective_type not in self.historical_data:
            return None
        historical_results = self.historical_data[objective_type]
        total_successes = 0
        total_trials = 0
        for record in historical_results:
            if 'results' in record:
                # Archived experiment summary: aggregate across its variants.
                for variant_data in record['results'].values():
                    total_successes += variant_data['successes']
                    total_trials += variant_data['trials']
            else:
                total_successes += record['successes']
                total_trials += record['trials']
        # Average pseudo-counts per historical experiment, floored at 1.
        empirical_alpha = max(1, total_successes / len(historical_results))
        empirical_beta = max(1, (total_trials - total_successes) / len(historical_results))
        return {'alpha': empirical_alpha, 'beta': empirical_beta}

    def log_event(self, experiment_id, variant, user_id, success, metadata=None):
        """
        Record one experiment event (a user exposure and its outcome).

        Parameters:
            experiment_id: target experiment
            variant: which variant the user saw
            user_id: user identifier (currently unused; kept for the API)
            success: whether the user converted
            metadata: optional event metadata (currently unused)

        Raises:
            ValueError: if the experiment does not exist.
        """
        if experiment_id not in self.experiments:
            raise ValueError(f"实验 {experiment_id} 不存在")
        experiment = self.experiments[experiment_id]
        # FIX: ignore late events. Processing them after completion would
        # mutate already-archived counts and re-archive the experiment on
        # every further event.
        if experiment['status'] != 'running':
            return
        experiment['current_data'][variant]['trials'] += 1
        if success:
            experiment['current_data'][variant]['successes'] += 1
        # NOTE: this runs a numerical-integration check on every event,
        # which is expensive; batching the check is a natural optimization.
        if self._check_experiment_completion(experiment_id):
            experiment['status'] = 'completed'
            experiment['end_time'] = pd.Timestamp.now()
            self._archive_experiment(experiment_id)

    def _check_experiment_completion(self, experiment_id):
        """
        Decide whether the experiment can stop.

        Stops on a decisive posterior comparison (probability of beating the
        control above 0.99 or below 0.01 with negligible expected loss) or
        on a hard sample-size cap.
        """
        experiment = self.experiments[experiment_id]
        current_data = experiment['current_data']
        bayesian_calc = BayesianConversionRate()
        control_variant = experiment['variants'][0]
        # Refresh each variant's posterior under its own prior.
        for variant in experiment['variants']:
            data = current_data[variant]
            prior = experiment['prior_params'].get(variant, {'alpha': 1, 'beta': 1})
            bayesian_calc.prior_alpha = prior['alpha']
            bayesian_calc.prior_beta = prior['beta']
            bayesian_calc.update_posterior(data['successes'], data['trials'], variant)
        # Compare every treatment against the control.
        for variant in experiment['variants'][1:]:
            prob_beating = bayesian_calc.calculate_probability_beating_control(
                variant, control_variant
            )
            expected_loss = bayesian_calc.calculate_expected_loss(variant, control_variant)
            # Decisive either way: a clear winner, or clearly no better.
            if (prob_beating > 0.99 and expected_loss['loss_if_choose_treatment'] < 0.001) or \
               (prob_beating < 0.01 and expected_loss['loss_if_choose_control'] < 0.001):
                return True
        # Safety cap on total traffic.
        total_trials = sum(data['trials'] for data in current_data.values())
        if total_trials >= 100000:
            return True
        return False

    def _archive_experiment(self, experiment_id):
        """
        Append a summary of the finished experiment to the historical store.
        """
        experiment = self.experiments[experiment_id]
        objective_type = experiment['objective_type']
        if objective_type not in self.historical_data:
            self.historical_data[objective_type] = []
        experiment_summary = {
            'experiment_id': experiment_id,
            'completion_time': experiment['end_time'],
            # FIX: per-variant copies so the archive is immune to any later
            # mutation of the live record (the original shallow .copy()
            # shared the inner dicts).
            'results': {v: dict(d) for v, d in experiment['current_data'].items()},
            'duration': (experiment['end_time'] - experiment['start_time']).total_seconds(),
            'total_traffic': sum(data['trials'] for data in experiment['current_data'].values())
        }
        self.historical_data[objective_type].append(experiment_summary)

    def get_experiment_recommendation(self, experiment_id):
        """
        Compute a per-treatment recommendation against the control.

        Returns:
            list of dicts with the probability of beating the control, the
            expected loss/gain of adopting the treatment, and a textual
            recommendation.

        Raises:
            ValueError: if the experiment does not exist.
        """
        if experiment_id not in self.experiments:
            raise ValueError(f"实验 {experiment_id} 不存在")
        experiment = self.experiments[experiment_id]
        current_data = experiment['current_data']
        bayesian_calc = BayesianConversionRate()
        control_variant = experiment['variants'][0]
        # FIX: the control's posterior must be updated too — the original
        # only updated treatments, so calculate_probability_beating_control
        # raised ValueError on the very first call.
        control_counts = current_data[control_variant]
        control_prior = experiment['prior_params'].get(control_variant, {'alpha': 1, 'beta': 1})
        bayesian_calc.prior_alpha = control_prior['alpha']
        bayesian_calc.prior_beta = control_prior['beta']
        bayesian_calc.update_posterior(
            control_counts['successes'], control_counts['trials'], control_variant
        )
        recommendations = []
        for variant in experiment['variants'][1:]:
            data = current_data[variant]
            prior = experiment['prior_params'].get(variant, {'alpha': 1, 'beta': 1})
            bayesian_calc.prior_alpha = prior['alpha']
            bayesian_calc.prior_beta = prior['beta']
            bayesian_calc.update_posterior(data['successes'], data['trials'], variant)
            prob_beating = bayesian_calc.calculate_probability_beating_control(
                variant, control_variant
            )
            expected_loss = bayesian_calc.calculate_expected_loss(variant, control_variant)
            recommendations.append({
                'variant': variant,
                'probability_beating_control': prob_beating,
                'expected_loss_if_chosen': expected_loss['loss_if_choose_treatment'],
                'expected_gain_if_chosen': -expected_loss['loss_if_choose_control'],
                'recommendation': '选择' if prob_beating > 0.95 and expected_loss['loss_if_choose_treatment'] < 0.005 else '继续测试'
            })
        return recommendations

    def plot_experiment_dashboard(self, experiment_id):
        """
        Render a 2x2 dashboard: posterior densities, sample sizes,
        probability trajectory (when history is present) and risk bars.
        """
        experiment = self.experiments[experiment_id]
        current_data = experiment['current_data']
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        # Posterior densities per variant.
        bayesian_calc = BayesianConversionRate()
        x = np.linspace(0, 1, 1000)
        for variant in experiment['variants']:
            data = current_data[variant]
            prior = experiment['prior_params'].get(variant, {'alpha': 1, 'beta': 1})
            bayesian_calc.prior_alpha = prior['alpha']
            bayesian_calc.prior_beta = prior['beta']
            alpha_post, beta_post = bayesian_calc.update_posterior(
                data['successes'], data['trials'], variant
            )
            y = stats.beta.pdf(x, alpha_post, beta_post)
            ax1.plot(x, y, label=f'{variant} (观测: {data["successes"]}/{data["trials"]})', linewidth=2)
        ax1.set_xlabel('转化率')
        ax1.set_ylabel('概率密度')
        ax1.set_title('后验分布比较')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        # Per-variant sample-size bars.
        variants = list(current_data.keys())
        trials = [current_data[v]['trials'] for v in variants]
        ax2.bar(variants, trials, alpha=0.7)
        ax2.set_ylabel('样本量')
        ax2.set_title('各变体样本量分布')
        # Probability trajectory, only when time-series history exists.
        if 'history' in experiment:
            history = experiment['history']
            timestamps = range(len(history))
            for variant in experiment['variants'][1:]:
                probs = [h['metrics'].get(f'{variant}_vs_{experiment["variants"][0]}', {}).get('probability_beating_control', 0)
                         for h in history]
                ax3.plot(timestamps, probs, label=variant, linewidth=2)
            ax3.axhline(y=0.95, color='red', linestyle='--', alpha=0.7, label='95%阈值')
            ax3.set_xlabel('时间点')
            ax3.set_ylabel('优于对照组的概率')
            ax3.set_title('概率轨迹')
            ax3.legend()
            ax3.grid(True, alpha=0.3)
        # Decision-risk bars (expected loss per treatment).
        recommendations = self.get_experiment_recommendation(experiment_id)
        variants_rec = [rec['variant'] for rec in recommendations]
        losses = [rec['expected_loss_if_chosen'] for rec in recommendations]
        ax4.bar(variants_rec, losses, alpha=0.7, color='orange')
        ax4.axhline(y=0.005, color='red', linestyle='--', alpha=0.7, label='风险阈值')
        ax4.set_ylabel('预期损失')
        ax4.set_title('决策风险分析')
        ax4.legend()
        plt.tight_layout()
        plt.show()
# Initialize the e-commerce experiment system with seed historical data.
historical_data = {
    'conversion': [
        {'successes': 450, 'trials': 5000},  # historical experiment 1
        {'successes': 620, 'trials': 6000},  # historical experiment 2
        {'successes': 380, 'trials': 4000},  # historical experiment 3
    ]
}
experiment_system = BayesianExperimentSystem(historical_data)

# Create a new experiment seeded with the historical prior.
experiment = experiment_system.create_experiment(
    experiment_id='checkout_redesign_2024',
    variants=['control', 'new_design_A', 'new_design_B'],
    objective_type='conversion',
    prior_source='historical'
)
print(f"创建实验: {experiment['id']}")
print(f"使用先验: {experiment['prior_params']}")

# Simulate experiment traffic.
np.random.seed(42)
true_rates = {'control': 0.09, 'new_design_A': 0.11, 'new_design_B': 0.095}
for idx in range(5000):
    shown_variant = np.random.choice(experiment['variants'])
    converted = np.random.random() < true_rates[shown_variant]
    experiment_system.log_event('checkout_redesign_2024', shown_variant, f"user_{idx}", converted)
    # Periodic progress check every 1000 events.
    if idx % 1000 == 0:
        recommendations = experiment_system.get_experiment_recommendation('checkout_redesign_2024')
        print(f"\n进度 {idx}/5000:")
        for rec in recommendations:
            print(f" {rec['variant']}: P(优于对照组)={rec['probability_beating_control']:.3f}, "
                  f"推荐: {rec['recommendation']}")

# Final recommendation.
final_recommendations = experiment_system.get_experiment_recommendation('checkout_redesign_2024')
print("\n最终推荐:")
for rec in final_recommendations:
    print(f" {rec['variant']}:")
    print(f" 优于对照组的概率: {rec['probability_beating_control']:.3f}")
    print(f" 选择该变体的预期损失: {rec['expected_loss_if_chosen']:.4f}")
    print(f" 推荐: {rec['recommendation']}")

# Show the dashboard.
experiment_system.plot_experiment_dashboard('checkout_redesign_2024')
系统架构与工作流
VI. 贝叶斯A/B测试的最佳实践与挑战
实施最佳实践
成功实施贝叶斯A/B测试需要遵循以下最佳实践:
实践领域 | 具体建议 | 预期效果 |
---|---|---|
先验选择 | 使用历史数据或弱信息先验 | 提高统计效率,减少所需样本量 |
停止规则 | 基于概率阈值和预期损失 | 平衡决策速度与风险控制 |
结果解释 | 培训团队理解概率性结论 | 改善决策质量,减少误解 |
系统集成 | 与现有数据管道集成 | 提高实施效率,减少摩擦 |
常见挑战与解决方案
挑战 | 症状 | 解决方案 |
---|---|---|
先验敏感性 | 不同先验导致不同结论 | 进行先验敏感性分析,使用鲁棒先验 |
计算复杂性 | 后验计算耗时 | 使用共轭先验,近似计算方法 |
组织接受度 | 团队不熟悉贝叶斯概念 | 开展培训,提供直观可视化工具 |
多重比较 | 同时运行多个实验 | 使用分层模型共享信息 |
敏感性分析框架
class SensitivityAnalyzer:
    """Prior-sensitivity analysis for Bayesian A/B comparisons."""

    def __init__(self, data):
        # Mapping: variant name -> {'successes': int, 'trials': int}.
        self.data = data

    def analyze_prior_sensitivity(self, variant_pairs, prior_configs):
        """
        Recompute P(A beats B) for each variant pair under every prior config.

        Parameters:
            variant_pairs: list of (variant_A, variant_B) name pairs
            prior_configs: {config_name: {'alpha': a, 'beta': b}}

        Returns:
            dict keyed by '<A>_vs_<B>', each value a list with one result
            entry per prior configuration.
        """
        sensitivity_results = {}
        for variant_A, variant_B in variant_pairs:
            counts_A = self.data[variant_A]
            counts_B = self.data[variant_B]
            per_config = []
            for config_name, prior in prior_configs.items():
                # Rebuild both posteriors from scratch under this prior.
                calc = BayesianConversionRate(
                    prior_alpha=prior['alpha'],
                    prior_beta=prior['beta']
                )
                calc.update_posterior(counts_A['successes'], counts_A['trials'], 'A')
                calc.update_posterior(counts_B['successes'], counts_B['trials'], 'B')
                per_config.append({
                    'config_name': config_name,
                    'prior_alpha': prior['alpha'],
                    'prior_beta': prior['beta'],
                    'probability_A_beats_B': calc.calculate_probability_beating_control('A', 'B')
                })
            sensitivity_results[f'{variant_A}_vs_{variant_B}'] = per_config
        return sensitivity_results

    def plot_sensitivity_analysis(self, sensitivity_results):
        """Bar-plot P(A > B) per prior configuration for each comparison."""
        n_panels = len(sensitivity_results)
        fig, axes = plt.subplots(1, n_panels, figsize=(5 * n_panels, 6))
        if n_panels == 1:
            # plt.subplots returns a bare Axes for a single panel.
            axes = [axes]
        for panel, (comparison, results) in zip(axes, sensitivity_results.items()):
            config_labels = [entry['config_name'] for entry in results]
            probabilities = [entry['probability_A_beats_B'] for entry in results]
            panel.bar(config_labels, probabilities, alpha=0.7)
            panel.set_ylabel('P(A > B)')
            panel.set_title(f'先验敏感性: {comparison}')
            panel.tick_params(axis='x', rotation=45)
            # Reference lines: 0.5 (no effect) and 0.95 (decision threshold).
            panel.axhline(y=0.5, color='red', linestyle='--', alpha=0.5)
            panel.axhline(y=0.95, color='green', linestyle='--', alpha=0.5)
        plt.tight_layout()
        plt.show()
# Sensitivity-analysis example.
data = {
    'control': {'successes': 120, 'trials': 1000},
    'treatment': {'successes': 150, 'trials': 1000},
}
prior_configs = {
    '无信息先验': {'alpha': 1, 'beta': 1},
    '弱信息先验': {'alpha': 2, 'beta': 2},
    '历史数据先验': {'alpha': 30, 'beta': 270},
    '乐观先验': {'alpha': 5, 'beta': 45},
    '悲观先验': {'alpha': 1, 'beta': 9},
}

sensitivity_analyzer = SensitivityAnalyzer(data)
sensitivity_results = sensitivity_analyzer.analyze_prior_sensitivity(
    [('treatment', 'control')], prior_configs
)
sensitivity_analyzer.plot_sensitivity_analysis(sensitivity_results)

print("敏感性分析结果:")
for comparison, results in sensitivity_results.items():
    print(f"\n{comparison}:")
    for result in results:
        print(f" {result['config_name']}: P(A>B) = {result['probability_A_beats_B']:.3f}")
组织采纳路线图
(注:此处原有一幅展示组织采纳路线图的时间线图表,因图表源码渲染失败而未能显示。)
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)