django-vue3-admin-backend/dvadmin/utils/filters.py
2025-10-20 21:30:27 +08:00

461 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@author: 猿小天
@contact: QQ:1638245306
@Created on: 2021/6/6 006 12:39
@Remark: 自定义过滤器
"""
import operator
import re
from collections import OrderedDict
from functools import reduce
import six
from django.db import models
from django.db.models import Q, F
from django.db.models.constants import LOOKUP_SEP
from django_filters.constants import ALL_FIELDS
from django_filters.rest_framework import DjangoFilterBackend
from django_filters.utils import get_model_field, translate_validation, deprecate
from rest_framework.request import Request
from rest_framework.filters import BaseFilterBackend
from django_filters.conf import settings
from dvadmin.system.models import Dept, ApiWhiteList, RoleMenuButtonPermission, MenuButton, Users
from util.currency import recursion_down_fast
class CoreModelFilterBankend(BaseFilterBackend):
    """
    Filter backend for creation/update time ranges.

    Reads four optional query parameters -- ``create_datetime_after``,
    ``create_datetime_before``, ``update_datetime_after`` and
    ``update_datetime_before`` -- and narrows the queryset on the
    ``create_datetime`` / ``update_datetime`` fields accordingly.
    """

    def filter_queryset(self, request, queryset, view):
        """
        Apply the time-range bounds found in the query string.

        :param request: request whose ``query_params`` carry the bounds
        :param queryset: queryset to narrow
        :param view: the calling view (unused)
        :return: the queryset unchanged when no bound is supplied, otherwise
            the queryset filtered by every supplied bound (AND-ed together)
        """
        params = request.query_params
        created_from = params.get('create_datetime_after', None)
        created_to = params.get('create_datetime_before', None)
        updated_from = params.get('update_datetime_after', None)
        updated_to = params.get('update_datetime_before', None)

        # Nothing to do unless at least one bound is present and non-empty.
        if not (created_from or created_to or updated_from or updated_to):
            return queryset

        bounds = Q()
        # Creation time range.
        if created_from:
            bounds &= Q(create_datetime__gte=created_from)
        if created_to:
            # The upper bound gets an end-of-day time appended.
            bounds &= Q(create_datetime__lte=f'{created_to} 23:59:59')
        # Update time range.
        if updated_from:
            bounds &= Q(update_datetime__gte=updated_from)
        if updated_to:
            # NOTE(review): unlike create_datetime_before, this upper bound is
            # used verbatim (no " 23:59:59" suffix) -- confirm this is intended.
            bounds &= Q(update_datetime__lte=updated_to)
        return queryset.filter(bounds)
def get_dept(dept_id: int, dept_all_list=None, dept_list=None):
    """
    Collect a department id together with the ids of all its descendants.

    :param dept_id: id of the department whose subtree is wanted
    :param dept_all_list: iterable of mappings with ``"id"`` and ``"parent"``
        keys; when omitted or empty, every ``Dept`` row is loaded through
        ``Dept.objects.all().values("id", "parent")``
    :param dept_list: accumulator list; when omitted it starts as
        ``[dept_id]``, otherwise the given list is extended in place with the
        descendants that are found
    :return: de-duplicated list of ids (order is not guaranteed)
    """
    if not dept_all_list:
        dept_all_list = Dept.objects.all().values("id", "parent")
    if dept_list is None:
        dept_list = [dept_id]

    # Index the rows by parent once, so the walk below is linear instead of
    # rescanning the whole department list for every visited node.
    children = {}
    for ele in dept_all_list:
        children.setdefault(ele.get("parent"), []).append(ele.get("id"))

    # Iterative pre-order walk. `seen` guards against cyclic or
    # self-referencing parent links, which would otherwise never terminate.
    seen = {dept_id}
    stack = list(reversed(children.get(dept_id, [])))
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        dept_list.append(current)
        # Reversed so that children are popped in their original order.
        stack.extend(reversed(children.get(current, [])))
    return list(set(dept_list))
class DataLevelPermissionsFilter(BaseFilterBackend):
    """
    Data-level (row scope) permission filter.

    For a request that is neither white-listed nor made by a superuser:

    0. Read the user's department id; without one the result is empty.
    1. If the queryset's model has no ``dept_belong_id`` attribute, return
       everything.
    2. If the user object has no ``role`` attribute, return only rows of the
       user's own department.
    3. Collect the data ranges granted by the user's active roles for the
       requested API and method; data range 3 (all data) returns everything.
    4. If data range 0 (own data only) is among them, return only rows created
       by the user inside the user's current department.
    5. Otherwise build a department list from the remaining data ranges and
       filter on it.
    """

    # A request's HTTP verb is translated to its index in this tuple; that
    # integer is what gets compared with the white-list and menu-button rows.
    _METHOD_CODES = ("GET", "POST", "PUT", "DELETE", "OPTIONS")

    def filter_queryset(self, request, queryset, view):
        """
        Apply data-scope filtering unless the API is white-listed or the user
        is a superuser.

        :param request: current request (path, method and user are read)
        :param queryset: queryset to restrict
        :param view: the calling view (unused)
        :return: the unrestricted queryset for white-listed APIs and
            superusers, otherwise the result of
            ``_extracted_from_filter_queryset_33``
        """
        api = request.path  # requested API path
        try:
            method = self._METHOD_CODES.index(request.method)
        except ValueError:
            # Verbs outside the tuple (e.g. PATCH, HEAD) used to raise here.
            # -1 matches no white-list entry and no menu button, so such
            # requests get the most restrictive result for non-superusers.
            method = -1

        # *** API white list ***
        white_rows = ApiWhiteList.objects.filter(enable_datasource=False).values(
            permission__api=F("url"), permission__method=F("method")
        )
        new_api = f"{api}:{method}"
        for row in white_rows:
            url = row.get("permission__api")
            if not url:
                continue
            # The stored URL is used as a regex; "{id}" stands for any pk.
            pattern = str(url.replace("{id}", ".*?")) + ":" + str(row.get("permission__method"))
            if re.match(pattern, new_api, re.M | re.I) is not None:
                return queryset

        # Only non-superusers go through the data-scope checks.
        if request.user.is_superuser == 0:
            return self._extracted_from_filter_queryset_33(request, queryset, api, method)
        return queryset

    # TODO Rename this here and in `filter_queryset`
    def _extracted_from_filter_queryset_33(self, request, queryset, api, method):
        """
        Restrict ``queryset`` to the rows the (non-superuser) user may see.

        :param request: current request
        :param queryset: queryset to restrict
        :param api: requested path
        :param method: integer method code computed in ``filter_queryset``
        :return: the restricted queryset
        """
        # 0. No department on the user -> nothing is visible.
        user_dept_id = getattr(request.user, "dept_id", None)
        if not user_dept_id:
            return queryset.none()

        # 1. Models without a "dept_belong_id" attribute are not scoped.
        if not getattr(queryset.model, "dept_belong_id", None):
            return queryset

        # 2. A user object without roles only sees its own department.
        if not hasattr(request.user, "role"):
            return queryset.filter(dept_belong_id=user_dept_id)

        # 3. Gather the data ranges of all roles:
        #    (0, own data only),
        #    (1, own department and below),
        #    (2, own department),
        #    (3, all data),
        #    (4, custom departments)
        re_api = api
        _pk = request.parser_context["kwargs"].get('pk')
        if _pk:  # detail request: turn the pk back into the "{id}" placeholder
            # Escape the pk and replace it only where it forms a whole path
            # segment, so e.g. pk "1" no longer rewrites the "1" in "/api/v1/".
            pk_segment = rf"(?<=/){re.escape(str(_pk))}(?=/|$)"
            re_api = re.sub(pk_segment, "{id}", api)

        role_id_list = request.user.role.values_list('id', flat=True)
        menu_button_ids = MenuButton.objects.filter(api=re_api, method=method).values_list('id', flat=True)
        role_permission_list = []
        if menu_button_ids:
            role_permission_list = RoleMenuButtonPermission.objects.filter(
                role__in=role_id_list,
                role__status=1,
                menu_button_id__in=menu_button_ids,
            ).values('data_range')

        scopes = set()  # distinct data ranges granted to the user
        for ele in role_permission_list:
            data_range = ele.get("data_range")
            # "All data" on any role short-circuits every other rule.
            if data_range == 3:
                return queryset
            scopes.add(data_range)

        # 4. Own data only: rows created by the user within the user's current
        #    department (users may change department).
        # NOTE(review): this wins even when another role grants a wider range,
        # although the class description speaks of the maximum permission --
        # confirm which is intended.
        if 0 in scopes:
            return queryset.filter(
                creator=request.user, dept_belong_id=user_dept_id
            )

        # 5. Department-based ranges.
        dept_ids = set()
        for scope in scopes:
            if scope == 1:
                dept_ids.add(user_dept_id)
                dept_ids.update(get_dept(user_dept_id))
            elif scope == 2:
                dept_ids.add(user_dept_id)
            elif scope == 4:
                # NOTE(review): not restricted to menu_button_ids, so custom
                # departments of any button permission of these roles are
                # used -- confirm this is intended.
                dept_ids.update(
                    RoleMenuButtonPermission.objects.filter(
                        role__in=role_id_list,
                        role__status=1,
                        data_range=4,
                    ).values_list('dept__id', flat=True)
                )

        dept_ids = list(dept_ids)
        # The department model is filtered on its own id.
        if queryset.model._meta.model_name == 'dept':
            return queryset.filter(id__in=dept_ids)
        return queryset.filter(dept_belong_id__in=dept_ids)
class DataLevelPermissionsSubFilter(DataLevelPermissionsFilter):
    """
    Sub-filter of the data-level permission filter that scopes rows by the
    user's managed departments (``manage_dept``).
    """

    def _extracted_from_filter_queryset_33(self, request: Request, queryset, api, method):
        """
        Restrict ``queryset`` to rows of the departments the user manages,
        plus rows the user created.

        :param request: current request; ``request.user`` is the acting user
        :param queryset: queryset to restrict
        :param api: requested path (unused here)
        :param method: integer method code (unused here)
        :return: the queryset unchanged for superusers, otherwise the
            restricted queryset
        """
        u: Users = request.user
        if u.is_superuser:
            return queryset

        # presumably recursion_down_fast returns the ids of a department and
        # its descendants -- verify against util.currency.
        if u.manage_dept.exists():
            managed = []
            for dept in u.manage_dept.all():
                managed.extend(recursion_down_fast(dept, 'parent', 'id'))
        else:
            # Users without manage_dept entries (legacy data) fall back to
            # their own department.
            managed = recursion_down_fast(u.dept, 'parent', 'id')

        # Own department intersected with the managed departments.
        # NOTE(review): applied after both branches above -- confirm.
        visible = set(recursion_down_fast(u.dept)) & set(managed)

        # Rows created by the user must stay visible: someone who belongs to
        # department A but manages B and C creates rows under A, which the
        # department condition alone would miss.
        own_rows = Q(creator=u)
        if queryset.model._meta.model_name == 'dept':
            return queryset.filter(Q(id__in=visible) | own_rows)
        return queryset.filter(Q(dept_belong_id__in=visible) | own_rows)
class DataLevelPermissionMargeFilter(DataLevelPermissionsFilter):
    """
    Chains the role-based data-scope filter with the managed-department
    sub-filter: the second runs on the result of the first.
    """

    def _extracted_from_filter_queryset_33(self, request, queryset, api, method):
        """
        Apply ``DataLevelPermissionsFilter``'s restriction, then
        ``DataLevelPermissionsSubFilter``'s restriction on its result.

        :return: the queryset after both restrictions
        """
        role_scoped = super()._extracted_from_filter_queryset_33(request, queryset, api, method)
        # Called unbound with this instance so the sub-filter's logic is
        # reused without instantiating it.
        apply_sub_filter = DataLevelPermissionsSubFilter._extracted_from_filter_queryset_33
        return apply_sub_filter(self, request, role_scoped, api, method)
class CustomDjangoFilterBackend(DjangoFilterBackend):
    """
    ``DjangoFilterBackend`` variant that can generate a FilterSet
    (``AutoFilterSet``) from the view's ``filter_fields`` / ``filterset_fields``
    and, for such generated filtersets, builds the ORM conditions itself from
    the query parameters.

    Entries of a list/tuple ``filter_fields`` may start with one of the
    characters in ``lookup_prefixes`` to select the lookup for that field.
    """

    # One-character field-name prefix -> ORM lookup.
    lookup_prefixes = {
        "^": "istartswith",
        "=": "iexact",
        "@": "search",
        "$": "iregex",
        "~": "icontains",
    }
    # Default used until a view's own `filter_fields` replaces it.
    filter_fields = "__all__"

    def construct_search(self, field_name, lookup_expr=None):
        """
        Build the ORM lookup string for one field.

        :param field_name: field (or filter) name, optionally starting with a
            character from ``lookup_prefixes``
        :param lookup_expr: lookup to use when the name carries no prefix
        :return: ``field__lookup``; the bare name when no lookup applies; the
            name unchanged when it already ends in ``__lookup``
        """
        # Slice instead of index so an empty name cannot raise IndexError.
        lookup = self.lookup_prefixes.get(field_name[:1])
        if lookup:
            field_name = field_name[1:]
        else:
            lookup = lookup_expr
        if not lookup:
            return field_name
        # The suffix test must include the separator: a plain field such as
        # "login" merely ends with the text "in" and still needs "__in".
        if field_name.endswith(LOOKUP_SEP + lookup):
            return field_name
        return LOOKUP_SEP.join([field_name, lookup])

    def find_filter_lookups(self, orm_lookups, search_term_key):
        """
        Find the ORM lookup that belongs to a query-parameter name.

        :param orm_lookups: lookup strings produced by ``construct_search``
        :param search_term_key: query-parameter name
        :return: the first lookup whose name without its last
            ``LOOKUP_SEP``-separated part equals ``search_term_key`` (a lookup
            without separator is compared as is), or ``None``
        """
        for lookup in orm_lookups:
            parts = lookup.split(LOOKUP_SEP)
            # Exact comparison (not substring search) to avoid false matches.
            stem = LOOKUP_SEP.join(parts[:-1]) if len(parts) > 1 else lookup
            if stem == search_term_key:
                return lookup
        return None

    def get_filterset_class(self, view, queryset=None):
        """
        Return the `FilterSet` class used to filter the queryset.

        A ``filterset_class`` (or legacy ``filter_class``) on the view is
        returned as is. Otherwise, when fields are configured and a queryset
        is given, an ``AutoFilterSet`` class is generated for the queryset's
        model. Returns ``None`` when neither applies.
        """
        filterset_class = getattr(view, "filterset_class", None)
        filterset_fields = getattr(view, "filterset_fields", None)

        # TODO: remove assertion in 2.1
        if filterset_class is None and hasattr(view, "filter_class"):
            deprecate(
                "`%s.filter_class` attribute should be renamed `filterset_class`." % view.__class__.__name__
            )
            filterset_class = getattr(view, "filter_class", None)

        # TODO: remove assertion in 2.1
        if filterset_fields is None and hasattr(view, "filter_fields"):
            deprecate(
                "`%s.filter_fields` attribute should be renamed `filterset_fields`." % view.__class__.__name__
            )
            # Kept on the backend instance: filter_queryset reads it again.
            self.filter_fields = getattr(view, "filter_fields", None)
            if isinstance(self.filter_fields, (list, tuple)):
                # Strip lookup prefixes so only real field names remain.
                filterset_fields = [
                    field[1:] if field[0] in self.lookup_prefixes.keys() else field for field in self.filter_fields
                ]
            else:
                filterset_fields = self.filter_fields

        if filterset_class:
            filterset_model = filterset_class._meta.model

            # FilterSets do not need to specify a Meta class
            if filterset_model and queryset is not None:
                assert issubclass(
                    queryset.model, filterset_model
                ), "FilterSet model %s does not match queryset model %s" % (
                    filterset_model,
                    queryset.model,
                )

            return filterset_class

        if filterset_fields and queryset is not None:
            MetaBase = getattr(self.filterset_base, "Meta", object)

            class AutoFilterSet(self.filterset_base):
                @classmethod
                def get_all_model_fields(cls, model):
                    """
                    Return the names of the model's fields and many-to-many
                    fields: the field named "id" always, any other field only
                    if it is not an ``AutoField`` and its ``remote_field`` is
                    not a parent link.
                    """
                    opts = model._meta

                    return [
                        f.name
                        for f in sorted(opts.fields + opts.many_to_many)
                        if (f.name == "id")
                        or not isinstance(f, models.AutoField)
                        and not (getattr(f.remote_field, "parent_link", False))
                    ]

                @classmethod
                def get_fields(cls):
                    """
                    Resolve the 'fields' argument that should be used for generating filters on the
                    filterset. This is 'Meta.fields' sans the fields in 'Meta.exclude'.
                    """
                    model = cls._meta.model
                    fields = cls._meta.fields
                    exclude = cls._meta.exclude

                    assert not (fields is None and exclude is None), (
                        "Setting 'Meta.model' without either 'Meta.fields' or 'Meta.exclude' "
                        "has been deprecated since 0.15.0 and is now disallowed. Add an explicit "
                        "'Meta.fields' or 'Meta.exclude' to the %s class." % cls.__name__
                    )

                    # Setting exclude with no fields implies all other fields.
                    if exclude is not None and fields is None:
                        fields = ALL_FIELDS

                    # Resolve ALL_FIELDS into all fields for the filterset's model.
                    if fields == ALL_FIELDS:
                        fields = cls.get_all_model_fields(model)

                    # Remove excluded fields
                    exclude = exclude or []
                    if not isinstance(fields, dict):
                        fields = [(f, [settings.DEFAULT_LOOKUP_EXPR]) for f in fields if f not in exclude]
                    else:
                        fields = [(f, lookups) for f, lookups in fields.items() if f not in exclude]

                    return OrderedDict(fields)

                @classmethod
                def get_filters(cls):
                    """
                    Get all filters for the filterset. This is the combination of declared and
                    generated filters.
                    """
                    # No model specified - skip filter generation
                    if not cls._meta.model:
                        return cls.declared_filters.copy()

                    # Imported once per call rather than once per field.
                    from timezone_field import TimeZoneField

                    # Field types for which no filter is generated.
                    unfiltered_types = (models.JSONField, TimeZoneField, models.FileField)

                    # Determine the filters that should be included on the filterset.
                    filters = OrderedDict()
                    fields = cls.get_fields()
                    undefined = []

                    for field_name, lookups in fields.items():
                        field = get_model_field(cls._meta.model, field_name)
                        if isinstance(field, unfiltered_types):
                            continue

                        # warn if the field doesn't exist.
                        if field is None:
                            undefined.append(field_name)

                        # With "__all__", character fields default to a fuzzy
                        # (icontains) match instead of an exact one.
                        if (
                            isinstance(field, models.CharField)
                            and filterset_fields == "__all__"
                            and lookups == ["exact"]
                        ):
                            lookups = ["icontains"]

                        for lookup_expr in lookups:
                            filter_name = cls.get_filter_name(field_name, lookup_expr)

                            # If the filter is explicitly declared on the class, skip generation
                            if filter_name in cls.declared_filters:
                                filters[filter_name] = cls.declared_filters[filter_name]
                                continue

                            if field is not None:
                                filters[filter_name] = cls.filter_for_field(field, field_name, lookup_expr)

                    # Allow Meta.fields to contain declared filters *only* when a list/tuple
                    if isinstance(cls._meta.fields, (list, tuple)):
                        undefined = [f for f in undefined if f not in cls.declared_filters]

                    if undefined:
                        raise TypeError(
                            "'Meta.fields' must not contain non-model field names: %s" % ", ".join(undefined)
                        )

                    # Add in declared filters. This is necessary since we don't enforce adding
                    # declared filters to the 'Meta.fields' option
                    filters.update(cls.declared_filters)
                    return filters

                class Meta(MetaBase):
                    model = queryset.model
                    fields = filterset_fields

            return AutoFilterSet

        return None

    def filter_queryset(self, request, queryset, view):
        """
        Filter ``queryset`` with the view's filterset.

        For a generated ``AutoFilterSet`` the conditions are built directly
        from the submitted data: a parameter with one value uses its lookup, a
        parameter with two values becomes a ``__range`` lookup, and other
        value counts, empty strings and unknown parameters are skipped. Any
        other filterset is validated and applied the regular way.

        :raises: the translated validation error when a non-generated
            filterset is invalid and ``raise_exception`` is set
        """
        filterset = self.get_filterset(request, queryset, view)
        if filterset is None:
            return queryset

        if filterset.__class__.__name__ != "AutoFilterSet":
            if not filterset.is_valid() and self.raise_exception:
                raise translate_validation(filterset.errors)
            return filterset.qs

        queryset = filterset.queryset
        filter_fields = filterset.filters if self.filter_fields == "__all__" else self.filter_fields
        # Field names are paired positionally with the generated filters'
        # lookup expressions.
        lookup_exprs = [flt.lookup_expr for flt in filterset.filters.values()]
        orm_lookups = [
            self.construct_search(name, lookup_expr)
            for name, lookup_expr in dict(zip(filter_fields, lookup_exprs)).items()
        ]

        data = filterset.data
        queries = []
        for search_term_key in data.keys():
            orm_lookup = self.find_filter_lookups(orm_lookups, search_term_key)
            if not orm_lookup or data.get(search_term_key) == '':
                continue
            values = data.getlist(search_term_key)
            if len(values) == 1:
                queries.append(Q(**{orm_lookup: data[search_term_key]}))
            elif len(values) == 2:
                # Two values for one parameter are treated as a range.
                queries.append(Q(**{orm_lookup + '__range': values}))

        if queries:
            queryset = queryset.filter(reduce(operator.and_, queries))
        return queryset