PyJWT: Working with JSON Web Tokens

Introduction to PyJWT

JSON Web Token (JWT) has become a widely used standard for API authentication and authorization. It lets a client and server exchange signed identity information without the server having to store session state. One of the most popular Python libraries for working with JWTs is PyJWT.

PyJWT simplifies creating, decoding, and verifying JWTs and supports multiple signing algorithms. This article explains in detail how to use PyJWT efficiently and securely in your project.

JWT (JSON Web Token) Basics

JWT Token Structure

A JWT consists of three parts separated by dots:

header.payload.signature
  • Header — contains the token type and the signing algorithm (e.g., HS256)
  • Payload — contains claims, i.e., the actual data
  • Signature — used to verify the token’s authenticity
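To make the three-part layout concrete, here is a minimal sketch that splits a token and decodes its header and payload using only the standard library. The helper names (`b64url_decode`, `b64url_encode`, `split_token`) and the hand-built example token are illustrative assumptions, not part of PyJWT, and nothing here verifies the signature.

```python
import base64
import json
from typing import Tuple


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def b64url_encode(data: bytes) -> str:
    """Encode bytes as base64url without trailing '=' padding."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def split_token(token: str) -> Tuple[dict, dict, bytes]:
    """Return (header, payload, raw signature) WITHOUT verifying anything."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    header = json.loads(b64url_decode(header_b64))
    payload = json.loads(b64url_decode(payload_b64))
    return header, payload, b64url_decode(signature_b64)


# Hypothetical token assembled by hand; the signature is a dummy value.
example_token = ".".join([
    b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()),
    b64url_encode(json.dumps({"user_id": 123}).encode()),
    b64url_encode(b"not-a-real-signature"),
])

header, payload, signature = split_token(example_token)
print(header)
print(payload)
```

Because the header and payload are only base64url-encoded, anyone holding the token can read them. The signature protects integrity, not confidentiality.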

Benefits of Using JWT

  • Stateless: the server does not need to store sessions
  • Compact: easy to transmit in HTTP headers
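  • Tamper-evident: tokens are signed (HMAC, RSA, or ECDSA), so any modification is detected; the payload itself is only encoded, not encrypted
  • Cross-platform: works with any programming language
  • Self-contained: carries the claims the server needs, so keep secrets out of the payload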

How JWT Works

JWT follows this flow:

  1. The user authenticates with the server
  2. The server generates a JWT containing user data
  3. The client stores the token and sends it in the Authorization header
  4. The server validates the signature and extracts user data
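The flow above can be sketched with the standard library alone, which shows what an HS256 signature actually is. This is an illustration under assumptions: the secret, the function names `issue_token` and `signature_is_valid`, and the claims are all hypothetical, and a real service would use PyJWT, which also validates claims such as exp.

```python
import base64
import hashlib
import hmac
import json

SECRET = b"demo-secret-for-illustration-only"  # hypothetical shared secret


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def issue_token(claims: dict) -> str:
    """Step 2: the server signs 'header.payload' with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    signature = hmac.new(SECRET, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def signature_is_valid(authorization_header: str) -> bool:
    """Step 4: recompute the signature and compare it in constant time."""
    scheme, _, token = authorization_header.partition(" ")
    if scheme != "Bearer" or token.count(".") != 2:
        return False
    signing_input, _, signature_b64 = token.rpartition(".")
    expected = hmac.new(SECRET, signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(_b64url(expected).encode(), signature_b64.encode())


token = issue_token({"user_id": 123})      # step 2: server issues the token
auth_header = f"Bearer {token}"            # step 3: client sends it back
valid = signature_is_valid(auth_header)    # step 4: server checks the signature

# Changing even one character of the token invalidates the signature
tampered = auth_header[:-1] + ("A" if auth_header[-1] != "A" else "B")
tampered_valid = signature_is_valid(tampered)

print(valid, tampered_valid)
```

The comparison uses `hmac.compare_digest` rather than `==` so that the check takes the same time regardless of where the strings differ.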

Installing and Configuring PyJWT

How to Install PyJWT

pip install PyJWT

To work with RSA and ECDSA algorithms, install the extra crypto dependencies:

pip install "PyJWT[crypto]"

Import and Basic Setup

import jwt
import datetime
import os
from typing import Dict, Any

PyJWT fully implements RFC 7519 and provides a simple API for token generation and verification.

In‑Depth Overview of the PyJWT Library

History and Evolution

PyJWT was created in 2013 and has become one of the most popular JWT libraries for Python. It is actively maintained by the community and regularly updated to meet the latest security standards.

Library Architecture

PyJWT is organized into a few modules with a clear separation of concerns:

  • Core encoding/decoding (jwt.api_jwt, jwt.api_jws): main functions for token handling
  • Algorithm module (jwt.algorithms): support for the signing algorithms
  • Exception module (jwt.exceptions): error types for invalid tokens and keys
  • Utility module (jwt.utils): helper functions for base64url encoding and key formats

Key Features

  • Support for the standard JWT signing algorithms (HMAC, RSA, RSA-PSS, ECDSA, EdDSA)
  • Automatic token expiration verification
  • Ability to add custom claims
  • Support for symmetric and asymmetric keys
  • Framework-agnostic API that fits into Flask, FastAPI, and other web frameworks

Creating JWT Tokens with PyJWT

Basic Example

payload = {"user_id": 123}
secret = "mysecret"
token = jwt.encode(payload, secret, algorithm="HS256")
print(token)

Adding Expiration and Extra Data

# Timezone-aware UTC timestamps; datetime.utcnow() is deprecated since Python 3.12
now = datetime.datetime.now(datetime.timezone.utc)
payload = {
    "user_id": 123,
    "username": "admin",
    "role": "administrator",
    "exp": now + datetime.timedelta(hours=1),
    "iat": now,
    "iss": "myapp"
}
token = jwt.encode(payload, secret, algorithm="HS256")

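The exp (expiration time) claim limits the token's lifetime. Setting it is strongly recommended: jwt.decode() rejects a token whose exp has passed by raising ExpiredSignatureError, which bounds how long a leaked token stays usable.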
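As a minimal sketch of how expiration behaves at decode time, the snippet below creates a token that is already expired and decodes it twice, once normally and once with the `leeway` argument of `jwt.decode()`. The secret and the 30-second and 60-second values are arbitrary assumptions chosen for illustration.

```python
import datetime

import jwt

secret = "demo-secret-of-at-least-32-bytes-long!!"  # hypothetical value
now = datetime.datetime.now(datetime.timezone.utc)

# A token whose exp lies 30 seconds in the past
expired = jwt.encode(
    {"user_id": 123, "exp": now - datetime.timedelta(seconds=30)},
    secret,
    algorithm="HS256",
)

try:
    jwt.decode(expired, secret, algorithms=["HS256"])
    outcome = "accepted"
except jwt.ExpiredSignatureError:
    outcome = "rejected"
print(outcome)

# leeway (in seconds) tolerates small clock differences between servers
claims = jwt.decode(expired, secret, algorithms=["HS256"], leeway=60)
print(claims["user_id"])
```

PyJWT stores exp as a numeric Unix timestamp, so `claims["exp"]` comes back as a number rather than a datetime object.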

Creating a Token with Custom Headers

headers = {
    "kid": "key-id-001",
    "typ": "JWT"
}

payload = {
    "user_id": 123,
    "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
}

token = jwt.encode(payload, secret, algorithm="HS256", headers=headers)

Verifying and Decoding JWT Tokens

How to Decode a Token

try:
    decoded = jwt.decode(token, secret, algorithms=["HS256"])
    print(decoded)
except jwt.InvalidTokenError:
    print("Invalid token")

Error Handling and Expiration

try:
    decoded = jwt.decode(token, secret, algorithms=["HS256"])
    print(f"User: {decoded['user_id']}")
except jwt.ExpiredSignatureError:
    print("Token has expired")
except jwt.InvalidSignatureError:
    print("Invalid token signature")
except jwt.InvalidTokenError:
    print("Invalid token")

Decoding with Additional Options

options = {
    "verify_exp": True,
    "verify_signature": True,
    "verify_aud": False,
    "verify_iss": False
}

decoded = jwt.decode(token, secret, algorithms=["HS256"], options=options)

Signing Algorithms in PyJWT

Supported Algorithms

PyJWT supports the following algorithms:

  • HMAC: HS256, HS384, HS512
  • RSA: RS256, RS384, RS512
  • ECDSA: ES256, ES384, ES512
  • EdDSA: EdDSA
  • RSA‑PSS: PS256, PS384, PS512
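Which of these algorithms are usable depends on whether the cryptography package is installed. A short sketch for checking the local installation, assuming the `get_default_algorithms()` helper and `has_crypto` flag exposed by the `jwt.algorithms` module:

```python
import jwt
from jwt.algorithms import get_default_algorithms, has_crypto

# Names of the algorithms registered in this installation
available = sorted(get_default_algorithms())

print("PyJWT version:", jwt.__version__)
print("cryptography installed:", has_crypto)
print("Algorithms:", ", ".join(available))
```

Without the crypto extra, only the HMAC family is expected to appear. The listing may also include "none" (unsigned tokens), which should not be enabled for verification.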

Choosing the Right Algorithm

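  • Simple apps: HS256 (one shared secret both signs and verifies)
  • Distributed systems: RS256 (the private key signs; other services verify with the public key)
  • Compact asymmetric signatures: ES256 (elliptic curve, with smaller keys and signatures than RSA)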

Algorithm Usage Examples

# HS256 – symmetric signing
payload = {"user_id": 123}
secret = "my-secret-key"
token_hs256 = jwt.encode(payload, secret, algorithm="HS256")

# RS256 – asymmetric signing
with open("private_key.pem", "r") as f:
    private_key = f.read()
    
token_rs256 = jwt.encode(payload, private_key, algorithm="RS256")

# ES256 – elliptic curve signing
with open("ec_private_key.pem", "r") as f:
    ec_private_key = f.read()
    
token_es256 = jwt.encode(payload, ec_private_key, algorithm="ES256")

PyJWT Methods and Functions Reference

| Function | Description | Parameters | Return value |
|---|---|---|---|
| jwt.encode() | Creates a signed JWT | payload, key, algorithm, headers | str (the token) |
| jwt.decode() | Verifies a token and returns its claims | token, key, algorithms, options | dict (payload) |
| jwt.get_unverified_header() | Reads the header without verifying the signature | token | dict (header) |
| jwt.decode() with options={"verify_signature": False} | Reads the payload without verification | token, options | dict (claims) |
| jwt.decode_complete() | Verifies a token and returns all of its parts | token, key, algorithms, options | dict with header, payload, and signature |
| jwt.get_algorithm_by_name() | Gets an algorithm object by its name | algorithm_name | algorithm object |
| jwt.register_algorithm() | Registers a custom algorithm | algorithm_name, algorithm_obj | None |
| jwt.unregister_algorithm() | Unregisters an algorithm | algorithm_name | None |

Note: PyJWT has no jwt.get_unverified_claims() function; that name belongs to python-jose. In PyJWT, read unverified claims by calling jwt.decode() with signature verification disabled.
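A brief sketch of the inspection functions in use. The secret and the kid value are hypothetical, and `decode_complete` is imported from `jwt.api_jwt` on the assumption that this path works whether or not the installed release also re-exports it at the top level.

```python
import jwt
from jwt.api_jwt import decode_complete

secret = "demo-secret-of-at-least-32-bytes-long!!"  # hypothetical value
token = jwt.encode(
    {"user_id": 123}, secret, algorithm="HS256", headers={"kid": "key-id-001"}
)

# Header only, no signature check: useful for picking a key by "kid"
header = jwt.get_unverified_header(token)
print(header["alg"], header["kid"])

# Payload without verification: for inspection only, never for authorization
unverified = jwt.decode(token, options={"verify_signature": False})
print(unverified)

# Verified decode that also returns the header and the raw signature
complete = decode_complete(token, secret, algorithms=["HS256"])
print(sorted(complete))
```

Anything read before verification is attacker-controlled input, so use it only to decide how to verify the token, not whether to trust it.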

Working with Secrets and Keys

Secure Key Storage

Never store keys in plain text inside your code. Use environment variables and secret managers:

import os
from cryptography.hazmat.primitives import serialization

# Load from environment variables
secret = os.environ.get("JWT_SECRET")
if not secret:
    raise ValueError("JWT_SECRET is not set")

# Load a private key
def load_private_key(key_path: str, password: str = None):
    with open(key_path, "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=password.encode() if password else None,
        )
    return private_key

Using Public and Private Keys

# Generate RSA keys
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

# Private key
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
)

# Public key
public_key = private_key.public_key()

# Serialize keys
private_pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

public_pem = public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# Use with JWT
payload = {"user_id": 123, "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)}
token = jwt.encode(payload, private_pem, algorithm="RS256")
decoded = jwt.decode(token, public_pem, algorithms=["RS256"])

Key Rotation

class KeyManager:
    def __init__(self):
        self.keys = {}
        self.current_key_id = "key-001"
    
    def add_key(self, key_id: str, key: str):
        self.keys[key_id] = key
    
    def get_key(self, key_id: str):
        return self.keys.get(key_id)
    
    def create_token(self, payload: dict):
        headers = {"kid": self.current_key_id}
        return jwt.encode(
            payload, 
            self.keys[self.current_key_id], 
            algorithm="HS256",
            headers=headers
        )
    
    def verify_token(self, token: str):
        unverified_header = jwt.get_unverified_header(token)
        key_id = unverified_header.get("kid")
        key = self.get_key(key_id)
        
        if not key:
            raise jwt.InvalidTokenError("Unknown key")
        
        return jwt.decode(token, key, algorithms=["HS256"])

Integrating PyJWT into Web Applications

Flask Example

from flask import Flask, request, jsonify
import jwt
import datetime
from functools import wraps

app = Flask(__name__)
SECRET_KEY = "your-secret-key"  # placeholder; load the real secret from the environment

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        # Expect exactly "Bearer <token>"; partition never raises on bad input
        scheme, _, token = auth_header.partition(' ')

        if scheme != 'Bearer' or not token:
            return jsonify({'message': 'Token missing'}), 401

        try:
            data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            current_user = data['user_id']
        except jwt.ExpiredSignatureError:
            return jsonify({'message': 'Token expired'}), 401
        except (jwt.InvalidTokenError, KeyError):
            # KeyError: the token is valid but carries no user_id claim
            return jsonify({'message': 'Invalid token'}), 401

        return f(current_user, *args, **kwargs)
    return decorated

@app.route("/login", methods=["POST"])
def login():
    # Add your user verification logic here
    user_id = request.json.get('user_id')
    
    now = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        "user_id": user_id,
        "exp": now + datetime.timedelta(hours=1),
        "iat": now
    }
    
    token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    return jsonify({"token": token})

@app.route("/protected")
@token_required
def protected(current_user):
    return jsonify({
        "message": "Access granted",
        "user_id": current_user
    })

FastAPI Example

from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import datetime
from typing import Optional

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"  # placeholder; load the real secret from the environment
ALGORITHM = "HS256"

def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None):
    to_encode = data.copy()
    now = datetime.datetime.now(datetime.timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + datetime.timedelta(minutes=15)
    
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Invalid token")
        return user_id
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/login")
async def login(user_id: str):
    access_token = create_access_token(data={"sub": user_id})
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/protected")
async def protected_route(current_user: str = Depends(verify_token)):
    return {"message": f"Welcome, user {current_user}"}

Middleware for Authentication

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import jwt

class JWTMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, secret_key: str):
        super().__init__(app)
        self.secret_key = secret_key
        self.protected_paths = ["/api/protected", "/api/admin"]
    
    async def dispatch(self, request: Request, call_next):
        path = request.url.path
        
        if path in self.protected_paths:
            # Return responses directly: an HTTPException raised inside
            # middleware is not handled by FastAPI's exception handlers
            auth_header = request.headers.get("Authorization", "")
            scheme, _, token = auth_header.partition(" ")
            if scheme != "Bearer" or not token:
                return JSONResponse({"detail": "Token missing"}, status_code=401)
            
            try:
                payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
                request.state.user = payload
            except jwt.InvalidTokenError:
                return JSONResponse({"detail": "Invalid token"}, status_code=401)
        
        response = await call_next(request)
        return response

app = FastAPI()
app.add_middleware(JWTMiddleware, secret_key="your-secret-key")

Error Handling and Exceptions

Common PyJWT Errors

PyJWT provides an exception hierarchy for various error types:

try:
    decoded = jwt.decode(token, secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
    # Token has expired
    print("Token expired")
except jwt.InvalidSignatureError:
    # Invalid signature
    print("Invalid signature")
except jwt.InvalidAudienceError:
    # Wrong audience
    print("Invalid audience")
except jwt.InvalidIssuerError:
    # Wrong issuer
    print("Invalid issuer")
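except jwt.DecodeError:
    # Malformed token; must come before InvalidTokenError, its parent class
    print("Failed to decode token")
except jwt.InvalidTokenError:
    # Base exception for all token validation errors
    print("Invalid token")
except jwt.InvalidKeyError:
    # Key in the wrong format; not a subclass of InvalidTokenError
    print("Invalid key")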
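Because these exceptions form a class hierarchy, the order of except clauses matters: Python runs the first clause that matches, so a parent class listed early hides its subclasses. A small sketch that inspects the relationships directly:

```python
import jwt

# Each pair is (child, parent); issubclass tells us which handler would match
checks = {
    "InvalidSignatureError -> DecodeError":
        issubclass(jwt.InvalidSignatureError, jwt.DecodeError),
    "DecodeError -> InvalidTokenError":
        issubclass(jwt.DecodeError, jwt.InvalidTokenError),
    "ExpiredSignatureError -> InvalidTokenError":
        issubclass(jwt.ExpiredSignatureError, jwt.InvalidTokenError),
    "InvalidKeyError -> InvalidTokenError":
        issubclass(jwt.InvalidKeyError, jwt.InvalidTokenError),
    "InvalidKeyError -> PyJWTError":
        issubclass(jwt.InvalidKeyError, jwt.PyJWTError),
}

for relation, result in checks.items():
    print(f"{relation}: {result}")
```

In practice this means listing specific exceptions first and jwt.InvalidTokenError last among the token errors, with jwt.PyJWTError available as a catch-all for everything the library raises.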

Custom Error Handler

class JWTErrorHandler:
    @staticmethod
    def handle_jwt_error(error: Exception) -> tuple:
        if isinstance(error, jwt.ExpiredSignatureError):
            return {"error": "Token expired", "code": "TOKEN_EXPIRED"}, 401
        elif isinstance(error, jwt.InvalidSignatureError):
            return {"error": "Invalid signature", "code": "INVALID_SIGNATURE"}, 401
        elif isinstance(error, jwt.InvalidTokenError):
            return {"error": "Invalid token", "code": "INVALID_TOKEN"}, 401
        else:
            return {"error": "Unknown error", "code": "UNKNOWN_ERROR"}, 500

# Usage (HTTPException here is FastAPI's)
from fastapi import HTTPException

def verify_token_with_error_handling(token: str, secret: str):
    try:
        return jwt.decode(token, secret, algorithms=["HS256"])
    except Exception as e:
        error_response, status_code = JWTErrorHandler.handle_jwt_error(e)
        raise HTTPException(status_code=status_code, detail=error_response)

Advanced PyJWT Features

Custom Payloads and Claims

import uuid

def create_comprehensive_token(user_data: dict, permissions: list, expires_in: int = 3600):
    now = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        # Standard claims
        "sub": str(user_data["id"]),          # Subject
        "iss": "myapp.com",                  # Issuer
        "aud": "myapp-users",                # Audience; decode must pass audience="myapp-users"
        "exp": now + datetime.timedelta(seconds=expires_in),
        "iat": now,                          # Issued at
        "nbf": now,                          # Not before
        "jti": str(uuid.uuid4()),            # JWT ID
        
        # Custom claims
        "username": user_data["username"],
        "email": user_data["email"],
        "role": user_data["role"],
        "permissions": permissions,
        "last_login": user_data.get("last_login"),
        "ip_address": user_data.get("ip_address"),
        "device_id": user_data.get("device_id")
    }
    
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

Validating Custom Claims

def validate_token_claims(token: str, secret: str, required_permissions: list = None):
    try:
        payload = jwt.decode(token, secret, algorithms=["HS256"])
        
        # Check required fields
        required_fields = ["sub", "username", "role"]
        for field in required_fields:
            if field not in payload:
                raise ValueError(f"Missing required field: {field}")
        
        # Check permissions
        if required_permissions:
            user_permissions = payload.get("permissions", [])
            if not all(perm in user_permissions for perm in required_permissions):
                raise ValueError("Insufficient permissions")
        
        return payload
    except jwt.InvalidTokenError as e:
        raise ValueError(f"Invalid token: {str(e)}")

Working with Multiple Algorithms

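class MultiAlgorithmJWT:
    def __init__(self):
        self.algorithms = {
            "HS256": {"secret": "hmac-secret"},
            "RS256": {
                "private_key": self.load_private_key("private.pem"),
                "public_key": self.load_public_key("public.pem")
            }
        }
    
    @staticmethod
    def load_private_key(path: str) -> bytes:
        # PyJWT accepts PEM-encoded keys as bytes
        with open(path, "rb") as f:
            return f.read()
    
    @staticmethod
    def load_public_key(path: str) -> bytes:
        with open(path, "rb") as f:
            return f.read()
    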
    def encode_token(self, payload: dict, algorithm: str = "HS256"):
        if algorithm == "HS256":
            key = self.algorithms[algorithm]["secret"]
        elif algorithm == "RS256":
            key = self.algorithms[algorithm]["private_key"]
        else:
            raise ValueError(f"Unsupported algorithm: {algorithm}")
        
        return jwt.encode(payload, key, algorithm=algorithm)
    
    def decode_token(self, token: str):
        # Get algorithm from header
        header = jwt.get_unverified_header(token)
        algorithm = header.get("alg")
        
        if algorithm == "HS256":
            key = self.algorithms[algorithm]["secret"]
        elif algorithm == "RS256":
            key = self.algorithms[algorithm]["public_key"]
        else:
            raise ValueError(f"Unsupported algorithm: {algorithm}")
        
        return jwt.decode(token, key, algorithms=[algorithm])

Testing Tokens with PyJWT

Basic Tests

import unittest
from unittest.mock import patch
import jwt
import datetime

class TestJWTTokens(unittest.TestCase):
    def setUp(self):
        self.secret = "test-secret"
        self.payload = {"user_id": 123, "username": "testuser"}
    
    def test_token_creation(self):
        token = jwt.encode(self.payload, self.secret, algorithm="HS256")
        self.assertIsInstance(token, str)
        self.assertTrue(len(token) > 0)
    
    def test_token_decoding(self):
        token = jwt.encode(self.payload, self.secret, algorithm="HS256")
        decoded = jwt.decode(token, self.secret, algorithms=["HS256"])
        self.assertEqual(decoded["user_id"], 123)
        self.assertEqual(decoded["username"], "testuser")
    
    def test_expired_token(self):
        expired_payload = {
            "user_id": 123,
            "exp": datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=1)
        }
        token = jwt.encode(expired_payload, self.secret, algorithm="HS256")
        
        with self.assertRaises(jwt.ExpiredSignatureError):
            jwt.decode(token, self.secret, algorithms=["HS256"])
    
    def test_invalid_signature(self):
        token = jwt.encode(self.payload, self.secret, algorithm="HS256")
        
        with self.assertRaises(jwt.InvalidSignatureError):
            jwt.decode(token, "wrong-secret", algorithms=["HS256"])
    
    def test_expired_token_within_leeway(self):
        # Patching datetime.datetime in the test does not change the clock PyJWT
        # reads, so use leeway to exercise time-dependent validation instead
        payload = {
            "user_id": 123,
            "exp": datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=5)
        }
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        decoded = jwt.decode(token, self.secret, algorithms=["HS256"], leeway=10)
        
        self.assertEqual(decoded["user_id"], 123)

Flask Integration Tests

import unittest
import json

import jwt
from flask import Flask, request

class TestFlaskJWTIntegration(unittest.TestCase):
    def setUp(self):
        self.app = Flask(__name__)
        self.app.config['SECRET_KEY'] = 'test-secret'
        self.client = self.app.test_client()
        
        # Add test routes
        @self.app.route('/login', methods=['POST'])
        def login():
            token = jwt.encode({'user_id': 123}, 'test-secret', algorithm='HS256')
            return {'token': token}
        
        @self.app.route('/protected')
        def protected():
            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return {'error': 'No token'}, 401
            
            token = auth_header.split(' ')[1]
            try:
                payload = jwt.decode(token, 'test-secret', algorithms=['HS256'])
                return {'user_id': payload['user_id']}
            except jwt.InvalidTokenError:
                return {'error': 'Invalid token'}, 401
    
    def test_login_endpoint(self):
        response = self.client.post('/login')
        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)
        self.assertIn('token', data)
    
    def test_protected_endpoint_with_token(self):
        # Get token
        login_response = self.client.post('/login')
        token = json.loads(login_response.data)['token']
        
        # Use token to access protected endpoint
        headers = {'Authorization': f'Bearer {token}'}
        response = self.client.get('/protected', headers=headers)
        
        self.assertEqual(response.status_code, 200)
        data = json.loads(response.data)
        self.assertEqual(data['user_id'], 123)
    
    def test_protected_endpoint_without_token(self):
        response = self.client.get('/protected')
        self.assertEqual(response.status_code, 401)

Performance and Optimization

PyJWT Benchmarks

import time
import jwt
from typing import List

def benchmark_jwt_operations(iterations: int = 10000):
    secret = "benchmark-secret"
    payload = {"user_id": 123, "role": "user"}
    
    # Token creation test
    start_time = time.time()
    tokens = []
    for _ in range(iterations):
        token = jwt.encode(payload, secret, algorithm="HS256")
        tokens.append(token)
    encode_time = time.time() - start_time
    
    # Token decoding test
    start_time = time.time()
    for token in tokens:
        decoded = jwt.decode(token, secret, algorithms=["HS256"])
    decode_time = time.time() - start_time
    
    print(f"Created {iterations} tokens: {encode_time:.2f}s")
    print(f"Decoded {iterations} tokens: {decode_time:.2f}s")
    print(f"Tokens created per second: {iterations/encode_time:.0f}")
    print(f"Tokens decoded per second: {iterations/decode_time:.0f}")

# Run benchmark
benchmark_jwt_operations()

Performance Optimizations

class OptimizedJWTHandler:
    def __init__(self, secret: str):
        self.secret = secret
        self.algorithm = "HS256"
        # Separate caches so issued tokens and verified payloads never collide
        self._token_cache = {}
        self._verify_cache = {}
        self._cache_ttl = 300  # 5 minutes
    
    def encode_with_cache(self, payload: dict, cache_key: str = None):
        if cache_key and cache_key in self._token_cache:
            cached_token, timestamp = self._token_cache[cache_key]
            if time.time() - timestamp < self._cache_ttl:
                return cached_token
        
        token = jwt.encode(payload, self.secret, algorithm=self.algorithm)
        
        if cache_key:
            self._token_cache[cache_key] = (token, time.time())
        
        return token
    
    def batch_encode(self, payloads: List[dict]) -> List[str]:
        """Encode several payloads in one call (a convenience, not a speed-up)"""
        return [
            jwt.encode(payload, self.secret, algorithm=self.algorithm)
            for payload in payloads
        ]
    
    def verify_with_cache(self, token: str):
        """Verify token with result caching"""
        # Key on the full token string: hash() values can collide
        cached = self._verify_cache.get(token)
        if cached:
            payload, timestamp = cached
            # Never serve a cached result for a token that has since expired
            not_expired = payload.get("exp", float("inf")) > time.time()
            if time.time() - timestamp < 60 and not_expired:  # 1-minute cache
                return payload
        
        try:
            decoded = jwt.decode(token, self.secret, algorithms=[self.algorithm])
            self._verify_cache[token] = (decoded, time.time())
            return decoded
        except jwt.InvalidTokenError:
            self._verify_cache.pop(token, None)
            return None

Comparison of PyJWT with Other Libraries

Comparison Table

| Library | Features | Pros | Cons |
|---|---|---|---|
| PyJWT | Simplicity, flexibility, active maintenance | Easy to use, good documentation | No built-in framework integrations |
| Authlib | Advanced OAuth2 capabilities | Full OAuth2/OpenID Connect support | More complex configuration |
| python-jose | Extended cryptography | Supports JWE, JWS, JWK | Integration can be harder |
| Flask-JWT-Extended | Flask integration | Ready-made decorators, refresh tokens | Tied to Flask |

Migrating from Other Libraries

# Migration from python-jose
# Old import: from jose import jwt as jose_jwt
import jwt as pyjwt

def migrate_from_jose(payload: dict, secret: str) -> str:
    # Old code using python-jose
    # token = jose_jwt.encode(payload, secret, algorithm="HS256")
    
    # New code using PyJWT
    token = pyjwt.encode(payload, secret, algorithm="HS256")
    
    # Key differences:
    # 1. PyJWT 2.0+ returns the token as str (PyJWT 1.x returned bytes)
    # 2. Exception classes differ; PyJWT raises jwt.InvalidTokenError subclasses
    # 3. Some options have different names
    return token

Security Recommendations

Common Vulnerabilities and Mitigations

# Bad: missing exp
payload = {"user_id": 123}
token = jwt.encode(payload, secret, algorithm="HS256")

# Good: include expiration
payload = {
    "user_id": 123,
    "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
}
token = jwt.encode(payload, secret, algorithm="HS256")

# Bad: weak secret
secret = "123"

# Good: strong secret
import secrets
secret = secrets.token_urlsafe(32)

# Bad: no algorithm allow-list (PyJWT 2.x rejects this call with DecodeError)
decoded = jwt.decode(token, secret)

# Good: explicitly specify the algorithms you accept
decoded = jwt.decode(token, secret, algorithms=["HS256"])
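To illustrate why the allow-list matters, the sketch below signs a token with an algorithm the service does not expect and shows PyJWT refusing it even though the signature itself is correct. The secret is a hypothetical value, made long enough for HS512.

```python
import jwt

secret = "demo-secret-" + "x" * 64  # hypothetical value

# A correctly signed token, but with an algorithm this service does not accept
token = jwt.encode({"user_id": 123}, secret, algorithm="HS512")

try:
    jwt.decode(token, secret, algorithms=["HS256"])
    result = "accepted"
except jwt.InvalidAlgorithmError:
    result = "rejected: algorithm not allowed"
print(result)

# The same token passes once HS512 is explicitly allowed
claims = jwt.decode(token, secret, algorithms=["HS512"])
print(claims)
```

The alg header is chosen by whoever created the token, so the list passed to decode, not the header, should decide which algorithms are acceptable.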

Best Security Practices

import uuid

class SecurityError(Exception):
    """Raised when a token fails verification."""

class SecureJWTHandler:
    def __init__(self, secret: str):
        self.secret = secret
        self.algorithm = "HS256"
        self.max_token_age = datetime.timedelta(hours=1)
    
    def create_secure_token(self, user_id: int, additional_claims: dict = None):
        """Create a secure token with mandatory claims"""
        now = datetime.datetime.now(datetime.timezone.utc)
        
        payload = {
            "sub": str(user_id),
            "iat": now,
            "exp": now + self.max_token_age,
            "nbf": now,
            "jti": str(uuid.uuid4()),
            "iss": "myapp.com"
        }
        
        if additional_claims:
            payload.update(additional_claims)
        
        return jwt.encode(payload, self.secret, algorithm=self.algorithm)
    
    def verify_secure_token(self, token: str, audience: str = None):
        """Secure token verification"""
        try:
            options = {
                "verify_signature": True,
                "verify_exp": True,
                "verify_nbf": True,
                "verify_iat": True,
                "verify_aud": bool(audience)
            }
            
            return jwt.decode(
                token, 
                self.secret, 
                algorithms=[self.algorithm],
                audience=audience,
                options=options
            )
        except jwt.ExpiredSignatureError:
            raise SecurityError("Token expired")
        except jwt.InvalidSignatureError:
            raise SecurityError("Invalid signature")
        except jwt.InvalidTokenError:
            raise SecurityError("Invalid token")

Common Use Cases

Authentication System with Refresh Tokens

class RefreshTokenSystem:
    def __init__(self, secret: str):
        self.secret = secret
        self.access_token_expire = datetime.timedelta(minutes=15)
        self.refresh_token_expire = datetime.timedelta(days=30)
    
    def create_token_pair(self, user_id: int):
        """Generate access and refresh token pair"""
        now = datetime.datetime.now(datetime.timezone.utc)
        
        # Access token (short‑lived)
        access_payload = {
            "sub": str(user_id),
            "type": "access",
            "exp": now + self.access_token_expire,
            "iat": now
        }
        access_token = jwt.encode(access_payload, self.secret, algorithm="HS256")
        
        # Refresh token (long‑lived)
        refresh_payload = {
            "sub": str(user_id),
            "type": "refresh",
            "exp": now + self.refresh_token_expire,
            "iat": now
        }
        refresh_token = jwt.encode(refresh_payload, self.secret, algorithm="HS256")
        
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "Bearer",
            "expires_in": int(self.access_token_expire.total_seconds())
        }
    
    def refresh_access_token(self, refresh_token: str):
        """Refresh an access token using a refresh token"""
        try:
            payload = jwt.decode(refresh_token, self.secret, algorithms=["HS256"])
            
            if payload.get("type") != "refresh":
                raise ValueError("Not a refresh token")
            
            user_id = int(payload["sub"])
            return self.create_token_pair(user_id)
            
        except jwt.ExpiredSignatureError:
            raise ValueError("Refresh token expired")
        except jwt.InvalidTokenError:
            raise ValueError("Invalid refresh token")

Multi‑Level Authorization

class RoleBasedJWT:
    def __init__(self, secret: str):
        self.secret = secret
        self.role_hierarchy = {
            "admin": ["admin", "moderator", "user"],
            "moderator": ["moderator", "user"],
            "user": ["user"]
        }
    
    def create_token_with_roles(self, user_id: int, roles: List[str], permissions: List[str]):
        payload = {
            "sub": str(user_id),
            "roles": roles,
            "permissions": permissions,
            "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
        }
        return jwt.encode(payload, self.secret, algorithm="HS256")
    
    def check_permission(self, token: str, required_role: str = None, required_permission: str = None):
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            user_roles = payload.get("roles", [])
            user_permissions = payload.get("permissions", [])
            
            # Role check: a user role qualifies only if its hierarchy entry
            # includes the required role (so "user" does not satisfy "admin")
            if required_role:
                if not any(
                    required_role in self.role_hierarchy.get(role, [])
                    for role in user_roles
                ):
                    return False
            
            # Permission check
            if required_permission and required_permission not in user_permissions:
                return False
            
            return True
            
        except jwt.InvalidTokenError:
            return False

Database Integration

Storing Tokens in Redis

import datetime
import json

import jwt
import redis

class RedisTokenStore:
    def __init__(self, redis_client: redis.Redis, secret: str):
        self.redis = redis_client
        self.secret = secret
    
    def store_token(self, user_id: int, token: str, expires_in: int):
        """Save token in Redis with TTL"""
        key = f"user_token:{user_id}"
        self.redis.setex(key, expires_in, token)
    
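    def get_token(self, user_id: int) -> "str | None":
        """Retrieve token from Redis, or None if it is absent or expired"""
        key = f"user_token:{user_id}"
        token = self.redis.get(key)
        if token is None:
            return None
        # redis-py returns bytes unless the client uses decode_responses=True
        return token.decode() if isinstance(token, bytes) else token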
    
    def revoke_token(self, user_id: int):
        """Delete token from Redis"""
        key = f"user_token:{user_id}"
        self.redis.delete(key)
    
    def create_and_store_token(self, user_id: int, payload: dict, expires_in: int = 3600):
        """Create a token and store it in Redis"""
        # Build a new dict so the caller's payload is not modified
        payload = {
            **payload,
            "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=expires_in),
            "sub": str(user_id)
        }
        
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        self.store_token(user_id, token, expires_in)
        return token
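
The store above saves and deletes tokens, but it does not show the check that makes revocation effective: comparing the token a client presents with the one currently stored for that user. A minimal sketch of that comparison, assuming it runs after jwt.decode has already validated the signature and expiry; the helper name is_active_token is hypothetical:

```python
import hmac
from typing import Optional, Union


def is_active_token(stored: Optional[Union[bytes, str]], presented: str) -> bool:
    """Return True only if the presented token matches the stored one."""
    if stored is None:
        # Key missing: the token was revoked, its TTL elapsed, or it was never issued
        return False
    if isinstance(stored, str):
        stored = stored.encode()
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(stored, presented.encode())
```

With the class above, this could be called as is_active_token(store.get_token(user_id), token). Note that keying by user ID means each user has one active token at a time.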

SQLAlchemy Integration

import uuid

from sqlalchemy import Column, Integer, String, DateTime, Text
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class TokenBlacklist(Base):
    __tablename__ = "token_blacklist"
    
    id = Column(Integer, primary_key=True)
    jti = Column(String(36), unique=True, nullable=False)
    user_id = Column(Integer, nullable=False)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    expires_at = Column(DateTime, nullable=False)

class DatabaseTokenManager:
    def __init__(self, db_session, secret: str):
        self.db = db_session
        self.secret = secret
    
    def create_token_with_jti(self, user_id: int, payload: dict):
        """Create a token with a JTI for blacklist tracking"""
        jti = str(uuid.uuid4())
        # Build a new dict so the caller's payload is not modified
        payload = {
            **payload,
            "jti": jti,
            "sub": str(user_id),
            "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
        }
        
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        return token, jti
    
    def blacklist_token(self, token: str):
        """Add token to blacklist"""
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            jti = payload.get("jti")
            user_id = payload.get("sub")
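            # Convert "exp" to naive UTC so it is comparable with created_at
            expires_at = datetime.datetime.fromtimestamp(
                payload["exp"], tz=datetime.timezone.utc
            ).replace(tzinfo=None)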
            
            if jti:
                blacklisted_token = TokenBlacklist(
                    jti=jti,
                    user_id=int(user_id),
                    expires_at=expires_at
                )
                self.db.add(blacklisted_token)
                self.db.commit()
                
        except jwt.InvalidTokenError:
            # Expired or invalid tokens are already rejected by jwt.decode,
            # so there is nothing to blacklist
            pass
    
    def is_token_blacklisted(self, token: str) -> bool:
        """Check if token is blacklisted"""
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            jti = payload.get("jti")
            
            if jti:
                return self.db.query(TokenBlacklist).filter_by(jti=jti).first() is not None
            return False
            
        except jwt.InvalidTokenError:
            return True

Monitoring and Logging

JWT Operations Logging System

import datetime
import json
import logging

class JWTLogger:
    def __init__(self, logger_name: str = "jwt_operations"):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        
        # Attach the file handler only once: getLogger() returns the same logger
        # for a given name, so adding a handler per instance would duplicate lines
        if not self.logger.handlers:
            handler = logging.FileHandler("jwt_operations.log")
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
    
    @staticmethod
    def _now() -> str:
        """Current UTC time as an ISO 8601 string"""
        return datetime.datetime.now(datetime.timezone.utc).isoformat()
    
    def log_token_creation(self, user_id: str, token_type: str, expires_in: int):
        self.logger.info(json.dumps({
            "operation": "token_created",
            "user_id": user_id,
            "token_type": token_type,
            "expires_in": expires_in,
            "timestamp": self._now()
        }))
    
    def log_token_verification(self, user_id: str, success: bool, error: str = None):
        log_data = {
            "operation": "token_verified",
            "user_id": user_id,
            "success": success,
            "timestamp": self._now()
        }
        if error:
            log_data["error"] = error
        
        if success:
            self.logger.info(json.dumps(log_data))
        else:
            self.logger.warning(json.dumps(log_data))
    
    def log_token_revocation(self, user_id: str, reason: str):
        self.logger.info(json.dumps({
            "operation": "token_revoked",
            "user_id": user_id,
            "reason": reason,
            "timestamp": self._now()
        }))

Metrics and Monitoring

import calendar
import time
from dataclasses import asdict, dataclass
from typing import Dict

@dataclass
class JWTMetrics:
    tokens_created: int = 0
    tokens_verified: int = 0
    verification_failures: int = 0
    expired_tokens: int = 0
    invalid_signatures: int = 0
    
    def to_dict(self) -> Dict[str, int]:
        return asdict(self)

class MonitoredJWTHandler:
    def __init__(self, secret: str):
        self.secret = secret
        self.metrics = JWTMetrics()
        self.logger = JWTLogger()
    
    def create_token(self, payload: dict) -> str:
        token = jwt.encode(payload, self.secret, algorithm="HS256")
        
        # Update metrics
        self.metrics.tokens_created += 1
        
        # Log operation
        user_id = payload.get("sub", "unknown")
        exp = payload.get("exp")
        if hasattr(exp, "utctimetuple"):
            # "exp" may be a datetime; convert it to a Unix timestamp
            # (naive values are treated as UTC)
            exp = calendar.timegm(exp.utctimetuple())
        expires_in = exp - time.time() if exp else 0
        self.logger.log_token_creation(user_id, "access", int(expires_in))
        
        return token
    
    def verify_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(token, self.secret, algorithms=["HS256"])
            
            # Update metrics
            self.metrics.tokens_verified += 1
            
            # Log success
            user_id = payload.get("sub", "unknown")
            self.logger.log_token_verification(user_id, True)
            
            return payload
            
        except jwt.ExpiredSignatureError:
            self.metrics.expired_tokens += 1
            self.metrics.verification_failures += 1
            self.logger.log_token_verification("unknown", False, "expired")
            raise
        except jwt.InvalidSignatureError:
            self.metrics.invalid_signatures += 1
            self.metrics.verification_failures += 1
            self.logger.log_token_verification("unknown", False, "invalid_signature")
            raise
        except jwt.InvalidTokenError:
            self.metrics.verification_failures += 1
            self.logger.log_token_verification("unknown", False, "invalid_token")
            raise
    
    def get_metrics(self) -> Dict[str, int]:
        return self.metrics.to_dict()

Frequently Asked Questions

What is PyJWT and what is it used for?

PyJWT is a Python library for creating, encoding, and decoding JWT tokens. It is used for user authentication, API authorization, and secure data exchange between client and server.

Is it safe to use JWT in production?

Yes, JWT can be safely used in production when you follow best practices:

  • Use strong secret keys
  • Set short token lifetimes
  • Transmit tokens only over HTTPS
  • Avoid storing sensitive data in the payload
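
On the decoding side, these practices translate into pinning the accepted algorithm and requiring the claims you depend on. A minimal sketch, assuming an HS256 secret loaded from configuration; SECRET and decode_strict are illustrative names, not part of PyJWT:

```python
import datetime

import jwt

SECRET = "replace-with-a-long-random-secret-from-your-config"  # placeholder (assumption)


def decode_strict(token: str) -> dict:
    """Decode a token, pinning the algorithm and requiring key claims."""
    return jwt.decode(
        token,
        SECRET,
        algorithms=["HS256"],  # allow-list; the token's own header is not trusted
        options={"require": ["exp", "sub"]},  # reject tokens missing these claims
    )


now = datetime.datetime.now(datetime.timezone.utc)
good = jwt.encode({"sub": "42", "exp": now + datetime.timedelta(minutes=15)}, SECRET, algorithm="HS256")
no_exp = jwt.encode({"sub": "42"}, SECRET, algorithm="HS256")

print(decode_strict(good)["sub"])
try:
    decode_strict(no_exp)
except jwt.MissingRequiredClaimError as exc:
    print(f"rejected: {exc}")
```

Without the "require" option, a token that simply omits "exp" would be accepted and never expire.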

Which algorithm is better: HS256 or RS256?

  • HS256 is suitable for simple, single‑server applications where a shared secret can be securely distributed.
  • RS256 is preferred for distributed systems where multiple services need to verify tokens but only one service signs them.
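
The practical difference is who holds the key. With HS256 the same secret both signs and verifies, so every service that can check a token can also mint one. The sketch below reproduces the HS256 signature with the standard library only to make that symmetry visible; it is an illustration, not PyJWT's implementation, and it validates no claims. The function names are hypothetical, and real code should keep using jwt.encode and jwt.decode.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used in JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: bytes) -> str:
    """Build header.payload.signature using HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the signature with the same secret and compare."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected).encode(), signature.encode())


secret = b"shared-secret-known-to-signer-and-verifier"
token = sign_hs256({"sub": "42"}, secret)
print(verify_hs256(token, secret))             # the shared secret verifies
print(verify_hs256(token, b"some-other-key"))  # any other key does not
```

With RS256, verification uses a public key that cannot produce signatures, which is why it suits setups with one issuer and many verifiers.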

Where and how should I store the secret key securely?

  • Use environment variables
  • Leverage secret managers (AWS Secrets Manager, HashiCorp Vault)
  • Never hard‑code keys in source code or config files
  • Rotate keys regularly
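
A minimal sketch of the environment-variable approach that fails fast when the secret is missing or too short. The variable name JWT_SECRET_KEY, the function name, and the 32-byte minimum (the size of the SHA-256 output used by HS256) are assumptions for illustration:

```python
import os


def load_jwt_secret(env_var: str = "JWT_SECRET_KEY", min_bytes: int = 32) -> str:
    """Read the signing secret from the environment and reject unusable values."""
    secret = os.environ.get(env_var)
    if not secret:
        raise RuntimeError(f"Environment variable {env_var} is not set")
    if len(secret.encode("utf-8")) < min_bytes:
        raise RuntimeError(f"{env_var} must be at least {min_bytes} bytes long")
    return secret
```

Calling this once at application startup surfaces a misconfiguration immediately instead of at the first login.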

Can I use PyJWT with FastAPI or Flask?

Yes, PyJWT integrates smoothly with both frameworks:

  • In Flask you can create decorators to protect routes
  • In FastAPI you can use dependency injection for token validation
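
Both approaches begin with the same step: pulling the token out of the Authorization header. A framework-agnostic sketch of that step; extract_bearer_token is a hypothetical helper, and in Flask the header value would typically come from request.headers.get("Authorization"):

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, or None."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    # The scheme name is case-insensitive; anything other than Bearer is rejected
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()
```

The returned string is what you would then pass to jwt.decode inside the decorator or dependency.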

Does PyJWT support refresh tokens?

PyJWT does not provide built‑in refresh‑token handling, but you can easily implement access/refresh token pairs at the application level.
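
One possible application-level shape for such a pair is sketched below. The "type" claim, the 15-minute and 7-day lifetimes, and the function names are assumptions chosen for illustration, not a PyJWT convention:

```python
import datetime

import jwt

SECRET = "replace-with-a-long-random-secret-from-your-config"  # placeholder (assumption)


def issue_token_pair(user_id: str) -> dict:
    """Issue a short-lived access token and a longer-lived refresh token."""
    now = datetime.datetime.now(datetime.timezone.utc)
    access = jwt.encode(
        {"sub": user_id, "type": "access", "exp": now + datetime.timedelta(minutes=15)},
        SECRET,
        algorithm="HS256",
    )
    refresh = jwt.encode(
        {"sub": user_id, "type": "refresh", "exp": now + datetime.timedelta(days=7)},
        SECRET,
        algorithm="HS256",
    )
    return {"access_token": access, "refresh_token": refresh}


def refresh_access_token(refresh_token: str) -> str:
    """Exchange a valid refresh token for a new access token."""
    claims = jwt.decode(refresh_token, SECRET, algorithms=["HS256"])
    if claims.get("type") != "refresh":
        # An access token must not be usable to mint new tokens
        raise jwt.InvalidTokenError("Not a refresh token")
    return issue_token_pair(claims["sub"])["access_token"]
```

Checking the "type" claim keeps a leaked access token from being exchanged for fresh ones.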

How should I handle expired tokens?

def get_current_user(token: str, secret: str) -> dict:
    try:
        payload = jwt.decode(token, secret, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        # Token expired: request a new one or redirect to login
        return {"error": "Token expired", "code": "TOKEN_EXPIRED"}
    return payload
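
Tokens can also look expired by a second or two when the issuing and verifying servers' clocks drift apart. The leeway argument of jwt.decode tolerates that. A minimal sketch, where the 30-second default and the names SECRET and decode_with_leeway are assumptions:

```python
import datetime

import jwt

SECRET = "replace-with-a-long-random-secret-from-your-config"  # placeholder (assumption)

# A token whose "exp" passed five seconds ago
expired_5s_ago = jwt.encode(
    {
        "sub": "42",
        "exp": datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=5),
    },
    SECRET,
    algorithm="HS256",
)


def decode_with_leeway(token: str, leeway_seconds: int = 30) -> dict:
    """Decode while tolerating a small amount of clock drift on 'exp'."""
    return jwt.decode(token, SECRET, algorithms=["HS256"], leeway=leeway_seconds)
```

Keep the leeway small: it extends every token's effective lifetime by the same amount.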

Can a JWT be revoked before its expiration?

JWTs are stateless, so they cannot be revoked directly. However, you can:

  • Maintain a blacklist in a database
  • Use short lifetimes combined with refresh tokens
  • Store active tokens in Redis and delete them on revocation
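
A blacklist only has to remember a token until its "exp" passes, since expired tokens are rejected anyway. The sketch below shows that idea with an in-process dictionary; the class name is hypothetical, it works on already decoded "jti" and "exp" values, and a multi-worker deployment would need a shared store such as the database or Redis options above:

```python
import time
from typing import Dict, Optional


class InMemoryDenylist:
    """Track revoked token IDs ("jti") until the tokens would expire anyway."""

    def __init__(self) -> None:
        self._revoked: Dict[str, float] = {}  # jti -> exp (Unix timestamp)

    def revoke(self, jti: str, exp: float) -> None:
        self._revoked[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Drop entries for tokens that have expired; "exp" validation rejects those
        self._revoked = {j: e for j, e in self._revoked.items() if e > now}
        return jti in self._revoked
```

The optional now parameter exists only to make the behavior easy to test with fixed timestamps.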

How do I test applications that use JWT?

def test_protected_endpoint():
    # Create a test token; the secret must match the one the application verifies with
    test_payload = {
        "user_id": 123,
        "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
    }
    test_token = jwt.encode(test_payload, "test-secret", algorithm="HS256")
    
    # Use the token in a request (client is your framework's test client)
    headers = {"Authorization": f"Bearer {test_token}"}
    response = client.get("/protected", headers=headers)
    
    assert response.status_code == 200

Does payload size affect performance?

Yes, a larger payload means a larger token, which impacts:

  • Network transmission time
  • HTTP header size
  • Processing time

It is recommended to store only the data that is strictly necessary in a JWT.
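
To get a feel for the cost: base64url turns every 3 bytes into 4 characters, so each claim adds roughly a third more than its JSON size. The sketch below estimates the length of the payload segment alone, assuming compact JSON serialization; the header and signature add a fixed overhead on top, and the function name is hypothetical:

```python
import base64
import json


def payload_segment_length(payload: dict) -> int:
    """Length in characters of the base64url-encoded payload segment."""
    raw = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    return len(base64.urlsafe_b64encode(raw).rstrip(b"="))


small = {"sub": "42"}
large = {"sub": "42", "permissions": [f"resource:{i}:read" for i in range(50)]}
print(payload_segment_length(small), payload_segment_length(large))
```

A long permissions list like the one above is often better replaced by a role name that the server expands on its side.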

Conclusion

PyJWT covers the core JWT operations in Python: creating, signing, decoding, and validating tokens. Combined with the patterns described above, that is enough to build secure authentication and authorization for modern web applications.

Key advantages of PyJWT:

  • Easy to use with an intuitive API
  • Implements the JWT standard, RFC 7519
  • Wide range of supported signing algorithms
  • Active community and regular updates
  • Excellent compatibility with popular web frameworks

When using PyJWT in production, always follow security best practices: use strong keys, set appropriate token lifetimes, transmit data over HTTPS only, and keep the library up to date.

PyJWT is a solid foundation for APIs, microservices, and web applications that need a robust authentication system.
