import os
import re
import time
import multiprocessing

import joblib
import pandas as pd
import torch

# Define functions

def loadcsv(filename):
    """Load a CSV file, falling back to GBK when UTF-8 decoding fails."""
    try:
        df = pd.read_csv(filename, encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(filename, encoding='gbk')
    return df


def datachuli(df, datecol='date'):
    """Clean the raw frame and rename columns to the ds/y schema."""
    # Drop columns that are entirely empty
    df = df.dropna(axis=1, how='all')
    # Forward fill
    df = df.ffill()
    # Backward fill
    df = df.bfill()
    # Rename the date column to 'ds'
    df.rename(columns={datecol: 'ds'}, inplace=True)
    # Parse 'ds' as a pandas datetime
    df['ds'] = pd.to_datetime(df['ds'])
    # Rename the prediction target column to 'y'
    df.rename(columns={'Brent连1合约价格': 'y'}, inplace=True)

    return df


def getdata(filename, datecol='date'):
    """Load and clean the data in one step."""
    df = loadcsv(filename)
    df = datachuli(df, datecol)
    return df

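
# After getdata(), the frame is assumed to be in the long format that
# neuralforecast-style models expect: a 'ds' datetime column, a 'y' target
# column (the Brent front-month contract price), plus any exogenous feature
# columns. The 'unique_id' column such libraries also require is added in
# __main__ below.
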
# Prediction function
def predict(X_test, nf, result_list):
    """Predict one test window and append the rows to the shared result list."""
    df_predict = nf.predict(X_test).reset_index()
    result_list.append(df_predict.values.tolist())
    return df_predict

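
# `nf` is assumed to be a fitted neuralforecast.NeuralForecast-style bundle
# whose predict() returns one forecast column per model; the `columns` lists
# defined in __main__ mirror those column names. This is an assumption based
# on the column names, not something the script states explicitly.
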
def testSetPredict(X_test, nf, columns, columns2, dataset):
    # Record start time
    start_time = time.time()

    # Work out how many windows each process handles
    num_samples = len(X_test)
    num_processes = multiprocessing.cpu_count()
    samples_per_process = num_samples // num_processes

    manager = multiprocessing.Manager()
    result_list = manager.list()  # list shared across worker processes

    # Create the process pool
    with multiprocessing.Pool(num_processes) as pool:
        processes = []
        for i in range(num_processes):
            # Index range this process is responsible for
            start_index = i * samples_per_process
            end_index = (i + 1) * samples_per_process if i != num_processes - 1 else num_samples
            # Slice the windows for this process
            X_test_split = X_test[start_index:end_index]
            # Submit one task per window
            for X in X_test_split:
                processes.append(pool.apply_async(predict, args=(X, nf, result_list)))
        for process in processes:
            process.get()

    # Convert the shared list back into DataFrames
    df_combined = pd.DataFrame()
    df_combined2 = pd.DataFrame()
    for result in result_list:
        try:
            df_shared = pd.DataFrame(result, columns=['index', 'ds'] + columns)
            df_combined = pd.concat([df_combined, df_shared]).reset_index(drop=True)
        except ValueError:
            # Column count mismatch: these are DeepAR rows with extra
            # quantile columns, so collect them in a second DataFrame
            df_shared2 = pd.DataFrame(result, columns=['index', 'ds'] + columns2)
            df_combined2 = pd.concat([df_combined2, df_shared2]).reset_index(drop=True)

    # df_combined.drop(['index'], axis=1, inplace=True)
    df_combined.to_csv(os.path.join(dataset, 'df_combined.csv'), index=False)
    # df_combined2.drop(['index'], axis=1, inplace=True)
    df_combined2.to_csv(os.path.join(dataset, 'df_combined2.csv'), index=False)

    end_time = time.time()
    # Print the elapsed time, converted to hours, minutes and seconds
    hours, rem = divmod(end_time - start_time, 3600)
    minutes, seconds = divmod(rem, 60)
    print(f"Elapsed time: {int(hours)}h {int(minutes)}m {seconds:.2f}s")

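
# Design note: every apply_async() call pickles `nf` into a worker process,
# so a large model gets re-serialised once per window; loading the model once
# per worker via the Pool initializer is a common alternative.
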
if __name__ == '__main__':
    # Record start time
    start_time = time.time()

    # file = '指标数据处理.csv'
    file = 'brentpricepredict.csv'
    df = getdata(file)
    df.head()

    # Select the feature and label columns
    X = df.drop(['y', 'ds'], axis=1)  # feature set: everything except the timestamp and the label (Brent front-month price)
    y = df['y']  # label set

    # The training set ends at 80% of the rows
    split_index = int(0.8 * df.shape[0])

    # Chronological train/test split
    df_train = df[:split_index].copy()
    df_test = df[split_index:].copy()
    # A constant series id, since there is only the single Brent series
    df_train['unique_id'] = 1
    df_test['unique_id'] = 1

    df_combined = pd.DataFrame()
    df_test = df_test.reset_index(drop=True)
    # df_test = df_test[-20:]

    # Model list, used to name the prediction result columns
    columns = [
        'NHITS',
        'Informer',
        'LSTM',
        'iTransformer',
        'TSMixer',
        'TSMixerx',
        'PatchTST',
        'RNN',
        'GRU',
        'TCN',
        'DeepAR',
        'BiTCN',
        'DilatedRNN',
        'MLP',
        'DLinear',
        'NLinear',
        'TFT',
        'FEDformer',
        'StemGNN',
        'MLPMultivariate',
        'TiDE',
        'DeepNPTS',
    ]

    # DeepAR's predictions carry five extra columns, so they need a
    # separate column list
    columns2 = [
        'NHITS',
        'Informer',
        'LSTM',
        'iTransformer',
        'TSMixer',
        'TSMixerx',
        'PatchTST',
        'RNN',
        'GRU',
        'TCN',
        'DeepAR',
        'DeepAR-median',
        'DeepAR-lo-90',
        'DeepAR-lo-80',
        'DeepAR-hi-80',
        'DeepAR-hi-90',
        'BiTCN',
        'DilatedRNN',
        'MLP',
        'DLinear',
        'NLinear',
        'TFT',
        'FEDformer',
        'StemGNN',
        'MLPMultivariate',
        'TiDE',
        'DeepNPTS',
    ]

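    # The DeepAR-lo-90/-lo-80/-hi-80/-hi-90 names presumably correspond to
    # the bounds of 80% and 90% prediction intervals from DeepAR's
    # probabilistic output.
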
    # Build sliding test windows of `input_size` consecutive rows
    input_size = 14
    X_test = []
    for i in range(0, len(df_test) - input_size + 1):
        X_test.append(df_test.iloc[i:i + input_size])

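    # For example, 100 test rows with input_size = 14 yield 100 - 14 + 1 = 87
    # overlapping windows, each a 14-row DataFrame.
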
    nf = joblib.load('model_reg.joblib')

    # Output directory for the combined CSVs; '.' is an assumed default
    testSetPredict(X_test, nf, columns, columns2, dataset='.')