PyMySQL – драйвер для MySQL

онлайн тренажер по питону
Онлайн-тренажер Python для начинающих

Изучайте Python легко и без перегрузки теорией. Решайте практические задачи с автоматической проверкой, получайте подсказки на русском языке и пишите код прямо в браузере — без необходимости что-либо устанавливать.

Начать курс

Введение

MySQL — одна из самых популярных реляционных баз данных, широко используемая в веб-разработке и корпоративных решениях. Для работы с MySQL в Python часто используется драйвер PyMySQL — чисто Python-реализация протокола MySQL. Он прост в использовании, активно поддерживается и совместим с популярными библиотеками, такими как SQLAlchemy и Pandas.

PyMySQL представляет собой драйвер для MySQL, который позволяет подключаться к базе данных, выполнять SQL-запросы, управлять транзакциями и интегрироваться с веб-фреймворками без сложной настройки. В отличие от других драйверов, PyMySQL написан полностью на Python, что делает его кроссплатформенным и простым в установке.

Что такое PyMySQL

PyMySQL является клиентской библиотекой для Python, которая реализует протокол MySQL Client/Server. Она предоставляет интерфейс для подключения к серверу MySQL и выполнения SQL-запросов. Библиотека следует спецификации Python Database API 2.0 (PEP 249), что обеспечивает совместимость с другими Python-приложениями.

Основные характеристики PyMySQL

  • Чистый Python: Не требует внешних зависимостей на C/C++
  • Совместимость: Поддерживает MySQL версий 5.5 и выше
  • Безопасность: Встроенная защита от SQL-инъекций
  • Производительность: Оптимизирован для быстрого выполнения запросов
  • Простота использования: Интуитивно понятный API

Установка и подключение к MySQL

Установка

pip install pymysql

Для дополнительной функциональности также рекомендуется установить:

pip install pymysql[rsa]  # для поддержки RSA-аутентификации

Подключение к базе данных

import pymysql

connection = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test_db',
    port=3306,
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

DictCursor позволяет получать результаты в виде словарей, а не кортежей, что делает работу с данными более удобной.

Параметры подключения

Параметр Описание Тип По умолчанию
host Хост сервера MySQL str 'localhost'
port Порт сервера int 3306
user Имя пользователя str None
password Пароль str None
database Имя базы данных str None
charset Кодировка str 'utf8mb4'
autocommit Автоматический коммит bool False
cursorclass Класс курсора class Cursor

Создание базы данных и таблиц

# Создание базы данных
with connection.cursor() as cursor:
    cursor.execute("CREATE DATABASE IF NOT EXISTS test_db")

# Создание таблицы
with connection.cursor() as cursor:
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE,
            age INT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
connection.commit()

CRUD-операции с PyMySQL

Вставка данных

# Вставка одной записи
with connection.cursor() as cursor:
    sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
    cursor.execute(sql, ("Иван", "ivan@example.com", 25))
    connection.commit()

# Вставка множественных записей
with connection.cursor() as cursor:
    sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
    data = [
        ("Петр", "petr@example.com", 30),
        ("Анна", "anna@example.com", 28),
        ("Мария", "maria@example.com", 35)
    ]
    cursor.executemany(sql, data)
    connection.commit()

Чтение данных

# Получение всех записей
with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM users")
    result = cursor.fetchall()
    for user in result:
        print(user)

# Получение одной записи
with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
    user = cursor.fetchone()
    print(user)

# Получение нескольких записей
with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM users WHERE age > %s", (25,))
    users = cursor.fetchmany(10)  # первые 10 записей
    for user in users:
        print(user)

Обновление данных

with connection.cursor() as cursor:
    sql = "UPDATE users SET email = %s WHERE id = %s"
    cursor.execute(sql, ("newemail@example.com", 1))
    connection.commit()
    print(f"Обновлено записей: {cursor.rowcount}")

Удаление данных

with connection.cursor() as cursor:
    sql = "DELETE FROM users WHERE id = %s"
    cursor.execute(sql, (1,))
    connection.commit()
    print(f"Удалено записей: {cursor.rowcount}")

Работа с курсором и управление транзакциями

Типы курсоров

PyMySQL предоставляет несколько типов курсоров:

  • pymysql.cursors.Cursor - стандартный курсор, возвращает кортежи
  • pymysql.cursors.DictCursor - возвращает словари
  • pymysql.cursors.SSCursor - серверный курсор для больших результатов
  • pymysql.cursors.SSDictCursor - серверный курсор с возвратом словарей

Управление транзакциями

try:
    with connection.cursor() as cursor:
        # Начинаем транзакцию
        cursor.execute("INSERT INTO users (name, email) VALUES (%s, %s)", 
                      ("Тест", "test@example.com"))
        cursor.execute("UPDATE users SET age = %s WHERE id = %s", (30, 1))
        
        # Подтверждаем изменения
        connection.commit()
        print("Транзакция успешно завершена")
        
except Exception as e:
    # Откатываем изменения при ошибке
    connection.rollback()
    print(f"Ошибка: {e}")

Методы и функции PyMySQL

Основные методы объекта Connection

Метод Описание
cursor() Создает новый курсор
commit() Подтверждает текущую транзакцию
rollback() Откатывает текущую транзакцию
close() Закрывает соединение
ping() Проверяет соединение с сервером
select_db(database) Выбирает базу данных
begin() Начинает транзакцию
autocommit(flag) Включает/выключает автокоммит

Основные методы объекта Cursor

Метод Описание
execute(query, args) Выполняет SQL-запрос
executemany(query, args) Выполняет запрос для множественных данных
fetchone() Возвращает одну строку результата
fetchall() Возвращает все строки результата
fetchmany(size) Возвращает указанное количество строк
close() Закрывает курсор
callproc(procname, args) Вызывает хранимую процедуру
nextset() Переходит к следующему набору результатов

Свойства курсора

Свойство Описание
rowcount Количество затронутых строк
lastrowid ID последней вставленной строки
description Описание столбцов результата
connection Объект соединения

Безопасность и защита от SQL-инъекций

PyMySQL поддерживает параметризацию запросов, что является основным способом защиты от SQL-инъекций:

# Правильно - безопасно
cursor.execute("SELECT * FROM users WHERE email = %s", ("user@example.com",))

# Неправильно - уязвимо к SQL-инъекциям
email = "user@example.com"
cursor.execute(f"SELECT * FROM users WHERE email = '{email}'")

Экранирование данных

# Ручное экранирование (не рекомендуется)
escaped_string = pymysql.escape_string("O'Reilly")
print(escaped_string)  # O\'Reilly

# Используйте параметризацию вместо экранирования
cursor.execute("INSERT INTO books (title) VALUES (%s)", ("O'Reilly",))

Работа с датами и временем

from datetime import datetime, date, time

# Вставка текущего времени
now = datetime.now()
cursor.execute("INSERT INTO logs (event_time, message) VALUES (%s, %s)", 
               (now, "Событие произошло"))

# Работа с датой
birth_date = date(1990, 5, 15)
cursor.execute("UPDATE users SET birth_date = %s WHERE id = %s", 
               (birth_date, 1))

# Работа с временем
event_time = time(14, 30, 0)
cursor.execute("INSERT INTO events (time, description) VALUES (%s, %s)", 
               (event_time, "Встреча"))

Обработка результатов запросов

Методы получения данных

# fetchone() - получение одной строки
cursor.execute("SELECT * FROM users LIMIT 1")
user = cursor.fetchone()
print(user)

# fetchall() - получение всех строк
cursor.execute("SELECT * FROM users")
all_users = cursor.fetchall()
for user in all_users:
    print(user)

# fetchmany(n) - получение n строк
cursor.execute("SELECT * FROM users")
some_users = cursor.fetchmany(5)
for user in some_users:
    print(user)

Итерация по результатам

cursor.execute("SELECT * FROM users")
while True:
    row = cursor.fetchone()
    if not row:
        break
    print(row)

Работа с большими результатами

Серверные курсоры

# Для больших результатов используйте серверный курсор
connection = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test_db',
    cursorclass=pymysql.cursors.SSCursor
)

with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM large_table")
    for row in cursor:
        process_row(row)

Работа с JSON данными

import json

# Вставка JSON данных
data = {"name": "Иван", "age": 30, "skills": ["Python", "SQL"]}
cursor.execute("INSERT INTO users (data) VALUES (%s)", (json.dumps(data),))

# Получение JSON данных
cursor.execute("SELECT data FROM users WHERE id = %s", (1,))
result = cursor.fetchone()
if result:
    data = json.loads(result['data'])
    print(data)

Интеграция с популярными библиотеками

Pandas

import pandas as pd
import sqlalchemy

# Создание движка SQLAlchemy
engine = sqlalchemy.create_engine("mysql+pymysql://root:password@localhost/test_db")

# Чтение данных в DataFrame
df = pd.read_sql("SELECT * FROM users", engine)
print(df.head())

# Запись DataFrame в таблицу
df.to_sql("users_copy", con=engine, if_exists="replace", index=False)

SQLAlchemy

from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://root:password@localhost/test_db")

