Ir para o conteúdo

Autenticação e Autorização com JWT


Objetivos da Aula:

  • Um entendimento básico sobre JWT
  • Implementar autenticação de usuários com JWT.
  • Adicionar lógica de autorização aos endpoints de atualização e deleção.
  • Utilizar a biblioteca pwdlib para encriptar as senhas dos usuários.
Caso prefira ver a aula em vídeo

Esse aula ainda não está disponível em formato de vídeo, somente em texto ou live!

Aula Slides Código Quiz Exercícios


Introdução

Nesta aula, abordaremos dois aspectos cruciais de qualquer aplicação web: a autenticação e a autorização. Até agora, nossos usuários podem criar, ler, atualizar e deletar suas contas, mas qualquer pessoa pode fazer essas ações. Não queremos que qualquer usuário possa deletar ou modificar a conta de outro usuário. Para evitar isso, vamos implementar autenticação e autorização em nossa aplicação.

A autenticação é o processo de verificar quem um usuário é, enquanto a autorização é o processo de verificar o que ele tem permissão para fazer. Usaremos o JSON Web Token (JWT) para implementar a autenticação, e adicionaremos lógica de autorização aos nossos endpoints.

Além disso, até agora, estamos armazenando as senhas dos usuários como texto puro no banco de dados, o que é uma prática insegura. Corrigiremos isso utilizando a biblioteca pwdlib para encriptar as senhas.

O que é um JWT

O JWT é um padrão (RFC 7519) que define uma maneira compacta e autônoma de transmitir informações entre as partes de maneira segura. Essas informações são transmitidas como um objeto JSON que é digitalmente assinado usando um segredo (com o algoritmo HMAC) ou um par de chaves pública/privada usando RSA, ou ECDSA.

Um JWT consiste em três partes:

  1. Header: O cabeçalho do JWT consiste tipicamente em dois componentes: o tipo de token, que é JWT neste caso, e o algoritmo de assinatura, como HMAC SHA256 ou RSA. Essas informações são codificadas em Base64Url e formam a primeira parte do JWT.

    {
       "alg": "HS256",
       "typ": "JWT"
    }
    
  2. Payload: O payload de um JWT é onde as reivindicações (em inglês claims) são armazenadas. As reivindicações são informações que queremos transmitir e que são relevantes para a interação entre o cliente e o servidor. As reivindicações são codificadas em Base64Url e formam a segunda parte do JWT.

    {
      "sub": "teste@test.com",
      "exp": 1690258153
    }
    
  3. Signature: A assinatura é utilizada para verificar que o remetente do JWT é quem afirma ser e para garantir que a mensagem não foi alterada ao longo do caminho. Para criar a assinatura, você precisa codificar o cabeçalho, o payload, e um segredo utilizando o algoritmo especificado no cabeçalho. A assinatura é a terceira parte do JWT. Uma assinatura de JWT pode ser criada como se segue:

    HMACSHA256(
        base64UrlEncode(header) + "." +
        base64UrlEncode(payload),
     nosso-segredo
    )
    

Essas três partes são separadas por pontos (.) e juntas formam um token JWT.

Formando a estrutura: HEADER.PAYLOAD.SIGNATURE que formam um token parecido com

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0ZUB0ZXN0LmNvbSIsImV4cCI6MTY5MDI1ODE1M30.Nx0P_ornVwJBH_LLLVrlJoh6RmJeXR-Nr7YJ_mlGY04

É importante ressaltar que, apesar de a informação em um JWT estar codificada, ela não está criptografada. Isso significa que qualquer pessoa com acesso ao token pode decodificar e ler as informações nele. No entanto, sem o segredo usado para assinar o token, eles não podem alterar as informações ou forjar um novo token. Portanto, não devemos incluir informações sensíveis ou confidenciais no payload do JWT.

Se quisermos ver o header, o payload e a assinatura contidas nesse token podemos acessar o debuger do jwt e checar quais as informações que estão nesse token:

Captura de tela de nosso token na página de debugger do jwt.io

Claims

As Claims do JWT são as informações que serão adicionadas ao token via payload. Como:

{
    "sub": "teste@test.com",
    "exp": 1690258153
}

Onde as chaves deste exemplo:

  • sub: identifica o "assunto" (subject), basicamente uma forma de identificar o cliente. Pode ser um id, um uuid, email, ...
  • exp: tempo de expiração do token. O backend vai usar esse dado para validar se o token ainda é válido ou existe a necessidade de uma atualização do token.

Em nossos exemplos iremos usar somente essas duas claims, mais existem muitas outras. Você pode ver a lista completa das claims aqui caso queira aprender mais.

Como funciona o JWT

Em uma aplicação web, o processo de autenticação geralmente funciona da seguinte maneira:

  1. O usuário envia suas credenciais (e-mail e senha) para o servidor em um endpoint de geração de token (/token por exemplo);
  2. O servidor verifica as credenciais e, se estiverem corretas, gera um token JWT e o envia de volta ao cliente;
  3. Nas solicitações subsequentes, o cliente deve incluir esse token no cabeçalho de autorização de suas solicitações. Como, por exemplo: Authorization: Bearer <token>;
  4. Quando o servidor recebe uma solicitação com um token JWT, ele pode verificar a assinatura e se o token é válido e não expirou, ele processa a solicitação.
sequenceDiagram
  participant Cliente as Cliente
  participant Servidor as Servidor
  Cliente->>Servidor: Envia credenciais (e-mail e senha)
  Servidor->>Cliente: Verifica as credenciais
  Servidor->>Cliente: Envia token JWT
  Cliente->>Servidor: Envia solicitação com token JWT no cabeçalho de autorização
  Servidor->>Cliente: Verifica o token JWT e processa a solicitação

Nos próximos tópicos, vamos detalhar como podemos gerar e verificar tokens JWT em nossa aplicação FastAPI, bem como adicionar autenticação e autorização aos nossos endpoints.

Gerando tokens JWT

Para gerar tokens JWT, precisamos de duas bibliotecas extras: pyjwt e pwdlib. A primeira será usada para a geração do token, enquanto a segunda será usada para criptografar as senhas dos usuários. Para instalá-las, execute o seguinte comando no terminal:

$ Execução no terminal!
poetry add pyjwt "pwdlib[argon2]"

Agora, criaremos uma função para gerar nossos tokens JWT. Criaremos um novo arquivo para gerenciar a segurança: security.py. Nesse arquivo iniciaremos a geração dos tokens:

fast_zero/security.py
from datetime import datetime, timedelta

from jwt import encode
from pwdlib import PasswordHash
from zoneinfo import ZoneInfo

SECRET_KEY = 'your-secret-key'  # Isso é provisório, vamos ajustar!
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = PasswordHash.recommended()


def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.now(tz=ZoneInfo('UTC')) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    to_encode.update({'exp': expire})
    encoded_jwt = encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

A função create_access_token é responsável por criar um novo token JWT que será usado para autenticar o usuário. Ela recebe um dicionário de dados, adiciona um tempo de expiração ao token (baseado na constante ACCESS_TOKEN_EXPIRE_MINUTES). Esses dados, em conjunto, formam o payload do JWT. Em seguida, usa a biblioteca pyjwt para codificar essas informações em um token JWT, que é então retornado.

Note que a constante SECRET_KEY é usada para assinar o token, e o algoritmo HS256 é usado para a codificação. Em um cenário de produção, você deve manter a SECRET_KEY em um local seguro e não expô-la em seu código.

Testando a geração de tokens

Embora esse código será coberto no futuro com a utilização do token, é interessante criarmos um teste para essa função com uma finalidade puramente didática. De forma que vejamos os tokens gerados pelo pyjwt e interagirmos com ele.

Com isso criaremos um arquivo chamado tests/test_security.py para efetuar esse teste:

tests/test_security.py
from jwt import decode

from fast_zero.security import SECRET_KEY, create_access_token


def test_jwt():
    data = {'test': 'test'}
    token = create_access_token(data)

    decoded = decode(token, SECRET_KEY, algorithms=['HS256'])

    assert decoded['test'] == data['test']
    assert decoded['exp']  # Testa se o valor de exp foi adicionado ao token

Na próxima seção, veremos como podemos usar a biblioteca pwdlib para tratar as senhas dos usuários.

