Pyppeteer - Browser Management through Puppeteer

онлайн тренажер по питону
Online Python Trainer for Beginners

Learn Python easily without overwhelming theory. Solve practical tasks with automatic checking, get hints in Russian, and write code directly in your browser — no installation required.

Start Course

What Is Pyppeteer and Why You Need It

Pyppeteer is a powerful Python library that serves as an asynchronous port of Google’s popular JavaScript library Puppeteer. It gives developers full control over the Chromium/Chrome browser through Python code, making it an essential tool for web‑automation tasks.

Unlike traditional web‑scraping tools, Pyppeteer works with a real browser, allowing you to execute JavaScript, interact with dynamic content, and handle modern web applications built with React, Vue, Angular, and other frameworks.

Key Features and Benefits

Asynchronous Architecture

Pyppeteer is built on asyncio, delivering high performance when handling multiple concurrent tasks. This is crucial for large‑scale data extraction or automating many web pages at once.

Full JavaScript Support

The library lets you run any JavaScript code inside the browser, retrieve execution results, and manipulate DOM elements in real time.

Headless and GUI Modes

Pyppeteer can operate in headless mode for automation or in a regular GUI mode for debugging and demonstration.

Document Generation

Built‑in screenshot and PDF creation make the library ideal for generating reports and documentation.

Detailed Comparison with Popular Tools

Criterion Pyppeteer Selenium Puppeteer (JS) Playwright
Programming Language Python Python/Java/C# JavaScript Python/JS/Java/C#
Asynchrony Yes (asyncio) Partial Yes Yes
JavaScript Control Full Limited Full Full
Execution Speed High Medium High Very High
Headless Support Yes Yes Yes Yes
Browser Compatibility Chromium/Chrome only All browsers Chromium/Chrome Chromium/Firefox/WebKit
Community Size Medium Large Large Growing
Development Activity Slowed Active Active Very Active

Installation and Environment Setup

Basic Installation

pip install pyppeteer

Installation with Development Dependencies

pip install pyppeteer[dev]

Manual Chromium Installation

Chromium is downloaded automatically on first run, but you can install it ahead of time:

import pyppeteer
import asyncio

async def install_chromium():
    """Pre-download the Chromium build that Pyppeteer expects.

    pyppeteer exposes no public ``install()`` coroutine — the supported
    API is ``pyppeteer.chromium_downloader.download_chromium()`` (a
    synchronous call; the ``pyppeteer-install`` console script wraps it).
    Kept ``async`` so existing ``asyncio.run(install_chromium())`` call
    sites continue to work unchanged.
    """
    from pyppeteer import chromium_downloader

    # Skip the large download when the pinned revision is already on disk.
    if not chromium_downloader.check_chromium():
        chromium_downloader.download_chromium()

asyncio.run(install_chromium())

Specifying a Custom Browser Path

browser = await launch(
    executablePath='/path/to/chrome',
    headless=True
)

Comprehensive Usage Guide

Asynchronous Browser Launch and Configuration

import asyncio
from pyppeteer import launch

async def main():
    """Start a headless Chromium, visit example.com, and shut down."""
    # Flags commonly needed for containerized / CI environments.
    chrome_flags = [
        '--no-sandbox',
        '--disable-setuid-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu'
    ]
    browser = await launch(headless=True, args=chrome_flags)

    # One fresh tab for the whole session.
    tab = await browser.newPage()

    # Full-HD viewport so layouts render as on a desktop monitor.
    await tab.setViewport({'width': 1920, 'height': 1080})

    await tab.goto('https://example.com')

    await browser.close()

asyncio.run(main())

Advanced Navigation Strategies

# Various load‑wait strategies
await page.goto('https://example.com', {
    'waitUntil': 'networkidle2',  # Wait until network is idle
    'timeout': 30000              # 30‑second timeout
})

# Wait for a specific selector
await page.waitForSelector('h1', {'timeout': 5000})

