A/B测试样本量计算的统计学基础
I. 为什么样本量计算如此重要
1.1 样本量不足的代价
在A/B测试实践中,样本量问题往往被低估。让我们通过一个实际案例来理解其重要性:
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
def demonstrate_sample_size_impact():
    """Simulate how the per-group sample size affects detection of a real lift.

    For each sample size in [100, 500, 1000, 5000], runs 1000 simulated A/B
    tests in which the control group converts at 10% and the treatment group
    at 11%, applies a two-proportion z-test to every run and records the
    share of runs with p < 0.05.  The shares are printed and plotted against
    the sample size together with a dashed reference line at 0.8.

    The NumPy global random state is seeded with 42 at the start of the call.

    Returns:
        None. Results are printed and shown in a matplotlib figure.
    """
    # Imported once per call instead of once per simulated experiment.
    from statsmodels.stats.proportion import proportions_ztest

    np.random.seed(42)

    # True effect: 11% vs 10%, i.e. a 10% *relative* lift
    # (one percentage point in absolute terms).
    true_control_rate = 0.10
    true_treatment_rate = 0.11

    sample_sizes = [100, 500, 1000, 5000]
    n_simulations = 1000
    detection_rates = []

    for n in sample_sizes:
        significant_count = 0
        for _ in range(n_simulations):
            # Simulate one experiment: n Bernoulli outcomes per arm.
            control_conversions = np.random.binomial(1, true_control_rate, n)
            treatment_conversions = np.random.binomial(1, true_treatment_rate, n)

            # Two-proportion z-test on the conversion counts.
            count = [np.sum(control_conversions), np.sum(treatment_conversions)]
            nobs = [n, n]
            _, p_value = proportions_ztest(count, nobs)
            if p_value < 0.05:
                significant_count += 1

        detection_rate = significant_count / n_simulations
        detection_rates.append(detection_rate)
        print(f"样本量 {n}: 检测到显著效应的概率 = {detection_rate:.3f}")

    # Plot the empirical detection rate against the sample size.
    plt.figure(figsize=(10, 6))
    plt.plot(sample_sizes, detection_rates, 'bo-', linewidth=2, markersize=8)
    plt.axhline(y=0.8, color='r', linestyle='--', label='期望的统计功效 (80%)')
    plt.xlabel('样本量 (每组)')
    plt.ylabel('检测到真实效应的概率')
    plt.title('样本量对统计功效的影响')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
# Run the demo: prints one detection rate per sample size and shows the figure.
demonstrate_sample_size_impact()
运行上述代码,您会发现一个关键现象:即使存在真实的业务效应,样本量不足也会导致我们无法可靠地检测到这种效应。
1.2 样本量计算的核心价值
价值维度 | 具体影响 | 长期后果 |
---|---|---|
统计可靠性 | 控制第一类错误(α)和第二类错误(β) | 避免基于随机波动做出错误决策 |
业务效率 | 确保检测到有业务意义的最小效应 | 资源投入与预期回报相匹配 |
实验可信度 | 结果具有足够的统计功效 | 建立数据驱动决策的文化 |
资源优化 | 避免过度收集数据或过早停止实验 | 提高实验项目的投资回报率 |
1.3 真实世界案例分析
考虑一个电商网站的注册流程优化测试:
- 现状:当前注册转化率为8%
- 目标:检测至少10%的相对提升(即从8%提升到8.8%)
- 风险容忍度:假阳性率5%,假阴性率20%
如果没有进行样本量计算,团队可能只在收集了1000个用户后就停止实验。但通过正确的计算,他们会发现每组需要约1.9万个用户(两组合计约3.8万)才能可靠地检测这种规模的效应。
def calculate_required_sample_size(baseline_rate, mde, alpha=0.05, power=0.8):
    """Return the per-group sample size for a two-proportion test.

    The effect is expressed as Cohen's h between ``baseline_rate`` and
    ``baseline_rate * (1 + mde)`` and passed to statsmodels'
    ``NormalIndPower.solve_power`` with equal group sizes.

    Args:
        baseline_rate: Control conversion rate, strictly between 0 and 1.
        mde: Minimum detectable effect as a *relative* change
            (0.10 means +10% of the baseline). Must not be 0.
        alpha: Significance level passed to ``solve_power``.
        power: Target power passed to ``solve_power``.

    Returns:
        The required sample size per group, rounded *up* so the target
        power is not undershot.

    Raises:
        ValueError: If ``baseline_rate`` is not in (0, 1), ``mde`` is 0, or
            the implied treatment rate falls outside [0, 1].
    """
    from math import asin, ceil, sqrt

    if not 0 < baseline_rate < 1:
        raise ValueError(
            f"baseline_rate must be strictly between 0 and 1, got {baseline_rate!r}"
        )
    if mde == 0:
        raise ValueError("mde must be non-zero; a zero effect needs infinite samples")
    target_rate = baseline_rate * (1 + mde)
    if not 0 <= target_rate <= 1:
        raise ValueError(
            f"baseline_rate * (1 + mde) = {target_rate!r} is not a valid proportion"
        )

    # Imported after validation so bad input fails fast.
    from statsmodels.stats.power import NormalIndPower

    # Cohen's h; the absolute value makes lifts and drops symmetric.
    effect_size = abs(2 * asin(sqrt(target_rate)) - 2 * asin(sqrt(baseline_rate)))

    power_analysis = NormalIndPower()
    sample_size = power_analysis.solve_power(
        effect_size=effect_size,
        alpha=alpha,
        power=power,
        ratio=1.0  # equal group sizes
    )
    # Round up: rounding to nearest could return one sample too few.
    return int(ceil(float(sample_size)))
