反事实框架:理解因果推断的核心概念

举报
数字扫地僧 发表于 2025/09/30 17:33:38 2025/09/30
【摘要】 I. 反事实思维:从相关性到因果性 相关性思维的局限性在传统数据分析中,我们往往满足于发现变量之间的统计关联。然而,这种思维方式存在根本缺陷:混淆偏倚:两个变量可能因为第三个未观测变量而表现出虚假相关反向因果:因果方向可能与假设相反选择偏倚:样本选择过程可能引入系统性偏差 实例分析:健康生活方式与长寿假设我们观察到定期锻炼的人平均寿命更长。传统分析可能得出结论:锻炼导致长寿。但反事实思维促...

I. 反事实思维:从相关性到因果性

相关性思维的局限性

在传统数据分析中,我们往往满足于发现变量之间的统计关联。然而,这种思维方式存在根本缺陷:

  • 混淆偏倚:两个变量可能因为第三个未观测变量而表现出虚假相关
  • 反向因果:因果方向可能与假设相反
  • 选择偏倚:样本选择过程可能引入系统性偏差

实例分析:健康生活方式与长寿

假设我们观察到定期锻炼的人平均寿命更长。传统分析可能得出结论:锻炼导致长寿。但反事实思维促使我们思考:

如果那些定期锻炼的人没有锻炼,他们的寿命会是怎样?
如果那些不锻炼的人开始锻炼,他们的寿命会如何变化?

这种对比思维揭示了因果推断的核心挑战:我们永远无法同时观测到同一个体在锻炼和不锻炼两种状态下的结果。这就是著名的"因果推断根本问题"。

反事实框架的哲学基础

反事实框架建立在三个核心概念之上:

概念 定义 示例
事实 实际观察到的结果 患者接受了治疗并康复
反事实 未发生的潜在结果 如果同一患者未接受治疗,是否会康复
因果效应 事实与反事实的差异 治疗导致的康复概率变化
观察到的世界
事实结果
未观察到的反事实世界
反事实结果
因果效应
治疗组因果效应
对照组因果效应
平均处理效应
因果结论

II. 潜在结果框架:反事实的数学表达

Rubin因果模型

Donald Rubin提出的潜在结果框架为反事实概念提供了严格的数学基础。该框架的核心公式如下:

对于每个个体i,定义:

  • Yi(1)Y_i(1):接受处理时的潜在结果
  • Yi(0)Y_i(0):未接受处理时的潜在结果
  • DiD_i:处理指示变量(1表示处理,0表示对照)

个体因果效应为:

τi=Yi(1)Yi(0)\tau_i = Y_i(1) - Y_i(0)

因果推断的根本问题

我们永远无法同时观测到Yi(1)Y_i(1)Yi(0)Y_i(0),只能观测到:

Yiobs=DiYi(1)+(1Di)Yi(0)Y_i^{obs} = D_i \cdot Y_i(1) + (1-D_i) \cdot Y_i(0)

这导致了因果推断的根本问题:个体层面的因果效应不可识别

平均处理效应(ATE)

虽然个体效应不可识别,我们可以在群体层面估计平均处理效应:

ATE=E[Yi(1)Yi(0)]=E[Yi(1)]E[Yi(0)]ATE = \mathbb{E}[Y_i(1) - Y_i(0)] = \mathbb{E}[Y_i(1)] - \mathbb{E}[Y_i(0)]

识别假设

要从未观测数据中识别ATE,需要以下关键假设:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
import statsmodels.api as sm

