threading — Параллелизм на основе потоков


Данный модуль создаёт высокоуровневые потоковые интерфейсы поверх модуля нижнего уровня _thread. См. также модуль queue.

Изменено в версии 3.7: Раньше данный модуль был необязательным, теперь он доступен всегда.

Примечание

Не перечислены далее имена camelCase, используемые для некоторых методов и функций в этом модуле в ветке Python 2.x, по-прежнему поддерживаются данным модулем.

Данный модуль определяет следующие функции:

threading.active_count()

Возвращает количество активных объектов Thread. Возвращенное количество равно длине списка, возвращаемого enumerate().

threading.current_thread()

Возвращает текущий объект Thread, соответствующий потоку управления вызывающего объекта. Если поток управления вызывающей стороны не был создан через модуль threading, возвращается фиктивный объект потока с ограниченной функциональностью.

threading.excepthook(args, /)

Обработка неперехваченного исключения, вызванного Thread.run().

У аргумента args следующие атрибуты:

  • exc_type: тип исключения.
  • exc_value: значение исключения, может быть None.
  • exc_traceback: Отслеживание исключения, может быть None.
  • thread: поток, вызвавший исключение, может быть None.

Если exc_typeSystemExit, исключение автоматически игнорируется. В противном случае исключение распечатывается на sys.stderr.

Если данная функция вызывает исключение, для его обработки вызывается sys.excepthook().

threading.excepthook() можно переопределить, чтобы управлять обработкой неперехваченных исключений, вызванных Thread.run().

Сохранение exc_value с использованием настраиваемого обработчика может создать ссылочный цикл. Его следует явно очистить, чтобы прервать ссылочный цикл, когда исключение больше не требуется.

Сохранение потока с использованием настраиваемого хука может воскресить его, если он установлен на финализируемый объект. Избегайте сохранения потока после завершения настраиваемой хука, чтобы избежать воскрешения объектов.

См.также

sys.excepthook() обрабатывает неперехваченные исключения.

Добавлено в версии 3.8.

threading.get_ident()

Возвращает «идентификатор потока» текущего потока. Это ненулевое целое число. Его значение не имеет прямого значения; оно предназначено для использования в качестве волшебного файла cookie, например для индексации словаря данных, относящихся к потоку. Идентификаторы потока могут быть повторно использованы, когда поток завершается и создаётся другой поток.

Добавлено в версии 3.3.

threading.get_native_id()

Возвращает собственный интегральный идентификатор потока текущего потока, назначенного ядром. Это целое неотрицательное число. Его значение может использоваться для уникальной идентификации данного потока в масштабах всей системы (до тех пор, пока поток не завершится, после чего значение может быть переработано ОС).

Доступность: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX.

Добавлено в версии 3.8.

threading.enumerate()

Возвращает список всех живых объектов Thread. В список входят демонические потоки, объекты фиктивных потоков, созданные current_thread(), и основной поток. Он исключает завершенные потоки и потоки, которые ещё не были запущены.

threading.main_thread()

Возвращает основной объект Thread. В обычных условиях основной поток — это поток, из которого был запущен интерпретатор Python.

Добавлено в версии 3.4.

threading.settrace(func)

Устанавливает функцию трассировки для всех потоков, запускаемых из модуля threading. func будет передан в sys.settrace() для каждого потока перед вызовом его метода run().

threading.setprofile(func)

Устанавливает функцию профиля для всех потоков, запускаемых из модуля threading. func будет передан в sys.setprofile() для каждого потока перед вызовом его метода run().

threading.stack_size([size])

Возвращает размер стека потоков, используемый при создании новых потоков. Необязательный аргумент size указывает размер стека, который будет использоваться для создаваемых впоследствии потоков, и должен быть 0 (использовать платформу или настроенное по умолчанию) или положительное целочисленное значение не менее 32 768 (32 КиБ). Если size не указан, используется 0. Если изменение размера стека потоков не поддерживается, возникает RuntimeError. Если указанный размер стека недопустим, вызывается ValueError и размер стека не изменяется. 32 КиБ в настоящее время является минимальным поддерживаемым значением размера стека, чтобы гарантировать достаточное пространство стека для самого интерпретатора. Обратите внимание, что некоторые платформы могут иметь определённые ограничения на значения размера стека, такие как требование минимального размера стека > 32 КиБ или требование выделения, кратного размеру страницы системной памяти — для получения дополнительной информации следует обратиться к документации по платформе (страницы 4 КиБ распространены; использование кратных 4096 для размера стека является предлагаемым подходом в отсутствие более подробной информации).

Доступность: Windows, системы с POSIX потоками.

Данный модуль также определяет следующую константу:

threading.TIMEOUT_MAX

Максимально допустимое значение параметра timeout для функций блокировки (Lock.acquire(), RLock.acquire(), Condition.wait() и т. д.). Указание тайм-аута больше этого значения вызовет OverflowError.

Добавлено в версии 3.2.

Данный модуль определяет ряд классов, которые подробно описаны в разделах далее.

Дизайн этого модуля частично основан на потоковой модели Java. Однако там, где Java устанавливает блокировки и условные переменные в базовое поведение каждого объекта, в Python они являются отдельными объектами. Класс Python Thread поддерживает подмножество поведения класса Java Thread; в настоящее время нет ни приоритетов, ни групп потоков, и потоки не могут быть уничтожены, остановлены, приостановлены, возобновлены или прерваны. Статические методы Java-класса Thread, если они реализованы, отображаются на функции уровня модуля.

Все рассмотренные далее методы выполняются атомарно.

Локальные данные потока

Локальные данные потока — это данные, значения которых зависят от потока. Чтобы управлять локальными данными потока, просто создайте экземпляр local (или подкласс) и сохранить на нём атрибуты:

mydata = threading.local()
mydata.x = 1

Значения экземпляра будут разными для разных потоков.

class threading.local

Класс, представляющий локальные данные потока.

Дополнительные сведения и обширные примеры см. в строке документации модуля _threading_local.

Объекты потока

Класс Thread представляет действие, которое выполняется в отдельном потоке управления. Есть два способа указать действие: передав вызываемый объект в конструктор или переопределив метод run() в подклассе. Никакие другие методы (кроме конструктора) не должны переопределяться в подклассе. Другими словами, переопределяются только методы __init__() и run() этого класса.

После создания объекта потока его активность должна быть запущена путём вызова метода потока start(). Это вызывает метод run() в отдельном потоке управления.

Как только активность потока запущена, он считается «живым». Он перестаёт быть активным, когда его метод run() завершается — либо обычно, либо при возникновении необработанного исключения. Метод is_alive() проверяет, жив ли поток.

Другие потоки могут вызывать метод потока join(). Он блокирует вызывающий поток до тех пор, пока поток, чей метод join() вызывается, не будет завершён.

У потока есть имя. Имя можно передать конструктору и прочитать или изменить с помощью атрибута name.

Если метод run() вызывает исключение, для его обработки вызывается threading.excepthook(). По умолчанию threading.excepthook() игнорирует SystemExit без уведомления.

Поток можно пометить как «поток демона». Значение этого флага заключается в том, что вся программа Python завершается, когда остаются только потоки демона. Начальное значение наследуется от создающего потока. Флаг можно установить с помощью свойства daemon или daemon аргумента конструктора.

Примечание

Потоки демона внезапно останавливаются при завершении работы. Их ресурсы (например открытые файлы, транзакции базы данных и т. д.) Могут быть освобождены некорректно. Если вам нужно чтобы ваши потоки останавливались корректно, сделайте их недемоническими и используйте подходящий механизм сигнализации, например Event.

Есть объект «основной поток»; это соответствует начальному потоку управления в программе Python. Это не поток демона.

Есть вероятность того, что будут созданы «объекты фиктивного потока». Это объекты потоков, соответствующие «чужеродным потокам», которые представляют собой потоки управления, запущенные вне модуля потоковой передачи, например, непосредственно из кода C. Объекты фиктивного потока имеют ограниченную функциональность; они всегда считаются живыми и демоническими и не могут быть join(). Они никогда не удаляются, т. к. невозможно обнаружить завершение чужих потоков.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Данный конструктор всегда следует вызывать с ключевыми аргументами. Аргументы следующие:

group должно быть None; зарезервировано для будущего расширения при реализации класса ThreadGroup.

target — это вызываемый объект, вызываемый методом run(). По умолчанию None, что означает, что ничего не вызывается.

name — это имя потока. По умолчанию уникальное имя создаётся в форме «Thread-N», где N — небольшое десятичное число.

args — это кортеж аргументов для целевого вызова. По умолчанию ().

kwargs — это словарь ключевых аргументов для целевого вызова. По умолчанию {}.

Если не None, daemon явно устанавливает, является ли поток демоническим. Если None (по умолчанию), демоническое свойство наследуется от текущего потока.

