Created
September 23, 2025 15:15
-
-
Save rg3915/72f7b3efaca2939be2bdb9ffca28c9a1 to your computer and use it in GitHub Desktop.
Django Ninja Permissions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# api.py - Exemplo genérico | |
from http import HTTPStatus | |
from django.shortcuts import get_object_or_404 | |
from ninja import Router | |
from .security import CombinedAuth, PermissionChecker | |
from .schemas import ItemCreateSchema, ItemSchema, ItemUpdateSchema | |
from .models import Item | |
router = Router(tags=['Items']) | |
# Configuração de autenticação para diferentes operações CRUD | |
# Substitua 'app.add_model', 'app.change_model', etc. pelas permissões específicas do seu modelo | |
auth_create = CombinedAuth(permissions=PermissionChecker('app.add_model')) | |
auth_update = CombinedAuth(permissions=PermissionChecker('app.change_model')) | |
auth_delete = CombinedAuth(permissions=PermissionChecker('app.delete_model')) | |
@router.post( | |
'items', | |
summary="Cria um novo item", | |
description="Cria um novo item no sistema. Requer autenticação e permissão de criação.", | |
auth=auth_create, | |
response={HTTPStatus.CREATED: ItemSchema} | |
) | |
def item_create(request, payload: ItemCreateSchema): | |
""" | |
Endpoint para criar um novo item. | |
Args: | |
request: Objeto de requisição HTTP | |
payload: Dados do item a ser criado | |
Returns: | |
Item: Item criado | |
""" | |
return Item.objects.create(**payload.dict()) | |
def update_instance(instance, payload): | |
""" | |
Atualiza uma instância de modelo Django com os dados fornecidos no payload. | |
Args: | |
instance: Instância do modelo Django a ser atualizada | |
payload: Schema/objeto contendo os novos valores (deve ter método .dict()) | |
Returns: | |
None: A função modifica a instância in-place e salva no banco de dados | |
""" | |
# Converte o payload (schema) em um dicionário Python | |
data = payload.dict() | |
for attr, value in data.items(): | |
# Define dinamicamente o atributo na instância usando setattr() | |
# setattr(objeto, 'nome_atributo', valor) é equivalente a objeto.nome_atributo = valor | |
setattr(instance, attr, value) | |
instance.save() | |
@router.patch( | |
'items/{pk}', | |
summary="Atualiza um item existente", | |
description="Atualiza parcialmente um item existente. Requer autenticação e permissão de alteração.", | |
auth=auth_update, | |
response=ItemSchema | |
) | |
def item_update(request, pk: int, payload: ItemUpdateSchema): | |
""" | |
Endpoint para atualizar um item existente. | |
Args: | |
request: Objeto de requisição HTTP | |
pk: ID do item a ser atualizado | |
payload: Dados para atualização | |
Returns: | |
Item: Item atualizado | |
""" | |
instance = get_object_or_404(Item, pk=pk) | |
update_instance(instance, payload) | |
return instance | |
@router.delete( | |
'items/{pk}', | |
summary="Remove um item", | |
description="Remove um item do sistema. Requer autenticação e permissão de exclusão.", | |
auth=auth_delete | |
) | |
def item_delete(request, pk: int): | |
""" | |
Endpoint para deletar um item. | |
Args: | |
request: Objeto de requisição HTTP | |
pk: ID do item a ser removido | |
Returns: | |
dict: Confirmação de sucesso | |
""" | |
instance = get_object_or_404(Item, pk=pk) | |
instance.delete() | |
return {'success': True} | |
# Exemplo de uso com diferentes modelos: | |
# Para Produtos: | |
# auth_product_create = CombinedAuth(permissions=PermissionChecker('produto.add_produto')) | |
# auth_product_update = CombinedAuth(permissions=PermissionChecker('produto.change_produto')) | |
# auth_product_delete = CombinedAuth(permissions=PermissionChecker('produto.delete_produto')) | |
# Para Usuários: | |
# auth_user_create = CombinedAuth(permissions=PermissionChecker('auth.add_user')) | |
# auth_user_update = CombinedAuth(permissions=PermissionChecker('auth.change_user')) | |
# auth_user_delete = CombinedAuth(permissions=PermissionChecker('auth.delete_user')) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# security.py | |
from typing import Optional | |
import jwt | |
from django.conf import settings | |
from django.contrib.auth.models import User | |
from django.http import HttpRequest | |
from ninja.errors import HttpError | |
from ninja.security import HttpBearer | |
class PermissionChecker: | |
""" | |
Classe responsável por verificar se um usuário possui as permissões necessárias. | |
""" | |
def __init__(self, permission=None): | |
""" | |
Args: | |
permission (str, optional): Nome da permissão a ser verificada | |
""" | |
self.permission = permission | |
def check(self, request, user): | |
""" | |
Verifica se o usuário está autenticado e possui a permissão necessária. | |
Args: | |
request: Objeto de requisição HTTP | |
user: Usuário a ser verificado | |
Raises: | |
HttpError: 401 se não autenticado, 403 se sem permissão | |
""" | |
if not user or not user.is_authenticated: | |
raise HttpError(401, "Authentication required") | |
if self.permission and not user.has_perm(self.permission): | |
raise HttpError(403, f"Permission '{self.permission}' required") | |
class BearerAuth(HttpBearer): | |
""" | |
Classe de autenticação via Bearer Token (JWT). | |
Herda de HttpBearer do Django Ninja. | |
""" | |
def __init__(self, permissions=None): | |
""" | |
Inicializa a autenticação Bearer. | |
Args: | |
permissions: Instância de PermissionChecker para verificar permissões | |
""" | |
super().__init__() | |
self.permissions = permissions or PermissionChecker() | |
def authenticate(self, request: HttpRequest, token: str) -> Optional[User]: | |
""" | |
Autentica um usuário usando JWT Bearer Token. | |
Args: | |
request: Objeto de requisição HTTP | |
token: Token JWT extraído do header Authorization | |
Returns: | |
User: Usuário autenticado se token válido, None caso contrário | |
""" | |
try: | |
# Decodifica o token JWT usando a chave secreta do Django | |
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256']) | |
# Extrai o ID do usuário do payload do token | |
user_id = payload.get('user_id') | |
if user_id: | |
user = User.objects.get(id=user_id) | |
# Verifica se o usuário possui as permissões necessárias | |
self.permissions.check(request, user) | |
return user | |
except jwt.InvalidTokenError: | |
# Token inválido ou expirado | |
return None | |
except User.DoesNotExist: | |
# Usuário não encontrado na base de dados | |
return None | |
return None | |
class CombinedAuth(BearerAuth): | |
""" | |
Classe de autenticação combinada que suporta tanto sessão Django quanto Bearer Token. | |
Primeiro tenta autenticação por sessão, depois por token JWT. | |
""" | |
def __call__(self, request: HttpRequest) -> Optional[User]: | |
""" | |
Realiza autenticação combinada: primeiro sessão, depois JWT. | |
Args: | |
request: Objeto de requisição HTTP | |
Returns: | |
User: Usuário autenticado se válido, None caso contrário | |
""" | |
# Primeiro tenta autenticação por sessão Django | |
if request.user.is_authenticated: | |
self.permissions.check(request, request.user) | |
return request.user | |
# Se não houver sessão ativa, usa a autenticação JWT do BearerAuth | |
return super().__call__(request) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment