合成控制法的原理与典型案例解析
I. 合成控制法的基本理论与原理
反事实框架下的因果推断挑战
在因果推断中,我们始终面临一个根本性问题:对于接受干预的处理组,我们无法同时观测到其接受干预和未接受干预的两种状态。合成控制法的核心思想是:通过对未接受干预的控制单元进行加权组合,构建一个与处理组在干预前特征尽可能相似的"合成控制组",用这个合成控制组来模拟处理组在没有干预情况下的反事实结果。
经典案例:加利福尼亚州控烟政策的效果评估
Abadie等人(2010)使用合成控制法评估了加州1988年通过的控烟法案(Proposition 99)对香烟消费量的影响。在这个案例中:
- 处理组:加利福尼亚州
- 控制组:其他未实施类似控烟政策的州
- 干预时点:1988年
- 结果变量:人均香烟消费量
- 特征变量:干预前的人均香烟消费量、收入水平、人口结构等
数学模型框架
合成控制法的基本模型可以表示为:
设有个单元,其中第一个单元()为处理组,其余个单元()为控制组。观测时间为,干预发生在时间。
处理组的结果变量为,控制组的结果变量为()。我们希望通过控制组的加权组合来构建合成控制组:
其中权重满足且。
权重的选择通过最小化干预前处理组与合成控制组在特征变量上的差异来确定:
其中是处理组的特征向量,是控制组的特征矩阵。
合成控制法与传统方法的比较
| 方法 | 适用场景 | 优势 | 局限性 | 
|---|---|---|---|
| 合成控制法 | 单个处理单元,多个控制单元 | 避免主观选择控制组,透明可复制 | 对控制组数量和质量要求高 | 
| 双重差分法 | 多个处理单元和对照单元 | 模型简单,易于实现 | 需要平行趋势假设 | 
| 匹配方法 | 横截面数据或面板数据 | 直观易懂,减少选择性偏误 | 依赖可观测变量,忽略不可观测因素 | 
| 断点回归 | 处理分配基于连续变量阈值 | 内部有效性高 | 只能估计局部平均处理效应 | 
II. 合成控制法的数学模型与优化
权重优化的数学基础
合成控制法的核心是通过优化算法找到最优权重,使得合成控制组在干预前的特征与处理组最接近。这可以形式化为一个约束优化问题:
\begin{align*} \min_w & \quad \|X_1 - X_0 w\|_V = \sqrt{(X_1 - X_0 w)' V (X_1 - X_0 w)} \\ \text{s.t.} & \quad w_j \geq 0, \quad j = 2, \dots, J+1 \\ & \quad \sum_{j=2}^{J+1} w_j = 1 \end{align*}其中是一个对角矩阵,表示各特征变量的相对重要性。
矩阵V的选择方法
矩阵的选择对于合成控制法的结果至关重要。常用的方法包括:
- 等权重法:所有特征变量赋予相同权重
- 回归法:通过回归分析确定各变量的预测能力
- 交叉验证法:在干预前期间内进行交叉验证选择最优V
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.optimize import minimize
import cvxpy as cp
from sklearn.linear_model import LinearRegression
import warnings
warnings.filterwarnings('ignore')
class SyntheticControlOptimizer:
    """合成控制法优化器"""
    
    def __init__(self):
        self.weights = None
        self.V_matrix = None
        self.optimization_results = {}
    
    def fit_weights_simple(self, X1, X0):
        """
        简单版本的权重拟合(使用等权重V矩阵)
        
        参数:
        X1: 处理组特征向量 (k维)
        X0: 控制组特征矩阵 (k x J)
        """
        k, J = X0.shape
        
        # 使用等权重V矩阵
        self.V_matrix = np.eye(k)
        
        # 定义优化问题
        def objective(w):
            return np.sqrt((X1 - X0 @ w).T @ self.V_matrix @ (X1 - X0 @ w))
        
        # 约束条件
        constraints = [
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},  # 权重和为1
        ]
        bounds = [(0, 1) for _ in range(J)]  # 权重非负
        
        # 初始值
        w0 = np.ones(J) / J
        
        # 优化
        result = minimize(objective, w0, method='SLSQP', 
                         bounds=bounds, constraints=constraints)
        
        if result.success:
            self.weights = result.x
            self.optimization_results['simple'] = {
                'weights': self.weights,
                'loss': result.fun,
                'success': True
            }
        else:
            raise ValueError("权重优化失败")
        
        return self.weights
    
    def fit_weights_advanced(self, X1, X0, Z1, Z0, method='regression'):
        """
        高级权重拟合方法(优化V矩阵)
        
        参数:
        X1: 处理组干预前特征
        X0: 控制组干预前特征
        Z1: 处理组干预前结果变量
        Z0: 控制组干预前结果变量
        method: V矩阵优化方法
        """
        k, J = X0.shape
        T0 = Z0.shape[0]  # 干预前期数
        
        if method == 'regression':
            # 使用回归方法确定V矩阵
            self.V_matrix = self._compute_V_regression(Z1, Z0, X1, X0)
        elif method == 'equal':
            # 等权重方法
            self.V_matrix = np.eye(k)
        else:
            raise ValueError("不支持的V矩阵优化方法")
        
        # 优化权重
        w = cp.Variable(J)
        objective = cp.Minimize(cp.quad_form(X1 - X0 @ w, self.V_matrix))
        constraints = [w >= 0, cp.sum(w) == 1]
        
        problem = cp.Problem(objective, constraints)
        result = problem.solve()
        
        if problem.status == cp.OPTIMAL:
            self.weights = w.value
            self.optimization_results['advanced'] = {
                'weights': self.weights,
                'loss': result,
                'V_matrix': self.V_matrix,
                'success': True
            }
        else:
            raise ValueError("权重优化失败")
        
        return self.weights
    
    def _compute_V_regression(self, Z1, Z0, X1, X0):
        """通过回归方法计算V矩阵"""
        T0, J = Z0.shape
        k = X0.shape[0]
        
        # 使用干预前结果变量对特征变量回归,确定各特征的重要性
        # 这里使用简化的方法:基于特征对结果变量的预测能力
        
        # 计算每个特征与结果变量的相关性
        feature_importance = np.zeros(k)
        for i in range(k):
            # 使用该特征预测结果变量
            corr_coefs = []
            for j in range(J):
                corr = np.corrcoef(X0[i, j] * np.ones(T0), Z0[:, j])[0, 1]
                if not np.isnan(corr):
                    corr_coefs.append(abs(corr))
            
            if len(corr_coefs) > 0:
                feature_importance[i] = np.mean(corr_coefs)
            else:
                feature_importance[i] = 1  # 默认值
        
        # 归一化并构建V矩阵
        if np.sum(feature_importance) > 0:
            feature_importance = feature_importance / np.sum(feature_importance)
        else:
            feature_importance = np.ones(k) / k
        
        V = np.diag(feature_importance)
        return V
    
    def compute_pretreatment_fit(self, X1, X0, Z1, Z0):
        """计算干预前拟合优度"""
        if self.weights is None:
            raise ValueError("请先拟合权重")
        
        # 特征变量拟合
        X_synthetic = X0 @ self.weights
        feature_mse = np.mean((X1 - X_synthetic)**2)
        feature_rmse = np.sqrt(feature_mse)
        
        # 结果变量拟合
        Z_synthetic = Z0 @ self.weights
        outcome_mse = np.mean((Z1 - Z_synthetic)**2)
        outcome_rmse = np.sqrt(outcome_mse)
        
        # R-squared
        ss_total = np.sum((Z1 - np.mean(Z1))**2)
        ss_residual = np.sum((Z1 - Z_synthetic)**2)
        r_squared = 1 - (ss_residual / ss_total) if ss_total > 0 else 0
        
        fit_metrics = {
            'feature_rmse': feature_rmse,
            'outcome_rmse': outcome_rmse,
            'r_squared': r_squared,
            'X_synthetic': X_synthetic,
            'Z_synthetic': Z_synthetic
        }
        
        self.optimization_results['fit_metrics'] = fit_metrics
        return fit_metrics
    
    def visualize_optimization_results(self, X1, X0, feature_names=None):
        """可视化优化结果"""
        if self.weights is None:
            raise ValueError("请先拟合权重")
        
        if feature_names is None:
            feature_names = [f'Feature_{i}' for i in range(len(X1))]
        
        X_synthetic = X0 @ self.weights
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 特征变量比较
        x_pos = np.arange(len(X1))
        width = 0.35
        
        ax1.bar(x_pos - width/2, X1, width, label='处理组', alpha=0.7)
        ax1.bar(x_pos + width/2, X_synthetic, width, label='合成控制组', alpha=0.7)
        ax1.set_xlabel('特征变量')
        ax1.set_ylabel('特征值')
        ax1.set_title('处理组与合成控制组特征比较')
        ax1.set_xticks(x_pos)
        ax1.set_xticklabels(feature_names, rotation=45)
        ax1.legend()
        
        # 权重分布
        nonzero_weights = self.weights[self.weights > 0.01]  # 只显示显著权重大于0.01的控制单元
        nonzero_indices = np.where(self.weights > 0.01)[0]
        
        if len(nonzero_weights) > 0:
            ax2.bar(range(len(nonzero_weights)), nonzero_weights, alpha=0.7)
            ax2.set_xlabel('控制单元')
            ax2.set_ylabel('权重')
            ax2.set_title('合成控制权重分布(权重大于0.01)')
            ax2.set_xticks(range(len(nonzero_weights)))
            ax2.set_xticklabels([f'Unit_{i+1}' for i in nonzero_indices])
        
        plt.tight_layout()
        plt.show()
        
        # 打印拟合优度
        if 'fit_metrics' in self.optimization_results:
            metrics = self.optimization_results['fit_metrics']
            print(f"干预前拟合优度:")
            print(f"  特征变量RMSE: {metrics['feature_rmse']:.4f}")
            print(f"  结果变量RMSE: {metrics['outcome_rmse']:.4f}")
            print(f"  R-squared: {metrics['r_squared']:.4f}")