Если подкласс переопределяет конструктор, он должен обязательно вызвать конструктор базового класса (Thread.__init__()), прежде чем делать что-либо ещё с потоком.

Изменено в версии 3.3: Добавлен аргумент daemon.

start()

Начинает активность потока.

Он должен вызываться не более одного раза для каждого объекта потока. Он организует вызов метода объекта run() в отдельном потоке управления.

Данный метод вызовет RuntimeError, если вызывается более одного раза для одного и того же объекта потока.

run()

Представляющий активность потока метод.

Вы можете переопределить данный метод в подклассе. Стандартный метод run() вызывает вызываемый объект, переданный конструктору объекта в качестве аргумента target, если таковой имеется, с позиционными аргументами и ключевыми аргументами, взятыми из аргументов args и kwargs соответственно.

join(timeout=None)

Ожидает, пока поток не завершится. Он блокирует вызывающий поток до тех пор, пока поток, чей метод join() вызван, не завершится — либо обычно, либо через необработанное исключение — либо до тех пор, пока не произойдет необязательный тайм-аут.

Если присутствует аргумент timeout, а не None, это должно быть число с плавающей запятой, указывающее тайм-аут для операции в секундах (или его долях). Поскольку join() всегда возвращает None, вы должны вызвать is_alive() после join(), чтобы решить, произошёл ли тайм-аут — если поток все ещё жив, время ожидания вызова join() истекло.

Если аргумент timeout отсутствует или None, операция будет заблокирована до завершения потока.

Поток может быть join()“ым много раз.

join() вызывает RuntimeError, если предпринимается попытка присоединиться к текущему потоку, поскольку это может вызвать взаимоблокировку. Это также ошибка join() потока до того, как он был запущен, и попытки сделать это вызвать то же исключение.

name

Строка, используемая только для идентификации. У неё нет семантики. Одно и то же имя может быть присвоено нескольким потокам. Начальное имя задается конструктором.

getName()
setName()

Старый API геттера/сеттера для name; вместо этого используйте его напрямую как свойство.

ident

«Идентификатор потока» этого потока или None, если поток не был запущен. Это ненулевое целое число. См. функцию get_ident(). Идентификаторы потока могут быть повторно использованы, когда поток завершается и создаётся другой поток. Идентификатор доступен даже после выхода из потока.

native_id

Собственный интегральный идентификатор потока. Это неотрицательное целое число или None, если поток не был запущен. См. функцию get_native_id(). Это представляет собой идентификатор потока (TID), назначенный потоку ОС (ядром). Его значение может использоваться для уникальной идентификации данного потока в масштабах всей системы (до тех пор, пока поток не завершится, после чего значение может быть переработано ОС).

Примечание

Подобно идентификаторам процессов, идентификаторы потоков действительны (гарантированно уникальны для всей системы) только с момента создания потока до его завершения.

Доступность: Требуется функция get_native_id().

Добавлено в версии 3.8.

is_alive()

Возвращает, жив ли поток.

Данный метод возвращает True непосредственно перед запуском метода run() до момента сразу после завершения метода run(). Функция модуля enumerate() возвращает список всех активных потоков.

daemon

Логическое значение, указывающее, является ли данный поток потоком демона (True) или нет (False). Оно должно быть установлено до вызова start(), в противном случае будет повышено значение RuntimeError. Его начальное значение наследуется от создающего потока; основной поток не является потоком демона, и поэтому все потоки, созданные в основном потоке, по умолчанию имеют значение daemon = False.

Вся программа Python завершается, когда не остаётся живых потоков, не являющихся демонами.

isDaemon()
setDaemon()

Старый API геттера/сеттера для daemon; вместо этого используйте его напрямую как свойство.

Детали реализации CPython: В CPython из-за Глобальной блокировки интерпретатора только один поток может выполнять код Python одновременно (даже если некоторые библиотеки, ориентированные на производительность, могут преодолеть это ограничение). Если вы хотите, чтобы ваше приложение лучше использовало вычислительные ресурсы многоядерных машин, рекомендуется использовать multiprocessing или concurrent.futures.ProcessPoolExecutor. Однако многопоточность по-прежнему является подходящей моделью, если вы хотите одновременно выполнять несколько задач, связанных с вводом-выводом.

Объекты блокировки

Примитивная блокировка — это примитив синхронизации, не принадлежащий потоку при блокировке. В Python в настоящее время это самый низкий из доступных примитивов синхронизации, реализованный непосредственно модулем расширения _thread.

