您的位置:  首页 » Python » – 基于进程的并行性 – 并发执行(Python教程)(参考资料)

multiprocessing– 基于流程的并行性

源代码:LIB /多


介绍

multiprocessing是一个使用类似于的API支持生成进程的包threading模块。multiprocessingpackage提供本地和远程并发,通过使用子进程而不是线程有效地侧面执行全局解释器锁。Dueto,multiprocessing模块允许程序员在给定的机器上充分利用多个处理器。它可以在Unix和Windows上运行.

multiprocessing模块还介绍了中没有注册的APIthreading模块。一个很好的例子就是Pool这个对象提供了一种方便的方法,可以跨多个输入值并行执行函数,在进程间分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,即子进程可以成功导入该模块。这个使用Pool,

from multiprocessing import Pooldef f(x):    return x*xif __name__ == "__main__":    with Pool(5) as p:        print(p.map(f, [1, 2, 3]))

将打印到标准输出

[1, 4, 9]

Process

multiprocessing,通过创建Process对象,然后调用它start()方法。Process遵循threading.Thread。一个简单的amultiprocess程序的例子是

from multiprocessing import Processdef f(name):    print("hello", name)if __name__ == "__main__":    p = Process(target=f, args=("bob",))    p.start()    p.join()

为了显示所涉及的各个进程ID,这里有一个扩展的例子:

from multiprocessing import Processimport osdef info(title):    print(title)    print("module name:", __name__)    print("parent process:", os.getppid())    print("process id:", os.getpid())def f(name):    info("function f")    print("hello", name)if __name__ == "__main__":    info("main line")    p = Process(target=f, args=("bob",))    p.start()    p.join()

为了解释为什么if __name__ == "__main__"部分是必要的,请参阅编程指南.

上下文和启动方法

根据平台,multiprocessing支持三种启动流程的方法。这些start methods

spawn

进程启动一个新的python解释器进程。子进程只会继承那些破坏进程对象run()方法所需的资源。特别是,不会继承父进程中不必要的文件描述符和句柄。与使用forkforkserver.

相比,使用此方法启动进程的速度较慢,可在Unix和Windows上使用。Windows上的默认值

fork

父进程使用os.fork()来分叉Python解释器。子进程在开始时与父进程有效地相似。父进程的所有资源都由子进程继承。请注意,安全分叉amultithreaded进程是有问题的.

仅在Unix上可用。Unix上的默认值

forkserver

当程序启动并选择forkserver启动方法,启动服务器进程。从那时起,无论何时需要新进程,父进程都会连接到服务器并请求它分叉一个新进程。fork服务器进程是单线程的,所以使用os.fork()。Nounnecessary资源是继承的.

在Unix平台上可用,支持在Unix管道上传递文件描述符.

版本3.4更改:spawn添加到所有unix平台上,并且forkserver添加了很多unix平台.Child进程不再继承Windows上所有父类的继承句柄.

在Unix上使用spawnforkserver启动方法也会启动semaphore tracker跟踪由程序进程创建的未链接的命名信号的进程。当所有进程都退出时,信号量跟踪器取消链接任何剩余的信号量。通常应该没有,但是如果某个进程被信号杀死,那么可能会有一些“泄露”的信号量。(取消链接命名的semaphoresis是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前它们不会自动取消链接。)

要选择启动方法,请使用主模块的set_start_method()if __name__ == "__main__"子句中。例如:

import multiprocessing as mpdef foo(q):    q.put("hello")if __name__ == "__main__":    mp.set_start_method("spawn")    q = mp.Queue()    p = mp.Process(target=foo, args=(q,))    p.start()    print(q.get())    p.join()

set_start_method()在程序中不应该多次使用.

或者,你可以使用get_context()获得一个contextobject。Context对象与multiprocessingmodule具有相同的API,并且允许在同一程序中使用多个start方法.

import multiprocessing as mpdef foo(q):    q.put("hello")if __name__ == "__main__":    ctx = mp.get_context("spawn")    q = ctx.Queue()    p = ctx.Process(target=foo, args=(q,))    p.start()    print(q.get())    p.join()

请注意,与一个上下文相关的对象可能与不同上下文的过程不兼容。特别是使用fork上下文不能传递给使用spawnforkserver启动方法启动的进程

想要使用特定启动方法的库应该使用get_context()避免干扰图书馆用户的选择.

警告

"spawn""forkserver"start方法目前不能与“冻结”的可执行文件一起使用(即,像PyInstaller cx_Freeze )在Unix上."fork"启动方法确实有效.

在进程之间交换对象

multiprocessing支持进程之间的两种通信通道:

队列

Queue类是queue.Queue的近似克隆。例如:

from multiprocessing import Process, Queuedef f(q):    q.put([42, None, "hello"])if __name__ == "__main__":    q = Queue()    p = Process(target=f, args=(q,))    p.start()    print(q.get())    # prints "[42, None, "hello"]"    p.join()

队列是线程和进程安全的.

管道

Pipe()函数返回一对由apipe连接的连接对象,默认为双工(双向)。例如:

from multiprocessing import Process, Pipedef f(conn):    conn.send([42, None, "hello"])    conn.close()if __name__ == "__main__":    parent_conn, child_conn = Pipe()    p = Process(target=f, args=(child_conn,))    p.start()    print(parent_conn.recv())   # prints "[42, None, "hello"]"    p.join()

返回的两个连接对象Pipe()代表管道的两端。每个连接对象都有send()recv()方法(等等)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的same末端,则管道中的数据可能会损坏。当然,同时使用管道不同端的进程不存在损坏的风险.

进程之间的同步

multiprocessing包含来自threading的所有同步原语的等价物。例如,一个人可以使用一个锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lockdef f(l, i):    l.acquire()    try:        print("hello world", i)    finally:        l.release()if __name__ == "__main__":    lock = Lock()    for num in range(10):        Process(target=f, args=(lock, num)).start()

不使用来自不同进程的锁定输出可能会混淆掉

共享状态进程之间

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。当使用多个进程时尤其如此.

但是,如果你真的需要使用一些共享数据,那么multiprocessing提供了几种方法.

共享内存

数据可以使用Value要么Array。例如,以下代码

from multiprocessing import Process, Value, Arraydef f(n, a):    n.value = 3.1415927    for i in range(len(a)):        a[i] = -a[i]if __name__ == "__main__":    num = Value("d", 0.0)    arr = Array("i", range(10))    p = Process(target=f, args=(num, arr))    p.start()    p.join()    print(num.value)    print(arr[:])

会打印

