原油处理数据逻辑更改,添加预警消息生成并推送

This commit is contained in:
workpc 2024-11-19 13:27:57 +08:00
parent 280bbab349
commit bba6ae8548
11 changed files with 20304 additions and 34269 deletions

View File

@ -159,7 +159,7 @@ warning_data = {
"funcModule":'原油特征停更预警',
"funcOperation":'原油特征停更预警',
"data":{
'WARNING_TYPE_NAME':'日度数据',
'WARNING_TYPE_NAME':'特征数据停更预警test',
'WARNING_CONTENT':'',
'WARNING_DATE':''
}
@ -181,14 +181,14 @@ warning_data = {
### 开关
is_train = True # 是否训练
is_debug = False # 是否调试
is_eta = False # 是否使用eta接口
is_eta = True # 是否使用eta接口
is_timefurture = True # 是否使用时间特征
is_fivemodels = False # 是否使用之前保存的最佳的5个模型
is_edbcode = False # 特征使用edbcoding列表中的
is_edbnamelist = False # 自定义特征对应上面的edbnamelist
is_update_eta = False # 预测结果上传到eta
is_update_report = False # 是否上传报告
is_update_warning_data = False # 是否上传预警数据
is_update_warning_data = True # 是否上传预警数据
# 数据截取日期
end_time = '' # 数据截取日期

Binary file not shown.

View File

@ -12,6 +12,7 @@ import os
import hmac
import hashlib
import json
import math
import torch
torch.set_float32_matmul_precision("high")
import matplotlib.pyplot as plt
@ -114,7 +115,8 @@ def upload_report_data(token, upload_data):
logger.info("报告上传失败")
return None
def upload_warning_data(token, warning_data):
def upload_warning_data(warning_data):
token = get_head_auth_report()
warning_data = warning_data
headers = {"Authorization": token}
logger.info("预警上传中...")
@ -127,6 +129,80 @@ def upload_warning_data(token, warning_data):
logger.info("预警上传失败")
return None
def upload_warning_info(last_update_times_df,y_last_update_time):
logger.info(f'上传预警信息')
try:
warning_data_df = last_update_times_df[last_update_times_df['warning_date']<y_last_update_time][['stop_update_period','warning_date','last_update_time','update_period','feature']]
warning_data_df.columns = ['停更周期','预警日期','最后更新时间','更新周期','特征名称']
if len(warning_data_df) > 0:
content = '原油特征指标预警信息:\n\n'
warning_data_df = warning_data_df.sort_values(by='停更周期',ascending=False)
fixed_length = 30
warning_data_df['特征名称'] = warning_data_df['特征名称'].str.replace(" ", "")
content = warning_data_df.to_string(index=False, col_space=fixed_length)
else:
logger.info(f'没有需要上传的预警信息')
content = '没有需要维护的特征指标'
warning_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
warning_data['data']['WARNING_DATE'] = warning_date
warning_data['data']['WARNING_CONTENT'] = content
upload_warning_data(warning_data)
logger.info(f'上传预警信息成功')
except Exception as e:
logger.error(f'上传预警信息失败:{e}')
def create_feature_last_update_time(df):
"""
计算特征停更信息用
参数:
df (DataFrame): 包含特征数据的 DataFrame
返回:
DataFrame: 包含特征停更信息的 DataFrame
str: y 列的最后更新时间
"""
df1 = df.copy()
# 找到每列的最后更新时间
df1.set_index('ds', inplace=True)
last_update_times = df1.apply(lambda x: x.dropna().index.max().strftime('%Y-%m-%d') if not x.dropna().empty else None)
# 保存每列的最后更新时间到文件
last_update_times_df = pd.DataFrame(columns = ['feature', 'last_update_time','is_value','update_period','warning_date','stop_update_period'])
# 打印每列的最后更新时间
for column, last_update_time in last_update_times.items():
values = []
# 判断是不是常数值
if df1[column].tail(20).nunique() == 1:
values = values + [column, last_update_time,1]
else:
values = values + [column, last_update_time,0]
# 计算特征数据值的时间差
try:
# 计算预警日期
time_diff = (df1[column].dropna().index.to_series().diff().mode()[0]).total_seconds() / 3600 / 24
from datetime import timedelta
last_update_time_datetime = datetime.datetime.strptime(last_update_time, '%Y-%m-%d')
last_update_date = end_time if end_time != '' else datetime.datetime.now().strftime('%Y-%m-%d')
end_time_datetime = datetime.datetime.strptime(last_update_date, '%Y-%m-%d')
early_warning_date = last_update_time_datetime + timedelta(days=time_diff)*2 + timedelta(days=1)
stop_update_period = int(math.ceil((end_time_datetime-last_update_time_datetime).days / time_diff))
early_warning_date = early_warning_date.strftime('%Y-%m-%d')
except KeyError:
time_diff = 0
early_warning_date = end_time
values = values + [time_diff,early_warning_date,stop_update_period]
last_update_times_df.loc[len(last_update_times_df)] = values
logger.info(f"Column {column} was last updated at {last_update_time}")
y_last_update_time = last_update_times_df[last_update_times_df['feature']=='y']['warning_date'].values[0]
last_update_times_df.to_csv(os.path.join(dataset,'last_update_times.csv'), index=False)
logger.info('特征停更信息保存到文件last_update_times.csv')
return last_update_times_df,y_last_update_time
# 统计特征频度
def featurePindu(dataset):
@ -445,7 +521,30 @@ def calculate_kdj(data, n=9):
# data = data.dropna()
return data
def check_column(df,col_name,two_months_ago):
'''
检查列是否需要删除
该函数会检查列是否为空值列180天没有更新的列或常数值列
参数:
col_name (str): 列名
df (DataFrame): 包含列的 DataFrame
返回:
bool: 如果列需要删除返回 True否则返回 False
'''
if 'ds' in col_name or 'y' in col_name:
return False
df_check_column = df[['ds',col_name]]
df_check_column = df_check_column.dropna()
if len(df_check_column) == 0:
print(f'空值列:{col_name}')
return True
# 判断是不是常数列
if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2:
print(f'180没有更新{col_name}')
return True
corresponding_date = df_check_column.iloc[-1]['ds']
return corresponding_date < two_months_ago
def datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False):
'''
@ -467,107 +566,27 @@ def datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y'
df = df[df['ds'].dt.year >= 2018]
df = df[df['ds'] <= end_time]
df1 = df.copy() # 计算特征停更信息用
# 找到每列的最后更新时间
df1.set_index('ds', inplace=True)
last_update_times = df1.apply(lambda x: x.dropna().index.max().strftime('%Y-%m-%d') if not x.dropna().empty else None)
# 保存每列的最后更新时间到文件
last_update_times_df = pd.DataFrame(columns = ['feature', 'last_update_time','is_value','update_period','warning_date'])
# 打印每列的最后更新时间
for column, last_update_time in last_update_times.items():
values = []
# 判断是不是常数值
if df1[column].tail(20).nunique() == 1:
values = values + [column, last_update_time,1]
else:
values = values + [column, last_update_time,0]
# 计算特征数据值的时间差
try:
# 计算预警日期
time_diff = (df1[column].dropna().index.to_series().diff().mode()[0]).total_seconds() / 3600 / 24
from datetime import timedelta
early_warning_date = datetime.datetime.strptime(last_update_time, '%Y-%m-%d') + timedelta(days=time_diff)*2 + timedelta(days=1)
early_warning_date = early_warning_date.strftime('%Y-%m-%d')
except KeyError:
time_diff = 0
early_warning_date = end_time
values = values + [time_diff,early_warning_date]
last_update_times_df.loc[len(last_update_times_df)] = values
logger.info(f"Column {column} was last updated at {last_update_time}")
last_update_times_df.to_csv(os.path.join(dataset,'last_update_times.csv'), index=False)
logger.info('特征停更信息保存到文件last_update_times.csv')
last_update_times_df,y_last_update_time = create_feature_last_update_time(df)
logger.info(f'删除预警的特征前数据量:{df.shape}')
y_last_update_time = last_update_times_df[last_update_times_df['feature']=='y']['warning_date'].values[0]
columns_to_drop = last_update_times_df[last_update_times_df['warning_date'] < y_last_update_time ]['feature'].values.tolist()
df = df.drop(columns = columns_to_drop)
logger.info(f'删除预警的特征后数据量:{df.shape}')
if is_update_warning_data:
logger.info(f'上传预警信息')
try:
warning_data_df = last_update_times_df[last_update_times_df['warning_date']<y_last_update_time][['last_update_time','feature']]
if len(warning_data_df) > 0:
content = '原油特征指标预警信息:\n\n'
warning_data_df = warning_data_df.sort_values(by='last_update_time',ascending=False)
for ds, df in warning_data_df.groupby('last_update_time'):
content += f'{ds} \n {df["feature"].to_string(index=False).replace(" ", "")}\n\n'
else:
logger.info(f'没有需要上传的预警信息')
content = '没有需要维护的特征指标'
warning_data['data']['WARNING_DATE'] = y_last_update_time
warning_data['data']['WARNING_CONTENT'] = content
token = get_head_auth_report()
upload_warning_data(token, warning_data)
logger.info(f'上传预警信息成功')
except Exception as e:
logger.error(f'上传预警信息失败:{e}')
upload_warning_info(last_update_times_df,y_last_update_time)
# 去掉近最后数据对应的日期在六月以前的列删除近2月的数据是常熟的列
current_date = datetime.datetime.now()
two_months_ago = current_date - timedelta(days=180)
logger.info(f'删除两月不更新特征前数据量:{df.shape}')
def check_column(col_name):
'''
去掉空值列
去掉180天没有更新的列
去掉常数值列
输入列名
输出True or False
'''
if 'ds' in col_name or 'y' in col_name:
return False
df_check_column = df[['ds',col_name]]
df_check_column = df_check_column.dropna()
columns_to_drop = []
for clo in df.columns:
if check_column(df,clo,two_months_ago):
columns_to_drop.append(clo)
df = df.drop(columns=columns_to_drop)
if len(df_check_column) == 0:
print(f'空值列:{col_name}')
return True
# 判断是不是常数列
if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2:
print(f'180没有更新{col_name}')
return True
corresponding_date = df_check_column.iloc[-1]['ds']
return corresponding_date < two_months_ago
columns_to_drop = df.columns[df.columns.map(check_column)].tolist()
df = df.drop(columns = columns_to_drop)
logger.info(f'删除两月不更新特征后数据量:{df.shape}')
# 删除预测列空值的行
df = df.dropna(subset=['y'])
logger.info(f'删除预测列为空值的行后数据量:{df.shape}')

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -11,63 +11,103 @@ torch.set_float32_matmul_precision("high")
sqlitedb = SQLiteHandler(db_name)
sqlitedb.connect()
def predict_main():
"""
主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测
参数:
signature (BinanceAPI): Binance API 实例
etadata (EtaReader): ETA 数据读取器实例
is_eta (bool): 是否从 ETA 获取数据
data_set (str): 数据集名称
dataset (str): 数据集路径
add_kdj (bool): 是否添加 KDJ 指标
is_timefurture (bool): 是否添加时间衍生特征
end_time (str): 结束时间
is_edbnamelist (bool): 是否使用 EDB 名称列表
edbnamelist (list): EDB 名称列表
y (str): 预测目标列名
sqlitedb (SQLiteDB): SQLite 数据库实例
is_corr (bool): 是否进行相关性分析
horizon (int): 预测时域
input_size (int): 输入数据大小
train_steps (int): 训练步数
val_check_steps (int): 验证检查步数
early_stop_patience_steps (int): 早停耐心步数
is_debug (bool): 是否调试模式
dataset (str): 数据集名称
is_train (bool): 是否训练模型
is_fivemodels (bool): 是否使用五个模型
val_size (float): 验证集大小
test_size (float): 测试集大小
settings (dict): 模型设置
now (str): 当前时间
etadata (EtaReader): ETA 数据读取器实例
modelsindex (list): 模型索引列表
data (str): 数据类型
is_eta (bool): 是否从 ETA 获取数据
返回:
None
"""
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl = classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl
)
classifylisturl=classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl
)
# 获取数据
if is_eta:
logger.info('从eta获取数据...')
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl = classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl,
)
df_zhibiaoshuju,df_zhibiaoliebiao = etadata.get_eta_api_yuanyou_data(data_set=data_set,dataset=dataset) # 原始数据,未处理
classifylisturl=classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl,
)
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_yuanyou_data(data_set=data_set, dataset=dataset) # 原始数据,未处理
# 数据处理
df = datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time)
df = datachuli(df_zhibiaoshuju, df_zhibiaoliebiao, y=y, dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture,
end_time=end_time)
else:
# 读取数据
logger.info('读取本地数据:'+os.path.join(dataset,data_set))
df = getdata(filename=os.path.join(dataset,data_set),y=y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) # 原始数据,未处理
logger.info('读取本地数据:' + os.path.join(dataset, data_set))
df = getdata(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj,
is_timefurture=is_timefurture, end_time=end_time) # 原始数据,未处理
# 更改预测列名称
df.rename(columns={y:'y'},inplace=True)
df.rename(columns={y: 'y'}, inplace=True)
if is_edbnamelist:
df = df[edbnamelist]
df.to_csv(os.path.join(dataset,'指标数据.csv'), index=False)
df = df[edbnamelist]
df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False)
# 保存最新日期的y值到数据库
# 取第一行数据存储到数据库中
first_row = df[['ds','y']].tail(1)
first_row = df[['ds', 'y']].tail(1)
# 将最新真实值保存到数据库
if not sqlitedb.check_table_exists('trueandpredict'):
first_row.to_sql('trueandpredict',sqlitedb.connection,index=False)
first_row.to_sql('trueandpredict', sqlitedb.connection, index=False)
else:
for row in first_row.itertuples(index=False):
row_dict = row._asdict()
row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S')
check_query = sqlitedb.select_data('trueandpredict',where_condition = f"ds = '{row.ds}'")
check_query = sqlitedb.select_data('trueandpredict', where_condition=f"ds = '{row.ds}'")
if len(check_query) > 0:
set_clause = ", ".join([f"{key} = '{value}'" for key, value in row_dict.items()])
sqlitedb.update_data('trueandpredict',set_clause,where_condition = f"ds = '{row.ds}'")
sqlitedb.update_data('trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'")
continue
sqlitedb.insert_data('trueandpredict',tuple(row_dict.values()),columns=row_dict.keys())
sqlitedb.insert_data('trueandpredict', tuple(row_dict.values()), columns=row_dict.keys())
import datetime
# 判断当前日期是不是周一
@ -75,10 +115,10 @@ def predict_main():
if is_weekday:
logger.info('今天是周一,更新预测模型')
# 计算最近20天预测残差最低的模型名称
model_results = sqlitedb.select_data('trueandpredict',order_by = "ds DESC",limit = "20")
model_results = sqlitedb.select_data('trueandpredict', order_by="ds DESC", limit="20")
model_results = model_results.dropna()
modelnames = model_results.columns.to_list()[2:]
modelnames = model_results.columns.to_list()[2:]
for col in model_results[modelnames].select_dtypes(include=['object']).columns:
model_results[col] = model_results[col].astype(np.float32)
# 计算每个预测值与真实值之间的偏差率
@ -96,49 +136,47 @@ def predict_main():
logger.info(f"最近20天预测残差最低的模型名称{most_common_model}")
# 保存结果到数据库
if not sqlitedb.check_table_exists('most_model'):
sqlitedb.create_table('most_model',columns="ds datetime, most_common_model TEXT")
sqlitedb.insert_data('most_model',(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),most_common_model,),columns=('ds','most_common_model',))
sqlitedb.create_table('most_model', columns="ds datetime, most_common_model TEXT")
sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
if is_corr:
df = corr_feature(df=df)
df1 = df.copy() # 备份一下后面特征筛选完之后加入ds y 列用
logger.info(f"开始训练模型...")
row,col = df.shape
row, col = df.shape
now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ex_Model(df,
horizon=horizon,
input_size=input_size,
train_steps=train_steps,
val_check_steps=val_check_steps,
early_stop_patience_steps=early_stop_patience_steps,
is_debug=is_debug,
dataset=dataset,
is_train=is_train,
is_fivemodels=is_fivemodels,
val_size=val_size,
test_size=test_size,
settings=settings,
now=now,
etadata = etadata,
modelsindex = modelsindex,
data = data,
is_eta=is_eta,
)
horizon=horizon,
input_size=input_size,
train_steps=train_steps,
val_check_steps=val_check_steps,
early_stop_patience_steps=early_stop_patience_steps,
is_debug=is_debug,
dataset=dataset,
is_train=is_train,
is_fivemodels=is_fivemodels,
val_size=val_size,
test_size=test_size,
settings=settings,
now=now,
etadata=etadata,
modelsindex=modelsindex,
data=data,
is_eta=is_eta,
)
logger.info('模型训练完成')
# # 模型评估
logger.info('训练数据绘图ing')
model_results3 = model_losss_juxiting(sqlitedb)
logger.info('训练数据绘图end')
# 模型报告
# 模型报告
logger.info('制作报告ing')
title = f'{settings}--{now}-预测报告' # 报告标题
@ -147,9 +185,7 @@ def predict_main():
logger.info('制作报告end')
logger.info('模型训练完成')
# tansuanli_export_pdf(dataset=dataset,num_models = 5 if is_fivemodels else 22,end_time=end_time,reportname=reportname)
# # LSTM 单变量模型
# ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset)

Binary file not shown.

File diff suppressed because one or more lines are too long