Introduction to Docker SDK for Python
Containerization is the cornerstone of modern DevOps infrastructure. Docker has become the de‑facto standard for application isolation and environment management. However, when more flexible and programmable container control is needed, Docker SDK for Python, also known as docker‑py, steps in.
This library enables direct interaction with the Docker daemon via Python code, managing containers, images, networks, volumes, and everything typically handled through the CLI. Because of this, docker‑py is widely used in CI/CD pipelines, automation, testing, and dynamic orchestration control.
What Is Docker SDK for Python
Docker SDK for Python is the official Python library that provides a high‑level interface to the Docker Engine API. The library allows developers to embed Docker functionality directly into Python applications, automating image builds, deployments, and container management.
Key Features of the Library
Docker SDK offers a complete set of tools for working with Docker:
- Programmatic lifecycle management of containers
- Automation of image building and deployment
- Configuration of networks and storage
- Integration with monitoring and logging systems
- Support for Docker Swarm orchestration
- Real‑time Docker event handling
Installation and Initial Setup
Installing the Library
Install Docker SDK using the standard pip package manager:
pip install docker
Install with additional dependencies for SSH and TLS support:
pip install docker[ssh,tls]
Connecting to Docker Engine
Basic connection to a local Docker daemon:
import docker
# Automatic connection using environment variables
client = docker.from_env()
# Explicit connection parameters
client = docker.DockerClient(base_url='unix://var/run/docker.sock')
# Remote Docker Engine connection
client = docker.DockerClient(base_url='tcp://192.168.1.100:2376')
Verifying the Connection
try:
# Check Docker Engine availability
print("Docker Engine reachable:", client.ping())
# Retrieve system information
info = client.info()
print(f"Docker version: {info['ServerVersion']}")
print(f"Architecture: {info['Architecture']}")
except docker.errors.DockerException as e:
print(f"Connection error to Docker: {e}")
Architecture and Library Structure
Docker SDK for Python is built as a wrapper around the Docker Engine REST API, offering both high‑level and low‑level interfaces for Docker interaction.
Module Structure
The library is organized into several core modules:
- docker.DockerClient — primary client for high‑level operations
- docker.APIClient — low‑level API client
- docker.models — object‑oriented models for resources
- docker.types — helper data types
- docker.errors — exceptions and error handling
Architectural Principles
The library follows Docker Engine REST API conventions:
- Each resource (container, image, network) is represented by a distinct object
- Operations are performed via object methods
- Asynchronous operation support
- Automatic connection handling and reconnection
Container Management
Creating and Running Containers
Basic container launch:
# Simple run with automatic removal
container = client.containers.run("nginx:latest", detach=True, remove=True)
# Run with additional parameters
container = client.containers.run(
image="nginx:latest",
name="my-nginx",
ports={"80/tcp": 8080},
environment={"ENV_VAR": "value"},
volumes={"/host/path": {"bind": "/container/path", "mode": "rw"}},
detach=True
)
Lifecycle Management
# Retrieve container details
container = client.containers.get("my-nginx")
print(f"Status: {container.status}")
print(f"ID: {container.short_id}")
# State control
container.stop(timeout=10)
container.start()
container.restart()
container.pause()
container.unpause()
# Remove container
container.remove(force=True, v=True) # force=True, v=True also removes volumes
Monitoring and Logging
# Fetch logs
logs = container.logs(
stdout=True,
stderr=True,
timestamps=True,
since="2023-01-01T00:00:00"
)
print(logs.decode('utf-8'))
# Real‑time resource monitoring
for stats in container.stats(stream=True):
cpu_usage = stats['cpu_stats']['cpu_usage']['total_usage']
memory_usage = stats['memory_stats']['usage']
print(f"CPU: {cpu_usage}, Memory: {memory_usage}")
break # Exit after first read
Executing Commands Inside a Container
# Run a command
exit_code, output = container.exec_run("ls -la /")
print(f"Exit code: {exit_code}")
print(output.decode('utf-8'))
# Interactive execution
exec_id = container.exec_run("bash", stdin=True, tty=True, detach=True)
Working with Docker Images
Managing Local Images
# Pull an image from a registry
image = client.images.pull("ubuntu:20.04", tag="latest")
print(f"Image pulled: {image.id}")
# List local images
images = client.images.list()
for img in images:
print(f"Tags: {img.tags}, Size: {img.attrs['Size']}")
# Search images in a registry
search_results = client.images.search("nginx")
for result in search_results[:5]: # First 5 results
print(f"Name: {result['name']}, Stars: {result['star_count']}")
Building Images
# Build from Dockerfile
image, build_logs = client.images.build(
path="./my-app", # Build context path
tag="my-app:v1.0", # Image tag
dockerfile="Dockerfile", # Dockerfile name
rm=True, # Remove intermediate containers
nocache=False, # Use cache
buildargs={"VERSION": "1.0"} # Build arguments
)
# Process build logs
for log in build_logs:
if 'stream' in log:
print(log['stream'].strip())
if 'error' in log:
print(f"Build error: {log['error']}")
Working with Registries
# Docker Hub authentication
client.login(username="your_username", password="your_password")
# Push image to a registry
push_logs = client.images.push("my-app:v1.0", stream=True)
for log in push_logs:
print(log)
# Private registry handling
auth_config = {
'username': 'user',
'password': 'pass',
'serveraddress': 'registry.example.com'
}
client.images.push("registry.example.com/my-app:v1.0", auth_config=auth_config)
Docker Network Management
Creating and Configuring Networks
# Create a bridge network
network = client.networks.create(
name="my-network",
driver="bridge",
options={
"com.docker.network.bridge.enable_icc": "true",
"com.docker.network.bridge.enable_ip_masquerade": "true"
},
ipam=docker.types.IPAMConfig(
pool_configs=[docker.types.IPAMPool(subnet="172.20.0.0/16")]
)
)
# Create an overlay network for Swarm
overlay_network = client.networks.create(
name="overlay-net",
driver="overlay",
scope="swarm",
attachable=True
)
Connecting Containers to Networks
# Run a container attached to a network
container = client.containers.run(
"nginx:latest",
name="web-server",
networks=["my-network"],
detach=True
)
# Connect an existing container
network.connect(container, aliases=["web", "frontend"])
# Disconnect from the network
network.disconnect(container, force=True)
Docker Volume Management
Creating and Configuring Volumes
# Create a named volume
volume = client.volumes.create(
name="app-data",
driver="local",
driver_opts={
"type": "none",
"o": "bind",
"device": "/host/data"
},
labels={"environment": "production"}
)
# Get volume details
volume_info = client.volumes.get("app-data")
print(f"Mount point: {volume_info.attrs['Mountpoint']}")
Using Volumes in Containers
# Mount a volume when creating a container
container = client.containers.run(
"postgres:13",
environment={"POSTGRES_PASSWORD": "secret"},
volumes={
"postgres-data": {"bind": "/var/lib/postgresql/data", "mode": "rw"},
"/host/config": {"bind": "/etc/postgresql", "mode": "ro"}
},
detach=True
)
# Use typed mounts
from docker.types import Mount
mounts = [
Mount(target="/data", source="app-data", type="volume"),
Mount(target="/config", source="/host/config", type="bind", read_only=True)
]
container = client.containers.run("app:latest", mounts=mounts, detach=True)
Advanced Capabilities
Docker Event Processing
# Subscribe to real‑time events
for event in client.events(decode=True, filters={"type": "container"}):
if event["Action"] == "start":
print(f"Container {event['Actor']['Attributes']['name']} started")
elif event["Action"] == "die":
print(f"Container {event['Actor']['Attributes']['name']} stopped")
Working with Docker Compose via Python
import docker
import yaml
# Load docker-compose.yml
with open('docker-compose.yml', 'r') as file:
compose_config = yaml.safe_load(file)
# Create services from compose file
for service_name, service_config in compose_config['services'].items():
container = client.containers.run(
image=service_config['image'],
name=f"{service_name}_1",
ports=service_config.get('ports', {}),
environment=service_config.get('environment', {}),
detach=True
)
print(f"Service {service_name} started")
Creating a Docker Swarm Cluster
# Initialize Swarm
swarm = client.swarm.init(advertise_addr="192.168.1.100:2377")
# Create a service in Swarm
service = client.services.create(
image="nginx:latest",
name="web-service",
replicas=3,
publish_ports=[{"published_port": 80, "target_port": 80}],
networks=["ingress"]
)
# Scale the service
service.scale(5)
Complete Method and Function Reference for Docker SDK
| Module | Method/Function | Description | Example usage |
|---|---|---|---|
| DockerClient | from_env() |
Create a client from environment variables | client = docker.from_env() |
ping() |
Check Docker availability | client.ping() |
|
version() |
Retrieve Docker Engine version | client.version() |
|
info() |
Get Docker Engine information | client.info() |
|
close() |
Close the client connection | client.close() |
|
login() |
Authenticate with a registry | client.login(username, password) |
|
| Containers | run() |
Launch a new container | client.containers.run("nginx") |
list() |
List containers | client.containers.list(all=True) |
|
get() |
Get a container by ID | client.containers.get("container_id") |
|
create() |
Create a container without starting | client.containers.create("nginx") |
|
prune() |
Remove stopped containers | client.containers.prune() |
|
| Container | start() |
Start the container | container.start() |
stop() |
Stop the container | container.stop(timeout=10) |
|
restart() |
Restart the container | container.restart() |
|
pause() |
Pause the container | container.pause() |
|
unpause() |
Unpause the container | container.unpause() |
|
remove() |
Remove the container | container.remove(force=True) |
|
kill() |
Force‑stop the container | container.kill(signal="SIGTERM") |
|
logs() |
Fetch container logs | container.logs(timestamps=True) |
|
stats() |
Retrieve resource statistics | container.stats(stream=False) |
|
exec_run() |
Execute a command inside the container | container.exec_run("ls -la") |
|
attach() |
Attach to the container's streams | container.attach(stream=True) |
|
commit() |
Create an image from the container | container.commit(repository="new_image") |
|
export() |
Export the container filesystem | container.export() |
|
resize() |
Resize the TTY | container.resize(height=24, width=80) |
|
update() |
Update container configuration | container.update(mem_limit="512m") |
|
| Images | pull() |
Pull an image from a registry | client.images.pull("ubuntu:20.04") |
build() |
Build an image | client.images.build(path=".") |
|
list() |
List images | client.images.list() |
|
get() |
Get an image by ID | client.images.get("image_id") |
|
remove() |
Remove an image | client.images.remove("image_id") |
|
search() |
Search for images | client.images.search("nginx") |
|
prune() |
Remove unused images | client.images.prune() |
|
push() |
Push an image to a registry | client.images.push("repo/image") |
|
import_image() |
Import an image from a tar archive | client.images.import_image(src="image.tar") |
|
load() |
Load images from a tar stream | client.images.load(data=tar_data) |
|
| Image | tag() |
Add a tag to the image | image.tag("new_repo", "new_tag") |
history() |
Show image layer history | image.history() |
|
save() |
Save the image to a tar archive | image.save() |
|
| Networks | create() |
Create a new network | client.networks.create("my_net") |
list() |
List networks | client.networks.list() |
|
get() |
Get a network by ID | client.networks.get("network_id") |
|
prune() |
Remove unused networks | client.networks.prune() |
|
| Network | connect() |
Connect a container to the network | network.connect(container) |
disconnect() |
Disconnect a container from the network | network.disconnect(container) |
|
remove() |
Delete the network | network.remove() |
|
reload() |
Refresh network information | network.reload() |
|
| Volumes | create() |
Create a new volume | client.volumes.create("my_volume") |
list() |
List volumes | client.volumes.list() |
|
get() |
Get a volume by name | client.volumes.get("volume_name") |
|
prune() |
Remove unused volumes | client.volumes.prune() |
|
| Volume | remove() |
Delete the volume | volume.remove(force=True) |
reload() |
Refresh volume information | volume.reload() |
|
| Services | create() |
Create a Swarm service | client.services.create(image="nginx") |
list() |
List services | client.services.list() |
|
get() |
Get a service by ID | client.services.get("service_id") |
|
| Service | update() |
Update the service | service.update(image="nginx:latest") |
remove() |
Delete the service | service.remove() |
|
scale() |
Scale the service | service.scale(replicas=5) |
|
tasks() |
List service tasks | service.tasks() |
|
logs() |
Fetch service logs | service.logs() |
|
| Swarm | init() |
Initialize a Swarm | client.swarm.init() |
join() |
Join an existing Swarm | client.swarm.join(remote_addrs) |
|
leave() |
Leave a Swarm | client.swarm.leave(force=True) |
|
update() |
Update Swarm configuration | client.swarm.update() |
|
reload() |
Refresh Swarm information | client.swarm.reload() |
|
| Nodes | list() |
List Swarm nodes | client.nodes.list() |
get() |
Get a node by ID | client.nodes.get("node_id") |
|
| Node | update() |
Update node settings | node.update(availability="drain") |
| Secrets | create() |
Create a secret | client.secrets.create(name="secret", data="data") |
list() |
List secrets | client.secrets.list() |
|
get() |
Retrieve a secret by ID | client.secrets.get("secret_id") |
|
| Secret | remove() |
Delete a secret | secret.remove() |
update() |
Update secret data | secret.update(data="new_data") |
|
| Configs | create() |
Create a config | client.configs.create(name="config", data="data") |
list() |
List configs | client.configs.list() |
|
get() |
Retrieve a config by ID | client.configs.get("config_id") |
|
| Config | remove() |
Delete a config | config.remove() |
update() |
Update config data | config.update(data="new_data") |
|
| Plugins | install() |
Install a plugin | client.plugins.install("plugin_name") |
list() |
List plugins | client.plugins.list() |
|
get() |
Retrieve a plugin by name | client.plugins.get("plugin_name") |
|
| Plugin | remove() |
Remove a plugin | plugin.remove() |
enable() |
Enable a plugin | plugin.enable() |
|
disable() |
Disable a plugin | plugin.disable() |
|
configure() |
Configure a plugin | plugin.configure(options) |
|
upgrade() |
Upgrade a plugin | plugin.upgrade() |
Error Handling and Debugging
Common Exception Types
from docker.errors import (
DockerException, # Base exception
APIError, # API errors
BuildError, # Build failures
ContainerError, # Container errors
ImageNotFound, # Image not found
NotFound, # Generic resource not found
InvalidVersion # Unsupported API version
)
try:
container = client.containers.get("nonexistent")
except NotFound:
print("Container not found")
except APIError as e:
print(f"API Error: {e.response.status_code} - {e.explanation}")
except DockerException as e:
print(f"Docker Error: {e}")
Logging Configuration
import logging
# Enable debug logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Configure Docker logger
docker_logger = logging.getLogger('docker')
docker_logger.setLevel(logging.INFO)
Timeouts and Retries
import time
from docker.errors import APIError
def retry_docker_operation(operation, max_attempts=3, delay=1):
"""Retry a Docker operation on failure"""
for attempt in range(max_attempts):
try:
return operation()
except APIError as e:
if attempt == max_attempts - 1:
raise
print(f"Attempt {attempt + 1} failed: {e}")
time.sleep(delay)
# Example usage
container = retry_docker_operation(
lambda: client.containers.run("nginx", detach=True)
)
CI/CD Integration
GitHub Actions
# deploy.py - script for GitHub Actions
import docker
import sys
import os
def deploy_application():
client = docker.from_env()
# Build the image
try:
image, logs = client.images.build(
path=".",
tag=f"myapp:{os.environ['GITHUB_SHA'][:7]}",
rm=True
)
print("Image built successfully")
# Stop old container
try:
old_container = client.containers.get("myapp")
old_container.stop()
old_container.remove()
except:
pass
# Run new container
container = client.containers.run(
f"myapp:{os.environ['GITHUB_SHA'][:7]}",
name="myapp",
ports={"8080/tcp": 80},
detach=True,
restart_policy={"Name": "unless-stopped"}
)
print(f"Application deployed: {container.id}")
except Exception as e:
print(f"Deployment error: {e}")
sys.exit(1)
if __name__ == "__main__":
deploy_application()
Jenkins Pipeline
# jenkins_deploy.py
import docker
import json
def jenkins_deployment(build_number, git_commit):
"""Deploy via Jenkins"""
client = docker.from_env()
# Image tag
image_tag = f"myapp:build-{build_number}"
# Build with Jenkins build args
build_args = {
"BUILD_NUMBER": str(build_number),
"GIT_COMMIT": git_commit,
"BUILD_DATE": "$(date -u +'%Y-%m-%dT%H:%M:%SZ')"
}
image, logs = client.images.build(
path=".",
tag=image_tag,
buildargs=build_args,
rm=True
)
# Deploy with health check
container = client.containers.run(
image_tag,
name=f"myapp-{build_number}",
ports={"8080/tcp": 8080},
environment={"ENV": "production"},
healthcheck={
"test": ["CMD", "curl", "-f", "http://localhost:8080/health"],
"interval": 30000000000, # 30 seconds in nanoseconds
"timeout": 10000000000, # 10 seconds
"retries": 3
},
detach=True
)
return container.id
Monitoring and Metrics
Collecting Container Metrics
import time
import json
from datetime import datetime
def collect_container_metrics(container_names):
"""Collect metrics for specified containers"""
metrics = {}
for name in container_names:
try:
container = client.containers.get(name)
stats = container.stats(stream=False)
# CPU calculation
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - \
stats['precpu_stats']['system_cpu_usage']
cpu_percent = (cpu_delta / system_delta) * 100.0
# Memory usage
memory_usage = stats['memory_stats']['usage']
memory_limit = stats['memory_stats']['limit']
memory_percent = (memory_usage / memory_limit) * 100.0
metrics[name] = {
"timestamp": datetime.now().isoformat(),
"cpu_percent": round(cpu_percent, 2),
"memory_usage_mb": round(memory_usage / 1024 / 1024, 2),
"memory_percent": round(memory_percent, 2),
"status": container.status
}
except Exception as e:
metrics[name] = {"error": str(e)}
return metrics
# Example usage
container_metrics = collect_container_metrics(["web", "db", "cache"])
print(json.dumps(container_metrics, indent=2))
Automatic Scaling
def auto_scale_service(service_name, cpu_threshold=80, max_replicas=10):
"""Auto‑scale a Swarm service based on CPU usage"""
service = client.services.get(service_name)
current_replicas = service.attrs['Spec']['Mode']['Replicated']['Replicas']
# Gather metrics from all service tasks
tasks = service.tasks()
total_cpu = 0
active_tasks = 0
for task in tasks:
if task['Status']['State'] == 'running':
container_id = task['Status']['ContainerStatus']['ContainerID']
try:
container = client.containers.get(container_id)
stats = container.stats(stream=False)
# Simple CPU calculation
cpu_percent = calculate_cpu_percent(stats)
total_cpu += cpu_percent
active_tasks += 1
except:
continue
if active_tasks > 0:
avg_cpu = total_cpu / active_tasks
if avg_cpu > cpu_threshold and current_replicas < max_replicas:
new_replicas = min(current_replicas + 1, max_replicas)
service.scale(new_replicas)
print(f"Scaling {service_name}: {current_replicas} → {new_replicas}")
elif avg_cpu < cpu_threshold / 2 and current_replicas > 1:
new_replicas = max(current_replicas - 1, 1)
service.scale(new_replicas)
print(f"Scaling down {service_name}: {current_replicas} → {new_replicas}")
Security and Authentication
Configuring TLS Connections
import docker
import ssl
# TLS connection with certificates
tls_config = docker.tls.TLSConfig(
client_cert=('/path/to/cert.pem', '/path/to/key.pem'),
ca_cert='/path/to/ca.pem',
verify=True
)
client = docker.DockerClient(
base_url='https://docker-host:2376',
tls=tls_config
)
Managing Secrets
# Create a secret in Docker Swarm
secret = client.secrets.create(
name="db_password",
data="super_secret_password",
labels={"environment": "production"}
)
# Use the secret in a service
service = client.services.create(
image="postgres:13",
name="database",
secrets=[
{
"secret_id": secret.id,
"secret_name": "db_password",
"filename": "/run/secrets/db_password",
"uid": "999",
"gid": "999",
"mode": 0o400
}
],
environment={
"POSTGRES_PASSWORD_FILE": "/run/secrets/db_password"
}
)
Performance Optimization
Connection Caching
class DockerManager:
"""Manager for reusing a Docker client"""
def __init__(self):
self._client = None
@property
def client(self):
if self._client is None:
self._client = docker.from_env()
return self._client
def __del__(self):
if self._client:
self._client.close()
# Global manager instance
docker_manager = DockerManager()
# Usage example
def deploy_container():
client = docker_manager.client
return client.containers.run("nginx", detach=True)
Parallel Operation Execution
import concurrent.futures
import threading
def parallel_container_operations():
"""Parallel management of multiple containers"""
def start_container(image_name, container_name):
local_client = docker.from_env() # Separate client per thread
try:
container = local_client.containers.run(
image_name,
name=container_name,
detach=True
)
return f"Started: {container_name}"
except Exception as e:
return f"Error {container_name}: {e}"
finally:
local_client.close()
# Containers to launch
containers_to_start = [
("nginx:latest", "web1"),
("nginx:latest", "web2"),
("redis:latest", "cache"),
("postgres:13", "database")
]
# Execute in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [
executor.submit(start_container, image, name)
for image, name in containers_to_start
]
results = [future.result() for future in concurrent.futures.as_completed(futures)]
return results
Testing with Docker SDK
Unit Tests with Temporary Containers
import unittest
import docker
import time
class TestWithDocker(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.client = docker.from_env()
@classmethod
def tearDownClass(cls):
cls.client.close()
def setUp(self):
"""Prepare test environment"""
self.test_containers = []
def tearDown(self):
"""Clean up test containers"""
for container in self.test_containers:
try:
container.stop()
container.remove()
except:
pass
def test_nginx_container(self):
"""Test launching an nginx container"""
container = self.client.containers.run(
"nginx:latest",
ports={"80/tcp": None}, # Random host port
detach=True
)
self.test_containers.append(container)
# Wait for start
time.sleep(2)
container.reload()
self.assertEqual(container.status, "running")
# Verify ports
ports = container.ports
self.assertIn("80/tcp", ports)
def test_database_connection(self):
"""Test connecting to a PostgreSQL database"""
container = self.client.containers.run(
"postgres:13",
environment={
"POSTGRES_PASSWORD": "testpass",
"POSTGRES_DB": "testdb"
},
ports={"5432/tcp": None},
detach=True
)
self.test_containers.append(container)
# Wait for DB readiness
time.sleep(10)
# Check logs for readiness message
logs = container.logs().decode('utf-8')
self.assertIn("database system is ready to accept connections", logs)
if __name__ == '__main__':
unittest.main()
Practical Use Cases
Automated Microservice Deployment
class MicroserviceDeployer:
"""Automated deployment of a microservice architecture"""
def __init__(self):
self.client = docker.from_env()
self.services = {}
def deploy_service_stack(self, config):
"""Deploy a stack of services"""
# Create network
network = self._create_network(config['network'])
# Deploy services in order
for service_config in config['services']:
service_name = service_config['name']
# Wait for dependencies
self._wait_for_dependencies(service_config.get('depends_on', []))
# Launch service
container = self._deploy_service(service_config, network)
self.services[service_name] = container
print(f"Service {service_name} deployed")
def _create_network(self, network_config):
"""Create a network for services"""
try:
return self.client.networks.get(network_config['name'])
except:
return self.client.networks.create(
name=network_config['name'],
driver=network_config.get('driver', 'bridge')
)
def _deploy_service(self, service_config, network):
"""Deploy an individual service"""
# Stop existing container
try:
old_container = self.client.containers.get(service_config['name'])
old_container.stop()
old_container.remove()
except:
pass
# Run new container
container = self.client.containers.run(
image=service_config['image'],
name=service_config['name'],
ports=service_config.get('ports', {}),
environment=service_config.get('environment', {}),
volumes=service_config.get('volumes', {}),
networks=[network.name],
restart_policy={"Name": "unless-stopped"},
detach=True
)
return container
def _wait_for_dependencies(self, dependencies):
"""Wait for dependent services to become ready"""
for dep_name in dependencies:
if dep_name in self.services:
container = self.services[dep_name]
# Simple health check or log inspection could be added here
self._wait_for_service_ready(container)
def health_check(self):
"""Collect health status of all services"""
status = {}
for name, container in self.services.items():
container.reload()
status[name] = {
"status": container.status,
"health": self._get_health_status(container)
}
return status
# Example microservice configuration
microservices_config = {
"network": {"name": "microservices_net"},
"services": [
{
"name": "database",
"image": "postgres:13",
"environment": {
"POSTGRES_PASSWORD": "secret",
"POSTGRES_DB": "appdb"
},
"volumes": {"db_data": {"bind": "/var/lib/postgresql/data"}}
},
{
"name": "api",
"image": "myapi:latest",
"ports": {"8080/tcp": 8080},
"depends_on": ["database"],
"environment": {"DB_HOST": "database"}
},
{
"name": "frontend",
"image": "myfrontend:latest",
"ports": {"80/tcp": 80},
"depends_on": ["api"],
"environment": {"API_URL": "http://api:8080"}
}
]
}
# Deploy the stack
deployer = MicroserviceDeployer()
deployer.deploy_service_stack(microservices_config)
Comparison with Alternative Solutions
| Criterion | Docker SDK for Python | Docker CLI | Podman API | Kubernetes Python Client |
|---|---|---|---|---|
| Language integration | Native Python | Shell / Subprocess | Python available | Native Python |
| Performance | High | Medium | High | Medium |
| Functionality | Full | Full | Limited | Extended |
| Ease of use | Low | Low | Medium | High |
| Orchestration support | Docker Swarm | Docker Swarm | None | Kubernetes |
| Documentation | Excellent | Excellent | Good | Excellent |
| Ecosystem | Extensive | Extensive | Growing | Very extensive |
Frequently Asked Questions
How to update an existing container without downtime?
def rolling_update(service_name, new_image):
"""Update a container without downtime"""
# Launch new container
new_container = client.containers.run(
new_image,
name=f"{service_name}_new",
ports={"80/tcp": None}, # Temporary port
detach=True
)
# Wait for readiness
time.sleep(5)
new_container.reload()
if new_container.status == "running":
# Stop old container
old_container = client.containers.get(service_name)
old_port = old_container.ports["80/tcp"][0]["HostPort"]
old_container.stop()
old_container.remove()
# Rename new container
new_container.rename(service_name)
# Re‑assign original port (requires restart)
new_container.stop()
client.containers.run(
new_image,
name=service_name,
ports={"80/tcp": old_port},
detach=True
)
How to monitor container resource usage?
def monitor_resources(container_name, duration=60):
"""Monitor resources of a container for a given duration"""
container = client.containers.get(container_name)
start_time = time.time()
metrics = []
for stats in container.stats(stream=True):
if time.time() - start_time > duration:
break
# Parse statistics
memory_usage = stats['memory_stats']['usage']
memory_limit = stats['memory_stats']['limit']
cpu_usage = stats['cpu_stats']['cpu_usage']['total_usage']
prev_cpu = stats['precpu_stats']['cpu_usage']['total_usage']
metrics.append({
"timestamp": time.time(),
"memory_percent": (memory_usage / memory_limit) * 100,
"cpu_usage": cpu_usage - prev_cpu
})
time.sleep(1)
return metrics
How to handle long‑running operations properly?
import threading
from queue import Queue
def async_build_image(dockerfile_path, tag, callback=None):
"""Asynchronous image build with optional callback"""
def build_worker():
try:
image, logs = client.images.build(
path=dockerfile_path,
tag=tag,
rm=True
)
if callback:
callback(True, image, list(logs))
except Exception as e:
if callback:
callback(False, None, str(e))
thread = threading.Thread(target=build_worker)
thread.daemon = True
thread.start()
return thread
# Example callback usage
def build_complete(success, image, logs):
if success:
print(f"Image built: {image.id}")
else:
print(f"Build error: {logs}")
build_thread = async_build_image("./app", "myapp:latest", build_complete)
How to set up centralized logging?
import logging
import json
from datetime import datetime
class DockerLogHandler(logging.Handler):
"""Custom logging handler that forwards logs to Docker containers"""
def __init__(self, container_names):
super().__init__()
self.container_names = container_names
self.client = docker.from_env()
def emit(self, record):
"""Send log entry to containers"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": self.format(record),
"module": record.name
}
for container_name in self.container_names:
try:
container = self.client.containers.get(container_name)
# Append log entry inside the container via stdin
container.exec_run(
f"echo '{json.dumps(log_entry)}' >> /var/log/app.log"
)
except:
pass
# Configure centralized logging
logger = logging.getLogger("myapp")
docker_handler = DockerLogHandler(["log-aggregator"])
logger.addHandler(docker_handler)
Conclusion
Docker SDK for Python is a powerful and flexible tool for programmatic Docker container management. The library offers full integration of Docker capabilities into Python applications, from simple automation tasks to complex orchestration systems.
Key advantages of Docker SDK include:
- Native Python integration — no need for subprocess calls or shell scripts
- Complete Docker API coverage — access to every Docker Engine feature
- High‑level abstractions — ease of use without sacrificing functionality
- Asynchronous support — efficient handling of long‑running operations
- Extensibility — ability to build custom wrappers and integrations
The library is indispensable for DevOps engineers, developers, and system administrators working with container technologies. It dramatically simplifies automation of deployment, monitoring, and management of containerized applications, making Docker infrastructures more programmable and controllable.
Docker SDK for Python continues to evolve alongside new Docker Engine releases, ensuring long‑term support and compatibility with modern container ecosystems.
$
The Future of AI in Mathematics and Everyday Life: How Intelligent Agents Are Already Changing the Game
Experts warned about the risks of fake charity with AI
In Russia, universal AI-agent for robots and industrial processes was developed