Hashing de Senhas

Armazenar senhas em texto puro é uma prática de segurança extremamente perigosa. Em vez disso, é uma prática padrão criptografar ("hash") as senhas antes de armazená-las. Quando um usuário tenta se autenticar, a senha inserida é criptografada novamente e comparada com a versão criptografada armazenada no banco de dados. Se as duas correspondem, o usuário é autenticado.

Implementaremos essa funcionalidade usando a biblioteca pwdlib. Criaremos duas funções: uma para criar o hash da senha e outra para verificar se uma senha inserida corresponde ao hash armazenado. Adicione o seguinte código ao arquivo security.py:

fast_zero/security.py
def get_password_hash(password: str):
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)

A função get_password_hash recebe uma senha em texto puro como argumento e retorna uma versão criptografada dessa senha. A função verify_password recebe uma senha em texto puro e uma senha criptografada como argumentos, e verifica se a senha em texto puro, quando criptografada, corresponde à senha criptografada. Ambas as funções utilizam o objeto pwd_context, que definimos anteriormente usando a biblioteca pwdlib.

Agora, quando um usuário se registra em nossa aplicação, devemos usar a função get_password_hash para armazenar uma versão criptografada da senha. Quando um usuário tenta se autenticar, devemos usar a função verify_password para verificar se a senha inserida corresponde à senha armazenada.

Na próxima seção, modificaremos nossos endpoints para fazer uso dessas funções.

Modificando o endpoint de POST para encriptar a senha

Com as funções de criação de hash de senha e verificação de senha em vigor, agora podemos atualizar nossos endpoints para usar essa nova funcionalidade de encriptação.

Primeiro, modificaremos a função create_user para criar um hash da senha antes de armazená-la no banco de dados. Para fazer isso precisamos importar a função de geração de hash get_password_hash e no momento da criação do registro na tabela a senha deve ser passada com o hash gerado:

fast_zero/app.py
from fast_zero.security import get_password_hash

# ...

@app.post('/users/', status_code=HTTPStatus.CREATED, response_model=UserPublic)
def create_user(user: UserSchema, session: Session = Depends(get_session)):
    db_user = session.scalar(
        select(User).where(
            (User.username == user.username) | (User.email == user.email)
        )
    )

    if db_user:
        if db_user.username == user.username:
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail='Username already exists',
            )
        elif db_user.email == user.email:
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail='Email already exists',
            )

    hashed_password = get_password_hash(user.password)

    db_user = User(
        email=user.email,
        username=user.username,
        password=hashed_password,
    )

    session.add(db_user)
    session.commit()
    session.refresh(db_user)

    return db_user

Desta forma, a senha não será mais criada em texto plano no objeto User. Fazendo com que caso exista algum problema relacionado a vazamento de dados, as senhas das pessoas nunca sejam expostas.

Sobre o teste da POST /users/

Por não validar o password, usando o retorno UserPublic, o teste já escrito deve passar normalmente:

$ Execução no terminal!
task test

# ...

tests/test_app.py::test_root_deve_retornar_ok_e_ola_mundo PASSED
tests/test_app.py::test_create_user PASSED

Modificando o endpoint de atualização de usuários

É igualmente importante modificar a função update_user para também criar um hash da senha antes de atualizar User no banco de dados. Caso contrário, a senha em texto puro seria armazenada no banco de dados no momento da atualização.

fast_zero/app.py
@app.put('/users/{user_id}', response_model=UserPublic)
def update_user(
    user_id: int,
    user: UserSchema,
    session: Session = Depends(get_session),
):
    db_user = session.scalar(select(User).where(User.id == user_id))
    if not db_user:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail='User not found'
        )

    db_user.username = user.username
    db_user.password = get_password_hash(user.password)
    db_user.email = user.email
    session.commit()
    session.refresh(db_user)
    return db_user

Assim, a atualização de um User, via método PUT, também criará o hash da senha no momento da atualização. Pois, nesse caso em específico, existe a possibilidade de alterar qualquer coluna da tabela, inclusive o campo password.

Sobre os testes da PUT /users/{user_id}

