PyJWT – работа с JWT-токенами

онлайн тренажер по питону
Онлайн-тренажер Python для начинающих

Изучайте Python легко и без перегрузки теорией. Решайте практические задачи с автоматической проверкой, получайте подсказки на русском языке и пишите код прямо в браузере — без необходимости что-либо устанавливать.

Начать курс

Введение в PyJWT

JSON Web Token (JWT) стал стандартом в сфере аутентификации и авторизации API. Он позволяет передавать удостоверяющую информацию между клиентом и сервером безопасно и без необходимости сохранять сессии на стороне сервера. Одной из наиболее популярных библиотек в Python для работы с JWT является PyJWT.

PyJWT – это Python-библиотека для работы с JWT-токенами, которая упрощает создание, декодирование и проверку JWT, обеспечивая при этом поддержку различных алгоритмов шифрования. Эта статья подробно объяснит, как эффективно и безопасно использовать PyJWT в вашем проекте.

Основы JWT (JSON Web Token)

Структура JWT-токена

JWT состоит из трех частей, разделенных точками:

header.payload.signature
  • Header — содержит тип токена и используемый алгоритм (например, HS256)
  • Payload — содержит claims (требования), то есть полезные данные
  • Signature — используется для проверки подлинности токена

Преимущества использования JWT

  • Stateless: серверу не нужно хранить сессии
  • Компактность: легко передается в HTTP-заголовках
  • Безопасность: поддержка HMAC и RSA шифрования
  • Кроссплатформенность: работает на всех языках программирования
  • Самодостаточность: содержит всю необходимую информацию о пользователе

Принцип работы JWT

JWT работает по следующему принципу:

  1. Пользователь аутентифицируется на сервере
  2. Сервер генерирует JWT-токен с информацией о пользователе
  3. Клиент сохраняет токен и отправляет его в заголовке Authorization
  4. Сервер проверяет подпись токена и извлекает данные пользователя

Установка и настройка PyJWT

Как установить PyJWT

pip install PyJWT

Для работы с алгоритмами RSA и ECDSA установите дополнительные зависимости:

pip install "PyJWT[crypto]"

Импорт и начальная настройка

import jwt
import datetime
import os
from typing import Dict, Any

PyJWT полностью реализует стандарт RFC 7519 и предоставляет простой интерфейс для генерации и верификации токенов.

Детальное описание библиотеки PyJWT

История и развитие

PyJWT была создана в 2013 году и является одной из самых популярных библиотек для работы с JWT-токенами в Python. Библиотека активно развивается и поддерживается сообществом, регулярно обновляется для соответствия последним стандартам безопасности.

Архитектура библиотеки

PyJWT построена на модульной архитектуре с четким разделением ответственности:

  • Ядро кодирования/декодирования — основные функции для работы с токенами
  • Модуль алгоритмов — поддержка различных алгоритмов шифрования
  • Модуль исключений — обработка ошибок и исключительных ситуаций
  • Модуль утилит — вспомогательные функции для работы с временем и ключами

Основные возможности

  • Поддержка всех стандартных алгоритмов JWT
  • Автоматическая проверка времени жизни токена
  • Возможность добавления пользовательских claims
  • Поддержка различных типов ключей (симметричные и асимметричные)
  • Интеграция с популярными веб-фреймворками

Создание JWT-токенов с помощью PyJWT

Базовый пример

payload = {"user_id": 123}
secret = "mysecret"
token = jwt.encode(payload, secret, algorithm="HS256")
print(token)

Указание срока действия и дополнительных данных

payload = {
    "user_id": 123,
    "username": "admin",
    "role": "administrator",
    "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
    "iat": datetime.datetime.utcnow(),
    "iss": "myapp"
}
token = jwt.encode(payload, secret, algorithm="HS256")

exp означает expiration — срок действия токена, который крайне рекомендуется использовать для обеспечения безопасности.

Создание токена с пользовательскими заголовками

headers = {
    "kid": "key-id-001",
    "typ": "JWT"
}

payload = {
    "user_id": 123,
    "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}

token = jwt.encode(payload, secret, algorithm="HS256", headers=headers)

Проверка и декодирование JWT-токенов

Как декодировать токен

try:
    decoded = jwt.decode(token, secret, algorithms=["HS256"])
    print(decoded)
except jwt.InvalidTokenError:
    print("Токен недействителен")

Обработка ошибок и истечения срока действия

try:
    decoded = jwt.decode(token, secret, algorithms=["HS256"])
    print(f"Пользователь: {decoded['user_id']}")
except jwt.ExpiredSignatureError:
    print("Срок действия токена истёк")
except jwt.InvalidSignatureError:
    print("Недействительная подпись токена")
except jwt.InvalidTokenError:
    print("Недействительный токен")

Декодирование с дополнительными опциями

options = {
    "verify_exp": True,
    "verify_signature": True,
    "verify_aud": False,
    "verify_iss": False
}

decoded = jwt.decode(token, secret, algorithms=["HS256"], options=options)

Алгоритмы шифрования в PyJWT

Поддерживаемые алгоритмы

PyJWT поддерживает следующие алгоритмы:

  • HMAC: HS256, HS384, HS512
  • RSA: RS256, RS384, RS512
  • ECDSA: ES256, ES384, ES512
  • EdDSA: EdDSA
  • RSA-PSS: PS256, PS384, PS512

Как выбрать нужный алгоритм

  • Для простых приложений: HS256 (секретный ключ)
  • Для распределенных систем: RS256 (публичный/приватный ключ)
  • Для максимальной безопасности: ES256 (эллиптическая криптография)

Примеры использования разных алгоритмов

# HS256 - симметричное шифрование
payload = {"user_id": 123}
secret = "my-secret-key"
token_hs256 = jwt.encode(payload, secret, algorithm="HS256")

# RS256 - асимметричное шифрование
with open("private_key.pem", "r") as f:
    private_key = f.read()
    
token_rs256 = jwt.encode(payload, private_key, algorithm="RS256")

# ES256 - эллиптическая криптография
with open("ec_private_key.pem", "r") as f:
    ec_private_key = f.read()
    
token_es256 = jwt.encode(payload, ec_private_key, algorithm="ES256")

Таблица методов и функций PyJWT

Метод/Функция Описание Параметры Возвращаемое значение
jwt.encode() Создает JWT-токен payload, key, algorithm, headers строка (JWT-токен)
jwt.decode() Декодирует JWT-токен token, key, algorithms, options словарь (payload)
jwt.get_unverified_header() Получает заголовки без проверки token словарь (header)
jwt.get_unverified_claims() Получает payload без проверки token словарь (claims)
jwt.decode_complete() Полное декодирование токена token, key, algorithms, options словарь (header + payload)
jwt.get_algorithm_by_name() Получает алгоритм по имени algorithm_name объект алгоритма
jwt.register_algorithm() Регистрирует новый алгоритм algorithm_name, algorithm_obj None
jwt.unregister_algorithm() Удаляет алгоритм algorithm_name None

Работа с секретами и ключами

Безопасность хранения ключей

Никогда не храните ключи в открытом виде в коде. Используйте переменные окружения и менеджеры секретов:

import os
from cryptography.hazmat.primitives import serialization

# Загрузка из переменных окружения
secret = os.environ.get("JWT_SECRET")
if not secret:
    raise ValueError("JWT_SECRET не установлен")

# Загрузка приватного ключа
def load_private_key(key_path: str, password: str = None):
    with open(key_path, "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=password.encode() if password else None,
        )
    return private_key

Использование публичных и приватных ключей

# Генерация ключей RSA
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

# Генерация приватного ключа
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
)

# Получение публичного ключа
public_key = private_key.public_key()

# Сериализация ключей
private_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

public_pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# Использование для JWT
payload = {"user_id": 123, "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)}
token = jwt.encode(payload, private_pem, algorithm="RS256")
decoded = jwt.decode(token, public_pem, algorithms=["RS256"])

Ротация ключей

class KeyManager:
    def __init__(self):
        self.keys = {}
        self.current_key_id = "key-001"
    
    def add_key(self, key_id: str, key: str):
        self.keys[key_id] = key
    
    def get_key(self, key_id: str):
        return self.keys.get(key_id)
    
    def create_token(self, payload: dict):
        headers = {"kid": self.current_key_id}
        return jwt.encode(
            payload, 
            self.keys[self.current_key_id], 
            algorithm="HS256",
            headers=headers
        )
    
    def verify_token(self, token: str):
        unverified_header = jwt.get_unverified_header(token)
        key_id = unverified_header.get("kid")
        key = self.get_key(key_id)
        
        if not key:
            raise jwt.InvalidTokenError("Неизвестный ключ")
        
        return jwt.decode(token, key, algorithms=["HS256"])

Интеграция PyJWT в веб-приложения

Пример с Flask

from flask import Flask, request, jsonify
import jwt
import datetime
from functools import wraps

app = Flask(__name__)
SECRET_KEY = "your-secret-key"

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token:
            return jsonify({'message': 'Токен отсутствует'}), 401
        
        try:
            token = token.split(' ')[1]  # Убираем "Bearer "
            data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            current_user = data['user_id']
        except jwt.ExpiredSignatureError:
            return jsonify({'message': 'Токен истёк'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'message': 'Недействительный токен'}), 401
        
        return f(current_user, *args, **kwargs)
    return decorated

@app.route("/login", methods=["POST"])
def login():
    # Здесь должна быть логика проверки пользователя
    user_id = request.json.get('user_id')
    
    payload = {
        "user_id": user_id,
        "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
        "iat": datetime.datetime.utcnow()
    }
    
    token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    return jsonify({"token": token})

@app.route("/protected")
@token_required
def protected(current_user):
    return jsonify({
        "message": "Доступ разрешён",
        "user_id": current_user
    })

Пример с FastAPI

from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import datetime
from typing import Optional

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.datetime.utcnow() + expires_delta
    else:
        expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=15)
    
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Недействительный токен")
        return user_id
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Токен истёк")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Недействительный токен")

