You are here:  Home » Python » – 启动并行任务 – 并发执行(Python教程)(参考资料)

concurrent.futures– 启动并行任务

版本3.2.

新增源代码: Lib / concurrent / futures / thread.py和Lib / concurrent / futures / process.py


concurrent.futures模块提供了一个同步执行callables的高级接口.

可以使用ThreadPoolExecutor使用线程执行异步执行,也可以使用ProcessPoolExecutor。两者都实现了相同的接口,由抽象Executor类。

执行者对象

class concurrent.futures.Executor

一个抽象类,提供异步执行调用的方法。它不应该直接使用,而是通过它的具体子类.

submit (fn, *args, **kwargs)

安排可调用的,fn,以fn(*args **kwargs)并返回Future表示可执行的执行的对象.

with ThreadPoolExecutor(max_workers=1) as executor:    future = executor.submit(pow, 323, 1235)    print(future.result())
mapfunc, *iterables, timeout=None, chunksize=1

相近 map(func, *iterables)除了:

  • /iterables是立即收集而不是懒惰地收集;
  • func是异步执行的,并且可以同时调用func几次调用

返回的迭代器提出concurrent.futures.TimeoutError如果__next__()调用结果后timeout秒从原始调用到Executor.map().timeout可以是int或float。如果没有指定timeoutNone,则等待时间没有限制.

如果func调用引发异常,那么当从iterator

中检索到它的值时,该异常将被调用。当使用ProcessPoolExecutor时,这个方法会调整iterables分成几个块,它作为单独的任务提交给池。这些块的(近似)大小可以通过设置chunksize到正整数。对于很长的迭代,使用chunksize的大值可以显着提高性能,而默认大小为1.对于ThreadPoolExecutor, chunksize没有效果.

在版本3.5中更改:添加了chunksize参数

shutdown// (wait=True )

向执行者发出信号,表示当目前待处理的期货完成执行时,它应该释放它正在使用的任何资源。打电话给Executor.submit()Executor.map()停机后制作 RuntimeError.

如果waitTrue然后这个方法将不会返回,直到所有的待处理期货都完成执行并且已经释放了与执行者相关的资源。如果waitFalse然后这个方法将立即返回,并且当所有待处理的期货完成执行时,与执行者相关的资源将会出现。无论wait,整个Python程序都不会退出,直到所有未完成的期货都执行完毕.

如果使用with语句,你可以避免显式调用这个方法,这将关闭Executor(好像Executor.shutdown()被调用wait设置为True):

import shutilwith ThreadPoolExecutor(max_workers=4) as e:    e.submit(shutil.copy, "src1.txt", "dest1.txt")    e.submit(shutil.copy, "src2.txt", "dest2.txt")    e.submit(shutil.copy, "src3.txt", "dest3.txt")    e.submit(shutil.copy, "src4.txt", "dest4.txt")

ThreadPoolExecutor

ThreadPoolExecutorExecutor使用线程池异步执行调用的子类.

当可调用与Future腰部相关的另一个Future的结果时,就会发生死锁。例如:

import timedef wait_on_b():    time.sleep(5)    print(b.result())  # b will never complete because it is waiting on a.    return 5def wait_on_a():    time.sleep(5)    print(a.result())  # a will never complete because it is waiting on b.    return 6executor = ThreadPoolExecutor(max_workers=2)a = executor.submit(wait_on_b)b = executor.submit(wait_on_a)

And:

def wait_on_future():    f = executor.submit(pow, 5, 2)    # This will never complete because there is only one worker thread and    # it is executing this function.    print(f.result())executor = ThreadPoolExecutor(max_workers=1)executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutormax_workers=None, thread_name_prefix=””, initializer=None, initargs=()

一个Executor子类,最多使用一个池max_workers异步执行调用的线程

initializer是一个可选的可调用函数,在每个工作线程的开头调用;initargs是传递给初始化器的参数元组。应该 initializer提出异常,所有当前正在裁员的工作都会提高BrokenThreadPool,以及任何向工作组提交更多工作的企图.

