Введение
Apache Cassandra — это высокопроизводительная, масштабируемая и отказоустойчивая NoSQL база данных, предназначенная для обработки огромных объёмов данных с минимальными задержками. Она используется такими компаниями, как Netflix, Apple, eBay и Instagram для обработки петабайтов данных в реальном времени.
Для Python существует официальный драйвер — cassandra-driver, предоставляющий полный доступ к возможностям Cassandra через Python-программы. Он поддерживает синхронные и асинхронные запросы, подготовленные выражения, работу с UUID, TTL и многое другое.
Что такое cassandra-driver
Cassandra-driver — это официальный Python-клиент для Apache Cassandra, разработанный командой DataStax. Это высокопроизводительный драйвер, который обеспечивает полную интеграцию между Python-приложениями и кластером Cassandra.
Основные возможности драйвера
- Синхронные и асинхронные операции
- Подготовленные запросы для повышения производительности
- Автоматическая маршрутизация запросов к оптимальным узлам
- Поддержка всех типов данных Cassandra
- Встроенная обработка ошибок и повторных попыток
- Поддержка аутентификации и SSL-шифрования
- Балансировка нагрузки между узлами кластера
Установка и настройка
Установка драйвера
pip install cassandra-driver
Для работы с некоторыми функциями могут потребоваться дополнительные зависимости:
pip install cassandra-driver[cqlengine] # для ORM
pip install cassandra-driver[graph] # для работы с графами
Подключение к кластеру
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
# Простое подключение
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
# Подключение с аутентификацией
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['127.0.0.1'], auth_provider=auth_provider)
session = cluster.connect()
# Подключение с SSL
from cassandra.cluster import Cluster
from ssl import PROTOCOL_TLSv1_2, CERT_REQUIRED
cluster = Cluster(['127.0.0.1'],
ssl_context={
'ssl_version': PROTOCOL_TLSv1_2,
'cert_reqs': CERT_REQUIRED,
'ca_certs': '/path/to/ca.pem'
})
session = cluster.connect()
Подключение к конкретному keyspace
session.set_keyspace('my_keyspace')
# или
session = cluster.connect('my_keyspace')
Ключевые концепции Cassandra
Архитектура данных
- Keyspace — аналог базы данных в SQL, содержит настройки репликации
- Таблица — содержит строки и колонки, но с иным внутренним устройством
- Partition key — определяет, на каком узле хранится запись
- Clustering key — влияет на порядок хранения внутри партиции
- Primary key — состоит из partition key и clustering key
Особенности модели данных
Cassandra не поддерживает JOIN и агрегаты как в SQL, зато масштабируется горизонтально и обеспечивает высокую доступность данных. Проектирование схемы должно основываться на паттернах доступа к данным.
Создание Keyspace и таблиц
Создание Keyspace
session.execute("""
CREATE KEYSPACE IF NOT EXISTS my_keyspace
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '3'
}
""")
# Для продакшена рекомендуется NetworkTopologyStrategy
session.execute("""
CREATE KEYSPACE IF NOT EXISTS my_keyspace
WITH replication = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 3,
'datacenter2': 2
}
""")
Создание таблиц
session.execute("""
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT,
created_at TIMESTAMP,
age INT,
tags SET<TEXT>,
metadata MAP<TEXT, TEXT>
)
""")
# Таблица с составным ключом
session.execute("""
CREATE TABLE IF NOT EXISTS user_events (
user_id UUID,
event_time TIMESTAMP,
event_type TEXT,
data TEXT,
PRIMARY KEY (user_id, event_time)
) WITH CLUSTERING ORDER BY (event_time DESC)
""")
CRUD-операции в cassandra-driver
Вставка данных
import uuid
from datetime import datetime
# Простая вставка
session.execute(
"INSERT INTO users (id, name, email, created_at, age) VALUES (%s, %s, %s, %s, %s)",
(uuid.uuid4(), "Иван", "ivan@example.com", datetime.now(), 30)
)
# Вставка с TTL
session.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s) USING TTL 3600",
(uuid.uuid4(), "Петр", "petr@example.com")
)
# Условная вставка
session.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s) IF NOT EXISTS",
(uuid.uuid4(), "Анна", "anna@example.com")
)
Чтение данных
# Получение всех записей
rows = session.execute("SELECT * FROM users")
for row in rows:
print(f"ID: {row.id}, Name: {row.name}, Email: {row.email}")
# Получение одной записи
row = session.execute("SELECT * FROM users WHERE id=%s", (user_id,)).one()
if row:
print(f"User: {row.name}")
# Пагинация
from cassandra.query import SimpleStatement
statement = SimpleStatement("SELECT * FROM users", fetch_size=100)
for row in session.execute(statement):
print(row.name)
# Получение с ограничением
rows = session.execute("SELECT * FROM users LIMIT 10")
Обновление данных
# Обновление полей
session.execute(
"UPDATE users SET email=%s, age=%s WHERE id=%s",
("new_email@example.com", 31, user_id)
)
# Условное обновление
session.execute(
"UPDATE users SET email=%s WHERE id=%s IF age=%s",
("updated@example.com", user_id, 30)
)
# Обновление коллекций
session.execute(
"UPDATE users SET tags = tags + %s WHERE id = %s",
({'programming', 'python'}, user_id)
)
Удаление данных
# Удаление записи
session.execute("DELETE FROM users WHERE id=%s", (user_id,))
# Удаление конкретных полей
session.execute("DELETE email FROM users WHERE id=%s", (user_id,))
# Условное удаление
session.execute("DELETE FROM users WHERE id=%s IF age=%s", (user_id, 30))
Работа с асинхронными запросами
Базовые асинхронные операции
# Асинхронное выполнение
future = session.execute_async("SELECT * FROM users")
rows = future.result() # Блокирующий вызов
# Неблокирующая проверка
if future.has_more_pages:
future.start_fetching_next_page()
# Обработка результатов
def handle_success(results):
for row in results:
print(f"User: {row.name}")
def handle_error(exception):
print(f"Ошибка: {exception}")
future = session.execute_async("SELECT * FROM users")
future.add_callbacks(callback=handle_success, errback=handle_error)
Параллельные запросы
import concurrent.futures
def execute_query(query, params):
return session.execute(query, params)
# Параллельное выполнение нескольких запросов
queries = [
("SELECT * FROM users WHERE id=%s", (id1,)),
("SELECT * FROM users WHERE id=%s", (id2,)),
("SELECT * FROM users WHERE id=%s", (id3,))
]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(execute_query, query, params) for query, params in queries]
results = [future.result() for future in concurrent.futures.as_completed(futures)]
Подготовленные запросы и безопасность
Создание и использование подготовленных запросов
# Создание подготовленного запроса
prepared_insert = session.prepare(
"INSERT INTO users (id, name, email, age) VALUES (?, ?, ?, ?)"
)
# Выполнение подготовленного запроса
session.execute(prepared_insert, (uuid.uuid4(), "Мария", "maria@example.com", 25))
# Подготовленный запрос с именованными параметрами
prepared_select = session.prepare(
"SELECT * FROM users WHERE name = :name AND age > :min_age"
)
rows = session.execute(prepared_select, {'name': 'Мария', 'min_age': 20})
Преимущества подготовленных запросов
- Повышение производительности за счет кэширования плана выполнения
- Защита от инъекций CQL
- Автоматическая маршрутизация запросов к оптимальным узлам
- Снижение нагрузки на парсер CQL
Работа с типами данных
Основные типы данных
from cassandra.util import uuid1, uuid4
from datetime import datetime, date, time
from decimal import Decimal
# UUID типы
user_id = uuid4() # Случайный UUID
time_id = uuid1() # UUID на основе времени
# Временные типы
now = datetime.now()
today = date.today()
current_time = time(12, 30, 45)
# Числовые типы
big_number = Decimal('123.456')
small_int = 42
big_int = 9223372036854775807
Работа с коллекциями
# Множества (SET)
session.execute(
"INSERT INTO users (id, name, tags) VALUES (%s, %s, %s)",
(uuid.uuid4(), "Владимир", {'python', 'cassandra', 'nosql'})
)
# Списки (LIST)
session.execute(
"INSERT INTO user_events (user_id, event_time, event_data) VALUES (%s, %s, %s)",
(user_id, datetime.now(), ['login', 'browse', 'logout'])
)
# Словари (MAP)
session.execute(
"INSERT INTO users (id, name, metadata) VALUES (%s, %s, %s)",
(uuid.uuid4(), "Елена", {'department': 'IT', 'level': 'senior'})
)
Масштабируемость и распределённость
Настройка консистентности
from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel
# Различные уровни консистентности
statement = SimpleStatement(
"SELECT * FROM users WHERE id=%s",
consistency_level=ConsistencyLevel.ONE
)
# Доступные уровни:
# ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE
Политики маршрутизации
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
# Политика с учетом дата-центра
policy = DCAwareRoundRobinPolicy(local_dc='datacenter1')
# Комбинированная политика
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='datacenter1'))
cluster = Cluster(['127.0.0.1'], load_balancing_policy=policy)
Работа с TimeUUID и TTL
TimeUUID для временных рядов
from cassandra.util import uuid1, unix_time_from_uuid1
# Создание TimeUUID
time_uuid = uuid1()
# Извлечение времени из TimeUUID
timestamp = unix_time_from_uuid1(time_uuid)
# Использование в запросах
session.execute(
"INSERT INTO events (id, event_type, data) VALUES (%s, %s, %s)",
(time_uuid, "user_action", "click_button")
)
# Запросы по времени
session.execute(
"SELECT * FROM events WHERE id > maxTimeuuid(%s) AND id < minTimeuuid(%s)",
(start_time, end_time)
)
TTL (Time To Live)
# Установка TTL при вставке
session.execute(
"INSERT INTO cache (key, value) VALUES (%s, %s) USING TTL 3600",
("temp_key", "temp_value")
)
# Обновление TTL
session.execute(
"UPDATE cache USING TTL 7200 SET value = %s WHERE key = %s",
("new_value", "temp_key")
)
# Проверка оставшегося TTL
row = session.execute("SELECT key, value, TTL(value) FROM cache WHERE key = %s", ("temp_key",)).one()
if row:
print(f"TTL: {row.ttl}")
Обработка ошибок и отказоустойчивость
Основные типы исключений
from cassandra import ReadTimeout, WriteTimeout, Unavailable, InvalidRequest
from cassandra.cluster import NoHostAvailable
try:
session.execute("SELECT * FROM users WHERE id = %s", (user_id,))
except ReadTimeout:
print("Превышено время ожидания чтения")
except WriteTimeout:
print("Превышено время ожидания записи")
except Unavailable:
print("Недостаточно узлов для выполнения запроса")
except InvalidRequest as e:
print(f"Неверный запрос: {e}")
except NoHostAvailable:
print("Нет доступных узлов для подключения")
Настройка политик повторов
from cassandra.policies import RetryPolicy
from cassandra.cluster import Cluster
class CustomRetryPolicy(RetryPolicy):
def on_read_timeout(self, query, consistency, required_responses,
received_responses, data_retrieved, retry_num):
if retry_num < 3:
return (self.RETRY, consistency)
return (self.RETHROW, None)
cluster = Cluster(['127.0.0.1'], default_retry_policy=CustomRetryPolicy())
Интеграция с веб-фреймворками
Django
# settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
# ... PostgreSQL конфигурация
}
}
# Отдельный модуль для Cassandra
from cassandra.cluster import Cluster
class CassandraConnection:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.cluster = Cluster(['127.0.0.1'])
cls._instance.session = cls._instance.cluster.connect('my_keyspace')
return cls._instance
def get_session(self):
return self.session
# views.py
from django.http import JsonResponse
from .cassandra_connection import CassandraConnection
def get_user_events(request, user_id):
cassandra = CassandraConnection()
session = cassandra.get_session()
rows = session.execute(
"SELECT * FROM user_events WHERE user_id = %s",
(user_id,)
)
events = [{'event_type': row.event_type, 'data': row.data} for row in rows]
return JsonResponse({'events': events})
Flask
from flask import Flask, jsonify
from cassandra.cluster import Cluster
app = Flask(__name__)
# Инициализация подключения
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
@app.route('/users/<user_id>')
def get_user(user_id):
try:
row = session.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,)
).one()
if row:
return jsonify({
'id': str(row.id),
'name': row.name,
'email': row.email
})
else:
return jsonify({'error': 'User not found'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True)
FastAPI
from fastapi import FastAPI, Depends, HTTPException
from cassandra.cluster import Cluster
from pydantic import BaseModel
import uuid
app = FastAPI()
# Модели данных
class User(BaseModel):
id: str
name: str
email: str
class UserCreate(BaseModel):
name: str
email: str
# Dependency для получения сессии
def get_cassandra_session():
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
try:
yield session
finally:
cluster.shutdown()
@app.post("/users/", response_model=User)
async def create_user(user: UserCreate, session=Depends(get_cassandra_session)):
user_id = uuid.uuid4()
session.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
(user_id, user.name, user.email)
)
return User(id=str(user_id), name=user.name, email=user.email)
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: str, session=Depends(get_cassandra_session)):
try:
row = session.execute(
"SELECT * FROM users WHERE id = %s",
(uuid.UUID(user_id),)
).one()
if row:
return User(id=str(row.id), name=row.name, email=row.email)
else:
raise HTTPException(status_code=404, detail="User not found")
except ValueError:
raise HTTPException(status_code=400, detail="Invalid user ID format")
Тестирование с cassandra-driver
Настройка тестовой среды
import unittest
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
class TestCassandraIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.cluster = Cluster(['127.0.0.1'])
cls.session = cls.cluster.connect()
# Создание тестового keyspace
cls.session.execute("""
CREATE KEYSPACE IF NOT EXISTS test_keyspace
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")
cls.session.set_keyspace('test_keyspace')
# Создание тестовых таблиц
cls.session.execute("""
CREATE TABLE IF NOT EXISTS test_users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT
)
""")
def setUp(self):
# Очистка таблиц перед каждым тестом
self.session.execute("TRUNCATE test_users")
def test_insert_and_select(self):
user_id = uuid.uuid4()
# Вставка
self.session.execute(
"INSERT INTO test_users (id, name, email) VALUES (%s, %s, %s)",
(user_id, "Test User", "test@example.com")
)
# Проверка
row = self.session.execute(
"SELECT * FROM test_users WHERE id = %s",
(user_id,)
).one()
self.assertEqual(row.name, "Test User")
self.assertEqual(row.email, "test@example.com")
@classmethod
def tearDownClass(cls):
cls.session.execute("DROP KEYSPACE IF EXISTS test_keyspace")
cls.cluster.shutdown()
if __name__ == '__main__':
unittest.main()
Использование Docker для тестирования
import docker
import time
from cassandra.cluster import Cluster
class CassandraTestContainer:
def __init__(self):
self.client = docker.from_env()
self.container = None
def start(self):
self.container = self.client.containers.run(
'cassandra:3.11',
ports={'9042/tcp': 9042},
environment={'CASSANDRA_START_RPC': 'true'},
detach=True
)
# Ожидание запуска
time.sleep(30)
# Проверка готовности
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
cluster.shutdown()
def stop(self):
if self.container:
self.container.stop()
self.container.remove()
Мониторинг и производительность
Метрики подключения
from cassandra.cluster import Cluster
from cassandra.metrics import Metrics
# Включение метрик
cluster = Cluster(['127.0.0.1'], metrics_enabled=True)
session = cluster.connect()
# Получение метрик
metrics = cluster.metrics
print(f"Запросов выполнено: {metrics.stats.request_timer.count}")
print(f"Ошибок: {metrics.stats.errors}")
Профилирование запросов
import time
from cassandra.cluster import Cluster
def profile_query(session, query, params=None):
start_time = time.time()
try:
result = session.execute(query, params)
rows = list(result) # Материализация результатов
end_time = time.time()
execution_time = end_time - start_time
print(f"Запрос выполнен за: {execution_time:.4f} секунд")
print(f"Количество строк: {len(rows)}")
return rows
except Exception as e:
print(f"Ошибка выполнения запроса: {e}")
return None
# Использование
session = Cluster(['127.0.0.1']).connect('my_keyspace')
profile_query(session, "SELECT * FROM users LIMIT 1000")
Таблица методов и функций cassandra-driver
| Метод/Функция | Описание | Пример использования |
|---|---|---|
Cluster() |
Создание подключения к кластеру | cluster = Cluster(['127.0.0.1']) |
connect() |
Установка соединения с кластером | session = cluster.connect() |
execute() |
Синхронное выполнение запроса | session.execute("SELECT * FROM users") |
execute_async() |
Асинхронное выполнение запроса | future = session.execute_async("SELECT * FROM users") |
prepare() |
Создание подготовленного запроса | prepared = session.prepare("INSERT INTO users...") |
set_keyspace() |
Переключение на другой keyspace | session.set_keyspace('my_keyspace') |
shutdown() |
Закрытие соединения с кластером | cluster.shutdown() |
one() |
Получение одной записи из результата | row = session.execute("SELECT...").one() |
all() |
Получение всех записей из результата | rows = session.execute("SELECT...").all() |
uuid1() |
Создание TimeUUID | from cassandra.util import uuid1; id = uuid1() |
uuid4() |
Создание случайного UUID | import uuid; id = uuid.uuid4() |
BatchStatement() |
Создание пакетного запроса | batch = BatchStatement() |
add() |
Добавление запроса в пакет | batch.add(SimpleStatement("INSERT...")) |
SimpleStatement() |
Создание простого запроса | stmt = SimpleStatement("SELECT * FROM users") |
bind() |
Привязка параметров к подготовленному запросу | bound = prepared.bind((id, name)) |
result() |
Получение результата асинхронного запроса | rows = future.result() |
add_callbacks() |
Добавление callback для асинхронного запроса | future.add_callbacks(success_callback, error_callback) |
register_user_type() |
Регистрация пользовательского типа | session.register_user_type(keyspace, type_name, User) |
row_factory |
Настройка фабрики строк | session.row_factory = dict_factory |
default_timeout |
Настройка тайм-аута по умолчанию | session.default_timeout = 30.0 |
get_hosts() |
Получение списка хостов кластера | hosts = cluster.metadata.get_hosts() |
refresh_schema() |
Обновление схемы метаданных | cluster.refresh_schema_metadata() |
Сравнение с другими СУБД и хранилищами
| Хранилище | Тип | Масштабирование | Производительность | Поддержка SQL | Консистентность |
|---|---|---|---|---|---|
| Cassandra | NoSQL (Wide Column) | Горизонтальное | Очень высокая | Частично (CQL) | Настраиваемая |
| PostgreSQL | Реляционное | Вертикальное | Средняя | Полная | ACID |
| MongoDB | Документное | Горизонтальное | Высокая | Частично | Настраиваемая |
| Redis | Ключ-значение | Горизонтальное | Очень высокая | Нет | Eventual |
| MySQL | Реляционное | Ограниченное | Средняя | Полная | ACID |
| ElasticSearch | Полнотекстовый поиск | Горизонтальное | Высокая | Частично | Eventual |
Лучшие практики
Проектирование схемы
- Проектируйте таблицы под конкретные запросы
- Избегайте больших партиций (>100MB)
- Используйте составные ключи для группировки данных
- Денормализуйте данные для оптимизации запросов
Оптимизация производительности
- Используйте подготовленные запросы для повторяющихся операций
- Настройте размер пула подключений
- Используйте асинхронные операции для высокой нагрузки
- Мониторьте метрики производительности
Обработка ошибок
- Реализуйте retry-логику для временных сбоев
- Используйте circuit breaker для защиты от каскадных сбоев
- Логируйте все исключения для анализа
Часто задаваемые вопросы
Что такое cassandra-driver?
Это официальный Python-клиент для подключения и работы с Apache Cassandra. Он предоставляет высокоуровневый API для выполнения операций с данными, управления подключениями и обработки результатов.
Поддерживает ли Cassandra SQL?
Нет, Cassandra использует собственный язык запросов CQL (Cassandra Query Language), который похож на SQL, но не поддерживает JOIN, GROUP BY и другие сложные операции. CQL оптимизирован для распределенной архитектуры Cassandra.
Какие типы данных поддерживает Cassandra?
Cassandra поддерживает широкий спектр типов данных: UUID, text, int, bigint, float, double, boolean, timestamp, date, time, decimal, blob, inet, а также коллекции (set, list, map) и пользовательские типы данных.
Есть ли асинхронность в cassandra-driver?
Да, драйвер поддерживает асинхронные операции через метод execute_async() с возможностью использования callback-функций для обработки результатов без блокировки основного потока.
Как реализовать TTL в Cassandra?
TTL (Time To Live) можно установить при вставке данных с помощью конструкции USING TTL, указав время жизни записи в секундах. После истечения TTL запись автоматически удаляется.
Подходит ли Cassandra для аналитики?
Да, Cassandra отлично подходит для аналитики временных рядов, логов и больших объемов данных. Однако для сложных аналитических запросов с агрегацией лучше использовать специализированные решения вроде Apache Spark или ClickHouse.
Как обеспечить безопасность при работе с Cassandra?
Используйте аутентификацию, SSL-шифрование, подготовленные запросы для защиты от инъекций, настройте правильные права доступа и регулярно обновляйте драйвер до последней версии.
Можно ли использовать ORM с Cassandra?
Да, можно использовать cqlengine - ORM для Cassandra, который входит в состав cassandra-driver. Он позволяет работать с данными через объектно-ориентированный интерфейс.
Заключение
Cassandra-driver — это мощный инструмент для Python-разработчиков, которым важно обеспечить масштабируемость, отказоустойчивость и быструю обработку больших данных. Apache Cassandra в сочетании с cassandra-driver предоставляет решение корпоративного уровня для высоконагруженных систем.
Ключевые преимущества использования cassandra-driver включают простоту интеграции, высокую производительность, гибкость настройки и надежность работы с распределенными данными. Cassandra отлично подходит для проектов с интенсивной записью, временными рядами, аналитическими системами и приложениями Интернета вещей.
Благодаря comprehensive API и обширной документации, cassandra-driver обеспечивает простую, надежную и эффективную интеграцию Python-приложений с Apache Cassandra, делая разработку масштабируемых приложений более доступной и управляемой.
Настоящее и будущее развития ИИ: классической математики уже недостаточно
Эксперты предупредили о рисках фейковой благотворительности с помощью ИИ
В России разработали универсального ИИ-агента для роботов и индустриальных процессов