diff --git a/lib/dataread.py b/lib/dataread.py index 39c913a..4e893ac 100644 --- a/lib/dataread.py +++ b/lib/dataread.py @@ -47,6 +47,17 @@ from config_jingbo import * # 定义函数 def loadcsv(filename): + """ + 读取指定文件名的 CSV 文件。 + 如果文件编码为 UTF-8,则使用 UTF-8 编码读取;否则,使用 GBK 编码读取。 + + 参数: + filename (str): 要读取的 CSV 文件的文件名。 + + 返回: + pandas.DataFrame: 读取的数据。 + + """ # 读取csv文件 try: df = pd.read_csv(filename, encoding='utf-8') @@ -54,7 +65,19 @@ def loadcsv(filename): df = pd.read_csv(filename, encoding='gbk') return df + def dateConvert(df, datecol='ds'): + """ + 将数据框 df 中的 datecol 列转换为日期时间类型。 + + 参数: + df (pandas.DataFrame): 要转换的 DataFrame。 + datecol (str): 要转换的列名,默认为 'ds'。 + + 返回: + pandas.DataFrame: 转换后的 DataFrame。 + + """ # 将date列转换为datetime类型 try: df[datecol] = pd.to_datetime(df[datecol],format=r'%Y-%m-%d') @@ -65,28 +88,39 @@ def dateConvert(df, datecol='ds'): def calculate_kdj(data, n=9): ''' - 给传进来的df 添加列: 波动率,最高,最低,k ,d ,j + 给传进来的df 添加列: 波动率,最高,最低,k,d ,j ''' + # 对数据按照日期升序排序 data = data.sort_values(by='ds', ascending=True) # 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价 data['pctchange'] = data['y'].pct_change() # 收益为0的用0.01 data['pctchange'] = data['pctchange'].replace(0,0.01) + # 去除空值 data.dropna(inplace=True) # 重置索引 data.reset_index(drop=True,inplace=True) + # 计算最高价和最低价 data['high'] = data['y']* (1+abs(data['pctchange'])/2) data['low'] = data['y']* (1-abs(data['pctchange'])/2) + # 计算n日内最低价 low_list = data['y'].rolling(window=n, min_periods=1).min() + # 计算n日内最高价 high_list = data['y'].rolling(window=n, min_periods=1).max() + # 计算未成熟随机值 rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100 + # 初始化k值为50 k = pd.Series(50, index=data.index) + # 初始化d值为50 d = pd.Series(50, index=data.index) + # 计算k值和d值 for i in range(1, len(data)): k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i]) d[i] = (2/3 * d[i - 1]) + (1/3 * k[i]) + # 计算j值 j = 3 * k - 2 * d + # 将k值、d值和j值添加到数据中 data['K'] = k data['D'] = d data['J'] = j @@ -95,44 +129,112 @@ def calculate_kdj(data, n=9): # data = data.dropna() return data + # 上传报告 def get_head_auth_report(): + """ + 通过 POST 请求登录到指定的 URL,并从响应中获取认证令牌。 + + 返回: + str: 如果登录成功,返回认证令牌;否则返回 None。 + """ + # 发送 POST 请求到登录 URL,携带登录数据 login_res = requests.post(url=login_pushreport_url, json=login_data, timeout=(3, 5)) + + # 将响应内容转换为 JSON 格式 text = json.loads(login_res.text) + + # 如果响应状态为成功 if text["status"]: + # 从响应数据中获取认证令牌 token = text["data"]["accessToken"] + # 返回认证令牌 return token def upload_report_data(token, upload_data): + """ + 上传报告数据到指定的URL + + 参数: + token (str): 认证令牌 + upload_data (dict): 要上传的报告数据,包含必要的字段和信息 + + 返回: + dict: 如果上传成功,返回响应对象;否则返回None + """ + # 直接使用传入的 upload_data upload_data = upload_data + + # 设置请求头部 headers = {"Authorization": token} + + # 打印日志,显示正在上传报告数据 logger.info("报告上传中...") + + # 打印日志,显示认证头部信息 logger.info(f"token:{token}") + + # 打印日志,显示要上传的报告数据 logger.info(f"upload_data:{upload_data}" ) + + # 发送POST请求,上传报告数据 upload_res = requests.post(url=upload_url, headers=headers, json=upload_data, timeout=(3, 15)) + + # 将响应内容转换为 JSON 格式 upload_res = json.loads(upload_res.text) + + # 打印日志,显示响应内容 logger.info(upload_res) + + # 如果上传成功,返回响应对象 if upload_res: return upload_res + # 如果上传失败,打印日志并返回None else: logger.info("报告上传失败") return None + def upload_warning_data(warning_data): + """ + 上传预警数据到指定的URL + + 参数: + warning_data (dict): 要上传的预警数据,包含必要的字段和信息 + + 返回: + requests.Response: 如果上传成功,返回响应对象;否则返回None + """ + # 获取认证头部信息 token = get_head_auth_report() - warning_data = warning_data + + # 设置请求头部 headers = {"Authorization": token} + + # 打印日志,显示正在上传预警数据 logger.info("预警上传中...") + + # 打印日志,显示上传的URL logger.info(f"upload_warning_url:{upload_warning_url}") + + # 打印日志,显示认证头部信息 logger.info(f"token:{token}") - logger.info(f"warning_data:{warning_data}" ) + + # 打印日志,显示要上传的预警数据 + logger.info(f"warning_data:{warning_data}") + + # 发送POST请求,上传预警数据 upload_res = requests.post(url=upload_warning_url, headers=headers, json=warning_data, timeout=(3, 15)) + + # 如果上传成功,返回响应对象 if upload_res: return upload_res + # 如果上传失败,打印日志并返回None else: logger.info("预警上传失败") return None + def upload_warning_info(df_count):