import os
from math import sqrt

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from numpy import concatenate
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.models import Sequential


def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    '''
    Convert a time series into a supervised-learning dataset.
    :param data: the dataset (list or 2-D array)
    :param n_in: length of the input sequence, default 1
    :param n_out: length of the output sequence, default 1
    :param dropnan: whether to drop rows containing NaN values
    :return: a DataFrame framed for supervised learning
    '''
    n_vars = 1 if type(data) is list else data.shape[1]
    df = pd.DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    # shift the input block down by n_in, ..., 2, 1 rows and collect each
    # shifted copy in cols (range(n_in, 0, -1) iterates in reverse)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j + 1, i)) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    # append the output block(s); i == 0 is the current time step t
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [('var%d(t)' % (j + 1)) for j in range(n_vars)]
        else:
            names += [('var%d(t+%d)' % (j + 1, i)) for j in range(n_vars)]
    # cols now holds the shifted blocks; concatenate them side by side
    agg = pd.concat(cols, axis=1)
    # name the columns of the combined frame
    agg.columns = names
    # drop rows that contain NaN values
    if dropnan:
        agg.dropna(inplace=True)
    return agg


def createXY(dataset, n_past):
    dataX = []
    dataY = []
    print(dataset.shape[1])
    for i in range(n_past, len(dataset)):
        dataX.append(dataset[i - n_past:i, 0:dataset.shape[1]])
        dataY.append(dataset[i, 0])
    return np.array(dataX), np.array(dataY)


def ex_Lstm_M(df, n_days=14, out_days=7, is_debug=False, datasetpath=''):
    # dataset = pd.read_csv('brentpricepredict.csv', encoding='utf-8')
    dataset = df.copy()
    dataset.set_index('ds', inplace=True)

    values = dataset.values
    if is_debug:
        # values = values[-1000:]
        pass

    # scale all features into the (0, 1) range
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(values)

    # use n_days of history to predict the next out_days
    n_features = dataset.shape[1]
    # frame the series as a supervised-learning problem (14 -> 7 by default)
    reframed = series_to_supervised(scaled, n_days, out_days)

    # split into train and test sets
    values = reframed.values
    # use 80% of the rows for training and 20% for testing
    n_train = int(len(values) * 0.8)
    train = values[:n_train, :]
    test = values[n_train:, :]

    # split into inputs and outputs
    n_obs = n_days * n_features
    # the column n_features from the end is the target variable at time t
    train_X, train_y = train[:, :n_obs], train[:, -n_features]
    test_X, test_y = test[:, :n_obs], test[:, -n_features]

    # reshape the inputs to 3D [samples, timesteps, features], timesteps = n_days
    train_X = train_X.reshape((train_X.shape[0], n_days, n_features))
    test_X = test_X.reshape((test_X.shape[0], n_days, n_features))
    print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)

    # define the network
    model = Sequential()
    model.add(LSTM(50, input_shape=(train_X.shape[1], train_X.shape[2])))
    model.add(Dense(1))
    model.compile(loss='mae', optimizer='adam')
    # fit the network
    history = model.fit(train_X, train_y, epochs=100, batch_size=72,
                        validation_data=(test_X, test_y), verbose=2,
                        shuffle=False)

    # make predictions
    yhat = model.predict(test_X)
    # flatten the test inputs back to 2D (n rows * n_days*n_features columns)
    test_X = test_X.reshape((test_X.shape[0], n_days * n_features))
    # append the remaining feature columns to the predictions so the array has
    # the n_features columns the scaler expects for the inverse transform
    inv_yhat = concatenate((yhat, test_X[:, -n_features + 1:]), axis=1)
    # invert the scaling on the combined array
    inv_yhat = scaler.inverse_transform(inv_yhat)
    inv_yhat = inv_yhat[:, 0]
    print(inv_yhat)

    test_y = test_y.reshape((len(test_y), 1))
    # do the same for the actual values before inverting the scaling
    inv_y = concatenate((test_y, test_X[:, -n_features + 1:]), axis=1)
    inv_y = scaler.inverse_transform(inv_y)
    inv_y = inv_y[:, 0]

    # compute RMSE
    rmse = sqrt(mean_squared_error(inv_y, inv_yhat))
    print('Test RMSE: %.3f' % rmse)

    # visualise the results
    # keep n days of history
    n = len(inv_y) - 7
    # values to plot
    time_axis_data = np.array(range(n))
    time_axis_future = np.array(range(n + 7))
    inv_y = inv_y[-n:]
    inv_yhat = inv_yhat[-n - 7:]

    # plot the history together with the forecast
    fig, ax = plt.subplots(2, 1, gridspec_kw={'height_ratios': [5, 4]})
    # set the figure size
    fig.set_size_inches(6, 6)
    # first subplot: historical and predicted prices
    ax[0].plot(time_axis_data, inv_y, label='Historical price')
    ax[0].plot(time_axis_future, inv_yhat, linestyle='dashed', label='Predicted price')
    ax[0].set_xlabel('Time')
    ax[0].set_ylabel('Price')
    ax[0].legend()
    # set the title
    ax[0].set_title('Brent multivariate price forecast')
    # set the y-axis range
    ax[0].set_ylim(50, 120)
    # second subplot: a table of the predicted prices
    ax[1].axis('off')
    table_data = [[f"Day {i + 1}", "{:.2f}".format(val)]
                  for i, val in enumerate(inv_yhat[-7:])]
    table = ax[1].table(cellText=table_data, colLabels=['Day', 'Prediction'],
                        loc='center')
    # let the table pick its font size automatically
    table.auto_set_font_size(True)
    # save the figure
    filename = os.path.basename(__file__).split('.')[0]
    plt.savefig(os.path.join(datasetpath, filename + '_M.png'))
    # plt.show()
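
# -----------------------------------------------------------------------------
# Illustrative sketch, not part of the original pipeline: a quick sanity check
# of series_to_supervised on a tiny synthetic series. With n_in=2 and n_out=1
# the frame has the columns var1(t-2), var1(t-1), var1(t), and the first two
# rows are dropped because the shifted copies contain NaNs.
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    _demo = series_to_supervised([1, 2, 3, 4, 5], n_in=2, n_out=1)
    print(_demo)  # three rows remain: (1, 2, 3), (2, 3, 4) and (3, 4, 5)
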

def ex_Lstm(df, input_seq_len=50, output_seq_len=7, is_debug=False, dataset=''):
    # make sure the date column is a datetime (if it is not already)
    df['ds'] = pd.to_datetime(df['ds'])

    # drop the date column and keep only the numeric values
    df = df.drop('ds', axis=1)
    prices = np.array(df, dtype=float)  # convert to a NumPy array of floats
    # remove NaNs; this flattens the array, so a single value column is assumed
    prices = prices[~np.isnan(prices)]
    if is_debug:
        prices = prices[-300:]

    # prepare the input sequences
    inputs = []
    for i in range(len(prices) - input_seq_len - output_seq_len + 1):
        inputs.append(prices[i:i + input_seq_len])
    inputs = np.array(inputs)

    # prepare the output sequences
    outputs = []
    for i in range(input_seq_len, len(prices) - output_seq_len + 1):
        outputs.append(prices[i:i + output_seq_len])
    outputs = np.array(outputs)

    # split the dataset into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(inputs, outputs, test_size=0.2)

    # normalize the data
    scaler_in = MinMaxScaler()
    X_train = scaler_in.fit_transform(X_train)
    X_test = scaler_in.transform(X_test)
    scaler_out = MinMaxScaler()
    y_train = scaler_out.fit_transform(y_train)
    y_test = scaler_out.transform(y_test)

    # define the LSTM model
    model = Sequential()
    model.add(LSTM(128, activation='relu', input_shape=(input_seq_len, 1)))
    model.add(Dense(output_seq_len))
    model.compile(optimizer='adam', loss='mse')

    # train the LSTM model
    model.fit(X_train.reshape(-1, input_seq_len, 1), y_train,
              epochs=100, batch_size=64,
              validation_data=(X_test.reshape(-1, input_seq_len, 1), y_test))

    # evaluate the LSTM model
    mse = model.evaluate(X_test.reshape(-1, input_seq_len, 1), y_test)

    # make future predictions from the last input_seq_len observations
    future_inputs = np.array([prices[-input_seq_len:]])
    future_inputs = scaler_in.transform(future_inputs)
    future_predictions = model.predict(future_inputs.reshape(-1, input_seq_len, 1))
    future_predictions = scaler_out.inverse_transform(future_predictions)[0]

    # print the results
    print("MSE: ", mse)
    print("Future predictions: ", future_predictions)

    # generate time axes for the data and the future predictions
    time_axis_data = np.arange(len(prices))
    time_axis_future = np.arange(len(prices), len(prices) + len(future_predictions))
    # concatenate the time axes
    time_axis = np.concatenate((time_axis_data, time_axis_future))
    # concatenate the data and the future predictions
    data_and_predictions = np.concatenate((prices, future_predictions))

    # plot the data and the future predictions
    fig, ax = plt.subplots(2, 1, gridspec_kw={'height_ratios': [3, 1]})
    # first subplot: data and future predictions
    ax[0].plot(time_axis, data_and_predictions, label='Data and Future Predictions')
    ax[0].plot(time_axis_future, future_predictions, linestyle='dashed', label='Future Predictions')
    ax[0].set_xlabel('Time')
    ax[0].set_ylabel('Price')
    ax[0].legend()

    # second subplot: a table of the future predictions
    ax[1].axis('off')
    table_data = [[f"Day {i + 1}", "{:.2f}".format(val)]
                  for i, val in enumerate(future_predictions)]
    table = ax[1].table(cellText=table_data, colLabels=['Day', 'Prediction'], loc='center')

    plt.savefig(os.path.join(dataset, 'lstmmodels.png'))
    # plt.show()
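
# -----------------------------------------------------------------------------
# Minimal usage sketch, not part of the original module. It assumes a CSV like
# the 'brentpricepredict.csv' referenced in the commented-out read_csv call in
# ex_Lstm_M: a 'ds' date column plus one or more numeric price columns. The
# file name and the output directory below are placeholders.
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    csv_path = 'brentpricepredict.csv'  # placeholder path
    if os.path.exists(csv_path):
        frame = pd.read_csv(csv_path, encoding='utf-8')
        # multivariate 14-day -> 7-day forecast
        ex_Lstm_M(frame, n_days=14, out_days=7, is_debug=True, datasetpath='.')
        # univariate 50-day -> 7-day forecast
        ex_Lstm(frame, input_seq_len=50, output_seq_len=7, is_debug=True, dataset='.')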