class PotentialOutcomesFramework:
    """潜在结果框架实现"""
    
    def __init__(self):
        self.individual_data = None
        self.ate_estimate = None
        
    def generate_synthetic_data(self, n=1000, true_ate=0.5, confounding_strength=0.3):
        """
        生成包含混淆变量的合成数据
        
        参数:
        n: 样本量
        true_ate: 真实平均处理效应
        confounding_strength: 混淆变量强度
        """
        np.random.seed(42)
        
        # 生成混淆变量
        X = np.random.normal(0, 1, n)
        
        # 处理分配受混淆变量影响
        propensity = 1 / (1 + np.exp(-confounding_strength * X))
        D = np.random.binomial(1, propensity)
        
        # 生成潜在结果
        Y0 = 2 * X + np.random.normal(0, 0.5, n)  # 对照组结果
        Y1 = 2 * X + true_ate + np.random.normal(0, 0.5, n)  # 处理组结果
        
        # 观测结果
        Y_obs = D * Y1 + (1 - D) * Y0
        
        # 创建数据框
        self.individual_data = pd.DataFrame({
            'X': X,
            'D': D,
            'Y0': Y0,
            'Y1': Y1,
            'Y_obs': Y_obs,
            'tau': Y1 - Y0  # 个体处理效应
        })
        
        return self.individual_data
    
    def illustrate_fundamental_problem(self):
        """展示因果推断的根本问题"""
        if self.individual_data is None:
            raise ValueError("请先生成数据")
        
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        
        # 展示潜在结果分布
        axes[0].hist(self.individual_data['Y0'], alpha=0.7, label='Y(0)', bins=30)
        axes[0].hist(self.individual_data['Y1'], alpha=0.7, label='Y(1)', bins=30)
        axes[0].set_xlabel('潜在结果')
        axes[0].set_ylabel('频数')
        axes[0].set_title('潜在结果分布')
        axes[0].legend()
        
        # 展示观测到的结果
        treated = self.individual_data[self.individual_data['D'] == 1]
        control = self.individual_data[self.individual_data['D'] == 0]
        
        axes[1].hist(control['Y_obs'], alpha=0.7, label='观测到的 Y|D=0', bins=30)
        axes[1].hist(treated['Y_obs'], alpha=0.7, label='观测到的 Y|D=1', bins=30)
        axes[1].set_xlabel('观测结果')
        axes[1].set_ylabel('频数')
        axes[1].set_title('观测结果分布')
        axes[1].legend()
        
        # 展示因果效应分布
        axes[2].hist(self.individual_data['tau'], alpha=0.7, bins=30, color='green')
        axes[2].axvline(self.individual_data['tau'].mean(), color='red', 
                       linestyle='--', label=f'真实ATE: {self.individual_data["tau"].mean():.2f}')
        axes[2].set_xlabel('个体因果效应')
        axes[2].set_ylabel('频数')
        axes[2].set_title('个体因果效应分布')
        axes[2].legend()
        
        plt.tight_layout()
        plt.show()
        
        # 计算各种估计量
        naive_ate = treated['Y_obs'].mean() - control['Y_obs'].mean()
        true_ate = self.individual_data['tau'].mean()
        
        print(f"真实ATE: {true_ate:.3f}")
        print(f"朴素估计ATE: {naive_ate:.3f}")
        print(f"偏误: {naive_ate - true_ate:.3f}")
        
        return {
            'true_ate': true_ate,
            'naive_ate': naive_ate,
            'bias': naive_ate - true_ate
        }

# 演示根本问题
framework = PotentialOutcomesFramework()
data = framework.generate_synthetic_data(n=2000, true_ate=0.5, confounding_strength=0.8)
results = framework.illustrate_fundamental_problem()

识别假设的数学表达

Lexical error on line 7. Unrecognized text. ...果独立] B --> B2[D ⊥ Y1,Y0 | X] ----------------------^

III. 随机化实验:反事实推断的黄金标准

随机化的作用

随机化实验通过随机分配处理,解决了因果推断的根本问题。在完全随机化下:

(Yi(1),Yi(0))Di(Y_i(1), Y_i(0)) \perp D_i

这意味着处理组和对照组在潜在结果分布上是可比的。

实例分析:药物疗效的RCT

考虑一个药物随机对照试验:

  • 处理组:接受新药治疗
  • 对照组:接受安慰剂
  • 结果变量:健康状况改善程度

通过随机化,我们可以直接比较两组的平均结果来估计ATE:

ATE^=1n1i:Di=1Yi1n0i:Di=0Yi\hat{ATE} = \frac{1}{n_1}\sum_{i:D_i=1} Y_i - \frac{1}{n_0}\sum_{i:D_i=0} Y_i

随机化实验的代码实现

