A/B测试样本量计算的统计学基础

举报
数字扫地僧 发表于 2025/09/30 16:59:32 2025/09/30
【摘要】 I. 为什么样本量计算如此重要 1.1 样本量不足的代价在A/B测试实践中,样本量问题往往被低估。让我们通过一个实际案例来理解其重要性:import numpy as npimport matplotlib.pyplot as pltfrom scipy import statsdef demonstrate_sample_size_impact(): """展示样本量对A/B测试结...

I. 为什么样本量计算如此重要

1.1 样本量不足的代价

在A/B测试实践中,样本量问题往往被低估。让我们通过一个实际案例来理解其重要性:

import numpy as np
import matplotlib.pyplot as plt
from scipy import stats

def demonstrate_sample_size_impact():
    """展示样本量对A/B测试结果的影响"""
    np.random.seed(42)
    
    # 真实效应:实验组转化率比对照组高10%
    true_control_rate = 0.10
    true_treatment_rate = 0.11  # 实际提升10%
    
    sample_sizes = [100, 500, 1000, 5000]
    n_simulations = 1000
    
    detection_rates = []
    
    for n in sample_sizes:
        significant_count = 0
        
        for _ in range(n_simulations):
            # 模拟A/B测试数据
            control_conversions = np.random.binomial(1, true_control_rate, n)
            treatment_conversions = np.random.binomial(1, true_treatment_rate, n)
            
            # 执行统计检验
            from statsmodels.stats.proportion import proportions_ztest
            count = [np.sum(control_conversions), np.sum(treatment_conversions)]
            nobs = [n, n]
            z_stat, p_value = proportions_ztest(count, nobs)
            
            if p_value < 0.05:
                significant_count += 1
        
        detection_rate = significant_count / n_simulations
        detection_rates.append(detection_rate)
        print(f"样本量 {n}: 检测到显著效应的概率 = {detection_rate:.3f}")
    
    # 可视化结果
    plt.figure(figsize=(10, 6))
    plt.plot(sample_sizes, detection_rates, 'bo-', linewidth=2, markersize=8)
    plt.axhline(y=0.8, color='r', linestyle='--', label='期望的统计功效 (80%)')
    plt.xlabel('样本量 (每组)')
    plt.ylabel('检测到真实效应的概率')
    plt.title('样本量对统计功效的影响')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# 运行演示
demonstrate_sample_size_impact()

运行上述代码,您会发现一个关键现象:即使存在真实的业务效应,样本量不足也会导致我们无法可靠地检测到这种效应

1.2 样本量计算的核心价值

价值维度 具体影响 长期后果
统计可靠性 控制第一类错误(α)和第二类错误(β) 避免基于随机波动做出错误决策
业务效率 确保检测到有业务意义的最小效应 资源投入与预期回报相匹配
实验可信度 结果具有足够的统计功效 建立数据驱动决策的文化
资源优化 避免过度收集数据或过早停止实验 提高实验项目的投资回报率

1.3 真实世界案例分析

考虑一个电商网站的注册流程优化测试:

  • 现状:当前注册转化率为8%
  • 目标:检测至少10%的相对提升(即从8%提升到8.8%)
  • 风险容忍度:假阳性率5%,假阴性率20%

如果没有进行样本量计算,团队可能只在收集了1000个用户后就停止实验。但通过正确的计算,他们会发现需要近15000个用户才能可靠地检测这种规模的效应。

def calculate_required_sample_size(baseline_rate, mde, alpha=0.05, power=0.8):
    """计算比例检验所需的样本量"""
    from statsmodels.stats.power import NormalIndPower
    from math import asin, sqrt
    
    # 计算效应量 (Cohen's h)
    effect_size = 2 * asin(sqrt(baseline_rate * (1 + mde))) - 2 * asin(sqrt(baseline_rate))
    
    # 使用statsmodels计算样本量
    power_analysis = NormalIndPower()
    sample_size = power_analysis.solve_power(
        effect_size=effect_size, 
        alpha=alpha, 
        power=power,
        ratio=1.0  # 两组样本量相等
    )
    
    return int(round(sample_size))

# 计算示例
baseline = 0.08
mde = 0.10  # 10%相对提升
required_n = calculate_required_sample_size(baseline, mde)
print(f"每组需要的样本量: {required_n}")
print(f"总样本量: {required_n * 2}")
样本量计算重要性
统计可靠性
业务效率
实验可信度
资源优化
控制第一类错误
控制第二类错误
确保统计功效
检测最小业务效应
避免资源浪费
最大化投资回报
结果可重复性
决策可信度
建立数据文化
避免过度采样
防止过早停止
优化实验排期
常见问题
样本量不足
样本量过大
假阴性风险
浪费实验机会
资源浪费
机会成本

II. 统计学基础概念

2.1 假设检验框架

样本量计算建立在假设检验的统计学框架之上。理解这个框架是掌握样本量计算的关键。

2.1.1 零假设与备择假设

在A/B测试中,我们建立两个相互竞争的假设:

  • 零假设 (H₀):实验组和对照组没有显著差异
  • 备择假设 (H₁):实验组和对照组存在显著差异

样本量计算的目标是确保当备择假设为真时,我们有足够的能力检测到这种差异。

2.1.2 决策错误类型

错误类型 定义 概率表示 业务影响
第一类错误 (Type I Error) 错误拒绝真零假设 α (显著性水平) 假阳性,推出无效的改动
第二类错误 (Type II Error) 错误接受假零假设 β 假阴性,错过有价值的优化
统计功效 (Statistical Power) 正确拒绝假零假设 1-β 检测真实效应的能力
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats

def visualize_hypothesis_testing():
    """可视化假设检验中的错误类型"""
    # 设置参数
    mu_0 = 0  # 零假设的均值
    mu_1 = 0.5  # 备择假设的均值
    sigma = 1  # 标准差
    n = 50  # 样本量
    alpha = 0.05  # 显著性水平
    
    # 计算标准误
    se = sigma / np.sqrt(n)
    
    # 计算临界值 (双侧检验)
    z_critical = stats.norm.ppf(1 - alpha/2)
    critical_value = z_critical * se
    
    # 创建x轴数据
    x = np.linspace(-3, 3, 1000)
    
    # 计算两个分布的概率密度函数
    pdf_h0 = stats.norm.pdf(x, mu_0, se)
    pdf_h1 = stats.norm.pdf(x, mu_1, se)
    
    # 绘制图形
    plt.figure(figsize=(12, 8))
    
    # 绘制分布曲线
    plt.plot(x, pdf_h0, 'b-', linewidth=2, label='零假设分布 (H₀)')
    plt.plot(x, pdf_h1, 'r-', linewidth=2, label='备择假设分布 (H₁)')
    
    # 填充第一类错误区域 (α)
    x_alpha_left = np.linspace(-3, -critical_value, 100)
    x_alpha_right = np.linspace(critical_value, 3, 100)
    plt.fill_between(x_alpha_left, stats.norm.pdf(x_alpha_left, mu_0, se), 
                     alpha=0.3, color='blue', label=f'第一类错误 (α = {alpha})')
    plt.fill_between(x_alpha_right, stats.norm.pdf(x_alpha_right, mu_0, se), 
                     alpha=0.3, color='blue')
    
    # 填充第二类错误区域 (β)
    x_beta = np.linspace(-critical_value, critical_value, 100)
    plt.fill_between(x_beta, stats.norm.pdf(x_beta, mu_1, se), 
                     alpha=0.3, color='red', label='第二类错误 (β)')
    
    # 添加垂直线表示临界值
    plt.axvline(-critical_value, color='gray', linestyle='--', alpha=0.7)
    plt.axvline(critical_value, color='gray', linestyle='--', alpha=0.7, 
                label=f'临界值 (±{critical_value:.2f})')
    
    # 计算统计功效
    power = 1 - stats.norm.cdf(critical_value, mu_1, se) + stats.norm.cdf(-critical_value, mu_1, se)
    
    plt.title(f'假设检验错误类型可视化\n统计功效 (1-β) = {power:.3f}')
    plt.xlabel('效应大小')
    plt.ylabel('概率密度')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    return power

# 运行可视化
power = visualize_hypothesis_testing()
print(f"在当前参数下的统计功效: {power:.3f}")

2.2 效应大小 (Effect Size)

效应大小是样本量计算中最关键的业务输入参数,它代表了我们有兴趣检测的最小业务效应。

2.2.1 效应大小的类型

效应大小类型 适用场景 计算公式 解释
Cohen’s d 连续变量均值比较 ( d = \frac{\mu_1 - \mu_0}{\sigma} ) 标准化均值差异
Cohen’s h 比例比较 ( h = 2 \times \arcsin(\sqrt{p_1}) - 2 \times \arcsin(\sqrt{p_0}) ) 比例差异的度量
相对提升 业务指标 ( \frac{p_1 - p_0}{p_0} ) 业务场景中更直观
风险比 生存分析 ( \frac{\lambda_1}{\lambda_0} ) 事件发生率的比值

2.2.2 选择适当的效应大小

class EffectSizeCalculator:
    """效应大小计算器"""
    
    @staticmethod
    def cohens_d(mean1, mean2, std1, std2, n1, n2):
        """计算Cohen's d (标准化均值差异)"""
        # 合并标准差
        pooled_std = np.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2))
        return abs(mean1 - mean2) / pooled_std
    
    @staticmethod
    def cohens_h(p1, p2):
        """计算Cohen's h (比例差异的度量)"""
        from math import asin, sqrt
        return 2 * abs(asin(sqrt(p1)) - asin(sqrt(p2)))
    
    @staticmethod
    def relative_improvement(baseline, new_value):
        """计算相对提升"""
        return (new_value - baseline) / baseline
    
    @staticmethod
    def classify_effect_size(effect_size, effect_type='cohens_d'):
        """对效应大小进行分类"""
        if effect_type == 'cohens_d':
            if effect_size < 0.2:
                return "很小"
            elif effect_size < 0.5:
                return "小"
            elif effect_size < 0.8:
                return "中等"
            else:
                return "大"
        elif effect_type == 'cohens_h':
            if effect_size < 0.2:
                return "很小"
            elif effect_size < 0.5:
                return "小"
            elif effect_size < 0.8:
                return "中等"
            else:
                return "大"
    
    @staticmethod
    def recommend_mde(baseline_rate, business_context):
        """基于业务场景推荐最小可检测效应(MDE)"""
        recommendations = {
            'high_stakes': 0.05,  # 高风险决策,检测小效应
            'medium_impact': 0.10,  # 中等影响决策
            'exploratory': 0.15,  # 探索性测试
            'low_risk': 0.20  # 低风险优化
        }
        
        base_mde = recommendations.get(business_context, 0.10)
        
        # 根据基线率调整推荐
        if baseline_rate < 0.01:
            adjusted_mde = min(base_mde * 2, 0.50)  # 极低基线率,允许更大的MDE
        elif baseline_rate < 0.05:
            adjusted_mde = base_mde * 1.5
        elif baseline_rate > 0.50:
            adjusted_mde = base_mde * 0.8  # 高基线率,建议更严格的MDE
        else:
            adjusted_mde = base_mde
        
        return adjusted_mde