# 示例:优化过程演示
np.random.seed(42)
# 生成模拟数据
k = 5  # 特征变量数量
J = 20  # 控制单元数量
# 处理组特征(我们想要匹配的目标)
X1 = np.random.normal(0, 1, k)
# 控制组特征矩阵
X0 = np.random.normal(0, 1, (k, J))
# 干预前结果变量(用于V矩阵优化)
T0 = 10  # 干预前期数
Z1 = np.random.normal(0, 1, T0)
Z0 = np.random.normal(0, 1, (T0, J))
# 优化权重
optimizer = SyntheticControlOptimizer()
weights_simple = optimizer.fit_weights_simple(X1, X0)
weights_advanced = optimizer.fit_weights_advanced(X1, X0, Z1, Z0, method='regression')
# 计算拟合优度
fit_metrics = optimizer.compute_pretreatment_fit(X1, X0, Z1, Z0)
# 可视化结果
feature_names = [f'特征{i+1}' for i in range(k)]
optimizer.visualize_optimization_results(X1, X0, feature_names)
合成控制法优化流程
III. 经典案例重现:加州控烟政策评估
案例背景与数据准备
我们现在重现Abadie等人(2010)的经典研究,评估加州1988年控烟政策的效果。由于原始数据不易获取,我们生成符合真实数据特征的模拟数据。
class CaliforniaSmokingCase:
    """加州控烟政策评估案例"""
    
    def __init__(self):
        self.data = None
        self.synthetic_control = None
        self.results = {}
    
    def generate_smoking_data(self):
        """生成加州香烟消费量模拟数据"""
        np.random.seed(42)
        
        # 创建1970-2000年的年度数据
        years = np.arange(1970, 2001)
        n_years = len(years)
        intervention_year = 1988
        intervention_idx = np.where(years == intervention_year)[0][0]
        
        # 创建38个州的数据(加州+37个控制州)
        states = ['California'] + [f'State_{i}' for i in range(1, 38)]
        n_states = len(states)
        
        # 基础特征变量(干预前1970-1987年的均值)
        # 这些特征影响香烟消费量
        feature_names = [
            '人均收入', '青年人口比例', '啤酒消费量', 
            '卷烟价格', '教育水平'
        ]
        n_features = len(feature_names)
        
        # 生成特征数据
        features = np.zeros((n_features, n_states))
        
        # 加州特征(与其他州有系统性差异)
        features[:, 0] = [1.2, 0.8, 1.1, 0.9, 1.3]  # 加州特征
        
        # 控制州特征
        features[:, 1:] = np.random.multivariate_normal(
            mean=[1.0, 1.0, 1.0, 1.0, 1.0],
            cov=np.eye(5) * 0.1,
            size=n_states-1
        ).T
        
        # 生成香烟消费量数据
        cigarette_consumption = np.zeros((n_years, n_states))
        
        # 基础趋势和季节性
        base_trend = -0.5  # 全国性的下降趋势
        time_trend = base_trend * (years - years[0]) / 10
        
        for j in range(n_states):
            # 州特异性趋势
            state_trend = np.random.normal(0, 0.1)
            
            # 特征对消费量的影响
            feature_effect = (
                0.3 * features[0, j] +  # 收入效应
                0.4 * features[1, j] +  # 青年人口效应
                0.2 * features[2, j] +  # 啤酒消费效应
                -0.3 * features[3, j] + # 价格效应
                -0.5 * features[4, j]   # 教育效应
            )
            
            # 干预前消费量
            pre_consumption = 120 + feature_effect * 10 + state_trend * 10
            
            # 生成时间序列
            for t, year in enumerate(years):
                if year < intervention_year:
                    # 干预前:趋势 + 随机波动
                    if j == 0:  # 加州
                        consumption = (pre_consumption + time_trend[t] + 
                                     np.random.normal(0, 1))
                    else:
                        consumption = (pre_consumption + time_trend[t] + 
                                     np.random.normal(0, 2))
                else:
                    # 干预后
                    if j == 0:  # 加州有政策效应
                        policy_effect = -2 * (year - intervention_year + 1)  # 累积效应
                        consumption = (pre_consumption + time_trend[t] + 
                                     policy_effect + np.random.normal(0, 1))
                    else:
                        consumption = (pre_consumption + time_trend[t] + 
                                     np.random.normal(0, 2))
                
                cigarette_consumption[t, j] = consumption
        
        # 创建DataFrame
        years_repeated = np.repeat(years, n_states)
        states_repeated = np.tile(states, n_years)
        consumption_flat = cigarette_consumption.flatten()
        
        self.data = pd.DataFrame({
            'year': years_repeated,
            'state': states_repeated,
            'cigarette_consumption': consumption_flat
        })
        
        # 添加特征数据
        feature_data = []
        for j, state in enumerate(states):
            for feature_idx, feature_name in enumerate(feature_names):
                feature_data.append({
                    'state': state,
                    'feature': feature_name,
                    'value': features[feature_idx, j]
                })
        
        self.feature_data = pd.DataFrame(feature_data)
        
        return self.data, self.feature_data
    
    def prepare_synthetic_control_data(self):
        """准备合成控制法所需数据"""
        if self.data is None:
            raise ValueError("请先生成数据")
        
        # 提取干预前数据(1970-1987)
        pre_intervention_data = self.data[self.data['year'] < 1988]
        
        # 处理组:加州
        california_pre = pre_intervention_data[
            pre_intervention_data['state'] == 'California'
        ]['cigarette_consumption'].values
        
        # 控制组:其他州
        control_states = [state for state in self.data['state'].unique() 
                         if state != 'California']
        
        control_pre = np.column_stack([
            pre_intervention_data[
                pre_intervention_data['state'] == state
            ]['cigarette_consumption'].values 
            for state in control_states
        ])
        
        # 特征变量数据(干预前各州的特征均值)
        california_features = self.feature_data[
            self.feature_data['state'] == 'California'
        ]['value'].values
        
        control_features = np.column_stack([
            self.feature_data[
                self.feature_data['state'] == state
            ]['value'].values 
            for state in control_states
        ])
        
        return (california_pre, control_pre, california_features, 
                control_features, control_states)
    
    def fit_synthetic_control(self):
        """拟合合成控制模型"""
        (california_pre, control_pre, california_features, 
         control_features, control_states) = self.prepare_synthetic_control_data()
        
        # 使用高级优化方法
        optimizer = SyntheticControlOptimizer()
        weights = optimizer.fit_weights_advanced(
            california_features, control_features,
            california_pre, control_pre, method='regression'
        )
        
        # 计算合成控制组的消费量
        full_data = self.data.pivot(index='year', columns='state', 
                                  values='cigarette_consumption')
        
        control_states_full = [state for state in control_states 
                             if state in full_data.columns]
        
        synthetic_consumption = full_data[control_states_full] @ weights
        
        self.synthetic_control = pd.DataFrame({
            'year': full_data.index,
            'california_actual': full_data['California'],
            'california_synthetic': synthetic_consumption
        })
        
        self.results['weights'] = dict(zip(control_states, weights))
        self.results['optimizer'] = optimizer
        
        return self.synthetic_control
    
    def calculate_treatment_effects(self):
        """计算处理效应"""
        if self.synthetic_control is None:
            raise ValueError("请先拟合合成控制模型")
        
        synth_df = self.synthetic_control
        
        # 计算干预效应
        synth_df['treatment_effect'] = (
            synth_df['california_actual'] - synth_df['california_synthetic']
        )
        
        # 区分干预前后
        synth_df['period'] = np.where(synth_df['year'] < 1988, 'Pre', 'Post')
        
        # 计算累计效应
        post_period = synth_df[synth_df['period'] == 'Post']
        cumulative_effect = post_period['treatment_effect'].sum()
        
        self.results['treatment_effects'] = synth_df
        self.results['cumulative_effect'] = cumulative_effect
        
        return synth_df, cumulative_effect
    
    def visualize_results(self):
        """可视化结果"""
        if self.synthetic_control is None:
            raise ValueError("请先拟合合成控制模型")
        
        synth_df = self.synthetic_control
        
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 香烟消费量趋势
        ax1.plot(synth_df['year'], synth_df['california_actual'], 
                label='加州实际值', linewidth=2, color='red')
        ax1.plot(synth_df['year'], synth_df['california_synthetic'], 
                label='合成加州', linewidth=2, color='blue', linestyle='--')
        ax1.axvline(x=1988, color='black', linestyle='--', alpha=0.7, 
                   label='控烟政策实施 (1988)')
        ax1.set_xlabel('年份')
        ax1.set_ylabel('人均香烟消费量')
        ax1.set_title('加州控烟政策效果: 实际值 vs 合成控制')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. 处理效应时间路径
        ax2.plot(synth_df['year'], synth_df['treatment_effect'], 
                linewidth=2, color='green')
        ax2.axvline(x=1988, color='black', linestyle='--', alpha=0.7)
        ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        ax2.set_xlabel('年份')
        ax2.set_ylabel('处理效应')
        ax2.set_title('控烟政策的处理效应时间路径')
        ax2.grid(True, alpha=0.3)
        
        # 3. 权重分布
        weights = self.results['weights']
        significant_weights = {k: v for k, v in weights.items() if v > 0.01}
        
        if significant_weights:
            states = list(significant_weights.keys())
            weights_values = list(significant_weights.values())
            
            ax3.bar(range(len(states)), weights_values, alpha=0.7)
            ax3.set_xlabel('控制州')
            ax3.set_ylabel('权重')
            ax3.set_title('合成控制权重分布 (权重大于0.01)')
            ax3.set_xticks(range(len(states)))
            ax3.set_xticklabels(states, rotation=45)
        
        # 4. 干预前后效应对比
        pre_effect = synth_df[synth_df['year'] < 1988]['treatment_effect'].mean()
        post_effect = synth_df[synth_df['year'] >= 1988]['treatment_effect'].mean()
        
        periods = ['干预前 (1970-1987)', '干预后 (1988-2000)']
        effects = [pre_effect, post_effect]
        
        colors = ['gray', 'red'] if post_effect < pre_effect else ['gray', 'blue']
        bars = ax4.bar(periods, effects, color=colors, alpha=0.7)
        ax4.set_ylabel('平均处理效应')
        ax4.set_title('干预前后平均处理效应比较')
        
        # 添加数值标签
        for bar, effect in zip(bars, effects):
            height = bar.get_height()
            ax4.text(bar.get_x() + bar.get_width()/2., height,
                    f'{effect:.2f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()
        
        # 打印关键结果
        cumulative_effect = self.results['cumulative_effect']
        print("加州控烟政策评估结果:")
        print(f"干预前平均效应: {pre_effect:.2f} (应该接近0)")
        print(f"干预后平均效应: {post_effect:.2f}")
        print(f"累计处理效应: {cumulative_effect:.2f}")
        
        if post_effect < 0:
            print("结论: 控烟政策显著降低了加州的香烟消费量")
        else:
            print("结论: 未发现控烟政策有显著效果")
# 运行加州控烟案例
california_case = CaliforniaSmokingCase()
smoking_data, feature_data = california_case.generate_smoking_data()
synthetic_control = california_case.fit_synthetic_control()
treatment_effects, cumulative_effect = california_case.calculate_treatment_effects()
california_case.visualize_results()
加州控烟政策评估框架
IV. 统计推断与稳健性检验
安慰剂检验(Placebo Test)
安慰剂检验是合成控制法中最重要的统计推断方法,通过将处理组假设应用到控制组来检验结果的显著性。
class PlaceboTest:
    """安慰剂检验类"""
    
    def __init__(self, data, feature_data):
        self.data = data
        self.feature_data = feature_data
        self.placebo_results = {}
    
    def run_placebo_tests(self, n_placebo=20):
        """运行安慰剂检验"""
        placebo_effects = []
        placebo_weights = {}
        
        control_states = [state for state in self.data['state'].unique() 
                         if state != 'California']
        
        # 随机选择n_placebo个控制州进行安慰剂检验
        selected_states = np.random.choice(control_states, 
                                         size=min(n_placebo, len(control_states)), 
                                         replace=False)
        
        for i, state in enumerate(selected_states):
            print(f"进行安慰剂检验: {state} ({i+1}/{len(selected_states)})")
            
            try:
                effect = self._single_placebo_test(state)
                placebo_effects.append({
                    'state': state,
                    'treatment_effect': effect,
                    'pre_intervention_fit': self._calculate_pre_intervention_fit(state)
                })
            except Exception as e:
                print(f"州 {state} 的安慰剂检验失败: {e}")
                continue
        
        self.placebo_results['effects'] = placebo_effects
        return placebo_effects
    
    def _single_placebo_test(self, placebo_state):
        """对单个州进行安慰剂检验"""
        # 准备数据:将当前州作为假想的处理组
        pre_data = self.data[self.data['year'] < 1988]
        
        # 假想处理组
        placebo_pre = pre_data[pre_data['state'] == placebo_state]['cigarette_consumption'].values
        
        # 控制组:其他州(不包括真处理组加州)
        other_states = [state for state in self.data['state'].unique() 
                       if state not in ['California', placebo_state]]
        
        control_pre = np.column_stack([
            pre_data[pre_data['state'] == state]['cigarette_consumption'].values 
            for state in other_states
        ])
        
        # 特征变量
        placebo_features = self.feature_data[
            self.feature_data['state'] == placebo_state
        ]['value'].values
        
        control_features = np.column_stack([
            self.feature_data[self.feature_data['state'] == state]['value'].values 
            for state in other_states
        ])
        
        # 拟合合成控制
        optimizer = SyntheticControlOptimizer()
        weights = optimizer.fit_weights_advanced(
            placebo_features, control_features,
            placebo_pre, control_pre, method='regression'
        )
        
        # 计算处理效应
        full_data = self.data.pivot(index='year', columns='state', 
                                  values='cigarette_consumption')
        
        control_states_full = [state for state in other_states 
                             if state in full_data.columns]
        
        synthetic_consumption = full_data[control_states_full] @ weights
        actual_consumption = full_data[placebo_state]
        
        # 计算干预后平均效应
        post_period = full_data.index >= 1988
        treatment_effect = (actual_consumption[post_period] - 
                          synthetic_consumption[post_period]).mean()
        
        return treatment_effect
    
    def _calculate_pre_intervention_fit(self, state):
        """计算干预前拟合优度"""
        pre_data = self.data[self.data['year'] < 1988]
        
        state_pre = pre_data[pre_data['state'] == state]['cigarette_consumption'].values
        
        # 这里使用简化的拟合优度计算
        # 在实际应用中应该重新拟合模型并计算RMSE
        return np.std(state_pre)  # 简化版本
    
    def visualize_placebo_results(self, california_effect):
        """可视化安慰剂检验结果"""
        if not self.placebo_results.get('effects'):
            raise ValueError("请先运行安慰剂检验")
        
        placebo_effects = self.placebo_results['effects']
        
        # 提取处理效应和拟合优度
        effects = [result['treatment_effect'] for result in placebo_effects]
        states = [result['state'] for result in placebo_effects]
        pre_fit = [result['pre_intervention_fit'] for result in placebo_effects]
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 1. 安慰剂效应分布
        ax1.hist(effects, bins=15, alpha=0.7, color='gray', edgecolor='black')
        ax1.axvline(x=california_effect, color='red', linewidth=2, 
                   label=f'加州真实效应: {california_effect:.2f}')
        ax1.axvline(x=np.mean(effects), color='blue', linestyle='--',
                   label=f'安慰剂平均效应: {np.mean(effects):.2f}')
        ax1.set_xlabel('处理效应')
        ax1.set_ylabel('频数')
        ax1.set_title('安慰剂检验: 处理效应分布')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. 效应 vs 干预前拟合优度
        ax2.scatter(pre_fit, effects, alpha=0.7, s=60)
        ax2.axhline(y=california_effect, color='red', linewidth=2, 
                   label='加州真实效应')
        ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        
        # 标记加州点(使用平均拟合优度)
        california_pre_fit = np.mean(pre_fit)  # 简化处理
        ax2.scatter([california_pre_fit], [california_effect], 
                   color='red', s=100, marker='*', label='加州')
        
        ax2.set_xlabel('干预前拟合优度(标准差)')
        ax2.set_ylabel('处理效应')
        ax2.set_title('处理效应 vs 干预前拟合优度')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # 计算p值
        all_effects = effects + [california_effect]
        california_rank = np.sum(np.array(all_effects) <= california_effect)
        p_value = california_rank / len(all_effects)
        
        print("安慰剂检验结果:")
        print(f"加州真实效应: {california_effect:.2f}")
        print(f"安慰剂效应均值: {np.mean(effects):.2f}")
        print(f"安慰剂效应标准差: {np.std(effects):.2f}")
        print(f"加州效应在分布中的排名: {california_rank}/{len(all_effects)}")
        print(f"经验p值: {p_value:.3f}")
        
        if p_value < 0.05:
            print("✅ 结果在5%水平上统计显著")
        else:
            print("❌ 结果在5%水平上不显著")
        
        return p_value
# 运行安慰剂检验
placebo_tester = PlaceboTest(smoking_data, feature_data)
placebo_effects = placebo_tester.run_placebo_tests(n_placebo=30)
# 获取加州的真实效应
california_effect = treatment_effects[
    treatment_effects['year'] >= 1988
]['treatment_effect'].mean()
p_value = placebo_tester.visualize_placebo_results(california_effect)
统计推断方法比较
| 方法 | 原理 | 优点 | 缺点 | 
|---|---|---|---|
| 安慰剂检验 | 将处理组假设应用于控制组 | 直观,不依赖分布假设 | 需要足够多的控制单元 | 
| 排列检验 | 随机排列处理组分配 | 提供精确p值 | 计算量大 | 
| bootstrap | 重抽样构建置信区间 | 适用性广 | 可能低估不确定性 | 
| 刀切法 | 依次删除控制单元 | 稳健性检验 | 对异常值敏感 | 
V. 合成控制法的扩展与变体
多种合成控制方法的比较
除了经典的合成控制法,研究者还提出了多种扩展方法以适应不同的应用场景。
class ExtendedSyntheticControl:
    """扩展的合成控制方法"""
    
    def __init__(self, data, feature_data):
        self.data = data
        self.feature_data = feature_data
        self.results = {}
    
    def fit_classical_scm(self):
        """经典合成控制法"""
        return california_case.fit_synthetic_control()
    
    def fit_ridge_synthetic_control(self, alpha=0.1):
        """岭回归合成控制法(允许负权重)"""
        california_pre, control_pre, california_features, control_features, control_states = \
            california_case.prepare_synthetic_control_data()
        
        from sklearn.linear_model import Ridge
        
        # 使用岭回归拟合权重(允许负权重但施加L2惩罚)
        ridge = Ridge(alpha=alpha, fit_intercept=False)
        ridge.fit(control_features.T, california_features)
        
        weights = ridge.coef_
        
        # 计算合成控制组
        full_data = self.data.pivot(index='year', columns='state', 
                                  values='cigarette_consumption')
        
        control_states_full = [state for state in control_states 
                             if state in full_data.columns]
        
        synthetic_consumption = full_data[control_states_full] @ weights
        
        result_df = pd.DataFrame({
            'year': full_data.index,
            'california_actual': full_data['California'],
            'california_synthetic': synthetic_consumption,
            'method': 'Ridge SCM'
        })
        
        self.results['ridge'] = {
            'weights': weights,
            'synthetic_series': synthetic_consumption,
            'result_df': result_df
        }
        
        return result_df
    
    def fit_constrained_lasso(self, alpha=0.01):
        """约束LASSO合成控制法"""
        from sklearn.linear_model import Lasso
        
        california_pre, control_pre, california_features, control_features, control_states = \
            california_case.prepare_synthetic_control_data()
        
        # LASSO回归(自动特征选择)
        lasso = Lasso(alpha=alpha, positive=True, fit_intercept=False)
        lasso.fit(control_features.T, california_features)
        
        weights = lasso.coef_
        
        # 归一化权重(近似的约束)
        if weights.sum() > 0:
            weights = weights / weights.sum()
        
        # 计算合成控制组
        full_data = self.data.pivot(index='year', columns='state', 
                                  values='cigarette_consumption')
        
        control_states_full = [state for state in control_states 
                             if state in full_data.columns]
        
        synthetic_consumption = full_data[control_states_full] @ weights
        
        result_df = pd.DataFrame({
            'year': full_data.index,
            'california_actual': full_data['California'],
            'california_synthetic': synthetic_consumption,
            'method': 'Constrained LASSO'
        })
        
        self.results['lasso'] = {
            'weights': weights,
            'synthetic_series': synthetic_consumption,
            'result_df': result_df
        }
        
        return result_df
    
    def fit_elastic_net_synthetic_control(self, alpha=0.01, l1_ratio=0.5):
        """弹性网络合成控制法"""
        from sklearn.linear_model import ElasticNet
        
        california_pre, control_pre, california_features, control_features, control_states = \
            california_case.prepare_synthetic_control_data()
        
        # 弹性网络回归
        enet = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, positive=True, fit_intercept=False)
        enet.fit(control_features.T, california_features)
        
        weights = enet.coef_
        
        # 归一化权重
        if weights.sum() > 0:
            weights = weights / weights.sum()
        
        # 计算合成控制组
        full_data = self.data.pivot(index='year', columns='state', 
                                  values='cigarette_consumption')
        
        control_states_full = [state for state in control_states 
                             if state in full_data.columns]
        
        synthetic_consumption = full_data[control_states_full] @ weights
        
        result_df = pd.DataFrame({
            'year': full_data.index,
            'california_actual': full_data['California'],
            'california_synthetic': synthetic_consumption,
            'method': 'Elastic Net SCM'
        })
        
        self.results['elastic_net'] = {
            'weights': weights,
            'synthetic_series': synthetic_consumption,
            'result_df': result_df
        }
        
        return result_df
    
    def compare_methods(self):
        """比较不同方法的结果"""
        # 拟合各种方法
        classical_result = self.fit_classical_scm()
        ridge_result = self.fit_ridge_synthetic_control()
        lasso_result = self.fit_constrained_lasso()
        enet_result = self.fit_elastic_net_synthetic_control()
        
        # 组合结果
        all_results = pd.concat([
            classical_result.assign(method='Classical SCM'),
            ridge_result,
            lasso_result,
            enet_result
        ])
        
        # 计算各方法的处理效应
        methods = all_results['method'].unique()
        comparison_metrics = {}
        
        for method in methods:
            method_data = all_results[all_results['method'] == method].copy()
            method_data['treatment_effect'] = (
                method_data['california_actual'] - method_data['california_synthetic']
            )
            
            # 干预前拟合优度
            pre_data = method_data[method_data['year'] < 1988]
            pre_rmse = np.sqrt(np.mean(pre_data['treatment_effect']**2))
            
            # 干预后平均效应
            post_data = method_data[method_data['year'] >= 1988]
            post_effect = post_data['treatment_effect'].mean()
            
            comparison_metrics[method] = {
                'pre_rmse': pre_rmse,
                'post_effect': post_effect,
                'n_weights': self._count_nonzero_weights(method)
            }
        
        self.results['comparison'] = comparison_metrics
        return all_results, comparison_metrics
    
    def _count_nonzero_weights(self, method):
        """计算非零权重数量"""
        if method in self.results:
            weights = self.results[method]['weights']
            return np.sum(weights > 0.01)
        return 0
    
    def visualize_method_comparison(self):
        """可视化方法比较结果"""
        if 'comparison' not in self.results:
            raise ValueError("请先比较不同方法")
        
        comparison_metrics = self.results['comparison']
        
        fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 6))
        
        methods = list(comparison_metrics.keys())
        pre_rmse = [comparison_metrics[m]['pre_rmse'] for m in methods]
        post_effect = [comparison_metrics[m]['post_effect'] for m in methods]
        n_weights = [comparison_metrics[m]['n_weights'] for m in methods]
        
        # 1. 干预前拟合优度比较
        bars1 = ax1.bar(methods, pre_rmse, alpha=0.7, color='blue')
        ax1.set_ylabel('干预前RMSE')
        ax1.set_title('干预前拟合优度比较')
        ax1.set_xticklabels(methods, rotation=45)
        
        # 添加数值标签
        for bar, value in zip(bars1, pre_rmse):
            ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                    f'{value:.3f}', ha='center', va='bottom')
        
        # 2. 干预后效应比较
        colors2 = ['red' if effect < 0 else 'blue' for effect in post_effect]
        bars2 = ax2.bar(methods, post_effect, alpha=0.7, color=colors2)
        ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        ax2.set_ylabel('干预后平均效应')
        ax2.set_title('干预后处理效应比较')
        ax2.set_xticklabels(methods, rotation=45)
        
        for bar, value in zip(bars2, post_effect):
            ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                    f'{value:.2f}', ha='center', va='bottom')
        
        # 3. 使用的控制单元数量
        bars3 = ax3.bar(methods, n_weights, alpha=0.7, color='green')
        ax3.set_ylabel('使用的控制单元数量')
        ax3.set_title('模型稀疏性比较')
        ax3.set_xticklabels(methods, rotation=45)
        
        for bar, value in zip(bars3, n_weights):
            ax3.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                    f'{value}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()
        
        # 打印比较结果
        print("合成控制方法比较结果:")
        for method in methods:
            metrics = comparison_metrics[method]
            print(f"\n{method}:")
            print(f"  干预前RMSE: {metrics['pre_rmse']:.4f}")
            print(f"  干预后效应: {metrics['post_effect']:.2f}")
            print(f"  使用的控制单元: {metrics['n_weights']}")
