PriceForecast/auptest.py

548 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import functools
from fastapi import FastAPI, HTTPException, Body,Request
from fastapi.middleware.cors import CORSMiddleware
from requests_ntlm import HttpNtlmAuth
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from typing import Dict
'''
sql:
-- 创建名为pims_api_log的表
CREATE TABLE pims_api_log (
-- 自增的唯一标识主键
id INT AUTO_INCREMENT PRIMARY KEY,
-- 请求时间记录请求到达服务器的时间戳使用DATETIME类型方便查看具体时间
request_time DATETIME NOT NULL,
-- 请求的IP地址用于标识请求来源VARCHAR类型根据实际IP长度设置合适的长度
request_ip VARCHAR(15) NOT NULL,
-- 请求的URL记录具体是向哪个接口路径发起的请求VARCHAR类型可根据预计最长路径长度来设置长度
request_url VARCHAR(255) NOT NULL,
-- 请求方法如GET、POST等使用VARCHAR类型存储简短的方法名称
request_method VARCHAR(10) NOT NULL,
-- 接收到的请求参数以JSON格式的文本存储方便应对复杂结构的参数情况TEXT类型可存储较长的文本内容
request_params TEXT,
-- 响应状态码记录接口返回给客户端的HTTP状态码INT类型
response_status_code INT NOT NULL,
-- 响应内容同样以JSON格式的文本存储便于保存各种格式的数据返回情况TEXT类型
response_content TEXT,
-- 响应时间记录接口完成处理并返回响应的时间戳DATETIME类型
response_time DATETIME NOT NULL
);
'''
import mysql.connector
from datetime import datetime
# host = 'rm-2zehj3r1n60ttz9x5.mysql.rds.aliyuncs.com' # 服务器访问使用
# database = 'jingbo_test' # 服务器访问使用
host = 'rm-2zehj3r1n60ttz9x5ko.mysql.rds.aliyuncs.com' # 北京访问使用
database = 'jingbo-test' # 北京访问使用
# 配置数据库连接信息,根据实际情况修改
config = {
"user": "jingbo",
"password": "shihua@123",
"host": host,
"database": database
}
'''
`ID` varchar(128) NOT NULL COMMENT 'ID',
`REQUEST_METHOD` varchar(128) DEFAULT NULL COMMENT '方法名称',
`REQUEST_TIME` datetime DEFAULT NULL COMMENT '请求时间',
`REQUEST_URL` varchar(256) DEFAULT NULL COMMENT '请求URL',
`USING_FLAG` varchar(1) DEFAULT NULL COMMENT '启用状态',
`REQUEST_PARAMS` text COMMENT '接收到的请求参数',
`RESPONSE_CONTENT` text COMMENT '响应内容',
`RESPONSE_TIME` datetime DEFAULT NULL COMMENT '响应时间',
'''
def insert_api_log(request_time, request_url, request_method, request_params, response_content, response_time):
try:
# 建立数据库连接
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()
# 先查询表中已有记录的数量用于生成新记录的ID
count_query = "SELECT COUNT(*) FROM v_tbl_aup_api_log"
cursor.execute(count_query)
result = cursor.fetchone()
new_id = result[0] + 1 if result else 1 # 如果表为空ID设为1否则数量加1
# 插入数据的SQL语句
insert_query = """
INSERT INTO v_tbl_aup_api_log (ID,REQUEST_TIME, REQUEST_URL, REQUEST_METHOD, REQUEST_PARAMS, RESPONSE_CONTENT, RESPONSE_TIME)
VALUES (%s,%s, %s, %s, %s, %s, %s)
"""
# 准备要插入的数据注意数据顺序要和SQL语句中的占位符顺序一致
data = (new_id,request_time, request_url, request_method, request_params, response_content, response_time)
# 执行插入操作
cursor.execute(insert_query, data)
# 提交事务,使插入生效
cnx.commit()
except mysql.connector.Error as err:
print(f"Error: {err}")
finally:
# 关闭游标和连接
if cursor:
cursor.close()
if cnx:
cnx.close()
app = FastAPI(docs_url="/docs")
# 允许跨域请求
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
headers = {'content-type': 'application/json;charset=UTF-8'}
# 您的 GraphQL API eg url = 'http://10.88.14.86/AspenTech/AspenUnified/api/v1/model/Chambroad20241205/graphql'
graphql_host = 'http://10.88.14.86'
graphql_path = '/AspenTech/AspenUnified/api/v1/model/Chambroad20241205/graphql'
url = graphql_host + graphql_path
query = """
mutation{
purchases{
update(inputs:[{
name:"11月度计划"
inputs:[
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
]
}])
}
caseExecution {
submitCaseStack(
input:{
name: "Job2"
cases: [
{name: "11月度计划"}
{name: "二催开工"}
{name: "一焦化停工"}
{name: "焦化加工油浆"}
{name: "焦化加工低硫原油"}
{name: "焦化加工低硫渣油"}
]
}
)
{id}
waitForCaseStackJob(name: "Job2")
{
started
submitted
finished
executionStatus
cases{
items{
name
objectiveValue
}
}
}
}
}
"""
payload_json = {
"query": query,
"operationName": ""
}
query2 = '''
query
{
cases
{
items
{
name
}
}
}
'''
payload_json2 = {
"query": query2,
"operationName": ""
}
graphql_username = "bw19382"
graphql_password = "Fudong3!"
auth = HttpNtlmAuth(f'{graphql_username}', f'{graphql_password}')
example_query = '''
'inputs':{
name:"11月度计划"
inputs:[
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
{
name:"VRa"
inputs:[
{
field:Cost
periodName:"1"
value: 3333
}
]
},
'''
def log_api_call(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
request_time = datetime.now()
request_url = None
request_method = 'post'
request_params = None
try:
# 执行被装饰的函数,获取其返回的响应
request_time = datetime.now()
response = func(*args, **kwargs)
response_time = datetime.now()
# 准备请求参数和响应内容转换为合适的字符串格式用于记录例如JSON格式字符串
request_params_str = json.dumps(request_params) if request_params else None
response_content = response.text if hasattr(response, 'text') else None
# 调用插入日志的函数将相关信息记录到数据库中假设insert_api_log函数已正确定义且可访问
insert_api_log(
request_time,
request_url,
request_method,
request_params_str,
response_content,
response_time
)
return response
except Exception as e:
print(f"Error occurred during API call: {e}")
raise
return wrapper
@app.post("/graphql")
async def post_execute_graphql_query(request: Request,
query:str = Body(query,example_query=query)
):
payload_json = {
"query": query
}
request_time = datetime.now()
full_path = str(request.url.path)
session = requests.Session()
response = session.post(url=url, headers=headers, json=payload_json, auth=auth, verify=False)
response_time = datetime.now()
# 调用插入日志的函数将相关信息记录到数据库中假设insert_api_log函数已正确定义且可访问
insert_api_log(
request_time,
full_path,
'POST',
json.dumps(payload_json),
json.dumps(response.json()),
response_time
)
if response.status_code!= 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
# def insert_api_log(request_time, request_url, request_method, request_params, response_content, response_time):
@app.post("/cases")
async def post_cases_query_async(request: Request):
payload_json2 = {
"query": query2
}
full_path = str(request.url.path)
request_time = datetime.now()
session = requests.Session()
response = session.post(url=url, headers=headers, json=payload_json2, auth=auth, verify=False)
response_time = datetime.now()
# 调用插入日志的函数将相关信息记录到数据库中假设insert_api_log函数已正确定义且可访问
insert_api_log(
request_time,
full_path,
'POST',
json.dumps(payload_json),
json.dumps(response.json()),
response_time
)
if response.status_code!= 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
# 定义一个函数用于生成定制化的GraphQL查询语句添加了参数类型检查
def generate_custom_graphql_query(
purchase_inputs=None,
case_execution_input=None,
wait_for_case_stack_job_name=None
):
base_query = """
mutation{
purchases{
update(inputs:[{
name:"11月度计划"
inputs:[
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
]
}])
}
caseExecution {
submitCaseStack(
input:{
name: "Job2"
cases: [
{name: "11月度计划"}
{name: "二催开工"}
{name: "一焦化停工"}
{name: "焦化加工油浆"}
{name: "焦化加工低硫原油"}
{name: "焦化加工低硫渣油"}
]
}
)
{id}
waitForCaseStackJob(name: "Job2")
{
started
submitted
finished
executionStatus
cases{
items{
name
objectiveValue
}
}
}
}
}
"""
# 检查purchase_inputs参数类型如果不为None需为列表类型且列表元素需为字典类型
if purchase_inputs is not None:
if not isinstance(purchase_inputs, list):
raise TypeError("purchase_inputs should be a list or None.")
for input_data in purchase_inputs:
if not isinstance(input_data, dict):
raise TypeError("Elements in purchase_inputs should be dictionaries.")
# 检查case_execution_input参数类型如果不为None需为字典类型
if case_execution_input is not None:
if not isinstance(case_execution_input, dict):
raise TypeError("case_execution_input should be a dictionary or None.")
# 检查wait_for_case_stack_job_name参数类型如果不为None需为字符串类型
if wait_for_case_stack_job_name is not None:
if not isinstance(wait_for_case_stack_job_name, str):
raise TypeError("wait_for_case_stack_job_name should be a string or None.")
if purchase_inputs:
# 购买相关的inputs部分的模板
purchase_inputs_template = """
name:"11月度计划"
inputs:[
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
]
"""
new_purchase_inputs_str = ""
for input_data in purchase_inputs:
input_str = f"""
name: "{input_data['name']}"
inputs: [
"""
inner_inputs = input_data.get('inputs', [])
for inner_input in inner_inputs:
inner_str = f"""
{{
field: "{inner_input['field']}"
periodName: "{inner_input['periodName']}"
value: {inner_input['value']}
}}
"""
input_str += inner_str
input_str += " ]"
new_purchase_inputs_str += input_str
base_query = base_query.replace(purchase_inputs_template, new_purchase_inputs_str)
if case_execution_input:
# caseExecution相关的input部分的模板
case_execution_input_template = """
name: "Job2"
cases: [
{name: "11月度计划"}
{name: "二催开工"}
{name: "一焦化停工"}
{name: "焦化加工油浆"}
{name: "焦化加工低硫原油"}
{name: "焦化加工低硫渣油"}
]
"""
input_dict_str = f"""
name: "{case_execution_input['name']}"
cases: [
"""
for case in case_execution_input['cases']:
case_str = f"""
{{name: "{case['name']}"}}
"""
input_dict_str += case_str
input_dict_str += " ]"
base_query = base_query.replace(case_execution_input_template, input_dict_str)
if wait_for_case_stack_job_name:
# waitForCaseStackJob部分的模板
wait_for_case_stack_job_template = "waitForCaseStackJob(name: \"Job2\")"
new_wait_for_case_stack_job_str = f"waitForCaseStackJob(name: \"{wait_for_case_stack_job_name}\")"
base_query = base_query.replace(wait_for_case_stack_job_template, new_wait_for_case_stack_job_str)
return base_query
# 定义一个POST请求的接口用于接收参数并生成GraphQL查询语句
@app.post("/generate_graphql_query")
async def generate_graphql_query(
request = Request,
purchase_inputs: list[dict] = Body(None, embed=True),
case_execution_input: dict = Body(None, embed=True),
wait_for_case_stack_job_name: str = Body(None, embed=True)
):
try:
custom_query = generate_custom_graphql_query(purchase_inputs, case_execution_input, wait_for_case_stack_job_name)
payload_json = {
"query": custom_query
}
request_time = datetime.now()
full_path = str(request.url.path)
session = requests.Session()
response = session.post(url=url, headers=headers, json=payload_json, auth=auth, verify=False)
response_time = datetime.now()
# 调用插入日志的函数将相关信息记录到数据库中假设insert_api_log函数已正确定义且可访问
insert_api_log(
request_time,
full_path,
'POST',
json.dumps(payload_json),
json.dumps(response.json()),
response_time
)
if response.status_code!= 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
except TypeError as e:
return {"error": str(e)}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8001)