中断时间序列分析的完整实现步骤

举报
数字扫地僧 发表于 2025/09/30 17:56:06 2025/09/30
【摘要】 I. 中断时间序列分析的基本原理 核心概念与理论基础中断时间序列分析是一种准实验研究方法,其核心思想是通过建立时间序列模型,比较干预点前后序列的趋势和水平变化,从而识别干预的因果效应。 实例分析:安全带立法对交通事故死亡率的影响考虑一个经典案例:某国家在2005年1月实施强制性安全带立法,我们希望评估该政策对交通事故死亡率的影响。使用ITSA方法,我们可以:分析立法前死亡率的长期趋势检测立...

I. 中断时间序列分析的基本原理

核心概念与理论基础

中断时间序列分析是一种准实验研究方法,其核心思想是通过建立时间序列模型,比较干预点前后序列的趋势和水平变化,从而识别干预的因果效应。

实例分析:安全带立法对交通事故死亡率的影响

考虑一个经典案例:某国家在2005年1月实施强制性安全带立法,我们希望评估该政策对交通事故死亡率的影响。使用ITSA方法,我们可以:

  • 分析立法前死亡率的长期趋势
  • 检测立法实施时死亡率的即时变化(水平变化)
  • 评估立法后死亡率趋势的变化(斜率变化)
  • 区分政策效果和自然的时间趋势

数学模型框架

ITSA的基本模型可以表示为:

Yt=β0+β1Tt+β2Xt+β3TtXt+ϵtY_t = \beta_0 + \beta_1 \cdot T_t + \beta_2 \cdot X_t + \beta_3 \cdot T_t \cdot X_t + \epsilon_t

其中:

  • YtY_t:时间点t的结果变量
  • TtT_t:时间变量(从观察期开始计算)
  • XtX_t:干预虚拟变量(干预后=1,干预前=0)
  • TtXtT_t \cdot X_t:时间与干预的交互项
  • β2\beta_2:干预的即时水平变化
  • β3\beta_3:干预后的趋势变化
时间序列数据
干预前阶段
干预后阶段
基线水平 β0
基线趋势 β1
水平变化 β2
趋势变化 β3
完整ITSA模型
干预效果评估
即时效应
持续效应
总体效应

ITSA的优势与适用场景

分析场景 传统方法局限性 ITSA优势
政策效果评估 无法区分政策效果和自然趋势 分离趋势变化和水平变化
营销活动分析 忽略季节性等因素 控制时间序列的自相关性
公共卫生干预 简单前后对比可能误导 提供统计显著性检验
产品改版评估 难以量化长期影响 评估即时和持续效果

II. 数据准备与探索性分析

时间序列数据结构

ITSA要求数据具有明确的时间顺序,通常需要至少8个干预前和8个干预后的观测点以确保统计效力。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
import warnings
warnings.filterwarnings('ignore')

class ITSADataPreparer:
    """ITSA数据准备与探索性分析类"""
    
    def __init__(self):
        self.data = None
        self.exploratory_results = {}
    
    def generate_synthetic_itsa_data(self, n_points=60, intervention_point=30, 
                                   base_level=100, trend_pre=0.5, trend_post=0.2,
                                   level_change=-15, seasonal_amplitude=5, 
                                   noise_sd=3):
        """
        生成合成ITSA数据
        
        参数:
        n_points: 总时间点数
        intervention_point: 干预时点
        base_level: 基线水平
        trend_pre: 干预前趋势
        trend_post: 干预后趋势
        level_change: 水平变化
        seasonal_amplitude: 季节性振幅
        noise_sd: 噪声标准差
        """
        np.random.seed(42)
        
        # 创建时间索引
        time_index = pd.date_range(start='2020-01-01', periods=n_points, freq='M')
        
        # 时间变量
        time = np.arange(n_points)
        
        # 干预变量
        intervention = (time >= intervention_point).astype(int)
        
        # 基础趋势
        trend = np.where(time < intervention_point, 
                        trend_pre * time, 
                        trend_pre * intervention_point + trend_post * (time - intervention_point))
        
        # 季节性成分(年度季节性)
        seasonal = seasonal_amplitude * np.sin(2 * np.pi * time / 12)
        
        # 水平变化
        level_effect = level_change * intervention
        
        # 生成结果变量
        Y = base_level + trend + seasonal + level_effect + np.random.normal(0, noise_sd, n_points)
        
        self.data = pd.DataFrame({
            'date': time_index,
            'time': time,
            'intervention': intervention,
            'Y': Y,
            'post_intervention_time': np.maximum(0, time - intervention_point)
        })
        
        # 添加干预后时间交互项
        self.data['time_after_intervention'] = self.data['post_intervention_time'] * self.data['intervention']
        
        return self.data
    
    def exploratory_analysis(self):
        """执行探索性数据分析"""
        if self.data is None:
            raise ValueError("请先生成数据")
        
        # 基本统计描述
        desc_stats = self.data[['Y', 'time', 'intervention']].describe()
        
        # 平稳性检验(ADF检验)
        adf_result = adfuller(self.data['Y'])
        
        # 季节性分解
        decomposition = seasonal_decompose(self.data.set_index('date')['Y'], model='additive', period=12)
        
        self.exploratory_results = {
            'descriptive_stats': desc_stats,
            'stationarity_test': adf_result,
            'decomposition': decomposition,
            'is_stationary': adf_result[1] < 0.05  # p值小于0.05表示平稳
        }
        
        return self.exploratory_results
    
    def visualize_exploratory_analysis(self):
        """可视化探索性分析结果"""
        if self.data is None:
            raise ValueError("请先生成数据")
        
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 原始时间序列图
        ax1.plot(self.data['date'], self.data['Y'], linewidth=2)
        intervention_date = self.data[self.data['intervention'] == 1]['date'].iloc[0]
        ax1.axvline(x=intervention_date, color='red', linestyle='--', 
                   label=f'干预时点: {intervention_date.strftime("%Y-%m")}')
        ax1.set_title('原始时间序列')
        ax1.set_ylabel('结果变量 Y')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. 自相关和偏自相关图
        from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
        plot_acf(self.data['Y'], ax=ax2, lags=20)
        ax2.set_title('自相关函数 (ACF)')
        
        plot_pacf(self.data['Y'], ax=ax3, lags=20)
        ax3.set_title('偏自相关函数 (PACF)')
        
        # 3. 季节性分解
        decomposition = self.exploratory_results['decomposition']
        ax4.plot(decomposition.trend, label='趋势', linewidth=2)
        ax4.plot(decomposition.seasonal, label='季节性', alpha=0.7)
        ax4.plot(decomposition.resid, label='残差', alpha=0.5)
        ax4.axvline(x=intervention_date, color='red', linestyle='--', alpha=0.7)
        ax4.set_title('季节性分解')
        ax4.legend()
        ax4.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # 打印统计结果
        print("探索性分析结果:")
        print(f"ADF检验p值: {self.exploratory_results['stationarity_test'][1]:.4f}")
        print(f"序列是否平稳: {'是' if self.exploratory_results['is_stationary'] else '否'}")
        print(f"干预前均值: {self.data[self.data['intervention'] == 0]['Y'].mean():.2f}")
        print(f"干预后均值: {self.data[self.data['intervention'] == 1]['Y'].mean():.2f}")
        
        return fig

# 数据准备示例
data_preparer = ITSADataPreparer()
itsa_data = data_preparer.generate_synthetic_itsa_data(
    n_points=60, intervention_point=30, 
    base_level=100, trend_pre=0.5, trend_post=0.2,
    level_change=-15, seasonal_amplitude=3, noise_sd=2
)

exploratory_results = data_preparer.exploratory_analysis()
fig = data_preparer.visualize_exploratory_analysis()

数据质量检查清单

数据准备
时间序列完整性检查
异常值检测
平稳性检验
季节性分析
检查缺失值
验证时间间隔一致性
确认干预时点准确
使用箱线图识别异常值
检查测量误差
决定异常值处理方法
ADF单位根检验
可视化趋势判断
决定差分阶数
季节性分解
自相关分析
季节性模型选择
数据质量评估
数据质量是否合格?
进行ITSA分析
数据清洗与处理

III. 基础ITSA模型构建

标准ITSA模型设定

基础ITSA模型通过OLS回归估计干预效果,包含时间趋势、干预变量及其交互项。

class BasicITSAModel:
    """基础ITSA模型实现"""
    
    def __init__(self, data):
        self.data = data
        self.model = None
        self.results = {}
    
    def fit_basic_itsa(self, include_seasonality=False):
        """拟合基础ITSA模型"""
        
        # 准备变量
        X = self.data[['time', 'intervention', 'time_after_intervention']].copy()
        
        if include_seasonality:
            # 添加季节性虚拟变量(月度)
            month_dummies = pd.get_dummies(self.data['date'].dt.month, prefix='month')
            X = pd.concat([X, month_dummies.iloc[:, 1:]], axis=1)  # 避免多重共线性
        
        X = sm.add_constant(X)
        y = self.data['Y']
        
        # 拟合模型
        self.model = sm.OLS(y, X).fit()
        
        # 存储结果
        self.results = {
            'model': self.model,
            'params': self.model.params,
            'pvalues': self.model.pvalues,
            'rsquared': self.model.rsquared,
            'aic': self.model.aic
        }
        
        return self.results
    
    def interpret_effects(self):
        """解释干预效果"""
        if self.model is None:
            raise ValueError("请先拟合模型")
        
        params = self.model.params
        pvalues = self.model.pvalues
        
        # 提取关键系数
        level_change = params.get('intervention', 0)
        trend_change = params.get('time_after_intervention', 0)
        base_trend = params.get('time', 0)
        
        level_p = pvalues.get('intervention', 1)
        trend_p = pvalues.get('time_after_intervention', 1)
        
        # 计算干预后趋势
        post_trend = base_trend + trend_change
        
        # 效应解释
        interpretation = {
            'immediate_effect': {
                'estimate': level_change,
                'p_value': level_p,
                'significant': level_p < 0.05,
                'interpretation': f"干预导致水平{"上升" if level_change > 0 else "下降"}了{abs(level_change):.2f}单位"
            },
            'trend_effect': {
                'estimate': trend_change,
                'p_value': trend_p,
                'significant': trend_p < 0.05,
                'interpretation': f"干预导致趋势{"加速" if trend_change > 0 else "减速"}了{abs(trend_change):.2f}单位/期"
            },
            'baseline_trend': base_trend,
            'post_intervention_trend': post_trend
        }
        
        # 计算总体效应(在特定时间点)
        n_post_periods = len(self.data[self.data['intervention'] == 1])
        total_effect = level_change + trend_change * n_post_periods
        
        interpretation['total_effect'] = {
            'estimate': total_effect,
            'interpretation': f"在干预后{n_post_periods}期,总体效应为{total_effect:.2f}单位"
        }
        
        self.results['interpretation'] = interpretation
        return interpretation
    
    def visualize_itsa_results(self):
        """可视化ITSA结果"""
        if self.model is None:
            raise ValueError("请先拟合模型")
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 1. 实际值 vs 拟合值
        ax1.plot(self.data['date'], self.data['Y'], 'o-', label='实际值', alpha=0.7, linewidth=2)
        ax1.plot(self.data['date'], self.model.fittedvalues, '--', label='拟合值', linewidth=2)
        
        intervention_date = self.data[self.data['intervention'] == 1]['date'].iloc[0]
        ax1.axvline(x=intervention_date, color='red', linestyle='--', 
                   label=f'干预时点', alpha=0.7)
        
        ax1.set_title('ITSA模型拟合结果')
        ax1.set_ylabel('结果变量 Y')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. 残差分析
        residuals = self.model.resid
        ax2.plot(self.data['date'], residuals, 'o-', alpha=0.7)
        ax2.axhline(y=0, color='red', linestyle='-', alpha=0.5)
        ax2.axvline(x=intervention_date, color='red', linestyle='--', alpha=0.7)
        ax2.set_title('模型残差')
        ax2.set_ylabel('残差')
        ax2.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
        
        # 打印模型摘要
        print("基础ITSA模型结果:")
        print(self.model.summary())
        
        # 打印效应解释
        interpretation = self.results.get('interpretation', {})
        if interpretation:
            print("\n干预效果解释:")
            print(f"即时效应: {interpretation['immediate_effect']['interpretation']}")
            print(f"趋势效应: {interpretation['trend_effect']['interpretation']}")
            print(f"总体效应: {interpretation['total_effect']['interpretation']}")
        
        return fig

# 基础ITSA模型示例
basic_itsa = BasicITSAModel(itsa_data)
basic_results = basic_itsa.fit_basic_itsa(include_seasonality=True)
interpretation = basic_itsa.interpret_effects()
basic_itsa.visualize_itsa_results()

ITSA模型参数解释

Lexical error on line 5. Unrecognized text. ...水平变化] A --> E[时间×干预 β3
趋势变化] ----------------------^

IV. 高级ITSA模型:处理自相关和季节性

自相关问题诊断与处理

时间序列数据常常存在自相关性,这会破坏OLS回归的假设,导致标准误估计有偏。

