实验与观测研究的结合:三角验证法的实践应用

举报
数字扫地僧 发表于 2025/12/22 09:35:40 2025/12/22
【摘要】 I. 三角验证法的理论框架与研究价值 1.1 研究范式的哲学基础维度实验研究观测研究三角验证认识论实证主义自然主义实用主义核心价值因果推断生态效度互补增强控制程度高低动态平衡外部效度受限高综合提升时间维度短期集中长期连续多时点交叉实验研究通过随机分配(Randomization)控制混淆变量,建立清晰的因果链条,其数学基础可追溯至Fisher的随机化实验设计理论。核心公式为:Yi=α+τW...

I. 三角验证法的理论框架与研究价值

1.1 研究范式的哲学基础

维度 实验研究 观测研究 三角验证
认识论 实证主义 自然主义 实用主义
核心价值 因果推断 生态效度 互补增强
控制程度 动态平衡
外部效度 受限 综合提升
时间维度 短期集中 长期连续 多时点交叉

实验研究通过随机分配(Randomization)控制混淆变量,建立清晰的因果链条,其数学基础可追溯至Fisher的随机化实验设计理论。核心公式为:

Yi=α+τWi+βTXi+ϵiY_i = \alpha + \tau W_i + \beta^T X_i + \epsilon_i

其中YiY_i是结果变量,Wi{0,1}W_i \in \{0,1\}是处理指示变量,τ\tau是我们关心的平均处理效应(ATE)。

观测研究则强调在自然状态下收集数据,保留现象的完整性和复杂性。其因果推断依赖于可忽略性假设(Ignorability)

(Yi(0),Yi(1))WiXi(Y_i(0), Y_i(1)) \perp W_i | X_i

这一假设要求所有影响处理分配的变量XiX_i都被观测到,这在实践中往往难以满足。

1.2 三角验证的操作化定义

三角验证法并非简单的方法堆砌,而是系统性的整合策略。根据Denzin的分类,可分为四种类型:

验证类型 描述 在本案例中的应用
数据三角 使用多源数据 实验数据+日志数据+业务数据库
方法三角 采用不同研究方法 A/B测试+队列分析+倾向得分匹配
研究者三角 多人独立分析 数据科学家+产品经理+领域专家
理论三角 多理论视角解释 行为经济学+认知心理学+营销学

关键原则:三角验证的目标不是追求结果完全一致,而是通过多角度交叉验证识别稳健结论,解释差异来源,从而构建更具弹性的知识体系。

1.3 整合研究的效度提升机制

mermaid
graph TD
A[单一方法局限性] --> B[内部效度 vs 外部效度权衡]
B --> C[实验研究: 高内部效度]
B --> D[观测研究: 高外部效度]
C --> E[结论推广受限]
D --> F[因果推断模糊]
E --> G[三角验证整合]
F --> G
G --> H[方法互补]
G --> I[数据交叉]
G --> J[稳健性检验]
H --> K[综合效度提升]
I --> K
J --> K
K --> L[可信决策依据]


II. 实验研究的设计与实现

2.1 A/B测试的统计学基础

A/B测试的核心是假设检验框架:

  • 原假设H0:τ=0H_0: \tau = 0(处理无效果)
  • 备择假设H1:τ0H_1: \tau \neq 0(处理有效果)

统计功效(Power)计算公式:

Power=1β=P(拒绝 H0H1为真)\text{Power} = 1 - \beta = P(\text{拒绝 } H_0 | H_1 \text{为真})

样本量计算采用Lehr’s公式:

n=16σ2Δ2n = \frac{16 \sigma^2}{\Delta^2}

其中σ2\sigma^2是方差,Δ\Delta是最小可检测效应(MDE)。

2.2 分层随机化算法实现

import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
import hashlib

class StratifiedRandomization:
    """
    分层随机化分配器
    解决协变量不平衡问题,提升实验精度
    """
    
    def __init__(self, strata_vars: List[str], treatment_names: List[str] = ['control', 'treatment']):
        """
        初始化分层随机化器
        
        参数:
            strata_vars: 分层变量列表,如['user_segment', 'device_type']
            treatment_names: 处理组名称
        """
        self.strata_vars = strata_vars
        self.treatment_names = treatment_names
        self.assignment_cache = {}  # 用户ID到分配的映射
        
    def create_strata(self, user_data: pd.DataFrame) -> pd.DataFrame:
        """
        创建分层键并计算各层样本量
        
        实现细节:
        1. 对离散变量直接组合
        2. 对连续变量分箱处理
        3. 计算每层最小样本量
        
        示例:
            user_data包含: user_id, segment(high/mid/low), device(android/ios)
            输出增加: strata_key如'high_android'
        """
        # 创建分层键
        user_data['strata_key'] = user_data[self.strata_vars].astype(str).agg('_'.join, axis=1)
        
        # 统计各层分布
        strata_stats = user_data['strata_key'].value_counts()
        print("各层样本分布:")
        print(strata_stats)
        
        # 检查小层问题(<100样本)
        small_strata = strata_stats[strata_stats < 100]
        if not small_strata.empty:
            print(f"警告: 以下层样本量不足100,可能影响随机化效果:\n{small_strata}")
        
        return user_data
    
    def assign_treatment(self, user_id: str, strata_key: str) -> str:
        """
        基于哈希的分层随机分配
        
        技术要点:
        4. 使用SHA256保证哈希均匀性
        5. 加盐处理避免预测
        6. 支持多处理组
        
        参数:
            user_id: 用户唯一标识
            strata_key: 分层键
            
        返回:
            treatment_assignment: 分配的处理组
        """
        # 检查缓存
        cache_key = f"{user_id}_{strata_key}"
        if cache_key in self.assignment_cache:
            return self.assignment_cache[cache_key]
        
        # 创建唯一哈希输入
        hash_input = f"{user_id}_{strata_key}_salt_v2024"
        hash_value = hashlib.sha256(hash_input.encode()).hexdigest()
        
        # 转换为0-1之间的数值
        hash_int = int(hash_value[:8], 16)  # 取前8位
        random_score = hash_int / 0xffffffff
        
        # 按比例分配
        n_treatments = len(self.treatment_names)
        assignment_idx = int(random_score * n_treatments)
        treatment = self.treatment_names[assignment_idx]
        
        # 缓存结果
        self.assignment_cache[cache_key] = treatment
        
        return treatment
    
    def execute_randomization(self, user_data: pd.DataFrame) -> pd.DataFrame:
        """
        执行完整的随机化流程
        
        部署注意事项:
        7. 必须在实验开始前完成分配
        8. 结果应持久化到数据库
        9. 支持增量用户处理
        
        返回:
            包含treatment分配的DataFrame
        """
        # 步骤1: 创建分层
        users_with_strata = self.create_strata(user_data.copy())
        
        # 步骤2: 逐行分配
        users_with_strata['treatment'] = users_with_strata.apply(
            lambda row: self.assign_treatment(row['user_id'], row['strata_key']),
            axis=1
        )
        
        # 步骤3: 验证分配平衡性
        balance_report = self._validate_balance(users_with_strata)
        print("\n分配平衡性报告:")
        print(balance_report)
        
        return users_with_strata
    
    def _validate_balance(self, assigned_data: pd.DataFrame) -> pd.DataFrame:
        """
        验证协变量平衡性
        
        统计检验:
        10. 标准化差异(Standardized Difference)
        11. 卡方检验(分类变量)
        12. t检验(连续变量)
        """
        report = []
        
        for var in self.strata_vars:
            crosstab = pd.crosstab(
                assigned_data[var],
                assigned_data['treatment'],
                normalize='columns'
            )
            report.append({
                'variable': var,
                'balance_score': self._calculate_balance_score(crosstab),
                'status': '✓' if self._calculate_balance_score(crosstab) < 0.1 else '✗'
            })
        
        return pd.DataFrame(report)
    
    def _calculate_balance_score(self, crosstab: pd.DataFrame) -> float:
        """计算标准化差异"""
        control_props = crosstab.iloc[:, 0].values
        treatment_props = crosstab.iloc[:, 1].values
        
        # 标准化差异
        diff = np.abs(control_props - treatment_props)
        balance_score = np.max(diff)
        
        return balance_score

