PriceForecast/auptest.py

220 lines
5.8 KiB
Python
Raw Normal View History

2024-12-18 17:51:01 +08:00
from fastapi import FastAPI, HTTPException, Body
from fastapi.middleware.cors import CORSMiddleware
import requests
from requests_ntlm import HttpNtlmAuth
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
'''
sql:
-- 创建名为pims_api_log的表
CREATE TABLE pims_api_log (
-- 自增的唯一标识主键
id INT AUTO_INCREMENT PRIMARY KEY,
-- 请求时间记录请求到达服务器的时间戳使用DATETIME类型方便查看具体时间
request_time DATETIME NOT NULL,
-- 请求的IP地址用于标识请求来源VARCHAR类型根据实际IP长度设置合适的长度
request_ip VARCHAR(15) NOT NULL,
-- 请求的URL记录具体是向哪个接口路径发起的请求VARCHAR类型可根据预计最长路径长度来设置长度
request_url VARCHAR(255) NOT NULL,
-- 请求方法如GETPOST等使用VARCHAR类型存储简短的方法名称
request_method VARCHAR(10) NOT NULL,
-- 接收到的请求参数以JSON格式的文本存储方便应对复杂结构的参数情况TEXT类型可存储较长的文本内容
request_params TEXT,
-- 响应状态码记录接口返回给客户端的HTTP状态码INT类型
response_status_code INT NOT NULL,
-- 响应内容同样以JSON格式的文本存储便于保存各种格式的数据返回情况TEXT类型
response_content TEXT,
-- 响应时间记录接口完成处理并返回响应的时间戳DATETIME类型
response_time DATETIME NOT NULL
);
'''
import mysql.connector
from datetime import datetime
# 配置数据库连接信息,根据实际情况修改
config = {
"user": "your_username",
"password": "your_password",
"host": "your_host",
"database": "your_database"
}
def insert_api_log(request_time, request_ip, request_url, request_method, request_params, response_status_code, response_content, response_time):
try:
# 建立数据库连接
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()
# 插入数据的SQL语句
insert_query = """
INSERT INTO pims_api_log (request_time, request_ip, request_url, request_method, request_params, response_status_code, response_content, response_time)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
# 准备要插入的数据注意数据顺序要和SQL语句中的占位符顺序一致
data = (request_time, request_ip, request_url, request_method, request_params, response_status_code, response_content, response_time)
# 执行插入操作
cursor.execute(insert_query, data)
# 提交事务,使插入生效
cnx.commit()
except mysql.connector.Error as err:
print(f"Error: {err}")
finally:
# 关闭游标和连接
if cursor:
cursor.close()
if cnx:
cnx.close()
app = FastAPI(docs_url="/docs")
# 允许跨域请求
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
headers = {'content-type': 'application/json;charset=UTF-8'}
# 您的 GraphQL API eg url = 'http://10.88.14.86/AspenTech/AspenUnified/api/v1/model/Chambroad20241205/graphql'
graphql_host = '10.88.14.86'
graphql_path = '/AspenTech/AspenUnified/api/v1/model/Chambroad20241205/graphql'
query = """
mutation{
purchases{
update(inputs:[%s
]
}])
}
caseExecution {
submitCaseStack(
input:{
name: "Job2"
cases: [
{name: "11月度计划"}
{name: "二催开工"}
{name: "一焦化停工"}
{name: "焦化加工油浆"}
{name: "焦化加工低硫原油"}
{name: "焦化加工低硫渣油"}
]
}
)
{id}
waitForCaseStackJob(name: "Job2")
{
started
submitted
finished
executionStatus
cases{
items{
name
objectiveValue
}
}
}
}
}
"""
payload_json = {
"query": query,
"operationName": ""
}
graphql_username = "bw19382"
graphql_password = "Fudong3!"
auth = HttpNtlmAuth(f'{graphql_username}', f'{graphql_password}')
example_query = '''
'inputs':{
name:"11月度计划"
inputs:[
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
{
name:"VRa"
inputs:[
{
field:Cost
periodName:"1"
value: 3333
}
]
},
'''
@app.post("/graphql")
async def execute_graphql_query(
query: str = Body(..., example=example_query) # 使用Body和example参数添加示例
):
session = requests.Session()
response = session.post(url=url, headers=headers, json=payload_json, auth=auth, verify=False)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
query2 = '''
query
{
cases
{
items
{
name
}
}
}
'''
payload_json2 = {
"query": query2,
"operationName": ""
}
@app.get("/cases")
async def get_cases_query_async():
session = requests.Session()
response = session.post(url=url, headers=headers, json=payload_json2, auth=auth, verify=False)
insert_api_log(
datetime.now(),
'IP_ADDRESS '12
'IP_ADDRESS',
'URL_ADDRESS,
'http://127.0.0.1:8000/cases',
'GET',
'',
response.status_code,
response.text,
datetime.now()
)
if response.status_code!= 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)