Примитивная блокировка находится в одном из двух состояний: «заблокировано» или «разблокировано». Создаётся в разблокированном состоянии. У неё два основных метода: acquire() и release(). Когда состояние разблокировано, acquire() меняет состояние на заблокированное и немедленно возвращается. Когда состояние заблокировано, acquire() блокируется до тех пор, пока вызов release() в другом потоке не изменит его на разблокированный, затем вызов acquire() сбрасывает его на заблокированный и возвращается. Метод release() следует вызывать только в заблокированном состоянии; она меняет состояние на разблокировано и немедленно возвращается. Если будет сделана попытка снять разблокированную блокировку, будет вызвано RuntimeError.

Блокировки также поддерживают протокол управления контекстом.

Когда более одного потока заблокированы в acquire() в ожидании перехода состояния в разблокированное, только один поток продолжает работу, когда вызов release() сбрасывает состояние в разблокированное; какой из ожидающих потоков продолжается, не определён и может варьироваться в зависимости от реализации.

Все методы выполняются атомарно.

class threading.Lock

Класс, реализующий примитивные объекты блокировки. Как только поток получил блокировку, последующие попытки получить его блокируются, пока он не будет освобожден; любой поток может освободить его.

Обратите внимание, что Lock на самом деле является фабричной функцией, которая возвращает экземпляр наиболее эффективной версии класса Lock, поддерживаемого платформой.

acquire(blocking=True, timeout=-1)

Получает блокировку, блокирующую или неблокирующую.

При вызове с аргументом blocking, установленным на True (по умолчанию), блокировать до тех пор, пока блокировка не будет разблокирована, затем устанавливает для него значение заблокировано и возвращает True.

При вызове с аргументом blocking, установленным на False, не блокировать. Если вызов с blocking, установленным на True, будет заблокирован, немедленно возвращает False; в противном случае устанавливает блокировку на блокировку и возвращает True.

При вызове с аргументом timeout с плавающей запятой, установленным в положительное значение, блокируется максимум на количество секунд, указанное timeout, и пока блокировка не может быть получена. Аргумент timeout для -1 указывает неограниченное ожидание. Запрещено указывать timeout, когда blocking ложно.

Возвращаемое значение — True, если блокировка получена успешно, False, если нет (например, если срок действия timeout истёк).

Изменено в версии 3.2: Параметр timeout новый.

Изменено в версии 3.2: Получение блокировки теперь может быть прервано сигналами в POSIX, если базовая реализация потоковой передачи поддерживает это.

release()

Отпускает блокировку. Может быть вызвано из любого потока, а не только из потока, получившего блокировку.

Когда блокировка заблокирована, сбрасывает ее до разблокированного и возвращает. Если какие-либо другие потоки заблокированы в ожидании разблокировки блокировки, разрешает выполнение ровно одному из них.

При вызове разблокированной блокировки вызывается RuntimeError.

Нет возвращаемого значения.

locked()

Возвращает истину, если блокировка получена.

Объекты RLock

Повторяющаяся блокировка — это примитив синхронизации, который может быть получен несколько раз одним и тем же потоком. Внутри он использует понятия «поток-владелец» и «уровень рекурсии» в дополнение к заблокированному/ разблокированному состоянию, используемому примитивными блокировками. В заблокированном состоянии какой-то поток владеет блокировкой; в разблокированном состоянии ни один поток не владеет им.

Чтобы заблокировать блокировку, поток вызывает свой метод acquire(); это возвращается, когда поток владеет блокировкой. Чтобы разблокировать блокировку, поток вызывает свой метод release(). Пары вызовов acquire()/ release() могут быть вложенными; только последний release() (release() самой внешней пары) сбрасывает блокировку на разблокировку и позволяет продолжить работу другому потоку, заблокированному в acquire().

Повторяющиеся блокировки также поддерживают протокол управления контекстом.

class threading.RLock

Данный класс реализует объекты повторяющейся блокировки. Повторяющаяся блокировка должна быть снята потоком, который её получил. Как только поток получил повторно повторяющуюся блокировку, тот же поток может получить её снова без блокировки; поток должен освобождать его каждый раз, когда он его получает.

Обратите внимание, что RLock на самом деле является фабричной функцией, которая возвращает экземпляр наиболее эффективной версии класса RLock, поддерживаемого платформой.

acquire(blocking=True, timeout=-1)

Получить блокировку, блокирующую или неблокирующую.

