异质性处理效应的探测与解释:子群分析进阶
I. 引言:超越平均效应的局限
在数字经济和精准营销时代,“一刀切"的策略已难以满足精细化运营的需求。传统的A/B测试报告通常聚焦于平均处理效应(Average Treatment Effect, ATE),例如"新推荐算法使整体点击率提升2.3%”。然而,这种聚合指标可能掩盖了用户群体间的显著差异:年轻用户可能提升8%,而老年用户反而下降3%,最终相互抵消形成微弱的平均效应。
异质性处理效应分析的核心价值在于识别"谁更受益"这一关键问题。通过探测处理效应的变异来源,企业可以:
- 实施个性化干预策略
- 优化资源分配
- 规避对特定子群的负面效应
- 深化对业务机制的理解
II. 异质性处理效应的理论基础
II.1 潜在结果框架下的HTE定义
在Rubin潜在结果框架中,个体i的处理效应定义为:
其中,和分别表示接受处理与未接受处理时的潜在结果。由于个体层面因果效应的不可识别性,传统研究聚焦于总体平均处理效应:
异质性处理效应分析则关注条件平均处理效应(Conditional Average Treatment Effect, CATE):
其中是协变量向量。CATE函数揭示了处理效应如何随个体特征变化,是子群分析的基石。
II.2 识别假设与估计挑战
CATE的识别依赖于三个核心假设:
| 假设类型 | 数学表达 | 实际意义 |
|---|---|---|
| 条件可忽略性 | $ {Y_i(0), Y_i(1)} \perp T_i \mid X_i $ | 在控制协变量后,处理分配机制与潜在结果无关 |
| 重叠假设 | 对于任何协变量组合,接受和不接受处理的概率均为正 | |
| 稳定单位处理值 | 个体结果不受其他单位处理状态影响 |
估计挑战主要体现在:
- 维度灾难:协变量空间高维时,CATE估计的方差急剧增大
- 模型误设:线性模型难以捕捉复杂的交互效应
- 因果-预测混淆:预测精度高不等于因果推断准
- 样本效率:实验数据中处理组通常占比较小
II.3 实例引入:在线教育平台的学习激励实验
为贯穿全文,我们构建一个智学在线教育平台的案例。该平台推出"学习积分奖励"功能,用户完成课程后可获得积分兑换礼品。平台希望通过A/B测试评估该功能对课程完成率的影响。
实验设计:
- 处理组(T=1):启用积分奖励功能
- 对照组(T=0):无积分奖励
- 结果变量Y:课程完成率(0-100%)
- 协变量X:用户年龄、历史学习时长、注册天数、设备类型、课程类别偏好等15个特征
初步ATE分析显示整体提升1.2%(p<0.05),但产品团队怀疑不同用户群体响应差异显著。这驱使我们开展深入的HTE分析。
# 数据生成过程模拟(用于后续分析演示)
import numpy as np
import pandas as pd
def generate_education_data(n=10000, seed=42):
"""模拟在线教育平台实验数据"""
np.random.seed(seed)
# 协变量生成
data = pd.DataFrame({
'user_id': range(n),
'age': np.random.uniform(18, 65, n),
'hist_learning_hours': np.random.exponential(20, n),
'registration_days': np.random.exponential(365, n),
'is_mobile': np.random.binomial(1, 0.6, n),
'course_category': np.random.choice(['tech', 'business', 'language'], n),
'prior_completion_rate': np.random.beta(2, 5, n)
})
# 处理分配(完全随机化实验)
data['treatment'] = np.random.binomial(1, 0.5, n)
# 个体处理效应生成(关键:异质性来源)
# 年轻用户效应更强;历史学习少的用户响应更好;移动端效应低
data['true_cate'] = (
0.15 * (65 - data['age']) / 47 + # 年轻用户效应递减
0.10 * np.exp(-data['hist_learning_hours']/10) + # 学习经验少的用户
-0.08 * data['is_mobile'] + # 移动端效应较低
0.05 * (data['prior_completion_rate'] < 0.3).astype(int) # 低完成率用户响应强
)
# 潜在结果生成
base_rate = 0.3 + 0.2 * data['prior_completion_rate']
data['y0'] = base_rate + np.random.normal(0, 0.05, n)
data['y1'] = data['y0'] + data['true_cate'] + np.random.normal(0, 0.03, n)
# 观测结果
data['completion_rate'] = np.where(data['treatment'] == 1,
np.clip(data['y1'], 0, 1),
np.clip(data['y0'], 0, 1))
return data.drop(columns=['y0', 'y1', 'true_cate'])
# 生成数据
df = generate_education_data()
print(f"数据集形状: {df.shape}")
print(f"平均处理效应ATE: {df[df.treatment==1].completion_rate.mean() - df[df.treatment==0].completion_rate.mean():.4f}")
上述代码中,我们构建了一个真实CATE函数,其中处理效应随年龄、学习历史、设备类型变化。这模拟了现实业务场景中常见的异质性模式。
III. 探测异质性处理效应的统计方法
III.1 传统子群分析方法的局限
在临床实验和早期A/B测试中,子群分析通常采用两阶段方法:
- 预定义子群(如按年龄分组)
- 在各子群内分别估计ATE
这种方法存在严重缺陷:
- 多重检验问题:子群越多,假阳性率越高
- 事后选择偏误:数据挖掘导致结果不可信
- 协变量组合爆炸:无法穷举所有可能子群
- 边界模糊:离散化连续变量损失信息
III.2 基于树模型的HTE探测
III.2.1 因果树(Causal Tree)
因果树通过修改传统决策树的分裂准则,直接最大化子群间的效应差异。Athey-Imbens准则是关键:
其中是子群内的平均处理效应估计。该准则直接寻找使处理效应差异最大的分裂方式。
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split
class SimpleCausalTree:
"""简化版因果树实现,演示核心思想"""
def __init__(self, max_depth=3, min_samples_leaf=200):
self.max_depth = max_depth
self.min_samples_leaf = min_samples_leaf
self.tree = {}
def fit(self, X, treatment, y):
"""训练因果树"""
# 构建训练数据
data = X.copy()
data['treatment'] = treatment
data['y'] = y
# 递归构建树
self.tree = self._grow_tree(data, depth=0)
return self
def _grow_tree(self, data, depth):
"""递归构建树的核心逻辑"""
n = len(data)
# 停止条件
if depth >= self.max_depth or n < 2 * self.min_samples_leaf:
return self._create_leaf(data)
best_split = self._find_best_split(data)
if best_split is None:
return self._create_leaf(data)
left_data = data[data[best_split['feature']] <= best_split['threshold']]
right_data = data[data[best_split['feature']] > best_split['threshold']]
return {
'feature': best_split['feature'],
'threshold': best_split['threshold'],
'left': self._grow_tree(left_data, depth + 1),
'right': self._grow_tree(right_data, depth + 1),
'ate_left': self._estimate_ate(left_data),
'ate_right': self._estimate_ate(right_data)
}
def _find_best_split(self, data):
"""寻找最优分裂点(简化版)"""
features = [c for c in data.columns if c not in ['treatment', 'y']]
best_gain = -np.inf
best_split = None
for feature in features:
# 尝试分位数作为候选分裂点
thresholds = np.percentile(data[feature].unique(), [25, 50, 75])
for threshold in thresholds:
left = data[data[feature] <= threshold]
right = data[data[feature] > threshold]
if len(left) < self.min_samples_leaf or len(right) < self.min_samples_leaf:
continue
# 计算处理效应差异
ate_left = self._estimate_ate(left)
ate_right = self._estimate_ate(right)
gain = abs(ate_left - ate_right)
if gain > best_gain:
best_gain = gain
best_split = {'feature': feature, 'threshold': threshold}
return best_split
def _estimate_ate(self, data):
"""估计子群内ATE"""
treated = data[data.treatment == 1]
control = data[data.treatment == 0]
if len(treated) == 0 or len(control) == 0:
return 0
return treated['y'].mean() - control['y'].mean()
def _create_leaf(self, data):
"""创建叶节点"""
return {'leaf': True, 'ate': self._estimate_ate(data), 'n': len(data)}
def predict(self, X):
"""预测CATE"""
return X.apply(self._predict_row, axis=1)
def _predict_row(self, row):
"""预测单条数据"""
node = self.tree
while 'leaf' not in node:
if row[node['feature']] <= node['threshold']:
node = node['left']
else:
node = node['right']
return node['ate']
# 使用演示
ct = SimpleCausalTree(max_depth=2)
ct.fit(df[['age', 'hist_learning_hours', 'is_mobile']],
df['treatment'],
df['completion_rate'])
print("因果树结构:", ct.tree)
代码解释:
_find_best_split函数实现了核心分裂逻辑,通过最大化左右子树间的ATE差异来选择分裂特征和阈值_estimate_ate函数计算子群内的处理效应,确保因果性min_samples_leaf参数防止过拟合,保证子群估计的稳定性
III.2.2 因果森林(Causal Forest)
单棵因果树方差大且不稳定。因果森林通过自助采样和特征子抽样降低方差,其预测值为多棵树的平均:
更重要的是,因果森林提供了诚实估计(Honest Estimation):使用不同样本进行树结构学习和叶节点估计,避免过拟合。
from econml.grf import CausalForest
def fit_causal_forest(X, T, y):
"""
使用econml库拟合因果森林
econml是目前最完善的因果推断库之一,由微软维护
"""
# 分离样本:50%用于树结构,50%用于叶节点估计
X_train, X_est, T_train, T_est, y_train, y_est = train_test_split(
X, T, y, test_size=0.5, random_state=42
)
cf = CausalForest(
n_estimators=200, # 树的数量
min_impurity_decrease=0.001, # 分裂最小增益
max_depth=10, # 最大深度
honest=True, # 诚实估计
n_jobs=-1 # 并行计算
)
# 使用训练集拟合
cf.fit(X_train, T_train, y_train)
# 使用估计集进行诚实估计
cf.honest_fit(X_est, T_est, y_est)
return cf
# 准备特征
features = ['age', 'hist_learning_hours', 'registration_days',
'is_mobile', 'prior_completion_rate']
X = df[features]
T = df['treatment']
y = df['completion_rate']
# 拟合模型
cf_model = fit_causal_forest(X, T, y)
# 预测CATE
df['cate_pred'] = cf_model.predict(X)
print(f"CATE预测统计:\n{df['cate_pred'].describe()}")
print(f"真实CATE与预测CATE相关性: {np.corrcoef(df['true_cate'], df['cate_pred'])[0,1]:.3f}")
代码解释:
honest=True启用诚实估计,这是因果森林的核心理论贡献min_impurity_decrease控制树的分裂激进程度,防止过拟合- 模型自动提供预测方差估计,可用于构建置信区间
III.3 元学习器(Meta-learners)框架
元学习器将CATE估计转化为监督学习问题,具有模型灵活性高、实现简单的优势。
| 类型 | 核心思想 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| S-learner | 将处理变量作为特征,直接预测结果 | 简单直观 | 可能忽略处理效应 | 效应平滑、非线性弱 |
| T-learner | 分别建立处理组和对照组模型 | 灵活性强 | 样本利用率低 | 两组数据充足时 |
| X-learner | 利用倾向得分加权,交叉拟合 | 样本效率高 | 实现复杂 | 处理组样本不平衡 |
| R-learner | 通过残差化去除主效应影响 | 统计性质好 | 需要初步估计 | 要求高精度推断 |
III.3.1 R-learner实现详解
R-learner由Nie和Wager提出,通过残差化去除结果变量和干预变量的主效应,聚焦于交互项:
其中是结果变量回归,是倾向得分。这种方法的统计效率最优。
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.model_selection import KFold
class RLearner:
"""
R-learner实现
核心:通过残差化分离主效应和处理效应
"""
def __init__(self, mu_model=None, pi_model=None, tau_model=None):
"""
参数:
mu_model: 结果变量模型 m(X) = E[Y|X]
pi_model: 倾向得分模型 e(X) = P(T=1|X)
tau_model: 处理效应模型 τ(X) = E[Y(1)-Y(0)|X]
"""
self.mu_model = mu_model or GradientBoostingRegressor()
self.pi_model = pi_model or GradientBoostingRegressor()
self.tau_model = tau_model or RandomForestRegressor(n_estimators=100)
def fit(self, X, T, y):
"""训练R-learner"""
# 步骤1:估计结果模型 m(X)
self.mu_model.fit(X, y)
m_pred = self.mu_model.predict(X)
# 步骤2:估计倾向得分 e(X)
# 交叉拟合避免过拟合偏差
pi_pred = np.zeros_like(T, dtype=float)
kf = KFold(n_splits=5, shuffle=True, random_state=42)
for train_idx, test_idx in kf.split(X):
X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
T_train, _ = T.iloc[train_idx], T.iloc[test_idx]
self.pi_model.fit(X_train, T_train)
pi_pred[test_idx] = self.pi_model.predict_proba(X_test)[:, 1]
# 步骤3:构建残差化目标变量
residual_y = y - m_pred
residual_t = T - pi_pred
# 步骤4:拟合处理效应模型
# 使用绝对残差作为样本权重,方差大的样本权重低
weights = np.abs(residual_t) + 1e-6
self.tau_model.fit(X, residual_y / residual_t, sample_weight=weights)
return self
def predict(self, X):
"""预测CATE"""
return self.tau_model.predict(X)
# 应用R-learner
r_learner = RLearner()
r_learner.fit(X, T, y)
df['cate_rlearner'] = r_learner.predict(X)
print(f"R-learner CATE预测分布:\n{df['cate_rlearner'].describe()}")
代码解释:
- 交叉拟合:倾向得分通过5折交叉验证预测,避免使用同一样本估计和预测导致的偏差
- 残差化:去除主效应后,残差主要反映处理效应和噪声
- 权重调整:
residual_t接近0的样本(即倾向得分极端)权重降低,提高估计稳定性
III.4 敏感性分析:处理效应的稳健性检验
任何HTE分析都必须面对模型依赖性质疑。敏感性分析通过扰动关键假设,评估结论的稳健性。
def sensitivity_analysis_tau(cf_model, X, T, y, n_bootstrap=100):
"""
自助法评估CATE估计的稳定性
"""
cate_estimates = []
for b in range(n_bootstrap):
# 有放回抽样
indices = np.random.choice(len(X), size=len(X), replace=True)
X_boot = X.iloc[indices]
T_boot = T.iloc[indices]
y_boot = y.iloc[indices]
# 重训练模型
cf_boot = CausalForest(n_estimators=100, honest=True)
cf_boot.fit(X_boot, T_boot, y_boot)
cate_estimates.append(cf_boot.predict(X))
cate_estimates = np.array(cate_estimates)
# 计算标准误
cate_se = np.std(cate_estimates, axis=0)
return cate_se
# 执行敏感性分析
cate_se = sensitivity_analysis_tau(cf_model, X, T, y)
df['cate_se'] = cate_se
# 识别不稳定估计
unstable_mask = df['cate_se'] > df['cate_pred'].std()
print(f"不稳定预测比例: {unstable_mask.mean():.2%}")
IV. 子群分析的方法论框架
IV.1 从CATE到决策子群
CATE估计仅是第一步,业务决策需要可操作的子群划分。我们提出 "效应-规模"二维框架 :
| 子群类型 | CATE特征 | 业务策略 | 优先级 |
|---|---|---|---|
| 高价值子群 | 效应大 & 规模大 | 全面推广 | 高 |
| 利基子群 | 效应大 & 规模小 | 精准触达 | 中 |
| 潜力子群 | 效应小 & 规模大 | 优化干预 | 中 |
| 低响应子群 | 效应小 & 规模小 | 放弃或重构 | 低 |
IV.2 效应量化的不确定性传播
商业决策必须考虑统计不确定性。我们为每个CATE预测构建置信区间:
from scipy import stats
def calculate_cate_ci(cf_model, X, alpha=0.05):
"""
计算CATE的置信区间
使用因果森林内置的方差估计
"""
# 预测CATE及其方差
cate_pred, cate_var = cf_model.predict_and_var(X)
cate_se = np.sqrt(cate_var)
# t分布临界值
t_critical = stats.t.ppf(1 - alpha/2, df=len(X) - 1)
ci_lower = cate_pred - t_critical * cate_se
ci_upper = cate_pred + t_critical * cate_se
return cate_pred, ci_lower, ci_upper, cate_se
# 计算置信区间
cate_pred, ci_l, ci_u, cate_se = calculate_cate_ci(cf_model, X)
df['cate_pred'] = cate_pred
df['cate_ci_lower'] = ci_l
df['cate_ci_upper'] = ci_u
# 识别统计显著的子群
df['significant'] = (df['cate_ci_lower'] > 0) | (df['cate_ci_upper'] < 0)
print(f"统计显著子群比例: {df['significant'].mean():.2%}")
print(f"显著正效应子群: {((df['cate_ci_lower'] > 0) & df['significant']).mean():.2%}")
IV.3 操作性子群发现算法
自动发现连续且可解释的子群是进阶挑战。我们实现基于梯度 boosting 的自动分段:
import lightgbm as lgb
def discover_actionable_subgroups(cf_model, X, feature_names, n_subgroups=5):
"""
发现可操作子群:寻找CATE相似且业务可解释的连续区域
参数:
cf_model: 已训练的因果森林模型
X: 特征矩阵
feature_names: 关键业务特征列表
n_subgroups: 目标子群数量
"""
# 获取CATE预测
cate_values = cf_model.predict(X)
# 构建训练数据:预测CATE作为目标,关键特征作为输入
# 使用X梯度提升发现CATE模式
subgroup_model = lgb.LGBMRegressor(
n_estimators=50,
max_depth=3,
learning_rate=0.1,
objective='quantile', # 分位数回归确保子群边界清晰
alpha=0.5
)
subgroup_model.fit(X[feature_names], cate_values)
# 获取叶节点分配
leaf_indices = subgroup_model.apply(X[feature_names])
# 聚合统计每个叶节点
subgroup_stats = []
for leaf in np.unique(leaf_indices):
leaf_mask = (leaf_indices == leaf).all(axis=1)
stats = {
'leaf_id': leaf,
'sample_size': leaf_mask.sum(),
'mean_cate': cate_values[leaf_mask].mean(),
'std_cate': cate_values[leaf_mask].std(),
# 提取该叶节点的特征分布
'feature_dist': X.loc[leaf_mask, feature_names].describe().loc['mean'].to_dict()
}
subgroup_stats.append(stats)
# 按CATE排序,选择前n_subgroups
subgroup_stats = sorted(subgroup_stats,
key=lambda x: abs(x['mean_cate']),
reverse=True)[:n_subgroups]
return subgroup_stats, leaf_indices
# 发现前3个关键子群
key_features = ['age', 'hist_learning_hours', 'is_mobile']
subgroups, leaves = discover_actionable_subgroups(cf_model, X, key_features, n_subgroups=3)
for i, sg in enumerate(subgroups):
print(f"\n子群 {i+1}:")
print(f" 样本量: {sg['sample_size']} ({sg['sample_size']/len(X):.1%})")
print(f" 平均CATE: {sg['mean_cate']:.3f} ± {sg['std_cate']:.3f}")
print(f" 特征轮廓: {sg['feature_dist']}")
算法解读:
- 逆向建模:用CATE作为目标,特征作为输入,学习CATE的预测模式
- 叶节点即子群:每棵树叶节点对应一个CATE相似的连续区域
- 可解释性:通过特征重要性分析,识别驱动异质性的关键变量
IV.4 多维效应分解:何时何地为何
高级分析需要分解效应来源。我们实现Shapley值分解:
import shap
def decompose_heterogeneity(cf_model, X, instance_idx=0):
"""
使用SHAP值分解单个实例的CATE预测
揭示哪些特征驱动了该用户的高/低响应
"""
# 创建SHAP解释器
explainer = shap.TreeExplainer(cf_model)
shap_values = explainer.shap_values(X)
# 分析特定实例
instance = X.iloc[instance_idx:instance_idx+1]
instance_shap = explainer.shap_values(instance)
base_value = explainer.expected_value
return {
'base_value': base_value,
'shap_values': instance_shap[0],
'feature_names': X.columns.tolist(),
'instance_features': instance.iloc[0].to_dict()
}
# 分析一个典型用户
decomposition = decompose_heterogeneity(cf_model, X, instance_idx=100)
print("\nCATE预测分解(用户100):")
for feat, shap_val in zip(decomposition['feature_names'], decomposition['shap_values']):
print(f" {feat}: {shap_val:.4f}")
V. 实例分析:在线教育平台的A/B测试
V.1 业务背景与数据概述
智学在线平台拥有200万活跃用户,为提升课程完成率,推出"学习积分奖励"功能。该功能允许用户完成课程章节后获得积分,积分可兑换优惠券或礼品。
实验设计细节:
- 时间窗口:2024年3月1日-4月15日,45天
- 随机化单元:用户ID,分层随机化(按活跃度分层)
- 处理分配:50%处理组,50%对照组
- 样本量:处理组50,123人,对照组49,877人
- 成功指标:课程完成率(完成章节数/总章节数)
数据特征描述:
| 特征类型 | 变量名 | 含义 | 分布特征 | 业务相关性 |
|---|---|---|---|---|
| 人口属性 | age | 用户年龄 | 均值32岁,右偏 | 年轻用户可能更响应游戏化 |
| 行为历史 | hist_learning_hours | 历史学习时长 | 指数分布,均值20h | 经验用户可能效应饱和 |
| 生命周期 | registration_days | 注册天数 | 指数分布,均值365天 | 新用户可能更敏感 |
| 设备 | is_mobile | 是否移动端 | 60%移动端 | 移动端激励可能不同 |
| 先验表现 | prior_completion_rate | 历史完成率 | Beta(2,5),均值0.29 | 低完成率用户改进空间大 |
V.2 初步ATE分析与局限性
# 基础ATE分析
treated = df[df.treatment == 1]
control = df[df.treatment == 0]
ate = treated.completion_rate.mean() - control.completion_rate.mean()
print(f"平均处理效应ATE: {ate:.4f}")
# 统计显著性检验
from scipy.stats import ttest_ind
t_stat, p_value = ttest_ind(treated.completion_rate, control.completion_rate)
print(f"t统计量: {t_stat:.3f}, p值: {p_value:.4f}")
# 95%置信区间
import statsmodels.stats.api as sms
cm = sms.CompareMeans(sms.DescrStatsW(treated.completion_rate),
sms.DescrStatsW(control.completion_rate))
ci = cm.tconfint_diff()
print(f"ATE 95% CI: [{ci[0]:.4f}, {ci[1]:.4f}]")
结果解读:
- ATE = 1.23%(p<0.001),95% CI [0.007, 0.016]
- 统计显著但效应量微弱(Cohen’s d ≈ 0.08)
- 业务困境:整体推广ROI可能为负(激励成本 vs 微小提升)
- 核心问题:是否存在高响应子群可以精准触达?
V.3 深度HTE探测与分析
V.3.1 因果森林模型训练与评估
# 使用完整数据训练最优因果森林
cf_final = CausalForest(
n_estimators=500,
max_depth=12,
min_impurity_decrease=0.0005,
honest=True,
n_jobs=-1,
random_state=42
)
# 交叉验证选择超参数(简化示意)
from sklearn.model_selection import GridSearchCV
param_grid = {
'max_depth': [8, 10, 12],
'min_impurity_decrease': [0.001, 0.0005, 0.0001]
}
# 实际中应使用专门的事件序列交叉验证
cf_final.fit(X, T, y)
# 模型性能评估:R-loss(因果推断专用损失)
def r_loss(y_true, treatment, cate_pred, mu_model, pi_model):
"""
R-loss评估:越低越好
衡量在去除主效应后,CATE对残差的解释能力
"""
mu_pred = mu_model.predict(X)
pi_pred = pi_model.predict_proba(X)[:, 1]
residual_y = y_true - mu_pred
residual_t = treatment - pi_pred
loss = np.mean((residual_y - cate_pred * residual_t)**2)
return loss
# 计算R-loss
mu_m = GradientBoostingRegressor().fit(X, y)
pi_m = GradientBoostingRegressor().fit(X, T)
loss = r_loss(y, T, df['cate_pred'], mu_m, pi_m)
print(f"R-loss: {loss:.4f} (基准线: {np.var(y):.4f})")
V.3.2 子群发现与业务洞察
# 自动发现关键子群
subgroup_analyzer = lgb.LGBMRegressor(
n_estimators=30,
max_depth=3,
learning_rate=0.1
)
subgroup_analyzer.fit(X, df['cate_pred'])
# 获取叶节点路径
leaf_paths = subgroup_analyzer.booster_.trees_to_dataframe()
# 分析最重要的分裂特征
split_features = leaf_paths[leaf_paths['split_feature'] != ''].groupby('split_feature').size()
print("关键分裂特征(按重要性排序):")
print(split_features.sort_values(ascending=False))
# 提取业务子群定义
def extract_subgroup_rules(tree_df, leaf_id):
"""从树结构中提取规则"""
path = []
current = leaf_id
while True:
node = tree_df[tree_df['node_index'] == current]
if node.empty or node.iloc[0]['split_feature'] == '':
break
parent = node.iloc[0]
path.append({
'feature': parent['split_feature'],
'threshold': parent['threshold'],
'direction': 'left' if 'L' in current else 'right'
})
# 向上追溯
current = parent['parent_index']
return list(reversed(path))
# 识别前3个高价值子群
top_leaves = subgroup_analyzer.apply(X).mean(axis=0).argsort()[-3:]
for i, leaf in enumerate(top_leaves):
rules = extract_subgroup_rules(leaf_paths, f'{leaf}')
print(f"\n子群 {i+1} (叶节点{leaf})规则:")
for rule in rules:
print(f" - {rule['feature']} {'<=' if rule['direction']=='left' else '>'} {rule['threshold']:.2f}")
sg_mask = (subgroup_analyzer.predict(X) == leaf)
print(f" 样本量: {sg_mask.sum()} ({sg_mask.mean():.1%})")
print(f" 平均CATE: {df.loc[sg_mask, 'cate_pred'].mean():.3f}")
分析结果(模拟真实业务场景):
关键发现1:年轻新用户群体
- 规则:
age <= 25且registration_days <= 30且prior_completion_rate <= 0.3 - 规模:8,234人(8.2%)
- CATE:4.87%(95% CI: [3.2%, 6.5%])
- 业务洞察:刚注册的年轻用户,学习惯性未形成,积分激励效果显著。他们是"激活黄金期"用户。
关键发现2:低经验高潜力群体
- 规则:
hist_learning_hours <= 10且age <= 35且is_mobile = 0 - 规模:12,456人(12.5%)
- CATE:3.92%(95% CI: [2.8%, 5.1%])
- 业务洞察:PC端、学习经验少的年轻用户,对物质激励响应强烈。可能是"学习动机待激发"群体。
关键发现3:移动端的负效应
- 规则:
is_mobile = 1且age > 45 - 规模:6,789人(6.8%)
- CATE:-1.56%(95% CI: [-2.8%, -0.3%])
- 业务洞察:中老年移动端用户可能因界面复杂、积分规则认知成本高而产生负面体验。需优化交互设计。
V.4 效应的时空异质性探测
处理效应可能随时间和用户状态动态变化,我们实现时间演化分析:
# 模拟时间维度数据(实际业务中按天收集)
df['day_of_experiment'] = np.random.randint(0, 45, len(df))
def temporal_heterogeneity_analysis(df, cf_model, time_col='day_of_experiment'):
"""
分析CATE随时间的变化趋势
"""
# 按时间窗口计算动态CATE
window_size = 7 # 周窗口
df['time_window'] = df[time_col] // window_size
temporal_effects = []
for window in sorted(df['time_window'].unique()):
window_data = df[df['time_window'] == window]
# 重新预测该时间窗口的CATE
window_cate = cf_model.predict(window_data[features])
temporal_effects.append({
'window': window,
'mean_cate': window_cate.mean(),
'std_cate': window_cate.std(),
'sample_size': len(window_data)
})
return pd.DataFrame(temporal_effects)
temporal_df = temporal_heterogeneity_analysis(df, cf_final)
# 可视化时间趋势
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 6))
plt.errorbar(temporal_df['window'], temporal_df['mean_cate'],
yerr=temporal_df['std_cate'], marker='o')
plt.xlabel('实验周')
plt.ylabel('平均CATE')
plt.title('处理效应的时间演化')
plt.axhline(y=0, color='red', linestyle='--')
plt.show()
# 关键发现:效应衰减模式
print("\n时间演化分析:")
print(temporal_df)
时间分析洞察:
- 第1-2周:CATE ~4.5%,新鲜感驱动强
- 第3-4周:CATE ~3.2%,效应稳定期
- 第5-6周:CATE ~2.1%,出现疲劳效应
- 启示:激励效果随时间衰减,需考虑"保鲜机制"(如积分规则动态调整)
V.5 多维交叉子群分析
单一维度分组可能掩盖交互效应。我们实现自动化交叉分析:
def cross_subgroup_analysis(df, features, treatment_col='treatment',
outcome_col='completion_rate', min_size=500):
"""
自动探索所有二元交互子群
使用Bonferroni校正控制族系误差
"""
from itertools import combinations
results = []
# 所有特征组合
for feature1, feature2 in combinations(features, 2):
# 对每个特征进行二值化(分位数分组)
df[f'{feature1}_bin'] = pd.qcut(df[feature1], 3, labels=['low', 'mid', 'high'])
df[f'{feature2}_bin'] = pd.qcut(df[feature2], 2, labels=['low', 'high'])
# 分析交互子群
for (val1, val2), group in df.groupby([f'{feature1}_bin', f'{feature2}_bin']):
if len(group) < min_size:
continue
treated = group[group[treatment_col] == 1]
control = group[group[treatment_col] == 0]
if len(treated) < 50 or len(control) < 50:
continue
# 计算ATE和置信区间
ate = treated[outcome_col].mean() - control[outcome_col].mean()
se = np.sqrt(
treated[outcome_col].var()/len(treated) +
control[outcome_col].var()/len(control)
)
ci_lower, ci_upper = ate - 1.96*se, ate + 1.96*se
results.append({
'feature1': feature1,
'feature2': feature2,
'value1': val1,
'value2': val2,
'sample_size': len(group),
'ate': ate,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'significant': ci_lower > 0 or ci_upper < 0
})
results_df = pd.DataFrame(results)
# Bonferroni校正
n_tests = len(results_df)
alpha_corrected = 0.05 / n_tests
results_df['significant_corrected'] = (
(results_df['ate'] - 1.96*results_df['se'] > 0) |
(results_df['ate'] + 1.96*results_df['se'] < 0)
)
return results_df.sort_values('ate', ascending=False)
# 执行交叉分析
cross_results = cross_subgroup_analysis(
df,
['age', 'hist_learning_hours', 'registration_days', 'prior_completion_rate']
)
print("\n前5个最强交叉子群:")
print(cross_results.head().to_string(index=False))
# 关键交叉发现
strongest_sg = cross_results.iloc[0]
print(f"\n最强交互子群:")
print(f"特征: {strongest_sg['feature1']} ({strongest_sg['value1']}) × "
f"{strongest_sg['feature2']} ({strongest_sg['value2']})")
print(f"样本量: {strongest_sg['sample_size']}")
print(f"CATE: {strongest_sg['ate']:.3f} [{strongest_sg['ci_lower']:.3f}, "
f"{strongest_sg['ci_upper']:.3f}]")
交叉分析洞察:
- 最强交互:
age(low) × hist_learning_hours(low)(年轻且经验少)- CATE = 6.83%,95% CI [5.1%, 8.6%]
- 样本量:3,421人
- 业务意义:"新手青年"是黄金目标群体,效应远高于单维度分析
V.6 负效应子群的深度诊断
发现负效应子群比正效应更有价值,可能避免重大损失。
# 识别显著负效应子群
negative_sg = df[df['cate_ci_upper'] < 0]
print(f"\n负效应子群规模: {len(negative_sg)} ({len(negative_sg)/len(df):.1%})")
print(f"平均负效应: {negative_sg['cate_pred'].mean():.3f}")
# 负效应子群特征分析
print("\n负效应子群特征分布:")
print(negative_sg[features].describe())
# 使用SHAP分析负效应原因
shap_explainer = shap.TreeExplainer(cf_final)
shap_values_neg = shap_explainer.shap_values(negative_sg[features])
# 平均SHAP值(负效应驱动因素)
shap_mean_neg = np.mean(np.abs(shap_values_neg), axis=0)
shap_importance = pd.DataFrame({
'feature': features,
'importance': shap_mean_neg
}).sort_values('importance', ascending=False)
print("\n负效应驱动因素(SHAP重要性):")
print(shap_importance.to_string(index=False))
负效应洞察:
- 主要驱动:
is_mobile(移动端)和age(高龄) - 假设:中老年用户在移动端操作积分系统存在障碍
- 验证建议:开展可用性测试,简化移动端积分领取流程
VI. 代码部署与实现详解
VI.1 生产环境部署架构
将HTE分析从Jupyter Notebook迁移到生产系统,需要分层架构:
| 层级 | 组件 | 技术选型 | 职责 |
|---|---|---|---|
| 数据层 | 特征仓库 | Apache Hive/Spark | 存储用户特征快照 |
| 模型层 | 因果森林服务 | Python FastAPI + econml | 在线CATE预测 |
| 决策层 | 策略引擎 | Redis + 规则系统 | 子群策略匹配 |
| 监控层 | 效应追踪 | Prometheus + Grafana | 实时监控CATE漂移 |
| 反馈层 | A/B测试平台 | 自研实验平台 | 持续验证子群策略 |
VI.2 在线CATE预测服务实现
# fastapi_cate_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
from econml.grf import CausalForest
app = FastAPI(title="CATE Prediction Service")
# 加载预训练模型
MODEL_PATH = "/models/causal_forest_v1.pkl"
cf_model = joblib.load(MODEL_PATH)
# 特征定义
FEATURE_SCHEMA = [
'age', 'hist_learning_hours', 'registration_days',
'is_mobile', 'prior_completion_rate'
]
class UserFeatures(BaseModel):
"""用户特征输入模型"""
user_id: int
age: float
hist_learning_hours: float
registration_days: float
is_mobile: int
prior_completion_rate: float
class CATEResponse(BaseModel):
"""CATE响应模型"""
user_id: int
cate: float
ci_lower: float
ci_upper: float
expected_lift: float
recommended_action: str
@app.post("/predict_cate", response_model=CATEResponse)
async def predict_cate(features: UserFeatures):
"""
在线预测单个用户的CATE
业务逻辑:
1. 验证输入特征
2. 转换DataFrame格式
3. 预测CATE及置信区间
4. 基于阈值推荐动作
"""
try:
# 输入验证
if not (0 <= features.age <= 100):
raise ValueError("年龄超出合理范围")
if not (0 <= features.prior_completion_rate <= 1):
raise ValueError("历史完成率需在[0,1]区间")
# 特征转换
X = pd.DataFrame([features.dict()])[FEATURE_SCHEMA]
# CATE预测
cate_pred, cate_var = cf_model.predict_and_var(X)
cate_se = np.sqrt(cate_var)
# 95%置信区间
ci_lower = cate_pred[0] - 1.96 * cate_se[0]
ci_upper = cate_pred[0] + 1.96 * cate_se[0]
# 业务决策规则
action_threshold = 0.02 # CATE > 2%视为有效
if ci_lower > action_threshold:
action = "activate_reward"
elif ci_upper < -action_threshold:
action = "optimize_experience"
else:
action = "default"
return CATEResponse(
user_id=features.user_id,
cate=float(cate_pred[0]),
ci_lower=float(ci_lower),
ci_upper=float(ci_upper),
expected_lift=float(cate_pred[0]),
recommended_action=action
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/batch_predict")
async def batch_predict(users: list[UserFeatures]):
"""批量预测接口,用于营销名单生成"""
features_df = pd.DataFrame([u.dict() for u in users])
X_batch = features_df[FEATURE_SCHEMA]
cate_preds, cate_vars = cf_model.predict_and_var(X_batch)
return {
"predictions": [
{
"user_id": int(uid),
"cate": float(cate),
"variance": float(var)
}
for uid, cate, var in zip(features_df['user_id'], cate_preds, cate_vars)
]
}
@app.get("/model_info")
async def model_info():
"""模型元数据接口"""
return {
"model_type": "CausalForest",
"n_estimators": cf_model.n_estimators,
"feature_importance": dict(zip(FEATURE_SCHEMA, cf_model.feature_importances_)),
"last_updated": "2024-04-20",
"evaluation_r_loss": 0.0021
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
部署说明:
- 模型持久化:使用
joblib序列化,确保版本控制 - 特征验证:Pydantic模型提供运行时类型检查
- 决策封装:将统计结果直接映射到业务动作,降低使用门槛
- 批量接口:支持营销系统批量调用,提升吞吐量
VI.3 模型服务化(Docker容器化)
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码和模型
COPY fastapi_cate_service.py .
COPY models/causal_forest_v1.pkl /models/
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000
CMD ["uvicorn", "fastapi_cate_service:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
依赖文件:
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
econml==0.14.1
pandas==2.0.3
numpy==1.24.3
scikit-learn==1.3.0
lightgbm==4.0.0
joblib==1.3.1
pydantic==2.5.0
部署命令:
# 构建镜像
docker build -t cate-service:v1.0 .
# 运行容器
docker run -d -p 8000:8000 --name cate-service \
-e MODEL_PATH=/models/causal_forest_v1.pkl \
cate-service:v1.0
# k8s部署(生产环境)
kubectl apply -f k8s/cate-deployment.yaml
VI.4 实时效应监控与漂移检测
# monitoring_drift.py
from prometheus_client import Counter, Histogram, Gauge
import redis
import json
import numpy as np
# 定义监控指标
cate_prediction_total = Counter('cate_predictions_total', 'CATE预测总数')
cate_distribution = Histogram('cate_value', 'CATE值分布', buckets=[-5, -2, 0, 2, 5, 10])
subgroup_assignment = Counter('subgroup_assignments', '子群分配计数', ['subgroup_id'])
effect_drift_detected = Counter('effect_drift_detected', '效应漂移检测次数')
# Redis存储历史分布
redis_client = redis.Redis(host='redis', port=6379, db=0)
def detect_cate_drift(new_cate_values, user_segment='all'):
"""
检测CATE分布漂移(使用KS检验)
"""
from scipy.stats import ks_2samp
# 获取历史分布
hist_key = f"cate_hist_{user_segment}"
hist_data = redis_client.get(hist_key)
if hist_data is None:
# 首次运行,存储当前分布
redis_client.setex(hist_key, 86400, json.dumps(new_cate_values.tolist()))
return False, 0.0
historical = np.array(json.loads(hist_data))
# KS检验
ks_stat, p_value = ks_2samp(historical, new_cate_values)
# 漂移阈值
if ks_stat > 0.15 and p_value < 0.01:
effect_drift_detected.inc()
return True, ks_stat
return False, ks_stat
def monitor_prediction(user_features, cate_value, subgroup_id):
"""单次预测监控"""
# 记录指标
cate_prediction_total.inc()
cate_distribution.observe(cate_value)
subgroup_assignment.labels(subgroup_id=subgroup_id).inc()
# 缓存用户特征和预测
record = {
'timestamp': pd.Timestamp.now().isoformat(),
'features': user_features,
'cate': cate_value,
'subgroup': subgroup_id
}
redis_client.lpush('prediction_log', json.dumps(record))
# 保留最近10万条记录
redis_client.ltrim('prediction_log', 0, 99999)
# Grafana仪表盘配置(JSON格式,可导入)
grafana_dashboard = {
"dashboard": {
"title": "CATE Effect Monitor",
"panels": [
{
"title": "CATE Distribution Over Time",
"type": "heatmap",
"targets": [{
"expr": "histogram_quantile(0.5, rate(cate_value_bucket[5m]))"
}]
},
{
"title": "Subgroup Assignment Ratio",
"type": "timeseries",
"targets": [{
"expr": "rate(subgroup_assignments[5m])"
}]
}
]
}
}
VII. 结果解释与业务决策
VII.1 从统计显著到业务显著
统计显著≠业务显著。我们建立业务决策矩阵:
| 子群 | CATE | 95%CI | 规模 | 业务价值 | 决策 |
|---|---|---|---|---|---|
| 年轻新用户 | 4.87% | [3.2%, 6.5%] | 8.2% | 高 | 全量触达 |
| 低经验PC青年 | 3.92% | [2.8%, 5.1%] | 12.5% | 中高 | 定向营销 |
| 成熟稳定用户 | 0.50% | [-0.5%, 1.5%] | 45% | 中 | 默认策略 |
| 移动中老年 | -1.56% | [-2.8%, -0.3%] | 6.8% | 负 | 暂停+优化 |
ROI模拟计算:
# 计算各子群ROI
def calculate_roi(subgroup_mask, cate, cost_per_user=5, revenue_per_lift=100):
"""
ROI = (收入提升 - 成本) / 成本
假设每1%完成率提升带来100元收入
"""
n_users = subgroup_mask.sum()
total_lift = cate * n_users
revenue = total_lift * revenue_per_lift
cost = n_users * cost_per_user
roi = (revenue - cost) / cost
return roi, revenue, cost
# 各子群ROI分析
subgroup_masks = {
'young_new': (df['age'] <= 25) & (df['registration_days'] <= 30),
'low_exp_pc': (df['hist_learning_hours'] <= 10) & (df['age'] <= 35) & (df['is_mobile'] == 0),
'mobile_old': (df['is_mobile'] == 1) & (df['age'] > 45)
}
for name, mask in subgroup_masks.items():
if mask.sum() == 0:
continue
subgroup_cate = df.loc[mask, 'cate_pred'].mean()
roi, rev, cost = calculate_roi(mask, subgroup_cate)
print(f"\n子群 {name}:")
print(f" 规模: {mask.sum()} ({mask.mean():.1%})")
print(f" 平均CATE: {subgroup_cate:.2%}")
print(f" ROI: {roi:.1f}x (收入: {rev/10000:.1f}万, 成本: {cost/10000:.1f}万)")
# 整体策略对比
full_roi = calculate_roi(pd.Series(True, index=df.index), df['cate_pred'].mean())
targeted_roi = calculate_roi(
subgroup_masks['young_new'] | subgroup_masks['low_exp_pc'],
df.loc[subgroup_masks['young_new'] | subgroup_masks['low_exp_pc'], 'cate_pred'].mean()
)
print(f"\n策略对比:")
print(f"全量推广ROI: {full_roi[0]:.1f}x")
print(f"精准触达ROI: {targeted_roi[0]:.1f}x")
print(f"收益提升: {(targeted_roi[0]/full_roi[0] - 1):.1%}")
VII.2 动态策略配置系统
# strategy_config.py
class SubgroupStrategy:
"""子群策略配置类"""
def __init__(self, subgroup_id, name, cate_threshold,
allocation_pct, creative_id, trigger_timing):
"""
参数:
subgroup_id: 子群标识
name: 子群名称
cate_threshold: CATE触达阈值
allocation_pct: 流量分配比例
creative_id: 创意素材ID
trigger_timing: 触发时机(如注册后3天内)
"""
self.subgroup_id = subgroup_id
self.name = name
self.cate_threshold = cate_threshold
self.allocation_pct = allocation_pct
self.creative_id = creative_id
self.trigger_timing = trigger_timing
def evaluate_user(self, user_features, predicted_cate):
"""
评估用户是否匹配该策略
"""
if predicted_cate < self.cate_threshold:
return False
# 可添加更多业务规则
return True
# 策略配置库
STRATEGY_LIBRARY = {
'young_activator': SubgroupStrategy(
subgroup_id='SG_001',
name='年轻新用户激活',
cate_threshold=0.03, # CATE > 3%
allocation_pct=1.0, # 100%触达
creative_id='CRE_001', # 高激励素材
trigger_timing='immediate'
),
'low_exp_motivator': SubgroupStrategy(
subgroup_id='SG_002',
name='低经验 motivator',
cate_threshold=0.025,
allocation_pct=0.8, # 80%触达(避免过度打扰)
creative_id='CRE_002',
trigger_timing='after_first_chapter'
),
'mobile_old_optimizer': SubgroupStrategy(
subgroup_id='SG_003',
name='移动端中老年优化',
cate_threshold=-0.01, # 负效应阈值
allocation_pct=0.0, # 暂停触达
creative_id='CRE_OPT_001',
trigger_timing='post_product_improvement'
)
}
# 策略引擎
class StrategyEngine:
def __init__(self, strategy_library):
self.strategies = strategy_library
def assign_strategy(self, user_id, features, cate_prediction):
"""为用户分配最优策略"""
assigned_strategies = []
for strategy_id, strategy in self.strategies.items():
if strategy.evaluate_user(features, cate_prediction):
assigned_strategies.append(strategy_id)
# 冲突解决:选择CATE提升最高的策略
if len(assigned_strategies) > 1:
# 可添加更复杂的冲突解决逻辑
return assigned_strategies[0]
return assigned_strategies[0] if assigned_strategies else 'default'
VII.3 业务决策模拟与反事实推理
# 反事实模拟:如果只对高价值子群投放,会怎样?
def counterfactual_simulation(df, target_subgroup_mask,
original_cate_col='cate_pred',
revenue_fn=lambda lift: lift * 100):
"""
反事实模拟:精准策略 vs 全量策略
"""
# 基准情况(全量)
total_lift_all = df[original_cate_col].sum()
cost_all = len(df) * 5 # 每人5元成本
# 精准策略
targeted_lift = df.loc[target_subgroup_mask, original_cate_col].sum()
cost_targeted = target_subgroup_mask.sum() * 5
# 计算增量指标
revenue_all = revenue_fn(total_lift_all)
revenue_targeted = revenue_fn(targeted_lift)
return {
'baseline': {
'roi': (revenue_all - cost_all) / cost_all,
'revenue': revenue_all,
'cost': cost_all,
'reach': len(df)
},
'targeted': {
'roi': (revenue_targeted - cost_targeted) / cost_targeted,
'revenue': revenue_targeted,
'cost': cost_targeted,
'reach': target_subgroup_mask.sum()
},
'improvement': {
'roi_lift': (revenue_targeted - cost_targeted) / cost_targeted - \
(revenue_all - cost_all) / cost_all,
'cost_saving': cost_all - cost_targeted,
'revenue_efficiency': revenue_targeted / cost_targeted - \
revenue_all / cost_all
}
}
# 执行模拟
high_value_mask = subgroup_masks['young_new'] | subgroup_masks['low_exp_pc']
simulation = counterfactual_simulation(df, high_value_mask)
print("\n=== 反事实模拟结果 ===")
print(f"基准策略ROI: {simulation['baseline']['roi']:.1f}x")
print(f"精准策略ROI: {simulation['targeted']['roi']:.1f}x")
print(f"ROI提升: {simulation['improvement']['roi_lift']:.1f}个百分点")
print(f"成本节约: ¥{simulation['improvement']['cost_saving']:,.0f}")
print(f"触达人数减少: {len(df) - high_value_mask.sum():,} "
f"({1 - high_value_mask.mean():.1%})")
模拟结果:
- 精准策略ROI提升3.2x,成本节约¥172万
- 触达人数减少74%,但收入仅下降12%
- 核心结论:资源聚焦高价值子群可实现帕累托改进
VIII. 高级话题与前沿发展
VIII.1 连续处理变量的HTE
当处理变量是连续剂量时,HTE分析扩展为剂量-反应曲线异质性:
def continuous_dose_hte(df, dose_col, outcome_col, feature_cols):
"""
连续剂量异质性分析
使用双重机器学习估计条件剂量反应函数
μ(x, t) = E[Y|X=x, T=t]
"""
from econml.dml import ContinuousTreatmentDML
# 模型配置
est = ContinuousTreatmentDML(
model_y=GradientBoostingRegressor(),
model_t=GradientBoostingRegressor(),
featurizer=None,
cv=5
)
# 拟合模型
est.fit(
Y=df[outcome_col],
T=df[dose_col],
X=df[feature_cols]
)
# 在不同剂量水平预测效应
dose_grid = np.linspace(df[dose_col].quantile(0.1),
df[dose_col].quantile(0.9), 10)
effect_curve = {}
for dose in dose_grid:
# 边际效应 ∂Y/∂T 在剂量d处的值
marginal_effect = est.marginal_effect(
T=np.array([dose] * len(df)),
X=df[feature_cols]
)
effect_curve[dose] = marginal_effect.mean()
return effect_curve, est
# 模拟连续剂量场景(如积分不同面值)
df['reward_points'] = np.random.choice([10, 30, 50, 100], len(df))
dose_curve, cont_model = continuous_dose_hte(
df, 'reward_points', 'completion_rate', features
)
# 可视化
import matplotlib.pyplot as plt
doses, effects = zip(*sorted(dose_curve.items()))
plt.plot(doses, effects, marker='o')
plt.xlabel('积分剂量')
plt.ylabel('边际处理效应')
plt.title('剂量反应异质性')
plt.show()
VIII.2 多结果变量的联合HTE
实际业务常关注多个指标(如完成率、活跃度、付费转化)。我们实现多任务因果森林:
class MultiOutcomeCausalForest:
"""
多结果变量因果森林
核心思想:共享分裂结构,叶节点分别估计各结果CATE
"""
def __init__(self, n_estimators=200, max_depth=10):
self.n_estimators = n_estimators
self.max_depth = max_depth
self.models = {}
def fit(self, X, T, Y_matrix):
"""
Y_matrix: [n_samples, n_outcomes] 多结果矩阵
"""
self.n_outcomes = Y_matrix.shape[1]
self.outcome_names = Y_matrix.columns
# 为每个结果训练因果森林
for i, outcome in enumerate(self.outcome_names):
print(f"训练结果变量: {outcome}")
cf = CausalForest(
n_estimators=self.n_estimators,
max_depth=self.max_depth,
honest=True,
n_jobs=-1
)
cf.fit(X, T, Y_matrix.iloc[:, i])
self.models[outcome] = cf
return self
def predict_all(self, X):
"""预测所有结果的CATE"""
predictions = {}
for name, model in self.models.items():
predictions[name] = model.predict(X)
return pd.DataFrame(predictions)
# 模拟多结果场景
df['active_days'] = df['completion_rate'] * 30 + np.random.normal(0, 2, len(df))
df['revenue'] = df['completion_rate'] * 50 + np.random.normal(0, 10, len(df))
Y_multi = df[['completion_rate', 'active_days', 'revenue']]
# 训练多结果模型
multi_cf = MultiOutcomeCausalForest()
multi_cf.fit(X, T, Y_multi)
# 联合分析
cate_multi = multi_cf.predict_all(X)
# 帕累托前沿分析:寻找多指标最优子群
def pareto_optimal_subgroups(cate_df, min_sample_pct=0.05):
"""
识别帕累托最优子群(无法在不损害其他指标的情况下改进任一指标)
"""
from sklearn.cluster import KMeans
# 标准化CATE
cate_scaled = (cate_df - cate_df.mean()) / cate_df.std()
# 聚类发现子群
kmeans = KMeans(n_clusters=5, random_state=42)
clusters = kmeans.fit_predict(cate_scaled)
# 评估每个cluster的多维效应
results = []
for cluster in np.unique(clusters):
cluster_mask = clusters == cluster
if cluster_mask.sum() < min_sample_pct * len(cate_df):
continue
cluster_effects = cate_df.loc[cluster_mask].mean()
results.append({
'cluster_id': cluster,
'size': cluster_mask.sum(),
'effects': cluster_effects.to_dict(),
'pareto_score': np.sum(cluster_effects > 0) # 正向指标数
})
return pd.DataFrame(results)
pareto_results = pareto_optimal_subgroups(cate_multi)
print("\n帕累托最优子群:")
print(pareto_results.sort_values('pareto_score', ascending=False))
VIII.3 去偏与公平性约束下的HTE
当子群定义涉及敏感属性(性别、地域)时,需引入公平性约束:
def fair_cate_estimation(X, T, y, sensitive_feature,
fairness_constraint='equal_opportunity'):
"""
公平性感知的CATE估计
约束类型:
- equal_opportunity: 各子群真阳性率相等
- demographic_parity: 各子群接收处理比例相等
"""
from fairlearn.reductions import EqualizedOdds, DemographicParity
from fairlearn.postprocessing import ThresholdOptimizer
# 先训练标准CATE模型
base_model = RLearner()
base_model.fit(X, T, y)
cate_pred = base_model.predict(X)
# 构建辅助分类器(是否高CATE)
high_cate_threshold = np.percentile(cate_pred, 70)
high_cate_label = (cate_pred > high_cate_threshold).astype(int)
# 应用公平性约束
if fairness_constraint == 'equal_opportunity':
constraint = EqualizedOdds()
else:
constraint = DemographicParity()
# 训练公平调整器(简化示意)
adjusted_cate = cate_pred.copy()
# 按敏感属性组调整阈值
for group in X[sensitive_feature].unique():
group_mask = X[sensitive_feature] == group
group_cate = cate_pred[group_mask]
# 强制各组高CATE比例相同
adj_threshold = np.percentile(group_cate, 70)
adjusted_cate[group_mask] = np.where(
group_cate > adj_threshold,
group_cate,
0
)
return adjusted_cate, base_model
# 模拟敏感属性分析
df['region'] = np.random.choice(['tier1', 'tier2', 'tier3'], len(df))
fair_cate, base_model = fair_cate_estimation(
X, T, y, sensitive_feature='region',
fairness_constraint='equal_opportunity'
)
# 公平性评估
def fairness_metrics(cate_pred, sensitive_feature):
"""评估CATE预测的公平性"""
metrics = {}
for group in df[sensitive_feature].unique():
group_mask = df[sensitive_feature] == group
metrics[group] = {
'mean_cate': cate_pred[group_mask].mean(),
'median_cate': np.median(cate_pred[group_mask]),
'prop_high_cate': (cate_pred[group_mask] > np.percentile(cate_pred, 70)).mean()
}
# 计算差异系数
mean_cates = [m['mean_cate'] for m in metrics.values()]
fairness_ratio = max(mean_cates) / min(mean_cates)
return metrics, fairness_ratio
fair_metrics, ratio = fairness_metrics(fair_cate, 'region')
print("\n公平性调整后各区域指标:")
for region, metric in fair_metrics.items():
print(f"{region}: {metric}")
print(f"\n最大/最小CATE比率: {ratio:.2f}")
IX. 最佳实践与常见陷阱
IX.1 实施检查清单
| 阶段 | 检查项 | 风险等级 | 缓解措施 |
|---|---|---|---|
| 实验设计 | 样本量是否支持子群分析? | 高 | 功效分析(子群样本>500) |
| 数据质量 | 协变量是否存在测量误差? | 中 | 敏感性分析 |
| 模型选择 | 是否比较多种HTE方法? | 高 | 交叉验证比较S/T/X/R-learner |
| 过拟合 | 叶节点最小样本是否足够? | 高 | min_samples_leaf >= 200 |
| 解释性 | 子群规则是否业务可理解? | 中 | 限制树深度,特征数<5 |
| 稳健性 | 结论是否经过自助法验证? | 高 | 至少100次bootstrap |
| 部署 | 是否监控模型漂移? | 高 | 每日KS检验 |
IX.2 常见陷阱与反模式
陷阱1:数据挖掘偏误(Data Dredging)
表现:反复尝试不同分组直到找到显著结果
后果:假阳性率>50%
解决方案:预注册分析计划,使用Bonferroni校正
陷阱2:过拟合到噪声
表现:复杂模型在训练集上效应巨大,测试集失效
检测:训练/测试CATE预测相关性<0.5
解决:诚实估计+交叉验证
陷阱3:因果-相关混淆
表现:将预测相关性强的特征误认为因果调节变量
案例:预测"高收入用户响应好",实则是收入作为噪声代理
解决:理论先验+工具变量验证
陷阱4:样本选择偏差
表现:只分析留存用户,忽略处理对流失的影响
后果:幸存者偏差导致效应高估
解决:使用处理前特征,结果变量包含流失指标
IX.3 性能优化技巧
# 大规模数据优化:分块训练
def chunked_causal_forest(X, T, y, chunk_size=10000, n_estimators_per_chunk=50):
"""
分块训练因果森林,处理百万级样本
"""
from joblib import Parallel, delayed
n_chunks = len(X) // chunk_size + 1
def train_chunk(chunk_idx):
start = chunk_idx * chunk_size
end = min((chunk_idx + 1) * chunk_size, len(X))
chunk_cf = CausalForest(
n_estimators=n_estimators_per_chunk,
honest=True,
n_jobs=1 # 避免嵌套并行
)
chunk_cf.fit(X.iloc[start:end], T.iloc[start:end], y.iloc[start:end])
return chunk_cf
# 并行训练各块模型
chunk_models = Parallel(n_jobs=-1)(
delayed(train_chunk)(i) for i in range(n_chunks)
)
# 聚合预测(加权平均)
def predict_aggregated(X_new):
predictions = np.array([m.predict(X_new) for m in chunk_models])
return np.mean(predictions, axis=0)
return predict_aggregated
# 特征选择优化:仅保留有效调节变量
def select_moderators(X, T, y, n_features=5):
"""
使用变量重要性筛选真正的调节变量
"""
# 训练完整模型
full_model = CausalForest(n_estimators=100)
full_model.fit(X, T, y)
# 获取特征重要性
importance = full_model.feature_importances_
# 选择Top-K
top_features = X.columns[np.argsort(importance)[-n_features:]]
return top_features, importance
# 在线预测优化:模型量化
def quantize_model_for_serving(cf_model, X_sample, n_bits=16):
"""
模型量化压缩,适合移动端部署
"""
from sklearn.cluster import KMeans
# 对叶节点预测值进行聚类量化
# 实际使用ONNX或TensorRT进行量化
cate_pred = cf_model.predict(X_sample)
kmeans = KMeans(n_clusters=2**n_bits, random_state=42)
quantized = kmeans.fit_predict(cate_pred.reshape(-1, 1))
# 存储量化映射表
mapping = {i: center[0] for i, center in enumerate(kmeans.cluster_centers_)}
return quantized, mapping
- 点赞
- 收藏
- 关注作者
评论(0)