# Standard library
import json
import pickle
import random
import time
import warnings
from datetime import datetime, timedelta

# Data handling, plotting and HTTP
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas import Series, DataFrame
import requests
import seaborn as sn
import cufflinks as cf
import plotly.express as px
import plotly.graph_objects as go
from plotly import __version__
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot

# Machine-learning models, data splitting and scoring
import sklearn.datasets as datasets
from sklearn import preprocessing
from sklearn.linear_model import Lasso
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import GridSearchCV, train_test_split
import statsmodels.api as sm
from statsmodels.tools.eval_measures import mse, rmse
import xgboost as xgb
from xgboost import XGBRegressor, plot_importance, plot_tree
# from keras.preprocessing.sequence import TimeseriesGenerator
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator

# Variable definitions
login_url = "http://10.200.32.39/jingbo-api/api/server/login"
search_url = "http://10.200.32.39/jingbo-api/api/warehouse/dwDataItem/queryByItemNos"
queryDataListItemNos_url = "http://10.200.32.39/jingbo-api//api/warehouse/dwDataItem/queryDataListItemNos"

login_push_url = "http://10.200.32.39/jingbo-api/api/server/login"
upload_url = "http://10.200.32.39/jingbo-api/api/dw/dataValue/pushDataValueList"

login_data = {
    "data": {
        "account": "api_dev",
        "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=",
        "tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
        "terminal": "API"
    },
    "funcModule": "API",
    "funcOperation": "获取token"
}

login_push_data = {
    "data": {
        "account": "api_dev",
        "password": "ZTEwYWRjMzk0OWJhNTlhYmJlNTZlMDU3ZjIwZjg4M2U=",
        "tenantHashCode": "8a4577dbd919675758d57999a1e891fe",
        "terminal": "API"
    },
    "funcModule": "API",
    "funcOperation": "获取token"
}

read_file_path_name = "沥青数据项.xlsx"
one_cols = []
two_cols = []

# Label encoder for categorical features
le = preprocessing.LabelEncoder()

# print(__version__)  # requires plotly version >= 1.9.0
cf.go_offline()

random.seed(100)


# Data acquisition

def get_head_auth():
    """Log in and return an access token, or None on failure."""
    login_res = requests.post(url=login_url, json=login_data, timeout=(3, 5))
    text = json.loads(login_res.text)
    if text["status"]:
        token = text["data"]["accessToken"]
        return token
    else:
        print("Authentication failed")
        return None


def get_data_value(token, dataItemNoList, date=''):
    """Query data values for the given item codes on the given date."""
    search_data = {
        "data": {
            "date": getNow(date)[0],
            "dataItemNoList": dataItemNoList
        },
        "funcModule": "数据项",
        "funcOperation": "查询"
    }
    headers = {"Authorization": token}
    search_res = requests.post(
        url=search_url, headers=headers, json=search_data, timeout=(3, 5))
    search_value = json.loads(search_res.text)["data"]
    if search_value:
        return search_value
    else:
        print("No new data for today")
        return None


def getNow(date='', offset=0):
    """Return two string representations of the given date.

    Args:
        date: one of
            - a datetime object
            - a string in '%Y-%m-%d', '%Y%m%d' or '%Y/%m/%d' format
            - an empty string, meaning today
        offset: number of days to subtract from the date

    Returns:
        tuple: (compact date string 'YYYYMMDD', standard date string 'YYYY-MM-DD')
    """
    if isinstance(date, datetime):
        now = date
    else:
        now = datetime.now()
        if date:
            # Try several date formats in turn
            for fmt in ('%Y-%m-%d', '%Y%m%d', '%Y/%m/%d'):
                try:
                    now = datetime.strptime(str(date), fmt)
                    break
                except ValueError:
                    continue
            else:
                raise ValueError(f"Unparseable date format: {date}")

    # Apply the day offset
    now = now - timedelta(days=offset)

    # Format both outputs consistently
    date_str = now.strftime("%Y-%m-%d")
    compact_date = date_str.replace("-", "")
    return compact_date, date_str
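
# A quick sanity check of getNow's two output formats (dates are illustrative,
# not taken from production data):
#
#   getNow('2024-01-15')          -> ('20240115', '2024-01-15')
#   getNow('20240115', offset=1)  -> ('20240114', '2024-01-14')  # one day back
#   getNow()                      -> today's date in both formats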

def get_head_push_auth():
    """Log in to the push endpoint and return an access token, or None."""
    login_res = requests.post(
        url=login_push_url, json=login_push_data, timeout=(3, 5))
    text = json.loads(login_res.text)
    if text["status"]:
        token = text["data"]["accessToken"]
        return token
    else:
        print("Authentication failed")
        return None


def upload_data_to_system(token_push, date):
    """Push the forecast price for the given date to the system."""
    data = {
        "funcModule": "数据表信息列表",
        "funcOperation": "新增",
        "data": [
            {
                "dataItemNo": "C01100036|Forecast_Price|ACN",
                "dataDate": getNow(date)[0],
                "dataStatus": "add",
                "dataValue": forecast_price()
            }
        ]
    }
    headers = {"Authorization": token_push}
    res = requests.post(url=upload_url, headers=headers, json=data, timeout=(3, 5))
    print(res.text)


def forecast_price():
    """Predict today's price with the saved best model, rounded to two decimals."""
    df_test = pd.read_excel('沥青数据项.xlsx')
    df_test.drop([0], inplace=True)

    # Convert every column except '日期' to numeric
    for col in df_test.columns:
        if col != '日期':
            df_test[col] = pd.to_numeric(df_test[col], errors='coerce')
    df_test['日期'] = pd.to_datetime(df_test['日期'], format='%Y-%m-%d')

    # Count missing values per feature
    MisVal_Check = df_test.isnull().sum().sort_values(ascending=False)
    # Drop features whose missing-value ratio exceeds 0.4; the result is df_test_1
    df_MisVal_Check = pd.DataFrame(MisVal_Check)
    df_MisVal_Check_1 = df_MisVal_Check.reset_index()
    df_MisVal_Check_1.columns = ['Variable_Name', 'Missing_Number']
    df_MisVal_Check_1['Missing_Number'] = df_MisVal_Check_1['Missing_Number'] / len(df_test)
    df_test_1 = df_test.drop(
        df_MisVal_Check_1[df_MisVal_Check_1['Missing_Number'] > 0.4].Variable_Name,
        axis=1)
    # Fill remaining gaps forward, then backward
    df_test_1 = df_test_1.ffill().bfill()

    # Columns available for model input
    col_for_training = df_test_1.columns

    import joblib
    Best_model_DalyLGPrice = joblib.load("日度价格预测_最佳模型.pkl")

    # The most recent day is the last row
    df_test_1_Day = df_test_1.tail(1).copy()
    # Drop the columns the model was not trained on
    df_test_1_Day.index = df_test_1_Day["日期"]
    df_test_1_Day = df_test_1_Day.drop(["日期"], axis=1)
    df_test_1_Day = df_test_1_Day.drop('京博指导价', axis=1)
    df_test_1_Day = df_test_1_Day.dropna()

    # Predict today's price and round to two decimal places
    Ypredict_Today = Best_model_DalyLGPrice.predict(df_test_1_Day)
    df_test_1_Day['日度预测价格'] = Ypredict_Today
    print(df_test_1_Day['日度预测价格'])
    a = float(df_test_1_Day['日度预测价格'].iloc[0])
    return round(a, 2)
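
# A minimal, self-contained sketch of the missing-value policy used above:
# features with a missing ratio above 0.4 are dropped, the survivors are
# forward- then backward-filled. Toy column names and values are made up.
def _demo_missing_value_filter():
    df = pd.DataFrame({
        'a': [1.0, None, 3.0, None, 5.0],   # 40% missing -> kept (not > 0.4)
        'b': [None, None, None, 4.0, 5.0],  # 60% missing -> dropped
    })
    ratio = df.isnull().sum() / len(df)
    kept = df.drop(columns=ratio[ratio > 0.4].index)
    return kept.ffill().bfill()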

def optimize_Model():
    """Train Lasso and XGBoost baselines, grid-search XGBoost, and pickle the best model."""
    pd.set_option('display.max_rows', 40)
    pd.set_option('display.max_columns', 40)

    df_test = pd.read_excel('沥青数据项.xlsx')
    df_test.drop([0], inplace=True)
    # Convert every column except '日期' to numeric
    for col in df_test.columns:
        if col != '日期':
            df_test[col] = pd.to_numeric(df_test[col], errors='coerce')
    df_test['日期'] = pd.to_datetime(df_test['日期'], format='%Y-%m-%d')

    # Drop features whose missing-value ratio exceeds 0.4; the result is df_test_1
    MisVal_Check = df_test.isnull().sum().sort_values(ascending=False)
    df_MisVal_Check = pd.DataFrame(MisVal_Check)
    df_MisVal_Check_1 = df_MisVal_Check.reset_index()
    df_MisVal_Check_1.columns = ['Variable_Name', 'Missing_Number']
    df_MisVal_Check_1['Missing_Number'] = df_MisVal_Check_1['Missing_Number'] / len(df_test)
    df_test_1 = df_test.drop(
        df_MisVal_Check_1[df_MisVal_Check_1['Missing_Number'] > 0.4].Variable_Name,
        axis=1)
    # Fill remaining gaps forward, then backward
    df_test_1 = df_test_1.ffill().bfill()

    df_test_1["日期"] = pd.to_datetime(df_test_1["日期"])
    df_test_1.index = df_test_1["日期"]
    df_test_1 = df_test_1.drop(["日期"], axis=1)

    # '京博指导价' is the target; everything else is a feature
    dataset1 = df_test_1.drop('京博指导价', axis=1)
    y = df_test_1['京博指导价']
    x = dataset1
    train = x
    target = y
    # Split into training and test sets
    X_train, x_test, y_train, y_true = train_test_split(
        train, target, test_size=0.2, random_state=0)

    # Baseline models (instances renamed so the Lasso class is not shadowed)
    lasso_model = Lasso(random_state=0)
    xgbr_model = XGBRegressor(random_state=0)
    lasso_model.fit(X_train, y_train)
    xgbr_model.fit(X_train, y_train)
    y_pre_Lasso = lasso_model.predict(x_test)
    y_pre_XGBR = xgbr_model.predict(x_test)

    # R² for each baseline
    Lasso_score = r2_score(y_true, y_pre_Lasso)
    XGBR_score = r2_score(y_true, y_pre_XGBR)

    # MSE and RMSE for each baseline
    Lasso_MSE = mean_squared_error(y_true, y_pre_Lasso)
    XGBR_MSE = mean_squared_error(y_true, y_pre_XGBR)
    Lasso_RMSE = np.sqrt(Lasso_MSE)
    XGBR_RMSE = np.sqrt(XGBR_MSE)

    model_results = pd.DataFrame([['Lasso', Lasso_RMSE, Lasso_score],
                                  ['XgBoost', XGBR_RMSE, XGBR_score]],
                                 columns=['模型(Model)', '均方根误差(RMSE)', 'R^2 score'])
    model_results1 = model_results.set_index('模型(Model)')

    def plot_feature_importance(importance, names, model_type):
        """Bar-plot feature importances, sorted descending, and save to a PNG."""
        feature_importance = np.array(importance)
        feature_names = np.array(names)
        data = {'feature_names': feature_names,
                'feature_importance': feature_importance}
        fi_df = pd.DataFrame(data)
        fi_df.sort_values(by=['feature_importance'], ascending=False, inplace=True)
        plt.figure(figsize=(10, 8))
        sn.barplot(x=fi_df['feature_importance'], y=fi_df['feature_names'])
        plt.title(model_type + ' FEATURE IMPORTANCE')
        plt.xlabel('FEATURE IMPORTANCE')
        plt.ylabel('FEATURE NAMES')
        plt.savefig(f'{model_type}-沥青定量特征重要性.png')

    # Use a font that can render Chinese labels
    from pylab import mpl
    mpl.rcParams['font.sans-serif'] = ['SimHei']
    # Feature-importance plots for both models. Reference:
    # https://www.analyseup.com/learn-python-for-data-science/python-random-forest-feature-importance-plot.html
    # plot_feature_importance(xgbr_model.feature_importances_, X_train.columns, 'XGBoost')
    # plot_feature_importance(lasso_model.coef_, X_train.columns, 'Lasso')

    # Grid search over XGBoost hyperparameters
    estimator = XGBRegressor(random_state=0, nthread=4, seed=0)
    parameters = {
        'max_depth': range(2, 11, 2),        # maximum tree depth
        'n_estimators': range(50, 101, 10),  # number of boosting rounds
        'learning_rate': [0.01, 0.03, 0.1, 0.3, 0.5, 1]
    }
    grid_search_XGB = GridSearchCV(
        estimator=estimator,
        param_grid=parameters,
        # n_jobs=10,
        cv=3,
        verbose=True
    )
    grid_search_XGB.fit(X_train, y_train)

    print("Best score: %0.3f" % grid_search_XGB.best_score_)
    print("Best parameters set:")
    best_parameters = grid_search_XGB.best_estimator_.get_params()
    for param_name in sorted(parameters.keys()):
        print("\t%s: %r" % (param_name, best_parameters[param_name]))

    y_pred = grid_search_XGB.predict(x_test)
    op_XGBR_score = r2_score(y_true, y_pred)
    op_XGBR_MSE = mean_squared_error(y_true, y_pred)
    op_XGBR_RMSE = np.sqrt(op_XGBR_MSE)

    model_results2 = pd.DataFrame(
        [['Optimized_Xgboost', op_XGBR_RMSE, op_XGBR_score]],
        columns=['模型(Model)', '均方根误差(RMSE)', 'R^2 score'])
    model_results2 = model_results2.set_index('模型(Model)')
    # Keep the model names as the index when combining the two result tables
    results = pd.concat([model_results1, model_results2])
    print(results)

    # Persist the fitted grid search; its predict() delegates to best_estimator_
    Pkl_Filename = "日度价格预测_最佳模型.pkl"
    with open(Pkl_Filename, 'wb') as file:
        pickle.dump(grid_search_XGB, file)
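
# The pickle written above stores the whole fitted GridSearchCV, whose
# predict() delegates to best_estimator_. forecast_price() loads it through
# joblib, which can generally read plain pickles; the stdlib pickle module
# works as well. A minimal loading sketch (features_df stands for whatever
# feature frame the model was trained on):
def _demo_load_best_model(features_df):
    with open("日度价格预测_最佳模型.pkl", 'rb') as f:
        model = pickle.load(f)
    return model.predict(features_df)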

# xls file handling

def read_xls_data():
    """Read the feature-item IDs from the workbook's second header row."""
    global one_cols, two_cols
    # header=None keeps the two header rows as ordinary rows
    df = pd.read_excel(read_file_path_name, header=None)
    # Row index 1 (the second row) holds the data-item IDs; skip the date column
    one_cols = df.iloc[1].tolist()[1:]
    print(f'Data-item IDs read: {one_cols}')


def get_queryDataListItemNos_value(token, url, dataItemNoList, dateStart, dateEnd):
    """Query values for the given data items over a date range."""
    search_data = {
        "funcModule": "数据项",
        "funcOperation": "查询",
        "data": {
            "dateStart": dateStart,
            "dateEnd": dateEnd,
            "dataItemNoList": dataItemNoList  # item codes, e.g. Brent low and high prices
        }
    }
    headers = {"Authorization": token}
    search_res = requests.post(
        url=url, headers=headers, json=search_data, timeout=(3, 5))
    search_value = json.loads(search_res.text)["data"]
    if search_value:
        return search_value
    else:
        return None


def save_queryDataListItemNos_xls(data_df, dataItemNoList):
    """Rewrite the current month's rows on the first sheet of the workbook."""
    current_year_month = datetime.now().strftime('%Y-%m')
    grouped = data_df.groupby("dataDate")

    # Open the workbook with openpyxl: one copy to read, one to modify
    from openpyxl import load_workbook
    workbook = load_workbook('沥青数据项.xlsx')
    new_workbook = load_workbook('沥青数据项.xlsx')

    for sheetname in workbook.sheetnames:
        sheet = workbook[sheetname]
        new_sheet = new_workbook[sheetname]

        # Count the existing rows that belong to the current month
        current_year_month_row = 0
        for row_idx, row in enumerate(sheet.iter_rows(values_only=True), 1):
            if str(row[0]).startswith(current_year_month):
                current_year_month_row += 1

        # Overwrite the current month's rows with the fresh data
        if sheetname == workbook.sheetnames[0]:
            start_row = sheet.max_row - current_year_month_row + 1
            for row_idx, (date, group) in enumerate(grouped, start=start_row):
                new_sheet.cell(row=row_idx, column=1, value=date)
                for j, dataItemNo in enumerate(dataItemNoList, start=2):
                    values = group[group["dataItemNo"] == dataItemNo]["dataValue"].values
                    if values.size:
                        new_sheet.cell(row=row_idx, column=j, value=values[0])

    # Save the modified workbook
    new_workbook.save("沥青数据项.xlsx")
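
# Shape of the "data" payload returned by get_queryDataListItemNos_value above
# and consumed by the save_* functions below. The field names are inferred from
# how the code uses them; the values shown are purely illustrative:
#
#   [
#       {"dataDate": "2024-01-02", "dataItemNo": "C01100036", "dataValue": 3720.0},
#       {"dataDate": "2024-01-02", "dataItemNo": "C01100037", "dataValue": 3815.0},
#       ...
#   ]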

def save_recent_two_months_data_xls(data_df, dataItemNoList, filename='沥青数据项.xlsx', dateEnd=None):
    """Update only the last two months of data; older history stays untouched.

    The window is counted back from the end date, and only rows inside it are
    rewritten. The file's first two rows (Chinese column names + ID row) and
    all data older than two months are preserved.

    Args:
        data_df (DataFrame): rows with dataDate, dataItemNo, dataValue
        dataItemNoList (list): data-item codes
        filename (str): Excel file to write
        dateEnd (str): end date in YYYYMMDD format; defaults to today
    """
    try:
        print(f'Updating the last two months of data in {filename}')

        # Resolve the end date, defaulting to today
        if dateEnd is None:
            end_date = datetime.now()
        else:
            # Parse the YYYYMMDD end date
            try:
                end_date = datetime.strptime(str(dateEnd), '%Y%m%d')
            except ValueError:
                print(f'Bad date format: {dateEnd}; falling back to today')
                end_date = datetime.now()

        # Two months, approximated as 60 days, counted back from the end date
        two_months_ago = end_date - timedelta(days=60)
        cutoff_date = two_months_ago.strftime('%Y-%m-%d')
        end_date_str = end_date.strftime('%Y-%m-%d')

        print(f'End date: {end_date_str} (dateEnd: {dateEnd})')
        print(f'Update window: {cutoff_date} to {end_date_str} (last two months)')
        print('Data older than two months will be left unchanged')

        # Keep only the rows that fall inside the two-month window
        data_df_copy = data_df.copy()
        data_df_copy["dataDate"] = pd.to_datetime(data_df_copy["dataDate"])
        recent_data = data_df_copy[
            (data_df_copy["dataDate"] >= two_months_ago) &
            (data_df_copy["dataDate"] <= end_date)
        ].copy()

        print(f'Total input rows: {len(data_df)}')
        print(f'Rows inside the two-month window: {len(recent_data)}')

        if recent_data.empty:
            print('❌ No rows inside the two-month window; nothing to update')
            return

        # Back to string dates for writing
        recent_data["dataDate"] = recent_data["dataDate"].dt.strftime('%Y-%m-%d')

        # Read the existing workbook
        try:
            df_existing = pd.read_excel(filename, header=None)
            # Keep the first two rows: Chinese column names, then item IDs
            header_rows = df_existing.iloc[0:2].copy()
            existing_columns = df_existing.columns.tolist()

            # Data rows start at the third row
            if len(df_existing) > 2:
                existing_data_rows = df_existing.iloc[2:].copy()
                # Compare on parsed dates in the first column
                existing_data_rows.iloc[:, 0] = pd.to_datetime(
                    existing_data_rows.iloc[:, 0], errors='coerce')

                # History to preserve: rows older than the window
                old_data_mask = existing_data_rows.iloc[:, 0] < two_months_ago
                old_data = existing_data_rows[old_data_mask].copy()

                # Convert the preserved dates back to strings, defensively
                if not old_data.empty:
                    if pd.api.types.is_datetime64_any_dtype(old_data.iloc[:, 0]):
                        old_data.iloc[:, 0] = old_data.iloc[:, 0].dt.strftime('%Y-%m-%d')
                    else:
                        # Not datetime yet: parse first, then format
                        try:
                            old_data_dates = pd.to_datetime(
                                old_data.iloc[:, 0], errors='coerce')
                            if old_data_dates.notna().any():
                                old_data.iloc[:, 0] = old_data_dates.dt.strftime('%Y-%m-%d')
                            else:
                                print('Warning: could not parse history dates; keeping original format')
                        except Exception as e:
                            print(f'Warning: history date handling failed: {str(e)}; keeping original format')

                print(f'Existing data rows in file: {len(existing_data_rows)}')
                print(f'History rows preserved (older than two months): {len(old_data)}')
            else:
                old_data = pd.DataFrame(columns=existing_columns)
                print('No existing data rows in file')

            print(f'Using the existing header structure: {len(existing_columns)} columns')

        except FileNotFoundError:
            # No file yet: create the two-row header structure
            chinese_names = ['日期'] + [f'数据项{i}' for i in range(len(dataItemNoList))]
            id_row = ['日期'] + [str(item) for item in dataItemNoList]
            max_cols = max(len(chinese_names), len(id_row))
            existing_columns = list(range(max_cols))
            header_data = []
            header_data.append(chinese_names + [''] * (max_cols - len(chinese_names)))
            header_data.append(id_row + [''] * (max_cols - len(id_row)))
            header_rows = pd.DataFrame(header_data, columns=existing_columns)
            old_data = pd.DataFrame(columns=existing_columns)
            print(f'Created a new header structure: {len(existing_columns)} columns')

        # Group the fresh rows by date
        grouped = recent_data.groupby("dataDate")

        new_data_rows = []
        mapping_debug_info = []  # debug trail of every mapping attempt

        print("\n🔍 Starting the data-mapping pass:")
        print(f"📋 Number of date groups: {len(grouped)}")

        for date, group in grouped:
            print(f"\n📅 Processing {date}, {len(group)} records")

            # New row, blank except for the date in column 0
            row_data = [''] * len(existing_columns)
            row_data[0] = date

            # Match items against the ID row (second header row)
            id_row_values = header_rows.iloc[1].tolist()
            mapped_count = 0
            unmapped_items = []

            # Find each data item's column position
            for dataItemNo in dataItemNoList:
                dataItemNo_str = str(dataItemNo)
                # Locate the column; ID cells may be strings or numbers
                col_index = None
                try:
                    # Exact string match first
                    col_index = id_row_values.index(dataItemNo_str)
                except ValueError:
                    # Fall back to type-converted matching
                    try:
                        if dataItemNo_str.isdigit():
                            dataItemNo_num = int(dataItemNo_str)
                            col_index = id_row_values.index(dataItemNo_num)
                        else:
                            # Compare against stringified Excel IDs
                            for i, excel_id in enumerate(id_row_values):
                                if str(excel_id) == dataItemNo_str:
                                    col_index = i
                                    break
                    except (ValueError, TypeError):
                        pass

                if col_index is not None:
                    # Look up the item's value for this date
                    matching_rows = group[group["dataItemNo"] == dataItemNo_str]
                    if not matching_rows.empty:
                        data_value = matching_rows["dataValue"].iloc[0]
                        # Guard against out-of-range column indices
                        if col_index < len(row_data):
                            row_data[col_index] = data_value
                            mapped_count += 1
                            mapping_debug_info.append(
                                f"{date}|{dataItemNo_str}|{col_index}|{data_value}|ok")
                        else:
                            unmapped_items.append(f"{dataItemNo_str} (index out of range)")
                            mapping_debug_info.append(
                                f"{date}|{dataItemNo_str}|{col_index}|{data_value}|index out of range")
                    else:
                        mapping_debug_info.append(
                            f"{date}|{dataItemNo_str}|{col_index}|NULL|no value")
                else:
                    # Item not present in the ID row; skip it
                    unmapped_items.append(f"{dataItemNo_str} (column not found)")
                    mapping_debug_info.append(
                        f"{date}|{dataItemNo_str}|N/A|N/A|column not found")
                    continue

            print(f"  📊 Mapped for this date: {mapped_count}/{len(dataItemNoList)}")
            if unmapped_items and len(unmapped_items) <= 3:
                print(f"  ⚠️ Unmapped: {unmapped_items}")
            new_data_rows.append(row_data)

        print("\n📈 Update statistics:")
        print(f"  Dates updated: {len(new_data_rows)}")
        print(f"  Mapping records: {len(mapping_debug_info)}")

        # Fresh rows as a DataFrame
        df_new_data = pd.DataFrame(new_data_rows, columns=existing_columns)

        # Combine preserved history with the fresh rows
        if not old_data.empty:
            all_data_rows = pd.concat([old_data, df_new_data], ignore_index=True)
            print(f"📝 Merged {len(old_data)} history rows with {len(df_new_data)} new rows")
        else:
            all_data_rows = df_new_data
            print(f"📝 Created {len(df_new_data)} rows")

        # Sort by date, tolerating unparseable values
        try:
            date_column = all_data_rows.iloc[:, 0]
            date_column_dt = pd.to_datetime(date_column, errors='coerce')
            if date_column_dt.notna().any():
                all_data_rows = all_data_rows.iloc[date_column_dt.argsort()]
                all_data_rows.iloc[:, 0] = date_column_dt.dt.strftime('%Y-%m-%d')
            else:
                print('Warning: date conversion failed; keeping original order')
        except Exception as date_error:
            print(f'Date handling problem; keeping original format: {str(date_error)}')

        # Reset the index
        all_data_rows = all_data_rows.reset_index(drop=True)

        # Reassemble: two header rows, then all data rows
        df_final = pd.concat([header_rows, all_data_rows], ignore_index=True)
        df_final.to_excel(filename, index=False, header=False, engine='openpyxl')

        print(f'✅ Updated the last two months of data in {filename}')
        print(f'📊 Total rows written: {len(df_final)} (2 header rows + {len(all_data_rows)} data rows)')
        if len(all_data_rows) > 0:
            print(f'📅 Data date range: {all_data_rows.iloc[:, 0].min()} to {all_data_rows.iloc[:, 0].max()}')

    except Exception as e:
        print(f'❌ Error while updating the last two months of data: {str(e)}')
        import traceback
        traceback.print_exc()
        raise
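
# A minimal usage sketch with toy records (the item ID, file name and values
# are made up); handy for exercising the update path without touching the API:
def _demo_recent_two_months_update():
    toy = pd.DataFrame([
        {"dataDate": "2024-01-02", "dataItemNo": "X001", "dataValue": 1.0},
        {"dataDate": "2024-01-03", "dataItemNo": "X001", "dataValue": 2.0},
    ])
    save_recent_two_months_data_xls(
        toy, ["X001"], filename='demo_沥青数据项.xlsx', dateEnd='20240131')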

def save_all_historical_data_xls(data_df, dataItemNoList, filename='沥青数据项.xlsx'):
    """Save data for every historical date, with no date cutoff.

    The file's first two rows (Chinese column names + ID row) are preserved
    and the data rows are written after them.

    Args:
        data_df (DataFrame): rows with dataDate, dataItemNo, dataValue
        dataItemNoList (list): data-item codes
        filename (str): Excel file to write
    """
    try:
        print(f'Saving all historical data to {filename}')

        # Group by date
        grouped = data_df.groupby("dataDate")
        print(f'{len(grouped)} distinct dates in the input')

        # Read the existing file's two header rows
        try:
            df_existing = pd.read_excel(filename, header=None)  # keep the raw structure
            # Keep the first two rows: Chinese column names, then item IDs
            header_rows = df_existing.iloc[0:2].copy()
            existing_columns = df_existing.columns.tolist()  # numeric column indices
            print(f'Using the existing header structure: {len(existing_columns)} columns')
            print(f'Row 1 (Chinese names): {header_rows.iloc[0].tolist()}')
            print(f'Row 2 (item IDs): {header_rows.iloc[1].tolist()}')
        except FileNotFoundError:
            # No file yet: create the two-row header structure
            # Row 1: Chinese column names (date first; the rest are placeholders)
            chinese_names = ['日期'] + [f'数据项{i}' for i in range(len(dataItemNoList))]
            # Row 2: item IDs
            id_row = ['日期'] + [str(item) for item in dataItemNoList]
            # Build the two header rows
            max_cols = max(len(chinese_names), len(id_row))
            existing_columns = list(range(max_cols))
            header_data = []
            header_data.append(chinese_names + [''] * (max_cols - len(chinese_names)))
            header_data.append(id_row + [''] * (max_cols - len(id_row)))
            header_rows = pd.DataFrame(header_data, columns=existing_columns)
            print(f'Created a new header structure: {len(existing_columns)} columns')

        all_data = []
        mapping_debug_info = []  # debug trail of every mapping attempt

        print("\n🔍 Starting the data-mapping pass:")
        print(f"📋 Number of date groups: {len(grouped)}")

        for date, group in grouped:
            print(f"\n📅 Processing {date}, {len(group)} records")

            # New row, blank except for the date in column 0
            row_data = [''] * len(existing_columns)
            row_data[0] = date

            # Match items against the ID row (second header row)
            id_row_values = header_rows.iloc[1].tolist()
            print(f"🏷️ ID-row length: {len(id_row_values)}")
            mapped_count = 0
            unmapped_items = []

            # Find each data item's column position
            for dataItemNo in dataItemNoList:
                dataItemNo_str = str(dataItemNo)
                # Locate the column; ID cells may be strings or numbers
                col_index = None
                try:
                    # Exact string match first
                    col_index = id_row_values.index(dataItemNo_str)
                except ValueError:
                    # Fall back to type-converted matching
                    try:
                        if dataItemNo_str.isdigit():
                            dataItemNo_num = int(dataItemNo_str)
                            col_index = id_row_values.index(dataItemNo_num)
                            print(f"  🔄 {dataItemNo_str} -> numeric match, column {col_index}")
                        else:
                            # Compare against stringified Excel IDs
                            for i, excel_id in enumerate(id_row_values):
                                if str(excel_id) == dataItemNo_str:
                                    col_index = i
                                    print(f"  🔄 {dataItemNo_str} -> type-converted match, column {col_index}")
                                    break
                    except (ValueError, TypeError):
                        pass

                if col_index is not None:
                    # Look up the item's value for this date
                    matching_rows = group[group["dataItemNo"] == dataItemNo_str]
                    if not matching_rows.empty:
                        data_value = matching_rows["dataValue"].iloc[0]
                        # Guard against out-of-range column indices
                        if col_index < len(row_data):
                            row_data[col_index] = data_value
                            mapped_count += 1
                            print(f"  ✅ {dataItemNo_str} -> column {col_index} = {data_value}")
                            mapping_debug_info.append(
                                f"{date}|{dataItemNo_str}|{col_index}|{data_value}|ok")
                        else:
                            print(f"  ❌ {dataItemNo_str} -> column index {col_index} out of range {len(row_data)}")
                            unmapped_items.append(f"{dataItemNo_str} (index out of range)")
                            mapping_debug_info.append(
                                f"{date}|{dataItemNo_str}|{col_index}|{data_value}|index out of range")
                    else:
                        print(f"  ⚠️ {dataItemNo_str} -> column {col_index} but no data value")
                        mapping_debug_info.append(
                            f"{date}|{dataItemNo_str}|{col_index}|NULL|no value")
                else:
                    # Item not present in the ID row; skip it
                    print(f"  ❌ {dataItemNo_str} -> no column found (tried string and numeric matches)")
                    unmapped_items.append(f"{dataItemNo_str} (column not found)")
                    mapping_debug_info.append(
                        f"{date}|{dataItemNo_str}|N/A|N/A|column not found")
                    continue

            print(f"  📊 Mapped for this date: {mapped_count}/{len(dataItemNoList)}")
            if unmapped_items:
                print(f"  ⚠️ Unmapped items: {unmapped_items[:3]}{'...' if len(unmapped_items) > 3 else ''}")
            all_data.append(row_data)

        print("\n📈 Update statistics:")
        print(f"  Dates written: {len(all_data)}")
        print(f"  Mapping records: {len(mapping_debug_info)}")

        # Dump the mapping trail for offline inspection
        debug_filename = f'mapping_debug_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
        with open(debug_filename, 'w', encoding='utf-8') as f:
            f.write("\n".join(mapping_debug_info))
        print(f"  📄 Mapping debug info saved to: {debug_filename}")

        # All rows as a DataFrame
        df_new_data = pd.DataFrame(all_data, columns=existing_columns)

        # Sort by date, tolerating unparseable values
        try:
            date_column = df_new_data.iloc[:, 0]
            date_column_dt = pd.to_datetime(date_column, errors='coerce')
            # Only sort if at least one date parsed
            if date_column_dt.notna().any():
                df_new_data = df_new_data.iloc[date_column_dt.argsort()]
                df_new_data.iloc[:, 0] = date_column_dt.dt.strftime('%Y-%m-%d')
            else:
                print('Warning: date conversion failed; keeping original order')
        except Exception as date_error:
            print(f'Date handling problem; keeping original format: {str(date_error)}')

        # Reset the index
        df_new_data = df_new_data.reset_index(drop=True)

        # Reassemble: two header rows, then the data rows
        df_final = pd.concat([header_rows, df_new_data], ignore_index=True)
        # Write without pandas column names
        df_final.to_excel(filename, index=False, header=False, engine='openpyxl')

        print(f'Saved {len(all_data)} data rows to {filename}')
        if len(all_data) > 0:
            print(f'Data date range: {df_new_data.iloc[:, 0].min()} to {df_new_data.iloc[:, 0].max()}')

    except Exception as e:
        print(f'Error while saving historical data: {str(e)}')
        import traceback
        traceback.print_exc()
        raise


def queryDataListItemNos(token=None):
    """Fetch the last two months of values for every item ID in the workbook."""
    df = pd.read_excel('沥青数据项.xlsx')
    dataItemNoList = df.iloc[0].tolist()[1:]
    # Floats become strings without a decimal part
    dataItemNoList = [str(int(item)) if isinstance(item, float) and not pd.isna(item)
                      else str(item) for item in dataItemNoList]
    # Report the list length; the API accepts at most 50 items per request
    print(f'Total data items: {len(dataItemNoList)}')

    if token is None:
        token = get_head_auth()
    if not token:
        print('Failed to obtain a token')
        return

    # Date range: the last two months, counted back from today
    current_date = datetime.now()
    two_months_ago = current_date - timedelta(days=60)

    # Format as YYYYMMDD
    dateEnd = current_date.strftime('%Y%m%d')
    dateStart = two_months_ago.strftime('%Y%m%d')

    print(f'📅 Fetch range: {dateStart} to {dateEnd} (last two months)')

    # Fetch in batches of at most 50 items
    batch_size = 50
    all_search_values = []

    for i in range(0, len(dataItemNoList), batch_size):
        batch = dataItemNoList[i:i + batch_size]
        print(f'Fetching batch {i // batch_size + 1} with {len(batch)} items')

        search_value = get_queryDataListItemNos_value(
            token, queryDataListItemNos_url, batch, dateStart, dateEnd)
        if search_value:
            all_search_values.extend(search_value)
        else:
            print(f'Batch {i // batch_size + 1} returned no data')

    if not all_search_values:
        print('Every batch failed; nothing to save')
        return

    print(f'Fetched {len(all_search_values)} records in total')

    # Combine all batches
    data_df = pd.DataFrame(all_search_values)
    data_df["dataDate"] = pd.to_datetime(data_df["dataDate"])
    data_df["dataDate"] = data_df["dataDate"].dt.strftime('%Y-%m-%d')

    # # Rewrite the current month with the full item list instead:
    # save_queryDataListItemNos_xls(data_df, dataItemNoList)
    # # Or save the complete history:
    # save_all_historical_data_xls(data_df, dataItemNoList)

    # Save the last two months, counted back from dateEnd
    save_recent_two_months_data_xls(data_df, dataItemNoList, dateEnd=dateEnd)
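
# The 50-item batching in queryDataListItemNos could be factored into a small
# helper; a sketch, not wired into the pipeline:
def _chunks(items, size=50):
    """Yield consecutive slices of at most `size` elements."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

# for batch in _chunks(dataItemNoList): ... would replace the manual range loop.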

def save_xls_1(append_rows):
    """Append one row to the first sheet using xlrd/xlwt.

    Note: xlrd >= 2.0 no longer reads .xlsx, so this legacy helper needs
    xlrd < 2.0; xlwt also writes legacy .xls content regardless of extension.
    """
    import xlrd
    import xlwt

    # Open the workbook
    workbook = xlrd.open_workbook('沥青数据项.xlsx')
    # Number and names of the sheets
    sheet_count = len(workbook.sheet_names())
    sheet_names = workbook.sheet_names()

    new_workbook = xlwt.Workbook()
    for i in range(sheet_count):
        # Current sheet and its dimensions
        sheet = workbook.sheet_by_index(i)
        row_count = sheet.nrows - 1
        col_count = sheet.ncols

        # Copy the existing cells
        data = []
        for row in range(row_count):
            row_data = []
            for col in range(col_count):
                row_data.append(sheet.cell_value(row, col))
            data.append(row_data)

        # Recreate the sheet in the new workbook
        new_sheet = new_workbook.add_sheet(sheet_names[i])
        for row in range(row_count):
            for col in range(col_count):
                new_sheet.write(row, col, data[row][col])

        if i == 0:
            # Append the new row to the first sheet
            for col in range(col_count):
                new_sheet.write(row_count, col, append_rows[col])

    # Save the new workbook
    new_workbook.save("沥青数据项.xlsx")


def start(date=''):
    """Fetch the day's values and append/update them in the workbook."""
    read_xls_data()

    token = get_head_auth()
    if not token:
        return

    cur_time, cur_time2 = getNow(date)
    print(f"Fetching data for {cur_time}")
    datas = get_data_value(token, one_cols, date=cur_time)
    if not datas:
        return
    print(len(datas))
    print(datas)

    append_rows = [cur_time2]
    dataItemNo_dataValue = {}
    for data_value in datas:
        if "dataValue" not in data_value:
            print(data_value)
            dataItemNo_dataValue[data_value["dataItemNo"]] = ""
        else:
            dataItemNo_dataValue[data_value["dataItemNo"]] = data_value["dataValue"]

    for value in one_cols:
        if value in dataItemNo_dataValue:
            append_rows.append(dataItemNo_dataValue[value])
        else:
            append_rows.append("")
    print('Row to append:', len(append_rows), append_rows)
    save_xls_2(append_rows)


def save_xls_2(append_rows):
    """Insert or update one row in the Excel file, keyed on the date.

    Args:
        append_rows (list): row to upsert, shaped [date, item 1, item 2, ...]
    """
    # Read the existing data (first row is taken as column names)
    df = pd.read_excel('沥青数据项.xlsx', sheet_name=0)
    print('Columns in file:', len(df.columns), df.columns)

    # Align the row with the file's columns
    if len(append_rows) != len(df.columns):
        # Drop the second element; the source occasionally carries an extra blank field (cause unknown)
        append_rows.pop(1)
    append_rows = pd.DataFrame([append_rows], columns=df.columns)

    # Check whether the date already exists
    new_date = append_rows['日期'].values[0]
    dates = df['日期'].to_list()
    if new_date in dates:
        # Date already present: update that row in place
        date_mask = df['日期'] == new_date
        df.loc[date_mask] = append_rows.values
        print(f"Updated data for {new_date}")
    else:
        # New date: append the row
        df = pd.concat([df, append_rows], ignore_index=True)
        print(df.head())
        print(df.tail())
        print(f"Inserted new data for {new_date}")

    # Write the updated table back
    df.to_excel('沥青数据项.xlsx', index=False, engine='openpyxl')


def main(start_date=None, token=None, token_push=None):
    """Daily pipeline: refresh data, retrain the model, upload the forecast."""
    if start_date is None:
        start_date = datetime.now()
    if token is None:
        token = get_head_auth()
    if token_push is None:
        token_push = get_head_push_auth()

    date = start_date.strftime('%Y%m%d')
    print(date)
    # start(date)

    # Refresh the recent data
    queryDataListItemNos(token)
    # Retrain the model
    optimize_Model()
    # Predict and upload the result
    upload_data_to_system(token_push, start_date)


if __name__ == "__main__":
    print("Running...")
    main()