аватар question@mail.ru · 01.01.1970 03:00

asyncio: корректная остановка не своих сопрограмм

Я опять с той же проблемой. В все таски были под моим контролем, и я мог их вручную закрывать. Но вот беда: мне тут потребовались вебсокеты. В примерах в никакого корректного закрытия не производится:

start_server = websockets.serve(hello, 'localhost', 8765)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()

Я поигрался с примерами, и да, при Ctrl+C ругается, если есть подключенный клиент:

$ python websocket_server.py ^CKeyboardInterruptException ignored in: Task was destroyed but it is pending!task: <Task pending coro=<run() running at websockets/protocol.py:235>   wait_for=<Future pending cb=[Task._wakeup()]>  cb=[_wait.<locals>._on_completion() at asyncio/tasks.py:399]>Task was destroyed but it is pending!task: <Task pending coro=<get() running at asyncio/queues.py:198>   wait_for=<Future pending cb=[Task._wakeup()]>  cb=[_wait.<locals>._on_completion() at asyncio/tasks.py:399]>Task was destroyed but it is pending!task: <Task pending coro=<handler() running at websockets/server.py:64>   wait_for=<Future pending cb=[Task._wakeup()]>>

Так как корректного способа завершения в документации не представлено и быстрое тыкание наличия всяких server.close() ни к какому продуктивному результату не привело, снова появляется вопрос о корректном завершении приложения, если таски мной не контролируются.

Это просто эта библиотека кривая? Или это характерно для любых asyncio-библиотек и я чего-то не понимаю в самой сути asyncio и кто-нибудь ткнёт меня а какую-нибудь матчасть по этому поводу?

Как всё-таки корректно завершать всё это дело в общем случае?


С общим случаем принцип понятен, но websockets закрываться никак не хочет. Минимальный пример для повторения (на основе ответа @jfs):

#!/usr/bin/env python3# -*- coding: utf-8 -*-import asyncioimport websockets@asyncio.coroutinedef echo(ws, path):    print('client started')    while True:        data = yield from ws.recv()        if data is None:            break        yield from ws.send(data)    print('client finished')loop = asyncio.get_event_loop()start_server = websockets.serve(echo, '127.0.0.1', 8888, loop=loop)server = loop.run_until_complete(start_server)print('Listen')try:    server = loop.run_forever()except KeyboardInterrupt:    passserver.close()loop.run_until_complete(server.wait_closed())loop.close()print('Finished')

Достаточно просто подключить к нему любой клиент (хоть из браузера — ws = new WebSocket('ws://127.0.0.1:8888/');) и нажать Ctrl+C — отпечатается приведённая ранее ошибка (правда, уже без KeyboardInterrupt).

аватар answer@mail.ru · 01.01.1970 03:00

asyncio реализует кооперативную многозадачность. Это значит, что websockets библиотека должна кооперировать (предоставлять возможность чисто завершить соединения). Был открыт баг на websockets .

Даже для обычных (preemptive) потоков не существует общего решения, которое бы корректно остановило бы потоки вне зависимости от кода, который они исполняют (Java выучила этот урок тяжёлым путём). Безопасным для остановки ресурсом является процесс операционной системы, если нас не волнует потеря данных из-за неочищенного файлового буфера, уведомления другой стороны о прекращении сетевого соединения, завершение транзакции базы данных итд. Если волнует, то процесс также обязан кооперировать, если мы хотим его завершить преждевременно.

Чтобы обработать Ctrl+C в Питоне, как обычно нужно поймать KeyboardInterrupt или определить свой обработчик для SIGINT сигнала. :

import asyncio@asyncio.coroutinedef handle_echo(reader, writer):    data = yield from reader.read(100)    message = data.decode()    addr = writer.get_extra_info('peeame')    print(""Received %r from %r"" % (message, addr))    print(""Send: %r"" % message)    writer.write(data)    yield from writer.drain()    print(""Close the client socket"")    writer.close()loop = asyncio.get_event_loop()coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)server = loop.run_until_complete(coro)# Serve requests until CTRL+c is pressedprint('Serving on {}'.format(server.sockets[0].getsockname()))try:    loop.run_forever()except KeyboardInterrupt:    pass# Close the serverserver.close()loop.run_until_complete(server.wait_closed())loop.close()

В gevent (кооперативная многозадачность) также необходимо KeyboardInterrtupt ловить.

Документация явно упоминает:

serve() yields a Server which provides a close() method and a wait_closed() coroutine to stop serving requests.

то есть для websocket сервера можно точно такой же код использовать для корректной остановки:

try:    loop.run_forever()except KeyboardInterrupt:    pass# Close the serverserver.close()loop.run_until_complete(server.wait_closed())loop.close()

Последние

Похожие