asyncpg – асинхронный клиент PostgreSQL

онлайн тренажер по питону
Онлайн-тренажер Python для начинающих

Изучайте Python легко и без перегрузки теорией. Решайте практические задачи с автоматической проверкой, получайте подсказки на русском языке и пишите код прямо в браузере — без необходимости что-либо устанавливать.

Начать курс

Введение

С ростом популярности асинхронных веб-фреймворков, таких как FastAPI и Starlette, возрастает потребность в быстрых, неблокирующих способах взаимодействия с базой данных. asyncpg — это асинхронный клиент для PostgreSQL, разработанный с нуля для максимальной производительности.

asyncpg представляет собой низкоуровневый, быстрый и эффективный способ работы с PostgreSQL в async/await стиле, что делает его отличным выбором для современных Python-приложений. Библиотека написана на Python и Cython, что обеспечивает высокую скорость выполнения операций и минимальные накладные расходы.

Что такое asyncpg

asyncpg — это полнофункциональный асинхронный драйвер PostgreSQL для Python, который обеспечивает прямое взаимодействие с базой данных без использования ORM. Библиотека поддерживает все современные возможности PostgreSQL, включая JSON, UUID, массивы, пользовательские типы данных и расширенные функции.

Основные преимущества asyncpg:

  • Полная асинхронность с поддержкой asyncio
  • Высокая производительность благодаря использованию Cython
  • Нативная поддержка всех типов данных PostgreSQL
  • Встроенная поддержка пула соединений
  • Безопасность от SQL-инъекций
  • Простая интеграция с асинхронными фреймворками

Установка и первоначальная настройка

Требования к системе

Для работы с asyncpg необходимы:

  • Python 3.7 или выше
  • PostgreSQL 9.5 или выше
  • Операционная система: Linux, macOS, Windows

Установка библиотеки

pip install asyncpg

Для разработки рекомендуется установить дополнительные пакеты:

pip install asyncpg pytest-asyncio

Базовое подключение к базе данных

import asyncpg
import asyncio

async def main():
    # Подключение с параметрами
    conn = await asyncpg.connect(
        user='postgres',
        password='secret',
        database='test_db',
        host='127.0.0.1',
        port=5432
    )
    
    # Проверка подключения
    version = await conn.fetchval('SELECT version()')
    print(f"Подключено к: {version}")
    
    await conn.close()

asyncio.run(main())

Подключение через DSN

# Использование строки подключения
dsn = "postgresql://postgres:secret@127.0.0.1:5432/test_db"
conn = await asyncpg.connect(dsn)

Асинхронность в Python и роль asyncpg

asyncpg использует async/await и полностью совместим с asyncio, что позволяет выполнять неблокирующие запросы к PostgreSQL. Это критически важно для высоконагруженных приложений и API, где блокировка event loop может привести к значительному снижению производительности.

Отличие от синхронных драйверов

В отличие от psycopg2, который является синхронным, asyncpg не блокирует выполнение других задач:

# Синхронный подход (psycopg2)
import psycopg2
conn = psycopg2.connect(...)  # Блокирует поток
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")  # Блокирует поток

# Асинхронный подход (asyncpg)
import asyncpg
conn = await asyncpg.connect(...)  # Не блокирует event loop
rows = await conn.fetch("SELECT * FROM users")  # Не блокирует event loop

Конкурентное выполнение запросов

async def concurrent_queries():
    conn = await asyncpg.connect(...)
    
    # Выполнение нескольких запросов параллельно
    tasks = [
        conn.fetch("SELECT * FROM users"),
        conn.fetch("SELECT * FROM orders"),
        conn.fetch("SELECT * FROM products")
    ]
    
    results = await asyncio.gather(*tasks)
    return results

Основные методы и функции asyncpg

Таблица основных методов

