Разработка с помощью asyncio
Асинхронное программирование отличается от классического «последовательного» программирования.
На этой странице перечислены распространенные ошибки и ловушки, а также объясняется, как их избежать.
Режим отладки
По умолчанию asyncio работает в производственном режиме. Для облегчения разработки у asyncio есть режим отладки.
Существует несколько способов активации режима отладки asyncio:
- Установка переменной среды
PYTHONASYNCIODEBUG
на1
. - Использование параметра командной строки Python
-X
dev
. - Передача
debug=True
вasyncio.run()
. - Вызов
loop.set_debug()
.
Помимо включения режима отладки, подумайте также:
Установка уровня журналирования asyncio логгера на
logging.DEBUG
, например, следующий фрагмент кода может выполняться при запуске приложения:logging.basicConfig(level=logging.DEBUG)
Настройка модуля
warnings
для отображения предупрежденийResourceWarning
. Один из способов сделать это — использовать параметр командной строки-W
default
.
Когда включён режим отладки:
- asyncio проверяет наличие корутин, которые не ожидают и регистрирует их; это смягчает ловушку «забытого ожидания».
- Многие не потокобезопасные асинхронные API-интерфейсы (например, методы
loop.call_soon()
иloop.call_at()
) вызывают исключение, если они вызываются из неправильного потока. - Время выполнения селектора ввода-вывода регистрируется, если выполнение операции ввода-вывода занимает слишком много времени.
- Записываются обратные вызовы, занимающие более 100 мс. Можно использовать
атрибут
loop.slow_callback_duration
для установки минимальной продолжительности выполнения в секундах, которая считается «медленной».
Параллелизм и многопоточность
Событийный цикл выполняется в потоке (обычно в основном потоке) и выполняет все
обратные вызовы и задачи в своем потоке. Пока задача выполняется в цикле
событий, никакие другие задачи не могут выполняться в том же потоке. Когда
задача выполняет выражение await
, выполняющаяся задача приостанавливается,
и событийный цикл выполняет следующую задачу.
Чтобы запланировать колбэк из другого потока ОС, следует использовать
метод loop.call_soon_threadsafe()
. Пример:
loop.call_soon_threadsafe(callback, *args)
Почти все объекты asyncio не являются потокобезопасными, что обычно не является
проблемой, если нет кода, который работает с ними извне задачи или обратного
вызова. Если есть необходимость в таком коде для вызова низкоуровневого
асинхронного API, следует использовать метод loop.call_soon_threadsafe()
,
например
loop.call_soon_threadsafe(fut.cancel)
Чтобы запланировать объект корутины из другого потока ОС, следует использовать
функцию run_coroutine_threadsafe()
. Он возвращает
concurrent.futures.Future
для доступа к результату:
async def coro_func():
return await asyncio.sleep(1, 42)
# Позже в другом потоке ОС
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Ожидание результата:
result = future.result()
Для обработки сигналов и выполнения подпроцессов цикл обработки событий должен выполняться в основном потоке.
Метод loop.run_in_executor()
можно использовать с
concurrent.futures.ThreadPoolExecutor
для выполнения кода блокировки в
другом потоке ОС без блокировки потока ОС, в котором выполняется событийный
цикл.
В настоящее время нет возможности планировать корутины или обратные вызовы
непосредственно из другого процесса (например, запущенного с
multiprocessing
). В разделе Методы событийного цикла
перечислены API-интерфейсы, которые могут читать из каналов и
просматривать дескрипторы файлов, не блокируя событийный цикл. Кроме того, API-
интерфейсы asyncio Подпроцессов предоставляют способ
запуска процесса и взаимодействия с ним из событийного цикла. Наконец,
вышеупомянутый метод loop.run_in_executor()
также может использоваться с
concurrent.futures.ProcessPoolExecutor
для выполнения кода в другом
процессе.
Выполнение блокирующего кода
Блокирующий (связанный с ЦП) код не должен вызываться напрямую. Например, если функция выполняет вычисление с интенсивным использованием ЦП в течение 1 секунды, все параллельные задачи asyncio и операции ввода-вывода будут отложены на 1 секунду.
Исполнитель может использоваться для запуска задачи в другом потоке или даже в
другом процессе, чтобы избежать блокировки потока ОС с помощью событийного
цикла. Подробнее см. метод loop.run_in_executor()
.
Логирование
asyncio использует модуль logging
, и всё журналирование ведётся через
логгер "asyncio"
.
Уровень журнала по умолчанию — logging.INFO
, который можно легко
настроить:
logging.getLogger("asyncio").setLevel(logging.WARNING)
Обнаружение не ожидаемых корутин
Когда функция корутины вызывается, но не ожидается (например, coro()
вместо
await coro()
) или корутина не запланирована с asyncio.create_task()
,
asyncio вызовет RuntimeWarning
:
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
Выход:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
test()
Вывод в режиме отладки:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
File "../t.py", line 7, in main
test()
test()
Обычное исправление — либо дождаться корутины, либо вызвать функцию
asyncio.create_task()
:
async def main():
await test()
Обнаружение не перехваченных исключений
Если вызывается Future.set_exception()
, но объект Future никогда не
ожидается, исключение никогда не будет распространяться выше по пользовательскому
коду. В
этом случае asyncio выдаст сообщение журнала, когда объект Future будет собран
сборщиком мусора.
Пример необработанного исключения:
import asyncio
async def bug():
raise Exception("not consumed")
async def main():
asyncio.create_task(bug())
asyncio.run(main())
Выход:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed')>
Traceback (most recent call last):
File "test.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
Активация режима отладки позволяет получить трейсбэк места создания задачи:
asyncio.run(main(), debug=True)
Вывод в режиме отладки:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed') created at asyncio/tasks.py:321>
source_traceback: Object created at (most recent call last):
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
Traceback (most recent call last):
File "../t.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed