multiprocessing — Процессный параллелизм

Исходный код: Lib/multiprocessing/


Введение

multiprocessing — это пакет, поддерживающий порождение процессов с использованием API, аналогичный модулю threading. Пакет multiprocessing предлагает как локальный, так и удаленный параллелизм, эффективно обходя Глобальную блокировку интерпретатора за счёт использования подпроцессов вместо потоков. Благодаря этому, модуль multiprocessing позволяет программисту полностью использовать несколько процессоров на компьютере. Он работает как в Unix, так и в Windows.

В модуле multiprocessing также представлены API, не имеющие аналогов в модуле threading. Ярким примером этого является объект Pool, который предлагает удобные средства распараллеливания выполнения функции по нескольким входным значениям, распределяя входные данные по процессам (параллелизм данных). В следующем примере демонстрируется обычная практика определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Это базовый пример параллелизма данных с использованием Pool:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

будет напечатано в стандартный вывод:

[1, 4, 9]

Класс Process

В multiprocessing процессы порождаются путём создания объекта Process и последующего вызова его метода start(). Process следует API threading.Thread. Тривиальный пример многопроцессорной программы:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Вот расширенный пример, чтобы показать идентификаторы отдельных процессов:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Для объяснения того, почему необходима часть if __name__ == '__main__', см. Руководство по программированию.

Контексты и методы запуска

В зависимости от платформы multiprocessing поддерживает три способа запуска процесса. Далее перечислены методы запуска.

spawn

Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для выполнения метода run() объекта процесса. В частности, ненужные файловые дескрипторы и дескрипторы родительского процесса не будут унаследованы. Запуск процесса с использованием этого метода довольно медленный по сравнению с использованием fork или forkserver.

Доступен в Unix и Windows. По умолчанию в Windows и macOS.

fork

Родительский процесс использует os.fork() для разветвления интерпретатора Python. Когда начинается дочерний процесс, фактически идентичен родительскому процессу. Все ресурсы родителя наследуются дочерним процессом. Обратите внимание, что безопасное разветвление многопоточного процесса проблематично.

Доступен только в Unix. По умолчанию в Unix.

forkserver

С этого момента всякий раз, когда требуется новый процесс, родительский процесс подключается к серверу и запрашивает у него форк для нового процесса. Процесс сервера форка является однопоточным, поэтому для него безопасно использовать os.fork(). Никакие ненужные ресурсы не наследуются.

Доступен на платформах Unix, которые поддерживают передачу файловых дескрипторов по каналам (pipes) Unix.

Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбою подпроцесса. См. bpo-33725.

Изменено в версии 3.4: spawn добавлен на всех unix платформах, а forkserver добавлен для некоторых платформ unix. Дочерние процессы больше не наследуют все родительские дескрипторы в Windows.

В Unix с использованием методов запуска spawn или forkserver также запускается процесс трекер ресурсов, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты SharedMemory), созданные процессами программы. Когда все процессы завершатся, трекер ресурсов отсоединит все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был остановлен сигналом, могут быть «утечки» ресурсов. (Ни протекающие семафоры, ни сегменты разделяемой памяти не будут автоматически разъединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система допускает только ограниченное количество именованных семафоров, а сегменты разделяемой памяти занимают некоторое пространство в основной памяти.)

Чтобы выбрать метод запуска, используйте set_start_method() в предложении if __name__ == '__main__' основного модуля. Например:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() не следует использовать в программе более одного раза.

В качестве альтернативы вы можете использовать get_context() для получения объекта контекста. Объекты контекста имеют тот же API, что и модуль multiprocessing, и позволяют использовать несколько методов запуска в одной программе.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с использованием контекста fork, не могут быть переданы процессам, запущенным с использованием методов запуска spawn или forkserver.

Библиотека, которая хочет использовать определенный метод запуска, вероятно, должна использовать get_context(), чтобы не мешать выбору пользователя библиотеки.

Предупреждение

Методы запуска 'spawn' и 'forkserver' в настоящее время не могут использоваться с «замороженными» исполняемыми файлами (т. е. двоичными файлами, созданными такими пакетами, как PyInstaller и cx_Freeze) в Unix. Метод запуска 'fork' действительно работает.

Обмен объектами между процессами

multiprocessing поддерживает два типа конвейеров связи между процессами:

Очереди

Класс Queue является почти клоном queue.Queue. Например:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # распечатает "[42, None, 'hello']"
    p.join()

Очереди безопасны для потоков и процессов.

Конвейеры (pipes)

Функция Pipe() возвращает пару объектов соединения, соединенных конвейером (pipe), который по умолчанию является дуплексным (двусторонним). Например:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # распечатает "[42, None, 'hello']"
    p.join()

Два объекта подключения, возвращаемые Pipe(), представляют собой два конца конвейера. Каждый объект подключения содержит методы send() и recv() (среди прочих). Обратите внимание, что данные в конвейере могут быть повреждены, если два процесса (или потока) попытаются одновременно прочитать или записать на конец конвейера одно и тоже. Конечно, нет риска повреждения из-за процессов, использующих разные концы конвейер одновременно.

Синхронизация процессов

multiprocessing содержит эквиваленты всех примитивов синхронизации из threading. Например, можно использовать блокировку, чтобы гарантировать, что только один процесс печатает в стандартный вывод за раз:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Без использования блокировки вывода из разных процессов может все запутаться.

Обмен состояниями между процессами

Как упоминалось выше, при параллельном программировании обычно лучше избегать использования общего состояния, насколько это возможно. Это особенно верно при использовании нескольких процессов.

Однако, если вам действительно нужно использовать некоторые общие данные, multiprocessing предоставляет несколько способов сделать это.

Общая память

Данные могут храниться в отображении общей памяти с использованием Value или Array. Например, следующий код

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

Напечатает:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Аргументы 'd' и 'i', используемые при создании num и arr, являются кодами типа того типа, который используется модулем array: 'd' указывает на число с плавающей запятой двойной точности, а 'i' указывает на целое число со знаком. Эти общие объекты будут технологическими и поточно-ориентированными.

Для большей гибкости в использовании разделяемой памяти можно использовать модуль multiprocessing.sharedctypes, который поддерживает создание произвольных объектов ctypes, выделенных из разделяемой памяти.

Серверный процесс

Объект-менеджер, возвращаемый Manager(), управляет серверным процессом, который содержит объекты Python и позволяет другим процессам управлять ими с помощью прокси.

Менеджер возвращенный Manager() поддержит типы list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value и Array. Например:

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

Напечатает:

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Менеджеры серверных процессов более гибкие, чем использование объектов общей памяти, потому что они могут быть созданы для поддержки произвольных типов объектов. Кроме того, один менеджер может использоваться совместно процессами на разных компьютерах в сети. Однако они медленнее, чем при использовании общей памяти.

Использование пула рабочих

Класс Pool представляет собой пул рабочих процессов. У него есть методы, которые позволяют передавать задачи рабочим процессам несколькими способами.

Например:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # запуск 4 рабочих процессов
    with Pool(processes=4) as pool:

        # печать "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # печатать одинаковые числа в произвольном порядке
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # вычислить "f(20)" асинхронно
        res = pool.apply_async(f, (20,))      # запускается в *только* одном процессе
        print(res.get(timeout=1))             # печатает "400"

        # вычислить "os.getpid()" асинхронно
        res = pool.apply_async(os.getpid, ()) # запускается в *только* одном процессе
        print(res.get(timeout=1))             # печатает PID этого процесса

        # запуск нескольких оценок асинхронно *может* использовать больше процессов
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # уснуть одному рабочему на 10 секунд
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # выход из блока with с остановкой пула
    print("Now the pool is closed and no longer available")

Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.

Примечание

Функциональность в этом пакете требует, чтобы модуль __main__ был импортирован дочерними элементами. Это описано в Руководство по программированию, однако здесь стоит указать, что некоторые примеры, такие как multiprocessing.pool.Pool, не будут работать в интерактивном интерпретаторе. Например:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...   p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(Если выполнить это, то на самом деле будет выведено три полных трейсбэка, перемежающихся полуслучайным образом, а затем, возможно, придется как-то остановить родительский процесс.)

Справочник

Пакет multiprocessing в основном реплицирует API модуля threading.

Process и исключения

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Объекты процессов представляют собой действия, которые выполняются в отдельном процессе. Класс Process содержит эквиваленты всех методов threading.Thread.

Конструктор всегда должен вызываться с ключевыми аргументами. group всегда должен быть None; он существует исключительно для совместимости с threading.Thread. target — это вызываемый объект, вызываемый методом run(). По умолчанию используется None, что означает, что ничего не вызывается. name — это имя процесса (подробнее см. name). args — это кортеж аргументов для целевого вызова. kwargs — это словарь ключевых аргументов для целевого вызова. Если указан аргумент daemon, содержащий только ключевой аргумент, для флага процесса daemon устанавливается значение True или False. Если None (по умолчанию), этот флаг будет унаследован от процесса создания.

По умолчанию в target не передаются аргументы.

Если подкласс переопределяет конструктор, он должен убедиться, что он вызывает конструктор базового класса (Process.__init__()), прежде чем делать что-либо ещё с процессом.

Изменено в версии 3.3: Добавлен аргумент daemon.

run()

Метод, представляющий активность процесса.

Вы можете переопределить этот метод в подклассе. Стандартный метод run() вызывает вызываемый объект, переданный конструктору объекта в качестве целевого аргумента, если таковой имеется, с последовательными аргументами и ключевыми аргументами, взятыми из аргументов args и kwargs соответственно.

start()

Начать работу процесса.

Он должен вызываться не более одного раза для каждого объекта процесса. Он организует вызов метода объекта run() в отдельном процессе.

join([timeout])

Если необязательный аргумент timeoutNone (по умолчанию), метод блокируется до тех пор, пока не завершится процесс, чей метод join() вызван. Если timeout — положительное число, блокируется не более timeout секунд. Обратите внимание, что метод возвращает None, если его процесс завершается или время ожидания метода истекло. Проверьте exitcode процесса, чтобы определить, завершился ли он.

К процессу можно присоединяться много раз.

Процесс не может присоединиться к самому себе, потому что это приведет к тупиковой ситуации. Попытка присоединиться к процессу до его запуска является ошибкой.

name

Имя процесса. Имя — это строка, используемая только для идентификации. У него нет семантики. Одно и то же имя может быть присвоено нескольким процессам.

Начальное имя задается конструктором. Если конструктору не указано явное имя, создается имя формы «Process-N1:N2:…:Nk», где каждый Nk является N-м дочерним элементом своего родителя.

is_alive()

Возвращает, жив ли процесс.

Грубо говоря, объект процесса жив с момента возврата метода start() до завершения дочернего процесса.

daemon

Флаг демона процесса, логическое значение. Это должно быть установлено до вызова start().

Начальное значение наследуется от процесса создания.

Когда процесс завершается, он пытается завершить все свои демонические дочерние процессы.

Обратите внимание, что демонический процесс не может создавать дочерние процессы. В противном случае демонический процесс оставит своих дочерних процессов сиротами, если он будет завершен при выходе из родительского процесса. Кроме того, это демоны или службы не Unix, это обычные процессы, которые будут завершены (а не присоединены), если завершились недемонические процессы.

Помимо API threading.Thread, объекты Process также поддерживают следующие атрибуты и методы

pid

Вернуть идентификатор процесса. Перед тем, как процесс будет запущен, это будет None.

exitcode

Это будет None, если процесс еще не завершен. Отрицательное значение -N указывает, что дочерний элемент был прерван сигналом N.

authkey

Ключ аутентификации процесса (байтовая строка).

При инициализации multiprocessing главному процессу назначается случайная строка с использованием os.urandom().

При создании объекта Process он унаследует ключ аутентификации своего родительского процесса, хотя это можно изменить, установив authkey на другую байтовую строку.

См. Ключи аутентификации.

sentinel

Числовой дескриптор системного объекта, который станет «готовым» по окончании процесса.

Вы можете использовать это значение, если хотите дождаться нескольких событий одновременно, используя multiprocessing.connection.wait(). В противном случае проще вызвать join().

В Windows это дескриптор ОС, который можно использовать с семейством вызовов API WaitForSingleObject и WaitForMultipleObjects. В Unix это файловый дескриптор, который можно использовать с примитивами из модуля select.

Добавлено в версии 3.3.

terminate()

Прекратить процесс. В Unix это делается с помощью сигнала SIGTERM; в Windows используется TerminateProcess(). Обратите внимание, что обработчики выхода, предложения finally и т. д. не будут выполняться.

Обратите внимание, что дочерние процессы процесса не будут завершены - они просто станут осиротевшими.

Предупреждение

Если этот метод используется, когда связанный процесс использует конвейер или очередь, тогда конвейер или очередь могут быть повреждены и могут стать непригодными для использования другим процессом. Точно так же, если процесс получил блокировку или семафор и т. д., его завершение может привести к взаимоблокировке других процессов.

kill()

Такой же, как terminate(), но с использованием сигнала SIGKILL в Unix.

Добавлено в версии 3.7.

close()

Закрыть объект Process, освободив все связанные с ним ресурсы. Поднимается ValueError, если основной процесс всё ещё выполняется. После успешного возврата close() большинство других методов и атрибутов объекта Process вызовут ValueError.

Добавлено в версии 3.7.

Обратите внимание, что методы start(), join(), is_alive(), terminate() и exitcode должны вызываться только процессом, создавшим объект процесса.

Пример использования некоторых методов Process :

 >>> import multiprocessing, time, signal
 >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
 >>> print(p, p.is_alive())
 <Process ... initial> False
 >>> p.start()
 >>> print(p, p.is_alive())
 <Process ... started> True
 >>> p.terminate()
 >>> time.sleep(0.1)
 >>> print(p, p.is_alive())
 <Process ... stopped exitcode=-SIGTERM> False
 >>> p.exitcode == -signal.SIGTERM
 True
exception multiprocessing.ProcessError

Базовый класс всех исключений multiprocessing.

exception multiprocessing.BufferTooShort

Исключение, вызываемое Connection.recv_bytes_into(), когда предоставленный буферный объект слишком мал для прочитанного сообщения.

Если e является экземпляром BufferTooShort, то e.args[0] выдаст сообщение в виде байтовой строки.

exception multiprocessing.AuthenticationError

Возникает при ошибке аутентификации.

exception multiprocessing.TimeoutError

Возникает методами с таймаутом по истечении тайм-аута.

Пайпы и очереди

При использовании нескольких процессов обычно используется передача сообщений для связи между процессами и избегается необходимость использования каких-либо примитивов синхронизации, таких как блокировки.

Для передачи сообщений можно использовать Pipe() (для соединения между двумя процессами) или очередь (что позволяет нескольких производителей и потребителей).

Типы Queue, SimpleQueue и JoinableQueue представляют собой очереди FIFO с несколькими производителями и несколькими потребителями, смоделированные на основе класса queue.Queue в стандартной библиотеке. Они отличаются тем, что в Queue отсутствуют методы task_done() и join(), представленные в классе queue.Queue Python 2.5.

Если вы используете JoinableQueue, вы должны вызывать JoinableQueue.task_done() для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызывая исключение.

Обратите внимание, что можно также создать общую очередь с помощью объекта- менеджера — см. Менеджеры.

Примечание

multiprocessing использует обычные исключения queue.Empty и queue.Full для сигнала тайм-аута. Они недоступны в пространстве имён multiprocessing, поэтому вам необходимо импортировать их из queue.

Примечание

Когда объект помещается в очередь, объект обрабатывается, а фоновый поток позже сбрасывает обработанные данные в нижележащий конвейер. Это имеет некоторые последствия, которые немного удивительны, но не должны вызывать каких-либо практических трудностей, — если они действительно беспокоят вас, вы можете вместо этого использовать очередь, созданную с помощью менеджера.

  1. После помещения объекта в пустую очередь может произойти бесконечная задержка перед тем, как метод empty() очереди возвращает False и get_nowait() может возвращает без повышения queue.Empty.
  2. Если несколько процессов ставят в очередь объекты, возможно, чтобы объекты были приняты на другом конце вне порядка. Однако объекты, поставленные в очередь одним и тем же процессом, всегда будут находиться в ожидаемом порядке относительно друг друга.

Предупреждение

Если процесс завершается с использованием Process.terminate() или os.kill(), когда он пытается использовать Queue, то данные в очереди, вероятно, будут повреждены. Это может привести к тому, что любой другой процесс получит исключение, когда он попытается использовать очередь позже.

Предупреждение

Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и он не использовал JoinableQueue.cancel_join_thread), то этот процесс не завершится, пока все буферизованные элементы не будут сброшены в конвейер.

Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете попасть в тупик, если не уверены, что все элементы, помещенные в очередь, были израсходованы. Точно так же, если дочерний процесс не является демоническим, то родительский процесс может зависнуть при выходе, когда он пытается присоединиться ко всем своим недемоническим дочерним процессам.

Обратите внимание, что очередь, созданная с помощью диспетчера, не содержит этой проблемы. См. Руководство по программированию.

Пример использования очередей для межпроцессной связи см. в разделе Примеры.

multiprocessing.Pipe([duplex])

Возвращает пару (conn1, conn2) из Connection объектов, представляющих концы конвейера.

Если duplexTrue (по умолчанию), то конвейер является двунаправленным. Если duplexFalse, тогда конвейера является однонаправленными: conn1 можно использовать только для приема сообщений, а conn2 можно использовать только для отправки сообщений.

class multiprocessing.Queue([maxsize])

Возвращает общую очередь процесса, реализованную с использованием конвейера и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток, который передает объекты из буфера в конвейер.

Обычные исключения queue.Empty и queue.Full из модуля queue стандартной библиотеки вызываются для сигналов тайм-аутов.

