# Source: PriceForecast/aisenzhecode/聚合级丙烯/丙烯每日价格预测.py
# Snapshot: 2025-07-08 11:25:31 +08:00 (718 lines, 22 KiB, Python)
#
# Note carried over from the repository web view: this file contains Unicode
# characters that might be confused with other characters. If that is
# intentional, the warning can be safely ignored.

from statsmodels.tools.eval_measures import mse, rmse
from pandas import Series, DataFrame
import cufflinks as cf
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import pickle
import warnings
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_absolute_error
from xgboost import plot_importance, plot_tree
import xgboost as xgb
import plotly.graph_objects as go
import plotly.express as px
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator
import statsmodels.api as sm
import datetime
from xgboost import XGBRegressor
from sklearn.linear_model import Lasso
import sklearn.datasets as datasets
from sklearn import preprocessing
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
from plotly import __version__
import random
import seaborn as sn
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import json
import xlrd
import xlwt
from datetime import datetime
import time
# Variable definitions: service endpoints, login payloads and shared globals.
# NOTE(review): the account, password and tenant hash below are hard-coded in
# source; consider moving them to configuration. Values are left unchanged.
_API_ROOT = "http://10.200.32.39/jingbo-api"
login_url = f"{_API_ROOT}/api/server/login"
search_url = f"{_API_ROOT}/api/warehouse/dwDataItem/queryByItemNos"
# The doubled slash after the API root is reproduced exactly as in the original.
queryDataListItemNos_url = f"{_API_ROOT}//api/warehouse/dwDataItem/queryDataListItemNos"
login_push_url = f"{_API_ROOT}/api/server/login"
upload_url = f"{_API_ROOT}/api/dw/dataValue/pushDataValueList"

# Login payload used when requesting a token for queries.
login_data = dict(
    data=dict(
        account="api_dev",
        password="ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=",
        tenantHashCode="8a4577dbd919675758d57999a1e891fe",
        terminal="API",
    ),
    funcModule="API",
    funcOperation="获取token",
)
# Login payload used when requesting a token for uploads; same content as
# login_data but kept as an independent object.
login_push_data = dict(
    data=dict(
        account="api_dev",
        password="ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=",
        tenantHashCode="8a4577dbd919675758d57999a1e891fe",
        terminal="API",
    ),
    funcModule="API",
    funcOperation="获取token",
)

# Legacy .xls workbook read by read_xls_data().
read_file_path_name = "丙烯基础数据收集表.xls"
# Filled by read_xls_data(); two_cols is declared but not populated there.
one_cols, two_cols = [], []

# Module-level label encoder instance.
le = preprocessing.LabelEncoder()
# print(__version__) # requires version >= 1.9.0
# Put cufflinks into offline mode and fix the stdlib random seed.
cf.go_offline()
random.seed(100)
# Data acquisition
def get_head_auth():
    """Log in with ``login_data`` and return the access token for queries.

    Returns:
        The ``accessToken`` value from the login response, or ``None`` (after
        printing a failure message) when the response ``status`` is falsy.
    """
    response = requests.post(url=login_url, json=login_data, timeout=(3, 5))
    payload = json.loads(response.text)
    # Guard clause: bail out early when the service reports a failed login.
    if not payload["status"]:
        print("获取认证失败")
        return None
    return payload["data"]["accessToken"]
def get_data_value(token, dataItemNoList):
    """Query today's values for the given data item codes.

    Args:
        token: Value sent in the ``Authorization`` header.
        dataItemNoList: Item codes placed in the request's ``dataItemNoList``.

    Returns:
        The ``data`` field of the JSON response. A falsy result is returned
        unchanged after printing a "no new data" message.
    """
    request_body = {
        "data": {
            "date": get_cur_time()[0],
            "dataItemNoList": dataItemNoList,
        },
        "funcModule": "数据项",
        "funcOperation": "查询",
    }
    response = requests.post(
        url=search_url,
        headers={"Authorization": token},
        json=request_body,
        timeout=(3, 5),
    )
    result = json.loads(response.text)["data"]
    if not result:
        print("今天没有新数据")
    return result