# Wait using XPath
await page.waitForXPath('//div[@class="content"]')

# Wait for a JavaScript condition
await page.waitForFunction('document.readyState === "complete"')

Working with Page Elements

# Find a single element
element = await page.querySelector('input[name="username"]')

# Find multiple elements
elements = await page.querySelectorAll('a')

# Get an attribute
href = await page.evaluate('element => element.href', element)

# Get element text
text = await page.evaluate('element => element.textContent', element)

# Check visibility
is_visible = await page.evaluate('element => element.offsetParent !== null', element)

Form Interaction and Controls

# Type with human‑like delay
await page.type('#email', 'user@example.com', {'delay': 100})

# Clear a field before typing
await page.click('#password', {'clickCount': 3})
await page.type('#password', 'newpassword')

# Work with dropdowns
await page.select('select#country', 'Russia')

# Toggle checkboxes
await page.click('input[type="checkbox"]')

# Upload files
file_input = await page.querySelector('input[type="file"]')
await file_input.uploadFile('/path/to/file.pdf')

Executing JavaScript in Page Context

# Extract data from the page
data = await page.evaluate('''() => {
    return {
        title: document.title,
        url: window.location.href,
        links: Array.from(document.querySelectorAll('a')).map(a => a.href)
    };
}''')

# Run a script with arguments
result = await page.evaluate('''(selector) => {
    const element = document.querySelector(selector);
    return element ? element.textContent : null;
}''', 'h1')

# Inject custom JavaScript
await page.addScriptTag({'content': 'window.myVar = "test";'})

Creating Screenshots and PDFs

# Full‑page screenshot
await page.screenshot({
    'path': 'fullpage.png',
    'fullPage': True,
    'type': 'png'
})

# Screenshot a specific element
element = await page.querySelector('.chart')
await element.screenshot({'path': 'chart.png'})

# Generate a PDF with custom margins
await page.pdf({
    'path': 'document.pdf',
    'format': 'A4',
    'printBackground': True,
    'margin': {
        'top': '20px',
        'right': '20px',
        'bottom': '20px',
        'left': '20px'
    }
})

Managing Cookies and Local Storage

# Work with cookies
cookies = await page.cookies()
await page.setCookie({
    'name': 'session_id',
    'value': 'abc123',
    'domain': 'example.com',
    'path': '/',
    'httpOnly': True,
    'secure': True
})

# Delete a cookie
await page.deleteCookie({'name': 'session_id'})

# Use localStorage
await page.evaluate('''() => {
    localStorage.setItem('user_preferences', JSON.stringify({
        theme: 'dark',
        language: 'en'
    }));
}''')

# Retrieve data from localStorage
storage_data = await page.evaluate('localStorage.getItem("user_preferences")')

Bypassing Bot Protection

# Set a custom user‑agent
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')

# Add extra HTTP headers
await page.setExtraHTTPHeaders({
    'Accept-Language': 'en-US,en;q=0.9'
})

# Emulate geolocation
await page.setGeolocation({'latitude': 55.7558, 'longitude': 37.6173})

# Disable images to speed up loading
await page.setRequestInterception(True)
page.on('request', lambda req: asyncio.ensure_future(
    req.abort() if req.resourceType == 'image' else req.continue_()
))

Pyppeteer Methods and Functions Overview

Core Browser Methods

Method Description Example
launch() Start a browser instance browser = await launch(headless=True)
newPage() Create a new page/tab page = await browser.newPage()
pages() Retrieve all open pages pages = await browser.pages()
close() Close the browser await browser.close()
version() Get browser version version = await browser.version()
userAgent() Return the browser’s user‑agent string ua = await browser.userAgent()

Page Navigation Methods

Method Description Example
goto() Navigate to a URL await page.goto('https://example.com')
goBack() Go back in history await page.goBack()
goForward() Go forward in history await page.goForward()
reload() Reload the current page await page.reload()
url Return the current URL url = page.url  (in Pyppeteer this is a property, not a method)
title() Return the page title title = await page.title()

