贝叶斯方法在A/B测试中的创新应用
    I. 传统A/B测试的局限性与贝叶斯范式的优势
传统频率主义方法的挑战
传统A/B测试基于频率主义统计框架,这种方法在实践中面临多个核心挑战:
- 样本量刚性:必须预先确定样本量,无法灵活调整
- 多次检验问题:无法在测试过程中随时查看结果而不增加第一类错误
- 结果解释困难:p值和置信区间的解释常常被误解
- 决策二元化:只能得出"显著"或"不显著"的结论,缺乏对证据强弱的细致刻画
实例分析:电商平台的决策困境
某电商平台进行支付流程优化的A/B测试,计划收集5000用户样本。当收集到3000用户时,产品经理观察到变体B的转化率似乎更高,希望提前结束测试上线新版本。但数据科学家警告说,这样做会增大假阳性风险。
团队陷入两难:
- 如果继续测试,可能错过业务机会
- 如果提前停止,可能基于噪声做出错误决策
- 无法量化当前证据的支持强度
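这一两难可以用一个简短的贝叶斯计算来量化:在任意时点直接给出"B优于A的概率"和"选B的预期损失",而不依赖固定样本量。下面是一个最小示意(其中的中期转化数为假设值,非案例真实数据):

```python
import numpy as np

rng = np.random.default_rng(0)

# 假设收集到3000用户时的中期数据(各组1500人,数字为示意假设)
a_succ, a_n = 150, 1500   # 对照组A
b_succ, b_n = 180, 1500   # 变体B

# 均匀先验 Beta(1,1) 下的后验采样
post_a = rng.beta(1 + a_succ, 1 + a_n - a_succ, 200_000)
post_b = rng.beta(1 + b_succ, 1 + b_n - b_succ, 200_000)

prob_b_better = (post_b > post_a).mean()            # B优于A的概率
exp_loss_b = np.maximum(post_a - post_b, 0).mean()  # 若选B而B实际更差的预期代价

print(f"P(B优于A) ≈ {prob_b_better:.3f}")
print(f"选择B的预期损失 ≈ {exp_loss_b:.5f}")
```

这样团队可以依据"概率是否足够高、预期损失是否足够小"来决定是否提前结束,而非纠结于p值。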
贝叶斯方法的竞争优势
| 维度 | 传统方法 | 贝叶斯方法 | 
|---|---|---|
| 样本量 | 固定,需预先计算 | 灵活,支持序贯分析 | 
| 监测频率 | 有限制,避免α膨胀 | 任意频率,无多重检验问题 | 
| 结果解释 | 基于p值和置信区间 | 直接概率陈述,更直观 | 
| 先验知识 | 无法纳入 | 可整合历史数据和领域知识 | 
| 决策支持 | 二元结论 | 概率性结论,支持风险量化 | 
范式转变的核心价值
II. 贝叶斯A/B测试的数学基础与核心概念
贝叶斯定理在A/B测试中的应用
贝叶斯定理为A/B测试提供了完整的概率框架:
P(H|D) = P(D|H) × P(H) / P(D)
在A/B测试语境中:
- H:假设(如"变体B优于变体A")
- D:观测数据
- P(H):先验概率(基于历史数据或领域知识)
- P(D|H):似然函数(数据在假设下的概率)
- P(H|D):后验概率(给定数据后假设的概率)
共轭先验分布的选择
对于常见的A/B测试指标,推荐使用共轭先验分布:
| 数据类型 | 似然分布 | 共轭先验 | 后验分布 | 
|---|---|---|---|
| 转化率 | 伯努利 | Beta分布 | Beta分布 | 
| 点击率 | 二项 | Beta分布 | Beta分布 | 
| 收入/价值 | 正态 | 正态-逆伽马 | 正态-逆伽马 | 
| 计数数据 | 泊松 | Gamma分布 | Gamma分布 | 
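正文主要展开Beta-伯努利情形;表中的Gamma-泊松共轭同样只需参数相加即可完成更新。下面是一个简短示意(场景与数字均为假设):

```python
import numpy as np
from scipy import stats

# 假设场景:比较两个变体的人均事件数(泊松似然),先验 Gamma(形状, 速率)
prior_shape, prior_rate = 2.0, 1.0

# 示意观测:各1000名用户产生的事件总数
events_a, users_a = 950, 1000
events_b, users_b = 1055, 1000

# Gamma-泊松共轭更新:形状' = 形状 + Σx,速率' = 速率 + n
post_a = stats.gamma(a=prior_shape + events_a, scale=1 / (prior_rate + users_a))
post_b = stats.gamma(a=prior_shape + events_b, scale=1 / (prior_rate + users_b))

samples_a = post_a.rvs(size=100_000, random_state=1)
samples_b = post_b.rvs(size=100_000, random_state=2)
prob_b_better = (samples_b > samples_a).mean()
print(f"P(λ_B > λ_A) ≈ {prob_b_better:.3f}")
```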
后验分布的计算
贝叶斯A/B测试的核心是通过后验分布进行推断。对于转化率测试,后验分布的计算如下:
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
from scipy import integrate
import pandas as pd
class BayesianConversionRate:
    """贝叶斯转化率测试基础类"""
    
    def __init__(self, prior_alpha=1, prior_beta=1):
        """
        初始化贝叶斯转化率测试
        
        参数:
        prior_alpha: Beta先验分布的α参数
        prior_beta: Beta先验分布的β参数
        """
        self.prior_alpha = prior_alpha
        self.prior_beta = prior_beta
        self.results = {}
    
    def update_posterior(self, successes, trials, variant_name):
        """
        更新后验分布
        
        参数:
        successes: 成功次数
        trials: 总试验次数
        variant_name: 变体名称
        """
        posterior_alpha = self.prior_alpha + successes
        posterior_beta = self.prior_beta + (trials - successes)
        
        self.results[variant_name] = {
            'successes': successes,
            'trials': trials,
            'posterior_alpha': posterior_alpha,
            'posterior_beta': posterior_beta,
            'observed_rate': successes / trials if trials > 0 else 0
        }
        
        return posterior_alpha, posterior_beta
    
    def calculate_probability_beating_control(self, treatment_name, control_name='control'):
        """
        计算实验组优于对照组的概率
        """
        if treatment_name not in self.results or control_name not in self.results:
            raise ValueError("必须先更新两个变体的后验分布")
        
        treatment_alpha = self.results[treatment_name]['posterior_alpha']
        treatment_beta = self.results[treatment_name]['posterior_beta']
        control_alpha = self.results[control_name]['posterior_alpha']
        control_beta = self.results[control_name]['posterior_beta']
        
        # 通过数值积分计算P(treatment > control)
        probability = integrate.quad(
            lambda p: stats.beta.pdf(p, treatment_alpha, treatment_beta) * 
                      stats.beta.cdf(p, control_alpha, control_beta),
            0, 1
        )[0]
        
        return probability
    
    def calculate_expected_loss(self, treatment_name, control_name='control'):
        """
        计算预期损失(如果做出错误决策的预期代价)
        """
        treatment_alpha = self.results[treatment_name]['posterior_alpha']
        treatment_beta = self.results[treatment_name]['posterior_beta']
        control_alpha = self.results[control_name]['posterior_alpha']
        control_beta = self.results[control_name]['posterior_beta']
        
        # 计算预期损失
        samples_treatment = np.random.beta(treatment_alpha, treatment_beta, 100000)
        samples_control = np.random.beta(control_alpha, control_beta, 100000)
        
        loss_treatment = np.maximum(samples_control - samples_treatment, 0).mean()
        loss_control = np.maximum(samples_treatment - samples_control, 0).mean()
        
        return {
            'loss_if_choose_treatment': loss_treatment,
            'loss_if_choose_control': loss_control
        }
    
    def plot_posterior_distributions(self, variants=None):
        """
        绘制后验分布图
        """
        if variants is None:
            variants = list(self.results.keys())
        
        plt.figure(figsize=(12, 6))
        x = np.linspace(0, 1, 1000)
        
        for variant in variants:
            alpha = self.results[variant]['posterior_alpha']
            beta = self.results[variant]['posterior_beta']
            observed_rate = self.results[variant]['observed_rate']
            
            y = stats.beta.pdf(x, alpha, beta)
            plt.plot(x, y, label=f'{variant} (观测: {observed_rate:.3f})', linewidth=2)
            
            # 添加最高密度区间
            hdi_low, hdi_high = self.calculate_hdi(alpha, beta)
            plt.axvspan(hdi_low, hdi_high, alpha=0.2)
        
        plt.xlabel('转化率')
        plt.ylabel('概率密度')
        plt.title('后验分布比较')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()
    
    def calculate_hdi(self, alpha, beta, credible_level=0.95):
        """
        计算可信区间(以等尾区间近似HDI)
        """
        # 简化实现:基于采样的等尾可信区间;Beta后验近似对称时与HDI很接近,
        # 严格的HDI需在固定可信水平下搜索最窄区间
        samples = np.random.beta(alpha, beta, 100000)
        lower_percentile = (1 - credible_level) / 2
        upper_percentile = 1 - lower_percentile
        
        return (np.percentile(samples, lower_percentile * 100),
                np.percentile(samples, upper_percentile * 100))
# 示例使用
bayesian_test = BayesianConversionRate(prior_alpha=2, prior_beta=2)
# 模拟数据:对照组和实验组
control_successes = 120
control_trials = 1000
treatment_successes = 150
treatment_trials = 1000
# 更新后验分布
bayesian_test.update_posterior(control_successes, control_trials, 'control')
bayesian_test.update_posterior(treatment_successes, treatment_trials, 'treatment')
# 计算关键指标
prob_beating_control = bayesian_test.calculate_probability_beating_control('treatment')
expected_loss = bayesian_test.calculate_expected_loss('treatment')
print(f"实验组优于对照组的概率: {prob_beating_control:.3f}")
print(f"如果选择实验组的预期损失: {expected_loss['loss_if_choose_treatment']:.4f}")
print(f"如果选择对照组的预期损失: {expected_loss['loss_if_choose_control']:.4f}")
# 可视化后验分布
bayesian_test.plot_posterior_distributions()
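上面的类用数值积分计算P(实验组 > 对照组);当两组后验的Beta参数都是正整数时,这个概率还存在精确的闭式解(对其中一组的α求和),可用来验证积分结果。以下是一个独立的示意实现,参数沿用上例的后验 Beta(122, 882) 与 Beta(152, 852):

```python
import numpy as np
from scipy import integrate, stats
from scipy.special import betaln

def prob_b_beats_a(alpha_a, beta_a, alpha_b, beta_b):
    """整数Beta参数下 P(p_B > p_A) 的精确闭式解(在对数空间求和以保证数值稳定)。"""
    total = 0.0
    for i in range(int(alpha_b)):
        total += np.exp(
            betaln(alpha_a + i, beta_a + beta_b)
            - np.log(beta_b + i)
            - betaln(1 + i, beta_b)
            - betaln(alpha_a, beta_a)
        )
    return total

exact = prob_b_beats_a(122, 882, 152, 852)
numeric = integrate.quad(
    lambda p: stats.beta.pdf(p, 152, 852) * stats.beta.cdf(p, 122, 882), 0, 1
)[0]
print(f"闭式解: {exact:.6f}, 数值积分: {numeric:.6f}")
```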
贝叶斯推理的核心概念
III. 贝叶斯A/B测试的完整实施框架
端到端实施流程
建立完整的贝叶斯A/B测试流程需要系统化的方法:
class BayesianABTestFramework:
    """贝叶斯A/B测试完整框架"""
    
    def __init__(self, variants, prior_params=None, decision_threshold=0.95, 
                 risk_threshold=0.01):
        """
        初始化测试框架
        
        参数:
        variants: 变体列表
        prior_params: 各变体的先验参数
        decision_threshold: 决策阈值(概率)
        risk_threshold: 风险阈值(预期损失)
        """
        self.variants = variants
        self.decision_threshold = decision_threshold
        self.risk_threshold = risk_threshold
        
        if prior_params is None:
            # 默认使用无信息先验
            self.prior_params = {variant: {'alpha': 1, 'beta': 1} for variant in variants}
        else:
            self.prior_params = prior_params
        
        self.test_history = []
        self.current_state = {variant: {'successes': 0, 'trials': 0} for variant in variants}
        
    def add_observation(self, variant, success):
        """
        添加新的观测数据
        """
        self.current_state[variant]['trials'] += 1
        if success:
            self.current_state[variant]['successes'] += 1
        
        # 记录历史状态(深拷贝各变体的计数,避免所有快照共享同一内部字典)
        snapshot = {
            'timestamp': len(self.test_history),
            'state': {v: dict(s) for v, s in self.current_state.items()},
            'metrics': self._calculate_current_metrics()
        }
        self.test_history.append(snapshot)
        
        return snapshot
    
    def _calculate_current_metrics(self):
        """
        计算当前所有变体的后验指标
        """
        metrics = {}
        bayesian_calculator = BayesianConversionRate()
        
        # 为每个变体更新后验
        for variant in self.variants:
            state = self.current_state[variant]
            prior_alpha = self.prior_params[variant]['alpha']
            prior_beta = self.prior_params[variant]['beta']
            
            bayesian_calculator.prior_alpha = prior_alpha
            bayesian_calculator.prior_beta = prior_beta
            bayesian_calculator.update_posterior(
                state['successes'], state['trials'], variant
            )
        
        # 计算比较指标
        control_variant = self.variants[0]  # 假设第一个是对照组
        for variant in self.variants[1:]:
            prob_beating = bayesian_calculator.calculate_probability_beating_control(
                variant, control_variant
            )
            expected_loss = bayesian_calculator.calculate_expected_loss(
                variant, control_variant
            )
            
            metrics[f'{variant}_vs_{control_variant}'] = {
                'probability_beating_control': prob_beating,
                'expected_loss': expected_loss['loss_if_choose_treatment'],
                # 选对照组的预期损失即选实验组的预期收益
                'expected_gain': expected_loss['loss_if_choose_control']
            }
        
        return metrics
    
    def check_stopping_conditions(self):
        """
        检查停止条件
        """
        current_metrics = self.test_history[-1]['metrics'] if self.test_history else {}
        
        stopping_conditions = {}
        recommendation = "继续测试"
        
        for comparison, metrics in current_metrics.items():
            prob_beating = metrics['probability_beating_control']
            expected_loss = metrics['expected_loss']
            
            # 检查是否达到决策阈值且风险可接受
            if prob_beating > self.decision_threshold and expected_loss < self.risk_threshold:
                stopping_conditions[comparison] = {
                    'met': True,
                    'reason': f'概率({prob_beating:.3f}) > 阈值({self.decision_threshold}) 且 损失({expected_loss:.4f}) < 风险阈值({self.risk_threshold})',
                    'recommendation': f'选择 {comparison.split("_vs_")[0]}'
                }
                recommendation = f"停止测试: {stopping_conditions[comparison]['recommendation']}"
            elif prob_beating < (1 - self.decision_threshold) and expected_loss < self.risk_threshold:
                stopping_conditions[comparison] = {
                    'met': True,
                    'reason': f'概率({prob_beating:.3f}) < {1-self.decision_threshold} 且 损失({expected_loss:.4f}) < 风险阈值({self.risk_threshold})',
                    'recommendation': f'选择对照组'
                }
                recommendation = "停止测试: 选择对照组"
            else:
                stopping_conditions[comparison] = {
                    'met': False,
                    'reason': '未达到停止条件'
                }
        
        return {
            'stopping_conditions': stopping_conditions,
            'overall_recommendation': recommendation,
            'should_stop': any(cond['met'] for cond in stopping_conditions.values())
        }
    
    def run_sequential_test(self, data_stream, max_samples=10000):
        """
        运行序贯测试
        """
        results = []
        
        for i, (variant, success) in enumerate(data_stream):
            if i >= max_samples:
                break
                
            # 添加观测
            snapshot = self.add_observation(variant, success)
            
            # 检查停止条件
            stopping_check = self.check_stopping_conditions()
            snapshot['stopping_check'] = stopping_check
            
            results.append(snapshot)
            
            print(f"样本 {i+1}: {variant} - {'成功' if success else '失败'}")
            print(f"  当前推荐: {stopping_check['overall_recommendation']}")
            
            if stopping_check['should_stop']:
                print("达到停止条件,结束测试")
                break
        
        return results
    
    def plot_test_progress(self):
        """
        绘制测试进度图
        """
        if not self.test_history:
            print("尚无测试数据")
            return
        
        timestamps = [entry['timestamp'] for entry in self.test_history]
        
        # 提取概率轨迹
        prob_data = {}
        for variant in self.variants[1:]:
            comparison_key = f'{variant}_vs_{self.variants[0]}'
            prob_data[variant] = [
                entry['metrics'][comparison_key]['probability_beating_control'] 
                for entry in self.test_history 
                if comparison_key in entry['metrics']
            ]
        
        plt.figure(figsize=(12, 8))
        
        # 概率轨迹
        plt.subplot(2, 1, 1)
        for variant, probs in prob_data.items():
            plt.plot(timestamps[:len(probs)], probs, label=variant, linewidth=2)
        
        plt.axhline(y=self.decision_threshold, color='red', linestyle='--', 
                   label=f'决策阈值 ({self.decision_threshold})')
        plt.axhline(y=1-self.decision_threshold, color='blue', linestyle='--',
                   label=f'反向阈值 ({1-self.decision_threshold})')
        plt.xlabel('样本数量')
        plt.ylabel('优于对照组的概率')
        plt.title('贝叶斯A/B测试进度 - 概率轨迹')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 样本量分布
        plt.subplot(2, 1, 2)
        sample_sizes = []
        for entry in self.test_history:
            sizes = [entry['state'][v]['trials'] for v in self.variants]
            sample_sizes.append(sizes)
        
        sample_sizes = np.array(sample_sizes)
        for i, variant in enumerate(self.variants):
            plt.plot(timestamps, sample_sizes[:, i], label=variant, linewidth=2)
        
        plt.xlabel('样本数量')
        plt.ylabel('各变体样本量')
        plt.title('样本量累积过程')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
# 模拟数据流生成器
def simulate_data_stream(variants, true_rates, total_samples=5000):
    """模拟数据流"""
    for i in range(total_samples):
        variant = np.random.choice(variants)
        success = np.random.random() < true_rates[variant]
        yield variant, success
# 示例实施
variants = ['control', 'treatment_A', 'treatment_B']
true_rates = {'control': 0.10, 'treatment_A': 0.12, 'treatment_B': 0.09}
# 使用历史数据设置信息先验
prior_params = {
    'control': {'alpha': 30, 'beta': 270},      # 基于历史数据:约10%转化率
    'treatment_A': {'alpha': 1, 'beta': 1},     # 新变体,使用弱信息先验
    'treatment_B': {'alpha': 1, 'beta': 1}
}
# 初始化测试框架
ab_test = BayesianABTestFramework(
    variants=variants,
    prior_params=prior_params,
    decision_threshold=0.95,
    risk_threshold=0.005
)
# 运行测试
data_stream = simulate_data_stream(variants, true_rates, 2000)
results = ab_test.run_sequential_test(data_stream)
# 绘制测试进度
ab_test.plot_test_progress()
实施流程可视化
IV. 高级贝叶斯方法:多臂老虎机与Thompson采样
多臂老虎机问题
在多变体测试场景中,传统的A/B/n测试需要将流量均匀分配给所有变体,这导致了统计效率低下。多臂老虎机方法通过动态流量分配来解决这个问题。
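均匀分配的机会成本可以用一个简单的期望计算来说明(转化率与流量均为假设值):

```python
# 假设四个变体的真实转化率与总流量
true_rates = {'control': 0.10, 'A': 0.11, 'B': 0.09, 'C': 0.13}
total_traffic = 10_000

best_rate = max(true_rates.values())

# 传统A/B/n:流量均分给所有变体
uniform_conversions = sum(
    rate * total_traffic / len(true_rates) for rate in true_rates.values()
)
# 理想上界:全部流量分给最佳变体(老虎机方法逐步逼近的目标)
oracle_conversions = best_rate * total_traffic

print(f"均匀分配期望转化: {uniform_conversions:.0f}")   # 1075
print(f"理想分配期望转化: {oracle_conversions:.0f}")   # 1300
print(f"期望后悔值: {oracle_conversions - uniform_conversions:.0f}")  # 225
```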
Thompson采样原理
Thompson采样是一种贝叶斯优化方法,其核心思想是根据当前后验分布随机采样,选择表现最佳的变体:
class ThompsonSamplingBandit:
    """Thompson采样多臂老虎机"""
    
    def __init__(self, variants, prior_alpha=1, prior_beta=1, 
                 exploration_weight=1.0):
        """
        初始化Thompson采样
        
        参数:
        variants: 变体列表
        prior_alpha, prior_beta: Beta先验参数
        exploration_weight: 探索权重
        """
        self.variants = variants
        self.prior_alpha = prior_alpha
        self.prior_beta = prior_beta
        self.exploration_weight = exploration_weight
        
        # 初始化每个变体的后验参数
        self.posteriors = {
            variant: {'alpha': prior_alpha, 'beta': prior_beta} 
            for variant in variants
        }
        
        self.history = []
    
    def select_variant(self):
        """
        选择下一个要展示的变体
        """
        # 从每个变体的后验分布中采样
        samples = {}
        for variant, posterior in self.posteriors.items():
            # 用探索权重缩放后验参数:权重<1时分布更平坦、探索更充分;
            # 权重=1即标准Thompson采样
            alpha = posterior['alpha'] * self.exploration_weight
            beta = posterior['beta'] * self.exploration_weight
            samples[variant] = np.random.beta(alpha, beta)
        
        # 选择采样值最大的变体
        selected_variant = max(samples, key=samples.get)
        
        return selected_variant, samples
    
    def update_posterior(self, variant, success):
        """
        更新选定变体的后验分布
        """
        if success:
            self.posteriors[variant]['alpha'] += 1
        else:
            self.posteriors[variant]['beta'] += 1
        
        # 记录历史(深拷贝各变体的后验参数,避免历史记录共享引用)
        self.history.append({
            'variant': variant,
            'success': success,
            'posteriors': {v: dict(p) for v, p in self.posteriors.items()}
        })
    
    def run_bandit_experiment(self, data_generator, n_rounds=1000):
        """
        运行老虎机实验
        """
        results = {
            'selections': {variant: 0 for variant in self.variants},
            'successes': {variant: 0 for variant in self.variants},
            'trials': {variant: 0 for variant in self.variants}
        }
        
        conversion_rates = []
        selection_counts = []
        
        for round in range(n_rounds):
            # 选择变体
            selected_variant, samples = self.select_variant()
            results['selections'][selected_variant] += 1
            
            # 生成结果(在实际应用中来自真实用户)
            true_rate = data_generator.get_true_rate(selected_variant)
            success = np.random.random() < true_rate
            
            # 更新后验
            self.update_posterior(selected_variant, success)
            
            # 记录结果
            results['trials'][selected_variant] += 1
            if success:
                results['successes'][selected_variant] += 1
            
            # 记录进度
            if round % 100 == 0:
                current_rates = {
                    variant: results['successes'][variant] / max(1, results['trials'][variant])
                    for variant in self.variants
                }
                conversion_rates.append(current_rates)
                selection_counts.append(results['selections'].copy())
        
        return results, conversion_rates, selection_counts
    
    def plot_bandit_performance(self, conversion_rates, selection_counts, true_rates):
        """
        绘制老虎机性能
        """
        rounds = list(range(0, len(conversion_rates) * 100, 100))
        
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 12))
        
        # 转化率收敛情况
        for variant in self.variants:
            rates = [cr[variant] for cr in conversion_rates]
            ax1.plot(rounds, rates, label=variant, linewidth=2)
            ax1.axhline(y=true_rates[variant], linestyle='--', alpha=0.7, 
                       label=f'{variant}真实值')
        
        ax1.set_xlabel('轮次')
        ax1.set_ylabel('观测转化率')
        ax1.set_title('Thompson采样 - 转化率收敛')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 选择比例变化
        for variant in self.variants:
            selections = [sc[variant] for sc in selection_counts]
            total_selections = [sum(sc.values()) for sc in selection_counts]
            proportions = [s / t for s, t in zip(selections, total_selections)]
            ax2.plot(rounds, proportions, label=variant, linewidth=2)
        
        ax2.set_xlabel('轮次')
        ax2.set_ylabel('选择比例')
        ax2.set_title('Thompson采样 - 流量分配演变')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        # 后悔值分析:selection_counts 中保存的是累积选择次数,
        # 各检查点可直接得到累积后悔值,再做cumsum会重复累加
        best_rate = max(true_rates.values())
        cumulative_regret = [
            sum(
                (best_rate - true_rates[variant]) * counts[variant]
                for variant in self.variants
            )
            for counts in selection_counts
        ]
        ax3.plot(rounds, cumulative_regret, linewidth=2, color='red')
        ax3.set_xlabel('轮次')
        ax3.set_ylabel('累积后悔值')
        ax3.set_title('Thompson采样 - 累积后悔值')
        ax3.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
# 数据生成器类
class DataGenerator:
    def __init__(self, true_rates):
        self.true_rates = true_rates
    
    def get_true_rate(self, variant):
        return self.true_rates[variant]
# 比较Thompson采样与A/B测试
def compare_methods(true_rates, total_traffic=10000):
    """比较Thompson采样与传统A/B测试"""
    
    variants = list(true_rates.keys())
    best_variant = max(true_rates, key=true_rates.get)
    best_rate = true_rates[best_variant]
    
    # Thompson采样
    ts_bandit = ThompsonSamplingBandit(variants, exploration_weight=1.0)
    data_gen = DataGenerator(true_rates)
    ts_results, ts_rates, ts_selections = ts_bandit.run_bandit_experiment(
        data_gen, total_traffic
    )
    
    # 传统A/B测试(均匀分配)
    ab_results = {variant: {'successes': 0, 'trials': 0} for variant in variants}
    traffic_per_variant = total_traffic // len(variants)
    
    for variant in variants:
        successes = np.random.binomial(traffic_per_variant, true_rates[variant])
        ab_results[variant]['successes'] = successes
        ab_results[variant]['trials'] = traffic_per_variant
    
    # 计算效率指标
    ts_conversions = sum(ts_results['successes'].values())
    ab_conversions = sum(ab_results[variant]['successes'] for variant in variants)
    
    ts_regret = (best_rate * total_traffic) - ts_conversions
    ab_regret = (best_rate * total_traffic) - ab_conversions
    
    print("方法比较结果:")
    print(f"Thompson采样总转化: {ts_conversions}")
    print(f"A/B测试总转化: {ab_conversions}")
    print(f"Thompson采样后悔值: {ts_regret:.1f}")
    print(f"A/B测试后悔值: {ab_regret:.1f}")
    print(f"效率提升: {(ab_regret - ts_regret) / ab_regret * 100:.1f}%")
    
    # 绘制比较图
    ts_bandit.plot_bandit_performance(ts_rates, ts_selections, true_rates)
    
    return ts_results, ab_results
# 运行比较实验
true_rates = {
    'control': 0.10,
    'variant_A': 0.11,
    'variant_B': 0.09,
    'variant_C': 0.13  # 最佳变体
}
ts_results, ab_results = compare_methods(true_rates, 5000)
多臂老虎机架构
V. 实际案例:电商平台的贝叶斯实验系统
业务背景
某大型电商平台面临以下挑战:
- 同时运行数十个A/B测试,流量竞争激烈
- 传统测试周期长,机会成本高
- 决策缺乏不确定性量化
- 无法有效利用历史测试数据
贝叶斯实验系统设计
class BayesianExperimentSystem:
    """电商平台贝叶斯实验系统"""
    
    def __init__(self, historical_data=None):
        self.experiments = {}
        self.historical_data = historical_data or {}
        self.prior_registry = {}
        
    def create_experiment(self, experiment_id, variants, objective_type='conversion',
                         prior_source='historical'):
        """
        创建新实验
        """
        # 基于目标类型和历史数据设置先验
        historical_prior = None
        if prior_source == 'historical' and objective_type in self.historical_data:
            historical_prior = self._calculate_historical_prior(objective_type)
        
        if historical_prior is not None:
            # 框架其余部分按变体名读取先验,这里为每个变体复制同一份历史先验
            prior_params = {variant: dict(historical_prior) for variant in variants}
        else:
            # 使用弱信息先验
            prior_params = {variant: {'alpha': 1, 'beta': 1} for variant in variants}
        
        experiment = {
            'id': experiment_id,
            'variants': variants,
            'objective_type': objective_type,
            'prior_params': prior_params,
            'current_data': {v: {'successes': 0, 'trials': 0} for v in variants},
            'start_time': pd.Timestamp.now(),
            'status': 'running'
        }
        
        self.experiments[experiment_id] = experiment
        return experiment
    
    def _calculate_historical_prior(self, objective_type):
        """
        基于历史数据计算先验分布
        """
        if objective_type not in self.historical_data:
            return None
        
        historical_results = self.historical_data[objective_type]
        
        # 使用历史实验的合并后验作为新实验的先验
        total_successes = sum(r['successes'] for r in historical_results)
        total_trials = sum(r['trials'] for r in historical_results)
        
        # 经验贝叶斯先验:相当于一个"平均历史实验"规模的伪观测,
        # 信息量较强,实践中可按比例缩小以得到更弱的先验
        empirical_alpha = max(1, total_successes / len(historical_results))
        empirical_beta = max(1, (total_trials - total_successes) / len(historical_results))
        
        return {'alpha': empirical_alpha, 'beta': empirical_beta}
    
    def log_event(self, experiment_id, variant, user_id, success, metadata=None):
        """
        记录实验事件
        """
        if experiment_id not in self.experiments:
            raise ValueError(f"实验 {experiment_id} 不存在")
        
        experiment = self.experiments[experiment_id]
        
        # 更新当前数据
        experiment['current_data'][variant]['trials'] += 1
        if success:
            experiment['current_data'][variant]['successes'] += 1
        
        # 检查停止条件
        if self._check_experiment_completion(experiment_id):
            experiment['status'] = 'completed'
            experiment['end_time'] = pd.Timestamp.now()
            
            # 归档实验结果
            self._archive_experiment(experiment_id)
    
    def _check_experiment_completion(self, experiment_id):
        """
        检查实验是否完成
        """
        experiment = self.experiments[experiment_id]
        current_data = experiment['current_data']
        
        # 计算后验分布
        bayesian_calc = BayesianConversionRate()
        control_variant = experiment['variants'][0]
        
        for variant in experiment['variants']:
            data = current_data[variant]
            prior = experiment['prior_params'].get(variant, {'alpha': 1, 'beta': 1})
            bayesian_calc.prior_alpha = prior['alpha']
            bayesian_calc.prior_beta = prior['beta']
            bayesian_calc.update_posterior(data['successes'], data['trials'], variant)
        
        # 检查所有实验组与对照组的比较
        for variant in experiment['variants'][1:]:
            prob_beating = bayesian_calc.calculate_probability_beating_control(
                variant, control_variant
            )
            expected_loss = bayesian_calc.calculate_expected_loss(variant, control_variant)
            
            # 停止条件:高概率且低风险,或明确无效果
            if (prob_beating > 0.99 and expected_loss['loss_if_choose_treatment'] < 0.001) or \
               (prob_beating < 0.01 and expected_loss['loss_if_choose_control'] < 0.001):
                return True
        
        # 最大样本量限制
        total_trials = sum(data['trials'] for data in current_data.values())
        if total_trials >= 100000:  # 安全限制
            return True
        
        return False
    
    def _archive_experiment(self, experiment_id):
        """
        归档实验数据到历史库
        """
        experiment = self.experiments[experiment_id]
        objective_type = experiment['objective_type']
        
        if objective_type not in self.historical_data:
            self.historical_data[objective_type] = []
        
        # 存储实验总结
        experiment_summary = {
            'experiment_id': experiment_id,
            'completion_time': experiment['end_time'],
            'results': experiment['current_data'].copy(),
            'duration': (experiment['end_time'] - experiment['start_time']).total_seconds(),
            'total_traffic': sum(data['trials'] for data in experiment['current_data'].values())
        }
        
        self.historical_data[objective_type].append(experiment_summary)
    
    def get_experiment_recommendation(self, experiment_id):
        """
        获取实验推荐
        """
        if experiment_id not in self.experiments:
            raise ValueError(f"实验 {experiment_id} 不存在")
        
        experiment = self.experiments[experiment_id]
        current_data = experiment['current_data']
        
        # 计算后验比较
        bayesian_calc = BayesianConversionRate()
        control_variant = experiment['variants'][0]
        
        recommendations = []
        for variant in experiment['variants'][1:]:
            data = current_data[variant]
            prior = experiment['prior_params'].get(variant, {'alpha': 1, 'beta': 1})
            bayesian_calc.prior_alpha = prior['alpha']
            bayesian_calc.prior_beta = prior['beta']
            bayesian_calc.update_posterior(data['successes'], data['trials'], variant)
            
            prob_beating = bayesian_calc.calculate_probability_beating_control(
                variant, control_variant
            )
            expected_loss = bayesian_calc.calculate_expected_loss(variant, control_variant)
            
            recommendations.append({
                'variant': variant,
                'probability_beating_control': prob_beating,
                'expected_loss_if_chosen': expected_loss['loss_if_choose_treatment'],
                # 选对照组的预期损失即选该变体的预期收益
                'expected_gain_if_chosen': expected_loss['loss_if_choose_control'],
                'recommendation': '选择' if prob_beating > 0.95 and expected_loss['loss_if_choose_treatment'] < 0.005 else '继续测试'
            })
        
        return recommendations
    
    def plot_experiment_dashboard(self, experiment_id):
        """
        绘制实验仪表板
        """
        experiment = self.experiments[experiment_id]
        current_data = experiment['current_data']
        
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        
        # 转化率后验分布
        bayesian_calc = BayesianConversionRate()
        x = np.linspace(0, 1, 1000)
        
        for variant in experiment['variants']:
            data = current_data[variant]
            prior = experiment['prior_params'].get(variant, {'alpha': 1, 'beta': 1})
            bayesian_calc.prior_alpha = prior['alpha']
            bayesian_calc.prior_beta = prior['beta']
            alpha_post, beta_post = bayesian_calc.update_posterior(
                data['successes'], data['trials'], variant
            )
            
            y = stats.beta.pdf(x, alpha_post, beta_post)
            ax1.plot(x, y, label=f'{variant} (观测: {data["successes"]}/{data["trials"]})', linewidth=2)
        
        ax1.set_xlabel('转化率')
        ax1.set_ylabel('概率密度')
        ax1.set_title('后验分布比较')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 样本量分布
        variants = list(current_data.keys())
        trials = [current_data[v]['trials'] for v in variants]
        ax2.bar(variants, trials, alpha=0.7)
        ax2.set_ylabel('样本量')
        ax2.set_title('各变体样本量分布')
        
        # 概率轨迹(如果有时序数据)
        if 'history' in experiment:
            history = experiment['history']
            timestamps = range(len(history))
            
            for variant in experiment['variants'][1:]:
                probs = [h['metrics'].get(f'{variant}_vs_{experiment["variants"][0]}', {}).get('probability_beating_control', 0) 
                        for h in history]
                ax3.plot(timestamps, probs, label=variant, linewidth=2)
            
            ax3.axhline(y=0.95, color='red', linestyle='--', alpha=0.7, label='95%阈值')
            ax3.set_xlabel('时间点')
            ax3.set_ylabel('优于对照组的概率')
            ax3.set_title('概率轨迹')
            ax3.legend()
            ax3.grid(True, alpha=0.3)
        
        # 预期损失
        recommendations = self.get_experiment_recommendation(experiment_id)
        variants_rec = [rec['variant'] for rec in recommendations]
        losses = [rec['expected_loss_if_chosen'] for rec in recommendations]
        
        ax4.bar(variants_rec, losses, alpha=0.7, color='orange')
        ax4.axhline(y=0.005, color='red', linestyle='--', alpha=0.7, label='风险阈值')
        ax4.set_ylabel('预期损失')
        ax4.set_title('决策风险分析')
        ax4.legend()
        
        plt.tight_layout()
        plt.show()
# Initialize the e-commerce experiment system
historical_data = {
    'conversion': [
        {'successes': 450, 'trials': 5000},  # Historical experiment 1
        {'successes': 620, 'trials': 6000},  # Historical experiment 2
        {'successes': 380, 'trials': 4000},  # Historical experiment 3
    ]
}
experiment_system = BayesianExperimentSystem(historical_data)

# Create a new experiment
experiment = experiment_system.create_experiment(
    experiment_id='checkout_redesign_2024',
    variants=['control', 'new_design_A', 'new_design_B'],
    objective_type='conversion',
    prior_source='historical'
)
print(f"Created experiment: {experiment['id']}")
print(f"Prior in use: {experiment['prior_params']}")

# Simulate experiment traffic
np.random.seed(42)
true_rates = {'control': 0.09, 'new_design_A': 0.11, 'new_design_B': 0.095}
for i in range(5000):
    variant = np.random.choice(experiment['variants'])
    success = np.random.random() < true_rates[variant]
    user_id = f"user_{i}"
    
    experiment_system.log_event('checkout_redesign_2024', variant, user_id, success)
    
    # Check progress every 1000 users (not at i=0, when almost no data exists)
    if (i + 1) % 1000 == 0:
        recommendations = experiment_system.get_experiment_recommendation('checkout_redesign_2024')
        print(f"\nProgress {i + 1}/5000:")
        for rec in recommendations:
            print(f"  {rec['variant']}: P(beats control)={rec['probability_beating_control']:.3f}, "
                  f"recommendation: {rec['recommendation']}")

# Final recommendations
final_recommendations = experiment_system.get_experiment_recommendation('checkout_redesign_2024')
print("\nFinal recommendations:")
for rec in final_recommendations:
    print(f"  {rec['variant']}:")
    print(f"    Probability of beating control: {rec['probability_beating_control']:.3f}")
    print(f"    Expected loss if chosen: {rec['expected_loss_if_chosen']:.4f}")
    print(f"    Recommendation: {rec['recommendation']}")

# Display the dashboard
experiment_system.plot_experiment_dashboard('checkout_redesign_2024')
System Architecture and Workflow
VI. Best Practices and Challenges of Bayesian A/B Testing
Implementation Best Practices
Successfully implementing Bayesian A/B testing requires following these best practices:
| Practice area | Recommendation | Expected effect |
|---|---|---|
| Prior selection | Use historical data or weakly informative priors | Higher statistical efficiency, smaller required samples |
| Stopping rules | Stop based on probability thresholds and expected loss | Balance decision speed against risk control |
| Result interpretation | Train teams to read probabilistic conclusions | Better decisions, fewer misunderstandings |
| System integration | Integrate with existing data pipelines | Faster rollout, less friction |
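The stopping-rule practice above (a probability threshold combined with an expected-loss cap) can be sketched with conjugate Beta posteriors. The function name `should_stop` and the 0.95 / 0.005 thresholds below are illustrative assumptions, not part of the system described earlier:

```python
import numpy as np

def should_stop(successes_a, trials_a, successes_b, trials_b,
                prob_threshold=0.95, loss_threshold=0.005,
                prior_alpha=1, prior_beta=1, n_samples=200_000, seed=0):
    """Decide whether a two-variant test can stop, via Monte Carlo draws
    from the Beta posterior of each variant's conversion rate."""
    rng = np.random.default_rng(seed)
    post_a = rng.beta(prior_alpha + successes_a,
                      prior_beta + trials_a - successes_a, n_samples)
    post_b = rng.beta(prior_alpha + successes_b,
                      prior_beta + trials_b - successes_b, n_samples)
    prob_b_beats_a = float(np.mean(post_b > post_a))
    # Expected loss if we ship B: the average shortfall in the cases where A is actually better
    expected_loss_b = float(np.mean(np.maximum(post_a - post_b, 0)))
    stop = prob_b_beats_a >= prob_threshold and expected_loss_b <= loss_threshold
    return stop, prob_b_beats_a, expected_loss_b

# 90/1000 vs 130/1000: a clear difference, so both criteria are met
stop, p, loss = should_stop(90, 1000, 130, 1000)
print(stop, round(p, 3), round(loss, 5))
```

Requiring both criteria matters: a high probability of winning can coexist with a non-trivial expected loss when the posteriors are wide, and the loss cap guards against that.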
Common Challenges and Solutions
| Challenge | Symptom | Solution |
|---|---|---|
| Prior sensitivity | Different priors lead to different conclusions | Run prior sensitivity analyses; use robust priors |
| Computational cost | Posterior computation is slow | Use conjugate priors or approximate inference |
| Organizational buy-in | Teams are unfamiliar with Bayesian concepts | Provide training and intuitive visualization tools |
| Multiple comparisons | Many experiments run concurrently | Use hierarchical models to share information |
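For the multiple-comparisons row, a full hierarchical model is beyond this section, but the information-sharing idea can be sketched with a simple empirical-Bayes stand-in: fit a common Beta prior to past experiments by the method of moments, then shrink each new variant's estimate toward the pooled mean. The helper names `fit_beta_prior` and `shrunk_estimate` are illustrative, not from the original system:

```python
import numpy as np

def fit_beta_prior(successes, trials):
    """Method-of-moments fit of a shared Beta(alpha, beta) prior
    from several experiments' observed conversion rates."""
    rates = np.asarray(successes, float) / np.asarray(trials, float)
    m, v = rates.mean(), rates.var(ddof=1)
    common = m * (1 - m) / v - 1          # alpha + beta
    return m * common, (1 - m) * common   # alpha, beta

def shrunk_estimate(s, n, alpha, beta):
    """Posterior-mean rate under the shared prior; small experiments
    are pulled strongly toward the pooled mean."""
    return (alpha + s) / (alpha + beta + n)

alpha, beta = fit_beta_prior([450, 620, 380], [5000, 6000, 4000])
# A tiny new experiment with 3/20 = 0.15 raw rate lands near the
# pooled mean (about 0.096) instead of the noisy raw value
print(round(shrunk_estimate(3, 20, alpha, beta), 3))
```

This is the same shrinkage a hierarchical Beta-Binomial model produces, with the hyper-prior replaced by a point estimate; it reduces the chance that one lucky small experiment wins a multi-experiment comparison.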
Sensitivity Analysis Framework
class SensitivityAnalyzer:
    """Prior sensitivity analysis"""
    
    def __init__(self, data):
        self.data = data
    
    def analyze_prior_sensitivity(self, variant_pairs, prior_configs):
        """
        Recompute the posterior comparison under each prior configuration
        """
        sensitivity_results = {}
        
        for variant_A, variant_B in variant_pairs:
            data_A = self.data[variant_A]
            data_B = self.data[variant_B]
            
            config_results = []
            for config_name, prior_params in prior_configs.items():
                # Recompute the posterior under this prior
                bayesian_calc = BayesianConversionRate(
                    prior_alpha=prior_params['alpha'],
                    prior_beta=prior_params['beta']
                )
                
                bayesian_calc.update_posterior(
                    data_A['successes'], data_A['trials'], 'A'
                )
                bayesian_calc.update_posterior(
                    data_B['successes'], data_B['trials'], 'B'
                )
                
                prob_A_beats_B = bayesian_calc.calculate_probability_beating_control('A', 'B')
                
                config_results.append({
                    'config_name': config_name,
                    'prior_alpha': prior_params['alpha'],
                    'prior_beta': prior_params['beta'],
                    'probability_A_beats_B': prob_A_beats_B
                })
            
            sensitivity_results[f'{variant_A}_vs_{variant_B}'] = config_results
        
        return sensitivity_results
    
    def plot_sensitivity_analysis(self, sensitivity_results):
        """
        Plot the sensitivity analysis results
        """
        fig, axes = plt.subplots(1, len(sensitivity_results), figsize=(5*len(sensitivity_results), 6))
        
        if len(sensitivity_results) == 1:
            axes = [axes]
        
        for idx, (comparison, results) in enumerate(sensitivity_results.items()):
            config_names = [r['config_name'] for r in results]
            probabilities = [r['probability_A_beats_B'] for r in results]
            
            axes[idx].bar(config_names, probabilities, alpha=0.7)
            axes[idx].set_ylabel('P(A > B)')
            axes[idx].set_title(f'Prior sensitivity: {comparison}')
            axes[idx].tick_params(axis='x', rotation=45)
            
            # Reference lines at 0.5 (coin flip) and 0.95 (decision threshold)
            axes[idx].axhline(y=0.5, color='red', linestyle='--', alpha=0.5)
            axes[idx].axhline(y=0.95, color='green', linestyle='--', alpha=0.5)
        
        plt.tight_layout()
        plt.show()
# Sensitivity analysis example
data = {
    'control': {'successes': 120, 'trials': 1000},
    'treatment': {'successes': 150, 'trials': 1000}
}
prior_configs = {
    'Uninformative prior': {'alpha': 1, 'beta': 1},
    'Weakly informative prior': {'alpha': 2, 'beta': 2},
    'Historical-data prior': {'alpha': 30, 'beta': 270},
    'Optimistic prior': {'alpha': 5, 'beta': 45},
    'Pessimistic prior': {'alpha': 1, 'beta': 9}
}
sensitivity_analyzer = SensitivityAnalyzer(data)
sensitivity_results = sensitivity_analyzer.analyze_prior_sensitivity(
    [('treatment', 'control')], prior_configs
)
sensitivity_analyzer.plot_sensitivity_analysis(sensitivity_results)

print("Sensitivity analysis results:")
for comparison, results in sensitivity_results.items():
    print(f"\n{comparison}:")
    for result in results:
        print(f"  {result['config_name']}: P(A>B) = {result['probability_A_beats_B']:.3f}")
Organizational Adoption Roadmap
(The original post showed a timeline diagram of the adoption roadmap here; it did not render in this export.)
【Disclaimer】This content comes from a Huawei Cloud developer community blogger and does not represent the views or positions of Huawei Cloud or the Huawei Cloud developer community. Reposts must credit the source (Huawei Cloud community) along with the article link and author; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it by email with supporting evidence to cloudbbs@huaweicloud.com; once verified, the infringing content will be removed immediately.