# File listing metadata: 241 lines, 10 KiB, Python
from __future__ import annotations
|
||
import pdfkit
|
||
from bs4 import BeautifulSoup
|
||
import numpy as np
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
import statsmodels.api as sm
|
||
from statsmodels.tsa.stattools import adfuller as ADF
|
||
from statsmodels.stats.diagnostic import acorr_ljungbox
|
||
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
|
||
from statsmodels.tsa.arima.model import ARIMA
|
||
from statsmodels.graphics.api import qqplot
|
||
from statsmodels.stats.stattools import durbin_watson
|
||
from scipy import stats
|
||
import warnings
|
||
|
||
from lib.tools import DeepSeek
|
||
# Suppress every warning category for the whole process (statsmodels emits
# many convergence/frequency warnings while fitting).
warnings.filterwarnings(action="ignore")

# Matplotlib text setup for Chinese labels: use the SimHei font for sans-serif
# text and render the minus sign with the ASCII hyphen so it is not drawn as
# a missing glyph.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})
|
||
|
||
|
||
class ARIMAReportGenerator(DeepSeek):
    """Fit an ARIMA model to a time series and write a Markdown/PDF report.

    The workflow is: difference the series until the ADF test rejects a unit
    root, pick (p, q) by AIC, fit ARIMA(p, d, q) on the original series,
    forecast, save plots, and assemble the report sections.

    NOTE(review): ``DeepSeek`` comes from ``lib.tools`` and is not visible
    here; this class only relies on its no-argument ``__init__`` and on a
    ``summary(text)`` method that returns report text -- confirm against
    that module.
    """

    def __init__(self, data, forecast_steps=7):
        """Store the input series and initialise the report state.

        Args:
            data: pandas object holding the series to model. ``build_model``
                calls ``.index[-1].normalize()`` on it, so a DatetimeIndex is
                required.
            forecast_steps: Number of future periods to forecast.
        """
        super().__init__()
        self.data = data
        self.forecast_steps = forecast_steps
        self.model = None          # fitted ARIMA results, set by build_model
        self.diff_num = 0          # differencing order d found by build_model
        self.report_content = []   # Markdown sections, in output order
        self.figure_paths = {}     # figure name -> saved PNG path
        # ADF result of the series after differencing; set by build_model.
        self._stationary_adf = None

    def _save_figure(self, fig_name):
        """Save the current matplotlib figure as ``<fig_name>.png``.

        The figure is closed afterwards and its path is recorded in
        ``self.figure_paths`` under ``fig_name``.

        Returns:
            The path the figure was written to.
        """
        path = f"{fig_name}.png"
        plt.savefig(path, dpi=300, bbox_inches='tight')
        plt.close()
        self.figure_paths[fig_name] = path
        return path

    def _add_report_section(self, title, content, level=2):
        """Append a Markdown section with a heading of the given level."""
        self.report_content.append(f"{'#'*level} {title}\n{content}\n")

    def plot_forecast(self, predicted_mean, conf_int):
        """Plot recent history, the forecast and its confidence band.

        Args:
            predicted_mean: Series of forecast values indexed by date.
            conf_int: DataFrame with ``lower`` and ``upper`` columns sharing
                the forecast index.
        """
        plt.figure(figsize=(12, 6))
        # Explicit positional slice: only the last 30 observations are shown.
        plt.plot(self.data.iloc[-30:], label='历史数据')
        plt.plot(predicted_mean, label='预测值', color='r')
        plt.fill_between(
            conf_int.index,
            conf_int['lower'],
            conf_int['upper'],
            color='r',
            alpha=0.2,
        )
        plt.title('ARIMA模型预测结果')
        plt.legend()
        self._save_figure('forecast_plot')

    def generate_diagnostic_plots(self):
        """Save the residual diagnostics and residual ACF/PACF figures.

        Raises:
            RuntimeError: If no model has been fitted yet.
        """
        if self.model is None:
            raise RuntimeError(
                "build_model() must be called before generating diagnostic plots"
            )
        residuals = self.model.resid

        # Residual diagnostics: Q-Q plot on the left, residual series on the right.
        _, (qq_ax, resid_ax) = plt.subplots(1, 2, figsize=(12, 5))
        qqplot(residuals, line='q', ax=qq_ax)
        qq_ax.set_title('Q-Q图')
        residuals.plot(ax=resid_ax, title='残差序列')
        self._save_figure('residual_diagnostic')

        # ACF on top, PACF below, both for the residuals.
        _, (acf_ax, pacf_ax) = plt.subplots(2, 1, figsize=(12, 8))
        plot_acf(residuals, ax=acf_ax, lags=20)
        plot_pacf(residuals, ax=pacf_ax, lags=20)
        self._save_figure('acf_pacf')

    def build_model(self):
        """Fit the ARIMA model, forecast, and produce all figures.

        Side effects: sets ``self.diff_num``, ``self.model`` and
        ``self._stationary_adf``; writes ``ARIMA预测结果.csv`` and the PNG
        figures.

        Returns:
            Tuple ``(predicted_mean, conf_int, std_error)`` where
            ``predicted_mean`` and ``std_error`` are Series and ``conf_int``
            is a DataFrame with ``lower``/``upper`` columns, all indexed by
            the generated business-day forecast dates.
        """
        # Reset first so a repeated call does not accumulate differencing
        # orders from the previous run.
        self.diff_num = 0

        # Difference until the ADF test rejects a unit root at the 5% level.
        diff_data = self.data.copy()
        adf_result = ADF(diff_data)
        while adf_result[1] > 0.05:
            diff_data = diff_data.diff().dropna()
            self.diff_num += 1
            adf_result = ADF(diff_data)
        # Kept so the report can quote the post-differencing p-value.
        self._stationary_adf = adf_result

        # Order selection on the stationary series using AIC.
        aic_results = sm.tsa.arma_order_select_ic(
            diff_data, max_ar=4, max_ma=4, ic='aic')
        p, q = aic_results['aic_min_order']

        # Fit on the original series; ARIMA applies the differencing itself.
        self.model = ARIMA(self.data, order=(p, self.diff_num, q)).fit()

        forecast = self.model.get_forecast(steps=self.forecast_steps)

        # Business-day dates starting the day after the last observation.
        last_date = self.data.index[-1].normalize()
        forecast_dates = pd.date_range(
            start=last_date + pd.Timedelta(days=1),
            periods=self.forecast_steps,
            freq='B',
        ).normalize()

        # Re-index the forecast outputs onto the generated dates.
        predicted_mean = pd.Series(
            forecast.predicted_mean.values,
            index=forecast_dates,
            name='predicted_mean',
        )
        conf_int = pd.DataFrame(
            forecast.conf_int().values,
            index=forecast_dates,
            columns=['lower', 'upper'],
        )
        # se_mean is the forecast standard error, not a variance.
        std_error = pd.Series(
            forecast.se_mean.values,
            index=forecast_dates,
            name='std_error',
        )

        predicted_mean.to_csv('ARIMA预测结果.csv')

        self.plot_forecast(predicted_mean, conf_int)
        self.generate_diagnostic_plots()

        return predicted_mean, conf_int, std_error

    def _build_stat_table(self, test_name, results):
        """Render a mapping of metric name to value as a Markdown table.

        ``test_name`` is accepted for interface compatibility but is not
        used in the rendered table.
        """
        return pd.DataFrame(results.items(), columns=['指标', '值']).to_markdown(index=False)

    def _convert_report_to_pdf(self):
        """Convert ``ARIMA_Report.md`` to PDF with pandoc (best effort).

        Equivalent shell command:
        ``pandoc ARIMA_Report.md -o ARIMA_Report.pdf --pdf-engine=xelatex
        -V CJKmainfont="SimHei"``. Failures are reported on stdout and do
        not raise.
        """
        # Imported before the try block so the except clauses below can
        # always resolve the ``subprocess`` name.
        import subprocess

        command = [
            'pandoc',
            'ARIMA_Report.md',
            '-o', 'ARIMA_Report.pdf',
            '--pdf-engine=xelatex',
            '-V', 'CJKmainfont=SimHei',
        ]
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"PDF转换失败,请确保已安装pandoc和xelatex: {e}")
        except FileNotFoundError:
            print("未找到pandoc,请先安装: https://pandoc.org/installing.html")
        else:
            print("PDF报告已生成:ARIMA_Report.pdf")

    def generate_report(self):
        """Build the model and write the full report to disk.

        Appends the forecast, AI analysis, diagnostics, stationarity and
        information-criteria sections to ``self.report_content``, writes
        them to ``ARIMA_Report.md``, then attempts a PDF conversion.
        """
        predicted_mean, conf_int, std_error = self.build_model()

        ai_analysis = self.summary(predicted_mean.to_markdown(index=False))

        # Forecast summary table keyed by date strings. Every column is
        # passed as a raw array: a Series would be aligned against the
        # string index by label, which depends on implicit string-to-date
        # matching.
        summary_df = pd.DataFrame(
            {
                'mean': predicted_mean.values,
                'mean_se': std_error.values,
                'mean_ci_lower': conf_int['lower'].values,
                'mean_ci_upper': conf_int['upper'].values,
            },
            index=predicted_mean.index.normalize().strftime('%Y-%m-%d'),
        )
        forecast_table = summary_df.to_markdown()
        self._add_report_section(
            '核心预测结果',
            f"![预测结果图]({self.figure_paths['forecast_plot']})\n\n"
            "该图表展示了历史数据(蓝线)与模型预测值(红线),阴影区域表示95%置信区间。"
            f"预测区间显示随着预测步长增加,不确定性逐渐扩大。\n\n{forecast_table}",
        )

        self._add_report_section('预测结果AI分析', ai_analysis)

        # Model diagnostics. The Durbin-Watson statistic is computed once
        # and reused; the order is formatted component-wise so it renders
        # as ARIMA(p, d, q) rather than ARIMA((p, d, q)).
        order_p, order_d, order_q = self.model.model.order
        dw_text = f"{durbin_watson(self.model.resid):.2f}"
        diag_content = (
            f"**模型阶数**: ARIMA({order_p}, {order_d}, {order_q})\n\n"
            f"![残差诊断图]({self.figure_paths['residual_diagnostic']})\n\n"
            "左图Q-Q图用于检验残差的正态性,理想情况下散点应沿对角线分布。"
            "右图展示残差序列应呈现随机波动,无明显趋势或周期性。\n\n"
            f"![ACF/PACF图]({self.figure_paths['acf_pacf']})\n\n"
            "自相关图(ACF)和偏自相关图(PACF)显示残差序列的相关性,良好的模型应不存在显著的自相关"
            "(各阶滞后系数应落在置信区间内)。\n\n"
            f"**DW检验**: {dw_text}\n"
            f"DW检验值接近2(当前值{dw_text})表明残差间不存在显著的一阶自相关。"
        )
        self._add_report_section('模型诊断', diag_content)

        # Stationarity test. The table reports the ADF test of the original
        # series together with the differencing order that was needed; the
        # sentence quotes the p-value of the differenced series, which is
        # the one that actually passed the test.
        if self.diff_num == 0:
            # No differencing happened, so the stored result is the
            # original-series result.
            original_adf = self._stationary_adf
        else:
            original_adf = ADF(self.data)
        adf_results = {
            "ADF统计量": original_adf[0],
            "p值": original_adf[1],
            "差分阶数": self.diff_num,
        }
        stationary_pvalue = self._stationary_adf[1]
        adf_test_text = (
            "ADF检验用于验证时间序列的平稳性,原假设为存在单位根(非平稳)。"
            f"当p值小于0.05时拒绝原假设,认为序列已平稳。本案例经过{self.diff_num}次差分后达到平稳状态(p值={stationary_pvalue:.5f})。"
        )
        self._add_report_section(
            '平稳性检验',
            f"{adf_test_text}\n\n{self._build_stat_table('ADF检验', adf_results)}",
        )

        # Information criteria of the fitted model.
        metrics = {
            "AIC": self.model.aic,
            "BIC": self.model.bic,
            "HQIC": self.model.hqic,
        }
        metric_explanation = (
            "AIC(赤池信息准则)、BIC(贝叶斯信息准则)和HQIC(汉南-奎因信息准则)用于评估模型拟合优度与复杂度的平衡,"
            "数值越小通常表示模型越优。但这些准则更适用于相同差分阶数下的模型比较。"
        )
        self._add_report_section(
            '模型评价',
            f"{metric_explanation}\n\n{self._build_stat_table('信息准则', metrics)}",
        )

        with open('ARIMA_Report.md', 'w', encoding='utf-8') as f:
            f.write("\n".join(self.report_content))

        self._convert_report_to_pdf()
|
||
|
||
|
||
if __name__ == '__main__':
    import sys

    # Default input is a machine-specific development path. Pass a CSV path
    # as the first command-line argument to run against another file, e.g.
    # the alternate dataset under ``juxitingdataset`` in the same checkout.
    default_csv = r'D:\code\PriceForecast-svn\yuanyouzhoududataset\指标数据.csv'
    csv_path = sys.argv[1] if len(sys.argv) > 1 else default_csv

    # The CSV must provide a ``ds`` date column (used as the index) and a
    # ``y`` column holding the series to model.
    data = pd.read_csv(csv_path, index_col='ds', parse_dates=True)

    # Build the model and write the report files.
    reporter = ARIMAReportGenerator(data['y'], forecast_steps=30)
    reporter.generate_report()
    print("ARIMA分析报告已生成:ARIMA_Report.md")
|