Docker-py – управление контейнерами Docker

онлайн тренажер по питону
Онлайн-тренажер Python для начинающих

Изучайте Python легко и без перегрузки теорией. Решайте практические задачи с автоматической проверкой, получайте подсказки на русском языке и пишите код прямо в браузере — без необходимости что-либо устанавливать.

Начать курс

Введение в Docker SDK for Python

Контейнеризация — краеугольный камень современной DevOps-инфраструктуры. Docker стал стандартом де-факто для изоляции приложений и управления средами. Однако, когда требуется более гибкое и программируемое управление контейнерами, в дело вступает Docker SDK for Python, более известный как docker-py.

Эта библиотека позволяет напрямую взаимодействовать с Docker-демоном через Python-код, управляя контейнерами, образами, сетями, томами и всем, что обычно делается из CLI. Благодаря этому docker-py широко используется в CI/CD пайплайнах, автоматизации, тестировании и динамическом оркестрационном контроле.

Что такое Docker SDK for Python

Docker SDK for Python представляет собой официальную Python-библиотеку, которая предоставляет высокоуровневый интерфейс для взаимодействия с Docker Engine API. Библиотека позволяет разработчикам интегрировать функциональность Docker непосредственно в Python-приложения, автоматизировать процессы развертывания и управления контейнерами.

Основные возможности библиотеки

Docker SDK предоставляет полный набор инструментов для работы с Docker:

  • Программное управление жизненным циклом контейнеров
  • Автоматизация сборки и развертывания образов
  • Конфигурация сетей и хранилищ данных
  • Интеграция с системами мониторинга и логирования
  • Поддержка Docker Swarm для оркестрации
  • Обработка событий Docker в реальном времени

Установка и первоначальная настройка

Установка библиотеки

Для установки Docker SDK используется стандартный менеджер пакетов pip:

pip install docker

Для установки с дополнительными зависимостями для поддержки SSH и TLS:

pip install docker[ssh,tls]

Подключение к Docker Engine

Базовое подключение к локальному Docker-демону:

import docker

# Автоматическое подключение через переменные окружения
client = docker.from_env()

# Явное указание параметров подключения
client = docker.DockerClient(base_url='unix://var/run/docker.sock')

# Подключение к удаленному Docker Engine
client = docker.DockerClient(base_url='tcp://192.168.1.100:2376')

Проверка подключения

try:
    # Проверка доступности Docker Engine
    print("Docker Engine доступен:", client.ping())
    
    # Получение информации о системе
    info = client.info()
    print(f"Версия Docker: {info['ServerVersion']}")
    print(f"Архитектура: {info['Architecture']}")
    
except docker.errors.DockerException as e:
    print(f"Ошибка подключения к Docker: {e}")

Архитектура и структура библиотеки

Docker SDK for Python построен как обертка над REST API Docker Engine, предоставляя как высокоуровневые, так и низкоуровневые интерфейсы для взаимодействия с Docker.

Структура модулей

Библиотека организована в несколько основных модулей:

  • docker.DockerClient — основной клиент для высокоуровневого взаимодействия
  • docker.APIClient — низкоуровневый API-клиент
  • docker.models — объектно-ориентированные модели для работы с ресурсами
  • docker.types — вспомогательные типы данных
  • docker.errors — исключения и обработка ошибок

Архитектурные принципы

Библиотека следует принципам REST API Docker Engine:

  • Каждый ресурс (контейнер, образ, сеть) представлен отдельным объектом
  • Операции выполняются через методы объектов
  • Поддерживается асинхронное выполнение операций
  • Автоматическая обработка соединений и переподключений

Управление контейнерами

Создание и запуск контейнеров

Базовый запуск контейнера:

# Простой запуск с автоматическим удалением
container = client.containers.run("nginx:latest", detach=True, remove=True)

# Запуск с дополнительными параметрами
container = client.containers.run(
    image="nginx:latest",
    name="my-nginx",
    ports={"80/tcp": 8080},
    environment={"ENV_VAR": "value"},
    volumes={"/host/path": {"bind": "/container/path", "mode": "rw"}},
    detach=True
)

Управление жизненным циклом

# Получение информации о контейнере
container = client.containers.get("my-nginx")
print(f"Статус: {container.status}")
print(f"ID: {container.short_id}")

