# 导入模块 import pandas as pd import numpy as np import datetime import string import base64 import requests import random import time import re import os import hmac import hashlib import json import math import torch torch.set_float32_matmul_precision("high") import matplotlib.pyplot as plt #设置plt显示中文 plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 from datetime import timedelta from sklearn import metrics from reportlab.pdfbase import pdfmetrics # 注册字体 from reportlab.pdfbase.ttfonts import TTFont # 字体类 from reportlab.platypus import Table, SimpleDocTemplate, Paragraph, Image # 报告内容相关类 from reportlab.lib.pagesizes import letter # 页面的标志尺寸(8.5*inch, 11*inch) from reportlab.lib.styles import getSampleStyleSheet # 文本样式 from reportlab.lib import colors # 颜色模块 from reportlab.graphics.charts.barcharts import VerticalBarChart # 图表类 from reportlab.graphics.charts.legends import Legend # 图例类 from reportlab.graphics.shapes import Drawing # 绘图工具 from reportlab.lib.units import cm # 单位:cm # 注册字体(提前准备好字体文件, 如果同一个文件需要多种字体可以注册多个) pdfmetrics.registerFont(TTFont('SimSun', 'SimSun.ttf')) #设置plt显示中文 plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 # from config_jingbo_pro import * from config_jingbo import * # from config_juxiting import * # 定义函数 def loadcsv(filename): """ 读取指定文件名的 CSV 文件。 如果文件编码为 UTF-8,则使用 UTF-8 编码读取;否则,使用 GBK 编码读取。 参数: filename (str): 要读取的 CSV 文件的文件名。 返回: pandas.DataFrame: 读取的数据。 """ # 读取csv文件 try: df = pd.read_csv(filename, encoding='utf-8') except UnicodeDecodeError: df = pd.read_csv(filename, encoding='gbk') return df def dateConvert(df, datecol='ds'): """ 将数据框 df 中的 datecol 列转换为日期时间类型。 参数: df (pandas.DataFrame): 要转换的 DataFrame。 datecol (str): 要转换的列名,默认为 'ds'。 返回: pandas.DataFrame: 转换后的 DataFrame。 """ # 将date列转换为datetime类型 try: df[datecol] = pd.to_datetime(df[datecol],format=r'%Y-%m-%d') except: df[datecol] = pd.to_datetime(df[datecol],format=r'%Y/%m/%d') return df def calculate_kdj(data, n=9): ''' 给传进来的df 添加列: 波动率,最高,最低,k,d ,j ''' # 对数据按照日期升序排序 data = data.sort_values(by='ds', ascending=True) # 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价 data['pctchange'] = data['y'].pct_change() # 收益为0的用0.01 data['pctchange'] = data['pctchange'].replace(0,0.01) # 去除空值 data.dropna(inplace=True) # 重置索引 data.reset_index(drop=True,inplace=True) # 计算最高价和最低价 data['high'] = data['y']* (1+abs(data['pctchange'])/2) data['low'] = data['y']* (1-abs(data['pctchange'])/2) # 计算n日内最低价 low_list = data['y'].rolling(window=n, min_periods=1).min() # 计算n日内最高价 high_list = data['y'].rolling(window=n, min_periods=1).max() # 计算未成熟随机值 rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100 # 初始化k值为50 k = pd.Series(50, index=data.index) # 初始化d值为50 d = pd.Series(50, index=data.index) # 计算k值和d值 for i in range(1, len(data)): k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i]) d[i] = (2/3 * d[i - 1]) + (1/3 * k[i]) # 计算j值 j = 3 * k - 2 * d # 将k值、d值和j值添加到数据中 data['K'] = k data['D'] = d data['J'] = j # 将包含 KDJ 指标的数据保存到新的 CSV 文件 data.to_csv('stock_data_with_kdj.csv', index=False) # data = data.dropna() return data # 上传报告 def get_head_auth_report(): """ 通过 POST 请求登录到指定的 URL,并从响应中获取认证令牌。 返回: str: 如果登录成功,返回认证令牌;否则返回 None。 """ # 发送 POST 请求到登录 URL,携带登录数据 login_res = requests.post(url=login_pushreport_url, json=login_data, timeout=(3, 5)) # 将响应内容转换为 JSON 格式 text = json.loads(login_res.text) # 如果响应状态为成功 if text["status"]: # 从响应数据中获取认证令牌 token = text["data"]["accessToken"] # 返回认证令牌 return token def upload_report_data(token, upload_data): """ 上传报告数据到指定的URL 参数: token (str): 认证令牌 upload_data (dict): 要上传的报告数据,包含必要的字段和信息 返回: dict: 如果上传成功,返回响应对象;否则返回None """ # 直接使用传入的 upload_data upload_data = upload_data # 设置请求头部 headers = {"Authorization": token} # 打印日志,显示正在上传报告数据 logger.info("报告上传中...") # 打印日志,显示认证头部信息 logger.info(f"token:{token}") # 打印日志,显示要上传的报告数据 logger.info(f"upload_data:{upload_data}" ) # 发送POST请求,上传报告数据 upload_res = requests.post(url=upload_url, headers=headers, json=upload_data, timeout=(3, 15)) # 将响应内容转换为 JSON 格式 upload_res = json.loads(upload_res.text) # 打印日志,显示响应内容 logger.info(upload_res) # 如果上传成功,返回响应对象 if upload_res: return upload_res # 如果上传失败,打印日志并返回None else: logger.info("报告上传失败") return None def upload_warning_data(warning_data): """ 上传预警数据到指定的URL 参数: warning_data (dict): 要上传的预警数据,包含必要的字段和信息 返回: requests.Response: 如果上传成功,返回响应对象;否则返回None """ # 获取认证头部信息 token = get_head_auth_report() # 设置请求头部 headers = {"Authorization": token} # 打印日志,显示正在上传预警数据 logger.info("预警上传中...") # 打印日志,显示上传的URL logger.info(f"upload_warning_url:{upload_warning_url}") # 打印日志,显示认证头部信息 logger.info(f"token:{token}") # 打印日志,显示要上传的预警数据 logger.info(f"warning_data:{warning_data}") # 发送POST请求,上传预警数据 upload_res = requests.post(url=upload_warning_url, headers=headers, json=warning_data, timeout=(3, 15)) # 如果上传成功,返回响应对象 if upload_res: return upload_res # 如果上传失败,打印日志并返回None else: logger.info("预警上传失败") return None def upload_warning_info(df_count): """ 上传预警信息到指定的URL 参数: df_count (int): 停更的数量 返回: None """ # 打印日志,显示正在上传预警信息 logger.info(f'上传预警信息') try: # 获取当前日期 warning_date = datetime.datetime.now().strftime('%Y-%m-%d') # 构建预警内容 content = f'{warning_date}有{df_count}个停更' # 更新预警数据中的日期和内容 warning_data['data']['WARNING_DATE'] = warning_date warning_data['data']['WARNING_CONTENT'] = content # 调用 upload_warning_data 函数上传预警数据 upload_warning_data(warning_data) # 打印日志,显示上传预警信息成功 logger.info(f'上传预警信息成功') except Exception as e: # 打印日志,显示上传预警信息失败,并记录异常信息 logger.error(f'上传预警信息失败:{e}') def create_feature_last_update_time(df): """ 计算特征停更信息用 参数: df (DataFrame): 包含特征数据的 DataFrame 返回: DataFrame: 包含特征停更信息的 DataFrame str: y 列的最后更新时间 """ df1 = df.copy() # 找到每列的最后更新时间 df1.set_index('ds', inplace=True) last_update_times = df1.apply(lambda x: x.dropna().index.max().strftime('%Y-%m-%d') if not x.dropna().empty else None) # 保存每列的最后更新时间到文件 last_update_times_df = pd.DataFrame(columns = ['feature', 'last_update_time','is_value','update_period','warning_date','stop_update_period']) # 打印每列的最后更新时间 for column, last_update_time in last_update_times.items(): values = [] # 判断是不是常数值 if df1[column].tail(20).nunique() == 1: values = values + [column, last_update_time,1] else: values = values + [column, last_update_time,0] # 计算特征数据值的时间差 try: # 计算预警日期 time_diff = (df1[column].dropna().index.to_series().diff().mode()[0]).total_seconds() / 3600 / 24 last_update_time_datetime = datetime.datetime.strptime(last_update_time, '%Y-%m-%d') last_update_date = end_time if end_time != '' else datetime.datetime.now().strftime('%Y-%m-%d') end_time_datetime = datetime.datetime.strptime(last_update_date, '%Y-%m-%d') early_warning_date = last_update_time_datetime + timedelta(days=time_diff)*2 + timedelta(days=1) stop_update_period = int(math.ceil((end_time_datetime-last_update_time_datetime).days / time_diff)) early_warning_date = early_warning_date.strftime('%Y-%m-%d') except KeyError: time_diff = 0 early_warning_date = end_time continue values = values + [time_diff,early_warning_date,stop_update_period] last_update_times_df.loc[len(last_update_times_df)] = values logger.info(f"Column {column} was last updated at {last_update_time}") y_last_update_time = last_update_times_df[last_update_times_df['feature']=='y']['warning_date'].values[0] last_update_times_df.to_csv(os.path.join(dataset,'last_update_times.csv'), index=False) logger.info('特征停更信息保存到文件:last_update_times.csv') return last_update_times_df,y_last_update_time # 统计特征频度 def featurePindu(dataset): # 读取文件 df = loadcsv(os.path.join(dataset,'未填充的特征数据.csv')) df['ds'] = pd.to_datetime(df['ds']) # 按ds正序排序,重置索引 df = df.sort_values(by='ds', ascending=True).reset_index(drop=True) # 统计特征频度 # 每列随机抽取10个值,计算出5个时间间隔,统计每个时间间隔的频度 columns = df.columns.to_list() columns.remove('ds') count_dict = {} for column in columns: # 获取每列时间间隔 values = df[[column,'ds']] values.dropna(inplace=True,axis=0) values=values.reset_index(drop=True) # 抽取20%个值 value = values.sample(frac=0.2) index = value.index next_index = index + 1 count = [] for i,j in zip(index, next_index): #通过索引计算日期差 try: count.append((values.loc[j,'ds'] - values.loc[i,'ds']).days) except: pass # 把31 换成 30 count = [30 if i == 31 else i for i in count] # 保留count中出现次数最多的数 try: count = max(set(count), key=count.count) except ValueError : logger.info(f'{column}列数据为空') continue # 存储到字典中 count_dict[column] = count df = pd.DataFrame(count_dict,index=['count']).T pindu_dfs = pd.DataFrame() # 根据count分组 # 输出特征频度统计 pindudict = {'1':'日度','3':'日度','7':'周度','30':'月度','90':'季度','180':'半年度','365':'年度'} for i in df.groupby('count'): # 获取 i[1] 的索引值 index = i[1].index pindu_df = pd.DataFrame() try: pindu_df[pindudict[str(i[0])]+f'({len(i[1])})'] = index except KeyError : pindu_df[str(i[0])+f'天({len(i[1])})'] = index # 合并到pindu_dfs pindu_dfs = pd.concat([pindu_dfs,pindu_df],axis=1) # nan替换为 ' ' pindu_dfs = pindu_dfs.fillna('') pindu_dfs.to_csv(os.path.join(dataset,'特征频度统计.csv'),index=False) logger.info(pindu_dfs) featureInfo = f'特征信息:总共有{len(columns)-2}个' for i in pindu_dfs.columns: featureInfo += f',{i}' featureInfo += ', 详看 附1、特征列表' featureInfo += ''' 数据特征工程: 1. 数据日期排序,新日期在最后 2. 删除空列,特征数据列没有值,就删除 3. 删除近两月不再更新值的指标 4. 非日度数据填充为日度数据,填充规则: -- 向后填充,举例:假设周五出现一个周度指标数据,那么在这之前的数据用上周五的数据 -- 向前填充,举例:采集数据开始日期为2018年1月1日,那么周度数据可能是2018年1月3日,那么3日的数据向前填充,使1日2日都有数值 数据特征相关性分析: ''' logger.info(featureInfo) with open(os.path.join(dataset,'特征频度统计.txt'), 'w', encoding='utf-8') as f: f.write(featureInfo) logger.info('*'*200) def featureAnalysis(df,dataset,y): # 特征筛选 import matplotlib.pyplot as plt # 选择特征和标签列 X = df.drop(['ds', 'y'], axis=1) # 特征集,排除时间戳和标签列 yy = df['y'] # 标签集 # 标签集自相关函数分析 from statsmodels.graphics.tsaplots import plot_acf plot_acf(yy, lags=30) plt.savefig(os.path.join(dataset,'指标数据自相关图.png')) plt.close() # 标签集偏自相关函数分析 from statsmodels.graphics.tsaplots import plot_pacf plot_pacf(yy, lags=30) plt.savefig(os.path.join(dataset,'指标数据偏自相关图.png')) plt.close() # 画 特征与价格散点图 # 删除所有*散点图.png for file in os.listdir(dataset): if file.endswith("散点图.png"): os.remove(os.path.join(dataset, file)) plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.figure(figsize=(10, 10)) # # 遍历X每一列,和yy画散点图 , # for i, col in enumerate(X.columns): # plt.subplot(2, 2, i%4+1) # plt.scatter(X[col], yy) # plt.xlabel(col) # plt.ylabel(y) # plt.title(col) # if i % 4 == 3 or i == len(X.columns)-1: # plt.tight_layout() # plt.savefig(os.path.join(dataset,f'{i}指标数据特征与价格散点图.png')) # plt.close() def corr_feature(df): # 重新命名列名,列名排序,y在第一个 df.reindex(['y'] + sorted(df.columns.difference(['y']))) df_test = df.copy() # 取最后的220行 df_test = df_test.tail(220) # 去掉日期列 df_test = df_test.drop(columns=['ds']) # 不参与标准化 df_test_noscaler = df_test.copy() # 滞后处理备份 df_noscaler = df_test.copy() # 画出相关性热力图 df_test.to_csv(os.path.join(dataset,'同步相关性.csv')) corr = df_test.corr() # 保存相关系数 corr.to_csv(os.path.join(dataset,'同步相关性系数.csv')) # plt.figure(figsize=(10, 10)) # sns.heatmap(corr, annot=True, cmap='coolwarm') # plt.savefig('dataset/同步相关性热力图.png') # plt.show() # 读取滞后周期文件,更改特征 characteristic_period = pd.read_csv('dataset/特征滞后周期.csv',encoding='utf-8') # 去掉周期为0的行 characteristic_period = characteristic_period.drop(characteristic_period[characteristic_period['滞后周期'] == 0].index) for col in df.columns: # 跳过y列 if col in ['y']: continue # 特征滞后n个周期,计算与y的相关性 if col in characteristic_period['特征'].values: # 获取特征对应的周期 period = characteristic_period[characteristic_period['特征'] == col]['滞后周期'].values[0] # 滞后处理 df[col] = df[col].shift(period) df.to_csv(os.path.join(dataset,'滞后处理后的数据集.csv')) # corr_feture_noscaler = {} # 保存相关性最大的周期 # 遍历df_test的每一列,计算相关性 # for col in df_noscaler.columns: # # 跳过y列 # if col in ['y']: # continue # logger.info('特征:', col) # # 特征滞后n个周期,计算与y的相关性 # corr_dict = {} # try: # for i in range(0, 200): # if i == 0: # df_noscaler[col+'_'+str(i)] = df_noscaler[col] # else: # df_noscaler[col+'_'+str(i)] = df_noscaler[col].shift(i) # corr_dict[col+'_'+str(i)] = abs(df_noscaler[col+'_'+str(i)].corr(df_noscaler['y'])) # except : # logger.info('特征:', col, '滑动错误,请查看') # continue # 输出相关性最大的特征 # logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)]) # corr_feture_noscaler[col] = max(corr_dict, key=corr_dict.get).split('_')[-1] # 画出最相关性最大的特征和y的折线图 # plt.figure(figsize=(10, 5)) # plt.plot(df_noscaler[max(corr_dict, key=corr_dict.get)], label=max(corr_dict, key=corr_dict.get)) # # 设置双坐标轴 # ax1 = plt.gca() # ax2 = ax1.twinx() # ax2.plot(df_noscaler['y'], color='r', label='y') # plt.legend() # try: # plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get)+'.png') # except : # # :替换成_ # plt.savefig('dataset/特征与y的折线图_'+max(corr_dict, key=corr_dict.get).replace(':','_').replace('/','_').replace('(','_').replace(')','_')+'.png') # plt.close() # 结果保存到txt文件 # logger.info('不参与标准化的特征滞后相关性写入txt文件') # with open('dataset/不参与标准化的特征滞后相关性.txt', 'w') as f: # for key, value in corr_feture_noscaler.items(): # f.write('%s:%s\n' % (key, value)) # 遍历corr_feture_noscaler,更改df # colnames_noscaler = [] # for col in corr_feture_noscaler: # colname = col+'_'+corr_feture_noscaler[col] # if int(corr_feture_noscaler[col]) == 0: # continue # df_test_noscaler[colname] = df_test_noscaler[col].shift(int(corr_feture_noscaler[col])) # df_test_noscaler = df_test_noscaler.drop(columns=[col]) # colnames_noscaler.append(colname) # 去除有空值的行 # df_test_noscaler = df_test_noscaler.dropna() # df_test_noscaler.reindex(['y'] + sorted(df_test_noscaler.columns.difference(['y']))) # df_test_noscaler.to_csv('dataset/不参与标准化的特征滞后相关性.csv', index=False) # 画出相关性热力图 # corr = df_test_noscaler.corr() # 保存相关系数 # corr.to_csv(os.path.join(dataset,'不参与标准化的特征滞后相关性系数.csv')) # plt.figure(figsize=(10, 10)) # sns.heatmap(corr, annot=True, cmap='coolwarm') # plt.savefig('dataset/不参与标准化的特征滞后相关性热力图.png') # plt.close() # # 标准化每列 # from sklearn.preprocessing import StandardScaler # scaler = StandardScaler() # df_test = pd.DataFrame(scaler.fit_transform(df_test), columns=df_test.columns) # corr_feture = {} # 保存相关性最大的周期 # # 遍历df_test的每一列,计算相关性 # for col in df_test.columns: # # 跳过y列 # if col == 'y': # continue # logger.info('特征:', col) # # 特征滞后n个周期,计算与y的相关性 # corr_dict = {} # try: # for i in range(0, 200): # if i == 0: # df_test[col+'_'+str(i)] = df_test[col] # else: # df_test[col+'_'+str(i)] = df_test[col].shift(i) # corr_dict[col+'_'+str(i)] = abs(df_test[col+'_'+str(i)].corr(df_test['y'])) # except : # logger.info('特征:', col, '滑动错误,请查看') # continue # # 输出相关性最大的特征 # logger.info(max(corr_dict, key=corr_dict.get), corr_dict[max(corr_dict, key=corr_dict.get)]) # corr_feture[col] = max(corr_dict, key=corr_dict.get).split('_')[-1] # # 结果保存到txt文件 # with open('dataset/标准化的特征滞后相关性.txt', 'w') as f: # for key, value in corr_feture.items(): # f.write('%s:%s\n' % (key, value)) # # 遍历corr_feture,更改df # colnames = [] # for col in corr_feture: # colname = col+'_'+corr_feture[col] # if int(corr_feture[col]) == 0: # continue # df[colname] = df[col].shift(int(corr_feture[col])) # df = df.drop(columns=[col]) # colnames.append(colname) # # 去除有空值的行 # df = df.dropna() # df.reindex(['y'] + sorted(df.columns.difference(['y']))) # df.to_csv('dataset/标准化后的特征滞后相关性.csv', index=False) # # 画出相关性热力图 # ds = df['ds'] # df = df.drop(columns=['ds']) # corr = df.corr() # # 保存相关系数 # corr.to_csv(os.path.join(dataset,'标准化后的特征滞后相关性系数.csv')) # plt.figure(figsize=(10, 10)) # sns.heatmap(corr, annot=True, cmap='coolwarm') # plt.savefig('dataset/标准化后的特征滞后相关性热力图.png') # plt.show() # df['ds'] = ds # 去除nan值 df = df.dropna() return df def calculate_kdj(data, n=9): ''' 给传进来的df 添加列: 波动率,最高,最低,k ,d ,j ''' data = data.sort_values(by='ds', ascending=True) # 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价 data['pctchange'] = data['y'].pct_change() # 收益为0的用0.01 data['pctchange'] = data['pctchange'].replace(0,0.01) data.dropna(inplace=True) # 重置索引 data.reset_index(drop=True,inplace=True) data['high'] = data['y']* (1+abs(data['pctchange'])/2) data['low'] = data['y']* (1-abs(data['pctchange'])/2) low_list = data['y'].rolling(window=n, min_periods=1).min() high_list = data['y'].rolling(window=n, min_periods=1).max() rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100 k = pd.Series(50, index=data.index) d = pd.Series(50, index=data.index) for i in range(1, len(data)): k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i]) d[i] = (2/3 * d[i - 1]) + (1/3 * k[i]) j = 3 * k - 2 * d data['K'] = k data['D'] = d data['J'] = j # 将包含 KDJ 指标的数据保存到新的 CSV 文件 data.to_csv('dataset\stock_data_with_kdj.csv', index=False) # data = data.dropna() return data def check_column(df,col_name,two_months_ago): ''' 检查列是否需要删除。 该函数会检查列是否为空值列、180天没有更新的列或常数值列。 参数: col_name (str): 列名。 df (DataFrame): 包含列的 DataFrame。 返回: bool: 如果列需要删除,返回 True;否则,返回 False。 ''' if 'ds' in col_name or 'y' in col_name: return False df_check_column = df[['ds',col_name,'y']] df_check_column = df_check_column.dropna() if len(df_check_column) == 0: print(f'空值列:{col_name}') return True # 判断是不是常数列 if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2: print(f'180没有更新:{col_name}') return True # 判断相关系数大于0.6 if is_del_corr > 0: if abs(df_check_column[col_name].corr(df_check_column['y'])) < is_del_corr: print(f'相关系数小于0.6:{col_name}') return True corresponding_date = df_check_column.iloc[-1]['ds'] return corresponding_date < two_months_ago def datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False): ''' 原油特征数据处理函数, 接收的是两个df,一个是指标数据,一个是指标列表 输出的是一个df,包含ds,y,指标列 ''' df = df_zhibiaoshuju.copy() if end_time == '': end_time = datetime.datetime.now().strftime('%Y-%m-%d') # 重命名时间列,预测列 df.rename(columns={datecol:'ds'},inplace=True) df.rename(columns={y:'y'},inplace=True) # 按时间顺序排列 df.sort_values(by='ds',inplace=True) df['ds'] = pd.to_datetime(df['ds']) # 获取start_year年到end_time的数据 df = df[df['ds'].dt.year >= start_year] df = df[df['ds'] <= end_time] # last_update_times_df,y_last_update_time = create_feature_last_update_time(df) # logger.info(f'删除预警的特征前数据量:{df.shape}') # columns_to_drop = last_update_times_df[last_update_times_df['warning_date'] < y_last_update_time ]['feature'].values.tolist() # df = df.drop(columns = columns_to_drop) # logger.info(f'删除预警的特征后数据量:{df.shape}') # if is_update_warning_data: # upload_warning_info(last_update_times_df,y_last_update_time) # 去掉近最后数据对应的日期在六月以前的列,删除近2月的数据是常熟的列 current_date = datetime.datetime.now() two_months_ago = current_date - timedelta(days=180) logger.info(f'删除两月不更新特征前数据量:{df.shape}') columns_to_drop = [] for clo in df.columns: if check_column(df,clo,two_months_ago): columns_to_drop.append(clo) df = df.drop(columns=columns_to_drop) logger.info(f'删除两月不更新特征后数据量:{df.shape}') if freq == 'W': # 按周取样 df = df.resample('W', on='ds').mean().reset_index() elif freq == 'M': # 按月取样 df = df.resample('M', on='ds').mean().reset_index() # 删除预测列空值的行 df = df.dropna(subset=['y']) logger.info(f'删除预测列为空值的行后数据量:{df.shape}') df = df.dropna(axis=1, how='all') logger.info(f'删除全为空值的列后数据量:{df.shape}') df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False) # 去掉指标列表中的columns_to_drop的行 df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())] df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False) # 数据频度分析 featurePindu(dataset=dataset) # 向上填充 df = df.ffill() # 向下填充 df = df.bfill() # 删除周六日的数据 if delweekenday: df = df[df['ds'].dt.weekday < 5] # kdj指标 if add_kdj: df = calculate_kdj(df) # 衍生时间特征 if is_timefurture: df = addtimecharacteristics(df=df,dataset=dataset) # 特征分析 featureAnalysis(df,dataset=dataset,y=y) return df def datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol='date',end_time='',y='y',dataset='dataset',delweekenday=False,add_kdj=False,is_timefurture=False): ''' 聚烯烃特征数据处理函数, 接收的是两个df,一个是指标数据,一个是指标列表 输出的是一个df,包含ds,y,指标列 ''' df = df_zhibiaoshuju.copy() if end_time == '': end_time = datetime.datetime.now().strftime('%Y-%m-%d') # date转为pddate df.rename(columns={datecol:'ds'},inplace=True) # 指定列统一减少数值 df[offsite_col] = df[offsite_col]-offsite # 预测列为avg_cols的均值 df[y] = df[avg_cols].mean(axis=1) # 去掉多余的列avg_cols df = df.drop(columns=avg_cols) # 重命名预测列 df.rename(columns={y:'y'},inplace=True) # 按时间顺序排列 df.sort_values(by='ds',inplace=True) df['ds'] = pd.to_datetime(df['ds']) # 获取2018年到当前日期的数据 df = df[df['ds'].dt.year >= 2018] # 获取小于等于当前日期的数据 df = df[df['ds'] <= end_time] logger.info(f'删除两月不更新特征前数据量:{df.shape}') # 去掉近最后数据对应的日期在两月以前的列,删除近2月的数据是常数的列 current_date = datetime.datetime.now() two_months_ago = current_date - timedelta(days=40) # 检查两月不更新的特征 def check_column(col_name): if 'ds' in col_name or 'y' in col_name: return False df_check_column = df[['ds',col_name]] df_check_column = df_check_column.dropna() if len(df_check_column) == 0: return True if df_check_column[(df_check_column['ds']>= two_months_ago)].groupby(col_name).ngroups < 2: return True corresponding_date = df_check_column.iloc[-1]['ds'] return corresponding_date < two_months_ago columns_to_drop = df.columns[df.columns.map(check_column)].tolist() df = df.drop(columns = columns_to_drop) logger.info(f'删除两月不更新特征后数据量:{df.shape}') # 删除预测列空值的行 df = df.dropna(subset=['y']) logger.info(f'删除预测列为空值的行后数据量:{df.shape}') df = df.dropna(axis=1, how='all') logger.info(f'删除全为空值的列后数据量:{df.shape}') df.to_csv(os.path.join(dataset,'未填充的特征数据.csv'),index=False) # 去掉指标列表中的columns_to_drop的行 df_zhibiaoliebiao = df_zhibiaoliebiao[df_zhibiaoliebiao['指标名称'].isin(df.columns.tolist())] df_zhibiaoliebiao.to_csv(os.path.join(dataset,'特征处理后的指标名称及分类.csv'),index=False) # 频度分析 featurePindu(dataset=dataset) # 向上填充 df = df.ffill() # 向下填充 df = df.bfill() # 删除周六日的数据 if delweekenday: df = df[df['ds'].dt.weekday < 5] if add_kdj: df = calculate_kdj(df) if is_timefurture: df = addtimecharacteristics(df=df,dataset=dataset) featureAnalysis(df,dataset=dataset,y=y) return df def getdata(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''): logger.info('getdata接收:'+filename+' '+datecol+' '+end_time) # 判断后缀名 csv或excel if filename.endswith('.csv'): df = loadcsv(filename) else: # 读取excel 指标数据 df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据') df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表') # 日期字符串转为datatime df = datachuli(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) return df,df_zhibiaoliebiao def getdata_juxiting(filename, datecol='date',y='y',dataset='',add_kdj=False,is_timefurture=False,end_time=''): logger.info('getdata接收:'+filename+' '+datecol+' '+end_time) # 判断后缀名 csv或excel if filename.endswith('.csv'): df = loadcsv(filename) else: # 读取excel 指标数据 df_zhibiaoshuju = pd.read_excel(filename,sheet_name='指标数据') df_zhibiaoliebiao = pd.read_excel(filename,sheet_name='指标列表') # 日期字符串转为datatime df = datachuli_juxiting(df_zhibiaoshuju,df_zhibiaoliebiao,datecol,y = y,dataset=dataset,add_kdj=add_kdj,is_timefurture=is_timefurture,end_time=end_time) return df def sanitize_filename(filename): # 使用正则表达式替换不合规的字符 # 这里我们替换为下划线'_',但你可以根据需要选择其他字符 sanitized = re.sub(r'[\\/*?:"<>|\s]', '_', filename) # 移除开头的点(在某些系统中,以点开头的文件可能是隐藏的) sanitized = re.sub(r'^\.', '', sanitized) # 如果需要,可以添加更多替换规则 return sanitized class BinanceAPI: ''' 获取 Binance API 请求头签名 ''' def __init__(self, APPID, SECRET): self.APPID = APPID self.SECRET = SECRET self.get_signature() # 生成随机字符串作为 nonce def generate_nonce(self, length=32): self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) return self.nonce # 获取当前时间戳(秒) def get_timestamp(self): return int(time.time()) # 构建待签名字符串 def build_sign_str(self): return f'appid={self.APPID}&nonce={self.nonce}×tamp={self.timestamp}' # 使用 HMAC SHA-256 计算签名 def calculate_signature(self, secret, message): return base64.urlsafe_b64encode(hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8') def get_signature(self): # 调用上述方法生成签名 self.nonce = self.generate_nonce() self.timestamp = self.get_timestamp() self.sign_str = self.build_sign_str() self.signature = self.calculate_signature(self.SECRET, self.sign_str) # return self.signature class Graphs: # 绘制标题 @staticmethod def draw_title(title: str): # 获取所有样式表 style = getSampleStyleSheet() # 拿到标题样式 ct = style['Heading1'] # 单独设置样式相关属性 ct.fontName = 'SimSun' # 字体名 ct.fontSize = 18 # 字体大小 ct.leading = 50 # 行间距 ct.textColor = colors.green # 字体颜色 ct.alignment = 1 # 居中 ct.bold = True # 创建标题对应的段落,并且返回 return Paragraph(title, ct) # 绘制小标题 @staticmethod def draw_little_title(title: str): # 获取所有样式表 style = getSampleStyleSheet() # 拿到标题样式 ct = style['Normal'] # 单独设置样式相关属性 ct.fontName = 'SimSun' # 字体名 ct.fontSize = 15 # 字体大小 ct.leading = 30 # 行间距 ct.textColor = colors.red # 字体颜色 # 创建标题对应的段落,并且返回 return Paragraph(title, ct) # 绘制普通段落内容 @staticmethod def draw_text(text: str): # 获取所有样式表 style = getSampleStyleSheet() # 获取普通样式 ct = style['Normal'] ct.fontName = 'SimSun' ct.fontSize = 12 ct.wordWrap = 'CJK' # 设置自动换行 ct.alignment = 0 # 左对齐 ct.firstLineIndent = 32 # 第一行开头空格 ct.leading = 25 return Paragraph(text, ct) # 绘制表格 @staticmethod def draw_table(*args): # 列宽度 col_width = args[0] style = [ ('FONTNAME', (0, 0), (-1, -1), 'SimSun'), # 字体 ('FONTSIZE', (0, 0), (-1, 0), 12), # 第一行的字体大小 ('FONTSIZE', (0, 1), (-1, -1), 10), # 第二行到最后一行的字体大小 ('BACKGROUND', (0, 0), (-1, 0), '#d5dae6'), # 设置第一行背景颜色 ('ALIGN', (0, 0), (-1, -1), 'CENTER'), # 第一行水平居中 ('ALIGN', (0, 1), (-1, -1), 'LEFT'), # 第二行到最后一行左右左对齐 ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), # 所有表格上下居中对齐 ('TEXTCOLOR', (0, 0), (-1, -1), colors.darkslategray), # 设置表格内文字颜色 ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), # 设置表格框线为grey色,线宽为0.5 # ('SPAN', (0, 1), (0, 2)), # 合并第一列二三行 # ('SPAN', (0, 3), (0, 4)), # 合并第一列三四行 # ('SPAN', (0, 5), (0, 6)), # 合并第一列五六行 # ('SPAN', (0, 7), (0, 8)), # 合并第一列五六行 ] table = Table(args[1:], colWidths=col_width, style=style) return table # 创建图表 @staticmethod def draw_bar(bar_data: list, ax: list, items: list): drawing = Drawing(500, 250) bc = VerticalBarChart() bc.x = 45 # 整个图表的x坐标 bc.y = 45 # 整个图表的y坐标 bc.height = 200 # 图表的高度 bc.width = 350 # 图表的宽度 bc.data = bar_data bc.strokeColor = colors.black # 顶部和右边轴线的颜色 bc.valueAxis.valueMin = 5000 # 设置y坐标的最小值 bc.valueAxis.valueMax = 26000 # 设置y坐标的最大值 bc.valueAxis.valueStep = 2000 # 设置y坐标的步长 bc.categoryAxis.labels.dx = 2 bc.categoryAxis.labels.dy = -8 bc.categoryAxis.labels.angle = 20 bc.categoryAxis.categoryNames = ax # 图示 leg = Legend() leg.fontName = 'SimSun' leg.alignment = 'right' leg.boxAnchor = 'ne' leg.x = 475 # 图例的x坐标 leg.y = 240 leg.dxTextSpace = 10 leg.columnMaximum = 3 leg.colorNamePairs = items drawing.add(leg) drawing.add(bc) return drawing # 绘制图片 @staticmethod def draw_img(path): img = Image(path) # 读取指定路径下的图片 img.drawWidth = 20*cm # 设置图片的宽度 img.drawHeight = 10*cm # 设置图片的高度 return img # 定义样式函数 def style_row(row): if '周' in row['频度']: return ['background-color: yellow'] * len(row) else: return ['background-color: gray'] * len(row) class EtaReader(): def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl): ''' 初始化 EtaReader 类的实例。 参数: signature (str): 用于 API 请求的签名。 classifylisturl (str): 分类列表的 URL。 classifyidlisturl (str): 分类 ID 列表的 URL。 edbcodedataurl (str): EDB 代码数据的 URL。 edbdatapushurl (str): EDB 数据推送的 URL。 edbcodelist (str): EDB 代码列表的 URL。 edbdeleteurl (str): EDB 数据删除的 URL。 edbbusinessurl (str): EDB 业务数据的 URL。 返回: None ''' self.signature = signature self.classifylisturl = classifylisturl self.classifyidlisturl = classifyidlisturl self.edbcodedataurl = edbcodedataurl self.edbdatapushurl = edbdatapushurl self.edbcodelist = edbcodelist self.edbdeleteurl = edbdeleteurl self.edbbusinessurl = edbbusinessurl def filter_yuanyou_data(self,ClassifyName,data): ''' 指标名称保留规则 ''' # 包含 关键词 去除, 返回flase if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', '四周均值','名占比','残差','DMA', '连7-连9','4周平均','4周均值','滚动相关性','日本']): return False # 检查需要的特征 # 去掉 分析 分类下的数据 if ClassifyName == '分析': return False # 保留 库存中特殊关键词 if ClassifyName == '库存': if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]): return True else: pass else: pass # 去掉 持仓中不是基金的数据 if ClassifyName == '持仓': if '基金' not in data: return False else: pass else: pass # 去掉 航班中不是中国、美国 的数据 if ClassifyName == '需求': if '航班' in data : if '中国' in data or '美国' in data : return True else: return False else: pass else: pass # 分类为 期货市场,同质性数据取第一个 if ClassifyName == '期货市场': # 去掉c1-9 以后的 if 'c1-c' in data: try: c = int(data.split('c1-c')[1]) except: return False if c > 9 : return False else: pass else: pass # 判断 同质性数据, 字符串开头 strstartdict = {'ICE Brent c':"ICE Brent c14", 'NYMWX WTI c':"NYMWX WTI c5", 'INE SC c':"INE SC c1", 'EFS c':"EFS c", 'Dubai Swap c':"Dubai Swap c1", 'Oman Swap c':"Oman Swap c1", 'DME Oman c':"DME Oman c1", 'Murban Futures c':"Murban Futures c1", 'Dubai连合约价格':'Dubai连1合约价格', '美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格', 'Brent连合约价格':'Brent连1合约价格', 'WTI连合约价格':'WTI连1合约价格', '布伦特连合约价格':'Brent连1合约价格', 'Brent 连合约价格':'Brent连1合约价格', 'Dubai连合约价格':'Dubai连1合约价格', 'Brent连':'Brent连1合约价格', 'brent连':'Brent连1合约价格', } # 判断名称字符串开头是否在 strstartdict.keys中 match = re.match(r'([a-zA-Z\s]+)(\d+)', data) if match: part1 = match.group(1) part2 = match.group(2) if part1 in [i for i in strstartdict.keys()]: if data == strstartdict[part1]: return True else: return False # data = 'Brent 连7合约价格' # 判断名称字符串去掉数字后是否在 strstartdict.keys中 match = re.findall(r'\D+', data) if match : if len(match) == 2: part1 = match[0] part2 = match[1] if part1+part2 in [i for i in strstartdict.keys()]: if data == strstartdict[part1+part2]: return True else: return False else: pass elif len(match) == 1: match = re.findall(r'\D+', data) part1 = match[0] if part1 in [i for i in strstartdict.keys()]: if data == strstartdict[part1]: return True else: return False else: pass else: pass # 去掉kpler数据源 if 'Kpler' in ClassifyName or 'kpler' in ClassifyName: return False return True def filter_pp_data(self,ClassifyName,data): ''' 指标名称保留规则 ''' # 包含 关键词 去除, 返回flase # if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', # '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', # '四周均值','名占比','残差','DMA', # '连7-连9','4周平均','4周均值','滚动相关性','日本']): # return False # 包含 关键词 保留, 返回True if any(keyword in data for keyword in ['拉丝']): return True # 检查需要的特征 # 去掉 期货市场 分类下的数据 if ClassifyName == '期货市场': return False else: pass # 保留 库存 下所有指标 if ClassifyName == '库存': return True else: pass # 保留 进出口 下所有指标 if ClassifyName == '进出口': return True else: pass # 保留 价差 下所有指标 if ClassifyName == '价差': return True else: pass # 保留 供应 下所有指标 if ClassifyName == '供应': return True else: pass # 保留 需求 下所有指标 if ClassifyName == '需求': return True else: pass return True # 通过edbcode 获取指标数据 def edbcodegetdata(self,df,EdbCode,EdbName): # 根据指标id,获取指标数据 url = self.edbcodedataurl+str(EdbCode) # 发送GET请求 response = requests.get(url, headers=self.headers) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 all_data_items = data.get('Data') # 列表转换为DataFrame df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime']) # df3 = pd.read_json(all_data_items, orient='records') # 去掉UpdateTime 列 df3 = df3.drop(columns=['UpdateTime']) # df3.set_index('DataTime') df3.rename(columns={'Value': EdbName}, inplace=True) # 将数据存储df1 df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y')) # 按时间排序 df = df.sort_values(by='DataTime', ascending=True) return df else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') def get_eta_api_yuanyou_data(self,data_set,dataset=''): ''' 从ETA API获取原油数据 参数: data_set (str): 数据集名称 dataset (str): 数据集ID,默认为空 返回: None ''' today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 ''' df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... ''' # 构建新的DataFrame df df1 df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度','指标来源','来源id','最后更新时间','更新周期','预警日期','停更周期']) df1 = pd.DataFrame(columns=['DataTime']) # 外网环境无法访问,请确认是否为内网环境 try: # 发送GET请求 获取指标分类列表 response = requests.get(self.classifylisturl, headers=self.headers) except requests.exceptions.RequestException as e: raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") # 检查响应状态码 if response.status_code == 200: # 获取成功, 处理响应内容 data = response.json() # 假设接口返回的是JSON数据 # 请求成功,处理响应内容 # logger.info(data.get('Data')) # 定义你想要保留的固定值 fixed_value = 1214 # 遍历列表,只保留那些'category' key的值为固定值的数据项 filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId n = 0 for item in filtered_data: n+= 1 # if n>50: # break ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 # 根据分类id,获取指标列表 url = self.classifyidlisturl+str(ClassifyId) response = requests.get(url, headers=self.headers) if response.status_code == 200: # logger.info(response.text) data2 = response.json() Data = data2.get('Data') for i in Data: # s+= 1 EdbCode = i.get('EdbCode') EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 Frequency = i.get('Frequency') # 频度,要保存到df的频度列 SourceName = i.get('SourceName') # 来源名称,要保存到df的频度列 Source = i.get('Source') # 来源ID,要保存到df的频度列 # 频度不是 日 或者 周的 跳过 if Frequency not in ['日度','周度','日','周']: continue # 只保留手工数据中,名称带有 海运出口 海运进口 if Source == 9 and not ('海运出口' in EdbName or '海运进口' in EdbName): continue # 不要wind数据 if Source == 2: continue # 判断名称是否需要保存 isSave = self.filter_yuanyou_data(ClassifyName,EdbName) if isSave: # 保存到df df1 = self.edbcodegetdata(df1,EdbCode,EdbName) # 取df1所有行最后一列 edbname_df = df1[['DataTime',f'{EdbName}']] edbname_df = edbname_df.dropna() if len(edbname_df) == 0: logger.info(f'指标名称:{EdbName} 没有数据') continue try: time_sequence = edbname_df['DataTime'].values.tolist()[-10:] except IndexError: time_sequence = edbname_df['DataTime'].values.tolist() # 使用Counter来统计每个星期几的出现次数 from collections import Counter weekday_counter = Counter(datetime.datetime.strptime(time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence) # 打印出现次数最多的星期几 try: most_common_weekday = weekday_counter.most_common(1)[0][0] # 计算两周后的日期 warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d") stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7 except IndexError: most_common_weekday = '其他' stop_update_period = 0 if '日' in Frequency: most_common_weekday = '每天' warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d") stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days # 保存频度 指标名称 分类 指标id 到 df df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency,'指标来源':SourceName,'来源id':Source,'最后更新时间':edbname_df['DataTime'].values[-1],'更新周期':most_common_weekday,'预警日期':warning_date,'停更周期':stop_update_period},index=[0]) # df = pd.merge(df, df2, how='outer') df = pd.concat([df, df2]) else: logger.info(f'跳过指标 {EdbName}') # 找到列表中不在指标列中的指标id,保存成新的list new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] logger.info(new_list) # 遍历new_list,获取指标数据,保存到df1 for item in new_list: logger.info(item) # 将item 加入到 df['指标id']中 try: itemname = edbcodenamedict[item] except: itemname = item df1 = self.edbcodegetdata(df1,item,itemname) df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他','指标来源':'其他','来源id':'其他'},index=[0])]) # 按时间排序 df1.sort_values('DataTime',inplace=True,ascending=False) df1.rename(columns={'DataTime': 'date'},inplace=True) # df1.dropna(inplace=True) # 去掉大于今天日期的行 df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] logger.info(df1.head()) # logger.info(f'{df1.head()}') # 保存到xlsx文件的sheet表 with pd.ExcelWriter(os.path.join(dataset,data_set)) as file: df1.to_excel(file, sheet_name='指标数据', index=False) df.to_excel(file, sheet_name='指标列表', index=False) df_zhibiaoshuju = df1.copy() df_zhibiaoliebiao = df.copy() return df_zhibiaoshuju,df_zhibiaoliebiao def get_eta_api_pp_data(self,data_set,dataset=''): global ClassifyId today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 ''' df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... ''' # 构建新的DataFrame df df1 df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度']) df1 = pd.DataFrame(columns=['DataTime']) # 外网环境无法访问,请确认是否为内网环境 try: # 发送GET请求 获取指标分类列表 response = requests.get(self.classifylisturl, headers=self.headers) except requests.exceptions.RequestException as e: raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") # 检查响应状态码 if response.status_code == 200: # 获取成功, 处理响应内容 data = response.json() # 假设接口返回的是JSON数据 # 请求成功,处理响应内容 # logger.info(data.get('Data')) # 定义你想要保留的固定值 fixed_value = ClassifyId # 遍历列表,只保留那些'category' key的值为固定值的数据项 filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId n = 0 for item in filtered_data: n+= 1 # if n>50: # break ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 # 根据分类id,获取指标列表 url = self.classifyidlisturl+str(ClassifyId) response = requests.get(url, headers=self.headers) if response.status_code == 200: # logger.info(response.text) data2 = response.json() Data = data2.get('Data') for i in Data: # s+= 1 EdbCode = i.get('EdbCode') EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 Frequency = i.get('Frequency') # 频度,要保存到df的频度列 # 频度不是 日 或者 周的 跳过 if Frequency not in ['日度','周度','日','周']: continue # 判断名称是否需要保存 isSave = self.filter_pp_data(ClassifyName,EdbName) if isSave: # 保存到df # 保存频度 指标名称 分类 指标id 到 df df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0]) # df = pd.merge(df, df2, how='outer') df = pd.concat([df, df2]) df1 = self.edbcodegetdata(df1,EdbCode,EdbName) else: logger.info(f'跳过指标 {EdbName}') # 找到列表中不在指标列中的指标id,保存成新的list new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] logger.info(new_list) # 遍历new_list,获取指标数据,保存到df1 for item in new_list: logger.info(item) # 将item 加入到 df['指标id']中 try: itemname = edbcodenamedict[item] except: itemname = item df1 = self.edbcodegetdata(df1,item,itemname) df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])]) # 按时间排序 df1.sort_values('DataTime',inplace=True,ascending=False) df1.rename(columns={'DataTime': 'date'},inplace=True) # df1.dropna(inplace=True) # 去掉大于今天日期的行 df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] logger.info(df1.head()) # logger.info(f'{df1.head()}') # 保存到xlsx文件的sheet表 with pd.ExcelWriter(os.path.join(dataset,data_set)) as file: df1.to_excel(file, sheet_name='指标数据', index=False) df.to_excel(file, sheet_name='指标列表', index=False) df_zhibiaoshuju = df1.copy() df_zhibiaoliebiao = df.copy() return df_zhibiaoshuju,df_zhibiaoliebiao def push_data(self,data): today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 发送post请求 上传数据 logger.info('请求参数:',data) response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 logger.info('上传成功,响应为:', data) else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') def del_zhibiao(self,IndexCodeList): today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } data = { "IndexCodeList": IndexCodeList #指标编码列表 } # 发送post请求 上传数据 response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 logger.info('删除成功,响应为:', data) else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') def del_business(self,data): '''' 接口地址 https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd 请求体 body { "IndexCode": "W001067", //指标编码 "StartDate": "2020-04-20", //指标需要删除的开始日期(>=),如果开始日期和结束日期相等,那么就是删除该日期 "EndDate": "2024-05-28" //指标需要删除的结束日期(<=),如果开始日期和结束日期相等,那么就是删除该日期 } ''' today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数 self.headers = { 'nonce': self.signature.nonce, # 例如,一个认证令牌 'timestamp': str(self.signature.timestamp), # 自定义的header参数 'appid': self.signature.APPID, # 另一个自定义的header参数 'signature': self.signature.signature } # 发送post请求 上传数据 response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data)) # 检查响应状态码 if response.status_code == 200: data = response.json() # 假设接口返回的是JSON数据 logger.info('删除成功,响应为:', data) else: # 请求失败,打印错误信息 logger.info(f'Error: {response.status_code}, {response.text}') # 主动抛出异常 raise Exception(f'Error: {response.status_code}, {response.text}') # 时间特征,年,月,一年的多少天,周几,第几周,第几季度,每月的第几天, 每季度的第几天,是否每月的第一天,是否每月的最后一天,是否每季度的第一天,是否每季度的最后一天,是否每年的第一天,是否每年的最后一天 def addtimecharacteristics(df,dataset): """ 为输入的 DataFrame 添加日期相关信息列 参数: df (pandas.DataFrame): 包含日期列 'ds' 的 DataFrame 返回: pandas.DataFrame: 添加了相关列的 DataFrame """ df['year'] = df['ds'].dt.year df['month'] = df['ds'].dt.month df['day'] = df['ds'].dt.day df['dayofweek'] = df['ds'].dt.dayofweek df['weekofyear'] = df['ds'].dt.isocalendar().week df['dayofyear'] = df['ds'].dt.dayofyear df['quarternum'] = df['ds'].dt.quarter # 将ds列转换为季度Period对象 df['quarter'] = df['ds'].dt.to_period('Q') # 获取每个季度的开始日期 df['quarter_start'] = df['quarter'].dt.to_timestamp('s') # 计算每个日期是所在季度的第几天 df['dayofquarter'] = (df['ds'] - df['quarter_start']).dt.days + 1 # 是否月初 df['is_month_start'] = df['ds'].dt.is_month_start.astype(int) # 是否月末 df['is_month_end'] = df['ds'].dt.is_month_end.astype(int) # 是否季度初 df['is_quarter_start'] = df['ds'].dt.is_quarter_start.astype(int) # 是否季度末 df['is_quarter_end'] = df['ds'].dt.is_quarter_end.astype(int) # 是否年初 df['is_year_start'] = df['ds'].dt.is_year_start.astype(int) # 是否年末 df['is_year_end'] = df['ds'].dt.is_year_end.astype(int) # 去掉 quarter_start quarter df.drop(columns=['quarter_start','quarter'],inplace=True) df.to_csv(os.path.join(dataset,'指标数据添加时间特征.csv'), index=False) return df