import base64
import datetime
import hashlib
import hmac
import os
import random
import re
import string
import time
from hashlib import sha256
from hmac import HMAC
from math import sqrt

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from keras.callbacks import EarlyStopping
from keras.layers import GRU, Dense, Dropout
from keras.models import Sequential
from keras.optimizers import Adam
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

# NOTE(review): several of the imports above (requests, hmac, hashlib, base64,
# random, string, time, re, datetime) are not used by the two functions below;
# they are kept because other code may rely on them being importable from here.

# Use the SimHei font so the Chinese plot labels below can be rendered.
mpl.rcParams['font.family'] = 'SimHei'


def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """Reframe a time series as a supervised-learning table.

    Each output row holds ``n_in`` lagged copies of every variable followed by
    ``n_out`` current/future copies, so a model can learn to map the lagged
    columns to the later ones.

    :param data: sequence of observations; anything ``pd.DataFrame`` accepts
        (1-D list, 2-D list, 2-D array, DataFrame). Rows are time steps.
    :param n_in: number of lagged time steps (t-n_in ... t-1) used as input.
    :param n_out: number of time steps (t ... t+n_out-1) used as output.
    :param dropnan: when true, drop the rows that contain NaN values
        introduced by shifting.
    :return: DataFrame whose columns are named ``var<j>(t-<i>)``,
        ``var<j>(t)`` and ``var<j>(t+<i>)``.
    """
    df = pd.DataFrame(data)
    # Take the variable count from the frame itself so that 2-D lists and
    # 1-D arrays are handled the same way as 2-D arrays.
    n_vars = df.shape[1]
    cols, names = [], []

    # Input sequence (t-n_in, ..., t-1): shift the data down by i rows,
    # oldest lag first.
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [f'var{j + 1}(t-{i})' for j in range(n_vars)]

    # Forecast sequence (t, t+1, ..., t+n_out-1): shift the data up by i rows.
    for i in range(n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [f'var{j + 1}(t)' for j in range(n_vars)]
        else:
            names += [f'var{j + 1}(t+{i})' for j in range(n_vars)]

    # Put all shifted copies side by side and label the columns.
    agg = pd.concat(cols, axis=1)
    agg.columns = names

    # Shifting leaves NaN values at the edges; drop those incomplete rows.
    if dropnan:
        agg.dropna(inplace=True)
    return agg


def ex_GRU(df):
    """Train a two-layer GRU on ``df``, report the test RMSE and plot it.

    The frame is indexed by its ``ds`` column; every remaining column is
    treated as a feature and the first remaining column is the prediction
    target. The last 20% of the supervised samples are used both as the
    validation set for early stopping and as the test set.

    Side effects: prints the test RMSE, saves the figure as
    ``<script name>.png`` in the current directory and shows it.

    :param df: DataFrame with a ``ds`` column plus one or more numeric columns.
    :raises ValueError: if there are too few rows to build a non-empty
        training set and a non-empty test set.
    """
    dataset = df.copy()
    dataset.set_index('ds', inplace=True)
    values = dataset.values

    # Scale every feature into the range (0, 1).
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(values)

    # Data preparation.
    n_days = 14  # number of past time steps fed to the model
    n_features = scaled.shape[1]
    reframed = series_to_supervised(scaled, n_days, 1)

    # Chronological 80/20 train/test split.
    values = reframed.values
    n_train_days = int(values.shape[0] * 0.8)
    train = values[:n_train_days, :]
    test = values[n_train_days:, :]
    if len(train) == 0 or len(test) == 0:
        raise ValueError(
            'Not enough rows to build training and test sets: '
            f'{values.shape[0]} supervised samples from {scaled.shape[0]} rows '
            f'with a window of {n_days}.'
        )

    # The first n_obs columns are the lagged inputs; column -n_features is
    # var1(t), i.e. the target variable at the current step.
    n_obs = n_days * n_features
    train_X, train_y = train[:, :n_obs], train[:, -n_features]
    test_X, test_y = test[:, :n_obs], test[:, -n_features]

    # Reshape the inputs to [samples, time steps, features].
    train_X = train_X.reshape((train_X.shape[0], n_days, n_features))
    test_X = test_X.reshape((test_X.shape[0], n_days, n_features))

    # Build the GRU model.
    model = Sequential()
    model.add(GRU(50, return_sequences=True, input_shape=(n_days, n_features)))
    model.add(Dropout(0.2))
    model.add(GRU(50))
    model.add(Dropout(0.2))
    model.add(Dense(1))

    # Compile the model.
    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='mean_squared_error', optimizer=optimizer)

    # Stop training once the validation loss has not improved for 10 epochs.
    early_stopping = EarlyStopping(monitor='val_loss', patience=10)

    # Train the model; shuffle=False keeps the samples in time order.
    model.fit(train_X, train_y, epochs=100, batch_size=72,
              validation_data=(test_X, test_y), verbose=2, shuffle=False,
              callbacks=[early_stopping])

    # Predict on the test set.
    yhat = model.predict(test_X)
    test_flat = test_X.reshape((test_X.shape[0], n_obs))

    # The scaler expects n_features columns, so pad the single target column
    # with the other n_features - 1 columns of the most recent time step.
    # An explicit start index is used instead of ``-n_features + 1`` because
    # that expression becomes 0 for a single feature and would select every
    # column instead of none.
    padding = test_flat[:, n_obs - n_features + 1:]

    # Undo the scaling for the predicted and the true target column.
    inv_yhat = np.concatenate((yhat, padding), axis=1)
    inv_yhat = scaler.inverse_transform(inv_yhat)[:, 0]

    test_y = test_y.reshape((len(test_y), 1))
    inv_y = np.concatenate((test_y, padding), axis=1)
    inv_y = scaler.inverse_transform(inv_y)[:, 0]

    # Compute the RMSE over the whole test set.
    rmse = sqrt(mean_squared_error(inv_y, inv_yhat))
    print('Test RMSE: %.3f' % rmse)

    # Visualise the results.
    n = 150      # number of true values shown
    horizon = 7  # extra predicted values shown, and rows in the table
    actual_tail = inv_y[-n:]
    predicted_tail = inv_yhat[-(n + horizon):]

    # Build the x axes from the real lengths so short test sets still plot.
    # Both tails end on the same test sample, so the shorter (actual) series
    # is right-aligned with the predicted one.
    time_axis_future = np.arange(len(predicted_tail))
    offset = len(predicted_tail) - len(actual_tail)
    time_axis_data = np.arange(offset, offset + len(actual_tail))

    fig, ax = plt.subplots(2, 1, gridspec_kw={'height_ratios': [5, 4]})
    fig.set_size_inches(8, 6)

    ax[0].plot(time_axis_data, actual_tail, label='历史价格')
    ax[0].plot(time_axis_future, predicted_tail, linestyle='dashed',
               label='预测价格')
    ax[0].set_xlabel('时间')
    ax[0].set_ylabel('价格')
    ax[0].legend()
    ax[0].set_title('布伦特_多价格预测')
    ax[0].set_ylim(actual_tail.min() * 0.4, actual_tail.max() * 1.6)

    # NOTE(review): the table lists the model output for the last `horizon`
    # test samples; the model is never run beyond the end of the data.
    ax[1].axis('off')
    table_data = [[f"Day {i + 1}", "{:.2f}".format(val)]
                  for i, val in enumerate(predicted_tail[-horizon:])]
    table = ax[1].table(cellText=table_data,
                        colLabels=['Day', 'Prediction'], loc='center')
    table.auto_set_font_size(True)

    # Save the figure next to the current working directory, named after
    # this script.
    filename = os.path.basename(__file__).split('.')[0]
    plt.savefig(filename + '.png')
    plt.show()