# Управление состоянием
container.stop(timeout=10)
container.start()
container.restart()
container.pause()
container.unpause()

# Удаление контейнера
container.remove(force=True, v=True)  # force=True, v=True для удаления томов

Мониторинг и логирование

# Получение логов
logs = container.logs(
    stdout=True, 
    stderr=True, 
    timestamps=True, 
    since="2023-01-01T00:00:00"
)
print(logs.decode('utf-8'))

# Мониторинг ресурсов в реальном времени
for stats in container.stats(stream=True):
    cpu_usage = stats['cpu_stats']['cpu_usage']['total_usage']
    memory_usage = stats['memory_stats']['usage']
    print(f"CPU: {cpu_usage}, Memory: {memory_usage}")
    break  # Выход после первого чтения

Выполнение команд в контейнере

# Выполнение команды
exit_code, output = container.exec_run("ls -la /")
print(f"Код возврата: {exit_code}")
print(output.decode('utf-8'))

# Интерактивное выполнение
exec_id = container.exec_run("bash", stdin=True, tty=True, detach=True)

Работа с образами Docker

Управление локальными образами

# Загрузка образа из реестра
image = client.images.pull("ubuntu:20.04", tag="latest")
print(f"Образ загружен: {image.id}")

# Список локальных образов
images = client.images.list()
for img in images:
    print(f"Теги: {img.tags}, Размер: {img.attrs['Size']}")

# Поиск образов в реестре
search_results = client.images.search("nginx")
for result in search_results[:5]:  # Первые 5 результатов
    print(f"Название: {result['name']}, Звезд: {result['star_count']}")

Сборка образов

# Сборка из Dockerfile
image, build_logs = client.images.build(
    path="./my-app",          # Путь к контексту сборки
    tag="my-app:v1.0",        # Тег образа
    dockerfile="Dockerfile",   # Имя Dockerfile
    rm=True,                  # Удаление промежуточных контейнеров
    nocache=False,            # Использование кеша
    buildargs={"VERSION": "1.0"}  # Аргументы сборки
)

# Обработка логов сборки
for log in build_logs:
    if 'stream' in log:
        print(log['stream'].strip())
    if 'error' in log:
        print(f"Ошибка сборки: {log['error']}")

Работа с реестрами

# Аутентификация в Docker Hub
client.login(username="your_username", password="your_password")

# Отправка образа в реестр
push_logs = client.images.push("my-app:v1.0", stream=True)
for log in push_logs:
    print(log)

# Работа с приватными реестрами
auth_config = {
    'username': 'user',
    'password': 'pass',
    'serveraddress': 'registry.example.com'
}
client.images.push("registry.example.com/my-app:v1.0", auth_config=auth_config)

Управление сетями Docker

Создание и настройка сетей

# Создание bridge сети
network = client.networks.create(
    name="my-network",
    driver="bridge",
    options={
        "com.docker.network.bridge.enable_icc": "true",
        "com.docker.network.bridge.enable_ip_masquerade": "true"
    },
    ipam=docker.types.IPAMConfig(
        pool_configs=[docker.types.IPAMPool(subnet="172.20.0.0/16")]
    )
)

# Создание overlay сети для Swarm
overlay_network = client.networks.create(
    name="overlay-net",
    driver="overlay",
    scope="swarm",
    attachable=True
)

Подключение контейнеров к сетям

# Запуск контейнера с подключением к сети
container = client.containers.run(
    "nginx:latest",
    name="web-server",
    networks=["my-network"],
    detach=True
)

# Подключение существующего контейнера
network.connect(container, aliases=["web", "frontend"])

# Отключение от сети
network.disconnect(container, force=True)

Управление томами данных

Создание и настройка томов

# Создание именованного тома
volume = client.volumes.create(
    name="app-data",
    driver="local",
    driver_opts={
        "type": "none",
        "o": "bind",
        "device": "/host/data"
    },
    labels={"environment": "production"}
)

# Получение информации о томе
volume_info = client.volumes.get("app-data")
print(f"Путь монтирования: {volume_info.attrs['Mountpoint']}")

Использование томов в контейнерах