class RandomizedExperiment:
    """随机化实验模拟"""
    
    def __init__(self, n=1000, true_ate=0.3, heterogeneous_effect=False):
        self.n = n
        self.true_ate = true_ate
        self.heterogeneous_effect = heterogeneous_effect
        self.data = None
        
    def run_experiment(self):
        """运行随机化实验"""
        np.random.seed(42)
        
        # 完全随机分配
        D = np.random.binomial(1, 0.5, self.n)
        
        # 生成基线特征
        X1 = np.random.normal(0, 1, self.n)  # 年龄标准化
        X2 = np.random.binomial(1, 0.4, self.n)  # 性别
        
        # 生成潜在结果
        if self.heterogeneous_effect:
            # 异质性处理效应:对年轻患者效果更好
            treatment_effect = self.true_ate * (1 + 0.5 * (X1 < 0))
            Y0 = 2 + 0.3 * X1 + 0.2 * X2 + np.random.normal(0, 0.5, self.n)
            Y1 = Y0 + treatment_effect
        else:
            # 同质性处理效应
            Y0 = 2 + 0.3 * X1 + 0.2 * X2 + np.random.normal(0, 0.5, self.n)
            Y1 = Y0 + self.true_ate
        
        # 观测结果
        Y_obs = D * Y1 + (1 - D) * Y0
        
        self.data = pd.DataFrame({
            'D': D,
            'X1': X1,
            'X2': X2,
            'Y0': Y0,
            'Y1': Y1,
            'Y_obs': Y_obs,
            'tau': Y1 - Y0
        })
        
        return self.data
    
    def analyze_experiment(self):
        """分析实验结果"""
        if self.data is None:
            raise ValueError("请先运行实验")
        
        treated = self.data[self.data['D'] == 1]
        control = self.data[self.data['D'] == 0]
        
        # 平衡性检验
        balance_table = pd.DataFrame({
            'Variable': ['X1', 'X2'],
            'Treated_Mean': [treated['X1'].mean(), treated['X2'].mean()],
            'Control_Mean': [control['X1'].mean(), control['X2'].mean()],
            'Difference': [treated['X1'].mean() - control['X1'].mean(), 
                          treated['X2'].mean() - control['X2'].mean()],
            'P_Value': [
                stats.ttest_ind(treated['X1'], control['X1']).pvalue,
                stats.ttest_ind(treated['X2'], control['X2']).pvalue
            ]
        })
        
        # ATE估计
        naive_ate = treated['Y_obs'].mean() - control['Y_obs'].mean()
        true_ate = self.data['tau'].mean()
        
        # 置信区间
        n_treated = len(treated)
        n_control = len(control)
        var_treated = treated['Y_obs'].var()
        var_control = control['Y_obs'].var()
        
        se = np.sqrt(var_treated/n_treated + var_control/n_control)
        ci_lower = naive_ate - 1.96 * se
        ci_upper = naive_ate + 1.96 * se
        
        # 可视化结果
        self._plot_experiment_results(treated, control)
        
        return {
            'balance_table': balance_table,
            'true_ate': true_ate,
            'estimated_ate': naive_ate,
            'bias': naive_ate - true_ate,
            'confidence_interval': (ci_lower, ci_upper),
            'standard_error': se
        }
    
    def _plot_experiment_results(self, treated, control):
        """绘制实验结果"""
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        
        # 协变量分布比较
        axes[0, 0].hist(treated['X1'], alpha=0.7, label='处理组', bins=20)
        axes[0, 0].hist(control['X1'], alpha=0.7, label='对照组', bins=20)
        axes[0, 0].set_xlabel('X1 (年龄)')
        axes[0, 0].set_ylabel('频数')
        axes[0, 0].set_title('协变量X1的分布平衡')
        axes[0, 0].legend()
        
        # 结果分布比较
        axes[0, 1].hist(treated['Y_obs'], alpha=0.7, label='处理组', bins=20)
        axes[0, 1].hist(control['Y_obs'], alpha=0.7, label='对照组', bins=20)
        axes[0, 1].set_xlabel('观测结果 Y')
        axes[0, 1].set_ylabel('频数')
        axes[0, 1].set_title('结果变量分布')
        axes[0, 1].legend()
        
        # 处理效应分布
        axes[1, 0].hist(self.data['tau'], alpha=0.7, bins=20, color='green')
        axes[1, 0].axvline(self.data['tau'].mean(), color='red', linestyle='--', 
                          label=f'真实ATE: {self.data["tau"].mean():.2f}')
        axes[1, 0].set_xlabel('个体处理效应')
        axes[1, 0].set_ylabel('频数')
        axes[1, 0].set_title('个体处理效应分布')
        axes[1, 0].legend()
        
        # 异质性分析(如果存在)
        if self.heterogeneous_effect:
            self.data['age_group'] = np.where(self.data['X1'] < 0, '年轻', '年长')
            heterogeneous_effects = self.data.groupby('age_group')['tau'].mean()
            
            axes[1, 1].bar(heterogeneous_effects.index, heterogeneous_effects.values, 
                          alpha=0.7, color=['blue', 'orange'])
            axes[1, 1].set_ylabel('平均处理效应')
            axes[1, 1].set_title('按年龄分组的异质性处理效应')
        
        plt.tight_layout()
        plt.show()

# 运行随机化实验
experiment = RandomizedExperiment(n=2000, true_ate=0.5, heterogeneous_effect=True)
experiment_data = experiment.run_experiment()
results = experiment.analyze_experiment()

print("随机化实验结果:")
print(f"真实ATE: {results['true_ate']:.3f}")
print(f"估计ATE: {results['estimated_ate']:.3f}")
print(f"偏误: {results['bias']:.3f}")
print(f"95%置信区间: [{results['confidence_interval'][0]:.3f}, {results['confidence_interval'][1]:.3f}]")
print("\n平衡性检验:")
print(results['balance_table'])

随机化实验的价值与局限

随机化实验
内部有效性
因果识别
无混淆偏倚
优势: 因果结论可靠
优势: 效应估计无偏
优势: 无需统计调整
外部有效性
伦理可行性
成本可行性
局限: 推广性受限
局限: 伦理约束
局限: 实施成本高
适用场景: 因果识别
适用场景: 政策评估
适用场景: 医学研究

IV. 观察性研究中的反事实推断

处理观察性数据的挑战

在无法进行随机化实验时,我们需要使用观察性数据。主要挑战包括:

  • 混淆偏倚:处理组和对照组在基线特征上系统性差异
  • 选择偏倚:样本选择过程与结果变量相关
  • 测量误差:变量测量不准确

实例分析:教育回报的经济学研究

考虑研究大学教育对收入的影响:

  • 处理:完成大学教育
  • 结果:年收入
  • 挑战:能力、家庭背景等混淆因素

倾向得分匹配

倾向得分匹配通过模拟随机化来减少混淆偏倚:

class ObservationalStudy:
    """观察性研究因果推断"""
    
    def __init__(self):
        self.data = None
        self.propensity_scores = None
        
    def generate_observational_data(self, n=2000, true_ate=0.4):
        """生成观察性数据"""
        np.random.seed(42)
        
        # 生成混淆变量
        ability = np.random.normal(0, 1, n)  # 能力
        family_income = np.random.normal(0, 1, n)  # 家庭收入
        motivation = np.random.normal(0, 1, n)  # 学习动机
        
        # 处理分配(大学教育)受混淆变量影响
        propensity_score = 1 / (1 + np.exp(
            - (0.5 * ability + 0.3 * family_income + 0.4 * motivation)
        ))
        D = np.random.binomial(1, propensity_score)
        
        # 生成潜在结果
        Y0 = (20 + 2 * ability + 1.5 * family_income + 
              1 * motivation + np.random.normal(0, 2, n))
        Y1 = Y0 + true_ate + 0.5 * ability  # 异质性处理效应
        
        # 观测结果
        Y_obs = D * Y1 + (1 - D) * Y0
        
        self.data = pd.DataFrame({
            'ability': ability,
            'family_income': family_income,
            'motivation': motivation,
            'D': D,
            'Y0': Y0,
            'Y1': Y1,
            'Y_obs': Y_obs,
            'tau': Y1 - Y0,
            'propensity_true': propensity_score
        })
        
        return self.data
    
    def estimate_propensity_scores(self, method='logistic'):
        """估计倾向得分"""
        from sklearn.linear_model import LogisticRegression
        
        X = self.data[['ability', 'family_income', 'motivation']]
        D = self.data['D']
        
        if method == 'logistic':
            model = LogisticRegression()
            model.fit(X, D)
            propensity_scores = model.predict_proba(X)[:, 1]
        else:
            # 其他方法如随机森林等
            from sklearn.ensemble import RandomForestClassifier
            model = RandomForestClassifier()
            model.fit(X, D)
            propensity_scores = model.predict_proba(X)[:, 1]
        
        self.data['propensity_estimated'] = propensity_scores
        self.propensity_scores = propensity_scores
        
        return propensity_scores
    
    def propensity_score_matching(self, method='nearest', caliper=0.1):
        """倾向得分匹配"""
        from sklearn.neighbors import NearestNeighbors
        
        treated = self.data[self.data['D'] == 1]
        control = self.data[self.data['D'] == 0]
        
        if method == 'nearest':
            # 最近邻匹配
            nbrs = NearestNeighbors(n_neighbors=1).fit(
                control[['propensity_estimated']].values
            )
            distances, indices = nbrs.kneighbors(
                treated[['propensity_estimated']].values
            )
            
            # 应用卡钳值
            matched_mask = distances.flatten() <= caliper
            matched_treated = treated[matched_mask]
            matched_control_indices = indices[matched_mask].flatten()
            matched_control = control.iloc[matched_control_indices]
            
        elif method == 'optimal':
            # 最优匹配 - 简化实现
            # 实际中应使用专门的匹配算法
            matched_treated = treated
            matched_control = control
            
        # 计算匹配后的ATE
        ate_matched = matched_treated['Y_obs'].mean() - matched_control['Y_obs'].mean()
        
        return {
            'ate_matched': ate_matched,
            'matched_treated': matched_treated,
            'matched_control': matched_control,
            'matching_ratio': len(matched_treated) / len(treated)
        }
    
    def inverse_probability_weighting(self):
        """逆概率加权"""
        if self.propensity_scores is None:
            self.estimate_propensity_scores()
        
        data = self.data.copy()
        data['weight'] = np.where(
            data['D'] == 1, 
            1 / data['propensity_estimated'],
            1 / (1 - data['propensity_estimated'])
        )
        
        # 加权均值
        weighted_treated = np.average(
            data[data['D'] == 1]['Y_obs'],
            weights=data[data['D'] == 1]['weight']
        )
        weighted_control = np.average(
            data[data['D'] == 0]['Y_obs'],
            weights=data[data['D'] == 0]['weight']
        )
        
        ate_ipw = weighted_treated - weighted_control
        
        return {
            'ate_ipw': ate_ipw,
            'weighted_data': data
        }
    
    def compare_methods(self):
        """比较不同方法"""
        if self.data is None:
            raise ValueError("请先生成数据")
        
        # 真实ATE
        true_ate = self.data['tau'].mean()
        
        # 朴素估计
        naive_ate = (self.data[self.data['D'] == 1]['Y_obs'].mean() - 
                    self.data[self.data['D'] == 0]['Y_obs'].mean())
        
        # 倾向得分匹配
        self.estimate_propensity_scores()
        matching_result = self.propensity_score_matching()
        
        # 逆概率加权
        ipw_result = self.inverse_probability_weighting()
        
        # 结果比较
        comparison = pd.DataFrame({
            'Method': ['真实ATE', '朴素估计', '倾向得分匹配', '逆概率加权'],
            'ATE_Estimate': [true_ate, naive_ate, matching_result['ate_matched'], 
                           ipw_result['ate_ipw']],
            'Bias': [0, naive_ate - true_ate, 
                    matching_result['ate_matched'] - true_ate,
                    ipw_result['ate_ipw'] - true_ate]
        })
        
        # 可视化比较
        self._plot_comparison(comparison, matching_result)
        
        return comparison
    
    def _plot_comparison(self, comparison, matching_result):
        """绘制方法比较图"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # ATE估计比较
        methods = comparison['Method'][1:]  # 排除真实ATE
        estimates = comparison['ATE_Estimate'][1:]
        biases = comparison['Bias'][1:]
        
        axes[0, 0].bar(methods, estimates, alpha=0.7, color=['red', 'blue', 'green'])
        axes[0, 0].axhline(y=comparison['ATE_Estimate'][0], color='black', 
                          linestyle='--', label='真实ATE')
        axes[0, 0].set_ylabel('ATE估计值')
        axes[0, 0].set_title('不同方法的ATE估计比较')
        axes[0, 0].tick_params(axis='x', rotation=45)
        axes[0, 0].legend()
        
        # 偏误比较
        axes[0, 1].bar(methods, biases, alpha=0.7, color=['red', 'blue', 'green'])
        axes[0, 1].axhline(y=0, color='black', linestyle='-', alpha=0.5)
        axes[0, 1].set_ylabel('估计偏误')
        axes[0, 1].set_title('估计偏误比较')
        axes[0, 1].tick_params(axis='x', rotation=45)
        
        # 倾向得分分布
        treated_ps = self.data[self.data['D'] == 1]['propensity_estimated']
        control_ps = self.data[self.data['D'] == 0]['propensity_estimated']
        
        axes[1, 0].hist(control_ps, alpha=0.7, label='对照组', bins=20, density=True)
        axes[1, 0].hist(treated_ps, alpha=0.7, label='处理组', bins=20, density=True)
        axes[1, 0].set_xlabel('倾向得分')
        axes[1, 0].set_ylabel('密度')
        axes[1, 0].set_title('倾向得分分布')
        axes[1, 0].legend()
        
        # 匹配前后平衡性
        original_imbalance = abs(
            self.data[self.data['D'] == 1]['ability'].mean() - 
            self.data[self.data['D'] == 0]['ability'].mean()
        )
        
        matched_imbalance = abs(
            matching_result['matched_treated']['ability'].mean() - 
            matching_result['matched_control']['ability'].mean()
        )
        
        axes[1, 1].bar(['匹配前', '匹配后'], [original_imbalance, matched_imbalance], 
                      alpha=0.7, color=['red', 'green'])
        axes[1, 1].set_ylabel('能力变量差异')
        axes[1, 1].set_title('匹配前后协变量平衡性改进')
        
        plt.tight_layout()
        plt.show()

# 运行观察性研究分析
observational_study = ObservationalStudy()
data = observational_study.generate_observational_data(n=3000, true_ate=0.4)
comparison = observational_study.compare_methods()

print("观察性研究方法比较:")
print(comparison)

观察性研究方法的比较

方法 原理 优点 缺点
倾向得分匹配 模拟随机化,构建可比组 直观,减少模型依赖 可能丢失样本,卡钳值选择敏感
逆概率加权 重加权创建伪总体 使用全部样本,理论性质好 对倾向得分模型敏感,极端权重问题
回归调整 直接控制混淆变量 充分利用数据,效率高 对模型设定敏感,外推风险
双重稳健估计 结合倾向得分和结果模型 双重保护,更稳健 计算复杂,实现难度大

V. 工具变量:处理未观测混淆

工具变量的概念

当存在未观测混淆变量时,工具变量方法提供了一种解决方案。有效的工具变量Z需要满足:

  1. 相关性:Z与处理变量D相关
  2. 排他性:Z仅通过D影响结果Y
  3. 独立性:Z与未观测混淆变量独立

实例分析:教育对收入的影响

著名的工具变量案例:

  • 处理:大学教育
  • 结果:收入
  • 工具变量:到大学的距离(Card, 1995)
  • 理由:距离影响教育决策但不直接影响收入

工具变量估计的代码实现

class InstrumentalVariables:
    """工具变量方法实现"""
    
    def __init__(self):
        self.data = None
        
    def generate_iv_data(self, n=2000, true_ate=0.5, iv_strength=0.3):
        """生成工具变量数据"""
        np.random.seed(42)
        
        # 未观测混淆变量(能力)
        ability = np.random.normal(0, 1, n)
        
        # 工具变量(如到大学的距离)
        Z = np.random.normal(0, 1, n)
        
        # 处理变量(大学教育)受工具变量和混淆变量影响
        D_star = 0.5 * Z + 0.8 * ability + np.random.normal(0, 1, n)
        D = (D_star > 0).astype(int)
        
        # 结果变量(收入)受处理变量和混淆变量影响
        Y = (true_ate * D + 1.2 * ability + np.random.normal(0, 0.5, n))
        
        self.data = pd.DataFrame({
            'Z': Z,
            'D': D,
            'Y': Y,
            'ability': ability,  # 实际中未观测
            'true_ate': true_ate
        })
        
        return self.data
    
    def two_stage_least_squares(self):
        """两阶段最小二乘法"""
        if self.data is None:
            raise ValueError("请先生成数据")
        
        # 第一阶段:D对Z回归
        X1 = sm.add_constant(self.data['Z'])
        stage1 = sm.OLS(self.data['D'], X1).fit()
        D_hat = stage1.predict(X1)
        
        # 第二阶段:Y对预测的D回归
        X2 = sm.add_constant(D_hat)
        stage2 = sm.OLS(self.data['Y'], X2).fit()
        
        # 朴素OLS估计(有偏)
        naive_ols = sm.OLS(self.data['Y'], sm.add_constant(self.data['D'])).fit()
        
        return {
            'stage1': stage1,
            'stage2': stage2,
            'naive_ols': naive_ols,
            'iv_estimate': stage2.params[1],
            'naive_estimate': naive_ols.params[1]
        }
    
    def check_iv_assumptions(self):
        """检验工具变量假设"""
        if self.data is None:
            raise ValueError("请先生成数据")
        
        # 相关性检验(第一阶段F统计量)
        X1 = sm.add_constant(self.data['Z'])
        stage1 = sm.OLS(self.data['D'], X1).fit()
        f_statistic = stage1.fvalue
        
        # 排他性约束无法直接检验,但可以进行过度识别检验
        # 这里简化实现
        
        results = {
            'first_stage_f_statistic': f_statistic,
            'first_stage_r_squared': stage1.rsquared,
            'instrument_strength': '强工具变量' if f_statistic > 10 else '弱工具变量'
        }
        
        return results
    
    def plot_iv_analysis(self, iv_results):
        """绘制工具变量分析图"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # 工具变量与处理变量关系
        axes[0, 0].scatter(self.data['Z'], self.data['D'], alpha=0.5)
        z_range = np.linspace(self.data['Z'].min(), self.data['Z'].max(), 100)
        stage1_pred = iv_results['stage1'].params[0] + iv_results['stage1'].params[1] * z_range
        axes[0, 0].plot(z_range, stage1_pred, color='red', linewidth=2)
        axes[0, 0].set_xlabel('工具变量 Z')
        axes[0, 0].set_ylabel('处理变量 D')
        axes[0, 0].set_title('第一阶段:工具变量与处理变量关系')
        
        # 处理变量与结果变量关系
        axes[0, 1].scatter(self.data['D'], self.data['Y'], alpha=0.5)
        
        # 添加OLS和IV拟合线
        d_values = np.array([0, 1])
        ols_line = iv_results['naive_ols'].params[0] + iv_results['naive_ols'].params[1] * d_values
        iv_line = iv_results['stage2'].params[0] + iv_results['stage2'].params[1] * d_values
        
        axes[0, 1].plot(d_values, ols_line, color='red', linewidth=2, label='OLS')
        axes[0, 1].plot(d_values, iv_line, color='blue', linewidth=2, label='IV')
        axes[0, 1].set_xlabel('处理变量 D')
        axes[0, 1].set_ylabel('结果变量 Y')
        axes[0, 1].set_title('处理变量与结果变量关系')
        axes[0, 1].legend()
        
        # 估计值比较
        methods = ['真实ATE', '朴素OLS', '工具变量']
        estimates = [self.data['true_ate'].iloc[0], 
                    iv_results['naive_estimate'],
                    iv_results['iv_estimate']]
        
        axes[1, 0].bar(methods, estimates, alpha=0.7, 
                      color=['green', 'red', 'blue'])
        axes[1, 0].set_ylabel('ATE估计值')
        axes[1, 0].set_title('不同方法的ATE估计比较')
        
        # 偏误比较
        true_ate = self.data['true_ate'].iloc[0]
        biases = [0, iv_results['naive_estimate'] - true_ate, 
                 iv_results['iv_estimate'] - true_ate]
        
        axes[1, 1].bar(methods[1:], biases[1:], alpha=0.7, color=['red', 'blue'])
        axes[1, 1].axhline(y=0, color='black', linestyle='-', alpha=0.5)
        axes[1, 1].set_ylabel('估计偏误')
        axes[1, 1].set_title('估计偏误比较')
        
        plt.tight_layout()
        plt.show()