3.1415927[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

"d""i"创建时使用的参数numarr使用的那种类型array模块:"d"表示双精度浮点数和"i"表示有符号整数。这些共享对象将是进程和thread-safe.

为了更灵活地使用共享内存,可以使用multiprocessing.sharedctypes支持创建从共享内存分配的任意ctypes对象的模块.

Server process

Manager()控制一个服务器进程,该进程保留Python对象并允许其他进程使用proxies来操作它们.

一位经理Manager()将支持类型list, dict, Namespace, Lock,RLock, Semaphore, BoundedSemaphore,Condition, Event, Barrier,Queue, ValueArray。例如,

from multiprocessing import Process, Managerdef f(d, l):    d[1] = "1"    d["2"] = 2    d[0.25] = None    l.reverse()if __name__ == "__main__":    with Manager() as manager:        d = manager.dict()        l = manager.list(range(10))        p = Process(target=f, args=(d, l))        p.start()        p.join()        print(d)        print(l)

会打印

{0.25: None, 1: "1", "2": 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存要慢.

使用工作池

Pool类表示一个workerprocesses池。它有一些方法允许以几种不同的方式将任务卸载到workerprocesses .

例如:

from multiprocessing import Pool, TimeoutErrorimport timeimport osdef f(x):    return x*xif __name__ == "__main__":    # start 4 worker processes    with Pool(processes=4) as pool:        # print "[0, 1, 4,..., 81]"        print(pool.map(f, range(10)))        # print same numbers in arbitrary order        for i in pool.imap_unordered(f, range(10)):            print(i)        # evaluate "f(20)" asynchronously        res = pool.apply_async(f, (20,))      # runs in *only* one process        print(res.get(timeout=1))             # prints "400"        # evaluate "os.getpid()" asynchronously        res = pool.apply_async(os.getpid, ()) # runs in *only* one process        print(res.get(timeout=1))             # prints the PID of that process        # launching multiple evaluations asynchronously *may* use more processes        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]        print([res.get(timeout=1) for res in multiple_results])        # make a single worker sleep for 10 secs        res = pool.apply_async(time.sleep, (10,))        try:            print(res.get(timeout=1))        except TimeoutError:            print("We lacked patience and got a multiprocessing.TimeoutError")        print("For the moment, the pool remains available for more work")    # exiting the "with"-block has stopped the pool    print("Now the pool is closed and no longer available")

请注意,池的方法只能由创建它的进程使用.

注意

这个包中的功能要求__main__模块由孩子们提供。这包含在编程指南中但是值得指出这里。这意味着一些例子,比如multiprocessing.pool.Pool示例在交互式解释器中不起作用。例如:

>>> from multiprocessing import Pool>>> p = Pool(5)>>> def f(x):...     return x*x...>>> p.map(f, [1,2,3])Process PoolWorker-1:Process PoolWorker-2:Process PoolWorker-3:Traceback (most recent call last):AttributeError: "module" object has no attribute "f"AttributeError: "module" object has no attribute "f"AttributeError: "module" object has no attribute "f"

(如果你试试这个,它实际上会以半随机的方式输出三个完整的回溯,然后你可能会以某种方式对主进程进行tostop。)

参考

multiprocessing包大多复制了threadingmodule.

Process和例外

class multiprocessing.Processgroup=None, target=None, name=None, args=(), kwargs={}, *, daemon=None

流程对象表示在单独的流程中运行的活动。该Processclass具有threading.Thread.

的所有方法的等价物。应始终使用关键字参数调用构造函数。group应该总是None;它只是为了兼容threading.Thread. targetrun()方法调用的可调用对象。它默认为None,这意味着什么都没有。name是进程名称(详见name).args是目标调用的参数元组。kwargs是目标调用的关键字参数的adictionary。如果提供,则仅关键字daemon参数设置进程daemon flagto True要么 False。如果None(默认值),此标志将从创建进程中继承.

默认情况下,没有参数传递给target.

如果子类重写构造函数,则必须确保它调用基类构造函数(Process.__init__())在做任何其他事情之前进行处理.

在版本3.3中更改:添加了daemon论据

run//(

表示进程活动的方法.

你可以在子类中覆盖这个方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从argskwargs参数中获取顺序和关键字参数.

start

启动进程的活动

每个进程对象最多必须调用一次。它安排对象的run()方法在一个单独的过程中调用.

join ( [timeout])

如果可选参数timeoutNone(默认值),则方法会阻塞到join()方法被调用终止。如果timeout是正数,它最多会阻塞timeout秒。注意方法返回None如果其进程终止或方法超时。检查进程exitcode确定是否终止了

一个过程可以连接多次.

进程无法加入自身,因为这会导致死锁。在启动进程之前尝试加入进程是错误的.

name

流程的名称。该名称是用于识别目的的字符串。它没有语义。多个进程可以给出相同的名称.

初始名称由构造函数设置。如果没有为构造函数提供明确的名称,则表示名称’Process-N 1 :N 2 :…:N k ‘是构造的,在N k 是其父母的第N个孩子.

is_alive()

返回该过程是否还活着.

粗略地说,从start()方法返回,直到子进程终止.

daemon

进程的守护进程标志,一个布尔值。这必须在调用start()之前设置.

初始值是从创建进程继承的.

当一个进程退出时,它会尝试终止所有的守护进程子进程.

请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时被终止,则守护进程会使其子进行孤立。另外,这些是 Unix守护进程或服务,如果非守护进程已经退出,它们是正常进程(并且不会被连接)。

另外到threading.Thread API,Process对象还支持以下属性和方法:

pid

返回进程ID。在生成该过程之前,这将是None.

exitcode

孩子的退出代码。如果这个过程还没有终止,那将是None。负值-N表示孩子被信号终止N.

authkey

进程的身份验证密钥(字节字符串).

初始化multiprocessing时,使用os.urandom().

为主进程分配一个随机字符串当创建一个Process对象时,它会继承其父进程的身份验证密钥,虽然这可以通过将authkey设置为另一个字节字符串来改变.

查看验证密钥.

sentinel

A系统对象的数字句柄,当进程结束时将变为“就绪”.

如果你想使用multiprocessing.connection.wait()等待几个事件,你可以使用这个值。否则调用join()更简单

在Windows上,这是一个OS句柄,可用于WaitForSingleObjectWaitForMultipleObjects API调用系列。在Unix上,这是一个文件描述符,可用于select模块中的原语

新版本3.3.

terminate ( )

终止流程。在Unix上,这是使用SIGTERM信号完成的;在Windows上使用TerminateProcess()。请注意,退出处理程序和最终条款等不会被执行.

注意进程的后代进程将not被终止 – 他们将成为孤儿.

警告

如果在关联进程使用管道队列时使用此方法,则管道或队列可能会损坏并可能被其他进程无法使用。类似地,如果进程已获取锁或信号量等,则终止它可能导致其他进程死锁.

kill

terminate()相同但在Unix上使用SIGKILL信号

新版本3.7.

close

关闭Process对象,释放与之关联的所有资源。ValueError如果基础进程仍在运行,则引发此异常。一次close()返回成功,Process对象意志ValueError.

版本3.7.

注意start(), join(), is_alive(),terminate()exitcode方法只能由创建过程对象的进程调用.

使用Process

>>> import multiprocessing, time, signal>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))>>> print(p, p.is_alive())<Process(Process-1, initial)> False>>> p.start()>>> print(p, p.is_alive())<Process(Process-1, started)> True>>> p.terminate()>>> time.sleep(0.1)>>> print(p, p.is_alive())<Process(Process-1, stopped[SIGTERM])> False>>> p.exitcode == -signal.SIGTERMTrue
exception multiprocessing.ProcessError

所有的基类multiprocessingexceptions.

exception multiprocessing.BufferTooShort

当// @//提供缓冲区对象太小而无法读取消息时Connection.recv_bytes_into()引发异常

如果eBufferTooShort的实例e.args[0]将消息作为字节串.

exception multiprocessing.AuthenticationError

出现身份验证错误时引发.

exception multiprocessing.TimeoutError

超时到期时超时的方法提示.

管道和队列

当使用多个进程时,通常使用消息传递来进行进程之间的通信,并避免必须使用任何同步原语,如locks.

对于传递消息,可以使用Pipe()(用于两个连接之间的连接)或队列(允许多个制作者和消费者).

Queue, SimpleQueueJoinableQueuetypes是多生产者,多用户FIFO 队列,在标准库的queue.Queue类上建模。它们的不同之处在于Queue缺少task_done()join()在Python 2.5中引入的方法queue.Queue class.

如果你使用JoinableQueue然后你必须来电话JoinableQueue.task_done()对于从队列中删除的每个任务,或者这些用于计算未完成任务数量的文件最终可能会溢出,引发异常.

注意也可以使用管理器对象创建共享队列 – 请参阅经理.

注意

multiprocessing使用通常的queue.Emptyqueue.Full例外来表示超时。它们在multiprocessing命名空间中不可用,所以你需要从queue.

导入它们

当一个对象被放入一个队列时,该对象被腌制并且后续的线程会被刷新腌制数据到底层管道。这有一些后果有点令人惊讶,但不应该造成任何实际困难 – 如果他们真的很烦你那么你可以改为使用一个用管理器.

  1. 创建的队列。一个空队列,在队列的empty()方法返回Falseget_nowait() canreturn而不抬起queue.Empty.
  2. 如果多个进程将对象排入队列,则可能无序地在另一端接收对象。但是,由相同进程排队的对象将始终处于相对于彼此的预期顺序中.

警告

如果在尝试使用Process.terminate()时使用os.kill()Queue,那么队列中的数据可能会被破坏。这可能导致任何其他进程在以后尝试使用队列时获得异常.

警告

