diff --git a/config_jingbo.py b/config_jingbo.py index b719e2d..6a2b80d 100644 --- a/config_jingbo.py +++ b/config_jingbo.py @@ -105,216 +105,39 @@ data = { ClassifyId = 1214 -################################################################################################################ 变量定义--线上环境 -# server_host = '10.200.32.39' -# login_pushreport_url = "http://10.200.32.39/jingbo-api/api/server/login" -# upload_url = "http://10.200.32.39/jingbo-api/api/analysis/reportInfo/researchUploadReportSave" -# upload_warning_url = "http://10.200.32.39/jingbo-api/api/basicBuiness/crudeOilWarning/save" -# query_data_list_item_nos_url = "http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryDataListItemNos" - -# login_data = { -# "data": { -# "account": "api_dev", -# "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", -# "tenantHashCode": "8a4577dbd919675758d57999a1e891fe", -# "terminal": "API" -# }, -# "funcModule": "API", -# "funcOperation": "获取token" -# } - - - -# upload_data = { -# "funcModule":'研究报告信息', -# "funcOperation":'上传原油价格预测报告', -# "data":{ -# "ownerAccount":'27663', #报告所属用户账号 27663 - 刘小朋 -# "reportType":'OIL_PRICE_FORECAST', # 报告类型,固定为OIL_PRICE_FORECAST -# "fileName": '', #文件名称 -# "fileBase64": '' ,#文件内容base64 -# "categoryNo":'yyjgycbg', # 研究报告分类编码 -# "smartBusinessClassCode":'YCJGYCBG', #分析报告分类编码 -# "reportEmployeeCode":"E40482" ,# 报告人 E40482 - 管理员 0000027663 - 刘小朋 -# "reportDeptCode" :"002000621000", # 报告部门 - 002000621000 SH期货研究部 -# "productGroupCode":"RAW_MATERIAL" # 商品分类 -# } -# } - -# warning_data = { -# "funcModule":'原油特征停更预警', -# "funcOperation":'原油特征停更预警', -# "data":{ -# 'WARNING_TYPE_NAME':'特征数据停更预警', -# 'WARNING_CONTENT':'', -# 'WARNING_DATE':'' -# } -# } - -# query_data_list_item_nos_data = { -# "funcModule": "数据项", -# "funcOperation": "查询", -# "data": { -# "dateStart":"20200101", -# "dateEnd":"20241231", -# "dataItemNoList":["Brentzdj","Brentzgj"] # 数据项编码,代表 brent最低价和最高价 -# } -# } - - - -# ## 生产环境数据库 -# # host = 'rm-2zehj3r1n60ttz9x5.mysql.rds.aliyuncs.com' -# # port = 3306 -# # dbusername ='jingbo' -# # password = 'shihua@123' -# # dbname = 'jingbo' -# # table_name = 'v_tbl_crude_oil_warning' - - - -# ## 预生产环境 -# host = 'rm-2zehj3r1n60ttz9x5ko.mysql.rds.aliyuncs.com' -# port = 3306 -# dbusername ='jingbo' -# password = 'shihua@123' -# dbname = 'jingbo-test' -# table_name = 'v_tbl_crude_oil_warning' - - - -# # 线上开关备份 -# is_train = True # 是否训练 -# is_debug = False # 是否调试 -# is_eta = True # 是否使用eta接口 -# is_timefurture = True # 是否使用时间特征 -# is_fivemodels = False # 是否使用之前保存的最佳的5个模型 -# is_edbcode = False # 特征使用edbcoding列表中的 -# is_edbnamelist = False # 自定义特征,对应上面的edbnamelist -# is_update_eta = True # 预测结果上传到eta -# is_update_report = True # 是否上传报告 -# is_update_warning_data = True if datetime.datetime.now().weekday() == 1 else False # 是否上传预警数据 -# is_del_corr = 0.6 # 是否删除相关性高的特征,取值为 0-1 ,0 为不删除,0.6 表示删除相关性小于0.6的特征 - - - -################################################################################################################ 变量定义--测试环境 -# server_host = '192.168.100.53' - -# login_pushreport_url = f"http://{server_host}:8080/jingbo-dev/api/server/login" -# upload_url = f"http://{server_host}:8080/jingbo-dev/api/analysis/reportInfo/researchUploadReportSave" -# upload_warning_url = f"http://{server_host}:8080/jingbo-dev/api/basicBuiness/crudeOilWarning/save" -# query_data_list_item_nos_url = f"http://{server_host}:8080/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos" - -# login_data = { -# "data": { -# "account": "api_test", -# # "password": "MmVmNzNlOWI0MmY0ZDdjZGUwNzE3ZjFiMDJiZDZjZWU=", # Shihua@123456 -# "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", # 123456 -# "tenantHashCode": "8a4577dbd919675758d57999a1e891fe", -# "terminal": "API" -# }, -# "funcModule": "API", -# "funcOperation": "获取token" -# } - -# upload_data = { -# "funcModule":'研究报告信息', -# "funcOperation":'上传原油价格预测报告', -# "data":{ -# "ownerAccount":'arui', #报告所属用户账号 -# "reportType":'OIL_PRICE_FORECAST', # 报告类型,固定为OIL_PRICE_FORECAST -# "fileName": '2000-40-5-50--100-原油指标数据.xlsx-Brent活跃合约--2024-09-06-15-01-29-预测报告.pdf', #文件名称 -# "fileBase64": '' ,#文件内容base64 -# "categoryNo":'yyjgycbg', # 研究报告分类编码 -# "smartBusinessClassCode":'YCJGYCBG', #分析报告分类编码 -# "reportEmployeeCode":"E40116", # 报告人 -# "reportDeptCode" :"D0044" ,# 报告部门 -# "productGroupCode":"RAW_MATERIAL" # 商品分类 -# } -# } - - -# warning_data = { -# "funcModule":'原油特征停更预警', -# "funcOperation":'原油特征停更预警', -# "data":{ -# 'WARNING_TYPE_NAME':'特征数据停更预警', -# 'WARNING_CONTENT':'', -# 'WARNING_DATE':'' -# } -# } - -# query_data_list_item_nos_data = { -# "funcModule": "数据项", -# "funcOperation": "查询", -# "data": { -# "dateStart":"20200101", -# "dateEnd":"20241231", -# "dataItemNoList":["Brentzdj","Brentzgj"] # 数据项编码,代表 brent最低价和最高价 -# } -# } - - -# # 北京环境数据库 -# host = '192.168.101.27' -# port = 3306 -# dbusername ='root' -# password = '123456' -# dbname = 'jingbo_test' -# table_name = 'v_tbl_crude_oil_warning' - - -# ### 开关 -# is_train = False # 是否训练 -# is_debug = False # 是否调试 -# is_eta = False # 是否使用eta接口 -# is_market = True # 是否通过市场信息平台获取特征 ,在is_eta 为true 的情况下生效 -# is_timefurture = True # 是否使用时间特征 -# is_fivemodels = False # 是否使用之前保存的最佳的5个模型 -# is_edbcode = False # 特征使用edbcoding列表中的 -# is_edbnamelist = False # 自定义特征,对应上面的edbnamelist -# is_update_eta = False # 预测结果上传到eta -# is_update_report = True # 是否上传报告 -# is_update_warning_data = True # 是否上传预警数据 -# is_del_corr = 0.6 # 是否删除相关性高的特征,取值为 0-1 ,0 为不删除,0.6 表示删除相关性小于0.6的特征 -# is_del_tow_month = True # 是否删除两个月不更新的特征 - - -################################################################################################################ 变量定义--雍安测试环境 -login_pushreport_url = "http://192.168.100.115:9090/dom-api/api/server/login" -upload_url = "http://192.168.100.115:9090/dom-api/api/analysis/reportInfo/researchUploadReportSave" -# upload_url = "http://192.168.100.109:8080/jingbo/api/analysis/reportInfo/researchUploadReportSave" # zhaoqiwei -upload_warning_url = "http://192.168.100.115:9090/dom-api/api/basicBuiness/crudeOilWarning/save" -query_data_list_item_nos_url = "http://192.168.100.115:9090/dom-api/api/warehouse/dwDataItem/queryDataListItemNos" +############################################################################################################### 变量定义--测试环境 +server_host = '192.168.100.53' +login_pushreport_url = f"http://{server_host}:8080/jingbo-dev/api/server/login" +upload_url = f"http://{server_host}:8080/jingbo-dev/api/analysis/reportInfo/researchUploadReportSave" +upload_warning_url = f"http://{server_host}:8080/jingbo-dev/api/basicBuiness/crudeOilWarning/save" +query_data_list_item_nos_url = f"http://{server_host}:8080/jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos" login_data = { "data": { - "account": "api-dev", - "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", - "tenantHashCode": "1eb24ab5a6af12e30daf78af276664f1", + "account": "api_test", + # "password": "MmVmNzNlOWI0MmY0ZDdjZGUwNzE3ZjFiMDJiZDZjZWU=", # Shihua@123456 + "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", # 123456 + "tenantHashCode": "8a4577dbd919675758d57999a1e891fe", "terminal": "API" }, "funcModule": "API", "funcOperation": "获取token" } - upload_data = { "funcModule":'研究报告信息', "funcOperation":'上传原油价格预测报告', "data":{ - "ownerAccount":'rui.liu', #报告所属用户账号 + "ownerAccount":'arui', #报告所属用户账号 "reportType":'OIL_PRICE_FORECAST', # 报告类型,固定为OIL_PRICE_FORECAST "fileName": '2000-40-5-50--100-原油指标数据.xlsx-Brent活跃合约--2024-09-06-15-01-29-预测报告.pdf', #文件名称 "fileBase64": '' ,#文件内容base64 "categoryNo":'yyjgycbg', # 研究报告分类编码 - "smartBusinessClassCode":'1', #分析报告分类编码 - "reportEmployeeCode":"U270018", # 报告人 - "reportDeptCode" :"D270001" ,# 报告部门 - # "reportDeptCode" :"000001" ,# 报告部门 + "smartBusinessClassCode":'YCJGYCBG', #分析报告分类编码 + "reportEmployeeCode":"E40116", # 报告人 + "reportDeptCode" :"D0044" ,# 报告部门 "productGroupCode":"RAW_MATERIAL" # 商品分类 } } @@ -361,7 +184,7 @@ is_edbcode = False # 特征使用edbcoding列表中的 is_edbnamelist = False # 自定义特征,对应上面的edbnamelist is_update_eta = False # 预测结果上传到eta is_update_report = True # 是否上传报告 -is_update_warning_data = False # 是否上传预警数据 +is_update_warning_data = True # 是否上传预警数据 is_del_corr = 0.6 # 是否删除相关性高的特征,取值为 0-1 ,0 为不删除,0.6 表示删除相关性小于0.6的特征 is_del_tow_month = True # 是否删除两个月不更新的特征 @@ -424,8 +247,8 @@ if end_time == '': username='1321340118@qq.com' passwd='wgczgyhtyyyyjghi' # recv=['liurui_test@163.com','52585119@qq.com'] -# recv=['liurui_test@163.com','jin.wang@chambroad.com'] -recv=['liurui_test@163.com'] +recv=['liurui_test@163.com','jin.wang@chambroad.com'] +# recv=['liurui_test@163.com'] title='reportname' content='brent价格预测报告请看附件' file=os.path.join(dataset,'reportname') diff --git a/config_jingbo_pro.py b/config_jingbo_pro.py index 4d09fda..07b8a56 100644 --- a/config_jingbo_pro.py +++ b/config_jingbo_pro.py @@ -168,73 +168,6 @@ is_update_warning_data = True if datetime.datetime.now().weekday() == 1 else Fa is_del_corr = 0.6 # 是否删除相关性高的特征,取值为 0-1 ,0 为不删除,0.6 表示删除相关性小于0.6的特征 -################################################################################################################ 变量定义--测试环境 -# login_pushreport_url = "http://192.168.100.53:8080/jingbo-dev/api/server/login" -# upload_url = "http://192.168.100.53:8080/jingbo-dev/api/analysis/reportInfo/researchUploadReportSave" -# # upload_url = "http://192.168.100.109:8080/jingbo/api/analysis/reportInfo/researchUploadReportSave" # zhaoqiwei -# upload_warning_url = "http://192.168.100.53:8080/jingbo-dev/api/basicBuiness/crudeOilWarning/save" - - -# login_data = { -# "data": { -# "account": "api_test", -# # "password": "MmVmNzNlOWI0MmY0ZDdjZGUwNzE3ZjFiMDJiZDZjZWU=", # Shihua@123456 -# "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", # 123456 -# "tenantHashCode": "8a4577dbd919675758d57999a1e891fe", -# "terminal": "API" -# }, -# "funcModule": "API", -# "funcOperation": "获取token" -# } - -# upload_data = { -# "funcModule":'研究报告信息', -# "funcOperation":'上传原油价格预测报告', -# "data":{ -# "ownerAccount":'arui', #报告所属用户账号 -# "reportType":'OIL_PRICE_FORECAST', # 报告类型,固定为OIL_PRICE_FORECAST -# "fileName": '2000-40-5-50--100-原油指标数据.xlsx-Brent活跃合约--2024-09-06-15-01-29-预测报告.pdf', #文件名称 -# "fileBase64": '' ,#文件内容base64 -# "categoryNo":'yyjgycbg', # 研究报告分类编码 -# "smartBusinessClassCode":'YCJGYCBG', #分析报告分类编码 -# "reportEmployeeCode":"E40116", # 报告人 -# "reportDeptCode" :"D0044" ,# 报告部门 -# "productGroupCode":"RAW_MATERIAL" # 商品分类 -# } -# } - -# warning_data = { -# "funcModule":'原油特征停更预警', -# "funcOperation":'原油特征停更预警', -# "data":{ -# 'WARNING_TYPE_NAME':'特征数据停更预警', -# 'WARNING_CONTENT':'', -# 'WARNING_DATE':'' -# } -# } - -# # 北京环境数据库 -# host = '192.168.101.27' -# port = 3306 -# dbusername ='root' -# password = '123456' -# dbname = 'jingbo_test' -# table_name = 'v_tbl_crude_oil_warning' - - -# ### 开关 -# is_train = True # 是否训练 -# is_debug = False # 是否调试 -# is_eta = False # 是否使用eta接口 -# is_timefurture = True # 是否使用时间特征 -# is_fivemodels = False # 是否使用之前保存的最佳的5个模型 -# is_edbcode = False # 特征使用edbcoding列表中的 -# is_edbnamelist = False # 自定义特征,对应上面的edbnamelist -# is_update_eta = False # 预测结果上传到eta -# is_update_report = False # 是否上传报告 -# is_update_warning_data = False # 是否上传预警数据 -# is_del_corr = 0.6 # 是否删除相关性高的特征,取值为 0-1 ,0 为不删除,0.6 表示删除相关性小于0.6的特征 - # 连接到数据库 db_mysql = MySQLDB(host=host, user=dbusername, password=password, database=dbname) db_mysql.connect() diff --git a/lib/dataread_jingbo_pro.py b/lib/dataread_jingbo_pro.py new file mode 100644 index 0000000..07e1d87 --- /dev/null +++ b/lib/dataread_jingbo_pro.py @@ -0,0 +1,1733 @@ + +# 导入模块 +import pandas as pd +import numpy as np +import datetime +import string +import base64 +import requests +import random +import time +import re +import os +import hmac +import hashlib +import json +import math +import torch +torch.set_float32_matmul_precision("high") +import matplotlib.pyplot as plt +#设置plt显示中文 +plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 +plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 + +from datetime import timedelta +from sklearn import metrics +from reportlab.pdfbase import pdfmetrics # 注册字体 +from reportlab.pdfbase.ttfonts import TTFont # 字体类 +from reportlab.platypus import Table, SimpleDocTemplate, Paragraph, Image # 报告内容相关类 +from reportlab.lib.pagesizes import letter # 页面的标志尺寸(8.5*inch, 11*inch) +from reportlab.lib.styles import getSampleStyleSheet # 文本样式 +from reportlab.lib import colors # 颜色模块 +from reportlab.graphics.charts.barcharts import VerticalBarChart # 图表类 +from reportlab.graphics.charts.legends import Legend # 图例类 +from reportlab.graphics.shapes import Drawing # 绘图工具 +from reportlab.lib.units import cm # 单位:cm + +# 注册字体(提前准备好字体文件, 如果同一个文件需要多种字体可以注册多个) +pdfmetrics.registerFont(TTFont('SimSun', 'SimSun.ttf')) +#设置plt显示中文 +plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 +plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 + + +from config_jingbo_pro import * + + + + +# 定义函数 +def loadcsv(filename): + """ + 读取指定文件名的 CSV 文件。 + 如果文件编码为 UTF-8,则使用 UTF-8 编码读取;否则,使用 GBK 编码读取。 + + 参数: + filename (str): 要读取的 CSV 文件的文件名。 + + 返回: + pandas.DataFrame: 读取的数据。 + + """ + # 读取csv文件 + try: + df = pd.read_csv(filename, encoding='utf-8') + except UnicodeDecodeError: + df = pd.read_csv(filename, encoding='gbk') + return df + + +def dateConvert(df, datecol='ds'): + """ + 将数据框 df 中的 datecol 列转换为日期时间类型。 + + 参数: + df (pandas.DataFrame): 要转换的 DataFrame。 + datecol (str): 要转换的列名,默认为 'ds'。 + + 返回: + pandas.DataFrame: 转换后的 DataFrame。 + + """ + # 将date列转换为datetime类型 + try: + df[datecol] = pd.to_datetime(df[datecol],format=r'%Y-%m-%d') + except: + df[datecol] = pd.to_datetime(df[datecol],format=r'%Y/%m/%d') + return df + + +def calculate_kdj(data, n=9): + ''' + 给传进来的df 添加列: 波动率,最高,最低,k,d ,j + ''' + # 对数据按照日期升序排序 + data = data.sort_values(by='ds', ascending=True) + # 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价 + data['pctchange'] = data['y'].pct_change() + # 收益为0的用0.01 + data['pctchange'] = data['pctchange'].replace(0,0.01) + # 去除空值 + data.dropna(inplace=True) + # 重置索引 + data.reset_index(drop=True,inplace=True) + # 计算最高价和最低价 + data['high'] = data['y']* (1+abs(data['pctchange'])/2) + data['low'] = data['y']* (1-abs(data['pctchange'])/2) + # 计算n日内最低价 + low_list = data['y'].rolling(window=n, min_periods=1).min() + # 计算n日内最高价 + high_list = data['y'].rolling(window=n, min_periods=1).max() + # 计算未成熟随机值 + rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100 + # 初始化k值为50 + k = pd.Series(50, index=data.index) + # 初始化d值为50 + d = pd.Series(50, index=data.index) + # 计算k值和d值 + for i in range(1, len(data)): + k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i]) + d[i] = (2/3 * d[i - 1]) + (1/3 * k[i]) + # 计算j值 + j = 3 * k - 2 * d + + # 将k值、d值和j值添加到数据中 + data['K'] = k + data['D'] = d + data['J'] = j + # 将包含 KDJ 指标的数据保存到新的 CSV 文件 + data.to_csv('stock_data_with_kdj.csv', index=False) + # data = data.dropna() + return data + + +# 上传报告 +def get_head_auth_report(): + """ + 通过 POST 请求登录到指定的 URL,并从响应中获取认证令牌。 + + 返回: + str: 如果登录成功,返回认证令牌;否则返回 None。 + """ + logger.info("获取token中...") + logger.info(f'url:{login_pushreport_url},login_data:{login_data}') + # 发送 POST 请求到登录 URL,携带登录数据 + login_res = requests.post(url=login_pushreport_url, json=login_data, timeout=(3, 30)) + + # 将响应内容转换为 JSON 格式 + text = json.loads(login_res.text) + logger.info(f'token接口响应:{text}') + # 如果响应状态为成功 + if text["status"]: + # 从响应数据中获取认证令牌 + token = text["data"]["accessToken"] + # 返回认证令牌 + return token + + +def upload_report_data(token, upload_data): + """ + 上传报告数据到指定的URL + + 参数: + token (str): 认证令牌 + upload_data (dict): 要上传的报告数据,包含必要的字段和信息 + + 返回: + dict: 如果上传成功,返回响应对象;否则返回None + """ + # 直接使用传入的 upload_data + upload_data = upload_data + + # 设置请求头部 + headers = {"Authorization": token} + + # 打印日志,显示正在上传报告数据 + logger.info("报告上传中...") + + # 打印日志,显示认证头部信息 + logger.info(f"token:{token}") + + # 打印日志,显示要上传的报告数据 + logger.info(f"upload_data:{upload_data}" ) + + # 发送POST请求,上传报告数据 + upload_res = requests.post(url=upload_url, headers=headers, json=upload_data, timeout=(3, 15)) + + # 将响应内容转换为 JSON 格式 + upload_res = json.loads(upload_res.text) + + # 打印日志,显示响应内容 + logger.info(upload_res) + + # 如果上传成功,返回响应对象 + if upload_res: + return upload_res + # 如果上传失败,打印日志并返回None + else: + logger.info("报告上传失败") + return None + + +def upload_warning_data(warning_data): + """ + 上传预警数据到指定的URL + + 参数: + warning_data (dict): 要上传的预警数据,包含必要的字段和信息 + + 返回: + requests.Response: 如果上传成功,返回响应对象;否则返回None + """ + # 获取认证头部信息 + token = get_head_auth_report() + + # 设置请求头部 + headers = {"Authorization": token} + + # 打印日志,显示正在上传预警数据 + logger.info("预警上传中...") + + # 打印日志,显示上传的URL + logger.info(f"upload_warning_url:{upload_warning_url}") + + # 打印日志,显示认证头部信息 + logger.info(f"token:{token}") + + # 打印日志,显示要上传的预警数据 + logger.info(f"warning_data:{warning_data}") + + # 发送POST请求,上传预警数据 + upload_res = requests.post(url=upload_warning_url, headers=headers, json=warning_data, timeout=(3, 15)) + + # 如果上传成功,返回响应对象 + if upload_res: + return upload_res + # 如果上传失败,打印日志并返回None + else: + logger.info("预警上传失败") + return None + + + +def upload_warning_info(df_count): + """ + 上传预警信息到指定的URL + + 参数: + df_count (int): 停更的数量 + + 返回: + None + """ + # 打印日志,显示正在上传预警信息 + logger.info(f'上传预警信息') + + try: + # 获取当前日期 + warning_date = datetime.datetime.now().strftime('%Y-%m-%d') + + # 构建预警内容 + content = f'{warning_date}有{df_count}个停更' + + # 更新预警数据中的日期和内容 + warning_data['data']['WARNING_DATE'] = warning_date + warning_data['data']['WARNING_CONTENT'] = content + + # 调用 upload_warning_data 函数上传预警数据 + upload_warning_data(warning_data) + + # 打印日志,显示上传预警信息成功 + logger.info(f'上传预警信息成功') + except Exception as e: + # 打印日志,显示上传预警信息失败,并记录异常信息 + logger.error(f'上传预警信息失败:{e}') + + + +def create_feature_last_update_time(df): + """ + 计算特征停更信息用 + 参数: + df (DataFrame): 包含特征数据的 DataFrame + 返回: + DataFrame: 包含特征停更信息的 DataFrame + str: y 列的最后更新时间 + """ + df1 = df.copy() + # 找到每列的最后更新时间 + df1.set_index('ds', inplace=True) + last_update_times = df1.apply(lambda x: x.dropna().index.max().strftime('%Y-%m-%d') if not x.dropna().empty else None) + + # 保存每列的最后更新时间到文件 + last_update_times_df = pd.DataFrame(columns = ['feature', 'last_update_time','is_value','update_period','warning_date','stop_update_period']) + + # 打印每列的最后更新时间 + for column, last_update_time in last_update_times.items(): + values = [] + # 判断是不是常数值 + if df1[column].tail(20).nunique() == 1: + values = values + [column, last_update_time,1] + else: + values = values + [column, last_update_time,0] + # 计算特征数据值的时间差 + try: + # 计算预警日期 + time_diff = (df1[column].dropna().index.to_series().diff().mode()[0]).total_seconds() / 3600 / 24 + last_update_time_datetime = datetime.datetime.strptime(last_update_time, '%Y-%m-%d') + last_update_date = end_time if end_time != '' else datetime.datetime.now().strftime('%Y-%m-%d') + end_time_datetime = datetime.datetime.strptime(last_update_date, '%Y-%m-%d') + early_warning_date = last_update_time_datetime + timedelta(days=time_diff)*2 + timedelta(days=1) + stop_update_period = int(math.ceil((end_time_datetime-last_update_time_datetime).days / time_diff)) + early_warning_date = early_warning_date.strftime('%Y-%m-%d') + except KeyError: + time_diff = 0 + early_warning_date = end_time + continue + + values = values + [time_diff,early_warning_date,stop_update_period] + last_update_times_df.loc[len(last_update_times_df)] = values + + logger.info(f"Column {column} was last updated at {last_update_time}") + y_last_update_time = last_update_times_df[last_update_times_df['feature']=='y']['warning_date'].values[0] + last_update_times_df.to_csv(os.path.join(dataset,'last_update_times.csv'), index=False) + logger.info('特征停更信息保存到文件:last_update_times.csv') + return last_update_times_df,y_last_update_time + + + +# 统计特征频度 +def featurePindu(dataset): + # 读取文件 + df = loadcsv(os.path.join(dataset,'未填充的特征数据.csv')) + df['ds'] = pd.to_datetime(df['ds']) + # 按ds正序排序,重置索引 + df = df.sort_values(by='ds', ascending=True).reset_index(drop=True) + + # 统计特征频度 + # 每列随机抽取10个值,计算出5个时间间隔,统计每个时间间隔的频度 + columns = df.columns.to_list() + columns.remove('ds') + count_dict = {} + for column in columns: + # 获取每列时间间隔 + values = df[[column,'ds']] + values.dropna(inplace=True,axis=0) + values=values.reset_index(drop=True) + + # 抽取20%个值 + value = values.sample(frac=0.2) + index = value.index + next_index = index + 1 + count = [] + for i,j in zip(index, next_index): + #通过索引计算日期差 + try: + count.append((values.loc[j,'ds'] - values.loc[i,'ds']).days) + except: + pass + # 把31 换成 30 + count = [30 if i == 31 else i for i in count] + # 保留count中出现次数最多的数 + try: + count = max(set(count), key=count.count) + except ValueError : + logger.info(f'{column}列数据为空') + continue + # 存储到字典中 + count_dict[column] = count + + df = pd.DataFrame(count_dict,index=['count']).T + pindu_dfs = pd.DataFrame() + # 根据count分组 + # 输出特征频度统计 + pindudict = {'1':'日度','3':'日度','7':'周度','30':'月度','90':'季度','180':'半年度','365':'年度'} + for i in df.groupby('count'): + # 获取 i[1] 的索引值 + index = i[1].index + pindu_df = pd.DataFrame() + try: + pindu_df[pindudict[str(i[0])]+f'({len(i[1])})'] = index + except KeyError : + pindu_df[str(i[0])+f'天({len(i[1])})'] = index + # 合并到pindu_dfs + pindu_dfs = pd.concat([pindu_dfs,pindu_df],axis=1) + # nan替换为 ' ' + pindu_dfs = pindu_dfs.fillna('') + pindu_dfs.to_csv(os.path.join(dataset,'特征频度统计.csv'),index=False) + logger.info(pindu_dfs) + featureInfo = f'特征信息:总共有{len(columns)-2}个' + for i in pindu_dfs.columns: + featureInfo += f',{i}' + + featureInfo += ', 详看 附1、特征列表' + + featureInfo += ''' + 数据特征工程: + 1. 数据日期排序,新日期在最后 + 2. 删除空列,特征数据列没有值,就删除 + 3. 删除近两月不再更新值的指标 + 4. 非日度数据填充为日度数据,填充规则: + -- 向后填充,举例:假设周五出现一个周度指标数据,那么在这之前的数据用上周五的数据 + -- 向前填充,举例:采集数据开始日期为2018年1月1日,那么周度数据可能是2018年1月3日,那么3日的数据向前填充,使1日2日都有数值 + 数据特征相关性分析: + ''' + logger.info(featureInfo) + with open(os.path.join(dataset,'特征频度统计.txt'), 'w', encoding='utf-8') as f: + f.write(featureInfo) + logger.info('*'*200) + + +def featureAnalysis(df,dataset,y): + # 特征筛选 + import matplotlib.pyplot as plt + # 选择特征和标签列 + X = df.drop(['ds', 'y'], axis=1) # 特征集,排除时间戳和标签列 + yy = df['y'] # 标签集 + + # 标签集自相关函数分析 + from statsmodels.graphics.tsaplots import plot_acf + plot_acf(yy, lags=30) + plt.savefig(os.path.join(dataset,'指标数据自相关图.png')) + plt.close() + + # 标签集偏自相关函数分析 + from statsmodels.graphics.tsaplots import plot_pacf + plot_pacf(yy, lags=30) + plt.savefig(os.path.join(dataset,'指标数据偏自相关图.png')) + plt.close() + + # 画 特征与价格散点图 + # 删除所有*散点图.png + for file in os.listdir(dataset): + if file.endswith("散点图.png"): + os.remove(os.path.join(dataset, file)) + plt.rcParams['font.sans-serif'] = ['SimHei'] + plt.rcParams['axes.unicode_minus'] = False + plt.figure(figsize=(10, 10)) + # # 遍历X每一列,和yy画散点图 , + # for i, col in enumerate(X.columns): + # plt.subplot(2, 2, i%4+1) + # plt.scatter(X[col], yy) + # plt.xlabel(col) + # plt.ylabel(y) + # plt.title(col) + # if i % 4 == 3 or i == len(X.columns)-1: + # plt.tight_layout() + # plt.savefig(os.path.join(dataset,f'{i}指标数据特征与价格散点图.png')) + # plt.close() + + + +def corr_feature(df): + # 重新命名列名,列名排序,y在第一个 + df.reindex(['y'] + sorted(df.columns.difference(['y']))) + df_test = df.copy() + # 取最后的220行 + df_test = df_test.tail(220) + # 去掉日期列 + df_test = df_test.drop(columns=['ds']) + # 不参与标准化 + df_test_noscaler = df_test.copy() # 滞后处理备份 + df_noscaler = df_test.copy() + # 画出相关性热力图 + df_test.to_csv(os.path.join(dataset,'同步相关性.csv')) + corr = df_test.corr() + # 保存相关系数 + corr.to_csv(os.path.join(dataset,'同步相关性系数.csv')) + # plt.figure(figsize=(10, 10)) + # sns.heatmap(corr, annot=True, cmap='coolwarm') + # plt.savefig('dataset/同步相关性热力图.png') + # plt.show() + + # 读取滞后周期文件,更改特征 + characteristic_period = pd.read_csv('dataset/特征滞后周期.csv',encoding='utf-8') + # 去掉周期为0的行 + characteristic_period = characteristic_period.drop(characteristic_period[characteristic_period['滞后周期'] == 0].index) + for col in df.columns: + # 跳过y列 + if col in ['y']: + continue + # 特征滞后n个周期,计算与y的相关性 + if col in characteristic_period['特征'].values: + # 获取特征对应的周期 + period = characteristic_period[characteristic_period['特征'] == col]['滞后周期'].values[0] + # 滞后处理 + df[col] = df[col].shift(period) + df.to_csv(os.path.join(dataset,'滞后处理后的数据集.csv')) + + + # corr_feture_noscaler = {} # 保存相关性最大的周期 + # 遍历df_test的每一列,计算相关性 + # for col in df_noscaler.columns: + # # 跳过y列 + # if col in ['y']: + # continue + # logger.info('特征:', col) + # # 特征滞后n个周期,计算与y的相关性 + # corr_dict = {} + # try: + # for i in range(0, 200): + # if i == 0: + # df_noscaler[col+'_'+str(i)] = df_noscaler[col] + # else: + # df_noscaler[col+'_'+str(i)] = df_noscaler[col].shift(i) + # corr_dict[col+'_'+str(i)] = abs(df_noscaler[col+'_'+str(i)].corr(df_noscaler['y'])) + # except : + # logger.info('特征:', col, '滑动错误,请查看') + # continue + # 输出相关性最大的特征 + # logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)]) + # corr_feture_noscaler[col] = max(corr_dict, key=corr_dict.get).split('_')[-1] + # 画出最相关性最大的特征和y的折线图 + # plt.figure(figsize=(10, 5)) + # plt.plot(df_noscaler[max(corr_dict, key=corr_dict.get)], label=max(corr_dict, key=corr_dict.get)) + # # 设置双坐标轴 + # ax1 = plt.gca() + # ax2 = ax1.twinx() + # ax2.plot(df_noscaler['y'], color='r', label='y') + # plt.legend() + # try: + # plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get)+'.png') + # except : + # # :替换成_ + # plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get).replace(':','_').replace('/','_').replace('(','_').replace(')','_')+'.png') + # plt.close() + # 结果保存到txt文件 + # logger.info('不参与标准化的特征滞后相关性写入txt文件') + # with open('dataset/不参与标准化的特征滞后相关性.txt', 'w') as f: + # for key, value in corr_feture_noscaler.items(): + # f.write('%s:%s\n' % (key, value)) + # 遍历corr_feture_noscaler,更改df + # colnames_noscaler = [] + # for col in corr_feture_noscaler: + # colname = col+'_'+corr_feture_noscaler[col] + # if int(corr_feture_noscaler[col]) == 0: + # continue + # df_test_noscaler[colname] = df_test_noscaler[col].shift(int(corr_feture_noscaler[col])) + # df_test_noscaler = df_test_noscaler.drop(columns=[col]) + # colnames_noscaler.append(colname) + # 去除有空值的行 + # df_test_noscaler = df_test_noscaler.dropna() + # df_test_noscaler.reindex(['y'] + sorted(df_test_noscaler.columns.difference(['y']))) + # df_test_noscaler.to_csv('dataset/不参与标准化的特征滞后相关性.csv', index=False) + # 画出相关性热力图 + # corr = df_test_noscaler.corr() + # 保存相关系数 + # corr.to_csv(os.path.join(dataset,'不参与标准化的特征滞后相关性系数.csv')) + # plt.figure(figsize=(10, 10)) + # sns.heatmap(corr, annot=True, cmap='coolwarm') + # plt.savefig('dataset/不参与标准化的特征滞后相关性热力图.png') + # plt.close() + # # 标准化每列 + # from sklearn.preprocessing import StandardScaler + # scaler = StandardScaler() + # df_test = pd.DataFrame(scaler.fit_transform(df_test), columns=df_test.columns) + # corr_feture = {} # 保存相关性最大的周期 + # # 遍历df_test的每一列,计算相关性 + # for col in df_test.columns: + # # 跳过y列 + # if col == 'y': + # continue + # logger.info('特征:', col) + # # 特征滞后n个周期,计算与y的相关性 + # corr_dict = {} + # try: + # for i in range(0, 200): + # if i == 0: + # df_test[col+'_'+str(i)] = df_test[col] + # else: + # df_test[col+'_'+str(i)] = df_test[col].shift(i) + # corr_dict[col+'_'+str(i)] = abs(df_test[col+'_'+str(i)].corr(df_test['y'])) + # except : + # logger.info('特征:', col, '滑动错误,请查看') + # continue + # # 输出相关性最大的特征 + # logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)]) + # corr_feture[col] = max(corr_dict, key=corr_dict.get).split('_')[-1] + + + # # 结果保存到txt文件 + # with open('dataset/标准化的特征滞后相关性.txt', 'w') as f: + # for key, value in corr_feture.items(): + # f.write('%s:%s\n' % (key, value)) + # # 遍历corr_feture,更改df + # colnames = [] + # for col in corr_feture: + # colname = col+'_'+corr_feture[col] + # if int(corr_feture[col]) == 0: + # continue + # df[colname] = df[col].shift(int(corr_feture[col])) + # df = df.drop(columns=[col]) + # colnames.append(colname) + # # 去除有空值的行 + # df = df.dropna() + # df.reindex(['y'] + sorted(df.columns.difference(['y']))) + # df.to_csv('dataset/标准化后的特征滞后相关性.csv', index=False) + # # 画出相关性热力图 + # ds = df['ds'] + # df = df.drop(columns=['ds']) + # corr = df.corr() + # # 保存相关系数 + # corr.to_csv(os.path.join(dataset,'标准化后的特征滞后相关性系数.csv')) + # plt.figure(figsize=(10, 10)) + # sns.heatmap(corr, annot=True, cmap='coolwarm') + # plt.savefig('dataset/标准化后的特征滞后相关性热力图.png') + # plt.show() + # df['ds'] = ds + + # 去除nan值 + df = df.dropna() + return df + + +def calculate_kdj(data, n=9): + ''' + 给传进来的df 添加列: 波动率,最高,最低,k ,d ,j + ''' + data = data.sort_values(by='ds', ascending=True) + # 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价 + data['pctchange'] = data['y'].pct_change() + # 收益为0的用0.01 + data['pctchange'] = data['pctchange'].replace(0,0.01) + data.dropna(inplace=True) + # 重置索引 + data.reset_index(drop=True,inplace=True) + data['high'] = data['y']* (1+abs(data['pctchange'])/2) + data['low'] = data['y']* (1-abs(data['pctchange'])/2) + low_list = data['y'].rolling(window=n, min_periods=1).min() + high_list = data['y'].rolling(window=n, min_periods=1).max() + rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100 + k = pd.Series(50, index=data.index) + d = pd.Series(50, index=data.index) + for i in range(1, len(data)): + k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i]) + d[i] = (2/3 * d[i - 1]) + (1/3 * k[i]) + j = 3 * k - 2 * d + + data['K'] = k + data['D'] = d + data['J'] = j + # 将包含 KDJ 指标的数据保存到新的 CSV 文件 + data.to_csv('dataset\stock_data_with_kdj.csv', index=False) + # data = data.dropna() + return data + +def check_column(df,col_name,two_months_ago): + ''' + 检查列是否需要删除。 + 该函数会检查列是否为空值列、180天没有更新的列或常数值列。 + 参数: + col_name (str): 列名。 + df (DataFrame): 包含列的 DataFrame。 + 返回: + bool: 如果列需要删除,返回 True;否则,返回 False。 + ''' + if 'ds' in col_name or 'y' in col_name: + return False + df_check_column = df[['ds',col_name,'y']] + df_check_column = df_check_column.dropna() + + if len(df_check_column) == 0: + print(f'空值列:{col_name}') + return True + # 判断是不是常数列 + if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2: + print(f'180没有更新:{col_name}') + return True + + # 判断相关系数大于0.6 + if is_del_corr > 0: + if abs(df_check_column[col_name].corr(df_check_column['y'])) < is_del_corr: + print(f'相关系数小于0.6:{col_name}') + return True + + corresponding_date = df_check_column.iloc[-1]['ds'] + return corresponding_date < two_months_ago + +def datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False): + ''' + 原油特征数据处理函数, + 接收的是两个df,一个是指标数据,一个是指标列表 + 输出的是一个df,包含ds,y,指标列 + ''' + df = df_zhibiaoshuju.copy() + + if end_time == '': + end_time = datetime.datetime.now().strftime('%Y-%m-%d') + # 重命名时间列,预测列 + df.rename(columns={datecol:'ds'},inplace=True) + df.rename(columns={y:'y'},inplace=True) + # 按时间顺序排列 + df.sort_values(by='ds',inplace=True) + df['ds'] = pd.to_datetime(df['ds']) + # 获取start_year年到end_time的数据 + df = df[df['ds'].dt.year >= start_year] + df = df[df['ds'] <= end_time] + # last_update_times_df,y_last_update_time = create_feature_last_update_time(df) + # logger.info(f'删除预警的特征前数据量:{df.shape}') + # columns_to_drop = last_update_times_df[last_update_times_df['warning_date'] < y_last_update_time ]['feature'].values.tolist() + # df = df.drop(columns = columns_to_drop) + # logger.info(f'删除预警的特征后数据量:{df.shape}') + # if is_update_warning_data: + # upload_warning_info(last_update_times_df,y_last_update_time) + # 去掉近最后数据对应的日期在六月以前的列,删除近2月的数据是常熟的列 + if is_del_tow_month: + current_date = datetime.datetime.now() + two_months_ago = current_date - timedelta(days=180) + logger.info(f'删除两月不更新特征前数据量:{df.shape}') + columns_to_drop = [] + for clo in df.columns: + if check_column(df,clo,two_months_ago): + columns_to_drop.append(clo) + df = df.drop(columns=columns_to_drop) + + logger.info(f'删除两月不更新特征后数据量:{df.shape}') + + if freq == 'W': + # 按周取样 + df = df.resample('W', on='ds').mean().reset_index() + elif freq == 'M': + # 按月取样 + df = df.resample('M', on='ds').mean().reset_index() + # 删除预测列空值的行 + df = df.dropna(subset=['y']) + logger.info(f'删除预测列为空值的行后数据量:{df.shape}') + df = df.dropna(axis=1, how='all') + logger.info(f'删除全为空值的列后数据量:{df.shape}') + df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False) + # 去掉指标列表中的columns_to_drop的行 + df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())] + df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False) + # 数据频度分析 + featurePindu(dataset=dataset) + # 向上填充 + df = df.ffill() + # 向下填充 + df = df.bfill() + + # 删除周六日的数据 + if delweekenday: + df = df[df['ds'].dt.weekday < 5] + + # kdj指标 + if add_kdj: + df = calculate_kdj(df) + # 衍生时间特征 + if is_timefurture: + df = addtimecharacteristics(df=df,dataset=dataset) + # 特征分析 + featureAnalysis(df,dataset=dataset,y=y) + return df + +def datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False): + ''' + 聚烯烃特征数据处理函数, + 接收的是两个df,一个是指标数据,一个是指标列表 + 输出的是一个df,包含ds,y,指标列 + ''' + df = df_zhibiaoshuju.copy() + if end_time == '': + end_time = datetime.datetime.now().strftime('%Y-%m-%d') + # date转为pddate + df.rename(columns={datecol:'ds'},inplace=True) + + # 指定列统一减少数值 + df[offsite_col] = df[offsite_col]-offsite + # 预测列为avg_cols的均值 + df[y] = df[avg_cols].mean(axis=1) + # 去掉多余的列avg_cols + df = df.drop(columns=avg_cols) + + # 重命名预测列 + df.rename(columns={y:'y'},inplace=True) + # 按时间顺序排列 + df.sort_values(by='ds',inplace=True) + df['ds'] = pd.to_datetime(df['ds']) + # 获取2018年到当前日期的数据 + df = df[df['ds'].dt.year >= 2018] + # 获取小于等于当前日期的数据 + df = df[df['ds'] <= end_time] + logger.info(f'删除两月不更新特征前数据量:{df.shape}') + # 去掉近最后数据对应的日期在两月以前的列,删除近2月的数据是常数的列 + current_date = datetime.datetime.now() + two_months_ago = current_date - timedelta(days=40) + # 检查两月不更新的特征 + def check_column(col_name): + if 'ds' in col_name or 'y' in col_name: + return False + df_check_column = df[['ds',col_name]] + df_check_column = df_check_column.dropna() + if len(df_check_column) == 0: + return True + if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2: + return True + corresponding_date = df_check_column.iloc[-1]['ds'] + return corresponding_date < two_months_ago + columns_to_drop = df.columns[df.columns.map(check_column)].tolist() + df = df.drop(columns = columns_to_drop) + + logger.info(f'删除两月不更新特征后数据量:{df.shape}') + + # 删除预测列空值的行 + df = df.dropna(subset=['y']) + logger.info(f'删除预测列为空值的行后数据量:{df.shape}') + df = df.dropna(axis=1, how='all') + logger.info(f'删除全为空值的列后数据量:{df.shape}') + df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False) + # 去掉指标列表中的columns_to_drop的行 + df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())] + df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False) + # 频度分析 + featurePindu(dataset=dataset) + # 向上填充 + df = df.ffill() + # 向下填充 + df = df.bfill() + + # 删除周六日的数据 + if delweekenday: + df = df[df['ds'].dt.weekday < 5] + + if add_kdj: + df = calculate_kdj(df) + + if is_timefurture: + df = addtimecharacteristics(df=df,dataset=dataset) + + featureAnalysis(df,dataset=dataset,y=y) + return df + +def getdata(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''): + logger.info('getdata接收:'+filename+' '+datecol+' '+end_time) + # 判断后缀名 csv或excel + if filename.endswith('.csv'): + df = loadcsv(filename) + else: + # 读取excel 指标数据 + df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据') + df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表') + + + # 日期字符串转为datatime + df = datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) + + return df,df_zhibiaoliebiao +def getdata_juxiting(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''): + logger.info('getdata接收:'+filename+' '+datecol+' '+end_time) + # 判断后缀名 csv或excel + if filename.endswith('.csv'): + df = loadcsv(filename) + else: + # 读取excel 指标数据 + df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据') + df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表') + + # 日期字符串转为datatime + df = datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) + + return df + + +def sanitize_filename(filename): + # 使用正则表达式替换不合规的字符 + # 这里我们替换为下划线'_',但你可以根据需要选择其他字符 + sanitized = re.sub(r'[\\/*?:"<>|\s]', '_', filename) + # 移除开头的点(在某些系统中,以点开头的文件可能是隐藏的) + sanitized = re.sub(r'^\.', '', sanitized) + # 如果需要,可以添加更多替换规则 + return sanitized + +class BinanceAPI: + ''' + 获取 Binance API 请求头签名 + ''' + def __init__(self, APPID, SECRET): + self.APPID = APPID + self.SECRET = SECRET + self.get_signature() + + # 生成随机字符串作为 nonce + def generate_nonce(self, length=32): + self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) + return self.nonce + + # 获取当前时间戳(秒) + def get_timestamp(self): + return int(time.time()) + + # 构建待签名字符串 + def build_sign_str(self): + return f'appid={self.APPID}&nonce={self.nonce}×tamp={self.timestamp}' + + # 使用 HMAC SHA-256 计算签名 + def calculate_signature(self, secret, message): + return base64.urlsafe_b64encode(hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8') + + def get_signature(self): + # 调用上述方法生成签名 + self.nonce = self.generate_nonce() + self.timestamp = self.get_timestamp() + self.sign_str = self.build_sign_str() + self.signature = self.calculate_signature(self.SECRET, self.sign_str) + # return self.signature + +class Graphs: + # 绘制标题 + @staticmethod + def draw_title(title: str): + # 获取所有样式表 + style = getSampleStyleSheet() + # 拿到标题样式 + ct = style['Heading1'] + # 单独设置样式相关属性 + ct.fontName = 'SimSun' # 字体名 + ct.fontSize = 18 # 字体大小 + ct.leading = 50 # 行间距 + ct.textColor = colors.green # 字体颜色 + ct.alignment = 1 # 居中 + ct.bold = True + # 创建标题对应的段落,并且返回 + return Paragraph(title, ct) + + # 绘制小标题 + @staticmethod + def draw_little_title(title: str): + # 获取所有样式表 + style = getSampleStyleSheet() + # 拿到标题样式 + ct = style['Normal'] + # 单独设置样式相关属性 + ct.fontName = 'SimSun' # 字体名 + ct.fontSize = 15 # 字体大小 + ct.leading = 30 # 行间距 + ct.textColor = colors.red # 字体颜色 + # 创建标题对应的段落,并且返回 + return Paragraph(title, ct) + + # 绘制普通段落内容 + @staticmethod + def draw_text(text: str): + # 获取所有样式表 + style = getSampleStyleSheet() + # 获取普通样式 + ct = style['Normal'] + ct.fontName = 'SimSun' + ct.fontSize = 12 + ct.wordWrap = 'CJK' # 设置自动换行 + ct.alignment = 0 # 左对齐 + ct.firstLineIndent = 32 # 第一行开头空格 + ct.leading = 25 + return Paragraph(text, ct) + + # 绘制表格 + @staticmethod + def draw_table(*args): + # 列宽度 + col_width = args[0] + style = [ + ('FONTNAME', (0, 0), (-1, -1), 'SimSun'), # 字体 + ('FONTSIZE', (0, 0), (-1, 0), 12), # 第一行的字体大小 + ('FONTSIZE', (0, 1), (-1, -1), 10), # 第二行到最后一行的字体大小 + ('BACKGROUND', (0, 0), (-1, 0), '#d5dae6'), # 设置第一行背景颜色 + ('ALIGN', (0, 0), (-1, -1), 'CENTER'), # 第一行水平居中 + ('ALIGN', (0, 1), (-1, -1), 'LEFT'), # 第二行到最后一行左右左对齐 + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), # 所有表格上下居中对齐 + ('TEXTCOLOR', (0, 0), (-1, -1), colors.darkslategray), # 设置表格内文字颜色 + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), # 设置表格框线为grey色,线宽为0.5 + # ('SPAN', (0, 1), (0, 2)), # 合并第一列二三行 + # ('SPAN', (0, 3), (0, 4)), # 合并第一列三四行 + # ('SPAN', (0, 5), (0, 6)), # 合并第一列五六行 + # ('SPAN', (0, 7), (0, 8)), # 合并第一列五六行 + ] + table = Table(args[1:], colWidths=col_width, style=style) + return table + + # 创建图表 + @staticmethod + def draw_bar(bar_data: list, ax: list, items: list): + drawing = Drawing(500, 250) + bc = VerticalBarChart() + bc.x = 45 # 整个图表的x坐标 + bc.y = 45 # 整个图表的y坐标 + bc.height = 200 # 图表的高度 + bc.width = 350 # 图表的宽度 + bc.data = bar_data + bc.strokeColor = colors.black # 顶部和右边轴线的颜色 + bc.valueAxis.valueMin = 5000 # 设置y坐标的最小值 + bc.valueAxis.valueMax = 26000 # 设置y坐标的最大值 + bc.valueAxis.valueStep = 2000 # 设置y坐标的步长 + bc.categoryAxis.labels.dx = 2 + bc.categoryAxis.labels.dy = -8 + bc.categoryAxis.labels.angle = 20 + bc.categoryAxis.categoryNames = ax + + # 图示 + leg = Legend() + leg.fontName = 'SimSun' + leg.alignment = 'right' + leg.boxAnchor = 'ne' + leg.x = 475 # 图例的x坐标 + leg.y = 240 + leg.dxTextSpace = 10 + leg.columnMaximum = 3 + leg.colorNamePairs = items + drawing.add(leg) + drawing.add(bc) + return drawing + + # 绘制图片 + @staticmethod + def draw_img(path): + img = Image(path) # 读取指定路径下的图片 + img.drawWidth = 20*cm # 设置图片的宽度 + img.drawHeight = 10*cm # 设置图片的高度 + return img + +# 定义样式函数 +def style_row(row): + if '周' in row['频度']: + return ['background-color: yellow'] * len(row) + else: + return ['background-color: gray'] * len(row) + + + +class EtaReader(): + def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl): + ''' + 初始化 EtaReader 类的实例。 + + 参数: + signature (str): 用于 API 请求的签名。 + classifylisturl (str): 分类列表的 URL。 + classifyidlisturl (str): 分类 ID 列表的 URL。 + edbcodedataurl (str): EDB 代码数据的 URL。 + edbdatapushurl (str): EDB 数据推送的 URL。 + edbcodelist (str): EDB 代码列表的 URL。 + edbdeleteurl (str): EDB 数据删除的 URL。 + edbbusinessurl (str): EDB 业务数据的 URL。 + + 返回: + None + ''' + self.signature = signature + self.classifylisturl = classifylisturl + self.classifyidlisturl = classifyidlisturl + self.edbcodedataurl = edbcodedataurl + self.edbdatapushurl = edbdatapushurl + self.edbcodelist = edbcodelist + self.edbdeleteurl = edbdeleteurl + self.edbbusinessurl = edbbusinessurl + + + def filter_yuanyou_data(self,ClassifyName,data): + ''' + 指标名称保留规则 + ''' + + # 包含 关键词 去除, 返回flase + if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', + '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', + '四周均值','名占比','残差','DMA', + '连7-连9','4周平均','4周均值','滚动相关性','日本']): + return False + + # 检查需要的特征 + # 去掉 分析 分类下的数据 + if ClassifyName == '分析': + return False + + # 保留 库存中特殊关键词 + if ClassifyName == '库存': + if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]): + return True + else: + pass + else: + pass + + # 去掉 持仓中不是基金的数据 + if ClassifyName == '持仓': + if '基金' not in data: + return False + else: + pass + else: + pass + + # 去掉 航班中不是中国、美国 的数据 + if ClassifyName == '需求': + if '航班' in data : + if '中国' in data or '美国' in data : + return True + else: + return False + else: + pass + else: + pass + + # 分类为 期货市场,同质性数据取第一个 + if ClassifyName == '期货市场': + # 去掉c1-9 以后的 + if 'c1-c' in data: + try: + c = int(data.split('c1-c')[1]) + except: + return False + if c > 9 : + return False + else: + pass + + else: + pass + + # 判断 同质性数据, 字符串开头 + strstartdict = {'ICE Brent c':"ICE Brent c14", + 'NYMWX WTI c':"NYMWX WTI c5", + 'INE SC c':"INE SC c1", + 'EFS c':"EFS c", + 'Dubai Swap c':"Dubai Swap c1", + 'Oman Swap c':"Oman Swap c1", + 'DME Oman c':"DME Oman c1", + 'Murban Futures c':"Murban Futures c1", + 'Dubai连合约价格':'Dubai连1合约价格', + '美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格', + 'Brent连合约价格':'Brent连1合约价格', + 'WTI连合约价格':'WTI连1合约价格', + '布伦特连合约价格':'Brent连1合约价格', + 'Brent 连合约价格':'Brent连1合约价格', + 'Dubai连合约价格':'Dubai连1合约价格', + 'Brent连':'Brent连1合约价格', + 'brent连':'Brent连1合约价格', + } + # 判断名称字符串开头是否在 strstartdict.keys中 + match = re.match(r'([a-zA-Z\s]+)(\d+)', data) + if match: + part1 = match.group(1) + part2 = match.group(2) + if part1 in [i for i in strstartdict.keys()]: + if data == strstartdict[part1]: + return True + else: + return False + # data = 'Brent 连7合约价格' + # 判断名称字符串去掉数字后是否在 strstartdict.keys中 + match = re.findall(r'\D+', data) + if match : + if len(match) == 2: + part1 = match[0] + part2 = match[1] + if part1+part2 in [i for i in strstartdict.keys()]: + if data == strstartdict[part1+part2]: + return True + else: + return False + else: + pass + elif len(match) == 1: + match = re.findall(r'\D+', data) + part1 = match[0] + + if part1 in [i for i in strstartdict.keys()]: + if data == strstartdict[part1]: + return True + else: + return False + else: + pass + else: + pass + + # 去掉kpler数据源 + if 'Kpler' in ClassifyName or 'kpler' in ClassifyName: + return False + + return True + + def filter_pp_data(self,ClassifyName,data): + ''' + 指标名称保留规则 + ''' + + # 包含 关键词 去除, 返回flase + # if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', + # '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', + # '四周均值','名占比','残差','DMA', + # '连7-连9','4周平均','4周均值','滚动相关性','日本']): + # return False + # 包含 关键词 保留, 返回True + if any(keyword in data for keyword in ['拉丝']): + return True + + + + # 检查需要的特征 + # 去掉 期货市场 分类下的数据 + if ClassifyName == '期货市场': + return False + else: + pass + + # 保留 库存 下所有指标 + if ClassifyName == '库存': + return True + else: + pass + + # 保留 进出口 下所有指标 + if ClassifyName == '进出口': + return True + else: + pass + + # 保留 价差 下所有指标 + if ClassifyName == '价差': + return True + else: + pass + + # 保留 供应 下所有指标 + if ClassifyName == '供应': + return True + else: + pass + + + # 保留 需求 下所有指标 + if ClassifyName == '需求': + return True + else: + pass + + + return True + + # 通过edbcode 获取指标数据 + def edbcodegetdata(self,df,EdbCode,EdbName): + # 根据指标id,获取指标数据 + url = self.edbcodedataurl+str(EdbCode) + # 发送GET请求 + response = requests.get(url, headers=self.headers) + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + all_data_items = data.get('Data') + # 列表转换为DataFrame + df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime']) + # df3 = pd.read_json(all_data_items, orient='records') + + # 去掉UpdateTime 列 + df3 = df3.drop(columns=['UpdateTime']) + # df3.set_index('DataTime') + df3.rename(columns={'Value': EdbName}, inplace=True) + # 将数据存储df1 + df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y')) + # 按时间排序 + df = df.sort_values(by='DataTime', ascending=True) + return df + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + def get_eta_api_yuanyou_data(self,data_set,dataset=''): + ''' + 从ETA API获取原油数据 + + 参数: + data_set (str): 数据集名称 + dataset (str): 数据集ID,默认为空 + + 返回: + None + ''' + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 + + ''' + df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 + df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... + + ''' + + # 构建新的DataFrame df df1 + df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度','指标来源','来源id','最后更新时间','更新周期','预警日期','停更周期']) + df1 = pd.DataFrame(columns=['DataTime']) + + + # 外网环境无法访问,请确认是否为内网环境 + try: + # 发送GET请求 获取指标分类列表 + response = requests.get(self.classifylisturl, headers=self.headers) + except requests.exceptions.RequestException as e: + raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") + + # 检查响应状态码 + if response.status_code == 200: + # 获取成功, 处理响应内容 + data = response.json() # 假设接口返回的是JSON数据 + + # 请求成功,处理响应内容 + # logger.info(data.get('Data')) + # 定义你想要保留的固定值 + fixed_value = 1214 + + # 遍历列表,只保留那些'category' key的值为固定值的数据项 + filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] + + #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId + n = 0 + for item in filtered_data: + n+= 1 + # if n>50: + # break + ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 + ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 + # 根据分类id,获取指标列表 + url = self.classifyidlisturl+str(ClassifyId) + response = requests.get(url, headers=self.headers) + if response.status_code == 200: + # logger.info(response.text) + data2 = response.json() + Data = data2.get('Data') + for i in Data: + # s+= 1 + EdbCode = i.get('EdbCode') + EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 + Frequency = i.get('Frequency') # 频度,要保存到df的频度列 + SourceName = i.get('SourceName') # 来源名称,要保存到df的频度列 + Source = i.get('Source') # 来源ID,要保存到df的频度列 + Unit = i.get('Unit') # 单位,要保存到df的单位列 + # 频度不是 日 或者 周的 跳过 + if Frequency not in ['日度','周度','日','周']: + continue + + # 只保留手工数据中,名称带有 海运出口 海运进口 + if Source == 9 and not ('海运出口' in EdbName or '海运进口' in EdbName): + continue + + # 不要wind数据 + if Source == 2: + continue + + + # 判断名称是否需要保存 + isSave = self.filter_yuanyou_data(ClassifyName,EdbName) + if isSave: + # 保存到df + df1 = self.edbcodegetdata(df1,EdbCode,EdbName) + # 取df1所有行最后一列 + edbname_df = df1[['DataTime',f'{EdbName}']] + edbname_df = edbname_df.dropna() + + if len(edbname_df) == 0: + logger.info(f'指标名称:{EdbName} 没有数据') + continue + try: + time_sequence = edbname_df['DataTime'].values.tolist()[-10:] + except IndexError: + time_sequence = edbname_df['DataTime'].values.tolist() + # 使用Counter来统计每个星期几的出现次数 + from collections import Counter + weekday_counter = Counter(datetime.datetime.strptime(time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence) + + # 打印出现次数最多的星期几 + try: + most_common_weekday = weekday_counter.most_common(1)[0][0] + # 计算两周后的日期 + warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d") + stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7 + + except IndexError: + most_common_weekday = '其他' + stop_update_period = 0 + if '日' in Frequency: + most_common_weekday = '每天' + warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d") + stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days + + # 保存频度 指标名称 分类 指标id 到 df + df2 = pd.DataFrame({'指标分类': ClassifyName, + '指标名称': EdbName, + '指标id': EdbCode, + '单位': Unit, + '频度': Frequency, + '指标来源':SourceName, + '来源id':Source, + '最后更新时间':edbname_df['DataTime'].values[-1], + '更新周期':most_common_weekday, + '预警日期':warning_date, + '停更周期':stop_update_period},index=[0], + ) + + # df = pd.merge(df, df2, how='outer') + df = pd.concat([df, df2]) + else: + logger.info(f'跳过指标 {EdbName}') + + # 找到列表中不在指标列中的指标id,保存成新的list + new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] + logger.info(new_list) + # 遍历new_list,获取指标数据,保存到df1 + for item in new_list: + logger.info(item) + # 将item 加入到 df['指标id']中 + try: + itemname = edbcodenamedict[item] + except: + itemname = item + + df1 = self.edbcodegetdata(df1,item,itemname) + df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他','指标来源':'其他','来源id':'其他'},index=[0])]) + + # 按时间排序 + df1.sort_values('DataTime',inplace=True,ascending=False) + df1.rename(columns={'DataTime': 'date'},inplace=True) + # df1.dropna(inplace=True) + # 去掉大于今天日期的行 + df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] + logger.info(df1.head()) + # logger.info(f'{df1.head()}') + + df_zhibiaoshuju = df1.copy() + df_zhibiaoliebiao = df.copy() + return df_zhibiaoshuju,df_zhibiaoliebiao + + def get_eta_api_pp_data(self,data_set,dataset=''): + global ClassifyId + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 + + ''' + df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 + df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... + + ''' + + # 构建新的DataFrame df df1 + df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度']) + df1 = pd.DataFrame(columns=['DataTime']) + + + # 外网环境无法访问,请确认是否为内网环境 + try: + # 发送GET请求 获取指标分类列表 + response = requests.get(self.classifylisturl, headers=self.headers) + except requests.exceptions.RequestException as e: + raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") + + # 检查响应状态码 + if response.status_code == 200: + # 获取成功, 处理响应内容 + data = response.json() # 假设接口返回的是JSON数据 + + # 请求成功,处理响应内容 + # logger.info(data.get('Data')) + # 定义你想要保留的固定值 + fixed_value = ClassifyId + + # 遍历列表,只保留那些'category' key的值为固定值的数据项 + filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] + + #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId + n = 0 + for item in filtered_data: + n+= 1 + # if n>50: + # break + ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 + ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 + # 根据分类id,获取指标列表 + url = self.classifyidlisturl+str(ClassifyId) + response = requests.get(url, headers=self.headers) + if response.status_code == 200: + # logger.info(response.text) + data2 = response.json() + Data = data2.get('Data') + for i in Data: + # s+= 1 + EdbCode = i.get('EdbCode') + EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 + Frequency = i.get('Frequency') # 频度,要保存到df的频度列 + # 频度不是 日 或者 周的 跳过 + if Frequency not in ['日度','周度','日','周']: + continue + + # 判断名称是否需要保存 + isSave = self.filter_pp_data(ClassifyName,EdbName) + if isSave: + # 保存到df + # 保存频度 指标名称 分类 指标id 到 df + df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0]) + + # df = pd.merge(df, df2, how='outer') + df = pd.concat([df, df2]) + df1 = self.edbcodegetdata(df1,EdbCode,EdbName) + else: + logger.info(f'跳过指标 {EdbName}') + + # 找到列表中不在指标列中的指标id,保存成新的list + new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] + logger.info(new_list) + # 遍历new_list,获取指标数据,保存到df1 + for item in new_list: + logger.info(item) + # 将item 加入到 df['指标id']中 + try: + itemname = edbcodenamedict[item] + except: + itemname = item + + df1 = self.edbcodegetdata(df1,item,itemname) + df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])]) + + # 按时间排序 + df1.sort_values('DataTime',inplace=True,ascending=False) + df1.rename(columns={'DataTime': 'date'},inplace=True) + # df1.dropna(inplace=True) + # 去掉大于今天日期的行 + df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] + logger.info(df1.head()) + # logger.info(f'{df1.head()}') + # 保存到xlsx文件的sheet表 + with pd.ExcelWriter(os.path.join(dataset,data_set)) as file: + df1.to_excel(file, sheet_name='指标数据', index=False) + df.to_excel(file, sheet_name='指标列表', index=False) + + df_zhibiaoshuju = df1.copy() + df_zhibiaoliebiao = df.copy() + return df_zhibiaoshuju,df_zhibiaoliebiao + + def push_data(self,data): + + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + # 发送post请求 上传数据 + logger.info('请求参数:',data) + response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data)) + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + + logger.info('上传成功,响应为:', data) + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + def del_zhibiao(self,IndexCodeList): + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + data = { + "IndexCodeList": IndexCodeList #指标编码列表 + } + # 发送post请求 上传数据 + response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data)) + + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + + logger.info('删除成功,响应为:', data) + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + def del_business(self,data): + '''' + 接口地址 + https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd + + 请求体 body + { + "IndexCode": "W001067", //指标编码 + "StartDate": "2020-04-20", //指标需要删除的开始日期(>=),如果开始日期和结束日期相等,那么就是删除该日期 + "EndDate": "2024-05-28" //指标需要删除的结束日期(<=),如果开始日期和结束日期相等,那么就是删除该日期 + } + ''' + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + + # 发送post请求 上传数据 + response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data)) + + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + + logger.info('删除成功,响应为:', data) + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + +def get_market_data(end_time,df): + """ + 获取市场数据,拼接到df中 + """ + # 获取token + token = get_head_auth_report() + # 定义请求参数 + query_data_list_item_nos_data['data']['dateEnd'] = end_time.replace('-','') + # 发送请求 + headers = {"Authorization": token} + logger.info('获取数据中...') + items_res = requests.post(url=query_data_list_item_nos_url, headers=headers, json=query_data_list_item_nos_data, timeout=(3, 35)) + json_data = json.loads(items_res.text) + logger.info(f"获取到的数据:{json_data}") + df3 = pd.DataFrame(json_data['data']) + # 按照dataItemNo 分组 得到多个dataframe ,最后根据dataDate merge 成一个dataframe + df2 = pd.DataFrame() + for i in df3['dataItemNo'].unique(): + df1 = df3[df3['dataItemNo'] == i] + df1 = df1[['dataDate', 'dataValue']] + df1 = df1.rename(columns={'dataValue': i}) + if len(df2) == 0: + df2 = df1 + continue + df2 = pd.merge(df2, df1, how='left') + df2 = df2.rename(columns={'dataDate': 'date'}) + # 20240101 转换为 2024-01-01 + df2['date'] = pd.to_datetime(df2['date'], format='%Y%m%d') + df2['date'] = df2['date'].dt.strftime('%Y-%m-%d') + df = pd.merge(df, df2, how='left',on='date') + return df + + +def get_high_low_data(df): + # 读取excel 从第五行开始 + df1 = pd.read_excel(os.path.join(dataset,'数据项下载.xls'),header=5, names=['numid','date', 'Brentzdj', 'Brentzgj']) + # 合并数据 + df = pd.merge(df, df1, how='left',on='date') + return df + + +# 时间特征,年,月,一年的多少天,周几,第几周,第几季度,每月的第几天, 每季度的第几天,是否每月的第一天,是否每月的最后一天,是否每季度的第一天,是否每季度的最后一天,是否每年的第一天,是否每年的最后一天 +def addtimecharacteristics(df,dataset): + """ + 为输入的 DataFrame 添加日期相关信息列 + + 参数: + df (pandas.DataFrame): 包含日期列 'ds' 的 DataFrame + + 返回: + pandas.DataFrame: 添加了相关列的 DataFrame + """ + df['year'] = df['ds'].dt.year + df['month'] = df['ds'].dt.month + df['day'] = df['ds'].dt.day + df['dayofweek'] = df['ds'].dt.dayofweek + df['weekofyear'] = df['ds'].dt.isocalendar().week + df['dayofyear'] = df['ds'].dt.dayofyear + df['quarternum'] = df['ds'].dt.quarter + # 将ds列转换为季度Period对象 + df['quarter'] = df['ds'].dt.to_period('Q') + # 获取每个季度的开始日期 + df['quarter_start'] = df['quarter'].dt.to_timestamp('s') + # 计算每个日期是所在季度的第几天 + df['dayofquarter'] = (df['ds'] - df['quarter_start']).dt.days + 1 + # 是否月初 + df['is_month_start'] = df['ds'].dt.is_month_start.astype(int) + # 是否月末 + df['is_month_end'] = df['ds'].dt.is_month_end.astype(int) + # 是否季度初 + df['is_quarter_start'] = df['ds'].dt.is_quarter_start.astype(int) + # 是否季度末 + df['is_quarter_end'] = df['ds'].dt.is_quarter_end.astype(int) + # 是否年初 + df['is_year_start'] = df['ds'].dt.is_year_start.astype(int) + # 是否年末 + df['is_year_end'] = df['ds'].dt.is_year_end.astype(int) + # 去掉 quarter_start quarter + df.drop(columns=['quarter_start','quarter'],inplace=True) + df.to_csv(os.path.join(dataset,'指标数据添加时间特征.csv'), index=False) + return df diff --git a/main_yuanyou.py b/main_yuanyou.py index b0ba92b..873e362 100644 --- a/main_yuanyou.py +++ b/main_yuanyou.py @@ -284,7 +284,7 @@ def predict_main(): file=max(glob.glob(os.path.join(dataset,'*.pdf')), key=os.path.getctime), ssl=ssl, ) - # m.send_mail() + m.send_mail() if __name__ == '__main__': diff --git a/models/nerulforcastmodels.py b/models/nerulforcastmodels.py index affe008..e14c5a7 100644 --- a/models/nerulforcastmodels.py +++ b/models/nerulforcastmodels.py @@ -489,6 +489,7 @@ def model_losss(sqlitedb,end_time): accuracy_rote = 0 for i,group in df3.groupby('CREAT_DATE'): accuracy_rote += (group['ACCURACY'].sum()/len(group))*weight_dict[len(group)-1] + accuracy_rote = round(accuracy_rote,2) df4 = pd.DataFrame(columns=['开始日期','结束日期','准确率']) df4.loc[len(df4)] = {'开始日期':ds_dates[0],'结束日期':ds_dates[-1],'准确率':accuracy_rote} df4.to_sql("accuracy_rote", con=sqlitedb.connection, if_exists='append', index=False)