Introduction to aiohttp
aiohttp is a powerful asynchronous library for working with HTTP clients and servers in Python. It is built on top of asyncio and provides a flexible interface for creating scalable web applications and performing concurrent HTTP requests. The library supports HTTP/1.1, WebSocket, cookies, sessions, and integration with popular template engines.
What is aiohttp
aiohttp (Asynchronous I/O HTTP) is a comprehensive Python library that combines the functionality of an HTTP client and an HTTP server using asynchronous programming. Designed to work with asyncio, it allows you to create high-performance web applications capable of handling thousands of concurrent connections.
Main Features of the Library
The aiohttp library provides a wide range of features for developing modern web applications. It supports the creation of simple HTTP clients for interacting with external APIs and full-fledged web servers with support for middleware, routing, and templating.
Advantages and Features of aiohttp
Asynchronous Architecture
The main advantage of aiohttp is its asynchronous nature. By using asyncio, the library can handle multiple concurrent connections without blocking the execution thread. This makes it an ideal choice for high-load applications.
Versatility of Use
aiohttp supports both client and server functionality in a single package. This means that developers can use a single library to create complete web applications that simultaneously act as an HTTP client and server.
Built-in Support for Modern Technologies
The library natively supports WebSocket, which allows you to create interactive real-time applications. Support for various data formats, including JSON, forms, and files, is also included.
Extensibility and Flexibility
aiohttp provides a middleware system for adding additional functionality, a routing system for organizing URL handlers, and the ability to integrate with popular template engines.
Installation and Basic Configuration
Installing the Main Library
pip install aiohttp
Installing Additional Dependencies
For working with Jinja2 templates:
pip install aiohttp_jinja2 jinja2
For working with sessions:
pip install aiohttp_session
Checking the Installation
After installation, you can check the library version:
import aiohttp
print(aiohttp.__version__)
Asynchronous HTTP Client with aiohttp
Basics of Working with ClientSession
ClientSession is the main class for performing HTTP requests in aiohttp. It manages the connection pool and ensures efficient connection reuse.
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# Running an asynchronous function
asyncio.run(fetch('https://example.com'))
Client Session Configuration
async def create_configured_session():
timeout = aiohttp.ClientTimeout(total=30)
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(
timeout=timeout,
connector=connector,
headers={'User-Agent': 'MyApp/1.0'}
) as session:
return session
Sending GET and POST Requests
Performing GET Requests
async def get_example():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as resp:
if resp.status == 200:
data = await resp.json()
return data
else:
print(f"Error: {resp.status}")
Sending POST Requests
async def post_example():
payload = {'key': 'value', 'number': 42}
async with aiohttp.ClientSession() as session:
async with session.post('https://api.example.com/post', json=payload) as resp:
response_text = await resp.text()
print(response_text)
Sending Form Data
async def send_form_data():
form_data = aiohttp.FormData()
form_data.add_field('username', 'john_doe')
form_data.add_field('password', 'secret123')
async with aiohttp.ClientSession() as session:
async with session.post('https://example.com/login', data=form_data) as resp:
return await resp.text()
Handling Headers and Parameters
Working with Headers
async def request_with_headers():
headers = {
'Authorization': 'Bearer your_token_here',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/protected', headers=headers) as resp:
return await resp.json()
Passing Request Parameters
async def request_with_params():
params = {
'q': 'python programming',
'limit': 20,
'offset': 0,
'sort': 'date'
}
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/search', params=params) as resp:
return await resp.json()
Uploading and Sending Files
Sending Files
async def upload_file():
with open('document.pdf', 'rb') as file:
data = aiohttp.FormData()
data.add_field('file', file, filename='document.pdf', content_type='application/pdf')
data.add_field('description', 'Important document')
async with aiohttp.ClientSession() as session:
async with session.post('https://example.com/upload', data=data) as resp:
return await resp.json()
Downloading Files
async def download_file(url, filename):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
with open(filename, 'wb') as file:
async for chunk in resp.content.iter_chunked(8192):
file.write(chunk)
return f"File {filename} successfully downloaded"
Using Sessions for Multiple Requests
Effective Use of Sessions
async def multiple_requests():
urls = [
'https://api.example.com/user/1',
'https://api.example.com/user/2',
'https://api.example.com/user/3'
]
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
task = asyncio.create_task(fetch_user(session, url))
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
async def fetch_user(session, url):
async with session.get(url) as resp:
return await resp.json()
Connection Reuse
Using one session for multiple requests allows you to efficiently reuse HTTP connections through the HTTP Keep-Alive mechanism, which significantly improves performance.
Error and Exception Handling
Main Types of Exceptions
from aiohttp import ClientError, ClientTimeout, ClientConnectorError
import asyncio
async def robust_request(url):
try:
timeout = ClientTimeout(total=10, connect=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
resp.raise_for_status() # Raises an exception for HTTP errors
return await resp.text()
except ClientConnectorError as e:
print(f"Connection error: {e}")
except ClientTimeout as e:
print(f"Timeout exceeded: {e}")
except ClientError as e:
print(f"Client error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Handling HTTP Statuses
async def handle_http_status(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
return await resp.json()
elif resp.status == 404:
print("Resource not found")
elif resp.status == 500:
print("Internal server error")
else:
print(f"Unexpected status: {resp.status}")
Creating an Asynchronous Web Server
Simplest Server
from aiohttp import web
async def hello_handler(request):
return web.Response(text="Hello from aiohttp server!")
async def create_app():
app = web.Application()
app.router.add_get('/', hello_handler)
return app
if __name__ == '__main__':
app = create_app()
web.run_app(app, host='localhost', port=8080)
Structured Application
from aiohttp import web
import aiohttp_cors
async def init_app():
app = web.Application()
# CORS configuration
cors = aiohttp_cors.setup(app, defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
allow_methods="*"
)
})
# Adding routes
app.router.add_get('/', index_handler)
app.router.add_get('/health', health_check)
return app
Routing and URL Handling
Dynamic Routes
async def user_handler(request):
user_id = request.match_info['user_id']
return web.json_response({'user_id': user_id})
async def user_posts_handler(request):
user_id = request.match_info['user_id']
post_id = request.match_info.get('post_id')
if post_id:
return web.json_response({'user_id': user_id, 'post_id': post_id})
else:
return web.json_response({'user_id': user_id, 'posts': []})
app.router.add_get('/user/{user_id}', user_handler)
app.router.add_get('/user/{user_id}/posts', user_posts_handler)
app.router.add_get('/user/{user_id}/posts/{post_id}', user_posts_handler)
Using RouteTableDef
from aiohttp import web
routes = web.RouteTableDef()
@routes.get('/')
async def index(request):
return web.Response(text="Main page")
@routes.get('/api/users')
async def get_users(request):
return web.json_response([{'id': 1, 'name': 'John'}])
@routes.post('/api/users')
async def create_user(request):
data = await request.json()
return web.json_response({'id': 2, 'name': data['name']})
app = web.Application()
app.add_routes(routes)
Working with JSON and Forms
Handling JSON Data
async def json_handler(request):
try:
data = await request.json()
# Processing data
result = {
'status': 'success',
'received': data,
'processed_at': datetime.now().isoformat()
}
return web.json_response(result)
except ValueError:
return web.json_response({'error': 'Invalid JSON'}, status=400)
Handling Forms
async def form_handler(request):
if request.method == 'POST':
data = await request.post()
username = data.get('username')
email = data.get('email')
if not username or not email:
return web.json_response({'error': 'Missing fields'}, status=400)
# Processing form data
return web.json_response({'message': f'User {username} created'})
return web.Response(text='''
<form method="post">
<input name="username" type="text" placeholder="Username">
<input name="email" type="email" placeholder="Email">
<button type="submit">Submit</button>
</form>
''', content_type='text/html')
Handling Files in Forms
async def file_upload_handler(request):
reader = await request.multipart()
while True:
field = await reader.next()
if field is None:
break
if field.name == 'file':
filename = field.filename
# Saving the file
with open(f'uploads/{filename}', 'wb') as f:
while True:
chunk = await field.read_chunk()
if not chunk:
break
f.write(chunk)
return web.json_response({'message': 'File uploaded successfully'})
WebSocket Connections in aiohttp
Basic WebSocket Implementation
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
print("WebSocket connection established")
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(f"Echo: {msg.data}")
elif msg.type == web.WSMsgType.ERROR:
print(f'WebSocket error: {ws.exception()}')
print("WebSocket connection closed")
return ws
app.router.add_get('/ws', websocket_handler)
Advanced WebSocket Implementation
import json
import weakref
class WebSocketManager:
def __init__(self):
self.connections = weakref.WeakSet()
async def add_connection(self, ws):
self.connections.add(ws)
async def broadcast(self, message):
if self.connections:
await asyncio.gather(
*[ws.send_str(json.dumps(message)) for ws in self.connections],
return_exceptions=True
)
manager = WebSocketManager()
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
await manager.add_connection(ws)
try:
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
data = json.loads(msg.data)
if data['type'] == 'broadcast':
await manager.broadcast(data['message'])
else:
await ws.send_str(json.dumps({'echo': data}))
except Exception as e:
print(f"WebSocket error: {e}")
return ws
Integration with Template Engines
Jinja2 Configuration
import aiohttp_jinja2
import jinja2
from aiohttp import web
async def create_app():
app = web.Application()
# Jinja2 configuration
aiohttp_jinja2.setup(
app,
loader=jinja2.FileSystemLoader('templates'),
auto_reload=True,
enable_async=True
)
return app
Using Templates
@aiohttp_jinja2.template('index.html')
async def index_handler(request):
return {
'title': 'Main page',
'user': {'name': 'John', 'email': 'john@example.com'},
'posts': [
{'title': 'First post', 'content': 'Post content'},
{'title': 'Second post', 'content': 'Another content'}
]
}
# Or without decorator
async def about_handler(request):
context = {'title': 'About us', 'description': 'Company information'}
return aiohttp_jinja2.render_template('about.html', request, context)
Middleware and Request Handling
Creating Middleware
@web.middleware
async def logging_middleware(request, handler):
start_time = time.time()
try:
response = await handler(request)
process_time = time.time() - start_time
print(f"{request.method} {request.path} - {response.status} - {process_time:.3f}s")
return response
except Exception as e:
process_time = time.time() - start_time
print(f"{request.method} {request.path} - ERROR: {e} - {process_time:.3f}s")
raise
@web.middleware
async def auth_middleware(request, handler):
if request.path.startswith('/api/'):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return web.json_response({'error': 'Unauthorized'}, status=401)
return await handler(request)
Handling Exceptions
@web.middleware
async def error_middleware(request, handler):
try:
return await handler(request)
except web.HTTPException as e:
return web.json_response({'error': str(e)}, status=e.status)
except Exception as e:
return web.json_response({'error': 'Internal server error'}, status=500)
Deployment and Configuration
Running with Settings
async def init_app():
app = web.Application(middlewares=[
logging_middleware,
auth_middleware,
error_middleware
])
app.router.add_routes(routes)
return app
if __name__ == '__main__':
app = init_app()
web.run_app(
app,
host='0.0.0.0',
port=8080,
access_log=None, # Disable standard logging
reuse_port=True # For performance improvement
)
Integration with Gunicorn
gunicorn app:create_app --bind 0.0.0.0:8080 --worker-class aiohttp.GunicornWebWorker --workers 4
Creating a gunicorn.conf.py configuration file:
bind = "0.0.0.0:8080"
worker_class = "aiohttp.GunicornWebWorker"
workers = 4
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
preload_app = True
Table of Main aiohttp Methods and Functions
| Category | Method/Function | Description | Example Usage |
|---|---|---|---|
| HTTP Client | ClientSession() |
Creating a client session | async with ClientSession() as session: |
session.get(url) |
GET request | await session.get('https://api.com') |
|
session.post(url, data) |
POST request | await session.post(url, json=data) |
|
session.put(url, data) |
PUT request | await session.put(url, json=data) |
|
session.delete(url) |
DELETE request | await session.delete(url) |
|
session.patch(url, data) |
PATCH request | await session.patch(url, json=data) |
|
session.head(url) |
HEAD request | await session.head(url) |
|
session.options(url) |
OPTIONS request | await session.options(url) |
|
| Response Handling | response.text() |
Getting the response text | await response.text() |
response.json() |
Getting the JSON response | await response.json() |
|
response.read() |
Getting the response bytes | await response.read() |
|
response.status |
HTTP status code | if response.status == 200: |
|
response.headers |
Response headers | response.headers['Content-Type'] |
|
response.cookies |
Response cookies | response.cookies['session_id'] |
|
| Web Server | web.Application() |
Creating an application | app = web.Application() |
web.run_app(app) |
Starting the server | web.run_app(app, port=8080) |
|
web.Response() |
Creating a response | web.Response(text='Hello') |
|
web.json_response() |
JSON response | web.json_response({'key': 'value'}) |
|
web.FileResponse() |
File response | web.FileResponse('file.txt') |
|
web.StreamResponse() |
Stream response | web.StreamResponse() |
|
| Routing | app.router.add_get() |
Adding a GET route | app.router.add_get('/', handler) |
app.router.add_post() |
Adding a POST route | app.router.add_post('/api', handler) |
|
app.router.add_route() |
Adding a route | app.router.add_route('PUT', '/api', handler) |
|
app.add_routes() |
Adding a list of routes | app.add_routes([route1, route2]) |
|
web.RouteTableDef() |
Creating a route table | routes = web.RouteTableDef() |
|
| Request | request.method |
HTTP method | if request.method == 'POST': |
request.url |
Request URL | str(request.url) |
|
request.query |
Query parameters | request.query['param'] |
|
request.headers |
Request headers | request.headers['Authorization'] |
|
request.cookies |
Request cookies | request.cookies['session'] |
|
request.json() |
JSON data | data = await request.json() |
|
request.text() |
Request text | text = await request.text() |
|
request.post() |
Form data | form = await request.post() |
|
request.multipart() |
Multipart data | reader = await request.multipart() |
|
request.match_info |
Route parameters | request.match_info['user_id'] |
|
| WebSocket | web.WebSocketResponse() |
Creating a WebSocket | ws = web.WebSocketResponse() |
ws.prepare() |
Preparing a connection | await ws.prepare(request) |
|
ws.send_str() |
Sending a string | await ws.send_str('message') |
|
ws.send_bytes() |
Sending bytes | await ws.send_bytes(data) |
|
ws.send_json() |
Sending JSON | await ws.send_json({'key': 'value'}) |
|
ws.receive() |
Receiving a message | msg = await ws.receive() |
|
ws.close() |
Closing the connection | await ws.close() |
|
| Middleware | @web.middleware |
Middleware decorator | @web.middleware async def func(): |
app.middlewares |
Middleware list | app.middlewares.append(middleware) |
|
| Exceptions | web.HTTPException |
Base HTTP exception | raise web.HTTPNotFound() |
web.HTTPNotFound |
404 error | raise web.HTTPNotFound() |
|
web.HTTPBadRequest |
400 error | raise web.HTTPBadRequest() |
|
web.HTTPUnauthorized |
401 error | raise web.HTTPUnauthorized() |
|
web.HTTPForbidden |
403 error | raise web.HTTPForbidden() |
|
web.HTTPInternalServerError |
500 error | raise web.HTTPInternalServerError() |
|
| Configuration | ClientTimeout() |
Timeout settings | ClientTimeout(total=30) |
TCPConnector() |
Connection settings | TCPConnector(limit=100) |
|
FormData() |
Creating a form | FormData() |
|
MultipartWriter() |
Multipart writer | MultipartWriter() |
Practical Application Examples
API Aggregator
async def aggregate_data():
urls = [
'https://api.service1.com/data',
'https://api.service2.com/data',
'https://api.service3.com/data'
]
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
results = []
for response in responses:
if response.status == 200:
data = await response.json()
results.append(data)
return results
Microservice with REST API
from aiohttp import web
import json
class UserService:
def __init__(self):
self.users = {}
self.next_id = 1
async def get_users(self, request):
return web.json_response(list(self.users.values()))
async def create_user(self, request):
data = await request.json()
user_id = self.next_id
self.users[user_id] = {
'id': user_id,
'name': data['name'],
'email': data['email']
}
self.next_id += 1
return web.json_response(self.users[user_id], status=201)
async def get_user(self, request):
user_id = int(request.match_info['user_id'])
if user_id not in self.users:
raise web.HTTPNotFound()
return web.json_response(self.users[user_id])
service = UserService()
app = web.Application()
app.router.add_get('/users', service.get_users)
app.router.add_post('/users', service.create_user)
app.router.add_get('/users/{user_id}', service.get_user)
Chat Server on WebSocket
import json
import weakref
from aiohttp import web
class ChatRoom:
def __init__(self):
self.clients = weakref.WeakSet()
async def join(self, ws):
self.clients.add(ws)
await self.broadcast({'type': 'user_joined', 'count': len(self.clients)})
async def leave(self, ws):
if ws in self.clients:
self.clients.remove(ws)
await self.broadcast({'type': 'user_left', 'count': len(self.clients)})
async def broadcast(self, message):
if self.clients:
await asyncio.gather(
*[client.send_str(json.dumps(message)) for client in self.clients],
return_exceptions=True
)
chat_room = ChatRoom()
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
await chat_room.join(ws)
try:
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
data = json.loads(msg.data)
if data['type'] == 'message':
await chat_room.broadcast({
'type': 'message',
'text': data['text'],
'user': data['user']
})
finally:
await chat_room.leave(ws)
return ws
Performance and Optimization
Connection Settings
async def optimized_client():
connector = aiohttp.TCPConnector(
limit=100, # Maximum connections
limit_per_host=10, # Maximum connections per host
ttl_dns_cache=300, # DNS cache for 5 minutes
use_dns_cache=True, # Use DNS cache
keepalive_timeout=30 # Keep-alive timeout
)
timeout = aiohttp.ClientTimeout(
total=30, # Total timeout
connect=5, # Connection timeout
sock_read=10 # Read timeout
)
return aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={'User-Agent': 'MyApp/1.0'}
)
Connection Pools
class ConnectionPool:
def __init__(self):
self.session = None
async def get_session(self):
if self.session is None:
self.session = await optimized_client()
return self.session
async def close(self):
if self.session:
await self.session.close()
pool = ConnectionPool()
async def make_request(url):
session = await pool.get_session()
async with session.get(url) as response:
return await response.json()
Testing aiohttp Applications
Basic Tests
import pytest
from aiohttp import web
from aiohttp.test_utils import make_mocked_request
async def test_handler():
request = make_mocked_request('GET', '/')
response = await hello_handler(request)
assert response.status == 200
assert response.text == "Hello from aiohttp server!"
@pytest.fixture
async def client(aiohttp_client):
app = web.Application()
app.router.add_get('/', hello_handler)
return await aiohttp_client(app)
async def test_hello_endpoint(client):
response = await client.get('/')
assert response.status == 200
text = await response.text()
assert text == "Hello from aiohttp server!"
Testing with a Database
import pytest
import asyncio
from aiohttp import web
from aiohttp.test_utils import make_mocked_request
@pytest.fixture
def event_loop():
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
async def app():
app = web.Application()
# Configuring the application for tests
return app
async def test_user_creation(client):
user_data = {'name': 'Test User', 'email': 'test@example.com'}
response = await client.post('/users', json=user_data)
assert response.status == 201
data = await response.json()
assert data['name'] == 'Test User'
assert data['email'] == 'test@example.com'
Security
Data Validation
from aiohttp import web
import json
async def validate_user_data(data):
required_fields = ['name', 'email']
for field in required_fields:
if field not in data:
raise web.HTTPBadRequest(text=f"Missing field: {field}")
if '@' not in data['email']:
raise web.HTTPBadRequest(text="Invalid email format")
async def create_user_handler(request):
try:
data = await request.json()
await validate_user_data(data)
# Creating a user
user = {'id': 1, 'name': data['name'], 'email': data['email']}
return web.json_response(user, status=201)
except json.JSONDecodeError:
raise web.HTTPBadRequest(text="Invalid JSON")
Authentication and Authorization
import jwt
from aiohttp import web
async def require_auth(request):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
raise web.HTTPUnauthorized()
token = auth_header.split(' ')[1]
try:
payload = jwt.decode(token, 'secret', algorithms=['HS256'])
request['user'] = payload
except jwt.InvalidTokenError:
raise web.HTTPUnauthorized()
@web.middleware
async def auth_middleware(request, handler):
if request.path.startswith('/protected/'):
await require_auth(request)
return await handler(request)
Frequently Asked Questions
What is aiohttp?
aiohttp is an asynchronous Python library for working with HTTP clients and servers, built on top of asyncio. It provides tools for creating high-performance web applications and performing concurrent HTTP requests.
Does aiohttp Support WebSocket?
Yes, aiohttp has built-in WebSocket support for both the client and server sides. This allows you to create interactive real-time applications.
Can I Use aiohttp with Templates?
Yes, aiohttp supports integration with popular template engines, especially with Jinja2 through the aiohttp_jinja2 library.
What is the Difference Between aiohttp and FastAPI?
aiohttp is a low-level asynchronous HTTP framework that provides basic tools for working with HTTP. FastAPI is a high-level framework for creating REST APIs with automatic typing, validation, and documentation generation.
Is aiohttp Suitable for Production?
Yes, aiohttp is suitable for production with proper configuration. It is recommended to use it with process managers like Gunicorn or uWSGI, and to configure monitoring and logging.
How to Handle Large Files in aiohttp?
To handle large files, use streaming with response.content.iter_chunked() or response.content.read() with a specified chunk size.
The Future of AI in Mathematics and Everyday Life: How Intelligent Agents Are Already Changing the Game
Experts warned about the risks of fake charity with AI
In Russia, universal AI-agent for robots and industrial processes was developed