#django #django-rest-framework
#django #django-rest-framework
Вопрос:
Я хочу получить объект пользователя, чтобы проверить, разрешен ли пользователю доступ к файлу или нет в веб-приложении на основе Django и rest_framework_jwt для остальных веб-сервисов.
У меня есть следующие настройки для моего веб-приложения:
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication',
),
'DEFAULT_PAGINATION_CLASS':'swiftapp.custom_pagination.CustomPagination',
}
MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
Мой определенный маршрут:
router = routers.DefaultRouter(trailing_slash=False)
#router.register(r'operation/<str:branch>/<str:filename>/', SwiftFileViewSet, basename='get_queryset') # This didn't work and I don't need a Viewset anyways
router.register(r'operations/(?P<start_date>d{4}-d{2}-d{2})/(?P<end_date>d{4}-d{2}-d{2})$', OperationViewSet, basename='get_queryset')
urlpatterns = [
path(r'api/', include(router.urls)),
path('api/operation/<str:branch>/<str:filename>/', get_operation, name='get_operation'),
]
В приведенном ниже ModelViewSet (метод GET) я могу получить аутентифицированный объект пользователя:
class OperationViewSet(viewsets.ModelViewSet):
permission_classes = (permissions.IsAuthenticated,)
authentication_classes = (authentication.JSONWebTokenAuthentication,)
queryset = Operation.objects.all()
serializer_class = serializers.OperationSerializer
permission_classes = (ReadOnly,)
def perform_create(self, serializer):
serializer.save(user=self.request.user)
def get_queryset(self):
user = self.request.user # I can get the user object here
logger.warning("logged user", user)
req = self.request.data
queryset = Operation.objects.filter(branch=user.get_branch(), date__gte=self.kwargs.get('start_date'),date__lte=self.kwargs.get('end_date'))
return list(queryset)
но не в этом простом контроллере GET API:
def get_operation(request, section, filename):
authentication_classes = (authentication.JSONWebTokenAuthentication,)
permission_classes = (permissions.IsAuthenticated,)
user = self.request.user # I cannot get the user object here
logger.warning("logged user", user)
with open("C:\operations\" section "\" filename ".op", "r") as f:
file_content = f.read()
f.close()
enc = base64.b64encode(bytes(file_content, 'utf-8'))
return HttpResponse(enc)
Пользовательский класс пользователя:
class CustomUserManager(BaseUserManager):
def create_user(self, first_name, last_name, username, branch, password=None):
"""
Creates and saves a User with the given email, date of
birth and password.
"""
if not branch:
raise ValueError('Users must have a branch')
user = self.model(
first_name=first_name,
last_name=last_name,
username=username,
branch=branch,
)
user.is_superuser = False
user.is_active = True
user.set_password(password)
user.save(using=self._db)
return user
def create_superuser(self, first_name, last_name, username, branch, password=None):
"""
Creates and saves a superuser with the given email, date of
birth and password.
"""
user = self.create_user(
first_name,
last_name,
username,
password=password,
branch=branch,
)
user.is_superuser = True
user.is_active = True
user.save(using=self._db)
return user
class CustomUser(AbstractBaseUser):
username = models.CharField(max_length=30, blank=True, null=True, unique=True)
first_name = models.CharField(max_length=30, blank=False, null=False, unique=False)
last_name = models.CharField(max_length=30, blank=False, null=False, unique=False)
branch = models.CharField(max_length=3, blank=True, null=True)
# is_staff = models.BooleanField(default=True, null=True)
is_active = models.BooleanField(default=True, null=False)
is_superuser = models.BooleanField(default=False, blank=False, null=False)
objects = CustomUserManager()
USERNAME_FIELD = 'username'
REQUIRED_FIELDS = ['first_name', 'last_name', 'branch']
class Meta:
db_table='auth_user'
def __str__(self):
return self.username
def has_perm(self, perm, obj=None):
"Does the user have a specific permission?"
# Simplest possible answer: Yes, always
return True
def has_module_perms(self, app_label):
"Does the user have permissions to view the app `app_label`?"
# Simplest possible answer: Yes, always
return True
@property
def is_staff(self):
"Is the user a member of staff?"
# Simplest possible answer: All admins are staff
return self.is_admin
def get_branch(self):
return self.branch
Ответ №1:
Ваше представление get_operation(...)
не совместимо с DRF. Вы должны сделать его совместимым с DRF с помощью @api_view(...)
декоратора. Затем вы можете добавить параметры класса в декоратор
from rest_framework.decorators import api_view, authentication_classes, permission_classes
from rest_framework.response import Response
@api_view(http_method_names=["GET"], )
@authentication_classes((authentication.JSONWebTokenAuthentication,))
@permission_classes((permissions.IsAuthenticated,))
def get_operation(request, section, filename):
user = request.user
return Response({"user pk": user.pk})
Комментарии:
1. Большое вам спасибо! Это спасло меня! 😀
Ответ №2:
В Django есть библиотека с именем PYJWT
сначала установите библиотеку с
pip3 install PyJWT
вы можете определить функцию, которая проверяет JWT с вашим секретом, и вызвать эту функцию перед любым ответом в ваших функциях views.
например :
import jwt
def jwt_authentication(token):
check = False
try:
request_jwt = jwt.decode(token, 'yoursecret', algorithms=['HS512'])
check = True
except:
check = False
return check
и в views.py :
@api_view(['POST'])
def obj_add(request):
if not 'Authorization' in request.headers:
return Response({'Authorization': 'token is required'}, status=status.HTTP_403_FORBIDDEN)
token = request.headers['Authorization']
authentication = jwt_authentication(token)
if not authentication:
return Response({'Authorization': 'token is not valid'}, status=status.HTTP_403_FORBIDDEN)