aiohttp-asynchronous http checks

онлайн тренажер по питону
Online Python Trainer for Beginners

Learn Python easily without overwhelming theory. Solve practical tasks with automatic checking, get hints in Russian, and write code directly in your browser — no installation required.

Start Course

Introduction to aiohttp

aiohttp is a powerful asynchronous library for working with HTTP clients and servers in Python. It is built on top of asyncio and provides a flexible interface for creating scalable web applications and performing concurrent HTTP requests. The library supports HTTP/1.1, WebSocket, cookies, sessions, and integration with popular template engines.

What is aiohttp

aiohttp (Asynchronous I/O HTTP) is a comprehensive Python library that combines the functionality of an HTTP client and an HTTP server using asynchronous programming. Designed to work with asyncio, it allows you to create high-performance web applications capable of handling thousands of concurrent connections.

Main Features of the Library

The aiohttp library provides a wide range of features for developing modern web applications. It supports the creation of simple HTTP clients for interacting with external APIs and full-fledged web servers with support for middleware, routing, and templating.

Advantages and Features of aiohttp

Asynchronous Architecture

The main advantage of aiohttp is its asynchronous nature. By using asyncio, the library can handle multiple concurrent connections without blocking the execution thread. This makes it an ideal choice for high-load applications.

Versatility of Use

aiohttp supports both client and server functionality in a single package. This means that developers can use a single library to create complete web applications that simultaneously act as an HTTP client and server.

Built-in Support for Modern Technologies

The library natively supports WebSocket, which allows you to create interactive real-time applications. Support for various data formats, including JSON, forms, and files, is also included.

Extensibility and Flexibility

aiohttp provides a middleware system for adding additional functionality, a routing system for organizing URL handlers, and the ability to integrate with popular template engines.

Installation and Basic Configuration

Installing the Main Library

pip install aiohttp

Installing Additional Dependencies

For working with Jinja2 templates:

pip install aiohttp_jinja2 jinja2

For working with sessions:

pip install aiohttp_session

Checking the Installation

After installation, you can check the library version:

import aiohttp
print(aiohttp.__version__)

Asynchronous HTTP Client with aiohttp

Basics of Working with ClientSession

ClientSession is the main class for performing HTTP requests in aiohttp. It manages the connection pool and ensures efficient connection reuse.

import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Running an asynchronous function
asyncio.run(fetch('https://example.com'))

Client Session Configuration

async def create_configured_session():
    timeout = aiohttp.ClientTimeout(total=30)
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    
    async with aiohttp.ClientSession(
        timeout=timeout,
        connector=connector,
        headers={'User-Agent': 'MyApp/1.0'}
    ) as session:
        return session

Sending GET and POST Requests

Performing GET Requests

async def get_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            if resp.status == 200:
                data = await resp.json()
                return data
            else:
                print(f"Error: {resp.status}")

Sending POST Requests

async def post_example():
    payload = {'key': 'value', 'number': 42}
    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/post', json=payload) as resp:
            response_text = await resp.text()
            print(response_text)

Sending Form Data

async def send_form_data():
    form_data = aiohttp.FormData()
    form_data.add_field('username', 'john_doe')
    form_data.add_field('password', 'secret123')
    
    async with aiohttp.ClientSession() as session:
        async with session.post('https://example.com/login', data=form_data) as resp:
            return await resp.text()

Handling Headers and Parameters

Working with Headers

async def request_with_headers():
    headers = {
        'Authorization': 'Bearer your_token_here',
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/protected', headers=headers) as resp:
            return await resp.json()

Passing Request Parameters

async def request_with_params():
    params = {
        'q': 'python programming',
        'limit': 20,
        'offset': 0,
        'sort': 'date'
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/search', params=params) as resp:
            return await resp.json()

Uploading and Sending Files

Sending Files

async def upload_file():
    with open('document.pdf', 'rb') as file:
        data = aiohttp.FormData()
        data.add_field('file', file, filename='document.pdf', content_type='application/pdf')
        data.add_field('description', 'Important document')
        
        async with aiohttp.ClientSession() as session:
            async with session.post('https://example.com/upload', data=data) as resp:
                return await resp.json()

Downloading Files

async def download_file(url, filename):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                with open(filename, 'wb') as file:
                    async for chunk in resp.content.iter_chunked(8192):
                        file.write(chunk)
                return f"File {filename} successfully downloaded"

Using Sessions for Multiple Requests

Effective Use of Sessions

async def multiple_requests():
    urls = [
        'https://api.example.com/user/1',
        'https://api.example.com/user/2',
        'https://api.example.com/user/3'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.create_task(fetch_user(session, url))
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
        return results

async def fetch_user(session, url):
    async with session.get(url) as resp:
        return await resp.json()

Connection Reuse

Using one session for multiple requests allows you to efficiently reuse HTTP connections through the HTTP Keep-Alive mechanism, which significantly improves performance.

Error and Exception Handling

Main Types of Exceptions

from aiohttp import ClientError, ClientTimeout, ClientConnectorError
import asyncio

async def robust_request(url):
    try:
        timeout = ClientTimeout(total=10, connect=5)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                resp.raise_for_status()  # Raises an exception for HTTP errors
                return await resp.text()
    
    except ClientConnectorError as e:
        print(f"Connection error: {e}")
    except ClientTimeout as e:
        print(f"Timeout exceeded: {e}")
    except ClientError as e:
        print(f"Client error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

Handling HTTP Statuses

async def handle_http_status(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                return await resp.json()
            elif resp.status == 404:
                print("Resource not found")
            elif resp.status == 500:
                print("Internal server error")
            else:
                print(f"Unexpected status: {resp.status}")

Creating an Asynchronous Web Server

Simplest Server

from aiohttp import web

async def hello_handler(request):
    return web.Response(text="Hello from aiohttp server!")

async def create_app():
    app = web.Application()
    app.router.add_get('/', hello_handler)
    return app

if __name__ == '__main__':
    app = create_app()
    web.run_app(app, host='localhost', port=8080)

Structured Application

from aiohttp import web
import aiohttp_cors

async def init_app():
    app = web.Application()
    
    # CORS configuration
    cors = aiohttp_cors.setup(app, defaults={
        "*": aiohttp_cors.ResourceOptions(
            allow_credentials=True,
            expose_headers="*",
            allow_headers="*",
            allow_methods="*"
        )
    })
    
    # Adding routes
    app.router.add_get('/', index_handler)
    app.router.add_get('/health', health_check)
    
    return app

Routing and URL Handling

Dynamic Routes

async def user_handler(request):
    user_id = request.match_info['user_id']
    return web.json_response({'user_id': user_id})

async def user_posts_handler(request):
    user_id = request.match_info['user_id']
    post_id = request.match_info.get('post_id')
    
    if post_id:
        return web.json_response({'user_id': user_id, 'post_id': post_id})
    else:
        return web.json_response({'user_id': user_id, 'posts': []})

app.router.add_get('/user/{user_id}', user_handler)
app.router.add_get('/user/{user_id}/posts', user_posts_handler)
app.router.add_get('/user/{user_id}/posts/{post_id}', user_posts_handler)

Using RouteTableDef

from aiohttp import web

routes = web.RouteTableDef()

@routes.get('/')
async def index(request):
    return web.Response(text="Main page")

@routes.get('/api/users')
async def get_users(request):
    return web.json_response([{'id': 1, 'name': 'John'}])

@routes.post('/api/users')
async def create_user(request):
    data = await request.json()
    return web.json_response({'id': 2, 'name': data['name']})

app = web.Application()
app.add_routes(routes)

Working with JSON and Forms

Handling JSON Data

async def json_handler(request):
    try:
        data = await request.json()
        # Processing data
        result = {
            'status': 'success',
            'received': data,
            'processed_at': datetime.now().isoformat()
        }
        return web.json_response(result)
    except ValueError:
        return web.json_response({'error': 'Invalid JSON'}, status=400)

Handling Forms

async def form_handler(request):
    if request.method == 'POST':
        data = await request.post()
        username = data.get('username')
        email = data.get('email')
        
        if not username or not email:
            return web.json_response({'error': 'Missing fields'}, status=400)
        
        # Processing form data
        return web.json_response({'message': f'User {username} created'})
    
    return web.Response(text='''
    <form method="post">
        <input name="username" type="text" placeholder="Username">
        <input name="email" type="email" placeholder="Email">
        <button type="submit">Submit</button>
    </form>
    ''', content_type='text/html')

Handling Files in Forms

async def file_upload_handler(request):
    reader = await request.multipart()
    
    while True:
        field = await reader.next()
        if field is None:
            break
        
        if field.name == 'file':
            filename = field.filename
            # Saving the file
            with open(f'uploads/{filename}', 'wb') as f:
                while True:
                    chunk = await field.read_chunk()
                    if not chunk:
                        break
                    f.write(chunk)
    
    return web.json_response({'message': 'File uploaded successfully'})

WebSocket Connections in aiohttp

Basic WebSocket Implementation

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    
    print("WebSocket connection established")
    
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            if msg.data == 'close':
                await ws.close()
            else:
                await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == web.WSMsgType.ERROR:
            print(f'WebSocket error: {ws.exception()}')
    
    print("WebSocket connection closed")
    return ws

app.router.add_get('/ws', websocket_handler)

Advanced WebSocket Implementation

import json
import weakref

class WebSocketManager:
    def __init__(self):
        self.connections = weakref.WeakSet()
    
    async def add_connection(self, ws):
        self.connections.add(ws)
    
    async def broadcast(self, message):
        if self.connections:
            await asyncio.gather(
                *[ws.send_str(json.dumps(message)) for ws in self.connections],
                return_exceptions=True
            )

manager = WebSocketManager()

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    await manager.add_connection(ws)
    
    try:
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                data = json.loads(msg.data)
                if data['type'] == 'broadcast':
                    await manager.broadcast(data['message'])
                else:
                    await ws.send_str(json.dumps({'echo': data}))
    except Exception as e:
        print(f"WebSocket error: {e}")
    
    return ws

Integration with Template Engines

Jinja2 Configuration

import aiohttp_jinja2
import jinja2
from aiohttp import web

async def create_app():
    app = web.Application()
    
    # Jinja2 configuration
    aiohttp_jinja2.setup(
        app,
        loader=jinja2.FileSystemLoader('templates'),
        auto_reload=True,
        enable_async=True
    )
    
    return app

Using Templates

@aiohttp_jinja2.template('index.html')
async def index_handler(request):
    return {
        'title': 'Main page',
        'user': {'name': 'John', 'email': 'john@example.com'},
        'posts': [
            {'title': 'First post', 'content': 'Post content'},
            {'title': 'Second post', 'content': 'Another content'}
        ]
    }

# Or without decorator
async def about_handler(request):
    context = {'title': 'About us', 'description': 'Company information'}
    return aiohttp_jinja2.render_template('about.html', request, context)

Middleware and Request Handling

Creating Middleware

@web.middleware
async def logging_middleware(request, handler):
    start_time = time.time()
    
    try:
        response = await handler(request)
        process_time = time.time() - start_time
        
        print(f"{request.method} {request.path} - {response.status} - {process_time:.3f}s")
        return response
    
    except Exception as e:
        process_time = time.time() - start_time
        print(f"{request.method} {request.path} - ERROR: {e} - {process_time:.3f}s")
        raise

@web.middleware
async def auth_middleware(request, handler):
    if request.path.startswith('/api/'):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return web.json_response({'error': 'Unauthorized'}, status=401)
    
    return await handler(request)

Handling Exceptions

@web.middleware
async def error_middleware(request, handler):
    try:
        return await handler(request)
    except web.HTTPException as e:
        return web.json_response({'error': str(e)}, status=e.status)
    except Exception as e:
        return web.json_response({'error': 'Internal server error'}, status=500)

Deployment and Configuration

Running with Settings

async def init_app():
    app = web.Application(middlewares=[
        logging_middleware,
        auth_middleware,
        error_middleware
    ])
    
    app.router.add_routes(routes)
    return app

if __name__ == '__main__':
    app = init_app()
    web.run_app(
        app,
        host='0.0.0.0',
        port=8080,
        access_log=None,  # Disable standard logging
        reuse_port=True   # For performance improvement
    )

Integration with Gunicorn

gunicorn app:create_app --bind 0.0.0.0:8080 --worker-class aiohttp.GunicornWebWorker --workers 4

Creating a gunicorn.conf.py configuration file:

bind = "0.0.0.0:8080"
worker_class = "aiohttp.GunicornWebWorker"
workers = 4
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
preload_app = True

Table of Main aiohttp Methods and Functions

Category Method/Function Description Example Usage
HTTP Client ClientSession() Creating a client session async with ClientSession() as session:
session.get(url) GET request await session.get('https://api.com')
session.post(url, data) POST request await session.post(url, json=data)
session.put(url, data) PUT request await session.put(url, json=data)
session.delete(url) DELETE request await session.delete(url)
session.patch(url, data) PATCH request await session.patch(url, json=data)
session.head(url) HEAD request await session.head(url)
session.options(url) OPTIONS request await session.options(url)
Response Handling response.text() Getting the response text await response.text()
response.json() Getting the JSON response await response.json()
response.read() Getting the response bytes await response.read()
response.status HTTP status code if response.status == 200:
response.headers Response headers response.headers['Content-Type']
response.cookies Response cookies response.cookies['session_id']
Web Server web.Application() Creating an application app = web.Application()
web.run_app(app) Starting the server web.run_app(app, port=8080)
web.Response() Creating a response web.Response(text='Hello')
web.json_response() JSON response web.json_response({'key': 'value'})
web.FileResponse() File response web.FileResponse('file.txt')
web.StreamResponse() Stream response web.StreamResponse()
Routing app.router.add_get() Adding a GET route app.router.add_get('/', handler)
app.router.add_post() Adding a POST route app.router.add_post('/api', handler)
app.router.add_route() Adding a route app.router.add_route('PUT', '/api', handler)
app.add_routes() Adding a list of routes app.add_routes([route1, route2])
web.RouteTableDef() Creating a route table routes = web.RouteTableDef()
Request request.method HTTP method if request.method == 'POST':
request.url Request URL str(request.url)
request.query Query parameters request.query['param']
request.headers Request headers request.headers['Authorization']
request.cookies Request cookies request.cookies['session']
request.json() JSON data data = await request.json()
request.text() Request text text = await request.text()
request.post() Form data form = await request.post()
request.multipart() Multipart data reader = await request.multipart()
request.match_info Route parameters request.match_info['user_id']
WebSocket web.WebSocketResponse() Creating a WebSocket ws = web.WebSocketResponse()
ws.prepare() Preparing a connection await ws.prepare(request)
ws.send_str() Sending a string await ws.send_str('message')
ws.send_bytes() Sending bytes await ws.send_bytes(data)
ws.send_json() Sending JSON await ws.send_json({'key': 'value'})
ws.receive() Receiving a message msg = await ws.receive()
ws.close() Closing the connection await ws.close()
Middleware @web.middleware Middleware decorator @web.middleware async def func():
app.middlewares Middleware list app.middlewares.append(middleware)
Exceptions web.HTTPException Base HTTP exception raise web.HTTPNotFound()
web.HTTPNotFound 404 error raise web.HTTPNotFound()
web.HTTPBadRequest 400 error raise web.HTTPBadRequest()
web.HTTPUnauthorized 401 error raise web.HTTPUnauthorized()
web.HTTPForbidden 403 error raise web.HTTPForbidden()
web.HTTPInternalServerError 500 error raise web.HTTPInternalServerError()
Configuration ClientTimeout() Timeout settings ClientTimeout(total=30)
TCPConnector() Connection settings TCPConnector(limit=100)
FormData() Creating a form FormData()
MultipartWriter() Multipart writer MultipartWriter()

Practical Application Examples

API Aggregator

async def aggregate_data():
    urls = [
        'https://api.service1.com/data',
        'https://api.service2.com/data',
        'https://api.service3.com/data'
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        
        results = []
        for response in responses:
            if response.status == 200:
                data = await response.json()
                results.append(data)
        
        return results

Microservice with REST API

from aiohttp import web
import json

class UserService:
    def __init__(self):
        self.users = {}
        self.next_id = 1
    
    async def get_users(self, request):
        return web.json_response(list(self.users.values()))
    
    async def create_user(self, request):
        data = await request.json()
        user_id = self.next_id
        self.users[user_id] = {
            'id': user_id,
            'name': data['name'],
            'email': data['email']
        }
        self.next_id += 1
        return web.json_response(self.users[user_id], status=201)
    
    async def get_user(self, request):
        user_id = int(request.match_info['user_id'])
        if user_id not in self.users:
            raise web.HTTPNotFound()
        return web.json_response(self.users[user_id])

service = UserService()
app = web.Application()
app.router.add_get('/users', service.get_users)
app.router.add_post('/users', service.create_user)
app.router.add_get('/users/{user_id}', service.get_user)

Chat Server on WebSocket

import json
import weakref
from aiohttp import web

class ChatRoom:
    def __init__(self):
        self.clients = weakref.WeakSet()
    
    async def join(self, ws):
        self.clients.add(ws)
        await self.broadcast({'type': 'user_joined', 'count': len(self.clients)})
    
    async def leave(self, ws):
        if ws in self.clients:
            self.clients.remove(ws)
            await self.broadcast({'type': 'user_left', 'count': len(self.clients)})
    
    async def broadcast(self, message):
        if self.clients:
            await asyncio.gather(
                *[client.send_str(json.dumps(message)) for client in self.clients],
                return_exceptions=True
            )

chat_room = ChatRoom()

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    
    await chat_room.join(ws)
    
    try:
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                data = json.loads(msg.data)
                if data['type'] == 'message':
                    await chat_room.broadcast({
                        'type': 'message',
                        'text': data['text'],
                        'user': data['user']
                    })
    finally:
        await chat_room.leave(ws)
    
    return ws

Performance and Optimization

Connection Settings

async def optimized_client():
    connector = aiohttp.TCPConnector(
        limit=100,           # Maximum connections
        limit_per_host=10,   # Maximum connections per host
        ttl_dns_cache=300,   # DNS cache for 5 minutes
        use_dns_cache=True,  # Use DNS cache
        keepalive_timeout=30 # Keep-alive timeout
    )
    
    timeout = aiohttp.ClientTimeout(
        total=30,           # Total timeout
        connect=5,          # Connection timeout
        sock_read=10        # Read timeout
    )
    
    return aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
        headers={'User-Agent': 'MyApp/1.0'}
    )

Connection Pools

class ConnectionPool:
    def __init__(self):
        self.session = None
    
    async def get_session(self):
        if self.session is None:
            self.session = await optimized_client()
        return self.session
    
    async def close(self):
        if self.session:
            await self.session.close()

pool = ConnectionPool()

async def make_request(url):
    session = await pool.get_session()
    async with session.get(url) as response:
        return await response.json()

Testing aiohttp Applications

Basic Tests

import pytest
from aiohttp import web
from aiohttp.test_utils import make_mocked_request

async def test_handler():
    request = make_mocked_request('GET', '/')
    response = await hello_handler(request)
    assert response.status == 200
    assert response.text == "Hello from aiohttp server!"

@pytest.fixture
async def client(aiohttp_client):
    app = web.Application()
    app.router.add_get('/', hello_handler)
    return await aiohttp_client(app)

async def test_hello_endpoint(client):
    response = await client.get('/')
    assert response.status == 200
    text = await response.text()
    assert text == "Hello from aiohttp server!"

Testing with a Database

import pytest
import asyncio
from aiohttp import web
from aiohttp.test_utils import make_mocked_request

@pytest.fixture
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

@pytest.fixture
async def app():
    app = web.Application()
    # Configuring the application for tests
    return app

async def test_user_creation(client):
    user_data = {'name': 'Test User', 'email': 'test@example.com'}
    response = await client.post('/users', json=user_data)
    assert response.status == 201
    
    data = await response.json()
    assert data['name'] == 'Test User'
    assert data['email'] == 'test@example.com'

Security

Data Validation

from aiohttp import web
import json

async def validate_user_data(data):
    required_fields = ['name', 'email']
    for field in required_fields:
        if field not in data:
            raise web.HTTPBadRequest(text=f"Missing field: {field}")
    
    if '@' not in data['email']:
        raise web.HTTPBadRequest(text="Invalid email format")

async def create_user_handler(request):
    try:
        data = await request.json()
        await validate_user_data(data)
        
        # Creating a user
        user = {'id': 1, 'name': data['name'], 'email': data['email']}
        return web.json_response(user, status=201)
    
    except json.JSONDecodeError:
        raise web.HTTPBadRequest(text="Invalid JSON")

Authentication and Authorization

import jwt
from aiohttp import web

async def require_auth(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Bearer '):
        raise web.HTTPUnauthorized()
    
    token = auth_header.split(' ')[1]
    try:
        payload = jwt.decode(token, 'secret', algorithms=['HS256'])
        request['user'] = payload
    except jwt.InvalidTokenError:
        raise web.HTTPUnauthorized()

@web.middleware
async def auth_middleware(request, handler):
    if request.path.startswith('/protected/'):
        await require_auth(request)
    return await handler(request)

Frequently Asked Questions

What is aiohttp?

aiohttp is an asynchronous Python library for working with HTTP clients and servers, built on top of asyncio. It provides tools for creating high-performance web applications and performing concurrent HTTP requests.

Does aiohttp Support WebSocket?

Yes, aiohttp has built-in WebSocket support for both the client and server sides. This allows you to create interactive real-time applications.

Can I Use aiohttp with Templates?

Yes, aiohttp supports integration with popular template engines, especially with Jinja2 through the aiohttp_jinja2 library.

What is the Difference Between aiohttp and FastAPI?

aiohttp is a low-level asynchronous HTTP framework that provides basic tools for working with HTTP. FastAPI is a high-level framework for creating REST APIs with automatic typing, validation, and documentation generation.

Is aiohttp Suitable for Production?

Yes, aiohttp is suitable for production with proper configuration. It is recommended to use it with process managers like Gunicorn or uWSGI, and to configure monitoring and logging.

How to Handle Large Files in aiohttp?

To handle large files, use streaming with response.content.iter_chunked() or response.content.read() with a specified chunk size.

News