# Выполнение запроса
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users WHERE age > :age"), 
                         {"age": 25})
    for row in result:
        print(row)

Интеграция с веб-фреймворками

Flask

from flask import Flask, jsonify
import pymysql

app = Flask(__name__)

def get_db_connection():
    return pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='test_db',
        cursorclass=pymysql.cursors.DictCursor
    )

@app.route('/users')
def get_users():
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM users")
            users = cursor.fetchall()
            return jsonify(users)
    finally:
        connection.close()

@app.route('/users/<int:user_id>')
def get_user(user_id):
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            user = cursor.fetchone()
            if user:
                return jsonify(user)
            return jsonify({"error": "User not found"}), 404
    finally:
        connection.close()

Django

В settings.py:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'test_db',
        'USER': 'root',
        'PASSWORD': 'password',
        'HOST': 'localhost',
        'PORT': '3306',
        'OPTIONS': {
            'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
            'charset': 'utf8mb4',
        },
    }
}

FastAPI

from fastapi import FastAPI, HTTPException
import pymysql
from typing import List, Optional

app = FastAPI()

def get_db_connection():
    return pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='test_db',
        cursorclass=pymysql.cursors.DictCursor
    )

@app.get("/users")
async def get_users():
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM users")
            users = cursor.fetchall()
            return users
    finally:
        connection.close()

@app.post("/users")
async def create_user(name: str, email: str, age: Optional[int] = None):
    connection = get_db_connection()
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
                (name, email, age)
            )
            connection.commit()
            return {"id": cursor.lastrowid, "name": name, "email": email, "age": age}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    finally:
        connection.close()

Обработка ошибок и исключений

Иерархия исключений PyMySQL

from pymysql.err import (
    MySQLError,
    OperationalError,
    IntegrityError,
    DataError,
    InterfaceError,
    InternalError,
    ProgrammingError,
    NotSupportedError
)

try:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM non_existent_table")
except OperationalError as e:
    print(f"Ошибка операции: {e}")
except ProgrammingError as e:
    print(f"Ошибка в SQL-запросе: {e}")
except IntegrityError as e:
    print(f"Нарушение целостности данных: {e}")
except MySQLError as e:
    print(f"Общая ошибка MySQL: {e}")

Типы ошибок

Исключение Описание
MySQLError Базовое исключение для всех ошибок MySQL
OperationalError Ошибки соединения и операций
IntegrityError Нарушение ограничений целостности
DataError Ошибки в данных
InterfaceError Ошибки интерфейса
InternalError Внутренние ошибки MySQL
ProgrammingError Ошибки программирования
NotSupportedError Неподдерживаемые операции

Пул соединений

Простой пул соединений

import threading
import queue
import pymysql

class ConnectionPool:
    def __init__(self, max_connections=10, **kwargs):
        self.max_connections = max_connections
        self.connection_kwargs = kwargs
        self.pool = queue.Queue(maxsize=max_connections)
        self.lock = threading.Lock()
        
        # Создаем начальные соединения
        for _ in range(max_connections):
            conn = pymysql.connect(**kwargs)
            self.pool.put(conn)
    
    def get_connection(self):
        try:
            return self.pool.get(timeout=10)
        except queue.Empty:
            raise Exception("Нет доступных соединений")
    
    def return_connection(self, conn):
        if conn.open:
            self.pool.put(conn)
    
    def close_all(self):
        while not self.pool.empty():
            conn = self.pool.get()
            conn.close()

# Использование
pool = ConnectionPool(
    max_connections=5,
    host='localhost',
    user='root',
    password='password',
    database='test_db'
)

def worker():
    conn = pool.get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM users")
            result = cursor.fetchall()
            print(result)
    finally:
        pool.return_connection(conn)

Производительность и оптимизация

Советы по оптимизации

  1. Используйте параметризованные запросы
  2. Повторно используйте соединения
  3. Используйте индексы в базе данных
  4. Ограничивайте количество возвращаемых записей
  5. Используйте серверные курсоры для больших результатов

Пример оптимизированного кода

# Неэффективно
for user_id in user_ids:
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    user = cursor.fetchone()
    process_user(user)

# Эффективно
placeholders = ','.join(['%s'] * len(user_ids))
cursor.execute(f"SELECT * FROM users WHERE id IN ({placeholders})", user_ids)
users = cursor.fetchall()
for user in users:
    process_user(user)

Работа с хранимыми процедурами

# Создание хранимой процедуры
cursor.execute("""
    CREATE PROCEDURE GetUsersByAge(IN min_age INT)
    BEGIN
        SELECT * FROM users WHERE age >= min_age;
    END
""")

# Вызов хранимой процедуры

Новости