Pyserial – работа с COM-портами

онлайн тренажер по питону
Онлайн-тренажер Python для начинающих

Изучайте Python легко и без перегрузки теорией. Решайте практические задачи с автоматической проверкой, получайте подсказки на русском языке и пишите код прямо в браузере — без необходимости что-либо устанавливать.

Начать курс

Введение

Работа с последовательными портами остаётся актуальной в индустриальных, IoT, робототехнических и микроконтроллерных системах. Управление устройствами через COM (RS-232/RS-485/USB-Serial) требует стабильного, кроссплатформенного и гибкого инструмента. В Python таким решением является библиотека PySerial.

PySerial обеспечивает простой и мощный интерфейс для подключения, конфигурации и передачи данных по последовательным линиям. Она используется для связи с Arduino, Raspberry Pi, модемами, GPS-приёмниками, контроллерами, сенсорами и многим другим оборудованием.

О библиотеке PySerial

PySerial — это кроссплатформенная Python-библиотека для работы с последовательными портами. Разработанная Крисом Лиехтти, она стала стандартом де-факто для serial-коммуникаций в экосистеме Python. Библиотека поддерживает Windows, Linux, macOS и множество других операционных систем.

Ключевые особенности PySerial

Библиотека предоставляет унифицированный API для работы с различными типами последовательных соединений. Она автоматически обрабатывает платформо-специфичные особенности работы с портами и предоставляет разработчику простой интерфейс.

PySerial поддерживает не только физические COM-порты, но и виртуальные соединения, включая USB-to-Serial адаптеры, Bluetooth Serial Port Profile, и даже сетевые соединения через RFC2217 протокол.

Установка и подключение

Установка библиотеки выполняется через менеджер пакетов pip:

pip install pyserial

Для подключения библиотеки в Python-скрипте используется стандартный импорт:

import serial

Базовая архитектура работы

Жизненный цикл соединения

Типичная работа с PySerial следует простой схеме:

  1. Создание объекта Serial — инициализация с параметрами порта
  2. Открытие соединения — установка связи с устройством
  3. Обмен данными — отправка команд и чтение ответов
  4. Закрытие соединения — корректное завершение работы
ser = serial.Serial('COM3', baudrate=9600, timeout=1)
ser.write(b'command')
data = ser.readline()
ser.close()

Основные параметры конфигурации

При создании объекта Serial можно задать множество параметров:

  • port — имя устройства (COM1, /dev/ttyUSB0, /dev/ttyAMA0)
  • baudrate — скорость передачи данных (обычно 9600, 38400, 115200)
  • bytesize — размер символа в битах (5, 6, 7, 8)
  • parity — контроль четности (PARITY_NONE, PARITY_EVEN, PARITY_ODD)
  • stopbits — количество стоп-бит (1, 1.5, 2)
  • timeout — таймаут операций чтения в секундах

Методы чтения данных

Базовые методы чтения

PySerial предоставляет несколько способов чтения данных из порта:

Serial.read(size=1) — читает точно указанное количество байтов. Если данных недостаточно, метод блокируется до получения всех байтов или истечения таймаута.

data = ser.read(10)  # Прочитать 10 байтов

Serial.readline() — читает данные до символа новой строки (\n). Наиболее популярный метод для текстовых протоколов.

line = ser.readline().decode('utf-8').strip()

Serial.read_all() — читает все доступные данные из входного буфера немедленно.

all_data = ser.read_all()

Продвинутые методы чтения

Serial.read_until(expected) — читает данные до указанного разделителя. По умолчанию использует символ новой строки.

response = ser.read_until(b'\r\n')  # Читать до CRLF

Serial.readlines() — читает все строки, доступные в буфере.

lines = ser.readlines()
for line in lines:
    print(line.decode('utf-8').strip())

Методы записи данных

Отправка данных

Serial.write(data) — основной метод для отправки данных. Принимает только объекты типа bytes.

ser.write(b'Hello World\n')
# или
message = "Hello World\n"
ser.write(message.encode('utf-8'))

Serial.flush() — принудительно отправляет все данные из выходного буфера. Полезно для критичных по времени операций.

ser.write(b'URGENT_COMMAND\n')
ser.flush()  # Гарантировать немедленную отправку

Управление буферами

Контроль буферов ввода-вывода

PySerial автоматически управляет буферами, но предоставляет методы для ручного контроля:

reset_input_buffer() — очищает входной буфер от накопившихся данных.

reset_output_buffer() — очищает выходной буфер от неотправленных данных.

ser.reset_input_buffer()  # Очистить старые данные
ser.write(b'NEW_COMMAND\n')
response = ser.readline()

Мониторинг состояния буферов

in_waiting — свойство, показывающее количество байтов в входном буфере.

out_waiting — количество байтов в выходном буфере, ожидающих отправки.

if ser.in_waiting > 0:
    data = ser.read(ser.in_waiting)
    print(f"Получено {len(data)} байтов")

Управление соединением

Открытие и закрытие портов

Serial.open() — открывает порт для работы. Обычно порт открывается автоматически при создании объекта.

Serial.close() — закрывает соединение и освобождает ресурсы.

is_open — свойство для проверки состояния порта.

if not ser.is_open:
    ser.open()

# Работа с портом
ser.write(b'command\n')
response = ser.readline()

if ser.is_open:
    ser.close()

Context Manager

PySerial поддерживает использование в качестве контекст-менеджера:

with serial.Serial('COM3', 9600, timeout=1) as ser:
    ser.write(b'command\n')
    response = ser.readline()
# Порт автоматически закроется

Обнаружение и мониторинг портов

Список доступных портов

Модуль serial.tools.list_ports предоставляет функции для обнаружения портов:

from serial.tools import list_ports

ports = list_ports.comports()
for port in ports:
    print(f"Порт: {port.device}")
    print(f"Описание: {port.description}")
    print(f"VID:PID: {port.vid}:{port.pid}")
    print("---")

Автоматическое обнаружение Arduino

def find_arduino():
    arduino_ports = []
    ports = list_ports.comports()
    for port in ports:
        if 'Arduino' in port.description or port.vid == 0x2341:
            arduino_ports.append(port.device)
    return arduino_ports

Работа с таймаутами

Настройка таймаутов

timeout — основной таймаут для операций чтения. Может быть None (блокировка), 0 (неблокирующий режим) или положительное число секунд.

write_timeout — таймаут для операций записи.

inter_byte_timeout — таймаут между байтами при чтении.

ser = serial.Serial('COM3', 9600, 
                    timeout=5.0,        # 5 секунд на чтение
                    write_timeout=2.0,  # 2 секунды на запись
                    inter_byte_timeout=0.1)  # 100мс между байтами

Обработка ошибок

Типичные исключения

PySerial генерирует различные исключения при ошибках:

try:
    ser = serial.Serial('COM1', 9600, timeout=1)
    ser.write(b'test\n')
    response = ser.readline()
except serial.SerialException as e:
    print(f"Ошибка последовательного порта: {e}")
except serial.SerialTimeoutException as e:
    print(f"Таймаут: {e}")
except PermissionError as e:
    print(f"Нет прав доступа к порту: {e}")
except FileNotFoundError as e:
    print(f"Порт не найден: {e}")
finally:
    if 'ser' in locals() and ser.is_open:
        ser.close()

Асинхронная работа и многопоточность

Работа в отдельном потоке

Для непрерывного мониторинга данных можно использовать потоки:

import threading
import time

def serial_reader(ser, stop_event):
    while not stop_event.is_set():
        if ser.in_waiting > 0:
            data = ser.readline()
            print(f"Получено: {data.decode('utf-8').strip()}")
        time.sleep(0.01)

# Запуск читающего потока
stop_event = threading.Event()
reader_thread = threading.Thread(target=serial_reader, args=(ser, stop_event))
reader_thread.start()

# Работа основного потока
time.sleep(10)

# Остановка потока
stop_event.set()
reader_thread.join()

Очередь для безопасной передачи данных

import queue
import threading

data_queue = queue.Queue()

def data_collector(ser, q):
    while True:
        if ser.in_waiting > 0:
            data = ser.readline().decode('utf-8').strip()
            q.put(data)

collector_thread = threading.Thread(target=data_collector, args=(ser, data_queue))
collector_thread.daemon = True
collector_thread.start()