Element Search Methods

Method Description Example
querySelector() Find an element by CSS selector el = await page.querySelector('div')
querySelectorAll() Find all matching elements els = await page.querySelectorAll('a')
xpath() Find elements using XPath els = await page.xpath('//div')
waitForSelector() Wait for a selector to appear await page.waitForSelector('h1')
waitForXPath() Wait for an XPath match await page.waitForXPath('//h1')

Interaction Methods

Method Description Example
click() Click an element await page.click('button')
type() Type text into an input await page.type('input', 'text')
focus() Focus on an element await page.focus('input')
hover() Hover over an element await page.hover('a')
select() Select an option in a <select> await page.select('select', 'value')
keyboard.press() Press a keyboard key await page.keyboard.press('Enter')

Data Retrieval Methods

Method Description Example
content() Get page HTML content html = await page.content()
evaluate() Run JavaScript in page context result = await page.evaluate('code')
evaluateOnNewDocument() Inject JS before any page loads await page.evaluateOnNewDocument('code')
cookies() Retrieve cookies cookies = await page.cookies()
screenshot() Take a screenshot await page.screenshot({'path': 'img.png'})
pdf() Create a PDF file await page.pdf({'path': 'doc.pdf'})

Configuration Methods

Method Description Example
setViewport() Define viewport dimensions await page.setViewport({'width': 1280})
setUserAgent() Override the user‑agent string await page.setUserAgent('Mozilla...')
setExtraHTTPHeaders() Set additional HTTP headers await page.setExtraHTTPHeaders({})
setCookie() Set a cookie manually await page.setCookie({'name': 'key'})
setGeolocation() Emulate a geographic location await page.setGeolocation({'latitude': 55})
setRequestInterception() Enable request interception await page.setRequestInterception(True)

Practical Usage Examples

Automating Login

async def login_automation():
    """Sign in at example.com and return the resulting session cookies."""
    browser = await launch(headless=False)
    tab = await browser.newPage()

    await tab.goto('https://example.com/login')

    # Fill in the credentials.
    await tab.type('#username', 'your_username')
    await tab.type('#password', 'your_password')

    # Submit the form.
    await tab.click('button[type="submit"]')

    # The dashboard appearing signals a successful login.
    await tab.waitForSelector('.dashboard')

    # Capture cookies so later sessions can skip the login step.
    session_cookies = await tab.cookies()

    await browser.close()
    return session_cookies

Parsing Dynamic Content

async def parse_dynamic_content():
    """Scrape the product list from a JavaScript-rendered page.

    Scrolls to the bottom to trigger infinite-scroll loading, then
    collects name/price/image for every '.product' card.

    Returns:
        list[dict]: one dict per product with 'name', 'price', 'image'.
    """
    browser = await launch(headless=True)
    page = await browser.newPage()
    
    await page.goto('https://example.com/products')
    
    # Wait for the AJAX-loaded product list container to appear.
    await page.waitForSelector('.product-list')
    
    # Infinite scroll: move 100px every 100ms until the accumulated
    # distance reaches the document height read on that tick.
    # NOTE(review): if new content keeps growing scrollHeight faster than
    # we scroll, this may loop longer than expected — confirm on the
    # target site.
    await page.evaluate('''() => {
        return new Promise((resolve) => {
            let totalHeight = 0;
            const distance = 100;
            const timer = setInterval(() => {
                const scrollHeight = document.body.scrollHeight;
                window.scrollBy(0, distance);
                totalHeight += distance;
                
                if (totalHeight >= scrollHeight) {
                    clearInterval(timer);
                    resolve();
                }
            }, 100);
        });
    }''')
    
    # Extract product data.
    # NOTE(review): assumes every '.product' contains '.name', '.price'
    # and an <img>; a missing child would raise a TypeError inside the
    # page context — verify against the real markup.
    products = await page.evaluate('''() => {
        return Array.from(document.querySelectorAll('.product')).map(product => ({
            name: product.querySelector('.name').textContent,
            price: product.querySelector('.price').textContent,
            image: product.querySelector('img').src
        }));
    }''')
    
    await browser.close()
    return products

