Pymongo – драйвер MongoDB

онлайн тренажер по питону
Онлайн-тренажер Python для начинающих

Изучайте Python легко и без перегрузки теорией. Решайте практические задачи с автоматической проверкой, получайте подсказки на русском языке и пишите код прямо в браузере — без необходимости что-либо устанавливать.

Начать курс

Введение

В современном мире разработки программного обеспечения NoSQL-базы данных стали неотъемлемой частью технологического стека. MongoDB, как одна из ведущих документно-ориентированных СУБД, предоставляет разработчикам мощные инструменты для работы с неструктурированными данными. PyMongo служит официальным Python-драйвером для MongoDB, обеспечивая полную интеграцию между Python-приложениями и MongoDB.

Что такое PyMongo

PyMongo представляет собой официальную библиотеку Python для взаимодействия с MongoDB. Это высокопроизводительный драйвер, который предоставляет прямой доступ ко всем возможностям MongoDB, включая современные функции, такие как транзакции, изменение потоков и временные коллекции.

Ключевые особенности PyMongo

Совместимость и поддержка версий

  • Поддержка MongoDB версий 3.6, 4.0, 4.2, 4.4, 5.0, 6.0 и 7.0
  • Совместимость с Python 3.7+
  • Регулярные обновления и исправления безопасности

Функциональные возможности

  • Полная поддержка BSON (Binary JSON) формата
  • Встроенная поддержка GridFS для хранения больших файлов
  • Автоматическое переподключение при сбоях
  • Поддержка SSL/TLS шифрования
  • Мониторинг производительности и профилирование

Архитектурные преимущества

  • Thread-safe операции
  • Пулы соединений
  • Автоматическое обнаружение топологии кластера
  • Поддержка реплик и шардинга

Установка и настройка PyMongo

Базовая установка

pip install pymongo

Установка с дополнительными возможностями

# Для работы с шифрованием
pip install pymongo[encryption]

# Для работы с Kerberos
pip install pymongo[gssapi]

# Для асинхронной работы
pip install motor

Проверка установки

import pymongo
print(pymongo.version)

Подключение к MongoDB

Основные способы подключения

Локальное подключение

from pymongo import MongoClient

# Простое подключение
client = MongoClient()
# или
client = MongoClient('localhost', 27017)
# или
client = MongoClient('mongodb://localhost:27017/')

Подключение с аутентификацией

client = MongoClient('mongodb://username:password@localhost:27017/')

Подключение к MongoDB Atlas

client = MongoClient('mongodb+srv://username:password@cluster.mongodb.net/')

Подключение с SSL

client = MongoClient('mongodb://localhost:27017/',
                    ssl=True,
                    ssl_cert_reqs=ssl.CERT_NONE)

Параметры подключения

client = MongoClient(
    host='localhost',
    port=27017,
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=30000,
    connectTimeoutMS=5000,
    socketTimeoutMS=5000,
    serverSelectionTimeoutMS=5000,
    retryWrites=True,
    retryReads=True
)

Основные компоненты PyMongo

Работа с базами данных

# Получение базы данных
db = client['mydatabase']
# или
db = client.mydatabase

# Список всех баз данных
databases = client.list_database_names()

# Удаление базы данных
client.drop_database('mydatabase')

# Получение статистики базы данных
stats = db.command('dbstats')

Работа с коллекциями

# Получение коллекции
collection = db['mycollection']
# или
collection = db.mycollection

# Создание коллекции с опциями
db.create_collection('mycollection', 
                    capped=True, 
                    size=1000000,
                    max=5000)

# Список коллекций
collections = db.list_collection_names()

# Удаление коллекции
db.drop_collection('mycollection')

Полная таблица методов и функций PyMongo

Методы MongoClient

Метод Описание Пример использования
close() Закрывает соединение client.close()
list_database_names() Список баз данных client.list_database_names()
drop_database() Удаляет базу данных client.drop_database('mydb')
start_session() Начинает сессию client.start_session()
server_info() Информация о сервере client.server_info()

Методы Database

Метод Описание Пример использования
list_collection_names() Список коллекций db.list_collection_names()
create_collection() Создает коллекцию db.create_collection('users')
drop_collection() Удаляет коллекцию db.drop_collection('users')
command() Выполняет команду db.command('ping')
with_options() Создает копию с опциями db.with_options(codec_options=opts)

Методы Collection - CRUD операции

