PriceForecast/models/grumodels.py
2024-11-01 16:38:21 +08:00

165 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams['font.family'] = 'SimHei' # 设置字体为黑体
import random
import string
import time
import base64
import requests
from hashlib import sha256
from hmac import HMAC
from math import sqrt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import GRU, Dense, Dropout
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping
# 数据获取和预处理部分
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import datetime
import string
import base64
import requests
import random
import time
import re
import hmac
import hashlib
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """Reframe a time series as a supervised-learning table.

    Each output row holds the ``n_in`` lagged observations followed by the
    current observation and ``n_out - 1`` future observations.

    :param data: sequence of observations; anything ``pd.DataFrame`` accepts
        (list, list of rows, 1-D or 2-D array, DataFrame).
    :param n_in: number of lag steps used as input (default 1).
    :param n_out: number of steps used as output, starting at ``t`` (default 1).
    :param dropnan: when true, drop the rows that contain NaN introduced by
        shifting.
    :return: DataFrame whose columns are named ``var<j>(t-<i>)``, ``var<j>(t)``
        and ``var<j>(t+<i>)``.
    """
    df = pd.DataFrame(data)
    # Take the variable count from the frame itself so that lists of
    # multi-column rows and 1-D arrays get the right number of column names.
    n_vars = df.shape[1]
    cols, names = [], []
    # Input sequence (t-n_in, ..., t-1): shift the frame down by i rows,
    # iterating from the oldest lag to the most recent one.
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [f'var{j + 1}(t-{i})' for j in range(n_vars)]
    # Forecast sequence (t, t+1, ..., t+n_out-1): shift the frame up by i rows.
    for i in range(n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [f'var{j + 1}(t)' for j in range(n_vars)]
        else:
            names += [f'var{j + 1}(t+{i})' for j in range(n_vars)]
    # Place all shifted copies side by side and label the columns.
    agg = pd.concat(cols, axis=1)
    agg.columns = names
    # Shifting leaves NaN at the edges; drop those rows on request.
    if dropnan:
        agg.dropna(inplace=True)
    return agg
def ex_GRU(df):
    """Train a two-layer GRU on ``df`` and plot its test-set predictions.

    The frame is indexed by its ``ds`` column, every remaining column is
    min-max scaled to [0, 1], and the series is reframed so that the previous
    14 rows of all columns predict the first column at time ``t``. The first
    80% of the reframed rows are used for training and the rest for
    validation/testing. The test RMSE (in original units) is printed, and a
    figure with the actual/predicted curves plus a table of the last seven
    predictions is saved as ``<module file stem>.png`` and shown.

    :param df: DataFrame with a ``ds`` column; the remaining columns are
        presumably all numeric, with the target first -- confirm with caller.
    :return: None.
    """
    dataset = df.copy()
    dataset.set_index('ds', inplace=True)
    values = dataset.values
    # Scale every feature into the range [0, 1].
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(values)
    # Data preparation: use the previous 14 rows as model input.
    n_days = 14
    n_features = scaled.shape[1]
    reframed = series_to_supervised(scaled, n_days, 1)
    # Chronological 80/20 split into training and test sets.
    values = reframed.values
    n_train_days = int(values.shape[0] * 0.8)
    train = values[:n_train_days, :]
    test = values[n_train_days:, :]
    # The first n_obs columns are the lagged inputs; column -n_features is
    # the first variable at time t, which is the prediction target.
    n_obs = n_days * n_features
    train_X, train_y = train[:, :n_obs], train[:, -n_features]
    test_X, test_y = test[:, :n_obs], test[:, -n_features]
    # Reshape inputs to [samples, time steps, features].
    train_X = train_X.reshape((train_X.shape[0], n_days, n_features))
    test_X = test_X.reshape((test_X.shape[0], n_days, n_features))
    # Build the GRU model.
    model = Sequential()
    model.add(GRU(50, return_sequences=True, input_shape=(n_days, n_features)))
    model.add(Dropout(0.2))
    model.add(GRU(50))
    model.add(Dropout(0.2))
    model.add(Dense(1))
    # Compile the model.
    optimizer = Adam(learning_rate=0.001)
    model.compile(loss='mean_squared_error', optimizer=optimizer)
    # Stop once the validation loss has not improved for 10 epochs.
    early_stopping = EarlyStopping(monitor='val_loss', patience=10)
    # Train the model; the test split doubles as the validation set.
    model.fit(
        train_X,
        train_y,
        epochs=100,
        batch_size=72,
        validation_data=(test_X, test_y),
        verbose=2,
        shuffle=False,
        callbacks=[early_stopping],
    )
    # Run the prediction.
    yhat = model.predict(test_X)
    test_X = test_X.reshape((test_X.shape[0], n_obs))
    # The scaler expects n_features columns, so pad the target column with the
    # other n_features - 1 columns of the most recent lag. An explicit start
    # index is used because the negative form ``-n_features + 1`` becomes 0
    # for a single feature and would select every column instead of none.
    other_features = test_X[:, n_obs - n_features + 1:]
    # Undo the scaling for both the predicted and the true target column.
    inv_yhat = np.concatenate((yhat, other_features), axis=1)
    inv_yhat = scaler.inverse_transform(inv_yhat)
    inv_yhat = inv_yhat[:, 0]
    test_y = test_y.reshape((len(test_y), 1))
    inv_y = np.concatenate((test_y, other_features), axis=1)
    inv_y = scaler.inverse_transform(inv_y)
    inv_y = inv_y[:, 0]
    # Compute the RMSE in original units.
    rmse = sqrt(mean_squared_error(inv_y, inv_yhat))
    print('Test RMSE: %.3f' % rmse)
    # Visualise the results.
    n = 150
    inv_y = inv_y[-n:]
    inv_yhat = inv_yhat[-n - 7:]
    # Size the x axes from the data actually available so that a test split
    # shorter than n + 7 samples no longer causes a length mismatch in plot().
    time_axis_data = np.arange(len(inv_y))
    time_axis_future = np.arange(len(inv_yhat))
    # NOTE(review): the prediction window is up to 7 samples longer than the
    # actual window and both start at x=0, so the two curves are offset from
    # each other; confirm this is the intended presentation.
    fig, ax = plt.subplots(2, 1, gridspec_kw={'height_ratios': [5, 4]})
    fig.set_size_inches(8, 6)
    ax[0].plot(time_axis_data, inv_y, label='历史价格')
    ax[0].plot(time_axis_future, inv_yhat, linestyle='dashed', label='预测价格')
    ax[0].set_xlabel('时间')
    ax[0].set_ylabel('价格')
    ax[0].legend()
    ax[0].set_title('布伦特_多价格预测')
    ax[0].set_ylim(min(inv_y) * 0.4, max(inv_y) * 1.6)
    ax[1].axis('off')
    # NOTE(review): these are the last seven test-set predictions, not
    # out-of-sample forecasts beyond the end of the data.
    table_data = [[f"Day {i + 1}", "{:.2f}".format(val)] for i, val in enumerate(inv_yhat[-7:])]
    table = ax[1].table(cellText=table_data, colLabels=['Day', 'Prediction'], loc='center')
    table.auto_set_font_size(True)
    filename = os.path.basename(__file__).split('.')[0]
    plt.savefig(filename + '.png')
    plt.show()