Разработка с помощью asyncio

Асинхронное программирование отличается от классического «последовательного» программирования.

На этой странице перечислены распространенные ошибки и ловушки, а также объясняется, как их избежать.

Режим отладки

По умолчанию asyncio работает в производственном режиме. Для облегчения разработки у asyncio есть режим отладки.

Существует несколько способов активации режима отладки asyncio:

  • Установка переменной среды PYTHONASYNCIODEBUG на 1.
  • Использование параметра командной строки Python -X dev.
  • Передача debug=True в asyncio.run().
  • Вызов loop.set_debug().

Помимо включения режима отладки, подумайте также:

  • Установка уровня журналирования asyncio логгера на logging.DEBUG, например, следующий фрагмент кода может выполняться при запуске приложения:

    logging.basicConfig(level=logging.DEBUG)
    
  • Настройка модуля warnings для отображения предупреждений ResourceWarning. Один из способов сделать это — использовать параметр командной строки -W default.

Когда включён режим отладки:

  • asyncio проверяет наличие корутин, которые не ожидают и регистрирует их; это смягчает ловушку «забытого ожидания».
  • Многие не потокобезопасные асинхронные API-интерфейсы (например, методы loop.call_soon() и loop.call_at()) вызывают исключение, если они вызываются из неправильного потока.
  • Время выполнения селектора ввода-вывода регистрируется, если выполнение операции ввода-вывода занимает слишком много времени.
  • Записываются обратные вызовы, занимающие более 100 мс. Можно использовать атрибут loop.slow_callback_duration для установки минимальной продолжительности выполнения в секундах, которая считается «медленной».

Параллелизм и многопоточность

Событийный цикл выполняется в потоке (обычно в основном потоке) и выполняет все обратные вызовы и задачи в своем потоке. Пока задача выполняется в цикле событий, никакие другие задачи не могут выполняться в том же потоке. Когда задача выполняет выражение await, выполняющаяся задача приостанавливается, и событийный цикл выполняет следующую задачу.

Чтобы запланировать колбэк из другого потока ОС, следует использовать метод loop.call_soon_threadsafe(). Пример:

loop.call_soon_threadsafe(callback, *args)

Почти все объекты asyncio не являются потокобезопасными, что обычно не является проблемой, если нет кода, который работает с ними извне задачи или обратного вызова. Если есть необходимость в таком коде для вызова низкоуровневого асинхронного API, следует использовать метод loop.call_soon_threadsafe(), например

loop.call_soon_threadsafe(fut.cancel)

Чтобы запланировать объект корутины из другого потока ОС, следует использовать функцию run_coroutine_threadsafe(). Он возвращает concurrent.futures.Future для доступа к результату:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Позже в другом потоке ОС

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Ожидание результата:
result = future.result()

Для обработки сигналов и выполнения подпроцессов цикл обработки событий должен выполняться в основном потоке.

Метод loop.run_in_executor() можно использовать с concurrent.futures.ThreadPoolExecutor для выполнения кода блокировки в другом потоке ОС без блокировки потока ОС, в котором выполняется событийный цикл.

В настоящее время нет возможности планировать корутины или обратные вызовы непосредственно из другого процесса (например, запущенного с multiprocessing). В разделе Методы событийного цикла перечислены API-интерфейсы, которые могут читать из каналов и просматривать дескрипторы файлов, не блокируя событийный цикл. Кроме того, API- интерфейсы asyncio Подпроцессов предоставляют способ запуска процесса и взаимодействия с ним из событийного цикла. Наконец, вышеупомянутый метод loop.run_in_executor() также может использоваться с concurrent.futures.ProcessPoolExecutor для выполнения кода в другом процессе.

Выполнение блокирующего кода

Блокирующий (связанный с ЦП) код не должен вызываться напрямую. Например, если функция выполняет вычисление с интенсивным использованием ЦП в течение 1 секунды, все параллельные задачи asyncio и операции ввода-вывода будут отложены на 1 секунду.

Исполнитель может использоваться для запуска задачи в другом потоке или даже в другом процессе, чтобы избежать блокировки потока ОС с помощью событийного цикла. Подробнее см. метод loop.run_in_executor().

Логирование

asyncio использует модуль logging, и всё журналирование ведётся через логгер "asyncio".

Уровень журнала по умолчанию — logging.INFO, который можно легко настроить:

logging.getLogger("asyncio").setLevel(logging.WARNING)

Обнаружение не ожидаемых корутин

Когда функция корутины вызывается, но не ожидается (например, coro() вместо await coro()) или корутина не запланирована с asyncio.create_task(), asyncio вызовет RuntimeWarning:

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

Выход:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

Вывод в режиме отладки:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

Обычное исправление — либо дождаться корутины, либо вызвать функцию asyncio.create_task():

async def main():
    await test()

Обнаружение не перехваченных исключений

Если вызывается Future.set_exception(), но объект Future никогда не ожидается, исключение никогда не будет распространяться выше по пользовательскому коду. В этом случае asyncio выдаст сообщение журнала, когда объект Future будет собран сборщиком мусора.

Пример необработанного исключения:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

Выход:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

Активация режима отладки позволяет получить трейсбэк места создания задачи:

asyncio.run(main(), debug=True)

Вывод в режиме отладки:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed