石油焦铝用日度预测调试完成

This commit is contained in:
workpc 2025-03-25 10:38:29 +08:00
parent 313e9e229d
commit 5191ff7c69
12 changed files with 1138 additions and 254 deletions

View File

@ -18,10 +18,22 @@
"import numpy as np\n",
"# 变量定义\n",
"login_url = \"http://10.200.32.39/jingbo-api/api/server/login\"\n",
"search_url = \"http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryByItemNos\"\n",
"# query_data_list_item_nos_url\n",
"search_url = \"http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryByItemNos\" #jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos\n",
"upload_url = \"http://10.200.32.39/jingbo-api/api/dw/dataValue/pushDataValueList\"\n",
"\n",
"\n",
"query_data_list_item_nos_data = {\n",
" \"funcModule\": \"数据项\",\n",
" \"funcOperation\": \"查询\",\n",
" \"data\": {\n",
" \"dateStart\": \"20200101\",\n",
" \"dateEnd\": \"20241231\",\n",
" \"dataItemNoList\": [\"Brentzdj\", \"Brentzgj\"] # 数据项编码,代表 brent最低价和最高价\n",
" }\n",
"}\n",
"\n",
"\n",
"login_data = {\n",
" \"data\": {\n",
" \"account\": \"api_dev\",\n",
@ -844,7 +856,7 @@
" # headers1 = {\"Authorization\": token_push}\n",
" # res = requests.post(url=upload_url, headers=headers1, json=data1, timeout=(3, 5))\n",
" \n",
" \n",
"\n",
"\n",
"\n",
"\n",
@ -960,8 +972,6 @@
" # 保存新的xls文件\n",
" new_workbook.save(\"定性模型数据项12-11.xlsx\")\n",
"\n",
"\n",
"\n",
"\n"
]
},

View File