При вызове без аргументов: если данный поток уже владеет блокировкой, увеличивает уровень рекурсии на единицу и немедленно возвращается. В противном случае, если другой поток владеет блокировкой, блокируется, пока блокировка не будет разблокирована. Как только блокировка разблокирована (не принадлежит ни одному потоку), захватывает владение, устанавливает уровень рекурсии на единицу и возвращается. Если более одного потока заблокированы в ожидании разблокировки блокировки, только один из них сможет получить право владения блокировкой. В этом случае нет возвращаемого значения.

При вызове с аргументом blocking, установленным в значение истина, выполнить те же действия, что и при вызове без аргументов, и возвращает True.

При вызове с аргументом blocking, установленным в значение ложь, не блокировать. Если вызов без аргумента будет заблокирован, немедленно возвращает False; в противном случае сделает то же самое, что и при вызове без аргументов, и возвращает True.

При вызове с аргументом timeout с плавающей запятой, установленным в положительное значение, блокируется максимум на количество секунд, указанное timeout, и пока блокировка не может быть получена. Возвращает True, если блокировка была получена, и ложь, если истекло время ожидания.

Изменено в версии 3.2: Новый параметр timeout.

release()

Снять блокировку, уменьшив уровень рекурсии. Если после декремента он равен нулю, сбрасывает блокировку на разблокировано (не принадлежащую ни одному потоку), и если какие-либо другие потоки заблокированы в ожидании разблокировки блокировки, разрешает выполнение ровно одного из них. Если после декремента уровень рекурсии все ещё отличен от нуля, блокировка остаётся заблокированной и принадлежит вызывающему потоку.

Вызывайте данный метод только тогда, когда вызывающий поток владеет блокировкой. RuntimeError возникает, если данный метод вызывается при разблокировке блокировки.

Нет возвращаемого значения.

Состояние объектов

Переменную состояния всегда связана с какой-то блокировкой; её можно передать, или она будет создана по умолчанию. Передача одного полезна, когда несколько переменных состояния должны использовать одну и ту же блокировку. Блокировка является частью объекта состояния: вам не нужно отслеживать её отдельно.

Переменная состояния подчиняется протоколу управления контекстом: с помощью оператора with устанавливается связанная блокировка на время вложенного блока. Методы acquire() и release() также вызывают соответствующие методы связанной блокировки.

Другие методы должны вызываться с удерживаемой связанной блокировкой. Метод wait() снимает блокировку, а затем блокируется, пока другой поток не разбудит её, вызвав notify() или notify_all(). После пробуждения wait() повторно получает блокировку и возвращается. Также можно указать тайм-аут.

Метод notify() пробуждает один из потоков, ожидающих переменной условия, если таковые ожидают. Метод notify_all() пробуждает все потоки, ожидающие переменной условия.

Примечание: методы notify() и notify_all() не снимают блокировку; это означает, что пробуждённый поток или потоки не вернутся из своего вызова wait() немедленно, а только тогда, когда поток, который вызвал notify() или notify_all(), окончательно откажется от владения блокировкой.

Типичный стиль программирования с использованием условных переменных использует блокировку для синхронизации доступа к некоторому общему состоянию; заинтересованные в изменении состояния потоки, повторно вызывают wait(), пока не увидят желаемое состояние, в то время как изменяющие состояние потоки, вызывают notify() или notify_all(), когда они изменяют состояние таким образом, что оно может быть желаемым состоянием для одного из обслуживающих. Например, следующий код представляет собой общую ситуацию производитель-потребитель с неограниченной ёмкостью буфера:

# Потребить один элемент
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Произвести один элемент
with cv:
    make_an_item_available()
    cv.notify()

Проверка цикла while для условия приложения необходима, потому что wait() может вернуться через произвольно долгое время, а условие, которое вызвало вызов notify(), может больше не выполняться. Это присуще многопоточному программированию. Метод wait_for() может использоваться для автоматизации проверки условий и упрощает вычисление тайм-аутов:

# Потребить один элемент
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

Чтобы выбрать между notify() и notify_all(), подумайте, может ли одно изменение состояния быть интересным только для одного или нескольких ожидающих потоков. Например, в типичной ситуации производитель-потребитель добавление одного элемента в буфер требует только пробуждения одного потока-потребителя.

class threading.Condition(lock=None)

Данный класс реализует объекты переменных состояний. Переменная состояния позволяет одному или нескольким потокам ждать, пока они не будут уведомлены другим потоком.