@app.post("/login")
async def login(user_id: str):
    access_token = create_access_token(data={"sub": user_id})
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/protected")
async def protected_route(current_user: str = Depends(verify_token)):
    return {"message": f"Добро пожаловать, пользователь {current_user}"}

Использование middleware для аутентификации

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.base import BaseHTTPMiddleware
import jwt

class JWTMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, secret_key: str):
        super().__init__(app)
        self.secret_key = secret_key
        self.protected_paths = ["/api/protected", "/api/admin"]
    
    async def dispatch(self, request: Request, call_next):
        path = request.url.path
        
        if path in self.protected_paths:
            auth_header = request.headers.get("Authorization")
            if not auth_header:
                raise HTTPException(status_code=401, detail="Токен отсутствует")
            
            try:
                token = auth_header.split(" ")[1]
                payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
                request.state.user = payload
            except jwt.InvalidTokenError:
                raise HTTPException(status_code=401, detail="Недействительный токен")
        
        response = await call_next(request)
        return response

app = FastAPI()
app.add_middleware(JWTMiddleware, secret_key="your-secret-key")

Обработка ошибок и исключений

Типичные ошибки PyJWT

PyJWT предоставляет иерархию исключений для различных типов ошибок:

try:
    decoded = jwt.decode(token, secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
    # Токен истёк
    print("Срок действия токена истёк")
except jwt.InvalidSignatureError:
    # Неверная подпись
    print("Подпись токена недействительна")
except jwt.InvalidAudienceError:
    # Неверная аудитория
    print("Токен предназначен для другой аудитории")
except jwt.InvalidIssuerError:
    # Неверный издатель
    print("Токен выдан неизвестным издателем")
except jwt.InvalidKeyError:
    # Неверный ключ
    print("Ключ недействителен")
except jwt.InvalidTokenError:
    # Базовое исключение для всех ошибок
    print("Токен недействителен")
except jwt.DecodeError:
    # Ошибка декодирования
    print("Не удалось декодировать токен")

Создание пользовательского обработчика ошибок

class JWTErrorHandler:
    @staticmethod
    def handle_jwt_error(error: Exception) -> tuple:
        if isinstance(error, jwt.ExpiredSignatureError):
            return {"error": "Токен истёк", "code": "TOKEN_EXPIRED"}, 401
        elif isinstance(error, jwt.InvalidSignatureError):
            return {"error": "Недействительная подпись", "code": "INVALID_SIGNATURE"}, 401
        elif isinstance(error, jwt.InvalidTokenError):
            return {"error": "Недействительный токен", "code": "INVALID_TOKEN"}, 401
        else:
            return {"error": "Неизвестная ошибка", "code": "UNKNOWN_ERROR"}, 500

# Использование
def verify_token_with_error_handling(token: str, secret: str):
    try:
        return jwt.decode(token, secret, algorithms=["HS256"])
    except Exception as e:
        error_response, status_code = JWTErrorHandler.handle_jwt_error(e)
        raise HTTPException(status_code=status_code, detail=error_response)

Расширенные возможности PyJWT

Пользовательские payload и claims

def create_comprehensive_token(user_data: dict, permissions: list, expires_in: int = 3600):
    payload = {
        # Стандартные claims
        "sub": str(user_data["id"]),  # Subject
        "iss": "myapp.com",           # Issuer
        "aud": "myapp-users",         # Audience
        "exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in),
        "iat": datetime.datetime.utcnow(),  # Issued at
        "nbf": datetime.datetime.utcnow(),  # Not before
        "jti": str(uuid.uuid4()),           # JWT ID
        
        # Пользовательские claims
        "username": user_data["username"],
        "email": user_data["email"],
        "role": user_data["role"],
        "permissions": permissions,
        "last_login": user_data.get("last_login"),
        "ip_address": user_data.get("ip_address"),
        "device_id": user_data.get("device_id")
    }
    
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

Валидация пользовательских claims

def validate_token_claims(token: str, secret: str, required_permissions: list = None):
    try:
        payload = jwt.decode(token, secret, algorithms=["HS256"])
        
        # Проверка обязательных полей
        required_fields = ["sub", "username", "role"]
        for field in required_fields:
            if field not in payload:
                raise ValueError(f"Отсутствует обязательное поле: {field}")
        
        # Проверка прав доступа
        if required_permissions:
            user_permissions = payload.get("permissions", [])
            if not all(perm in user_permissions for perm in required_permissions):
                raise ValueError("Недостаточно прав доступа")
        
        return payload
    except jwt.InvalidTokenError as e:
        raise ValueError(f"Недействительный токен: {str(e)}")

Работа с несколькими алгоритмами

class MultiAlgorithmJWT:
    def __init__(self):
        self.algorithms = {
            "HS256": {"secret": "hmac-secret"},
            "RS256": {
                "private_key": self.load_private_key("private.pem"),
                "public_key": self.load_public_key("public.pem")
            }
        }
    
    def encode_token(self, payload: dict, algorithm: str = "HS256"):
        if algorithm == "HS256":
            key = self.algorithms[algorithm]["secret"]
        elif algorithm == "RS256":
            key = self.algorithms[algorithm]["private_key"]
        else:
            raise ValueError(f"Неподдерживаемый алгоритм: {algorithm}")
        
        return jwt.encode(payload, key, algorithm=algorithm)
    
    def decode_token(self, token: str):
        # Получаем алгоритм из заголовка
        header = jwt.get_unverified_header(token)
        algorithm = header.get("alg")
        
        if algorithm == "HS256":
            key = self.algorithms[algorithm]["secret"]
        elif algorithm == "RS256":
            key = self.algorithms[algorithm]["public_key"]
        else:
            raise ValueError(f"Неподдерживаемый алгоритм: {algorithm}")
        
        return jwt.decode(token, key, algorithms=[algorithm])

Тестирование токенов с PyJWT

Базовые тесты

import unittest
from unittest.mock import patch
import jwt
import datetime

class TestJWTTokens(unittest.TestCase):
    def setUp(self):
        self.secret = "test-secret"
        self.payload = {"user_id": 123, "username": "testuser"}
    
    def test_token_creation(self):
        token = jwt.encode(self.payload, self.secret, algorithm="HS256")
        self.assertIsInstance(token, str)
        self.assertTrue(len(token) > 0)
    
    def test_token_decoding(self):
        token = jwt.encode(self.payload, self.secret, algorithm="HS256")
        decoded = jwt.decode(token, self.secret, algorithms=["HS256"])
        self.assertEqual(decoded["user_id"], 123)
        self.assertEqual(decoded["username"], "testuser")
    
    def test_expired_token(self):
        expired_payload = {
            "user_id": 123,
            "exp": datetime.datetime.utcnow() - datetime.timedelta(seconds=1)
        }
        token = jwt.encode(expired_payload, self.secret, algorithm="HS256")
        
        with self.assertRaises(jwt.ExpiredSignatureError):
            jwt.decode(token, self.secret, algorithms=["HS256"])
    
    def test_invalid_signature(self):
        token = jwt.encode(self.payload, self.secret, algorithm="HS256")
        
        with self.assertRaises(jwt.InvalidSignatureError):
            jwt.decode(token, "wrong-secret", algorithms=["HS256"])
    
    @patch('datetime.datetime')
    def test_token_with_mocked_time(self, mock_datetime):
        # Мокаем время для тестирования
        mock_datetime.utcnow.return_value = datetime.datetime(2023, 1, 1, 12, 0, 0)
        
        payload = {
            "user_id": 123,
            "exp": datetime.datetime(2023, 1, 1, 13, 0, 0)  # Истекает через час
        }
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        decoded = jwt.decode(token, self.secret, algorithms=["HS256"])
        
        self.assertEqual(decoded["user_id"], 123)

Тесты интеграции с Flask

import unittest
from flask import Flask
import json

class TestFlaskJWTIntegration(unittest.TestCase):
    def setUp(self):
        self.app = Flask(__name__)
        self.app.config['SECRET_KEY'] = 'test-secret'
        self.client = self.app.test_client()
        
        # Добавляем роуты для тестирования
        @self.app.route('/login', methods=['POST'])
        def login():
            token = jwt.encode({'user_id': 123}, 'test-secret', algorithm='HS256')
            return {'token': token}
        
        @self.app.route('/protected')
        def protected():
            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return {'error': 'No token'}, 401
            
            token = auth_header.split(' ')[1]
            try:
                payload = jwt.decode(token, 'test-secret', algorithms=['HS256'])
                return {'user_id': payload['user_id']}
            except jwt.InvalidTokenError:
                return {'error': 'Invalid token'}, 401
    
    def test_login_endpoint(self):
        response = self.client.post('/login')
        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)
        self.assertIn('token', data)
    
    def test_protected_endpoint_with_token(self):
        # Получаем токен
        login_response = self.client.post('/login')
        token = json.loads(login_response.data)['token']
        
        # Используем токен для доступа к защищённому эндпоинту
        headers = {'Authorization': f'Bearer {token}'}
        response = self.client.get('/protected', headers=headers)
        
        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)
        self.assertEqual(data['user_id'], 123)
    
    def test_protected_endpoint_without_token(self):
        response = self.client.get('/protected')
        self.assertEqual(response.status_code, 401)

Производительность и оптимизация

Бенчмарки PyJWT

import time
import jwt
from typing import List

def benchmark_jwt_operations(iterations: int = 10000):
    secret = "benchmark-secret"
    payload = {"user_id": 123, "role": "user"}
    
    # Тест создания токенов
    start_time = time.time()
    tokens = []
    for _ in range(iterations):
        token = jwt.encode(payload, secret, algorithm="HS256")
        tokens.append(token)
    encode_time = time.time() - start_time
    
    # Тест декодирования токенов
    start_time = time.time()
    for token in tokens:
        decoded = jwt.decode(token, secret, algorithms=["HS256"])
    decode_time = time.time() - start_time
    
    print(f"Создание {iterations} токенов: {encode_time:.2f}s")
    print(f"Декодирование {iterations} токенов: {decode_time:.2f}s")
    print(f"Создание токенов в секунду: {iterations/encode_time:.0f}")
    print(f"Декодирование токенов в секунду: {iterations/decode_time:.0f}")

# Запуск бенчмарка
benchmark_jwt_operations()

Оптимизация производительности

class OptimizedJWTHandler:
    def __init__(self, secret: str):
        self.secret = secret
        self.algorithm = "HS256"
        # Кэширование часто используемых данных
        self._token_cache = {}
        self._cache_ttl = 300  # 5 минут
    
    def encode_with_cache(self, payload: dict, cache_key: str = None):
        if cache_key and cache_key in self._token_cache:
            cached_token, timestamp = self._token_cache[cache_key]
            if time.time() - timestamp < self._cache_ttl:
                return cached_token
        
        token = jwt.encode(payload, self.secret, algorithm=self.algorithm)
        
        if cache_key:
            self._token_cache[cache_key] = (token, time.time())
        
        return token
    
    def batch_encode(self, payloads: List[dict]) -> List[str]:
        """Пакетное создание токенов для улучшения производительности"""
        return [
            jwt.encode(payload, self.secret, algorithm=self.algorithm)
            for payload in payloads
        ]
    
    def verify_with_cache(self, token: str):
        """Проверка токена с кэшированием результата"""
        token_hash = hash(token)
        if token_hash in self._token_cache:
            cached_result, timestamp = self._token_cache[token_hash]
            if time.time() - timestamp < 60:  # Кэш на 1 минуту
                return cached_result
        
        try:
            decoded = jwt.decode(token, self.secret, algorithms=[self.algorithm])
            self._token_cache[token_hash] = (decoded, time.time())
            return decoded
        except jwt.InvalidTokenError:
            return None

