from sklearn.metrics import mean_squared_error, r2_score from sklearn.feature_selection import SelectFromModel from sklearn.preprocessing import OrdinalEncoder from sklearn.impute import SimpleImputer from sklearn.model_selection import train_test_split import pandas as pd from sklearn.linear_model import Lasso from xgboost import XGBRegressor import statsmodels.api as sm import plotly.express as px import plotly.graph_objects as go import xgboost as xgb from xgboost import plot_importance, plot_tree from sklearn.metrics import mean_absolute_error from statsmodels.tools.eval_measures import mse, rmse from sklearn.model_selection import GridSearchCV import warnings import pickle from sklearn.metrics import mean_squared_error from sklearn.metrics import r2_score import numpy as np from matplotlib import pyplot as plt import os from matplotlib import rcParams rcParams['font.sans-serif'] = ['SimHei'] # Windows系统自带的中文字体 rcParams['axes.unicode_minus'] = False # 解决负号显示问题 pd.set_option('display.max_rows', 40) pd.set_option('display.max_columns', 40) class xgboostmodels(): def __init__(self, df, steps=3): self.df = df self.steps = steps self.features = [] def optimize_Model(self, df): # 生成滞后特征(示例使用3期滞后) steps = self.steps for i in range(1, steps+1): df[f'lag_{i}'] = df['y'].shift(i) # 生成多步预测目标(示例预测未来3步) forecast_horizon = steps for i in range(forecast_horizon): df[f'target_{i+1}'] = df['y'].shift(-(i+1)) df = df.dropna() # 删除包含NaN的行 # 更新特征和目标变量 features = [col for col in df.columns if col.startswith('lag')] targets = [col for col in df.columns if col.startswith('target')] X = df[features] y = df[targets] # 按时间顺序分割数据集(时间序列不能随机分割) split = int(0.8 * len(df)) X_train, x_test = X.iloc[:split], X.iloc[split:] y_train, y_test = y.iloc[:split], y.iloc[split:] # 修改模型为多输出回归 from sklearn.multioutput import MultiOutputRegressor base_model = XGBRegressor(random_state=0) estimator = MultiOutputRegressor(base_model) # 保持多输出包装器 # 模型参数网格需要调整为适用于多输出回归器 parameters = { 'estimator__max_depth': range(2, 11, 2), # 注意参数前缀 'estimator__n_estimators': range(50, 101, 10), 'estimator__learning_rate': [0.01, 0.03, 0.1, 0.3, 0.5, 1] } grid_search_XGB = GridSearchCV( estimator=estimator, param_grid=parameters, cv=3, verbose=True, scoring='neg_mean_squared_error' # 使用更适合时序的评估指标 ) grid_search_XGB.fit(X_train, y_train) # 优化后的结果展示 print("最佳参数组合:") best_params = {} for param, value in grid_search_XGB.best_params_.items(): best_params[param.split('__')[1]] = value print(f"{param}: {value}") best_params_df = pd.DataFrame( [best_params], index=["最佳参数组合"]) y_pred = grid_search_XGB.predict(x_test) # 多步预测评估指标 evaluation_results = [] for step in range(forecast_horizon): mse = mean_squared_error(y_test.iloc[:, step], y_pred[:, step]) rmse = np.sqrt(mse) r2 = r2_score(y_test.iloc[:, step], y_pred[:, step]) evaluation_results.append({ '预测步长': step+1, '均方根误差(RMSE)': rmse, 'R²分数': r2 }) # 生成多步预测结果对比表 results_df = pd.DataFrame(evaluation_results).set_index('预测步长') print("\n多步预测性能评估:") print(results_df) # 保存完整模型(包含多输出包装器) Pkl_Filename = "report\日度价格预测_最佳模型.pkl" with open(Pkl_Filename, 'wb') as file: pickle.dump(grid_search_XGB.best_estimator_, file) return results_df, best_params_df def forecast_price(self, df): steps = self.steps import joblib Best_model_DalyLGPrice = joblib.load( "report\日度价格预测_最佳模型.pkl") df = df.copy() df.set_index('ds', inplace=True) # 生成滞后特征(与训练时保持一致) for i in range(1, steps+1): df[f'lag_{i}'] = df['y'].shift(i) # 获取特征列(保持与训练时相同的特征顺序) features = [f'lag_{i}' for i in range(1, steps+1)] # 初始化预测序列(保留最后3个已知值作为初始输入) current_values = df[features][-1:].values # 初始形状 (1,3) predictions = Best_model_DalyLGPrice.predict(current_values)[0] # 生成带日期的预测结果 last_date = df.index[-1] date_range = pd.date_range( start=last_date + pd.Timedelta(days=1), periods=steps ) # 保存预测结果到DataFrame forecast_series = pd.Series( predictions, index=date_range.strftime('%Y-%m-%d'), name='预测值') # 拼接原始数据和预测结果 df = pd.concat([df[['y']], forecast_series], axis=0) # 绘制预测结果 plt.figure(figsize=(12, 6)) n_points = min(len(df), 30) # 自动适配数据量 plt.plot(df.index[-n_points:], df['y'].iloc[-n_points:], label=f'原始数据(近{n_points}天)', marker='o', markersize=4) plt.plot(forecast_series.index, forecast_series.values, label=f'预测值 (未来{steps}天)', linestyle='--', marker='x') plt.axvline(x=last_date, color='red', linestyle=':', label='当前日期分界线', alpha=0.7) plt.title('价格预测趋势') plt.xlabel('日期') plt.ylabel('价格') plt.legend() plt.grid(True) # 自动旋转日期标签 plt.gcf().autofmt_xdate() # 确保保存目录存在 os.makedirs("report", exist_ok=True) plt.savefig("report/价格预测.png", dpi=300, bbox_inches='tight') plt.close() return forecast_series def generate_xgboost_report(self, model_results, best_params_df, forecast_series, save_path="report/xgboost_report.md"): """ 生成XGBoost模型报告 参数: model_results: 模型对比结果DataFrame evaluation_df: 多步评估结果DataFrame forecast_series: 预测结果Series save_path: 报告保存路径 """ # 构建Markdown内容 md_content = f""" # XGBoost价格预测分析报告 {pd.Timestamp.now().strftime('%Y-%m-%d')} ## 一、模型实现 ### 1. 特征工程 - 使用{len(model_results)}期历史滞后特征 - 预测未来{len(model_results)}个时间步长 - 数据集分割比例:80% 训练集 / 20% 测试集 ### 2. 模型配置 {best_params_df.to_markdown()} ## 二、性能评估 ### 多步预测误差分析 {model_results.to_markdown()} ## 三、预测结果 ### 未来{len(forecast_series)}日价格预测 ![预测结果](价格预测.png) {forecast_series.to_markdown()} """ # 保存报告 os.makedirs(os.path.dirname(save_path), exist_ok=True) with open(save_path, 'w', encoding='utf-8') as f: f.write(md_content) print(f"报告已生成至:{os.path.abspath(save_path)}") if __name__ == '__main__': df = pd.read_csv( "D:\code\PriceForecast-svn\yuanyoudataset\指标数据.csv") df['ds'] = pd.to_datetime(df['ds']) xg = xgboostmodels(df=df, steps=5) model_results, best_params_df = xg.optimize_Model(df=df) forecast_series = xg.forecast_price(df) xg.generate_xgboost_report( model_results=model_results, best_params_df=best_params_df, forecast_series=forecast_series )