queue
— Синхронизированный класс очереди¶
Модуль queue
реализует очереди с несколькими производителями и
потребителями. Особенно полезен в многопоточном программировании, когда
необходимо безопасно обмениваться информацией между несколькими потоками. Класс
Queue
в данном модуле реализует всю необходимую семантику блокировки.
В модуле реализованы три типа очередей, которые различаются только порядком
получения записей. В очереди FIFO первые
добавленные задачи извлекаются первыми. В очереди
LIFO последняя
добавленная запись извлекается первой (работает как стек). В
очереди с приоритетом записи хранятся отсортированными (с использованием модуля
heapq
), и сначала извлекается запись с наименьшим значением.
Внутри данных трёх типов очередей используются блокировки для временной блокировки конкурирующих потоков; однако они не предназначены для обработки повторного входа в поток.
Кроме того, в модуле реализован «простой» тип очереди
FIFO, SimpleQueue
,
реализация которого обеспечивает
дополнительные гарантии в обмен на меньшую функциональность.
Модуль queue
определяет следующие классы и исключения:
-
class
queue.
Queue
(maxsize=0)¶ Конструктор очереди FIFO. maxsize — целое число, которое устанавливает верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована по достижении этого размера до тех пор, пока не будут израсходованы элементы очереди. Если maxsize меньше или равен нулю, размер очереди бесконечен.
-
class
queue.
LifoQueue
(maxsize=0)¶ Конструктор очереди LIFO. maxsize — целое число, которое устанавливает верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована по достижении этого размера до тех пор, пока не будут израсходованы элементы очереди. Если maxsize меньше или равен нулю, размер очереди бесконечен.
-
class
queue.
PriorityQueue
(maxsize=0)¶ Конструктор приоритетной очереди. maxsize — целое число, которое устанавливает верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована по достижении этого размера до тех пор, пока не будут израсходованы элементы очереди. Если maxsize меньше или равен нулю, размер очереди бесконечен.
Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением возвращается
sorted(list(entries))[0]
). Типичным шаблоном для записей является кортеж в форме:(priority_number, data)
.Если элементы data не сопоставимы, данные могут быть заключены в класс, который игнорирует элемент данных и сравнивает только номер приоритета:
from dataclasses import dataclass, field from typing import Any @dataclass(order=True) class PrioritizedItem: priority: int item: Any=field(compare=False)
-
class
queue.
SimpleQueue
¶ Конструктор неограниченной очереди FIFO. Простым очередям не хватает расширенных функций, таких как отслеживание задач.
Добавлено в версии 3.7.
-
exception
queue.
Empty
¶ Исключение возникает, когда неблокирующий
get()
(илиget_nowait()
) вызывается для пустого объектаQueue
.
-
exception
queue.
Full
¶ Исключение возникает, когда неблокирующий
put()
(илиput_nowait()
) вызывается для заполненного объектаQueue
.
Объекты очереди¶
Объекты очереди (Queue
, LifoQueue
или PriorityQueue
)
предоставляют общедоступные методы, описанные ниже.
-
Queue.
qsize
()¶ Вернуть примерный (approximate) размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не будет блокироваться, а qsize() < maxsize не гарантирует, что put() не будет блокироваться.
-
Queue.
empty
()¶ Вернуть
True
, если очередь пуста, в противном случае —False
. Если empty() возвращаетTrue
, это не гарантирует, что последующий вызов put() не будет заблокирован. Точно так же, если empty() возвращаетFalse
, это не гарантирует, что последующий вызов get() не будет заблокирован.
-
Queue.
full
()¶ Вернуть
True
, если очередь заполнена, в противном случае —False
. Если full() возвращаетTrue
, это не гарантирует, что последующий вызов get() не будет заблокирован. Точно так же, если full() возвращаетFalse
, это не гарантирует, что последующий вызов put() не заблокируется.
-
Queue.
put
(item, block=True, timeout=None)¶ Поместить item в очередь. Если необязательные аргументы block истинны, а timeout —
None
(по умолчанию), при необходимости блокируется, пока не станет доступен свободный слот. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключениеFull
, если в течение этого времени не было доступного свободного слота. В противном случае (block — false) поместить элемент в очередь, если свободный слот доступен немедленно, иначе вызвать исключениеFull
(timeout в этом случае игнорируется).
-
Queue.
put_nowait
(item)¶ Эквивалентен
put(item, False)
.
-
Queue.
get
(block=True, timeout=None)¶ Удалить и вернуть элемент из очереди. Если необязательные аргументы block — истина, а timeout —
None
(по умолчанию), при необходимости блокируется, пока элемент не станет доступным. Если timeout является положительным числом, он блокируется не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени ни один элемент не был доступен. В противном случае (block — false) сразу возвращает элемент, если он доступен, иначе вызовет исключениеEmpty
(timeout в данном случае игнорируется).До 3.0 в системах POSIX и для всех версий в Windows, если block истинно, а timeout —
None
, данная операция переходит в непрерывное ожидание базовой блокировки. Это означает, что никаких исключений произойти не может, и, в частности, SIGINT не вызоветKeyboardInterrupt
.
-
Queue.
get_nowait
()¶ Эквивалентен
get(False)
.
Для поддержки отслеживания того, были ли поставленные в очередь задачи полностью обработаны потоками-потребителями демонов, предлагаются два метода.
-
Queue.
task_done
()¶ Указать, что ранее поставленная в очередь задача, завершена. Используется потоками потребителей очереди. Для каждого
get()
, используемого для выборки задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если в настоящее время блокируется
join()
, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).Вызывает
ValueError
, если вызывается больше раз, чем было помещено в очередь.
-
Queue.
join
()¶ Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда потребительский поток вызывает
task_done()
, чтобы указать, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля,join()
разблокируется.
Пример того, как дождаться завершения поставленных в очередь задач:
import threading, queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# включить рабочий поток
threading.Thread(target=worker, daemon=True).start()
# отправить 30 запросов задач работнику
for item in range(30):
q.put(item)
print('All task requests sent\n', end='')
# блокировать, пока все задачи не будут выполнены
q.join()
print('All work completed')
Объекты SimpleQueue¶
Объекты SimpleQueue
предоставляют общедоступные методы, описанные
ниже.
-
SimpleQueue.
qsize
()¶ Вернуть примерный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не заблокируется.
-
SimpleQueue.
empty
()¶ Вернуть
True
, если очередь пуста, в противном случае —False
. Если empty() возвращаетFalse
, это не гарантирует, что последующий вызов get() не будет заблокирован.
-
SimpleQueue.
put
(item, block=True, timeout=None)¶ Поместить item в очередь. Метод никогда не блокируется и всегда завершается успешно (за исключением потенциальных низкоуровневых ошибок, таких как отказ от выделения памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с
Queue.put()
.Детали реализации CPython: У метода есть реентерабельная (reentrant) реализация на языке C. То есть вызов
put()
илиget()
может быть прерван другим вызовомput()
в том же потоке без взаимоблокировки или нарушения внутреннего состояния внутри очереди. Это делает его подходящим для использования в деструкторах, таких как методы__del__
или обратные вызовы (callbacks)weakref
.
-
SimpleQueue.
put_nowait
(item)¶ Эквивалентен
put(item)
, при условии совместимости сQueue.put_nowait()
.
-
SimpleQueue.
get
(block=True, timeout=None)¶ Удалить и вернуть элемент из очереди. Если необязательные аргументы block — истина, а timeout —
None
(по умолчанию), если необходимо заблокироваться, до тех пор, пока элемент не станет доступным. Если timeout положительное число, она блокируется не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени ни один элемент не был доступен. В противном случае (block — false) немедленно возвращает элемент, если он доступен, иначе вызовет исключениеEmpty
(timeout в этом случае игнорируется).
-
SimpleQueue.
get_nowait
()¶ Эквивалентен
get(False)
.
См.также
- Класс
multiprocessing.Queue
- Класс очереди для использования в контексте многопроцессной обработки (а не в многопоточной).
collections.deque
— альтернативная реализация неограниченных
очередей с быстрыми атомарными операциями append()
и popleft()
, которые не требуют блокировки и также
поддерживают индексацию.