Debugging the image report upload

This commit is contained in:
workpc 2025-08-05 15:49:29 +08:00
parent 5ff6213717
commit cc0295822a
4 changed files with 611 additions and 1 deletion

View File

@@ -329,6 +329,8 @@ upload_warning_url = f"http://{server_host}/jingbo-dev/api/basicBuiness/crudeOil
query_data_list_item_nos_url = f"http://{server_host}/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos"
# Upload data item values
push_data_value_list_url = f"http://{server_host}/jingbo-dev/api/dw/dataValue/pushDataValueList"
# Upload the image report
push_png_report_url = f"http://{server_host}/jingbo-dev/api/analysis/reportInfo/priceForecastImg"
login_data = {
"data": {
@@ -401,6 +403,21 @@ push_data_value_list_data = {
}
]
}
push_png_report_data = {
"funcModule": '研究报告信息',
"funcOperation": '上传聚烯烃PP价格预测报告',
"data": {
"groupNo": "000161",
"updateTime": "2024-09-06 15:01:29",
"fileBase64": '', # 文件内容base64
"title": '2025年8月5日日度周度预测结果',
"billNo": '',
}
}
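For reference, a minimal sketch of how the new endpoint and payload template above fit together. The `token` variable and the stand-alone `requests` call are illustrative only — the commit's actual helper is `push_png_report()` in the library changes further down — and the chart file name is the one uploaded by the new script:

import base64
import requests

# Encode the generated correlation chart and drop it into the payload template above
with open('pp_zhouducorrelation.png', 'rb') as f:
    push_png_report_data['data']['fileBase64'] = base64.b64encode(f.read()).decode('utf-8')

# POST it to the new image-report endpoint with the platform auth token
headers = {"Authorization": token}  # assumed: token obtained via get_head_auth_report()
resp = requests.post(push_png_report_url, headers=headers, json=push_png_report_data, timeout=(3, 35))
print(resp.json())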
# Data item codes for the eight forecast dimensions
bdwd_items = {
'ciri': 'jxtppbdwdcr',

juxiting_push_png_report.py Normal file
View File

@@ -0,0 +1,567 @@
# Load configuration
from lib.dataread import *
from config_juxiting_yuedu import *
from lib.tools import SendMail, convert_df_to_pydantic_pp, exception_logger, find_best_models, get_modelsname
from models.nerulforcastmodels import ex_Model, model_losss_juxiting, pp_bdwd_png, pp_export_pdf
import datetime
import torch
torch.set_float32_matmul_precision("high")
global_config.update({
# Core parameters
'logger': logger,
'dataset': dataset,
'y': y,
# 'offsite_col': offsite_col,
# 'avg_cols': avg_cols,
# 'offsite': offsite,
'edbcodenamedict': edbcodenamedict,
'is_debug': is_debug,
'is_train': is_train,
'is_fivemodels': is_fivemodels,
'is_update_report': is_update_report,
'settings': settings,
'bdwdname': bdwdname,
'columnsrename': columnsrename,
'price_columns': price_columns,
# Model parameters
'data_set': data_set,
'input_size': input_size,
'horizon': horizon,
'train_steps': train_steps,
'val_check_steps': val_check_steps,
'val_size': val_size,
'test_size': test_size,
'modelsindex': modelsindex,
'rote': rote,
'bdwd_items': bdwd_items,
# Feature engineering switches
'is_del_corr': is_del_corr,
'is_del_tow_month': is_del_tow_month,
'is_eta': is_eta,
'is_update_eta': is_update_eta,
'is_update_predict_value': is_update_predict_value,
'early_stop_patience_steps': early_stop_patience_steps,
# Time parameters
'start_year': start_year,
'end_time': end_time or datetime.datetime.now().strftime("%Y-%m-%d"),
'freq': freq,  # keep the list structure
# API endpoint configuration
'login_pushreport_url': login_pushreport_url,
'login_data': login_data,
'upload_url': upload_url,
'upload_data': upload_data,
'upload_warning_url': upload_warning_url,
'warning_data': warning_data,
# Query endpoints
'query_data_list_item_nos_url': query_data_list_item_nos_url,
'query_data_list_item_nos_data': query_data_list_item_nos_data,
# Data item upload
'push_data_value_list_url': push_data_value_list_url,
'push_data_value_list_data': push_data_value_list_data,
'push_png_report_url': push_png_report_url,
'push_png_report_data': push_png_report_data,
# ETA configuration
'APPID': APPID,
'SECRET': SECRET,
'etadata': data,
'edbcodelist': edbcodelist,
'ClassifyId': ClassifyId,
'edbcodedataurl': edbcodedataurl,
'classifyidlisturl': classifyidlisturl,
'edbdatapushurl': edbdatapushurl,
'edbdeleteurl': edbdeleteurl,
'edbbusinessurl': edbbusinessurl,
'classifylisturl': classifylisturl,
# Database configuration
'sqlitedb': sqlitedb,
'is_bdwd': is_bdwd,
'db_mysql': db_mysql,
'DEFAULT_CONFIG': DEFAULT_CONFIG,
})
def send_png_report():
# Thin wrapper around push_png_report() from lib.dataread, which performs the HTTP upload;
# it must not reuse that helper's name, otherwise the call below would recurse into this function.
logger.info('Sending the image report to the DingTalk group')
current_end_time = global_config['end_time']
previous_trading_day = (pd.Timestamp(current_end_time) -
pd.tseries.offsets.BusinessDay(1)).strftime('%Y-%m-%d')
png_report_files = ['pp_zhouducorrelation.png', 'pp_yueducorrelation.png']
# Only the weekly correlation chart is uploaded for now
with open(os.path.join(global_config['dataset'], png_report_files[0]), 'rb') as f:
base64_data = base64.b64encode(f.read()).decode('utf-8')
# config.upload_data["data"]["fileBase64"] = base64_data  # not needed here; the image push uses push_png_report_data
data = global_config['push_png_report_data']
data['data']['fileBase64'] = base64_data
pngreportdata = push_png_report(data)
print(pngreportdata)
def push_market_value():
logger.info('Pushing forecast results to the market information platform')
current_end_time = global_config['end_time']
previous_trading_day = (pd.Timestamp(current_end_time) -
pd.tseries.offsets.BusinessDay(1)).strftime('%Y-%m-%d')
# Load the forecast data and model evaluation results
best_bdwd_price = find_best_models(
date=previous_trading_day, global_config=global_config)
# Get the predicted prices of the best models for this month
four_month_predict_price = pd.read_csv(
os.path.join(global_config['dataset'], 'predict.csv'))
four_month_predict_price['ds'] = pd.to_datetime(
four_month_predict_price['ds'])
# Label the four forecast rows: next month, month+2, month+3, month+4
index_labels = ["次月", "次二月", "次三月", "次四月"]
four_month_predict_price.index = index_labels
global_config['logger'].info(f"best_bdwd_price: {best_bdwd_price}")
# 准备要推送的数据
ciyue_mean = four_month_predict_price[best_bdwd_price['next_month_price']
['model_name']].iloc[0]
cieryue_mean = four_month_predict_price[best_bdwd_price['next_february_price']
['model_name']].iloc[1]
cisanyue_mean = four_month_predict_price[best_bdwd_price['next_march_price']
['model_name']].iloc[2]
cisiyue_mean = four_month_predict_price[best_bdwd_price['next_april_price']
['model_name']].iloc[3]
# Round to two decimal places
ciyue_mean = round(ciyue_mean, 2)
cieryue_mean = round(cieryue_mean, 2)
cisanyue_mean = round(cisanyue_mean, 2)
cisiyue_mean = round(cisiyue_mean, 2)
predictdata = [
{
"dataItemNo": global_config['bdwd_items']['ciyue'],
"dataDate": global_config['end_time'].replace('-', ''),
"dataStatus": "add",
"dataValue": ciyue_mean
},
{
"dataItemNo": global_config['bdwd_items']['cieryue'],
"dataDate": global_config['end_time'].replace('-', ''),
"dataStatus": "add",
"dataValue": cieryue_mean
},
{
"dataItemNo": global_config['bdwd_items']['cisanyue'],
"dataDate": global_config['end_time'].replace('-', ''),
"dataStatus": "add",
"dataValue": cisanyue_mean
},
{
"dataItemNo": global_config['bdwd_items']['cisiyue'],
"dataDate": global_config['end_time'].replace('-', ''),
"dataStatus": "add",
"dataValue": cisieryue_mean
}
]
print(predictdata)
# Push the data to the market information platform
try:
push_market_data(predictdata)
except Exception as e:
logger.error(f"推送数据失败: {e}")
def sql_inset_predict(global_config):
df = pd.read_csv(os.path.join(config.dataset, 'predict.csv'))
df['created_dt'] = pd.to_datetime(df['created_dt'])
df['ds'] = pd.to_datetime(df['ds'])
# Forecast for the next month
next_month_price_df = df[df['ds'] == df['ds'].min()]
# Forecast for month +2
next_february_price_df = df.iloc[[1]]
# Forecast for month +3
next_march_price_df = df.iloc[[2]]
# Forecast for month +4
next_april_price_df = df[df['ds'] == df['ds'].max()]
wd = ['next_month_price', 'next_february_price',
'next_march_price', 'next_april_price']
model_name_list, model_id_name_dict = get_modelsname(df, global_config)
PRICE_COLUMNS = [
'day_price', 'week_price', 'second_week_price', 'next_week_price',
'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price'
]
params_list = []
for df, price_type in zip([next_month_price_df, next_february_price_df, next_march_price_df, next_april_price_df], wd):
update_columns = [
"feature_factor_frequency = VALUES(feature_factor_frequency)",
"oil_code = VALUES(oil_code)",
"oil_name = VALUES(oil_name)",
"data_date = VALUES(data_date)",
"market_price = VALUES(market_price)",
f"{price_type} = VALUES({price_type})",
"model_evaluation_id = VALUES(model_evaluation_id)",
"tenant_code = VALUES(tenant_code)",
"version_num = VALUES(version_num)",
"delete_flag = VALUES(delete_flag)",
"update_user = VALUES(update_user)",
"update_date = VALUES(update_date)"
]
insert_query = f"""
INSERT INTO v_tbl_predict_pp_prediction_results (
feature_factor_frequency, strategy_id, oil_code, oil_name, data_date,
market_price, day_price, week_price, second_week_price, next_week_price,
next_month_price, next_february_price, next_march_price, next_april_price,
model_evaluation_id, model_id, tenant_code, version_num, delete_flag,
create_user, create_date, update_user, update_date
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
ON DUPLICATE KEY UPDATE
{', '.join(update_columns)}
"""
next_day_df = df[['ds', 'created_dt'] + model_name_list]
pydantic_results = convert_df_to_pydantic_pp(
next_day_df, model_id_name_dict, global_config)
if pydantic_results:
for result in pydantic_results:
price_values = [None] * len(PRICE_COLUMNS)
price_index = PRICE_COLUMNS.index(price_type)
price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0]
params = (
result.feature_factor_frequency,
result.strategy_id,
result.oil_code,
result.oil_name,
next_day_df['created_dt'].values[0],
result.market_price,
*price_values,
result.model_evaluation_id,
result.model_id,
result.tenant_code,
1,
'0',
result.create_user,
result.create_date,
result.update_user,
result.update_date
)
params_list.append(params)
affected_rows = config.db_mysql.execute_batch_insert(
insert_query, params_list)
config.logger.info(f"成功插入或更新 {affected_rows} 条记录")
config.db_mysql.close()
def predict_main():
"""
主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测
参数:
signature (BinanceAPI): Binance API 实例
etadata (EtaReader): ETA 数据读取器实例
is_eta (bool): 是否从 ETA 获取数据
data_set (str): 数据集名称
dataset (str): 数据集路径
add_kdj (bool): 是否添加 KDJ 指标
is_timefurture (bool): 是否添加时间衍生特征
end_time (str): 结束时间
is_edbnamelist (bool): 是否使用 EDB 名称列表
edbnamelist (list): EDB 名称列表
y (str): 预测目标列名
sqlitedb (SQLiteDB): SQLite 数据库实例
is_corr (bool): 是否进行相关性分析
horizon (int): 预测时域
input_size (int): 输入数据大小
train_steps (int): 训练步数
val_check_steps (int): 验证检查步数
early_stop_patience_steps (int): 早停耐心步数
is_debug (bool): 是否调试模式
dataset (str): 数据集名称
is_train (bool): 是否训练模型
is_fivemodels (bool): 是否使用五个模型
val_size (float): 验证集大小
test_size (float): 测试集大小
settings (dict): 模型设置
now (str): 当前时间
etadata (EtaReader): ETA 数据读取器实例
modelsindex (list): 模型索引列表
data (str): 数据类型
is_eta (bool): 是否从 ETA 获取数据
返回:
None
"""
# end_time = global_config['end_time']
# signature = BinanceAPI(APPID, SECRET)
# etadata = EtaReader(signature=signature,
# classifylisturl=global_config['classifylisturl'],
# classifyidlisturl=global_config['classifyidlisturl'],
# edbcodedataurl=global_config['edbcodedataurl'],
# edbcodelist=global_config['edbcodelist'],
# edbdatapushurl=global_config['edbdatapushurl'],
# edbdeleteurl=global_config['edbdeleteurl'],
# edbbusinessurl=global_config['edbbusinessurl'],
# classifyId=global_config['ClassifyId'],
# )
# # 获取数据
# if is_eta:
# logger.info('从eta获取数据...')
# df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_pp_data(
# data_set=data_set, dataset=dataset) # 原始数据,未处理
# if is_market:
# logger.info('从市场信息平台获取数据...')
# try:
# # 如果是测试环境最高价最低价取excel文档
# if server_host == '192.168.100.53':
# logger.info('从excel文档获取最高价最低价')
# df_zhibiaoshuju = get_high_low_data(df_zhibiaoshuju)
# else:
# logger.info('从市场信息平台获取数据')
# df_zhibiaoshuju = get_market_data(
# end_time, df_zhibiaoshuju)
# except:
# logger.info('最高最低价拼接失败')
# # 保存到xlsx文件的sheet表
# with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
# df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
# df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)
# # 数据处理
# df = datachuli_juxiting(df_zhibiaoshuju, df_zhibiaoliebiao, y=global_config['y'], dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture,
# end_time=end_time)
# else:
# # 读取数据
# logger.info('读取本地数据:' + os.path.join(dataset, data_set))
# df, df_zhibiaoliebiao = getdata_zhoudu_juxiting(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj,
# is_timefurture=is_timefurture, end_time=end_time) # 原始数据,未处理
# # 更改预测列名称
# df.rename(columns={y: 'y'}, inplace=True)
# if is_edbnamelist:
# df = df[edbnamelist]
# df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False)
# # 保存最新日期的y值到数据库
# # 取第一行数据存储到数据库中
# first_row = df[['ds', 'y']].tail(1)
# # 判断y的类型是否为float
# if not isinstance(first_row['y'].values[0], float):
# logger.info(f'{end_time}预测目标数据为空,跳过')
# return None
# # 将最新真实值保存到数据库
# if not sqlitedb.check_table_exists('trueandpredict'):
# first_row.to_sql('trueandpredict', sqlitedb.connection, index=False)
# else:
# for row in first_row.itertuples(index=False):
# row_dict = row._asdict()
# config.logger.info(f'要保存的真实值:{row_dict}')
# # 判断ds是否为字符串类型,如果不是则转换为字符串类型
# if isinstance(row_dict['ds'], (pd.Timestamp, datetime.datetime)):
# row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d')
# elif not isinstance(row_dict['ds'], str):
# try:
# row_dict['ds'] = pd.to_datetime(
# row_dict['ds']).strftime('%Y-%m-%d')
# except:
# logger.warning(f"无法解析的时间格式: {row_dict['ds']}")
# # row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d')
# # row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S')
# check_query = sqlitedb.select_data(
# 'trueandpredict', where_condition=f"ds = '{row.ds}'")
# if len(check_query) > 0:
# set_clause = ", ".join(
# [f"{key} = '{value}'" for key, value in row_dict.items()])
# sqlitedb.update_data(
# 'trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'")
# continue
# sqlitedb.insert_data('trueandpredict', tuple(
# row_dict.values()), columns=row_dict.keys())
# # 更新accuracy表的y值
# if not sqlitedb.check_table_exists('accuracy'):
# pass
# else:
# update_y = sqlitedb.select_data(
# 'accuracy', where_condition="y is null")
# if len(update_y) > 0:
# logger.info('更新accuracy表的y值')
# # 找到update_y 中ds且df中的y的行
# update_y = update_y[update_y['ds'] <= end_time]
# logger.info(f'要更新y的信息{update_y}')
# # try:
# for row in update_y.itertuples(index=False):
# try:
# row_dict = row._asdict()
# yy = df[df['ds'] == row_dict['ds']]['y'].values[0]
# LOW = df[df['ds'] == row_dict['ds']]['Brentzdj'].values[0]
# HIGH = df[df['ds'] == row_dict['ds']]['Brentzgj'].values[0]
# sqlitedb.update_data(
# 'accuracy', f"y = {yy},LOW_PRICE = {LOW},HIGH_PRICE = {HIGH}", where_condition=f"ds = '{row_dict['ds']}'")
# except:
# logger.info(f'更新accuracy表的y值失败{row_dict}')
# # except Exception as e:
# # logger.info(f'更新accuracy表的y值失败{e}')
# # 判断当前日期是不是周一
# is_weekday = datetime.datetime.now().weekday() == 0
# if is_weekday:
# logger.info('今天是周一,更新预测模型')
# # 计算最近60天预测残差最低的模型名称
# model_results = sqlitedb.select_data(
# 'trueandpredict', order_by="ds DESC", limit="60")
# # 删除空值率为90%以上的列
# if len(model_results) > 10:
# model_results = model_results.dropna(
# thresh=len(model_results)*0.1, axis=1)
# # 删除空行
# model_results = model_results.dropna()
# modelnames = model_results.columns.to_list()[2:-1]
# for col in model_results[modelnames].select_dtypes(include=['object']).columns:
# model_results[col] = model_results[col].astype(np.float32)
# # 计算每个预测值与真实值之间的偏差率
# for model in modelnames:
# model_results[f'{model}_abs_error_rate'] = abs(
# model_results['y'] - model_results[model]) / model_results['y']
# # 获取每行对应的最小偏差率值
# min_abs_error_rate_values = model_results.apply(
# lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1)
# # 获取每行对应的最小偏差率值对应的列名
# min_abs_error_rate_column_name = model_results.apply(
# lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1)
# # 将列名索引转换为列名
# min_abs_error_rate_column_name = min_abs_error_rate_column_name.map(
# lambda x: x.split('_')[0])
# # 取出现次数最多的模型名称
# most_common_model = min_abs_error_rate_column_name.value_counts().idxmax()
# logger.info(f"最近60天预测残差最低的模型名称{most_common_model}")
# # 保存结果到数据库
# if not sqlitedb.check_table_exists('most_model'):
# sqlitedb.create_table(
# 'most_model', columns="ds datetime, most_common_model TEXT")
# sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime(
# '%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
# if is_corr:
# df = corr_feature(df=df)
# df1 = df.copy() # 备份一下后面特征筛选完之后加入ds y 列用
# logger.info(f"开始训练模型...")
# row, col = df.shape
# now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# ex_Model(df,
# horizon=global_config['horizon'],
# input_size=global_config['input_size'],
# train_steps=global_config['train_steps'],
# val_check_steps=global_config['val_check_steps'],
# early_stop_patience_steps=global_config['early_stop_patience_steps'],
# is_debug=global_config['is_debug'],
# dataset=global_config['dataset'],
# is_train=global_config['is_train'],
# is_fivemodels=global_config['is_fivemodels'],
# val_size=global_config['val_size'],
# test_size=global_config['test_size'],
# settings=global_config['settings'],
# now=now,
# etadata=etadata,
# modelsindex=global_config['modelsindex'],
# data=data,
# is_eta=global_config['is_eta'],
# end_time=global_config['end_time'],
# )
# logger.info('模型训练完成')
# logger.info('训练数据绘图ing')
# model_results3 = model_losss_juxiting(
# sqlitedb, end_time=global_config['end_time'], is_fivemodels=global_config['is_fivemodels'])
# logger.info('训练数据绘图end')
push_market_value()
# sql_inset_predict(global_config)
# # 模型报告
# logger.info('制作报告ing')
# title = f'{settings}--{end_time}-预测报告' # 报告标题
# reportname = f'聚烯烃PP大模型月度预测--{end_time}.pdf' # 报告文件名
# reportname = reportname.replace(':', '-') # 替换冒号
# pp_export_pdf(dataset=dataset, num_models=5 if is_fivemodels else 22, time=end_time,
# reportname=reportname, sqlitedb=sqlitedb),
# logger.info('制作报告end')
# Image report
logger.info('Generating the image report...')
pp_bdwd_png(global_config=global_config)
logger.info('Image report done')
# # LSTM 单变量模型
# ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset)
# # lstm 多变量模型
# ex_Lstm_M(df,n_days=input_size,out_days=horizon,is_debug=is_debug,datasetpath=dataset)
# # GRU 模型
# # ex_GRU(df)
# 发送邮件
# m = SendMail(
# username=username,
# passwd=passwd,
# recv=recv,
# title=title,
# content=content,
# file=max(glob.glob(os.path.join(dataset,'*.pdf')), key=os.path.getctime),
# ssl=ssl,
# )
# m.send_mail()
if __name__ == '__main__':
# global end_time
# 遍历2024-11-25 到 2024-12-3 之间的工作日日期
# for i_time in pd.date_range('2025-7-28', '2025-7-29', freq='B'):
# try:
# global_config['end_time'] = i_time.strftime('%Y-%m-%d')
# global_config['db_mysql'].connect()
# predict_main()
# except Exception as e:
# logger.info(f'预测失败:{e}')
# continue
# predict_main()
# push_market_value()
send_png_report()
# 图片报告
# global_config['end_time'] = '2025-07-31'
# logger.info('图片报告ing')
# pp_bdwd_png(global_config=global_config)
# logger.info('图片报告end')

View File

@@ -1381,6 +1381,14 @@ class Config:
def get_waring_data_value_list_data(
self): return global_config['get_waring_data_value_list_data']
@property
def push_png_report_url(
self): return global_config['push_png_report_url']
@property
def push_png_report_data(
self): return global_config['push_png_report_data']
@property
def bdwd_items(self): return global_config['bdwd_items']
@@ -2384,6 +2392,24 @@ def get_market_data(end_time, df):
return df
def push_png_report(data):
'''
Upload the PNG forecast-report image to the market information platform.
data: report payload; see push_png_report_data in the config for an example
'''
# Get the auth token
token = get_head_auth_report()
# Send the request
headers = {"Authorization": token}
config.logger.info('Pushing the image report...')
items_res = requests.post(url=config.push_png_report_url, headers=headers,
json=data, timeout=(3, 35))
json_data = json.loads(items_res.text)
config.logger.info(f"Image report push result: {json_data}")
return json_data
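A short caller-side usage sketch (assuming `from lib.dataread import *`, as the new juxiting_push_png_report.py script does; the chart file name and the config properties are the ones added in this commit):

with open(os.path.join(config.dataset, 'pp_zhouducorrelation.png'), 'rb') as f:
    payload = config.push_png_report_data
    payload['data']['fileBase64'] = base64.b64encode(f.read()).decode('utf-8')
result = push_png_report(payload)  # parsed JSON response from the platform
config.logger.info(result)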
def push_market_data(data):
'''
Upload predicted prices to the market information platform

View File

@@ -477,7 +477,7 @@ def predict_main():
# sqlitedb, end_time=global_config['end_time'], is_fivemodels=global_config['is_fivemodels'])
# logger.info('训练数据绘图end')
# push_market_value()
push_market_value()
# sql_inset_predict(global_config)