Метод Описание Возвращаемое значение Пример использования
connect() Создание соединения Connection await asyncpg.connect(dsn)
create_pool() Создание пула соединений Pool await asyncpg.create_pool(dsn)
execute() Выполнение SQL без возврата данных Строка статуса await conn.execute("INSERT...")
fetch() Получение всех строк результата List[Record] await conn.fetch("SELECT...")
fetchrow() Получение одной строки Record или None await conn.fetchrow("SELECT...")
fetchval() Получение одного значения Значение или None await conn.fetchval("SELECT COUNT(*)")
executemany() Выполнение запроса с множественными параметрами Список статусов await conn.executemany("INSERT...", data)
copy_from_table() Копирование данных из таблицы Количество строк await conn.copy_from_table("users")
copy_to_table() Копирование данных в таблицу Количество строк await conn.copy_to_table("users", records)

Подробное описание методов

execute() - выполнение команд

# Создание таблицы
await conn.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Вставка данных
await conn.execute(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    "Иван Петров", "ivan@example.com"
)

# Обновление данных
await conn.execute(
    "UPDATE users SET name = $1 WHERE id = $2",
    "Иван Иванов", 1
)

# Удаление данных
await conn.execute("DELETE FROM users WHERE id = $1", 1)

fetch() - получение множественных результатов

# Получение всех пользователей
users = await conn.fetch("SELECT * FROM users")

# Получение с условием
active_users = await conn.fetch(
    "SELECT * FROM users WHERE created_at > $1",
    datetime.now() - timedelta(days=30)
)

# Сортировка и лимит
top_users = await conn.fetch(
    "SELECT name, email FROM users ORDER BY created_at DESC LIMIT $1",
    10
)

fetchrow() - получение одной строки

# Получение пользователя по ID
user = await conn.fetchrow("SELECT * FROM users WHERE id = $1", 1)
if user:
    print(f"Имя: {user['name']}, Email: {user['email']}")

# Получение последнего пользователя
last_user = await conn.fetchrow(
    "SELECT * FROM users ORDER BY created_at DESC LIMIT 1"
)

fetchval() - получение одного значения

# Подсчет количества пользователей
count = await conn.fetchval("SELECT COUNT(*) FROM users")

# Получение максимального ID
max_id = await conn.fetchval("SELECT MAX(id) FROM users")

# Проверка существования пользователя
exists = await conn.fetchval(
    "SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)",
    "test@example.com"
)

executemany() - массовые операции

# Массовая вставка данных
users_data = [
    ("Анна", "anna@example.com"),
    ("Петр", "petr@example.com"),
    ("Мария", "maria@example.com")
]

await conn.executemany(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    users_data
)

Работа с пулом соединений

Создание и настройка пула

# Создание пула с настройками
pool = await asyncpg.create_pool(
    user='postgres',
    password='secret',
    database='test_db',
    host='127.0.0.1',
    port=5432,
    min_size=1,        # Минимальное количество соединений
    max_size=10,       # Максимальное количество соединений
    max_queries=50000, # Максимальное количество запросов на соединение
    max_inactive_connection_lifetime=300,  # Время жизни неактивного соединения
    timeout=30,        # Таймаут подключения
    command_timeout=60 # Таймаут выполнения команд
)

Использование пула

# Получение соединения из пула
async with pool.acquire() as conn:
    users = await conn.fetch("SELECT * FROM users")
    
# Альтернативный способ
conn = await pool.acquire()
try:
    users = await conn.fetch("SELECT * FROM users")
finally:
    await pool.release(conn)

Закрытие пула

# Корректное закрытие пула
await pool.close()

Работа с транзакциями

Автоматическое управление транзакциями

# Транзакция с автоматическим управлением
async with conn.transaction():
    await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "Тест", "test@example.com"
    )
    await conn.execute(
        "UPDATE users SET name = $1 WHERE id = $2",
        "Тест Обновленный", 1
    )
    # Если произойдет исключение, транзакция будет отменена

Ручное управление транзакциями

# Ручное управление транзакцией
tr = conn.transaction()
await tr.start()

try:
    await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "Тест", "test@example.com"
    )
    
    # Проверка условий
    count = await conn.fetchval("SELECT COUNT(*) FROM users")
    if count > 1000:
        raise Exception("Слишком много пользователей")
    
    await tr.commit()
except Exception:
    await tr.rollback()
    raise

Вложенные транзакции (Savepoints)

