Введение в PyJWT
JSON Web Token (JWT) стал стандартом в сфере аутентификации и авторизации API. Он позволяет передавать удостоверяющую информацию между клиентом и сервером безопасно и без необходимости сохранять сессии на стороне сервера. Одной из наиболее популярных библиотек в Python для работы с JWT является PyJWT.
PyJWT – это Python-библиотека для работы с JWT-токенами, которая упрощает создание, декодирование и проверку JWT, обеспечивая при этом поддержку различных алгоритмов шифрования. Эта статья подробно объяснит, как эффективно и безопасно использовать PyJWT в вашем проекте.
Основы JWT (JSON Web Token)
Структура JWT-токена
JWT состоит из трех частей, разделенных точками:
header.payload.signature
- Header — содержит тип токена и используемый алгоритм (например, HS256)
- Payload — содержит claims (требования), то есть полезные данные
- Signature — используется для проверки подлинности токена
Преимущества использования JWT
- Stateless: серверу не нужно хранить сессии
- Компактность: легко передается в HTTP-заголовках
- Безопасность: поддержка HMAC и RSA шифрования
- Кроссплатформенность: работает на всех языках программирования
- Самодостаточность: содержит всю необходимую информацию о пользователе
Принцип работы JWT
JWT работает по следующему принципу:
- Пользователь аутентифицируется на сервере
- Сервер генерирует JWT-токен с информацией о пользователе
- Клиент сохраняет токен и отправляет его в заголовке Authorization
- Сервер проверяет подпись токена и извлекает данные пользователя
Установка и настройка PyJWT
Как установить PyJWT
pip install PyJWT
Для работы с алгоритмами RSA и ECDSA установите дополнительные зависимости:
pip install "PyJWT[crypto]"
Импорт и начальная настройка
import jwt
import datetime
import os
from typing import Dict, Any
PyJWT полностью реализует стандарт RFC 7519 и предоставляет простой интерфейс для генерации и верификации токенов.
Детальное описание библиотеки PyJWT
История и развитие
PyJWT была создана в 2013 году и является одной из самых популярных библиотек для работы с JWT-токенами в Python. Библиотека активно развивается и поддерживается сообществом, регулярно обновляется для соответствия последним стандартам безопасности.
Архитектура библиотеки
PyJWT построена на модульной архитектуре с четким разделением ответственности:
- Ядро кодирования/декодирования — основные функции для работы с токенами
- Модуль алгоритмов — поддержка различных алгоритмов шифрования
- Модуль исключений — обработка ошибок и исключительных ситуаций
- Модуль утилит — вспомогательные функции для работы с временем и ключами
Основные возможности
- Поддержка всех стандартных алгоритмов JWT
- Автоматическая проверка времени жизни токена
- Возможность добавления пользовательских claims
- Поддержка различных типов ключей (симметричные и асимметричные)
- Интеграция с популярными веб-фреймворками
Создание JWT-токенов с помощью PyJWT
Базовый пример
payload = {"user_id": 123}
secret = "mysecret"
token = jwt.encode(payload, secret, algorithm="HS256")
print(token)
Указание срока действия и дополнительных данных
payload = {
"user_id": 123,
"username": "admin",
"role": "administrator",
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
"iat": datetime.datetime.utcnow(),
"iss": "myapp"
}
token = jwt.encode(payload, secret, algorithm="HS256")
exp означает expiration — срок действия токена, который крайне рекомендуется использовать для обеспечения безопасности.
Создание токена с пользовательскими заголовками
headers = {
"kid": "key-id-001",
"typ": "JWT"
}
payload = {
"user_id": 123,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm="HS256", headers=headers)
Проверка и декодирование JWT-токенов
Как декодировать токен
try:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
print(decoded)
except jwt.InvalidTokenError:
print("Токен недействителен")
Обработка ошибок и истечения срока действия
try:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
print(f"Пользователь: {decoded['user_id']}")
except jwt.ExpiredSignatureError:
print("Срок действия токена истёк")
except jwt.InvalidSignatureError:
print("Недействительная подпись токена")
except jwt.InvalidTokenError:
print("Недействительный токен")
Декодирование с дополнительными опциями
options = {
"verify_exp": True,
"verify_signature": True,
"verify_aud": False,
"verify_iss": False
}
decoded = jwt.decode(token, secret, algorithms=["HS256"], options=options)
Алгоритмы шифрования в PyJWT
Поддерживаемые алгоритмы
PyJWT поддерживает следующие алгоритмы:
- HMAC: HS256, HS384, HS512
- RSA: RS256, RS384, RS512
- ECDSA: ES256, ES384, ES512
- EdDSA: EdDSA
- RSA-PSS: PS256, PS384, PS512
Как выбрать нужный алгоритм
- Для простых приложений: HS256 (секретный ключ)
- Для распределенных систем: RS256 (публичный/приватный ключ)
- Для максимальной безопасности: ES256 (эллиптическая криптография)
Примеры использования разных алгоритмов
# HS256 - симметричное шифрование
payload = {"user_id": 123}
secret = "my-secret-key"
token_hs256 = jwt.encode(payload, secret, algorithm="HS256")
# RS256 - асимметричное шифрование
with open("private_key.pem", "r") as f:
private_key = f.read()
token_rs256 = jwt.encode(payload, private_key, algorithm="RS256")
# ES256 - эллиптическая криптография
with open("ec_private_key.pem", "r") as f:
ec_private_key = f.read()
token_es256 = jwt.encode(payload, ec_private_key, algorithm="ES256")
Таблица методов и функций PyJWT
| Метод/Функция | Описание | Параметры | Возвращаемое значение |
|---|---|---|---|
jwt.encode() |
Создает JWT-токен | payload, key, algorithm, headers | строка (JWT-токен) |
jwt.decode() |
Декодирует JWT-токен | token, key, algorithms, options | словарь (payload) |
jwt.get_unverified_header() |
Получает заголовки без проверки | token | словарь (header) |
jwt.get_unverified_claims() |
Получает payload без проверки | token | словарь (claims) |
jwt.decode_complete() |
Полное декодирование токена | token, key, algorithms, options | словарь (header + payload) |
jwt.get_algorithm_by_name() |
Получает алгоритм по имени | algorithm_name | объект алгоритма |
jwt.register_algorithm() |
Регистрирует новый алгоритм | algorithm_name, algorithm_obj | None |
jwt.unregister_algorithm() |
Удаляет алгоритм | algorithm_name | None |
Работа с секретами и ключами
Безопасность хранения ключей
Никогда не храните ключи в открытом виде в коде. Используйте переменные окружения и менеджеры секретов:
import os
from cryptography.hazmat.primitives import serialization
# Загрузка из переменных окружения
secret = os.environ.get("JWT_SECRET")
if not secret:
raise ValueError("JWT_SECRET не установлен")
# Загрузка приватного ключа
def load_private_key(key_path: str, password: str = None):
with open(key_path, "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=password.encode() if password else None,
)
return private_key
Использование публичных и приватных ключей
# Генерация ключей RSA
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
# Генерация приватного ключа
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Получение публичного ключа
public_key = private_key.public_key()
# Сериализация ключей
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Использование для JWT
payload = {"user_id": 123, "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)}
token = jwt.encode(payload, private_pem, algorithm="RS256")
decoded = jwt.decode(token, public_pem, algorithms=["RS256"])
Ротация ключей
class KeyManager:
def __init__(self):
self.keys = {}
self.current_key_id = "key-001"
def add_key(self, key_id: str, key: str):
self.keys[key_id] = key
def get_key(self, key_id: str):
return self.keys.get(key_id)
def create_token(self, payload: dict):
headers = {"kid": self.current_key_id}
return jwt.encode(
payload,
self.keys[self.current_key_id],
algorithm="HS256",
headers=headers
)
def verify_token(self, token: str):
unverified_header = jwt.get_unverified_header(token)
key_id = unverified_header.get("kid")
key = self.get_key(key_id)
if not key:
raise jwt.InvalidTokenError("Неизвестный ключ")
return jwt.decode(token, key, algorithms=["HS256"])
Интеграция PyJWT в веб-приложения
Пример с Flask
from flask import Flask, request, jsonify
import jwt
import datetime
from functools import wraps
app = Flask(__name__)
SECRET_KEY = "your-secret-key"
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Токен отсутствует'}), 401
try:
token = token.split(' ')[1] # Убираем "Bearer "
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
current_user = data['user_id']
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Токен истёк'}), 401
except jwt.InvalidTokenError:
return jsonify({'message': 'Недействительный токен'}), 401
return f(current_user, *args, **kwargs)
return decorated
@app.route("/login", methods=["POST"])
def login():
# Здесь должна быть логика проверки пользователя
user_id = request.json.get('user_id')
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
"iat": datetime.datetime.utcnow()
}
token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
return jsonify({"token": token})
@app.route("/protected")
@token_required
def protected(current_user):
return jsonify({
"message": "Доступ разрешён",
"user_id": current_user
})
Пример с FastAPI
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import datetime
from typing import Optional
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.datetime.utcnow() + expires_delta
else:
expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Недействительный токен")
return user_id
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Токен истёк")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Недействительный токен")
@app.post("/login")
async def login(user_id: str):
access_token = create_access_token(data={"sub": user_id})
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/protected")
async def protected_route(current_user: str = Depends(verify_token)):
return {"message": f"Добро пожаловать, пользователь {current_user}"}
Использование middleware для аутентификации
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.base import BaseHTTPMiddleware
import jwt
class JWTMiddleware(BaseHTTPMiddleware):
def __init__(self, app, secret_key: str):
super().__init__(app)
self.secret_key = secret_key
self.protected_paths = ["/api/protected", "/api/admin"]
async def dispatch(self, request: Request, call_next):
path = request.url.path
if path in self.protected_paths:
auth_header = request.headers.get("Authorization")
if not auth_header:
raise HTTPException(status_code=401, detail="Токен отсутствует")
try:
token = auth_header.split(" ")[1]
payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
request.state.user = payload
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Недействительный токен")
response = await call_next(request)
return response
app = FastAPI()
app.add_middleware(JWTMiddleware, secret_key="your-secret-key")
Обработка ошибок и исключений
Типичные ошибки PyJWT
PyJWT предоставляет иерархию исключений для различных типов ошибок:
try:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
# Токен истёк
print("Срок действия токена истёк")
except jwt.InvalidSignatureError:
# Неверная подпись
print("Подпись токена недействительна")
except jwt.InvalidAudienceError:
# Неверная аудитория
print("Токен предназначен для другой аудитории")
except jwt.InvalidIssuerError:
# Неверный издатель
print("Токен выдан неизвестным издателем")
except jwt.InvalidKeyError:
# Неверный ключ
print("Ключ недействителен")
except jwt.InvalidTokenError:
# Базовое исключение для всех ошибок
print("Токен недействителен")
except jwt.DecodeError:
# Ошибка декодирования
print("Не удалось декодировать токен")
Создание пользовательского обработчика ошибок
class JWTErrorHandler:
@staticmethod
def handle_jwt_error(error: Exception) -> tuple:
if isinstance(error, jwt.ExpiredSignatureError):
return {"error": "Токен истёк", "code": "TOKEN_EXPIRED"}, 401
elif isinstance(error, jwt.InvalidSignatureError):
return {"error": "Недействительная подпись", "code": "INVALID_SIGNATURE"}, 401
elif isinstance(error, jwt.InvalidTokenError):
return {"error": "Недействительный токен", "code": "INVALID_TOKEN"}, 401
else:
return {"error": "Неизвестная ошибка", "code": "UNKNOWN_ERROR"}, 500
# Использование
def verify_token_with_error_handling(token: str, secret: str):
try:
return jwt.decode(token, secret, algorithms=["HS256"])
except Exception as e:
error_response, status_code = JWTErrorHandler.handle_jwt_error(e)
raise HTTPException(status_code=status_code, detail=error_response)
Расширенные возможности PyJWT
Пользовательские payload и claims
def create_comprehensive_token(user_data: dict, permissions: list, expires_in: int = 3600):
payload = {
# Стандартные claims
"sub": str(user_data["id"]), # Subject
"iss": "myapp.com", # Issuer
"aud": "myapp-users", # Audience
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in),
"iat": datetime.datetime.utcnow(), # Issued at
"nbf": datetime.datetime.utcnow(), # Not before
"jti": str(uuid.uuid4()), # JWT ID
# Пользовательские claims
"username": user_data["username"],
"email": user_data["email"],
"role": user_data["role"],
"permissions": permissions,
"last_login": user_data.get("last_login"),
"ip_address": user_data.get("ip_address"),
"device_id": user_data.get("device_id")
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
Валидация пользовательских claims
def validate_token_claims(token: str, secret: str, required_permissions: list = None):
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
# Проверка обязательных полей
required_fields = ["sub", "username", "role"]
for field in required_fields:
if field not in payload:
raise ValueError(f"Отсутствует обязательное поле: {field}")
# Проверка прав доступа
if required_permissions:
user_permissions = payload.get("permissions", [])
if not all(perm in user_permissions for perm in required_permissions):
raise ValueError("Недостаточно прав доступа")
return payload
except jwt.InvalidTokenError as e:
raise ValueError(f"Недействительный токен: {str(e)}")
Работа с несколькими алгоритмами
class MultiAlgorithmJWT:
def __init__(self):
self.algorithms = {
"HS256": {"secret": "hmac-secret"},
"RS256": {
"private_key": self.load_private_key("private.pem"),
"public_key": self.load_public_key("public.pem")
}
}
def encode_token(self, payload: dict, algorithm: str = "HS256"):
if algorithm == "HS256":
key = self.algorithms[algorithm]["secret"]
elif algorithm == "RS256":
key = self.algorithms[algorithm]["private_key"]
else:
raise ValueError(f"Неподдерживаемый алгоритм: {algorithm}")
return jwt.encode(payload, key, algorithm=algorithm)
def decode_token(self, token: str):
# Получаем алгоритм из заголовка
header = jwt.get_unverified_header(token)
algorithm = header.get("alg")
if algorithm == "HS256":
key = self.algorithms[algorithm]["secret"]
elif algorithm == "RS256":
key = self.algorithms[algorithm]["public_key"]
else:
raise ValueError(f"Неподдерживаемый алгоритм: {algorithm}")
return jwt.decode(token, key, algorithms=[algorithm])
Тестирование токенов с PyJWT
Базовые тесты
import unittest
from unittest.mock import patch
import jwt
import datetime
class TestJWTTokens(unittest.TestCase):
def setUp(self):
self.secret = "test-secret"
self.payload = {"user_id": 123, "username": "testuser"}
def test_token_creation(self):
token = jwt.encode(self.payload, self.secret, algorithm="HS256")
self.assertIsInstance(token, str)
self.assertTrue(len(token) > 0)
def test_token_decoding(self):
token = jwt.encode(self.payload, self.secret, algorithm="HS256")
decoded = jwt.decode(token, self.secret, algorithms=["HS256"])
self.assertEqual(decoded["user_id"], 123)
self.assertEqual(decoded["username"], "testuser")
def test_expired_token(self):
expired_payload = {
"user_id": 123,
"exp": datetime.datetime.utcnow() - datetime.timedelta(seconds=1)
}
token = jwt.encode(expired_payload, self.secret, algorithm="HS256")
with self.assertRaises(jwt.ExpiredSignatureError):
jwt.decode(token, self.secret, algorithms=["HS256"])
def test_invalid_signature(self):
token = jwt.encode(self.payload, self.secret, algorithm="HS256")
with self.assertRaises(jwt.InvalidSignatureError):
jwt.decode(token, "wrong-secret", algorithms=["HS256"])
@patch('datetime.datetime')
def test_token_with_mocked_time(self, mock_datetime):
# Мокаем время для тестирования
mock_datetime.utcnow.return_value = datetime.datetime(2023, 1, 1, 12, 0, 0)
payload = {
"user_id": 123,
"exp": datetime.datetime(2023, 1, 1, 13, 0, 0) # Истекает через час
}
token = jwt.encode(payload, self.secret, algorithm="HS256")
decoded = jwt.decode(token, self.secret, algorithms=["HS256"])
self.assertEqual(decoded["user_id"], 123)
Тесты интеграции с Flask
import unittest
from flask import Flask
import json
class TestFlaskJWTIntegration(unittest.TestCase):
def setUp(self):
self.app = Flask(__name__)
self.app.config['SECRET_KEY'] = 'test-secret'
self.client = self.app.test_client()
# Добавляем роуты для тестирования
@self.app.route('/login', methods=['POST'])
def login():
token = jwt.encode({'user_id': 123}, 'test-secret', algorithm='HS256')
return {'token': token}
@self.app.route('/protected')
def protected():
auth_header = request.headers.get('Authorization')
if not auth_header:
return {'error': 'No token'}, 401
token = auth_header.split(' ')[1]
try:
payload = jwt.decode(token, 'test-secret', algorithms=['HS256'])
return {'user_id': payload['user_id']}
except jwt.InvalidTokenError:
return {'error': 'Invalid token'}, 401
def test_login_endpoint(self):
response = self.client.post('/login')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('token', data)
def test_protected_endpoint_with_token(self):
# Получаем токен
login_response = self.client.post('/login')
token = json.loads(login_response.data)['token']
# Используем токен для доступа к защищённому эндпоинту
headers = {'Authorization': f'Bearer {token}'}
response = self.client.get('/protected', headers=headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['user_id'], 123)
def test_protected_endpoint_without_token(self):
response = self.client.get('/protected')
self.assertEqual(response.status_code, 401)
Производительность и оптимизация
Бенчмарки PyJWT
import time
import jwt
from typing import List
def benchmark_jwt_operations(iterations: int = 10000):
secret = "benchmark-secret"
payload = {"user_id": 123, "role": "user"}
# Тест создания токенов
start_time = time.time()
tokens = []
for _ in range(iterations):
token = jwt.encode(payload, secret, algorithm="HS256")
tokens.append(token)
encode_time = time.time() - start_time
# Тест декодирования токенов
start_time = time.time()
for token in tokens:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
decode_time = time.time() - start_time
print(f"Создание {iterations} токенов: {encode_time:.2f}s")
print(f"Декодирование {iterations} токенов: {decode_time:.2f}s")
print(f"Создание токенов в секунду: {iterations/encode_time:.0f}")
print(f"Декодирование токенов в секунду: {iterations/decode_time:.0f}")
# Запуск бенчмарка
benchmark_jwt_operations()
Оптимизация производительности
class OptimizedJWTHandler:
def __init__(self, secret: str):
self.secret = secret
self.algorithm = "HS256"
# Кэширование часто используемых данных
self._token_cache = {}
self._cache_ttl = 300 # 5 минут
def encode_with_cache(self, payload: dict, cache_key: str = None):
if cache_key and cache_key in self._token_cache:
cached_token, timestamp = self._token_cache[cache_key]
if time.time() - timestamp < self._cache_ttl:
return cached_token
token = jwt.encode(payload, self.secret, algorithm=self.algorithm)
if cache_key:
self._token_cache[cache_key] = (token, time.time())
return token
def batch_encode(self, payloads: List[dict]) -> List[str]:
"""Пакетное создание токенов для улучшения производительности"""
return [
jwt.encode(payload, self.secret, algorithm=self.algorithm)
for payload in payloads
]
def verify_with_cache(self, token: str):
"""Проверка токена с кэшированием результата"""
token_hash = hash(token)
if token_hash in self._token_cache:
cached_result, timestamp = self._token_cache[token_hash]
if time.time() - timestamp < 60: # Кэш на 1 минуту
return cached_result
try:
decoded = jwt.decode(token, self.secret, algorithms=[self.algorithm])
self._token_cache[token_hash] = (decoded, time.time())
return decoded
except jwt.InvalidTokenError:
return None
Сравнение PyJWT с другими библиотеками
Сравнительная таблица
| Библиотека | Особенности | Плюсы | Минусы |
|---|---|---|---|
| PyJWT | Простота, гибкость, активная поддержка | Легкость использования, хорошая документация | Нет встроенной интеграции с веб-фреймворками |
| Authlib | Расширенные возможности OAuth2 | Полная поддержка OAuth2/OpenID Connect | Более сложная настройка |
| python-jose | Расширенная криптография | Поддержка JWE, JWS, JWK | Сложность интеграции |
| Flask-JWT-Extended | Интеграция с Flask | Готовые декораторы, refresh tokens | Привязка к Flask |
Миграция с других библиотек
# Миграция с python-jose
from jose import jwt as jose_jwt
import jwt as pyjwt
def migrate_from_jose():
# Старый код с python-jose
# token = jose_jwt.encode(payload, secret, algorithm="HS256")
# Новый код с PyJWT
token = pyjwt.encode(payload, secret, algorithm="HS256")
# Основные различия:
# 1. PyJWT возвращает строку, а не bytes
# 2. Немного другая обработка ошибок
# 3. Некоторые опции могут отличаться
Безопасность и рекомендации
Частые уязвимости и способы их предотвращения
# Плохо: отсутствие exp
payload = {"user_id": 123}
token = jwt.encode(payload, secret, algorithm="HS256")
# Хорошо: с указанием срока действия
payload = {
"user_id": 123,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm="HS256")
# Плохо: слабый ключ
secret = "123"
# Хорошо: сильный ключ
import secrets
secret = secrets.token_urlsafe(32)
# Плохо: не проверяем алгоритм
decoded = jwt.decode(token, secret)
# Хорошо: явно указываем алгоритмы
decoded = jwt.decode(token, secret, algorithms=["HS256"])
Лучшие практики безопасности
class SecureJWTHandler:
def __init__(self, secret: str):
self.secret = secret
self.algorithm = "HS256"
self.max_token_age = datetime.timedelta(hours=1)
def create_secure_token(self, user_id: int, additional_claims: dict = None):
"""Создание безопасного токена с обязательными полями"""
now = datetime.datetime.utcnow()
payload = {
"sub": str(user_id),
"iat": now,
"exp": now + self.max_token_age,
"nbf": now,
"jti": str(uuid.uuid4()),
"iss": "myapp.com"
}
if additional_claims:
payload.update(additional_claims)
return jwt.encode(payload, self.secret, algorithm=self.algorithm)
def verify_secure_token(self, token: str, audience: str = None):
"""Безопасная проверка токена"""
try:
options = {
"verify_signature": True,
"verify_exp": True,
"verify_nbf": True,
"verify_iat": True,
"verify_aud": bool(audience)
}
return jwt.decode(
token,
self.secret,
algorithms=[self.algorithm],
audience=audience,
options=options
)
except jwt.ExpiredSignatureError:
raise SecurityError("Токен истёк")
except jwt.InvalidSignatureError:
raise SecurityError("Недействительная подпись")
except jwt.InvalidTokenError:
raise SecurityError("Недействительный токен")
Распространенные сценарии использования
Система аутентификации с refresh токенами
class RefreshTokenSystem:
def __init__(self, secret: str):
self.secret = secret
self.access_token_expire = datetime.timedelta(minutes=15)
self.refresh_token_expire = datetime.timedelta(days=30)
def create_token_pair(self, user_id: int):
"""Создание пары access и refresh токенов"""
now = datetime.datetime.utcnow()
# Access токен (короткий срок жизни)
access_payload = {
"sub": str(user_id),
"type": "access",
"exp": now + self.access_token_expire,
"iat": now
}
access_token = jwt.encode(access_payload, self.secret, algorithm="HS256")
# Refresh токен (длинный срок жизни)
refresh_payload = {
"sub": str(user_id),
"type": "refresh",
"exp": now + self.refresh_token_expire,
"iat": now
}
refresh_token = jwt.encode(refresh_payload, self.secret, algorithm="HS256")
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": int(self.access_token_expire.total_seconds())
}
def refresh_access_token(self, refresh_token: str):
"""Обновление access токена с помощью refresh токена"""
try:
payload = jwt.decode(refresh_token, self.secret, algorithms=["HS256"])
if payload.get("type") != "refresh":
raise ValueError("Не refresh токен")
user_id = int(payload["sub"])
return self.create_token_pair(user_id)
except jwt.ExpiredSignatureError:
raise ValueError("Refresh токен истёк")
except jwt.InvalidTokenError:
raise ValueError("Недействительный refresh токен")
Многоуровневая авторизация
class RoleBasedJWT:
def __init__(self, secret: str):
self.secret = secret
self.role_hierarchy = {
"admin": ["admin", "moderator", "user"],
"moderator": ["moderator", "user"],
"user": ["user"]
}
def create_token_with_roles(self, user_id: int, roles: List[str], permissions: List[str]):
payload = {
"sub": str(user_id),
"roles": roles,
"permissions": permissions,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
return jwt.encode(payload, self.secret, algorithm="HS256")
def check_permission(self, token: str, required_role: str = None, required_permission: str = None):
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
user_roles = payload.get("roles", [])
user_permissions = payload.get("permissions", [])
# Проверка роли
if required_role:
allowed_roles = self.role_hierarchy.get(required_role, [])
if not any(role in allowed_roles for role in user_roles):
return False
# Проверка разрешения
if required_permission and required_permission not in user_permissions:
return False
return True
except jwt.InvalidTokenError:
return False
Интеграция с базами данных
Хранение токенов в Redis
import redis
import json
class RedisTokenStore:
def __init__(self, redis_client: redis.Redis, secret: str):
self.redis = redis_client
self.secret = secret
def store_token(self, user_id: int, token: str, expires_in: int):
"""Сохранение токена в Redis"""
key = f"user_token:{user_id}"
self.redis.setex(key, expires_in, token)
def get_token(self, user_id: int) -> str:
"""Получение токена из Redis"""
key = f"user_token:{user_id}"
token = self.redis.get(key)
return token.decode() if token else None
def revoke_token(self, user_id: int):
"""Отзыв токена"""
key = f"user_token:{user_id}"
self.redis.delete(key)
def create_and_store_token(self, user_id: int, payload: dict, expires_in: int = 3600):
"""Создание и сохранение токена"""
payload.update({
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in),
"sub": str(user_id)
})
token = jwt.encode(payload, self.secret, algorithm="HS256")
self.store_token(user_id, token, expires_in)
return token
Интеграция с SQLAlchemy
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class TokenBlacklist(Base):
__tablename__ = "token_blacklist"
id = Column(Integer, primary_key=True)
jti = Column(String(36), unique=True, nullable=False)
user_id = Column(Integer, nullable=False)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
expires_at = Column(DateTime, nullable=False)
class DatabaseTokenManager:
def __init__(self, db_session, secret: str):
self.db = db_session
self.secret = secret
def create_token_with_jti(self, user_id: int, payload: dict):
"""Создание токена с JTI для отслеживания"""
jti = str(uuid.uuid4())
payload.update({
"jti": jti,
"sub": str(user_id),
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
})
token = jwt.encode(payload, self.secret, algorithm="HS256")
return token, jti
def blacklist_token(self, token: str):
"""Добавление токена в черный список"""
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
jti = payload.get("jti")
user_id = payload.get("sub")
expires_at = datetime.datetime.fromtimestamp(payload.get("exp"))
if jti:
blacklisted_token = TokenBlacklist(
jti=jti,
user_id=int(user_id),
expires_at=expires_at
)
self.db.add(blacklisted_token)
self.db.commit()
except jwt.InvalidTokenError:
pass
def is_token_blacklisted(self, token: str) -> bool:
"""Проверка, находится ли токен в черном списке"""
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
jti = payload.get("jti")
if jti:
return self.db.query(TokenBlacklist).filter_by(jti=jti).first() is not None
return False
except jwt.InvalidTokenError:
return True
Мониторинг и логирование
Система логирования JWT операций
import logging
import json
from datetime import datetime
class JWTLogger:
def __init__(self, logger_name: str = "jwt_operations"):
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
# Создаем обработчик для записи в файл
handler = logging.FileHandler("jwt_operations.log")
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_token_creation(self, user_id: int, token_type: str, expires_in: int):
self.logger.info(json.dumps({
"operation": "token_created",
"user_id": user_id,
"token_type": token_type,
"expires_in": expires_in,
"timestamp": datetime.utcnow().isoformat()
}))
def log_token_verification(self, user_id: int, success: bool, error: str = None):
log_data = {
"operation": "token_verified",
"user_id": user_id,
"success": success,
"timestamp": datetime.utcnow().isoformat()
}
if error:
log_data["error"] = error
if success:
self.logger.info(json.dumps(log_data))
else:
self.logger.warning(json.dumps(log_data))
def log_token_revocation(self, user_id: int, reason: str):
self.logger.info(json.dumps({
"operation": "token_revoked",
"user_id": user_id,
"reason": reason,
"timestamp": datetime.utcnow().isoformat()
}))
Метрики и мониторинг
from dataclasses import dataclass
from typing import Dict
import time
@dataclass
class JWTMetrics:
tokens_created: int = 0
tokens_verified: int = 0
verification_failures: int = 0
expired_tokens: int = 0
invalid_signatures: int = 0
def to_dict(self) -> Dict[str, int]:
return {
"tokens_created": self.tokens_created,
"tokens_verified": self.tokens_verified,
"verification_failures": self.verification_failures,
"expired_tokens": self.expired_tokens,
"invalid_signatures": self.invalid_signatures
}
class MonitoredJWTHandler:
def __init__(self, secret: str):
self.secret = secret
self.metrics = JWTMetrics()
self.logger = JWTLogger()
def create_token(self, payload: dict) -> str:
start_time = time.time()
token = jwt.encode(payload, self.secret, algorithm="HS256")
# Обновляем метрики
self.metrics.tokens_created += 1
# Логируем операцию
user_id = payload.get("sub", "unknown")
expires_in = payload.get("exp", 0) - time.time() if payload.get("exp") else 0
self.logger.log_token_creation(user_id, "access", int(expires_in))
return token
def verify_token(self, token: str) -> dict:
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
# Обновляем метрики
self.metrics.tokens_verified += 1
# Логируем успешную проверку
user_id = payload.get("sub", "unknown")
self.logger.log_token_verification(user_id, True)
return payload
except jwt.ExpiredSignatureError:
self.metrics.expired_tokens += 1
self.metrics.verification_failures += 1
self.logger.log_token_verification("unknown", False, "expired")
raise
except jwt.InvalidSignatureError:
self.metrics.invalid_signatures += 1
self.metrics.verification_failures += 1
self.logger.log_token_verification("unknown", False, "invalid_signature")
raise
except jwt.InvalidTokenError:
self.metrics.verification_failures += 1
self.logger.log_token_verification("unknown", False, "invalid_token")
raise
def get_metrics(self) -> Dict[str, int]:
return self.metrics.to_dict()
Часто задаваемые вопросы
Что такое PyJWT и для чего он используется?
PyJWT — это Python-библиотека для создания, кодирования и декодирования JWT-токенов. Она используется для аутентификации пользователей, авторизации API и безопасной передачи данных между клиентом и сервером.
Безопасно ли использовать JWT в продакшене?
Да, JWT безопасно использовать в продакшене при соблюдении следующих рекомендаций:
- Используйте сильные секретные ключи
- Устанавливайте короткий срок действия токенов
- Передавайте токены только по HTTPS
- Не храните чувствительные данные в payload
Какой алгоритм лучше использовать: HS256 или RS256?
- HS256 подходит для простых приложений с одним сервером, где секретный ключ может быть безопасно распределен
- RS256 лучше для распределенных систем, где несколько сервисов должны проверять токены, но только один может их создавать
Где и как безопасно хранить секретный ключ?
- Используйте переменные окружения
- Применяйте менеджеры секретов (AWS Secrets Manager, HashiCorp Vault)
- Никогда не храните ключи в коде или файлах конфигурации
- Регулярно меняйте ключи
Можно ли использовать PyJWT с FastAPI или Flask?
Да, PyJWT отлично интегрируется с обоими фреймворками:
- В Flask можно создавать декораторы для защиты роутов
- В FastAPI можно использовать dependency injection для проверки токенов
Поддерживает ли PyJWT обновление токенов (refresh tokens)?
PyJWT не имеет встроенной поддержки refresh токенов, но это можно легко реализовать на уровне приложения, создавая пары access/refresh токенов с разными сроками действия.
Как обрабатывать истекшие токены?
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
# Токен истёк - запросить новый или перенаправить на авторизацию
return {"error": "Token expired", "code": "TOKEN_EXPIRED"}
Можно ли отозвать JWT-токен досрочно?
JWT-токены по своей природе stateless, поэтому нельзя отозвать их напрямую. Однако можно:
- Использовать черные списки токенов в базе данных
- Реализовать короткий срок действия с refresh токенами
- Использовать Redis для хранения активных токенов
Как тестировать приложения с JWT?
def test_protected_endpoint():
# Создаем тестовый токен
test_payload = {"user_id": 123, "exp": datetime.utcnow() + timedelta(hours=1)}
test_token = jwt.encode(test_payload, "test-secret", algorithm="HS256")
# Используем токен в запросе
headers = {"Authorization": f"Bearer {test_token}"}
response = client.get("/protected", headers=headers)
assert response.status_code == 200
Влияет ли размер payload на производительность?
Да, больший payload означает больший токен, что влияет на:
- Время передачи по сети
- Размер HTTP-заголовков
- Время обработки
Рекомендуется хранить в JWT только необходимые данные.
Заключение
PyJWT — это мощная, гибкая и надежная библиотека для работы с JWT-токенами в Python. Она предоставляет все необходимые инструменты для безопасной аутентификации и авторизации в современных веб-приложениях.
Основные преимущества PyJWT:
- Простота использования и интуитивный API
- Полная поддержка стандарта RFC 7519
- Широкий выбор алгоритмов шифрования
- Активное сообщество и регулярные обновления
- Отличная совместимость с популярными веб-фреймворками
При использовании PyJWT в продакшене обязательно следуйте рекомендациям по безопасности: используйте сильные ключи, устанавливайте срок действия токенов, передавайте данные только по HTTPS и регулярно обновляйте библиотеку.
Библиотека PyJWT станет незаменимым инструментом для разработки современных API, микросервисов и веб-приложений, требующих надежной системы аутентификации.
Настоящее и будущее развития ИИ: классической математики уже недостаточно
Эксперты предупредили о рисках фейковой благотворительности с помощью ИИ
В России разработали универсального ИИ-агента для роботов и индустриальных процессов