Queue реализует все методы queue.Queue, кроме task_done() и join().

qsize()

Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.

Обратите внимание, что это может вызвать NotImplementedError на платформах Unix, таких как Mac OS X, где sem_getvalue() не реализован.

empty()

Возвращает True, если очередь пуста, в противном случае — False. Из-за семантики многопоточности/многопроцессорности ненадежно.

full()

Возвращает True, если очередь заполнена, в противном случае — False. Из-за семантики многопоточности/многопроцессорности ненадежно.

put(obj[, block[, timeout]])

Поместить obj в очередь. Если необязательный аргумент blockTrue (по умолчанию), а timeoutNone (по умолчанию), при необходимости заблокируйте, пока не станет доступен свободный слот. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключение queue.Full, если в течение этого времени не было доступного свободного слота. В противном случае (blockFalse) поместить элемент в очередь, если свободный слот доступен немедленно, иначе вызвать исключение queue.Full (timeout в этом случае игнорируется).

Изменено в версии 3.8: Если очередь закрыта, то вместо AssertionError поднимается ValueError.

put_nowait(obj)

Эквивалент put(obj, False).

get([block[, timeout]])

Удаление и возврат элемента из очереди. Если необязательные аргументы blockTrue (по умолчанию), а timeoutNone (по умолчанию), заблокировать, если необходимо, до тех пор, пока элемент не станет доступным. Если timeout является положительным числом, он блокируется не более timeout секунд и вызывает исключение queue.Empty, если в течение этого времени ни один элемент не был доступен. В противном случае (блок — False) вернуть элемент, если он доступен немедленно, иначе вызовите исключение queue.Empty (в этом случае timeout игнорируется).

Изменено в версии 3.8: Если очередь закрыта, то вместо OSError поднимается ValueError.

get_nowait()

Эквивалент get(False).

multiprocessing.Queue содержит несколько дополнительных методов, которых нет в queue.Queue. Эти методы обычно не нужны для большей части кода :

close()

Указать, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится, как только он сбросит все буферизованные данные в конвейер. Это вызывается автоматически при сборке мусора.

join_thread()

Присоединиться к фоновому потоку. Это можно использовать только после вызова close(). Он блокируется до тех пор, пока фоновый поток не завершится, гарантируя, что все данные в буфере будут сброшены в конвейер.

По умолчанию, если процесс не является создателем очереди, при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать cancel_join_thread(), чтобы join_thread() ничего не делал.

cancel_join_thread()

Предотвращение блокировки join_thread(). В частности, это предотвращает автоматическое присоединение фонового потока при выходе из процесса. См. join_thread().

Лучшее название для этого метода - allow_exit_without_flush(). Это может привести к потере данных в очереди, и вам почти наверняка не нужно будет их использовать. Это действительно только там, если вам нужно, чтобы текущий процесс немедленно завершился, не дожидаясь сброса данных из очереди в базовый конвейер, и вас не заботят потерянные данные.

Примечание

Функциональность этого класса требует работающей реализации общего семафора в операционной системе хоста. Без него функциональность в этом классе будет отключена, и попытки создать экземпляр Queue приведут к ImportError. См. bpo-3770 для получения дополнительной информации. То же самое верно для любого из специализированных типов очередей, перечисленных ниже.

class multiprocessing.SimpleQueue

Упрощенный тип Queue, очень близкий к заблокированному Pipe.

empty()

Возвращает True, если очередь пуста, False иначе.

get()

Удаление и возврат элемента из очереди.

put(item)

Поместить item в очередь.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, подкласс Queue, представляет собой очередь, которая дополнительно содержит методы task_done() и join().

task_done()

Указать, что задача, ранее поставленная в очередь, завершена. Используется потребителями очереди. Для каждого get(), используемого для выборки задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

Поднимает ValueError, если вызывается больше раз, чем было помещено в очередь элементов.

join()

Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда потребитель вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокируется.

Разное

multiprocessing.active_children()

Возвращает список всех живых потомков текущего процесса.

Вызов содержит побочный эффект «присоединения» к уже завершенным процессам.

multiprocessing.cpu_count()

Возвращает количество процессоров в системе.

Это число не эквивалентно количеству процессоров, которые может использовать текущий процесс. Количество используемых процессоров можно узнать с помощью len(os.sched_getaffinity(0))

Может поднять NotImplementedError.

См.также

os.cpu_count()

multiprocessing.current_process()

Возвращает объект Process, соответствующий текущему процессу.

Аналог threading.current_thread().

multiprocessing.parent_process()

Возвращает объект Process, соответствующий родительскому процессу current_process(). Для основного процесса parent_process будет None.

Добавлено в версии 3.8.

multiprocessing.freeze_support()

Добавить поддержку, когда программа, использующая multiprocessing, была заморожена для создания исполняемого файла Windows. (Был протестирован с py2exe, PyInstaller и cx_Freeze.)

Эту функцию нужно вызывать сразу после строки if __name__ == '__main__' основного модуля. Например:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Если строка freeze_support() пропущена, то попытка запустить замороженный исполняемый файл вызовет RuntimeError.

Вызов freeze_support() не действует при вызове в любой операционной системе, кроме Windows. Кроме того, если модуль нормально запускается интерпретатором Python в Windows (программа не была заморожена), то freeze_support() не действует.

multiprocessing.get_all_start_methods()

Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможные методы запуска: 'fork', 'spawn' и 'forkserver'. В Windows доступен только 'spawn'. В Unix всегда поддерживаются 'fork' и 'spawn', по умолчанию — 'fork'.

Добавлено в версии 3.4.

multiprocessing.get_context(method=None)

Возвращает объект контекста, который содержит те же атрибуты, что и модуль multiprocessing.

Если methodNone, то возвращается контекст по умолчанию. В противном случае method должен быть 'fork', 'spawn', 'forkserver'. ValueError возникает, если указанный метод запуска недоступен.

Добавлено в версии 3.4.

multiprocessing.get_start_method(allow_none=False)

Возвращает имя метода запуска, используемого для запуска процессов.

Если метод запуска не был исправлен и allow_none содержит ложное значение, то метод запуска ремонтируется по умолчанию и возвращается имя. Если метод запуска не был исправлен и allow_none истинно, возвращается None.

Возвращаемое значение может быть 'fork', 'spawn', 'forkserver' или None. 'fork' используется по умолчанию в Unix, а 'spawn' - по умолчанию в Windows.

Добавлено в версии 3.4.

multiprocessing.set_executable()

Задаёт путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется sys.executable). Встраивающим, вероятно, потребуется что-то вроде:

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

прежде чем они смогут создавать дочерние процессы.

Изменено в версии 3.4: Теперь поддерживается в Unix при использовании метода запуска 'spawn'.

multiprocessing.set_start_method(method)

Установить метод, который следует использовать для запуска дочерних процессов. method может быть 'fork', 'spawn' или 'forkserver'.

Обратите внимание, что он должен вызываться не более одного раза, и он должен быть защищён в разделе if __name__ == '__main__' основного модуля.

Добавлено в версии 3.4.

Объекты подключения

Объекты подключения позволяют отправлять и получать пиклингуемые (picklable) объекты или строки. Их можно рассматривать как подключенные сокеты, ориентированные на сообщения.

Объекты подключения обычно создаются с использованием Pipe — см. также Слушатели и клиенты.

class multiprocessing.connection.Connection
send(obj)

Отправить объект на другой конец соединения, который следует прочитать с помощью команды recv().

