Compare commits


No commits in common. "f5286990fa6b84bc08351ce6b3722d401d7669f8" and "5191ff7c691bde6b4b2de6c2e298b1b117ffa0a7" have entirely different histories.

14 changed files with 1359 additions and 788 deletions


@@ -18,10 +18,22 @@
"import numpy as np\n",
"# 变量定义\n",
"login_url = \"http://10.200.32.39/jingbo-api/api/server/login\"\n",
"search_url = \"http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryByItemNos\"\n",
"# query_data_list_item_nos_url\n",
"search_url = \"http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryByItemNos\" #jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos\n",
"upload_url = \"http://10.200.32.39/jingbo-api/api/dw/dataValue/pushDataValueList\"\n",
"\n",
"\n",
"query_data_list_item_nos_data = {\n",
" \"funcModule\": \"数据项\",\n",
" \"funcOperation\": \"查询\",\n",
" \"data\": {\n",
" \"dateStart\": \"20200101\",\n",
" \"dateEnd\": \"20241231\",\n",
" \"dataItemNoList\": [\"Brentzdj\", \"Brentzgj\"] # 数据项编码,代表 brent最低价和最高价\n",
" }\n",
"}\n",
"\n",
"\n",
"login_data = {\n",
" \"data\": {\n",
" \"account\": \"api_dev\",\n",
@@ -844,7 +856,7 @@
" # headers1 = {\"Authorization\": token_push}\n",
" # res = requests.post(url=upload_url, headers=headers1, json=data1, timeout=(3, 5))\n",
" \n",
" \n",
"\n",
"\n",
"\n",
"\n",
@@ -960,8 +972,6 @@
" # 保存新的xls文件\n",
" new_workbook.save(\"定性模型数据项12-11.xlsx\")\n",
"\n",
"\n",
"\n",
"\n"
]
},
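The cell above wires together three endpoints: login, a by-item-number query, and an upload. A minimal sketch of the login-then-query round trip; the token's location in the login response (`data` -> `accessToken`) is an assumption, while the bare `Authorization` header mirrors the commented-out upload call later in this notebook:

```python
import requests

login_url = "http://10.200.32.39/jingbo-api/api/server/login"
search_url = "http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryByItemNos"

def query_item_nos(login_data: dict, query_data: dict) -> dict:
    # Authenticate first; where the token sits in the response is an assumption.
    resp = requests.post(login_url, json=login_data, timeout=(3, 5))
    resp.raise_for_status()
    token = resp.json()["data"]["accessToken"]
    # Query the data items, passing the token in the Authorization header
    # (the same header shape as the commented-out upload call in this notebook).
    resp = requests.post(search_url, headers={"Authorization": token},
                         json=query_data, timeout=(3, 5))
    resp.raise_for_status()
    return resp.json()
```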


@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [
{


@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 27,
"execution_count": null,
"metadata": {},
"outputs": [
{

config_shiyoujiao.py (new file, 320 lines)

@@ -0,0 +1,320 @@
import logging
import os
import logging.handlers
import datetime
from lib.tools import MySQLDB, SQLiteHandler

# eta API token
APPID = "XNLDvxZHHugj7wJ7"
SECRET = "iSeU4s6cKKBVbt94htVY1p0sqUMqb2xa"

# eta API URLs
sourcelisturl = 'http://10.189.2.78:8108/v1/edb/source/list'
classifylisturl = 'http://10.189.2.78:8108/v1/edb/classify/list?ClassifyType='
uniquecodedataurl = 'http://10.189.2.78:8108/v1/edb/data?UniqueCode=4991c37becba464609b409909fe4d992&StartDate=2024-02-01'
classifyidlisturl = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId='
edbcodedataurl = 'http://10.189.2.78:8108/v1/edb/data?EdbCode='
edbdatapushurl = 'http://10.189.2.78:8108/v1/edb/push'
edbdeleteurl = 'http://10.189.2.78:8108/v1/edb/business/edb/del'
edbbusinessurl = 'http://10.189.2.78:8108/v1/edb/business/data/del'
edbcodelist = ['ID01385938','lmcads03 lme comdty',
'GC1 COMB Comdty',
'C2404171822',
'dxy curncy',
'S5443199 ',
'S5479800',
'S5443108',
'H7358586',
'LC3FM1 INDEX',
'CNY REGN Curncy',
's0105897',
'M0067419',
'M0066351',
'S0266372',
'S0266438',
'S0266506',
'ID01384463']
# Temporarily hard-coded column list matching the edb codes above; to be updated later
edbnamelist = [
'ds','y',
'LME铜价',
'黄金连1合约',
'Brent-WTI',
'美元指数',
'甲醇鲁南价格',
'甲醇太仓港口价格',
'山东丙烯主流价',
'丙烷(山东)',
'FEI丙烷 M1',
'在岸人民币汇率',
'南华工业品指数',
'PVC期货主力',
'PE期货收盘价',
'PP连续-1月',
'PP连续-5月',
'PP连续-9月',
'PP拉丝L5E89出厂价华北第二区域内蒙古久泰新材料'
]
edbcodenamedict = {
'ID01385938':'PP拉丝1102K市场价青州国家能源宁煤',
'ID01384463':'PP拉丝L5E89出厂价华北第二区域内蒙古久泰新材料',
'lmcads03 lme comdty':'LME铜价',
'GC1 COMB Comdty':'黄金连1合约',
'C2404171822':'Brent-WTI',
'dxy curncy':'美元指数',
'S5443199 ':'甲醇鲁南价格',
'S5479800':'甲醇太仓港口价格',
'S5443108':'山东丙烯主流价',
'H7358586':'丙烷(山东)',
'LC3FM1 INDEX':'FEI丙烷 M1',
'CNY REGN Curncy':'在岸人民币汇率',
's0105897':'南华工业品指数',
'M0067419':'PVC期货主力',
'M0066351':'PE期货收盘价',
'S0266372':'PP连续-1月',
'S0266438':'PP连续-5月',
'S0266506':'PP连续-9月',
}
# eta self-maintained indicator codes for the model forecast series
modelsindex = {
'NHITS': 'SELF0000077',
'Informer':'SELF0000078',
'LSTM':'SELF0000079',
'iTransformer':'SELF0000080',
'TSMixer':'SELF0000081',
'TSMixerx':'SELF0000082',
'PatchTST':'SELF0000083',
'RNN':'SELF0000084',
'GRU':'SELF0000085',
'TCN':'SELF0000086',
'BiTCN':'SELF0000087',
'DilatedRNN':'SELF0000088',
'MLP':'SELF0000089',
'DLinear':'SELF0000090',
'NLinear':'SELF0000091',
'TFT':'SELF0000092',
'FEDformer':'SELF0000093',
'StemGNN':'SELF0000094',
'MLPMultivariate':'SELF0000095',
'TiDE':'SELF0000096',
'DeepNPTS':'SELF0000097'
}
# Request body for pushing forecast results to eta; IndexCode and DataList are replaced per model at request time
data = {
"IndexCode": "",
"IndexName": "价格预测模型",
"Unit": "",
"Frequency": "日度",
"SourceName": f"价格预测",
"Remark": 'ddd',
"DataList": [
{
"Date": "2024-05-02",
"Value": 333444
}
]
}
# eta classification
# Data can only be fetched at level 3, so every level-3 node under 能源化工 has to be located manually
# url = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId=1214'
# ParentId 1160: 能源化工
# ClassifyId 1214: 原油; 3912: 石油焦
# ParentId 1214 covers all data under 原油.
ClassifyId = 3707
############################################################################################################### Variable definitions -- test environment
server_host = '192.168.100.53'
login_pushreport_url = f"http://{server_host}:8080/jingbo-dev/api/server/login"
upload_url = f"http://{server_host}:8080/jingbo-dev/api/analysis/reportInfo/researchUploadReportSave"
upload_warning_url = f"http://{server_host}:8080/jingbo-dev/api/basicBuiness/crudeOilWarning/save"
query_data_list_item_nos_url = f"http://{server_host}:8080/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos"
login_data = {
"data": {
"account": "api_test",
# "password": "MmVmNzNlOWI0MmY0ZDdjZGUwNzE3ZjFiMDJiZDZjZWU=", # Shihua@123456
"password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", # 123456
"tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
"terminal": "API"
},
"funcModule": "API",
"funcOperation": "获取token"
}
upload_data = {
    "funcModule": '研究报告信息',
    "funcOperation": '上传聚烯烃PP价格预测报告',
    "data": {
        "groupNo": '000128',  # user group number
        "ownerAccount": 'arui',  # account that owns the report
        "reportType": 'OIL_PRICE_FORECAST',  # report type, fixed to OIL_PRICE_FORECAST
        "fileName": '2000-40-5-50--100-原油指标数据.xlsx-Brent活跃合约--2024-09-06-15-01-29-预测报告.pdf',  # file name
        "fileBase64": '',  # file content as base64
        "categoryNo": 'yyjgycbg',  # research report category code
        "smartBusinessClassCode": 'JXTJGYCBG',  # analysis report category code
        "reportEmployeeCode": "E40116",  # reporter
        "reportDeptCode": "D0044",  # reporting department
        "productGroupCode": "RAW_MATERIAL"  # product group
    }
}
warning_data = {
"groupNo":'000128', # 用户组编号
"funcModule":'原油特征停更预警',
"funcOperation":'原油特征停更预警',
"data":{
'WARNING_TYPE_NAME':'特征数据停更预警',
'WARNING_CONTENT':'',
'WARNING_DATE':''
}
}
query_data_list_item_nos_data = {
"funcModule": "数据项",
"funcOperation": "查询",
"data": {
"dateStart":"20200101",
"dateEnd":"20241231",
"dataItemNoList":["Brentzdj","Brentzgj"] # 数据项编码,代表 brent最低价和最高价
}
}
# Beijing environment database
host = '192.168.101.27'
port = 3306
dbusername = 'root'
password = '123456'
dbname = 'jingbo_test'
table_name = 'v_tbl_crude_oil_warning'
### Switches
is_train = False  # whether to train
is_debug = True  # debug mode
is_eta = True  # whether to use the eta API
is_market = False  # whether to fetch features from the market information platform; only effective when is_eta is True
is_timefurture = True  # whether to add time-derived features
is_fivemodels = False  # whether to reuse the five best models saved earlier
is_edbcode = False  # restrict features to the edbcodelist above
is_edbnamelist = False  # use the custom edbnamelist above
is_update_eta = False  # push forecast results to eta
is_update_report = True  # whether to upload the report
is_update_warning_data = False  # whether to upload warning data
is_del_corr = 0.6  # correlation threshold for dropping features, range 0-1; 0 disables, 0.6 drops features whose correlation is below 0.6
is_del_tow_month = True  # whether to drop features not updated for two months
# Connect to the database
db_mysql = MySQLDB(host=host, user=dbusername, password=password, database=dbname)
db_mysql.connect()
print("数据库连接成功", host, dbname, dbusername)
# Data window
start_year = 2020  # first year of data
end_time = ''  # data cutoff date
freq = 'B'  # time frequency: "D" day, "W" week, "M" month, "Q" quarter, "A" year, "H" hour, "T" minute, "S" second, "B" business day
delweekenday = True if freq == 'B' else False  # drop weekend rows
is_corr = False  # whether features take part in lead-lag correlation boosting
add_kdj = False  # whether to add the KDJ indicator
if add_kdj and is_edbnamelist:
    edbnamelist = edbnamelist + ['K', 'D', 'J']
### Model parameters
y = 'AVG-金能大唐久泰青州'
avg_cols = [
    'PP拉丝1102K出厂价青州国家能源宁煤',
    'PP拉丝L5E89出厂价华北第二区域内蒙古久泰新材料',
    'PP拉丝L5E89出厂价河北、鲁北大唐内蒙多伦',
    'PP拉丝HP550J市场价青岛金能化学'
]
offsite = 80
offsite_col = ['PP拉丝HP550J市场价青岛金能化学']
horizon = 5  # forecast horizon (steps ahead)
input_size = 40  # input sequence length
train_steps = 50 if is_debug else 1000  # training steps, used to cap the number of epochs
val_check_steps = 30  # validation frequency
early_stop_patience_steps = 5  # early-stopping patience
# --- cross-validation parameters
test_size = 200  # test set size; reassigned later before use
val_size = test_size  # validation set size, same as the test set
### Feature-selection parameters
k = 100  # number of features to keep; 0, or a value larger than the feature count, keeps all features
corr_threshold = 0.6  # keep features whose correlation exceeds 0.6
rote = 0.06  # upper/lower band threshold for plotting
### Accuracy calculation
weight_dict = [0.4, 0.15, 0.1, 0.1, 0.25]  # weights
### Files
data_set = '石油焦指标数据.xlsx'  # dataset file
dataset = 'shiyoujiaodataset'  # dataset folder
# Database name
db_name = os.path.join(dataset, 'jbsh_juxiting.db')
sqlitedb = SQLiteHandler(db_name)
sqlitedb.connect()
settings = f'{input_size}-{horizon}-{train_steps}--{k}-{data_set}-{y}'
# Current date
# now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')  # timestamped variant
now = datetime.datetime.now().strftime('%Y-%m-%d')
if end_time == '':
    end_time = now
reportname = f'PP大模型预测报告--{end_time}.pdf'  # report file name (end_time must be set before this line)
reportname = reportname.replace(':', '-')  # replace colons, which are invalid in file names
### Mail settings
username = '1321340118@qq.com'
passwd = 'wgczgyhtyyyyjghi'
# recv = ['liurui_test@163.com', '52585119@qq.com']
recv = ['liurui_test@163.com']
title = reportname  # mail subject
content = y + '预测报告请看附件'
file = os.path.join(dataset, reportname)
# file = os.path.join(dataset, '14-7-50--100-原油指标数据.xlsx-Brent连1合约价格--20240731175936-预测报告.pdf')
ssl = True
### Logging
# Create the log directory if it does not exist
log_dir = 'logs'
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# Configure the logger
logger = logging.getLogger('my_logger')
logger.setLevel(logging.INFO)

# File handler: write logs to a rotating file
file_handler = logging.handlers.RotatingFileHandler(os.path.join(
    log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

# Console handler: print logs to the console
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s'))

# Attach both handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# logger.info('当前配置:'+settings)
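Note that `is_del_corr` above is a threshold, not a boolean: 0 disables the filter, while 0.6 drops features whose correlation with the target falls below 0.6. A minimal sketch of that filter, assuming the features sit in a DataFrame alongside the target column `y` (the helper name is illustrative, not part of this repo):

```python
import pandas as pd

def drop_low_corr_features(df: pd.DataFrame, threshold: float) -> pd.DataFrame:
    """Drop feature columns whose |correlation| with y is below threshold."""
    if threshold <= 0:  # 0 disables the filter, matching the config comment
        return df
    corr = df.corr(numeric_only=True)['y'].abs()
    keep = [c for c in df.columns if c in ('ds', 'y') or corr.get(c, 0) >= threshold]
    return df[keep]

# df = drop_low_corr_features(df, is_del_corr)
```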


@@ -143,7 +143,7 @@ modelsindex = {
}
# Baichuan data indicator codes
baicangidnamedict = {
baichuanidnamedict = {
'1588348470396480000': '石油焦滨州-友泰',
'1588348470396480000.00': '石油焦东营-海科瑞林',
'1588348470396480000.00': '石油焦东营-华联2',
@@ -160,6 +160,8 @@ baicangidnamedict = {
}
# baichuanidnamedict = {'1588348470396475286': 'test1', '1666': 'test2'}  # for testing in the Beijing environment

# Request body for pushing forecast results to eta; IndexCode and DataList are replaced per model at request time
data = {
"IndexCode": "",
@@ -272,14 +274,14 @@ push_data_value_list_data = {
}
# Data item codes for the eight forecast horizons (八大维度)
bdwd_items = {
# 'ciri': 'yyycbdwdcr',
# 'benzhou': 'yyycbdwdbz',
# 'cizhou': 'yyycbdwdcz',
# 'gezhou': 'yyycbdwdgz',
# 'ciyue': 'yyycbdwdcy',
# 'cieryue': 'yyycbdwdcey',
# 'cisanyue': 'yyycbdwdcsy',
# 'cisiyue': 'yyycbdwdcsiy',
'ciri': 'syjlyycbdwdcr',
'benzhou': 'syjlyycbdwdbz',
'cizhou': 'syjlyycbdwdcz',
'gezhou': 'syjlyycbdwdgz',
'ciyue': 'syjlyycbdwdcy',
'cieryue': 'syjlyycbdwdcey',
'cisanyue': 'syjlyycbdwdcsy',
'cisiyue': 'syjlyycbdwdcsiy',
}
# Beijing environment database
@@ -326,7 +328,7 @@ if add_kdj and is_edbnamelist:
    edbnamelist = edbnamelist+['K', 'D', 'J']
# Model parameters
y = 'B46cc7d0a90155b5bfd'
y = '煅烧焦山东高硫(高端S < 3.5,普货)(元/吨)'
avg_cols = [
]


@@ -57,6 +57,7 @@ global_config = {
    'y': None,  # target column name
'is_fivemodels': None,
'weight_dict': None,
    'baichuanidnamedict': None,  # Baichuan id-to-name mapping

    # model parameters
    'data_set': None,  # dataset name
@@ -120,6 +121,8 @@ global_config = {
    # database configuration
'sqlitedb': None,
'db_mysql': None,
'baichuan_table_name': None,
}
# 定义函数
@@ -1199,6 +1202,8 @@ class Config:
    # database configuration
@property
def sqlitedb(self): return global_config['sqlitedb']
@property
def db_mysql(self): return global_config['db_mysql']
config = Config()
@@ -2213,3 +2218,38 @@ def addtimecharacteristics(df, dataset):
df.drop(columns=['quarter_start', 'quarter'], inplace=True)
df.to_csv(os.path.join(dataset, '指标数据添加时间特征.csv'), index=False)
return df
# Fetch Baichuan data from the database: takes a dict of Baichuan ids, returns the data as a DataFrame
def get_baichuan_data(baichuanidnamedict):
    baichuanidlist = list(baichuanidnamedict.keys())
    # Connect to the database
    db = config.db_mysql
    db.connect()
    # Build the query; the ids must be quoted, e.g.
    # SELECT BAICHUAN_ID,DATA_DATE,DATA_VALUE FROM V_TBL_BAICHUAN_YINGFU_VALUE WHERE BAICHUAN_ID IN ('1588348470396475286','1666');
    id_list = ','.join(f"'{i}'" for i in baichuanidlist)
    sql = f"SELECT BAICHUAN_ID,DATA_DATE,DATA_VALUE FROM {global_config['baichuan_table_name']} WHERE BAICHUAN_ID IN ({id_list})"
    # Run the query
    results = db.execute_query(sql)
    df = pd.DataFrame(results, columns=[
        'BAICHUAN_ID', 'DATA_DATE', 'DATA_VALUE'])
    # Group by BAICHUAN_ID, then merge the groups on DATA_DATE
    df1 = pd.DataFrame(columns=['DATA_DATE'])
    for baichuan_id, group in df.groupby('BAICHUAN_ID'):
        # Drop the BAICHUAN_ID column from the group
        group.drop(columns=['BAICHUAN_ID'], inplace=True)
        # Cast DATA_VALUE to float, keeping two decimal places
        group['DATA_VALUE'] = group['DATA_VALUE'].astype(float).round(2)
        # Rename the value column to the indicator name
        group.rename(
            columns={'DATA_VALUE': baichuanidnamedict[baichuan_id]}, inplace=True)
        # Merge on DATA_DATE
        df1 = pd.merge(
            df1, group[['DATA_DATE', baichuanidnamedict[baichuan_id]]], on='DATA_DATE', how='outer')
    # Convert DATA_DATE to a 'date' column in YYYY-MM-DD format
    df1['date'] = pd.to_datetime(
        df1['DATA_DATE']).dt.strftime('%Y-%m-%d')
    df1.drop(columns=['DATA_DATE'], inplace=True)
    return df1
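A usage sketch for the helper above. The two ids are taken from the Beijing-environment test mapping that appears commented out in config_shiyoujiao_lvyong.py; it assumes `config.db_mysql` and `global_config['baichuan_table_name']` have already been wired up by the main script:

```python
# Hypothetical run against the Beijing test environment.
test_mapping = {'1588348470396475286': 'test1', '1666': 'test2'}
baichuan_df = get_baichuan_data(test_mapping)
print(baichuan_df.columns.tolist())  # ['test1', 'test2', 'date'] (order may vary)
```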

main_shiyoujiao.py (new file, 301 lines)

@@ -0,0 +1,301 @@
# Load configuration
from lib.dataread import *
from lib.tools import SendMail, exception_logger
from models.nerulforcastmodels import ex_Model_Juxiting, model_losss, model_losss_juxiting, brent_export_pdf, tansuanli_export_pdf, pp_export_pdf
import glob
import torch
torch.set_float32_matmul_precision("high")
def predict_main():
    """
    Main prediction routine: fetch data from the ETA API, preprocess it,
    train the models, and produce forecasts.

    All inputs (API credentials and URLs, dataset paths, model
    hyperparameters, switches such as is_eta/is_train/is_debug, and the
    SQLite database handle) come from the module-level configuration
    imported above rather than from function arguments.

    Returns:
        None
    """
global end_time
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl=classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl
)
    # Fetch data
if is_eta:
logger.info('从eta获取数据...')
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl=classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl,
)
        df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_shiyoujiao_data(data_set=data_set, dataset=dataset)  # raw, unprocessed data
if is_market:
logger.info('从市场信息平台获取数据...')
            try:
                # In the test environment, take the high/low prices from the Excel file
                if server_host == '192.168.100.53':
                    logger.info('从excel文档获取最高价最低价')
                    df_zhibiaoshuju = get_high_low_data(df_zhibiaoshuju)
                else:
                    logger.info('从市场信息平台获取数据')
                    df_zhibiaoshuju = get_market_data(end_time, df_zhibiaoshuju)
            except Exception:
                logger.info('最高最低价拼接失败')
        # Save to sheets of the xlsx file
        with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
            df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
            df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)

        # Preprocess the data
        df = datachuli_juxiting(df_zhibiaoshuju, df_zhibiaoliebiao, y=y, dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture,
                                end_time=end_time)
    else:
        # Read local data
        logger.info('读取本地数据:' + os.path.join(dataset, data_set))
        df, df_zhibiaoliebiao = getdata_juxiting(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj,
                                                 is_timefurture=is_timefurture, end_time=end_time)  # raw, unprocessed data
    # Rename the target column
    df.rename(columns={y: 'y'}, inplace=True)

    if is_edbnamelist:
        df = df[edbnamelist]
    df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False)

    # Save the y value of the latest date to the database
    # Take the latest row for storage
    first_row = df[['ds', 'y']].tail(1)
    # Skip if the target is empty (not a float)
    if not isinstance(first_row['y'].values[0], float):
        logger.info(f'{end_time}预测目标数据为空,跳过')
        return None
    # Persist the latest true value to the database
if not sqlitedb.check_table_exists('trueandpredict'):
first_row.to_sql('trueandpredict', sqlitedb.connection, index=False)
else:
for row in first_row.itertuples(index=False):
row_dict = row._asdict()
row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S')
check_query = sqlitedb.select_data('trueandpredict', where_condition=f"ds = '{row.ds}'")
if len(check_query) > 0:
set_clause = ", ".join([f"{key} = '{value}'" for key, value in row_dict.items()])
sqlitedb.update_data('trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'")
continue
sqlitedb.insert_data('trueandpredict', tuple(row_dict.values()), columns=row_dict.keys())
    # Backfill y values in the accuracy table
    if not sqlitedb.check_table_exists('accuracy'):
        pass
    else:
        update_y = sqlitedb.select_data('accuracy', where_condition="y is null")
        if len(update_y) > 0:
            logger.info('更新accuracy表的y值')
            # Keep only the rows whose ds is on or before end_time
            update_y = update_y[update_y['ds'] <= end_time]
            logger.info(f'要更新y的信息{update_y}')
            for row in update_y.itertuples(index=False):
                try:
                    row_dict = row._asdict()
                    yy = df[df['ds'] == row_dict['ds']]['y'].values[0]
                    LOW = df[df['ds'] == row_dict['ds']]['Brentzdj'].values[0]
                    HIGH = df[df['ds'] == row_dict['ds']]['Brentzgj'].values[0]
                    sqlitedb.update_data('accuracy', f"y = {yy},LOW_PRICE = {LOW},HIGH_PRICE = {HIGH}", where_condition=f"ds = '{row_dict['ds']}'")
                except Exception:
                    logger.info(f'更新accuracy表的y值失败{row_dict}')
    import datetime
    # Check whether today is Monday
    is_weekday = datetime.datetime.now().weekday() == 0
    if is_weekday:
        logger.info('今天是周一,更新预测模型')
        # Find the model with the lowest prediction residual over the last 60 days
        model_results = sqlitedb.select_data('trueandpredict', order_by="ds DESC", limit="60")
        # Drop columns that are more than 90% empty
        if len(model_results) > 10:
            model_results = model_results.dropna(thresh=len(model_results)*0.1, axis=1)
        # Drop empty rows
        model_results = model_results.dropna()
        modelnames = model_results.columns.to_list()[2:-1]
        for col in model_results[modelnames].select_dtypes(include=['object']).columns:
            model_results[col] = model_results[col].astype(np.float32)
        # Absolute error rate of each prediction against the true value
        for model in modelnames:
            model_results[f'{model}_abs_error_rate'] = abs(model_results['y'] - model_results[model]) / model_results['y']
        # Row-wise minimum error rate
        min_abs_error_rate_values = model_results.apply(lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1)
        # Column name holding that minimum per row
        min_abs_error_rate_column_name = model_results.apply(lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1)
        # Strip the suffix to recover the model name
        min_abs_error_rate_column_name = min_abs_error_rate_column_name.map(lambda x: x.split('_')[0])
        # Most frequent best model
        most_common_model = min_abs_error_rate_column_name.value_counts().idxmax()
        logger.info(f"最近60天预测残差最低的模型名称{most_common_model}")
        # Persist the result
        if not sqlitedb.check_table_exists('most_model'):
            sqlitedb.create_table('most_model', columns="ds datetime, most_common_model TEXT")
        sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
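To make the Monday model-selection vote concrete, here is a toy rerun of the same idxmin/value_counts chain on made-up numbers (the two model names are real entries of modelsindex; the values are not):

```python
import pandas as pd

toy = pd.DataFrame({
    'y':     [100.0, 102.0, 101.0],
    'NHITS': [101.0, 103.0,  99.0],
    'LSTM':  [ 98.0, 102.5, 101.2],
})
models = ['NHITS', 'LSTM']
for m in models:
    toy[f'{m}_abs_error_rate'] = (toy['y'] - toy[m]).abs() / toy['y']
# Best model per day, then the most frequent winner overall
best_per_day = toy[[f'{m}_abs_error_rate' for m in models]].idxmin(axis=1)
best_per_day = best_per_day.map(lambda x: x.split('_')[0])
print(best_per_day.value_counts().idxmax())  # 'LSTM' (wins on 2 of 3 days)
```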
    try:
        if is_weekday:
            # if True:
            logger.info('今天是周一,发送特征预警')
            # Upload warning info to the database
            warning_data_df = df_zhibiaoliebiao.copy()
            warning_data_df = warning_data_df[warning_data_df['停更周期'] > 3][['指标名称', '指标id', '频度', '更新周期', '指标来源', '最后更新时间', '停更周期']]
            # Rename the columns
            warning_data_df = warning_data_df.rename(columns={'指标名称': 'INDICATOR_NAME', '指标id': 'INDICATOR_ID', '频度': 'FREQUENCY', '更新周期': 'UPDATE_FREQUENCY', '指标来源': 'DATA_SOURCE', '最后更新时间': 'LAST_UPDATE_DATE', '停更周期': 'UPDATE_SUSPENSION_CYCLE'})
            from sqlalchemy import create_engine
            import urllib.parse
            global password
            if '@' in password:
                password = urllib.parse.quote_plus(password)
            engine = create_engine(f'mysql+pymysql://{dbusername}:{password}@{host}:{port}/{dbname}')
            warning_data_df['WARNING_DATE'] = datetime.date.today().strftime("%Y-%m-%d %H:%M:%S")
            warning_data_df['TENANT_CODE'] = 'T0004'
            # Query the table first, then assign new IDs before inserting
            existing_data = pd.read_sql(f"SELECT * FROM {table_name}", engine)
            if not existing_data.empty:
                max_id = existing_data['ID'].astype(int).max()
                warning_data_df['ID'] = range(max_id + 1, max_id + 1 + len(warning_data_df))
            else:
                warning_data_df['ID'] = range(1, 1 + len(warning_data_df))
            warning_data_df.to_sql(table_name, con=engine, if_exists='append', index=False)
            if is_update_warning_data:
                upload_warning_info(len(warning_data_df))
    except Exception:
        logger.info('上传预警信息到数据库失败')
    if is_corr:
        df = corr_feature(df=df)

    df1 = df.copy()  # keep a copy; ds and y are added back after feature selection
    logger.info("开始训练模型...")
    row, col = df.shape

    now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ex_Model_Juxiting(df,
horizon=horizon,
input_size=input_size,
train_steps=train_steps,
val_check_steps=val_check_steps,
early_stop_patience_steps=early_stop_patience_steps,
is_debug=is_debug,
dataset=dataset,
is_train=is_train,
is_fivemodels=is_fivemodels,
val_size=val_size,
test_size=test_size,
settings=settings,
now=now,
etadata=etadata,
modelsindex=modelsindex,
data=data,
is_eta=is_eta,
end_time=end_time,
)
logger.info('模型训练完成')
logger.info('训练数据绘图ing')
model_results3 = model_losss_juxiting(sqlitedb)
logger.info('训练数据绘图end')
    # Build the report
    logger.info('制作报告ing')
    title = f'{settings}--{end_time}-预测报告'  # report title
    reportname = f'PP大模型预测报告--{end_time}.pdf'  # report file name
    reportname = reportname.replace(':', '-')  # replace colons
    pp_export_pdf(dataset=dataset, num_models=5 if is_fivemodels else 22, time=end_time,
                  reportname=reportname, sqlitedb=sqlitedb)

    logger.info('制作报告end')
    logger.info('模型训练完成')
    # # LSTM univariate model
    # ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset)

    # # LSTM multivariate model
    # ex_Lstm_M(df,n_days=input_size,out_days=horizon,is_debug=is_debug,datasetpath=dataset)

    # # GRU model
    # # ex_GRU(df)
    # Send the report by email
    m = SendMail(
        username=username,
        passwd=passwd,
        recv=recv,
        title=title,
        content=content,
        file=max(glob.glob(os.path.join(dataset, '*.pdf')), key=os.path.getctime),
        ssl=ssl,
    )
    # m.send_mail()
if __name__ == '__main__':
    # global end_time
    # is_on = True
    # # Backfill: iterate over the business days in the chosen window
    # for i_time in pd.date_range('2025-1-20', '2025-2-6', freq='B'):
    #     end_time = i_time.strftime('%Y-%m-%d')
    #     try:
    #         predict_main()
    #     except:
    #         pass

    predict_main()
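The commented-out block in `__main__` is a backfill loop over a range of business days. A cleaned-up sketch of the same pattern, with the per-day exception logged instead of silently swallowed (the function name is illustrative):

```python
import pandas as pd

def backfill(start: str, end: str) -> None:
    """Run predict_main once per business day between start and end."""
    global end_time
    for i_time in pd.date_range(start, end, freq='B'):
        end_time = i_time.strftime('%Y-%m-%d')
        try:
            predict_main()
        except Exception as e:
            logger.info(f'{end_time} 预测失败: {e}')

# backfill('2025-1-20', '2025-2-6')
```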


@@ -3,7 +3,7 @@
from lib.dataread import *
from config_shiyoujiao_lvyong import *
from lib.tools import SendMail, exception_logger
from models.nerulforcastmodels import model_losss, shiyoujiao_lvyong_export_pdf
from models.nerulforcastmodels import ex_Model, model_losss, model_losss_juxiting, brent_export_pdf, tansuanli_export_pdf, pp_export_pdf
import datetime
import torch
torch.set_float32_matmul_precision("high")
@@ -18,6 +18,7 @@ global_config.update({
'is_fivemodels': is_fivemodels,
'settings': settings,
'weight_dict': weight_dict,
'baichuanidnamedict': baichuanidnamedict,
    # model parameters
@@ -72,11 +73,14 @@ global_config.update({
'edbdatapushurl': edbdatapushurl,
'edbdeleteurl': edbdeleteurl,
'edbbusinessurl': edbbusinessurl,
'edbcodenamedict': edbcodenamedict,
'ClassifyId': ClassifyId,
'classifylisturl': classifylisturl,
    # database configuration
'sqlitedb': sqlitedb,
'db_mysql': db_mysql,
'baichuan_table_name': baichuan_table_name,
})
@@ -173,228 +177,242 @@ def predict_main():
    Returns:
        None
    """
# end_time = global_config['end_time']
# # 获取数据
# if is_eta:
# logger.info('从eta获取数据...')
# signature = BinanceAPI(APPID, SECRET)
# etadata = EtaReader(signature=signature,
# classifylisturl=global_config['classifylisturl'],
# classifyidlisturl=global_config['classifyidlisturl'],
# edbcodedataurl=global_config['edbcodedataurl'],
# edbcodelist=global_config['edbcodelist'],
# edbdatapushurl=global_config['edbdatapushurl'],
# edbdeleteurl=global_config['edbdeleteurl'],
# edbbusinessurl=global_config['edbbusinessurl'],
# classifyId=global_config['ClassifyId'],
# )
# df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_shiyoujiao_lvyong_data(
# data_set=data_set, dataset=dataset) # 原始数据,未处理
# if is_market:
# logger.info('从市场信息平台获取数据...')
# try:
# # 如果是测试环境最高价最低价取excel文档
# if server_host == '192.168.100.53':
# logger.info('从excel文档获取最高价最低价')
# df_zhibiaoshuju = get_high_low_data(df_zhibiaoshuju)
# else:
# logger.info('从市场信息平台获取数据')
# df_zhibiaoshuju = get_market_data(
# end_time, df_zhibiaoshuju)
end_time = global_config['end_time']
    # Fetch data
if is_eta:
logger.info('从eta获取数据...')
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl=global_config['classifylisturl'],
classifyidlisturl=global_config['classifyidlisturl'],
edbcodedataurl=global_config['edbcodedataurl'],
edbcodelist=global_config['edbcodelist'],
edbdatapushurl=global_config['edbdatapushurl'],
edbdeleteurl=global_config['edbdeleteurl'],
edbbusinessurl=global_config['edbbusinessurl'],
classifyId=global_config['ClassifyId'],
)
        df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_shiyoujiao_lvyong_data(
            data_set=data_set, dataset=dataset)  # raw, unprocessed data
# except:
# logger.info('最高最低价拼接失败')
if is_market:
logger.info('从市场信息平台获取数据...')
try:
            # In the test environment, take the high/low prices from the Excel file
if server_host == '192.168.100.53':
logger.info('从excel文档获取最高价最低价')
df_zhibiaoshuju = get_high_low_data(df_zhibiaoshuju)
else:
logger.info('从市场信息平台获取数据')
df_zhibiaoshuju = get_market_data(
end_time, df_zhibiaoshuju)
# # 保存到xlsx文件的sheet表
# with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
# df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
# df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)
        except Exception:
            logger.info('最高最低价拼接失败')
# # 数据处理
# df = datachuli(df_zhibiaoshuju, df_zhibiaoliebiao, y=global_config['y'], dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture,
# end_time=end_time)
if len(global_config['baichuanidnamedict']) > 0:
logger.info('从市场数据库获取百川数据...')
baichuandf = get_baichuan_data(global_config['baichuanidnamedict'])
df_zhibiaoshuju = pd.merge(
df_zhibiaoshuju, baichuandf, on='date', how='outer')
            # Append the Baichuan indicators to the indicator list
df_baichuanliebiao = pd.DataFrame(
global_config['baichuanidnamedict'].items(), columns=['指标id', '指标名称'])
df_baichuanliebiao['指标分类'] = '百川'
df_baichuanliebiao['频度'] = '其他'
df_zhibiaoliebiao = pd.concat(
[df_zhibiaoliebiao, df_baichuanliebiao], axis=0)
# else:
# # 读取数据
# logger.info('读取本地数据:' + os.path.join(dataset, data_set))
# df, df_zhibiaoliebiao = getdata(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj,
# is_timefurture=is_timefurture, end_time=end_time) # 原始数据,未处理
        # Save to sheets of the xlsx file
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)
# # 更改预测列名称
# df.rename(columns={y: 'y'}, inplace=True)
        # Preprocess the data
df = datachuli(df_zhibiaoshuju, df_zhibiaoliebiao, y=global_config['y'], dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture,
end_time=end_time)
# if is_edbnamelist:
# df = df[edbnamelist]
# df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False)
# # 保存最新日期的y值到数据库
# # 取第一行数据存储到数据库中
# first_row = df[['ds', 'y']].tail(1)
# # 判断y的类型是否为float
# if not isinstance(first_row['y'].values[0], float):
# logger.info(f'{end_time}预测目标数据为空,跳过')
# return None
else:
        # Read local data
logger.info('读取本地数据:' + os.path.join(dataset, data_set))
df, df_zhibiaoliebiao = getdata(filename=os.path.join(dataset, data_set), y=y, dataset=dataset, add_kdj=add_kdj,
                                        is_timefurture=is_timefurture, end_time=end_time)  # raw, unprocessed data
# # 将最新真实值保存到数据库
# if not sqlitedb.check_table_exists('trueandpredict'):
# first_row.to_sql('trueandpredict', sqlitedb.connection, index=False)
# else:
# for row in first_row.itertuples(index=False):
# row_dict = row._asdict()
# config.logger.info(f'要保存的真实值:{row_dict}')
# # 判断ds是否为字符串类型,如果不是则转换为字符串类型
# if isinstance(row_dict['ds'], (pd.Timestamp, datetime.datetime)):
# row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d')
# elif not isinstance(row_dict['ds'], str):
# try:
# row_dict['ds'] = pd.to_datetime(
# row_dict['ds']).strftime('%Y-%m-%d')
# except:
# logger.warning(f"无法解析的时间格式: {row_dict['ds']}")
# # row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d')
# # row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d %H:%M:%S')
# check_query = sqlitedb.select_data(
# 'trueandpredict', where_condition=f"ds = '{row.ds}'")
# if len(check_query) > 0:
# set_clause = ", ".join(
# [f"{key} = '{value}'" for key, value in row_dict.items()])
# sqlitedb.update_data(
# 'trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'")
# continue
# sqlitedb.insert_data('trueandpredict', tuple(
# row_dict.values()), columns=row_dict.keys())
    # Rename the target column
    df.rename(columns={y: 'y'}, inplace=True)
# # 更新accuracy表的y值
# if not sqlitedb.check_table_exists('accuracy'):
# pass
# else:
# update_y = sqlitedb.select_data(
# 'accuracy', where_condition="y is null")
# if len(update_y) > 0:
# logger.info('更新accuracy表的y值')
# # 找到update_y 中ds且df中的y的行
# update_y = update_y[update_y['ds'] <= end_time]
# logger.info(f'要更新y的信息{update_y}')
# # try:
# for row in update_y.itertuples(index=False):
# try:
# row_dict = row._asdict()
# yy = df[df['ds'] == row_dict['ds']]['y'].values[0]
# LOW = df[df['ds'] == row_dict['ds']]['Brentzdj'].values[0]
# HIGH = df[df['ds'] == row_dict['ds']]['Brentzgj'].values[0]
# sqlitedb.update_data(
# 'accuracy', f"y = {yy},LOW_PRICE = {LOW},HIGH_PRICE = {HIGH}", where_condition=f"ds = '{row_dict['ds']}'")
# except:
# logger.info(f'更新accuracy表的y值失败{row_dict}')
# # except Exception as e:
# # logger.info(f'更新accuracy表的y值失败{e}')
    if is_edbnamelist:
        df = df[edbnamelist]
    df.to_csv(os.path.join(dataset, '指标数据.csv'), index=False)

    # Save the y value of the latest date to the database
    # Take the latest row for storage
    first_row = df[['ds', 'y']].tail(1)
    # Skip if the target is empty (not a float)
    if not isinstance(first_row['y'].values[0], float):
        logger.info(f'{end_time}预测目标数据为空,跳过')
        return None
# # 判断当前日期是不是周一
# is_weekday = datetime.datetime.now().weekday() == 0
# if is_weekday:
# logger.info('今天是周一,更新预测模型')
# # 计算最近60天预测残差最低的模型名称
# model_results = sqlitedb.select_data(
# 'trueandpredict', order_by="ds DESC", limit="60")
# # 删除空值率为90%以上的列
# if len(model_results) > 10:
# model_results = model_results.dropna(
# thresh=len(model_results)*0.1, axis=1)
# # 删除空行
# model_results = model_results.dropna()
# modelnames = model_results.columns.to_list()[2:-1]
# for col in model_results[modelnames].select_dtypes(include=['object']).columns:
# model_results[col] = model_results[col].astype(np.float32)
# # 计算每个预测值与真实值之间的偏差率
# for model in modelnames:
# model_results[f'{model}_abs_error_rate'] = abs(
# model_results['y'] - model_results[model]) / model_results['y']
# # 获取每行对应的最小偏差率值
# min_abs_error_rate_values = model_results.apply(
# lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1)
# # 获取每行对应的最小偏差率值对应的列名
# min_abs_error_rate_column_name = model_results.apply(
# lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1)
# # 将列名索引转换为列名
# min_abs_error_rate_column_name = min_abs_error_rate_column_name.map(
# lambda x: x.split('_')[0])
# # 取出现次数最多的模型名称
# most_common_model = min_abs_error_rate_column_name.value_counts().idxmax()
# logger.info(f"最近60天预测残差最低的模型名称{most_common_model}")
# # 保存结果到数据库
# if not sqlitedb.check_table_exists('most_model'):
# sqlitedb.create_table(
# 'most_model', columns="ds datetime, most_common_model TEXT")
# sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime(
# '%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
    # Persist the latest true value to the database
    if not sqlitedb.check_table_exists('trueandpredict'):
        first_row.to_sql('trueandpredict', sqlitedb.connection, index=False)
    else:
        for row in first_row.itertuples(index=False):
            row_dict = row._asdict()
            config.logger.info(f'要保存的真实值:{row_dict}')
            # Ensure ds is a string; convert if necessary
            if isinstance(row_dict['ds'], (pd.Timestamp, datetime.datetime)):
                row_dict['ds'] = row_dict['ds'].strftime('%Y-%m-%d')
            elif not isinstance(row_dict['ds'], str):
                try:
                    row_dict['ds'] = pd.to_datetime(
                        row_dict['ds']).strftime('%Y-%m-%d')
                except Exception:
                    logger.warning(f"无法解析的时间格式: {row_dict['ds']}")
            check_query = sqlitedb.select_data(
                'trueandpredict', where_condition=f"ds = '{row.ds}'")
            if len(check_query) > 0:
                set_clause = ", ".join(
                    [f"{key} = '{value}'" for key, value in row_dict.items()])
                sqlitedb.update_data(
                    'trueandpredict', set_clause, where_condition=f"ds = '{row.ds}'")
                continue
            sqlitedb.insert_data('trueandpredict', tuple(
                row_dict.values()), columns=row_dict.keys())
# try:
# if is_weekday:
# # if True:
# logger.info('今天是周一,发送特征预警')
# # 上传预警信息到数据库
# warning_data_df = df_zhibiaoliebiao.copy()
# warning_data_df = warning_data_df[warning_data_df['停更周期'] > 3][[
# '指标名称', '指标id', '频度', '更新周期', '指标来源', '最后更新时间', '停更周期']]
# # 重命名列名
# warning_data_df = warning_data_df.rename(columns={'指标名称': 'INDICATOR_NAME', '指标id': 'INDICATOR_ID', '频度': 'FREQUENCY',
# '更新周期': 'UPDATE_FREQUENCY', '指标来源': 'DATA_SOURCE', '最后更新时间': 'LAST_UPDATE_DATE', '停更周期': 'UPDATE_SUSPENSION_CYCLE'})
# from sqlalchemy import create_engine
# import urllib
# global password
# if '@' in password:
# password = urllib.parse.quote_plus(password)
    # Backfill y values in the accuracy table
    if not sqlitedb.check_table_exists('accuracy'):
        pass
    else:
        update_y = sqlitedb.select_data(
            'accuracy', where_condition="y is null")
        if len(update_y) > 0:
            logger.info('更新accuracy表的y值')
            # Keep only the rows whose ds is on or before end_time
            update_y = update_y[update_y['ds'] <= end_time]
            logger.info(f'要更新y的信息{update_y}')
            for row in update_y.itertuples(index=False):
                try:
                    row_dict = row._asdict()
                    yy = df[df['ds'] == row_dict['ds']]['y'].values[0]
                    LOW = df[df['ds'] == row_dict['ds']]['Brentzdj'].values[0]
                    HIGH = df[df['ds'] == row_dict['ds']]['Brentzgj'].values[0]
                    sqlitedb.update_data(
                        'accuracy', f"y = {yy},LOW_PRICE = {LOW},HIGH_PRICE = {HIGH}", where_condition=f"ds = '{row_dict['ds']}'")
                except Exception:
                    logger.info(f'更新accuracy表的y值失败{row_dict}')
# engine = create_engine(
# f'mysql+pymysql://{dbusername}:{password}@{host}:{port}/{dbname}')
# warning_data_df['WARNING_DATE'] = datetime.date.today().strftime(
# "%Y-%m-%d %H:%M:%S")
# warning_data_df['TENANT_CODE'] = 'T0004'
# # 插入数据之前查询表数据然后新增id列
# existing_data = pd.read_sql(f"SELECT * FROM {table_name}", engine)
# if not existing_data.empty:
# max_id = existing_data['ID'].astype(int).max()
# warning_data_df['ID'] = range(
# max_id + 1, max_id + 1 + len(warning_data_df))
# else:
# warning_data_df['ID'] = range(1, 1 + len(warning_data_df))
# warning_data_df.to_sql(
# table_name, con=engine, if_exists='append', index=False)
# if is_update_warning_data:
# upload_warning_info(len(warning_data_df))
# except:
# logger.info('上传预警信息到数据库失败')
    # Check whether today is Monday
    is_weekday = datetime.datetime.now().weekday() == 0
    if is_weekday:
        logger.info('今天是周一,更新预测模型')
        # Find the model with the lowest prediction residual over the last 60 days
        model_results = sqlitedb.select_data(
            'trueandpredict', order_by="ds DESC", limit="60")
        # Drop columns that are more than 90% empty
        if len(model_results) > 10:
            model_results = model_results.dropna(
                thresh=len(model_results)*0.1, axis=1)
        # Drop empty rows
        model_results = model_results.dropna()
        modelnames = model_results.columns.to_list()[2:-1]
        for col in model_results[modelnames].select_dtypes(include=['object']).columns:
            model_results[col] = model_results[col].astype(np.float32)
        # Absolute error rate of each prediction against the true value
        for model in modelnames:
            model_results[f'{model}_abs_error_rate'] = abs(
                model_results['y'] - model_results[model]) / model_results['y']
        # Row-wise minimum error rate
        min_abs_error_rate_values = model_results.apply(
            lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].min(), axis=1)
        # Column name holding that minimum per row
        min_abs_error_rate_column_name = model_results.apply(
            lambda row: row[[f'{model}_abs_error_rate' for model in modelnames]].idxmin(), axis=1)
        # Strip the suffix to recover the model name
        min_abs_error_rate_column_name = min_abs_error_rate_column_name.map(
            lambda x: x.split('_')[0])
        # Most frequent best model
        most_common_model = min_abs_error_rate_column_name.value_counts().idxmax()
        logger.info(f"最近60天预测残差最低的模型名称{most_common_model}")
        # Persist the result
        if not sqlitedb.check_table_exists('most_model'):
            sqlitedb.create_table(
                'most_model', columns="ds datetime, most_common_model TEXT")
        sqlitedb.insert_data('most_model', (datetime.datetime.now().strftime(
            '%Y-%m-%d %H:%M:%S'), most_common_model,), columns=('ds', 'most_common_model',))
# if is_corr:
# df = corr_feature(df=df)
    try:
        if is_weekday:
            # if True:
            logger.info('今天是周一,发送特征预警')
            # Upload warning info to the database
            warning_data_df = df_zhibiaoliebiao.copy()
            warning_data_df = warning_data_df[warning_data_df['停更周期'] > 3][[
                '指标名称', '指标id', '频度', '更新周期', '指标来源', '最后更新时间', '停更周期']]
            # Rename the columns
            warning_data_df = warning_data_df.rename(columns={'指标名称': 'INDICATOR_NAME', '指标id': 'INDICATOR_ID', '频度': 'FREQUENCY',
                                                              '更新周期': 'UPDATE_FREQUENCY', '指标来源': 'DATA_SOURCE', '最后更新时间': 'LAST_UPDATE_DATE', '停更周期': 'UPDATE_SUSPENSION_CYCLE'})
            from sqlalchemy import create_engine
            import urllib.parse
            global password
            if '@' in password:
                password = urllib.parse.quote_plus(password)
            engine = create_engine(
                f'mysql+pymysql://{dbusername}:{password}@{host}:{port}/{dbname}')
            warning_data_df['WARNING_DATE'] = datetime.date.today().strftime(
                "%Y-%m-%d %H:%M:%S")
            warning_data_df['TENANT_CODE'] = 'T0004'
            # Query the table first, then assign new IDs before inserting
            existing_data = pd.read_sql(f"SELECT * FROM {table_name}", engine)
            if not existing_data.empty:
                max_id = existing_data['ID'].astype(int).max()
                warning_data_df['ID'] = range(
                    max_id + 1, max_id + 1 + len(warning_data_df))
            else:
                warning_data_df['ID'] = range(1, 1 + len(warning_data_df))
            warning_data_df.to_sql(
                table_name, con=engine, if_exists='append', index=False)
            if is_update_warning_data:
                upload_warning_info(len(warning_data_df))
    except Exception:
        logger.info('上传预警信息到数据库失败')
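One detail worth noting in the block above: the password is percent-encoded before being interpolated into the SQLAlchemy URL, because a literal `@` would otherwise be parsed as the credentials/host separator. A standalone illustration with a hypothetical password:

```python
from urllib.parse import quote_plus

raw_password = 'p@ss:word'  # hypothetical; the real value comes from the config
encoded = quote_plus(raw_password)
print(encoded)  # p%40ss%3Aword
url = f'mysql+pymysql://root:{encoded}@192.168.101.27:3306/jingbo_test'
```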
# now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# ex_Model(df,
# horizon=global_config['horizon'],
# input_size=global_config['input_size'],
# train_steps=global_config['train_steps'],
# val_check_steps=global_config['val_check_steps'],
# early_stop_patience_steps=global_config['early_stop_patience_steps'],
# is_debug=global_config['is_debug'],
# dataset=global_config['dataset'],
# is_train=global_config['is_train'],
# is_fivemodels=global_config['is_fivemodels'],
# val_size=global_config['val_size'],
# test_size=global_config['test_size'],
# settings=global_config['settings'],
# now=now,
# etadata=global_config['etadata'],
# modelsindex=global_config['modelsindex'],
# data=data,
# is_eta=global_config['is_eta'],
# end_time=global_config['end_time'],
# )
if is_corr:
df = corr_feature(df=df)
# logger.info('模型训练完成')
    df1 = df.copy()  # keep a copy; ds and y are added back after feature selection
    logger.info("开始训练模型...")
    row, col = df.shape

    now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ex_Model(df,
horizon=global_config['horizon'],
input_size=global_config['input_size'],
train_steps=global_config['train_steps'],
val_check_steps=global_config['val_check_steps'],
early_stop_patience_steps=global_config['early_stop_patience_steps'],
is_debug=global_config['is_debug'],
dataset=global_config['dataset'],
is_train=global_config['is_train'],
is_fivemodels=global_config['is_fivemodels'],
val_size=global_config['val_size'],
test_size=global_config['test_size'],
settings=global_config['settings'],
now=now,
etadata=global_config['etadata'],
modelsindex=global_config['modelsindex'],
data=data,
is_eta=global_config['is_eta'],
end_time=global_config['end_time'],
)
logger.info('模型训练完成')
logger.info('训练数据绘图ing')
model_results3 = model_losss(sqlitedb, end_time=end_time)
@@ -403,15 +421,15 @@ def predict_main():
    # Build the report
    logger.info('制作报告ing')
    title = f'{settings}--{end_time}-预测报告'  # report title
    reportname = f'石油焦铝用大模型日度预测--{end_time}.pdf'  # report file name
    reportname = f'Brent原油大模型日度预测--{end_time}.pdf'  # report file name
    reportname = reportname.replace(':', '-')  # replace colons
    shiyoujiao_lvyong_export_pdf(dataset=dataset, num_models=5 if is_fivemodels else 22, time=end_time,
    brent_export_pdf(dataset=dataset, num_models=5 if is_fivemodels else 22, time=end_time,
                     reportname=reportname, sqlitedb=sqlitedb)
    logger.info('制作报告end')
    logger.info('模型训练完成')

    # push_market_value()
    push_market_value()
    # # LSTM univariate model
    # ex_Lstm(df,input_seq_len=input_size,output_seq_len=horizon,is_debug=is_debug,dataset=dataset)


@@ -301,7 +301,7 @@ def predict_main():
thresh=len(model_results)*0.1, axis=1)
        # Drop empty rows
model_results = model_results.dropna()
modelnames = model_results.columns.to_list()[2:-2]
modelnames = model_results.columns.to_list()[2:-1]
for col in model_results[modelnames].select_dtypes(include=['object']).columns:
model_results[col] = model_results[col].astype(np.float32)
        # Absolute error rate of each prediction against the true value


@@ -866,7 +866,7 @@ def model_losss_yongan(sqlitedb, end_time, table_name_prefix):
                plt.text(i, j, str(j), ha='center', va='bottom')
        # Vertical dashed line at the current date
        plt.axvline(x=df['ds'].iloc[-config.horizon], color='r', linestyle='--')
        plt.axvline(x=df['ds'].iloc[-horizon], color='r', linestyle='--')
        plt.legend()
        plt.xlabel('日期')
        plt.ylabel('价格')
@@ -881,8 +881,8 @@ def model_losss_yongan(sqlitedb, end_time, table_name_prefix):
        ax.axis('off')  # hide the axes
        # Round values to 2 decimals
        df = df.round(2)
        df = df[-config.horizon:]
        df['Day'] = [f'Day_{i}' for i in range(1, config.horizon+1)]
        df = df[-horizon:]
        df['Day'] = [f'Day_{i}' for i in range(1, horizon+1)]
        # Move the Day column to the front
        df = df[['Day'] + list(df.columns[:-1])]
        table = ax.table(cellText=df.values,
@@ -1297,7 +1297,7 @@ def model_losss(sqlitedb, end_time):
        # plt.plot(df['ds'], df[model], label=model, marker='o')
        plt.plot(df['ds'], df[most_model_name], label=model, marker='o')
        # Vertical dashed line at the current date
        plt.axvline(x=df['ds'].iloc[-config.horizon], color='r', linestyle='--')
        plt.axvline(x=df['ds'].iloc[-horizon], color='r', linestyle='--')
        plt.legend()
        plt.xlabel('日期')
        # Format the x-axis dates as year-month-day
@@ -1338,7 +1338,7 @@ def model_losss(sqlitedb, end_time):
            plt.text(i, j, str(j), ha='center', va='bottom')
        # Vertical dashed line at the current date
        plt.axvline(x=df['ds'].iloc[-config.horizon], color='r', linestyle='--')
        plt.axvline(x=df['ds'].iloc[-horizon], color='r', linestyle='--')
        plt.legend()
        plt.xlabel('日期')
        # Auto-format the x-axis date display
@@ -1357,8 +1357,8 @@ def model_losss(sqlitedb, end_time):
        ax.axis('off')  # hide the axes
        # Round values to 2 decimals
        df = df.round(2)
        df = df[-config.horizon:]
        df['Day'] = [f'Day_{i}' for i in range(1, config.horizon+1)]
        df = df[-horizon:]
        df['Day'] = [f'Day_{i}' for i in range(1, horizon+1)]
        # Move the Day column to the front
        df = df[['Day'] + list(df.columns[:-1])]
        table = ax.table(cellText=df.values,
@@ -1388,10 +1388,10 @@ def model_losss(sqlitedb, end_time):
bbox_inches='tight')
plt.close()
_plt_predict_ture(df_combined3)
# _plt_predict_ture(df_combined3)
# _plt_modeltopten_predict_ture(df_combined4)
_plt_predict_table(df_combined3)
_plt_model_results3()
# _plt_predict_table(df_combined3)
# _plt_model_results3()
return model_results3
@@ -2461,319 +2461,6 @@ def brent_export_pdf(num_indicators=475, num_models=21, num_dayindicator=202, in
print(f"请求超时: {e}")
@exception_logger
def shiyoujiao_lvyong_export_pdf(num_indicators=475, num_models=21, num_dayindicator=202, inputsize=5, dataset='dataset', time='2024-07-30', reportname='report.pdf', sqlitedb='jbsh_yuanyou.db'):
global y
    # Container for the report content
    content = list()

    # Recent feature values (last 60 rows)
    import pandas as pd
    feature_data_df = pd.read_csv(os.path.join(
        config.dataset, '指标数据添加时间特征.csv'), parse_dates=['ds']).tail(60)
    def draw_feature_trend(feature_data_df, features):
        # Plot each feature's 60-day trend against the price
        feature_df = feature_data_df[['ds', 'y'] + features]
        for i, col in enumerate(features):
            print(f'正在绘制第{i+1}个特征{col}与价格散点图...')
            if col not in ['ds', 'y']:
                fig, ax1 = plt.subplots(figsize=(10, 6))
                # Draw y on the first axis
                sns.lineplot(data=feature_df, x='ds', y='y', ax=ax1, color='b')
                ax1.set_xlabel('日期')
                ax1.set_ylabel('y', color='b')
                ax1.tick_params('y', colors='b')
                # Annotate values on ax1, slightly offset so labels do not overlap the curve
                for j in range(1, len(feature_df), 2):
                    value = feature_df['y'].iloc[j]
                    date = feature_df['ds'].iloc[j]
                    offset = 1.001
                    ax1.text(date, value * offset, str(round(value, 2)),
                             ha='center', va='bottom', color='b', fontsize=10)
                # Second axis for the feature itself
                ax2 = ax1.twinx()
                sns.lineplot(data=feature_df, x='ds', y=col, ax=ax2, color='r')
                ax2.set_ylabel(col, color='r')
                ax2.tick_params('y', colors='r')
                # Annotate values on ax2 with a small offset as well
                for j in range(0, len(feature_df), 2):
                    value = feature_df[col].iloc[j]
                    date = feature_df['ds'].iloc[j]
                    offset = 1.0003
                    ax2.text(date, value * offset, str(round(value, 2)),
                             ha='center', va='bottom', color='r', fontsize=10)
                # Title
                plt.title(col)
                # Date-formatted x axis with automatic tick placement
                locator = mdates.AutoDateLocator()
                formatter = mdates.AutoDateFormatter(locator)
                ax1.xaxis.set_major_locator(locator)
                ax1.xaxis.set_major_formatter(formatter)
                # Sanitize special characters for the file name
                col = col.replace('*', '-')
                col = col.replace(':', '-')
                col = col.replace(r'/', '-')
                plt.savefig(os.path.join(config.dataset, f'{col}与价格散点图.png'))
                content.append(Graphs.draw_img(
                    os.path.join(config.dataset, f'{col}与价格散点图.png')))
                plt.close()
    # Title
    content.append(Graphs.draw_title(f'{config.y}{time}预测报告'))

    # Forecast results
    content.append(Graphs.draw_little_title('一、预测结果:'))
    # Chart of historical prices and forecast values
    content.append(Graphs.draw_img(os.path.join(config.dataset, '历史价格-预测值.png')))
    # Volatility plotting notes
    content.append(Graphs.draw_text('图示说明:'))
    content.append(Graphs.draw_text(
        ' 确定置信区间:设置残差置信阈值,以每周最佳模型为基准,选取在置信区间的预测值作为置信区间;'))
    # Take the rows of df where the y column is empty (the forecast rows)
    import pandas as pd
    df = pd.read_csv(os.path.join(config.dataset, 'predict.csv'), encoding='gbk')
    df_true = pd.read_csv(os.path.join(
        config.dataset, '指标数据添加时间特征.csv'), encoding='utf-8')  # true values for the forecast dates
    df_true = df_true[['ds', 'y']]
    eval_df = pd.read_csv(os.path.join(
        config.dataset, 'model_evaluation.csv'), encoding='utf-8')
    # Sorted by the evaluation metric; take the top five
    fivemodels_list = eval_df['模型(Model)'].values  # used later as column names
    # Keep the fivemodels_list columns plus ds
    df = df[['ds'] + fivemodels_list.tolist()]
    # Join the true values for the forecast dates
    df = pd.merge(df, df_true, on='ds', how='left')
    # Drop all-NaN columns
    df = df.dropna(how='all', axis=1)
    # Numeric columns other than ds: cast to float and round
    num_cols = [col for col in df.columns if col !=
                'ds' and pd.api.types.is_numeric_dtype(df[col])]
    for col in num_cols:
        df[col] = df[col].astype(float).round(2)
    # Add mean, max, and min columns
    df['平均值'] = df[num_cols].mean(axis=1).round(2)
    df['最大值'] = df[num_cols].max(axis=1)
    df['最小值'] = df[num_cols].min(axis=1)
    # Transpose and reset the index
    df = df.T
    df = df.reset_index()
    # Forecast table
    data = df.values.tolist()
    col_width = 500/len(df.columns)
    content.append(Graphs.draw_table(col_width, *data))
    content.append(Graphs.draw_little_title('二、上一预测周期偏差率分析:'))
    df = pd.read_csv(os.path.join(
        config.dataset, 'testandpredict_groupby.csv'), encoding='utf-8')
    df4 = df.copy()  # used for the deviation-rate calculation
    # Drop the created_dt column
    df4 = df4.drop(columns=['created_dt'])
    # Model deviation rates: percentage difference of each column from y
    df3 = pd.DataFrame()  # holds the deviation rates
    # Drop rows containing nulls
    df4 = df4.dropna()
    df3['ds'] = df4['ds']
    for col in fivemodels_list:
        df3[col] = round(abs(df4[col] - df4['y']) / df4['y'] * 100, 2)
    # Deviation rates of the top-five models
    df3 = df3[['ds'] + fivemodels_list.tolist()][-inputsize:]
    # Time range of the previous forecast period
    stime = df3['ds'].iloc[0]
    etime = df3['ds'].iloc[-1]
    # Deviation-rate table
    fivemodels = ''.join(eval_df['模型(Model)'].values[:5])  # joined into one string for the text below
    content.append(Graphs.draw_text(
        f'预测使用了{num_models}个模型进行训练使用评估结果MAE前五的模型分别是 {fivemodels} ,模型上一预测区间 {stime} -- {etime}的偏差率(%)分别是:'))
    df3 = df3.T
    df3 = df3.reset_index()
    data = df3.values.tolist()
    col_width = 500/len(df3.columns)
    content.append(Graphs.draw_table(col_width, *data))
    content.append(Graphs.draw_little_title('上一周预测准确率:'))
    df4 = sqlitedb.select_data('accuracy_rote', order_by='结束日期 desc', limit=1)
    df4 = df4.T
    df4 = df4.reset_index()
    df4 = df4.T
    data = df4.values.tolist()
    col_width = 500/len(df4.columns)
    content.append(Graphs.draw_table(col_width, *data))
    content.append(Graphs.draw_little_title('三、预测过程解析:'))
    # Features, models, and parameter configuration
    content.append(Graphs.draw_little_title('模型选择:'))
    content.append(Graphs.draw_text(
        f'本次预测使用了一个专门收集时间序列的NeuralForecast库中的{num_models}个模型:'))
    content.append(Graphs.draw_text(f'使用40天的数据预测未来{inputsize}天的数据。'))
    content.append(Graphs.draw_little_title('指标情况:'))
    with open(os.path.join(config.dataset, '特征频度统计.txt'), encoding='utf-8') as f:
        for line in f.readlines():
            content.append(Graphs.draw_text(line))
    data = pd.read_csv(os.path.join(config.dataset, '指标数据添加时间特征.csv'),
                       encoding='utf-8')  # used for the correlation calculation
    df_zhibiaofenlei = loadcsv(os.path.join(
        config.dataset, '特征处理后的指标名称及分类.csv'))  # used for the bubble chart
    df_zhibiaoshuju = data.copy()  # used for the bubble chart

    # Feature-correlation bubble chart
    grouped = df_zhibiaofenlei.groupby('指标分类')
    grouped_corr = pd.DataFrame(columns=['指标分类', '指标数量', '相关性总和'])
    content.append(Graphs.draw_little_title('按指标分类分别与预测目标进行皮尔逊相关系数分析:'))
content.append(Graphs.draw_text('''皮尔逊相关系数说明:'''))
content.append(Graphs.draw_text('''衡量两个特征之间的线性相关性。'''))
content.append(Graphs.draw_text('''
相关系数为1表示两个变量之间存在完全正向的线性关系即当一个变量增加时另一个变量也相应增加且变化是完全一致的'''))
content.append(Graphs.draw_text(
'''相关系数为-1表示两个变量之间存在完全负向的线性关系即当一个变量增加时另一个变量会相应减少且变化是完全相反的'''))
content.append(Graphs.draw_text(
'''相关系数接近0表示两个变量之间不存在线性关系即它们的变化不会随着对方的变化而变化。'''))
    for name, group in grouped:
        cols = group['指标名称'].tolist()
        config.logger.info(f'开始绘制{name}类指标的相关性直方图')
        cols_subset = cols
        feature_names = ['y'] + cols_subset
        correlation_matrix = df_zhibiaoshuju[feature_names].corr()['y']

        # Histogram of the feature correlations
        plt.figure(figsize=(10, 8))
        sns.histplot(correlation_matrix.values.flatten(),
                     bins=20, kde=True, color='skyblue')
        plt.title(f'{name}类指标(共{len(cols_subset)}个)相关性直方分布图')
        plt.xlabel('相关系数')
        plt.ylabel('频数')
        plt.savefig(os.path.join(
            config.dataset, f'{name}类指标相关性直方分布图.png'), bbox_inches='tight')
        plt.close()
        content.append(Graphs.draw_img(
            os.path.join(config.dataset, f'{name}类指标相关性直方分布图.png')))
        content.append(Graphs.draw_text(
            f'{name}类指标(共{len(cols_subset)}个)的相关性直方分布图如上所示。'))

        # Positively correlated features
        positive_corr_features = correlation_matrix[correlation_matrix > 0].sort_values(
            ascending=False).index.tolist()[1:]
        print(f'{name}下正相关的特征值有:', positive_corr_features)
        if len(positive_corr_features) > 5:
            positive_corr_features = positive_corr_features[0:5]
            content.append(Graphs.draw_text(
                f'{name}类指标中与预测目标y正相关前五的特征有{positive_corr_features}'))
            draw_feature_trend(feature_data_df, positive_corr_features)
        elif len(positive_corr_features) == 0:
            pass
        else:
            content.append(Graphs.draw_text(
                f'其中与预测目标y正相关的特征有{positive_corr_features}'))
            draw_feature_trend(feature_data_df, positive_corr_features)

        # Negatively correlated features
        negative_corr_features = correlation_matrix[correlation_matrix < 0].sort_values(
            ascending=True).index.tolist()
        print(f'{name}下负相关的特征值有:', negative_corr_features)
        if len(negative_corr_features) > 5:
            negative_corr_features = negative_corr_features[:5]
            content.append(Graphs.draw_text(
                f'与预测目标y负相关前五的特征有{negative_corr_features}'))
            draw_feature_trend(feature_data_df, negative_corr_features)
        elif len(negative_corr_features) == 0:
            pass
        else:
            content.append(Graphs.draw_text(
                f'{name}类指标中与预测目标y负相关的特征有{negative_corr_features}'))
            draw_feature_trend(feature_data_df, negative_corr_features)

        # Sum of the absolute correlations for this group
        correlation_sum = correlation_matrix.abs().sum()
        config.logger.info(f'{name}类指标的相关性总和为:{correlation_sum}')
        # Append the group's correlation sum to grouped_corr
        goup_corr = pd.DataFrame(
            {'指标分类': [name], '指标数量': [len(cols_subset)], '相关性总和': [correlation_sum]})
        grouped_corr = pd.concat(
            [grouped_corr, goup_corr], axis=0, ignore_index=True)
    # Bubble chart of the correlation sums
    config.logger.info('开始绘制相关性总和的气泡图')
    plt.figure(figsize=(10, 10))
    sns.scatterplot(data=grouped_corr, x='相关性总和', y='指标数量', size='相关性总和', sizes=(
        grouped_corr['相关性总和'].min()*5, grouped_corr['相关性总和'].max()*5), hue='指标分类', palette='viridis')
    plt.title('指标分类相关性总和的气泡图')
    plt.ylabel('数量')
    plt.savefig(os.path.join(config.dataset, '指标分类相关性总和的气泡图.png'),
                bbox_inches='tight')
    plt.close()
    content.append(Graphs.draw_img(os.path.join(config.dataset, '指标分类相关性总和的气泡图.png')))
    content.append(Graphs.draw_text(
        '气泡图中,横轴为指标分类,纵轴为指标分类下的特征数量,气泡的面积越大表示该分类中特征的相关系数和越大。'))
    config.logger.info('绘制相关性总和的气泡图结束')
content.append(Graphs.draw_little_title('模型选择:'))
content.append(Graphs.draw_text(
f'预测使用了{num_models}个模型进行训练拟合通过评估指标MAE从小到大排列前5个模型的简介如下'))
    # Load the model introductions
with open(os.path.join(config.dataset, 'model_introduction.txt'), 'r', encoding='utf-8') as f:
for line in f:
line_split = line.strip().split('--')
if line_split[0] in fivemodels_list:
for introduction in line_split:
content.append(Graphs.draw_text(introduction))
    content.append(Graphs.draw_little_title('模型评估:'))
    df = pd.read_csv(os.path.join(
        config.dataset, 'model_evaluation.csv'), encoding='utf-8')
    # Cast the numeric columns to float and round to 3 decimals
    for col in eval_df.columns:
        if col not in ['模型(Model)']:
            eval_df[col] = eval_df[col].astype(float)
            eval_df[col] = eval_df[col].round(3)
    # Keep only the rows of the five selected models
    eval_df = eval_df[eval_df['模型(Model)'].isin(fivemodels_list)]
    # Transpose and reset the index
    eval_df = eval_df.T
    eval_df = eval_df.reset_index()
    eval_df = eval_df.T
    # Evaluation table
    data = eval_df.values.tolist()
    col_width = 500/len(eval_df.columns)
    content.append(Graphs.draw_table(col_width, *data))
content.append(Graphs.draw_text('评估指标释义:'))
content.append(Graphs.draw_text(
'1. 均方根误差(RMSE):均方根误差是衡量预测值与实际值之间误差的一种方法,取值越小,误差越小,预测效果越好。'))
content.append(Graphs.draw_text(
'2. 平均绝对误差(MAE):平均绝对误差是衡量预测值与实际值之间误差的一种方法,取值越小,误差越小,预测效果越好。'))
content.append(Graphs.draw_text(
'3. 平均平方误差(MSE):平均平方误差是衡量预测值与实际值之间误差的一种方法,取值越小,误差越小,预测效果越好。'))
content.append(Graphs.draw_text('模型拟合:'))
    # Fitted-vs-actual chart
    content.append(Graphs.draw_img(os.path.join(config.dataset, '预测值与真实值对比图.png')))
    # Generate the PDF
    doc = SimpleDocTemplate(os.path.join(config.dataset, reportname), pagesize=letter)
    doc.build(content)
    # Upload the PDF to the digitalization platform
    try:
        if config.is_update_report:
            with open(os.path.join(config.dataset, reportname), 'rb') as f:
                base64_data = base64.b64encode(f.read()).decode('utf-8')
                upload_data["data"]["fileBase64"] = base64_data
                upload_data["data"]["fileName"] = reportname
            token = get_head_auth_report()
            upload_report_data(token, upload_data)
    except TimeoutError as e:
        print(f"请求超时: {e}")
@exception_logger
def pp_export_pdf(num_indicators=475, num_models=21, num_dayindicator=202, inputsize=5, dataset='dataset', time='2024-07-30', reportname='report.pdf', sqlitedb='jbsh_yuanyou.db'):
    # Container for the report content


@@ -11,9 +11,11 @@ import logging.handlers
import os
import re
import requests
import json
class EtaReader():
def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl):
def __init__(self, signature, classifylisturl, classifyidlisturl, edbcodedataurl, edbcodelist, edbdatapushurl, edbdeleteurl, edbbusinessurl):
'''
        Initialize an EtaReader instance
@@ -39,17 +41,16 @@
self.edbdeleteurl = edbdeleteurl
self.edbbusinessurl = edbbusinessurl
def filter_yuanyou_data(self,ClassifyName,data):
def filter_yuanyou_data(self, ClassifyName, data):
'''
        Rules for deciding which indicator names to keep
        '''
        # Names containing any of these keywords are dropped (return False)
if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价',
'同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克',
'四周均值','名占比','残差','DMA',
'连7-连9','4周平均','4周均值','滚动相关性','日本']):
if any(keyword in data for keyword in ['运费', '检修', '波动率', '地缘政治', '股价',
'同比', '环比', '环差', '裂差', '4WMA', '变频', '道琼斯', '标普500', '纳斯达克',
'四周均值', '名占比', '残差', 'DMA',
'连7-连9', '4周平均', '4周均值', '滚动相关性', '日本']):
return False
        # Check for the required features
@@ -59,7 +60,7 @@
        # Keep special keywords under the 库存 (inventory) category
if ClassifyName == '库存':
if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]):
if any(keyword in data for keyword in ['原油', '美国', '全球', '中国', '富查伊拉', 'ARA']):
return True
else:
pass
@@ -77,8 +78,8 @@
        # For flight data under 需求 (demand), keep only China and US
if ClassifyName == '需求':
if '航班' in data :
if '中国' in data or '美国' in data :
if '航班' in data:
if '中国' in data or '美国' in data:
return True
else:
return False
@@ -95,7 +96,7 @@
c = int(data.split('c1-c')[1])
except:
return False
if c > 9 :
if c > 9:
return False
else:
pass
@@ -104,24 +105,24 @@
            pass
        # Deduplicate homogeneous series by their canonical name prefix
strstartdict = {'ICE Brent c':"ICE Brent c14",
'NYMWX WTI c':"NYMWX WTI c5",
'INE SC c':"INE SC c1",
'EFS c':"EFS c",
'Dubai Swap c':"Dubai Swap c1",
'Oman Swap c':"Oman Swap c1",
'DME Oman c':"DME Oman c1",
'Murban Futures c':"Murban Futures c1",
'Dubai连合约价格':'Dubai连1合约价格',
'美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格',
'Brent连合约价格':'Brent连1合约价格',
'WTI连合约价格':'WTI连1合约价格',
'布伦特连合约价格':'Brent连1合约价格',
'Brent 连合约价格':'Brent连1合约价格',
'Dubai连合约价格':'Dubai连1合约价格',
'Brent连':'Brent连1合约价格',
'brent连':'Brent连1合约价格',
}
strstartdict = {'ICE Brent c': "ICE Brent c14",
'NYMWX WTI c': "NYMWX WTI c5",
'INE SC c': "INE SC c1",
'EFS c': "EFS c",
'Dubai Swap c': "Dubai Swap c1",
'Oman Swap c': "Oman Swap c1",
'DME Oman c': "DME Oman c1",
'Murban Futures c': "Murban Futures c1",
'Dubai连合约价格': 'Dubai连1合约价格',
'美国RBOB期货月份合约价格': '美国RBOB期货2309月份合约价格',
'Brent连合约价格': 'Brent连1合约价格',
'WTI连合约价格': 'WTI连1合约价格',
'布伦特连合约价格': 'Brent连1合约价格',
'Brent 连合约价格': 'Brent连1合约价格',
'Dubai连合约价格': 'Dubai连1合约价格',
'Brent连': 'Brent连1合约价格',
'brent连': 'Brent连1合约价格',
}
        # Check whether the name's prefix is one of strstartdict's keys
match = re.match(r'([a-zA-Z\s]+)(\d+)', data)
if match:
@@ -135,11 +136,11 @@
# data = 'Brent 连7合约价格'
        # Check whether the name, with digits removed, is one of strstartdict's keys
match = re.findall(r'\D+', data)
if match :
if match:
if len(match) == 2:
part1 = match[0]
part2 = match[1]
if part1+part2 in [i for i in strstartdict.keys()]:
if part1+part2 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1+part2]:
return True
else:
@ -150,7 +151,7 @@ class EtaReader():
match = re.findall(r'\D+', data)
part1 = match[0]
if part1 in [i for i in strstartdict.keys()]:
if part1 in strstartdict:
if data == strstartdict[part1]:
return True
else:
@ -166,7 +167,7 @@ class EtaReader():
return True
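# The two regex branches above normalize each homogeneous contract series to a single
# representative: re.match splits an ASCII prefix from the contract month, while
# re.findall(r'\D+', ...) handles names with Chinese suffixes. A condensed sketch
# (not the exact branch structure; dict abbreviated):
import re

REPRESENTATIVE = {'ICE Brent c': 'ICE Brent c14', 'Brent连合约价格': 'Brent连1合约价格'}

def keep_series(name):
    m = re.match(r'([a-zA-Z\s]+)(\d+)', name)
    if m and m.group(1) in REPRESENTATIVE:
        return name == REPRESENTATIVE[m.group(1)]
    key = ''.join(re.findall(r'\D+', name))   # the name with its digits removed
    if key in REPRESENTATIVE:
        return name == REPRESENTATIVE[key]
    return True

assert keep_series('ICE Brent c14')          # the designated representative is kept
assert not keep_series('ICE Brent c7')       # other months of the series are dropped
assert not keep_series('Brent连7合约价格')   # only 连1 survives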
def filter_pp_data(self,ClassifyName,data):
def filter_pp_data(self, ClassifyName, data):
'''
Rules for deciding which indicator names to keep
'''
@ -181,8 +182,6 @@ class EtaReader():
if any(keyword in data for keyword in ['拉丝']):
return True
# Check for the required features
# Drop data under the 期货市场 (futures market) classification
if ClassifyName == '期货市场':
@ -214,18 +213,16 @@ class EtaReader():
else:
pass
# Keep every indicator under 需求 (demand)
if ClassifyName == '需求':
return True
else:
pass
return True
# Fetch indicator data by EdbCode
def edbcodegetdata(self,df,EdbCode,EdbName):
def edbcodegetdata(self, df, EdbCode, EdbName):
# Fetch the indicator's data by its id
url = self.edbcodedataurl+str(EdbCode)
# 发送GET请求
@ -236,7 +233,8 @@ class EtaReader():
data = response.json() # 假设接口返回的是JSON数据
all_data_items = data.get('Data')
# 列表转换为DataFrame
df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime'])
df3 = pd.DataFrame(all_data_items, columns=[
'DataTime', 'Value', 'UpdateTime'])
# df3 = pd.read_json(all_data_items, orient='records')
# 去掉UpdateTime 列
@ -244,7 +242,8 @@ class EtaReader():
# df3.set_index('DataTime')
df3.rename(columns={'Value': EdbName}, inplace=True)
# 将数据存储df1
df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y'))
df = pd.merge(df, df3, how='outer',
on='DataTime', suffixes=('', '_y'))
# 按时间排序
df = df.sort_values(by='DataTime', ascending=True)
return df
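# edbcodegetdata grows one wide table by outer-merging each indicator on DataTime;
# dates missing for an indicator simply become NaN. A minimal illustration with
# made-up values:
import pandas as pd

df = pd.DataFrame({'DataTime': ['2024-01-01', '2024-01-02'], 'Brent': [78.2, 78.9]})
df3 = pd.DataFrame({'DataTime': ['2024-01-02', '2024-01-03'], 'Value': [2.1, 2.3]})
df3.rename(columns={'Value': 'EFS c'}, inplace=True)
merged = pd.merge(df, df3, how='outer', on='DataTime', suffixes=('', '_y'))
merged = merged.sort_values(by='DataTime', ascending=True)
print(merged)
#      DataTime  Brent  EFS c
# 0  2024-01-01   78.2    NaN
# 1  2024-01-02   78.9    2.1
# 2  2024-01-03    NaN    2.3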
@ -255,7 +254,7 @@ class EtaReader():
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def get_eta_api_yuanyou_data(self,data_set,dataset=''):
def get_eta_api_yuanyou_data(self, data_set, dataset=''):
'''
Fetch crude-oil data from the ETA API
@ -271,7 +270,8 @@ class EtaReader():
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
@ -285,16 +285,16 @@ class EtaReader():
'''
# 构建新的DataFrame df df1
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度','指标来源','来源id','最后更新时间','更新周期','预警日期','停更周期'])
df = pd.DataFrame(columns=[
'指标分类', '指标名称', '指标id', '频度', '指标来源', '来源id', '最后更新时间', '更新周期', '预警日期', '停更周期'])
df1 = pd.DataFrame(columns=['DataTime'])
# Not reachable from the public internet; make sure you are on the internal network
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")
# 检查响应状态码
if response.status_code == 200:
@ -304,19 +304,20 @@ class EtaReader():
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# Parent ClassifyId whose children we keep
fixed_value = 1214
fixed_value = 1193
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
filtered_data = [item for item in data.get(
'Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
# Then loop over filtered_data and fetch each list to obtain the ClassifyId values we need
n = 0
for item in filtered_data:
n+= 1
n += 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
ClassifyId = item["ClassifyId"]  # classification id, request param for the per-classification indicator list
ClassifyName = item["ClassifyName"]  # classification name, saved to df's 指标分类 column
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
@ -327,12 +328,13 @@ class EtaReader():
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
# indicator name, saved to the 指标名称 column of df2 and df
EdbName = i.get('EdbName')
Frequency = i.get('Frequency')  # frequency, saved to df's 频度 column
SourceName = i.get('SourceName')  # source name, saved to df's 指标来源 column
Source = i.get('Source')  # source id, saved to df's 来源id column
# Skip indicators that are neither daily nor weekly
if Frequency not in ['日度','周度','','']:
if Frequency not in ['日度', '周度', '', '']:
continue
# Among manual data, keep only names containing 海运出口 / 海运进口
@ -343,44 +345,53 @@ class EtaReader():
if Source == 2:
continue
# Decide whether this name should be saved
isSave = self.filter_yuanyou_data(ClassifyName,EdbName)
isSave = self.filter_yuanyou_data(
ClassifyName, EdbName)
if isSave:
# 保存到df
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
# 取df1所有行最后一列
edbname_df = df1[['DataTime',f'{EdbName}']]
edbname_df = df1[['DataTime', f'{EdbName}']]
edbname_df = edbname_df.dropna()
if len(edbname_df) == 0:
logger.info(f'指标名称:{EdbName} 没有数据')
continue
try:
time_sequence = edbname_df['DataTime'].values.tolist()[-10:]
time_sequence = edbname_df['DataTime'].values.tolist(
)[-10:]
except IndexError:
time_sequence = edbname_df['DataTime'].values.tolist()
time_sequence = edbname_df['DataTime'].values.tolist(
)
# 使用Counter来统计每个星期几的出现次数
from collections import Counter
weekday_counter = Counter(datetime.datetime.strptime(time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
weekday_counter = Counter(datetime.datetime.strptime(
time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
# 打印出现次数最多的星期几
try:
most_common_weekday = weekday_counter.most_common(1)[0][0]
most_common_weekday = weekday_counter.most_common(1)[
0][0]
# 计算两周后的日期
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
except IndexError:
most_common_weekday = '其他'
stop_update_period = 0
if '' in Frequency:
most_common_weekday = '每天'
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency,'指标来源':SourceName,'来源id':Source,'最后更新时间':edbname_df['DataTime'].values[-1],'更新周期':most_common_weekday,'预警日期':warning_date,'停更周期':stop_update_period},index=[0])
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency, '指标来源': SourceName, '来源id': Source,
'最后更新时间': edbname_df['DataTime'].values[-1], '更新周期': most_common_weekday, '预警日期': warning_date, '停更周期': stop_update_period}, index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
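# The block above infers an indicator's update cadence from its most recent points:
# Counter picks the most common weekday, the warning date is two weeks after the last
# observation, and the stall period is counted in whole weeks (days for daily series).
# A worked sketch with an assumed run date (the in-loop `from collections import
# Counter` would normally sit at module top):
import datetime
from collections import Counter

today = '2024-08-13'                                                      # assumed run date
time_sequence = ['2024-07-05', '2024-07-12', '2024-07-19', '2024-07-26']  # weekly Fridays

weekday_counter = Counter(datetime.datetime.strptime(t, '%Y-%m-%d').strftime('%A')
                          for t in time_sequence)
most_common_weekday = weekday_counter.most_common(1)[0][0]                # 'Friday'
last = datetime.datetime.strptime(time_sequence[-1], '%Y-%m-%d')
warning_date = (last + datetime.timedelta(weeks=2)).strftime('%Y-%m-%d')  # '2024-08-09'
stop_update_period = (datetime.datetime.strptime(today, '%Y-%m-%d') - last).days // 7  # 2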
@ -388,7 +399,8 @@ class EtaReader():
logger.info(f'跳过指标 {EdbName}')
# Collect the codes in edbcodelist that are missing from df's 指标id column
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
new_list = [
item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# Loop over new_list and fetch each indicator's data into df1
for item in new_list:
@ -399,33 +411,196 @@ class EtaReader():
except KeyError:  # fall back to the raw code when no friendly name is configured
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他','指标来源':'其他','来源id':'其他'},index=[0])])
df1 = self.edbcodegetdata(df1, item, itemname)
df = pd.concat([df, pd.DataFrame(
{'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他', '指标来源': '其他', '来源id': '其他'}, index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
df1.sort_values('DataTime', inplace=True, ascending=False)
df1.rename(columns={'DataTime': 'date'}, inplace=True)
# df1.dropna(inplace=True)
# Drop rows dated after today
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# Save to sheets of the xlsx file
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
return df_zhibiaoshuju, df_zhibiaoliebiao
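# Downstream code can reload the two sheets that were just written; a sketch assuming
# the same module-level dataset / data_set values:
import os
import pandas as pd

df_shuju = pd.read_excel(os.path.join(dataset, data_set), sheet_name='指标数据')
df_liebiao = pd.read_excel(os.path.join(dataset, data_set), sheet_name='指标列表')
print(df_shuju.shape, df_liebiao.shape)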
def get_eta_api_pp_data(self,data_set,dataset=''):
def get_eta_api_chengpinyou_data(self, data_set, dataset=''):
'''
Fetch refined-product data (e.g. asphalt) from the ETA API
Args:
data_set (str): dataset file name
dataset (str): dataset folder, empty by default
Returns:
df_zhibiaoshuju, df_zhibiaoliebiao (pd.DataFrame): indicator data and indicator list
'''
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# From the list data, take each indicator name; for daily-frequency indicators fetch the UniqueCode, then fetch the indicator data and save it to a sheet of the xlsx file.
'''
df  = sheet 指标列表: stores 指标分类-指标名称-指标id-频度
df1 = sheet 指标数据: stores 时间-指标名称1-指标名称2...
'''
# Build the new DataFrames df and df1
df = pd.DataFrame(columns=[
'指标分类', '指标名称', '指标id', '频度', '指标来源', '来源id', '最后更新时间', '更新周期', '预警日期', '停更周期'])
df1 = pd.DataFrame(columns=['DataTime'])
# Not reachable from the public internet; make sure you are on the internal network
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")
# 检查响应状态码
if response.status_code == 200:
# 获取成功, 处理响应内容
data = response.json() # 假设接口返回的是JSON数据
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# Parent ClassifyId whose children we keep
# fixed_value = 1193  # refined oil products (成品油)
fixed_value = 1285  # asphalt (沥青)
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get(
'Data') if item.get('ParentId') == fixed_value]
# Then loop over filtered_data and fetch each list to obtain the ClassifyId values we need
n = 0
for item in filtered_data:
n += 1
# if n>50:
# break
ClassifyId = item["ClassifyId"]  # classification id, request param for the per-classification indicator list
ClassifyName = item["ClassifyName"]  # classification name, saved to df's 指标分类 column
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
# logger.info(response.text)
data2 = response.json()
Data = data2.get('Data')
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
# indicator name, saved to the 指标名称 column of df2 and df
EdbName = i.get('EdbName')
Frequency = i.get('Frequency')  # frequency, saved to df's 频度 column
SourceName = i.get('SourceName')  # source name, saved to df's 指标来源 column
Source = i.get('Source')  # source id, saved to df's 来源id column
# 保存到df
df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
# 取df1所有行最后一列
edbname_df = df1[['DataTime', f'{EdbName}']]
edbname_df = edbname_df.dropna()
if len(edbname_df) == 0:
logger.info(f'指标名称:{EdbName} 没有数据')
continue
try:
time_sequence = edbname_df['DataTime'].values.tolist(
)[-10:]
except IndexError:
time_sequence = edbname_df['DataTime'].values.tolist(
)
# 使用Counter来统计每个星期几的出现次数
from collections import Counter
weekday_counter = Counter(datetime.datetime.strptime(
time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
# 打印出现次数最多的星期几
try:
most_common_weekday = weekday_counter.most_common(1)[
0][0]
# 计算两周后的日期
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
except IndexError:
most_common_weekday = '其他'
stop_update_period = 0
if '' in Frequency:
most_common_weekday = '每天'
warning_date = (datetime.datetime.strptime(
time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(
today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency, '指标来源': SourceName, '来源id': Source,
'最后更新时间': edbname_df['DataTime'].values[-1], '更新周期': most_common_weekday, '预警日期': warning_date, '停更周期': stop_update_period}, index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
# Collect the codes in edbcodelist that are missing from df's 指标id column
new_list = [
item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# Loop over new_list and fetch each indicator's data into df1
for item in new_list:
logger.info(item)
# 将item 加入到 df['指标id']中
try:
itemname = edbcodenamedict[item]
except KeyError:  # fall back to the raw code when no friendly name is configured
itemname = item
df1 = self.edbcodegetdata(df1, item, itemname)
df = pd.concat([df, pd.DataFrame(
{'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他', '指标来源': '其他', '来源id': '其他'}, index=[0])])
# 按时间排序
df1.sort_values('DataTime', inplace=True, ascending=False)
df1.rename(columns={'DataTime': 'date'}, inplace=True)
# df1.dropna(inplace=True)
# Drop rows dated after today
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# Save to sheets of the xlsx file
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju, df_zhibiaoliebiao
def get_eta_api_pp_data(self, data_set, dataset=''):
global ClassifyId
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
@ -442,13 +617,12 @@ class EtaReader():
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度'])
df1 = pd.DataFrame(columns=['DataTime'])
# Not reachable from the public internet; make sure you are on the internal network
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")
# 检查响应状态码
if response.status_code == 200:
@ -461,16 +635,17 @@ class EtaReader():
fixed_value = ClassifyId
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
filtered_data = [item for item in data.get(
'Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
# Then loop over filtered_data and fetch each list to obtain the ClassifyId values we need
n = 0
for item in filtered_data:
n+= 1
n += 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
ClassifyId = item["ClassifyId"]  # classification id, request param for the per-classification indicator list
ClassifyName = item["ClassifyName"]  # classification name, saved to df's 指标分类 column
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
@ -481,27 +656,30 @@ class EtaReader():
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
# indicator name, saved to the 指标名称 column of df2 and df
EdbName = i.get('EdbName')
Frequency = i.get('Frequency')  # frequency, saved to df's 频度 column
# Skip indicators that are neither daily nor weekly
if Frequency not in ['日度','周度','','']:
if Frequency not in ['日度', '周度', '', '']:
continue
# 判断名称是否需要保存
isSave = self.filter_pp_data(ClassifyName,EdbName)
isSave = self.filter_pp_data(ClassifyName, EdbName)
if isSave:
# 保存到df
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0])
df2 = pd.DataFrame(
{'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency}, index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
else:
logger.info(f'跳过指标 {EdbName}')
# Collect the codes in edbcodelist that are missing from df's 指标id column
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
new_list = [
item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# Loop over new_list and fetch each indicator's data into df1
for item in new_list:
@ -512,41 +690,44 @@ class EtaReader():
except KeyError:  # fall back to the raw code when no friendly name is configured
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])])
df1 = self.edbcodegetdata(df1, item, itemname)
df = pd.concat([df, pd.DataFrame(
{'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'}, index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
df1.sort_values('DataTime', inplace=True, ascending=False)
df1.rename(columns={'DataTime': 'date'}, inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
return df_zhibiaoshuju, df_zhibiaoliebiao
def push_data(self,data):
def push_data(self, data):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
logger.info('请求参数:',data)
response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data))
logger.info('请求参数: %s', data)  # lazy %-formatting so the payload is actually rendered
response = requests.post(
self.edbdatapushurl, headers=self.headers, data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
@ -560,23 +741,24 @@ class EtaReader():
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def del_zhibiao(self,IndexCodeList):
def del_zhibiao(self, IndexCodeList):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
data = {
"IndexCodeList": IndexCodeList #指标编码列表
}
"IndexCodeList": IndexCodeList # 指标编码列表
}
# 发送post请求 上传数据
response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data))
response = requests.post(
self.edbdeleteurl, headers=self.headers, data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
@ -590,7 +772,7 @@ class EtaReader():
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
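# Hedged usage sketch: etadata is the EtaReader instance constructed at the bottom of
# this file, and the SELF-prefixed codes are illustrative. Deletion is destructive on
# the ETA side, so double-check the codes first.
etadata.del_zhibiao(['SELF0000057', 'SELF0000058'])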
def del_business(self,data):
def del_business(self, data):
'''
Interface docs:
https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd
@ -607,15 +789,15 @@ class EtaReader():
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
# 自定义的header参数
'timestamp': str(self.signature.timestamp),
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data))
response = requests.post(
self.edbbusinessurl, headers=self.headers, data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
@ -634,6 +816,7 @@ class BinanceAPI:
'''
Build the signed request headers (Binance-style signature)
'''
def __init__(self, APPID, SECRET):
self.APPID = APPID
self.SECRET = SECRET
@ -641,7 +824,8 @@ class BinanceAPI:
# Generate a random string to use as the nonce
def generate_nonce(self, length=32):
self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
self.nonce = ''.join(random.choices(
string.ascii_letters + string.digits, k=length))
return self.nonce
# Current timestamp in seconds
@ -664,7 +848,8 @@ class BinanceAPI:
self.signature = self.calculate_signature(self.SECRET, self.sign_str)
# return self.signature
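# Every request in this module sends the same four signed headers. Assumed usage
# (whether __init__ or a separate call populates nonce/timestamp/signature is elided
# by this diff):
signature = BinanceAPI(APPID, SECRET)
headers = {
    'nonce': signature.nonce,                # random per-request string
    'timestamp': str(signature.timestamp),   # epoch seconds
    'appid': signature.APPID,
    'signature': signature.signature,
}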
### 日志配置
# Logging configuration
# Create the log directory if it does not exist
log_dir = 'logs'
@ -676,8 +861,10 @@ logger = logging.getLogger('pricepredict')
logger.setLevel(logging.INFO)
# File handler: write logs to a rotating file
file_handler = logging.handlers.RotatingFileHandler(os.path.join(log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
file_handler = logging.handlers.RotatingFileHandler(os.path.join(
log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# Console handler: echo logs to the console
console_handler = logging.StreamHandler()
@ -688,7 +875,6 @@ logger.addHandler(file_handler)
logger.addHandler(console_handler)
# ETA API URLs
sourcelisturl = 'http://10.189.2.78:8108/v1/edb/source/list'
classifylisturl = 'http://10.189.2.78:8108/v1/edb/classify/list?ClassifyType='
@ -698,47 +884,48 @@ edbcodedataurl = 'http://10.189.2.78:8108/v1/edb/data?EdbCode='
edbdatapushurl = 'http://10.189.2.78:8108/v1/edb/push'
edbdeleteurl = 'http://10.189.2.78:8108/v1/edb/business/edb/del'
edbbusinessurl = 'http://10.189.2.78:8108/v1/edb/business/data/del'
edbcodelist = ['CO1 Comdty', 'ovx index', 'C2404194834', 'C2404199738', 'dxy curncy', 'C2403128043', 'C2403150124',
'DOESCRUD Index', 'WTRBM1 EEGC Index', 'FVHCM1 INDEX', 'doedtprd index', 'CFFDQMMN INDEX',
'C2403083739', 'C2404167878', 'C2403250571', 'lmcads03 lme comdty', 'GC1 COMB Comdty',
'C2404171822','C2404167855',
# 'W000825','W000826','G.IPE', # 美国汽柴油
# 'S5131019','ID00135604','FSGAM1 Index','S5120408','ID00136724', # 新加坡汽柴油
]
edbcodelist = [
# 'CO1 Comdty', 'ovx index', 'C2404194834', 'C2404199738', 'dxy curncy', 'C2403128043', 'C2403150124',
# 'DOESCRUD Index', 'WTRBM1 EEGC Index', 'FVHCM1 INDEX', 'doedtprd index', 'CFFDQMMN INDEX',
# 'C2403083739', 'C2404167878', 'C2403250571', 'lmcads03 lme comdty', 'GC1 COMB Comdty',
# 'C2404171822', 'C2404167855',
# 'W000825','W000826','G.IPE', # 美国汽柴油
# 'S5131019','ID00135604','FSGAM1 Index','S5120408','ID00136724', # 新加坡汽柴油
]
# Indicator codes for ETA self-maintained series
modelsindex = {
'NHITS': 'SELF0000001',
'Informer':'SELF0000057',
'LSTM':'SELF0000058',
'iTransformer':'SELF0000059',
'TSMixer':'SELF0000060',
'TSMixerx':'SELF0000061',
'PatchTST':'SELF0000062',
'RNN':'SELF0000063',
'GRU':'SELF0000064',
'TCN':'SELF0000065',
'BiTCN':'SELF0000066',
'DilatedRNN':'SELF0000067',
'MLP':'SELF0000068',
'DLinear':'SELF0000069',
'NLinear':'SELF0000070',
'TFT':'SELF0000071',
'FEDformer':'SELF0000072',
'StemGNN':'SELF0000073',
'MLPMultivariate':'SELF0000074',
'TiDE':'SELF0000075',
'DeepNPTS':'SELF0000076'
}
# 'NHITS': 'SELF0000001',
# 'Informer': 'SELF0000057',
# 'LSTM': 'SELF0000058',
# 'iTransformer': 'SELF0000059',
# 'TSMixer': 'SELF0000060',
# 'TSMixerx': 'SELF0000061',
# 'PatchTST': 'SELF0000062',
# 'RNN': 'SELF0000063',
# 'GRU': 'SELF0000064',
# 'TCN': 'SELF0000065',
# 'BiTCN': 'SELF0000066',
# 'DilatedRNN': 'SELF0000067',
# 'MLP': 'SELF0000068',
# 'DLinear': 'SELF0000069',
# 'NLinear': 'SELF0000070',
# 'TFT': 'SELF0000071',
# 'FEDformer': 'SELF0000072',
# 'StemGNN': 'SELF0000073',
# 'MLPMultivariate': 'SELF0000074',
# 'TiDE': 'SELF0000075',
# 'DeepNPTS': 'SELF0000076'
}
edbcodelist = edbcodelist+list(modelsindex.values())
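# NOTE: with every entry of modelsindex commented out, list(modelsindex.values()) is
# empty, so the concatenation above currently leaves edbcodelist unchanged.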
### 文件
data_set = '历史预测结果.xlsx' # 数据集文件
# Files
data_set = '沥青eta数据.xlsx'  # dataset file
# data_set = 'INE_OIL(1).csv'
### 文件夹
dataset = 'yuanyoudataset' # 数据集文件夹
# Folders
dataset = 'yuanyoudataset'  # dataset folder
# eta 接口token
@ -754,4 +941,10 @@ etadata = EtaReader(signature=signature,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl,
)
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_yuanyou_data(data_set=data_set, dataset=dataset) # 原始数据,未处理
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_chengpinyou_data(
data_set=data_set, dataset=dataset)  # raw, unprocessed data
# Save both frames to sheets of the xlsx file
with pd.ExcelWriter(data_set) as file:
df_zhibiaoshuju.to_excel(file, sheet_name='指标数据', index=False)
df_zhibiaoliebiao.to_excel(file, sheet_name='指标列表', index=False)
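# Quick sanity check that both sheets landed in the workbook (a sketch, assuming the
# file was just written to the working directory):
import pandas as pd
assert pd.ExcelFile(data_set).sheet_names == ['指标数据', '指标列表']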

BIN
成品油eta数据.xlsx Normal file

Binary file not shown.

BIN
沥青eta数据.xlsx Normal file

Binary file not shown.