class AdvancedITSAModel:
    """高级ITSA模型 - 处理自相关和季节性"""
    
    def __init__(self, data):
        self.data = data
        self.models = {}
        self.diagnostics = {}
    
    def check_autocorrelation(self, model):
        """检查模型残差的自相关性"""
        from statsmodels.stats.stattools import durbin_watson
        from statsmodels.stats.diagnostic import acorr_ljungbox
        
        residuals = model.resid
        
        # Durbin-Watson检验
        dw_stat = durbin_watson(residuals)
        
        # Ljung-Box检验
        lb_test = acorr_ljungbox(residuals, lags=10, return_df=True)
        
        # 自相关函数
        acf_values = sm.tsa.acf(residuals, nlags=10)
        
        diagnostics = {
            'durbin_watson': dw_stat,
            'ljung_box': lb_test,
            'autocorrelation_function': acf_values,
            'has_autocorrelation': dw_stat < 1.5 or dw_stat > 2.5  # 简化的判断标准
        }
        
        return diagnostics
    
    def fit_arima_itsa(self, order=(1,0,0)):
        """使用ARIMA模型处理自相关"""
        
        # 准备外生变量
        exog_vars = self.data[['intervention', 'time_after_intervention']].copy()
        
        # 添加时间趋势
        exog_vars['time'] = self.data['time']
        
        # 拟合ARIMA模型
        model = sm.tsa.ARIMA(self.data['Y'], 
                           exog=exog_vars,
                           order=order).fit()
        
        # 检查自相关
        diagnostics = self.check_autocorrelation(model)
        
        self.models['arima'] = model
        self.diagnostics['arima'] = diagnostics
        
        return model, diagnostics
    
    def fit_seasonal_model(self, seasonal_period=12):
        """拟合考虑季节性的模型"""
        
        # 使用季节性虚拟变量
        X = self.data[['time', 'intervention', 'time_after_intervention']].copy()
        
        # 添加季节性虚拟变量
        for month in range(2, 13):  # 1月作为基准
            X[f'month_{month}'] = (self.data['date'].dt.month == month).astype(int)
        
        X = sm.add_constant(X)
        y = self.data['Y']
        
        model = sm.OLS(y, X).fit()
        diagnostics = self.check_autocorrelation(model)
        
        self.models['seasonal'] = model
        self.diagnostics['seasonal'] = diagnostics
        
        return model, diagnostics
    
    def compare_models(self):
        """比较不同模型的表现"""
        comparison_results = {}
        
        # 基础模型
        basic_model = BasicITSAModel(self.data)
        basic_results = basic_model.fit_basic_itsa(include_seasonality=False)
        basic_diagnostics = self.check_autocorrelation(basic_results['model'])
        
        comparison_results['basic'] = {
            'rsquared': basic_results['rsquared'],
            'aic': basic_results['aic'],
            'autocorrelation': basic_diagnostics['has_autocorrelation'],
            'dw_stat': basic_diagnostics['durbin_watson']
        }
        
        # 季节性模型
        if 'seasonal' in self.models:
            seasonal_model = self.models['seasonal']
            comparison_results['seasonal'] = {
                'rsquared': seasonal_model.rsquared,
                'aic': seasonal_model.aic,
                'autocorrelation': self.diagnostics['seasonal']['has_autocorrelation'],
                'dw_stat': self.diagnostics['seasonal']['durbin_watson']
            }
        
        # ARIMA模型
        if 'arima' in self.models:
            arima_model = self.models['arima']
            comparison_results['arima'] = {
                'rsquared': 1 - (arima_model.resid.var() / self.data['Y'].var()),
                'aic': arima_model.aic,
                'autocorrelation': self.diagnostics['arima']['has_autocorrelation'],
                'dw_stat': self.diagnostics['arima']['durbin_watson']
            }
        
        # 选择最佳模型
        best_model = min(comparison_results.keys(), 
                        key=lambda x: comparison_results[x]['aic'])
        
        comparison_results['best_model'] = best_model
        
        return comparison_results
    
    def visualize_model_comparison(self):
        """可视化模型比较结果"""
        comparison = self.compare_models()
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 模型拟合指标比较
        models = list(comparison.keys())
        models.remove('best_model')
        
        aic_values = [comparison[model]['aic'] for model in models]
        rsquared_values = [comparison[model]['rsquared'] for model in models]
        
        x_pos = np.arange(len(models))
        
        bars1 = ax1.bar(x_pos - 0.2, aic_values, 0.4, label='AIC', alpha=0.7)
        bars2 = ax1.bar(x_pos + 0.2, rsquared_values, 0.4, label='R²', alpha=0.7)
        
        ax1.set_xlabel('模型类型')
        ax1.set_ylabel('指标值')
        ax1.set_title('模型比较 - AIC和R²')
        ax1.set_xticks(x_pos)
        ax1.set_xticklabels(models)
        ax1.legend()
        
        # 添加数值标签
        for bars in [bars1, bars2]:
            for bar in bars:
                height = bar.get_height()
                ax1.text(bar.get_x() + bar.get_width()/2., height,
                        f'{height:.2f}', ha='center', va='bottom')
        
        # 自相关诊断
        dw_stats = [comparison[model]['dw_stat'] for model in models]
        autocorr_status = [comparison[model]['autocorrelation'] for model in models]
        
        colors = ['red' if status else 'green' for status in autocorr_status]
        
        bars3 = ax2.bar(models, dw_stats, color=colors, alpha=0.7)
        ax2.axhline(y=2.0, color='black', linestyle='--', label='理想值(DW=2)')
        ax2.axhspan(1.5, 2.5, alpha=0.2, color='green', label='可接受范围')
        ax2.set_ylabel('Durbin-Watson统计量')
        ax2.set_title('自相关性诊断')
        ax2.legend()
        
        plt.tight_layout()
        plt.show()
        
        print("模型比较结果:")
        for model in models:
            status = "存在自相关" if comparison[model]['autocorrelation'] else "无自相关"
            print(f"{model}模型: AIC={comparison[model]['aic']:.2f}, R²={comparison[model]['rsquared']:.3f}, {status}")
        
        print(f"\n推荐模型: {comparison['best_model']}")
        
        return fig

# 高级ITSA模型示例
advanced_itsa = AdvancedITSAModel(itsa_data)

# 拟合不同模型
arima_model, arima_diag = advanced_itsa.fit_arima_itsa(order=(1,0,0))
seasonal_model, seasonal_diag = advanced_itsa.fit_seasonal_model()

# 比较模型
comparison_results = advanced_itsa.compare_models()
advanced_itsa.visualize_model_comparison()

时间序列模型选择框架

短期自相关
长期依赖
季节性自相关
时间序列数据
是否存在自相关?
是否存在季节性?
使用基础OLS模型
自相关结构?
使用AR模型
使用AR模型
使用ARIMA模型
使用SARIMA模型
模型诊断检验
残差是否白噪声?
模型选择完成
调整模型参数

V. 中断时间序列分析的统计检验

模型假设检验

为确保ITSA结果的有效性,需要进行系统的统计检验:

class ITSAStatisticalTests:
    """ITSA统计检验类"""
    
    def __init__(self, data, model):
        self.data = data
        self.model = model
        self.test_results = {}
    
    def run_comprehensive_tests(self):
        """执行全面的统计检验"""
        
        # 1. 自相关检验
        self._test_autocorrelation()
        
        # 2. 异方差性检验
        self._test_heteroscedasticity()
        
        # 3. 正态性检验
        self._test_normality()
        
        # 4. 结构稳定性检验
        self._test_structural_break()
        
        # 5. 模型设定检验
        self._test_model_specification()
        
        return self.test_results
    
    def _test_autocorrelation(self):
        """自相关性检验"""
        from statsmodels.stats.stattools import durbin_watson
        from statsmodels.stats.diagnostic import acorr_breusch_godfrey
        
        residuals = self.model.resid
        
        # Durbin-Watson检验
        dw_stat = durbin_watson(residuals)
        
        # Breusch-Godfrey检验
        bg_test = acorr_breusch_godfrey(self.model, nlags=5)
        
        self.test_results['autocorrelation'] = {
            'durbin_watson': dw_stat,
            'breusch_godfrey': {
                'statistic': bg_test[0],
                'p_value': bg_test[1],
                'significant': bg_test[1] < 0.05
            },
            'conclusion': '存在自相关' if dw_stat < 1.5 or dw_stat > 2.5 else '无自相关'
        }
    
    def _test_heteroscedasticity(self):
        """异方差性检验"""
        from statsmodels.stats.diagnostic import het_breuschpagan
        
        # Breusch-Pagan检验
        bp_test = het_breuschpagan(self.model.resid, self.model.model.exog)
        
        # White检验(简化版)
        squared_resid = self.model.resid ** 2
        white_aux = sm.OLS(squared_resid, self.model.model.exog).fit()
        white_stat = len(self.model.resid) * white_aux.rsquared
        
        self.test_results['heteroscedasticity'] = {
            'breusch_pagan': {
                'statistic': bp_test[0],
                'p_value': bp_test[1],
                'significant': bp_test[1] < 0.05
            },
            'white_test': {
                'statistic': white_stat,
                'p_value': 1 - stats.chi2.cdf(white_stat, self.model.model.exog.shape[1] - 1)
            },
            'conclusion': '存在异方差' if bp_test[1] < 0.05 else '同方差'
        }
    
    def _test_normality(self):
        """正态性检验"""
        from scipy import stats
        from statsmodels.stats.diagnostic import normal_ad
        
        residuals = self.model.resid
        
        # Anderson-Darling检验
        ad_test = normal_ad(residuals)
        
        # Shapiro-Wilk检验
        shapiro_test = stats.shapiro(residuals)
        
        # Q-Q图分析
        qq_data = sm.qqplot(residuals, line='45', fit=True)
        
        self.test_results['normality'] = {
            'anderson_darling': {
                'statistic': ad_test[0],
                'p_value': ad_test[1],
                'significant': ad_test[1] < 0.05
            },
            'shapiro_wilk': {
                'statistic': shapiro_test[0],
                'p_value': shapiro_test[1],
                'significant': shapiro_test[1] < 0.05
            },
            'conclusion': '非正态分布' if ad_test[1] < 0.05 else '近似正态分布'
        }
    
    def _test_structural_break(self):
        """结构突变检验"""
        from statsmodels.stats.diagnostic import breaks_cusumolsresid
        
        # CUSUM检验
        cusum_test = breaks_cusumolsresid(self.model.resid)
        
        # Chow检验(在干预点)
        intervention_idx = self.data[self.data['intervention'] == 1].index[0]
        
        self.test_results['structural_break'] = {
            'cusum_test': {
                'statistic': cusum_test[0],
                'critical_values': cusum_test[1],
                'significant': np.max(np.abs(cusum_test[0])) > 1  # 简化判断
            },
            'intervention_point': intervention_idx,
            'conclusion': '存在结构突变' if np.max(np.abs(cusum_test[0])) > 1 else '无结构突变'
        }
    
    def _test_model_specification(self):
        """模型设定检验"""
        # Ramsey RESET检验
        reset_test = sm.stats.linear_reset(self.model, power=2, test_type='fitted')
        
        self.test_results['specification'] = {
            'reset_test': {
                'statistic': reset_test[0],
                'p_value': reset_test[1],
                'significant': reset_test[1] < 0.05
            },
            'conclusion': '模型设定错误' if reset_test[1] < 0.05 else '模型设定正确'
        }
    
    def generate_diagnostic_report(self):
        """生成诊断报告"""
        if not self.test_results:
            self.run_comprehensive_tests()
        
        print("=" * 60)
        print("ITSA模型诊断报告")
        print("=" * 60)
        
        tests_passed = 0
        total_tests = 0
        
        for test_name, results in self.test_results.items():
            total_tests += 1
            conclusion = results.get('conclusion', '')
            passed = '问题' not in conclusion and '存在' not in conclusion and '错误' not in conclusion
            
            status = "✓ 通过" if passed else "✗ 未通过"
            if passed:
                tests_passed += 1
            
            print(f"\n{test_name.upper()}检验:")
            print(f"  状态: {status}")
            print(f"  结论: {conclusion}")
            
            # 显示关键统计量
            for key, value in results.items():
                if key != 'conclusion' and isinstance(value, dict):
                    if 'p_value' in value:
                        print(f"  {key}: 统计量={value.get('statistic', 'N/A'):.3f}, p值={value['p_value']:.3f}")
        
        print(f"\n总结: {tests_passed}/{total_tests} 项检验通过")
        
        if tests_passed == total_tests:
            print("🎉 模型诊断通过,结果可靠")
        else:
            print("⚠️ 模型存在一些问题,建议检查模型设定")
        
        return self.test_results

