Complete Implementation Steps for Interrupted Time Series Analysis
[Abstract] This article walks through a complete implementation of interrupted time series analysis (ITSA): the basic principles and regression model, data preparation and exploratory analysis, basic and advanced model construction (handling autocorrelation and seasonality), systematic statistical diagnostics, and a full worked case study evaluating the impact of COVID-19 control policies on retail sales.
I. Basic Principles of Interrupted Time Series Analysis
Core Concepts and Theoretical Foundations
Interrupted time series analysis (ITSA) is a quasi-experimental research method. Its core idea is to fit a time series model and compare the trend and level of the series before and after an intervention point, thereby identifying the causal effect of the intervention.
Worked Example: The Effect of Seat Belt Legislation on Traffic Fatalities
Consider a classic case: a country introduces mandatory seat belt legislation in January 2005, and we want to evaluate the policy's effect on the traffic fatality rate. With ITSA we can:
- Analyze the long-term trend in the fatality rate before the legislation
- Detect the immediate change in the fatality rate when the legislation takes effect (level change)
- Assess the change in the fatality-rate trend after the legislation (slope change)
- Separate the policy effect from the natural time trend
Mathematical Model Framework
The basic ITSA model can be written as:

$$Y_t = \beta_0 + \beta_1 T_t + \beta_2 X_t + \beta_3 (T_t \cdot X_t) + \varepsilon_t$$

where:
- $Y_t$: the outcome variable at time point t
- $T_t$: the time variable, counted from the start of the observation period
- $X_t$: the intervention dummy (0 before the intervention, 1 afterwards)
- $T_t \cdot X_t$: the interaction of time and intervention (in practice coded as the time elapsed since the intervention)
- $\beta_2$: the immediate level change caused by the intervention
- $\beta_3$: the change in trend after the intervention
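To make the coding of these variables concrete, here is a minimal sketch for the hypothetical seat belt example above (monthly data, intervention in January 2005); the column names are illustrative and mirror the synthetic data used later in this article:

import numpy as np
import pandas as pd

# Hypothetical monthly series around the January 2005 seat belt law
dates = pd.date_range('2003-01-01', '2006-12-01', freq='MS')
example = pd.DataFrame({'date': dates})

# T_t: time counted from the start of the observation period
example['time'] = np.arange(len(example))

# X_t: intervention dummy, 1 from January 2005 onwards
example['intervention'] = (example['date'] >= '2005-01-01').astype(int)

# Interaction term: time elapsed since the intervention (0 before it)
start = example.loc[example['intervention'] == 1, 'time'].iloc[0]
example['time_after_intervention'] = np.maximum(0, example['time'] - start) * example['intervention']

print(example.loc[22:26, ['date', 'time', 'intervention', 'time_after_intervention']])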
Advantages of ITSA and Typical Use Cases

| Analysis scenario | Limitation of conventional approaches | Advantage of ITSA |
|---|---|---|
| Policy evaluation | Cannot separate the policy effect from the natural trend | Separates level changes from trend changes |
| Marketing campaign analysis | Ignores seasonality and similar factors | Controls for autocorrelation in the time series |
| Public health interventions | Simple before/after comparisons can be misleading | Provides statistical significance tests |
| Product redesign evaluation | Hard to quantify long-term impact | Assesses both immediate and sustained effects |
II. Data Preparation and Exploratory Analysis
Time Series Data Structure
ITSA requires data with a clear temporal ordering; as a rule of thumb, at least 8 pre-intervention and 8 post-intervention observations are needed to ensure adequate statistical power.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
import warnings
warnings.filterwarnings('ignore')
class ITSADataPreparer:
"""ITSA数据准备与探索性分析类"""
def __init__(self):
self.data = None
self.exploratory_results = {}
def generate_synthetic_itsa_data(self, n_points=60, intervention_point=30,
base_level=100, trend_pre=0.5, trend_post=0.2,
level_change=-15, seasonal_amplitude=5,
noise_sd=3):
"""
生成合成ITSA数据
参数:
n_points: 总时间点数
intervention_point: 干预时点
base_level: 基线水平
trend_pre: 干预前趋势
trend_post: 干预后趋势
level_change: 水平变化
seasonal_amplitude: 季节性振幅
noise_sd: 噪声标准差
"""
np.random.seed(42)
# 创建时间索引
time_index = pd.date_range(start='2020-01-01', periods=n_points, freq='M')
# 时间变量
time = np.arange(n_points)
# 干预变量
intervention = (time >= intervention_point).astype(int)
# 基础趋势
trend = np.where(time < intervention_point,
trend_pre * time,
trend_pre * intervention_point + trend_post * (time - intervention_point))
# 季节性成分(年度季节性)
seasonal = seasonal_amplitude * np.sin(2 * np.pi * time / 12)
# 水平变化
level_effect = level_change * intervention
# 生成结果变量
Y = base_level + trend + seasonal + level_effect + np.random.normal(0, noise_sd, n_points)
self.data = pd.DataFrame({
'date': time_index,
'time': time,
'intervention': intervention,
'Y': Y,
'post_intervention_time': np.maximum(0, time - intervention_point)
})
# 添加干预后时间交互项
self.data['time_after_intervention'] = self.data['post_intervention_time'] * self.data['intervention']
return self.data
def exploratory_analysis(self):
"""执行探索性数据分析"""
if self.data is None:
raise ValueError("请先生成数据")
# 基本统计描述
desc_stats = self.data[['Y', 'time', 'intervention']].describe()
# 平稳性检验(ADF检验)
adf_result = adfuller(self.data['Y'])
# 季节性分解
decomposition = seasonal_decompose(self.data.set_index('date')['Y'], model='additive', period=12)
self.exploratory_results = {
'descriptive_stats': desc_stats,
'stationarity_test': adf_result,
'decomposition': decomposition,
'is_stationary': adf_result[1] < 0.05 # p值小于0.05表示平稳
}
return self.exploratory_results
def visualize_exploratory_analysis(self):
"""可视化探索性分析结果"""
if self.data is None:
raise ValueError("请先生成数据")
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# 1. 原始时间序列图
ax1.plot(self.data['date'], self.data['Y'], linewidth=2)
intervention_date = self.data[self.data['intervention'] == 1]['date'].iloc[0]
ax1.axvline(x=intervention_date, color='red', linestyle='--',
label=f'干预时点: {intervention_date.strftime("%Y-%m")}')
ax1.set_title('原始时间序列')
ax1.set_ylabel('结果变量 Y')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 自相关和偏自相关图
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
plot_acf(self.data['Y'], ax=ax2, lags=20)
ax2.set_title('自相关函数 (ACF)')
plot_pacf(self.data['Y'], ax=ax3, lags=20)
ax3.set_title('偏自相关函数 (PACF)')
# 3. 季节性分解
decomposition = self.exploratory_results['decomposition']
ax4.plot(decomposition.trend, label='趋势', linewidth=2)
ax4.plot(decomposition.seasonal, label='季节性', alpha=0.7)
ax4.plot(decomposition.resid, label='残差', alpha=0.5)
ax4.axvline(x=intervention_date, color='red', linestyle='--', alpha=0.7)
ax4.set_title('季节性分解')
ax4.legend()
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 打印统计结果
print("探索性分析结果:")
print(f"ADF检验p值: {self.exploratory_results['stationarity_test'][1]:.4f}")
print(f"序列是否平稳: {'是' if self.exploratory_results['is_stationary'] else '否'}")
print(f"干预前均值: {self.data[self.data['intervention'] == 0]['Y'].mean():.2f}")
print(f"干预后均值: {self.data[self.data['intervention'] == 1]['Y'].mean():.2f}")
return fig
# 数据准备示例
data_preparer = ITSADataPreparer()
itsa_data = data_preparer.generate_synthetic_itsa_data(
n_points=60, intervention_point=30,
base_level=100, trend_pre=0.5, trend_post=0.2,
level_change=-15, seasonal_amplitude=3, noise_sd=2
)
exploratory_results = data_preparer.exploratory_analysis()
fig = data_preparer.visualize_exploratory_analysis()
Data Quality Checklist
Before modeling, confirm that the series meets the basic requirements discussed above (a small helper sketch follows):
- Observations are evenly spaced in time, with no gaps around the intervention point
- There are at least 8 pre-intervention and 8 post-intervention observations
- The intervention date is precisely defined and corresponds to a single time point
- Missing values and obvious outliers have been inspected
- Seasonality and autocorrelation have been examined in the exploratory analysis
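A minimal sketch of such a check, assuming the column layout produced by ITSADataPreparer above (date and intervention columns):

import pandas as pd

def itsa_data_quality_report(df, date_col='date', intervention_col='intervention', min_points=8):
    """Run the basic ITSA data-quality checks listed above and return a summary dict."""
    dates = pd.to_datetime(df[date_col])
    spacing = dates.diff().dropna()
    report = {
        'dates_monotonic': bool(dates.is_monotonic_increasing),
        'min_spacing': spacing.min(),
        'max_spacing': spacing.max(),
        'n_pre': int((df[intervention_col] == 0).sum()),
        'n_post': int((df[intervention_col] == 1).sum()),
        'missing_values': int(df.isna().sum().sum()),
        # the dummy should switch from 0 to 1 exactly once
        'single_intervention_point': float(df[intervention_col].diff().abs().sum()) == 1.0,
    }
    report['enough_observations'] = report['n_pre'] >= min_points and report['n_post'] >= min_points
    return report

# Example: print(itsa_data_quality_report(itsa_data))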
III. Building the Basic ITSA Model
Standard ITSA Model Specification
The basic ITSA model estimates the intervention effect with an OLS regression that includes the time trend, the intervention dummy, and their interaction.
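As a quick orientation before the full class-based implementation below, the same regression can be written in one line with statsmodels' formula interface (this sketch assumes the itsa_data frame generated in Section II):

import statsmodels.formula.api as smf

# Segmented regression: Y = b0 + b1*time + b2*intervention + b3*time_after_intervention + error
quick_itsa = smf.ols('Y ~ time + intervention + time_after_intervention', data=itsa_data).fit()

# b2 (intervention) is the immediate level change,
# b3 (time_after_intervention) is the change in slope after the intervention
print(quick_itsa.summary().tables[1])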
class BasicITSAModel:
"""基础ITSA模型实现"""
def __init__(self, data):
self.data = data
self.model = None
self.results = {}
def fit_basic_itsa(self, include_seasonality=False):
"""拟合基础ITSA模型"""
# 准备变量
X = self.data[['time', 'intervention', 'time_after_intervention']].copy()
if include_seasonality:
            # Add monthly seasonal dummies; cast to int so the design matrix stays numeric
            month_dummies = pd.get_dummies(self.data['date'].dt.month, prefix='month', dtype=int)
            X = pd.concat([X, month_dummies.iloc[:, 1:]], axis=1)  # drop one dummy to avoid multicollinearity
X = sm.add_constant(X)
y = self.data['Y']
# 拟合模型
self.model = sm.OLS(y, X).fit()
# 存储结果
self.results = {
'model': self.model,
'params': self.model.params,
'pvalues': self.model.pvalues,
'rsquared': self.model.rsquared,
'aic': self.model.aic
}
return self.results
def interpret_effects(self):
"""解释干预效果"""
if self.model is None:
raise ValueError("请先拟合模型")
params = self.model.params
pvalues = self.model.pvalues
# 提取关键系数
level_change = params.get('intervention', 0)
trend_change = params.get('time_after_intervention', 0)
base_trend = params.get('time', 0)
level_p = pvalues.get('intervention', 1)
trend_p = pvalues.get('time_after_intervention', 1)
# 计算干预后趋势
post_trend = base_trend + trend_change
# 效应解释
interpretation = {
'immediate_effect': {
'estimate': level_change,
'p_value': level_p,
'significant': level_p < 0.05,
'interpretation': f"干预导致水平{"上升" if level_change > 0 else "下降"}了{abs(level_change):.2f}单位"
},
'trend_effect': {
'estimate': trend_change,
'p_value': trend_p,
'significant': trend_p < 0.05,
'interpretation': f"干预导致趋势{"加速" if trend_change > 0 else "减速"}了{abs(trend_change):.2f}单位/期"
},
'baseline_trend': base_trend,
'post_intervention_trend': post_trend
}
# 计算总体效应(在特定时间点)
n_post_periods = len(self.data[self.data['intervention'] == 1])
total_effect = level_change + trend_change * n_post_periods
interpretation['total_effect'] = {
'estimate': total_effect,
'interpretation': f"在干预后{n_post_periods}期,总体效应为{total_effect:.2f}单位"
}
self.results['interpretation'] = interpretation
return interpretation
def visualize_itsa_results(self):
"""可视化ITSA结果"""
if self.model is None:
raise ValueError("请先拟合模型")
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 1. 实际值 vs 拟合值
ax1.plot(self.data['date'], self.data['Y'], 'o-', label='实际值', alpha=0.7, linewidth=2)
ax1.plot(self.data['date'], self.model.fittedvalues, '--', label='拟合值', linewidth=2)
intervention_date = self.data[self.data['intervention'] == 1]['date'].iloc[0]
ax1.axvline(x=intervention_date, color='red', linestyle='--',
label=f'干预时点', alpha=0.7)
ax1.set_title('ITSA模型拟合结果')
ax1.set_ylabel('结果变量 Y')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 残差分析
residuals = self.model.resid
ax2.plot(self.data['date'], residuals, 'o-', alpha=0.7)
ax2.axhline(y=0, color='red', linestyle='-', alpha=0.5)
ax2.axvline(x=intervention_date, color='red', linestyle='--', alpha=0.7)
ax2.set_title('模型残差')
ax2.set_ylabel('残差')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 打印模型摘要
print("基础ITSA模型结果:")
print(self.model.summary())
# 打印效应解释
interpretation = self.results.get('interpretation', {})
if interpretation:
print("\n干预效果解释:")
print(f"即时效应: {interpretation['immediate_effect']['interpretation']}")
print(f"趋势效应: {interpretation['trend_effect']['interpretation']}")
print(f"总体效应: {interpretation['total_effect']['interpretation']}")
return fig
# 基础ITSA模型示例
basic_itsa = BasicITSAModel(itsa_data)
basic_results = basic_itsa.fit_basic_itsa(include_seasonality=True)
interpretation = basic_itsa.interpret_effects()
basic_itsa.visualize_itsa_results()
Interpreting the ITSA Model Parameters
The diagram originally embedded here summarized the four regression terms: the intercept β0 (baseline level), time β1 (baseline trend), intervention β2 (immediate level change), and time × intervention β3 (post-intervention trend change).
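As a small illustration (reusing the basic_itsa results object fitted above), the key coefficients and their 95% confidence intervals can be read directly from statsmodels:

# Extract the ITSA coefficients and 95% confidence intervals from the fitted OLS results
params = basic_itsa.model.params
conf_int = basic_itsa.model.conf_int(alpha=0.05)

for name, label in [('time', 'baseline trend (b1)'),
                    ('intervention', 'immediate level change (b2)'),
                    ('time_after_intervention', 'post-intervention trend change (b3)')]:
    low, high = conf_int.loc[name]
    print(f"{label}: {params[name]:.2f} (95% CI: {low:.2f} to {high:.2f})")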
IV. Advanced ITSA Models: Handling Autocorrelation and Seasonality
Diagnosing and Handling Autocorrelation
Time series data often exhibit autocorrelation, which violates the assumptions of OLS regression and biases the standard error estimates (and therefore the significance tests).
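Before the full class-based treatment, a lightweight and widely used remedy is to keep the OLS point estimates but report Newey-West (HAC) standard errors, which are robust to autocorrelation. A minimal sketch on the itsa_data frame from Section II (the lag length of 3 is an illustrative choice):

import statsmodels.api as sm

# Same design matrix as the basic ITSA model
X = sm.add_constant(itsa_data[['time', 'intervention', 'time_after_intervention']])
y = itsa_data['Y']

# HAC (Newey-West) covariance: coefficients are unchanged, but the standard errors
# and p-values account for autocorrelation up to the chosen lag
hac_itsa = sm.OLS(y, X).fit(cov_type='HAC', cov_kwds={'maxlags': 3})
print(hac_itsa.summary().tables[1])

The class below goes further, modeling the autocorrelation explicitly with ARIMA errors and seasonal terms and comparing the candidates by AIC.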
class AdvancedITSAModel:
"""高级ITSA模型 - 处理自相关和季节性"""
def __init__(self, data):
self.data = data
self.models = {}
self.diagnostics = {}
def check_autocorrelation(self, model):
"""检查模型残差的自相关性"""
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.diagnostic import acorr_ljungbox
residuals = model.resid
# Durbin-Watson检验
dw_stat = durbin_watson(residuals)
# Ljung-Box检验
lb_test = acorr_ljungbox(residuals, lags=10, return_df=True)
# 自相关函数
acf_values = sm.tsa.acf(residuals, nlags=10)
diagnostics = {
'durbin_watson': dw_stat,
'ljung_box': lb_test,
'autocorrelation_function': acf_values,
'has_autocorrelation': dw_stat < 1.5 or dw_stat > 2.5 # 简化的判断标准
}
return diagnostics
def fit_arima_itsa(self, order=(1,0,0)):
"""使用ARIMA模型处理自相关"""
# 准备外生变量
exog_vars = self.data[['intervention', 'time_after_intervention']].copy()
# 添加时间趋势
exog_vars['time'] = self.data['time']
# 拟合ARIMA模型
model = sm.tsa.ARIMA(self.data['Y'],
exog=exog_vars,
order=order).fit()
# 检查自相关
diagnostics = self.check_autocorrelation(model)
self.models['arima'] = model
self.diagnostics['arima'] = diagnostics
return model, diagnostics
def fit_seasonal_model(self, seasonal_period=12):
"""拟合考虑季节性的模型"""
# 使用季节性虚拟变量
X = self.data[['time', 'intervention', 'time_after_intervention']].copy()
# 添加季节性虚拟变量
for month in range(2, 13): # 1月作为基准
X[f'month_{month}'] = (self.data['date'].dt.month == month).astype(int)
X = sm.add_constant(X)
y = self.data['Y']
model = sm.OLS(y, X).fit()
diagnostics = self.check_autocorrelation(model)
self.models['seasonal'] = model
self.diagnostics['seasonal'] = diagnostics
return model, diagnostics
def compare_models(self):
"""比较不同模型的表现"""
comparison_results = {}
# 基础模型
basic_model = BasicITSAModel(self.data)
basic_results = basic_model.fit_basic_itsa(include_seasonality=False)
basic_diagnostics = self.check_autocorrelation(basic_results['model'])
comparison_results['basic'] = {
'rsquared': basic_results['rsquared'],
'aic': basic_results['aic'],
'autocorrelation': basic_diagnostics['has_autocorrelation'],
'dw_stat': basic_diagnostics['durbin_watson']
}
# 季节性模型
if 'seasonal' in self.models:
seasonal_model = self.models['seasonal']
comparison_results['seasonal'] = {
'rsquared': seasonal_model.rsquared,
'aic': seasonal_model.aic,
'autocorrelation': self.diagnostics['seasonal']['has_autocorrelation'],
'dw_stat': self.diagnostics['seasonal']['durbin_watson']
}
# ARIMA模型
if 'arima' in self.models:
arima_model = self.models['arima']
comparison_results['arima'] = {
'rsquared': 1 - (arima_model.resid.var() / self.data['Y'].var()),
'aic': arima_model.aic,
'autocorrelation': self.diagnostics['arima']['has_autocorrelation'],
'dw_stat': self.diagnostics['arima']['durbin_watson']
}
# 选择最佳模型
best_model = min(comparison_results.keys(),
key=lambda x: comparison_results[x]['aic'])
comparison_results['best_model'] = best_model
return comparison_results
def visualize_model_comparison(self):
"""可视化模型比较结果"""
comparison = self.compare_models()
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 模型拟合指标比较
models = list(comparison.keys())
models.remove('best_model')
aic_values = [comparison[model]['aic'] for model in models]
rsquared_values = [comparison[model]['rsquared'] for model in models]
x_pos = np.arange(len(models))
bars1 = ax1.bar(x_pos - 0.2, aic_values, 0.4, label='AIC', alpha=0.7)
bars2 = ax1.bar(x_pos + 0.2, rsquared_values, 0.4, label='R²', alpha=0.7)
ax1.set_xlabel('模型类型')
ax1.set_ylabel('指标值')
ax1.set_title('模型比较 - AIC和R²')
ax1.set_xticks(x_pos)
ax1.set_xticklabels(models)
ax1.legend()
# 添加数值标签
for bars in [bars1, bars2]:
for bar in bars:
height = bar.get_height()
ax1.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.2f}', ha='center', va='bottom')
# 自相关诊断
dw_stats = [comparison[model]['dw_stat'] for model in models]
autocorr_status = [comparison[model]['autocorrelation'] for model in models]
colors = ['red' if status else 'green' for status in autocorr_status]
bars3 = ax2.bar(models, dw_stats, color=colors, alpha=0.7)
ax2.axhline(y=2.0, color='black', linestyle='--', label='理想值(DW=2)')
ax2.axhspan(1.5, 2.5, alpha=0.2, color='green', label='可接受范围')
ax2.set_ylabel('Durbin-Watson统计量')
ax2.set_title('自相关性诊断')
ax2.legend()
plt.tight_layout()
plt.show()
print("模型比较结果:")
for model in models:
status = "存在自相关" if comparison[model]['autocorrelation'] else "无自相关"
print(f"{model}模型: AIC={comparison[model]['aic']:.2f}, R²={comparison[model]['rsquared']:.3f}, {status}")
print(f"\n推荐模型: {comparison['best_model']}")
return fig
# 高级ITSA模型示例
advanced_itsa = AdvancedITSAModel(itsa_data)
# 拟合不同模型
arima_model, arima_diag = advanced_itsa.fit_arima_itsa(order=(1,0,0))
seasonal_model, seasonal_diag = advanced_itsa.fit_seasonal_model()
# 比较模型
comparison_results = advanced_itsa.compare_models()
advanced_itsa.visualize_model_comparison()
Time Series Model Selection Framework
In short: start from the basic OLS specification, check the residuals for autocorrelation (Durbin-Watson, Ljung-Box), add seasonal terms or switch to ARIMA errors if needed, and choose among the candidate models by AIC.
V. Statistical Tests for Interrupted Time Series Analysis
Testing the Model Assumptions
To ensure the validity of the ITSA results, a systematic set of statistical tests should be carried out:
class ITSAStatisticalTests:
"""ITSA统计检验类"""
def __init__(self, data, model):
self.data = data
self.model = model
self.test_results = {}
def run_comprehensive_tests(self):
"""执行全面的统计检验"""
# 1. 自相关检验
self._test_autocorrelation()
# 2. 异方差性检验
self._test_heteroscedasticity()
# 3. 正态性检验
self._test_normality()
# 4. 结构稳定性检验
self._test_structural_break()
# 5. 模型设定检验
self._test_model_specification()
return self.test_results
def _test_autocorrelation(self):
"""自相关性检验"""
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.diagnostic import acorr_breusch_godfrey
residuals = self.model.resid
# Durbin-Watson检验
dw_stat = durbin_watson(residuals)
# Breusch-Godfrey检验
bg_test = acorr_breusch_godfrey(self.model, nlags=5)
self.test_results['autocorrelation'] = {
'durbin_watson': dw_stat,
'breusch_godfrey': {
'statistic': bg_test[0],
'p_value': bg_test[1],
'significant': bg_test[1] < 0.05
},
'conclusion': '存在自相关' if dw_stat < 1.5 or dw_stat > 2.5 else '无自相关'
}
def _test_heteroscedasticity(self):
"""异方差性检验"""
        from statsmodels.stats.diagnostic import het_breuschpagan
        from scipy import stats  # needed for the chi-square p-value of the simplified White test below
# Breusch-Pagan检验
bp_test = het_breuschpagan(self.model.resid, self.model.model.exog)
# White检验(简化版)
squared_resid = self.model.resid ** 2
white_aux = sm.OLS(squared_resid, self.model.model.exog).fit()
white_stat = len(self.model.resid) * white_aux.rsquared
self.test_results['heteroscedasticity'] = {
'breusch_pagan': {
'statistic': bp_test[0],
'p_value': bp_test[1],
'significant': bp_test[1] < 0.05
},
'white_test': {
'statistic': white_stat,
'p_value': 1 - stats.chi2.cdf(white_stat, self.model.model.exog.shape[1] - 1)
},
'conclusion': '存在异方差' if bp_test[1] < 0.05 else '同方差'
}
def _test_normality(self):
"""正态性检验"""
from scipy import stats
from statsmodels.stats.diagnostic import normal_ad
residuals = self.model.resid
# Anderson-Darling检验
ad_test = normal_ad(residuals)
# Shapiro-Wilk检验
shapiro_test = stats.shapiro(residuals)
# Q-Q图分析
qq_data = sm.qqplot(residuals, line='45', fit=True)
self.test_results['normality'] = {
'anderson_darling': {
'statistic': ad_test[0],
'p_value': ad_test[1],
'significant': ad_test[1] < 0.05
},
'shapiro_wilk': {
'statistic': shapiro_test[0],
'p_value': shapiro_test[1],
'significant': shapiro_test[1] < 0.05
},
'conclusion': '非正态分布' if ad_test[1] < 0.05 else '近似正态分布'
}
def _test_structural_break(self):
"""结构突变检验"""
        from statsmodels.stats.diagnostic import breaks_cusumolsresid
        # CUSUM test on the OLS residuals; returns (statistic, p-value, critical values)
        cusum_stat, cusum_pvalue, cusum_crit = breaks_cusumolsresid(self.model.resid)
        # Record the intervention point (a Chow test at this point could be added as well)
        intervention_idx = self.data[self.data['intervention'] == 1].index[0]
        self.test_results['structural_break'] = {
            'cusum_test': {
                'statistic': cusum_stat,
                'p_value': cusum_pvalue,
                'critical_values': cusum_crit,
                'significant': cusum_pvalue < 0.05
            },
            'intervention_point': intervention_idx,
            'conclusion': '存在结构突变' if cusum_pvalue < 0.05 else '无结构突变'
        }
def _test_model_specification(self):
"""模型设定检验"""
        # Ramsey RESET test; linear_reset returns a results object, not a tuple
        from statsmodels.stats.diagnostic import linear_reset
        reset_test = linear_reset(self.model, power=2, test_type='fitted', use_f=True)
        reset_stat = float(np.squeeze(reset_test.fvalue))
        reset_pvalue = float(np.squeeze(reset_test.pvalue))
        self.test_results['specification'] = {
            'reset_test': {
                'statistic': reset_stat,
                'p_value': reset_pvalue,
                'significant': reset_pvalue < 0.05
            },
            'conclusion': '模型设定错误' if reset_pvalue < 0.05 else '模型设定正确'
        }
def generate_diagnostic_report(self):
"""生成诊断报告"""
if not self.test_results:
self.run_comprehensive_tests()
print("=" * 60)
print("ITSA模型诊断报告")
print("=" * 60)
tests_passed = 0
total_tests = 0
for test_name, results in self.test_results.items():
total_tests += 1
conclusion = results.get('conclusion', '')
            passed = not any(keyword in conclusion for keyword in ['问题', '存在', '错误', '非正态'])
status = "✓ 通过" if passed else "✗ 未通过"
if passed:
tests_passed += 1
print(f"\n{test_name.upper()}检验:")
print(f" 状态: {status}")
print(f" 结论: {conclusion}")
# 显示关键统计量
for key, value in results.items():
if key != 'conclusion' and isinstance(value, dict):
if 'p_value' in value:
print(f" {key}: 统计量={value.get('statistic', 'N/A'):.3f}, p值={value['p_value']:.3f}")
print(f"\n总结: {tests_passed}/{total_tests} 项检验通过")
if tests_passed == total_tests:
print("🎉 模型诊断通过,结果可靠")
else:
print("⚠️ 模型存在一些问题,建议检查模型设定")
return self.test_results
# 统计检验示例
statistical_tests = ITSAStatisticalTests(itsa_data, basic_itsa.model)
test_results = statistical_tests.run_comprehensive_tests()
diagnostic_report = statistical_tests.generate_diagnostic_report()
Model Diagnostic Testing Framework
(The timeline diagram originally shown here outlined the sequence of checks implemented above: autocorrelation, heteroscedasticity, normality, structural break, and model specification.)
VI. Case Study: The Impact of Pandemic Control Policies on Economic Activity
Case Background
During the COVID-19 pandemic in 2020, countries implemented a range of control policies. Here we analyze the impact of the strict lockdown measures a city introduced in March 2020 on its retail sales.
Complete Analysis Workflow
class COVIDPolicyImpactAnalysis:
"""疫情防控政策影响分析案例"""
def __init__(self):
self.data = None
self.models = {}
self.results = {}
def generate_covid_data(self):
"""生成COVID-19政策影响数据"""
np.random.seed(42)
# 创建时间范围:2019年1月到2021年12月
        dates = pd.date_range('2019-01-01', '2021-12-31', freq='MS')  # month-start dates so the get_loc() lookups below match exactly
n_points = len(dates)
# 基础经济趋势
time = np.arange(n_points)
base_trend = 0.3 * time # 长期增长趋势
# 季节性模式
seasonal = 5 * np.sin(2 * np.pi * (time % 12) / 12)
# 政策干预点(2020年3月)
policy_date = pd.Timestamp('2020-03-01')
policy_index = dates.get_loc(policy_date)
policy = (time >= policy_index).astype(int)
# 政策效应
immediate_effect = -25 # 封控立即导致的销售额下降
trend_change = -0.8 # 封控期间的额外趋势变化
# 恢复效应(2020年6月开始逐步恢复)
recovery_start = dates.get_loc(pd.Timestamp('2020-06-01'))
recovery_effect = 0.5 * np.maximum(0, time - recovery_start)
policy_effect = (policy * immediate_effect +
policy * trend_change * (time - policy_index) +
recovery_effect)
# 随机冲击(模拟疫情不确定性)
covid_shock = np.zeros(n_points)
shock_period = (time >= policy_index) & (time < policy_index + 6)
covid_shock[shock_period] = np.random.normal(-5, 2, shock_period.sum())
# 生成零售销售额数据
retail_sales = (100 + base_trend + seasonal + policy_effect +
covid_shock + np.random.normal(0, 2, n_points))
self.data = pd.DataFrame({
'date': dates,
'time': time,
'retail_sales': retail_sales,
'policy': policy,
'time_after_policy': np.maximum(0, time - policy_index),
'recovery_period': (time >= recovery_start).astype(int),
'recovery_time': np.maximum(0, time - recovery_start)
})
return self.data
def comprehensive_analysis(self):
"""执行综合分析"""
# 1. 探索性分析
explorative_fig = self._exploratory_analysis()
        # 2. Basic ITSA model (rename columns to the names BasicITSAModel expects)
        itsa_view = self.data.rename(columns={'retail_sales': 'Y',
                                              'policy': 'intervention',
                                              'time_after_policy': 'time_after_intervention'})
        basic_model = BasicITSAModel(itsa_view)
        basic_results = basic_model.fit_basic_itsa(include_seasonality=True)
basic_interpretation = basic_model.interpret_effects()
# 3. 考虑恢复阶段的扩展模型
extended_model = self._fit_extended_model()
# 4. 反事实分析
counterfactual = self._counterfactual_analysis()
# 5. 经济影响量化
economic_impact = self._quantify_economic_impact()
self.results.update({
'basic_model': basic_results,
'extended_model': extended_model,
'counterfactual': counterfactual,
'economic_impact': economic_impact,
'interpretation': basic_interpretation
})
return self.results
def _fit_extended_model(self):
"""拟合考虑恢复阶段的扩展模型"""
X = self.data[['time', 'policy', 'time_after_policy',
'recovery_period', 'recovery_time']].copy()
# 添加月度季节性
for month in range(2, 13):
X[f'month_{month}'] = (self.data['date'].dt.month == month).astype(int)
X = sm.add_constant(X)
y = self.data['retail_sales']
model = sm.OLS(y, X).fit()
return {
'model': model,
'params': model.params,
'rsquared': model.rsquared,
'aic': model.aic
}
def _counterfactual_analysis(self):
"""反事实分析:如果没有政策会怎样"""
# 使用政策前数据建立预测模型
pre_policy_data = self.data[self.data['policy'] == 0].copy()
# 拟合政策前趋势模型
X_pre = pre_policy_data[['time']].copy()
# 添加季节性
for month in range(2, 13):
X_pre[f'month_{month}'] = (pre_policy_data['date'].dt.month == month).astype(int)
X_pre = sm.add_constant(X_pre)
y_pre = pre_policy_data['retail_sales']
trend_model = sm.OLS(y_pre, X_pre).fit()
# 预测整个时期的反事实值
X_full = self.data[['time']].copy()
for month in range(2, 13):
X_full[f'month_{month}'] = (self.data['date'].dt.month == month).astype(int)
X_full = sm.add_constant(X_full)
counterfactual_pred = trend_model.predict(X_full)
# 计算政策效应
actual_values = self.data['retail_sales']
policy_effect = actual_values - counterfactual_pred
return {
'counterfactual_values': counterfactual_pred,
'policy_effects': policy_effect,
'cumulative_effect': policy_effect[self.data['policy'] == 1].sum(),
'trend_model': trend_model
}
def _quantify_economic_impact(self):
"""量化经济影响"""
counterfactual = self._counterfactual_analysis()
# 政策期间的实际损失
policy_period = self.data[self.data['policy'] == 1]
total_loss = counterfactual['cumulative_effect']
# 月均损失
avg_monthly_loss = total_loss / len(policy_period)
# 最大单月损失
max_monthly_loss = counterfactual['policy_effects'].min()
# 恢复时间分析
recovery_threshold = -5 # 定义恢复阈值
recovery_index = None
for i, effect in enumerate(counterfactual['policy_effects']):
if effect > recovery_threshold and i > len(self.data) // 2:
recovery_index = i
break
recovery_time = recovery_index - self.data[self.data['policy'] == 1].index[0] if recovery_index else None
return {
'total_loss': total_loss,
'avg_monthly_loss': avg_monthly_loss,
'max_monthly_loss': max_monthly_loss,
'recovery_time_months': recovery_time,
'recovery_date': self.data.iloc[recovery_index]['date'] if recovery_index else None
}
def _exploratory_analysis(self):
"""探索性分析可视化"""
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# 1. 零售销售额趋势
ax1.plot(self.data['date'], self.data['retail_sales'], linewidth=2)
policy_date = self.data[self.data['policy'] == 1]['date'].iloc[0]
ax1.axvline(x=policy_date, color='red', linestyle='--',
label='封控政策实施', alpha=0.7)
ax1.set_title('零售销售额趋势 (2019-2021)')
ax1.set_ylabel('零售销售额指数')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 同比变化率
self.data['yoy_growth'] = self.data['retail_sales'].pct_change(12) * 100
ax2.plot(self.data['date'], self.data['yoy_growth'], linewidth=2, color='orange')
ax2.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
ax2.set_title('零售销售额同比增长率')
ax2.set_ylabel('同比增长率 (%)')
ax2.grid(True, alpha=0.3)
# 3. 季节性分解
decomposition = seasonal_decompose(self.data.set_index('date')['retail_sales'],
model='additive', period=12)
ax3.plot(decomposition.trend, label='趋势', linewidth=2)
ax3.plot(decomposition.seasonal, label='季节性', alpha=0.7)
ax3.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
ax3.set_title('季节性分解')
ax3.legend()
ax3.grid(True, alpha=0.3)
# 4. 政策前后分布比较
pre_policy = self.data[self.data['policy'] == 0]['retail_sales']
post_policy = self.data[self.data['policy'] == 1]['retail_sales']
ax4.boxplot([pre_policy, post_policy], labels=['政策前', '政策后'])
ax4.set_title('政策前后零售销售额分布比较')
ax4.set_ylabel('零售销售额指数')
plt.tight_layout()
plt.show()
return fig
def visualize_comprehensive_results(self):
"""可视化综合分析结果"""
if not self.results:
raise ValueError("请先执行综合分析")
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# 1. 实际值 vs 反事实值
counterfactual = self.results['counterfactual']
ax1.plot(self.data['date'], self.data['retail_sales'],
label='实际值', linewidth=2)
ax1.plot(self.data['date'], counterfactual['counterfactual_values'],
'--', label='反事实值(无政策)', linewidth=2)
policy_date = self.data[self.data['policy'] == 1]['date'].iloc[0]
ax1.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
ax1.fill_between(self.data['date'],
self.data['retail_sales'],
counterfactual['counterfactual_values'],
where=(self.data['policy'] == 1),
alpha=0.3, color='red', label='政策效应')
ax1.set_title('实际值与反事实值比较')
ax1.set_ylabel('零售销售额指数')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 政策效应时间路径
ax2.plot(self.data['date'], counterfactual['policy_effects'],
linewidth=2, color='red')
ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
ax2.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
ax2.set_title('政策效应时间路径')
ax2.set_ylabel('政策效应 (实际值 - 反事实值)')
ax2.grid(True, alpha=0.3)
# 3. 累计效应
cumulative_effect = counterfactual['policy_effects'].cumsum()
ax3.plot(self.data['date'], cumulative_effect, linewidth=2, color='purple')
ax3.axvline(x=policy_date, color='red', linestyle='--', alpha=0.7)
ax3.set_title('累计政策效应')
ax3.set_ylabel('累计损失')
ax3.grid(True, alpha=0.3)
# 4. 经济影响总结
impact = self.results['economic_impact']
metrics = ['总损失', '月均损失', '最大单月损失']
values = [impact['total_loss'], impact['avg_monthly_loss'], impact['max_monthly_loss']]
bars = ax4.bar(metrics, values, color=['red', 'orange', 'darkred'], alpha=0.7)
ax4.set_title('经济影响量化')
ax4.set_ylabel('销售额损失指数')
# 添加数值标签
for bar, value in zip(bars, values):
height = bar.get_height()
ax4.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.1f}', ha='center', va='bottom')
plt.tight_layout()
plt.show()
# 打印政策建议
print("疫情防控政策经济影响分析结果:")
print(f"政策总效应: {self.results['interpretation']['total_effect']['estimate']:.1f}")
print(f"累计经济损失: {impact['total_loss']:.1f} 指数点")
print(f"平均月损失: {impact['avg_monthly_loss']:.1f} 指数点")
print(f"最大单月损失: {impact['max_monthly_loss']:.1f} 指数点")
if impact['recovery_time_months']:
print(f"恢复时间: {impact['recovery_time_months']} 个月")
print(f"预计恢复时间: {impact['recovery_date'].strftime('%Y年%m月')}")
print("\n政策启示:")
print("1. 封控政策对经济活动有显著的短期冲击")
print("2. 经济在政策放松后呈现恢复趋势")
print("3. 未来类似政策需配套经济支持措施")
# 运行COVID政策影响分析
covid_analysis = COVIDPolicyImpactAnalysis()
covid_data = covid_analysis.generate_covid_data()
comprehensive_results = covid_analysis.comprehensive_analysis()
covid_analysis.visualize_comprehensive_results()
Policy Evaluation Analysis Framework
The case study above follows the full ITSA policy-evaluation workflow: exploratory analysis, a basic segmented regression, an extended model for the recovery phase, a counterfactual projection, and quantification of the economic impact.