# 使用示例
calculator = EffectSizeCalculator()

# 计算不同的效应大小
print("效应大小计算示例:")
print(f"Cohen's d: {calculator.cohens_d(10, 12, 3, 3, 100, 100):.3f}")
print(f"Cohen's h: {calculator.cohens_h(0.10, 0.12):.3f}")
print(f"相对提升: {calculator.relative_improvement(0.10, 0.12):.1%}")

# 为不同业务场景推荐MDE
business_contexts = ['high_stakes', 'medium_impact', 'exploratory', 'low_risk']
baseline_rate = 0.08

print(f"\n基于基线转化率 {baseline_rate:.1%} 的MDE推荐:")
for context in business_contexts:
    recommended_mde = calculator.recommend_mde(baseline_rate, context)
    print(f"  {context}: {recommended_mde:.1%}")

2.3 统计分布与抽样变异性

理解抽样分布是掌握样本量计算的基础。中心极限定理告诉我们,无论总体分布如何,样本均值的分布都近似正态分布。

def demonstrate_sampling_distribution():
    """演示抽样分布与样本量的关系"""
    np.random.seed(42)
    
    # 创建一个偏态的总体分布(模拟网站停留时间)
    population = np.random.exponential(300, 10000)  # 均值300秒
    
    sample_sizes = [10, 30, 100, 500]
    n_samples = 1000
    
    plt.figure(figsize=(15, 10))
    
    # 绘制总体分布
    plt.subplot(2, 3, 1)
    plt.hist(population, bins=50, density=True, alpha=0.7)
    plt.title('总体分布 (指数分布)')
    plt.xlabel('停留时间 (秒)')
    plt.ylabel('密度')
    
    # 绘制不同样本量下的抽样分布
    for i, n in enumerate(sample_sizes, 2):
        sample_means = []
        for _ in range(n_samples):
            sample = np.random.choice(population, n, replace=False)
            sample_means.append(np.mean(sample))
        
        plt.subplot(2, 3, i)
        plt.hist(sample_means, bins=30, density=True, alpha=0.7)
        plt.title(f'样本量 n={n}\n均值={np.mean(sample_means):.1f}, 标准差={np.std(sample_means):.1f}')
        plt.xlabel('样本均值')
        plt.ylabel('密度')
        
        # 叠加正态分布曲线
        xmin, xmax = plt.xlim()
        x = np.linspace(xmin, xmax, 100)
        p = stats.norm.pdf(x, np.mean(sample_means), np.std(sample_means))
        plt.plot(x, p, 'r-', linewidth=2)
    
    plt.tight_layout()
    plt.show()

# 运行演示
demonstrate_sampling_distribution()
Lexical error on line 2. Unrecognized text. ...[假设检验框架] --> B[零假设 H₀] A --> C[备择假设 -----------------------^

III. 样本量计算公式推导

3.1 比例检验的样本量计算

比例检验是A/B测试中最常见的场景,比如转化率、点击率等二元指标的比较。

3.1.1 公式推导

对于两个独立样本的比例检验,样本量计算公式为:

[ n = \frac{(z_{1-\alpha/2} \sqrt{2p(1-p)} + z_{1-\beta} \sqrt{p_1(1-p_1) + p_0(1-p_0)})^2}{(p_1 - p_0)^2} ]

其中:

  • ( p_0 ) = 基线比例(对照组)
  • ( p_1 ) = 期望比例(实验组)
  • ( p = \frac{p_0 + p_1}{2} ) = 合并比例
  • ( z_{1-\alpha/2} ) = 标准正态分布的(1-α/2)分位数
  • ( z_{1-\beta} ) = 标准正态分布的(1-β)分位数
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt

class SampleSizeCalculator:
    """样本量计算器基类"""
    
    def __init__(self, alpha=0.05, power=0.8):
        self.alpha = alpha
        self.power = power
        self.z_alpha = stats.norm.ppf(1 - alpha/2)
        self.z_beta = stats.norm.ppf(power)
    
    def calculate_power(self, n, effect_size):
        """计算给定样本量下的统计功效"""
        raise NotImplementedError("子类必须实现此方法")

class ProportionSampleSize(SampleSizeCalculator):
    """比例检验的样本量计算"""
    
    def calculate(self, p0, p1, ratio=1.0):
        """
        计算比例检验所需的样本量
        
        Parameters:
        p0: 对照组比例
        p1: 实验组比例  
        ratio: 实验组与对照组的样本量比例
        """
        # 计算合并比例
        p_pool = (p0 + ratio * p1) / (1 + ratio)
        
        # 计算标准误
        se_pool = np.sqrt(p_pool * (1 - p_pool) * (1 + 1/ratio))
        se_alt = np.sqrt(p0*(1-p0) + p1*(1-p1)/ratio)
        
        effect_size = abs(p1 - p0)
        
        numerator = self.z_alpha * se_pool + self.z_beta * se_alt
        n = (numerator / effect_size) ** 2
        
        return int(np.ceil(n))
    
    def calculate_power(self, n, p0, p1, ratio=1.0):
        """计算给定样本量下的统计功效"""
        # 计算合并比例
        p_pool = (p0 + ratio * p1) / (1 + ratio)
        
        # 计算标准误
        se_pool = np.sqrt(p_pool * (1 - p_pool) * (1 + 1/ratio))
        se_alt = np.sqrt(p0*(1-p0) + p1*(1-p1)/ratio)
        
        effect_size = abs(p1 - p0)
        
        # 计算z统计量
        z_stat = (effect_size - self.z_alpha * se_pool) / se_alt
        
        # 计算统计功效
        power = stats.norm.cdf(z_stat)
        
        return power
    
    def sensitivity_analysis(self, p0, n_per_group, power_threshold=0.8):
        """敏感性分析:检测给定样本量能检测到的最小效应"""
        p1_values = np.linspace(p0, p0 * 2, 1000)
        detectable_effects = []
        
        for p1 in p1_values:
            if p1 <= p0:
                continue
            power = self.calculate_power(n_per_group, p0, p1)
            if power >= power_threshold:
                relative_effect = (p1 - p0) / p0
                detectable_effects.append(relative_effect)
        
        if detectable_effects:
            min_detectable = min(detectable_effects)
            return min_detectable
        else:
            return None

# 使用示例
calculator = ProportionSampleSize(alpha=0.05, power=0.8)

# 基本计算
p0 = 0.10  # 基线转化率 10%
p1 = 0.12  # 期望检测的转化率 12% (20%相对提升)

sample_size = calculator.calculate(p0, p1)
print(f"比例检验样本量计算:")
print(f"基线转化率: {p0:.1%}")
print(f"目标转化率: {p1:.1%}")
print(f"相对提升: {(p1-p0)/p0:.1%}")
print(f"每组所需样本量: {sample_size}")
print(f"总样本量: {sample_size * 2}")

# 计算统计功效
power = calculator.calculate_power(sample_size, p0, p1)
print(f"预计统计功效: {power:.3f}")

# 敏感性分析
min_detectable = calculator.sensitivity_analysis(p0, 1000)
print(f"在1000样本量下可检测的最小相对效应: {min_detectable:.1%}")

3.2 均值检验的样本量计算

对于连续变量的比较,如平均订单价值、用户停留时间等,我们需要使用均值检验的样本量公式。

3.2.1 公式推导

两个独立样本均值检验的样本量公式:

[ n = \frac{(z_{1-\alpha/2} + z_{1-\beta})^2 \times 2\sigma^2}{(\mu_1 - \mu_0)^2} ]

其中:

  • ( \mu_0 ) = 对照组均值
  • ( \mu_1 ) = 实验组均值
  • ( \sigma ) = 总体标准差(假设两组方差相等)
class MeanSampleSize(SampleSizeCalculator):
    """均值检验的样本量计算"""
    
    def calculate(self, mu0, mu1, sigma, ratio=1.0):
        """
        计算均值检验所需的样本量
        
        Parameters:
        mu0: 对照组均值
        mu1: 实验组均值
        sigma: 总体标准差
        ratio: 实验组与对照组的样本量比例
        """
        effect_size = abs(mu1 - mu0)
        
        numerator = (self.z_alpha + self.z_beta) ** 2
        denominator = (effect_size / sigma) ** 2
        
        n = numerator / denominator * (1 + 1/ratio)
        
        return int(np.ceil(n))
    
    def calculate_power(self, n, mu0, mu1, sigma, ratio=1.0):
        """计算给定样本量下的统计功效"""
        effect_size = abs(mu1 - mu0)
        
        # 计算非中心化参数
        noncentrality = effect_size / (sigma * np.sqrt((1 + 1/ratio) / n))
        
        # 计算临界值
        critical_value = stats.t.ppf(1 - self.alpha/2, n * (1 + ratio) - 2)
        
        # 计算统计功效
        power = 1 - stats.t.cdf(critical_value, n * (1 + ratio) - 2, noncentrality)
        
        return power
    
    def calculate_using_cohens_d(self, cohens_d, ratio=1.0):
        """使用Cohen's d计算样本量"""
        numerator = (self.z_alpha + self.z_beta) ** 2
        n = numerator / (cohens_d ** 2) * (1 + 1/ratio)
        
        return int(np.ceil(n))

# 均值检验示例
mean_calculator = MeanSampleSize(alpha=0.05, power=0.8)

# 场景:平均订单价值测试
mu0 = 100  # 当前平均订单价值 100元
mu1 = 110  # 期望检测的平均订单价值 110元
sigma = 30  # 历史数据的标准差

sample_size = mean_calculator.calculate(mu0, mu1, sigma)
print(f"\n均值检验样本量计算:")
print(f"当前均值: {mu0}")
print(f"目标均值: {mu1}")
print(f"标准差: {sigma}")
print(f"Cohen's d: {abs(mu1-mu0)/sigma:.3f}")
print(f"每组所需样本量: {sample_size}")

# 使用Cohen's d计算
cohens_d = 0.3  # 中等效应大小
sample_size_d = mean_calculator.calculate_using_cohens_d(cohens_d)
print(f"对于Cohen's d = {cohens_d},每组所需样本量: {sample_size_d}")

3.3 方差分析(ANOVA)的样本量计算

当比较多个组时(如A/B/C测试),我们需要使用ANOVA的样本量计算方法。

class AnovaSampleSize:
    """ANOVA样本量计算"""
    
    def __init__(self, alpha=0.05, power=0.8):
        self.alpha = alpha
        self.power = power
    
    def calculate(self, k, effect_size, groups_equal=True):
        """
        计算单因素ANOVA所需的样本量
        
        Parameters:
        k: 组数
        effect_size: Cohen's f 效应大小
        groups_equal: 是否各组样本量相等
        """
        from statsmodels.stats.power import FTestAnovaPower
        
        power_analysis = FTestAnovaPower()
        
        if groups_equal:
            # 各组样本量相等
            n_per_group = power_analysis.solve_power(
                effect_size=effect_size,
                nobs=None,
                alpha=self.alpha,
                power=self.power,
                k_groups=k
            )
            return int(np.ceil(n_per_group))
        else:
            # 这里简化处理,实际应用中需要更复杂的计算
            n_per_group = power_analysis.solve_power(
                effect_size=effect_size,
                nobs=None, 
                alpha=self.alpha,
                power=self.power,
                k_groups=k
            )
            return int(np.ceil(n_per_group))
    
    def calculate_effect_size_f(self, means, sigma):
        """计算Cohen's f效应大小"""
        grand_mean = np.mean(means)
        between_variance = np.var(means)
        f = np.sqrt(between_variance) / sigma
        return f

# ANOVA示例
anova_calculator = AnovaSampleSize(alpha=0.05, power=0.8)

# 场景:三个不同定价策略的测试
k = 3  # 三个组
effect_size_f = 0.25  # 中等效应大小

n_per_group = anova_calculator.calculate(k, effect_size_f)
print(f"\nANOVA样本量计算:")
print(f"组数: {k}")
print(f"Cohen's f: {effect_size_f}")
print(f"每组所需样本量: {n_per_group}")
print(f"总样本量: {n_per_group * k}")

# 从均值计算效应大小
means = [100, 110, 105]  # 三组的均值
sigma = 15  # 组内标准差

effect_size_calculated = anova_calculator.calculate_effect_size_f(means, sigma)
print(f"基于均值和标准差计算的Cohen's f: {effect_size_calculated:.3f}")

3.4 公式可视化与比较

def visualize_sample_size_relationships():
    """可视化样本量与各种参数的关系"""
    
    # 创建图形
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    
    # 1. 样本量 vs 效应大小
    effect_sizes = np.linspace(0.05, 0.3, 50)
    baseline = 0.10
    sample_sizes_effect = []
    
    calculator = ProportionSampleSize()
    for es in effect_sizes:
        p1 = baseline * (1 + es)
        n = calculator.calculate(baseline, p1)
        sample_sizes_effect.append(n)
    
    axes[0, 0].plot(effect_sizes, sample_sizes_effect, 'b-', linewidth=2)
    axes[0, 0].set_xlabel('相对效应大小')
    axes[0, 0].set_ylabel('所需样本量 (每组)')
    axes[0, 0].set_title('样本量 vs 效应大小')
    axes[0, 0].grid(True, alpha=0.3)
    
    # 2. 样本量 vs 统计功效
    powers = np.linspace(0.5, 0.95, 50)
    sample_sizes_power = []
    
    for power in powers:
        calculator_power = ProportionSampleSize(power=power)
        n = calculator_power.calculate(0.10, 0.12)
        sample_sizes_power.append(n)
    
    axes[0, 1].plot(powers, sample_sizes_power, 'r-', linewidth=2)
    axes[0, 1].set_xlabel('统计功效')
    axes[0, 1].set_ylabel('所需样本量 (每组)')
    axes[0, 1].set_title('样本量 vs 统计功效')
    axes[0, 1].grid(True, alpha=0.3)
    
    # 3. 样本量 vs 显著性水平
    alphas = np.linspace(0.01, 0.1, 50)
    sample_sizes_alpha = []
    
    for alpha in alphas:
        calculator_alpha = ProportionSampleSize(alpha=alpha)
        n = calculator_alpha.calculate(0.10, 0.12)
        sample_sizes_alpha.append(n)
    
    axes[1, 0].plot(alphas, sample_sizes_alpha, 'g-', linewidth=2)
    axes[1, 0].set_xlabel('显著性水平 (α)')
    axes[1, 0].set_ylabel('所需样本量 (每组)')
    axes[1, 0].set_title('样本量 vs 显著性水平')
    axes[1, 0].grid(True, alpha=0.3)
    
    # 4. 样本量 vs 基线转化率
    baselines = np.linspace(0.01, 0.5, 50)
    sample_sizes_baseline = []
    
    for baseline in baselines:
        n = calculator.calculate(baseline, baseline * 1.2)  # 20%提升
        sample_sizes_baseline.append(n)
    
    axes[1, 1].plot(baselines, sample_sizes_baseline, 'purple', linewidth=2)
    axes[1, 1].set_xlabel('基线转化率')
    axes[1, 1].set_ylabel('所需样本量 (每组)')
    axes[1, 1].set_title('样本量 vs 基线转化率 (20%提升)')
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# 运行可视化
visualize_sample_size_relationships()
Lexical error on line 14. Unrecognized text. ... B2 --> B21[基线比例 p₀] B2 --> B22[目标 -----------------------^

IV. 实际应用与代码实现

4.1 完整的样本量计算系统

在实际业务中,我们需要一个健壮的样本量计算系统来处理各种复杂场景。

import pandas as pd
from datetime import datetime, timedelta

class AdvancedSampleSizeCalculator:
    """高级样本量计算系统"""
    
    def __init__(self, alpha=0.05, power=0.8):
        self.alpha = alpha
        self.power = power
        self.proportion_calc = ProportionSampleSize(alpha, power)
        self.mean_calc = MeanSampleSize(alpha, power)
        self.anova_calc = AnovaSampleSize(alpha, power)
        
        # 缓存计算结果
        self.calculation_history = []
    
    def calculate_for_proportion(self, baseline_rate, mde, ratio=1.0, 
                               test_type='two_sided', adjustment=None):
        """比例检验的样本量计算"""
        
        p1 = baseline_rate * (1 + mde)
        
        # 基本计算
        sample_size = self.proportion_calc.calculate(baseline_rate, p1, ratio)
        
        # 应用调整
        if adjustment == 'continuity':
            # 连续性校正
            sample_size = int(sample_size * 1.05)
        elif adjustment == 'multiple_comparisons':
            # 多重比较校正 (Bonferroni)
            sample_size = int(sample_size * 1.1)
        
        # 记录计算历史
        calculation_record = {
            'timestamp': datetime.now(),
            'test_type': 'proportion',
            'baseline': baseline_rate,
            'mde': mde,
            'sample_size': sample_size,
            'parameters': {
                'alpha': self.alpha,
                'power': self.power,
                'ratio': ratio
            }
        }
        self.calculation_history.append(calculation_record)
        
        return sample_size
    
    def calculate_for_means(self, baseline_mean, mde, std, ratio=1.0):
        """均值检验的样本量计算"""
        target_mean = baseline_mean * (1 + mde)
        sample_size = self.mean_calc.calculate(baseline_mean, target_mean, std, ratio)
        
        calculation_record = {
            'timestamp': datetime.now(),
            'test_type': 'means',
            'baseline': baseline_mean,
            'mde': mde,
            'std': std,
            'sample_size': sample_size,
            'parameters': {
                'alpha': self.alpha,
                'power': self.power,
                'ratio': ratio
            }
        }
        self.calculation_history.append(calculation_record)
        
        return sample_size
    
    def calculate_experiment_duration(self, sample_size_per_variant, daily_traffic, 
                                    variants=2, traffic_allocation=1.0):
        """计算实验所需时长"""
        total_sample_size = sample_size_per_variant * variants
        available_daily_traffic = daily_traffic * traffic_allocation
        
        if available_daily_traffic == 0:
            raise ValueError("日流量不能为0")
        
        duration_days = total_sample_size / available_daily_traffic
        return max(1, int(np.ceil(duration_days)))
    
    def power_analysis(self, test_type, **kwargs):
        """功效分析:计算不同样本量下的统计功效"""
        sample_sizes = range(100, 10000, 100)
        powers = []
        
        if test_type == 'proportion':
            p0 = kwargs['baseline_rate']
            p1 = p0 * (1 + kwargs['mde'])
            
            for n in sample_sizes:
                power = self.proportion_calc.calculate_power(n, p0, p1)
                powers.append(power)
        
        elif test_type == 'means':
            mu0 = kwargs['baseline_mean']
            mu1 = mu0 * (1 + kwargs['mde'])
            sigma = kwargs['std']
            
            for n in sample_sizes:
                power = self.mean_calc.calculate_power(n, mu0, mu1, sigma)
                powers.append(power)
        
        return list(sample_sizes), powers
    
    def generate_report(self, test_type, **kwargs):
        """生成详细的样本量分析报告"""
        if test_type == 'proportion':
            baseline = kwargs['baseline_rate']
            mde = kwargs['mde']
            sample_size = self.calculate_for_proportion(baseline, mde)
            duration = self.calculate_experiment_duration(sample_size, kwargs.get('daily_traffic', 1000))
            
            # 功效分析
            sample_sizes, powers = self.power_analysis('proportion', 
                                                     baseline_rate=baseline, mde=mde)
            
        elif test_type == 'means':
            baseline = kwargs['baseline_mean']
            mde = kwargs['mde']
            std = kwargs['std']
            sample_size = self.calculate_for_means(baseline, mde, std)
            duration = self.calculate_experiment_duration(sample_size, kwargs.get('daily_traffic', 1000))
            
            # 功效分析
            sample_sizes, powers = self.power_analysis('means',
                                                     baseline_mean=baseline, mde=mde, std=std)
        
        # 生成报告
        report = {
            'test_type': test_type,
            'parameters': kwargs,
            'sample_size_per_variant': sample_size,
            'total_sample_size': sample_size * 2,
            'estimated_duration_days': duration,
            'power_analysis': {
                'sample_sizes': sample_sizes,
                'powers': powers
            }
        }
        
        return report

# 使用高级计算系统
advanced_calc = AdvancedSampleSizeCalculator(alpha=0.05, power=0.8)

# 比例检验示例
prop_report = advanced_calc.generate_report(
    'proportion',
    baseline_rate=0.08,
    mde=0.15,  # 15%相对提升
    daily_traffic=2000
)

print("比例检验样本量报告:")
print(f"基线转化率: {prop_report['parameters']['baseline_rate']:.1%}")
print(f"目标相对提升: {prop_report['parameters']['mde']:.1%}")
print(f"每组样本量: {prop_report['sample_size_per_variant']}")
print(f"总样本量: {prop_report['total_sample_size']}")
print(f"预计实验时长: {prop_report['estimated_duration_days']} 天")

# 均值检验示例
mean_report = advanced_calc.generate_report(
    'means',
    baseline_mean=100,
    mde=0.10,  # 10%提升
    std=25,
    daily_traffic=1500
)

print("\n均值检验样本量报告:")
print(f"基线均值: {mean_report['parameters']['baseline_mean']}")
print(f"目标相对提升: {mean_report['parameters']['mde']:.1%}")
print(f"标准差: {mean_report['parameters']['std']}")
print(f"每组样本量: {mean_report['sample_size_per_variant']}")
print(f"总样本量: {mean_report['total_sample_size']}")
print(f"预计实验时长: {mean_report['estimated_duration_days']} 天")

4.2 交互式样本量计算工具

创建一个用户友好的交互式工具,方便业务团队使用。

import ipywidgets as widgets
from IPython.display import display, HTML
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class InteractiveSampleSizeCalculator:
    """交互式样本量计算工具"""
    
    def __init__(self):
        self.calculator = AdvancedSampleSizeCalculator()
        
        # 创建控件
        self.test_type = widgets.Dropdown(
            options=['比例检验', '均值检验'],
            value='比例检验',
            description='测试类型:'
        )
        
        self.baseline = widgets.FloatSlider(
            value=0.10,
            min=0.01,
            max=0.50,
            step=0.01,
            description='基线值:',
            readout_format='.1%' if self.test_type.value == '比例检验' else '.1f'
        )
        
        self.mde = widgets.FloatSlider(
            value=0.10,
            min=0.05,
            max=0.50,
            step=0.05,
            description='MDE:',
            readout_format='.1%'
        )
        
        self.alpha = widgets.FloatSlider(
            value=0.05,
            min=0.01,
            max=0.10,
            step=0.01,
            description='α:',
            readout_format='.3f'
        )
        
        self.power = widgets.FloatSlider(
            value=0.80,
            min=0.50,
            max=0.95,
            step=0.05,
            description='功效:',
            readout_format='.2f'
        )
        
        self.daily_traffic = widgets.IntSlider(
            value=1000,
            min=100,
            max=10000,
            step=100,
            description='日流量:'
        )
        
        # 均值检验特有控件
        self.std = widgets.FloatSlider(
            value=25,
            min=1,
            max=100,
            step=5,
            description='标准差:'
        )
        
        # 输出控件
        self.output = widgets.Output()
        
        # 绑定事件
        self.test_type.observe(self.on_test_type_change, names='value')
        self.alpha.observe(self.on_parameter_change, names='value')
        self.power.observe(self.on_parameter_change, names='value')
        self.baseline.observe(self.on_parameter_change, names='value')
        self.mde.observe(self.on_parameter_change, names='value')
        self.daily_traffic.observe(self.on_parameter_change, names='value')
        self.std.observe(self.on_parameter_change, names='value')
    
    def on_test_type_change(self, change):
        """测试类型改变时的回调"""
        if change['new'] == '比例检验':
            self.baseline.max = 0.50
            self.baseline.readout_format = '.1%'
            self.baseline.value = 0.10
        else:
            self.baseline.max = 200
            self.baseline.readout_format = '.1f'
            self.baseline.value = 100
        
        self.update_calculation()
    
    def on_parameter_change(self, change):
        """参数改变时的回调"""
        self.update_calculation()
    
    def update_calculation(self):
        """更新计算结果显示"""
        with self.output:
            self.output.clear_output()
            
            # 更新计算器参数
            self.calculator.alpha = self.alpha.value
            self.calculator.power = self.power.value
            
            if self.test_type.value == '比例检验':
                sample_size = self.calculator.calculate_for_proportion(
                    self.baseline.value, self.mde.value
                )
                duration = self.calculator.calculate_experiment_duration(
                    sample_size, self.daily_traffic.value
                )
                
                # 显示结果
                display(HTML(f"""
                <div style="background-color:#f0f8ff; padding:15px; border-radius:5px; margin:10px 0;">
                    <h3>📊 样本量计算结果</h3>
                    <p><b>测试类型:</b> 比例检验</p>
                    <p><b>基线转化率:</b> {self.baseline.value:.1%}</p>
                    <p><b>最小可检测效应(MDE):</b> {self.mde.value:.1%}</p>
                    <p><b>显著性水平(α):</b> {self.alpha.value}</p>
                    <p><b>统计功效:</b> {self.power.value:.0%}</p>
                    <hr>
                    <p><b>每组所需样本量:</b> <span style="color:blue; font-size:1.2em">{sample_size:,}</span></p>
                    <p><b>总样本量:</b> <span style="color:green; font-size:1.2em">{sample_size * 2:,}</span></p>
                    <p><b>预计实验时长:</b> <span style="color:orange; font-size:1.2em">{duration} 天</span></p>
                </div>
                """))
                
                # 绘制功效分析图
                self.plot_power_analysis('proportion')
                
            else:
                sample_size = self.calculator.calculate_for_means(
                    self.baseline.value, self.mde.value, self.std.value
                )
                duration = self.calculator.calculate_experiment_duration(
                    sample_size, self.daily_traffic.value
                )
                
                # 显示结果
                display(HTML(f"""
                <div style="background-color:#f0f8ff; padding:15px; border-radius:5px; margin:10px 0;">
                    <h3>📊 样本量计算结果</h3>
                    <p><b>测试类型:</b> 均值检验</p>
                    <p><b>基线均值:</b> {self.baseline.value:.1f}</p>
                    <p><b>最小可检测效应(MDE):</b> {self.mde.value:.1%}</p>
                    <p><b>标准差:</b> {self.std.value:.1f}</p>
                    <p><b>显著性水平(α):</b> {self.alpha.value}</p>
                    <p><b>统计功效:</b> {self.power.value:.0%}</p>
                    <hr>
                    <p><b>每组所需样本量:</b> <span style="color:blue; font-size:1.2em">{sample_size:,}</span></p>
                    <p><b>总样本量:</b> <span style="color:green; font-size:1.2em">{sample_size * 2:,}</span></p>
                    <p><b>预计实验时长:</b> <span style="color:orange; font-size:1.2em">{duration} 天</span></p>
                </div>
                """))
                
                # 绘制功效分析图
                self.plot_power_analysis('means')
    
    def plot_power_analysis(self, test_type):
        """绘制功效分析图"""
        if test_type == 'proportion':
            sample_sizes, powers = self.calculator.power_analysis(
                'proportion',
                baseline_rate=self.baseline.value,
                mde=self.mde.value
            )
        else:
            sample_sizes, powers = self.calculator.power_analysis(
                'means',
                baseline_mean=self.baseline.value,
                mde=self.mde.value,
                std=self.std.value
            )
        
        # 使用plotly创建交互式图表
        fig = go.Figure()
        
        fig.add_trace(go.Scatter(
            x=sample_sizes,
            y=powers,
            mode='lines',
            name='统计功效',
            line=dict(color='blue', width=3)
        ))
        
        # 添加目标功效线
        fig.add_hline(y=self.power.value, line_dash="dash", 
                     line_color="red", annotation_text=f"目标功效 ({self.power.value:.0%})")
        
        # 计算当前设置下的样本量
        if test_type == 'proportion':
            current_sample_size = self.calculator.calculate_for_proportion(
                self.baseline.value, self.mde.value
            )
        else:
            current_sample_size = self.calculator.calculate_for_means(
                self.baseline.value, self.mde.value, self.std.value
            )
        
        # 添加当前样本量标记
        current_power = None
        for size, power in zip(sample_sizes, powers):
            if size >= current_sample_size:
                current_power = power
                break
        
        if current_power:
            fig.add_trace(go.Scatter(
                x=[current_sample_size],
                y=[current_power],
                mode='markers',
                marker=dict(color='red', size=10),
                name=f'当前设置 (n={current_sample_size})'
            ))
        
        fig.update_layout(
            title='统计功效 vs 样本量',
            xaxis_title='样本量 (每组)',
            yaxis_title='统计功效',
            showlegend=True,
            height=400
        )
        
        fig.show()
    
    def display(self):
        """显示交互式界面"""
        # 根据测试类型显示不同的控件
        if self.test_type.value == '比例检验':
            controls = widgets.VBox([
                self.test_type,
                self.baseline,
                self.mde,
                self.alpha,
                self.power,
                self.daily_traffic
            ])
        else:
            controls = widgets.VBox([
                self.test_type,
                self.baseline,
                self.mde,
                self.std,
                self.alpha,
                self.power,
                self.daily_traffic
            ])
        
        # 显示界面
        display(widgets.HBox([controls, self.output]))
        
        # 初始计算
        self.update_calculation()

# 使用交互式工具
# interactive_calc = InteractiveSampleSizeCalculator()
# interactive_calc.display()

4.3 实际业务场景应用

让我们通过几个真实的业务场景来演示样本量计算的应用。

def business_scenario_analysis():
    """业务场景分析"""
    
    calculator = AdvancedSampleSizeCalculator()
    
    scenarios = [
        {
            'name': '电商注册流程优化',
            'type': 'proportion',
            'baseline': 0.08,
            'mde': 0.10,
            'daily_traffic': 5000,
            'business_impact': '高',
            'description': '优化注册表单,期望提升10%的注册转化率'
        },
        {
            'name': '内容推荐算法改进',
            'type': 'proportion', 
            'baseline': 0.15,
            'mde': 0.05,
            'daily_traffic': 10000,
            'business_impact': '中',
            'description': '改进推荐算法,期望提升5%的点击率'
        },
        {
            'name': '定价策略测试',
            'type': 'means',
            'baseline': 120,
            'mde': 0.08,
            'std': 40,
            'daily_traffic': 2000,
            'business_impact': '高',
            'description': '测试新的定价策略,期望提升8%的平均订单价值'
        },
        {
            'name': '页面加载速度优化',
            'type': 'means',
            'baseline': 3.5,
            'mde': 0.15,
            'std': 1.2,
            'daily_traffic': 8000,
            'business_impact': '中',
            'description': '优化页面加载速度,期望减少15%的加载时间'
        }
    ]
    
    results = []
    
    for scenario in scenarios:
        if scenario['type'] == 'proportion':
            sample_size = calculator.calculate_for_proportion(
                scenario['baseline'], scenario['mde']
            )
        else:
            sample_size = calculator.calculate_for_means(
                scenario['baseline'], scenario['mde'], scenario['std']
            )
        
        duration = calculator.calculate_experiment_duration(
            sample_size, scenario['daily_traffic']
        )
        
        results.append({
            '场景': scenario['name'],
            '描述': scenario['description'],
            '业务影响': scenario['business_impact'],
            '每组样本量': f"{sample_size:,}",
            '总样本量': f"{sample_size * 2:,}",
            '预计时长(天)': duration,
            '日流量': f"{scenario['daily_traffic']:,}"
        })
    
    # 创建结果表格
    df_results = pd.DataFrame(results)
    
    # 样式设置
    def highlight_high_impact(row):
        if row['业务影响'] == '高':
            return ['background-color: #ffcccc'] * len(row)
        else:
            return [''] * len(row)
    
    styled_df = df_results.style.apply(highlight_high_impact, axis=1)
    
    return styled_df

# 运行业务场景分析
styled_results = business_scenario_analysis()
print("业务场景样本量分析")
styled_results
实际应用场景
样本量计算系统
交互式工具
业务场景分析
比例检验
均值检验
方差分析
功效分析
参数调节
实时计算
可视化展示
电商场景
内容推荐
定价测试
性能优化
转化率优化
收入指标测试
多版本比较
敏感性分析
基线值调节
MDE设置
显著性水平
统计功效
注册流程
点击率提升
平均订单价值
用户体验指标
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。