import requests import json def get_cptcha(): url = 'https://marketinfo.jbshihua.com/jingbo-api/captcha' res = requests.get(url) res = json.loads(res.text) res = res["data"]["img"] imgurl = 'data:image/jpeg;base64,/' + res res = requests.get(imgurl) with open('cptcha.png', 'wb') as f: f.write(res.content) def main() -> dict: login_url = "http://marketinfo.jbshihua.com/jingbo-api/api/server/login" login_data = { "data": { "account": "api_dev", "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=", "tenantHashCode": "8a4577dbd919675758d57999a1e891fe", "terminal": "API" }, "funcModule": "API", "funcOperation": "获取token" } query_data_list_item_nos_url = f"http://marketinfo.jbshihua.com/jingbo-api/api/warehouse/dwDataItem/queryDataListItemNos" query_data_list_item_nos_data = { "funcModule": "数据项", "funcOperation": "查询", "data": { "dateStart": "20200101", "dateEnd": "20241231", "dataItemNoList": ["Brentjsj"] # 数据项编码,代表 brent结算价 } } login_res = requests.post( url=login_url, json=login_data, timeout=(3, 5)) text = json.loads(login_res.text) if text["status"]: token = text["data"]["accessToken"] print('获取到的token', token) headers = {"Authorization": token} print('获取数据中...') items_res = requests.post(url=query_data_list_item_nos_url, headers=headers, json=query_data_list_item_nos_data, timeout=(3, 35)) json_data = json.loads(items_res.text) print(json_data) return text return { "result": token, } class AuthHandler: def __init__(self): # 初始化登录表单数据 self.loginForm = { 'captchaToken': '', 'src': '' } self.loading = False def loadCaptcha(self): """加载验证码图片""" try: self.loading = True # 发送 POST 请求获取验证码 response = requests.post( url="https://marketinfo.jbshihua.com/jingbo-api/captcha", json={}, # 发送空 JSON 数据体 timeout=10 # 设置超时时间为 10 秒 ) # 检查响应状态码 response.raise_for_status() # 处理响应数据 data = response.json() if data: self.loginForm['captchaToken'] = data.get('token', '') self.loginForm['src'] = "data:image/jpeg;base64," + \ data.get('img', '') # 保存图片 with open('cptcha.png', 'wb') as f: f.write(res.content) except requests.exceptions.RequestException as e: print(f"请求出错: {e}") finally: self.loading = False def main2() -> dict: import requests import json login_url = "http://marketinfo.jbshihua.com/jingbo-api/api/server/login" login_data = { "data": { "account": "admin", "password": "OWZlYjcyNDAwZDRkYjEwZjE1ZjA0MTIwNDAwOGI5NjI=", "tenantHashCode": "8a4577dbd919675758d57999a1e891fe", "terminal": "PC" }, "funcModule": "登录页面", "funcOperation": "登录" } login_res = requests.post( url=login_url, json=login_data, timeout=(3, 5)) text = json.loads(login_res.text) print(text) if text["status"]: token = text["data"]["accessToken"] return { "accessToken": token, } return { "accessToken": "", } # 使用示例 if __name__ == "__main__": # auth = AuthHandler() # auth.loadCaptcha() # # 打印获取到的验证码信息 # print(f"验证码 Token: {auth.loginForm['captchaToken']}") # print(f"验证码图片: {auth.loginForm['src']}...") # 只显示前50个字符 # main2() main()