cassandra-driver: Working with Apache Cassandra

Introduction

Apache Cassandra is a high-performance, scalable, fault-tolerant NoSQL database designed to handle massive data volumes with low latency. Companies such as Netflix, Apple, eBay, and Instagram use it to process petabytes of data in real time.

For Python there is the official driver, cassandra-driver, which provides full access to Cassandra's capabilities from Python applications. It supports synchronous and asynchronous queries, prepared statements, UUID handling, TTL, and more.

What Is cassandra-driver?

Cassandra‑driver is the official Python client for Apache Cassandra, developed by the DataStax team. It is a high‑throughput driver that delivers seamless integration between Python applications and a Cassandra cluster.

Main driver features

  • Synchronous and asynchronous operations
  • Prepared statements for performance gains
  • Automatic routing of queries to optimal nodes
  • Support for all Cassandra data types
  • Built‑in error handling and retry logic
  • Authentication and SSL encryption support
  • Load balancing across cluster nodes

Installation and configuration

Installing the driver

pip install cassandra-driver

The cqlengine object mapper ships with the base package, so it needs no separate install. Graph functionality is an optional extra:

pip install "cassandra-driver[graph]"  # Graph functionality

Connecting to a cluster

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Simple connection
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# Connection with authentication
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['127.0.0.1'], auth_provider=auth_provider)
session = cluster.connect()

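# Connection with SSL (ssl_context expects an ssl.SSLContext object, not a dict)
from cassandra.cluster import Cluster
from ssl import SSLContext, PROTOCOL_TLS_CLIENT, CERT_REQUIRED

ssl_context = SSLContext(PROTOCOL_TLS_CLIENT)
# Hostname checking is turned off here because we connect by IP address;
# enable it when node certificates carry matching host names
ssl_context.check_hostname = False
ssl_context.verify_mode = CERT_REQUIRED
ssl_context.load_verify_locations('/path/to/ca.pem')

cluster = Cluster(['127.0.0.1'], ssl_context=ssl_context)
session = cluster.connect()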

Connecting to a specific keyspace

session.set_keyspace('my_keyspace')
# or
session = cluster.connect('my_keyspace')

Key Cassandra concepts

Data architecture

  • Keyspace: the equivalent of a database in SQL; holds replication settings
  • Table: stores rows and columns, but rows are physically grouped into partitions rather than laid out as in a relational table
  • Partition key: determines which node stores a row
  • Clustering key: defines the sort order within a partition
  • Primary key: the partition key plus any optional clustering key(s)
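
The roles of the partition key and clustering key can be illustrated with a toy in-memory model. This is only a sketch: the node names are hypothetical, and the MD5-modulo mapping stands in for Cassandra's real token ring (which uses Murmur3 tokens and replication), so treat it as an analogy rather than the actual placement algorithm.

```python
import hashlib
from bisect import insort
from collections import defaultdict

NODES = ["node-a", "node-b", "node-c"]  # hypothetical node names


def node_for(partition_key: str) -> str:
    """Pick a node from a hash of the partition key (toy stand-in for a token ring)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    token = int.from_bytes(digest[:8], "big")
    return NODES[token % len(NODES)]


# partition key -> list of (clustering key, value), kept sorted by clustering key
partitions = defaultdict(list)


def insert(partition_key, clustering_key, value):
    """Store a row inside its partition, preserving clustering order."""
    insort(partitions[partition_key], (clustering_key, value))


insert("user-1", 3, "logout")
insert("user-1", 1, "login")
insert("user-1", 2, "browse")
insert("user-2", 1, "login")

# The same partition key always maps to the same node
print(node_for("user-1") == node_for("user-1"))  # True
# Rows come back in clustering-key order regardless of insert order
print([value for _, value in partitions["user-1"]])  # ['login', 'browse', 'logout']
```

All rows for "user-1" live together and are already sorted, which is why reading one partition in clustering order is cheap.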

Data‑model specifics

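Cassandra does not support JOINs and offers only limited aggregation compared with SQL, but it scales horizontally and provides high availability. Schema design should be driven by query patterns: decide which queries the application will run, then build a table that serves each one.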

Creating keyspaces and tables

Keyspace creation

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS my_keyspace
    WITH replication = {
        'class': 'SimpleStrategy',
        'replication_factor': '3'
    }
""")

# For production, NetworkTopologyStrategy is recommended
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS my_keyspace
    WITH replication = {
        'class': 'NetworkTopologyStrategy',
        'datacenter1': 3,
        'datacenter2': 2
    }
""")

Table creation

session.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id UUID PRIMARY KEY,
        name TEXT,
        email TEXT,
        created_at TIMESTAMP,
        age INT,
        tags SET<TEXT>,
        metadata MAP<TEXT, TEXT>
    )
""")

# Table with a composite key
session.execute("""
    CREATE TABLE IF NOT EXISTS user_events (
        user_id UUID,
        event_time TIMESTAMP,
        event_type TEXT,
        data TEXT,
        PRIMARY KEY (user_id, event_time)
    ) WITH CLUSTERING ORDER BY (event_time DESC)
""")

CRUD operations with cassandra-driver

Inserting data

import uuid
from datetime import datetime

# Simple insert
session.execute(
    "INSERT INTO users (id, name, email, created_at, age) VALUES (%s, %s, %s, %s, %s)",
    (uuid.uuid4(), "Ivan", "ivan@example.com", datetime.now(), 30)
)

# Insert with TTL
session.execute(
    "INSERT INTO users (id, name, email) VALUES (%s, %s, %s) USING TTL 3600",
    (uuid.uuid4(), "Petr", "petr@example.com")
)

# Conditional insert
session.execute(
    "INSERT INTO users (id, name, email) VALUES (%s, %s, %s) IF NOT EXISTS",
    (uuid.uuid4(), "Anna", "anna@example.com")
)

Reading data

# Fetch all rows
rows = session.execute("SELECT * FROM users")
for row in rows:
    print(f"ID: {row.id}, Name: {row.name}, Email: {row.email}")

# Fetch a single row
row = session.execute("SELECT * FROM users WHERE id=%s", (user_id,)).one()
if row:
    print(f"User: {row.name}")

# Pagination
from cassandra.query import SimpleStatement
statement = SimpleStatement("SELECT * FROM users", fetch_size=100)
for row in session.execute(statement):
    print(row.name)

# Fetch with limit
rows = session.execute("SELECT * FROM users LIMIT 10")

Updating data

# Update fields
session.execute(
    "UPDATE users SET email=%s, age=%s WHERE id=%s",
    ("new_email@example.com", 31, user_id)
)

# Conditional update
session.execute(
    "UPDATE users SET email=%s WHERE id=%s IF age=%s",
    ("updated@example.com", user_id, 30)
)

# Update collections
session.execute(
    "UPDATE users SET tags = tags + %s WHERE id = %s",
    ({'programming', 'python'}, user_id)
)

Deleting data

# Delete a row
session.execute("DELETE FROM users WHERE id=%s", (user_id,))

# Delete specific columns
session.execute("DELETE email FROM users WHERE id=%s", (user_id,))

# Conditional delete
session.execute("DELETE FROM users WHERE id=%s IF age=%s", (user_id, 30))

Working with asynchronous queries

Basic async operations

# Async execution
future = session.execute_async("SELECT * FROM users")
rows = future.result()  # Blocking call

# Non‑blocking check
if future.has_more_pages:
    future.start_fetching_next_page()

# Result handling
def handle_success(results):
    for row in results:
        print(f"User: {row.name}")

def handle_error(exception):
    print(f"Error: {exception}")

future = session.execute_async("SELECT * FROM users")
future.add_callbacks(callback=handle_success, errback=handle_error)

Parallel queries

import concurrent.futures

def execute_query(query, params):
    return session.execute(query, params)

# Parallel execution of multiple queries
# (id1, id2, id3 stand for UUIDs of existing users)
queries = [
    ("SELECT * FROM users WHERE id=%s", (id1,)),
    ("SELECT * FROM users WHERE id=%s", (id2,)),
    ("SELECT * FROM users WHERE id=%s", (id3,))
]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(execute_query, query, params) for query, params in queries]
    # as_completed yields futures in completion order, not submission order
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

Prepared statements and security

Creating and using prepared statements

# Create a prepared statement
prepared_insert = session.prepare(
    "INSERT INTO users (id, name, email, age) VALUES (?, ?, ?, ?)"
)

# Execute the prepared statement
session.execute(prepared_insert, (uuid.uuid4(), "Maria", "maria@example.com", 25))

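# Prepared statement with named parameters
# (filters use primary key columns; filtering on a non-key column such as
# name would need a secondary index or ALLOW FILTERING)
prepared_select = session.prepare(
    "SELECT * FROM user_events WHERE user_id = :user_id AND event_time > :since"
)

rows = session.execute(prepared_select, {'user_id': user_id, 'since': datetime(2024, 1, 1)})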

Benefits of prepared statements

  • Performance boost via execution‑plan caching
  • Protection against CQL injection attacks
  • Automatic routing to optimal nodes
  • Reduced load on the CQL parser
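
The injection point in the list above can be seen without a cluster. The sketch below compares splicing user input into the CQL text with keeping the text constant and passing the value separately. The helper names are hypothetical, and how much damage an injected string can do depends on the server and driver; the point is only that the statement text itself changes in the unsafe version.

```python
def build_unsafe(name: str) -> str:
    """Anti-pattern: user input is spliced into the CQL text itself."""
    return f"SELECT * FROM users WHERE name = '{name}'"


def build_parameterized(name: str):
    """Statement text stays constant; the value travels separately as a bound parameter."""
    return "SELECT * FROM users WHERE name = ?", (name,)


malicious = "x'; DROP TABLE users; --"

unsafe_query = build_unsafe(malicious)
safe_query, safe_params = build_parameterized(malicious)

# The attacker's text has become part of the statement
print("DROP TABLE" in unsafe_query)  # True
# The parameterized statement is unchanged; the input is just data
print("DROP TABLE" in safe_query)    # False
print(safe_params == (malicious,))   # True
```

With a real session, the pair returned by build_parameterized corresponds to what is passed to session.prepare() and then session.execute().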

Working with data types

Core data types

from uuid import uuid1, uuid4  # UUID generators come from the standard library
from datetime import datetime, date, time
from decimal import Decimal

# UUIDs
user_id = uuid4()  # Random UUID
time_id = uuid1()  # Time‑based UUID

# Temporal types
now = datetime.now()
today = date.today()
current_time = time(12, 30, 45)

# Numeric types
big_number = Decimal('123.456')
small_int = 42
big_int = 9223372036854775807
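
As a rough guide to how Python values like those above line up with CQL column types, here is a small lookup sketch. The helper name suggest_cql_type is hypothetical and the table is deliberately coarse; the actual type is always decided by the column definition in the schema, not by the Python value.

```python
import datetime
import decimal
import ipaddress
import uuid

# Rough guide only: entries are checked in order and the first match wins
_CQL_TYPE_GUIDE = [
    (bool, "boolean"),                  # must precede int: bool subclasses int
    (int, "int / bigint / varint"),
    (float, "float / double"),
    (decimal.Decimal, "decimal"),
    (str, "text / varchar / ascii"),
    (bytes, "blob"),
    (datetime.datetime, "timestamp"),   # must precede date: datetime subclasses date
    (datetime.date, "date"),
    (datetime.time, "time"),
    (uuid.UUID, "uuid / timeuuid"),
    ((ipaddress.IPv4Address, ipaddress.IPv6Address), "inet"),
    ((set, frozenset), "set"),
    (list, "list"),
    (dict, "map"),
]


def suggest_cql_type(value) -> str:
    """Return the CQL type family a Python value would typically be stored as."""
    for python_type, cql_type in _CQL_TYPE_GUIDE:
        if isinstance(value, python_type):
            return cql_type
    return "unknown"


print(suggest_cql_type(True))                        # boolean
print(suggest_cql_type(decimal.Decimal("123.456")))  # decimal
print(suggest_cql_type(uuid.uuid4()))                # uuid / timeuuid
print(suggest_cql_type({"python", "nosql"}))         # set
```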

Working with collections

# Sets (SET)
session.execute(
    "INSERT INTO users (id, name, tags) VALUES (%s, %s, %s)",
    (uuid.uuid4(), "Vladimir", {'python', 'cassandra', 'nosql'})
)

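# Lists (LIST)
# Assumes user_events has an extra column event_data LIST<TEXT>,
# which the table definition shown earlier does not include
session.execute(
    "INSERT INTO user_events (user_id, event_time, event_data) VALUES (%s, %s, %s)",
    (user_id, datetime.now(), ['login', 'browse', 'logout'])
)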

# Maps (MAP)
session.execute(
    "INSERT INTO users (id, name, metadata) VALUES (%s, %s, %s)",
    (uuid.uuid4(), "Elena", {'department': 'IT', 'level': 'senior'})
)

Scalability and distribution

Consistency settings

from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel

# Different consistency levels
statement = SimpleStatement(
    "SELECT * FROM users WHERE id=%s",
    consistency_level=ConsistencyLevel.ONE
)

# Available levels:
# ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE
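
The arithmetic behind these levels is short enough to sketch. QUORUM means a majority of replicas, floor(RF / 2) + 1, and a read is guaranteed to see the latest acknowledged write when the read and write replica counts together exceed the replication factor. The function names below are illustrative, not part of the driver.

```python
def quorum(replication_factor: int) -> int:
    """Number of replicas that make up a majority."""
    return replication_factor // 2 + 1


def read_sees_latest_write(replication_factor: int, read_replicas: int, write_replicas: int) -> bool:
    """True when every read set must overlap every write set (R + W > RF)."""
    return read_replicas + write_replicas > replication_factor


rf = 3
q = quorum(rf)
print(q)                                  # 2
print(read_sees_latest_write(rf, q, q))   # True:  QUORUM reads + QUORUM writes
print(read_sees_latest_write(rf, 1, 1))   # False: ONE reads + ONE writes
print(read_sees_latest_write(rf, 1, rf))  # True:  ONE reads + ALL writes
```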

Routing policies

from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# Data‑center aware policy
policy = DCAwareRoundRobinPolicy(local_dc='datacenter1')

# Combined token‑aware policy
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='datacenter1'))

cluster = Cluster(['127.0.0.1'], load_balancing_policy=policy)

Working with TimeUUID and TTL

TimeUUID for time‑series data

from uuid import uuid1  # the generator lives in the standard library
from cassandra.util import unix_time_from_uuid1

# Create a TimeUUID
time_uuid = uuid1()

# Extract timestamp from TimeUUID
timestamp = unix_time_from_uuid1(time_uuid)

# Use in queries
session.execute(
    "INSERT INTO events (id, event_type, data) VALUES (%s, %s, %s)",
    (time_uuid, "user_action", "click_button")
)

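# Time-range queries
# start_time and end_time are datetime values defined elsewhere; a range
# predicate like this only works when id is a clustering column
session.execute(
    "SELECT * FROM events WHERE id > maxTimeuuid(%s) AND id < minTimeuuid(%s)",
    (start_time, end_time)
)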
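
To show what extracting a timestamp from a TimeUUID involves, here is a standard-library sketch of the same conversion. A version 1 UUID stores time as a count of 100-nanosecond intervals since 15 October 1582, so subtracting the offset to the Unix epoch and dividing by 10^7 gives Unix seconds. The function name is my own, chosen to avoid confusion with the driver's helper.

```python
import time
import uuid

# 100 ns intervals between 1582-10-15 (UUID epoch) and 1970-01-01 (Unix epoch)
UUID_EPOCH_OFFSET = 0x01B21DD213814000


def unix_seconds_from_uuid1(value: uuid.UUID) -> float:
    """Convert the embedded timestamp of a version 1 UUID to Unix seconds."""
    if value.version != 1:
        raise ValueError("expected a version 1 (time-based) UUID")
    return (value.time - UUID_EPOCH_OFFSET) / 1e7


time_uuid = uuid.uuid1()
extracted = unix_seconds_from_uuid1(time_uuid)

# The embedded time should be within a few seconds of the current clock
print(abs(extracted - time.time()) < 5)
```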

TTL (Time To Live)

# Set TTL on insert
session.execute(
    "INSERT INTO cache (key, value) VALUES (%s, %s) USING TTL 3600",
    ("temp_key", "temp_value")
)

# Update TTL
session.execute(
    "UPDATE cache USING TTL 7200 SET value = %s WHERE key = %s",
    ("new_value", "temp_key")
)

# Check remaining TTL
# Alias the TTL column so the attribute name on the row is predictable
row = session.execute("SELECT key, value, TTL(value) AS ttl_remaining FROM cache WHERE key = %s", ("temp_key",)).one()
if row:
    print(f"TTL: {row.ttl_remaining}")
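
The TTL behaviour used above (set on insert, reset on update, countdown visible on read) can be mimicked with a toy in-memory store. This is an analogy under simplifying assumptions: the class and its explicit `now` argument are hypothetical, and real Cassandra tracks TTL per written cell and purges expired data during compaction.

```python
class TTLStore:
    """Toy in-memory store whose entries expire, loosely mirroring USING TTL."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at or None)

    def put(self, key, value, ttl=None, now=0.0):
        """Write a value; a new write replaces any earlier expiry time."""
        expires_at = now + ttl if ttl is not None else None
        self._data[key] = (value, expires_at)

    def get(self, key, now=0.0):
        """Return the value, or None if it is missing or expired."""
        value, expires_at = self._data.get(key, (None, None))
        if expires_at is not None and now >= expires_at:
            return None
        return value

    def remaining_ttl(self, key, now=0.0):
        """Seconds left before expiry, or None if no TTL applies."""
        _, expires_at = self._data.get(key, (None, None))
        if expires_at is None or now >= expires_at:
            return None
        return int(expires_at - now)


store = TTLStore()
store.put("temp_key", "temp_value", ttl=3600, now=0)
print(store.remaining_ttl("temp_key", now=600))        # 3000
store.put("temp_key", "new_value", ttl=7200, now=600)  # rewriting resets the clock
print(store.remaining_ttl("temp_key", now=3700))       # 4100
print(store.get("temp_key", now=7800))                 # None
```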

Error handling and resilience

Common exception types

from cassandra import ReadTimeout, WriteTimeout, Unavailable, InvalidRequest
from cassandra.cluster import NoHostAvailable

try:
    session.execute("SELECT * FROM users WHERE id = %s", (user_id,))
except ReadTimeout:
    print("Read timeout")
except WriteTimeout:
    print("Write timeout")
except Unavailable:
    print("Insufficient nodes for the request")
except InvalidRequest as e:
    print(f"Invalid query: {e}")
except NoHostAvailable:
    print("No available hosts to connect")

Configuring retry policies

from cassandra.policies import RetryPolicy
from cassandra.cluster import Cluster

class CustomRetryPolicy(RetryPolicy):
    def on_read_timeout(self, query, consistency, required_responses, 
                       received_responses, data_retrieved, retry_num):
        if retry_num < 3:
            return (self.RETRY, consistency)
        return (self.RETHROW, None)

cluster = Cluster(['127.0.0.1'], default_retry_policy=CustomRetryPolicy())

Integration with web frameworks

Django

# settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        # ... PostgreSQL configuration
    }
}

# Separate module for Cassandra
from cassandra.cluster import Cluster

class CassandraConnection:
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.cluster = Cluster(['127.0.0.1'])
            cls._instance.session = cls._instance.cluster.connect('my_keyspace')
        return cls._instance
    
    def get_session(self):
        return self.session

# views.py
from django.http import JsonResponse
from .cassandra_connection import CassandraConnection

def get_user_events(request, user_id):
    cassandra = CassandraConnection()
    session = cassandra.get_session()
    
    rows = session.execute(
        "SELECT * FROM user_events WHERE user_id = %s",
        (user_id,)
    )
    
    events = [{'event_type': row.event_type, 'data': row.data} for row in rows]
    return JsonResponse({'events': events})

Flask

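import uuid

from flask import Flask, jsonify
from cassandra.cluster import Cluster

app = Flask(__name__)

# Initialize connection
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')

@app.route('/users/<user_id>')
def get_user(user_id):
    # The URL segment arrives as a string, but the id column expects a UUID
    try:
        user_uuid = uuid.UUID(user_id)
    except ValueError:
        return jsonify({'error': 'Invalid user ID format'}), 400

    try:
        row = session.execute(
            "SELECT * FROM users WHERE id = %s",
            (user_uuid,)
        ).one()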
        
        if row:
            return jsonify({
                'id': str(row.id),
                'name': row.name,
                'email': row.email
            })
        else:
            return jsonify({'error': 'User not found'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

FastAPI

from fastapi import FastAPI, Depends, HTTPException
from cassandra.cluster import Cluster
from pydantic import BaseModel
import uuid

app = FastAPI()

# Data models
class User(BaseModel):
    id: str
    name: str
    email: str

class UserCreate(BaseModel):
    name: str
    email: str

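# Create the cluster and session once; building them per request is slow,
# and a Session is safe to share across threads
cluster = Cluster(['127.0.0.1'])
cassandra_session = cluster.connect('my_keyspace')

@app.on_event("shutdown")
def close_cassandra():
    cluster.shutdown()

# Dependency that hands out the shared Cassandra session
def get_cassandra_session():
    return cassandra_session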

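# Plain def: FastAPI runs it in a worker thread, so the blocking driver call
# does not stall the event loop
@app.post("/users/", response_model=User)
def create_user(user: UserCreate, session=Depends(get_cassandra_session)):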
    user_id = uuid.uuid4()
    
    session.execute(
        "INSERT INTO users (id, name, email) VALUES (%s, %s, %s)",
        (user_id, user.name, user.email)
    )
    
    return User(id=str(user_id), name=user.name, email=user.email)

# Plain def for the same reason: session.execute() is a blocking call
@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: str, session=Depends(get_cassandra_session)):
    try:
        row = session.execute(
            "SELECT * FROM users WHERE id = %s",
            (uuid.UUID(user_id),)
        ).one()
        
        if row:
            return User(id=str(row.id), name=row.name, email=row.email)
        else:
            raise HTTPException(status_code=404, detail="User not found")
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid user ID format")

Testing with cassandra-driver

Setting up a test environment

import unittest
import uuid
from cassandra.cluster import Cluster

class TestCassandraIntegration(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.cluster = Cluster(['127.0.0.1'])
        cls.session = cls.cluster.connect()
        
        # Create test keyspace
        cls.session.execute("""
            CREATE KEYSPACE IF NOT EXISTS test_keyspace
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
        """)
        
        cls.session.set_keyspace('test_keyspace')
        
        # Create test tables
        cls.session.execute("""
            CREATE TABLE IF NOT EXISTS test_users (
                id UUID PRIMARY KEY,
                name TEXT,
                email TEXT
            )
        """)
    
    def setUp(self):
        # Clean tables before each test
        self.session.execute("TRUNCATE test_users")
    
    def test_insert_and_select(self):
        user_id = uuid.uuid4()
        
        # Insert
        self.session.execute(
            "INSERT INTO test_users (id, name, email) VALUES (%s, %s, %s)",
            (user_id, "Test User", "test@example.com")
        )
        
        # Verify
        row = self.session.execute(
            "SELECT * FROM test_users WHERE id = %s",
            (user_id,)
        ).one()
        
        self.assertEqual(row.name, "Test User")
        self.assertEqual(row.email, "test@example.com")
    
    @classmethod
    def tearDownClass(cls):
        cls.session.execute("DROP KEYSPACE IF EXISTS test_keyspace")
        cls.cluster.shutdown()

if __name__ == '__main__':
    unittest.main()

Using Docker for testing

import docker
import time
from cassandra.cluster import Cluster, NoHostAvailable

class CassandraTestContainer:
    def __init__(self):
        self.client = docker.from_env()
        self.container = None
    
    def start(self):
        self.container = self.client.containers.run(
            'cassandra:3.11',
            ports={'9042/tcp': 9042},
            environment={'CASSANDRA_START_RPC': 'true'},
            detach=True
        )
        
        # Poll until Cassandra accepts connections instead of sleeping a fixed time
        for _ in range(30):
            cluster = Cluster(['127.0.0.1'])
            try:
                cluster.connect()
                return
            except NoHostAvailable:
                time.sleep(5)
            finally:
                cluster.shutdown()
        raise RuntimeError("Cassandra did not become ready in time")
    
    def stop(self):
        if self.container:
            self.container.stop()
            self.container.remove()

Monitoring and performance

Connection metrics

from cassandra.cluster import Cluster

# Enable metrics (requires the optional "scales" package)
cluster = Cluster(['127.0.0.1'], metrics_enabled=True)
session = cluster.connect()

# Retrieve metrics
metrics = cluster.metrics
print(f"Requests completed: {metrics.request_timer['count']}")
print(f"Connection errors: {metrics.connection_errors}")
print(f"Read timeouts: {metrics.read_timeouts}")

Query profiling

import time
from cassandra.cluster import Cluster

def profile_query(session, query, params=None):
    start_time = time.time()
    
    try:
        result = session.execute(query, params)
        rows = list(result)  # Materialize results
        
        end_time = time.time()
        execution_time = end_time - start_time
        
        print(f"Query executed in: {execution_time:.4f} seconds")
        print(f"Row count: {len(rows)}")
        
        return rows
    except Exception as e:
        print(f"Query error: {e}")
        return None

# Usage
session = Cluster(['127.0.0.1']).connect('my_keyspace')
profile_query(session, "SELECT * FROM users LIMIT 1000")

Table of cassandra-driver methods and functions

| Method / Function | Description | Example usage |
|---|---|---|
| Cluster() | Create a connection to a Cassandra cluster | cluster = Cluster(['127.0.0.1']) |
| connect() | Establish a session with the cluster | session = cluster.connect() |
| execute() | Synchronous query execution | session.execute("SELECT * FROM users") |
| execute_async() | Asynchronous query execution | future = session.execute_async("SELECT * FROM users") |
| prepare() | Create a prepared statement | prepared = session.prepare("INSERT INTO users...") |
| set_keyspace() | Switch to a different keyspace | session.set_keyspace('my_keyspace') |
| shutdown() | Close the cluster connection | cluster.shutdown() |
| one() | Retrieve a single row from a result set | row = session.execute("SELECT...").one() |
| all() | Retrieve all rows from a result set | rows = session.execute("SELECT...").all() |
| uuid1() | Generate a TimeUUID | import uuid; id = uuid.uuid1() |
| uuid4() | Generate a random UUID | import uuid; id = uuid.uuid4() |
| BatchStatement() | Create a batch query | batch = BatchStatement() |
| add() | Add a query to a batch | batch.add(SimpleStatement("INSERT...")) |
| SimpleStatement() | Create a simple (non-prepared) statement | stmt = SimpleStatement("SELECT * FROM users") |
| bind() | Bind parameters to a prepared statement | bound = prepared.bind((id, name)) |
| result() | Obtain the result of an asynchronous query | rows = future.result() |
| add_callbacks() | Attach success and error callbacks to an async query | future.add_callbacks(success_callback, error_callback) |
| register_user_type() | Register a class for a user-defined type | cluster.register_user_type(keyspace, type_name, User) |
| row_factory | Configure the row factory (e.g., dict_factory) | session.row_factory = dict_factory |
| default_timeout | Set the default request timeout (seconds) | session.default_timeout = 30.0 |
| all_hosts() | Retrieve the list of cluster hosts | hosts = cluster.metadata.all_hosts() |
| refresh_schema_metadata() | Refresh metadata schema information | cluster.refresh_schema_metadata() |

Comparison with other databases and storage solutions

| Storage | Type | Scaling | Performance | SQL support | Consistency |
|---|---|---|---|---|---|
| Cassandra | NoSQL (Wide-Column) | Horizontal | Very high | Partial (CQL) | Configurable |
| PostgreSQL | Relational | Vertical | Medium | Full | ACID |
| MongoDB | Document | Horizontal | High | Partial | Configurable |
| Redis | Key-Value | Horizontal | Very high | None | Eventual |
| MySQL | Relational | Limited | Medium | Full | ACID |
| Elasticsearch | Full-text search | Horizontal | High | Partial | Eventual |

Best practices

Schema design

  • Model tables around specific query patterns
  • Avoid large partitions (> 100 MB)
  • Use composite keys to group related data
  • Denormalize where it improves query efficiency
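
One common way to keep partitions under a size limit is to add a time bucket to the partition key. The sketch below derives a daily bucket and estimates partition size from assumed figures; the sensor name, row rate, and average row size are illustrative numbers, not measurements.

```python
from datetime import datetime, timezone


def day_bucket(ts: datetime) -> str:
    """Return the UTC calendar day used as the bucket part of a partition key."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%d")


def estimated_partition_mb(rows_per_bucket: int, avg_row_bytes: int) -> float:
    """Back-of-the-envelope partition size in MiB."""
    return rows_per_bucket * avg_row_bytes / (1024 * 1024)


ts = datetime(2024, 5, 17, 23, 30, tzinfo=timezone.utc)

# Composite partition key: (sensor id, day) instead of sensor id alone
partition_key = ("sensor-42", day_bucket(ts))
print(partition_key)  # ('sensor-42', '2024-05-17')

# One reading per second at roughly 200 bytes per row, bucketed by day
print(round(estimated_partition_mb(86_400, 200), 1))  # 16.5
```

If the estimate approaches the limit, a finer bucket (for example, hourly) shrinks each partition at the cost of querying more partitions per time range.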

Performance optimization

  • Leverage prepared statements for repetitive queries
  • Tune the connection pool size
  • Employ asynchronous operations under high load
  • Continuously monitor performance metrics

Error handling

  • Implement retry logic for transient failures
  • Use a circuit breaker to prevent cascading outages
  • Log all exceptions for later analysis
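
The first two points can be sketched in plain Python. TransientError is a hypothetical stand-in for driver exceptions such as timeouts, and both helpers are minimal illustrations of the patterns, not production-ready components or anything the driver provides.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for a timeout or 'unavailable' error."""


def call_with_retry(operation, attempts=3, base_delay=0.01, sleep=time.sleep):
    """Retry `operation` on TransientError, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return operation()
        except TransientError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error
            sleep(base_delay * 2 ** attempt)


class CircuitBreaker:
    """Refuse calls after `threshold` consecutive failures until `reset_after` seconds pass."""

    def __init__(self, threshold=3, reset_after=30.0, clock=time.monotonic):
        self.threshold = threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def call(self, operation):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: skipping call")
            # Half-open: allow one trial call; a single failure re-opens the circuit
            self.opened_at = None
            self.failures = self.threshold - 1
        try:
            result = operation()
        except TransientError:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0
        return result


calls = {"n": 0}


def flaky():
    """Fail twice, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("simulated timeout")
    return "ok"


print(call_with_retry(flaky))  # ok
```

In a real application the operation would wrap a session.execute() call, and the except clauses would name the driver's own exception classes.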

Frequently asked questions

What is cassandra-driver?

It is the official Python client for connecting to and working with Apache Cassandra. The driver offers a high‑level API for data operations, connection management, and result handling.

Does Cassandra support SQL?

Not exactly. Cassandra uses CQL (Cassandra Query Language), which resembles SQL but does not support JOINs or subqueries and offers only restricted forms of GROUP BY and aggregation. CQL is optimized for Cassandra's distributed architecture.

Which data types does Cassandra support?

Cassandra supports a broad range of types: UUID, text, int, bigint, float, double, boolean, timestamp, date, time, decimal, blob, inet, as well as collections (set, list, map) and user‑defined types.

Is there asynchronous support in cassandra-driver?

Yes. The driver provides asynchronous operations via execute_async() and allows callback functions for non‑blocking result processing.

How do I implement TTL in Cassandra?

TTL (Time To Live) can be set during insert or update using the USING TTL clause, which specifies the lifetime in seconds of the data being written. Once the TTL expires, the data is no longer returned by queries and is purged later during compaction.

Is Cassandra suitable for analytics?

Yes, Cassandra excels at time‑series data, logging, and massive data ingestion. For complex analytical queries with aggregations, consider complementary tools such as Apache Spark or ClickHouse.

How can I secure my Cassandra deployment?

Use authentication, SSL/TLS encryption, prepared statements to guard against injections, enforce proper role‑based permissions, and keep the driver up to date.

Can I use an ORM with Cassandra?

Yes. The cqlengine ORM, bundled with cassandra-driver, allows object‑oriented interaction with Cassandra data.

Conclusion

Cassandra‑driver is a powerful tool for Python developers who need scalability, fault tolerance, and rapid processing of large data sets. Combined with Apache Cassandra, it delivers an enterprise‑grade solution for high‑throughput systems.

Key advantages of using cassandra-driver include easy integration, high performance, flexible configuration, and reliable operation with distributed data. Cassandra shines in write‑heavy workloads, time‑series, analytics, and IoT applications.

Thanks to its comprehensive API and extensive documentation, cassandra-driver enables straightforward, reliable, and efficient integration of Python applications with Apache Cassandra, making the development of scalable systems more accessible and manageable.
