Корутины и задачи

В этом разделе приведено высокоуровневое API asyncio для работы с корутинами и задачами.

Корутины

Корутины, объявляемые с помощью async/await синтаксиса, является предпочтительным способом написания asyncio приложений. Например, следующий фрагмент кода (требует Python 3.7) напечатает «hello», ожидает 1 секунду, а затем печатает «world»:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

Заметим, что простой вызов корутины не приведёт к её выполнению:

>>> main()
<coroutine object main at 0x1053bb7c8>

Чтобы фактически запустить корутину, asyncio предоставляет три основных механизма:

  • Функция asyncio.run() для запуска функции точки входа верхнего уровня «main()» (см. приведённый пример выше)

  • Ожидающая корутина. Следующий фрагмент кода напечатает «hello» после ожидания в 1 секунду, а затем напечатает «world» после ожидания в течении ещё 2х секунд

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    Ожидаемый вывод:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • Функция asyncio.create_task() используется для конкурентного запуска корутин как asyncio: Tasks.

    Давайте изменим приведённый выше пример и запустим две say_after корутины конкурентно:

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Подождать, пока обе задачи не будут выполнены (должны принять
        # около 2 секунд.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    Обратите внимание, что ожидаемые выходные данные теперь показывают, что фрагмент выполняется на 1 секунду быстрее, чем ранее:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    

Ожидаемые объекты

Мы говорим, что объект является ожидаемым объектом, если его можно использовать в await выражении. Многие API-интерфейсы asyncio предназначены для приёма ожидаемых. Существует три основных типа ожидаемых объектов: корутины, задачи и футуры.

Корутины

Python корутины являются ожидаемыми и поэтому могут ожидаться из других корутин:

import asyncio

async def nested():
    return 42

async def main():
    # Ничего не произойдет, если мы просто вызовем "nested()".
    # Объект корутины создан, но не await,
    # так что *не будет работать вообще*.
    nested()

    # Давайте сделаем это по-другому и подождём:
    print(await nested())  # Напечатает  "42".

asyncio.run(main())

Важно

В этой документации термин «корутина» может использоваться для двух тесно связанных понятий:

  • Функция корутина: функция async def;
  • Объект корутины: возвращенный объект после вызова функции корутины.

asyncio также поддерживает устаревшие основанные на генераторах корутины.

Задачи

Задачи используются для конкурентного планирования корутин.

Когда корутина обвёрнута в Задачу с такими функциями, как asyncio.create_task(), то автоматически планируется запуск корутины в ближайшее время:

import asyncio

async def nested():
    return 42

async def main():
    # Запланировать nested() ближайший одновременный запуск
    # с "main()".
    task = asyncio.create_task(nested())

    # "task" теперь может использовать отмену "nested()" или
    # можно просто ждать, пока она не завершится:
    await task

asyncio.run(main())

Футуры

Объект Future — это специальный ожидаемый (await) объект низкого уровня , представляющий конечный результат асинхронной операции.

Когда объект Футуры ожидается, это означает, что корутина будет ждать, пока Футура будет решена в каком-то другом месте.

Объекты Футуры в asyncio нужны, чтобы позволить основанному на колбэках коду использоваться с async/await.

Обычно нет нужды создавать объекты Футуры на уровне кода приложения.

Объекты Футуры, иногда раскрываемые библиотеками и некоторыми asyncio API, могут быть ожидаемыми:

async def main():
    await function_that_returns_a_future_object()

    # это также верно:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

Хорошим примером низкоуровневой функции, возвращающей объект Футуры, является loop.run_in_executor().

Запуск asyncio программы

asyncio.run(coro, *, debug=False)

Выполняет корутину coro и возвращает результат.

Функция управляет переданной корутиной, заботясь об управлении asyncio событийного цикла и завершения асинхронных генераторов.

Функция не может быть вызвана, когда в том же потоке выполняется другой asyncio событийный цикл.

Если debugTrue, событийный цикл будет выполняться в режиме отладки.

Функция всегда создаёт новый событийный цикл и закрывает его в конце. Его следует использовать в качестве основной точки входа для asyncio программ, и в идеале его следует вызывать только один раз.

Пример:

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

Добавлено в версии 3.7.

Примечание

Исходный код asyncio.run() можно найти в Lib/asyncio/runners.py.

Создание задач

asyncio.create_task(coro, *, name=None)

Обёртывание coro корутины в Task и запланировать её выполнение. Возвращает объект задачи.

Если name не None, он задаётся как имя задачи с помощью Task.set_name().

Задача выполняется в цикле, возвращенного get_running_loop(). Возникает RuntimeError, если в текущем потоке нет запущенного цикла.

Эта функция была добавлена в Python 3.7. Ранее Python 3.7 вместо неё можно использовать низкоуровневую функцию asyncio.ensure_future():

async def coro():
    ...

# В Python 3.7+
task = asyncio.create_task(coro())
...

# Это работает во всех версиях Python, но менее читабельно
task = asyncio.ensure_future(coro())
...

Добавлено в версии 3.7.

Изменено в версии 3.8: Добавлен параметр name.

Сон

coroutine asyncio.sleep(delay, result=None, *, loop=None)

Блокировка на delay секунд.

Если result предоставляется, он возвращается вызывающему после завершения корутины.

sleep() всегда приостанавливает выполнение текущей задачи, позволяя выполнять другие задачи.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Пример корутины, отображающей текущую дату каждую секунду в течение 5 секунд:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

Конкурентный запуск задач

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

Запускает ожидаемые объекты в последовательности aws конкурентно.

Если какой-либо ожидаемый объект в aws является корутиной, он автоматически назначается как задача.

Если все await объекты выполнены успешно, результатом является сводный список возвращенных значений. Порядок значений результата соответствует порядку await в aws.

Если return_exceptions является False (по умолчанию), первое вызванное исключение немедленно распространяется на задачу, которая ожидает на gather(). Другие await объекты в aws последовательности не будут отменены и продолжат работу.

При return_exceptions True исключения обрабатываются так же, как и успешные результаты, и агрегируются в списке результатов.

Если gather() отменён, все представленные ожидаемые (которые ещё не завершены) также будут отменены.

Если какая-либо задача или футура в последовательности aws отменена, это рассматривается, как будто сработало исключение CancelledError — вызов gather() не отменяется в этом случае. Это необходимо для предотвращения отмены одной отправленной задачи/футуры, чтобы привести к отмене других задач/футур.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Пример:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Запланировать дерево вызовов *конкурентно*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Ожидаемый вывод:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

Примечание

Если return_exceptions содержит значение False, отмена gather() после того, как он был помечен как выполненный, не отменит ни одного отправленного ожидаемого объекта. Например, gather может быть помечена как выполненная после передачи исключения вызывающей стороне, поэтому вызов gather.cancel() после перехвата исключения (вызванного одним из ожидаемых объектов) из gather не отменяет другие ожидаемые объекты.

Изменено в версии 3.7: Если gather отменяется, отмена распространяется независимо от return_exceptions.

Защита от отмены

awaitable asyncio.shield(aw, *, loop=None)

Защита ожидаемого объекта от отмены.

Если aw корутина, она автоматически назначается как задача.

Оператор:

res = await shield(something())

эквивалентен:

res = await something()

кроме того, что если корутина, содержащая её, отменяется, задача, выполняемая в something(), не отменяется. С точки зрения something() отмены не произошло. Хотя его вызывающий объект всё ещё отменён, «await» выражение по- прежнему вызывает CancelledError.

Если something() отменяется другими средствами (т.е. изнутри), которые также отменяют shield().

Если требуется полностью игнорировать отмену (не рекомендуется), функция shield() должна быть объединена с предложением try/except следующим образом:

try:
    res = await shield(something())
except CancelledError:
    res = None

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Таймауты

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

Дождаться завершения aw ожидаемого с таймаутом.

Если aw корутина, она автоматически назначается как задача.

timeout может быть либо None, либо числом секунд ожидания с плавающей запятой, либо int числом. Если timeout None, блокировать до завершения футуры.

Если завершается таймаут, задача отменяется и вызывается asyncio.TimeoutError.

Чтобы избежать отмены задачи, оберните её в shield().

Функция будет ждать, пока футура будет фактически отменена, поэтому общее время ожидания может превысить timeout.

Если ожидание отменяется, то также отменяется и будущий aw.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Пример:

async def eternity():
    # Спать в течение одного часа
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Ожидать не более 1 секунды
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Ожидаемый вывод:
#
#     timeout!

Изменено в версии 3.7: Когда aw отменяется из-за тайм-аута, wait_for ожидает отмены aw. Ранее она сразу вызывала asyncio.TimeoutError.

Примитивы ожидания

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Конкурентный запуск ожидаемых объектов в итерации aws и блокировка, пока не будет выполнено условие, указанное в return_when.

Возвращает два множества задачи/футуры: (done, pending).

Использование:

done, pending = await asyncio.wait(aws)

timeout (float или int), если он указан, можно использовать для управления максимальным количеством секунд ожидания перед возвращением.

Обратите внимание, что эта функция не вызывает asyncio.TimeoutError. Футуры или задачи, которые не были выполнены при наступлении тайм-аута, просто возвращаются во втором множестве.

return_when указывает, когда функция должна возвращать. Она должна быть одной из следующих констант:

Константа Описание
FIRST_COMPLETED Функция возвращает, когда любая футура завершится или отменится.
FIRST_EXCEPTION Функция возвращает после завершения любого процесса футуры путём создания исключения. Если в футуре исключение не возникает, то оно эквивалентно ALL_COMPLETED.
ALL_COMPLETED Функция возвращает после завершения или отмены всех футур.

В отличие от wait_for(), wait() не отменяет футуры при наступлении тайм-аута.

Не рекомендуется, начиная с версии 3.8: Если какой-либо ожидаемый в aws является корутиной, он автоматически назначается как задача. Непосредственная передача объектов корутине в wait() является устаревшей практикой, т. к. приводит к запутанному поведению.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Примечание

wait() автоматическое планирование корутины как задачи, затем возвращает создаваемые объекты задачи в множестве (done, pending). Поэтому следующий код не будет работать так, как ожидалось:

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # Эта ветка никогда не будет запущена!

Вот как можно зафиксировать вышеуказанный фрагмент:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Теперь все будет работать так, как и ожидалось.

Не рекомендуется, начиная с версии 3.8: Передача корутиновых объектов непосредственно в wait() устарела.

asyncio.as_completed(aws, *, loop=None, timeout=None)

Запуск ожидаемых объектов в aws итеративно и конкурентно. Возвращает итератор корутины. Каждую возвращенную корутину можно ожидать, чтобы получить самый ранний следующий результат от итерируемого из оставшихся ожидаемых.

Вызывает asyncio.TimeoutError, если тайм-аут наступает до выполнения всех футур.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Пример:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

Планирование из других потоков

asyncio.run_coroutine_threadsafe(coro, loop)

Отправить корутину в событийный цикл. Потокобезопасный.

Возвращает concurrent.futures.Future дожидаясь результата из другого потока ОС.

Эта функция вызывается из потока операционной системы, отличного от того, где выполняется событийный цикл. Пример:

# Создание корутины
coro = asyncio.sleep(1, result=3)

# Отправить корутину в заданный цикл
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Ожидать результата с необязательным аргументом timeout
assert future.result(timeout) == 3

Если в корутине возникает исключение, возвращенная футура будет уведомлена. Также можно использовать отмену задачи в событийном цикле:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

См. раздел конкурентность и многопоточность документации.

В отличие от других asyncio функций эта функция требует явной передачи loop аргумента.

Добавлено в версии 3.5.1.

Интроспекция

asyncio.current_task(loop=None)

Возвращает текущую Task сущность или None, если задача не выполняется.

Если loopNone, используется get_running_loop() для получения текущего цикла.

Добавлено в версии 3.7.

asyncio.all_tasks(loop=None)

Возвращает множество ещё не завершенных объектов Task, запущенных в цикле.

Если loopNone, используется get_running_loop() для получения текущего цикла.

Добавлено в версии 3.7.

Объект задачи

class asyncio.Task(coro, *, loop=None, name=None)

Футуроподобный объект, запускающий Python корутину. Не потокобезопасной.

Задачи используются для запуска корутин в событийных циклах. Если корутина ожидает футуры, задача приостанавливает выполнение корутины и ожидает завершения футуры. Когда выполнено, футура возобновляет исполнение обёрнутой корутины.

Событийный цикл использует совместное планирование: событийный цикл выполняет одну задачу одновременно. В то время как задача ожидает для завершения футуры, событийный цикл выполняет другие задачи, колбэки или выполняет операции ввода- вывода.

Используйте высокоуровневую функцию asyncio.create_task(), чтобы создать задачи или низкоуровневые функции loop.create_task() или ensure_future(). Не рекомендуется создавать экземпляры задач вручную.

Для отмены выполняемой задачи используйте метод cancel(). Этот вызов приведёт к тому, что задача бросит CancelledError исключение в обернутую корутину. Если корутина ожидает в объекте футуры во время отмены, объект футуры будет отменён.

Можно использовать cancelled(), чтобы проверить, была ли задача отменена. Метод возвращает True, если обёрнутая корутина не подавила CancelledError исключение и фактически была отменена.

asyncio.Task наследует от Future все свои API, кроме Future.set_result() и Future.set_exception().

Задачи поддерживают модуль contextvars. При создании задачи она копирует текущий контекст, а затем запускает корутину в скопированном контексте.

Изменено в версии 3.7: Добавлена поддержка модуля contextvars.

Изменено в версии 3.8: Добавлен параметр name.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

cancel()

Запрос отмены задачи.

Это позволяет создать CancelledError исключение для обернутой корутины на следующем цикле событийного цикла.

После этого у корутины есть шанс очистить или даже отклонить просьбу, подавив исключение в блоке tryexcept CancelledErrorfinally. Поэтому, в отличие от Future.cancel(), Task.cancel() не гарантирует, что задача будет отменена, хотя подавление отмены полностью не распространено и активно отговаривается.

В следующем примере показано, как корутины могут перехватывать запрос на отмену:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Ждать 1 секунду
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Создание задачи "cancel_me"
    task = asyncio.create_task(cancel_me())

    # Ждать 1 секунду
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Ожидаемый результат:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

Возвращает True, если задача отменена.

Задача отменена, когда запрашивалась отмена с cancel() и обернутая корутина распространила в неё CancelledError исключение.

done()

Возвращает True, если задача завершена.

Задача завершена, когда обернутая корутина либо возвратила значение, либо вызвала исключение, либо задача была отменена.

result()

Возвращает результат выполнения задачи.

Если задача является завершенной, то результатом обернутой корутины является возвращаемое (или если корутина вызвала исключение, то это исключение возникает повторно.)

Если задача была отменена, это метод вызывает CancelledError исключение.

Если результат задачи ещё не доступен, это метод вызывает InvalidStateError исключение.

exception()

Возвращает исключение задачи.

Если у обернутой корутины возникло исключение, то возвращается это исключение. Если обернутая корутина возвращается нормально, то этот метод возвращает None.

Если задача была отменена, это метод вызывает CancelledError исключение.

Если задача еще не завершена, это метод вызывает InvalidStateError исключение.

add_done_callback(callback, *, context=None)

Добавление колбэка для выполнения при выполнении задачи.

Этот метод должен быть использован только в низкоуровневом основанном на колбэках коде.

Для получения дополнительной информации см. документацию Future.add_done_callback().

remove_done_callback(callback)

Удалить callback из списка колбэков.

Этот метод должен быть использован только в низкоуровневом основанном на колбэках коде.

Для получения дополнительной информации см. документацию Future.remove_done_callback().

get_stack(*, limit=None)

Возвращает список фреймов стека для этой задачи.

Если обёрнутая корутина не выполнена, он возвращает стек, где он приостановлен. Если короутина успешно завершена или отменена, возвращает пустой список. Если корутина была прервана исключением, возвращает список кадров трейсбэка.

Фреймы всегда упорядочиваются от самых старых до самых новых.

Для приостановленной корутины возвращается только одни фрейм стека.

Необязательный аргумент limit устанавливает максимальное количество возвращаемых кадров; по умолчанию возвращаются все доступные фреймы. Порядок возвращаемого списка различается в зависимости от того, возвращается ли стек или трассировка: возвращаются самые новые фреймы стека, но возвращаются самые старые фреймы трассировки. (Это соответствует поведению модуля трассировки.)

print_stack(*, limit=None, file=None)

Печать стека или трейсбэка для этой задачи.

При этом выводятся выходные данные, аналогичные данным модуля traceback для фреймов, извлекаемых get_stack().

Аргумент limit передается непосредственно get_stack().

Аргумент file представляет собой поток I/O, в который записываются выходные данные; по умолчанию выходные данные записываются в sys.stderr.

get_coro()

Возвращает объект корутины, обернутый Task.

Добавлено в версии 3.8.

get_name()

Возвращает имя задачи.

Если ни одно имя не было явно назначено задаче, реализация задачи asyncio по умолчанию создаёт имя по умолчанию во время создания экземпляра.

Добавлено в версии 3.8.

set_name(value)

Задание имя задачи.

Аргументом value может быть любой объект, который затем преобразуется в строку.

В реализации задачи по умолчанию имя будет отображаться в repr() выходных данных объекта задачи.

Добавлено в версии 3.8.

classmethod all_tasks(loop=None)

Возвращает множество всех задач для событийного цикла.

По умолчанию возвращаются все задачи для текущего событийного цикла. Если loop None, то используется get_event_loop() функция для получения текущего цикла.

Устарело с версии 3.7, будет удалено в 3.9 версии.: Не вызывайте этот метод задач. Вместо этого используйте функцию asyncio.all_tasks().

classmethod current_task(loop=None)

Возвращает текущую запущенную задачу или None.

Если loopNone, используется функция get_event_loop() для получения текущего цикла.

Устарело с версии 3.7, будет удалено в 3.9 версии.: Не вызывайте этот метод задач. Вместо него используйте функцию asyncio.current_task().

Основанные на генераторах корутины

Примечание

Поддержка основанных на генераторах корутин запрещено и планируется к удалению в Python 3.10.

Корутины на основе генератора предшествовали синтаксису async/await. Они представляют собой Python генераторы, которые используют yield from выражения для ожидания футур и других корутин.

Генераторные корутины должны быть задекорированы @asyncio.coroutine, хотя это не применяется.

@asyncio.coroutine

Декоратор для маркировки основанных на генераторах корутин.

Этот декоратор обеспечивает совместимость устаревших основанных на генераторах корутин с async/await кодом:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

Этот декоратор не должен использоваться для async def корутин.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Используйте async def вместо этого.

asyncio.iscoroutine(obj)

Возвращает True, если obj объект корутины.

Метод отличается от inspect.iscoroutine() потому что возвращает True для основанных на генераторах корутин.

asyncio.iscoroutinefunction(func)

Возвращает True, если func функция корутина.

Метод отличается от inspect.iscoroutinefunction(), потому что возвращает True для основанных на генераторах функций корутин декорированных с @coroutine.