整理eta获取数据脚本

This commit is contained in:
workpc 2024-12-06 10:15:28 +08:00
parent a29c4f002d
commit 9812e92f52
2 changed files with 757 additions and 3 deletions

729
ETA获取数据.py Normal file
View File

@ -0,0 +1,729 @@
import pandas as pd
import datetime
import hashlib
import hmac
import base64
import random
import time
import string
import logging
import logging.handlers
import os
import re
import requests
class EtaReader():
def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl):
'''
初始化 EtaReader 类的实例
参数:
signature (str): 用于 API 请求的签名
classifylisturl (str): 分类列表的 URL
classifyidlisturl (str): 分类 ID 列表的 URL
edbcodedataurl (str): EDB 代码数据的 URL
edbdatapushurl (str): EDB 数据推送的 URL
edbcodelist (str): EDB 代码列表的 URL
edbdeleteurl (str): EDB 数据删除的 URL
edbbusinessurl (str): EDB 业务数据的 URL
返回:
None
'''
self.signature = signature
self.classifylisturl = classifylisturl
self.classifyidlisturl = classifyidlisturl
self.edbcodedataurl = edbcodedataurl
self.edbdatapushurl = edbdatapushurl
self.edbcodelist = edbcodelist
self.edbdeleteurl = edbdeleteurl
self.edbbusinessurl = edbbusinessurl
def filter_yuanyou_data(self,ClassifyName,data):
'''
指标名称保留规则
'''
# 包含 关键词 去除, 返回flase
if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价',
'同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克',
'四周均值','名占比','残差','DMA',
'连7-连9','4周平均','4周均值','滚动相关性','日本']):
return False
# 检查需要的特征
# 去掉 分析 分类下的数据
if ClassifyName == '分析':
return False
# 保留 库存中特殊关键词
if ClassifyName == '库存':
if any(keyword in data for keyword in ['原油' , '美国' ,'全球' ,'中国' ,'富查伊拉','ARA' ]):
return True
else:
pass
else:
pass
# 去掉 持仓中不是基金的数据
if ClassifyName == '持仓':
if '基金' not in data:
return False
else:
pass
else:
pass
# 去掉 航班中不是中国、美国 的数据
if ClassifyName == '需求':
if '航班' in data :
if '中国' in data or '美国' in data :
return True
else:
return False
else:
pass
else:
pass
# 分类为 期货市场,同质性数据取第一个
if ClassifyName == '期货市场':
# 去掉c1-9 以后的
if 'c1-c' in data:
try:
c = int(data.split('c1-c')[1])
except:
return False
if c > 9 :
return False
else:
pass
else:
pass
# 判断 同质性数据, 字符串开头
strstartdict = {'ICE Brent c':"ICE Brent c14",
'NYMWX WTI c':"NYMWX WTI c5",
'INE SC c':"INE SC c1",
'EFS c':"EFS c",
'Dubai Swap c':"Dubai Swap c1",
'Oman Swap c':"Oman Swap c1",
'DME Oman c':"DME Oman c1",
'Murban Futures c':"Murban Futures c1",
'Dubai连合约价格':'Dubai连1合约价格',
'美国RBOB期货月份合约价格':'美国RBOB期货2309月份合约价格',
'Brent连合约价格':'Brent连1合约价格',
'WTI连合约价格':'WTI连1合约价格',
'布伦特连合约价格':'Brent连1合约价格',
'Brent 连合约价格':'Brent连1合约价格',
'Dubai连合约价格':'Dubai连1合约价格',
'Brent连':'Brent连1合约价格',
'brent连':'Brent连1合约价格',
}
# 判断名称字符串开头是否在 strstartdict.keys中
match = re.match(r'([a-zA-Z\s]+)(\d+)', data)
if match:
part1 = match.group(1)
part2 = match.group(2)
if part1 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1]:
return True
else:
return False
# data = 'Brent 连7合约价格'
# 判断名称字符串去掉数字后是否在 strstartdict.keys中
match = re.findall(r'\D+', data)
if match :
if len(match) == 2:
part1 = match[0]
part2 = match[1]
if part1+part2 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1+part2]:
return True
else:
return False
else:
pass
elif len(match) == 1:
match = re.findall(r'\D+', data)
part1 = match[0]
if part1 in [i for i in strstartdict.keys()]:
if data == strstartdict[part1]:
return True
else:
return False
else:
pass
else:
pass
# 去掉kpler数据源
if 'Kpler' in ClassifyName or 'kpler' in ClassifyName:
return False
return True
def filter_pp_data(self,ClassifyName,data):
'''
指标名称保留规则
'''
# 包含 关键词 去除, 返回flase
# if any(keyword in data for keyword in ['运费','检修','波动率','地缘政治','股价',
# '同比','环比','环差','裂差','4WMA','变频','道琼斯','标普500','纳斯达克',
# '四周均值','名占比','残差','DMA',
# '连7-连9','4周平均','4周均值','滚动相关性','日本']):
# return False
# 包含 关键词 保留, 返回True
if any(keyword in data for keyword in ['拉丝']):
return True
# 检查需要的特征
# 去掉 期货市场 分类下的数据
if ClassifyName == '期货市场':
return False
else:
pass
# 保留 库存 下所有指标
if ClassifyName == '库存':
return True
else:
pass
# 保留 进出口 下所有指标
if ClassifyName == '进出口':
return True
else:
pass
# 保留 价差 下所有指标
if ClassifyName == '价差':
return True
else:
pass
# 保留 供应 下所有指标
if ClassifyName == '供应':
return True
else:
pass
# 保留 需求 下所有指标
if ClassifyName == '需求':
return True
else:
pass
return True
# 通过edbcode 获取指标数据
def edbcodegetdata(self,df,EdbCode,EdbName):
# 根据指标id获取指标数据
url = self.edbcodedataurl+str(EdbCode)
# 发送GET请求
response = requests.get(url, headers=self.headers)
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
all_data_items = data.get('Data')
# 列表转换为DataFrame
df3 = pd.DataFrame(all_data_items, columns=['DataTime', 'Value', 'UpdateTime'])
# df3 = pd.read_json(all_data_items, orient='records')
# 去掉UpdateTime 列
df3 = df3.drop(columns=['UpdateTime'])
# df3.set_index('DataTime')
df3.rename(columns={'Value': EdbName}, inplace=True)
# 将数据存储df1
df = pd.merge(df, df3, how='outer',on='DataTime',suffixes= ('', '_y'))
# 按时间排序
df = df.sort_values(by='DataTime', ascending=True)
return df
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def get_eta_api_yuanyou_data(self,data_set,dataset=''):
'''
从ETA API获取原油数据
参数:
data_set (str): 数据集名称
dataset (str): 数据集ID默认为空
返回:
None
'''
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 从列表数据中获取指标名称,判断指标名称频度是否为日 如果是则获取UniqueCode然后获取指标数据保存到xlat文件中的sheet表。
'''
df = sheetname 指标列表存储 指标分类-指标名称-指标id-频度
df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2...
'''
# 构建新的DataFrame df df1
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度','指标来源','来源id','最后更新时间','更新周期','预警日期','停更周期'])
df1 = pd.DataFrame(columns=['DataTime'])
# 外网环境无法访问,请确认是否为内网环境
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
# 检查响应状态码
if response.status_code == 200:
# 获取成功, 处理响应内容
data = response.json() # 假设接口返回的是JSON数据
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# 定义你想要保留的固定值
fixed_value = 1214
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
n = 0
for item in filtered_data:
n+= 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
# logger.info(response.text)
data2 = response.json()
Data = data2.get('Data')
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
Frequency = i.get('Frequency') # 频度要保存到df的频度列
SourceName = i.get('SourceName') # 来源名称要保存到df的频度列
Source = i.get('Source') # 来源ID要保存到df的频度列
# 频度不是 日 或者 周的 跳过
if Frequency not in ['日度','周度','','']:
continue
# 只保留手工数据中,名称带有 海运出口 海运进口
if Source == 9 and not ('海运出口' in EdbName or '海运进口' in EdbName):
continue
# 不要wind数据
if Source == 2:
continue
# 判断名称是否需要保存
isSave = self.filter_yuanyou_data(ClassifyName,EdbName)
if isSave:
# 保存到df
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
# 取df1所有行最后一列
edbname_df = df1[['DataTime',f'{EdbName}']]
edbname_df = edbname_df.dropna()
if len(edbname_df) == 0:
logger.info(f'指标名称:{EdbName} 没有数据')
continue
try:
time_sequence = edbname_df['DataTime'].values.tolist()[-10:]
except IndexError:
time_sequence = edbname_df['DataTime'].values.tolist()
# 使用Counter来统计每个星期几的出现次数
from collections import Counter
weekday_counter = Counter(datetime.datetime.strptime(time_str, "%Y-%m-%d").strftime('%A') for time_str in time_sequence)
# 打印出现次数最多的星期几
try:
most_common_weekday = weekday_counter.most_common(1)[0][0]
# 计算两周后的日期
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(weeks=2)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days // 7
except IndexError:
most_common_weekday = '其他'
stop_update_period = 0
if '' in Frequency:
most_common_weekday = '每天'
warning_date = (datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d") + datetime.timedelta(days=3)).strftime("%Y-%m-%d")
stop_update_period = (datetime.datetime.strptime(today, "%Y-%m-%d") - datetime.datetime.strptime(time_sequence[-1], "%Y-%m-%d")).days
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency,'指标来源':SourceName,'来源id':Source,'最后更新时间':edbname_df['DataTime'].values[-1],'更新周期':most_common_weekday,'预警日期':warning_date,'停更周期':stop_update_period},index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
else:
logger.info(f'跳过指标 {EdbName}')
# 找到列表中不在指标列中的指标id保存成新的list
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# 遍历new_list获取指标数据保存到df1
for item in new_list:
logger.info(item)
# 将item 加入到 df['指标id']中
try:
itemname = edbcodenamedict[item]
except:
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他','指标来源':'其他','来源id':'其他'},index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
def get_eta_api_pp_data(self,data_set,dataset=''):
global ClassifyId
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 从列表数据中获取指标名称,判断指标名称频度是否为日 如果是则获取UniqueCode然后获取指标数据保存到xlat文件中的sheet表。
'''
df = sheetname 指标列表存储 指标分类-指标名称-指标id-频度
df1 = sheetname 指标数据 ,存储 时间-指标名称1-指标名称2...
'''
# 构建新的DataFrame df df1
df = pd.DataFrame(columns=['指标分类', '指标名称', '指标id', '频度'])
df1 = pd.DataFrame(columns=['DataTime'])
# 外网环境无法访问,请确认是否为内网环境
try:
# 发送GET请求 获取指标分类列表
response = requests.get(self.classifylisturl, headers=self.headers)
except requests.exceptions.RequestException as e:
raise Exception(f"请求失败,请确认是否为内网环境: {e}","\033[0m")
# 检查响应状态码
if response.status_code == 200:
# 获取成功, 处理响应内容
data = response.json() # 假设接口返回的是JSON数据
# 请求成功,处理响应内容
# logger.info(data.get('Data'))
# 定义你想要保留的固定值
fixed_value = ClassifyId
# 遍历列表,只保留那些'category' key的值为固定值的数据项
filtered_data = [item for item in data.get('Data') if item.get('ParentId') == fixed_value]
#然后循环filtered_data去获取list数据才能获取到想要获取的ClassifyId
n = 0
for item in filtered_data:
n+= 1
# if n>50:
# break
ClassifyId = item["ClassifyId"] #分类id分类下的指标列表接口的请求参数
ClassifyName = item["ClassifyName"] #分类名称要保存到df的指标分类列
# 根据分类id获取指标列表
url = self.classifyidlisturl+str(ClassifyId)
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
# logger.info(response.text)
data2 = response.json()
Data = data2.get('Data')
for i in Data:
# s+= 1
EdbCode = i.get('EdbCode')
EdbName = i.get('EdbName') # 指标名称要保存到df2的指标名称列,df的指标名称列
Frequency = i.get('Frequency') # 频度要保存到df的频度列
# 频度不是 日 或者 周的 跳过
if Frequency not in ['日度','周度','','']:
continue
# 判断名称是否需要保存
isSave = self.filter_pp_data(ClassifyName,EdbName)
if isSave:
# 保存到df
# 保存频度 指标名称 分类 指标id 到 df
df2 = pd.DataFrame({'指标分类': ClassifyName, '指标名称': EdbName, '指标id': EdbCode, '频度': Frequency},index=[0])
# df = pd.merge(df, df2, how='outer')
df = pd.concat([df, df2])
df1 = self.edbcodegetdata(df1,EdbCode,EdbName)
else:
logger.info(f'跳过指标 {EdbName}')
# 找到列表中不在指标列中的指标id保存成新的list
new_list = [item for item in self.edbcodelist if item not in df['指标id'].tolist()]
logger.info(new_list)
# 遍历new_list获取指标数据保存到df1
for item in new_list:
logger.info(item)
# 将item 加入到 df['指标id']中
try:
itemname = edbcodenamedict[item]
except:
itemname = item
df1 = self.edbcodegetdata(df1,item,itemname)
df = pd.concat([df, pd.DataFrame({'指标分类': '其他', '指标名称': itemname, '指标id': item, '频度': '其他'},index=[0])])
# 按时间排序
df1.sort_values('DataTime',inplace=True,ascending=False)
df1.rename(columns={'DataTime': 'date'},inplace=True)
# df1.dropna(inplace=True)
# 去掉大于今天日期的行
df1 = df1[df1['date'] <= datetime.datetime.now().strftime('%Y-%m-%d')]
logger.info(df1.head())
# logger.info(f'{df1.head()}')
# 保存到xlsx文件的sheet表
with pd.ExcelWriter(os.path.join(dataset,data_set)) as file:
df1.to_excel(file, sheet_name='指标数据', index=False)
df.to_excel(file, sheet_name='指标列表', index=False)
df_zhibiaoshuju = df1.copy()
df_zhibiaoliebiao = df.copy()
return df_zhibiaoshuju,df_zhibiaoliebiao
def push_data(self,data):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
logger.info('请求参数:',data)
response = requests.post(self.edbdatapushurl, headers=self.headers,data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
logger.info('上传成功,响应为:', data)
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def del_zhibiao(self,IndexCodeList):
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
data = {
"IndexCodeList": IndexCodeList #指标编码列表
}
# 发送post请求 上传数据
response = requests.post(self.edbdeleteurl, headers=self.headers,data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
logger.info('删除成功,响应为:', data)
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
def del_business(self,data):
''''
接口地址
https://console-docs.apipost.cn/preview/fce869601d0be1d9/9a637c2f9ed0c589?target_id=d3cafcbf-a68c-42b3-b105-7bbd0e95a9cd
请求体 body
{
"IndexCode": "W001067", //指标编码
"StartDate": "2020-04-20", //指标需要删除的开始日期>=如果开始日期和结束日期相等那么就是删除该日期
"EndDate": "2024-05-28" //指标需要删除的结束日期<=如果开始日期和结束日期相等那么就是删除该日期
}
'''
today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数
self.headers = {
'nonce': self.signature.nonce, # 例如,一个认证令牌
'timestamp': str(self.signature.timestamp), # 自定义的header参数
'appid': self.signature.APPID, # 另一个自定义的header参数
'signature': self.signature.signature
}
# 发送post请求 上传数据
response = requests.post(self.edbbusinessurl, headers=self.headers,data=json.dumps(data))
# 检查响应状态码
if response.status_code == 200:
data = response.json() # 假设接口返回的是JSON数据
logger.info('删除成功,响应为:', data)
else:
# 请求失败,打印错误信息
logger.info(f'Error: {response.status_code}, {response.text}')
# 主动抛出异常
raise Exception(f'Error: {response.status_code}, {response.text}')
class BinanceAPI:
'''
获取 Binance API 请求头签名
'''
def __init__(self, APPID, SECRET):
self.APPID = APPID
self.SECRET = SECRET
self.get_signature()
# 生成随机字符串作为 nonce
def generate_nonce(self, length=32):
self.nonce = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
return self.nonce
# 获取当前时间戳(秒)
def get_timestamp(self):
return int(time.time())
# 构建待签名字符串
def build_sign_str(self):
return f'appid={self.APPID}&nonce={self.nonce}&timestamp={self.timestamp}'
# 使用 HMAC SHA-256 计算签名
def calculate_signature(self, secret, message):
return base64.urlsafe_b64encode(hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8')
def get_signature(self):
# 调用上述方法生成签名
self.nonce = self.generate_nonce()
self.timestamp = self.get_timestamp()
self.sign_str = self.build_sign_str()
self.signature = self.calculate_signature(self.SECRET, self.sign_str)
# return self.signature
### 日志配置
# 创建日志目录(如果不存在)
log_dir = 'logs'
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 配置日志记录器
logger = logging.getLogger('pricepredict')
logger.setLevel(logging.INFO)
# 配置文件处理器,将日志记录到文件
file_handler = logging.handlers.RotatingFileHandler(os.path.join(log_dir, 'pricepredict.log'), maxBytes=1024 * 1024, backupCount=5)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
# 配置控制台处理器,将日志打印到控制台
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s'))
# 将处理器添加到日志记录器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# eta 接口url
sourcelisturl = 'http://10.189.2.78:8108/v1/edb/source/list'
classifylisturl = 'http://10.189.2.78:8108/v1/edb/classify/list?ClassifyType='
uniquecodedataurl = 'http://10.189.2.78:8108/v1/edb/data?UniqueCode=4991c37becba464609b409909fe4d992&StartDate=2024-02-01'
classifyidlisturl = 'http://10.189.2.78:8108/v1/edb/list?ClassifyId='
edbcodedataurl = 'http://10.189.2.78:8108/v1/edb/data?EdbCode='
edbdatapushurl = 'http://10.189.2.78:8108/v1/edb/push'
edbdeleteurl = 'http://10.189.2.78:8108/v1/edb/business/edb/del'
edbbusinessurl = 'http://10.189.2.78:8108/v1/edb/business/data/del'
edbcodelist = ['CO1 Comdty', 'ovx index', 'C2404194834', 'C2404199738', 'dxy curncy', 'C2403128043', 'C2403150124',
'DOESCRUD Index', 'WTRBM1 EEGC Index', 'FVHCM1 INDEX', 'doedtprd index', 'CFFDQMMN INDEX',
'C2403083739', 'C2404167878', 'C2403250571', 'lmcads03 lme comdty', 'GC1 COMB Comdty',
'C2404171822','C2404167855',
# 'W000825','W000826','G.IPE', # 美国汽柴油
# 'S5131019','ID00135604','FSGAM1 Index','S5120408','ID00136724', # 新加坡汽柴油
]
### 文件
data_set = '原油指标数据.xlsx' # 数据集文件
# data_set = 'INE_OIL(1).csv'
### 文件夹
dataset = 'yuanyoudataset' # 数据集文件夹
# eta 接口token
APPID = "XNLDvxZHHugj7wJ7"
SECRET = "iSeU4s6cKKBVbt94htVY1p0sqUMqb2xa"
signature = BinanceAPI(APPID, SECRET)
etadata = EtaReader(signature=signature,
classifylisturl=classifylisturl,
classifyidlisturl=classifyidlisturl,
edbcodedataurl=edbcodedataurl,
edbcodelist=edbcodelist,
edbdatapushurl=edbdatapushurl,
edbdeleteurl=edbdeleteurl,
edbbusinessurl=edbbusinessurl,
)
df_zhibiaoshuju, df_zhibiaoliebiao = etadata.get_eta_api_yuanyou_data(data_set=data_set, dataset=dataset) # 原始数据,未处理

View File

@ -890,6 +890,7 @@ class BinanceAPI:
self.sign_str = self.build_sign_str() self.sign_str = self.build_sign_str()
self.signature = self.calculate_signature(self.SECRET, self.sign_str) self.signature = self.calculate_signature(self.SECRET, self.sign_str)
# return self.signature # return self.signature
class Graphs: class Graphs:
# 绘制标题 # 绘制标题
@staticmethod @staticmethod
@ -1013,7 +1014,22 @@ def style_row(row):
class EtaReader(): class EtaReader():
def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl): def __init__(self,signature,classifylisturl,classifyidlisturl,edbcodedataurl,edbcodelist,edbdatapushurl,edbdeleteurl,edbbusinessurl):
# 获取签名 '''
初始化 EtaReader 类的实例
参数:
signature (str): 用于 API 请求的签名
classifylisturl (str): 分类列表的 URL
classifyidlisturl (str): 分类 ID 列表的 URL
edbcodedataurl (str): EDB 代码数据的 URL
edbdatapushurl (str): EDB 数据推送的 URL
edbcodelist (str): EDB 代码列表的 URL
edbdeleteurl (str): EDB 数据删除的 URL
edbbusinessurl (str): EDB 业务数据的 URL
返回:
None
'''
self.signature = signature self.signature = signature
self.classifylisturl = classifylisturl self.classifylisturl = classifylisturl
self.classifyidlisturl = classifyidlisturl self.classifyidlisturl = classifyidlisturl
@ -1022,7 +1038,7 @@ class EtaReader():
self.edbcodelist = edbcodelist self.edbcodelist = edbcodelist
self.edbdeleteurl = edbdeleteurl self.edbdeleteurl = edbdeleteurl
self.edbbusinessurl = edbbusinessurl self.edbbusinessurl = edbbusinessurl
pass
def filter_yuanyou_data(self,ClassifyName,data): def filter_yuanyou_data(self,ClassifyName,data):
''' '''
@ -1240,7 +1256,16 @@ class EtaReader():
raise Exception(f'Error: {response.status_code}, {response.text}') raise Exception(f'Error: {response.status_code}, {response.text}')
def get_eta_api_yuanyou_data(self,data_set,dataset=''): def get_eta_api_yuanyou_data(self,data_set,dataset=''):
'''
从ETA API获取原油数据
参数:
data_set (str): 数据集名称
dataset (str): 数据集ID默认为空
返回:
None
'''
today = datetime.date.today().strftime("%Y-%m-%d") today = datetime.date.today().strftime("%Y-%m-%d")
# 定义你的headers这里可以包含多个参数 # 定义你的headers这里可以包含多个参数