# Чтение из очереди в основном потоке
while True:
    try:
        data = data_queue.get(timeout=1)
        print(f"Обработка: {data}")
    except queue.Empty:
        continue

Управление сигналами RS-232

DTR и RTS сигналы

PySerial предоставляет полный контроль над управляющими сигналами:

# Управление DTR (Data Terminal Ready)
ser.dtr = True   # Установить DTR
ser.dtr = False  # Сбросить DTR

# Управление RTS (Request To Send)
ser.rts = True   # Установить RTS
ser.rts = False  # Сбросить RTS

# Чтение состояния входных сигналов
cts = ser.cts  # Clear To Send
dsr = ser.dsr  # Data Set Ready
ri = ser.ri    # Ring Indicator
cd = ser.cd    # Carrier Detect

Аппаратное управление потоком

ser = serial.Serial('COM3', 115200,
                    rtscts=True,  # RTS/CTS управление потоком
                    dsrdtr=True)  # DSR/DTR управление потоком

Работа с различными платформами

Windows

В Windows порты именуются как COM1, COM2, и т.д.:

ser = serial.Serial('COM3', 9600)
# или для портов > COM9
ser = serial.Serial('\\\\.\\COM10', 9600)

Linux

В Linux используются файлы устройств:

# USB-to-Serial адаптеры
ser = serial.Serial('/dev/ttyUSB0', 9600)

# Встроенные порты
ser = serial.Serial('/dev/ttyS0', 9600)

# Raspberry Pi GPIO UART
ser = serial.Serial('/dev/ttyAMA0', 9600)

macOS

В macOS порты доступны через /dev:

ser = serial.Serial('/dev/cu.usbserial-1410', 9600)
# или
ser = serial.Serial('/dev/tty.usbserial-1410', 9600)

Практические примеры

Работа с Arduino

import serial
import time

def arduino_communication():
    try:
        ser = serial.Serial('COM4', 9600, timeout=2)
        time.sleep(2)  # Ждать перезагрузки Arduino
        
        # Включить светодиод
        ser.write(b'LED_ON\n')
        response = ser.readline().decode('utf-8').strip()
        print(f"Arduino ответил: {response}")
        
        time.sleep(1)
        
        # Выключить светодиод
        ser.write(b'LED_OFF\n')
        response = ser.readline().decode('utf-8').strip()
        print(f"Arduino ответил: {response}")
        
    except serial.SerialException as e:
        print(f"Ошибка: {e}")
    finally:
        if 'ser' in locals():
            ser.close()

arduino_communication()

Работа с GPS-модулем

def parse_gps_data():
    ser = serial.Serial('/dev/ttyUSB0', 4800, timeout=1)
    
    while True:
        line = ser.readline().decode('ascii', errors='replace')
        
        if line.startswith('$GPGGA'):
            parts = line.split(',')
            if len(parts) >= 6 and parts[2] and parts[4]:
                lat = float(parts[2][:2]) + float(parts[2][2:]) / 60
                lon = float(parts[4][:3]) + float(parts[4][3:]) / 60
                
                if parts[3] == 'S':
                    lat = -lat
                if parts[5] == 'W':
                    lon = -lon
                    
                print(f"Координаты: {lat:.6f}, {lon:.6f}")

Работа с модемом

def modem_at_commands():
    ser = serial.Serial('COM5', 115200, timeout=3)
    
    def send_at_command(command):
        ser.write(f'{command}\r\n'.encode())
        response = ser.read_until(b'OK\r\n').decode()
        return response
    
    # Проверка связи
    response = send_at_command('AT')
    print('Модем отвечает:', 'OK' in response)
    
    # Информация о модеме
    info = send_at_command('ATI')
    print('Информация о модеме:', info)
    
    # Уровень сигнала
    signal = send_at_command('AT+CSQ')
    print('Уровень сигнала:', signal)
    
    ser.close()

Отладка и диагностика

Мониторинг трафика

Для отладки полезно логировать весь обмен данными:

class SerialDebugger:
    def __init__(self, port, baudrate, **kwargs):
        self.ser = serial.Serial(port, baudrate, **kwargs)
        
    def write(self, data):
        print(f"TX: {data}")
        return self.ser.write(data)
        
    def readline(self):
        data = self.ser.readline()
        print(f"RX: {data}")
        return data
        
    def close(self):
        self.ser.close()