async with conn.transaction():
    await conn.execute("INSERT INTO users (name) VALUES ('User1')")
    
    # Создание savepoint
    async with conn.transaction():
        await conn.execute("INSERT INTO users (name) VALUES ('User2')")
        # Если здесь произойдет ошибка, откатится только этот блок
        
    await conn.execute("INSERT INTO users (name) VALUES ('User3')")

Поддержка типов данных PostgreSQL

Базовые типы данных

# Работа с различными типами данных
await conn.execute("""
    CREATE TABLE data_types (
        id SERIAL PRIMARY KEY,
        text_field TEXT,
        int_field INTEGER,
        float_field REAL,
        bool_field BOOLEAN,
        date_field DATE,
        timestamp_field TIMESTAMP,
        json_field JSON,
        uuid_field UUID,
        array_field INTEGER[]
    )
""")

# Вставка данных разных типов
import uuid
from datetime import datetime, date

await conn.execute("""
    INSERT INTO data_types (
        text_field, int_field, float_field, bool_field,
        date_field, timestamp_field, json_field, 
        uuid_field, array_field
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
""", 
    "Текст", 42, 3.14, True,
    date.today(), datetime.now(),
    {"key": "value", "number": 123},
    uuid.uuid4(), [1, 2, 3, 4, 5]
)

Работа с JSON

# Вставка JSON данных
json_data = {
    "name": "Иван",
    "age": 30,
    "skills": ["Python", "JavaScript", "SQL"],
    "address": {
        "city": "Москва",
        "country": "Россия"
    }
}

await conn.execute(
    "INSERT INTO users (profile) VALUES ($1)",
    json_data
)

# Запрос JSON данных
profiles = await conn.fetch("SELECT profile FROM users")
for profile in profiles:
    print(profile['profile']['name'])

Работа с массивами

# Вставка массива
await conn.execute(
    "INSERT INTO data_types (array_field) VALUES ($1)",
    [1, 2, 3, 4, 5]
)

# Поиск по массиву
results = await conn.fetch(
    "SELECT * FROM data_types WHERE $1 = ANY(array_field)",
    3
)

Кастомные преобразования типов

# Настройка кодека для custom типов
await conn.set_type_codec(
    'json',
    encoder=json.dumps,
    decoder=json.loads,
    schema='pg_catalog'
)

# Регистрация кастомного enum типа
await conn.execute("""
    CREATE TYPE user_role AS ENUM ('admin', 'user', 'guest')
""")

# Работа с enum
await conn.execute("""
    ALTER TABLE users ADD COLUMN role user_role DEFAULT 'user'
""")

Параметризированные запросы и безопасность

Плейсхолдеры в asyncpg

# asyncpg использует числовые плейсхолдеры $1, $2, $3...
await conn.execute(
    "INSERT INTO users (name, email, age) VALUES ($1, $2, $3)",
    "Иван", "ivan@example.com", 30
)

# Множественные условия
users = await conn.fetch("""
    SELECT * FROM users 
    WHERE age BETWEEN $1 AND $2 
    AND name ILIKE $3
""", 20, 50, "%иван%")

Защита от SQL-инъекций

# БЕЗОПАСНО - использование параметров
user_id = "1; DROP TABLE users; --"  # Попытка инъекции
user = await conn.fetchrow(
    "SELECT * FROM users WHERE id = $1",
    user_id
)

# НЕБЕЗОПАСНО - НЕ делайте так!
# query = f"SELECT * FROM users WHERE id = {user_id}"
# await conn.execute(query)

Динамические запросы

# Безопасное построение динамических запросов
def build_filter_query(filters):
    conditions = []
    params = []
    param_count = 0
    
    for field, value in filters.items():
        param_count += 1
        conditions.append(f"{field} = ${param_count}")
        params.append(value)
    
    where_clause = " AND ".join(conditions) if conditions else "TRUE"
    query = f"SELECT * FROM users WHERE {where_clause}"
    
    return query, params

# Использование
filters = {"name": "Иван", "age": 30}
query, params = build_filter_query(filters)
results = await conn.fetch(query, *params)

Производительность и оптимизация

Бенчмарки производительности

asyncpg показывает превосходную производительность по сравнению с другими драйверами:

  • В 2-3 раза быстрее psycopg2
  • В 1.5-2 раза быстрее aiopg
  • Меньше потребления памяти
  • Лучшая масштабируемость при высоких нагрузках