# 运行工具变量分析
iv_analysis = InstrumentalVariables()
iv_data = iv_analysis.generate_iv_data(n=3000, true_ate=0.5, iv_strength=0.5)
iv_results = iv_analysis.two_stage_least_squares()
assumption_checks = iv_analysis.check_iv_assumptions()

iv_analysis.plot_iv_analysis(iv_results)

print("工具变量分析结果:")
print(f"真实ATE: {iv_data['true_ate'].iloc[0]:.3f}")
print(f"朴素OLS估计: {iv_results['naive_estimate']:.3f}")
print(f"工具变量估计: {iv_results['iv_estimate']:.3f}")
print(f"\n工具变量假设检验:")
print(f"第一阶段F统计量: {assumption_checks['first_stage_f_statistic']:.3f}")
print(f"第一阶段R²: {assumption_checks['first_stage_r_squared']:.3f}")
print(f"工具变量强度: {assumption_checks['instrument_strength']}")

工具变量方法的框架

Lexical error on line 7. Unrecognized text. ... E[相关性
Z→D] --> F[假设1满足] ----------------------^

VI. 反事实框架的实际应用案例

案例研究:电商平台的推荐算法评估

业务背景

某电商平台想要评估新推荐算法对用户购买行为的影响。由于无法对所有用户同时测试新旧算法,需要采用因果推断方法。

