multiprocessing
— Процессный параллелизм¶
Введение¶
multiprocessing
— это пакет, поддерживающий порождение процессов с
использованием API, аналогичный модулю threading
. Пакет
multiprocessing
предлагает как локальный, так и удаленный параллелизм,
эффективно обходя
Глобальную блокировку интерпретатора за
счёт использования подпроцессов вместо потоков. Благодаря этому, модуль
multiprocessing
позволяет программисту полностью использовать несколько
процессоров на компьютере. Он работает как в Unix, так и в Windows.
В модуле multiprocessing
также представлены API, не имеющие аналогов в
модуле threading
. Ярким примером этого является объект
Pool
, который предлагает удобные средства
распараллеливания выполнения функции по нескольким входным значениям,
распределяя входные данные по процессам (параллелизм данных). В следующем
примере демонстрируется обычная практика определения таких функций в модуле,
чтобы дочерние процессы могли успешно импортировать этот модуль. Это базовый
пример параллелизма данных с использованием Pool
:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
будет напечатано в стандартный вывод:
[1, 4, 9]
Класс Process
¶
В multiprocessing
процессы порождаются путём создания объекта
Process
и последующего вызова его метода start()
.
Process
следует API threading.Thread
. Тривиальный пример
многопроцессорной программы:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Вот расширенный пример, чтобы показать идентификаторы отдельных процессов:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Для объяснения того, почему необходима часть if __name__ == '__main__'
, см.
Руководство по программированию.
Контексты и методы запуска¶
В зависимости от платформы multiprocessing
поддерживает три способа
запуска процесса. Далее перечислены методы запуска.
- spawn
Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для выполнения метода
run()
объекта процесса. В частности, ненужные файловые дескрипторы и дескрипторы родительского процесса не будут унаследованы. Запуск процесса с использованием этого метода довольно медленный по сравнению с использованием fork или forkserver.Доступен в Unix и Windows. По умолчанию в Windows и macOS.
- fork
Родительский процесс использует
os.fork()
для разветвления интерпретатора Python. Когда начинается дочерний процесс, фактически идентичен родительскому процессу. Все ресурсы родителя наследуются дочерним процессом. Обратите внимание, что безопасное разветвление многопоточного процесса проблематично.Доступен только в Unix. По умолчанию в Unix.
- forkserver
С этого момента всякий раз, когда требуется новый процесс, родительский процесс подключается к серверу и запрашивает у него форк для нового процесса. Процесс сервера форка является однопоточным, поэтому для него безопасно использовать
os.fork()
. Никакие ненужные ресурсы не наследуются.Доступен на платформах Unix, которые поддерживают передачу файловых дескрипторов по каналам (pipes) Unix.
Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбою подпроцесса. См. bpo-33725.
Изменено в версии 3.4: spawn добавлен на всех unix платформах, а forkserver добавлен для некоторых платформ unix. Дочерние процессы больше не наследуют все родительские дескрипторы в Windows.
В Unix с использованием методов запуска spawn или forkserver также
запускается процесс трекер ресурсов, который отслеживает несвязанные
именованные системные ресурсы (такие как именованные семафоры или объекты
SharedMemory
), созданные процессами
программы. Когда все процессы завершатся, трекер ресурсов отсоединит все
оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был
остановлен сигналом, могут быть «утечки» ресурсов. (Ни протекающие семафоры,
ни сегменты разделяемой памяти не будут автоматически разъединены до следующей
перезагрузки. Это проблематично для обоих объектов, поскольку система допускает
только ограниченное количество именованных семафоров, а сегменты разделяемой
памяти занимают некоторое пространство в основной памяти.)
Чтобы выбрать метод запуска, используйте set_start_method()
в предложении
if __name__ == '__main__'
основного модуля. Например:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
не следует использовать в программе более одного раза.
В качестве альтернативы вы можете использовать get_context()
для
получения объекта контекста. Объекты контекста имеют тот же API, что и модуль
multiprocessing, и позволяют использовать несколько методов запуска в одной
программе.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с использованием контекста fork, не могут быть переданы процессам, запущенным с использованием методов запуска spawn или forkserver.
Библиотека, которая хочет использовать определенный метод запуска, вероятно,
должна использовать get_context()
, чтобы не мешать выбору пользователя
библиотеки.
Предупреждение
Методы запуска 'spawn'
и 'forkserver'
в настоящее время не могут
использоваться с «замороженными» исполняемыми файлами (т. е. двоичными файлами,
созданными такими пакетами, как PyInstaller и cx_Freeze) в Unix.
Метод запуска 'fork'
действительно работает.
Обмен объектами между процессами¶
multiprocessing
поддерживает два типа конвейеров связи между процессами:
Очереди
Класс
Queue
является почти клономqueue.Queue
. Например:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # распечатает "[42, None, 'hello']" p.join()Очереди безопасны для потоков и процессов.
Конвейеры (pipes)
Функция
Pipe()
возвращает пару объектов соединения, соединенных конвейером (pipe), который по умолчанию является дуплексным (двусторонним). Например:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # распечатает "[42, None, 'hello']" p.join()Два объекта подключения, возвращаемые
Pipe()
, представляют собой два конца конвейера. Каждый объект подключения содержит методыsend()
иrecv()
(среди прочих). Обратите внимание, что данные в конвейере могут быть повреждены, если два процесса (или потока) пытаются читать или писать в один и тот же конец конвейера одновременно. Конечно, нет риска повреждения из-за процессов, использующих разные концы конвейера одновременно.
Синхронизация процессов¶
multiprocessing
содержит эквиваленты всех примитивов синхронизации из
threading
. Например, можно использовать блокировку, чтобы гарантировать,
что только один процесс печатает в стандартный вывод за раз:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Без использования блокировки вывода из разных процессов может все запутаться.
Обмен состояниями между процессами¶
Как упоминалось выше, при параллельном программировании обычно лучше избегать использования общего состояния, насколько это возможно. Это особенно верно при использовании нескольких процессов.
Однако, если вам действительно нужно использовать некоторые общие данные,
multiprocessing
предоставляет несколько способов сделать это.
Общая память
Данные могут храниться в отображении общей памяти с использованием
Value
илиArray
. Например, следующий кодfrom multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])Напечатает:
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]Аргументы
'd'
и'i'
, используемые при созданииnum
иarr
, являются кодами типа того типа, который используется модулемarray
:'d'
указывает на число с плавающей запятой двойной точности, а'i'
указывает на целое число со знаком. Эти общие объекты будут технологическими и поточно-ориентированными.Для большей гибкости в использовании разделяемой памяти можно использовать модуль
multiprocessing.sharedctypes
, который поддерживает создание произвольных объектов ctypes, выделенных из разделяемой памяти.
Серверный процесс
Объект-менеджер, возвращаемый
Manager()
, управляет серверным процессом, который содержит объекты Python и позволяет другим процессам управлять ими с помощью прокси.Менеджер возвращенный
Manager()
поддержит типыlist
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
иArray
. Например:from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)Напечатает:
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Менеджеры серверных процессов более гибкие, чем использование объектов общей памяти, потому что они могут быть созданы для поддержки произвольных типов объектов. Кроме того, один менеджер может использоваться совместно процессами на разных компьютерах в сети. Однако они медленнее, чем при использовании общей памяти.
Использование пула рабочих¶
Класс Pool
представляет собой пул рабочих
процессов. У него есть методы, которые позволяют передавать задачи рабочим
процессам несколькими способами.
Например:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# запуск 4 рабочих процессов
with Pool(processes=4) as pool:
# печать "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# печатать одинаковые числа в произвольном порядке
for i in pool.imap_unordered(f, range(10)):
print(i)
# вычислить "f(20)" асинхронно
res = pool.apply_async(f, (20,)) # запускается в *только* одном процессе
print(res.get(timeout=1)) # печатает "400"
# вычислить "os.getpid()" асинхронно
res = pool.apply_async(os.getpid, ()) # запускается в *только* одном процессе
print(res.get(timeout=1)) # печатает PID этого процесса
# запуск нескольких оценок асинхронно *может* использовать больше процессов
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# уснуть одному рабочему на 10 секунд
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# выход из блока with с остановкой пула
print("Now the pool is closed and no longer available")
Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.
Примечание
Функциональность в этом пакете требует, чтобы модуль __main__
был
импортирован дочерними элементами. Это описано в
Руководство по программированию, однако здесь стоит указать, что
некоторые примеры, такие как
multiprocessing.pool.Pool
, не будут работать в интерактивном
интерпретаторе. Например:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(Если выполнить это, то на самом деле будет выведено три полных трейсбэка, перемежающихся полуслучайным образом, а затем, возможно, придется как-то остановить родительский процесс.)
Справочник¶
Пакет multiprocessing
в основном реплицирует API модуля threading
.
Process
и исключения¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ Объекты процессов представляют собой действия, которые выполняются в отдельном процессе. Класс
Process
содержит эквиваленты всех методовthreading.Thread
.Конструктор всегда должен вызываться с ключевыми аргументами. group всегда должен быть
None
; он существует исключительно для совместимости сthreading.Thread
. target — это вызываемый объект, вызываемый методомrun()
. По умолчанию используетсяNone
, что означает, что ничего не вызывается. name — это имя процесса (подробнее см.name
). args — это кортеж аргументов для целевого вызова. kwargs — это словарь ключевых аргументов для целевого вызова. Если указан аргумент daemon, содержащий только ключевой аргумент, для флага процессаdaemon
устанавливается значениеTrue
илиFalse
. ЕслиNone
(по умолчанию), этот флаг будет унаследован от процесса создания.По умолчанию в target не передаются аргументы.
Если подкласс переопределяет конструктор, он должен убедиться, что он вызывает конструктор базового класса (
Process.__init__()
), прежде чем делать что-либо ещё с процессом.Изменено в версии 3.3: Добавлен аргумент daemon.
-
run
()¶ Метод, представляющий активность процесса.
Вы можете переопределить этот метод в подклассе. Стандартный метод
run()
вызывает вызываемый объект, переданный конструктору объекта в качестве целевого аргумента, если таковой имеется, с последовательными аргументами и ключевыми аргументами, взятыми из аргументов args и kwargs соответственно.
-
start
()¶ Начать работу процесса.
Он должен вызываться не более одного раза для каждого объекта процесса. Он организует вызов метода объекта
run()
в отдельном процессе.
-
join
([timeout])¶ Если необязательный аргумент timeout —
None
(по умолчанию), метод блокируется до тех пор, пока не завершится процесс, чей методjoin()
вызван. Если timeout — положительное число, блокируется не более timeout секунд. Обратите внимание, что метод возвращаетNone
, если его процесс завершается или время ожидания метода истекло. Проверьтеexitcode
процесса, чтобы определить, завершился ли он.К процессу можно присоединяться много раз.
Процесс не может присоединиться к самому себе, потому что это приведет к тупиковой ситуации. Попытка присоединиться к процессу до его запуска является ошибкой.
-
name
¶ Имя процесса. Имя — это строка, используемая только для идентификации. У него нет семантики. Одно и то же имя может быть присвоено нескольким процессам.
Начальное имя задается конструктором. Если конструктору не указано явное имя, создается имя формы «Process-N1:N2:…:Nk», где каждый Nk является N-м дочерним элементом своего родителя.
-
is_alive
()¶ Возвращает, жив ли процесс.
Грубо говоря, объект процесса жив с момента возврата метода
start()
до завершения дочернего процесса.
-
daemon
¶ Флаг демона процесса, логическое значение. Он должен устанавливаться до вызова
start()
.Начальное значение наследуется от создаваемого процесса.
Когда процесс завершается, он пытается завершить все свои демонические дочерние процессы.
Обратите внимание, что демонический процесс не может создавать дочерние процессы. В противном случае демонический процесс оставит своих дочерних процессов сиротами, если он будет завершен при выходе из родительского процесса. Кроме того, это демоны или службы не Unix, это обычные процессы, которые будут завершены (а не присоединены), если завершились недемонические процессы.
Помимо API
threading.Thread
, объектыProcess
также поддерживают следующие атрибуты и методы:-
pid
¶ Возвращает идентификатор процесса. Перед тем, как процесс будет запущен, это будет
None
.
-
exitcode
¶ Будет
None
, если процесс ещё не завершён. Отрицательное значение -N указывает, что дочерний элемент был прерван сигналом N.
-
authkey
¶ Ключ аутентификации процесса (байтовая строка).
При инициализации
multiprocessing
главному процессу назначается случайная строка с использованиемos.urandom()
.При создании объекта
Process
он унаследует ключ аутентификации своего родительского процесса, хотя это можно изменить, установивauthkey
на другую байтовую строку.См. Ключи аутентификации.
-
sentinel
¶ Числовой дескриптор системного объекта, который станет «готовым» по окончании процесса.
Вы можете использовать это значение, если хотите дождаться нескольких событий одновременно, используя
multiprocessing.connection.wait()
. В противном случае проще вызватьjoin()
.В Windows это дескриптор ОС, который можно использовать с семейством вызовов API
WaitForSingleObject
иWaitForMultipleObjects
. В Unix это файловый дескриптор, который можно использовать с примитивами из модуляselect
.Добавлено в версии 3.3.
-
terminate
()¶ Прекратить процесс. В Unix это делается с помощью сигнала
SIGTERM
; в Windows используетсяTerminateProcess()
. Обратите внимание, что обработчики выхода, предложения finally и т. д. не будут выполняться.Обратите внимание, что дочерние процессы процесса не будут завершены — они просто станут осиротевшими.
Предупреждение
Если используется данный метод, когда связанный процесс использует конвейер или очередь, тогда конвейер или очередь могут быть повреждены и могут стать непригодными для использования другим процессом. Точно так же, если процесс получил блокировку или семафор и т. д., его завершение может привести к взаимоблокировке других процессов.
-
kill
()¶ Такой же, как
terminate()
, но с использованием сигналаSIGKILL
в Unix.Добавлено в версии 3.7.
-
close
()¶ Закрыть объект
Process
, освободив все связанные с ним ресурсы. ВызываетсяValueError
, если основной процесс всё ещё выполняется. После успешного возвратаclose()
большинство других методов и атрибутов объектаProcess
вызовутValueError
.Добавлено в версии 3.7.
Обратите внимание, что методы
start()
,join()
,is_alive()
,terminate()
иexitcode
должны вызываться только процессом, создавшим объект процесса.Пример использования некоторых методов
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ Базовый класс всех исключений
multiprocessing
.
-
exception
multiprocessing.
BufferTooShort
¶ Исключение, вызываемое
Connection.recv_bytes_into()
, когда предоставленный буферный объект слишком мал для прочитанного сообщения.Если
e
является экземпляромBufferTooShort
, тоe.args[0]
выдаст сообщение в виде байтовой строки.
-
exception
multiprocessing.
AuthenticationError
¶ Возникает при ошибке аутентификации.
-
exception
multiprocessing.
TimeoutError
¶ Возникает методами с таймаутом по истечении тайм-аута.
Пайпы и очереди¶
При использовании нескольких процессов обычно используется передача сообщений для связи между процессами и избегается необходимость использования каких-либо примитивов синхронизации, таких как блокировки.
Для передачи сообщений можно использовать Pipe()
(для соединения между
двумя процессами) или очередь (что позволяет нескольких производителей и
потребителей).
Типы Queue
, SimpleQueue
и JoinableQueue
представляют
собой очереди FIFO с несколькими производителями
и несколькими потребителями, смоделированные на основе класса
queue.Queue
в стандартной библиотеке. Они отличаются тем, что в
Queue
отсутствуют методы task_done()
и
join()
, представленные в классе queue.Queue
Python
2.5.
Если вы используете JoinableQueue
, вы должны вызывать
JoinableQueue.task_done()
для каждой задачи, удаленной из очереди, иначе
семафор, используемый для подсчета количества незавершённых задач, может в
конечном итоге переполниться, вызывая исключение.
Обратите внимание, что можно также создать общую очередь с помощью объекта- менеджера — см. Менеджеры.
Примечание
multiprocessing
использует обычные исключения queue.Empty
и
queue.Full
для сигнала тайм-аута. Они недоступны в пространстве
имён multiprocessing
, поэтому вам необходимо импортировать их из
queue
.
Примечание
Когда объект помещается в очередь, объект обрабатывается, а фоновый поток позже сбрасывает обработанные данные в нижележащий конвейер. Это имеет некоторые последствия, которые немного удивительны, но не должны вызывать каких-либо практических трудностей, — если они действительно беспокоят вас, вы можете вместо этого использовать очередь, созданную с помощью менеджера.
- После помещения объекта в пустую очередь может произойти бесконечная
задержка перед тем, как метод
empty()
очереди возвращаетFalse
иget_nowait()
может возвращает без вызоваqueue.Empty
. - Если несколько процессов ставят в очередь объекты, возможно, чтобы объекты были приняты на другом конце вне порядка. Однако объекты, поставленные в очередь одним и тем же процессом, всегда будут находиться в ожидаемом порядке относительно друг друга.
Предупреждение
Если процесс завершается с использованием Process.terminate()
или
os.kill()
, когда он пытается использовать Queue
, то данные в
очереди, вероятно, будут повреждены. Это может привести к тому, что любой
другой процесс получит исключение, когда он попытается использовать очередь
позже.
Предупреждение
Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и он
не использовал JoinableQueue.cancel_join_thread
), то этот процесс не завершится,
пока все буферизованные элементы не будут сброшены в конвейер.
Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете попасть в тупик, если не уверены, что все элементы, помещенные в очередь, были израсходованы. Точно так же, если дочерний процесс не является демоническим, то родительский процесс может зависнуть при выходе, когда он пытается присоединиться ко всем своим недемоническим дочерним процессам.
Обратите внимание, что очередь, созданная с помощью диспетчера, не содержит этой проблемы. См. Руководство по программированию.
Пример использования очередей для межпроцессной связи см. в разделе Примеры.
-
multiprocessing.
Pipe
([duplex])¶ Возвращает пару
(conn1, conn2)
изConnection
объектов, представляющих концы конвейера.Если duplex —
True
(по умолчанию), то конвейер является двунаправленным. Если duplex —False
, тогда конвейера является однонаправленными:conn1
можно использовать только для приема сообщений, аconn2
можно использовать только для отправки сообщений.
-
class
multiprocessing.
Queue
([maxsize])¶ Возвращает общую очередь процесса, реализованную с использованием конвейера и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток, который передает объекты из буфера в конвейер.
Обычные исключения
queue.Empty
иqueue.Full
из модуляqueue
стандартной библиотеки вызываются для сигналов тайм-аутов.Queue
реализует все методыqueue.Queue
, кромеtask_done()
иjoin()
.-
qsize
()¶ Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.
Обратите внимание, что может вызвать
NotImplementedError
на платформах Unix, таких как Mac OS X, гдеsem_getvalue()
не реализован.
-
empty
()¶ Возвращает
True
, если очередь пуста, в противном случае —False
. Из-за семантики многопоточности/многопроцессорности ненадежно.
-
full
()¶ Возвращает
True
, если очередь заполнена, в противном случае —False
. Из-за семантики многопоточности/многопроцессорности ненадежно.
-
put
(obj[, block[, timeout]])¶ Поместить obj в очередь. Если необязательный аргумент block —
True
(по умолчанию), а timeout —None
(по умолчанию), при необходимости заблокируйте, пока не станет доступен свободный слот. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключениеqueue.Full
, если в течение этого времени не было доступного свободного слота. В противном случае (block —False
) поместить элемент в очередь, если свободный слот доступен немедленно, иначе вызвать исключениеqueue.Full
(timeout в этом случае игнорируется).Изменено в версии 3.8: Если очередь закрыта, то вместо
AssertionError
вызываетсяValueError
.
-
put_nowait
(obj)¶ Эквивалент
put(obj, False)
.
-
get
([block[, timeout]])¶ Удаление и возвращение элемента из очереди. Если необязательные аргументы block —
True
(по умолчанию), а timeout —None
(по умолчанию), заблокировать, если необходимо, до тех пор, пока элемент не станет доступным. Если timeout является положительным числом, он блокируется не более timeout секунд и вызывает исключениеqueue.Empty
, если в течение этого времени ни один элемент не был доступен. В противном случае (блок —False
) возвращает элемент, если он доступен немедленно, иначе вызовет исключениеqueue.Empty
(в этом случае timeout игнорируется).Изменено в версии 3.8: Если очередь закрыта, то вместо
OSError
вызываетсяValueError
.
-
get_nowait
()¶ Эквивалент
get(False)
.
multiprocessing.Queue
содержит несколько дополнительных методов, которых нет вqueue.Queue
. Эти методы обычно не нужны для большей части кода :-
close
()¶ Указать, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится, как только он сбросит все буферизованные данные в конвейер. Это вызывается автоматически при сборке мусора.
-
join_thread
()¶ Присоединиться к фоновому потоку. Это можно использовать только после вызова
close()
. Он блокируется до тех пор, пока фоновый поток не завершится, гарантируя, что все данные в буфере будут сброшены в конвейер.По умолчанию, если процесс не является создателем очереди, при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать
cancel_join_thread()
, чтобыjoin_thread()
ничего не делал.
-
cancel_join_thread
()¶ Предотвращение блокировки
join_thread()
. В частности, он предотвращает автоматическое присоединение фонового потока при выходе из процесса. См.join_thread()
.Лучшее название для этого метода —
allow_exit_without_flush()
. Это может привести к потере данных в очереди, и вам почти наверняка не нужно будет их использовать. Это действительно только там, если вам нужно, чтобы текущий процесс немедленно завершился, не дожидаясь сброса данных из очереди в базовый конвейер, и вас не заботят потерянные данные.
Примечание
Функциональность данного класса требует работающей реализации общего семафора в операционной системе хоста. Без него функциональность в этом классе будет отключена, и попытки создать экземпляр
Queue
приведут кImportError
. См. bpo-3770 для получения дополнительной информации. То же самое верно для любого из специализированных типов очередей, перечисленных ниже.-
-
class
multiprocessing.
SimpleQueue
¶ Упрощенный тип
Queue
, очень близкий к заблокированномуPipe
.-
empty
()¶ Возвращает
True
, если очередь пуста,False
иначе.
-
get
()¶ Удаление и возвращение элемента из очереди.
-
put
(item)¶ Поместить item в очередь.
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
, подклассQueue
, представляет собой очередь, которая дополнительно содержит методыtask_done()
иjoin()
.-
task_done
()¶ Указать, что задача, ранее поставленная в очередь, завершена. Используется потребителями очереди. Для каждого
get()
, используемого для выборки задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).Вызывает
ValueError
, если вызывается больше раз, чем было помещено в очередь элементов.
-
join
()¶ Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда потребитель вызывает
task_done()
, чтобы указать, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля,join()
разблокируется.
-
Разное¶
-
multiprocessing.
active_children
()¶ Возвращает список всех живых потомков текущего процесса.
Вызов содержит побочный эффект «присоединения» к уже завершенным процессам.
-
multiprocessing.
cpu_count
()¶ Возвращает количество процессоров в системе.
Это число не эквивалентно количеству процессоров, которые может использовать текущий процесс. Количество используемых процессоров можно узнать с помощью
len(os.sched_getaffinity(0))
Может вызвать
NotImplementedError
.См.также
-
multiprocessing.
current_process
()¶ Возвращает объект
Process
, соответствующий текущему процессу.Аналог
threading.current_thread()
.
-
multiprocessing.
parent_process
()¶ Возвращает объект
Process
, соответствующий родительскому процессуcurrent_process()
. Для основного процессаparent_process
будетNone
.Добавлено в версии 3.8.
-
multiprocessing.
freeze_support
()¶ Добавить поддержку, когда программа, использующая
multiprocessing
, была заморожена для создания исполняемого файла Windows. (Был протестирован с py2exe, PyInstaller и cx_Freeze.)Эту функцию нужно вызывать сразу после строки
if __name__ == '__main__'
основного модуля. Например:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Если строка
freeze_support()
пропущена, то попытка запустить замороженный исполняемый файл вызоветRuntimeError
.Вызов
freeze_support()
не действует при вызове в любой операционной системе, кроме Windows. Кроме того, если модуль нормально запускается интерпретатором Python в Windows (программа не была заморожена), тоfreeze_support()
не действует.
-
multiprocessing.
get_all_start_methods
()¶ Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможные методы запуска:
'fork'
,'spawn'
и'forkserver'
. В Windows доступен только'spawn'
. В Unix всегда поддерживаются'fork'
и'spawn'
, по умолчанию —'fork'
.Добавлено в версии 3.4.
-
multiprocessing.
get_context
(method=None)¶ Возвращает объект контекста, который содержит те же атрибуты, что и модуль
multiprocessing
.Если method —
None
, то возвращается контекст по умолчанию. В противном случае method должен быть'fork'
,'spawn'
,'forkserver'
. ВозникаетValueError
, если указанный метод запуска недоступен.Добавлено в версии 3.4.
-
multiprocessing.
get_start_method
(allow_none=False)¶ Возвращает имя метода запуска, используемого для запуска процессов.
Если метод запуска не был исправлен и allow_none содержит ложное значение, то метод запуска ремонтируется по умолчанию и возвращается имя. Если метод запуска не был исправлен и allow_none истинно, возвращается
None
.Возвращаемое значение может быть
'fork'
,'spawn'
,'forkserver'
илиNone
.'fork'
используется по умолчанию в Unix, а'spawn'
- по умолчанию в Windows.Добавлено в версии 3.4.
-
multiprocessing.
set_executable
()¶ Задаёт путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется
sys.executable
). Встраивающим, вероятно, потребуется что-то вроде:set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
прежде чем они смогут создавать дочерние процессы.
Изменено в версии 3.4: Теперь поддерживается в Unix при использовании метода запуска
'spawn'
.
-
multiprocessing.
set_start_method
(method)¶ Установить метод, который следует использовать для запуска дочерних процессов. method может быть
'fork'
,'spawn'
или'forkserver'
.Обратите внимание, что он должен вызываться не более одного раза, и он должен быть защищён в разделе
if __name__ == '__main__'
основного модуля.Добавлено в версии 3.4.
Примечание
multiprocessing
не содержит аналогов threading.active_count()
,
threading.enumerate()
, threading.settrace()
,
threading.setprofile()
, threading.Timer
или
threading.local
.
Объекты подключения¶
Объекты подключения позволяют отправлять и получать пиклингуемые (picklable) объекты или строки. Их можно рассматривать как подключенные сокеты, ориентированные на сообщения.
Объекты подключения обычно создаются с использованием Pipe
— см. также Слушатели и клиенты.
-
class
multiprocessing.connection.
Connection
¶ -
send
(obj)¶ Отправляет объект на другой конец соединения, который следует прочитать с помощью команды
recv()
.Объект должен быть пиклингуемым. Очень большие пикли (приблизительно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение
ValueError
.
-
recv
()¶ Возвращает объект, отправленный с другого конца соединения, используя
send()
. Блокирует, пока не будет что получить. ВызываетEOFError
, если нечего принял, а другой конец был закрыт.
-
fileno
()¶ Возвращает файловый дескриптор или дескриптор, используемый соединением.
-
close
()¶ Закрывает соединение.
Он вызывается автоматически при сборке мусора.
-
poll
([timeout])¶ Возвращает, есть ли данные, доступные для чтения.
Если timeout не указан, он немедленно вернётся. Если timeout — это число, то это указывает максимальное время в секундах для блокировки. Если timeout —
None
, то используется бесконечный тайм-аут.Обратите внимание, что с помощью
multiprocessing.connection.wait()
можно опрашивать сразу несколько объектов подключения.
-
send_bytes
(buffer[, offset[, size]])¶ Отправляет байтовые данные из байтоподобного объекта в виде полного сообщения.
Если задано offset, данные считываются с этой позиции в buffer. Если задано size, то это количество байтов будет прочитано из буфера. Очень большие буферы (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение
ValueError
-
recv_bytes
([maxlength])¶ Возвращает полное сообщение байтовых данных, отправленных с другого конца соединения, в виде строки. Блокирует, пока не будет что получить. Вызывает
EOFError
, если нечего принимать и другой конец закрыт.Если указан maxlength и длина сообщения превышает maxlength, то возникает
OSError
и соединение становится недоступным для чтения.
-
recv_bytes_into
(buffer[, offset])¶ Считывание в buffer полное сообщение байтовых данных, отправленных с другого конца соединения, и вернуть количество байтов в сообщении. Блокирует, пока не будет что получить. Вызывает
EOFError
, если нечего принимать, а другой конец был закрыт.buffer должен быть байтоподобным объектом с возможностью записи. Если задано offset, то сообщение будет записано в буфер с этой позиции. Смещение должно быть неотрицательным целым числом меньше длины buffer (в байтах).
Если буфер слишком короткий, возникает исключение
BufferTooShort
, и полное сообщение доступно какe.args[0]
, гдеe
является экземпляром исключения.
Изменено в версии 3.3: Сами объекты соединения теперь могут передаваться между процессами с помощью
Connection.send()
иConnection.recv()
.Добавлено в версии 3.3: Объекты подключения теперь поддерживают протокол управления контекстом — см. Типы менеджера контекста.
__enter__()
возвращает объект подключения, а__exit__()
вызываетclose()
.-
Например:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Предупреждение
Метод Connection.recv()
автоматически отключает полученные данные, что
может быть угрозой безопасности, если вы не можете доверять процессу,
отправившему сообщение.
Поэтому, если объект подключения не был создан с помощью Pipe()
, следует
использовать методы recv()
и send()
только
после выполнения той или иной аутентификации. См. Ключи аутентификации.
Предупреждение
Если процесс погибает, когда он пытается прочитать или записать в конвейер, то данные в конвейере, вероятно, будут повреждены, поскольку может оказаться невозможным убедиться, где лежат границы сообщения.
Примитивы синхронизации¶
Как правило, примитивы синхронизации не так необходимы в многопроцессорной
программе, как в многопоточной программе. См. документацию для модуля
threading
.
Обратите внимание, что можно также создавать примитивы синхронизации с помощью объекта-менеджера, см. Менеджеры.
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ Барьерный объект: клон
threading.Barrier
.Добавлено в версии 3.3.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ Ограниченный семафорный объект: близкий аналог
threading.BoundedSemaphore
.Существует единственное отличие от его близкого аналога: первый аргумент его метода
acquire
называется блок, что соответствуетLock.acquire()
.Примечание
В Mac OS X он неотличим от
Semaphore
, посколькуsem_getvalue()
не реализован на этой платформе.
-
class
multiprocessing.
Condition
([lock])¶ Условная переменная: псевдоним для
threading.Condition
.Если указан lock, то это должен быть объект
Lock
илиRLock
изmultiprocessing
.Изменено в версии 3.3: Добавлен метод
wait_for()
.
-
class
multiprocessing.
Event
¶ Клон
threading.Event
.
-
class
multiprocessing.
Lock
¶ Объект нерекурсивной блокировки: близкий аналог
threading.Lock
. Как только процесс или поток получил блокировку, последующие попытки получить её от любого процесса или потока будут блокироваться до тех пор, пока она не будет снята; любой процесс или поток может освободить его. Концепции и поведениеthreading.Lock
применительно к потокам воспроизведены здесь, вmultiprocessing.Lock
, применительно к процессам или потокам, за исключением случаев, указанных выше.Обратите внимание, что
Lock
на самом деле является функцией фабрикой, которая возвращает экземплярmultiprocessing.synchronize.Lock
, инициализированный контекстом по умолчанию.Lock
поддерживает протокол контекстного менеджера и, таким образом, может использоваться в операторахwith
.-
acquire
(block=True, timeout=None)¶ Получить блокировку, блокировку или неблокирование.
Если для аргумента block задано значение
True
(по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не перейдет в разблокированное состояние, а затем установит его в заблокированное состояние и вернётTrue
. Обратите внимание, что имя этого первого аргумента отличается от имени вthreading.Lock.acquire()
.Если для аргумента block задано значение
False
, вызов метода не блокируется. Если блокировка в настоящее время находится в заблокированном состоянии, вернётсяFalse
; в противном случае установите блокировку в заблокированное состояние и вернётсяTrue
.При вызове с положительным значением с плавающей запятой для timeout блокировать максимум на количество секунд, указанное в timeout, пока блокировка не может быть получена. Вызовы с отрицательным значением timeout эквивалентны timeout, равному нулю. Вызовы со значением timeout
None
(по умолчанию) устанавливают бесконечный период ожидания. Обратите внимание, что обработка отрицательных значений или значенийNone
для timeout отличается от реализованного поведения вthreading.Lock.acquire()
. Аргумент timeout не содержит практического значения, если аргумент block установлен вFalse
и, таким образом, игнорируется. ВозвращаетTrue
, если блокировка была получена, илиFalse
, если время ожидания истекло.
-
release
()¶ Снять блокировку. Может вызваться из любого процесса или потока, а не только из процесса или потока, который изначально получил блокировку.
Поведение такое же, как и в
threading.Lock.release()
, за исключением того, что при вызове разблокированной блокировки возникаетValueError
.
-
-
class
multiprocessing.
RLock
¶ Объект рекурсивной блокировки: близкий аналог
threading.RLock
. Рекурсивная блокировка должна быть снята процессом или потоком, который её получил. Как только процесс или поток получил рекурсивную блокировку, тот же процесс или поток может получить ее снова без блокировки; этот процесс или поток должен освобождать его один раз при каждом захвате.Обратите внимание, что
RLock
на самом деле является функцией фабрикой, которая возвращает экземплярmultiprocessing.synchronize.RLock
, инициализированный контекстом по умолчанию.RLock
поддерживает протокол term:контекстного менеджера <контекстный менеджер> и, таким образом, может использоваться в операторахwith
.-
acquire
(block=True, timeout=None)¶ Получить блокировку, блокировку или неблокирование.
При вызове с аргументом block, установленным в
True
, блокировка до тех пор, пока блокировка не перейдёт в разблокированное состояние (не принадлежит ни одному процессу или потоку), если блокировка уже не принадлежит текущему процессу или потоку. Текущий процесс или поток затем становится владельцем блокировки (если он еще не владеет), а уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значениеTrue
. Обратите внимание, что есть несколько отличий в поведении этого первого аргумента от реализацииthreading.RLock.acquire()
, начиная с имени самого аргумента.При вызове с аргументом block, установленным на
False
, не блокировать. Если блокировка уже была получена (и, следовательно, принадлежит) другому процессу или потоку, текущий процесс или поток не становится владельцем, и уровень рекурсии в блокировке не изменяется, в результате чего возвращается значениеFalse
. Если блокировка находится в разблокированном состоянии, текущий процесс или поток становится владельцем, и уровень рекурсии увеличивается, в результате чего возвращается значениеTrue
.Использование и поведение аргумента timeout такие же, как и в
Lock.acquire()
. Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных поведений вthreading.RLock.acquire()
.
-
release
()¶ Снять блокировку, уменьшая уровень рекурсии. Если после декремента уровень рекурсии равен нулю, сбросить блокировку на разблокировку (не принадлежащую ни одному процессу или потоку), и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите выполнение ровно одного из них. Если после декремента уровень рекурсии всё ещё не равен нулю, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.
Вызывайте этот метод только тогда, когда вызывающий процесс или поток владеет блокировкой.
AssertionError
возникает, если этот метод вызывается процессом или потоком, отличным от владельца, или если блокировка находится в разблокированном (незарегистрированном) состоянии. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения вthreading.RLock.release()
.
-
-
class
multiprocessing.
Semaphore
([value])¶ Семафорный объект: близкий аналог
threading.Semaphore
.Существует единственное отличие от его близкого аналога: первый аргумент его метода
acquire
называется block, что соответствуетLock.acquire()
.
Примечание
В Mac OS X sem_timedwait
не поддерживается, поэтому вызов acquire()
с тайм-аутом будет имитировать поведение этой функции с использованием цикла
ожидания.
Примечание
Если сигнал SIGINT, сгенерированный Ctrl-C, поступает, когда основной
поток заблокирован вызовом BoundedSemaphore.acquire()
,
Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
,
Condition.acquire()
или Condition.wait()
, то вызов будет
немедленно прерван и будет активирован KeyboardInterrupt
.
Отличается от поведения threading
, где SIGINT будет
игнорироваться, пока выполняются эквивалентные блокирующие вызовы.
Примечание
Для некоторых функций этого пакета требуется работающая реализация общего
семафора в операционной системе хоста. Без него модуль
multiprocessing.synchronize
будет отключён, а попытки его импорта
приведут к ImportError
. См. bpo-3770 для получения
дополнительной информации.
Общие объекты ctypes
¶
Можно создавать общие объекты с использованием общей памяти, которая может быть унаследована дочерними процессами.
-
multiprocessing.
Value
(typecode_or_type, *args, lock=True)¶ Возвращает
ctypes
объект, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной обёрткой для объекта. Доступ к самому объекту можно получить через атрибут value вValue
.typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа того типа, который используется модулем
array
. *args передается конструктору для типа.Если lock —
True
(по умолчанию), то создаётся новый объект рекурсивной блокировки для синхронизации доступа к значению. Если lock является объектомLock
илиRLock
, то он будет использоваться для синхронизации доступа к значению. Если lock — этоFalse
, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».Такие операции, как
+=
, которые включают чтение и запись, не являются атомарными. Так что, например, если вы хотите атомарно увеличить общее значение, этого недостаточноcounter.value += 1
Предполагая, что соответствующая блокировка является рекурсивной (что по умолчанию), вы можете сделать это
with counter.get_lock(): counter.value += 1
Обратите внимание, что lock — это ключевой аргумент.
-
multiprocessing.
Array
(typecode_or_type, size_or_initializer, *, lock=True)¶ Возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной обёрткой для массива.
typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный код типа того типа, который используется модулем
array
. Если size_or_initializer является целым числом, то оно определяет длину массива, и изначально массив будет обнулен. В противном случае size_or_initializer — это последовательность, которая используется для инициализации массива, длина которой определяет длину массива.Если lock — это
True
(по умолчанию), то создаётся новый объект блокировки для синхронизации доступа к значению. Если lock является объектомLock
илиRLock
, то он будет использоваться для синхронизации доступа к значению. Если lock — этоFalse
, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».Обратите внимание, что lock — это только ключевой аргумент.
Обратите внимание, что массив
ctypes.c_char
содержит атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк.
Менеджеры¶
Менеджеры предоставляют способ создания данных, которые могут совместно использоваться разными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект-менеджер управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.
Возвращает запущенный объект
SyncManager
, который может использоваться для разделения объектов между процессами. Возвращенный объект-менеджер соответствует порожденному дочернему процессу и содержит методы, которые будут создавать общие объекты и возвращать соответствующие прокси.
Управляющие процессы будут завершены, как только они будут собраны сборщиком
мусора или их родительский процесс завершится. Классы менеджеров определены в
модуле multiprocessing.managers
:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ Создать объект BaseManager.
После создания необходимо вызвать
start()
илиget_server().serve_forever()
, чтобы убедиться, что объект-менеджер ссылается на запущенный процесс-менеджер.address — это адрес, по которому процесс-менеджер прослушивает новые соединения. Если address —
None
, то выбирается произвольный.authkey — это ключ аутентификации, который будет использоваться для проверки действительности входящих соединений с серверным процессом. Если authkey —
None
, то используетсяcurrent_process().authkey
. В противном случае используется authkey, и это должна быть строка байтов.-
start
([initializer[, initargs]])¶ Запустить подпроцесс для запуска менеджера. Если initializer не
None
, тогда подпроцесс вызоветinitializer(*initargs)
при запуске.
-
get_server
()¶ Возвращает объект
Server
, который представляет фактический сервер под управлением менеджера. ОбъектServer
поддерживает методserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
дополнительно содержит атрибутaddress
.
-
connect
()¶ Подключить объект локального менеджера к процессу удаленного менеджера
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
-
shutdown
()¶ Остановить процесс, используемый менеджером. Это доступно только в том случае, если
start()
использовался для запуска серверного процесса.Его можно вызывать несколько раз.
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ Метод класса, который можно использовать для регистрации типа или вызова с помощью класса менеджера.
typeid — это «идентификатор типа», который используется для идентификации определенного типа совместно используемого объекта. Это должна быть строка.
callable — это вызываемый объект, используемый для создания объектов для этого идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода
connect()
или если аргумент create_method содержит значениеFalse
, то его можно оставить какNone
.proxytype является подклассом
BaseProxy
, который используется для создания прокси для общих объектов с этим typeid. ЕслиNone
, тогда прокси-класс создается автоматически.exposed используется для указания последовательности имён методов, к которым прокси для этого typeid должны иметь доступ с помощью
BaseProxy._callmethod()
. (Если exposed —None
, то вместо него используетсяproxytype._exposed_
, если он существует.) в случае, если не указан открытый список, будут доступны все «общедоступные методы» общего объекта. (Здесь «общедоступный метод» означает любой атрибут, который содержит метод__call__()
и имя которого не начинается с'_'
.)method_to_typeid — это отображение, используемое для указания типа возвращаемого значения тех открытых методов, которые должны возвращать прокси. Он сопоставляет имена методов с типизированными строками. (Если method_to_typeid — это
None
, тогда вместо него используетсяproxytype._method_to_typeid_
, если он существует.) Если имя метода не является ключом этого сопоставления или если сопоставление —None
, то объект, возвращаемый методом, будет скопирован по значению.create_method определяет, следует ли создавать метод с именем typeid, которое можно использовать, чтобы указать серверному процессу создать новый общий объект и вернуть для него прокси. По умолчанию это
True
.
Экземпляры
BaseManager
также имеют одно свойство только для чтения:-
address
¶ Адрес, используемый менеджером.
Изменено в версии 3.3: Управляющие объекты поддерживают протокол управления контекстом, см. Типы менеджера контекста.
__enter__()
запускает серверный процесс (если он еще не запущен), а затем возвращает объект- менеджер.__exit__()
вызываетshutdown()
.В предыдущих версиях
__enter__()
не запускал серверный процесс менеджера, если он ещё не был запущен.-
-
class
multiprocessing.managers.
SyncManager
¶ Подкласс
BaseManager
, который может использоваться для синхронизации процессов. Объекты этого типа возвращаютсяmultiprocessing.Manager()
.Его методы создают и возвращают Прокси объекты для ряда часто используемых типов данных для синхронизации между процессами. Это, в частности, включает общие списки и словари.
-
Barrier
(parties[, action[, timeout]])¶ Создать общий объект
threading.Barrier
и вернуть для него прокси.Добавлено в версии 3.3.
-
BoundedSemaphore
([value])¶ Создать общий объект
threading.BoundedSemaphore
и вернуть для него прокси.
-
Condition
([lock])¶ Создать общий объект
threading.Condition
и вернуть для него прокси.Если предоставляется lock, он должен быть прокси для объекта
threading.Lock
илиthreading.RLock
.Изменено в версии 3.3: Добавлен метод
wait_for()
.
-
Event
()¶ Создать общий объект
threading.Event
и вернуть для него прокси.
-
Lock
()¶ Создать общий объект
threading.Lock
и вернуть для него прокси.
-
Queue
([maxsize])¶ Создать общий объект
queue.Queue
и вернуть для него прокси.
-
RLock
()¶ Создать общий объект
threading.RLock
и вернуть для него прокси.
-
Semaphore
([value])¶ Создать общий объект
threading.Semaphore
и вернуть для него прокси.
-
Array
(typecode, sequence)¶ Создать массив и вернуть для него прокси.
-
Value
(typecode, value)¶ Создать объект с доступным для записи атрибутом
value
и вернуть для него прокси.
Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться
SyncManager
.-
-
class
multiprocessing.managers.
Namespace
¶ Тип, который может регистрироваться с помощью
SyncManager
.Объект пространства имён не содержит общедоступных методов, но содержит атрибуты с возможностью записи. Его представление показывает значения его атрибутов.
Однако при использовании прокси для объекта пространства имён атрибут, начинающийся с
'_'
, будет атрибутом прокси, а не атрибутом референта :>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # это атрибут прокси >>> print(Global) Namespace(x=10, y='hello')
Настраиваемые менеджеры¶
Чтобы создать своего собственного менеджера, нужно создать подкласс
BaseManager
и использовать метод класса register()
для регистрации новых типов или вызываемых объектов в классе менеджера.
Например:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # распечатает 7
print(maths.mul(7, 8)) # распечатает 56
Использование удаленного менеджера¶
Можно запустить управляющий сервер на одной машине, а клиенты будут использовать его с других машин (при условии, что задействованные брандмауэры это позволяют).
Выполнение следующих команд создаёт сервер для единой общей очереди, к которой могут получить доступ удаленные клиенты:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Его также может использовать другой клиент:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Локальные процессы также могут обращаться к этой очереди, используя указанный выше код на клиенте для удаленного доступа к ней:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Прокси объекты¶
Прокси — это объект, который ссылается на общий объект, который (предположительно) живет в другом процессе. Общий объект называется прокси ссылкой. У нескольких прокси-объектов может быть одна и та же ссылка.
Прокси-объект содержит методы, которые вызывают соответствующие методы его ссылки (хотя не каждый метод ссылки обязательно будет доступен через прокси). Таким образом, прокси можно использовать так же, как и его ссылку :
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Обратите внимание, что применение str()
к прокси вернёт представление
ссылки, тогда как применение repr()
вернёт представление прокси.
Важной особенностью прокси-объектов является то, что их можно пиклить, поэтому их можно передавать между процессами. Таким образом, ссылка может содержать Прокси объекты. Это разрешает вложение этих управляемых списков, dict’ов и прочего Прокси объекты :
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # ссылка a теперь содержит ссылку b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Аналогичным образом, прокси dict и list могут быть вложены друг в друга:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Если стандартные (не прокси) объекты list
или dict
содержатся
в ссылке, изменения этих изменяемых значений не будут распространяться через
диспетчер, поскольку прокси не содержит возможности узнать, когда значения,
содержащиеся в нём, изменены. Однако сохранение значения в прокси
контейнера (которое вызывает __setitem__
на объекте-прокси)
распространяется через диспетчер, и поэтому для эффективного изменения такого
элемента можно повторно назначить измененное значение прокси
контейнера:
# создать список прокси и добавить изменяемый объект (словарь)
lproxy = manager.list()
lproxy.append({})
# теперь изменяемый словарь
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# на этом этапе изменения d еще не синхронизируются, но при обновлении словаря
# прокси уведомляется об изменении
lproxy[0] = d
Этот подход, возможно, менее удобен, чем использование вложенного Прокси объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.
Примечание
Типы прокси в multiprocessing
не поддерживают сравнения по значению.
Так, например, у нас есть:
>>> manager.list([1,2,3]) == [1,2,3]
False
Вместо этого следует использовать копию ссылки при проведении сравнений.
-
class
multiprocessing.managers.
BaseProxy
¶ Прокси-объекты являются экземплярами подклассов
BaseProxy
.-
_callmethod
(methodname[, args[, kwds]])¶ Вызов и возвращение результата метода ссылки прокси.
Если
proxy
— это прокси, ссылка которого —obj
, тогда выражение:proxy._callmethod(methodname, args, kwds)
Вычислит выражение:
getattr(obj, methodname)(*args, **kwds)
в процессе менеджера.
Возвращаемое значение будет копией результата вызова или прокси для нового общего объекта, см. документацию для аргумента method_to_typeid
BaseManager.register()
.Если при вызове возникает исключение, оно повторно вызывается
_callmethod()
. Если в процессе менеджера возникает какое-либо другое исключение, оно преобразуется в исключениеRemoteError
и вызывается_callmethod()
.Обратите внимание, что исключение будет вызвано, если methodname не был exposed.
Пример использования
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # эквивалентно l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # эквивалентно l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ Вернуть копию ссылки.
Если ссылку невозможно выбрать, вызовет исключение.
-
__repr__
()¶ Возвращает представление прокси-объекта.
-
__str__
()¶ Возвращает представление ссылки.
-
Очистка¶
Прокси-объект использует обратный вызов weakref, поэтому, когда он получает сборщик мусора, он отменяет регистрацию у менеджера, которому принадлежит его ссылка.
Общий объект удаляется из процесса менеджера, когда на него больше не ссылаются прокси.
Пулы процессов¶
Можно создать пул процессов, которые будут выполнять задачи, переданные ему с
помощью класса Pool
.
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ Он поддерживает асинхронные результаты с таймаутами и обратными вызовами и содержит реализацию параллельного отображения.
processes — количество используемых рабочих процессов. Если processes -
None
, то используется номер, возвращенныйos.cpu_count()
.Если initializer не
None
, то каждый рабочий процесс при запуске будет вызыватьinitializer(*initargs)
.maxtasksperchild — это количество задач, которые рабочий процесс может выполнить до того, как он завершится и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию maxtasksperchild -
None
, что означает, что рабочие процессы будут жить столько же, сколько и пул.context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции
multiprocessing.Pool()
или методаPool()
объекта контекста. В обоих случаях context установлен соответствующим образом.Обратите внимание, что методы объекта пула должны вызываться только процессом, создавшим пул.
Предупреждение
Объекты
multiprocessing.pool
имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любой другой ресурс), используя пул в качестве диспетчера контекста или вызываяclose()
иterminate()
вручную. Невыполнение этого требования может привести к зависанию процесса при завершении.Обратите внимание, что это неправильно, чтобы полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что будет вызван финализатор пула (см.
object.__del__()
для получения дополнительной информации).Добавлено в версии 3.2: maxtasksperchild
Добавлено в версии 3.4: context
Примечание
Рабочие процессы в
Pool
обычно живут в течение всего времени очереди работ пула. Часто используемый в других системах (например, Apache, mod_wsgi и т. д.) шаблон для освобождения ресурсов, удерживаемых воркерами, заключается в том, чтобы позволить воркеру в пуле выполнить только заданный объем работы перед выходом, очисткой и созданием нового процесса, заменив старый. Аргумент maxtasksperchild дляPool
предоставляет эту возможность конечному пользователю.-
apply
(func[, args[, kwds]])¶ Вызов func с аргументами args и ключевыми аргументами kwds. Блокирует, пока не будет готов результат. Учитывая эти блоки,
apply_async()
лучше подходит для выполнения работы параллельно. Кроме того, func выполняется только в одном из рабочих пула.
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ Вариант метода
apply()
, который возвращает объектAsyncResult
.Если указан callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачно, и в этом случае вместо него применяется error_callback.
Если указан error_callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция терпит неудачу, то вызывается error_callback с экземпляром исключения.
Обратные вызовы должны выполняться немедленно, иначе поток, обрабатывающий результаты, будет заблокирован.
-
map
(func, iterable[, chunksize])¶ Параллельный эквивалент встроенной функции
map()
(хотя он поддерживает только один аргумент iterable, для нескольких итераций см.starmap()
). Блокирует, пока не будет готов результат.Этот метод разбивает итерируемый объект на несколько фрагментов, которые он отправляет в пул процессов как отдельные задачи. (Приблизительный) размер этих фрагментов можно указать, задав для chunksize положительное целое число.
Обратите внимание, что это может привести к высокому использованию памяти для очень длинных итераций. Для большей эффективности рассмотрите возможность использования
imap()
илиimap_unordered()
с явной опцией chunksize.
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Вариант метода
map()
, который возвращает объектAsyncResult
.Если указан callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачно, и в этом случае вместо него применяется error_callback.
Если указан error_callback, то это должен быть вызываемый объект, принимающий единственный аргумент. Если целевая функция терпит неудачу, то вызывается error_callback с экземпляром исключения.
Обратные вызовы должны выполняться немедленно, иначе поток, обрабатывающий результаты, будет заблокирован.
-
imap
(func, iterable[, chunksize])¶ Более ленивая версия
map()
.Аргумент chunksize совпадает с аргументом, используемым методом
map()
. Для очень длинных итераций использование большого значения chunksize может сильно ускорить выполнение задания, чем использование значения по умолчанию1
.Также, если chunksize —
1
, то методnext()
итератора, возвращаемый методомimap()
, содержит необязательный параметр timeout:next(timeout)
подниметmultiprocessing.TimeoutError
, если результат не может быть возвращён в течение timeout секунд.
-
imap_unordered
(func, iterable[, chunksize])¶ То же, что и
imap()
, за исключением того, что порядок результатов возвращаемого итератора следует считать произвольным. (Только когда есть только один рабочий процесс, порядок гарантированно будет «правильным».)
-
starmap
(func, iterable[, chunksize])¶ Как
map()
за исключением того, что элементы iterable, как ожидается, будут итерируемы, которые распакованы как аргументы.Следовательно iterable
[(1,2), (3, 4)]
приводит к[func(1,2), func(3,4)]
.Добавлено в версии 3.3.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Комбинация
starmap()
иmap_async()
, которая выполняет итерацию по iterable итераций и вызывает func с распакованными итерациями. Возвращает объект результата.Добавлено в версии 3.3.
-
close
()¶ Предотвращает отправку каких-либо других задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся.
-
terminate
()¶ Когда объект пула будет обработан сборщиком мусора, немедленно будет вызван
terminate()
.
-
join
()¶ Дождаться завершения рабочих процессов. Перед использованием
join()
необходимо вызватьclose()
илиterminate()
.
Добавлено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом, см. Типы менеджера контекста.
__enter__()
возвращает объект пула, а__exit__()
вызываетterminate()
.-
-
class
multiprocessing.pool.
AsyncResult
¶ Класс результата, возвращаемого
Pool.apply_async()
иPool.map_async()
.-
get
([timeout])¶ Вернуть результат, когда он появится. Если timeout не
None
и результат не приходит в течение timeout секунд, то возникаетmultiprocessing.TimeoutError
. Если удаленный вызов вызвал исключение, это исключение будет повторно вызваноget()
.
-
wait
([timeout])¶ Подождите, пока не будет доступен результат или пока не пройдет timeout секунд.
-
ready
()¶ Вернуть, был ли вызов завершён.
-
successful
()¶ Возвращает, был ли вызов завершён без создания исключения. Поднимет
ValueError
, если результат не готов.Изменено в версии 3.7: Если результат не готов, то вместо
AssertionError
вызываетсяValueError
.
-
В следующем примере демонстрируется использование пула:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # запустить 4 процесса воркера
result = pool.apply_async(f, (10,)) # асинхронно вычислить "f(10)" в одном процессе
print(result.get(timeout=1)) # печатает "100", если только ваш компьютер не работает *очень* медленно
print(pool.map(f, range(10))) # печать "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # печать "0"
print(next(it)) # печать "1"
print(it.next(timeout=1)) # печать "4" если ваш компьютер не *очень* медленный
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # вызывает multiprocessing.TimeoutError
Слушатели и клиенты¶
Обычно передача сообщений между процессами осуществляется с помощью очередей
или объектов Connection
, возвращаемых Pipe()
.
Однако модуль multiprocessing.connection
допускает некоторую
дополнительную гибкость. Он в основном предоставляет API высокого уровня,
ориентированный на сообщения, для работы с сокетами или именованными конвейерами
Windows. Он также поддерживает дайджест-аутентификацию с использованием модуля
hmac
и для одновременного опроса нескольких подключений.
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ Отправьте случайно сгенерированное сообщение на другой конец соединения и дождаться ответа.
Если ответ совпадает с дайджестом сообщения с использованием authkey в качестве ключа, то на другой конец соединения отправляется приветственное сообщение. В противном случае повышается
AuthenticationError
.
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ Получите сообщение, вычислите дайджест сообщения, используя authkey в качестве ключа, а затем отправить дайджест обратно.
Если приветственное сообщение не получено, выводится
AuthenticationError
.
-
multiprocessing.connection.
Client
(address[, family[, authkey]])¶ Попытаться установить соединение со слушателем, который использует адрес address, возвращая
Connection
.Тип соединения определяется аргументом family, но, как правило, его можно пропустить, поскольку его обычно можно вывести из формата address. (См. Форматы адресов)
Если задан authkey, а не None, это должна быть байтовая строка, которая будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Аутентификация не выполняется, если authkey — нет.
AuthenticationError
возникает в случае сбоя аутентификации. См. Ключи аутентификации.
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authkey]]]])¶ Обёртка для связанного сокета или именованного конвейера Windows, который «прослушивает» соединения.
address — это адрес, который будет использоваться привязанным сокетом или именованным конвейером объекта слушателя.
Примечание
Если используется адрес 0.0.0.0, этот адрес не будет конечной точкой для подключения в Windows. Если вам нужна подключаемая конечная точка, вы должны использовать 127.0.0.1.
family — это тип используемого сокета (или именованного конвейера). Это может быть одна из строк
'AF_INET'
(для сокета TCP),'AF_UNIX'
(для сокета домена Unix) или'AF_PIPE'
(для именованного конвейера Windows). Из них гарантированно будет доступен только первый. Если family —None
, то семейство выводится из формата address. Если address также являетсяNone
, то выбирается значение по умолчанию. По умолчанию это семейство, которое считается самым быстрым из доступных. См. Форматы адресов. Обратите внимание, что если family -'AF_UNIX'
, а адрес —None
, то сокет будет создан в частном временном каталоге, созданном с использованиемtempfile.mkstemp()
.Если объект прослушивателя использует сокет, то backlog (1 по умолчанию) передается методу
listen()
сокета после его привязки.Если задан authkey, а не None, это должна быть байтовая строка, которая будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Аутентификация не выполняется, если authkey — нет.
AuthenticationError
возникает в случае сбоя аутентификации. См. Ключи аутентификации.-
accept
()¶ Принять соединение с привязанным сокетом или именованным каналом объекта слушателя и вернуть объект
Connection
. Если попытка аутентификации не удалась, возникаетAuthenticationError
.
-
close
()¶ Закрыть связанный сокет или именованный конвейер объекта-слушателя. Он вызывается автоматически, когда слушатель собирает мусор. Однако желательно вызывать его явно.
Объекты-слушатели содержат следующие свойства, доступные только для чтения:
-
address
¶ Адрес, который используется объектом Listener.
-
last_accepted
¶ Адрес, с которого пришло последнее принятое соединение. Если он недоступен, то это
None
.
Добавлено в версии 3.3: Объекты-слушатели теперь поддерживают протокол управления контекстом, см. Типы менеджера контекста.
__enter__()
возвращает объект слушателя, а__exit__()
вызываетclose()
.-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ Дождаться готовности объекта в object_list. Возвращает список готовых объектов в object_list. Если timeout — это число с плавающей запятой, то вызов блокируется максимум на столько секунд. Если timeout —
None
, то он будет заблокирован на неограниченный период. Отрицательный тайм-аут эквивалентен нулевому таймауту.И для Unix, и для Windows объект может отображаться в object_list, если это так
- читаемый объект
Connection
; - подключенный и читаемый объект
socket.socket
; или атрибутsentinel
объектаProcess
.
Соединение или объект сокета готовы, когда есть данные, доступные для чтения из него, или когда другой конец был закрыт.
Unix:
wait(object_list, timeout)
почти эквивалентенselect.select(object_list, [], [], timeout)
. Разница в том, что еслиselect.select()
прерывается сигналом, он может вызватьOSError
с номером ошибкиEINTR
, аwait()
— нет.Windows: элемент в object_list должен быть либо целочисленным дескриптором, который является ожидаемым (согласно определению, используемым в документации функции Win32
WaitForMultipleObjects()
), либо это может быть объект с методомfileno()
, который возвращает дескриптор сокета, либо ручка трубы. (Обратите внимание, что ручки конвейеров и сокетов не являются ожидаемыми дескрипторами.)Добавлено в версии 3.3.
- читаемый объект
Примеры
Следующий код сервера создает слушателя, который использует
'secret password'
в качестве ключа аутентификации. Затем он ожидает
подключения и отправляет некоторые данные клиенту:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # семейство выведено как 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Следующий код подключается к серверу и получает некоторые данные с сервера:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Следующий код использует wait()
для ожидания
сообщений от нескольких процессов одновременно:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# Мы закрываем записываемый конец конвейера, чтобы убедиться, что p является
# единственным процессом, который владеет дескриптором для него. Это гарантирует,
# что когда p закроет свой дескриптор для записываемого конца, wait()
# быстро сообщит о читаемом конце как о прочитанном.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Форматы адресов¶
- Адрес
'AF_INET'
— это кортеж вида(hostname, port)
, где hostname — строка, а port — целое число. - Адрес
'AF_UNIX'
— это строка, представляющая имя файла в файловой системе. - Адрес
'AF_PIPE'
представляет собой строку видаr'\\.\pipe{PipeName}'
. Чтобы использоватьClient()
для подключения к именованному каналу на удаленном компьютере с именем ServerName, вместо этого следует использовать адрес в формеr'\ServerName\pipe{PipeName}'
.
Обратите внимание, что любая строка, начинающаяся с двух обратных косых черт,
по умолчанию считается адресом 'AF_PIPE'
, а не адресом 'AF_UNIX'
.
Ключи аутентификации¶
При использовании Connection.recv
полученные данные
автоматически не выбираются. К сожалению, получение данных из ненадежного
источника представляет собой угрозу безопасности. Поэтому Listener
и
Client()
используют модуль hmac
для обеспечения дайджест-
аутентификации.
Ключ аутентификации — это строка байтов, которую можно рассматривать как пароль: после установления соединения оба конца потребуют доказательства того, что другой конец знает ключ аутентификации. (Демонстрация того, что оба конца используют один и тот же ключ, подразумевает ли не отправку ключа по соединению.
Если аутентификация запрошена, но ключ аутентификации не указан, используется
возвращаемое значение current_process().authkey
(см.
Process
). Это значение будет автоматически
унаследовано любым объектом Process
, созданным
текущим процессом. Это означает, что (по умолчанию) все процессы
многопроцессорной программы будут использовать один ключ аутентификации,
который можно использовать при установке соединений между собой.
Подходящие ключи аутентификации также могут быть сгенерированы с помощью
os.urandom()
.
Логирование¶
Доступна некоторая поддержка ведения журнала. Однако обратите внимание, что
пакет logging
не использует разделяемые блокировки процессов, поэтому (в
зависимости от типа обработчика) сообщения от разных процессов могут
смешиваться.
-
multiprocessing.
get_logger
()¶ Возвращает регистратор, используемый
multiprocessing
. При необходимости будет создан новый.При первом создании логгер содержит уровень
logging.NOTSET
и не содержит обработчика по умолчанию. Сообщения, отправленные этому логгеру, по умолчанию не передаются корневому логгеру.Обратите внимание, что в Windows дочерние процессы наследуют только уровень логгера родительского процесса — любые другие настройки логгера не будут унаследованы.
-
multiprocessing.
log_to_stderr
()¶ Функция выполняет вызов
get_logger()
, но помимо возврата логгера, созданного get_logger, добавляет обработчик, который отправляет вывод вsys.stderr
в формате'[%(levelname)s/%(processName)s] %(message)s'
.
Ниже приведен пример сеанса с включенным ведением журнала:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Полную таблицу уровней ведения журнала см. В модуле logging
.
Модуль multiprocessing.dummy
¶
multiprocessing.dummy
копирует API multiprocessing
, но является
не более чем обёрткой для модуля threading
.
В частности, функция Pool
, предоставляемая multiprocessing.dummy
,
возвращает экземпляр ThreadPool
, который является подклассом
Pool
, который поддерживает все те же вызовы методов, но использует пул
рабочих потоков, а не рабочих процессов.
-
class
multiprocessing.pool.
ThreadPool
([processes[, initializer[, initargs]]])¶ Экземпляры
ThreadPool
полностью совместимы по интерфейсу с экземплярамиPool
, и их ресурсами также необходимо правильно управлять, используя пул в качестве диспетчера контекста или вручную вызываяclose()
иterminate()
.processes — количество используемых рабочих потоков. Если processes —
None
, то используется номер, возвращаемыйos.cpu_count()
.Если initializer не
None
, то каждый рабочий процесс при запуске будет вызыватьinitializer(*initargs)
.В отличие от
Pool
, maxtasksperchild и context не могут быть предоставлены.Примечание
ThreadPool
использует тот же интерфейс, что иPool
, который разработан на основе пула процессов и предшествует введению модуляconcurrent.futures
. Таким образом, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и содержит свой собственный тип для представления состояния асинхронных заданий,AsyncResult
, который не понимается никакими другими библиотеками.Обычно пользователи предпочитают использовать
concurrent.futures.ThreadPoolExecutor
, который содержит более простой интерфейс, изначально спроектированный вокруг потоков, и который возвращает экземплярыconcurrent.futures.Future
, совместимые со многими другими библиотеками, включаяasyncio
.
Руководство по программированию¶
Существуют определенные правила и идиомы, которых следует придерживаться при
использовании multiprocessing
.
Все методы запуска¶
Следующее относится ко всем методам запуска.
Избегайте разделяемого состояния
По возможности следует избегать перемещения больших объемов данных между процессами.
Вероятно, лучше придерживаться использования очередей или конвейеров для связи между процессами, а не примитивов синхронизации более низкого уровня.
Пикленгуемость
Убедитесь, что аргументы методов прокси-серверов пиклингуемые.
Поточная безопасность прокси
Не используйте прокси-объект из более чем одного потока, если вы не защитите его блокировкой.
Присоединение к процессам зомби
В Unix, когда процесс завершается, но не присоединяется к нему, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда запускается новый процесс (или вызываетсяactive_children()
), все завершенные процессы, которые еще не были присоединены, будут присоединены. Также вызов завершенного процессаProcess.is_alive
присоединится к процессу. Тем не менее, вероятно, хорошей практикой является явное присоединение ко всем процессам, которые вы запускаете.
Лучше наследовать, чем пиклить/анпиклить
При использовании методов запуска spawn или forkserver многие типы изmultiprocessing
должны быть выбираемыми, чтобы дочерние процессы могли их использовать. Однако обычно следует избегать отправки общих объектов другим процессам с использованием конвейеров или очередей. Вместо этого вы должны организовать программу так, чтобы процесс, которому требуется доступ к совместно используемому ресурсу, созданному где-то ещё, мог наследовать его от процесса-предка.
Избегайте завершения процессов
Использование методаProcess.terminate
для остановки процесса может привести к тому, что любые общие ресурсы (такие как блокировки, семафоры, конвейеры и очереди), в настоящее время используемые процессом, станут сломанными или недоступными для других процессов.
Присоединение к процессам, использующим очереди
Имейте в виду, что процесс, который поместил элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком-фидером в нижележащий канал. (Дочерний процесс может вызвать метод очереди
Queue.cancel_join_thread
, чтобы избежать такого поведения.Это означает, что всякий раз, когда вы используете очередь, вам нужно убедиться, что все элементы, которые были помещены в очередь, в конечном итоге будут удалены до присоединения к процессу. В противном случае вы не можете быть уверены, что процессы, поместившие элементы в очередь, завершатся. Помните также, что недемонические процессы будут подключаться автоматически.
Примером взаимоблокировки является следующий код:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # это мертвая блокировка obj = queue.get()Исправление заключается в том, чтобы поменять местами последние две строки (или просто удалить строку
p.join()
).
Явная передача ресурсов дочерним процессам
В Unix, использующем метод запуска fork, дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Однако лучше передать объект в качестве аргумента конструктору дочернего процесса.
Помимо обеспечения совместимости кода (потенциально) с Windows и другими методами запуска, это также гарантирует, что, пока дочерний процесс все еще жив, объект не будет собираться сборщиком мусора в родительском процессе. Это может быть важно, если какой-то ресурс освобождается при сборке мусора в родительском процессе.
Таким образом для сущность:
from multiprocessing import Process, Lock def f(): """сделать что-то используя 'lock'""" if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()следует переписать как:
from multiprocessing import Process, Lock def f(l): """ сделать что-нибудь, используя 'l' """ if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Остерегайтесь замены sys.stdin
на «файл как объект».
multiprocessing
, изначально вызываемый безоговорочно:os.close(sys.stdin.fileno())в методе
multiprocessing.Process._bootstrap()
— это привело к проблемам с процессами в процессах. Это было изменено на:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Он решает фундаментальную проблему столкновения процессов друг с другом, что приводит к ошибке неверного файлового дескриптора, но представляет потенциальную опасность для приложений, которые заменяют
sys.stdin()
на «файловый объект» с буферизацией вывода. Опасность заключается в том, что если несколько процессов вызовутclose()
для файлового объекта, это может привести к тому, что одни и те же данные будут сброшены в объект несколько раз, что приведет к повреждению.Если вы пишете объект в виде файла и реализуете собственное кэширование, вы можете сделать его безопасным для вилки, сохраняя pid всякий раз, когда вы добавляете в кеш, и отбрасывая кеш при изменении pid. Например:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheДля получения дополнительной информации см. bpo-5155, bpo-5313 и bpo-5331
Методы запуска spawn и forkserver¶
Есть несколько дополнительных ограничений, не относящихся к методу запуска fork.
Больше пикленгуемости
Убедитесь, что все аргументыProcess.__init__()
можно пиклинговать. Кроме того, если вы создаете подклассProcess
, убедитесь, что экземпляры будут пиклингуемыми при вызове методаProcess.start
.
Глобальные переменные
Имейте в виду, что если код, выполняемый в дочернем процессе, пытается получить доступ к глобальной переменной, то значение, которое он видит (если оно есть), может не совпадать со значением в родительском процессе во время вызова
Process.start
.Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают проблем.
Безопасный импорт основного модуля
Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая нежелательных побочных эффектов (таких как запуск нового процесса).
Например, при использовании метода запуска spawn или forkserver следующий модуль завершится ошибкой с кодом
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Вместо этого следует защитить «точку входа» программы с помощью
if __name__ == '__main__':
следующим образом:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Строку
freeze_support()
можно не указывать, если программа будет запускаться в обычном режиме, а не в замороженном состоянии.)Это позволяет новому интерпретатору Python безопасно импортировать модуль, а затем запускать функцию модуля
foo()
.Аналогичные ограничения применяются, если пул или менеджер создается в основном модуле.
Примеры¶
Демонстрация создания и использования настраиваемых менеджеров и прокси:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# Простая функция генератора
def baz():
for i in range(10):
yield i*i
# Тип прокси для объектов-генераторов
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Функция для возврата модуля operator
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# зарегистрировать класс Foo; сделать `f()` и `g()` доступными через прокси
MyManager.register('Foo1', Foo)
# зарегистрировать класс Foo; сделать `g()` and `_h() `доступным через прокси
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# регистрируем генераторную функцию baz; используется `GeneratorProxy` для создания
# прокси
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# зарегистрировать get_operator_module(); сделать общедоступные функции
# доступными через прокси
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Использование Pool
:
import multiprocessing
import time
import random
import sys
#
# Функции, используемые тестовым кодом
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Тестовый код
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Тестовы
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Обработка ошибок теста
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Тестирование таймаутов
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Пример использования очередей для подачи задач в коллекцию рабочих процессов и сбора результатов: