Causal Inference in Production: Architecting Real-Time Decision Systems

数字扫地僧 · Posted 2025/12/22 09:36:17

I. Introduction: From Correlation to Causation

In today's data-driven business environment, machine learning systems can already make impressive predictions. Yet when we deploy these systems into production to drive real-time decisions, a fundamental challenge surfaces: predicting a correlation is not the same as understanding a cause. A recommender can predict the probability that a user clicks on an item, but it cannot tell us why the user clicks, or what would happen if we changed the recommendation policy.

Causal inference is changing this. It lets us answer counterfactual questions such as "if we cut the price by 10%, how much would sales increase?" or "did showing this user the ad actually cause the purchase, rather than merely correlating with it?". Building a real-time causal inference system in production requires combining statistical rigor with robust engineering practice.

II. System Architecture Overview: A Layered Design

2.1 Overall Architecture

A production-grade causal inference system uses a layered architecture in which the layers communicate through explicit interface contracts, keeping the system extensible and maintainable.

(Architecture diagram.) From top to bottom, the stack is organized into four layers:

- Monitoring layer: effect tracking, performance monitoring (Prometheus), model drift detection
- Serving layer: real-time decision API (FastAPI), A/B experimentation framework, policy engine
- Model layer: causal discovery engine (do-calculus), causal graph model (NetworkX), effect estimator (Double ML)
- Data layer: feature store, real-time event stream (Kafka), transactional database (MySQL/PostgreSQL), offline compute (Spark), data lake (S3/HDFS)

2.2 Core Component Responsibilities

| Component | Technology | Primary responsibility | Performance target |
| --- | --- | --- | --- |
| Event collector | Kafka Streams | Collect user behavior events in real time | < 50 ms latency |
| Feature engineering | Feast | Generate causal feature vectors | QPS > 10k |
| Causal engine | Python + Cython | Execute do-calculus computations | Single inference < 100 ms |
| Model serving | FastAPI + Redis | Serve the real-time decision API | 99th percentile < 200 ms |
| Effect tracking | PostgreSQL + TimescaleDB | Record decision outcomes | Write throughput > 5k/s |

III. Data Infrastructure: Engineering Causal Features

3.1 What Makes Causal Features Different

Unlike conventional ML features, causal features must explicitly distinguish treatment variables, outcome variables, and confounders. We have to track each variable's role on the intervention path, not just its correlation with the target.

# Example causal feature definitions (using the Feast feature store)
from feast import Entity, Feature, FeatureView, ValueType
from google.protobuf.duration_pb2 import Duration

# Define the entity: user
user = Entity(
    name="user_id",
    value_type=ValueType.STRING,
    description="Unique user identifier"
)

# Define the causal feature view
causal_user_features = FeatureView(
    name="causal_user_features",
    entities=["user_id"],
    ttl=Duration(seconds=86400),  # 24 hours
    features=[
        # Treatment variables: whether a coupon was issued, and its discount
        Feature(name="treatment_coupon_issued", dtype=ValueType.BOOL),
        Feature(name="treatment_coupon_discount", dtype=ValueType.FLOAT),
        
        # Outcome variables: purchase behavior
        Feature(name="outcome_purchase_made", dtype=ValueType.BOOL),
        Feature(name="outcome_purchase_amount", dtype=ValueType.FLOAT),
        
        # Confounders: the user's historical behavior
        Feature(name="confounder_avg_session_duration", dtype=ValueType.FLOAT),
        Feature(name="confounder_days_since_last_purchase", dtype=ValueType.INT32),
        Feature(name="confounder_user_tier", dtype=ValueType.STRING),
        
        # Instrumental variable: handles endogeneity (e.g., the coupon targeting mechanism)
        Feature(name="instrument_user_location", dtype=ValueType.STRING),
    ]
)

3.2 Counterfactual Data Generation Pipeline

Training causal models requires a counterfactual dataset. Building it is a non-trivial ETL job that extracts pre- and post-intervention state from the raw logs.

# Counterfactual data generation: PySpark implementation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def generate_counterfactual_dataset(
    raw_logs_path: str, 
    output_path: str
) -> None:
    """
    从原始日志生成反事实训练数据集
    
    核心逻辑:
    1. 识别干预事件(优惠券发放)
    2. 为每个干预找到匹配的"伪对照组"
    3. 构建时间窗口观察结果
    """
    spark = SparkSession.builder.appName("CausalETL").getOrCreate()
    
    # Read the raw behavioral logs
    logs = spark.read.parquet(raw_logs_path)
    
    # Step 1: flag treatment events and their timestamps
    treatments = logs.filter(
        logs.event_type == "coupon_issued"
    ).select(
        "user_id",
        F.col("timestamp").alias("treatment_time"),
        F.col("properties.discount_percent").alias("treatment_dose"),
        F.lit(1).alias("treatment_assigned")  # actually received the treatment
    )
    
    # Step 2: build a pre-treatment observation window for each user.
    # Join the logs to each user's treatment timestamp so that pre-treatment
    # state (the confounders) can be aggregated relative to treatment_time.
    user_states = logs.join(
        treatments.select("user_id", "treatment_time"),
        "user_id"
    ).groupBy("user_id", "treatment_time").agg(
        F.avg(
            F.when(
                F.datediff("timestamp", "treatment_time").between(-30, -1),
                F.col("properties.session_duration")
            )
        ).alias("confounder_avg_session_duration"),
        
        F.countDistinct(
            F.when(
                F.datediff("timestamp", "treatment_time").between(-30, -1),
                F.col("session_id")
            )
        ).alias("confounder_session_frequency")
    )
    
    # Step 3: construct the outcome variables (purchase within 7 days after treatment)
    outcomes = logs.filter(
        F.col("event_type") == "purchase"
    ).join(
        treatments.select("user_id", "treatment_time"),
        "user_id"
    ).groupBy("user_id", "treatment_time").agg(
        F.max(
            F.when(
                F.datediff("timestamp", "treatment_time").between(0, 7),
                1
            ).otherwise(0)
        ).alias("outcome_purchase_made"),
        
        F.sum(
            F.when(
                F.datediff("timestamp", "treatment_time").between(0, 7),
                F.col("properties.order_value")
            )
        ).alias("outcome_purchase_amount")
    )
    
    # Step 4: build the counterfactual control group by matching on confounders,
    # using propensity score matching (PSM)
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import VectorAssembler
    
    # Compute propensity scores
    feature_cols = ["confounder_avg_session_duration", "confounder_session_frequency"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    
    labeled_data = user_states.join(
        treatments.select("user_id", "treatment_time", "treatment_assigned"),
        ["user_id", "treatment_time"],
        "left"
    ).fillna(0, subset=["treatment_assigned"])
    
    assembled = assembler.transform(labeled_data)
    
    # Train the propensity score model
    ps_model = LogisticRegression(featuresCol="features", labelCol="treatment_assigned")
    ps_results = ps_model.fit(assembled).transform(assembled)

    # Use P(treatment = 1 | X) from the probability vector rather than the hard
    # 0/1 prediction, then keep treated users plus low-propensity pseudo-controls
    from pyspark.ml.functions import vector_to_array
    ps_results = ps_results.withColumn(
        "propensity_score", vector_to_array("probability")[1]
    )
    matched_pairs = ps_results.filter(
        (F.col("treatment_assigned") == 1) |
        (F.col("propensity_score") < 0.3)  # low-propensity users serve as pseudo-controls
    )
    
    # Final dataset: counterfactual samples for both the treated and control groups
    final_dataset = matched_pairs.join(
        outcomes,
        ["user_id", "treatment_time"],
        "left"
    ).fillna(0, subset=["outcome_purchase_made", "outcome_purchase_amount"])
    
    # Write out for the feature store
    final_dataset.write.mode("overwrite").parquet(output_path)
    
    print(f"反事实数据集生成完成:{output_path}")
    print(f"样本总数:{final_dataset.count()}")
    print(f"处理组数量:{final_dataset.filter('treatment_assigned=1').count()}")


(Pipeline flowchart.) Raw event logs → is the event a treatment? → if yes, mark the treatment group; otherwise add to the candidate control pool → extract pre-treatment features → compute propensity scores → do the scores match? → if yes, create a matched pair; otherwise discard the sample → observe post-treatment outcomes → counterfactual dataset.

IV. The Causal Inference Engine: From Theory to Implementation

4.1 Causal Graph Modeling

In production the causal graph needs versioning and visualization. We build it with NetworkX and expose it to other services through a GraphQL API.

# Causal graph definition and version management
import networkx as nx
from typing import Dict, List, Optional
import json
from datetime import datetime

class CausalGraphModel:
    """
    可版本化的因果图模型
    
    核心功能:
    1. 支持do-calculus运算
    2. 识别后门路径和前门路径
    3. 验证因果识别性
    4. 导出干预策略
    """
    
    def __init__(self, version: str = "v1.0"):
        self.graph = nx.DiGraph()
        self.version = version
        self.created_at = datetime.now()
        self._build_graph()
        
    def _build_graph(self) -> None:
        """定义电商场景的因果结构"""
        
        # Add the nodes, annotated with each variable's causal role
        nodes = {
            # Treatment variables
            "coupon_discount": {"type": "treatment", "description": "coupon discount rate"},
            "ad_exposure": {"type": "treatment", "description": "number of ad exposures"},

            # Outcome variables
            "purchase_decision": {"type": "outcome", "description": "whether a purchase was made"},
            "purchase_amount": {"type": "outcome", "description": "purchase amount"},

            # Confounders
            "user_engagement": {"type": "confounder", "description": "user engagement"},
            "price_sensitivity": {"type": "confounder", "description": "price sensitivity"},
            "historical_spend": {"type": "confounder", "description": "historical spend"},

            # Mediators
            "cart_value": {"type": "mediator", "description": "cart value"},
            "browsing_time": {"type": "mediator", "description": "browsing time"},

            # Unobserved variables (used for sensitivity analysis)
            "user_intent": {"type": "unobserved", "description": "purchase intent (unobserved)"}
        }
        
        self.graph.add_nodes_from([
            (name, attrs) for name, attrs in nodes.items()
        ])
        
        # Add the causal edges (based on domain knowledge)
        edges = [
            # treatment -> mediator
            ("coupon_discount", "cart_value"),
            ("ad_exposure", "browsing_time"),
            
            # mediator -> outcome
            ("cart_value", "purchase_amount"),
            ("browsing_time", "purchase_decision"),
            
            # treatment -> outcome (direct effect)
            ("coupon_discount", "purchase_decision"),
            
            # confounder relationships
            ("user_engagement", "ad_exposure"),
            ("user_engagement", "browsing_time"),
            ("user_engagement", "purchase_decision"),
            
            ("price_sensitivity", "coupon_discount"),
            ("price_sensitivity", "purchase_decision"),
            ("price_sensitivity", "cart_value"),
            
            ("historical_spend", "price_sensitivity"),
            ("historical_spend", "purchase_amount"),
            
            # unobserved confounding
            ("user_intent", "ad_exposure"),
            ("user_intent", "purchase_decision"),
        ]
        
        self.graph.add_edges_from(edges)
        
    def identify_backdoor_paths(
        self, 
        treatment: str, 
        outcome: str
    ) -> List[List[str]]:
        """
        识别所有后门路径
        
        后门路径:从处理变量到结果变量,且箭头指向处理变量的路径
        这些路径需要被阻断以获得无偏估计
        """
        all_paths = []
        
        # Enumerate simple paths connecting the treatment and the outcome
        for path in nx.all_simple_paths(
            self.graph, 
            treatment, 
            outcome, 
            cutoff=5
        ):
            # Simplified backdoor detection: flag a path when it passes through
            # at least one confounder. (A rigorous implementation must check edge
            # orientation, since a true backdoor path has an arrow pointing into
            # the treatment node; all_simple_paths only walks directed paths.)
            if len(path) > 2:
                confounders = [
                    node for node in path[1:-1] 
                    if self.graph.nodes[node].get("type") == "confounder"
                ]
                if confounders:
                    all_paths.append(path)
        
        return all_paths
    
    def get_adjustment_set(
        self, 
        treatment: str, 
        outcome: str
    ) -> Optional[List[str]]:
        """
        使用do-calculus找到最小调整集
        
        基于后门准则:阻断所有后门路径的最小变量集合
        """
        backdoor_paths = self.identify_backdoor_paths(treatment, outcome)
        
        if not backdoor_paths:
            return []  # 无前门路径,无需调整
        
        # Greedily collect the confounders on those paths as the adjustment set.
        # Production systems would use a proper algorithm (e.g., a minimal vertex cut).
        
        all_confounders = set()
        for path in backdoor_paths:
            confounders = [
                node for node in path 
                if self.graph.nodes[node].get("type") == "confounder"
            ]
            all_confounders.update(confounders)
        
        return list(all_confounders)
    
    def export_for_serving(self) -> Dict:
        """导出为生产环境可用的配置"""
        return {
            "version": self.version,
            "adjustment_sets": {
                "coupon_effect": self.get_adjustment_set(
                    "coupon_discount", 
                    "purchase_decision"
                ),
                "ad_effect": self.get_adjustment_set(
                    "ad_exposure", 
                    "purchase_decision"
                )
            },
            "topology": nx.node_link_data(self.graph)
        }

4.2 The Double Machine Learning Estimator

Production-grade effect estimation needs methods that are both unbiased and robust. Double machine learning (Double ML) uses cross-fitting to remove regularization bias from the nuisance models.
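Before the implementation, it helps to write down the estimating equations that fit() below follows. This is a minimal sketch of the standard partially linear DML setup (a Robinson-style residual-on-residual regression); the notation is generic rather than specific to this system.

Y = \theta\,T + g(X) + \varepsilon, \qquad T = m(X) + v, \qquad \mathbb{E}[\varepsilon \mid X, T] = 0, \quad \mathbb{E}[v \mid X] = 0

\hat{\theta}_{\text{ATE}} = \frac{\frac{1}{n}\sum_i \big(Y_i - \hat{g}(X_i)\big)\big(T_i - \hat{m}(X_i)\big)}{\frac{1}{n}\sum_i \big(T_i - \hat{m}(X_i)\big)^2}

Here \hat{g} and \hat{m} are the outcome and treatment nuisance models fitted on held-out folds (cross-fitting), and the ratio is exactly the numerator/denominator of the residual regression computed from Y_residual and T_residual in the code below.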

# Double machine learning implementation
import numpy as np
from typing import Dict
from sklearn.base import clone
from sklearn.ensemble import RandomForestRegressor, GradientBoostingClassifier
from sklearn.model_selection import KFold
import joblib
import logging

class DoubleMLEstimator:
    """
    生产级双重机器学习估计器
    
    核心特性:
    1. 支持连续和二元处理变量
    2. 内置交叉验证防止过拟合
    3. 提供置信区间估计
    4. 可序列化用于模型服务
    """
    
    def __init__(
        self, 
        treatment_model=None,
        outcome_model=None,
        n_folds: int = 5,
        random_state: int = 42
    ):
        self.treatment_model = treatment_model or RandomForestRegressor(
            n_estimators=100, 
            random_state=random_state
        )
        self.outcome_model = outcome_model or GradientBoostingClassifier(
            n_estimators=100, 
            random_state=random_state
        )
        self.n_folds = n_folds
        self.random_state = random_state
        
        self.is_fitted = False
        self.ate_estimate = None
        self.ate_std = None
        
        # Logger
        self.logger = logging.getLogger(__name__)
        
    def fit(
        self, 
        X: np.ndarray,  # 混淆变量
        T: np.ndarray,  # 处理变量
        Y: np.ndarray   # 结果变量
    ) -> "DoubleMLEstimator":
        """
        训练Double ML模型
        
        实现步骤:
        1. K折交叉拟合
        2. 第一阶段:预测处理和结果
        3. 第二阶段:残差回归估计ATE
        """
        # Flatten treatment and outcome to 1-D float arrays to avoid shape issues
        T = np.asarray(T, dtype=float).ravel()
        Y = np.asarray(Y, dtype=float).ravel()

        n_samples = X.shape[0]
        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=self.random_state)

        # Cross-fitted (out-of-fold) predictions
        T_hat = np.zeros(n_samples, dtype=float)
        Y_hat = np.zeros(n_samples, dtype=float)
        
        self.logger.info(f"开始{self.n_folds}折交叉拟合...")
        
        # Stage one: cross-fitting
        for train_idx, test_idx in kf.split(X):
            # Training split
            X_train, T_train, Y_train = X[train_idx], T[train_idx], Y[train_idx]
            # Held-out split
            X_test = X[test_idx]

            # Clone the models so folds do not leak into each other
            treat_model = clone(self.treatment_model)
            out_model = clone(self.outcome_model)

            # Fit the treatment (propensity) model
            treat_model.fit(X_train, T_train)
            T_hat[test_idx] = treat_model.predict(X_test)

            # Fit the outcome model; use predicted probabilities for classifiers
            out_model.fit(X_train, Y_train)
            if hasattr(out_model, "predict_proba"):
                Y_hat[test_idx] = out_model.predict_proba(X_test)[:, 1]
            else:
                Y_hat[test_idx] = out_model.predict(X_test)
        
        # Stage two: residual-on-residual regression
        self.logger.info("Computing residuals and estimating the ATE...")

        # Residualize both variables
        T_residual = T - T_hat
        Y_residual = Y - Y_hat

        # The residual regression yields the ATE estimate:
        # ATE = E[Y_residual * T_residual] / E[T_residual^2]
        numerator = np.mean(Y_residual * T_residual)
        denominator = np.mean(T_residual ** 2)

        self.ate_estimate = numerator / denominator

        # Standard error (for confidence intervals)
        self._compute_standard_error(Y_residual, T_residual)

        self.is_fitted = True

        self.logger.info(f"ATE estimate: {self.ate_estimate:.4f} ± {self.ate_std:.4f}")
        
        return self
    
    def _compute_standard_error(
        self, 
        Y_residual: np.ndarray, 
        T_residual: np.ndarray
    ) -> None:
        """计算ATE的标准误"""
        n = len(Y_residual)
        
        # 影响函数(influence function)
        psi = (Y_residual - self.ate_estimate * T_residual) * T_residual
        
        # 标准误
        self.ate_std = np.sqrt(np.var(psi, ddof=1) / n)
    
    def effect_inference(
        self, 
        confidence_level: float = 0.95
    ) -> Dict[str, float]:
        """
        提供统计推断结果
        
        返回:
        - ATE点估计
        - 置信区间
        - t统计量
        """
        if not self.is_fitted:
            raise ValueError("模型未拟合,请先调用fit()")
        
        from scipy import stats
        
        # Confidence interval
        alpha = 1 - confidence_level
        z_score = stats.norm.ppf(1 - alpha/2)
        
        ci_lower = self.ate_estimate - z_score * self.ate_std
        ci_upper = self.ate_estimate + z_score * self.ate_std
        
        # t statistic
        t_stat = self.ate_estimate / self.ate_std
        p_value = 2 * (1 - stats.norm.cdf(abs(t_stat)))
        
        return {
            "ate": float(self.ate_estimate),
            "std_error": float(self.ate_std),
            "ci_lower": float(ci_lower),
            "ci_upper": float(ci_upper),
            "t_statistic": float(t_stat),
            "p_value": float(p_value),
            "significant": p_value < 0.05
        }
    
    def predict_individual_effect(
        self, 
        X: np.ndarray
    ) -> np.ndarray:
        """
        预测条件平均处理效应(CATE)
        
        注意:这需要元学习器(如X-Learner),此处为简化版
        """
        if not self.is_fitted:
            raise ValueError("模型未拟合")
        
        # 简化的CATE估计:ATE + 基于X的残差调整
        # 生产环境应使用更复杂的元学习器
        base_effect = np.full(X.shape[0], self.ate_estimate)
        
        return base_effect
    
    def save(self, path: str) -> None:
        """序列化模型到磁盘"""
        joblib.dump({
            "ate_estimate": self.ate_estimate,
            "ate_std": self.ate_std,
            "is_fitted": self.is_fitted,
            "treatment_model": self.treatment_model,
            "outcome_model": self.outcome_model,
            "n_folds": self.n_folds
        }, path)
        self.logger.info(f"模型已保存到 {path}")
    
    @classmethod
    def load(cls, path: str) -> "DoubleMLEstimator":
        """从磁盘加载模型"""
        data = joblib.load(path)
        instance = cls(
            treatment_model=data["treatment_model"],
            outcome_model=data["outcome_model"],
            n_folds=data["n_folds"]
        )
        instance.ate_estimate = data["ate_estimate"]
        instance.ate_std = data["ate_std"]
        instance.is_fitted = data["is_fitted"]
        return instance

Key design points for the causal engine

I. Algorithm choice: Double ML handles high-dimensional data but is computationally expensive; linear methods are fast but rest on strong assumptions
II. Identifiability checks: verify, before serving, that the causal graph identifies the target effect (see the sketch below)
III. Uncertainty quantification: every estimate must ship with a confidence interval to avoid blind decisions
IV. Computational efficiency: compile critical paths with Cython/JIT to stay under the 100 ms response budget
V. Model versioning: every causal graph and estimator carries a unique version, enabling canary releases
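As a concrete illustration of point II, the snippet below sketches a lightweight pre-serving identifiability gate built on the CausalGraphModel defined above. The check_identifiability helper and the refuse-to-deploy behavior are illustrative assumptions, not part of the original system.

# Hypothetical pre-serving identifiability gate (sketch, reuses CausalGraphModel from above)
def check_identifiability(graph: CausalGraphModel, treatment: str, outcome: str) -> bool:
    """Return True only if a backdoor adjustment set of observed variables exists."""
    adjustment_set = graph.get_adjustment_set(treatment, outcome)
    if adjustment_set is None:
        return False  # no valid adjustment set found
    # Refuse to serve if the adjustment would rely on unobserved variables
    unobserved = [
        v for v in adjustment_set
        if graph.graph.nodes[v].get("type") == "unobserved"
    ]
    return len(unobserved) == 0

# Example: block deployment when the coupon effect is not identifiable
graph = CausalGraphModel(version="v1.0")
if not check_identifiability(graph, "coupon_discount", "purchase_decision"):
    raise RuntimeError("Target effect is not identifiable from observed variables; fix the causal graph before serving")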



V. The Real-Time Decision Service: Building an Explainable Decision API

5.1 A High-Performance Decision Service

Production requires the decision API to answer within milliseconds without sacrificing the correctness of the causal reasoning. We build an asynchronous service with FastAPI and cache frequently requested causal estimates in Redis.

# Real-time causal decision API
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import Any, List, Dict, Optional
import redis
import numpy as np
import hashlib
import json
import logging
import time
from datetime import datetime

app = FastAPI(
    title="Causal Decision API",
    description="Real-time causal inference decision service"
)

# Redis cache (stores precomputed CATEs)
redis_client = redis.Redis(
    host='redis-cluster.prod.internal',
    port=6379, 
    db=0,
    decode_responses=True
)

# Model manager
class ModelManager:
    """Manage multiple versions of the causal models."""
    
    def __init__(self):
        self.models: Dict[str, DoubleMLEstimator] = {}
        self.causal_graphs: Dict[str, CausalGraphModel] = {}
        self.default_version = "v1.2"
        
    def load_model(self, version: str) -> None:
        """Load a specific version from the model repository."""
        model_path = f"s3://causal-models/warehouse/{version}/double_ml.joblib"
        graph_path = f"s3://causal-models/warehouse/{version}/causal_graph.json"

        self.models[version] = DoubleMLEstimator.load(model_path)
        self.causal_graphs[version] = CausalGraphModel(version)

        logging.getLogger(__name__).info(f"Model version {version} loaded")
    
    def get_model(self, version: Optional[str] = None):
        version = version or self.default_version
        if version not in self.models:
            raise HTTPException(
                status_code=404,
                detail=f"Model version {version} not found"
            )
        return self.models[version], self.causal_graphs[version]

model_manager = ModelManager()

# Request/response models
class DecisionRequest(BaseModel):
    user_id: str
    context: Dict[str, float]  # real-time context features
    available_treatments: List[Dict[str, Any]]  # candidate treatments
    version: Optional[str] = None

class TreatmentEffect(BaseModel):
    treatment_id: str
    estimated_effect: float
    confidence_interval: List[float]
    p_value: float
    significant: bool

class DecisionResponse(BaseModel):
    user_id: str
    recommended_treatment: Optional[str]
    expected_outcome: float
    effects: List[TreatmentEffect]
    confidence_threshold: float
    computed_at: datetime

def get_cache_key(user_id: str, treatment: Dict) -> str:
    """生成缓存键"""
    feature_str = f"{user_id}_{json.dumps(treatment)}"
    return f"cate:{hashlib.md5(feature_str.encode()).hexdigest()}"

@app.post("/v1/decide", response_model=DecisionResponse)
async def make_causal_decision(request: DecisionRequest):
    """
    实时因果决策接口
    
    决策逻辑:
    1. 验证请求
    2. 检查缓存
    3. 计算各处理效应
    4. 应用决策策略
    5. 记录日志
    """
    start_time = time.time()
    
    # 1. Load the model
    model, causal_graph = model_manager.get_model(request.version)

    # 2. Build the feature vector:
    # extract the confounder values from the request context
    confounder_values = [
        request.context.get("avg_session_duration", 0),
        request.context.get("days_since_last_purchase", 30),
        request.context.get("historical_spend", 0)
    ]
    X = np.array([confounder_values])
    
    # 3. Estimate the effect of every candidate treatment
    effects = []
    for treatment in request.available_treatments:
        treatment_id = treatment["treatment_id"]

        # Check the cache first
        cache_key = get_cache_key(request.user_id, treatment)
        cached_effect = redis_client.get(cache_key)

        if cached_effect:
            # Cache hit
            effect_data = json.loads(cached_effect)
        else:
            # Cache miss: compute the effect for this treatment.
            # Encode the candidate treatment (kept for future dose-response models)
            if treatment["type"] == "coupon":
                T = np.array([[treatment["discount_percent"]]])
            else:
                T = np.array([[0]])  # control

            # Take the ATE and adjust it toward a CATE (simplified here)
            ate_info = model.effect_inference()
            cate = model.predict_individual_effect(X)[0]

            effect_data = {
                "treatment_id": treatment_id,
                "estimated_effect": float(cate),
                "confidence_interval": [ate_info["ci_lower"], ate_info["ci_upper"]],
                "p_value": ate_info["p_value"],
                "significant": ate_info["significant"]
            }
            
            # Write to the cache (TTL = 5 minutes)
            redis_client.setex(
                cache_key, 
                300, 
                json.dumps(effect_data)
            )
        
        effects.append(TreatmentEffect(**effect_data))
    
    # 4. Apply the decision policy:
    # pick the treatment with the largest statistically significant effect
    significant_effects = [
        e for e in effects if e.significant and e.p_value < 0.05
    ]

    if significant_effects:
        # Choose the best treatment
        best_treatment = max(
            significant_effects, 
            key=lambda x: x.estimated_effect
        )
        recommended = best_treatment.treatment_id
        expected_outcome = best_treatment.estimated_effect
    else:
        # No significant effect: fall back to the default (usually no treatment)
        recommended = None
        expected_outcome = 0.0
    
    # 5. Log the decision (asynchronously)
    log_decision(
        user_id=request.user_id,
        recommendation=recommended,
        effects=effects,
        latency_ms=(time.time() - start_time) * 1000
    )
    
    return DecisionResponse(
        user_id=request.user_id,
        recommended_treatment=recommended,
        expected_outcome=expected_outcome,
        effects=effects,
        confidence_threshold=0.05,
        computed_at=datetime.utcnow()
    )

def log_decision(user_id: str, recommendation: Optional[str], effects: List, latency_ms: float):
    """Asynchronously record the decision for downstream analysis."""
    log_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "user_id": user_id,
        "recommendation": recommendation,
        "effects": [e.dict() for e in effects],
        "latency_ms": latency_ms
    }

    # Publish to Kafka for offline effect analysis.
    # kafka_producer is assumed to be initialized at application startup;
    # production deployments should use an asynchronous producer.
    kafka_producer.send(
        topic="causal_decisions",
        value=json.dumps(log_entry).encode()
    )

@app.get("/health")
async def health_check():
    """健康检查"""
    return {
        "status": "healthy",
        "model_versions": list(model_manager.models.keys()),
        "redis_connected": redis_client.ping(),
        "timestamp": datetime.utcnow().isoformat()
    }

5.2 The Decision Policy Engine

The policy engine turns causal effects into business actions, supporting multi-objective optimization and risk constraints.

# Policy engine implementation
from typing import Tuple

class PolicyEngine:
    """
    Policy optimization engine driven by causal effects.

    Supports:
    - multi-objective optimization (revenue, retention, cost)
    - risk constraints (confidence, budget)
    - A/B test traffic splitting
    """
    
    def __init__(
        self, 
        budget_constraint: Optional[float] = None,
        risk_aversion: float = 0.5
    ):
        self.budget_constraint = budget_constraint
        self.risk_aversion = risk_aversion  # risk-aversion coefficient
        
    def optimize(
        self,
        effects: List[TreatmentEffect],
        user_segment: str,
        context: Dict
    ) -> Tuple[Optional[str], float]:
        """
        多目标策略优化
        
        目标函数:
        max Σ (effect_i * confidence_i^α) - λ * cost_i
        
        约束:
        - 总预算 <= B
        - 最小置信度 >= threshold
        """
        # Drop treatments that do not meet the minimum confidence bar
        valid_effects = [
            e for e in effects
            if e.significant and e.p_value < 0.05
        ]

        if not valid_effects:
            return None, 0.0  # no eligible treatment
        
        # Score the utility of each candidate treatment
        scores = []
        for effect in valid_effects:
            # Effect size
            effect_size = effect.estimated_effect

            # Confidence weighting (risk adjustment)
            confidence_weight = (1 - effect.p_value) ** self.risk_aversion

            # Treatment cost, if any
            cost = self._get_treatment_cost(effect.treatment_id)

            # Utility score
            utility = effect_size * confidence_weight - 0.1 * cost
            
            scores.append((effect.treatment_id, utility, effect_size))
        
        # Pick the treatment with the highest utility
        best_treatment, best_utility, expected_effect = max(
            scores, 
            key=lambda x: x[1]
        )
        
        # Enforce the budget constraint
        if self.budget_constraint:
            treatment_cost = self._get_treatment_cost(best_treatment)
            if treatment_cost > self.budget_constraint:
                return None, 0.0  # over budget: do not intervene
        
        return best_treatment, expected_effect
    
    def _get_treatment_cost(self, treatment_id: str) -> float:
        """获取干预成本映射"""
        cost_map = {
            "coupon_5pct": 5.0,
            "coupon_10pct": 10.0,
            "free_shipping": 3.0,
            "no_treatment": 0.0
        }
        return cost_map.get(treatment_id, 0.0)

Design principles for the real-time service

I. Stateless design: the API service keeps no state and scales horizontally to absorb traffic peaks
II. Smart caching: CATE computations are cached in Redis to avoid repeated work
III. Degradation strategy: fall back to a rule engine when the model service fails (see the sketch below)
IV. Asynchronous logging: decision logs are written asynchronously so the request path is never blocked
V. Multi-version support: several model versions are served at once, enabling canary releases
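Point III deserves a concrete illustration. The sketch below shows one possible fallback wrapper around the decision call; the rule thresholds and the fallback_decide helper are hypothetical and would be tuned per business line.

# Hypothetical degradation path: fall back to a simple rule engine when the causal model fails
import logging

logger = logging.getLogger("causal_fallback")

def fallback_decide(context: dict) -> dict:
    """Rule-based default: only high-value, recently inactive users get a small coupon."""
    if context.get("historical_spend", 0) > 500 and context.get("days_since_last_purchase", 0) > 14:
        return {"recommended_treatment": "coupon_5pct", "source": "rule_engine"}
    return {"recommended_treatment": None, "source": "rule_engine"}

def decide_with_fallback(decide_fn, request_context: dict) -> dict:
    """Call the causal decision service first; on any failure, fall back to the rules."""
    try:
        return decide_fn(request_context)
    except Exception as exc:  # model unavailable, timeout, malformed response, ...
        logger.warning("Causal decision failed, falling back to rules: %s", exc)
        return fallback_decide(request_context)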


(Request flow diagram.) User request → API Gateway (rate limiting & authentication; rejected requests get a 429) → decision service → feature retrieval → Redis cache: on a hit, return the cached CATE; on a miss, run the DoubleML computation and write the result to the cache → policy optimization → is the confidence above the threshold? → if yes, return the optimal treatment; otherwise return the default policy → asynchronous logging to Kafka.

VI. Model Training and Update Pipeline: Continuous Learning

6.1 The Offline Training Pipeline

Causal models need periodic retraining to adapt to shifts in the data distribution. We orchestrate the training pipeline with Airflow.

# Airflow DAG: causal model training pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from datetime import datetime, timedelta
import boto3

default_args = {
    'owner': 'causal-ml-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=15),
}

dag = DAG(
    'causal_model_training',
    default_args=default_args,
    description='Daily training pipeline for causal inference models',
    schedule_interval='0 2 * * *',  # every day at 02:00
    catchup=False,
    max_active_runs=1
)

def check_data_quality(**context):
    """Validate the counterfactual dataset before training."""
    from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

    # Check the integrity of the counterfactual dataset
    tasks = GreatExpectationsOperator(
        task_id='validate_counterfactual_data',
        data_context_root_dir='/great_expectations',
        expectation_suite_name='counterfactual_dataset',
        data_asset_name='daily_counterfactuals'
    )
    tasks.execute(context)

def generate_counterfactual_data(**context):
    """生成最新的反事实数据集"""
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("DailyCausalETL").getOrCreate()
    
    # 读取前一天的数据
    execution_date = context['ds']
    input_path = f"s3://user-logs/dt={execution_date}/*"
    output_path = f"s3://causal-training/data/{execution_date}/"
    
    # 调用反事实生成函数
    generate_counterfactual_dataset(input_path, output_path)
    
    # 推送XCom供下游任务使用
    context['task_instance'].xcom_push(
        key='training_data_path', 
        value=output_path
    )

def train_and_validate_model(**context):
    """Train the Double ML model and validate the causal assumptions."""
    from sklearn.ensemble import RandomForestRegressor, GradientBoostingClassifier
    from src.models import DoubleMLEstimator
    from src.validation import CausalModelValidator
    
    # 获取数据路径
    data_path = context['task_instance'].xcom_pull(
        task_ids='generate_counterfactual_data',
        key='training_data_path'
    )
    
    # 加载数据
    import pandas as pd
    df = pd.read_parquet(data_path)
    
    # 准备特征
    feature_cols = [
        'confounder_avg_session_duration',
        'confounder_session_frequency',
        'confounder_historical_spend'
    ]
    X = df[feature_cols].values
    T = df['treatment_coupon_discount'].values.reshape(-1, 1)
    Y = df['outcome_purchase_made'].values
    
    # 训练模型
    model = DoubleMLEstimator(
        treatment_model=RandomForestRegressor(n_estimators=200),
        outcome_model=GradientBoostingClassifier(n_estimators=200),
        n_folds=5
    )
    
    model.fit(X, T, Y)
    
    # 验证因果假设
    validator = CausalModelValidator()
    validation_results = validator.validate(
        model, 
        X, T, Y, 
        causal_graph=CausalGraphModel()
    )
    
    # Check that the sign of the estimated effect matches business expectations
    if validation_results['ate_sign_valid'] is False:
        raise ValueError("ATE sign contradicts business knowledge; the causal graph may be misspecified")
    
    # 保存模型
    model_version = f"v{context['ds_nodash']}"
    model_path = f"s3://causal-models/staging/{model_version}/"
    
    model.save(model_path + "double_ml.joblib")
    
    # 推送模型信息
    context['task_instance'].xcom_push(
        key='model_version',
        value=model_version
    )
    
    return validation_results

def shadow_deployment_test(**context):
    """Shadow testing: compare the candidate model against the production model."""
    import requests
    import pandas as pd
    
    model_version = context['task_instance'].xcom_pull(
        task_ids='train_and_validate_model',
        key='model_version'
    )
    
    # 加载测试数据
    test_data = load_shadow_test_data()
    
    # 同时查询生产模型和新模型
    results = []
    for sample in test_data:
        prod_response = requests.post(
            "https://api.prod.internal/v1/decide",
            json=sample
        )
        
        shadow_response = requests.post(
            "https://api.staging.internal/v1/decide",
            json={**sample, "version": model_version}
        )
        
        results.append({
            "user_id": sample["user_id"],
            "prod_recommendation": prod_response.json()["recommended_treatment"],
            "shadow_recommendation": shadow_response.json()["recommended_treatment"],
            "effect_diff": abs(
                prod_response.json()["expected_outcome"] - 
                shadow_response.json()["expected_outcome"]
            )
        })
    
    # 分析差异
    diff_df = pd.DataFrame(results)
    avg_diff = diff_df["effect_diff"].mean()
    
    if avg_diff > 0.05:  # effect difference above 5%
        raise ValueError(f"Shadow test failed: mean effect difference {avg_diff:.2%}")
    
    return {"shadow_test_passed": True, "avg_effect_diff": avg_diff}

def promote_to_production(**context):
    """模型上线"""
    model_version = context['task_instance'].xcom_pull(
        task_ids='train_and_validate_model',
        key='model_version'
    )
    
    # Copy the model artifacts into the production prefix
    s3 = boto3.client('s3')
    staging_key = f"staging/{model_version}/double_ml.joblib"
    prod_key = f"production/{model_version}/double_ml.joblib"

    # A real implementation would recursively copy every artifact under the prefix
    s3.copy_object(
        CopySource={"Bucket": "causal-models", "Key": staging_key},
        Bucket="causal-models",
        Key=prod_key
    )
    
    # Update the serving configuration (via a Kubernetes ConfigMap)
    update_model_version_config(model_version)
    
    return {"promoted_version": model_version}

# Define the pipeline tasks
data_quality_task = PythonOperator(
    task_id='check_data_quality',
    python_callable=check_data_quality,
    dag=dag
)

generate_data_task = PythonOperator(
    task_id='generate_counterfactual_data',
    python_callable=generate_counterfactual_data,
    dag=dag
)

train_model_task = PythonOperator(
    task_id='train_and_validate_model',
    python_callable=train_and_validate_model,
    dag=dag,
    execution_timeout=timedelta(hours=2)
)

shadow_test_task = PythonOperator(
    task_id='shadow_deployment_test',
    python_callable=shadow_deployment_test,
    dag=dag
)

promote_task = PythonOperator(
    task_id='promote_to_production',
    python_callable=promote_to_production,
    dag=dag
)

# Task flow
data_quality_task >> generate_data_task >> train_model_task
train_model_task >> shadow_test_task >> promote_task

6.2 Online Learning Adaptation

In fast-changing environments offline retraining may not be timely enough, so we add an online-learning update mechanism.

# Online learning updater
class OnlineCausalUpdater:
    """
    FTRL-based online updates to the causal estimates.

    Useful when:
    - the data distribution shifts quickly
    - CATE estimates need near-real-time adjustment
    """
    
    def __init__(
        self, 
        base_model: DoubleMLEstimator,
        learning_rate: float = 0.01,
        l1_regularization: float = 1.0
    ):
        self.base_model = base_model
        self.learning_rate = learning_rate
        self.l1 = l1_regularization
        
        # FTRL parameters
        self.z = None  # accumulated gradients
        self.n = None  # accumulated squared gradients
        
        self._initialize_weights()
    
    def _initialize_weights(self):
        """初始化在线学习参数"""
        # 获取基础模型的CATE估计作为初始权重
        self.weights = np.array([self.base_model.ate_estimate])
        
        self.z = np.zeros_like(self.weights)
        self.n = np.zeros_like(self.weights)
    
    def partial_fit(
        self, 
        X: np.ndarray,
        T: np.ndarray,
        Y: np.ndarray,
        weights: Optional[np.ndarray] = None
    ):
        """
        在线更新参数
        
        使用重要性采样纠正选择偏差
        """
        # 获取基础模型预测
        y_pred = self.base_model.predict_individual_effect(X)
        
        # 计算残差
        residuals = Y - y_pred
        
        # 重要性权重(处理选择偏差)
        if weights is None:
            weights = np.ones_like(T)
        
        # FTRL-Proximal更新
        for i in range(len(X)):
            self._ftrl_update(X[i], residuals[i], weights[i])
    
    def _ftrl_update(self, x: np.ndarray, residual: float, weight: float):
        """单样本FTRL更新"""
        # 简化的线性更新(实际应为双稳健估计)
        for j in range(len(self.weights)):
            sigma = (np.sqrt(self.n[j] + (self.learning_rate * x[j])**2) - 
                     np.sqrt(self.n[j])) / self.learning_rate
            
            self.z[j] += self.learning_rate * x[j] * residual * weight - sigma * self.weights[j]
            self.n[j] += (self.learning_rate * x[j])**2
            
            # 权重更新
            if abs(self.z[j]) <= self.l1:
                self.weights[j] = 0
            else:
                self.weights[j] = -(self.z[j] - 
                                   np.sign(self.z[j]) * self.l1) / (self.l1 + np.sqrt(self.n[j]))
    
    def get_updated_effect(self, X: np.ndarray) -> np.ndarray:
        """Return the CATE estimate after the online adjustment."""
        base_effect = self.base_model.predict_individual_effect(X)
        # Only the learned weight dimensions contribute to the adjustment
        online_adjustment = X[:, : len(self.weights)] @ self.weights

        return base_effect + online_adjustment

# Online update worker (using Faust)
import faust

app = faust.App(
    'causal_online_updater',
    broker='kafka://kafka-cluster:9092',
    value_serializer='json'
)

decision_log_topic = app.topic('causal_decisions', partitions=16)

@app.agent(decision_log_topic)
async def update_model(stream):
    """
    消费决策日志,在线更新模型
    
    处理逻辑:
    1. 获取决策和实际结果
    2. 构建反事实样本
    3. 在线更新参数
    """
    updater = OnlineCausalUpdater(
        base_model=DoubleMLEstimator.load("s3://causal-models/production/latest/")
    )
    
    async for event in stream.group_by(
        lambda e: e["user_id"],  # 按键分区
        name='decisions_by_user'
    ):
        # 检查是否观察到结果(用户实际购买)
        if event["outcome_observed"]:
            # 构建训练样本
            X = np.array([extract_features(event)])
            T = np.array([[event["treatment_assigned"]]])
            Y = np.array([event["actual_outcome"]])
            
            # 在线更新
            updater.partial_fit(X, T, Y)
            
            # 定期保存更新后的模型
            if event["count"] % 1000 == 0:
                save_online_model(updater)

(Timeline, 2024-01-01 to 2024-01-04: model training and deployment.) Data processing: data quality checks, counterfactual data generation. Model training: DoubleML training, causal graph validation. Testing and validation: offline validation, shadow deployment (48 h). Release and feedback: production rollout, a 24-hour monitoring window, effect analysis.

VII. Monitoring and Observability: What Makes Causal Systems Special

A causal inference system has to monitor metrics beyond the usual ML ones: whether the identification assumptions still hold, whether counterfactual predictions remain accurate, and so on.

7.1 Monitoring the Causal Assumptions

We check the balance of the key confounders to verify that randomization (or pseudo-randomization) is still effective.
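The monitor below covers positivity, unobserved confounding, and SUTVA; for the balance check mentioned here, a standardized-mean-difference (SMD) report is a common complement. The following is a minimal sketch that assumes confounder columns are prefixed with confounder_ as in the feature store above; the 0.1 threshold is the usual rule of thumb, not a value from the original system.

# Minimal covariate-balance check via standardized mean differences (sketch)
import numpy as np
import pandas as pd

def standardized_mean_differences(df: pd.DataFrame, treatment_col: str) -> pd.Series:
    """SMD = |mean_treated - mean_control| / pooled std, per numeric confounder column."""
    treated = df[df[treatment_col] == 1]
    control = df[df[treatment_col] == 0]
    numeric_confounders = [
        c for c in df.select_dtypes(include="number").columns
        if c.startswith("confounder_")
    ]
    smds = {}
    for col in numeric_confounders:
        pooled_std = np.sqrt((treated[col].var() + control[col].var()) / 2)
        smds[col] = 0.0 if pooled_std == 0 else abs(treated[col].mean() - control[col].mean()) / pooled_std
    return pd.Series(smds)

# Usage: flag confounders whose imbalance exceeds the conventional 0.1 threshold
# smd = standardized_mean_differences(features_df, "treatment_coupon_issued")
# imbalanced = smd[smd > 0.1]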

# Causal assumption monitor
class CausalAssumptionMonitor:
    """
    Monitor the assumptions required for causal identification.

    Core assumptions:
    1. No unobserved confounding (sensitivity analysis)
    2. Positivity
    3. Stable Unit Treatment Value Assumption (SUTVA)
    """
    
    def __init__(self, causal_graph: CausalGraphModel):
        self.graph = causal_graph
        self.metrics = {}
    
    def check_positivity(self, data: pd.DataFrame, treatment: str) -> Dict:
        """
        Check the positivity assumption: every covariate stratum must have a
        non-zero probability of receiving each treatment.

        Diagnostics:
        - too many samples with propensity scores near 0 or 1
        - sub-populations that lack treated or control samples
        """
        from scipy import stats
        from sklearn.linear_model import LogisticRegression

        # Estimate propensity scores
        ps_model = LogisticRegression()
        feature_cols = [
            c for c in data.columns 
            if c.startswith("confounder_")
        ]
        
        X = data[feature_cols]
        T = data[treatment]
        
        ps_model.fit(X, T)
        propensity_scores = ps_model.predict_proba(X)[:, 1]
        
        # 检查极端倾向得分比例
        extreme_propensity = np.mean(
            (propensity_scores < 0.01) | (propensity_scores > 0.99)
        )
        
        # 计算重叠系数
        treated_ps = propensity_scores[T == 1]
        control_ps = propensity_scores[T == 0]
        
        # 使用核密度估计计算重叠
        kde_treated = stats.gaussian_kde(treated_ps)
        kde_control = stats.gaussian_kde(control_ps)
        
        x_range = np.linspace(0, 1, 1000)
        overlap = np.trapz(
            np.minimum(kde_treated(x_range), kde_control(x_range)),
            x_range
        )
        
        return {
            "positivity_violation": extreme_propensity > 0.1,
            "extreme_propensity_ratio": float(extreme_propensity),
            "overlap_coefficient": float(overlap),
            "warning": overlap < 0.5
        }
    
    def detect_unobserved_confounding(self, residuals: np.ndarray) -> Dict:
        """
        Look for unobserved confounding via residual patterns.

        Idea: systematic structure in the residuals can hint at an omitted variable.
        """
        from statsmodels.stats.diagnostic import het_white

        # Heteroskedasticity test (simplified);
        # significant heteroskedasticity may indicate omitted variables
        try:
            lm, p_value, f_stat, fp_value = het_white(
                residuals, 
                np.column_stack([
                    np.ones(len(residuals)),
                    np.arange(len(residuals))
                ])
            )
            
            return {
                "heteroskedasticity_pvalue": float(p_value),
                "suspicious": p_value < 0.01,
                "recommendation": (
                    "检测到异方差,建议检查遗漏变量" 
                    if p_value < 0.01 
                    else "未发现明显异方差"
                )
            }
        except Exception as e:
            return {
                "error": str(e),
                "recommendation": "检测失败,需人工审查"
            }
    
    def monitor_sutva_violation(self, network_data: pd.DataFrame) -> Dict:
        """
        Monitor SUTVA violations: does one unit's treatment affect other units?

        In a social network, a coupon may spill over to friends' purchases.
        """
        # Simplified implementation: check spatial/temporal autocorrelation of
        # the observed effects instead of full network-interference modeling
        
        from sklearn.neighbors import NearestNeighbors
        
        # 基于地理位置的邻近性
        coords = network_data[['lat', 'lng']].values
        nbrs = NearestNeighbors(n_neighbors=5).fit(coords)
        
        # 计算空间自相关Moran's I
        treatment_effects = network_data['observed_effects'].values
        
        # 简化的空间自相关计算
        distances, indices = nbrs.kneighbors(coords)
        spatial_lag = np.mean(treatment_effects[indices[:, 1:]], axis=1)
        
        correlation = np.corrcoef(treatment_effects, spatial_lag)[0, 1]
        
        return {
            "spatial_autocorrelation": float(correlation),
            "sutva_risk": abs(correlation) > 0.3,
            "recommendation": (
                "检测到溢出效应,需使用网络因果推断"
                if abs(correlation) > 0.3
                else "SUTVA假设可能成立"
            )
        }

# Prometheus monitoring metrics
from prometheus_client import Counter, Histogram, Gauge

# Causal-specific metrics
causal_positivity_violations = Counter(
    'causal_positivity_violations_total',
    'Number of positivity assumption violations'
)

causal_effect_sign_flips = Counter(
    'causal_effect_sign_flips_total',
    'Number of effect sign flips (compared with the previous version)'
)

cate_prediction_latency = Histogram(
    'cate_prediction_duration_seconds',
    'CATE prediction latency'
)

cate_confidence_width = Gauge(
    'cate_confidence_interval_width',
    'Confidence interval width (a proxy for uncertainty)'
)

from fastapi import Request

@app.middleware("http")
async def monitor_causal_metrics(request: Request, call_next):
    """Monitoring middleware for the decision endpoint."""
    if request.url.path == "/v1/decide":
        # Record prediction latency
        with cate_prediction_latency.time():
            response = await call_next(request)

        # Record the confidence-interval width
        # (in practice this must be parsed from the response body)
        cate_confidence_width.set(...)
        
        return response
    
    return await call_next(request)

7.2 Validating Counterfactual Predictions

Monitoring the accuracy of counterfactual predictions is a key challenge for causal systems. We backtest against data from randomized controlled trials (RCTs).

# Counterfactual validator
import numpy as np
import pandas as pd

class CounterfactualValidator:
    """
    Validate counterfactual predictions against RCT data.

    Core idea:
    - in an RCT the ground-truth individual treatment effect (ITE) can be estimated
    - compare the model's predicted CATE against that ITE
    """
    
    def __init__(self, rct_data_path: str):
        self.rct_data = pd.read_parquet(rct_data_path)
        
    def calculate_pehe(self, model: DoubleMLEstimator) -> float:
        """
        计算Precision in Estimation of Heterogeneous Effect (PEHE)
        
        PEHE = sqrt(1/n Σ (CATE(x_i) - ITE(x_i))^2)
        
        其中ITE在RCT中可观测:ITE = Y_i(1) - Y_i(0)
        """
        X = self.rct_data[['feature1', 'feature2', 'feature3']].values
        
        # 预测CATE
        cate_pred = model.predict_individual_effect(X)
        
        # 计算真实ITE(仅在RCT中可行)
        # 需要同时观测处理和对照结果(通过交叉验证)
        true_ite = self._estimate_true_ite()
        
        pehe = np.sqrt(np.mean((cate_pred - true_ite) ** 2))
        
        return pehe
    
    def _estimate_true_ite(self) -> np.ndarray:
        """Estimate the ground-truth ITE from the RCT data."""
        # Use a T-learner as the ground-truth ITE estimator
        from sklearn.ensemble import RandomForestRegressor
        
        X = self.rct_data[['feature1', 'feature2', 'feature3']]
        T = self.rct_data['treatment']
        Y = self.rct_data['outcome']
        
        # 训练处理组和对照组模型
        model_treated = RandomForestRegressor()
        model_control = RandomForestRegressor()
        
        model_treated.fit(X[T == 1], Y[T == 1])
        model_control.fit(X[T == 0], Y[T == 0])
        
        # 预测反事实结果
        y1_pred = model_treated.predict(X)
        y0_pred = model_control.predict(X)
        
        return y1_pred - y0_pred
    
    def policy_risk_evaluation(
        self, 
        model: DoubleMLEstimator,
        budget: float = 0.3
    ) -> Dict:
        """
        策略风险评估:如果根据模型做决策,效果如何?
        
        模拟:
        - 根据CATE排序,对前budget%的用户施加处理
        - 计算期望收益
        - 与最优策略对比(需要先知)
        """
        n_samples = len(self.rct_data)
        n_treated = int(budget * n_samples)
        
        X = self.rct_data[['feature1', 'feature2', 'feature3']].values
        Y = self.rct_data['outcome'].values
        
        # 模型预测CATE
        cate_scores = model.predict_individual_effect(X)
        
        # 模型策略:处理CATE最高的用户
        treatment_mask_model = np.zeros(n_samples, dtype=bool)
        treatment_mask_model[np.argsort(-cate_scores)[:n_treated]] = True
        
        # 计算模型策略的收益
        model_revenue = Y[treatment_mask_model].sum()
        
        # 最优策略(先知):处理真实ITE最高的用户
        true_ite = self._estimate_true_ite()
        optimal_mask = np.zeros(n_samples, dtype=bool)
        optimal_mask[np.argsort(-true_ite)[:n_treated]] = True
        
        optimal_revenue = Y[optimal_mask].sum()
        
        # 策略遗憾(Regret)
        regret = (optimal_revenue - model_revenue) / optimal_revenue
        
        return {
            "model_revenue": float(model_revenue),
            "optimal_revenue": float(optimal_revenue),
            "regret": float(regret),
            "regret_pct": f"{regret*100:.2f}%",
            "policy_quality": (
                "优秀" if regret < 0.05 else
                "良好" if regret < 0.15 else
                "需改进"
            )
        }

# Grafana dashboard configuration (JSON fragment)
grafana_dashboard = {
    "panels": [
        {
            "title": "因果效应估计趋势",
            "targets": [{
                "expr": "avg(causal_ate_estimate)",
                "legendFormat": "ATE",
            }],
            "alert": {
                "conditions": [{
                    "evaluator": {"params": [0], "type": "lt"},
                    "operator": {"type": "and"},
                    "query": {"params": ["A", "5m", "now"]},
                    "reducer": {"params": [], "type": "avg"},
                    "type": "query"
                }],
                "message": "ATE变为负值,可能因果图错误"
            }
        },
        {
            "title": "正值假设健康状况",
            "targets": [{
                "expr": "causal_positivity_violations_total",
                "legendFormat": "违反次数",
            }]
        },
        {
            "title": "策略遗憾(Regret)",
            "targets": [{
                "expr": "causal_policy_regret_pct",
                "legendFormat": "模型vs最优",
            }],
            "alert": {
                "conditions": [{
                    "evaluator": {"params": [0.2], "type": "gt"},
                    "operator": {"type": "and"},
                    "query": {"params": ["A", "1h", "now"]},
                    "reducer": {"params": [], "type": "avg"},
                    "type": "query"
                }],
                "message": "策略遗憾超过20%,模型需要重新训练"
            }
        }
    ]
}

Key monitoring metrics

I. Identifiability: positivity-violation rate, instrument relevance
II. Effect stability: ATE drift detection and sign consistency across versions (see the sketch below)
III. Counterfactual accuracy: PEHE, regret on RCT backtests
IV. Business impact: adoption rate, ROI, A/B test win rate
V. System performance: P99 latency, cache hit rate, model load time
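For point II, a simple cross-version drift check can be built directly on the ATE and standard error that DoubleMLEstimator already exposes. The sketch below compares two independent estimates with a z-test; the function name and the 0.05 threshold are illustrative assumptions.

# Minimal ATE drift check between the serving model and a retrained candidate (sketch)
import numpy as np
from scipy import stats

def ate_drift_test(ate_old: float, se_old: float, ate_new: float, se_new: float, alpha: float = 0.05) -> dict:
    """Two-sample z-test on two independent ATE estimates (point estimate and standard error)."""
    z = (ate_new - ate_old) / np.sqrt(se_old ** 2 + se_new ** 2)
    p_value = 2 * (1 - stats.norm.cdf(abs(z)))
    return {
        "z_statistic": float(z),
        "p_value": float(p_value),
        "drift_detected": p_value < alpha,
        "sign_flipped": bool(np.sign(ate_new) != np.sign(ate_old)),
    }

# Usage: compare the production estimate with today's retrained estimate
# report = ate_drift_test(ate_old=0.042, se_old=0.006, ate_new=0.021, se_new=0.007)
# if report["drift_detected"] or report["sign_flipped"]:
#     ...  # alert the on-call engineer and hold the release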


(Monitoring architecture diagram.) Conventional ML monitoring (model drift, prediction latency, resource usage) and causal-specific monitoring (positivity, effect sign flips, policy regret, unobserved confounding, SUTVA violations / spillover) feed Prometheus exporters; the Prometheus server drives Grafana dashboards and AlertManager, which notifies PagerDuty, Slack, and email.

VIII. Production Deployment in Practice: Kubernetes and Docker

8.1 Containerization and Orchestration

We containerize the causal inference service and use Kubernetes to manage its lifecycle and autoscaling.

# Dockerfile: causal decision service
FROM python:3.10-slim as base

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libgomp1 \
    && rm -rf /var/lib/apt/lists/*

# Set the working directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY src/ ./src/
COPY models/ ./models/

# Create a non-root user
RUN useradd -m -u 1000 causaluser
RUN chown -R causaluser:causaluser /app
USER causaluser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Startup command
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# Kubernetes Deployment configuration
apiVersion: apps/v1
kind: Deployment
metadata:
  name: causal-decision-api
  namespace: ml-services
  labels:
    app: causal-api
    version: v1.2
spec:
  replicas: 5
  selector:
    matchLabels:
      app: causal-api
  template:
    metadata:
      labels:
        app: causal-api
        version: v1.2
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      # Service account (grants access to S3 and the parameter store)
      serviceAccountName: ml-service-account
      
      # Security context
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      
      # Init container: download the model artifacts
      initContainers:
      - name: model-downloader
        image: amazon/aws-cli:latest
        command: ["sh", "-c"]
        args:
          - |
            aws s3 sync s3://causal-models/production/v1.2/ /models/
            echo "model download complete"
        volumeMounts:
        - name: model-volume
          mountPath: /models
        
        # Resource requests
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "200m"
      
      containers:
      - name: api
        image: causal-decision-api:v1.2
        ports:
        - containerPort: 8000
          name: http
          protocol: TCP
        
        # Environment variables
        env:
        - name: MODEL_VERSION
          value: "v1.2"
        - name: REDIS_HOST
          valueFrom:
            secretKeyRef:
              name: redis-credentials
              key: host
        - name: AWS_REGION
          value: "us-east-1"
        
        # Resource requests and limits
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        
        # Readiness probe
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 3
        
        # Liveness probe
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 30
          timeoutSeconds: 5
          failureThreshold: 5
        
        # Model volume mount
        volumeMounts:
        - name: model-volume
          mountPath: /app/models
          readOnly: true
        
        # Security context
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
      
      volumes:
      - name: model-volume
        emptyDir:
          sizeLimit: "2Gi"
      
      # Node affinity (prefer compute-optimized instances)
      affinity:
        nodeAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 1
            preference:
              matchExpressions:
              - key: node-type
                operator: In
                values:
                - compute-optimized
      
      # Topology spread constraints
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: topology.kubernetes.io/zone
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            app: causal-api

---
# HorizontalPodAutoscaler configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: causal-api-hpa
  namespace: ml-services
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: causal-decision-api
  minReplicas: 5
  maxReplicas: 50
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: cate_prediction_duration_seconds
      target:
        type: AverageValue
        averageValue: "200m"  # 200ms平均延迟
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60

---
# Service configuration
apiVersion: v1
kind: Service
metadata:
  name: causal-decision-api-service
  namespace: ml-services
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
spec:
  type: LoadBalancer
  ports:
  - port: 80
    targetPort: 8000
    protocol: TCP
    name: http
  selector:
    app: causal-api

8.2 Service Mesh Integration

We use Istio for traffic management, circuit breaking, and observability.

# Istio VirtualService configuration (canary release)
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: causal-api-virtualservice
  namespace: ml-services
spec:
  hosts:
  - causal-api.prod.internal
  http:
  - match:
    - headers:
        x-model-version:
          exact: "v1.2"
    route:
    - destination:
        host: causal-decision-api-service
        subset: v1.2
      weight: 100
  - route:
    - destination:
        host: causal-decision-api-service
        subset: v1.1
      weight: 90
    - destination:
        host: causal-decision-api-service
        subset: v1.2
      weight: 10  # 10% of traffic to the new version

---
# DestinationRule configuration
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: causal-api-destinationrule
  namespace: ml-services
spec:
  host: causal-decision-api-service
  subsets:
  - name: v1.1
    labels:
      version: v1.1
  - name: v1.2
    labels:
      version: v1.2
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 3
    # Circuit breaking in Istio is expressed through outlierDetection;
    # retries and timeouts are configured on the VirtualService routes instead
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50

---
# Istio Gateway configuration (external access)
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: causal-api-gateway
  namespace: ml-services
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 443
      name: https
      protocol: HTTPS
    tls:
      mode: SIMPLE
      credentialName: causal-api-tls-cert
    hosts:
    - "causal-api.company.com"

(CI/CD pipeline diagram.) CI (Jenkins/GitLab CI): git push → unit tests → integration tests → build the Docker image → push to ECR. CD: update the Kubernetes config → ArgoCD sync → deploy to staging → automated end-to-end tests → if the tests pass, deploy to production at 10% traffic and monitor the key metrics; if the metrics stay healthy, roll out fully and monitor for 48 hours, otherwise notify and roll back.