PSYCOPG2 - PostgreSQL driver


Psycopg2: Complete Guide to Working with PostgreSQL in Python

Introduction

PostgreSQL is a powerful open‑source relational DBMS known for its reliability, scalability, and support for complex data types. The most popular tool for accessing PostgreSQL from Python is Psycopg2, a mature synchronous driver that provides a stable, high‑performance way to connect to the database, execute SQL queries, manage transactions, and integrate with web frameworks.

Psycopg2 is the de facto standard for PostgreSQL in Python projects thanks to its maturity, feature completeness, and active community support.

What Is Psycopg2

Psycopg2 is a Python adapter for PostgreSQL written in C using libpq (the official PostgreSQL client library). The library fully complies with the DB‑API 2.0 specification and offers many PostgreSQL‑specific extensions.

Key Features of Psycopg2:

  • High performance: C implementation and optimized libpq interaction
  • Security: Built‑in protection against SQL injection via parameterised queries
  • Transaction support: Full control with optional automatic management
  • Data‑type compatibility: Automatic conversion between PostgreSQL and Python types
  • Extension support: Works with JSON, UUID, arrays, hstore and other PostgreSQL data types
  • Thread safety: Usable in multithreaded applications

Installation and Setup

Installing the Library

pip install psycopg2-binary

For production environments the full package is recommended:

pip install psycopg2

System Requirements

Before installing, make sure the required system dependencies are present:

Ubuntu/Debian:

sudo apt-get install libpq-dev python3-dev

CentOS/RHEL:

sudo yum install postgresql-devel python3-devel

macOS:

brew install postgresql

Connecting to the Database

Basic Connection

import psycopg2

# Connect to PostgreSQL
conn = psycopg2.connect(
    dbname="mydb",
    user="postgres", 
    password="password",
    host="localhost",
    port="5432"
)

# Create a cursor
cur = conn.cursor()

Connection via DSN String

conn = psycopg2.connect("postgresql://user:password@localhost:5432/dbname")
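The DSN above follows the standard URI form: scheme, user, password, host, port, and database name. As a quick illustration (no database required, hypothetical credentials), its components can be pulled apart with the standard library:

```python
from urllib.parse import urlparse

# Hypothetical DSN used only for illustration
dsn = "postgresql://user:password@localhost:5432/dbname"
parts = urlparse(dsn)

print(parts.scheme)            # postgresql
print(parts.username)          # user
print(parts.hostname)          # localhost
print(parts.port)              # 5432
print(parts.path.lstrip("/"))  # dbname
```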

Connection with Additional Parameters

conn = psycopg2.connect(
    dbname="mydb",
    user="postgres",
    password="password",
    host="localhost",
    port="5432",
    sslmode="require",
    connect_timeout=10,
    application_name="my_app"
)

Using Context Managers

import psycopg2
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    conn = psycopg2.connect(
        dbname="mydb",
        user="postgres",
        password="password",
        host="localhost"
    )
    try:
        yield conn
    finally:
        conn.close()

# Usage
with get_db_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users")
        results = cur.fetchall()

Creating Tables and Indexes

Creating a Table

cur.execute("""
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    age INTEGER CHECK (age > 0),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE
)
""")
conn.commit()

Creating Indexes

cur.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
cur.execute("CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at)")
conn.commit()

Creating Tables with Foreign Keys

cur.execute("""
CREATE TABLE IF NOT EXISTS orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    product_name VARCHAR(200) NOT NULL,
    quantity INTEGER NOT NULL DEFAULT 1,
    price DECIMAL(10,2) NOT NULL,
    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata JSONB
)
""")
conn.commit()

CRUD Operations with Psycopg2

Inserting Records

Single Insert

cur.execute(
    "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
    ("Ivan Ivanov", "ivan@example.com", 30)
)
conn.commit()

Getting the Inserted ID

cur.execute(
    "INSERT INTO users (name, email, age) VALUES (%s, %s, %s) RETURNING id",
    ("Petr Petrov", "petr@example.com", 25)
)
user_id = cur.fetchone()[0]
conn.commit()

Bulk Insert

users_data = [
    ("Anna Smirnova", "anna@example.com", 28),
    ("Maxim Kozlov", "maxim@example.com", 35),
    ("Elena Vasilieva", "elena@example.com", 24)
]

cur.executemany(
    "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
    users_data
)
conn.commit()

Efficient Bulk Insert

from psycopg2.extras import execute_values

execute_values(
    cur,
    "INSERT INTO users (name, email, age) VALUES %s",
    users_data,
    template=None,
    page_size=100
)
conn.commit()
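execute_values is faster than executemany because it folds many rows into a single multi‑row VALUES statement per page, instead of issuing one round trip per row. A simplified, stdlib‑only sketch of that idea (the real implementation also escapes values safely through libpq and splits the data into pages):

```python
def build_multirow_insert(sql_prefix, rows):
    """Fold rows into one multi-row VALUES statement (simplified sketch;
    the real execute_values handles escaping and paging)."""
    placeholders = ", ".join(
        "(" + ", ".join(["%s"] * len(row)) + ")" for row in rows
    )
    params = [value for row in rows for value in row]
    return sql_prefix + " VALUES " + placeholders, params

sql, params = build_multirow_insert(
    "INSERT INTO users (name, email)",
    [("Anna", "anna@example.com"), ("Maxim", "maxim@example.com")],
)
print(sql)
# INSERT INTO users (name, email) VALUES (%s, %s), (%s, %s)
```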

Selecting Data

Basic Select

cur.execute("SELECT id, name, email, age FROM users")
rows = cur.fetchall()
for row in rows:
    print(f"ID: {row[0]}, Name: {row[1]}, Email: {row[2]}, Age: {row[3]}")

Select with Conditions

cur.execute("SELECT * FROM users WHERE age > %s AND is_active = %s", (25, True))
active_users = cur.fetchall()

Select with Sorting and Limit

cur.execute("""
SELECT name, email, age 
FROM users 
WHERE age BETWEEN %s AND %s 
ORDER BY age DESC 
LIMIT %s
""", (20, 50, 10))

Using fetchone() and fetchmany()

# Fetch a single row
cur.execute("SELECT * FROM users WHERE id = %s", (1,))
user = cur.fetchone()

# Fetch multiple rows
cur.execute("SELECT * FROM users")
first_five = cur.fetchmany(5)

# Iterate over the result set
cur.execute("SELECT * FROM users")
for row in cur:
    print(row)

Updating Data

# Update a single row
cur.execute(
    "UPDATE users SET email = %s WHERE id = %s",
    ("new_email@example.com", 1)
)

# Update multiple columns
cur.execute("""
UPDATE users 
SET email = %s, age = %s, is_active = %s 
WHERE id = %s
""", ("updated@example.com", 31, True, 1))

# Bulk update
cur.execute(
    "UPDATE users SET is_active = %s WHERE age < %s",
    (False, 18)
)

conn.commit()

Deleting Data

# Delete by ID
cur.execute("DELETE FROM users WHERE id = %s", (1,))

# Conditional delete
cur.execute("DELETE FROM users WHERE is_active = %s", (False,))

# Delete with RETURNING
cur.execute("DELETE FROM users WHERE age < %s RETURNING id, name", (18,))
deleted_users = cur.fetchall()

conn.commit()

Working with Cursors and Transactions

Transaction Management

Psycopg2 uses transactions by default. Every data‑modifying operation must be confirmed with commit():

try:
    cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                ("Test", "test@example.com"))
    cur.execute("UPDATE users SET age = %s WHERE email = %s", 
                (25, "test@example.com"))
    conn.commit()
    print("Transaction completed successfully")
except Exception as e:
    conn.rollback()
    print(f"Error: {e}")

Automatic Transaction Management

conn.autocommit = True  # Enable autocommit
cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
            ("Auto", "auto@example.com"))
# Commit happens automatically

Using with for Transactions

try:
    with conn:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                       ("Context", "context@example.com"))
            cur.execute("UPDATE users SET age = %s WHERE email = %s", 
                       (30, "context@example.com"))
            # Automatic commit on block exit
except Exception as e:
    # Automatic rollback on error
    print(f"Transaction error: {e}")

Savepoints

try:
    with conn:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                       ("User1", "user1@example.com"))
            
            # Create a savepoint
            cur.execute("SAVEPOINT sp1")
            
            try:
                cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                           ("User2", "user2@example.com"))
                cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                           ("Duplicate", "user1@example.com"))  # Duplicate error
            except Exception:
                # Roll back to savepoint
                cur.execute("ROLLBACK TO SAVEPOINT sp1")
                print("Rolled back to savepoint")
            
            cur.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                       ("User3", "user3@example.com"))
except Exception as e:
    print(f"Error: {e}")

Reading Query Results

Data Retrieval Methods

# Execute a query
cur.execute("SELECT id, name, email FROM users LIMIT 10")

# fetchone() – get a single row
first_row = cur.fetchone()
print(first_row)  # (1, 'Ivan', 'ivan@example.com')

# fetchmany(n) – get n rows
cur.execute("SELECT id, name, email FROM users")
five_rows = cur.fetchmany(5)
for row in five_rows:
    print(row)

# fetchall() – get all rows
cur.execute("SELECT id, name, email FROM users")
all_rows = cur.fetchall()
print(f"Total rows: {len(all_rows)}")

Iterating Over a Cursor

cur.execute("SELECT id, name, email FROM users")
for row in cur:
    user_id, name, email = row  # avoid shadowing the built‑in id()
    print(f"ID: {user_id}, Name: {name}, Email: {email}")

Working with Dictionary Cursors (DictCursor)

from psycopg2.extras import DictCursor

# Create a cursor that allows column name access
cur = conn.cursor(cursor_factory=DictCursor)
cur.execute("SELECT id, name, email FROM users LIMIT 1")
row = cur.fetchone()

print(row['name'])   # Access by column name
print(row['email'])  # Access by column name
print(row[0])        # Index access also works

Server‑Side Cursors for Large Datasets

# Create a server‑side cursor for processing huge result sets
cur = conn.cursor(name='large_data_cursor')
cur.execute("SELECT * FROM large_table")

# Process data in batches
while True:
    rows = cur.fetchmany(1000)
    if not rows:
        break
    
    for row in rows:
        # Process each row
        process_row(row)

cur.close()

Security and SQL Injection Protection

Correct Use of Parameters

# CORRECT – use parameterised queries
email = "user@example.com"
cur.execute("SELECT * FROM users WHERE email = %s", (email,))

# CORRECT – multiple parameters
cur.execute("SELECT * FROM users WHERE age > %s AND email = %s", (25, email))

# CORRECT – named parameters
cur.execute("SELECT * FROM users WHERE age > %(min_age)s AND email = %(email)s", 
            {'min_age': 25, 'email': email})

What NOT to Do

# WRONG – vulnerable to SQL injection
email = "user@example.com"
cur.execute(f"SELECT * FROM users WHERE email = '{email}'")

# WRONG – using .format()
cur.execute("SELECT * FROM users WHERE email = '{}'".format(email))

# WRONG – using % for strings
cur.execute("SELECT * FROM users WHERE email = '%s'" % email)
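To see why string interpolation is dangerous, consider what the final SQL looks like when an attacker controls the input. This is pure string manipulation, no database needed:

```python
# Attacker-controlled input
email = "x' OR '1'='1"

# What f-string interpolation would actually send to the server:
query = f"SELECT * FROM users WHERE email = '{email}'"
print(query)
# SELECT * FROM users WHERE email = 'x' OR '1'='1'
# The WHERE clause is now always true: the query returns every row.
```

With a parameterised query (`%s` plus a tuple), the driver sends the value separately from the SQL text, so the quote characters never become part of the statement.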

Escaping Identifiers

from psycopg2.sql import SQL, Identifier

# Safe dynamic query construction
table_name = "users"
column_name = "email"

query = SQL("SELECT * FROM {} WHERE {} = %s").format(
    Identifier(table_name),
    Identifier(column_name)
)

cur.execute(query, ("user@example.com",))

Composing Complex Queries

from psycopg2.sql import SQL, Identifier

# Build a complex SELECT
def build_select_query(table, columns, where_conditions):
    query = SQL("SELECT {} FROM {}").format(
        SQL(', ').join(map(Identifier, columns)),
        Identifier(table)
    )
    
    if where_conditions:
        where_clause = SQL(" AND ").join(
            SQL("{} = %s").format(Identifier(col)) 
            for col in where_conditions.keys()
        )
        query = SQL("{} WHERE {}").format(query, where_clause)
    
    return query

# Usage
query = build_select_query(
    'users', 
    ['id', 'name', 'email'], 
    {'age': 25, 'is_active': True}
)
cur.execute(query, (25, True))

Working with PostgreSQL Data Types

Automatic Type Conversion

Psycopg2 automatically converts data types between PostgreSQL and Python:

| PostgreSQL Type | Python Type | Example |
| --- | --- | --- |
| INTEGER, SERIAL | int | 42 |
| VARCHAR, TEXT | str | "string" |
| BOOLEAN | bool | True |
| DECIMAL, NUMERIC | decimal.Decimal | Decimal('123.45') |
| TIMESTAMP | datetime.datetime | datetime.now() |
| DATE | datetime.date | date.today() |
| TIME | datetime.time | time(14, 30) |
| JSON, JSONB | dict, list | {"key": "value"} |
| UUID | uuid.UUID | uuid.uuid4() |
| BYTEA | bytes | b"binary data" |
| ARRAY | list | [1, 2, 3] |
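The Python‑side values in this table come straight from the standard library. A small sketch (no live connection needed) of the objects you would pass to, or get back from, parameterised queries:

```python
from decimal import Decimal
from datetime import date, datetime, time
import uuid

# The Python counterparts of common PostgreSQL column types
row = {
    "age": 42,                                      # INTEGER
    "name": "string",                               # VARCHAR / TEXT
    "is_active": True,                              # BOOLEAN
    "price": Decimal("123.45"),                     # NUMERIC
    "created_at": datetime(2024, 12, 25, 14, 30),   # TIMESTAMP
    "event_date": date(2024, 12, 25),               # DATE
    "event_time": time(14, 30),                     # TIME
    "metadata": {"key": "value"},                   # JSONB (wrap with Json() on insert)
    "token": uuid.UUID("12345678-1234-5678-1234-567812345678"),  # UUID
    "payload": b"binary data",                      # BYTEA
    "tags": [1, 2, 3],                              # ARRAY
}

print(type(row["price"]).__name__)  # Decimal
```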

Working with JSON Data

from psycopg2.extras import Json

# Create a table with JSON
cur.execute("""
CREATE TABLE IF NOT EXISTS logs (
    id SERIAL PRIMARY KEY,
    message TEXT,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

# Insert JSON data
metadata = {
    "ip": "192.168.1.1",
    "user_agent": "Mozilla/5.0...",
    "session_id": "abc123"
}

cur.execute(
    "INSERT INTO logs (message, metadata) VALUES (%s, %s)",
    ("User logged in", Json(metadata))
)

# Query JSON data
cur.execute("SELECT metadata FROM logs WHERE id = %s", (1,))
row = cur.fetchone()
print(row[0]['ip'])  # Automatically converted to dict

Working with PostgreSQL Arrays

# Create a table with an array column
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
    id SERIAL PRIMARY KEY,
    name TEXT,
    tags TEXT[],
    prices DECIMAL[]
)
""")

# Insert arrays
cur.execute(
    "INSERT INTO products (name, tags, prices) VALUES (%s, %s, %s)",
    ("Laptop", ["electronics", "computer"], [50000.00, 45000.00])
)

# Search within an array
cur.execute("SELECT * FROM products WHERE %s = ANY(tags)", ("electronics",))

Working with UUIDs

import uuid

# Create a table with UUID (gen_random_uuid() requires PostgreSQL 13+
# or the pgcrypto extension on older versions)
cur.execute("""
CREATE TABLE IF NOT EXISTS sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id INTEGER,
    token UUID NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

# Insert a UUID
session_token = uuid.uuid4()
cur.execute(
    "INSERT INTO sessions (user_id, token) VALUES (%s, %s)",
    (1, session_token)
)

# Query by UUID
cur.execute("SELECT * FROM sessions WHERE token = %s", (session_token,))

Working with Temporal Types

from datetime import datetime, date, time

# Create a table with temporal columns
cur.execute("""
CREATE TABLE IF NOT EXISTS events (
    id SERIAL PRIMARY KEY,
    title TEXT,
    event_date DATE,
    event_time TIME,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
)
""")

# Insert temporal data
cur.execute(
    "INSERT INTO events (title, event_date, event_time) VALUES (%s, %s, %s)",
    ("Meeting", date(2024, 12, 25), time(14, 30))
)

# Work with time zones
cur.execute("SELECT created_at AT TIME ZONE 'UTC' FROM events")

Advanced Psycopg2 Features

Working with Stored Procedures

# Create a stored procedure
cur.execute("""
CREATE OR REPLACE FUNCTION get_user_stats(user_id INTEGER)
RETURNS TABLE(total_orders INTEGER, total_amount DECIMAL) AS $$
BEGIN
    RETURN QUERY
    SELECT COUNT(*)::INTEGER, COALESCE(SUM(price * quantity), 0)::DECIMAL
    FROM orders
    WHERE orders.user_id = get_user_stats.user_id;
END;
$$ LANGUAGE plpgsql;
""")

# Call the stored procedure
cur.callproc('get_user_stats', (1,))
result = cur.fetchone()
print(f"Orders: {result[0]}, Amount: {result[1]}")

Working with Large Objects (LOBs)

# Create a large object
large_obj = conn.lobject(mode='w')
large_obj.write(b"Large amount of data...")
loid = large_obj.oid
large_obj.close()

# Store a reference to the LOB
cur.execute("INSERT INTO files (name, data_oid) VALUES (%s, %s)", 
            ("document.pdf", loid))

# Read the large object
cur.execute("SELECT data_oid FROM files WHERE name = %s", ("document.pdf",))
oid = cur.fetchone()[0]
large_obj = conn.lobject(oid, mode='r')
data = large_obj.read()
large_obj.close()

Asynchronous Notifications

# Subscribe to notifications
cur.execute("LISTEN user_updates")
conn.commit()

# Send a notification (from another connection)
cur.execute("NOTIFY user_updates, 'New user created'")
conn.commit()

# Check for notifications
conn.poll()
while conn.notifies:
    notify = conn.notifies.pop(0)
    print(f"Received notification: {notify.channel} - {notify.payload}")

Copying Data

# COPY TO – export data
# (COPY ... TO STDOUT cannot be run through execute(); use copy_expert)
with open('users_export.csv', 'w') as f:
    cur.copy_expert("COPY users TO STDOUT WITH CSV HEADER", f)

# COPY FROM – import data (copy_from does not parse quoted CSV;
# use copy_expert with "WITH CSV" when fields may contain commas)
with open('users_import.csv', 'r') as f:
    cur.copy_from(f, 'users', columns=('name', 'email', 'age'), sep=',')

Comprehensive List of Psycopg2 Methods and Functions

Main Connection Methods

| Method / Function | Description | Example Usage |
| --- | --- | --- |
| psycopg2.connect() | Create a connection to the database | conn = psycopg2.connect(dsn) |
| conn.cursor() | Create a cursor for executing queries | cur = conn.cursor() |
| conn.close() | Close the connection | conn.close() |
| conn.commit() | Commit the current transaction | conn.commit() |
| conn.rollback() | Roll back the current transaction | conn.rollback() |

Cursor Methods

| Method | Description | Example Usage |
| --- | --- | --- |
| cur.execute(sql, vars) | Execute an SQL statement | cur.execute("SELECT * FROM users WHERE id = %s", (1,)) |
| cur.executemany(sql, vars_list) | Execute a statement for a sequence of parameters | cur.executemany("INSERT INTO users (name) VALUES (%s)", [("John",), ("Jane",)]) |
| cur.fetchone() | Fetch a single row | row = cur.fetchone() |
| cur.fetchmany(size) | Fetch a number of rows | rows = cur.fetchmany(10) |
| cur.fetchall() | Fetch all remaining rows | all_rows = cur.fetchall() |
| cur.callproc(procname, vars) | Call a stored procedure | cur.callproc('my_proc', (param1, param2)) |
| cur.close() | Close the cursor | cur.close() |
| cur.copy_from(file, table) | Copy data from a file into a table | cur.copy_from(f, 'users', columns=('name', 'email')) |
| cur.copy_to(file, table) | Copy data from a table into a file | cur.copy_to(f, 'users') |

Cursor Attributes

| Attribute | Description | Example Usage |
| --- | --- | --- |
| cur.description | Metadata about result columns | columns = [desc[0] for desc in cur.description] |
| cur.rowcount | Number of rows affected | print(f"Rows updated: {cur.rowcount}") |
| cur.rownumber | Current row number in the result set | print(f"Current row: {cur.rownumber}") |
| cur.query | Last executed query | print(cur.query) |
| cur.statusmessage | Message describing the command status | print(cur.statusmessage) |

Transaction Management Methods

| Method | Description | Example Usage |
| --- | --- | --- |
| conn.autocommit | Enable/disable autocommit mode | conn.autocommit = True |
| conn.get_transaction_status() | Get the current transaction status | status = conn.get_transaction_status() |
| conn.set_isolation_level(level) | Set the transaction isolation level | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) |
| conn.set_session(readonly=True) | Configure session parameters | conn.set_session(readonly=True, deferrable=True) |

Additional Modules and Functions

| Module / Function | Description | Example Usage |
| --- | --- | --- |
| psycopg2.extras.DictCursor | Cursor that allows column access by name | cur = conn.cursor(cursor_factory=DictCursor) |
| psycopg2.extras.RealDictCursor | Cursor that returns real dictionaries | cur = conn.cursor(cursor_factory=RealDictCursor) |
| psycopg2.extras.execute_values() | Efficient bulk insertion of many rows | execute_values(cur, "INSERT INTO t VALUES %s", data) |
| psycopg2.extras.execute_batch() | Batch execution of statements | execute_batch(cur, "INSERT INTO t VALUES (%s, %s)", data) |
| psycopg2.extras.Json() | Wrapper for JSON data | cur.execute("INSERT INTO t (data) VALUES (%s)", (Json(data),)) |
| psycopg2.sql.SQL() | Compose SQL statements safely | query = SQL("SELECT * FROM {}").format(Identifier(table)) |
| psycopg2.sql.Identifier() | Safely escape identifiers | Identifier('table_name') |
| psycopg2.sql.Literal() | Safely escape literals | Literal('string_value') |

Error Handling

| Exception | Description | When It Occurs |
| --- | --- | --- |
| psycopg2.Error | Base class for all database errors | Parent of all other exceptions |
| psycopg2.Warning | Warning messages from the server | Server‑side notices |
| psycopg2.InterfaceError | Errors related to the DB‑API interface | Module‑level problems |
| psycopg2.DatabaseError | General database errors | Issues directly tied to the database |
| psycopg2.DataError | Invalid data supplied | Incorrect data types or values |
| psycopg2.OperationalError | Operational problems | Connection failures, transaction issues |
| psycopg2.IntegrityError | Integrity constraint violations | Unique, foreign‑key, check constraints |
| psycopg2.InternalError | Internal PostgreSQL errors | Server‑side internal failures |
| psycopg2.ProgrammingError | Programming mistakes | SQL syntax errors, misuse of the API |
| psycopg2.NotSupportedError | Unsupported operation | Attempting features not supported by the driver |
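Psycopg2 chooses which of these exceptions to raise from the server's SQLSTATE error code (exposed on the exception as e.pgcode); the two‑character class prefix determines the category. A simplified sketch of that class‑to‑exception mapping, with exception names shown as strings for illustration (psycopg2's real table is more complete):

```python
# Map the leading two characters of a SQLSTATE code to the DB-API
# exception psycopg2 would typically raise (simplified illustration)
SQLSTATE_CLASS_TO_EXCEPTION = {
    "08": "OperationalError",  # connection exceptions
    "22": "DataError",         # data exceptions
    "23": "IntegrityError",    # integrity constraint violations
    "40": "OperationalError",  # transaction rollback (e.g. serialization failure)
    "42": "ProgrammingError",  # syntax errors / access rule violations
}

def exception_for(pgcode):
    """Return the exception category for a SQLSTATE code."""
    return SQLSTATE_CLASS_TO_EXCEPTION.get(pgcode[:2], "DatabaseError")

print(exception_for("23505"))  # IntegrityError (unique_violation)
print(exception_for("42601"))  # ProgrammingError (syntax_error)
```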

Integration with Popular Frameworks

Integration with Pandas

import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Read data from PostgreSQL into a DataFrame
conn = psycopg2.connect(dsn)
df = pd.read_sql("SELECT * FROM users WHERE age > 25", conn)

# Writing a DataFrame requires a SQLAlchemy engine –
# to_sql() does not accept a raw psycopg2 connection
engine = create_engine("postgresql+psycopg2://user:password@localhost/dbname")
df.to_sql('users_backup', engine, if_exists='replace', index=False)

# Use parameters in queries
query = "SELECT * FROM users WHERE age BETWEEN %s AND %s"
df = pd.read_sql(query, conn, params=(25, 50))

Integration with SQLAlchemy

from sqlalchemy import create_engine, text
import pandas as pd

# Create a SQLAlchemy engine using psycopg2
engine = create_engine('postgresql+psycopg2://user:password@localhost/dbname')

# Use with Pandas
df = pd.read_sql("SELECT * FROM users", engine)
df.to_sql('users_copy', engine, if_exists='replace')

# Direct usage of the connection (SQLAlchemy 2.0 requires text() for raw SQL)
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM users"))
    count = result.scalar()

Integration with Flask

from flask import Flask, jsonify, request
import psycopg2
from psycopg2.extras import RealDictCursor

app = Flask(__name__)

# Database configuration
DATABASE_CONFIG = {
    'host': 'localhost',
    'database': 'myapp',
    'user': 'postgres',
    'password': 'password'
}

def get_db_connection():
    # Note: psycopg2's "with conn:" commits or rolls back the transaction
    # but does NOT close the connection – close it explicitly or use a pool
    return psycopg2.connect(**DATABASE_CONFIG)

@app.route('/users', methods=['GET'])
def get_users():
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT id, name, email FROM users")
            users = cur.fetchall()
            return jsonify([dict(user) for user in users])

@app.route('/users', methods=['POST'])
def create_user():
    data = request.get_json()
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
                (data['name'], data['email'])
            )
            user_id = cur.fetchone()[0]
            return jsonify({'id': user_id, 'message': 'User created'})

if __name__ == '__main__':
    app.run(debug=True)

Integration with Django

Django uses psycopg2 by default for PostgreSQL. Configuration in settings.py:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'myproject',
        'USER': 'postgres',
        'PASSWORD': 'password',
        'HOST': 'localhost',
        'PORT': '5432',
        'OPTIONS': {
            'options': '-c default_transaction_isolation=serializable'
        }
    }
}

# Additional psycopg2 settings
import psycopg2.extensions

DATABASES['default']['OPTIONS']['isolation_level'] = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE

Using raw SQL in Django:

from django.db import connection

def get_user_statistics():
    with connection.cursor() as cursor:
        cursor.execute("""
            SELECT COUNT(*) as total_users,
                   AVG(age) as avg_age,
                   MAX(created_at) as last_registration
            FROM users
        """)
        row = cursor.fetchone()
        return {
            'total_users': row[0],
            'avg_age': row[1],
            'last_registration': row[2]
        }

Integration with FastAPI

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import psycopg2
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager

app = FastAPI()

DATABASE_URL = "postgresql://user:password@localhost/dbname"

@contextmanager
def get_db():
    conn = psycopg2.connect(DATABASE_URL)
    try:
        yield conn
        conn.commit()  # without this, inserts would be lost on close
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

class UserCreate(BaseModel):
    name: str
    email: str
    age: int

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    age: int

@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
    with get_db() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(
                "INSERT INTO users (name, email, age) VALUES (%s, %s, %s) RETURNING *",
                (user.name, user.email, user.age)
            )
            result = cur.fetchone()
            return UserResponse(**result)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    with get_db() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            result = cur.fetchone()
            if not result:
                raise HTTPException(status_code=404, detail="User not found")
            return UserResponse(**result)

Comparison with Other PostgreSQL Drivers

| Criterion | Psycopg2 | asyncpg | aiopg | psycopg3 |
| --- | --- | --- | --- | --- |
| Asynchrony | No | Yes | Yes | Yes |
| Speed | High | Very high | Medium | High |
| Maturity | Very high | High | Medium | Medium |
| ORM Compatibility | Excellent | Limited | Limited | Good |
| Community Size | Very large | Large | Medium | Growing |
| PostgreSQL Type Support | Full | Full | Partial | Full |
| Ease of Use | Very easy | Easy | Easy | Easy |
| Memory Consumption | Moderate | Low | Moderate | Moderate |
| Supported Python Versions | 2.7, 3.6+ | 3.6+ | 3.6+ | 3.7+ |

When to Use Each Driver:

Psycopg2 – best choice for:

  • Synchronous applications
  • Working with Django, Flask
  • Maximum compatibility
  • Stable production systems

asyncpg – best choice for:

  • High‑performance asynchronous applications
  • Microservices with heavy load
  • When query speed is critical

psycopg3 – best choice for:

  • New projects needing async support
  • Applications that require ORM compatibility
  • When modern Python features are desired

Performance Optimization

Configuring a Connection Pool

from psycopg2 import pool

# Create a threaded connection pool
connection_pool = pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host='localhost',
    database='mydb',
    user='postgres',
    password='password'
)

def execute_query(query, params=None):
    conn = connection_pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            return cur.fetchall()
    finally:
        connection_pool.putconn(conn)

Query Optimization

# Use prepared statements for repeated queries
cur.execute("PREPARE get_user_by_email AS SELECT * FROM users WHERE email = $1")
cur.execute("EXECUTE get_user_by_email (%s)", ("user@example.com",))

# Create indexes (CREATE INDEX CONCURRENTLY cannot run inside a
# transaction block, so enable autocommit first)
conn.autocommit = True
cur.execute("CREATE INDEX CONCURRENTLY idx_users_email ON users(email)")

# Analyze execution plans
cur.execute("EXPLAIN ANALYZE SELECT * FROM users WHERE email = %s", ("user@example.com",))
plan = cur.fetchall()

Optimizing Bulk Operations

from psycopg2.extras import execute_values, execute_batch

# Efficient bulk insert
data = [(f"user_{i}", f"user_{i}@example.com") for i in range(1000)]

# Method 1: execute_values (fastest)
execute_values(
    cur, 
    "INSERT INTO users (name, email) VALUES %s",
    data,
    template=None,
    page_size=100
)

# Method 2: execute_batch (moderate speed)
execute_batch(
    cur,
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    data,
    page_size=100
)

# Method 3: COPY (for very large volumes)
from io import StringIO
import csv

data_file = StringIO()
writer = csv.writer(data_file, lineterminator='\n')  # avoid \r in the last column
writer.writerows(data)
data_file.seek(0)

cur.copy_from(data_file, 'users', columns=('name', 'email'), sep=',')

Testing and Debugging

Setting Up a Test Environment

import pytest
import psycopg2
from psycopg2.extras import DictCursor

@pytest.fixture
def db_connection():
    """Fixture for connecting to the test database"""
    conn = psycopg2.connect(
        host='localhost',
        database='test_db',
        user='test_user',
        password='test_password'
    )
    yield conn
    conn.close()

@pytest.fixture
def clean_db(db_connection):
    """Fixture for cleaning the DB before each test"""
    with db_connection.cursor() as cur:
        cur.execute("TRUNCATE users RESTART IDENTITY CASCADE")
    db_connection.commit()
    yield db_connection

Example Tests

def test_user_creation(clean_db):
    """Test creating a user"""
    with clean_db.cursor() as cur:
        cur.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
            ("Test User", "test@example.com")
        )
        user_id = cur.fetchone()[0]
        assert user_id is not None
        assert user_id > 0

def test_user_retrieval(clean_db):
    """Test retrieving a user"""
    with clean_db.cursor(cursor_factory=DictCursor) as cur:
        # Create a user
        cur.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
            ("Test User", "test@example.com")
        )
        user_id = cur.fetchone()[0]
        
        # Retrieve the user
        cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        user = cur.fetchone()
        
        assert user['name'] == "Test User"
        assert user['email'] == "test@example.com"

Debugging SQL Queries

import logging

# Configure logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def debug_query(cur, query, params=None):
    """Utility to log query execution details"""
    logger.debug(f"Executing query: {query}")
    if params:
        logger.debug(f"Parameters: {params}")
    
    cur.execute(query, params)
    logger.debug(f"Rows affected: {cur.rowcount}")
    logger.debug(f"Status: {cur.statusmessage}")
    
    # Only statements that return rows have a result set
    return cur.fetchall() if cur.description else None

Performance Monitoring

import time
from contextlib import contextmanager

@contextmanager
def measure_query_time(query_name):
    """Context manager to measure query execution time"""
    start_time = time.time()
    try:
        yield
    finally:
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Query '{query_name}' executed in {execution_time:.4f} seconds")

# Usage example
with measure_query_time("get_all_users"):
    cur.execute("SELECT * FROM users")
    users = cur.fetchall()

Working with Large Data Volumes

Server‑Side Cursors

def process_large_dataset(conn, query, batch_size=1000):
    """Process huge result sets using a server‑side cursor"""
    # withhold=True keeps the named cursor open across commits;
    # without it the commit below would close the cursor mid‑iteration
    with conn.cursor(name='large_data_cursor', withhold=True) as cur:
        cur.execute(query)
        
        while True:
            batch = cur.fetchmany(batch_size)
            if not batch:
                break
            
            for row in batch:
                process_row(row)
                
            # Periodic commit – safe thanks to withhold=True
            conn.commit()

def process_row(row):
    """Placeholder for row‑processing logic"""
    pass

Using COPY for Massive Loads

import csv
from io import StringIO

def bulk_import_from_csv(conn, table_name, csv_file_path):
    """Bulk import data from a CSV file"""
    with open(csv_file_path, 'r', encoding='utf-8') as file:
        with conn.cursor() as cur:
            # table_name is interpolated directly – validate it, or build the
            # statement with psycopg2.sql.Identifier to prevent SQL injection
            cur.copy_expert(
                f"COPY {table_name} FROM STDIN WITH CSV HEADER",
                file
            )
    conn.commit()

def bulk_export_to_csv(conn, table_name, csv_file_path):
    """Bulk export data to a CSV file"""
    with open(csv_file_path, 'w', encoding='utf-8', newline='') as file:
        with conn.cursor() as cur:
            cur.copy_expert(
                f"COPY {table_name} TO STDOUT WITH CSV HEADER",
                file
            )

Streaming Data Processing

def stream_process_data(conn, query, processor_func):
    """Stream rows without loading everything into memory"""
    with conn.cursor(name='stream_cursor') as cur:
        cur.execute(query)
        
        for row in cur:
            try:
                processor_func(row)
            except Exception as e:
                logger.error(f"Error processing row {row}: {e}")
                continue

Frequently Asked Questions

What is psycopg2 and why should I use it?

Psycopg2 is a popular PostgreSQL adapter for Python that provides reliable and efficient communication between Python applications and a PostgreSQL database. It is essential for executing SQL statements, handling transactions, and working with PostgreSQL’s rich data types.

How should I handle connection errors?

import psycopg2
from psycopg2 import OperationalError, InterfaceError

try:
    conn = psycopg2.connect(
        host='localhost',
        database='mydb',
        user='postgres',
        password='password',
        connect_timeout=10
    )
except OperationalError as e:
    print(f"Connection error: {e}")
except InterfaceError as e:
    print(f"Interface error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Does psycopg2 support transactions?

Yes, psycopg2 fully supports transactions. By default every statement runs inside a transaction that must be committed with commit() or rolled back with rollback().

How can I protect my application from SQL injection?

Always use parameterised queries with %s placeholders instead of string formatting:

# CORRECT
cur.execute("SELECT * FROM users WHERE email = %s", (email,))

# WRONG
cur.execute(f"SELECT * FROM users WHERE email = '{email}'")

Can I use psycopg2 with Django?

Yes, psycopg2 is the recommended driver for PostgreSQL in Django. It is automatically used when you set ENGINE: 'django.db.backends.postgresql' in your database settings.
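A minimal sketch of the corresponding settings (the database name and credentials here are placeholders):

```python
# settings.py (fragment) -- placeholder credentials
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',  # uses psycopg2 under the hood
        'NAME': 'mydb',
        'USER': 'postgres',
        'PASSWORD': 'password',
        'HOST': 'localhost',
        'PORT': '5432',
    }
}
```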

Does psycopg2 provide asynchronous support?

No, psycopg2 is a synchronous driver. For asynchronous applications consider asyncpg or the newer psycopg3, which offers both sync and async APIs.

How does psycopg2 work in multithreaded applications?

Psycopg2 supports multithreading at the connection level. It is best practice to use a connection pool and avoid sharing a single connection across threads:

from psycopg2 import pool

connection_pool = pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host='localhost',
    database='mydb',
    user='postgres',
    password='password'
)
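The borrow/return cycle with such a pool can be sketched as follows (the get_user function and users table are illustrative):

```python
def get_user(pool, user_id):
    """Borrow a connection from the pool, use it, and always return it."""
    conn = pool.getconn()  # take a free connection from the pool
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cur.fetchone()
    finally:
        pool.putconn(conn)  # return the connection so other threads can reuse it

# On application shutdown: connection_pool.closeall()
```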

How can I optimise performance when handling huge datasets?

Use these techniques:

  • Server‑side cursors for streaming large result sets
  • execute_values() for fast bulk inserts
  • COPY for massive import/export tasks
  • Connection pooling to reduce connection overhead

Does psycopg2 support JSON data?

Yes, psycopg2 automatically maps PostgreSQL JSON/JSONB fields to Python dictionaries and lists. You can also use psycopg2.extras.Json to explicitly send JSON data.

What is the proper way to close connections?

Always close connections and cursors after use. Context managers help, but note a psycopg2 quirk: a connection's with block commits (or rolls back) the transaction, it does not close the connection itself, so close() must still be called explicitly:

conn = psycopg2.connect(dsn)
try:
    with conn:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM users")
            # Cursor is closed and the transaction committed automatically
finally:
    conn.close()  # The connection itself must be closed explicitly

Conclusion

Psycopg2 remains the gold standard for PostgreSQL access in Python thanks to its reliability, performance, and extensive feature set. The library provides everything needed to build robust, scalable applications that work with data efficiently.

Key advantages of psycopg2:

  • Stability and mature codebase
  • Full PostgreSQL compatibility
  • Excellent performance
  • Rich support for diverse data types
  • Broad ecosystem integration

For new projects you may want to explore psycopg3, which offers a modern API and async support while maintaining compatibility with psycopg2. However, for existing codebases and scenarios demanding maximum stability, psycopg2 remains an excellent choice.