# XLS file handling
def write_xls(data):
    """Write rows of cell values to ``<YYYYMMDD>.xls`` in a sheet named Sheet1.

    Args:
        data: Iterable of rows, each row an iterable of cell values; written
            starting at row 0, column 0.
    """
    workbook = xlwt.Workbook()
    # xlwt workbooks have no ``load`` method; a sheet is created with add_sheet.
    sheet = workbook.add_sheet('Sheet1')
    for row_index, row_data in enumerate(data):
        for col_index, cell_data in enumerate(row_data):
            sheet.write(row_index, col_index, cell_data)
    # The output file is named after the current date.
    workbook.save(get_cur_time()[0] + '.xls')
def get_cur_time():
    """Return today's local date in two string formats.

    Returns:
        A tuple ``(cur_time, cur_time2)`` where ``cur_time`` is ``YYYYMMDD``
        and ``cur_time2`` is ``YYYY-MM-DD``.
    """
    # The file imports both ``import datetime`` and ``from datetime import
    # datetime``; the latter wins, so ``datetime.datetime.now()`` raised
    # AttributeError. A local aliased import avoids depending on either binding.
    from datetime import datetime as _datetime

    now = _datetime.now()
    cur_time = now.strftime('%Y%m%d')
    cur_time2 = now.strftime('%Y-%m-%d')
    # cur_time = '20231011'
    # cur_time2 = '2023-10-11'
    return cur_time, cur_time2
def get_head_push_auth():
    """Log in with ``login_push_data`` and return the access token for uploads.

    Returns:
        The ``accessToken`` value from the login response, or ``None`` (after
        printing a failure message) when the response ``status`` is falsy.
    """
    resp = requests.post(
        url=login_push_url, json=login_push_data, timeout=(3, 5))
    body = json.loads(resp.text)
    if body["status"]:
        return body["data"]["accessToken"]
    print("获取认证失败")
    return None
def upload_data_to_system(token_push):
    """Upload today's forecast price and print the server response text.

    Args:
        token_push: Value sent in the ``Authorization`` header.
    """
    # Evaluated in the same order as the original dict literal: the date
    # first, then the forecast (which reads files and prints).
    data_date = get_cur_time()[0]
    predicted_price = forecast_price()
    record = {
        "dataItemNo": "C01100007|Forecast_Price|ACN",
        "dataDate": data_date,
        "dataStatus": "add",
        "dataValue": predicted_price,
    }
    payload = {
        "funcModule": "数据表信息列表",
        "funcOperation": "新增",
        "data": [record],
    }
    response = requests.post(
        url=upload_url,
        headers={"Authorization": token_push},
        json=payload,
        timeout=(3, 5),
    )
    print(response.text)
# def upload_data_to_system(token):
# data = {
# "funcModule": "数据表信息列表",
# "funcOperation": "新增",
# "data": [
# {"dataItemNo": "C01100036|Forecast_ Price|ACN",
# "dataDate": '20230706',
# "dataStatus": "add",
# "dataValue": 3780.0
# }
# ]
# }
# headers = {"Authorization": token}
# res = requests.post(url=upload_url, headers=headers, json=data, timeout=(3, 5))
# print(res.text)
def forecast_price():
    """Predict the price for the latest row of the data-collection workbook.

    Reads ``丙烯基础数据收集表.xlsx``, fills gaps forward then backward, takes
    the last row as the feature vector (without ``Date`` and ``Price``) and
    runs it through the persisted model ``日度价格预测_丙烯最佳模型.pkl``.

    Returns:
        The predicted price as a float rounded to two decimals.

    Raises:
        ValueError: If the latest row still contains missing values after
            filling, so no feature row is left to predict on.
    """
    import joblib

    # df_test = pd.read_csv('定价模型数据收集0212.csv')
    df_test = pd.read_excel('丙烯基础数据收集表.xlsx')
    # The first data row is dropped before use (queryDataListItemNos reads
    # that same row as the list of item codes).
    df_test.drop([0], inplace=True)
    # An explicit format is given, so the deprecated infer_datetime_format
    # flag was redundant and is omitted.
    df_test['Date'] = pd.to_datetime(df_test['Date'], format='%Y-%m-%d')

    # Fill missing values from the previous row, then from the next row.
    df_filled = df_test.ffill().bfill()

    # NOTE(review): the model file is unpickled; only load a trusted file.
    best_model = joblib.load("日度价格预测_丙烯最佳模型.pkl")

    # The most recent day is the last row. Copy it so the edits below do not
    # act on a view of df_filled.
    latest = df_filled.tail(1).copy()
    latest.index = latest["Date"]
    features = latest.drop(columns=["Date", "Price"]).dropna()
    if features.empty:
        raise ValueError(
            "latest row has missing feature values; cannot forecast")
    features = features.astype(float)

    # Predict today's price and report it rounded to two decimals.
    features['日度预测价格'] = best_model.predict(features)
    print(features['日度预测价格'])
    # Positional access: the index is the date, so [0] would rely on the
    # deprecated integer-fallback lookup.
    return round(float(features['日度预测价格'].iloc[0]), 2)
def optimize_Model():
    """Retrain the daily price model and persist the tuned XGBoost search.

    Steps:
      1. Load ``丙烯基础数据收集表.xlsx``, drop its first data row, parse
         ``Date`` and fill gaps forward then backward.
      2. Split features / ``Price`` 80/20 with ``random_state=0``.
      3. Fit baseline Lasso and XGBoost regressors and score them.
      4. Grid-search an XGBoost regressor (3-fold CV) and score it.
      5. Pickle the fitted ``GridSearchCV`` object to
         ``日度价格预测_丙烯最佳模型.pkl``.

    Returns:
        A DataFrame indexed by model name with the RMSE and R^2 of the two
        baselines and the tuned model. The original built this table and
        discarded it; callers that ignore the return value are unaffected.
    """
    import pickle

    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import Lasso
    from sklearn.metrics import mean_squared_error, r2_score
    from sklearn.model_selection import GridSearchCV, train_test_split
    from xgboost import XGBRegressor

    pd.set_option('display.max_rows', 40)
    pd.set_option('display.max_columns', 40)

    df_test = pd.read_excel('丙烯基础数据收集表.xlsx')
    df_test.drop([0], inplace=True)
    # An explicit format is given, so the deprecated infer_datetime_format
    # flag was redundant and is omitted.
    df_test['Date'] = pd.to_datetime(df_test['Date'], format='%Y-%m-%d')

    # NOTE: the original computed a "drop columns with >40% missing values"
    # frame and then immediately overwrote it with the unfiltered data, so
    # every column was used for training. That dead computation is removed;
    # keeping all columns also matches what forecast_price() feeds the model.
    # Fill missing values from the previous row, then from the next row.
    df_test_1 = df_test.ffill().bfill()
    df_test_1.index = df_test_1["Date"]
    df_test_1 = df_test_1.drop(["Date"], axis=1).astype('float')

    x = df_test_1.drop('Price', axis=1)
    y = df_test_1['Price']

    # Split into training and test sets.
    X_train, x_test, y_train, y_true = train_test_split(
        x, y, test_size=0.2, random_state=0)

    def _rmse_and_r2(y_predicted):
        """Return (RMSE, R^2) of a prediction against the held-out target."""
        rmse_value = np.sqrt(mean_squared_error(y_true, y_predicted))
        return rmse_value, r2_score(y_true, y_predicted)

    # Baseline models. Instances get their own names so the imported Lasso
    # class is not rebound to an instance.
    lasso_model = Lasso(random_state=0)
    xgbr_model = XGBRegressor(random_state=0)
    lasso_model.fit(X_train, y_train)
    xgbr_model.fit(X_train, y_train)
    Lasso_RMSE, Lasso_score = _rmse_and_r2(lasso_model.predict(x_test))
    XGBR_RMSE, XGBR_score = _rmse_and_r2(xgbr_model.predict(x_test))

    # Collect the baseline metrics in one table indexed by model name.
    result_columns = ['模型(Model)', '均方根误差(RMSE)', 'R^2 score']
    model_results1 = pd.DataFrame(
        [['Lasso', Lasso_RMSE, Lasso_score],
         ['XgBoost', XGBR_RMSE, XGBR_score]],
        columns=result_columns,
    ).set_index('模型(Model)')

    # Feature-importance bar chart helper. It is defined but not called in
    # this function.
    # NOTE(review): the source had lost its indentation; the font setting is
    # assumed to belong to this helper — confirm against the original file.
    def plot_feature_importance(importance, names, model_type):
        feature_importance = np.array(importance)
        feature_names = np.array(names)
        data = {'feature_names': feature_names,
                'feature_importance': feature_importance}
        fi_df = pd.DataFrame(data)
        fi_df.sort_values(by=['feature_importance'],
                          ascending=False, inplace=True)
        plt.figure(figsize=(10, 8))
        sn.barplot(x=fi_df['feature_importance'], y=fi_df['feature_names'])
        plt.title(model_type + " "+'FEATURE IMPORTANCE')
        plt.xlabel('FEATURE IMPORTANCE')
        plt.ylabel('FEATURE NAMES')
        from pylab import mpl
        mpl.rcParams['font.sans-serif'] = ['SimHei']

    # XGBoost hyper-parameter search (first pass).
    # Reference: https://juejin.im/post/6844903661013827598
    # Candidate values are spaced at 1 / 3 / 10 within the same order of
    # magnitude (e.g. 0.01, 0.03, 0.1).
    estimator = XGBRegressor(random_state=0,
                             nthread=4,
                             seed=0
                             )
    parameters = {
        'max_depth': range(2, 11, 2),  # maximum tree depth
        'n_estimators': range(50, 101, 10),  # number of boosting rounds
        'learning_rate': [0.01, 0.03, 0.1, 0.3, 0.5, 1]
    }
    grid_search_XGB = GridSearchCV(
        estimator=estimator,
        param_grid=parameters,
        # n_jobs = 10,
        cv=3,
        verbose=True
    )
    grid_search_XGB.fit(X_train, y_train)

    op_XGBR_RMSE, op_XGBR_score = _rmse_and_r2(grid_search_XGB.predict(x_test))
    model_results2 = pd.DataFrame(
        [['Optimized_Xgboost', op_XGBR_RMSE, op_XGBR_score]],
        columns=result_columns,
    ).set_index('模型(Model)')
    results = pd.concat([model_results1, model_results2], ignore_index=False)

    # Persist the whole fitted search object; forecast_price() loads this file.
    Pkl_Filename = "日度价格预测_丙烯最佳模型.pkl"
    with open(Pkl_Filename, 'wb') as file:
        pickle.dump(grid_search_XGB, file)
    return results
def queryDataListItemNos(token=None):
    """Refresh the current month's rows of ``丙烯基础数据收集表.xlsx``.

    Item codes are taken from the first data row of the workbook (all columns
    after the first). Values from the first day of the current month through
    today are fetched and handed to ``save_queryDataListItemNos_xls``.

    Args:
        token: Authorization token; when ``None`` one is requested via
            ``get_head_auth()``.

    Returns:
        None. Returns early, after printing a message, when no token is
        available or the service returned no data.
    """
    from datetime import datetime

    df = pd.read_excel('丙烯基础数据收集表.xlsx')
    dataItemNoList = df.iloc[0].tolist()[1:]

    if token is None:
        token = get_head_auth()
    if not token:
        print('token获取失败')
        return

    # Query window: first day of the current month through today, YYYYMMDD.
    current_date = datetime.now()
    dateEnd = current_date.strftime('%Y%m%d')
    dateStart = current_date.replace(day=1).strftime('%Y%m%d')
    # dateStart = '20241026'

    search_value = get_queryDataListItemNos_value(
        token, queryDataListItemNos_url, dataItemNoList, dateStart, dateEnd)
    # The helper returns None when nothing came back; building a frame from
    # that and indexing "dataDate" would raise KeyError.
    if not search_value:
        print('当月没有可更新的数据')
        return

    data_df = pd.DataFrame(search_value)
    # Normalise dates to YYYY-MM-DD strings for grouping and writing.
    data_df["dataDate"] = pd.to_datetime(
        data_df["dataDate"]).dt.strftime('%Y-%m-%d')
    save_queryDataListItemNos_xls(data_df, dataItemNoList)
    print('当月数据更新完成')
def save_queryDataListItemNos_xls(data_df, dataItemNoList):
    """Overwrite the current month's rows in the first sheet of the workbook.

    Rows whose first cell starts with the current ``YYYY-MM`` are counted, and
    writing starts that many rows above the end of the sheet, one row per
    distinct ``dataDate`` in ``data_df``.

    Args:
        data_df: Frame with ``dataDate``, ``dataItemNo`` and ``dataValue``
            columns.
        dataItemNoList: Item codes in column order; the first code goes to
            column 2.
    """
    from datetime import datetime
    from openpyxl import load_workbook

    workbook_path = '丙烯基础数据收集表.xlsx'
    current_year_month = datetime.now().strftime('%Y-%m')

    # Only the first sheet is modified, so a single workbook load suffices.
    workbook = load_workbook(workbook_path)
    sheet = workbook[workbook.sheetnames[0]]

    # Count the rows already recorded for the current month.
    current_month_rows = sum(
        1
        for row in sheet.iter_rows(values_only=True)
        if str(row[0]).startswith(current_year_month)
    )
    start_row = sheet.max_row - current_month_rows + 1

    for row_idx, (date, group) in enumerate(
            data_df.groupby("dataDate"), start=start_row):
        sheet.cell(row=row_idx, column=1, value=date)
        for col_idx, dataItemNo in enumerate(dataItemNoList, start=2):
            matches = group.loc[
                group["dataItemNo"] == dataItemNo, "dataValue"].values
            # Test the size, not the array's truthiness: truthiness skips a
            # legitimate value of 0 and is ambiguous for multiple matches.
            if matches.size > 0:
                sheet.cell(row=row_idx, column=col_idx, value=matches[0])

    # Save the modified workbook over the original file.
    workbook.save(workbook_path)
def get_queryDataListItemNos_value(token, url, dataItemNoList, dateStart, dateEnd):
    """Fetch values for the given item codes over a date range.

    Args:
        token: Value sent in the ``Authorization`` header.
        url: Endpoint to POST to.
        dataItemNoList: Item codes to query.
        dateStart: Range start, passed through as ``dateStart``.
        dateEnd: Range end, passed through as ``dateEnd``.

    Returns:
        The ``data`` field of the JSON response, or ``None`` when it is falsy.
    """
    request_body = {
        "funcModule": "数据项",
        "funcOperation": "查询",
        "data": {
            "dateStart": dateStart,
            "dateEnd": dateEnd,
            "dataItemNoList": dataItemNoList,  # data item codes to query
        },
    }
    response = requests.post(
        url=url,
        headers={"Authorization": token},
        json=request_body,
        timeout=(3, 5),
    )
    result = json.loads(response.text)["data"]
    # Collapse every falsy result (empty list, None, ...) to None.
    return result or None
def read_xls_data():
    """Load the second row of the legacy .xls workbook into ``one_cols``.

    Opens ``read_file_path_name`` with xlrd, takes the first sheet and stores
    the values of row index 1 in the module-level ``one_cols``. ``two_cols``
    is declared global but not assigned here.
    """
    global one_cols, two_cols
    first_sheet = xlrd.open_workbook(read_file_path_name).sheet_by_index(0)
    one_cols = first_sheet.row_values(1)
def start():
    """Run the legacy .xls pipeline: fetch, append a row, retrain, upload.

    Reads the item codes via ``read_xls_data()``, obtains both tokens, fetches
    today's values, appends one row (date + one value per code, blank when
    missing) with ``save_xls``, then retrains and uploads the forecast.
    Returns early when either token cannot be obtained.
    """
    read_xls_data()
    token = get_head_auth()
    if not token:
        return
    token_push = get_head_push_auth()
    if not token_push:
        return

    item_codes = one_cols[1:]
    # get_data_value may return None when nothing came back; treat that as
    # "no values" so a blank row is still appended instead of a TypeError.
    datas = get_data_value(token, item_codes) or []

    dataItemNo_dataValue = {}
    for data_value in datas:
        if "dataValue" not in data_value:
            # Record without a value: print it and store a blank.
            print(data_value)
            dataItemNo_dataValue[data_value["dataItemNo"]] = ""
        else:
            dataItemNo_dataValue[data_value["dataItemNo"]] = data_value["dataValue"]

    # Row layout: date first, then one cell per item code in column order.
    append_rows = [get_cur_time()[1]]
    append_rows.extend(dataItemNo_dataValue.get(code, "") for code in item_codes)

    save_xls(append_rows)
    optimize_Model()
    upload_data_to_system(token_push)
def start_1():
    """Fetch today's values and write them with ``save_xls_1``.

    Same collection steps as ``start`` but without the push token, retraining
    or upload. Returns early when the query token cannot be obtained.
    """
    read_xls_data()
    token = get_head_auth()
    if not token:
        return

    wanted_codes = one_cols[1:]
    fetched = get_data_value(token, wanted_codes)
    # A None result (no data returned) is handled as an empty list so the
    # row is still written with blanks rather than raising TypeError.
    if not fetched:
        fetched = []

    dataItemNo_dataValue = {}
    for data_value in fetched:
        if "dataValue" in data_value:
            dataItemNo_dataValue[data_value["dataItemNo"]] = data_value["dataValue"]
        else:
            # Record without a value: print it and store a blank.
            print(data_value)
            dataItemNo_dataValue[data_value["dataItemNo"]] = ""

    # Row layout: date first, then one cell per item code in column order.
    append_rows = [get_cur_time()[1]]
    for value in wanted_codes:
        append_rows.append(dataItemNo_dataValue.get(value, ""))

    save_xls_1(append_rows)
def save_xls_1(append_rows):
    """Rewrite the legacy .xls workbook, replacing the first sheet's last row.

    Every sheet of ``丙烯基础数据收集表.xls`` is copied cell by cell into a
    new xlwt workbook. In the first sheet the last existing row is left out
    and ``append_rows`` is written in its place.

    Args:
        append_rows: Cell values for the replacement row; indexed by column
            for every column of the first sheet.
    """
    workbook = xlrd.open_workbook('丙烯基础数据收集表.xls')
    new_workbook = xlwt.Workbook()

    for i, sheet_name in enumerate(workbook.sheet_names()):
        sheet = workbook.sheet_by_index(i)
        col_count = sheet.ncols
        # Only the first sheet has its last row replaced. The original
        # applied ``nrows - 1`` to every sheet, which silently dropped the
        # last row of all other sheets on each run.
        row_count = sheet.nrows - 1 if i == 0 else sheet.nrows

        new_sheet = new_workbook.add_sheet(sheet_name)
        # Copy the retained rows unchanged.
        for row in range(row_count):
            for col in range(col_count):
                new_sheet.write(row, col, sheet.cell_value(row, col))

        if i == 0:
            # Write the replacement row where the old last row was.
            for col in range(col_count):
                new_sheet.write(row_count, col, append_rows[col])

    # Save over the original file.
    new_workbook.save("丙烯基础数据收集表.xls")
def check_data(dataItemNo):
    """Request today's values for ``dataItemNo``; the result is not returned.

    Obtains a token and calls ``get_data_value``. Always returns ``None``.
    """
    token = get_head_auth()
    if token:
        # The fetched values are not used further; any message comes from
        # get_data_value itself.
        get_data_value(token, dataItemNo)
    return
def save_xls(append_rows):
    """Rewrite the legacy .xls workbook with one row appended to sheet one.

    Every sheet of ``丙烯基础数据收集表.xls`` is copied cell by cell into a
    new xlwt workbook; ``append_rows`` is written as an extra row directly
    below the existing rows of the first sheet.

    Args:
        append_rows: Cell values for the new row; indexed by column for every
            column of the first sheet.
    """
    source_book = xlrd.open_workbook('丙烯基础数据收集表.xls')
    target_book = xlwt.Workbook()

    for position, title in enumerate(source_book.sheet_names()):
        source_sheet = source_book.sheet_by_index(position)
        n_rows, n_cols = source_sheet.nrows, source_sheet.ncols

        # Snapshot the existing cells, then replay them into the new sheet.
        snapshot = [
            [source_sheet.cell_value(r, c) for c in range(n_cols)]
            for r in range(n_rows)
        ]
        target_sheet = target_book.add_sheet(title)
        for r, cells in enumerate(snapshot):
            for c, cell in enumerate(cells):
                target_sheet.write(r, c, cell)

        # Only the first sheet receives the new row.
        if position == 0:
            for c in range(n_cols):
                target_sheet.write(n_rows, c, append_rows[c])

    # Save over the original file.
    target_book.save("丙烯基础数据收集表.xls")
if __name__ == "__main__":
    # Script entry point. The legacy .xls pipeline start() is not invoked here.
    # start()
    # 1) refresh this month's rows, 2) retrain, 3) print today's forecast,
    # 4) log in for upload and push the forecast.
    queryDataListItemNos()
    optimize_Model()
    forecast_price()
    push_token = get_head_push_auth()
    upload_data_to_system(token_push=push_token)