Introduction
Elasticsearch is a powerful distributed search engine built on Apache Lucene, designed for full‑text search, logging, monitoring, and analytics over large data volumes. The official elasticsearch-py client provides complete access to the Elasticsearch APIs from Python code.
The elasticsearch-py library enables developers to implement advanced search, aggregations, and large‑scale data processing directly in Python applications, including web services, analytics tools, and monitoring systems.
Library Overview: elasticsearch-py
elasticsearch-py is the official Python client for interacting with Elasticsearch. The library offers low‑level access to every Elasticsearch feature via the RESTful API while delivering a convenient Pythonic interface.
Key Features
- Official support: Developed and maintained by the Elastic team
- Broad compatibility: Works with any Elasticsearch cluster whose major version matches the client's (for example, an 8.x client against an 8.x cluster)
- Node discovery: Optional sniffing detects and connects to cluster nodes (enabled with options such as sniff_on_start)
- Connection pooling: Efficiently manages connections to the cluster
- Error handling: Built‑in exception management
- Serialization: Automatic conversion of Python objects to JSON
Installation and Cluster Connection
Installation
pip install elasticsearch
TLS support is built in. For the asynchronous client (AsyncElasticsearch), install the async extra:
pip install elasticsearch[async]
Connecting to a Local Cluster
from elasticsearch import Elasticsearch
es = Elasticsearch("http://localhost:9200")
Connecting to a Remote Cluster with Basic Authentication
es = Elasticsearch(
"https://your-cluster:9200",
basic_auth=("user", "password"),
verify_certs=True
)
Connecting Using API Keys
es = Elasticsearch(
"https://your-cluster:9200",
api_key="your_api_key",
verify_certs=True
)
Connecting to Multiple Nodes
es = Elasticsearch(
[
"https://node1:9200",
"https://node2:9200",
"https://node3:9200"
],
basic_auth=("user", "password")
)
Core Elasticsearch Concepts
- Index — the equivalent of a table in relational databases; stores documents with similar structure
- Document — a JSON object stored in an index, analogous to a row
- Field — an attribute of a document, similar to a column
- Mapping — definition of a document’s structure and field types
- Shard — the physical storage unit of an index
- Replica — a copy of a shard that provides high availability
Index Management
Creating Indices and Mappings
mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text", "analyzer": "russian"},
            "tags": {"type": "keyword"},
            "published": {"type": "date"},
            "content": {"type": "text"},
            "author": {"type": "keyword"},
            "views": {"type": "integer"}
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}
es.indices.create(index="articles", body=mapping)
# elasticsearch-py 8.x also accepts the parts as keyword arguments:
# es.indices.create(index="articles", mappings=mapping["mappings"], settings=mapping["settings"])
Deleting an Index
es.indices.delete(index="articles")
Checking Index Existence
if es.indices.exists(index="articles"):
    print("Index exists")
Retrieving Index Information
index_info = es.indices.get(index="articles")
print(index_info)
Updating a Mapping
new_mapping = {
"properties": {
"category": {"type": "keyword"}
}
}
es.indices.put_mapping(index="articles", body=new_mapping)
Document Operations
Indexing a Document
doc = {
"title": "Elasticsearch for Beginners",
"tags": ["search", "data"],
"published": "2024-01-01",
"content": "Comprehensive guide to working with Elasticsearch",
"author": "Ivan Ivanov",
"views": 150
}
es.index(index="articles", id=1, document=doc)
Creating a Document (if it does not exist)
from elasticsearch.exceptions import ConflictError

try:
    es.create(index="articles", id=2, document=doc)
except ConflictError:
    print("Document already exists")
Retrieving a Document
doc = es.get(index="articles", id=1)
print(doc["_source"])
Updating a Document
# Partial update
es.update(index="articles", id=1, doc={"title": "Updated Title"})
# Scripted update
es.update(
index="articles",
id=1,
script={
"source": "ctx._source.views += params.increment",
"params": {"increment": 10}
}
)
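When the document may not exist yet, the scripted update above can be turned into an upsert: the script runs if the document exists, otherwise the provided initial document is indexed. A minimal sketch (the id and initial document are illustrative; the request shape is standard Elasticsearch upsert syntax):

```python
# Script applied when the document exists
upsert_script = {
    "source": "ctx._source.views += params.increment",
    "params": {"increment": 10},
}
# Document indexed when it does not exist yet
initial_doc = {"title": "New Article", "views": 0}

# With a live cluster this would be:
# es.update(index="articles", id=42, script=upsert_script, upsert=initial_doc)
```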
Deleting a Document
es.delete(index="articles", id=1)
Checking Document Existence
if es.exists(index="articles", id=1):
    print("Document exists")
Search and Filtering
Simple Search
query = {
"query": {
"match": {
"title": "Elasticsearch"
}
}
}
response = es.search(index="articles", body=query)
Multi‑Field Search
query = {
"query": {
"multi_match": {
"query": "search data",
"fields": ["title", "content"]
}
}
}
Filtering and Boolean Queries
query = {
"query": {
"bool": {
"must": [
{"match": {"title": "search"}}
],
"filter": [
{"term": {"tags": "data"}},
{"range": {"published": {"gte": "2024-01-01"}}}
],
"must_not": [
{"term": {"author": "anonymous"}}
]
}
}
}
Term Query (Exact Match)
query = {
"query": {
"term": {
"tags": "data"
}
}
}
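A pitfall worth spelling out: term queries skip analysis, so they behave very differently on text fields. A small illustration using the fields from the earlier mapping:

```python
# term is not analyzed: it compares the raw value against indexed tokens,
# so it suits keyword fields such as "tags" and "author" above.
term_query = {"query": {"term": {"tags": "data"}}}

# match runs the query through the field's analyzer, so it is the right
# choice for text fields like "title". A term query on "title" with the
# value "Search" would likely miss, because the index stores the
# lowercased token "search".
match_query = {"query": {"match": {"title": "Search"}}}
```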
Phrase Search
query = {
"query": {
"match_phrase": {
"title": "search data"
}
}
}
Wildcard Search
query = {
"query": {
"wildcard": {
"title": "*search*"
}
}
}
Pagination
response = es.search(
index="articles",
body=query,
from_=0,
size=10
)
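from_/size pagination is capped by index.max_result_window (10,000 hits by default). For deeper paging, Elasticsearch recommends search_after, which continues from the sort values of the last hit. A pure-Python sketch of building the successive request bodies (the unique "id" tiebreaker field is an assumption, not part of the earlier mapping):

```python
def next_page_body(base_query, after=None, size=10):
    """Build a search body for search_after pagination.

    Assumes a deterministic sort whose last key is unique; here a
    hypothetical unique keyword field "id" serves as the tiebreaker.
    """
    body = {
        "query": base_query,
        "sort": [{"published": "desc"}, {"id": "asc"}],
        "size": size,
    }
    if after is not None:
        # "after" is the "sort" array of the previous page's last hit:
        # response["hits"]["hits"][-1]["sort"]
        body["search_after"] = after
    return body

first = next_page_body({"match_all": {}})
second = next_page_body({"match_all": {}}, after=["2024-01-01", "41"])
```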
Sorting Results
query = {
"query": {"match_all": {}},
"sort": [
{"published": {"order": "desc"}},
{"views": {"order": "desc"}}
]
}
Analyzers and Full‑Text Search
Creating an Index with a Custom Analyzer
settings = {
    "settings": {
        "analysis": {
            "analyzer": {
                "russian_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "russian_stop",
                        "russian_stemmer"
                    ]
                }
            },
            "filter": {
                "russian_stop": {
                    "type": "stop",
                    "stopwords": "_russian_"
                },
                "russian_stemmer": {
                    "type": "stemmer",
                    "language": "russian"
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "russian_analyzer"
            }
        }
    }
}
es.indices.create(index="articles", body=settings)
Fuzzy Search
query = {
"query": {
"fuzzy": {
"title": {
"value": "elasticsearh",
"fuzziness": "AUTO"
}
}
}
}
Autocomplete Search
query = {
"query": {
"match_phrase_prefix": {
"title": "search da"
}
}
}
Aggregations and Analytics
Terms Aggregation
query = {
"size": 0,
"aggs": {
"tags_count": {
"terms": {
"field": "tags",
"size": 10
}
}
}
}
Statistical Aggregation
query = {
"size": 0,
"aggs": {
"views_stats": {
"stats": {
"field": "views"
}
}
}
}
Date Histogram
query = {
"size": 0,
"aggs": {
"articles_over_time": {
"date_histogram": {
"field": "published",
"calendar_interval": "month"
}
}
}
}
Nested Aggregations
query = {
"size": 0,
"aggs": {
"authors": {
"terms": {
"field": "author"
},
"aggs": {
"avg_views": {
"avg": {
"field": "views"
}
}
}
}
}
}
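The buckets come back under the aggregations key of the search response. A sketch of unpacking them, using a handmade stand-in for what es.search would return for the author aggregation above (the author names and numbers are invented):

```python
# Trimmed stand-in for the "aggregations" part of a real search response
# to the author / avg_views aggregation above
response = {
    "aggregations": {
        "authors": {
            "buckets": [
                {"key": "Ivan Ivanov", "doc_count": 3, "avg_views": {"value": 120.0}},
                {"key": "Anna Petrova", "doc_count": 1, "avg_views": {"value": 80.0}},
            ]
        }
    }
}

def author_stats(resp):
    """Flatten the terms buckets into (author, doc_count, avg_views) tuples."""
    return [
        (b["key"], b["doc_count"], b["avg_views"]["value"])
        for b in resp["aggregations"]["authors"]["buckets"]
    ]

for author, count, avg in author_stats(response):
    print(f"{author}: {count} articles, {avg:.0f} avg views")
```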
Bulk API Operations
Bulk Data Indexing
from elasticsearch.helpers import bulk

actions = [
    {
        "_index": "articles",
        "_id": i,
        "_source": {
            "title": f"Article {i}",
            "content": f"Content of article {i}",
            "published": "2024-01-01"
        }
    }
    for i in range(1000)
]
bulk(es, actions)
Bulk Updates
actions = [
    {
        "_op_type": "update",
        "_index": "articles",
        "_id": i,
        # For update actions the helpers expect "doc" (or "script") at the
        # top level of the action, not wrapped in "_source"
        "doc": {"views": i * 10}
    }
    for i in range(1, 100)
]
bulk(es, actions)
Streaming Bulk Loading
from elasticsearch.helpers import streaming_bulk

def generate_docs():
    for i in range(10000):
        yield {
            "_index": "articles",
            "_id": i,
            "_source": {
                "title": f"Article {i}",
                "content": f"Content {i}"
            }
        }

for success, info in streaming_bulk(es, generate_docs(), chunk_size=100):
    if not success:
        print(f"Error: {info}")
Scroll API for Large Result Sets
Exporting Data with Scroll
def scroll_documents(index, query, scroll_size=1000):
    data = es.search(
        index=index,
        body=query,
        scroll='2m',
        size=scroll_size
    )
    scroll_id = data['_scroll_id']
    batch_size = len(data['hits']['hits'])
    while batch_size > 0:
        # Process current batch
        for hit in data['hits']['hits']:
            yield hit['_source']
        # Fetch next batch
        data = es.scroll(scroll_id=scroll_id, scroll='2m')
        scroll_id = data['_scroll_id']
        batch_size = len(data['hits']['hits'])
    # Clean up the scroll context
    es.clear_scroll(scroll_id=scroll_id)

# Note: elasticsearch.helpers.scan() wraps this scroll pattern in a helper.

# Example usage
query = {"query": {"match_all": {}}}
for doc in scroll_documents("articles", query):
    print(doc)
Error Handling and Exceptions
from elasticsearch.exceptions import (
    NotFoundError,
    RequestError,
    ConflictError,
    ConnectionError,
    TransportError
)

try:
    es.get(index="articles", id=999)
except NotFoundError:
    print("Document not found")
except RequestError as e:
    print(f"Request error: {e}")
except ConnectionError:
    print("Could not connect to Elasticsearch")
except TransportError as e:
    print(f"Transport error: {e}")
Cluster Monitoring and Statistics
Retrieving Cluster Information
# Cluster health
cluster_health = es.cluster.health()
print(f"Cluster status: {cluster_health['status']}")
# Node statistics
nodes_stats = es.nodes.stats()
print(f"Number of nodes: {len(nodes_stats['nodes'])}")
# Index statistics
indices_stats = es.indices.stats()
print(f"Total document count: {indices_stats['_all']['total']['docs']['count']}")
Integration with Popular Frameworks
Integration with Django
# settings.py (configuration in the style of the django-elasticsearch-dsl package)
ELASTICSEARCH_DSL = {
    'default': {
        'hosts': 'http://localhost:9200'
    },
}

# models.py
from django.db import models

class Article(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    published = models.DateTimeField()

# documents.py (maps the model to an index using the elasticsearch-dsl library)
from elasticsearch_dsl import Document, Text, Keyword, Date

class ArticleDocument(Document):
    title = Text()
    content = Text()
    published = Date()

    class Index:
        name = 'articles'

    def save(self, **kwargs):
        # Hook for custom logic before the document is written
        return super().save(**kwargs)
Integration with FastAPI
from fastapi import FastAPI, Depends
from elasticsearch import Elasticsearch

app = FastAPI()

def get_es():
    return Elasticsearch("http://localhost:9200")

# A plain `def` endpoint: FastAPI runs it in a threadpool, so the synchronous
# client does not block the event loop (use AsyncElasticsearch if you need
# an `async def` endpoint)
@app.get("/search/")
def search_articles(q: str, es: Elasticsearch = Depends(get_es)):
    query = {
        "query": {
            "multi_match": {
                "query": q,
                "fields": ["title", "content"]
            }
        }
    }
    response = es.search(index="articles", body=query)
    return response["hits"]["hits"]
Integration with Flask
from flask import Flask, request, jsonify
from elasticsearch import Elasticsearch

app = Flask(__name__)
es = Elasticsearch("http://localhost:9200")

@app.route('/search')
def search():
    query = request.args.get('q', '')
    search_query = {
        "query": {
            "multi_match": {
                "query": query,
                "fields": ["title", "content"]
            }
        }
    }
    response = es.search(index="articles", body=search_query)
    return jsonify(response["hits"]["hits"])
Integration with Pandas and Data Analysis
import pandas as pd
import matplotlib.pyplot as plt

# Export data to a DataFrame (a single search returns at most 10,000 hits
# by default; use helpers.scan for a full export)
def es_to_dataframe(index, query):
    response = es.search(index=index, body=query, size=10000)
    docs = [hit["_source"] for hit in response["hits"]["hits"]]
    return pd.DataFrame(docs)

# Example usage
query = {"query": {"match_all": {}}}
df = es_to_dataframe("articles", query)

# Data analysis
print(df.describe())
print(df.groupby('author')['views'].mean())

# Visualization
df.groupby('author')['views'].sum().plot(kind='bar')
plt.show()
Comprehensive Method Reference for elasticsearch-py
| Category | Method | Description | Example |
|---|---|---|---|
| Connection | Elasticsearch() | Create a client instance | es = Elasticsearch("http://localhost:9200") |
| Documents | index() | Index a document | es.index(index="test", id=1, document=doc) |
| | create() | Create a document | es.create(index="test", id=1, document=doc) |
| | get() | Retrieve a document | es.get(index="test", id=1) |
| | get_source() | Fetch only the source | es.get_source(index="test", id=1) |
| | exists() | Check existence | es.exists(index="test", id=1) |
| | update() | Update a document | es.update(index="test", id=1, doc={"field": "value"}) |
| | delete() | Delete a document | es.delete(index="test", id=1) |
| | update_by_query() | Update by query | es.update_by_query(index="test", body=query) |
| | delete_by_query() | Delete by query | es.delete_by_query(index="test", body=query) |
| Search | search() | Search documents | es.search(index="test", body=query) |
| | msearch() | Multi-search | es.msearch(body=queries) |
| | count() | Count documents | es.count(index="test", body=query) |
| | scroll() | Scroll through results | es.scroll(scroll_id=scroll_id, scroll="2m") |
| | clear_scroll() | Clear scroll context | es.clear_scroll(scroll_id=scroll_id) |
| Indices | indices.create() | Create an index | es.indices.create(index="test", body=settings) |
| | indices.delete() | Delete an index | es.indices.delete(index="test") |
| | indices.exists() | Check index existence | es.indices.exists(index="test") |
| | indices.get() | Retrieve index information | es.indices.get(index="test") |
| | indices.put_settings() | Update index settings | es.indices.put_settings(index="test", body=settings) |
| | indices.get_settings() | Get index settings | es.indices.get_settings(index="test") |
| | indices.put_mapping() | Update mapping | es.indices.put_mapping(index="test", body=mapping) |
| | indices.get_mapping() | Retrieve mapping | es.indices.get_mapping(index="test") |
| | indices.refresh() | Refresh an index | es.indices.refresh(index="test") |
| | indices.flush() | Flush to disk | es.indices.flush(index="test") |
| | indices.close() | Close an index | es.indices.close(index="test") |
| | indices.open() | Open an index | es.indices.open(index="test") |
| Cluster | cluster.health() | Cluster health status | es.cluster.health() |
| | cluster.stats() | Cluster statistics | es.cluster.stats() |
| | cluster.state() | Cluster state | es.cluster.state() |
| | cluster.pending_tasks() | Pending tasks | es.cluster.pending_tasks() |
| Nodes | nodes.info() | Node information | es.nodes.info() |
| | nodes.stats() | Node statistics | es.nodes.stats() |
| | nodes.hot_threads() | Hot threads | es.nodes.hot_threads() |
| Bulk Operations | bulk() | Bulk request | es.bulk(body=actions) |
| | mget() | Multi-get documents | es.mget(body={"docs": docs}) |
| Helpers | helpers.bulk() | Simplified bulk operations | bulk(es, actions) |
| | helpers.streaming_bulk() | Streaming bulk | streaming_bulk(es, actions) |
| | helpers.parallel_bulk() | Parallel bulk | parallel_bulk(es, actions) |
| | helpers.scan() | Iterate over all results | scan(es, query=query, index="test") |
| | helpers.reindex() | Reindex data | reindex(es, source_index="old", target_index="new") |
| Templates | indices.put_template() | Create a template | es.indices.put_template(name="template", body=template) |
| | indices.get_template() | Retrieve a template | es.indices.get_template(name="template") |
| | indices.delete_template() | Delete a template | es.indices.delete_template(name="template") |
| Aliases | indices.put_alias() | Create an alias | es.indices.put_alias(index="test", name="alias") |
| | indices.get_alias() | Get an alias | es.indices.get_alias(name="alias") |
| | indices.delete_alias() | Delete an alias | es.indices.delete_alias(index="test", name="alias") |
Security and Authentication
Configuring SSL/TLS
es = Elasticsearch(
"https://localhost:9200",
ca_certs="/path/to/ca.crt",
client_cert="/path/to/client.crt",
client_key="/path/to/client.key",
verify_certs=True
)
Using API Keys
es = Elasticsearch(
"https://localhost:9200",
api_key=("api_key_id", "api_key_secret"),
verify_certs=True
)
Setting Up Roles and Permissions
# Create a role
role_body = {
"cluster": ["monitor"],
"indices": [
{
"names": ["articles*"],
"privileges": ["read", "write", "create_index"]
}
]
}
es.security.put_role(name="article_writer", body=role_body)
Testing with elasticsearch-py
Test Environment Setup
import pytest
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

@pytest.fixture(scope="session")
def es_client():
    return Elasticsearch("http://localhost:9200")

@pytest.fixture(scope="function")
def test_index(es_client):
    index_name = "test_articles"
    # Create a test index
    es_client.indices.create(index=index_name, ignore=400)
    yield index_name
    # Clean up after the test
    es_client.indices.delete(index=index_name, ignore=[400, 404])

def test_document_indexing(es_client, test_index):
    doc = {"title": "Test Article", "content": "Test content"}
    response = es_client.index(index=test_index, id=1, document=doc)
    assert response["result"] == "created"
    # Refresh so the document is immediately searchable
    es_client.indices.refresh(index=test_index)
    # Verify the document is searchable
    search_result = es_client.search(
        index=test_index,
        body={"query": {"match": {"title": "Test"}}}
    )
    assert search_result["hits"]["total"]["value"] == 1
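Search logic can also be unit-tested without a live cluster by replacing the client with a mock. A sketch using unittest.mock (search_titles is a hypothetical application helper, not part of elasticsearch-py):

```python
from unittest.mock import MagicMock

def search_titles(es, text):
    """Hypothetical application helper: return matching article titles."""
    resp = es.search(index="articles",
                     body={"query": {"match": {"title": text}}})
    return [hit["_source"]["title"] for hit in resp["hits"]["hits"]]

# Stand-in client: no cluster needed
es = MagicMock()
es.search.return_value = {
    "hits": {"hits": [{"_source": {"title": "Elasticsearch for Beginners"}}]}
}

titles = search_titles(es, "Elasticsearch")
assert titles == ["Elasticsearch for Beginners"]
es.search.assert_called_once()
```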
Using Docker for Tests
# docker-compose.test.yml
version: '3.8'
services:
  elasticsearch:
    image: elasticsearch:8.5.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    mem_limit: 1g
Performance Optimization
Connection Pool Settings
es = Elasticsearch(
    "http://localhost:9200",
    max_retries=3,
    retry_on_timeout=True,
    request_timeout=30,  # named `timeout` in 7.x clients
    maxsize=25  # per-node connection-pool size (7.x client option)
)
Choosing the HTTP Transport Implementation
from elasticsearch import Elasticsearch

# In elasticsearch-py 8.x, HTTP is handled by the elastic-transport package.
# node_class selects the implementation: "urllib3" (the default) or
# "requests" (requires the requests package to be installed).
es = Elasticsearch(
    "https://localhost:9200",
    node_class="requests"
)
Asynchronous Client
from elasticsearch import AsyncElasticsearch  # requires: pip install elasticsearch[async]
import asyncio

async def async_search():
    es = AsyncElasticsearch("http://localhost:9200")
    response = await es.search(
        index="articles",
        body={"query": {"match_all": {}}}
    )
    await es.close()
    return response

# Run the async function
asyncio.run(async_search())
Comparison with Alternative Solutions
| Solution | Search | Aggregations | Speed | Scalability | Integration |
|---|---|---|---|---|---|
| Elasticsearch | Excellent | Excellent | Very high | Excellent | Broad |
| PostgreSQL FTS | Good | Limited | Medium | Good | Standard |
| Solr | Excellent | Good | High | Good | Good |
| Typesense | Good | Partial | Very high | Good | REST API |
| Meilisearch | Good | Limited | Very high | Limited | Simple |
| Sphinx | Good | Basic | High | Medium | Limited |
Monitoring and Debugging
Request Logging
import logging

# The client logs through the standard logging module; there is no
# `logging=True` constructor flag. Enable the relevant loggers instead:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("elasticsearch").setLevel(logging.DEBUG)
# In 8.x the HTTP layer logs under "elastic_transport":
logging.getLogger("elastic_transport").setLevel(logging.DEBUG)

es = Elasticsearch("http://localhost:9200")
Query Profiling
query = {
"profile": True,
"query": {
"match": {
"title": "elasticsearch"
}
}
}
response = es.search(index="articles", body=query)
print(response["profile"])
Frequently Asked Questions
What is elasticsearch-py?
elasticsearch-py is the official Python client for Elasticsearch. It provides full access to the Elasticsearch REST API through a convenient Python interface.
Can Elasticsearch be used as a primary database?
Elasticsearch is not recommended as the primary store for mission-critical data. It excels as a search and analytics layer alongside a system of record such as a relational database.
Does elasticsearch-py support cluster operations?
Yes. With sniffing enabled (for example via the sniff_on_start option), the client discovers cluster nodes, distributes requests across them, and retries against other nodes when one becomes unavailable.
How can I efficiently load massive data volumes?
Use the Bulk API via the bulk() helper function. It batches many operations into a single HTTP request for high throughput.
How do I perform complex aggregations?
Aggregations are defined under the aggs key in the request body. Elasticsearch supports a wide range of aggregation types: terms, stats, date_histogram, nested, and more.
Is Elasticsearch suitable for autocomplete?
Absolutely. Implement autocomplete with match_phrase_prefix, the completion suggester, or the search_as_you_type field type.
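As an illustration of the last option, search_as_you_type pairs a dedicated field type with a bool_prefix multi_match over its generated subfields. The request bodies below follow standard Elasticsearch syntax; the index name is illustrative:

```python
# Mapping: search_as_you_type generates auxiliary subfields (_2gram,
# _3gram, _index_prefix) that make prefix matching fast
autocomplete_mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "search_as_you_type"}
        }
    }
}

# Query: bool_prefix treats the last term as a prefix ("da" matches "data")
autocomplete_query = {
    "query": {
        "multi_match": {
            "query": "search da",
            "type": "bool_prefix",
            "fields": ["title", "title._2gram", "title._3gram"],
        }
    }
}

# With a live cluster:
# es.indices.create(index="articles_autocomplete", body=autocomplete_mapping)
# es.search(index="articles_autocomplete", body=autocomplete_query)
```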
How can I secure my Elasticsearch deployment?
Use HTTPS connections, enable authentication (basic auth or API keys), restrict access by IP, and configure role‑based access control.
What alternatives exist to elasticsearch-py?
High‑level libraries such as elasticsearch-dsl-py simplify query building, but elasticsearch-py remains the core official client.
How do I monitor query performance?
Enable the profile: true parameter in queries, turn on client logging, and track cluster metrics via the stats APIs.
Is elasticsearch-py production‑ready?
Yes. The client is stable for production use. Ensure proper connection pooling, robust error handling, and comprehensive monitoring.