# 统计检验示例
statistical_tests = ITSAStatisticalTests(itsa_data, basic_itsa.model)
test_results = statistical_tests.run_comprehensive_tests()
diagnostic_report = statistical_tests.generate_diagnostic_report()

模型诊断检验框架

Parse error on line 1: timeline title I ^ Expecting 'open_directive', 'NEWLINE', 'SPACE', 'GRAPH', got 'ALPHA'

VI. 实际案例:疫情防控政策对经济活动的影响

案例背景

2020年COVID-19疫情期间,各国实施了不同的防控政策。我们分析某城市在2020年3月实施严格封控措施对零售销售额的影响。

完整分析流程

class COVIDPolicyImpactAnalysis:
    """疫情防控政策影响分析案例"""
    
    def __init__(self):
        self.data = None
        self.models = {}
        self.results = {}
    
    def generate_covid_data(self):
        """生成COVID-19政策影响数据"""
        np.random.seed(42)
        
        # 创建时间范围:2019年1月到2021年12月
        dates = pd.date_range('2019-01-01', '2021-12-31', freq='M')
        n_points = len(dates)
        
        # 基础经济趋势
        time = np.arange(n_points)
        base_trend = 0.3 * time  # 长期增长趋势
        
        # 季节性模式
        seasonal = 5 * np.sin(2 * np.pi * (time % 12) / 12)
        
        # 政策干预点(2020年3月)
        policy_date = pd.Timestamp('2020-03-01')
        policy_index = dates.get_loc(policy_date)
        policy = (time >= policy_index).astype(int)
        
        # 政策效应
        immediate_effect = -25  # 封控立即导致的销售额下降
        trend_change = -0.8     # 封控期间的额外趋势变化
        
        # 恢复效应(2020年6月开始逐步恢复)
        recovery_start = dates.get_loc(pd.Timestamp('2020-06-01'))
        recovery_effect = 0.5 * np.maximum(0, time - recovery_start)
        
        policy_effect = (policy * immediate_effect + 
                        policy * trend_change * (time - policy_index) +
                        recovery_effect)
        
        # 随机冲击(模拟疫情不确定性)
        covid_shock = np.zeros(n_points)
        shock_period = (time >= policy_index) & (time < policy_index + 6)
        covid_shock[shock_period] = np.random.normal(-5, 2, shock_period.sum())
        
        # 生成零售销售额数据
        retail_sales = (100 + base_trend + seasonal + policy_effect + 
                       covid_shock + np.random.normal(0, 2, n_points))
        
        self.data = pd.DataFrame({
            'date': dates,
            'time': time,
            'retail_sales': retail_sales,
            'policy': policy,
            'time_after_policy': np.maximum(0, time - policy_index),
            'recovery_period': (time >= recovery_start).astype(int),
            'recovery_time': np.maximum(0, time - recovery_start)
        })
        
        return self.data
    
    def comprehensive_analysis(self):
        """执行综合分析"""
        
        # 1. 探索性分析
        explorative_fig = self._exploratory_analysis()
        
        # 2. 基础ITSA模型
        basic_model = BasicITSAModel(self.data)
        basic_results = basic_model.fit_basic_itsa(include_seasonality=True)
        basic_interpretation = basic_model.interpret_effects()
        
        # 3. 考虑恢复阶段的扩展模型
        extended_model = self._fit_extended_model()
        
        # 4. 反事实分析
        counterfactual = self._counterfactual_analysis()
        
        # 5. 经济影响量化
        economic_impact = self._quantify_economic_impact()
        
        self.results.update({
            'basic_model': basic_results,
            'extended_model': extended_model,
            'counterfactual': counterfactual,
            'economic_impact': economic_impact,
            'interpretation': basic_interpretation
        })
        
        return self.results
    
    def _fit_extended_model(self):
        """拟合考虑恢复阶段的扩展模型"""
        X = self.data[['time', 'policy', 'time_after_policy', 
                      'recovery_period', 'recovery_time']].copy()
        
        # 添加月度季节性
        for month in range(2, 13):
            X[f'month_{month}'] = (self.data['date'].dt.month == month).astype(int)
        
        X = sm.add_constant(X)
        y = self.data['retail_sales']
        
        model = sm.OLS(y, X).fit()
        
        return {
            'model': model,
            'params': model.params,
            'rsquared': model.rsquared,
            'aic': model.aic
        }
    
    def _counterfactual_analysis(self):
        """反事实分析:如果没有政策会怎样"""
        # 使用政策前数据建立预测模型
        pre_policy_data = self.data[self.data['policy'] == 0].copy()
        
        # 拟合政策前趋势模型
        X_pre = pre_policy_data[['time']].copy()
        
        # 添加季节性
        for month in range(2, 13):
            X_pre[f'month_{month}'] = (pre_policy_data['date'].dt.month == month).astype(int)
        
        X_pre = sm.add_constant(X_pre)
        y_pre = pre_policy_data['retail_sales']
        
        trend_model = sm.OLS(y_pre, X_pre).fit()
        
        # 预测整个时期的反事实值
        X_full = self.data[['time']].copy()
        for month in range(2, 13):
            X_full[f'month_{month}'] = (self.data['date'].dt.month == month).astype(int)
        
        X_full = sm.add_constant(X_full)
        counterfactual_pred = trend_model.predict(X_full)
        
        # 计算政策效应
        actual_values = self.data['retail_sales']
        policy_effect = actual_values - counterfactual_pred
        
        return {
            'counterfactual_values': counterfactual_pred,
            'policy_effects': policy_effect,
            'cumulative_effect': policy_effect[self.data['policy'] == 1].sum(),
            'trend_model': trend_model
        }
    
    def _quantify_economic_impact(self):
        """量化经济影响"""
        counterfactual = self._counterfactual_analysis()
        
        # 政策期间的实际损失
        policy_period = self.data[self.data['policy'] == 1]
        total_loss = counterfactual['cumulative_effect']
        
        # 月均损失
        avg_monthly_loss = total_loss / len(policy_period)
        
        # 最大单月损失
        max_monthly_loss = counterfactual['policy_effects'].min()
        
        # 恢复时间分析
        recovery_threshold = -5  # 定义恢复阈值
        recovery_index = None
        for i, effect in enumerate(counterfactual['policy_effects']):
            if effect > recovery_threshold and i > len(self.data) // 2:
                recovery_index = i
                break
        
        recovery_time = recovery_index - self.data[self.data['policy'] == 1].index[0] if recovery_index else None
        
        return {
            'total_loss': total_loss,
            'avg_monthly_loss': avg_monthly_loss,
            'max_monthly_loss': max_monthly_loss,
            'recovery_time_months': recovery_time,
            'recovery_date': self.data.iloc[recovery_index]['date'] if recovery_index else None
        }
    
    def _exploratory_analysis(self):
        """探索性分析可视化"""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 零售销售额趋势
        ax1.plot(self.data['date'], self.data['retail_sales'], linewidth=2)
        policy_date = self.data[self.data['policy'] == 1]['date'].iloc[0]
        ax1.axvline(x=policy_date, color='red', linestyle='--', 
                   label='封控政策实施', alpha=0.7)
        ax1.set_title('零售销售额趋势 (2019-2021)')
        ax1.set_ylabel('零售销售额指数')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. 同比变化率
        self.data['yoy_growth'] = self.data['retail_sales'].pct_change(12) * 100
        ax2.plot(self.data['date'], self.data['yoy_growth'], linewidth=2, color='orange')
        ax2.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
        ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        ax2.set_title('零售销售额同比增长率')
        ax2.set_ylabel('同比增长率 (%)')
        ax2.grid(True, alpha=0.3)
        
        # 3. 季节性分解
        decomposition = seasonal_decompose(self.data.set_index('date')['retail_sales'], 
                                         model='additive', period=12)
        ax3.plot(decomposition.trend, label='趋势', linewidth=2)
        ax3.plot(decomposition.seasonal, label='季节性', alpha=0.7)
        ax3.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
        ax3.set_title('季节性分解')
        ax3.legend()
        ax3.grid(True, alpha=0.3)
        
        # 4. 政策前后分布比较
        pre_policy = self.data[self.data['policy'] == 0]['retail_sales']
        post_policy = self.data[self.data['policy'] == 1]['retail_sales']
        
        ax4.boxplot([pre_policy, post_policy], labels=['政策前', '政策后'])
        ax4.set_title('政策前后零售销售额分布比较')
        ax4.set_ylabel('零售销售额指数')
        
        plt.tight_layout()
        plt.show()
        
        return fig
    
    def visualize_comprehensive_results(self):
        """可视化综合分析结果"""
        if not self.results:
            raise ValueError("请先执行综合分析")
        
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. 实际值 vs 反事实值
        counterfactual = self.results['counterfactual']
        ax1.plot(self.data['date'], self.data['retail_sales'], 
                label='实际值', linewidth=2)
        ax1.plot(self.data['date'], counterfactual['counterfactual_values'], 
                '--', label='反事实值(无政策)', linewidth=2)
        policy_date = self.data[self.data['policy'] == 1]['date'].iloc[0]
        ax1.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
        ax1.fill_between(self.data['date'], 
                        self.data['retail_sales'], 
                        counterfactual['counterfactual_values'],
                        where=(self.data['policy'] == 1),
                        alpha=0.3, color='red', label='政策效应')
        ax1.set_title('实际值与反事实值比较')
        ax1.set_ylabel('零售销售额指数')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 2. 政策效应时间路径
        ax2.plot(self.data['date'], counterfactual['policy_effects'], 
                linewidth=2, color='red')
        ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
        ax2.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
        ax2.set_title('政策效应时间路径')
        ax2.set_ylabel('政策效应 (实际值 - 反事实值)')
        ax2.grid(True, alpha=0.3)
        
        # 3. 累计效应
        cumulative_effect = counterfactual['policy_effects'].cumsum()
        ax3.plot(self.data['date'], cumulative_effect, linewidth=2, color='purple')
        ax3.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
        ax3.set_title('累计政策效应')
        ax3.set_ylabel('累计损失')
        ax3.grid(True, alpha=0.3)
        
        # 4. 经济影响总结
        impact = self.results['economic_impact']
        metrics = ['总损失', '月均损失', '最大单月损失']
        values = [impact['total_loss'], impact['avg_monthly_loss'], impact['max_monthly_loss']]
        
        bars = ax4.bar(metrics, values, color=['red', 'orange', 'darkred'], alpha=0.7)
        ax4.set_title('经济影响量化')
        ax4.set_ylabel('销售额损失指数')
        
        # 添加数值标签
        for bar, value in zip(bars, values):
            height = bar.get_height()
            ax4.text(bar.get_x() + bar.get_width()/2., height,
                    f'{height:.1f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()
        
        # 打印政策建议
        print("疫情防控政策经济影响分析结果:")
        print(f"政策总效应: {self.results['interpretation']['total_effect']['estimate']:.1f}")
        print(f"累计经济损失: {impact['total_loss']:.1f} 指数点")
        print(f"平均月损失: {impact['avg_monthly_loss']:.1f} 指数点")
        print(f"最大单月损失: {impact['max_monthly_loss']:.1f} 指数点")
        
        if impact['recovery_time_months']:
            print(f"恢复时间: {impact['recovery_time_months']} 个月")
            print(f"预计恢复时间: {impact['recovery_date'].strftime('%Y年%m月')}")
        
        print("\n政策启示:")
        print("1. 封控政策对经济活动有显著的短期冲击")
        print("2. 经济在政策放松后呈现恢复趋势")
        print("3. 未来类似政策需配套经济支持措施")

# 运行COVID政策影响分析
covid_analysis = COVIDPolicyImpactAnalysis()
covid_data = covid_analysis.generate_covid_data()
comprehensive_results = covid_analysis.comprehensive_analysis()
covid_analysis.visualize_comprehensive_results()

政策评估分析框架

政策干预评估
趋势分析
效应分解
反事实模拟
经济量化
干预前趋势识别
干预后趋势变化
恢复模式分析
即时效应估计
持续效应估计
季节性调整
反事实构建
效应路径分析
累计效应计算
经济损失量化
恢复时间评估
政策成本效益分析
综合政策评估
政策效果结论
政策优化建议
未来预测
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。