Метод Описание Пример использования
insert_one() Вставляет один документ collection.insert_one({'name': 'John'})
insert_many() Вставляет несколько документов collection.insert_many([{}, {}])
find_one() Находит один документ collection.find_one({'name': 'John'})
find() Находит документы collection.find({'age': {'$gt': 18}})
find_one_and_update() Находит и обновляет collection.find_one_and_update(filter, update)
find_one_and_replace() Находит и заменяет collection.find_one_and_replace(filter, replacement)
find_one_and_delete() Находит и удаляет collection.find_one_and_delete(filter)
update_one() Обновляет один документ collection.update_one(filter, update)
update_many() Обновляет несколько документов collection.update_many(filter, update)
replace_one() Заменяет один документ collection.replace_one(filter, replacement)
delete_one() Удаляет один документ collection.delete_one(filter)
delete_many() Удаляет несколько документов collection.delete_many(filter)

Методы Collection - Индексы и агрегация

Метод Описание Пример использования
create_index() Создает индекс collection.create_index('name')
create_indexes() Создает несколько индексов collection.create_indexes([IndexModel(...)])
drop_index() Удаляет индекс collection.drop_index('name_1')
drop_indexes() Удаляет все индексы collection.drop_indexes()
list_indexes() Список индексов collection.list_indexes()
index_information() Информация об индексах collection.index_information()
aggregate() Выполняет агрегацию collection.aggregate(pipeline)
count_documents() Подсчитывает документы collection.count_documents(filter)
estimated_document_count() Приблизительный подсчет collection.estimated_document_count()
distinct() Уникальные значения collection.distinct('field')

Методы Collection - Дополнительные операции

Метод Описание Пример использования
bulk_write() Массовые операции collection.bulk_write(operations)
watch() Отслеживает изменения collection.watch(pipeline)
rename() Переименовывает коллекцию collection.rename('new_name')
options() Опции коллекции collection.options()
with_options() Создает копию с опциями collection.with_options(codec_options=opts)

Методы Cursor

Метод Описание Пример использования
sort() Сортирует результаты cursor.sort('name', 1)
limit() Ограничивает количество cursor.limit(10)
skip() Пропускает документы cursor.skip(5)
hint() Указывает индекс cursor.hint('name_1')
explain() Объясняет план выполнения cursor.explain()
count() Подсчитывает результаты cursor.count()
close() Закрывает курсор cursor.close()

Детальное руководство по CRUD операциям

Создание (Create)

Вставка одного документа

from datetime import datetime
from bson import ObjectId

# Простая вставка
result = collection.insert_one({
    'name': 'John Doe',
    'age': 30,
    'email': 'john@example.com',
    'created_at': datetime.now()
})

print(f"Inserted document ID: {result.inserted_id}")

Вставка нескольких документов

documents = [
    {'name': 'Alice', 'age': 25, 'city': 'New York'},
    {'name': 'Bob', 'age': 30, 'city': 'Los Angeles'},
    {'name': 'Charlie', 'age': 35, 'city': 'Chicago'}
]

result = collection.insert_many(documents)
print(f"Inserted {len(result.inserted_ids)} documents")

Чтение (Read)

Базовый поиск

# Найти один документ
user = collection.find_one({'name': 'John Doe'})

# Найти все документы
for doc in collection.find():
    print(doc)

# Поиск с условиями
adults = collection.find({'age': {'$gte': 18}})

Проекция полей

# Показать только имя и возраст
users = collection.find({}, {'name': 1, 'age': 1, '_id': 0})

# Исключить определенные поля
users = collection.find({}, {'password': 0, 'internal_id': 0})

Сложные запросы

# Логические операторы
query = {
    '$and': [
        {'age': {'$gte': 18}},
        {'city': {'$in': ['New York', 'Los Angeles']}}
    ]
}
results = collection.find(query)

# Текстовый поиск
collection.create_index([('name', 'text')])
results = collection.find({'$text': {'$search': 'John'}})

Обновление (Update)

Обновление одного документа

# Использование $set
collection.update_one(
    {'name': 'John Doe'},
    {'$set': {'age': 31, 'last_modified': datetime.now()}}
)

# Увеличение значения
collection.update_one(
    {'name': 'John Doe'},
    {'$inc': {'age': 1}}
)

# Добавление элемента в массив
collection.update_one(
    {'name': 'John Doe'},
    {'$push': {'hobbies': 'reading'}}
)

Обновление нескольких документов

# Обновить всех пользователей старше 30
collection.update_many(
    {'age': {'$gt': 30}},
    {'$set': {'category': 'senior'}}
)

Upsert операции

# Создать документ если не существует
collection.update_one(
    {'username': 'newuser'},
    {'$set': {'email': 'new@example.com'}},
    upsert=True
)

Удаление (Delete)

Удаление одного документа

result = collection.delete_one({'name': 'John Doe'})
print(f"Deleted {result.deleted_count} document")

Удаление нескольких документов

result = collection.delete_many({'age': {'$lt': 18}})
print(f"Deleted {result.deleted_count} documents")

Работа с индексами

Создание индексов

Простые индексы

# Одиночный индекс
collection.create_index('name')

# Составной индекс
collection.create_index([('name', 1), ('age', -1)])

# Уникальный индекс
collection.create_index('email', unique=True)

Специальные типы индексов

# Текстовый индекс
collection.create_index([('title', 'text'), ('content', 'text')])

# Геопространственный индекс
collection.create_index([('location', '2dsphere')])

# Частичный индекс
collection.create_index(
    'email',
    partialFilterExpression={'email': {'$exists': True}}
)

# TTL индекс (Time To Live)
collection.create_index(
    'expireAt',
    expireAfterSeconds=3600
)

Управление индексами

# Список всех индексов
indexes = list(collection.list_indexes())

# Информация об индексах
index_info = collection.index_information()

# Удаление индекса
collection.drop_index('name_1')

# Удаление всех индексов (кроме _id)
collection.drop_indexes()

Агрегация данных

Основы агрегации

# Простая агрегация
pipeline = [
    {'$match': {'age': {'$gte': 18}}},
    {'$group': {'_id': '$city', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]

results = collection.aggregate(pipeline)
for result in results:
    print(result)

Сложные агрегации

# Сложная агрегация с несколькими этапами
pipeline = [
    # Фильтрация
    {'$match': {'status': 'active'}},
    
    # Добавление вычисляемых полей
    {'$addFields': {
        'age_group': {
            '$cond': {
                'if': {'$gte': ['$age', 30]},
                'then': 'adult',
                'else': 'young'
            }
        }
    }},
    
    # Группировка
    {'$group': {
        '_id': '$age_group',
        'count': {'$sum': 1},
        'avg_age': {'$avg': '$age'},
        'max_age': {'$max': '$age'}
    }},
    
    # Сортировка
    {'$sort': {'count': -1}},
    
    # Ограничение результатов
    {'$limit': 10}
]

results = collection.aggregate(pipeline)

Агрегация с lookup

# Соединение коллекций
pipeline = [
    {'$lookup': {
        'from': 'orders',
        'localField': '_id',
        'foreignField': 'user_id',
        'as': 'user_orders'
    }},
    {'$match': {'user_orders': {'$ne': []}}},
    {'$addFields': {
        'total_orders': {'$size': '$user_orders'}
    }}
]

results = collection.aggregate(pipeline)

Транзакции

Основы транзакций

# Простая транзакция
with client.start_session() as session:
    with session.start_transaction():
        try:
            # Операции в транзакции
            collection.insert_one(
                {'name': 'Test User', 'balance': 1000},
                session=session
            )
            
            another_collection.update_one(
                {'_id': some_id},
                {'$inc': {'total': 1000}},
                session=session
            )
            
            # Транзакция автоматически коммитится
        except Exception as e:
            # Транзакция автоматически откатывается
            print(f"Transaction failed: {e}")

Ручное управление транзакциями

session = client.start_session()

try:
    session.start_transaction()
    
    # Операции
    collection.insert_one({'test': 'data'}, session=session)
    
    # Ручной коммит
    session.commit_transaction()
    
except Exception as e:
    # Ручной откат
    session.abort_transaction()
    print(f"Error: {e}")
    
finally:
    session.end_session()

Асинхронная работа с Motor

Установка и подключение

import motor.motor_asyncio
import asyncio

# Асинхронное подключение
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
db = client.test_database
collection = db.test_collection

Асинхронные операции

async def async_operations():
    # Вставка
    result = await collection.insert_one({'name': 'Async User'})
    
    # Поиск
    document = await collection.find_one({'name': 'Async User'})
    
    # Обновление
    await collection.update_one(
        {'name': 'Async User'},
        {'$set': {'status': 'updated'}}
    )
    
    # Асинхронная итерация
    async for doc in collection.find():
        print(doc)
    
    # Агрегация
    pipeline = [{'$match': {'status': 'active'}}]
    async for doc in collection.aggregate(pipeline):
        print(doc)

# Запуск асинхронных операций
asyncio.run(async_operations())

Работа с GridFS

Основы GridFS

import gridfs

# Создание GridFS
fs = gridfs.GridFS(db)

# Сохранение файла
with open('image.jpg', 'rb') as f:
    file_id = fs.put(f, filename='image.jpg', metadata={'type': 'image'})

# Чтение файла
file_data = fs.get(file_id)
with open('downloaded_image.jpg', 'wb') as f:
    f.write(file_data.read())

# Поиск файлов
for file in fs.find({'metadata.type': 'image'}):
    print(f"File: {file.filename}, ID: {file._id}")

GridFS с дополнительными возможностями

# Создание GridFS с кастомными коллекциями
fs = gridfs.GridFS(db, collection='custom_files')

# Сохранение с метаданными
file_id = fs.put(
    b'file content',
    filename='test.txt',
    content_type='text/plain',
    metadata={
        'author': 'John Doe',
        'created': datetime.now(),
        'tags': ['important', 'document']
    }
)

# Удаление файла
fs.delete(file_id)

Мониторинг и отслеживание изменений

Change Streams

# Отслеживание изменений в коллекции
change_stream = collection.watch()

for change in change_stream:
    print(f"Change detected: {change}")

# Отслеживание определенных операций
pipeline = [{'$match': {'operationType': 'insert'}}]
change_stream = collection.watch(pipeline)

for change in change_stream:
    print(f"New document inserted: {change['fullDocument']}")

Мониторинг производительности

# Включение профилирования
db.set_profiling_level(2)  # Профилировать все операции

# Просмотр медленных операций
slow_queries = db.system.profile.find().sort('ts', -1).limit(5)
for query in slow_queries:
    print(f"Duration: {query['millis']}ms, Query: {query['command']}")

# Отключение профилирования
db.set_profiling_level(0)

Оптимизация производительности

Лучшие практики для запросов

Использование индексов

# Создание индекса перед выполнением запросов
collection.create_index([('status', 1), ('created_at', -1)])

# Запрос будет использовать индекс
results = collection.find({'status': 'active'}).sort('created_at', -1)

# Проверка использования индекса
explain = collection.find({'status': 'active'}).explain()
print(explain['executionStats'])

Оптимизация проекций

# Загружать только нужные поля
users = collection.find(
    {'age': {'$gte': 18}},
    {'name': 1, 'email': 1, '_id': 0}
)

# Избегать загрузки больших полей
users = collection.find({}, {'large_field': 0})

Пагинация

def paginate_results(collection, query, page_size=10, page_num=1):
    skip = (page_num - 1) * page_size
    return collection.find(query).skip(skip).limit(page_size)

# Использование
page_1 = paginate_results(collection, {'status': 'active'}, 10, 1)

Оптимизация подключений

# Настройка пула соединений
client = MongoClient(
    'mongodb://localhost:27017/',
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=30000,
    waitQueueTimeoutMS=5000
)

# Использование read preferences
from pymongo import ReadPreference

collection_secondary = collection.with_options(
    read_preference=ReadPreference.SECONDARY
)

Безопасность

Аутентификация и авторизация

# Подключение с аутентификацией
client = MongoClient(
    'mongodb://username:password@localhost:27017/',
    authSource='admin'
)

# Использование различных механизмов аутентификации
client = MongoClient(
    'mongodb://localhost:27017/',
    username='user',
    password='pass',
    authSource='admin',
    authMechanism='SCRAM-SHA-256'
)

SSL/TLS подключения

import ssl

client = MongoClient(
    'mongodb://localhost:27017/',
    ssl=True,
    ssl_cert_reqs=ssl.CERT_REQUIRED,
    ssl_ca_certs='path/to/ca.pem',
    ssl_certfile='path/to/client.pem'
)

Обработка ошибок

Типичные исключения

from pymongo.errors import (
    ConnectionFailure,
    ServerSelectionTimeoutError,
    DuplicateKeyError,
    WriteError,
    BulkWriteError
)

try:
    collection.insert_one({'email': 'duplicate@example.com'})
except DuplicateKeyError as e:
    print(f"Duplicate key error: {e}")
    
except ConnectionFailure as e:
    print(f"Connection failed: {e}")
    
except ServerSelectionTimeoutError as e:
    print(f"Server selection timeout: {e}")

Retry логика

import time
from pymongo.errors import AutoReconnect

def retry_operation(func, max_retries=3, delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except AutoReconnect as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(delay * (2 ** attempt))
    
# Использование
result = retry_operation(
    lambda: collection.find_one({'_id': some_id})
)

Практические примеры использования

Веб-приложение с Flask

from flask import Flask, request, jsonify
from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime

app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client.blog_db
posts = db.posts

@app.route('/posts', methods=['POST'])
def create_post():
    data = request.json
    post = {
        'title': data['title'],
        'content': data['content'],
        'author': data['author'],
        'created_at': datetime.now(),
        'tags': data.get('tags', []),
        'views': 0
    }
    
    result = posts.insert_one(post)
    return jsonify({'id': str(result.inserted_id)})

@app.route('/posts/<post_id>', methods=['GET'])
def get_post(post_id):
    post = posts.find_one({'_id': ObjectId(post_id)})
    if post:
        post['_id'] = str(post['_id'])
        # Увеличиваем счетчик просмотров
        posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$inc': {'views': 1}}
        )
        return jsonify(post)
    return jsonify({'error': 'Post not found'}), 404

@app.route('/posts', methods=['GET'])
def get_posts():
    page = int(request.args.get('page', 1))
    per_page = int(request.args.get('per_page', 10))
    
    posts_cursor = posts.find().sort('created_at', -1)
    posts_cursor = posts_cursor.skip((page - 1) * per_page).limit(per_page)
    
    posts_list = []
    for post in posts_cursor:
        post['_id'] = str(post['_id'])
        posts_list.append(post)
    
    return jsonify(posts_list)

Система логирования

import logging
from pymongo import MongoClient
from datetime import datetime

class MongoHandler(logging.Handler):
    def __init__(self, connection_string, db_name, collection_name):
        super().__init__()
        self.client = MongoClient(connection_string)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        
        # Создаем TTL индекс для автоматического удаления старых логов
        self.collection.create_index(
            'timestamp',
            expireAfterSeconds=30*24*60*60  # 30 дней
        )
    
    def emit(self, record):
        log_entry = {
            'timestamp': datetime.now(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
            'thread': record.thread,
            'process': record.process
        }
        
        if record.exc_info:
            log_entry['exception'] = self.format(record)
        
        self.collection.insert_one(log_entry)

# Использование
logger = logging.getLogger('my_app')
mongo_handler = MongoHandler(
    'mongodb://localhost:27017/',
    'logs_db',
    'app_logs'
)
logger.addHandler(mongo_handler)
logger.setLevel(logging.INFO)

logger.info('Application started')
logger.error('An error occurred')

Система кэширования

from pymongo import MongoClient
from datetime import datetime, timedelta
import json
import hashlib

class MongoCache:
    def __init__(self, connection_string, db_name, collection_name='cache'):
        self.client = MongoClient(connection_string)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        
        # Создаем TTL индекс для автоматического удаления истекших записей
        self.collection.create_index('expires_at', expireAfterSeconds=0)
    
    def _generate_key(self, key):
        """Генерирует хеш для ключа"""
        return hashlib.md5(str(key).encode()).hexdigest()
    
    def set(self, key, value, ttl=3600):
        """Сохраняет значение с TTL"""
        hashed_key = self._generate_key(key)
        expires_at = datetime.now() + timedelta(seconds=ttl)
        
        self.collection.replace_one(
            {'_id': hashed_key},
            {
                '_id': hashed_key,
                'value': value,
                'expires_at': expires_at,
                'created_at': datetime.now()
            },
            upsert=True
        )
    
    def get(self, key):
        """Получает значение по ключу"""
        hashed_key = self._generate_key(key)
        doc = self.collection.find_one({'_id': hashed_key})
        
        if doc and doc['expires_at'] > datetime.now():
            return doc['value']
        return None
    
    def delete(self, key):
        """Удаляет значение по ключу"""
        hashed_key = self._generate_key(key)
        self.collection.delete_one({'_id': hashed_key})
    
    def clear(self):
        """Очищает весь кэш"""
        self.collection.delete_many({})

# Использование
cache = MongoCache('mongodb://localhost:27017/', 'cache_db')

# Сохранение в кэш
cache.set('user:123', {'name': 'John', 'age': 30}, ttl=1800)

# Получение из кэша
user_data = cache.get('user:123')
if user_data:
    print(f"Cached data: {user_data}")
else:
    print("Cache miss")

Часто задаваемые вопросы

Как правильно закрывать соединение с MongoDB?

# Правильный способ закрытия соединения
client = MongoClient('mongodb://localhost:27017/')
try:
    # Выполнение операций
    db = client.test_db
    collection = db.test_collection
    collection.insert_one({'test': 'data'})
finally:
    client.close()

# Или использование контекстного менеджера
with MongoClient('mongodb://localhost:27017/') as client:
    db = client.test_db
    collection = db.test_collection
    collection.insert_one({'test': 'data'})

Как обработать большие результаты запросов?

# Использование batch_size для больших результатов
cursor = collection.find().batch_size(100)
for document in cursor:
    process_document(document)

# Или использование limit и skip для пагинации
def process_large_collection(collection, batch_size=1000):
    skip = 0
    while True:
        batch = list(collection.find().skip(skip).limit(batch_size))
        if not batch:
            break
        
        for document in batch:
            process_document(document)
        
        skip += batch_size

Как оптимизировать запросы с сортировкой?

# Создание индекса для сортировки
collection.create_index([('created_at', -1)])

# Использование hint для принудительного использования индекса
results = collection.find().sort('created_at', -1).hint('created_at_-1')

# Комбинированный индекс для фильтрации и сортировки
collection.create_index([('status', 1), ('created_at', -1)])
results = collection.find({'status': 'active'}).sort('created_at', -1)

Как работать с большими файлами в MongoDB?

import gridfs
from pymongo import MongoClient

client = MongoClient()
db = client.test_db
fs = gridfs.GridFS(db)

# Сохранение большого файла
with open('large_file.pdf', 'rb') as f:
    file_id = fs.put(f, filename='large_file.pdf')

# Чтение файла по частям
file_obj = fs.get(file_id)
with open('downloaded_file.pdf', 'wb') as f:
    while True:
        chunk = file_obj.read(1024)  # Читаем по 1KB
        if not chunk:
            break
        f.write(chunk)

Как реализовать полнотекстовый поиск?

# Создание текстового индекса
collection.create_index([
    ('title', 'text'),
    ('content', 'text')
])

# Поиск по тексту
results = collection.find({
    '$text': {
        '$search': 'python mongodb',
        '$language': 'russian'
    }
})

# Поиск с весами полей
collection.create_index([
    ('title', 'text'),
    ('content', 'text')
], weights={'title': 10, 'content': 1})

Как реализовать аудит изменений данных?

from datetime import datetime

class AuditMixin:
    def __init__(self, collection):
        self.collection = collection
        self.audit_collection = collection.database[f"{collection.name}_audit"]
    
    def insert_one_with_audit(self, document, user_id=None):
        document['created_at'] = datetime.now()
        document['created_by'] = user_id
        
        result = self.collection.insert_one(document)
        
        # Запись в аудит
        self.audit_collection.insert_one({
            'document_id': result.inserted_id,
            'operation': 'insert',
            'user_id': user_id,
            'timestamp': datetime.now(),
            'data': document
        })
        
        return result
    
    def update_one_with_audit(self, filter_query, update, user_id=None):
        # Получаем исходный документ
        original = self.collection.find_one(filter_query)
        
        # Обновляем документ
        result = self.collection.update_one(filter_query, update)
        
        if result.modified_count > 0:
            # Записываем в аудит
            self.audit_collection.insert_one({
                'document_id': original['_id'],
                'operation': 'update',
                'user_id': user_id,
                'timestamp': datetime.now(),
                'original_data': original,
                'update_query': update
            })
        
        return result

Заключение

PyMongo представляет собой мощный и гибкий инструмент для работы с MongoDB в Python-приложениях. Библиотека предоставляет полный доступ к функциональности MongoDB, включая современные возможности, такие как транзакции, change streams и временные коллекции.

Ключевые преимущества PyMongo включают высокую производительность, простоту использования, полную совместимость с MongoDB и активную поддержку сообществом. Правильное использование индексов, оптимизация запросов и применение лучших практик позволяют создавать высокопроизводительные приложения, способные обрабатывать большие объемы данных.

При выборе между PyMongo и альтернативными решениями, такими как MongoEngine или Motor, следует учитывать специфику проекта: PyMongo идеально подходит для приложений, требующих прямого контроля над операциями с базой данных, в то время как ORM-решения могут быть предпочтительными для проектов с четко определенными схемами данных.

Регулярное обновление библиотеки, мониторинг производительности и следование рекомендациям по безопасности помогут максимально эффективно использовать возможности PyMongo в ваших проектах.

Новости