Если указан аргумент lock, а не None, это должен быть объект Lock или RLock, и он используется в качестве базовой блокировки. В противном случае создаётся новый объект RLock, который используется в качестве базовой блокировки.

Изменено в версии 3.3: Изменена с фабричной функции на класс.

acquire(*args)

Получить базовую блокировку. Данный метод вызывает соответствующий метод базовой блокировки; возвращаемое значение — это то, что возвращает данный метод.

release()

Освободить базовую блокировку. Данный метод вызывает соответствующий метод базовой блокировки; нет возвращаемого значения.

wait(timeout=None)

Подождать, пока не появится уведомление или, пока не истечёт время ожидания. Если вызывающий поток не получил блокировку при вызове этого метода, возникает RuntimeError.

Данный метод освобождает базовую блокировку, а затем блокируется до тех пор, пока он не будет пробуждён вызовом notify() или notify_all() для той же переменной условия в другом потоке или до тех пор, пока не произойдет необязательный тайм-аут. После пробуждения или истечения времени ожидания он повторно устанавливает блокировку и возвращается.

Если присутствует аргумент timeout, а не None, это должно быть число с плавающей запятой, указывающее тайм-аут для операции в секундах (или его долях).

Когда базовая блокировка — это RLock, она не снимается с помощью своего метода release(), т. к. это может фактически не разблокировать блокировку, когда она была получена несколько раз рекурсивно. Вместо этого используется внутренний интерфейс класса RLock, который действительно разблокирует его, даже если он был рекурсивно получен несколько раз. Затем используется другой внутренний интерфейс для восстановления уровня рекурсии при повторном захвате блокировки.

Возвращаемое значение — True, если срок действия данного timeout не истёк, в этом случае это False.

Изменено в версии 3.2: Раньше метод всегда возвращал None.

wait_for(predicate, timeout=None)

Подождать, пока условие не станет истинным. predicate должен быть вызываемым, результат которого будет интерпретироваться как логическое значение. Может быть предоставлен timeout, дающий максимальное время ожидания.

Данный служебный метод может вызывать wait() несколько раз, пока не будет выполнен предикат или пока не истечёт время ожидания. Возвращаемое значение является последним возвращаемым значением предиката и будет вычисляться как False, если время ожидания метода истекло.

Игнорирование функции тайм-аута вызов этого метода примерно эквивалентен записи:

while not predicate():
    cv.wait()

Следовательно, применяются те же правила, что и для wait(): блокировка должна удерживаться при вызове и повторно захвачена при возврате. Предикат вычисляется с удерживаемой блокировкой.

Добавлено в версии 3.2.

notify(n=1)

По умолчанию пробуждает один поток, ожидающий этого условия, если таковой имеется. Если вызывающий поток не получил блокировку при вызове этого метода, возникает RuntimeError.

Данный метод пробуждает не более n потоков, ожидающих переменной условия; это не работает, если нет ожидающих потоков.

Текущая реализация пробуждает ровно n поток, если хотя бы n поток ожидает. Однако полагаться на такое поведение небезопасно. Будущая оптимизированная реализация может иногда активировать более n потоков.

Примечание: пробужденный поток фактически не возвращается из своего вызова wait(), пока он не сможет повторно получить блокировку. Поскольку notify() не снимает блокировку, его вызывающий должен сделать это.

notify_all()

Разбудить все потоки, ожидающие этого условия. Данный метод действует как notify(), но пробуждает все ожидающие потоки вместо одного. Если вызывающий поток не получил блокировку при вызове этого метода, возникает RuntimeError.

Семафорные объекты

Это один из старейших примитивов синхронизации в истории информатики, изобретенный ранним голландским учёным-компьютерщиком Эдсгером В. Дейкстра (он использовал имена P() и V() вместо acquire() и release()).

Семафор управляет внутренним счетчиком, который уменьшается при каждом вызове acquire() и увеличивается при каждом вызове release(). Счётчик никогда не может опуститься ниже нуля; когда acquire() обнаруживает, что он равен нулю, он блокируется, ожидая, пока какой-либо другой поток не вызовет release().

Семафоры также поддерживают протокол управления контекстом.

class threading.Semaphore(value=1)

Данный класс реализует объекты семафоров. Семафор управляет атомарным счётчиком, представляющим количество вызовов release() минус количество вызовов acquire() плюс начальное значение. Метод acquire() при необходимости блокирует выполнение до тех пор, пока не сможет вернуться без отрицательного значения счетчика. Если не указан, value по умолчанию равен 1.

