# Import modules
import pandas as pd
import numpy as np
import datetime
import string
import base64
import requests
import random
import time
import re
import os
import hmac
import hashlib
import json
import math

import torch
torch.set_float32_matmul_precision("high")

import matplotlib.pyplot as plt
# Configure matplotlib for Chinese text
plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese labels correctly
plt.rcParams['axes.unicode_minus'] = False    # render minus signs correctly

from datetime import timedelta
from sklearn import metrics
from reportlab.pdfbase import pdfmetrics                                    # font registration
from reportlab.pdfbase.ttfonts import TTFont                                # TrueType font class
from reportlab.platypus import Table, SimpleDocTemplate, Paragraph, Image   # report content classes
from reportlab.lib.pagesizes import letter                                  # page size (8.5*inch, 11*inch)
from reportlab.lib.styles import getSampleStyleSheet                        # text styles
from reportlab.lib import colors                                            # color module
from reportlab.graphics.charts.barcharts import VerticalBarChart            # chart class
from reportlab.graphics.charts.legends import Legend                        # legend class
from reportlab.graphics.shapes import Drawing                               # drawing canvas
from reportlab.lib.units import cm                                          # unit: cm

# Register the font (prepare the font file in advance; register several fonts
# if the same document needs more than one)
pdfmetrics.registerFont(TTFont('SimSun', 'SimSun.ttf'))

# from config_jingbo import *
from config_juxiting import *


# Function definitions
def loadcsv(filename):
    # Read a CSV file, falling back to GBK when UTF-8 decoding fails
    try:
        df = pd.read_csv(filename, encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(filename, encoding='gbk')
    return df


def dateConvert(df, datecol='ds'):
    # Convert the date column to datetime, trying YYYY-MM-DD first and
    # falling back to YYYY/MM/DD
    try:
        df[datecol] = pd.to_datetime(df[datecol], format=r'%Y-%m-%d')
    except ValueError:
        df[datecol] = pd.to_datetime(df[datecol], format=r'%Y/%m/%d')
    return df

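# A minimal usage sketch for the two loaders above. The file name
# 'sample.csv' and its columns are hypothetical; illustrative only,
# never called by the pipeline.
def _demo_load_and_convert():
    df = loadcsv('sample.csv')          # GBK fallback happens automatically
    df = dateConvert(df, datecol='ds')  # 'ds' parsed to datetime64
    print(df.dtypes)
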
def calculate_kdj(data, n=9):
    '''
    Add the columns volatility, high, low, K, D and J to the incoming df.
    NOTE: this function is redefined later in this module; the later
    definition (which saves under dataset/) is the one in effect at runtime.
    '''
    data = data.sort_values(by='ds', ascending=True)
    # Without true high/low prices, use the daily percentage change to
    # simulate the day's high and low
    data['pctchange'] = data['y'].pct_change()
    # Replace zero returns with 0.01
    data['pctchange'] = data['pctchange'].replace(0, 0.01)
    data.dropna(inplace=True)
    # Reset the index
    data.reset_index(drop=True, inplace=True)
    data['high'] = data['y'] * (1 + abs(data['pctchange']) / 2)
    data['low'] = data['y'] * (1 - abs(data['pctchange']) / 2)
    low_list = data['y'].rolling(window=n, min_periods=1).min()
    high_list = data['y'].rolling(window=n, min_periods=1).max()
    rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100
    k = pd.Series(50, index=data.index)
    d = pd.Series(50, index=data.index)
    for i in range(1, len(data)):
        k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i])
        d[i] = (2/3 * d[i - 1]) + (1/3 * k[i])
    j = 3 * k - 2 * d

    data['K'] = k
    data['D'] = d
    data['J'] = j
    # Save the data with the KDJ indicator to a new CSV file
    data.to_csv('stock_data_with_kdj.csv', index=False)
    # data = data.dropna()
    return data

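# A minimal sketch of calculate_kdj on synthetic data, assuming only the
# 'ds'/'y' columns the function requires. Note the CSV side effect of the
# real function. Illustrative only, never called by the pipeline.
def _demo_kdj():
    demo = pd.DataFrame({
        'ds': pd.date_range('2024-01-01', periods=30, freq='D'),
        'y': np.linspace(100, 110, 30) + np.random.rand(30),
    })
    out = calculate_kdj(demo)
    print(out[['ds', 'y', 'K', 'D', 'J']].tail())
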
# Report upload
def get_head_auth_report():
    login_res = requests.post(url=login_pushreport_url, json=login_data, timeout=(3, 5))
    text = json.loads(login_res.text)
    if text["status"]:
        token = text["data"]["accessToken"]
        return token

def upload_report_data(token, upload_data):
    headers = {"Authorization": token}
    logger.info("报告上传中...")
    logger.info(f"token:{token}")
    logger.info(f"upload_data:{upload_data}")
    upload_res = requests.post(url=upload_url, headers=headers, json=upload_data, timeout=(3, 15))
    upload_res = json.loads(upload_res.text)
    logger.info(upload_res)
    if upload_res:
        return upload_res
    else:
        logger.info("报告上传失败")
        return None

def upload_warning_data(warning_data):
    token = get_head_auth_report()
    headers = {"Authorization": token}
    logger.info("预警上传中...")
    logger.info(f"token:{token}")
    logger.info(f"warning_data:{warning_data}")
    upload_res = requests.post(url=upload_warning_url, headers=headers, json=warning_data, timeout=(3, 15))
    if upload_res:
        return upload_res
    else:
        logger.info("预警上传失败")
        return None

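# A minimal sketch of the report-upload flow above. login_pushreport_url,
# login_data and upload_url come from the imported config module; the payload
# below is hypothetical. Illustrative only -- calling this would post to the
# real endpoints.
def _demo_upload_flow():
    token = get_head_auth_report()
    if token:
        demo_payload = {'data': {'CONTENT': '示例报告内容'}}  # hypothetical payload
        upload_report_data(token, demo_payload)
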
def upload_warning_info(last_update_times_df, y_last_update_time):
    logger.info(f'上传预警信息')
    try:
        warning_data_df = last_update_times_df[last_update_times_df['warning_date'] < y_last_update_time][['stop_update_period', 'warning_date', 'last_update_time', 'update_period', 'feature']]
        warning_data_df.columns = ['停更周期', '预警日期', '最后更新时间', '更新周期', '特征名称']
        if len(warning_data_df) > 0:
            content = '原油特征指标预警信息:\n\n'
            warning_data_df = warning_data_df.sort_values(by='停更周期', ascending=False)
            fixed_length = 20
            warning_data_df['特征名称'] = warning_data_df['特征名称'].str.replace(" ", "")
            content += warning_data_df.to_string(index=False, col_space=fixed_length)

        else:
            logger.info(f'没有需要上传的预警信息')
            content = '没有需要维护的特征指标'
        warning_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        warning_data['data']['WARNING_DATE'] = warning_date
        warning_data['data']['WARNING_CONTENT'] = content

        upload_warning_data(warning_data)
        logger.info(f'上传预警信息成功')
    except Exception as e:
        logger.error(f'上传预警信息失败:{e}')

def create_feature_last_update_time(df):
    """
    Compute stop-update (stale-feature) information.

    Parameters:
        df (DataFrame): DataFrame containing the feature data

    Returns:
        DataFrame: stop-update information per feature
        str: the last update time of the y column
    """
    df1 = df.copy()
    # Find the last update time of each column
    df1.set_index('ds', inplace=True)
    last_update_times = df1.apply(lambda x: x.dropna().index.max().strftime('%Y-%m-%d') if not x.dropna().empty else None)

    # Collect each column's last update time into a frame to save to file
    last_update_times_df = pd.DataFrame(columns=['feature', 'last_update_time', 'is_value', 'update_period', 'warning_date', 'stop_update_period'])

    # Log each column's last update time
    for column, last_update_time in last_update_times.items():
        values = []
        # Check whether the column is constant over its last 20 values
        if df1[column].tail(20).nunique() == 1:
            values = values + [column, last_update_time, 1]
        else:
            values = values + [column, last_update_time, 0]
        # Compute the typical spacing between observations
        try:
            # Compute the warning date
            time_diff = (df1[column].dropna().index.to_series().diff().mode()[0]).total_seconds() / 3600 / 24
            last_update_time_datetime = datetime.datetime.strptime(last_update_time, '%Y-%m-%d')
            last_update_date = end_time if end_time != '' else datetime.datetime.now().strftime('%Y-%m-%d')
            end_time_datetime = datetime.datetime.strptime(last_update_date, '%Y-%m-%d')
            early_warning_date = last_update_time_datetime + timedelta(days=time_diff) * 2 + timedelta(days=1)
            stop_update_period = int(math.ceil((end_time_datetime - last_update_time_datetime).days / time_diff))
            early_warning_date = early_warning_date.strftime('%Y-%m-%d')
        except KeyError:
            time_diff = 0
            early_warning_date = end_time
            continue

        values = values + [time_diff, early_warning_date, stop_update_period]
        last_update_times_df.loc[len(last_update_times_df)] = values

        logger.info(f"Column {column} was last updated at {last_update_time}")
    y_last_update_time = last_update_times_df[last_update_times_df['feature'] == 'y']['warning_date'].values[0]
    last_update_times_df.to_csv(os.path.join(dataset, 'last_update_times.csv'), index=False)
    logger.info('特征停更信息保存到文件:last_update_times.csv')
    return last_update_times_df, y_last_update_time

# Tally feature frequencies
def featurePindu(dataset):
    # Read the file
    df = loadcsv(os.path.join(dataset, '未填充的特征数据.csv'))
    df['ds'] = pd.to_datetime(df['ds'])
    # Sort ascending by ds and reset the index
    df = df.sort_values(by='ds', ascending=True).reset_index(drop=True)

    # Tally feature frequencies: sample rows from each column, compute the
    # day gaps between consecutive observations, and keep the modal gap
    columns = df.columns.to_list()
    columns.remove('ds')
    count_dict = {}
    for column in columns:
        # Get this column's observation dates
        values = df[[column, 'ds']].dropna(axis=0).reset_index(drop=True)

        # Sample 20% of the rows
        value = values.sample(frac=0.2)
        index = value.index
        next_index = index + 1
        count = []
        for i, j in zip(index, next_index):
            # Day difference between consecutive observations
            try:
                count.append((values.loc[j, 'ds'] - values.loc[i, 'ds']).days)
            except KeyError:
                # j can run one past the last row
                pass
        # Treat a 31-day gap as 30 days
        count = [30 if i == 31 else i for i in count]
        # Keep the most frequent gap
        try:
            count = max(set(count), key=count.count)
        except ValueError:
            logger.info(f'{column}列数据为空')
            continue
        # Store it in the dict
        count_dict[column] = count

    df = pd.DataFrame(count_dict, index=['count']).T
    pindu_dfs = pd.DataFrame()
    # Group by gap and emit the frequency summary
    pindudict = {'1': '日度', '3': '日度', '7': '周度', '30': '月度', '90': '季度', '180': '半年度', '365': '年度'}
    for i in df.groupby('count'):
        # Feature names belonging to this gap group
        index = i[1].index
        pindu_df = pd.DataFrame()
        try:
            pindu_df[pindudict[str(i[0])] + f'({len(i[1])})'] = index
        except KeyError:
            pindu_df[str(i[0]) + f'天({len(i[1])})'] = index
        # Merge into pindu_dfs
        pindu_dfs = pd.concat([pindu_dfs, pindu_df], axis=1)
    # Replace NaN with ''
    pindu_dfs = pindu_dfs.fillna('')
    pindu_dfs.to_csv(os.path.join(dataset, '特征频度统计.csv'), index=False)
    logger.info(pindu_dfs)
    featureInfo = f'特征信息:总共有{len(columns)-2}个'
    for i in pindu_dfs.columns:
        featureInfo += f',{i}'

    featureInfo += ', 详看 附1、特征列表'

    featureInfo += '''
数据特征工程:
1. 数据日期排序,新日期在最后
2. 删除空列,特征数据列没有值,就删除
3. 删除近两月不再更新值的指标
4. 非日度数据填充为日度数据,填充规则:
-- 向后填充,举例:假设周五出现一个周度指标数据,那么在这之前的数据用上周五的数据
-- 向前填充,举例:采集数据开始日期为2018年1月1日,那么周度数据可能是2018年1月3日,那么3日的数据向前填充,使1日2日都有数值
数据特征相关性分析:
'''
    logger.info(featureInfo)
    with open(os.path.join(dataset, '特征频度统计.txt'), 'w', encoding='utf-8') as f:
        f.write(featureInfo)
    logger.info('*' * 200)

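# A small sketch of the core idea in featurePindu: the modal day-gap between
# consecutive observations identifies a column's frequency. Synthetic data,
# illustrative only.
def _demo_modal_gap():
    ds = pd.Series(pd.date_range('2024-01-01', periods=10, freq='7D'))
    gaps = ds.diff().dt.days.dropna().tolist()
    modal_gap = max(set(gaps), key=gaps.count)
    print(modal_gap)  # 7 -> weekly ('周度')
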
def featureAnalysis(df, dataset, y):
    # Feature screening
    # Split features and label
    X = df.drop(['ds', 'y'], axis=1)  # feature set, excluding timestamp and label
    yy = df['y']                      # label

    # Autocorrelation of the label
    from statsmodels.graphics.tsaplots import plot_acf
    plot_acf(yy, lags=30)
    plt.savefig(os.path.join(dataset, '指标数据自相关图.png'))
    plt.close()

    # Partial autocorrelation of the label
    from statsmodels.graphics.tsaplots import plot_pacf
    plot_pacf(yy, lags=30)
    plt.savefig(os.path.join(dataset, '指标数据偏自相关图.png'))
    plt.close()

    # Scatter plots of features against price
    # Remove any existing *散点图.png files first
    for file in os.listdir(dataset):
        if file.endswith("散点图.png"):
            os.remove(os.path.join(dataset, file))
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    plt.figure(figsize=(10, 10))
    # # Plot each feature column against yy, four panels per figure
    # for i, col in enumerate(X.columns):
    #     plt.subplot(2, 2, i%4+1)
    #     plt.scatter(X[col], yy)
    #     plt.xlabel(col)
    #     plt.ylabel(y)
    #     plt.title(col)
    #     if i % 4 == 3 or i == len(X.columns)-1:
    #         plt.tight_layout()
    #         plt.savefig(os.path.join(dataset,f'{i}指标数据特征与价格散点图.png'))
    #         plt.close()

def corr_feature(df):
    # Put y first in the column order
    df = df.reindex(columns=['y'] + sorted(df.columns.difference(['y'])))
    df_test = df.copy()
    # Keep the last 220 rows
    df_test = df_test.tail(220)
    # Drop the date column
    df_test = df_test.drop(columns=['ds'])
    # Copies that are not standardized
    df_test_noscaler = df_test.copy()  # backup for lag processing
    df_noscaler = df_test.copy()
    # Synchronous correlation
    df_test.to_csv(os.path.join(dataset, '同步相关性.csv'))
    corr = df_test.corr()
    # Save the correlation coefficients
    corr.to_csv(os.path.join(dataset, '同步相关性系数.csv'))
    # plt.figure(figsize=(10, 10))
    # sns.heatmap(corr, annot=True, cmap='coolwarm')
    # plt.savefig('dataset/同步相关性热力图.png')
    # plt.show()

    # Read the lag-period file and shift the features accordingly
    characteristic_period = pd.read_csv('dataset/特征滞后周期.csv', encoding='utf-8')
    # Drop rows whose lag period is 0
    characteristic_period = characteristic_period.drop(characteristic_period[characteristic_period['滞后周期'] == 0].index)
    for col in df.columns:
        # Skip the y column
        if col in ['y']:
            continue
        # Lag the feature by its recorded period
        if col in characteristic_period['特征'].values:
            # Look up this feature's lag period
            period = characteristic_period[characteristic_period['特征'] == col]['滞后周期'].values[0]
            # Apply the lag
            df[col] = df[col].shift(period)
    df.to_csv(os.path.join(dataset, '滞后处理后的数据集.csv'))

    # corr_feture_noscaler = {}  # lag with the highest correlation per feature
    # Iterate over each column of df_noscaler and compute lagged correlations
    # for col in df_noscaler.columns:
    #     # Skip the y column
    #     if col in ['y']:
    #         continue
    #     logger.info('特征:', col)
    #     # Lag the feature by n periods and compute its correlation with y
    #     corr_dict = {}
    #     try:
    #         for i in range(0, 200):
    #             if i == 0:
    #                 df_noscaler[col+'_'+str(i)] = df_noscaler[col]
    #             else:
    #                 df_noscaler[col+'_'+str(i)] = df_noscaler[col].shift(i)
    #             corr_dict[col+'_'+str(i)] = abs(df_noscaler[col+'_'+str(i)].corr(df_noscaler['y']))
    #     except:
    #         logger.info('特征:', col, '滑动错误,请查看')
    #         continue
    #     # Log the lag with the highest correlation
    #     logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)])
    #     corr_feture_noscaler[col] = max(corr_dict, key=corr_dict.get).split('_')[-1]
    #     # Plot the most correlated lagged feature against y
    #     plt.figure(figsize=(10, 5))
    #     plt.plot(df_noscaler[max(corr_dict, key=corr_dict.get)], label=max(corr_dict, key=corr_dict.get))
    #     # Twin y axes
    #     ax1 = plt.gca()
    #     ax2 = ax1.twinx()
    #     ax2.plot(df_noscaler['y'], color='r', label='y')
    #     plt.legend()
    #     try:
    #         plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get)+'.png')
    #     except:
    #         # replace characters that are illegal in file names with _
    #         plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get).replace(':','_').replace('/','_').replace('(','_').replace(')','_')+'.png')
    #     plt.close()
    # # Write the results to a txt file
    # logger.info('不参与标准化的特征滞后相关性写入txt文件')
    # with open('dataset/不参与标准化的特征滞后相关性.txt', 'w') as f:
    #     for key, value in corr_feture_noscaler.items():
    #         f.write('%s:%s\n' % (key, value))
    # # Apply the lags recorded in corr_feture_noscaler to the frame
    # colnames_noscaler = []
    # for col in corr_feture_noscaler:
    #     colname = col+'_'+corr_feture_noscaler[col]
    #     if int(corr_feture_noscaler[col]) == 0:
    #         continue
    #     df_test_noscaler[colname] = df_test_noscaler[col].shift(int(corr_feture_noscaler[col]))
    #     df_test_noscaler = df_test_noscaler.drop(columns=[col])
    #     colnames_noscaler.append(colname)
    # # Drop rows with NaN
    # df_test_noscaler = df_test_noscaler.dropna()
    # df_test_noscaler.reindex(['y'] + sorted(df_test_noscaler.columns.difference(['y'])))
    # df_test_noscaler.to_csv('dataset/不参与标准化的特征滞后相关性.csv', index=False)
    # # Correlation heat map
    # corr = df_test_noscaler.corr()
    # # Save the correlation coefficients
    # corr.to_csv(os.path.join(dataset,'不参与标准化的特征滞后相关性系数.csv'))
    # plt.figure(figsize=(10, 10))
    # sns.heatmap(corr, annot=True, cmap='coolwarm')
    # plt.savefig('dataset/不参与标准化的特征滞后相关性热力图.png')
    # plt.close()

    # # Standardize every column
    # from sklearn.preprocessing import StandardScaler
    # scaler = StandardScaler()
    # df_test = pd.DataFrame(scaler.fit_transform(df_test), columns=df_test.columns)
    # corr_feture = {}  # lag with the highest correlation per feature
    # # Iterate over each column of df_test and compute lagged correlations
    # for col in df_test.columns:
    #     # Skip the y column
    #     if col == 'y':
    #         continue
    #     logger.info('特征:', col)
    #     # Lag the feature by n periods and compute its correlation with y
    #     corr_dict = {}
    #     try:
    #         for i in range(0, 200):
    #             if i == 0:
    #                 df_test[col+'_'+str(i)] = df_test[col]
    #             else:
    #                 df_test[col+'_'+str(i)] = df_test[col].shift(i)
    #             corr_dict[col+'_'+str(i)] = abs(df_test[col+'_'+str(i)].corr(df_test['y']))
    #     except:
    #         logger.info('特征:', col, '滑动错误,请查看')
    #         continue
    #     # Log the lag with the highest correlation
    #     logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)])
    #     corr_feture[col] = max(corr_dict, key=corr_dict.get).split('_')[-1]

    # # Write the results to a txt file
    # with open('dataset/标准化的特征滞后相关性.txt', 'w') as f:
    #     for key, value in corr_feture.items():
    #         f.write('%s:%s\n' % (key, value))
    # # Apply the lags recorded in corr_feture to df
    # colnames = []
    # for col in corr_feture:
    #     colname = col+'_'+corr_feture[col]
    #     if int(corr_feture[col]) == 0:
    #         continue
    #     df[colname] = df[col].shift(int(corr_feture[col]))
    #     df = df.drop(columns=[col])
    #     colnames.append(colname)
    # # Drop rows with NaN
    # df = df.dropna()
    # df.reindex(['y'] + sorted(df.columns.difference(['y'])))
    # df.to_csv('dataset/标准化后的特征滞后相关性.csv', index=False)
    # # Correlation heat map
    # ds = df['ds']
    # df = df.drop(columns=['ds'])
    # corr = df.corr()
    # # Save the correlation coefficients
    # corr.to_csv(os.path.join(dataset,'标准化后的特征滞后相关性系数.csv'))
    # plt.figure(figsize=(10, 10))
    # sns.heatmap(corr, annot=True, cmap='coolwarm')
    # plt.savefig('dataset/标准化后的特征滞后相关性热力图.png')
    # plt.show()
    # df['ds'] = ds

    # Drop rows containing NaN
    df = df.dropna()
    return df

def calculate_kdj(data, n=9):
    '''
    Add the columns volatility, high, low, K, D and J to the incoming df.
    NOTE: this redefinition shadows the earlier calculate_kdj; the only
    difference is that this version saves under dataset/.
    '''
    data = data.sort_values(by='ds', ascending=True)
    # Without true high/low prices, use the daily percentage change to
    # simulate the day's high and low
    data['pctchange'] = data['y'].pct_change()
    # Replace zero returns with 0.01
    data['pctchange'] = data['pctchange'].replace(0, 0.01)
    data.dropna(inplace=True)
    # Reset the index
    data.reset_index(drop=True, inplace=True)
    data['high'] = data['y'] * (1 + abs(data['pctchange']) / 2)
    data['low'] = data['y'] * (1 - abs(data['pctchange']) / 2)
    low_list = data['y'].rolling(window=n, min_periods=1).min()
    high_list = data['y'].rolling(window=n, min_periods=1).max()
    rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100
    k = pd.Series(50, index=data.index)
    d = pd.Series(50, index=data.index)
    for i in range(1, len(data)):
        k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i])
        d[i] = (2/3 * d[i - 1]) + (1/3 * k[i])
    j = 3 * k - 2 * d

    data['K'] = k
    data['D'] = d
    data['J'] = j
    # Save the data with the KDJ indicator to a new CSV file
    data.to_csv(os.path.join('dataset', 'stock_data_with_kdj.csv'), index=False)
    # data = data.dropna()
    return data

def check_column(df, col_name, two_months_ago):
    '''
    Decide whether a column should be dropped.

    A column is dropped when it is all-NaN, has not been updated for
    180 days, or is constant over the recent window.

    Parameters:
        col_name (str): column name.
        df (DataFrame): DataFrame containing the column.

    Returns:
        bool: True if the column should be dropped, otherwise False.
    '''
    if 'ds' in col_name or 'y' in col_name:
        return False
    df_check_column = df[['ds', col_name]]
    df_check_column = df_check_column.dropna()

    if len(df_check_column) == 0:
        print(f'空值列:{col_name}')
        return True
    # Check for a constant column over the recent window
    if df_check_column[(df_check_column['ds'] >= two_months_ago)].groupby(col_name).ngroups < 2:
        print(f'180天没有更新:{col_name}')
        return True
    corresponding_date = df_check_column.iloc[-1]['ds']
    return corresponding_date < two_months_ago

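# A toy example for check_column on synthetic data: 'stale' stops updating
# long before the cutoff and should be flagged. Illustrative only.
def _demo_check_column():
    frame = pd.DataFrame({
        'ds': pd.date_range('2023-01-01', periods=400, freq='D'),
        'y': range(400),
        'stale': [1.0] * 100 + [None] * 300,
    })
    cutoff = frame['ds'].max() - timedelta(days=180)
    print(check_column(frame, 'stale', cutoff))  # True -> drop it
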
def datachuli(df_zhibiaoshuju, df_zhibiaoliebiao, datecol='date', end_time='', y='y', dataset='dataset', delweekenday=False, add_kdj=False, is_timefurture=False):
    '''
    Feature-data preprocessing for crude oil.
    Takes two DataFrames -- the indicator data and the indicator list --
    and returns a single DataFrame with ds, y and the feature columns.
    '''
    df = df_zhibiaoshuju.copy()

    if end_time == '':
        end_time = datetime.datetime.now().strftime('%Y-%m-%d')
    # Rename the time and target columns
    df.rename(columns={datecol: 'ds'}, inplace=True)
    df.rename(columns={y: 'y'}, inplace=True)
    # Sort chronologically
    df.sort_values(by='ds', inplace=True)
    df['ds'] = pd.to_datetime(df['ds'])
    # Keep data from start_year up to end_time
    df = df[df['ds'].dt.year >= start_year]
    df = df[df['ds'] <= end_time]
    last_update_times_df, y_last_update_time = create_feature_last_update_time(df)
    logger.info(f'删除预警的特征前数据量:{df.shape}')
    columns_to_drop = last_update_times_df[last_update_times_df['warning_date'] < y_last_update_time]['feature'].values.tolist()
    df = df.drop(columns=columns_to_drop)
    logger.info(f'删除预警的特征后数据量:{df.shape}')
    if is_update_warning_data:
        upload_warning_info(last_update_times_df, y_last_update_time)
    # Drop columns whose latest data is older than 180 days, and columns
    # whose recent data is constant
    current_date = datetime.datetime.now()
    two_months_ago = current_date - timedelta(days=180)
    logger.info(f'删除两月不更新特征前数据量:{df.shape}')
    columns_to_drop = []
    for col in df.columns:
        if check_column(df, col, two_months_ago):
            columns_to_drop.append(col)
    df = df.drop(columns=columns_to_drop)

    logger.info(f'删除两月不更新特征后数据量:{df.shape}')

    if freq == 'W':
        # Resample weekly
        df = df.resample('W', on='ds').mean().reset_index()
    elif freq == 'M':
        # Resample monthly
        df = df.resample('M', on='ds').mean().reset_index()
    # Drop rows where the target is NaN
    df = df.dropna(subset=['y'])
    logger.info(f'删除预测列为空值的行后数据量:{df.shape}')
    df = df.dropna(axis=1, how='all')
    logger.info(f'删除全为空值的列后数据量:{df.shape}')
    df.to_csv(os.path.join(dataset, '未填充的特征数据.csv'), index=False)
    # Drop the rows of the indicator list that refer to dropped columns
    df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())]
    df_zhibiaoliebiao.to_csv(os.path.join(dataset, '特征处理后的指标名称及分类.csv'), index=False)
    # Frequency analysis
    featurePindu(dataset=dataset)
    # Forward fill
    df = df.ffill()
    # Backward fill
    df = df.bfill()

    # Drop weekend rows
    if delweekenday:
        df = df[df['ds'].dt.weekday < 5]

    # KDJ indicator
    if add_kdj:
        df = calculate_kdj(df)
    # Derived time features
    if is_timefurture:
        df = addtimecharacteristics(df=df, dataset=dataset)
    # Feature analysis
    featureAnalysis(df, dataset=dataset, y=y)
    return df

def datachuli_juxiting(df_zhibiaoshuju, df_zhibiaoliebiao, datecol='date', end_time='', y='y', dataset='dataset', delweekenday=False, add_kdj=False, is_timefurture=False):
    '''
    Feature-data preprocessing for polyolefins.
    Takes two DataFrames -- the indicator data and the indicator list --
    and returns a single DataFrame with ds, y and the feature columns.
    '''
    df = df_zhibiaoshuju.copy()
    if end_time == '':
        end_time = datetime.datetime.now().strftime('%Y-%m-%d')
    # Rename the date column
    df.rename(columns={datecol: 'ds'}, inplace=True)

    # Subtract a fixed offset from the designated columns
    df[offsite_col] = df[offsite_col] - offsite
    # The target is the mean of avg_cols
    df[y] = df[avg_cols].mean(axis=1)
    # Drop the now-redundant avg_cols
    df = df.drop(columns=avg_cols)

    # Rename the target column
    df.rename(columns={y: 'y'}, inplace=True)
    # Sort chronologically
    df.sort_values(by='ds', inplace=True)
    df['ds'] = pd.to_datetime(df['ds'])
    # Keep data from 2018 onwards
    df = df[df['ds'].dt.year >= 2018]
    # Keep data up to end_time
    df = df[df['ds'] <= end_time]
    logger.info(f'删除两月不更新特征前数据量:{df.shape}')
    # Drop columns whose latest data is older than the cutoff, and columns
    # whose recent data is constant
    current_date = datetime.datetime.now()
    two_months_ago = current_date - timedelta(days=40)

    # Local variant of check_column (closes over df and two_months_ago,
    # shadowing the module-level helper)
    def check_column(col_name):
        if 'ds' in col_name or 'y' in col_name:
            return False
        df_check_column = df[['ds', col_name]]
        df_check_column = df_check_column.dropna()
        if len(df_check_column) == 0:
            return True
        if df_check_column[(df_check_column['ds'] >= two_months_ago)].groupby(col_name).ngroups < 2:
            return True
        corresponding_date = df_check_column.iloc[-1]['ds']
        return corresponding_date < two_months_ago
    columns_to_drop = df.columns[df.columns.map(check_column)].tolist()
    df = df.drop(columns=columns_to_drop)

    logger.info(f'删除两月不更新特征后数据量:{df.shape}')

    # Drop rows where the target is NaN
    df = df.dropna(subset=['y'])
    logger.info(f'删除预测列为空值的行后数据量:{df.shape}')
    df = df.dropna(axis=1, how='all')
    logger.info(f'删除全为空值的列后数据量:{df.shape}')
    df.to_csv(os.path.join(dataset, '未填充的特征数据.csv'), index=False)
    # Drop the rows of the indicator list that refer to dropped columns
    df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())]
    df_zhibiaoliebiao.to_csv(os.path.join(dataset, '特征处理后的指标名称及分类.csv'), index=False)
    # Frequency analysis
    featurePindu(dataset=dataset)
    # Forward fill
    df = df.ffill()
    # Backward fill
    df = df.bfill()

    # Drop weekend rows
    if delweekenday:
        df = df[df['ds'].dt.weekday < 5]

    if add_kdj:
        df = calculate_kdj(df)

    if is_timefurture:
        df = addtimecharacteristics(df=df, dataset=dataset)

    featureAnalysis(df, dataset=dataset, y=y)
    return df

def getdata(filename, datecol='date', y='y', dataset='', add_kdj=False, is_timefurture=False, end_time=''):
    logger.info('getdata接收:' + filename + ' ' + datecol + ' ' + end_time)
    # Dispatch on the file extension: csv or excel
    if filename.endswith('.csv'):
        # NOTE: this branch does not set df_zhibiaoshuju/df_zhibiaoliebiao,
        # so only Excel input currently reaches datachuli() successfully
        df = loadcsv(filename)
    else:
        # Read the indicator data and indicator list sheets from Excel
        df_zhibiaoshuju = pd.read_excel(filename, sheet_name='指标数据')
        df_zhibiaoliebiao = pd.read_excel(filename, sheet_name='指标列表')

    # Convert date strings to datetime and run the full preprocessing
    df = datachuli(df_zhibiaoshuju, df_zhibiaoliebiao, datecol, y=y, dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture, end_time=end_time)

    return df

def getdata_juxiting(filename, datecol='date', y='y', dataset='', add_kdj=False, is_timefurture=False, end_time=''):
    logger.info('getdata接收:' + filename + ' ' + datecol + ' ' + end_time)
    # Dispatch on the file extension: csv or excel
    if filename.endswith('.csv'):
        # NOTE: as in getdata(), only Excel input currently reaches
        # datachuli_juxiting() successfully
        df = loadcsv(filename)
    else:
        # Read the indicator data and indicator list sheets from Excel
        df_zhibiaoshuju = pd.read_excel(filename, sheet_name='指标数据')
        df_zhibiaoliebiao = pd.read_excel(filename, sheet_name='指标列表')

    # Convert date strings to datetime and run the full preprocessing
    df = datachuli_juxiting(df_zhibiaoshuju, df_zhibiaoliebiao, datecol, y=y, dataset=dataset, add_kdj=add_kdj, is_timefurture=is_timefurture, end_time=end_time)

    return df

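# A minimal end-to-end sketch: take the Excel workbook produced by the ETA
# download step and run the preprocessing pipeline. The file name below is
# hypothetical, and the config module must provide offsite_col, avg_cols,
# freq, etc. Illustrative only.
def _demo_getdata():
    df = getdata_juxiting('聚烯烃指标数据.xlsx', datecol='date', y='y',
                          dataset='dataset', add_kdj=True, is_timefurture=True)
    print(df.shape)
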
def sanitize_filename(filename):
    # Replace characters that are illegal in file names with underscores
    # (any other replacement character would work as well)
    sanitized = re.sub(r'[\\/*?:"<>|\s]', '_', filename)
    # Strip a leading dot (dot-files are hidden on some systems)
    sanitized = re.sub(r'^\.', '', sanitized)
    # Add further replacement rules here if needed
    return sanitized

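# Example behaviour of sanitize_filename (illustrative):
#   sanitize_filename('Brent c1/c2: 价差.png')  ->  'Brent_c1_c2__价差.png'
#   sanitize_filename('.hidden file')           ->  'hidden_file'
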
class BinanceAPI:
    '''
    Build the signed request headers for the API.
    '''
    def __init__(self, APPID, SECRET):
        self.APPID = APPID
        self.SECRET = SECRET
        self.get_signature()

    # Generate a random string to use as the nonce
    def generate_nonce(self, length=32):
        self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
        return self.nonce

    # Current timestamp in seconds
    def get_timestamp(self):
        return int(time.time())

    # Build the string to sign
    def build_sign_str(self):
        return f'appid={self.APPID}&nonce={self.nonce}&timestamp={self.timestamp}'

    # Compute the signature with HMAC SHA-256
    def calculate_signature(self, secret, message):
        return base64.urlsafe_b64encode(hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8')

    def get_signature(self):
        # Run the steps above to produce the signature
        self.nonce = self.generate_nonce()
        self.timestamp = self.get_timestamp()
        self.sign_str = self.build_sign_str()
        self.signature = self.calculate_signature(self.SECRET, self.sign_str)
        # return self.signature

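# A minimal usage sketch for BinanceAPI, mirroring how EtaReader consumes it
# below. The APPID/SECRET values are placeholders. Illustrative only.
def _demo_signature():
    signer = BinanceAPI('my-appid', 'my-secret')  # hypothetical credentials
    headers = {
        'nonce': signer.nonce,
        'timestamp': str(signer.timestamp),
        'appid': signer.APPID,
        'signature': signer.signature,
    }
    print(headers)
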
class Graphs:
    # Draw the report title
    @staticmethod
    def draw_title(title: str):
        # Get the sample style sheet
        style = getSampleStyleSheet()
        # Take the heading style
        ct = style['Heading1']
        # Customize its attributes
        ct.fontName = 'SimSun'       # font name
        ct.fontSize = 18             # font size
        ct.leading = 50              # line spacing
        ct.textColor = colors.green  # font color
        ct.alignment = 1             # centered
        ct.bold = True
        # Build and return the title paragraph
        return Paragraph(title, ct)

    # Draw a subtitle
    @staticmethod
    def draw_little_title(title: str):
        # Get the sample style sheet
        style = getSampleStyleSheet()
        # Take the normal style
        ct = style['Normal']
        # Customize its attributes
        ct.fontName = 'SimSun'     # font name
        ct.fontSize = 15           # font size
        ct.leading = 30            # line spacing
        ct.textColor = colors.red  # font color
        # Build and return the subtitle paragraph
        return Paragraph(title, ct)

    # Draw an ordinary paragraph
    @staticmethod
    def draw_text(text: str):
        # Get the sample style sheet
        style = getSampleStyleSheet()
        # Take the normal style
        ct = style['Normal']
        ct.fontName = 'SimSun'
        ct.fontSize = 12
        ct.wordWrap = 'CJK'      # wrap CJK text automatically
        ct.alignment = 0         # left aligned
        ct.firstLineIndent = 32  # first-line indent
        ct.leading = 25
        return Paragraph(text, ct)

    # Draw a table
    @staticmethod
    def draw_table(*args):
        # Column widths
        col_width = args[0]
        style = [
            ('FONTNAME', (0, 0), (-1, -1), 'SimSun'),    # font
            ('FONTSIZE', (0, 0), (-1, 0), 12),           # header row font size
            ('FONTSIZE', (0, 1), (-1, -1), 10),          # body rows font size
            ('BACKGROUND', (0, 0), (-1, 0), '#d5dae6'),  # header background
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),       # header centered
            ('ALIGN', (0, 1), (-1, -1), 'LEFT'),         # body rows left aligned
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),      # vertical centering everywhere
            ('TEXTCOLOR', (0, 0), (-1, -1), colors.darkslategray),  # text color
            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),           # grey grid lines, 0.5 wide
            # ('SPAN', (0, 1), (0, 2)),  # merge rows 2-3 of the first column
            # ('SPAN', (0, 3), (0, 4)),  # merge rows 4-5 of the first column
            # ('SPAN', (0, 5), (0, 6)),  # merge rows 6-7 of the first column
            # ('SPAN', (0, 7), (0, 8)),  # merge rows 8-9 of the first column
        ]
        table = Table(args[1:], colWidths=col_width, style=style)
        return table

    # Build a bar chart
    @staticmethod
    def draw_bar(bar_data: list, ax: list, items: list):
        drawing = Drawing(500, 250)
        bc = VerticalBarChart()
        bc.x = 45        # chart x position
        bc.y = 45        # chart y position
        bc.height = 200  # chart height
        bc.width = 350   # chart width
        bc.data = bar_data
        bc.strokeColor = colors.black  # top/right axis color
        bc.valueAxis.valueMin = 5000   # y-axis minimum
        bc.valueAxis.valueMax = 26000  # y-axis maximum
        bc.valueAxis.valueStep = 2000  # y-axis step
        bc.categoryAxis.labels.dx = 2
        bc.categoryAxis.labels.dy = -8
        bc.categoryAxis.labels.angle = 20
        bc.categoryAxis.categoryNames = ax

        # Legend
        leg = Legend()
        leg.fontName = 'SimSun'
        leg.alignment = 'right'
        leg.boxAnchor = 'ne'
        leg.x = 475  # legend x position
        leg.y = 240
        leg.dxTextSpace = 10
        leg.columnMaximum = 3
        leg.colorNamePairs = items
        drawing.add(leg)
        drawing.add(bc)
        return drawing

    # Draw an image
    @staticmethod
    def draw_img(path):
        img = Image(path)      # load the image at the given path
        img.drawWidth = 20*cm  # rendered width
        img.drawHeight = 10*cm # rendered height
        return img


# Row-style helper: highlight weekly indicators in the frequency table
def style_row(row):
    if '周' in row['频度']:
        return ['background-color: yellow'] * len(row)
    else:
        return ['background-color: gray'] * len(row)

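# A minimal sketch assembling a one-page PDF from the Graphs helpers above,
# assuming SimSun.ttf was registered at import time. The output file name and
# table contents are hypothetical. Illustrative only.
def _demo_report():
    story = [
        Graphs.draw_title('示例报告'),
        Graphs.draw_text('这是一段正文。'),
        Graphs.draw_table([80, 80], ('指标', '频度'), ('Brent', '日度')),
    ]
    SimpleDocTemplate('demo_report.pdf', pagesize=letter).build(story)
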
class EtaReader():
    def __init__(self, signature, classifylisturl, classifyidlisturl, edbcodedataurl, edbcodelist, edbdatapushurl, edbdeleteurl, edbbusinessurl):
        # Signature object and API endpoints
        self.signature = signature
        self.classifylisturl = classifylisturl
        self.classifyidlisturl = classifyidlisturl
        self.edbcodedataurl = edbcodedataurl
        self.edbdatapushurl = edbdatapushurl
        self.edbcodelist = edbcodelist
        self.edbdeleteurl = edbdeleteurl
        self.edbbusinessurl = edbbusinessurl

    def filter_yuanyou_data(self, ClassifyName, data):
        '''
        Rules for which indicator names to keep (crude oil).
        '''

        # Drop names containing any of these keywords (return False)
        if any(keyword in data for keyword in ['运费', '检修', '波动率', '地缘政治', '股价',
                                               '同比', '环比', '环差', '裂差', '4WMA', '变频', '道琼斯', '标普500', '纳斯达克',
                                               '四周均值', '名占比', '残差', 'DMA',
                                               '连7-连9', '4周平均', '4周均值', '滚动相关性', '日本']):
            return False

        # Category-specific rules
        # Drop everything under the 分析 category
        if ClassifyName == '分析':
            return False

        # Under 库存, keep only names with these keywords
        if ClassifyName == '库存':
            if any(keyword in data for keyword in ['原油', '美国', '全球', '中国', '富查伊拉', 'ARA']):
                return True
            else:
                pass
        else:
            pass

        # Under 持仓, drop anything that is not fund positioning
        if ClassifyName == '持仓':
            if '基金' not in data:
                return False
            else:
                pass
        else:
            pass

        # Under 需求, keep flight data only for China and the US
        if ClassifyName == '需求':
            if '航班' in data:
                if '中国' in data or '美国' in data:
                    return True
                else:
                    return False
            else:
                pass
        else:
            pass

        # Under 期货市场, keep only one representative of each homogeneous series
        if ClassifyName == '期货市场':
            # Drop c1-cN spreads beyond N=9
            if 'c1-c' in data:
                try:
                    c = int(data.split('c1-c')[1])
                except ValueError:
                    return False
                if c > 9:
                    return False
                else:
                    pass
            else:
                pass

            # Canonical representative of each homogeneous series, keyed by
            # name prefix
            strstartdict = {'ICE Brent c': "ICE Brent c14",
                            'NYMWX WTI c': "NYMWX WTI c5",
                            'INE SC c': "INE SC c1",
                            'EFS c': "EFS c",
                            'Dubai Swap c': "Dubai Swap c1",
                            'Oman Swap c': "Oman Swap c1",
                            'DME Oman c': "DME Oman c1",
                            'Murban Futures c': "Murban Futures c1",
                            'Dubai连合约价格': 'Dubai连1合约价格',
                            '美国RBOB期货月份合约价格': '美国RBOB期货2309月份合约价格',
                            'Brent连合约价格': 'Brent连1合约价格',
                            'WTI连合约价格': 'WTI连1合约价格',
                            '布伦特连合约价格': 'Brent连1合约价格',
                            'Brent 连合约价格': 'Brent连1合约价格',
                            'Brent连': 'Brent连1合约价格',
                            'brent连': 'Brent连1合约价格',
                            }
            # Check whether the name starts with one of the keys above
            match = re.match(r'([a-zA-Z\s]+)(\d+)', data)
            if match:
                part1 = match.group(1)
                if part1 in strstartdict.keys():
                    if data == strstartdict[part1]:
                        return True
                    else:
                        return False
            # e.g. data = 'Brent 连7合约价格'
            # Check the name with its digits stripped against the keys
            match = re.findall(r'\D+', data)
            if match:
                if len(match) == 2:
                    part1 = match[0]
                    part2 = match[1]
                    if part1 + part2 in strstartdict.keys():
                        if data == strstartdict[part1 + part2]:
                            return True
                        else:
                            return False
                    else:
                        pass
                elif len(match) == 1:
                    part1 = match[0]

                    if part1 in strstartdict.keys():
                        if data == strstartdict[part1]:
                            return True
                        else:
                            return False
                    else:
                        pass
            else:
                pass

        # Drop the Kpler data source
        if 'Kpler' in ClassifyName or 'kpler' in ClassifyName:
            return False

        return True

    def filter_pp_data(self, ClassifyName, data):
        '''
        Rules for which indicator names to keep (polypropylene).
        '''

        # Keyword-based removal is currently disabled:
        # if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价',
        #                                        '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克',
        #                                        '四周均值','名占比','残差','DMA',
        #                                        '连7-连9','4周平均','4周均值','滚动相关性','日本']):
        #     return False
        # Keep names containing these keywords (return True)
        if any(keyword in data for keyword in ['拉丝']):
            return True

        # Category-specific rules
        # Drop everything under the 期货市场 category
        if ClassifyName == '期货市场':
            return False
        else:
            pass

        # Keep all indicators under 库存
        if ClassifyName == '库存':
            return True
        else:
            pass

        # Keep all indicators under 进出口
        if ClassifyName == '进出口':
            return True
        else:
            pass

        # Keep all indicators under 价差
        if ClassifyName == '价差':
            return True
        else:
            pass

        # Keep all indicators under 供应
        if ClassifyName == '供应':
            return True
        else:
            pass

        # Keep all indicators under 需求
        if ClassifyName == '需求':
            return True
        else:
            pass

        return True

    # Fetch indicator data by edbcode
    def edbcodegetdata(self, df, EdbCode, EdbName):
        # Fetch the data series for the given indicator id
        url = self.edbcodedataurl + str(EdbCode)
        # Send the GET request
        response = requests.get(url, headers=self.headers)

        # Check the response status code
        if response.status_code == 200:
            data = response.json()  # the endpoint returns JSON
            all_data_items = data.get('Data')
            # Convert the list into a DataFrame
            df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime'])
            # df3 = pd.read_json(all_data_items, orient='records')

            # Drop the UpdateTime column
            df3 = df3.drop(columns=['UpdateTime'])
            # df3.set_index('DataTime')
            df3.rename(columns={'Value': EdbName}, inplace=True)
            # Outer-join the series onto the accumulating frame by DataTime
            df = pd.merge(df, df3, how='outer', on='DataTime', suffixes=('', '_y'))
            return df

        else:
            # The request failed -- log the error
            logger.info(f'Error: {response.status_code}, {response.text}')
            # and raise
            raise Exception(f'Error: {response.status_code}, {response.text}')

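    # A toy sketch of the outer-merge accumulation used above: each fetched
    # series is joined on DataTime, so dates missing in one series stay NaN.
    # Synthetic frames, illustrative only.
    @staticmethod
    def _demo_merge():
        acc = pd.DataFrame({'DataTime': ['2024-01-01', '2024-01-02'], 'A': [1, 2]})
        new = pd.DataFrame({'DataTime': ['2024-01-02', '2024-01-03'], 'B': [5, 6]})
        print(pd.merge(acc, new, how='outer', on='DataTime', suffixes=('', '_y')))
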
    def get_eta_api_yuanyou_data(self, data_set, dataset=''):

        today = datetime.date.today().strftime("%Y-%m-%d")

        # Request headers carrying the signature parameters
        self.headers = {
            'nonce': self.signature.nonce,
            'timestamp': str(self.signature.timestamp),
            'appid': self.signature.APPID,
            'signature': self.signature.signature
        }

        # Walk the classification list, keep daily/weekly indicators, fetch
        # each indicator's series, and collect everything into the two sheets
        # of one xlsx workbook.

        '''
        df  = sheet 指标列表: 指标分类-指标名称-指标id-频度
        df1 = sheet 指标数据: 时间-指标名称1-指标名称2...
        '''

        # Build the two empty DataFrames
        df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度', '指标来源', '来源id'])
        df1 = pd.DataFrame(columns=['DataTime'])

        # The API is only reachable from the intranet
        try:
            # GET the indicator classification list
            response = requests.get(self.classifylisturl, headers=self.headers)
        except requests.exceptions.RequestException as e:
            raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")

        # Check the response status code
        if response.status_code == 200:
            # Success -- parse the body
            data = response.json()

            # Keep only the classifications under this parent id
            fixed_value = 1214
            filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]

            # Loop over the classifications to get each ClassifyId
            n = 0
            for item in filtered_data:
                n += 1
                # if n > 50:
                #     break
                ClassifyId = item["ClassifyId"]      # request parameter for the per-classification indicator list
                ClassifyName = item["ClassifyName"]  # stored in the 指标分类 column of df
                # Fetch the indicator list for this classification
                url = self.classifyidlisturl + str(ClassifyId)
                response = requests.get(url, headers=self.headers)
                if response.status_code == 200:
                    # logger.info(response.text)
                    data2 = response.json()
                    Data = data2.get('Data')
                    for i in Data:
                        EdbCode = i.get('EdbCode')
                        EdbName = i.get('EdbName')        # 指标名称 column
                        Frequency = i.get('Frequency')    # 频度 column
                        SourceName = i.get('SourceName')  # 指标来源 column
                        Source = i.get('Source')          # 来源id column
                        # Skip anything that is not daily or weekly
                        if Frequency not in ['日度', '周度', '日', '周']:
                            continue

                        # For manual data (Source 9), keep only names with
                        # 海运出口 or 海运进口
                        if Source == 9 and not ('海运出口' in EdbName or '海运进口' in EdbName):
                            continue

                        # Skip Wind data (Source 2)
                        if Source == 2:
                            continue

                        # Apply the name-filtering rules
                        isSave = self.filter_yuanyou_data(ClassifyName, EdbName)
                        if isSave:
                            # Record classification, name, id, frequency and source in df
                            df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency, '指标来源': SourceName, '来源id': Source}, index=[0])
                            # df = pd.merge(df, df2, how='outer')
                            df = pd.concat([df, df2])
                            df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
                        else:
                            logger.info(f'跳过指标 {EdbName}')

        # Indicator ids from edbcodelist that were not collected above
        new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
        logger.info(new_list)
        # Fetch each remaining indicator and add it to df1
        for item in new_list:
            logger.info(item)
            # Add the id to df['指标id'] as well
            try:
                itemname = edbcodenamedict[item]
            except KeyError:
                itemname = item

            df1 = self.edbcodegetdata(df1, item, itemname)
            df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他', '指标来源': '其他', '来源id': '其他'}, index=[0])])

        # Sort by time, newest first
        df1.sort_values('DataTime', inplace=True, ascending=False)
        df1.rename(columns={'DataTime': 'date'}, inplace=True)
        # df1.dropna(inplace=True)
        # Drop rows dated after today
        df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
        logger.info(df1.head())
        # logger.info(f'{df1.head()}')
        # Save both sheets into the xlsx workbook
        with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
            df1.to_excel(file, sheet_name='指标数据', index=False)
            df.to_excel(file, sheet_name='指标列表', index=False)

        df_zhibiaoshuju = df1.copy()
        df_zhibiaoliebiao = df.copy()
        return df_zhibiaoshuju, df_zhibiaoliebiao

    def get_eta_api_pp_data(self, data_set, dataset=''):
        global ClassifyId
        today = datetime.date.today().strftime("%Y-%m-%d")

        # Request headers carrying the signature parameters
        self.headers = {
            'nonce': self.signature.nonce,
            'timestamp': str(self.signature.timestamp),
            'appid': self.signature.APPID,
            'signature': self.signature.signature
        }

        # Walk the classification list, keep daily/weekly indicators, fetch
        # each indicator's series, and collect everything into the two sheets
        # of one xlsx workbook.

        '''
        df  = sheet 指标列表: 指标分类-指标名称-指标id-频度
        df1 = sheet 指标数据: 时间-指标名称1-指标名称2...
        '''

        # Build the two empty DataFrames
        df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度'])
        df1 = pd.DataFrame(columns=['DataTime'])

        # The API is only reachable from the intranet
        try:
            # GET the indicator classification list
            response = requests.get(self.classifylisturl, headers=self.headers)
        except requests.exceptions.RequestException as e:
            raise Exception(f"请求失败,请确认是否为内网环境: {e}", "\033[0m")

        # Check the response status code
        if response.status_code == 200:
            # Success -- parse the body
            data = response.json()

            # Keep only the classifications under the configured parent id
            fixed_value = ClassifyId
            filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]

            # Loop over the classifications to get each ClassifyId
            n = 0
            for item in filtered_data:
                n += 1
                # if n > 50:
                #     break
                ClassifyId = item["ClassifyId"]      # request parameter for the per-classification indicator list
                ClassifyName = item["ClassifyName"]  # stored in the 指标分类 column of df
                # Fetch the indicator list for this classification
                url = self.classifyidlisturl + str(ClassifyId)
                response = requests.get(url, headers=self.headers)
                if response.status_code == 200:
                    # logger.info(response.text)
                    data2 = response.json()
                    Data = data2.get('Data')
                    for i in Data:
                        EdbCode = i.get('EdbCode')
                        EdbName = i.get('EdbName')      # 指标名称 column
                        Frequency = i.get('Frequency')  # 频度 column
                        # Skip anything that is not daily or weekly
                        if Frequency not in ['日度', '周度', '日', '周']:
                            continue

                        # Apply the name-filtering rules
                        isSave = self.filter_pp_data(ClassifyName, EdbName)
                        if isSave:
                            # Record classification, name, id and frequency in df
                            df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency}, index=[0])
                            # df = pd.merge(df, df2, how='outer')
                            df = pd.concat([df, df2])
                            df1 = self.edbcodegetdata(df1, EdbCode, EdbName)
                        else:
                            logger.info(f'跳过指标 {EdbName}')

        # Indicator ids from edbcodelist that were not collected above
        new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
        logger.info(new_list)
        # Fetch each remaining indicator and add it to df1
        for item in new_list:
            logger.info(item)
            # Add the id to df['指标id'] as well
            try:
                itemname = edbcodenamedict[item]
            except KeyError:
                itemname = item

            df1 = self.edbcodegetdata(df1, item, itemname)
            df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'}, index=[0])])

        # Sort by time, newest first
        df1.sort_values('DataTime', inplace=True, ascending=False)
        df1.rename(columns={'DataTime': 'date'}, inplace=True)
        # df1.dropna(inplace=True)
        # Drop rows dated after today
        df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
        logger.info(df1.head())
        # logger.info(f'{df1.head()}')
        # Save both sheets into the xlsx workbook
        with pd.ExcelWriter(os.path.join(dataset, data_set)) as file:
            df1.to_excel(file, sheet_name='指标数据', index=False)
            df.to_excel(file, sheet_name='指标列表', index=False)

        df_zhibiaoshuju = df1.copy()
        df_zhibiaoliebiao = df.copy()
        return df_zhibiaoshuju, df_zhibiaoliebiao

    def push_data(self, data):

        today = datetime.date.today().strftime("%Y-%m-%d")

        # Request headers carrying the signature parameters
        self.headers = {
            'nonce': self.signature.nonce,
            'timestamp': str(self.signature.timestamp),
            'appid': self.signature.APPID,
            'signature': self.signature.signature
        }

        # POST the data upload
        logger.info(f'请求参数:{data}')
        response = requests.post(self.edbdatapushurl, headers=self.headers, data=json.dumps(data))

        # Check the response status code
        if response.status_code == 200:
            data = response.json()

            logger.info(f'上传成功,响应为:{data}')

        else:
            # The request failed -- log the error
            logger.info(f'Error: {response.status_code}, {response.text}')
            # and raise
            raise Exception(f'Error: {response.status_code}, {response.text}')

    def del_zhibiao(self, IndexCodeList):
        today = datetime.date.today().strftime("%Y-%m-%d")

        # Request headers carrying the signature parameters
        self.headers = {
            'nonce': self.signature.nonce,
            'timestamp': str(self.signature.timestamp),
            'appid': self.signature.APPID,
            'signature': self.signature.signature
        }

        data = {
            "IndexCodeList": IndexCodeList  # list of indicator codes
        }
        # POST the delete request
        response = requests.post(self.edbdeleteurl, headers=self.headers, data=json.dumps(data))

        # Check the response status code
        if response.status_code == 200:
            data = response.json()

            logger.info(f'删除成功,响应为:{data}')

        else:
            # The request failed -- log the error
            logger.info(f'Error: {response.status_code}, {response.text}')
            # and raise
            raise Exception(f'Error: {response.status_code}, {response.text}')

    def del_business(self, data):
        '''
        Endpoint documentation:
        https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd

        Request body:
        {
            "IndexCode": "W001067",     // indicator code
            "StartDate": "2020-04-20",  // first date to delete (>=); if equal to EndDate, that single date is deleted
            "EndDate": "2024-05-28"     // last date to delete (<=)
        }
        '''
        today = datetime.date.today().strftime("%Y-%m-%d")

        # Request headers carrying the signature parameters
        self.headers = {
            'nonce': self.signature.nonce,
            'timestamp': str(self.signature.timestamp),
            'appid': self.signature.APPID,
            'signature': self.signature.signature
        }

        # POST the delete request
        response = requests.post(self.edbbusinessurl, headers=self.headers, data=json.dumps(data))

        # Check the response status code
        if response.status_code == 200:
            data = response.json()

            logger.info(f'删除成功,响应为:{data}')

        else:
            # The request failed -- log the error
            logger.info(f'Error: {response.status_code}, {response.text}')
            # and raise
            raise Exception(f'Error: {response.status_code}, {response.text}')

# Time features: year, month, day of year, weekday, week number, quarter,
# day of month, day of quarter, and start/end flags for month, quarter and year
def addtimecharacteristics(df, dataset):
    """
    Add date-derived feature columns to the input DataFrame.

    Parameters:
        df (pandas.DataFrame): DataFrame containing the date column 'ds'

    Returns:
        pandas.DataFrame: the DataFrame with the added columns
    """
    df['year'] = df['ds'].dt.year
    df['month'] = df['ds'].dt.month
    df['day'] = df['ds'].dt.day
    df['dayofweek'] = df['ds'].dt.dayofweek
    df['weekofyear'] = df['ds'].dt.isocalendar().week
    df['dayofyear'] = df['ds'].dt.dayofyear
    df['quarternum'] = df['ds'].dt.quarter
    # Convert ds to a quarterly Period
    df['quarter'] = df['ds'].dt.to_period('Q')
    # Start date of each quarter
    df['quarter_start'] = df['quarter'].dt.to_timestamp()
    # Day number within the quarter
    df['dayofquarter'] = (df['ds'] - df['quarter_start']).dt.days + 1
    # Month start flag
    df['is_month_start'] = df['ds'].dt.is_month_start.astype(int)
    # Month end flag
    df['is_month_end'] = df['ds'].dt.is_month_end.astype(int)
    # Quarter start flag
    df['is_quarter_start'] = df['ds'].dt.is_quarter_start.astype(int)
    # Quarter end flag
    df['is_quarter_end'] = df['ds'].dt.is_quarter_end.astype(int)
    # Year start flag
    df['is_year_start'] = df['ds'].dt.is_year_start.astype(int)
    # Year end flag
    df['is_year_end'] = df['ds'].dt.is_year_end.astype(int)
    # Drop the helper columns quarter_start and quarter
    df.drop(columns=['quarter_start', 'quarter'], inplace=True)
    df.to_csv(os.path.join(dataset, '指标数据添加时间特征.csv'), index=False)
    return df

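# A minimal usage sketch for addtimecharacteristics on synthetic data,
# writing its CSV side effect into the current directory. Illustrative only.
def _demo_time_features():
    demo = pd.DataFrame({'ds': pd.date_range('2024-01-01', periods=5, freq='D'),
                         'y': [1, 2, 3, 4, 5]})
    out = addtimecharacteristics(demo, dataset='.')
    print(out[['ds', 'quarternum', 'dayofquarter', 'is_month_start']])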