Сравнение PyJWT с другими библиотеками

Сравнительная таблица

Библиотека Особенности Плюсы Минусы
PyJWT Простота, гибкость, активная поддержка Легкость использования, хорошая документация Нет встроенной интеграции с веб-фреймворками
Authlib Расширенные возможности OAuth2 Полная поддержка OAuth2/OpenID Connect Более сложная настройка
python-jose Расширенная криптография Поддержка JWE, JWS, JWK Сложность интеграции
Flask-JWT-Extended Интеграция с Flask Готовые декораторы, refresh tokens Привязка к Flask

Миграция с других библиотек

# Миграция с python-jose
from jose import jwt as jose_jwt
import jwt as pyjwt

def migrate_from_jose():
    # Старый код с python-jose
    # token = jose_jwt.encode(payload, secret, algorithm="HS256")
    
    # Новый код с PyJWT
    token = pyjwt.encode(payload, secret, algorithm="HS256")
    
    # Основные различия:
    # 1. PyJWT возвращает строку, а не bytes
    # 2. Немного другая обработка ошибок
    # 3. Некоторые опции могут отличаться

Безопасность и рекомендации

Частые уязвимости и способы их предотвращения

# Плохо: отсутствие exp
payload = {"user_id": 123}
token = jwt.encode(payload, secret, algorithm="HS256")

