PySerial: Working with COM Ports

Introduction

Working with serial ports remains relevant in industrial, IoT, robotics, and microcontroller systems. Controlling devices via COM (RS-232/RS-485/USB‑Serial) requires a stable, cross‑platform, and flexible tool. In Python, the solution is the PySerial library.

PySerial provides a simple yet powerful interface for connecting, configuring, and transmitting data over serial lines. It is used to communicate with Arduino, Raspberry Pi, modems, GPS receivers, controllers, sensors, and many other pieces of equipment.

About the PySerial Library

PySerial is a cross-platform Python library for working with serial ports. Developed by Chris Liechti, it has become the de facto standard for serial communications in the Python ecosystem. The library supports Windows, Linux, macOS, and many other operating systems.

Key Features of PySerial

The library offers a unified API for handling various types of serial connections. It automatically manages platform‑specific quirks and presents developers with a straightforward interface.

PySerial supports not only physical COM ports but also virtual connections, including USB‑to‑Serial adapters, Bluetooth Serial Port Profile, and even network connections via the RFC2217 protocol.

Installation and Import

Install the library using the pip package manager:

pip install pyserial

Import the library in a Python script with the standard import statement:

import serial

Basic Architecture

Connection Lifecycle

A typical PySerial workflow follows a simple scheme:

  1. Create a Serial object: initialize it with the port parameters
  2. Open the connection: passing a port name to the constructor opens the port immediately, so an explicit open() is only needed when the port is assigned later
  3. Exchange data: send commands and read responses
  4. Close the connection: gracefully terminate the session

ser = serial.Serial('COM3', baudrate=9600, timeout=1)  # Opens the port
ser.write(b'command\n')
data = ser.readline()
ser.close()

Main Configuration Parameters

When creating a Serial object you can specify many parameters:

  • port — device name (COM1, /dev/ttyUSB0, /dev/ttyAMA0)
  • baudrate — data rate (commonly 9600, 38400, 115200)
  • bytesize — character size in bits (5, 6, 7, 8)
  • parity — parity control (PARITY_NONE, PARITY_EVEN, PARITY_ODD)
  • stopbits — number of stop bits (1, 1.5, 2)
  • timeout — read timeout in seconds
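
The framing parameters above also determine how much data a link can carry. As a rough sketch (the helper names bits_per_frame and transfer_time are invented for this illustration and are not part of PySerial), each character costs one start bit, the data bits, an optional parity bit, and the stop bits. The parity argument uses the same one-letter codes as PySerial's constants, where PARITY_NONE is 'N'.

```python
def bits_per_frame(bytesize=8, parity='N', stopbits=1):
    """Bits on the wire per character: start + data + parity + stop."""
    parity_bits = 0 if parity == 'N' else 1
    return 1 + bytesize + parity_bits + stopbits


def transfer_time(n_bytes, baudrate, bytesize=8, parity='N', stopbits=1):
    """Minimum number of seconds needed to move n_bytes over the line."""
    return n_bytes * bits_per_frame(bytesize, parity, stopbits) / baudrate


print(bits_per_frame())           # bits per character with 8N1 framing
print(transfer_time(960, 9600))   # seconds for 960 bytes at 9600 baud
```

With 8N1 framing a character takes 10 bits, so a 9600 baud link moves at most about 960 bytes per second. An estimate like this can help when choosing a read timeout for a message of known length.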

Data Reading Methods

Basic Read Methods

PySerial provides several ways to read data from a port:

Serial.read(size=1): reads up to the specified number of bytes. The call blocks until all requested bytes have arrived or the timeout expires; on timeout it returns whatever was received, which may be fewer bytes than requested (possibly none).

data = ser.read(10)  # Read 10 bytes
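
Because a timed-out read() can return fewer bytes than requested, fixed-length protocols usually need a length check. The following is a minimal sketch, not a PySerial feature: read_exact is a hypothetical helper, and io.BytesIO stands in for an open serial.Serial object so the example runs without hardware.

```python
import io


def read_exact(port, size):
    """Read exactly `size` bytes or raise TimeoutError.

    `port` is any object with a read(size) method, for example an
    open serial.Serial instance configured with a finite timeout.
    """
    data = port.read(size)
    if len(data) < size:
        raise TimeoutError(f"expected {size} bytes, got {len(data)}")
    return data


# Stand-in for a serial port (assumption: real code would pass `ser`).
fake_port = io.BytesIO(b'\x01\x02\x03')
print(read_exact(fake_port, 2))
```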

Serial.readline() — reads until a newline character (\n). This is the most popular method for text‑based protocols.

line = ser.readline().decode('utf-8').strip()

Serial.read_all() — immediately reads all available data from the input buffer.

all_data = ser.read_all()

Advanced Read Methods

Serial.read_until(expected) — reads until the specified delimiter is encountered (defaults to newline).

response = ser.read_until(b'\r\n')  # Read until CRLF

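Serial.readlines(): reads lines until the read timeout expires and returns them as a list. Configure a finite timeout before calling it, because with timeout=None the call can block indefinitely.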

lines = ser.readlines()
for line in lines:
    print(line.decode('utf-8').strip())

Data Writing Methods

Sending Data

Serial.write(data): the primary method for transmitting data. It accepts bytes-like objects (bytes, bytearray), so a str must be encoded first. It returns the number of bytes written.

ser.write(b'Hello World\n')
# or
message = "Hello World\n"
ser.write(message.encode('utf-8'))

Serial.flush(): blocks until all data in the output buffer has been written to the port. Useful when the next step depends on the command having left the buffer.

ser.write(b'URGENT_COMMAND\n')
ser.flush()  # Wait until the data has been written

Buffer Management

Input/Output Buffer Control

PySerial manages buffers automatically but also provides methods for manual control:

reset_input_buffer() — clears the input buffer of accumulated data.

reset_output_buffer() — clears the output buffer of unsent data.

ser.reset_input_buffer()  # Discard stale data
ser.write(b'NEW_COMMAND\n')
response = ser.readline()
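
The clear, write, read sequence above is common enough that it may be worth wrapping in a helper. This is only a sketch: query is a hypothetical function name, and it assumes a line-oriented device that answers every command with a single line terminated by a newline.

```python
def query(port, command, encoding='utf-8'):
    """Send one text command and return the first reply line as a str.

    `port` is expected to behave like an open serial.Serial object
    with a finite read timeout.
    """
    port.reset_input_buffer()                      # drop stale bytes first
    port.write(command.encode(encoding) + b'\n')   # newline terminator is an assumption
    port.flush()                                   # wait until the command is written
    reply = port.readline()
    return reply.decode(encoding, errors='replace').strip()


# Hypothetical usage with a real port and a made-up command:
# reply = query(ser, 'STATUS')
```

An empty string as the return value means the device did not answer before the timeout expired.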

Monitoring Buffer State

in_waiting — property that shows the number of bytes waiting in the input buffer.

out_waiting — property that shows the number of bytes pending transmission in the output buffer.

if ser.in_waiting > 0:
    data = ser.read(ser.in_waiting)
    print(f"Received {len(data)} bytes")

Connection Management

Opening and Closing Ports

Serial.open() — opens the port for use. Usually the port opens automatically when the object is instantiated.

Serial.close() — closes the connection and releases resources.

is_open — property to check whether the port is currently open.

if not ser.is_open:
    ser.open()

# Work with the port
ser.write(b'command\n')
response = ser.readline()

if ser.is_open:
    ser.close()

Context Manager

PySerial supports usage as a context manager:

with serial.Serial('COM3', 9600, timeout=1) as ser:
    ser.write(b'command\n')
    response = ser.readline()
# Port is closed automatically

Port Discovery and Monitoring

Listing Available Ports

The serial.tools.list_ports module provides functions for port discovery:

from serial.tools import list_ports

ports = list_ports.comports()
for port in ports:
    print(f"Port: {port.device}")
    print(f"Description: {port.description}")
    print(f"VID:PID: {port.vid}:{port.pid}")
    print("---")

Automatic Arduino Detection

def find_arduino():
    arduino_ports = []
    ports = list_ports.comports()
    for port in ports:
        if 'Arduino' in port.description or port.vid == 0x2341:
            arduino_ports.append(port.device)
    return arduino_ports
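
USB vendor and product IDs are conventionally written in hexadecimal, and ports that are not USB devices report None for both. The sketch below shows one way to format and filter them. The helper names are invented for this example, and SimpleNamespace objects stand in for the entries returned by list_ports.comports(); the IDs are sample values only.

```python
from types import SimpleNamespace


def format_vid_pid(port):
    """Return 'VVVV:PPPP' in hex, or 'n/a' for ports without USB IDs."""
    if port.vid is None or port.pid is None:
        return 'n/a'
    return f"{port.vid:04X}:{port.pid:04X}"


def filter_by_vid(ports, vid):
    """Keep only the device names whose USB vendor ID equals `vid`."""
    return [p.device for p in ports if p.vid == vid]


# Stand-ins shaped like list_ports.comports() entries (sample values).
ports = [
    SimpleNamespace(device='COM3', vid=0x2341, pid=0x0043),
    SimpleNamespace(device='COM1', vid=None, pid=None),
]
for p in ports:
    print(p.device, format_vid_pid(p))
print(filter_by_vid(ports, 0x2341))
```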

Working with Timeouts

Timeout Configuration

timeout — primary timeout for read operations. Can be None (blocking), 0 (non‑blocking), or a positive number of seconds.

write_timeout — timeout for write operations.

inter_byte_timeout — timeout between bytes during a read.

ser = serial.Serial('COM3', 9600, 
                    timeout=5.0,        # 5 seconds for reads
                    write_timeout=2.0,  # 2 seconds for writes
                    inter_byte_timeout=0.1)  # 100 ms between bytes

Error Handling

Typical Exceptions

PySerial raises various exceptions on errors:

try:
    ser = serial.Serial('COM1', 9600, timeout=1)
    ser.write(b'test\n')
    response = ser.readline()
except serial.SerialTimeoutException as e:
    # Must precede SerialException, which is its parent class
    print(f"Timeout: {e}")
except serial.SerialException as e:
    print(f"Serial port error: {e}")
except PermissionError as e:
    print(f"Permission denied for port: {e}")
except FileNotFoundError as e:
    print(f"Port not found: {e}")
finally:
    if 'ser' in locals() and ser.is_open:
        ser.close()
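
A port that is briefly held by another program often becomes available a moment later, so retrying the open call can be a reasonable strategy. The following sketch uses a hypothetical helper, open_with_retry, that takes any callable; it relies on the fact that serial.SerialException derives from OSError.

```python
import time


def open_with_retry(opener, attempts=3, delay=0.5):
    """Call opener() until it succeeds or the attempts run out.

    Catching OSError covers serial.SerialException as well as
    PermissionError and FileNotFoundError.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return opener()
        except OSError as exc:
            last_error = exc
            print(f"Attempt {attempt} failed: {exc}")
            if attempt < attempts:
                time.sleep(delay)
    raise last_error


# Hypothetical usage with a real port:
# ser = open_with_retry(lambda: serial.Serial('COM3', 9600, timeout=1))
```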

Asynchronous Operation and Multithreading

Running in a Separate Thread

For continuous monitoring you can use a background thread:

import threading
import time

def serial_reader(ser, stop_event):
    while not stop_event.is_set():
        if ser.in_waiting > 0:
            data = ser.readline()
            print(f"Received: {data.decode('utf-8').strip()}")
        time.sleep(0.01)

# Start the reader thread
stop_event = threading.Event()
reader_thread = threading.Thread(target=serial_reader, args=(ser, stop_event))
reader_thread.start()

# Main thread work
time.sleep(10)

# Stop the thread
stop_event.set()
reader_thread.join()

Queue for Safe Data Transfer

import queue
import threading
import time

data_queue = queue.Queue()

def data_collector(ser, q):
    while True:
        if ser.in_waiting > 0:
            data = ser.readline().decode('utf-8').strip()
            q.put(data)
        else:
            time.sleep(0.01)  # Avoid spinning the CPU while the line is idle

collector_thread = threading.Thread(target=data_collector, args=(ser, data_queue))
collector_thread.daemon = True
collector_thread.start()

# Consume from the queue in the main thread
while True:
    try:
        data = data_queue.get(timeout=1)
        print(f"Processing: {data}")
    except queue.Empty:
        continue
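
The queue pattern can be combined with a stop event so that the background thread shuts down cleanly. This is a sketch under one assumption: the port has a finite read timeout, so readline() returns periodically and the stop flag is checked. The function names collect_lines and start_collector are invented for this example.

```python
import queue
import threading


def collect_lines(port, q, stop_event):
    """Push decoded lines into q until stop_event is set.

    Relies on the port having a finite read timeout, so readline()
    returns b'' when nothing arrives and the loop can re-check the flag.
    """
    while not stop_event.is_set():
        raw = port.readline()
        if raw:
            q.put(raw.decode('utf-8', errors='replace').strip())


def start_collector(port):
    """Start the background reader; returns (queue, stop_event, thread)."""
    q = queue.Queue()
    stop_event = threading.Event()
    thread = threading.Thread(
        target=collect_lines, args=(port, q, stop_event), daemon=True)
    thread.start()
    return q, stop_event, thread


# Hypothetical usage with an open port `ser` (timeout=1):
# q, stop_event, thread = start_collector(ser)
# ... consume q.get(timeout=1) in the main thread ...
# stop_event.set(); thread.join()
```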

RS‑232 Signal Management

DTR and RTS Signals

PySerial provides full control over control‑line signals:

# Control DTR (Data Terminal Ready)
ser.dtr = True   # Set DTR
ser.dtr = False  # Clear DTR

# Control RTS (Request To Send)
ser.rts = True   # Set RTS
ser.rts = False  # Clear RTS

# Read input signal states
cts = ser.cts  # Clear To Send
dsr = ser.dsr  # Data Set Ready
ri = ser.ri    # Ring Indicator
cd = ser.cd    # Carrier Detect

Hardware Flow Control

ser = serial.Serial('COM3', 115200,
                    rtscts=True,  # RTS/CTS hardware flow control
                    dsrdtr=True)  # DSR/DTR hardware flow control

Cross‑Platform Usage

Windows

On Windows ports are named COM1, COM2, etc.:

ser = serial.Serial('COM3', 9600)
# For ports greater than COM9
ser = serial.Serial('\\\\.\\COM10', 9600)

Linux

Linux uses device files:

# USB‑to‑Serial adapters
ser = serial.Serial('/dev/ttyUSB0', 9600)

# Built‑in ports
ser = serial.Serial('/dev/ttyS0', 9600)

# Raspberry Pi GPIO UART
ser = serial.Serial('/dev/ttyAMA0', 9600)

macOS

On macOS ports are accessible through /dev:

ser = serial.Serial('/dev/cu.usbserial-1410', 9600)
# or
ser = serial.Serial('/dev/tty.usbserial-1410', 9600)

Practical Examples

Working with Arduino

import serial
import time

def arduino_communication():
    try:
        ser = serial.Serial('COM4', 9600, timeout=2)
        time.sleep(2)  # Wait for Arduino reset
        
        # Turn LED on
        ser.write(b'LED_ON\n')
        response = ser.readline().decode('utf-8').strip()
        print(f"Arduino replied: {response}")
        
        time.sleep(1)
        
        # Turn LED off
        ser.write(b'LED_OFF\n')
        response = ser.readline().decode('utf-8').strip()
        print(f"Arduino replied: {response}")
        
    except serial.SerialException as e:
        print(f"Error: {e}")
    finally:
        if 'ser' in locals():
            ser.close()

arduino_communication()

Working with a GPS Module

def parse_gps_data():
    ser = serial.Serial('/dev/ttyUSB0', 4800, timeout=1)
    
    while True:
        line = ser.readline().decode('ascii', errors='replace')
        
        if line.startswith('$GPGGA'):
            parts = line.split(',')
            if len(parts) >= 6 and parts[2] and parts[4]:
                lat = float(parts[2][:2]) + float(parts[2][2:]) / 60
                lon = float(parts[4][:3]) + float(parts[4][3:]) / 60
                
                if parts[3] == 'S':
                    lat = -lat
                if parts[5] == 'W':
                    lon = -lon
                    
                print(f"Coordinates: {lat:.6f}, {lon:.6f}")
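
Serial lines can corrupt data, so it is usually worth validating each NMEA sentence before parsing it. The checksum is the XOR of every character between '$' and '*', written as two hex digits after the asterisk. The sketch below shows that check together with the ddmm.mmmm conversion used above; both function names are made up for this illustration.

```python
def nmea_checksum_ok(sentence):
    """Return True if the two hex digits after '*' match the XOR checksum."""
    sentence = sentence.strip()
    if not sentence.startswith('$') or '*' not in sentence:
        return False
    body, _, received = sentence[1:].partition('*')
    calculated = 0
    for char in body:
        calculated ^= ord(char)
    try:
        return calculated == int(received[:2], 16)
    except ValueError:
        return False


def nmea_to_degrees(value, hemisphere):
    """Convert ddmm.mmmm (or dddmm.mmmm) plus N/S/E/W to signed degrees."""
    dot = value.index('.')
    degrees = float(value[:dot - 2])   # everything before the minutes
    minutes = float(value[dot - 2:])   # two digits before the dot, plus fraction
    result = degrees + minutes / 60
    return -result if hemisphere in ('S', 'W') else result


print(nmea_to_degrees('4807.038', 'N'))
```

Locating the decimal point instead of slicing at a fixed index lets the same function handle both latitude (two degree digits) and longitude (three degree digits).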

Working with a Modem

def modem_at_commands():
    ser = serial.Serial('COM5', 115200, timeout=3)
    
    def send_at_command(command):
        ser.write(f'{command}\r\n'.encode())
        response = ser.read_until(b'OK\r\n').decode()
        return response
    
    # Check connectivity
    response = send_at_command('AT')
    print('Modem response:', 'OK' in response)
    
    # Modem information
    info = send_at_command('ATI')
    print('Modem info:', info)
    
    # Signal quality
    signal = send_at_command('AT+CSQ')
    print('Signal quality:', signal)
    
    ser.close()
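
The raw AT+CSQ reply is more useful once it is converted to a signal level. The sketch below assumes the modem follows the common 3GPP TS 27.007 convention, where the first number is an RSSI index from 0 to 31 and 99 means "not known or not detectable"; parse_csq is a hypothetical helper name.

```python
import re


def parse_csq(response):
    """Extract the RSSI in dBm from an AT+CSQ reply, or None if unknown."""
    match = re.search(r'\+CSQ:\s*(\d+),\s*(\d+)', response)
    if match is None:
        return None
    rssi = int(match.group(1))
    if rssi == 99:           # 99 means the level is not known
        return None
    return -113 + 2 * rssi   # index 0 maps to -113 dBm, 31 to -51 dBm


print(parse_csq('\r\n+CSQ: 15,99\r\n\r\nOK\r\n'))
```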

Debugging and Diagnostics

Traffic Monitoring

For debugging, it is useful to log the entire data exchange:

class SerialDebugger:
    def __init__(self, port, baudrate, **kwargs):
        self.ser = serial.Serial(port, baudrate, **kwargs)
        
    def write(self, data):
        print(f"TX: {data}")
        return self.ser.write(data)
        
    def readline(self):
        data = self.ser.readline()
        print(f"RX: {data}")
        return data
        
    def close(self):
        self.ser.close()

# Usage example
debug_ser = SerialDebugger('COM3', 9600)
debug_ser.write(b'test\n')
response = debug_ser.readline()

Using the spy:// URL

PySerial supports special URLs for debugging:

# Intercept traffic
ser = serial.serial_for_url('spy://COM3?file=log.txt', baudrate=9600)

Integration with External Systems

Integration with MQTT

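import json

import paho.mqtt.client as mqtt
import serial

def serial_to_mqtt():
    ser = serial.Serial('/dev/ttyUSB0', 9600)
    # With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION2 as the first argument
    client = mqtt.Client()
    client.connect("mqtt.broker.com", 1883, 60)
    client.loop_start()  # Run the MQTT network loop in a background thread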
    
    while True:
        if ser.in_waiting > 0:
            data = ser.readline().decode('utf-8').strip()
            
            # Parse sensor payload
            try:
                sensor_data = json.loads(data)
                topic = f"sensors/{sensor_data['id']}"
                client.publish(topic, data)
            except json.JSONDecodeError:
                continue

Logging Data with pandas

import os
from datetime import datetime

import pandas as pd
import serial

LOG_FILE = 'sensor_log.csv'

def save_entries(entries):
    # Append to the CSV, writing the header only when the file is new
    df = pd.DataFrame(entries)
    df.to_csv(LOG_FILE, index=False, mode='a',
              header=not os.path.exists(LOG_FILE))

def log_sensor_data():
    ser = serial.Serial('COM3', 9600)
    data_log = []

    try:
        while True:
            line = ser.readline().decode('utf-8').strip()
            if line:
                timestamp = datetime.now()
                # Expected format: "temperature:25.6,humidity:60.2"
                values = dict(item.split(':') for item in line.split(','))

                log_entry = {'timestamp': timestamp}
                log_entry.update(values)
                data_log.append(log_entry)

                # Save every 100 entries
                if len(data_log) >= 100:
                    save_entries(data_log)
                    data_log.clear()

    except KeyboardInterrupt:
        pass
    finally:
        # Save whatever is left and release the port
        if data_log:
            save_entries(data_log)
        ser.close()
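
The one-line dict(...) parser above raises an exception on a truncated or noisy line and leaves every value as a string. A more forgiving variant might look like the sketch below; parse_sensor_line is a hypothetical helper and assumes the same "name:value,name:value" format with numeric values.

```python
def parse_sensor_line(line):
    """Parse 'name:value,name:value' into a dict of floats.

    Malformed fields are skipped instead of raising, since serial
    links occasionally deliver truncated lines.
    """
    readings = {}
    for item in line.split(','):
        name, sep, value = item.partition(':')
        if not sep:
            continue          # no colon: not a name/value pair
        try:
            readings[name.strip()] = float(value)
        except ValueError:
            continue          # value is not a number
    return readings


print(parse_sensor_line('temperature:25.6,humidity:60.2'))
```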

PySerial Methods and Properties Table

Method/Property | Description | Example Usage

Creating and Managing Connections
serial.Serial() | Creates a serial port object | ser = serial.Serial('COM3', 9600)
open() | Opens the port | ser.open()
close() | Closes the port | ser.close()
is_open | Checks whether the port is open | if ser.is_open:

Reading Data
read(size=1) | Reads the specified number of bytes | data = ser.read(10)
readline() | Reads a line up to \n | line = ser.readline()
readlines() | Reads all lines from the buffer | lines = ser.readlines()
read_all() | Reads all available data | data = ser.read_all()
read_until(expected) | Reads until a delimiter | data = ser.read_until(b'\r\n')

Writing Data
write(data) | Writes bytes to the port | ser.write(b'hello\n')
flush() | Forces the output buffer to be sent | ser.flush()

Buffer Management
reset_input_buffer() | Clears the input buffer | ser.reset_input_buffer()
reset_output_buffer() | Clears the output buffer | ser.reset_output_buffer()
in_waiting | Number of bytes in the input buffer | bytes_available = ser.in_waiting
out_waiting | Number of bytes in the output buffer | bytes_pending = ser.out_waiting

Port Settings
baudrate | Transmission speed | ser.baudrate = 115200
bytesize | Byte size (5, 6, 7, 8) | ser.bytesize = serial.EIGHTBITS
parity | Parity control | ser.parity = serial.PARITY_NONE
stopbits | Number of stop bits | ser.stopbits = serial.STOPBITS_ONE
timeout | Read timeout | ser.timeout = 5.0
write_timeout | Write timeout | ser.write_timeout = 2.0
inter_byte_timeout | Inter-byte timeout | ser.inter_byte_timeout = 0.1

Signal Control
dtr | Controls the DTR line | ser.dtr = True
rts | Controls the RTS line | ser.rts = False
cts | Reads CTS state | if ser.cts:
dsr | Reads DSR state | if ser.dsr:
ri | Reads RI state | if ser.ri:
cd | Reads CD state | if ser.cd:

Special Functions
send_break(duration) | Sends a break signal | ser.send_break(0.25)
serial_for_url() | Creates a port from a URL | ser = serial.serial_for_url('loop://')
list_ports.comports() | Lists available ports | ports = list_ports.comports()

Frequently Asked Questions

How do I choose the correct baud rate?

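The baud rate must match the settings of the connected device. Common values are 9600, 38400, 57600, and 115200. Many Arduino example sketches use 9600, while ESP32 boards typically use 115200. When in doubt, check the Serial.begin() call in the firmware or the device's datasheet.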

Why do I get an "Access denied" error when opening a port?

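This usually occurs when another application (e.g., the Arduino IDE Serial Monitor) already has the port open or when the user lacks sufficient permissions. Close any programs using the port first. On many Linux distributions, adding your user to the group that owns the device (commonly dialout) is preferable to running the script as root; on Windows, run the script with administrative rights only if necessary.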

How do I work with encodings other than UTF‑8?

PySerial deals with raw bytes by default. To decode, use the decode() method with the desired encoding:

data = ser.readline().decode('cp1251')  # Windows‑1251
data = ser.readline().decode('ascii', errors='ignore')  # Ignore errors
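
When the encoding of the incoming data is not known for certain, one option is to try several candidates in order. This is a sketch only: decode_with_fallback is a hypothetical helper, and the choice and order of encodings are assumptions that should match your device.

```python
def decode_with_fallback(raw, encodings=('utf-8', 'cp1251')):
    """Try each encoding in turn; fall back to lossy ASCII at the end."""
    for encoding in encodings:
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    return raw.decode('ascii', errors='replace')


# "Привет" encoded as Windows-1251; these bytes are not valid UTF-8.
raw = b'\xcf\xf0\xe8\xe2\xe5\xf2'
text = decode_with_fallback(raw)
```

Stricter encodings such as UTF-8 should come first, because single-byte code pages accept almost any byte sequence and would hide a mismatch.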

What if data arrives in fragments?

Use buffering or read until a specific delimiter:

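buffer = b''
while True:
    chunk = ser.read(ser.in_waiting or 1)
    buffer += chunk
    # Extract every complete line; keep the unfinished tail in the buffer
    while b'\n' in buffer:
        line, buffer = buffer.split(b'\n', 1)
        process_data(line)  # process_data() is a placeholder for your own handler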

How can I set up non‑blocking reads?

Set timeout=0 for non‑blocking mode:

ser = serial.Serial('COM3', 9600, timeout=0)
data = ser.read(100)  # Returns immediately with any available data

Can PySerial work with virtual ports?

Yes, PySerial supports virtual ports via URLs:

# Loopback connection for testing
ser = serial.serial_for_url('loop://', timeout=1)

# Network connection (RFC2217)
ser = serial.serial_for_url('rfc2217://remote_host:4001', baudrate=9600)

Advantages and Limitations

Advantages of PySerial

Ease of use — the library offers an intuitive API that lets you start working with minimal code.

Cross‑platform — the same code runs on Windows, Linux, macOS, and other operating systems.

Configuration flexibility — full control over every serial‑connection parameter.

Reliability — used in industrial solutions and backed by a long development history.

Extensibility — support for various connection types via URL schemes.

Limitations of the Library

Synchronous nature — PySerial lacks built‑in asyncio support, which may be a drawback for asynchronous applications.

Protocol knowledge required — effective use demands understanding of the specific serial protocol you are communicating with.

Platform‑specific quirks — although cross‑platform, some features may behave differently across operating systems.

Alternatives and Add‑ons

PySerial‑asyncio

For asynchronous work, the pyserial-asyncio package is recommended:

import asyncio
import serial_asyncio

async def main():
    reader, writer = await serial_asyncio.open_serial_connection(
        url='COM3', baudrate=9600)
    
    writer.write(b'Hello\n')
    response = await reader.readline()
    print(response.decode())

asyncio.run(main())

PyFirmata for Arduino

For Arduino communication via the Firmata protocol:

from pyfirmata import Arduino

board = Arduino('COM3')
led = board.get_pin('d:13:o')  # Digital pin 13 as output
led.write(1)  # Turn on LED

Conclusion

PySerial is an indispensable tool for developers working with serial ports in Python. Thanks to its simplicity, stability, and rich feature set, the library is applicable across a wide range of tasks—from simple DIY projects to complex industrial systems.

Proper use of PySerial requires a solid grasp of serial communication fundamentals, but once the basics are mastered, the library offers powerful capabilities for building reliable communication solutions. It remains the standard for interfacing with Arduino, microcontrollers, industrial controllers, GPS modules, modems, and any other devices that rely on serial data transmission.

Regardless of project complexity, PySerial provides the necessary tools for efficient serial‑port work, making hardware integration with Python applications straightforward and dependable.