# Worked example: 8% baseline with a 10% relative lift (8.0% -> 8.8%).
baseline = 0.08
mde = 0.10 # 10% relative lift
required_n = calculate_required_sample_size(baseline, mde)
print(f"每组需要的样本量: {required_n}")
print(f"总样本量: {required_n * 2}")
II. 统计学基础概念
2.1 假设检验框架
样本量计算建立在假设检验的统计学框架之上。理解这个框架是掌握样本量计算的关键。
2.1.1 零假设与备择假设
在A/B测试中,我们建立两个相互竞争的假设:
- 零假设 (H₀):实验组和对照组的真实指标没有差异(真实效应为零)
- 备择假设 (H₁):实验组和对照组的真实指标存在差异(真实效应不为零)
样本量计算的目标是确保当备择假设为真时,我们有足够的能力检测到这种差异。
2.1.2 决策错误类型
错误类型 | 定义 | 概率表示 | 业务影响 |
---|---|---|---|
第一类错误 (Type I Error) | 错误拒绝真零假设 | α (显著性水平) | 假阳性,推出无效的改动 |
第二类错误 (Type II Error) | 未能拒绝为假的零假设 | β | 假阴性,错过有价值的优化 |
统计功效 (Statistical Power) | 正确拒绝假零假设 | 1-β | 检测真实效应的能力 |
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
def visualize_hypothesis_testing():
    """Plot the H0 and H1 sampling distributions with the error regions shaded.

    Uses fixed illustration parameters (H0 mean 0, H1 mean 0.5, sigma 1,
    n 50, alpha 0.05).  Draws both normal densities of the sample mean,
    shades the two-sided rejection tails under H0 (Type I error) and the
    acceptance band under H1 (Type II error), marks the critical values and
    shows the figure.

    Returns:
        The two-sided power: the probability mass of the H1 distribution
        outside the band between the two critical values.
    """
    # Illustration parameters.
    null_mean, alt_mean = 0, 0.5
    sigma = 1
    n = 50
    alpha = 0.05

    # Standard error of the mean and the two-sided cut-off on that scale.
    std_err = sigma / np.sqrt(n)
    critical_value = stats.norm.ppf(1 - alpha/2) * std_err

    grid = np.linspace(-3, 3, 1000)

    plt.figure(figsize=(12, 8))

    # Density curves under each hypothesis.
    plt.plot(grid, stats.norm.pdf(grid, null_mean, std_err), 'b-', linewidth=2,
             label='零假设分布 (H₀)')
    plt.plot(grid, stats.norm.pdf(grid, alt_mean, std_err), 'r-', linewidth=2,
             label='备择假设分布 (H₁)')

    # Type I error: both tails of the H0 density beyond the cut-offs.
    left_tail = np.linspace(-3, -critical_value, 100)
    right_tail = np.linspace(critical_value, 3, 100)
    plt.fill_between(left_tail, stats.norm.pdf(left_tail, null_mean, std_err),
                     alpha=0.3, color='blue', label=f'第一类错误 (α = {alpha})')
    plt.fill_between(right_tail, stats.norm.pdf(right_tail, null_mean, std_err),
                     alpha=0.3, color='blue')

    # Type II error: the H1 density inside the acceptance band.
    accept_band = np.linspace(-critical_value, critical_value, 100)
    plt.fill_between(accept_band, stats.norm.pdf(accept_band, alt_mean, std_err),
                     alpha=0.3, color='red', label='第二类错误 (β)')

    # Dashed markers at the critical values.
    plt.axvline(-critical_value, color='gray', linestyle='--', alpha=0.7)
    plt.axvline(critical_value, color='gray', linestyle='--', alpha=0.7,
                label=f'临界值 (±{critical_value:.2f})')

    # Power = upper-tail mass + lower-tail mass of the H1 distribution.
    upper_mass = 1 - stats.norm.cdf(critical_value, alt_mean, std_err)
    lower_mass = stats.norm.cdf(-critical_value, alt_mean, std_err)
    power = upper_mass + lower_mass

    plt.title(f'假设检验错误类型可视化\n统计功效 (1-β) = {power:.3f}')
    plt.xlabel('效应大小')
    plt.ylabel('概率密度')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    return power
# Run the visualization and print the power it returns.
power = visualize_hypothesis_testing()
print(f"在当前参数下的统计功效: {power:.3f}")
2.2 效应大小 (Effect Size)
效应大小是样本量计算中最关键的业务输入参数,它代表了我们有兴趣检测的最小业务效应。
2.2.1 效应大小的类型
效应大小类型 | 适用场景 | 计算公式 | 解释 |
---|---|---|---|
Cohen’s d | 连续变量均值比较 | $d = \frac{\mu_1 - \mu_0}{\sigma}$ | 标准化均值差异 |
Cohen’s h | 比例比较 | $h = 2\arcsin(\sqrt{p_1}) - 2\arcsin(\sqrt{p_0})$ | 比例差异的度量 |
相对提升 | 业务指标 | $\frac{p_1 - p_0}{p_0}$ | 业务场景中更直观 |
风险比 | 生存分析 | $\frac{\lambda_1}{\lambda_0}$ | 事件发生率的比值 |
2.2.2 选择适当的效应大小
class EffectSizeCalculator:
    """Static helpers for computing, classifying and recommending effect sizes."""

    # (exclusive upper bound, label) pairs; anything at or above the last
    # bound is labelled "大".  The same bands are used for Cohen's d and h.
    _MAGNITUDE_BANDS = ((0.2, "很小"), (0.5, "小"), (0.8, "中等"))
    _SUPPORTED_EFFECT_TYPES = ('cohens_d', 'cohens_h')

    @staticmethod
    def cohens_d(mean1, mean2, std1, std2, n1, n2):
        """Return Cohen's d: |mean1 - mean2| divided by the pooled std deviation.

        Raises:
            ValueError: If ``n1 + n2 <= 2`` (no degrees of freedom to pool)
                or the pooled standard deviation is zero.
        """
        dof = n1 + n2 - 2
        if dof <= 0:
            raise ValueError("n1 + n2 must be greater than 2 to pool the standard deviations")
        pooled_std = np.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / dof)
        if pooled_std == 0:
            raise ValueError("pooled standard deviation is zero; Cohen's d is undefined")
        return abs(mean1 - mean2) / pooled_std

    @staticmethod
    def cohens_h(p1, p2):
        """Return Cohen's h, the arcsine-transformed distance between two proportions."""
        from math import asin, sqrt
        return 2 * abs(asin(sqrt(p1)) - asin(sqrt(p2)))

    @staticmethod
    def relative_improvement(baseline, new_value):
        """Return (new_value - baseline) / baseline; a zero baseline divides by zero."""
        return (new_value - baseline) / baseline

    @staticmethod
    def classify_effect_size(effect_size, effect_type='cohens_d'):
        """Map an effect size to one of "很小", "小", "中等", "大".

        The sign is ignored, so a negative effect is classified by magnitude.

        Raises:
            ValueError: If ``effect_type`` is not 'cohens_d' or 'cohens_h'
                (previously this silently returned None).
        """
        if effect_type not in EffectSizeCalculator._SUPPORTED_EFFECT_TYPES:
            raise ValueError(
                f"unsupported effect_type {effect_type!r}; expected one of "
                f"{EffectSizeCalculator._SUPPORTED_EFFECT_TYPES}"
            )
        magnitude = abs(effect_size)
        for upper_bound, label in EffectSizeCalculator._MAGNITUDE_BANDS:
            if magnitude < upper_bound:
                return label
        return "大"

    @staticmethod
    def recommend_mde(baseline_rate, business_context):
        """Recommend a relative minimum detectable effect (MDE).

        Starts from a per-context default (0.10 for unknown contexts) and
        scales it by the baseline rate: doubled (capped at 0.50) below 1%,
        x1.5 below 5%, x0.8 above 50%, unchanged otherwise.
        """
        recommendations = {
            'high_stakes': 0.05,    # high-risk decision: detect small effects
            'medium_impact': 0.10,  # medium-impact decision
            'exploratory': 0.15,    # exploratory test
            'low_risk': 0.20        # low-risk optimisation
        }
        base_mde = recommendations.get(business_context, 0.10)

        if baseline_rate < 0.01:
            # Very low baseline: allow a larger MDE, capped at 50%.
            return min(base_mde * 2, 0.50)
        if baseline_rate < 0.05:
            return base_mde * 1.5
        if baseline_rate > 0.50:
            # High baseline: recommend a stricter MDE.
            return base_mde * 0.8
        return base_mde
