"""Daily qualitative (rule-based) asphalt price forecast job.

The script pulls data-item values from the internal HTTP API, appends them
to a local Excel workbook, derives a forecast price from the two most recent
complete rows, and pushes the forecast back to the API.
"""
import json
import logging
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
import xlrd
import xlwt

pd.set_option('display.max_columns', None)

# Configure file logging for the daily run.
logging.basicConfig(
    filename='沥青定性每日执行.log',  # log file name
    level=logging.INFO,  # record INFO and above
    format='%(asctime)s - %(levelname)s - %(message)s',  # log line format
    datefmt='%Y-%m-%d %H:%M:%S'  # timestamp format
)

# ---------------------------------------------------------------------------
# Module-level configuration
# ---------------------------------------------------------------------------
login_url = "http://10.200.32.39/jingbo-api/api/server/login"
login_push_url = "http://10.200.32.39/jingbo-api/api/server/login"
# query_data_list_item_nos_url
# jingbo-dev/api/warehouse/dwDataItem/queryDataListItemNos
search_url = "http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryByItemNos"
upload_url = "http://10.200.32.39/jingbo-api/api/dw/dataValue/pushDataValueList"
queryDataListItemNos_url = "http://10.200.32.39/jingbo-api//api/warehouse/dwDataItem/queryDataListItemNos"

query_data_list_item_nos_data = {
    "funcModule": "数据项",
    "funcOperation": "查询",
    "data": {
        "dateStart": "20200101",
        "dateEnd": "20241231",
        # Data item codes: Brent lowest and highest price.
        "dataItemNoList": ["Brentzdj", "Brentzgj"]
    }
}


def _login_payload():
    """Return a fresh login request body (one dict per consumer)."""
    # NOTE(review): credentials are committed in source; consider loading
    # them from the environment or a secret store.
    return {
        "data": {
            "account": "api_dev",
            "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=",
            "tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
            "terminal": "API"
        },
        "funcModule": "API",
        "funcOperation": "获取token"
    }


login_data = _login_payload()
login_push_data = _login_payload()

read_file_path_name = "定性模型数据项12-11.xlsx"
one_cols = []
two_cols = []

# (connect timeout, read timeout) in seconds, used by every HTTP call here.
_HTTP_TIMEOUT = (3, 5)


# ---------------------------------------------------------------------------
# Authentication
# ---------------------------------------------------------------------------
def _request_token(url, payload, fail_msg):
    """POST a login request and return the access token.

    Returns None (after logging ``fail_msg`` with the raw response) when the
    response's ``status`` field is falsy.  Transport and parse errors are
    propagated to the caller.
    """
    res = requests.post(url=url, json=payload, timeout=_HTTP_TIMEOUT)
    body = json.loads(res.text)
    if body["status"]:
        return body["data"]["accessToken"]
    logging.error(fail_msg, res.text)
    print("获取认证失败")
    return None


def _fetch_token(url, payload, ok_msg, fail_msg, err_msg):
    """Like ``_request_token`` but never raises; logs and returns None."""
    try:
        token = _request_token(url, payload, fail_msg)
    except Exception as e:
        logging.error(err_msg, str(e))
        return None
    if token is not None:
        logging.info(ok_msg)
    return token


def get_head_auth():
    """Return the access token used for query requests, or None on failure."""
    return _fetch_token(
        login_url, login_data,
        "成功获取认证 token",
        "获取认证失败,响应信息: %s",
        "获取认证时发生异常: %s")


def get_head_push_auth():
    """Return the access token used for push requests, or None on failure."""
    return _fetch_token(
        login_push_url, login_push_data,
        "成功获取推送认证 token",
        "获取推送认证失败,响应信息: %s",
        "获取推送认证时发生异常: %s")


def getLogToken():
    """Return an access token from the login endpoint, or None on failure."""
    return _fetch_token(
        login_url, login_data,
        "成功获取日志 token",
        "获取日志 token 失败,响应信息: %s",
        "获取日志 token 时发生异常: %s")


# ---------------------------------------------------------------------------
# Date helpers
# ---------------------------------------------------------------------------
def _parse_date(text):
    """Parse ``YYYY-MM-DD`` or ``YYYYMMDD`` into a datetime."""
    try:
        return datetime.strptime(text, "%Y-%m-%d")
    except ValueError:
        return datetime.strptime(text, "%Y%m%d")


def _format_day(day):
    """Return ``(YYYYMMDD, YYYY-MM-DD)`` for a datetime."""
    return day.strftime('%Y%m%d'), day.strftime('%Y-%m-%d')


def getNow(date='', offset=0):
    """Return the target day as ``(YYYYMMDD, YYYY-MM-DD)``.

    :param date: '' for "now minus ``offset`` days", otherwise a date string
        in ``YYYY-MM-DD`` or ``YYYYMMDD`` form.
    :param offset: days to subtract from now; ignored when ``date`` is given.
    :return: the two formatted strings, or ``(None, None)`` when ``date``
        cannot be parsed.
    """
    try:
        if date == '':
            day = datetime.now() - timedelta(days=offset)
        else:
            day = _parse_date(date)
        return _format_day(day)
    except Exception as e:
        logging.error("获取当前日期时发生异常: %s", str(e))
        return None, None


# ---------------------------------------------------------------------------
# Shared data-access helpers
# ---------------------------------------------------------------------------
def _build_daily_row(item_nos, cur_time, cur_time2, token):
    """Query one day's values and return them as a worksheet row.

    The row is ``[cur_time2, value_for(item_nos[0]), ...]``; items that the
    API did not return (or returned without a ``dataValue``) become ''.

    Raises ValueError when the API returns no records, so callers abort
    before touching the workbook.
    """
    payload = {
        "data": {
            "date": cur_time,
            "dataItemNoList": item_nos
        },
        "funcModule": "数据项",
        "funcOperation": "查询"
    }
    res = requests.post(
        url=search_url, headers={"Authorization": token},
        json=payload, timeout=_HTTP_TIMEOUT)
    records = json.loads(res.text).get("data")
    if not records:
        raise ValueError("查询接口未返回数据,日期: %s" % cur_time)

    values = {}
    for record in records:
        if "dataValue" not in record:
            # Surface records that carry no value for manual inspection.
            print(record)
            values[record["dataItemNo"]] = ""
        else:
            values[record["dataItemNo"]] = record["dataValue"]
    return [cur_time2] + [values.get(item_no, "") for item_no in item_nos]


def _read_item_nos_xlrd():
    """Return the data-item codes stored in the second worksheet row."""
    sheet = xlrd.open_workbook(read_file_path_name).sheet_by_index(0)
    return sheet.row_values(1)[1:]


def _rewrite_workbook_with_row(new_row, drop_last_row=False):
    """Copy every sheet with xlrd/xlwt and add ``new_row`` to the first sheet.

    :param new_row: values written below the copied rows of sheet 0.
    :param drop_last_row: when True the last row of every sheet is not
        copied, so on sheet 0 ``new_row`` takes the place of the old last row.

    NOTE(review): xlrd >= 2.0 no longer reads .xlsx files and xlwt writes the
    legacy .xls format; this path is kept only for the legacy entry points.
    """
    source = xlrd.open_workbook(read_file_path_name)
    target = xlwt.Workbook()
    for index, sheet_name in enumerate(source.sheet_names()):
        sheet = source.sheet_by_index(index)
        row_count = sheet.nrows - 1 if drop_last_row else sheet.nrows
        col_count = sheet.ncols
        new_sheet = target.add_sheet(sheet_name)
        for row in range(row_count):
            for col in range(col_count):
                new_sheet.write(row, col, sheet.cell_value(row, col))
        if index == 0:
            for col in range(col_count):
                new_sheet.write(row_count, col, new_row[col])
    target.save(read_file_path_name)


