Очереди

Исходный код: Lib/asyncio/queues.py


Очереди asyncio спроектированы так, чтобы быть похожими на классы модуля queue. Хотя очереди asyncio не являются потокобезопасными, они предназначены специально для использования в async/await коде.

Обратите внимание, что у методов очередей asyncio нет параметра timeout; используйте функцию asyncio.wait_for() для выполнения операций с очередью с тайм-аутом.

См. также раздел Примеры ниже.

Очередь

class asyncio.Queue(maxsize=0, *, loop=None)

Очередь «первым пришёл — первым ушёл» (FIFO).

Если maxsize меньше или равен нулю, размер очереди бесконечен. Если это целое число больше 0, то await put() блокируется, когда очередь достигает maxsize, пока элемент не будет удалён get().

В отличие от стандартной библиотеки потоков queue, размер очереди всегда известен и может быть возвращён путём вызова метода qsize().

Устарело с версии 3.8, будет удалено в 3.10 версии.: Параметр loop.

Данный класс не потокобезопасный.

maxsize

Количество элементов, разрешённых в очереди.

empty()

Возвращает True, если очередь пуста, в противном случае — False.

full()

Возвращает True, если в очереди есть элементы maxsize.

Если очередь была инициализирована с maxsize=0 (по умолчанию), то full() никогда не возвращает True.

coroutine get()

Удалить и вернуть элемент из очереди. Если очередь пуста, подождать, пока не станет доступен элемент.

get_nowait()

Возвращает элемент, если он доступен сразу, иначе поднимает QueueEmpty.

coroutine join()

Блокировать, пока все элементы в очереди не будут получены и обработаны.

Счётчик незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счётчик уменьшается всякий раз, когда корутина- потребитель вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокируется.

coroutine put(item)

Поместить элемент в очередь. Если очередь заполнена, подождать, пока освободится свободный слот, прежде чем добавлять элемент.

put_nowait(item)

Поставить элемент в очередь без блокировки.

Если сразу нет свободного слота, поднимается QueueFull.

qsize()

Возвращает количество элементов в очереди.

task_done()

Указать, что задача, ранее поставленная в очередь, завершена.

Используется потребителями очереди. Для каждого get(), используемого для выборки задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если в настоящее время блокируется join(), он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

Поднимает ValueError, если вызывается больше раз, чем было помещено в очередь элементов.

Приоритетная очередь

class asyncio.PriorityQueue

Вариант Queue; извлекает записи в порядке приоритета (сначала самые низкие).

Записи обычно представляют собой кортежи вида (priority_number, data).

Очередь LIFO

class asyncio.LifoQueue

Вариант Queue, который сначала извлекает самые последние добавленные записи (последний пришёл, первый ушёл).

Исключения

exception asyncio.QueueEmpty

Это исключение возникает, когда метод get_nowait() вызывается в пустой очереди.

exception asyncio.QueueFull

Исключение возникает, когда метод put_nowait() вызывается в очереди, достигшей своего maxsize.

Примеры

Очереди можно использовать для распределения нагрузки между несколькими параллельными задачами:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Получить "рабочий элемент" вне очереди.
        sleep_for = await queue.get()

        # Спать "sleep_for" секунд.
        await asyncio.sleep(sleep_for)

        # Сообщение очереди, для обработки "рабочего элемента".
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Создать очередь, которую мы будем использовать для хранения нашей "рабочей нагрузки".
    queue = asyncio.Queue()

    # Генерирует случайные тайминги и помещает их в очередь.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Создание трёх рабочих задач для одновременной обработки очереди.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Подождать, пока очередь не будет полностью обработана.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Отменить рабочие задания.
    for task in tasks:
        task.cancel()
    # Подождать, пока все рабочие задачи не будут отменены.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())