# 导入模块 import pandas as pd import numpy as np import datetime import string import base64 import requests import random import time import re import os import hmac import hashlib import json import torch torch.set_float32_matmul_precision("high") import matplotlib.pyplot as plt #设置plt显示中文 plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 from datetime import timedelta # from config_jingbo import * from config_juxiting import * from sklearn import metrics from reportlab.pdfbase import pdfmetrics # 注册字体 from reportlab.pdfbase.ttfonts import TTFont # 字体类 from reportlab.platypus import Table, SimpleDocTemplate, Paragraph, Image # 报告内容相关类 from reportlab.lib.pagesizes import letter # 页面的标志尺寸(8.5*inch, 11*inch) from reportlab.lib.styles import getSampleStyleSheet # 文本样式 from reportlab.lib import colors # 颜色模块 from reportlab.graphics.charts.barcharts import VerticalBarChart # 图表类 from reportlab.graphics.charts.legends import Legend # 图例类 from reportlab.graphics.shapes import Drawing # 绘图工具 from reportlab.lib.units import cm # 单位:cm # 注册字体(提前准备好字体文件, 如果同一个文件需要多种字体可以注册多个) pdfmetrics.registerFont(TTFont('SimSun', 'SimSun.ttf')) #设置plt显示中文 plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 # 定义函数 def loadcsv(filename): # 读取csv文件 try: df = pd.read_csv(filename, encoding='utf-8') except UnicodeDecodeError: df = pd.read_csv(filename, encoding='gbk') return df def dateConvert(df, datecol='ds'): # 将date列转换为datetime类型 try: df[datecol] = pd.to_datetime(df[datecol],format=r'%Y-%m-%d') except: df[datecol] = pd.to_datetime(df[datecol],format=r'%Y/%m/%d') return df def calculate_kdj(data, n=9): ''' 给传进来的df 添加列: 波动率,最高,最低,k ,d ,j ''' data = data.sort_values(by='ds', ascending=True) # 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价 data['pctchange'] = data['y'].pct_change() # 收益为0的用0.01 data['pctchange'] = data['pctchange'].replace(0,0.01) data.dropna(inplace=True) # 重置索引 data.reset_index(drop=True,inplace=True) data['high'] = data['y']* (1+abs(data['pctchange'])/2) data['low'] = data['y']* (1-abs(data['pctchange'])/2) low_list = data['y'].rolling(window=n, min_periods=1).min() high_list = data['y'].rolling(window=n, min_periods=1).max() rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100 k = pd.Series(50, index=data.index) d = pd.Series(50, index=data.index) for i in range(1, len(data)): k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i]) d[i] = (2/3 * d[i - 1]) + (1/3 * k[i]) j = 3 * k - 2 * d data['K'] = k data['D'] = d data['J'] = j # 将包含 KDJ 指标的数据保存到新的 CSV 文件 data.to_csv('stock_data_with_kdj.csv', index=False) # data = data.dropna() return data # 上传报告 def get_head_auth_report(): login_res = requests.post(url=login_pushreport_url, json=login_data, timeout=(3, 5)) text = json.loads(login_res.text) if text["status"]: token = text["data"]["accessToken"] return token def upload_report_data(token, upload_data): upload_data = upload_data headers = {"Authorization": token} logger.info("报告上传中...") logger.info(f"token:{token}") logger.info(f"upload_data:{upload_data}" ) upload_res = requests.post(url=upload_url, headers=headers, json=upload_data, timeout=(3, 15)) upload_res = json.loads(upload_res.text) logger.info(upload_res) if upload_res: return upload_res else: logger.info("报告上传失败") return None # 统计特征频度 def featurePindu(dataset): # 读取文件 df = loadcsv(os.path.join(dataset,'未填充的特征数据.csv')) df['ds'] = pd.to_datetime(df['ds']) # 按ds正序排序,重置索引 df = df.sort_values(by='ds', ascending=True).reset_index(drop=True) # 统计特征频度 # 每列随机抽取10个值,计算出5个时间间隔,统计每个时间间隔的频度 columns = df.columns.to_list() columns.remove('ds') count_dict = {} for column in columns: # 获取每列时间间隔 values = df[[column,'ds']] values.dropna(inplace=True,axis=0) values=values.reset_index(drop=True) # 抽取20%个值 value = values.sample(frac=0.2) index = value.index next_index = index + 1 count = [] for i,j in zip(index, next_index): #通过索引计算日期差 try: count.append((values.loc[j,'ds'] - values.loc[i,'ds']).days) except: pass # 把31 换成 30 count = [30 if i == 31 else i for i in count] # 保留count中出现次数最多的数 try: count = max(set(count), key=count.count) except ValueError : logger.info(f'{column}列数据为空') continue # 存储到字典中 count_dict[column] = count df = pd.DataFrame(count_dict,index=['count']).T pindu_dfs = pd.DataFrame() # 根据count分组 # 输出特征频度统计 pindudict = {'1':'日度','3':'日度','7':'周度','30':'月度','90':'季度','180':'半年度','365':'年度'} for i in df.groupby('count'): # 获取 i[1] 的索引值 index = i[1].index pindu_df = pd.DataFrame() try: pindu_df[pindudict[str(i[0])]+f'({len(i[1])})'] = index except KeyError : pindu_df[str(i[0])+f'天({len(i[1])})'] = index # 合并到pindu_dfs pindu_dfs = pd.concat([pindu_dfs,pindu_df],axis=1) # nan替换为 ' ' pindu_dfs = pindu_dfs.fillna('') pindu_dfs.to_csv(os.path.join(dataset,'特征频度统计.csv'),index=False) logger.info(pindu_dfs) featureInfo = f'特征信息:总共有{len(columns)-2}个' for i in pindu_dfs.columns: featureInfo += f',{i}' featureInfo += ', 详看 附1、特征列表' featureInfo += ''' 数据特征工程: 1. 数据日期排序,新日期在最后 2. 删除空列,特征数据列没有值,就删除 3. 删除近两月不再更新值的指标 4. 非日度数据填充为日度数据,填充规则: -- 向后填充,举例:假设周五出现一个周度指标数据,那么在这之前的数据用上周五的数据 -- 向前填充,举例:采集数据开始日期为2018年1月1日,那么周度数据可能是2018年1月3日,那么3日的数据向前填充,使1日2日都有数值 数据特征相关性分析: ''' logger.info(featureInfo) with open(os.path.join(dataset,'特征频度统计.txt'), 'w', encoding='utf-8') as f: f.write(featureInfo) logger.info('*'*200) def featureAnalysis(df,dataset,y): # 特征筛选 import matplotlib.pyplot as plt # 选择特征和标签列 X = df.drop(['ds', 'y'], axis=1) # 特征集,排除时间戳和标签列 yy = df['y'] # 标签集 # 标签集自相关函数分析 from statsmodels.graphics.tsaplots import plot_acf plot_acf(yy, lags=30) plt.savefig(os.path.join(dataset,'指标数据自相关图.png')) plt.close() # 标签集偏自相关函数分析 from statsmodels.graphics.tsaplots import plot_pacf plot_pacf(yy, lags=30) plt.savefig(os.path.join(dataset,'指标数据偏自相关图.png')) plt.close() # 画 特征与价格散点图 # 删除所有*散点图.png for file in os.listdir(dataset): if file.endswith("散点图.png"): os.remove(os.path.join(dataset, file)) plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.figure(figsize=(10, 10)) # # 遍历X每一列,和yy画散点图 , # for i, col in enumerate(X.columns): # plt.subplot(2, 2, i%4+1) # plt.scatter(X[col], yy) # plt.xlabel(col) # plt.ylabel(y) # plt.title(col) # if i % 4 == 3 or i == len(X.columns)-1: # plt.tight_layout() # plt.savefig(os.path.join(dataset,f'{i}指标数据特征与价格散点图.png')) # plt.close() def corr_feature(df): # 重新命名列名,列名排序,y在第一个 df.reindex(['y'] + sorted(df.columns.difference(['y']))) df_test = df.copy() # 取最后的220行 df_test = df_test.tail(220) # 去掉日期列 df_test = df_test.drop(columns=['ds']) # 不参与标准化 df_test_noscaler = df_test.copy() # 滞后处理备份 df_noscaler = df_test.copy() # 画出相关性热力图 df_test.to_csv(os.path.join(dataset,'同步相关性.csv')) corr = df_test.corr() # 保存相关系数 corr.to_csv(os.path.join(dataset,'同步相关性系数.csv')) # plt.figure(figsize=(10, 10)) # sns.heatmap(corr, annot=True, cmap='coolwarm') # plt.savefig('dataset/同步相关性热力图.png') # plt.show() # 读取滞后周期文件,更改特征 characteristic_period = pd.read_csv('dataset/特征滞后周期.csv',encoding='utf-8') # 去掉周期为0的行 characteristic_period = characteristic_period.drop(characteristic_period[characteristic_period['滞后周期'] == 0].index) for col in df.columns: # 跳过y列 if col in ['y']: continue # 特征滞后n个周期,计算与y的相关性 if col in characteristic_period['特征'].values: # 获取特征对应的周期 period = characteristic_period[characteristic_period['特征'] == col]['滞后周期'].values[0] # 滞后处理 df[col] = df[col].shift(period) df.to_csv(os.path.join(dataset,'滞后处理后的数据集.csv')) # corr_feture_noscaler = {} # 保存相关性最大的周期 # 遍历df_test的每一列,计算相关性 # for col in df_noscaler.columns: # # 跳过y列 # if col in ['y']: # continue # logger.info('特征:', col) # # 特征滞后n个周期,计算与y的相关性 # corr_dict = {} # try: # for i in range(0, 200): # if i == 0: # df_noscaler[col+'_'+str(i)] = df_noscaler[col] # else: # df_noscaler[col+'_'+str(i)] = df_noscaler[col].shift(i) # corr_dict[col+'_'+str(i)] = abs(df_noscaler[col+'_'+str(i)].corr(df_noscaler['y'])) # except : # logger.info('特征:', col, '滑动错误,请查看') # continue # 输出相关性最大的特征 # logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)]) # corr_feture_noscaler[col] = max(corr_dict, key=corr_dict.get).split('_')[-1] # 画出最相关性最大的特征和y的折线图 # plt.figure(figsize=(10, 5)) # plt.plot(df_noscaler[max(corr_dict, key=corr_dict.get)], label=max(corr_dict, key=corr_dict.get)) # # 设置双坐标轴 # ax1 = plt.gca() # ax2 = ax1.twinx() # ax2.plot(df_noscaler['y'], color='r', label='y') # plt.legend() # try: # plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get)+'.png') # except : # # :替换成_ # plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get).replace(':','_').replace('/','_').replace('(','_').replace(')','_')+'.png') # plt.close() # 结果保存到txt文件 # logger.info('不参与标准化的特征滞后相关性写入txt文件') # with open('dataset/不参与标准化的特征滞后相关性.txt', 'w') as f: # for key, value in corr_feture_noscaler.items(): # f.write('%s:%s\n' % (key, value)) # 遍历corr_feture_noscaler,更改df # colnames_noscaler = [] # for col in corr_feture_noscaler: # colname = col+'_'+corr_feture_noscaler[col] # if int(corr_feture_noscaler[col]) == 0: # continue # df_test_noscaler[colname] = df_test_noscaler[col].shift(int(corr_feture_noscaler[col])) # df_test_noscaler = df_test_noscaler.drop(columns=[col]) # colnames_noscaler.append(colname) # 去除有空值的行 # df_test_noscaler = df_test_noscaler.dropna() # df_test_noscaler.reindex(['y'] + sorted(df_test_noscaler.columns.difference(['y']))) # df_test_noscaler.to_csv('dataset/不参与标准化的特征滞后相关性.csv', index=False) # 画出相关性热力图 # corr = df_test_noscaler.corr() # 保存相关系数 # corr.to_csv(os.path.join(dataset,'不参与标准化的特征滞后相关性系数.csv')) # plt.figure(figsize=(10, 10)) # sns.heatmap(corr, annot=True, cmap='coolwarm') # plt.savefig('dataset/不参与标准化的特征滞后相关性热力图.png') # plt.close() # # 标准化每列 # from sklearn.preprocessing import StandardScaler # scaler = StandardScaler() # df_test = pd.DataFrame(scaler.fit_transform(df_test), columns=df_test.columns) # corr_feture = {} # 保存相关性最大的周期 # # 遍历df_test的每一列,计算相关性 # for col in df_test.columns: # # 跳过y列 # if col == 'y': # continue # logger.info('特征:', col) # # 特征滞后n个周期,计算与y的相关性 # corr_dict = {} # try: # for i in range(0, 200): # if i == 0: # df_test[col+'_'+str(i)] = df_test[col] # else: # df_test[col+'_'+str(i)] = df_test[col].shift(i) # corr_dict[col+'_'+str(i)] = abs(df_test[col+'_'+str(i)].corr(df_test['y'])) # except : # logger.info('特征:', col, '滑动错误,请查看') # continue # # 输出相关性最大的特征 # logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)]) # corr_feture[col] = max(corr_dict, key=corr_dict.get).split('_')[-1] # # 结果保存到txt文件 # with open('dataset/标准化的特征滞后相关性.txt', 'w') as f: # for key, value in corr_feture.items(): # f.write('%s:%s\n' % (key, value)) # # 遍历corr_feture,更改df # colnames = [] # for col in corr_feture: # colname = col+'_'+corr_feture[col] # if int(corr_feture[col]) == 0: # continue # df[colname] = df[col].shift(int(corr_feture[col])) # df = df.drop(columns=[col]) # colnames.append(colname) # # 去除有空值的行 # df = df.dropna() # df.reindex(['y'] + sorted(df.columns.difference(['y']))) # df.to_csv('dataset/标准化后的特征滞后相关性.csv', index=False) # # 画出相关性热力图 # ds = df['ds'] # df = df.drop(columns=['ds']) # corr = df.corr() # # 保存相关系数 # corr.to_csv(os.path.join(dataset,'标准化后的特征滞后相关性系数.csv')) # plt.figure(figsize=(10, 10)) # sns.heatmap(corr, annot=True, cmap='coolwarm') # plt.savefig('dataset/标准化后的特征滞后相关性热力图.png') # plt.show() # df['ds'] = ds # 去除nan值 df = df.dropna() return df def calculate_kdj(data, n=9): ''' 给传进来的df 添加列: 波动率,最高,最低,k ,d ,j ''' data = data.sort_values(by='ds', ascending=True) # 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价 data['pctchange'] = data['y'].pct_change() # 收益为0的用0.01 data['pctchange'] = data['pctchange'].replace(0,0.01) data.dropna(inplace=True) # 重置索引 data.reset_index(drop=True,inplace=True) data['high'] = data['y']* (1+abs(data['pctchange'])/2) data['low'] = data['y']* (1-abs(data['pctchange'])/2) low_list = data['y'].rolling(window=n, min_periods=1).min() high_list = data['y'].rolling(window=n, min_periods=1).max() rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100 k = pd.Series(50, index=data.index) d = pd.Series(50, index=data.index) for i in range(1, len(data)): k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i]) d[i] = (2/3 * d[i - 1]) + (1/3 * k[i]) j = 3 * k - 2 * d data['K'] = k data['D'] = d data['J'] = j # 将包含 KDJ 指标的数据保存到新的 CSV 文件 data.to_csv('dataset\stock_data_with_kdj.csv', index=False) # data = data.dropna() return data def datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False): df = df_zhibiaoshuju.copy() if end_time == '': end_time = datetime.datetime.now().strftime('%Y-%m-%d') # date转为pddate df.rename(columns={datecol:'ds'},inplace=True) # 重命名预测列 df.rename(columns={y:'y'},inplace=True) # 按时间顺序排列 df.sort_values(by='ds',inplace=True) df['ds'] = pd.to_datetime(df['ds']) # 获取2018年到当前日期的数据 df = df[df['ds'].dt.year >= 2018] # 获取小于等于当前日期的数据 df = df[df['ds'] <= end_time] logger.info(f'删除两月不更新特征前数据量:{df.shape}') # 去掉近最后数据对应的日期在两月以前的列,删除近2月的数据是常熟的列 current_date = datetime.datetime.now() two_months_ago = current_date - timedelta(days=40) def check_column(col_name): if 'ds' in col_name or 'y' in col_name: return False df_check_column = df[['ds',col_name]] df_check_column = df_check_column.dropna() if len(df_check_column) == 0: return True if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2: return True corresponding_date = df_check_column.iloc[-1]['ds'] return corresponding_date < two_months_ago columns_to_drop = df.columns[df.columns.map(check_column)].tolist() df = df.drop(columns = columns_to_drop) logger.info(f'删除两月不更新特征后数据量:{df.shape}') # 删除预测列空值的行 df = df.dropna(subset=['y']) logger.info(f'删除预测列为空值的行后数据量:{df.shape}') df = df.dropna(axis=1, how='all') logger.info(f'删除全为空值的列后数据量:{df.shape}') df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False) # 去掉指标列表中的columns_to_drop的行 df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())] df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False) # 频度分析 featurePindu(dataset=dataset) # 向上填充 df = df.ffill() # 向下填充 df = df.bfill() # 删除周六日的数据 if delweekenday: df = df[df['ds'].dt.weekday < 5] if add_kdj: df = calculate_kdj(df) if is_timefurture: df = addtimecharacteristics(df=df,dataset=dataset) featureAnalysis(df,dataset=dataset,y=y) return df def getdata(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''): logger.info('getdata接收:'+filename+' '+datecol+' '+end_time) # 判断后缀名 csv或excel if filename.endswith('.csv'): df = loadcsv(filename) else: # 读取excel 指标数据 df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据') df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表') # 日期字符串转为datatime df = datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) return df # def filter_data(ClassifyName,data): # ''' # 指标名称保留规则 # ''' # # 包含 关键词 去除, 返回flase # if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', # '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', # '四周均值','名占比','残差','DMA', # '连7-连9','4周平均','4周均值','滚动相关性','日本']): # return False # # 检查需要的特征 # # 去掉 分析 分类下的数据 # if ClassifyName == '分析': # return False # # 保留 库存中特殊关键词 # if ClassifyName == '库存': # if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]): # return True # else: # pass # else: # pass # # 去掉 持仓中不是基金的数据 # if ClassifyName == '持仓': # if '基金' not in data: # return False # else: # pass # else: # pass # # 去掉 航班中不是中国、美国 的数据 # if ClassifyName == '需求': # if '航班' in data : # if '中国' in data or '美国' in data : # return True # else: # return False # else: # pass # else: # pass # # 分类为 期货市场,同质性数据取第一个 # if ClassifyName == '期货市场': # # 去掉c1-9 以后的 # if 'c1-c' in data: # try: # c = int(data.split('c1-c')[1]) # except: # return False # if c > 9 : # return False # else: # pass # else: # pass # # 判断 同质性数据, 字符串开头 # strstartdict = {'ICE Brent c':"ICE Brent c14", # 'NYMWX WTI c':"NYMWX WTI c5", # 'INE SC c':"INE SC c1", # 'EFS c':"EFS c", # 'Dubai Swap c':"Dubai Swap c1", # 'Oman Swap c':"Oman Swap c1", # 'DME Oman c':"DME Oman c1", # 'Murban Futures c':"Murban Futures c1", # 'Dubai连合约价格':'Dubai连1合约价格', # '美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格', # 'Brent连合约价格':'Brent连1合约价格', # 'WTI连合约价格':'WTI连1合约价格', # '布伦特连合约价格':'Brent连1合约价格', # 'Brent 连合约价格':'Brent连1合约价格', # 'Dubai连合约价格':'Dubai连1合约价格', # 'Brent连':'Brent连1合约价格', # 'brent连':'Brent连1合约价格', # } # # 判断名称字符串开头是否在 strstartdict.keys中 # match = re.match(r'([a-zA-Z\s]+)(\d+)', data) # if match: # part1 = match.group(1) # part2 = match.group(2) # if part1 in [i for i in strstartdict.keys()]: # if data == strstartdict[part1]: # return True # else: # return False # # data = 'Brent 连7合约价格' # # 判断名称字符串去掉数字后是否在 strstartdict.keys中 # match = re.findall(r'\D+', data) # if match : # if len(match) == 2: # part1 = match[0] # part2 = match[1] # if part1+part2 in [i for i in strstartdict.keys()]: # if data == strstartdict[part1+part2]: # return True # else: # return False # else: # pass # elif len(match) == 1: # match = re.findall(r'\D+', data) # part1 = match[0] # if part1 in [i for i in strstartdict.keys()]: # if data == strstartdict[part1]: # return True # else: # return False # else: # pass # else: # pass # return True def sanitize_filename(filename): # 使用正则表达式替换不合规的字符 # 这里我们替换为下划线'_',但你可以根据需要选择其他字符 sanitized = re.sub(r'[\\/*?:"<>|\s]', '_', filename) # 移除开头的点(在某些系统中,以点开头的文件可能是隐藏的) sanitized = re.sub(r'^\.', '', sanitized) # 如果需要,可以添加更多替换规则 return sanitized class BinanceAPI: ''' 获取 Binance API 请求头签名 ''' def __init__(self, APPID, SECRET): self.APPID = APPID self.SECRET = SECRET self.get_signature() # 生成随机字符串作为 nonce def generate_nonce(self, length=32): self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) return self.nonce # 获取当前时间戳(秒) def get_timestamp(self): return int(time.time()) # 构建待签名字符串 def build_sign_str(self): return f'appid={self.APPID}&nonce={self.nonce}×tamp={self.timestamp}' # 使用 HMAC SHA-256 计算签名 def calculate_signature(self, secret, message): return base64.urlsafe_b64encode(hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8') def get_signature(self): # 调用上述方法生成签名 self.nonce = self.generate_nonce() self.timestamp = self.get_timestamp() self.sign_str = self.build_sign_str() self.signature = self.calculate_signature(self.SECRET, self.sign_str) # return self.signature class Graphs: # 绘制标题 @staticmethod def draw_title(title: str): # 获取所有样式表 style = getSampleStyleSheet() # 拿到标题样式 ct = style['Heading1'] # 单独设置样式相关属性 ct.fontName = 'SimSun' # 字体名 ct.fontSize = 18 # 字体大小 ct.leading = 50 # 行间距 ct.textColor = colors.green # 字体颜色 ct.alignment = 1 # 居中 ct.bold = True # 创建标题对应的段落,并且返回 return Paragraph(title, ct) # 绘制小标题 @staticmethod def draw_little_title(title: str): # 获取所有样式表 style = getSampleStyleSheet() # 拿到标题样式 ct = style['Normal'] # 单独设置样式相关属性 ct.fontName = 'SimSun' # 字体名 ct.fontSize = 15 # 字体大小 ct.leading = 30 # 行间距 ct.textColor = colors.red # 字体颜色 # 创建标题对应的段落,并且返回 return Paragraph(title, ct) # 绘制普通段落内容 @staticmethod def draw_text(text: str): # 获取所有样式表 style = getSampleStyleSheet() # 获取普通样式 ct = style['Normal'] ct.fontName = 'SimSun' ct.fontSize = 12 ct.wordWrap = 'CJK' # 设置自动换行 ct.alignment = 0 # 左对齐 ct.firstLineIndent = 32 # 第一行开头空格 ct.leading = 25 return Paragraph(text, ct) # 绘制表格 @staticmethod def draw_table(*args): # 列宽度 col_width = args[0] style = [ ('FONTNAME', (0, 0), (-1, -1), 'SimSun'), # 字体 ('FONTSIZE', (0, 0), (-1, 0), 12), # 第一行的字体大小 ('FONTSIZE', (0, 1), (-1, -1), 10), # 第二行到最后一行的字体大小 ('BACKGROUND', (0, 0), (-1, 0), '#d5dae6'), # 设置第一行背景颜色 ('ALIGN', (0, 0), (-1, -1), 'CENTER'), # 第一行水平居中 ('ALIGN', (0, 1), (-1, -1), 'LEFT'), # 第二行到最后一行左右左对齐 ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), # 所有表格上下居中对齐 ('TEXTCOLOR', (0, 0), (-1, -1), colors.darkslategray), # 设置表格内文字颜色 ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), # 设置表格框线为grey色,线宽为0.5 # ('SPAN', (0, 1), (0, 2)), # 合并第一列二三行 # ('SPAN', (0, 3), (0, 4)), # 合并第一列三四行 # ('SPAN', (0, 5), (0, 6)), # 合并第一列五六行 # ('SPAN', (0, 7), (0, 8)), # 合并第一列五六行 ] table = Table(args[1:], colWidths=col_width, style=style) return table # 创建图表 @staticmethod def draw_bar(bar_data: list, ax: list, items: list): drawing = Drawing(500, 250) bc = VerticalBarChart() bc.x = 45 # 整个图表的x坐标 bc.y = 45 # 整个图表的y坐标 bc.height = 200 # 图表的高度 bc.width = 350 # 图表的宽度 bc.data = bar_data bc.strokeColor = colors.black # 顶部和右边轴线的颜色 bc.valueAxis.valueMin = 5000 # 设置y坐标的最小值 bc.valueAxis.valueMax = 26000 # 设置y坐标的最大值 bc.valueAxis.valueStep = 2000 # 设置y坐标的步长 bc.categoryAxis.labels.dx = 2 bc.categoryAxis.labels.dy = -8 bc.categoryAxis.labels.angle = 20 bc.categoryAxis.categoryNames = ax # 图示 leg = Legend() leg.fontName = 'SimSun' leg.alignment = 'right' leg.boxAnchor = 'ne' leg.x = 475 # 图例的x坐标 leg.y = 240 leg.dxTextSpace = 10 leg.columnMaximum = 3 leg.colorNamePairs = items drawing.add(leg) drawing.add(bc) return drawing # 绘制图片 @staticmethod def draw_img(path): img = Image(path) # 读取指定路径下的图片 img.drawWidth = 20*cm # 设置图片的宽度 img.drawHeight = 10*cm # 设置图片的高度 return img # 定义样式函数 def style_row(row): if '周' in row['频度']: return ['background-color: yellow'] * len(row) else: return ['background-color: gray'] * len(row) class EtaReader(): def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl): # 获取签名 self.signature = signature self.classifylisturl = classifylisturl self.classifyidlisturl = classifyidlisturl self.edbcodedataurl = edbcodedataurl self.edbdatapushurl = edbdatapushurl self.edbcodelist = edbcodelist self.edbdeleteurl = edbdeleteurl self.edbbusinessurl = edbbusinessurl pass def filter_yuanyou_data(self,ClassifyName,data): ''' 指标名称保留规则 ''' # 包含 关键词 去除, 返回flase if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', '四周均值','名占比','残差','DMA', '连7-连9','4周平均','4周均值','滚动相关性','日本']): return False # 检查需要的特征 # 去掉 分析 分类下的数据 if ClassifyName == '分析': return False # 保留 库存中特殊关键词 if ClassifyName == '库存': if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]): return True else: pass else: pass # 去掉 持仓中不是基金的数据 if ClassifyName == '持仓': if '基金' not in data: return False else: pass else: pass # 去掉 航班中不是中国、美国 的数据 if ClassifyName == '需求': if '航班' in data : if '中国' in data or '美国' in data : return True else: return False else: pass else: pass # 分类为 期货市场,同质性数据取第一个 if ClassifyName == '期货市场': # 去掉c1-9 以后的 if 'c1-c' in data: try: c = int(data.split('c1-c')[1]) except: return False if c > 9 : return False else: pass else: pass # 判断 同质性数据, 字符串开头 strstartdict = {'ICE Brent c':"ICE Brent c14", 'NYMWX WTI c':"NYMWX WTI c5", 'INE SC c':"INE SC c1", 'EFS c':"EFS c", 'Dubai Swap c':"Dubai Swap c1", 'Oman Swap c':"Oman Swap c1", 'DME Oman c':"DME Oman c1", 'Murban Futures c':"Murban Futures c1", 'Dubai连合约价格':'Dubai连1合约价格', '美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格', 'Brent连合约价格':'Brent连1合约价格', 'WTI连合约价格':'WTI连1合约价格', '布伦特连合约价格':'Brent连1合约价格', 'Brent 连合约价格':'Brent连1合约价格', 'Dubai连合约价格':'Dubai连1合约价格', 'Brent连':'Brent连1合约价格', 'brent连':'Brent连1合约价格', } # 判断名称字符串开头是否在 strstartdict.keys中 match = re.match(r'([a-zA-Z\s]+)(\d+)', data) if match: part1 = match.group(1) part2 = match.group(2) if part1 in [i for i in strstartdict.keys()]: if data == strstartdict[part1]: return True else: return False # data = 'Brent 连7合约价格' # 判断名称字符串去掉数字后是否在 strstartdict.keys中 match = re.findall(r'\D+', data) if match : if len(match) == 2: part1 = match[0] part2 = match[1] if part1+part2 in [i for i in strstartdict.keys()]: if data == strstartdict[part1+part2]: return True else: return False else: pass elif len(match) == 1: match = re.findall(r'\D+', data) part1 = match[0] if part1 in [i for i in strstartdict.keys()]: if data == strstartdict[part1]: return True else: return False else: pass else: pass return True def filter_pp_data(self,ClassifyName,data): ''' 指标名称保留规则 ''' # 包含 关键词 去除, 返回flase # if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', # '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', # '四周均值','名占比','残差','DMA', # '连7-连9','4周平均','4周均值','滚动相关性','日本']): # return False # 包含 关键词 保留, 返回True if any(keyword in data for keyword in ['拉丝']): return True # 检查需要的特征 # 去掉 期货市场 分类下的数据 if ClassifyName == '期货市场': return False else: pass # 保留 库存 下所有指标 if ClassifyName == '库存': return True else: pass # 保留 进出口 下所有指标 if ClassifyName == '进出口': return True else: pass # 保留 价差 下所有指标 if ClassifyName == '价差': return True else: pass # 保留 供应 下所有指标 if ClassifyName == '供应': return True else: pass # 保留 需求 下所有指标 if ClassifyName == '需求': return True else: pass return True # 通过edbcode 获取指标数据 def edbcodegetdata(self,df,EdbCode,EdbName): # 根据指标id,获取指标数据 url = self.edbcodedataurl+str(EdbCode) # 发送GET请求 response = requests.get(url, headers=self.headers) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 all_data_items = data.get('Data') # 列表转换为DataFrame df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime']) # df3 = pd.read_json(all_data_items, orient='records') # 去掉UpdateTime 列 df3 = df3.drop(columns=['UpdateTime']) # df3.set_index('DataTime') df3.rename(columns={'Value': EdbName}, inplace=True) # 将数据存储df1 df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y')) return df else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') def get_eta_api_yuanyou_data(self,data_set,dataset=''): today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 ''' df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... ''' # 构建新的DataFrame df df1 df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度']) df1 = pd.DataFrame(columns=['DataTime']) # 外网环境无法访问,请确认是否为内网环境 try: # 发送GET请求 获取指标分类列表 response = requests.get(self.classifylisturl, headers=self.headers) except requests.exceptions.RequestException as e: raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") # 检查响应状态码 if response.status_code == 200: # 获取成功, 处理响应内容 data = response.json() # 假设接口返回的是JSON数据 # 请求成功,处理响应内容 # logger.info(data.get('Data')) # 定义你想要保留的固定值 fixed_value = 1214 # 遍历列表,只保留那些'category' key的值为固定值的数据项 filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId n = 0 for item in filtered_data: n+= 1 # if n>50: # break ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 # 根据分类id,获取指标列表 url = self.classifyidlisturl+str(ClassifyId) response = requests.get(url, headers=self.headers) if response.status_code == 200: # logger.info(response.text) data2 = response.json() Data = data2.get('Data') for i in Data: # s+= 1 EdbCode = i.get('EdbCode') EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 Frequency = i.get('Frequency') # 频度,要保存到df的频度列 # 频度不是 日 或者 周的 跳过 if Frequency not in ['日度','周度','日','周']: continue # 判断名称是否需要保存 isSave = self.filter_yuanyou_data(ClassifyName,EdbName) if isSave: # 保存到df # 保存频度 指标名称 分类 指标id 到 df df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0]) # df = pd.merge(df, df2, how='outer') df = pd.concat([df, df2]) df1 = self.edbcodegetdata(df1,EdbCode,EdbName) else: logger.info(f'跳过指标 {EdbName}') # 找到列表中不在指标列中的指标id,保存成新的list new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] logger.info(new_list) # 遍历new_list,获取指标数据,保存到df1 for item in new_list: logger.info(item) # 将item 加入到 df['指标id']中 try: itemname = edbcodenamedict[item] except: itemname = item df1 = self.edbcodegetdata(df1,item,itemname) df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])]) # 按时间排序 df1.sort_values('DataTime',inplace=True,ascending=False) df1.rename(columns={'DataTime': 'date'},inplace=True) # df1.dropna(inplace=True) # 去掉大于今天日期的行 df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] logger.info(df1.head()) # logger.info(f'{df1.head()}') # 保存到xlsx文件的sheet表 with pd.ExcelWriter(os.path.join(dataset,data_set)) as file: df1.to_excel(file, sheet_name='指标数据', index=False) df.to_excel(file, sheet_name='指标列表', index=False) df_zhibiaoshuju = df1.copy() df_zhibiaoliebiao = df.copy() return df_zhibiaoshuju,df_zhibiaoliebiao def get_eta_api_pp_data(self,data_set,dataset=''): global ClassifyId today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 ''' df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... ''' # 构建新的DataFrame df df1 df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度']) df1 = pd.DataFrame(columns=['DataTime']) # 外网环境无法访问,请确认是否为内网环境 try: # 发送GET请求 获取指标分类列表 response = requests.get(self.classifylisturl, headers=self.headers) except requests.exceptions.RequestException as e: raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") # 检查响应状态码 if response.status_code == 200: # 获取成功, 处理响应内容 data = response.json() # 假设接口返回的是JSON数据 # 请求成功,处理响应内容 # logger.info(data.get('Data')) # 定义你想要保留的固定值 fixed_value = ClassifyId # 遍历列表,只保留那些'category' key的值为固定值的数据项 filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId n = 0 for item in filtered_data: n+= 1 # if n>50: # break ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 # 根据分类id,获取指标列表 url = self.classifyidlisturl+str(ClassifyId) response = requests.get(url, headers=self.headers) if response.status_code == 200: # logger.info(response.text) data2 = response.json() Data = data2.get('Data') for i in Data: # s+= 1 EdbCode = i.get('EdbCode') EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 Frequency = i.get('Frequency') # 频度,要保存到df的频度列 # 频度不是 日 或者 周的 跳过 if Frequency not in ['日度','周度','日','周']: continue # 判断名称是否需要保存 isSave = self.filter_pp_data(ClassifyName,EdbName) if isSave: # 保存到df # 保存频度 指标名称 分类 指标id 到 df df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0]) # df = pd.merge(df, df2, how='outer') df = pd.concat([df, df2]) df1 = self.edbcodegetdata(df1,EdbCode,EdbName) else: logger.info(f'跳过指标 {EdbName}') # 找到列表中不在指标列中的指标id,保存成新的list new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] logger.info(new_list) # 遍历new_list,获取指标数据,保存到df1 for item in new_list: logger.info(item) # 将item 加入到 df['指标id']中 try: itemname = edbcodenamedict[item] except: itemname = item df1 = self.edbcodegetdata(df1,item,itemname) df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])]) # 按时间排序 df1.sort_values('DataTime',inplace=True,ascending=False) df1.rename(columns={'DataTime': 'date'},inplace=True) # df1.dropna(inplace=True) # 去掉大于今天日期的行 df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] logger.info(df1.head()) # logger.info(f'{df1.head()}') # 保存到xlsx文件的sheet表 with pd.ExcelWriter(os.path.join(dataset,data_set)) as file: df1.to_excel(file, sheet_name='指标数据', index=False) df.to_excel(file, sheet_name='指标列表', index=False) df_zhibiaoshuju = df1.copy() df_zhibiaoliebiao = df.copy() return df_zhibiaoshuju,df_zhibiaoliebiao def push_data(self,data): today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 发送post请求 上传数据 logger.info('请求参数:',data) response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 logger.info('上传成功,响应为:', data) else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') def del_zhibiao(self,IndexCodeList): today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } data = { "IndexCodeList": IndexCodeList #指标编码列表 } # 发送post请求 上传数据 response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 logger.info('删除成功,响应为:', data) else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') def del_business(self,data): '''' 接口地址 https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd 请求体 body { "IndexCode": "W001067", //指标编码 "StartDate": "2020-04-20", //指标需要删除的开始日期(>=),如果开始日期和结束日期相等,那么就是删除该日期 "EndDate": "2024-05-28" //指标需要删除的结束日期(<=),如果开始日期和结束日期相等,那么就是删除该日期 } ''' today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 发送post请求 上传数据 response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 logger.info('删除成功,响应为:', data) else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') # 时间特征,年,月,一年的多少天,周几,第几周,第几季度,每月的第几天, 每季度的第几天,是否每月的第一天,是否每月的最后一天,是否每季度的第一天,是否每季度的最后一天,是否每年的第一天,是否每年的最后一天 def addtimecharacteristics(df,dataset): """ 为输入的 DataFrame 添加日期相关信息列 参数: df (pandas.DataFrame): 包含日期列 'ds' 的 DataFrame 返回: pandas.DataFrame: 添加了相关列的 DataFrame """ df['year'] = df['ds'].dt.year df['month'] = df['ds'].dt.month df['day'] = df['ds'].dt.day df['dayofweek'] = df['ds'].dt.dayofweek df['weekofyear'] = df['ds'].dt.isocalendar().week df['dayofyear'] = df['ds'].dt.dayofyear df['quarternum'] = df['ds'].dt.quarter # 将ds列转换为季度Period对象 df['quarter'] = df['ds'].dt.to_period('Q') # 获取每个季度的开始日期 df['quarter_start'] = df['quarter'].dt.to_timestamp('s') # 计算每个日期是所在季度的第几天 df['dayofquarter'] = (df['ds'] - df['quarter_start']).dt.days + 1 # 是否月初 df['is_month_start'] = df['ds'].dt.is_month_start.astype(int) # 是否月末 df['is_month_end'] = df['ds'].dt.is_month_end.astype(int) # 是否季度初 df['is_quarter_start'] = df['ds'].dt.is_quarter_start.astype(int) # 是否季度末 df['is_quarter_end'] = df['ds'].dt.is_quarter_end.astype(int) # 是否年初 df['is_year_start'] = df['ds'].dt.is_year_start.astype(int) # 是否年末 df['is_year_end'] = df['ds'].dt.is_year_end.astype(int) # 去掉 quarter_start quarter df.drop(columns=['quarter_start','quarter'],inplace=True) df.to_csv(os.path.join(dataset,'指标数据添加时间特征.csv'), index=False) return df