在版本3.5中更改:如果max_workersNone如果没有给出,它将默认为机器上的处理器数量,乘以5,假设ThreadPoolExecutor通常用于重叠I / O而不是CPU工作,并且工人的数量应该高于ProcessPoolExecutor.

版本3.6中新增:thread_name_prefix添加了一个参数,允许用户控制池创建的工作线程的threading.Thread名称,以便于调试.

更改版本3.7:添加了initializerinitargs参数

ThreadPoolExecutor示例

import concurrent.futuresimport urllib.requestURLS = ["http://www.foxnews.com/",        "http://www.cnn.com/",        "http://europe.wsj.com/",        "http://www.bbc.co.uk/",        "http://some-made-up-domain.com/"]# Retrieve a single page and report the URL and contentsdef load_url(url, timeout):    with urllib.request.urlopen(url, timeout=timeout) as conn:        return conn.read()# We can use a with statement to ensure threads are cleaned up promptlywith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:    # Start the load operations and mark each future with its URL    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}    for future in concurrent.futures.as_completed(future_to_url):        url = future_to_url[future]        try:            data = future.result()        except Exception as exc:            print("%r generated an exception: %s" % (url, exc))        else:            print("%r page is %d bytes" % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor类是一个Executor子类,它使用一个进程池来异步执行调用.ProcessPoolExecutor使用multiprocessing模块,它可以左右全局解释器锁但也意味着可以执行和返回只有可选择的对象.

__main__模块必须可由工作子进程导入。这意味着ProcessPoolExecutor将无法在交互式解释器中工作.

调用ExecutorFuture来自可调用的方法提交到ProcessPoolExecutor会导致在deadlock.

class concurrent.futures.ProcessPoolExecutor (max_workers=None, mp_context=None, initializer=None, initargs=()

Executor使用最多max_workers进程池异步执行调用的子类。如果max_workersNone或未通过,它将默认为机器上的处理器数量。如果max_workers低于或等于0,则aValueError将被提升.mp_context可以是多处理上下文或无。它将用于发射工人。如果mp_contextNone或没有给出,则使用defaultmultiprocessing上下文.

initializer是一个可选的可调用函数,在每个工作进程的开始时调用;initargs是传递给初始化器的参数元组。应该 initializer提出异常,所有正在进行的工作都将提高BrokenProcessPool,以及任何向工作池提交更多工作的尝试.

更改版本3.3:当其中一个工作进程突然终止,现在出现了BrokenProcessPool错误。以前,行为是未定义的,但执行者或其未来的操作经常会冻结或死锁.

更改版本3.7:添加了mp_context参数以允许用户控制start_method对于由池创建的工作进程

添加了initializerinitargs arguments.

ProcessPoolExecutor示例

import concurrent.futuresimport mathPRIMES = [    112272535095293,    112582705942171,    112272535095293,    115280095190773,    115797848077099,    1099726899285419]def is_prime(n):    if n % 2 == 0:        return False    sqrt_n = int(math.floor(math.sqrt(n)))    for i in range(3, sqrt_n + 1, 2):        if n % i == 0:            return False    return Truedef main():    with concurrent.futures.ProcessPoolExecutor() as executor:        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):            print("%d is prime: %s" % (number, prime))if __name__ == "__main__":    main()

未来对象

Future类封装了callable的异步执行.Future实例由Executor.submit().

class concurrent.futures.Future

创建封装可调用的异步执行。Future实例是由Executor.submit()创建的,不应该直接创建,除了测试.

cancel ()

取消通话。如果当前正在执行调用并且无法取消,则该方法将返回False,否则呼叫将被取消,方法将返回True.

cancelled

如果呼叫被成功取消,则返回True.

running

返回True如果电话当前正在执行且无法解决.

done()

如果呼叫成功取消或结束运行,请回复True

resulttimeout=None

返回调用返回的值。如果呼叫尚未完成,则此方法将等待timeout秒。如果呼叫未在timeout秒内完成,则会引发concurrent.futures.TimeoutErrortimeout可以bean int或float。如果没有指定timeoutNone,则等待时间有限.