# Монтирование тома при создании контейнера
container = client.containers.run(
    "postgres:13",
    environment={"POSTGRES_PASSWORD": "secret"},
    volumes={
        "postgres-data": {"bind": "/var/lib/postgresql/data", "mode": "rw"},
        "/host/config": {"bind": "/etc/postgresql", "mode": "ro"}
    },
    detach=True
)

# Использование типизированных маунтов
from docker.types import Mount

mounts = [
    Mount(target="/data", source="app-data", type="volume"),
    Mount(target="/config", source="/host/config", type="bind", read_only=True)
]

container = client.containers.run("app:latest", mounts=mounts, detach=True)

Продвинутые возможности

Обработка событий Docker

# Подписка на события в реальном времени
for event in client.events(decode=True, filters={"type": "container"}):
    if event["Action"] == "start":
        print(f"Контейнер {event['Actor']['Attributes']['name']} запущен")
    elif event["Action"] == "die":
        print(f"Контейнер {event['Actor']['Attributes']['name']} остановлен")

Работа с Docker Compose через Python

import docker
import yaml

# Загрузка docker-compose.yml
with open('docker-compose.yml', 'r') as file:
    compose_config = yaml.safe_load(file)

# Создание сервисов из compose файла
for service_name, service_config in compose_config['services'].items():
    container = client.containers.run(
        image=service_config['image'],
        name=f"{service_name}_1",
        ports=service_config.get('ports', {}),
        environment=service_config.get('environment', {}),
        detach=True
    )
    print(f"Сервис {service_name} запущен")

Создание кластера Docker Swarm

# Инициализация Swarm
swarm = client.swarm.init(advertise_addr="192.168.1.100:2377")

# Создание службы в Swarm
service = client.services.create(
    image="nginx:latest",
    name="web-service",
    replicas=3,
    publish_ports=[{"published_port": 80, "target_port": 80}],
    networks=["ingress"]
)

# Масштабирование службы
service.scale(5)

Полная таблица методов и функций Docker SDK

Модуль Метод/Функция Описание Пример использования
DockerClient from_env() Создание клиента из переменных окружения client = docker.from_env()
  ping() Проверка доступности Docker client.ping()
  version() Получение версии Docker client.version()
  info() Информация о Docker Engine client.info()
  close() Закрытие соединения client.close()
  login() Аутентификация в реестре client.login(username, password)
Containers run() Запуск нового контейнера client.containers.run("nginx")
  list() Список контейнеров client.containers.list(all=True)
  get() Получение контейнера по ID client.containers.get("container_id")
  create() Создание контейнера без запуска client.containers.create("nginx")
  prune() Удаление остановленных контейнеров client.containers.prune()
Container start() Запуск контейнера container.start()
  stop() Остановка контейнера container.stop(timeout=10)
  restart() Перезапуск контейнера container.restart()
  pause() Приостановка контейнера container.pause()
  unpause() Возобновление контейнера container.unpause()
  remove() Удаление контейнера container.remove(force=True)
  kill() Принудительная остановка container.kill(signal="SIGTERM")
  logs() Получение логов container.logs(timestamps=True)
  stats() Статистика ресурсов container.stats(stream=False)
  exec_run() Выполнение команды container.exec_run("ls -la")
  attach() Подключение к контейнеру container.attach(stream=True)
  commit() Создание образа из контейнера container.commit(repository="new_image")
  export() Экспорт контейнера container.export()
  resize() Изменение размера TTY container.resize(height=24, width=80)
  update() Обновление конфигурации container.update(mem_limit="512m")
Images pull() Загрузка образа client.images.pull("ubuntu:20.04")
  build() Сборка образа client.images.build(path=".")
  list() Список образов client.images.list()
  get() Получение образа client.images.get("image_id")
  remove() Удаление образа client.images.remove("image_id")
  search() Поиск образов client.images.search("nginx")
  prune() Удаление неиспользуемых образов client.images.prune()
  push() Отправка в реестр client.images.push("repo/image")
  import_image() Импорт образа из tar client.images.import_image(src="image.tar")
  load() Загрузка из tar архива client.images.load(data=tar_data)
Image tag() Добавление тега image.tag("new_repo", "new_tag")
  history() История слоев образа image.history()
  save() Сохранение в tar image.save()
