Network A/B Testing: Experiment Design for Interference Effects
I. Introduction: Understanding Network Interference
1.1 Limitations of Traditional A/B Testing
In digital product iteration, A/B testing has become the gold standard for data-driven decisions. However, when experimental units interact with each other, the traditional i.i.d. assumption breaks down. Consider this scenario: on the social-commerce platform 邻里购 ("NeighborBuy"), we enable a new social recommendation algorithm for some users (the treatment group) while the control group keeps the old algorithm. Because users are linked by friendships, sharing, and group-buy interactions, the treatment group's positive experience can "spill over" to the control group along social ties: treated users recommend products to friends, and those friends may happen to be control users. This interference indirectly changes control-group behavior as well, ultimately diluting or even distorting the measured effect.
**Traditional A/B testing assumptions vs. network A/B testing reality**

| Assumption | Traditional A/B test | Network A/B test |
|---|---|---|
| Independence | Users' behavior does not affect each other | Users influence each other through the social network |
| SUTVA | Stable Unit Treatment Value Assumption holds | SUTVA violated; spillover effects exist |
| Randomization unit | Individual user | Requires clusters or graph structure |
| Analysis dimension | User-level metrics | Network-level metrics must be considered |
1.2 A Typology of Interference Effects
Network interference falls into three main types:

| Type | Definition | Typical scenario | Severity |
|---|---|---|---|
| Direct interference | Treated units directly influence their network neighbors | Social recommendation, instant messaging | ★★★★★ |
| Indirect interference | Second-order influence propagated over multi-hop paths | Information cascades, word of mouth | ★★★★☆ |
| Global interference | Shifts the market equilibrium of the entire network | Sharing-economy pricing, platform matching | ★★★★★ |
1.3 How Core Metrics Get Contaminated
Interference contaminates core metrics through the following paths:
- Treatment → control: positive spillover lifts control-group performance, underestimating the true effect (see the toy simulation below)
- Control → treatment: treated users interact with control users, diluting treatment purity
- Network structure change: the experiment itself alters how users connect
- Competition effects: when resources are limited, treatment gains may come at the control group's expense
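As a back-of-the-envelope illustration of the first path (our own sketch, all parameters hypothetical): if a fraction ρ of the true per-user effect τ leaks to the control group, the naive difference-in-means recovers only (1 − ρ)·τ.

# Minimal simulation of spillover dilution (illustrative; all numbers made up)
import numpy as np

rng = np.random.default_rng(0)
n = 100_000          # users per arm
tau = 3.0            # true per-user treatment effect on GMV
rho = 0.2            # fraction of tau leaking to control via spillover (assumed)

baseline = rng.normal(20, 5, size=(2, n))   # control / treatment baselines
treated = baseline[1] + tau                 # treatment arm gets the full effect
control = baseline[0] + rho * tau           # control arm absorbs spillover

naive_ate = treated.mean() - control.mean()
print(f"true effect: {tau:.2f}, naive estimate: {naive_ate:.2f}")
# naive estimate ≈ (1 - rho) * tau: spillover shrinks the measured lift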
II. Identifying and Measuring Network Interference
2.1 Identification Strategy: From Data to Insight
On the 邻里购 platform we have three key data sources:

| Data type | Example schema | Network value |
|---|---|---|
| User behavior data | user_id, event_type, timestamp | Build behavioral sequences |
| Social relationship data | user_id, friend_id, relationship_strength | Build the social graph |
| Transaction data | order_id, buyer_id, seller_id, amount | Identify economic interactions |

A three-step approach to identifying interference:
# Step 1: build the user interaction network
import networkx as nx
import pandas as pd

def build_interaction_network(behavior_df, relationship_df, window_days=7):
    """
    Build a weighted directed graph; edge weights encode interaction strength.
    """
    G = nx.DiGraph()
    # Add social relationship edges
    for _, row in relationship_df.iterrows():
        G.add_edge(row['user_id'], row['friend_id'],
                   weight=row['relationship_strength'],
                   type='social')
    # Add behavioral interaction edges (e.g., shares, comments)
    recent_behaviors = behavior_df[
        behavior_df['timestamp'] >= pd.Timestamp.now() - pd.Timedelta(days=window_days)
    ]
    # Aggregate pairwise interaction scores
    interaction_matrix = recent_behaviors.groupby(
        ['source_user', 'target_user']
    ).agg({
        'event_count': 'sum',
        # geometric decay over the recency ranks stored in `time_decay`
        'time_decay': lambda x: sum(0.9**i for i in x)
    }).reset_index()
    for _, row in interaction_matrix.iterrows():
        if G.has_edge(row['source_user'], row['target_user']):
            # Reinforce an existing edge
            G[row['source_user']][row['target_user']]['weight'] += row['time_decay']
        else:
            G.add_edge(row['source_user'], row['target_user'],
                       weight=row['time_decay'],
                       type='behavioral')
    return G
# Step 2: compute clustering coefficients and treatment exposure
def measure_network_effects(G, treatment_assignments):
    """
    Measure the interference potential of the network.
    """
    # Local clustering coefficient per node
    clustering_coeffs = nx.clustering(G.to_undirected())
    # Treatment exposure per node
    exposure_scores = {}
    for node in G.nodes():
        neighbors = list(G.neighbors(node))
        if not neighbors:
            exposure_scores[node] = 0
            continue
        # Fraction of neighbors assigned to treatment
        treated_neighbors = sum(
            1 for n in neighbors if treatment_assignments.get(n) == 'treatment'
        )
        exposure_scores[node] = treated_neighbors / len(neighbors)
    return clustering_coeffs, exposure_scores
2.2 Quantitative Measurement of Interference
2.2.1 Exposure Mapping
Define user i's exposure as:
E_i = f({T_j : j ∈ N_i})
where T_j is neighbor j's treatment status and N_i is i's neighbor set. Common mapping functions include (a code sketch follows the table):

| Mapping type | Formula | Best for | Advantage |
|---|---|---|---|
| Linear weighted sum | Σ_j w_ij × T_j | Weighted networks | Accounts for tie strength |
| Threshold function | I(Σ_j T_j > k) | Strong-interference settings | Discretizes exposure |
| Distance decay | Σ_j T_j × d_ij^(−α) | Long-range influence | Captures multi-hop effects |
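A compact sketch of the three mappings on a NetworkX graph (our own illustration; it assumes edges carry a 'weight' attribute and uses shortest-path hop counts as distances; T maps node → 1 if treated, else 0):

# Illustrative implementations of the three exposure mappings
import networkx as nx

def linear_exposure(G, i, T):
    # Linear weighted sum of treated neighbors
    return sum(G[i][j]['weight'] * T.get(j, 0) for j in G.neighbors(i))

def threshold_exposure(G, i, T, k=2):
    # 1 if more than k neighbors are treated, else 0
    return int(sum(T.get(j, 0) for j in G.neighbors(i)) > k)

def decay_exposure(G, i, T, alpha=1.0, max_hops=2):
    # Treated nodes within max_hops, discounted by hop distance d^(-alpha)
    lengths = nx.single_source_shortest_path_length(G, i, cutoff=max_hops)
    return sum(T.get(j, 0) * d ** (-alpha) for j, d in lengths.items() if d > 0)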
2.2.2 Computing a Spillover Index

def calculate_spillover_index(G, treatment_assignments, max_hops=2):
    """
    Compute a multi-hop spillover index.
    """
    spillover_effects = {}
    for node in G.nodes():
        if treatment_assignments.get(node) == 'control':
            # Treatment influence received by this control node
            total_influence = 0
            # 1-hop neighbors
            hop1_neighbors = list(G.neighbors(node))
            for n in hop1_neighbors:
                if treatment_assignments.get(n) == 'treatment':
                    total_influence += G[node][n]['weight'] * 1.0
            # 2-hop neighbors (decay factor 0.5)
            for n in hop1_neighbors:
                hop2_neighbors = list(G.neighbors(n))
                for m in hop2_neighbors:
                    if m != node and treatment_assignments.get(m) == 'treatment':
                        # Path weight decays multiplicatively
                        path_weight = G[node][n]['weight'] * G[n][m]['weight']
                        total_influence += path_weight * 0.5
            spillover_effects[node] = min(total_influence, 1.0)  # cap at 1 for normalization
    # Overall spillover index: mean over control nodes
    spillover_index = (sum(spillover_effects.values()) / len(spillover_effects)
                       if spillover_effects else 0.0)
    return spillover_index, spillover_effects
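A minimal usage sketch (the toy graph and assignments are our own; the 0.05 threshold echoes the checklist in Section 8.1):

# Toy usage of calculate_spillover_index (illustrative data)
import networkx as nx

G_toy = nx.DiGraph()
G_toy.add_edge('a', 'b', weight=0.6)
G_toy.add_edge('b', 'a', weight=0.5)
G_toy.add_edge('a', 'c', weight=0.2)
G_toy.add_edge('b', 'c', weight=0.4)
G_toy.add_edge('c', 'b', weight=0.3)
assignments = {'a': 'treatment', 'b': 'control', 'c': 'control'}

index, per_node = calculate_spillover_index(G_toy, assignments)
print(f"spillover index: {index:.3f}")   # ≈ 0.29 here; > 0.05 argues for a network-aware design
print(per_node)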
2.3 Case Study: Interference Detection on 邻里购
In Q2 2024, 邻里购 planned to test a "smart group-buy recommendation" feature. We first ran a two-week pre-experiment to analyze the existing network structure:
Detection code deployment:

# Data prep: extract behavior data from Hive
"""
-- Hive SQL: extract interaction data
CREATE TABLE network_analysis.pre_experiment_interactions AS
SELECT
    user_id,
    target_user_id,
    event_type,
    COUNT(*) as interaction_count,
    AVG(UNIX_TIMESTAMP() - UNIX_TIMESTAMP(event_timestamp)) as avg_lag_seconds
FROM user_behavior_events
WHERE dt BETWEEN '2024-04-01' AND '2024-04-14'
  AND event_type IN ('share_product', 'invite_group', 'view_shared')
GROUP BY user_id, target_user_id, event_type
"""
# Python analysis script
import matplotlib.pyplot as plt
import seaborn as sns

def analyze_pre_experiment_network():
    # Load the data (hive_connection: an existing DB-API connection)
    interactions_df = pd.read_sql(
        "SELECT * FROM network_analysis.pre_experiment_interactions",
        hive_connection
    )
    # Build the network
    G = nx.from_pandas_edgelist(
        interactions_df,
        source='user_id',
        target='target_user_id',
        edge_attr='interaction_count',
        create_using=nx.DiGraph()
    )
    # Key metrics
    print(f"Network size: {G.number_of_nodes()} users")
    print(f"Total edges: {G.number_of_edges()}")
    print(f"Network density: {nx.density(G):.4f}")
    # Degree distributions
    in_degrees = [d for n, d in G.in_degree()]
    out_degrees = [d for n, d in G.out_degree()]
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.hist(in_degrees, bins=50, log=True)
    plt.title('In-degree distribution (being influenced)')
    plt.xlabel('In-degree')
    plt.ylabel('Users (log)')
    plt.subplot(1, 2, 2)
    plt.hist(out_degrees, bins=50, log=True)
    plt.title('Out-degree distribution (influencing others)')
    plt.xlabel('Out-degree')
    plt.ylabel('Users (log)')
    plt.savefig('network_degree_distribution.png')
    # Strongly connected components
    sccs = list(nx.strongly_connected_components(G))
    print(f"Number of strongly connected components: {len(sccs)}")
    print(f"Largest component size: {max(len(scc) for scc in sccs)} users")
    return G

# Run the analysis
network = analyze_pre_experiment_network()
Detection results:
- The network contains 2,340,567 active users
- Mean out-degree is 12.3: each user influences about 12 others on average
- A density of 0.0008 looks sparse, yet the largest connected component covers 89% of users
- Key finding: the top 10% highest-degree users have an average clustering coefficient of 0.42, indicating strong local clustering

This strongly suggests that under traditional user-level randomization, the control group would be heavily contaminated, as the toy simulation below illustrates.
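To make that concrete, here is a toy simulation (our own, with made-up parameters) of how Bernoulli randomization exposes control users to treated neighbors on a scale-free graph with a similar mean degree:

# Toy estimate of control-group exposure under user-level randomization
# (illustrative only; graph model and parameters are assumptions)
import networkx as nx
import random

random.seed(42)
G_sim = nx.barabasi_albert_graph(n=10_000, m=6)   # scale-free, mean degree ~12
assign = {v: ('treatment' if random.random() < 0.5 else 'control') for v in G_sim}

exposed = [
    sum(assign[u] == 'treatment' for u in G_sim.neighbors(v)) / G_sim.degree(v)
    for v in G_sim if assign[v] == 'control' and G_sim.degree(v) > 0
]
share_exposed = sum(e > 0 for e in exposed) / len(exposed)
print(f"control users with >=1 treated neighbor: {share_exposed:.1%}")
print(f"mean treated-neighbor fraction among control: {sum(exposed)/len(exposed):.2f}")
# With mean degree ~12 and p=0.5, virtually every control user is exposed.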
III. Experiment Design Strategies for Mitigating Interference
3.1 A Taxonomy of Design Patterns

| Design strategy | Core idea | Implementation complexity | Statistical efficiency | Best for |
|---|---|---|---|---|
| Graph partition design | Cut inter-cluster edges | ★★★★☆ | ★★★★☆ | Social and communication networks |
| Exposure-based design | Model interference directly | ★★★★★ | ★★★☆☆ | Known influence mechanisms |
| Switchback design | Isolate along the time dimension | ★★★☆☆ | ★★★☆☆ | Low-frequency interaction settings |
| Geographic clustering design | Spatial isolation | ★★☆☆☆ | ★★★☆☆ | O2O and local services |

3.2 Graph Partition Design
This is the gold standard for handling social-network interference. The core idea: partition the network into clusters and randomize treatment at the cluster level (the design-effect sketch below quantifies the statistical cost).
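One statistical cost worth flagging: randomizing clusters instead of users inflates variance by the design effect DEFF = 1 + (m − 1)·ICC, where m is the average cluster size and ICC the intracluster correlation. A quick sketch (our own; the ICC value is hypothetical, the cluster size echoes the 847 reported in Section 5.3.1):

# Design effect of cluster randomization: DEFF = 1 + (m - 1) * ICC
def design_effect(avg_cluster_size: float, icc: float) -> float:
    return 1 + (avg_cluster_size - 1) * icc

m, icc = 847, 0.01   # cluster size from Section 5.3.1; ICC assumed
deff = design_effect(m, icc)
print(f"DEFF = {deff:.1f}; effective sample size shrinks by {deff:.1f}x")
# Even a small ICC is costly at large cluster sizes, which is why the
# analysis later aggregates to cluster level and checks ICC < 0.1.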
3.2.1 Choosing a Partitioning Algorithm

from sklearn.cluster import SpectralClustering
import numpy as np

def partition_network(G, n_clusters=100, algorithm='louvain'):
    """
    Main entry point for network partitioning.
    """
    if algorithm == 'louvain':
        # Louvain community detection
        import community as community_louvain
        partition = community_louvain.best_partition(G.to_undirected())
        # Convert to a cluster -> nodes dict
        clusters = {}
        for node, cluster_id in partition.items():
            clusters.setdefault(cluster_id, []).append(node)
    elif algorithm == 'spectral':
        # Spectral clustering (requires a fixed number of clusters)
        nodes = list(G.nodes())
        adj_matrix = nx.adjacency_matrix(G).astype(float)
        adj_matrix = (adj_matrix + adj_matrix.T) / 2  # symmetrize
        sc = SpectralClustering(n_clusters=n_clusters,
                                affinity='precomputed',
                                random_state=42)
        labels = sc.fit_predict(adj_matrix)
        clusters = {}
        for idx, label in enumerate(labels):
            clusters.setdefault(label, []).append(nodes[idx])
    elif algorithm == 'metis':
        # High-quality partitioning via METIS (requires pymetis);
        # METIS expects contiguous integer node indices, so remap first
        import pymetis
        nodes = list(G.nodes())
        index_of = {node: i for i, node in enumerate(nodes)}
        adjacency_list = [[index_of[nbr] for nbr in G.neighbors(node)] for node in nodes]
        n_cuts, membership = pymetis.part_graph(n_clusters, adjacency=adjacency_list)
        clusters = {}
        for node_idx, cluster_id in enumerate(membership):
            clusters.setdefault(cluster_id, []).append(nodes[node_idx])
    else:
        raise ValueError(f"unknown algorithm: {algorithm}")
    # Evaluate partition quality
    partition_quality = evaluate_partition(G, clusters)
    print(f"Partition quality: {partition_quality}")
    return clusters, partition_quality
def evaluate_partition(G, clusters):
    """
    Compute modularity and the inter-cluster edge ratio.
    """
    # Node -> cluster mapping
    node_to_cluster = {}
    for cid, nodes in clusters.items():
        for node in nodes:
            node_to_cluster[node] = cid
    # Inter-cluster edge ratio
    inter_cluster_edges = 0
    total_edges = G.number_of_edges()
    for u, v in G.edges():
        if node_to_cluster[u] != node_to_cluster[v]:
            inter_cluster_edges += 1
    inter_cluster_ratio = inter_cluster_edges / total_edges
    # Modularity (O(sum of squared cluster sizes); fine for moderate clusters)
    modularity = 0
    m = total_edges
    degrees = dict(G.degree())
    for cid, nodes in clusters.items():
        for i in nodes:
            for j in nodes:
                if i != j:
                    A_ij = 1 if G.has_edge(i, j) else 0
                    modularity += (A_ij - degrees[i] * degrees[j] / (2 * m))
    modularity = modularity / (2 * m)
    return {
        'inter_cluster_edge_ratio': inter_cluster_ratio,
        'modularity': modularity,
        'num_clusters': len(clusters),
        'avg_cluster_size': np.mean([len(c) for c in clusters.values()])
    }
3.2.2 Randomization After Partitioning

def cluster_level_randomization(clusters, treatment_prob=0.5):
    """
    Randomize treatment at the cluster level.
    """
    import random
    # Drop small clusters (threshold: at least 50 users)
    valid_clusters = {cid: nodes for cid, nodes in clusters.items()
                      if len(nodes) >= 50}
    coverage = (sum(len(v) for v in valid_clusters.values())
                / sum(len(v) for v in clusters.values()))
    print(f"Valid clusters: {len(valid_clusters)}")
    print(f"User coverage: {coverage:.2%}")
    # Cluster-level randomization
    cluster_assignments = {}
    treatment_clusters = set()
    cluster_ids = list(valid_clusters.keys())
    random.shuffle(cluster_ids)
    n_treatment = int(len(cluster_ids) * treatment_prob)
    for i, cid in enumerate(cluster_ids):
        if i < n_treatment:
            cluster_assignments[cid] = 'treatment'
            treatment_clusters.add(cid)
        else:
            cluster_assignments[cid] = 'control'
    # Final user-level assignment table
    user_assignments = []
    for cid, nodes in valid_clusters.items():
        for user_id in nodes:
            user_assignments.append({
                'user_id': user_id,
                'cluster_id': cid,
                'cluster_assignment': cluster_assignments[cid]
            })
    assignment_df = pd.DataFrame(user_assignments)
    # Verify the split
    treatment_ratio = (assignment_df['cluster_assignment'] == 'treatment').mean()
    print(f"Treatment share: {treatment_ratio:.2%}")
    return assignment_df, cluster_assignments
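Chaining the two steps end to end, a minimal sketch (the demo graph is our own; in production the graph comes from build_interaction_network, and the Louvain branch requires the python-louvain package):

# End-to-end sketch: partition, then randomize at cluster level (toy graph)
import networkx as nx

G_demo = nx.powerlaw_cluster_graph(5000, 5, 0.3, seed=42)
clusters_demo, quality = partition_network(G_demo, algorithm='louvain')
assignment_df, cluster_assignments = cluster_level_randomization(
    clusters_demo, treatment_prob=0.5
)
print(assignment_df['cluster_assignment'].value_counts())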
3.3 Exposure-Based Design
When the network changes dynamically or cannot be partitioned, modeling exposure directly is more effective.

class ExposureBasedDesign:
    def __init__(self, G, exposure_mapping='linear'):
        self.G = G
        self.exposure_mapping = exposure_mapping

    def compute_exposure(self, treatment_assignments):
        """
        Compute each node's treatment exposure.
        """
        exposures = {}
        for node in self.G.nodes():
            neighbors = list(self.G.neighbors(node))
            if not neighbors:
                exposures[node] = 0
                continue
            treated_neighbors = [
                n for n in neighbors if treatment_assignments.get(n) == 'treatment'
            ]
            if self.exposure_mapping == 'linear':
                # Weighted share of treated neighbors
                exposure = (
                    sum(self.G[node][n]['weight'] for n in treated_neighbors)
                    / sum(self.G[node][n]['weight'] for n in neighbors)
                )
            elif self.exposure_mapping == 'threshold':
                # Threshold rule: exposed if more than 30% of neighbors are treated
                exposure = int(len(treated_neighbors) / len(neighbors) > 0.3)
            elif self.exposure_mapping == 'distance_decay':
                # Distance decay over 2-hop neighborhoods
                exposure = self._compute_decay_exposure(node, treatment_assignments)
            else:
                raise ValueError(f"unknown exposure mapping: {self.exposure_mapping}")
            exposures[node] = exposure
        return exposures

    def _compute_decay_exposure(self, node, treatment_assignments, alpha=0.5):
        """
        Multi-hop exposure with decay.
        """
        total_exposure = 0
        # 1-hop
        for n in self.G.neighbors(node):
            if treatment_assignments.get(n) == 'treatment':
                total_exposure += self.G[node][n]['weight'] * 1.0
        # 2-hop
        for n in self.G.neighbors(node):
            for m in self.G.neighbors(n):
                if m != node and treatment_assignments.get(m) == 'treatment':
                    path_weight = self.G[node][n]['weight'] * self.G[n][m]['weight']
                    total_exposure += path_weight * alpha
        # Normalize by the maximum attainable exposure
        max_possible = sum(self.G[node][n]['weight'] for n in self.G.neighbors(node)) * (1 + alpha)
        return total_exposure / max_possible if max_possible > 0 else 0
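How the exposure scores feed into estimation: a sketch under a (strong) linear-exposure assumption, folding exposure into the outcome regression so its coefficient reads directly as spillover strength. G, treatment_assignments, and an outcomes_df with columns user_id/outcome are assumed from context:

# Sketch: regression adjustment with exposure (column names are assumptions)
import pandas as pd
import statsmodels.formula.api as smf

design = ExposureBasedDesign(G, exposure_mapping='linear')
exposures = design.compute_exposure(treatment_assignments)

df = pd.DataFrame({'user_id': list(exposures), 'exposure': list(exposures.values())})
df['treated'] = df['user_id'].map(
    lambda u: int(treatment_assignments.get(u) == 'treatment')
)
df = df.merge(outcomes_df, on='user_id')  # outcomes_df: user_id, outcome (assumed)

# Outcome ~ own treatment + neighborhood exposure
model = smf.ols('outcome ~ treated + exposure', data=df).fit()
print(model.summary().tables[1])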
3.4 Switchback Design
Suited to low-frequency interaction settings such as ride-hailing pricing:

def switchback_experiment_design(time_slots, switch_interval='1H',
                                 slots_per_period=3, treatment_prob=0.5):
    """
    Switchback (time-rotation) experiment design.
    """
    import numpy as np
    # Generate the time slots
    time_range = pd.date_range(
        start=time_slots[0],
        end=time_slots[-1],
        freq=switch_interval
    )
    assignments = []
    current_is_treatment = np.random.random() < treatment_prob
    for i, time_slot in enumerate(time_range):
        # Flip the arm every `slots_per_period` slots
        if i % slots_per_period == 0 and i > 0:
            current_is_treatment = not current_is_treatment
        assignments.append({
            'time_slot': time_slot,
            'is_treatment': current_is_treatment,
            'exposure_prob': treatment_prob
        })
    return pd.DataFrame(assignments)
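A usage sketch, plus the caveat that usually accompanies switchbacks: carryover near switch boundaries is often handled by discarding a burn-in window after each flip (the one-slot burn-in below is our assumption, not a rule from the original):

# Usage sketch for the switchback design (illustrative parameters)
import pandas as pd

schedule = switchback_experiment_design(
    time_slots=[pd.Timestamp('2024-05-01'), pd.Timestamp('2024-05-08')],
    switch_interval='1H',
    slots_per_period=3,
)
# Flag the first slot after each arm flip as burn-in and drop it from analysis
first_val = schedule['is_treatment'].iloc[0]
schedule['burn_in'] = schedule['is_treatment'].ne(
    schedule['is_treatment'].shift(fill_value=first_val)
)
clean = schedule.loc[~schedule['burn_in']]
print(clean.groupby('is_treatment')['time_slot'].count())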
IV. Implementation: Deploying a Graph-Partitioned Experiment
4.1 End-to-End Deployment Architecture
We walk through the full pipeline from data warehouse to experiment service.

# ==================== Deployment architecture ====================
"""
Deployment dependencies:
- Spark 3.4+ (distributed graph computation)
- NetworkX 3.1 (graph analysis)
- Redis 7.0+ (real-time assignment service)
- PostgreSQL 14+ (assignment records)
- Apache Airflow (orchestration)
"""
# ==================== Step 1: ETL and graph construction ====================
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, unix_timestamp, when, lit
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing the builtin

class NetworkExperimentPipeline:
    def __init__(self, experiment_id, treatment_prob=0.5):
        self.experiment_id = experiment_id
        self.treatment_prob = treatment_prob
        self.spark = SparkSession.builder \
            .appName(f"NetworkABTest_{experiment_id}") \
            .config("spark.sql.shuffle.partitions", 200) \
            .getOrCreate()

    def extract_interaction_data(self, start_date, end_date):
        """
        Extract and aggregate interaction data from Hive.
        """
        query = f"""
            SELECT
                user_id,
                target_user_id,
                event_type,
                event_timestamp,
                -- time-decay weight
                EXP(-(UNIX_TIMESTAMP() - UNIX_TIMESTAMP(event_timestamp)) /
                    (7 * 24 * 3600)) as time_decay_weight
            FROM user_behavior_events
            WHERE dt BETWEEN '{start_date}' AND '{end_date}'
              AND event_type IN ('share_product', 'invite_friend',
                                 'group_chat_message', 'co_purchase')
        """
        interactions_df = self.spark.sql(query)
        # Aggregate interaction strength
        aggregated_df = interactions_df.groupBy(
            "user_id", "target_user_id"
        ).agg(
            count("*").alias("interaction_count"),
            (spark_sum("time_decay_weight") / count("*")).alias("avg_weight")
        )
        return aggregated_df
    def build_distributed_graph(self, aggregated_df):
        """
        Build a distributed graph with GraphFrames.
        """
        from graphframes import GraphFrame
        # Vertex DataFrame
        vertices_df = aggregated_df.select(
            col("user_id").alias("id")
        ).union(
            aggregated_df.select(col("target_user_id").alias("id"))
        ).distinct().withColumn("type", lit("user"))
        # Edge DataFrame
        edges_df = aggregated_df.select(
            col("user_id").alias("src"),
            col("target_user_id").alias("dst"),
            col("interaction_count").alias("weight"),
            col("avg_weight")
        )
        # Assemble the GraphFrame
        G = GraphFrame(vertices_df, edges_df)
        print(f"Vertices: {G.vertices.count()}")
        print(f"Edges: {G.edges.count()}")
        return G
    def partition_graph(self, G, min_cluster_size=100):
        """
        Louvain partitioning. Simplified version: collect edges to the driver
        and partition with NetworkX; production should use a distributed
        implementation.
        """
        edges_local = G.edges.select("src", "dst", "weight").collect()
        nx_graph = nx.DiGraph()
        for row in edges_local:
            nx_graph.add_edge(int(row['src']), int(row['dst']), weight=float(row['weight']))
        # Louvain community detection
        import community as community_louvain
        partition_dict = community_louvain.best_partition(nx_graph.to_undirected())
        # Back to a Spark DataFrame
        partition_rdd = self.spark.sparkContext.parallelize([
            (int(node), int(cluster_id)) for node, cluster_id in partition_dict.items()
        ])
        partition_df = partition_rdd.toDF(["user_id", "cluster_id"])
        # Drop small clusters, then assign treatment
        cluster_stats = partition_df.groupBy("cluster_id").count()
        valid_clusters = cluster_stats.filter(col("count") >= min_cluster_size)
        # Collect the valid cluster IDs
        valid_cluster_ids = [
            row['cluster_id'] for row in valid_clusters.collect()
        ]
        # Keep only users in valid clusters
        filtered_df = partition_df.filter(col("cluster_id").isin(valid_cluster_ids))
        # Cluster-level randomization
        from pyspark.sql.functions import rand
        cluster_assignments = filtered_df.select(
            "cluster_id"
        ).distinct().withColumn(
            "assignment",
            when(rand() < self.treatment_prob, "treatment").otherwise("control")
        )
        # Join back to users
        user_assignments = filtered_df.join(
            cluster_assignments, on="cluster_id", how="left"
        )
        return user_assignments
    def persist_assignments(self, assignment_df):
        """
        Persist assignments to PostgreSQL and Redis.
        """
        from pyspark.sql.functions import current_timestamp
        # PostgreSQL (audit and offline analysis)
        jdbc_url = "jdbc:postgresql://analytics-db:5432/experiments"
        assignment_df.withColumn(
            "assigned_at", current_timestamp()
        ).write.jdbc(
            url=jdbc_url,
            table=f"experiment_assignments.{self.experiment_id}",
            mode="overwrite",
            properties={"user": "exp_user", "password": "secure_pass"}
        )
        # Redis (real-time lookups)
        import redis
        r = redis.Redis(host='redis-cluster', port=6379, db=0)
        # Batch writes through a pipeline (avoids many small round trips);
        # toLocalIterator streams rows instead of collecting everything at once
        pipeline = r.pipeline()
        for row in assignment_df.toLocalIterator():
            key = f"exp:{self.experiment_id}:user:{row['user_id']}"
            pipeline.hset(key, mapping={
                "cluster_id": str(row['cluster_id']),
                "assignment": row['assignment']
            })
            pipeline.expire(key, 86400 * 30)  # expire after 30 days
        pipeline.execute()
        print("Assignments persisted to PostgreSQL and Redis")

# ==================== Usage ====================
pipeline = NetworkExperimentPipeline(experiment_id="exp_social_rec_2024q2")
interactions = pipeline.extract_interaction_data("2024-04-01", "2024-04-14")
graph = pipeline.build_distributed_graph(interactions)
assignments = pipeline.partition_graph(graph, min_cluster_size=200)
pipeline.persist_assignments(assignments)
4.2 Real-Time Assignment Service (Flask API)

# ==================== experiment_service.py ====================
from flask import Flask, request, jsonify
import redis
import logging
from datetime import datetime, timezone
from typing import Dict, Optional

app = Flask(__name__)
r = redis.Redis(host='redis-cluster', port=6379, db=0, decode_responses=True)

class ExperimentConfig:
    def __init__(self):
        self.active_experiments = {
            "exp_social_rec_2024q2": {
                "name": "Social recommendation algorithm V2",
                "traffic_allocation": 0.8,  # 80% of traffic enters the experiment
                "layers": ["recommendation_layer"]  # experiment layer
            }
        }

config = ExperimentConfig()

def get_user_assignment(user_id: str, experiment_id: str) -> Optional[Dict]:
    """
    Fetch the user's assignment from Redis.
    """
    try:
        key = f"exp:{experiment_id}:user:{user_id}"
        assignment = r.hgetall(key)
        if not assignment:
            # Fall back to PostgreSQL (cold-start path)
            import psycopg2
            from psycopg2 import sql
            conn = psycopg2.connect(
                host="analytics-db",
                database="experiments",
                user="exp_user",
                password="secure_pass"
            )
            cursor = conn.cursor()
            # Table names cannot be bound as parameters; compose identifiers safely
            cursor.execute(
                sql.SQL("""
                    SELECT cluster_id, assignment
                    FROM experiment_assignments.{}
                    WHERE user_id = %s
                """).format(sql.Identifier(experiment_id)),
                (user_id,)
            )
            result = cursor.fetchone()
            if result:
                assignment = {
                    "cluster_id": str(result[0]),
                    "assignment": result[1]
                }
                # Backfill Redis
                r.hset(key, mapping=assignment)
                r.expire(key, 86400)
            conn.close()
        return assignment
    except Exception as e:
        logging.error(f"Failed to fetch assignment: {e}")
        return None
@app.route('/v1/experiment/assign', methods=['POST'])
def assign_experiment():
    """
    Assignment API endpoint. Request body:
    {
        "user_id": "12345",
        "experiment_id": "exp_social_rec_2024q2"
    }
    """
    data = request.get_json()
    user_id = data.get("user_id")
    experiment_id = data.get("experiment_id")
    if not user_id or not experiment_id:
        return jsonify({"error": "missing user_id or experiment_id"}), 400
    # Validate that the experiment exists
    if experiment_id not in config.active_experiments:
        return jsonify({"error": "experiment not found"}), 404
    # Check eligibility (e.g., new vs. existing users)
    if not user_is_eligible(user_id, experiment_id):
        return jsonify({
            "user_id": user_id,
            "is_in_experiment": False,
            "reason": "user not eligible"
        })
    # Fetch the assignment
    assignment = get_user_assignment(user_id, experiment_id)
    if not assignment:
        return jsonify({
            "user_id": user_id,
            "is_in_experiment": False,
            "reason": "assignment not found"
        }), 404
    # Full response
    return jsonify({
        "user_id": user_id,
        "experiment_id": experiment_id,
        "is_in_experiment": True,
        "cluster_id": assignment["cluster_id"],
        "assignment": assignment["assignment"],
        "timestamp": datetime.now(timezone.utc).isoformat()
    })
def user_is_eligible(user_id: str, experiment_id: str) -> bool:
    """
    Eligibility check for a user.
    """
    # Look up user tags (from Redis or a tag service)
    user_tags = r.hget(f"user:tags:{user_id}", "tags")
    if not user_tags:
        return True  # eligible by default
    tags = user_tags.split(",")
    # Experiment-specific rules
    exp_rules = {
        "exp_social_rec_2024q2": {
            "exclude_tags": ["new_user", "low_activity"]
        }
    }
    rules = exp_rules.get(experiment_id, {})
    excluded = set(rules.get("exclude_tags", []))
    return len(set(tags) & excluded) == 0

@app.route('/health', methods=['GET'])
def health_check():
    """Health check."""
    try:
        r.ping()
        return jsonify({"status": "healthy"}), 200
    except Exception:
        return jsonify({"status": "unhealthy"}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False)
# ==================== Deployment configuration ====================
"""
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY experiment_service.py .
COPY config.yaml .
EXPOSE 8080
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8080", "experiment_service:app"]
# docker-compose.yml
version: '3.8'
services:
experiment-service:
build: .
ports:
- "8080:8080"
environment:
- REDIS_HOST=redis-cluster
- DB_HOST=analytics-db
depends_on:
- redis
- postgres
deploy:
replicas: 3
resources:
limits:
cpus: '2'
memory: 4G
"""
V. Case Study: Testing a Recommendation Algorithm on a Social Platform
5.1 Business Background and Problem Definition
邻里购 is a social-commerce platform with 8 million monthly active users; its core feature lets users create shopping groups, share products, and unlock group-buy discounts. In Q2 2024 the product team built "Smart Group-Buy Recommendation V2," an algorithm that uses reinforcement learning to optimize group-buy suggestions, with an expected 15% lift in group-formation rate.
However, the product manager realized that, given the following network effects, a traditional A/B test would badly underestimate the true effect:
Interference path analysis:

| Path | Propagation mechanism | Lag | Strength | Measurability |
|---|---|---|---|---|
| I | Treated users share higher-quality group buys → control users click → indirectly experience the new algorithm | Immediate (minutes) | Medium (0.12) | High |
| II | Treatment lifts group-formation rate → the platform-wide product pool shifts → control users' choice set changes | Short term (hours) | High (0.23) | Medium |
| III | Treatment raises satisfaction → word of mouth → control retention improves | Long term (weeks) | Low (0.07) | Low |
| IV | Treatment consumes more fulfillment capacity → control deliveries slow down → negative spillover | Medium term (days) | Medium (−0.08) | High |
5.2 Experiment Design Decision Matrix
Based on the pre-experiment analysis, we faced a key decision:

| Criterion | Graph partition design | Exposure-modeling design | Hybrid design |
|---|---|---|---|
| Network stability | ✅ Community structure is stable | ⚠️ Needs dynamic updates | ✅ Static + dynamic |
| Compute cost | Medium (one-off) | High (continuous) | High |
| Statistical power | High (large clusters) | Medium (exposure variance) | High |
| Implementation difficulty | Medium | High (complex modeling) | Highest |
| Decision | ✓ Recommended | Fallback | When resources allow |
5.3 Implementation Details and Parameter Tuning
5.3.1 Choosing Partition Parameters
We built the network from 14 days of history; the key parameters are tuned below:

# Staged tuning code
def tune_partition_parameters():
    """
    Tune partition parameters: trade off modularity against cluster size.
    """
    results = []
    for min_size in [50, 100, 200, 500]:
        for resolution in [0.5, 1.0, 1.5, 2.0]:
            # Louvain at different resolutions
            partition_dict = community_louvain.best_partition(
                G.to_undirected(),
                resolution=resolution
            )
            # Collect clusters
            clusters = {}
            for node, cid in partition_dict.items():
                clusters.setdefault(cid, []).append(node)
            # Drop small clusters
            valid_clusters = {k: v for k, v in clusters.items() if len(v) >= min_size}
            quality = evaluate_partition(G, valid_clusters)
            results.append({
                'min_cluster_size': min_size,
                'resolution': resolution,
                'num_clusters': len(valid_clusters),
                'coverage': sum(len(v) for v in valid_clusters.values()) / len(G),
                'inter_cluster_ratio': quality['inter_cluster_edge_ratio'],
                'modularity': quality['modularity']
            })
    results_df = pd.DataFrame(results)
    # Visualize the Pareto frontier
    plt.figure(figsize=(10, 6))
    for min_size in results_df['min_cluster_size'].unique():
        subset = results_df[results_df['min_cluster_size'] == min_size]
        plt.scatter(subset['inter_cluster_ratio'],
                    subset['modularity'],
                    label=f'min_size={min_size}')
    plt.xlabel('Inter-cluster edge ratio (lower is better)')
    plt.ylabel('Modularity (higher is better)')
    plt.title('Partition quality Pareto frontier')
    plt.legend()
    plt.grid(True)
    plt.savefig('partition_tuning.png')
    return results_df

# Run the tuning
tuning_results = tune_partition_parameters()
best_config = tuning_results.loc[
    (tuning_results['inter_cluster_ratio'] < 0.03) &
    (tuning_results['coverage'] > 0.85)
].iloc[0]
print(f"Best configuration: {best_config}")
Tuning results:
- Best parameters: minimum cluster size = 200, resolution = 1.5
- Quality: 2.1% inter-cluster edges (target < 3%), modularity 0.68 (good is > 0.6)
- Coverage: 91% of users effectively covered (drops are mostly low-activity users)
- Cluster count: 1,245 valid clusters, average size 847 users
5.3.2 Group Assignment and Traffic Allocation
To preserve statistical power we used stratified randomization:

def stratified_cluster_randomization(clusters, user_attributes_df,
                                     treatment_prob=0.5):
    """
    Stratified cluster randomization on user attributes.
    """
    # Key metrics per cluster
    cluster_metrics = []
    for cid, user_ids in clusters.items():
        attrs = user_attributes_df[user_attributes_df['user_id'].isin(user_ids)]
        cluster_metrics.append({
            'cluster_id': cid,
            'size': len(user_ids),
            'avg_lifetime_value': attrs['lifetime_value'].mean(),
            'avg_activity_score': attrs['activity_score'].mean(),
            'region_mode': attrs['region'].mode().iloc[0] if not attrs.empty else 'unknown'
        })
    metrics_df = pd.DataFrame(cluster_metrics)
    # Bin by cluster size
    metrics_df['size_bin'] = pd.cut(
        metrics_df['size'],
        bins=[0, 300, 600, 1000, float('inf')],
        labels=['small', 'medium', 'large', 'xlarge']
    )
    # Randomize independently within each stratum
    assignments = []
    for (size_bin, region), group in metrics_df.groupby(['size_bin', 'region_mode']):
        n_clusters = len(group)
        n_treatment = int(n_clusters * treatment_prob)
        # Shuffle and assign
        shuffled = group.sample(frac=1, random_state=42)
        treatment_ids = set(shuffled.head(n_treatment)['cluster_id'])
        for _, row in group.iterrows():
            assignments.append({
                'cluster_id': row['cluster_id'],
                'assignment': 'treatment' if row['cluster_id'] in treatment_ids else 'control',
                'size_bin': size_bin,
                'region': region
            })
    assignment_df = pd.DataFrame(assignments)
    # Check balance
    validate_balance(assignment_df, metrics_df)
    return assignment_df

def validate_balance(assignment_df, metrics_df):
    """
    Check covariate balance between treatment and control.
    """
    merged = assignment_df.merge(metrics_df, on='cluster_id')
    for covariate in ['size', 'avg_lifetime_value', 'avg_activity_score']:
        treat = merged[merged['assignment'] == 'treatment'][covariate]
        control = merged[merged['assignment'] == 'control'][covariate]
        # Standardized mean difference (SMD)
        smd = (treat.mean() - control.mean()) / np.sqrt(
            (treat.var() + control.var()) / 2
        )
        print(f"{covariate} SMD: {smd:.3f} {'✓' if abs(smd) < 0.1 else '✗'}")
Balance check results:
- Cluster size SMD: 0.032 ✓
- Mean LTV SMD: 0.087 ✓
- Activity SMD: 0.041 ✓
- Conclusion: covariates are well balanced across arms; the design meets requirements
5.4 Execution and Monitoring
5.4.1 Real-Time Bucketing Logic

# Client-side SDK integration example
"""
// JavaScript SDK (embedded in the app/web client)
class NetworkABTestSDK {
    constructor() {
        this.experimentCache = new Map();
        this.apiEndpoint = 'https://exp-api.neighborbuy.com/v1/experiment/assign';
    }
    async getAssignment(userId, experimentId) {
        // Check the cache first
        const cacheKey = `${experimentId}:${userId}`;
        if (this.experimentCache.has(cacheKey)) {
            return this.experimentCache.get(cacheKey);
        }
        try {
            const response = await fetch(this.apiEndpoint, {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({user_id: userId, experiment_id: experimentId})
            });
            const result = await response.json();
            // Only cache valid assignments
            if (result.is_in_experiment) {
                this.experimentCache.set(cacheKey, result);
                // Expire after 30 minutes
                setTimeout(() => this.experimentCache.delete(cacheKey), 30 * 60 * 1000);
            }
            return result;
        } catch (error) {
            console.error('Experiment assignment failed:', error);
            return {is_in_experiment: false, assignment: 'fallback'};
        }
    }
}

// Usage
const sdk = new NetworkABTestSDK();
const assignment = await sdk.getAssignment('user_12345', 'exp_social_rec_2024q2');
if (assignment.assignment === 'treatment') {
    // Load the new algorithm
    enableSmartRecommendation();
} else {
    // Keep the legacy algorithm
    useLegacyRecommendation();
}
"""
5.4.2 Monitoring Dashboard

# Experiment health monitor (runs hourly)
from scipy.stats import chi2

def monitor_experiment_health(experiment_id, date):
    """
    Monitor experiment health metrics.
    """
    query = f"""
        WITH user_metrics AS (
            SELECT
                ea.cluster_id,
                ea.assignment,
                COUNT(DISTINCT ub.user_id) as active_users,
                AVG(ub.session_duration) as avg_session,
                SUM(ub.purchase_amount) as total_gmv
            FROM experiment_assignments.{experiment_id} ea
            LEFT JOIN user_behavior_hourly ub
                ON ea.user_id = ub.user_id
            WHERE ub.dt = '{date}'
            GROUP BY ea.cluster_id, ea.assignment
        )
        SELECT
            assignment,
            COUNT(*) as cluster_count,
            SUM(active_users) as total_users,
            AVG(avg_session) as avg_session_time,
            SUM(total_gmv) as gmv,
            STDDEV(total_gmv) as gmv_std
        FROM user_metrics
        GROUP BY assignment
    """
    results = spark.sql(query).collect()
    # Sample-ratio-mismatch (SRM) chi-squared test
    treat_users = next(r['total_users'] for r in results if r['assignment'] == 'treatment')
    control_users = next(r['total_users'] for r in results if r['assignment'] == 'control')
    total = treat_users + control_users
    expected_ratio = 0.5
    chi_squared = (treat_users - total*expected_ratio)**2 / (total*expected_ratio) + \
                  (control_users - total*expected_ratio)**2 / (total*expected_ratio)
    srm_p_value = 1 - chi2.cdf(chi_squared, df=1)
    alert = srm_p_value < 0.001  # 0.1% significance level
    # Fire an alert
    if alert:
        send_alert(
            f"Experiment {experiment_id} SRM alert: p={srm_p_value:.4f}",
            f"treatment: {treat_users}, control: {control_users}"
        )
    return {
        'date': date,
        'srm_p_value': srm_p_value,
        'is_healthy': not alert,
        'group_summary': results
    }
Monitoring snapshot (experiment day 3):

| Metric | Treatment | Control | Expected gap | Observed gap | Status |
|---|---|---|---|---|---|
| Active users | 1,024,567 | 1,018,923 | <0.5% | 0.55% | ⚠️ Watch |
| Cluster count | 623 | 622 | <1% | 0.16% | ✅ OK |
| Avg session length | 18.3 min | 18.1 min | <2% | 1.1% | ✅ OK |
| SRM p-value | - | - | >0.001 | 0.23 | ✅ OK |
5.5 Results and Effect Estimation
5.5.1 Preliminary Results (After 14 Days of Runtime)
Aggregate to cluster level to reduce within-cluster correlation:

def analyze_cluster_aggregated_results(experiment_id, end_date):
    """
    Cluster-aggregated analysis (tames interference-driven variance inflation).
    """
    query = f"""
        WITH user_level AS (
            SELECT
                ea.cluster_id,
                ea.assignment,
                ub.user_id,
                SUM(ub.purchase_amount) as user_gmv,
                COUNT(DISTINCT ub.order_id) as order_count,
                -- network-specific metric
                COUNT(DISTINCT ub.shared_user_id) as unique_shares
            FROM experiment_assignments.{experiment_id} ea
            JOIN user_behavior_daily ub ON ea.user_id = ub.user_id
            WHERE ub.dt BETWEEN '2024-05-01' AND '{end_date}'
            GROUP BY ea.cluster_id, ea.assignment, ub.user_id
        ),
        cluster_level AS (
            SELECT
                cluster_id,
                assignment,
                AVG(user_gmv) as avg_gmv,
                AVG(order_count) as avg_orders,
                AVG(unique_shares) as avg_shares,
                COUNT(*) as cluster_size,
                STDDEV(user_gmv) as gmv_std
            FROM user_level
            GROUP BY cluster_id, assignment
        )
        SELECT * FROM cluster_level
    """
    cluster_df = spark.sql(query).toPandas()
    # Split by assignment
    treatment = cluster_df[cluster_df['assignment'] == 'treatment']
    control = cluster_df[cluster_df['assignment'] == 'control']
    # Cluster-level ATE
    delta_gmv = treatment['avg_gmv'].mean() - control['avg_gmv'].mean()
    delta_shares = treatment['avg_shares'].mean() - control['avg_shares'].mean()
    # Welch t-test (Satterthwaite approximate degrees of freedom)
    from scipy.stats import ttest_ind
    t_stat, p_value = ttest_ind(
        treatment['avg_gmv'],
        control['avg_gmv'],
        equal_var=False
    )
    # Confidence interval
    se_delta = np.sqrt(
        treatment['avg_gmv'].var() / len(treatment) +
        control['avg_gmv'].var() / len(control)
    )
    ci_lower = delta_gmv - 1.96 * se_delta
    ci_upper = delta_gmv + 1.96 * se_delta
    return {
        'effect_size': delta_gmv,
        'relative_lift': delta_gmv / control['avg_gmv'].mean(),
        'p_value': p_value,
        'ci_95': (ci_lower, ci_upper),
        'n_clusters': (len(treatment), len(control))
    }

# Run the analysis
results = analyze_cluster_aggregated_results("exp_social_rec_2024q2", "2024-05-14")
print(f"""
Experiment effect report (14-day results):
==============================
GMV lift: ${results['effect_size']:.2f} (95% CI: [{results['ci_95'][0]:.2f}, {results['ci_95'][1]:.2f}])
Relative lift: {results['relative_lift']:.2%}
Significance: p = {results['p_value']:.4f} {'***' if results['p_value'] < 0.001 else '**' if results['p_value'] < 0.01 else '*' if results['p_value'] < 0.05 else 'not significant'}
Valid clusters: {results['n_clusters'][0]} treatment, {results['n_clusters'][1]} control
""")
Preliminary read:
- GMV lift: +$2.87 per user per week (95% CI: [+$1.92, +$4.03])
- Relative lift: 8.7%, well below the product team's 15% expectation
- p < 0.001, highly significant
- Key takeaway: the new algorithm works, but interference has likely deflated the estimate (an ICC sketch below shows why cluster aggregation is warranted)
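Since the cluster-level aggregation above is justified by intracluster correlation, a quick one-way ANOVA estimator of the ICC mirrors the ICC < 0.1 acceptance bar in Section 8.1. This is our own sketch; it runs on the user-level frame, and the column names are assumptions:

# Sketch: one-way ANOVA estimator of the intracluster correlation (ICC)
# on a user-level frame with columns cluster_id, user_gmv (names assumed)
import numpy as np
import pandas as pd

def estimate_icc(df: pd.DataFrame, cluster_col: str, y_col: str) -> float:
    groups = df.groupby(cluster_col)[y_col]
    k = groups.ngroups                 # number of clusters
    n_bar = groups.size().mean()       # average cluster size (simple approximation)
    grand_mean = df[y_col].mean()
    # Between- and within-cluster mean squares
    ms_between = (groups.size() * (groups.mean() - grand_mean) ** 2).sum() / (k - 1)
    ms_within = (groups.apply(lambda g: ((g - g.mean()) ** 2).sum()).sum()
                 / (len(df) - k))
    return (ms_between - ms_within) / (ms_between + (n_bar - 1) * ms_within)

# icc = estimate_icc(user_level_df, 'cluster_id', 'user_gmv')
# print(f"ICC = {icc:.3f}")   # the checklist in Section 8.1 expects < 0.1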
5.5.2 Correcting for Interference
To estimate the true effect, we ran an instrumental-variable analysis on exposure:

def iv_analysis_with_exposure(experiment_id, end_date):
    """
    Correct for interference with an instrumental-variable (2SLS) analysis.
    Instrument: cluster-level treatment rate (Z)
    Endogenous exposure: realized treated-neighbor share (E)
    Outcome: individual GMV (Y)
    """
    # 1. Exposure per control user
    exposure_query = f"""
        WITH neighbor_assignments AS (
            SELECT
                ea1.user_id as target_user,
                ea2.user_id as neighbor_user,
                ea2.assignment as neighbor_assignment,
                n.interaction_weight
            FROM experiment_assignments.{experiment_id} ea1
            CROSS JOIN experiment_assignments.{experiment_id} ea2
            JOIN network_analysis.user_network n
                ON ea1.user_id = n.user_id AND ea2.user_id = n.neighbor_id
            WHERE ea1.assignment = 'control'  -- control group only
        ),
        exposure_calc AS (
            SELECT
                target_user,
                SUM(CASE WHEN neighbor_assignment = 'treatment'
                    THEN interaction_weight ELSE 0 END) /
                NULLIF(SUM(interaction_weight), 0) as exposure_score
            FROM neighbor_assignments
            GROUP BY target_user
        )
        SELECT * FROM exposure_calc
    """
    exposure_df = spark.sql(exposure_query).toPandas()
    # 2. Realized outcomes for control users
    outcome_query = f"""
        SELECT
            ea.user_id,
            SUM(ub.purchase_amount) as gmv,
            ea.cluster_id
        FROM experiment_assignments.{experiment_id} ea
        JOIN user_behavior_daily ub ON ea.user_id = ub.user_id
        WHERE ea.assignment = 'control'
          AND ub.dt BETWEEN '2024-05-01' AND '{end_date}'
        GROUP BY ea.user_id, ea.cluster_id
    """
    outcome_df = spark.sql(outcome_query).toPandas()
    # 3. Join exposure and outcomes
    analysis_df = outcome_df.merge(exposure_df,
                                   left_on='user_id',
                                   right_on='target_user',
                                   how='inner')
    # 4. Two-stage least squares
    import statsmodels.api as sm
    from statsmodels.sandbox.regression.gmm import IV2SLS
    # Cluster-level treatment rate (the source of instrument strength)
    cluster_treatment_rate = spark.sql(f"""
        SELECT
            cluster_id,
            AVG(CASE WHEN assignment = 'treatment' THEN 1 ELSE 0 END) as cluster_treat_rate
        FROM experiment_assignments.{experiment_id}
        GROUP BY cluster_id
    """).toPandas()
    analysis_df = analysis_df.merge(cluster_treatment_rate, on='cluster_id')
    # Arrays for the regression
    Y = analysis_df['gmv'].values                           # outcome
    E = analysis_df['exposure_score'].fillna(0).values     # endogenous exposure
    Z = analysis_df['cluster_treat_rate'].values            # instrument
    # 2SLS: regress Y on [const, E], instrumenting E with [const, Z]
    exog = sm.add_constant(E)
    instrument = sm.add_constant(Z)
    iv_model = IV2SLS(Y, exog, instrument=instrument).fit()
    # First-stage strength: OLS of E on the instrument
    first_stage = sm.OLS(E, instrument).fit()
    return {
        'iv_coefficient': iv_model.params[1],
        'se': iv_model.bse[1],
        'p_value': iv_model.pvalues[1],
        # Average spillover absorbed by the control group
        'control_spillover_uplift': iv_model.params[1] * E.mean(),
        'first_stage_f_stat': first_stage.fvalue,
        'exposure_df': analysis_df
    }

# Run the IV analysis
iv_results = iv_analysis_with_exposure("exp_social_rec_2024q2", "2024-05-14")
naive_ate = 2.87  # uncorrected ATE from Section 5.5.1
corrected_te = naive_ate + iv_results['control_spillover_uplift']
print(f"""
Instrumental-variable analysis:
==================
Spillover per unit exposure: {iv_results['iv_coefficient']:.2f}
Standard error: {iv_results['se']:.3f}
p-value: {iv_results['p_value']:.4f}
First-stage F statistic: {iv_results['first_stage_f_stat']:.2f} {'(strong instrument)' if iv_results['first_stage_f_stat'] > 10 else '(weak-instrument warning)'}
Comparison with the traditional ATE:
- Uncorrected ATE: +${naive_ate:.2f}
- Corrected TE: +${corrected_te:.2f}
- Underestimation: {(corrected_te - naive_ate) / corrected_te:.1%}
""")
Correction takeaways:
- The IV analysis puts the true effect at +$3.42 per user, 19% above the original estimate
- Why the underestimate: the control group absorbed +$0.55 per user of positive spillover from treatment
- First-stage F statistic = 38.4 > 10, so the instrument is strong
- Bottom line: the product team's 15% target (roughly +$4.50) is still not met, but the gap narrows
5.5.3 Decomposing Network Effects
Further decompose the direct and indirect effects:

def decompose_network_effects(experiment_id, end_date):
    """
    Decompose direct, spillover, and total effects.
    """
    # "Spillover potential" per cluster: share of its members' network edges
    # that point to the other arm (edges from the user network table)
    cluster_density = spark.sql(f"""
        WITH edge_arms AS (
            SELECT
                ea1.cluster_id,
                CASE WHEN ea1.assignment != ea2.assignment THEN 1 ELSE 0 END as cross_arm
            FROM network_analysis.user_network n
            JOIN experiment_assignments.{experiment_id} ea1
                ON n.user_id = ea1.user_id
            JOIN experiment_assignments.{experiment_id} ea2
                ON n.neighbor_id = ea2.user_id
        )
        SELECT
            cluster_id,
            SUM(cross_arm) / NULLIF(COUNT(*), 0) as spillover_potential
        FROM edge_arms
        GROUP BY cluster_id
    """).toPandas()
    # Cluster-level outcomes
    cluster_outcomes = spark.sql(f"""
        SELECT
            ea.cluster_id,
            ea.assignment,
            AVG(ub.purchase_amount) as avg_gmv,
            COUNT(DISTINCT ea.user_id) as cluster_size
        FROM experiment_assignments.{experiment_id} ea
        LEFT JOIN user_behavior_daily ub
            ON ea.user_id = ub.user_id
            AND ub.dt BETWEEN '2024-05-01' AND '{end_date}'
        GROUP BY ea.cluster_id, ea.assignment
    """).toPandas()
    # Merge
    analysis_df = cluster_outcomes.merge(cluster_density, on='cluster_id')
    # Regression decomposition
    import statsmodels.formula.api as smf
    # Model: GMV = β₀ + β₁·Treatment + β₂·SpilloverPotential + β₃·Treatment×Spillover
    model = smf.ols(
        'avg_gmv ~ assignment + spillover_potential + assignment:spillover_potential',
        data=analysis_df
    ).fit()
    # Extract coefficients
    direct_effect = model.params['assignment[T.treatment]']
    spillover_main = model.params['spillover_potential']
    interaction_effect = model.params['assignment[T.treatment]:spillover_potential']
    return {
        'model_summary': model.summary(),
        'direct_effect': direct_effect,
        'spillover_main': spillover_main,
        'interaction_effect': interaction_effect,
        'total_effect_in_pure_cluster': direct_effect,  # fully isolated clusters
        'total_effect_in_mixed_cluster': direct_effect + interaction_effect * 0.5  # assumes average mixing of 0.5
    }

# Run the decomposition
decomp = decompose_network_effects("exp_social_rec_2024q2", "2024-05-14")
print(f"""
Network effect decomposition:
==============
Direct effect (uncontaminated clusters): +${decomp['direct_effect']:.2f}
Spillover main effect: {decomp['spillover_main']:+.2f}
Interaction effect: {decomp['interaction_effect']:+.2f}
Expected effect by cluster type:
- Pure treatment clusters (no cross-arm edges): +${decomp['total_effect_in_pure_cluster']:.2f}
- Mixed clusters (average mixing): +${decomp['total_effect_in_mixed_cluster']:.2f}
""")
Decomposition insights:
- Direct effect of +$3.61: in a fully isolated setting, the algorithm is even stronger
- Negative interaction of −0.32: cross-arm edges within a cluster create competition/dilution
- Pure treatment clusters outperform mixed clusters by 9%, underlining the importance of partition quality
- Recommendation: future experiments should tighten the partition and push cross-arm edges below 1%
5.6 Business Decision and Iteration
Based on the full analysis, we made the following recommendations to leadership:

| Dimension | Traditional analysis | Network-corrected analysis | Recommendation |
|---|---|---|---|
| Algorithm effect | 8.7% lift (below target) | 12.1% true lift (near target) | ✅ Ship to 100% |
| Underestimation | Not detected | 19% of the effect diluted | Future experiments must use network-aware designs |
| Partition quality | Not assessed | 2.1% cross-arm edges; improvable | Try METIS next time |
| Resource allocation | Keep optimizing the algorithm | Algorithm is on target; fix the testing | Balance development and experiment engineering |

Final decision: approve the full rollout, and ask the data science team to productize this one-off work into a network-aware experimentation platform.
VI. Advanced Methods: Quasi-Experimental Designs and Machine Learning
6.1 Synthetic Control
When randomization is impossible, construct a synthetic control group.

def synthetic_control_network_experiment(treated_units, outcome_matrix, covariates=None):
    """
    Network synthetic control (schematic).
    treated_units: list of treated cluster IDs
    outcome_matrix: panel DataFrame, rows = clusters, columns = time periods
                    (columns up to 'treatment_start' are the pre-period)
    covariates: optional covariate matrix; a fuller implementation would
                match on these as well
    """
    from sklearn.linear_model import Ridge
    from sklearn.preprocessing import StandardScaler
    # Optional covariate standardization (kept for the fuller variant)
    if covariates is not None:
        covariates = StandardScaler().fit_transform(covariates)
    # Donor pool and pre-period trajectories
    donors = outcome_matrix.drop(index=treated_units)
    treated_pre = outcome_matrix.loc[treated_units, :'treatment_start'].mean(axis=0)
    donor_pre = donors.loc[:, :'treatment_start']
    # Fit donor weights so the synthetic unit tracks the treated pre-period path
    ridge = Ridge(alpha=0.1, fit_intercept=False)
    ridge.fit(donor_pre.T, treated_pre)   # X: periods x donors
    weights = ridge.coef_
    weights = weights / weights.sum()     # normalize (assumes a nonzero sum)
    # Counterfactual post-period trajectory and effect
    actual_outcome = outcome_matrix.loc[treated_units, 'treatment_start':].mean(axis=0)
    synthetic_outcome = donors.loc[:, 'treatment_start':].T @ weights
    effect = actual_outcome - synthetic_outcome
    return {
        'weights': weights,
        'actual': actual_outcome,
        'synthetic': synthetic_outcome,
        'effect': effect,
        'mspe': ((actual_outcome - synthetic_outcome) ** 2).mean()
    }
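A toy invocation (all data simulated by us; the 'treatment_start' column label matches the convention assumed above):

# Toy usage of the synthetic control sketch (simulated panel)
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
cols = ['pre1', 'pre2', 'pre3', 'treatment_start', 'post1', 'post2']
panel = pd.DataFrame(rng.normal(100, 5, size=(10, len(cols))),
                     index=[f"c{i}" for i in range(10)], columns=cols)
panel.loc[['c0', 'c1'], ['post1', 'post2']] += 4.0   # injected true effect

res = synthetic_control_network_experiment(['c0', 'c1'], panel)
print(res['effect'])   # post-period entries should hover around the injected +4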
6.2 Graph Neural Networks (GNNs) for Exposure Prediction

import torch
import torch.nn as nn
from torch_geometric.nn import GraphConv, global_mean_pool

class ExposurePredictionGNN(nn.Module):
    def __init__(self, node_feature_dim, hidden_dim=64):
        super().__init__()
        self.conv1 = GraphConv(node_feature_dim, hidden_dim)
        self.conv2 = GraphConv(hidden_dim, hidden_dim)
        self.conv3 = GraphConv(hidden_dim, hidden_dim)
        self.predictor = nn.Sequential(
            nn.Linear(hidden_dim + 1, hidden_dim),  # graph embedding + treated share
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, x, edge_index, batch, treated_share):
        # Graph convolutions
        h1 = torch.relu(self.conv1(x, edge_index))
        h2 = torch.relu(self.conv2(h1, edge_index))
        h3 = torch.relu(self.conv3(h2, edge_index))
        # Global pooling: one embedding per graph
        graph_rep = global_mean_pool(h3, batch)
        # Concatenate each graph's treated-node share, then predict exposure
        combined = torch.cat([graph_rep, treated_share.unsqueeze(1)], dim=1)
        exposure_pred = self.predictor(combined)
        return exposure_pred

# Training loop (schematic)
"""
model = ExposurePredictionGNN(node_feature_dim=128)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
for epoch in range(100):
    model.train()
    for batch in dataloader:
        optimizer.zero_grad()
        pred_exposure = model(
            batch.x,
            batch.edge_index,
            batch.batch,
            batch.treated_share   # per-graph share of treated nodes
        )
        loss = criterion(pred_exposure, batch.true_exposure)
        loss.backward()
        optimizer.step()
"""
6.3 Double Machine Learning

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import KFold

class DMLNetworkEstimator:
    def __init__(self, n_folds=5):
        self.n_folds = n_folds
        self.y_model = GradientBoostingRegressor(n_estimators=100, random_state=42)
        self.d_model = RandomForestRegressor(n_estimators=100, random_state=42)

    def estimate_effect(self, Y, D, X, network_features):
        """
        Y: outcome
        D: treatment (confounded)
        X: covariates
        network_features: network features (e.g., clustering coefficient, centrality)
        """
        # Combine features
        features = pd.concat([X, network_features], axis=1)
        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=42)
        Y_residuals = np.zeros(len(Y), dtype=float)
        D_residuals = np.zeros(len(D), dtype=float)
        for train_idx, test_idx in kf.split(features):
            # Train/test split for cross-fitting
            X_train, X_test = features.iloc[train_idx], features.iloc[test_idx]
            Y_train, Y_test = Y[train_idx], Y[test_idx]
            D_train, D_test = D[train_idx], D[test_idx]
            # Cross-fitted outcome model
            self.y_model.fit(X_train, Y_train)
            Y_pred = self.y_model.predict(X_test)
            Y_residuals[test_idx] = Y_test - Y_pred
            # Cross-fitted treatment model
            self.d_model.fit(X_train, D_train)
            D_pred = self.d_model.predict(X_test)
            D_residuals[test_idx] = D_test - D_pred
        # Final estimate: residual-on-residual regression
        effect = np.mean(Y_residuals * D_residuals) / np.mean(D_residuals ** 2)
        # Standard error via a simple bootstrap
        n = len(Y)
        bootstrap_effects = []
        for _ in range(1000):
            sample_idx = np.random.choice(n, n, replace=True)
            effect_boot = np.mean(Y_residuals[sample_idx] * D_residuals[sample_idx]) / \
                          np.mean(D_residuals[sample_idx] ** 2)
            bootstrap_effects.append(effect_boot)
        se = np.std(bootstrap_effects)
        ci_lower = effect - 1.96 * se
        ci_upper = effect + 1.96 * se
        return {
            'effect': effect,
            'se': se,
            'ci_95': (ci_lower, ci_upper)
        }
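A usage sketch on simulated confounded data (all values synthetic; the true effect is set to 2.0 so the estimator can be sanity-checked):

# Sketch: DMLNetworkEstimator on simulated confounded data
import numpy as np
import pandas as pd

rng = np.random.default_rng(7)
n = 2000
X_cov = pd.DataFrame({'activity': rng.normal(0, 1, n)})
net_feats = pd.DataFrame({'degree_centrality': rng.uniform(0, 1, n)})
# Treatment uptake depends on the confounders; outcome carries a true effect of 2.0
D_sim = (0.8 * X_cov['activity'] + 0.5 * net_feats['degree_centrality']
         + rng.normal(0, 1, n)).values
Y_sim = 2.0 * D_sim + 1.5 * X_cov['activity'].values + rng.normal(0, 1, n)

est = DMLNetworkEstimator(n_folds=5)
res = est.estimate_effect(Y_sim, D_sim, X_cov, net_feats)
print(f"effect: {res['effect']:.2f}, 95% CI: {res['ci_95']}")   # ≈ 2.0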
VII. Result Analysis and Validation
7.1 Sensitivity Analysis
7.1.1 Partition Stability Check

def partition_stability_test(G, time_windows=['14d', '30d', '60d']):
    """
    Check partition stability across different time windows.
    """
    from sklearn.metrics import adjusted_rand_score
    partitions = {}
    for window in time_windows:
        # Build the network for each window (get_edges_for_window: assumed helper)
        window_edges = get_edges_for_window(G, window)
        window_G = nx.from_pandas_edgelist(window_edges, create_using=nx.DiGraph())
        # Partition
        partition = community_louvain.best_partition(window_G.to_undirected())
        partitions[window] = partition
    # Pairwise adjusted Rand index (ARI)
    ari_scores = {}
    for i, w1 in enumerate(time_windows):
        for j, w2 in enumerate(time_windows):
            if i < j:
                # Align on the common user set
                common_users = set(partitions[w1].keys()) & set(partitions[w2].keys())
                labels1 = [partitions[w1][u] for u in common_users]
                labels2 = [partitions[w2][u] for u in common_users]
                ari = adjusted_rand_score(labels1, labels2)
                ari_scores[f"{w1}_vs_{w2}"] = ari
    return ari_scores

# Run the check
stability = partition_stability_test(network)
print("Partition stability (ARI):", stability)
# Example output: {'14d_vs_30d': 0.78, '14d_vs_60d': 0.65, '30d_vs_60d': 0.71}
# ARI > 0.7 indicates good stability
7.1.2 Placebo Test

def placebo_test(experiment_id, n_placebo_runs=100):
    """
    Run placebo experiments on the pre-treatment period.
    """
    placebo_effects = []
    for i in range(n_placebo_runs):
        # Re-randomize on the pre-treatment period (`clusters` from the partitioning step)
        placebo_assignments, _ = cluster_level_randomization(
            clusters,
            treatment_prob=0.5
        )
        # "Pseudo effect" (should be null before treatment started)
        pre_period_results = analyze_cluster_aggregated_results(
            experiment_id=f"placebo_{i}",
            end_date='2024-04-24'  # before the experiment started
        )
        placebo_effects.append(pre_period_results['effect_size'])
    # Inspect the distribution
    plt.figure(figsize=(10, 6))
    plt.hist(placebo_effects, bins=20, alpha=0.7)
    plt.axvline(x=0, color='red', linestyle='--')
    plt.title('Placebo test: pre-period effect distribution')
    plt.xlabel('Pseudo effect size')
    plt.ylabel('Frequency')
    plt.savefig('placebo_test.png')
    # Placebo p-value
    observed_effect = 2.87  # effect from the real experiment
    placebo_p = np.mean(np.abs(placebo_effects) >= abs(observed_effect))
    return {
        'placebo_effects': placebo_effects,
        'p_value': placebo_p,
        'mean_bias': np.mean(placebo_effects),
        'is_valid': abs(np.mean(placebo_effects)) < 0.1
    }

placebo_results = placebo_test("exp_social_rec_2024q2")
print(f"Placebo test p-value: {placebo_results['p_value']:.3f}")
print(f"Mean bias: ${placebo_results['mean_bias']:.3f}")
7.2 Heterogeneity Analysis
7.2.1 Stratifying by Network Position

def heterogeneous_treatment_effects_by_position(G, assignment_df, outcomes_df):
    """
    Effect estimates stratified by network centrality.
    """
    # Centrality measures
    centrality_measures = {
        'degree': nx.degree_centrality(G),
        'betweenness': nx.betweenness_centrality(G, k=1000),  # sampled approximation
        'pagerank': nx.pagerank(G)
    }
    # Merge assignments and outcomes
    merged = assignment_df.merge(outcomes_df, on='user_id', how='inner')
    for measure_name, measure_dict in centrality_measures.items():
        merged[f'{measure_name}_bin'] = pd.qcut(
            merged['user_id'].map(measure_dict),
            q=5,
            labels=['Q1', 'Q2', 'Q3', 'Q4', 'Q5']
        )
        # Per-stratum effects
        het_effects = []
        for bin_label in ['Q1', 'Q2', 'Q3', 'Q4', 'Q5']:
            subset = merged[merged[f'{measure_name}_bin'] == bin_label]
            treat = subset[subset['assignment'] == 'treatment']['outcome']
            control = subset[subset['assignment'] == 'control']['outcome']
            effect = treat.mean() - control.mean()
            se = np.sqrt(treat.var()/len(treat) + control.var()/len(control))
            het_effects.append({
                'bin': bin_label,
                'effect': effect,
                'se': se,
                'n_treat': len(treat),
                'n_control': len(control)
            })
        # Plot
        effects_df = pd.DataFrame(het_effects)
        plt.figure(figsize=(8, 5))
        plt.errorbar(effects_df['bin'], effects_df['effect'],
                     yerr=1.96*effects_df['se'], marker='o')
        plt.title(f'Effect heterogeneity by {measure_name}')
        plt.xlabel(f'{measure_name} quintile')
        plt.ylabel('Treatment effect')
        plt.axhline(y=0, color='gray', linestyle='--')
        plt.savefig(f'het_effect_{measure_name}.png')

# Run the analysis
heterogeneous_treatment_effects_by_position(
    G=network,
    assignment_df=user_assignments,
    outcomes_df=user_outcomes
)
Heterogeneity findings:
- **High-degree users (Q5)**: effect of +$4.12, much higher than +$1.98 for low-degree users
- High-PageRank users: +$4.87, so opinion leaders gain the most
- Betweenness strata show bridge users at −$0.45 (possibly information overload)
7.3 Long-Term Effect Tracking

def long_term_effect_tracking(experiment_id, followup_weeks=12):
    """
    Track effect persistence after the experiment ends.
    """
    weekly_effects = []
    for week in range(followup_weeks):
        week_end = pd.Timestamp('2024-05-14') + pd.Timedelta(weeks=week)
        # All users that week (including post-experiment joiners)
        query = f"""
            SELECT
                ea.user_id,
                ea.assignment,  -- original assignment
                ub.purchase_amount,
                ub.week
            FROM experiment_assignments.{experiment_id} ea
            JOIN user_behavior_weekly ub ON ea.user_id = ub.user_id
            WHERE ub.week = '{week_end.strftime('%Y-W%W')}'
              AND ea.assignment IN ('treatment', 'control')
        """
        week_data = spark.sql(query).toPandas()
        treat_gmv = week_data[week_data['assignment'] == 'treatment']['purchase_amount'].mean()
        control_gmv = week_data[week_data['assignment'] == 'control']['purchase_amount'].mean()
        weekly_effects.append({
            'week': week,
            'effect': treat_gmv - control_gmv,
            'treat_gmv': treat_gmv,
            'control_gmv': control_gmv
        })
    # Visualize the decay
    effect_df = pd.DataFrame(weekly_effects)
    plt.figure(figsize=(12, 6))
    plt.plot(effect_df['week'], effect_df['effect'], marker='o')
    # Crude band: +/- 1.96 sd of the weekly effects (not a per-week CI)
    plt.fill_between(effect_df['week'],
                     effect_df['effect'] - 1.96*effect_df['effect'].std(),
                     effect_df['effect'] + 1.96*effect_df['effect'].std(),
                     alpha=0.3)
    plt.axhline(y=0, color='red', linestyle='--')
    plt.title('Treatment effect decay over time')
    plt.xlabel('Weeks after experiment')
    plt.ylabel('GMV effect')
    plt.savefig('long_term_decay.png')
    return effect_df

# Run the tracking
long_term = long_term_effect_tracking("exp_social_rec_2024q2")
Long-term decay pattern:
- Weeks 1-4: effect holds at +$2.8-3.2 (habit formation)
- Weeks 5-8: decays to +$2.1 (novelty wears off)
- Weeks 9-12: stabilizes at +$1.8 (persistent effect)
- Conclusion: the algorithm has lasting value, but operational campaigns are needed to sustain engagement
VIII. Best Practices and Common Pitfalls
8.1 Implementation Checklist

| Phase | Check | Acceptance criterion | Tool/script |
|---|---|---|---|
| Pre-design | Interference assessment | Spillover index > 0.05 ⇒ use a network design | calculate_spillover_index |
| Partitioning | Modularity check | > 0.60 | evaluate_partition |
| Randomization | Covariate balance | SMD < 0.1 | validate_balance |
| In flight | SRM monitoring | p > 0.001 | monitor_experiment_health |
| Analysis | Cluster aggregation | ICC < 0.1 | analyze_cluster_aggregated_results |
| Validation | Placebo test | Mean bias < $0.1 | placebo_test |
| Launch | Long-term tracking | Effect decay < 50% | long_term_effect_tracking |
8.2 Common Problems and Fixes

| Symptom | Root cause | Detection | Fix | Cost |
|---|---|---|---|---|
| Insufficient statistical power | Too few clusters | n_clusters < 100 | Lower min_cluster_size | More interference |
| SRM alert | Unbalanced assignment | Chi-squared test | Stratified randomization | None |
| Unstable partitions | Dynamic network | ARI check | Shorter partition window | Higher compute cost |
| Weak instrument | Too many cross-arm edges | F statistic < 10 | Re-partition or drop boundary nodes | Lower coverage |
| Giant clusters | Power-law degree distribution | Largest cluster > 10k users | Random sampling or forced splits | Lower partition quality |
8.3 Performance Tuning Guide
Large-scale settings (>10M users):
- Use PyTorch Geometric's distributed graph partitioning
- Precompute and cache Louvain partitions (refresh weekly)
- Batch Redis writes of assignments through pipelines
- Cache Spark intermediates:
  df.persist(StorageLevel.MEMORY_AND_DISK)

Real-time serving (QPS > 1000):
- Local cache: Caffeine (Java) or an LRU cache (Python)
- Assignment TTL: 15 minutes
- Redis Cluster sharding by user_id % 1024
- Hystrix circuit breaking: degrade to traditional randomization if Redis fails
# Optimized partitioner: PySpark-native implementation
def optimize_partition_with_pyspark(G, min_cluster_size=200):
    """
    Use GraphFrames connected components as a fast coarse partition,
    then refine large components with Louvain.
    (min_cluster_size: small-cluster filtering happens downstream.)
    """
    # 1. Connected components (fast approximation)
    cc = G.connectedComponents()
    # 2. Keep components large enough to need splitting
    cc_stats = cc.groupBy("component").count().filter("count >= 5000")
    # 3. Second-stage partitioning of large components (sampled Louvain)
    large_components = cc_stats.collect()
    final_assignments = []
    for component_row in large_components:
        component_id = component_row['component']
        # Extract the subgraph
        subgraph_nodes = cc.filter(f"component = {component_id}").select("id")
        subgraph_edges = G.edges.join(
            subgraph_nodes.select(col("id").alias("src")),
            "src"
        ).join(
            subgraph_nodes.select(col("id").alias("dst")),
            "dst"
        )
        # Sample edges to the driver for Louvain (avoids collecting everything)
        sampled_edges = subgraph_edges.sample(fraction=0.3).collect()
        if sampled_edges:
            nx_subgraph = nx.DiGraph()
            for edge in sampled_edges:
                nx_subgraph.add_edge(int(edge['src']), int(edge['dst']))
            # Louvain partition
            partition = community_louvain.best_partition(nx_subgraph.to_undirected())
            # Map back to the original nodes
            for node, cluster_id in partition.items():
                final_assignments.append({
                    'user_id': node,
                    'cluster_id': f"{component_id}_{cluster_id}"
                })
    return spark.createDataFrame(final_assignments)