@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [
{

View File

@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 27,
"execution_count": null,
"metadata": {},
"outputs": [
{

320
config_shiyoujiao.py Normal file
View File

@ -0,0 +1,320 @@
import logging
import os
import logging.handlers
import datetime
from lib.tools import MySQLDB,SQLiteHandler
# eta 接口token
APPID = "XNLDvxZHHugj7wJ7"
SECRET = "iSeU4s6cKKBVbt94htVY1p0sqUMqb2xa"
# eta 接口url
sourcelisturl = 'http://10.189.2.78:8108/v1/edb/source/list'
classifylisturl = 'http://10.189.2.78:8108/v1/edb/classify/list?ClassifyType='
uniquecodedataurl = 'http://10.189.2.78:8108/v1/edb/data?UniqueCode=4991c37becba464609b409909fe4d992&StartDate=2024-02-01'
classifyidlisturl = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId='
edbcodedataurl = 'http://10.189.2.78:8108/v1/edb/data?EdbCode='
edbdatapushurl = 'http://10.189.2.78:8108/v1/edb/push'
edbdeleteurl = 'http://10.189.2.78:8108/v1/edb/business/edb/del'
edbbusinessurl = 'http://10.189.2.78:8108/v1/edb/business/data/del'
edbcodelist = ['ID01385938','lmcads03 lme comdty',
'GC1 COMB Comdty',
'C2404171822',
'dxy curncy',
'S5443199 ',
'S5479800',
'S5443108',
'H7358586',
'LC3FM1 INDEX',
'CNY REGN Curncy',
's0105897',
'M0067419',
'M0066351',
'S0266372',
'S0266438',
'S0266506',
'ID01384463']
# 临时写死用指定的列,与上面的edbcode对应后面更改
edbnamelist = [
'ds','y',
'LME铜价',
'黄金连1合约',
'Brent-WTI',
'美元指数',
'甲醇鲁南价格',
'甲醇太仓港口价格',
'山东丙烯主流价',
'丙烷(山东)',
'FEI丙烷 M1',
'在岸人民币汇率',
'南华工业品指数',
'PVC期货主力',
'PE期货收盘价',
'PP连续-1月',
'PP连续-5月',
'PP连续-9月',
'PP拉丝L5E89出厂价华北第二区域内蒙古久泰新材料'
]
edbcodenamedict = {
'ID01385938':'PP拉丝1102K市场价青州国家能源宁煤',
'ID01384463':'PP拉丝L5E89出厂价华北第二区域内蒙古久泰新材料',
'lmcads03 lme comdty':'LME铜价',
'GC1 COMB Comdty':'黄金连1合约',
'C2404171822':'Brent-WTI',
'dxy curncy':'美元指数',
'S5443199 ':'甲醇鲁南价格',
'S5479800':'甲醇太仓港口价格',
'S5443108':'山东丙烯主流价',
'H7358586':'丙烷(山东)',
'LC3FM1 INDEX':'FEI丙烷 M1',
'CNY REGN Curncy':'在岸人民币汇率',
's0105897':'南华工业品指数',
'M0067419':'PVC期货主力',
'M0066351':'PE期货收盘价',
'S0266372':'PP连续-1月',
'S0266438':'PP连续-5月',
'S0266506':'PP连续-9月',
}
# eta自有数据指标编码
modelsindex = {
'NHITS': 'SELF0000077',
'Informer':'SELF0000078',
'LSTM':'SELF0000079',
'iTransformer':'SELF0000080',
'TSMixer':'SELF0000081',
'TSMixerx':'SELF0000082',
'PatchTST':'SELF0000083',
'RNN':'SELF0000084',
'GRU':'SELF0000085',
'TCN':'SELF0000086',
'BiTCN':'SELF0000087',
'DilatedRNN':'SELF0000088',
'MLP':'SELF0000089',
'DLinear':'SELF0000090',
'NLinear':'SELF0000091',
'TFT':'SELF0000092',
'FEDformer':'SELF0000093',
'StemGNN':'SELF0000094',
'MLPMultivariate':'SELF0000095',
'TiDE':'SELF0000096',
'DeepNPTS':'SELF0000097'
}
# eta 上传预测结果的请求体,后面发起请求的时候更改 model datalist 数据
data = {
"IndexCode": "",
"IndexName": "价格预测模型",
"Unit": "",
"Frequency": "日度",
"SourceName": f"价格预测",
"Remark": 'ddd',
"DataList": [
{
"Date": "2024-05-02",
"Value": 333444
}
]
}
# eta 分类
# level3才可以获取到数据所以需要人工把能源化工下所有的level3级都找到
# url = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId=1214'
#ParentId ":1160, 能源化工
# ClassifyId ":1214,原油 3912 石油焦
#ParentId ":1214,",就是原油下所有的数据。
ClassifyId = 3707
############################################################################################################### 变量定义--测试环境
server_host = '192.168.100.53'
login_pushreport_url = f"http://{server_host}:8080/jingbo-dev/api/server/login"
upload_url = f"http://{server_host}:8080/jingbo-dev/api/analysis/reportInfo/researchUploadReportSave"
upload_warning_url = f"http://{server_host}:8080/jingbo-dev/api/basicBuiness/crudeOilWarning/save"
query_data_list_item_nos_url = f"http://{server_host}:8080/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos"
login_data = {
"data": {
"account": "api_test",
# "password": "MmVmNzNlOWI0MmY0ZDdjZGUwNzE3ZjFiMDJiZDZjZWU=", # Shihua@123456
"password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", # 123456
"tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
"terminal": "API"
},
"funcModule": "API",
"funcOperation": "获取token"
}
upload_data = {
"funcModule":'研究报告信息',
"funcOperation":'上传聚烯烃PP价格预测报告',
"data":{
"groupNo":'000128', # 用户组编号
"ownerAccount":'arui', #报告所属用户账号
"reportType":'OIL_PRICE_FORECAST', # 报告类型固定为OIL_PRICE_FORECAST
"fileName": '2000-40-5-50--100-原油指标数据.xlsx-Brent活跃合约--2024-09-06-15-01-29-预测报告.pdf', #文件名称
"fileBase64": '' ,#文件内容base64
"categoryNo":'yyjgycbg', # 研究报告分类编码
"smartBusinessClassCode":'JXTJGYCBG', #分析报告分类编码
"reportEmployeeCode":"E40116", # 报告人
"reportDeptCode" :"D0044" ,# 报告部门
"productGroupCode":"RAW_MATERIAL" # 商品分类
}
}
warning_data = {
"groupNo":'000128', # 用户组编号
"funcModule":'原油特征停更预警',
"funcOperation":'原油特征停更预警',
"data":{
'WARNING_TYPE_NAME':'特征数据停更预警',
'WARNING_CONTENT':'',
'WARNING_DATE':''
}
}
query_data_list_item_nos_data = {
"funcModule": "数据项",
"funcOperation": "查询",
"data": {
"dateStart":"20200101",
"dateEnd":"20241231",
"dataItemNoList":["Brentzdj","Brentzgj"] # 数据项编码,代表 brent最低价和最高价
}
}
# 北京环境数据库
host = '192.168.101.27'
port = 3306
dbusername ='root'
password = '123456'
dbname = 'jingbo_test'
table_name = 'v_tbl_crude_oil_warning'
### 开关
is_train = False # 是否训练
is_debug = True # 是否调试
is_eta = True # 是否使用eta接口
is_market = False # 是否通过市场信息平台获取特征 ,在is_eta 为true 的情况下生效
is_timefurture = True # 是否使用时间特征
is_fivemodels = False # 是否使用之前保存的最佳的5个模型
is_edbcode = False # 特征使用edbcoding列表中的
is_edbnamelist = False # 自定义特征对应上面的edbnamelist
is_update_eta = False # 预测结果上传到eta
is_update_report = True # 是否上传报告
is_update_warning_data = False # 是否上传预警数据
is_del_corr = 0.6 # 是否删除相关性高的特征,取值为 0-1 0 为不删除0.6 表示删除相关性小于0.6的特征
is_del_tow_month = True # 是否删除两个月不更新的特征
# 连接到数据库
db_mysql = MySQLDB(host=host, user=dbusername, password=password, database=dbname)
db_mysql.connect()
print("数据库连接成功",host,dbname,dbusername)
# 数据截取日期
start_year = 2020 # 数据开始年份
end_time = '' # 数据截取日期
freq = 'B' # 时间频率,"D": 天 "W": 周"M": 月"Q": 季度"A": 年 "H": 小时 "T": 分钟 "S": 秒 "B": 工作日
delweekenday = True if freq == 'B' else False # 是否删除周末数据
is_corr = False # 特征是否参与滞后领先提升相关系数
add_kdj = False # 是否添加kdj指标
if add_kdj and is_edbnamelist:
edbnamelist = edbnamelist+['K','D','J']
### 模型参数
y = 'AVG-金能大唐久泰青州'
avg_cols = [
'PP拉丝1102K出厂价青州国家能源宁煤',
'PP拉丝L5E89出厂价华北第二区域内蒙古久泰新材料',
'PP拉丝L5E89出厂价河北、鲁北大唐内蒙多伦',
'PP拉丝HP550J市场价青岛金能化学'
]
offsite = 80
offsite_col = ['PP拉丝HP550J市场价青岛金能化学']
horizon =5 # 预测的步长
input_size = 40 # 输入序列长度
train_steps = 50 if is_debug else 1000 # 训练步数,用来限定epoch次数
val_check_steps = 30 # 评估频率
early_stop_patience_steps = 5 # 早停的耐心步数
# --- 交叉验证用的参数
test_size = 200 # 测试集大小定义100后面使用的时候重新赋值
val_size = test_size # 验证集大小,同测试集大小
### 特征筛选用到的参数
k = 100 # 特征筛选数量如果是0或者值比特征数量大代表全部特征
corr_threshold = 0.6 # 相关性大于0.6的特征
rote = 0.06 # 绘图上下界阈值
### 计算准确率
weight_dict = [0.4,0.15,0.1,0.1,0.25] # 权重
### 文件
data_set = '石油焦指标数据.xlsx' # 数据集文件
dataset = 'shiyoujiaodataset' # 数据集文件夹
# 数据库名称
db_name = os.path.join(dataset,'jbsh_juxiting.db')
sqlitedb = SQLiteHandler(db_name)
sqlitedb.connect()
settings = f'{input_size}-{horizon}-{train_steps}--{k}-{data_set}-{y}'
# 获取日期时间
# now = datetime.datetime.now().strftime('%Y%m%d%H%M%S') # 获取当前日期时间
now = datetime.datetime.now().strftime('%Y-%m-%d') # 获取当前日期时间
reportname = f'PP大模型预测报告--{end_time}.pdf' # 报告文件名
reportname = reportname.replace(':', '-') # 替换冒号
if end_time == '':
end_time = now
### 邮件配置
username='1321340118@qq.com'
passwd='wgczgyhtyyyyjghi'
# recv=['liurui_test@163.com','52585119@qq.com']
recv=['liurui_test@163.com']
# recv=['liurui_test@163.com']
title='reportname'
content=y+'预测报告请看附件'
file=os.path.join(dataset,'reportname')
# file=os.path.join(dataset,'14-7-50--100-原油指标数据.xlsx-Brent连1合约价格--20240731175936-预测报告.pdf')
ssl=True
### 日志配置
# 创建日志目录(如果不存在)
log_dir = 'logs'
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 配置日志记录器
logger = logging.getLogger('my_logger')
logger.setLevel(logging.INFO)
# 配置文件处理器,将日志记录到文件
file_handler = logging.handlers.RotatingFileHandler(os.path.join(log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# 配置控制台处理器,将日志打印到控制台
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s'))
# 将处理器添加到日志记录器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# logger.info('当前配置:'+settings)

View File

@ -143,7 +143,7 @@ modelsindex = {
}
# 百川数据指标编码
baicangidnamedict = {
baichuanidnamedict = {
'1588348470396480000': '石油焦滨州-友泰',
'1588348470396480000.00': '石油焦东营-海科瑞林',
'1588348470396480000.00': '石油焦东营-华联2',
@ -160,6 +160,8 @@ baicangidnamedict = {
}
# baichuanidnamedict = {'1588348470396475286': 'test1', '1666': 'test2'} # 北京环境测试用
# eta 上传预测结果的请求体,后面发起请求的时候更改 model datalist 数据
data = {
"IndexCode": "",
@ -272,14 +274,14 @@ push_data_value_list_data = {
}
# 八大维度数据项编码
bdwd_items = {
# 'ciri': 'yyycbdwdcr',
# 'benzhou': 'yyycbdwdbz',
# 'cizhou': 'yyycbdwdcz',
# 'gezhou': 'yyycbdwdgz',
# 'ciyue': 'yyycbdwdcy',
# 'cieryue': 'yyycbdwdcey',
# 'cisanyue': 'yyycbdwdcsy',
# 'cisiyue': 'yyycbdwdcsiy',
'ciri': 'syjlyycbdwdcr',
'benzhou': 'syjlyycbdwdbz',
'cizhou': 'syjlyycbdwdcz',
'gezhou': 'syjlyycbdwdgz',
'ciyue': 'syjlyycbdwdcy',
'cieryue': 'syjlyycbdwdcey',
'cisanyue': 'syjlyycbdwdcsy',
'cisiyue': 'syjlyycbdwdcsiy',
}
# 北京环境数据库
@ -326,7 +328,7 @@ if add_kdj and is_edbnamelist:
edbnamelist = edbnamelist+['K', 'D', 'J']
# 模型参数
y = 'B46cc7d0a90155b5bfd'
y = '煅烧焦山东高硫(高端S < 3.5,普货)(元/吨)'
avg_cols = [
]

View File

@ -57,6 +57,7 @@ global_config = {
'y': None, # 目标变量列名
'is_fivemodels': None,
'weight_dict': None,
'baicangidnamedict': None, # 百川id名称映射
# 模型参数
'data_set': None, # 数据集名称
@ -120,6 +121,8 @@ global_config = {
# 数据库配置
'sqlitedb': None,
'db_mysql': None,
'baichuan_table_name': None,
}
# 定义函数
@ -1199,6 +1202,8 @@ class Config:
# 数据库配置
@property
def sqlitedb(self): return global_config['sqlitedb']
@property
def db_mysql(self): return global_config['db_mysql']
config = Config()
@ -2213,3 +2218,38 @@ def addtimecharacteristics(df, dataset):
df.drop(columns=['quarter_start', 'quarter'], inplace=True)
df.to_csv(os.path.join(dataset, '指标数据添加时间特征.csv'), index=False)
return df
# 从数据库获取百川数据接收一个百川id列表返回df格式的数据
def get_baichuan_data(baichuanidnamedict):
baichuanidlist = list(baichuanidnamedict.keys())
# 连接数据库
db = config.db_mysql
db.connect()
# 执行SQL查询 select BAICHUAN_ID,DATA_DATE,DATA_VALUE from V_TBL_BAICHUAN_YINGFU_VALUE where BAICHUAN_ID in ('1588348470396475286','1666');
sql = f"SELECT BAICHUAN_ID,DATA_DATE,DATA_VALUE FROM {global_config['baichuan_table_name']} WHERE BAICHUAN_ID in ({','.join(baichuanidlist)})"
# 获取查询结果
results = db.execute_query(sql)
df = pd.DataFrame(results, columns=[
'BAICHUAN_ID', 'DATA_DATE', 'DATA_VALUE'])
# 按BAICHUAN_ID 进行分组然后按DATA_DATE合并
df1 = pd.DataFrame(columns=['DATA_DATE'])
for baichuan_id, group in df.groupby('BAICHUAN_ID'):
# group 删除BAICHUAN_ID列
group.drop(columns=['BAICHUAN_ID'], inplace=True)
# group DATA_value 转换为float类型,保留两位小数
group['DATA_VALUE'] = group['DATA_VALUE'].astype(float).round(2)
# group 更改列名
group.rename(
columns={'DATA_VALUE': baichuanidnamedict[baichuan_id]}, inplace=True)
# 按DATA_DATE合并
df1 = pd.merge(
df1, group[['DATA_DATE', baichuanidnamedict[baichuan_id]]], on='DATA_DATE', how='outer')
# 把DATA_DATE 列转换成日期格式
df1['date'] = pd.to_datetime(
df1['DATA_DATE']).dt.strftime('%Y-%m-%d')
df1.drop(columns=['DATA_DATE'], inplace=True)
return df1

301
main_shiyoujiao.py Normal file
View File

@ -0,0 +1,301 @@
# 读取配置
from lib.dataread import *
from lib.tools import SendMail,exception_logger
from models.nerulforcastmodels import ex_Model_Juxiting,model_losss,model_losss_juxiting,brent_export_pdf,tansuanli_export_pdf,pp_export_pdf,model_losss_juxiting
import glob
import torch
torch.set_float32_matmul_precision("high")
def predict_main():
"""
主预测函数用于从 ETA 获取数据处理数据训练模型并进行预测
参数:
signature (BinanceAPI): Binance API 实例
etadata (EtaReader): ETA 数据读取器实例
is_eta (bool): 是否从 ETA 获取数据
data_set (str): 数据集名称
dataset (str): 数据集路径
add_kdj (bool): 是否添加 KDJ 指标
is_timefurture (bool): 是否添加时间衍生特征
end_time (str): 结束时间
is_edbnamelist (bool): 是否使用 EDB 名称列表
edbnamelist (list): EDB 名称列表
y (str): 预测目标列名
sqlitedb (SQLiteDB): SQLite 数据库实例
is_corr (bool): 是否进行相关性分析
horizon (int): 预测时域
input_size (int): 输入数据大小
train_steps (int): 训练步数
val_check_steps (int): 验证检查步数
early_stop_patience_steps (int): 早停耐心步数
is_debug (bool): 是否调试模式
dataset (str): 数据集名称
is_train (bool): 是否训练模型
is_fivemodels (bool): 是否使用五个模型
val_size (float): 验证集大小
test_size (float): 测试集大小
settings (dict): 模型设置
now (str): 当前时间
etadata (EtaReader): ETA 数据读取器实例
modelsindex (list): 模型索引列表
data (str): 数据类型
is_eta (bool): 是否从 ETA 获取数据
返回:
None
"""
global end_time
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl=classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl
)
# 获取数据
if is_eta:
logger.info('从eta获取数据...')
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl=classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl,
)
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_shiyoujiao_data(data_set=data_set, dataset=dataset) # 原始数据,未处理
if is_market:
logger.info('从市场信息平台获取数据...')
try:
# 如果是测试环境最高价最低价取excel文档
if server_host == '192.168.100.53':
logger.info('从excel文档获取最高价最低价')
df_zhibiaoshuju = get_high_low_data(df_zhibiaoshuju)
else:
logger.info('从市场信息平台获取数据')
df_zhibiaoshuju = get_market_data(end_time,df_zhibiaoshuju)
except :
logger.info('最高最低价拼接失败')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)
# 数据处理
df = datachuli_juxiting(df_zhibiaoshuju, df_zhibiaoliebiao, y=y, dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture,
end_time=end_time)
else:
# 读取数据
logger.info('读取本地数据:' + os.path.join(dataset, data_set))
df,df_zhibiaoliebiao = getdata_juxiting(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj,
is_timefurture=is_timefurture, end_time=end_time) # 原始数据,未处理
# 更改预测列名称
df.rename(columns={y: 'y'}, inplace=True)
if is_edbnamelist:
df = df[edbnamelist]
df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False)
# 保存最新日期的y值到数据库
# 取第一行数据存储到数据库中
first_row = df[['ds', 'y']].tail(1)
# 判断y的类型是否为float
if not isinstance(first_row['y'].values[0], float):
logger.info(f'{end_time}预测目标数据为空,跳过')
return None
# 将最新真实值保存到数据库
if not sqlitedb.check_table_exists('trueandpredict'):
first_row.to_sql('trueandpredict', sqlitedb.connection, index=False)
else:
for row in first_row.itertuples(index=False):
row_dict = row._asdict()
row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S')
check_query = sqlitedb.select_data('trueandpredict', where_condition=f"ds = '{row.ds}'")
if len(check_query) > 0:
set_clause = ", ".join([f"{key} = '{value}'" for key, value in row_dict.items()])
sqlitedb.update_data('trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'")
continue
sqlitedb.insert_data('trueandpredict', tuple(row_dict.values()), columns=row_dict.keys())
# 更新accuracy表的y值
if not sqlitedb.check_table_exists('accuracy'):
pass
else:
update_y = sqlitedb.select_data('accuracy',where_condition="y is null")
if len(update_y) > 0:
logger.info('更新accuracy表的y值')
# 找到update_y 中ds且df中的y的行
update_y = update_y[update_y['ds']<=end_time]
logger.info(f'要更新y的信息{update_y}')
# try:
for row in update_y.itertuples(index=False):
try:
row_dict = row._asdict()
yy = df[df['ds']==row_dict['ds']]['y'].values[0]
LOW = df[df['ds']==row_dict['ds']]['Brentzdj'].values[0]
HIGH = df[df['ds']==row_dict['ds']]['Brentzgj'].values[0]
sqlitedb.update_data('accuracy', f"y = {yy},LOW_PRICE = {LOW},HIGH_PRICE = {HIGH}", where_condition=f"ds = '{row_dict['ds']}'")
except:
logger.info(f'更新accuracy表的y值失败{row_dict}')
# except Exception as e:
# logger.info(f'更新accuracy表的y值失败{e}')
import datetime
# 判断当前日期是不是周一
is_weekday = datetime.datetime.now().weekday() == 0
if is_weekday:
logger.info('今天是周一,更新预测模型')
# 计算最近60天预测残差最低的模型名称
model_results = sqlitedb.select_data('trueandpredict', order_by="ds DESC", limit="60")
# 删除空值率为90%以上的列
if len(model_results) > 10:
model_results = model_results.dropna(thresh=len(model_results)*0.1,axis=1)
# 删除空行
model_results = model_results.dropna()
modelnames = model_results.columns.to_list()[2:-1]
for col in model_results[modelnames].select_dtypes(include=['object']).columns:
model_results[col] = model_results[col].astype(np.float32)
# 计算每个预测值与真实值之间的偏差率
for model in modelnames:
model_results[f'{model}_abs_error_rate'] = abs(model_results['y'] - model_results[model]) / model_results['y']
# 获取每行对应的最小偏差率值
min_abs_error_rate_values = model_results.apply(lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1)
# 获取每行对应的最小偏差率值对应的列名
min_abs_error_rate_column_name = model_results.apply(lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1)
# 将列名索引转换为列名
min_abs_error_rate_column_name = min_abs_error_rate_column_name.map(lambda x: x.split('_')[0])
# 取出现次数最多的模型名称
most_common_model = min_abs_error_rate_column_name.value_counts().idxmax()
logger.info(f"最近60天预测残差最低的模型名称{most_common_model}")
# 保存结果到数据库
if not sqlitedb.check_table_exists('most_model'):
sqlitedb.create_table('most_model', columns="ds datetime, most_common_model TEXT")
sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
try:
if is_weekday:
# if True:
logger.info('今天是周一,发送特征预警')
# 上传预警信息到数据库
warning_data_df = df_zhibiaoliebiao.copy()
warning_data_df = warning_data_df[warning_data_df['停更周期']> 3 ][['指标名称', '指标id', '频度','更新周期','指标来源','最后更新时间','停更周期']]
# 重命名列名
warning_data_df = warning_data_df.rename(columns={'指标名称': 'INDICATOR_NAME', '指标id': 'INDICATOR_ID', '频度': 'FREQUENCY', '更新周期': 'UPDATE_FREQUENCY', '指标来源': 'DATA_SOURCE', '最后更新时间': 'LAST_UPDATE_DATE', '停更周期': 'UPDATE_SUSPENSION_CYCLE'})
from sqlalchemy import create_engine
import urllib
global password
if '@' in password:
password = urllib.parse.quote_plus(password)
engine = create_engine(f'mysql+pymysql://{dbusername}:{password}@{host}:{port}/{dbname}')
warning_data_df['WARNING_DATE'] = datetime.date.today().strftime("%Y-%m-%d %H:%M:%S")
warning_data_df['TENANT_CODE'] = 'T0004'
# 插入数据之前查询表数据然后新增id列
existing_data = pd.read_sql(f"SELECT * FROM {table_name}", engine)
if not existing_data.empty:
max_id = existing_data['ID'].astype(int).max()
warning_data_df['ID'] = range(max_id + 1, max_id + 1 + len(warning_data_df))
else:
warning_data_df['ID'] = range(1, 1 + len(warning_data_df))
warning_data_df.to_sql(table_name, con=engine, if_exists='append', index=False)
if is_update_warning_data:
upload_warning_info(len(warning_data_df))
except:
logger.info('上传预警信息到数据库失败')
if is_corr:
df = corr_feature(df=df)
df1 = df.copy() # 备份一下后面特征筛选完之后加入ds y 列用
logger.info(f"开始训练模型...")
row, col = df.shape
now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ex_Model_Juxiting(df,
horizon=horizon,
input_size=input_size,
train_steps=train_steps,
val_check_steps=val_check_steps,
early_stop_patience_steps=early_stop_patience_steps,
is_debug=is_debug,
dataset=dataset,
is_train=is_train,
is_fivemodels=is_fivemodels,
val_size=val_size,
test_size=test_size,
settings=settings,
now=now,
etadata=etadata,
modelsindex=modelsindex,
data=data,
is_eta=is_eta,
end_time=end_time,
)
logger.info('模型训练完成')
logger.info('训练数据绘图ing')
model_results3 = model_losss_juxiting(sqlitedb)
logger.info('训练数据绘图end')
# 模型报告
logger.info('制作报告ing')
title = f'{settings}--{end_time}-预测报告' # 报告标题
reportname = f'PP大模型预测报告--{end_time}.pdf' # 报告文件名
reportname = reportname.replace(':', '-') # 替换冒号
pp_export_pdf(dataset=dataset,num_models = 5 if is_fivemodels else 22,time=end_time,
reportname=reportname,sqlitedb=sqlitedb),
logger.info('制作报告end')
logger.info('模型训练完成')
# # LSTM 单变量模型
# ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset)
# # lstm 多变量模型
# ex_Lstm_M(df,n_days=input_size,out_days=horizon,is_debug=is_debug,datasetpath=dataset)
# # GRU 模型
# # ex_GRU(df)
# 发送邮件
m = SendMail(
username=username,
passwd=passwd,
recv=recv,
title=title,
content=content,
file=max(glob.glob(os.path.join(dataset,'*.pdf')), key=os.path.getctime),
ssl=ssl,
)
# m.send_mail()
if __name__ == '__main__':
# global end_time
# is_on = True
# # 遍历2024-11-25 到 2024-12-3 之间的工作日日期
# for i_time in pd.date_range('2025-1-20', '2025-2-6', freq='B'):
# end_time = i_time.strftime('%Y-%m-%d')
# try:
# predict_main()
# except:
# pass
predict_main()

View File

@ -18,6 +18,7 @@ global_config.update({
'is_fivemodels': is_fivemodels,
'settings': settings,
'weight_dict': weight_dict,
'baichuanidnamedict': baichuanidnamedict,
# 模型参数
@ -72,11 +73,14 @@ global_config.update({
'edbdatapushurl': edbdatapushurl,
'edbdeleteurl': edbdeleteurl,
'edbbusinessurl': edbbusinessurl,
'edbcodenamedict': edbcodenamedict,
'ClassifyId': ClassifyId,
'classifylisturl': classifylisturl,
# 数据库配置
'sqlitedb': sqlitedb,
'db_mysql': db_mysql,
'baichuan_table_name': baichuan_table_name,
})
@ -173,6 +177,7 @@ def predict_main():
返回:
None
"""
end_time = global_config['end_time']
# 获取数据
if is_eta:
@ -206,6 +211,19 @@ def predict_main():
except:
logger.info('最高最低价拼接失败')
if len(global_config['baichuanidnamedict']) > 0:
logger.info('从市场数据库获取百川数据...')
baichuandf = get_baichuan_data(global_config['baichuanidnamedict'])
df_zhibiaoshuju = pd.merge(
df_zhibiaoshuju, baichuandf, on='date', how='outer')
# 指标列表添加百川数据
df_baichuanliebiao = pd.DataFrame(
global_config['baichuanidnamedict'].items(), columns=['指标id', '指标名称'])
df_baichuanliebiao['指标分类'] = '百川'
df_baichuanliebiao['频度'] = '其他'
df_zhibiaoliebiao = pd.concat(
[df_zhibiaoliebiao, df_baichuanliebiao], axis=0)
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)

View File

@ -11,9 +11,11 @@ import logging.handlers
import os
import re
import requests
import json
class EtaReader():
def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl):
def __init__(self, signature, classifylisturl, classifyidlisturl, edbcodedataurl, edbcodelist, edbdatapushurl, edbdeleteurl, edbbusinessurl):
'''
初始化 EtaReader 类的实例
@ -39,17 +41,16 @@ class EtaReader():
self.edbdeleteurl = edbdeleteurl
self.edbbusinessurl = edbbusinessurl
def filter_yuanyou_data(self,ClassifyName,data):
def filter_yuanyou_data(self, ClassifyName, data):
'''
指标名称保留规则
'''
# 包含 关键词 去除, 返回flase
if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价',
'同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克',
'四周均值','名占比','残差','DMA',
'连7-连9','4周平均','4周均值','滚动相关性','日本']):
if any(keyword in data for keyword in ['运费', '检修', '波动率', '地缘政治', '股价',
'同比', '环比', '环差', '裂差', '4WMA', '变频', '道琼斯', '标普500', '纳斯达克',
'四周均值', '名占比', '残差', 'DMA',
'连7-连9', '4周平均', '4周均值', '滚动相关性', '日本']):
return False
# 检查需要的特征
@ -59,7 +60,7 @@ class EtaReader():
# 保留 库存中特殊关键词
if ClassifyName == '库存':
if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]):
if any(keyword in data for keyword in ['原油', '美国', '全球', '中国', '富查伊拉', 'ARA']):
return True
else:
pass
@ -77,8 +78,8 @@ class EtaReader():
# 去掉 航班中不是中国、美国 的数据
if ClassifyName == '需求':
if '航班' in data :
if '中国' in data or '美国' in data :
if '航班' in data:
if '中国' in data or '美国' in data:
return True
else:
return False
@ -95,7 +96,7 @@ class EtaReader():
c = int(data.split('c1-c')[1])
except:
return False
if c > 9 :
if c > 9:
return False
else:
pass
@ -104,24 +105,24 @@ class EtaReader():
pass
# 判断 同质性数据, 字符串开头
strstartdict = {'ICE Brent c':"ICE Brent c14",
'NYMWX WTI c':"NYMWX WTI c5",
'INE SC c':"INE SC c1",
'EFS c':"EFS c",
'Dubai Swap c':"Dubai Swap c1",
'Oman Swap c':"Oman Swap c1",
'DME Oman c':"DME Oman c1",
'Murban Futures c':"Murban Futures c1",
'Dubai连合约价格':'Dubai连1合约价格',
'美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格',
'Brent连合约价格':'Brent连1合约价格',
'WTI连合约价格':'WTI连1合约价格',
'布伦特连合约价格':'Brent连1合约价格',
'Brent 连合约价格':'Brent连1合约价格',
'Dubai连合约价格':'Dubai连1合约价格',
'Brent连':'Brent连1合约价格',
'brent连':'Brent连1合约价格',
}
strstartdict = {'ICE Brent c': "ICE Brent c14",
'NYMWX WTI c': "NYMWX WTI c5",
'INE SC c': "INE SC c1",
'EFS c': "EFS c",
'Dubai Swap c': "Dubai Swap c1",
'Oman Swap c': "Oman Swap c1",
'DME Oman c': "DME Oman c1",
'Murban Futures c': "Murban Futures c1",
'Dubai连合约价格': 'Dubai连1合约价格',
'美国RBOB期货月份合约价格': '美国RBOB期货2309月份合约价格',
'Brent连合约价格': 'Brent连1合约价格',
'WTI连合约价格': 'WTI连1合约价格',
'布伦特连合约价格': 'Brent连1合约价格',
'Brent 连合约价格': 'Brent连1合约价格',
'Dubai连合约价格': 'Dubai连1合约价格',
'Brent连': 'Brent连1合约价格',
'brent连': 'Brent连1合约价格',
}
# 判断名称字符串开头是否在 strstartdict.keys中
match = re.match(r'([a-zA-Z\s]+)(\d+)', data)
if match:
@ -135,11 +136,11 @@ class EtaReader():
# data = 'Brent 连7合约价格'
# 判断名称字符串去掉数字后是否在 strstartdict.keys中
match = re.findall(r'\D+', data)
if match :
if match:
if len(match) == 2:
part1 = match[0]
part2 = match[1]
if part1+part2 in [i for i in strstartdict.keys()]:
if part1+part2 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1+part2]:
return True
else:
@ -150,7 +151,7 @@ class EtaReader():
match = re.findall(r'\D+', data)
part1 = match[0]
if part1 in [i for i in strstartdict.keys()]:
if part1 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1]:
return True
else:
@ -166,7 +167,7 @@ class EtaReader():
return True
def filter_pp_data(self,ClassifyName,data):
def filter_pp_data(self, ClassifyName, data):
'''
指标名称保留规则
'''
@ -181,8 +182,6 @@ class EtaReader():
if any(keyword in data for keyword in ['拉丝']):
return True
# 检查需要的特征
# 去掉 期货市场 分类下的数据
if ClassifyName == '期货市场':
@ -214,18 +213,16 @@ class EtaReader():
else:
pass
# 保留 需求 下所有指标
if ClassifyName == '需求':
return True
else:
pass
return True
# 通过edbcode 获取指标数据
def edbcodegetdata(self,df,EdbCode,EdbName):
def edbcodegetdata(self, df, EdbCode, EdbName):
# 根据指标id获取指标数据
url = self.edbcodedataurl+str(EdbCode)
# 发送GET请求
@ -236,7 +233,8 @@ class EtaReader():
data = response.json() # 假设接口返回的是JSON数据
all_data_items = data.get('Data')
# 列表转换为DataFrame
df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime'])
df3 = pd.DataFrame(all_data_items, columns=[
'DataTime', 'Value', 'UpdateTime'])
# df3 = pd.read_json(all_data_items, orient='records')
# 去掉UpdateTime 列
@ -244,7 +242,8 @@ class EtaReader():
# df3.set_index('DataTime')
df3.rename(columns={'Value': EdbName}, inplace=True)
# 将数据存储df1
df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y'))
df = pd.merge(df, df3, how='outer',
on='DataTime', suffixes=('', '_y'))
# 按时间排序
df = df.sort_values(by='DataTime', ascending=True)
return df
@ -255,7 +254,7 @@ class EtaReader():
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def get_eta_api_yuanyou_data(self,data_set,dataset=''):
def get_eta_api_yuanyou_data(self, data_set, dataset=''):
'''
从ETA API获取原油数据
@ -271,7 +270,8 @@ class EtaReader():
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
@ -285,16 +285,16 @@ class EtaReader():
'''
# 构建新的DataFrame df df1
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度','指标来源','来源id','最后更新时间','更新周期','预警日期','停更周期'])
df = pd.DataFrame(columns=[
'指标分类', '指标名称', '指标id', '频度', '指标来源', '来源id', '最后更新时间', '更新周期', '预警日期', '停更周期'])
df1 = pd.DataFrame(columns=['DataTime'])
# 外网环境无法访问,请确认是否为内网环境
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")
# 检查响应状态码
if response.status_code == 200:
@ -304,19 +304,20 @@ class EtaReader():
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# 定义你想要保留的固定值
fixed_value = 1214
fixed_value = 1193
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
filtered_data = [item for item in data.get(
'Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
# 然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
n = 0
for item in filtered_data:
n+= 1
n += 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
ClassifyId = item["ClassifyId"] # 分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] # 分类名称要保存到df的指标分类列
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
@ -327,12 +328,13 @@ class EtaReader():
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
# 指标名称要保存到df2的指标名称列,df的指标名称列
EdbName = i.get('EdbName')
Frequency = i.get('Frequency') # 频度要保存到df的频度列
SourceName = i.get('SourceName') # 来源名称要保存到df的频度列
Source = i.get('Source') # 来源ID要保存到df的频度列
# 频度不是 日 或者 周的 跳过
if Frequency not in ['日度','周度','','']:
if Frequency not in ['日度', '周度', '', '']:
continue
# 只保留手工数据中,名称带有 海运出口 海运进口
@ -343,44 +345,53 @@ class EtaReader():
if Source == 2:
continue
# 判断名称是否需要保存
isSave = self.filter_yuanyou_data(ClassifyName,EdbName)
isSave = self.filter_yuanyou_data(
ClassifyName, EdbName)
if isSave:
# 保存到df
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
# 取df1所有行最后一列
edbname_df = df1[['DataTime',f'{EdbName}']]
edbname_df = df1[['DataTime', f'{EdbName}']]
edbname_df = edbname_df.dropna()
if len(edbname_df) == 0:
logger.info(f'指标名称:{EdbName} 没有数据')
continue
try:
time_sequence = edbname_df['DataTime'].values.tolist()[-10:]
time_sequence = edbname_df['DataTime'].values.tolist(
)[-10:]
except IndexError:
time_sequence = edbname_df['DataTime'].values.tolist()
time_sequence = edbname_df['DataTime'].values.tolist(
)
# 使用Counter来统计每个星期几的出现次数
from collections import Counter
weekday_counter = Counter(datetime.datetime.strptime(time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
weekday_counter = Counter(datetime.datetime.strptime(
time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
# 打印出现次数最多的星期几
try:
most_common_weekday = weekday_counter.most_common(1)[0][0]
most_common_weekday = weekday_counter.most_common(1)[
0][0]
# 计算两周后的日期
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
except IndexError:
most_common_weekday = '其他'
stop_update_period = 0
if '' in Frequency:
most_common_weekday = '每天'
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency,'指标来源':SourceName,'来源id':Source,'最后更新时间':edbname_df['DataTime'].values[-1],'更新周期':most_common_weekday,'预警日期':warning_date,'停更周期':stop_update_period},index=[0])
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency, '指标来源': SourceName, '来源id': Source,
'最后更新时间': edbname_df['DataTime'].values[-1], '更新周期': most_common_weekday, '预警日期': warning_date, '停更周期': stop_update_period}, index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
@ -388,7 +399,8 @@ class EtaReader():
logger.info(f'跳过指标 {EdbName}')
# 找到列表中不在指标列中的指标id保存成新的list
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
new_list = [
item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# 遍历new_list获取指标数据保存到df1
for item in new_list:
@ -399,33 +411,196 @@ class EtaReader():
except:
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他','指标来源':'其他','来源id':'其他'},index=[0])])
df1 = self.edbcodegetdata(df1, item, itemname)
df = pd.concat([df, pd.DataFrame(
{'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他', '指标来源': '其他', '来源id': '其他'}, index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
df1.sort_values('DataTime', inplace=True, ascending=False)
df1.rename(columns={'DataTime': 'date'}, inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
return df_zhibiaoshuju, df_zhibiaoliebiao
def get_eta_api_pp_data(self,data_set,dataset=''):
def get_eta_api_chengpinyou_data(self, data_set, dataset=''):
'''
从ETA API获取原油数据
参数:
data_set (str): 数据集名称
dataset (str): 数据集ID默认为空
返回:
None
'''
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 从列表数据中获取指标名称,判断指标名称频度是否为日 如果是则获取UniqueCode然后获取指标数据保存到xlat文件中的sheet表。
'''
df = sheetname 指标列表存储 指标分类-指标名称-指标id-频度
df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2...
'''
# 构建新的DataFrame df df1
df = pd.DataFrame(columns=[
'指标分类', '指标名称', '指标id', '频度', '指标来源', '来源id', '最后更新时间', '更新周期', '预警日期', '停更周期'])
df1 = pd.DataFrame(columns=['DataTime'])
# 外网环境无法访问,请确认是否为内网环境
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")
# 检查响应状态码
if response.status_code == 200:
# 获取成功, 处理响应内容
data = response.json() # 假设接口返回的是JSON数据
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# 定义你想要保留的固定值
# fixed_value = 1193 # 成品油
fixed_value = 1285 # 沥青
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get(
'Data') if item.get('ParentId') == fixed_value]
# 然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
n = 0
for item in filtered_data:
n += 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] # 分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] # 分类名称要保存到df的指标分类列
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
# logger.info(response.text)
data2 = response.json()
Data = data2.get('Data')
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
# 指标名称要保存到df2的指标名称列,df的指标名称列
EdbName = i.get('EdbName')
Frequency = i.get('Frequency') # 频度要保存到df的频度列
SourceName = i.get('SourceName') # 来源名称要保存到df的频度列
Source = i.get('Source') # 来源ID要保存到df的频度列
# 保存到df
df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
# 取df1所有行最后一列
edbname_df = df1[['DataTime', f'{EdbName}']]
edbname_df = edbname_df.dropna()
if len(edbname_df) == 0:
logger.info(f'指标名称:{EdbName} 没有数据')
continue
try:
time_sequence = edbname_df['DataTime'].values.tolist(
)[-10:]
except IndexError:
time_sequence = edbname_df['DataTime'].values.tolist(
)
# 使用Counter来统计每个星期几的出现次数
from collections import Counter
weekday_counter = Counter(datetime.datetime.strptime(
time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
# 打印出现次数最多的星期几
try:
most_common_weekday = weekday_counter.most_common(1)[
0][0]
# 计算两周后的日期
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
except IndexError:
most_common_weekday = '其他'
stop_update_period = 0
if '' in Frequency:
most_common_weekday = '每天'
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency, '指标来源': SourceName, '来源id': Source,
'最后更新时间': edbname_df['DataTime'].values[-1], '更新周期': most_common_weekday, '预警日期': warning_date, '停更周期': stop_update_period}, index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
# 找到列表中不在指标列中的指标id保存成新的list
new_list = [
item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# 遍历new_list获取指标数据保存到df1
for item in new_list:
logger.info(item)
# 将item 加入到 df['指标id']中
try:
itemname = edbcodenamedict[item]
except:
itemname = item
df1 = self.edbcodegetdata(df1, item, itemname)
df = pd.concat([df, pd.DataFrame(
{'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他', '指标来源': '其他', '来源id': '其他'}, index=[0])])
# 按时间排序
df1.sort_values('DataTime', inplace=True, ascending=False)
df1.rename(columns={'DataTime': 'date'}, inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju, df_zhibiaoliebiao
def get_eta_api_pp_data(self, data_set, dataset=''):
global ClassifyId
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
@ -442,13 +617,12 @@ class EtaReader():
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度'])
df1 = pd.DataFrame(columns=['DataTime'])
# 外网环境无法访问,请确认是否为内网环境
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")
# 检查响应状态码
if response.status_code == 200:
@ -461,16 +635,17 @@ class EtaReader():
fixed_value = ClassifyId
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
filtered_data = [item for item in data.get(
'Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
# 然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
n = 0
for item in filtered_data:
n+= 1
n += 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
ClassifyId = item["ClassifyId"] # 分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] # 分类名称要保存到df的指标分类列
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
@ -481,27 +656,30 @@ class EtaReader():
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
# 指标名称要保存到df2的指标名称列,df的指标名称列
EdbName = i.get('EdbName')
Frequency = i.get('Frequency') # 频度要保存到df的频度列
# 频度不是 日 或者 周的 跳过
if Frequency not in ['日度','周度','','']:
if Frequency not in ['日度', '周度', '', '']:
continue
# 判断名称是否需要保存
isSave = self.filter_pp_data(ClassifyName,EdbName)
isSave = self.filter_pp_data(ClassifyName, EdbName)
if isSave:
# 保存到df
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0])
df2 = pd.DataFrame(
{'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency}, index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
else:
logger.info(f'跳过指标 {EdbName}')
# 找到列表中不在指标列中的指标id保存成新的list
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
new_list = [
item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# 遍历new_list获取指标数据保存到df1
for item in new_list:
@ -512,41 +690,44 @@ class EtaReader():
except:
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])])
df1 = self.edbcodegetdata(df1, item, itemname)
df = pd.concat([df, pd.DataFrame(
{'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'}, index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
df1.sort_values('DataTime', inplace=True, ascending=False)
df1.rename(columns={'DataTime': 'date'}, inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
return df_zhibiaoshuju, df_zhibiaoliebiao
def push_data(self,data):
def push_data(self, data):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
logger.info('请求参数:',data)
response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data))
logger.info('请求参数:', data)
response = requests.post(
self.edbdatapushurl, headers=self.headers, data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
@ -560,23 +741,24 @@ class EtaReader():
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def del_zhibiao(self,IndexCodeList):
def del_zhibiao(self, IndexCodeList):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
data = {
"IndexCodeList": IndexCodeList #指标编码列表
}
"IndexCodeList": IndexCodeList # 指标编码列表
}
# 发送post请求 上传数据
response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data))
response = requests.post(
self.edbdeleteurl, headers=self.headers, data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
@ -590,7 +772,7 @@ class EtaReader():
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def del_business(self,data):
def del_business(self, data):
''''
接口地址
https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd
@ -607,15 +789,15 @@ class EtaReader():
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data))
response = requests.post(
self.edbbusinessurl, headers=self.headers, data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
@ -634,6 +816,7 @@ class BinanceAPI:
'''
获取 Binance API 请求头签名
'''
def __init__(self, APPID, SECRET):
self.APPID = APPID
self.SECRET = SECRET
@ -641,7 +824,8 @@ class BinanceAPI:
# 生成随机字符串作为 nonce
def generate_nonce(self, length=32):
self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
self.nonce = ''.join(random.choices(
string.ascii_letters + string.digits, k=length))
return self.nonce
# 获取当前时间戳(秒)
@ -664,7 +848,8 @@ class BinanceAPI:
self.signature = self.calculate_signature(self.SECRET, self.sign_str)
# return self.signature
### 日志配置
# 日志配置
# 创建日志目录(如果不存在)
log_dir = 'logs'
@ -676,8 +861,10 @@ logger = logging.getLogger('pricepredict')
logger.setLevel(logging.INFO)
# 配置文件处理器,将日志记录到文件
file_handler = logging.handlers.RotatingFileHandler(os.path.join(log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
file_handler = logging.handlers.RotatingFileHandler(os.path.join(
log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# 配置控制台处理器,将日志打印到控制台
console_handler = logging.StreamHandler()
@ -688,7 +875,6 @@ logger.addHandler(file_handler)
logger.addHandler(console_handler)
# eta 接口url
sourcelisturl = 'http://10.189.2.78:8108/v1/edb/source/list'
classifylisturl = 'http://10.189.2.78:8108/v1/edb/classify/list?ClassifyType='
@ -698,47 +884,48 @@ edbcodedataurl = 'http://10.189.2.78:8108/v1/edb/data?EdbCode='
edbdatapushurl = 'http://10.189.2.78:8108/v1/edb/push'
edbdeleteurl = 'http://10.189.2.78:8108/v1/edb/business/edb/del'
edbbusinessurl = 'http://10.189.2.78:8108/v1/edb/business/data/del'
edbcodelist = ['CO1 Comdty', 'ovx index', 'C2404194834', 'C2404199738', 'dxy curncy', 'C2403128043', 'C2403150124',
'DOESCRUD Index', 'WTRBM1 EEGC Index', 'FVHCM1 INDEX', 'doedtprd index', 'CFFDQMMN INDEX',
'C2403083739', 'C2404167878', 'C2403250571', 'lmcads03 lme comdty', 'GC1 COMB Comdty',
'C2404171822','C2404167855',
# 'W000825','W000826','G.IPE', # 美国汽柴油
# 'S5131019','ID00135604','FSGAM1 Index','S5120408','ID00136724', # 新加坡汽柴油
]
edbcodelist = [
# 'CO1 Comdty', 'ovx index', 'C2404194834', 'C2404199738', 'dxy curncy', 'C2403128043', 'C2403150124',
# 'DOESCRUD Index', 'WTRBM1 EEGC Index', 'FVHCM1 INDEX', 'doedtprd index', 'CFFDQMMN INDEX',
# 'C2403083739', 'C2404167878', 'C2403250571', 'lmcads03 lme comdty', 'GC1 COMB Comdty',
# 'C2404171822', 'C2404167855',
# 'W000825','W000826','G.IPE', # 美国汽柴油
# 'S5131019','ID00135604','FSGAM1 Index','S5120408','ID00136724', # 新加坡汽柴油
]
# eta自有数据指标编码
modelsindex = {
'NHITS': 'SELF0000001',
'Informer':'SELF0000057',
'LSTM':'SELF0000058',
'iTransformer':'SELF0000059',
'TSMixer':'SELF0000060',
'TSMixerx':'SELF0000061',
'PatchTST':'SELF0000062',
'RNN':'SELF0000063',
'GRU':'SELF0000064',
'TCN':'SELF0000065',
'BiTCN':'SELF0000066',
'DilatedRNN':'SELF0000067',
'MLP':'SELF0000068',
'DLinear':'SELF0000069',
'NLinear':'SELF0000070',
'TFT':'SELF0000071',
'FEDformer':'SELF0000072',
'StemGNN':'SELF0000073',
'MLPMultivariate':'SELF0000074',
'TiDE':'SELF0000075',
'DeepNPTS':'SELF0000076'
}
# 'NHITS': 'SELF0000001',
# 'Informer': 'SELF0000057',
# 'LSTM': 'SELF0000058',
# 'iTransformer': 'SELF0000059',
# 'TSMixer': 'SELF0000060',
# 'TSMixerx': 'SELF0000061',
# 'PatchTST': 'SELF0000062',
# 'RNN': 'SELF0000063',
# 'GRU': 'SELF0000064',
# 'TCN': 'SELF0000065',
# 'BiTCN': 'SELF0000066',
# 'DilatedRNN': 'SELF0000067',
# 'MLP': 'SELF0000068',
# 'DLinear': 'SELF0000069',
# 'NLinear': 'SELF0000070',
# 'TFT': 'SELF0000071',
# 'FEDformer': 'SELF0000072',
# 'StemGNN': 'SELF0000073',
# 'MLPMultivariate': 'SELF0000074',
# 'TiDE': 'SELF0000075',
# 'DeepNPTS': 'SELF0000076'
}
edbcodelist = edbcodelist+list(modelsindex.values())
### 文件
data_set = '历史预测结果.xlsx' # 数据集文件
# 文件
data_set = '沥青eta数据.xlsx' # 数据集文件
# data_set = 'INE_OIL(1).csv'
### 文件夹
dataset = 'yuanyoudataset' # 数据集文件夹
# 文件夹
dataset = 'yuanyoudataset' # 数据集文件夹
# eta 接口token
@ -754,4 +941,10 @@ etadata = EtaReader(signature=signature,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl,
)
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_yuanyou_data(data_set=data_set, dataset=dataset) # 原始数据,未处理
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_chengpinyou_data(
data_set=data_set, dataset=dataset) # 原始数据,未处理
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(data_set)) as file:
df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)

BIN
成品油eta数据.xlsx Normal file

Binary file not shown.

BIN
沥青eta数据.xlsx Normal file

Binary file not shown.