Необязательный аргумент предоставляет начальное значение value для внутреннего счетчика; по умолчанию это 1. Если заданное значение value меньше 0, вызывается ValueError.

Изменено в версии 3.3: изменена с фабричной функции на класс.

acquire(blocking=True, timeout=None)

Захватить семафор.

При вызове без аргументов:

  • Если внутренний счётчик больше нуля при входе, уменьшает его на единицу и немедленно возвращает True.
  • Если внутренний счётчик при входе равен нулю, блокировка до пробуждения вызовом release(). После пробуждения (и счётчик больше 1) уменьшает счётчик на 1 и возвращает True. При каждом вызове release() будет пробуждаться ровно один поток. Не следует полагаться на порядок, в котором пробуждаются потоки.

При вызове с blocking, установленным в ложь, не блокировать. Если вызов без аргумента будет заблокирован, немедленно возвращает False; в противном случае сделайте то же самое, что и при вызове без аргументов, и возвращает True.

При вызове с timeout, отличным от None, он будет блокироваться максимум на timeout секунд. Если получение не завершилось успешно в течение этого интервала, возвращает False. В противном случае возвращает True.

Изменено в версии 3.2: Новый параметр timeout.

release()

Освободить семафор, увеличив внутренний счётчик на единицу. Когда он был равен нулю при входе, а другой поток ожидает, когда он снова станет больше нуля, данный поток разбудят.

class threading.BoundedSemaphore(value=1)

Класс, реализующий ограниченные объекты семафоров. Ограниченный семафор проверяет, не превышает ли его текущее значение его начальное значение. Если это так, вызывается ValueError. В большинстве случаев семафоры используются для защиты ресурсов с ограниченной ёмкостью. Если семафор выпускается слишком много раз, это признак ошибки. Если не указан, value по умолчанию равен 1.

Изменено в версии 3.3: Изменена с фабричной функции на класс.

Пример Semaphore

Семафоры часто используются для защиты ресурсов с ограниченной ёмкостью, например сервера базы данных. В любой ситуации, когда размер ресурса фиксирован, вы должны использовать ограниченный семафор. Перед созданием любых рабочих потоков ваш основной поток инициализирует семафор:

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

После создания рабочие потоки вызывают методы получения и освобождения семафора, когда им нужно подключиться к серверу:

with pool_sema:
    conn = connectdb()
    try:
        # ... использовать соединение ...
    finally:
        conn.close()

Использование ограниченного семафора снижает вероятность того, что ошибка программирования, из-за которой семафор освобождается больше, чем он захватывается, останется незамеченной.

Объекты событий

Это один из простейших механизмов связи между потоками: один поток сигнализирует о событии, а другие потоки его ждут.

Объект события управляет внутренним флагом, которому можно присвоить значение истина с помощью метода set() и сбросить значение ложь с помощью метода clear(). Метод wait() блокируется до тех пор, пока флаг не станет истинным.

class threading.Event

Класс, реализующий объекты событий. Событие управляет флагом, которому можно присвоить значение истина с помощью метода set() и сбросить значение ложь с помощью метода clear(). Метод wait() блокируется, пока флаг не станет истинным. Флаг изначально ложный.

Изменено в версии 3.3: Изменена с фабричной функции на класс.

is_set()

Возвращает True тогда и только тогда, когда внутренний флаг истинен.

set()

Устанавливает для внутреннего флага значение истина. Пробуждаются все потоки, ожидающие его выполнения. Потоки, которые вызывают wait() после установки флага, не будут блокироваться вообще.

clear()

Сбрасывает внутренний флаг на ложь. Впоследствии потоки, вызывающие wait(), будут блокироваться до тех пор, пока не будет вызван set(), чтобы снова установить для внутреннего флага значение истина.

wait(timeout=None)

Блокировать, пока внутренний флаг не станет истинным. Если внутренний флаг установлен в истинное состояние при входе, немедленно вернуться. В противном случае блокируется, пока другой поток не вызовет set(), чтобы установить флаг в значение истина, или пока не наступит необязательный тайм-аут.

Если присутствует аргумент тайм-аута, а не None, это должно быть число с плавающей запятой, определяющее тайм-аут для операции в секундах (или его долях).

Данный метод возвращает True тогда и только тогда, когда для внутреннего флага установлено значение истина, либо до вызова ожидания, либо после начала ожидания, поэтому он всегда будет возвращать True, за исключением случаев, когда задан тайм-аут и время ожидания операции истекло.

