# ezops/utils/permissions.py:
class UserLock(APIException):
    """API error reported when the requesting user's account is locked.

    Produces an HTTP 400 response carrying a fixed message that tells the
    user to contact an administrator.
    """

    # Message text, in English: "The user has been locked, please contact
    # the administrator!!!"  (original author's note: "User account is disabled.")
    default_detail = '用户已被锁定,请联系管理员!!!'
    default_code = 'not_authenticated'
    status_code = status.HTTP_400_BAD_REQUEST
class RbacPermission(BasePermission):
    """Custom RBAC (role-based access control) permission check.

    Evaluation order for every request:

    1. paths matching ``settings.WHITE_LIST`` are always allowed;
    2. unauthenticated users are denied;
    3. superusers and members of the ``admin`` role are always allowed;
    4. inactive users are rejected with ``UserLock``;
    5. everyone else is checked against the permission rules, read either
       from Redis or from the database (see ``use_redis_rules``).
    """

    # Selects the Redis-backed rule lookup instead of the database one.
    # The Redis path was hard-disabled in the original code (its former
    # toggle, ``settings.REDIS_STATUS``, was commented out), so the default
    # stays False; a subclass may set it to True to re-enable that path.
    use_redis_rules = False

    @staticmethod
    def api_uri(uri):
        """Return ``uri`` wrapped as ``/<settings.BASE_API>/<uri>/``.

        Runs of consecutive slashes are collapsed into a single slash, so
        callers may pass ``uri`` with or without leading/trailing slashes.
        """
        joined = '/' + settings.BASE_API + '/' + uri + '/'
        return re.sub('/+', '/', joined)

    def has_permission(self, request, view):
        """Decide whether ``request`` may proceed.

        Args:
            request: the incoming request; ``request.path``,
                ``request.method`` and ``request.user`` are consulted.
            view: the target view (unused; part of the permission interface).

        Returns:
            True when the request is allowed, False when it is denied.

        Raises:
            UserLock: when the user is authenticated but not active.
        """
        # -------------------- whitelist -------------------- #
        # Each whitelist entry is used as a regex anchored at the start of
        # the request path (re.match).
        for safe_url in settings.WHITE_LIST:
            if re.match(safe_url, request.path):
                logger.info("自定义rbac认证方式: 白名单接口")
                return True

        user = request.user
        # An anonymous user has no ``roles`` relation; deny here instead of
        # crashing with AttributeError on the role lookup below.
        if not user or not user.is_authenticated:
            return False

        # -------------------- admin bypass -------------------- #
        # The cheap flag is tested first so superusers skip the roles query.
        # NOTE(review): this bypass runs before the is_active check, so an
        # inactive admin/superuser is still allowed -- confirm intended.
        if user.is_superuser or 'admin' in user.roles.values_list('name', flat=True):
            logger.info("自定义rbac认证方式: admin 放行")
            return True

        if not user.is_active:
            raise UserLock()

        if self.use_redis_rules:
            return self._check_redis_rules(request)
        return self._check_database_rules(request)

    def _check_redis_rules(self, request):
        """Check the request against the rule table stored in Redis.

        The ``user_permissions_manage`` hash maps a URL pattern to a JSON
        list of rule dicts whose ``method`` and ``sign`` entries are read.
        Returns True when no URL pattern matches the path, when no rule
        exists for the request method, or when the user holds the ``sign``
        of a rule for that method; returns False otherwise.
        """
        logger.info(f"自定义rbac认证方式: redis缓存规则, userId:{request.user.id}")
        orm_userinfo = Users.objects.get(pk=request.user.id).get_user_info()
        logger.info(f"orm_userinfo:{orm_userinfo}")
        conn = get_redis_connection('user_info')

        # Find the first stored URL pattern that matches the request path.
        for url_key in conn.hkeys('user_permissions_manage'):
            redis_key = url_key.decode()
            if re.match(self.api_uri(redis_key), request.path):
                break
        else:
            # The path is not under permission control at all.
            return True

        permissions = json.loads(conn.hget('user_permissions_manage', redis_key).decode())
        # One URL may carry several rules (one per method/sign combination).
        method_hit = False
        for permission in permissions:
            if permission.get('method') == request.method:
                method_hit = True
                if permission.get('sign') in orm_userinfo['permissions']:
                    logger.info('sign: {}'.format(permission.get('sign')))
                    return True
        # A rule exists for this method but the user holds none of its
        # signs -> deny; no rule for this method -> allow.
        return not method_hit

    def _check_database_rules(self, request):
        """Check the request against the rules stored in ``Permission`` rows.

        Only rows that are not menus and have a non-empty ``path`` are
        treated as rules; they are keyed by ``sign``.

        NOTE(review): as written, this path never returns False -- when no
        rule grants access the request is still allowed. That behavior is
        preserved here; confirm it is intended.
        """
        logger.info("自定义rbac认证方式: 数据库规则")
        user_permissions_manage = {
            rule.sign: {'path': rule.path, 'method': rule.method, 'id': rule.id}
            for rule in Permission.objects.all()
            if not rule.menu and rule.path
        }
        orm_userinfo = Users.objects.get(pk=request.user.id).get_user_info()
        logger.info(f"用户userinfo:{orm_userinfo}")
        logger.info(f"user_permissions_manage:{user_permissions_manage}")

        # Pre-bind the loop variable: the final log line formats it, and it
        # would otherwise be undefined (NameError) when there are no rules.
        permission = None
        for permission, rule in user_permissions_manage.items():
            user_permission_path = self.api_uri(rule['path'])
            if not re.match(user_permission_path, request.path):
                continue
            # Path matches: grant when the method matches and the user
            # holds this rule's sign.
            if request.method == rule['method'] and permission in orm_userinfo['permissions']:
                logger.info('[+] {} {} {}'.format(user_permission_path, rule['method'], permission))
                return True

        # No rule granted access; the last sign iterated (or None) is logged.
        logger.info('[+] GET url: {} - {} - {}'.format(request.path, request.method, permission))
        return True