Networks create() Создание сети client.networks.create("my_net")
  list() Список сетей client.networks.list()
  get() Получение сети client.networks.get("network_id")
  prune() Удаление неиспользуемых сетей client.networks.prune()
Network connect() Подключение контейнера network.connect(container)
  disconnect() Отключение контейнера network.disconnect(container)
  remove() Удаление сети network.remove()
  reload() Обновление информации network.reload()
Volumes create() Создание тома client.volumes.create("my_volume")
  list() Список томов client.volumes.list()
  get() Получение тома client.volumes.get("volume_name")
  prune() Удаление неиспользуемых томов client.volumes.prune()
Volume remove() Удаление тома volume.remove(force=True)
  reload() Обновление информации volume.reload()
Services create() Создание службы Swarm client.services.create(image="nginx")
  list() Список служб client.services.list()
  get() Получение службы client.services.get("service_id")
Service update() Обновление службы service.update(image="nginx:latest")
  remove() Удаление службы service.remove()
  scale() Масштабирование службы service.scale(replicas=5)
  tasks() Получение задач службы service.tasks()
  logs() Логи службы service.logs()
Swarm init() Инициализация Swarm client.swarm.init()
  join() Присоединение к Swarm client.swarm.join(remote_addrs)
  leave() Выход из Swarm client.swarm.leave(force=True)
  update() Обновление Swarm client.swarm.update()
  reload() Обновление информации client.swarm.reload()
Nodes list() Список узлов Swarm client.nodes.list()
  get() Получение узла client.nodes.get("node_id")
Node update() Обновление узла node.update(availability="drain")
Secrets create() Создание секрета client.secrets.create(name="secret", data="data")
  list() Список секретов client.secrets.list()
  get() Получение секрета client.secrets.get("secret_id")
Secret remove() Удаление секрета secret.remove()
  update() Обновление секрета secret.update(data="new_data")
Configs create() Создание конфигурации client.configs.create(name="config", data="data")
  list() Список конфигураций client.configs.list()
  get() Получение конфигурации client.configs.get("config_id")
Config remove() Удаление конфигурации config.remove()
  update() Обновление конфигурации config.update(data="new_data")
Plugins install() Установка плагина client.plugins.install("plugin_name")
  list() Список плагинов client.plugins.list()
  get() Получение плагина client.plugins.get("plugin_name")
Plugin remove() Удаление плагина plugin.remove()
  enable() Включение плагина plugin.enable()
  disable() Отключение плагина plugin.disable()
  configure() Настройка плагина plugin.configure(options)
  upgrade() Обновление плагина plugin.upgrade()

Обработка ошибок и отладка

Основные типы исключений

from docker.errors import (
    DockerException,      # Базовое исключение
    APIError,            # Ошибки API
    BuildError,          # Ошибки сборки
    ContainerError,      # Ошибки контейнера
    ImageNotFound,       # Образ не найден
    NotFound,           # Ресурс не найден
    InvalidVersion      # Неподдерживаемая версия API
)

try:
    container = client.containers.get("nonexistent")
except NotFound:
    print("Контейнер не найден")
except APIError as e:
    print(f"API Error: {e.response.status_code} - {e.explanation}")
except DockerException as e:
    print(f"Docker Error: {e}")

Настройка логирования

import logging

# Включение отладочных логов
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Настройка логгера для docker
docker_logger = logging.getLogger('docker')
docker_logger.setLevel(logging.INFO)

Таймауты и повторные попытки

import time
from docker.errors import APIError

def retry_docker_operation(operation, max_attempts=3, delay=1):
    """Повторное выполнение операции при сбое"""
    for attempt in range(max_attempts):
        try:
            return operation()
        except APIError as e:
            if attempt == max_attempts - 1:
                raise
            print(f"Попытка {attempt + 1} неудачна: {e}")
            time.sleep(delay)

# Использование
container = retry_docker_operation(
    lambda: client.containers.run("nginx", detach=True)
)

Интеграция с CI/CD системами

GitHub Actions

# deploy.py - скрипт для GitHub Actions
import docker
import sys
import os

def deploy_application():
    client = docker.from_env()
    
    # Сборка образа
    try:
        image, logs = client.images.build(
            path=".",
            tag=f"myapp:{os.environ['GITHUB_SHA'][:7]}",
            rm=True
        )
        print("Образ успешно собран")
        
        # Остановка старого контейнера
        try:
            old_container = client.containers.get("myapp")
            old_container.stop()
            old_container.remove()
        except:
            pass
        
        # Запуск нового контейнера
        container = client.containers.run(
            f"myapp:{os.environ['GITHUB_SHA'][:7]}",
            name="myapp",
            ports={"8080/tcp": 80},
            detach=True,
            restart_policy={"Name": "unless-stopped"}
        )
        
        print(f"Приложение развернуто: {container.id}")
        
    except Exception as e:
        print(f"Ошибка развертывания: {e}")
        sys.exit(1)

if __name__ == "__main__":
    deploy_application()

Jenkins Pipeline

# jenkins_deploy.py
import docker
import json

def jenkins_deployment(build_number, git_commit):
    """Развертывание через Jenkins"""
    client = docker.from_env()
    
    # Параметры сборки
    image_tag = f"myapp:build-{build_number}"
    
    # Сборка с Jenkins Build Args
    build_args = {
        "BUILD_NUMBER": str(build_number),
        "GIT_COMMIT": git_commit,
        "BUILD_DATE": "$(date -u +'%Y-%m-%dT%H:%M:%SZ')"
    }
    
    image, logs = client.images.build(
        path=".",
        tag=image_tag,
        buildargs=build_args,
        rm=True
    )
    
    # Деплой с health check
    container = client.containers.run(
        image_tag,
        name=f"myapp-{build_number}",
        ports={"8080/tcp": 8080},
        environment={"ENV": "production"},
        healthcheck={
            "test": ["CMD", "curl", "-f", "http://localhost:8080/health"],
            "interval": 30000000000,  # 30 секунд в наносекундах
            "timeout": 10000000000,   # 10 секунд
            "retries": 3
        },
        detach=True
    )
    
    return container.id

Мониторинг и метрики

Сбор метрик контейнеров

import time
import json
from datetime import datetime

def collect_container_metrics(container_names):
    """Сбор метрик для указанных контейнеров"""
    metrics = {}
    
    for name in container_names:
        try:
            container = client.containers.get(name)
            stats = container.stats(stream=False)
            
            # Расчет использования CPU
            cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
                       stats['precpu_stats']['cpu_usage']['total_usage']
            system_delta = stats['cpu_stats']['system_cpu_usage'] - \
                          stats['precpu_stats']['system_cpu_usage']
            cpu_percent = (cpu_delta / system_delta) * 100.0
            
            # Использование памяти
            memory_usage = stats['memory_stats']['usage']
            memory_limit = stats['memory_stats']['limit']
            memory_percent = (memory_usage / memory_limit) * 100.0
            
            metrics[name] = {
                "timestamp": datetime.now().isoformat(),
                "cpu_percent": round(cpu_percent, 2),
                "memory_usage_mb": round(memory_usage / 1024 / 1024, 2),
                "memory_percent": round(memory_percent, 2),
                "status": container.status
            }
            
        except Exception as e:
            metrics[name] = {"error": str(e)}
    
    return metrics

# Использование
container_metrics = collect_container_metrics(["web", "db", "cache"])
print(json.dumps(container_metrics, indent=2))

Автоматическое масштабирование

def auto_scale_service(service_name, cpu_threshold=80, max_replicas=10):
    """Автоматическое масштабирование службы на основе CPU"""
    service = client.services.get(service_name)
    current_replicas = service.attrs['Spec']['Mode']['Replicated']['Replicas']
    
    # Получение метрик всех задач службы
    tasks = service.tasks()
    total_cpu = 0
    active_tasks = 0
    
    for task in tasks:
        if task['Status']['State'] == 'running':
            container_id = task['Status']['ContainerStatus']['ContainerID']
            try:
                container = client.containers.get(container_id)
                stats = container.stats(stream=False)
                # Расчет CPU (упрощенный)
                cpu_percent = calculate_cpu_percent(stats)
                total_cpu += cpu_percent
                active_tasks += 1
            except:
                continue
    
    if active_tasks > 0:
        avg_cpu = total_cpu / active_tasks
        
        if avg_cpu > cpu_threshold and current_replicas < max_replicas:
            new_replicas = min(current_replicas + 1, max_replicas)
            service.scale(new_replicas)
            print(f"Масштабирование {service_name}: {current_replicas} -> {new_replicas}")
        elif avg_cpu < cpu_threshold / 2 and current_replicas > 1:
            new_replicas = max(current_replicas - 1, 1)
            service.scale(new_replicas)
            print(f"Уменьшение {service_name}: {current_replicas} -> {new_replicas}")