Изменено в версии 3.1: Раньше метод всегда возвращал None.

Объекты таймера

Данный класс представляет действие, которое следует запускать только после того, как — пройдет определенное время по таймеру. Timer является подклассом Thread и, как таковой, также служит примером создания пользовательских потоков.

Таймеры запускаются, как и потоки, путём вызова их метода start(). Таймер можно остановить (до начала его действия), вызвав метод cancel(). Интервал, который таймер будет ожидать перед выполнением своего действия, может не совпадать с интервалом, указанным пользователем.

Например:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # через 30 секунд будет напечатано "hello, world"
class threading.Timer(interval, function, args=None, kwargs=None)

Создать таймер, который будет запускать function с аргументами args и ключевыми аргументами kwargs по истечении interval секунд. Если args равен None (по умолчанию), то будет использоваться пустой список. Если kwargs равен None (по умолчанию), то будет использоваться пустой словарь.

Изменено в версии 3.3: изменена с фабричной функции на класс.

cancel()

Остановить таймер и отменить выполнение действия таймера. Это будет работать, только если таймер все ещё находится в стадии ожидания.

Барьерные объекты

Добавлено в версии 3.2.

Данный класс предоставляет простой примитив синхронизации для использования фиксированным числом потоков, которым необходимо ждать друг друга. Каждый из потоков пытается преодолеть барьер, вызывая метод wait(), и будет блокироваться до тех пор, пока все потоки не выполнят свои вызовы wait(). В данный момент потоки освобождаются одновременно.

Барьер можно использовать повторно любое количество раз для одного и того же количества потоков.

В качестве примера приведём простой способ синхронизации клиентского и серверного потоков:

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
class threading.Barrier(parties, action=None, timeout=None)

Создать объект барьера для parties числа потоков. action, если он предоставлен, может быть вызван одним из потоков при их освобождении. timeout — значение тайм-аута по умолчанию, если для метода wait() ничего не указано.

wait(timeout=None)

Пропустить барьер. Когда все потоки, участвующие в барьере, вызвали эту функцию, они все освобождаются одновременно. Если предоставляется timeout, он используется вместо того, что было предоставлено конструктору класса.

Возвращаемое значение — целое число в диапазоне от 0 до parties — 1, различное для каждого потока. Это можно использовать для выбора потока для выполнения некоторых специальных операций, например:

i = barrier.wait()
if i == 0:
    # Только один поток должен напечатать это
    print("passed the barrier")

Если конструктору был предоставлен action, один из потоков вызовет его до того, как он будет освобождён. Если данный вызов вызывает ошибку, барьер переводится в неработающее состояние.

Если время вызова истекает, барьер переводится в сломанное состояние.

Данный метод может вызвать исключение BrokenBarrierError, если барьер сломан или сброшен во время ожидания потока.

reset()

Возвращает барьер в пустое состояние по умолчанию. Любые ожидающие его потоки получат исключение BrokenBarrierError.

Обратите внимание, что для использования этой функции может потребоваться некоторая внешняя синхронизация, если есть другие потоки, состояние которых неизвестно. Если барьер сломан, может быть лучше просто оставить его и создать новый.

abort()

Привести барьер в сломанное состояние. Это приводит к сбою любых активных или будущих вызовов wait() с BrokenBarrierError. Используйте это, например, если один из потоков необходимо прервать, чтобы избежать взаимоблокировки приложения.

Может быть предпочтительнее просто создать барьер с разумным значением timeout для автоматической защиты от сбоя одного из потоков.

parties

Количество потоков, необходимое для прохождения барьера.

n_waiting

Количество потоков, ожидающих в данный момент в барьере.

broken

Логическое значение True, если барьер находится в сломанном состоянии.

exception threading.BrokenBarrierError

Это исключение, подкласс RuntimeError, возникает, когда объект Barrier сбрасывается или ломается.

Использование блокировок, условий и семафоров в операторе with

Все объекты, предоставляемые данным модулем, у которых есть методы acquire() и release(), могут использоваться в качестве менеджеров контекста для оператора with. Метод acquire() будет вызываться при входе в блок, а release() будет вызываться при выходе из блока. Отсюда следующий фрагмент:

with some_lock:
    # делать что-нибудь...

эквивалентно:

some_lock.acquire()
try:
    # делать что-нибудь...
finally:
    some_lock.release()

В настоящее время объекты Lock, RLock, Condition, Semaphore и BoundedSemaphore могут использоваться в качестве менеджеров контекста операторов with.