Introduction
In today’s software development landscape, NoSQL databases have become an integral part of the technology stack. MongoDB, as one of the leading document‑oriented DBMSs, provides developers with powerful tools for handling unstructured data. PyMongo serves as the official Python driver for MongoDB, delivering full integration between Python applications and MongoDB.
What Is PyMongo
PyMongo is the official Python library for interacting with MongoDB. It is a high‑performance driver that offers direct access to all MongoDB features, including modern capabilities such as transactions, change streams, and time series collections.
Key Features of PyMongo
Compatibility and Version Support
- Supports MongoDB versions 3.6, 4.0, 4.2, 4.4, 5.0, 6.0 and 7.0
- Compatible with Python 3.7+
- Regular updates and security patches
Functional Capabilities
- Full BSON (Binary JSON) support
- Built‑in GridFS support for large file storage
- Automatic reconnection on failures
- SSL/TLS encryption support
- Performance monitoring and profiling
Architectural Advantages
- Thread‑safe operations
- Connection pooling
- Automatic cluster topology discovery
- Support for replication and sharding
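Because MongoClient is thread‑safe and pools connections, the usual pattern is one client per process, shared by all worker threads, rather than one client per request. A minimal sketch (the database, collection, and function names are illustrative, not part of the PyMongo API):

```python
from concurrent.futures import ThreadPoolExecutor

def insert_from_worker(client, n):
    # Each worker thread checks a connection out of the shared pool;
    # MongoClient itself is safe to share between threads.
    client.mydb.events.insert_one({'worker': n})

def run_workers(client, workers=4):
    # One MongoClient per process, reused by every thread
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for n in range(workers):
            pool.submit(insert_from_worker, client, n)
```

Creating a fresh MongoClient inside each thread would defeat the pooling described above.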
Installing and Configuring PyMongo
Basic Installation
pip install pymongo
Installation with Optional Features
# For encryption support
pip install pymongo[encryption]
# For Kerberos (GSSAPI) support
pip install pymongo[gssapi]
# For asynchronous usage
pip install motor
Verify Installation
import pymongo
print(pymongo.version)
Connecting to MongoDB
Common Connection Methods
Local Connection
from pymongo import MongoClient
# Simple connection
client = MongoClient()
# or
client = MongoClient('localhost', 27017)
# or
client = MongoClient('mongodb://localhost:27017/')
Authenticated Connection
client = MongoClient('mongodb://username:password@localhost:27017/')
MongoDB Atlas Connection
client = MongoClient('mongodb+srv://username:password@cluster.mongodb.net/')
SSL/TLS Connection
# The legacy ssl_* keyword arguments were removed in PyMongo 4.x;
# use the tls* options instead
client = MongoClient('mongodb://localhost:27017/',
                     tls=True,
                     tlsAllowInvalidCertificates=True)  # disables certificate checks: for testing only
Connection Parameters
client = MongoClient(
    host='localhost',
    port=27017,
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=30000,
    connectTimeoutMS=5000,
    socketTimeoutMS=5000,
    serverSelectionTimeoutMS=5000,
    retryWrites=True,
    retryReads=True
)
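MongoClient does not connect eagerly, so a bad host or an exhausted serverSelectionTimeoutMS only surfaces on the first operation. A small helper (the function name is ours) that issues the standard ping command to confirm the server is reachable:

```python
def check_connection(client):
    # The 'ping' admin command is cheap and forces server selection,
    # so it fails fast if no server answers within serverSelectionTimeoutMS
    try:
        client.admin.command('ping')
        return True
    except Exception:  # ServerSelectionTimeoutError in practice
        return False
```

Calling this once at startup turns a silent misconfiguration into an immediate, visible failure.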
Core PyMongo Components
Working with Databases
# Get a database
db = client['mydatabase']
# or
db = client.mydatabase
# List all databases
databases = client.list_database_names()
# Drop a database
client.drop_database('mydatabase')
# Get database statistics
stats = db.command('dbstats')
Working with Collections
# Get a collection
collection = db['mycollection']
# or
collection = db.mycollection
# Create a collection with options
db.create_collection('mycollection',
                     capped=True,
                     size=1000000,
                     max=5000)
# List collections
collections = db.list_collection_names()
# Drop a collection
db.drop_collection('mycollection')
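create_collection also accepts a timeseries option for the time series collections introduced in MongoDB 5.0. A sketch with illustrative collection and field names:

```python
def create_readings_collection(db):
    # Requires MongoDB 5.0+; 'ts' and 'sensor_id' are example field names
    return db.create_collection(
        'sensor_readings',
        timeseries={
            'timeField': 'ts',         # required: the timestamp field
            'metaField': 'sensor_id',  # optional: per-series metadata
            'granularity': 'minutes'
        },
        expireAfterSeconds=86400       # optional automatic expiry
    )
```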
Comprehensive Table of PyMongo Methods and Functions
MongoClient Methods
| Method | Description | Example |
|---|---|---|
| close() | Closes the connection | client.close() |
| list_database_names() | Returns a list of database names | client.list_database_names() |
| drop_database() | Deletes a database | client.drop_database('mydb') |
| start_session() | Starts a client session | client.start_session() |
| server_info() | Retrieves server information | client.server_info() |
Database Methods
| Method | Description | Example |
|---|---|---|
| list_collection_names() | Returns a list of collection names | db.list_collection_names() |
| create_collection() | Creates a new collection | db.create_collection('users') |
| drop_collection() | Deletes a collection | db.drop_collection('users') |
| command() | Executes a database command | db.command('ping') |
| with_options() | Creates a copy with different options | db.with_options(codec_options=opts) |
Collection Methods – CRUD Operations
| Method | Description | Example |
|---|---|---|
| insert_one() | Inserts a single document | collection.insert_one({'name': 'John'}) |
| insert_many() | Inserts multiple documents | collection.insert_many([{}, {}]) |
| find_one() | Finds a single document | collection.find_one({'name': 'John'}) |
| find() | Finds multiple documents | collection.find({'age': {'$gt': 18}}) |
| find_one_and_update() | Finds and updates a document | collection.find_one_and_update(filter, update) |
| find_one_and_replace() | Finds and replaces a document | collection.find_one_and_replace(filter, replacement) |
| find_one_and_delete() | Finds and deletes a document | collection.find_one_and_delete(filter) |
| update_one() | Updates a single document | collection.update_one(filter, update) |
| update_many() | Updates multiple documents | collection.update_many(filter, update) |
| replace_one() | Replaces a single document | collection.replace_one(filter, replacement) |
| delete_one() | Deletes a single document | collection.delete_one(filter) |
| delete_many() | Deletes multiple documents | collection.delete_many(filter) |
Collection Methods – Indexes and Aggregation
| Method | Description | Example |
|---|---|---|
| create_index() | Creates an index | collection.create_index('name') |
| create_indexes() | Creates multiple indexes | collection.create_indexes([IndexModel(...)]) |
| drop_index() | Drops a specific index | collection.drop_index('name_1') |
| drop_indexes() | Drops all indexes | collection.drop_indexes() |
| list_indexes() | Lists all indexes | collection.list_indexes() |
| index_information() | Returns index metadata | collection.index_information() |
| aggregate() | Runs an aggregation pipeline | collection.aggregate(pipeline) |
| count_documents() | Counts documents matching a filter | collection.count_documents(filter) |
| estimated_document_count() | Provides an estimated document count | collection.estimated_document_count() |
| distinct() | Finds distinct values for a field | collection.distinct('field') |
Collection Methods – Additional Operations
| Method | Description | Example |
|---|---|---|
| bulk_write() | Performs bulk write operations | collection.bulk_write(operations) |
| watch() | Opens a change stream | collection.watch(pipeline) |
| rename() | Renames the collection | collection.rename('new_name') |
| options() | Returns collection options | collection.options() |
| with_options() | Creates a copy with different options | collection.with_options(codec_options=opts) |
Cursor Methods
| Method | Description | Example |
|---|---|---|
| sort() | Sorts the result set | cursor.sort('name', 1) |
| limit() | Limits the number of returned documents | cursor.limit(10) |
| skip() | Skips a number of documents | cursor.skip(5) |
| hint() | Specifies an index hint | cursor.hint('name_1') |
| explain() | Provides the execution plan | cursor.explain() |
| count() | Removed in PyMongo 4.x; use Collection.count_documents() instead | collection.count_documents(filter) |
| close() | Closes the cursor | cursor.close() |
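Cursor methods return the cursor itself, so they chain naturally; counting, however, goes through the collection, since Cursor.count() no longer exists in PyMongo 4.x. A small sketch (the function name is ours):

```python
def top_adults(collection, n=10):
    query = {'age': {'$gte': 18}}
    # Count on the collection, not the cursor (Cursor.count() was removed)
    total = collection.count_documents(query)
    # sort/skip/limit each return the cursor, so they chain
    page = list(collection.find(query).sort('age', -1).limit(n))
    return total, page
```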
Detailed CRUD Guide
Create
Insert a Single Document
from datetime import datetime
from bson import ObjectId
# Simple insert
result = collection.insert_one({
    'name': 'John Doe',
    'age': 30,
    'email': 'john@example.com',
    'created_at': datetime.now()
})
print(f"Inserted document ID: {result.inserted_id}")
Insert Multiple Documents
documents = [
    {'name': 'Alice', 'age': 25, 'city': 'New York'},
    {'name': 'Bob', 'age': 30, 'city': 'Los Angeles'},
    {'name': 'Charlie', 'age': 35, 'city': 'Chicago'}
]
result = collection.insert_many(documents)
print(f"Inserted {len(result.inserted_ids)} documents")
Read
Basic Queries
# Find one document
user = collection.find_one({'name': 'John Doe'})
# Find all documents
for doc in collection.find():
    print(doc)
# Query with conditions
adults = collection.find({'age': {'$gte': 18}})
Field Projection
# Return only name and age
users = collection.find({}, {'name': 1, 'age': 1, '_id': 0})
# Exclude specific fields
users = collection.find({}, {'password': 0, 'internal_id': 0})
Complex Queries
# Logical operators
query = {
    '$and': [
        {'age': {'$gte': 18}},
        {'city': {'$in': ['New York', 'Los Angeles']}}
    ]
}
results = collection.find(query)
# Text search
collection.create_index([('name', 'text')])
results = collection.find({'$text': {'$search': 'John'}})
Update
Update a Single Document
# Using $set
collection.update_one(
    {'name': 'John Doe'},
    {'$set': {'age': 31, 'last_modified': datetime.now()}}
)
# Increment a field
collection.update_one(
    {'name': 'John Doe'},
    {'$inc': {'age': 1}}
)
# Push an element into an array
collection.update_one(
    {'name': 'John Doe'},
    {'$push': {'hobbies': 'reading'}}
)
Update Multiple Documents
# Update all users older than 30
collection.update_many(
    {'age': {'$gt': 30}},
    {'$set': {'category': 'senior'}}
)
Upsert Operations
# Insert if the document does not exist
collection.update_one(
    {'username': 'newuser'},
    {'$set': {'email': 'new@example.com'}},
    upsert=True
)
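Every update returns an UpdateResult whose matched_count, modified_count, and upserted_id fields tell you what actually happened, which matters especially with upsert=True. A helper sketch (the function name is ours):

```python
def apply_update(collection, filter_query, update, upsert=False):
    result = collection.update_one(filter_query, update, upsert=upsert)
    if result.upserted_id is not None:
        # the upsert inserted a brand-new document
        return 'inserted', result.upserted_id
    if result.modified_count:
        # an existing document was actually changed
        return 'updated', result.modified_count
    # matched but unchanged (same values), or no match at all
    return 'unchanged', result.matched_count
```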
Delete
Delete a Single Document
result = collection.delete_one({'name': 'John Doe'})
print(f"Deleted {result.deleted_count} document")
Delete Multiple Documents
result = collection.delete_many({'age': {'$lt': 18}})
print(f"Deleted {result.deleted_count} documents")
Working with Indexes
Creating Indexes
Simple Indexes
# Single field index
collection.create_index('name')
# Compound index
collection.create_index([('name', 1), ('age', -1)])
# Unique index
collection.create_index('email', unique=True)
Special Index Types
# Text index
collection.create_index([('title', 'text'), ('content', 'text')])
# Geospatial index
collection.create_index([('location', '2dsphere')])
# Partial index
collection.create_index(
    'email',
    partialFilterExpression={'email': {'$exists': True}}
)
# TTL index (Time To Live)
collection.create_index(
    'expireAt',
    expireAfterSeconds=3600
)
Managing Indexes
# List all indexes
indexes = list(collection.list_indexes())
# Get index metadata
index_info = collection.index_information()
# Drop a specific index
collection.drop_index('name_1')
# Drop all indexes (except _id)
collection.drop_indexes()
Data Aggregation
Aggregation Basics
# Simple aggregation pipeline
pipeline = [
    {'$match': {'age': {'$gte': 18}}},
    {'$group': {'_id': '$city', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]
results = collection.aggregate(pipeline)
for result in results:
    print(result)
Advanced Aggregations
# Complex pipeline with multiple stages
pipeline = [
    # Filter
    {'$match': {'status': 'active'}},
    # Add computed fields
    {'$addFields': {
        'age_group': {
            '$cond': {
                'if': {'$gte': ['$age', 30]},
                'then': 'adult',
                'else': 'young'
            }
        }
    }},
    # Group
    {'$group': {
        '_id': '$age_group',
        'count': {'$sum': 1},
        'avg_age': {'$avg': '$age'},
        'max_age': {'$max': '$age'}
    }},
    # Sort
    {'$sort': {'count': -1}},
    # Limit results
    {'$limit': 10}
]
results = collection.aggregate(pipeline)
Aggregation with $lookup
# Join collections
pipeline = [
    {'$lookup': {
        'from': 'orders',
        'localField': '_id',
        'foreignField': 'user_id',
        'as': 'user_orders'
    }},
    {'$match': {'user_orders': {'$ne': []}}},
    {'$addFields': {
        'total_orders': {'$size': '$user_orders'}
    }}
]
results = collection.aggregate(pipeline)
Transactions
Transaction Basics
# Simple transaction
with client.start_session() as session:
    try:
        with session.start_transaction():
            # Operations inside the transaction
            collection.insert_one(
                {'name': 'Test User', 'balance': 1000},
                session=session
            )
            another_collection.update_one(
                {'_id': some_id},
                {'$inc': {'total': 1000}},
                session=session
            )
        # Committed automatically when the with block exits cleanly
    except Exception as e:
        # Aborted automatically when an exception leaves the with block
        print(f"Transaction failed: {e}")
Manual Transaction Management
session = client.start_session()
try:
    session.start_transaction()
    # Operations
    collection.insert_one({'test': 'data'}, session=session)
    # Manual commit
    session.commit_transaction()
except Exception as e:
    # Manual abort
    session.abort_transaction()
    print(f"Error: {e}")
finally:
    session.end_session()
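Instead of committing and aborting by hand, ClientSession.with_transaction runs a callback, commits on success, aborts on error, and retries transient transaction errors automatically. A sketch with illustrative account documents and function names:

```python
def transfer(session, accounts, amount):
    # Runs inside the transaction; every operation receives the session
    accounts.update_one({'_id': 'a'}, {'$inc': {'balance': -amount}},
                        session=session)
    accounts.update_one({'_id': 'b'}, {'$inc': {'balance': amount}},
                        session=session)

def run_transfer(client, accounts, amount):
    with client.start_session() as session:
        # Commits when the callback returns, aborts if it raises,
        # and retries transient transaction errors for you
        session.with_transaction(
            lambda s: transfer(s, accounts, amount)
        )
```

This callback API is generally preferable to manual commit/abort because the retry logic is easy to get wrong by hand.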
Asynchronous Operations with Motor
Installation and Connection
import motor.motor_asyncio
import asyncio
# Asynchronous client
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
db = client.test_database
collection = db.test_collection
Async CRUD Operations
async def async_operations():
    # Insert
    result = await collection.insert_one({'name': 'Async User'})
    # Find
    document = await collection.find_one({'name': 'Async User'})
    # Update
    await collection.update_one(
        {'name': 'Async User'},
        {'$set': {'status': 'updated'}}
    )
    # Async iteration
    async for doc in collection.find():
        print(doc)
    # Aggregation
    pipeline = [{'$match': {'status': 'active'}}]
    async for doc in collection.aggregate(pipeline):
        print(doc)

# Run async operations
asyncio.run(async_operations())
Working with GridFS
GridFS Basics
import gridfs
# Create GridFS instance
fs = gridfs.GridFS(db)
# Store a file
with open('image.jpg', 'rb') as f:
    file_id = fs.put(f, filename='image.jpg', metadata={'type': 'image'})
# Retrieve a file
file_data = fs.get(file_id)
with open('downloaded_image.jpg', 'wb') as f:
    f.write(file_data.read())
# Find files
for file in fs.find({'metadata.type': 'image'}):
    print(f"File: {file.filename}, ID: {file._id}")
GridFS with Advanced Options
# GridFS with custom collection names
fs = gridfs.GridFS(db, collection='custom_files')
# Store with metadata
file_id = fs.put(
    b'file content',
    filename='test.txt',
    content_type='text/plain',
    metadata={
        'author': 'John Doe',
        'created': datetime.now(),
        'tags': ['important', 'document']
    }
)
# Delete a file
fs.delete(file_id)
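Alongside the classic GridFS class, the driver also provides the stream-oriented gridfs.GridFSBucket API. The helpers below (names are ours) assume an already-constructed bucket, e.g. gridfs.GridFSBucket(db):

```python
import io

def store_bytes(bucket, name, payload, **metadata):
    # upload_from_stream accepts any file-like object and
    # returns the new file's id
    return bucket.upload_from_stream(
        name, io.BytesIO(payload), metadata=metadata or None
    )

def load_bytes(bucket, file_id):
    out = io.BytesIO()
    # Streams the stored chunks into `out` without loading the
    # whole file into a single read() call
    bucket.download_to_stream(file_id, out)
    return out.getvalue()
```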
Monitoring and Change Tracking
Change Streams
# Watch changes on a collection
change_stream = collection.watch()
for change in change_stream:
    print(f"Change detected: {change}")
# Watch specific operation types
pipeline = [{'$match': {'operationType': 'insert'}}]
change_stream = collection.watch(pipeline)
for change in change_stream:
    print(f"New document inserted: {change['fullDocument']}")
Performance Monitoring
# Enable profiling (level 2 = all operations); PyMongo 4.x removed
# Database.set_profiling_level(), so issue the 'profile' command instead
db.command('profile', 2)
# Retrieve recent profiler entries
slow_queries = db.system.profile.find().sort('ts', -1).limit(5)
for query in slow_queries:
    print(f"Duration: {query['millis']}ms, Query: {query['command']}")
# Disable profiling
db.command('profile', 0)
Performance Optimization
Best Practices for Queries
Leverage Indexes
# Create an index before running queries
collection.create_index([('status', 1), ('created_at', -1)])
# Query will use the index
results = collection.find({'status': 'active'}).sort('created_at', -1)
# Verify index usage
explain = collection.find({'status': 'active'}).explain()
print(explain['executionStats'])
Optimize Projections
# Load only needed fields
users = collection.find(
    {'age': {'$gte': 18}},
    {'name': 1, 'email': 1, '_id': 0}
)
# Exclude large fields
users = collection.find({}, {'large_field': 0})
Pagination
def paginate_results(collection, query, page_size=10, page_num=1):
    skip = (page_num - 1) * page_size
    return collection.find(query).skip(skip).limit(page_size)
# Example usage
page_1 = paginate_results(collection, {'status': 'active'}, 10, 1)
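skip() still walks past the skipped documents server-side, so deep pages get progressively slower. When results are ordered by _id (or any indexed, unique field), keyset pagination resumes from the last value seen instead. A sketch (the function name is ours):

```python
def next_page(collection, base_filter, last_id=None, page_size=10):
    query = dict(base_filter)
    if last_id is not None:
        # Resume right after the last _id of the previous page
        query['_id'] = {'$gt': last_id}
    docs = list(collection.find(query).sort('_id', 1).limit(page_size))
    # Pass this back as last_id on the next call
    new_last = docs[-1]['_id'] if docs else None
    return docs, new_last
```

Unlike skip-based pagination, each page here is an index seek, so page 1000 costs the same as page 1.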
Connection Optimization
# Configure connection pool
client = MongoClient(
    'mongodb://localhost:27017/',
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=30000,
    waitQueueTimeoutMS=5000
)
# Use read preferences
from pymongo import ReadPreference
collection_secondary = collection.with_options(
    read_preference=ReadPreference.SECONDARY
)
Security
Authentication and Authorization
# Connect with authentication
client = MongoClient(
    'mongodb://username:password@localhost:27017/',
    authSource='admin'
)
# Use different authentication mechanisms
client = MongoClient(
    'mongodb://localhost:27017/',
    username='user',
    password='pass',
    authSource='admin',
    authMechanism='SCRAM-SHA-256'
)
SSL/TLS Connections
# The legacy ssl_* options were removed in PyMongo 4.x; use tls* instead
client = MongoClient(
    'mongodb://localhost:27017/',
    tls=True,
    tlsCAFile='path/to/ca.pem',
    tlsCertificateKeyFile='path/to/client.pem'  # client cert and key in one PEM file
)
Error Handling
Common Exceptions
from pymongo.errors import (
    ConnectionFailure,
    ServerSelectionTimeoutError,
    DuplicateKeyError,
    WriteError,
    BulkWriteError
)
try:
    collection.insert_one({'email': 'duplicate@example.com'})
except DuplicateKeyError as e:
    print(f"Duplicate key error: {e}")
# ServerSelectionTimeoutError subclasses ConnectionFailure,
# so catch the more specific exception first
except ServerSelectionTimeoutError as e:
    print(f"Server selection timeout: {e}")
except ConnectionFailure as e:
    print(f"Connection failed: {e}")
Retry Logic
import time
from pymongo.errors import AutoReconnect
def retry_operation(func, max_retries=3, delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except AutoReconnect as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(delay * (2 ** attempt))

# Usage example
result = retry_operation(
    lambda: collection.find_one({'_id': some_id})
)
Practical Usage Examples
Flask Web Application
from flask import Flask, request, jsonify
from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime
app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client.blog_db
posts = db.posts
@app.route('/posts', methods=['POST'])
def create_post():
    data = request.json
    post = {
        'title': data['title'],
        'content': data['content'],
        'author': data['author'],
        'created_at': datetime.now(),
        'tags': data.get('tags', []),
        'views': 0
    }
    result = posts.insert_one(post)
    return jsonify({'id': str(result.inserted_id)})

@app.route('/posts/<post_id>', methods=['GET'])
def get_post(post_id):
    post = posts.find_one({'_id': ObjectId(post_id)})
    if post:
        post['_id'] = str(post['_id'])
        # Increment view counter
        posts.update_one(
            {'_id': ObjectId(post_id)},
            {'$inc': {'views': 1}}
        )
        return jsonify(post)
    return jsonify({'error': 'Post not found'}), 404

@app.route('/posts', methods=['GET'])
def get_posts():
    page = int(request.args.get('page', 1))
    per_page = int(request.args.get('per_page', 10))
    posts_cursor = posts.find().sort('created_at', -1)
    posts_cursor = posts_cursor.skip((page - 1) * per_page).limit(per_page)
    posts_list = []
    for post in posts_cursor:
        post['_id'] = str(post['_id'])
        posts_list.append(post)
    return jsonify(posts_list)
Logging System
import logging
from pymongo import MongoClient
from datetime import datetime
class MongoHandler(logging.Handler):
    def __init__(self, connection_string, db_name, collection_name):
        super().__init__()
        self.client = MongoClient(connection_string)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        # Create TTL index to auto-remove old logs
        self.collection.create_index(
            'timestamp',
            expireAfterSeconds=30 * 24 * 60 * 60  # 30 days
        )

    def emit(self, record):
        log_entry = {
            'timestamp': datetime.now(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
            'thread': record.thread,
            'process': record.process
        }
        if record.exc_info:
            log_entry['exception'] = self.format(record)
        self.collection.insert_one(log_entry)
# Usage
logger = logging.getLogger('my_app')
mongo_handler = MongoHandler(
    'mongodb://localhost:27017/',
    'logs_db',
    'app_logs'
)
logger.addHandler(mongo_handler)
logger.setLevel(logging.INFO)
logger.info('Application started')
logger.error('An error occurred')
Caching System
from pymongo import MongoClient
from datetime import datetime, timedelta
import json
import hashlib
class MongoCache:
    def __init__(self, connection_string, db_name, collection_name='cache'):
        self.client = MongoClient(connection_string)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        # TTL index for automatic expiration
        self.collection.create_index('expires_at', expireAfterSeconds=0)

    def _generate_key(self, key):
        """Generate a hash for the cache key"""
        return hashlib.md5(str(key).encode()).hexdigest()

    def set(self, key, value, ttl=3600):
        """Store a value with a TTL"""
        hashed_key = self._generate_key(key)
        expires_at = datetime.now() + timedelta(seconds=ttl)
        self.collection.replace_one(
            {'_id': hashed_key},
            {
                '_id': hashed_key,
                'value': value,
                'expires_at': expires_at,
                'created_at': datetime.now()
            },
            upsert=True
        )

    def get(self, key):
        """Retrieve a value by key"""
        hashed_key = self._generate_key(key)
        doc = self.collection.find_one({'_id': hashed_key})
        if doc and doc['expires_at'] > datetime.now():
            return doc['value']
        return None

    def delete(self, key):
        """Delete a value by key"""
        hashed_key = self._generate_key(key)
        self.collection.delete_one({'_id': hashed_key})

    def clear(self):
        """Clear the entire cache"""
        self.collection.delete_many({})

# Usage
cache = MongoCache('mongodb://localhost:27017/', 'cache_db')
# Store in cache
cache.set('user:123', {'name': 'John', 'age': 30}, ttl=1800)
# Retrieve from cache
user_data = cache.get('user:123')
if user_data:
    print(f"Cached data: {user_data}")
else:
    print("Cache miss")
Frequently Asked Questions
How should I properly close a MongoDB connection?
# Proper way to close the connection
client = MongoClient('mongodb://localhost:27017/')
try:
    # Perform operations
    db = client.test_db
    collection = db.test_collection
    collection.insert_one({'test': 'data'})
finally:
    client.close()
# Or use a context manager
with MongoClient('mongodb://localhost:27017/') as client:
    db = client.test_db
    collection = db.test_collection
    collection.insert_one({'test': 'data'})
How can I handle large query result sets?
# Use batch_size for large results
cursor = collection.find().batch_size(100)
for document in cursor:
    process_document(document)
# Or use limit and skip for pagination
def process_large_collection(collection, batch_size=1000):
    skip = 0
    while True:
        batch = list(collection.find().skip(skip).limit(batch_size))
        if not batch:
            break
        for document in batch:
            process_document(document)
        skip += batch_size
How do I optimize queries that involve sorting?
# Create an index for sorting
collection.create_index([('created_at', -1)])
# Use hint to force index usage
results = collection.find().sort('created_at', -1).hint('created_at_-1')
# Compound index for filter + sort
collection.create_index([('status', 1), ('created_at', -1)])
results = collection.find({'status': 'active'}).sort('created_at', -1)
How can I work with large files in MongoDB?
import gridfs
from pymongo import MongoClient
client = MongoClient()
db = client.test_db
fs = gridfs.GridFS(db)
# Store a large file
with open('large_file.pdf', 'rb') as f:
    file_id = fs.put(f, filename='large_file.pdf')
# Read a file in chunks
file_obj = fs.get(file_id)
with open('downloaded_file.pdf', 'wb') as f:
    while True:
        chunk = file_obj.read(1024)  # Read 1KB at a time
        if not chunk:
            break
        f.write(chunk)
How do I implement full‑text search?
# Create a text index
collection.create_index([
    ('title', 'text'),
    ('content', 'text')
])
# Perform a text search
results = collection.find({
    '$text': {
        '$search': 'python mongodb',
        '$language': 'english'
    }
})
# Weighted field search (a collection allows only one text index,
# so drop the previous one before creating this variant)
collection.create_index([
    ('title', 'text'),
    ('content', 'text')
], weights={'title': 10, 'content': 1})
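Text queries can also surface the relevance score the index computes: project it with the $meta operator and sort on it. A sketch (the function name is ours):

```python
def search_ranked(collection, terms, limit=10):
    return list(
        collection.find(
            {'$text': {'$search': terms}},
            {'score': {'$meta': 'textScore'}}        # project the score
        )
        .sort([('score', {'$meta': 'textScore'})])   # rank by relevance
        .limit(limit)
    )
```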
How can I implement data change auditing?
from datetime import datetime
class AuditMixin:
    def __init__(self, collection):
        self.collection = collection
        self.audit_collection = collection.database[f"{collection.name}_audit"]

    def insert_one_with_audit(self, document, user_id=None):
        document['created_at'] = datetime.now()
        document['created_by'] = user_id
        result = self.collection.insert_one(document)
        # Write audit record
        self.audit_collection.insert_one({
            'document_id': result.inserted_id,
            'operation': 'insert',
            'user_id': user_id,
            'timestamp': datetime.now(),
            'data': document
        })
        return result

    def update_one_with_audit(self, filter_query, update, user_id=None):
        # Retrieve original document
        original = self.collection.find_one(filter_query)
        # Perform update
        result = self.collection.update_one(filter_query, update)
        if result.modified_count > 0:
            # Write audit record
            self.audit_collection.insert_one({
                'document_id': original['_id'],
                'operation': 'update',
                'user_id': user_id,
                'timestamp': datetime.now(),
                'original_data': original,
                'update_query': update
            })
        return result
Conclusion
PyMongo is a powerful and flexible tool for working with MongoDB in Python applications. The library offers full access to MongoDB’s capabilities, including modern features such as transactions, change streams, and time series collections.
Key advantages of PyMongo include high performance, ease of use, complete compatibility with MongoDB, and active community support. Proper index usage, query optimization, and adherence to best practices enable the creation of high‑throughput applications capable of handling large data volumes.
When choosing between PyMongo and alternatives like MongoEngine or Motor, consider project specifics: PyMongo is ideal for applications that require direct control over database operations, while ORM solutions may be preferable for projects with well‑defined data schemas.
Regularly updating the library, monitoring performance, and following security recommendations will help you get the most out of PyMongo in your projects.