面板数据因果推断:交互固定效应与共同趋势检验
I. 交互固定效应模型的数学框架
1.1 从TWFE到IFE:问题意识的演进
传统TWFE模型设定为:
其中为处理指示变量,为待估的因果效应。识别假设要求:
I. 共同趋势:
II. 无预期效应:不影响干预前的结果
III. 排他性:处理仅通过影响结果
实例分析:电商平台动态定价效应评估
某电商平台2023年对100个SKU实施动态定价策略,希望评估其对销量的影响。数据为月度面板(2022-2024)。传统TWFE假设所有SKU的时间趋势仅受共同季节因子影响。但实际情况是:新品SKU的趋势由产品生命周期决定(引入因子),而老品由库存周期决定(清仓因子)。这些时变因子与处理变量相关,导致TWFE估计量混杂了生命周期效应,产生负偏误(低估价格效应)。
1.2 IFE模型的参数化设定
IFE模型将潜在结果建模为:
其中为个体载荷向量,为时间因子向量。交互项捕捉了异质性时间趋势。
关键识别假设:在干预后时期,反事实结果的生成机制与干预前相同,即结构稳定。
矩阵形式表达:
1.3 低秩假设的经济学含义
因子维度的设定至关重要。通常在应用研究中足够。经济学解释为:
- 共同冲击:宏观经济周期、行业政策
- 群体特异冲击:区域市场成熟度、供应链能力
- 个体特质:品牌忠诚度、用户画像
定理(Bai, 2009):若且,则PC估计量具有一致性,且。
1.4 处理效应异质性下的估计策略
当存在异质性时,TWFE的加权平均解释失效。Gardner(2022)证明:
其中权重可能为负,导致负权重问题。
IFE解决方案:
- 两阶段估计:先估计并剥离交互项,再用残差做TWFE
- 交互加权:直接建模,用因子修正混淆偏误
# ife_model.py
import numpy as np
import pandas as pd
from scipy.linalg import svd
from typing import Tuple, Optional, Dict
import warnings
class InteractiveFixedEffectsModel:
"""
交互固定效应模型基类
支持:
- 同质处理效应(const_tau)
- 异质处理效应(het_tau)
- 因子数自动选择
- 共同趋势检验
"""
def __init__(self, rank: int = 2,
max_iter: int = 200,
tolerance: float = 1e-6,
iter_id: bool = True):
"""
参数:
- rank: 因子维度r
- max_iter: 迭代次数
- tolerance: 收敛阈值
- iter_id: 是否迭代估计个体固定效应
"""
self.rank = rank
self.max_iter = max_iter
self.tolerance = tolerance
self.iter_id = iter_id
# 估计结果
self.alpha = None # 个体效应 (N,)
self.delta = None # 时间效应 (T,)
self.Lambda = None # 载荷矩阵 (N, rank)
self.F = None # 因子矩阵 (T, rank)
self.tau = None # 处理效应
self.residuals = None # 残差
# 收敛信息
self.converged = False
self.iterations = 0
def fit(self, Y: np.ndarray, D: np.ndarray,
intercept: bool = True) -> 'InteractiveFixedEffectsModel':
"""
拟合IFE模型
参数:
- Y: 结果矩阵 (N, T)
- D: 处理矩阵 (N, T),二进制
- intercept: 是否包含常数项
返回:
self,包含所有估计参数
"""
self.N, self.T = Y.shape
self.D = D
# 初始化
self._initialize_params(Y)
# 迭代估计
for iteration in range(self.max_iter):
self.iterations = iteration + 1
# 阶段1: 估计处理效应 (给定Λ, F)
self._estimate_tau(Y)
# 阶段2: 剥离处理效应
Y0 = Y - self.tau * D
# 阶段3: 估计交互固定效应
self._estimate_interactive_effects(Y0)
# 收敛检查
if self._check_convergence():
self.converged = True
break
if not self.converged:
warnings.warn("未能在最大迭代次数内收敛")
# 计算残差
self.residuals = Y - (self._get_mu() + self.Lambda @ self.F.T + self.tau * D)
return self
def _initialize_params(self, Y: np.ndarray):
"""参数初始化"""
if self.iter_id:
# 初始化个体和时间效应
self.alpha = Y.mean(axis=1) # 个体均值
self.delta = Y.mean(axis=0) - Y.mean() # 时间效应
else:
self.alpha = np.zeros(self.N)
self.delta = np.zeros(self.T)
# 交互效应初始化:对残差做SVD
Y_resid = Y - (self.alpha.reshape(-1, 1) + self.delta.reshape(1, -1))
U, s, Vt = svd(Y_resid, full_matrices=False)
self.Lambda = U[:, :self.rank] * np.sqrt(s[:self.rank])
self.F = Vt[:self.rank, :].T * np.sqrt(s[:self.rank])
def _estimate_tau(self, Y: np.ndarray):
"""估计处理效应"""
# 计算反事实结果
Y0_pred = self._get_mu() + self.Lambda @ self.F.T
# 简单差分(可扩展为加权最小二乘)
treated_mask = self.D == 1
if treated_mask.sum() > 0:
# 同质效应:所有处理单元平均
self.tau = np.mean(Y[treated_mask] - Y0_pred[treated_mask])
else:
self.tau = 0.0
def _estimate_interactive_effects(self, Y0: np.ndarray):
"""估计交互固定效应(交替最小二乘)"""
# 固定F,估计Λ
# Λ = Y0 * F * (F'F)^{-1}
FTF = self.F.T @ self.F + 0.01 * np.eye(self.rank) # 正则化
self.Lambda = Y0 @ self.F @ np.linalg.inv(FTF)
# 固定Λ,估计F
# F = Y0' * Λ * (Λ'Λ)^{-1}
LTL = self.Lambda.T @ self.Lambda + 0.01 * np.eye(self.rank)
self.F = Y0.T @ self.Lambda @ np.linalg.inv(LTL)
# 标准化(消除旋转不变性)
# 要求 Λ'Λ / N = I_r
self._normalize_factors()
def _normalize_factors(self):
"""因子标准化"""
# QR分解保证识别
Q, R = np.linalg.qr(self.Lambda)
self.Lambda = Q * np.sqrt(self.N)
self.F = self.F @ R.T / np.sqrt(self.N)
def _get_mu(self) -> np.ndarray:
"""获取双向固定效应矩阵"""
return self.alpha.reshape(-1, 1) + self.delta.reshape(1, -1)
def _check_convergence(self) -> bool:
"""检查收敛性"""
if self.iterations < 2:
return False
# 计算参数变化范数
param_change = np.linalg.norm(self.Lambda - self.Lambda_prev, 'fro') / np.linalg.norm(self.Lambda_prev, 'fro')
self.Lambda_prev = self.Lambda.copy()
return param_change < self.tolerance
def predict_counterfactual(self) -> np.ndarray:
"""预测反事实结果"""
return self._get_mu() + self.Lambda @ self.F.T
def treatment_effect(self, unit_idx: Optional[int] = None) -> np.ndarray:
"""获取处理效应估计"""
if unit_idx is not None:
# 特定单元的处理效应
return self.tau * self.D[unit_idx, :]
return self.tau * self.D
def residuals_analysis(self) -> Dict[str, float]:
"""残差诊断"""
resid = self.residuals
return {
'mse': np.mean(resid**2),
'max_abs_resid': np.max(np.abs(resid)),
'autocorr_1': np.corrcoef(resid[:, 1:].flatten(),
resid[:, :-1].flatten())[0, 1],
'normality_pval': self._jarque_bera_test(resid.flatten())
}
def _jarque_bera_test(self, x: np.ndarray) -> float:
"""Jarque-Bera正态性检验"""
from scipy.stats import skew, kurtosis, chi2
n = len(x)
S = skew(x)
K = kurtosis(x)
JB = n/6 * (S**2 + (K-3)**2/4)
return 1 - chi2.cdf(JB, df=2)
# 电商案例数据生成
def generate_ecommerce_panel(N: int = 100, T: int = 36,
treat_share: float = 0.3,
rank: int = 3) -> Tuple[np.ndarray, np.ndarray, Dict]:
"""
生成电商面板数据,模拟动态定价效应
数据生成过程:
Y_it = α_i + δ_t + λ_i'F_t + τ * D_it + ε_it
返回:
- Y: 结果矩阵 (N, T)
- D: 处理矩阵 (N, T)
- true_params: 真实参数
"""
np.random.seed(42)
# 个体和时间效应
alpha = np.random.randn(N) * 2
delta = np.sin(np.arange(T) * np.pi / 6) * 5 # 季节效应
# 交互固定效应
Lambda = np.random.randn(N, rank)
# 因子:生命周期因子、库存因子、竞品因子
F = np.zeros((T, rank))
F[:, 0] = np.sin(np.arange(T) * np.pi / 18) # 产品生命周期
F[:, 1] = np.cos(np.arange(T) * np.pi / 12) # 库存周期
F[:, 2] = np.random.randn(T) * 0.5 # 随机冲击
# 处理分配:前30%的SKU在第13个月开始动态定价
D = np.zeros((N, T))
treat_idx = np.random.choice(N, int(N * treat_share), replace=False)
treat_start = 12
D[treat_idx, treat_start:] = 1
# 真实处理效应τ=1.5
tau = 1.5
# 合成Y
Y = (alpha.reshape(-1, 1) + delta.reshape(1, -1) +
Lambda @ F.T + tau * D + np.random.randn(N, T) * 0.5)
true_params = {
'alpha': alpha,
'delta': delta,
'Lambda': Lambda,
'F': F,
'tau': tau,
'treat_idx': treat_idx,
'treat_start': treat_start
}
return Y, D, true_params
if __name__ == "__main__":
Y, D, true_params = generate_ecommerce_panel()
# 拟合IFE模型
ife_model = InteractiveFixedEffectsModel(rank=3)
ife_model.fit(Y, D)
print(f"估计的处理效应: {ife_model.tau:.3f}")
print(f"真实处理效应: {true_params['tau']:.3f}")
print(f"相对误差: {abs(ife_model.tau - true_params['tau']) / true_params['tau']:.2%}")
# 残差诊断
resid_stats = ife_model.residuals_analysis()
print(f"\n残差诊断:")
for k, v in resid_stats.items():
print(f" {k}: {v:.4f}")
# 反事实预测
Y_cf = ife_model.predict_counterfactual()
# 可视化
import matplotlib.pyplot as plt
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# 处理组观测 vs 反事实
ax1 = axes[0]
treat_idx = true_params['treat_idx'][0] # 选一个处理单元
time_idx = np.arange(Y.shape[1])
ax1.plot(time_idx, Y[treat_idx, :], 'k-', label='Observed', linewidth=2)
ax1.plot(time_idx, Y_cf[treat_idx, :], 'r--', label='Counterfactual', linewidth=2)
ax1.axvline(x=true_params['treat_start'], color='g', linestyle=':')
ax1.set_title('Observed vs Counterfactual (Treated Unit)')
ax1.legend()
# 因子时序
ax2 = axes[1]
for i in range(3):
ax2.plot(time_idx, ife_model.F[:, i], label=f'Factor {i}')
ax2.set_title('Estimated Time Factors')
ax2.legend()
plt.tight_layout()
plt.savefig('ife_model_results.png')
II. 共同趋势检验的系统性框架
2.1 经典检验的局限与改进方向
共同趋势假设不可直接检验,但可通过安慰剂检验、前趋势检验和敏感性分析提供支持性证据。
| 检验类型 | 原假设 | 检验统计量 | 优点 | 局限 |
|---|---|---|---|---|
| ** 事件研究图 ** | 干预前效应为0 | 直观可视化 | 多重检验问题 | |
| ** 置换检验 ** | 处理单元随机分配 | 不依赖渐近理论 | 计算成本高 | |
| ** 协变量平衡 ** | $E[X_{it} | D_i=1]=E[X_{it} | D_i=0]$ | t-test / KS检验 |
| ** 因子结构检验 ** | 与正交 | Bai-Ng检验 | 直接检验内生性 | 需要强分布假设 |
实例分析:预趋势检验的陷阱
在电商案例中,直接绘制事件研究图显示干预前3个月存在显著负效应()。这可能被误判为"违反共同趋势"。但深入分析发现:平台在干预前3个月已开始灰度测试,导致处理组提前暴露。IFE模型通过将灰度期纳入因子结构,正确剥离了预趋势。
2.2 基于因子模型的共同趋势检验
IFE框架下,共同趋势等价于:
检验步骤:
I. 估计干预前因子:使用估计
II. 检验正交性:回归对,检验系数显著性
III. 安慰剂干预:将干预时间点提前,检验估计效应分布
# common_trend_tests.py
from scipy.stats import ttest_ind, ks_2samp
import warnings
from tqdm import tqdm
class CommonTrendTests:
"""
共同趋势检验工具箱
包含:
1. 事件研究图与置信区间
2. 置换检验(permutation test)
3. 因子正交性检验
4. 安慰剂时间检验
"""
def __init__(self, Y: np.ndarray, D: np.ndarray,
treat_start: int, unit_var: str = "unit",
time_var: str = "time"):
"""
参数:
- Y: 结果矩阵 (N, T)
- D: 处理矩阵 (N, T)
- treat_start: 真实干预开始时间索引
- unit_var: 单位变量名
- time_var: 时间变量名
"""
self.Y = Y
self.D = D
self.treat_start = treat_start
self.N, self.T = Y.shape
self.treated_units = np.where(D[:, treat_start] == 1)[0]
self.control_units = np.where(D[:, treat_start] == 0)[0]
self.unit_var = unit_var
self.time_var = time_var
def event_study_plot(self, max_lead: int = 5, max_lag: int = 10,
figsize: tuple = (12, 6)) -> Dict[str, np.ndarray]:
"""
生成事件研究图数据
估计每个相对时间点的处理效应:
τ_k = E[Y_{i,treat_start+k} | D_i=1] - E[Y_{i,treat_start+k} | D_i=0]
其中k<0为干预前,k>0为干预后
"""
# 动态效应估计
tau_k = np.zeros(max_lead + max_lag - 1)
ci_lower = np.zeros_like(tau_k)
ci_upper = np.zeros_like(tau_k)
relative_time = np.arange(-max_lag, max_lead)
for k in relative_time:
if k == 0:
continue # 干预当期通常不估计
t_idx = self.treat_start + k
# 检查时间索引边界
if t_idx < 0 or t_idx >= self.T:
tau_k[k + max_lag - 1] = np.nan
continue
# 处理组和对照组观测值
Y_treat = self.Y[self.treated_units, t_idx]
Y_control = self.Y[self.control_units, t_idx]
# 简单差分(可扩展为加权)
tau_k[k + max_lag - 1] = np.mean(Y_treat) - np.mean(Y_control)
# Bootstrap置信区间
n_boot = 200
tau_boot = np.zeros(n_boot)
for b in range(n_boot):
# 重采样
treat_boot = np.random.choice(Y_treat, size=len(Y_treat), replace=True)
ctrl_boot = np.random.choice(Y_control, size=len(Y_control), replace=True)
tau_boot[b] = np.mean(treat_boot) - np.mean(ctrl_boot)
ci_lower[k + max_lag - 1] = np.percentile(tau_boot, 2.5)
ci_upper[k + max_lag - 1] = np.percentile(tau_boot, 97.5)
results = {
'relative_time': relative_time,
'tau_k': tau_k,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'pre_trend_pval': self._test_pre_trend(tau_k[:max_lag-1])
}
return results
def _test_pre_trend(self, tau_pre: np.ndarray) -> float:
"""
检验干预前趋势联合显著性
原假设:τ_k=0 对于所有k<0
使用F检验(简化版:检验均值是否为0)
"""
if np.all(np.isnan(tau_pre)):
return np.nan
# 忽略nan值
valid_tau = tau_pre[~np.isnan(tau_pre)]
if len(valid_tau) == 0:
return np.nan
# t检验
t_stat, p_val = ttest_ind(valid_tau, np.zeros_like(valid_tau))
return p_val
def permutation_test(self, n_permutations: int = 500,
test_statistic: str = 'post_mean') -> Dict[str, float]:
"""
置换检验:随机分配处理组,构建零分布
test_statistic: 'post_mean'|'max_abs'|'rmspe'
"""
# 计算真实统计量
real_stat = self._compute_test_statistic(test_statistic)
# 随机置换处理分配
placebo_stats = np.zeros(n_permutations)
for p in tqdm(range(n_permutations), desc="Permutations"):
# 随机选择处理单元
shuffled_treat = np.random.choice(self.N,
size=len(self.treated_units),
replace=False)
# 创建安慰剂处理矩阵
D_placebo = np.zeros_like(self.D)
treat_start_temp = self.treat_start
D_placebo[shuffled_treat, treat_start_temp:] = 1
# 简单估计(可用IFE模型)
Y_placebo_post = self.Y[shuffled_treat, treat_start_temp:]
Y_control_post = self.Y[np.setdiff1d(np.arange(self.N), shuffled_treat), treat_start_temp:]
if test_statistic == 'post_mean':
placebo_stats[p] = np.mean(Y_placebo_post) - np.mean(Y_control_post)
elif test_statistic == 'rmspe':
# 计算全期RMSPE
Y_placebo = self.Y[shuffled_treat, :]
Y_control = self.Y[np.setdiff1d(np.arange(self.N), shuffled_treat), :]
placebo_stats[p] = np.sqrt(np.mean((Y_placebo - Y_control)**2))
# p值计算
p_val = np.mean(np.abs(placebo_stats) >= np.abs(real_stat))
return {
'p_value': p_val,
'real_statistic': real_stat,
'placebo_mean': np.mean(placebo_stats),
'placebo_std': np.std(placebo_stats)
}
def _compute_test_statistic(self, method: str) -> float:
"""计算检验统计量"""
Y_post_treat = self.Y[self.treated_units, self.treat_start:]
Y_post_ctrl = self.Y[self.control_units, self.treat_start:]
if method == 'post_mean':
return np.mean(Y_post_treat) - np.mean(Y_post_ctrl)
elif method == 'rmspe':
# 全时期均方根预测误差
Y_treat_all = self.Y[self.treated_units, :]
Y_ctrl_all = self.Y[self.control_units, :]
return np.sqrt(np.mean((Y_treat_all - Y_ctrl_all)**2))
else:
raise ValueError("Unsupported test statistic")
def factor_orthogonality_test(self, ife_model) -> Dict[str, float]:
"""
基于因子估计的正交性检验
原假设:λ_1 ⊥ F_t
"""
if not hasattr(ife_model, 'F'):
raise ValueError("需要先拟合IFE模型")
# 干预前因子
F_pre = ife_model.F[:self.treat_start, :]
# 处理指示变量(滞后一期避免机械相关)
D_lag = self.D[:, self.treat_start-1]
# 对每个因子回归
orthogonality_pvals = []
for factor_idx in range(F_pre.shape[1]):
# 因子得分对处理状态回归
X = np.column_stack([np.ones(len(D_lag)), D_lag])
y = F_pre[:, factor_idx]
# OLS
beta = np.linalg.lstsq(X, y, rcond=None)[0]
resid = y - X @ beta
# t检验
se = np.sqrt(np.sum(resid**2) / (len(y) - 2)) / np.sqrt(np.sum((D_lag - D_lag.mean())**2))
t_stat = beta[1] / se
p_val = 2 * (1 - scipy.stats.t.cdf(abs(t_stat), df=len(y)-2))
orthogonality_pvals.append(p_val)
return {
'min_pval': np.min(orthogonality_pvals),
'max_pval': np.max(orthogonality_pvals),
'joint_pval': np.mean(orthogonality_pvals), # 简化
'factor_pvals': orthogonality_pvals
}
def placebo_time_test(self, placebo_periods: List[int]) -> pd.DataFrame:
"""
安慰剂时间检验:将干预时间点提前
返回每个安慰剂时间的效应估计
"""
results = []
for placebo_time in placebo_periods:
# 创建安慰剂处理矩阵
D_placebo = np.zeros_like(self.D)
D_placebo[self.treated_units, placebo_time:] = 1
# 拟合安慰剂模型(使用IFE)
placebo_ife = InteractiveFixedEffectsModel(rank=2)
placebo_ife.fit(self.Y, D_placebo)
# 计算干预后效应
Y_post = self.Y[:, placebo_time:]
Y_cf_post = placebo_ife.predict_counterfactual()[:, placebo_time:]
placebo_effect = np.mean(Y_post[self.treated_units, :] -
Y_cf_post[self.treated_units, :])
results.append({
'placebo_time': placebo_time,
'effect': placebo_effect,
'abs_effect': abs(placebo_effect)
})
return pd.DataFrame(results)
# 电商案例检验
if __name__ == "__main__":
# 加载数据
Y, D, true_params = generate_ecommerce_panel()
# 拟合IFE模型(用于因子检验)
ife_model = InteractiveFixedEffectsModel(rank=3)
ife_model.fit(Y, D)
# 运行检验
ct_tests = CommonTrendTests(Y, D, true_params['treat_start'])
# 1. 事件研究图
event_results = ct_tests.event_study_plot()
print(f"干预前趋势p值: {event_results['pre_trend_pval']:.3f}")
# 2. 置换检验
perm_result = ct_tests.permutation_test(n_permutations=100)
print(f"置换检验p值: {perm_result['p_value']:.3f}")
# 3. 因子正交性检验
ortho_result = ct_tests.factor_orthogonality_test(ife_model)
print(f"因子正交性p值: {ortho_result['min_pval']:.3f}")
# 4. 安慰剂时间检验
placebo_times = [8, 9, 10, 11]
placebo_df = ct_tests.placebo_time_test(placebo_times)
print("\n安慰剂时间检验结果:")
print(placebo_df)
2.3 可视化与综合推断
# 可视化模块
def plot_comprehensive_diagnostics(ct_tests: CommonTrendTests,
ife_model: InteractiveFixedEffectsModel,
save_path: str = None):
"""
生成全面的共同趋势诊断图
包含:
1. 事件研究图
2. 因子正交性可视化
3. 置换分布
4. 安慰剂时间效应
"""
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
# 1. 事件研究图
ax1 = axes[0, 0]
event_results = ct_tests.event_study_plot()
relative_time = event_results['relative_time']
tau_k = event_results['tau_k']
ci_lower = event_results['ci_lower']
ci_upper = event_results['ci_upper']
# 绘制干预前
pre_mask = relative_time < 0
ax1.plot(relative_time[pre_mask], tau_k[pre_mask], 'ko-',
label='Pre-treatment', markersize=8)
ax1.fill_between(relative_time[pre_mask],
ci_lower[pre_mask], ci_upper[pre_mask],
alpha=0.3, color='gray')
# 绘制干预后
post_mask = relative_time > 0
ax1.plot(relative_time[post_mask], tau_k[post_mask], 'ro-',
label='Post-treatment', markersize=8)
ax1.fill_between(relative_time[post_mask],
ci_lower[post_mask], ci_upper[post_mask],
alpha=0.3, color='red')
ax1.axhline(y=0, color='k', linestyle='--')
ax1.axvline(x=0, color='g', linestyle=':')
ax1.set_title('Event Study Plot')
ax1.set_xlabel('Relative Time to Treatment')
ax1.set_ylabel('Effect')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 因子正交性
ax2 = axes[0, 1]
ortho_result = ct_tests.factor_orthogonality_test(ife_model)
factor_pvals = ortho_result['factor_pvals']
ax2.bar(range(len(factor_pvals)), factor_pvals,
color=['red' if p < 0.05 else 'green' for p in factor_pvals])
ax2.axhline(y=0.05, color='k', linestyle='--')
ax2.set_title('Factor Orthogonality Tests')
ax2.set_xlabel('Factor ID')
ax2.set_ylabel('p-value')
# 3. 置换分布
ax3 = axes[1, 0]
perm_result = ct_tests.permutation_test()
# 绘制安慰剂效应分布
# 简化:从结果中获取数据
placebo_mean = perm_result['placebo_mean']
placebo_std = perm_result['placebo_std']
real_stat = perm_result['real_statistic']
# 生成正态近似分布
x = np.linspace(placebo_mean - 3*placebo_std,
placebo_mean + 3*placebo_std, 100)
y = scipy.stats.norm.pdf(x, placebo_mean, placebo_std)
ax3.plot(x, y, 'b-', label='Placebo distribution')
ax3.axvline(x=real_stat, color='r', linestyle='-',
label=f'Real effect: {real_stat:.2f}')
ax3.set_title('Permutation Test Distribution')
ax3.set_xlabel('Test Statistic')
ax3.set_ylabel('Density')
ax3.legend()
# 4. 因子时序
ax4 = axes[1, 1]
F = ife_model.F
for i in range(min(3, F.shape[1])):
ax4.plot(F[:, i], label=f'Factor {i}')
ax4.axvline(x=ct_tests.treat_start, color='g', linestyle=':')
ax4.set_title('Time Factors Evolution')
ax4.set_xlabel('Time')
ax4.legend()
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
return fig
# 综合推断报告
def generate_diagnosis_report(ct_tests: CommonTrendTests,
ife_model: InteractiveFixedEffectsModel) -> str:
"""
生成共同趋势诊断报告
"""
report = f"""
共同趋势假设诊断报告
生成时间: {datetime.now().isoformat()}
实验设计:
- 处理单元数: {len(ct_tests.treated_units)}
- 对照单元数: {len(ct_tests.control_units)}
- 干预开始时间: {ct_tests.treat_start}
- 总时期数: {ct_tests.T}
"""
# 事件研究检验
event_results = ct_tests.event_study_plot()
report += f"""
事件研究检验:
- 干预前趋势联合p值: {event_results['pre_trend_pval']:.3f}
- 最大预干预效应: {np.nanmax(np.abs(event_results['tau_k'][:ct_tests.treat_start])):.3f}
"""
# 置换检验
perm_result = ct_tests.permutation_test()
report += f"""
置换检验:
- p值: {perm_result['p_value']:.3f}
- 真实统计量: {perm_result['real_statistic']:.2f}
- 安慰剂均值: {perm_result['placebo_mean']:.2f}
"""
# 因子正交性
ortho_result = ct_tests.factor_orthogonality_test(ife_model)
report += f"""
因子正交性检验:
- 最小p值: {ortho_result['min_pval']:.3f}
- 最大p值: {ortho_result['max_pval']:.3f}
"""
# 结论
all_passed = (event_results['pre_trend_pval'] > 0.05 and
perm_result['p_value'] < 0.05 and
ortho_result['min_pval'] > 0.05)
report += f"""
综合判断:
- 共同趋势假设 {'通过' if all_passed else '未通过'}所有检验
- 建议使用IFE模型: {'是' if all_passed else '需要调整模型设定'}
"""
return report
# 运行完整诊断
if __name__ == "__main__":
# 生成诊断图
fig = plot_comprehensive_diagnostics(ct_tests, ife_model,
save_path='common_trend_diagnostics.png')
# 生成报告
report = generate_diagnosis_report(ct_tests, ife_model)
print(report)
Lexical error on line 5. Unrecognized text.
...[拒绝共同趋势] C -->|p≥0.05| E[进入置换检验]
----------------------^
III. 生产级部署:实时面板因果推断系统
3.1 系统架构设计
# deployment/panel_causal_api.py
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import List, Dict, Optional, Any
import redis
import hashlib
import pickle
import numpy as np
from datetime import datetime
import uvicorn
from concurrent.futures import ProcessPoolExecutor
import json
from ife_model import InteractiveFixedEffectsModel
from common_trend_tests import CommonTrendTests
# 配置
class Config:
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
MAX_WORKERS = 4
MODEL_TTL = 86400 # 24小时
config = Config()
# 数据模型
class PanelDataInput(BaseModel):
"""
面板数据输入模型
支持两种格式:
1. 长格式:list of dicts with unit, time, outcome, treatment
2. 宽格式:dict with 'Y' and 'D' matrices
"""
data_format: str = Field("long", regex="^(long|wide)$")
data: List[Dict[str, float]] = Field(..., description="面板数据")
treat_start: int = Field(..., description="干预开始时间索引")
rank: int = Field(2, ge=1, le=10, description="因子维度")
auto_rank_selection: bool = Field(False, description="是否自动选择秩")
@validator('data')
def validate_data(cls, v):
if not v:
raise ValueError("数据不能为空")
return v
class FitResponse(BaseModel):
task_id: str
model_id: str
status: str
message: str
class DiagnosisResponse(BaseModel):
model_id: str
pre_trend_pval: float
permutation_pval: float
orthogonality_min_pval: float
passed_all_tests: bool
details: Dict[str, Any]
# Redis客户端
class RedisManager:
def __init__(self, host: str, port: int, db: int):
self.client = redis.Redis(host=host, port=port, db=db)
def save_model(self, model: Any, model_id: str):
"""序列化存储模型"""
pickled = pickle.dumps(model)
self.client.setex(f"ife:model:{model_id}", config.MODEL_TTL, pickled)
def load_model(self, model_id: str) -> Any:
"""加载模型"""
pickled = self.client.get(f"ife:model:{model_id}")
if not pickled:
raise ValueError("Model not found")
return pickle.loads(pickled)
def save_task(self, task_id: str, status: Dict):
"""存储任务状态"""
self.client.setex(f"ife:task:{task_id}", 3600, json.dumps(status))
def get_task(self, task_id: str) -> Optional[Dict]:
"""获取任务状态"""
data = self.client.get(f"ife:task:{task_id}")
if data:
return json.loads(data)
return None
redis_manager = RedisManager(config.REDIS_HOST, config.REDIS_PORT, config.REDIS_DB)
# 线程池
executor = ProcessPoolExecutor(max_workers=config.MAX_WORKERS)
app = FastAPI(title="Panel Causal Inference API", version="2.0.0")
# 数据转换工具
def long_to_wide(data: List[Dict], outcome_col: str = "outcome",
treatment_col: str = "treatment") -> Tuple[np.ndarray, np.ndarray]:
"""将长格式数据转换为宽格式矩阵"""
df = pd.DataFrame(data)
df = df.sort_values(['unit', 'time'])
# 结果矩阵
Y = df.pivot(index='unit', columns='time', values=outcome_col).values
# 处理矩阵
D = df.pivot(index='unit', columns='time', values=treatment_col).values
return Y, D
def generate_id(data_str: str) -> str:
"""生成模型ID"""
return f"ife_{hashlib.sha256(data_str.encode()).hexdigest()[:16]}"
@app.post("/model/fit", response_model=FitResponse)
async def fit_model(request: PanelDataInput, background_tasks: BackgroundTasks):
"""
异步拟合IFE模型
工作流程:
1. 验证输入数据
2. 转换数据格式
3. 生成模型ID
4. 提交后台任务
5. 返回任务ID
"""
try:
# 转换数据
if request.data_format == "long":
Y, D = long_to_wide(request.data)
else:
# 宽格式需重构
Y = np.array([list(d.values()) for d in request.data])
D = np.ones_like(Y) * 0 # 简化
# 生成模型ID
model_id = generate_id(str(Y.shape) + str(request.treat_start))
# 检查缓存
if redis_manager.client.exists(f"ife:model:{model_id}"):
return FitResponse(
task_id="",
model_id=model_id,
status="cached",
message="Model already exists"
)
# 提交后台任务
task_id = f"task_{model_id}"
status = {
"task_id": task_id,
"model_id": model_id,
"status": "submitted",
"submitted_at": datetime.now().isoformat()
}
redis_manager.save_task(task_id, status)
# 后台执行
background_tasks.add_task(_fit_model_task, Y, D, request, model_id, task_id)
return FitResponse(
task_id=task_id,
model_id=model_id,
status="submitted",
message="Model fitting started"
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
def _fit_model_task(Y: np.ndarray, D: np.ndarray,
request: PanelDataInput, model_id: str, task_id: str):
"""后台拟合任务"""
try:
# 更新状态:运行中
redis_manager.save_task(task_id, {
"task_id": task_id,
"model_id": model_id,
"status": "running",
"started_at": datetime.now().isoformat()
})
# 秩选择
if request.auto_rank_selection:
rank = _select_optimal_rank(Y, D, max_rank=5)
else:
rank = request.rank
# 拟合模型
model = InteractiveFixedEffectsModel(rank=rank)
model.fit(Y, D)
# 保存模型
redis_manager.save_model(model, model_id)
# 更新状态:完成
redis_manager.save_task(task_id, {
"task_id": task_id,
"model_id": model_id,
"status": "completed",
"completed_at": datetime.now().isoformat(),
"rank_used": rank
})
except Exception as e:
redis_manager.save_task(task_id, {
"task_id": task_id,
"model_id": model_id,
"status": "failed",
"error": str(e)
})
def _select_optimal_rank(Y: np.ndarray, D: np.ndarray,
max_rank: int = 5) -> int:
"""自动选择最优因子数(BIC准则)"""
bic_scores = []
for rank in range(1, max_rank + 1):
model = InteractiveFixedEffectsModel(rank=rank)
model.fit(Y, D)
# 计算BIC
n_params = Y.size * rank + rank**2
mse = np.mean(model.residuals**2)
bic = np.log(mse) + n_params / Y.size * np.log(Y.size)
bic_scores.append(bic)
return np.argmin(bic_scores) + 1
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
"""查询任务状态"""
status = redis_manager.get_task(task_id)
if not status:
raise HTTPException(status_code=404, detail="Task not found")
return status
@app.post("/model/diagnose/{model_id}", response_model=DiagnosisResponse)
async def diagnose_model(model_id: str,
test_config: Optional[Dict] = None):
"""
对拟合好的模型进行共同趋势诊断
测试配置包括:
- n_permutations: 置换检验次数
- max_lead/lag: 事件研究图范围
"""
try:
# 加载模型
model = redis_manager.load_model(model_id)
# 构建检验对象
treat_start = model.treat_start if hasattr(model, 'treat_start') else 0
ct_tests = CommonTrendTests(
Y=model.Y, D=model.D,
treat_start=treat_start
)
# 运行检验
# 1. 事件研究
event_results = ct_tests.event_study_plot()
# 2. 置换检验
n_perm = test_config.get("n_permutations", 100) if test_config else 100
perm_result = ct_tests.permutation_test(n_permutations=n_perm)
# 3. 因子正交性
ortho_result = ct_tests.factor_orthogonality_test(model)
# 综合判断
passed_all = (
event_results['pre_trend_pval'] > 0.05 and
perm_result['p_value'] < 0.05 and
ortho_result['min_pval'] > 0.05
)
return DiagnosisResponse(
model_id=model_id,
pre_trend_pval=event_results['pre_trend_pval'],
permutation_pval=perm_result['p_value'],
orthogonality_min_pval=ortho_result['min_pval'],
passed_all_tests=passed_all,
details={
'event_study': event_results,
'permutation': perm_result,
'orthogonality': ortho_result
}
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/model/predict/{model_id}")
async def predict_counterfactual(model_id: str,
post_periods: List[int]):
"""预测反事实结果"""
try:
model = redis_manager.load_model(model_id)
if not hasattr(model, 'predict_counterfactual'):
raise HTTPException(status_code=400, detail="Model does not support prediction")
Y_cf = model.predict_counterfactual()
# 提取指定时期
Y_cf_requested = Y_cf[:, post_periods]
return {
"model_id": model_id,
"counterfactual": Y_cf_requested.tolist(),
"periods": post_periods
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/model/update/{model_id}")
async def incremental_update(model_id: str,
new_data: List[Dict[str, float]]):
"""
增量更新模型(流式数据)
适用于在线学习场景:
1. 加载现有模型
2. 添加新数据
3. 部分重新估计
"""
try:
model = redis_manager.load_model(model_id)
# 转换为矩阵
if isinstance(model, InteractiveFixedEffectsModel):
# 扩展数据
# 简化:重新拟合(生产环境应实现增量算法)
pass
return {"status": "updated", "model_id": model_id}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/health")
async def health_check():
"""健康检查"""
redis_status = redis_manager.client.ping()
return {
"status": "healthy" if redis_status else "degraded",
"redis_connected": redis_status,
"active_models": len(redis_manager.client.keys("ife:model:*"))
}
# Docker配置
"""
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost:8000/health || exit 1
CMD ["uvicorn", "api_server:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "4",
"--limit-max-requests", "1000"]
"""
# 部署脚本
"""
# 构建
docker build -t panel-causal-api:v2.0 .
# 运行(带Redis)
docker run -d -p 8000:8000 \
--name causal-api \
-e REDIS_HOST=redis.internal \
-e REDIS_PORT=6379 \
panel-causal-api:v2.0
# Kubernetes部署(生产)
kubectl apply -f k8s-deployment.yaml
"""
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
3.2 性能优化与扩展性
| 优化策略 | 实现方式 | 性能提升 | 成本 | 适用场景 |
|---|---|---|---|---|
| ** 模型缓存 ** | Redis存储序列化模型 | 100-1000x | 内存增加 | 高频预测 |
| ** 异步训练** | Celery/ProcessPool | 并发度↑ | 计算资源 | 批量任务 |
| ** 增量更新** | 在线PCA算法 | 10-50x | 复杂度↑ | 流式数据 |
| 分布式因子估计 | Spark MLlib | N节点加速 | 集群成本 | N>10k |
| GPU加速 | CuPy/TensorFlow | 5-20x | 硬件成本 | 深度扩展 |
在线PCA算法(Candidacy算法):
class OnlineFactorModel:
"""在线因子更新"""
def __init__(self, rank: int):
self.rank = rank
self.Lambda = None
self.F = None
self.N_obs = 0
def update(self, y_t: np.ndarray):
"""
观测到新向量y_t时更新因子
y_t: (N,) 新时刻所有单元观测
"""
if self.F is None:
# 初始化
self.N_obs = len(y_t)
self.F = np.zeros((0, self.rank))
self.Lambda = np.random.randn(self.N_obs, self.rank)
# 投影到当前因子空间
f_t_new = np.linalg.pinv(self.Lambda) @ y_t
# 更新因子矩阵
self.F = np.vstack([self.F, f_t_new.reshape(1, -1)])
# 更新载荷(小步长梯度下降)
learning_rate = 1.0 / (self.F.shape[0] + 1)
self.Lambda += learning_rate * np.outer(y_t - self.Lambda @ f_t_new, f_t_new)
IV. 高级扩展:非线性因子与深度学习
4.1 非线性因子模型
当与关系非线性时,使用神经网络扩展:
其中为多层感知机。
# deep_ife.py
import torch
import torch.nn as nn
from torch.optim import Adam
class DeepFactorModel(nn.Module):
"""
深度因子模型
网络结构:
λ_i → FC(128) → ReLU → Dropout → FC(rank)
F_t → FC(128) → ReLU → Dropout → FC(rank)
交互: [λ_i, F_t] → FC(64) → ReLU → Output
"""
def __init__(self, N: int, T: int, rank: int, hidden_dim: int = 128):
super().__init__()
self.N = N
self.T = T
self.rank = rank
# 载荷网络
self.lambda_net = nn.Sequential(
nn.Embedding(N, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, rank)
)
# 因子网络
self.F_net = nn.Sequential(
nn.Embedding(T, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, rank)
)
# 交互网络
self.interaction_net = nn.Sequential(
nn.Linear(2 * rank, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
def forward(self, unit_ids: torch.Tensor, time_ids: torch.Tensor):
"""前向传播"""
# 获取载荷和因子
lambda_i = self.lambda_net(unit_ids) # (batch, rank)
F_t = self.F_net(time_ids) # (batch, rank)
# 交互特征
interaction = torch.cat([lambda_i, F_t], dim=1)
# 预测结果
Y_pred = self.interaction_net(interaction)
return Y_pred.squeeze()
def fit(self, Y: np.ndarray, D: np.ndarray,
lr: float = 0.001, epochs: int = 100,
lambda_reg: float = 0.01) -> 'DeepFactorModel':
"""训练深度因子模型"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.to(device)
# 构建训练数据
N, T = Y.shape
unit_ids, time_ids = np.meshgrid(np.arange(N), np.arange(T), indexing='ij')
# 处理单元
treated_mask = D == 1
# 转换张量
Y_tensor = torch.tensor(Y, dtype=torch.float32, device=device)
D_tensor = torch.tensor(D, dtype=torch.float32, device=device)
unit_tensor = torch.tensor(unit_ids, dtype=torch.long, device=device)
time_tensor = torch.tensor(time_ids, dtype=torch.long, device=device)
# 优化器
optimizer = Adam(self.parameters(), lr=lr)
# 训练循环
for epoch in range(epochs):
self.train()
optimizer.zero_grad()
# 前向传播
Y_pred = self.forward(unit_tensor.flatten(), time_tensor.flatten())
Y_pred = Y_pred.reshape(N, T)
# 损失:对照组拟合 + 处理组惩罚
control_loss = torch.mean((Y_pred[~treated_mask] - Y_tensor[~treated_mask])**2)
# 处理组损失(干预后)
post_treat = treated_mask & (time_ids >= self.treat_start)
treat_error = Y_pred[post_treat] - Y_tensor[post_treat]
treat_loss = torch.mean(treat_error**2)
# 正则化
lambda_norm = torch.mean(self.lambda_net[0].weight**2)
F_norm = torch.mean(self.F_net[0].weight**2)
reg_loss = lambda_reg * (lambda_norm + F_norm)
total_loss = control_loss + 0.5 * treat_loss + reg_loss
total_loss.backward()
optimizer.step()
if epoch % 10 == 0:
print(f"Epoch {epoch}: Loss={total_loss.item():.4f}")
return self
# 使用示例
def demo_deep_ife():
"""深度因子模型示例"""
Y, D, true_params = generate_ecommerce_panel(N=50, T=24)
deep_model = DeepFactorModel(N=50, T=24, rank=2)
deep_model.treat_start = true_params['treat_start']
deep_model.fit(Y, D)
# 预测
with torch.no_grad():
Y_pred = np.zeros_like(Y)
for i in range(Y.shape[0]):
for t in range(Y.shape[1]):
unit_id = torch.tensor([i], dtype=torch.long)
time_id = torch.tensor([t], dtype=torch.long)
Y_pred[i, t] = deep_model.forward(unit_id, time_id).item()
# 反事实
Y_cf = Y_pred.copy()
Y_cf[D == 1] = Y[D == 1] - 1.5 # 假设已知效应
print(f"深度模型MSE: {np.mean((Y_pred - Y)**2):.3f}")
return deep_model, Y_pred
if __name__ == "__main__":
deep_model, Y_pred = demo_deep_ife()
4.2 处理效应的机器学习推断
结合双重机器学习(Double ML)框架:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold
class DML_IFE:
"""双重机器学习IFE"""
def __init__(self, N: int, T: int, rank: int):
self.ife = InteractiveFixedEffectsModel(rank=rank)
self.m1 = RandomForestRegressor(n_estimators=100) # 结果模型
self.m2 = RandomForestRegressor(n_estimators=100) # 处理模型
def fit(self, Y: np.ndarray, D: np.ndarray, X: np.ndarray,
folds: int = 5):
"""
X: 协变量 (N*T, K)
"""
N, T = Y.shape
n_samples = N * T
# K折交叉拟合
kf = KFold(n_splits=folds)
D_resid = np.zeros(n_samples)
Y_resid = np.zeros(n_samples)
for train_idx, test_idx in kf.split(X):
X_train, X_test = X[train_idx], X[test_idx]
D_train, D_test = D.flatten()[train_idx], D.flatten()[test_idx]
Y_train, Y_test = Y.flatten()[train_idx], Y.flatten()[test_idx]
# nuisance估计
self.m1.fit(X_train, Y_train)
self.m2.fit(X_train, D_train)
# 残差
Y_resid[test_idx] = Y_test - self.m1.predict(X_test)
D_resid[test_idx] = D_test - self.m2.predict(X_test)
# IFE on residuals
self.ife.fit(Y_resid.reshape(N, T), D_resid.reshape(N, T))
return self
- 点赞
- 收藏
- 关注作者
评论(0)