如上所述,如果子进程已将项目放在队列中(并且它没有使用JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道.

这意味着如果您尝试加入该过程,您可能会遇到死锁,除非您确定已经消耗了已放入队列的所有项目。类似地,如果子进程是非守护进程,则父进程在尝试加入所有非守护进程的子进程时可能会在退出时挂起.

请注意,使用管理器创建的队列没有此问题。请参阅编程指南.

有关使用队列进行进程间通信的示例,请参阅示例.

multiprocessing.Pipe[duplex]

返回一对(conn1, conn2)Connection代表管子末端的物体

如果duplexTrue(默认值)然后管道是双向的。如果duplexFalse管道是单向的:conn1只能用于接收信息,而conn2只能用于发送信息.

class multiprocessing.Queue ( [maxsize]

返回使用管道和几个锁/信号量实现的进程共享队列。当一个进程首先将一个项目放入队列时,会启动一个feederthread,它将对象从缓冲区传输到管道中.

通常queue.Emptyqueue.Full来自标准库queue模块的异常被提升到信号超时

Queue执行queue.Queue的所有方法除外for task_done()join().

qsize

返回队列的大致大小。由于多线程处理/多处理语义,这个数字不可靠.

注意这可能会在Mac OS X等Unix平台上引起NotImplementedError sem_getvalue()没有实现.

empty ()

如果队列是空的,请回复True,否则False。由于多线程/多处理语义,这是不可靠的.

full ()

返回True如果队列已满,False否则。由于多线程/多处理语义,这是不可靠的.

put (obj [, block [, timeout]]

把obj放入队列。如果可选参数blockTrue(默认值)和timeoutNone(默认值),则必要时阻止untila空闲插槽可用。如果timeout是一个正数,如果nofree插槽在那段时间内可用,它最多会阻塞timeout秒并引发queue.Full异常。否则(blockFalse如果一个空闲插槽可以立即使用,将一个项目放在队列中,否则提起queue.Full异常(timeout在这种情况下被忽略).

put_nowaitobj

相当于put(obj, False).

get [block [, timeout] ]

从队列中删除并返回一个项目。如果可选的args blockTrue(默认值)和timeoutNone(默认值),则在项目可用之前阻止if必要。如果timeout是一个正数,它最多会阻塞timeout秒,如果在那段时间内没有可用的项目,则会引发queue.Empty异常。否则(阻止是False),如果一个项目立即可用,则返回一个项目,否则提高queue.Empty异常(在这种情况下timeout被忽略).

get_nowait

相当于get(False).

multiprocessing.Queue还有一些queue.Queue中没有的附加方法。对于大多数代码,这些方法通常是不必要的:

close

表示当前进程不再有数据放在此队列中。一旦将所有buffereddata刷新到管道,后台线程将退出。当队列被垃圾收集时自动调用.

join_thread (

加入后台主题。这只能在调用close()之后使用。它会阻塞,直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道.

默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。这个过程可以调用cancel_join_thread()join_thread()什么也不做.

cancel_join_thread ()

预防join_thread()阻挡。特别是,这可以防止后台线程在处理时自动连接 – 请参阅join_thread().

此方法的更好名称可能是allow_exit_without_flush()。它可能会导致enqueueddata丢失,你几乎肯定不需要使用它。只有当你需要当前进程立即退出而不等待将入队数据刷新到底层管道时,它才真正存在,并且你不关心丢失的数据.

注意

此类的功能需要在主机操作系统上运行共享的信号量实现。没有一个,这个类的功能将被禁用,并尝试实例化Queue会导致ImportError。Sebpo-3770了解更多信息。对于下面列出的任何专用队列类型也是如此.

class multiprocessing.SimpleQueue

这是一个简化的Queue类型,非常接近锁定Pipe.

empty

返回True如果队列是空的,False除此以外。

get

从队列中删除并返回一个项目.

putitem

item进入队列

class multiprocessing.JoinableQueue[maxsize]

JoinableQueue,一个Queue子类,是一个task_done()join()方法的队列

task_done// (

表示以前排队的任务已完成。由queueconsumers使用。每个get()用于获取任务,后续调用task_done()告诉队列任务处理完成.

如果join()当前正在阻止,它将在处理完所有项时恢复(意味着task_done()调用已被put()进入队列的每个项目).

提出一个ValueError如果被叫的次数多于队列中放置的物品的数量.

join()

阻止,直到队列中的所有物品都被取出并处理完毕.

每当项目被添加到thequeue时,未完成任务的数量就会增加。每当消费者打电话task_done()表示该项目已被检索,所有工作都已完成。当未完成任务的数量降至零时,join() unblocks.

Miscellaneous

multiprocessing.active_children ()

返回当前进程中所有活孩子的清单.

调用它具有“加入”任何已经完成的过程的副作用.

multiprocessing.cpu_count

返回系统中的CPU数量.

这个数字不等于当前进程可以使用的CPU数量。使用len(os.sched_getaffinity(0))

可以获得可用的CPU数量//可以提高NotImplementedError.

也可以看看

os.cpu_count()

multiprocessing.current_process

退回Process对应当前进程的对象.

的类比threading.current_thread().

multiprocessing.freeze_support

添加对使用multiprocessing已被冻结以生成Windows可执行文件。(已经用 py2exe ,测试了PyInstaller cx_Freeze.)

需要在if __name__ =="__main__"主模块的行。例如:

from multiprocessing import Process, freeze_supportdef f():    print("hello world!")if __name__ == "__main__":    freeze_support()    Process(target=f).start()

如果freeze_support()行被省略,那么尝试运行frozenexecutable会引发RuntimeError.

调用freeze_support()在Windows以外的任何操作系统上调用时无效。另外,如果Windows上的Python解释器正常运行该模块(该程序尚未冻结),那么freeze_support()没有效果.

multiprocessing.get_all_start_methods ()

返回支持的启动方法列表,第一个是默认值。可能的启动方法是"fork","spawn""forkserver"。仅在Windows上"spawn"isavailable。在Unix上"fork""spawn"都是支持的,"fork"是默认的.

新版本3.4.

multiprocessing.get_contextmethod=None

返回一个与multiprocessingmodule

//如果methodNone然后返回默认上下文。否则如果指定的启动方法method应该是"fork", "spawn","forkserver". ValueError不可用.

新版本3.4.

multiprocessing.get_start_method (allow_none=False)

返回用于启动进程的启动方法的名称.

如果启动方法尚未修复且allow_none如果为false,则start方法固定为默认值并返回名称。如果启动方法尚未修复且allow_none为真,则None返回.

返回值可以是"fork", "spawn", "forkserver"None. "fork"是Unix上的默认值,而"spawn"是Windows上的默认值.

版本3.4.

multiprocessing.set_executable (

设置启动子进程时要使用的Python解释器的路径。(默认使用sys.executable)。嵌入器可能需要一些像

set_executable(os.path.join(sys.exec_prefix, "pythonw.exe"))

在他们可以创建子进程之前

版本3.4更改:现在在"spawn"使用启动方法.

multiprocessing.set_start_methodmethod

设置应该用于启动子进程的方法.method可以是"fork", "spawn""forkserver".

请注意,这应该被调用一次,它应该被保护在if __name__ == "__main__"主要模块的条款.

新版本3.4.

注意

multiprocessing不包含threading.active_count(), threading.enumerate(),threading.settrace(), threading.setprofile(),threading.Timer的类似物,或threading.local.

连接对象

连接对象允许发送和接收可选对象或字符串。它们可以被认为是面向消息的连接套接字.

连接对象通常使用Pipe– 也可以看看听众和客户.

class multiprocessing.connection.Connection
sendobj

将对象发送到连接的另一端,应该使用recv().

该对象必须是可选择的。非常大的泡菜(大约32 MiB +,虽然它取决于操作系统)可能会引起ValueError异常

recv// ()

使用send()。阻止直到有东西要收到。引发EOFError如果没有什么可以收到而另一端被关闭了

fileno

返回连接使用的文件描述符或句柄.

close ()

关闭连接.

当连接被垃圾收集时自动调用.

poll ([timeout]

返回是否有任何数据可供阅读

如果没有指定timeout那么它将立即返回。如果timeout是一个数字,然后指定以秒为单位的最大时间。如果timeoutNone则使用无限超时.

注意可以使用multiprocessing.connection.wait().

send_bytesbuffer[, offset[, size] ]

从发送字节数据字节对象作为一个完整的信息

如果offset然后给出数据从中的那个位置读取buffer。如果size然后给出从缓冲区读取的许多字节。非常大的缓冲区(大约32 MiB +,虽然它取决于操作系统)可能会引发ValueError异常

recv_bytes [maxlength]

返回从连接另一端发送的字节数据的完整消息作为字符串。阻止直到有东西接收。起见EOFError如果没有什么东西可以接收而另一端已经关闭了

如果指定了maxlength并且消息长于maxlength那么OSError就会被提起来连接将不再可读.

改版3.3:这个功能用来举起IOError,现在是OSError.

recv_bytes_intobuffer[, offset]

读入buffer从连接另一端发送的字节数据的完整消息,并返回消息中的字节数。阻止直到有东西要收到。引发EOFError如果没有什么可以收到而另一端是封闭的.

buffer必须是一个可写的字节对象。如果offset然后给出消息将从该位置写入缓冲区。偏移量必须是一个非负整数,小于buffer的长度(以字节为单位).

如果缓冲区太短,那么BufferTooShort异常israised和完整的消息可用作e.args[0]其中e是异常实例.

更改版本3.3:现在可以使用Connection.send()Connection.recv().

版本3.3中的新功能:连接对象现在支持上下文管理协议 – 请参阅上下文管理器类型. __enter__()返回连接对象,__exit__()调用close().

例如:

>>> from multiprocessing import Pipe>>> a, b = Pipe()>>> a.send([1, "hello", None])>>> b.recv()[1, "hello", None]>>> b.send_bytes(b"thank you")>>> a.recv_bytes()b"thank you">>> import array>>> arr1 = array.array("i", range(5))>>> arr2 = array.array("i", [0] * 10)>>> a.send_bytes(arr1)>>> count = b.recv_bytes_into(arr2)>>> assert count == len(arr1) * arr1.itemsize>>> arr2array("i", [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv()方法会自动取消收到的数据,这可能会带来安全风险,除非您可以信任发送邮件的过程.

因此,除非使用Pipe()生成连接对象,否则在执行某种身份验证后,只应使用recv()send()方法。见认证密钥.

警告

如果进程在尝试读取或写入管道时被杀死,则管道中的数据可能会被破坏,因为可能无法确定消息边界所在的位置.

同步原语

通常,同步原语在多进程程序中不像在多线程程序中那样必要。请参阅threadingmodule.

请注意,也可以使用managerobject创建同步原语 – 请参阅经理.

class multiprocessing.Barrierparties[, action[, timeout] ]

屏障对象:的克隆threading.Barrier.

3.3版本中的新功能

class multiprocessing.BoundedSemaphore[value]

一个有界的信号量对象:的密切类比threading.BoundedSemaphore.

存在与其近似类比的独立差异:它acquiremethod’sfirst参数名为block,与Lock.acquire().

一致注意

在Mac OS X上,这与Semaphore无法区分,因为sem_getvalue()未在该平台上实现.

class multiprocessing.Condition [lock]

一个条件变量:threading.Condition.

的别名如果指定了lock那么它应该是LockRLock来自multiprocessing.

改变了版本3.3: wait_for()方法被添加了

class multiprocessing.Event

克隆threading.Event.

class multiprocessing.Lock

一个非递归的锁定对象:一个非常类似的threading.Lock。一旦进程或线程获得了一个锁,后续尝试从任何进程或线程获取它将阻塞直到它被释放;任何进程或线程都可以释放它。适用于线程的threading.Lock的概念和行为在multiprocessing.Lock中复制,因为它适用于进程或线程,除非另有说明.

注意Lock实际上是一个工厂函数,它返回用adefault上下文初始化的multiprocessing.synchronize.Lock的实例.

Lock支持上下文管理器协议,因此可以使用在with陈述中

acquireblock=True, timeout=None)

获取锁定,阻止或无阻塞.

block参数设置为True(默认值),方法callwill阻塞,直到锁处于解锁状态,然后将其设置为locked并返回True。请注意,第一个参数的名称与threading.Lock.acquire().

中的名称不同于block参数设置为False,方法调用不阻塞。如果锁当前处于锁定状态,则返回False;否则将锁设置为锁定状态并返回True.

当使用timeout的正浮点值调用时,只要锁定无法获取锁定,最多可以阻止timeout指定的秒数。对timeout等于timeout零。调用timeout的价值 None(默认值)将超时时间设置为无限期。注意治疗阴性或None的值timeoutthreading.Lock.acquire()中的实现行为不同。如果timeout参数设置为block并且因此被赋值,则False参数没有实际应用。返回True如果已经获得锁定,或者如果超时时间已经过去False.

release()

解锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程.

行为与threading.Lock.release()除了在未锁定的锁上调用时,ValueError被养了

class multiprocessing.RLock

递归锁定对象:的近似模拟threading.RLock。必须由获取它的进程或线程释放递归锁。一旦进程或线程获得了递归锁,相同的处理器线程可以再次获取它而不会阻塞;该过程或线程必须在每次获取时释放一次.

注意RLock实际上是一个工厂函数,它返回用adefault上下文初始化的multiprocessing.synchronize.RLock的实例.

RLock支持上下文管理器协议因此可用于with陈述

acquire (block=True, timeout=None)

获取锁,阻塞或无阻塞.

block参数设置调用至 True,阻止直到锁处于解锁状态(不属于任何进程或线程),取消锁定已被当前进程或线程拥有。然后当前进程或线程取得锁的所有权(如果它还没有所有权)并且锁内的递归级别增加1,导致返回值为True。请注意,与threading.RLock.acquire()的实现相比,第一个参数的行为有几个不同,从参数本身的名称开始.

当用block参数设置为False,不要阻塞。如果锁已被另一个进程或线程获取(因而拥有),则当前进程或线程不占用所有权,并且锁中的递归级别不会更改,从而导致返回值为False。如果锁处于解锁状态,则当前进程或线程获取所有权并递增递归级别,导致返回值为True.

使用和timeout参数与Lock.acquire()中的相同。请注意,timeout的某些行为与threading.RLock.acquire().

release

释放锁定,递减递归级别。如果在递减级别之后递归级别为零,则将锁定重置为已解锁(未被任何进程或线程拥有),并且如果任何其他进程或线程被阻塞等待锁定被解锁,则允许其中一个进程继续。如果在递减之后递归级别仍为非零,则锁定保持锁定并由调用进程正确读取

只有在调用进程或线程拥有锁时才调用此方法。对AssertionError如果此方法由所有者以外的处理器线程调用,或者锁定处于未锁定(无主)状态,则引发此方法。请注意,在这种情况下引发的异常类型与threading.RLock.release().

class multiprocessing.Semaphore[value]

信号量对象:的密切类比threading.Semaphore.

存在与其近似类比的独立差异:它acquiremethod’sfirst参数名为block,与一致Lock.acquire().

注意

在Mac OS X上,sem_timedwait不支持,所以调用acquire()超时将使用睡眠循环模拟该函数行为.

注意

如果由产生的SIGINT信号按Ctrl-C 通过调用BoundedSemaphore.acquire(), Lock.acquire(),RLock.acquire(), Semaphore.acquire(), Condition.acquire()要么 Condition.wait()然后电话会立即中断并且KeyboardInterrupt会被提出来

这与threading当等效的阻塞调用正在进行时,SIGINT将被签名.

注意

该软件包的某些功能需要在主机操作系统上运行共享的信号量实现。没有一个,multiprocessing.synchronize模块将被禁用,尝试导入它将导致ImportError。Seebpo-3770了解更多信息.

分享ctypes物品

可以使用共享内存创建共享对象,这可以由子进程继承.

multiprocessing.Valuetypecode_or_type, *args, lock=True

返回ctypes从共享内存分配的对象。默认情况下,转换值实际上是对象的同步包装器。可以通过valueValue.

typecode_or_type确定返回对象的类型:它是actypes类型或array模块使用的类型的一个字符类型代码。*args传递给该类型的构造函数

如果lockTrue(默认值)然后创建一个新的递归锁定对象来同步对值的访问。如果lockLockRLock然后,将使用对象来同步访问该值。如果lockFalse然后访问返回的对象将不会被锁定自动保护,因此它不一定是“过程安全的”.

+=涉及读写的是非原子的。因此,例如,如果您想要以原子方式增加ashared值,那么就不足以做到

counter.value += 1

假设相关的锁是递归的(默认情况下是这样),你可以改为

with counter.get_lock():    counter.value += 1

注意lock是一个只有关键字的参数.

multiprocessing.Array (typecode_or_type, size_or_initializer, *, lock=True

返回从共享内存分配的ctypes数组。默认情况下,returnvalue实际上是数组的同步包装器.

typecode_or_type确定返回数组的元素类型:它是一个ctypes类型或array模块使用的那种字符类型。如果size_or_initializer是一个整数,然后它确定数组的长度,并且数组最初将归零。否则,size_or_initializer是一个序列,用于初始化数组,其长度决定数组的长度.

如果lockTrue(默认值)然后创建一个新的锁对象来同步对该值的访问。如果lockLockRLock然后,该对象将用于同步对值的访问。如果lockFalse那么访问返回的对象将不会被锁自动保护,所以它不一定是“过程安全的”.

注意lock是一个只有关键字的参数.

注意ctypes.c_char的数组有valueraw属性,允许用它来存储并检索字符串.

multiprocessing.sharedctypes模块

multiprocessing.sharedctypes模块提供了从共享内存中分配ctypes对象的功能,可以继承by childprocesses.

注意

虽然可以将指针存储在共享内存中,但请记住这将引用特定进程的地址空间中的某个位置。但是,指针很可能在第二个进程的上下文中无效并尝试从第二个进程取消引用指针可能会导致崩溃

multiprocessing.sharedctypes.RawArraytypecode_or_type, size_or_initializer)

返回从共享中分配的ctypes数组记忆。

typecode_or_type确定返回数组的元素类型:它是一个ctypes类型或array模块使用的那种字符类型。如果size_or_initializer是一个整数然后它确定数组的长度,并且数组最初将归零。否则size_or_initializer是一个用于初始化阵列的序列,其长度决定了数组的长度.

注意设置和获取元素可能是非原子的 – 使用Array()而是确保使用锁定自动同步访问.

multiprocessing.sharedctypes.RawValue (typecode_or_type, *args)

返回从共享内存分配的ctypes对象.

typecode_or_type确定返回对象的类型:它是actypes类型或array模块使用的类型的一个字符类型代码。*args传递给该类型的构造函数.

请注意,设置和获取值可能是非原子的 – 使用Value()而不是确保使用锁定自动同步访问.

注意ctypes.c_char的数组有valueraw允许用户存储和检索字符串的属性 – ctypes.

multiprocessing.sharedctypes.Arraytypecode_or_type, size_or_initializer, *, lock=True

和一样RawArray()除了取决于lock可以返回aprocess-safe同步包装而不是原始ctypesarray。

如果lockTrue(默认值),则创建一个新的锁对象,以便同步访问该值。如果lockLock要么 RLock对象然后将用于同步对值的访问。如果lockFalse然后访问返回的对象将不会被锁自动保护,因此它不一定是“进程安全的”.

请注意lock是一个仅限关键字的参数.

multiprocessing.sharedctypes.Valuetypecode_or_type, *args, lock=True

RawValue()一样,除了取决于lock的值,可以返回一个进程安全的同步包装而不是一个原始的ctypes对象.

如果lockTrue(默认值)然后创建一个新的锁对象来同步对该值的访问。如果lockLockRLock然后,该对象将用于同步对值的访问。如果lockFalse那么访问返回的对象将不会被锁自动保护,所以它不一定是“过程安全的”.

注意lock是一个只有关键字的参数.

multiprocessing.sharedctypes.copy(obj)

返回从共享内存中分配的ctypes对象,它是该类型对象的副本obj.

multiprocessing.sharedctypes.synchronizedobj [, lock]

为ctypes对象返回一个进程安全的包装器对象,该对象使用lock tosynchronize访问。如果lockNone(默认值)那么multiprocessing.RLock对象会自动创建.

同步包装器除了对象之外还有两个方法它包装:get_obj()返回包装对象,get_lock()返回用于同步的锁定对象.

注意通过包装器访问ctypes对象可能比访问包装对象慢很多raw ctypes object.

更改版本3.5:同步对象支持上下文管理器协议.

下表比较了使用常规ctypes语法从共享内存创建共享ctypes对象的语法。(在表MyStructctypes.Structure的几个子类。)

ctypes 使用类型 共享类型的共享类型使用typecode
c_double(2.4) RawValue(c_double,2.4) RawValue(’d’,2.4)
MyStruct(4,6) RawValue(MyStruct,4,6)  
(c_short * 7)() RawArray(c_short,7) RawArray(’h’,7)
(c_int * 3)(9,2,8) RawArray(c_int,(9),2,8)) RawArray(’i’,(9,2,8))

下面是一个示例,其中一些子进程修改了ctypes对象:

from multiprocessing import Process, Lockfrom multiprocessing.sharedctypes import Value, Arrayfrom ctypes import Structure, c_doubleclass Point(Structure):    _fields_ = [("x", c_double), ("y", c_double)]def modify(n, x, s, A):    n.value **= 2    x.value **= 2    s.value = s.value.upper()    for a in A:        a.x **= 2        a.y **= 2if __name__ == "__main__":    lock = Lock()    n = Value("i", 7)    x = Value(c_double, 1.0/3.0, lock=False)    s = Array("c", b"hello world", lock=lock)    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)    p = Process(target=modify, args=(n, x, s, A))    p.start()    p.join()    print(n.value)    print(x.value)    print(s.value)    print([(a.x, a.y) for a in A])

打印结果是

490.1111111111111111HELLO WORLD[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

经理

管理器提供了一种创建可在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理器对象控制管理shared objects。其他进程可以使用proxies来访问共享对象.

multiprocessing.Manager

返回一个开始的SyncManager可用于在进程之间共享对象的对象。返回的managerobject对应于一个衍生的子进程,并且有一些方法可以创建共享对象并返回相应的代理.

一旦它们被垃圾收集或者父进程退出,它们就会被关闭。经理类定义在multiprocessing.managers module:

class multiprocessing.managers.BaseManager [address [, authkey] ]

创建一个BaseManager对象.

一旦创建一个应该调用start()get_server().serve_forever()以确保管理器对象引用一个启动管理器进程.

address是管理器进程侦听新连接的地址。如果addressNone则选择任意一个.

authkey是用于检查服务器进程的传入连接的有效性的验证密钥。如果authkeyNone然后使用current_process().authkey。否则使用authkey并且它必须是字节串.

start ( [initializer [, initargs] ]

启动子进程以启动管理器。如果initializer不是None那么子进程会在启动时调用initializer(*initargs).

get_server()

返回一个Server对象,它表示对Manager进行控制的实际服务器。Server对象支持serve_forever()方法:

>>> from multiprocessing.managers import BaseManager>>> manager = BaseManager(address=("", 50000), authkey=b"abc")>>> server = manager.get_server()>>> server.serve_forever()

Server还有address属性

connect// ()

将本地管理器对象连接到远程管理器进程

>>> from multiprocessing.managers import BaseManager>>> m = BaseManager(address=("127.0.0.1", 5000), authkey=b"abc")>>> m.connect()
shutdown

停止管理器使用的进程。仅当start()用于启动服务器进程时才可用.

这可以被多次调用.

register (typeid [, callable [, proxytype [, exposed [, method_to_typeid [, create_method] ] ] ] ]

一种类方法,可以用来注册类型或可以与经理类一起调用.

typeid是一个“类型标识符“用于识别共享对象的特定类型。这必须是一个字符串.

callable是一个可调用的,用于为这个typeidentifier创建对象。如果使用connect()方法,或者如果create_method参数是False那么这可以保留为None.

proxytypeBaseProxy的子类,用于为共享对象创建代理这个 typeid。如果None则会自动创建一个proxyclass。

exposed用于指定一个方法名称序列,代码应该允许使用BaseProxy._callmethod()来访问这个typeid。(如果exposedNone则使用proxytype._exposed_而不是它存在。)如果没有指定暴露列表,则可以访问共享对象的所有“公共方法”。(这里的“公共方法”是指任何具有__call__()方法的属性,其名称不以"_"开头。)

method_to_typeid是用于指定那些暴露的返回类型的映射应返回代理的方法。它映射方法名称totypeid字符串。(如果method_to_typeidNone则使用proxytype._method_to_typeid_而不是它存在。)如果amethod的名称不是这个映射的关键字,或者映射是None然后方法返回的对象将被value.

create_method确定是否应该使用名称typeid这可以用来告诉服务器进程创建一个newshared对象并为它返回一个代理。默认情况下它是True.

BaseManager实例也有一个只读属性:

address

管理员使用的地址.

更改版本3.3: Manager对象支持上下文管理协议 – 请参阅上下文管理器类型. __enter__()启动服务器进程(如果尚未启动),然后返回管理器对象。__exit__() class shutdown().

以前的版本__enter__()如果它还没有启动,则没有启动管理器的服务器进程.

class multiprocessing.managers.SyncManager

BaseManager的子类可以用于进程的同步。这种类型的对象由multiprocessing.Manager().

它的方法创建并返回代理对象,以便跨进程同步多种常用数据类型。这主要包括共享列表和词典.

Barrierparties [, action[, timeout] ]

创建一个共享的threading.Barrier对象并为它返回aproxy .

版本3.3.

BoundedSemaphore( [value])

创建一个共享threading.BoundedSemaphore对象和为它返回aproxy .

Condition[lock]

创建共享threading.Condition对象并返回代理forit.

如果lock然后它应该是threading.Lock要么 threading.RLock宾语。

改版3.3:wait_for()方法被添加了

Event

创建共享threading.Event对象并返回它的代理.

Lock

创建共享threading.Lock对象并返回它的代理.

Namespace

创建一个共享Namespace对象并为其返回一个代理.

Queue ( [maxsize])

创建一个共享的queue.Queue对象并为它返回一个代理.

RLock ()

创建一个共享threading.RLock对象并返回它的代理.

Semaphore ( [value])

创建共享threading.Semaphore object并返回一个代理forit.

Array (typecode, sequence)

创建一个数组并为它返回一个代理.

Value (typecode, value

创建一个具有可写value属性的对象,并为其返回一个代理.

dict (
dict (mapping
dictsequence

创建一个共享的dict对象并为其返回一个代理.

list ( )
listsequence

创建共享list对象并返回它的代理.

版本3.6更改:共享对象能够嵌套。例如,共享容器(如共享列表)可以包含其他共享对象,这些对象将由SyncManager.

class multiprocessing.managers.Namespace

可以使用SyncManager.

注册的类型。命名空间对象没有公共方法,但是具有可写属性。它表示显示其属性的值.

但是,当使用代理命名空间对象时,以"_"开头的属性将是代理的属性,而不是其他属性:

>>> manager = multiprocessing.Manager()>>> Global = manager.Namespace()>>> Global.x = 10>>> Global.y = "hello">>> Global._z = 12.3    # this is an attribute of the proxy>>> print(Global)Namespace(x=10, y="hello")

自定义管理器

要创建一个自己的管理器,可以创建一个BaseManager的子类,并使用register() classmethod向管理器类注册新的类型或可编辑器。例如:

from multiprocessing.managers import BaseManagerclass MathsClass:    def add(self, x, y):        return x + y    def mul(self, x, y):        return x * yclass MyManager(BaseManager):    passMyManager.register("Maths", MathsClass)if __name__ == "__main__":    with MyManager() as manager:        maths = manager.Maths()        print(maths.add(4, 3))         # prints 7        print(maths.mul(7, 8))         # prints 56

使用远程管理器

可以在一台计算机上运行管理服务器,并让客户端从其他计算机上使用它(假设所涉及的防火墙允许它).

运行以下命令为远程客户端可以访问的单个共享队列创建服务器:

>>> from multiprocessing.managers import BaseManager>>> from queue import Queue>>> queue = Queue()>>> class QueueManager(BaseManager): pass>>> QueueManager.register("get_queue", callable=lambda:queue)>>> m = QueueManager(address=("", 50000), authkey=b"abracadabra")>>> s = m.get_server()>>> s.serve_forever()

一个客户端可以按如下方式访问服务器:

>>> from multiprocessing.managers import BaseManager>>> class QueueManager(BaseManager): pass>>> QueueManager.register("get_queue")>>> m = QueueManager(address=("foo.bar.org", 50000), authkey=b"abracadabra")>>> m.connect()>>> queue = m.get_queue()>>> queue.put("hello")

另一个客户端也可以使用它:

>>> from multiprocessing.managers import BaseManager>>> class QueueManager(BaseManager): pass>>> QueueManager.register("get_queue")>>> m = QueueManager(address=("foo.bar.org", 50000), authkey=b"abracadabra")>>> m.connect()>>> queue = m.get_queue()>>> queue.get()"hello"

本地进程也可以访问该队列,使用客户端上面的代码远程访问它:

>>> from multiprocessing import Process, Queue>>> from multiprocessing.managers import BaseManager>>> class Worker(Process):...     def __init__(self, q):...         self.q = q...         super(Worker, self).__init__()...     def run(self):...         self.q.put("local hello")...>>> queue = Queue()>>> w = Worker(queue)>>> w.start()>>> class QueueManager(BaseManager): pass...>>> QueueManager.register("get_queue", callable=lambda: queue)>>> m = QueueManager(address=("", 50000), authkey=b"abracadabra")>>> s = m.get_server()>>> s.serve_forever()

代理对象

代理是引用的对象一个共享的对象,它(大概)生活在一个不同的过程中。据说共享对象是代理的指示对象。多个代理对象可能具有相同的指示对象.

代理对象具有调用其指示对象的相应方法的方法(尽管并非所有指示对象的方法都必须通过代理可用)。通过这种方式,代理可以像它的指示对象一样使用:

>>> from multiprocessing import Manager>>> manager = Manager()>>> l = manager.list([i*i for i in range(10)])>>> print(l)[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]>>> print(repr(l))<ListProxy object, typeid "list" at 0x...>>>> l[4]16>>> l[2:5][4, 9, 16]

请注意,将str()应用于代理将返回指示对象的表示,而应用repr()将返回代理的表示.

代理对象的一个​​重要特性是它们是可选择的,因此它们可以在进程之间被绕过。因此,指示对象可以包含代理对象。这允许嵌套这些管理列表,dicts和其他代理对象

>>> a = manager.list()>>> b = manager.list()>>> a.append(b)         # referent of a now contains referent of b>>> print(a, b)[<ListProxy object, typeid "list" at ...>] []>>> b.append("hello")>>> print(a[0], b)["hello"] ["hello"]

同样,dict和list代理可以互相嵌套:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])>>> d_first_inner = l_outer[0]>>> d_first_inner["a"] = 1>>> d_first_inner["b"] = 2>>> l_outer[1]["c"] = 3>>> l_outer[1]["z"] = 26>>> print(l_outer[0]){"a": 1, "b": 2}>>> print(l_outer[1]){"c": 3, "z": 26}

如果标准(非代理)list要么 dict对象包含在指示对象中,对这些可变值的修改将不会通过管理器传播,因为代理无法知道何时修改其中包含的值。但是,将值存储在容器代理中(触发__setitem__在代理对象上)确实通过管理器传播,因此为了有效地修改这样的项,可以将修改后的值重新分配给容器代理:

# create a list proxy and append a mutable object (a dictionary)lproxy = manager.list()lproxy.append({})# now mutate the dictionaryd = lproxy[0]d["a"] = 1d["b"] = 2# at this point, the changes to d are not yet synced, but by# updating the dictionary, the proxy is notified of the changelproxy[0] = d

对于大多数用例来说,这种方法可能不如使用嵌套的代理对象方便,但也可以对同步进行一定程度的控制.

注意

代理类型在multiprocessing什么都不做,以支持价值比较。所以,例如,我们有:

>>> manager.list([1,2,3]) == [1,2,3]False

在进行比较时,应该使用指示物的副本.

class multiprocessing.managers.BaseProxy

代理对象是BaseProxy.

_callmethodmethodname[, args[, kwds] ]

调用并返回代理的指示方法的结果.

如果proxy是一个代理,其引用是obj然后表达式

proxy._callmethod(methodname, args, kwds)

将在管理器的进程中评估表达式

getattr(obj, methodname)(*args, **kwds)

.

返回的值将是调用结果的副本或新共享对象的代理 – 请参阅method_to_typeid BaseManager.register().

如果通过该调用引发异常,则由_callmethod()重新引发。如果在管理器的进程中引发了一些其他异常,则将其转换为RemoteError异常并由_callmethod().

请特别注意,如果methodname没有exposed.

会引发异常_callmethod()

>>> l = manager.list(range(10))>>> l._callmethod("__len__")10>>> l._callmethod("__getitem__", (slice(2, 7),)) # equivalent to l[2:7][2, 3, 4, 5, 6]>>> l._callmethod("__getitem__", (20,))          # equivalent to l[20]Traceback (most recent call last):...IndexError: list index out of range
_getvalue

返回指示物的副本.

如果指示物是不可取消的,则会引发异常.

__repr__

返回代理对象的表示.

__str__

返回指示物的表示.

清理

一个代理对象使用一个weakref回调,这样当它被垃圾收集时,它会从拥有它所指对象的管理器中注册自己.

当没有引用它的longerany代理时,共享对象将从管理器进程中删除.

进程池

人们可以创建一个流程池来执行提交给它的任务Pool class

class multiprocessing.pool.Pool ( [processes [, initializer [, initargs [, maxtasksperchild [, context] ] ] ] ]

一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,并具有并行映射实现.

processes是要使用的工作进程数。如果processesNone然后使用os.cpu_count()返回的数字

如果initializer不是None那么每个工作进程在启动时都会调用initializer(*initargs).

maxtasksperchild是工作进程在退出之前可以完成的任务数,并用新的工作进程替换,以便释放未使用的资源。默认的maxtasksperchildNone,只要pool.

context可用于指定用于启动工作进程的上下文,它就是工作进程的存在。通常使用函数multiprocessing.Pool()或上下文对象的Pool()方法创建池。在这两种情况下context都是恰当的.

注意池对象的方法只能由创建池的进程调用.

新版本3.2:maxtasksperchild

版本3.4中的新功能:context

注意

Pool中的工作进程通常用于完成池的工作队列。在其他系统中发现的频繁模式(例如Apache,mod_wsgi等)来释放工作人员所拥有的资源是允许池中的工作者在退出,清理并生成新进程以替换旧工作之前仅完成一项工作。一。maxtasksperchildPool论证将这种能力暴露给最终用户.

apply (func [, args [, kwds] ]

func和关键字参数args调用kwds。它阻止直到结果准备好了。鉴于此块,apply_async()更适合并行执行工作。另外,func只在游泳池的一名工人中执行.

apply_async (func [, args [, kwds[, callback [, error_callback] ] ] ]

一个变种apply()返回结果对象的方法

如果callback如果指定它应该是一个可调用的接受单个参数。当结果准备就绪时callback应用于,即使呼叫失败,在这种情况下error_callback取而代之的是

如果指定了error_callback那么它应该是一个可调用的,它可以接受一个参数。如果目标函数失败,则使用异常实例调用error_callback

回调应立即完成,否则处理结果的线程将被阻止.

map (func, iterable [, chunksize])

A平行等价的map()内置函数(虽然它只支持iterable参数)。它会阻塞,直到结果准备好

此方法将可迭代切换为多个块,并将其作为单独的任务提交给进程池。这些块的(近似)大小可以通过设置chunksize一个正整数.

注意它可能导致很长的迭代次数的高内存使用。考虑使用imap()imap_unordered()明确chunksize提高效率的选择.

map_async (func, iterable [, chunksize [, callback [, error_callback]]]

返回结果对象的map()方法的变种.

如果指定了callback那么它应该是一个可调用的,接受单个参数。当结果准备就绪时callback应用于,即使呼叫失败,在这种情况下应用error_callback

如果error_callback是如果指定它应该是一个可调用的whichaccepts一个参数。如果目标函数失败,则使用异常实例调用error_callback

应该立即完成回调,否则处理结果的线程将被阻止.

imap (func, iterable [, chunksize]

map().

的一个更懒的版本chunksize参数与使用的相同通过map()方法。对于使用chunksize的较大值的非常长的迭代,可以使作业完成 比使用1.

的默认值更快,如果chunksize1然后next()imap()方法返回的迭代器的方法有一个可选的timeout参数:next(timeout)如果在multiprocessing.TimeoutError内无法返回结果会引发timeout秒.

imap_unordered (func, iterable [, chunksize])

imap()一样来自迭代器的结果应该被认为是任意的。(只有当只有一个工人程序才能保证“正确”的顺序。)

starmapfunc, iterable [, chunksize]

喜欢map(),除了iterable的元素应该是作为参数解压缩的迭代.

Hence iterable of [(1,2), (3, 4)]结果[func(1,2),func(3,4)].

版本3.3.

starmap_async (func, iterable [, chunksize [, callback[, error_callback] ] ]

starmap()map_async()的组合迭代iterable迭代和调用func与iterables unpacked.Returns结果对象.

新版本3.3.

close ( )

防止将更多任务提交到池中。一旦所有任务完成,工人进程将退出.

terminate (

立即停止工作流程而不完成未完成的工作。当池对象被垃圾收集时terminate()会立即响起

join// ()

等待工人进程退出。在使用close()之前必须调用terminate()join().

版本3.3中的新内容:池对象现在支持上下文管理协议 – 请参阅 Context Manager Types . __enter__()返回pool对象,__exit__()调用terminate().

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async().

get返回的结果类( [timeout]

到达时返回结果。如果timeout不是None并且结果没有在timeout秒内到达那么multiprocessing.TimeoutError被提出。如果远程调用raisean异常,那么该异常将被get().

wait [timeout] )重新调整

等到结果可用或直到timeout秒通过

ready)

返回通话是否已完成.

successful (

返回调用是否完成而不引发异常。威利斯AssertionError如果结果没有准备好

以下示例演示了如何使用池:

from multiprocessing import Poolimport timedef f(x):    return x*xif __name__ == "__main__":    with Pool(processes=4) as pool:         # start 4 worker processes        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"        it = pool.imap(f, range(10))        print(next(it))                     # prints "0"        print(next(it))                     # prints "1"        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow        result = pool.apply_async(time.sleep, (10,))        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

监听器客户端

通常在进程之间传递消息是使用队列或使用ConnectionPipe().

返回的对象然而,multiprocessing.connection模块允许一些额外的灵活性。它基本上提供了一个面向消息的高级API,用于处理套接字或Windows命名管道。它还支持digestauthentication使用hmac模块,同时用于轮询多个连接.

multiprocessing.connection.deliver_challenge (connection, authkey )

将随机生成的消息发送到连接的另一端并等待回复.

如果回复与使用authkey的消息的摘要匹配,则将欢迎消息发送给另一个连接结束。否则AuthenticationError被抬起来

multiprocessing.connection.answer_challengeconnection, authkey)

接收消息,用authkey作为密钥来计算消息的摘要,然后发送摘要

//如果没有收到欢迎信息,那么AuthenticationError就会被提起来

multiprocessing.connection.Clientaddress [, family [, authkey] ]

尝试建立与使用地址address的监听器的连接,返回Connection.

连接的类型由family参数,但这通常可以省略,因为它通常可以从address的格式推断出来。(参见地址格式

如果给出authkey而不是None,它应该是一个字节字符串,并将用作密钥基于HMAC的身份验证挑战。如果authkey为None,则无法进行身份验证。如果验证失败,则会引发AuthenticationError。查看验证密钥.

class multiprocessing.connection.Listener [address [, family [, backlog [, authkey] ] ] ]

绑定套接字或Windows命名管道的包装器,它正在“监听”连接.

address是由侦听器对象的绑定套接字或命名管道使用的地址。

注意

如果使用的地址为“0.0.0.0”,则该地址将不是Windows上的可连接端点。如果你需要一个可连接的终点,你应该使用’127.0.0.1′.

family是要使用的套接字(或命名管道)的类型。这可以是字符串"AF_INET"(对于TCP套接字),"AF_UNIX"(对于Unixdomain套接字)或"AF_PIPE"(对于Windows命名管道)。其中只有第一个保证可用。如果familyNone那么家庭是从address的格式推断出来的。如果address也是None则选择默认值。此默认值是系列,它被认为是最快的。见地址格式。注意,如果family"AF_UNIX"并且地址是None那么套接字将在使用tempfile.mkstemp().

创建的aprivate临时目录中创建如果监听器对象使用套接字然后backlog(默认情况下为1)一旦被绑定就传递给套接字的listen()方法

如果给出authkey而不是None,它应该是一个字节字符串,并将用作基于HMAC的身份验证质询的密钥。如果authkey为None,则无法进行身份验证。如果验证失败,则会引发AuthenticationError。查看验证密钥.

accept

接受listenerobject的绑定套接字或命名管道上的连接,并返回一个Connection对象。如果尝试验证失败,则AuthenticationError被引发

close// (

关闭侦听器对象的绑定套接字或命名管道。当监听器被垃圾收集时,这会被自动调用。但是建议明确地称之为.

监听器对象具有以下只读属性:

address

监听器对象使用的地址.

last_accepted

上次接受连接的地址。如果这是不可用的那么它是None.

版本3.3中的新功能:监听器对象现在支持上下文管理协议 – 请参阅上下文管理器类型. __enter__()返回thelistener对象,__exit__()close().

multiprocessing.connection.waitobject_list, timeout=None

等到object_list准备好了。返回object_list准备好了。如果timeout然后呼叫阻塞最多那么多秒。如果timeoutNone然后它将无限期地阻塞。负超时相当于零超时.

对于Unix和Windows,一个对象可以出现在object_list if if is

  • 一个可读的Connection对象;
  • 一个连接且可读的socket.socket对象;或者
  • sentinel Process对象的

属性当有数据可以从中读取时,连接或套接字对象就绪,或者另一端已关闭.

Unixwait(object_list, timeout)几乎相当于select.select(object_list, [], [], timeout)。区别在于,如果select.select()被信号中断,它会错误地OSError,错误号为EINTR,而wait()则不会.

Windows object_list中的一个项必须是一个可以等待的整数句柄(根据Win32函数的文档使用的定义WaitForMultipleObjects())或它可以是一个带有fileno()方法的对象,它返回asocket句柄或管道句柄。(请注意,管道把手和套筒把手是不是等待把手。)

3.3版本中的新功能.

示例

以下服务器代码创建一个使用"secret password" asan身份验证密钥的侦听器。然后它等待连接并将一些数据发送到客户端:

from multiprocessing.connection import Listenerfrom array import arrayaddress = ("localhost", 6000)     # family is deduced to be "AF_INET"with Listener(address, authkey=b"secret password") as listener:    with listener.accept() as conn:        print("connection accepted from", listener.last_accepted)        conn.send([2.25, None, "junk", float])        conn.send_bytes(b"hello")        conn.send_bytes(array("i", [42, 1729]))

以下代码连接到服务器并从服务器接收一些数据:

from multiprocessing.connection import Clientfrom array import arrayaddress = ("localhost", 6000)with Client(address, authkey=b"secret password") as conn:    print(conn.recv())                  # => [2.25, None, "junk", float]    print(conn.recv_bytes())            # => "hello"    arr = array("i", [0, 0, 0, 0, 0])    print(conn.recv_bytes_into(arr))    # => 8    print(arr)                          # => array("i", [42, 1729, 0, 0, 0])

以下代码使用wait() towait对于来自多个进程的消息:

import time, randomfrom multiprocessing import Process, Pipe, current_processfrom multiprocessing.connection import waitdef foo(w):    for i in range(10):        w.send((i, current_process().name))    w.close()if __name__ == "__main__":    readers = []    for i in range(4):        r, w = Pipe(duplex=False)        readers.append(r)        p = Process(target=foo, args=(w,))        p.start()        # We close the writable end of the pipe now to be sure that        # p is the only process which owns a handle for it.  This        # ensures that when p closes its handle for the writable end,        # wait() will promptly report the readable end as being ready.        w.close()    while readers:        for r in wait(readers):            try:                msg = r.recv()            except EOFError:                readers.remove(r)            else:                print(msg)

地址格式

  • "AF_INET"地址是(hostname, port)形式的元组,其中hostname是一个字符串和port是一个整数.
  • "AF_UNIX"地址是一个表示文件系统文件名的字符串.
  • 一个"AF_PIPE"地址是一个形式为
    r"\\.\pipe{PipeName}"的字符串。要使用Client()连接到名为ServerName的远程计算机上的命名管道,应该使用表格r"\ServerName\pipe{PipeName}"的地址代替

注意任何字符串开头有两个反斜杠默认为bean "AF_PIPE"地址而不是"AF_UNIX"地址

认证密钥

当使用Connection.recv,收到的数据是自动取消的。不幸的是,从不受信任的来源中删除数据是一种安全风险。因此ListenerClient()使用hmac模块提供摘要认证.

认证密钥是一个字节字符串,可以被认为是一个密码:一旦一个连接建立两端将要求另一方知道认证密钥。(证明bothends正在使用相同的密钥涉及在连接时发送密钥。)

如果请求验证但没有指定验证密钥,则返回值使用current_process().authkey(见Process)。此值将由当前进程创建的任何Process对象自动继承。这意味着(默认情况下)多进程程序的所有进程将共享单个身份验证密钥,可在设置它们之间的连接时使用。

使用os.urandom().

记录

有一些支持日志记录的可用。但请注意logging包不使用进程共享锁,因此可以(取决于处理程序类型)来自不同进程的消息混淆.

multiprocessing.get_logger

返回multiprocessing。如有必要,将创建一个新的.

首次创建时,记录器具有级别logging.NOTSET和nodefault处理程序。发送到此记录器的消息默认不会传播到根记录器.

请注意,在Windows上,子进程只会继承父进程记录器的级别 – 记录器的任何其他自定义都不会被继承.

multiprocessing.log_to_stderr ( )

这个函数执行对get_logger()的调用,但除了返回get_logger创建的记录器之外,还添加了一个处理程序,使用格式sys.stderr将outputout输出到"[%(levelname)s/%(processName)s] %(message)s".

下面是打开日志记录的示例会话:

>>> import multiprocessing, logging>>> logger = multiprocessing.log_to_stderr()>>> logger.setLevel(logging.INFO)>>> logger.warning("doomed")[WARNING/MainProcess] doomed>>> m = multiprocessing.Manager()[INFO/SyncManager-...] child process calling self.run()[INFO/SyncManager-...] created temp directory /.../pymp-...[INFO/SyncManager-...] manager serving at "/.../listener-...">>> del m[INFO/MainProcess] sending shutdown message to manager[INFO/SyncManager-...] manager exiting with exitcode 0

有关完整的日志记录级别表,请参阅logging module.

multiprocessing.dummy模块

multiprocessing.dummy复制multiprocessing但不仅仅是threadingmodule.

编程指南

使用multiprocessing.

所有启动方法

以下适用于所有启动方法.

避免共享状态

应尽量避免移动大量数据库进程.

最好坚持使用队列或管道进行进程之间的通信,而不是使用较低级别的同步原语.

Picklability

确保代理方法的参数是可选择的.

代理的安全性

不要使用多个线程中的代理对象,除非你用锁来保护它.

(使用same代理的不同进程从来没有问题。)

加入僵尸进程

在Unix上,当一个进程完成但尚未加入时,它就变成了一个僵尸。永远不应该有很多,因为每次都有一个新的进程开始(或active_children()被称为)所有尚未加入的已完成的流程将被加入。同时调用完成过程的Process.is_alive将加入这个过程。即便如此,明确加入你开始的所有过程也许是很好的实践.

更好地继承比pickle / unpickle

使用时spawn要么 forkservermultiprocessing需要可以选择,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问其他地方创建的共享资源的进程可以从祖先进程继承它.

避免终止进程

使用Process.terminate停止进程的方法可能导致进程当前正在使用的任何共享资源(例如锁,信号量,管道和队列)被破坏或不可用于其他进程.

因此,最好只考虑使用带有输出缓冲的“文件类对象”的Process.terminate从不使用任何共享资源的进程.

加入使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“馈线”线到下面的管道。(子进程可以调用队列的Queue.cancel_join_thread方法来避免这种行为。)

这意味着每当你使用队列时,你需要确保已经放入队列的所有项目在加入进程之前,最终将删除队列。否则,您无法确定队列中的输入项的进程是否将终止。还要记住,非守护进程会自动加入.

会出现死锁的示例如下:

from multiprocessing import Process, Queuedef f(q):    q.put("X" * 1000000)if __name__ == "__main__":    queue = Queue()    p = Process(target=f, args=(queue,))    p.start()    p.join()                    # this deadlocks    obj = queue.get()

这里的修复方法是交换最后两行(或者只是删除p.join()行).

明确地将资源传递给子进程

在Unix上使用forkstart方法中,子进程可以使用全局资源来使用在父进程中创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数.

使代码(可能)与Windows和其他启动方法兼容,这也确保了只要子进程是仍然活着的对象将不会在父进程中被垃圾收集。如果在对象是在父进程中进行垃圾收集时释放了一些资源,这可能很重要.

例如

from multiprocessing import Process, Lockdef f():    ... do something using "lock" ...if __name__ == "__main__":    lock = Lock()    for i in range(10):        Process(target=f).start()

应该改写为

from multiprocessing import Process, Lockdef f(l):    ... do something using "l" ...if __name__ == "__main__":    lock = Lock()    for i in range(10):        Process(target=f, args=(lock,)).start()

小心更换sys.stdin带有“像对象一样的文件”

multiprocessing最初无条件地调用:

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap()方法中 – 这导致了进程中的进程问题。这已经改为:

sys.stdin.close()sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

这解决了进程相互冲突的根本问题,导致错误的文件描述符错误,但是引入了一个潜在的危险应用程序取代sys.stdin()。这种危险是如果多个进程调用close()在这个类文件对象上,可能导致同一个数据被多次刷新到对象,导致损坏.

如果您编写类似文件的对象并实现自己的缓存,则可以通过在附加到缓存时存储pid并在pid更改时丢弃缓存来使其成为fork-safe。例如:

@propertydef cache(self):    pid = os.getpid()    if pid != self._pid:        self._pid = pid        self._cache = []    return self._cache

有关更多信息,请参阅bpo-5155,bpo-5313和bpo-5331

spawnforkserver启动方法

还有一些额外的限制哪个不适用于fork启动方法.

更多的可挑选性

确保Process.__init__()的所有参数都是可选择的。另外,如果你继承Process,那么确保当Process.start方法是的时候可以选择实例called.

全局变量

请记住,如果在子进程中运行的代码尝试访问globalvariable,那么它看到的值(如果有的话)可能与调用Process.start时父进程中的值不同。

然而,只是模块级常量的全局变量会导致noproblems.

主模块的安全导入

确保主模块可以通过新的Python解释器安全地导入没有引起意外的副作用(例如启动新进程).

例如,使用spawnforkserver启动方法运行以下模块将失败,并且RuntimeError

from multiprocessing import Processdef foo():    print("hello")p = Process(target=foo)p.start()

而应该使用if__name__ == "__main__":来保护程序的“入口点”,如下所示:

from multiprocessing import Process, freeze_support, set_start_methoddef foo():    print("hello")if __name__ == "__main__":    freeze_support()    set_start_method("spawn")    p = Process(target=foo)    p.start()

freeze_support()行可以省略如果程序运行正常而不是冻结。)

这允许新生成的Python解释器安全地导入模块,然后运行模块的foo()功能

适用类似的限制如果在mainmodule中创建了一个池或管理器

示例

演示如何创建和使用自定义管理器和代理:

from multiprocessing import freeze_supportfrom multiprocessing.managers import BaseManager, BaseProxyimport operator##class Foo:    def f(self):        print("you called Foo.f()")    def g(self):        print("you called Foo.g()")    def _h(self):        print("you called Foo._h()")# A simple generator functiondef baz():    for i in range(10):        yield i*i# Proxy type for generator objectsclass GeneratorProxy(BaseProxy):    _exposed_ = ["__next__"]    def __iter__(self):        return self    def __next__(self):        return self._callmethod("__next__")# Function to return the operator moduledef get_operator_module():    return operator##class MyManager(BaseManager):    pass# register the Foo class; make `f()` and `g()` accessible via proxyMyManager.register("Foo1", Foo)# register the Foo class; make `g()` and `_h()` accessible via proxyMyManager.register("Foo2", Foo, exposed=("g", "_h"))# register the generator function baz; use `GeneratorProxy` to make proxiesMyManager.register("baz", baz, proxytype=GeneratorProxy)# register get_operator_module(); make public functions accessible via proxyMyManager.register("operator", get_operator_module)##def test():    manager = MyManager()    manager.start()    print("-" * 20)    f1 = manager.Foo1()    f1.f()    f1.g()    assert not hasattr(f1, "_h")    assert sorted(f1._exposed_) == sorted(["f", "g"])    print("-" * 20)    f2 = manager.Foo2()    f2.g()    f2._h()    assert not hasattr(f2, "f")    assert sorted(f2._exposed_) == sorted(["g", "_h"])    print("-" * 20)    it = manager.baz()    for i in it:        print("<%d>" % i, end=" ")    print()    print("-" * 20)    op = manager.operator()    print("op.add(23, 45) =", op.add(23, 45))    print("op.pow(2, 94) =", op.pow(2, 94))    print("op._exposed_ =", op._exposed_)##if __name__ == "__main__":    freeze_support()    test()

使用Pool

import multiprocessingimport timeimport randomimport sys## Functions used by test code#def calculate(func, args):    result = func(*args)    return "%s says that %s%s = %s" % (        multiprocessing.current_process().name,        func.__name__, args, result        )def calculatestar(args):    return calculate(*args)def mul(a, b):    time.sleep(0.5 * random.random())    return a * bdef plus(a, b):    time.sleep(0.5 * random.random())    return a + bdef f(x):    return 1.0 / (x - 5.0)def pow3(x):    return x ** 3def noop(x):    pass## Test code#def test():    PROCESSES = 4    print("Creating pool with %d processes\n" % PROCESSES)    with multiprocessing.Pool(PROCESSES) as pool:        #        # Tests        #        TASKS = [(mul, (i, 7)) for i in range(10)] + \                [(plus, (i, 8)) for i in range(10)]        results = [pool.apply_async(calculate, t) for t in TASKS]        imap_it = pool.imap(calculatestar, TASKS)        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)        print("Ordered results using pool.apply_async():")        for r in results:            print("\t", r.get())        print()        print("Ordered results using pool.imap():")        for x in imap_it:            print("\t", x)        print()        print("Unordered results using pool.imap_unordered():")        for x in imap_unordered_it:            print("\t", x)        print()        print("Ordered results using pool.map() --- will block till complete:")        for x in pool.map(calculatestar, TASKS):            print("\t", x)        print()        #        # Test error handling        #        print("Testing error handling:")        try:            print(pool.apply(f, (5,)))        except ZeroDivisionError:            print("\tGot ZeroDivisionError as expected from pool.apply()")        else:            raise AssertionError("expected ZeroDivisionError")        try:            print(pool.map(f, list(range(10))))        except ZeroDivisionError:            print("\tGot ZeroDivisionError as expected from pool.map()")        else:            raise AssertionError("expected ZeroDivisionError")        try:            print(list(pool.imap(f, list(range(10)))))        except ZeroDivisionError:            print("\tGot ZeroDivisionError as expected from list(pool.imap())")        else:            raise AssertionError("expected ZeroDivisionError")        it = pool.imap(f, list(range(10)))        for i in range(10):            try:                x = next(it)            except ZeroDivisionError:                if i == 5:                    pass            except StopIteration:                break            else:                if i == 5:                    raise AssertionError("expected ZeroDivisionError")        assert i == 9        print("\tGot ZeroDivisionError as expected from IMapIterator.next()")        print()        #        # Testing timeouts        #        print("Testing ApplyResult.get() with timeout:", end=" ")        res = pool.apply_async(calculate, TASKS[0])        while 1:            sys.stdout.flush()            try:                sys.stdout.write("\n\t%s" % res.get(0.02))                break            except multiprocessing.TimeoutError:                sys.stdout.write(".")        print()        print()        print("Testing IMapIterator.next() with timeout:", end=" ")        it = pool.imap(calculatestar, TASKS)        while 1:            sys.stdout.flush()            try:                sys.stdout.write("\n\t%s" % it.next(0.02))            except StopIteration:                break            except multiprocessing.TimeoutError:                sys.stdout.write(".")        print()        print()if __name__ == "__main__":    multiprocessing.freeze_support()    test()

一个示例显示如何使用队列将任务提供给一组workerprocesses并收集结果:

import timeimport randomfrom multiprocessing import Process, Queue, current_process, freeze_support## Function run by worker processes#def worker(input, output):    for func, args in iter(input.get, "STOP"):        result = calculate(func, args)        output.put(result)## Function used to calculate result#def calculate(func, args):    result = func(*args)    return "%s says that %s%s = %s" % \        (current_process().name, func.__name__, args, result)## Functions referenced by tasks#def mul(a, b):    time.sleep(0.5*random.random())    return a * bdef plus(a, b):    time.sleep(0.5*random.random())    return a + b###def test():    NUMBER_OF_PROCESSES = 4    TASKS1 = [(mul, (i, 7)) for i in range(20)]    TASKS2 = [(plus, (i, 8)) for i in range(10)]    # Create queues    task_queue = Queue()    done_queue = Queue()    # Submit tasks    for task in TASKS1:        task_queue.put(task)    # Start worker processes    for i in range(NUMBER_OF_PROCESSES):        Process(target=worker, args=(task_queue, done_queue)).start()    # Get and print results    print("Unordered results:")    for i in range(len(TASKS1)):        print("\t", done_queue.get())    # Add more tasks using `put()`    for task in TASKS2:        task_queue.put(task)    # Get and print some more results    for i in range(len(TASKS2)):        print("\t", done_queue.get())    # Tell child processes to stop    for i in range(NUMBER_OF_PROCESSES):        task_queue.put("STOP")if __name__ == "__main__":    freeze_support()    test()

发表评论

电子邮件地址不会被公开。 必填项已用*标注