From 352d30dfa97257ec4433ee66ea07f9cb54f6c1de Mon Sep 17 00:00:00 2001 From: workpc Date: Thu, 26 Jun 2025 19:50:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=AB=E5=A4=A7=E7=BB=B4=E5=BA=A6=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=AD=98=E5=82=A8=E5=88=B0=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E6=94=B9=E8=A1=A8=E8=AE=BE=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/pydantic_models.py | 15 +++- lib/tools.py | 19 +++-- main_yuanyou.py | 93 +++++++++++++--------- main_yuanyou_yuedu.py | 171 +++++++++++++++++++++++++++++++---------- main_yuanyou_zhoudu.py | 101 ++++++++++++++---------- 5 files changed, 274 insertions(+), 125 deletions(-) diff --git a/lib/pydantic_models.py b/lib/pydantic_models.py index 17c56b1..4b1c9ea 100644 --- a/lib/pydantic_models.py +++ b/lib/pydantic_models.py @@ -7,11 +7,20 @@ from decimal import Decimal class PredictionResult(BaseModel): - prediction_date: datetime - bdwd: Optional[str] = None feature_factor_frequency: str strategy_id: int - predicted_price: Decimal + oil_code: Optional[str] = None + oil_name: Optional[str] = None + data_date: Optional[datetime] = None + market_price: Optional[Decimal] = None + day_price: Optional[Decimal] = None + week_price: Optional[Decimal] = None + second_week_price: Optional[Decimal] = None + next_week_price: Optional[Decimal] = None + next_month_price: Optional[Decimal] = None + next_february_price: Optional[Decimal] = None + next_march_price: Optional[Decimal] = None + next_april_price: Optional[Decimal] = None model_evaluation_id: int model_id: int tenant_code: Optional[str] = None diff --git a/lib/tools.py b/lib/tools.py index 2a41232..c2f7b9b 100644 --- a/lib/tools.py +++ b/lib/tools.py @@ -576,6 +576,8 @@ class MySQLDB: return 0 try: cursor = self.connection.cursor() + logging.info(f"Executing batch insert SQL: {query}") + logging.info(f"Batch insert parameters: {params_list}") cursor.executemany(query, params_list) self.connection.commit() affected_rows = cursor.rowcount @@ -680,7 +682,7 @@ def get_modelsname(df, global_config): tb = 'v_tbl_predict_models' sql = f'select model_name,id from {tb} ' modelsname = global_config['db_mysql'].execute_query(sql) - model_id_name_dict = {row['model_name']: row['id'] for row in modelsname} + model_id_name_dict = {row['id']: row['model_name'] for row in modelsname} model_name_list = [row['model_name'] for row in modelsname] model_name_list = set(columns) & set(model_name_list) model_name_list = list(model_name_list) @@ -688,18 +690,19 @@ def get_modelsname(df, global_config): return model_name_list, model_id_name_dict -def convert_df_to_pydantic(df_predict, model_id_name_dict, bdwd, global_config): +def convert_df_to_pydantic(df_predict, model_id_name_dict, global_config): + reverse_model_id_name_dict = { + value: key for key, value in model_id_name_dict.items()} results = [] data = global_config['DEFAULT_CONFIG'].copy() - data['prediction_date'] = df_predict['created_dt'].values[0] - if isinstance(data['prediction_date'], np.datetime64): - data['prediction_date'] = pd.Timestamp( - data['prediction_date']).to_pydatetime() + data['data_date'] = df_predict['created_dt'].values[0] + if isinstance(data['data_date'], np.datetime64): + data['data_date'] = pd.Timestamp( + data['data_date']).to_pydatetime() for c in df_predict.columns: if c not in ['ds', 'created_dt']: - data['model_id'] = model_id_name_dict[c] - data['bdwd'] = bdwd + data['model_id'] = reverse_model_id_name_dict[c] data['predicted_price'] = Decimal( round(df_predict[c].values[0], 2)) result = PredictionResult(**data) diff --git a/main_yuanyou.py b/main_yuanyou.py index 85ab106..1e56d43 100644 --- a/main_yuanyou.py +++ b/main_yuanyou.py @@ -161,58 +161,80 @@ def sql_inset_predict(global_config): # 获取本周预测结果 this_week_df = df[df['ds'] == df['ds'].max()] - wd = ['next_day', 'this_week'] + wd = ['day_price', 'week_price'] model_name_list, model_id_name_dict = get_modelsname(df, global_config) - for df, w in zip([next_day_df, this_week_df], wd): + PRICE_COLUMNS = [ + 'day_price', 'week_price', 'second_week_price', 'next_week_price', + 'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price' + ] + + params_list = [] + for df, price_type in zip([next_day_df, this_week_df], wd): + + update_columns = [ + "feature_factor_frequency = VALUES(feature_factor_frequency)", + "oil_code = VALUES(oil_code)", + "oil_name = VALUES(oil_name)", + "data_date = VALUES(data_date)", + "market_price = VALUES(market_price)", + f"{price_type} = VALUES({price_type})", + "model_evaluation_id = VALUES(model_evaluation_id)", + "tenant_code = VALUES(tenant_code)", + "version_num = VALUES(version_num)", + "delete_flag = VALUES(delete_flag)", + "update_user = VALUES(update_user)", + "update_date = VALUES(update_date)" + ] + + insert_query = f""" + INSERT INTO v_tbl_predict_prediction_results ( + feature_factor_frequency, strategy_id, oil_code, oil_name, data_date, + market_price, day_price, week_price, second_week_price, next_week_price, + next_month_price, next_february_price, next_march_price, next_april_price, + model_evaluation_id, model_id, tenant_code, version_num, delete_flag, + create_user, create_date, update_user, update_date + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + {', '.join(update_columns)} + """ + next_day_df = df[['ds', 'created_dt'] + model_name_list] + pydantic_results = convert_df_to_pydantic( - next_day_df, model_id_name_dict, w, global_config) + next_day_df, model_id_name_dict, global_config) if pydantic_results: - insert_query = """ - INSERT INTO v_tbl_predict_prediction_results ( - prediction_date, bdwd, - feature_factor_frequency, strategy_id, predicted_price, - model_evaluation_id, model_id, tenant_code, - version_num, delete_flag, create_user, create_date, update_user, update_date - ) VALUES ( - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s - ) - ON DUPLICATE KEY UPDATE - feature_factor_frequency = VALUES(feature_factor_frequency), - predicted_price = VALUES(predicted_price), - model_evaluation_id = VALUES(model_evaluation_id), - tenant_code = VALUES(tenant_code), - version_num = VALUES(version_num), - delete_flag = VALUES(delete_flag), - create_user = VALUES(create_user), - create_date = VALUES(create_date), - update_user = VALUES(update_user), - update_date = VALUES(update_date) - """ - params_list = [] + for result in pydantic_results: + price_values = [None] * len(PRICE_COLUMNS) + price_index = PRICE_COLUMNS.index(price_type) + price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0] + params = ( - result.prediction_date, - result.bdwd, result.feature_factor_frequency, result.strategy_id, - result.predicted_price, + result.oil_code, + result.oil_name, + next_day_df['created_dt'].values[0], + result.market_price, + *price_values, result.model_evaluation_id, result.model_id, result.tenant_code, - result.version_num, - result.delete_flag, + 1, + '0', result.create_user, result.create_date, result.update_user, result.update_date ) params_list.append(params) - affected_rows = config.db_mysql.execute_batch_insert( - insert_query, params_list) - print(f"成功插入 {affected_rows} 条记录") - config.db_mysql.close() + affected_rows = config.db_mysql.execute_batch_insert( + insert_query, params_list) + config.logger.info(f"成功插入或更新 {affected_rows} 条记录") + config.db_mysql.close() def predict_main(): @@ -563,5 +585,6 @@ if __name__ == '__main__': # end_time = i_time.strftime('%Y-%m-%d') # predict_main() - predict_main() + # predict_main() # push_market_value() + sql_inset_predict(global_config=global_config) diff --git a/main_yuanyou_yuedu.py b/main_yuanyou_yuedu.py index a25f0c8..5150f7e 100644 --- a/main_yuanyou_yuedu.py +++ b/main_yuanyou_yuedu.py @@ -163,68 +163,158 @@ def sql_inset_predict(global_config): df['created_dt'] = pd.to_datetime(df['created_dt']) df['ds'] = pd.to_datetime(df['ds']) # 获取次月预测结果 - next_month_df = df[df['ds'] == df['ds'].min()] + next_month_price_df = df[df['ds'] == df['ds'].min()] # 获取次二月预测结果 - next_two_months_df = df.iloc[[1]] + next_february_price_df = df.iloc[[1]] # 获取次三月预测结果 - next_three_months_df = df.iloc[[2]] + next_march_price_df = df.iloc[[2]] # 获取次四月预测结果 - next_four_months_df = df[df['ds'] == df['ds'].max()] + next_april_price_df = df[df['ds'] == df['ds'].max()] - wd = ['next_month', 'next_two_months', - 'next_three_months', 'next_four_months'] + wd = ['next_month_price', 'next_february_price', + 'next_march_price', 'next_april_price'] model_name_list, model_id_name_dict = get_modelsname(df, global_config) - for df, w in zip([next_month_df, next_two_months_df, next_three_months_df, next_four_months_df], wd): + PRICE_COLUMNS = [ + 'day_price', 'week_price', 'second_week_price', 'next_week_price', + 'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price' + ] + + params_list = [] + for df, price_type in zip([next_month_price_df, next_february_price_df, next_march_price_df, next_april_price_df], wd): + + update_columns = [ + "feature_factor_frequency = VALUES(feature_factor_frequency)", + "oil_code = VALUES(oil_code)", + "oil_name = VALUES(oil_name)", + "data_date = VALUES(data_date)", + "market_price = VALUES(market_price)", + f"{price_type} = VALUES({price_type})", + "model_evaluation_id = VALUES(model_evaluation_id)", + "tenant_code = VALUES(tenant_code)", + "version_num = VALUES(version_num)", + "delete_flag = VALUES(delete_flag)", + "update_user = VALUES(update_user)", + "update_date = VALUES(update_date)" + ] + + insert_query = f""" + INSERT INTO v_tbl_predict_prediction_results ( + feature_factor_frequency, strategy_id, oil_code, oil_name, data_date, + market_price, day_price, week_price, second_week_price, next_week_price, + next_month_price, next_february_price, next_march_price, next_april_price, + model_evaluation_id, model_id, tenant_code, version_num, delete_flag, + create_user, create_date, update_user, update_date + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + {', '.join(update_columns)} + """ next_day_df = df[['ds', 'created_dt'] + model_name_list] + pydantic_results = convert_df_to_pydantic( - next_day_df, model_id_name_dict, w, global_config) + next_day_df, model_id_name_dict, global_config) if pydantic_results: - insert_query = """ - INSERT INTO v_tbl_predict_prediction_results ( - prediction_date, bdwd, - feature_factor_frequency, strategy_id, predicted_price, - model_evaluation_id, model_id, tenant_code, - version_num, delete_flag, create_user, create_date, update_user, update_date - ) VALUES ( - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s - ) - ON DUPLICATE KEY UPDATE - feature_factor_frequency = VALUES(feature_factor_frequency), - predicted_price = VALUES(predicted_price), - model_evaluation_id = VALUES(model_evaluation_id), - tenant_code = VALUES(tenant_code), - version_num = VALUES(version_num), - delete_flag = VALUES(delete_flag), - create_user = VALUES(create_user), - create_date = VALUES(create_date), - update_user = VALUES(update_user), - update_date = VALUES(update_date) - """ - params_list = [] + for result in pydantic_results: + price_values = [None] * len(PRICE_COLUMNS) + price_index = PRICE_COLUMNS.index(price_type) + price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0] + params = ( - result.prediction_date, - result.bdwd, result.feature_factor_frequency, result.strategy_id, - result.predicted_price, + result.oil_code, + result.oil_name, + next_day_df['created_dt'].values[0], + result.market_price, + *price_values, result.model_evaluation_id, result.model_id, result.tenant_code, - result.version_num, - result.delete_flag, + 1, + '0', result.create_user, result.create_date, result.update_user, result.update_date ) params_list.append(params) - affected_rows = config.db_mysql.execute_batch_insert( - insert_query, params_list) - print(f"成功插入 {affected_rows} 条记录") - config.db_mysql.close() + affected_rows = config.db_mysql.execute_batch_insert( + insert_query, params_list) + config.logger.info(f"成功插入或更新 {affected_rows} 条记录") + config.db_mysql.close() + + +# def sql_inset_predict(global_config): +# df = pd.read_csv(os.path.join(config.dataset, 'predict.csv')) +# df['created_dt'] = pd.to_datetime(df['created_dt']) +# df['ds'] = pd.to_datetime(df['ds']) +# # 获取次月预测结果 +# next_month_df = df[df['ds'] == df['ds'].min()] +# # 获取次二月预测结果 +# next_two_months_df = df.iloc[[1]] +# # 获取次三月预测结果 +# next_three_months_df = df.iloc[[2]] +# # 获取次四月预测结果 +# next_four_months_df = df[df['ds'] == df['ds'].max()] + +# wd = ['next_month', 'next_two_months', +# 'next_three_months', 'next_four_months'] +# model_name_list, model_id_name_dict = get_modelsname(df, global_config) + +# for df, w in zip([next_month_df, next_two_months_df, next_three_months_df, next_four_months_df], wd): + +# next_day_df = df[['ds', 'created_dt'] + model_name_list] +# pydantic_results = convert_df_to_pydantic( +# next_day_df, model_id_name_dict, w, global_config) +# if pydantic_results: +# insert_query = """ +# INSERT INTO v_tbl_predict_prediction_results ( +# prediction_date, bdwd, +# feature_factor_frequency, strategy_id, predicted_price, +# model_evaluation_id, model_id, tenant_code, +# version_num, delete_flag, create_user, create_date, update_user, update_date +# ) VALUES ( +# %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s +# ) +# ON DUPLICATE KEY UPDATE +# feature_factor_frequency = VALUES(feature_factor_frequency), +# predicted_price = VALUES(predicted_price), +# model_evaluation_id = VALUES(model_evaluation_id), +# tenant_code = VALUES(tenant_code), +# version_num = VALUES(version_num), +# delete_flag = VALUES(delete_flag), +# create_user = VALUES(create_user), +# create_date = VALUES(create_date), +# update_user = VALUES(update_user), +# update_date = VALUES(update_date) +# """ +# params_list = [] +# for result in pydantic_results: +# params = ( +# result.prediction_date, +# result.bdwd, +# result.feature_factor_frequency, +# result.strategy_id, +# result.predicted_price, +# result.model_evaluation_id, +# result.model_id, +# result.tenant_code, +# result.version_num, +# result.delete_flag, +# result.create_user, +# result.create_date, +# result.update_user, +# result.update_date +# ) +# params_list.append(params) +# affected_rows = config.db_mysql.execute_batch_insert( +# insert_query, params_list) +# print(f"成功插入 {affected_rows} 条记录") +# config.db_mysql.close() def predict_main(): @@ -508,4 +598,5 @@ if __name__ == '__main__': # logger.info(f'预测失败:{e}') # continue - predict_main() + # predict_main() + sql_inset_predict(global_config=global_config) diff --git a/main_yuanyou_zhoudu.py b/main_yuanyou_zhoudu.py index 449d187..543d7b9 100644 --- a/main_yuanyou_zhoudu.py +++ b/main_yuanyou_zhoudu.py @@ -145,63 +145,85 @@ def sql_inset_predict(global_config): df = pd.read_csv(os.path.join(config.dataset, 'predict.csv')) df['created_dt'] = pd.to_datetime(df['created_dt']) df['ds'] = pd.to_datetime(df['ds']) - # 获取次日预测结果 - next_week_df = df[df['ds'] == df['ds'].min()] - # 获取本周预测结果 - next_two_weeks_df = df[df['ds'] == df['ds'].max()] + # 获取次周预测结果 + second_week_price_df = df[df['ds'] == df['ds'].min()] + # 获取隔周周预测结果 + next_week_price_df = df[df['ds'] == df['ds'].max()] - wd = ['next_week', 'next_two_weeks'] + wd = ['second_week_price', 'next_week_price'] model_name_list, model_id_name_dict = get_modelsname(df, global_config) - for df, w in zip([next_week_df, next_two_weeks_df], wd): + PRICE_COLUMNS = [ + 'day_price', 'week_price', 'second_week_price', 'next_week_price', + 'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price' + ] + + params_list = [] + for df, price_type in zip([second_week_price_df, next_week_price_df], wd): + + update_columns = [ + "feature_factor_frequency = VALUES(feature_factor_frequency)", + "oil_code = VALUES(oil_code)", + "oil_name = VALUES(oil_name)", + "data_date = VALUES(data_date)", + "market_price = VALUES(market_price)", + f"{price_type} = VALUES({price_type})", + "model_evaluation_id = VALUES(model_evaluation_id)", + "tenant_code = VALUES(tenant_code)", + "version_num = VALUES(version_num)", + "delete_flag = VALUES(delete_flag)", + "update_user = VALUES(update_user)", + "update_date = VALUES(update_date)" + ] + + insert_query = f""" + INSERT INTO v_tbl_predict_prediction_results ( + feature_factor_frequency, strategy_id, oil_code, oil_name, data_date, + market_price, day_price, week_price, second_week_price, next_week_price, + next_month_price, next_february_price, next_march_price, next_april_price, + model_evaluation_id, model_id, tenant_code, version_num, delete_flag, + create_user, create_date, update_user, update_date + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + {', '.join(update_columns)} + """ + next_day_df = df[['ds', 'created_dt'] + model_name_list] + pydantic_results = convert_df_to_pydantic( - next_day_df, model_id_name_dict, w, global_config) + next_day_df, model_id_name_dict, global_config) if pydantic_results: - insert_query = """ - INSERT INTO v_tbl_predict_prediction_results ( - prediction_date, bdwd, - feature_factor_frequency, strategy_id, predicted_price, - model_evaluation_id, model_id, tenant_code, - version_num, delete_flag, create_user, create_date, update_user, update_date - ) VALUES ( - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s - ) - ON DUPLICATE KEY UPDATE - feature_factor_frequency = VALUES(feature_factor_frequency), - predicted_price = VALUES(predicted_price), - model_evaluation_id = VALUES(model_evaluation_id), - tenant_code = VALUES(tenant_code), - version_num = VALUES(version_num), - delete_flag = VALUES(delete_flag), - create_user = VALUES(create_user), - create_date = VALUES(create_date), - update_user = VALUES(update_user), - update_date = VALUES(update_date) - """ - params_list = [] + for result in pydantic_results: + price_values = [None] * len(PRICE_COLUMNS) + price_index = PRICE_COLUMNS.index(price_type) + price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0] + params = ( - result.prediction_date, - result.bdwd, result.feature_factor_frequency, result.strategy_id, - result.predicted_price, + result.oil_code, + result.oil_name, + next_day_df['created_dt'].values[0], + result.market_price, + *price_values, result.model_evaluation_id, result.model_id, result.tenant_code, - result.version_num, - result.delete_flag, + 1, + '0', result.create_user, result.create_date, result.update_user, result.update_date ) params_list.append(params) - affected_rows = config.db_mysql.execute_batch_insert( - insert_query, params_list) - print(f"成功插入 {affected_rows} 条记录") - config.db_mysql.close() + affected_rows = config.db_mysql.execute_batch_insert( + insert_query, params_list) + config.logger.info(f"成功插入或更新 {affected_rows} 条记录") + config.db_mysql.close() def predict_main(): @@ -478,4 +500,5 @@ if __name__ == '__main__': # logger.info(f'预测失败:{e}') # continue - predict_main() + # predict_main() + sql_inset_predict(global_config=global_config)