diff --git a/config_juxiting.py b/config_juxiting.py index bcb9242..f5d2612 100644 --- a/config_juxiting.py +++ b/config_juxiting.py @@ -227,8 +227,16 @@ add_kdj = False # 是否添加kdj指标 if add_kdj and is_edbnamelist: edbnamelist = edbnamelist+['K','D','J'] ### 模型参数 -y = 'PP:拉丝:1102K:市场价:青州:国家能源宁煤(日)' # 原油指标数据的目标变量 -# y = '期货结算价(连续):布伦特原油:前一个观测值' # ineoil的目标变量 +# y = 'PP:拉丝:1102K:市场价:青州:国家能源宁煤(日)' # 原油指标数据的目标变量 +y = 'AVG(金能大唐久泰青州)' # 原油指标数据的目标变量 +avg_cols = [ + 'PP:拉丝:1102K:出厂价:青州:国家能源宁煤(日)', + 'PP:拉丝:L5E89:出厂价:华北(第二区域):内蒙古久泰新材料(日)', + 'PP:拉丝:L5E89:出厂价:河北、鲁北:大唐内蒙多伦(日)', + 'PP:拉丝:HP550J:市场价:青岛:金能化学(日)' +] +offsite = 50 +offsite_col = ['PP:拉丝:HP550J:市场价:青岛:金能化学(日)'] horizon =5 # 预测的步长 input_size = 40 # 输入序列长度 train_steps = 50 if is_debug else 1000 # 训练步数,用来限定epoch次数 diff --git a/lib/dataread.py b/lib/dataread.py index 1320b42..d1496c9 100644 --- a/lib/dataread.py +++ b/lib/dataread.py @@ -501,6 +501,75 @@ def datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y' featureAnalysis(df,dataset=dataset,y=y) return df +def datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False): + df = df_zhibiaoshuju.copy() + if end_time == '': + end_time = datetime.datetime.now().strftime('%Y-%m-%d') + # date转为pddate + df.rename(columns={datecol:'ds'},inplace=True) + + df[offsite_col] = df[offsite_col]-offsite + df['AVG(金能大唐久泰青州)'] = df[avg_cols].mean(axis=1) + print(df[['ds','AVG(金能大唐久泰青州)']+avg_cols].head(20)) + # 重命名预测列 + df.rename(columns={y:'y'},inplace=True) + # 按时间顺序排列 + df.sort_values(by='ds',inplace=True) + df['ds'] = pd.to_datetime(df['ds']) + # 获取2018年到当前日期的数据 + df = df[df['ds'].dt.year >= 2018] + # 获取小于等于当前日期的数据 + df = df[df['ds'] <= end_time] + logger.info(f'删除两月不更新特征前数据量:{df.shape}') + # 去掉近最后数据对应的日期在两月以前的列,删除近2月的数据是常熟的列 + current_date = datetime.datetime.now() + two_months_ago = current_date - timedelta(days=40) + + def check_column(col_name): + if 'ds' in col_name or 'y' in col_name: + return False + df_check_column = df[['ds',col_name]] + df_check_column = df_check_column.dropna() + if len(df_check_column) == 0: + return True + if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2: + return True + corresponding_date = df_check_column.iloc[-1]['ds'] + return corresponding_date < two_months_ago + columns_to_drop = df.columns[df.columns.map(check_column)].tolist() + df = df.drop(columns = columns_to_drop) + + logger.info(f'删除两月不更新特征后数据量:{df.shape}') + + # 删除预测列空值的行 + df = df.dropna(subset=['y']) + logger.info(f'删除预测列为空值的行后数据量:{df.shape}') + df = df.dropna(axis=1, how='all') + logger.info(f'删除全为空值的列后数据量:{df.shape}') + df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False) + # 去掉指标列表中的columns_to_drop的行 + df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())] + df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False) + # 频度分析 + featurePindu(dataset=dataset) + # 向上填充 + df = df.ffill() + # 向下填充 + df = df.bfill() + + # 删除周六日的数据 + if delweekenday: + df = df[df['ds'].dt.weekday < 5] + + if add_kdj: + df = calculate_kdj(df) + + if is_timefurture: + df = addtimecharacteristics(df=df,dataset=dataset) + + featureAnalysis(df,dataset=dataset,y=y) + return df + def getdata(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''): logger.info('getdata接收:'+filename+' '+datecol+' '+end_time) # 判断后缀名 csv或excel @@ -514,6 +583,20 @@ def getdata(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurtu # 日期字符串转为datatime df = datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) + return df +def getdata_juxiting(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''): + logger.info('getdata接收:'+filename+' '+datecol+' '+end_time) + # 判断后缀名 csv或excel + if filename.endswith('.csv'): + df = loadcsv(filename) + else: + # 读取excel 指标数据 + df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据') + df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表') + + # 日期字符串转为datatime + df = datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) + return df # def filter_data(ClassifyName,data): diff --git a/main.py b/main.py index f0e2ede..802697d 100644 --- a/main.py +++ b/main.py @@ -114,25 +114,25 @@ def predict_main(): row,col = df.shape now = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - # ex_Model(df, - # horizon=horizon, - # input_size=input_size, - # train_steps=train_steps, - # val_check_steps=val_check_steps, - # early_stop_patience_steps=early_stop_patience_steps, - # is_debug=is_debug, - # dataset=dataset, - # is_train=is_train, - # is_fivemodels=is_fivemodels, - # val_size=val_size, - # test_size=test_size, - # settings=settings, - # now=now, - # etadata = etadata, - # modelsindex = modelsindex, - # data = data, - # is_eta=is_eta, - # ) + ex_Model(df, + horizon=horizon, + input_size=input_size, + train_steps=train_steps, + val_check_steps=val_check_steps, + early_stop_patience_steps=early_stop_patience_steps, + is_debug=is_debug, + dataset=dataset, + is_train=is_train, + is_fivemodels=is_fivemodels, + val_size=val_size, + test_size=test_size, + settings=settings, + now=now, + etadata = etadata, + modelsindex = modelsindex, + data = data, + is_eta=is_eta, + ) logger.info('模型训练完成') diff --git a/main_juxiting.py b/main_juxiting.py new file mode 100644 index 0000000..df8f85a --- /dev/null +++ b/main_juxiting.py @@ -0,0 +1,178 @@ +# 读取配置 +from config_juxiting import * +from lib.dataread import * +from lib.tools import * +from models.nerulforcastmodels import ex_Model,model_losss,brent_export_pdf,tansuanli_export_pdf,pp_export_pdf,model_losss_juxiting + +import glob +import torch +torch.set_float32_matmul_precision("high") + +sqlitedb = SQLiteHandler(db_name) +sqlitedb.connect() + +def predict_main(): + signature = BinanceAPI(APPID, SECRET) + etadata = EtaReader(signature=signature, + classifylisturl = classifylisturl, + classifyidlisturl=classifyidlisturl, + edbcodedataurl=edbcodedataurl, + edbcodelist=edbcodelist, + edbdatapushurl=edbdatapushurl, + edbdeleteurl=edbdeleteurl, + edbbusinessurl=edbbusinessurl + ) + # 获取数据 + if is_eta: + # eta数据 + logger.info('从eta获取数据...') + signature = BinanceAPI(APPID, SECRET) + etadata = EtaReader(signature=signature, + classifylisturl = classifylisturl, + classifyidlisturl=classifyidlisturl, + edbcodedataurl=edbcodedataurl, + edbcodelist=edbcodelist, + edbdatapushurl=edbdatapushurl, + edbdeleteurl=edbdeleteurl, + edbbusinessurl=edbbusinessurl, + ) + df_zhibiaoshuju,df_zhibiaoliebiao = etadata.get_eta_api_pp_data(data_set=data_set,dataset=dataset) # 原始数据,未处理 + + # 数据处理 + df = datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) + + else: + logger.info('读取本地数据:'+os.path.join(dataset,data_set)) + df = getdata_juxiting(filename=os.path.join(dataset,data_set),y=y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) # 原始数据,未处理 + + # 更改预测列名称 + df.rename(columns={y:'y'},inplace=True) + + if is_edbnamelist: + df = df[edbnamelist] + df.to_csv(os.path.join(dataset,'指标数据.csv'), index=False) + # 保存最新日期的y值到数据库 + # 取第一行数据存储到数据库中 + first_row = df[['ds','y']].tail(1) + # 将最新真实值保存到数据库 + if not sqlitedb.check_table_exists('trueandpredict'): + first_row.to_sql('trueandpredict',sqlitedb.connection,index=False) + else: + for row in first_row.itertuples(index=False): + row_dict = row._asdict() + row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S') + check_query = sqlitedb.select_data('trueandpredict',where_condition = f"ds = '{row.ds}'") + if len(check_query) > 0: + set_clause = ", ".join([f"{key} = '{value}'" for key, value in row_dict.items()]) + sqlitedb.update_data('trueandpredict',set_clause,where_condition = f"ds = '{row.ds}'") + continue + sqlitedb.insert_data('trueandpredict',tuple(row_dict.values()),columns=row_dict.keys()) + + import datetime + # 判断当前日期是不是周一 + is_weekday = datetime.datetime.now().weekday() == 0 + if is_weekday: + logger.info('今天是周一,更新预测模型') + # 计算最近20天预测残差最低的模型名称 + + model_results = sqlitedb.select_data('trueandpredict',order_by = "ds DESC",limit = "20") + model_results = model_results.dropna() + modelnames = model_results.columns.to_list()[2:] + for col in model_results[modelnames].select_dtypes(include=['object']).columns: + model_results[col] = model_results[col].astype(np.float32) + # 计算每个预测值与真实值之间的偏差率 + for model in modelnames: + model_results[f'{model}_abs_error_rate'] = abs(model_results['y'] - model_results[model]) / model_results['y'] + + # 获取每行对应的最小偏差率值 + min_abs_error_rate_values = model_results.apply(lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1) + # 获取每行对应的最小偏差率值对应的列名 + min_abs_error_rate_column_name = model_results.apply(lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1) + # 将列名索引转换为列名 + min_abs_error_rate_column_name = min_abs_error_rate_column_name.map(lambda x: x.split('_')[0]) + # 取出现次数最多的模型名称 + most_common_model = min_abs_error_rate_column_name.value_counts().idxmax() + logger.info(f"最近20天预测残差最低的模型名称:{most_common_model}") + + # 保存结果到数据库 + + if not sqlitedb.check_table_exists('most_model'): + sqlitedb.create_table('most_model',columns="ds datetime, most_common_model TEXT") + sqlitedb.insert_data('most_model',(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),most_common_model,),columns=('ds','most_common_model',)) + + if is_corr: + df = corr_feature(df=df) + + df1 = df.copy() # 备份一下,后面特征筛选完之后加入ds y 列用 + logger.info(f"开始训练模型...") + row,col = df.shape + + now = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ex_Model(df, + horizon=horizon, + input_size=input_size, + train_steps=train_steps, + val_check_steps=val_check_steps, + early_stop_patience_steps=early_stop_patience_steps, + is_debug=is_debug, + dataset=dataset, + is_train=is_train, + is_fivemodels=is_fivemodels, + val_size=val_size, + test_size=test_size, + settings=settings, + now=now, + etadata = etadata, + modelsindex = modelsindex, + data = data, + is_eta=is_eta, + ) + + + logger.info('模型训练完成') + # # 模型评估 + + logger.info('训练数据绘图ing') + model_results3 = model_losss_juxiting(sqlitedb) + + logger.info('训练数据绘图end') + # 模型报告 + + logger.info('制作报告ing') + title = f'{settings}--{now}-预测报告' # 报告标题 + if 'Brent' in y: + brent_export_pdf(dataset=dataset,num_models = 5 if is_fivemodels else 22,time=end_time, + reportname=reportname,sqlitedb=sqlitedb), + else: + pp_export_pdf(dataset=dataset,num_models = 5 if is_fivemodels else 22,time=end_time, + reportname=reportname,sqlitedb=sqlitedb), + + logger.info('制作报告end') + logger.info('模型训练完成') + + # tansuanli_export_pdf(dataset=dataset,num_models = 5 if is_fivemodels else 22,end_time=end_time,reportname=reportname) + + # # LSTM 单变量模型 + # ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset) + + # # lstm 多变量模型 + # ex_Lstm_M(df,n_days=input_size,out_days=horizon,is_debug=is_debug,datasetpath=dataset) + + # # GRU 模型 + # # ex_GRU(df) + + # 发送邮件 + m = SendMail( + username=username, + passwd=passwd, + recv=recv, + title=title, + content=content, + file=max(glob.glob(os.path.join(dataset,'*.pdf')), key=os.path.getctime), + ssl=ssl, + ) + # m.send_mail() + + +if __name__ == '__main__': + predict_main() \ No newline at end of file