# Использование
debug_ser = SerialDebugger('COM3', 9600)
debug_ser.write(b'test\n')
response = debug_ser.readline()

Использование spy:// URL

PySerial поддерживает специальные URL для отладки:

# Перехват трафика
ser = serial.serial_for_url('spy://COM3?file=log.txt', baudrate=9600)

Интеграция с внешними системами

Интеграция с MQTT

import paho.mqtt.client as mqtt
import json

def serial_to_mqtt():
    ser = serial.Serial('/dev/ttyUSB0', 9600)
    client = mqtt.Client()
    client.connect("mqtt.broker.com", 1883, 60)
    
    while True:
        if ser.in_waiting > 0:
            data = ser.readline().decode('utf-8').strip()
            
            # Парсинг данных сенсора
            try:
                sensor_data = json.loads(data)
                topic = f"sensors/{sensor_data['id']}"
                client.publish(topic, data)
            except json.JSONDecodeError:
                continue

Логирование данных с pandas

import pandas as pd
from datetime import datetime

def log_sensor_data():
    ser = serial.Serial('COM3', 9600)
    data_log = []
    
    try:
        while True:
            line = ser.readline().decode('utf-8').strip()
            if line:
                timestamp = datetime.now()
                # Предполагается формат: "temperature:25.6,humidity:60.2"
                values = dict(item.split(':') for item in line.split(','))
                
                log_entry = {'timestamp': timestamp}
                log_entry.update(values)
                data_log.append(log_entry)
                
                # Сохранение каждые 100 записей
                if len(data_log) >= 100:
                    df = pd.DataFrame(data_log)
                    df.to_csv('sensor_log.csv', index=False, mode='a', header=False)
                    data_log.clear()
                    
    except KeyboardInterrupt:
        if data_log:
            df = pd.DataFrame(data_log)
            df.to_csv('sensor_log.csv', index=False, mode='a', header=False)
        ser.close()

Таблица методов и свойств PySerial

Метод/Свойство Описание Пример использования
Создание и управление соединением    
serial.Serial() Создает объект последовательного порта ser = serial.Serial('COM3', 9600)
open() Открывает порт ser.open()
close() Закрывает порт ser.close()
is_open Проверяет, открыт ли порт if ser.is_open:
Чтение данных    
read(size=1) Читает указанное количество байтов data = ser.read(10)
readline() Читает строку до \n line = ser.readline()
readlines() Читает все строки из буфера lines = ser.readlines()
read_all() Читает все доступные данные data = ser.read_all()
read_until(expected) Читает до разделителя data = ser.read_until(b'\r\n')
Запись данных    
write(data) Записывает байты в порт ser.write(b'hello\n')
flush() Принудительная отправка буфера ser.flush()
Управление буферами    
reset_input_buffer() Очищает входной буфер ser.reset_input_buffer()
reset_output_buffer() Очищает выходной буфер ser.reset_output_buffer()
in_waiting Количество байтов во входном буфере bytes_available = ser.in_waiting
out_waiting Количество байтов в выходном буфере bytes_pending = ser.out_waiting
Настройки порта    
baudrate Скорость передачи ser.baudrate = 115200
bytesize Размер байта (5,6,7,8) ser.bytesize = serial.EIGHTBITS
parity Контроль четности ser.parity = serial.PARITY_NONE
stopbits Количество стоп-бит ser.stopbits = serial.STOPBITS_ONE
timeout Таймаут чтения ser.timeout = 5.0
write_timeout Таймаут записи ser.write_timeout = 2.0
inter_byte_timeout Таймаут между байтами ser.inter_byte_timeout = 0.1
Управление сигналами    
dtr Управление DTR сигналом ser.dtr = True
rts Управление RTS сигналом ser.rts = False
cts Чтение состояния CTS if ser.cts:
dsr Чтение состояния DSR if ser.dsr:
ri Чтение состояния RI if ser.ri:
cd Чтение состояния CD if ser.cd:
Специальные функции    
send_break(duration) Отправка break-сигнала ser.send_break(0.25)
serial_for_url() Создание порта по URL ser = serial.serial_for_url('loop://')
list_ports.comports() Список доступных портов ports = list_ports.comports()