Generating Chart Reports

async def generate_chart_report():
    """Render a Chart.js chart in headless Chromium and export PNG + PDF.

    Fixes two defects in the original snippet: the HTML string was empty
    (the markup was lost during article extraction, so nothing rendered),
    and ``page.waitForTimeout`` does not exist in pyppeteer — that API was
    added in later Puppeteer releases; a plain ``asyncio.sleep`` works.
    """
    browser = await launch(headless=True)
    page = await browser.newPage()

    # Minimal self-contained page: Chart.js from a CDN plus one canvas.
    chart_html = '''
    <!DOCTYPE html>
    <html>
    <head>
        <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    </head>
    <body>
        <canvas id="chart" width="800" height="400"></canvas>
        <script>
        new Chart(document.getElementById('chart'), {
            type: 'bar',
            data: {
                labels: ['Q1', 'Q2', 'Q3', 'Q4'],
                datasets: [{label: 'Sales', data: [12, 19, 3, 5]}]
            }
        });
        </script>
    </body>
    </html>
    '''

    await page.setContent(chart_html)

    # Give Chart.js time to fetch from the CDN and paint the canvas.
    await asyncio.sleep(2)

    # Capture the rendered chart as an image.
    await page.screenshot({'path': 'chart_report.png', 'fullPage': True})

    # And as a printable PDF.
    await page.pdf({'path': 'chart_report.pdf', 'format': 'A4'})

    await browser.close()

Integration with Other Libraries

Using aiohttp Together with Pyppeteer

import aiohttp
import asyncio
from pyppeteer import launch

async def combined_scraping():
    """Fetch API data with aiohttp while scraping a page with Pyppeteer."""
    async with aiohttp.ClientSession() as http:
        browser = await launch(headless=True)
        tab = await browser.newPage()

        # JSON payload straight from the REST endpoint — no browser needed.
        async with http.get('https://api.example.com/data') as resp:
            api_data = await resp.json()

        # Title of the fully rendered page via the real browser.
        await tab.goto('https://example.com')
        web_data = await tab.evaluate('document.title')

        await browser.close()

        return {'api': api_data, 'web': web_data}

Integrating with pandas

import pandas as pd
from pyppeteer import launch

async def scrape_to_dataframe():
    """Scrape an HTML table and return it as a pandas DataFrame."""
    browser = await launch(headless=True)
    tab = await browser.newPage()

    await tab.goto('https://example.com/table')

    # Pull every row as a list of trimmed cell strings (header included).
    raw_rows = await tab.evaluate('''() => {
        const rows = Array.from(document.querySelectorAll('table tr'));
        return rows.map(row => {
            const cells = Array.from(row.querySelectorAll('td, th'));
            return cells.map(cell => cell.textContent.trim());
        });
    }''')

    await browser.close()

    # First row supplies the column names; the rest are data rows.
    return pd.DataFrame(raw_rows[1:], columns=raw_rows[0])

Error Handling and Debugging

Common Error Management

import asyncio
from pyppeteer import launch
from pyppeteer.errors import TimeoutError, NetworkError