如果在完成之前取消将来取消,那么CancelledError将被提出。

如果提出调用,这个方法会引发同样的异常.

exceptiontimeout=None

返回通话引发的异常。如果呼叫尚未完成,则此方法将等到timeout秒。如果呼叫没有在timeout秒内完成,则会引发concurrent.futures.TimeoutErrortimeout可以bean int或float。如果没有指定timeoutNone,等待时间就没有了.

如果未来在完成之前被取消那么CancelledError将被提升.

如果通话完成而没有提高,则None被退回.

add_done_callbackfn

将可调用的fn附加到将来。fn当未来被取消或结束时,将以未来作为唯一的论点来召唤.

添加的callables按照添加的顺序调用,并且总是在属于添加它们的进程的线程中调用。如果可赎回提出Exception子类,它将被记录和签名。如果callable引发BaseException子类,则行为未定义.

如果将来已经完成或被取消,fn将立即考虑

以下Future方法用于单元测试和Executor实施.

set_running_or_notify_cancel ()

这个方法只能由Executor在执行与Future和unittests相关的工作之前的实现

如果方法返回False那么Future被取消,即。Future.cancel()被叫了回来真的。任何等待Future完成的线程(即通过as_completed()wait())都会被唤醒

如果方法返回True然后Future未被取消并且已经处于运行状态,即呼叫Future.running()将返回 真正.

此方法只能调用一次,在Future.set_result()要么 Future.set_exception()被叫了

set_resultresult

将与Future关联的工作结果设置为result.

此方法仅应由Executor实现使用andunit tests.

set_exception(exception)

将与Future关联的工作结果设置为Exception exception.

此方法仅应由Executor implements andunit tests使用

模块函数

concurrent.futures.waitfs, timeout=None, return_when=ALL_COMPLETED

等待 FutureExecutor给出的实例(可能由不同的fs实例创建)完成。返回一组named2元组。第一集,名为done,包含在等待完成之前完成(已完成或已取消)的期货。第二集,名为not_done,包含未完成的期货.

timeout可用于控制返回前等待的最大秒数。timeout可以是int或float。如果没有指定timeoutNone,等待时间没有限制.

return_when表示此功能何时返回。它必须是以下常量之一:

常量 描述
FIRST_COMPLETED 当任何未来完成或被取消时,该函数将返回.
FIRST_EXCEPTION 当任何结束时通过引发异常来返回该函数。如果没有未来会引发异常,则相当于ALL_COMPLETED.
ALL_COMPLETED 所有功能完成或取消后,该功能将返回.
concurrent.futures.as_completedfs, timeout=None

返回Future实例(可能是由不同的Executor实例)由给出fs这使得期货完整(已完成或被取消)。由fs给出的任何复制的期货将被退回一次。任何在as_completed()被称为将首先产生。返回的iteratorraises一个concurrent.futures.TimeoutError如果__next__()被调用,结果在timeout秒后原始调用as_completed(). timeout可以是int或float。如果timeout未指定或None,等待时间没有限制.

也可以看看

PEP 3148 – 期货 – 异步执行计算
描述此功能的提议包含在Python标准库中.

异常类

exception concurrent.futures.CancelledError

在取消未来时引发.

exception concurrent.futures.TimeoutError

当未来的操作超过给定的超时时引发

exception concurrent.futures.BrokenExecutor

来自RuntimeError当执行器由于某种原因而被破坏时,会引发此异常类,并且不能用于提交或执行新任务.

新版本3.7.

exception concurrent.futures.thread.BrokenThreadPool

来自BrokenExecutorThreadPoolExecutor的一个工人初始化失败时,会引发这个异常类.

版本3.7.

exception concurrent.futures.process.BrokenProcessPool

来自BrokenExecutor(原RuntimeError),当一个ProcessPoolExecutor的工作者以非清洁方式终止时(例如,如果它是从外面被杀死的话),就会引发这个异常类.

版本3.3.