Sequential Testing in A/B Testing: Applications and Advantages
I. The Challenges of Traditional A/B Testing and the Rise of Sequential Testing
1.1 Limitations of Fixed-Sample-Size Testing
Traditional A/B tests use a fixed-sample-size design. While its statistical principles are simple and transparent, it faces several challenges in real business settings:
Challenge | Manifestation | Business Impact |
---|---|---|
Inefficiency | The test must wait for the preset sample size even when the result is already clear | Delayed decisions and missed business opportunities |
Wasted resources | Traffic keeps flowing to experiments that are clearly failing or clearly succeeding | Wasted user traffic and experiment slots |
Risk exposure | Harmful changes keep affecting users for the duration of the experiment | Damage to user experience and business metrics |
Inflexibility | Experiment parameters cannot be adjusted based on interim results | Poor adaptability to changing business conditions |
1.2 The Basic Idea of Sequential Testing
A sequential test is a statistical procedure applied repeatedly as data accumulate. Its core idea: re-evaluate the strength of the evidence as each new batch of data arrives, and stop the experiment as soon as a predefined decision boundary is crossed.
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import stats
import math

def demonstrate_sequential_vs_fixed():
    """Compare the efficiency of sequential testing vs. fixed-sample-size testing"""
    np.random.seed(42)
    # Simulation parameters
    true_control_rate = 0.10
    true_treatment_rate = 0.13  # a 30% relative lift
    max_sample_size = 5000
    alpha = 0.05
    power = 0.8
    # Sample size required by a fixed-sample-size test
    fixed_sample_size = calculate_fixed_sample_size(
        true_control_rate, true_treatment_rate, alpha, power
    )
    print(f"Sample size required by the fixed test: {fixed_sample_size} per group")
    print(f"Total sample size: {fixed_sample_size * 2}")
    # Simulate many experiments
    n_simulations = 1000
    fixed_stopping_times = []
    sequential_stopping_times = []
    for sim in range(n_simulations):
        # Generate the data stream
        control_data = []
        treatment_data = []
        sequential_stopped = False
        sequential_stop_time = max_sample_size
        for t in range(1, max_sample_size + 1):
            # Draw a new data point for each group
            control_data.append(np.random.binomial(1, true_control_rate))
            treatment_data.append(np.random.binomial(1, true_treatment_rate))
            # Sequential test: check the decision boundary at every step
            if not sequential_stopped and t >= 100:  # start checking after at least 100 samples
                control_success = np.sum(control_data)
                treatment_success = np.sum(treatment_data)
                control_total = len(control_data)
                treatment_total = len(treatment_data)
                # Apply the sequential probability ratio test (SPRT)
                should_stop = spr_test(
                    control_success, control_total,
                    treatment_success, treatment_total,
                    alpha=alpha, power=power
                )
                if should_stop:
                    sequential_stopped = True
                    sequential_stop_time = t
        # Record the stopping times
        sequential_stopping_times.append(sequential_stop_time)
        # The fixed test always stops at the preset sample size
        fixed_stopping_time = min(fixed_sample_size, max_sample_size)
        fixed_stopping_times.append(fixed_stopping_time)
    # Analyze the results
    avg_sequential_time = np.mean(sequential_stopping_times)
    avg_fixed_time = np.mean(fixed_stopping_times)
    efficiency_gain = (avg_fixed_time - avg_sequential_time) / avg_fixed_time
    print(f"Average stopping time of the sequential test: {avg_sequential_time:.0f} samples")
    print(f"Stopping time of the fixed test: {avg_fixed_time:.0f} samples")
    print(f"Efficiency gain: {efficiency_gain:.1%}")
    # Visualize the comparison
    plt.figure(figsize=(10, 6))
    plt.hist(sequential_stopping_times, bins=50, alpha=0.7,
             label='Sequential test', color='blue')
    plt.axvline(avg_sequential_time, color='blue', linestyle='--',
                label=f'Sequential mean ({avg_sequential_time:.0f})')
    plt.axvline(avg_fixed_time, color='red', linestyle='--',
                label=f'Fixed sample size ({avg_fixed_time:.0f})')
    plt.xlabel('Stopping time (sample size)')
    plt.ylabel('Frequency')
    plt.title('Stopping-time distribution: sequential vs. fixed-sample-size testing')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    return avg_sequential_time, avg_fixed_time, efficiency_gain

def calculate_fixed_sample_size(p1, p2, alpha=0.05, power=0.8):
    """Sample size per group required by a fixed-sample-size test"""
    from statsmodels.stats.power import NormalIndPower
    from math import asin, sqrt
    # Effect size (Cohen's h)
    effect_size = 2 * asin(sqrt(p2)) - 2 * asin(sqrt(p1))
    # Solve for the sample size with statsmodels
    power_analysis = NormalIndPower()
    sample_size = power_analysis.solve_power(
        effect_size=effect_size,
        alpha=alpha,
        power=power,
        ratio=1.0
    )
    return int(round(sample_size))

def spr_test(control_success, control_total, treatment_success, treatment_total,
             alpha=0.05, power=0.8):
    """
    Simplified sequential probability ratio test (SPRT).
    Returns whether the experiment should stop.
    """
    beta = 1 - power  # type II error probability (must be set before the boundaries)
    # Estimate the conversion rates
    p_control = control_success / control_total
    p_treatment = treatment_success / treatment_total
    # Guard against degenerate estimates
    p_control = max(0.01, min(0.99, p_control))
    p_treatment = max(0.01, min(0.99, p_treatment))
    # Log-likelihood ratio of the treatment data
    log_likelihood_ratio = (
        treatment_success * np.log(p_treatment / p_control) +
        (treatment_total - treatment_success) * np.log((1 - p_treatment) / (1 - p_control))
    )
    # Simplified decision boundaries (production use calls for more precise boundaries)
    upper_boundary = np.log((1 - beta) / alpha)  # boundary for the alternative hypothesis
    lower_boundary = np.log(beta / (1 - alpha))  # boundary for the null hypothesis
    if log_likelihood_ratio >= upper_boundary:
        return True   # reject H0 in favor of H1
    elif log_likelihood_ratio <= lower_boundary:
        return True   # accept H0
    else:
        return False  # keep sampling

# Run the comparison demo
avg_seq, avg_fixed, efficiency = demonstrate_sequential_vs_fixed()
1.3 A Brief History of Sequential Testing
Sequential testing is not new; its theoretical foundations date back to the 1940s:
Year | Milestone | Contributor | Significance |
---|---|---|---|
1943 | Sequential probability ratio test (SPRT) | Abraham Wald | Laid the theoretical foundation of sequential analysis |
1945 | Monograph on sequential analysis | Abraham Wald | Systematized sequential testing methodology |
1970s | Group sequential designs | Various statisticians | Met the periodic-monitoring needs of clinical trials |
1990s | Adaptive designs | Industrial statistics community | Extended sequential ideas to broader experimental designs |
2000s | Online experimentation | Technology companies | Applied sequential testing to A/B tests at scale |
II. Statistical Foundations of Sequential Testing
2.1 The Sequential Probability Ratio Test (SPRT)
The SPRT, proposed by Abraham Wald in 1943, is the classic sequential test. Its core idea is to build decision boundaries from the likelihood ratio.
2.1.1 Mathematical Principles of the SPRT
The SPRT compares two simple hypotheses:
- H₀: θ = θ₀ (null hypothesis)
- H₁: θ = θ₁ (alternative hypothesis)
After each observation, we compute the likelihood ratio of the n data points collected so far:

Λₙ = ∏ᵢ₌₁ⁿ f(xᵢ; θ₁) / f(xᵢ; θ₀)

The decision rule is:
- If Λₙ ≥ A, reject H₀ and accept H₁
- If Λₙ ≤ B, accept H₀
- If B < Λₙ < A, continue sampling

where the boundaries are approximately A ≈ (1 − β)/α and B ≈ β/(1 − α), with α the type I error probability and β the type II error probability.
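To make the boundaries concrete, here is a minimal sketch (the values follow directly from the formulas above, using the article's α = 0.05 and power = 0.8) that computes A, B, and their logarithms; the results match the `upper_bound` and `lower_bound` computed in the `SPRT` class below:

import numpy as np

alpha, power = 0.05, 0.8
beta = 1 - power
A = (1 - beta) / alpha  # ≈ 16.0, upper boundary on the likelihood-ratio scale
B = beta / (1 - alpha)  # ≈ 0.211, lower boundary
print(f"A = {A:.3f}, log A = {np.log(A):.3f}")  # log A ≈ 2.773
print(f"B = {B:.3f}, log B = {np.log(B):.3f}")  # log B ≈ -1.558

In log space the test simply accumulates log-likelihood-ratio contributions and stops when the running sum leaves the interval (log B, log A).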
class SPRT:
    """Sequential probability ratio test implementation"""
    def __init__(self, alpha=0.05, power=0.8, theta0=None, theta1=None):
        """
        Initialize the SPRT.
        Parameters:
            alpha: type I error probability
            power: statistical power (1 - type II error probability)
            theta0: parameter value under the null hypothesis
            theta1: parameter value under the alternative hypothesis
        """
        self.alpha = alpha
        self.power = power
        self.beta = 1 - power
        # Decision boundaries (Wald's approximations, on the log scale)
        self.upper_bound = np.log((1 - self.beta) / self.alpha)
        self.lower_bound = np.log(self.beta / (1 - self.alpha))
        self.theta0 = theta0
        self.theta1 = theta1
        # State of the test
        self.log_likelihood_ratio = 0
        self.sample_size = 0
        self.decision = None  # None: continue, True: reject H0, False: accept H0

    def update(self, data_point, group='treatment'):
        """
        Update the test statistic with a new observation.
        Parameters:
            data_point: new data point (0 or 1 for binomial data)
            group: which group the data point belongs to ('control' or 'treatment')
        """
        if self.decision is not None:
            return self.decision  # a decision has already been made
        self.sample_size += 1
        # Contribution of this data point to the log-likelihood ratio
        if group == 'treatment':
            log_lr_contribution = self._log_likelihood_ratio_contribution(
                data_point, self.theta1, self.theta0
            )
        else:
            # Control data would be used to estimate theta0 if it were not specified
            log_lr_contribution = 0
        self.log_likelihood_ratio += log_lr_contribution
        # Check the decision boundaries
        if self.log_likelihood_ratio >= self.upper_bound:
            self.decision = True   # reject H0
        elif self.log_likelihood_ratio <= self.lower_bound:
            self.decision = False  # accept H0
        return self.decision

    def _log_likelihood_ratio_contribution(self, x, theta1, theta0):
        """Log-likelihood ratio contribution of a single data point"""
        # For a Bernoulli observation
        if x == 1:  # success
            return np.log(theta1 / theta0)
        else:       # failure
            return np.log((1 - theta1) / (1 - theta0))

    def get_current_status(self):
        """Return the current state of the test"""
        return {
            'sample_size': self.sample_size,
            'log_likelihood_ratio': self.log_likelihood_ratio,
            'decision': self.decision,
            'upper_bound': self.upper_bound,
            'lower_bound': self.lower_bound
        }

    def visualize_test_progress(self, data_stream, true_theta=None):
        """Visualize the progress of the test"""
        # Reset the test
        self.log_likelihood_ratio = 0
        self.sample_size = 0
        self.decision = None
        # Record the trajectory
        sample_sizes = []
        log_lrs = []
        decisions = []
        for i, data_point in enumerate(data_stream):
            decision = self.update(data_point)
            sample_sizes.append(self.sample_size)
            log_lrs.append(self.log_likelihood_ratio)
            decisions.append(decision)
            if decision is not None:
                break
        # Plot the trajectory
        plt.figure(figsize=(12, 8))
        plt.plot(sample_sizes, log_lrs, 'b-', linewidth=2, label='Log-likelihood ratio')
        plt.axhline(self.upper_bound, color='r', linestyle='--',
                    label=f'Upper boundary ({self.upper_bound:.2f})')
        plt.axhline(self.lower_bound, color='g', linestyle='--',
                    label=f'Lower boundary ({self.lower_bound:.2f})')
        plt.axhline(0, color='k', linestyle='-', alpha=0.3)
        # Mark the stopping point
        if self.decision is not None:
            stop_idx = len(sample_sizes) - 1
            plt.plot(sample_sizes[stop_idx], log_lrs[stop_idx], 'ro',
                     markersize=10, label=f'Stopping decision (n={sample_sizes[stop_idx]})')
        plt.xlabel('Sample size')
        plt.ylabel('Log-likelihood ratio')
        plt.title('SPRT test trajectory')
        plt.legend()
        plt.grid(True, alpha=0.3)
        # Annotate the true parameter, if known
        if true_theta is not None:
            plt.figtext(0.02, 0.02, f'True parameter: θ={true_theta}',
                        fontsize=10, bbox=dict(boxstyle="round", facecolor='wheat'))
        plt.show()
        return sample_sizes, log_lrs, decisions
# SPRT demonstration
def demonstrate_sprt():
    """Demonstrate the SPRT on simulated data"""
    print("SPRT sequential test demonstration")
    print("=" * 50)
    # Parameters
    theta0 = 0.10  # null hypothesis: 10% conversion rate
    theta1 = 0.12  # alternative hypothesis: 12% conversion rate
    alpha = 0.05
    power = 0.8
    # Create the SPRT
    sprt = SPRT(alpha=alpha, power=power, theta0=theta0, theta1=theta1)
    # Simulated data (true conversion rate 11.5%, between the two hypotheses)
    np.random.seed(42)
    true_theta = 0.115
    n_max = 10000
    data_stream = np.random.binomial(1, true_theta, n_max)
    print(f"Null hypothesis: θ = {theta0}")
    print(f"Alternative hypothesis: θ = {theta1}")
    print(f"True parameter: θ = {true_theta}")
    print(f"Significance level: α = {alpha}")
    print(f"Statistical power: 1-β = {power}")
    # Run the test and visualize it
    sample_sizes, log_lrs, decisions = sprt.visualize_test_progress(data_stream, true_theta)
    # Report the result
    final_status = sprt.get_current_status()
    print(f"\nTest result:")
    print(f"Final sample size: {final_status['sample_size']}")
    print(f"Final log-likelihood ratio: {final_status['log_likelihood_ratio']:.4f}")
    if final_status['decision'] is True:
        print("Decision: reject the null hypothesis in favor of the alternative")
    elif final_status['decision'] is False:
        print("Decision: accept the null hypothesis")
    else:
        print("Decision: no decision (maximum sample size reached)")
    # Analyze the properties of the test
    analyze_sprt_properties(sprt, theta0, theta1, alpha, power)
    return sprt, sample_sizes, log_lrs

def analyze_sprt_properties(sprt, theta0, theta1, alpha, power):
    """Analyze the statistical properties of the SPRT"""
    print(f"\nSPRT statistical properties:")
    print("=" * 50)
    # Expected sample size:
    # expected log-likelihood-ratio increment when the true parameter is theta
    def expected_llr_increment(theta):
        p_success = theta * np.log(theta1 / theta0)
        p_failure = (1 - theta) * np.log((1 - theta1) / (1 - theta0))
        return p_success + p_failure
    # Expected sample size under different true parameters
    theta_values = [theta0, theta1, (theta0 + theta1) / 2]
    for theta in theta_values:
        expected_increment = expected_llr_increment(theta)
        if expected_increment != 0:
            # Rough approximation of the expected sample size (the exact formula is more involved)
            approx_sample_size = min(abs(sprt.upper_bound / expected_increment),
                                     abs(sprt.lower_bound / expected_increment))
            print(f"True θ={theta}: expected sample size ≈ {approx_sample_size:.0f}")
    # Error probabilities
    print(f"\nNominal error probabilities:")
    print(f"Type I error probability (α): {alpha}")
    print(f"Type II error probability (β): {1 - power}")
    # Comparison with a fixed-sample-size test
    fixed_n = calculate_fixed_sample_size(theta0, theta1, alpha, power)
    print(f"\nComparison with the fixed-sample-size test:")
    print(f"Sample size required by the fixed test: {fixed_n}")

# Run the SPRT demonstration
sprt_instance, sample_sizes, log_lrs = demonstrate_sprt()
2.2 Group Sequential Testing
In practice we rarely re-test after every single observation. Instead we use a group sequential design: data are accumulated and the test is performed only at a limited number of interim looks.
2.2.1 The O'Brien-Fleming Boundary
The O'Brien-Fleming boundary is a widely used group sequential design. It applies very strict boundaries at early looks and gradually relaxes them at later looks, improving efficiency while preserving the overall α level.
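The shape of the boundary is easy to see directly. The following minimal sketch uses the simple z_{α/2}/√t approximation (the same one used in the class below, not an exact alpha-spending computation) to print the z-scale boundaries for five equally spaced looks at a two-sided α = 0.05:

import numpy as np
from scipy import stats

alpha, looks = 0.05, 5
z_fixed = stats.norm.ppf(1 - alpha / 2)  # 1.96, the fixed-test critical value
for k in range(1, looks + 1):
    t = k / looks  # information fraction at look k
    print(f"look {k}: t = {t:.1f}, z boundary = {z_fixed / np.sqrt(t):.2f}")
# look 1: 4.38, look 2: 3.10, look 3: 2.53, look 4: 2.19, look 5: 1.96

Early looks demand overwhelming evidence (z ≈ 4.4), while the final look is close to the ordinary fixed-test threshold.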
class GroupSequentialTest:
    """Group sequential test"""
    def __init__(self, alpha=0.05, power=0.8, max_samples=10000,
                 looks=10, boundary_type='obrien_fleming'):
        """
        Initialize the group sequential test.
        Parameters:
            alpha: overall type I error probability
            power: statistical power
            max_samples: maximum sample size
            looks: number of interim looks
            boundary_type: boundary type ('obrien_fleming', 'pocock', 'haybittle_peto')
        """
        self.alpha = alpha
        self.power = power
        self.max_samples = max_samples
        self.looks = looks
        self.boundary_type = boundary_type
        # Equally spaced look points
        self.look_points = np.linspace(0, max_samples, looks + 1)[1:].astype(int)
        # Boundaries at each look
        self.boundaries = self._calculate_boundaries()
        # State of the test
        self.current_look = 0
        self.decision = None
        self.test_statistics = []

    def _calculate_boundaries(self):
        """Compute the sequential boundaries"""
        boundaries = {}
        if self.boundary_type == 'obrien_fleming':
            # O'Brien-Fleming-type boundary: z_{alpha/2} / sqrt(t) approximation
            for point in self.look_points:
                information_fraction = point / self.max_samples
                z_boundary = stats.norm.ppf(1 - self.alpha / 2) / np.sqrt(information_fraction)
                boundaries[point] = z_boundary
        elif self.boundary_type == 'pocock':
            # Pocock boundary: the same constant at every look
            constant = self._calculate_pocock_constant()
            for point in self.look_points:
                boundaries[point] = constant
        elif self.boundary_type == 'haybittle_peto':
            # Haybittle-Peto boundary
            for i, point in enumerate(self.look_points):
                if i < len(self.look_points) - 1:  # interim analyses
                    boundaries[point] = 3.0        # fixed, conservative boundary
                else:                              # final analysis
                    boundaries[point] = stats.norm.ppf(1 - self.alpha / 2)
        return boundaries

    def _calculate_pocock_constant(self):
        """Pocock boundary constant"""
        # Tabulated approximations; an exact value requires numerical integration
        if self.looks == 5:
            return 2.413
        elif self.looks == 10:
            return 2.555
        else:
            # Crude general approximation
            return stats.norm.ppf(1 - self.alpha / 2) + 0.5

    def analyze_data(self, control_data, treatment_data):
        """Analyze the data accumulated so far"""
        if self.decision is not None:
            return self.decision
        current_sample_size = len(treatment_data)
        # Only analyze at the predefined look points
        if current_sample_size not in self.look_points:
            return None
        self.current_look += 1
        look_point = current_sample_size
        # Test statistic for the difference of two proportions
        control_success = np.sum(control_data)
        treatment_success = np.sum(treatment_data)
        control_total = len(control_data)
        treatment_total = len(treatment_data)
        p_control = control_success / control_total
        p_treatment = treatment_success / treatment_total
        p_pooled = (control_success + treatment_success) / (control_total + treatment_total)
        se = np.sqrt(p_pooled * (1 - p_pooled) * (1 / control_total + 1 / treatment_total))
        z_stat = (p_treatment - p_control) / se
        self.test_statistics.append((look_point, z_stat))
        # Compare against the boundary
        boundary = self.boundaries[look_point]
        if abs(z_stat) >= boundary:
            # Crossing either side rejects H0; record the direction of the effect
            self.decision = 'reject' if z_stat > 0 else 'reject_negative'
        elif look_point == self.look_points[-1]:
            self.decision = 'accept'  # fail to reject H0 at the final analysis
        return self.decision

    def visualize_boundaries(self):
        """Visualize the sequential boundaries"""
        look_indices = range(1, len(self.look_points) + 1)
        boundaries = [self.boundaries[point] for point in self.look_points]
        plt.figure(figsize=(12, 8))
        # Boundaries
        plt.plot(look_indices, boundaries, 'ro-', linewidth=2,
                 label=f'{self.boundary_type} boundary')
        plt.axhline(stats.norm.ppf(1 - self.alpha / 2), color='k', linestyle='--',
                    label=f'Fixed-test boundary ({stats.norm.ppf(1 - self.alpha / 2):.2f})')
        # Test statistics, if any have been computed
        if self.test_statistics:
            stat_look_indices = []
            stat_values = []
            for point, stat in self.test_statistics:
                stat_look_indices.append(self.look_points.tolist().index(point) + 1)
                stat_values.append(stat)
            plt.plot(stat_look_indices, stat_values, 'bo-', linewidth=2,
                     label='Test statistic', markersize=8)
            # Mark the decision point
            if self.decision:
                last_idx = stat_look_indices[-1]
                last_stat = stat_values[-1]
                plt.plot(last_idx, last_stat, 'rs', markersize=12,
                         label=f'Decision point ({self.decision})')
        plt.xlabel('Look')
        plt.ylabel('Boundary value / test statistic')
        plt.title(f'Group sequential test - {self.boundary_type} boundary')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.xticks(look_indices)
        plt.show()
        return look_indices, boundaries
# Group sequential test demonstration
def demonstrate_group_sequential():
    """Demonstrate group sequential testing"""
    print("Group sequential test demonstration")
    print("=" * 50)
    # Parameters
    max_samples = 10000
    looks = 8
    alpha = 0.05
    power = 0.8
    # Compare the different boundary types
    boundary_types = ['obrien_fleming', 'pocock', 'haybittle_peto']
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    for i, boundary_type in enumerate(boundary_types):
        # Create the group sequential test
        gst = GroupSequentialTest(
            alpha=alpha, power=power,
            max_samples=max_samples, looks=looks,
            boundary_type=boundary_type
        )
        # Simulated data
        np.random.seed(42)
        true_control_rate = 0.10
        true_treatment_rate = 0.12
        control_data = np.random.binomial(1, true_control_rate, max_samples)
        treatment_data = np.random.binomial(1, true_treatment_rate, max_samples)
        # Run the sequential test
        for look_point in gst.look_points:
            control_subset = control_data[:look_point]
            treatment_subset = treatment_data[:look_point]
            decision = gst.analyze_data(control_subset, treatment_subset)
            if decision is not None:
                print(f"{boundary_type}: stopped at sample size {look_point}, decision: {decision}")
                break
        else:
            print(f"{boundary_type}: reached the maximum sample size, decision: {gst.decision}")
        # Plot on this panel
        look_indices = range(1, looks + 1)
        boundaries = [gst.boundaries[point] for point in gst.look_points]
        axes[i].plot(look_indices, boundaries, 'ro-', linewidth=2, label='Boundary')
        axes[i].axhline(stats.norm.ppf(1 - alpha / 2), color='k', linestyle='--',
                        label='Fixed-test boundary')
        # Test statistics
        if gst.test_statistics:
            stat_look_indices = []
            stat_values = []
            for point, stat in gst.test_statistics:
                stat_look_indices.append(gst.look_points.tolist().index(point) + 1)
                stat_values.append(stat)
            axes[i].plot(stat_look_indices, stat_values, 'bo-', linewidth=2,
                         label='Test statistic', markersize=6)
        axes[i].set_xlabel('Look')
        axes[i].set_ylabel('Boundary value')
        axes[i].set_title(f'{boundary_type} boundary')
        axes[i].legend()
        axes[i].grid(True, alpha=0.3)
        axes[i].set_xticks(look_indices)
    plt.tight_layout()
    plt.show()
    # Compare the properties of the boundary types
    return compare_boundary_properties()
def compare_boundary_properties():
    """Compare the properties of the different boundary types"""
    print("\nComparison of boundary types:")
    print("=" * 50)
    boundary_comparison = []
    for boundary_type in ['obrien_fleming', 'pocock', 'haybittle_peto']:
        # Estimate the early-stopping probability under a moderate effect size
        # with a simplified simulation
        np.random.seed(42)
        n_simulations = 1000
        early_stop_count = 0
        avg_sample_size = 0
        true_control_rate = 0.10
        true_treatment_rate = 0.12
        for _ in range(n_simulations):
            gst_sim = GroupSequentialTest(boundary_type=boundary_type)
            control_data = np.random.binomial(1, true_control_rate, gst_sim.max_samples)
            treatment_data = np.random.binomial(1, true_treatment_rate, gst_sim.max_samples)
            for look_point in gst_sim.look_points:
                decision = gst_sim.analyze_data(
                    control_data[:look_point],
                    treatment_data[:look_point]
                )
                if decision is not None and look_point < gst_sim.max_samples:
                    early_stop_count += 1  # count runs that stop before the final look
                    avg_sample_size += look_point
                    break
            else:
                avg_sample_size += gst_sim.max_samples
        early_stop_prob = early_stop_count / n_simulations
        avg_sample_size /= n_simulations
        boundary_comparison.append({
            'type': boundary_type,
            'early_stop_prob': early_stop_prob,
            'avg_sample_size': avg_sample_size,
            'efficiency_gain': (10000 - avg_sample_size) / 10000
        })
        print(f"{boundary_type}:")
        print(f"  early-stopping probability: {early_stop_prob:.3f}")
        print(f"  average sample size: {avg_sample_size:.0f}")
        print(f"  efficiency gain: {boundary_comparison[-1]['efficiency_gain']:.1%}")
    return boundary_comparison
# Run the group sequential demonstration
boundary_comparison = demonstrate_group_sequential()
III. Implementing Sequential Testing for A/B Tests
3.1 A Complete Sequential A/B Testing System
In a real business setting we need a complete sequential A/B testing system covering traffic allocation, data tracking, sequential analysis, and decision support.
class SequentialABTestSystem:
    """Sequential A/B testing system"""
    def __init__(self, experiment_id, alpha=0.05, power=0.8,
                 min_samples=100, max_samples=10000, looks=10,
                 boundary_type='obrien_fleming'):
        """
        Initialize the sequential A/B testing system.
        Parameters:
            experiment_id: experiment ID
            alpha: overall type I error probability
            power: statistical power
            min_samples: minimum sample size (guards against early random fluctuations)
            max_samples: maximum sample size
            looks: number of interim looks
            boundary_type: boundary type
        """
        self.experiment_id = experiment_id
        self.alpha = alpha
        self.power = power
        self.min_samples = min_samples
        self.max_samples = max_samples
        self.looks = looks
        self.boundary_type = boundary_type
        # Underlying sequential test
        self.sequential_test = GroupSequentialTest(
            alpha=alpha, power=power,
            max_samples=max_samples, looks=looks,
            boundary_type=boundary_type
        )
        # Data storage
        self.control_data = []
        self.treatment_data = []
        self.user_assignments = {}  # user_id -> group
        # Experiment outcome
        self.final_decision = None
        self.stopping_point = None
        self.analysis_history = []
        print(f"Initialized sequential A/B test: {experiment_id}")
        print(f"Parameters: α={alpha}, power={power}, sample size range={min_samples}-{max_samples}")

    def assign_user(self, user_id, traffic_split=0.5):
        """Assign a user to the control or treatment group"""
        if user_id in self.user_assignments:
            return self.user_assignments[user_id]
        # Random assignment at the requested split
        group = 'treatment' if np.random.random() < traffic_split else 'control'
        self.user_assignments[user_id] = group
        return group

    def track_conversion(self, user_id, converted, value=0):
        """Track a conversion event"""
        if user_id not in self.user_assignments:
            raise ValueError(f"User {user_id} has not been assigned to a group")
        group = self.user_assignments[user_id]
        conversion_data = 1 if converted else 0
        if group == 'control':
            self.control_data.append(conversion_data)
        else:
            self.treatment_data.append(conversion_data)
        # Check whether a sequential analysis is due
        return self._check_sequential_analysis()

    def _check_sequential_analysis(self):
        """Run a sequential analysis if a look point has been reached"""
        current_sample_size = min(len(self.control_data), len(self.treatment_data))
        # Require enough data and an exact look point
        if (current_sample_size < self.min_samples or
                current_sample_size not in self.sequential_test.look_points):
            return None
        # Run the sequential analysis on equally sized slices of both groups
        decision = self.sequential_test.analyze_data(
            self.control_data[:current_sample_size],
            self.treatment_data[:current_sample_size]
        )
        # Record the analysis
        analysis_record = {
            'sample_size': current_sample_size,
            'control_conversions': np.sum(self.control_data),
            'treatment_conversions': np.sum(self.treatment_data),
            'control_rate': np.mean(self.control_data),
            'treatment_rate': np.mean(self.treatment_data),
            'decision': decision,
            'timestamp': pd.Timestamp.now()
        }
        self.analysis_history.append(analysis_record)
        if decision is not None and self.final_decision is None:
            self.final_decision = decision
            self.stopping_point = current_sample_size
            print(f"Experiment stopped at sample size {self.stopping_point}, decision: {decision}")
        return decision

    def get_current_stats(self):
        """Return the current summary statistics"""
        if len(self.control_data) == 0 or len(self.treatment_data) == 0:
            return None
        control_conversions = np.sum(self.control_data)
        treatment_conversions = np.sum(self.treatment_data)
        control_total = len(self.control_data)
        treatment_total = len(self.treatment_data)
        control_rate = control_conversions / control_total
        treatment_rate = treatment_conversions / treatment_total
        relative_improvement = (treatment_rate - control_rate) / control_rate
        # Confidence intervals for the conversion rates
        from statsmodels.stats.proportion import proportion_confint
        control_ci = proportion_confint(control_conversions, control_total, alpha=0.05)
        treatment_ci = proportion_confint(treatment_conversions, treatment_total, alpha=0.05)
        return {
            'control': {
                'conversions': control_conversions,
                'total': control_total,
                'rate': control_rate,
                'ci_lower': control_ci[0],
                'ci_upper': control_ci[1]
            },
            'treatment': {
                'conversions': treatment_conversions,
                'total': treatment_total,
                'rate': treatment_rate,
                'ci_lower': treatment_ci[0],
                'ci_upper': treatment_ci[1]
            },
            'difference': {
                'absolute': treatment_rate - control_rate,
                'relative': relative_improvement
            },
            'sample_size': min(control_total, treatment_total)
        }

    def generate_report(self):
        """Generate an experiment report"""
        current_stats = self.get_current_stats()
        report = [
            f"Sequential A/B test report: {self.experiment_id}",
            "=" * 50,
            f"Generated at: {pd.Timestamp.now()}",
            f"Final decision: {self.final_decision if self.final_decision else 'in progress'}",
            f"Stopping point: {self.stopping_point if self.stopping_point else 'not stopped'}",
            ""
        ]
        if current_stats:
            report.append("Current statistics:")
            report.append(f"  Control:   {current_stats['control']['conversions']}/{current_stats['control']['total']} "
                          f"({current_stats['control']['rate']:.4f}) "
                          f"[{current_stats['control']['ci_lower']:.4f}, {current_stats['control']['ci_upper']:.4f}]")
            report.append(f"  Treatment: {current_stats['treatment']['conversions']}/{current_stats['treatment']['total']} "
                          f"({current_stats['treatment']['rate']:.4f}) "
                          f"[{current_stats['treatment']['ci_lower']:.4f}, {current_stats['treatment']['ci_upper']:.4f}]")
            report.append(f"  Absolute difference: {current_stats['difference']['absolute']:.4f}")
            report.append(f"  Relative lift: {current_stats['difference']['relative']:.2%}")
        if self.analysis_history:
            report.append("\nSequential analysis history:")
            for i, analysis in enumerate(self.analysis_history[-5:]):  # show the last 5 analyses
                report.append(f"  Analysis {i+1} (n={analysis['sample_size']}): "
                              f"decision={analysis['decision']}, "
                              f"rates={analysis['control_rate']:.3f} vs {analysis['treatment_rate']:.3f}")
        # Comparison with a fixed-sample-size test
        if current_stats:
            fixed_sample_size = calculate_fixed_sample_size(
                current_stats['control']['rate'],
                current_stats['treatment']['rate'],
                self.alpha, self.power
            )
            efficiency_gain = (fixed_sample_size - len(self.treatment_data)) / fixed_sample_size
            report.append(f"\nEfficiency analysis:")
            report.append(f"  Current sample size: {len(self.treatment_data)}")
            report.append(f"  Required by a fixed test: {fixed_sample_size}")
            report.append(f"  Sample size saving: {efficiency_gain:.1%}")
        return "\n".join(report)

    def visualize_experiment_progress(self):
        """Visualize the progress of the experiment"""
        if not self.analysis_history:
            print("No analysis data yet")
            return
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        # 1. Conversion rates over accumulated samples
        sample_sizes = [a['sample_size'] for a in self.analysis_history]
        control_rates = [a['control_rate'] for a in self.analysis_history]
        treatment_rates = [a['treatment_rate'] for a in self.analysis_history]
        ax1.plot(sample_sizes, control_rates, 'b-', marker='o', label='Control')
        ax1.plot(sample_sizes, treatment_rates, 'r-', marker='s', label='Treatment')
        ax1.set_xlabel('Sample size')
        ax1.set_ylabel('Conversion rate')
        ax1.set_title('Conversion rates over accumulated samples')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        # 2. Sequential boundaries and test statistics are drawn at the end in a
        #    separate figure (visualize_boundaries opens its own figure)
        ax2.axis('off')
        ax2.text(0.5, 0.5, 'Boundaries shown in a separate figure',
                 ha='center', va='center')
        # 3. Confidence interval for the relative lift
        current_stats = self.get_current_stats()
        if current_stats:
            relative_improvement = current_stats['difference']['relative']
            # Bootstrap confidence interval
            bootstrap_differences = []
            n_bootstrap = 1000
            for _ in range(n_bootstrap):
                control_bs = np.random.choice(self.control_data, size=len(self.control_data), replace=True)
                treatment_bs = np.random.choice(self.treatment_data, size=len(self.treatment_data), replace=True)
                control_rate_bs = np.mean(control_bs)
                treatment_rate_bs = np.mean(treatment_bs)
                if control_rate_bs > 0:
                    rel_diff_bs = (treatment_rate_bs - control_rate_bs) / control_rate_bs
                    bootstrap_differences.append(rel_diff_bs)
            ci_lower = np.percentile(bootstrap_differences, 2.5)
            ci_upper = np.percentile(bootstrap_differences, 97.5)
            ax3.bar(['Relative lift'], [relative_improvement],
                    yerr=[[relative_improvement - ci_lower], [ci_upper - relative_improvement]],
                    capsize=10, color='lightblue', alpha=0.7)
            ax3.axhline(0, color='k', linestyle='-', alpha=0.3)
            ax3.set_ylabel('Relative lift')
            ax3.set_title('Estimated relative lift')
            ax3.grid(True, alpha=0.3)
        # 4. Sample size savings
        if current_stats:
            fixed_n = calculate_fixed_sample_size(
                current_stats['control']['rate'],
                current_stats['treatment']['rate'],
                self.alpha, self.power
            )
            actual_n = len(self.treatment_data)
            ax4.bar(['Fixed sample size', 'Sequential test'], [fixed_n, actual_n],
                    color=['red', 'green'], alpha=0.7)
            ax4.set_ylabel('Sample size')
            ax4.set_title(f'Sample size saving: {(fixed_n - actual_n) / fixed_n:.1%}')
            ax4.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
        # Draw the sequential boundaries in their own figure
        self.sequential_test.visualize_boundaries()
# Full sequential A/B test demonstration
def demonstrate_sequential_ab_test():
    """Demonstrate the full sequential A/B testing workflow"""
    print("Full sequential A/B test demonstration")
    print("=" * 50)
    # Create the sequential A/B testing system
    ab_test = SequentialABTestSystem(
        experiment_id="homepage_button_test_2024",
        alpha=0.05,
        power=0.8,
        min_samples=200,
        max_samples=10000,
        looks=12,
        boundary_type='obrien_fleming'
    )
    # Simulate user traffic and conversions
    np.random.seed(42)
    n_users = 15000
    # True effect: the treatment lifts the conversion rate by 25% (0.08 -> 0.10)
    true_control_rate = 0.08
    true_treatment_rate = 0.10
    print(f"True parameters: control={true_control_rate}, treatment={true_treatment_rate}")
    stopped_early = False
    for user_id in range(n_users):
        # Assign the user
        group = ab_test.assign_user(f"user_{user_id}", traffic_split=0.5)
        # Simulate a conversion from the true parameters
        if group == 'control':
            converted = np.random.random() < true_control_rate
        else:
            converted = np.random.random() < true_treatment_rate
        # Track the conversion
        decision = ab_test.track_conversion(f"user_{user_id}", converted)
        # Check for early stopping
        if decision is not None and not stopped_early:
            print(f"Experiment stopped early at user {user_id}")
            stopped_early = True
            # In production, traffic allocation would stop here
    # Generate the report
    report = ab_test.generate_report()
    print("\n" + report)
    # Visualize the results
    ab_test.visualize_experiment_progress()
    # Analyze the performance of the sequential test
    analyze_sequential_performance(ab_test)
    return ab_test
def analyze_sequential_performance(ab_test):
    """Analyze the performance of the sequential test"""
    print("\nSequential test performance analysis:")
    print("=" * 50)
    current_stats = ab_test.get_current_stats()
    if not current_stats:
        return
    # Estimate the realized type I error rate by simulation
    n_simulations = 500
    false_positive_count = 0
    sample_sizes = []
    true_control_rate = current_stats['control']['rate']
    for sim in range(n_simulations):
        # Simulate the no-effect case (to check the type I error)
        sim_test = SequentialABTestSystem(
            experiment_id=f"sim_{sim}",
            alpha=0.05, power=0.8,
            min_samples=200, max_samples=10000
        )
        # Generate data with no true effect
        control_data = np.random.binomial(1, true_control_rate, 10000)
        treatment_data = np.random.binomial(1, true_control_rate, 10000)
        decision = None
        for i in range(10000):
            if i >= len(control_data) or i >= len(treatment_data):
                break
            # Assign the simulated users explicitly so track_conversion accepts them
            sim_test.user_assignments[f"c_{i}"] = 'control'
            decision = sim_test.track_conversion(f"c_{i}", bool(control_data[i]))
            if decision is not None:
                break
            sim_test.user_assignments[f"t_{i}"] = 'treatment'
            decision = sim_test.track_conversion(f"t_{i}", bool(treatment_data[i]))
            if decision is not None:
                break
        if decision in ('reject', 'reject_negative'):  # H0 wrongly rejected in either direction
            false_positive_count += 1
        sample_sizes.append(sim_test.stopping_point if sim_test.stopping_point else 10000)
    empirical_alpha = false_positive_count / n_simulations
    avg_sample_size = np.mean(sample_sizes)
    print(f"Empirical type I error rate: {empirical_alpha:.4f} (nominal: {ab_test.alpha})")
    print(f"Average sample size: {avg_sample_size:.0f}")
    print(f"Saving relative to the fixed sample size: {(10000 - avg_sample_size) / 10000:.1%}")

# Run the full sequential A/B test demonstration
ab_test_system = demonstrate_sequential_ab_test()
3.2 Practical Considerations for Sequential Testing
Implementing sequential testing in a real business involves several practical considerations; the effect-estimation row is illustrated with a short simulation after the table.
Consideration | Challenge | Solution |
---|---|---|
Checking frequency | Checking too often inflates the type I error | Use a group sequential design with a bounded number of looks |
Minimum sample size | Early random fluctuations can trigger wrong decisions | Set a sensible minimum sample size threshold |
Multiple testing | Running many experiments at once inflates the error rate | Use stricter boundaries or false-discovery-rate control |
Effect estimation | Early stopping tends to overestimate the effect size | Use bias-corrected estimators |
System complexity | Higher implementation and maintenance cost | Use mature statistical libraries and standardized workflows |
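The effect-estimation row deserves emphasis. The sketch below (a minimal simulation with hypothetical parameters, separate from the system above) shows why: among runs that stop early under naive fixed-level peeking, the estimated lift is systematically larger than the true lift, which is exactly the selection bias that bias-corrected estimators are meant to address:

import numpy as np

# True lift is 10% (0.10 -> 0.11); peek every 500 users per group with a naive z test
rng = np.random.default_rng(0)
p0, p1, n_max, z_crit = 0.10, 0.11, 20000, 1.96
estimates_at_stop = []
for _ in range(300):
    c = rng.binomial(1, p0, n_max).cumsum()  # cumulative control conversions
    t = rng.binomial(1, p1, n_max).cumsum()  # cumulative treatment conversions
    for n in range(500, n_max + 1, 500):
        pc, pt = c[n - 1] / n, t[n - 1] / n
        pp = (c[n - 1] + t[n - 1]) / (2 * n)
        se = np.sqrt(pp * (1 - pp) * 2 / n)
        if se > 0 and abs(pt - pc) / se >= z_crit:  # naive, uncorrected threshold
            estimates_at_stop.append((pt - pc) / pc)
            break
print(f"true lift: 10.0%; mean estimated lift among early stops: "
      f"{np.mean(estimates_at_stop):.1%}")  # typically well above 10%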
IV. Advantages and Business Value of Sequential Testing
4.1 Quantifying the Efficiency Gain
The most direct advantage of sequential testing is the sample size saving. The following systematic simulation quantifies this advantage.
def comprehensive_efficiency_analysis():
    """A comprehensive analysis of sequential testing efficiency"""
    print("Sequential testing efficiency analysis")
    print("=" * 50)
    # Test a range of effect sizes (relative lifts)
    effect_sizes = [0.05, 0.10, 0.15, 0.20, 0.25, 0.30]
    baseline_rate = 0.10
    n_simulations = 500
    results = []
    for effect_size in effect_sizes:
        treatment_rate = baseline_rate * (1 + effect_size)
        sequential_sample_sizes = []
        fixed_sample_sizes = []
        early_stop_rates = []
        for sim in range(n_simulations):
            # Fixed sample size for this effect
            fixed_n = calculate_fixed_sample_size(
                baseline_rate, treatment_rate, alpha=0.05, power=0.8
            )
            fixed_sample_sizes.append(fixed_n)
            # Run the sequential test
            seq_test = SequentialABTestSystem(
                experiment_id=f"eff_{effect_size}_{sim}",
                alpha=0.05, power=0.8,
                min_samples=200, max_samples=10000,
                looks=10
            )
            # Feed data until the test stops or the maximum sample size is reached
            control_data = np.random.binomial(1, baseline_rate, 10000)
            treatment_data = np.random.binomial(1, treatment_rate, 10000)
            decision = None
            for i in range(10000):
                if i >= len(control_data) or i >= len(treatment_data):
                    break
                # Assign the simulated users explicitly so track_conversion accepts them
                seq_test.user_assignments[f"c_{i}"] = 'control'
                decision = seq_test.track_conversion(f"c_{i}", bool(control_data[i]))
                if decision is not None:
                    break
                seq_test.user_assignments[f"t_{i}"] = 'treatment'
                decision = seq_test.track_conversion(f"t_{i}", bool(treatment_data[i]))
                if decision is not None:
                    break
            stop_point = seq_test.stopping_point if seq_test.stopping_point else 10000
            sequential_sample_sizes.append(stop_point)
            early_stop_rates.append(1 if stop_point < 10000 else 0)
        avg_sequential = np.mean(sequential_sample_sizes)
        avg_fixed = np.mean(fixed_sample_sizes)
        efficiency_gain = (avg_fixed - avg_sequential) / avg_fixed
        early_stop_rate = np.mean(early_stop_rates)
        results.append({
            'effect_size': effect_size,
            'relative_improvement': effect_size,
            'avg_sequential_n': avg_sequential,
            'avg_fixed_n': avg_fixed,
            'efficiency_gain': efficiency_gain,
            'early_stop_rate': early_stop_rate
        })
        print(f"Effect size {effect_size:.0%}:")
        print(f"  average sequential sample size: {avg_sequential:.0f}")
        print(f"  fixed-test sample size: {avg_fixed:.0f}")
        print(f"  efficiency gain: {efficiency_gain:.1%}")
        print(f"  early-stopping rate: {early_stop_rate:.1%}")
    # Visualize the results
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    # Sample size comparison
    effect_sizes_plot = [r['effect_size'] for r in results]
    sequential_n_plot = [r['avg_sequential_n'] for r in results]
    fixed_n_plot = [r['avg_fixed_n'] for r in results]
    ax1.plot(effect_sizes_plot, sequential_n_plot, 'bo-', linewidth=2,
             markersize=8, label='Sequential test')
    ax1.plot(effect_sizes_plot, fixed_n_plot, 'r--', linewidth=2,
             label='Fixed-sample-size test')
    ax1.set_xlabel('Relative effect size')
    ax1.set_ylabel('Sample size')
    ax1.set_title('Sample size requirements')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    # Efficiency gain
    efficiency_gains = [r['efficiency_gain'] for r in results]
    ax2.plot(effect_sizes_plot, efficiency_gains, 'g^-', linewidth=2,
             markersize=8, label='Efficiency gain')
    ax2.set_xlabel('Relative effect size')
    ax2.set_ylabel('Efficiency gain')
    ax2.set_title('Efficiency advantage of sequential testing')
    ax2.grid(True, alpha=0.3)
    # Reference lines
    ax2.axhline(0, color='k', linestyle='-', alpha=0.3)
    ax2.axhline(0.5, color='r', linestyle='--', alpha=0.5, label='50% gain reference')
    ax2.legend()
    plt.tight_layout()
    plt.show()
    # Summary table
    results_df = pd.DataFrame(results)
    print(f"\nEfficiency analysis summary:")
    print(results_df.round(4))
    return results_df

# Run the efficiency analysis
efficiency_results = comprehensive_efficiency_analysis()
4.2 Assessing the Business Value
The business value of sequential testing goes beyond statistical efficiency; its biggest impact is on decision speed and risk control.
def business_value_assessment():
    """Assess the business value of sequential testing"""
    print("Sequential testing business value assessment")
    print("=" * 50)
    # Business scenario parameters
    scenarios = [
        {
            'name': 'High-risk feature launch',
            'risk_level': 'high',
            'traffic_volume': 100000,       # daily traffic
            'customer_value': 200,          # average value per user
            'implementation_cost': 50000,   # implementation cost
            'opportunity_cost_delay': 1000  # daily opportunity cost of a delayed decision
        },
        {
            'name': 'Medium optimization test',
            'risk_level': 'medium',
            'traffic_volume': 50000,
            'customer_value': 100,
            'implementation_cost': 10000,
            'opportunity_cost_delay': 500
        },
        {
            'name': 'Low-risk UI improvement',
            'risk_level': 'low',
            'traffic_volume': 20000,
            'customer_value': 50,
            'implementation_cost': 5000,
            'opportunity_cost_delay': 200
        }
    ]
    # Assumed experiment outcomes
    experiment_results = {
        'effect_size': 0.15,              # a 15% lift
        'baseline_conversion': 0.08,
        'sequential_stop_time': 2500,     # stopping sample size of the sequential test
        'fixed_sample_size': 5000,        # sample size of the fixed test
        'decision_accuracy': 0.95         # decision accuracy
    }
    value_analysis = []
    for scenario in scenarios:
        # Time saved
        daily_traffic_per_group = scenario['traffic_volume'] * 0.5  # 50% traffic split
        days_saved = (experiment_results['fixed_sample_size'] -
                      experiment_results['sequential_stop_time']) / daily_traffic_per_group
        # Opportunity cost saved
        opportunity_saving = days_saved * scenario['opportunity_cost_delay']
        # Value of reduced risk exposure
        risk_exposure_reduction = (experiment_results['fixed_sample_size'] -
                                   experiment_results['sequential_stop_time']) / scenario['traffic_volume']
        # Loss avoided if the new version were harmful (assume a 10% negative impact)
        negative_impact_reduction = risk_exposure_reduction * scenario['customer_value'] * 0.1
        # Total value estimate
        total_value = opportunity_saving + negative_impact_reduction
        value_analysis.append({
            'scenario': scenario['name'],
            'risk_level': scenario['risk_level'],
            'days_saved': days_saved,
            'opportunity_saving': opportunity_saving,
            'risk_reduction_value': negative_impact_reduction,
            'total_value': total_value,
            'efficiency_gain': (experiment_results['fixed_sample_size'] -
                                experiment_results['sequential_stop_time']) / experiment_results['fixed_sample_size']
        })
        print(f"{scenario['name']} ({scenario['risk_level']} risk):")
        print(f"  time saved: {days_saved:.1f} days")
        print(f"  opportunity cost saved: ¥{opportunity_saving:,.0f}")
        print(f"  risk control value: ¥{negative_impact_reduction:,.0f}")
        print(f"  total value: ¥{total_value:,.0f}")
        print(f"  efficiency gain: {value_analysis[-1]['efficiency_gain']:.1%}")
    # Visualize the business value
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    # Value decomposition
    scenarios_names = [v['scenario'] for v in value_analysis]
    opportunity_values = [v['opportunity_saving'] for v in value_analysis]
    risk_values = [v['risk_reduction_value'] for v in value_analysis]
    x = np.arange(len(scenarios_names))
    width = 0.35
    ax1.bar(x - width/2, opportunity_values, width, label='Opportunity cost saved', alpha=0.7)
    ax1.bar(x + width/2, risk_values, width, label='Risk control value', alpha=0.7)
    ax1.set_xlabel('Business scenario')
    ax1.set_ylabel('Value (CNY)')
    ax1.set_title('Decomposition of the business value of sequential testing')
    ax1.set_xticks(x)
    ax1.set_xticklabels(scenarios_names)
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    # Efficiency vs. value
    efficiency_gains = [v['efficiency_gain'] for v in value_analysis]
    total_values = [v['total_value'] for v in value_analysis]
    colors = ['red', 'orange', 'green']
    for i, (eff, val) in enumerate(zip(efficiency_gains, total_values)):
        ax2.scatter(eff, val, s=200, c=colors[i], alpha=0.7,
                    label=scenarios_names[i])
    ax2.set_xlabel('Efficiency gain')
    ax2.set_ylabel('Total business value (CNY)')
    ax2.set_title('Efficiency gain vs. business value')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    # Return-on-investment analysis
    print(f"\nReturn on investment (ROI) analysis:")
    for i, scenario in enumerate(scenarios):
        implementation_cost = scenario['implementation_cost']
        roi = value_analysis[i]['total_value'] / implementation_cost
        print(f"{scenario['name']}:")
        print(f"  implementation cost: ¥{implementation_cost:,.0f}")
        print(f"  expected value: ¥{value_analysis[i]['total_value']:,.0f}")
        print(f"  ROI: {roi:.1f}x")
    return value_analysis

# Run the business value assessment
business_values = business_value_assessment()
4.3 Implementation Recommendations and Best Practices
Based on the theory and on practical experience, we summarize the following recommendations for implementing sequential testing; a sketch of error-rate control for the multiple-testing row follows the table.
Practice Area | Recommendation | Rationale |
---|---|---|
Experiment design | Set a sensible minimum sample size (100-500) | Guards against wrong decisions from early random fluctuations |
Boundary choice | Use O'Brien-Fleming boundaries in high-risk settings | Better type I error control |
Checking frequency | 5-15 looks, equally spaced or proportional to information | Balances efficiency against operational complexity |
Effect estimation | Use bias-corrected estimators of the effect size | Corrects the estimation bias introduced by early stopping |
Multiple testing | Control the family-wise error rate across experiments | Avoids multiple-comparison problems |
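For the multiple-testing row, here is a minimal sketch (the p-values are hypothetical) of the two standard corrections that could sit on top of the sequential machinery above: a Bonferroni family-wise correction and a Benjamini-Hochberg false-discovery-rate correction:

import numpy as np

p_values = np.array([0.003, 0.02, 0.041, 0.20])  # assumed p-values from 4 experiments
alpha = 0.05
m = len(p_values)

# Bonferroni: controls the family-wise error rate (strict)
bonferroni_reject = p_values < alpha / m

# Benjamini-Hochberg: controls the false discovery rate (less conservative)
order = np.argsort(p_values)
ranked = p_values[order]
thresholds = alpha * np.arange(1, m + 1) / m
below = np.nonzero(ranked <= thresholds)[0]
bh_reject = np.zeros(m, dtype=bool)
if below.size:
    bh_reject[order[:below.max() + 1]] = True

print("Bonferroni rejects:", bonferroni_reject)  # [ True False False False]
print("BH rejects:        ", bh_reject)          # [ True  True False False]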
V. A Real-World Case Study
5.1 E-commerce Conversion Rate Optimization
A large e-commerce site needed to test a new product recommendation algorithm during its Black Friday promotion. Because the promotion window was short, a fixed-sample-size test could not finish in time, so a sequential test was used instead.
def ecommerce_case_study():
    """E-commerce sequential testing case study"""
    print("E-commerce conversion rate optimization case study")
    print("=" * 50)
    # Case background
    case_background = {
        'business_context': 'Black Friday product recommendation algorithm optimization',
        'time_constraint': 'The promotion lasts only 7 days; a fast decision is required',
        'traffic_volume': 500000,        # average daily traffic
        'baseline_conversion': 0.035,    # 3.5% baseline conversion rate
        'min_meaningful_effect': 0.10,   # a 10% relative lift
        'risk_tolerance': 'medium'       # tolerance for a wrong decision
    }
    print("Case background:")
    for key, value in case_background.items():
        print(f"  {key}: {value}")
    # Sequential test design
    sequential_design = {
        'alpha': 0.05,
        'power': 0.85,                       # slightly higher power requirement
        'min_samples': 1000,                 # minimum sample size
        'max_samples': 50000,                # maximum sample size
        'looks': 15,                         # number of interim looks
        'boundary_type': 'obrien_fleming'    # strict boundary control
    }
    print(f"\nSequential test design:")
    for key, value in sequential_design.items():
        print(f"  {key}: {value}")
    # Simulate the experiment
    np.random.seed(42)  # reproducible results
    # True effect: the new algorithm lifts the conversion rate by 12%
    true_control_rate = 0.035
    true_treatment_rate = 0.0392  # a 12% lift
    # Create the sequential testing system
    ecommerce_test = SequentialABTestSystem(
        experiment_id="black_friday_recommendation_2024",
        **sequential_design
    )
    # Simulate 7 days of user traffic
    daily_traffic = case_background['traffic_volume']
    days = 7
    daily_decisions = []
    for day in range(days):
        print(f"\nDay {day + 1}:")
        # Number of users in the experiment on this day
        daily_users = int(daily_traffic * 0.5)  # 50% of traffic goes to the experiment
        day_control_conversions = 0
        day_treatment_conversions = 0
        for user in range(daily_users):
            # Assign the user
            group = ecommerce_test.assign_user(f"day{day}_user{user}")
            # Simulate a conversion
            if group == 'control':
                converted = np.random.random() < true_control_rate
                day_control_conversions += 1 if converted else 0
            else:
                converted = np.random.random() < true_treatment_rate
                day_treatment_conversions += 1 if converted else 0
            # Track the conversion
            decision = ecommerce_test.track_conversion(
                f"day{day}_user{user}", converted
            )
        # End-of-day statistics
        daily_stats = ecommerce_test.get_current_stats()
        if daily_stats:
            print(f"  Control:   {daily_stats['control']['conversions']}/{daily_stats['control']['total']} "
                  f"({daily_stats['control']['rate']:.4f})")
            print(f"  Treatment: {daily_stats['treatment']['conversions']}/{daily_stats['treatment']['total']} "
                  f"({daily_stats['treatment']['rate']:.4f})")
            print(f"  Relative lift: {daily_stats['difference']['relative']:.2%}")
        # Check the stored decision (the test may have stopped mid-day)
        if ecommerce_test.final_decision is not None:
            print(f"  ✅ Experiment stopped! Final decision: {ecommerce_test.final_decision}")
            daily_decisions.append(ecommerce_test.final_decision)
            break
        else:
            print(f"  ⏳ Experiment continues...")
            daily_decisions.append(None)
    # Final report
    final_report = ecommerce_test.generate_report()
    print(f"\n{final_report}")
    # Business impact analysis
    analyze_business_impact(ecommerce_test, case_background)
    # Comparison with the traditional approach
    compare_with_traditional_approach(ecommerce_test, case_background)
    return ecommerce_test, daily_decisions

def analyze_business_impact(ab_test, background):
    """Analyze the business impact"""
    print("\nBusiness impact analysis:")
    print("=" * 50)
    stats = ab_test.get_current_stats()
    if not stats:
        return
    # Additional conversions attributable to the treatment
    additional_conversions = (stats['treatment']['conversions'] -
                              stats['control']['conversions'] *
                              stats['treatment']['total'] / stats['control']['total'])
    # Assumed average order value
    avg_order_value = 250  # CNY
    additional_revenue = additional_conversions * avg_order_value
    # Opportunity cost saved by stopping early
    if ab_test.stopping_point and ab_test.stopping_point < ab_test.max_samples:
        days_saved = (ab_test.max_samples - ab_test.stopping_point) / (background['traffic_volume'] * 0.5)
        opportunity_saving = days_saved * background['traffic_volume'] * 0.5 * background['baseline_conversion'] * avg_order_value * stats['difference']['relative']
    else:
        opportunity_saving = 0
    print(f"Business impact during the experiment:")
    print(f"  additional conversions: {additional_conversions:.0f}")
    print(f"  additional revenue: ¥{additional_revenue:,.0f}")
    print(f"  opportunity cost saved: ¥{opportunity_saving:,.0f}")
    print(f"  total value: ¥{additional_revenue + opportunity_saving:,.0f}")
    # Expected impact of a full rollout
    if ab_test.final_decision == 'reject':
        monthly_traffic = background['traffic_volume'] * 30
        expected_monthly_impact = monthly_traffic * background['baseline_conversion'] * stats['difference']['relative'] * avg_order_value
        print(f"  expected monthly impact of a full rollout: ¥{expected_monthly_impact:,.0f}")

def compare_with_traditional_approach(ab_test, background):
    """Compare with the traditional fixed-sample-size approach"""
    print("\nComparison with the fixed-sample-size approach:")
    print("=" * 50)
    stats = ab_test.get_current_stats()
    if not stats:
        return
    # Time required by a fixed-sample-size test
    fixed_sample_size = calculate_fixed_sample_size(
        stats['control']['rate'],
        stats['treatment']['rate'],
        ab_test.alpha, ab_test.power
    )
    daily_traffic_per_group = background['traffic_volume'] * 0.5
    fixed_days_required = fixed_sample_size / daily_traffic_per_group
    actual_days_used = ab_test.stopping_point / daily_traffic_per_group if ab_test.stopping_point else ab_test.max_samples / daily_traffic_per_group
    print(f"Fixed-sample-size approach:")
    print(f"  required sample size: {fixed_sample_size}")
    print(f"  required time: {fixed_days_required:.1f} days")
    print(f"Sequential approach:")
    print(f"  actual sample size: {ab_test.stopping_point if ab_test.stopping_point else ab_test.max_samples}")
    print(f"  actual time: {actual_days_used:.1f} days")
    print(f"Time saved: {fixed_days_required - actual_days_used:.1f} days ({((fixed_days_required - actual_days_used) / fixed_days_required):.1%})")
    # Feasibility under the time constraint
    time_constraint = 7  # days
    if fixed_days_required > time_constraint:
        print(f"⚠️ The fixed-sample-size approach cannot finish within {time_constraint} days")
        print(f"✅ The sequential approach reached a decision within the time constraint")
    else:
        print(f"Both approaches can finish within the time constraint")

# Run the e-commerce case study
ecommerce_test_system, daily_decisions = ecommerce_case_study()
5.2 Key Success Factors and Lessons Learned
From implementation experience on real cases, we summarize the key factors for a successful sequential testing rollout; a design-registration sketch follows the table.
Success Factor | Practice | Pitfall Avoided |
---|---|---|
Clear objectives | Predefine the minimum meaningful effect and the risk tolerance | Over-reliance on statistical significance at the expense of business meaning |
Suitable design | Choose the boundary type and checking frequency to fit the business scenario | Overly aggressive designs that lead to wrong decisions |
Infrastructure | Build reliable real-time data pipelines and monitoring | Data quality issues that invalidate the test |
Team education | Train business teams on the principles and limits of sequential testing | Misread results and premature decisions |
Continuous improvement | Adjust design parameters and decision processes based on experience | A rigid method that fails to adapt as the business changes |
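One way to make the "clear objectives" and "suitable design" rows operational is to require every experiment to register its design up front. The dataclass below is a minimal sketch; all field names and defaults are illustrative assumptions, not an API from this article:

from dataclasses import dataclass

@dataclass
class SequentialDesignSpec:
    """Up-front registration of a sequential experiment's design choices."""
    experiment_id: str
    min_meaningful_effect: float           # smallest relative lift worth acting on
    alpha: float = 0.05                    # type I error budget
    power: float = 0.8
    boundary_type: str = 'obrien_fleming'  # strict early boundaries for high-risk tests
    min_samples: int = 500                 # guard against early random fluctuations
    looks: int = 10                        # bounded number of interim checks

spec = SequentialDesignSpec(experiment_id='checkout_flow_v2',
                            min_meaningful_effect=0.05)
print(spec)

Registering these choices before launch keeps the team from tuning α, the boundary, or the stopping rule after peeking at the data.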