queue — Синхронизированный класс очереди

Исходный код: Lib/queue.py


Модуль queue реализует очереди с несколькими производителями и потребителями. Особенно полезен в многопоточном программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue в данном модуле реализует всю необходимую семантику блокировки.

В модуле реализованы три типа очередей, которые различаются только порядком получения записей. В очереди FIFO первые добавленные задачи извлекаются первыми. В очереди LIFO последняя добавленная запись извлекается первой (работает как стек). В очереди с приоритетом записи хранятся отсортированными (с использованием модуля heapq), и сначала извлекается запись с наименьшим значением.

Внутри данных трёх типов очередей используются блокировки для временной блокировки конкурирующих потоков; однако они не предназначены для обработки повторного входа в поток.

Кроме того, в модуле реализован «простой» тип очереди FIFO, SimpleQueue, конкретная реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.

Модуль queue определяет следующие классы и исключения:

class queue.Queue(maxsize=0)

Конструктор очереди FIFO. maxsize — целое число, которое устанавливает верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована по достижении этого размера до тех пор, пока не будут израсходованы элементы очереди. Если maxsize меньше или равен нулю, размер очереди бесконечен.

class queue.LifoQueue(maxsize=0)

Конструктор очереди LIFO. maxsize — целое число, которое устанавливает верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована по достижении этого размера до тех пор, пока не будут израсходованы элементы очереди. Если maxsize меньше или равен нулю, размер очереди бесконечен.

class queue.PriorityQueue(maxsize=0)

Конструктор приоритетной очереди. maxsize — целое число, которое устанавливает верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована по достижении этого размера до тех пор, пока не будут израсходованы элементы очереди. Если maxsize меньше или равен нулю, размер очереди бесконечен.

Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением возвращается sorted(list(entries))[0]). Типичным шаблоном для записей является кортеж в форме: (priority_number, data).

Если элементы data не сопоставимы, данные могут быть заключены в класс, который игнорирует элемент данных и сравнивает только номер приоритета:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

Конструктор неограниченной очереди FIFO. Простым очередям не хватает расширенных функций, таких как отслеживание задач.

Добавлено в версии 3.7.

exception queue.Empty

Исключение возникает, когда неблокирующий get() (или get_nowait()) вызывается для пустого объекта Queue.

exception queue.Full

Исключение возникает, когда неблокирующий put() (или put_nowait()) вызывается для заполненного объекта Queue.

Объекты очереди

Объекты очереди (Queue, LifoQueue или PriorityQueue) предоставляют общедоступные методы, описанные ниже.

Queue.qsize()

Вернуть примерный (approximate) размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не будет блокироваться, а qsize() < maxsize не гарантирует, что put() не будет блокироваться.

Queue.empty()

Вернуть True, если очередь пуста, в противном случае — False. Если empty() возвращает True, это не гарантирует, что последующий вызов put() не будет заблокирован. Точно так же, если empty() возвращает False, это не гарантирует, что последующий вызов get() не будет заблокирован.

Queue.full()

Вернуть True, если очередь заполнена, в противном случае — False. Если full() возвращает True, это не гарантирует, что последующий вызов get() не будет заблокирован. Точно так же, если full() возвращает False, это не гарантирует, что последующий вызов put() не заблокируется.

Queue.put(item, block=True, timeout=None)

Поместить item в очередь. Если необязательные аргументы block истинны, а timeoutNone (по умолчанию), при необходимости блокируется, пока не станет доступен свободный слот. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключение Full, если в течение этого времени не было доступного свободного слота. В противном случае (block — false) поместить элемент в очередь, если свободный слот доступен немедленно, иначе вызвать исключение Full (timeout в этом случае игнорируется).

Queue.put_nowait(item)

Эквивалентен put(item, False).

Queue.get(block=True, timeout=None)

Удалить и вернуть элемент из очереди. Если необязательные аргументы block — истина, а timeoutNone (по умолчанию), при необходимости блокируется, пока элемент не станет доступным. Если timeout является положительным числом, он блокируется не более timeout секунд и вызывает исключение Empty, если в течение этого времени ни один элемент не был доступен. В противном случае (block — false) сразу возвращает элемент, если он доступен, иначе вызовет исключение Empty (timeout в данном случае игнорируется).

До 3.0 в системах POSIX и для всех версий в Windows, если block истинно, а timeoutNone, данная операция переходит в непрерывное ожидание базовой блокировки. Это означает, что никаких исключений произойти не может, и, в частности, SIGINT не вызовет KeyboardInterrupt.

Queue.get_nowait()

Эквивалентен get(False).

Для поддержки отслеживания того, были ли поставленные в очередь задачи полностью обработаны потоками-потребителями демонов, предлагаются два метода.

Queue.task_done()

Указать, что ранее поставленная в очередь задача, завершена. Используется потоками потребителей очереди. Для каждого get(), используемого для выборки задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если в настоящее время блокируется join(), он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

Поднимает ValueError, если вызывается больше раз, чем было помещено в очередь.

Queue.join()

Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда потребительский поток вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокируется.

Пример того, как дождаться завершения поставленных в очередь задач:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# включить рабочий поток
threading.Thread(target=worker, daemon=True).start()

# отправить 30 запросов задач работнику
for item in range(30):
    q.put(item)
print('All task requests sent\n', end='')

# блокировать, пока все задачи не будут выполнены
q.join()
print('All work completed')

Объекты SimpleQueue

Объекты SimpleQueue предоставляют общедоступные методы, описанные ниже.

SimpleQueue.qsize()

Вернуть примерный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не заблокируется.

SimpleQueue.empty()

Вернуть True, если очередь пуста, в противном случае — False. Если empty() возвращает False, это не гарантирует, что последующий вызов get() не будет заблокирован.

SimpleQueue.put(item, block=True, timeout=None)

Поместить item в очередь. Метод никогда не блокируется и всегда завершается успешно (за исключением потенциальных низкоуровневых ошибок, таких как отказ от выделения памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с Queue.put().

Детали реализации CPython: У метода есть реентерабельная (reentrant) реализация на языке C. То есть вызов put() или get() может быть прерван другим вызовом put() в том же потоке без взаимоблокировки или нарушения внутреннего состояния внутри очереди. Это делает его подходящим для использования в деструкторах, таких как методы __del__ или обратные вызовы (callbacks) weakref.

SimpleQueue.put_nowait(item)

Эквивалентен put(item), при условии совместимости с Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)

Удалить и вернуть элемент из очереди. Если необязательные аргументы block — истина, а timeoutNone (по умолчанию), если необходимо заблокироваться, до тех пор, пока элемент не станет доступным. Если timeout положительное число, она блокируется не более timeout секунд и вызывает исключение Empty, если в течение этого времени ни один элемент не был доступен. В противном случае (block — false) немедленно возвращает элемент, если он доступен, иначе вызовет исключение Empty (timeout в этом случае игнорируется).

SimpleQueue.get_nowait()

Эквивалентен get(False).

См.также

Класс multiprocessing.Queue
Класс очереди для использования в контексте многопроцессной обработки (а не в многопоточной).

collections.deque — альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append() и popleft(), которые не требуют блокировки и также поддерживают индексацию.