生产环境中的因果推理:实时决策系统的架构设计
I. 引言:从相关性到因果性的范式转变
在当今数据驱动的商业环境中,机器学习系统已经能够做出令人惊叹的预测。然而,当我们将这些系统部署到生产环境进行实时决策时,一个根本性的挑战浮现出来:预测相关性并不等同于理解因果关系。一个推荐系统可以预测用户点击某个商品的概率,但它无法告诉我们为什么用户会点击,以及如果我们改变推荐策略会发生什么。
因果推理(Causal Inference)正在改变这一现状。它使我们能够回答诸如"如果我们降低价格10%,销量会增加多少?"或"向这个用户展示广告是否真正促成了购买,还是广告曝光与购买之间只是相关?"这类反事实问题。在生产环境中构建实时因果推理系统,需要将统计学的严谨性与工程实践的健壮性相结合。
II. 系统架构概览:分层设计思想
2.1 整体架构设计
生产级因果推理系统采用分层架构,各层之间通过明确的接口契约进行通信,确保系统的可扩展性和可维护性。
2.2 核心组件职责划分
| 组件名称 | 技术选型 | 主要职责 | 性能要求 |
|---|---|---|---|
| 事件采集器 | Kafka Streams | 实时收集用户行为事件 | < 50ms延迟 |
| 特征工程 | Feast | 生成因果特征向量 | QPS > 10k |
| 因果引擎 | Python+Cython | 执行do-calculus运算 | 单次推理<100ms |
| 模型服务 | FastAPI+Redis | 提供实时决策接口 | 99%ile < 200ms |
| 效果追踪 | PostgreSQL+TimescaleDB | 记录决策结果 | 写入吞吐量>5k/s |
III. 数据基础设施:因果特征的工程化实践
3.1 因果特征的独特性
与传统ML特征不同,因果特征需要明确区分处理变量(Treatment)、结果变量(Outcome)和混淆变量(Confounder)。我们必须追踪变量的干预路径,而不仅仅是相关性。
# 因果特征定义示例(使用Feast特征存储)
from feast import Entity, Feature, FeatureView, ValueType
from google.protobuf.duration_pb2 import Duration
# 定义实体:用户
user = Entity(
name="user_id",
value_type=ValueType.STRING,
description="Unique user identifier"
)
# 定义因果特征视图
causal_user_features = FeatureView(
    name="causal_user_features",
    entities=["user_id"],
    ttl=Duration(seconds=86400),  # 24小时
    # 注:实际使用时还需指定batch_source(如FileSource/BigQuerySource),此处从略
features=[
# 处理变量:是否发放优惠券
Feature(name="treatment_coupon_issued", dtype=ValueType.BOOL),
Feature(name="treatment_coupon_discount", dtype=ValueType.FLOAT),
# 结果变量:购买行为
Feature(name="outcome_purchase_made", dtype=ValueType.BOOL),
Feature(name="outcome_purchase_amount", dtype=ValueType.FLOAT),
# 混淆变量:用户历史行为
Feature(name="confounder_avg_session_duration", dtype=ValueType.FLOAT),
Feature(name="confounder_days_since_last_purchase", dtype=ValueType.INT32),
Feature(name="confounder_user_tier", dtype=ValueType.STRING),
# 工具变量:用于处理内生性(如优惠券发放机制)
Feature(name="instrument_user_location", dtype=ValueType.STRING),
]
)
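在决策服务中按实体在线读取这些特征,大致如下(示意代码:假设本地已有配置好的Feast仓库,参数名在不同Feast版本中略有差异,user_id取值仅为演示):
# 在线读取因果特征(示意)
from feast import FeatureStore

store = FeatureStore(repo_path=".")
online_features = store.get_online_features(
    features=[
        "causal_user_features:treatment_coupon_issued",
        "causal_user_features:confounder_avg_session_duration",
        "causal_user_features:confounder_days_since_last_purchase",
    ],
    entity_rows=[{"user_id": "u_10086"}],  # 演示用的假设用户
).to_dict()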
3.2 反事实数据生成管道
为了训练因果模型,我们需要构建反事实数据集。这需要一个复杂的ETL过程,从原始日志中提取干预前后的状态。
# 反事实数据生成:PySpark实现
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
def generate_counterfactual_dataset(
raw_logs_path: str,
output_path: str
) -> None:
"""
从原始日志生成反事实训练数据集
核心逻辑:
1. 识别干预事件(优惠券发放)
2. 为每个干预找到匹配的"伪对照组"
3. 构建时间窗口观察结果
"""
spark = SparkSession.builder.appName("CausalETL").getOrCreate()
# 读取原始行为日志
logs = spark.read.parquet(raw_logs_path)
# 步骤1:标记干预事件及其时间戳
treatments = logs.filter(
logs.event_type == "coupon_issued"
).select(
"user_id",
F.col("timestamp").alias("treatment_time"),
F.col("properties.discount_percent").alias("treatment_dose"),
F.lit(1).alias("treatment_assigned") # 实际接受处理
)
    # 步骤2:为每个用户构建观察窗口
    # 先将行为日志与干预事件按user_id关联,使每条日志都带有treatment_time
    # 注:未接受处理的用户需要在上游先被赋予"伪干预时间"(例如从处理组的
    # 干预时间分布中抽样)并以treatment_assigned=0进入treatments,此处从简
    logs_with_treatment = logs.join(
        treatments.select("user_id", "treatment_time"),
        on="user_id",
        how="inner"
    )
    # 计算干预前30天的用户状态(混淆变量)
    user_states = logs_with_treatment.groupBy("user_id", "treatment_time").agg(
        F.avg(
            F.when(
                F.datediff("timestamp", "treatment_time").between(-30, -1),
                F.col("properties.session_duration")
            )
        ).alias("confounder_avg_session_duration"),
        F.countDistinct(
            F.when(
                F.datediff("timestamp", "treatment_time").between(-30, -1),
                F.col("session_id")
            )
        ).alias("confounder_session_frequency")
    )
    # 步骤3:构建结果变量(干预后7天内是否购买)
    outcomes = logs_with_treatment.filter(
        F.col("event_type") == "purchase"
    ).groupBy("user_id", "treatment_time").agg(
F.max(
F.when(
F.datediff("timestamp", "treatment_time").between(0, 7),
1
).otherwise(0)
).alias("outcome_purchase_made"),
F.sum(
F.when(
F.datediff("timestamp", "treatment_time").between(0, 7),
F.col("properties.order_value")
)
).alias("outcome_purchase_amount")
)
    # 步骤4:创建反事实对照组(通过匹配混淆变量)
    # 使用倾向得分匹配(PSM)技术
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.functions import vector_to_array
    # 计算倾向得分
    feature_cols = ["confounder_avg_session_duration", "confounder_session_frequency"]
    assembler = VectorAssembler(
        inputCols=feature_cols,
        outputCol="features",
        handleInvalid="keep"
    )
    labeled_data = user_states.join(
        treatments.select("user_id", "treatment_time", "treatment_assigned"),
        ["user_id", "treatment_time"],
        "left"
    ).fillna(0, subset=["treatment_assigned"])
    assembled = assembler.transform(labeled_data)
    # 训练倾向得分模型
    ps_model = LogisticRegression(featuresCol="features", labelCol="treatment_assigned")
    ps_results = ps_model.fit(assembled).transform(assembled)
    # LogisticRegression的prediction列是0/1类别,匹配应使用probability向量中的
    # 正类概率(即倾向得分)
    ps_results = ps_results.withColumn("propensity", vector_to_array("probability")[1])
    # 根据倾向得分匹配对照组
    matched_pairs = ps_results.filter(
        (F.col("treatment_assigned") == 1) |
        (F.col("propensity") < 0.3)  # 倾向得分较低的未处理样本作为对照
    )
# 最终数据集:包含处理组和对照组的反事实样本
final_dataset = matched_pairs.join(
outcomes,
["user_id", "treatment_time"],
"left"
).fillna(0, subset=["outcome_purchase_made", "outcome_purchase_amount"])
# 写入特征存储
final_dataset.write.mode("overwrite").parquet(output_path)
print(f"反事实数据集生成完成:{output_path}")
print(f"样本总数:{final_dataset.count()}")
print(f"处理组数量:{final_dataset.filter('treatment_assigned=1').count()}")
IV. 因果推理引擎:从理论到实现
4.1 因果图建模
生产系统的因果图需要版本控制和可视化能力。我们使用NetworkX构建因果图,并通过GraphQL API暴露给其他服务。
# 因果图定义与版本管理
import networkx as nx
from typing import Dict, List, Optional
import json
from datetime import datetime
class CausalGraphModel:
"""
可版本化的因果图模型
核心功能:
1. 支持do-calculus运算
2. 识别后门路径和前门路径
3. 验证因果识别性
4. 导出干预策略
"""
def __init__(self, version: str = "v1.0"):
self.graph = nx.DiGraph()
self.version = version
self.created_at = datetime.now()
self._build_graph()
def _build_graph(self) -> None:
"""定义电商场景的因果结构"""
# 添加节点(变量类型标注)
nodes = {
# 处理变量
"coupon_discount": {"type": "treatment", "description": "优惠券折扣率"},
"ad_exposure": {"type": "treatment", "description": "广告曝光次数"},
# 结果变量
"purchase_decision": {"type": "outcome", "description": "是否购买"},
"purchase_amount": {"type": "outcome", "description": "购买金额"},
# 混淆变量
"user_engagement": {"type": "confounder", "description": "用户参与度"},
"price_sensitivity": {"type": "confounder", "description": "价格敏感度"},
"historical_spend": {"type": "confounder", "description": "历史消费"},
# 中介变量
"cart_value": {"type": "mediator", "description": "购物车金额"},
"browsing_time": {"type": "mediator", "description": "浏览时长"},
# 未观测变量(用于敏感性分析)
"user_intent": {"type": "unobserved", "description": "购买意图(未观测)"}
}
self.graph.add_nodes_from([
(name, attrs) for name, attrs in nodes.items()
])
# 添加因果边(基于领域知识)
edges = [
# 处理 -> 中介
("coupon_discount", "cart_value"),
("ad_exposure", "browsing_time"),
# 中介 -> 结果
("cart_value", "purchase_amount"),
("browsing_time", "purchase_decision"),
# 处理 -> 结果(直接效应)
("coupon_discount", "purchase_decision"),
# 混淆变量关系
("user_engagement", "ad_exposure"),
("user_engagement", "browsing_time"),
("user_engagement", "purchase_decision"),
("price_sensitivity", "coupon_discount"),
("price_sensitivity", "purchase_decision"),
("price_sensitivity", "cart_value"),
("historical_spend", "price_sensitivity"),
("historical_spend", "purchase_amount"),
# 未观测混淆
("user_intent", "ad_exposure"),
("user_intent", "purchase_decision"),
]
self.graph.add_edges_from(edges)
    def identify_backdoor_paths(
        self,
        treatment: str,
        outcome: str
    ) -> List[List[str]]:
        """
        识别所有后门路径
        后门路径:连接处理变量与结果变量、且第一条边指向处理变量的路径,
        这些路径需要被阻断以获得无偏估计
        """
        all_paths = []
        # 后门路径可以逆着箭头走,因此在无向化的图上枚举简单路径,
        # 再用原有向图检查第一条边的方向
        undirected = self.graph.to_undirected()
        for path in nx.all_simple_paths(
            undirected,
            treatment,
            outcome,
            cutoff=5
        ):
            # 后门准则:路径的第一条边指向treatment(即存在 path[1] -> treatment)
            # 注:此处未判断路径是否已被对撞变量(collider)阻断,
            # 生产实现应使用完整的d-分离检验
            if len(path) > 2 and self.graph.has_edge(path[1], path[0]):
                all_paths.append(path)
        return all_paths
    def get_adjustment_set(
        self,
        treatment: str,
        outcome: str
    ) -> Optional[List[str]]:
        """
        基于后门准则寻找调整集:阻断所有后门路径的变量集合
        (贪心近似,不保证最小)
        """
        backdoor_paths = self.identify_backdoor_paths(treatment, outcome)
        if not backdoor_paths:
            return []  # 无后门路径,无需调整
        # 贪心策略:收集后门路径上所有可观测的混淆变量
        # 实际生产中会使用更严格的算法(如基于d-分离的最小调整集搜索)
all_confounders = set()
for path in backdoor_paths:
confounders = [
node for node in path
if self.graph.nodes[node].get("type") == "confounder"
]
all_confounders.update(confounders)
return list(all_confounders)
def export_for_serving(self) -> Dict:
"""导出为生产环境可用的配置"""
return {
"version": self.version,
"adjustment_sets": {
"coupon_effect": self.get_adjustment_set(
"coupon_discount",
"purchase_decision"
),
"ad_effect": self.get_adjustment_set(
"ad_exposure",
"purchase_decision"
)
},
"topology": nx.node_link_data(self.graph)
}
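下面是一个简短的用法示意:按上文定义的图结构,coupon_discount到purchase_decision的后门路径经过price_sensitivity,因此调整集预期包含该变量:
# 因果图用法示意
graph = CausalGraphModel(version="v1.0")
print(graph.get_adjustment_set("coupon_discount", "purchase_decision"))
# 预期输出包含 price_sensitivity
print(graph.export_for_serving()["adjustment_sets"])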
4.2 双重机器学习估计器
生产级因果效应估计需要无偏且稳健的方法。双重机器学习(Double ML)通过交叉拟合减少正则化偏差。
# 双重机器学习实现
import numpy as np
from typing import Dict, Optional
from sklearn.base import clone
from sklearn.ensemble import RandomForestRegressor, GradientBoostingClassifier
from sklearn.model_selection import KFold
import joblib
import logging
class DoubleMLEstimator:
"""
生产级双重机器学习估计器
核心特性:
1. 支持连续和二元处理变量
2. 内置交叉验证防止过拟合
3. 提供置信区间估计
4. 可序列化用于模型服务
"""
def __init__(
self,
treatment_model=None,
outcome_model=None,
n_folds: int = 5,
random_state: int = 42
):
self.treatment_model = treatment_model or RandomForestRegressor(
n_estimators=100,
random_state=random_state
)
self.outcome_model = outcome_model or GradientBoostingClassifier(
n_estimators=100,
random_state=random_state
)
self.n_folds = n_folds
self.random_state = random_state
self.is_fitted = False
self.ate_estimate = None
self.ate_std = None
# 日志记录
self.logger = logging.getLogger(__name__)
def fit(
self,
X: np.ndarray, # 混淆变量
T: np.ndarray, # 处理变量
Y: np.ndarray # 结果变量
) -> "DoubleMLEstimator":
"""
训练Double ML模型
实现步骤:
1. K折交叉拟合
2. 第一阶段:预测处理和结果
3. 第二阶段:残差回归估计ATE
"""
        # 统一展平为一维,避免(n,1)形状在残差计算时触发广播错误
        T = np.asarray(T).ravel()
        Y = np.asarray(Y).ravel()
        n_samples = X.shape[0]
        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=self.random_state)
        # 存储交叉验证的预测结果(显式float,避免整型/布尔型截断)
        T_hat = np.zeros_like(T, dtype=float)
        Y_hat = np.zeros_like(Y, dtype=float)
self.logger.info(f"开始{self.n_folds}折交叉拟合...")
# 第一阶段:交叉拟合
for train_idx, test_idx in kf.split(X):
# 训练集
X_train, T_train, Y_train = X[train_idx], T[train_idx], Y[train_idx]
# 测试集
X_test = X[test_idx]
# 克隆模型避免数据泄露
treat_model = clone(self.treatment_model)
out_model = clone(self.outcome_model)
# 拟合处理模型
treat_model.fit(X_train, T_train)
T_hat[test_idx] = treat_model.predict(X_test)
            # 拟合结果模型(分类器取正类概率,回归器直接取预测值)
            out_model.fit(X_train, Y_train)
            if hasattr(out_model, "predict_proba"):
                Y_hat[test_idx] = out_model.predict_proba(X_test)[:, 1]
            else:
                Y_hat[test_idx] = out_model.predict(X_test)
# 第二阶段:残差回归
self.logger.info("计算残差并估计ATE...")
# 计算残差
T_residual = T - T_hat
Y_residual = Y - Y_hat
# 残差回归得到ATE估计
# ATE = E[Y_residual * T_residual] / E[T_residual^2]
numerator = np.mean(Y_residual * T_residual)
denominator = np.mean(T_residual ** 2)
self.ate_estimate = numerator / denominator
# 计算标准误(用于置信区间)
self._compute_standard_error(Y_residual, T_residual)
self.is_fitted = True
self.logger.info(f"ATE估计完成: {self.ate_estimate:.4f} ± {self.ate_std:.4f}")
return self
def _compute_standard_error(
self,
Y_residual: np.ndarray,
T_residual: np.ndarray
) -> None:
"""计算ATE的标准误"""
n = len(Y_residual)
# 影响函数(influence function)
psi = (Y_residual - self.ate_estimate * T_residual) * T_residual
# 标准误
self.ate_std = np.sqrt(np.var(psi, ddof=1) / n)
def effect_inference(
self,
confidence_level: float = 0.95
) -> Dict[str, float]:
"""
提供统计推断结果
返回:
- ATE点估计
- 置信区间
- t统计量
"""
if not self.is_fitted:
raise ValueError("模型未拟合,请先调用fit()")
from scipy import stats
# 置信区间
alpha = 1 - confidence_level
z_score = stats.norm.ppf(1 - alpha/2)
ci_lower = self.ate_estimate - z_score * self.ate_std
ci_upper = self.ate_estimate + z_score * self.ate_std
# t统计量
t_stat = self.ate_estimate / self.ate_std
p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
return {
"ate": float(self.ate_estimate),
"std_error": float(self.ate_std),
"ci_lower": float(ci_lower),
"ci_upper": float(ci_upper),
"t_statistic": float(t_stat),
"p_value": float(p_value),
"significant": p_value < 0.05
}
def predict_individual_effect(
self,
X: np.ndarray
) -> np.ndarray:
"""
预测条件平均处理效应(CATE)
注意:这需要元学习器(如X-Learner),此处为简化版
"""
if not self.is_fitted:
raise ValueError("模型未拟合")
# 简化的CATE估计:ATE + 基于X的残差调整
# 生产环境应使用更复杂的元学习器
base_effect = np.full(X.shape[0], self.ate_estimate)
return base_effect
def save(self, path: str) -> None:
"""序列化模型到磁盘"""
joblib.dump({
"ate_estimate": self.ate_estimate,
"ate_std": self.ate_std,
"is_fitted": self.is_fitted,
"treatment_model": self.treatment_model,
"outcome_model": self.outcome_model,
"n_folds": self.n_folds
}, path)
self.logger.info(f"模型已保存到 {path}")
@classmethod
def load(cls, path: str) -> "DoubleMLEstimator":
"""从磁盘加载模型"""
data = joblib.load(path)
instance = cls(
treatment_model=data["treatment_model"],
outcome_model=data["outcome_model"],
n_folds=data["n_folds"]
)
instance.ate_estimate = data["ate_estimate"]
instance.ate_std = data["ate_std"]
instance.is_fitted = data["is_fitted"]
return instance
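一个最小的冒烟测试可以在真实效应已知的合成数据上验证估计器:下面把真实ATE设为2.0,由于结果是连续变量,将结果模型换成回归器(示意代码,估计值会带少量拟合噪声):
# DoubleMLEstimator冒烟测试(合成数据,真实ATE=2.0)
import numpy as np

rng = np.random.default_rng(0)
n = 5000
X = rng.normal(size=(n, 3))            # 混淆变量
T = X[:, 0] + rng.normal(size=n)       # 处理受X[:,0]混淆
Y = 2.0 * T + X[:, 0] + 0.5 * X[:, 1] + rng.normal(size=n)

est = DoubleMLEstimator(
    outcome_model=RandomForestRegressor(n_estimators=100, random_state=0)
)
est.fit(X, T, Y)
print(est.effect_inference())  # ate应接近2.0,且置信区间覆盖真值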
因果引擎设计要点
I. 算法选择:Double ML适合高维数据,但计算成本高;线性方法快但假设强
II. 识别性验证:必须在服务前验证因果图是否可识别目标效应
III. 不确定性量化:所有估计必须附带置信区间,避免盲目决策
IV. 计算效率:使用Cython/JIT编译关键路径,<100ms响应时间
V. 模型版本化:每个因果图和估计器都有唯一版本,支持灰度发布
(时序图:实时决策流程,参与者包括DoubleML估计器(DM)与特征存储(FS)等,原图略)
V. 实时决策服务:构建可解释的决策API
5.1 高性能决策服务
生产环境要求决策API在毫秒级返回结果,同时保证因果推理的准确性。我们使用FastAPI构建异步服务,并用Redis缓存频繁访问的因果估计。
# 实时因果决策API
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
import redis
import numpy as np
import hashlib
import json
import logging
import time
from datetime import datetime
app = FastAPI(
    title="Causal Decision API",
    description="实时因果推理决策服务"
)
logger = logging.getLogger("causal_api")
# Redis缓存(存储预计算的CATE)
redis_client = redis.Redis(
host='redis-cluster.prod.internal',
port=6379,
db=0,
decode_responses=True
)
# 模型管理器
class ModelManager:
"""管理多版本因果模型"""
def __init__(self):
self.models: Dict[str, DoubleMLEstimator] = {}
self.causal_graphs: Dict[str, CausalGraphModel] = {}
self.default_version = "v1.2"
def load_model(self, version: str) -> None:
"""从模型仓库加载特定版本"""
model_path = f"s3://causal-models/warehouse/{version}/double_ml.joblib"
graph_path = f"s3://causal-models/warehouse/{version}/causal_graph.json"
self.models[version] = DoubleMLEstimator.load(model_path)
self.causal_graphs[version] = CausalGraphModel(version)
        logger.info(f"模型版本 {version} 加载完成")
def get_model(self, version: Optional[str] = None):
version = version or self.default_version
if version not in self.models:
raise HTTPException(
status_code=404,
detail=f"模型版本 {version} 不存在"
)
return self.models[version], self.causal_graphs[version]
model_manager = ModelManager()
# 输入输出模型
class DecisionRequest(BaseModel):
user_id: str
context: Dict[str, float] # 实时上下文特征
    available_treatments: List[Dict[str, Any]]  # 可选干预
version: Optional[str] = None
class TreatmentEffect(BaseModel):
treatment_id: str
estimated_effect: float
confidence_interval: List[float]
p_value: float
significant: bool
class DecisionResponse(BaseModel):
user_id: str
recommended_treatment: Optional[str]
expected_outcome: float
effects: List[TreatmentEffect]
confidence_threshold: float
computed_at: datetime
def get_cache_key(user_id: str, treatment: Dict) -> str:
"""生成缓存键"""
feature_str = f"{user_id}_{json.dumps(treatment)}"
return f"cate:{hashlib.md5(feature_str.encode()).hexdigest()}"
@app.post("/v1/decide", response_model=DecisionResponse)
async def make_causal_decision(request: DecisionRequest):
"""
实时因果决策接口
决策逻辑:
1. 验证请求
2. 检查缓存
3. 计算各处理效应
4. 应用决策策略
5. 记录日志
"""
start_time = time.time()
# 1. 加载模型
model, causal_graph = model_manager.get_model(request.version)
# 2. 准备特征向量
# 从上下文中提取混淆变量
confounder_values = [
request.context.get("avg_session_duration", 0),
request.context.get("days_since_last_purchase", 30),
request.context.get("historical_spend", 0)
]
X = np.array([confounder_values])
# 3. 计算每个候选干预的效应
effects = []
for treatment in request.available_treatments:
treatment_id = treatment["treatment_id"]
# 检查缓存
cache_key = get_cache_key(request.user_id, treatment)
cached_effect = redis_client.get(cache_key)
if cached_effect:
# 缓存命中
effect_data = json.loads(cached_effect)
        else:
            # 缓存未命中,计算CATE
            # 注:此处直接复用离线估计的效应;不同剂量(如不同折扣率)的
            # 效应曲线需要额外的剂量响应建模,此处从简
            ate_info = model.effect_inference()
            cate = model.predict_individual_effect(X)[0]
            effect_data = {
                "treatment_id": treatment_id,
                "estimated_effect": float(cate),
                "confidence_interval": [ate_info["ci_lower"], ate_info["ci_upper"]],
                "p_value": ate_info["p_value"],
                "significant": ate_info["significant"]
            }
# 写入缓存(TTL=5分钟)
redis_client.setex(
cache_key,
300,
json.dumps(effect_data)
)
effects.append(TreatmentEffect(**effect_data))
# 4. 应用决策策略
    # 策略:在效应显著的干预中选择效应最大者(significant已包含p<0.05判断)
    significant_effects = [e for e in effects if e.significant]
if significant_effects:
# 选择最优干预
best_treatment = max(
significant_effects,
key=lambda x: x.estimated_effect
)
recommended = best_treatment.treatment_id
expected_outcome = best_treatment.estimated_effect
else:
# 无显著效应,选择默认(通常是不干预)
recommended = None
expected_outcome = 0.0
# 5. 记录决策日志(异步)
log_decision(
user_id=request.user_id,
recommendation=recommended,
effects=effects,
latency_ms=(time.time() - start_time) * 1000
)
return DecisionResponse(
user_id=request.user_id,
recommended_treatment=recommended,
expected_outcome=expected_outcome,
effects=effects,
confidence_threshold=0.05,
computed_at=datetime.utcnow()
)
def log_decision(user_id: str, recommendation: Optional[str], effects: List, latency_ms: float):
"""异步日志记录"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
"recommendation": recommendation,
"effects": [e.dict() for e in effects],
"latency_ms": latency_ms
}
    # 发送到Kafka进行后续分析
    # 实际生产中使用异步生产者;kafka_producer假定在应用启动时初始化,
    # 例如 KafkaProducer(bootstrap_servers=[...])(kafka-python)
    kafka_producer.send(
        topic="causal_decisions",
        value=json.dumps(log_entry).encode()
    )
@app.get("/health")
async def health_check():
"""健康检查"""
return {
"status": "healthy",
"model_versions": list(model_manager.models.keys()),
"redis_connected": redis_client.ping(),
"timestamp": datetime.utcnow().isoformat()
}
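接口调通后,可以用如下请求做快速验证(示意代码:URL与字段取值均为假设,字段结构对应上文的DecisionRequest):
# 调用决策接口(示意)
import requests

resp = requests.post(
    "http://localhost:8000/v1/decide",
    json={
        "user_id": "u_10086",
        "context": {
            "avg_session_duration": 12.5,
            "days_since_last_purchase": 7,
            "historical_spend": 320.0
        },
        "available_treatments": [
            {"treatment_id": "coupon_10pct", "type": "coupon", "discount_percent": 10},
            {"treatment_id": "no_treatment", "type": "none"}
        ]
    },
    timeout=1.0
)
print(resp.json()["recommended_treatment"])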
5.2 决策策略引擎
策略引擎将因果效应转化为业务动作,支持多目标优化和风险约束。
# 策略引擎实现
from typing import Dict, List, Optional, Tuple

class PolicyEngine:
"""
基于因果效应的策略优化引擎
支持:
- 多目标优化(收入、留存、成本)
- 风险约束(置信度、预算)
- A/B测试分流
"""
def __init__(
self,
budget_constraint: Optional[float] = None,
risk_aversion: float = 0.5
):
self.budget_constraint = budget_constraint
self.risk_aversion = risk_aversion # 风险厌恶系数
def optimize(
self,
effects: List[TreatmentEffect],
user_segment: str,
context: Dict
) -> Tuple[Optional[str], float]:
"""
多目标策略优化
目标函数:
max Σ (effect_i * confidence_i^α) - λ * cost_i
约束:
- 总预算 <= B
- 最小置信度 >= threshold
"""
        # 过滤不满足显著性要求的干预(significant已包含p<0.05判断)
        valid_effects = [e for e in effects if e.significant]
if not valid_effects:
return None, 0.0 # 无最优干预
# 计算每个干预的效用分数
scores = []
for effect in valid_effects:
# 效应大小
effect_size = effect.estimated_effect
# 置信度加权(风险调整)
confidence_weight = (1 - effect.p_value) ** self.risk_aversion
            # 成本(如果有)
            cost = self._get_treatment_cost(effect.treatment_id)
            # 效用分数(0.1为成本权重λ,应按业务目标校准)
            utility = effect_size * confidence_weight - 0.1 * cost
            scores.append((effect.treatment_id, utility, effect_size))
# 选择最优干预
best_treatment, best_utility, expected_effect = max(
scores,
key=lambda x: x[1]
)
# 检查预算约束
if self.budget_constraint:
treatment_cost = self._get_treatment_cost(best_treatment)
if treatment_cost > self.budget_constraint:
return None, 0.0 # 超出预算,不干预
return best_treatment, expected_effect
def _get_treatment_cost(self, treatment_id: str) -> float:
"""获取干预成本映射"""
cost_map = {
"coupon_5pct": 5.0,
"coupon_10pct": 10.0,
"free_shipping": 3.0,
"no_treatment": 0.0
}
return cost_map.get(treatment_id, 0.0)
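用法示意如下(effects沿用上文决策接口中的TreatmentEffect列表;注意该实现中若最优干预超出预算会直接返回不干预,而不是回退到次优干预):
# 策略引擎用法示意
engine = PolicyEngine(budget_constraint=8.0, risk_aversion=0.5)
treatment_id, expected_effect = engine.optimize(
    effects, user_segment="high_value", context={}
)
print(treatment_id, expected_effect)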
实时服务设计原则
I. 无状态设计:API服务无状态,可水平扩展应对流量峰值
II. 智能缓存:CATE计算使用Redis缓存,减少重复计算
III. 降级策略:模型服务失败时回退到规则引擎
IV. 异步日志:决策日志异步写入,避免阻塞主流程
V. 多版本支持:同时服务多个模型版本,支持灰度发布
VI. 模型训练与更新管道:持续学习
6.1 离线训练管道
因果模型需要定期重训练以适应数据分布变化。我们使用Airflow编排训练管道。
# Airflow DAG定义:因果模型训练管道
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from datetime import datetime, timedelta
import boto3
default_args = {
'owner': 'causal-ml-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=15),
}
dag = DAG(
'causal_model_training',
default_args=default_args,
description='因果推理模型每日训练管道',
schedule_interval='0 2 * * *', # 每天凌晨2点
catchup=False,
max_active_runs=1
)
def check_data_quality(**context):
"""数据质量检查"""
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
# 检查反事实数据集的完整性
tasks = GreatExpectationsOperator(
task_id='validate_counterfactual_data',
data_context_root_dir='/great_expectations',
expectation_suite_name='counterfactual_dataset',
data_asset_name='daily_counterfactuals'
)
tasks.execute(context)
def generate_counterfactual_data(**context):
"""生成最新的反事实数据集"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DailyCausalETL").getOrCreate()
# 读取前一天的数据
execution_date = context['ds']
input_path = f"s3://user-logs/dt={execution_date}/*"
output_path = f"s3://causal-training/data/{execution_date}/"
# 调用反事实生成函数
generate_counterfactual_dataset(input_path, output_path)
# 推送XCom供下游任务使用
context['task_instance'].xcom_push(
key='training_data_path',
value=output_path
)
def train_and_validate_model(**context):
"""训练Double ML模型并验证"""
from src.models import DoubleMLEstimator
from src.validation import CausalModelValidator
# 获取数据路径
data_path = context['task_instance'].xcom_pull(
task_ids='generate_counterfactual_data',
key='training_data_path'
)
# 加载数据
import pandas as pd
df = pd.read_parquet(data_path)
# 准备特征
feature_cols = [
'confounder_avg_session_duration',
'confounder_session_frequency',
'confounder_historical_spend'
]
X = df[feature_cols].values
T = df['treatment_coupon_discount'].values.reshape(-1, 1)
Y = df['outcome_purchase_made'].values
# 训练模型
model = DoubleMLEstimator(
treatment_model=RandomForestRegressor(n_estimators=200),
outcome_model=GradientBoostingClassifier(n_estimators=200),
n_folds=5
)
model.fit(X, T, Y)
# 验证因果假设
validator = CausalModelValidator()
validation_results = validator.validate(
model,
X, T, Y,
causal_graph=CausalGraphModel()
)
# 检查效应符号是否符合业务预期
if validation_results['ate_sign_valid'] is False:
raise ValueError("ATE符号与业务常识不符,可能因果图定义错误")
# 保存模型
model_version = f"v{context['ds_nodash']}"
model_path = f"s3://causal-models/staging/{model_version}/"
model.save(model_path + "double_ml.joblib")
# 推送模型信息
context['task_instance'].xcom_push(
key='model_version',
value=model_version
)
return validation_results
def shadow_deployment_test(**context):
"""影子部署测试(Shadow Testing)"""
import requests
model_version = context['task_instance'].xcom_pull(
task_ids='train_and_validate_model',
key='model_version'
)
    # 加载测试数据(load_shadow_test_data为项目内的辅助函数,返回抽样的请求样本)
    test_data = load_shadow_test_data()
# 同时查询生产模型和新模型
results = []
for sample in test_data:
prod_response = requests.post(
"https://api.prod.internal/v1/decide",
json=sample
)
shadow_response = requests.post(
"https://api.staging.internal/v1/decide",
json={**sample, "version": model_version}
)
results.append({
"user_id": sample["user_id"],
"prod_recommendation": prod_response.json()["recommended_treatment"],
"shadow_recommendation": shadow_response.json()["recommended_treatment"],
"effect_diff": abs(
prod_response.json()["expected_outcome"] -
shadow_response.json()["expected_outcome"]
)
})
    # 分析差异
    import pandas as pd
    diff_df = pd.DataFrame(results)
    avg_diff = diff_df["effect_diff"].mean()
if avg_diff > 0.05: # 效应差异超过5%
raise ValueError(f"影子测试失败:平均效应差异{avg_diff:.2%}")
return {"shadow_test_passed": True, "avg_effect_diff": avg_diff}
def promote_to_production(**context):
"""模型上线"""
model_version = context['task_instance'].xcom_pull(
task_ids='train_and_validate_model',
key='model_version'
)
    # 复制模型到生产仓库
    s3 = boto3.client('s3')
    # 实际实现中需要递归复制前缀下的所有文件;CopySource需要Bucket/Key字典
    s3.copy_object(
        CopySource={
            "Bucket": "causal-models",
            "Key": f"staging/{model_version}/double_ml.joblib"
        },
        Bucket="causal-models",
        Key=f"production/{model_version}/double_ml.joblib"
    )
    # 更新服务配置(通过Kubernetes ConfigMap;update_model_version_config为项目内辅助函数)
    update_model_version_config(model_version)
return {"promoted_version": model_version}
# 定义任务依赖
data_quality_task = PythonOperator(
task_id='check_data_quality',
python_callable=check_data_quality,
dag=dag
)
generate_data_task = PythonOperator(
task_id='generate_counterfactual_data',
python_callable=generate_counterfactual_data,
dag=dag
)
train_model_task = PythonOperator(
task_id='train_and_validate_model',
python_callable=train_and_validate_model,
dag=dag,
execution_timeout=timedelta(hours=2)
)
shadow_test_task = PythonOperator(
task_id='shadow_deployment_test',
python_callable=shadow_deployment_test,
dag=dag
)
promote_task = PythonOperator(
task_id='promote_to_production',
python_callable=promote_to_production,
dag=dag
)
# 任务流
data_quality_task >> generate_data_task >> train_model_task
train_model_task >> shadow_test_task >> promote_task
6.2 在线学习适应
对于快速变化的环境,离线训练可能不够及时,因此我们还实现了在线学习更新机制。
# 在线学习更新器
import numpy as np
from typing import Optional

class OnlineCausalUpdater:
"""
基于FTRL的在线因果更新
适用于:
- 数据分布快速变化
- 需要实时调整CATE估计
"""
def __init__(
self,
base_model: DoubleMLEstimator,
learning_rate: float = 0.01,
l1_regularization: float = 1.0
):
self.base_model = base_model
self.learning_rate = learning_rate
self.l1 = l1_regularization
# FTRL参数
self.z = None # 累积梯度
self.n = None # 累积平方梯度
self._initialize_weights()
def _initialize_weights(self):
"""初始化在线学习参数"""
# 获取基础模型的CATE估计作为初始权重
self.weights = np.array([self.base_model.ate_estimate])
self.z = np.zeros_like(self.weights)
self.n = np.zeros_like(self.weights)
def partial_fit(
self,
X: np.ndarray,
T: np.ndarray,
Y: np.ndarray,
weights: Optional[np.ndarray] = None
):
"""
在线更新参数
使用重要性采样纠正选择偏差
"""
# 获取基础模型预测
y_pred = self.base_model.predict_individual_effect(X)
# 计算残差
residuals = Y - y_pred
# 重要性权重(处理选择偏差)
if weights is None:
weights = np.ones_like(T)
# FTRL-Proximal更新
for i in range(len(X)):
self._ftrl_update(X[i], residuals[i], weights[i])
def _ftrl_update(self, x: np.ndarray, residual: float, weight: float):
"""单样本FTRL更新"""
# 简化的线性更新(实际应为双稳健估计)
for j in range(len(self.weights)):
sigma = (np.sqrt(self.n[j] + (self.learning_rate * x[j])**2) -
np.sqrt(self.n[j])) / self.learning_rate
self.z[j] += self.learning_rate * x[j] * residual * weight - sigma * self.weights[j]
self.n[j] += (self.learning_rate * x[j])**2
# 权重更新
if abs(self.z[j]) <= self.l1:
self.weights[j] = 0
else:
self.weights[j] = -(self.z[j] -
np.sign(self.z[j]) * self.l1) / (self.l1 + np.sqrt(self.n[j]))
def get_updated_effect(self, X: np.ndarray) -> np.ndarray:
"""获取更新后的CATE估计"""
base_effect = self.base_model.predict_individual_effect(X)
online_adjustment = X @ self.weights.reshape(-1, 1)
return base_effect + online_adjustment.flatten()
# 在线更新 worker(使用Faust)
import faust
app = faust.App(
'causal_online_updater',
broker='kafka://kafka-cluster:9092',
value_serializer='json'
)
decision_log_topic = app.topic('causal_decisions', partitions=16)
@app.agent(decision_log_topic)
async def update_model(stream):
"""
消费决策日志,在线更新模型
处理逻辑:
1. 获取决策和实际结果
2. 构建反事实样本
3. 在线更新参数
"""
updater = OnlineCausalUpdater(
base_model=DoubleMLEstimator.load("s3://causal-models/production/latest/")
)
async for event in stream.group_by(
lambda e: e["user_id"], # 按键分区
name='decisions_by_user'
):
        # 检查是否观察到结果(用户实际购买)
        # 注:outcome_observed/actual_outcome等字段假定由上游结果回流任务补齐
        if event["outcome_observed"]:
            # 构建训练样本(extract_features为项目内的特征抽取函数)
            X = np.array([extract_features(event)])
            T = np.array([[event["treatment_assigned"]]])
            Y = np.array([event["actual_outcome"]])
            # 在线更新
            updater.partial_fit(X, T, Y)
            # 定期保存更新后的模型(save_online_model为项目内辅助函数)
            if event["count"] % 1000 == 0:
                save_online_model(updater)
VII. 监控与可观测性:因果系统的特殊性
因果推理系统需要监控传统ML指标之外的特殊指标:识别性假设是否仍然成立、反事实预测是否准确等。
7.1 因果假设监控
检查关键混淆变量的平衡性,确保随机化(或伪随机化)仍然有效。
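平衡性最常用的度量是标准化均值差(SMD),经验阈值是|SMD|>0.1即提示该混淆变量在处理组与对照组之间失衡。下面是一个极简的计算示意(pandas实现,列名沿用上文的confounder_前缀约定):
# 混淆变量平衡性:标准化均值差(SMD)示意
import numpy as np
import pandas as pd

def standardized_mean_difference(
    df: pd.DataFrame, treatment_col: str, covariate: str
) -> float:
    """SMD = (处理组均值 - 对照组均值) / 合并标准差"""
    treated = df.loc[df[treatment_col] == 1, covariate]
    control = df.loc[df[treatment_col] == 0, covariate]
    pooled_std = np.sqrt((treated.var(ddof=1) + control.var(ddof=1)) / 2)
    return float((treated.mean() - control.mean()) / pooled_std)

# 对所有confounder_前缀的列做检查,|SMD|>0.1的列应触发告警
# smd = {c: standardized_mean_difference(df, "treatment_assigned", c)
#        for c in df.columns if c.startswith("confounder_")}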
# 因果假设监控器
import numpy as np
import pandas as pd
from typing import Dict
from sklearn.linear_model import LogisticRegression

class CausalAssumptionMonitor:
"""
监控因果识别所需的假设
核心假设:
1. 无未观测混淆(敏感性分析)
2. 正值假设(Positivity)
3. 稳定单位处理值假设(SUTVA)
"""
def __init__(self, causal_graph: CausalGraphModel):
self.graph = causal_graph
self.metrics = {}
def check_positivity(self, data: pd.DataFrame, treatment: str) -> Dict:
"""
检查正值假设:每个特征组合都有接受各处理的可能性
诊断:
- 倾向得分接近0或1的样本过多
- 某些子空间缺乏处理/对照样本
"""
from scipy import stats
# 计算倾向得分
ps_model = LogisticRegression()
feature_cols = [
c for c in data.columns
if c.startswith("confounder_")
]
X = data[feature_cols]
T = data[treatment]
ps_model.fit(X, T)
propensity_scores = ps_model.predict_proba(X)[:, 1]
# 检查极端倾向得分比例
extreme_propensity = np.mean(
(propensity_scores < 0.01) | (propensity_scores > 0.99)
)
# 计算重叠系数
treated_ps = propensity_scores[T == 1]
control_ps = propensity_scores[T == 0]
# 使用核密度估计计算重叠
kde_treated = stats.gaussian_kde(treated_ps)
kde_control = stats.gaussian_kde(control_ps)
x_range = np.linspace(0, 1, 1000)
overlap = np.trapz(
np.minimum(kde_treated(x_range), kde_control(x_range)),
x_range
)
return {
"positivity_violation": extreme_propensity > 0.1,
"extreme_propensity_ratio": float(extreme_propensity),
"overlap_coefficient": float(overlap),
"warning": overlap < 0.5
}
def detect_unobserved_confounding(self, residuals: np.ndarray) -> Dict:
"""
通过残差模式检测未观测混淆
方法:如果残差与混淆变量存在系统性相关,可能暗示未观测变量
"""
from statsmodels.stats.diagnostic import het_white
# 异方差检验(简化版)
# 显著的异方差可能暗示遗漏变量
try:
lm, p_value, f_stat, fp_value = het_white(
residuals,
np.column_stack([
np.ones(len(residuals)),
np.arange(len(residuals))
])
)
return {
"heteroskedasticity_pvalue": float(p_value),
"suspicious": p_value < 0.01,
"recommendation": (
"检测到异方差,建议检查遗漏变量"
if p_value < 0.01
else "未发现明显异方差"
)
}
except Exception as e:
return {
"error": str(e),
"recommendation": "检测失败,需人工审查"
}
def monitor_sutva_violation(self, network_data: pd.DataFrame) -> Dict:
"""
监控SUTVA违反:处理效应是否受其他单位影响
在社交网络中,优惠券可能影响朋友购买行为(溢出效应)
"""
# 计算网络相关性
# 简化的实现:检查空间/时间自相关
from sklearn.neighbors import NearestNeighbors
# 基于地理位置的邻近性
coords = network_data[['lat', 'lng']].values
nbrs = NearestNeighbors(n_neighbors=5).fit(coords)
# 计算空间自相关Moran's I
treatment_effects = network_data['observed_effects'].values
# 简化的空间自相关计算
distances, indices = nbrs.kneighbors(coords)
spatial_lag = np.mean(treatment_effects[indices[:, 1:]], axis=1)
correlation = np.corrcoef(treatment_effects, spatial_lag)[0, 1]
return {
"spatial_autocorrelation": float(correlation),
"sutva_risk": abs(correlation) > 0.3,
"recommendation": (
"检测到溢出效应,需使用网络因果推断"
if abs(correlation) > 0.3
else "SUTVA假设可能成立"
)
}
# Prometheus监控指标
from prometheus_client import Counter, Histogram, Gauge
# 因果特异性指标
causal_positivity_violations = Counter(
'causal_positivity_violations_total',
'正值假设违反次数'
)
causal_effect_sign_flips = Counter(
'causal_effect_sign_flips_total',
'效应符号翻转次数(与前版本对比)'
)
cate_prediction_latency = Histogram(
'cate_prediction_duration_seconds',
'CATE预测延迟'
)
cate_confidence_width = Gauge(
'cate_confidence_interval_width',
'置信区间宽度(反映不确定性)'
)
@app.middleware("http")
async def monitor_causal_metrics(request: Request, call_next):
"""监控中间件"""
if request.url.path == "/v1/decide":
# 记录预测延迟
with cate_prediction_latency.time():
response = await call_next(request)
# 记录置信区间宽度
# 实际需要从response.body中解析
cate_confidence_width.set(...)
return response
return await call_next(request)
7.2 反事实预测验证
监控反事实预测的准确性是因果系统的关键挑战。我们使用随机对照试验(RCT)数据进行回测。
# 反事实验证器
import numpy as np
import pandas as pd
from typing import Dict

class CounterfactualValidator:
"""
使用RCT数据验证反事实预测
核心思想:
- 在RCT中我们知道 ground truth 的个体处理效应(ITE)
- 比较模型预测的CATE与真实的ITE
"""
def __init__(self, rct_data_path: str):
self.rct_data = pd.read_parquet(rct_data_path)
def calculate_pehe(self, model: DoubleMLEstimator) -> float:
"""
计算Precision in Estimation of Heterogeneous Effect (PEHE)
PEHE = sqrt(1/n Σ (CATE(x_i) - ITE(x_i))^2)
其中ITE在RCT中可观测:ITE = Y_i(1) - Y_i(0)
"""
X = self.rct_data[['feature1', 'feature2', 'feature3']].values
# 预测CATE
cate_pred = model.predict_individual_effect(X)
# 计算真实ITE(仅在RCT中可行)
# 需要同时观测处理和对照结果(通过交叉验证)
true_ite = self._estimate_true_ite()
pehe = np.sqrt(np.mean((cate_pred - true_ite) ** 2))
return pehe
def _estimate_true_ite(self) -> np.ndarray:
"""在RCT中估计真实ITE"""
# 使用T-learner估计ITE作为ground truth
from sklearn.ensemble import RandomForestRegressor
X = self.rct_data[['feature1', 'feature2', 'feature3']]
T = self.rct_data['treatment']
Y = self.rct_data['outcome']
# 训练处理组和对照组模型
model_treated = RandomForestRegressor()
model_control = RandomForestRegressor()
model_treated.fit(X[T == 1], Y[T == 1])
model_control.fit(X[T == 0], Y[T == 0])
# 预测反事实结果
y1_pred = model_treated.predict(X)
y0_pred = model_control.predict(X)
return y1_pred - y0_pred
def policy_risk_evaluation(
self,
model: DoubleMLEstimator,
budget: float = 0.3
) -> Dict:
"""
策略风险评估:如果根据模型做决策,效果如何?
模拟:
- 根据CATE排序,对前budget%的用户施加处理
- 计算期望收益
- 与最优策略对比(需要先知)
"""
n_samples = len(self.rct_data)
n_treated = int(budget * n_samples)
X = self.rct_data[['feature1', 'feature2', 'feature3']].values
Y = self.rct_data['outcome'].values
# 模型预测CATE
cate_scores = model.predict_individual_effect(X)
# 模型策略:处理CATE最高的用户
treatment_mask_model = np.zeros(n_samples, dtype=bool)
treatment_mask_model[np.argsort(-cate_scores)[:n_treated]] = True
        # 计算模型策略的收益(简化:直接对被处理样本的观测结果求和,未扣除对照基线)
        model_revenue = Y[treatment_mask_model].sum()
# 最优策略(先知):处理真实ITE最高的用户
true_ite = self._estimate_true_ite()
optimal_mask = np.zeros(n_samples, dtype=bool)
optimal_mask[np.argsort(-true_ite)[:n_treated]] = True
optimal_revenue = Y[optimal_mask].sum()
# 策略遗憾(Regret)
regret = (optimal_revenue - model_revenue) / optimal_revenue
return {
"model_revenue": float(model_revenue),
"optimal_revenue": float(optimal_revenue),
"regret": float(regret),
"regret_pct": f"{regret*100:.2f}%",
"policy_quality": (
"优秀" if regret < 0.05 else
"良好" if regret < 0.15 else
"需改进"
)
}
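验证器的典型用法是在每次重训练后,对RCT留出集跑一遍PEHE与策略风险评估(示意代码:路径为假设值,RCT数据的列名沿用类中约定):
# 反事实验证用法示意
validator = CounterfactualValidator("s3://rct-data/holdout.parquet")
model = DoubleMLEstimator.load("models/double_ml.joblib")
print("PEHE:", validator.calculate_pehe(model))
print(validator.policy_risk_evaluation(model, budget=0.3))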
# Grafana监控仪表板配置(JSON片段)
grafana_dashboard = {
"panels": [
{
"title": "因果效应估计趋势",
"targets": [{
"expr": "avg(causal_ate_estimate)",
"legendFormat": "ATE",
}],
"alert": {
"conditions": [{
"evaluator": {"params": [0], "type": "lt"},
"operator": {"type": "and"},
"query": {"params": ["A", "5m", "now"]},
"reducer": {"params": [], "type": "avg"},
"type": "query"
}],
"message": "ATE变为负值,可能因果图错误"
}
},
{
"title": "正值假设健康状况",
"targets": [{
"expr": "causal_positivity_violations_total",
"legendFormat": "违反次数",
}]
},
{
"title": "策略遗憾(Regret)",
"targets": [{
"expr": "causal_policy_regret_pct",
"legendFormat": "模型vs最优",
}],
"alert": {
"conditions": [{
"evaluator": {"params": [0.2], "type": "gt"},
"operator": {"type": "and"},
"query": {"params": ["A", "1h", "now"]},
"reducer": {"params": [], "type": "avg"},
"type": "query"
}],
"message": "策略遗憾超过20%,模型需要重新训练"
}
}
]
}
监控关键指标
I. 识别性监控:正值假设违反率、工具变量相关性
II. 效应稳定性:ATE漂移检测,跨版本符号一致性
III. 反事实准确性:PEHE指标、RCT回测遗憾
IV. 业务影响:实际采纳率、ROI、A/B测试胜率
V. 系统性能:P99延迟、缓存命中率、模型加载时间
VIII. 生产部署实战:Kubernetes与Docker
8.1 容器化与编排
将因果推理服务容器化,使用Kubernetes管理生命周期和扩缩容。
# Dockerfile:因果决策服务
FROM python:3.10-slim as base
# 安装系统依赖(curl用于下方的HEALTHCHECK,slim镜像默认不带)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libgomp1 \
    curl \
    && rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY src/ ./src/
COPY models/ ./models/
# 创建非root用户
RUN useradd -m -u 1000 causaluser
RUN chown -R causaluser:causaluser /app
USER causaluser
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# 启动命令
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# Kubernetes Deployment配置
apiVersion: apps/v1
kind: Deployment
metadata:
name: causal-decision-api
namespace: ml-services
labels:
app: causal-api
version: v1.2
spec:
replicas: 5
selector:
matchLabels:
app: causal-api
template:
metadata:
labels:
app: causal-api
version: v1.2
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8000"
prometheus.io/path: "/metrics"
spec:
# 服务账户(用于访问S3和参数存储)
serviceAccountName: ml-service-account
# 安全上下文
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
# 初始化容器:下载模型
initContainers:
- name: model-downloader
image: amazon/aws-cli:latest
command: ["sh", "-c"]
args:
- |
aws s3 sync s3://causal-models/production/v1.2/ /models/
echo "模型下载完成"
volumeMounts:
- name: model-volume
mountPath: /models
# 资源请求
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"
containers:
- name: api
image: causal-decision-api:v1.2
ports:
- containerPort: 8000
name: http
protocol: TCP
# 环境变量
env:
- name: MODEL_VERSION
value: "v1.2"
- name: REDIS_HOST
valueFrom:
secretKeyRef:
name: redis-credentials
key: host
- name: AWS_REGION
value: "us-east-1"
# 资源请求与限制
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
# 就绪探针
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 3
# 存活探针
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 30
timeoutSeconds: 5
failureThreshold: 5
# 模型卷挂载
volumeMounts:
- name: model-volume
mountPath: /app/models
readOnly: true
# 安全上下文
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
volumes:
- name: model-volume
emptyDir:
sizeLimit: "2Gi"
# 节点亲和性(优先使用计算优化型实例)
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
preference:
matchExpressions:
- key: node-type
operator: In
values:
- compute-optimized
# 拓扑分布约束
topologySpreadConstraints:
- maxSkew: 1
topologyKey: topology.kubernetes.io/zone
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
app: causal-api
---
# HorizontalPodAutoscaler配置
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: causal-api-hpa
namespace: ml-services
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: causal-decision-api
minReplicas: 5
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: cate_prediction_duration_seconds
target:
type: AverageValue
averageValue: "200m" # 200ms平均延迟
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
---
# Service配置
apiVersion: v1
kind: Service
metadata:
name: causal-decision-api-service
namespace: ml-services
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
spec:
type: LoadBalancer
ports:
- port: 80
targetPort: 8000
protocol: TCP
name: http
selector:
app: causal-api
8.2 服务网格集成
使用Istio实现流量管理、熔断和可观测性。
# Istio VirtualService配置(金丝雀发布)
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: causal-api-virtualservice
namespace: ml-services
spec:
hosts:
- causal-api.prod.internal
http:
- match:
- headers:
x-model-version:
exact: "v1.2"
route:
- destination:
host: causal-decision-api-service
subset: v1.2
weight: 100
  - route:
    - destination:
        host: causal-decision-api-service
        subset: v1.1
      weight: 90
    - destination:
        host: causal-decision-api-service
        subset: v1.2
      weight: 10 # 10%流量到新版本
    # 重试与超时属于VirtualService的HTTP路由配置
    retries:
      attempts: 3
      perTryTimeout: 200ms
    timeout: 1s
---
# DestinationRule配置
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: causal-api-destinationrule
namespace: ml-services
spec:
host: causal-decision-api-service
subsets:
- name: v1.1
labels:
version: v1.1
- name: v1.2
labels:
version: v1.2
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 3
    # Istio通过outlierDetection实现熔断(DestinationRule没有独立的circuitBreaker字段)
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
---
# Istio Gateway配置(外部访问)
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: causal-api-gateway
namespace: ml-services
spec:
selector:
istio: ingressgateway
servers:
- port:
number: 443
name: https
protocol: HTTPS
tls:
mode: SIMPLE
credentialName: causal-api-tls-cert
hosts:
- "causal-api.company.com"