反事实问题构建

  • 处理变量:是否暴露于新推荐算法
  • 结果变量:用户购买金额
  • 挑战:用户特征与算法暴露相关(活跃用户更可能看到新算法)

解决方案:倾向得分加权

class EcommerceCaseStudy:
    """电商推荐算法评估案例"""
    
    def __init__(self):
        self.data = None
        
    def generate_ecommerce_data(self, n=5000):
        """生成电商平台数据"""
        np.random.seed(42)
        
        # 用户特征
        user_activity = np.random.exponential(2, n)  # 用户活跃度
        historical_spend = np.random.gamma(100, 2, n)  # 历史消费
        user_tenure = np.random.gamma(200, 1, n)  # 用户生命周期
        
        # 处理分配(新算法暴露)与用户特征相关
        propensity = 1 / (1 + np.exp(
            - (0.3 * user_activity + 0.2 * np.log(historical_spend) - 0.1 * user_tenure)
        ))
        treatment = np.random.binomial(1, propensity)
        
        # 潜在结果
        # 基础购买金额
        base_spend = (50 + 0.5 * user_activity + 0.3 * np.log(historical_spend) + 
                      0.2 * user_tenure + np.random.normal(0, 20, n))
        
        # 处理效应:新算法对高活跃用户效果更好
        treatment_effect = 15 * (1 + 0.5 * (user_activity > 2))
        
        Y0 = base_spend
        Y1 = base_spend + treatment_effect
        
        # 观测结果
        Y_obs = treatment * Y1 + (1 - treatment) * Y0
        
        self.data = pd.DataFrame({
            'user_activity': user_activity,
            'historical_spend': historical_spend,
            'user_tenure': user_tenure,
            'treatment': treatment,
            'Y_obs': Y_obs,
            'Y0': Y0,
            'Y1': Y1,
            'tau': Y1 - Y0
        })
        
        return self.data
    
    def analyze_business_impact(self):
        """分析业务影响"""
        if self.data is None:
            raise ValueError("请先生成数据")
        
        # 多种方法比较
        true_ate = self.data['tau'].mean()
        
        # 朴素比较
        naive_ate = (self.data[self.data['treatment'] == 1]['Y_obs'].mean() - 
                    self.data[self.data['treatment'] == 0]['Y_obs'].mean())
        
        # 倾向得分加权
        from sklearn.linear_model import LogisticRegression
        
        X = self.data[['user_activity', 'historical_spend', 'user_tenure']]
        X['log_historical_spend'] = np.log(X['historical_spend'] + 1)
        X = X[['user_activity', 'log_historical_spend', 'user_tenure']]
        
        ps_model = LogisticRegression()
        ps_model.fit(X, self.data['treatment'])
        propensity_scores = ps_model.predict_proba(X)[:, 1]
        
        # IPW估计
        weights = np.where(
            self.data['treatment'] == 1,
            1 / propensity_scores,
            1 / (1 - propensity_scores)
        )
        
        weighted_treated = np.average(
            self.data[self.data['treatment'] == 1]['Y_obs'],
            weights=weights[self.data['treatment'] == 1]
        )
        weighted_control = np.average(
            self.data[self.data['treatment'] == 0]['Y_obs'],
            weights=weights[self.data['treatment'] == 0]
        )
        ipw_ate = weighted_treated - weighted_control
        
        # 业务指标计算
        total_treated = (self.data['treatment'] == 1).sum()
        incremental_revenue = ipw_ate * total_treated
        
        # 投资回报分析
        implementation_cost = 100000  # 算法开发成本
        roi = incremental_revenue / implementation_cost
        
        results = {
            'true_ate': true_ate,
            'naive_ate': naive_ate,
            'ipw_ate': ipw_ate,
            'incremental_revenue': incremental_revenue,
            'roi': roi,
            'total_treated_users': total_treated
        }
        
        self._plot_business_analysis(results)
        
        return results
    
    def _plot_business_analysis(self, results):
        """绘制业务分析图"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # ATE估计比较
        methods = ['真实ATE', '朴素估计', 'IPW估计']
        estimates = [results['true_ate'], results['naive_ate'], results['ipw_ate']]
        
        axes[0, 0].bar(methods, estimates, alpha=0.7, color=['green', 'red', 'blue'])
        axes[0, 0].set_ylabel('平均处理效应 (元)')
        axes[0, 0].set_title('推荐算法效果估计比较')
        
        # 异质性分析
        self.data['activity_group'] = pd.cut(self.data['user_activity'], 
                                           bins=[0, 1, 2, 5, 10])
        heterogeneous_effects = self.data.groupby('activity_group')['tau'].mean()
        
        axes[0, 1].bar([str(x) for x in heterogeneous_effects.index], 
                      heterogeneous_effects.values, alpha=0.7)
        axes[0, 1].set_xlabel('用户活跃度分组')
        axes[0, 1].set_ylabel('平均处理效应 (元)')
        axes[0, 1].set_title('按用户活跃度的异质性效果')
        axes[0, 1].tick_params(axis='x', rotation=45)
        
        # 业务影响
        impact_components = ['算法开发成本', '增量收入', '净收益']
        values = [100000, results['incremental_revenue'], 
                 results['incremental_revenue'] - 100000]
        
        axes[1, 0].bar(impact_components, values, alpha=0.7, 
                      color=['red', 'green', 'blue'])
        axes[1, 0].set_ylabel('金额 (元)')
        axes[1, 0].set_title('业务影响分析')
        
        # ROI分析
        axes[1, 1].bar(['投资回报率'], [results['roi']], alpha=0.7, color='orange')
        axes[1, 1].axhline(y=1, color='red', linestyle='--', label='盈亏平衡点')
        axes[1, 1].set_ylabel('ROI')
        axes[1, 1].set_title('投资回报率分析')
        axes[1, 1].legend()
        
        plt.tight_layout()
        plt.show()
        
        # 打印业务结论
        print("\n业务决策建议:")
        print(f"推荐算法的真实效果: {results['true_ate']:.1f} 元/用户")
        print(f"朴素估计会高估: {results['naive_ate']:.1f} 元/用户")
        print(f"因果推断估计: {results['ipw_ate']:.1f} 元/用户")
        print(f"受影响用户数: {results['total_treated_users']:,}")
        print(f"年增量收入: {results['incremental_revenue']:,.0f} 元")
        print(f"投资回报率: {results['roi']:.1f} 倍")
        
        if results['roi'] > 1:
            print("✅ 建议: 继续投资并推广新算法")
        else:
            print("❌ 建议: 重新评估算法或寻找优化方向")

# 运行电商案例研究
ecommerce_case = EcommerceCaseStudy()
ecommerce_data = ecommerce_case.generate_ecommerce_data(n=10000)
business_results = ecommerce_case.analyze_business_impact()

因果推断在业务决策中的价值

Parse error on line 1: timeline title 因 ^ Expecting 'open_directive', 'NEWLINE', 'SPACE', 'GRAPH', got 'ALPHA'
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。