八大维度数据存储到数据库,修改表设计

This commit is contained in:
workpc 2025-06-26 19:50:19 +08:00
parent b2c958a4fd
commit 352d30dfa9
5 changed files with 274 additions and 125 deletions

View File

@ -7,11 +7,20 @@ from decimal import Decimal
class PredictionResult(BaseModel): class PredictionResult(BaseModel):
prediction_date: datetime
bdwd: Optional[str] = None
feature_factor_frequency: str feature_factor_frequency: str
strategy_id: int strategy_id: int
predicted_price: Decimal oil_code: Optional[str] = None
oil_name: Optional[str] = None
data_date: Optional[datetime] = None
market_price: Optional[Decimal] = None
day_price: Optional[Decimal] = None
week_price: Optional[Decimal] = None
second_week_price: Optional[Decimal] = None
next_week_price: Optional[Decimal] = None
next_month_price: Optional[Decimal] = None
next_february_price: Optional[Decimal] = None
next_march_price: Optional[Decimal] = None
next_april_price: Optional[Decimal] = None
model_evaluation_id: int model_evaluation_id: int
model_id: int model_id: int
tenant_code: Optional[str] = None tenant_code: Optional[str] = None

View File

@ -576,6 +576,8 @@ class MySQLDB:
return 0 return 0
try: try:
cursor = self.connection.cursor() cursor = self.connection.cursor()
logging.info(f"Executing batch insert SQL: {query}")
logging.info(f"Batch insert parameters: {params_list}")
cursor.executemany(query, params_list) cursor.executemany(query, params_list)
self.connection.commit() self.connection.commit()
affected_rows = cursor.rowcount affected_rows = cursor.rowcount
@ -680,7 +682,7 @@ def get_modelsname(df, global_config):
tb = 'v_tbl_predict_models' tb = 'v_tbl_predict_models'
sql = f'select model_name,id from {tb} ' sql = f'select model_name,id from {tb} '
modelsname = global_config['db_mysql'].execute_query(sql) modelsname = global_config['db_mysql'].execute_query(sql)
model_id_name_dict = {row['model_name']: row['id'] for row in modelsname} model_id_name_dict = {row['id']: row['model_name'] for row in modelsname}
model_name_list = [row['model_name'] for row in modelsname] model_name_list = [row['model_name'] for row in modelsname]
model_name_list = set(columns) & set(model_name_list) model_name_list = set(columns) & set(model_name_list)
model_name_list = list(model_name_list) model_name_list = list(model_name_list)
@ -688,18 +690,19 @@ def get_modelsname(df, global_config):
return model_name_list, model_id_name_dict return model_name_list, model_id_name_dict
def convert_df_to_pydantic(df_predict, model_id_name_dict, bdwd, global_config): def convert_df_to_pydantic(df_predict, model_id_name_dict, global_config):
reverse_model_id_name_dict = {
value: key for key, value in model_id_name_dict.items()}
results = [] results = []
data = global_config['DEFAULT_CONFIG'].copy() data = global_config['DEFAULT_CONFIG'].copy()
data['prediction_date'] = df_predict['created_dt'].values[0] data['data_date'] = df_predict['created_dt'].values[0]
if isinstance(data['prediction_date'], np.datetime64): if isinstance(data['data_date'], np.datetime64):
data['prediction_date'] = pd.Timestamp( data['data_date'] = pd.Timestamp(
data['prediction_date']).to_pydatetime() data['data_date']).to_pydatetime()
for c in df_predict.columns: for c in df_predict.columns:
if c not in ['ds', 'created_dt']: if c not in ['ds', 'created_dt']:
data['model_id'] = model_id_name_dict[c] data['model_id'] = reverse_model_id_name_dict[c]
data['bdwd'] = bdwd
data['predicted_price'] = Decimal( data['predicted_price'] = Decimal(
round(df_predict[c].values[0], 2)) round(df_predict[c].values[0], 2))
result = PredictionResult(**data) result = PredictionResult(**data)

View File

