Psycopg2: Complete Guide to Working with PostgreSQL in Python
Introduction
PostgreSQL is a powerful open‑source relational DBMS known for its reliability, scalability, and support for complex data types. The most popular and reliable tool for accessing PostgreSQL from Python is Psycopg2 – a mature synchronous driver that provides a stable, high‑performance way to connect to the database, execute SQL queries, manage transactions, and integrate with web frameworks.
Psycopg2 is the de‑facto standard for PostgreSQL in Python projects thanks to its maturity, feature completeness, and active community support.
What Is Psycopg2
Psycopg2 is a Python adapter for PostgreSQL written in C using libpq (the official PostgreSQL client library). The library fully complies with the DB‑API 2.0 specification and offers many PostgreSQL‑specific extensions.
Key Features of Psycopg2:
- High performance: C implementation and optimized libpq interaction
- Security: Built‑in protection against SQL injection via parameterised queries
- Transaction support: Full control with optional automatic management
- Data‑type compatibility: Automatic conversion between PostgreSQL and Python types
- Extension support: Works with JSON, UUID, arrays, hstore and other PostgreSQL data types
- Thread safety: Usable in multithreaded applications
Installation and Setup
Installing the Library
pip install psycopg2-binary
For production environments the source package, which builds against the system libpq, is recommended:
pip install psycopg2
System Requirements
Before installing, make sure the required system dependencies are present:
Ubuntu/Debian:
sudo apt-get install libpq-dev python3-dev
CentOS/RHEL:
sudo yum install postgresql-devel python3-devel
macOS:
brew install postgresql
Connecting to the Database
Basic Connection
import psycopg2
# Connect to PostgreSQL
conn = psycopg2.connect(
dbname="mydb",
user="postgres",
password="password",
host="localhost",
port="5432"
)
# Create a cursor
cur = conn.cursor()
Connection via DSN String
conn = psycopg2.connect("postgresql://user:password@localhost:5432/dbname")
Connection with Additional Parameters
conn = psycopg2.connect(
dbname="mydb",
user="postgres",
password="password",
host="localhost",
port="5432",
sslmode="require",
connect_timeout=10,
application_name="my_app"
)
Using Context Managers
import psycopg2
from contextlib import contextmanager
@contextmanager
def get_db_connection():
conn = psycopg2.connect(
dbname="mydb",
user="postgres",
password="password",
host="localhost"
)
try:
yield conn
finally:
conn.close()
# Usage
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users")
results = cur.fetchall()
Creating Databases and Tables
Creating a Table
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
age INTEGER CHECK (age > 0),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
)
""")
conn.commit()
Creating Indexes
cur.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at)")
conn.commit()
Creating Composite Tables
cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
product_name VARCHAR(200) NOT NULL,
quantity INTEGER NOT NULL DEFAULT 1,
price DECIMAL(10,2) NOT NULL,
order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata JSONB
)
""")
conn.commit()
CRUD Operations with Psycopg2
Inserting Records
Single Insert
cur.execute(
"INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
("Ivan Ivanov", "ivan@example.com", 30)
)
conn.commit()
Getting the Inserted ID
cur.execute(
"INSERT INTO users (name, email, age) VALUES (%s, %s, %s) RETURNING id",
("Petr Petrov", "petr@example.com", 25)
)
user_id = cur.fetchone()[0]
conn.commit()
Bulk Insert
users_data = [
("Anna Smirnova", "anna@example.com", 28),
("Maxim Kozlov", "maxim@example.com", 35),
("Elena Vasilieva", "elena@example.com", 24)
]
cur.executemany(
"INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
users_data
)
conn.commit()
Efficient Bulk Insert
from psycopg2.extras import execute_values
execute_values(
cur,
"INSERT INTO users (name, email, age) VALUES %s",
users_data,
template=None,
page_size=100
)
conn.commit()
Selecting Data
Basic Select
cur.execute("SELECT id, name, email, age FROM users")
rows = cur.fetchall()
for row in rows:
print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}, Age: {row[3]}")
Select with Conditions
cur.execute("SELECT * FROM users WHERE age > %s AND is_active = %s", (25, True))
active_users = cur.fetchall()
Select with Sorting and Limit
cur.execute("""
SELECT name, email, age
FROM users
WHERE age BETWEEN %s AND %s
ORDER BY age DESC
LIMIT %s
""", (20, 50, 10))
Using fetchone() and fetchmany()
# Fetch a single row
cur.execute("SELECT * FROM users WHERE id = %s", (1,))
user = cur.fetchone()
# Fetch multiple rows
cur.execute("SELECT * FROM users")
first_five = cur.fetchmany(5)
# Iterate over the result set
cur.execute("SELECT * FROM users")
for row in cur:
print(row)
Updating Data
# Update a single row
cur.execute(
"UPDATE users SET email = %s WHERE id = %s",
("new_email@example.com", 1)
)
# Update multiple columns
cur.execute("""
UPDATE users
SET email = %s, age = %s, is_active = %s
WHERE id = %s
""", ("updated@example.com", 31, True, 1))
# Bulk update
cur.execute(
"UPDATE users SET is_active = %s WHERE age < %s",
(False, 18)
)
conn.commit()
Deleting Data
# Delete by ID
cur.execute("DELETE FROM users WHERE id = %s", (1,))
# Conditional delete
cur.execute("DELETE FROM users WHERE is_active = %s", (False,))
# Delete with RETURNING
cur.execute("DELETE FROM users WHERE age < %s RETURNING id, name", (18,))
deleted_users = cur.fetchall()
conn.commit()
Working with Cursors and Transactions
Transaction Management
Psycopg2 uses transactions by default. Every data‑modifying operation must be confirmed with commit():
try:
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
("Test", "test@example.com"))
cur.execute("UPDATE users SET age = %s WHERE email = %s",
(25, "test@example.com"))
conn.commit()
print("Transaction completed successfully")
except Exception as e:
conn.rollback()
print(f"Error: {e}")
Automatic Transaction Management
conn.autocommit = True # Enable autocommit
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
("Auto", "auto@example.com"))
# Commit happens automatically
Using with for Transactions
try:
with conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
("Context", "context@example.com"))
cur.execute("UPDATE users SET age = %s WHERE email = %s",
(30, "context@example.com"))
# Automatic commit on block exit
except Exception as e:
# Automatic rollback on error
print(f"Transaction error: {e}")
Savepoints
try:
with conn:
with conn.cursor() as cur:
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
("User1", "user1@example.com"))
# Create a savepoint
cur.execute("SAVEPOINT sp1")
try:
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
("User2", "user2@example.com"))
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
("Duplicate", "user1@example.com")) # Duplicate error
except Exception:
# Roll back to savepoint
cur.execute("ROLLBACK TO SAVEPOINT sp1")
print("Rolled back to savepoint")
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
("User3", "user3@example.com"))
except Exception as e:
print(f"Error: {e}")
Reading Query Results
Data Retrieval Methods
# Execute a query
cur.execute("SELECT id, name, email FROM users LIMIT 10")
# fetchone() – get a single row
first_row = cur.fetchone()
print(first_row) # (1, 'Ivan', 'ivan@example.com')
# fetchmany(n) – get n rows
cur.execute("SELECT id, name, email FROM users")
five_rows = cur.fetchmany(5)
for row in five_rows:
print(row)
# fetchall() – get all rows
cur.execute("SELECT id, name, email FROM users")
all_rows = cur.fetchall()
print(f"Total rows: {len(all_rows)}")
Iterating Over a Cursor
cur.execute("SELECT id, name, email FROM users")
for row in cur:
    user_id, name, email = row  # avoid shadowing the built-in id()
    print(f"ID: {user_id}, Name: {name}, Email: {email}")
Working with Named Cursors
from psycopg2.extras import DictCursor
# Create a cursor that allows column name access
cur = conn.cursor(cursor_factory=DictCursor)
cur.execute("SELECT id, name, email FROM users LIMIT 1")
row = cur.fetchone()
print(row['name']) # Access by column name
print(row['email']) # Access by column name
print(row[0]) # Index access also works
Server‑Side Cursors for Large Datasets
# Create a server‑side cursor for processing huge result sets
cur = conn.cursor(name='large_data_cursor')
cur.execute("SELECT * FROM large_table")
# Process data in batches
while True:
rows = cur.fetchmany(1000)
if not rows:
break
for row in rows:
# Process each row
process_row(row)
cur.close()
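Named cursors can also be consumed by plain iteration; psycopg2 then fetches itersize rows per network round trip (2000 by default). A small generator sketch (the cursor name 'stream' is arbitrary):

```python
def stream_query(conn, query, params=None, itersize=1000):
    """Yield rows one by one while fetching them in server-side batches."""
    with conn.cursor(name='stream') as cur:
        cur.itersize = itersize  # rows transferred per network round trip
        cur.execute(query, params)
        for row in cur:
            yield row
```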
Security and SQL Injection Protection
Correct Use of Parameters
# CORRECT – use parameterised queries
email = "user@example.com"
cur.execute("SELECT * FROM users WHERE email = %s", (email,))
# CORRECT – multiple parameters
cur.execute("SELECT * FROM users WHERE age > %s AND email = %s", (25, email))
# CORRECT – named parameters
cur.execute("SELECT * FROM users WHERE age > %(min_age)s AND email = %(email)s",
{'min_age': 25, 'email': email})
What NOT to Do
# WRONG – vulnerable to SQL injection
email = "user@example.com"
cur.execute(f"SELECT * FROM users WHERE email = '{email}'")
# WRONG – using .format()
cur.execute("SELECT * FROM users WHERE email = '{}'".format(email))
# WRONG – using % for strings
cur.execute("SELECT * FROM users WHERE email = '%s'" % email)
Escaping Identifiers
from psycopg2.sql import SQL, Identifier, Literal
# Safe dynamic query construction
table_name = "users"
column_name = "email"
query = SQL("SELECT * FROM {} WHERE {} = %s").format(
Identifier(table_name),
Identifier(column_name)
)
cur.execute(query, ("user@example.com",))
Composing Complex Queries
from psycopg2.sql import SQL, Identifier, Literal, Composed
# Build a complex SELECT
def build_select_query(table, columns, where_conditions):
query = SQL("SELECT {} FROM {}").format(
SQL(', ').join(map(Identifier, columns)),
Identifier(table)
)
if where_conditions:
where_clause = SQL(" AND ").join(
SQL("{} = %s").format(Identifier(col))
for col in where_conditions.keys()
)
query = SQL("{} WHERE {}").format(query, where_clause)
return query
# Usage
query = build_select_query(
'users',
['id', 'name', 'email'],
{'age': 25, 'is_active': True}
)
cur.execute(query, (25, True))
Working with PostgreSQL Data Types
Automatic Type Conversion
Psycopg2 automatically converts data types between PostgreSQL and Python:
| PostgreSQL Type | Python Type | Example |
|---|---|---|
| INTEGER, SERIAL | int | 42 |
| VARCHAR, TEXT | str | "string" |
| BOOLEAN | bool | True |
| DECIMAL, NUMERIC | decimal.Decimal | Decimal('123.45') |
| TIMESTAMP | datetime.datetime | datetime.now() |
| DATE | datetime.date | date.today() |
| TIME | datetime.time | time(14, 30) |
| JSON, JSONB | dict, list | {"key": "value"} |
| UUID | uuid.UUID | uuid.uuid4() |
| BYTEA | bytes | b"binary data" |
| ARRAY | list | [1, 2, 3] |
Working with JSON Data
import json
from psycopg2.extras import Json
# Create a table with JSON
cur.execute("""
CREATE TABLE IF NOT EXISTS logs (
id SERIAL PRIMARY KEY,
message TEXT,
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert JSON data
metadata = {
"ip": "192.168.1.1",
"user_agent": "Mozilla/5.0...",
"session_id": "abc123"
}
cur.execute(
"INSERT INTO logs (message, metadata) VALUES (%s, %s)",
("User logged in", Json(metadata))
)
# Query JSON data
cur.execute("SELECT metadata FROM logs WHERE id = %s", (1,))
row = cur.fetchone()
print(row[0]['ip']) # Automatically converted to dict
Working with PostgreSQL Arrays
# Create a table with an array column
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
name TEXT,
tags TEXT[],
prices DECIMAL[]
)
""")
# Insert arrays
cur.execute(
"INSERT INTO products (name, tags, prices) VALUES (%s, %s, %s)",
("Laptop", ["electronics", "computer"], [50000.00, 45000.00])
)
# Search within an array
cur.execute("SELECT * FROM products WHERE %s = ANY(tags)", ("electronics",))
Working with UUIDs
import uuid
# Create a table with UUID
cur.execute("""
CREATE TABLE IF NOT EXISTS sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id INTEGER,
token UUID NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert a UUID
session_token = uuid.uuid4()
cur.execute(
"INSERT INTO sessions (user_id, token) VALUES (%s, %s)",
(1, session_token)
)
# Query by UUID
cur.execute("SELECT * FROM sessions WHERE token = %s", (session_token,))
Working with Temporal Types
from datetime import datetime, date, time
# Create a table with temporal columns
cur.execute("""
CREATE TABLE IF NOT EXISTS events (
id SERIAL PRIMARY KEY,
title TEXT,
event_date DATE,
event_time TIME,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
)
""")
# Insert temporal data
cur.execute(
"INSERT INTO events (title, event_date, event_time) VALUES (%s, %s, %s)",
("Meeting", date(2024, 12, 25), time(14, 30))
)
# Work with time zones
cur.execute("SELECT created_at AT TIME ZONE 'UTC' FROM events")
Advanced Psycopg2 Features
Working with Stored Procedures
# Create a stored procedure
cur.execute("""
CREATE OR REPLACE FUNCTION get_user_stats(user_id INTEGER)
RETURNS TABLE(total_orders INTEGER, total_amount DECIMAL) AS $$
BEGIN
RETURN QUERY
SELECT COUNT(*)::INTEGER, COALESCE(SUM(price * quantity), 0)::DECIMAL
FROM orders
WHERE orders.user_id = get_user_stats.user_id;
END;
$$ LANGUAGE plpgsql;
""")
# Call the stored procedure
cur.callproc('get_user_stats', (1,))
result = cur.fetchone()
print(f"Orders: {result[0]}, Amount: {result[1]}")
Working with Large Objects (LOBs)
# Create a large object
large_obj = conn.lobject(mode='w')
large_obj.write(b"Large amount of data...")
loid = large_obj.oid
large_obj.close()
# Store a reference to the LOB
cur.execute("INSERT INTO files (name, data_oid) VALUES (%s, %s)",
("document.pdf", loid))
# Read the large object
cur.execute("SELECT data_oid FROM files WHERE name = %s", ("document.pdf",))
oid = cur.fetchone()[0]
large_obj = conn.lobject(oid, mode='r')
data = large_obj.read()
large_obj.close()
Asynchronous Notifications
# Subscribe to notifications
cur.execute("LISTEN user_updates")
conn.commit()
# Send a notification (from another connection)
cur.execute("NOTIFY user_updates, 'New user created'")
conn.commit()
# Check for notifications
conn.poll()
while conn.notifies:
notify = conn.notifies.pop(0)
print(f"Received notification: {notify.channel} - {notify.payload}")
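Polling in a busy loop wastes CPU. Because the connection object exposes its underlying socket, you can instead block in select() until the server has something to deliver (a sketch based on the pattern shown in the psycopg2 documentation):

```python
import select

def wait_for_notifications(conn, timeout=5.0):
    """Wait up to `timeout` seconds and return a list of (channel, payload) pairs."""
    if select.select([conn], [], [], timeout) == ([], [], []):
        return []  # timed out, nothing received
    conn.poll()
    events = []
    while conn.notifies:
        notify = conn.notifies.pop(0)
        events.append((notify.channel, notify.payload))
    return events
```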
Copying Data
# COPY TO – export data
# (COPY ... TO STDOUT cannot be run via cur.execute(); use copy_expert)
with open('users_export.csv', 'w') as f:
    cur.copy_expert("COPY users TO STDOUT WITH CSV HEADER", f)
# COPY FROM – import data (copy_expert handles the CSV header correctly)
with open('users_import.csv', 'r') as f:
    cur.copy_expert("COPY users (name, email, age) FROM STDIN WITH CSV HEADER", f)
Comprehensive List of Psycopg2 Methods and Functions
Main Connection Methods
| Method / Function | Description | Example Usage |
|---|---|---|
| psycopg2.connect() | Create a connection to the database | conn = psycopg2.connect(dsn) |
| conn.cursor() | Create a cursor for executing queries | cur = conn.cursor() |
| conn.close() | Close the connection | conn.close() |
| conn.commit() | Commit the current transaction | conn.commit() |
| conn.rollback() | Rollback the current transaction | conn.rollback() |
Cursor Methods
| Method | Description | Example Usage |
|---|---|---|
| cur.execute(sql, vars) | Execute an SQL statement | cur.execute("SELECT * FROM users WHERE id = %s", (1,)) |
| cur.executemany(sql, vars_list) | Execute a statement for a sequence of parameters | cur.executemany("INSERT INTO users (name) VALUES (%s)", [("John",), ("Jane",)]) |
| cur.fetchone() | Fetch a single row | row = cur.fetchone() |
| cur.fetchmany(size) | Fetch a number of rows | rows = cur.fetchmany(10) |
| cur.fetchall() | Fetch all remaining rows | all_rows = cur.fetchall() |
| cur.callproc(procname, vars) | Call a stored procedure | cur.callproc('my_proc', (param1, param2)) |
| cur.close() | Close the cursor | cur.close() |
| cur.copy_from(file, table) | Copy data from a file into a table | cur.copy_from(f, 'users', columns=('name', 'email')) |
| cur.copy_to(file, table) | Copy data from a table into a file | cur.copy_to(f, 'users') |
Cursor Attributes
| Attribute | Description | Example Usage |
|---|---|---|
| cur.description | Metadata about result columns | columns = [desc[0] for desc in cur.description] |
| cur.rowcount | Number of rows affected | print(f"Rows updated: {cur.rowcount}") |
| cur.rownumber | Current row number in the result set | print(f"Current row: {cur.rownumber}") |
| cur.query | Last executed query | print(cur.query) |
| cur.statusmessage | Message describing the command status | print(cur.statusmessage) |
Transaction Management Methods
| Method | Description | Example Usage |
|---|---|---|
| conn.autocommit | Enable/disable autocommit mode | conn.autocommit = True |
| conn.get_transaction_status() | Get the current transaction status | status = conn.get_transaction_status() |
| conn.set_isolation_level(level) | Set the transaction isolation level | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) |
| conn.set_session(readonly=True) | Configure session parameters | conn.set_session(readonly=True, deferrable=True) |
Additional Modules and Functions
| Module / Function | Description | Example Usage |
|---|---|---|
| psycopg2.extras.DictCursor | Cursor that allows column access by name | cur = conn.cursor(cursor_factory=DictCursor) |
| psycopg2.extras.RealDictCursor | Cursor that returns real dictionaries | cur = conn.cursor(cursor_factory=RealDictCursor) |
| psycopg2.extras.execute_values() | Efficient bulk insertion of many rows | execute_values(cur, "INSERT INTO t VALUES %s", data) |
| psycopg2.extras.execute_batch() | Batch execution of statements | execute_batch(cur, "INSERT INTO t VALUES (%s, %s)", data) |
| psycopg2.extras.Json() | Wrapper for JSON data | cur.execute("INSERT INTO t (data) VALUES (%s)", (Json(data),)) |
| psycopg2.sql.SQL() | Compose SQL statements safely | query = SQL("SELECT * FROM {}").format(Identifier(table)) |
| psycopg2.sql.Identifier() | Safely escape identifiers | Identifier('table_name') |
| psycopg2.sql.Literal() | Safely escape literals | Literal('string_value') |
Error Handling
| Exception | Description | When It Occurs |
|---|---|---|
| psycopg2.Error | Base class for all database errors | Parent of all other exceptions |
| psycopg2.Warning | Warning messages from the server | Server‑side notices |
| psycopg2.InterfaceError | Errors related to the DB‑API interface | Module‑level problems |
| psycopg2.DatabaseError | General database errors | Issues directly tied to the database |
| psycopg2.DataError | Invalid data supplied | Incorrect data types or values |
| psycopg2.OperationalError | Operational problems | Connection failures, transaction issues |
| psycopg2.IntegrityError | Integrity constraint violations | Unique, foreign‑key, check constraints |
| psycopg2.InternalError | Internal PostgreSQL errors | Server‑side internal failures |
| psycopg2.ProgrammingError | Programming mistakes | SQL syntax errors, misuse of the API |
| psycopg2.NotSupportedError | Unsupported operation | Attempting features not supported by the driver |
Integration with Popular Frameworks
Integration with Pandas
import pandas as pd
import psycopg2
# Read data from PostgreSQL into a DataFrame
# (pandas officially supports SQLAlchemy connectables; passing a raw
# psycopg2 connection works but emits a UserWarning)
conn = psycopg2.connect(dsn)
df = pd.read_sql("SELECT * FROM users WHERE age > 25", conn)
# Use parameters in queries
query = "SELECT * FROM users WHERE age BETWEEN %s AND %s"
df = pd.read_sql(query, conn, params=(25, 50))
# Note: DataFrame.to_sql() does NOT accept a raw psycopg2 connection –
# writing back requires an SQLAlchemy engine (see the next section)
Integration with SQLAlchemy
from sqlalchemy import create_engine
import pandas as pd
# Create a SQLAlchemy engine using psycopg2
engine = create_engine('postgresql+psycopg2://user:password@localhost/dbname')
# Use with Pandas
df = pd.read_sql("SELECT * FROM users", engine)
df.to_sql('users_copy', engine, if_exists='replace')
# Direct usage of the connection
# (SQLAlchemy 1.4+/2.0 requires wrapping raw SQL strings in text())
from sqlalchemy import text
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM users"))
    count = result.scalar()
Integration with Flask
from flask import Flask, jsonify, request
import psycopg2
from psycopg2.extras import RealDictCursor
app = Flask(__name__)
# Database configuration
DATABASE_CONFIG = {
'host': 'localhost',
'database': 'myapp',
'user': 'postgres',
'password': 'password'
}
def get_db_connection():
return psycopg2.connect(**DATABASE_CONFIG)
@app.route('/users', methods=['GET'])
def get_users():
with get_db_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT id, name, email FROM users")
users = cur.fetchall()
return jsonify([dict(user) for user in users])
@app.route('/users', methods=['POST'])
def create_user():
data = request.get_json()
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
(data['name'], data['email'])
)
user_id = cur.fetchone()[0]
return jsonify({'id': user_id, 'message': 'User created'})
if __name__ == '__main__':
app.run(debug=True)
Integration with Django
Django uses psycopg2 by default for PostgreSQL. Configuration in settings.py:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'myproject',
'USER': 'postgres',
'PASSWORD': 'password',
'HOST': 'localhost',
'PORT': '5432',
'OPTIONS': {
'options': '-c default_transaction_isolation=serializable'
}
}
}
# Additional psycopg2 settings
# (requires "import psycopg2.extensions" at the top of settings.py)
DATABASES['default']['OPTIONS']['isolation_level'] = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
Using raw SQL in Django:
from django.db import connection
def get_user_statistics():
with connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*) as total_users,
AVG(age) as avg_age,
MAX(created_at) as last_registration
FROM users
""")
row = cursor.fetchone()
return {
'total_users': row[0],
'avg_age': row[1],
'last_registration': row[2]
}
Integration with FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import psycopg2
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
app = FastAPI()
DATABASE_URL = "postgresql://user:password@localhost/dbname"
@contextmanager
def get_db():
conn = psycopg2.connect(DATABASE_URL)
try:
yield conn
finally:
conn.close()
class UserCreate(BaseModel):
name: str
email: str
age: int
class UserResponse(BaseModel):
id: int
name: str
email: str
age: int
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
with get_db() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(
"INSERT INTO users (name, email, age) VALUES (%s, %s, %s) RETURNING *",
(user.name, user.email, user.age)
)
result = cur.fetchone()
return UserResponse(**result)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
with get_db() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
result = cur.fetchone()
if not result:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse(**result)
Comparison with Other PostgreSQL Drivers
| Criterion | Psycopg2 | asyncpg | aiopg | psycopg3 |
|---|---|---|---|---|
| Asynchrony | No | Yes | Yes | Yes |
| Speed | High | Very high | Medium | High |
| Maturity | Very high | High | Medium | Medium |
| ORM Compatibility | Excellent | Limited | Limited | Good |
| Community Size | Very large | Large | Medium | Growing |
| PostgreSQL Type Support | Full | Full | Partial | Full |
| Ease of Use | Very easy | Easy | Easy | Easy |
| Memory Consumption | Moderate | Low | Moderate | Moderate |
| Supported Python Versions | 3.6+ (2.7 only with psycopg2 ≤ 2.8) | 3.6+ | 3.6+ | 3.7+ |
When to Use Each Driver:
Psycopg2 – best choice for:
- Synchronous applications
- Working with Django, Flask
- Maximum compatibility
- Stable production systems
asyncpg – best choice for:
- High‑performance asynchronous applications
- Microservices with heavy load
- When query speed is critical
psycopg3 – best choice for:
- New projects needing async support
- Applications that require ORM compatibility
- When modern Python features are desired
Performance Optimization
Configuring a Connection Pool
from psycopg2 import pool
# Create a threaded connection pool
connection_pool = pool.ThreadedConnectionPool(
minconn=1,
maxconn=20,
host='localhost',
database='mydb',
user='postgres',
password='password'
)
def execute_query(query, params=None):
conn = connection_pool.getconn()
try:
with conn.cursor() as cur:
cur.execute(query, params)
return cur.fetchall()
finally:
connection_pool.putconn(conn)
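Checking connections in and out by hand is easy to get wrong under exceptions; a small context manager keeps the pool discipline in one place (pooled_connection is a hypothetical wrapper that works with any psycopg2 pool):

```python
from contextlib import contextmanager

@contextmanager
def pooled_connection(pool):
    """Borrow a connection, commit on success, roll back on error, always return it."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)

# Usage:
# with pooled_connection(connection_pool) as conn:
#     with conn.cursor() as cur:
#         cur.execute("SELECT 1")
```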
Query Optimization
# Use prepared statements for repeated queries
cur.execute("PREPARE get_user_by_email AS SELECT * FROM users WHERE email = $1")
cur.execute("EXECUTE get_user_by_email (%s)", ("user@example.com",))
# Create indexes without locking writes
# (CONCURRENTLY cannot run inside a transaction – enable conn.autocommit first)
cur.execute("CREATE INDEX CONCURRENTLY idx_users_email ON users(email)")
# Analyze execution plans
cur.execute("EXPLAIN ANALYZE SELECT * FROM users WHERE email = %s", ("user@example.com",))
plan = cur.fetchall()
Optimizing Bulk Operations
from psycopg2.extras import execute_values, execute_batch
# Efficient bulk insert
data = [(f"user_{i}", f"user_{i}@example.com") for i in range(1000)]
# Method 1: execute_values (fastest)
execute_values(
cur,
"INSERT INTO users (name, email) VALUES %s",
data,
template=None,
page_size=100
)
# Method 2: execute_batch (moderate speed)
execute_batch(
cur,
"INSERT INTO users (name, email) VALUES (%s, %s)",
data,
page_size=100
)
# Method 3: COPY (for very large volumes)
from io import StringIO
import csv
data_file = StringIO()
writer = csv.writer(data_file)
writer.writerows(data)
data_file.seek(0)
cur.copy_from(data_file, 'users', columns=('name', 'email'), sep=',')
Testing and Debugging
Setting Up a Test Environment
import pytest
import psycopg2
from psycopg2.extras import DictCursor
@pytest.fixture
def db_connection():
"""Fixture for connecting to the test database"""
conn = psycopg2.connect(
host='localhost',
database='test_db',
user='test_user',
password='test_password'
)
yield conn
conn.close()
@pytest.fixture
def clean_db(db_connection):
"""Fixture for cleaning the DB before each test"""
with db_connection.cursor() as cur:
cur.execute("TRUNCATE users RESTART IDENTITY CASCADE")
db_connection.commit()
yield db_connection
Example Tests
def test_user_creation(clean_db):
"""Test creating a user"""
with clean_db.cursor() as cur:
cur.execute(
"INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
("Test User", "test@example.com")
)
user_id = cur.fetchone()[0]
assert user_id is not None
assert user_id > 0
def test_user_retrieval(clean_db):
"""Test retrieving a user"""
with clean_db.cursor(cursor_factory=DictCursor) as cur:
# Create a user
cur.execute(
"INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
("Test User", "test@example.com")
)
user_id = cur.fetchone()[0]
# Retrieve the user
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cur.fetchone()
assert user['name'] == "Test User"
assert user['email'] == "test@example.com"
Debugging SQL Queries
import logging
# Configure logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def debug_query(cur, query, params=None):
"""Utility to log query execution details"""
logger.debug(f"Executing query: {query}")
if params:
logger.debug(f"Parameters: {params}")
cur.execute(query, params)
logger.debug(f"Rows affected: {cur.rowcount}")
logger.debug(f"Status: {cur.statusmessage}")
return cur.fetchall()
Performance Monitoring
import time
from contextlib import contextmanager
@contextmanager
def measure_query_time(query_name):
"""Context manager to measure query execution time"""
start_time = time.time()
try:
yield
finally:
end_time = time.time()
execution_time = end_time - start_time
print(f"Query '{query_name}' executed in {execution_time:.4f} seconds")
# Usage example
with measure_query_time("get_all_users"):
cur.execute("SELECT * FROM users")
users = cur.fetchall()
Working with Large Data Volumes
Server‑Side Cursors
def process_large_dataset(conn, query, batch_size=1000):
"""Process huge result sets using a server‑side cursor"""
with conn.cursor(name='large_data_cursor') as cur:
cur.execute(query)
while True:
batch = cur.fetchmany(batch_size)
if not batch:
break
for row in batch:
process_row(row)
# Periodic commit to free memory
conn.commit()
def process_row(row):
"""Placeholder for row‑processing logic"""
pass
Using COPY for Massive Loads
import csv
from io import StringIO
def bulk_import_from_csv(conn, table_name, csv_file_path):
"""Bulk import data from a CSV file"""
with open(csv_file_path, 'r', encoding='utf-8') as file:
with conn.cursor() as cur:
cur.copy_expert(
f"COPY {table_name} FROM STDIN WITH CSV HEADER",
file
)
conn.commit()
def bulk_export_to_csv(conn, table_name, csv_file_path):
"""Bulk export data to a CSV file"""
with open(csv_file_path, 'w', encoding='utf-8', newline='') as file:
with conn.cursor() as cur:
cur.copy_expert(
f"COPY {table_name} TO STDOUT WITH CSV HEADER",
file
)
Streaming Data Processing
def stream_process_data(conn, query, processor_func):
"""Stream rows without loading everything into memory"""
with conn.cursor(name='stream_cursor') as cur:
cur.execute(query)
for row in cur:
try:
processor_func(row)
except Exception as e:
logger.error(f"Error processing row {row}: {e}")
continue
Frequently Asked Questions
What is psycopg2 and why should I use it?
Psycopg2 is a popular PostgreSQL adapter for Python that provides reliable and efficient communication between Python applications and a PostgreSQL database. It is essential for executing SQL statements, handling transactions, and working with PostgreSQL’s rich data types.
How should I handle connection errors?
import psycopg2
from psycopg2 import OperationalError, InterfaceError
try:
conn = psycopg2.connect(
host='localhost',
database='mydb',
user='postgres',
password='password',
connect_timeout=10
)
except OperationalError as e:
print(f"Connection error: {e}")
except InterfaceError as e:
print(f"Interface error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Does psycopg2 support transactions?
Yes, psycopg2 fully supports transactions. By default every operation runs inside a transaction that must be confirmed with commit() or cancelled with rollback().
How can I protect my application from SQL injection?
Always use parameterised queries with %s placeholders instead of string formatting:
# CORRECT
cur.execute("SELECT * FROM users WHERE email = %s", (email,))
# WRONG
cur.execute(f"SELECT * FROM users WHERE email = '{email}'")
Can I use psycopg2 with Django?
Yes, psycopg2 is the recommended driver for PostgreSQL in Django. It is automatically used when you set ENGINE: 'django.db.backends.postgresql' in your database settings.
Does psycopg2 provide asynchronous support?
No, psycopg2 is a synchronous driver. For asynchronous applications consider asyncpg or the newer psycopg3, which offers both sync and async APIs.
How does psycopg2 work in multithreaded applications?
Psycopg2 supports multithreading at the connection level. It is best practice to use a connection pool and avoid sharing a single connection across threads:
from psycopg2 import pool
connection_pool = pool.ThreadedConnectionPool(
minconn=1,
maxconn=20,
host='localhost',
database='mydb',
user='postgres',
password='password'
)
How can I optimise performance when handling huge datasets?
Use these techniques:
- Server‑side cursors for streaming large result sets
- execute_values() for fast bulk inserts
- COPY for massive import/export tasks
- Connection pooling to reduce connection overhead
Does psycopg2 support JSON data?
Yes, psycopg2 automatically maps PostgreSQL JSON/JSONB fields to Python dictionaries and lists. You can also use psycopg2.extras.Json to explicitly send JSON data.
What is the proper way to close connections?
Always close connections and cursors after use. Note that "with conn:" only manages the transaction (commit on success, rollback on error) – it does NOT close the connection, so close it explicitly:
conn = psycopg2.connect(dsn)
try:
    with conn:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM users")
            # Cursor is closed and the transaction committed automatically
finally:
    conn.close()  # The connection itself must be closed explicitly
Conclusion
Psycopg2 remains the gold standard for PostgreSQL access in Python thanks to its reliability, performance, and extensive feature set. The library provides everything needed to build robust, scalable applications that work with data efficiently.
Key advantages of psycopg2:
- Stability and mature codebase
- Full PostgreSQL compatibility
- Excellent performance
- Rich support for diverse data types
- Broad ecosystem integration
For new projects you may want to explore psycopg3, which offers a modern API and async support while maintaining compatibility with psycopg2. However, for existing codebases and scenarios demanding maximum stability, psycopg2 remains an excellent choice.