# Usage example. Every helper is a staticmethod; an instance is used here
# only for brevity.
calculator = EffectSizeCalculator()

# Compute a few different effect sizes.
print("效应大小计算示例:")
print(f"Cohen's d: {calculator.cohens_d(10, 12, 3, 3, 100, 100):.3f}")
print(f"Cohen's h: {calculator.cohens_h(0.10, 0.12):.3f}")
print(f"相对提升: {calculator.relative_improvement(0.10, 0.12):.1%}")

# Recommended MDE for each business context at an 8% baseline.
business_contexts = ['high_stakes', 'medium_impact', 'exploratory', 'low_risk']
baseline_rate = 0.08
print(f"\n基于基线转化率 {baseline_rate:.1%} 的MDE推荐:")
for context in business_contexts:
    # The loop body must be indented; as extracted it was flush-left.
    recommended_mde = calculator.recommend_mde(baseline_rate, context)
    print(f" {context}: {recommended_mde:.1%}")
2.3 统计分布与抽样变异性
理解抽样分布是掌握样本量计算的基础。中心极限定理告诉我们,只要总体方差有限且样本量足够大,无论总体分布的形状如何,样本均值的分布都近似服从正态分布。
def demonstrate_sampling_distribution():
    """Show how the sampling distribution of the mean changes with sample size.

    Draws a skewed population of 10000 values from an exponential
    distribution with scale 300, then for each sample size in
    [10, 30, 100, 500] draws 1000 samples without replacement and
    histograms their means with a fitted normal curve overlaid.  Panel 1 of
    the 2x3 grid shows the population itself.

    The NumPy global random state is seeded with 42 at the start of the call.
    """
    np.random.seed(42)

    # Skewed population standing in for time-on-site in seconds.
    population = np.random.exponential(300, 10000)
    draws_per_size = 1000

    plt.figure(figsize=(15, 10))

    # Panel 1: the population.
    plt.subplot(2, 3, 1)
    plt.hist(population, bins=50, density=True, alpha=0.7)
    plt.title('总体分布 (指数分布)')
    plt.xlabel('停留时间 (秒)')
    plt.ylabel('密度')

    # Panels 2-5: one sampling distribution per sample size.
    for panel, n in enumerate([10, 30, 100, 500], start=2):
        sample_means = [
            np.mean(np.random.choice(population, n, replace=False))
            for _ in range(draws_per_size)
        ]

        plt.subplot(2, 3, panel)
        plt.hist(sample_means, bins=30, density=True, alpha=0.7)
        plt.title(f'样本量 n={n}\n均值={np.mean(sample_means):.1f}, 标准差={np.std(sample_means):.1f}')
        plt.xlabel('样本均值')
        plt.ylabel('密度')

        # Normal curve with the same mean and standard deviation as the
        # simulated sample means, drawn over the current x-range.
        lower, upper = plt.xlim()
        curve_x = np.linspace(lower, upper, 100)
        curve_y = stats.norm.pdf(curve_x, np.mean(sample_means), np.std(sample_means))
        plt.plot(curve_x, curve_y, 'r-', linewidth=2)

    plt.tight_layout()
    plt.show()
# Run the demo: shows the population plus four sampling-distribution panels.
demonstrate_sampling_distribution()
(此处原为一张 Mermaid 流程图,因语法错误未能渲染;可辨认的内容为:假设检验框架 → 零假设 H₀、备择假设 H₁。)
III. 样本量计算公式推导
3.1 比例检验的样本量计算
比例检验是A/B测试中最常见的场景,比如转化率、点击率等二元指标的比较。
3.1.1 公式推导
对于两个独立样本的比例检验,样本量计算公式为:
$$ n = \frac{\left(z_{1-\alpha/2} \sqrt{2p(1-p)} + z_{1-\beta} \sqrt{p_1(1-p_1) + p_0(1-p_0)}\right)^2}{(p_1 - p_0)^2} $$
其中:
- $p_0$ = 基线比例(对照组)
- $p_1$ = 期望比例(实验组)
- $p = \frac{p_0 + p_1}{2}$ = 合并比例
- $z_{1-\alpha/2}$ = 标准正态分布的 $(1-\alpha/2)$ 分位数
- $z_{1-\beta}$ = 标准正态分布的 $(1-\beta)$ 分位数
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
class SampleSizeCalculator:
    """Base class storing alpha, the target power and their normal quantiles."""

    def __init__(self, alpha=0.05, power=0.8):
        """Store the test settings and precompute the critical z-values.

        Args:
            alpha: Two-sided significance level, strictly between 0 and 1.
            power: Target power (1 - beta), strictly between 0 and 1.

        Raises:
            ValueError: If ``alpha`` or ``power`` is outside (0, 1).
        """
        if not 0 < alpha < 1:
            raise ValueError(f"alpha must be strictly between 0 and 1, got {alpha!r}")
        if not 0 < power < 1:
            raise ValueError(f"power must be strictly between 0 and 1, got {power!r}")
        self.alpha = alpha
        self.power = power
        # z_{1 - alpha/2}: two-sided critical value.
        self.z_alpha = stats.norm.ppf(1 - alpha/2)
        # z_{1 - beta}: quantile matching the target power.
        self.z_beta = stats.norm.ppf(power)

    def calculate_power(self, n, effect_size):
        """Return the power at sample size ``n``; subclasses must implement this."""
        raise NotImplementedError("子类必须实现此方法")


