PriceForecast/auptest.py

593 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import json
import os
from datetime import datetime
from typing import Dict

import mysql.connector
import requests
import urllib3
from fastapi import FastAPI, HTTPException, Body,Request
from fastapi.middleware.cors import CORSMiddleware
from requests_ntlm import HttpNtlmAuth

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ---------------------------------------------------------------------------
# Connection settings
#
# Every value below can be overridden with the environment variable named in
# the same statement; the fallback is the literal that used to be hard-coded
# here, so an environment that sets nothing behaves exactly as before.
#
# NOTE(review): the fallback credentials are still stored in source control.
# Once deployments export the variables, the literal fallbacks for the
# passwords should be removed.
#
# Previously commented-out alternative for server-side access:
#   host = 'rm-2zehj3r1n60ttz9x5.mysql.rds.aliyuncs.com'
#   database = 'jingbo_test'
# The defaults below are the ones marked for access from Beijing.
# ---------------------------------------------------------------------------
host = os.environ.get('AUP_DB_HOST', 'rm-2zehj3r1n60ttz9x5ko.mysql.rds.aliyuncs.com')
database = os.environ.get('AUP_DB_NAME', 'jingbo-test')

# Keyword arguments passed to mysql.connector.connect(**config).
config = {
    "user": os.environ.get('AUP_DB_USER', "jingbo"),
    "password": os.environ.get('AUP_DB_PASSWORD', "shihua@123"),
    "host": host,
    "database": database,
}

# GraphQL API, e.g.
# url = 'http://10.88.14.86/AspenTech/AspenUnified/api/v1/model/Chambroad20241205/graphql'
graphql_host = os.environ.get('AUP_GRAPHQL_HOST', 'http://10.88.14.86')
graphql_path = os.environ.get(
    'AUP_GRAPHQL_PATH',
    '/AspenTech/AspenUnified/api/v1/model/Chambroad20241205/graphql',
)
url = graphql_host + graphql_path
graphql_username = os.environ.get('AUP_GRAPHQL_USERNAME', "bw19382")
graphql_password = os.environ.get('AUP_GRAPHQL_PASSWORD', "Fudong3!")
# NTLM credentials attached to every request sent to the GraphQL endpoint.
auth = HttpNtlmAuth(graphql_username, graphql_password)

# Request headers sent with every GraphQL call.
headers = {'content-type': 'application/json;charset=UTF-8'}
def insert_api_log(request_time, request_url, request_method, request_params, response_content, response_time):
    """Insert one request/response record into ``v_tbl_aup_api_log``.

    Logging is best effort: MySQL errors are printed and swallowed so that a
    logging failure does not replace the response of the calling endpoint.

    A connection is opened for this call only and is always closed again,
    together with its cursor, before the function returns.

    Args:
        request_time: Value stored in ``REQUEST_TIME``.
        request_url: Value stored in ``REQUEST_URL``.
        request_method: Value stored in ``REQUEST_METHOD``.
        request_params: Text stored (UTF-8 encoded) in ``REQUEST_PARAMS``.
        response_content: Text stored (UTF-8 encoded) in ``RESPONSE_CONTENT``.
        response_time: Value stored in ``RESPONSE_TIME``.
    """
    insert_query = """
    INSERT INTO v_tbl_aup_api_log (REQUEST_TIME, REQUEST_URL, REQUEST_METHOD, REQUEST_PARAMS, RESPONSE_CONTENT, RESPONSE_TIME)
    VALUES (%s, %s, %s, %s, %s, %s)
    """
    # The tuple order must match the placeholder order in the statement.
    data = (
        request_time,
        request_url,
        request_method,
        request_params.encode('utf-8'),
        response_content.encode('utf-8'),
        response_time,
    )

    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**config)
        cursor = connection.cursor()
        cursor.execute(insert_query, data)
        # Commit so the inserted row becomes visible.
        connection.commit()
    except mysql.connector.Error as err:
        print(f"Error: {err}")
    finally:
        # Close the cursor first, then the connection; either may be None
        # when connecting or cursor creation failed.
        for resource in (cursor, connection):
            if resource is None:
                continue
            try:
                resource.close()
            except mysql.connector.Error as err:
                print(f"Error: {err}")


# The function above no longer uses a shared connection. The name is kept at
# module level (always None) in case other code still reads it.
# NOTE(review): the original indentation of this statement could not be
# recovered from the reviewed text - confirm nothing relies on it.
cnx = None
# OpenAPI tag metadata: one entry per tag used by the routes of this module.
tags_metadata = [
    {"name": tag_name, "description": tag_description}
    for tag_name, tag_description in (
        ("get_cases", "获取所有cases"),
        ("generate_graphql_query", "生成Graphql查询语句并接收查询结果"),
    )
]

# The FastAPI application object; routes are registered on it below.
# (An ``openapi_url=""`` option was left commented out in the original.)
app = FastAPI(title="AUP数据集成信息化接口", version="0.0.1", openapi_tags=tags_metadata)

# Allow cross-origin requests: any origin, any method, any header.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
class GraphqlQueryTemplates:
    """Text templates and sample payloads for the GraphQL requests.

    Attributes ending in ``_template`` hold GraphQL text fragments; the other
    attributes hold sample payloads (JSON text or a plain string) with the
    same content, plus the query that lists all cases.

    The string values are kept character for character: other code may match
    the fragments by exact text (``str.replace``), so their whitespace must
    not be reformatted.
    """

    def __init__(self):
        # ---- GraphQL fragments (templates) --------------------------------
        self.purchase_inputs_template = """
name:"11月度计划"
inputs:[
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
{
name:"CWT"
inputs:[
{
field:Cost
periodName:"1"
value: 3100
}
]
},
]
"""
        self.case_execution_input_template = """
name: "Job2"
cases: [
{name: "11月度计划"}
{name: "二催开工"}
{name: "一焦化停工"}
{name: "焦化加工油浆"}
{name: "焦化加工低硫原油"}
{name: "焦化加工低硫渣油"}
]
"""
        self.wait_for_case_stack_job_template = "waitForCaseStackJob(name: \"Job2\")"

        # Query that lists the name of every case.
        self.case_query = '''
query
{
cases
{
items
{
name
}
}
}
'''
        # Misspelled legacy name, kept as an alias because existing code
        # reads ``case_qurey``.
        self.case_qurey = self.case_query

        # ---- Sample payloads (parameter examples) -------------------------
        self.purchase_inputs = '''
[{
"name": "11月度计划",
"inputs": [
{
"name": "CWT",
"inputs": [
{
"field": "Cost",
"periodName": "1",
"value": 3100
}
]
},
{
"name": "CWT",
"inputs": [
{
"field": "Cost",
"periodName": "1",
"value": 3100
}
]
}
]
}]
'''
        self.case_execution_input = '''
{
"name": "Job2",
"cases": [
{
"name": "11月度计划"
},
{
"name": "二催开工"
},
{
"name": "一焦化停工"
},
{
"name": "焦化加工油浆"
},
{
"name": "焦化加工低硫原油"
},
{
"name": "焦化加工低硫渣油"
}
]
}
'''
        self.wait_for_case_stack_job_name = 'Job2'


# Shared instance used by the rest of the module.
templates = GraphqlQueryTemplates()
# Sample payloads used whenever the caller leaves the matching argument
# empty. They carry the same data as the hand-written sample mutation this
# function previously embedded as text.
_DEFAULT_PURCHASE_INPUTS = [
    {
        "name": "11月度计划",
        "inputs": [
            {"name": "CWT", "inputs": [{"field": "Cost", "periodName": "1", "value": 3100}]},
            {"name": "CWT", "inputs": [{"field": "Cost", "periodName": "1", "value": 3100}]},
        ],
    },
]
_DEFAULT_CASE_EXECUTION_INPUT = {
    "name": "Job2",
    "cases": [
        {"name": "11月度计划"},
        {"name": "二催开工"},
        {"name": "一焦化停工"},
        {"name": "焦化加工油浆"},
        {"name": "焦化加工低硫原油"},
        {"name": "焦化加工低硫渣油"},
    ],
}
_DEFAULT_WAIT_JOB_NAME = "Job2"
# Names that the GraphQL grammar does not accept as enum values.
_GRAPHQL_RESERVED_NAMES = frozenset({"true", "false", "null"})


def _graphql_string(value):
    """Return *value* as a double-quoted, escaped GraphQL string literal."""
    # JSON string escaping (\" \\ \n \uXXXX ...) is also valid in GraphQL.
    return json.dumps(str(value), ensure_ascii=False)


def _graphql_enum(value):
    """Return *value* as a bare GraphQL enum literal, or raise TypeError."""
    name = str(value)
    # ASCII identifier == GraphQL Name: [_A-Za-z][_0-9A-Za-z]*
    if not (name.isascii() and name.isidentifier()) or name in _GRAPHQL_RESERVED_NAMES:
        raise TypeError(f"field should be a GraphQL enum name, got {name!r}.")
    return name


def _graphql_number(value):
    """Return *value* as a GraphQL numeric literal, or raise TypeError."""
    if isinstance(value, str):
        # Numeric text such as "3100" is accepted and normalised.
        text = value.strip()
        for parse in (int, float):
            try:
                value = parse(text)
                break
            except ValueError:
                continue
        else:
            raise TypeError(f"value should be a number, got {text!r}.")
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise TypeError("value should be a number.")
    if value != value or value in (float("inf"), float("-inf")):
        raise TypeError("value should be a finite number.")
    return repr(value)


def _graphql_object(body):
    """Wrap already rendered field text in ``{`` ``}``."""
    return "{" + body + "}"


def _render_purchase_inputs(purchase_inputs):
    """Render the argument of ``purchases.update(inputs: ...)`` as a list literal."""
    rendered_cases = []
    for case_input in purchase_inputs:
        rendered_items = []
        for item in case_input.get('inputs', []):
            rendered_fields = " ".join(
                _graphql_object(
                    f"field: {_graphql_enum(entry['field'])} "
                    f"periodName: {_graphql_string(entry['periodName'])} "
                    f"value: {_graphql_number(entry['value'])}"
                )
                for entry in item['inputs']
            )
            rendered_items.append(
                _graphql_object(f"name: {_graphql_string(item['name'])} inputs: [{rendered_fields}]")
            )
        joined_items = " ".join(rendered_items)
        rendered_cases.append(
            _graphql_object(f"name: {_graphql_string(case_input['name'])} inputs: [{joined_items}]")
        )
    return "[" + " ".join(rendered_cases) + "]"


def _render_case_execution_input(case_execution_input):
    """Render the argument of ``submitCaseStack(input: ...)`` as an object literal."""
    rendered_cases = " ".join(
        _graphql_object(f"name: {_graphql_string(case['name'])}")
        for case in case_execution_input['cases']
    )
    return _graphql_object(
        f"name: {_graphql_string(case_execution_input['name'])} cases: [{rendered_cases}]"
    )


def generate_custom_graphql_query(
    purchase_inputs=None,
    case_execution_input=None,
    wait_for_case_stack_job_name=None
):
    """Build the GraphQL mutation that updates purchases and runs a case stack.

    The mutation updates purchase inputs, submits a case stack, waits for the
    named job and selects its status plus the objective value of every case.

    The document is rendered directly from the arguments. String values are
    emitted as escaped string literals. ``field`` is emitted as a bare enum
    literal (``field: Cost``), which is how the hand-written sample mutation
    in this file spells it. Each purchase input and each inner input becomes
    its own ``{...}`` object.

    Args:
        purchase_inputs: ``None`` or a list of dicts shaped like
            ``{"name": str, "inputs": [{"name": str, "inputs":
            [{"field": str, "periodName": str, "value": number}]}]}``.
            The outer ``"inputs"`` key is optional. ``None`` or an empty list
            selects the built-in sample.
        case_execution_input: ``None`` or a dict shaped like
            ``{"name": str, "cases": [{"name": str}, ...]}``. ``None`` or an
            empty dict selects the built-in sample.
        wait_for_case_stack_job_name: ``None`` or the job name passed to
            ``waitForCaseStackJob``. ``None`` or ``""`` selects ``"Job2"``.

    Returns:
        The GraphQL document as a string.

    Raises:
        TypeError: If an argument has the wrong type, ``field`` is not a
            valid enum name, or ``value`` is not a finite number.
        KeyError: If a required key is missing from a nested dict.
    """
    # purchase_inputs must be None or a list whose elements are dicts.
    if purchase_inputs is not None:
        if not isinstance(purchase_inputs, list):
            raise TypeError("purchase_inputs should be a list or None.")
        for input_data in purchase_inputs:
            if not isinstance(input_data, dict):
                raise TypeError("Elements in purchase_inputs should be dictionaries.")
    # case_execution_input must be None or a dict.
    if case_execution_input is not None:
        if not isinstance(case_execution_input, dict):
            raise TypeError("case_execution_input should be a dictionary or None.")
    # wait_for_case_stack_job_name must be None or a string.
    if wait_for_case_stack_job_name is not None:
        if not isinstance(wait_for_case_stack_job_name, str):
            raise TypeError("wait_for_case_stack_job_name should be a string or None.")

    # Empty values fall back to the samples, as the truthiness checks of the
    # previous implementation did.
    purchases_literal = _render_purchase_inputs(purchase_inputs or _DEFAULT_PURCHASE_INPUTS)
    case_stack_literal = _render_case_execution_input(
        case_execution_input or _DEFAULT_CASE_EXECUTION_INPUT
    )
    wait_name_literal = _graphql_string(wait_for_case_stack_job_name or _DEFAULT_WAIT_JOB_NAME)

    query_lines = [
        "mutation{",
        "  purchases{",
        f"    update(inputs: {purchases_literal})",
        "  }",
        "  caseExecution {",
        f"    submitCaseStack(input: {case_stack_literal})",
        "    {id}",
        f"    waitForCaseStackJob(name: {wait_name_literal})",
        "    {",
        "      started",
        "      submitted",
        "      finished",
        "      executionStatus",
        "      cases{",
        "        items{",
        "          name",
        "          objectiveValue",
        "        }",
        "      }",
        "    }",
        "  }",
        "}",
    ]
    return "\n".join(query_lines) + "\n"
@app.post("/generate_graphql_query",tags=['generate_graphql_query'])
async def generate_graphql_query(
    request: Request,
    purchase_inputs: list[dict] = Body(json.loads(templates.purchase_inputs), embed=True,example_query=json.loads(templates.purchase_inputs)),
    case_execution_input: dict = Body(json.loads(templates.case_execution_input), embed=True,example_query=json.loads(templates.case_execution_input)),
    wait_for_case_stack_job_name: str = Body(templates.wait_for_case_stack_job_name, embed=True,example_query=templates.wait_for_case_stack_job_name),
):
    """Build the case-stack mutation, send it to the GraphQL API and log the call.

    The body defaults are the parsed sample payloads, so an omitted parameter
    has the declared ``list``/``dict`` type. They used to be the raw JSON
    text, which the query builder rejects.

    Returns:
        The parsed JSON body of the upstream response, or ``{"error": ...}``
        when the query cannot be built from the supplied parameters.

    Raises:
        HTTPException: 503 on connect timeout, 500 on any other request
            error, the upstream status code when it is not 200, and 502 when
            a 200 response does not contain JSON.
    """
    # Only query construction is turned into an ``{"error": ...}`` payload;
    # errors raised later are not input errors and are not masked.
    try:
        custom_query = generate_custom_graphql_query(purchase_inputs, case_execution_input, wait_for_case_stack_job_name)
    except TypeError as e:
        return {"error": str(e)}
    except KeyError as e:
        return {"error": f"Missing required key: {e}"}

    payload_json = {
        "query": custom_query
    }
    request_time = datetime.now()
    full_path = str(request.url.path)

    # NOTE(review): requests and the log insert are blocking calls inside an
    # ``async def`` handler, so the event loop is held for up to the timeout.
    response = None
    error_status = None
    error_detail = None
    try:
        # The context manager releases the session's pooled connections.
        with requests.Session() as session:
            response = session.post(url=url, headers=headers, json=payload_json, auth=auth, verify=False, timeout=300)
    except requests.exceptions.ConnectTimeout:
        # 503: the upstream service is temporarily unavailable.
        error_status = 503
        error_detail = {
            "errors": [{"message": "连接超时,请检查网络或稍后重试"}],
            "data": {},
            "status_code": 503
        }
    except requests.exceptions.RequestException:
        # Every other request-level failure is reported uniformly.
        error_status = 500
        error_detail = {
            "errors": [{"message": "请求出现其他错误,请联系管理员"}],
            "data": {},
            "status_code": 500
        }
    response_time = datetime.now()

    # Parse the body once; a non-JSON body must not break logging.
    body = None
    body_is_json = False
    if response is not None:
        try:
            body = response.json()
            body_is_json = True
        except ValueError:
            body_is_json = False

    if error_detail is not None:
        logged_content = error_detail
    elif body_is_json:
        logged_content = body
    else:
        logged_content = {"status_code": response.status_code, "text": response.text}

    # Record the call before any error is reported to the client.
    insert_api_log(
        request_time,
        full_path,
        request.method,
        json.dumps(payload_json),
        json.dumps(logged_content),
        response_time
    )

    if error_detail is not None:
        raise HTTPException(status_code=error_status, detail=error_detail)
    if response.status_code!= 200:
        raise HTTPException(status_code=response.status_code, detail=response.text)
    if not body_is_json:
        # Upstream answered 200 but the body could not be decoded as JSON.
        raise HTTPException(status_code=502, detail=response.text)
    print(body)
    return body
@app.get("/get_cases",tags=['get_cases'])
async def get_cases_query_async(request: Request):
    """Send the case-list query to the GraphQL API, log the call and return the result.

    Returns:
        The parsed JSON body of the upstream response.

    Raises:
        HTTPException: 503 on connect timeout, 500 on any other request
            error, the upstream status code when it is not 200, and 502 when
            a 200 response does not contain JSON.
    """
    payload_json2 = {
        "query": templates.case_qurey
    }
    full_path = str(request.url.path)
    request_time = datetime.now()

    # NOTE(review): requests and the log insert are blocking calls inside an
    # ``async def`` handler, so the event loop is held while they run.
    response = None
    error_status = None
    error_detail = None
    try:
        # A timeout is required both to bound the call and for the
        # ConnectTimeout branch below to be reachable at all.
        with requests.Session() as session:
            response = session.post(url=url, headers=headers, json=payload_json2, auth=auth, verify=False, timeout=300)
    except requests.exceptions.ConnectTimeout:
        # 503: the upstream service is temporarily unavailable.
        error_status = 503
        error_detail = {
            "errors": [{"message": "连接超时,请检查网络或稍后重试"}],
            "data": {},
            "status_code": 503
        }
    except requests.exceptions.RequestException:
        # Every other request-level failure is reported uniformly.
        error_status = 500
        error_detail = {
            "errors": [{"message": "请求出现其他错误,请联系管理员"}],
            "data": {},
            "status_code": 500
        }
    response_time = datetime.now()

    # Parse the JSON text into Python objects once; a non-JSON body must not
    # break logging.
    res = None
    res_is_json = False
    if response is not None:
        try:
            res = response.json()
            res_is_json = True
        except ValueError:
            res_is_json = False

    if error_detail is not None:
        logged_content = error_detail
    elif res_is_json:
        logged_content = res
    else:
        logged_content = {"status_code": response.status_code, "text": response.text}

    # Record the call before any error is reported to the client.
    insert_api_log(
        request_time,
        full_path,
        request.method,
        json.dumps(payload_json2),
        json.dumps(logged_content),
        response_time
    )

    if error_detail is not None:
        raise HTTPException(status_code=error_status, detail=error_detail)
    if response.status_code!= 200:
        raise HTTPException(status_code=response.status_code, detail=response.text)
    if not res_is_json:
        # Upstream answered 200 but the body could not be decoded as JSON.
        raise HTTPException(status_code=502, detail=response.text)
    return res
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8003. uvicorn is imported here because only this path needs it.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8003}
    uvicorn.run(app, **server_options)
# query = """
# mutation{
# purchases{
# update(inputs:[{
# name:"11月度计划"
# inputs:[
# {
# name:"CWT"
# inputs:[
# {
# field:Cost
# periodName:"1"
# value: 3100
# }
# ]
# },
# {
# name:"CWT"
# inputs:[
# {
# field:Cost
# periodName:"1"
# value: 3100
# }
# ]
# },
# ]
# }])
# }
# caseExecution {
# submitCaseStack(
# input:{
# name: "Job2"
# cases: [
# {name: "11月度计划"}
# {name: "二催开工"}
# {name: "一焦化停工"}
# {name: "焦化加工油浆"}
# {name: "焦化加工低硫原油"}
# {name: "焦化加工低硫渣油"}
# ]
# }
# )
# {id}
# waitForCaseStackJob(name: "Job2")
# {
# started
# submitted
# finished
# executionStatus
# cases{
# items{
# name
# objectiveValue
# }
# }
# }
# }
# }
# """
# payload_json = {
# "query": query,
# "operationName": ""
# }
# query2 = '''
# query
# {
# cases
# {
# items
# {
# name
# }
# }
# }
# '''
# payload_json2 = {
# "query": query2,
# "operationName": ""
# }
# @app.post("/graphql")
# async def post_execute_graphql_query(request: Request,
# query:str = Body(query,example_query=query)
# ):
# payload_json = {
# "query": query
# }
# request_time = datetime.now()
# full_path = str(request.url.path)
# session = requests.Session()
# response = session.post(url=url, headers=headers, json=payload_json, auth=auth, verify=False)
# response_time = datetime.now()
# # 调用插入日志的函数将相关信息记录到数据库中假设insert_api_log函数已正确定义且可访问
# insert_api_log(
# request_time,
# full_path,
# 'POST',
# json.dumps(payload_json),
# json.dumps(response.json()),
# response_time
# )
# if response.status_code!= 200:
# raise HTTPException(status_code=response.status_code, detail=response.text)
# return response.json()
# def insert_api_log(request_time, request_url, request_method, request_params, response_content, response_time):