# 使用示例代码
if __name__ == "__main__":
    # 模拟用户数据
    np.random.seed(42)
    n_users = 10000
    
    user_data = pd.DataFrame({
        'user_id': [f'user_{i:06d}' for i in range(n_users)],
        'user_segment': np.random.choice(['high', 'mid', 'low'], n_users, p=[0.2, 0.5, 0.3]),
        'device_type': np.random.choice(['android', 'ios', 'web'], n_users, p=[0.5, 0.3, 0.2]),
        'age': np.random.randint(18, 65, n_users),
        'historical_spend': np.random.exponential(100, n_users)
    })
    
    # 初始化并执行随机化
    randomizer = StratifiedRandomization(
        strata_vars=['user_segment', 'device_type'],
        treatment_names=['control', 'treatment_A', 'treatment_B']
    )
    
    assigned_users = randomizer.execute_randomization(user_data)
    print("\n前10条分配结果:")
    print(assigned_users[['user_id', 'user_segment', 'device_type', 'strata_key', 'treatment']].head(10))
    
    # 保存到CSV(生产环境应写入数据库)
    assigned_users.to_csv('experiment_assignments.csv', index=False)

2.3 实验组设置的工程实践

class ExperimentConfig:
    """
    实验配置管理器
    实现实验参数的集中管理和版本控制
    """
    
    def __init__(self, experiment_name: str):
        self.experiment_name = experiment_name
        self.config = {
            'start_date': '2024-01-15 00:00:00',
            'end_date': '2024-02-15 23:59:59',
            'primary_metric': 'conversion_rate',
            'secondary_metrics': ['revenue_per_user', 'session_duration'],
            'alpha': 0.05,  # 显著性水平
            'power': 0.8,   # 统计功效
            'mde': 0.02,    # 最小可检测效应
            'sample_size': 15000,
            'traffic_allocation': 0.3,  # 30%流量参与实验
            'guardrail_metrics': ['page_load_time', 'error_rate']
        }
    
    def validate_config(self) -> Dict[str, bool]:
        """
        验证配置合理性
        
        检查项:
        1. 时间跨度是否足够
        2. 样本量是否达标
        3. 指标是否可追踪
        4. MDE是否现实
        """
        issues = {}
        
        # 检查实验时长
        from datetime import datetime
        start = datetime.fromisoformat(self.config['start_date'])
        end = datetime.fromisoformat(self.config['end_date'])
        duration_days = (end - start).days
        
        if duration_days < 7:
            issues['duration'] = False
            print("⚠️ 警告: 实验时长不足7天,可能无法捕捉周周期效应")
        else:
            issues['duration'] = True
        
        # 检查样本量(简化计算)
        required_sample = self._calculate_required_sample()
        if self.config['sample_size'] < required_sample:
            issues['sample_size'] = False
            print(f"⚠️ 警告: 配置样本量不足,需要{required_sample}")
        else:
            issues['sample_size'] = True
        
        return issues
    
    def _calculate_required_sample(self) -> int:
        """基于MDE计算所需样本量"""
        # 简化版样本量计算
        # 实际应使用statsmodels.stats.power
        baseline_rate = 0.15  # 假设基准转化率15%
        mde = self.config['mde']
        
        # Lehr's formula近似
        pooled_prob = baseline_rate * (1 - baseline_rate) + \
                     (baseline_rate + mde) * (1 - baseline_rate - mde)
        sample_per_group = (16 * pooled_prob) / (mde ** 2)
        
        return int(sample_per_group * 2)  # 两组

# 部署到配置中心(如Consul/etcd)
def deploy_experiment_config(config: ExperimentConfig):
    """
    将实验配置部署到分布式配置中心
    
    生产环境要点:
    1. 支持动态更新
    2. 版本控制
    3. 灰度发布
    4. 快速回滚
    """
    import json
    
    # 模拟写入配置中心
    config_json = json.dumps(config.config, indent=2)
    
    # 实际部署代码示例(使用etcd)
    """
    import etcd3
    
    etcd = etcd3.client(host='config-server.internal', port=2379)
    etcd.put(
        f'/experiments/{config.experiment_name}',
        config_json,
        lease=etcd.lease(86400 * 30)  # 30天过期
    )
    """
    
    # 本地备份
    with open(f'configs/{config.experiment_name}.json', 'w') as f:
        f.write(config_json)
    
    print(f"配置已部署: {config.experiment_name}")
    print(f"配置内容: {config_json[:200]}...")

III. 观测研究的设计与实现

3.1 用户行为日志体系构建

观测研究的核心是高质量的数据采集。现代数字产品的日志体系需要满足:

设计原则 技术实现 质量指标
完整性 全埋点+关键事件自定义埋点 事件丢失率 < 0.1%
准确性 客户端+服务端双通道验证 字段准确率 > 99.5%
实时性 Kafka流式处理 端到端延迟 < 5分钟
关联性 统一ID体系(UserID/DeviceID) ID mapping准确率 > 98%