class ProportionSampleSize(SampleSizeCalculator):
    """Sample size and power for a two-sided test of two proportions.

    ``n`` always refers to the control group; the treatment group has
    ``ratio * n`` observations.
    """

    @staticmethod
    def _unit_standard_errors(p0, p1, ratio):
        """Return (se_pool, se_alt) of the difference for a control group of size 1.

        Dividing either value by sqrt(n) gives the standard error at control
        size n: ``se_pool`` under H0 (pooled proportion), ``se_alt`` under H1.
        """
        if not (0 <= p0 <= 1 and 0 <= p1 <= 1):
            raise ValueError(f"p0 and p1 must lie in [0, 1], got {p0!r} and {p1!r}")
        if ratio <= 0:
            raise ValueError(f"ratio must be positive, got {ratio!r}")
        p_pool = (p0 + ratio * p1) / (1 + ratio)
        se_pool = np.sqrt(p_pool * (1 - p_pool) * (1 + 1/ratio))
        se_alt = np.sqrt(p0*(1-p0) + p1*(1-p1)/ratio)
        return se_pool, se_alt

    def calculate(self, p0, p1, ratio=1.0):
        """Return the control-group sample size needed to detect p0 vs p1.

        Parameters:
            p0: control-group proportion
            p1: treatment-group proportion
            ratio: treatment-to-control sample-size ratio

        Raises:
            ValueError: If ``p0 == p1`` (no effect to detect), a proportion
                is outside [0, 1], or ``ratio`` is not positive.
        """
        se_pool, se_alt = self._unit_standard_errors(p0, p1, ratio)
        effect_size = abs(p1 - p0)
        if effect_size == 0:
            raise ValueError("p0 and p1 are equal; a zero effect cannot be powered")
        numerator = self.z_alpha * se_pool + self.z_beta * se_alt
        n = (numerator / effect_size) ** 2
        return int(np.ceil(n))

    def calculate_power(self, n, p0, p1, ratio=1.0):
        """Return the approximate power with ``n`` control observations.

        Inverts the formula used by ``calculate``:
        power = Phi((|p1 - p0| * sqrt(n) - z_alpha * se_pool) / se_alt).
        Only the rejection tail in the direction of the effect is counted.

        Raises:
            ValueError: If ``n`` is not positive or the proportions/ratio
                are invalid.
        """
        if n <= 0:
            raise ValueError(f"n must be positive, got {n!r}")
        se_pool, se_alt = self._unit_standard_errors(p0, p1, ratio)
        effect_size = abs(p1 - p0)
        # The sqrt(n) factor is what makes power depend on the sample size;
        # without it the result was the same constant for every n.
        z_stat = (effect_size * np.sqrt(n) - self.z_alpha * se_pool) / se_alt
        return stats.norm.cdf(z_stat)

    def sensitivity_analysis(self, p0, n_per_group, power_threshold=0.8):
        """Return the smallest relative lift detectable with ``n_per_group``.

        Scans 1000 evenly spaced treatment rates between ``p0`` and
        ``2 * p0`` (skipping values that are not above ``p0`` or exceed 1)
        and returns the smallest relative lift whose power reaches
        ``power_threshold``, or None if none on the grid does.
        """
        detectable_effects = [
            (p1 - p0) / p0
            for p1 in np.linspace(p0, p0 * 2, 1000)
            if p0 < p1 <= 1
            and self.calculate_power(n_per_group, p0, p1) >= power_threshold
        ]
        return min(detectable_effects, default=None)
# Usage example
calculator = ProportionSampleSize(alpha=0.05, power=0.8)

# Basic calculation
p0 = 0.10 # baseline conversion rate: 10%
p1 = 0.12 # conversion rate to detect: 12% (a 20% relative lift)
sample_size = calculator.calculate(p0, p1)
print("比例检验样本量计算:")
print(f"基线转化率: {p0:.1%}")
print(f"目标转化率: {p1:.1%}")
print(f"相对提升: {(p1-p0)/p0:.1%}")
print(f"每组所需样本量: {sample_size}")
print(f"总样本量: {sample_size * 2}")

# Power at the computed sample size
power = calculator.calculate_power(sample_size, p0, p1)
print(f"预计统计功效: {power:.3f}")

# Sensitivity analysis. sensitivity_analysis returns None when no lift in
# its search range reaches the target power, and None cannot be formatted
# with ':.1%', so that case is reported separately instead of crashing.
min_detectable = calculator.sensitivity_analysis(p0, 1000)
if min_detectable is not None:
    print(f"在1000样本量下可检测的最小相对效应: {min_detectable:.1%}")
else:
    print("在1000样本量下,搜索范围内没有能达到目标功效的效应")
3.2 均值检验的样本量计算
对于连续变量的比较,如平均订单价值、用户停留时间等,我们需要使用均值检验的样本量公式。
3.2.1 公式推导
两个独立样本均值检验的样本量公式:
$$ n = \frac{(z_{1-\alpha/2} + z_{1-\beta})^2 \times 2\sigma^2}{(\mu_1 - \mu_0)^2} $$
其中:
- $\mu_0$ = 对照组均值
- $\mu_1$ = 实验组均值
- $\sigma$ = 总体标准差(假设两组方差相等)
class MeanSampleSize(SampleSizeCalculator):
    """Sample size and power for a two-sided comparison of two means.

    ``n`` always refers to the control group; the treatment group has
    ``ratio * n`` observations.  Both groups are given the same ``sigma``.
    """

    def calculate(self, mu0, mu1, sigma, ratio=1.0):
        """Return the control-group sample size needed to detect mu0 vs mu1.

        Parameters:
            mu0: control-group mean
            mu1: treatment-group mean
            sigma: common standard deviation
            ratio: treatment-to-control sample-size ratio

        Raises:
            ValueError: If ``mu0 == mu1``, ``sigma <= 0`` or ``ratio <= 0``.
        """
        if sigma <= 0:
            raise ValueError(f"sigma must be positive, got {sigma!r}")
        effect_size = abs(mu1 - mu0)
        # Same arithmetic as the closed form, expressed through Cohen's d.
        return self.calculate_using_cohens_d(effect_size / sigma, ratio)

    def calculate_power(self, n, mu0, mu1, sigma, ratio=1.0):
        """Return the power of the two-sided two-sample t-test at control size ``n``.

        Under H1 the t statistic follows a *noncentral* t distribution, so
        the power is the mass of ``stats.nct`` outside +/- the central-t
        critical value.  (Passing the noncentrality as the third positional
        argument of ``stats.t.cdf`` sets ``loc``, i.e. a shifted central t.)

        Raises:
            ValueError: If ``sigma``/``ratio`` are not positive or the
                degrees of freedom ``n * (1 + ratio) - 2`` are not positive.
        """
        if sigma <= 0:
            raise ValueError(f"sigma must be positive, got {sigma!r}")
        if ratio <= 0:
            raise ValueError(f"ratio must be positive, got {ratio!r}")
        df = n * (1 + ratio) - 2
        if df <= 0:
            raise ValueError(f"n * (1 + ratio) must exceed 2, got n={n!r}, ratio={ratio!r}")

        effect_size = abs(mu1 - mu0)
        noncentrality = effect_size / (sigma * np.sqrt((1 + 1/ratio) / n))
        critical_value = stats.t.ppf(1 - self.alpha/2, df)

        power = (stats.nct.sf(critical_value, df, noncentrality)
                 + stats.nct.cdf(-critical_value, df, noncentrality))
        if not np.isfinite(power):
            # Defensive fallback in case nct returns a non-finite value:
            # use the normal approximation with the same shift.
            power = (stats.norm.sf(critical_value - noncentrality)
                     + stats.norm.cdf(-critical_value - noncentrality))
        return np.clip(power, 0.0, 1.0)

    def calculate_using_cohens_d(self, cohens_d, ratio=1.0):
        """Return the control-group sample size for a standardized effect.

        Computes (z_alpha + z_beta)^2 / d^2 * (1 + 1/ratio), rounded up.

        Raises:
            ValueError: If ``cohens_d`` is 0 or ``ratio`` is not positive.
        """
        if cohens_d == 0:
            raise ValueError("effect size is zero; a zero effect cannot be powered")
        if ratio <= 0:
            raise ValueError(f"ratio must be positive, got {ratio!r}")
        numerator = (self.z_alpha + self.z_beta) ** 2
        n = numerator / (cohens_d ** 2) * (1 + 1/ratio)
        return int(np.ceil(n))