async def robust_scraping():
    """Fetch example.com's HTML with layered error handling.

    Returns:
        str | None: the page HTML, or None on timeout/network/other error.
    """
    browser = None
    try:
        browser = await launch(headless=True)
        page = await browser.newPage()
        
        # Set timeouts.
        # NOTE(review): pyppeteer ports Puppeteer 1.x — setDefaultNavigationTimeout
        # exists, but setDefaultTimeout was added in later Puppeteer releases;
        # confirm it is available in the installed pyppeteer version.
        page.setDefaultTimeout(30000)
        page.setDefaultNavigationTimeout(60000)
        
        await page.goto('https://example.com')
        
        # Safe element wait: a missing element is an expected outcome,
        # handled locally instead of bubbling up.
        try:
            await page.waitForSelector('.content', timeout=10000)
        except TimeoutError:
            print("Element not found within 10 seconds")
            return None
        
        content = await page.content()
        return content
        
    except NetworkError as e:
        # Connectivity / DNS / protocol failures from the browser side.
        print(f"Network error: {e}")
        return None
    except Exception as e:
        # Last-resort catch at this boundary so the caller gets None
        # rather than an unhandled exception.
        print(f"Unexpected error: {e}")
        return None
    finally:
        # The browser process must die even on failure, or it leaks.
        if browser:
            await browser.close()

Debugging with Logging

import logging
from pyppeteer import launch

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def debug_scraping():
    """Open a visible, slowed-down browser with DevTools for debugging."""
    browser = await launch(
        headless=False,
        slowMo=250,          # Slow every action down so a human can follow
        devtools=True        # Pop open Chrome DevTools alongside the page
    )

    tab = await browser.newPage()

    # Mirror in-page console output and every network request into our log.
    tab.on('console', lambda msg: logger.info(f'Console: {msg.text}'))
    tab.on('request', lambda req: logger.info(f'Request: {req.url}'))

    await tab.goto('https://example.com')
    await tab.screenshot({'path': 'debug.png'})

    await browser.close()

Performance Optimization

Disabling Unnecessary Resources

async def optimized_scraping():
    """Scrape example.com with rendering extras disabled for speed.

    Fix: ``--disable-images`` is not a real Chromium command-line switch;
    image loading is controlled through Blink settings (and, belt-and-
    braces, by the request interception below).

    Returns:
        str: the page's HTML content.
    """
    browser = await launch(
        headless=True,
        args=[
            '--no-sandbox',
            '--disable-setuid-sandbox',
            '--disable-dev-shm-usage',
            '--disable-gpu',
            '--blink-settings=imagesEnabled=false',
            '--disable-javascript',   # NOTE(review): not a documented Chromium switch either — verify; only relevant if JS isn't needed
            '--disable-plugins',
            '--disable-extensions'
        ]
    )

    page = await browser.newPage()

    # Abort heavyweight resource types before they are even fetched.
    await page.setRequestInterception(True)
    page.on('request', lambda req: asyncio.ensure_future(
        req.abort() if req.resourceType in ['image', 'stylesheet', 'font']
        else req.continue_()
    ))

    await page.goto('https://example.com')
    content = await page.content()

    await browser.close()
    return content

Browser Pooling

import asyncio
from pyppeteer import launch

class BrowserPool:
    """Fixed-size pool of headless browsers handed out via an asyncio queue."""

    def __init__(self, size=5):
        # Target pool size; browsers are only launched in init().
        self.size = size
        self.browsers = []
        self.available = asyncio.Queue()

    async def init(self):
        """Launch ``size`` browsers and mark each one as available."""
        for _ in range(self.size):
            instance = await launch(headless=True)
            self.browsers.append(instance)
            await self.available.put(instance)

    async def get_browser(self):
        """Block until a browser is free, then hand it out."""
        return await self.available.get()

    async def return_browser(self, browser):
        """Put a borrowed browser back into the available queue."""
        await self.available.put(browser)

    async def close_all(self):
        """Shut down every browser this pool ever launched."""
        for instance in self.browsers:
            await instance.close()

# Usage example
pool = BrowserPool(3)
await pool.init()

browser = await pool.get_browser()
page = await browser.newPage()
# ... work with the page ...
await page.close()
await pool.return_browser(browser)

Testing with Pyppeteer

Creating Automated Tests

import pytest
from pyppeteer import launch

