From 9647067f6564f748fb38d4518bac03bed95e6eeb Mon Sep 17 00:00:00 2001 From: workpc Date: Mon, 28 Jul 2025 18:08:52 +0800 Subject: [PATCH] =?UTF-8?q?=E8=81=9A=E7=83=AF=E7=83=83=E5=85=AB=E5=A4=A7?= =?UTF-8?q?=E7=BB=B4=E5=BA=A6=E7=BB=93=E6=9E=9C=E7=BB=98=E5=9B=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/tools.py | 412 +++++++++++++++++++++++++++-------------------- main_juxiting.py | 9 +- 2 files changed, 248 insertions(+), 173 deletions(-) diff --git a/lib/tools.py b/lib/tools.py index 72d0074..6f75219 100644 --- a/lib/tools.py +++ b/lib/tools.py @@ -1,3 +1,4 @@ +import datetime from decimal import Decimal from langchain_core.documents import Document from langchain_openai import ChatOpenAI @@ -838,228 +839,297 @@ def convert_df_to_pydantic_pp(df_predict, model_id_name_dict, global_config): return results -# 查找上一交易日各维度的最佳模型 def find_best_models(date='', global_config=None): - best_models = {} model_id_name_dict = get_model_id_name_dict(global_config=global_config) - import datetime - if date == '': + # 处理日期输入 + if not date: date = datetime.datetime.now().strftime('%Y-%m-%d') else: - date = datetime.datetime.strptime( - date, '%Y-%m-%d').strftime('%Y-%m-%d') + try: + date = datetime.datetime.strptime( + date, '%Y-%m-%d').strftime('%Y-%m-%d') + except ValueError: + global_config['logger'].error( + f"日期格式错误,期望格式为 '%Y-%m-%d',实际输入: {date}") + return best_models - # 获取真实价格的八个维度价格 - true_price = pd.read_csv(os.path.join( - global_config['dataset'], '指标数据.csv')) - true_price = true_price[['ds', 'y']] + current_date = datetime.datetime.strptime(date, '%Y-%m-%d') + # 计算date对应月的一日 + first_day_of_month = current_date.replace(day=1) + # 计算date对应周的周一 + date_monday = current_date - \ + datetime.timedelta(days=current_date.weekday()) - year = int(date.split('-')[0]) - month = int(date.split('-')[1]) - day = int(date.split('-')[2]) + # 获取真实价格数据 + try: + true_price = pd.read_csv(os.path.join( + global_config['dataset'], '指标数据.csv'))[['ds', 'y']] + except FileNotFoundError: + global_config['logger'].error( + f"未找到文件: {os.path.join(global_config['dataset'], '指标数据.csv')}") + return best_models # 计算六月前的年月 + year, month = map(int, date.split('-')[:2]) if month <= 6: - year = int(year) - 1 + year -= 1 month = 12 else: - month = month - 6 + month -= 6 tb = 'v_tbl_predict_pp_prediction_results' sql = f'select * from {tb} where data_date >= \'{year}-{month}-01\'' # 数据库查询对应日期的预测值 predictresult = global_config['db_mysql'].execute_query(sql) - if len(predictresult) == 0: - print('没有预测结果') - return - df = pd.DataFrame(predictresult) - df = df[['data_date', 'model_id']+global_config['price_columns']] - print('预测结果数量:', df.shape) - print('预测结果日期范围:', df['data_date'].min(), '到', df['data_date'].max()) + if not predictresult: + global_config['logger'].info('没有预测结果') + return best_models + + df = pd.DataFrame(predictresult)[ + ['data_date', 'model_id'] + global_config['price_columns']] + global_config['logger'].info(f'预测结果数量:{df.shape}') + global_config['logger'].info( + f'预测结果日期范围:{df["data_date"].min()} 到 {df["data_date"].max()}') + + def query_predict_result(date, model_id, global_config, wd): + tb = 'v_tbl_predict_pp_prediction_results' + sql = f'select {wd} from {tb} where data_date = \'{date}\' and model_id = {model_id}' + predictresult = global_config['db_mysql'].execute_query(sql) + if not predictresult: + global_config['logger'].info('没有预测结果') + return None + predictresult = float(predictresult[0][wd]) + return predictresult + + def calculate_best_model(price, trend, weektrueprice=None, monthtrueprice=None): + """ + 计算最佳模型的辅助函数 + :param price: 包含预测价格的数据框 + :param trend: 价格趋势 + :param weektrueprice: 周真实价格均值 + :param monthtrueprice: 月真实价格均值 + :return: 最佳模型的 ID 和名称 + """ + price = price.copy() # Explicitly create a copy of the DataFrame + price[global_config['price_columns'][i] + ] = price[global_config['price_columns'][i]].astype(float) + price = price.dropna(subset=[global_config['price_columns'][i]]) + if weektrueprice is not None: + true_price_value = weektrueprice + elif monthtrueprice is not None: + true_price_value = monthtrueprice + else: + true_price_value = true_price[true_price['ds'] + == date]['y'].values[0] + + if not price.empty: + price.loc[:, 'trueprice'] = true_price_value + price.loc[:, 'trend'] = np.where( + price['trueprice'] - price[global_config['price_columns'][i]] > 0, 1, -1) + price.loc[:, 'abs'] = (price['trueprice'] - + price[global_config['price_columns'][i]]).abs() + if trend is not None: + price = price[price['trend'] == trend] + if not price.empty: + price = price[price['abs'] == price['abs'].min()] + best_model_id = price.iloc[0]['model_id'] + best_model_name = model_id_name_dict[best_model_id] + return best_model_id, best_model_name + # Return None if the DataFrame is empty + return None, None # 遍历全局配置中的价格列 for i, wd in enumerate(global_config['price_columns']): - # 为每个价格列初始化一个空字典,用于存储最佳模型信息 + global_config['logger'].info( + f'*********************************************************************************************************计算预测{date}的{wd}最佳模型') best_models[wd] = {} - # 处理第一个价格列,计算次日的最佳模型 + if i == 0: - # 计算当前日期的前一天日期 - ciridate = (pd.Timestamp(date) - pd.Timedelta(days=1) - ).strftime('%Y-%m-%d') - # 记录日志,提示开始计算次日的最佳模型 + # 计算当前日期的前一工作日日期 + ciridate = (pd.Timestamp(date) - + pd.tseries.offsets.BusinessDay(1)).strftime('%Y-%m-%d') global_config['logger'].info(f'计算预测{date}的次日{ciridate}最佳模型') - # 记录日志,输出当前日期的真实价格 global_config['logger'].info( f'{date}真实价格:{true_price[true_price["ds"] == date]["y"].values[0]}') - # 从数据框中选取需要的列 price = df[['data_date', wd, 'model_id']] - # 筛选出数据日期在 ciridate 到 date 之间的数据 price = price[(price['data_date'] == ciridate) | (price['data_date'] == date)] - # 将价格列的数据类型转换为 float - price[wd] = price[wd].astype(float) - # 删除价格列中包含缺失值的行 - price = price.dropna(subset=[wd]) - # 判断价格趋势,若当前日期价格大于前一天价格,趋势为 1,否则为 -1 trend = 1 if true_price[true_price['ds'] == date]['y'].values[0] - \ true_price[true_price['ds'] == ciridate]['y'].values[0] > 0 else -1 - # 为数据框添加真实价格列 - price['trueprice'] = true_price[true_price['ds'] - == date]['y'].values[0] - # 根据预测价格与真实价格的差值判断趋势,大于 0 为 1,否则为 -1 - price['trend'] = np.where( - price['trueprice'] - price[wd] > 0, 1, -1) - # 计算预测价格与真实价格差值的绝对值 - price['abs'] = (price['trueprice'] - price[wd]).abs() - # 筛选出趋势与整体趋势一致的数据 - price = price[price['trend'] == - trend] - # 筛选出预测价格与真实价格差值绝对值最小的数据 - price = price[price['abs'] == price['abs'].min()] - # 记录日志,输出筛选后的价格数据 - global_config['logger'].info(price) - # 获取最佳模型的 ID - best_model_id = price.iloc[0]['model_id'] - # 记录日志,输出次日预测最准确的模型 ID - global_config['logger'].info(f'{ciridate}预测最准确的模型:{best_model_id}') - # 将最佳模型的 ID 存入字典 + best_model_id, best_model_name = calculate_best_model(price, trend) best_models[wd]['model_id'] = best_model_id - # 根据模型 ID 获取模型名称并存入字典 - best_models[wd]['model_name'] = model_id_name_dict[best_model_id] - # 记录日志,输出次日预测最准确的模型名称 - global_config['logger'].info(f'{ciridate}预测最准确的模型名称:{best_models}') + best_models[wd]['model_name'] = best_model_name + global_config['logger'].info(f'{ciridate}预测最准确的模型:{best_model_id}') + global_config['logger'].info( + f'{ciridate}预测最准确的模型名称:{best_models[wd]}') + predictresult = query_predict_result( + date, best_model_id, global_config, wd) + if predictresult: + global_config['logger'].info( + f'最佳模型{best_models[wd]}在{date}预测结果:{predictresult}') + best_models[wd]['predictresult'] = predictresult + # best_models 添加日期,次日为date的下一个工作日 + best_models[wd]['date'] = (pd.Timestamp(date) + + pd.tseries.offsets.BusinessDay(1)).strftime('%Y-%m-%d') - if i == 1: + elif i == 1: # 计算五个工作日之前的日期 - benzhoudate = (pd.Timestamp(date) - pd.Timedelta(days=7) - ).strftime('%Y-%m-%d') - - # 记录日志,提示开始计算五天前的最佳模型 + benzhoudate = (pd.Timestamp(date) - + pd.Timedelta(days=7)).strftime('%Y-%m-%d') global_config['logger'].info(f'计算预测{date}的五天前{benzhoudate}最佳模型') - # 记录日志,输出当前日期的真实价格 global_config['logger'].info( f'{date}真实价格:{true_price[true_price["ds"] == date]["y"].values[0]}') - # 从数据框中选取需要的列 price = df[['data_date', wd, 'model_id']] - # 筛选出数据日期在 benzhoudate 到 date 之间的数据 price = price[(price['data_date'] == benzhoudate) | (price['data_date'] == date)] - # 将价格列的数据类型转换为 float - price[wd] = price[wd].astype(float) - # 删除价格列中包含缺失值的行 - price = price.dropna(subset=[wd]) - # 判断价格趋势,若当前日期价格大于前一天价格,趋势为 1,否则为 -1 trend = 1 if true_price[true_price['ds'] == date]['y'].values[0] - \ true_price[true_price['ds'] == benzhoudate]['y'].values[0] > 0 else -1 - # 记录日志,输出五天前预测最准确的模型名称 - global_config['logger'].info(f'实际趋势是:{trend}') - # 为数据框添加真实价格列 - price['trueprice'] = true_price[true_price['ds'] - == date]['y'].values[0] - # 根据预测价格与真实价格的差值判断趋势,大于 0 为 1,否则为 -1 - price['trend'] = np.where( - price['trueprice'] - price[wd] > 0, 1, -1) - # 计算预测价格与真实价格差值的绝对值 - price['abs'] = (price['trueprice'] - price[wd]).abs() - # 筛选出趋势与整体趋势一致的数据 - price = price[price['trend'] == - trend] - # 筛选出预测价格与真实价格差值绝对值最小的数据 - price = price[price['abs'] == price['abs'].min()] - # 记录日志,输出筛选后的价格数据 - global_config['logger'].info(price) - # 获取最佳模型的 ID - best_model_id = price.iloc[0]['model_id'] - # 记录日志,输出五天前预测最准确的模型 ID - global_config['logger'].info( - f'{benzhoudate}预测最准确的模型:{best_model_id}') - # 将最佳模型的 ID 存入字典 + best_model_id, best_model_name = calculate_best_model(price, trend) best_models[wd]['model_id'] = best_model_id - # 根据模型 ID 获取模型名称并存入字典 - best_models[wd]['model_name'] = model_id_name_dict[best_model_id] - # 记录日志,输出五天前预测最准确的模型名称 + best_models[wd]['model_name'] = best_model_name global_config['logger'].info( - f'{benzhoudate}预测最准确的模型名称:{best_models}') + f'{benzhoudate}预测最准确的模型名称:{best_models[wd]}') + predictresult = query_predict_result( + date, best_model_id, global_config, wd) + if predictresult: + global_config['logger'].info( + f'最佳模型{best_models[wd]}在{date}预测结果:{predictresult}') + best_models[wd]['predictresult'] = predictresult + else: + best_models[wd]['predictresult'] = None + best_models[wd]['date'] = (pd.Timestamp(date) + + pd.tseries.offsets.BusinessDay(5)).strftime('%Y-%m-%d') - if i == 2: - # 计算当前周的前两周的周一和周日的日期 - current_date = datetime.datetime.strptime(date, '%Y-%m-%d') - # 计算前两一周周一 - one_weeks_ago_monday = current_date - \ - datetime.timedelta(days=current_date.weekday() + 7) - # 计算前一周周日 - one_weeks_ago_sunday = one_weeks_ago_monday + \ - datetime.timedelta(days=6) - cizhoudate = f"{one_weeks_ago_monday.strftime('%Y-%m-%d')} - {one_weeks_ago_sunday.strftime('%Y-%m-%d')}" - print(f'计算预测{date}次周最佳模型,前一周日期区间: {cizhoudate}') - if i == 3: - # 计算当前周的前两周的周一和周日的日期 - current_date = datetime.datetime.strptime(date, '%Y-%m-%d') - # 计算前两周周一 - two_weeks_ago_monday = current_date - \ - datetime.timedelta(days=current_date.weekday() + 14) - # 计算前两周周日 - two_weeks_ago_sunday = two_weeks_ago_monday + \ - datetime.timedelta(days=6) - gezhoudate = f"{two_weeks_ago_monday.strftime('%Y-%m-%d')} - {two_weeks_ago_sunday.strftime('%Y-%m-%d')}" - print(f'计算预测{date}隔周最佳模型,前两周日期区间: {gezhoudate}') - if i == 4: - # 计算当上月的1日及最后一日 - current_date = pd.Timestamp(date) - # 获取上月第一天 + elif i in [2, 3]: + weeks_ago = 1 if i == 2 else 2 + ago_monday = current_date - \ + datetime.timedelta(days=current_date.weekday() + 7 * weeks_ago) + ago_sunday = ago_monday + datetime.timedelta(days=6) + ago_date_str = f"{ago_monday.strftime('%Y-%m-%d')} - {ago_sunday.strftime('%Y-%m-%d')}" + global_config['logger'].info( + f'计算预测{date}的前{weeks_ago}周{ago_date_str}最佳模型') + weektrueprice = true_price[(true_price['ds'] >= date_monday.strftime( + '%Y-%m-%d')) & (true_price['ds'] <= date)]['y'].mean() + global_config['logger'].info( + f'当周{date_monday.strftime("%Y-%m-%d")}---{date}真实价格的周均价:{weektrueprice}') + price = df[['data_date', wd, 'model_id']] + price = price[(price['data_date'] >= ago_monday) & + (price['data_date'] <= ago_sunday)] + price = price.groupby('model_id')[wd].mean().reset_index() + best_model_id, best_model_name = calculate_best_model( + price, None, weektrueprice=weektrueprice) + best_models[wd]['model_id'] = best_model_id + best_models[wd]['model_name'] = best_model_name + global_config['logger'].info( + f'{ago_date_str}预测最准确的模型名称:{best_models[wd]}') + predictresult = query_predict_result( + date, best_model_id, global_config, wd) + if predictresult: + global_config['logger'].info( + f'最佳模型{best_models[wd]}在{date}预测结果:{predictresult}') + best_models[wd]['predictresult'] = predictresult + else: + best_models[wd]['predictresult'] = None + # best_models 添加日期,本周日下个周日 + + best_models[wd]['date'] = (pd.Timestamp(ago_sunday) + + pd.tseries.offsets.Week(weeks_ago*2)).strftime('%Y-%m-%d') + + elif i in [4, 5, 6, 7]: + months_ago = i - 3 + current_date_ts = pd.Timestamp(date) last_month_first_day = ( - current_date - pd.offsets.MonthBegin(2)).strftime('%Y-%m-%d') - # 获取上月最后一天 + current_date_ts - pd.offsets.MonthBegin(months_ago)).strftime('%Y-%m-%d') last_month_last_day = (pd.Timestamp( last_month_first_day) + pd.offsets.MonthEnd(0)).strftime('%Y-%m-%d') - print( - f'计算预测{date}次月最佳模型,上月日期区间: {last_month_first_day} - {last_month_last_day}') - if i == 5: - # 计算两月前的1日及最后一日 - current_date = pd.Timestamp(date) - # 获取上上月第一天 - last_month_first_day = ( - current_date - pd.offsets.MonthBegin(3)).strftime('%Y-%m-%d') - # 获取上上月最后一天 - last_month_last_day = (pd.Timestamp( - last_month_first_day) + pd.offsets.MonthEnd(0)).strftime('%Y-%m-%d') - print( - f'计算预测{date}次二月最佳模型,两月前日期区间: {last_month_first_day} - {last_month_last_day}') - if i == 6: - # 计算三月前的1日及最后一日 - current_date = pd.Timestamp(date) - # 获取前三月第一天 - last_month_first_day = ( - current_date - pd.offsets.MonthBegin(4)).strftime('%Y-%m-%d') - # 获取前三月最后一天 - last_month_last_day = (pd.Timestamp( - last_month_first_day) + pd.offsets.MonthEnd(0)).strftime('%Y-%m-%d') - print( - f'计算预测{date}次三月最佳模型,三月前日期区间: {last_month_first_day} - {last_month_last_day}') - if i == 7: - # 计算四月前的1日及最后一日 - current_date = pd.Timestamp(date) - # 获取前四月第一天 - last_month_first_day = ( - current_date - pd.offsets.MonthBegin(5)).strftime('%Y-%m-%d') - # 获取前四月最后一天 - last_month_last_day = (pd.Timestamp( - last_month_first_day) + pd.offsets.MonthEnd(0)).strftime('%Y-%m-%d') - print( - f'计算预测{date}次四月最佳模型,四月前日期区间: {last_month_first_day} - {last_month_last_day}') + global_config['logger'].info( + f'计算预测{date}的{months_ago}月前{last_month_first_day}-{last_month_last_day}最佳模型') + monthtrueprice = true_price[(true_price['ds'] >= first_day_of_month.strftime( + '%Y-%m-%d')) & (true_price['ds'] <= date)]['y'].mean() + global_config['logger'].info( + f'当月{first_day_of_month.strftime("%Y-%m-%d")}-{date}真实价格的月均价:{monthtrueprice}') + price = df[['data_date', wd, 'model_id']] + price = price[(price['data_date'] >= last_month_first_day) & ( + price['data_date'] <= last_month_last_day)] + price = price.groupby('model_id')[wd].mean().reset_index() + best_model_id, best_model_name = calculate_best_model( + price, None, monthtrueprice=monthtrueprice) + best_models[wd]['model_id'] = best_model_id + best_models[wd]['model_name'] = best_model_name + global_config['logger'].info( + f'{last_month_first_day}-{last_month_last_day}预测最准确的模型名称:{best_models[wd]}') + predictresult = query_predict_result( + date, best_model_id, global_config, wd) + if predictresult: + global_config['logger'].info( + f'最佳模型{best_models[wd]}在{date}预测结果:{predictresult}') + best_models[wd]['predictresult'] = predictresult + else: + best_models[wd]['predictresult'] = None + best_models[wd]['date'] = (pd.Timestamp(date) + + pd.tseries.offsets.MonthEnd(months_ago+1)).strftime('%Y-%m-%d') - # # 获取真实价格的八个维度价格 - # true_price = pd.read_csv(os.path.join( - # global_config['dataset'], '指标数据.csv')) - # true_price = true_price[['ds', 'y']] - # print(true_price.head()) + return best_models - # # 根据当前日期date,计算对应八个维度的价格 - # bdwd_price = get_bdwd_price(date, true_price) - return predictresult +def plot_pp_predict_result(y_hat, global_config): + """ + 绘制PP期货预测结果的图表 + """ + import matplotlib.pyplot as plt + import seaborn as sns + + # 获取y的真实值 + y = pd.read_csv(os.path.join( + global_config['dataset'], '指标数据.csv'))[['ds', 'y']] + y['ds'] = pd.to_datetime(y['ds']) + y = y[y['ds'] < y_hat['ds'].iloc[0]][-30:] + + # 创建图表和子图布局,为表格预留空间 + fig, ax = plt.subplots(figsize=(16, 9)) + + # 对日期列进行排序,确保日期大的在右边 + y_hat = y_hat.sort_values(by='ds') + y = y.sort_values(by='ds') + + # 绘制 y_hat 的折线图,颜色为橙色 + sns.lineplot(x=y_hat['ds'], y=y_hat['predictresult'], + color='orange', label='y_hat', ax=ax, linestyle='--') + # 绘制 y 的折线图,颜色为蓝色 + sns.lineplot(x=y['ds'], y=y['y'], color='blue', label='y', ax=ax) + + # date_str = pd.Timestamp(y_hat["ds"].iloc[0]).strftime('%Y-%m-%d') + ax.set_title(f'{global_config["end_time"]} PP期货八大维度 预测价格走势') + ax.set_xlabel('日期') + ax.set_ylabel('预测结果') + ax.tick_params(axis='x', rotation=45) + + # 准备表格数据 + y_hat = y_hat[['predictresult']].T + print(y_hat) + y_hat.rename(columns={'day_price': '次日', 'week_price': '本周', + 'second_week_price': '次周', 'next_week_price': '隔周', + 'next_month_price': '次月', 'next_february_price': '次二月', + 'next_march_price': '次三月', 'next_april_price': '次四月', + }, inplace=True) + columns = y_hat.columns.tolist() + data = y_hat.values.tolist() + + # 将日期转换为字符串格式 + for row in data: + if isinstance(row[0], pd.Timestamp): + row[0] = row[0].strftime('%Y-%m-%d') + + # 在图表下方添加表格 + table = ax.table(cellText=data, colLabels=columns, + loc='bottom', bbox=[0, -0.6, 1, 0.2]) + table.auto_set_font_size(False) + table.set_fontsize(14) + + plt.tight_layout(rect=[0, 0.1, 1, 1]) # 调整布局,为表格留出空间 + plt.savefig('pp_predict_result.png') if __name__ == '__main__': diff --git a/main_juxiting.py b/main_juxiting.py index dd35d77..9894cb7 100644 --- a/main_juxiting.py +++ b/main_juxiting.py @@ -2,7 +2,7 @@ from lib.dataread import * from config_juxiting import * -from lib.tools import SendMail, exception_logger, convert_df_to_pydantic_pp, exception_logger, get_modelsname +from lib.tools import SendMail, exception_logger, convert_df_to_pydantic_pp, exception_logger, get_modelsname, plot_pp_predict_result from models.nerulforcastmodels import ex_Model_Juxiting, model_losss_juxiting, pp_export_pdf import datetime import torch @@ -559,4 +559,9 @@ if __name__ == '__main__': # push_market_value() # sql_inset_predict(global_config) from lib.tools import find_best_models - find_best_models(date='2025-07-18', global_config=global_config) + best_bdwd_price = find_best_models( + date='2025-07-22', global_config=global_config) + y_hat = pd.DataFrame(best_bdwd_price).T[['date', 'predictresult']] + y_hat['ds'] = pd.to_datetime(y_hat['date']) + # 绘制PP期货预测结果的图表 + plot_pp_predict_result(y_hat, global_config)