原油预警内容上传到市场信息平台

This commit is contained in:
workpc 2025-05-14 08:34:21 +08:00
parent 035b74a617
commit 32da540a40
5 changed files with 189 additions and 143 deletions

View File

@ -89,8 +89,7 @@ data = {
ClassifyId = 1214 ClassifyId = 1214
# 变量定义--线上环境
################################################################################################################ 变量定义--线上环境
# server_host = '10.200.32.39' # server_host = '10.200.32.39'
# login_pushreport_url = "http://10.200.32.39/jingbo-api/api/server/login" # login_pushreport_url = "http://10.200.32.39/jingbo-api/api/server/login"
# upload_url = "http://10.200.32.39/jingbo-api/api/analysis/reportInfo/researchUploadReportSave" # upload_url = "http://10.200.32.39/jingbo-api/api/analysis/reportInfo/researchUploadReportSave"
@ -111,7 +110,6 @@ ClassifyId = 1214
# } # }
# upload_data = { # upload_data = {
# "funcModule":'研究报告信息', # "funcModule":'研究报告信息',
# "funcOperation":'上传原油价格预测报告', # "funcOperation":'上传原油价格预测报告',
@ -151,7 +149,6 @@ ClassifyId = 1214
# } # }
# push_data_value_list_data = { # push_data_value_list_data = {
# "funcModule": "数据表信息列表", # "funcModule": "数据表信息列表",
# "funcOperation": "新增", # "funcOperation": "新增",
@ -195,7 +192,6 @@ ClassifyId = 1214
# table_name = 'v_tbl_crude_oil_warning' # table_name = 'v_tbl_crude_oil_warning'
# # 变量定义--测试环境 # # 变量定义--测试环境
server_host = '192.168.100.53' # 内网 server_host = '192.168.100.53' # 内网
# server_host = '183.242.74.28' # 外网 # server_host = '183.242.74.28' # 外网
@ -208,6 +204,8 @@ upload_warning_url = f"http://{server_host}:8080/jingbo-dev/api/basicBuiness/cru
query_data_list_item_nos_url = f"http://{server_host}:8080/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos" query_data_list_item_nos_url = f"http://{server_host}:8080/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos"
# 上传数据项值 # 上传数据项值
push_data_value_list_url = f"http://{server_host}:8080/jingbo-dev/api/dw/dataValue/pushDataValueList" push_data_value_list_url = f"http://{server_host}:8080/jingbo-dev/api/dw/dataValue/pushDataValueList"
# 上传停更数据到市场信息平台
push_waring_data_value_list_url = f"http://{server_host}:8080/jingbo-dev/api/basicBuiness/crudeOilWarning/crudeSaveOrupdate"
login_data = { login_data = {
"data": { "data": {
@ -281,6 +279,26 @@ push_data_value_list_data = {
} }
] ]
} }
push_waring_data_value_list_data = {
"data": {
"crudeOilWarningDtoList": [
{
"lastUpdateDate": "20240501",
"updateSuspensionCycle": 1,
"dataSource": "8",
"frequency": "1",
"indicatorName": "美元指数",
"indicatorId": "myzs001",
"warningDate": "2024-05-13"
}
],
"dataSource": "8"
},
"funcModule": "商品数据同步",
"funcOperation": "同步"
}
# 八大维度数据项编码 # 八大维度数据项编码
bdwd_items = { bdwd_items = {
'ciri': 'yyycbdwdcr', 'ciri': 'yyycbdwdcr',

View File

@ -40,15 +40,6 @@ pdfmetrics.registerFont(TTFont('SimSun', 'SimSun.ttf'))
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# from config_jingbo_pro import *
# from config_jingbo import *
# from config_jingbo_yuedu import *
# from config_yongan import *
# from config_juxiting import *
# from config_juxiting_zhoudu import *
# from config_juxiting_pro import *
# from config_jingbo import logger
global_config = { global_config = {
# 核心配置项 # 核心配置项
@ -93,6 +84,9 @@ global_config = {
# 上传数据项数据 # 上传数据项数据
'push_data_value_list_url': None, 'push_data_value_list_url': None,
'push_data_value_list_data': None, 'push_data_value_list_data': None,
# 上传停更预警数据
'push_waring_data_value_list_url': None,
'push_waring_data_value_list_data': None,
# 字段映射 # 字段映射
'offsite_col': None, # 站点字段 'offsite_col': None, # 站点字段
@ -1222,6 +1216,14 @@ class Config:
def push_data_value_list_data( def push_data_value_list_data(
self): return global_config['push_data_value_list_data'] self): return global_config['push_data_value_list_data']
@property
def push_waring_data_value_list_url(
self): return global_config['push_waring_data_value_list_url']
@property
def push_waring_data_value_list_data(
self): return global_config['push_waring_data_value_list_data']
@property @property
def bdwd_items(self): return global_config['bdwd_items'] def bdwd_items(self): return global_config['bdwd_items']
@ -2203,6 +2205,43 @@ def push_market_data(data):
return json_data return json_data
def push_waring_market_data(data):
'''
上传停更预警数据到市场信息平台
data: 预测价格数据,示例
{
"data": {
"crudeOilWarningDtoList": [
{
"lastUpdateDate": "20240510",
"updateSuspensionCycle": 1,
"dataSource": "8",
"frequency": "1",
"indicatorName": "美元指数",
"indicatorId": "myzs001",
"warningDate": "2024-05-12"
}
],
"dataSource": "8"
},
"funcModule": "商品数据同步",
"funcOperation": "同步"
}
'''
# 获取token
token = get_head_auth_report()
# 定义请求参数
global_config['push_waring_data_value_list_data']['data']["crudeOilWarningDtoList"] = data
# 发送请求
headers = {"Authorization": token}
config.logger.info('上传数据中...')
items_res = requests.post(url=config.push_waring_data_value_list_url, headers=headers,
json=config.push_waring_data_value_list_data, timeout=(3, 35))
json_data = json.loads(items_res.text)
config.logger.info(f"上传结果:{json_data}")
return json_data
def get_high_low_data(df): def get_high_low_data(df):
# 读取excel 从第五行开始 # 读取excel 从第五行开始
df1 = pd.read_excel(os.path.join(config.dataset, '数据项下载.xls'), header=5, names=[ df1 = pd.read_excel(os.path.join(config.dataset, '数据项下载.xls'), header=5, names=[

View File

@ -63,6 +63,10 @@ global_config.update({
'push_data_value_list_url': push_data_value_list_url, 'push_data_value_list_url': push_data_value_list_url,
'push_data_value_list_data': push_data_value_list_data, 'push_data_value_list_data': push_data_value_list_data,
# 上传数据项
'push_waring_data_value_list_url': push_waring_data_value_list_url,
'push_waring_data_value_list_data': push_waring_data_value_list_data,
# eta 配置 # eta 配置
'APPID': APPID, 'APPID': APPID,
'SECRET': SECRET, 'SECRET': SECRET,
@ -83,7 +87,6 @@ global_config.update({
}) })
def push_market_value(): def push_market_value():
config.logger.info('发送预测结果到市场信息平台') config.logger.info('发送预测结果到市场信息平台')
# 读取预测数据和模型评估数据 # 读取预测数据和模型评估数据
@ -141,8 +144,6 @@ def push_market_value():
config.logger.error(f"推送数据失败: {e}") config.logger.error(f"推送数据失败: {e}")
def predict_main(): def predict_main():
""" """
主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测 主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测
@ -338,94 +339,122 @@ def predict_main():
sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime( sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',)) '%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
try: # try:
if is_weekday: # # if is_weekday:
# if True: # if True:
logger.info('今天是周一,发送特征预警') # logger.info('今天是周一,发送特征预警')
# # 上传预警信息到数据库
# warning_data_df = df_zhibiaoliebiao.copy()
# warning_data_df = warning_data_df[warning_data_df['停更周期'] > 3][[
# '指标名称', '指标id', '频度', '更新周期', '指标来源', '最后更新时间', '停更周期']]
# # 重命名列名
# warning_data_df = warning_data_df.rename(columns={'指标名称': 'INDICATOR_NAME', '指标id': 'INDICATOR_ID', '频度': 'FREQUENCY',
# '更新周期': 'UPDATE_FREQUENCY', '指标来源': 'DATA_SOURCE', '最后更新时间': 'LAST_UPDATE_DATE', '停更周期': 'UPDATE_SUSPENSION_CYCLE'})
# from sqlalchemy import create_engine
# import urllib
# global password
# if '@' in password:
# password = urllib.parse.quote_plus(password)
# engine = create_engine(
# f'mysql+pymysql://{dbusername}:{password}@{host}:{port}/{dbname}')
# warning_data_df['WARNING_DATE'] = datetime.date.today().strftime(
# "%Y-%m-%d %H:%M:%S")
# warning_data_df['TENANT_CODE'] = 'T0004'
# # 插入数据之前查询表数据然后新增id列
# existing_data = pd.read_sql(f"SELECT * FROM {table_name}", engine)
# if not existing_data.empty:
# max_id = existing_data['ID'].astype(int).max()
# warning_data_df['ID'] = range(
# max_id + 1, max_id + 1 + len(warning_data_df))
# else:
# warning_data_df['ID'] = range(1, 1 + len(warning_data_df))
# warning_data_df.to_sql(
# table_name, con=engine, if_exists='append', index=False)
# if is_update_warning_data:
# upload_warning_info(len(warning_data_df))
# except:
# logger.info('上传预警信息到数据库失败')
# try:
# if is_weekday:
if True:
logger.info('发送特征预警')
# 获取取消订阅的指标ID
# 上传预警信息到数据库 # 上传预警信息到数据库
warning_data_df = df_zhibiaoliebiao.copy() warning_data_df = df_zhibiaoliebiao.copy()
warning_data_df = warning_data_df[warning_data_df['停更周期'] > 3][[ warning_data_df = warning_data_df[warning_data_df['停更周期'] > 3][[
'指标名称', '指标id', '频度', '更新周期', '指标来源', '最后更新时间', '停更周期']] '指标名称', '指标id', '频度', '更新周期', '指标来源', '最后更新时间', '停更周期']]
# 重命名列名 # 重命名列名
warning_data_df = warning_data_df.rename(columns={'指标名称': 'INDICATOR_NAME', '指标id': 'INDICATOR_ID', '频度': 'FREQUENCY', warning_data_df = warning_data_df.rename(columns={'指标名称': 'indicatorName', '指标id': 'indicatorId', '频度': 'frequency',
'更新周期': 'UPDATE_FREQUENCY', '指标来源': 'DATA_SOURCE', '最后更新时间': 'LAST_UPDATE_DATE', '停更周期': 'UPDATE_SUSPENSION_CYCLE'}) '更新周期': 'UPDATE_FREQUENCY', '指标来源': 'DATA_SOURCE', '最后更新时间': 'LAST_UPDATE_DATE', '停更周期': 'updateSuspensionCycle'})
from sqlalchemy import create_engine
import urllib
global password
if '@' in password:
password = urllib.parse.quote_plus(password)
engine = create_engine( warning_data_df['warningDate'] = datetime.date.today().strftime(
f'mysql+pymysql://{dbusername}:{password}@{host}:{port}/{dbname}')
warning_data_df['WARNING_DATE'] = datetime.date.today().strftime(
"%Y-%m-%d %H:%M:%S") "%Y-%m-%d %H:%M:%S")
warning_data_df['TENANT_CODE'] = 'T0004' warning_data_df['dataSource'] = 8
# 插入数据之前查询表数据然后新增id列 data = warning_data_df.to_json(orient='records', force_ascii=False)
existing_data = pd.read_sql(f"SELECT * FROM {table_name}", engine) data = data.replace('日度', '1')
if not existing_data.empty: data = data.replace('周度', '2')
max_id = existing_data['ID'].astype(int).max() data = data.replace('月度', '3')
warning_data_df['ID'] = range( data = json.loads(data)
max_id + 1, max_id + 1 + len(warning_data_df)) push_waring_market_data(data)
else: # if is_update_warning_data:
warning_data_df['ID'] = range(1, 1 + len(warning_data_df)) # upload_warning_info(len(warning_data_df))
warning_data_df.to_sql( # except:
table_name, con=engine, if_exists='append', index=False) # logger.info('上传预警信息到数据库失败')
if is_update_warning_data:
upload_warning_info(len(warning_data_df))
except:
logger.info('上传预警信息到数据库失败')
if is_corr: # if is_corr:
df = corr_feature(df=df) # df = corr_feature(df=df)
df1 = df.copy() # 备份一下后面特征筛选完之后加入ds y 列用 # df1 = df.copy() # 备份一下后面特征筛选完之后加入ds y 列用
logger.info(f"开始训练模型...") # logger.info(f"开始训练模型...")
row, col = df.shape # row, col = df.shape
now = datetime.datetime.now().strftime('%Y%m%d%H%M%S') # now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ex_Model(df, # ex_Model(df,
horizon=global_config['horizon'], # horizon=global_config['horizon'],
input_size=global_config['input_size'], # input_size=global_config['input_size'],
train_steps=global_config['train_steps'], # train_steps=global_config['train_steps'],
val_check_steps=global_config['val_check_steps'], # val_check_steps=global_config['val_check_steps'],
early_stop_patience_steps=global_config['early_stop_patience_steps'], # early_stop_patience_steps=global_config['early_stop_patience_steps'],
is_debug=global_config['is_debug'], # is_debug=global_config['is_debug'],
dataset=global_config['dataset'], # dataset=global_config['dataset'],
is_train=global_config['is_train'], # is_train=global_config['is_train'],
is_fivemodels=global_config['is_fivemodels'], # is_fivemodels=global_config['is_fivemodels'],
val_size=global_config['val_size'], # val_size=global_config['val_size'],
test_size=global_config['test_size'], # test_size=global_config['test_size'],
settings=global_config['settings'], # settings=global_config['settings'],
now=now, # now=now,
etadata=etadata, # etadata=etadata,
modelsindex=global_config['modelsindex'], # modelsindex=global_config['modelsindex'],
data=data, # data=data,
is_eta=global_config['is_eta'], # is_eta=global_config['is_eta'],
end_time=global_config['end_time'], # end_time=global_config['end_time'],
) # )
logger.info('模型训练完成') # logger.info('模型训练完成')
logger.info('训练数据绘图ing') # logger.info('训练数据绘图ing')
model_results3 = model_losss(sqlitedb, end_time=end_time) # model_results3 = model_losss(sqlitedb, end_time=end_time)
logger.info('训练数据绘图end') # logger.info('训练数据绘图end')
# 模型报告 # # 模型报告
logger.info('制作报告ing') # logger.info('制作报告ing')
title = f'{settings}--{end_time}-预测报告' # 报告标题 # title = f'{settings}--{end_time}-预测报告' # 报告标题
reportname = f'Brent原油大模型日度预测--{end_time}.pdf' # 报告文件名 # reportname = f'Brent原油大模型日度预测--{end_time}.pdf' # 报告文件名
reportname = reportname.replace(':', '-') # 替换冒号 # reportname = reportname.replace(':', '-') # 替换冒号
brent_export_pdf(dataset=dataset, # brent_export_pdf(dataset=dataset,
num_models=5 if is_fivemodels else 22, time=end_time, # num_models=5 if is_fivemodels else 22, time=end_time,
reportname=reportname, # reportname=reportname,
inputsize = global_config['horizon'], # inputsize=global_config['horizon'],
sqlitedb=sqlitedb # sqlitedb=sqlitedb
), # ),
logger.info('制作报告end') # logger.info('制作报告end')
logger.info('模型训练完成') # logger.info('模型训练完成')
push_market_value() # push_market_value()
# # LSTM 单变量模型 # # LSTM 单变量模型
# ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset) # ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset)

View File

@ -1,40 +0,0 @@
# XGBoost 价格预测分析报告 <span style="color:gray; font-size:0.8em">2025-05-08</span>
## 一、模型实现
### 1. 特征工程
- 使用 5 期历史滞后特征
- 预测未来 10 个时间步长
- 数据集分割比例80% 训练集 / 20% 测试集
### 2. 模型配置
| | learning_rate | max_depth | n_estimators |
| :----------- | ------------: | --------: | -----------: |
| 最佳参数组合 | 1 | 4 | 100 |
## 二、性能评估
### 多步预测误差分析
| 预测步长 | 均方根误差(RMSE) | R² 分数 |
| -------: | ---------------: | -------: |
| 1 | 2.30044 | 0.828193 |
| 2 | 2.74815 | 0.756595 |
| 3 | 3.16346 | 0.679378 |
| 4 | 3.62202 | 0.58126 |
| 5 | 3.77657 | 0.546517 |
## 三、预测结果
### 未来 5 日价格预测
![预测结果](价格预测.png)
| | 预测值 |
|:--------------------|---------:|
| 2025-03-14 00:00:00 | 68.8125 |
| 2025-03-15 00:00:00 | 68.6643 |
| 2025-03-16 00:00:00 | 69.283 |
| 2025-03-17 00:00:00 | 71.7288 |
| 2025-03-18 00:00:00 | 68.8356 |