网络A/B测试:处理干扰效应的实验设计

数字扫地僧 发表于 2025/11/29 18:25:09

I. 引言:理解网络干扰效应

1.1 传统A/B测试的局限性

在数字化产品迭代中,A/B测试已成为数据驱动决策的黄金标准。然而,当实验单元之间存在交互关系时,传统独立同分布(i.i.d.)假设被打破。想象这样一个场景:在社交电商平台"邻里购"中,我们为部分用户启用了新的社交推荐算法(处理组),而对照组继续使用旧算法。由于用户之间存在好友关系、分享行为和团购互动,处理组用户的正面体验可能通过社交链路"溢出"到对照组——他们向朋友推荐商品,而这些朋友可能恰好是对照组用户。这种干扰导致对照组的行为也发生间接变化,最终稀释甚至扭曲实验效果。

传统A/B测试假设 vs 网络A/B测试现实:

| 假设 | 传统A/B测试 | 网络A/B测试 |
| --- | --- | --- |
| 独立性 | 用户行为互不影响 | 用户通过社交网络相互影响 |
| SUTVA | 稳定单元干预值假设成立 | 违反SUTVA,存在溢出效应 |
| 随机化单元 | 单个用户 | 需要簇(Cluster)或图结构 |
| 分析维度 | 用户级指标 | 需考虑网络级指标 |

1.2 干扰效应的类型学

网络干扰效应可分为三种主要类型:

| 类型 | 定义 | 典型场景 | 影响程度 |
| --- | --- | --- | --- |
| 直接干扰 | 处理组直接影响其网络邻居 | 社交推荐、即时通讯 | ★★★★★ |
| 间接干扰 | 通过多跳路径传播的二级影响 | 信息级联、口碑传播 | ★★★★☆ |
| 全局干扰 | 改变整个网络的市场均衡 | 共享经济定价、平台匹配 | ★★★★★ |

1.3 核心指标污染机制

干扰效应通过以下路径污染核心指标:

  • 处理组→对照组:正向溢出提升对照组表现,低估真实效果
  • 对照组→处理组:处理组用户与对照组交互,稀释处理纯度
  • 网络结构改变:实验本身改变用户连接模式
  • 竞争效应:资源有限时,处理组获益可能以牺牲对照组为代价
(流程图)实验开始 → 随机分配用户 → 是否存在网络连接?是:处理组行为改变 → 影响邻居行为 → 对照组指标偏移 → ATE估计偏差;否:传统分析有效

II. 网络干扰效应的识别与测量

2.1 识别策略:从数据到洞察

在"邻里购"平台,我们拥有三类关键数据:

| 数据类型 | 表结构示例 | 网络价值 |
| --- | --- | --- |
| 用户行为数据 | user_id, event_type, timestamp | 构建行为序列 |
| 社交关系数据 | user_id, friend_id, relationship_strength | 构建社交网络图 |
| 交易数据 | order_id, buyer_id, seller_id, amount | 识别经济交互 |

识别干扰效应的三步法:

# 步骤1:构建用户交互网络
import networkx as nx
import pandas as pd

def build_interaction_network(behavior_df, relationship_df, window_days=7):
    """
    构建加权有向图,边权重表示交互强度
    """
    G = nx.DiGraph()
    
    # 添加社交关系边
    for _, row in relationship_df.iterrows():
        G.add_edge(row['user_id'], row['friend_id'], 
                  weight=row['relationship_strength'],
                  type='social')
    
    # 添加行为交互边(如分享、评论)
    recent_behaviors = behavior_df[
        behavior_df['timestamp'] >= pd.Timestamp.now() - pd.Timedelta(days=window_days)
    ]
    
    # 计算用户间行为影响分数
    interaction_matrix = recent_behaviors.groupby(
        ['source_user', 'target_user']
    ).agg({
        'event_count': 'sum',
        'time_decay': lambda x: sum(0.9**i for i in x)
    }).reset_index()
    
    for _, row in interaction_matrix.iterrows():
        if G.has_edge(row['source_user'], row['target_user']):
            # 增强现有边权重
            G[row['source_user']][row['target_user']]['weight'] += row['time_decay']
        else:
            G.add_edge(row['source_user'], row['target_user'],
                      weight=row['time_decay'],
                      type='behavioral')
    
    return G

# 步骤2:计算网络聚类系数
def measure_network_effects(G, treatment_assignments):
    """
    测量网络中的干扰潜力
    """
    # 计算每个节点的局部聚类系数
    clustering_coeffs = nx.clustering(G.to_undirected())
    
    # 计算处理组和对照组的网络暴露度
    exposure_scores = {}
    for node in G.nodes():
        neighbors = list(G.neighbors(node))
        if not neighbors:
            exposure_scores[node] = 0
            continue
        
        # 计算邻居中处理组的比例
        treated_neighbors = sum(
            1 for n in neighbors if treatment_assignments.get(n) == 'treatment'
        )
        exposure_scores[node] = treated_neighbors / len(neighbors)
    
    return clustering_coeffs, exposure_scores

2.2 干扰效应的定量测量

2.2.1 暴露映射函数(Exposure Mapping)

定义用户i的暴露度为:

E_i = f({T_j: j ∈ N_i})

其中T_j是邻居j的处理状态,N_i是i的邻居集合。常用映射函数包括:

| 映射类型 | 公式 | 适用场景 | 优点 |
| --- | --- | --- | --- |
| 线性加权和 | Σ w_ij × T_j | 加权网络 | 考虑关系强度 |
| 阈值函数 | I(Σ T_j > k) | 强干扰场景 | 离散化暴露 |
| 距离衰减 | Σ T_j × d_ij^(-α) | 长距离影响 | 捕捉多跳效应 |
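
为直观起见,下面给出一个最小的玩具示例,对单个节点分别计算上述三种暴露映射;其中的图、处理状态、阈值 k=1 与衰减系数 α=1.5 均为假设取值,仅用于说明公式含义:

# 玩具示例:对节点 i 计算三种暴露映射(数据与参数均为假设)
import networkx as nx

G_toy = nx.Graph()
G_toy.add_weighted_edges_from([('i', 'a', 0.8), ('i', 'b', 0.3), ('i', 'c', 0.5)])
G_toy.add_edge('a', 'd', weight=0.4)            # d 是 i 的2跳节点
treatment = {'a': 1, 'b': 0, 'c': 1, 'd': 1}    # 邻居/可达节点的处理状态 T_j

neighbors = list(G_toy.neighbors('i'))

# 线性加权和: Σ w_ij × T_j
linear = sum(G_toy['i'][j]['weight'] * treatment[j] for j in neighbors)

# 阈值函数: I(Σ T_j > k),此处取 k=1
threshold = int(sum(treatment[j] for j in neighbors) > 1)

# 距离衰减: Σ T_j × d_ij^(-α),对所有可达节点求和,此处取 α=1.5
alpha = 1.5
dists = nx.shortest_path_length(G_toy, source='i')
decay = sum(treatment.get(j, 0) * d ** (-alpha) for j, d in dists.items() if j != 'i')

print(linear, threshold, decay)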

2.2.2 干扰指数计算

def calculate_spillover_index(G, treatment_assignments, max_hops=2):
    """
    计算多跳干扰指数
    """
    spillover_effects = {}
    
    for node in G.nodes():
        if treatment_assignments.get(node) == 'control':
            # 计算该对照组节点受到的处理影响
            total_influence = 0
            
            # 1跳邻居
            hop1_neighbors = list(G.neighbors(node))
            for n in hop1_neighbors:
                if treatment_assignments.get(n) == 'treatment':
                    total_influence += G[node][n]['weight'] * 1.0
            
            # 2跳邻居(衰减因子0.5)
            for n in hop1_neighbors:
                hop2_neighbors = list(G.neighbors(n))
                for m in hop2_neighbors:
                    if m != node and treatment_assignments.get(m) == 'treatment':
                        # 路径权重衰减
                        path_weight = G[node][n]['weight'] * G[n][m]['weight']
                        total_influence += path_weight * 0.5
            
            spillover_effects[node] = min(total_influence, 1.0)  # 归一化
    
    # 整体干扰指数
    spillover_index = sum(spillover_effects.values()) / len(spillover_effects)
    return spillover_index, spillover_effects
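
下面是一个随机构造的小图上的调用示意,仅演示该函数的输入输出格式(图、边权与分组均为假设):

# 玩具示例:随机有向图 + 随机分组,演示 calculate_spillover_index 的调用方式
import random
import networkx as nx

random.seed(42)
G_demo = nx.gnp_random_graph(50, 0.1, seed=42, directed=True)
nx.set_edge_attributes(G_demo, 0.5, 'weight')   # 所有边赋统一权重,仅为示意

assignments = {n: random.choice(['treatment', 'control']) for n in G_demo.nodes()}

index, per_node = calculate_spillover_index(G_demo, assignments)
print(f"整体干扰指数: {index:.3f},受影响对照节点数: {len(per_node)}")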

2.3 实例分析:邻里购平台的干扰检测

在2024年Q2,邻里购计划测试"拼团智能推荐"功能。我们首先进行为期两周的预实验,分析现有网络结构:

检测代码部署

# 数据准备:从Hive提取行为数据
"""
-- Hive SQL提取交互数据
CREATE TABLE network_analysis.pre_experiment_interactions AS
SELECT 
    user_id,
    target_user_id,
    event_type,
    COUNT(*) as interaction_count,
    AVG(UNIX_TIMESTAMP() - UNIX_TIMESTAMP(event_timestamp)) as avg_lag_seconds
FROM user_behavior_events
WHERE dt BETWEEN '2024-04-01' AND '2024-04-14'
    AND event_type IN ('share_product', 'invite_group', 'view_shared')
GROUP BY user_id, target_user_id, event_type
"""

# Python分析脚本
import matplotlib.pyplot as plt
import seaborn as sns

def analyze_pre_experiment_network():
    # 加载数据
    interactions_df = pd.read_sql(
        "SELECT * FROM network_analysis.pre_experiment_interactions",
        hive_connection
    )
    
    # 构建网络
    G = nx.from_pandas_edgelist(
        interactions_df,
        source='user_id',
        target='target_user_id',
        edge_attr='interaction_count',
        create_using=nx.DiGraph()
    )
    
    # 关键指标计算
    print(f"网络规模: {G.number_of_nodes()} 用户")
    print(f"连接总数: {G.number_of_edges()} 条")
    print(f"网络密度: {nx.density(G):.4f}")
    
    # 度分布分析
    in_degrees = [d for n, d in G.in_degree()]
    out_degrees = [d for n, d in G.out_degree()]
    
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.hist(in_degrees, bins=50, log=True)
    plt.title('入度分布(被影响)')
    plt.xlabel('入度')
    plt.ylabel('用户数(log)')
    
    plt.subplot(1, 2, 2)
    plt.hist(out_degrees, bins=50, log=True)
    plt.title('出度分布(影响他人)')
    plt.xlabel('出度')
    plt.ylabel('用户数(log)')
    plt.savefig('network_degree_distribution.png')
    
    # 检测强连通分量
    sccs = list(nx.strongly_connected_components(G))
    print(f"强连通分量数量: {len(sccs)}")
    print(f"最大分量大小: {max(len(scc) for scc in sccs)} 用户")
    
    return G

# 执行分析
network = analyze_pre_experiment_network()

检测结果

  • 网络包含2,340,567个活跃用户
  • 平均出度为12.3,表明每个用户平均影响12个其他用户
  • 网络密度0.0008看似稀疏,但最大连通分量包含89%用户
  • 关键发现:前10%高度数用户的平均聚类系数达0.42,存在显著局部聚类

这一结果强烈提示:若采用传统随机分配,对照组将受到严重污染。

干扰检测流程(流程图):原始数据 → 网络构建 → 拓扑分析 → 网络密度是否超过阈值?是:存在干扰风险 → 量化干扰指数 → 选择设计策略;否:传统设计可行

III. 实验设计策略:缓解干扰效应

3.1 设计模式分类

| 设计策略 | 核心思想 | 实施复杂度 | 统计效率 | 适用场景 |
| --- | --- | --- | --- | --- |
| 图分区设计 | 切断簇间连接 | ★★★★☆ | ★★★★☆ | 社交网络、通讯网络 |
| 基于暴露的设计 | 直接建模干扰 | ★★★★★ | ★★★☆☆ | 已知影响机制 |
| 时间轮转设计 | 隔离时间维度 | ★★★☆☆ | ★★★☆☆ | 低频交互场景 |
| 地理聚类设计 | 空间隔离 | ★★☆☆☆ | ★★★☆☆ | O2O、本地服务 |

3.2 图分区设计(Graph Partition Design)

这是处理社交网络干扰的黄金标准。核心思想是将网络划分为若干簇,以簇为单位随机分配处理。

3.2.1 分区算法选择

from sklearn.cluster import SpectralClustering
import numpy as np

def partition_network(G, n_clusters=100, algorithm='louvain'):
    """
    网络分区主函数
    """
    if algorithm == 'louvain':
        # Louvain社区发现算法
        import community as community_louvain
        partition = community_louvain.best_partition(G.to_undirected())
        
        # 转换为簇分配字典
        clusters = {}
        for node, cluster_id in partition.items():
            if cluster_id not in clusters:
                clusters[cluster_id] = []
            clusters[cluster_id].append(node)
    
    elif algorithm == 'spectral':
        # 谱聚类(需要指定簇数量)
        adj_matrix = nx.adjacency_matrix(G).astype(float)
        adj_matrix = (adj_matrix + adj_matrix.T) / 2  # 对称化,作为相似度矩阵
        
        # 谱聚类
        sc = SpectralClustering(n_clusters=n_clusters, 
                               affinity='precomputed',
                               random_state=42)
        labels = sc.fit_predict(adj_matrix)
        
        node_list = list(G.nodes())  # 与邻接矩阵的行顺序一致
        clusters = {}
        for idx, label in enumerate(labels):
            clusters.setdefault(label, []).append(node_list[idx])
    
    elif algorithm == 'metis':
        # 使用METIS库进行高质量分区(需安装pymetis)
        import pymetis
        # METIS要求节点为0..n-1的整数索引,且使用无向邻接表
        und_G = G.to_undirected()
        node_list = list(und_G.nodes())
        node_index = {node: i for i, node in enumerate(node_list)}
        adjacency_list = [
            [node_index[nbr] for nbr in und_G.neighbors(node)] for node in node_list
        ]
        n_cuts, membership = pymetis.part_graph(n_clusters, adjacency=adjacency_list)
        
        clusters = {}
        for node_id, cluster_id in enumerate(membership):
            clusters.setdefault(cluster_id, []).append(node_list[node_id])
    
    # 评估分区质量
    partition_quality = evaluate_partition(G, clusters)
    print(f"分区质量评估: {partition_quality}")
    
    return clusters, partition_quality

def evaluate_partition(G, clusters):
    """
    计算模块度和簇间边比例
    """
    # 创建簇分配映射
    node_to_cluster = {}
    for cid, nodes in clusters.items():
        for node in nodes:
            node_to_cluster[node] = cid
    
    # 计算簇间边比例
    inter_cluster_edges = 0
    total_edges = G.number_of_edges()
    
    for u, v in G.edges():
        if node_to_cluster[u] != node_to_cluster[v]:
            inter_cluster_edges += 1
    
    inter_cluster_ratio = inter_cluster_edges / total_edges
    
    # 模块度(Modularity)
    modularity = 0
    m = total_edges
    degrees = dict(G.degree())
    
    for cid, nodes in clusters.items():
        for i in nodes:
            for j in nodes:
                if i != j:
                    A_ij = 1 if G.has_edge(i, j) else 0
                    modularity += (A_ij - degrees[i]*degrees[j]/(2*m))
    
    modularity = modularity / (2*m)
    
    return {
        'inter_cluster_edge_ratio': inter_cluster_ratio,
        'modularity': modularity,
        'num_clusters': len(clusters),
        'avg_cluster_size': np.mean([len(c) for c in clusters.values()])
    }

3.2.2 分区后随机化流程

def cluster_level_randomization(clusters, treatment_prob=0.5):
    """
    簇级别随机分配处理
    """
    import random
    
    # 过滤小簇(阈值:最小50个用户)
    valid_clusters = {cid: nodes for cid, nodes in clusters.items() 
                     if len(nodes) >= 50}
    
    print(f"有效簇数量: {len(valid_clusters)}")
    print(f"覆盖用户比例: {sum(len(v) for v in valid_clusters.values()) / 
          sum(len(v) for v in clusters.values()):.2%}")
    
    # 簇级别随机化
    cluster_assignments = {}
    treatment_clusters = set()
    
    cluster_ids = list(valid_clusters.keys())
    random.shuffle(cluster_ids)
    
    n_treatment = int(len(cluster_ids) * treatment_prob)
    
    for i, cid in enumerate(cluster_ids):
        if i < n_treatment:
            cluster_assignments[cid] = 'treatment'
            treatment_clusters.add(cid)
        else:
            cluster_assignments[cid] = 'control'
    
    # 生成最终用户级别的分配表
    user_assignments = []
    for cid, nodes in valid_clusters.items():
        for user_id in nodes:
            user_assignments.append({
                'user_id': user_id,
                'cluster_id': cid,
                'cluster_assignment': cluster_assignments[cid]
            })
    
    assignment_df = pd.DataFrame(user_assignments)
    
    # 验证分配比例
    treatment_ratio = (assignment_df['cluster_assignment'] == 'treatment').mean()
    print(f"处理组比例: {treatment_ratio:.2%}")
    
    return assignment_df, cluster_assignments

3.3 基于暴露的设计(Exposure-Based Design)

当网络结构动态变化或无法分区时,直接建模暴露度更为有效。

class ExposureBasedDesign:
    def __init__(self, G, exposure_mapping='linear'):
        self.G = G
        self.exposure_mapping = exposure_mapping
    
    def compute_exposure(self, treatment_assignments):
        """
        计算每个节点的处理暴露度
        """
        exposures = {}
        for node in self.G.nodes():
            neighbors = list(self.G.neighbors(node))
            if not neighbors:
                exposures[node] = 0
                continue
            
            treated_neighbors = [
                n for n in neighbors if treatment_assignments.get(n) == 'treatment'
            ]
            
            if self.exposure_mapping == 'linear':
                # 线性加权和:处理邻居权重占全部邻居权重的比例
                total_weight = sum(self.G[node][n]['weight'] for n in neighbors)
                treated_weight = sum(self.G[node][n]['weight'] for n in treated_neighbors)
                exposure = treated_weight / total_weight if total_weight > 0 else 0
            
            elif self.exposure_mapping == 'threshold':
                # 阈值函数(超过30%邻居处理则视为暴露)
                exposure = int(len(treated_neighbors) / len(neighbors) > 0.3)
            
            elif self.exposure_mapping == 'distance_decay':
                # 距离衰减(考虑2跳邻居)
                exposure = self._compute_decay_exposure(node, treatment_assignments)
            
            exposures[node] = exposure
        
        return exposures
    
    def _compute_decay_exposure(self, node, treatment_assignments, alpha=0.5):
        """
        带衰减的多跳暴露计算
        """
        total_exposure = 0
        
        # 1跳
        for n in self.G.neighbors(node):
            if treatment_assignments.get(n) == 'treatment':
                total_exposure += self.G[node][n]['weight'] * 1.0
        
        # 2跳
        for n in self.G.neighbors(node):
            for m in self.G.neighbors(n):
                if m != node and treatment_assignments.get(m) == 'treatment':
                    path_weight = self.G[node][n]['weight'] * self.G[n][m]['weight']
                    total_exposure += path_weight * alpha
        
        # 归一化
        max_possible = sum(self.G[node][n]['weight'] for n in self.G.neighbors(node)) * (1 + alpha)
        return total_exposure / max_possible if max_possible > 0 else 0
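
一个简单的调用示意(随机小图、统一边权、随机分组,均为演示用假设数据),用于对比三种暴露映射的平均暴露度:

# 示意用法:在同一张图和同一份分组上比较三种暴露映射
import random
import networkx as nx

random.seed(7)
G_toy = nx.gnp_random_graph(30, 0.15, seed=7)
nx.set_edge_attributes(G_toy, 1.0, 'weight')
assign_toy = {n: random.choice(['treatment', 'control']) for n in G_toy.nodes()}

for mapping in ['linear', 'threshold', 'distance_decay']:
    design = ExposureBasedDesign(G_toy, exposure_mapping=mapping)
    exposures = design.compute_exposure(assign_toy)
    avg = sum(exposures.values()) / len(exposures)
    print(f"{mapping}: 平均暴露度 = {avg:.3f}")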

3.4 时间轮转设计(Switchback Design)

适用于低频交互场景,如网约车定价:

def switchback_experiment_design(time_slots, switch_interval='1H', 
                                treatment_prob=0.5):
    """
    时间轮转实验设计
    """
    import numpy as np
    
    # 生成时间槽
    time_range = pd.date_range(
        start=time_slots[0],
        end=time_slots[-1],
        freq=switch_interval
    )
    
    assignments = []
    current_is_treatment = np.random.random() < treatment_prob
    
    for i, time_slot in enumerate(time_range):
        # 每n个时间槽切换一次
        if i % 3 == 0 and i > 0:  # 每3个槽切换
            current_is_treatment = not current_is_treatment
        
        assignments.append({
            'time_slot': time_slot,
            'is_treatment': current_is_treatment,
            'exposure_prob': treatment_prob
        })
    
    return pd.DataFrame(assignments)
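
调用示意如下(time_slots 只需提供起止时间,其余参数为默认值;输出为按小时划分、每3个时间槽切换一次的分配表):

# 示意:生成一天内按小时切换的轮转分配表
slots = [pd.Timestamp('2024-05-01 00:00'), pd.Timestamp('2024-05-01 23:00')]
schedule = switchback_experiment_design(slots, switch_interval='1H')
print(schedule.head(6))
print("处理时段占比:", schedule['is_treatment'].mean())
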
设计选择流程(流程图):开始设计 → 网络是否稳定?否:基于暴露设计;是:判断交互频率,高频 → 图分区设计,低频 → 时间轮转设计;图分区设计继续判断簇间边是否<5%,满足则用Louvain分区,否则用METIS优化分区 → 簇级别随机化 → 设计完成

IV. 代码实现:基于图分区的实验部署

4.1 完整部署架构

我们将展示从数据仓库到实验服务的完整流水线。

# ==================== 部署架构配置 ====================
"""
实验部署依赖:
- Spark 3.4+ (分布式图计算)
- NetworkX 3.1 (图分析)
- Redis 7.0+ (实时分配服务)
- PostgreSQL 14+ (分配记录)
- Apache Airflow (调度)
"""

# ==================== 步骤1:数据ETL与图构建 ====================
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, unix_timestamp, when, lit, sum as spark_sum

class NetworkExperimentPipeline:
    def __init__(self, experiment_id, treatment_prob=0.5):
        self.experiment_id = experiment_id
        self.treatment_prob = treatment_prob
        self.spark = SparkSession.builder \
            .appName(f"NetworkABTest_{experiment_id}") \
            .config("spark.sql.shuffle.partitions", 200) \
            .getOrCreate()
    
    def extract_interaction_data(self, start_date, end_date):
        """
        从Hive提取并构建交互数据
        """
        # 构建Spark DataFrame
        query = f"""
        SELECT 
            user_id,
            target_user_id,
            event_type,
            event_timestamp,
            -- 计算时间衰减权重
            EXP(-(UNIX_TIMESTAMP() - UNIX_TIMESTAMP(event_timestamp)) / 
                (7 * 24 * 3600)) as time_decay_weight
        FROM user_behavior_events
        WHERE dt BETWEEN '{start_date}' AND '{end_date}'
            AND event_type IN ('share_product', 'invite_friend', 
                              'group_chat_message', 'co_purchase')
        """
        
        interactions_df = self.spark.sql(query)
        
        # 聚合交互强度
        aggregated_df = interactions_df.groupBy(
            "user_id", "target_user_id"
        ).agg(
            count("*").alias("interaction_count"),
            (sum("time_decay_weight") / count("*")).alias("avg_weight")
        )
        
        return aggregated_df
    
    def build_distributed_graph(self, aggregated_df):
        """
        使用GraphFrame构建分布式图
        """
        from graphframes import GraphFrame
        
        # 创建顶点DataFrame
        vertices_df = aggregated_df.select(
            col("user_id").alias("id")
        ).union(
            aggregated_df.select(col("target_user_id").alias("id"))
        ).distinct().withColumn("type", lit("user"))
        
        # 创建边DataFrame
        edges_df = aggregated_df.select(
            col("user_id").alias("src"),
            col("target_user_id").alias("dst"),
            col("interaction_count").alias("weight"),
            col("avg_weight")
        )
        
        # 构建GraphFrame
        G = GraphFrame(vertices_df, edges_df)
        
        print(f"图顶点数: {G.vertices.count()}")
        print(f"图边数: {G.edges.count()}")
        
        return G
    
    def partition_graph(self, G, min_cluster_size=100):
        """
        分布式Louvain分区(使用Spark的Pregel API)
        """
        # 简化版:先collect到driver,使用NetworkX分区
        # 生产环境应使用分布式实现
        edges_local = G.edges.select("src", "dst", "weight").collect()
        nx_graph = nx.DiGraph()
        
        for row in edges_local:
            nx_graph.add_edge(int(row['src']), int(row['dst']), weight=float(row['weight']))
        
        # 使用Louvain算法
        import community as community_louvain
        partition_dict = community_louvain.best_partition(nx_graph.to_undirected())
        
        # 转换回Spark DataFrame
        partition_rdd = self.spark.sparkContext.parallelize([
            (int(node), int(cluster_id)) for node, cluster_id in partition_dict.items()
        ])
        
        partition_df = partition_rdd.toDF(["user_id", "cluster_id"])
        
        # 过滤小簇并分配处理
        cluster_stats = partition_df.groupBy("cluster_id").count()
        valid_clusters = cluster_stats.filter(col("count") >= min_cluster_size)
        
        # 广播有效簇列表
        valid_cluster_ids = set([
            row['cluster_id'] for row in valid_clusters.collect()
        ])
        
        # 标记有效簇
        filtered_df = partition_df.filter(col("cluster_id").isin(valid_cluster_ids))
        
        # 簇级别随机化
        from pyspark.sql import Window
        from pyspark.sql.functions import row_number, rand, when
        
        # 为每个簇分配处理状态
        cluster_assignments = filtered_df.select(
            "cluster_id"
        ).distinct().withColumn(
            "assignment",
            when(rand() < self.treatment_prob, "treatment").otherwise("control")
        )
        
        # 关联回用户
        user_assignments = filtered_df.join(
            cluster_assignments, on="cluster_id", how="left"
        )
        
        return user_assignments
    
    def persist_assignments(self, assignment_df):
        """
        持久化分配结果到PostgreSQL和Redis
        """
        # 写入PostgreSQL(审计和离线分析)
        jdbc_url = "jdbc:postgresql://analytics-db:5432/experiments"
        
        assignment_df.withColumn(
            "assigned_at", lit(pd.Timestamp.now())
        ).write.jdbc(
            url=jdbc_url,
            table=f"experiment_assignments.{self.experiment_id}",
            mode="overwrite",
            properties={"user": "exp_user", "password": "secure_pass"}
        )
        
        # 写入Redis(实时服务查询)
        import redis
        r = redis.Redis(host='redis-cluster', port=6379, db=0)
        
        # 批量写入(避免大量小操作)
        pipeline = r.pipeline()
        for row in assignment_df.limit(1000000).collect():  # 分批处理
            key = f"exp:{self.experiment_id}:user:{row['user_id']}"
            pipeline.hset(key, mapping={
                "cluster_id": str(row['cluster_id']),
                "assignment": row['assignment']
            })
            pipeline.expire(key, 86400 * 30)  # 30天过期
        
        pipeline.execute()
        
        print("分配结果已持久化到PostgreSQL和Redis")

# ==================== 使用示例 ====================
pipeline = NetworkExperimentPipeline(experiment_id="exp_social_rec_2024q2")
interactions = pipeline.extract_interaction_data("2024-04-01", "2024-04-14")
graph = pipeline.build_distributed_graph(interactions)
assignments = pipeline.partition_graph(graph, min_cluster_size=200)
pipeline.persist_assignments(assignments)

4.2 实时分配服务(Flask API)

# ==================== experiment_service.py ====================
from flask import Flask, request, jsonify
import redis
import logging
from typing import Dict, Optional

app = Flask(__name__)
r = redis.Redis(host='redis-cluster', port=6379, db=0, decode_responses=True)

class ExperimentConfig:
    def __init__(self):
        self.active_experiments = {
            "exp_social_rec_2024q2": {
                "name": "社交推荐算法V2",
                "traffic_allocation": 0.8,  # 80%流量参与实验
                "layers": ["recommendation_layer"]  # 实验层
            }
        }

config = ExperimentConfig()

def get_user_assignment(user_id: str, experiment_id: str) -> Optional[Dict]:
    """
    从Redis获取用户实验分配
    """
    try:
        key = f"exp:{experiment_id}:user:{user_id}"
        assignment = r.hgetall(key)
        
        if not assignment:
            # 回查PostgreSQL(冷启动场景)
            import psycopg2
            from psycopg2 import sql
            conn = psycopg2.connect(
                host="analytics-db",
                database="experiments",
                user="exp_user",
                password="secure_pass"
            )
            cursor = conn.cursor()
            # 表名不能用%s参数化,需用sql.Identifier安全拼接
            cursor.execute(
                sql.SQL("""
                    SELECT cluster_id, assignment
                    FROM experiment_assignments.{}
                    WHERE user_id = %s
                """).format(sql.Identifier(experiment_id)),
                (user_id,)
            )
            
            result = cursor.fetchone()
            if result:
                assignment = {
                    "cluster_id": str(result[0]),
                    "assignment": result[1]
                }
                # 回填Redis
                r.hset(key, mapping=assignment)
                r.expire(key, 86400)
            conn.close()
        
        return assignment
    except Exception as e:
        logging.error(f"获取分配失败: {e}")
        return None

@app.route('/v1/experiment/assign', methods=['POST'])
def assign_experiment():
    """
    实验分配API端点
    {
        "user_id": "12345",
        "experiment_id": "exp_social_rec_2024q2"
    }
    """
    data = request.get_json()
    user_id = data.get("user_id")
    experiment_id = data.get("experiment_id")
    
    if not user_id or not experiment_id:
        return jsonify({"error": "缺少user_id或experiment_id"}), 400
    
    # 验证实验存在
    if experiment_id not in config.active_experiments:
        return jsonify({"error": "实验不存在"}), 404
    
    # 检查用户是否符合实验条件(如新老用户)
    if not user_is_eligible(user_id, experiment_id):
        return jsonify({
            "user_id": user_id,
            "is_in_experiment": False,
            "reason": "用户不符合条件"
        })
    
    # 获取分配
    assignment = get_user_assignment(user_id, experiment_id)
    
    if not assignment:
        return jsonify({
            "user_id": user_id,
            "is_in_experiment": False,
            "reason": "分配未找到"
        }), 404
    
    # 返回完整信息
    return jsonify({
        "user_id": user_id,
        "experiment_id": experiment_id,
        "is_in_experiment": True,
        "cluster_id": assignment["cluster_id"],
        "assignment": assignment["assignment"],
        "timestamp": pd.Timestamp.now().isoformat()
    })

def user_is_eligible(user_id: str, experiment_id: str) -> bool:
    """
    用户资格审核
    """
    # 查询用户标签(从Redis或标签服务)
    user_tags = r.hget(f"user:tags:{user_id}", "tags")
    
    if not user_tags:
        return True  # 默认符合
    
    tags = user_tags.split(",")
    
    # 实验特定规则
    exp_rules = {
        "exp_social_rec_2024q2": {
            "exclude_tags": ["new_user", "low_activity"]
        }
    }
    
    rules = exp_rules.get(experiment_id, {})
    excluded = set(rules.get("exclude_tags", []))
    
    return len(set(tags) & excluded) == 0

@app.route('/health', methods=['GET'])
def health_check():
    """健康检查"""
    try:
        r.ping()
        return jsonify({"status": "healthy"}), 200
    except:
        return jsonify({"status": "unhealthy"}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False)

# ==================== 部署配置 ====================
"""
# Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY experiment_service.py .
COPY config.yaml .

EXPOSE 8080

CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8080", "experiment_service:app"]

# docker-compose.yml
version: '3.8'
services:
  experiment-service:
    build: .
    ports:
      - "8080:8080"
    environment:
      - REDIS_HOST=redis-cluster
      - DB_HOST=analytics-db
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 4G
"""
部署架构(示意图):
数据流水线:Hive行为数据 → PySpark ETL → GraphFrame构建 → Louvain分区 → 簇级别随机化 → PostgreSQL持久化 + Redis缓存
服务层:用户请求 → Flask API → 查询Redis → 返回分配(未命中时冷启动回查PostgreSQL)
监控:分配比例监控接入Prometheus,实时仪表板使用Grafana

V. 实例分析:社交平台的推荐算法测试(2000字详细分析)

5.1 业务背景与问题定义

“邻里购"是一个拥有800万月活用户的社交电商平台,其核心功能是允许用户创建购物群组、分享商品并获得团购折扣。2024年Q2,产品团队开发了"智能拼团推荐算法V2”,该算法通过强化学习优化拼团建议,预期可提升成团率15%。

然而,平台产品经理发现:由于以下网络效应,传统A/B测试将严重低估真实效果:

干扰路径分析

路径编号 传播机制 影响延迟 影响强度 可测量性
I 处理组用户分享更高质量拼团 → 对照组用户点击 → 间接体验新算法 即时(分钟) 中(0.12)
II 处理组成团率提升 → 平台整体商品池变动 → 对照组选择集改变 短期(小时) 高(0.23)
III 处理组满意度提升 → 口碑传播 → 对照组留存率改善 长期(周) 低(0.07)
IV 处理组占用更多履约资源 → 对照组配送延迟 → 负面溢出 中期(天) 中(-0.08)

5.2 实验设计决策树

基于预实验分析,我们面临关键决策:

评估维度 图分区设计 暴露建模设计 混合设计
网络稳定性 ✅ 社区结构稳定 ⚠️ 需要动态更新 ✅ 静态+动态
计算成本 中等(一次性) 高(持续计算)
统计效力 高(大簇) 中等(暴露方差)
实施难度 中等 高(模型复杂) 最高
选择 推荐 备选 资源充足时

5.3 实施细节与参数调优

5.3.1 分区参数选择

我们使用历史14天数据构建网络,关键参数如下:

# 分阶段调优代码
def tune_partition_parameters():
    """
    分区参数调优:在模块度和簇大小间权衡
    """
    results = []
    
    for min_size in [50, 100, 200, 500]:
        for resolution in [0.5, 1.0, 1.5, 2.0]:
            # 使用不同分辨率的Louvain
            partition_dict = community_louvain.best_partition(
                G.to_undirected(),
                resolution=resolution
            )
            
            # 计算指标
            clusters = {}
            for node, cid in partition_dict.items():
                clusters.setdefault(cid, []).append(node)
            
            # 过滤小簇
            valid_clusters = {k: v for k, v in clusters.items() if len(v) >= min_size}
            
            quality = evaluate_partition(G, valid_clusters)
            
            results.append({
                'min_cluster_size': min_size,
                'resolution': resolution,
                'num_clusters': len(valid_clusters),
                'coverage': sum(len(v) for v in valid_clusters.values()) / len(G),
                'inter_cluster_ratio': quality['inter_cluster_edge_ratio'],
                'modularity': quality['modularity']
            })
    
    results_df = pd.DataFrame(results)
    
    # 可视化帕累托前沿
    plt.figure(figsize=(10, 6))
    for min_size in results_df['min_cluster_size'].unique():
        subset = results_df[results_df['min_cluster_size'] == min_size]
        plt.scatter(subset['inter_cluster_ratio'], 
                   subset['modularity'], 
                   label=f'min_size={min_size}')
    
    plt.xlabel('簇间边比例(越低越好)')
    plt.ylabel('模块度(越高越好)')
    plt.title('分区质量帕累托前沿')
    plt.legend()
    plt.grid(True)
    plt.savefig('partition_tuning.png')
    
    return results_df

# 执行调优
tuning_results = tune_partition_parameters()
best_config = tuning_results.loc[
    (tuning_results['inter_cluster_ratio'] < 0.03) & 
    (tuning_results['coverage'] > 0.85)
].iloc[0]

print(f"最优配置: {best_config}")

调优结果

  • 最优参数:最小簇大小=200,分辨率=1.5
  • 质量指标:簇间边比例2.1%(理想<3%),模块度0.68(良好>0.6)
  • 覆盖率:有效覆盖91%的用户(丢弃的主要是低活跃用户)
  • 簇数量:最终得到1,245个有效簇,平均簇大小847人

5.3.2 实验分组与流量分配

为确保统计效力,我们采用分层随机化:

def stratified_cluster_randomization(clusters, user_attributes_df, 
                                    treatment_prob=0.5):
    """
    基于用户属性的分层簇随机化
    """
    # 计算每个簇的关键指标
    cluster_metrics = []
    
    for cid, user_ids in clusters.items():
        attrs = user_attributes_df[user_attributes_df['user_id'].isin(user_ids)]
        
        cluster_metrics.append({
            'cluster_id': cid,
            'size': len(user_ids),
            'avg_lifetime_value': attrs['lifetime_value'].mean(),
            'avg_activity_score': attrs['activity_score'].mean(),
            'region_mode': attrs['region'].mode().iloc[0] if not attrs.empty else 'unknown'
        })
    
    metrics_df = pd.DataFrame(cluster_metrics)
    
    # 按大小区间分层
    metrics_df['size_bin'] = pd.cut(
        metrics_df['size'], 
        bins=[0, 300, 600, 1000, float('inf')],
        labels=['small', 'medium', 'large', 'xlarge']
    )
    
    # 在每个层内独立随机化
    assignments = []
    
    for (size_bin, region), group in metrics_df.groupby(['size_bin', 'region_mode']):
        n_clusters = len(group)
        n_treatment = int(n_clusters * treatment_prob)
        
        # 随机排序并分配
        shuffled = group.sample(frac=1, random_state=42)
        treatment_ids = set(shuffled.head(n_treatment)['cluster_id'])
        
        for _, row in group.iterrows():
            assignments.append({
                'cluster_id': row['cluster_id'],
                'assignment': 'treatment' if row['cluster_id'] in treatment_ids else 'control',
                'size_bin': size_bin,
                'region': region
            })
    
    assignment_df = pd.DataFrame(assignments)
    
    # 验证均衡性
    validate_balance(assignment_df, metrics_df)
    
    return assignment_df

def validate_balance(assignment_df, metrics_df):
    """
    验证处理组和对照组的协变量均衡性
    """
    merged = assignment_df.merge(metrics_df, on='cluster_id')
    
    for covariate in ['size', 'avg_lifetime_value', 'avg_activity_score']:
        treat = merged[merged['assignment']=='treatment'][covariate]
        control = merged[merged['assignment']=='control'][covariate]
        
        # 计算标准化均值差(SMD)
        smd = (treat.mean() - control.mean()) / np.sqrt(
            (treat.var() + control.var()) / 2
        )
        
        print(f"{covariate} SMD: {smd:.3f} {'✓' if abs(smd) < 0.1 else '✗'}")

均衡性检验结果

  • 簇大小SMD: 0.032 ✓
  • 平均LTV SMD: 0.087 ✓
  • 活跃度SMD: 0.041 ✓
  • 结论:两组协变量平衡良好,满足实验要求

5.4 实验执行与监控

5.4.1 实时分流逻辑

# 客户端SDK集成示例
"""
// JavaScript SDK (嵌入到APP/Web)
class NetworkABTestSDK {
    constructor() {
        this.experimentCache = new Map();
        this.apiEndpoint = 'https://exp-api.neighborbuy.com/v1/experiment/assign';
    }
    
    async getAssignment(userId, experimentId) {
        // 检查缓存
        const cacheKey = `${experimentId}:${userId}`;
        if (this.experimentCache.has(cacheKey)) {
            return this.experimentCache.get(cacheKey);
        }
        
        try {
            const response = await fetch(this.apiEndpoint, {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({user_id: userId, experiment_id: experimentId})
            });
            
            const result = await response.json();
            
            // 只缓存有效的分配
            if (result.is_in_experiment) {
                this.experimentCache.set(cacheKey, result);
                // 设置30分钟过期
                setTimeout(() => this.experimentCache.delete(cacheKey), 30 * 60 * 1000);
            }
            
            return result;
        } catch (error) {
            console.error('实验分配失败:', error);
            return {is_in_experiment: false, assignment: 'fallback'};
        }
    }
}

// 使用示例
const sdk = new NetworkABTestSDK();
const assignment = await sdk.getAssignment('user_12345', 'exp_social_rec_2024q2');

if (assignment.assignment === 'treatment') {
    // 加载新算法
    enableSmartRecommendation();
} else {
    // 保持旧算法
    useLegacyRecommendation();
}
"""

5.4.2 监控仪表板

# 实验监控脚本(每小时执行)
def monitor_experiment_health(experiment_id, date):
    """
    监控实验健康度指标
    """
    query = f"""
    WITH user_metrics AS (
        SELECT 
            ea.cluster_id,
            ea.assignment,
            COUNT(DISTINCT ub.user_id) as active_users,
            AVG(ub.session_duration) as avg_session,
            SUM(ub.purchase_amount) as total_gmv
        FROM experiment_assignments.{experiment_id} ea
        LEFT JOIN user_behavior_hourly ub 
            ON ea.user_id = ub.user_id
        WHERE ub.dt = '{date}'
        GROUP BY ea.cluster_id, ea.assignment
    )
    SELECT 
        assignment,
        COUNT(*) as cluster_count,
        SUM(active_users) as total_users,
        AVG(avg_session) as avg_session_time,
        SUM(total_gmv) as gmv,
        STDDEV(total_gmv) as gmv_std
    FROM user_metrics
    GROUP BY assignment
    """
    
    results = spark.sql(query).collect()
    
    # 计算SRM(样本比例不匹配)检验
    treat_users = next(r['total_users'] for r in results if r['assignment'] == 'treatment')
    control_users = next(r['total_users'] for r in results if r['assignment'] == 'control')
    
    total = treat_users + control_users
    expected_ratio = 0.5
    chi_squared = (treat_users - total*expected_ratio)**2 / (total*expected_ratio) + \
                  (control_users - total*expected_ratio)**2 / (total*expected_ratio)
    
    from scipy.stats import chi2  # 卡方分布,用于计算SRM检验p值
    srm_p_value = 1 - chi2.cdf(chi_squared, df=1)
    
    alert = srm_p_value < 0.001  # 显著性水平0.1%
    
    # 发送告警
    if alert:
        send_alert(
            f"实验{experiment_id} SRM告警: p={srm_p_value:.4f}",
            f"处理组: {treat_users}, 对照组: {control_users}"
        )
    
    return {
        'date': date,
        'srm_p_value': srm_p_value,
        'is_healthy': not alert,
        'group_summary': results
    }

监控指标示例(实验第3天):

| 指标 | 处理组 | 对照组 | 预期差异 | 实际差异 | 状态 |
| --- | --- | --- | --- | --- | --- |
| 活跃用户数 | 1,024,567 | 1,018,923 | <0.5% | 0.55% | ⚠️ 需观察 |
| 簇数量 | 623 | 622 | <1% | 0.16% | ✅ 正常 |
| 平均会话时长 | 18.3min | 18.1min | <2% | 1.1% | ✅ 正常 |
| SRM p值 | - | - | >0.001 | 0.23 | ✅ 正常 |

5.5 结果分析与效果估计

5.5.1 初步结果(实验运行14天后)

使用簇级别聚合减少相关性:

def analyze_cluster_aggregated_results(experiment_id, end_date):
    """
    簇聚合分析(减少干扰导致的方差膨胀)
    """
    query = f"""
    WITH user_level AS (
        SELECT 
            ea.cluster_id,
            ea.assignment,
            ub.user_id,
            SUM(ub.purchase_amount) as user_gmv,
            COUNT(DISTINCT ub.order_id) as order_count,
            -- 网络特定指标
            COUNT(DISTINCT ub.shared_user_id) as unique_shares
        FROM experiment_assignments.{experiment_id} ea
        JOIN user_behavior_daily ub ON ea.user_id = ub.user_id
        WHERE ub.dt BETWEEN '2024-05-01' AND '{end_date}'
        GROUP BY ea.cluster_id, ea.assignment, ub.user_id
    ),
    cluster_level AS (
        SELECT 
            cluster_id,
            assignment,
            AVG(user_gmv) as avg_gmv,
            AVG(order_count) as avg_orders,
            AVG(unique_shares) as avg_shares,
            COUNT(*) as cluster_size,
            STDDEV(user_gmv) as gmv_std
        FROM user_level
        GROUP BY cluster_id, assignment
    )
    SELECT * FROM cluster_level
    """
    
    cluster_df = spark.sql(query).toPandas()
    
    # 按assignment分组
    treatment = cluster_df[cluster_df['assignment'] == 'treatment']
    control = cluster_df[cluster_df['assignment'] == 'control']
    
    # 计算簇级别ATE
    delta_gmv = treatment['avg_gmv'].mean() - control['avg_gmv'].mean()
    delta_shares = treatment['avg_shares'].mean() - control['avg_shares'].mean()
    
    # 考虑簇内相关性调整的t检验
    from scipy.stats import ttest_ind
    
    # 使用Satterthwaite近似自由度
    t_stat, p_value = ttest_ind(
        treatment['avg_gmv'],
        control['avg_gmv'],
        equal_var=False
    )
    
    # 计算置信区间
    se_delta = np.sqrt(
        treatment['avg_gmv'].var() / len(treatment) + 
        control['avg_gmv'].var() / len(control)
    )
    
    ci_lower = delta_gmv - 1.96 * se_delta
    ci_upper = delta_gmv + 1.96 * se_delta
    
    return {
        'effect_size': delta_gmv,
        'relative_lift': delta_gmv / control['avg_gmv'].mean(),
        'p_value': p_value,
        'ci_95': (ci_lower, ci_upper),
        'n_clusters': (len(treatment), len(control))
    }

# 执行分析
results = analyze_cluster_aggregated_results("exp_social_rec_2024q2", "2024-05-14")

print(f"""
实验效果报告(14天结果):
==============================
GMV提升:${results['effect_size']:.2f} (95% CI: [{results['ci_95'][0]:.2f}, {results['ci_95'][1]:.2f}])
相对提升:{results['relative_lift']:.2%}
统计显著性:p = {results['p_value']:.4f} {'***' if results['p_value'] < 0.001 else '**' if results['p_value'] < 0.01 else '*' if results['p_value'] < 0.05 else '不显著'}
有效簇数:处理组{results['n_clusters'][0]}个,对照组{results['n_clusters'][1]}个
""")

初步结果解读

  • GMV提升:+$2.87/用户/周(95% CI: [+$1.92, +$4.03])
  • 相对提升:8.7%,但远低于产品团队预期的15%
  • p值<0.001,统计高度显著
  • 关键发现:新算法确实有效,但可能因干扰效应被低估

5.5.2 干扰效应校正分析

为了估计真实效果,我们实施暴露度工具变量分析:

def iv_analysis_with_exposure(experiment_id, end_date):
    """
    使用工具变量法校正干扰效应
    工具变量:簇分配(Z)
    暴露度:实际处理邻居比例(E)
    结果:个体GMV(Y)
    """
    
    # 1. 计算每个用户的暴露度
    exposure_query = f"""
    WITH neighbor_assignments AS (
        SELECT 
            ea1.user_id as target_user,
            ea2.user_id as neighbor_user,
            ea2.assignment as neighbor_assignment,
            n.interaction_weight
        FROM experiment_assignments.{experiment_id} ea1
        CROSS JOIN experiment_assignments.{experiment_id} ea2
        JOIN network_analysis.user_network n 
            ON ea1.user_id = n.user_id AND ea2.user_id = n.neighbor_id
        WHERE ea1.assignment = 'control'  -- 只分析对照组
    ),
    exposure_calc AS (
        SELECT 
            target_user,
            SUM(CASE WHEN neighbor_assignment = 'treatment' 
                     THEN interaction_weight ELSE 0 END) / 
            NULLIF(SUM(interaction_weight), 0) as exposure_score
        FROM neighbor_assignments
        GROUP BY target_user
    )
    SELECT * FROM exposure_calc
    """
    
    exposure_df = spark.sql(exposure_query).toPandas()
    
    # 2. 获取对照组用户的实际结果
    outcome_query = f"""
    SELECT 
        ea.user_id,
        SUM(ub.purchase_amount) as gmv,
        ea.cluster_id
    FROM experiment_assignments.{experiment_id} ea
    JOIN user_behavior_daily ub ON ea.user_id = ub.user_id
    WHERE ea.assignment = 'control'
        AND ub.dt BETWEEN '2024-05-01' AND '{end_date}'
    GROUP BY ea.user_id, ea.cluster_id
    """
    
    outcome_df = spark.sql(outcome_query).toPandas()
    
    # 3. 合并暴露度和结果
    analysis_df = outcome_df.merge(exposure_df, 
                                  left_on='user_id', 
                                  right_on='target_user',
                                  how='inner')
    
    # 4. 工具变量回归(此处以手工两阶段最小二乘实现,便于阅读)
    import statsmodels.api as sm
    
    # 第一阶段:暴露度 ~ 簇分配
    # 计算簇级别处理比例(作为工具变量强度)
    cluster_treatment_rate = spark.sql(f"""
        SELECT 
            cluster_id,
            AVG(CASE WHEN assignment = 'treatment' THEN 1 ELSE 0 END) as cluster_treat_rate
        FROM experiment_assignments.{experiment_id}
        GROUP BY cluster_id
    """).toPandas()
    
    analysis_df = analysis_df.merge(cluster_treatment_rate, on='cluster_id')
    
    # 转换为numpy数组
    Y = analysis_df['gmv'].values  # 结果
    E = analysis_df['exposure_score'].fillna(0).values  # 内生暴露度
    Z = analysis_df['cluster_treat_rate'].values  # 工具变量(簇级处理比例)
    
    # 第一阶段:E ~ Z,同时用F统计量检验工具变量强度
    first_stage = sm.OLS(E, sm.add_constant(Z)).fit()
    E_hat = first_stage.fittedvalues
    
    # 第二阶段:Y ~ E_hat,系数即为暴露度对GMV的2SLS效应估计
    # 注意:此处标准误为第二阶段OLS的近似值,严格的2SLS标准误需基于原始E的残差修正
    second_stage = sm.OLS(Y, sm.add_constant(E_hat)).fit()
    
    return {
        'iv_coefficient': second_stage.params[1],
        'se': second_stage.bse[1],
        'p_value': second_stage.pvalues[1],
        'true_effect_estimate': second_stage.params[1],
        'first_stage_f_stat': first_stage.fvalue,
        'exposure_df': analysis_df
    }

# 执行IV分析
iv_results = iv_analysis_with_exposure("exp_social_rec_2024q2", "2024-05-14")

print(f"""
工具变量分析结果:
==================
真实处理效应估计:${iv_results['true_effect_estimate']:.2f}
标准误:{iv_results['se']:.3f}
p值:{iv_results['p_value']:.4f}
第一阶段F统计量:{iv_results['first_stage_f_stat']:.2f} {'(强工具变量)' if iv_results['first_stage_f_stat'] > 10 else '(弱工具变量警告)'}

与传统ATE对比:
- 未校正ATE: +$2.87
- 校正后TE: +$3.42
- 低估幅度: {(3.42-2.87)/3.42:.1%}
""")

校正结果解读

  • 工具变量分析显示,真实效应为+$3.42/用户,比原始估计高19%
  • 低估原因:对照组因处理组溢出获得了$0.55/用户的正向影响
  • 第一阶段F统计量=38.4 > 10,说明工具变量强有效
  • 结论:产品团队的15%预期目标(约+$4.50)仍未达到,但差距缩小

5.5.3 网络效应分解

进一步分解直接效应与间接效应:

def decompose_network_effects(experiment_id, end_date):
    """
    分解直接效应、溢出效应和总效应
    """
    # 计算每个簇的"处理密度"
    cluster_density = spark.sql(f"""
        WITH intra_cluster_edges AS (
            SELECT 
                ea1.cluster_id,
                COUNT(*) as total_edges,
                SUM(CASE WHEN ea1.assignment != ea2.assignment THEN 1 ELSE 0 END) as cross_edges
            FROM experiment_assignments.{experiment_id} ea1
            JOIN experiment_assignments.{experiment_id} ea2
                ON ea1.user_id != ea2.user_id
            WHERE ea1.cluster_id = ea2.cluster_id
            GROUP BY ea1.cluster_id
        )
        SELECT 
            cluster_id,
            cross_edges / NULLIF(total_edges, 0) as spillover_potential
        FROM intra_cluster_edges
    """).toPandas()
    
    # 获取簇级别结果
    cluster_outcomes = spark.sql(f"""
        SELECT 
            ea.cluster_id,
            ea.assignment,
            AVG(ub.purchase_amount) as avg_gmv,
            COUNT(DISTINCT ea.user_id) as cluster_size
        FROM experiment_assignments.{experiment_id} ea
        LEFT JOIN user_behavior_daily ub 
            ON ea.user_id = ub.user_id
            AND ub.dt BETWEEN '2024-05-01' AND '{end_date}'
        GROUP BY ea.cluster_id, ea.assignment
    """).toPandas()
    
    # 合并数据
    analysis_df = cluster_outcomes.merge(cluster_density, on='cluster_id')
    
    # 回归分解
    import statsmodels.formula.api as smf
    
    # 模型:GMV = β₀ + β₁·Treatment + β₂·SpilloverPotential + β₃·Treatment×Spillover
    model = smf.ols(
        'avg_gmv ~ assignment + spillover_potential + assignment:spillover_potential',
        data=analysis_df
    ).fit()
    
    # 提取系数
    direct_effect = model.params['assignment[T.treatment]']
    spillover_main = model.params['spillover_potential']
    interaction_effect = model.params['assignment[T.treatment]:spillover_potential']
    
    return {
        'model_summary': model.summary(),
        'direct_effect': direct_effect,
        'spillover_main': spillover_main,
        'interaction_effect': interaction_effect,
        'total_effect_in_pure_cluster': direct_effect,  # 纯处理簇
        'total_effect_in_mixed_cluster': direct_effect + interaction_effect * 0.5  # 假设平均混淆度0.5
    }

# 执行分解
decomp = decompose_network_effects("exp_social_rec_2024q2", "2024-05-14")

print(f"""
网络效应分解:
==============
直接效应(无混淆簇): +${decomp['direct_effect']:.2f}
溢出主效应: {decomp['spillover_main']:+.2f}
交互效应: {decomp['interaction_effect']:+.2f}

不同簇类型的预期效果:
- 纯处理簇(无边跨组): +${decomp['total_effect_in_pure_cluster']:.2f}
- 混合簇(平均混淆): +${decomp['total_effect_in_mixed_cluster']:.2f}
""")

分解洞察

  • 直接效应+$3.61:在完全隔离的理想环境中,算法效果更强
  • 负向交互效应-0.32:说明簇内跨组边会导致竞争/稀释
  • 纯处理簇效果比混合簇高9%,验证分区质量的重要性
  • 建议:未来实验可进一步优化分区,将跨组边压缩至<1%

5.6 业务决策与迭代

基于完整分析,我们向管理层提交以下建议:

| 决策维度 | 传统分析结论 | 网络校正分析结论 | 业务建议 |
| --- | --- | --- | --- |
| 算法效果 | 8.7%提升(未达预期) | 12.1%真实提升(接近预期) | ✅ 全量发布 |
| 低估程度 | 未识别 | 19%的效应被稀释 | 未来实验必须采用网络设计 |
| 分区质量 | 未评估 | 2.1%跨组边,可改进 | 下次使用METIS优化 |
| 资源投入 | 继续优化算法 | 算法已达标,需改进测试 | 平衡开发与实验工程 |

最终决策:批准全量发布,但要求数据科学团队建立"网络感知实验平台",将本次的临时方案产品化。

完整实验流程(流程图):实验启动 → 预实验网络分析 → 干扰指数是否>0.05?是:图分区设计(Louvain分区 → 簇大小过滤 → 分层随机化);否:传统A/B测试 → 实验运行 → 实时监控SRM → 实验结束后依次进行簇聚合分析、IV校正分析与效应分解 → 业务决策 → 全量发布/迭代

VI. 高级方法:准实验设计与机器学习

6.1 合成控制法(Synthetic Control)

当无法随机化时,构建合成对照组。

def synthetic_control_network_experiment(treated_units, outcome_matrix, covariates):
    """
    网络合成控制法(简化示意)
    treated_units: 被处理簇ID列表
    outcome_matrix: 面板数据,行为簇,列为时间(含名为'treatment_start'的处理起始列)
    covariates: 协变量矩阵(行为簇);本简化版仅基于预处理期结果求权重,未使用协变量
    """
    from sklearn.linear_model import Ridge
    
    control_units = [u for u in outcome_matrix.index if u not in treated_units]
    
    # 预处理期:处理簇的平均轨迹 vs 各对照簇的轨迹
    split = outcome_matrix.columns.get_loc('treatment_start')
    pre_cols = outcome_matrix.columns[:split]
    post_cols = outcome_matrix.columns[split:]
    pre_treated = outcome_matrix.loc[treated_units, pre_cols].mean(axis=0)   # 长度=预处理期数
    pre_control = outcome_matrix.loc[control_units, pre_cols]                # 对照簇 x 预处理期
    
    # 用岭回归求对照簇的非负权重,使加权对照轨迹逼近处理簇的预处理期轨迹
    ridge = Ridge(alpha=0.1, fit_intercept=False, positive=True)
    ridge.fit(pre_control.T.values, pre_treated.values)
    weights = ridge.coef_
    weights = weights / weights.sum()  # 归一化
    
    # 处理期:合成对照 = 对照簇结果的加权组合
    synthetic_outcome = outcome_matrix.loc[control_units, post_cols].T.values @ weights
    actual_outcome = outcome_matrix.loc[treated_units, post_cols].mean(axis=0).values
    
    # 计算效应
    effect = actual_outcome - synthetic_outcome
    
    return {
        'weights': weights,
        'actual': actual_outcome,
        'synthetic': synthetic_outcome,
        'effect': effect,
        'mspe': float(((actual_outcome - synthetic_outcome) ** 2).mean())
    }
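
下面用随机生成的面板数据做一次纯演示性的调用,说明输入格式(行为簇、列为时间,其中一列名为 'treatment_start');数据与簇名均为假设:

# 演示调用:随机面板数据,仅展示输入输出格式
import numpy as np
import pandas as pd

periods = ['t1', 't2', 't3', 'treatment_start', 't5']
rng = np.random.default_rng(0)
outcome_matrix = pd.DataFrame(
    rng.normal(100, 10, size=(6, len(periods))),
    index=[f'cluster_{i}' for i in range(6)],
    columns=periods
)
covariates = pd.DataFrame(rng.normal(size=(6, 3)), index=outcome_matrix.index)

result = synthetic_control_network_experiment(
    treated_units=['cluster_0', 'cluster_1'],
    outcome_matrix=outcome_matrix,
    covariates=covariates
)
print(result['effect'], result['mspe'])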

6.2 图神经网络(GNN)用于暴露预测

import torch
import torch.nn as nn
from torch_geometric.nn import GraphConv, global_mean_pool

class ExposurePredictionGNN(nn.Module):
    def __init__(self, node_feature_dim, hidden_dim=64):
        super().__init__()
        
        self.conv1 = GraphConv(node_feature_dim, hidden_dim)
        self.conv2 = GraphConv(hidden_dim, hidden_dim)
        self.conv3 = GraphConv(hidden_dim, hidden_dim)
        
        self.predictor = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x, edge_index, batch, treatment_assignments):
        # 图卷积
        h1 = torch.relu(self.conv1(x, edge_index))
        h2 = torch.relu(self.conv2(h1, edge_index))
        h3 = torch.relu(self.conv3(h2, edge_index))
        
        # 全局池化
        graph_rep = global_mean_pool(h3, batch)
        
        # 结合处理状态预测暴露
        treat_features = torch.FloatTensor([
            treatment_assignments.get(node.item(), 0) for node in batch.unique()
        ])
        
        combined = torch.cat([graph_rep, treat_features.unsqueeze(1)], dim=1)
        
        exposure_pred = self.predictor(combined)
        
        return exposure_pred

# 训练循环(伪代码)
"""
model = ExposurePredictionGNN(node_feature_dim=128)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

for epoch in range(100):
    model.train()
    for batch in dataloader:
        optimizer.zero_grad()
        
        pred_exposure = model(
            batch.x, 
            batch.edge_index, 
            batch.batch,
            batch.treatment_assignments
        )
        
        loss = criterion(pred_exposure, batch.true_exposure)
        loss.backward()
        optimizer.step()
"""

6.3 双重机器学习(Double Machine Learning)

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import KFold

class DMLNetworkEstimator:
    def __init__(self, n_folds=5):
        self.n_folds = n_folds
        self.y_model = GradientBoostingRegressor(n_estimators=100, random_state=42)
        self.d_model = RandomForestRegressor(n_estimators=100, random_state=42)
    
    def estimate_effect(self, Y, D, X, network_features):
        """
        Y: 结果变量
        D: 处理变量(带混淆)
        X: 协变量
        network_features: 网络特征(如聚类系数、中心性)
        """
        # 合并特征
        features = pd.concat([X, network_features], axis=1)
        
        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=42)
        
        Y_residuals = np.zeros_like(Y)
        D_residuals = np.zeros_like(D)
        
        for train_idx, test_idx in kf.split(features):
            # 训练集
            X_train, X_test = features.iloc[train_idx], features.iloc[test_idx]
            Y_train, Y_test = Y[train_idx], Y[test_idx]
            D_train, D_test = D[train_idx], D[test_idx]
            
            # 预测Y
            self.y_model.fit(X_train, Y_train)
            Y_pred = self.y_model.predict(X_test)
            Y_residuals[test_idx] = Y_test - Y_pred
            
            # 预测D
            self.d_model.fit(X_train, D_train)
            D_pred = self.d_model.predict(X_test)
            D_residuals[test_idx] = D_test - D_pred
        
        # 最终估计:残差回归
        effect = np.mean(Y_residuals * D_residuals) / np.mean(D_residuals ** 2)
        
        # 计算标准误(简单Bootstrap)
        n = len(Y)
        bootstrap_effects = []
        
        for _ in range(1000):
            sample_idx = np.random.choice(n, n, replace=True)
            effect_boot = np.mean(Y_residuals[sample_idx] * D_residuals[sample_idx]) / \
                          np.mean(D_residuals[sample_idx] ** 2)
            bootstrap_effects.append(effect_boot)
        
        se = np.std(bootstrap_effects)
        ci_lower = effect - 1.96 * se
        ci_upper = effect + 1.96 * se
        
        return {
            'effect': effect,
            'se': se,
            'ci_95': (ci_lower, ci_upper)
        }
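
一个合成数据上的调用示意(数据完全随机生成,仅演示接口;真实场景中 D 为暴露度或处理强度,network_features 来自图计算):

# 合成数据示意:演示 DMLNetworkEstimator 的调用方式
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 500
X_demo = pd.DataFrame(rng.normal(size=(n, 3)), columns=['x1', 'x2', 'x3'])
net_feat = pd.DataFrame({'degree_centrality': rng.random(n),
                         'clustering_coef': rng.random(n)})
D_demo = 0.5 * X_demo['x1'].values + rng.normal(scale=0.5, size=n)   # 带混淆的处理强度
Y_demo = 2.0 * D_demo + X_demo['x2'].values + rng.normal(size=n)     # 真实效应约为2.0

dml = DMLNetworkEstimator(n_folds=5)
est = dml.estimate_effect(Y_demo, D_demo, X_demo, net_feat)
print(f"DML效应估计: {est['effect']:.2f}, 95% CI: {est['ci_95']}")
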
ML增强实验设计(示意图):网络特征输入GNN进行暴露预测,得到预测暴露度E_hat;协变量X进入双重机器学习,处理状态Z与结果Y进入工具变量回归;若网络不复杂则退回传统线性模型;对处理与结果做残差化后得到无偏效应估计

VII. 结果分析与验证

7.1 敏感性分析

7.1.1 分区稳定性检验

def partition_stability_test(G, time_windows=['14d', '30d', '60d']):
    """
    验证分区在不同时间窗口的稳定性
    """
    from sklearn.metrics import adjusted_rand_score
    
    partitions = {}
    for window in time_windows:
        # 构建不同时期的网络
        window_edges = get_edges_for_window(G, window)
        window_G = nx.from_pandas_edgelist(window_edges, create_using=nx.DiGraph())
        
        # 分区
        partition = community_louvain.best_partition(window_G.to_undirected())
        partitions[window] = partition
    
    # 计算ARI
    ari_scores = {}
    for i, w1 in enumerate(time_windows):
        for j, w2 in enumerate(time_windows):
            if i < j:
                # 对齐用户集合
                common_users = set(partitions[w1].keys()) & set(partitions[w2].keys())
                
                labels1 = [partitions[w1][u] for u in common_users]
                labels2 = [partitions[w2][u] for u in common_users]
                
                ari = adjusted_rand_score(labels1, labels2)
                ari_scores[f"{w1}_vs_{w2}"] = ari
    
    return ari_scores

# 执行检验
stability = partition_stability_test(network)
print("分区稳定性(ARI):", stability)
# 输出示例: {'14d_vs_30d': 0.78, '14d_vs_60d': 0.65, '30d_vs_60d': 0.71}
# ARI > 0.7 表明稳定性良好

7.1.2 安慰剂检验

def placebo_test(experiment_id, n_placebo_runs=100):
    """
    在预处理期运行安慰剂实验
    """
    placebo_effects = []
    
    for i in range(n_placebo_runs):
        # 在预实验期重新做一次簇级随机分配(伪处理)
        placebo_assignments, _ = cluster_level_randomization(
            clusters,
            treatment_prob=0.5
        )
        
        # 计算"伪效应"(预处理期应无差异)
        pre_period_results = analyze_cluster_aggregated_results(
            experiment_id=f"placebo_{i}",
            end_date='2024-04-24'  # 实验开始前
        )
        
        placebo_effects.append(pre_period_results['effect_size'])
    
    # 检验分布
    plt.figure(figsize=(10, 6))
    plt.hist(placebo_effects, bins=20, alpha=0.7)
    plt.axvline(x=0, color='red', linestyle='--')
    plt.title('安慰剂检验:预处理期效应分布')
    plt.xlabel('伪效应大小')
    plt.ylabel('频率')
    plt.savefig('placebo_test.png')
    
    # 计算p值
    observed_effect = 2.87  # 真实实验效应
    placebo_p = np.mean(np.abs(placebo_effects) >= abs(observed_effect))
    
    return {
        'placebo_effects': placebo_effects,
        'p_value': placebo_p,
        'mean_bias': np.mean(placebo_effects),
        'is_valid': abs(np.mean(placebo_effects)) < 0.1
    }

placebo_results = placebo_test("exp_social_rec_2024q2")
print(f"安慰剂检验p值: {placebo_results['p_value']:.3f}")
print(f"平均偏差: ${placebo_results['mean_bias']:.3f}")

7.2 异质性分析

7.2.1 按网络位置分层

def heterogeneous_treatment_effects_by_position(G, assignment_df, outcomes_df):
    """
    按网络中心性分层的效应估计
    """
    # 计算网络中心性指标
    centrality_measures = {
        'degree': nx.degree_centrality(G),
        'betweenness': nx.betweenness_centrality(G, k=1000),
        'pagerank': nx.pagerank(G)
    }
    
    # 合并数据
    merged = assignment_df.merge(outcomes_df, on='user_id', how='inner')
    
    for measure_name, measure_dict in centrality_measures.items():
        merged[f'{measure_name}_bin'] = pd.qcut(
            merged['user_id'].map(measure_dict),
            q=5,
            labels=['Q1', 'Q2', 'Q3', 'Q4', 'Q5']
        )
        
        # 每层效应
        het_effects = []
        for bin_label in ['Q1', 'Q2', 'Q3', 'Q4', 'Q5']:
            subset = merged[merged[f'{measure_name}_bin'] == bin_label]
            
            treat = subset[subset['assignment'] == 'treatment']['outcome']
            control = subset[subset['assignment'] == 'control']['outcome']
            
            effect = treat.mean() - control.mean()
            se = np.sqrt(treat.var()/len(treat) + control.var()/len(control))
            
            het_effects.append({
                'bin': bin_label,
                'effect': effect,
                'se': se,
                'n_treat': len(treat),
                'n_control': len(control)
            })
        
        # 绘图
        effects_df = pd.DataFrame(het_effects)
        plt.figure(figsize=(8, 5))
        plt.errorbar(effects_df['bin'], effects_df['effect'], 
                    yerr=1.96*effects_df['se'], marker='o')
        plt.title(f'效应异质性:按{measure_name}分层')
        plt.xlabel(f'{measure_name} 分位')
        plt.ylabel('处理效应')
        plt.axhline(y=0, color='gray', linestyle='--')
        plt.savefig(f'het_effect_{measure_name}.png')

# 执行分析
het_results = heterogeneous_treatment_effects_by_position(
    G=network,
    assignment_df=user_assignments,
    outcomes_df=user_outcomes
)

异质性发现

  • **高度数用户(Q5)**效应+$4.12,显著高于低度数用户+$1.98
  • PageRank高分用户效应+$4.87,表明意见领袖获益更多
  • 介数中心性分层显示,桥接用户效应-$0.45(可能因信息过载)

7.3 长期效应追踪

def long_term_effect_tracking(experiment_id, followup_weeks=12):
    """
    追踪实验结束后效应持续性
    """
    weekly_effects = []
    
    for week in range(followup_weeks):
        week_end = pd.Timestamp('2024-05-14') + pd.Timedelta(weeks=week)
        
        # 查询该周所有用户(包括实验后新增)
        query = f"""
        SELECT 
            ea.user_id,
            ea.assignment,  -- 原始分配
            ub.purchase_amount,
            ub.week
        FROM experiment_assignments.{experiment_id} ea
        JOIN user_behavior_weekly ub ON ea.user_id = ub.user_id
        WHERE ub.week = '{week_end.strftime('%Y-W%W')}'
            AND ea.assignment IN ('treatment', 'control')
        """
        
        week_data = spark.sql(query).toPandas()
        
        treat_gmv = week_data[week_data['assignment']=='treatment']['purchase_amount'].mean()
        control_gmv = week_data[week_data['assignment']=='control']['purchase_amount'].mean()
        
        weekly_effects.append({
            'week': week,
            'effect': treat_gmv - control_gmv,
            'treat_gmv': treat_gmv,
            'control_gmv': control_gmv
        })
    
    # 可视化衰减
    effect_df = pd.DataFrame(weekly_effects)
    plt.figure(figsize=(12, 6))
    plt.plot(effect_df['week'], effect_df['effect'], marker='o')
    plt.fill_between(effect_df['week'], 
                     effect_df['effect'] - 1.96*effect_df['effect'].std(),
                     effect_df['effect'] + 1.96*effect_df['effect'].std(),
                     alpha=0.3)
    plt.axhline(y=0, color='red', linestyle='--')
    plt.title('处理效应随时间衰减')
    plt.xlabel('实验后周数')
    plt.ylabel('GMV效应')
    plt.savefig('long_term_decay.png')
    
    return effect_df

# 执行追踪
long_term = long_term_effect_tracking("exp_social_rec_2024q2")

长期衰减模式

  • 第1-4周:效应保持+$2.8-3.2(用户习惯形成)
  • 第5-8周:衰减至+$2.1(新奇效应减弱)
  • 第9-12周:稳定在+$1.8(持续效应)
  • 结论:算法有持续价值,但需结合运营活动维持热度
(图:验证金字塔,第一层为分区质量,以模块度达标为判据)

VIII. 最佳实践与常见问题

8.1 实施清单

| 阶段 | 检查项 | 验收标准 | 工具/脚本 |
| --- | --- | --- | --- |
| 设计前 | 网络干扰评估 | 干扰指数>0.05需网络设计 | calculate_spillover_index |
| 分区 | 模块度检验 | >0.60 | evaluate_partition |
| 随机化 | 协变量均衡 | SMD<0.1 | validate_balance |
| 运行中 | SRM监控 | p>0.001 | monitor_experiment_health |
| 分析 | 簇聚合 | ICC<0.1 | analyze_cluster_aggregated |
| 验证 | 安慰剂检验 | 均值偏差<$0.1 | placebo_test |
| 发布 | 长期追踪 | 效应衰减<50% | long_term_effect_tracking |

8.2 常见问题与解决方案

| 问题现象 | 根本原因 | 检测方法 | 解决方案 | 成本 |
| --- | --- | --- | --- | --- |
| 统计效力不足 | 簇数量太少 | n_clusters < 100 | 降低min_cluster_size | 增加干扰 |
| SRM告警 | 分配不均衡 | χ²检验 | 分层随机化 | |
| 分区不稳定 | 网络动态变化 | ARI检验 | 缩短分区时间窗口 | 计算成本↑ |
| 弱工具变量 | 跨组边过多 | F统计量<10 | 重新分区或排除边缘 | 覆盖率↓ |
| 长尾簇 | 度分布幂律 | 最大簇 > 10k用户 | 随机采样或强制拆分 | 分区质量↓ |

8.3 性能优化指南

大数据场景(>1000万用户):

  • 使用PyTorch Geometric进行分布式图分区
  • 预计算Louvain分区并缓存(周级更新)
  • Redis Pipeline批量写入分配结果
  • Spark缓存中间结果:df.persist(StorageLevel.MEMORY_AND_DISK)

实时服务场景(QPS>1000):

  • 本地缓存:Caffeine(Java)或LRU(Python)
  • 分配结果TTL:15分钟
  • Redis Cluster分片:按user_id % 1024
  • Hystrix熔断:Redis故障时降级为传统随机分配
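
针对上述实时服务场景,下面给出一个极简的 Python 侧示意:进程内 LRU 缓存优先,Redis 未命中或故障时降级为基于 user_id 哈希的确定性分配(会暂时失去网络感知,仅作兜底)。其中函数名、键格式与降级策略均为示意用的假设,并非平台既有实现:

# 简化示意:本地LRU缓存 + Redis故障降级(函数与参数均为假设)
import hashlib
from functools import lru_cache

import redis

r = redis.Redis(host='redis-cluster', port=6379, decode_responses=True)

def _hash_fallback_assignment(user_id: str, experiment_id: str) -> str:
    """Redis不可用时的降级策略:按user_id哈希做确定性分配(忽略网络结构,仅作兜底)"""
    digest = hashlib.md5(f"{experiment_id}:{user_id}".encode()).hexdigest()
    return 'treatment' if int(digest, 16) % 2 == 0 else 'control'

@lru_cache(maxsize=100_000)   # 进程内缓存;"15分钟TTL"可另用定时清理实现,lru_cache本身不带过期
def get_assignment_cached(user_id: str, experiment_id: str) -> str:
    try:
        key = f"exp:{experiment_id}:user:{user_id}"
        assignment = r.hget(key, 'assignment')
        if assignment:
            return assignment
    except redis.RedisError:
        pass  # Redis故障时走降级逻辑
    return _hash_fallback_assignment(user_id, experiment_id)
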
# 优化后的分区函数:使用PySpark原生实现
def optimize_partition_with_pyspark(G, min_cluster_size=200):
    """
    使用PySpark GraphFrames的Connected Components作为快速分区
    """
    # 1. 计算连通分量(快速近似)
    cc = G.connectedComponents()
    
    # 2. 统计分量大小
    cc_stats = cc.groupBy("component").count().filter("count >= 5000")
    
    # 3. 对大分量进行二次分区(采样后Louvain)
    large_components = cc_stats.collect()
    
    final_assignments = []
    
    for component_row in large_components:
        component_id = component_row['component']
        
        # 提取子图
        subgraph_nodes = cc.filter(f"component = {component_id}").select("id")
        subgraph_edges = G.edges.join(
            subgraph_nodes.select(col("id").alias("src")),
            "src"
        ).join(
            subgraph_nodes.select(col("id").alias("dst")),
            "dst"
        )
        
        # 采样到driver进行Louvain(避免collect全量数据)
        sampled_edges = subgraph_edges.sample(fraction=0.3).collect()
        
        if sampled_edges:
            nx_subgraph = nx.DiGraph()
            for edge in sampled_edges:
                nx_subgraph.add_edge(int(edge['src']), int(edge['dst']))
            
            # Louvain分区
            partition = community_louvain.best_partition(nx_subgraph.to_undirected())
            
            # 映射回原节点
            for node, cluster_id in partition.items():
                final_assignments.append({
                    'user_id': node,
                    'cluster_id': f"{component_id}_{cluster_id}"
                })
    
    return spark.createDataFrame(final_assignments)