Безопасность и аутентификация

Настройка TLS подключения

import docker
import ssl

# Подключение с TLS сертификатами
tls_config = docker.tls.TLSConfig(
    client_cert=('/path/to/cert.pem', '/path/to/key.pem'),
    ca_cert='/path/to/ca.pem',
    verify=True
)

client = docker.DockerClient(
    base_url='https://docker-host:2376',
    tls=tls_config
)

Управление секретами

# Создание секрета в Docker Swarm
secret = client.secrets.create(
    name="db_password",
    data="super_secret_password",
    labels={"environment": "production"}
)

# Использование секрета в службе
service = client.services.create(
    image="postgres:13",
    name="database",
    secrets=[
        {
            "secret_id": secret.id,
            "secret_name": "db_password",
            "filename": "/run/secrets/db_password",
            "uid": "999",
            "gid": "999",
            "mode": 0o400
        }
    ],
    environment={
        "POSTGRES_PASSWORD_FILE": "/run/secrets/db_password"
    }
)

Оптимизация производительности

Кеширование соединений

class DockerManager:
    """Менеджер для переиспользования Docker клиента"""
    
    def __init__(self):
        self._client = None
    
    @property
    def client(self):
        if self._client is None:
            self._client = docker.from_env()
        return self._client
    
    def __del__(self):
        if self._client:
            self._client.close()

# Глобальный менеджер
docker_manager = DockerManager()

# Использование
def deploy_container():
    client = docker_manager.client
    return client.containers.run("nginx", detach=True)

Параллельное выполнение операций

import concurrent.futures
import threading

def parallel_container_operations():
    """Параллельное управление множественными контейнерами"""
    
    def start_container(image_name, container_name):
        local_client = docker.from_env()  # Отдельный клиент для потока
        try:
            container = local_client.containers.run(
                image_name,
                name=container_name,
                detach=True
            )
            return f"Запущен: {container_name}"
        except Exception as e:
            return f"Ошибка {container_name}: {e}"
        finally:
            local_client.close()
    
    # Список контейнеров для запуска
    containers_to_start = [
        ("nginx:latest", "web1"),
        ("nginx:latest", "web2"),
        ("redis:latest", "cache"),
        ("postgres:13", "database")
    ]
    
    # Параллельное выполнение
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(start_container, image, name)
            for image, name in containers_to_start
        ]
        
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
    
    return results

Тестирование с Docker SDK

Юнит-тесты с временными контейнерами

import unittest
import docker
import time

class TestWithDocker(unittest.TestCase):
    
    @classmethod
    def setUpClass(cls):
        cls.client = docker.from_env()
    
    @classmethod
    def tearDownClass(cls):
        cls.client.close()
    
    def setUp(self):
        """Подготовка тестового окружения"""
        self.test_containers = []
    
    def tearDown(self):
        """Очистка тестовых контейнеров"""
        for container in self.test_containers:
            try:
                container.stop()
                container.remove()
            except:
                pass
    
    def test_nginx_container(self):
        """Тест запуска nginx контейнера"""
        container = self.client.containers.run(
            "nginx:latest",
            ports={"80/tcp": None},  # Случайный порт
            detach=True
        )
        self.test_containers.append(container)
        
        # Ожидание запуска
        time.sleep(2)
        container.reload()
        
        self.assertEqual(container.status, "running")
        
        # Проверка портов
        ports = container.ports
        self.assertIn("80/tcp", ports)
    
    def test_database_connection(self):
        """Тест подключения к базе данных"""
        container = self.client.containers.run(
            "postgres:13",
            environment={
                "POSTGRES_PASSWORD": "testpass",
                "POSTGRES_DB": "testdb"
            },
            ports={"5432/tcp": None},
            detach=True
        )
        self.test_containers.append(container)
        
        # Ожидание готовности БД
        time.sleep(10)
        
        # Проверка логов на готовность
        logs = container.logs().decode('utf-8')
        self.assertIn("database system is ready to accept connections", logs)

