diff --git a/ETA获取数据.py b/ETA获取数据.py new file mode 100644 index 0000000..b6aaa66 --- /dev/null +++ b/ETA获取数据.py @@ -0,0 +1,729 @@ +import pandas as pd +import datetime +import hashlib +import hmac +import base64 +import random +import time +import string +import logging +import logging.handlers +import os +import re +import requests + +class EtaReader(): + def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl): + ''' + 初始化 EtaReader 类的实例。 + + 参数: + signature (str): 用于 API 请求的签名。 + classifylisturl (str): 分类列表的 URL。 + classifyidlisturl (str): 分类 ID 列表的 URL。 + edbcodedataurl (str): EDB 代码数据的 URL。 + edbdatapushurl (str): EDB 数据推送的 URL。 + edbcodelist (str): EDB 代码列表的 URL。 + edbdeleteurl (str): EDB 数据删除的 URL。 + edbbusinessurl (str): EDB 业务数据的 URL。 + + 返回: + None + ''' + self.signature = signature + self.classifylisturl = classifylisturl + self.classifyidlisturl = classifyidlisturl + self.edbcodedataurl = edbcodedataurl + self.edbdatapushurl = edbdatapushurl + self.edbcodelist = edbcodelist + self.edbdeleteurl = edbdeleteurl + self.edbbusinessurl = edbbusinessurl + + + def filter_yuanyou_data(self,ClassifyName,data): + ''' + 指标名称保留规则 + ''' + + # 包含 关键词 去除, 返回flase + if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', + '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', + '四周均值','名占比','残差','DMA', + '连7-连9','4周平均','4周均值','滚动相关性','日本']): + return False + + # 检查需要的特征 + # 去掉 分析 分类下的数据 + if ClassifyName == '分析': + return False + + # 保留 库存中特殊关键词 + if ClassifyName == '库存': + if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]): + return True + else: + pass + else: + pass + + # 去掉 持仓中不是基金的数据 + if ClassifyName == '持仓': + if '基金' not in data: + return False + else: + pass + else: + pass + + # 去掉 航班中不是中国、美国 的数据 + if ClassifyName == '需求': + if '航班' in data : + if '中国' in data or '美国' in data : + return True + else: + return False + else: + pass + else: + pass + + # 分类为 期货市场,同质性数据取第一个 + if ClassifyName == '期货市场': + # 去掉c1-9 以后的 + if 'c1-c' in data: + try: + c = int(data.split('c1-c')[1]) + except: + return False + if c > 9 : + return False + else: + pass + + else: + pass + + # 判断 同质性数据, 字符串开头 + strstartdict = {'ICE Brent c':"ICE Brent c14", + 'NYMWX WTI c':"NYMWX WTI c5", + 'INE SC c':"INE SC c1", + 'EFS c':"EFS c", + 'Dubai Swap c':"Dubai Swap c1", + 'Oman Swap c':"Oman Swap c1", + 'DME Oman c':"DME Oman c1", + 'Murban Futures c':"Murban Futures c1", + 'Dubai连合约价格':'Dubai连1合约价格', + '美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格', + 'Brent连合约价格':'Brent连1合约价格', + 'WTI连合约价格':'WTI连1合约价格', + '布伦特连合约价格':'Brent连1合约价格', + 'Brent 连合约价格':'Brent连1合约价格', + 'Dubai连合约价格':'Dubai连1合约价格', + 'Brent连':'Brent连1合约价格', + 'brent连':'Brent连1合约价格', + } + # 判断名称字符串开头是否在 strstartdict.keys中 + match = re.match(r'([a-zA-Z\s]+)(\d+)', data) + if match: + part1 = match.group(1) + part2 = match.group(2) + if part1 in [i for i in strstartdict.keys()]: + if data == strstartdict[part1]: + return True + else: + return False + # data = 'Brent 连7合约价格' + # 判断名称字符串去掉数字后是否在 strstartdict.keys中 + match = re.findall(r'\D+', data) + if match : + if len(match) == 2: + part1 = match[0] + part2 = match[1] + if part1+part2 in [i for i in strstartdict.keys()]: + if data == strstartdict[part1+part2]: + return True + else: + return False + else: + pass + elif len(match) == 1: + match = re.findall(r'\D+', data) + part1 = match[0] + + if part1 in [i for i in strstartdict.keys()]: + if data == strstartdict[part1]: + return True + else: + return False + else: + pass + else: + pass + + # 去掉kpler数据源 + if 'Kpler' in ClassifyName or 'kpler' in ClassifyName: + return False + + return True + + def filter_pp_data(self,ClassifyName,data): + ''' + 指标名称保留规则 + ''' + + # 包含 关键词 去除, 返回flase + # if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价', + # '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克', + # '四周均值','名占比','残差','DMA', + # '连7-连9','4周平均','4周均值','滚动相关性','日本']): + # return False + # 包含 关键词 保留, 返回True + if any(keyword in data for keyword in ['拉丝']): + return True + + + + # 检查需要的特征 + # 去掉 期货市场 分类下的数据 + if ClassifyName == '期货市场': + return False + else: + pass + + # 保留 库存 下所有指标 + if ClassifyName == '库存': + return True + else: + pass + + # 保留 进出口 下所有指标 + if ClassifyName == '进出口': + return True + else: + pass + + # 保留 价差 下所有指标 + if ClassifyName == '价差': + return True + else: + pass + + # 保留 供应 下所有指标 + if ClassifyName == '供应': + return True + else: + pass + + + # 保留 需求 下所有指标 + if ClassifyName == '需求': + return True + else: + pass + + + return True + + # 通过edbcode 获取指标数据 + def edbcodegetdata(self,df,EdbCode,EdbName): + # 根据指标id,获取指标数据 + url = self.edbcodedataurl+str(EdbCode) + # 发送GET请求 + response = requests.get(url, headers=self.headers) + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + all_data_items = data.get('Data') + # 列表转换为DataFrame + df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime']) + # df3 = pd.read_json(all_data_items, orient='records') + + # 去掉UpdateTime 列 + df3 = df3.drop(columns=['UpdateTime']) + # df3.set_index('DataTime') + df3.rename(columns={'Value': EdbName}, inplace=True) + # 将数据存储df1 + df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y')) + # 按时间排序 + df = df.sort_values(by='DataTime', ascending=True) + return df + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + def get_eta_api_yuanyou_data(self,data_set,dataset=''): + ''' + 从ETA API获取原油数据 + + 参数: + data_set (str): 数据集名称 + dataset (str): 数据集ID,默认为空 + + 返回: + None + ''' + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 + + ''' + df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 + df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... + + ''' + + # 构建新的DataFrame df df1 + df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度','指标来源','来源id','最后更新时间','更新周期','预警日期','停更周期']) + df1 = pd.DataFrame(columns=['DataTime']) + + + # 外网环境无法访问,请确认是否为内网环境 + try: + # 发送GET请求 获取指标分类列表 + response = requests.get(self.classifylisturl, headers=self.headers) + except requests.exceptions.RequestException as e: + raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") + + # 检查响应状态码 + if response.status_code == 200: + # 获取成功, 处理响应内容 + data = response.json() # 假设接口返回的是JSON数据 + + # 请求成功,处理响应内容 + # logger.info(data.get('Data')) + # 定义你想要保留的固定值 + fixed_value = 1214 + + # 遍历列表,只保留那些'category' key的值为固定值的数据项 + filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] + + #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId + n = 0 + for item in filtered_data: + n+= 1 + # if n>50: + # break + ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 + ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 + # 根据分类id,获取指标列表 + url = self.classifyidlisturl+str(ClassifyId) + response = requests.get(url, headers=self.headers) + if response.status_code == 200: + # logger.info(response.text) + data2 = response.json() + Data = data2.get('Data') + for i in Data: + # s+= 1 + EdbCode = i.get('EdbCode') + EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 + Frequency = i.get('Frequency') # 频度,要保存到df的频度列 + SourceName = i.get('SourceName') # 来源名称,要保存到df的频度列 + Source = i.get('Source') # 来源ID,要保存到df的频度列 + # 频度不是 日 或者 周的 跳过 + if Frequency not in ['日度','周度','日','周']: + continue + + # 只保留手工数据中,名称带有 海运出口 海运进口 + if Source == 9 and not ('海运出口' in EdbName or '海运进口' in EdbName): + continue + + # 不要wind数据 + if Source == 2: + continue + + + # 判断名称是否需要保存 + isSave = self.filter_yuanyou_data(ClassifyName,EdbName) + if isSave: + # 保存到df + df1 = self.edbcodegetdata(df1,EdbCode,EdbName) + # 取df1所有行最后一列 + edbname_df = df1[['DataTime',f'{EdbName}']] + edbname_df = edbname_df.dropna() + + if len(edbname_df) == 0: + logger.info(f'指标名称:{EdbName} 没有数据') + continue + try: + time_sequence = edbname_df['DataTime'].values.tolist()[-10:] + except IndexError: + time_sequence = edbname_df['DataTime'].values.tolist() + # 使用Counter来统计每个星期几的出现次数 + from collections import Counter + weekday_counter = Counter(datetime.datetime.strptime(time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence) + + # 打印出现次数最多的星期几 + try: + most_common_weekday = weekday_counter.most_common(1)[0][0] + # 计算两周后的日期 + warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d") + stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7 + + except IndexError: + most_common_weekday = '其他' + stop_update_period = 0 + if '日' in Frequency: + most_common_weekday = '每天' + warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d") + stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days + + # 保存频度 指标名称 分类 指标id 到 df + df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency,'指标来源':SourceName,'来源id':Source,'最后更新时间':edbname_df['DataTime'].values[-1],'更新周期':most_common_weekday,'预警日期':warning_date,'停更周期':stop_update_period},index=[0]) + + # df = pd.merge(df, df2, how='outer') + df = pd.concat([df, df2]) + else: + logger.info(f'跳过指标 {EdbName}') + + # 找到列表中不在指标列中的指标id,保存成新的list + new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] + logger.info(new_list) + # 遍历new_list,获取指标数据,保存到df1 + for item in new_list: + logger.info(item) + # 将item 加入到 df['指标id']中 + try: + itemname = edbcodenamedict[item] + except: + itemname = item + + df1 = self.edbcodegetdata(df1,item,itemname) + df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他','指标来源':'其他','来源id':'其他'},index=[0])]) + + # 按时间排序 + df1.sort_values('DataTime',inplace=True,ascending=False) + df1.rename(columns={'DataTime': 'date'},inplace=True) + # df1.dropna(inplace=True) + # 去掉大于今天日期的行 + df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] + logger.info(df1.head()) + # logger.info(f'{df1.head()}') + # 保存到xlsx文件的sheet表 + with pd.ExcelWriter(os.path.join(dataset,data_set)) as file: + df1.to_excel(file, sheet_name='指标数据', index=False) + df.to_excel(file, sheet_name='指标列表', index=False) + df_zhibiaoshuju = df1.copy() + df_zhibiaoliebiao = df.copy() + return df_zhibiaoshuju,df_zhibiaoliebiao + + def get_eta_api_pp_data(self,data_set,dataset=''): + global ClassifyId + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + # 从列表数据中获取指标名称,判断指标名称频度是否为日 ,如果是,则获取UniqueCode,然后获取指标数据,保存到xlat文件中的sheet表。 + + ''' + df = sheetname 指标列表,存储 指标分类-指标名称-指标id-频度 + df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2... + + ''' + + # 构建新的DataFrame df df1 + df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度']) + df1 = pd.DataFrame(columns=['DataTime']) + + + # 外网环境无法访问,请确认是否为内网环境 + try: + # 发送GET请求 获取指标分类列表 + response = requests.get(self.classifylisturl, headers=self.headers) + except requests.exceptions.RequestException as e: + raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m") + + # 检查响应状态码 + if response.status_code == 200: + # 获取成功, 处理响应内容 + data = response.json() # 假设接口返回的是JSON数据 + + # 请求成功,处理响应内容 + # logger.info(data.get('Data')) + # 定义你想要保留的固定值 + fixed_value = ClassifyId + + # 遍历列表,只保留那些'category' key的值为固定值的数据项 + filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value] + + #然后循环filtered_data去获取list数据,才能获取到想要获取的ClassifyId + n = 0 + for item in filtered_data: + n+= 1 + # if n>50: + # break + ClassifyId = item["ClassifyId"] #分类id,分类下的指标列表接口的请求参数 + ClassifyName = item["ClassifyName"] #分类名称,要保存到df的指标分类列 + # 根据分类id,获取指标列表 + url = self.classifyidlisturl+str(ClassifyId) + response = requests.get(url, headers=self.headers) + if response.status_code == 200: + # logger.info(response.text) + data2 = response.json() + Data = data2.get('Data') + for i in Data: + # s+= 1 + EdbCode = i.get('EdbCode') + EdbName = i.get('EdbName') # 指标名称,要保存到df2的指标名称列,df的指标名称列 + Frequency = i.get('Frequency') # 频度,要保存到df的频度列 + # 频度不是 日 或者 周的 跳过 + if Frequency not in ['日度','周度','日','周']: + continue + + # 判断名称是否需要保存 + isSave = self.filter_pp_data(ClassifyName,EdbName) + if isSave: + # 保存到df + # 保存频度 指标名称 分类 指标id 到 df + df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0]) + + # df = pd.merge(df, df2, how='outer') + df = pd.concat([df, df2]) + df1 = self.edbcodegetdata(df1,EdbCode,EdbName) + else: + logger.info(f'跳过指标 {EdbName}') + + # 找到列表中不在指标列中的指标id,保存成新的list + new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()] + logger.info(new_list) + # 遍历new_list,获取指标数据,保存到df1 + for item in new_list: + logger.info(item) + # 将item 加入到 df['指标id']中 + try: + itemname = edbcodenamedict[item] + except: + itemname = item + + df1 = self.edbcodegetdata(df1,item,itemname) + df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])]) + + # 按时间排序 + df1.sort_values('DataTime',inplace=True,ascending=False) + df1.rename(columns={'DataTime': 'date'},inplace=True) + # df1.dropna(inplace=True) + # 去掉大于今天日期的行 + df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')] + logger.info(df1.head()) + # logger.info(f'{df1.head()}') + # 保存到xlsx文件的sheet表 + with pd.ExcelWriter(os.path.join(dataset,data_set)) as file: + df1.to_excel(file, sheet_name='指标数据', index=False) + df.to_excel(file, sheet_name='指标列表', index=False) + + df_zhibiaoshuju = df1.copy() + df_zhibiaoliebiao = df.copy() + return df_zhibiaoshuju,df_zhibiaoliebiao + + def push_data(self,data): + + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + # 发送post请求 上传数据 + logger.info('请求参数:',data) + response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data)) + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + + logger.info('上传成功,响应为:', data) + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + def del_zhibiao(self,IndexCodeList): + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + data = { + "IndexCodeList": IndexCodeList #指标编码列表 + } + # 发送post请求 上传数据 + response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data)) + + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + + logger.info('删除成功,响应为:', data) + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + def del_business(self,data): + '''' + 接口地址 + https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd + + 请求体 body + { + "IndexCode": "W001067", //指标编码 + "StartDate": "2020-04-20", //指标需要删除的开始日期(>=),如果开始日期和结束日期相等,那么就是删除该日期 + "EndDate": "2024-05-28" //指标需要删除的结束日期(<=),如果开始日期和结束日期相等,那么就是删除该日期 + } + ''' + today = datetime.date.today().strftime("%Y-%m-%d") + + # 定义你的headers,这里可以包含多个参数 + self.headers = { + 'nonce': self.signature.nonce, # 例如,一个认证令牌 + 'timestamp': str(self.signature.timestamp), # 自定义的header参数 + 'appid': self.signature.APPID, # 另一个自定义的header参数 + 'signature': self.signature.signature + } + + + # 发送post请求 上传数据 + response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data)) + + + # 检查响应状态码 + if response.status_code == 200: + data = response.json() # 假设接口返回的是JSON数据 + + logger.info('删除成功,响应为:', data) + + else: + # 请求失败,打印错误信息 + logger.info(f'Error: {response.status_code}, {response.text}') + # 主动抛出异常 + raise Exception(f'Error: {response.status_code}, {response.text}') + + +class BinanceAPI: + ''' + 获取 Binance API 请求头签名 + ''' + def __init__(self, APPID, SECRET): + self.APPID = APPID + self.SECRET = SECRET + self.get_signature() + + # 生成随机字符串作为 nonce + def generate_nonce(self, length=32): + self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) + return self.nonce + + # 获取当前时间戳(秒) + def get_timestamp(self): + return int(time.time()) + + # 构建待签名字符串 + def build_sign_str(self): + return f'appid={self.APPID}&nonce={self.nonce}×tamp={self.timestamp}' + + # 使用 HMAC SHA-256 计算签名 + def calculate_signature(self, secret, message): + return base64.urlsafe_b64encode(hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8') + + def get_signature(self): + # 调用上述方法生成签名 + self.nonce = self.generate_nonce() + self.timestamp = self.get_timestamp() + self.sign_str = self.build_sign_str() + self.signature = self.calculate_signature(self.SECRET, self.sign_str) + # return self.signature + +### 日志配置 + +# 创建日志目录(如果不存在) +log_dir = 'logs' +if not os.path.exists(log_dir): + os.makedirs(log_dir) + +# 配置日志记录器 +logger = logging.getLogger('pricepredict') +logger.setLevel(logging.INFO) + +# 配置文件处理器,将日志记录到文件 +file_handler = logging.handlers.RotatingFileHandler(os.path.join(log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5) +file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) + +# 配置控制台处理器,将日志打印到控制台 +console_handler = logging.StreamHandler() +console_handler.setFormatter(logging.Formatter('%(message)s')) + +# 将处理器添加到日志记录器 +logger.addHandler(file_handler) +logger.addHandler(console_handler) + + + +# eta 接口url +sourcelisturl = 'http://10.189.2.78:8108/v1/edb/source/list' +classifylisturl = 'http://10.189.2.78:8108/v1/edb/classify/list?ClassifyType=' +uniquecodedataurl = 'http://10.189.2.78:8108/v1/edb/data?UniqueCode=4991c37becba464609b409909fe4d992&StartDate=2024-02-01' +classifyidlisturl = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId=' +edbcodedataurl = 'http://10.189.2.78:8108/v1/edb/data?EdbCode=' +edbdatapushurl = 'http://10.189.2.78:8108/v1/edb/push' +edbdeleteurl = 'http://10.189.2.78:8108/v1/edb/business/edb/del' +edbbusinessurl = 'http://10.189.2.78:8108/v1/edb/business/data/del' +edbcodelist = ['CO1 Comdty', 'ovx index', 'C2404194834', 'C2404199738', 'dxy curncy', 'C2403128043', 'C2403150124', + 'DOESCRUD Index', 'WTRBM1 EEGC Index', 'FVHCM1 INDEX', 'doedtprd index', 'CFFDQMMN INDEX', + 'C2403083739', 'C2404167878', 'C2403250571', 'lmcads03 lme comdty', 'GC1 COMB Comdty', + 'C2404171822','C2404167855', + # 'W000825','W000826','G.IPE', # 美国汽柴油 + # 'S5131019','ID00135604','FSGAM1 Index','S5120408','ID00136724', # 新加坡汽柴油 + ] + +### 文件 +data_set = '原油指标数据.xlsx' # 数据集文件 +# data_set = 'INE_OIL(1).csv' +### 文件夹 +dataset = 'yuanyoudataset' # 数据集文件夹 + + +# eta 接口token +APPID = "XNLDvxZHHugj7wJ7" +SECRET = "iSeU4s6cKKBVbt94htVY1p0sqUMqb2xa" +signature = BinanceAPI(APPID, SECRET) +etadata = EtaReader(signature=signature, + classifylisturl=classifylisturl, + classifyidlisturl=classifyidlisturl, + edbcodedataurl=edbcodedataurl, + edbcodelist=edbcodelist, + edbdatapushurl=edbdatapushurl, + edbdeleteurl=edbdeleteurl, + edbbusinessurl=edbbusinessurl, + ) +df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_yuanyou_data(data_set=data_set, dataset=dataset) # 原始数据,未处理 \ No newline at end of file diff --git a/lib/dataread.py b/lib/dataread.py index 76edfaf..f4a19e6 100644 --- a/lib/dataread.py +++ b/lib/dataread.py @@ -890,6 +890,7 @@ class BinanceAPI: self.sign_str = self.build_sign_str() self.signature = self.calculate_signature(self.SECRET, self.sign_str) # return self.signature + class Graphs: # 绘制标题 @staticmethod @@ -1013,7 +1014,22 @@ def style_row(row): class EtaReader(): def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl): - # 获取签名 + ''' + 初始化 EtaReader 类的实例。 + + 参数: + signature (str): 用于 API 请求的签名。 + classifylisturl (str): 分类列表的 URL。 + classifyidlisturl (str): 分类 ID 列表的 URL。 + edbcodedataurl (str): EDB 代码数据的 URL。 + edbdatapushurl (str): EDB 数据推送的 URL。 + edbcodelist (str): EDB 代码列表的 URL。 + edbdeleteurl (str): EDB 数据删除的 URL。 + edbbusinessurl (str): EDB 业务数据的 URL。 + + 返回: + None + ''' self.signature = signature self.classifylisturl = classifylisturl self.classifyidlisturl = classifyidlisturl @@ -1022,7 +1038,7 @@ class EtaReader(): self.edbcodelist = edbcodelist self.edbdeleteurl = edbdeleteurl self.edbbusinessurl = edbbusinessurl - pass + def filter_yuanyou_data(self,ClassifyName,data): ''' @@ -1240,7 +1256,16 @@ class EtaReader(): raise Exception(f'Error: {response.status_code}, {response.text}') def get_eta_api_yuanyou_data(self,data_set,dataset=''): - + ''' + 从ETA API获取原油数据 + + 参数: + data_set (str): 数据集名称 + dataset (str): 数据集ID,默认为空 + + 返回: + None + ''' today = datetime.date.today().strftime("%Y-%m-%d") # 定义你的headers,这里可以包含多个参数