Оптимизация запросов

# Использование EXPLAIN для анализа запросов
explain_result = await conn.fetch("""
    EXPLAIN (FORMAT JSON) 
    SELECT * FROM users WHERE age > 25
""")

print(explain_result[0]['QUERY PLAN'])

Подготовленные запросы

# Подготовка запроса для многократного использования
stmt = await conn.prepare("""
    SELECT * FROM users WHERE age > $1 AND city = $2
""")

# Многократное выполнение
results1 = await stmt.fetch(25, "Москва")
results2 = await stmt.fetch(30, "Санкт-Петербург")
results3 = await stmt.fetch(35, "Екатеринбург")

Массовые операции

# Эффективная массовая вставка
async def bulk_insert_users(conn, users_data):
    await conn.executemany("""
        INSERT INTO users (name, email, age) VALUES ($1, $2, $3)
    """, users_data)

# Использование COPY для больших объемов данных
async def copy_insert_users(conn, users_data):
    await conn.copy_records_to_table(
        'users',
        records=users_data,
        columns=['name', 'email', 'age']
    )

Интеграция с популярными фреймворками

Интеграция с FastAPI

from fastapi import FastAPI, Depends, HTTPException
from contextlib import asynccontextmanager
import asyncpg

# Глобальный пул соединений
db_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global db_pool
    # Инициализация пула при старте
    db_pool = await asyncpg.create_pool(
        user='postgres',
        password='secret',
        database='test_db',
        host='127.0.0.1',
        min_size=1,
        max_size=10
    )
    yield
    # Закрытие пула при остановке
    await db_pool.close()

app = FastAPI(lifespan=lifespan)

# Dependency для получения соединения
async def get_db():
    async with db_pool.acquire() as conn:
        yield conn

# Эндпоинты
@app.get("/users")
async def get_users(db=Depends(get_db)):
    users = await db.fetch("SELECT * FROM users")
    return [dict(user) for user in users]

@app.post("/users")
async def create_user(user_data: dict, db=Depends(get_db)):
    user_id = await db.fetchval("""
        INSERT INTO users (name, email) 
        VALUES ($1, $2) 
        RETURNING id
    """, user_data['name'], user_data['email'])
    
    return {"id": user_id, **user_data}

@app.get("/users/{user_id}")
async def get_user(user_id: int, db=Depends(get_db)):
    user = await db.fetchrow(
        "SELECT * FROM users WHERE id = $1", user_id
    )
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(user)

Интеграция с Starlette

from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
import asyncpg

# Глобальный пул
db_pool = None

async def startup():
    global db_pool
    db_pool = await asyncpg.create_pool(
        user='postgres',
        password='secret',
        database='test_db'
    )

async def shutdown():
    await db_pool.close()

async def get_users(request):
    async with db_pool.acquire() as conn:
        users = await conn.fetch("SELECT * FROM users")
        return JSONResponse([dict(user) for user in users])

app = Starlette(
    routes=[
        Route('/users', get_users, methods=['GET']),
    ],
    on_startup=[startup],
    on_shutdown=[shutdown]
)

Интеграция с Quart

from quart import Quart, jsonify
import asyncpg

app = Quart(__name__)

@app.before_serving
async def startup():
    app.db_pool = await asyncpg.create_pool(
        user='postgres',
        password='secret',
        database='test_db'
    )

@app.after_serving
async def shutdown():
    await app.db_pool.close()

@app.route('/users')
async def get_users():
    async with app.db_pool.acquire() as conn:
        users = await conn.fetch("SELECT * FROM users")
        return jsonify([dict(user) for user in users])

Интеграция с ORM

Использование с SQLAlchemy 2.0

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Создание асинхронного движка с asyncpg
engine = create_async_engine(
    "postgresql+asyncpg://postgres:secret@localhost/test_db",
    echo=True
)

# Создание сессии
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# Использование
async with async_session() as session:
    result = await session.execute(
        text("SELECT * FROM users WHERE age > :age"),
        {"age": 25}
    )
    users = result.fetchall()

Использование с Tortoise ORM

from tortoise import Tortoise, fields
from tortoise.models import Model

