From d574ea7895ef5552ea25670f0be9e676fc555ebf Mon Sep 17 00:00:00 2001
From: workpc
Date: Tue, 5 Aug 2025 17:38:53 +0800
Subject: [PATCH] Debug image report sending (图片报告发送调试)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 juxiting_push_png_report.py | 479 +-----------------------------------
 lib/dataread.py             |   4 +-
 2 files changed, 16 insertions(+), 467 deletions(-)

diff --git a/juxiting_push_png_report.py b/juxiting_push_png_report.py
index 601a30d..ae63a5d 100644
--- a/juxiting_push_png_report.py
+++ b/juxiting_push_png_report.py
@@ -1,12 +1,7 @@
 # 读取配置
-
 from lib.dataread import *
 from config_juxiting_yuedu import *
-from lib.tools import SendMail, convert_df_to_pydantic_pp, exception_logger, find_best_models, get_modelsname
-from models.nerulforcastmodels import ex_Model, model_losss_juxiting, pp_bdwd_png, pp_export_pdf
 import datetime
-import torch
-torch.set_float32_matmul_precision("high")
 
 global_config.update({
     # 核心参数
@@ -94,474 +89,26 @@ global_config.update({
 
 def push_png_report():
-    logger.info('发送图片到钉钉工作组')
     current_end_time = global_config['end_time']
     previous_trading_day = (pd.Timestamp(current_end_time) -
                             pd.tseries.offsets.BusinessDay(1)).strftime('%Y-%m-%d')
     png_report_files = ['pp_zhouducorrelation.png', 'pp_yueducorrelation.png']
-    with open(os.path.join(global_config['dataset'], 'pp_zhouducorrelation.png'), 'rb') as f:
-        base64_data = base64.b64encode(f.read()).decode('utf-8')
-        config.upload_data["data"]["fileBase64"] = base64_data
-        data = global_config['push_png_report_data']
-        data['data']['fileBase64'] = base64_data
+    for png_report_file in png_report_files:
+        logger.info(f'发送图片{png_report_file}到钉钉工作组')
+        try:
+            with open(os.path.join(global_config['dataset'], png_report_file), 'rb') as f:
+                base64_data = base64.b64encode(f.read()).decode('utf-8')
+                config.upload_data["data"]["fileBase64"] = base64_data
+                data = global_config['push_png_report_data']
+                data['data']['fileBase64'] = base64_data
+                data['data']['billNo'] = str(time.time())
 
-        pngreportdata = push_png_report(data)
-
-    print(pngreportdata)
-
-
-def push_market_value():
-    logger.info('发送预测结果到市场信息平台')
-    current_end_time = global_config['end_time']
-    previous_trading_day = (pd.Timestamp(current_end_time) -
-                            pd.tseries.offsets.BusinessDay(1)).strftime('%Y-%m-%d')
-
-    # 读取预测数据和模型评估数据
-    best_bdwd_price = find_best_models(
-        date=previous_trading_day, global_config=global_config)
-
-    # 获取本月最佳模型的预测价格
-    four_month_predict_price = pd.read_csv(
-        os.path.join(global_config['dataset'], 'predict.csv'))
-    four_month_predict_price['ds'] = pd.to_datetime(
-        four_month_predict_price['ds'])
-    # 设置索引 次月 次二月 次三月 次四月
-    index_labels = ["次月", "次二月", "次三月", "次四月"]
-    four_month_predict_price.index = index_labels
-    global_config['logger'].info(f"best_bdwd_price: {best_bdwd_price}")
-
-    # 准备要推送的数据
-    ciyue_mean = four_month_predict_price[best_bdwd_price['next_month_price']
-                                          ['model_name']].iloc[0]
-    cieryue_mean = four_month_predict_price[best_bdwd_price['next_february_price']
-                                            ['model_name']].iloc[1]
-    cisanyue_mean = four_month_predict_price[best_bdwd_price['next_march_price']
-                                             ['model_name']].iloc[2]
-    cisieryue_mean = four_month_predict_price[best_bdwd_price['next_april_price']
-                                              ['model_name']].iloc[3]
-    # # 保留两位小数
-    ciyue_mean = round(ciyue_mean, 2)
-    cieryue_mean = round(cieryue_mean, 2)
-    cisanyue_mean = round(cisanyue_mean, 2)
-    cisieryue_mean = round(cisieryue_mean, 2)
-
-    predictdata = [
-        {
-            "dataItemNo":
global_config['bdwd_items']['ciyue'], - "dataDate": global_config['end_time'].replace('-', ''), - "dataStatus": "add", - "dataValue": ciyue_mean - }, - { - "dataItemNo": global_config['bdwd_items']['cieryue'], - "dataDate": global_config['end_time'].replace('-', ''), - "dataStatus": "add", - "dataValue": cieryue_mean - }, - { - "dataItemNo": global_config['bdwd_items']['cisanyue'], - "dataDate": global_config['end_time'].replace('-', ''), - "dataStatus": "add", - "dataValue": cisanyue_mean - }, - { - "dataItemNo": global_config['bdwd_items']['cisiyue'], - "dataDate": global_config['end_time'].replace('-', ''), - "dataStatus": "add", - "dataValue": cisieryue_mean - } - ] - - print(predictdata) - - # 推送数据到市场信息平台 - try: - push_market_data(predictdata) - except Exception as e: - logger.error(f"推送数据失败: {e}") - - -def sql_inset_predict(global_config): - df = pd.read_csv(os.path.join(config.dataset, 'predict.csv')) - df['created_dt'] = pd.to_datetime(df['created_dt']) - df['ds'] = pd.to_datetime(df['ds']) - # 获取次月预测结果 - next_month_price_df = df[df['ds'] == df['ds'].min()] - # 获取次二月预测结果 - next_february_price_df = df.iloc[[1]] - # 获取次三月预测结果 - next_march_price_df = df.iloc[[2]] - # 获取次四月预测结果 - next_april_price_df = df[df['ds'] == df['ds'].max()] - - wd = ['next_month_price', 'next_february_price', - 'next_march_price', 'next_april_price'] - model_name_list, model_id_name_dict = get_modelsname(df, global_config) - - PRICE_COLUMNS = [ - 'day_price', 'week_price', 'second_week_price', 'next_week_price', - 'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price' - ] - - params_list = [] - for df, price_type in zip([next_month_price_df, next_february_price_df, next_march_price_df, next_april_price_df], wd): - - update_columns = [ - "feature_factor_frequency = VALUES(feature_factor_frequency)", - "oil_code = VALUES(oil_code)", - "oil_name = VALUES(oil_name)", - "data_date = VALUES(data_date)", - "market_price = VALUES(market_price)", - f"{price_type} = VALUES({price_type})", - "model_evaluation_id = VALUES(model_evaluation_id)", - "tenant_code = VALUES(tenant_code)", - "version_num = VALUES(version_num)", - "delete_flag = VALUES(delete_flag)", - "update_user = VALUES(update_user)", - "update_date = VALUES(update_date)" - ] - - insert_query = f""" - INSERT INTO v_tbl_predict_pp_prediction_results ( - feature_factor_frequency, strategy_id, oil_code, oil_name, data_date, - market_price, day_price, week_price, second_week_price, next_week_price, - next_month_price, next_february_price, next_march_price, next_april_price, - model_evaluation_id, model_id, tenant_code, version_num, delete_flag, - create_user, create_date, update_user, update_date - ) VALUES ( - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s - ) - ON DUPLICATE KEY UPDATE - {', '.join(update_columns)} - """ - - next_day_df = df[['ds', 'created_dt'] + model_name_list] - - pydantic_results = convert_df_to_pydantic_pp( - next_day_df, model_id_name_dict, global_config) - if pydantic_results: - - for result in pydantic_results: - price_values = [None] * len(PRICE_COLUMNS) - price_index = PRICE_COLUMNS.index(price_type) - price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0] - - params = ( - result.feature_factor_frequency, - result.strategy_id, - result.oil_code, - result.oil_name, - next_day_df['created_dt'].values[0], - result.market_price, - *price_values, - result.model_evaluation_id, - result.model_id, - result.tenant_code, - 1, - '0', - 
result.create_user, - result.create_date, - result.update_user, - result.update_date - ) - params_list.append(params) - affected_rows = config.db_mysql.execute_batch_insert( - insert_query, params_list) - config.logger.info(f"成功插入或更新 {affected_rows} 条记录") - config.db_mysql.close() - - -def predict_main(): - """ - 主预测函数,用于从 ETA 获取数据、处理数据、训练模型并进行预测。 - - 参数: - signature (BinanceAPI): Binance API 实例。 - etadata (EtaReader): ETA 数据读取器实例。 - is_eta (bool): 是否从 ETA 获取数据。 - data_set (str): 数据集名称。 - dataset (str): 数据集路径。 - add_kdj (bool): 是否添加 KDJ 指标。 - is_timefurture (bool): 是否添加时间衍生特征。 - end_time (str): 结束时间。 - is_edbnamelist (bool): 是否使用 EDB 名称列表。 - edbnamelist (list): EDB 名称列表。 - y (str): 预测目标列名。 - sqlitedb (SQLiteDB): SQLite 数据库实例。 - is_corr (bool): 是否进行相关性分析。 - horizon (int): 预测时域。 - input_size (int): 输入数据大小。 - train_steps (int): 训练步数。 - val_check_steps (int): 验证检查步数。 - early_stop_patience_steps (int): 早停耐心步数。 - is_debug (bool): 是否调试模式。 - dataset (str): 数据集名称。 - is_train (bool): 是否训练模型。 - is_fivemodels (bool): 是否使用五个模型。 - val_size (float): 验证集大小。 - test_size (float): 测试集大小。 - settings (dict): 模型设置。 - now (str): 当前时间。 - etadata (EtaReader): ETA 数据读取器实例。 - modelsindex (list): 模型索引列表。 - data (str): 数据类型。 - is_eta (bool): 是否从 ETA 获取数据。 - - 返回: - None - """ - # end_time = global_config['end_time'] - # signature = BinanceAPI(APPID, SECRET) - # etadata = EtaReader(signature=signature, - # classifylisturl=global_config['classifylisturl'], - # classifyidlisturl=global_config['classifyidlisturl'], - # edbcodedataurl=global_config['edbcodedataurl'], - # edbcodelist=global_config['edbcodelist'], - # edbdatapushurl=global_config['edbdatapushurl'], - # edbdeleteurl=global_config['edbdeleteurl'], - # edbbusinessurl=global_config['edbbusinessurl'], - # classifyId=global_config['ClassifyId'], - # ) - # # 获取数据 - # if is_eta: - # logger.info('从eta获取数据...') - - # df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_pp_data( - # data_set=data_set, dataset=dataset) # 原始数据,未处理 - - # if is_market: - # logger.info('从市场信息平台获取数据...') - # try: - # # 如果是测试环境,最高价最低价取excel文档 - # if server_host == '192.168.100.53': - # logger.info('从excel文档获取最高价最低价') - # df_zhibiaoshuju = get_high_low_data(df_zhibiaoshuju) - # else: - # logger.info('从市场信息平台获取数据') - # df_zhibiaoshuju = get_market_data( - # end_time, df_zhibiaoshuju) - - # except: - # logger.info('最高最低价拼接失败') - - # # 保存到xlsx文件的sheet表 - # with pd.ExcelWriter(os.path.join(dataset, data_set)) as file: - # df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False) - # df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False) - - # # 数据处理 - # df = datachuli_juxiting(df_zhibiaoshuju, df_zhibiaoliebiao, y=global_config['y'], dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture, - # end_time=end_time) - - # else: - # # 读取数据 - # logger.info('读取本地数据:' + os.path.join(dataset, data_set)) - # df, df_zhibiaoliebiao = getdata_zhoudu_juxiting(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj, - # is_timefurture=is_timefurture, end_time=end_time) # 原始数据,未处理 - - # # 更改预测列名称 - # df.rename(columns={y: 'y'}, inplace=True) - - # if is_edbnamelist: - # df = df[edbnamelist] - # df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False) - # # 保存最新日期的y值到数据库 - # # 取第一行数据存储到数据库中 - # first_row = df[['ds', 'y']].tail(1) - # # 判断y的类型是否为float - # if not isinstance(first_row['y'].values[0], float): - # logger.info(f'{end_time}预测目标数据为空,跳过') - # return None - - # # 将最新真实值保存到数据库 - # if not sqlitedb.check_table_exists('trueandpredict'): - # 
first_row.to_sql('trueandpredict', sqlitedb.connection, index=False) - # else: - # for row in first_row.itertuples(index=False): - # row_dict = row._asdict() - # config.logger.info(f'要保存的真实值:{row_dict}') - # # 判断ds是否为字符串类型,如果不是则转换为字符串类型 - # if isinstance(row_dict['ds'], (pd.Timestamp, datetime.datetime)): - # row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d') - # elif not isinstance(row_dict['ds'], str): - # try: - # row_dict['ds'] = pd.to_datetime( - # row_dict['ds']).strftime('%Y-%m-%d') - # except: - # logger.warning(f"无法解析的时间格式: {row_dict['ds']}") - # # row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d') - # # row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S') - # check_query = sqlitedb.select_data( - # 'trueandpredict', where_condition=f"ds = '{row.ds}'") - # if len(check_query) > 0: - # set_clause = ", ".join( - # [f"{key} = '{value}'" for key, value in row_dict.items()]) - # sqlitedb.update_data( - # 'trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'") - # continue - # sqlitedb.insert_data('trueandpredict', tuple( - # row_dict.values()), columns=row_dict.keys()) - - # # 更新accuracy表的y值 - # if not sqlitedb.check_table_exists('accuracy'): - # pass - # else: - # update_y = sqlitedb.select_data( - # 'accuracy', where_condition="y is null") - # if len(update_y) > 0: - # logger.info('更新accuracy表的y值') - # # 找到update_y 中ds且df中的y的行 - # update_y = update_y[update_y['ds'] <= end_time] - # logger.info(f'要更新y的信息:{update_y}') - # # try: - # for row in update_y.itertuples(index=False): - # try: - # row_dict = row._asdict() - # yy = df[df['ds'] == row_dict['ds']]['y'].values[0] - # LOW = df[df['ds'] == row_dict['ds']]['Brentzdj'].values[0] - # HIGH = df[df['ds'] == row_dict['ds']]['Brentzgj'].values[0] - # sqlitedb.update_data( - # 'accuracy', f"y = {yy},LOW_PRICE = {LOW},HIGH_PRICE = {HIGH}", where_condition=f"ds = '{row_dict['ds']}'") - # except: - # logger.info(f'更新accuracy表的y值失败:{row_dict}') - # # except Exception as e: - # # logger.info(f'更新accuracy表的y值失败:{e}') - - # # 判断当前日期是不是周一 - # is_weekday = datetime.datetime.now().weekday() == 0 - # if is_weekday: - # logger.info('今天是周一,更新预测模型') - # # 计算最近60天预测残差最低的模型名称 - # model_results = sqlitedb.select_data( - # 'trueandpredict', order_by="ds DESC", limit="60") - # # 删除空值率为90%以上的列 - # if len(model_results) > 10: - # model_results = model_results.dropna( - # thresh=len(model_results)*0.1, axis=1) - # # 删除空行 - # model_results = model_results.dropna() - # modelnames = model_results.columns.to_list()[2:-1] - # for col in model_results[modelnames].select_dtypes(include=['object']).columns: - # model_results[col] = model_results[col].astype(np.float32) - # # 计算每个预测值与真实值之间的偏差率 - # for model in modelnames: - # model_results[f'{model}_abs_error_rate'] = abs( - # model_results['y'] - model_results[model]) / model_results['y'] - # # 获取每行对应的最小偏差率值 - # min_abs_error_rate_values = model_results.apply( - # lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1) - # # 获取每行对应的最小偏差率值对应的列名 - # min_abs_error_rate_column_name = model_results.apply( - # lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1) - # # 将列名索引转换为列名 - # min_abs_error_rate_column_name = min_abs_error_rate_column_name.map( - # lambda x: x.split('_')[0]) - # # 取出现次数最多的模型名称 - # most_common_model = min_abs_error_rate_column_name.value_counts().idxmax() - # logger.info(f"最近60天预测残差最低的模型名称:{most_common_model}") - # # 保存结果到数据库 - # if not sqlitedb.check_table_exists('most_model'): - # sqlitedb.create_table( - # 
'most_model', columns="ds datetime, most_common_model TEXT") - # sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime( - # '%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',)) - - # if is_corr: - # df = corr_feature(df=df) - - # df1 = df.copy() # 备份一下,后面特征筛选完之后加入ds y 列用 - # logger.info(f"开始训练模型...") - # row, col = df.shape - - # now = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - # ex_Model(df, - # horizon=global_config['horizon'], - # input_size=global_config['input_size'], - # train_steps=global_config['train_steps'], - # val_check_steps=global_config['val_check_steps'], - # early_stop_patience_steps=global_config['early_stop_patience_steps'], - # is_debug=global_config['is_debug'], - # dataset=global_config['dataset'], - # is_train=global_config['is_train'], - # is_fivemodels=global_config['is_fivemodels'], - # val_size=global_config['val_size'], - # test_size=global_config['test_size'], - # settings=global_config['settings'], - # now=now, - # etadata=etadata, - # modelsindex=global_config['modelsindex'], - # data=data, - # is_eta=global_config['is_eta'], - # end_time=global_config['end_time'], - # ) - - # logger.info('模型训练完成') - - # logger.info('训练数据绘图ing') - # model_results3 = model_losss_juxiting( - # sqlitedb, end_time=global_config['end_time'], is_fivemodels=global_config['is_fivemodels']) - # logger.info('训练数据绘图end') - - push_market_value() - - # sql_inset_predict(global_config) - - # # 模型报告 - # logger.info('制作报告ing') - # title = f'{settings}--{end_time}-预测报告' # 报告标题 - # reportname = f'聚烯烃PP大模型月度预测--{end_time}.pdf' # 报告文件名 - # reportname = reportname.replace(':', '-') # 替换冒号 - # pp_export_pdf(dataset=dataset, num_models=5 if is_fivemodels else 22, time=end_time, - # reportname=reportname, sqlitedb=sqlitedb), - - # logger.info('制作报告end') - - # 图片报告 - logger.info('图片报告ing') - pp_bdwd_png(global_config=global_config) - logger.info('图片报告end') - - # # LSTM 单变量模型 - # ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset) - - # # lstm 多变量模型 - # ex_Lstm_M(df,n_days=input_size,out_days=horizon,is_debug=is_debug,datasetpath=dataset) - - # # GRU 模型 - # # ex_GRU(df) - - # 发送邮件 - # m = SendMail( - # username=username, - # passwd=passwd, - # recv=recv, - # title=title, - # content=content, - # file=max(glob.glob(os.path.join(dataset,'*.pdf')), key=os.path.getctime), - # ssl=ssl, - # ) - # m.send_mail() + pngreportdata = push_png_report_to_market(data) + logger.info(f'{png_report_file}推送图片报告到钉钉成功{pngreportdata}') + except Exception as e: + logger.error(f'{png_report_file}推送图片报告到钉钉失败:{e}') if __name__ == '__main__': - # global end_time - # 遍历2024-11-25 到 2024-12-3 之间的工作日日期 - # for i_time in pd.date_range('2025-7-28', '2025-7-29', freq='B'): - # try: - # global_config['end_time'] = i_time.strftime('%Y-%m-%d') - # global_config['db_mysql'].connect() - # predict_main() - # except Exception as e: - # logger.info(f'预测失败:{e}') - # continue - - # predict_main() - # push_market_value() push_png_report() - - # 图片报告 - # global_config['end_time'] = '2025-07-31' - # logger.info('图片报告ing') - # pp_bdwd_png(global_config=global_config) - # logger.info('图片报告end') diff --git a/lib/dataread.py b/lib/dataread.py index 74b70d2..7659733 100644 --- a/lib/dataread.py +++ b/lib/dataread.py @@ -2392,7 +2392,7 @@ def get_market_data(end_time, df): return df -def push_png_report(data): +def push_png_report_to_market(data): ''' 上传预测价格到市场信息平台 data: 预测价格数据,示例: @@ -2403,6 +2403,8 @@ def push_png_report(data): # 发送请求 headers = 
{"Authorization": token} config.logger.info('推送图片报告中...') + config.logger.info(f'推送图片报告URL:{config.push_png_report_url}') + # config.logger.info(f'推送图片报告数据:{data}') items_res = requests.post(url=config.push_png_report_url, headers=headers, json=data, timeout=(3, 35)) json_data = json.loads(items_res.text)