if __name__ == '__main__':
    unittest.main()

Практические сценарии использования

Автоматизированный деплой микросервисов

class MicroserviceDeployer:
    """Автоматизированный деплой микросервисной архитектуры"""
    
    def __init__(self):
        self.client = docker.from_env()
        self.services = {}
    
    def deploy_service_stack(self, config):
        """Развертывание стека сервисов"""
        
        # Создание сети
        network = self._create_network(config['network'])
        
        # Развертывание сервисов в правильном порядке
        for service_config in config['services']:
            service_name = service_config['name']
            
            # Ожидание зависимостей
            self._wait_for_dependencies(service_config.get('depends_on', []))
            
            # Запуск сервиса
            container = self._deploy_service(service_config, network)
            self.services[service_name] = container
            
            print(f"Сервис {service_name} развернут")
    
    def _create_network(self, network_config):
        """Создание сети для сервисов"""
        try:
            return self.client.networks.get(network_config['name'])
        except:
            return self.client.networks.create(
                name=network_config['name'],
                driver=network_config.get('driver', 'bridge')
            )
    
    def _deploy_service(self, service_config, network):
        """Развертывание отдельного сервиса"""
        
        # Остановка существующего контейнера
        try:
            old_container = self.client.containers.get(service_config['name'])
            old_container.stop()
            old_container.remove()
        except:
            pass
        
        # Запуск нового контейнера
        container = self.client.containers.run(
            image=service_config['image'],
            name=service_config['name'],
            ports=service_config.get('ports', {}),
            environment=service_config.get('environment', {}),
            volumes=service_config.get('volumes', {}),
            networks=[network.name],
            restart_policy={"Name": "unless-stopped"},
            detach=True
        )
        
        return container
    
    def _wait_for_dependencies(self, dependencies):
        """Ожидание готовности зависимых сервисов"""
        for dep_name in dependencies:
            if dep_name in self.services:
                container = self.services[dep_name]
                # Проверка health check или логов
                self._wait_for_service_ready(container)
    
    def health_check(self):
        """Проверка состояния всех сервисов"""
        status = {}
        for name, container in self.services.items():
            container.reload()
            status[name] = {
                "status": container.status,
                "health": self._get_health_status(container)
            }
        return status

# Конфигурация микросервисов
microservices_config = {
    "network": {"name": "microservices_net"},
    "services": [
        {
            "name": "database",
            "image": "postgres:13",
            "environment": {
                "POSTGRES_PASSWORD": "secret",
                "POSTGRES_DB": "appdb"
            },
            "volumes": {"db_data": {"bind": "/var/lib/postgresql/data"}}
        },
        {
            "name": "api",
            "image": "myapi:latest",
            "ports": {"8080/tcp": 8080},
            "depends_on": ["database"],
            "environment": {"DB_HOST": "database"}
        },
        {
            "name": "frontend",
            "image": "myfrontend:latest",
            "ports": {"80/tcp": 80},
            "depends_on": ["api"],
            "environment": {"API_URL": "http://api:8080"}
        }
    ]
}

# Развертывание
deployer = MicroserviceDeployer()
deployer.deploy_service_stack(microservices_config)

Сравнение с альтернативными решениями

Критерий Docker SDK for Python Docker CLI Podman API Kubernetes Python Client
Языковая интеграция Нативный Python Shell/Subprocess Python доступен Нативный Python
Производительность Высокая Средняя Высокая Средняя
Функциональность Полная Полная Ограниченная Расширенная
Сложность использования Низкая Низкая Средняя Высокая
Поддержка оркестрации Docker Swarm Docker Swarm Нет Kubernetes
Документация Отличная Отличная Хорошая Отличная
Экосистема Обширная Обширная Развивающаяся Очень обширная

Часто задаваемые вопросы

Как обновить существующий контейнер без простоя?