class User(Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=100)
    email = fields.CharField(max_length=255, unique=True)
    
    class Meta:
        table = "users"

# Инициализация с asyncpg
async def init_db():
    await Tortoise.init(
        db_url='asyncpg://postgres:secret@localhost/test_db',
        modules={'models': ['__main__']}
    )
    await Tortoise.generate_schemas()

# Использование
async def main():
    await init_db()
    
    # Создание пользователя
    user = await User.create(name="Иван", email="ivan@example.com")
    
    # Получение пользователей
    users = await User.all()

Обработка ошибок и исключений

Основные типы исключений

import asyncpg

try:
    conn = await asyncpg.connect(
        user='wrong_user',
        password='wrong_password',
        database='test_db'
    )
except asyncpg.InvalidAuthorizationSpecificationError:
    print("Неправильные учетные данные")
except asyncpg.InvalidCatalogNameError:
    print("База данных не найдена")
except asyncpg.ConnectionDoesNotExistError:
    print("Соединение недоступно")
except asyncpg.PostgresError as e:
    print(f"Ошибка PostgreSQL: {e}")

Обработка ошибок в транзакциях

async def safe_transaction(conn):
    async with conn.transaction():
        try:
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Тест", "test@example.com"
            )
            
            # Операция, которая может вызвать ошибку
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Тест2", "test@example.com"  # Дубликат email
            )
            
        except asyncpg.UniqueViolationError:
            print("Пользователь с таким email уже существует")
            raise  # Транзакция будет отменена
        except asyncpg.PostgresError as e:
            print(f"Ошибка базы данных: {e}")
            raise

Retry логика

import asyncio
from typing import Optional

async def execute_with_retry(
    conn, 
    query: str, 
    *args, 
    max_retries: int = 3,
    delay: float = 1.0
) -> Optional[str]:
    for attempt in range(max_retries):
        try:
            return await conn.execute(query, *args)
        except asyncpg.ConnectionDoesNotExistError:
            if attempt < max_retries - 1:
                await asyncio.sleep(delay * (2 ** attempt))
                continue
            raise
        except asyncpg.PostgresError:
            raise

Тестирование приложений с asyncpg

Настройка тестовой базы данных

import pytest
import asyncpg
from unittest.mock import AsyncMock