@ -161,48 +161,70 @@ def sql_inset_predict(global_config):
# 获取本周预测结果 # 获取本周预测结果
this_week_df = df[df['ds'] == df['ds'].max()] this_week_df = df[df['ds'] == df['ds'].max()]
wd = ['next_day', 'this_week'] wd = ['day_price', 'week_price']
model_name_list, model_id_name_dict = get_modelsname(df, global_config) model_name_list, model_id_name_dict = get_modelsname(df, global_config)
for df, w in zip([next_day_df, this_week_df], wd): PRICE_COLUMNS = [
next_day_df = df[['ds', 'created_dt'] + model_name_list] 'day_price', 'week_price', 'second_week_price', 'next_week_price',
pydantic_results = convert_df_to_pydantic( 'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price'
next_day_df, model_id_name_dict, w, global_config) ]
if pydantic_results:
insert_query = """ params_list = []
for df, price_type in zip([next_day_df, this_week_df], wd):
update_columns = [
"feature_factor_frequency = VALUES(feature_factor_frequency)",
"oil_code = VALUES(oil_code)",
"oil_name = VALUES(oil_name)",
"data_date = VALUES(data_date)",
"market_price = VALUES(market_price)",
f"{price_type} = VALUES({price_type})",
"model_evaluation_id = VALUES(model_evaluation_id)",
"tenant_code = VALUES(tenant_code)",
"version_num = VALUES(version_num)",
"delete_flag = VALUES(delete_flag)",
"update_user = VALUES(update_user)",
"update_date = VALUES(update_date)"
]
insert_query = f"""
INSERT INTO v_tbl_predict_prediction_results ( INSERT INTO v_tbl_predict_prediction_results (
prediction_date, bdwd, feature_factor_frequency, strategy_id, oil_code, oil_name, data_date,
feature_factor_frequency, strategy_id, predicted_price, market_price, day_price, week_price, second_week_price, next_week_price,
model_evaluation_id, model_id, tenant_code, next_month_price, next_february_price, next_march_price, next_april_price,
version_num, delete_flag, create_user, create_date, update_user, update_date model_evaluation_id, model_id, tenant_code, version_num, delete_flag,
create_user, create_date, update_user, update_date
) VALUES ( ) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
) )
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
feature_factor_frequency = VALUES(feature_factor_frequency), {', '.join(update_columns)}
predicted_price = VALUES(predicted_price),
model_evaluation_id = VALUES(model_evaluation_id),
tenant_code = VALUES(tenant_code),
version_num = VALUES(version_num),
delete_flag = VALUES(delete_flag),
create_user = VALUES(create_user),
create_date = VALUES(create_date),
update_user = VALUES(update_user),
update_date = VALUES(update_date)
""" """
params_list = []
next_day_df = df[['ds', 'created_dt'] + model_name_list]
pydantic_results = convert_df_to_pydantic(
next_day_df, model_id_name_dict, global_config)
if pydantic_results:
for result in pydantic_results: for result in pydantic_results:
price_values = [None] * len(PRICE_COLUMNS)
price_index = PRICE_COLUMNS.index(price_type)
price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0]
params = ( params = (
result.prediction_date,
result.bdwd,
result.feature_factor_frequency, result.feature_factor_frequency,
result.strategy_id, result.strategy_id,
result.predicted_price, result.oil_code,
result.oil_name,
next_day_df['created_dt'].values[0],
result.market_price,
*price_values,
result.model_evaluation_id, result.model_evaluation_id,
result.model_id, result.model_id,
result.tenant_code, result.tenant_code,
result.version_num, 1,
result.delete_flag, '0',
result.create_user, result.create_user,
result.create_date, result.create_date,
result.update_user, result.update_user,
@ -211,7 +233,7 @@ def sql_inset_predict(global_config):
params_list.append(params) params_list.append(params)
affected_rows = config.db_mysql.execute_batch_insert( affected_rows = config.db_mysql.execute_batch_insert(
insert_query, params_list) insert_query, params_list)
print(f"成功插入 {affected_rows} 条记录") config.logger.info(f"成功插入或更新 {affected_rows} 条记录")
config.db_mysql.close() config.db_mysql.close()
@ -563,5 +585,6 @@ if __name__ == '__main__':
# end_time = i_time.strftime('%Y-%m-%d') # end_time = i_time.strftime('%Y-%m-%d')
# predict_main() # predict_main()
predict_main() # predict_main()
# push_market_value() # push_market_value()
sql_inset_predict(global_config=global_config)

View File

@ -163,58 +163,79 @@ def sql_inset_predict(global_config):
df['created_dt'] = pd.to_datetime(df['created_dt']) df['created_dt'] = pd.to_datetime(df['created_dt'])
df['ds'] = pd.to_datetime(df['ds']) df['ds'] = pd.to_datetime(df['ds'])
# 获取次月预测结果 # 获取次月预测结果
next_month_df = df[df['ds'] == df['ds'].min()] next_month_price_df = df[df['ds'] == df['ds'].min()]
# 获取次二月预测结果 # 获取次二月预测结果
next_two_months_df = df.iloc[[1]] next_february_price_df = df.iloc[[1]]
# 获取次三月预测结果 # 获取次三月预测结果
next_three_months_df = df.iloc[[2]] next_march_price_df = df.iloc[[2]]
# 获取次四月预测结果 # 获取次四月预测结果
next_four_months_df = df[df['ds'] == df['ds'].max()] next_april_price_df = df[df['ds'] == df['ds'].max()]
wd = ['next_month', 'next_two_months', wd = ['next_month_price', 'next_february_price',
'next_three_months', 'next_four_months'] 'next_march_price', 'next_april_price']
model_name_list, model_id_name_dict = get_modelsname(df, global_config) model_name_list, model_id_name_dict = get_modelsname(df, global_config)
for df, w in zip([next_month_df, next_two_months_df, next_three_months_df, next_four_months_df], wd): PRICE_COLUMNS = [
'day_price', 'week_price', 'second_week_price', 'next_week_price',
'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price'
]
next_day_df = df[['ds', 'created_dt'] + model_name_list] params_list = []
pydantic_results = convert_df_to_pydantic( for df, price_type in zip([next_month_price_df, next_february_price_df, next_march_price_df, next_april_price_df], wd):
next_day_df, model_id_name_dict, w, global_config)
if pydantic_results: update_columns = [
insert_query = """ "feature_factor_frequency = VALUES(feature_factor_frequency)",
"oil_code = VALUES(oil_code)",
"oil_name = VALUES(oil_name)",
"data_date = VALUES(data_date)",
"market_price = VALUES(market_price)",
f"{price_type} = VALUES({price_type})",
"model_evaluation_id = VALUES(model_evaluation_id)",
"tenant_code = VALUES(tenant_code)",
"version_num = VALUES(version_num)",
"delete_flag = VALUES(delete_flag)",
"update_user = VALUES(update_user)",
"update_date = VALUES(update_date)"
]
insert_query = f"""
INSERT INTO v_tbl_predict_prediction_results ( INSERT INTO v_tbl_predict_prediction_results (
prediction_date, bdwd, feature_factor_frequency, strategy_id, oil_code, oil_name, data_date,
feature_factor_frequency, strategy_id, predicted_price, market_price, day_price, week_price, second_week_price, next_week_price,
model_evaluation_id, model_id, tenant_code, next_month_price, next_february_price, next_march_price, next_april_price,
version_num, delete_flag, create_user, create_date, update_user, update_date model_evaluation_id, model_id, tenant_code, version_num, delete_flag,
create_user, create_date, update_user, update_date
) VALUES ( ) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
) )
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
feature_factor_frequency = VALUES(feature_factor_frequency), {', '.join(update_columns)}
predicted_price = VALUES(predicted_price),
model_evaluation_id = VALUES(model_evaluation_id),
tenant_code = VALUES(tenant_code),
version_num = VALUES(version_num),
delete_flag = VALUES(delete_flag),
create_user = VALUES(create_user),
create_date = VALUES(create_date),
update_user = VALUES(update_user),
update_date = VALUES(update_date)
""" """
params_list = []
next_day_df = df[['ds', 'created_dt'] + model_name_list]
pydantic_results = convert_df_to_pydantic(
next_day_df, model_id_name_dict, global_config)
if pydantic_results:
for result in pydantic_results: for result in pydantic_results:
price_values = [None] * len(PRICE_COLUMNS)
price_index = PRICE_COLUMNS.index(price_type)
price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0]
params = ( params = (
result.prediction_date,
result.bdwd,
result.feature_factor_frequency, result.feature_factor_frequency,
result.strategy_id, result.strategy_id,
result.predicted_price, result.oil_code,
result.oil_name,
next_day_df['created_dt'].values[0],
result.market_price,
*price_values,
result.model_evaluation_id, result.model_evaluation_id,
result.model_id, result.model_id,
result.tenant_code, result.tenant_code,
result.version_num, 1,
result.delete_flag, '0',
result.create_user, result.create_user,
result.create_date, result.create_date,
result.update_user, result.update_user,
@ -223,10 +244,79 @@ def sql_inset_predict(global_config):
params_list.append(params) params_list.append(params)
affected_rows = config.db_mysql.execute_batch_insert( affected_rows = config.db_mysql.execute_batch_insert(
insert_query, params_list) insert_query, params_list)
print(f"成功插入 {affected_rows} 条记录") config.logger.info(f"成功插入或更新 {affected_rows} 条记录")
config.db_mysql.close() config.db_mysql.close()
# def sql_inset_predict(global_config):
# df = pd.read_csv(os.path.join(config.dataset, 'predict.csv'))
# df['created_dt'] = pd.to_datetime(df['created_dt'])
# df['ds'] = pd.to_datetime(df['ds'])
# # 获取次月预测结果
# next_month_df = df[df['ds'] == df['ds'].min()]
# # 获取次二月预测结果
# next_two_months_df = df.iloc[[1]]
# # 获取次三月预测结果
# next_three_months_df = df.iloc[[2]]
# # 获取次四月预测结果
# next_four_months_df = df[df['ds'] == df['ds'].max()]
# wd = ['next_month', 'next_two_months',
# 'next_three_months', 'next_four_months']
# model_name_list, model_id_name_dict = get_modelsname(df, global_config)
# for df, w in zip([next_month_df, next_two_months_df, next_three_months_df, next_four_months_df], wd):
# next_day_df = df[['ds', 'created_dt'] + model_name_list]
# pydantic_results = convert_df_to_pydantic(
# next_day_df, model_id_name_dict, w, global_config)
# if pydantic_results:
# insert_query = """
# INSERT INTO v_tbl_predict_prediction_results (
# prediction_date, bdwd,
# feature_factor_frequency, strategy_id, predicted_price,
# model_evaluation_id, model_id, tenant_code,
# version_num, delete_flag, create_user, create_date, update_user, update_date
# ) VALUES (
# %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
# )
# ON DUPLICATE KEY UPDATE
# feature_factor_frequency = VALUES(feature_factor_frequency),
# predicted_price = VALUES(predicted_price),
# model_evaluation_id = VALUES(model_evaluation_id),
# tenant_code = VALUES(tenant_code),
# version_num = VALUES(version_num),
# delete_flag = VALUES(delete_flag),
# create_user = VALUES(create_user),
# create_date = VALUES(create_date),
# update_user = VALUES(update_user),
# update_date = VALUES(update_date)
# """
# params_list = []
# for result in pydantic_results:
# params = (
# result.prediction_date,
# result.bdwd,
# result.feature_factor_frequency,
# result.strategy_id,
# result.predicted_price,
# result.model_evaluation_id,
# result.model_id,
# result.tenant_code,
# result.version_num,
# result.delete_flag,
# result.create_user,
# result.create_date,
# result.update_user,
# result.update_date
# )
# params_list.append(params)
# affected_rows = config.db_mysql.execute_batch_insert(
# insert_query, params_list)
# print(f"成功插入 {affected_rows} 条记录")
# config.db_mysql.close()
def predict_main(): def predict_main():
""" """
主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测 主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测
@ -508,4 +598,5 @@ if __name__ == '__main__':
# logger.info(f'预测失败:{e}') # logger.info(f'预测失败:{e}')
# continue # continue
predict_main() # predict_main()
sql_inset_predict(global_config=global_config)

View File

@ -145,53 +145,75 @@ def sql_inset_predict(global_config):
df = pd.read_csv(os.path.join(config.dataset, 'predict.csv')) df = pd.read_csv(os.path.join(config.dataset, 'predict.csv'))
df['created_dt'] = pd.to_datetime(df['created_dt']) df['created_dt'] = pd.to_datetime(df['created_dt'])
df['ds'] = pd.to_datetime(df['ds']) df['ds'] = pd.to_datetime(df['ds'])
# 获取次预测结果 # 获取次预测结果
next_week_df = df[df['ds'] == df['ds'].min()] second_week_price_df = df[df['ds'] == df['ds'].min()]
# 获取周预测结果 # 获取隔周周预测结果
next_two_weeks_df = df[df['ds'] == df['ds'].max()] next_week_price_df = df[df['ds'] == df['ds'].max()]
wd = ['next_week', 'next_two_weeks'] wd = ['second_week_price', 'next_week_price']
model_name_list, model_id_name_dict = get_modelsname(df, global_config) model_name_list, model_id_name_dict = get_modelsname(df, global_config)
for df, w in zip([next_week_df, next_two_weeks_df], wd): PRICE_COLUMNS = [
next_day_df = df[['ds', 'created_dt'] + model_name_list] 'day_price', 'week_price', 'second_week_price', 'next_week_price',
pydantic_results = convert_df_to_pydantic( 'next_month_price', 'next_february_price', 'next_march_price', 'next_april_price'
next_day_df, model_id_name_dict, w, global_config) ]
if pydantic_results:
insert_query = """ params_list = []
for df, price_type in zip([second_week_price_df, next_week_price_df], wd):
update_columns = [
"feature_factor_frequency = VALUES(feature_factor_frequency)",
"oil_code = VALUES(oil_code)",
"oil_name = VALUES(oil_name)",
"data_date = VALUES(data_date)",
"market_price = VALUES(market_price)",
f"{price_type} = VALUES({price_type})",
"model_evaluation_id = VALUES(model_evaluation_id)",
"tenant_code = VALUES(tenant_code)",
"version_num = VALUES(version_num)",
"delete_flag = VALUES(delete_flag)",
"update_user = VALUES(update_user)",
"update_date = VALUES(update_date)"
]
insert_query = f"""
INSERT INTO v_tbl_predict_prediction_results ( INSERT INTO v_tbl_predict_prediction_results (
prediction_date, bdwd, feature_factor_frequency, strategy_id, oil_code, oil_name, data_date,
feature_factor_frequency, strategy_id, predicted_price, market_price, day_price, week_price, second_week_price, next_week_price,
model_evaluation_id, model_id, tenant_code, next_month_price, next_february_price, next_march_price, next_april_price,
version_num, delete_flag, create_user, create_date, update_user, update_date model_evaluation_id, model_id, tenant_code, version_num, delete_flag,
create_user, create_date, update_user, update_date
) VALUES ( ) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
) )
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
feature_factor_frequency = VALUES(feature_factor_frequency), {', '.join(update_columns)}
predicted_price = VALUES(predicted_price),
model_evaluation_id = VALUES(model_evaluation_id),
tenant_code = VALUES(tenant_code),
version_num = VALUES(version_num),
delete_flag = VALUES(delete_flag),
create_user = VALUES(create_user),
create_date = VALUES(create_date),
update_user = VALUES(update_user),
update_date = VALUES(update_date)
""" """
params_list = []
next_day_df = df[['ds', 'created_dt'] + model_name_list]
pydantic_results = convert_df_to_pydantic(
next_day_df, model_id_name_dict, global_config)
if pydantic_results:
for result in pydantic_results: for result in pydantic_results:
price_values = [None] * len(PRICE_COLUMNS)
price_index = PRICE_COLUMNS.index(price_type)
price_values[price_index] = next_day_df[model_id_name_dict[result.model_id]].values[0]
params = ( params = (
result.prediction_date,
result.bdwd,
result.feature_factor_frequency, result.feature_factor_frequency,
result.strategy_id, result.strategy_id,
result.predicted_price, result.oil_code,
result.oil_name,
next_day_df['created_dt'].values[0],
result.market_price,
*price_values,
result.model_evaluation_id, result.model_evaluation_id,
result.model_id, result.model_id,
result.tenant_code, result.tenant_code,
result.version_num, 1,
result.delete_flag, '0',
result.create_user, result.create_user,
result.create_date, result.create_date,
result.update_user, result.update_user,
@ -200,7 +222,7 @@ def sql_inset_predict(global_config):
params_list.append(params) params_list.append(params)
affected_rows = config.db_mysql.execute_batch_insert( affected_rows = config.db_mysql.execute_batch_insert(
insert_query, params_list) insert_query, params_list)
print(f"成功插入 {affected_rows} 条记录") config.logger.info(f"成功插入或更新 {affected_rows} 条记录")
config.db_mysql.close() config.db_mysql.close()
@ -478,4 +500,5 @@ if __name__ == '__main__':
# logger.info(f'预测失败:{e}') # logger.info(f'预测失败:{e}')
# continue # continue
predict_main() # predict_main()
sql_inset_predict(global_config=global_config)