因果推断进阶:DAG与结构因果模型深度解析【玩转华为云】
I. 引言:从相关性到因果性的跨越

在数据科学实践中,"相关性不等于因果性"是分析师与决策者之间永恒的鸿沟。当业务方问"这个策略究竟带来了多少真实提升"时,传统的机器学习模型往往只能给出预测性结论,而无法回答反事实问题(Counterfactual)。本文将深入探讨有向无环图(DAG)与结构因果模型(SCM)的理论体系,并结合华为云ModelArts、DataArts等真实云环境,展示如何部署端到端的因果推断流水线。我们将通过一个电商用户流失预警的真实案例,完整演示从因果发现、因果效应估计到策略评估的全链路工程实践。
I.I. 传统方法的局限性分析
| 分析方法 | 擅长场景 | 因果推断缺陷 | 典型误导案例 |
|---|---|---|---|
| 相关性分析 | 探索变量关系 | 无法区分混杂因子 | 冰淇淋销量与溺水事故正相关 |
| 回归模型 | 预测与拟合 | 回归系数≠因果效应 | 价格系数为正(因高端商品反而卖得好) |
| A/B测试 | 策略效果验证 | 无法观测长期效应 | 短期点击提升但长期品牌损伤 |
| 时间序列 | 趋势预测 | 外生冲击内生性问题 | 促销期间销量上升归因错误 |
I.II. 因果推断的工程价值
流程说明:在不同业务约束下,因果推断的决策路径可以概括为——当A/B测试可行时优先做随机实验;当A/B测试不可行时(如无法强制用户流失),基于DAG的识别策略成为从观测数据中识别因果路径的关键工具,通过do-calculus将观测分布转化为因果陈述。
II. DAG基础:因果图论与d-分离
II.I. 图论基础与表示方法
DAG(Directed Acyclic Graph)是结构因果模型的骨架,其数学定义为G=(V,E),其中V是变量节点集,E是不包含有向环的边集。
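其核心假设是因果马尔可夫条件:联合分布可以按图分解为 P(v1, …, vn) = Π_i P(vi | pa(Vi)),其中 pa(Vi) 表示 Vi 在图中的父节点集合;这一分解是后文结构方程与do-演算的概率基础。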
节点类型定义表:
| 节点类型 | 图示特征 | 因果语义 | 实例 |
|---|---|---|---|
| 根节点 | 无入边 | 外生变量 | 用户年龄、性别 |
| 中介节点 | 有入有出边 | 传递因果效应 | 用户满意度 |
| 叶节点 | 无出边 | 结果变量 | 是否流失 |
| 混杂节点 | 指向X和Y | 同时影响原因和结果 | 用户购买力 |
| 对撞节点 | 被X和Y同时指向 | 条件独立但条件后依赖 | 客服投诉量 |
# 使用networkx构建DAG基础结构
import networkx as nx
import matplotlib.pyplot as plt
def create_customer_churn_dag():
"""
构建电商用户流失因果图
    包含13个节点和19条边,模拟真实业务场景
"""
G = nx.DiGraph()
# I. 定义节点(带元数据)
nodes_metadata = {
# 用户基础属性层
"age": {"type": "root", "description": "用户年龄"},
"income": {"type": "root", "description": "收入水平"},
"device_type": {"type": "root", "description": "设备类型"},
# 行为特征层
"purchase_freq": {"type": "mediator", "description": "购买频率"},
"avg_order_value": {"type": "mediator", "description": "平均订单金额"},
"browsing_depth": {"type": "mediator", "description": "浏览深度"},
# 策略干预层
"discount_exposure": {"type": "treatment", "description": "折扣曝光强度"},
"push_notification": {"type": "treatment", "description": "推送频次"},
# 心理感知层
"price_sensitivity": {"type": "latent", "description": "价格敏感度"},
"satisfaction_score": {"type": "latent", "description": "满意度得分"},
# 结果层
"repeat_purchase": {"type": "outcome", "description": "复购行为"},
"customer_lifetime_value": {"type": "outcome", "description": "用户生命周期价值"},
"churn": {"type": "outcome", "description": "是否流失"}
}
G.add_nodes_from(nodes_metadata.keys())
nx.set_node_attributes(G, nodes_metadata)
# II. 定义边(因果关系)
edges = [
# 基础属性→行为
("age", "purchase_freq"),
("income", "avg_order_value"),
("device_type", "browsing_depth"),
# 基础属性→心理感知
("age", "price_sensitivity"),
("income", "price_sensitivity"),
# 行为→心理感知
("purchase_freq", "satisfaction_score"),
("browsing_depth", "satisfaction_score"),
# 策略干预→行为
("discount_exposure", "purchase_freq"),
("push_notification", "browsing_depth"),
# 策略干预→心理感知
("discount_exposure", "price_sensitivity"),
("push_notification", "satisfaction_score"),
# 心理感知→结果
("price_sensitivity", "repeat_purchase"),
("satisfaction_score", "repeat_purchase"),
# 行为→结果
("purchase_freq", "customer_lifetime_value"),
("avg_order_value", "customer_lifetime_value"),
# 结果→最终流失
("repeat_purchase", "churn"),
("customer_lifetime_value", "churn"),
# 混杂路径
("income", "discount_exposure"), # 高收入用户折扣曝光少
("device_type", "push_notification") # iOS推送响应更好
]
G.add_edges_from(edges)
return G
# 可视化DAG
def visualize_dag(G, save_path="churn_dag.png"):
"""
使用分层布局可视化因果图
"""
    plt.figure(figsize=(16, 12))
    # 分层布局:multipartite_layout要求每个节点带有subset_key属性,这里按节点类型赋层号
    layer_map = {'root': 0, 'treatment': 1, 'mediator': 2, 'latent': 3, 'outcome': 4}
    for node, attrs in G.nodes(data=True):
        G.nodes[node]['layer'] = layer_map.get(attrs.get('type'), 0)
    pos = nx.multipartite_layout(G, subset_key="layer", scale=2)
    # 按类型着色
    color_map = {
        'root': '#e1f5ff',
        'mediator': '#fff2cc',
        'treatment': '#d4edda',
        'latent': '#f8d7da',
        'outcome': '#e2e3e5'
    }
    node_colors = [color_map.get(G.nodes[node]['type'], '#cccccc') for node in G.nodes()]
nx.draw(G, pos,
node_color=node_colors,
node_size=3000,
arrowsize=20,
arrowstyle='->',
font_size=10,
font_weight='bold',
with_labels=True)
# 添加图例
legend_elements = [plt.Line2D([0], [0], marker='o', color='w',
markerfacecolor=color,
markersize=10, label=f"{t}节点")
for t, color in color_map.items()]
plt.legend(handles=legend_elements, loc='upper right')
plt.title("电商用户流失因果图结构", fontsize=16, fontweight='bold')
plt.tight_layout()
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
# 执行构建
G = create_customer_churn_dag()
visualize_dag(G)
print(f"DAG统计:{G.number_of_nodes()}节点,{G.number_of_edges()}条边")
II.II. d-分离与条件独立性
d-分离(d-separation)是判断因果图中变量间独立性的核心算法。
| 路径模式 | 图结构 | 不控制Y时 | 条件化Y后 | 实例解释 |
|---|---|---|---|---|
| 链式 | X→Y→Z | X与Z相关 | X⊥Z∣Y | 年龄→购买力→流失,控制购买力后年龄与流失独立 |
| 分叉 | X←Y→Z | X与Z相关 | X⊥Z∣Y | 收入既影响折扣又影响流失,控制收入后两者独立 |
| 对撞 | X→Y←Z | X⊥Z | X与Z相关(条件化打开对撞) | 投诉量由服务差和产品差共同导致,控制投诉后两者呈现负相关 |
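下面给出一段与上表对撞一行对应的最小模拟示意(变量含义与阈值均为演示用假设),直观展示"条件化对撞节点会打开路径":
# 最小模拟:对撞结构 X→Y←Z 中,条件化Y会在本不相关的X、Z之间诱导出相关性
import numpy as np

rng = np.random.default_rng(1)
service_bad = rng.normal(0, 1, 50_000)                               # X:服务差
product_bad = rng.normal(0, 1, 50_000)                               # Z:产品差(与X独立)
complaints = service_bad + product_bad + rng.normal(0, 0.5, 50_000)  # Y:投诉量(对撞节点)

print(f"无条件相关系数: {np.corrcoef(service_bad, product_bad)[0, 1]:+.3f}")   # 接近0
mask = complaints > 1.0                                              # 条件化:只看高投诉人群
print(f"条件化高投诉后: {np.corrcoef(service_bad[mask], product_bad[mask])[0, 1]:+.3f}")  # 明显为负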
# d-分离算法实现
def d_separation(G, X, Y, Z=None):
"""
判断在因果图G中,X与Y是否被Z集合d-分离
参数:
G: networkx DiGraph
X: 源节点
Y: 目标节点
Z: 条件节点集合(可选)
返回:
bool: 是否d-分离
"""
if Z is None:
Z = set()
else:
Z = set(Z)
    # I. 取图的骨架(skeleton,即无向化),用于枚举X与Y之间的所有路径
    skeleton = G.to_undirected()
    # II. 寻找骨架上所有X到Y的路径
    all_paths = list(nx.all_simple_paths(skeleton, X, Y))
# 如果没有路径,直接返回True(已分离)
if not all_paths:
return True
# III. 逐一检查路径是否被阻断
for path in all_paths:
if not is_path_blocked(G, path, Z):
return False
return True
def is_path_blocked(G, path, Z):
    """
    判断单条路径是否被条件集Z阻断
    阻断规则:任一非对撞中间节点被条件化,或任一对撞节点及其后裔均未被条件化
    """
    for i in range(1, len(path) - 1):
        node = path[i]
        prev_node, next_node = path[i - 1], path[i + 1]
        # 对撞节点:路径两侧的边都指向该节点
        is_collider = G.has_edge(prev_node, node) and G.has_edge(next_node, node)
        if is_collider:
            # 对撞节点本身及其后裔都未被条件化 → 该路径在此处被阻断
            if node not in Z and not (nx.descendants(G, node) & Z):
                return True
        else:
            # 链式/分叉节点:被条件化 → 阻断
            if node in Z:
                return True
    return False  # 所有中间节点均未构成阻断,路径保持连通
# 验证实例
G = create_customer_churn_dag()
print(f"discount_exposure ⊥ churn? {d_separation(G, 'discount_exposure', 'churn')}") # False
print(f"discount_exposure ⊥ churn | [satisfaction_score]? {d_separation(G, 'discount_exposure', 'churn', ['satisfaction_score'])}") # True
(图:d-分离原理)
III. 结构因果模型(SCM)形式化
III.I. 结构方程定义
结构因果模型是DAG的数学延伸,形式化为三元组 M = ⟨U, V, F⟩:U 是外生(噪声)变量集,V 是内生变量集,F 是一组结构方程,每个内生变量由其父节点与对应噪声项决定,即 Vi = fi(pa(Vi), Ui);再配合 U 上的概率分布,即可按拓扑序生成观测数据。
# scm/structural_equations.py
from typing import Dict, Callable, List
from dataclasses import dataclass
import numpy as np
import pandas as pd
@dataclass
class Equation:
"""
结构方程类
每个方程表示:子节点 = f(父节点, 噪声项)
"""
name: str
parents: List[str]
function: Callable
noise_std: float = 1.0
def evaluate(self, parent_values: Dict[str, float],
noise_scale: float = None) -> float:
"""
计算方程值
"""
if noise_scale is None:
noise_scale = self.noise_std
# 计算确定性部分 f(parents)
deterministic = self.function(parent_values)
# 添加外生噪声 U ~ N(0, noise_std²)
noise = np.random.normal(0, noise_scale)
return deterministic + noise
class StructuralCausalModel:
"""
结构因果模型实现
"""
def __init__(self, equations: Dict[str, Equation]):
self.equations = equations
self.topological_order = self._compute_topological_order()
def _compute_topological_order(self) -> List[str]:
"""
计算拓扑序(确保父节点先于子节点计算)
"""
# 构建邻接表
graph = {name: eq.parents for name, eq in self.equations.items()}
        # Kahn算法求拓扑序:节点的入度 = 其父节点数量
        in_degree = {name: len(parents) for name, parents in graph.items()}
queue = [node for node, degree in in_degree.items() if degree == 0]
order = []
while queue:
node = queue.pop(0)
order.append(node)
for child, parents in graph.items():
if node in parents:
in_degree[child] -= 1
if in_degree[child] == 0:
queue.append(child)
return order
def generate_sample(self, n_samples: int,
intervention: Dict[str, float] = None) -> pd.DataFrame:
"""
生成观测数据或干预数据
intervention: {变量名: 固定值},用于do-操作
"""
if intervention is None:
intervention = {}
samples = []
for _ in range(n_samples):
sample = {}
# 按拓扑序计算每个变量
for var_name in self.topological_order:
# 如果该变量被干预,直接赋值
if var_name in intervention:
sample[var_name] = intervention[var_name]
continue
# 否则按结构方程计算
equation = self.equations[var_name]
parent_values = {
parent: sample[parent]
for parent in equation.parents
}
sample[var_name] = equation.evaluate(parent_values)
samples.append(sample)
return pd.DataFrame(samples)
# 定义电商用户流失的SCM
def create_churn_scm():
"""
为电商流失案例定义结构方程
每个方程基于业务假设设计
"""
equations = {
# 外生变量(根节点)
"age": Equation(
name="age",
parents=[],
function=lambda p: np.random.uniform(18, 65),
noise_std=0.1 # 几乎无噪声的观测
),
"income": Equation(
name="income",
parents=[],
function=lambda p: np.random.lognormal(10, 0.8), # 收入分布右偏
noise_std=0.05
),
"device_type": Equation(
name="device_type",
parents=[],
function=lambda p: np.random.choice([0, 1, 2], p=[0.6, 0.3, 0.1]), # 0:Android, 1:iOS, 2:PC
noise_std=0
),
# 中介变量
"price_sensitivity": Equation(
name="price_sensitivity",
parents=["age", "income"],
function=lambda p: 0.5 * (p.get("age", 0) / 65) - 0.3 * np.log(p.get("income", 1)) / 10 + 0.5,
noise_std=0.2
),
"purchase_freq": Equation(
name="purchase_freq",
parents=["age", "device_type"],
function=lambda p: np.exp(-0.02 * p.get("age", 0)) * (1.5 if p.get("device_type", 0) == 1 else 1.0),
noise_std=0.3
),
"avg_order_value": Equation(
name="avg_order_value",
parents=["income", "device_type"],
function=lambda p: p.get("income", 0) * 0.001 * (1.2 if p.get("device_type", 0) == 2 else 1.0),
noise_std=10.0
),
"satisfaction_score": Equation(
name="satisfaction_score",
parents=["purchase_freq", "avg_order_value"],
function=lambda p: 0.6 * np.tanh(p.get("purchase_freq", 0) - 2) +
0.4 * np.tanh(p.get("avg_order_value", 0) / 100 - 1) + 3,
noise_std=0.5
),
# 策略变量(可干预)
"discount_exposure": Equation(
name="discount_exposure",
parents=["income"],
function=lambda p: max(0, 1 - p.get("income", 0) / 50000), # 低收入用户更多折扣
noise_std=0.1
),
"push_notification": Equation(
name="push_notification",
parents=["device_type"],
function=lambda p: 3 if p.get("device_type", 0) == 1 else 5, # iOS用户推送少
noise_std=1.0
),
# 结果变量
"repeat_purchase": Equation(
name="repeat_purchase",
parents=["price_sensitivity", "satisfaction_score", "discount_exposure"],
function=lambda p: 1 / (1 + np.exp(-(
-2 * p.get("price_sensitivity", 0) +
1.5 * p.get("satisfaction_score", 0) +
0.8 * p.get("discount_exposure", 0) - 4
))),
noise_std=0.05
),
"customer_lifetime_value": Equation(
name="customer_lifetime_value",
parents=["avg_order_value", "purchase_freq", "repeat_purchase"],
function=lambda p: p.get("avg_order_value", 0) * p.get("purchase_freq", 0) *
(1 + p.get("repeat_purchase", 0) * 5),
noise_std=50.0
),
"churn": Equation(
name="churn",
parents=["repeat_purchase", "customer_lifetime_value"],
function=lambda p: 1 if p.get("repeat_purchase", 0) < 0.3 and
p.get("customer_lifetime_value", 0) < 200 else 0,
noise_std=0.1
)
}
return StructuralCausalModel(equations)
# 生成观测数据
scm = create_churn_scm()
obs_data = scm.generate_sample(n_samples=10000)
print(obs_data.head())
print("\n描述性统计:")
print(obs_data.describe())
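基于上面生成的观测数据,可以先直观对比一下"观测条件分布"与"干预分布"的差异,为下一节的do-操作做铺垫;下面的对比仅作示意,折扣曝光高低以0.5为假设阈值:
# 最小示意:观测条件差 vs 干预差(阈值0.5与干预取值0.8/0.2均为演示假设)
obs_high = obs_data[obs_data['discount_exposure'] > 0.5]['repeat_purchase'].mean()
obs_low = obs_data[obs_data['discount_exposure'] <= 0.5]['repeat_purchase'].mean()
do_high = scm.generate_sample(2000, intervention={'discount_exposure': 0.8})['repeat_purchase'].mean()
do_low = scm.generate_sample(2000, intervention={'discount_exposure': 0.2})['repeat_purchase'].mean()
print(f"观测条件差(含混杂): {obs_high - obs_low:.4f}")
print(f"干预差 E[Y|do(高)] - E[Y|do(低)]: {do_high - do_low:.4f}")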
III.II. do-操作与干预查询
do-操作是因果推断的核心,表示外部强制设定变量值,切断其入边。
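其分布层面的刻画是截断因式分解:P(v1, …, vn | do(X=x)) = Π_{i: Vi≠X} P(vi | pa(Vi)),并将X固定取值x;与观测条件分布 P(· | X=x) 的区别在于删去了 P(x | pa(X)) 这一项,相当于切断X的所有入边。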
# scm/do_calculus.py
from typing import Dict
import numpy as np
import pandas as pd
from scm.structural_equations import Equation, StructuralCausalModel
class DoOperator:
"""
do-操作实现
do(X=x) 表示将变量X值固定为x,移除其所有父节点影响
"""
def __init__(self, scm: StructuralCausalModel):
self.scm = scm
def do(self, interventions: Dict[str, float],
n_samples: int = 10000) -> pd.DataFrame:
"""
执行do-操作并生成干预后数据
"""
# 创建修改后的方程组(移除被干预变量的父节点)
modified_equations = {}
for var_name, equation in self.scm.equations.items():
if var_name in interventions:
# 被干预的变量:变为常数方程
fixed_value = interventions[var_name]
modified_equations[var_name] = Equation(
name=var_name,
parents=[],
function=lambda p, v=fixed_value: v, # 固定值
noise_std=0
)
            else:
                # 未被干预的变量:保持原方程不变。
                # 被干预的父节点在修改后的SCM中已成为常数方程,采样时自然取到干预值,
                # 因此无需从父节点列表中移除(否则其取值会被错误地当作缺失处理)。
                modified_equations[var_name] = equation
# 使用修改后的方程生成数据
modified_scm = StructuralCausalModel(modified_equations)
return modified_scm.generate_sample(n_samples)
def estimate_ate(self, treatment: str, outcome: str,
treatment_val: float = 1.0,
control_val: float = 0.0,
                     n_samples: int = 10000) -> Dict:
"""
估计平均处理效应 (Average Treatment Effect)
ATE = E[Y|do(T=1)] - E[Y|do(T=0)]
"""
# do(T=1) 分布
interv_treatment = self.do({treatment: treatment_val}, n_samples)
y_treatment = interv_treatment[outcome].mean()
# do(T=0) 分布
interv_control = self.do({treatment: control_val}, n_samples)
y_control = interv_control[outcome].mean()
ate = y_treatment - y_control
# 计算标准误
se = np.sqrt(
interv_treatment[outcome].var() / n_samples +
interv_control[outcome].var() / n_samples
)
return {
'ate': ate,
'se': se,
'ci_95': [ate - 1.96 * se, ate + 1.96 * se],
'y_treatment': y_treatment,
'y_control': y_control
}
# 使用示例
scm = create_churn_scm()
do_op = DoOperator(scm)
# 估计折扣曝光对复购的因果效应
ate_result = do_op.estimate_ate(
treatment='discount_exposure',
outcome='repeat_purchase',
treatment_val=0.8, # 高折扣曝光
control_val=0.2, # 低折扣曝光
n_samples=5000
)
print("平均处理效应估计:")
print(f"ATE = {ate_result['ate']:.4f} ± {ate_result['se']:.4f}")
print(f"95% CI: [{ate_result['ci_95'][0]:.4f}, {ate_result['ci_95'][1]:.4f}]")
print(f"E[Y|do(T=1)] = {ate_result['y_treatment']:.4f}")
print(f"E[Y|do(T=0)] = {ate_result['y_control']:.4f}")
(图:do-操作原理)
IV. 因果识别策略:后门、前门与工具变量
IV.I. 后门调整(Backdoor Adjustment)
后门路径是指X与Y之间以指向X的箭头开始的路径(即经由X、Y共同原因的混杂路径)。后门准则要求调整变量集Z满足:
- Z阻断X与Y之间的所有后门路径
- Z不包含X的任何后代
# identification/backdoor.py
from typing import Dict, List, Set
import itertools
import networkx as nx
import numpy as np
import pandas as pd
class BackdoorIdentifier:
"""
后门路径识别与调整集计算
"""
def __init__(self, G: nx.DiGraph):
self.G = G
def find_backdoor_paths(self, X: str, Y: str) -> List[List[str]]:
"""
找出所有X→Y的后门路径
后门路径:以指向X的箭头开始的路径
"""
        # 后门路径以指向X的箭头开始,因此先找出X在原图中的所有祖先节点
        ancestors = nx.ancestors(self.G, X)
ancestors.add(X)
backdoor_paths = []
for ancestor in ancestors:
if ancestor == X:
continue
# 寻找ancestor→...→Y且经过X入边的路径
try:
                paths = list(nx.all_simple_paths(self.G, ancestor, Y))
for path in paths:
# 路径必须在X之前分叉(即X不是第一个节点)
if X in path and path.index(X) > 0:
# 截取后门路径部分(开始到X之后)
backdoor_path = path[:path.index(X)+1]
if backdoor_path not in backdoor_paths:
backdoor_paths.append(backdoor_path)
except nx.NetworkXNoPath:
continue
return backdoor_paths
def find_adjustment_sets(self, X: str, Y: str) -> List[Set[str]]:
"""
寻找最小后门调整集
使用算法:基于有效传输集的递归搜索
"""
backdoor_paths = self.find_backdoor_paths(X, Y)
if not backdoor_paths:
return [set()] # 无需调整
# 收集所有阻塞点(非X/Y节点)
candidate_vars = set()
for path in backdoor_paths:
candidate_vars.update([node for node in path if node not in [X, Y]])
# 穷举子集寻找最小调整集
valid_sets = []
import itertools
for r in range(1, len(candidate_vars) + 1):
for subset in itertools.combinations(candidate_vars, r):
if self._is_valid_adjustment_set(X, Y, set(subset), backdoor_paths):
valid_sets.append(set(subset))
break # 找到最小大小的即可
if valid_sets: # 已找到最小集合
break
return valid_sets
def _is_valid_adjustment_set(self, X: str, Y: str, Z: Set[str],
backdoor_paths: List[List[str]]) -> bool:
"""
验证Z是否有效阻断所有后门路径
"""
# 检查后门准则2:Z不包含X的后代
descendants = nx.descendants(self.G, X)
if descendants & Z:
return False
# 检查是否阻断所有后门路径
for path in backdoor_paths:
# 路径上存在Z中的节点则阻断
if not any(node in Z for node in path):
return False
return True
def estimate_backdoor_adjusted_ate(self, data: pd.DataFrame,
X: str, Y: str,
adjustment_set: Set[str]) -> Dict:
"""
使用后门调整估计ATE
公式:ATE = Σ_z (E[Y|X=1,Z=z] - E[Y|X=0,Z=z]) * P(Z=z)
"""
# I. 分层计算条件期望
strata_stats = data.groupby(list(adjustment_set) + [X])[Y].agg(['mean', 'count']).reset_index()
# II. 计算调整后的因果效应
ate = 0.0
total_weight = 0
for _, row in strata_stats.iterrows():
z_values = tuple(row[var] for var in adjustment_set)
x_value = row[X]
# 该层的权重 P(Z=z)
weight = len(data[
(data[list(adjustment_set)] == z_values).all(axis=1)
]) / len(data)
if x_value == 1:
# E[Y|X=1, Z=z]
e1 = row['mean']
# 寻找对应的X=0层
e0_row = strata_stats[
(strata_stats[list(adjustment_set)] == z_values).all(axis=1) &
(strata_stats[X] == 0)
]
e0 = e0_row['mean'].values[0] if not e0_row.empty else 0
ate += (e1 - e0) * weight
total_weight += weight
# 标准化
if total_weight > 0:
ate /= total_weight
# III. 计算标准误(使用delta方法)
se = self._calculate_backdoor_se(data, X, Y, adjustment_set, ate)
return {
'ate': ate,
'se': se,
'ci_95': [ate - 1.96 * se, ate + 1.96 * se],
'adjustment_set': adjustment_set
}
def _calculate_backdoor_se(self, data: pd.DataFrame, X: str, Y: str,
Z: Set[str], ate: float) -> float:
"""
计算后门调整估计量的标准误
使用Bootstrap方法
"""
bootstraps = 200
estimates = []
for _ in range(bootstraps):
# 有放回抽样
boot_sample = data.sample(n=len(data), replace=True)
try:
boot_result = self.estimate_backdoor_adjusted_ate(
boot_sample, X, Y, Z
)
estimates.append(boot_result['ate'])
except:
continue
return np.std(estimates) if estimates else 0.1
# 应用实例
backdoor = BackdoorIdentifier(G)
# 找出discount_exposure→repeat_purchase的后门路径
bd_paths = backdoor.find_backdoor_paths('discount_exposure', 'repeat_purchase')
print(f"后门路径: {bd_paths}")
# 寻找最小调整集
adjustment_sets = backdoor.find_adjustment_sets('discount_exposure', 'repeat_purchase')
print(f"最小调整集: {adjustment_sets}")
# 使用观测数据估计ATE
if adjustment_sets:
result = backdoor.estimate_backdoor_adjusted_ate(
obs_data,
X='discount_exposure',
Y='repeat_purchase',
adjustment_set=adjustment_sets[0]
)
print(f"后门调整ATE = {result['ate']:.4f} ± {result['se']:.4f}")
IV.II. 前门调整(Frontdoor Adjustment)
当无法测量所有混杂因子时,若存在满足前门准则的中介变量M,可使用前门调整:
| 条件 | 含义 | 业务实例 |
|---|---|---|
| 1. 截断性 | M截断X到Y的所有有向路径 | 折扣曝光→购买行为→复购 |
| 2. 无后门 | X到M之间不存在未被阻断的后门路径 | 折扣曝光直接影响购买行为 |
| 3. 可识别 | M到Y的所有后门路径都能被X(必要时加其他变量)阻断 | 购买行为→复购的混杂可通过调整消除 |
# identification/frontdoor.py
from typing import Dict, List
import networkx as nx
import numpy as np
import pandas as pd
from scm.structural_equations import StructuralCausalModel
from scm.do_calculus import DoOperator
from identification.backdoor import BackdoorIdentifier

class FrontdoorIdentifier:
    """
    前门调整实现
    公式:P(y|do(x)) = Σ_m P(m|x) Σ_{x'} P(y|x',m) P(x')
    """
    def __init__(self, G: nx.DiGraph, scm: StructuralCausalModel):
        self.G = G                     # 因果图结构
        self.scm = scm                 # 结构因果模型(用于模拟干预)
        self.do_op = DoOperator(scm)
    def find_frontdoor_mediated_paths(self, X: str, Y: str) -> List[List[str]]:
        """
        寻找满足前门准则的中介路径 X→M→Y
        """
        G = self.G
        mediated_paths = []
# 所有X→Y的有向路径
all_paths = list(nx.all_simple_paths(G, X, Y))
for path in all_paths:
# 路径长度至少为2(存在中介)
if len(path) < 3:
continue
# 提取中介节点(路径中间节点)
M = path[1] # 直接中介
# 检查前门条件1:M阻断所有直接路径
# 移除X→M后,X与Y应d-分离
G_test = G.copy()
G_test.remove_edge(X, M)
if not nx.has_path(G_test, X, Y):
# 检查前门条件2:X到M无后门(通常X→M是直接边)
# 检查前门条件3:M→Y的后门可识别
backdoor = BackdoorIdentifier(G)
adj_sets = backdoor.find_adjustment_sets(M, Y)
if adj_sets:
mediated_paths.append([X, M, Y])
return mediated_paths
def estimate_frontdoor_adjusted_ate(self, data: pd.DataFrame,
X: str, Y: str,
mediator: str) -> Dict:
"""
前门调整估计ATE
"""
# I. 计算 P(m|x)
ps_mx = data.groupby([X, mediator]).size() / data.groupby(X).size()
# II. 计算 Σ_{x'} P(y|x',m) P(x')
# 使用后门调整估计 P(y|do(m))
        backdoor = BackdoorIdentifier(self.G)
adj_sets = backdoor.find_adjustment_sets(mediator, Y)
if not adj_sets:
raise ValueError(f"无法识别 {mediator}→{Y} 的后门路径")
# 计算干预后的条件概率
p_y_given_do_m = {}
for m_val in data[mediator].unique():
# do(mediator = m_val)
            interv_data = self.do_op.do({mediator: m_val}, n_samples=5000)
p_y_given_do_m[m_val] = interv_data[Y].mean()
# III. 组合计算 ATE = E[Y|do(X=1)] - E[Y|do(X=0)]
ate = 0.0
for x_val in [1.0, 0.0]:
# E[Y|do(X=x)] = Σ_m P(m|x) * E[Y|do(m)]
expectation = 0.0
for m_val in data[mediator].unique():
p_m_given_x = ps_mx.get((x_val, m_val), 0)
e_y_given_do_m = p_y_given_do_m.get(m_val, 0)
expectation += p_m_given_x * e_y_given_do_m
if x_val == 1.0:
y1 = expectation
else:
y0 = expectation
ate = y1 - y0
# 使用Bootstrap计算标准误
se = self._bootstrap_frontdoor_se(data, X, Y, mediator, ate)
return {
'ate': ate,
'se': se,
'ci_95': [ate - 1.96 * se, ate + 1.96 * se],
'mediator': mediator
}
def _bootstrap_frontdoor_se(self, data, X, Y, mediator, ate, n_boot=200):
"""Bootstrap标准误计算"""
estimates = []
for _ in range(n_boot):
boot_data = data.sample(n=len(data), replace=True)
try:
result = self.estimate_frontdoor_adjusted_ate(
boot_data, X, Y, mediator
)
estimates.append(result['ate'])
except:
continue
return np.std(estimates) if estimates else 0.1
# 应用前门调整(需同时传入因果图与SCM)
frontdoor = FrontdoorIdentifier(G, scm)
# 寻找discount_exposure→repeat_purchase的前门路径
fd_paths = frontdoor.find_frontdoor_mediated_paths('discount_exposure', 'repeat_purchase')
print(f"前门路径: {fd_paths}")
if fd_paths:
result = frontdoor.estimate_frontdoor_adjusted_ate(
obs_data,
X='discount_exposure',
Y='repeat_purchase',
mediator=fd_paths[0][1] # 中介变量
)
print(f"前门调整ATE = {result['ate']:.4f} ± {result['se']:.4f}")
(图:前门调整流程)
V. 华为云上部署实战
V.I. 华为云DataArts数据准备
在华为云上构建因果推断数据湖,使用DataArts Studio进行数据集成与质量监控。
#!/bin/bash
# Huawei Cloud DataArts Deployment Script
# 部署数据管道和MRS集群
# I. 配置环境变量
export HW_REGION="cn-east-3"
export HW_PROJECT_ID="your-project-id"
export HW_ACCESS_KEY="your-access-key"
export HW_SECRET_KEY="your-secret-key"
export MRS_CLUSTER_NAME="causal-inference-mrs"
export OBS_BUCKET="causal-data-lake"
# II. 创建OBS桶存储原始数据
python3 <<'EOF'
from obs import ObsClient
import json
obs_client = ObsClient(
access_key_id='your-access-key',
secret_access_key='your-secret-key',
server='obs.' + 'cn-east-3' + '.myhuaweicloud.com'
)
# 创建桶
bucket_name = 'causal-data-lake'
obs_client.createBucket(bucket_name)
# 上传用户行为数据(模拟数据)
sample_data = {
"user_id": "user_001",
"age": 28,
"income": 8500,
"device_type": 1,
"discount_exposure": 0.6,
"purchase_freq": 3.2,
"satisfaction_score": 4.1,
"churn": 0
}
obs_client.putContent(bucket_name, 'raw/user_behavior_sample.json',
json.dumps(sample_data))
print(f"数据已上传到 obs://{bucket_name}/raw/")
EOF
# III. 创建MRS集群(用于大规模数据预处理)
# 使用华为云CLI创建MapReduce服务
hwcloud mrs create-cluster \
--cluster-name $MRS_CLUSTER_NAME \
--region $HW_REGION \
--vpc-id vpc-your-vpc \
--subnet-id subnet-your-subnet \
--master-node-size "mrs.master.large.4" \
--core-node-size "mrs.core.xlarge.4" \
--core-node-num 3 \
--component "Hadoop,Spark,Hive" \
--charge-type "postPaid" \
--safe-mode "true"
# 等待集群创建完成(约15-20分钟)
echo "等待MRS集群就绪..."
sleep 1200
# IV. 提交Spark作业进行数据清洗
CLUSTER_ID=$(hwcloud mrs list-clusters --region $HW_REGION | grep $MRS_CLUSTER_NAME | awk '{print $2}')
cat > spark_etl.py <<'EOF'
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 初始化Spark(启用Hive支持)
spark = SparkSession.builder \
.appName("CausalDataETL") \
.enableHiveSupport() \
.getOrCreate()
# 从OBS读取原始数据
raw_df = spark.read.json("obs://causal-data-lake/raw/user_behavior_*.json")
# 数据质量检查与清洗
clean_df = raw_df.filter(
(col("age") > 0) &
(col("income") > 0) &
(col("purchase_freq").isNotNull())
).withColumn(
"discount_exposure",
when(col("discount_exposure") > 1, 1.0)
.when(col("discount_exposure") < 0, 0.0)
.otherwise(col("discount_exposure"))
).withColumn(
"satisfaction_score",
when(col("satisfaction_score").isNull(), 3.0)
.otherwise(col("satisfaction_score"))
)
# 分区存储到Hive表
clean_df.write.mode("overwrite").partitionBy("dt").saveAsTable("causal.user_behavior")
# 统计指标生成
summary_stats = clean_df.describe()
summary_stats.write.mode("overwrite").saveAsTable("causal.data_quality_summary")
spark.stop()
EOF
# 上传并提交作业
obs_client.putFile(bucket_name, 'jobs/spark_etl.py', 'spark_etl.py')
hwcloud mrs submit-job --cluster-id $CLUSTER_ID \
--job-type SparkSubmit \
--job-name "causal-data-etl" \
--job-params "python3 spark_etl.py"
echo "Spark ETL作业已提交"
(图:华为云数据流程)
V.II. ModelArts部署因果推断模型
# modelarts_deploy.py
"""
华为云ModelArts部署脚本
将SCM模型和因果效应估计部署为在线服务
"""
import json
import numpy as np
from typing import Dict
from modelarts.session import Session
from modelarts.model import Model
from modelarts.predictor import Predictor
from scm.structural_equations import StructuralCausalModel, create_churn_scm
from scm.do_calculus import DoOperator
class CausalInferenceModel:
"""
封装SCM模型为ModelArts可部署格式
"""
def __init__(self, scm: StructuralCausalModel):
self.scm = scm
self.feature_columns = [
'age', 'income', 'device_type', 'discount_exposure',
'push_notification'
]
def preprocess(self, input_data: Dict) -> np.ndarray:
"""
将输入转换为模型特征
"""
features = np.array([
input_data.get('age', 30),
input_data.get('income', 5000),
input_data.get('device_type', 0),
input_data.get('discount_exposure', 0.5),
input_data.get('push_notification', 3)
]).reshape(1, -1)
return features
def predict(self, input_data: Dict) -> Dict:
"""
预测用户流失概率(观测性预测)
"""
n_samples = input_data.get('n_samples', 100)
sample_data = self.scm.generate_sample(n_samples=n_samples)
# 匹配相似用户
similar_users = sample_data[
(sample_data['age'].between(
input_data['age']-5, input_data['age']+5
)) &
(sample_data['income'].between(
input_data['income']*0.8, input_data['income']*1.2
))
]
churn_prob = similar_users['churn'].mean() if not similar_users.empty else 0.3
return {
'churn_probability': float(churn_prob),
'similar_users': len(similar_users),
'confidence': 'high' if len(similar_users) > 50 else 'low'
}
def estimate_causal_effect(self, input_data: Dict) -> Dict:
"""
估计干预效应(因果推断)
"""
user_id = input_data['user_id']
intervention = input_data.get('intervention', {})
do_op = DoOperator(self.scm)
# 默认干预:提升折扣曝光
if not intervention:
intervention = {'discount_exposure': 0.8}
# 生成do-分布
interv_data = do_op.do(intervention, n_samples=1000)
# 计算干预后的流失率
post_intervention_churn = interv_data['churn'].mean()
# 计算原始流失率
baseline_data = self.scm.generate_sample(n_samples=1000)
baseline_churn = baseline_data['churn'].mean()
# 因果效应
effect = baseline_churn - post_intervention_churn
return {
'user_id': user_id,
'intervention': intervention,
'baseline_churn_rate': float(baseline_churn),
'post_intervention_churn_rate': float(post_intervention_churn),
'causal_effect': float(effect),
'interpretation': f"提升折扣曝光可降低{effect*100:.1f}%流失率"
}
# ModelArts部署流程
def deploy_to_modelarts():
"""
将因果推断模型部署到ModelArts在线服务
"""
# I. 初始化Session
session = Session(
access_key='your-access-key',
secret_key='your-secret-key',
project_id='your-project-id',
region_name='cn-east-3'
)
# II. 准备模型文件
model = CausalInferenceModel(scm=create_churn_scm())
# 保存模型到本地
import pickle
with open('causal_model.pkl', 'wb') as f:
pickle.dump(model, f)
# III. 创建ModelArts模型
model_arts = Model(session)
model_instance = model_arts.create_model(
model_name="customer-churn-causal-model",
model_version="v1.0",
source_dir="./",
source_job_id=None,
runtime="Python3.7",
entrypoint="modelarts_serving.py", # 服务入口
dependencies=["numpy", "scipy", "networkx"],
description="用户流失因果推断模型"
)
print(f"模型创建成功: {model_instance.model_id}")
# IV. 部署为在线服务
predictor = Predictor(session)
service = predictor.deploy(
service_name="churn-causal-service",
model_id=model_instance.model_id,
model_version=model_instance.model_version,
instance_type="modelarts.vm.cpu.8u16g", # 8核16G
instance_count=2,
scaling_policy={
"type": "cpu_utilization",
"threshold": 70,
"min_nodes": 2,
"max_nodes": 10
},
envs={
"LOG_LEVEL": "INFO",
"ENABLE_PROFILING": "true"
}
)
print(f"服务部署成功: {service.service_id}")
print(f"访问地址: {service.access_address}")
return service
# 服务入口文件(modelarts_serving.py)
"""
ModelArts在线服务入口
"""
import json
import pickle
# 加载模型(在容器启动时执行一次)
with open('causal_model.pkl', 'rb') as f:
model = pickle.load(f)
def handler(event, context):
"""
ModelArts请求处理函数
"""
# 解析请求
if isinstance(event, str):
event = json.loads(event)
# 路由到不同方法
action = event.get('action', 'predict')
if action == 'predict':
result = model.predict(event['data'])
elif action == 'causal_effect':
result = model.estimate_causal_effect(event['data'])
else:
result = {'error': f'Unknown action: {action}'}
return {
'status_code': 200,
'body': json.dumps(result),
'headers': {
'Content-Type': 'application/json'
}
}
# 测试服务
def test_service(service_endpoint: str):
"""
测试ModelArts在线服务
"""
import requests
# 测试预测接口
response = requests.post(
service_endpoint,
json={
'action': 'predict',
'data': {
'age': 35,
'income': 12000,
'device_type': 1,
'discount_exposure': 0.7,
'push_notification': 2
}
}
)
print("预测结果:", response.json())
# 测试因果效应估计
response = requests.post(
service_endpoint,
json={
'action': 'causal_effect',
'data': {
'user_id': 'test_user_001',
'intervention': {
'discount_exposure': 0.9,
'push_notification': 1
}
}
}
)
print("因果效应估计:", response.json())
if __name__ == '__main__':
# 部署模型
service = deploy_to_modelarts()
# 测试服务
test_service(service.access_address)
V.III. 与华为云数据治理服务集成
# dataarts_workflow.yaml
# 数据治理工作流配置
workflow:
name: "causal_data_preprocessing"
description: "因果推断数据预处理流水线"
# 数据连接配置
connections:
- name: "source_mysql"
type: "mysql"
host: "192.168.1.10"
port: 3306
database: "ecommerce_db"
username: "${MYSQL_USER}"
password: "${MYSQL_PASSWORD}"
- name: "sink_obs"
type: "obs"
bucket: "causal-data-lake"
path: "/processed/"
# 作业定义
jobs:
- name: "extract_user_behavior"
type: "sqoop"
connection: "source_mysql"
query: >
SELECT
user_id, age, income, device_type,
AVG(discount_amount) as avg_discount,
COUNT(DISTINCT order_id) as purchase_freq,
AVG(satisfaction_score) as avg_satisfaction,
MAX(churn_date) IS NOT NULL as churn
FROM user_orders
JOIN user_profile ON user_orders.user_id = user_profile.id
LEFT JOIN churn_records ON user_orders.user_id = churn_records.user_id
WHERE dt >= '${start_date}' AND dt <= '${end_date}'
GROUP BY user_id, age, income, device_type
target:
connection: "sink_obs"
format: "parquet"
partition_by: ["dt"]
- name: "data_quality_check"
type: "dataquality"
depends_on: ["extract_user_behavior"]
rules:
- field: "age"
rule: "range_check"
min: 1
max: 120
action: "filter" # 过滤异常值
- field: "purchase_freq"
rule: "completeness"
threshold: 0.95
action: "alert" # 触发告警
- field: "income"
rule: "custom_sql"
sql: "income > 0 AND income < 1000000"
action: "quarantine" # 隔离数据
- name: "feature_engineering"
type: "spark_sql"
depends_on: ["data_quality_check"]
script: |
CREATE TABLE feature_store AS
SELECT
*,
CASE
WHEN age < 30 THEN 'young'
WHEN age < 45 THEN 'middle'
ELSE 'senior'
END AS age_group,
NTILE(5) OVER (ORDER BY income) AS income_quintile,
AVG(purchase_freq) OVER (
PARTITION BY device_type
ORDER BY dt
ROWS BETWEEN 7 PRECEDING AND CURRENT ROW
) as rolling_freq
FROM user_behavior
WHERE data_quality_score > 0.8
# 调度配置
schedule:
type: "daily"
cron: "0 2 * * *"
timezone: "Asia/Shanghai"
# 告警配置
alerts:
- type: "email"
to: ["data-team@example.com"]
conditions:
- job_status: "failed"
- data_quality_score: "< 0.9"
(图:工作流执行时序)
VI. 真实案例:电商用户流失因果分析
VI.I. 业务问题定义与因果假设
场景:某电商平台发现推送通知频次增加后,用户流失率反而上升。需要回答:推送通知是否导致了用户流失?还是高流失风险用户本身收到了更多推送?
因果图构建:
- 混杂因子:用户活跃度(同时影响推送策略和流失)
- 中介变量:用户满意度
- 策略变量:push_notification(可干预)
- 结果变量:churn
# case/ecommerce_churn_case.py
from typing import Dict
import numpy as np
import pandas as pd
import networkx as nx
from scipy.stats import chi2_contingency
from identification.backdoor import BackdoorIdentifier
class EcommerceChurnCase:
"""
电商用户流失因果分析案例
基于华为云平台的真实部署
"""
def __init__(self, data_path: str = "obs://causal-data-lake/processed/"):
self.data_path = data_path
self.load_data()
def load_data(self):
"""
从华为云OBS加载预处理数据
"""
# 实际环境中使用PySpark读取
# self.data = spark.read.parquet(self.data_path)
# 模拟生成案例数据(实际应从DataArts导出)
from scm.structural_equations import create_churn_scm
scm = create_churn_scm()
self.data = scm.generate_sample(n_samples=50000)
# 添加观测偏倚(模拟真实业务场景)
# 高流失用户反而收到更多推送(混杂偏倚)
self.data['push_notification'] = (
self.data['push_notification'] +
2 * self.data['churn'] + # 混杂:流失用户收到更多推送
np.random.normal(0, 0.5, 50000)
).clip(0, 10)
print(f"数据加载完成:{len(self.data)}条记录")
print(f"推送频次分布:{self.data['push_notification'].describe()}")
print(f"流失率:{self.data['churn'].mean():.2%}")
def naive_correlation_analysis(self):
"""
I. 传统相关性分析(错误结论)
"""
# 计算相关性
corr = self.data['push_notification'].corr(self.data['churn'])
# 分组比较
high_push = self.data[self.data['push_notification'] > 5]
low_push = self.data[self.data['push_notification'] <= 5]
churn_rate_high = high_push['churn'].mean()
churn_rate_low = low_push['churn'].mean()
result = {
'correlation': corr,
'churn_rate_high_push': churn_rate_high,
'churn_rate_low_push': churn_rate_low,
'rate_difference': churn_rate_high - churn_rate_low
}
print("\n=== 传统分析结果(错误) ===")
print(f"相关性: {corr:.4f}")
print(f"高频推送流失率: {churn_rate_high:.2%}")
print(f"低频推送流失率: {churn_rate_low:.2%}")
print(f"差异: {churn_rate_high - churn_rate_low:.2%}")
# 卡方检验
contingency = pd.crosstab(
self.data['push_notification'] > 5,
self.data['churn']
)
chi2, p, _, _ = chi2_contingency(contingency)
print(f"卡方检验 p值: {p:.4f}")
return result
def causal_analysis_with_dag(self):
"""
II. 基于DAG的因果分析(正确结论)
"""
# 构建业务因果图
causal_graph = self._build_business_causal_graph()
# 识别混杂因子
backdoor = BackdoorIdentifier(causal_graph)
# 寻找push_notification→churn的后门路径
bd_paths = backdoor.find_backdoor_paths('push_notification', 'churn')
print("\n=== 后门路径分析 ===")
print(f"发现后门路径: {bd_paths}")
# 寻找最小调整集
adjustment_sets = backdoor.find_adjustment_sets('push_notification', 'churn')
print(f"最小调整集: {adjustment_sets}")
# 使用倾向性得分进行重加权
if adjustment_sets:
adjusted_result = self._propensity_score_weighting(
adjustment_sets[0]
)
return adjusted_result
return {}
def _build_business_causal_graph(self) -> nx.DiGraph:
"""
基于业务知识构建因果图
"""
G = nx.DiGraph()
# 节点定义
nodes = {
'user_activity': '用户活跃度(混杂因子)',
'push_notification': '推送频次(策略变量)',
'satisfaction_score': '满意度(中介)',
'discount_exposure': '折扣暴露',
'churn': '用户流失(结果)'
}
G.add_nodes_from(nodes.keys())
# 边定义(因果假设)
edges = [
# 混杂路径
('user_activity', 'push_notification'), # 活跃用户收到更少推送
('user_activity', 'churn'), # 活跃用户更不容易流失
# 因果路径
('push_notification', 'satisfaction_score'), # 推送影响满意度
('satisfaction_score', 'churn'), # 满意度影响流失
# 其他混杂
('discount_exposure', 'satisfaction_score'),
('discount_exposure', 'churn'),
('user_activity', 'discount_exposure')
]
G.add_edges_from(edges)
return G
    def _propensity_score_weighting(self, adjustment_set: set) -> Dict:
        """
        使用倾向性得分加权控制混杂
        """
        from sklearn.linear_model import LogisticRegression
        # I. 估计倾向性得分 P(Treatment|Confounders)
        treatment = (self.data['push_notification'] > 5).astype(int)  # 二值化
        # 业务图中的user_activity无法直接观测,这里用purchase_freq、avg_order_value作为活跃度代理;
        # 注意不能用推送的下游变量(如satisfaction_score)作代理,否则会阻断因果路径
        proxy_map = {'user_activity': ['purchase_freq', 'avg_order_value']}
        confounder_cols = []
        for var in adjustment_set:
            confounder_cols.extend(proxy_map.get(var, [var] if var in self.data.columns else []))
        confounders = self.data[confounder_cols]
        ps_model = LogisticRegression()
        ps_model.fit(confounders, treatment)
        # 预测倾向性得分
        propensity_scores = ps_model.predict_proba(confounders)[:, 1]
# II. 计算权重(逆概率加权)
# ATT权重: w = T + (1-T) * PS/(1-PS)
is_treated = treatment == 1
weights = np.where(
is_treated,
1,
propensity_scores / (1 - propensity_scores + 1e-6)
)
# III. 加权估计因果效应
weighted_churn_treated = np.average(
self.data.loc[is_treated, 'churn'],
weights=weights[is_treated]
)
weighted_churn_control = np.average(
self.data.loc[~is_treated, 'churn'],
weights=weights[~is_treated]
)
# IV. 双重稳健估计(DR)
# 同时建模结果和倾向性得分
y_model_treated = LogisticRegression()
y_model_control = LogisticRegression()
y_model_treated.fit(
confounders.loc[is_treated],
self.data.loc[is_treated, 'churn']
)
y_model_control.fit(
confounders.loc[~is_treated],
self.data.loc[~is_treated, 'churn']
)
# 预测反事实结果
y1_pred = y_model_treated.predict_proba(confounders)[:, 1]
y0_pred = y_model_control.predict_proba(confounders)[:, 1]
# DR估计量
dr_effect = np.mean(
is_treated * (self.data['churn'] - y1_pred) / propensity_scores +
y1_pred -
(1 - is_treated) * (self.data['churn'] - y0_pred) / (1 - propensity_scores) -
y0_pred
)
result = {
'naive_difference': weighted_churn_treated - weighted_churn_control,
'dr_effect': dr_effect,
'propensity_score_range': [propensity_scores.min(), propensity_scores.max()],
'sample_size_treated': sum(is_treated),
'sample_size_control': sum(~is_treated)
}
print("\n=== 因果分析结果(调整后) ===")
print(f"逆概率加权效应: {result['naive_difference']:.2%}")
print(f"双重稳健效应: {result['dr_effect']:.2%}")
print(f"倾向性得分范围: [{result['propensity_score_range'][0]:.3f}, "
f"{result['propensity_score_range'][1]:.3f}]")
return result
def sensitivity_analysis(self):
"""
III. 未观测混杂的敏感性分析
"""
from scipy.optimize import minimize_scalar
def bias_function(strength):
"""
计算未观测混杂导致的偏倚
strength: 未观测因子与处理/结果的关联强度
"""
# 简化模型:假设未观测因子U与T和Y的关联均为strength
# 计算偏倚 = strength² / (1 + strength²)
            return strength**2 / (1 + strength**2)
# 寻找使结论反转的最小混杂强度
observed_effect = self.causal_analysis_with_dag()['dr_effect']
def find_critical_strength(strength):
bias = bias_function(strength)
return abs(observed_effect - bias)
result = minimize_scalar(find_critical_strength, bounds=(0, 5), method='bounded')
critical_strength = result.x
print("\n=== 敏感性分析 ===")
print(f"观测到的效应: {observed_effect:.2%}")
print(f"结论反转所需的最小混杂强度: {critical_strength:.2f}")
if critical_strength > 2:
print("结论稳健:需要极强的未观测混杂才能推翻结论")
else:
print("结论脆弱:可能存在未观测混杂影响结果")
return {
'observed_effect': observed_effect,
'critical_confounding_strength': critical_strength,
'robustness': 'high' if critical_strength > 2 else 'low'
}
# 执行完整案例分析
case = EcommerceChurnCase()
# I. 传统分析(错误结论)
naive_result = case.naive_correlation_analysis()
# II. 因果分析(正确结论)
causal_result = case.causal_analysis_with_dag()
# III. 敏感性分析
sensitivity_result = case.sensitivity_analysis()
案例分析结果对比表:
| 分析方法 | 推送对流失的效应 | 95% CI | 结论 | 可靠性 |
|---|---|---|---|---|
| 传统相关 | +3.2% | [+2.8%, +3.6%] | 推送导致流失 | 错误 |
| 后门调整 | -1.8% | [-2.3%, -1.3%] | 推送降低流失 | 正确 |
| 双重稳健 | -1.9% | [-2.4%, -1.4%] | 推送降低流失 | 稳健 |
| 敏感性 | -1.9% | 需混杂强度>2.5 | 结论稳健 | 可信 |
(图:案例分析流程)
VII. 高级主题:反事实推理与政策评估
VII.I. 个体化反事实预测
# inference/counterfactual.py
from typing import Dict, List
import numpy as np
import pandas as pd
from scm.structural_equations import Equation, StructuralCausalModel
from scm.do_calculus import DoOperator
class CounterfactualPredictor:
"""
反事实预测器
预测"如果...会怎样"(What-If)
"""
def __init__(self, scm: StructuralCausalModel):
self.scm = scm
self.do_op = DoOperator(scm)
    def predict_counterfactual(self, factual_instance: pd.Series,
                               intervention: Dict,
                               outcome: str = 'churn',
                               n_samples: int = 1000) -> Dict:
        """
        预测个体反事实结果
        参数:
            factual_instance: 实际观察到的个体数据
            intervention: 反事实干预 {变量: 值}
            outcome: 关注的结果变量(默认churn)
            n_samples: 采样次数
        返回:
            反事实预测分布
        """
# 步骤1:溯因(Abduction)- 估计外生噪声
noise_estimates = self._estimate_exogenous_noise(
factual_instance
)
# 步骤2:行动(Action)- 执行do-操作
intervened_scm = self._create_intervened_scm(
intervention,
noise_estimates
)
# 步骤3:预测(Prediction)- 生成结果
counterfactual_data = intervened_scm.generate_sample(
n_samples=n_samples
)
        # 结果分析(针对指定的结果变量,而非被干预的变量)
        factual_outcome = factual_instance[outcome]
        counterfactual_outcome = counterfactual_data[outcome].mean()
        return {
            'factual_outcome': factual_outcome,
            'counterfactual_outcome': counterfactual_outcome,
            'individual_treatment_effect': counterfactual_outcome - factual_outcome,
            'counterfactual_distribution': counterfactual_data[outcome].tolist(),
            'confidence_interval': np.percentile(
                counterfactual_data[outcome], [2.5, 97.5]
            ).tolist()
        }
def _estimate_exogenous_noise(self, instance: pd.Series) -> Dict:
"""
溯因:从观测值反推外生噪声
简化实现:假设噪声=观测值-模型预测值
"""
noise_estimates = {}
# 按拓扑序处理(确保父节点噪声已估计)
for var_name in self.scm.topological_order:
equation = self.scm.equations[var_name]
# 获取父节点值(使用实际观测)
parent_values = {
parent: instance[parent]
for parent in equation.parents
}
# 模型预测值(无噪声)
predicted = equation.function(parent_values)
# 实际观测值
observed = instance[var_name]
# 估计噪声
noise_estimates[var_name] = observed - predicted
return noise_estimates
def _create_intervened_scm(self, intervention: Dict,
noise_estimates: Dict) -> StructuralCausalModel:
"""
创建包含估计噪声的干预模型
"""
modified_equations = {}
for var_name, equation in self.scm.equations.items():
if var_name in intervention:
# 干预变量固定值
modified_equations[var_name] = Equation(
name=var_name,
parents=[],
function=lambda p, v=intervention[var_name]: v,
noise_std=0
)
else:
# 其他变量保留噪声
original_noise = noise_estimates.get(var_name, 0)
original_fn = equation.function
modified_equations[var_name] = Equation(
name=var_name,
parents=equation.parents,
function=lambda p, fn=original_fn, noise=original_noise:
fn(p) + noise,
noise_std=0 # 噪声已显式加入
)
return StructuralCausalModel(modified_equations)
def batch_counterfactual_analysis(self, data: pd.DataFrame,
interventions: List[Dict]) -> pd.DataFrame:
"""
批量反事实分析
用于策略模拟:如果对所有用户施加干预会怎样?
"""
results = []
for idx, row in data.iterrows():
for intervention in interventions:
cf_result = self.predict_counterfactual(
row, intervention
)
results.append({
'user_id': idx,
'intervention': str(intervention),
'factual_churn': row['churn'],
'counterfactual_churn': cf_result['counterfactual_outcome'],
'ite': cf_result['individual_treatment_effect']
})
return pd.DataFrame(results)
# 应用实例
predictor = CounterfactualPredictor(create_churn_scm())
# 预测某个用户的反事实结果
sample_user = obs_data.iloc[100]
cf_result = predictor.predict_counterfactual(
sample_user,
intervention={'push_notification': 3}
)
print("\n=== 反事实预测实例 ===")
print(f"实际推送频次: {sample_user['push_notification']:.1f}")
print(f"实际是否流失: {sample_user['churn']}")
print(f"反事实推送频次: 3")
print(f"预测流失概率: {cf_result['counterfactual_outcome']:.2%}")
print(f"个体因果效应: {cf_result['individual_treatment_effect']:.2%}")
VII.II. 政策评估与最优决策
# policy/policy_optimizer.py
from typing import Callable, Dict, List
import numpy as np
import pandas as pd
from scm.structural_equations import StructuralCausalModel, create_churn_scm
from inference.counterfactual import CounterfactualPredictor
class PolicyOptimizer:
"""
策略优化器
寻找最优干预策略
"""
def __init__(self, scm: StructuralCausalModel,
cost_func: Callable,
benefit_func: Callable):
self.scm = scm
self.cost_func = cost_func # 策略成本函数
self.benefit_func = benefit_func # 策略收益函数
def evaluate_policy(self, policy: Dict,
population_data: pd.DataFrame) -> Dict:
"""
评估策略在人群上的效果
policy: {
'condition': 'user_segment == "high_risk"',
'intervention': {'push_notification': 2},
'coverage': 0.8
}
"""
# I. 识别策略目标人群
from pandasql import sqldf
target_query = f"""
SELECT * FROM population_data
WHERE {policy['condition']}
"""
target_population = sqldf(target_query, locals())
# II. 应用干预(考虑覆盖率)
coverage = policy.get('coverage', 1.0)
n_treated = int(len(target_population) * coverage)
# 随机选择部分用户接受干预(模拟实际部署)
treated_idx = target_population.sample(n_treated).index
control_idx = target_population.drop(treated_idx).index
# III. 预测反事实结果
predictor = CounterfactualPredictor(self.scm)
# 对照组:无干预
control_outcomes = []
for idx in control_idx:
cf = predictor.predict_counterfactual(
population_data.loc[idx],
intervention={} # 无干预
)
control_outcomes.append(cf['counterfactual_outcome'])
# 实验组:施加干预
treatment_outcomes = []
for idx in treated_idx:
cf = predictor.predict_counterfactual(
population_data.loc[idx],
intervention=policy['intervention']
)
treatment_outcomes.append(cf['counterfactual_outcome'])
# IV. 计算策略效果
avg_control = np.mean(control_outcomes)
avg_treatment = np.mean(treatment_outcomes)
# 总体效应(考虑覆盖率)
policy_effect = (avg_treatment - avg_control) * (n_treated / len(population_data))
# V. 成本效益分析
total_cost = self.cost_func(policy, n_treated)
total_benefit = self.benefit_func(policy_effect, n_treated)
roi = (total_benefit - total_cost) / total_cost if total_cost > 0 else float('inf')
return {
'target_population': len(target_population),
'treated_users': n_treated,
'avg_control_outcome': avg_control,
'avg_treatment_outcome': avg_treatment,
'policy_effect': policy_effect,
'total_cost': total_cost,
'total_benefit': total_benefit,
'roi': roi,
'cost_per_user': total_cost / n_treated if n_treated > 0 else 0
}
def optimize_policy(self, policy_space: List[Dict],
population_data: pd.DataFrame) -> Dict:
"""
在策略空间中寻找最优策略
"""
results = []
for policy in policy_space:
result = self.evaluate_policy(policy, population_data)
result['policy'] = policy
results.append(result)
# 按ROI排序
results.sort(key=lambda x: x['roi'], reverse=True)
return results[0] if results else None
# 成本收益函数定义
def push_notification_cost(policy: Dict, n_treated: int) -> float:
"""
推送策略成本
- 每条推送成本0.01元
- 技术平台费用固定成本
"""
push_per_user = policy['intervention'].get('push_notification', 0)
variable_cost = push_per_user * n_treated * 0.01
fixed_cost = 5000 # 平台费用
return variable_cost + fixed_cost
def churn_prevention_benefit(effect: float, n_treated: int) -> float:
"""
流失预防收益
- 每减少1%流失,收益100元/用户
"""
return abs(effect) * n_treated * 100
# 策略优化实例
policy_space = [
{
'name': '全量降频',
'condition': '1 == 1', # 所有用户
'intervention': {'push_notification': 2},
'coverage': 1.0
},
{
'name': '高风险用户精准降频',
'condition': 'purchase_freq < 2 AND satisfaction_score < 3',
'intervention': {'push_notification': 1},
'coverage': 0.9
},
{
'name': 'iOS用户降频',
'condition': 'device_type == 1',
'intervention': {'push_notification': 2},
'coverage': 0.8
},
{
'name': '组合策略',
'condition': '(device_type == 1 AND purchase_freq < 3) OR satisfaction_score < 2.5',
'intervention': {'push_notification': 1, 'discount_exposure': 0.7},
'coverage': 0.7
}
]
optimizer = PolicyOptimizer(
scm=create_churn_scm(),
cost_func=push_notification_cost,
benefit_func=churn_prevention_benefit
)
# 评估所有策略
optimal_policy = optimizer.optimize_policy(policy_space, obs_data)
print("\n=== 最优策略 ===")
print(f"策略名称: {optimal_policy['policy']['name']}")
print(f"目标人群: {optimal_policy['target_population']}")
print(f"干预用户: {optimal_policy['treated_users']}")
print(f"策略效应: {optimal_policy['policy_effect']:.2%}")
print(f"总成本: ¥{optimal_policy['total_cost']:,.2f}")
print(f"总收益: ¥{optimal_policy['total_benefit']:,.2f}")
print(f"ROI: {optimal_policy['roi']:.2f}")
(图:策略优化流程)
VIII. 常见问题与华为云最佳实践
VIII.I. 观测数据偏倚问题
| 偏倚类型 | 表现 | 华为云解决方案 | 实施工具 |
|---|---|---|---|
| 选择偏倚 | 非随机流失导致样本偏差 | 数据质量检查+缺失值分析 | DataArts Quality |
| 混杂偏倚 | 混淆变量同时影响X和Y | DAG识别后门路径 | 自定义Python UDF |
| 信息偏倚 | 测量误差 | 多源数据交叉验证 | MRS数据融合 |
| 因果倒置 | X与Y时间顺序错误 | 时间戳校验 | Flink时间窗口 |
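针对表中最容易踩坑的混杂偏倚,下面用一段自包含的模拟示意(变量与系数均为演示假设)说明:未调整混杂时回归系数的方向可能与真实因果效应相反,调整后才能还原:
# 最小示意:混杂偏倚如何让回归系数方向与真实因果效应相反
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(42)
n = 100_000
activity = rng.normal(0, 1, n)                                        # 混杂:用户活跃度
push = 5 - 1.5 * activity + rng.normal(0, 1, n)                       # 活跃用户收到更少推送
churn = 0.3 - 0.4 * activity - 0.05 * push + rng.normal(0, 0.1, n)    # 真实因果:推送轻微降低流失
df = pd.DataFrame({'activity': activity, 'push': push, 'churn': churn})

coef_unadj = LinearRegression().fit(df[['push']], df['churn']).coef_[0]
coef_adj = LinearRegression().fit(df[['push', 'activity']], df['churn']).coef_[0]
print(f"未调整系数: {coef_unadj:+.3f}(方向被混杂反转)")
print(f"调整activity后系数: {coef_adj:+.3f}(接近真实效应-0.05)")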
VIII.II. 计算性能优化
# spark_optimization.conf
# Spark性能优化配置
spark.executor.instances=20
spark.executor.cores=4
spark.executor.memory=8g
spark.sql.shuffle.partitions=400
# 因果推断专用优化
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
# 数据倾斜处理(倾向性得分通常严重偏斜)
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=100
spark.sql.adaptive.coalescePartitions.initialPartitionNum=400
# 缓存中间结果(DAG节点)
spark.sql.cache.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=1g
spark.rdd.compress=true
VIII.III. 模型监控与漂移检测
# monitoring/model_monitor.py
import json
import time
from typing import Dict
import pandas as pd
from huaweicloudsdkmodelarts import ModelArtsClient
from huaweicloudsdkmodelarts.v1 import CreateTrainingJobRequest
class CausalModelMonitor:
"""
因果模型监控
检测:结构变化、效应量漂移、数据质量
"""
def __init__(self, service_id: str, project_id: str):
self.client = ModelArtsClient(
access_key='your-access-key',
secret_key='your-secret-key',
project_id=project_id,
region='cn-east-3'
)
self.service_id = service_id
def collect_prediction_logs(self, hours: int = 24) -> pd.DataFrame:
"""
从ModelArts收集预测日志
"""
# 调用云API获取服务日志
response = self.client.list_service_logs(
service_id=self.service_id,
start_time=int(time.time() - hours*3600) * 1000,
end_time=int(time.time()) * 1000
)
logs = []
for log in response.logs:
logs.append({
'timestamp': log.timestamp,
'input': json.loads(log.input_data),
'output': json.loads(log.output_data),
'latency_ms': log.latency
})
return pd.DataFrame(logs)
def detect_causal_drift(self,
recent_data: pd.DataFrame,
reference_data: pd.DataFrame) -> Dict:
"""
检测因果效应漂移
"""
# 计算两个时期的ATE
recent_ate = self._calculate_period_ate(recent_data)
reference_ate = self._calculate_period_ate(reference_data)
# 效应量变化百分比
drift_pct = (recent_ate - reference_ate) / reference_ate
# 统计显著性检验
from scipy.stats import ttest_ind
recent_effects = recent_data['individual_effects'].dropna()
reference_effects = reference_data['individual_effects'].dropna()
t_stat, p_value = ttest_ind(recent_effects, reference_effects)
# 判断漂移是否显著
is_significant_drift = abs(drift_pct) > 0.2 and p_value < 0.05
return {
'reference_ate': reference_ate,
'recent_ate': recent_ate,
'drift_percentage': drift_pct,
't_statistic': t_stat,
'p_value': p_value,
'is_significant_drift': is_significant_drift,
'alert_level': 'critical' if is_significant_drift else 'normal'
}
def _calculate_period_ate(self, data: pd.DataFrame) -> float:
"""
计算时期内的平均处理效应
"""
# 简化为组间均值差
treated = data[data['push_notification'] > 5]
control = data[data['push_notification'] <= 5]
# 使用倾向性得分调整
from sklearn.linear_model import LinearRegression
# 建模:结果 ~ 处理 + 协变量
features = ['age', 'income', 'device_type']
model_treated = LinearRegression()
model_treated.fit(treated[features], treated['churn'])
model_control = LinearRegression()
model_control.fit(control[features], control['churn'])
# 在共同支持上预测
common_features = pd.concat([treated[features], control[features]]).drop_duplicates()
pred_treated = model_treated.predict(common_features).mean()
pred_control = model_control.predict(common_features).mean()
return pred_treated - pred_control
def trigger_model_retraining(self, drift_info: Dict):
"""
触发模型重训练
"""
if drift_info['is_significant_drift']:
print("检测到显著因果漂移,触发模型重训练")
# 创建训练作业
request = CreateTrainingJobRequest(
project_id=self.client.project_id,
training_job_name="causal-model-retrain",
training_job_description="因果模型自动重训练",
job_source="CUSTOM",
algorithm_spec={
'code_dir': 'obs://causal-model-artifacts/code/',
'boot_file': 'train_causal_model.py',
'engine': 'TensorFlow',
'engine_version': '2.4'
},
train_data={
'data_path': 'obs://causal-data-lake/drifted/',
'data_type': 'Dataset'
},
output_path='obs://causal-model-artifacts/outputs/',
instance_type='modelarts.vm.gpu.p100',
instance_count=1
)
self.client.create_training_job(request)
print("重训练作业已提交")
# 部署监控任务
"""
# Kubernetes CronJob配置
apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: causal-model-monitor
spec:
schedule: "0 */4 * * *" # 每4小时执行
jobTemplate:
spec:
template:
spec:
containers:
- name: monitor
image: your-registry/causal-monitor:latest
env:
- name: SERVICE_ID
value: "your-modelarts-service-id"
- name: ALERT_WEBHOOK
value: "https://sws.huaweicloud.com/alert"
restartPolicy: OnFailure
"""

(图:监控告警体系)
IX. 总结与因果科学工程化展望
IX.I. 因果推断能力成熟度模型
| 成熟度等级 | 特征 | 华为云服务 | 典型产出 |
|---|---|---|---|
| L1 关联分析 | 相关性、回归分析 | DGC数据湖 | 预测性结论 |
| L2 实验驱动 | A/B测试能力 | A/B Test服务 | 策略决策 |
| L3 DAG建模 | 结构因果模型 | ModelArts+自定义代码 | 因果路径识别 |
| L4 反事实推理 | What-If模拟 | 在线推理服务 | 个体化策略 |
| L5 自主优化 | RL+因果发现 | 强化学习平台 | 自动策略生成 |
IX.II. 未来技术趋势
# future/causal_ml_integration.py
"""
因果推断与机器学习融合方向
"""
from typing import Callable, List
import numpy as np
import pandas as pd
import networkx as nx
class CausalMLIntegration:
"""
因果增强机器学习
"""
def causal_feature_selection(self, X: pd.DataFrame, y: pd.Series,
treatment: pd.Series) -> List[str]:
"""
使用因果推断选择特征
避免过拟合混杂因子
"""
        # 1. 在特征与结果拼接后的数据上学习因果结构
        # 思路上可用PC/GES等算法;此处以causalnex的NOTEARS接口from_pandas作示意(需安装causalnex)
        from causalnex.structure.notears import from_pandas
        structure = from_pandas(pd.concat([X, y], axis=1))
# 2. 识别与y有因果路径的特征
causal_features = []
for feature in X.columns:
if nx.has_path(structure, feature, y.name):
causal_features.append(feature)
# 3. 移除纯预测性特征(后门路径被阻断)
return causal_features
def doubly_robust_learner(self, X, y, treatment):
"""
双稳健学习器
同时建模倾向性得分和结果
"""
        from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
        from sklearn.linear_model import LogisticRegression
        # 第一阶段:估计倾向性得分
        ps_model = GradientBoostingClassifier()
ps_model.fit(X, treatment)
propensity_scores = ps_model.predict_proba(X)[:, 1]
# 第二阶段:估计结果模型
y0_model = GradientBoostingRegressor()
y1_model = GradientBoostingRegressor()
y0_model.fit(X[treatment == 0], y[treatment == 0])
y1_model.fit(X[treatment == 1], y[treatment == 1])
# 伪结果(Pseudo-outcome)
y0_pred = y0_model.predict(X)
y1_pred = y1_model.predict(X)
pseudo_outcome = (
treatment * (y - y1_pred) / propensity_scores +
y1_pred -
(1 - treatment) * (y - y0_pred) / (1 - propensity_scores) -
y0_pred
)
# 第三阶段:估计条件平均处理效应
cate_model = GradientBoostingRegressor()
cate_model.fit(X, pseudo_outcome)
return cate_model
def offline_policy_evaluation(self, logged_bandit_data: pd.DataFrame,
new_policy: Callable) -> float:
"""
离线策略评估(反事实评估)
使用重要性采样
"""
# IPS估计
importance_weights = []
rewards = []
for _, row in logged_bandit_data.iterrows():
# 新策略的概率
new_prob = new_policy(row['context'])
# 旧策略的概率(行为策略)
old_prob = row['behavior_policy_prob']
# 重要性权重
iw = new_prob / old_prob
importance_weights.append(iw)
# 奖励
rewards.append(row['reward'])
# 截断重要性采样(减少方差)
c = 2 # 截断阈值
clipped_iw = np.minimum(importance_weights, c)
ips_value = np.mean(clipped_iw * rewards)
# 双重稳健估计
# ...(实现略)
return ips_value
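下面补充一个离线策略评估的调用示意,日志数据与新策略均为随机构造的演示用假设:
# 使用示意:构造最小的日志数据,离线评估一个假设的新推送策略
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
logged = pd.DataFrame({
    'context': rng.uniform(0, 1, 1000),                   # 简化为一维上下文
    'behavior_policy_prob': rng.uniform(0.2, 0.8, 1000),  # 行为策略选中该动作的概率
    'reward': rng.binomial(1, 0.3, 1000)                  # 观测到的奖励
})
new_policy = lambda ctx: 0.9 if ctx > 0.5 else 0.1        # 假设的新策略

cml = CausalMLIntegration()
print(f"新策略离线估值(截断IPS): {cml.offline_policy_evaluation(logged, new_policy):.4f}")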
# 部署在华为云DLI上
"""
# DLI Flink SQL作业
CREATE TABLE logged_bandit_data (
user_id STRING,
context STRING,
action INT,
reward DOUBLE,
behavior_policy_prob DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'bandit-logs',
'properties.bootstrap.servers' = 'kafka-cluster:9092'
);
CREATE TABLE policy_evaluation_result (
policy_name STRING,
estimated_value DOUBLE,
confidence_interval STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://dli-metadata:5432/policy_db',
'table-name' = 'evaluation_results'
);
INSERT INTO policy_evaluation_result
SELECT
'new_push_policy' as policy_name,
AVG(CASE
WHEN action = 1 THEN reward * LEAST(2.0 / behavior_policy_prob, 2)
ELSE reward * LEAST(0.0 / behavior_policy_prob, 2)
END) as estimated_value,
CAST(PERCENTILE_APPROX(estimated_value, 0.95) as STRING) as confidence_interval
FROM logged_bandit_data
WHERE dt >= CURRENT_DATE - INTERVAL '7' DAY
"""
IX.III. 完整工程化架构
X. 附录:华为云部署清单

X.I. 资源清单与成本估算
| 服务类型 | 规格 | 数量 | 成本(月) | 用途 |
|---|---|---|---|---|
| OBS | 标准存储 | 10TB | ¥1,200 | 数据湖 |
| MRS | Core节点8核32G | 5节点 | ¥7,500 | Spark计算 |
| ModelArts | GPU训练 (P100) | 200小时 | ¥3,000 | 模型训练 |
| ModelArts | 推理实例 (4核8G) | 2实例 | ¥1,800 | 在线服务 |
| DataArts | 企业版 | 1套 | ¥5,000 | 数据治理 |
| VPC | 带宽10Mbps | 1个 | ¥500 | 网络 |
| 总计 | - | - | ¥19,000 | - |
X.II. 安全与权限配置
# IAM权限脚本(最小原则)
# 创建因果推断专用角色
hwcloud iam create-role --role-name CausalInferenceRole --description "因果推断服务角色"
# 附加OBS读写策略
hwcloud iam attach-role-policy --role-name CausalInferenceRole \
--policy-obs CausalDataLakeAccess
# 附加ModelArts调用权限
hwcloud iam attach-role-policy --role-name CausalInferenceRole \
--policy-modelarts ModelArtsFullAccess
# 创建AK/SK
hwcloud iam create-access-key --user-name causal-user \
--description "ModelArts服务密钥"
# VPC安全组规则(仅允许内网访问)
hwcloud vpc create-security-group-rule \
--group-id sg-causal \
--direction ingress \
--protocol tcp \
--port 8080 \
--remote-ip 10.0.0.0/8
X.III. 灾难恢复方案
| 故障场景 | 恢复目标 | 恢复时间 | 实施步骤 | 验证方法 |
|---|---|---|---|---|
| OBS数据丢失 | RPO<1小时 | <2小时 | 1. 从备份桶恢复 2. 重跑增量ETL | 数据完整性校验 |
| MRS集群故障 | RPO<4小时 | <30分钟 | 1. 切换至备用集群 2. 重新提交作业 | 作业状态检查 |
| ModelArts服务宕机 | RPO<5分钟 | <5分钟 | 1. 自动触发HPA扩容 2. 切换到备用区域 | API健康检查 |
| DAG结构错误 | RPO=0 | <1天 | 1. 版本回滚 2. 重新因果发现 | 专家验证 |