Введение
Работа с последовательными портами остаётся актуальной в индустриальных, IoT, робототехнических и микроконтроллерных системах. Управление устройствами через COM (RS-232/RS-485/USB-Serial) требует стабильного, кроссплатформенного и гибкого инструмента. В Python таким решением является библиотека PySerial.
PySerial обеспечивает простой и мощный интерфейс для подключения, конфигурации и передачи данных по последовательным линиям. Она используется для связи с Arduino, Raspberry Pi, модемами, GPS-приёмниками, контроллерами, сенсорами и многим другим оборудованием.
О библиотеке PySerial
PySerial — это кроссплатформенная Python-библиотека для работы с последовательными портами. Разработанная Крисом Лиехтти, она стала стандартом де-факто для serial-коммуникаций в экосистеме Python. Библиотека поддерживает Windows, Linux, macOS и множество других операционных систем.
Ключевые особенности PySerial
Библиотека предоставляет унифицированный API для работы с различными типами последовательных соединений. Она автоматически обрабатывает платформо-специфичные особенности работы с портами и предоставляет разработчику простой интерфейс.
PySerial поддерживает не только физические COM-порты, но и виртуальные соединения, включая USB-to-Serial адаптеры, Bluetooth Serial Port Profile, и даже сетевые соединения через RFC2217 протокол.
Установка и подключение
Установка библиотеки выполняется через менеджер пакетов pip:
pip install pyserial
Для подключения библиотеки в Python-скрипте используется стандартный импорт:
import serial
Базовая архитектура работы
Жизненный цикл соединения
Типичная работа с PySerial следует простой схеме:
- Создание объекта Serial — инициализация с параметрами порта
- Открытие соединения — установка связи с устройством
- Обмен данными — отправка команд и чтение ответов
- Закрытие соединения — корректное завершение работы
ser = serial.Serial('COM3', baudrate=9600, timeout=1)
ser.write(b'command')
data = ser.readline()
ser.close()
Основные параметры конфигурации
При создании объекта Serial можно задать множество параметров:
- port — имя устройства (COM1, /dev/ttyUSB0, /dev/ttyAMA0)
- baudrate — скорость передачи данных (обычно 9600, 38400, 115200)
- bytesize — размер символа в битах (5, 6, 7, 8)
- parity — контроль четности (PARITY_NONE, PARITY_EVEN, PARITY_ODD)
- stopbits — количество стоп-бит (1, 1.5, 2)
- timeout — таймаут операций чтения в секундах
Методы чтения данных
Базовые методы чтения
PySerial предоставляет несколько способов чтения данных из порта:
Serial.read(size=1) — читает точно указанное количество байтов. Если данных недостаточно, метод блокируется до получения всех байтов или истечения таймаута.
data = ser.read(10) # Прочитать 10 байтов
Serial.readline() — читает данные до символа новой строки (\n). Наиболее популярный метод для текстовых протоколов.
line = ser.readline().decode('utf-8').strip()
Serial.read_all() — читает все доступные данные из входного буфера немедленно.
all_data = ser.read_all()
Продвинутые методы чтения
Serial.read_until(expected) — читает данные до указанного разделителя. По умолчанию использует символ новой строки.
response = ser.read_until(b'\r\n') # Читать до CRLF
Serial.readlines() — читает все строки, доступные в буфере.
lines = ser.readlines()
for line in lines:
print(line.decode('utf-8').strip())
Методы записи данных
Отправка данных
Serial.write(data) — основной метод для отправки данных. Принимает только объекты типа bytes.
ser.write(b'Hello World\n')
# или
message = "Hello World\n"
ser.write(message.encode('utf-8'))
Serial.flush() — принудительно отправляет все данные из выходного буфера. Полезно для критичных по времени операций.
ser.write(b'URGENT_COMMAND\n')
ser.flush() # Гарантировать немедленную отправку
Управление буферами
Контроль буферов ввода-вывода
PySerial автоматически управляет буферами, но предоставляет методы для ручного контроля:
reset_input_buffer() — очищает входной буфер от накопившихся данных.
reset_output_buffer() — очищает выходной буфер от неотправленных данных.
ser.reset_input_buffer() # Очистить старые данные
ser.write(b'NEW_COMMAND\n')
response = ser.readline()
Мониторинг состояния буферов
in_waiting — свойство, показывающее количество байтов в входном буфере.
out_waiting — количество байтов в выходном буфере, ожидающих отправки.
if ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
print(f"Получено {len(data)} байтов")
Управление соединением
Открытие и закрытие портов
Serial.open() — открывает порт для работы. Обычно порт открывается автоматически при создании объекта.
Serial.close() — закрывает соединение и освобождает ресурсы.
is_open — свойство для проверки состояния порта.
if not ser.is_open:
ser.open()
# Работа с портом
ser.write(b'command\n')
response = ser.readline()
if ser.is_open:
ser.close()
Context Manager
PySerial поддерживает использование в качестве контекст-менеджера:
with serial.Serial('COM3', 9600, timeout=1) as ser:
ser.write(b'command\n')
response = ser.readline()
# Порт автоматически закроется
Обнаружение и мониторинг портов
Список доступных портов
Модуль serial.tools.list_ports предоставляет функции для обнаружения портов:
from serial.tools import list_ports
ports = list_ports.comports()
for port in ports:
print(f"Порт: {port.device}")
print(f"Описание: {port.description}")
print(f"VID:PID: {port.vid}:{port.pid}")
print("---")
Автоматическое обнаружение Arduino
def find_arduino():
arduino_ports = []
ports = list_ports.comports()
for port in ports:
if 'Arduino' in port.description or port.vid == 0x2341:
arduino_ports.append(port.device)
return arduino_ports
Работа с таймаутами
Настройка таймаутов
timeout — основной таймаут для операций чтения. Может быть None (блокировка), 0 (неблокирующий режим) или положительное число секунд.
write_timeout — таймаут для операций записи.
inter_byte_timeout — таймаут между байтами при чтении.
ser = serial.Serial('COM3', 9600,
timeout=5.0, # 5 секунд на чтение
write_timeout=2.0, # 2 секунды на запись
inter_byte_timeout=0.1) # 100мс между байтами
Обработка ошибок
Типичные исключения
PySerial генерирует различные исключения при ошибках:
try:
ser = serial.Serial('COM1', 9600, timeout=1)
ser.write(b'test\n')
response = ser.readline()
except serial.SerialException as e:
print(f"Ошибка последовательного порта: {e}")
except serial.SerialTimeoutException as e:
print(f"Таймаут: {e}")
except PermissionError as e:
print(f"Нет прав доступа к порту: {e}")
except FileNotFoundError as e:
print(f"Порт не найден: {e}")
finally:
if 'ser' in locals() and ser.is_open:
ser.close()
Асинхронная работа и многопоточность
Работа в отдельном потоке
Для непрерывного мониторинга данных можно использовать потоки:
import threading
import time
def serial_reader(ser, stop_event):
while not stop_event.is_set():
if ser.in_waiting > 0:
data = ser.readline()
print(f"Получено: {data.decode('utf-8').strip()}")
time.sleep(0.01)
# Запуск читающего потока
stop_event = threading.Event()
reader_thread = threading.Thread(target=serial_reader, args=(ser, stop_event))
reader_thread.start()
# Работа основного потока
time.sleep(10)
# Остановка потока
stop_event.set()
reader_thread.join()
Очередь для безопасной передачи данных
import queue
import threading
data_queue = queue.Queue()
def data_collector(ser, q):
while True:
if ser.in_waiting > 0:
data = ser.readline().decode('utf-8').strip()
q.put(data)
collector_thread = threading.Thread(target=data_collector, args=(ser, data_queue))
collector_thread.daemon = True
collector_thread.start()
# Чтение из очереди в основном потоке
while True:
try:
data = data_queue.get(timeout=1)
print(f"Обработка: {data}")
except queue.Empty:
continue
Управление сигналами RS-232
DTR и RTS сигналы
PySerial предоставляет полный контроль над управляющими сигналами:
# Управление DTR (Data Terminal Ready)
ser.dtr = True # Установить DTR
ser.dtr = False # Сбросить DTR
# Управление RTS (Request To Send)
ser.rts = True # Установить RTS
ser.rts = False # Сбросить RTS
# Чтение состояния входных сигналов
cts = ser.cts # Clear To Send
dsr = ser.dsr # Data Set Ready
ri = ser.ri # Ring Indicator
cd = ser.cd # Carrier Detect
Аппаратное управление потоком
ser = serial.Serial('COM3', 115200,
rtscts=True, # RTS/CTS управление потоком
dsrdtr=True) # DSR/DTR управление потоком
Работа с различными платформами
Windows
В Windows порты именуются как COM1, COM2, и т.д.:
ser = serial.Serial('COM3', 9600)
# или для портов > COM9
ser = serial.Serial('\\\\.\\COM10', 9600)
Linux
В Linux используются файлы устройств:
# USB-to-Serial адаптеры
ser = serial.Serial('/dev/ttyUSB0', 9600)
# Встроенные порты
ser = serial.Serial('/dev/ttyS0', 9600)
# Raspberry Pi GPIO UART
ser = serial.Serial('/dev/ttyAMA0', 9600)
macOS
В macOS порты доступны через /dev:
ser = serial.Serial('/dev/cu.usbserial-1410', 9600)
# или
ser = serial.Serial('/dev/tty.usbserial-1410', 9600)
Практические примеры
Работа с Arduino
import serial
import time
def arduino_communication():
try:
ser = serial.Serial('COM4', 9600, timeout=2)
time.sleep(2) # Ждать перезагрузки Arduino
# Включить светодиод
ser.write(b'LED_ON\n')
response = ser.readline().decode('utf-8').strip()
print(f"Arduino ответил: {response}")
time.sleep(1)
# Выключить светодиод
ser.write(b'LED_OFF\n')
response = ser.readline().decode('utf-8').strip()
print(f"Arduino ответил: {response}")
except serial.SerialException as e:
print(f"Ошибка: {e}")
finally:
if 'ser' in locals():
ser.close()
arduino_communication()
Работа с GPS-модулем
def parse_gps_data():
ser = serial.Serial('/dev/ttyUSB0', 4800, timeout=1)
while True:
line = ser.readline().decode('ascii', errors='replace')
if line.startswith('$GPGGA'):
parts = line.split(',')
if len(parts) >= 6 and parts[2] and parts[4]:
lat = float(parts[2][:2]) + float(parts[2][2:]) / 60
lon = float(parts[4][:3]) + float(parts[4][3:]) / 60
if parts[3] == 'S':
lat = -lat
if parts[5] == 'W':
lon = -lon
print(f"Координаты: {lat:.6f}, {lon:.6f}")
Работа с модемом
def modem_at_commands():
ser = serial.Serial('COM5', 115200, timeout=3)
def send_at_command(command):
ser.write(f'{command}\r\n'.encode())
response = ser.read_until(b'OK\r\n').decode()
return response
# Проверка связи
response = send_at_command('AT')
print('Модем отвечает:', 'OK' in response)
# Информация о модеме
info = send_at_command('ATI')
print('Информация о модеме:', info)
# Уровень сигнала
signal = send_at_command('AT+CSQ')
print('Уровень сигнала:', signal)
ser.close()
Отладка и диагностика
Мониторинг трафика
Для отладки полезно логировать весь обмен данными:
class SerialDebugger:
def __init__(self, port, baudrate, **kwargs):
self.ser = serial.Serial(port, baudrate, **kwargs)
def write(self, data):
print(f"TX: {data}")
return self.ser.write(data)
def readline(self):
data = self.ser.readline()
print(f"RX: {data}")
return data
def close(self):
self.ser.close()
# Использование
debug_ser = SerialDebugger('COM3', 9600)
debug_ser.write(b'test\n')
response = debug_ser.readline()
Использование spy:// URL
PySerial поддерживает специальные URL для отладки:
# Перехват трафика
ser = serial.serial_for_url('spy://COM3?file=log.txt', baudrate=9600)
Интеграция с внешними системами
Интеграция с MQTT
import paho.mqtt.client as mqtt
import json
def serial_to_mqtt():
ser = serial.Serial('/dev/ttyUSB0', 9600)
client = mqtt.Client()
client.connect("mqtt.broker.com", 1883, 60)
while True:
if ser.in_waiting > 0:
data = ser.readline().decode('utf-8').strip()
# Парсинг данных сенсора
try:
sensor_data = json.loads(data)
topic = f"sensors/{sensor_data['id']}"
client.publish(topic, data)
except json.JSONDecodeError:
continue
Логирование данных с pandas
import pandas as pd
from datetime import datetime
def log_sensor_data():
ser = serial.Serial('COM3', 9600)
data_log = []
try:
while True:
line = ser.readline().decode('utf-8').strip()
if line:
timestamp = datetime.now()
# Предполагается формат: "temperature:25.6,humidity:60.2"
values = dict(item.split(':') for item in line.split(','))
log_entry = {'timestamp': timestamp}
log_entry.update(values)
data_log.append(log_entry)
# Сохранение каждые 100 записей
if len(data_log) >= 100:
df = pd.DataFrame(data_log)
df.to_csv('sensor_log.csv', index=False, mode='a', header=False)
data_log.clear()
except KeyboardInterrupt:
if data_log:
df = pd.DataFrame(data_log)
df.to_csv('sensor_log.csv', index=False, mode='a', header=False)
ser.close()
Таблица методов и свойств PySerial
| Метод/Свойство | Описание | Пример использования |
|---|---|---|
| Создание и управление соединением | ||
serial.Serial() |
Создает объект последовательного порта | ser = serial.Serial('COM3', 9600) |
open() |
Открывает порт | ser.open() |
close() |
Закрывает порт | ser.close() |
is_open |
Проверяет, открыт ли порт | if ser.is_open: |
| Чтение данных | ||
read(size=1) |
Читает указанное количество байтов | data = ser.read(10) |
readline() |
Читает строку до \n | line = ser.readline() |
readlines() |
Читает все строки из буфера | lines = ser.readlines() |
read_all() |
Читает все доступные данные | data = ser.read_all() |
read_until(expected) |
Читает до разделителя | data = ser.read_until(b'\r\n') |
| Запись данных | ||
write(data) |
Записывает байты в порт | ser.write(b'hello\n') |
flush() |
Принудительная отправка буфера | ser.flush() |
| Управление буферами | ||
reset_input_buffer() |
Очищает входной буфер | ser.reset_input_buffer() |
reset_output_buffer() |
Очищает выходной буфер | ser.reset_output_buffer() |
in_waiting |
Количество байтов во входном буфере | bytes_available = ser.in_waiting |
out_waiting |
Количество байтов в выходном буфере | bytes_pending = ser.out_waiting |
| Настройки порта | ||
baudrate |
Скорость передачи | ser.baudrate = 115200 |
bytesize |
Размер байта (5,6,7,8) | ser.bytesize = serial.EIGHTBITS |
parity |
Контроль четности | ser.parity = serial.PARITY_NONE |
stopbits |
Количество стоп-бит | ser.stopbits = serial.STOPBITS_ONE |
timeout |
Таймаут чтения | ser.timeout = 5.0 |
write_timeout |
Таймаут записи | ser.write_timeout = 2.0 |
inter_byte_timeout |
Таймаут между байтами | ser.inter_byte_timeout = 0.1 |
| Управление сигналами | ||
dtr |
Управление DTR сигналом | ser.dtr = True |
rts |
Управление RTS сигналом | ser.rts = False |
cts |
Чтение состояния CTS | if ser.cts: |
dsr |
Чтение состояния DSR | if ser.dsr: |
ri |
Чтение состояния RI | if ser.ri: |
cd |
Чтение состояния CD | if ser.cd: |
| Специальные функции | ||
send_break(duration) |
Отправка break-сигнала | ser.send_break(0.25) |
serial_for_url() |
Создание порта по URL | ser = serial.serial_for_url('loop://') |
list_ports.comports() |
Список доступных портов | ports = list_ports.comports() |
Часто задаваемые вопросы
Как выбрать правильную скорость передачи?
Скорость передачи (baudrate) должна совпадать с настройками подключенного устройства. Наиболее распространенные значения: 9600, 38400, 57600, 115200. Для Arduino по умолчанию используется 9600, для ESP32 — 115200.
Почему возникает ошибка "Access denied" при открытии порта?
Эта ошибка обычно возникает когда порт уже используется другим приложением (например, Arduino IDE Serial Monitor) или у пользователя нет прав доступа к порту. Закройте все программы, использующие порт, и запустите Python-скрипт с административными правами если необходимо.
Как работать с данными в кодировке, отличной от UTF-8?
По умолчанию PySerial работает с байтами. Для декодирования используйте метод decode() с нужной кодировкой:
data = ser.readline().decode('cp1251') # Windows-1251
data = ser.readline().decode('ascii', errors='ignore') # Игнорировать ошибки
Что делать, если данные приходят по частям?
Используйте буферизацию или читайте до определенного разделителя:
buffer = b''
while True:
chunk = ser.read(ser.in_waiting or 1)
buffer += chunk
if b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
process_data(line)
Как настроить неблокирующее чтение?
Установите timeout=0 для неблокирующего режима:
ser = serial.Serial('COM3', 9600, timeout=0)
data = ser.read(100) # Вернет доступные данные немедленно
Можно ли использовать PySerial с виртуальными портами?
Да, PySerial поддерживает виртуальные порты через URL:
# Петлевое соединение для тестирования
ser = serial.serial_for_url('loop://', timeout=1)
# Соединение через сеть (RFC2217)
ser = serial.serial_for_url('rfc2217://remote_host:4001', baudrate=9600)
Преимущества и ограничения
Преимущества PySerial
Простота использования — библиотека предоставляет интуитивно понятный API, который позволяет начать работу с минимальным количеством кода.
Кроссплатформенность — один и тот же код работает на Windows, Linux, macOS и других операционных системах.
Гибкость конфигурации — полный контроль над всеми параметрами последовательного соединения.
Надежность — библиотека используется в промышленных решениях и имеет долгую историю развития.
Расширяемость — поддержка различных типов соединений через URL-схемы.
Ограничения библиотеки
Синхронная природа — PySerial не имеет встроенной поддержки asyncio, что может быть проблемой в асинхронных приложениях.
Требует знаний протокола — для эффективной работы необходимо понимать особенности последовательной передачи данных.
Платформо-зависимые особенности — несмотря на кроссплатформенность, некоторые функции могут работать по-разному на разных ОС.
Альтернативы и дополнения
PySerial-asyncio
Для асинхронной работы рекомендуется использовать пакет pyserial-asyncio:
import asyncio
import serial_asyncio
async def main():
reader, writer = await serial_asyncio.open_serial_connection(
url='COM3', baudrate=9600)
writer.write(b'Hello\n')
response = await reader.readline()
print(response.decode())
asyncio.run(main())
PyFirmata для Arduino
Для работы с Arduino через протокол Firmata:
from pyfirmata import Arduino
board = Arduino('COM3')
led = board.get_pin('d:13:o') # Digital pin 13 as output
led.write(1) # Turn on LED
Заключение
PySerial является незаменимым инструментом для разработчиков, работающих с последовательными портами в Python. Благодаря своей простоте, стабильности и богатому функционалу, библиотека находит применение в широком спектре задач — от простых DIY-проектов до сложных промышленных систем.
Правильное использование PySerial требует понимания основ последовательной связи, но после освоения базовых принципов библиотека предоставляет мощные возможности для создания надежных коммуникационных решений. Она остается стандартом для работы с Arduino, микроконтроллерами, промышленными контроллерами, GPS-модулями, модемами и другими устройствами, использующими последовательную передачу данных.
Независимо от сложности проекта, PySerial предоставляет необходимые инструменты для эффективной работы с последовательными портами, делая интеграцию аппаратных устройств с Python-приложениями простой и надежной задачей.
Настоящее и будущее развития ИИ: классической математики уже недостаточно
Эксперты предупредили о рисках фейковой благотворительности с помощью ИИ
В России разработали универсального ИИ-агента для роботов и индустриальных процессов