@pytest.fixture
async def browser():
    """Yield a headless browser for a test, closing it afterwards.

    NOTE(review): async generator fixtures require a plugin such as
    pytest-asyncio — plain pytest will not await this generator.
    """
    browser = await launch(headless=True)
    yield browser
    await browser.close()

@pytest.fixture
async def page(browser):
    """Yield a fresh tab in the shared browser, closing it after the test.

    NOTE(review): like the browser fixture, this needs pytest-asyncio
    (or equivalent) to be awaited by pytest.
    """
    page = await browser.newPage()
    yield page
    await page.close()

async def test_login_form(page):
    await page.goto('https://example.com/login')
    
    # Verify form presence
    login_form = await page.querySelector('form#login')
    assert login_form is not None
    
    # Verify input fields
    username_field = await page.querySelector('input[name="username"]')
    password_field = await page.querySelector('input[name="password"]')
    
    assert username_field is not None
    assert password_field is not None
    
    # Fill the form
    await page.type('input[name="username"]', 'testuser')
    await page.type('input[name="password"]', 'testpass')
    
    # Check entered values
    username_value = await page.evaluate('document.querySelector("input[name=username]").value')
    assert username_value == 'testuser'

async def test_page_load_time(page):
    start_time = asyncio.get_event_loop().time()
    await page.goto('https://example.com')
    end_time = asyncio.get_event_loop().time()
    
    load_time = end_time - start_time
    assert load_time < 5.0  # Page should load in under 5 seconds

Alternatives and the Future of Pyppeteer

Current Project Status

Pyppeteer has not received active updates since 2021, which imposes several limitations for new projects:

  • Lack of support for newer Chrome releases
  • No critical bug fixes
  • Limited compatibility with the latest Python versions

Recommended Alternatives

Playwright for Python – the most modern, actively maintained alternative:

# Playwright example
from playwright.async_api import async_playwright

async def playwright_example():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto('https://example.com')
        await page.screenshot(path='example.png')
        await browser.close()

Pyppeteer2 – an unofficial fork created while the original project was dormant; its changes were later merged back into the main pyppeteer package, so check the main project first:

pip install pyppeteer2

Best Practices and Recommendations

Project Structure

project/
├── scrapers/
│   ├── __init__.py
│   ├── base.py
│   └── specific_scraper.py
├── utils/
│   ├── __init__.py
│   ├── browser_utils.py
│   └── data_processing.py
├── config.py
└── main.py

Production Configuration

# config.py
import os

BROWSER_CONFIG = {
    'headless': os.getenv('HEADLESS', 'true').lower() == 'true',
    'args': [
        '--no-sandbox',
        '--disable-setuid-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--memory-pressure-off',
        '--max_old_space_size=4096'
    ]
}

TIMEOUTS = {
    'navigation': 60000,
    'default': 30000,
    'element_wait': 10000
}

Monitoring and Logging

import logging
import time
from functools import wraps