# 扩展方法比较
extended_scm = ExtendedSyntheticControl(smoking_data, feature_data)
all_results, comparison_metrics = extended_scm.compare_methods()
extended_scm.visualize_method_comparison()
合成控制法扩展方法框架
VI. 实际应用指南与最佳实践
合成控制法实施步骤总结
基于理论分析和案例实践,我们总结出合成控制法的标准实施流程:
| 步骤 | 关键任务 | 注意事项 | 
|---|---|---|
| 1. 问题定义 | 明确处理组、干预时点、结果变量 | 确保干预外生性,避免选择偏误 | 
| 2. 数据准备 | 收集处理组和控制组的面板数据 | 保证数据质量,处理缺失值 | 
| 3. 特征选择 | 选择预测结果变量的特征 | 避免过度拟合,选择经济意义明确的变量 | 
| 4. 模型拟合 | 优化权重,构建合成控制组 | 检查权重合理性,避免极端权重 | 
| 5. 拟合优度评估 | 检验干预前拟合质量 | 确保合成控制组能很好复制处理组 | 
| 6. 效应估计 | 计算干预效应 | 区分即时效应和持续效应 | 
| 7. 统计推断 | 进行安慰剂检验等 | 提供统计显著性证据 | 
| 8. 稳健性检验 | 尝试不同模型设定 | 确保结果对模型选择不敏感 | 
| 9. 结果解释 | 结合背景知识解释效应 | 考虑经济意义和实际重要性 | 
常见陷阱与避免方法
class PracticalGuidelines:
    """合成控制法实践指南"""
    
    @staticmethod
    def check_application_suitability(n_control_units, n_pre_periods, n_features):
        """检查合成控制法的适用性"""
        suitability_scores = {}
        
        # 控制单元数量
        if n_control_units >= 20:
            suitability_scores['control_units'] = {'score': 5, 'comment': '充足'}
        elif n_control_units >= 10:
            suitability_scores['control_units'] = {'score': 3, 'comment': '一般'}
        else:
            suitability_scores['control_units'] = {'score': 1, 'comment': '不足'}
        
        # 干预前期数
        if n_pre_periods >= 10:
            suitability_scores['pre_periods'] = {'score': 5, 'comment': '充足'}
        elif n_pre_periods >= 5:
            suitability_scores['pre_periods'] = {'score': 3, 'comment': '一般'}
        else:
            suitability_scores['pre_periods'] = {'score': 1, 'comment': '不足'}
        
        # 特征变量数量
        if n_features >= 3:
            suitability_scores['features'] = {'score': 5, 'comment': '充足'}
        elif n_features >= 1:
            suitability_scores['features'] = {'score': 3, 'comment': '一般'}
        else:
            suitability_scores['features'] = {'score': 1, 'comment': '不足'}
        
        # 总体评估
        total_score = sum([s['score'] for s in suitability_scores.values()])
        max_score = 15
        
        if total_score >= 12:
            overall_comment = "非常适合应用合成控制法"
        elif total_score >= 9:
            overall_comment = "比较适合应用合成控制法"
        else:
            overall_comment = "需要谨慎应用合成控制法"
        
        suitability_scores['overall'] = {
            'score': total_score,
            'max_score': max_score,
            'comment': overall_comment
        }
        
        return suitability_scores
    
    @staticmethod
    def diagnose_common_issues(weights, pre_treatment_fit, placebo_p_value):
        """诊断常见问题"""
        issues = []
        recommendations = []
        
        # 检查权重分布
        if np.max(weights) > 0.8:
            issues.append("某个控制单元权重过高")
            recommendations.append("考虑增加控制单元或使用正则化方法")
        
        if np.sum(weights > 0.01) < 2:
            issues.append("使用的控制单元过少")
            recommendations.append("检查特征变量选择,考虑放松权重约束")
        
        # 检查干预前拟合
        if pre_treatment_fit > 0.1:  # 假设RMSE阈值
            issues.append("干预前拟合不佳")
            recommendations.append("增加干预前期数或改进特征变量")
        
        # 检查统计显著性
        if placebo_p_value > 0.1:
            issues.append("统计显著性不足")
            recommendations.append("增加安慰剂检验次数或收集更多数据")
        
        return {
            'issues': issues,
            'recommendations': recommendations,
            'has_issues': len(issues) > 0
        }
    
    @staticmethod
    def generate_report(application_suitability, diagnosis_results, treatment_effect):
        """生成分析报告"""
        print("=" * 60)
        print("合成控制法分析报告")
        print("=" * 60)
        
        print("\n1. 应用适用性评估:")
        for criterion, assessment in application_suitability.items():
            if criterion != 'overall':
                print(f"   {criterion}: {assessment['comment']} (分数: {assessment['score']})")
        
        overall = application_suitability['overall']
        print(f"   总体评估: {overall['comment']} (总分: {overall['score']}/{overall['max_score']})")
        
        print("\n2. 问题诊断:")
        if diagnosis_results['has_issues']:
            for issue in diagnosis_results['issues']:
                print(f"   ⚠️  {issue}")
        else:
            print("   ✅ 未发现明显问题")
        
        print("\n3. 处理效应估计:")
        print(f"   平均处理效应: {treatment_effect:.4f}")
        
        print("\n4. 建议:")
        if diagnosis_results['has_issues']:
            for recommendation in diagnosis_results['recommendations']:
                print(f"   💡 {recommendation}")
        else:
            print("   🎉 分析质量良好,结果可信")
# 应用指南示例
guidelines = PracticalGuidelines()
# 检查适用性
suitability = guidelines.check_application_suitability(
    n_control_units=37,  # 控制州数量
    n_pre_periods=18,    # 1970-1987
    n_features=5         # 特征变量数量
)
# 诊断问题
diagnosis = guidelines.diagnose_common_issues(
    weights=list(california_case.results['weights'].values()),
    pre_treatment_fit=0.05,  # 假设的RMSE
    placebo_p_value=0.03     # 安慰剂检验p值
)
# 生成报告
guidelines.generate_report(suitability, diagnosis, california_effect)
合成控制法决策流程图
- 点赞
- 收藏
- 关注作者
 
             
           
评论(0)