PriceForecast/lib/dataread.py

1736 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 导入模块
import pandas as pd
import numpy as np
import datetime
import string
import base64
import requests
import random
import time
import re
import os
import hmac
import hashlib
import json
import math
import torch
torch.set_float32_matmul_precision("high")
import matplotlib.pyplot as plt
#设置plt显示中文
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
from datetime import timedelta
from sklearn import metrics
from reportlab.pdfbase import pdfmetrics # 注册字体
from reportlab.pdfbase.ttfonts import TTFont # 字体类
from reportlab.platypus import Table, SimpleDocTemplate, Paragraph, Image # 报告内容相关类
from reportlab.lib.pagesizes import letter # 页面的标志尺寸(8.5*inch, 11*inch)
from reportlab.lib.styles import getSampleStyleSheet # 文本样式
from reportlab.lib import colors # 颜色模块
from reportlab.graphics.charts.barcharts import VerticalBarChart # 图表类
from reportlab.graphics.charts.legends import Legend # 图例类
from reportlab.graphics.shapes import Drawing # 绘图工具
from reportlab.lib.units import cm # 单位cm
# 注册字体(提前准备好字体文件, 如果同一个文件需要多种字体可以注册多个)
pdfmetrics.registerFont(TTFont('SimSun', 'SimSun.ttf'))
#设置plt显示中文
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# from config_jingbo_pro import *
from config_jingbo import *
# from config_yongan import *
# from config_juxiting import *
# 定义函数
def loadcsv(filename):
"""
读取指定文件名的 CSV 文件。
如果文件编码为 UTF-8则使用 UTF-8 编码读取;否则,使用 GBK 编码读取。
参数:
filename (str): 要读取的 CSV 文件的文件名。
返回:
pandas.DataFrame: 读取的数据。
"""
# 读取csv文件
try:
df = pd.read_csv(filename, encoding='utf-8')
except UnicodeDecodeError:
df = pd.read_csv(filename, encoding='gbk')
return df
def dateConvert(df, datecol='ds'):
"""
将数据框 df 中的 datecol 列转换为日期时间类型。
参数:
df (pandas.DataFrame): 要转换的 DataFrame。
datecol (str): 要转换的列名,默认为 'ds'
返回:
pandas.DataFrame: 转换后的 DataFrame。
"""
# 将date列转换为datetime类型
try:
df[datecol] = pd.to_datetime(df[datecol],format=r'%Y-%m-%d')
except:
df[datecol] = pd.to_datetime(df[datecol],format=r'%Y/%m/%d')
return df
def calculate_kdj(data, n=9):
'''
给传进来的df 添加列: 波动率最高最低k,d j
'''
# 对数据按照日期升序排序
data = data.sort_values(by='ds', ascending=True)
# 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价
data['pctchange'] = data['y'].pct_change()
# 收益为0的用0.01
data['pctchange'] = data['pctchange'].replace(0,0.01)
# 去除空值
data.dropna(inplace=True)
# 重置索引
data.reset_index(drop=True,inplace=True)
# 计算最高价和最低价
data['high'] = data['y']* (1+abs(data['pctchange'])/2)
data['low'] = data['y']* (1-abs(data['pctchange'])/2)
# 计算n日内最低价
low_list = data['y'].rolling(window=n, min_periods=1).min()
# 计算n日内最高价
high_list = data['y'].rolling(window=n, min_periods=1).max()
# 计算未成熟随机值
rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100
# 初始化k值为50
k = pd.Series(50, index=data.index)
# 初始化d值为50
d = pd.Series(50, index=data.index)
# 计算k值和d值
for i in range(1, len(data)):
k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i])
d[i] = (2/3 * d[i - 1]) + (1/3 * k[i])
# 计算j值
j = 3 * k - 2 * d
# 将k值、d值和j值添加到数据中
data['K'] = k
data['D'] = d
data['J'] = j
# 将包含 KDJ 指标的数据保存到新的 CSV 文件
data.to_csv('stock_data_with_kdj.csv', index=False)
# data = data.dropna()
return data
# 上传报告
def get_head_auth_report():
"""
通过 POST 请求登录到指定的 URL并从响应中获取认证令牌。
返回:
str: 如果登录成功,返回认证令牌;否则返回 None。
"""
logger.info("获取token中...")
logger.info(f'url:{login_pushreport_url},login_data:{login_data}')
# 发送 POST 请求到登录 URL携带登录数据
login_res = requests.post(url=login_pushreport_url, json=login_data, timeout=(3, 30))
# 将响应内容转换为 JSON 格式
text = json.loads(login_res.text)
logger.info(f'token接口响应{text}')
# 如果响应状态为成功
if text["status"]:
# 从响应数据中获取认证令牌
token = text["data"]["accessToken"]
# 返回认证令牌
return token
def upload_report_data(token, upload_data):
"""
上传报告数据到指定的URL
参数:
token (str): 认证令牌
upload_data (dict): 要上传的报告数据,包含必要的字段和信息
返回:
dict: 如果上传成功返回响应对象否则返回None
"""
# 直接使用传入的 upload_data
upload_data = upload_data
# 设置请求头部
headers = {"Authorization": token}
# 打印日志,显示正在上传报告数据
logger.info("报告上传中...")
# 打印日志,显示认证头部信息
logger.info(f"token:{token}")
# 打印日志,显示要上传的报告数据
logger.info(f"upload_data:{upload_data}" )
# 发送POST请求上传报告数据
upload_res = requests.post(url=upload_url, headers=headers, json=upload_data, timeout=(3, 15))
# 将响应内容转换为 JSON 格式
upload_res = json.loads(upload_res.text)
# 打印日志,显示响应内容
logger.info(upload_res)
# 如果上传成功,返回响应对象
if upload_res:
return upload_res
# 如果上传失败打印日志并返回None
else:
logger.info("报告上传失败")
return None
def upload_warning_data(warning_data):
"""
上传预警数据到指定的URL
参数:
warning_data (dict): 要上传的预警数据,包含必要的字段和信息
返回:
requests.Response: 如果上传成功返回响应对象否则返回None
"""
# 获取认证头部信息
token = get_head_auth_report()
# 设置请求头部
headers = {"Authorization": token}
# 打印日志,显示正在上传预警数据
logger.info("预警上传中...")
# 打印日志显示上传的URL
logger.info(f"upload_warning_url:{upload_warning_url}")
# 打印日志,显示认证头部信息
logger.info(f"token:{token}")
# 打印日志,显示要上传的预警数据
logger.info(f"warning_data:{warning_data}")
# 发送POST请求上传预警数据
upload_res = requests.post(url=upload_warning_url, headers=headers, json=warning_data, timeout=(3, 15))
# 如果上传成功,返回响应对象
if upload_res:
return upload_res
# 如果上传失败打印日志并返回None
else:
logger.info("预警上传失败")
return None
def upload_warning_info(df_count):
"""
上传预警信息到指定的URL
参数:
df_count (int): 停更的数量
返回:
None
"""
# 打印日志,显示正在上传预警信息
logger.info(f'上传预警信息')
try:
# 获取当前日期
warning_date = datetime.datetime.now().strftime('%Y-%m-%d')
# 构建预警内容
content = f'{warning_date}{df_count}个停更'
# 更新预警数据中的日期和内容
warning_data['data']['WARNING_DATE'] = warning_date
warning_data['data']['WARNING_CONTENT'] = content
# 调用 upload_warning_data 函数上传预警数据
upload_warning_data(warning_data)
# 打印日志,显示上传预警信息成功
logger.info(f'上传预警信息成功')
except Exception as e:
# 打印日志,显示上传预警信息失败,并记录异常信息
logger.error(f'上传预警信息失败:{e}')
def create_feature_last_update_time(df):
"""
计算特征停更信息用
参数:
df (DataFrame): 包含特征数据的 DataFrame
返回:
DataFrame: 包含特征停更信息的 DataFrame
str: y 列的最后更新时间
"""
df1 = df.copy()
# 找到每列的最后更新时间
df1.set_index('ds', inplace=True)
last_update_times = df1.apply(lambda x: x.dropna().index.max().strftime('%Y-%m-%d') if not x.dropna().empty else None)
# 保存每列的最后更新时间到文件
last_update_times_df = pd.DataFrame(columns = ['feature', 'last_update_time','is_value','update_period','warning_date','stop_update_period'])
# 打印每列的最后更新时间
for column, last_update_time in last_update_times.items():
values = []
# 判断是不是常数值
if df1[column].tail(20).nunique() == 1:
values = values + [column, last_update_time,1]
else:
values = values + [column, last_update_time,0]
# 计算特征数据值的时间差
try:
# 计算预警日期
time_diff = (df1[column].dropna().index.to_series().diff().mode()[0]).total_seconds() / 3600 / 24
last_update_time_datetime = datetime.datetime.strptime(last_update_time, '%Y-%m-%d')
last_update_date = end_time if end_time != '' else datetime.datetime.now().strftime('%Y-%m-%d')
end_time_datetime = datetime.datetime.strptime(last_update_date, '%Y-%m-%d')
early_warning_date = last_update_time_datetime + timedelta(days=time_diff)*2 + timedelta(days=1)
stop_update_period = int(math.ceil((end_time_datetime-last_update_time_datetime).days / time_diff))
early_warning_date = early_warning_date.strftime('%Y-%m-%d')
except KeyError:
time_diff = 0
early_warning_date = end_time
continue
values = values + [time_diff,early_warning_date,stop_update_period]
last_update_times_df.loc[len(last_update_times_df)] = values
logger.info(f"Column {column} was last updated at {last_update_time}")
y_last_update_time = last_update_times_df[last_update_times_df['feature']=='y']['warning_date'].values[0]
last_update_times_df.to_csv(os.path.join(dataset,'last_update_times.csv'), index=False)
logger.info('特征停更信息保存到文件last_update_times.csv')
return last_update_times_df,y_last_update_time
# 统计特征频度
def featurePindu(dataset):
# 读取文件
df = loadcsv(os.path.join(dataset,'未填充的特征数据.csv'))
df['ds'] = pd.to_datetime(df['ds'])
# 按ds正序排序重置索引
df = df.sort_values(by='ds', ascending=True).reset_index(drop=True)
# 统计特征频度
# 每列随机抽取10个值计算出5个时间间隔统计每个时间间隔的频度
columns = df.columns.to_list()
columns.remove('ds')
count_dict = {}
for column in columns:
# 获取每列时间间隔
values = df[[column,'ds']]
values.dropna(inplace=True,axis=0)
values=values.reset_index(drop=True)
# 抽取20%个值
value = values.sample(frac=0.2)
index = value.index
next_index = index + 1
count = []
for i,j in zip(index, next_index):
#通过索引计算日期差
try:
count.append((values.loc[j,'ds'] - values.loc[i,'ds']).days)
except:
pass
# 把31 换成 30
count = [30 if i == 31 else i for i in count]
# 保留count中出现次数最多的数
try:
count = max(set(count), key=count.count)
except ValueError :
logger.info(f'{column}列数据为空')
continue
# 存储到字典中
count_dict[column] = count
df = pd.DataFrame(count_dict,index=['count']).T
pindu_dfs = pd.DataFrame()
# 根据count分组
# 输出特征频度统计
pindudict = {'1':'日度','3':'日度','7':'周度','30':'月度','90':'季度','180':'半年度','365':'年度'}
for i in df.groupby('count'):
# 获取 i[1] 的索引值
index = i[1].index
pindu_df = pd.DataFrame()
try:
pindu_df[pindudict[str(i[0])]+f'({len(i[1])})'] = index
except KeyError :
pindu_df[str(i[0])+f'天({len(i[1])})'] = index
# 合并到pindu_dfs
pindu_dfs = pd.concat([pindu_dfs,pindu_df],axis=1)
# nan替换为 ' '
pindu_dfs = pindu_dfs.fillna('')
pindu_dfs.to_csv(os.path.join(dataset,'特征频度统计.csv'),index=False)
logger.info(pindu_dfs)
featureInfo = f'特征信息:总共有{len(columns)-2}'
for i in pindu_dfs.columns:
featureInfo += f',{i}'
featureInfo += ', 详看 附1、特征列表'
featureInfo += '''
数据特征工程:
1. 数据日期排序,新日期在最后
2. 删除空列,特征数据列没有值,就删除
3. 删除近两月不再更新值的指标
4. 非日度数据填充为日度数据,填充规则:
-- 向后填充,举例:假设周五出现一个周度指标数据,那么在这之前的数据用上周五的数据
-- 向前填充举例采集数据开始日期为2018年1月1日那么周度数据可能是2018年1月3日那么3日的数据向前填充使1日2日都有数值
数据特征相关性分析:
'''
logger.info(featureInfo)
with open(os.path.join(dataset,'特征频度统计.txt'), 'w', encoding='utf-8') as f:
f.write(featureInfo)
logger.info('*'*200)
def featureAnalysis(df,dataset,y):
# 特征筛选
import matplotlib.pyplot as plt
# 选择特征和标签列
X = df.drop(['ds', 'y'], axis=1) # 特征集,排除时间戳和标签列
yy = df['y'] # 标签集
# 标签集自相关函数分析
from statsmodels.graphics.tsaplots import plot_acf
plot_acf(yy, lags=30)
plt.savefig(os.path.join(dataset,'指标数据自相关图.png'))
plt.close()
# 标签集偏自相关函数分析
from statsmodels.graphics.tsaplots import plot_pacf
plot_pacf(yy, lags=30)
plt.savefig(os.path.join(dataset,'指标数据偏自相关图.png'))
plt.close()
# 画 特征与价格散点图
# 删除所有*散点图.png
for file in os.listdir(dataset):
if file.endswith("散点图.png"):
os.remove(os.path.join(dataset, file))
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.figure(figsize=(10, 10))
# # 遍历X每一列和yy画散点图
# for i, col in enumerate(X.columns):
# plt.subplot(2, 2, i%4+1)
# plt.scatter(X[col], yy)
# plt.xlabel(col)
# plt.ylabel(y)
# plt.title(col)
# if i % 4 == 3 or i == len(X.columns)-1:
# plt.tight_layout()
# plt.savefig(os.path.join(dataset,f'{i}指标数据特征与价格散点图.png'))
# plt.close()
def corr_feature(df):
# 重新命名列名,列名排序,y在第一个
df.reindex(['y'] + sorted(df.columns.difference(['y'])))
df_test = df.copy()
# 取最后的220行
df_test = df_test.tail(220)
# 去掉日期列
df_test = df_test.drop(columns=['ds'])
# 不参与标准化
df_test_noscaler = df_test.copy() # 滞后处理备份
df_noscaler = df_test.copy()
# 画出相关性热力图
df_test.to_csv(os.path.join(dataset,'同步相关性.csv'))
corr = df_test.corr()
# 保存相关系数
corr.to_csv(os.path.join(dataset,'同步相关性系数.csv'))
# plt.figure(figsize=(10, 10))
# sns.heatmap(corr, annot=True, cmap='coolwarm')
# plt.savefig('dataset/同步相关性热力图.png')
# plt.show()
# 读取滞后周期文件,更改特征
characteristic_period = pd.read_csv('dataset/特征滞后周期.csv',encoding='utf-8')
# 去掉周期为0的行
characteristic_period = characteristic_period.drop(characteristic_period[characteristic_period['滞后周期'] == 0].index)
for col in df.columns:
# 跳过y列
if col in ['y']:
continue
# 特征滞后n个周期计算与y的相关性
if col in characteristic_period['特征'].values:
# 获取特征对应的周期
period = characteristic_period[characteristic_period['特征'] == col]['滞后周期'].values[0]
# 滞后处理
df[col] = df[col].shift(period)
df.to_csv(os.path.join(dataset,'滞后处理后的数据集.csv'))
# corr_feture_noscaler = {} # 保存相关性最大的周期
# 遍历df_test的每一列计算相关性
# for col in df_noscaler.columns:
# # 跳过y列
# if col in ['y']:
# continue
# logger.info('特征:', col)
# # 特征滞后n个周期计算与y的相关性
# corr_dict = {}
# try:
# for i in range(0, 200):
# if i == 0:
# df_noscaler[col+'_'+str(i)] = df_noscaler[col]
# else:
# df_noscaler[col+'_'+str(i)] = df_noscaler[col].shift(i)
# corr_dict[col+'_'+str(i)] = abs(df_noscaler[col+'_'+str(i)].corr(df_noscaler['y']))
# except :
# logger.info('特征:', col, '滑动错误,请查看')
# continue
# 输出相关性最大的特征
# logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)])
# corr_feture_noscaler[col] = max(corr_dict, key=corr_dict.get).split('_')[-1]
# 画出最相关性最大的特征和y的折线图
# plt.figure(figsize=(10, 5))
# plt.plot(df_noscaler[max(corr_dict, key=corr_dict.get)], label=max(corr_dict, key=corr_dict.get))
# # 设置双坐标轴
# ax1 = plt.gca()
# ax2 = ax1.twinx()
# ax2.plot(df_noscaler['y'], color='r', label='y')
# plt.legend()
# try:
# plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get)+'.png')
# except :
# # 替换成_
# plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get).replace(':','_').replace('/','_').replace('(','_').replace(')','_')+'.png')
# plt.close()
# 结果保存到txt文件
# logger.info('不参与标准化的特征滞后相关性写入txt文件')
# with open('dataset/不参与标准化的特征滞后相关性.txt', 'w') as f:
# for key, value in corr_feture_noscaler.items():
# f.write('%s:%s\n' % (key, value))
# 遍历corr_feture_noscaler,更改df
# colnames_noscaler = []
# for col in corr_feture_noscaler:
# colname = col+'_'+corr_feture_noscaler[col]
# if int(corr_feture_noscaler[col]) == 0:
# continue
# df_test_noscaler[colname] = df_test_noscaler[col].shift(int(corr_feture_noscaler[col]))
# df_test_noscaler = df_test_noscaler.drop(columns=[col])
# colnames_noscaler.append(colname)
# 去除有空值的行
# df_test_noscaler = df_test_noscaler.dropna()
# df_test_noscaler.reindex(['y'] + sorted(df_test_noscaler.columns.difference(['y'])))
# df_test_noscaler.to_csv('dataset/不参与标准化的特征滞后相关性.csv', index=False)
# 画出相关性热力图
# corr = df_test_noscaler.corr()
# 保存相关系数
# corr.to_csv(os.path.join(dataset,'不参与标准化的特征滞后相关性系数.csv'))
# plt.figure(figsize=(10, 10))
# sns.heatmap(corr, annot=True, cmap='coolwarm')
# plt.savefig('dataset/不参与标准化的特征滞后相关性热力图.png')
# plt.close()
# # 标准化每列
# from sklearn.preprocessing import StandardScaler
# scaler = StandardScaler()
# df_test = pd.DataFrame(scaler.fit_transform(df_test), columns=df_test.columns)
# corr_feture = {} # 保存相关性最大的周期
# # 遍历df_test的每一列计算相关性
# for col in df_test.columns:
# # 跳过y列
# if col == 'y':
# continue
# logger.info('特征:', col)
# # 特征滞后n个周期计算与y的相关性
# corr_dict = {}
# try:
# for i in range(0, 200):
# if i == 0:
# df_test[col+'_'+str(i)] = df_test[col]
# else:
# df_test[col+'_'+str(i)] = df_test[col].shift(i)
# corr_dict[col+'_'+str(i)] = abs(df_test[col+'_'+str(i)].corr(df_test['y']))
# except :
# logger.info('特征:', col, '滑动错误,请查看')
# continue
# # 输出相关性最大的特征
# logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)])
# corr_feture[col] = max(corr_dict, key=corr_dict.get).split('_')[-1]
# # 结果保存到txt文件
# with open('dataset/标准化的特征滞后相关性.txt', 'w') as f:
# for key, value in corr_feture.items():
# f.write('%s:%s\n' % (key, value))
# # 遍历corr_feture,更改df
# colnames = []
# for col in corr_feture:
# colname = col+'_'+corr_feture[col]
# if int(corr_feture[col]) == 0:
# continue
# df[colname] = df[col].shift(int(corr_feture[col]))
# df = df.drop(columns=[col])
# colnames.append(colname)
# # 去除有空值的行
# df = df.dropna()
# df.reindex(['y'] + sorted(df.columns.difference(['y'])))
# df.to_csv('dataset/标准化后的特征滞后相关性.csv', index=False)
# # 画出相关性热力图
# ds = df['ds']
# df = df.drop(columns=['ds'])
# corr = df.corr()
# # 保存相关系数
# corr.to_csv(os.path.join(dataset,'标准化后的特征滞后相关性系数.csv'))
# plt.figure(figsize=(10, 10))
# sns.heatmap(corr, annot=True, cmap='coolwarm')
# plt.savefig('dataset/标准化后的特征滞后相关性热力图.png')
# plt.show()
# df['ds'] = ds
# 去除nan值
df = df.dropna()
return df
def calculate_kdj(data, n=9):
'''
给传进来的df 添加列: 波动率最高最低k ,d j
'''
data = data.sort_values(by='ds', ascending=True)
# 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价
data['pctchange'] = data['y'].pct_change()
# 收益为0的用0.01
data['pctchange'] = data['pctchange'].replace(0,0.01)
data.dropna(inplace=True)
# 重置索引
data.reset_index(drop=True,inplace=True)
data['high'] = data['y']* (1+abs(data['pctchange'])/2)
data['low'] = data['y']* (1-abs(data['pctchange'])/2)
low_list = data['y'].rolling(window=n, min_periods=1).min()
high_list = data['y'].rolling(window=n, min_periods=1).max()
rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100
k = pd.Series(50, index=data.index)
d = pd.Series(50, index=data.index)
for i in range(1, len(data)):
k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i])
d[i] = (2/3 * d[i - 1]) + (1/3 * k[i])
j = 3 * k - 2 * d
data['K'] = k
data['D'] = d
data['J'] = j
# 将包含 KDJ 指标的数据保存到新的 CSV 文件
data.to_csv('dataset\stock_data_with_kdj.csv', index=False)
# data = data.dropna()
return data
def check_column(df,col_name,two_months_ago):
'''
检查列是否需要删除。
该函数会检查列是否为空值列、180天没有更新的列或常数值列。
参数:
col_name (str): 列名。
df (DataFrame): 包含列的 DataFrame。
返回:
bool: 如果列需要删除,返回 True否则返回 False。
'''
if 'ds' in col_name or 'y' in col_name:
return False
df_check_column = df[['ds',col_name,'y']]
df_check_column = df_check_column.dropna()
if len(df_check_column) == 0:
print(f'空值列:{col_name}')
return True
# 判断是不是常数列
if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2:
print(f'180没有更新{col_name}')
return True
# 判断相关系数大于0.6
if is_del_corr > 0:
if abs(df_check_column[col_name].corr(df_check_column['y'])) < is_del_corr:
print(f'相关系数小于0.6{col_name}')
return True
corresponding_date = df_check_column.iloc[-1]['ds']
return corresponding_date < two_months_ago
def datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False):
'''
原油特征数据处理函数,
接收的是两个df一个是指标数据一个是指标列表
输出的是一个df包含dsy指标列
'''
df = df_zhibiaoshuju.copy()
if end_time == '':
end_time = datetime.datetime.now().strftime('%Y-%m-%d')
# 重命名时间列,预测列
df.rename(columns={datecol:'ds'},inplace=True)
df.rename(columns={y:'y'},inplace=True)
# 按时间顺序排列
df.sort_values(by='ds',inplace=True)
df['ds'] = pd.to_datetime(df['ds'])
# 获取start_year年到end_time的数据
df = df[df['ds'].dt.year >= start_year]
df = df[df['ds'] <= end_time]
# last_update_times_df,y_last_update_time = create_feature_last_update_time(df)
# logger.info(f'删除预警的特征前数据量:{df.shape}')
# columns_to_drop = last_update_times_df[last_update_times_df['warning_date'] < y_last_update_time ]['feature'].values.tolist()
# df = df.drop(columns = columns_to_drop)
# logger.info(f'删除预警的特征后数据量:{df.shape}')
# if is_update_warning_data:
# upload_warning_info(last_update_times_df,y_last_update_time)
# 去掉近最后数据对应的日期在六月以前的列删除近2月的数据是常熟的列
if is_del_tow_month:
current_date = datetime.datetime.now()
two_months_ago = current_date - timedelta(days=180)
logger.info(f'删除两月不更新特征前数据量:{df.shape}')
columns_to_drop = []
for clo in df.columns:
if check_column(df,clo,two_months_ago):
columns_to_drop.append(clo)
df = df.drop(columns=columns_to_drop)
logger.info(f'删除两月不更新特征后数据量:{df.shape}')
if freq == 'W':
# 按周取样
df = df.resample('W', on='ds').mean().reset_index()
elif freq == 'M':
# 按月取样
df = df.resample('M', on='ds').mean().reset_index()
# 删除预测列空值的行
''' 工作日缺失,如果删除,会影响预测结果,导致统计准确率出错 '''
# df = df.dropna(subset=['y'])
logger.info(f'删除预测列为空值的行后数据量:{df.shape}')
df = df.dropna(axis=1, how='all')
logger.info(f'删除全为空值的列后数据量:{df.shape}')
df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False)
# 去掉指标列表中的columns_to_drop的行
df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())]
df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False)
# 数据频度分析
featurePindu(dataset=dataset)
# 向上填充
df = df.ffill()
# 向下填充
df = df.bfill()
# 删除周六日的数据
if delweekenday:
df = df[df['ds'].dt.weekday < 5]
# kdj指标
if add_kdj:
df = calculate_kdj(df)
# 衍生时间特征
if is_timefurture:
df = addtimecharacteristics(df=df,dataset=dataset)
# 特征分析
featureAnalysis(df,dataset=dataset,y=y)
return df
def datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False):
'''
聚烯烃特征数据处理函数,
接收的是两个df一个是指标数据一个是指标列表
输出的是一个df包含dsy指标列
'''
df = df_zhibiaoshuju.copy()
if end_time == '':
end_time = datetime.datetime.now().strftime('%Y-%m-%d')
# date转为pddate
df.rename(columns={datecol:'ds'},inplace=True)
# 指定列统一减少数值
df[offsite_col] = df[offsite_col]-offsite
# 预测列为avg_cols的均值
df[y] = df[avg_cols].mean(axis=1)
# 去掉多余的列avg_cols
df = df.drop(columns=avg_cols)
# 重命名预测列
df.rename(columns={y:'y'},inplace=True)
# 按时间顺序排列
df.sort_values(by='ds',inplace=True)
df['ds'] = pd.to_datetime(df['ds'])
# 获取2018年到当前日期的数据
df = df[df['ds'].dt.year >= 2018]
# 获取小于等于当前日期的数据
df = df[df['ds'] <= end_time]
logger.info(f'删除两月不更新特征前数据量:{df.shape}')
# 去掉近最后数据对应的日期在两月以前的列删除近2月的数据是常数的列
current_date = datetime.datetime.now()
two_months_ago = current_date - timedelta(days=40)
# 检查两月不更新的特征
def check_column(col_name):
if 'ds' in col_name or 'y' in col_name:
return False
df_check_column = df[['ds',col_name]]
df_check_column = df_check_column.dropna()
if len(df_check_column) == 0:
return True
if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2:
return True
corresponding_date = df_check_column.iloc[-1]['ds']
return corresponding_date < two_months_ago
columns_to_drop = df.columns[df.columns.map(check_column)].tolist()
df = df.drop(columns = columns_to_drop)
logger.info(f'删除两月不更新特征后数据量:{df.shape}')
# 删除预测列空值的行
df = df.dropna(subset=['y'])
logger.info(f'删除预测列为空值的行后数据量:{df.shape}')
df = df.dropna(axis=1, how='all')
logger.info(f'删除全为空值的列后数据量:{df.shape}')
df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False)
# 去掉指标列表中的columns_to_drop的行
df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())]
df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False)
# 频度分析
featurePindu(dataset=dataset)
# 向上填充
df = df.ffill()
# 向下填充
df = df.bfill()
# 删除周六日的数据
if delweekenday:
df = df[df['ds'].dt.weekday < 5]
if add_kdj:
df = calculate_kdj(df)
if is_timefurture:
df = addtimecharacteristics(df=df,dataset=dataset)
featureAnalysis(df,dataset=dataset,y=y)
return df
def getdata(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''):
logger.info('getdata接收'+filename+' '+datecol+' '+end_time)
# 判断后缀名 csv或excel
if filename.endswith('.csv'):
df = loadcsv(filename)
else:
# 读取excel 指标数据
df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据')
df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表')
# 日期字符串转为datatime
df = datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time)
return df,df_zhibiaoliebiao
def getdata_juxiting(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''):
logger.info('getdata接收'+filename+' '+datecol+' '+end_time)
# 判断后缀名 csv或excel
if filename.endswith('.csv'):
df = loadcsv(filename)
else:
# 读取excel 指标数据
df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据')
df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表')
# 日期字符串转为datatime
df = datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time)
return df
def sanitize_filename(filename):
# 使用正则表达式替换不合规的字符
# 这里我们替换为下划线'_',但你可以根据需要选择其他字符
sanitized = re.sub(r'[\\/*?:"<>|\s]', '_', filename)
# 移除开头的点(在某些系统中,以点开头的文件可能是隐藏的)
sanitized = re.sub(r'^\.', '', sanitized)
# 如果需要,可以添加更多替换规则
return sanitized
class BinanceAPI:
'''
获取 Binance API 请求头签名
'''
def __init__(self, APPID, SECRET):
self.APPID = APPID
self.SECRET = SECRET
self.get_signature()
# 生成随机字符串作为 nonce
def generate_nonce(self, length=32):
self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
return self.nonce
# 获取当前时间戳(秒)
def get_timestamp(self):
return int(time.time())
# 构建待签名字符串
def build_sign_str(self):
return f'appid={self.APPID}&nonce={self.nonce}&timestamp={self.timestamp}'
# 使用 HMAC SHA-256 计算签名
def calculate_signature(self, secret, message):
return base64.urlsafe_b64encode(hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8')
def get_signature(self):
# 调用上述方法生成签名
self.nonce = self.generate_nonce()
self.timestamp = self.get_timestamp()
self.sign_str = self.build_sign_str()
self.signature = self.calculate_signature(self.SECRET, self.sign_str)
# return self.signature
class Graphs:
# 绘制标题
@staticmethod
def draw_title(title: str):
# 获取所有样式表
style = getSampleStyleSheet()
# 拿到标题样式
ct = style['Heading1']
# 单独设置样式相关属性
ct.fontName = 'SimSun' # 字体名
ct.fontSize = 18 # 字体大小
ct.leading = 50 # 行间距
ct.textColor = colors.green # 字体颜色
ct.alignment = 1 # 居中
ct.bold = True
# 创建标题对应的段落,并且返回
return Paragraph(title, ct)
# 绘制小标题
@staticmethod
def draw_little_title(title: str):
# 获取所有样式表
style = getSampleStyleSheet()
# 拿到标题样式
ct = style['Normal']
# 单独设置样式相关属性
ct.fontName = 'SimSun' # 字体名
ct.fontSize = 15 # 字体大小
ct.leading = 30 # 行间距
ct.textColor = colors.red # 字体颜色
# 创建标题对应的段落,并且返回
return Paragraph(title, ct)
# 绘制普通段落内容
@staticmethod
def draw_text(text: str):
# 获取所有样式表
style = getSampleStyleSheet()
# 获取普通样式
ct = style['Normal']
ct.fontName = 'SimSun'
ct.fontSize = 12
ct.wordWrap = 'CJK' # 设置自动换行
ct.alignment = 0 # 左对齐
ct.firstLineIndent = 32 # 第一行开头空格
ct.leading = 25
return Paragraph(text, ct)
# 绘制表格
@staticmethod
def draw_table(*args):
# 列宽度
col_width = args[0]
style = [
('FONTNAME', (0, 0), (-1, -1), 'SimSun'), # 字体
('FONTSIZE', (0, 0), (-1, 0), 12), # 第一行的字体大小
('FONTSIZE', (0, 1), (-1, -1), 10), # 第二行到最后一行的字体大小
('BACKGROUND', (0, 0), (-1, 0), '#d5dae6'), # 设置第一行背景颜色
('ALIGN', (0, 0), (-1, -1), 'CENTER'), # 第一行水平居中
('ALIGN', (0, 1), (-1, -1), 'LEFT'), # 第二行到最后一行左右左对齐
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), # 所有表格上下居中对齐
('TEXTCOLOR', (0, 0), (-1, -1), colors.darkslategray), # 设置表格内文字颜色
('GRID', (0, 0), (-1, -1), 0.5, colors.grey), # 设置表格框线为grey色线宽为0.5
# ('SPAN', (0, 1), (0, 2)), # 合并第一列二三行
# ('SPAN', (0, 3), (0, 4)), # 合并第一列三四行
# ('SPAN', (0, 5), (0, 6)), # 合并第一列五六行
# ('SPAN', (0, 7), (0, 8)), # 合并第一列五六行
]
table = Table(args[1:], colWidths=col_width, style=style)
return table
# 创建图表
@staticmethod
def draw_bar(bar_data: list, ax: list, items: list):
drawing = Drawing(500, 250)
bc = VerticalBarChart()
bc.x = 45 # 整个图表的x坐标
bc.y = 45 # 整个图表的y坐标
bc.height = 200 # 图表的高度
bc.width = 350 # 图表的宽度
bc.data = bar_data
bc.strokeColor = colors.black # 顶部和右边轴线的颜色
bc.valueAxis.valueMin = 5000 # 设置y坐标的最小值
bc.valueAxis.valueMax = 26000 # 设置y坐标的最大值
bc.valueAxis.valueStep = 2000 # 设置y坐标的步长
bc.categoryAxis.labels.dx = 2
bc.categoryAxis.labels.dy = -8
bc.categoryAxis.labels.angle = 20
bc.categoryAxis.categoryNames = ax
# 图示
leg = Legend()
leg.fontName = 'SimSun'
leg.alignment = 'right'
leg.boxAnchor = 'ne'
leg.x = 475 # 图例的x坐标
leg.y = 240
leg.dxTextSpace = 10
leg.columnMaximum = 3
leg.colorNamePairs = items
drawing.add(leg)
drawing.add(bc)
return drawing
# 绘制图片
@staticmethod
def draw_img(path):
img = Image(path) # 读取指定路径下的图片
img.drawWidth = 20*cm # 设置图片的宽度
img.drawHeight = 10*cm # 设置图片的高度
return img
# 定义样式函数
def style_row(row):
if '' in row['频度']:
return ['background-color: yellow'] * len(row)
else:
return ['background-color: gray'] * len(row)
class EtaReader():
def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl):
'''
初始化 EtaReader 类的实例。
参数:
signature (str): 用于 API 请求的签名。
classifylisturl (str): 分类列表的 URL。
classifyidlisturl (str): 分类 ID 列表的 URL。
edbcodedataurl (str): EDB 代码数据的 URL。
edbdatapushurl (str): EDB 数据推送的 URL。
edbcodelist (str): EDB 代码列表的 URL。
edbdeleteurl (str): EDB 数据删除的 URL。
edbbusinessurl (str): EDB 业务数据的 URL。
返回:
None
'''
self.signature = signature
self.classifylisturl = classifylisturl
self.classifyidlisturl = classifyidlisturl
self.edbcodedataurl = edbcodedataurl
self.edbdatapushurl = edbdatapushurl
self.edbcodelist = edbcodelist
self.edbdeleteurl = edbdeleteurl
self.edbbusinessurl = edbbusinessurl
def filter_yuanyou_data(self,ClassifyName,data):
'''
指标名称保留规则
'''
# 包含 关键词 去除, 返回flase
if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价',
'同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克',
'四周均值','名占比','残差','DMA',
'连7-连9','4周平均','4周均值','滚动相关性','日本']):
return False
# 检查需要的特征
# 去掉 分析 分类下的数据
if ClassifyName == '分析':
return False
# 保留 库存中特殊关键词
if ClassifyName == '库存':
if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]):
return True
else:
pass
else:
pass
# 去掉 持仓中不是基金的数据
if ClassifyName == '持仓':
if '基金' not in data:
return False
else:
pass
else:
pass
# 去掉 航班中不是中国、美国 的数据
if ClassifyName == '需求':
if '航班' in data :
if '中国' in data or '美国' in data :
return True
else:
return False
else:
pass
else:
pass
# 分类为 期货市场,同质性数据取第一个
if ClassifyName == '期货市场':
# 去掉c1-9 以后的
if 'c1-c' in data:
try:
c = int(data.split('c1-c')[1])
except:
return False
if c > 9 :
return False
else:
pass
else:
pass
# 判断 同质性数据, 字符串开头
strstartdict = {'ICE Brent c':"ICE Brent c14",
'NYMWX WTI c':"NYMWX WTI c5",
'INE SC c':"INE SC c1",
'EFS c':"EFS c",
'Dubai Swap c':"Dubai Swap c1",
'Oman Swap c':"Oman Swap c1",
'DME Oman c':"DME Oman c1",
'Murban Futures c':"Murban Futures c1",
'Dubai连合约价格':'Dubai连1合约价格',
'美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格',
'Brent连合约价格':'Brent连1合约价格',
'WTI连合约价格':'WTI连1合约价格',
'布伦特连合约价格':'Brent连1合约价格',
'Brent 连合约价格':'Brent连1合约价格',
'Dubai连合约价格':'Dubai连1合约价格',
'Brent连':'Brent连1合约价格',
'brent连':'Brent连1合约价格',
}
# 判断名称字符串开头是否在 strstartdict.keys中
match = re.match(r'([a-zA-Z\s]+)(\d+)', data)
if match:
part1 = match.group(1)
part2 = match.group(2)
if part1 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1]:
return True
else:
return False
# data = 'Brent 连7合约价格'
# 判断名称字符串去掉数字后是否在 strstartdict.keys中
match = re.findall(r'\D+', data)
if match :
if len(match) == 2:
part1 = match[0]
part2 = match[1]
if part1+part2 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1+part2]:
return True
else:
return False
else:
pass
elif len(match) == 1:
match = re.findall(r'\D+', data)
part1 = match[0]
if part1 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1]:
return True
else:
return False
else:
pass
else:
pass
# 去掉kpler数据源
if 'Kpler' in ClassifyName or 'kpler' in ClassifyName:
return False
return True
def filter_pp_data(self,ClassifyName,data):
'''
指标名称保留规则
'''
# 包含 关键词 去除, 返回flase
# if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价',
# '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克',
# '四周均值','名占比','残差','DMA',
# '连7-连9','4周平均','4周均值','滚动相关性','日本']):
# return False
# 包含 关键词 保留, 返回True
if any(keyword in data for keyword in ['拉丝']):
return True
# 检查需要的特征
# 去掉 期货市场 分类下的数据
if ClassifyName == '期货市场':
return False
else:
pass
# 保留 库存 下所有指标
if ClassifyName == '库存':
return True
else:
pass
# 保留 进出口 下所有指标
if ClassifyName == '进出口':
return True
else:
pass
# 保留 价差 下所有指标
if ClassifyName == '价差':
return True
else:
pass
# 保留 供应 下所有指标
if ClassifyName == '供应':
return True
else:
pass
# 保留 需求 下所有指标
if ClassifyName == '需求':
return True
else:
pass
return True
# 通过edbcode 获取指标数据
def edbcodegetdata(self,df,EdbCode,EdbName):
# 根据指标id获取指标数据
url = self.edbcodedataurl+str(EdbCode)
# 发送GET请求
response = requests.get(url, headers=self.headers)
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
all_data_items = data.get('Data')
# 列表转换为DataFrame
df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime'])
# df3 = pd.read_json(all_data_items, orient='records')
# 去掉UpdateTime 列
df3 = df3.drop(columns=['UpdateTime'])
# df3.set_index('DataTime')
df3.rename(columns={'Value': EdbName}, inplace=True)
# 将数据存储df1
df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y'))
# 按时间排序
df = df.sort_values(by='DataTime', ascending=True)
return df
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def get_eta_api_yuanyou_data(self,data_set,dataset=''):
'''
从ETA API获取原油数据
参数:
data_set (str): 数据集名称
dataset (str): 数据集ID默认为空
返回:
None
'''
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 从列表数据中获取指标名称,判断指标名称频度是否为日 如果是则获取UniqueCode然后获取指标数据保存到xlat文件中的sheet表。
'''
df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度
df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2...
'''
# 构建新的DataFrame df df1
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度','指标来源','来源id','最后更新时间','更新周期','预警日期','停更周期'])
df1 = pd.DataFrame(columns=['DataTime'])
# 外网环境无法访问,请确认是否为内网环境
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
# 检查响应状态码
if response.status_code == 200:
# 获取成功, 处理响应内容
data = response.json() # 假设接口返回的是JSON数据
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# 定义你想要保留的固定值
fixed_value = 1214
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
n = 0
for item in filtered_data:
n+= 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
# logger.info(response.text)
data2 = response.json()
Data = data2.get('Data')
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
Frequency = i.get('Frequency') # 频度要保存到df的频度列
SourceName = i.get('SourceName') # 来源名称要保存到df的频度列
Source = i.get('Source') # 来源ID要保存到df的频度列
Unit = i.get('Unit') # 单位要保存到df的单位列
# 频度不是 日 或者 周的 跳过
if Frequency not in ['日度','周度','','']:
continue
# 只保留手工数据中,名称带有 海运出口 海运进口
if Source == 9 and not ('海运出口' in EdbName or '海运进口' in EdbName):
continue
# 不要wind数据
if Source == 2:
continue
# 判断名称是否需要保存
isSave = self.filter_yuanyou_data(ClassifyName,EdbName)
if isSave:
# 保存到df
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
# 取df1所有行最后一列
edbname_df = df1[['DataTime',f'{EdbName}']]
edbname_df = edbname_df.dropna()
if len(edbname_df) == 0:
logger.info(f'指标名称:{EdbName} 没有数据')
continue
try:
time_sequence = edbname_df['DataTime'].values.tolist()[-10:]
except IndexError:
time_sequence = edbname_df['DataTime'].values.tolist()
# 使用Counter来统计每个星期几的出现次数
from collections import Counter
weekday_counter = Counter(datetime.datetime.strptime(time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
# 打印出现次数最多的星期几
try:
most_common_weekday = weekday_counter.most_common(1)[0][0]
# 计算两周后的日期
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
except IndexError:
most_common_weekday = '其他'
stop_update_period = 0
if '' in Frequency:
most_common_weekday = '每天'
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName,
'指标名称': EdbName,
'指标id': EdbCode,
'单位': Unit,
'频度': Frequency,
'指标来源':SourceName,
'来源id':Source,
'最后更新时间':edbname_df['DataTime'].values[-1],
'更新周期':most_common_weekday,
'预警日期':warning_date,
'停更周期':stop_update_period},index=[0],
)
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
else:
logger.info(f'跳过指标 {EdbName}')
# 找到列表中不在指标列中的指标id保存成新的list
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# 遍历new_list获取指标数据保存到df1
for item in new_list:
logger.info(item)
# 将item 加入到 df['指标id']中
try:
itemname = edbcodenamedict[item]
except:
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他','指标来源':'其他','来源id':'其他'},index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
def get_eta_api_pp_data(self,data_set,dataset=''):
global ClassifyId
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 从列表数据中获取指标名称,判断指标名称频度是否为日 如果是则获取UniqueCode然后获取指标数据保存到xlat文件中的sheet表。
'''
df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度
df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2...
'''
# 构建新的DataFrame df df1
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度'])
df1 = pd.DataFrame(columns=['DataTime'])
# 外网环境无法访问,请确认是否为内网环境
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
# 检查响应状态码
if response.status_code == 200:
# 获取成功, 处理响应内容
data = response.json() # 假设接口返回的是JSON数据
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# 定义你想要保留的固定值
fixed_value = ClassifyId
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
n = 0
for item in filtered_data:
n+= 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
# logger.info(response.text)
data2 = response.json()
Data = data2.get('Data')
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
Frequency = i.get('Frequency') # 频度要保存到df的频度列
# 频度不是 日 或者 周的 跳过
if Frequency not in ['日度','周度','','']:
continue
# 判断名称是否需要保存
isSave = self.filter_pp_data(ClassifyName,EdbName)
if isSave:
# 保存到df
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
else:
logger.info(f'跳过指标 {EdbName}')
# 找到列表中不在指标列中的指标id保存成新的list
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# 遍历new_list获取指标数据保存到df1
for item in new_list:
logger.info(item)
# 将item 加入到 df['指标id']中
try:
itemname = edbcodenamedict[item]
except:
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
def push_data(self,data):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
logger.info('请求参数:',data)
response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
logger.info('上传成功,响应为:', data)
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def del_zhibiao(self,IndexCodeList):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
data = {
"IndexCodeList": IndexCodeList #指标编码列表
}
# 发送post请求 上传数据
response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
logger.info('删除成功,响应为:', data)
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def del_business(self,data):
''''
接口地址
https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd
请求体 body
{
"IndexCode": "W001067", //指标编码
"StartDate": "2020-04-20", //指标需要删除的开始日期(>=),如果开始日期和结束日期相等,那么就是删除该日期
"EndDate": "2024-05-28" //指标需要删除的结束日期(<=),如果开始日期和结束日期相等,那么就是删除该日期
}
'''
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
logger.info('删除成功,响应为:', data)
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def get_market_data(end_time,df):
"""
获取市场数据拼接到df中
"""
# 获取token
token = get_head_auth_report()
# 定义请求参数
query_data_list_item_nos_data['data']['dateEnd'] = end_time.replace('-','')
# 发送请求
headers = {"Authorization": token}
logger.info('获取数据中...')
items_res = requests.post(url=query_data_list_item_nos_url, headers=headers, json=query_data_list_item_nos_data, timeout=(3, 35))
json_data = json.loads(items_res.text)
logger.info(f"获取到的数据:{json_data}")
df3 = pd.DataFrame(json_data['data'])
# 按照dataItemNo 分组 得到多个dataframe 最后根据dataDate merge 成一个dataframe
df2 = pd.DataFrame()
for i in df3['dataItemNo'].unique():
df1 = df3[df3['dataItemNo'] == i]
df1 = df1[['dataDate', 'dataValue']]
df1 = df1.rename(columns={'dataValue': i})
if len(df2) == 0:
df2 = df1
continue
df2 = pd.merge(df2, df1, how='left')
df2 = df2.rename(columns={'dataDate': 'date'})
# 20240101 转换为 2024-01-01
df2['date'] = pd.to_datetime(df2['date'], format='%Y%m%d')
df2['date'] = df2['date'].dt.strftime('%Y-%m-%d')
df = pd.merge(df, df2, how='left',on='date')
return df
def get_high_low_data(df):
# 读取excel 从第五行开始
df1 = pd.read_excel(os.path.join(dataset,'数据项下载.xls'),header=5, names=['numid','date', 'Brentzdj', 'Brentzgj'])
# 合并数据
df = pd.merge(df, df1, how='left',on='date')
return df
# 时间特征,年,月,一年的多少天,周几,第几周,第几季度,每月的第几天, 每季度的第几天,是否每月的第一天,是否每月的最后一天,是否每季度的第一天,是否每季度的最后一天,是否每年的第一天,是否每年的最后一天
def addtimecharacteristics(df,dataset):
"""
为输入的 DataFrame 添加日期相关信息列
参数:
df (pandas.DataFrame): 包含日期列 'ds' 的 DataFrame
返回:
pandas.DataFrame: 添加了相关列的 DataFrame
"""
df['year'] = df['ds'].dt.year
df['month'] = df['ds'].dt.month
df['day'] = df['ds'].dt.day
df['dayofweek'] = df['ds'].dt.dayofweek
df['weekofyear'] = df['ds'].dt.isocalendar().week
df['dayofyear'] = df['ds'].dt.dayofyear
df['quarternum'] = df['ds'].dt.quarter
# 将ds列转换为季度Period对象
df['quarter'] = df['ds'].dt.to_period('Q')
# 获取每个季度的开始日期
df['quarter_start'] = df['quarter'].dt.to_timestamp('s')
# 计算每个日期是所在季度的第几天
df['dayofquarter'] = (df['ds'] - df['quarter_start']).dt.days + 1
# 是否月初
df['is_month_start'] = df['ds'].dt.is_month_start.astype(int)
# 是否月末
df['is_month_end'] = df['ds'].dt.is_month_end.astype(int)
# 是否季度初
df['is_quarter_start'] = df['ds'].dt.is_quarter_start.astype(int)
# 是否季度末
df['is_quarter_end'] = df['ds'].dt.is_quarter_end.astype(int)
# 是否年初
df['is_year_start'] = df['ds'].dt.is_year_start.astype(int)
# 是否年末
df['is_year_end'] = df['ds'].dt.is_year_end.astype(int)
# 去掉 quarter_start quarter
df.drop(columns=['quarter_start','quarter'],inplace=True)
df.to_csv(os.path.join(dataset,'指标数据添加时间特征.csv'), index=False)
return df