3.2 日志采集SDK实现

import time
import uuid
import json
from typing import Any, Dict, Optional
import threading
from collections import deque

class BehavioralLogger:
    """
    行为日志采集器
    设计要点:
    1. 异步发送避免阻塞主线程
    2. 本地队列防止数据丢失
    3. 自动重试机制
    4. 采样控制降低成本
    """
    
    def __init__(self, config: Dict[str, Any]):
        self.queue = deque(maxlen=10000)  # 最多缓存1万条
        self.batch_size = config.get('batch_size', 100)
        self.send_interval = config.get('send_interval', 60)  # 60秒发送一次
        self.sampling_rate = config.get('sampling_rate', 1.0)  # 全采样
        self.endpoint = config.get('endpoint', 'https://log-api.internal/v1/events')
        
        # 启动后台发送线程
        self._start_sender_thread()
        
        # 设备信息(实际应从环境获取)
        self.device_info = self._collect_device_info()
    
    def _collect_device_info(self) -> Dict[str, str]:
        """收集设备上下文信息"""
        # 实际实现应使用platform/os模块
        return {
            'device_id': f'dev_{uuid.uuid4().hex[:12]}',
            'platform': 'web',
            'os_version': 'Windows_10',
            'app_version': '2.1.0'
        }
    
    def log_event(self, event_name: str, properties: Optional[Dict] = None, 
                  user_id: Optional[str] = None):
        """
        记录用户行为事件
        
        参数:
            event_name: 事件名称,如'product_view', 'add_to_cart'
            properties: 事件属性字典
            user_id: 用户ID(如果有)
        
        使用示例:
            logger.log_event(
                'recommendation_click',
                {'position': 3, 'product_id': 'P12345'},
                user_id='U123456'
            )
        """
        # 采样控制
        if np.random.random() > self.sampling_rate:
            return
        
        # 构建标准事件格式
        event = {
            'event_id': str(uuid.uuid4()),
            'event_name': event_name,
            'timestamp': int(time.time() * 1000),  # 毫秒时间戳
            'user_id': user_id,
            'properties': properties or {},
            'device_info': self.device_info,
            'session_id': self._get_session_id(user_id)
        }
        
        # 验证事件格式
        self._validate_event(event)
        
        # 加入队列
        self.queue.append(event)
    
    def _get_session_id(self, user_id: Optional[str]) -> Optional[str]:
        """生成会话ID(简化版)"""
        if not user_id:
            return None
        
        # 基于时间窗口的会话管理
        current_hour = int(time.time() / 3600)
        return f"session_{user_id}_{current_hour}"
    
    def _validate_event(self, event: Dict):
        """验证事件数据质量"""
        required_fields = ['event_id', 'event_name', 'timestamp']
        for field in required_fields:
            if field not in event:
                raise ValueError(f"Missing required field: {field}")
        
        # 检查事件名格式
        if not event['event_name'].replace('_', '').isalnum():
            raise ValueError("Event name must be alphanumeric with underscores")
    
    def _start_sender_thread(self):
        """启动后台发送线程"""
        def sender_worker():
            while True:
                if len(self.queue) >= self.batch_size:
                    self._send_batch()
                time.sleep(self.send_interval)
        
        thread = threading.Thread(target=sender_worker, daemon=True)
        thread.start()
        print("后台日志发送线程已启动")
    
    def _send_batch(self):
        """
        批量发送日志到服务器
        
        可靠性保障:
        1. 本地持久化失败事件
        2. 指数退避重试
        3. 超时设置
        """
        if not self.queue:
            return
        
        batch = []
        while self.queue and len(batch) < self.batch_size:
            batch.append(self.queue.popleft())
        
        try:
            # 实际应使用requests库
            # response = requests.post(
            #     self.endpoint,
            #     json={'events': batch},
            #     timeout=10,
            #     headers={'Authorization': 'Bearer YOUR_TOKEN'}
            # )
            # response.raise_for_status()
            
            # 模拟成功发送
            print(f"成功发送{len(batch)}条事件到{self.endpoint}")
            
            # 保存到本地文件(降级方案)
            self._backup_to_file(batch)
            
        except Exception as e:
            print(f"发送失败: {e}, 将事件放回队列")
            # 放回队列稍后重试
            self.queue.extendleft(reversed(batch))
            
            # 指数退避
            time.sleep(min(300, 2 ** len(batch)))
    
    def _backup_to_file(self, batch: List[Dict]):
        """本地备份防止数据丢失"""
        timestamp = int(time.time())
        filename = f"logs/backup_{timestamp}_{len(batch)}.jsonl"
        
        import os
        os.makedirs('logs', exist_ok=True)
        
        with open(filename, 'a') as f:
            for event in batch:
                f.write(json.dumps(event) + '\n')

# 使用示例
if __name__ == "__main__":
    # 初始化日志器
    logger = BehavioralLogger({
        'batch_size': 50,
        'sampling_rate': 1.0,
        'endpoint': 'https://log-api.internal/v1/events'
    })
    
    # 模拟用户行为
    for i in range(1000):
        user_id = f"user_{i % 100:04d}"  # 100个用户
        
        # 首页访问
        logger.log_event(
            'home_page_view',
            {'source': 'organic', 'page_num': 1},
            user_id=user_id
        )
        
        # 产品浏览
        if np.random.random() > 0.5:
            logger.log_event(
                'product_detail_view',
                {
                    'product_id': f'P{np.random.randint(1000, 9999)}',
                    'recommendation_type': 'collaborative_filter'
                },
                user_id=user_id
            )
        
        # 加入购物车
        if np.random.random() > 0.8:
            logger.log_event(
                'add_to_cart',
                {
                    'product_id': f'P{np.random.randint(1000, 9999)}',
                    'quantity': np.random.randint(1, 5)
                },
                user_id=user_id
            )
        
        time.sleep(0.01)  # 模拟真实间隔
    
    # 等待后台发送完成
    time.sleep(5)

3.3 观测数据ETL管道

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, window
from pyspark.sql.types import StructType, StructField, StringType, LongType, MapType