Assim como no teste da rota de criação, os testes também passam normalmente por não validarem o campo password.

$ Execução no terminal!
task test

# ...

tests/test_app.py::test_root_deve_retornar_ok_e_ola_mundo PASSED
tests/test_app.py::test_create_user PASSED
tests/test_app.py::test_read_users PASSED
tests/test_app.py::test_read_users_with_users PASSED
tests/test_app.py::test_update_user PASSED

Criando um endpoint de geração do token

Antes de criar o endpoint, precisamos criar um schema para o nosso token. Em um contexto JWT, access_token é o próprio token que representa a sessão do usuário e contém informações sobre o usuário, enquanto token_type é um tipo de autenticação que será incluído no cabeçalho de autorização de cada solicitação. Em geral, o token_type para JWT é "bearer".

fast_zero/schemas.py
class Token(BaseModel):
    access_token: str
    token_type: str

Utilizando OAuth2PasswordRequestForm

A classe OAuth2PasswordRequestForm é uma classe especial do FastAPI que gera automaticamente um formulário para solicitar o username (email neste caso) e a senha. Este formulário será apresentado automaticamente no Swagger UI e Redoc, facilitando a realização de testes de autenticação.

Para usar os formulários no FastAPI, precisamos instalar o python-multipart:

$ Execução no terminal!
poetry add python-multipart

Criando um endpoint de geração do token

Agora criaremos o endpoint que irá autenticar o usuário e fornecer um token de acesso JWT. Este endpoint irá receber as informações de login do usuário, verificar se as credenciais são válidas e, em caso afirmativo, retornar um token de acesso JWT.

fast_zero/app.py
from fastapi.security import OAuth2PasswordRequestForm
from fast_zero.schemas import Message, Token, UserList, UserPublic, UserSchema
from fast_zero.security import (
    create_access_token,
    get_password_hash,
    verify_password,
)

# ...

@app.post('/token', response_model=Token)
def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    session: Session = Depends(get_session),
):
    user = session.scalar(select(User).where(User.email == form_data.username))

    if not user:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail='Incorrect email or password'
        )

    if not verify_password(form_data.password, user.password):
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail='Incorrect email or password'
        )

    access_token = create_access_token(data={'sub': user.email})

    return {'access_token': access_token, 'token_type': 'bearer'}

Esse endpoint recebe os dados do formulário através do form_data (que são injetados automaticamente graças ao Depends()) e tenta recuperar um usuário com o email fornecido. Se o usuário não for encontrado ou a senha não corresponder ao hash armazenado no banco de dados, uma exceção é lançada. Caso contrário, um token de acesso é criado usando o create_access_token() que criamos anteriormente e retornado como uma resposta.

Testando /token

Agora escreveremos um teste para verificar se o nosso novo endpoint está funcionando corretamente.

tests/test_app.py
def test_get_token(client, user):
    response = client.post(
        '/token',
        data={'username': user.email, 'password': user.password},
    )
    token = response.json()

    assert response.status_code == HTTPStatus.OK
    assert 'access_token' in token
    assert 'token_type' in token

Nesse teste, nós enviamos uma requisição POST para o endpoint "/token" com um username e uma senha válidos. Então, nós verificamos que a resposta contém um "access_token" e um "token_type", que são os campos que esperamos de um JWT válido.

No entanto, há um problema. Agora que a senha está sendo criptografada, nosso teste falhará:

$ Execução no terminal!
task test

# ...
tests/test_app.py::test_get_token FAILED

Para corrigir isso, precisamos garantir que a senha esteja sendo criptografada na fixture antes de ser salva:

tests/conftest.py
from fast_zero.security import get_password_hash

# ...

@pytest.fixture
def user(session):
    user = User(
        username='Teste',
        email='teste@test.com',
        password=get_password_hash('testtest'),
    )
    session.add(user)
    session.commit()
    session.refresh(user)

    return user

Rodaremos o teste novamente. No entanto, ainda teremos um problema. Agora só temos a versão criptografada da senha, que não é útil para fazer o login, já que o login exige a senha em texto puro:

$ Execução no terminal!
task test

# ...
tests/test_app.py::test_get_token FAILED

Para resolver isso, faremos uma modificação no objeto user (um monkey patch) para adicionar a senha em texto puro:

tests/conftest.py
@pytest.fixture
def user(session):
    password = 'testtest'
    user = User(
        username='Teste',
        email='teste@test.com',
        password=get_password_hash(password),
    )
    session.add(user)
    session.commit()
    session.refresh(user)

    user.clean_password = 'testtest'

    return user

Monkey patching é uma técnica em que modificamos ou estendemos o código em tempo de execução. Neste caso, estamos adicionando um novo atributo clean_password ao objeto user para armazenar a senha em texto puro.

Agora, podemos alterar o teste para usar clean_password:

tests/test_app.py
def test_get_token(client, user):
    response = client.post(
        '/token',
        data={'username': user.email, 'password': user.clean_password},
    )
    token = response.json()

    assert response.status_code == HTTPStatus.OK
    assert 'access_token' in token
    assert 'token_type' in token

E agora todos os testes devem passar normalmente:

$ Execução no terminal!
task test

# ...

tests/test_app.py::test_root_deve_retornar_ok_e_ola_mundo PASSED
tests/test_app.py::test_create_user PASSED
tests/test_app.py::test_read_users PASSED
tests/test_app.py::test_read_users_with_users PASSED
tests/test_app.py::test_update_user PASSED
tests/test_app.py::test_delete_user PASSED
tests/test_app.py::test_get_token PASSED
tests/test_db.py::test_create_user PASSED

Isso conclui a parte de autenticação de nossa API. No próximo passo, implementaremos a autorização nos endpoints.

Protegendo os Endpoints

Agora que temos uma forma de autenticar nossos usuários e emitir tokens JWT, é hora de usar essa infraestrutura para proteger nossos endpoints. Neste passo, adicionaremos autenticação aos endpoints PUT e DELETE.

Para garantir que as informações do usuário sejam extraídas corretamente do token JWT, precisamos de um schema especial, o TokenData. Esse schema será utilizado para tipificar os dados extraídos do token JWT e garantir que temos um campo username que será usado para identificar o usuário.

fast_zero/schemas.py
class TokenData(BaseModel):
    username: str | None = None

Nesse ponto, criaremos uma a função get_current_user que será responsável por extrair o token JWT do header Authorization da requisição, decodificar esse token, extrair as informações do usuário e obter finalmente o usuário do banco de dados. Se qualquer um desses passos falhar, uma exceção será lançada e a requisição será negada. A adicionaremos ao security.py:

fast_zero/security.py
from datetime import datetime, timedelta
from http import HTTPStatus

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwt import DecodeError, decode, encode
from pwdlib import PasswordHash
from sqlalchemy import select
from sqlalchemy.orm import Session

from fast_zero.database import get_session
from fast_zero.models import User
from fast_zero.schemas import TokenData

# ...

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')

def get_current_user(
    session: Session = Depends(get_session),
    token: str = Depends(oauth2_scheme), #(1)!
):
    credentials_exception = HTTPException(  #(2)!
        status_code=HTTPStatus.UNAUTHORIZED,
        detail='Could not validate credentials',
        headers={'WWW-Authenticate': 'Bearer'},
    )

    try:
        payload = decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get('sub')
        if not username:
            raise credentials_exception  #(3)!
        token_data = TokenData(username=username)
    except DecodeError:
        raise credentials_exception  #(4)!

    user = session.scalar(
        select(User).where(User.email == token_data.username)
    )

    if not user:
        raise credentials_exception  #(5)!

    return user
  1. A injeção de oauth2_scheme garante que um token foi enviado, caso não tenha sido enviado ele redirecionará a tokenUrl do objeto OAuth2PasswordBearer.
  2. Como essa operação pode apresentar erros em diversos momentos, foi atribuído um único erro a variável credentials_exception.
  3. Nessa primeira validação é checado se após o decode do token a chave username está presente. Caso não esteja, o erro será levantado.
  4. Nessa validação é testada se o token é um token JWT válido. Caso não seja, o erro também será levantado.
  5. Nossa última validação checa, após de garantir que existe username, se ele está presente em nossa base de dados. Caso não, o erro será levantado.

Aqui, a função get_current_user é definida como assíncrona, indicando que ela pode realizar operações de IO (como consultar um banco de dados) de forma não bloqueante. Esta função aceita dois argumentos: session e token. O session é obtido através da função get_session (não mostrada aqui), que deve retornar uma sessão de banco de dados ativa. O token é obtido do header de autorização da requisição, que é esperado ser do tipo Bearer (indicado pelo esquema OAuth2).

A variável credentials_exception é definida como uma exceção HTTP que será lançada sempre que houver um problema com as credenciais fornecidas pelo usuário. O status 401 indica que a autenticação falhou e a mensagem "Could not validate credentials" é retornada ao cliente. Além disso, um cabeçalho 'WWW-Authenticate' é incluído na resposta, indicando que o cliente deve fornecer autenticação.

No bloco try, tentamos decodificar o token JWT usando a chave secreta e o algoritmo especificado. O token decodificado é armazenado na variável payload. Extraímos o campo 'sub' (normalmente usado para armazenar o identificador do usuário no token JWT) e verificamos se ele existe. Se não, lançamos a exceção credentials_exception. Em seguida, criamos um objeto TokenData com o username.

Por fim, realizamos uma consulta ao banco de dados para encontrar o usuário com o e-mail correspondente ao username contido no token. session.scalar é usado para retornar a primeira coluna do primeiro resultado da consulta. Se nenhum usuário for encontrado, lançamos a exceção credentials_exception. Se um usuário for encontrado, retornamos esse usuário.

Aplicação da proteção ao endpoint

Primeiro, aplicaremos a autenticação no endpoint PUT. Se o user_id da rota não corresponder ao id do usuário autenticado, retornaremos um erro 400. Se tudo estiver correto, o usuário será atualizado normalmente.

fast_zero/app.py
from fast_zero.security import (
    create_access_token,
    get_current_user,
    get_password_hash,
    verify_password,
)

# ...

@app.put('/users/{user_id}', response_model=UserPublic)
def update_user(
    user_id: int,
    user: UserSchema,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    if current_user.id != user_id:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN, detail='Not enough permissions'
        )

    current_user.username = user.username
    current_user.password = get_password_hash(user.password)
    current_user.email = user.email
    session.commit()
    session.refresh(current_user)

    return current_user

Com isso, podemos remover a query feita no endpoint para encontrar o User, pois ela já está sendo feita no get_current_user, simplificando ainda mais nosso endpoint.

Agora, aplicaremos a autenticação no endpoint DELETE. Semelhante ao PUT, se o user_id da rota não corresponder ao id do usuário autenticado, retornaremos um erro 400. Se tudo estiver correto, o usuário será deletado.

fast_zero/app.py
@app.delete('/users/{user_id}', response_model=Message)
def delete_user(
    user_id: int,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    if current_user.id != user_id:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN, detail='Not enough permissions'
        )

    session.delete(current_user)
    session.commit()

    return {'message': 'User deleted'}

Com essa nova dependência, o FastAPI automaticamente garantirá que um token de autenticação válido seja fornecido antes de permitir o acesso a esses endpoints. Se o token não for válido, ou se o usuário tentar modificar ou deletar um usuário diferente, um erro será retornado.

Atualizando os Testes

Os testes precisam ser atualizados para refletir essas mudanças. Primeiro, precisamos criar uma nova fixture que gere um token para um usuário de teste.

tests/conftest.py
@pytest.fixture
def token(client, user):
    response = client.post(
        '/token',
        data={'username': user.email, 'password': user.clean_password},
    )
    return response.json()['access_token']

Agora, podemos atualizar os testes para o endpoint PUT e DELETE para incluir a autenticação.

tests/test_app.py
def test_update_user(client, user, token):
    response = client.put(
        f'/users/{user.id}',
        headers={'Authorization': f'Bearer {token}'},
        json={
            'username': 'bob',
            'email': 'bob@example.com',
            'password': 'mynewpassword',
        },
    )
    assert response.status_code == HTTPStatus.OK
    assert response.json() == {
        'username': 'bob',
        'email': 'bob@example.com',
        'id': user.id,
    }


def test_delete_user(client, user, token):
    response = client.delete(
        f'/users/{user.id}',
        headers={'Authorization': f'Bearer {token}'},
    )
    assert response.status_code == HTTPStatus.OK
    assert response.json() == {'message': 'User deleted'}

Finalmente, podemos rodar todos os testes para garantir que tudo esteja funcionando corretamente.

$ Execução no terminal!
task test

# ...

tests/test_app.py::test_root_deve_retornar_ok_e_ola_mundo PASSED
tests/test_app.py::test_create_user PASSED
tests/test_app.py::test_read_users PASSED
tests/test_app.py::test_read_users_with_users PASSED
tests/test_app.py::test_update_user PASSED
tests/test_app.py::test_delete_user PASSED
tests/test_app.py::test_get_token PASSED
tests/test_db.py::test_create_user PASSED

Com essas alterações, nossos endpoints agora estão seguramente protegidos pela autenticação. Apenas os usuários autenticados podem alterar ou deletar seus próprios dados. Isso traz uma camada adicional de segurança e integridade para o nosso aplicativo.

Checagem de tokens inválidos

Uma coisa que pode ter passado em branco durante a fase de testes é a validação do token JWT. Ele pode estar em um formato inválido, às vezes por erro do cliente, outras por algum problema de segurança. É importante validarmos como a aplicação vai reagir a um token inválido.

Durante a construção da função get_current_user, criamos um fluxo para esse erro:

Parte do código da função `get_current_user` em security.py
credentials_exception = HTTPException(
    status_code=HTTPStatus.UNAUTHORIZED,
    detail='Could not validate credentials',
    headers={'WWW-Authenticate': 'Bearer'},
)

try:
    payload = decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    # Código para caso o token seja decodificado
except DecodeError:
    raise credentials_exception

Caso não seja possível decodificar o token, por algum motivo, é importante validarmos se o retorno será um 401 UNAUTHORIZED. Para isso, seria importante criar um teste que verifica essa camada.

Poderíamos fazer de duas formas, chamando diretamente a função get_current_user passando um token inválido, ou fazer a validação diretamente via a requisição de alguém que dependa de um token válido. Optarei pela segunda opção, por ser mais próxima de uma requisição real.

tests/test_security.py
from http import HTTPStatus

# ...

def test_jwt_invalid_token(client):
    response = client.delete(
        '/users/1', headers={'Authorization': 'Bearer token-invalido'}
    )

    assert response.status_code == HTTPStatus.UNAUTHORIZED
    assert response.json() == {'detail': 'Could not validate credentials'}

Usamos um assert para validar se o código é mesmo 401 e outro para validar nossa mensagem de erro. Assim, temos uma garantia que, caso alguém envie um token inválido, a resposta seguirá como esperado.

Antes de finalizar, ideal seria executarmos os testes:

$ Execução no terminal!
task test

# ...

tests/test_app.py::test_get_token PASSED
tests/test_db.py::test_create_user PASSED
tests/test_security.py::test_jwt_invalid_token PASSED

Exercícios

  1. Faça um teste para cobrir o cenário que levanta exception credentials_exception na autenticação caso o User não seja encontrado. Ao olhar a cobertura de security.py você vai notar que esse contexto não está coberto.

  2. Reveja os testes criados até a aula 5 e veja se eles ainda fazem sentido (testes envolvendo 400)

Commit

Depois de finalizar a proteção dos endpoints e atualizar os testes, é hora de fazer commit das alterações. Não se esqueça de revisar as alterações antes de fazer o commit.

$ Execução no terminal!
git status
git add .
git commit -m "Protege os endpoints PUT e DELETE com autenticação"

Conclusão

Nesta aula, demos um passo importante para aumentar a segurança da nossa API. Implementamos a autenticação e a autorização para os endpoints PUT e DELETE, garantindo que apenas usuários autenticados possam alterar ou excluir seus próprios dados. Também atualizamos os testes para incluir a autenticação. Na próxima aula, continuaremos a expandir a funcionalidade da nossa API. Até lá!


Agora que a aula acabou, é um bom momento para você relembrar alguns conceitos e fixar melhor o conteúdo respondendo ao questionário referente a ela.

Quiz