# Хорошо: с указанием срока действия
payload = {
    "user_id": 123,
    "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm="HS256")

# Плохо: слабый ключ
secret = "123"

# Хорошо: сильный ключ
import secrets
secret = secrets.token_urlsafe(32)

# Плохо: не проверяем алгоритм
decoded = jwt.decode(token, secret)

# Хорошо: явно указываем алгоритмы
decoded = jwt.decode(token, secret, algorithms=["HS256"])

Лучшие практики безопасности

class SecureJWTHandler:
    def __init__(self, secret: str):
        self.secret = secret
        self.algorithm = "HS256"
        self.max_token_age = datetime.timedelta(hours=1)
    
    def create_secure_token(self, user_id: int, additional_claims: dict = None):
        """Создание безопасного токена с обязательными полями"""
        now = datetime.datetime.utcnow()
        
        payload = {
            "sub": str(user_id),
            "iat": now,
            "exp": now + self.max_token_age,
            "nbf": now,
            "jti": str(uuid.uuid4()),
            "iss": "myapp.com"
        }
        
        if additional_claims:
            payload.update(additional_claims)
        
        return jwt.encode(payload, self.secret, algorithm=self.algorithm)
    
    def verify_secure_token(self, token: str, audience: str = None):
        """Безопасная проверка токена"""
        try:
            options = {
                "verify_signature": True,
                "verify_exp": True,
                "verify_nbf": True,
                "verify_iat": True,
                "verify_aud": bool(audience)
            }
            
            return jwt.decode(
                token, 
                self.secret, 
                algorithms=[self.algorithm],
                audience=audience,
                options=options
            )
        except jwt.ExpiredSignatureError:
            raise SecurityError("Токен истёк")
        except jwt.InvalidSignatureError:
            raise SecurityError("Недействительная подпись")
        except jwt.InvalidTokenError:
            raise SecurityError("Недействительный токен")

Распространенные сценарии использования

Система аутентификации с refresh токенами

class RefreshTokenSystem:
    def __init__(self, secret: str):
        self.secret = secret
        self.access_token_expire = datetime.timedelta(minutes=15)
        self.refresh_token_expire = datetime.timedelta(days=30)
    
    def create_token_pair(self, user_id: int):
        """Создание пары access и refresh токенов"""
        now = datetime.datetime.utcnow()
        
        # Access токен (короткий срок жизни)
        access_payload = {
            "sub": str(user_id),
            "type": "access",
            "exp": now + self.access_token_expire,
            "iat": now
        }
        access_token = jwt.encode(access_payload, self.secret, algorithm="HS256")
        
        # Refresh токен (длинный срок жизни)
        refresh_payload = {
            "sub": str(user_id),
            "type": "refresh",
            "exp": now + self.refresh_token_expire,
            "iat": now
        }
        refresh_token = jwt.encode(refresh_payload, self.secret, algorithm="HS256")
        
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "Bearer",
            "expires_in": int(self.access_token_expire.total_seconds())
        }
    
    def refresh_access_token(self, refresh_token: str):
        """Обновление access токена с помощью refresh токена"""
        try:
            payload = jwt.decode(refresh_token, self.secret, algorithms=["HS256"])
            
            if payload.get("type") != "refresh":
                raise ValueError("Не refresh токен")
            
            user_id = int(payload["sub"])
            return self.create_token_pair(user_id)
            
        except jwt.ExpiredSignatureError:
            raise ValueError("Refresh токен истёк")
        except jwt.InvalidTokenError:
            raise ValueError("Недействительный refresh токен")

Многоуровневая авторизация

class RoleBasedJWT:
    def __init__(self, secret: str):
        self.secret = secret
        self.role_hierarchy = {
            "admin": ["admin", "moderator", "user"],
            "moderator": ["moderator", "user"],
            "user": ["user"]
        }
    
    def create_token_with_roles(self, user_id: int, roles: List[str], permissions: List[str]):
        payload = {
            "sub": str(user_id),
            "roles": roles,
            "permissions": permissions,
            "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
        }
        return jwt.encode(payload, self.secret, algorithm="HS256")
    
    def check_permission(self, token: str, required_role: str = None, required_permission: str = None):
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            user_roles = payload.get("roles", [])
            user_permissions = payload.get("permissions", [])
            
            # Проверка роли
            if required_role:
                allowed_roles = self.role_hierarchy.get(required_role, [])
                if not any(role in allowed_roles for role in user_roles):
                    return False
            
            # Проверка разрешения
            if required_permission and required_permission not in user_permissions:
                return False
            
            return True
            
        except jwt.InvalidTokenError:
            return False

Интеграция с базами данных

Хранение токенов в Redis

import redis
import json

class RedisTokenStore:
    def __init__(self, redis_client: redis.Redis, secret: str):
        self.redis = redis_client
        self.secret = secret
    
    def store_token(self, user_id: int, token: str, expires_in: int):
        """Сохранение токена в Redis"""
        key = f"user_token:{user_id}"
        self.redis.setex(key, expires_in, token)
    
    def get_token(self, user_id: int) -> str:
        """Получение токена из Redis"""
        key = f"user_token:{user_id}"
        token = self.redis.get(key)
        return token.decode() if token else None
    
    def revoke_token(self, user_id: int):
        """Отзыв токена"""
        key = f"user_token:{user_id}"
        self.redis.delete(key)
    
    def create_and_store_token(self, user_id: int, payload: dict, expires_in: int = 3600):
        """Создание и сохранение токена"""
        payload.update({
            "exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in),
            "sub": str(user_id)
        })
        
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        self.store_token(user_id, token, expires_in)
        return token

Интеграция с SQLAlchemy

from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class TokenBlacklist(Base):
    __tablename__ = "token_blacklist"
    
    id = Column(Integer, primary_key=True)
    jti = Column(String(36), unique=True, nullable=False)
    user_id = Column(Integer, nullable=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    expires_at = Column(DateTime, nullable=False)

class DatabaseTokenManager:
    def __init__(self, db_session, secret: str):
        self.db = db_session
        self.secret = secret
    
    def create_token_with_jti(self, user_id: int, payload: dict):
        """Создание токена с JTI для отслеживания"""
        jti = str(uuid.uuid4())
        payload.update({
            "jti": jti,
            "sub": str(user_id),
            "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
        })
        
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        return token, jti
    
    def blacklist_token(self, token: str):
        """Добавление токена в черный список"""
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            jti = payload.get("jti")
            user_id = payload.get("sub")
            expires_at = datetime.datetime.fromtimestamp(payload.get("exp"))
            
            if jti:
                blacklisted_token = TokenBlacklist(
                    jti=jti,
                    user_id=int(user_id),
                    expires_at=expires_at
                )
                self.db.add(blacklisted_token)
                self.db.commit()
                
        except jwt.InvalidTokenError:
            pass
    
    def is_token_blacklisted(self, token: str) -> bool:
        """Проверка, находится ли токен в черном списке"""
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            jti = payload.get("jti")
            
            if jti:
                return self.db.query(TokenBlacklist).filter_by(jti=jti).first() is not None
            return False
            
        except jwt.InvalidTokenError:
            return True

Мониторинг и логирование

Система логирования JWT операций

import logging
import json
from datetime import datetime

class JWTLogger:
    def __init__(self, logger_name: str = "jwt_operations"):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        
        # Создаем обработчик для записи в файл
        handler = logging.FileHandler("jwt_operations.log")
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def log_token_creation(self, user_id: int, token_type: str, expires_in: int):
        self.logger.info(json.dumps({
            "operation": "token_created",
            "user_id": user_id,
            "token_type": token_type,
            "expires_in": expires_in,
            "timestamp": datetime.utcnow().isoformat()
        }))
    
    def log_token_verification(self, user_id: int, success: bool, error: str = None):
        log_data = {
            "operation": "token_verified",
            "user_id": user_id,
            "success": success,
            "timestamp": datetime.utcnow().isoformat()
        }
        if error:
            log_data["error"] = error
        
        if success:
            self.logger.info(json.dumps(log_data))
        else:
            self.logger.warning(json.dumps(log_data))
    
    def log_token_revocation(self, user_id: int, reason: str):
        self.logger.info(json.dumps({
            "operation": "token_revoked",
            "user_id": user_id,
            "reason": reason,
            "timestamp": datetime.utcnow().isoformat()
        }))

Метрики и мониторинг

from dataclasses import dataclass
from typing import Dict
import time

@dataclass
class JWTMetrics:
    tokens_created: int = 0
    tokens_verified: int = 0
    verification_failures: int = 0
    expired_tokens: int = 0
    invalid_signatures: int = 0
    
    def to_dict(self) -> Dict[str, int]:
        return {
            "tokens_created": self.tokens_created,
            "tokens_verified": self.tokens_verified,
            "verification_failures": self.verification_failures,
            "expired_tokens": self.expired_tokens,
            "invalid_signatures": self.invalid_signatures
        }

class MonitoredJWTHandler:
    def __init__(self, secret: str):
        self.secret = secret
        self.metrics = JWTMetrics()
        self.logger = JWTLogger()
    
    def create_token(self, payload: dict) -> str:
        start_time = time.time()
        
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        
        # Обновляем метрики
        self.metrics.tokens_created += 1
        
        # Логируем операцию
        user_id = payload.get("sub", "unknown")
        expires_in = payload.get("exp", 0) - time.time() if payload.get("exp") else 0
        self.logger.log_token_creation(user_id, "access", int(expires_in))
        
        return token
    
    def verify_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            
            # Обновляем метрики
            self.metrics.tokens_verified += 1
            
            # Логируем успешную проверку
            user_id = payload.get("sub", "unknown")
            self.logger.log_token_verification(user_id, True)
            
            return payload
            
        except jwt.ExpiredSignatureError:
            self.metrics.expired_tokens += 1
            self.metrics.verification_failures += 1
            self.logger.log_token_verification("unknown", False, "expired")
            raise
        except jwt.InvalidSignatureError:
            self.metrics.invalid_signatures += 1
            self.metrics.verification_failures += 1
            self.logger.log_token_verification("unknown", False, "invalid_signature")
            raise
        except jwt.InvalidTokenError:
            self.metrics.verification_failures += 1
            self.logger.log_token_verification("unknown", False, "invalid_token")
            raise
    
    def get_metrics(self) -> Dict[str, int]:
        return self.metrics.to_dict()

Часто задаваемые вопросы

Что такое PyJWT и для чего он используется?

PyJWT — это Python-библиотека для создания, кодирования и декодирования JWT-токенов. Она используется для аутентификации пользователей, авторизации API и безопасной передачи данных между клиентом и сервером.

Безопасно ли использовать JWT в продакшене?

Да, JWT безопасно использовать в продакшене при соблюдении следующих рекомендаций:

  • Используйте сильные секретные ключи
  • Устанавливайте короткий срок действия токенов
  • Передавайте токены только по HTTPS
  • Не храните чувствительные данные в payload

Какой алгоритм лучше использовать: HS256 или RS256?

  • HS256 подходит для простых приложений с одним сервером, где секретный ключ может быть безопасно распределен
  • RS256 лучше для распределенных систем, где несколько сервисов должны проверять токены, но только один может их создавать

Где и как безопасно хранить секретный ключ?

  • Используйте переменные окружения
  • Применяйте менеджеры секретов (AWS Secrets Manager, HashiCorp Vault)
  • Никогда не храните ключи в коде или файлах конфигурации
  • Регулярно меняйте ключи

Можно ли использовать PyJWT с FastAPI или Flask?

Да, PyJWT отлично интегрируется с обоими фреймворками:

  • В Flask можно создавать декораторы для защиты роутов
  • В FastAPI можно использовать dependency injection для проверки токенов

Поддерживает ли PyJWT обновление токенов (refresh tokens)?

PyJWT не имеет встроенной поддержки refresh токенов, но это можно легко реализовать на уровне приложения, создавая пары access/refresh токенов с разными сроками действия.

Как обрабатывать истекшие токены?

try:
    payload = jwt.decode(token, secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
    # Токен истёк - запросить новый или перенаправить на авторизацию
    return {"error": "Token expired", "code": "TOKEN_EXPIRED"}

Можно ли отозвать JWT-токен досрочно?

JWT-токены по своей природе stateless, поэтому нельзя отозвать их напрямую. Однако можно:

  • Использовать черные списки токенов в базе данных
  • Реализовать короткий срок действия с refresh токенами
  • Использовать Redis для хранения активных токенов

Как тестировать приложения с JWT?

def test_protected_endpoint():
    # Создаем тестовый токен
    test_payload = {"user_id": 123, "exp": datetime.utcnow() + timedelta(hours=1)}
    test_token = jwt.encode(test_payload, "test-secret", algorithm="HS256")
    
    # Используем токен в запросе
    headers = {"Authorization": f"Bearer {test_token}"}
    response = client.get("/protected", headers=headers)
    
    assert response.status_code == 200

Влияет ли размер payload на производительность?

Да, больший payload означает больший токен, что влияет на:

  • Время передачи по сети
  • Размер HTTP-заголовков
  • Время обработки

Рекомендуется хранить в JWT только необходимые данные.

Заключение

PyJWT — это мощная, гибкая и надежная библиотека для работы с JWT-токенами в Python. Она предоставляет все необходимые инструменты для безопасной аутентификации и авторизации в современных веб-приложениях.

Основные преимущества PyJWT:

  • Простота использования и интуитивный API
  • Полная поддержка стандарта RFC 7519
  • Широкий выбор алгоритмов шифрования
  • Активное сообщество и регулярные обновления
  • Отличная совместимость с популярными веб-фреймворками

При использовании PyJWT в продакшене обязательно следуйте рекомендациям по безопасности: используйте сильные ключи, устанавливайте срок действия токенов, передавайте данные только по HTTPS и регулярно обновляйте библиотеку.

Библиотека PyJWT станет незаменимым инструментом для разработки современных API, микросервисов и веб-приложений, требующих надежной системы аутентификации.

Новости