Часто задаваемые вопросы

Как выбрать правильную скорость передачи?

Скорость передачи (baudrate) должна совпадать с настройками подключенного устройства. Наиболее распространенные значения: 9600, 38400, 57600, 115200. Для Arduino по умолчанию используется 9600, для ESP32 — 115200.

Почему возникает ошибка "Access denied" при открытии порта?

Эта ошибка обычно возникает когда порт уже используется другим приложением (например, Arduino IDE Serial Monitor) или у пользователя нет прав доступа к порту. Закройте все программы, использующие порт, и запустите Python-скрипт с административными правами если необходимо.

Как работать с данными в кодировке, отличной от UTF-8?

По умолчанию PySerial работает с байтами. Для декодирования используйте метод decode() с нужной кодировкой:

data = ser.readline().decode('cp1251')  # Windows-1251
data = ser.readline().decode('ascii', errors='ignore')  # Игнорировать ошибки

Что делать, если данные приходят по частям?

Используйте буферизацию или читайте до определенного разделителя:

buffer = b''
while True:
    chunk = ser.read(ser.in_waiting or 1)
    buffer += chunk
    if b'\n' in buffer:
        line, buffer = buffer.split(b'\n', 1)
        process_data(line)

Как настроить неблокирующее чтение?

Установите timeout=0 для неблокирующего режима:

ser = serial.Serial('COM3', 9600, timeout=0)
data = ser.read(100)  # Вернет доступные данные немедленно

Можно ли использовать PySerial с виртуальными портами?

Да, PySerial поддерживает виртуальные порты через URL:

# Петлевое соединение для тестирования
ser = serial.serial_for_url('loop://', timeout=1)

# Соединение через сеть (RFC2217)
ser = serial.serial_for_url('rfc2217://remote_host:4001', baudrate=9600)

Преимущества и ограничения

Преимущества PySerial

Простота использования — библиотека предоставляет интуитивно понятный API, который позволяет начать работу с минимальным количеством кода.

Кроссплатформенность — один и тот же код работает на Windows, Linux, macOS и других операционных системах.

Гибкость конфигурации — полный контроль над всеми параметрами последовательного соединения.

Надежность — библиотека используется в промышленных решениях и имеет долгую историю развития.

Расширяемость — поддержка различных типов соединений через URL-схемы.

Ограничения библиотеки

Синхронная природа — PySerial не имеет встроенной поддержки asyncio, что может быть проблемой в асинхронных приложениях.

Требует знаний протокола — для эффективной работы необходимо понимать особенности последовательной передачи данных.

Платформо-зависимые особенности — несмотря на кроссплатформенность, некоторые функции могут работать по-разному на разных ОС.

Альтернативы и дополнения

PySerial-asyncio

Для асинхронной работы рекомендуется использовать пакет pyserial-asyncio:

import asyncio
import serial_asyncio

async def main():
    reader, writer = await serial_asyncio.open_serial_connection(
        url='COM3', baudrate=9600)
    
    writer.write(b'Hello\n')
    response = await reader.readline()
    print(response.decode())

asyncio.run(main())

PyFirmata для Arduino

Для работы с Arduino через протокол Firmata:

from pyfirmata import Arduino

board = Arduino('COM3')
led = board.get_pin('d:13:o')  # Digital pin 13 as output
led.write(1)  # Turn on LED

Заключение

PySerial является незаменимым инструментом для разработчиков, работающих с последовательными портами в Python. Благодаря своей простоте, стабильности и богатому функционалу, библиотека находит применение в широком спектре задач — от простых DIY-проектов до сложных промышленных систем.

Правильное использование PySerial требует понимания основ последовательной связи, но после освоения базовых принципов библиотека предоставляет мощные возможности для создания надежных коммуникационных решений. Она остается стандартом для работы с Arduino, микроконтроллерами, промышленными контроллерами, GPS-модулями, модемами и другими устройствами, использующими последовательную передачу данных.

Независимо от сложности проекта, PySerial предоставляет необходимые инструменты для эффективной работы с последовательными портами, делая интеграцию аппаратных устройств с Python-приложениями простой и надежной задачей.

Новости