石油焦周度调试

This commit is contained in:
workpc 2025-05-27 09:42:06 +08:00
parent 93c0820510
commit 46adadb042
3 changed files with 852 additions and 3 deletions

View File

@ -0,0 +1,425 @@
import logging
import os
import logging.handlers
import datetime
from lib.tools import MySQLDB, SQLiteHandler
# eta 接口token
APPID = "XNLDvxZHHugj7wJ7"
SECRET = "iSeU4s6cKKBVbt94htVY1p0sqUMqb2xa"
# eta 接口url
sourcelisturl = 'http://10.189.2.78:8108/v1/edb/source/list'
classifylisturl = 'http://10.189.2.78:8108/v1/edb/classify/list?ClassifyType='
uniquecodedataurl = 'http://10.189.2.78:8108/v1/edb/data?UniqueCode=4991c37becba464609b409909fe4d992&StartDate=2024-02-01'
classifyidlisturl = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId='
edbcodedataurl = 'http://10.189.2.78:8108/v1/edb/data?EdbCode='
edbdatapushurl = 'http://10.189.2.78:8108/v1/edb/push'
edbdeleteurl = 'http://10.189.2.78:8108/v1/edb/business/edb/del'
edbbusinessurl = 'http://10.189.2.78:8108/v1/edb/business/data/del'
edbcodenamedict = {
'C2403283369': '预赔阳极加工利润(高端)',
'C2403285560': '预培阳极加工利润(低端)',
'C2403288616': '低硫石油焦煅烧利润',
'S6949656': '平均价:氧化铝:一级:全国',
'S5807052': '氧化铝:一级:贵阳',
'S5443355': '市场价:煤沥青:河北地区',
'S5443357': '市场价:煤沥青:山西地区',
'W000294': '国内主要港口石油焦出货量(隆重)',
'W000293': '日照港库存(隆重)',
'W000292': '港口总库存(隆重)',
'W000283': '主营石油焦产量(隆重)',
'W000282': '地炼石油焦产量(隆重)',
'W000281': '中国石油焦产量(隆重)',
'W000280': '主营石油焦开工负荷率(隆重)',
'W000279': '地炼石油焦开工负荷率(隆重)',
'ID00150273': '石油焦1 # :市场低端价:东北地区(日)',
'ID00150281': '石油焦1 # :市场主流价:东北地区(日)',
'ID00150277': '石油焦1 # :市场高端价:东北地区(日)',
'ID00150289': '石油焦2 # :市场低端价:华东地区(日)',
'ID00150285': '石油焦2 # :市场低端价:西北地区(日)',
'ID00150313': '石油焦2 # :市场主流价:华东地区(日)',
'ID00150309': '石油焦2 # :市场主流价:西北地区(日)',
'ID00150301': '石油焦2 # :市场高端价:华东地区(日)',
'ID00150297': '石油焦2 # :市场高端价:西北地区(日)',
'ID00150321': '石油焦2 # A市场低端价山东',
'ID00150329': '石油焦2 # A市场主流价山东',
'ID00150325': '石油焦2 # A市场高端价山东',
'ID00150337': '石油焦2 # B市场低端价华南地区',
'ID00150341': '石油焦2 # B市场低端价华中地区',
'ID00150361': '石油焦2 # B市场主流价华南地区',
'ID00150365': '石油焦2 # B市场主流价华中地区',
'ID00150349': '石油焦2 # B市场高端价华南地区',
'ID00150353': '石油焦2 # B市场高端价华中地区',
'ID00150333': '石油焦2 # B市场低端价山东',
'ID00150357': '石油焦2 # B市场主流价山东',
'ID00150345': '石油焦2 # B市场高端价山东',
'ID00150369': '石油焦3 # :市场低端价:华中地区(日)',
'ID00150373': '石油焦3 # :市场高端价:华中地区(日)',
'ID00150385': '石油焦3 # A市场高端价山东',
'ID00150393': '石油焦3 # B市场低端价华东地区',
'ID00150409': '石油焦3 # B市场主流价华东地区',
'ID00150401': '石油焦3 # B市场高端价华东地区',
'ID00146589': '海绵焦4 # :出厂价:华中地区:洛阳石化(日)',
'ID01242846': '石油焦4 # B挂牌价华北地区中石化燕山',
'ID01300358': '石油焦3 # C市场低端价山东',
'ID01300357': '石油焦3 # C市场高端价山东',
'ID00150377': '石油焦3 # :市场主流价:华中地区(日)',
'ID01387643': '煅烧焦低硫0.5 % S市场价东北地区',
'ID01387646': '煅烧焦低硫3.5 % S市场价东北地区',
'ID01387660': '煅烧焦中硫3 % S400V市场价山东',
'ID00150381': '石油焦3 # A市场低端价山东',
'ID00150397': '石油焦3 # B市场低端价山东',
'ID00150405': '石油焦3 # B市场高端价山东',
'ID00146545': '海绵焦3B出厂价山东山东东明',
'B3e90b34e4b9e7a6ea3': '石油焦市场均价(元/吨)',
'B6b5c53b270a3af12ac': '石油焦1 # 市场均价(元/吨)',
'B10721189a11c209a20': '石油焦2 # 市场均价(元/吨)',
'B6accfa9d2bf4735a50': '石油焦3 # 市场均价(元/吨)',
'B8a0ab5357569c385a9': '石油焦海绵焦市场均价(元/吨)',
'B19dcf45e22fbfd3e43': '石油焦海绵焦东北1 # A焦低端(元/吨)(百川)',
'B5832a62d1e0fba50b6': '石油焦海绵焦东北1 # A焦高端(元/吨)(百川)',
'B1de4fba026d4609cc7': '石油焦海绵焦东北1 # B焦低端(元/吨)(百川)',
'B38f89180736172490d': '石油焦海绵焦东北1 # B焦高端(元/吨)(百川)',
'B4f847871674c3d77f2': '石油焦海绵焦山东地炼1 # -3#焦(低端)(元/吨)',
'B1aefb8a64a5200adbd': '石油焦海绵焦山东地炼1 # -3#焦(高端)(元/吨)',
'B1df7d0afbfedfb628a': '煅烧焦东北低硫(高端S < 0.5)(元/吨)',
'B5f8f9859635876da28': '煅烧焦东北低硫(低端S < 0.5)(元/吨)',
'B2342a8c5a39fa00348': '煅烧焦华北中硫(高端S < 3.0,钒 < 400)(元/吨)',
'B051f27900397c6a35f': '煅烧焦山东中硫(高端S < 3.0,钒 < 400)(元/吨)',
'Be2a8050a48e86cae1f': '煅烧焦华东中硫(高端S < 3.0,钒 < 400)(元/吨)',
'B4a1811938f85065f6a': '煅烧焦华中中硫(高端S < 3.0,钒 < 400)(元/吨)',
'Bc197d4834ef7fb98ec': '煅烧焦华东高硫(高端S < 3.5,钒 < 400)(元/吨)',
'B62be5dbdb8c6454530': '煅烧焦低硫参考价格(元/吨)(百川)',
'Bdd813140bffc4edfa6': '煅烧焦中硫微量市场均价(元/吨)(百川)',
'B185a597decfc71915a': '预焙阳极山东低端(元/吨)(百川)',
'B1bcde6130de031bd42': '山西 改质沥青(元/吨)',
'Bb9f4a1f6dd32b4ad8a': '山东 改质沥青(元/吨)',
'C2411261557491549': '石油焦市场均价(元/吨)/4DMA',
'C2411271143174617': '石油焦市场均价(元/吨)/9DMA',
'ID01387649': '煅烧焦中硫3 % S350V市场价华东地区',
'ID01387655': '煅烧焦中硫3 % S350V市场价山东',
'RE00010076': '煅烧焦:低硫:生产毛利:东北地区(周)',
'B9d1acaf80383683da3': '石油焦总产量(周)(吨)',
'Bdaa719a38936c8dd76': '石油焦开工率(周)( % )',
'B9459d549a332b200e7': '石油焦行业总库存(周)(吨)',
'Bce6e098b9518370cff': '石油焦工厂库存(周)(吨)',
'B577ce2809772779710': '石油焦市场库存(周)(吨)',
'B5d8c564c62f3e6b77f': '石油焦成本(周)(吨)',
'B43baa98bcaa06c11a5': '石油焦利润(周)(吨)',
'Bdd0c1361d94081211c': '煅烧石油焦总产量(周)(吨)',
'B65315111fa28951b1e': '煅烧石油焦开工率(周)( % )',
'B2aff5f2632a20027d0': '煅烧石油焦行业总库存(周)(吨)',
'B29fbd31128cd71b212': '煅烧石油焦工厂库存(周)(吨)',
'B7a88313a89d1261c53': '煅烧石油焦成本(周)(吨)',
'Bd4fa36b4decec0aafa': '煅烧石油焦利润(周)(吨)',
'B9bd80eac7df81ffbd4': '预焙阳极总产量(周)(吨)',
'B27074786605f4660d2': '预焙阳极开工率(周)( % )',
'Bdc2a5985ecb56b6a0c': '预焙阳极行业总库存(周)(吨)',
'Bce8511f899e487e5b6': '预焙阳极工厂库存(周)(吨)',
'B13ec89105bd866a2bd': '预焙阳极成本(周)(吨)',
'B66c3abcfa15a2e611c': '预焙阳极利润(周)(吨)',
'Bf7efe3200f9abc0453': '电解铝开工率(周)( % )',
'Be193166f347267b1a7': '电解铝行业总库存(周)(吨)',
'Baa744fc97769353175': '电解铝工厂库存(周)(吨)',
'Bf9654603913cfc5282': '电解铝市场库存(周)(吨)',
'Bef1535c96da0d70fbc': '电解铝利润(周)(吨)',
'B7d1d0b24316d49cbdc': '煤沥青总产量(周)(吨)',
'B4303fb002ea1c214da': '煤沥青开工率(周)( % )',
'Be9a470c97e9efe660c': '煤沥青行业总库存(周)(吨)',
'B50d4d87f6b78bca587': '煤沥青工厂库存(周)(吨)',
'B46cc7d0a90155b5bfd': '煅烧焦山东高硫(高端S < 3.5,普货)(元/吨)'
}
edbcodelist = edbcodenamedict.keys()
# 临时写死用指定的列,与上面的edbcode对应后面更改
edbnamelist = ['ds', 'y']+[edbcodenamedict[edbcodename]
for edbcodename in edbcodelist]
# eta自有数据指标编码石油焦铝用还没新增暂且留空
# eta自有数据指标编码 次周,隔周
bdwdname = [
'次周',
'隔周',
]
modelsindex = {
}
# 百川数据指标编码
baichuanidnamedict = {
'1588348470396480901': '石油焦滨州-友泰',
'1588348470396480903': '石油焦东营-海科瑞林',
'1588348470396480902': '石油焦东营-华联2',
'1588348470396481080': '石油焦东营-华联3',
'1588348470396480905': '石油焦东营-联合',
'1588348470396481081': '石油焦东营-联合3',
'1588348470396480915': '石油焦淄博-汇丰',
'1588348470396480888': '石油焦沧州-鑫海',
'1588348470396480917': '石油焦东营-万通',
'1588348470396480925': '石油焦东营-齐润',
'1588348470396481084': '石油焦东营-尚能4',
'1588348470396480930': '石油焦潍坊-寿光鲁清',
'1588348470396480929': '石油焦滨州-鑫岳'
}
# baichuanidnamedict = {'1588348470396475286': 'test1', '1666': 'test2'} # 北京环境测试用
# eta 上传预测结果的请求体,后面发起请求的时候更改 model datalist 数据
data = {
"IndexCode": "",
"IndexName": "石油焦铝用价格预测xx模型-yy",
"Unit": "",
"Frequency": "日度",
"SourceName": f"价格预测",
"Remark": 'ddd',
"DataList": [
{
"Date": "2024-05-02",
"Value": 333444
}
]
}
# eta 分类
# level3才可以获取到数据所以需要人工把能源化工下所有的level3级都找到
# url = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId=1214'
# ParentId ":1160, 能源化工
# ClassifyId ":1214,原油 3912 石油焦
# ParentId ":1214,",就是原油下所有的数据。
ClassifyId = 3707
# 变量定义--测试环境
server_host = '192.168.100.53' # 内网
# server_host = '183.242.74.28' # 外网
login_pushreport_url = f"http://{server_host}:8080/jingbo-dev/api/server/login"
# 上传报告
upload_url = f"http://{server_host}:8080/jingbo-dev/api/analysis/reportInfo/researchUploadReportSave"
# 停更预警
upload_warning_url = f"http://{server_host}:8080/jingbo-dev/api/basicBuiness/crudeOilWarning/save"
# 查询数据项编码
query_data_list_item_nos_url = f"http://{server_host}:8080/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos"
# 上传数据项值
push_data_value_list_url = f"http://{server_host}:8080/jingbo-dev/api/dw/dataValue/pushDataValueList"
login_data = {
"data": {
"account": "api_test",
# "password": "MmVmNzNlOWI0MmY0ZDdjZGUwNzE3ZjFiMDJiZDZjZWU=", # Shihua@123456
"password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", # 123456
"tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
"terminal": "API"
},
"funcModule": "API",
"funcOperation": "获取token"
}
upload_data = {
"groupNo": '', # 用户组id
"funcModule": '研究报告信息',
"funcOperation": '上传原油价格预测报告',
"data": {
"ownerAccount": 'arui', # 报告所属用户账号
"reportType": 'OIL_PRICE_FORECAST', # 报告类型固定为OIL_PRICE_FORECAST
"fileName": '2000-40-5-50--100-原油指标数据.xlsx-Brent活跃合约--2024-09-06-15-01-29-预测报告.pdf', # 文件名称
"fileBase64": '', # 文件内容base64
"categoryNo": 'yyjgycbg', # 研究报告分类编码
"smartBusinessClassCode": 'YCJGYCBG', # 分析报告分类编码
"reportEmployeeCode": "E40116", # 报告人
"reportDeptCode": "D0044", # 报告部门
"productGroupCode": "RAW_MATERIAL" # 商品分类
}
}
warning_data = {
"groupNo": '', # 用户组id
"funcModule": '原油特征停更预警',
"funcOperation": '原油特征停更预警',
"data": {
'WARNING_TYPE_NAME': '特征数据停更预警',
'WARNING_CONTENT': '',
'WARNING_DATE': ''
}
}
query_data_list_item_nos_data = {
"funcModule": "数据项",
"funcOperation": "查询",
"data": {
"dateStart": "20200101",
"dateEnd": "20241231",
"dataItemNoList": ["Brentzdj", "Brentzgj"] # 数据项编码,代表 brent最低价和最高价
}
}
push_data_value_list_data = {
"funcModule": "数据表信息列表",
"funcOperation": "新增",
"data": [
{"dataItemNo": "91230600716676129",
"dataDate": "20230113",
"dataStatus": "add",
"dataValue": 100.11
},
{"dataItemNo": "91230600716676129P|ETHYL_BEN|CAPACITY",
"dataDate": "20230113",
"dataStatus": "add",
"dataValue": 100.55
},
{"dataItemNo": "91230600716676129P|ETHYL_BEN|CAPACITY",
"dataDate": "20230113",
"dataStatus": "add",
"dataValue": 100.55
}
]
}
# 八大维度数据项编码
bdwd_items = {
'ciri': 'syjlyycbdwdcr',
'benzhou': 'syjlyycbdwdbz',
'cizhou': 'syjlyycbdwdcz',
'gezhou': 'syjlyycbdwdgz',
'ciyue': 'syjlyycbdwdcy',
'cieryue': 'syjlyycbdwdcey',
'cisanyue': 'syjlyycbdwdcsy',
'cisiyue': 'syjlyycbdwdcsiy',
}
# 北京环境数据库
host = '192.168.101.27'
port = 3306
dbusername = 'root'
password = '123456'
dbname = 'jingbo_test'
# 京博测试环境
# host = 'rm-2zehj3r1n60ttz9x5ko.mysql.rds.aliyuncs.com'
# port = 3306
# dbusername = 'jingbo'
# password = 'shihua@123'
# dbname = 'jingbo-test'
table_name = 'v_tbl_crude_oil_warning'
baichuan_table_name = 'V_TBL_BAICHUAN_YINGFU_VALUE'
# select BAICHUAN_ID, DATA_DATE, DATA_VALUE from V_TBL_BAICHUAN_YINGFU_VALUE where BAICHUAN_ID in ('1588348470396475286', '1666')
# 开关
is_train = True # 是否训练
is_debug = False # 是否调试
is_eta = True # 是否使用eta接口
is_market = False # 是否通过市场信息平台获取特征 ,在is_eta 为true 的情况下生效
is_timefurture = True # 是否使用时间特征
is_fivemodels = False # 是否使用之前保存的最佳的5个模型
is_edbcode = False # 特征使用edbcoding列表中的
is_edbnamelist = False # 自定义特征对应上面的edbnamelist
is_update_eta = True # 预测结果上传到eta
is_update_report = True # 是否上传报告
is_update_warning_data = False # 是否上传预警数据
is_update_predict_value = True # 是否上传预测值到市场信息平台
is_del_corr = 0.6 # 是否删除相关性高的特征,取值为 0-1 0 为不删除0.6 表示删除相关性小于0.6的特征
is_del_tow_month = False # 是否删除两个月不更新的特征
is_bdwd = False # 是否使用八大维度
# 连接到数据库
db_mysql = MySQLDB(host=host, user=dbusername,
password=password, database=dbname)
db_mysql.connect()
print("数据库连接成功", host, dbname, dbusername)
# 数据截取日期
start_year = 2015 # 数据开始年份
end_time = '' # 数据截取日期
freq = 'WW' # 时间频率,"D": 天 "W": 周"M": 月"Q": 季度"A": 年 "H": 小时 "T": 分钟 "S": 秒 "B": 工作日 "WW" 自定义周
delweekenday = True if freq == 'B' else False # 是否删除周末数据
is_corr = False # 特征是否参与滞后领先提升相关系数
add_kdj = False # 是否添加kdj指标
if add_kdj and is_edbnamelist:
edbnamelist = edbnamelist+['K', 'D', 'J']
# 模型参数
y = '石油焦3 # B市场低端价山东'
avg_cols = [
]
offsite = 80
offsite_col = []
horizon = 2 # 预测的步长
input_size = 14 # 输入序列长度
train_steps = 50 if is_debug else 1000 # 训练步数,用来限定epoch次数
val_check_steps = 30 # 评估频率
early_stop_patience_steps = 5 # 早停的耐心步数
# --- 交叉验证用的参数
test_size = 200 # 测试集大小定义100后面使用的时候重新赋值
val_size = test_size # 验证集大小,同测试集大小
# 特征筛选用到的参数
k = 100 # 特征筛选数量如果是0或者值比特征数量大代表全部特征
corr_threshold = 0.6 # 相关性大于0.6的特征
rote = 0.06 # 绘图上下界阈值
# 计算准确率
weight_dict = [0.4, 0.15, 0.1, 0.1, 0.25] # 权重
# 文件
data_set = '石油焦铝用指标数据.xlsx' # 数据集文件
dataset = 'shiyoujiaolvyongzhoududataset' # 数据集文件夹
# 数据库名称
db_name = os.path.join(dataset, 'jbsh_shiyoujiao_lvyong_zhoudu.db')
sqlitedb = SQLiteHandler(db_name)
sqlitedb.connect()
settings = f'{input_size}-{horizon}-{train_steps}--{k}-{data_set}'
# 获取日期时间
# now = datetime.datetime.now().strftime('%Y%m%d%H%M%S') # 获取当前日期时间
now = datetime.datetime.now().strftime('%Y-%m-%d') # 获取当前日期时间
reportname = f'石油焦铝用大模型周度预测报告--{end_time}.pdf' # 报告文件名
reportname = reportname.replace(':', '-') # 替换冒号
if end_time == '':
end_time = now
# 邮件配置
username = '1321340118@qq.com'
passwd = 'wgczgyhtyyyyjghi'
# recv=['liurui_test@163.com','52585119@qq.com']
recv = ['liurui_test@163.com']
# recv=['liurui_test@163.com']
title = 'reportname'
content = y+'预测报告请看附件'
file = os.path.join(dataset, 'reportname')
# file=os.path.join(dataset,'14-7-50--100-原油指标数据.xlsx-Brent连1合约价格--20240731175936-预测报告.pdf')
ssl = True
# 日志配置
# 创建日志目录(如果不存在)
log_dir = 'logs'
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 配置日志记录器
logger = logging.getLogger('my_logger')
logger.setLevel(logging.INFO)
# 配置文件处理器,将日志记录到文件
file_handler = logging.handlers.RotatingFileHandler(os.path.join(
log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# 配置控制台处理器,将日志打印到控制台
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s'))
# 将处理器添加到日志记录器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# logger.info('当前配置:'+settings)

View File

@ -2285,7 +2285,7 @@ def push_market_data(data):
return json_data
def push_waring_market_data(data,dataSource=8):
def push_waring_market_data(data, dataSource=8):
'''
dataSource : 8 原油 9 聚烯烃PP
上传停更预警数据到市场信息平台
@ -2313,7 +2313,8 @@ def push_waring_market_data(data,dataSource=8):
token = get_head_auth_report()
# 定义请求参数
global_config['push_waring_data_value_list_data']['data']["crudeOilWarningDtoList"] = data
global_config['push_waring_data_value_list_data']['data']["dataSource"] = str(dataSource)
global_config['push_waring_data_value_list_data']['data']["dataSource"] = str(
dataSource)
# 发送请求
headers = {"Authorization": token}
config.logger.info('上传数据中...')
@ -2413,7 +2414,7 @@ def get_baichuan_data(baichuanidnamedict):
startdate = str(config.start_year)+'0101'
# 连接数据库
db = config.db_mysql
if db is None:
if db.cursor is None:
db.connect()
# 执行SQL查询 select BAICHUAN_ID,DATA_DATE,DATA_VALUE from V_TBL_BAICHUAN_YINGFU_VALUE where BAICHUAN_ID in ('1588348470396475286','1666');
sql = f"SELECT BAICHUAN_ID,DATA_DATE,DATA_VALUE FROM {global_config['baichuan_table_name']} WHERE DATA_DATE >= {startdate} AND BAICHUAN_ID in ({','.join(baichuanidlist)})"

View File

@ -0,0 +1,423 @@
# 读取配置
from lib.dataread import *
from config_shiyoujiao_lvyong_zhoudu import *
from lib.tools import SendMail, exception_logger
from models.nerulforcastmodels import ex_Model, model_losss, shiyoujiao_lvyong_export_pdf
import datetime
import torch
torch.set_float32_matmul_precision("high")
global_config.update({
# 核心参数
'logger': logger,
'dataset': dataset,
'y': y,
'is_debug': is_debug,
'is_train': is_train,
'is_fivemodels': is_fivemodels,
'is_update_report': is_update_report,
'settings': settings,
'weight_dict': weight_dict,
'baichuanidnamedict': baichuanidnamedict,
'bdwdname': bdwdname,
# 模型参数
'data_set': data_set,
'input_size': input_size,
'horizon': horizon,
'train_steps': train_steps,
'val_check_steps': val_check_steps,
'val_size': val_size,
'test_size': test_size,
'modelsindex': modelsindex,
'rote': rote,
'bdwd_items': bdwd_items,
'baichuanidnamedict': baichuanidnamedict,
# 特征工程开关
'is_del_corr': is_del_corr,
'is_del_tow_month': is_del_tow_month,
'is_eta': is_eta,
'is_update_eta': is_update_eta,
'is_fivemodels': is_fivemodels,
'is_update_predict_value': is_update_predict_value,
'early_stop_patience_steps': early_stop_patience_steps,
# 时间参数
'start_year': start_year,
'end_time': end_time or datetime.datetime.now().strftime("%Y-%m-%d"),
'freq': freq, # 保持列表结构
# 接口配置
'login_pushreport_url': login_pushreport_url,
'login_data': login_data,
'upload_url': upload_url,
'upload_warning_url': upload_warning_url,
'warning_data': warning_data,
# 查询接口
'query_data_list_item_nos_url': query_data_list_item_nos_url,
'query_data_list_item_nos_data': query_data_list_item_nos_data,
# 上传数据项
'push_data_value_list_url': push_data_value_list_url,
'push_data_value_list_data': push_data_value_list_data,
# eta 配置
'APPID': APPID,
'SECRET': SECRET,
'etadata': data,
'edbcodelist': edbcodelist,
'ClassifyId': ClassifyId,
'edbcodedataurl': edbcodedataurl,
'classifyidlisturl': classifyidlisturl,
'edbdatapushurl': edbdatapushurl,
'edbdeleteurl': edbdeleteurl,
'edbbusinessurl': edbbusinessurl,
'edbcodenamedict': edbcodenamedict,
'ClassifyId': ClassifyId,
'classifylisturl': classifylisturl,
# 数据库配置
'sqlitedb': sqlitedb,
'db_mysql': db_mysql,
'baichuan_table_name': baichuan_table_name,
})
def push_market_value():
logger.info('发送预测结果到市场信息平台')
# 读取预测数据和模型评估数据
predict_file_path = os.path.join(config.dataset, 'predict.csv')
model_eval_file_path = os.path.join(config.dataset, 'model_evaluation.csv')
try:
predictdata_df = pd.read_csv(predict_file_path)
top_models_df = pd.read_csv(model_eval_file_path)
except FileNotFoundError as e:
logger.error(f"文件未找到: {e}")
return
predictdata = predictdata_df.copy()
# 取模型前十
top_models = top_models_df['模型(Model)'].head(10).tolist()
# 去掉FDBformer
if 'FEDformer' in top_models:
top_models.remove('FEDformer')
# 计算前十模型的均值
predictdata_df['top_models_mean'] = predictdata_df[top_models].mean(axis=1)
# 打印日期和前十模型均值
print(predictdata_df[['ds', 'top_models_mean']])
# 准备要推送的数据
first_mean = predictdata_df['top_models_mean'].iloc[0]
last_mean = predictdata_df['top_models_mean'].iloc[-1]
# 保留两位小数
first_mean = round(first_mean, 2)
last_mean = round(last_mean, 2)
predictdata = [
{
"dataItemNo": global_config['bdwd_items']['cizhou'],
"dataDate": global_config['end_time'].replace('-', ''),
"dataStatus": "add",
"dataValue": first_mean
},
{
"dataItemNo": global_config['bdwd_items']['gezhou'],
"dataDate": global_config['end_time'].replace('-', ''),
"dataStatus": "add",
"dataValue": last_mean
}
]
print(predictdata)
# 推送数据到市场信息平台
try:
push_market_data(predictdata)
except Exception as e:
logger.error(f"推送数据失败: {e}")
def predict_main():
"""
主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测
参数:
signature (BinanceAPI): Binance API 实例
etadata (EtaReader): ETA 数据读取器实例
is_eta (bool): 是否从 ETA 获取数据
data_set (str): 数据集名称
dataset (str): 数据集路径
add_kdj (bool): 是否添加 KDJ 指标
is_timefurture (bool): 是否添加时间衍生特征
end_time (str): 结束时间
is_edbnamelist (bool): 是否使用 EDB 名称列表
edbnamelist (list): EDB 名称列表
y (str): 预测目标列名
sqlitedb (SQLiteDB): SQLite 数据库实例
is_corr (bool): 是否进行相关性分析
horizon (int): 预测时域
input_size (int): 输入数据大小
train_steps (int): 训练步数
val_check_steps (int): 验证检查步数
early_stop_patience_steps (int): 早停耐心步数
is_debug (bool): 是否调试模式
dataset (str): 数据集名称
is_train (bool): 是否训练模型
is_fivemodels (bool): 是否使用五个模型
val_size (float): 验证集大小
test_size (float): 测试集大小
settings (dict): 模型设置
now (str): 当前时间
etadata (EtaReader): ETA 数据读取器实例
modelsindex (list): 模型索引列表
data (str): 数据类型
is_eta (bool): 是否从 ETA 获取数据
返回:
None
"""
end_time = global_config['end_time']
# 获取数据
if is_eta:
logger.info('从eta获取数据...')
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl=global_config['classifylisturl'],
classifyidlisturl=global_config['classifyidlisturl'],
edbcodedataurl=global_config['edbcodedataurl'],
edbcodelist=global_config['edbcodelist'],
edbdatapushurl=global_config['edbdatapushurl'],
edbdeleteurl=global_config['edbdeleteurl'],
edbbusinessurl=global_config['edbbusinessurl'],
classifyId=global_config['ClassifyId'],
)
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_shiyoujiao_lvyong_data(
data_set=data_set, dataset=dataset) # 原始数据,未处理
if is_market:
logger.info('从市场信息平台获取数据...')
try:
# 如果是测试环境最高价最低价取excel文档
if server_host == '192.168.100.53':
logger.info('从excel文档获取最高价最低价')
df_zhibiaoshuju = get_high_low_data(df_zhibiaoshuju)
else:
logger.info('从市场信息平台获取数据')
df_zhibiaoshuju = get_market_data(
end_time, df_zhibiaoshuju)
except:
logger.info('最高最低价拼接失败')
if len(global_config['baichuanidnamedict']) > 0:
logger.info('从市场数据库获取百川数据...')
baichuandf = get_baichuan_data(global_config['baichuanidnamedict'])
df_zhibiaoshuju = pd.merge(
df_zhibiaoshuju, baichuandf, on='date', how='outer')
# 指标列表添加百川数据
df_baichuanliebiao = pd.DataFrame(
global_config['baichuanidnamedict'].items(), columns=['指标id', '指标名称'])
df_baichuanliebiao['指标分类'] = '石油焦对标炼厂价格'
df_baichuanliebiao['频度'] = '其他'
df_zhibiaoliebiao = pd.concat(
[df_zhibiaoliebiao, df_baichuanliebiao], axis=0)
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)
# 数据处理
df = datachuli(df_zhibiaoshuju, df_zhibiaoliebiao, y=global_config['y'], dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture,
end_time=end_time)
else:
# 读取数据
logger.info('读取本地数据:' + os.path.join(dataset, data_set))
df, df_zhibiaoliebiao = getdata(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj,
is_timefurture=is_timefurture, end_time=end_time) # 原始数据,未处理
# 更改预测列名称
df.rename(columns={y: 'y'}, inplace=True)
if is_edbnamelist:
df = df[edbnamelist]
df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False)
# 保存最新日期的y值到数据库
# 取第一行数据存储到数据库中
first_row = df[['ds', 'y']].tail(1)
# 判断y的类型是否为float
if not isinstance(first_row['y'].values[0], float):
logger.info(f'{end_time}预测目标数据为空,跳过')
return None
# 将最新真实值保存到数据库
if not sqlitedb.check_table_exists('trueandpredict'):
first_row.to_sql('trueandpredict', sqlitedb.connection, index=False)
else:
for row in first_row.itertuples(index=False):
row_dict = row._asdict()
config.logger.info(f'要保存的真实值:{row_dict}')
# 判断ds是否为字符串类型,如果不是则转换为字符串类型
if isinstance(row_dict['ds'], (pd.Timestamp, datetime.datetime)):
row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d')
elif not isinstance(row_dict['ds'], str):
try:
row_dict['ds'] = pd.to_datetime(
row_dict['ds']).strftime('%Y-%m-%d')
except:
logger.warning(f"无法解析的时间格式: {row_dict['ds']}")
# row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d')
# row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S')
check_query = sqlitedb.select_data(
'trueandpredict', where_condition=f"ds = '{row.ds}'")
if len(check_query) > 0:
set_clause = ", ".join(
[f"{key} = '{value}'" for key, value in row_dict.items()])
sqlitedb.update_data(
'trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'")
continue
sqlitedb.insert_data('trueandpredict', tuple(
row_dict.values()), columns=row_dict.keys())
# 更新accuracy表的y值
if not sqlitedb.check_table_exists('accuracy'):
pass
else:
update_y = sqlitedb.select_data(
'accuracy', where_condition="y is null")
if len(update_y) > 0:
logger.info('更新accuracy表的y值')
# 找到update_y 中ds且df中的y的行
update_y = update_y[update_y['ds'] <= end_time]
logger.info(f'要更新y的信息{update_y}')
# try:
for row in update_y.itertuples(index=False):
try:
row_dict = row._asdict()
yy = df[df['ds'] == row_dict['ds']]['y'].values[0]
LOW = df[df['ds'] == row_dict['ds']]['Brentzdj'].values[0]
HIGH = df[df['ds'] == row_dict['ds']]['Brentzgj'].values[0]
sqlitedb.update_data(
'accuracy', f"y = {yy},LOW_PRICE = {LOW},HIGH_PRICE = {HIGH}", where_condition=f"ds = '{row_dict['ds']}'")
except:
logger.info(f'更新accuracy表的y值失败{row_dict}')
# except Exception as e:
# logger.info(f'更新accuracy表的y值失败{e}')
# 判断当前日期是不是周一
is_weekday = datetime.datetime.now().weekday() == 0
if is_weekday:
logger.info('今天是周一,更新预测模型')
# 计算最近60天预测残差最低的模型名称
model_results = sqlitedb.select_data(
'trueandpredict', order_by="ds DESC", limit="60")
# 删除空值率为90%以上的列
if len(model_results) > 10:
model_results = model_results.dropna(
thresh=len(model_results)*0.1, axis=1)
# 删除空行
model_results = model_results.dropna()
modelnames = model_results.columns.to_list()[2:-2]
for col in model_results[modelnames].select_dtypes(include=['object']).columns:
model_results[col] = model_results[col].astype(np.float32)
# 计算每个预测值与真实值之间的偏差率
for model in modelnames:
model_results[f'{model}_abs_error_rate'] = abs(
model_results['y'] - model_results[model]) / model_results['y']
# 获取每行对应的最小偏差率值
min_abs_error_rate_values = model_results.apply(
lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1)
# 获取每行对应的最小偏差率值对应的列名
min_abs_error_rate_column_name = model_results.apply(
lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1)
# 将列名索引转换为列名
min_abs_error_rate_column_name = min_abs_error_rate_column_name.map(
lambda x: x.split('_')[0])
# 取出现次数最多的模型名称
most_common_model = min_abs_error_rate_column_name.value_counts().idxmax()
logger.info(f"最近60天预测残差最低的模型名称{most_common_model}")
# 保存结果到数据库
if not sqlitedb.check_table_exists('most_model'):
sqlitedb.create_table(
'most_model', columns="ds datetime, most_common_model TEXT")
sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime(
'%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
if is_corr:
df = corr_feature(df=df)
df1 = df.copy() # 备份一下后面特征筛选完之后加入ds y 列用
logger.info(f"开始训练模型...")
row, col = df.shape
now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ex_Model(df,
horizon=global_config['horizon'],
input_size=global_config['input_size'],
train_steps=global_config['train_steps'],
val_check_steps=global_config['val_check_steps'],
early_stop_patience_steps=global_config['early_stop_patience_steps'],
is_debug=global_config['is_debug'],
dataset=global_config['dataset'],
is_train=global_config['is_train'],
is_fivemodels=global_config['is_fivemodels'],
val_size=global_config['val_size'],
test_size=global_config['test_size'],
settings=global_config['settings'],
now=now,
etadata=global_config['etadata'],
modelsindex=global_config['modelsindex'],
data=data,
is_eta=global_config['is_eta'],
end_time=global_config['end_time'],
)
logger.info('模型训练完成')
logger.info('训练数据绘图ing')
model_results3 = model_losss(sqlitedb, end_time=end_time)
logger.info('训练数据绘图end')
# 模型报告
logger.info('制作报告ing')
title = f'{settings}--{end_time}-预测报告' # 报告标题
reportname = '石油焦大模型铝用渠道.pdf' # 报告文件名
# reportname = f'石油焦铝用大模型周度预测--{end_time}.pdf' # 报告文件名
# reportname = reportname.replace(':', '-') # 替换冒号
shiyoujiao_lvyong_export_pdf(dataset=dataset, num_models=5 if is_fivemodels else 22, time=end_time,
reportname=reportname, sqlitedb=sqlitedb),
logger.info('制作报告end')
logger.info('模型训练完成')
push_market_value()
# 发送邮件
# m = SendMail(
# username=username,
# passwd=passwd,
# recv=recv,
# title=title,
# content=content,
# file=max(glob.glob(os.path.join(dataset,'*.pdf')), key=os.path.getctime),
# ssl=ssl,
# )
# m.send_mail()
if __name__ == '__main__':
# global end_time
# # 遍历2024-11-25 到 2024-12-3 之间的工作日日期
# for i_time in pd.date_range('2024-12-1', '2025-2-26', freq='W'):
# end_time = i_time.strftime('%Y-%m-%d')
# predict_main()
predict_main()