目录
前后端混合开发,可以通过HttpResponse对象来设置cookie进而校验登录,现在前后端分离开发,用不到cookie,那么该怎么认证?DRF提供了认证的方法
我们知道在APIView执行的过程中,在dispatch方法中走了三大认证self.initial(request, *args, **kwargs)
def initial(self, request, *args, **kwargs):
···
self.perform_authentication(request) # 认证
self.check_permissions(request) # 权限
self.check_throttles(request) # 频率
需求
我们通过登录接口,来模拟认证登录,登录成功返回json字符串,并且携带随机字符串(uuid模拟生成token),通过token随机字符串来判断用户是否登录,登录了就更新token,首次登录就存token;
分析
模型
from django.db import models
class User(models.Model):
username = models.CharField(max_length=32)
password = models.CharField(max_length=16)
user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通管理员'), (3, '普通用户')))
def get_code(self):
self.get_user_type_display()
print(self.get_user_type_display())
class UserToken(models.Model):
user = models.OneToOneField(to=User,on_delete=models.CASCADE)
token = models.CharField(max_length=32)
视图
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from rest_framework.decorators import action
from app01 import models
class UserView(ViewSet):
@action(methods=['POST'], detail=False)
def login(self, request, *args, **kwargs):
# 获取数据
username = request.data.get('username')
password = request.data.get('password')
user = models.User.objects.filter(username=username, password=password).first()
if user:
# 如果user有值说明登录成功,生产随机字符串,存入数据库,如果重复登录那么就更新随机字符串
import uuid
uuid_str = uuid.uuid4()
# print(type(uuid_str)) # <class 'uuid.UUID'>
token = str(uuid_str)
# 如果存在就更新,如果不存在就新增,指定搜索对象,然后defaults指定更新内容
models.UserToken.objects.update_or_create(user=user,defaults={'token': token} )
# 返回随机字符串
return Response({'code': 100, 'msg': '登录成功', 'token': token})
return Response({'code': 101, 'msg': '登录失败,用户名或密码错误'})
路由
from django.contrib import admin
from django.urls import path,include
from app01 import views
from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
urlpatterns = [
path('admin/', admin.site.urls),
path('',include(router.urls))
]
我们知道平时生活中,有一些接口是认证后才能调用的,比如我们登录后才能查看个人站点内容等··· 在执行视图函数之前执行了认证方法:self.perform_authentication(request)
这里写一个认证demo,只有登录过的才能查看Book表
'''auth.py'''
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from app01 import models
# 写一个类继承BaseAuthentication
class LoginAuth(BaseAuthentication):
# 重写authenticate方法
def authenticate(self, request):
# 获取前端携带的token,token放在哪是自己规定的,比如从查询参数中获取
token = request.query_params.get('token')
# 比对随机字符串
user_token = models.UserToken.objects.filter(token=token).first()
if user_token:
# 登录了,返回当前登录用户和token
return user_token.user,token
else:
# 没有登录,抛异常
raise AuthenticationFailed('您没有登录,请登录')
'''serializer.py'''
from rest_framework import serializers
from app01 import models
class BookSerializer(serializers.ModelSerializer):
class Meta:
model = models.Book
fields = '__all__'
'''models.py'''
class Book(models.Model):
name = models.CharField(max_length=32)
price = models.DecimalField(decimal_places=2,max_digits=5)
author = models.CharField(max_length=32)
'''urls.py'''
from django.contrib import admin
from django.urls import path,include
from app01 import views
from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
router.register('books',views.BookView,'books')
urlpatterns = [
path('admin/', admin.site.urls),
path('',include(router.urls))
]
总结
和认证一样,都是写一个类去继承,写权限继承BasePermission,重写has_permission方法,判断如果有权限,返回True,如果没有权限,返回False 然后局部使用或者全局使用,或局部禁用
permission_classes = [UserPermission, ]
REST_FRAMEWORK={ "DEFAULT_PERMISSION_CLASSES":["app01.auth.UserPermission",] }
permission_classes = []
需求
权限类
class UserPermission(BasePermission):
def has_permission(self, request, view):
# 没有权限的提示信息
self.message = '您是:%s,没有权限' % request.user.get_user_type_display()
# 如果有权限,返回True,没有权限返回False
# 权限类,在认证类之后,request.user有了当前登录用户
user_type = request.user.user_type
print(user_type)
if user_type < 3: # 只要不是1,2,就没有权限
return True
else:
return False
视图
from .auth import LoginAuth, UserPermission
from rest_framework.mixins import RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin,ListModelMixin,CreateModelMixin
from rest_framework.viewsets import GenericViewSet
class BookView(RetrieveModelMixin,ListModelMixin,GenericViewSet):
# 局部使用,普通用户登录后只能获取一条或所有
authentication_classes = [LoginAuth, ]
queryset = models.Book.objects.all()
serializer_class = serializer.BookSerializer
class BookDetailView(CreateModelMixin, DestroyModelMixin, UpdateModelMixin, GenericViewSet):
# 局部使用,普通用户没有权限
authentication_classes = [LoginAuth, ]
permission_classes = [UserPermission, ]
queryset = models.Book.objects.all()
serializer_class = serializer.BookSerializer
路由
from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
router.register('books',views.BookView,'books')
router.register('bookdetail',views.BookDetailView,'bookdetail')
urlpatterns = [
path('admin/', admin.site.urls),
path('',include(router.urls))
]
总结
步骤
注意
频率类
# 频率类
class IPThrottle(SimpleRateThrottle):
scope = 'ip'
# get_cache_key返回什么就以什么方法做限制,限制条件必须唯一,比如用户id
def get_cache_key(self, request, view):
# 限制ip地址,从request.META字典中获取ip
'''
request.META:请求头中的数据
'''
return request.META.get('REMOTE_ADDR') # 客户端ip
配置文件
REST_FRAMEWORK={
'DEFAULT_THROTTLE_RATES': {
'ip': '3/m' # minute_3是scope的字符串,一分钟访问3次
}
局部使用
class BookView(RetrieveModelMixin, ListModelMixin, GenericViewSet):
authentication_classes = [LoginAuth, ] # 登录认证
permission_classes = [UserPermission, ] # 权限限制
throttle_classes = [IPThrottle, ] # 频率限制
queryset = models.Book.objects.all()
serializer_class = serializer.BookSerializer
全局使用
REST_FRAMEWORK={
'DEFAULT_THROTTLE_CLASSES': ( # 全局配置频率类
'app01.auth.IPThrottle'
),
}
总结
from django.db import models
# Create your models here.
class User(models.Model):
username = models.CharField(max_length=32)
password = models.CharField(max_length=16)
user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通管理员'), (3, '普通用户')))
class UserToken(models.Model):
user = models.OneToOneField(to=User,on_delete=models.CASCADE)
token = models.CharField(max_length=32)
class Book(models.Model):
name = models.CharField(max_length=32)
price = models.DecimalField(decimal_places=2,max_digits=5)
author = models.CharField(max_length=32)
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from app01 import models
from app01 import serializer
class UserView(ViewSet):
@action(methods=['POST'], detail=False)
def login(self, request, *args, **kwargs):
# 获取数据
username = request.data.get('username')
password = request.data.get('password')
user = models.User.objects.filter(username=username, password=password).first()
if user:
# 如果user有值说明登录成功,生产随机字符串,存入数据库,如果重复登录那么就更新随机字符串
import uuid
uuid_str = uuid.uuid4()
# print(type(uuid_str)) # <class 'uuid.UUID'>
token = str(uuid_str)
# 如果存在就更新,如果不存在就新增,指定搜索对象,然后defaults指定更新内容
models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
# 返回随机字符串
return Response({'code': 100, 'msg': '登录成功', 'token': token})
return Response({'code': 101, 'msg': '登录失败,用户名或密码错误'})
from .auth import LoginAuth, UserPermission, IPThrottle
from rest_framework.mixins import RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin, ListModelMixin, \
CreateModelMixin
from rest_framework.viewsets import GenericViewSet
class BookView(RetrieveModelMixin, ListModelMixin, GenericViewSet):
# 局部使用,普通用户登录后只能获取一条或所有
authentication_classes = [LoginAuth, ]
throttle_classes = [IPThrottle, ]
queryset = models.Book.objects.all()
serializer_class = serializer.BookSerializer
class BookDetailView(CreateModelMixin, DestroyModelMixin, UpdateModelMixin, GenericViewSet):
# 局部使用,普通用户没有权限
authentication_classes = [LoginAuth, ]
permission_classes = [UserPermission, ]
queryset = models.Book.objects.all()
serializer_class = serializer.BookSerializer
from rest_framework import serializers
from app01 import models
class BookSerializer(serializers.ModelSerializer):
class Meta:
model = models.Book
fields = '__all__'
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.permissions import BasePermission
from rest_framework.throttling import SimpleRateThrottle
from app01 import models
# 认证类
class LoginAuth(BaseAuthentication):
# 重写authenticate方法
def authenticate(self, request):
# 获取前端携带的token,token放在哪是自己规定的,比如从查询参数中获取
token = request.query_params.get('token')
# 比对随机字符串
user_token = models.UserToken.objects.filter(token=token).first()
if user_token:
# 登录了,返回当前登录用户和token
return user_token.user, token
else:
# 没有登录,抛异常
raise AuthenticationFailed('您没有登录,请登录')
# 权限类
class UserPermission(BasePermission):
def has_permission(self, request, view):
# 没有权限的提示信息
self.message = '您是:%s,没有权限' % request.user.get_user_type_display()
# 如果有权限,返回True,没有权限返回False
# 权限类,在认证类之后,request.user有了当前登录用户
user_type = request.user.user_type
print(user_type)
if user_type < 3: # 只要不是1,2,就没有权限
return True
else:
return False
# 频率类
class IPThrottle(SimpleRateThrottle):
scope = 'ip'
# get_cache_key返回什么就以什么方法做限制,限制条件必须唯一,比如用户id
def get_cache_key(self, request, view):
# 限制ip地址,从request.META字典中获取ip
'''
request.META:请求头中的数据
'''
return request.META.get('REMOTE_ADDR') # 客户端ip
REST_FRAMEWORK={
'DEFAULT_THROTTLE_RATES': {
'ip': '3/m' # minute_3是scope的字符串,一分钟访问3次
},
from django.contrib import admin
from django.urls import path,include
from app01 import views
from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
router.register('books',views.BookView,'books')
router.register('bookdetail',views.BookDetailView,'bookdetail')
urlpatterns = [
path('admin/', admin.site.urls),
path('',include(router.urls))
]