elasticsearch-py: Working with Elasticsearch in Python

Introduction

Elasticsearch is a powerful distributed search engine built on Apache Lucene, designed for full‑text search, logging, monitoring, and analytics over large data volumes. The official elasticsearch-py client provides complete access to the Elasticsearch APIs from Python code.

The elasticsearch-py library enables developers to implement advanced search, aggregations, and large‑scale data processing directly in Python applications, including web services, analytics tools, and monitoring systems.

Library Overview: elasticsearch-py

elasticsearch-py is the official Python client for interacting with Elasticsearch. The library offers low‑level access to every Elasticsearch feature via the RESTful API while delivering a convenient Pythonic interface.

Key Features

  • Official support: Developed and maintained by the Elastic team
  • Version compatibility: Client releases track Elasticsearch versions; use the client major version that matches your cluster
  • Automatic node discovery: Detects and connects to cluster nodes without manual configuration
  • Connection pooling: Efficiently manages connections to the cluster
  • Error handling: Built‑in exception management
  • Serialization: Automatic conversion of Python objects to JSON
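
The serialization point can be illustrated with a simplified sketch (this is not the client's actual code, just the idea): Python dicts become JSON request bodies, with date objects rendered in ISO format.

```python
import json
from datetime import date

def to_json(doc):
    # Simplified sketch of what the client's serializer does:
    # dicts become JSON bodies, date objects are rendered in ISO format.
    return json.dumps(doc, default=lambda o: o.isoformat())

body = to_json({"title": "Hello", "published": date(2024, 1, 1)})
print(body)  # {"title": "Hello", "published": "2024-01-01"}
```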

Installation and Cluster Connection

Installation

pip install elasticsearch

SSL/TLS support is built in. For the asynchronous client, install the async extra:

pip install "elasticsearch[async]"

Connecting to a Local Cluster

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

Connecting to a Remote Cluster with Basic Authentication

es = Elasticsearch(
    "https://your-cluster:9200",
    basic_auth=("user", "password"),
    verify_certs=True
)

Connecting Using API Keys

es = Elasticsearch(
    "https://your-cluster:9200",
    api_key="your_api_key",
    verify_certs=True
)

Connecting to Multiple Nodes

es = Elasticsearch(
    [
        "https://node1:9200",
        "https://node2:9200",
        "https://node3:9200"
    ],
    basic_auth=("user", "password")
)

Core Elasticsearch Concepts

  • Index — the equivalent of a table in relational databases; stores documents with similar structure
  • Document — a JSON object stored in an index, analogous to a row
  • Field — an attribute of a document, similar to a column
  • Mapping — definition of a document’s structure and field types
  • Shard — the physical storage unit of an index
  • Replica — a copy of a shard that provides high availability
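
To make the terms concrete, here is a hypothetical article document: the dict itself is the document, each key is a field, and an index (say, articles) would hold many such documents.

```python
# A single document: the dict is the document, its keys are the fields.
doc = {
    "title": "Elasticsearch for Beginners",  # text field: analyzed for full-text search
    "tags": ["search", "data"],              # keyword field: exact values, may be a list
    "views": 150,                            # integer field
}

print(sorted(doc))  # ['tags', 'title', 'views']
```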

Index Management

Creating Indices and Mappings

mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text", "analyzer": "russian"},
            "tags": {"type": "keyword"},
            "published": {"type": "date"},
            "content": {"type": "text"},
            "author": {"type": "keyword"},
            "views": {"type": "integer"}
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}

es.indices.create(index="articles", body=mapping)

Deleting an Index

es.indices.delete(index="articles")

Checking Index Existence

if es.indices.exists(index="articles"):
    print("Index exists")

Retrieving Index Information

index_info = es.indices.get(index="articles")
print(index_info)

Updating a Mapping

new_mapping = {
    "properties": {
        "category": {"type": "keyword"}
    }
}

es.indices.put_mapping(index="articles", body=new_mapping)

Document Operations

Indexing a Document

doc = {
    "title": "Elasticsearch for Beginners",
    "tags": ["search", "data"],
    "published": "2024-01-01",
    "content": "Comprehensive guide to working with Elasticsearch",
    "author": "Ivan Ivanov",
    "views": 150
}

es.index(index="articles", id=1, document=doc)

Creating a Document (if it does not exist)

from elasticsearch.exceptions import ConflictError

try:
    es.create(index="articles", id=2, document=doc)
except ConflictError:
    print("Document already exists")

Retrieving a Document

doc = es.get(index="articles", id=1)
print(doc["_source"])

Updating a Document

# Partial update
es.update(index="articles", id=1, doc={"title": "Updated Title"})

# Scripted update
es.update(
    index="articles",
    id=1,
    script={
        "source": "ctx._source.views += params.increment",
        "params": {"increment": 10}
    }
)

Deleting a Document

es.delete(index="articles", id=1)

Checking Document Existence

if es.exists(index="articles", id=1):
    print("Document exists")

Search and Filtering

Simple Search

query = {
    "query": {
        "match": {
            "title": "Elasticsearch"
        }
    }
}

response = es.search(index="articles", body=query)
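
es.search() returns a dict; the matching documents sit under response["hits"]["hits"], each carrying its _source payload. A small helper (the name is ours) pulls them out, shown here against a stub of the standard response shape:

```python
def extract_sources(response):
    """Pull the _source payload out of each search hit."""
    return [hit["_source"] for hit in response["hits"]["hits"]]

# A stub of the response shape es.search() returns:
sample = {
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [
            {"_index": "articles", "_id": "1",
             "_source": {"title": "Elasticsearch for Beginners"}}
        ],
    }
}

print(extract_sources(sample))  # [{'title': 'Elasticsearch for Beginners'}]
```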

Multi‑Field Search

query = {
    "query": {
        "multi_match": {
            "query": "search data",
            "fields": ["title", "content"]
        }
    }
}

Filtering and Boolean Queries

query = {
    "query": {
        "bool": {
            "must": [
                {"match": {"title": "search"}}
            ],
            "filter": [
                {"term": {"tags": "data"}},
                {"range": {"published": {"gte": "2024-01-01"}}}
            ],
            "must_not": [
                {"term": {"author": "anonymous"}}
            ]
        }
    }
}

Term Query (Exact Match)

query = {
    "query": {
        "term": {
            "tags": "data"
        }
    }
}

Phrase Search

query = {
    "query": {
        "match_phrase": {
            "title": "search data"
        }
    }
}

Wildcard Search

query = {
    "query": {
        "wildcard": {
            # Leading wildcards are expensive: they scan every term in the field
            "title": "*search*"
        }
    }
}

Pagination

response = es.search(
    index="articles",
    body=query,
    from_=0,
    size=10
)
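
from_ is a zero-based offset, so a 1-based page number maps to from_ = (page - 1) * size. A hypothetical helper makes the arithmetic explicit (note that from_ + size is capped at 10,000 by default; use search_after for deeper paging):

```python
def page_params(page, per_page=10):
    """Translate a 1-based page number into from_/size arguments for es.search()."""
    return {"from_": (page - 1) * per_page, "size": per_page}

print(page_params(1))               # {'from_': 0, 'size': 10}
print(page_params(3, per_page=20))  # {'from_': 40, 'size': 20}
```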

Sorting Results

query = {
    "query": {"match_all": {}},
    "sort": [
        {"published": {"order": "desc"}},
        {"views": {"order": "desc"}}
    ]
}

Analyzers and Full‑Text Search

Creating an Index with a Custom Analyzer

settings = {
    "settings": {
        "analysis": {
            "analyzer": {
                "russian_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "russian_stop",
                        "russian_stemmer"
                    ]
                }
            },
            "filter": {
                "russian_stop": {
                    "type": "stop",
                    "stopwords": "_russian_"
                },
                "russian_stemmer": {
                    "type": "stemmer",
                    "language": "russian"
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "russian_analyzer"
            }
        }
    }
}

es.indices.create(index="articles", body=settings)

Fuzzy Search

query = {
    "query": {
        "fuzzy": {
            "title": {
                "value": "serach",  # intentionally misspelled: fuzzy matching still finds "search"
                "fuzziness": "AUTO"
            }
        }
    }
}

Autocomplete Search

query = {
    "query": {
        "match_phrase_prefix": {
            "title": "search da"
        }
    }
}

Aggregations and Analytics

Terms Aggregation

query = {
    "size": 0,
    "aggs": {
        "tags_count": {
            "terms": {
                "field": "tags",
                "size": 10
            }
        }
    }
}

Statistical Aggregation

query = {
    "size": 0,
    "aggs": {
        "views_stats": {
            "stats": {
                "field": "views"
            }
        }
    }
}

Date Histogram

query = {
    "size": 0,
    "aggs": {
        "articles_over_time": {
            "date_histogram": {
                "field": "published",
                "calendar_interval": "month"
            }
        }
    }
}

Nested Aggregations

query = {
    "size": 0,
    "aggs": {
        "authors": {
            "terms": {
                "field": "author"
            },
            "aggs": {
                "avg_views": {
                    "avg": {
                        "field": "views"
                    }
                }
            }
        }
    }
}
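
The nested aggregation comes back under response["aggregations"], one bucket per author with the sub-aggregation nested inside it. Flattening that structure reduces to a few lines (the helper name and sample values are ours):

```python
def avg_views_by_author(response):
    """Flatten the nested aggregation into {author: avg_views}."""
    buckets = response["aggregations"]["authors"]["buckets"]
    return {b["key"]: b["avg_views"]["value"] for b in buckets}

# A stub of the aggregation part of the response:
sample = {
    "aggregations": {
        "authors": {
            "buckets": [
                {"key": "Ivan Ivanov", "doc_count": 3,
                 "avg_views": {"value": 120.0}},
                {"key": "Anna Petrova", "doc_count": 1,
                 "avg_views": {"value": 45.0}},
            ]
        }
    }
}

print(avg_views_by_author(sample))
# {'Ivan Ivanov': 120.0, 'Anna Petrova': 45.0}
```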

Bulk API Operations

Bulk Data Indexing

from elasticsearch.helpers import bulk

actions = [
    {
        "_index": "articles",
        "_id": i,
        "_source": {
            "title": f"Article {i}",
            "content": f"Content of article {i}",
            "published": "2024-01-01"
        }
    }
    for i in range(1000)
]

bulk(es, actions)

Bulk Updates

actions = [
    {
        "_op_type": "update",
        "_index": "articles",
        "_id": i,
        "_source": {
            "doc": {"views": i * 10}
        }
    }
    for i in range(1, 100)
]

bulk(es, actions)

Streaming Bulk Loading

from elasticsearch.helpers import streaming_bulk

def generate_docs():
    for i in range(10000):
        yield {
            "_index": "articles",
            "_id": i,
            "_source": {
                "title": f"Article {i}",
                "content": f"Content {i}"
            }
        }

for success, info in streaming_bulk(es, generate_docs(), chunk_size=100):
    if not success:
        print(f"Error: {info}")

Scroll API for Large Result Sets

Exporting Data with Scroll

def scroll_documents(index, query, scroll_size=1000):
    data = es.search(
        index=index,
        body=query,
        scroll='2m',
        size=scroll_size
    )
    
    scroll_id = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    
    while scroll_size > 0:
        # Process current batch
        for hit in data['hits']['hits']:
            yield hit['_source']
        
        # Fetch next batch
        data = es.scroll(scroll_id=scroll_id, scroll='2m')
        scroll_id = data['_scroll_id']
        scroll_size = len(data['hits']['hits'])
    
    # Clean up scroll context
    es.clear_scroll(scroll_id=scroll_id)

# Example usage
query = {"query": {"match_all": {}}}
for doc in scroll_documents("articles", query):
    print(doc)

Error Handling and Exceptions

from elasticsearch.exceptions import (
    NotFoundError,
    RequestError,
    ConflictError,
    ConnectionError,
    TransportError
)

try:
    es.get(index="articles", id=999)
except NotFoundError:
    print("Document not found")
except RequestError as e:
    print(f"Request error: {e}")
except ConnectionError:
    print("Connection error with Elasticsearch")
except TransportError as e:
    print(f"Transport error: {e}")

Cluster Monitoring and Statistics

Retrieving Cluster Information

# Cluster health
cluster_health = es.cluster.health()
print(f"Cluster status: {cluster_health['status']}")

# Node statistics
nodes_stats = es.nodes.stats()
print(f"Number of nodes: {len(nodes_stats['nodes'])}")

# Index statistics
indices_stats = es.indices.stats()
print(f"Total document count: {indices_stats['_all']['total']['docs']['count']}")

Integration with Popular Frameworks

Integration with Django

# settings.py (this setting is read by the django-elasticsearch-dsl package)
ELASTICSEARCH_DSL = {
    'default': {
        'hosts': 'http://localhost:9200'
    },
}

# models.py
from django.db import models

class Article(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    published = models.DateTimeField()

# documents.py
from elasticsearch_dsl import Document, Text, Date

class ArticleDocument(Document):
    title = Text()
    content = Text()
    published = Date()

    class Index:
        name = 'articles'

Integration with FastAPI

from fastapi import FastAPI, Depends
from elasticsearch import Elasticsearch

app = FastAPI()

def get_es():
    return Elasticsearch("http://localhost:9200")

@app.get("/search/")
async def search_articles(q: str, es: Elasticsearch = Depends(get_es)):
    query = {
        "query": {
            "multi_match": {
                "query": q,
                "fields": ["title", "content"]
            }
        }
    }
    
    response = es.search(index="articles", body=query)
    return response["hits"]["hits"]

Integration with Flask

from flask import Flask, request, jsonify
from elasticsearch import Elasticsearch

app = Flask(__name__)
es = Elasticsearch("http://localhost:9200")

@app.route('/search')
def search():
    query = request.args.get('q', '')
    
    search_query = {
        "query": {
            "multi_match": {
                "query": query,
                "fields": ["title", "content"]
            }
        }
    }
    
    response = es.search(index="articles", body=search_query)
    return jsonify(response["hits"]["hits"])

Integration with Pandas and Data Analysis

import pandas as pd
import matplotlib.pyplot as plt

# Export data to a DataFrame
def es_to_dataframe(index, query):
    response = es.search(index=index, body=query, size=10000)
    docs = [hit["_source"] for hit in response["hits"]["hits"]]
    return pd.DataFrame(docs)

# Example usage
query = {"query": {"match_all": {}}}
df = es_to_dataframe("articles", query)

# Data analysis
print(df.describe())
print(df.groupby('author')['views'].mean())

# Visualization
df.groupby('author')['views'].sum().plot(kind='bar')
plt.show()

Comprehensive Method Reference for elasticsearch-py

Category Method Description Example
Connection Elasticsearch() Create a client instance es = Elasticsearch("http://localhost:9200")
Documents index() Index a document es.index(index="test", id=1, document=doc)
  create() Create a document es.create(index="test", id=1, document=doc)
  get() Retrieve a document es.get(index="test", id=1)
  get_source() Fetch only the source es.get_source(index="test", id=1)
  exists() Check existence es.exists(index="test", id=1)
  update() Update a document es.update(index="test", id=1, doc={"field": "value"})
  delete() Delete a document es.delete(index="test", id=1)
  update_by_query() Update by query es.update_by_query(index="test", body=query)
  delete_by_query() Delete by query es.delete_by_query(index="test", body=query)
Search search() Search documents es.search(index="test", body=query)
  msearch() Multi‑search es.msearch(body=queries)
  count() Count documents es.count(index="test", body=query)
  scroll() Scroll through results es.scroll(scroll_id=scroll_id, scroll="2m")
  clear_scroll() Clear scroll context es.clear_scroll(scroll_id=scroll_id)
Indices indices.create() Create an index es.indices.create(index="test", body=settings)
  indices.delete() Delete an index es.indices.delete(index="test")
  indices.exists() Check index existence es.indices.exists(index="test")
  indices.get() Retrieve index settings es.indices.get(index="test")
  indices.put_settings() Update index settings es.indices.put_settings(index="test", body=settings)
  indices.get_settings() Get index settings es.indices.get_settings(index="test")
  indices.put_mapping() Update mapping es.indices.put_mapping(index="test", body=mapping)
  indices.get_mapping() Retrieve mapping es.indices.get_mapping(index="test")
  indices.refresh() Refresh an index es.indices.refresh(index="test")
  indices.flush() Flush to disk es.indices.flush(index="test")
  indices.close() Close an index es.indices.close(index="test")
  indices.open() Open an index es.indices.open(index="test")
Cluster cluster.health() Cluster health status es.cluster.health()
  cluster.stats() Cluster statistics es.cluster.stats()
  cluster.state() Cluster state es.cluster.state()
  cluster.pending_tasks() Pending tasks es.cluster.pending_tasks()
Nodes nodes.info() Node information es.nodes.info()
  nodes.stats() Node statistics es.nodes.stats()
  nodes.hot_threads() Hot threads es.nodes.hot_threads()
Bulk Operations bulk() Bulk request es.bulk(body=actions)
  mget() Multi‑get documents es.mget(body={"docs": docs})
Helpers helpers.bulk() Simplified bulk operations bulk(es, actions)
  helpers.streaming_bulk() Streaming bulk streaming_bulk(es, actions)
  helpers.parallel_bulk() Parallel bulk parallel_bulk(es, actions)
  helpers.scan() Scan results scan(es, query=query, index="test")
  helpers.reindex() Reindex data reindex(es, source_index="old", target_index="new")
Templates indices.put_template() Create a template es.indices.put_template(name="template", body=template)
  indices.get_template() Retrieve a template es.indices.get_template(name="template")
  indices.delete_template() Delete a template es.indices.delete_template(name="template")
Alias indices.put_alias() Create an alias es.indices.put_alias(index="test", name="alias")
  indices.get_alias() Get an alias es.indices.get_alias(name="alias")
  indices.delete_alias() Delete an alias es.indices.delete_alias(index="test", name="alias")

Security and Authentication

Configuring SSL/TLS

es = Elasticsearch(
    "https://localhost:9200",
    ca_certs="/path/to/ca.crt",
    client_cert="/path/to/client.crt",
    client_key="/path/to/client.key",
    verify_certs=True
)

Using API Keys

es = Elasticsearch(
    "https://localhost:9200",
    api_key=("api_key_id", "api_key_secret"),
    verify_certs=True
)

Setting Up Roles and Permissions

# Create a role
role_body = {
    "cluster": ["monitor"],
    "indices": [
        {
            "names": ["articles*"],
            "privileges": ["read", "write", "create_index"]
        }
    ]
}

es.security.put_role(name="article_writer", body=role_body)

Testing with elasticsearch-py

Test Environment Setup

import pytest
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

@pytest.fixture(scope="session")
def es_client():
    return Elasticsearch("http://localhost:9200")

@pytest.fixture(scope="function")
def test_index(es_client):
    index_name = "test_articles"
    
    # Create a test index
    es_client.indices.create(index=index_name, ignore=400)
    
    yield index_name
    
    # Clean up after test
    es_client.indices.delete(index=index_name, ignore=[400, 404])

def test_document_indexing(es_client, test_index):
    doc = {"title": "Test Article", "content": "Test content"}
    
    response = es_client.index(index=test_index, id=1, document=doc)
    assert response["result"] == "created"
    
    # Refresh for immediate search
    es_client.indices.refresh(index=test_index)
    
    # Verify document is searchable
    search_result = es_client.search(
        index=test_index,
        body={"query": {"match": {"title": "Test"}}}
    )
    assert search_result["hits"]["total"]["value"] == 1

Using Docker for Tests

# docker-compose.test.yml
version: '3.8'
services:
  elasticsearch:
    image: elasticsearch:8.5.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    mem_limit: 1g

Performance Optimization

Connection Pool Settings

es = Elasticsearch(
    "http://localhost:9200",
    max_retries=3,
    retry_on_timeout=True,
    timeout=30,
    maxsize=25
)

Using a Custom Connection Pool

# elasticsearch-py 7.x: pick the transport class and per-node pool size explicitly
from elasticsearch import Elasticsearch
from elasticsearch.connection import Urllib3HttpConnection

es = Elasticsearch(
    "https://localhost:9200",
    connection_class=Urllib3HttpConnection,
    maxsize=20  # connections kept open per node
)

# In the 8.x client the transport comes from the elastic-transport package
# and is tuned through its options rather than connection_class.

Asynchronous Client

from elasticsearch import AsyncElasticsearch
import asyncio

async def async_search():
    es = AsyncElasticsearch("http://localhost:9200")
    
    response = await es.search(
        index="articles",
        body={"query": {"match_all": {}}}
    )
    
    await es.close()
    return response

# Run the async function
asyncio.run(async_search())

Comparison with Alternative Solutions

Solution Search Aggregations Speed Scalability Integration
Elasticsearch Excellent Excellent Very high Excellent Broad
PostgreSQL FTS Good Limited Medium Good Standard
Solr Excellent Good High Good Good
Typesense Good Partial Very high Good REST API
Meilisearch Good Limited Very high Limited Simple
Sphinx Good Basic High Medium Limited

Monitoring and Debugging

Request Logging

import logging

# The client logs through the standard logging module; no special client
# flag is required, just raise the relevant logger levels.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('elasticsearch').setLevel(logging.DEBUG)
logging.getLogger('elastic_transport').setLevel(logging.DEBUG)  # 8.x transport logger

es = Elasticsearch("http://localhost:9200")

Query Profiling

query = {
    "profile": True,
    "query": {
        "match": {
            "title": "elasticsearch"
        }
    }
}

response = es.search(index="articles", body=query)
print(response["profile"])

Frequently Asked Questions

What is elasticsearch-py?

elasticsearch-py is the official Python client for Elasticsearch. It provides full access to the Elasticsearch REST API through a convenient Python interface.

Can Elasticsearch be used as a primary database?

Elasticsearch is not recommended as the primary store for mission‑critical data. It excels at search, analytics, and holding derived or log data, with the source of truth kept in a transactional database.

Does elasticsearch-py support cluster operations?

Yes. The client automatically discovers cluster nodes, balances load, and provides fault tolerance when individual nodes become unavailable.

How can I efficiently load massive data volumes?

Use the Bulk API via the bulk() helper function. It batches many operations into a single HTTP request for high throughput.
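
Internally the helper splits the action stream into batches (chunk_size, default 500) and sends each batch as one request. The batching idea reduces to a few lines (a simplified sketch, not the helper's actual code):

```python
def chunked(actions, chunk_size=500):
    """Group actions into batches, the way the bulk helpers do internally."""
    batch = []
    for action in actions:
        batch.append(action)
        if len(batch) == chunk_size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly short, batch
        yield batch

batches = list(chunked(range(1050), chunk_size=500))
print([len(b) for b in batches])  # [500, 500, 50]
```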

How do I perform complex aggregations?

Aggregations are defined under the aggs key in the request body. Elasticsearch supports a wide range of aggregation types: terms, stats, date_histogram, nested, and more.

Is Elasticsearch suitable for autocomplete?

Absolutely. Implement autocomplete with match_phrase_prefix, the completion suggester, or the search_as_you_type field type.
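
For the search_as_you_type route, the field type generates prefix subfields (title._2gram, title._3gram) that a bool_prefix multi_match then targets. The mapping and query bodies look like this (plain dicts, ready to pass to es.indices.create and es.search):

```python
# Index body: the search_as_you_type field builds n-gram subfields automatically.
autocomplete_mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "search_as_you_type"}
        }
    }
}

# Query body: bool_prefix matches terms in any order, treating the last as a prefix.
autocomplete_query = {
    "query": {
        "multi_match": {
            "query": "search da",
            "type": "bool_prefix",
            "fields": ["title", "title._2gram", "title._3gram"],
        }
    }
}
```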

How can I secure my Elasticsearch deployment?

Use HTTPS connections, enable authentication (basic auth or API keys), restrict access by IP, and configure role‑based access control.

What alternatives exist to elasticsearch-py?

High‑level libraries such as elasticsearch-dsl-py simplify query building, but elasticsearch-py remains the core official client.

How do I monitor query performance?

Enable the profile: true parameter in queries, turn on client logging, and track cluster metrics via the stats APIs.

Is elasticsearch-py production‑ready?

Yes. The client is stable for production use. Ensure proper connection pooling, robust error handling, and comprehensive monitoring.