# Mean-test example
mean_calculator = MeanSampleSize(alpha=0.05, power=0.8)
# Scenario: average order value (AOV) test
mu0 = 100 # current average order value: 100 yuan
mu1 = 110 # average order value to detect: 110 yuan
sigma = 30 # standard deviation taken from historical data
sample_size = mean_calculator.calculate(mu0, mu1, sigma)
print(f"\n均值检验样本量计算:")
print(f"当前均值: {mu0}")
print(f"目标均值: {mu1}")
print(f"标准差: {sigma}")
print(f"Cohen's d: {abs(mu1-mu0)/sigma:.3f}")
print(f"每组所需样本量: {sample_size}")
# Same calculation driven directly by Cohen's d
cohens_d = 0.3 # small-to-medium effect (Cohen's benchmarks: 0.2 small, 0.5 medium)
sample_size_d = mean_calculator.calculate_using_cohens_d(cohens_d)
print(f"对于Cohen's d = {cohens_d},每组所需样本量: {sample_size_d}")
3.3 方差分析(ANOVA)的样本量计算
当比较多个组时(如A/B/C测试),我们需要使用ANOVA的样本量计算方法。
class AnovaSampleSize:
    """Sample-size helper for one-way ANOVA (e.g. A/B/C tests)."""

    def __init__(self, alpha=0.05, power=0.8):
        """Store the significance level and the target power."""
        self.alpha = alpha
        self.power = power

    def calculate(self, k, effect_size, groups_equal=True):
        """Return the per-group sample size for a one-way ANOVA.

        Parameters:
            k: number of groups (at least 2)
            effect_size: Cohen's f (must be positive)
            groups_equal: accepted for compatibility; both values take the
                same equal-allocation path, as before.

        Returns:
            Observations needed in each group, rounded up.

        Raises:
            ValueError: If ``k < 2`` or ``effect_size <= 0``.
        """
        if k < 2:
            raise ValueError(f"k must be at least 2, got {k!r}")
        if effect_size <= 0:
            raise ValueError(f"effect_size must be positive, got {effect_size!r}")

        # Imported after validation so bad input fails fast.
        from statsmodels.stats.power import FTestAnovaPower

        # solve_power solves for nobs, the TOTAL number of observations
        # across all groups, so it is divided by k to get the per-group size.
        total_nobs = FTestAnovaPower().solve_power(
            effect_size=effect_size,
            nobs=None,
            alpha=self.alpha,
            power=self.power,
            k_groups=k
        )
        return int(np.ceil(total_nobs / k))

    def calculate_effect_size_f(self, means, sigma):
        """Return Cohen's f: the population std of the group means over sigma.

        Raises:
            ValueError: If ``means`` is empty or ``sigma`` is not positive.
        """
        if len(means) == 0:
            raise ValueError("means must contain at least one group mean")
        if sigma <= 0:
            raise ValueError(f"sigma must be positive, got {sigma!r}")
        # np.var defaults to the population variance (ddof=0).
        between_variance = np.var(means)
        return np.sqrt(between_variance) / sigma
