实验与观测研究的结合:三角验证法的实践应用
I. 三角验证法的理论框架与研究价值
1.1 研究范式的哲学基础
| 维度 | 实验研究 | 观测研究 | 三角验证 |
|---|---|---|---|
| 认识论 | 实证主义 | 自然主义 | 实用主义 |
| 核心价值 | 因果推断 | 生态效度 | 互补增强 |
| 控制程度 | 高 | 低 | 动态平衡 |
| 外部效度 | 受限 | 高 | 综合提升 |
| 时间维度 | 短期集中 | 长期连续 | 多时点交叉 |
实验研究通过随机分配(Randomization)控制混淆变量,建立清晰的因果链条,其数学基础可追溯至Fisher的随机化实验设计理论。核心公式为:
其中是结果变量,是处理指示变量,是我们关心的平均处理效应(ATE)。
观测研究则强调在自然状态下收集数据,保留现象的完整性和复杂性。其因果推断依赖于可忽略性假设(Ignorability):
这一假设要求所有影响处理分配的变量都被观测到,这在实践中往往难以满足。
1.2 三角验证的操作化定义
三角验证法并非简单的方法堆砌,而是系统性的整合策略。根据Denzin的分类,可分为四种类型:
| 验证类型 | 描述 | 在本案例中的应用 |
|---|---|---|
| 数据三角 | 使用多源数据 | 实验数据+日志数据+业务数据库 |
| 方法三角 | 采用不同研究方法 | A/B测试+队列分析+倾向得分匹配 |
| 研究者三角 | 多人独立分析 | 数据科学家+产品经理+领域专家 |
| 理论三角 | 多理论视角解释 | 行为经济学+认知心理学+营销学 |
关键原则:三角验证的目标不是追求结果完全一致,而是通过多角度交叉验证识别稳健结论,解释差异来源,从而构建更具弹性的知识体系。
1.3 整合研究的效度提升机制
mermaid
graph TD
A[单一方法局限性] --> B[内部效度 vs 外部效度权衡]
B --> C[实验研究: 高内部效度]
B --> D[观测研究: 高外部效度]
C --> E[结论推广受限]
D --> F[因果推断模糊]
E --> G[三角验证整合]
F --> G
G --> H[方法互补]
G --> I[数据交叉]
G --> J[稳健性检验]
H --> K[综合效度提升]
I --> K
J --> K
K --> L[可信决策依据]
II. 实验研究的设计与实现
2.1 A/B测试的统计学基础
A/B测试的核心是假设检验框架:
- 原假设(处理无效果)
- 备择假设(处理有效果)
统计功效(Power)计算公式:
样本量计算采用Lehr’s公式:
其中是方差,是最小可检测效应(MDE)。
2.2 分层随机化算法实现
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
import hashlib
class StratifiedRandomization:
"""
分层随机化分配器
解决协变量不平衡问题,提升实验精度
"""
def __init__(self, strata_vars: List[str], treatment_names: List[str] = ['control', 'treatment']):
"""
初始化分层随机化器
参数:
strata_vars: 分层变量列表,如['user_segment', 'device_type']
treatment_names: 处理组名称
"""
self.strata_vars = strata_vars
self.treatment_names = treatment_names
self.assignment_cache = {} # 用户ID到分配的映射
def create_strata(self, user_data: pd.DataFrame) -> pd.DataFrame:
"""
创建分层键并计算各层样本量
实现细节:
1. 对离散变量直接组合
2. 对连续变量分箱处理
3. 计算每层最小样本量
示例:
user_data包含: user_id, segment(high/mid/low), device(android/ios)
输出增加: strata_key如'high_android'
"""
# 创建分层键
user_data['strata_key'] = user_data[self.strata_vars].astype(str).agg('_'.join, axis=1)
# 统计各层分布
strata_stats = user_data['strata_key'].value_counts()
print("各层样本分布:")
print(strata_stats)
# 检查小层问题(<100样本)
small_strata = strata_stats[strata_stats < 100]
if not small_strata.empty:
print(f"警告: 以下层样本量不足100,可能影响随机化效果:\n{small_strata}")
return user_data
def assign_treatment(self, user_id: str, strata_key: str) -> str:
"""
基于哈希的分层随机分配
技术要点:
4. 使用SHA256保证哈希均匀性
5. 加盐处理避免预测
6. 支持多处理组
参数:
user_id: 用户唯一标识
strata_key: 分层键
返回:
treatment_assignment: 分配的处理组
"""
# 检查缓存
cache_key = f"{user_id}_{strata_key}"
if cache_key in self.assignment_cache:
return self.assignment_cache[cache_key]
# 创建唯一哈希输入
hash_input = f"{user_id}_{strata_key}_salt_v2024"
hash_value = hashlib.sha256(hash_input.encode()).hexdigest()
# 转换为0-1之间的数值
hash_int = int(hash_value[:8], 16) # 取前8位
random_score = hash_int / 0xffffffff
# 按比例分配
n_treatments = len(self.treatment_names)
assignment_idx = int(random_score * n_treatments)
treatment = self.treatment_names[assignment_idx]
# 缓存结果
self.assignment_cache[cache_key] = treatment
return treatment
def execute_randomization(self, user_data: pd.DataFrame) -> pd.DataFrame:
"""
执行完整的随机化流程
部署注意事项:
7. 必须在实验开始前完成分配
8. 结果应持久化到数据库
9. 支持增量用户处理
返回:
包含treatment分配的DataFrame
"""
# 步骤1: 创建分层
users_with_strata = self.create_strata(user_data.copy())
# 步骤2: 逐行分配
users_with_strata['treatment'] = users_with_strata.apply(
lambda row: self.assign_treatment(row['user_id'], row['strata_key']),
axis=1
)
# 步骤3: 验证分配平衡性
balance_report = self._validate_balance(users_with_strata)
print("\n分配平衡性报告:")
print(balance_report)
return users_with_strata
def _validate_balance(self, assigned_data: pd.DataFrame) -> pd.DataFrame:
"""
验证协变量平衡性
统计检验:
10. 标准化差异(Standardized Difference)
11. 卡方检验(分类变量)
12. t检验(连续变量)
"""
report = []
for var in self.strata_vars:
crosstab = pd.crosstab(
assigned_data[var],
assigned_data['treatment'],
normalize='columns'
)
report.append({
'variable': var,
'balance_score': self._calculate_balance_score(crosstab),
'status': '✓' if self._calculate_balance_score(crosstab) < 0.1 else '✗'
})
return pd.DataFrame(report)
def _calculate_balance_score(self, crosstab: pd.DataFrame) -> float:
"""计算标准化差异"""
control_props = crosstab.iloc[:, 0].values
treatment_props = crosstab.iloc[:, 1].values
# 标准化差异
diff = np.abs(control_props - treatment_props)
balance_score = np.max(diff)
return balance_score
# 使用示例代码
if __name__ == "__main__":
# 模拟用户数据
np.random.seed(42)
n_users = 10000
user_data = pd.DataFrame({
'user_id': [f'user_{i:06d}' for i in range(n_users)],
'user_segment': np.random.choice(['high', 'mid', 'low'], n_users, p=[0.2, 0.5, 0.3]),
'device_type': np.random.choice(['android', 'ios', 'web'], n_users, p=[0.5, 0.3, 0.2]),
'age': np.random.randint(18, 65, n_users),
'historical_spend': np.random.exponential(100, n_users)
})
# 初始化并执行随机化
randomizer = StratifiedRandomization(
strata_vars=['user_segment', 'device_type'],
treatment_names=['control', 'treatment_A', 'treatment_B']
)
assigned_users = randomizer.execute_randomization(user_data)
print("\n前10条分配结果:")
print(assigned_users[['user_id', 'user_segment', 'device_type', 'strata_key', 'treatment']].head(10))
# 保存到CSV(生产环境应写入数据库)
assigned_users.to_csv('experiment_assignments.csv', index=False)
2.3 实验组设置的工程实践
class ExperimentConfig:
"""
实验配置管理器
实现实验参数的集中管理和版本控制
"""
def __init__(self, experiment_name: str):
self.experiment_name = experiment_name
self.config = {
'start_date': '2024-01-15 00:00:00',
'end_date': '2024-02-15 23:59:59',
'primary_metric': 'conversion_rate',
'secondary_metrics': ['revenue_per_user', 'session_duration'],
'alpha': 0.05, # 显著性水平
'power': 0.8, # 统计功效
'mde': 0.02, # 最小可检测效应
'sample_size': 15000,
'traffic_allocation': 0.3, # 30%流量参与实验
'guardrail_metrics': ['page_load_time', 'error_rate']
}
def validate_config(self) -> Dict[str, bool]:
"""
验证配置合理性
检查项:
1. 时间跨度是否足够
2. 样本量是否达标
3. 指标是否可追踪
4. MDE是否现实
"""
issues = {}
# 检查实验时长
from datetime import datetime
start = datetime.fromisoformat(self.config['start_date'])
end = datetime.fromisoformat(self.config['end_date'])
duration_days = (end - start).days
if duration_days < 7:
issues['duration'] = False
print("⚠️ 警告: 实验时长不足7天,可能无法捕捉周周期效应")
else:
issues['duration'] = True
# 检查样本量(简化计算)
required_sample = self._calculate_required_sample()
if self.config['sample_size'] < required_sample:
issues['sample_size'] = False
print(f"⚠️ 警告: 配置样本量不足,需要{required_sample}")
else:
issues['sample_size'] = True
return issues
def _calculate_required_sample(self) -> int:
"""基于MDE计算所需样本量"""
# 简化版样本量计算
# 实际应使用statsmodels.stats.power
baseline_rate = 0.15 # 假设基准转化率15%
mde = self.config['mde']
# Lehr's formula近似
pooled_prob = baseline_rate * (1 - baseline_rate) + \
(baseline_rate + mde) * (1 - baseline_rate - mde)
sample_per_group = (16 * pooled_prob) / (mde ** 2)
return int(sample_per_group * 2) # 两组
# 部署到配置中心(如Consul/etcd)
def deploy_experiment_config(config: ExperimentConfig):
"""
将实验配置部署到分布式配置中心
生产环境要点:
1. 支持动态更新
2. 版本控制
3. 灰度发布
4. 快速回滚
"""
import json
# 模拟写入配置中心
config_json = json.dumps(config.config, indent=2)
# 实际部署代码示例(使用etcd)
"""
import etcd3
etcd = etcd3.client(host='config-server.internal', port=2379)
etcd.put(
f'/experiments/{config.experiment_name}',
config_json,
lease=etcd.lease(86400 * 30) # 30天过期
)
"""
# 本地备份
with open(f'configs/{config.experiment_name}.json', 'w') as f:
f.write(config_json)
print(f"配置已部署: {config.experiment_name}")
print(f"配置内容: {config_json[:200]}...")
III. 观测研究的设计与实现
3.1 用户行为日志体系构建
观测研究的核心是高质量的数据采集。现代数字产品的日志体系需要满足:
| 设计原则 | 技术实现 | 质量指标 |
|---|---|---|
| 完整性 | 全埋点+关键事件自定义埋点 | 事件丢失率 < 0.1% |
| 准确性 | 客户端+服务端双通道验证 | 字段准确率 > 99.5% |
| 实时性 | Kafka流式处理 | 端到端延迟 < 5分钟 |
| 关联性 | 统一ID体系(UserID/DeviceID) | ID mapping准确率 > 98% |
3.2 日志采集SDK实现
import time
import uuid
import json
from typing import Any, Dict, Optional
import threading
from collections import deque
class BehavioralLogger:
"""
行为日志采集器
设计要点:
1. 异步发送避免阻塞主线程
2. 本地队列防止数据丢失
3. 自动重试机制
4. 采样控制降低成本
"""
def __init__(self, config: Dict[str, Any]):
self.queue = deque(maxlen=10000) # 最多缓存1万条
self.batch_size = config.get('batch_size', 100)
self.send_interval = config.get('send_interval', 60) # 60秒发送一次
self.sampling_rate = config.get('sampling_rate', 1.0) # 全采样
self.endpoint = config.get('endpoint', 'https://log-api.internal/v1/events')
# 启动后台发送线程
self._start_sender_thread()
# 设备信息(实际应从环境获取)
self.device_info = self._collect_device_info()
def _collect_device_info(self) -> Dict[str, str]:
"""收集设备上下文信息"""
# 实际实现应使用platform/os模块
return {
'device_id': f'dev_{uuid.uuid4().hex[:12]}',
'platform': 'web',
'os_version': 'Windows_10',
'app_version': '2.1.0'
}
def log_event(self, event_name: str, properties: Optional[Dict] = None,
user_id: Optional[str] = None):
"""
记录用户行为事件
参数:
event_name: 事件名称,如'product_view', 'add_to_cart'
properties: 事件属性字典
user_id: 用户ID(如果有)
使用示例:
logger.log_event(
'recommendation_click',
{'position': 3, 'product_id': 'P12345'},
user_id='U123456'
)
"""
# 采样控制
if np.random.random() > self.sampling_rate:
return
# 构建标准事件格式
event = {
'event_id': str(uuid.uuid4()),
'event_name': event_name,
'timestamp': int(time.time() * 1000), # 毫秒时间戳
'user_id': user_id,
'properties': properties or {},
'device_info': self.device_info,
'session_id': self._get_session_id(user_id)
}
# 验证事件格式
self._validate_event(event)
# 加入队列
self.queue.append(event)
def _get_session_id(self, user_id: Optional[str]) -> Optional[str]:
"""生成会话ID(简化版)"""
if not user_id:
return None
# 基于时间窗口的会话管理
current_hour = int(time.time() / 3600)
return f"session_{user_id}_{current_hour}"
def _validate_event(self, event: Dict):
"""验证事件数据质量"""
required_fields = ['event_id', 'event_name', 'timestamp']
for field in required_fields:
if field not in event:
raise ValueError(f"Missing required field: {field}")
# 检查事件名格式
if not event['event_name'].replace('_', '').isalnum():
raise ValueError("Event name must be alphanumeric with underscores")
def _start_sender_thread(self):
"""启动后台发送线程"""
def sender_worker():
while True:
if len(self.queue) >= self.batch_size:
self._send_batch()
time.sleep(self.send_interval)
thread = threading.Thread(target=sender_worker, daemon=True)
thread.start()
print("后台日志发送线程已启动")
def _send_batch(self):
"""
批量发送日志到服务器
可靠性保障:
1. 本地持久化失败事件
2. 指数退避重试
3. 超时设置
"""
if not self.queue:
return
batch = []
while self.queue and len(batch) < self.batch_size:
batch.append(self.queue.popleft())
try:
# 实际应使用requests库
# response = requests.post(
# self.endpoint,
# json={'events': batch},
# timeout=10,
# headers={'Authorization': 'Bearer YOUR_TOKEN'}
# )
# response.raise_for_status()
# 模拟成功发送
print(f"成功发送{len(batch)}条事件到{self.endpoint}")
# 保存到本地文件(降级方案)
self._backup_to_file(batch)
except Exception as e:
print(f"发送失败: {e}, 将事件放回队列")
# 放回队列稍后重试
self.queue.extendleft(reversed(batch))
# 指数退避
time.sleep(min(300, 2 ** len(batch)))
def _backup_to_file(self, batch: List[Dict]):
"""本地备份防止数据丢失"""
timestamp = int(time.time())
filename = f"logs/backup_{timestamp}_{len(batch)}.jsonl"
import os
os.makedirs('logs', exist_ok=True)
with open(filename, 'a') as f:
for event in batch:
f.write(json.dumps(event) + '\n')
# 使用示例
if __name__ == "__main__":
# 初始化日志器
logger = BehavioralLogger({
'batch_size': 50,
'sampling_rate': 1.0,
'endpoint': 'https://log-api.internal/v1/events'
})
# 模拟用户行为
for i in range(1000):
user_id = f"user_{i % 100:04d}" # 100个用户
# 首页访问
logger.log_event(
'home_page_view',
{'source': 'organic', 'page_num': 1},
user_id=user_id
)
# 产品浏览
if np.random.random() > 0.5:
logger.log_event(
'product_detail_view',
{
'product_id': f'P{np.random.randint(1000, 9999)}',
'recommendation_type': 'collaborative_filter'
},
user_id=user_id
)
# 加入购物车
if np.random.random() > 0.8:
logger.log_event(
'add_to_cart',
{
'product_id': f'P{np.random.randint(1000, 9999)}',
'quantity': np.random.randint(1, 5)
},
user_id=user_id
)
time.sleep(0.01) # 模拟真实间隔
# 等待后台发送完成
time.sleep(5)
3.3 观测数据ETL管道
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime, window
from pyspark.sql.types import StructType, StructField, StringType, LongType, MapType
class ObservationalDataPipeline:
"""
观测数据ETL管道
技术栈: Spark + Delta Lake + S3
处理流程:
1. 从Kafka消费原始事件
2. 清洗和验证
3. 会话化(Sessionization)
4. 特征工程
5. 写入数据湖
"""
def __init__(self, spark: SparkSession):
self.spark = spark
self.checkpoint_location = "s3://data-lake/checkpoints/observational/"
def define_schema(self) -> StructType:
"""定义事件数据模式"""
return StructType([
StructField("event_id", StringType(), False),
StructField("event_name", StringType(), False),
StructField("timestamp", LongType(), False),
StructField("user_id", StringType(), True),
StructField("properties", MapType(StringType(), StringType()), True),
StructField("device_info", MapType(StringType(), StringType()), True)
])
def read_from_kafka(self, topic: str, bootstrap_servers: str):
"""从Kafka读取实时事件流"""
return self.spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", bootstrap_servers) \
.option("subscribe", topic) \
.option("startingOffsets", "latest") \
.load() \
.selectExpr("CAST(value AS STRING) as json_string")
def parse_events(self, raw_df):
"""解析JSON事件"""
from pyspark.sql.functions import from_json
schema = self.define_schema()
return raw_df.withColumn(
"event",
from_json(col("json_string"), schema)
).select("event.*")
def clean_data(self, df):
"""数据清洗转换"""
return df.filter(
# 移除缺少user_id的事件(某些边缘事件除外)
(col("user_id").isNotNull()) | (col("event_name") == "page_view")
).withColumn(
"event_time",
from_unixtime(col("timestamp") / 1000)
)
def sessionize(self, df, session_gap_minutes: int = 30):
"""
会话化处理
算法逻辑:
1. 按user_id分区
2. 按timestamp排序
3. 当时间差>30分钟时创建新会话
输出:
session_id: 会话唯一标识
session_seq: 会话内事件序号
"""
from pyspark.sql import Window
from pyspark.sql.functions import lag, sum, when, concat
# 定义窗口
user_window = Window.partitionBy("user_id").orderBy("timestamp")
# 检测会话边界
df_with_lag = df.withColumn(
"time_diff",
(col("timestamp") - lag("timestamp", 1).over(user_window)) / (1000 * 60)
)
# 标记新会话开始
df_with_session_flag = df_with_lag.withColumn(
"new_session",
when(col("time_diff") > session_gap_minutes, 1).otherwise(0)
)
# 累计和生成会话ID
df_sessionized = df_with_session_flag.withColumn(
"session_id",
concat(
col("user_id"),
col("event_time").cast("date"),
sum("new_session").over(user_window).cast("string")
)
)
return df_sessionized
def run_pipeline(self, kafka_topic: str, output_table: str):
"""
执行完整ETL管道
部署模式:
4. 开发模式: 使用local模式
5. 生产模式: 使用YARN/K8s集群
6. 支持动态扩容
"""
# 读取
raw_df = self.read_from_kafka(kafka_topic, "kafka-broker-1:9092")
# 解析
parsed_df = self.parse_events(raw_df)
# 清洗
clean_df = self.clean_data(parsed_df)
# 会话化
sessionized_df = self.sessionize(clean_df)
# 写入Delta Lake
query = sessionized_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", self.checkpoint_location) \
.option("mergeSchema", "true") \
.toTable(output_table)
print(f"管道已启动,输出表: {output_table}")
return query
# 部署脚本
def deploy_etl_pipeline():
"""
Spark ETL管道部署
环境要求:
1. Spark 3.4+
2. Delta Lake 2.4+
3. Kafka连接器
4. S3/HDFS存储
部署方式:
1. 提交到YARN: spark-submit --master yarn
2. K8s部署: spark-operator
3. 本地测试: local[4]
"""
spark = SparkSession.builder \
.appName("ObservationalDataPipeline") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.streaming.kafka.maxRatePerPartition", 1000) \
.getOrCreate()
pipeline = ObservationalDataPipeline(spark)
# 启动流处理
query = pipeline.run_pipeline(
kafka_topic="user-events",
output_table="observational.user_behavior"
)
# 等待终止
query.awaitTermination()
if __name__ == "__main__":
deploy_etl_pipeline()
IV. 三角验证法的整合应用
4.1 数据对齐与合并策略
三角验证的第一步是将实验数据与观测数据精确匹配。核心挑战在于:
- 时间对齐:实验曝光时间与观测行为时间窗口匹配
- 用户匹配:跨系统用户ID统一
- 事件关联:实验干预与后续行为的因果关系链
class DataTriangulationMerger:
"""
数据三角合并器
整合实验分配数据与观测行为数据
"""
def __init__(self, experiment_table: str, observational_table: str):
self.experiment_table = experiment_table
self.observational_table = observational_table
def merge_datasets(self, spark: SparkSession,
lookback_days: int = 7,
lookforward_days: int = 30) -> DataFrame:
"""
合并实验与观测数据
合并逻辑:
1. 获取实验分配记录(曝光事件)
2. 提取曝光前后用户行为
3. 按用户ID关联
4. 计算行为变化指标
参数:
lookback_days: 曝光前观测天数
lookforward_days: 曝光后观测天数
"""
# 读取实验数据
experiment_df = spark.table(self.experiment_table) \
.select(
"user_id",
"treatment",
"assignment_time"
)
# 读取观测数据
observational_df = spark.table(self.observational_table) \
.select(
"user_id",
"event_name",
"timestamp",
"properties",
"session_id"
)
# 计算时间窗口边界
experiment_df = experiment_df.withColumn(
"window_start",
col("assignment_time").cast("timestamp") - expr(f"INTERVAL {lookback_days} DAYS")
).withColumn(
"window_end",
col("assignment_time").cast("timestamp") + expr(f"INTERVAL {lookforward_days} DAYS")
)
# 关联并过滤时间窗口内的事件
merged_df = experiment_df.join(
observational_df,
on=["user_id"],
how="inner"
).filter(
(col("timestamp") >= col("window_start")) &
(col("timestamp") <= col("window_end"))
)
return merged_df
def calculate_behavioral_metrics(self, merged_df: DataFrame) -> DataFrame:
"""
计算行为指标
输出指标:
- 曝光前活动天数
- 曝光后活动天数
- 曝光前转化率
- 曝光后转化率
- 行为变化率
"""
# 按用户聚合
metrics_df = merged_df.groupBy("user_id", "treatment").agg(
# 曝光前指标
count(when(col("timestamp") < col("assignment_time"), True)).alias("pre_activity_count"),
# 曝光后指标
count(when(col("timestamp") >= col("assignment_time"), True)).alias("post_activity_count"),
# 转化事件(如购买)
count(when(
(col("timestamp") >= col("assignment_time")) &
(col("event_name") == "purchase"),
True
)).alias("post_purchase_count"),
# 首个转化时间
min(when(
col("event_name") == "purchase",
col("timestamp")
)).alias("first_purchase_time")
)
# 计算变化率
final_df = metrics_df.withColumn(
"activity_change_rate",
(col("post_activity_count") - col("pre_activity_count")) /
(col("pre_activity_count") + 1) # 平滑处理
).withColumn(
"conversion_indicator",
(col("post_purchase_count") > 0).cast("int")
)
return final_df
# 使用示例
def run_data_triangulation():
spark = SparkSession.builder.getOrCreate()
merger = DataTriangulationMerger(
experiment_table="experiments.assignments_v2",
observational_table="observational.user_behavior"
)
# 执行合并
merged_df = merger.merge_datasets(spark, lookback_days=7, lookforward_days=30)
# 计算指标
metrics_df = merger.calculate_behavioral_metrics(merged_df)
# 写入分析表
metrics_df.write.mode("overwrite").saveAsTable("analysis.triangulated_metrics")
print("数据三角合并完成")
if __name__ == "__main__":
run_data_triangulation()
4.2 倾向得分匹配增强
为应对观测数据的选择偏差,引入倾向得分匹配(PSM)作为第二层验证:
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors
import numpy as np
class PropensityScoreMatcher:
"""
倾向得分匹配器
用于在观测数据中构建伪实验组
"""
def __init__(self, covariates: List[str], caliper: float = 0.1):
"""
初始化PSM
参数:
covariates: 用于估计倾向得分的协变量
caliper: 匹配半径(标准差单位)
"""
self.covariates = covariates
self.caliper = caliper
self.ps_model = LogisticRegression(random_state=42, max_iter=1000)
def estimate_propensity_scores(self, df: pd.DataFrame) -> np.ndarray:
"""
估计倾向得分
实现细节:
1. 使用逻辑回归
2. 支持类别变量自动编码
3. 返回处理概率
"""
X = df[self.covariates]
# 处理缺失值
X = X.fillna(X.median())
# 训练倾向得分模型
# 注意: 实际应使用scikit-learn的Pipeline处理类别变量
self.ps_model.fit(X, df['treatment'])
# 预测倾向得分
propensity_scores = self.ps_model.predict_proba(X)[:, 1]
return propensity_scores
def match_groups(self, df: pd.DataFrame) -> pd.DataFrame:
"""
执行最近邻匹配
匹配算法:
4. 分离处理组和对照组
5. 构建KD树加速搜索
6. 1:1无放回匹配
7. 卡尺限制确保质量
返回:
匹配后的平衡数据集
"""
# 估计倾向得分
df = df.copy()
df['propensity_score'] = self.estimate_propensity_scores(df)
# 分离组
treatment = df[df['treatment'] == 1]
control = df[df['treatment'] == 0]
# 构建匹配
treatment_ps = treatment['propensity_score'].values.reshape(-1, 1)
control_ps = control['propensity_score'].values.reshape(-1, 1)
# 最近邻匹配
nn_match = NearestNeighbors(n_neighbors=1, algorithm='kd_tree')
nn_match.fit(control_ps)
distances, indices = nn_match.kneighbors(treatment_ps)
# 应用卡尺限制
valid_matches = distances.flatten() < self.caliper
matched_treatment = treatment[valid_matches]
matched_control_idx = indices[valid_matches].flatten()
matched_control = control.iloc[matched_control_idx]
# 合并匹配样本
matched_df = pd.concat([matched_treatment, matched_control])
# 验证平衡性
balance_improvement = self._calculate_balance_improvement(df, matched_df)
print(f"匹配后平衡性改善: {balance_improvement:.2%}")
return matched_df
def _calculate_balance_improvement(self, before: pd.DataFrame, after: pd.DataFrame) -> float:
"""计算平衡性改善度"""
before_imbalance = self._calculate_imbalance(before)
after_imbalance = self._calculate_imbalance(after)
improvement = (before_imbalance - after_imbalance) / before_imbalance
return improvement
def _calculate_imbalance(self, df: pd.DataFrame) -> float:
"""计算协变量不平衡度"""
imbalances = []
for cov in self.covariates:
control_mean = df[df['treatment'] == 0][cov].mean()
treatment_mean = df[df['treatment'] == 1][cov].mean()
std_diff = abs(control_mean - treatment_mean) / df[cov].std()
imbalances.append(std_diff)
return np.mean(imbalances)
# 在三角验证框架中应用PSM
class EnhancedTriangulation:
"""
增强型三角验证框架
集成实验数据、原始观测数据、PSM观测数据
"""
def __init__(self, metrics_df: pd.DataFrame):
self.metrics_df = metrics_df
self.results = {}
def analyze_experiment_arm(self, df: pd.DataFrame):
"""分析实验组(金标准)"""
ate = self._calculate_ate(df)
self.results['experiment_ate'] = ate
return ate
def analyze_observational_arm(self, df: pd.DataFrame):
"""分析原始观测数据"""
ate = self._calculate_ate(df)
self.results['observational_ate'] = ate
return ate
def analyze_psm_arm(self, df: pd.DataFrame, covariates: List[str]):
"""分析PSM调整后的观测数据"""
psm = PropensityScoreMatcher(covariates)
matched_df = psm.match_groups(df)
ate = self._calculate_ate(matched_df)
self.results['psm_ate'] = ate
return ate
def _calculate_ate(self, df: pd.DataFrame) -> float:
"""计算平均处理效应"""
treatment_outcome = df[df['treatment'] == 1]['conversion_indicator'].mean()
control_outcome = df[df['treatment'] == 0]['conversion_indicator'].mean()
return treatment_outcome - control_outcome
def generate_triangulation_report(self) -> Dict:
"""
生成三角验证报告
报告维度:
1. 点估计对比
2. 置信区间重叠度
3. 稳健性评分
4. 差异解释
"""
exp_ate = self.results.get('experiment_ate')
obs_ate = self.results.get('observational_ate')
psm_ate = self.results.get('psm_ate')
report = {
'experiment_ate': exp_ate,
'observational_ate': obs_ate,
'psm_ate': psm_ate,
'consistency_score': self._calculate_consistency(exp_ate, obs_ate, psm_ate),
'convergence_evidence': self._evaluate_convergence()
}
return report
def _calculate_consistency(self, *estimates) -> float:
"""计算估计一致性评分"""
# 简化实现:计算变异系数的倒数
estimates = [e for e in estimates if e is not None]
if len(estimates) < 2:
return 0.0
cv = np.std(estimates) / np.mean(estimates)
consistency = 1 / (1 + cv)
return consistency
def _evaluate_convergence(self) -> str:
"""评估证据收敛性"""
# 根据三种方法的估计方向、大小、显著性综合判断
return "强收敛" # 简化返回
# 使用示例
def run_enhanced_triangulation():
# 从分析表读取数据
metrics_df = spark.table("analysis.triangulated_metrics").toPandas()
# 初始化框架
triangulation = EnhancedTriangulation(metrics_df)
# 三条臂分析
exp_ate = triangulation.analyze_experiment_arm(metrics_df)
obs_ate = triangulation.analyze_observational_arm(metrics_df)
psm_ate = triangulation.analyze_psm_arm(
metrics_df,
covariates=['pre_activity_count', 'historical_spend', 'age']
)
# 生成报告
report = triangulation.generate_triangulation_report()
print(json.dumps(report, indent=2))
# 保存报告
with open('triangulation_report.json', 'w') as f:
json.dump(report, f, indent=2)
if __name__ == "__main__":
run_enhanced_triangulation()
V. 实际案例分析:电商平台推荐算法效果评估(2000+字详细分析)
5.1 业务背景与研究问题
案例背景:某头部电商平台计划上线新的推荐算法(基于深度学习的协同过滤模型),替换原有的基于规则的推荐系统。业务方关心三个核心问题:
- 新算法是否能显著提升转化率(主指标)?
- 新算法对用户体验的长期影响如何(次指标:留存率、活跃度)?
- 不同用户群体(新客/老客、高价值/低价值)的效果是否存在异质性?
研究设计挑战:
- 实验周期需覆盖完整用户生命周期(至少30天)
- 推荐效果存在网络效应和延迟效应
- 用户行为受季节性和营销活动影响
- 技术实现涉及多个微服务协调
5.2 实验设计细节
5.2.1 流量分配与随机化策略
采用分层随机化设计,控制关键混淆变量:
| 分层维度 | 分层依据 | 业务逻辑 |
|---|---|---|
| 用户生命周期 | 注册天数(<30天, 30-180天, >180天) | 新老客行为模式差异大 |
| 价值分层 | 历史消费金额(高/中/低) | 确保各层ROI可单独评估 |
| 设备类型 | Android/iOS/Web | 技术实现和体验差异 |
| 地域分组 | 一线/二线/其他城市 | 物流和商品供给差异 |
随机化算法改进:在基础哈希随机化上增加确定性重随机化机制。当检测到某层样本量<200时,触发跨层合并,但保持层内处理分配比例不变。这解决了小层统计功效不足的问题。
代码实现关键片段:
# 小层检测与合并逻辑
min_strata_size = 200
small_strata = strata_stats[strata_stats < min_strata_size].index
if len(small_strata) > 0:
print(f"检测到{len(small_strata)}个小层,触发合并策略")
# 按用户生命周期聚合小层
user_data['merged_strata'] = user_data['strata_key']
for strata in small_strata:
# 提取生命周期段
lifecycle = strata.split('_')[0] # 如'new' from 'new_android'
# 合并到同生命周期的其他层
user_data.loc[
user_data['strata_key'] == strata,
'merged_strata'
] = f"{lifecycle}_merged"
5.2.2 实验指标体系的构建
采用层次化指标矩阵设计:
I. 核心指标(Primary Metrics)
- 转化率(Conversion Rate):下单用户数 / 曝光用户数
- 客单价(Revenue per User):总GMV / 曝光用户数
- 统计显著性:p-value < 0.05(Bonferroni校正)
II. 护栏指标(Guardrail Metrics)
- 页面加载时间:P95 < 2秒
- 推荐位点击率(CTR):避免过度推荐导致的疲劳
- 用户投诉率:上升超过20%自动触发警报
III. 长期观测指标
- 7日留存率:实验后第7天回访率
- 30日复购率:长期购买行为
- 用户活跃度:日均使用时长变化
5.2.3 实验执行与监控
实验上线后,建立实时监控仪表盘,关键监控项包括:
| 监控维度 | 检查频率 | 告警阈值 | 应对措施 |
|---|---|---|---|
| 样本量 | 每小时 | 单日<500用户 | 检查流量分配 |
| SRM检验 | 每6小时 | p<0.001 | 暂停实验排查 |
| 护栏指标 | 实时 | CTR>25%或<5% | 人工审核推荐质量 |
| 显著性 | 每日 | 提前达到显著 | 继续运行至计划周期 |
SRM(Sample Ratio Mismatch)检测是实验健康的必要保障。计算公式:
其中是观测到的各组样本量,是期望的理论值。当p-value < 0.001时,表明随机化系统可能存在bug。
5.3 观测研究设计
5.3.1 日志采集方案
由于实验只能覆盖30%流量,剩余70%用户作为观测数据源,用于构建外部效度验证队列。
采集范围:
- 全量用户:确保观测数据代表性
- 事件粒度:曝光、点击、加购、下单、支付成功
- 上下文信息:推荐位位置、商品类别、用户实时行为序列
数据质量保证措施:
- 客户端SDK:采用批量发送+本地缓存,网络中断后24小时内可恢复
- 服务端日志:双写机制,写入两个独立Kafka集群
- 校验对账:每小时实验组vs对照组事件量对比,差异>5%触发告警
5.3.2 队列构建与特征工程
构建回顾性队列(Retrospective Cohort):
- 入组标准:在观测期间(实验前7天至实验后30天)有至少一次推荐位曝光
- 排除标准:同时参与其他实验的用户
- 分层暴露:根据推荐算法曝光次数分组(高/中/低暴露)
特征工程产出:
| 特征类别 | 特征示例 | 计算逻辑 | 用途 |
|---|---|---|---|
| 行为序列 | 平均浏览深度、跳出率 | 会话窗口聚合 | 用户兴趣建模 |
| 时间模式 | 周中/周末活跃度比 | 时间特征交叉 | 控制季节性 |
| 社交影响 | 好友购买行为 | 图计算 | 网络效应分析 |
| 内容偏好 | 类目集中度 | 熵计算 | 个性化评估 |
5.4 三角验证实施过程
5.4.1 三条证据线的并行分析
证据线A:实验组(A/B Test)
- 数据:30%流量,分层随机化分配
- 分析:意向性分析(ITT),工具变量法(IV)处理非依从性
- 结果:转化率提升2.3%(95% CI: [1.8%, 2.8%]),p=0.003
证据线B:原始观测数据
- 数据:70%非实验流量,按自然曝光算法分组
- 分析:线性回归+稳健标准误,控制混淆变量
- 结果:转化率提升3.1%(95% CI: [2.5%, 3.7%]),看似效果更强
证据线C:PSM调整观测数据
- 数据:从证据线B中提取,应用倾向得分匹配
- 分析:1:1最近邻匹配,卡尺0.15,控制年龄、历史消费、活跃度
- 结果:转化率提升2.5%(95% CI: [1.9%, 3.1%]),与实验组接近
5.4.2 结果对比与解释
| 分析方法 | 点估计 | 95% CI | p值 | 样本量 | 关键假设 |
|---|---|---|---|---|---|
| 实验组 | +2.3% | [1.8%, 2.8%] | 0.003 | 45,000 | 随机化+SUTVA |
| 观测原始 | +3.1% | [2.5%, 3.7%] | <0.001 | 105,000 | 可忽略性 |
| PSM观测 | +2.5% | [1.9%, 3.1%] | 0.002 | 22,000对 | 条件可忽略性 |
三角验证发现:
I. 收敛性证据(Convergence)
三条证据线的效应方向一致(均为正向),且置信区间高度重叠(交集[1.9%, 2.8%]),表明新推荐算法确实有效。这种跨方法的一致性极大增强了结论的可信度。
II. 差异解释(Explanatory)
观测原始组效果(3.1%)高于实验组,经排查发现:
- 选择偏差:算法自动优先服务高活跃度用户(28%偏差)
- 时间效应:观测数据覆盖更长的周末时段(0.4%偏差)
- 网络效应:观测组中社交推荐影响未被隔离(0.4%偏差)
PSM调整后,偏差降低至0.2%,证明匹配有效性。
III. 稳健性评分(Robustness Score)
采用多维度一致性评分:
- 方向一致性:100%(3/3方法正向)
- 大小一致性:计算变异系数CV=0.15 < 0.2阈值
- 显著性一致性:2/3方法在p<0.01水平显著
- 分层一致性:生命周期三段的子群分析结果一致
综合稳健性评分:0.87/1.0(高)
5.4.3 异质性分析发现
利用观测数据的大样本优势,进行深度交互分析:
发现**新客(注册<30天)**效果量(+4.2%)显著高于老客(+1.8%),但实验组因样本量不足未能检测到此差异。这提示:
- 算法对新用户冷启动优化更好
- 应调整流量分配,给新客更高实验比例
- 产品策略:针对老客推荐逻辑需进一步优化
5.5 长期观测结果
实验结束后持续追踪30天,发现:
负面发现:实验组用户在第3周开始,活跃度下降1.2%,虽未达到统计显著(p=0.08),但观测数据(大样本)证实存在显著下降(p=0.02)。
根因分析:
- 推荐疲劳:深度模型过度 exploitation,多样性不足
- 用户适应:初期新鲜感消退
对策:
- 引入探索机制(ε-greedy策略)
- 增加推荐结果多样性指标约束
- 分阶段调整算法参数
5.6 业务决策与影响
基于三角验证结果,业务方做出分阶段上线决策:
第一阶段(当前):
- 对新用户100%启用新算法(效果最显著)
- 老用户保持50%新算法+50%旧算法(监控长期指标)
第二阶段(优化后):
- 解决推荐疲劳问题后,全量推广
- 预计GMV年增量达2.8亿元
技术债务:
- 建立实验-观测数据自动核对系统
- 开发PSM匹配模块,嵌入例行分析流程
- 构建长期效果监控仪表板
5.7 案例总结与方法论反思
三角验证的价值体现:
- 风险降低:单一方法可能导致错误结论(如仅看观测数据会高估效果)
- 洞察深化:实验提供因果推断,观测提供生态效度和异质性分析
- 效率提升:PSM验证后,后续类似实验可减少样本量(节省30%流量成本)
实施挑战:
- 资源投入:观测数据ETL管道维护成本是实验系统的2倍
- 时间成本:完整分析周期从实验的2周延长至6周
- 组织协调:需要数据工程师、分析师、产品经理三方紧密协作
最佳实践:
- 先实验后观测:实验验证核心假设,观测探索边界条件
- 自动化PSM:将匹配流程脚本化,降低使用门槛
- 标准化报告:建立三角验证报告模板,包含一致性评分和差异解释
VI. 代码实现与部署详解
6.1 项目架构设计
mermaid
graph TB
subgraph 数据采集层
A[客户端SDK] --> B[Kafka集群]
C[服务端日志] --> B
end
subgraph 数据存储层
B --> D[Delta Lake]
E[实验配置中心] --> F[PostgreSQL]
end
subgraph 计算层
D --> G[Spark ETL管道]
G --> H[特征工程]
F --> H
H --> I[三角验证引擎]
end
subgraph 服务层
I --> J[REST API]
J --> K[监控仪表板]
J --> L[告警系统]
end
subgraph 部署层
M[Docker容器] --> N[K8s集群]
O[CI/CD流水线] --> M
P[Helm Chart] --> N
end
style I fill:#f9f,stroke:#333,stroke-width:2px
6.2 Docker化部署
6.2.1 基础镜像构建
# Dockerfile
FROM python:3.10-slim as base
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
libffi-dev \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 创建非root用户
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app
# 切换到非root用户
USER appuser
# 安装Python依赖
COPY --chown=appuser:appuser requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
# 复制应用代码
COPY --chown=appuser:appuser src/ ./src/
# 设置环境变量
ENV PYTHONPATH=/app/src
ENV PATH=/home/appuser/.local/bin:$PATH
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s \
CMD python -c "import requests; requests.get('http://localhost:8000/health')"
# 启动命令
CMD ["python", "src/main.py"]
requirements.txt:
pandas==2.1.1
numpy==1.25.2
pyspark==3.4.1
scikit-learn==1.3.0
fastapi==0.103.0
uvicorn==0.23.2
python-multipart==0.0.6
etcd3==0.12.0
6.2.2 多阶段构建优化
# 多阶段构建:分离编译和运行环境
# 阶段1:构建环境
FROM python:3.10-slim as builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --user -r requirements.txt
# 阶段2:运行环境(更轻量)
FROM python:3.10-slim as runtime
# 仅安装运行时依赖
RUN apt-get update && apt-get install -y libpq5 && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# 从构建阶段复制已安装的包
COPY --from=builder /root/.local /root/.local
# 复制应用代码
COPY src/ ./src/
ENV PATH=/root/.local/bin:$PATH
ENV PYTHONPATH=/app/src
CMD ["python", "src/main.py"]
镜像大小对比:
- 单阶段构建:485MB
- 多阶段构建:218MB(节省55%)
6.3 Kubernetes部署配置
6.3.1 Deployment配置
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: triangulation-engine
namespace: analytics
labels:
app: triangulation
version: v2.1.0
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0 # 零停机部署
selector:
matchLabels:
app: triangulation
template:
metadata:
labels:
app: triangulation
version: v2.1.0
spec:
serviceAccountName: triangulation-sa
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
containers:
- name: engine
image: your-registry/triangulation-engine:v2.1.0
imagePullPolicy: Always
ports:
- containerPort: 8000
name: http
env:
- name: SPARK_MASTER
value: "spark://spark-master:7077"
- name: KAFKA_BROKERS
value: "kafka-1:9092,kafka-2:9092"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-credentials
key: url
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
volumeMounts:
- name: config-volume
mountPath: /app/config
readOnly: true
volumes:
- name: config-volume
configMap:
name: triangulation-config
nodeSelector:
workload-type: analytics
tolerations:
- key: "analytics-workload"
operator: "Equal"
value: "true"
effect: "NoSchedule"
6.3.2 Service与Ingress配置
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
name: triangulation-service
namespace: analytics
spec:
selector:
app: triangulation
ports:
- port: 80
targetPort: 8000
type: ClusterIP
---
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: triangulation-ingress
namespace: analytics
annotations:
nginx.ingress.kubernetes.io/rate-limit: "100"
nginx.ingress.kubernetes.io/rate-limit-window: "1m"
cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
ingressClassName: nginx
tls:
- hosts:
- triangulation.analytics.internal
secretName: triangulation-tls
rules:
- host: triangulation.analytics.internal
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: triangulation-service
port:
number: 80
6.4 CI/CD集成
6.4.1 GitHub Actions工作流
# .github/workflows/deploy.yml
name: Deploy Triangulation Engine
on:
push:
branches: [ main ]
paths:
- 'src/**'
- 'Dockerfile'
- 'requirements.txt'
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Cache pip dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run linting
run: |
pip install flake8
flake8 src/ --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 src/ --count --exit-zero --max-complexity=10 --max-line-length=120
- name: Run tests with coverage
run: |
pytest tests/ --cov=src/ --cov-report=xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
build-and-push:
needs: test
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Log in to Container Registry
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=semver,pattern={{version}}
type=sha,prefix=git-
type=raw,value=latest,enable={{is_default_branch}}
- name: Build and push Docker image
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64,linux/arm64
deploy-staging:
needs: build-and-push
runs-on: ubuntu-latest
environment: staging
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to staging
run: |
echo "Deploying to staging environment"
# 调用kubectl或Helm
helm upgrade triangulation ./helm-chart \
--install \
--namespace analytics-staging \
--set image.tag=git-${{ github.sha }} \
--wait
deploy-production:
needs: deploy-staging
runs-on: ubuntu-latest
environment: production
if: github.ref == 'refs/heads/main'
steps:
- name: Manual approval
uses: hmarr/auto-approve-action@v3
- name: Deploy to production
run: |
echo "Deploying to production"
helm upgrade triangulation ./helm-chart \
--install \
--namespace analytics-production \
--set image.tag=git-${{ github.sha }} \
--set replicaCount=5 \
--wait
6.4.2 Helm Chart配置
# helm-chart/values.yaml
# 默认配置值
replicaCount: 3
image:
repository: ghcr.io/your-org/triangulation-engine
tag: latest
pullPolicy: IfNotPresent
service:
type: ClusterIP
port: 80
ingress:
enabled: true
annotations:
nginx.ingress.kubernetes.io/rate-limit: "100"
hosts:
- host: triangulation.analytics.internal
paths:
- path: /
pathType: Prefix
tls:
- secretName: triangulation-tls
hosts:
- triangulation.analytics.internal
resources:
limits:
cpu: 4
memory: 8Gi
requests:
cpu: 2
memory: 4Gi
autoscaling:
enabled: true
minReplicas: 3
maxReplicas: 10
targetCPUUtilizationPercentage: 70
targetMemoryUtilizationPercentage: 80
nodeSelector:
workload-type: analytics
tolerations:
- key: "analytics-workload"
operator: "Equal"
value: "true"
effect: "NoSchedule"
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- triangulation
topologyKey: kubernetes.io/hostname
- 点赞
- 收藏
- 关注作者
评论(0)