# coding=utf-8 """ @project: qabot @Author:虎 @file: user_serializers.py @date:2023/9/5 16:32 @desc: """ import datetime import os import random import re from django.core import validators, signing, cache from django.core.mail import send_mail from django.db.models import Q from drf_yasg import openapi from rest_framework import serializers from common.constants.exception_code_constants import ExceptionCodeConstants from common.constants.permission_constants import RoleConstants from common.exception.app_exception import AppApiException from common.mixins.api_mixin import ApiMixin from common.response.result import get_api_response from common.util.lock import lock from smartdoc.conf import PROJECT_DIR from smartdoc.settings import EMAIL_ADDRESS from users.models.user import User, password_encrypt user_cache = cache.caches['user_cache'] class LoginSerializer(ApiMixin, serializers.Serializer): username = serializers.CharField(required=True, validators=[ validators.MaxLengthValidator(limit_value=20, message=ExceptionCodeConstants.USERNAME_ERROR.value.message), validators.MinLengthValidator(limit_value=6, message=ExceptionCodeConstants.USERNAME_ERROR.value.message) ]) password = serializers.CharField(required=True) def is_valid(self, *, raise_exception=False): """ 校验参数 :param raise_exception: 是否抛出异常 只能是True :return: 用户信息 """ super().is_valid(raise_exception=True) username = self.data.get("username") password = password_encrypt(self.data.get("password")) user = self.Meta.model.objects.filter(Q(username=username, password=password) | Q(email=username, password=password)).first() if user is None: raise ExceptionCodeConstants.INCORRECT_USERNAME_AND_PASSWORD.value.to_app_api_exception() return user def get_user_token(self): """ 获取用户Token :return: 用户Token(认证信息) """ user = self.is_valid() token = signing.dumps({'username': user.username, 'id': user.id, 'email': user.email}) return token class Meta: model = User fields = '__all__' def get_request_body_api(self): return openapi.Schema( type=openapi.TYPE_OBJECT, required=['username', 'password'], properties={ 'username': openapi.Schema(type=openapi.TYPE_STRING, title="用户名", description="用户名"), 'password': openapi.Schema(type=openapi.TYPE_STRING, title="密码", description="密码") } ) def get_response_body_api(self): return get_api_response(openapi.Schema( type=openapi.TYPE_STRING, title="token", default="xxxx", description="认证token" ), "token value") class RegisterSerializer(ApiMixin, serializers.Serializer): """ 注册请求对象 """ email = serializers.EmailField( validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)]) username = serializers.CharField(required=True, validators=[ validators.MaxLengthValidator(limit_value=20, message=ExceptionCodeConstants.USERNAME_ERROR.value.message), validators.MinLengthValidator(limit_value=6, message=ExceptionCodeConstants.USERNAME_ERROR.value.message) ]) password = serializers.CharField(required=True) re_password = serializers.CharField(required=True) code = serializers.CharField(required=True) class Meta: model = User fields = '__all__' @lock(lock_key=lambda this, raise_exception: ( this.initial_data.get("email") + ":register" )) def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=True) if self.data.get('password') != self.data.get('re_password'): raise ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.to_app_api_exception() username = self.data.get("username") email = self.data.get("email") code = self.data.get("code") code_cache_key = email + ":register" cache_code = user_cache.get(code_cache_key) if code != cache_code: raise ExceptionCodeConstants.CODE_ERROR.value.to_app_api_exception() u = User.objects.filter(Q(username=username) | Q(email=email)).first() if u is not None: if u.email == email: raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception() if u.username == username: raise ExceptionCodeConstants.USERNAME_IS_EXIST.value.to_app_api_exception() return True def save(self, **kwargs): m = User( **{'email': self.data.get("email"), 'username': self.data.get("username"), 'role': RoleConstants.USER.name}) m.set_password(self.data.get("password")) # 插入用户 m.save() email = self.data.get("email") code_cache_key = email + ":register" # 删除验证码缓存 user_cache.delete(code_cache_key) def get_request_body_api(self): return openapi.Schema( type=openapi.TYPE_OBJECT, required=['username', 'email', 'password', 're_password', 'code'], properties={ 'username': openapi.Schema(type=openapi.TYPE_STRING, title="用户名", description="用户名"), 'email': openapi.Schema(type=openapi.TYPE_STRING, title="邮箱", description="邮箱地址"), 'password': openapi.Schema(type=openapi.TYPE_STRING, title="密码", description="密码"), 're_password': openapi.Schema(type=openapi.TYPE_STRING, title="确认密码", description="确认密码"), 'code': openapi.Schema(type=openapi.TYPE_STRING, title="验证码", description="验证码") } ) class CheckCodeSerializer(ApiMixin, serializers.Serializer): """ 校验验证码 """ email = serializers.EmailField( validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)]) code = serializers.CharField(required=True) type = serializers.CharField(required=True, validators=[ validators.RegexValidator(regex=re.compile("^register|reset_password$"), message="只支持register|reset_password", code=500) ]) def is_valid(self, *, raise_exception=False): super().is_valid() value = user_cache.get(self.data.get("email") + ":" + self.data.get("type")) if value is None or value != self.data.get("code"): raise ExceptionCodeConstants.CODE_ERROR.value.to_app_api_exception() return True class Meta: model = User fields = '__all__' def get_request_body_api(self): return openapi.Schema( type=openapi.TYPE_OBJECT, required=['email', 'code', 'type'], properties={ 'email': openapi.Schema(type=openapi.TYPE_STRING, title="邮箱", description="邮箱地址"), 'code': openapi.Schema(type=openapi.TYPE_STRING, title="验证码", description="验证码"), 'type': openapi.Schema(type=openapi.TYPE_STRING, title="类型", description="register|reset_password") } ) def get_response_body_api(self): return get_api_response(openapi.Schema( type=openapi.TYPE_BOOLEAN, title="是否成功", default=True, description="错误提示"), True) class RePasswordSerializer(ApiMixin, serializers.Serializer): email = serializers.EmailField( validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)]) code = serializers.CharField(required=True) password = serializers.CharField(required=True) re_password = serializers.CharField(required=True) class Meta: model = User fields = '__all__' def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=True) email = self.data.get("email") cache_code = user_cache.get(email + ':reset_password') if self.data.get('password') != self.data.get('re_password'): raise AppApiException(ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.code, ExceptionCodeConstants.PASSWORD_NOT_EQ_RE_PASSWORD.value.message) if cache_code != self.data.get('code'): raise AppApiException(ExceptionCodeConstants.CODE_ERROR.value.code, ExceptionCodeConstants.CODE_ERROR.value.message) return True def reset_password(self): """ 修改密码 :return: 是否成功 """ if self.is_valid(): email = self.data.get("email") self.Meta.model.objects.filter(email=email).update( password=password_encrypt(self.data.get('password'))) code_cache_key = email + ":reset_password" # 删除验证码缓存 user_cache.delete(code_cache_key) return True def get_request_body_api(self): return openapi.Schema( type=openapi.TYPE_OBJECT, required=['email', 'code', "password", 're_password'], properties={ 'email': openapi.Schema(type=openapi.TYPE_STRING, title="邮箱", description="邮箱地址"), 'code': openapi.Schema(type=openapi.TYPE_STRING, title="验证码", description="验证码"), 'password': openapi.Schema(type=openapi.TYPE_STRING, title="密码", description="密码"), 're_password': openapi.Schema(type=openapi.TYPE_STRING, title="确认密码", description="确认密码") } ) class SendEmailSerializer(ApiMixin, serializers.Serializer): email = serializers.EmailField( validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message, code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)]) type = serializers.CharField(required=True, validators=[ validators.RegexValidator(regex=re.compile("^register|reset_password$"), message="只支持register|reset_password", code=500) ]) class Meta: model = User fields = '__all__' def is_valid(self, *, raise_exception=False): super().is_valid(raise_exception=raise_exception) user_exists = self.Meta.model.objects.filter(email=self.data.get('email')).exists() if not user_exists and self.data.get('type') == 'reset_password': raise ExceptionCodeConstants.EMAIL_IS_NOT_EXIST.value.to_app_api_exception() elif user_exists and self.data.get('type') == 'register': raise ExceptionCodeConstants.EMAIL_IS_EXIST.value.to_app_api_exception() code_cache_key = self.data.get('email') + ":" + self.data.get("type") code_cache_key_lock = code_cache_key + "_lock" ttl = user_cache.ttl(code_cache_key_lock) if ttl is not None: raise AppApiException(500, f"{ttl.total_seconds()}秒内请勿重复发送邮件") return True def send(self): """ 发送邮件 :return: 是否发送成功 :exception 发送失败异常 """ email = self.data.get("email") state = self.data.get("type") # 生成随机验证码 code = "".join(list(map(lambda i: random.choice(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0' ]), range(6)))) # 获取邮件模板 file = open(os.path.join(PROJECT_DIR, "apps", "common", 'template', 'email_template.html'), "r", encoding='utf-8') content = file.read() file.close() code_cache_key = email + ":" + state code_cache_key_lock = code_cache_key + "_lock" # 设置缓存 user_cache.set(code_cache_key_lock, code, timeout=datetime.timedelta(minutes=1)) try: # 发送邮件 send_mail(f'【智能客服{"用户注册" if state == "register" else "修改密码"}】', '', html_message=f'{content.replace("${code}", code)}', from_email=EMAIL_ADDRESS, recipient_list=[email], fail_silently=False) except Exception as e: user_cache.delete(code_cache_key_lock) raise AppApiException(500, f"{str(e)}邮件发送失败") user_cache.set(code_cache_key, code, timeout=datetime.timedelta(minutes=30)) return True def get_request_body_api(self): return openapi.Schema( type=openapi.TYPE_OBJECT, required=['email', 'type'], properties={ 'email': openapi.Schema(type=openapi.TYPE_STRING, title="邮箱", description="邮箱地址"), 'type': openapi.Schema(type=openapi.TYPE_STRING, title="类型", description="register|reset_password") } ) def get_response_body_api(self): return get_api_response(openapi.Schema(type=openapi.TYPE_STRING, default=True), True) class UserSerializer(serializers.ModelSerializer): class Meta: model = User fields = ["email", "id", "username", ]