def update_e_value(file_path, column_index, threshold):
    """Clamp a jump in one column of the second-to-last row.

    Data-correction requirement (2025-01-08): when the spot cost differs from
    the previous day's cost by more than the threshold, the previous day's
    cost is used instead.

    The second-to-last row is compared with the third-to-last row; when the
    absolute difference exceeds ``threshold`` the second-to-last value is
    overwritten with the third-to-last one.  The workbook is always rewritten
    (with numbers cast to float and gaps forward-filled).

    :param file_path: path of the Excel file
    :param column_index: positional index of the column to check
    :param threshold: maximum allowed absolute change
    """
    try:
        logging.info("开始更新 Excel 文件中指定列的值,文件路径: %s", file_path)
        df = pd.read_excel(file_path)
        # Column-wise Series.map replaces the deprecated DataFrame.applymap.
        df = df.apply(lambda col: col.map(
            lambda x: float(x) if isinstance(x, (int, float)) else x))
        df = df.ffill()
        previous_value = df.iloc[-3, column_index]
        current_value = df.iloc[-2, column_index]
        if abs(current_value - previous_value) > threshold:
            df.iloc[-2, column_index] = previous_value
            logging.info("指定列值变化大于阈值,已将当前值修改为前一天的值")
        df.to_excel(file_path, index=False, engine='openpyxl')
        logging.info("Excel 文件更新完成")
    except Exception as e:
        logging.error("更新 Excel 文件时发生异常: %s", str(e))


# ---------------------------------------------------------------------------
# Workbook update entry points
# ---------------------------------------------------------------------------
def updateExcelDatabak(date='', token=None):
    """Legacy xlrd/xlwt variant: append the row for ``date`` to sheet 0."""
    try:
        logging.info("开始备份更新 Excel 数据,日期: %s", date)
        item_nos = _read_item_nos_xlrd()
        cur_time, cur_time2 = getNow(date)
        new_row = _build_daily_row(item_nos, cur_time, cur_time2, token)
        _rewrite_workbook_with_row(new_row)
        logging.info("备份更新 Excel 数据完成")
    except Exception as e:
        logging.error("备份更新 Excel 数据时发生异常: %s", str(e))


def updateYesterdayExcelData(date='', token=None):
    """Insert or refresh the row for the day before ``date``.

    :param date: '' for today, otherwise ``YYYY-MM-DD`` or ``YYYYMMDD``.
    :param token: access token sent in the ``Authorization`` header.
    """
    try:
        logging.info("开始更新昨天的 Excel 数据,日期: %s", date)
        df = pd.read_excel(read_file_path_name, engine='openpyxl')
        item_nos = df.iloc[0, :].tolist()[1:]

        base_day = datetime.now() if date == '' else _parse_date(date)
        previous_date = (base_day - timedelta(days=1)).strftime('%Y-%m-%d')
        cur_time, cur_time2 = getNow(previous_date)
        new_values = _build_daily_row(item_nos, cur_time, cur_time2, token)

        print('更新数据前')
        print(df.tail(1))
        if previous_date not in df['日期'].values:
            new_row = pd.DataFrame([new_values], columns=df.columns.tolist())
            df = pd.concat([df, new_row], ignore_index=True)
        else:
            print('日期存在,即将更新')
            print('新数据', new_values[1:])
            df.loc[df['日期'] == previous_date,
                   df.columns.tolist()[1:]] = new_values[1:]
        print('更新数据后')
        print(df.tail(1))
        df.to_excel(read_file_path_name, index=False, engine='openpyxl')
        logging.info("更新昨天的 Excel 数据完成")
    except Exception as e:
        logging.error("更新昨天的 Excel 数据时发生异常: %s", str(e))


def updateExcelData(date='', token=None):
    """Append the row for ``date`` to the workbook (pandas/openpyxl path)."""
    try:
        logging.info("开始更新 Excel 数据,日期: %s", date)
        df = pd.read_excel(read_file_path_name, engine='openpyxl')
        item_nos = df.iloc[0, :].tolist()[1:]
        cur_time, cur_time2 = getNow(date)
        new_values = _build_daily_row(item_nos, cur_time, cur_time2, token)
        new_row = pd.DataFrame([new_values], columns=df.columns.tolist())
        df = pd.concat([df, new_row], ignore_index=True)
        df.to_excel(read_file_path_name, index=False, engine='openpyxl')
        logging.info("更新 Excel 数据完成")
    except Exception as e:
        logging.error("更新 Excel 数据时发生异常: %s", str(e))


# ---------------------------------------------------------------------------
# Forecast model
# ---------------------------------------------------------------------------
def _deviation_factor(deviation):
    """Score a pickup/production deviation (asymmetric scaling)."""
    if deviation > 0:
        return deviation * 10 / 2000
    return deviation * 10 / 3000


def _inventory_factor(current, previous):
    """Score the inventory change once inventory exceeds 80% of 265007."""
    if current / 265007 > 0.8:
        return (current - previous) * 10 / -5000
    return 0


def _cost_factor(current, previous):
    """Score the spot-cost change when it moved by at least 100."""
    if abs(current - previous) >= 100:
        return (current - previous) * 50 / 100
    return 0


def _weighted_price(a, b, c, d, e, f, g, h, base_price):
    """Combine the factor scores with the base price, rounded to 2 places."""
    return round(0.08*a + 0*b + 0.15*c + 0.08*d + 0.03*e + 0.08*f
                 + 0.4*g + 0.18*h + base_price, 2)


def qualitativeModel():
    """Compute the forecast price from the workbook.

    Uses the second-to-last row as the current observation and the
    third-to-last row as the previous one (including the base guide price).

    :return: the forecast rounded to 2 decimals, or None on any error.
    """
    try:
        logging.info("开始执行定性模型计算")
        df = pd.read_excel(read_file_path_name)
        df = df.fillna(df.ffill())
        pair = df[-3:-1].reset_index()

        rate = pair.loc[1, '70号沥青开工率'] / 100
        a = -(rate - 0.2)*5/0.1 if rate > 0.3 else 0
        b = pair.loc[1, '资金因素']
        deviation = pair.loc[1, '京博产量'] - pair.loc[1, '计划产量']
        c = _deviation_factor(deviation)
        d = deviation / 500 * 5
        e = _inventory_factor(
            pair.loc[1, '基质沥青库存'], pair.loc[0, '基质沥青库存'])
        f = 1
        g = _cost_factor(pair.loc[1, '即期成本'], pair.loc[0, '即期成本'])
        h = pair.loc[1, '订单结构']

        x = _weighted_price(a, b, c, d, e, f, g, h,
                            pair.loc[0, '京博指导价'])
        logging.info("定性模型计算完成,预测结果: %s", x)
        return x
    except Exception as e:
        logging.error("定性模型计算时发生异常: %s", str(e))
        return None


def _legacy_forecast(scale_rate):
    """Forecast used by the legacy ``start*`` entry points (last two rows).

    :param scale_rate: True divides the operating rate by 100 before use
        (``start`` / ``start_test``); False uses it unscaled (``start_2``).
    """
    df = pd.read_excel(read_file_path_name)
    df = df.fillna(df.ffill())
    pair = df[-2:].reset_index()

    rate = pair.loc[1, '70号沥青开工率']
    if scale_rate:
        a = (rate / 100 - 0.2)*5/0.1 if rate / 100 > 0.3 else 0
    else:
        # NOTE(review): this variant compares against 30 but then subtracts
        # 0.2 from the unscaled rate — confirm whether that is intended.
        a = (rate - 0.2)*5/0.1 if rate > 30 else 0
    b = pair.loc[1, '资金因素']
    c = _deviation_factor(pair.loc[1, '昨日计划提货偏差'])
    d = pair.loc[1, '生产情况']
    e = _inventory_factor(
        pair.loc[1, '基质沥青库存'], pair.loc[0, '基质沥青库存'])
    f = 1
    g = _cost_factor(pair.loc[1, '即期成本'], pair.loc[0, '即期成本'])
    h = pair.loc[1, '订单结构']
    return _weighted_price(a, b, c, d, e, f, g, h, pair.loc[0, '京博指导价'])


def pushData(cur_time, x, token_push):
    """Upload forecast ``x`` for day ``cur_time`` (``YYYYMMDD``) to the API."""
    try:
        logging.info("开始推送数据,日期: %s,预测值: %s", cur_time, x)
        payload = {
            "funcModule": "数据表信息列表",
            "funcOperation": "新增",
            "data": [
                {"dataItemNo": "C01100036|Forecast_Price|DX|ACN",
                 "dataDate": cur_time,
                 "dataStatus": "add",
                 "dataValue": x
                 }
            ]
        }
        res = requests.post(
            url=upload_url, headers={"Authorization": token_push},
            json=payload, timeout=_HTTP_TIMEOUT)
        logging.info("数据推送完成,响应信息: %s", res.text)
    except Exception as e:
        logging.error("数据推送时发生异常: %s", str(e))


# ---------------------------------------------------------------------------
# Legacy entry points (xlrd/xlwt based); the forecast upload is disabled here
# ---------------------------------------------------------------------------
def start_2(date='', token=None):
    """Legacy run for ``date`` with a caller-supplied token.

    Appends the day's row, applies the cost correction and computes the
    forecast with the unscaled operating rate.  Nothing is uploaded.
    """
    try:
        logging.info("开始执行 start_2 函数,日期: %s", date)
        item_nos = _read_item_nos_xlrd()
        cur_time, cur_time2 = getNow(date)
        new_row = _build_daily_row(item_nos, cur_time, cur_time2, token)
        _rewrite_workbook_with_row(new_row)
        update_e_value(read_file_path_name, 8, 1000)
        x = _legacy_forecast(scale_rate=False)
        logging.info("start_2 预测结果: %s", x)
        logging.info("start_2 函数执行完成")
    except Exception as e:
        logging.error("start_2 函数执行时发生异常: %s", str(e))


def _run_legacy_today(label, now=None):
    """Shared body of ``start`` and ``start_test``.

    Logs in, appends the row for ``now`` (default: the current time), applies
    the cost correction and computes the forecast.  Nothing is uploaded.
    """
    try:
        logging.info("开始执行 %s 函数", label)
        item_nos = _read_item_nos_xlrd()
        token = _request_token(
            login_url, login_data, "获取认证失败,响应信息: %s")
        cur_time, cur_time2 = _format_day(
            datetime.now() if now is None else now)
        new_row = _build_daily_row(item_nos, cur_time, cur_time2, token)
        _rewrite_workbook_with_row(new_row)
        update_e_value(read_file_path_name, 8, 1000)
        x = _legacy_forecast(scale_rate=True)
        logging.info("%s 预测结果: %s", label, x)
        logging.info("%s 函数执行完成", label)
    except Exception as e:
        logging.error("%s 函数执行时发生异常: %s", label, str(e))


def start(now=None):
    """Legacy run for ``now`` (a datetime; defaults to the current time)."""
    _run_legacy_today("start", now)


def start_test():
    """Legacy run for the current day (same steps as ``start()``)."""
    _run_legacy_today("start_test")


def start_1():
    """Legacy run that replaces the last row with yesterday's values."""
    try:
        logging.info("开始执行 start_1 函数")
        item_nos = _read_item_nos_xlrd()
        token = _request_token(
            login_url, login_data, "获取认证失败,响应信息: %s")
        cur_time, cur_time2 = _format_day(
            datetime.now() - timedelta(days=1))
        new_row = _build_daily_row(item_nos, cur_time, cur_time2, token)
        _rewrite_workbook_with_row(new_row, drop_last_row=True)
        logging.info("start_1 函数执行完成")
    except Exception as e:
        logging.error("start_1 函数执行时发生异常: %s", str(e))


# ---------------------------------------------------------------------------
# Month-to-date refresh
# ---------------------------------------------------------------------------
def get_queryDataListItemNos_value(token, url, dataItemNoList, dateStart, dateEnd):
    """Query values for several items over a date range.

    :param token: access token sent in the ``Authorization`` header
    :param url: endpoint to post to
    :param dataItemNoList: data-item codes to query
    :param dateStart: first day, ``YYYYMMDD``
    :param dateEnd: last day, ``YYYYMMDD``
    :return: the response's ``data`` payload, or None when it is empty or
        the request fails.
    """
    try:
        logging.info("开始获取查询数据列表项值,日期范围: %s 至 %s", dateStart, dateEnd)
        payload = {
            "funcModule": "数据项",
            "funcOperation": "查询",
            "data": {
                "dateStart": dateStart,
                "dateEnd": dateEnd,
                "dataItemNoList": dataItemNoList
            }
        }
        res = requests.post(
            url=url, headers={"Authorization": token},
            json=payload, timeout=_HTTP_TIMEOUT)
        search_value = json.loads(res.text)["data"]
        if search_value:
            logging.info("成功获取查询数据列表项值")
            return search_value
        logging.warning("未获取到查询数据列表项值")
        return None
    except Exception as e:
        logging.error("获取查询数据列表项值时发生异常: %s", str(e))
        return None


def save_queryDataListItemNos_xls(data_df, dataItemNoList):
    """Replace the current month's rows in the workbook with ``data_df``.

    :param data_df: frame with ``dataDate`` (``YYYY-MM-DD``), ``dataItemNo``
        and ``dataValue`` columns, one record per row.
    :param dataItemNoList: unused; kept for interface compatibility.
    """
    try:
        logging.info("开始保存查询数据列表项到 Excel 文件")
        current_year_month = datetime.now().strftime('%Y-%m')
        df_old = pd.read_excel(read_file_path_name)

        # Row 0 holds the item codes; map each code to its column header.
        codes_row = df_old[:1]
        column_by_code = dict(zip(codes_row.iloc[0], codes_row.columns))

        # Keep historical rows, dropping everything from the current month.
        history = df_old[1:].copy()
        history["日期"] = pd.to_datetime(history["日期"])
        in_current_month = history["日期"].dt.strftime(
            '%Y-%m').eq(current_year_month)
        history = history[~in_current_month]
        history["日期"] = history["日期"].dt.strftime('%Y-%m-%d')

        # Pivot the fetched records into one row per day.
        fresh_rows = []
        for day, group in data_df.groupby("dataDate"):
            row = {"日期": day}
            for item_no, value in zip(group['dataItemNo'], group['dataValue']):
                row[column_by_code[item_no]] = value
            fresh_rows.append(row)

        merged = pd.concat(
            [codes_row, history, pd.DataFrame(fresh_rows)], ignore_index=True)
        merged.to_excel(read_file_path_name, index=False)
        logging.info("保存查询数据列表项到 Excel 文件完成")
    except Exception as e:
        logging.error("保存查询数据列表项到 Excel 文件时发生异常: %s", str(e))


def queryDataListItemNos(date=None, token=None):
    """Refresh the workbook with month-to-date values.

    :param date: datetime marking the end of the range (default: now); the
        range starts on the first day of that month.
    :param token: access token; fetched via ``getLogToken`` when None.
    """
    try:
        logging.info("开始查询数据列表项,日期: %s", date)
        df = pd.read_excel(read_file_path_name)
        dataItemNoList = df.iloc[0].tolist()[1:]

        if token is None:
            token = getLogToken()
            if token is None:
                logging.error("获取token失败")
                print("获取token失败")
                return

        current_date = datetime.now() if date is None else date
        dateEnd = current_date.strftime('%Y%m%d')
        dateStart = current_date.replace(day=1).strftime('%Y%m%d')

        search_value = get_queryDataListItemNos_value(
            token, queryDataListItemNos_url, dataItemNoList, dateStart, dateEnd)
        if not search_value:
            # Leave the workbook untouched when there is nothing to merge.
            logging.error("未获取到数据,跳过 Excel 更新")
            return

        data_df = pd.DataFrame(search_value)
        data_df["dataDate"] = pd.to_datetime(
            data_df["dataDate"]).dt.strftime('%Y-%m-%d')
        save_queryDataListItemNos_xls(data_df, dataItemNoList)
        logging.info("查询数据列表项完成")
    except Exception as e:
        logging.error("查询数据列表项时发生异常: %s", str(e))


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(start_date=None, token=None, token_push=None):
    """Run the daily job: refresh data, compute the forecast, push it.

    :param start_date: datetime of the day to process (default: now)
    :param token: query token; fetched when None
    :param token_push: push token; fetched when None
    """
    try:
        logging.info("开始执行主函数")
        if start_date is None:
            start_date = datetime.now()
        if token is None:
            token = get_head_auth()
        if token_push is None:
            token_push = get_head_push_auth()

        date = start_date.strftime('%Y%m%d')
        print(date)
        logging.info("当前日期: %s", date)

        updateExcelData(date, token)
        queryDataListItemNos(token=token)
        update_e_value(read_file_path_name, 8, 1000)

        x = qualitativeModel()
        if x is not None:
            print('**************************************************预测结果:', x)
            logging.info("预测结果: %s", x)
            cur_time, cur_time2 = getNow(date)
            pushData(cur_time, x, token_push)
        logging.info("主函数执行完成")
    except Exception as e:
        logging.error("主函数执行时发生异常: %s", str(e))


if __name__ == "__main__":
    print("运行中...")
    logging.info("程序启动")
    main()
    logging.info("程序结束")