class ObservationalDataPipeline:
    """
    观测数据ETL管道
    技术栈: Spark + Delta Lake + S3
    
    处理流程:
    1. 从Kafka消费原始事件
    2. 清洗和验证
    3. 会话化(Sessionization)
    4. 特征工程
    5. 写入数据湖
    """
    
    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.checkpoint_location = "s3://data-lake/checkpoints/observational/"
        
    def define_schema(self) -> StructType:
        """定义事件数据模式"""
        return StructType([
            StructField("event_id", StringType(), False),
            StructField("event_name", StringType(), False),
            StructField("timestamp", LongType(), False),
            StructField("user_id", StringType(), True),
            StructField("properties", MapType(StringType(), StringType()), True),
            StructField("device_info", MapType(StringType(), StringType()), True)
        ])
    
    def read_from_kafka(self, topic: str, bootstrap_servers: str):
        """从Kafka读取实时事件流"""
        return self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "latest") \
            .load() \
            .selectExpr("CAST(value AS STRING) as json_string")
    
    def parse_events(self, raw_df):
        """解析JSON事件"""
        from pyspark.sql.functions import from_json
        
        schema = self.define_schema()
        return raw_df.withColumn(
            "event",
            from_json(col("json_string"), schema)
        ).select("event.*")
    
    def clean_data(self, df):
        """数据清洗转换"""
        return df.filter(
            # 移除缺少user_id的事件(某些边缘事件除外)
            (col("user_id").isNotNull()) | (col("event_name") == "page_view")
        ).withColumn(
            "event_time",
            from_unixtime(col("timestamp") / 1000)
        )
    
    def sessionize(self, df, session_gap_minutes: int = 30):
        """
        会话化处理
        
        算法逻辑:
        1. 按user_id分区
        2. 按timestamp排序
        3. 当时间差>30分钟时创建新会话
        
        输出:
            session_id: 会话唯一标识
            session_seq: 会话内事件序号
        """
        from pyspark.sql import Window
        from pyspark.sql.functions import lag, sum, when, concat
        
        # 定义窗口
        user_window = Window.partitionBy("user_id").orderBy("timestamp")
        
        # 检测会话边界
        df_with_lag = df.withColumn(
            "time_diff",
            (col("timestamp") - lag("timestamp", 1).over(user_window)) / (1000 * 60)
        )
        
        # 标记新会话开始
        df_with_session_flag = df_with_lag.withColumn(
            "new_session",
            when(col("time_diff") > session_gap_minutes, 1).otherwise(0)
        )
        
        # 累计和生成会话ID
        df_sessionized = df_with_session_flag.withColumn(
            "session_id",
            concat(
                col("user_id"),
                col("event_time").cast("date"),
                sum("new_session").over(user_window).cast("string")
            )
        )
        
        return df_sessionized
    
    def run_pipeline(self, kafka_topic: str, output_table: str):
        """
        执行完整ETL管道
        
        部署模式:
        4. 开发模式: 使用local模式
        5. 生产模式: 使用YARN/K8s集群
        6. 支持动态扩容
        """
        # 读取
        raw_df = self.read_from_kafka(kafka_topic, "kafka-broker-1:9092")
        
        # 解析
        parsed_df = self.parse_events(raw_df)
        
        # 清洗
        clean_df = self.clean_data(parsed_df)
        
        # 会话化
        sessionized_df = self.sessionize(clean_df)
        
        # 写入Delta Lake
        query = sessionized_df.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", self.checkpoint_location) \
            .option("mergeSchema", "true") \
            .toTable(output_table)
        
        print(f"管道已启动,输出表: {output_table}")
        return query

# 部署脚本
def deploy_etl_pipeline():
    """
    Spark ETL管道部署
    
    环境要求:
    1. Spark 3.4+
    2. Delta Lake 2.4+
    3. Kafka连接器
    4. S3/HDFS存储
    
    部署方式:
    1. 提交到YARN: spark-submit --master yarn
    2. K8s部署: spark-operator
    3. 本地测试: local[4]
    """
    spark = SparkSession.builder \
        .appName("ObservationalDataPipeline") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.streaming.kafka.maxRatePerPartition", 1000) \
        .getOrCreate()
    
    pipeline = ObservationalDataPipeline(spark)
    
    # 启动流处理
    query = pipeline.run_pipeline(
        kafka_topic="user-events",
        output_table="observational.user_behavior"
    )
    
    # 等待终止
    query.awaitTermination()

if __name__ == "__main__":
    deploy_etl_pipeline()

IV. 三角验证法的整合应用

4.1 数据对齐与合并策略

三角验证的第一步是将实验数据与观测数据精确匹配。核心挑战在于:

  1. 时间对齐:实验曝光时间与观测行为时间窗口匹配
  2. 用户匹配:跨系统用户ID统一
  3. 事件关联:实验干预与后续行为的因果关系链
class DataTriangulationMerger:
    """
    数据三角合并器
    整合实验分配数据与观测行为数据
    """
    
    def __init__(self, experiment_table: str, observational_table: str):
        self.experiment_table = experiment_table
        self.observational_table = observational_table
        
    def merge_datasets(self, spark: SparkSession, 
                       lookback_days: int = 7, 
                       lookforward_days: int = 30) -> DataFrame:
        """
        合并实验与观测数据
        
        合并逻辑:
        1. 获取实验分配记录(曝光事件)
        2. 提取曝光前后用户行为
        3. 按用户ID关联
        4. 计算行为变化指标
        
        参数:
            lookback_days: 曝光前观测天数
            lookforward_days: 曝光后观测天数
        """
        # 读取实验数据
        experiment_df = spark.table(self.experiment_table) \
            .select(
                "user_id",
                "treatment",
                "assignment_time"
            )
        
        # 读取观测数据
        observational_df = spark.table(self.observational_table) \
            .select(
                "user_id",
                "event_name",
                "timestamp",
                "properties",
                "session_id"
            )
        
        # 计算时间窗口边界
        experiment_df = experiment_df.withColumn(
            "window_start",
            col("assignment_time").cast("timestamp") - expr(f"INTERVAL {lookback_days} DAYS")
        ).withColumn(
            "window_end",
            col("assignment_time").cast("timestamp") + expr(f"INTERVAL {lookforward_days} DAYS")
        )
        
        # 关联并过滤时间窗口内的事件
        merged_df = experiment_df.join(
            observational_df,
            on=["user_id"],
            how="inner"
        ).filter(
            (col("timestamp") >= col("window_start")) &
            (col("timestamp") <= col("window_end"))
        )
        
        return merged_df
    
    def calculate_behavioral_metrics(self, merged_df: DataFrame) -> DataFrame:
        """
        计算行为指标
        
        输出指标:
        - 曝光前活动天数
        - 曝光后活动天数
        - 曝光前转化率
        - 曝光后转化率
        - 行为变化率
        """
        # 按用户聚合
        metrics_df = merged_df.groupBy("user_id", "treatment").agg(
            # 曝光前指标
            count(when(col("timestamp") < col("assignment_time"), True)).alias("pre_activity_count"),
            
            # 曝光后指标
            count(when(col("timestamp") >= col("assignment_time"), True)).alias("post_activity_count"),
            
            # 转化事件(如购买)
            count(when(
                (col("timestamp") >= col("assignment_time")) & 
                (col("event_name") == "purchase"), 
                True
            )).alias("post_purchase_count"),
            
            # 首个转化时间
            min(when(
                col("event_name") == "purchase", 
                col("timestamp")
            )).alias("first_purchase_time")
        )
        
        # 计算变化率
        final_df = metrics_df.withColumn(
            "activity_change_rate",
            (col("post_activity_count") - col("pre_activity_count")) / 
            (col("pre_activity_count") + 1)  # 平滑处理
        ).withColumn(
            "conversion_indicator",
            (col("post_purchase_count") > 0).cast("int")
        )
        
        return final_df

