Introduction to PyJWT
JSON Web Token (JWT) has become the standard in API authentication and authorization. It enables secure transmission of identity information between client and server without the need to store sessions on the server side. One of the most popular Python libraries for working with JWTs is PyJWT.
PyJWT is a Python library for handling JWT tokens that simplifies creation, decoding, and verification while providing support for multiple signing algorithms. This article explains in detail how to use PyJWT efficiently and securely in your project.
JWT (JSON Web Token) Basics
JWT Token Structure
A JWT consists of three parts separated by dots:
header.payload.signature
- Header — contains the token type and the signing algorithm (e.g., HS256)
- Payload — contains claims, i.e., the actual data
- Signature — used to verify the token’s authenticity
Benefits of Using JWT
- Stateless: the server does not need to store sessions
- Compact: easy to transmit in HTTP headers
- Secure: supports HMAC and RSA signing
- Cross‑platform: works with any programming language
- Self‑contained: carries all required user information
How JWT Works
JWT follows this flow:
- The user authenticates with the server
- The server generates a JWT containing user data
- The client stores the token and sends it in the
Authorizationheader - The server validates the signature and extracts user data
Installing and Configuring PyJWT
How to Install PyJWT
pip install PyJWT
To work with RSA and ECDSA algorithms, install the extra crypto dependencies:
pip install "PyJWT[crypto]"
Import and Basic Setup
import jwt
import datetime
import os
from typing import Dict, Any
PyJWT fully implements RFC 7519 and provides a simple API for token generation and verification.
In‑Depth Overview of the PyJWT Library
History and Evolution
PyJWT was created in 2013 and has become one of the most popular JWT libraries for Python. It is actively maintained by the community and regularly updated to meet the latest security standards.
Library Architecture
PyJWT follows a modular architecture with clear separation of concerns:
- Core encoding/decoding — main functions for token handling
- Algorithm module — support for various signing algorithms
- Exception module — error handling and special cases
- Utility module — helper functions for time and key management
Key Features
- Support for all standard JWT algorithms
- Automatic token expiration verification
- Ability to add custom claims
- Support for symmetric and asymmetric keys
- Integration with popular web frameworks
Creating JWT Tokens with PyJWT
Basic Example
payload = {"user_id": 123}
secret = "mysecret"
token = jwt.encode(payload, secret, algorithm="HS256")
print(token)
Adding Expiration and Extra Data
payload = {
"user_id": 123,
"username": "admin",
"role": "administrator",
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
"iat": datetime.datetime.utcnow(),
"iss": "myapp"
}
token = jwt.encode(payload, secret, algorithm="HS256")
exp stands for expiration — the token’s lifetime, which is strongly recommended for security.
Creating a Token with Custom Headers
headers = {
"kid": "key-id-001",
"typ": "JWT"
}
payload = {
"user_id": 123,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm="HS256", headers=headers)
Verifying and Decoding JWT Tokens
How to Decode a Token
try:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
print(decoded)
except jwt.InvalidTokenError:
print("Invalid token")
Error Handling and Expiration
try:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
print(f"User: {decoded['user_id']}")
except jwt.ExpiredSignatureError:
print("Token has expired")
except jwt.InvalidSignatureError:
print("Invalid token signature")
except jwt.InvalidTokenError:
print("Invalid token")
Decoding with Additional Options
options = {
"verify_exp": True,
"verify_signature": True,
"verify_aud": False,
"verify_iss": False
}
decoded = jwt.decode(token, secret, algorithms=["HS256"], options=options)
Encryption Algorithms in PyJWT
Supported Algorithms
PyJWT supports the following algorithms:
- HMAC: HS256, HS384, HS512
- RSA: RS256, RS384, RS512
- ECDSA: ES256, ES384, ES512
- EdDSA: EdDSA
- RSA‑PSS: PS256, PS384, PS512
Choosing the Right Algorithm
- Simple apps: HS256 (shared secret)
- Distributed systems: RS256 (public/private key pair)
- Maximum security: ES256 (elliptic curve)
Algorithm Usage Examples
# HS256 – symmetric signing
payload = {"user_id": 123}
secret = "my-secret-key"
token_hs256 = jwt.encode(payload, secret, algorithm="HS256")
# RS256 – asymmetric signing
with open("private_key.pem", "r") as f:
private_key = f.read()
token_rs256 = jwt.encode(payload, private_key, algorithm="RS256")
# ES256 – elliptic curve signing
with open("ec_private_key.pem", "r") as f:
ec_private_key = f.read()
token_es256 = jwt.encode(payload, ec_private_key, algorithm="ES256")
PyJWT Methods and Functions Reference
| Method / Function | Description | Parameters | Return Value |
|---|---|---|---|
jwt.encode() |
Creates a JWT token | payload, key, algorithm, headers | string (JWT token) |
jwt.decode() |
Decodes a JWT token | token, key, algorithms, options | dict (payload) |
jwt.get_unverified_header() |
Retrieves the header without verification | token | dict (header) |
jwt.get_unverified_claims() |
Retrieves the payload without verification | token | dict (claims) |
jwt.decode_complete() |
Fully decodes a token (header + payload) | token, key, algorithms, options | dict (header + payload) |
jwt.get_algorithm_by_name() |
Gets an algorithm object by its name | algorithm_name | algorithm object |
jwt.register_algorithm() |
Registers a custom algorithm | algorithm_name, algorithm_obj | None |
jwt.unregister_algorithm() |
Unregisters an algorithm | algorithm_name | None |
Working with Secrets and Keys
Secure Key Storage
Never store keys in plain text inside your code. Use environment variables and secret managers:
import os
from cryptography.hazmat.primitives import serialization
# Load from environment variables
secret = os.environ.get("JWT_SECRET")
if not secret:
raise ValueError("JWT_SECRET is not set")
# Load a private key
def load_private_key(key_path: str, password: str = None):
with open(key_path, "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=password.encode() if password else None,
)
return private_key
Using Public and Private Keys
# Generate RSA keys
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
# Private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Public key
public_key = private_key.public_key()
# Serialize keys
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Use with JWT
payload = {"user_id": 123, "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)}
token = jwt.encode(payload, private_pem, algorithm="RS256")
decoded = jwt.decode(token, public_pem, algorithms=["RS256"])
Key Rotation
class KeyManager:
def __init__(self):
self.keys = {}
self.current_key_id = "key-001"
def add_key(self, key_id: str, key: str):
self.keys[key_id] = key
def get_key(self, key_id: str):
return self.keys.get(key_id)
def create_token(self, payload: dict):
headers = {"kid": self.current_key_id}
return jwt.encode(
payload,
self.keys[self.current_key_id],
algorithm="HS256",
headers=headers
)
def verify_token(self, token: str):
unverified_header = jwt.get_unverified_header(token)
key_id = unverified_header.get("kid")
key = self.get_key(key_id)
if not key:
raise jwt.InvalidTokenError("Unknown key")
return jwt.decode(token, key, algorithms=["HS256"])
Integrating PyJWT into Web Applications
Flask Example
from flask import Flask, request, jsonify
import jwt
import datetime
from functools import wraps
app = Flask(__name__)
SECRET_KEY = "your-secret-key"
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'message': 'Token missing'}), 401
try:
token = token.split(' ')[1] # Remove "Bearer "
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
current_user = data['user_id']
except jwt.ExpiredSignatureError:
return jsonify({'message': 'Token expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'message': 'Invalid token'}), 401
return f(current_user, *args, **kwargs)
return decorated
@app.route("/login", methods=["POST"])
def login():
# Add your user verification logic here
user_id = request.json.get('user_id')
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1),
"iat": datetime.datetime.utcnow()
}
token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
return jsonify({"token": token})
@app.route("/protected")
@token_required
def protected(current_user):
return jsonify({
"message": "Access granted",
"user_id": current_user
})
FastAPI Example
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import datetime
from typing import Optional
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.datetime.utcnow() + expires_delta
else:
expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
return user_id
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/login")
async def login(user_id: str):
access_token = create_access_token(data={"sub": user_id})
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/protected")
async def protected_route(current_user: str = Depends(verify_token)):
return {"message": f"Welcome, user {current_user}"}
Middleware for Authentication
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.base import BaseHTTPMiddleware
import jwt
class JWTMiddleware(BaseHTTPMiddleware):
def __init__(self, app, secret_key: str):
super().__init__(app)
self.secret_key = secret_key
self.protected_paths = ["/api/protected", "/api/admin"]
async def dispatch(self, request: Request, call_next):
path = request.url.path
if path in self.protected_paths:
auth_header = request.headers.get("Authorization")
if not auth_header:
raise HTTPException(status_code=401, detail="Token missing")
try:
token = auth_header.split(" ")[1]
payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
request.state.user = payload
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
response = await call_next(request)
return response
app = FastAPI()
app.add_middleware(JWTMiddleware, secret_key="your-secret-key")
Error Handling and Exceptions
Common PyJWT Errors
PyJWT provides an exception hierarchy for various error types:
try:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
# Token has expired
print("Token expired")
except jwt.InvalidSignatureError:
# Invalid signature
print("Invalid signature")
except jwt.InvalidAudienceError:
# Wrong audience
print("Invalid audience")
except jwt.InvalidIssuerError:
# Wrong issuer
print("Invalid issuer")
except jwt.InvalidKeyError:
# Invalid key
print("Invalid key")
except jwt.InvalidTokenError:
# Base exception for all token errors
print("Invalid token")
except jwt.DecodeError:
# Decoding error
print("Failed to decode token")
Custom Error Handler
class JWTErrorHandler:
@staticmethod
def handle_jwt_error(error: Exception) -> tuple:
if isinstance(error, jwt.ExpiredSignatureError):
return {"error": "Token expired", "code": "TOKEN_EXPIRED"}, 401
elif isinstance(error, jwt.InvalidSignatureError):
return {"error": "Invalid signature", "code": "INVALID_SIGNATURE"}, 401
elif isinstance(error, jwt.InvalidTokenError):
return {"error": "Invalid token", "code": "INVALID_TOKEN"}, 401
else:
return {"error": "Unknown error", "code": "UNKNOWN_ERROR"}, 500
# Usage
def verify_token_with_error_handling(token: str, secret: str):
try:
return jwt.decode(token, secret, algorithms=["HS256"])
except Exception as e:
error_response, status_code = JWTErrorHandler.handle_jwt_error(e)
raise HTTPException(status_code=status_code, detail=error_response)
Advanced PyJWT Features
Custom Payloads and Claims
def create_comprehensive_token(user_data: dict, permissions: list, expires_in: int = 3600):
payload = {
# Standard claims
"sub": str(user_data["id"]), # Subject
"iss": "myapp.com", # Issuer
"aud": "myapp-users", # Audience
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in),
"iat": datetime.datetime.utcnow(), # Issued at
"nbf": datetime.datetime.utcnow(), # Not before
"jti": str(uuid.uuid4()), # JWT ID
# Custom claims
"username": user_data["username"],
"email": user_data["email"],
"role": user_data["role"],
"permissions": permissions,
"last_login": user_data.get("last_login"),
"ip_address": user_data.get("ip_address"),
"device_id": user_data.get("device_id")
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
Validating Custom Claims
def validate_token_claims(token: str, secret: str, required_permissions: list = None):
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
# Check required fields
required_fields = ["sub", "username", "role"]
for field in required_fields:
if field not in payload:
raise ValueError(f"Missing required field: {field}")
# Check permissions
if required_permissions:
user_permissions = payload.get("permissions", [])
if not all(perm in user_permissions for perm in required_permissions):
raise ValueError("Insufficient permissions")
return payload
except jwt.InvalidTokenError as e:
raise ValueError(f"Invalid token: {str(e)}")
Working with Multiple Algorithms
class MultiAlgorithmJWT:
def __init__(self):
self.algorithms = {
"HS256": {"secret": "hmac-secret"},
"RS256": {
"private_key": self.load_private_key("private.pem"),
"public_key": self.load_public_key("public.pem")
}
}
def encode_token(self, payload: dict, algorithm: str = "HS256"):
if algorithm == "HS256":
key = self.algorithms[algorithm]["secret"]
elif algorithm == "RS256":
key = self.algorithms[algorithm]["private_key"]
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")
return jwt.encode(payload, key, algorithm=algorithm)
def decode_token(self, token: str):
# Get algorithm from header
header = jwt.get_unverified_header(token)
algorithm = header.get("alg")
if algorithm == "HS256":
key = self.algorithms[algorithm]["secret"]
elif algorithm == "RS256":
key = self.algorithms[algorithm]["public_key"]
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")
return jwt.decode(token, key, algorithms=[algorithm])
Testing Tokens with PyJWT
Basic Tests
import unittest
from unittest.mock import patch
import jwt
import datetime
class TestJWTTokens(unittest.TestCase):
def setUp(self):
self.secret = "test-secret"
self.payload = {"user_id": 123, "username": "testuser"}
def test_token_creation(self):
token = jwt.encode(self.payload, self.secret, algorithm="HS256")
self.assertIsInstance(token, str)
self.assertTrue(len(token) > 0)
def test_token_decoding(self):
token = jwt.encode(self.payload, self.secret, algorithm="HS256")
decoded = jwt.decode(token, self.secret, algorithms=["HS256"])
self.assertEqual(decoded["user_id"], 123)
self.assertEqual(decoded["username"], "testuser")
def test_expired_token(self):
expired_payload = {
"user_id": 123,
"exp": datetime.datetime.utcnow() - datetime.timedelta(seconds=1)
}
token = jwt.encode(expired_payload, self.secret, algorithm="HS256")
with self.assertRaises(jwt.ExpiredSignatureError):
jwt.decode(token, self.secret, algorithms=["HS256"])
def test_invalid_signature(self):
token = jwt.encode(self.payload, self.secret, algorithm="HS256")
with self.assertRaises(jwt.InvalidSignatureError):
jwt.decode(token, "wrong-secret", algorithms=["HS256"])
@patch('datetime.datetime')
def test_token_with_mocked_time(self, mock_datetime):
# Mock current time for testing
mock_datetime.utcnow.return_value = datetime.datetime(2023, 1, 1, 12, 0, 0)
payload = {
"user_id": 123,
"exp": datetime.datetime(2023, 1, 1, 13, 0, 0) # Expires in one hour
}
token = jwt.encode(payload, self.secret, algorithm="HS256")
decoded = jwt.decode(token, self.secret, algorithms=["HS256"])
self.assertEqual(decoded["user_id"], 123)
Flask Integration Tests
import unittest
from flask import Flask
import json
class TestFlaskJWTIntegration(unittest.TestCase):
def setUp(self):
self.app = Flask(__name__)
self.app.config['SECRET_KEY'] = 'test-secret'
self.client = self.app.test_client()
# Add test routes
@self.app.route('/login', methods=['POST'])
def login():
token = jwt.encode({'user_id': 123}, 'test-secret', algorithm='HS256')
return {'token': token}
@self.app.route('/protected')
def protected():
auth_header = request.headers.get('Authorization')
if not auth_header:
return {'error': 'No token'}, 401
token = auth_header.split(' ')[1]
try:
payload = jwt.decode(token, 'test-secret', algorithms=['HS256'])
return {'user_id': payload['user_id']}
except jwt.InvalidTokenError:
return {'error': 'Invalid token'}, 401
def test_login_endpoint(self):
response = self.client.post('/login')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('token', data)
def test_protected_endpoint_with_token(self):
# Get token
login_response = self.client.post('/login')
token = json.loads(login_response.data)['token']
# Use token to access protected endpoint
headers = {'Authorization': f'Bearer {token}'}
response = self.client.get('/protected', headers=headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['user_id'], 123)
def test_protected_endpoint_without_token(self):
response = self.client.get('/protected')
self.assertEqual(response.status_code, 401)
Performance and Optimization
PyJWT Benchmarks
import time
import jwt
from typing import List
def benchmark_jwt_operations(iterations: int = 10000):
secret = "benchmark-secret"
payload = {"user_id": 123, "role": "user"}
# Token creation test
start_time = time.time()
tokens = []
for _ in range(iterations):
token = jwt.encode(payload, secret, algorithm="HS256")
tokens.append(token)
encode_time = time.time() - start_time
# Token decoding test
start_time = time.time()
for token in tokens:
decoded = jwt.decode(token, secret, algorithms=["HS256"])
decode_time = time.time() - start_time
print(f"Created {iterations} tokens: {encode_time:.2f}s")
print(f"Decoded {iterations} tokens: {decode_time:.2f}s")
print(f"Tokens created per second: {iterations/encode_time:.0f}")
print(f"Tokens decoded per second: {iterations/decode_time:.0f}")
# Run benchmark
benchmark_jwt_operations()
Performance Optimizations
class OptimizedJWTHandler:
def __init__(self, secret: str):
self.secret = secret
self.algorithm = "HS256"
# Cache frequently used tokens
self._token_cache = {}
self._cache_ttl = 300 # 5 minutes
def encode_with_cache(self, payload: dict, cache_key: str = None):
if cache_key and cache_key in self._token_cache:
cached_token, timestamp = self._token_cache[cache_key]
if time.time() - timestamp < self._cache_ttl:
return cached_token
token = jwt.encode(payload, self.secret, algorithm=self.algorithm)
if cache_key:
self._token_cache[cache_key] = (token, time.time())
return token
def batch_encode(self, payloads: List[dict]) -> List[str]:
"""Batch token creation for better performance"""
return [
jwt.encode(payload, self.secret, algorithm=self.algorithm)
for payload in payloads
]
def verify_with_cache(self, token: str):
"""Verify token with result caching"""
token_hash = hash(token)
if token_hash in self._token_cache:
cached_result, timestamp = self._token_cache[token_hash]
if time.time() - timestamp < 60: # 1‑minute cache
return cached_result
try:
decoded = jwt.decode(token, self.secret, algorithms=[self.algorithm])
self._token_cache[token_hash] = (decoded, time.time())
return decoded
except jwt.InvalidTokenError:
return None
Comparison of PyJWT with Other Libraries
Comparison Table
| Library | Features | Pros | Cons |
|---|---|---|---|
| PyJWT | Simplicity, flexibility, active maintenance | Easy to use, good documentation | No built‑in framework integrations |
| Authlib | Advanced OAuth2 capabilities | Full OAuth2/OpenID Connect support | More complex configuration |
| python-jose | Extended cryptography | Supports JWE, JWS, JWK | Integration can be harder |
| Flask‑JWT‑Extended | Flask integration | Ready‑made decorators, refresh tokens | Tied to Flask |
Migrating from Other Libraries
# Migration from python-jose
from jose import jwt as jose_jwt
import jwt as pyjwt
def migrate_from_jose():
# Old code using python-jose
# token = jose_jwt.encode(payload, secret, algorithm="HS256")
# New code using PyJWT
token = pyjwt.encode(payload, secret, algorithm="HS256")
# Key differences:
# 1. PyJWT returns a string, not bytes
# 2. Slightly different error handling
# 3. Some options have different names
Security Recommendations
Common Vulnerabilities and Mitigations
# Bad: missing exp
payload = {"user_id": 123}
token = jwt.encode(payload, secret, algorithm="HS256")
# Good: include expiration
payload = {
"user_id": 123,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm="HS256")
# Bad: weak secret
secret = "123"
# Good: strong secret
import secrets
secret = secrets.token_urlsafe(32)
# Bad: not verifying algorithm
decoded = jwt.decode(token, secret)
# Good: explicitly specify algorithms
decoded = jwt.decode(token, secret, algorithms=["HS256"])
Best Security Practices
class SecureJWTHandler:
def __init__(self, secret: str):
self.secret = secret
self.algorithm = "HS256"
self.max_token_age = datetime.timedelta(hours=1)
def create_secure_token(self, user_id: int, additional_claims: dict = None):
"""Create a secure token with mandatory claims"""
now = datetime.datetime.utcnow()
payload = {
"sub": str(user_id),
"iat": now,
"exp": now + self.max_token_age,
"nbf": now,
"jti": str(uuid.uuid4()),
"iss": "myapp.com"
}
if additional_claims:
payload.update(additional_claims)
return jwt.encode(payload, self.secret, algorithm=self.algorithm)
def verify_secure_token(self, token: str, audience: str = None):
"""Secure token verification"""
try:
options = {
"verify_signature": True,
"verify_exp": True,
"verify_nbf": True,
"verify_iat": True,
"verify_aud": bool(audience)
}
return jwt.decode(
token,
self.secret,
algorithms=[self.algorithm],
audience=audience,
options=options
)
except jwt.ExpiredSignatureError:
raise SecurityError("Token expired")
except jwt.InvalidSignatureError:
raise SecurityError("Invalid signature")
except jwt.InvalidTokenError:
raise SecurityError("Invalid token")
Common Use Cases
Authentication System with Refresh Tokens
class RefreshTokenSystem:
def __init__(self, secret: str):
self.secret = secret
self.access_token_expire = datetime.timedelta(minutes=15)
self.refresh_token_expire = datetime.timedelta(days=30)
def create_token_pair(self, user_id: int):
"""Generate access and refresh token pair"""
now = datetime.datetime.utcnow()
# Access token (short‑lived)
access_payload = {
"sub": str(user_id),
"type": "access",
"exp": now + self.access_token_expire,
"iat": now
}
access_token = jwt.encode(access_payload, self.secret, algorithm="HS256")
# Refresh token (long‑lived)
refresh_payload = {
"sub": str(user_id),
"type": "refresh",
"exp": now + self.refresh_token_expire,
"iat": now
}
refresh_token = jwt.encode(refresh_payload, self.secret, algorithm="HS256")
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": int(self.access_token_expire.total_seconds())
}
def refresh_access_token(self, refresh_token: str):
"""Refresh an access token using a refresh token"""
try:
payload = jwt.decode(refresh_token, self.secret, algorithms=["HS256"])
if payload.get("type") != "refresh":
raise ValueError("Not a refresh token")
user_id = int(payload["sub"])
return self.create_token_pair(user_id)
except jwt.ExpiredSignatureError:
raise ValueError("Refresh token expired")
except jwt.InvalidTokenError:
raise ValueError("Invalid refresh token")
Multi‑Level Authorization
class RoleBasedJWT:
def __init__(self, secret: str):
self.secret = secret
self.role_hierarchy = {
"admin": ["admin", "moderator", "user"],
"moderator": ["moderator", "user"],
"user": ["user"]
}
def create_token_with_roles(self, user_id: int, roles: List[str], permissions: List[str]):
payload = {
"sub": str(user_id),
"roles": roles,
"permissions": permissions,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
}
return jwt.encode(payload, self.secret, algorithm="HS256")
def check_permission(self, token: str, required_role: str = None, required_permission: str = None):
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
user_roles = payload.get("roles", [])
user_permissions = payload.get("permissions", [])
# Role check
if required_role:
allowed_roles = self.role_hierarchy.get(required_role, [])
if not any(role in allowed_roles for role in user_roles):
return False
# Permission check
if required_permission and required_permission not in user_permissions:
return False
return True
except jwt.InvalidTokenError:
return False
Database Integration
Storing Tokens in Redis
import redis
import json
class RedisTokenStore:
def __init__(self, redis_client: redis.Redis, secret: str):
self.redis = redis_client
self.secret = secret
def store_token(self, user_id: int, token: str, expires_in: int):
"""Save token in Redis with TTL"""
key = f"user_token:{user_id}"
self.redis.setex(key, expires_in, token)
def get_token(self, user_id: int) -> str:
"""Retrieve token from Redis"""
key = f"user_token:{user_id}"
token = self.redis.get(key)
return token.decode() if token else None
def revoke_token(self, user_id: int):
"""Delete token from Redis"""
key = f"user_token:{user_id}"
self.redis.delete(key)
def create_and_store_token(self, user_id: int, payload: dict, expires_in: int = 3600):
"""Create a token and store it in Redis"""
payload.update({
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in),
"sub": str(user_id)
})
token = jwt.encode(payload, self.secret, algorithm="HS256")
self.store_token(user_id, token, expires_in)
return token
SQLAlchemy Integration
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class TokenBlacklist(Base):
__tablename__ = "token_blacklist"
id = Column(Integer, primary_key=True)
jti = Column(String(36), unique=True, nullable=False)
user_id = Column(Integer, nullable=False)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
expires_at = Column(DateTime, nullable=False)
class DatabaseTokenManager:
def __init__(self, db_session, secret: str):
self.db = db_session
self.secret = secret
def create_token_with_jti(self, user_id: int, payload: dict):
"""Create a token with a JTI for blacklist tracking"""
jti = str(uuid.uuid4())
payload.update({
"jti": jti,
"sub": str(user_id),
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=1)
})
token = jwt.encode(payload, self.secret, algorithm="HS256")
return token, jti
def blacklist_token(self, token: str):
"""Add token to blacklist"""
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
jti = payload.get("jti")
user_id = payload.get("sub")
expires_at = datetime.datetime.fromtimestamp(payload.get("exp"))
if jti:
blacklisted_token = TokenBlacklist(
jti=jti,
user_id=int(user_id),
expires_at=expires_at
)
self.db.add(blacklisted_token)
self.db.commit()
except jwt.InvalidTokenError:
pass
def is_token_blacklisted(self, token: str) -> bool:
"""Check if token is blacklisted"""
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
jti = payload.get("jti")
if jti:
return self.db.query(TokenBlacklist).filter_by(jti=jti).first() is not None
return False
except jwt.InvalidTokenError:
return True
Monitoring and Logging
JWT Operations Logging System
import logging
import json
from datetime import datetime
class JWTLogger:
def __init__(self, logger_name: str = "jwt_operations"):
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
# File handler
handler = logging.FileHandler("jwt_operations.log")
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_token_creation(self, user_id: int, token_type: str, expires_in: int):
self.logger.info(json.dumps({
"operation": "token_created",
"user_id": user_id,
"token_type": token_type,
"expires_in": expires_in,
"timestamp": datetime.utcnow().isoformat()
}))
def log_token_verification(self, user_id: int, success: bool, error: str = None):
log_data = {
"operation": "token_verified",
"user_id": user_id,
"success": success,
"timestamp": datetime.utcnow().isoformat()
}
if error:
log_data["error"] = error
if success:
self.logger.info(json.dumps(log_data))
else:
self.logger.warning(json.dumps(log_data))
def log_token_revocation(self, user_id: int, reason: str):
self.logger.info(json.dumps({
"operation": "token_revoked",
"user_id": user_id,
"reason": reason,
"timestamp": datetime.utcnow().isoformat()
}))
Metrics and Monitoring
from dataclasses import dataclass
from typing import Dict
import time
@dataclass
class JWTMetrics:
tokens_created: int = 0
tokens_verified: int = 0
verification_failures: int = 0
expired_tokens: int = 0
invalid_signatures: int = 0
def to_dict(self) -> Dict[str, int]:
return {
"tokens_created": self.tokens_created,
"tokens_verified": self.tokens_verified,
"verification_failures": self.verification_failures,
"expired_tokens": self.expired_tokens,
"invalid_signatures": self.invalid_signatures
}
class MonitoredJWTHandler:
def __init__(self, secret: str):
self.secret = secret
self.metrics = JWTMetrics()
self.logger = JWTLogger()
def create_token(self, payload: dict) -> str:
start_time = time.time()
token = jwt.encode(payload, self.secret, algorithm="HS256")
# Update metrics
self.metrics.tokens_created += 1
# Log operation
user_id = payload.get("sub", "unknown")
expires_in = payload.get("exp", 0) - time.time() if payload.get("exp") else 0
self.logger.log_token_creation(user_id, "access", int(expires_in))
return token
def verify_token(self, token: str) -> dict:
try:
payload = jwt.decode(token, self.secret, algorithms=["HS256"])
# Update metrics
self.metrics.tokens_verified += 1
# Log success
user_id = payload.get("sub", "unknown")
self.logger.log_token_verification(user_id, True)
return payload
except jwt.ExpiredSignatureError:
self.metrics.expired_tokens += 1
self.metrics.verification_failures += 1
self.logger.log_token_verification("unknown", False, "expired")
raise
except jwt.InvalidSignatureError:
self.metrics.invalid_signatures += 1
self.metrics.verification_failures += 1
self.logger.log_token_verification("unknown", False, "invalid_signature")
raise
except jwt.InvalidTokenError:
self.metrics.verification_failures += 1
self.logger.log_token_verification("unknown", False, "invalid_token")
raise
def get_metrics(self) -> Dict[str, int]:
return self.metrics.to_dict()
Frequently Asked Questions
What is PyJWT and what is it used for?
PyJWT is a Python library for creating, encoding, and decoding JWT tokens. It is used for user authentication, API authorization, and secure data exchange between client and server.
Is it safe to use JWT in production?
Yes, JWT can be safely used in production when you follow best practices:
- Use strong secret keys
- Set short token lifetimes
- Transmit tokens only over HTTPS
- Avoid storing sensitive data in the payload
Which algorithm is better: HS256 or RS256?
- HS256 is suitable for simple, single‑server applications where a shared secret can be securely distributed.
- RS256 is preferred for distributed systems where multiple services need to verify tokens but only one service signs them.
Where and how should I store the secret key securely?
- Use environment variables
- Leverage secret managers (AWS Secrets Manager, HashiCorp Vault)
- Never hard‑code keys in source code or config files
- Rotate keys regularly
Can I use PyJWT with FastAPI or Flask?
Yes, PyJWT integrates smoothly with both frameworks:
- In Flask you can create decorators to protect routes
- In FastAPI you can use dependency injection for token validation
Does PyJWT support refresh tokens?
PyJWT does not provide built‑in refresh‑token handling, but you can easily implement access/refresh token pairs at the application level.
How should I handle expired tokens?
try:
payload = jwt.decode(token, secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
# Token expired – request a new one or redirect to login
return {"error": "Token expired", "code": "TOKEN_EXPIRED"}
Can a JWT be revoked before its expiration?
JWTs are stateless, so they cannot be revoked directly. However you can:
- Maintain a blacklist in a database
- Use short lifetimes combined with refresh tokens
- Store active tokens in Redis and delete them on revocation
How do I test applications that use JWT?
def test_protected_endpoint():
# Create a test token
test_payload = {"user_id": 123, "exp": datetime.utcnow() + timedelta(hours=1)}
test_token = jwt.encode(test_payload, "test-secret", algorithm="HS256")
# Use the token in a request
headers = {"Authorization": f"Bearer {test_token}"}
response = client.get("/protected", headers=headers)
assert response.status_code == 200
Does payload size affect performance?
Yes, a larger payload means a larger token, which impacts:
- Network transmission time
- HTTP header size
- Processing time
It is recommended to store only the data that is strictly necessary in a JWT.
Conclusion
PyJWT is a powerful, flexible, and reliable library for handling JWT tokens in Python. It provides all the tools needed for secure authentication and authorization in modern web applications.
Key advantages of PyJWT:
- Easy to use with an intuitive API
- Full compliance with RFC 7519
- Wide range of supported signing algorithms
- Active community and regular updates
- Excellent compatibility with popular web frameworks
When using PyJWT in production, always follow security best practices: use strong keys, set appropriate token lifetimes, transmit data over HTTPS only, and keep the library up to date.
PyJWT will become an indispensable tool for building modern APIs, microservices, and web applications that require a robust authentication system.
The Future of AI in Mathematics and Everyday Life: How Intelligent Agents Are Already Changing the Game
Experts warned about the risks of fake charity with AI
In Russia, universal AI-agent for robots and industrial processes was developed