def rolling_update(service_name, new_image):
    """Обновление контейнера без простоя"""
    # Запуск нового контейнера
    new_container = client.containers.run(
        new_image,
        name=f"{service_name}_new",
        ports={"80/tcp": None},  # Временный порт
        detach=True
    )
    
    # Проверка готовности
    time.sleep(5)
    new_container.reload()
    
    if new_container.status == "running":
        # Остановка старого контейнера
        old_container = client.containers.get(service_name)
        old_port = old_container.ports["80/tcp"][0]["HostPort"]
        
        old_container.stop()
        old_container.remove()
        
        # Переименование нового контейнера
        new_container.rename(service_name)
        
        # Обновление порта (требует restart)
        new_container.stop()
        client.containers.run(
            new_image,
            name=service_name,
            ports={"80/tcp": old_port},
            detach=True
        )

Как мониторить использование ресурсов контейнера?

def monitor_resources(container_name, duration=60):
    """Мониторинг ресурсов контейнера"""
    container = client.containers.get(container_name)
    
    start_time = time.time()
    metrics = []
    
    for stats in container.stats(stream=True):
        if time.time() - start_time > duration:
            break
            
        # Парсинг статистики
        memory_usage = stats['memory_stats']['usage']
        memory_limit = stats['memory_stats']['limit']
        
        cpu_usage = stats['cpu_stats']['cpu_usage']['total_usage']
        prev_cpu = stats['precpu_stats']['cpu_usage']['total_usage']
        
        metrics.append({
            "timestamp": time.time(),
            "memory_percent": (memory_usage / memory_limit) * 100,
            "cpu_usage": cpu_usage - prev_cpu
        })
        
        time.sleep(1)
    
    return metrics

Как правильно обрабатывать долгоживущие операции?

import threading
from queue import Queue

def async_build_image(dockerfile_path, tag, callback=None):
    """Асинхронная сборка образа"""
    
    def build_worker():
        try:
            image, logs = client.images.build(
                path=dockerfile_path,
                tag=tag,
                rm=True
            )
            
            if callback:
                callback(True, image, list(logs))
                
        except Exception as e:
            if callback:
                callback(False, None, str(e))
    
    thread = threading.Thread(target=build_worker)
    thread.daemon = True
    thread.start()
    
    return thread

# Использование с callback
def build_complete(success, image, logs):
    if success:
        print(f"Образ собран: {image.id}")
    else:
        print(f"Ошибка сборки: {logs}")

build_thread = async_build_image("./app", "myapp:latest", build_complete)

Как организовать централизованное логирование?

import logging
import json
from datetime import datetime

class DockerLogHandler(logging.Handler):
    """Кастомный обработчик логов для Docker"""
    
    def __init__(self, container_names):
        super().__init__()
        self.container_names = container_names
        self.client = docker.from_env()
    
    def emit(self, record):
        """Отправка логов в контейнеры"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": self.format(record),
            "module": record.name
        }
        
        for container_name in self.container_names:
            try:
                container = self.client.containers.get(container_name)
                # Отправка лога в контейнер через stdin
                container.exec_run(
                    f"echo '{json.dumps(log_entry)}' >> /var/log/app.log"
                )
            except:
                pass

# Настройка централизованного логирования
logger = logging.getLogger("myapp")
docker_handler = DockerLogHandler(["log-aggregator"])
logger.addHandler(docker_handler)

Заключение

Docker SDK for Python представляет собой мощный и гибкий инструмент для программного управления контейнерами Docker. Библиотека обеспечивает полную интеграцию возможностей Docker в Python-приложения, от простых задач автоматизации до сложных систем оркестрации.

Ключевые преимущества Docker SDK включают:

  • Нативная интеграция с Python — отсутствие необходимости в subprocess и shell-скриптах
  • Полное покрытие Docker API — доступ ко всем возможностям Docker Engine
  • Высокоуровневые абстракции — простота использования без потери функциональности
  • Асинхронная поддержка — эффективная обработка долгоживущих операций
  • Расширяемость — возможность создания собственных обёрток и интеграций

Библиотека незаменима для DevOps-инженеров, разработчиков и системных администраторов, работающих с контейнерными технологиями. Она значительно упрощает автоматизацию процессов развертывания, мониторинга и управления контейнерными приложениями, делая Docker-инфраструктуру более программируемой и контролируемой.

Docker SDK for Python продолжает активно развиваться, получая обновления совместно с новыми версиями Docker Engine, что гарантирует долгосрочную поддержку и совместимость с современными контейнерными технологиями.

Новости