Объект должен быть пиклингуемым. Очень большие пикли (приблизительно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение ValueError.

recv()

Возвращает объект, отправленный с другого конца соединения, используя send(). Блокирует, пока не будет что получить. Поднимает EOFError, если нечего принял, а другой конец был закрыт.

fileno()

Возвращает файловый дескриптор или дескриптор, используемый соединением.

close()

Закрыть соединение.

Он вызывается автоматически при сборке мусора.

poll([timeout])

Возвращает, есть ли данные, доступные для чтения.

Если timeout не указан, он немедленно вернется. Если timeout — это число, то это указывает максимальное время в секундах для блокировки. Если timeoutNone, то используется бесконечный тайм-аут.

Обратите внимание, что с помощью multiprocessing.connection.wait() можно опрашивать сразу несколько объектов подключения.

send_bytes(buffer[, offset[, size]])

Отправить байтовые данные из байтоподобного объекта в виде полного сообщения.

Если задано offset, данные считываются с этой позиции в buffer. Если задано size, то это количество байтов будет прочитано из буфера. Очень большие буферы (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение ValueError

recv_bytes([maxlength])

Возвращает полное сообщение байтовых данных, отправленных с другого конца соединения, в виде строки. Блокирует, пока не будет что получить. Вызывает EOFError, если нечего принимать и другой конец закрыт.

Если указан maxlength и длина сообщения превышает maxlength, то возникает OSError и соединение становится недоступным для чтения.

Изменено в версии 3.3: Эта функция использовалась для вызова IOError, который теперь является псевдонимом OSError.

recv_bytes_into(buffer[, offset])

Считывание в buffer полное сообщение байтовых данных, отправленных с другого конца соединения, и вернуть количество байтов в сообщении. Блокирует, пока не будет что получить. Вызывает EOFError, если нечего принимать, а другой конец был закрыт.

buffer должен быть байтоподобным объектом с возможностью записи. Если задано offset, то сообщение будет записано в буфер с этой позиции. Смещение должно быть неотрицательным целым числом меньше длины buffer (в байтах).

Если буфер слишком короткий, возникает исключение BufferTooShort, и полное сообщение доступно как e.args[0], где e является экземпляром исключения.

Изменено в версии 3.3: Сами объекты соединения теперь могут передаваться между процессами с помощью Connection.send() и Connection.recv().

Добавлено в версии 3.3: Объекты подключения теперь поддерживают протокол управления контекстом — см. Типы менеджера контекста. __enter__() возвращает объект подключения, а __exit__() вызывает close().

Например:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Предупреждение

Метод Connection.recv() автоматически отключает полученные данные, что может быть угрозой безопасности, если вы не можете доверять процессу, отправившему сообщение.

Поэтому, если объект подключения не был создан с помощью Pipe(), следует использовать методы recv() и send() только после выполнения той или иной аутентификации. См. Ключи аутентификации.

Предупреждение

Если процесс погибает, когда он пытается прочитать или записать в конвейер, то данные в конвейере, вероятно, будут повреждены, поскольку может оказаться невозможным убедиться, где лежат границы сообщения.

Примитивы синхронизации

Как правило, примитивы синхронизации не так необходимы в многопроцессорной программе, как в многопоточной программе. См. документацию для модуля threading.

Обратите внимание, что можно также создавать примитивы синхронизации с помощью объекта-менеджера, см. Менеджеры.

class multiprocessing.Barrier(parties[, action[, timeout]])

Барьерный объект: клон threading.Barrier.

Добавлено в версии 3.3.

class multiprocessing.BoundedSemaphore([value])

Ограниченный семафорный объект: близкий аналог threading.BoundedSemaphore.

Существует единственное отличие от его близкого аналога: первый аргумент его метода acquire называется блок, что соответствует Lock.acquire().

Примечание

В Mac OS X он неотличим от Semaphore, поскольку sem_getvalue() не реализован на этой платформе.

class multiprocessing.Condition([lock])

Условная переменная: псевдоним для threading.Condition.

Если указан lock, то это должен быть объект Lock или RLock из multiprocessing.

Изменено в версии 3.3: Добавлен метод wait_for().

class multiprocessing.Event

Клон threading.Event.

class multiprocessing.Lock

Объект нерекурсивной блокировки: близкий аналог threading.Lock. Как только процесс или поток получил блокировку, последующие попытки получить её от любого процесса или потока будут блокироваться до тех пор, пока она не будет снята; любой процесс или поток может освободить его. Концепции и поведение threading.Lock применительно к потокам воспроизведены здесь, в multiprocessing.Lock, применительно к процессам или потокам, за исключением случаев, указанных выше.

Обратите внимание, что Lock на самом деле является функцией фабрикой, которая возвращает экземпляр multiprocessing.synchronize.Lock, инициализированный контекстом по умолчанию.

Lock поддерживает протокол контекстного менеджера и, таким образом, может использоваться в операторах with.

acquire(block=True, timeout=None)

Получить блокировку, блокировку или неблокирование.

Если для аргумента block задано значение True (по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не перейдет в разблокированное состояние, а затем установит его в заблокированное состояние и вернёт True. Обратите внимание, что имя этого первого аргумента отличается от имени в threading.Lock.acquire().

Если для аргумента block задано значение False, вызов метода не блокируется. Если блокировка в настоящее время находится в заблокированном состоянии, вернётся False; в противном случае установите блокировку в заблокированное состояние и вернётся True.

При вызове с положительным значением с плавающей запятой для timeout блокировать максимум на количество секунд, указанное в timeout, пока блокировка не может быть получена. Вызовы с отрицательным значением timeout эквивалентны timeout, равному нулю. Вызовы со значением timeout None (по умолчанию) устанавливают бесконечный период ожидания. Обратите внимание, что обработка отрицательных значений или значений None для timeout отличается от реализованного поведения в threading.Lock.acquire(). Аргумент timeout не содержит практического значения, если аргумент block установлен в False и, таким образом, игнорируется. Возвращает True, если блокировка была получена, или False, если время ожидания истекло.

release()

Снять блокировку. Это может быть вызвано из любого процесса или потока, а не только из процесса или потока, который изначально получил блокировку.

Поведение такое же, как и в threading.Lock.release(), за исключением того, что при вызове разблокированной блокировки возникает ValueError.

class multiprocessing.RLock

Объект рекурсивной блокировки: близкий аналог threading.RLock. Рекурсивная блокировка должна быть снята процессом или потоком, который её получил. Как только процесс или поток получил рекурсивную блокировку, тот же процесс или поток может получить ее снова без блокировки; этот процесс или поток должен освобождать его один раз при каждом захвате.

Обратите внимание, что RLock на самом деле является функцией фабрикой, которая возвращает экземпляр multiprocessing.synchronize.RLock, инициализированный контекстом по умолчанию.

RLock поддерживает протокол term:контекстного менеджера <контекстный менеджер> и, таким образом, может использоваться в операторах with.

acquire(block=True, timeout=None)

Получить блокировку, блокировку или неблокирование.

При вызове с аргументом block, установленным в True, блокировка до тех пор, пока блокировка не перейдёт в разблокированное состояние (не принадлежит ни одному процессу или потоку), если блокировка уже не принадлежит текущему процессу или потоку. Текущий процесс или поток затем становится владельцем блокировки (если он еще не владеет), а уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значение True. Обратите внимание, что есть несколько отличий в поведении этого первого аргумента от реализации threading.RLock.acquire(), начиная с имени самого аргумента.

При вызове с аргументом block, установленным на False, не блокировать. Если блокировка уже была получена (и, следовательно, принадлежит) другому процессу или потоку, текущий процесс или поток не становится владельцем, и уровень рекурсии в блокировке не изменяется, в результате чего возвращается значение False. Если блокировка находится в разблокированном состоянии, текущий процесс или поток становится владельцем, и уровень рекурсии увеличивается, в результате чего возвращается значение True.

Использование и поведение аргумента timeout такие же, как и в Lock.acquire(). Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных поведений в threading.RLock.acquire().

release()

Снять блокировку, уменьшая уровень рекурсии. Если после декремента уровень рекурсии равен нулю, сбросить блокировку на разблокировку (не принадлежащую ни одному процессу или потоку), и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите выполнение ровно одного из них. Если после декремента уровень рекурсии всё ещё не равен нулю, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.

Вызывайте этот метод только тогда, когда вызывающий процесс или поток владеет блокировкой. AssertionError возникает, если этот метод вызывается процессом или потоком, отличным от владельца, или если блокировка находится в разблокированном (незарегистрированном) состоянии. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения в threading.RLock.release().

class multiprocessing.Semaphore([value])

Семафорный объект: близкий аналог threading.Semaphore.

Существует единственное отличие от его близкого аналога: первый аргумент его метода acquire называется block, что соответствует Lock.acquire().

Примечание

В Mac OS X sem_timedwait не поддерживается, поэтому вызов acquire() с тайм-аутом будет имитировать поведение этой функции с использованием цикла ожидания.

Примечание

Если сигнал SIGINT, сгенерированный Ctrl-C, поступает, когда основной поток заблокирован вызовом BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() или Condition.wait(), то вызов будет немедленно прерван и будет активирован KeyboardInterrupt.

Это отличается от поведения threading, где SIGINT будет игнорироваться, пока выполняются эквивалентные блокирующие вызовы.

Примечание

Для некоторых функций этого пакета требуется работающая реализация общего семафора в операционной системе хоста. Без него модуль multiprocessing.synchronize будет отключён, а попытки его импорта приведут к ImportError. См. bpo-3770 для получения дополнительной информации.

Общие объекты ctypes

Можно создавать общие объекты с использованием общей памяти, которая может быть унаследована дочерними процессами.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Возврат ctypes объект, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной обёрткой для объекта. Доступ к самому объекту можно получить через атрибут value в Value.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа того типа, который используется модулем array. *args передается конструктору для типа.

Если lockTrue (по умолчанию), то создаётся новый объект рекурсивной блокировки для синхронизации доступа к значению. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock — это False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Такие операции, как +=, которые включают чтение и запись, не являются атомарными. Так что, например, если вы хотите атомарно увеличить общее значение, этого недостаточно:

counter.value += 1

Предполагая, что соответствующая блокировка является рекурсивной (что по умолчанию), вы можете сделать это:

with counter.get_lock():
    counter.value += 1

Обратите внимание, что lock — это ключевой аргумент.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной обёрткой для массива.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный код типа того типа, который используется модулем array. Если size_or_initializer является целым числом, то оно определяет длину массива, и изначально массив будет обнулен. В противном случае size_or_initializer — это последовательность, которая используется для инициализации массива, длина которой определяет длину массива.

Если lock — это True (по умолчанию), то создается новый объект блокировки для синхронизации доступа к значению. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock - это False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock - это только ключевой аргумент.

Обратите внимание, что массив ctypes.c_char содержит атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк.

Модуль multiprocessing.sharedctypes

Модуль multiprocessing.sharedctypes предоставляет функции для выделения объектов ctypes из общей памяти, которые могут быть унаследованы дочерними процессами.

Примечание

Хотя можно сохранить указатель в разделяемой памяти, помните, что он будет относиться к месту в адресном пространстве определенного процесса. Однако указатель, скорее всего, окажется недействительным в контексте второго процесса, и попытка разыменования указателя из второго процесса может вызвать сбой.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Возвращает массив ctypes, выделенный из общей памяти.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный код типа того типа, который используется модулем array. Если size_or_initializer является целым числом, то оно определяет длину массива, и массив будет изначально обнулен. В противном случае size_or_initializer — это последовательность, которая используется для инициализации массива, длина которой определяет длину массива.

Обратите внимание, что установка и получение элемента потенциально не атомарны, — используйте вместо него Array(), чтобы обеспечить автоматическую синхронизацию доступа с помощью блокировки.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Возвращает объект ctypes, выделенный из общей памяти.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа того типа, который используется модулем array. *args передается конструктору типа.

Обратите внимание, что установка и получение значения потенциально не атомарны — вместо этого используйте Value(), чтобы обеспечить автоматическую синхронизацию доступа с помощью блокировки.

Обратите внимание, что массив ctypes.c_char содержит атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк см. документацию для ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

То же, что и RawArray(), за исключением того, что в зависимости от значения lock может быть возвращена безопасная для процесса обёртка синхронизации вместо необработанного массива ctypes.

Если lockTrue (по умолчанию), то создается новый объект блокировки для синхронизации доступа к значению. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lockFalse, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock — это только ключевой аргумент.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

То же, что и RawValue(), за исключением того, что в зависимости от значения lock может быть возвращена безопасная для процесса обёртка синхронизации вместо необработанного объекта ctypes.

Если lockTrue (по умолчанию), то создаётся новый объект блокировки для синхронизации доступа к значению. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock - это False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock — это только ключевой аргумент.

multiprocessing.sharedctypes.copy(obj)

Возвращает объект ctypes, выделенный из общей памяти, который является копией объекта ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Возвращает безопасный для процесса объект-обёртку для объекта ctypes, который использует lock для синхронизации доступа. Если lockNone (по умолчанию), то объект multiprocessing.RLock создаётся автоматически.

Синхронизированная обёртка будет содержать два метода в дополнение к методам объекта, который она обертывает: get_obj() возвращает завернутый объект, а get_lock() возвращает объект блокировки, используемый для синхронизации.

Обратите внимание, что доступ к объекту ctypes через обёртку может быть намного медленнее, чем доступ к необработанному объекту ctypes.

Изменено в версии 3.5: Синхронизированные объекты поддерживают протокол менеджера контекста.

В таблице ниже сравнивается синтаксис для создания общих объектов ctypes из общей памяти с обычным синтаксисом ctypes. (В таблице MyStruct — некоторый подкласс ctypes.Structure.)

ctypes sharedctypes используя тип sharedctypes используя typecode
c_double(2.4) RawValue(c_double, 2.4) RawValue(„d“, 2.4)
MyStruct(4, 6) RawValue(MyStruct, 4, 6)  
(c_short * 7)() RawArray(c_short, 7) RawArray(„h“, 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray(„i“, (9, 2, 8))

Ниже приведен пример, в котором несколько объектов ctypes изменяются дочерним процессом:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Распечатанные результаты:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Менеджеры

Менеджеры предоставляют способ создания данных, которые могут совместно использоваться разными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект-менеджер управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.

multiprocessing.Manager()

Возвращает запущенный объект SyncManager, который может использоваться для разделения объектов между процессами. Возвращенный объект-менеджер соответствует порожденному дочернему процессу и содержит методы, которые будут создавать общие объекты и возвращать соответствующие прокси.

Управляющие процессы будут завершены, как только они будут собраны сборщиком мусора или их родительский процесс завершится. Классы менеджеров определены в модуле multiprocessing.managers :

class multiprocessing.managers.BaseManager([address[, authkey]])

Создать объект BaseManager.

После создания необходимо вызвать start() или get_server().serve_forever(), чтобы убедиться, что объект-менеджер ссылается на запущенный процесс-менеджер.

address — это адрес, по которому процесс-менеджер прослушивает новые соединения. Если addressNone, то выбирается произвольный.

authkey — это ключ аутентификации, который будет использоваться для проверки действительности входящих соединений с серверным процессом. Если authkeyNone, то используется current_process().authkey. В противном случае используется authkey, и это должна быть строка байтов.

start([initializer[, initargs]])

Запустить подпроцесс для запуска менеджера. Если initializer не None, тогда подпроцесс вызовет initializer(*initargs) при запуске.

get_server()

Возвращает объект Server, который представляет фактический сервер под управлением менеджера. Объект Server поддерживает метод serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server дополнительно содержит атрибут address.

connect()

Подключить объект локального менеджера к процессу удаленного менеджера:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Остановить процесс, используемый менеджером. Это доступно только в том случае, если start() использовался для запуска серверного процесса.

Его можно вызывать несколько раз.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Метод класса, который можно использовать для регистрации типа или вызова с помощью класса менеджера.

typeid — это «идентификатор типа», который используется для идентификации определенного типа совместно используемого объекта. Это должна быть строка.

callable — это вызываемый объект, используемый для создания объектов для этого идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода connect() или если аргумент create_method содержит значение False, то его можно оставить как None.

proxytype является подклассом BaseProxy, который используется для создания прокси для общих объектов с этим typeid. Если None, тогда прокси-класс создается автоматически.

exposed используется для указания последовательности имён методов, к которым прокси для этого typeid должны иметь доступ с помощью BaseProxy._callmethod(). (Если exposedNone, то вместо него используется proxytype._exposed_, если он существует.) в случае, если не указан открытый список, будут доступны все «общедоступные методы» общего объекта. (Здесь «общедоступный метод» означает любой атрибут, который содержит метод __call__() и имя которого не начинается с '_'.)

method_to_typeid — это отображение, используемое для указания типа возвращаемого значения тех открытых методов, которые должны возвращать прокси. Он сопоставляет имена методов с типизированными строками. (Если method_to_typeid — это None, тогда вместо него используется proxytype._method_to_typeid_, если он существует.) Если имя метода не является ключом этого сопоставления или если сопоставление — None, то объект, возвращаемый методом, будет скопирован по значению.

create_method определяет, следует ли создавать метод с именем typeid, которое можно использовать, чтобы указать серверному процессу создать новый общий объект и вернуть для него прокси. По умолчанию это True.

Экземпляры BaseManager также имеют одно свойство только для чтения:

address

Адрес, используемый менеджером.

Изменено в версии 3.3: Управляющие объекты поддерживают протокол управления контекстом, см. Типы менеджера контекста. __enter__() запускает серверный процесс (если он еще не запущен), а затем возвращает объект- менеджер. __exit__() вызывает shutdown().

В предыдущих версиях __enter__() не запускал серверный процесс менеджера, если он ещё не был запущен.

class multiprocessing.managers.SyncManager

Подкласс BaseManager, который может использоваться для синхронизации процессов. Объекты этого типа возвращаются multiprocessing.Manager().

Его методы создают и возвращают Прокси объекты для ряда часто используемых типов данных для синхронизации между процессами. Это, в частности, включает общие списки и словари.

Barrier(parties[, action[, timeout]])

Создать общий объект threading.Barrier и вернуть для него прокси.

Добавлено в версии 3.3.

BoundedSemaphore([value])

Создать общий объект threading.BoundedSemaphore и вернуть для него прокси.

Condition([lock])

Создать общий объект threading.Condition и вернуть для него прокси.

Если предоставляется lock, он должен быть прокси для объекта threading.Lock или threading.RLock.

Изменено в версии 3.3: Добавлен метод wait_for().

Event()

Создать общий объект threading.Event и вернуть для него прокси.

Lock()

Создать общий объект threading.Lock и вернуть для него прокси.

Namespace()

Создать общий объект Namespace и вернуть для него прокси.

Queue([maxsize])

Создать общий объект queue.Queue и вернуть для него прокси.

RLock()

Создать общий объект threading.RLock и вернуть для него прокси.

Semaphore([value])

Создать общий объект threading.Semaphore и вернуть для него прокси.

Array(typecode, sequence)

Создать массив и вернуть для него прокси.

Value(typecode, value)

Создать объект с доступным для записи атрибутом value и вернуть для него прокси.

dict()
dict(mapping)
dict(sequence)

Создать общий объект dict и вернуть для него прокси.

list()
list(sequence)

Создать общий объект list и вернуть для него прокси.

Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться SyncManager.

class multiprocessing.managers.Namespace

Тип, который может регистрироваться с помощью SyncManager.

Объект пространства имён не содержит общедоступных методов, но содержит атрибуты с возможностью записи. Его представление показывает значения его атрибутов.

Однако при использовании прокси для объекта пространства имён атрибут, начинающийся с '_', будет атрибутом прокси, а не атрибутом референта :

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # это атрибут прокси
>>> print(Global)
Namespace(x=10, y='hello')

Настраиваемые менеджеры

Чтобы создать своего собственного менеджера, нужно создать подкласс BaseManager и использовать метод класса register() для регистрации новых типов или вызываемых объектов в классе менеджера. Например:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # распечатает 7
        print(maths.mul(7, 8))         # распечатает 56

Использование удаленного менеджера

Можно запустить управляющий сервер на одной машине, а клиенты будут использовать его с других машин (при условии, что задействованные брандмауэры это позволяют).

Выполнение следующих команд создаёт сервер для единой общей очереди, к которой могут получить доступ удаленные клиенты:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Его также может использовать другой клиент:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Локальные процессы также могут обращаться к этой очереди, используя указанный выше код на клиенте для удаленного доступа к ней:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Прокси объекты

Прокси — это объект, который ссылается на общий объект, который (предположительно) живет в другом процессе. Общий объект называется прокси ссылкой. У нескольких прокси-объектов может быть одна и та же ссылка.

Прокси-объект содержит методы, которые вызывают соответствующие методы его ссылки (хотя не каждый метод ссылки обязательно будет доступен через прокси). Таким образом, прокси можно использовать так же, как и его ссылку :

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Обратите внимание, что применение str() к прокси вернёт представление ссылки, тогда как применение repr() вернёт представление прокси.

Важной особенностью прокси-объектов является то, что их можно пиклить, поэтому их можно передавать между процессами. Таким образом, ссылка может содержать Прокси объекты. Это разрешает вложение этих управляемых списков, dict’ов и прочего Прокси объекты :

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # ссылка a теперь содержит ссылку b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Аналогичным образом, прокси dict и list могут быть вложены друг в друга:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Если стандартные (не прокси) объекты list или dict содержатся в ссылке, изменения этих изменяемых значений не будут распространяться через диспетчер, поскольку прокси не содержит возможности узнать, когда значения, содержащиеся в нём, изменены. Однако сохранение значения в прокси контейнера (которое вызывает __setitem__ на объекте-прокси) распространяется через диспетчер, и поэтому для эффективного изменения такого элемента можно повторно назначить измененное значение прокси контейнера:

# создать список прокси и добавить изменяемый объект (словарь)
lproxy = manager.list()
lproxy.append({})
# теперь изменяемый словарь
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# на этом этапе изменения d еще не синхронизируются, но при обновлении словаря
# прокси уведомляется об изменении
lproxy[0] = d

Этот подход, возможно, менее удобен, чем использование вложенного Прокси объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.

Примечание

Типы прокси в multiprocessing не поддерживают сравнения по значению. Так, например, у нас есть:

>>> manager.list([1,2,3]) == [1,2,3]
False

Вместо этого следует использовать копию ссылки при проведении сравнений.

class multiprocessing.managers.BaseProxy

Прокси-объекты являются экземплярами подклассов BaseProxy.

_callmethod(methodname[, args[, kwds]])

Вызов и возврат результата метода ссылки прокси.

Если proxy — это прокси, ссылка которого — obj, тогда выражение:

proxy._callmethod(methodname, args, kwds)

Вычислит выражение:

getattr(obj, methodname)(*args, **kwds)

в процессе менеджера.

Возвращаемое значение будет копией результата вызова или прокси для нового общего объекта, см. документацию для аргумента method_to_typeid BaseManager.register().

Если при вызове возникает исключение, оно повторно вызывается _callmethod(). Если в процессе менеджера возникает какое-либо другое исключение, оно преобразуется в исключение RemoteError и вызывается _callmethod().

Обратите внимание, что исключение будет вызвано, если methodname не был exposed.

Пример использования _callmethod() :

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # эквивалентно l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # эквивалентно l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Вернуть копию ссылки.

Если ссылку невозможно выбрать, вызовет исключение.

__repr__()

Возвращает представление прокси-объекта.

__str__()

Возвращает представление ссылки.

Очистка

Прокси-объект использует обратный вызов weakref, поэтому, когда он получает сборщик мусора, он отменяет регистрацию у менеджера, которому принадлежит его ссылка.

Общий объект удаляется из процесса менеджера, когда на него больше не ссылаются прокси.

Пулы процессов

Можно создать пул процессов, которые будут выполнять задачи, переданные ему с помощью класса Pool.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Он поддерживает асинхронные результаты с таймаутами и обратными вызовами и содержит реализацию параллельного отображения.

processes — количество используемых рабочих процессов. Если processes - None, то используется номер, возвращенный os.cpu_count().

Если initializer не None, то каждый рабочий процесс при запуске будет вызывать initializer(*initargs).

maxtasksperchild — это количество задач, которые рабочий процесс может выполнить до того, как он завершится и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию maxtasksperchild - None, что означает, что рабочие процессы будут жить столько же, сколько и пул.

context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции multiprocessing.Pool() или метода Pool() объекта контекста. В обоих случаях context установлен соответствующим образом.

Обратите внимание, что методы объекта пула должны вызываться только процессом, создавшим пул.

Предупреждение

Объекты multiprocessing.pool имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любой другой ресурс), используя пул в качестве диспетчера контекста или вызывая close() и terminate() вручную. Невыполнение этого требования может привести к зависанию процесса при завершении.

Обратите внимание, что это неправильно, чтобы полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что будет вызван финализатор пула (см. object.__del__() для получения дополнительной информации).

Добавлено в версии 3.2: maxtasksperchild

Добавлено в версии 3.4: context

Примечание

Рабочие процессы в Pool обычно живут в течение всего времени очереди работ пула. Часто используемый в других системах (например, Apache, mod_wsgi и т. д.) шаблон для освобождения ресурсов, удерживаемых воркерами, заключается в том, чтобы позволить воркеру в пуле выполнить только заданный объем работы перед выходом, очисткой и созданием нового процесса, заменив старый. Аргумент maxtasksperchild для Pool предоставляет эту возможность конечному пользователю.

apply(func[, args[, kwds]])

Вызов func с аргументами args и ключевыми аргументами kwds. Блокирует, пока не будет готов результат. Учитывая эти блоки, apply_async() лучше подходит для выполнения работы параллельно. Кроме того, func выполняется только в одном из рабочих пула.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Вариант метода apply(), который возвращает объект AsyncResult.

Если указан callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачно, и в этом случае вместо него применяется error_callback.

Если указан error_callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция терпит неудачу, то вызывается error_callback с экземпляром исключения.

Обратные вызовы должны выполняться немедленно, иначе поток, обрабатывающий результаты, будет заблокирован.

map(func, iterable[, chunksize])

Параллельный эквивалент встроенной функции map() (хотя он поддерживает только один аргумент iterable, для нескольких итераций см. starmap()). Блокирует, пока не будет готов результат.

Этот метод разбивает итерируемый объект на несколько фрагментов, которые он отправляет в пул процессов как отдельные задачи. (Приблизительный) размер этих фрагментов можно указать, задав для chunksize положительное целое число.

Обратите внимание, что это может привести к высокому использованию памяти для очень длинных итераций. Для большей эффективности рассмотрите возможность использования imap() или imap_unordered() с явной опцией chunksize.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Вариант метода map(), который возвращает объект AsyncResult.

Если указан callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачно, и в этом случае вместо него применяется error_callback.

Если указан error_callback, то это должен быть вызываемый объект, принимающий единственный аргумент. Если целевая функция терпит неудачу, то вызывается error_callback с экземпляром исключения.

Обратные вызовы должны выполняться немедленно, иначе поток, обрабатывающий результаты, будет заблокирован.

imap(func, iterable[, chunksize])

Более ленивая версия map().

Аргумент chunksize совпадает с аргументом, используемым методом map(). Для очень длинных итераций использование большого значения chunksize может сильно ускорить выполнение задания, чем использование значения по умолчанию 1.

Также, если chunksize1, то метод next() итератора, возвращаемый методом imap(), содержит необязательный параметр timeout: next(timeout) поднимет multiprocessing.TimeoutError, если результат не может быть возвращён в течение timeout секунд.

imap_unordered(func, iterable[, chunksize])

То же, что и imap(), за исключением того, что порядок результатов возвращаемого итератора следует считать произвольным. (Только когда есть только один рабочий процесс, порядок гарантированно будет «правильным».)

starmap(func, iterable[, chunksize])

Как map() за исключением того, что элементы iterable, как ожидается, будут итерируемы, которые распакованы как аргументы.

Следовательно iterable [(1,2), (3, 4)] приводит к [func(1,2), func(3,4)].

Добавлено в версии 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Комбинация starmap() и map_async(), которая выполняет итерацию по iterable итераций и вызывает func с распакованными итерациями. Возвращает объект результата.

Добавлено в версии 3.3.

close()

Предотвращает отправку каких-либо других задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся.

terminate()

Когда объект пула будет обработан сборщиком мусора, немедленно будет вызван terminate().

join()

Дождаться завершения рабочих процессов. Перед использованием join() необходимо позвонить в close() или terminate().

Добавлено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом, см. Типы менеджера контекста. __enter__() возвращает объект пула, а __exit__() вызывает terminate().

class multiprocessing.pool.AsyncResult

Класс результата, возвращаемого Pool.apply_async() и Pool.map_async().

get([timeout])

Вернуть результат, когда он появится. Если timeout не None и результат не приходит в течение timeout секунд, то возникает multiprocessing.TimeoutError. Если удаленный вызов вызвал исключение, это исключение будет повторно вызвано get().

wait([timeout])

Подождите, пока не будет доступен результат или пока не пройдет timeout секунд.

ready()

Вернуть, был ли вызов завершён.

successful()

Возвращает, был ли вызов завершён без создания исключения. Поднимет ValueError, если результат не готов.

Изменено в версии 3.7: Если результат не готов, то вместо AssertionError вызывается ValueError.

В следующем примере демонстрируется использование пула:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # запустить 4 процесса воркера
        result = pool.apply_async(f, (10,)) # асинхронно вычислить "f(10)" в одном процессе
        print(result.get(timeout=1))        # печатает "100", если только ваш компьютер не работает *очень* медленно

        print(pool.map(f, range(10)))       # печать "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # печать "0"
        print(next(it))                     # печать "1"
        print(it.next(timeout=1))           # печать "4" если ваш компьютер не *очень* медленный

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # поднять multiprocessing.TimeoutError

Слушатели и клиенты

Обычно передача сообщений между процессами осуществляется с помощью очередей или объектов Connection, возвращаемых Pipe().

Однако модуль multiprocessing.connection допускает некоторую дополнительную гибкость. Он в основном предоставляет API высокого уровня, ориентированный на сообщения, для работы с сокетами или именованными конвейерами Windows. Он также поддерживает дайджест-аутентификацию с использованием модуля hmac и для одновременного опроса нескольких подключений.

multiprocessing.connection.deliver_challenge(connection, authkey)

Отправьте случайно сгенерированное сообщение на другой конец соединения и дождаться ответа.

Если ответ совпадает с дайджестом сообщения с использованием authkey в качестве ключа, то на другой конец соединения отправляется приветственное сообщение. В противном случае повышается AuthenticationError.

multiprocessing.connection.answer_challenge(connection, authkey)

Получите сообщение, вычислите дайджест сообщения, используя authkey в качестве ключа, а затем отправить дайджест обратно.

Если приветственное сообщение не получено, выводится AuthenticationError.

multiprocessing.connection.Client(address[, family[, authkey]])

Попытаться установить соединение со слушателем, который использует адрес address, возвращая Connection.

Тип соединения определяется аргументом family, но, как правило, его можно пропустить, поскольку его обычно можно вывести из формата address. (См. Форматы адресов)

Если задан authkey, а не None, это должна быть байтовая строка, которая будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Аутентификация не выполняется, если authkey — нет. AuthenticationError возникает в случае сбоя аутентификации. См. Ключи аутентификации.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Обёртка для связанного сокета или именованного конвейера Windows, который «прослушивает» соединения.

address — это адрес, который будет использоваться привязанным сокетом или именованным конвейером объекта слушателя.

Примечание

Если используется адрес 0.0.0.0, этот адрес не будет конечной точкой для подключения в Windows. Если вам нужна подключаемая конечная точка, вы должны использовать 127.0.0.1.

family — это тип используемого сокета (или именованного конвейера). Это может быть одна из строк 'AF_INET' (для сокета TCP), 'AF_UNIX' (для сокета домена Unix) или 'AF_PIPE' (для именованного конвейера Windows). Из них гарантированно будет доступен только первый. Если familyNone, то семейство выводится из формата address. Если address также является None, то выбирается значение по умолчанию. По умолчанию это семейство, которое считается самым быстрым из доступных. См. Форматы адресов. Обратите внимание, что если family - 'AF_UNIX', а адрес — None, то сокет будет создан в частном временном каталоге, созданном с использованием tempfile.mkstemp().

Если объект прослушивателя использует сокет, то backlog (1 по умолчанию) передается методу listen() сокета после его привязки.

Если задан authkey, а не None, это должна быть байтовая строка, которая будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Аутентификация не выполняется, если authkey — нет. AuthenticationError возникает в случае сбоя аутентификации. См. Ключи аутентификации.

accept()

Принять соединение с привязанным сокетом или именованным каналом объекта слушателя и вернуть объект Connection. Если попытка аутентификации не удалась, возникает AuthenticationError.

close()

Закрыть связанный сокет или именованный конвейер объекта-слушателя. Он вызывается автоматически, когда слушатель собирает мусор. Однако желательно вызывать его явно.

Объекты-слушатели содержат следующие свойства, доступные только для чтения:

address

Адрес, который используется объектом Listener.

last_accepted

Адрес, с которого пришло последнее принятое соединение. Если он недоступен, то это None.

Добавлено в версии 3.3: Объекты-слушатели теперь поддерживают протокол управления контекстом, см. Типы менеджера контекста. __enter__() возвращает объект слушателя, а __exit__() вызывает close().

multiprocessing.connection.wait(object_list, timeout=None)

Дождаться готовности объекта в object_list. Возвращает список готовых объектов в object_list. Если timeout — это число с плавающей запятой, то вызов блокируется максимум на столько секунд. Если timeoutNone, то он будет заблокирован на неограниченный период. Отрицательный тайм-аут эквивалентен нулевому таймауту.

И для Unix, и для Windows объект может отображаться в object_list, если это так

Соединение или объект сокета готовы, когда есть данные, доступные для чтения из него, или когда другой конец был закрыт.

Unix: wait(object_list, timeout) почти эквивалентен select.select(object_list, [], [], timeout). Разница в том, что если select.select() прерывается сигналом, он может вызвать OSError с номером ошибки EINTR, а wait() — нет.

Windows: элемент в object_list должен быть либо целочисленным дескриптором, который является ожидаемым (согласно определению, используемым в документации функции Win32 WaitForMultipleObjects()), либо это может быть объект с методом fileno(), который возвращает дескриптор сокета, либо ручка трубы. (Обратите внимание, что ручки конвейеров и сокетов не являются ожидаемыми дескрипторами.)

Добавлено в версии 3.3.

Примеры

Следующий код сервера создает слушателя, который использует 'secret password' в качестве ключа аутентификации. Затем он ожидает подключения и отправляет некоторые данные клиенту:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # семейство выведено как 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Следующий код подключается к серверу и получает некоторые данные с сервера:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Следующий код использует wait() для ожидания сообщений от нескольких процессов одновременно:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # Мы закрываем записываемый конец конвейера, чтобы убедиться, что p является
        # единственным процессом, который владеет дескриптором для него. Это гарантирует,
        # что когда p закроет свой дескриптор для записываемого конца, wait()
        # быстро сообщит о читаемом конце как о прочитанном.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Форматы адресов

  • Адрес 'AF_INET' — это кортеж вида (hostname, port), где hostname — строка, а port — целое число.
  • Адрес 'AF_UNIX' — это строка, представляющая имя файла в файловой системе.
  • Адрес 'AF_PIPE' представляет собой строку вида r'\\.\pipe{PipeName}'. Чтобы использовать Client() для подключения к именованному каналу на удаленном компьютере с именем ServerName, вместо этого следует использовать адрес в форме r'\ServerName\pipe{PipeName}'.

Обратите внимание, что любая строка, начинающаяся с двух обратных косых черт, по умолчанию считается адресом 'AF_PIPE', а не адресом 'AF_UNIX'.

Ключи аутентификации

При использовании Connection.recv полученные данные автоматически не выбираются. К сожалению, получение данных из ненадежного источника представляет собой угрозу безопасности. Поэтому Listener и Client() используют модуль hmac для обеспечения дайджест- аутентификации.

Ключ аутентификации — это строка байтов, которую можно рассматривать как пароль: после установления соединения оба конца потребуют доказательства того, что другой конец знает ключ аутентификации. (Демонстрация того, что оба конца используют один и тот же ключ, подразумевает ли не отправку ключа по соединению.

Если аутентификация запрошена, но ключ аутентификации не указан, используется возвращаемое значение current_process().authkey (см. Process). Это значение будет автоматически унаследовано любым объектом Process, созданным текущим процессом. Это означает, что (по умолчанию) все процессы многопроцессорной программы будут использовать один ключ аутентификации, который можно использовать при установке соединений между собой.

Подходящие ключи аутентификации также могут быть сгенерированы с помощью os.urandom().

Логирование

Доступна некоторая поддержка ведения журнала. Однако обратите внимание, что пакет logging не использует разделяемые блокировки процессов, поэтому (в зависимости от типа обработчика) сообщения от разных процессов могут смешиваться.

multiprocessing.get_logger()

Возвращает регистратор, используемый multiprocessing. При необходимости будет создан новый.

При первом создании логгер содержит уровень logging.NOTSET и не содержит обработчика по умолчанию. Сообщения, отправленные этому логгеру, по умолчанию не передаются корневому логгеру.

Обратите внимание, что в Windows дочерние процессы наследуют только уровень логгера родительского процесса — любые другие настройки логгера не будут унаследованы.

multiprocessing.log_to_stderr()

Эта функция выполняет вызов get_logger(), но помимо возврата логгера, созданного get_logger, добавляет обработчик, который отправляет вывод в sys.stderr в формате '[%(levelname)s/%(processName)s] %(message)s'.

Ниже приведен пример сеанса с включенным ведением журнала:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Полную таблицу уровней ведения журнала см. В модуле logging.

Модуль multiprocessing.dummy

multiprocessing.dummy копирует API multiprocessing, но является не более чем обёрткой для модуля threading.

В частности, функция Pool, предоставляемая multiprocessing.dummy, возвращает экземпляр ThreadPool, который является подклассом Pool, который поддерживает все те же вызовы методов, но использует пул рабочих потоков, а не рабочих процессов.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Экземпляры ThreadPool полностью совместимы по интерфейсу с экземплярами Pool, и их ресурсами также необходимо правильно управлять, используя пул в качестве диспетчера контекста или вручную вызывая close() и terminate().

processes — количество используемых рабочих потоков. Если processesNone, то используется номер, возвращаемый os.cpu_count().

Если initializer не None, то каждый рабочий процесс при запуске будет вызывать initializer(*initargs).

В отличие от Pool, maxtasksperchild и context не могут быть предоставлены.

Примечание

ThreadPool использует тот же интерфейс, что и Pool, который разработан на основе пула процессов и предшествует введению модуля concurrent.futures. Таким образом, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и содержит свой собственный тип для представления состояния асинхронных заданий, AsyncResult, который не понимается никакими другими библиотеками.

Обычно пользователи предпочитают использовать concurrent.futures.ThreadPoolExecutor, который содержит более простой интерфейс, изначально спроектированный вокруг потоков, и который возвращает экземпляры concurrent.futures.Future, совместимые со многими другими библиотеками, включая asyncio.

Руководство по программированию

Существуют определенные правила и идиомы, которых следует придерживаться при использовании multiprocessing.

Все методы запуска

Следующее относится ко всем методам запуска.

Избегайте разделяемого состояния

По возможности следует избегать перемещения больших объемов данных между процессами.

Вероятно, лучше придерживаться использования очередей или конвейеров для связи между процессами, а не примитивов синхронизации более низкого уровня.

Пикленгуемость

Убедитесь, что аргументы методов прокси-серверов пиклингуемые.

Поточная безопасность прокси

Не используйте прокси-объект из более чем одного потока, если вы не защитите его блокировкой.

Присоединение к процессам зомби

В Unix, когда процесс завершается, но не присоединяется к нему, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда запускается новый процесс (или вызывается active_children()), все завершенные процессы, которые еще не были присоединены, будут присоединены. Также вызов завершенного процесса Process.is_alive присоединится к процессу. Тем не менее, вероятно, хорошей практикой является явное присоединение ко всем процессам, которые вы запускаете.

Лучше наследовать, чем пиклить/анпиклить

При использовании методов запуска spawn или forkserver многие типы из multiprocessing должны быть выбираемыми, чтобы дочерние процессы могли их использовать. Однако обычно следует избегать отправки общих объектов другим процессам с использованием конвейеров или очередей. Вместо этого вы должны организовать программу так, чтобы процесс, которому требуется доступ к совместно используемому ресурсу, созданному где-то ещё, мог наследовать его от процесса-предка.

Избегайте завершения процессов

Использование метода Process.terminate для остановки процесса может привести к тому, что любые общие ресурсы (такие как блокировки, семафоры, конвейеры и очереди), в настоящее время используемые процессом, станут сломанными или недоступными для других процессов.

Присоединение к процессам, использующим очереди

Имейте в виду, что процесс, который поместил элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком-фидером в нижележащий канал. (Дочерний процесс может вызвать метод очереди Queue.cancel_join_thread, чтобы избежать такого поведения.

Это означает, что всякий раз, когда вы используете очередь, вам нужно убедиться, что все элементы, которые были помещены в очередь, в конечном итоге будут удалены до присоединения к процессу. В противном случае вы не можете быть уверены, что процессы, поместившие элементы в очередь, завершатся. Помните также, что недемонические процессы будут подключаться автоматически.

Примером взаимоблокировки является следующий код:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # это мертвая блокировка
    obj = queue.get()

Исправление заключается в том, чтобы поменять местами последние две строки (или просто удалить строку p.join()).

Явная передача ресурсов дочерним процессам

В Unix, использующем метод запуска fork, дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Однако лучше передать объект в качестве аргумента конструктору дочернего процесса.

Помимо обеспечения совместимости кода (потенциально) с Windows и другими методами запуска, это также гарантирует, что, пока дочерний процесс все еще жив, объект не будет собираться сборщиком мусора в родительском процессе. Это может быть важно, если какой-то ресурс освобождается при сборке мусора в родительском процессе.

Таким образом для сущность:

from multiprocessing import Process, Lock

def f():
    """сделать что-то используя 'lock'"""

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

следует переписать как:

from multiprocessing import Process, Lock

def f(l):
    """ сделать что-нибудь, используя 'l' """

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Остерегайтесь замены sys.stdin на «файл как объект».

multiprocessing, изначально вызываемый безоговорочно:

os.close(sys.stdin.fileno())

в методе multiprocessing.Process._bootstrap() — это привело к проблемам с процессами в процессах. Это было изменено на:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Он решает фундаментальную проблему столкновения процессов друг с другом, что приводит к ошибке неверного файлового дескриптора, но представляет потенциальную опасность для приложений, которые заменяют sys.stdin() на «файловый объект» с буферизацией вывода. Эта опасность заключается в том, что если несколько процессов вызовут close() для этого файлового объекта, это может привести к тому, что одни и те же данные будут сброшены в объект несколько раз, что приведет к повреждению.

Если вы пишете объект в виде файла и реализуете собственное кэширование, вы можете сделать его безопасным для вилки, сохраняя pid всякий раз, когда вы добавляете в кеш, и отбрасывая кеш при изменении pid. Например:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Для получения дополнительной информации см. bpo-5155, bpo-5313 и bpo-5331

Методы запуска spawn и forkserver

Есть несколько дополнительных ограничений, не относящихся к методу запуска fork.

Больше пикленгуемости

Убедитесь, что все аргументы Process.__init__() можно пиклинговать. Кроме того, если вы создаете подкласс Process, убедитесь, что экземпляры будут пиклингуемыми при вызове метода Process.start.

Глобальные переменные

Имейте в виду, что если код, выполняемый в дочернем процессе, пытается получить доступ к глобальной переменной, то значение, которое он видит (если оно есть), может не совпадать со значением в родительском процессе во время вызова Process.start .

Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают проблем.

Безопасный импорт основного модуля

Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая нежелательных побочных эффектов (таких как запуск нового процесса).

Например, при использовании метода запуска spawn или forkserver следующий модуль завершится ошибкой с кодом RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Вместо этого следует защитить «точку входа» программы с помощью if __name__ == '__main__': следующим образом:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Строку freeze_support() можно не указывать, если программа будет запускаться в обычном режиме, а не в замороженном состоянии.)

Это позволяет новому интерпретатору Python безопасно импортировать модуль, а затем запускать функцию модуля foo().

Аналогичные ограничения применяются, если пул или менеджер создается в основном модуле.

Примеры

Демонстрация создания и использования настраиваемых менеджеров и прокси:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# Простая функция генератора
def baz():
    for i in range(10):
        yield i*i

# Тип прокси для объектов-генераторов
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Функция для возврата модуля operator
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# зарегистрировать класс Foo; сделать `f()` и `g()` доступными через прокси
MyManager.register('Foo1', Foo)

# зарегистрировать класс Foo; сделать `g()` and `_h() `доступным через прокси
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# регистрируем генераторную функцию baz; используется `GeneratorProxy` для создания
# прокси
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# зарегистрировать get_operator_module(); сделать общедоступные функции
# доступными через прокси
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Использование Pool:

import multiprocessing
import time
import random
import sys

#
# Функции, используемые тестовым кодом
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Тестовый код
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Тестовы
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Обработка ошибок теста
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Тестирование таймаутов
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Пример использования очередей для подачи задач в коллекцию рабочих процессов и сбора результатов: