代码加注释

This commit is contained in:
workpc 2024-12-06 09:43:01 +08:00
parent 89b4a2c7a6
commit 4025e512cb

View File

@ -47,6 +47,17 @@ from config_jingbo import *
# 定义函数
def loadcsv(filename):
"""
读取指定文件名的 CSV 文件
如果文件编码为 UTF-8则使用 UTF-8 编码读取否则使用 GBK 编码读取
参数:
filename (str): 要读取的 CSV 文件的文件名
返回:
pandas.DataFrame: 读取的数据
"""
# 读取csv文件
try:
df = pd.read_csv(filename, encoding='utf-8')
@ -54,7 +65,19 @@ def loadcsv(filename):
df = pd.read_csv(filename, encoding='gbk')
return df
def dateConvert(df, datecol='ds'):
"""
将数据框 df 中的 datecol 列转换为日期时间类型
参数:
df (pandas.DataFrame): 要转换的 DataFrame
datecol (str): 要转换的列名默认为 'ds'
返回:
pandas.DataFrame: 转换后的 DataFrame
"""
# 将date列转换为datetime类型
try:
df[datecol] = pd.to_datetime(df[datecol],format=r'%Y-%m-%d')
@ -65,28 +88,39 @@ def dateConvert(df, datecol='ds'):
def calculate_kdj(data, n=9):
'''
给传进来的df 添加列 波动率最高最低k ,d j
给传进来的df 添加列 波动率最高最低k,d j
'''
# 对数据按照日期升序排序
data = data.sort_values(by='ds', ascending=True)
# 因为没有高开低价格,利用每日波动率模拟当天最高价和最低价
data['pctchange'] = data['y'].pct_change()
# 收益为0的用0.01
data['pctchange'] = data['pctchange'].replace(0,0.01)
# 去除空值
data.dropna(inplace=True)
# 重置索引
data.reset_index(drop=True,inplace=True)
# 计算最高价和最低价
data['high'] = data['y']* (1+abs(data['pctchange'])/2)
data['low'] = data['y']* (1-abs(data['pctchange'])/2)
# 计算n日内最低价
low_list = data['y'].rolling(window=n, min_periods=1).min()
# 计算n日内最高价
high_list = data['y'].rolling(window=n, min_periods=1).max()
# 计算未成熟随机值
rsv = ((data['y'] - low_list) / (high_list - low_list)) * 100
# 初始化k值为50
k = pd.Series(50, index=data.index)
# 初始化d值为50
d = pd.Series(50, index=data.index)
# 计算k值和d值
for i in range(1, len(data)):
k[i] = (2/3 * k[i - 1]) + (1/3 * rsv[i])
d[i] = (2/3 * d[i - 1]) + (1/3 * k[i])
# 计算j值
j = 3 * k - 2 * d
# 将k值、d值和j值添加到数据中
data['K'] = k
data['D'] = d
data['J'] = j
@ -95,46 +129,114 @@ def calculate_kdj(data, n=9):
# data = data.dropna()
return data
# 上传报告
def get_head_auth_report():
"""
通过 POST 请求登录到指定的 URL并从响应中获取认证令牌
返回:
str: 如果登录成功返回认证令牌否则返回 None
"""
# 发送 POST 请求到登录 URL携带登录数据
login_res = requests.post(url=login_pushreport_url, json=login_data, timeout=(3, 5))
# 将响应内容转换为 JSON 格式
text = json.loads(login_res.text)
# 如果响应状态为成功
if text["status"]:
# 从响应数据中获取认证令牌
token = text["data"]["accessToken"]
# 返回认证令牌
return token
def upload_report_data(token, upload_data):
"""
上传报告数据到指定的URL
参数:
token (str): 认证令牌
upload_data (dict): 要上传的报告数据包含必要的字段和信息
返回:
dict: 如果上传成功返回响应对象否则返回None
"""
# 直接使用传入的 upload_data
upload_data = upload_data
# 设置请求头部
headers = {"Authorization": token}
# 打印日志,显示正在上传报告数据
logger.info("报告上传中...")
# 打印日志,显示认证头部信息
logger.info(f"token:{token}")
# 打印日志,显示要上传的报告数据
logger.info(f"upload_data:{upload_data}" )
# 发送POST请求上传报告数据
upload_res = requests.post(url=upload_url, headers=headers, json=upload_data, timeout=(3, 15))
# 将响应内容转换为 JSON 格式
upload_res = json.loads(upload_res.text)
# 打印日志,显示响应内容
logger.info(upload_res)
# 如果上传成功,返回响应对象
if upload_res:
return upload_res
# 如果上传失败打印日志并返回None
else:
logger.info("报告上传失败")
return None
def upload_warning_data(warning_data):
"""
上传预警数据到指定的URL
参数:
warning_data (dict): 要上传的预警数据包含必要的字段和信息
返回:
requests.Response: 如果上传成功返回响应对象否则返回None
"""
# 获取认证头部信息
token = get_head_auth_report()
warning_data = warning_data
# 设置请求头部
headers = {"Authorization": token}
# 打印日志,显示正在上传预警数据
logger.info("预警上传中...")
# 打印日志显示上传的URL
logger.info(f"upload_warning_url:{upload_warning_url}")
# 打印日志,显示认证头部信息
logger.info(f"token:{token}")
logger.info(f"warning_data:{warning_data}" )
# 打印日志,显示要上传的预警数据
logger.info(f"warning_data:{warning_data}")
# 发送POST请求上传预警数据
upload_res = requests.post(url=upload_warning_url, headers=headers, json=warning_data, timeout=(3, 15))
# 如果上传成功,返回响应对象
if upload_res:
return upload_res
# 如果上传失败打印日志并返回None
else:
logger.info("预警上传失败")
return None
def upload_warning_info(df_count):
logger.info(f'上传预警信息')
try: