# PriceForecast/models/lstmmodels.py
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import datetime
import matplotlib.pyplot as plt
import pandas as pd
import os
import random
import string
import time
import base64
from hashlib import sha256
from hmac import HMAC
import requests
import csv
from numpy import concatenate
from math import sqrt
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    '''
    Convert a time series into a supervised-learning dataset.
    :param data: dataset (list or 2-D array)
    :param n_in: length of the input sequence, default 1
    :param n_out: length of the output sequence, default 1
    :param dropnan: whether to drop rows containing NaN
    :return: reframed DataFrame
    '''
    n_vars = 1 if type(data) is list else data.shape[1]
    df = pd.DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    # shift the input frame down by n_in, ..., 2, 1 rows in turn and append each shifted
    # copy to cols; the -1 in range(n_in, 0, -1) means iterate in reverse with step 1
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j + 1, i)) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    # append the forecast frames to cols; i = 0 is the current step t
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [('var%d(t)' % (j + 1)) for j in range(n_vars)]
        else:
            names += [('var%d(t+%d)' % (j + 1, i)) for j in range(n_vars)]
    # cols now holds the shifted frames (e.g. df(-n_in), ..., df(-1), df, ...);
    # concatenate them side by side along the columns
    agg = pd.concat(cols, axis=1)
    # name the merged columns
    agg.columns = names
    # drop the rows containing NaN introduced by the shifting
    if dropnan:
        agg.dropna(inplace=True)
    return agg
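

# A minimal usage sketch for series_to_supervised (not part of the original module):
# it reframes a toy two-column series with two lag steps and one forecast step.
# The helper name _demo_series_to_supervised and the toy array are illustrative only;
# wrapping it in a function keeps it from running at import time.
def _demo_series_to_supervised():
    toy = np.arange(10, dtype=float).reshape(5, 2)
    reframed = series_to_supervised(toy, n_in=2, n_out=1)
    # columns: var1(t-2), var2(t-2), var1(t-1), var2(t-1), var1(t), var2(t)
    print(reframed)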


def createXY(dataset, n_past):
    # Build sliding windows of the previous n_past rows (X) and the next value
    # of the first column (y) from a 2-D array.
    dataX = []
    dataY = []
    print(dataset.shape[1])
    for i in range(n_past, len(dataset)):
        dataX.append(dataset[i - n_past:i, 0:dataset.shape[1]])
        dataY.append(dataset[i, 0])
    return np.array(dataX), np.array(dataY)
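

# A minimal usage sketch for createXY (not part of the original module): the toy array,
# window length, and helper name _demo_createXY are illustrative assumptions.
def _demo_createXY():
    toy = np.arange(20, dtype=float).reshape(10, 2)
    X, y = createXY(toy, n_past=3)
    # X has shape (7, 3, 2): 7 windows of 3 time steps with 2 features;
    # y has shape (7,): the first column at the step following each window.
    print(X.shape, y.shape)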


def ex_Lstm_M(df, n_days=14, out_days=7, is_debug=False, datasetpath=''):
    # dataset = pd.read_csv('brentpricepredict.csv',encoding='utf-8')
    dataset = df.copy()
    dataset.set_index('ds', inplace=True)
    values = dataset.values
    if is_debug:
        # values = values[-1000:]
        pass
    # scale the features to the range [0, 1]
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(values)
    # use n_days (14) of history to predict out_days (7) ahead
    n_features = dataset.shape[1]
    # build an n_days -> out_days supervised-learning frame
    reframed = series_to_supervised(scaled, n_days, out_days)
    # split the dataset
    values = reframed.values
    # roughly 80% of the rows for training, 20% for testing
    n_train = int(len(dataset) * 0.8)
    train = values[:n_train, :]
    test = values[n_train:, :]
    # split into inputs and outputs
    n_obs = n_days * n_features
    # the target y is var1 at the final forecast step (column -n_features)
    train_X, train_y = train[:, :n_obs], train[:, -n_features]
    test_X, test_y = test[:, :n_obs], test[:, -n_features]
    # reshape the inputs to 3-D [samples, timesteps, features] with timesteps = n_days
    train_X = train_X.reshape((train_X.shape[0], n_days, n_features))
    test_X = test_X.reshape((test_X.shape[0], n_days, n_features))
    print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
    # define the network
    model = Sequential()
    model.add(LSTM(50, input_shape=(train_X.shape[1], train_X.shape[2])))
    model.add(Dense(1))
    model.compile(loss='mae', optimizer='adam')
    # fit the network
    history = model.fit(train_X, train_y, epochs=100, batch_size=72, validation_data=(test_X, test_y), verbose=2,
                        shuffle=False)
    # make predictions
    yhat = model.predict(test_X)
    # reshape the inputs back to 2-D: n rows x (n_days * n_features) columns
    test_X = test_X.reshape((test_X.shape[0], n_days * n_features))
    # pad the predictions with the last n_features - 1 input columns so the array has
    # n_features columns, the shape the scaler expects for inverse_transform
    inv_yhat = concatenate((yhat, test_X[:, -n_features + 1:]), axis=1)
    # invert the scaling on the padded array
    inv_yhat = scaler.inverse_transform(inv_yhat)
    inv_yhat = inv_yhat[:, 0]
    print(inv_yhat)
    test_y = test_y.reshape((len(test_y), 1))
    # pad the ground truth the same way before inverting the scaling
    inv_y = concatenate((test_y, test_X[:, -n_features + 1:]), axis=1)
    inv_y = scaler.inverse_transform(inv_y)
    inv_y = inv_y[:, 0]
    # compute RMSE on the unscaled values
    rmse = sqrt(mean_squared_error(inv_y, inv_yhat))
    print('Test RMSE: %.3f' % rmse)
    # visualize the results
    # keep n historical points (everything except the final 7 steps)
    n = len(inv_y) - 7
    # time axes for the history and for the history plus forecast horizon
    time_axis_data = np.array(range(n))
    time_axis_future = np.array(range(n + 7))
    inv_y = inv_y[-n:]
    inv_yhat = inv_yhat[-n - 7:]
    # Plot data and future predictions
    fig, ax = plt.subplots(2, 1, gridspec_kw={'height_ratios': [5, 4]})
    # figure size
    fig.set_size_inches(6, 6)
    # first subplot: historical price vs. predicted price
    # (the Chinese labels read: 历史价格 = historical price, 预测价格 = predicted price,
    #  时间 = time, 价格 = price, 布伦特_多价格预测 = Brent multivariate price forecast)
    ax[0].plot(time_axis_data, inv_y, label='历史价格')
    ax[0].plot(time_axis_future, inv_yhat, linestyle='dashed', label='预测价格')
    ax[0].set_xlabel('时间')
    ax[0].set_ylabel('价格')
    ax[0].legend()
    # title
    ax[0].set_title('布伦特_多价格预测')
    # y-axis range
    ax[0].set_ylim(50, 120)
    # second subplot: a table listing the predicted prices
    ax[1].axis('off')
    table_data = [[f"Day {i + 1}", "{:.2f}".format(val)] for i, val in enumerate(inv_yhat[-7:])]
    table = ax[1].table(cellText=table_data, colLabels=['Day', 'Prediction'], loc='center')
    # let matplotlib auto-size the table font
    table.auto_set_font_size(True)
    # save the figure
    filename = os.path.basename(__file__).split('.')[0]
    plt.savefig(os.path.join(datasetpath, filename + '_M.png'))
    # plt.show()
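

# A hedged usage sketch for ex_Lstm_M (not part of the original module): the CSV path and
# output directory are assumptions. The function only requires a DataFrame with a 'ds' date
# column followed by numeric feature columns, where the first numeric column is the price
# series being forecast.
def _demo_ex_Lstm_M():
    df = pd.read_csv('brentpricepredict.csv', encoding='utf-8')  # assumed input file
    ex_Lstm_M(df, n_days=14, out_days=7, is_debug=True, datasetpath='.')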


def ex_Lstm(df, input_seq_len=50, output_seq_len=7, is_debug=False, dataset=''):
    # convert the date column to datetime (if it is not already)
    df['ds'] = pd.to_datetime(df['ds'])
    # separate the numeric columns (excluding the date column); currently unused below
    numeric_df = df.select_dtypes(include=['int64', 'float64'])
    prices = df
    # prices = df
    # print(data1)
    # Remove any NaN values
    df = df.drop('ds', axis=1)
    prices = np.array(df, dtype=float)  # convert to NumPy array of floats
    # note: boolean indexing flattens the array, so this path assumes a single price column
    prices = prices[~np.isnan(prices)]
    if is_debug:
        prices = prices[-300:]
    # Prepare input sequences
    inputs = []
    for i in range(len(prices) - input_seq_len - output_seq_len + 1):
        inputs.append(prices[i:i + input_seq_len])
    inputs = np.array(inputs)
    # Prepare output sequences
    outputs = []
    for i in range(input_seq_len, len(prices) - output_seq_len + 1):
        outputs.append(prices[i:i + output_seq_len])
    outputs = np.array(outputs)
    # Split dataset into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(inputs, outputs, test_size=0.2)
    # Normalize data
    scaler_in = MinMaxScaler()
    X_train = scaler_in.fit_transform(X_train)
    X_test = scaler_in.transform(X_test)
    scaler_out = MinMaxScaler()
    y_train = scaler_out.fit_transform(y_train)
    y_test = scaler_out.transform(y_test)
    # Define LSTM model
    model = Sequential()
    model.add(LSTM(128, activation='relu', input_shape=(input_seq_len, 1)))
    model.add(Dense(output_seq_len))
    model.compile(optimizer='adam', loss='mse')
    # Train LSTM model
    model.fit(X_train.reshape(-1, input_seq_len, 1), y_train, epochs=100, batch_size=64, validation_data=(X_test.reshape(-1, input_seq_len, 1), y_test))
    # Evaluate LSTM model
    mse = model.evaluate(X_test.reshape(-1, input_seq_len, 1), y_test)
    # Make future predictions
    future_inputs = np.array([prices[-input_seq_len:]])
    future_inputs = scaler_in.transform(future_inputs)
    future_predictions = model.predict(future_inputs.reshape(-1, input_seq_len, 1))
    future_predictions = scaler_out.inverse_transform(future_predictions)[0]
    # Print results
    print("MSE: ", mse)
    print("Future predictions: ", future_predictions)
    # Generate time axis for data and future predictions
    time_axis_data = np.arange(len(prices))
    time_axis_future = np.arange(len(prices), len(prices) + len(future_predictions))
    # Concatenate time axis and data
    time_axis = np.concatenate((time_axis_data, time_axis_future))
    # Concatenate data and future predictions
    data_and_predictions = np.concatenate((prices, future_predictions))
    # Plot data and future predictions
    fig, ax = plt.subplots(2, 1, gridspec_kw={'height_ratios': [3, 1]})
    # First subplot: Data and Future Predictions
    ax[0].plot(time_axis, data_and_predictions, label='Data and Future Predictions')
    ax[0].plot(time_axis_future, future_predictions, linestyle='dashed', label='Future Predictions')
    ax[0].set_xlabel('Time')
    ax[0].set_ylabel('Price')
    ax[0].legend()
    # Second subplot: Table for Future Predictions
    ax[1].axis('off')
    table_data = [[f"Day {i + 1}", "{:.2f}".format(val)] for i, val in enumerate(future_predictions)]
    table = ax[1].table(cellText=table_data, colLabels=['Day', 'Prediction'], loc='center')
    plt.savefig(os.path.join(dataset, 'lstmmodels.png'))
    # plt.show()
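

# A hedged usage sketch for ex_Lstm (not part of the original module): the CSV path and the
# 'y' column name are assumptions. The function expects a DataFrame with a 'ds' date column
# and a single numeric price column, and writes lstmmodels.png into the `dataset` directory.
def _demo_ex_Lstm():
    df = pd.read_csv('brentpricepredict.csv', encoding='utf-8', usecols=['ds', 'y'])  # assumed file/columns
    ex_Lstm(df, input_seq_len=50, output_seq_len=7, is_debug=True, dataset='.')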