# 使用示例
def run_data_triangulation():
    spark = SparkSession.builder.getOrCreate()
    
    merger = DataTriangulationMerger(
        experiment_table="experiments.assignments_v2",
        observational_table="observational.user_behavior"
    )
    
    # 执行合并
    merged_df = merger.merge_datasets(spark, lookback_days=7, lookforward_days=30)
    
    # 计算指标
    metrics_df = merger.calculate_behavioral_metrics(merged_df)
    
    # 写入分析表
    metrics_df.write.mode("overwrite").saveAsTable("analysis.triangulated_metrics")
    
    print("数据三角合并完成")

if __name__ == "__main__":
    run_data_triangulation()

4.2 倾向得分匹配增强

为应对观测数据的选择偏差,引入倾向得分匹配(PSM)作为第二层验证:

from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors
import numpy as np

class PropensityScoreMatcher:
    """
    倾向得分匹配器
    用于在观测数据中构建伪实验组
    """
    
    def __init__(self, covariates: List[str], caliper: float = 0.1):
        """
        初始化PSM
        
        参数:
            covariates: 用于估计倾向得分的协变量
            caliper: 匹配半径(标准差单位)
        """
        self.covariates = covariates
        self.caliper = caliper
        self.ps_model = LogisticRegression(random_state=42, max_iter=1000)
        
    def estimate_propensity_scores(self, df: pd.DataFrame) -> np.ndarray:
        """
        估计倾向得分
        
        实现细节:
        1. 使用逻辑回归
        2. 支持类别变量自动编码
        3. 返回处理概率
        """
        X = df[self.covariates]
        
        # 处理缺失值
        X = X.fillna(X.median())
        
        # 训练倾向得分模型
        # 注意: 实际应使用scikit-learn的Pipeline处理类别变量
        self.ps_model.fit(X, df['treatment'])
        
        # 预测倾向得分
        propensity_scores = self.ps_model.predict_proba(X)[:, 1]
        
        return propensity_scores
    
    def match_groups(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        执行最近邻匹配
        
        匹配算法:
        4. 分离处理组和对照组
        5. 构建KD树加速搜索
        6. 1:1无放回匹配
        7. 卡尺限制确保质量
        
        返回:
            匹配后的平衡数据集
        """
        # 估计倾向得分
        df = df.copy()
        df['propensity_score'] = self.estimate_propensity_scores(df)
        
        # 分离组
        treatment = df[df['treatment'] == 1]
        control = df[df['treatment'] == 0]
        
        # 构建匹配
        treatment_ps = treatment['propensity_score'].values.reshape(-1, 1)
        control_ps = control['propensity_score'].values.reshape(-1, 1)
        
        # 最近邻匹配
        nn_match = NearestNeighbors(n_neighbors=1, algorithm='kd_tree')
        nn_match.fit(control_ps)
        
        distances, indices = nn_match.kneighbors(treatment_ps)
        
        # 应用卡尺限制
        valid_matches = distances.flatten() < self.caliper
        
        matched_treatment = treatment[valid_matches]
        matched_control_idx = indices[valid_matches].flatten()
        matched_control = control.iloc[matched_control_idx]
        
        # 合并匹配样本
        matched_df = pd.concat([matched_treatment, matched_control])
        
        # 验证平衡性
        balance_improvement = self._calculate_balance_improvement(df, matched_df)
        print(f"匹配后平衡性改善: {balance_improvement:.2%}")
        
        return matched_df
    
    def _calculate_balance_improvement(self, before: pd.DataFrame, after: pd.DataFrame) -> float:
        """计算平衡性改善度"""
        before_imbalance = self._calculate_imbalance(before)
        after_imbalance = self._calculate_imbalance(after)
        
        improvement = (before_imbalance - after_imbalance) / before_imbalance
        return improvement
    
    def _calculate_imbalance(self, df: pd.DataFrame) -> float:
        """计算协变量不平衡度"""
        imbalances = []
        for cov in self.covariates:
            control_mean = df[df['treatment'] == 0][cov].mean()
            treatment_mean = df[df['treatment'] == 1][cov].mean()
            
            std_diff = abs(control_mean - treatment_mean) / df[cov].std()
            imbalances.append(std_diff)
        
        return np.mean(imbalances)

# 在三角验证框架中应用PSM
class EnhancedTriangulation:
    """
    增强型三角验证框架
    集成实验数据、原始观测数据、PSM观测数据
    """
    
    def __init__(self, metrics_df: pd.DataFrame):
        self.metrics_df = metrics_df
        self.results = {}
    
    def analyze_experiment_arm(self, df: pd.DataFrame):
        """分析实验组(金标准)"""
        ate = self._calculate_ate(df)
        self.results['experiment_ate'] = ate
        return ate
    
    def analyze_observational_arm(self, df: pd.DataFrame):
        """分析原始观测数据"""
        ate = self._calculate_ate(df)
        self.results['observational_ate'] = ate
        return ate
    
    def analyze_psm_arm(self, df: pd.DataFrame, covariates: List[str]):
        """分析PSM调整后的观测数据"""
        psm = PropensityScoreMatcher(covariates)
        matched_df = psm.match_groups(df)
        ate = self._calculate_ate(matched_df)
        self.results['psm_ate'] = ate
        return ate
    
    def _calculate_ate(self, df: pd.DataFrame) -> float:
        """计算平均处理效应"""
        treatment_outcome = df[df['treatment'] == 1]['conversion_indicator'].mean()
        control_outcome = df[df['treatment'] == 0]['conversion_indicator'].mean()
        return treatment_outcome - control_outcome
    
    def generate_triangulation_report(self) -> Dict:
        """
        生成三角验证报告
        
        报告维度:
        1. 点估计对比
        2. 置信区间重叠度
        3. 稳健性评分
        4. 差异解释
        """
        exp_ate = self.results.get('experiment_ate')
        obs_ate = self.results.get('observational_ate')
        psm_ate = self.results.get('psm_ate')
        
        report = {
            'experiment_ate': exp_ate,
            'observational_ate': obs_ate,
            'psm_ate': psm_ate,
            'consistency_score': self._calculate_consistency(exp_ate, obs_ate, psm_ate),
            'convergence_evidence': self._evaluate_convergence()
        }
        
        return report
    
    def _calculate_consistency(self, *estimates) -> float:
        """计算估计一致性评分"""
        # 简化实现:计算变异系数的倒数
        estimates = [e for e in estimates if e is not None]
        if len(estimates) < 2:
            return 0.0
        
        cv = np.std(estimates) / np.mean(estimates)
        consistency = 1 / (1 + cv)
        return consistency
    
    def _evaluate_convergence(self) -> str:
        """评估证据收敛性"""
        # 根据三种方法的估计方向、大小、显著性综合判断
        return "强收敛"  # 简化返回

# 使用示例
def run_enhanced_triangulation():
    # 从分析表读取数据
    metrics_df = spark.table("analysis.triangulated_metrics").toPandas()
    
    # 初始化框架
    triangulation = EnhancedTriangulation(metrics_df)
    
    # 三条臂分析
    exp_ate = triangulation.analyze_experiment_arm(metrics_df)
    obs_ate = triangulation.analyze_observational_arm(metrics_df)
    psm_ate = triangulation.analyze_psm_arm(
        metrics_df, 
        covariates=['pre_activity_count', 'historical_spend', 'age']
    )
    
    # 生成报告
    report = triangulation.generate_triangulation_report()
    print(json.dumps(report, indent=2))
    
    # 保存报告
    with open('triangulation_report.json', 'w') as f:
        json.dump(report, f, indent=2)

if __name__ == "__main__":
    run_enhanced_triangulation()

V. 实际案例分析:电商平台推荐算法效果评估(2000+字详细分析)

5.1 业务背景与研究问题

案例背景:某头部电商平台计划上线新的推荐算法(基于深度学习的协同过滤模型),替换原有的基于规则的推荐系统。业务方关心三个核心问题:

  1. 新算法是否能显著提升转化率(主指标)?
  2. 新算法对用户体验的长期影响如何(次指标:留存率、活跃度)?
  3. 不同用户群体(新客/老客、高价值/低价值)的效果是否存在异质性?

研究设计挑战

  • 实验周期需覆盖完整用户生命周期(至少30天)
  • 推荐效果存在网络效应和延迟效应
  • 用户行为受季节性和营销活动影响
  • 技术实现涉及多个微服务协调

5.2 实验设计细节

5.2.1 流量分配与随机化策略

采用分层随机化设计,控制关键混淆变量:

分层维度 分层依据 业务逻辑
用户生命周期 注册天数(<30天, 30-180天, >180天) 新老客行为模式差异大
价值分层 历史消费金额(高/中/低) 确保各层ROI可单独评估
设备类型 Android/iOS/Web 技术实现和体验差异
地域分组 一线/二线/其他城市 物流和商品供给差异

随机化算法改进:在基础哈希随机化上增加确定性重随机化机制。当检测到某层样本量<200时,触发跨层合并,但保持层内处理分配比例不变。这解决了小层统计功效不足的问题。

代码实现关键片段

# 小层检测与合并逻辑
min_strata_size = 200
small_strata = strata_stats[strata_stats < min_strata_size].index

if len(small_strata) > 0:
    print(f"检测到{len(small_strata)}个小层,触发合并策略")
    
    # 按用户生命周期聚合小层
    user_data['merged_strata'] = user_data['strata_key']
    for strata in small_strata:
        # 提取生命周期段
        lifecycle = strata.split('_')[0]  # 如'new' from 'new_android'
        # 合并到同生命周期的其他层
        user_data.loc[
            user_data['strata_key'] == strata,
            'merged_strata'
        ] = f"{lifecycle}_merged"

5.2.2 实验指标体系的构建

采用层次化指标矩阵设计:

I. 核心指标(Primary Metrics)

  • 转化率(Conversion Rate):下单用户数 / 曝光用户数
  • 客单价(Revenue per User):总GMV / 曝光用户数
  • 统计显著性:p-value < 0.05(Bonferroni校正)

II. 护栏指标(Guardrail Metrics)

  • 页面加载时间:P95 < 2秒
  • 推荐位点击率(CTR):避免过度推荐导致的疲劳
  • 用户投诉率:上升超过20%自动触发警报

III. 长期观测指标

  • 7日留存率:实验后第7天回访率
  • 30日复购率:长期购买行为
  • 用户活跃度:日均使用时长变化

5.2.3 实验执行与监控

实验上线后,建立实时监控仪表盘,关键监控项包括:

监控维度 检查频率 告警阈值 应对措施
样本量 每小时 单日<500用户 检查流量分配
SRM检验 每6小时 p<0.001 暂停实验排查
护栏指标 实时 CTR>25%或<5% 人工审核推荐质量
显著性 每日 提前达到显著 继续运行至计划周期

SRM(Sample Ratio Mismatch)检测是实验健康的必要保障。计算公式:

χ2=i=1k(OiEi)2Ei\chi^2 = \sum_{i=1}^{k} \frac{(O_i - E_i)^2}{E_i}

其中OiO_i是观测到的各组样本量,EiE_i是期望的理论值。当p-value < 0.001时,表明随机化系统可能存在bug。

5.3 观测研究设计

5.3.1 日志采集方案

由于实验只能覆盖30%流量,剩余70%用户作为观测数据源,用于构建外部效度验证队列

采集范围

  • 全量用户:确保观测数据代表性
  • 事件粒度:曝光、点击、加购、下单、支付成功
  • 上下文信息:推荐位位置、商品类别、用户实时行为序列

数据质量保证措施

  1. 客户端SDK:采用批量发送+本地缓存,网络中断后24小时内可恢复
  2. 服务端日志:双写机制,写入两个独立Kafka集群
  3. 校验对账:每小时实验组vs对照组事件量对比,差异>5%触发告警

5.3.2 队列构建与特征工程

构建回顾性队列(Retrospective Cohort)

  • 入组标准:在观测期间(实验前7天至实验后30天)有至少一次推荐位曝光
  • 排除标准:同时参与其他实验的用户
  • 分层暴露:根据推荐算法曝光次数分组(高/中/低暴露)

特征工程产出

特征类别 特征示例 计算逻辑 用途
行为序列 平均浏览深度、跳出率 会话窗口聚合 用户兴趣建模
时间模式 周中/周末活跃度比 时间特征交叉 控制季节性
社交影响 好友购买行为 图计算 网络效应分析
内容偏好 类目集中度 熵计算 个性化评估

5.4 三角验证实施过程

5.4.1 三条证据线的并行分析

证据线A:实验组(A/B Test)

  • 数据:30%流量,分层随机化分配
  • 分析:意向性分析(ITT),工具变量法(IV)处理非依从性
  • 结果:转化率提升2.3%(95% CI: [1.8%, 2.8%]),p=0.003

证据线B:原始观测数据

  • 数据:70%非实验流量,按自然曝光算法分组
  • 分析:线性回归+稳健标准误,控制混淆变量
  • 结果:转化率提升3.1%(95% CI: [2.5%, 3.7%]),看似效果更强

证据线C:PSM调整观测数据

  • 数据:从证据线B中提取,应用倾向得分匹配
  • 分析:1:1最近邻匹配,卡尺0.15,控制年龄、历史消费、活跃度
  • 结果:转化率提升2.5%(95% CI: [1.9%, 3.1%]),与实验组接近

5.4.2 结果对比与解释

分析方法 点估计 95% CI p值 样本量 关键假设
实验组 +2.3% [1.8%, 2.8%] 0.003 45,000 随机化+SUTVA
观测原始 +3.1% [2.5%, 3.7%] <0.001 105,000 可忽略性
PSM观测 +2.5% [1.9%, 3.1%] 0.002 22,000对 条件可忽略性

三角验证发现

I. 收敛性证据(Convergence)
三条证据线的效应方向一致(均为正向),且置信区间高度重叠(交集[1.9%, 2.8%]),表明新推荐算法确实有效。这种跨方法的一致性极大增强了结论的可信度。

II. 差异解释(Explanatory)
观测原始组效果(3.1%)高于实验组,经排查发现:

  • 选择偏差:算法自动优先服务高活跃度用户(28%偏差)
  • 时间效应:观测数据覆盖更长的周末时段(0.4%偏差)
  • 网络效应:观测组中社交推荐影响未被隔离(0.4%偏差)

PSM调整后,偏差降低至0.2%,证明匹配有效性。

III. 稳健性评分(Robustness Score)

采用多维度一致性评分

  1. 方向一致性:100%(3/3方法正向)
  2. 大小一致性:计算变异系数CV=0.15 < 0.2阈值
  3. 显著性一致性:2/3方法在p<0.01水平显著
  4. 分层一致性:生命周期三段的子群分析结果一致

综合稳健性评分:0.87/1.0(高)

5.4.3 异质性分析发现

利用观测数据的大样本优势,进行深度交互分析

发现**新客(注册<30天)**效果量(+4.2%)显著高于老客(+1.8%),但实验组因样本量不足未能检测到此差异。这提示:

  • 算法对新用户冷启动优化更好
  • 应调整流量分配,给新客更高实验比例
  • 产品策略:针对老客推荐逻辑需进一步优化

5.5 长期观测结果

实验结束后持续追踪30天,发现:

负面发现:实验组用户在第3周开始,活跃度下降1.2%,虽未达到统计显著(p=0.08),但观测数据(大样本)证实存在显著下降(p=0.02)。

根因分析

  • 推荐疲劳:深度模型过度 exploitation,多样性不足
  • 用户适应:初期新鲜感消退

对策

  1. 引入探索机制(ε-greedy策略)
  2. 增加推荐结果多样性指标约束
  3. 分阶段调整算法参数

5.6 业务决策与影响

基于三角验证结果,业务方做出分阶段上线决策

第一阶段(当前)

  • 对新用户100%启用新算法(效果最显著)
  • 老用户保持50%新算法+50%旧算法(监控长期指标)

第二阶段(优化后)

  • 解决推荐疲劳问题后,全量推广
  • 预计GMV年增量达2.8亿元

技术债务

  • 建立实验-观测数据自动核对系统
  • 开发PSM匹配模块,嵌入例行分析流程
  • 构建长期效果监控仪表板

5.7 案例总结与方法论反思

三角验证的价值体现

  1. 风险降低:单一方法可能导致错误结论(如仅看观测数据会高估效果)
  2. 洞察深化:实验提供因果推断,观测提供生态效度和异质性分析
  3. 效率提升:PSM验证后,后续类似实验可减少样本量(节省30%流量成本)

实施挑战

  • 资源投入:观测数据ETL管道维护成本是实验系统的2倍
  • 时间成本:完整分析周期从实验的2周延长至6周
  • 组织协调:需要数据工程师、分析师、产品经理三方紧密协作

最佳实践

  • 先实验后观测:实验验证核心假设,观测探索边界条件
  • 自动化PSM:将匹配流程脚本化,降低使用门槛
  • 标准化报告:建立三角验证报告模板,包含一致性评分和差异解释

VI. 代码实现与部署详解

6.1 项目架构设计

mermaid
graph TB
subgraph 数据采集层
A[客户端SDK] --> B[Kafka集群]
C[服务端日志] --> B
end

subgraph 数据存储层
    B --> D[Delta Lake]
    E[实验配置中心] --> F[PostgreSQL]
end

subgraph 计算层
    D --> G[Spark ETL管道]
    G --> H[特征工程]
    F --> H
    H --> I[三角验证引擎]
end

subgraph 服务层
    I --> J[REST API]
    J --> K[监控仪表板]
    J --> L[告警系统]
end

subgraph 部署层
    M[Docker容器] --> N[K8s集群]
    O[CI/CD流水线] --> M
    P[Helm Chart] --> N
end

style I fill:#f9f,stroke:#333,stroke-width:2px

6.2 Docker化部署

6.2.1 基础镜像构建

# Dockerfile
FROM python:3.10-slim as base

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    build-essential \
    libffi-dev \
    libssl-dev \
    && rm -rf /var/lib/apt/lists/*

# 设置工作目录
WORKDIR /app

# 创建非root用户
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app

# 切换到非root用户
USER appuser

# 安装Python依赖
COPY --chown=appuser:appuser requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# 复制应用代码
COPY --chown=appuser:appuser src/ ./src/

# 设置环境变量
ENV PYTHONPATH=/app/src
ENV PATH=/home/appuser/.local/bin:$PATH

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s \
  CMD python -c "import requests; requests.get('http://localhost:8000/health')"

# 启动命令
CMD ["python", "src/main.py"]

requirements.txt

pandas==2.1.1
numpy==1.25.2
pyspark==3.4.1
scikit-learn==1.3.0
fastapi==0.103.0
uvicorn==0.23.2
python-multipart==0.0.6
etcd3==0.12.0

6.2.2 多阶段构建优化

# 多阶段构建:分离编译和运行环境
# 阶段1:构建环境
FROM python:3.10-slim as builder

WORKDIR /build
COPY requirements.txt .
RUN pip install --user -r requirements.txt

# 阶段2:运行环境(更轻量)
FROM python:3.10-slim as runtime

# 仅安装运行时依赖
RUN apt-get update && apt-get install -y libpq5 && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# 从构建阶段复制已安装的包
COPY --from=builder /root/.local /root/.local

# 复制应用代码
COPY src/ ./src/

ENV PATH=/root/.local/bin:$PATH
ENV PYTHONPATH=/app/src

CMD ["python", "src/main.py"]

镜像大小对比:

  • 单阶段构建:485MB
  • 多阶段构建:218MB(节省55%)

6.3 Kubernetes部署配置

6.3.1 Deployment配置

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: triangulation-engine
  namespace: analytics
  labels:
    app: triangulation
    version: v2.1.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0  # 零停机部署
  selector:
    matchLabels:
      app: triangulation
  template:
    metadata:
      labels:
        app: triangulation
        version: v2.1.0
    spec:
      serviceAccountName: triangulation-sa
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      containers:
      - name: engine
        image: your-registry/triangulation-engine:v2.1.0
        imagePullPolicy: Always
        ports:
        - containerPort: 8000
          name: http
        env:
        - name: SPARK_MASTER
          value: "spark://spark-master:7077"
        - name: KAFKA_BROKERS
          value: "kafka-1:9092,kafka-2:9092"
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: url
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        volumeMounts:
        - name: config-volume
          mountPath: /app/config
          readOnly: true
      volumes:
      - name: config-volume
        configMap:
          name: triangulation-config
      nodeSelector:
        workload-type: analytics
      tolerations:
      - key: "analytics-workload"
        operator: "Equal"
        value: "true"
        effect: "NoSchedule"

6.3.2 Service与Ingress配置

# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: triangulation-service
  namespace: analytics
spec:
  selector:
    app: triangulation
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP

---

# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: triangulation-ingress
  namespace: analytics
  annotations:
    nginx.ingress.kubernetes.io/rate-limit: "100"
    nginx.ingress.kubernetes.io/rate-limit-window: "1m"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  ingressClassName: nginx
  tls:
  - hosts:
    - triangulation.analytics.internal
    secretName: triangulation-tls
  rules:
  - host: triangulation.analytics.internal
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: triangulation-service
            port:
              number: 80

6.4 CI/CD集成

6.4.1 GitHub Actions工作流

# .github/workflows/deploy.yml
name: Deploy Triangulation Engine

on:
  push:
    branches: [ main ]
    paths:
      - 'src/**'
      - 'Dockerfile'
      - 'requirements.txt'

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11"]
    
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v4
      with:
        python-version: ${{ matrix.python-version }}
    
    - name: Cache pip dependencies
      uses: actions/cache@v3
      with:
        path: ~/.cache/pip
        key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
    
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-cov
    
    - name: Run linting
      run: |
        pip install flake8
        flake8 src/ --count --select=E9,F63,F7,F82 --show-source --statistics
        flake8 src/ --count --exit-zero --max-complexity=10 --max-line-length=120
    
    - name: Run tests with coverage
      run: |
        pytest tests/ --cov=src/ --cov-report=xml
    
    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml
  
  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    
    steps:
    - name: Checkout repository
      uses: actions/checkout@v3
    
    - name: Log in to Container Registry
      uses: docker/login-action@v2
      with:
        registry: ${{ env.REGISTRY }}
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    
    - name: Extract metadata
      id: meta
      uses: docker/metadata-action@v4
      with:
        images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
        tags: |
          type=semver,pattern={{version}}
          type=sha,prefix=git-
          type=raw,value=latest,enable={{is_default_branch}}
    
    - name: Build and push Docker image
      uses: docker/build-push-action@v4
      with:
        context: .
        push: true
        tags: ${{ steps.meta.outputs.tags }}
        labels: ${{ steps.meta.outputs.labels }}
        cache-from: type=gha
        cache-to: type=gha,mode=max
        platforms: linux/amd64,linux/arm64
    
  deploy-staging:
    needs: build-and-push
    runs-on: ubuntu-latest
    environment: staging
    if: github.ref == 'refs/heads/main'
    
    steps:
    - name: Deploy to staging
      run: |
        echo "Deploying to staging environment"
        # 调用kubectl或Helm
        helm upgrade triangulation ./helm-chart \
          --install \
          --namespace analytics-staging \
          --set image.tag=git-${{ github.sha }} \
          --wait
    
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    if: github.ref == 'refs/heads/main'
    
    steps:
    - name: Manual approval
      uses: hmarr/auto-approve-action@v3
    
    - name: Deploy to production
      run: |
        echo "Deploying to production"
        helm upgrade triangulation ./helm-chart \
          --install \
          --namespace analytics-production \
          --set image.tag=git-${{ github.sha }} \
          --set replicaCount=5 \
          --wait

6.4.2 Helm Chart配置

# helm-chart/values.yaml
# 默认配置值
replicaCount: 3

image:
  repository: ghcr.io/your-org/triangulation-engine
  tag: latest
  pullPolicy: IfNotPresent

service:
  type: ClusterIP
  port: 80

ingress:
  enabled: true
  annotations:
    nginx.ingress.kubernetes.io/rate-limit: "100"
  hosts:
    - host: triangulation.analytics.internal
      paths:
        - path: /
          pathType: Prefix
  tls:
    - secretName: triangulation-tls
      hosts:
        - triangulation.analytics.internal

resources:
  limits:
    cpu: 4
    memory: 8Gi
  requests:
    cpu: 2
    memory: 4Gi

autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70
  targetMemoryUtilizationPercentage: 80

nodeSelector:
  workload-type: analytics

tolerations:
  - key: "analytics-workload"
    operator: "Equal"
    value: "true"
    effect: "NoSchedule"

affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchExpressions:
              - key: app
                operator: In
                values:
                  - triangulation
          topologyKey: kubernetes.io/hostname
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。