@pytest.fixture
async def db_connection():
    # Создание соединения с тестовой базой
    conn = await asyncpg.connect(
        user='postgres',
        password='secret',
        database='test_db',
        host='localhost'
    )
    
    # Создание тестовых таблиц
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE
        )
    """)
    
    yield conn
    
    # Очистка после теста
    await conn.execute("DROP TABLE IF EXISTS users")
    await conn.close()

@pytest.fixture
async def db_pool():
    pool = await asyncpg.create_pool(
        user='postgres',
        password='secret',
        database='test_db',
        min_size=1,
        max_size=5
    )
    
    yield pool
    
    await pool.close()

Примеры тестов

@pytest.mark.asyncio
async def test_create_user(db_connection):
    # Вставка пользователя
    await db_connection.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "Тест", "test@example.com"
    )
    
    # Проверка вставки
    user = await db_connection.fetchrow(
        "SELECT * FROM users WHERE email = $1",
        "test@example.com"
    )
    
    assert user is not None
    assert user['name'] == "Тест"
    assert user['email'] == "test@example.com"

@pytest.mark.asyncio
async def test_duplicate_email(db_connection):
    # Первая вставка
    await db_connection.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "Тест1", "test@example.com"
    )
    
    # Попытка вставки дубликата
    with pytest.raises(asyncpg.UniqueViolationError):
        await db_connection.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "Тест2", "test@example.com"
        )

Моки для тестирования

@pytest.fixture
def mock_db_connection():
    mock_conn = AsyncMock()
    mock_conn.fetch.return_value = [
        {'id': 1, 'name': 'Тест', 'email': 'test@example.com'}
    ]
    mock_conn.fetchrow.return_value = {
        'id': 1, 'name': 'Тест', 'email': 'test@example.com'
    }
    mock_conn.execute.return_value = "INSERT 0 1"
    return mock_conn

@pytest.mark.asyncio
async def test_with_mock(mock_db_connection):
    # Тест с использованием мока
    users = await mock_db_connection.fetch("SELECT * FROM users")
    assert len(users) == 1
    assert users[0]['name'] == 'Тест'

Миграции базы данных

Простая система миграций

import asyncpg
import os
from pathlib import Path

class MigrationRunner:
    def __init__(self, db_url: str, migrations_dir: str = "migrations"):
        self.db_url = db_url
        self.migrations_dir = Path(migrations_dir)
    
    async def init_migrations_table(self, conn):
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS migrations (
                id SERIAL PRIMARY KEY,
                filename VARCHAR(255) NOT NULL UNIQUE,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
    
    async def get_applied_migrations(self, conn):
        applied = await conn.fetch(
            "SELECT filename FROM migrations ORDER BY id"
        )
        return {row['filename'] for row in applied}
    
    async def run_migrations(self):
        conn = await asyncpg.connect(self.db_url)
        try:
            await self.init_migrations_table(conn)
            applied = await self.get_applied_migrations(conn)
            
            migration_files = sorted(self.migrations_dir.glob("*.sql"))
            
            for migration_file in migration_files:
                if migration_file.name not in applied:
                    print(f"Применение миграции: {migration_file.name}")
                    
                    with open(migration_file, 'r') as f:
                        sql = f.read()
                    
                    async with conn.transaction():
                        await conn.execute(sql)
                        await conn.execute(
                            "INSERT INTO migrations (filename) VALUES ($1)",
                            migration_file.name
                        )
                    
                    print(f"Миграция {migration_file.name} применена")
        finally:
            await conn.close()

# Использование
runner = MigrationRunner("postgresql://postgres:secret@localhost/test_db")
await runner.run_migrations()

Лучшие практики и рекомендации

Управление соединениями

# Рекомендуемый паттерн для приложений
class Database:
    def __init__(self):
        self.pool = None
    
    async def connect(self):
        self.pool = await asyncpg.create_pool(
            user='postgres',
            password='secret',
            database='test_db',
            min_size=1,
            max_size=10,
            command_timeout=60
        )
    
    async def disconnect(self):
        if self.pool:
            await self.pool.close()
    
    async def execute(self, query, *args):
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)
    
    async def fetch(self, query, *args):
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)
    
    async def fetchrow(self, query, *args):
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

# Использование
db = Database()
await db.connect()

# В приложении
users = await db.fetch("SELECT * FROM users")

Логирование запросов

import logging
from functools import wraps

logger = logging.getLogger(__name__)

def log_query(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        query = args[1] if len(args) > 1 else kwargs.get('query', '')
        params = args[2:] if len(args) > 2 else []
        
        logger.info(f"Executing query: {query}")
        logger.debug(f"Parameters: {params}")
        
        try:
            result = await func(*args, **kwargs)
            logger.info("Query executed successfully")
            return result
        except Exception as e:
            logger.error(f"Query failed: {e}")
            raise
    
    return wrapper

# Применение к методам
class DatabaseWithLogging:
    def __init__(self, pool):
        self.pool = pool
    
    @log_query
    async def fetch(self, query, *args):
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

Конфигурация для продакшена

import os
from urllib.parse import urlparse

def get_db_config():
    db_url = os.getenv('DATABASE_URL')
    if db_url:
        parsed = urlparse(db_url)
        return {
            'user': parsed.username,
            'password': parsed.password,
            'database': parsed.path[1:],  # Убираем '/' в начале
            'host': parsed.hostname,
            'port': parsed.port or 5432,
        }
    
    return {
        'user': os.getenv('DB_USER', 'postgres'),
        'password': os.getenv('DB_PASSWORD', ''),
        'database': os.getenv('DB_NAME', 'postgres'),
        'host': os.getenv('DB_HOST', 'localhost'),
        'port': int(os.getenv('DB_PORT', 5432)),
    }

# Создание пула для продакшена
async def create_production_pool():
    config = get_db_config()
    
    return await asyncpg.create_pool(
        **config,
        min_size=5,
        max_size=20,
        max_queries=50000,
        max_inactive_connection_lifetime=300,
        timeout=30,
        command_timeout=300,
        server_settings={
            'application_name': 'MyApp',
            'jit': 'off'  # Отключение JIT для стабильности
        }
    )

Сравнение с другими драйверами PostgreSQL

Детальное сравнение

Характеристика asyncpg psycopg2 aiopg SQLAlchemy
Асинхронность Полная Нет Полная Опциональная
Производительность Очень высокая Средняя Средняя Средняя
Размер библиотеки Компактная Средняя Средняя Большая
Простота использования Высокая Высокая Средняя Низкая
Поддержка типов PG Полная Полная Полная Полная
ORM возможности Нет Нет Нет Есть
Сообщество Активное Очень активное Небольшое Очень активное
Документация Хорошая Отличная Средняя Отличная

Примеры кода для сравнения

# asyncpg
conn = await asyncpg.connect(dsn)
users = await conn.fetch("SELECT * FROM users")

# psycopg2 (синхронный)
conn = psycopg2.connect(dsn)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()

# aiopg
conn = await aiopg.connect(dsn)
cursor = await conn.cursor()
await cursor.execute("SELECT * FROM users")
users = await cursor.fetchall()

Часто задаваемые вопросы

Что такое asyncpg и зачем он нужен?

asyncpg — это высокопроизводительный асинхронный драйвер для PostgreSQL, написанный специально для Python. Он обеспечивает быстрое и эффективное взаимодействие с базой данных без блокировки event loop, что критично для современных асинхронных приложений.

Почему asyncpg быстрее других драйверов?

asyncpg написан с использованием Cython и оптимизирован для работы с протоколом PostgreSQL. Он не содержит лишних абстракций и напрямую работает с бинарным протоколом базы данных, что обеспечивает максимальную производительность.

Можно ли использовать asyncpg в синхронном коде?

Нет, asyncpg предназначен исключительно для асинхронного кода. Для синхронных приложений следует использовать psycopg2 или psycopg3.

Как правильно обрабатывать ошибки подключения?

Используйте try/except блоки для обработки специфических исключений asyncpg. Также рекомендуется реализовать retry логику для временных сбоев сети.

Безопасен ли asyncpg для использования в продакшене?

Да, asyncpg активно используется в продакшене многими компаниями. Библиотека стабильна, хорошо протестирована и регулярно обновляется.

Как оптимизировать производительность при работе с большими объемами данных?

Используйте пул соединений, подготовленные запросы, массовые операции (executemany, copy_to_table) и настройте параметры пула в соответствии с вашей нагрузкой.

Поддерживает ли asyncpg все возможности PostgreSQL?

Да, asyncpg поддерживает все основные возможности PostgreSQL, включая транзакции, триггеры, функции, JSON, UUID, массивы и пользовательские типы данных.

Как интегрировать asyncpg с существующим приложением?

Начните с создания пула соединений при запуске приложения, замените синхронные запросы на асинхронные и обновите обработчики для работы с async/await.

Есть ли готовые решения для миграций с asyncpg?

asyncpg не включает встроенную систему миграций, но можно использовать Alembic с SQLAlchemy или написать собственную простую систему миграций.

Как тестировать код, использующий asyncpg?

Используйте pytest-asyncio для асинхронных тестов, создавайте отдельную тестовую базу данных и применяйте фикстуры для настройки и очистки тестовых данных.

Заключение

asyncpg представляет собой мощный и эффективный инструмент для работы с PostgreSQL в асинхронных Python-приложениях. Его высокая производительность, полная поддержка возможностей PostgreSQL и простота использования делают его идеальным выбором для современных веб-приложений, API и микросервисов.

Основные преимущества asyncpg:

  • Превосходная производительность благодаря оптимизированной архитектуре
  • Полная асинхронность без блокировки event loop
  • Простая интеграция с популярными фреймворками
  • Встроенная поддержка пула соединений
  • Безопасность от SQL-инъекций
  • Активное сообщество и регулярные обновления

asyncpg является оптимальным решением для разработчиков, которым необходима высокая производительность, надежность и простота использования при работе с PostgreSQL в асинхронной среде. Библиотека продолжает развиваться и остается одним из лучших выборов для создания масштабируемых и эффективных приложений на Python.

Новости