# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

from concurrent.futures import _base
import itertools
import queue
import threading
import types
import weakref
import os


# Maps every live worker thread to the work queue it consumes from.  Weak keys
# let an entry disappear once its thread object is no longer referenced.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True by _python_exit(); checked by submit() and by the workers.
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()


def _python_exit():
    """Flag interpreter shutdown, wake every worker thread and join it."""
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # Snapshot the mapping: it is a WeakKeyDictionary and may change while we
    # iterate.
    items = list(_threads_queues.items())
    # First wake all workers (a None item makes an idle worker re-check the
    # shutdown flags), then wait for each of them.
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# At fork, reinitialize the `_global_shutdown_lock` lock in the child process
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)


class _WorkItem(object):
    """A pending call: a future plus the callable and arguments to run."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Call ``fn(*args, **kwargs)`` and store the outcome on the future.

        Does nothing when ``set_running_or_notify_cancel()`` returns a false
        value.  Any exception raised by the call (``BaseException`` included)
        is stored on the future rather than propagated.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

    __class_getitem__ = classmethod(types.GenericAlias)


def _worker(executor_reference, work_queue, initializer, initargs):
    """Body of a worker thread: run the initializer, then serve the queue.

    Args:
        executor_reference: A callable (a weak reference) returning the owning
            executor, or None once the executor has been collected.
        work_queue: The queue delivering work items; a None item is a wake-up
            signal asking the worker to re-check the shutdown conditions.
        initializer: An optional callable run once before any work item.
        initargs: A tuple of arguments passed to the initializer.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue

            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        # Released by a worker after each finished work item; a successful
        # non-blocking acquire in _adjust_thread_count() suppresses spawning
        # a new thread.
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        # False while healthy; replaced by an error message string once a
        # worker initializer has failed.
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    # NOTE: the docstring is copied from _base.Executor.submit right below the
    # definition, so the notes here are plain comments.
    # Raises BrokenThreadPool if an initializer failed, and RuntimeError if
    # either this executor or the interpreter is shutting down.
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one is available.

        No thread is started either when the pool already holds
        ``_max_workers`` threads.
        """
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference to the executor, so the
            # thread itself does not keep the executor alive.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool as broken and fail every still-queued future."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    # NOTE: the docstring is copied from _base.Executor.shutdown right below
    # the definition, so the notes here are plain comments.
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        # Join outside the lock so workers and other callers are not blocked
        # on _shutdown_lock while we wait.
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__