Примитивы синхронизации

Исходный код: Lib/asyncio/locks.py


Примитивы синхронизации asyncio спроектированы так, чтобы быть похожими на примитивы модуля threading с двумя важными оговорками:

  • Примитивы asyncio не являются потокобезопасными, поэтому их не следует использовать для синхронизации потоков ОС (для этого используется threading);
  • Методы этих примитивов синхронизации не принимают аргумент timeout; используйте функцию asyncio.wait_for() для выполнения операций с таймаутами.

У asyncio есть следующие базовые примитивы синхронизации:


Блокировка

class asyncio.Lock(*, loop=None)

Реализует блокировку мьютекса для asyncio задач. Не потокобезопасный.

Блокировка asyncio может использоваться, чтобы гарантировать монопольный доступ к общему ресурсу.

Предпочтительный способ использования Lock — это оператор async with:

lock = asyncio.Lock()

# ... позже
async with lock:
    # доступ к общему состоянию

что эквивалентно:

lock = asyncio.Lock()

# ... позже
await lock.acquire()
try:
    # доступ к общему состоянию
finally:
    lock.release()

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

coroutine acquire()

Захват блокировки.

Этот метод ожидает, пока блокировка не станет разблокированной, устанавливает для неё значение заблокирована и возвращает True.

Когда в acquire() блокируется более одной корутины, ожидая разблокировки блокировки, в конечном итоге продолжает работу только одна корутина.

Захват блокировки выполняется честно: выполняемая корутина, будет первой корутиной, которая начала ожидать блокировки.

release()

Снять блокировку.

Когда блокировка будет заблокирована, сбросить её на разблокирована и вернуться.

Если блокировка разблокирована, вызывается RuntimeError.

locked()

Возвращает True, если блокировка заблокирована.

Событие

class asyncio.Event(*, loop=None)

Объект события. Не потокобезопасный.

Событие asyncio можно использовать для уведомления нескольких asyncio задач о том, что произошло какое-то событие.

Объект Event управляет внутренним флагом, который может быть установлен на true с помощью метода set() и сброшен на false с помощью метода clear(). Метод wait() блокируется, пока для флага не будет установлено значение true. Первоначально у флага значение false.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Пример:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Создание объекта Event.
    event = asyncio.Event()

    # Создать задачу, чтобы подождать, пока не будет установлено "событие2".
    waiter_task = asyncio.create_task(waiter(event))

    # Спать в течение 1 секунды и установить событие.
    await asyncio.sleep(1)
    event.set()

    # Подождить, пока исполнитель не закончит свою работу.
    await waiter_task

asyncio.run(main())
coroutine wait()

Подождать, пока не установится событие.

Если событие установлено, немедленно возвращает True. В противном случае блокируется, пока другая задача не вызовет set().

set()

Установить событие.

Все задачи, ожидающие установки события, будут немедленно пробуждены.

clear()

Очистить (сбросить) событие.

Задачи, ожидающие wait(), теперь будут блокироваться до тех пор, пока метод set() не будет повторно вызван.

is_set()

Возвращает True, если событие установлено.

Состояние

class asyncio.Condition(lock=None, *, loop=None)

Объект Condition. Не потокобезопасный.

Примитив состояния asyncio может использоваться задачей, чтобы дождаться какого-либо события, а затем получить монопольный доступ к общему ресурсу.

По сути, объект Condition сочетает в себе функциональность Event и Lock. Можно создавать несколько объектов Condition, совместно использующих одну Lock, что позволяет координировать монопольный доступ к общему ресурсу между различными задачами, заинтересованными в определенных состояниях общего ресурса.

Необязательный аргумент lock должен быть объектом Lock или None. В последнем случае автоматически создаётся новый объект Lock.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Предпочтительный способ использования условия — это оператор async with:

cond = asyncio.Condition()

# ... позже
async with cond:
    await cond.wait()

что эквивалентно:

cond = asyncio.Condition()

# ... позже
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

Захватить базовую блокировку.

Этот метод ожидает, пока базовая блокировка не станет разблокированной, устанавливает для неё значение заблокирована и возвращает True.

notify(n=1)

Разбудить не более n задач (по умолчанию 1), ожидающих этого условия. Метод не работает, если нет ожидающих задач.

Блокировка должна быть получена до вызова этого метода и выпущена вскоре после этого. При вызове разблокировки блокировки возникает ошибка RuntimeError.

locked()

Возвращает True, если захвачена базовая блокировка.

notify_all()

Разбудите все задачи, ожидающие этого условия.

Этот метод действует как notify(), но пробуждает все ожидающие задачи.

Блокировка должна быть получена до вызова этого метода и отпущена вскоре после этого. При вызове разблокировки блокировки возникает ошибка RuntimeError.

release()

Освободить базовую блокировку.

При вызове разблокированной блокировки вызывается RuntimeError.

coroutine wait()

Дождаться уведомления.

Если вызывающая задача не получила блокировку при вызове этого метода, возникает RuntimeError.

Этот метод освобождает базовую блокировку, а затем блокируется, пока не будет пробужден вызовом notify() или notify_all(). После пробуждения Condition повторно получает свою блокировку, и этот метод возвращает True.

coroutine wait_for(predicate)

Подождать, пока predicate не станет истиной.

predicate должен быть вызываемым, результат которого будет интерпретироваться как логическое значение. Результирующее значение — это возвращаемое значение.

Семафор

class asyncio.Semaphore(value=1, *, loop=None)

Объект семафора. Не потокобезопасный.

Семафор управляет внутренним счётчиком, который уменьшается при каждом вызове acquire() и увеличивается при каждом вызове release(). Счётчик никогда не может опуститься ниже нуля; когда acquire() обнаруживает, что он равен нулю, он блокируется, ожидая, пока какая-либо задача вызовет release().

Необязательный аргумент value задаёт начальное значение для внутреннего счётчика (по умолчанию 1). Если заданное значение меньше 0, вызывается ValueError.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Предпочтительный способ использования семафора — это оператор async with:

sem = asyncio.Semaphore(10)

# ... позже
async with sem:
    # работать с общим ресурсом

что эквивалентно:

sem = asyncio.Semaphore(10)

# ... позже
await sem.acquire()
try:
    # работать с общим ресурсом
finally:
    sem.release()
coroutine acquire()

Захватить семафор.

Если внутренний счётчик больше нуля, уменьшает его на единицу и немедленно возвращает True. Если он равен нулю, дожидается вызова release() и возвращает True.

locked()

Возвращает True, если семафор не может быть получен немедленно.

release()

Освободить семафор, увеличив внутренний счётчик на единицу. Может разбудить задачу, ожидающую получения семафора.

В отличие от BoundedSemaphore, Semaphore позволяет делать больше вызовов release(), чем вызовов acquire().

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1, *, loop=None)

Ограниченный семафорный объект. Не потокобезопасный.

Ограниченный семафор — это версия Semaphore, которая вызывает ValueError в release(), если увеличивает внутренний счётчик выше начального value.

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.


Не рекомендуется, начиная с версии 3.7: Захват блокировки с помощью оператора await lock или yield from lock и/или with (with await lock, with (yield from lock)) не рекомендуется. Вместо этого используйте async with lock.