多变量测试实战:从实验设计到结果分析
【摘要】 I. 多变量测试基础概念 1.1 什么是多变量测试多变量测试是一种同时测试多个独立变量(因素)对某个或多个依赖变量(指标)影响的实验方法。与A/B测试只比较单个变量的两个版本不同,MVT允许我们研究多个变量的交互作用。 1.1.1 MVT与A/B测试的关键区别特性A/B测试多变量测试测试变量数量单个变量,两个版本多个变量,每个变量多个水平实验复杂度简单复杂所需样本量相对较少显著更多检测能力...
I. 多变量测试基础概念
1.1 什么是多变量测试
多变量测试是一种同时测试多个独立变量(因素)对某个或多个依赖变量(指标)影响的实验方法。与A/B测试只比较单个变量的两个版本不同,MVT允许我们研究多个变量的交互作用。
1.1.1 MVT与A/B测试的关键区别
特性 | A/B测试 | 多变量测试 |
---|---|---|
测试变量数量 | 单个变量,两个版本 | 多个变量,每个变量多个水平 |
实验复杂度 | 简单 | 复杂 |
所需样本量 | 相对较少 | 显著更多 |
检测能力 | 只能检测主效应 | 能检测主效应和交互效应 |
适用场景 | 单一元素优化 | 整体体验优化 |
1.2 多变量测试的核心价值
多变量测试的主要价值在于能够发现因素之间的交互作用——即某些因素组合在一起时产生的效果会大于或小于各自独立效果之和。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from itertools import product
class MVTFoundation:
"""多变量测试基础概念演示"""
def __init__(self):
self.factors = {}
self.interaction_effects = {}
def demonstrate_interaction_effect(self):
"""演示交互效应的概念"""
# 创建模拟数据
np.random.seed(42)
# 定义两个因素:按钮颜色和文案
button_colors = ['蓝色', '红色']
button_texts = ['立即购买', '加入购物车']
# 基础转化率
base_conversion = 0.10
# 主效应
color_effect = {'蓝色': 0, '红色': 0.02} # 红色提升2%
text_effect = {'立即购买': 0, '加入购物车': 0.01} # 加入购物车提升1%
# 交互效应:红色按钮+加入购物车有额外提升
interaction_effect = 0.03
results = []
for color in button_colors:
for text in button_texts:
# 计算期望转化率
expected_rate = (base_conversion +
color_effect[color] +
text_effect[text])
# 添加交互效应
if color == '红色' and text == '加入购物车':
expected_rate += interaction_effect
# 模拟数据(每组1000个用户)
n_users = 1000
conversions = np.random.binomial(n_users, expected_rate)
conversion_rate = conversions / n_users
results.append({
'按钮颜色': color,
'按钮文案': text,
'期望转化率': expected_rate,
'实际转化率': conversion_rate,
' conversions': conversions,
'visitors': n_users
})
df_results = pd.DataFrame(results)
# 可视化交互效应
self.plot_interaction_effects(df_results)
return df_results
def plot_interaction_effects(self, df):
"""绘制交互效应图"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 主效应图
color_means = df.groupby('按钮颜色')['实际转化率'].mean()
text_means = df.groupby('按钮文案')['实际转化率'].mean()
ax1.bar(color_means.index, color_means.values, alpha=0.7, label='按钮颜色')
ax1.set_xlabel('按钮颜色')
ax1.set_ylabel('平均转化率')
ax1.set_title('按钮颜色的主效应')
# 交互效应图
pivot_data = df.pivot(index='按钮颜色', columns='按钮文案', values='实际转化率')
x = np.arange(len(pivot_data.index))
width = 0.35
ax2.bar(x - width/2, pivot_data.iloc[:, 0], width, label=pivot_data.columns[0])
ax2.bar(x + width/2, pivot_data.iloc[:, 1], width, label=pivot_data.columns[1])
ax2.set_xlabel('按钮颜色')
ax2.set_ylabel('转化率')
ax2.set_title('按钮颜色与文案的交互效应')
ax2.set_xticks(x)
ax2.set_xticklabels(pivot_data.index)
ax2.legend()
plt.tight_layout()
plt.show()
print("交互效应分析:")
for _, row in df.iterrows():
print(f"{row['按钮颜色']} + {row['按钮文案']}: {row['实际转化率']:.3f}")
# 检测交互效应
blue_buy = df[(df['按钮颜色'] == '蓝色') & (df['按钮文案'] == '立即购买')]['实际转化率'].iloc[0]
blue_cart = df[(df['按钮颜色'] == '蓝色') & (df['按钮文案'] == '加入购物车')]['实际转化率'].iloc[0]
red_buy = df[(df['按钮颜色'] == '红色') & (df['按钮文案'] == '立即购买')]['实际转化率'].iloc[0]
red_cart = df[(df['按钮颜色'] == '红色') & (df['按钮文案'] == '加入购物车')]['实际转化率'].iloc[0]
interaction = (red_cart - blue_cart) - (red_buy - blue_buy)
print(f"\n交互效应大小: {interaction:.4f}")
return interaction
# 演示基础概念
foundation = MVTFoundation()
interaction_results = foundation.demonstrate_interaction_effect()
1.3 多变量测试的适用场景
多变量测试特别适合以下场景:
场景类型 | 具体案例 | 优势 |
---|---|---|
页面整体优化 | 登录页、产品详情页、购物车页 | 同时优化多个元素,发现最佳组合 |
新功能发布 | 复杂功能的多配置测试 | 快速找到最优配置方案 |
用户体验研究 | 理解不同设计元素的相互影响 | 发现意外的交互效应 |
营销活动优化 | 多渠道、多创意的活动测试 | 最大化活动效果 |
II. 实验设计方法论
2.1 全因子设计
全因子设计是多变量测试中最直接的方法,测试所有可能的因素组合。
2.1.1 全因子设计的实现
class FullFactorialDesign:
"""全因子实验设计"""
def __init__(self, factors):
"""
初始化全因子设计
Parameters:
factors: dict, 因素和水平的字典
例如: {'颜色': ['红', '蓝'], '大小': ['大', '小']}
"""
self.factors = factors
self.factor_names = list(factors.keys())
self.levels = {name: levels for name, levels in factors.items()}
# 生成所有组合
self.combinations = list(product(*factors.values()))
self.n_combinations = len(self.combinations)
# 为每个组合生成名称
self.combination_names = [
"_".join(f"{name}_{level}" for name, level in zip(factors.keys(), combo))
for combo in self.combinations
]
print(f"实验设计: {len(self.factor_names)}个因素, {self.n_combinations}种组合")
def generate_design_matrix(self):
"""生成设计矩阵"""
design_data = []
for i, combo in enumerate(self.combinations):
combination_dict = {name: level for name, level in zip(self.factor_names, combo)}
combination_dict['combination_id'] = f"comb_{i+1}"
combination_dict['combination_name'] = self.combination_names[i]
design_data.append(combination_dict)
design_df = pd.DataFrame(design_data)
return design_df
def calculate_required_sample_size(self, baseline_rate, detectable_effect,
alpha=0.05, power=0.8):
"""计算所需样本量"""
from statsmodels.stats.power import FTestAnovaPower
import math
# 对于多变量测试,我们需要考虑多重比较
n_comparisons = self.n_combinations - 1
adjusted_alpha = alpha / n_comparisons # Bonferroni校正
# 计算效应大小 (Cohen's f)
# 这里简化计算,实际应该基于可检测的效应
effect_size = detectable_effect / baseline_rate
power_analysis = FTestAnovaPower()
# 计算每组所需样本量
n_per_group = power_analysis.solve_power(
effect_size=effect_size,
nobs=None,
alpha=adjusted_alpha,
power=power,
k_groups=self.n_combinations
)
total_sample_size = n_per_group * self.n_combinations
return {
'n_per_group': int(math.ceil(n_per_group)),
'total_sample_size': int(math.ceil(total_sample_size)),
'adjusted_alpha': adjusted_alpha,
'effect_size': effect_size
}
def assign_users_to_combinations(self, user_ids, weights=None):
"""将用户分配到不同的组合"""
if weights is None:
# 默认均匀分配
weights = [1/self.n_combinations] * self.n_combinations
# 确保权重总和为1
weights = np.array(weights) / np.sum(weights)
assignments = []
design_df = self.generate_design_matrix()
for user_id in user_ids:
# 随机分配,但保持权重比例
combination_idx = np.random.choice(range(self.n_combinations), p=weights)
combination_info = design_df.iloc[combination_idx].to_dict()
combination_info['user_id'] = user_id
assignments.append(combination_info)
return pd.DataFrame(assignments)
# 全因子设计示例
print("全因子设计示例")
# 定义测试因素
factors = {
'按钮颜色': ['蓝色', '绿色', '红色'],
'按钮大小': ['小', '中', '大'],
'按钮文字': ['立即购买', '加入购物车', '马上抢购'],
'图标显示': ['无图标', '购物车图标', '星星图标']
}
# 创建实验设计
design = FullFactorialDesign(factors)
# 生成设计矩阵
design_matrix = design.generate_design_matrix()
print(f"\n前5种实验组合:")
print(design_matrix.head())
# 计算所需样本量
sample_size_info = design.calculate_required_sample_size(
baseline_rate=0.10,
detectable_effect=0.02, # 2%的绝对提升
alpha=0.05,
power=0.8
)
print(f"\n样本量计算:")
print(f"每组所需样本量: {sample_size_info['n_per_group']}")
print(f"总样本量: {sample_size_info['total_sample_size']}")
print(f"调整后的显著性水平: {sample_size_info['adjusted_alpha']:.6f}")
# 模拟用户分配
user_ids = [f"user_{i}" for i in range(1000)]
user_assignments = design.assign_users_to_combinations(user_ids)
print(f"\n用户分配示例:")
print(user_assignments.head())
2.2 部分因子设计
当因素数量较多时,全因子设计的组合数量会急剧增加。部分因子设计通过精心选择部分组合来减少实验规模,同时保持对主效应和重要交互效应的检测能力。
2.2.1 正交实验设计
class FractionalFactorialDesign:
"""部分因子实验设计"""
def __init__(self, factors, resolution='V'):
"""
初始化部分因子设计
Parameters:
factors: dict, 因素和水平的字典
resolution: str, 设计分辨率 ('III', 'IV', 'V', 等)
"""
self.factors = factors
self.resolution = resolution
self.factor_names = list(factors.keys())
# 检查因素水平数 (部分因子设计通常要求2水平)
level_counts = [len(levels) for levels in factors.values()]
if not all(count == 2 for count in level_counts):
print("警告: 部分因子设计通常要求所有因素为2水平")
self.design_matrix = self.generate_fractional_design()
def generate_fractional_design(self):
"""生成部分因子设计矩阵"""
try:
from pyDOE2 import fracfact
except ImportError:
print("请安装pyDOE2: pip install pyDOE2")
return None
# 生成生成器(这需要根据具体设计进行调整)
# 这里使用简化的方法
n_factors = len(self.factors)
# 创建基础设计
if n_factors <= 3:
generator = 'a b c'
design_size = 4
elif n_factors <= 7:
generator = 'a b c d ab ac bc abc'[:n_factors*2-1]
design_size = 8
else:
generator = 'a b c d e f g ab ac ad ae af ag'[:n_factors*2-1]
design_size = 16
try:
# 生成设计
design_array = fracfact(generator)
# 转换为DataFrame
design_data = []
factor_levels = {name: levels for name, levels in self.factors.items()}
for i, run in enumerate(design_array):
run_dict = {}
for j, factor_name in enumerate(self.factor_names[:len(run)]):
level_idx = 0 if run[j] == -1 else 1
run_dict[factor_name] = factor_levels[factor_name][level_idx]
run_dict['run_id'] = f"run_{i+1}"
design_data.append(run_dict)
design_df = pd.DataFrame(design_data)
print(f"部分因子设计: {len(design_df)}次实验 (全因子需要{2**n_factors}次)")
return design_df
except Exception as e:
print(f"生成设计时出错: {e}")
# 回退到手动创建简化设计
return self.create_simple_fractional_design()
def create_simple_fractional_design(self):
"""创建简化的部分因子设计"""
print("创建简化部分因子设计")
# 手动选择有代表性的组合
# 这里使用正交数组的思想选择组合
factor_combinations = []
# 基础组合
base_levels = {name: levels[0] for name, levels in self.factors.items()}
# 添加主要的主效应测试组合
for i, factor_name in enumerate(self.factor_names):
combo = base_levels.copy()
combo[factor_name] = self.factors[factor_name][1] # 改变这个因素
factor_combinations.append(combo)
# 添加一些交互效应测试组合
if len(self.factor_names) >= 2:
for i in range(min(3, len(self.factor_names))):
for j in range(i+1, min(4, len(self.factor_names))):
combo = base_levels.copy()
combo[self.factor_names[i]] = self.factors[self.factor_names[i]][1]
combo[self.factor_names[j]] = self.factors[self.factor_names[j]][1]
factor_combinations.append(combo)
# 添加全高水平和全低水平
all_low = {name: levels[0] for name, levels in self.factors.items()}
all_high = {name: levels[1] for name, levels in self.factors.items()}
factor_combinations.extend([all_low, all_high])
# 去重
unique_combinations = []
seen = set()
for combo in factor_combinations:
combo_key = tuple(sorted(combo.items()))
if combo_key not in seen:
seen.add(combo_key)
unique_combinations.append(combo)
design_data = []
for i, combo in enumerate(unique_combinations):
combo_dict = combo.copy()
combo_dict['combination_id'] = f"frac_comb_{i+1}"
design_data.append(combo_dict)
design_df = pd.DataFrame(design_data)
print(f"简化设计: {len(design_df)}种组合")
return design_df
def evaluate_design_quality(self):
"""评估设计质量"""
if self.design_matrix is None:
return {"error": "设计矩阵未生成"}
n_factors = len(self.factors)
n_runs = len(self.design_matrix)
full_factorial_runs = 2 ** n_factors
quality_metrics = {
'factors': n_factors,
'runs': n_runs,
'full_factorial_runs': full_factorial_runs,
'fraction': n_runs / full_factorial_runs,
'efficiency': (1 - n_runs / full_factorial_runs) * 100
}
print(f"设计质量评估:")
print(f" 因素数量: {quality_metrics['factors']}")
print(f" 实验次数: {quality_metrics['runs']}")
print(f" 全因子实验次数: {quality_metrics['full_factorial_runs']}")
print(f" 实验比例: {quality_metrics['fraction']:.1%}")
print(f" 效率提升: {quality_metrics['efficiency']:.1f}%")
return quality_metrics
# 部分因子设计示例
print("\n" + "="*50)
print("部分因子设计示例")
# 定义6个2水平因素
factors_fractional = {
'背景颜色': ['白色', '浅灰色'],
'标题大小': ['小', '大'],
'图片样式': ['方形', '圆角'],
'按钮颜色': ['蓝色', '绿色'],
'价格显示': ['简单', '突出'],
'评价展示': ['隐藏', '显示']
}
# 创建部分因子设计
fractional_design = FractionalFactorialDesign(factors_fractional, resolution='V')
fractional_matrix = fractional_design.design_matrix
if fractional_matrix is not None:
print(f"\n部分因子设计矩阵:")
print(fractional_matrix)
# 评估设计质量
quality = fractional_design.evaluate_design_quality()
2.3 实验设计的选择策略
选择合适的实验设计需要权衡多个因素:
设计类型 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
全因子设计 | 因素少(2-4个),样本量充足 | 能检测所有交互效应,结果解释简单 | 组合数量指数增长,样本需求大 |
部分因子设计 | 因素多(5+),样本量有限 | 大幅减少实验次数,效率高 | 可能混淆某些交互效应 |
响应面设计 | 寻找最优配置,连续因素 | 能建模非线性关系,找到最优点 | 设计复杂,需要专业知识 |
田口方法 | 工程优化,稳健性设计 | 关注均值与方差,稳健性好 | 在软件测试中应用较少 |
III. 实施与流量分配
3.1 多变量测试系统架构
构建一个健壮的多变量测试系统需要精心设计系统架构。
import hashlib
import json
from datetime import datetime, timedelta
import redis
from typing import Dict, List, Optional
class MultivariateTestingSystem:
"""多变量测试系统"""
def __init__(self, redis_client, domain_salt="mvt_system_2024"):
self.redis = redis_client
self.domain_salt = domain_salt
self.experiments = {}
self.assignment_cache = {}
def create_experiment(self, experiment_id: str, factors: Dict,
design_type: str = 'full_factorial'):
"""创建多变量测试实验"""
if design_type == 'full_factorial':
design = FullFactorialDesign(factors)
elif design_type == 'fractional_factorial':
design = FractionalFactorialDesign(factors)
else:
raise ValueError(f"不支持的设计类型: {design_type}")
design_matrix = design.generate_design_matrix()
experiment = {
'id': experiment_id,
'factors': factors,
'design_type': design_type,
'design_matrix': design_matrix,
'status': 'active',
'created_at': datetime.now().isoformat(),
'traffic_allocation': 1.0, # 100%流量分配
'combinations': {}
}
# 初始化每个组合的统计信息
for _, combo in design_matrix.iterrows():
combo_id = combo['combination_id']
experiment['combinations'][combo_id] = {
'visitors': 0,
'conversions': 0,
'revenue': 0.0,
'last_updated': datetime.now().isoformat()
}
self.experiments[experiment_id] = experiment
# 存储到Redis
self._store_experiment(experiment)
print(f"创建实验: {experiment_id}")
print(f"设计类型: {design_type}")
print(f"组合数量: {len(design_matrix)}")
return experiment
def _store_experiment(self, experiment):
"""存储实验配置到Redis"""
exp_key = f"mvt:experiment:{experiment['id']}"
self.redis.hset(exp_key, mapping={
'config': json.dumps(experiment, default=str),
'updated_at': datetime.now().isoformat()
})
def assign_user_to_combination(self, user_id: str, experiment_id: str) -> Dict:
"""将用户分配到实验组合"""
# 检查缓存
cache_key = f"assignment:{user_id}:{experiment_id}"
if cache_key in self.assignment_cache:
return self.assignment_cache[cache_key]
# 检查持久化存储
stored_assignment = self._get_stored_assignment(user_id, experiment_id)
if stored_assignment:
self.assignment_cache[cache_key] = stored_assignment
return stored_assignment
# 获取实验配置
experiment = self._get_experiment(experiment_id)
if not experiment:
raise ValueError(f"实验 {experiment_id} 不存在")
# 使用确定性哈希分配
combination = self._calculate_assignment(user_id, experiment)
# 存储分配结果
self._store_assignment(user_id, experiment_id, combination)
self.assignment_cache[cache_key] = combination
# 更新组合统计
self._update_combination_stats(experiment_id, combination['combination_id'], 'visitor')
return combination
def _calculate_assignment(self, user_id: str, experiment: Dict) -> Dict:
"""计算用户分配"""
design_matrix = experiment['design_matrix']
n_combinations = len(design_matrix)
# 使用哈希进行确定性分配
hash_input = f"{user_id}_{experiment['id']}_{self.domain_salt}".encode('utf-8')
hash_value = hashlib.md5(hash_input).hexdigest()
hash_int = int(hash_value[:8], 16)
combination_idx = hash_int % n_combinations
combination = design_matrix.iloc[combination_idx].to_dict()
return combination
def _get_stored_assignment(self, user_id: str, experiment_id: str) -> Optional[Dict]:
"""从存储中获取分配结果"""
key = f"mvt:assignment:{experiment_id}:{user_id}"
assignment_json = self.redis.get(key)
if assignment_json:
return json.loads(assignment_json)
return None
def _store_assignment(self, user_id: str, experiment_id: str, combination: Dict):
"""存储分配结果"""
key = f"mvt:assignment:{experiment_id}:{user_id}"
# 存储30天
self.redis.setex(key, timedelta(days=30), json.dumps(combination))
def _get_experiment(self, experiment_id: str) -> Optional[Dict]:
"""从Redis获取实验配置"""
if experiment_id in self.experiments:
return self.experiments[experiment_id]
exp_key = f"mvt:experiment:{experiment_id}"
experiment_data = self.redis.hget(exp_key, 'config')
if experiment_data:
return json.loads(experiment_data)
return None
def _update_combination_stats(self, experiment_id: str, combination_id: str,
event_type: str, value: float = 0):
"""更新组合统计信息"""
stats_key = f"mvt:stats:{experiment_id}:{combination_id}"
if event_type == 'visitor':
self.redis.hincrby(stats_key, 'visitors', 1)
elif event_type == 'conversion':
self.redis.hincrby(stats_key, 'conversions', 1)
self.redis.hincrbyfloat(stats_key, 'revenue', value)
self.redis.hset(stats_key, 'last_updated', datetime.now().isoformat())
def track_conversion(self, user_id: str, experiment_id: str, value: float = 0):
"""跟踪转化事件"""
assignment = self._get_stored_assignment(user_id, experiment_id)
if assignment:
combination_id = assignment['combination_id']
self._update_combination_stats(experiment_id, combination_id, 'conversion', value)
print(f"跟踪转化: 用户 {user_id}, 实验 {experiment_id}, "
f"组合 {combination_id}, 价值 {value}")
def get_experiment_results(self, experiment_id: str) -> pd.DataFrame:
"""获取实验结果"""
experiment = self._get_experiment(experiment_id)
if not experiment:
raise ValueError(f"实验 {experiment_id} 不存在")
design_matrix = experiment['design_matrix']
results_data = []
for _, combo in design_matrix.iterrows():
combo_id = combo['combination_id']
stats_key = f"mvt:stats:{experiment_id}:{combo_id}"
stats = self.redis.hgetall(stats_key)
visitors = int(stats.get(b'visitors', 0))
conversions = int(stats.get(b'conversions', 0))
revenue = float(stats.get(b'revenue', 0))
conversion_rate = conversions / visitors if visitors > 0 else 0
avg_order_value = revenue / conversions if conversions > 0 else 0
result_row = combo.to_dict()
result_row.update({
'visitors': visitors,
'conversions': conversions,
'revenue': revenue,
'conversion_rate': conversion_rate,
'avg_order_value': avg_order_value
})
results_data.append(result_row)
results_df = pd.DataFrame(results_data)
return results_df
# 初始化多变量测试系统
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
mvt_system = MultivariateTestingSystem(redis_client)
# 创建多变量测试实验
print("创建多变量测试实验")
experiment_factors = {
'header_design': ['简约', '丰富'],
'image_style': ['产品图', '场景图'],
'button_color': ['蓝色', '绿色', '橙色'],
'social_proof': ['无', '评价数量', '用户评价']
}
experiment = mvt_system.create_experiment(
experiment_id='homepage_optimization_2024',
factors=experiment_factors,
design_type='full_factorial'
)
# 模拟用户分配和转化
print("\n模拟用户行为...")
np.random.seed(42)
n_users = 5000
# 基础转化率
base_conversion_rate = 0.08
# 模拟用户旅程
for i in range(n_users):
user_id = f"user_{i}"
# 分配用户到组合
assignment = mvt_system.assign_user_to_combination(user_id, 'homepage_optimization_2024')
# 模拟转化行为(基于组合特性)
combo_id = assignment['combination_id']
# 为不同组合设置不同的转化率
# 这里简化处理,实际应该基于组合特征
conversion_prob = base_conversion_rate + (hash(combo_id) % 100) / 1000
if np.random.random() < conversion_prob:
order_value = np.random.normal(100, 30) # 模拟订单价值
mvt_system.track_conversion(user_id, 'homepage_optimization_2024', order_value)
# 获取实验结果
results = mvt_system.get_experiment_results('homepage_optimization_2024')
print(f"\n实验结果 (前10行):")
print(results.head(10))
3.2 流量分配优化
在多变量测试中,智能的流量分配可以提升实验效率。
class AdaptiveTrafficAllocation:
"""自适应流量分配"""
def __init__(self, experiment_system, epsilon=0.1):
self.system = experiment_system
self.epsilon = epsilon # 探索概率
def get_optimal_allocation(self, experiment_id: str) -> Dict[str, float]:
"""计算最优流量分配"""
results = self.system.get_experiment_results(experiment_id)
if len(results) == 0 or results['visitors'].sum() == 0:
# 初始阶段均匀分配
n_combinations = len(results)
return {combo_id: 1/n_combinations for combo_id in results['combination_id']}
# 计算每个组合的UCB (Upper Confidence Bound) 分数
ucb_scores = {}
total_visitors = results['visitors'].sum()
for _, row in results.iterrows():
if row['visitors'] > 0:
conversion_rate = row['conversion_rate']
confidence_bound = np.sqrt(2 * np.log(total_visitors) / row['visitors'])
ucb_score = conversion_rate + confidence_bound
else:
ucb_score = float('inf') # 未探索的组合优先
ucb_scores[row['combination_id']] = ucb_score
# 使用epsilon-greedy策略
if np.random.random() < self.epsilon:
# 探索:均匀分配
n_combinations = len(ucb_scores)
allocation = {combo_id: 1/n_combinations for combo_id in ucb_scores.keys()}
else:
# 利用:按UCB分数分配
total_ucb = sum(ucb_scores.values())
allocation = {combo_id: score/total_ucb for combo_id, score in ucb_scores.items()}
return allocation
def optimize_experiment_traffic(self, experiment_id: str, new_users: int):
"""优化实验流量分配"""
optimal_allocation = self.get_optimal_allocation(experiment_id)
print(f"优化后的流量分配:")
for combo_id, allocation in sorted(optimal_allocation.items(),
key=lambda x: x[1], reverse=True)[:5]:
print(f" {combo_id}: {allocation:.3f}")
# 模拟按照新分配接待用户
experiment = self.system._get_experiment(experiment_id)
design_matrix = experiment['design_matrix']
combo_ids = list(optimal_allocation.keys())
allocations = list(optimal_allocation.values())
for i in range(new_users):
user_id = f"opt_user_{i}"
# 按优化分配选择组合
combo_id = np.random.choice(combo_ids, p=allocations)
combination = design_matrix[design_matrix['combination_id'] == combo_id].iloc[0].to_dict()
# 存储分配
self.system._store_assignment(user_id, experiment_id, combination)
self.system._update_combination_stats(experiment_id, combo_id, 'visitor')
# 模拟转化
conversion_prob = 0.08 + (hash(combo_id) % 100) / 1000
if np.random.random() < conversion_prob:
order_value = np.random.normal(100, 30)
self.system.track_conversion(user_id, experiment_id, order_value)
# 使用自适应流量分配
adaptive_allocator = AdaptiveTrafficAllocation(mvt_system)
print("初始流量分配优化...")
adaptive_allocator.optimize_experiment_traffic('homepage_optimization_2024', 2000)
# 查看优化后的结果
optimized_results = mvt_system.get_experiment_results('homepage_optimization_2024')
print(f"\n优化后实验结果 (按转化率排序):")
sorted_results = optimized_results.sort_values('conversion_rate', ascending=False)
print(sorted_results[['combination_id', 'visitors', 'conversions', 'conversion_rate']].head(10))
IV. 结果分析与统计方法
4.1 方差分析(ANOVA)
方差分析是多变量测试结果分析的核心统计方法,用于确定各个因素对结果变量的影响是否显著。
class MVTResultsAnalyzer:
"""多变量测试结果分析器"""
def __init__(self, confidence_level=0.95):
self.confidence_level = confidence_level
self.alpha = 1 - confidence_level
def perform_anova_analysis(self, results_df, response_var='conversion_rate'):
"""执行方差分析"""
import statsmodels.api as sm
from statsmodels.formula.api import ols
# 准备数据
anova_data = results_df.copy()
# 确保所有因素都是分类变量
factor_columns = [col for col in anova_data.columns
if col not in ['combination_id', 'visitors', 'conversions',
'revenue', 'conversion_rate', 'avg_order_value']]
for col in factor_columns:
anova_data[col] = anova_data[col].astype('category')
# 构建方差分析模型
formula = f"{response_var} ~ " + " * ".join(factor_columns)
try:
model = ols(formula, data=anova_data).fit()
anova_table = sm.stats.anova_lm(model, typ=2)
return model, anova_table
except Exception as e:
print(f"方差分析错误: {e}")
return None, None
def analyze_main_effects(self, anova_table):
"""分析主效应"""
if anova_table is None:
return None
main_effects = anova_table[anova_table.index.str.contains('^[A-Za-z_]+$')]
print("主效应分析:")
print("=" * 50)
significant_effects = []
for factor, row in main_effects.iterrows():
p_value = row['PR(>F)']
f_value = row['F']
significance = "显著" if p_value < self.alpha else "不显著"
print(f"{factor}: F={f_value:.4f}, p={p_value:.6f} ({significance})")
if p_value < self.alpha:
significant_effects.append(factor)
return significant_effects
def analyze_interaction_effects(self, anova_table):
"""分析交互效应"""
if anova_table is None:
return None
interaction_effects = anova_table[anova_table.index.str.contains(':')]
print("\n交互效应分析:")
print("=" * 50)
significant_interactions = []
for interaction, row in interaction_effects.iterrows():
p_value = row['PR(>F)']
f_value = row['F']
significance = "显著" if p_value < self.alpha else "不显著"
print(f"{interaction}: F={f_value:.4f}, p={p_value:.6f} ({significance})")
if p_value < self.alpha:
significant_interactions.append(interaction)
return significant_interactions
def calculate_effect_sizes(self, results_df, factors):
"""计算效应大小"""
effect_sizes = {}
for factor in factors:
# 计算该因素的每个水平的平均转化率
factor_levels = results_df.groupby(factor)['conversion_rate'].mean()
overall_mean = results_df['conversion_rate'].mean()
# 计算组间方差
between_variance = ((factor_levels - overall_mean) ** 2).mean()
# 计算eta平方 (效应大小)
total_variance = results_df['conversion_rate'].var()
eta_squared = between_variance / total_variance if total_variance > 0 else 0
effect_sizes[factor] = {
'eta_squared': eta_squared,
'interpretation': self.interpret_effect_size(eta_squared),
'level_means': factor_levels.to_dict()
}
return effect_sizes
def interpret_effect_size(self, eta_squared):
"""解释效应大小"""
if eta_squared < 0.01:
return "很小"
elif eta_squared < 0.06:
return "小"
elif eta_squared < 0.14:
return "中等"
else:
return "大"
def create_interaction_plots(self, results_df, significant_interactions):
"""创建交互效应图"""
if not significant_interactions:
print("无显著交互效应")
return
n_interactions = len(significant_interactions)
n_cols = 2
n_rows = (n_interactions + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))
if n_interactions == 1:
axes = [axes]
else:
axes = axes.flatten()
for i, interaction in enumerate(significant_interactions[:len(axes)]):
if i >= len(axes):
break
factors = interaction.split(':')
if len(factors) != 2:
continue
factor1, factor2 = factors
# 创建交互图
pivot_data = results_df.pivot_table(
values='conversion_rate',
index=factor1,
columns=factor2,
aggfunc='mean'
)
pivot_data.plot(kind='line', marker='o', ax=axes[i])
axes[i].set_title(f'交互效应: {factor1} × {factor2}')
axes[i].set_ylabel('转化率')
axes[i].grid(True, alpha=0.3)
axes[i].legend(title=factor2)
# 隐藏多余的子图
for j in range(i + 1, len(axes)):
axes[j].set_visible(False)
plt.tight_layout()
plt.show()
def generate_comprehensive_report(self, results_df):
"""生成综合分析报告"""
print("多变量测试综合分析报告")
print("=" * 60)
# 基本统计
total_visitors = results_df['visitors'].sum()
total_conversions = results_df['conversions'].sum()
overall_conversion_rate = total_conversions / total_visitors if total_visitors > 0 else 0
print(f"总访问量: {total_visitors}")
print(f"总转化数: {total_conversions}")
print(f"总体转化率: {overall_conversion_rate:.4f}")
# 最佳组合
best_combo = results_df.loc[results_df['conversion_rate'].idxmax()]
worst_combo = results_df.loc[results_df['conversion_rate'].idxmin()]
print(f"\n最佳组合: {best_combo['combination_id']}")
print(f"最佳转化率: {best_combo['conversion_rate']:.4f}")
print(f"最差组合: {worst_combo['combination_id']}")
print(f"最差转化率: {worst_combo['conversion_rate']:.4f}")
print(f"最大提升: {best_combo['conversion_rate'] - worst_combo['conversion_rate']:.4f}")
# 方差分析
model, anova_table = self.perform_anova_analysis(results_df)
if anova_table is not None:
# 主效应分析
significant_main_effects = self.analyze_main_effects(anova_table)
# 交互效应分析
significant_interactions = self.analyze_interaction_effects(anova_table)
# 效应大小分析
if significant_main_effects:
effect_sizes = self.calculate_effect_sizes(results_df, significant_main_effects)
print("\n效应大小分析:")
print("=" * 50)
for factor, effects in effect_sizes.items():
print(f"{factor}: η² = {effects['eta_squared']:.4f} ({effects['interpretation']})")
# 交互效应图
if significant_interactions:
self.create_interaction_plots(results_df, significant_interactions)
return {
'overall_stats': {
'total_visitors': total_visitors,
'total_conversions': total_conversions,
'overall_conversion_rate': overall_conversion_rate
},
'best_combo': best_combo.to_dict(),
'worst_combo': worst_combo.to_dict(),
'significant_main_effects': significant_main_effects,
'significant_interactions': significant_interactions,
'effect_sizes': effect_sizes if 'effect_sizes' in locals() else None
}
# 分析多变量测试结果
analyzer = MVTResultsAnalyzer(confidence_level=0.95)
print("多变量测试结果分析")
print("=" * 50)
comprehensive_report = analyzer.generate_comprehensive_report(optimized_results)
4.2 多重比较校正
在多变量测试中,由于同时进行多个比较,需要校正显著性水平以避免假阳性。
class MultipleComparisonCorrection:
"""多重比较校正"""
def __init__(self, alpha=0.05):
self.alpha = alpha
def apply_bonferroni_correction(self, p_values, n_comparisons):
"""应用Bonferroni校正"""
corrected_alpha = self.alpha / n_comparisons
corrected_p_values = [p * n_comparisons for p in p_values]
corrected_p_values = [min(p, 1.0) for p in corrected_p_values] # 确保不超过1
return corrected_alpha, corrected_p_values
def apply_fdr_correction(self, p_values):
"""应用错误发现率(FDR)校正"""
from statsmodels.stats.multitest import multipletests
rejected, corrected_pvals, _, _ = multipletests(
p_values, alpha=self.alpha, method='fdr_bh'
)
return rejected, corrected_pvals
def analyze_pairwise_comparisons(self, results_df, control_combo_id=None):
"""分析成对比较"""
from itertools import combinations
from statsmodels.stats.proportion import proportions_ztest
combo_ids = results_df['combination_id'].tolist()
n_combinations = len(combo_ids)
if control_combo_id is None:
# 使用总体平均作为对照
control_rate = results_df['conversion_rate'].mean()
control_visitors = results_df['visitors'].sum()
else:
control_data = results_df[results_df['combination_id'] == control_combo_id].iloc[0]
control_rate = control_data['conversion_rate']
control_visitors = control_data['visitors']
comparisons = []
p_values = []
for _, row in results_df.iterrows():
if control_combo_id is not None and row['combination_id'] == control_combo_id:
continue
# 比例检验
count = [row['conversions'], control_visitors * control_rate]
nobs = [row['visitors'], control_visitors]
z_stat, p_value = proportions_ztest(count, nobs, alternative='two-sided')
relative_improvement = (row['conversion_rate'] - control_rate) / control_rate
comparisons.append({
'combination_id': row['combination_id'],
'conversion_rate': row['conversion_rate'],
'control_rate': control_rate,
'relative_improvement': relative_improvement,
'z_statistic': z_stat,
'p_value': p_value
})
p_values.append(p_value)
# 应用多重比较校正
n_comparisons = len(comparisons)
bonferroni_alpha, bonferroni_pvals = self.apply_bonferroni_correction(p_values, n_comparisons)
fdr_rejected, fdr_pvals = self.apply_fdr_correction(p_values)
# 添加校正后的结果
for i, comp in enumerate(comparisons):
comp['p_value_bonferroni'] = bonferroni_pvals[i]
comp['significant_bonferroni'] = bonferroni_pvals[i] < self.alpha
comp['p_value_fdr'] = fdr_pvals[i]
comp['significant_fdr'] = fdr_rejected[i]
comparisons_df = pd.DataFrame(comparisons)
print("成对比较分析:")
print("=" * 50)
print(f"比较数量: {n_comparisons}")
print(f"原始α水平: {self.alpha}")
print(f"Bonferroni校正后α: {bonferroni_alpha:.6f}")
# 显示显著结果
significant_bonferroni = comparisons_df[comparisons_df['significant_bonferroni']]
significant_fdr = comparisons_df[comparisons_df['significant_fdr']]
print(f"\nBonferroni校正后显著的数量: {len(significant_bonferroni)}")
print(f"FDR校正后显著的数量: {len(significant_fdr)}")
if len(significant_bonferroni) > 0:
print("\nBonferroni校正后显著的组合:")
for _, row in significant_bonferroni.iterrows():
print(f" {row['combination_id']}: 提升={row['relative_improvement']:+.2%}, "
f"p={row['p_value_bonferroni']:.6f}")
return comparisons_df
# 多重比较校正示例
correction = MultipleComparisonCorrection(alpha=0.05)
print("多重比较校正分析")
print("=" * 50)
pairwise_results = correction.analyze_pairwise_comparisons(optimized_results)
# 显示前10个比较结果
print(f"\n前10个成对比较结果:")
display_cols = ['combination_id', 'relative_improvement', 'p_value',
'p_value_bonferroni', 'significant_bonferroni', 'significant_fdr']
print(pairwise_results[display_cols].head(10))
4.3 响应优化与预测
基于多变量测试结果,我们可以建立预测模型来找到最优的因素组合。
class ResponseOptimizer:
"""响应优化器"""
def __init__(self):
self.models = {}
def fit_response_surface(self, results_df, degree=2):
"""拟合响应面模型"""
from sklearn.preprocessing import LabelEncoder, PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score
# 准备数据
X_data = results_df.copy()
y = X_data['conversion_rate']
# 识别因素列
factor_columns = [col for col in X_data.columns
if col not in ['combination_id', 'visitors', 'conversions',
'revenue', 'conversion_rate', 'avg_order_value']]
X = X_data[factor_columns]
# 编码分类变量
encoders = {}
X_encoded = X.copy()
for col in factor_columns:
le = LabelEncoder()
X_encoded[col] = le.fit_transform(X[col])
encoders[col] = le
# 创建多项式特征管道
model = Pipeline([
('poly', PolynomialFeatures(degree=degree, include_bias=False)),
('linear', LinearRegression())
])
# 拟合模型
model.fit(X_encoded, y)
y_pred = model.predict(X_encoded)
r2 = r2_score(y, y_pred)
self.models['response_surface'] = model
self.encoders = encoders
self.factor_columns = factor_columns
print(f"响应面模型 R²: {r2:.4f}")
return model, r2
def find_optimal_combination(self, results_df):
"""寻找最优组合"""
if 'response_surface' not in self.models:
print("请先拟合响应面模型")
return None
model = self.models['response_surface']
factor_columns = self.factor_columns
# 生成所有可能的组合
factor_levels = {}
for col in factor_columns:
factor_levels[col] = results_df[col].unique()
# 创建所有组合的网格
from itertools import product
combinations = list(product(*factor_levels.values()))
# 编码预测数据
X_pred = pd.DataFrame(combinations, columns=factor_columns)
X_pred_encoded = X_pred.copy()
for col in factor_columns:
le = self.encoders[col]
# 处理未见过的级别
valid_levels = set(le.classes_)
X_pred_encoded[col] = X_pred[col].apply(
lambda x: le.transform([x])[0] if x in valid_levels else -1
)
# 移除无效组合
valid_mask = (X_pred_encoded >= 0).all(axis=1)
X_pred_valid = X_pred_encoded[valid_mask]
combinations_valid = [combinations[i] for i in range(len(combinations)) if valid_mask.iloc[i]]
# 预测转化率
if len(X_pred_valid) > 0:
y_pred = model.predict(X_pred_valid)
# 找到最佳预测组合
best_idx = np.argmax(y_pred)
best_combination = combinations_valid[best_idx]
best_predicted_rate = y_pred[best_idx]
# 创建最佳组合的字典
best_combo_dict = {col: level for col, level in zip(factor_columns, best_combination)}
print("最优组合预测:")
print("=" * 50)
for factor, level in best_combo_dict.items():
print(f" {factor}: {level}")
print(f"预测转化率: {best_predicted_rate:.4f}")
# 检查这个组合是否在实验中出现过
actual_results = results_df.copy()
for factor, level in best_combo_dict.items():
actual_results = actual_results[actual_results[factor] == level]
if len(actual_results) > 0:
actual_rate = actual_results['conversion_rate'].iloc[0]
print(f"实验观测转化率: {actual_rate:.4f}")
print(f"预测误差: {abs(best_predicted_rate - actual_rate):.4f}")
else:
print("注意: 此组合未在实验中测试过")
return best_combo_dict, best_predicted_rate
return None
def plot_response_surface(self, results_df, factor_x, factor_y, other_factors=None):
"""绘制响应面图"""
if 'response_surface' not in self.models:
print("请先拟合响应面模型")
return
if other_factors is None:
other_factors = {}
# 创建网格数据
x_levels = results_df[factor_x].unique()
y_levels = results_df[factor_y].unique()
X_plot, Y_plot = np.meshgrid(range(len(x_levels)), range(len(y_levels)))
# 准备预测数据
plot_data = []
for i, x_val in enumerate(x_levels):
for j, y_val in enumerate(y_levels):
data_point = other_factors.copy()
data_point[factor_x] = x_val
data_point[factor_y] = y_val
# 添加其他因素的默认值(使用第一个水平)
for factor in self.factor_columns:
if factor not in data_point and factor not in [factor_x, factor_y]:
data_point[factor] = results_df[factor].iloc[0]
plot_data.append(data_point)
plot_df = pd.DataFrame(plot_data)
plot_df_encoded = plot_df.copy()
# 编码数据
for col in self.factor_columns:
le = self.encoders[col]
valid_levels = set(le.classes_)
plot_df_encoded[col] = plot_df[col].apply(
lambda x: le.transform([x])[0] if x in valid_levels else -1
)
# 预测
Z_pred = self.models['response_surface'].predict(plot_df_encoded)
Z_plot = Z_pred.reshape(len(y_levels), len(x_levels))
# 绘制响应面
plt.figure(figsize=(10, 8))
contour = plt.contourf(X_plot, Y_plot, Z_plot, levels=20, cmap='viridis')
plt.colorbar(contour, label='预测转化率')
plt.xticks(range(len(x_levels)), x_levels, rotation=45)
plt.yticks(range(len(y_levels)), y_levels)
plt.xlabel(factor_x)
plt.ylabel(factor_y)
plt.title(f'响应面: {factor_x} vs {factor_y}')
# 标记数据点
actual_points = results_df[[factor_x, factor_y, 'conversion_rate']].drop_duplicates()
for _, point in actual_points.iterrows():
x_idx = list(x_levels).index(point[factor_x])
y_idx = list(y_levels).index(point[factor_y])
plt.plot(x_idx, y_idx, 'ro', markersize=8, markeredgecolor='white')
plt.text(x_idx, y_idx, f'{point["conversion_rate"]:.3f}',
ha='center', va='bottom', color='white', fontweight='bold')
plt.tight_layout()
plt.show()
# 响应优化示例
optimizer = ResponseOptimizer()
print("响应优化分析")
print("=" * 50)
# 拟合响应面模型
model, r2 = optimizer.fit_response_surface(optimized_results, degree=2)
# 寻找最优组合
optimal_combo, predicted_rate = optimizer.find_optimal_combination(optimized_results)
# 绘制响应面图
if optimal_combo is not None:
# 固定其他因素,绘制两个主要因素的响应面
other_factors = optimal_combo.copy()
factors_to_plot = list(optimal_combo.keys())[:2] # 取前两个因素
if len(factors_to_plot) == 2:
factor_x, factor_y = factors_to_plot
# 从最优组合中移除要绘制的因素
other_factors_plot = other_factors.copy()
other_factors_plot.pop(factor_x)
other_factors_plot.pop(factor_y)
optimizer.plot_response_surface(optimized_results, factor_x, factor_y, other_factors_plot)
Lexical error on line 20. Unrecognized text.
...检测] B3 --> B31[η²效应大小] C1 -
----------------------^
V. 实战案例:电商首页优化
5.1 案例背景与实验设计
某电商平台希望优化其首页设计,提升用户注册转化率。需要同时测试以下四个因素:
- 头部布局:简约型 vs 丰富型
- 产品展示:网格布局 vs 列表布局
- 行动按钮:蓝色 vs 绿色 vs 橙色
- 信任标志:无 vs 安全认证 vs 用户评价
# 电商首页优化案例
def ecommerce_homepage_case_study():
"""电商首页多变量测试实战案例"""
print("电商首页优化多变量测试案例")
print("=" * 60)
# 定义实验因素
homepage_factors = {
'header_layout': ['minimal', 'rich'],
'product_display': ['grid', 'list'],
'cta_button': ['blue', 'green', 'orange'],
'trust_signals': ['none', 'security_badge', 'user_reviews']
}
# 创建全因子实验设计
homepage_design = FullFactorialDesign(homepage_factors)
design_matrix = homepage_design.generate_design_matrix()
print(f"实验设计: {len(homepage_factors)}个因素, {len(design_matrix)}种组合")
# 计算所需样本量
sample_info = homepage_design.calculate_required_sample_size(
baseline_rate=0.05, # 当前注册转化率5%
detectable_effect=0.01, # 检测1%的绝对提升
alpha=0.05,
power=0.8
)
print(f"\n样本量规划:")
print(f"每组所需样本量: {sample_info['n_per_group']}")
print(f"总样本量: {sample_info['total_sample_size']}")
print(f"按每日10万访问量计算, 需要运行: {sample_info['total_sample_size'] / 100000:.1f} 天")
# 模拟实验数据
np.random.seed(42)
n_users = 50000 # 模拟5万用户
# 基础转化率
base_conversion_rate = 0.05
# 设置真实效应(在真实实验中这些是未知的)
true_effects = {
'header_layout': {'minimal': 0, 'rich': 0.008}, # 丰富布局提升0.8%
'product_display': {'grid': 0, 'list': 0.005}, # 列表布局提升0.5%
'cta_button': {'blue': 0, 'green': 0.012, 'orange': 0.006}, # 绿色按钮最佳
'trust_signals': {'none': 0, 'security_badge': 0.004, 'user_reviews': 0.010} # 用户评价最佳
}
# 设置交互效应
interaction_effects = {
('header_layout', 'product_display'): 0.003, # 丰富布局+列表布局有额外提升
('cta_button', 'trust_signals'): 0.005 # 绿色按钮+用户评价有额外提升
}
# 生成实验数据
experiment_data = []
for user_id in range(n_users):
# 随机分配组合
combo_idx = np.random.randint(len(design_matrix))
combination = design_matrix.iloc[combo_idx]
combo_id = combination['combination_id']
# 计算期望转化率
expected_rate = base_conversion_rate
# 添加主效应
for factor, level in combination.items():
if factor in ['combination_id', 'combination_name']:
continue
expected_rate += true_effects[factor][level]
# 添加交互效应
for (factor1, factor2), effect in interaction_effects.items():
if combination[factor1] != list(true_effects[factor1].keys())[0] and \
combination[factor2] != list(true_effects[factor2].keys())[0]:
expected_rate += effect
# 确保转化率在合理范围内
expected_rate = max(0.01, min(0.20, expected_rate))
# 模拟用户行为
converted = np.random.random() < expected_rate
experiment_data.append({
'user_id': f"user_{user_id}",
'combination_id': combo_id,
'header_layout': combination['header_layout'],
'product_display': combination['product_display'],
'cta_button': combination['cta_button'],
'trust_signals': combination['trust_signals'],
'converted': converted,
'expected_rate': expected_rate
})
# 转换为DataFrame
experiment_df = pd.DataFrame(experiment_data)
# 汇总结果
results_summary = experiment_df.groupby('combination_id').agg({
'user_id': 'count',
'converted': 'sum',
'expected_rate': 'mean'
}).reset_index()
results_summary = results_summary.rename(columns={
'user_id': 'visitors',
'converted': 'conversions'
})
results_summary['conversion_rate'] = results_summary['conversions'] / results_summary['visitors']
# 合并设计矩阵
final_results = pd.merge(design_matrix, results_summary, on='combination_id')
print(f"\n实验数据汇总:")
print(f"总用户数: {len(experiment_df)}")
print(f"总转化数: {experiment_df['converted'].sum()}")
print(f"总体转化率: {experiment_df['converted'].mean():.4f}")
return final_results, experiment_df, true_effects, interaction_effects
# 运行案例研究
homepage_results, raw_data, true_effects, true_interactions = ecommerce_homepage_case_study()
print(f"\n前5种组合结果:")
display_cols = ['combination_id', 'header_layout', 'product_display', 'cta_button',
'trust_signals', 'visitors', 'conversions', 'conversion_rate']
print(homepage_results[display_cols].head())
5.2 结果分析与业务洞察
def homepage_analysis_and_insights(results_df, true_effects, true_interactions):
"""首页优化案例的深度分析"""
print("电商首页优化深度分析")
print("=" * 60)
# 使用分析器进行统计分析
analyzer = MVTResultsAnalyzer(confidence_level=0.95)
analysis_report = analyzer.generate_comprehensive_report(results_df)
# 多重比较校正
correction = MultipleComparisonCorrection(alpha=0.05)
pairwise_comparisons = correction.analyze_pairwise_comparisons(results_df)
# 响应优化
optimizer = ResponseOptimizer()
model, r2 = optimizer.fit_response_surface(results_df, degree=2)
optimal_combo, predicted_rate = optimizer.find_optimal_combination(results_df)
# 验证真实效应
print("\n真实效应验证:")
print("=" * 50)
# 计算实际观察到的效应
observed_effects = {}
for factor, levels in true_effects.items():
factor_effects = {}
for level in levels.keys():
level_data = results_df[results_df[factor] == level]
avg_rate = level_data['conversion_rate'].mean()
factor_effects[level] = avg_rate
# 计算相对基线的影响
baseline_level = list(levels.keys())[0]
baseline_rate = factor_effects[baseline_level]
print(f"\n{factor}:")
for level, rate in factor_effects.items():
effect = rate - baseline_rate
true_effect = true_effects[factor][level]
error = effect - true_effect
print(f" {level}: 观察效应={effect:.4f}, 真实效应={true_effect:.4f}, 误差={error:.4f}")
observed_effects[factor] = factor_effects
# 验证交互效应
print(f"\n交互效应验证:")
for (factor1, factor2), true_interaction in true_interactions.items():
# 计算观察到的交互效应
level1a, level1b = list(true_effects[factor1].keys())[0], list(true_effects[factor1].keys())[1]
level2a, level2b = list(true_effects[factor2].keys())[0], list(true_effects[factor2].keys())[1]
# 获取四种组合的转化率
rate_1a2a = results_df[
(results_df[factor1] == level1a) & (results_df[factor2] == level2a)
]['conversion_rate'].mean()
rate_1a2b = results_df[
(results_df[factor1] == level1a) & (results_df[factor2] == level2b)
]['conversion_rate'].mean()
rate_1b2a = results_df[
(results_df[factor1] == level1b) & (results_df[factor2] == level2a)
]['conversion_rate'].mean()
rate_1b2b = results_df[
(results_df[factor1] == level1b) & (results_df[factor2] == level2b)
]['conversion_rate'].mean()
# 计算交互效应
observed_interaction = (rate_1b2b - rate_1a2b) - (rate_1b2a - rate_1a2a)
interaction_error = observed_interaction - true_interaction
print(f"{factor1} × {factor2}:")
print(f" 观察交互效应: {observed_interaction:.4f}")
print(f" 真实交互效应: {true_interaction:.4f}")
print(f" 估计误差: {interaction_error:.4f}")
# 业务影响分析
print(f"\n业务影响分析:")
print("=" * 50)
# 找到最佳组合
best_combo = results_df.loc[results_df['conversion_rate'].idxmax()]
baseline_combo = results_df.loc[results_df['conversion_rate'].idxmin()]
improvement = best_combo['conversion_rate'] - baseline_combo['conversion_rate']
relative_improvement = improvement / baseline_combo['conversion_rate']
# 计算预期业务收益
monthly_visitors = 1000000 # 月访问量100万
average_customer_value = 200 # 平均客户价值200元
expected_monthly_gain = (improvement * monthly_visitors * average_customer_value)
print(f"最佳组合: {best_combo['combination_id']}")
print(f"最佳转化率: {best_combo['conversion_rate']:.4f}")
print(f"基准转化率: {baseline_combo['conversion_rate']:.4f}")
print(f"绝对提升: {improvement:.4f}")
print(f"相对提升: {relative_improvement:.2%}")
print(f"预期月收益增长: ¥{expected_monthly_gain:,.0f}")
# 实施建议
print(f"\n实施建议:")
print("=" * 50)
significant_factors = analysis_report.get('significant_main_effects', [])
if significant_factors:
print("基于统计显著性,建议优先优化以下因素:")
for factor in significant_factors:
best_level = observed_effects[factor]
best_level_name = max(best_level, key=best_level.get)
print(f" • {factor}: 选择 '{best_level_name}'")
else:
print("未发现统计显著的因素,建议:")
print(" • 延长实验时间收集更多数据")
print(" • 重新评估要测试的因素和水平")
print(" • 考虑测试更大的效应")
if optimal_combo:
print(f"\n基于响应面优化的推荐配置:")
for factor, level in optimal_combo.items():
print(f" • {factor}: {level}")
return {
'analysis_report': analysis_report,
'observed_effects': observed_effects,
'business_impact': {
'best_combo': best_combo['combination_id'],
'improvement': improvement,
'relative_improvement': relative_improvement,
'expected_gain': expected_monthly_gain
}
}
# 运行深度分析
homepage_insights = homepage_analysis_and_insights(
homepage_results, true_effects, true_interactions
)
5.3 可视化报告与决策支持
def create_comprehensive_visualization(results_df, analysis_report, observed_effects):
"""创建综合可视化报告"""
# 创建多面板可视化
fig = plt.figure(figsize=(20, 16))
# 1. 组合性能热力图 (顶部)
gs_top = fig.add_gridspec(2, 2, top=0.95, bottom=0.6, hspace=0.3, wspace=0.3)
# 1.1 转化率分布
ax1 = fig.add_subplot(gs_top[0, 0])
ax1.hist(results_df['conversion_rate'], bins=20, alpha=0.7, color='skyblue', edgecolor='black')
ax1.axvline(results_df['conversion_rate'].mean(), color='red', linestyle='--',
label=f'平均转化率: {results_df["conversion_rate"].mean():.4f}')
ax1.set_xlabel('转化率')
ax1.set_ylabel('组合数量')
ax1.set_title('所有组合转化率分布')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 1.2 最佳组合对比
ax2 = fig.add_subplot(gs_top[0, 1])
top_combinations = results_df.nlargest(8, 'conversion_rate')
colors = plt.cm.viridis(np.linspace(0, 1, len(top_combinations)))
bars = ax2.barh(range(len(top_combinations)), top_combinations['conversion_rate'], color=colors)
ax2.set_yticks(range(len(top_combinations)))
ax2.set_yticklabels([f"组合 {i+1}" for i in range(len(top_combinations))])
ax2.set_xlabel('转化率')
ax2.set_title('Top 8 最佳组合')
ax2.grid(True, alpha=0.3)
# 在柱状图上添加数值
for i, bar in enumerate(bars):
width = bar.get_width()
ax2.text(width + 0.001, bar.get_y() + bar.get_height()/2,
f'{width:.4f}', ha='left', va='center')
# 1.3 因素重要性
ax3 = fig.add_subplot(gs_top[1, 0])
if 'effect_sizes' in analysis_report and analysis_report['effect_sizes']:
factors = list(analysis_report['effect_sizes'].keys())
eta_squared = [analysis_report['effect_sizes'][f]['eta_squared'] for f in factors]
colors = plt.cm.plasma(np.linspace(0, 1, len(factors)))
bars = ax3.barh(factors, eta_squared, color=colors)
ax3.set_xlabel('效应大小 (η²)')
ax3.set_title('因素重要性排序')
ax3.grid(True, alpha=0.3)
# 添加效应大小解释
for i, bar in enumerate(bars):
width = bar.get_width()
effect_type = analysis_report['effect_sizes'][factors[i]]['interpretation']
ax3.text(width + 0.001, bar.get_y() + bar.get_height()/2,
effect_type, ha='left', va='center', fontsize=10)
# 1.4 样本量充足性
ax4 = fig.add_subplot(gs_top[1, 1])
visitors_per_combo = results_df['visitors']
required_per_combo = 385 # 从样本量计算得出
ax4.hist(visitors_per_combo, bins=15, alpha=0.7, color='lightcoral', edgecolor='black')
ax4.axvline(required_per_combo, color='red', linestyle='--', linewidth=2,
label=f'所需样本量: {required_per_combo}')
ax4.axvline(visitors_per_combo.mean(), color='blue', linestyle='--',
label=f'平均样本量: {visitors_per_combo.mean():.0f}')
ax4.set_xlabel('每组样本量')
ax4.set_ylabel('组合数量')
ax4.set_title('样本量分布')
ax4.legend()
ax4.grid(True, alpha=0.3)
# 2. 因素效应分析 (底部)
gs_bottom = fig.add_gridspec(2, 2, top=0.55, bottom=0.05, hspace=0.3, wspace=0.3)
# 2.1 主效应图
ax5 = fig.add_subplot(gs_bottom[0, 0])
factors_to_plot = list(observed_effects.keys())[:4] # 取前4个因素
for i, factor in enumerate(factors_to_plot):
levels = list(observed_effects[factor].keys())
rates = [observed_effects[factor][level] for level in levels]
# 计算相对第一个水平的提升
baseline = rates[0]
improvements = [rate - baseline for rate in rates]
x_pos = np.arange(len(levels)) + i * 0.2
ax5.bar(x_pos, improvements, width=0.15, label=factor)
ax5.set_xlabel('因素水平')
ax5.set_ylabel('相对于基线的提升')
ax5.set_title('因素主效应')
ax5.legend()
ax5.grid(True, alpha=0.3)
# 简化x轴标签
all_levels = []
for factor in factors_to_plot:
all_levels.extend([f"{factor[:3]}_{l}" for l in observed_effects[factor].keys()])
ax5.set_xticks(range(len(all_levels)))
ax5.set_xticklabels(all_levels, rotation=45, ha='right')
# 2.2 交互效应热力图
ax6 = fig.add_subplot(gs_bottom[0, 1])
# 选择两个最重要的因素绘制交互热力图
if len(factors_to_plot) >= 2:
factor1, factor2 = factors_to_plot[:2]
pivot_data = results_df.pivot_table(
values='conversion_rate',
index=factor1,
columns=factor2,
aggfunc='mean'
)
im = ax6.imshow(pivot_data.values, cmap='YlOrRd', aspect='auto')
ax6.set_xticks(range(len(pivot_data.columns)))
ax6.set_yticks(range(len(pivot_data.index)))
ax6.set_xticklabels(pivot_data.columns, rotation=45, ha='right')
ax6.set_yticklabels(pivot_data.index)
ax6.set_xlabel(factor2)
ax6.set_ylabel(factor1)
ax6.set_title(f'交互效应热力图: {factor1} × {factor2}')
# 添加数值标签
for i in range(len(pivot_data.index)):
for j in range(len(pivot_data.columns)):
text = ax6.text(j, i, f'{pivot_data.iloc[i, j]:.4f}',
ha="center", va="center", color="black", fontsize=8)
plt.colorbar(im, ax=ax6, label='转化率')
# 2.3 统计显著性总结
ax7 = fig.add_subplot(gs_bottom[1, 0])
significant_factors = analysis_report.get('significant_main_effects', [])
significant_interactions = analysis_report.get('significant_interactions', [])
categories = ['显著主效应', '显著交互效应']
counts = [len(significant_factors), len(significant_interactions)]
colors = ['lightgreen', 'lightcoral']
bars = ax7.bar(categories, counts, color=colors, alpha=0.7)
ax7.set_ylabel('数量')
ax7.set_title('统计显著性总结')
ax7.grid(True, alpha=0.3)
# 在柱状图上添加数值
for bar in bars:
height = bar.get_height()
ax7.text(bar.get_x() + bar.get_width()/2., height + 0.1,
f'{int(height)}', ha='center', va='bottom')
# 2.4 业务价值评估
ax8 = fig.add_subplot(gs_bottom[1, 1])
business_impact = homepage_insights['business_impact']
metrics = ['当前转化率', '最佳转化率', '绝对提升', '相对提升']
baseline_rate = results_df['conversion_rate'].min()
best_rate = business_impact['improvement'] + baseline_rate
values = [baseline_rate, best_rate, business_impact['improvement'],
business_impact['relative_improvement']]
colors = ['lightgray', 'lightgreen', 'gold', 'lightcoral']
bars = ax8.bar(metrics, values, color=colors, alpha=0.7)
ax8.set_ylabel('数值')
ax8.set_title('业务价值评估')
ax8.grid(True, alpha=0.3)
# 格式化y轴标签
def format_y_tick(value, pos):
if value < 0.1:
return f'{value:.3f}'
else:
return f'{value:.1%}'
ax8.yaxis.set_major_formatter(plt.FuncFormatter(format_y_tick))
# 在柱状图上添加数值
for i, bar in enumerate(bars):
height = bar.get_height()
if i < 2: # 转化率
label = f'{height:.4f}'
elif i == 2: # 绝对提升
label = f'+{height:.4f}'
else: # 相对提升
label = f'+{height:.1%}'
ax8.text(bar.get_x() + bar.get_width()/2., height + 0.001,
label, ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 打印关键洞察
print("\n关键业务洞察:")
print("=" * 50)
best_combo = results_df.loc[results_df['conversion_rate'].idxmax()]
print(f"🎯 推荐实施组合: {best_combo['combination_id']}")
print(f"📈 预期转化率: {best_combo['conversion_rate']:.4f}")
print(f"💰 预期业务价值: ¥{homepage_insights['business_impact']['expected_gain']:,.0f}/月")
if homepage_insights['analysis_report'].get('significant_main_effects'):
print(f"🔍 关键成功因素: {', '.join(homepage_insights['analysis_report']['significant_main_effects'])}")
if homepage_insights['analysis_report'].get('significant_interactions'):
print(f"🔄 重要交互效应: {', '.join(homepage_insights['analysis_report']['significant_interactions'])}")
# 创建综合可视化报告
create_comprehensive_visualization(
homepage_results,
homepage_insights['analysis_report'],
homepage_insights['observed_effects']
)
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)