PriceForecast/test/demo.py
2025-07-23 14:03:07 +08:00

141 lines
4.0 KiB
Python

import requests
import json
def get_cptcha(timeout=10):
    """Download the captcha image and save it as ``cptcha.png``.

    First requests the captcha endpoint, reads ``data.img`` from the JSON
    reply, then fetches that value as a URL and writes the response body
    to ``cptcha.png`` in the current working directory.

    NOTE(review): this function treats ``data.img`` as a fetchable URL,
    while ``AuthHandler.loadCaptcha`` treats ``img`` as base64 data --
    confirm which shape the service actually returns.

    Args:
        timeout: Timeout passed to each HTTP request, in seconds (or a
            ``(connect, read)`` tuple). Without one, ``requests`` may wait
            forever on an unresponsive server.

    Raises:
        requests.exceptions.RequestException: If either request fails,
            times out, or returns an HTTP error status.
        KeyError: If the JSON reply has no ``data.img`` entry.
    """
    url = 'https://marketinfo.jbshihua.com/jingbo-api/captcha'

    meta_res = requests.get(url, timeout=timeout)
    # Fail early instead of trying to parse an error page as captcha JSON.
    meta_res.raise_for_status()
    img_url = json.loads(meta_res.text)["data"]["img"]

    img_res = requests.get(img_url, timeout=timeout)
    # Do not persist an HTTP error body as if it were the image.
    img_res.raise_for_status()
    with open('cptcha.png', 'wb') as f:
        f.write(img_res.content)
def main() -> dict:
    """Log in with the API account and query the Brent settlement-price item.

    Posts the login payload, and on success uses the returned access token
    as the ``Authorization`` header for a data-item query covering
    20200101 through 20241231. The query result is printed, not returned.

    Returns:
        The decoded login response. On a failed login (falsy ``status``)
        the same response is returned without running the data query, so
        the caller can inspect why the login was rejected.

    Raises:
        requests.exceptions.RequestException: If a request fails or times out.
        KeyError: If the login response lacks the expected keys.
    """
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a config file outside version control.
    login_url = "http://marketinfo.jbshihua.com/jingbo-api/api/server/login"
    login_data = {
        "data": {
            "account": "api_dev",
            "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=",
            "tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
            "terminal": "API"
        },
        "funcModule": "API",
        "funcOperation": "获取token"
    }
    query_data_list_item_nos_url = "http://marketinfo.jbshihua.com/jingbo-api/api/warehouse/dwDataItem/queryDataListItemNos"
    query_data_list_item_nos_data = {
        "funcModule": "数据项",
        "funcOperation": "查询",
        "data": {
            "dateStart": "20200101",
            "dateEnd": "20241231",
            "dataItemNoList": ["Brentjsj"]  # data item code: Brent settlement price
        }
    }

    login_res = requests.post(
        url=login_url, json=login_data, timeout=(3, 5))
    text = json.loads(login_res.text)

    if not text["status"]:
        # No token was issued, so the data query cannot be authorised.
        # Returning here avoids referencing an unbound ``token`` below.
        print('登录失败', text)
        return text

    token = text["data"]["accessToken"]
    print('获取到的token', token)
    headers = {"Authorization": token}

    print('获取数据中...')
    # Longer read timeout than the login call: the query can return a lot of data.
    items_res = requests.post(url=query_data_list_item_nos_url, headers=headers,
                              json=query_data_list_item_nos_data, timeout=(3, 35))
    json_data = json.loads(items_res.text)
    print(json_data)
    return text
class AuthHandler:
    """Fetch a captcha from the market-info service and hold login-form state."""

    def __init__(self):
        # Login form data: the captcha token returned by the service and
        # the captcha image as a ``data:`` URI.
        self.loginForm = {
            'captchaToken': '',
            'src': ''
        }
        # True while a captcha request is in progress.
        self.loading = False

    def loadCaptcha(self):
        """Request a captcha and store its token and image.

        Request-level failures (connection errors, timeouts, HTTP error
        statuses) are printed rather than raised. ``self.loading`` is
        always reset to ``False`` when the call finishes.
        """
        try:
            self.loading = True
            # POST an empty JSON body to obtain a new captcha.
            response = requests.post(
                url="https://marketinfo.jbshihua.com/jingbo-api/captcha",
                json={},
                timeout=10  # seconds
            )
            # Turn HTTP error statuses into exceptions handled below.
            response.raise_for_status()
            data = response.json()
            if data:
                self._applyCaptcha(data)
        except requests.exceptions.RequestException as e:
            print(f"请求出错: {e}")
        finally:
            self.loading = False

    def _applyCaptcha(self, data, path='cptcha.png'):
        """Copy a captcha payload into ``loginForm`` and save the image.

        Args:
            data: Decoded JSON payload; ``token`` and ``img`` are read with
                empty-string defaults.
            path: Destination file for the decoded image bytes.

        Raises:
            ValueError: If ``img`` is not valid base64 (``binascii.Error``
                is a ``ValueError`` subclass).
        """
        import base64

        img = data.get('img', '')
        self.loginForm['captchaToken'] = data.get('token', '')
        self.loginForm['src'] = "data:image/jpeg;base64," + img
        if img:
            # ``img`` is used above as the body of a base64 data URI, so the
            # raw image bytes are its base64 decoding.
            with open(path, 'wb') as f:
                f.write(base64.b64decode(img))
def main2() -> dict:
    """Log in with the PC-terminal admin account and return its access token.

    The decoded login response is printed before it is inspected.

    Returns:
        ``{"accessToken": <token>}`` when the response's ``status`` is
        truthy, otherwise ``{"accessToken": ""}``.
    """
    import requests
    import json

    endpoint = "http://marketinfo.jbshihua.com/jingbo-api/api/server/login"
    credentials = {
        "account": "admin",
        "password": "OWZlYjcyNDAwZDRkYjEwZjE1ZjA0MTIwNDAwOGI5NjI=",
        "tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
        "terminal": "PC",
    }
    payload = {
        "data": credentials,
        "funcModule": "登录页面",
        "funcOperation": "登录",
    }

    reply = requests.post(url=endpoint, json=payload, timeout=(3, 5))
    body = json.loads(reply.text)
    print(body)

    # Guard clause: a rejected login yields an empty token.
    if not body["status"]:
        return {"accessToken": ""}
    return {"accessToken": body["data"]["accessToken"]}
# Usage example
# Script entry point: runs main(). The commented-out lines are alternative
# demos (captcha fetch via AuthHandler, and the main2 login).
if __name__ == "__main__":
    # auth = AuthHandler()
    # auth.loadCaptcha()
    # # Print the captcha information that was fetched
    # print(f"验证码 Token: {auth.loginForm['captchaToken']}")
    # print(f"验证码图片: {auth.loginForm['src']}...") # show only the first 50 characters
    # main2()
    main()