# ANOVA example
anova_calculator = AnovaSampleSize(alpha=0.05, power=0.8)
# Scenario: testing three different pricing strategies
k = 3 # three groups
effect_size_f = 0.25 # medium effect by Cohen's benchmarks for f
n_per_group = anova_calculator.calculate(k, effect_size_f)
print(f"\nANOVA样本量计算:")
print(f"组数: {k}")
print(f"Cohen's f: {effect_size_f}")
print(f"每组所需样本量: {n_per_group}")
print(f"总样本量: {n_per_group * k}")
# Derive the effect size from group means instead
means = [100, 110, 105] # the three group means
sigma = 15 # within-group standard deviation
effect_size_calculated = anova_calculator.calculate_effect_size_f(means, sigma)
print(f"基于均值和标准差计算的Cohen's f: {effect_size_calculated:.3f}")
3.4 公式可视化与比较
def visualize_sample_size_relationships():
    """Plot the required per-group sample size against four inputs.

    Draws a 2x2 grid using ``ProportionSampleSize``: sample size versus
    relative effect size (10% baseline), versus target power, versus
    significance level (both for 10% -> 12%), and versus the baseline rate
    for a fixed 20% relative lift.
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))

    def _draw(ax, xs, ys, fmt, xlabel, title):
        # Shared styling for all four panels.
        ax.plot(xs, ys, fmt, linewidth=2)
        ax.set_xlabel(xlabel)
        ax.set_ylabel('所需样本量 (每组)')
        ax.set_title(title)
        ax.grid(True, alpha=0.3)

    # Default settings (alpha 0.05, power 0.8); reused by panels 1 and 4.
    calculator = ProportionSampleSize()

    # 1. Sample size vs relative effect size at a 10% baseline.
    effect_sizes = np.linspace(0.05, 0.3, 50)
    n_by_effect = [calculator.calculate(0.10, 0.10 * (1 + lift)) for lift in effect_sizes]
    _draw(axes[0, 0], effect_sizes, n_by_effect, 'b-', '相对效应大小', '样本量 vs 效应大小')

    # 2. Sample size vs target power.
    powers = np.linspace(0.5, 0.95, 50)
    n_by_power = [ProportionSampleSize(power=target).calculate(0.10, 0.12) for target in powers]
    _draw(axes[0, 1], powers, n_by_power, 'r-', '统计功效', '样本量 vs 统计功效')

    # 3. Sample size vs significance level.
    alphas = np.linspace(0.01, 0.1, 50)
    n_by_alpha = [ProportionSampleSize(alpha=level).calculate(0.10, 0.12) for level in alphas]
    _draw(axes[1, 0], alphas, n_by_alpha, 'g-', '显著性水平 (α)', '样本量 vs 显著性水平')

    # 4. Sample size vs baseline rate for a fixed 20% relative lift.
    baselines = np.linspace(0.01, 0.5, 50)
    n_by_baseline = [calculator.calculate(rate, rate * 1.2) for rate in baselines]
    _draw(axes[1, 1], baselines, n_by_baseline, 'purple', '基线转化率', '样本量 vs 基线转化率 (20%提升)')

    plt.tight_layout()
    plt.show()
# Run the visualization: shows the 2x2 grid of sample-size relationships.
visualize_sample_size_relationships()
(此处原为一张 Mermaid 流程图,因语法错误未能渲染;可辨认的节点包括“基线比例 p₀”等样本量计算的输入参数。)
IV. 实际应用与代码实现
4.1 完整的样本量计算系统
在实际业务中,我们需要一个健壮的样本量计算系统来处理各种复杂场景。
import pandas as pd
from datetime import datetime, timedelta
class AdvancedSampleSizeCalculator:
    """Facade over the proportion, mean and ANOVA calculators with a call history.

    ``alpha`` and ``power`` are properties: assigning to either rebuilds the
    delegate calculators, so later results always use the current settings.
    """

    def __init__(self, alpha=0.05, power=0.8):
        """Create the delegate calculators and an empty calculation history."""
        self._alpha = alpha
        self._power = power
        # One dict per calculate_for_* call, in call order.
        self.calculation_history = []
        self._rebuild_calculators()

    def _rebuild_calculators(self):
        """Recreate the delegate calculators from the current alpha and power."""
        self.proportion_calc = ProportionSampleSize(self._alpha, self._power)
        self.mean_calc = MeanSampleSize(self._alpha, self._power)
        self.anova_calc = AnovaSampleSize(self._alpha, self._power)

    @property
    def alpha(self):
        """Significance level used by all delegate calculators."""
        return self._alpha

    @alpha.setter
    def alpha(self, value):
        self._alpha = value
        self._rebuild_calculators()

    @property
    def power(self):
        """Target power used by all delegate calculators."""
        return self._power

    @power.setter
    def power(self, value):
        self._power = value
        self._rebuild_calculators()

    def _record(self, test_type, baseline, mde, sample_size, ratio, **extra):
        """Append one entry describing a finished calculation to the history."""
        self.calculation_history.append({
            'timestamp': datetime.now(),
            'test_type': test_type,
            'baseline': baseline,
            'mde': mde,
            **extra,
            'sample_size': sample_size,
            'parameters': {
                'alpha': self._alpha,
                'power': self._power,
                'ratio': ratio
            }
        })

    def calculate_for_proportion(self, baseline_rate, mde, ratio=1.0,
                                 test_type='two_sided', adjustment=None):
        """Return the per-group sample size for a proportion test and record it.

        Args:
            baseline_rate: Control conversion rate.
            mde: Relative minimum detectable effect; the target rate is
                ``baseline_rate * (1 + mde)``.
            ratio: Treatment-to-control sample-size ratio.
            test_type: Accepted for compatibility; the calculation is always
                two-sided. NOTE(review): this argument is never read.
            adjustment: ``'continuity'`` inflates the result by 5%,
                ``'multiple_comparisons'`` by 10%. Both are fixed factors
                from the original code, not derived corrections; the
                inflated value is truncated to an int.
        """
        p1 = baseline_rate * (1 + mde)
        sample_size = self.proportion_calc.calculate(baseline_rate, p1, ratio)

        if adjustment == 'continuity':
            sample_size = int(sample_size * 1.05)
        elif adjustment == 'multiple_comparisons':
            sample_size = int(sample_size * 1.1)

        self._record('proportion', baseline_rate, mde, sample_size, ratio)
        return sample_size

    def calculate_for_means(self, baseline_mean, mde, std, ratio=1.0):
        """Return the per-group sample size for a mean test and record it.

        The target mean is ``baseline_mean * (1 + mde)``.
        """
        target_mean = baseline_mean * (1 + mde)
        sample_size = self.mean_calc.calculate(baseline_mean, target_mean, std, ratio)
        self._record('means', baseline_mean, mde, sample_size, ratio, std=std)
        return sample_size

    def calculate_experiment_duration(self, sample_size_per_variant, daily_traffic,
                                      variants=2, traffic_allocation=1.0):
        """Return the number of whole days needed to collect the sample (at least 1).

        Raises:
            ValueError: If ``daily_traffic * traffic_allocation`` is not
                positive (negative traffic used to be reported as 1 day).
        """
        total_sample_size = sample_size_per_variant * variants
        available_daily_traffic = daily_traffic * traffic_allocation
        if available_daily_traffic <= 0:
            raise ValueError("日流量必须大于0")
        duration_days = total_sample_size / available_daily_traffic
        return max(1, int(np.ceil(duration_days)))

    def power_analysis(self, test_type, **kwargs):
        """Return (sample_sizes, powers) for n = 100, 200, ..., 9900.

        Keyword arguments: ``baseline_rate`` and ``mde`` for
        ``'proportion'``; ``baseline_mean``, ``mde`` and ``std`` for
        ``'means'``.

        Raises:
            ValueError: If ``test_type`` is neither of those (previously
                this returned an empty power list of mismatched length).
        """
        sample_sizes = list(range(100, 10000, 100))
        if test_type == 'proportion':
            p0 = kwargs['baseline_rate']
            p1 = p0 * (1 + kwargs['mde'])
            powers = [self.proportion_calc.calculate_power(n, p0, p1)
                      for n in sample_sizes]
        elif test_type == 'means':
            mu0 = kwargs['baseline_mean']
            mu1 = mu0 * (1 + kwargs['mde'])
            sigma = kwargs['std']
            powers = [self.mean_calc.calculate_power(n, mu0, mu1, sigma)
                      for n in sample_sizes]
        else:
            raise ValueError(f"unsupported test_type {test_type!r}; expected 'proportion' or 'means'")
        return sample_sizes, powers

    def generate_report(self, test_type, **kwargs):
        """Return a dict with the sample size, duration and power curve.

        ``daily_traffic`` defaults to 1000 when not given.  The reported
        total sample size assumes two variants.

        Raises:
            ValueError: If ``test_type`` is neither ``'proportion'`` nor
                ``'means'`` (previously an UnboundLocalError).
            KeyError: If a keyword required by the test type is missing.
        """
        daily_traffic = kwargs.get('daily_traffic', 1000)

        if test_type == 'proportion':
            baseline = kwargs['baseline_rate']
            mde = kwargs['mde']
            sample_size = self.calculate_for_proportion(baseline, mde)
            analysis_args = {'baseline_rate': baseline, 'mde': mde}
        elif test_type == 'means':
            baseline = kwargs['baseline_mean']
            mde = kwargs['mde']
            std = kwargs['std']
            sample_size = self.calculate_for_means(baseline, mde, std)
            analysis_args = {'baseline_mean': baseline, 'mde': mde, 'std': std}
        else:
            raise ValueError(f"unsupported test_type {test_type!r}; expected 'proportion' or 'means'")

        duration = self.calculate_experiment_duration(sample_size, daily_traffic)
        sample_sizes, powers = self.power_analysis(test_type, **analysis_args)

        return {
            'test_type': test_type,
            'parameters': kwargs,
            'sample_size_per_variant': sample_size,
            'total_sample_size': sample_size * 2,
            'estimated_duration_days': duration,
            'power_analysis': {
                'sample_sizes': sample_sizes,
                'powers': powers
            }
        }
# Use the advanced calculator
advanced_calc = AdvancedSampleSizeCalculator(alpha=0.05, power=0.8)
# Proportion-test example
prop_report = advanced_calc.generate_report(
    'proportion',
    baseline_rate=0.08,
    mde=0.15, # 15% relative lift
    daily_traffic=2000
)
print("比例检验样本量报告:")
print(f"基线转化率: {prop_report['parameters']['baseline_rate']:.1%}")
print(f"目标相对提升: {prop_report['parameters']['mde']:.1%}")
print(f"每组样本量: {prop_report['sample_size_per_variant']}")
print(f"总样本量: {prop_report['total_sample_size']}")
print(f"预计实验时长: {prop_report['estimated_duration_days']} 天")
# Mean-test example
mean_report = advanced_calc.generate_report(
    'means',
    baseline_mean=100,
    mde=0.10, # 10% lift
    std=25,
    daily_traffic=1500
)
print("\n均值检验样本量报告:")
print(f"基线均值: {mean_report['parameters']['baseline_mean']}")
print(f"目标相对提升: {mean_report['parameters']['mde']:.1%}")
print(f"标准差: {mean_report['parameters']['std']}")
print(f"每组样本量: {mean_report['sample_size_per_variant']}")
print(f"总样本量: {mean_report['total_sample_size']}")
print(f"预计实验时长: {mean_report['estimated_duration_days']} 天")
4.2 交互式样本量计算工具
创建一个用户友好的交互式工具,方便业务团队使用。
import ipywidgets as widgets
from IPython.display import display, HTML
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class InteractiveSampleSizeCalculator:
    """Jupyter widget front end for ``AdvancedSampleSizeCalculator``.

    Sliders set the baseline, MDE, alpha, power, daily traffic and (for the
    mean test) the standard deviation.  Every change recomputes the sample
    size and experiment duration and redraws the power curve in the output
    area.
    """

    def __init__(self):
        """Create the widgets and wire their change events to the recalculation."""
        self.calculator = AdvancedSampleSizeCalculator()
        # True while on_test_type_change reconfigures the baseline slider, so
        # the intermediate trait changes do not each trigger a recalculation.
        self._suspend_updates = False

        self.test_type = widgets.Dropdown(
            options=['比例检验', '均值检验'],
            value='比例检验',
            description='测试类型:'
        )
        self.baseline = widgets.FloatSlider(
            value=0.10,
            min=0.01,
            max=0.50,
            step=0.01,
            description='基线值:',
            # The dropdown starts on the proportion test, so the baseline is
            # first shown as a percentage.
            readout_format='.1%'
        )
        self.mde = widgets.FloatSlider(
            value=0.10,
            min=0.05,
            max=0.50,
            step=0.05,
            description='MDE:',
            readout_format='.1%'
        )
        self.alpha = widgets.FloatSlider(
            value=0.05,
            min=0.01,
            max=0.10,
            step=0.01,
            description='α:',
            readout_format='.3f'
        )
        self.power = widgets.FloatSlider(
            value=0.80,
            min=0.50,
            max=0.95,
            step=0.05,
            description='功效:',
            readout_format='.2f'
        )
        self.daily_traffic = widgets.IntSlider(
            value=1000,
            min=100,
            max=10000,
            step=100,
            description='日流量:'
        )
        # Only shown for the mean test.
        self.std = widgets.FloatSlider(
            value=25,
            min=1,
            max=100,
            step=5,
            description='标准差:'
        )

        self.output = widgets.Output()
        # Container whose children are swapped when the test type changes,
        # so the std slider appears and disappears with the selection.
        self._controls = widgets.VBox(self._visible_controls())

        self.test_type.observe(self.on_test_type_change, names='value')
        for slider in (self.alpha, self.power, self.baseline,
                       self.mde, self.daily_traffic, self.std):
            slider.observe(self.on_parameter_change, names='value')

    def _visible_controls(self):
        """Return the control widgets for the selected test type, in display order."""
        controls = [self.test_type, self.baseline, self.mde]
        if self.test_type.value != '比例检验':
            controls.append(self.std)
        controls += [self.alpha, self.power, self.daily_traffic]
        return controls

    def _sync_calculator(self):
        """Make the calculator use the alpha and power currently on the sliders.

        Assigning ``calculator.alpha`` alone is not relied on here, because
        the delegate calculators are built in the calculator's constructor.
        A new calculator is created and the existing history carried over.
        """
        current = (self.calculator.alpha, self.calculator.power)
        if current != (self.alpha.value, self.power.value):
            history = self.calculator.calculation_history
            self.calculator = AdvancedSampleSizeCalculator(
                alpha=self.alpha.value, power=self.power.value
            )
            self.calculator.calculation_history = history

    def on_test_type_change(self, change):
        """Reconfigure the baseline slider and visible controls, then recompute."""
        self._suspend_updates = True
        try:
            if change['new'] == '比例检验':
                self.baseline.max = 0.50
                self.baseline.readout_format = '.1%'
                self.baseline.value = 0.10
            else:
                self.baseline.max = 200
                self.baseline.readout_format = '.1f'
                self.baseline.value = 100
        finally:
            self._suspend_updates = False
        self._controls.children = tuple(self._visible_controls())
        self.update_calculation()

    def on_parameter_change(self, change):
        """Recompute when any slider value changes."""
        if self._suspend_updates:
            return
        self.update_calculation()

    def _show_summary(self, type_label, detail_rows, sample_size, duration):
        """Render the result card; ``detail_rows`` is a list of (label, text) pairs."""
        details = "\n".join(
            f"<p><b>{label}</b> {text}</p>" for label, text in detail_rows
        )
        display(HTML(f"""
        <div style="background-color:#f0f8ff; padding:15px; border-radius:5px; margin:10px 0;">
        <h3>📊 样本量计算结果</h3>
        <p><b>测试类型:</b> {type_label}</p>
        {details}
        <hr>
        <p><b>每组所需样本量:</b> <span style="color:blue; font-size:1.2em">{sample_size:,}</span></p>
        <p><b>总样本量:</b> <span style="color:green; font-size:1.2em">{sample_size * 2:,}</span></p>
        <p><b>预计实验时长:</b> <span style="color:orange; font-size:1.2em">{duration} 天</span></p>
        </div>
        """))

    def update_calculation(self):
        """Recompute the sample size and duration and redraw the output area."""
        with self.output:
            self.output.clear_output()
            self._sync_calculator()

            shared_rows = [
                ('显著性水平(α):', f"{self.alpha.value}"),
                ('统计功效:', f"{self.power.value:.0%}"),
            ]
            mde_row = ('最小可检测效应(MDE):', f"{self.mde.value:.1%}")

            if self.test_type.value == '比例检验':
                kind, type_label = 'proportion', '比例检验'
                sample_size = self.calculator.calculate_for_proportion(
                    self.baseline.value, self.mde.value
                )
                detail_rows = [('基线转化率:', f"{self.baseline.value:.1%}"), mde_row]
            else:
                kind, type_label = 'means', '均值检验'
                sample_size = self.calculator.calculate_for_means(
                    self.baseline.value, self.mde.value, self.std.value
                )
                detail_rows = [
                    ('基线均值:', f"{self.baseline.value:.1f}"),
                    mde_row,
                    ('标准差:', f"{self.std.value:.1f}"),
                ]

            duration = self.calculator.calculate_experiment_duration(
                sample_size, self.daily_traffic.value
            )
            self._show_summary(type_label, detail_rows + shared_rows, sample_size, duration)
            # Pass the size along so the plot does not recompute it and add a
            # second history record.
            self.plot_power_analysis(kind, current_sample_size=sample_size)

    def plot_power_analysis(self, test_type, current_sample_size=None):
        """Draw power versus sample size with the target line and current point.

        Args:
            test_type: ``'proportion'`` or anything else for the mean test.
            current_sample_size: Per-group size to mark; computed from the
                current slider values when omitted.
        """
        if test_type == 'proportion':
            sample_sizes, powers = self.calculator.power_analysis(
                'proportion',
                baseline_rate=self.baseline.value,
                mde=self.mde.value
            )
        else:
            sample_sizes, powers = self.calculator.power_analysis(
                'means',
                baseline_mean=self.baseline.value,
                mde=self.mde.value,
                std=self.std.value
            )

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=sample_sizes,
            y=powers,
            mode='lines',
            name='统计功效',
            line=dict(color='blue', width=3)
        ))
        # Target power reference line.
        fig.add_hline(y=self.power.value, line_dash="dash",
                      line_color="red", annotation_text=f"目标功效 ({self.power.value:.0%})")

        if current_sample_size is None:
            if test_type == 'proportion':
                current_sample_size = self.calculator.calculate_for_proportion(
                    self.baseline.value, self.mde.value
                )
            else:
                current_sample_size = self.calculator.calculate_for_means(
                    self.baseline.value, self.mde.value, self.std.value
                )

        # Power at the first grid point at or above the current sample size;
        # None when the size lies beyond the plotted range.
        current_power = next(
            (p for size, p in zip(sample_sizes, powers) if size >= current_sample_size),
            None
        )
        # "is not None" so that a power of exactly 0.0 is still marked.
        if current_power is not None:
            fig.add_trace(go.Scatter(
                x=[current_sample_size],
                y=[current_power],
                mode='markers',
                marker=dict(color='red', size=10),
                name=f'当前设置 (n={current_sample_size})'
            ))

        fig.update_layout(
            title='统计功效 vs 样本量',
            xaxis_title='样本量 (每组)',
            yaxis_title='统计功效',
            showlegend=True,
            height=400
        )
        fig.show()

    def display(self):
        """Show the controls next to the output area and run a first calculation."""
        self._controls.children = tuple(self._visible_controls())
        # Inside this method the bare name ``display`` is the module-level
        # IPython function, not this method.
        display(widgets.HBox([self._controls, self.output]))
        self.update_calculation()
# 使用交互式工具
# interactive_calc = InteractiveSampleSizeCalculator()
# interactive_calc.display()
4.3 实际业务场景应用
让我们通过几个真实的业务场景来演示样本量计算的应用。
def business_scenario_analysis():
    """Size four example experiments and return the results as a styled table.

    Each scenario is sized with ``AdvancedSampleSizeCalculator`` (proportion
    or mean test, depending on its ``type``) and its duration estimated from
    the scenario's daily traffic.

    Returns:
        A pandas Styler over the result table, with rows whose business
        impact is "高" given a light red background.
    """
    calculator = AdvancedSampleSizeCalculator()

    scenarios = [
        {
            'name': '电商注册流程优化',
            'type': 'proportion',
            'baseline': 0.08,
            'mde': 0.10,
            'daily_traffic': 5000,
            'business_impact': '高',
            'description': '优化注册表单,期望提升10%的注册转化率'
        },
        {
            'name': '内容推荐算法改进',
            'type': 'proportion',
            'baseline': 0.15,
            'mde': 0.05,
            'daily_traffic': 10000,
            'business_impact': '中',
            'description': '改进推荐算法,期望提升5%的点击率'
        },
        {
            'name': '定价策略测试',
            'type': 'means',
            'baseline': 120,
            'mde': 0.08,
            'std': 40,
            'daily_traffic': 2000,
            'business_impact': '高',
            'description': '测试新的定价策略,期望提升8%的平均订单价值'
        },
        {
            'name': '页面加载速度优化',
            'type': 'means',
            'baseline': 3.5,
            'mde': 0.15,
            'std': 1.2,
            'daily_traffic': 8000,
            'business_impact': '中',
            'description': '优化页面加载速度,期望减少15%的加载时间'
        }
    ]

    def _required_sample_size(spec):
        # Proportion scenarios carry no 'std'; every other type is sized as
        # a mean test.
        if spec['type'] == 'proportion':
            return calculator.calculate_for_proportion(spec['baseline'], spec['mde'])
        return calculator.calculate_for_means(spec['baseline'], spec['mde'], spec['std'])

    rows = []
    for spec in scenarios:
        per_group = _required_sample_size(spec)
        days = calculator.calculate_experiment_duration(per_group, spec['daily_traffic'])
        rows.append({
            '场景': spec['name'],
            '描述': spec['description'],
            '业务影响': spec['business_impact'],
            '每组样本量': f"{per_group:,}",
            '总样本量': f"{per_group * 2:,}",
            '预计时长(天)': days,
            '日流量': f"{spec['daily_traffic']:,}"
        })

    table = pd.DataFrame(rows)

    def highlight_high_impact(row):
        # One CSS string per cell in the row.
        cell_style = 'background-color: #ffcccc' if row['业务影响'] == '高' else ''
        return [cell_style] * len(row)

    return table.style.apply(highlight_high_impact, axis=1)
# Run the scenario analysis. The bare expression on the last line displays
# the styled table when this is the last statement of a notebook cell.
styled_results = business_scenario_analysis()
print("业务场景样本量分析")
styled_results
- 点赞
- 收藏
- 关注作者
评论(0)