def log_execution_time(func):
    """Decorator that logs the wall-clock duration of an async callable.

    Uses ``time.perf_counter()`` instead of ``time.time()`` because
    perf_counter is monotonic and unaffected by system clock adjustments,
    so measured durations can never come out negative or skewed.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = await func(*args, **kwargs)
        end_time = time.perf_counter()
        logging.info(f'{func.__name__} completed in {end_time - start_time:.2f} seconds')
        return result
    return wrapper

@log_execution_time
async def scrape_page(url):
    # Placeholder body: real scraping logic would go here. The decorator
    # above logs how long each call to this coroutine takes.
    pass

Common Problem Solutions

Memory Issues

import gc
from pyppeteer import launch

async def memory_efficient_scraping(urls=()):
    """Visit each URL in sequence, reusing one browser but a fresh page per URL.

    Fix: the original read a module-level ``urls`` name that was never
    defined (a guaranteed NameError); the iterable is now an explicit
    parameter with an empty default for backward compatibility.

    Args:
        urls: iterable of URL strings to visit.
    """
    browser = await launch(headless=True)

    try:
        for url in urls:
            page = await browser.newPage()
            await page.goto(url)
            # Process page data here
            await page.close()          # Important: close each page to free its renderer
            gc.collect()                # Encourage prompt collection of page objects
    finally:
        # The browser must be torn down even if a navigation fails.
        await browser.close()

Bypassing Blocks

import random
import asyncio

async def stealth_scraping(page, min_delay=1, max_delay=3):
    """Apply simple anti-bot-detection measures to *page*.

    Fix: the original referenced an undefined global ``page`` (a
    guaranteed NameError); the page is now a required parameter.
    ``min_delay``/``max_delay`` bound the random pause — the defaults
    keep the original 1–3 second behaviour.

    Args:
        page: a Pyppeteer Page to configure.
        min_delay: lower bound of the random delay, in seconds.
        max_delay: upper bound of the random delay, in seconds.
    """
    # Random delays make request timing look less machine-like.
    await asyncio.sleep(random.uniform(min_delay, max_delay))

    # Rotate between realistic desktop user-agent strings.
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
    ]
    await page.setUserAgent(random.choice(user_agents))

    # A couple of mouse moves to mimic human activity.
    await page.mouse.move(100, 100)
    await page.mouse.move(200, 200)

Security and Ethical Considerations

Respecting robots.txt

import urllib.robotparser

def can_fetch(url, user_agent='*'):
    """Return True if *url* may be fetched per the site's robots.txt.

    Fix: robots.txt lives at the site root, so its location is derived
    from the URL's scheme and host. The original appended '/robots.txt'
    to the full URL, which is wrong for any URL that has a path
    (e.g. 'https://x.com/a/b' -> 'https://x.com/a/b/robots.txt').

    Args:
        url: the absolute URL you intend to fetch.
        user_agent: the user-agent token to check rules for.
    """
    from urllib.parse import urlsplit

    parts = urlsplit(url)
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(f"{parts.scheme}://{parts.netloc}/robots.txt")
    rp.read()  # network fetch of the robots.txt file
    return rp.can_fetch(user_agent, url)

async def ethical_scraping(url):
    """Scrape *url* only when robots.txt permits it."""
    if can_fetch(url):
        # Continue with scraping...
        pass
    else:
        print(f"Access to {url} is disallowed by robots.txt")
        return

Rate Limiting Requests

import asyncio
from asyncio import Semaphore

class RateLimiter:
    """Best-effort limiter for pacing async requests to ~N per second.

    NOTE(review): this is approximate — the semaphore only caps how many
    acquire() calls run concurrently, and the sliding-window bookkeeping
    below is per-instance and event-loop-bound; confirm it meets real
    rate-limit requirements before relying on it in production.
    """
    def __init__(self, max_requests_per_second=1):
        self.max_requests = max_requests_per_second          # window capacity per second
        self.semaphore = Semaphore(max_requests_per_second)  # caps concurrent acquires
        self.requests = []                                   # timestamps of recent requests
    
    async def acquire(self):
        # Limit how many acquire() calls can be inside this section at once.
        async with self.semaphore:
            now = asyncio.get_event_loop().time()
            # Drop timestamps older than the 1-second window.
            self.requests = [req for req in self.requests if now - req < 1.0]
            
            if len(self.requests) >= self.max_requests:
                # Sleep until the oldest request in the window is 1 s old;
                # pruning above guarantees sleep_time > 0 here.
                sleep_time = 1.0 - (now - self.requests[0])
                await asyncio.sleep(sleep_time)
            
            self.requests.append(now)

Pyppeteer remains a powerful browser‑automation tool for Python despite its slower development pace. For new projects, consider modern alternatives like Playwright, but existing Pyppeteer codebases can continue to thrive with proper architecture, robust error handling, performance tuning, and ethical scraping practices.

News