Intro
In this post we introduce the ProcessPoolExecutor class provided by the concurrent.futures module, implementing some examples and generating some visualisations to better understand the content.
The resources section at the end has some links to wonderful material that you can use to go deeper into the topic 🤓
The code from this post can be found in this repository.
ProcessPoolExecutor
The ProcessPoolExecutor class is an Executor subclass that uses a pool of separate processes to execute callables asynchronously.
A process pool is a design pattern to optimize the execution of tasks by reusing a fixed number of processes, rather than creating and destroying them on demand. It is used to manage and control a collection of processes that can be used to perform concurrent tasks.
Using the ProcessPoolExecutor object, we can submit tasks to the process pool to be performed concurrently and in parallel.
Why we need it
The multiprocessing module in Python already provides the Pool class to initialize and manage several worker processes, so why might one choose ProcessPoolExecutor over Pool?
You can find an implementation example of the Pool object in the Multiprocessing in Python post.
The ProcessPoolExecutor class comes from the same module as ThreadPoolExecutor, the concurrent.futures module, and both share the same API. This consistency makes it easy to switch between multithreading and multiprocessing, depending on the nature of the tasks.
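Because both executors share the same interface, switching between them can come down to changing a single argument. Below is a minimal sketch of my own to illustrate that idea; run_all is a hypothetical helper (not part of concurrent.futures), and math.factorial is just a stand-in task that can be pickled and sent to worker processes.

```python
import math
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def run_all(executor_cls: type[Executor], fn, items, workers: int = 2) -> list:
    """Apply fn to every item using whichever executor class is passed in."""
    with executor_cls(max_workers=workers) as executor:
        return list(executor.map(fn, items))


if __name__ == "__main__":
    items = [5, 10, 20]
    # Same call, two concurrency models.
    print(run_all(ThreadPoolExecutor, math.factorial, items))
    print(run_all(ProcessPoolExecutor, math.factorial, items))
```

In a real program you would usually pick ThreadPoolExecutor for I/O-bound tasks and ProcessPoolExecutor for CPU-bound ones; the rest of the code stays the same.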
The ProcessPoolExecutor class is easier to use because it has a simpler API and facilitates error handling by wrapping exceptions in Future objects.
It offers a more convenient way to manage and work with pools of processes.
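To illustrate the error handling, here is a small sketch assuming a hypothetical parse_int task that fails on non-numeric input (parse_all is also a name made up for this example). The exception raised inside the worker process travels back to the main process and is stored in the corresponding Future, where we can inspect it with Future.exception() or have it re-raised by Future.result().

```python
from concurrent.futures import ProcessPoolExecutor


def parse_int(text: str) -> int:
    """Hypothetical task: int() raises ValueError for non-numeric input."""
    return int(text)


def parse_all(texts: list[str]) -> list[tuple[str, object]]:
    """Return ("ok", value) or ("error", exception) for every input."""
    outcomes = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(parse_int, text) for text in texts]
        for future in futures:
            # exception() waits for the task and returns the exception raised
            # in the worker (or None); nothing is raised in the main process.
            error = future.exception()
            if error is None:
                outcomes.append(("ok", future.result()))
            else:
                outcomes.append(("error", error))
    return outcomes


if __name__ == "__main__":
    for outcome in parse_all(["1", "two", "3"]):
        print(outcome)
```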
How it works
The ProcessPoolExecutor object internally creates the pool of worker processes by instantiating multiprocessing Process objects (through its multiprocessing context). Each worker process waits for tasks, executes them, and sends the results back to the main process.
In this section, we will explore the ProcessPoolExecutor source code to understand how it is organized. A deep understanding is not required to take advantage of multiprocessing, but examining it a little can provide valuable insights, and it is a good entry point if you want to dig deeper into the inner workings of this module. If you prefer, you can go directly to the short summary and then move on to the next section to see some implementation examples.
- concurrent.futures module docs
- concurrent.futures module. Python 3.13
- concurrent.futures.ProcessPoolExecutor source code. Python 3.13
The number of worker processes can be specified with the max_workers parameter. Otherwise, it defaults to the number of logical CPU cores returned by os.cpu_count() (or by os.process_cpu_count() since Python 3.13).
```python
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.
        ...
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.process_cpu_count() or 1
            ...
```
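If you want to know in advance how many workers you will get by default, a sketch like the one below approximates the logic above. default_worker_count is a hypothetical helper: it uses os.process_cpu_count() when it exists (Python 3.13+) and falls back to os.cpu_count() otherwise. On Windows, CPython additionally caps the default number of workers, which this sketch ignores.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def default_worker_count() -> int:
    """Approximate the pool size used when max_workers is None."""
    # os.process_cpu_count() exists only from Python 3.13 onwards.
    count_cpus = getattr(os, "process_cpu_count", os.cpu_count)
    # Both functions may return None if the number can't be determined.
    return count_cpus() or 1


if __name__ == "__main__":
    print("default workers:", default_worker_count())
    # Passing max_workers explicitly overrides the default.
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(abs, [-1, -2, -3])))  # [1, 2, 3]
```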
The Executor class
Like the ThreadPoolExecutor class, the ProcessPoolExecutor class extends the Executor class, an abstract base class that defines only five methods. Three of them are:
- submit()
- map()
- shutdown()
(See the Executor source code, Python 3.13.)
The other two methods are the __enter__() and __exit__() magic methods, which implement Python’s context management protocol. Thanks to them, we can use the ProcessPoolExecutor in a with statement (recommended). The with statement calls the __enter__() method on entry, and __exit__() is called when execution leaves the with code block.
```python
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
```
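In practice, this means the two functions below should behave the same way. This is my own illustration (both function names are hypothetical): the with version relies on __exit__() to call shutdown(wait=True), while the explicit version does it in a finally clause so the pool is cleaned up even if an exception is raised. The built-in pow is used as a task because it can be pickled.

```python
from concurrent.futures import ProcessPoolExecutor


def squares_with_statement(n: int) -> list[int]:
    # __exit__() calls shutdown(wait=True) when the block ends.
    with ProcessPoolExecutor(max_workers=2) as executor:
        return list(executor.map(pow, range(n), [2] * n))


def squares_explicit(n: int) -> list[int]:
    # The same pool lifecycle, written out by hand.
    executor = ProcessPoolExecutor(max_workers=2)
    try:
        return list(executor.map(pow, range(n), [2] * n))
    finally:
        executor.shutdown(wait=True)


if __name__ == "__main__":
    print(squares_with_statement(5))  # [0, 1, 4, 9, 16]
    print(squares_explicit(5))        # [0, 1, 4, 9, 16]
```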
Executor is only an abstract class, and most of the logic is implemented in the ProcessPoolExecutor methods. The submit() and shutdown() methods are implemented in the ProcessPoolExecutor class, while most of the logic for the map() method is implemented in the Executor class, since it uses submit() internally.
But let’s focus first on the submit() method.
submit() method
The submit() method allows you to schedule a callable to be executed asynchronously in a separate process. It returns a Future instance representing the execution of the callable.
submit() accepts the callable followed by the positional and keyword arguments to call it with: submit(fn, /, *args, **kwargs).
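For example, positional arguments go right after the callable, and keyword arguments are passed by name. The sketch below uses built-ins (pow and int) so that the callables can be pickled and sent to the worker processes; submit_examples is a hypothetical name.

```python
from concurrent.futures import ProcessPoolExecutor


def submit_examples() -> tuple[int, int]:
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Positional arguments follow the callable: pow(2, 10)
        power = executor.submit(pow, 2, 10)
        # Keyword arguments work too: int("ff", base=16)
        parsed = executor.submit(int, "ff", base=16)
        return power.result(), parsed.result()


if __name__ == "__main__":
    print(submit_examples())  # (1024, 255)
```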
Let’s take a brief look at the internal workflow.
First, it acquires a lock to ensure thread safety and checks the Executor’s state.
Then it creates a Future and a _WorkItem object. The Future object is a placeholder for the result of the callable and provides methods to check whether the computation is complete, wait for its completion, and retrieve the result once available. It moves from the pending state to the finished state when the task completes, and from then on holds the result (or the exception).
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...
            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)
            ...
            return f
    ...
```
You can find more details about _WorkItems in The Work IDs Queue section.
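To see these state transitions in isolation, we can drive a Future by hand, without any executor. This is only an illustration: the Python docs state that set_running_or_notify_cancel(), set_result() and set_exception() are meant for Executor implementations and unit tests, not for application code.

```python
from concurrent.futures import Future

future = Future()
print(future.done())                          # False: still pending
print(future.set_running_or_notify_cancel())  # True: not cancelled, now running
print(future.running())                       # True
print(future.cancel())                        # False: a running future can't be cancelled
future.set_result(42)
print(future.done(), future.result())         # True 42
```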
After that, the _WorkItem is added to the _pending_work_items dictionary and its identifier is put in the _work_ids queue. This ensures it will be processed in the correct order later by the Executor Manager Thread (or queue management thread), which is in charge of putting work items in the call queue (see The queues section).
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...
            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            ...
            return f
    ...
```
The line of code below is responsible for waking up the thread that manages the execution queue and worker processes: the queue management thread. This thread retrieves the work IDs from the work_ids_queue, gets each work item from the pending_work_items dictionary, creates a _CallItem object with the callable and its arguments, and puts it into the call_queue, among other tasks.
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()
            ...
            return f
    ...
```
Finally, the last lines of submit() make the executor adjust the number of worker processes to match the workload when the process start method is not fork (see Child and Parent processes), and start the queue management thread if needed.
The _start_executor_manager_thread() method checks whether the queue management thread is already running. If it isn’t, and the start method is fork, it first creates and starts all the worker processes by calling the _launch_processes() method, which calls _spawn_process() for each missing worker. Then it creates and starts the queue management thread. This way, the executor ensures the manager thread is always running to coordinate the execution of tasks and manage the worker processes.
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...
            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    ...
```
The _start_executor_manager_thread() method:
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup
    ...
```
The _ExecutorManagerThread is a class defined in the same module that extends the threading.Thread class (see The Executor Manager Thread section).
Here is the full code of the submit() method:
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
```
map() method
The map() method is the other way to submit tasks to the process pool. It is similar to the built-in map(fn, *iterables) function, but the function we pass to it through the fn parameter is executed asynchronously.
The built-in map() function provides lazy evaluation, meaning that the values of the iterator it returns are computed only when requested.
However, when Executor.map(fn, *iterables) is called, it collects all the items from the provided iterables upfront and submits them as tasks, in contrast to a lazy approach where items are processed on demand.
It returns an iterator that we can iterate over to get the results of the tasks as they become available, in the same order as the items of the iterables we provided.
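The difference in laziness can be observed with a generator that records which items have been pulled from it. In this sketch (tracked and compare_laziness are hypothetical helpers), the built-in map() has not consumed anything yet when we inspect it, whereas executor.map() has already consumed, and submitted, every item before we start iterating over the results.

```python
from concurrent.futures import ProcessPoolExecutor


def tracked(values, seen: list):
    """Yield values one by one, recording each value as it is pulled."""
    for value in values:
        seen.append(value)
        yield value


def compare_laziness() -> tuple[list, list]:
    builtin_seen, executor_seen = [], []

    lazy = map(abs, tracked([-1, -2, -3], builtin_seen))
    builtin_snapshot = list(builtin_seen)  # taken before iterating

    with ProcessPoolExecutor(max_workers=2) as executor:
        results = executor.map(abs, tracked([-1, -2, -3], executor_seen))
        executor_snapshot = list(executor_seen)  # taken before iterating
        list(results)  # consume the results so all tasks finish

    list(lazy)  # only now does the built-in map pull the values
    return builtin_snapshot, executor_snapshot


if __name__ == "__main__":
    print(compare_laziness())  # ([], [-1, -2, -3])
```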
If timeout is not specified or is None, there is no limit on the wait time, so once we start iterating, we won’t get the second element of the iterator until the first one is available. If timeout is set to an int or a float, the returned iterator raises a TimeoutError when a result isn’t available timeout seconds after the original call to map().
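A minimal sketch of the timeout behavior, assuming a hypothetical map_with_timeout helper and using time.sleep as the task (it returns None). Note that the deadline is measured from the call to map(), and that leaving the with block still waits for the task that is already running.

```python
import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError


def map_with_timeout(delays: list[float], timeout: float):
    """Return the list of results, or "timed out" if the deadline passes."""
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = executor.map(time.sleep, delays, timeout=timeout)
        try:
            return list(results)
        except TimeoutError:
            return "timed out"


if __name__ == "__main__":
    print(map_with_timeout([0.1, 0.2], timeout=10))  # [None, None]
    print(map_with_timeout([2], timeout=0.5))        # timed out
```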
This method chops the iterables into a number of chunks, which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
If a call to fn raises an exception, that exception will be raised when its value is retrieved from the iterator.
Most of the logic for the map() method is implemented in the Executor class, since it uses submit() internally.
```python
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).
        ...
        """
        ...
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        ...
        return result_iterator()
    ...
```
However, ProcessPoolExecutor overrides this method. Its version groups the iterables into chunks with itertools.batched(zip(*iterables), chunksize) and passes them to super().map(), which applies the partial(_process_chunk, fn) function to each chunk. Finally, _chain_from_iterable_of_lists() flattens the per-chunk result lists back into a single iterator.
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).
        ...
        """
        ...
        results = super().map(partial(_process_chunk, fn),
                              itertools.batched(zip(*iterables), chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)
    ...
```
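The chunking step can be reproduced in a single process to see what each task receives. In this sketch, batched mimics itertools.batched (added in Python 3.12) so that it also runs on older versions, process_chunk is my own stand-in for the private _process_chunk helper, and operator.add plays the role of fn.

```python
from itertools import chain, islice
from operator import add


def batched(iterable, n):
    """Group items into tuples of length n (the last one may be shorter),
    like itertools.batched from Python 3.12."""
    it = iter(iterable)
    while chunk := tuple(islice(it, n)):
        yield chunk


def process_chunk(fn, chunk):
    """Stand-in for the private _process_chunk helper: call fn on every
    argument tuple of one chunk and return a list of results."""
    return [fn(*args) for args in chunk]


xs, ys = [1, 2, 3, 4, 5], [10, 20, 30, 40, 50]
chunks = list(batched(zip(xs, ys), 2))
print(chunks)  # [((1, 10), (2, 20)), ((3, 30), (4, 40)), ((5, 50),)]

# Each chunk would be one task in the pool; here we run them in-process.
per_chunk = [process_chunk(add, chunk) for chunk in chunks]
print(per_chunk)  # [[11, 22], [33, 44], [55]]

# Flatten back into one sequence, as _chain_from_iterable_of_lists does lazily.
flat = list(chain.from_iterable(per_chunk))
print(flat)  # [11, 22, 33, 44, 55]
```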
shutdown() method
The shutdown() method is responsible for safely shutting down the process pool and cleaning up resources. It signals the executor that it should free any resources it is using when the currently pending futures are done executing.
As we saw above, you can avoid calling this method explicitly if you use the with statement, thanks to the __exit__() magic method:
```python
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
```
The shutdown() method signature has two parameters:
- wait (defaults to True)
  - wait=True: it will not return until all the pending futures are done executing and the resources associated with the executor have been freed.
  - wait=False: it will return immediately, and the resources associated with the executor will be freed when all pending futures are done executing.
- cancel_futures (defaults to False)
  - cancel_futures=True: it will cancel all pending futures that the executor has not started running.
  - cancel_futures=False: it will not cancel any future.
If both cancel_futures and wait are True, all futures that the executor has started running will be completed before this method returns. The remaining futures are cancelled.
Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.
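Here is a sketch of cancel_futures in action, assuming a single worker and a handful of slow tasks (shutdown_and_cancel is a hypothetical helper). Tasks already moved to the call queue are marked as running and cannot be cancelled, so they finish; the rest are cancelled. The exact split depends on timing and on the size of the call queue, so the sketch only reports the counts.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def shutdown_and_cancel(n_tasks: int = 6) -> tuple[int, int]:
    """Return (completed, cancelled) counts after shutdown(cancel_futures=True)."""
    executor = ProcessPoolExecutor(max_workers=1)
    futures = [executor.submit(time.sleep, 0.5) for _ in range(n_tasks)]
    time.sleep(0.1)  # let the first task start (an arbitrary delay for the demo)
    # Waits for the tasks already running and cancels the ones still pending.
    executor.shutdown(wait=True, cancel_futures=True)
    cancelled = sum(future.cancelled() for future in futures)
    completed = sum(future.done() and not future.cancelled() for future in futures)
    return completed, cancelled


if __name__ == "__main__":
    completed, cancelled = shutdown_and_cancel()
    print(f"completed: {completed}, cancelled: {cancelled}")
```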
The shutdown() method starts by acquiring a lock to ensure thread safety, storing the cancel_futures value, and setting the shutdown flag. Then, if the _executor_manager_thread_wakeup object exists, it wakes up the thread that manages the execution queue and worker processes: the queue management thread.
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()
        ...
```
Then, if the queue management thread (_executor_manager_thread) exists and wait is True, it waits for this thread to finish by join()ing it, which ensures all tasks are finished. This blocks the calling thread (typically the main thread of the main process).
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def shutdown(self, wait=True, *, cancel_futures=False):
        ...
        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        ...
```
Finally, it cleans up resources by removing references to threads, queues, and processes.
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def shutdown(self, wait=True, *, cancel_futures=False):
        ...
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None
```
Any call to Executor.submit() or Executor.map() made after shutdown will raise RuntimeError.
Data-flow through the system
The docstring of the concurrent.futures.process module (reproduced below) contains a diagram that explains the data flow through the system. In short:
- When Executor.submit() is called (directly through ProcessPoolExecutor.submit(), or indirectly through ProcessPoolExecutor.map()), a _WorkItem object is created and added to the _pending_work_items dictionary. Then the ID of the _WorkItem is added to the _work_ids queue.
  - A _WorkItem is just an object used to wrap the task (fn), its arguments (args and kwargs) and the future object (_base.Future()) together (see The Work IDs Queue section for more details).
- The local worker thread, or Executor Manager Thread, reads work item IDs from the Work IDs queue and looks up the corresponding _WorkItem in the Work Items dict.
  - If the work item has been cancelled, it is removed from the dictionary.
  - If it is not cancelled, it is repackaged as a _CallItem and placed in the Call Q. The thread keeps putting _CallItems into the Call Q until it is full.
  - A _CallItem is just an object used to wrap the task (work_item.fn), its arguments (work_item.args and work_item.kwargs) and the work item ID (work_id) together (see The Call Queue section for more details).
- Separate processes (Process #1..n) read _CallItems from the Call Q, execute the calls, and put the resulting _ResultItems into the Result Q.
  - A _ResultItem is just an object used to wrap the work item ID (work_id), the result (r), a possible exception, and exit_pid together. exit_pid holds the worker’s process ID (os.getpid()) only when that worker is about to exit after reaching max_tasks_per_child; otherwise it is None (see The Result Queue section for more details).
- The local worker thread also reads _ResultItems from the Result Q. It updates the future stored in the Work Items dictionary with the result and then deletes the dictionary entry for the completed work item.
```python
"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+     +---------+
|          |  => | Work Ids |       |        |     | Call Q    |     | Process |
|          |     +----------+       |        |     +-----------+     |  Pool   |
|          |     | ...      |       |        |     | ...       |     +---------+
|          |     | 6        |    => |        |  => | 5, call() |  => |         |
|          |     | 7        |       |        |     | ...       |     |         |
| Process  |     | ...      |       | Local  |     +-----------+     | Process |
|  Pool    |     +----------+       | Worker |                       |  #1..n  |
| Executor |                        | Thread |                       |         |
|          |     +------------+     |        |     +-----------+     |         |
|          | <=> | Work Items | <=> |        |  <= | Result Q  |  <= |         |
|          |     +------------+     |        |     +-----------+     |         |
|          |     | 6: call()  |     |        |     | ...       |     |         |
|          |     |    future  |     |        |     | 4, result |     |         |
|          |     | ...        |     |        |     | 3, except |     |         |
+----------+     +------------+     +--------+     +-----------+     +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'
```
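To make the data flow more concrete, here is a heavily simplified toy model of my own, not the library’s implementation. Threads stand in for the worker processes so that nothing needs to be pickled, all names (ToyPool and its methods) are hypothetical, and there is no shutdown logic. Also, the real executor uses a single manager thread both to feed the call queue and to collect results, whereas this sketch splits those two jobs into two threads to keep each loop simple.

```python
import queue
import threading
from concurrent.futures import Future


class ToyPool:
    """Toy model of the data flow. Threads stand in for worker processes."""

    def __init__(self, n_workers: int = 2):
        self._lock = threading.Lock()
        self.pending_work_items: dict[int, tuple] = {}  # "Work Items" dict
        self.work_ids: queue.Queue = queue.Queue()      # "Work Ids" queue
        # Kept small, like the real "Call Q": queued calls can't be cancelled.
        self.call_queue: queue.Queue = queue.Queue(maxsize=n_workers + 1)
        self.result_queue: queue.Queue = queue.Queue()  # "Result Q"
        self._next_id = 0
        for _ in range(n_workers):
            threading.Thread(target=self._worker, daemon=True).start()
        threading.Thread(target=self._feed_call_queue, daemon=True).start()
        threading.Thread(target=self._collect_results, daemon=True).start()

    def submit(self, fn, *args) -> Future:
        future = Future()
        with self._lock:
            work_id = self._next_id
            self._next_id += 1
            self.pending_work_items[work_id] = (future, fn, args)
        self.work_ids.put(work_id)
        return future

    def _feed_call_queue(self):
        # Work Ids -> Work Items lookup -> Call Q (skipping cancelled items).
        while True:
            work_id = self.work_ids.get()
            with self._lock:
                future, fn, args = self.pending_work_items[work_id]
            if future.set_running_or_notify_cancel():
                self.call_queue.put((work_id, fn, args))  # blocks while full
            else:
                with self._lock:  # cancelled: just drop it
                    del self.pending_work_items[work_id]

    def _worker(self):
        # Call Q -> run the call -> Result Q (result or exception).
        while True:
            work_id, fn, args = self.call_queue.get()
            try:
                self.result_queue.put((work_id, fn(*args), None))
            except Exception as exc:
                self.result_queue.put((work_id, None, exc))

    def _collect_results(self):
        # Result Q -> update the future -> delete the Work Items entry.
        while True:
            work_id, result, exc = self.result_queue.get()
            with self._lock:
                future = self.pending_work_items.pop(work_id)[0]
            if exc is None:
                future.set_result(result)
            else:
                future.set_exception(exc)


if __name__ == "__main__":
    pool = ToyPool(n_workers=2)
    futures = [pool.submit(pow, i, 2) for i in range(5)]
    print([f.result(timeout=5) for f in futures])  # [0, 1, 4, 9, 16]
```

Even in this reduced form, you can see why cancelling only works for futures that are still waiting in the Work Items dict: once set_running_or_notify_cancel() has returned True and the call is in the call queue, the future can no longer be cancelled.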
The Executor Manager Thread
The Executor Manager Thread is a dedicated thread responsible for overseeing the state and behavior of the worker processes and managing the communication between the main process and the worker processes. It is started once by the _start_executor_manager_thread() method.
```python
class ProcessPoolExecutor(_base.Executor):
    ...
    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup
    ...
```
It extends the threading.Thread class and overrides the run() method.
```python
class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be own by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        ...
        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            # gh-109047: During Python finalization, self.call_queue.put()
            # creation of a thread can fail with RuntimeError.
            try:
                self.add_call_item_to_queue()
            except BaseException as exc:
                cause = format_exception(exc)
                self.terminate_broken(cause)
                return

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor
            ...
```
Its main responsibilities are:
- It monitors the worker processes and spawns new ones when required.
- It places tasks in the call queue so that the worker processes can pick them up.
- It retrieves completed task results from the result queue and sets them on the corresponding Future objects.
- It ensures that all worker processes are terminated properly during shutdown.
The queues
The ProcessPoolExecutor class uses three main queues, each serving a distinct purpose in managing the execution of tasks across multiple processes.
The Work IDs Queue
This queue keeps track of the identifiers of the tasks (work items’ IDs) that have been submitted to the executor. It helps in keeping track of which tasks are pending and ensures each task has a unique identifier.
```python
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        ...
        self._work_ids = queue.Queue()
```
A _WorkItem is just an object used to wrap the task (fn), its arguments (args and kwargs) and the future object (_base.Future()) together.
```python
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
```
The future will contain the result once it is available.
The Call Queue
This queue holds the tasks that need to be executed by the worker processes. Once the Executor Manager Thread picks up a task submitted to the ProcessPoolExecutor, it places it into this call queue. The worker processes then retrieve tasks from this queue, execute them, and put the results into the result queue.
```python
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        ...
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        ...
```
A _CallItem is just an object used to wrap the task (work_item.fn), its arguments (work_item.args and work_item.kwargs) and the work item ID (work_id) together.
```python
class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
```
The _CallItem is created by the Executor Manager Thread and added to the call_queue, which comes directly from the ProcessPoolExecutor object (executor._call_queue).
```python
class _ExecutorManagerThread(threading.Thread):
    ...
    def __init__(self, executor):
        ...
        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue
        ...

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        ...
        if work_item.future.set_running_or_notify_cancel():
            self.call_queue.put(_CallItem(work_id,
                                          work_item.fn,
                                          work_item.args,
                                          work_item.kwargs),
                                block=True)
        ...
```
The Result Queue
This queue is used for communication from the worker processes back to the main process. It is created when the ProcessPoolExecutor class is instantiated.
```python
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        ...
        self._result_queue = mp_context.SimpleQueue()
        ...
```
After a worker process completes a task, it places the result of the task into this queue.
```python
# Module function passed as target to the Process
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    while True:
        ...
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r
        ...


def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))
```
Then the Executor Manager Thread retrieves the results from this queue and sets them on the appropriate Future objects.
```python
class _ExecutorManagerThread(threading.Thread):
    ...
    def process_result_item(self, result_item):
        # Process the received a result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        # Received a _ResultItem so mark the future as completed.
        work_item = self.pending_work_items.pop(result_item.work_id, None)
        # work_item can be None if another process terminated (see above)
        if work_item is not None:
            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
        ...
```
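A _ResultItem is just an object used to wrap the work item ID (work_id), the result (r), a possible exception, and exit_pid together. exit_pid holds the worker’s process ID (os.getpid()) only when that worker is about to exit after reaching max_tasks_per_child; otherwise it is None.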
```python
class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid
```
The Wake Up mechanism
When the Executor Manager Thread is not actively performing any management operation, it is in a waiting state. During those periods it waits for something to change, such as new tasks being submitted, worker processes completing their tasks, or shutdown() being called.
The wake-up mechanism is a utility designed to wake up threads that are waiting for those events. It is implemented by the _ThreadWakeup object, which internally uses a Pipe() to send a signal (an empty byte string) through the writer end of the pipe to the reader end.
```python
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.
        ...
        """
        ...
        self._executor_manager_thread_wakeup = _ThreadWakeup()
        ...
```
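The idea can be sketched in a few lines. ThreadWakeup below is a simplified, hypothetical version of the private _ThreadWakeup helper, and manager is a toy loop, not the real manager thread: it blocks in multiprocessing.connection.wait() until the reader end of the pipe becomes readable, drains the pending signals, and then handles whatever work has been published.

```python
import threading
from collections import deque
from multiprocessing import Pipe
from multiprocessing.connection import wait


class ThreadWakeup:
    """Simplified, hypothetical version of the private _ThreadWakeup helper."""

    def __init__(self):
        # One-way pipe: waiters watch `reader`, notifiers write to `_writer`.
        self.reader, self._writer = Pipe(duplex=False)

    def wakeup(self):
        # The message carries no data; it only makes `reader` readable.
        self._writer.send_bytes(b"")

    def clear(self):
        # Drain pending signals so the next wait() blocks again.
        while self.reader.poll():
            self.reader.recv_bytes()


def manager(wakeup: ThreadWakeup, jobs: deque, handled: list):
    """Toy manager loop: sleep until signalled, then handle queued jobs."""
    while True:
        wait([wakeup.reader])  # blocks without busy-waiting
        wakeup.clear()
        while jobs:
            job = jobs.popleft()
            if job is None:  # sentinel: shut down
                return
            handled.append(job)


if __name__ == "__main__":
    wakeup, jobs, handled = ThreadWakeup(), deque(), []
    thread = threading.Thread(target=manager, args=(wakeup, jobs, handled))
    thread.start()
    for job in ["task-1", "task-2", None]:
        jobs.append(job)  # publish the work first...
        wakeup.wakeup()   # ...then signal the sleeping thread
    thread.join()
    print(handled)  # ['task-1', 'task-2']
```

Publishing the work before sending the signal matters: because the manager drains the pipe before checking the jobs, a job is never left unhandled while the manager goes back to sleep.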
Short summary
There is a dedicated thread, called the Executor Manager Thread, which is responsible for monitoring the worker processes, tasks, and results. When a new task is submitted or shutdown() is called, the wake-up mechanism sends it a signal so it can do its job; it also wakes up when a result arrives on the result queue.
The ProcessPoolExecutor internally uses three main queues:
- The call queue, which holds each task until a worker process retrieves it.
- The result queue, which holds the results from the worker processes until the main process retrieves them.
- The work IDs queue, which holds the task IDs to keep track of which tasks are pending.
The ProcessPoolExecutor class extends the abstract Executor class, which defines the submit() and shutdown() methods and implements most of the map() method. The Executor class also implements the context management protocol, which means the ProcessPoolExecutor object can be used in a with statement.
The submit() method allows you to schedule a callable to be executed asynchronously in a separate process. It returns a Future instance representing the execution of the callable.
The map() method is the other way to submit tasks to the process pool. It is similar to the built-in map(fn, *iterables) function, but the function we pass to it as fn is executed asynchronously, and several calls to fn may be made concurrently. It returns an iterator that we can iterate over to get the results as they become available, in the same order as the items of the iterables we provided. You can use the timeout and chunksize parameters to modify its behavior.
The shutdown() method is responsible for safely shutting down the process pool and cleaning up resources. It signals the executor that it should free any resources it is using when the currently pending futures are done executing. You can avoid calling this method explicitly if you use the with statement.
Examples
Let’s start by defining our CPU-intensive task. As in the previous posts, it iterates over a range of numbers and adds them to the count variable.
```python
import logging
import os
from time import perf_counter


def cpu_bound_task(counts: int) -> tuple[float, float]:
    """Runs a CPU-bound task."""
    start = perf_counter()
    count = 0
    for i in range(counts):
        count += i
    finish = perf_counter()
    logging.info(f"-------- Process: {os.getpid()} --------")
    logging.info(f"time - {round(finish - start, 4)}\n")
    return start, finish
```
submit()
We can use the submit() method to schedule the callable to be executed in a separate process, and it returns a Future instance representing its execution.
submit() accepts the callable followed by the arguments to call it with.
In this example, we are submitting 8 tasks to a process pool of 2 worker processes.
```python
import logging
from concurrent.futures import ProcessPoolExecutor
from time import time


def main():
    n_tasks = 8
    args = [50000000] * n_tasks

    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        futures = [
            executor.submit(cpu_bound_task, count) for count in args
        ]  # does not block
        # Retrieve the results
        times = [future.result() for future in futures]  # blocks
    end = time()
    total_time = end - start
    logging.info(f"Total time: {total_time}")
```
When retrieving the results, the main thread of the main process blocks. The results are retrieved sequentially, in the same order the tasks were submitted.
As there are two available workers, only two tasks can run in parallel. When a worker process finishes a task, it picks up the next one.
Below you can see the two process IDs, one for each worker process.
```
17:28:15: Init parallel program
17:28:16: -------- Process: 11327 --------
17:28:16: time - 1.0103
17:28:16: -------- Process: 11326 --------
17:28:16: time - 1.0427
17:28:17: -------- Process: 11327 --------
17:28:17: time - 1.0171
17:28:17: -------- Process: 11326 --------
17:28:17: time - 1.0471
17:28:18: -------- Process: 11327 --------
17:28:18: time - 1.0201
17:28:18: -------- Process: 11326 --------
17:28:18: time - 1.0459
17:28:19: -------- Process: 11327 --------
17:28:19: time - 1.0306
17:28:20: -------- Process: 11326 --------
17:28:20: time - 1.0541
17:28:20: Total time: 4.660969018936157
17:28:20: Finish parallel program
```
Each task takes about 1 second, so this program would have taken more than 8 seconds to complete if it had been executed sequentially. Because we ran it on two worker processes in parallel, the total time was roughly cut in half (about 4.7 seconds).
as_completed()
The as_completed() module function can be used to retrieve the results as they become available, so they will not necessarily be returned in the same order in which they were submitted.
```python
import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time


def main():
    n_tasks = 8
    args = [50000000] * n_tasks

    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        futures = [
            executor.submit(cpu_bound_task, count) for count in args
        ]  # does not block
        # Process the results as they complete
        times = [future.result() for future in as_completed(futures)]  # blocks
    end = time()
    total_time = end - start
    logging.info(f"Total time: {total_time}")
```
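Since all the tasks above take roughly the same time, that example does not show much difference between the two orders. The sketch below (nap and compare_orders are hypothetical helpers) uses tasks of different durations: iterating over the futures list returns the results in submission order, while as_completed() yields them as they finish, so with one worker per task the shortest nap should come first.

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def nap(seconds: float) -> float:
    """Sleep for the given time and return it, so we can see which task ended."""
    time.sleep(seconds)
    return seconds


def compare_orders(delays: list[float]) -> tuple[list[float], list[float]]:
    with ProcessPoolExecutor(max_workers=len(delays)) as executor:
        futures = [executor.submit(nap, d) for d in delays]
        # as_completed() yields each future as soon as it finishes.
        by_completion = [f.result() for f in as_completed(futures)]
        # Iterating over the list keeps the submission order.
        by_submission = [f.result() for f in futures]
    return by_submission, by_completion


if __name__ == "__main__":
    by_submission, by_completion = compare_orders([1.0, 0.1, 0.5])
    print("submission order:", by_submission)
    print("completion order:", by_completion)
```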
map()
We can use the map() method to submit tasks to the process pool.
It returns an iterator that we can iterate over to get the results as they become available, in the same order as the items of the iterable we provided.
import logging
from concurrent.futures import ProcessPoolExecutor
from time import time


def main():
    n_tasks = 8
    args = [50000000] * n_tasks
    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        results = executor.map(cpu_bound_task, args)  # does not block
        # Retrieve results, in the same order as args
        times = [result for result in results]  # blocks
    end = time()
    total_time = end - start
    logging.info(f"Total time: {total_time}")
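A quick way to see the ordering guarantee is to give the tasks different durations. In this sketch, sleepy_task is a hypothetical stand-in for cpu_bound_task that just sleeps. The helper accepts either executor type, since both share the same API.

```python
import time
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def sleepy_task(delay: float) -> float:
    """Hypothetical stand-in task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


def map_in_input_order(executor: Executor, delays: list[float]) -> list[float]:
    # map() yields results in the order of `delays`, even though the
    # 0.05 s task finishes long before the 0.5 s one
    return list(executor.map(sleepy_task, delays))


if __name__ == "__main__":
    for pool_cls in (ProcessPoolExecutor, ThreadPoolExecutor):
        with pool_cls(max_workers=3) as executor:
            print(pool_cls.__name__, map_in_input_order(executor, [0.5, 0.05, 0.25]))
            # -> [0.5, 0.05, 0.25] for both executors
```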
The main thread of the main process blocks when we retrieve the results.
map() also accepts a timeout parameter. If timeout is not specified or is None, there is no limit to the wait time, so once we start iterating, we won't get the second element of the iterator until the first one is available. If timeout is set to an int or a float, the returned iterator raises a TimeoutError when a result isn't available within timeout seconds of the original call to map().
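The timeout behaviour can be checked with the same kind of hypothetical sleeping task. The sketch below collects results until map() gives up. It imports TimeoutError from concurrent.futures, which is an alias of the built-in TimeoutError since Python 3.11. On older versions it is the exception map() raises.

```python
import time
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError


def sleepy_task(delay: float) -> float:
    """Hypothetical stand-in task: sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay


def map_with_timeout(
    executor: Executor, delays: list[float], timeout: float
) -> tuple[list[float], bool]:
    """Collect map() results until they are all in or the timeout expires."""
    results = []
    try:
        # The timeout is measured from this map() call, not per result
        for value in executor.map(sleepy_task, delays, timeout=timeout):
            results.append(value)
    except FuturesTimeoutError:
        return results, True  # some results did not arrive in time
    return results, False


if __name__ == "__main__":
    for pool_cls in (ProcessPoolExecutor, ThreadPoolExecutor):
        with pool_cls(max_workers=2) as executor:
            # The 1.0 s task cannot finish within 0.3 s of the map() call
            print(pool_cls.__name__, map_with_timeout(executor, [0.05, 1.0], timeout=0.3))
```

Leaving the with block still waits for the 1-second task to finish. A task that is already running cannot be cancelled; map() only cancels the futures that have not started yet.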
map() and the chunksize parameter
The chunksize parameter can be used to optimize the program.
It defaults to 1, which can make a program with a very long iterable slow. The pool submits a separate task for each element of the iterable, so the whole cycle of pickling the arguments, queuing the task, retrieving the result, etc. is repeated for every single element.
Instead, you can reduce the number of tasks sent to the pool by setting the chunksize parameter. The map() method chops the iterable into a number of chunks, which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
Below we have 24 elements in our iterable args and a chunksize of 3. This means the work is sent to the pool as 8 tasks instead of 24, each one carrying 3 elements from the iterable.
import logging
from concurrent.futures import ProcessPoolExecutor
from time import time


def main():
    n_tasks = 24
    args = [50000000] * n_tasks
    max_workers = 4
    chunksize = 3
    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit tasks: 24 arguments grouped into 8 chunks of 3
        results = executor.map(
            cpu_bound_task, args, chunksize=chunksize
        )  # does not block
        # Retrieve results
        times = [result for result in results]  # blocks
    end = time()
    total_time = end - start
    logging.info(f"Total time: {total_time}")
The image below shows that the first four tasks are executed in parallel and each one processes three elements from the iterable. When they complete, the other four start.
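To make the chunking idea more concrete, here is a simplified, sequential sketch of what map() does with chunksize. It is an illustration under assumptions, not the library's code: ProcessPoolExecutor uses its own private helpers, and make_chunks, run_chunk and chunked_map are hypothetical names. The point is that each chunk becomes one task, and flattening the per-chunk results restores the original order.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def make_chunks(iterable: Iterable[T], chunksize: int) -> Iterator[tuple[T, ...]]:
    """Split `iterable` into tuples of at most `chunksize` items."""
    it = iter(iterable)
    # islice() takes the next `chunksize` items; an empty tuple means we are done
    while chunk := tuple(islice(it, chunksize)):
        yield chunk


def run_chunk(fn: Callable[[T], R], chunk: tuple[T, ...]) -> list[R]:
    """The work one pool task would do: call `fn` on every item of its chunk."""
    return [fn(item) for item in chunk]


def chunked_map(fn: Callable[[T], R], iterable: Iterable[T], chunksize: int) -> list[R]:
    """Sequential stand-in for the pool: one "task" per chunk, results flattened in order."""
    results: list[R] = []
    for chunk in make_chunks(iterable, chunksize):
        results.extend(run_chunk(fn, chunk))
    return results


args = [50000000] * 24
chunks = list(make_chunks(args, 3))
print(len(chunks), [len(chunk) for chunk in chunks[:3]])  # 8 [3, 3, 3]
print(chunked_map(abs, [-1, 2, -3, 4, -5], chunksize=2))   # [1, 2, 3, 4, 5]
```

With 24 arguments and a chunksize of 3, the pool only has to pickle, queue and collect 8 work items instead of 24. That is where the savings come from when each individual call is cheap.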
The ProcessPoolExecutor in Python: The Complete Guide from SuperFastPython is an extensive post on how to work with the ProcessPoolExecutor class. There you can find an example of how to tune the chunksize parameter when working with distributed systems.
Multiprocessing and multithreading
Multiprocessing is a powerful asset for every programmer, but you can take it to the next level by combining it with multithreading.
For this example we will reuse our list of exotic Australian animals from the post Threading in Python. This time, however, URLS is a list of lists, and each inner list has five URLs. Each URL gives you useful info about an Australian animal 🦘.
URLS = [
    [
        "https://en.wikipedia.org/wiki/Emu",
        "https://en.wikipedia.org/wiki/Wombat",
        "https://en.wikipedia.org/wiki/Kangaroo",
        "https://en.wikipedia.org/wiki/Platypus",
        "https://en.wikipedia.org/wiki/Koala",
    ],
    [
        "https://en.wikipedia.org/wiki/Tasmanian_devil",
        "https://en.wikipedia.org/wiki/Echidna",
        "https://en.wikipedia.org/wiki/Dingo",
        "https://en.wikipedia.org/wiki/Kookaburra",
        "https://en.wikipedia.org/wiki/Wallaby",
    ],
    [
        "https://en.wikipedia.org/wiki/Macrotis",
        "https://en.wikipedia.org/wiki/Quokka",
        "https://en.wikipedia.org/wiki/Cassowary",
        "https://en.wikipedia.org/wiki/Sugar_glider",
        "https://en.wikipedia.org/wiki/Laughing_kookaburra",
    ],
    [
        "https://en.wikipedia.org/wiki/Rainbow_lorikeet",
        "https://en.wikipedia.org/wiki/Coastal_taipan",
        "https://en.wikipedia.org/wiki/Mistletoebird",
        "https://en.wikipedia.org/wiki/Thylacine",
        "https://en.wikipedia.org/wiki/Quoll",
    ],
]
Our program will execute I/O-bound operations as well as CPU-bound ones.
The I/O-bound operations open and read URLs containing information about exotic Australian animals.
The CPU-bound operation iterates over a range of numbers and adds them into a variable. The length of the range is the total length of the downloaded content multiplied by 4 (only to further extend its execution).
import logging
import os
import urllib.request
from time import perf_counter

# Module-level state: every worker process gets its own copy of these globals
animals = {}
counts = 0


# I/O-bound operation
def load_url(url: str) -> tuple[float, float]:
    """Retrieve a single page and return start and finish times."""
    global counts
    logging.info(f"PID: {os.getpid()} ; url: {url}")
    start = perf_counter()
    try:
        with urllib.request.urlopen(url) as conn:
            animals[url] = conn.read()
    except Exception:
        logging.error(f"PID: {os.getpid()} ; url: {url}")
        return start, start
    finish = perf_counter()
    counts += len(animals[url]) * 4
    return start, finish


# CPU-bound operation
def cpu_bound_task(counts: int) -> tuple[float, float]:
    """Runs a CPU-bound task."""
    logging.info(f"PID: {os.getpid()} ; counts: {counts}")
    start = perf_counter()
    count = 0
    for i in range(counts):
        count += i
    finish = perf_counter()
    return start, finish
Both functions are called within task(). This is the function passed to ProcessPoolExecutor, along with the URLS list.
The task() function receives a list of URLs, creates a ThreadPoolExecutor, and passes the load_url function to its map() method, along with the URLs.
Every list contains five URLs, so five threads are created in each process (max_workers=len(urls)), and each thread processes one URL.
Finally, cpu_bound_task is executed.
So each worker process starts five threads to open the URLs, and when all of them have completed, it runs a CPU-bound operation.
from concurrent.futures import ThreadPoolExecutor


def task(urls: list[str]) -> list[tuple[float, float]]:
    logging.info(f"Loading australian animals... PID: {os.getpid()}")
    start = perf_counter()
    # One thread per URL, so the downloads overlap while each thread waits
    with ThreadPoolExecutor(max_workers=len(urls)) as executor:
        # Use the map method to apply load_url to each URL
        results = executor.map(load_url, urls)  # does not block
        # Process the results and times
        times = [result for result in results]  # blocks
    logging.info(f"Australian animals loaded! PID: {os.getpid()}")
    # CPU-bound step sized by the content downloaded in this process
    times.append(cpu_bound_task(counts))
    finish = perf_counter()
    logging.info(f"PID: {os.getpid()} time - {round(finish - start, 4)}")
    return times
From the main thread we use the ProcessPoolExecutor to submit the tasks to be executed by the separate processes.
from concurrent.futures import ProcessPoolExecutor
from time import time


def main():
    max_workers = 4
    start = time()
    # Use ProcessPoolExecutor to manage concurrency: each URL list is one task
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit tasks
        results = executor.map(task, URLS)  # does not block
        # Retrieve results
        times = [result for result in results]  # blocks
    end = time()
    total_time = end - start
    logging.info(f"Total time: {total_time}")
If we had executed all these I/O-bound and CPU-bound operations sequentially, it would have taken almost 10 seconds. Thanks to multiprocessing and multithreading, the whole program finished in about 1.1 seconds, and each worker process completed its batch in roughly half a second!
20:20:20: Init parallel program
20:20:21: Loading australian animals... PID: 19807
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Emu
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Wombat
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Kangaroo
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Platypus
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Koala
20:20:21: Loading australian animals... PID: 19806
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Tasmanian_devil
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Echidna
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Dingo
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Kookaburra
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Wallaby
20:20:21: Loading australian animals... PID: 19808
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Macrotis
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Quokka
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Cassowary
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Sugar_glider
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Laughing_kookaburra
20:20:21: Loading australian animals... PID: 19805
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Rainbow_lorikeet
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Coastal_taipan
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Mistletoebird
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Thylacine
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Quoll
20:20:21: Australian animals loaded! PID: 19807
20:20:21: PID: 19807 ; counts: 7658572
20:20:21: Australian animals loaded! PID: 19808
20:20:21: PID: 19808 ; counts: 4782624
20:20:21: Australian animals loaded! PID: 19806
20:20:21: PID: 19806 ; counts: 6722396
20:20:21: Australian animals loaded! PID: 19805
20:20:21: PID: 19805 ; counts: 4774528
20:20:21: PID: 19808 time - 0.402
20:20:21: PID: 19805 time - 0.4078
20:20:21: PID: 19806 time - 0.4613
20:20:21: PID: 19807 time - 0.5056
20:20:21: Total time: 1.1066241264343262
20:20:21: Finish parallel program
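The same pattern can be tried without touching the network. This sketch is not the post's code. fake_io is a hypothetical stand-in for load_url that sleeps for 0.1 s instead of downloading a page, and the batches are made-up strings. One design choice differs from the example above: each thread returns its size and process_batch adds them up, so no thread modifies a shared global such as counts.

```python
import os
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def fake_io(item: str) -> int:
    """Hypothetical stand-in for load_url: sleep instead of downloading a page."""
    time.sleep(0.1)  # simulated network wait; sleeping threads release the GIL
    return len(item)


def cpu_work(n: int) -> int:
    """CPU-bound step: add up the integers in range(n), like cpu_bound_task."""
    total = 0
    for i in range(n):
        total += i
    return total


def process_batch(items: list[str]) -> tuple[int, int]:
    """Runs inside one worker process: threads for the I/O, then the CPU work."""
    # One thread per item, so the simulated waits overlap
    with ThreadPoolExecutor(max_workers=len(items)) as threads:
        sizes = list(threads.map(fake_io, items))
    # Size the CPU work from the results (times 4, as in the post)
    return os.getpid(), cpu_work(sum(sizes) * 4)


# Hypothetical batches standing in for the URLS list of lists
BATCHES = [
    ["emu", "wombat", "kangaroo"],
    ["koala", "quokka", "dingo"],
    ["quoll", "echidna", "wallaby"],
]

if __name__ == "__main__":
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=len(BATCHES)) as processes:
        # map() hands each batch to a worker process and keeps the input order
        for pid, checksum in processes.map(process_batch, BATCHES):
            print(f"PID {pid}: checksum {checksum}")
    print(f"Total time: {time.perf_counter() - start:.2f} s")
```

Within each process the three simulated waits overlap. A batch should therefore spend roughly 0.1 s on I/O rather than 0.3 s, plus process start-up. Exact timings will vary by machine.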
Thread safety with Locks
There is one detail to take care of. Modifying shared state (like counts) from multiple threads can lead to race conditions and inconsistent results.
The counts variable is incremented based on the length of the data read from each URL. If multiple threads modify counts simultaneously, their read-modify-write operations can interleave in ways that lead to incorrect final values.
To safely modify counts from multiple threads, we can use threading.Lock to ensure that only one thread can modify it at a time.
A lock is a synchronization primitive that provides exclusive access to a shared resource.
When a thread acquires a lock (acquire() method), the lock enters the locked state and only that thread can access and modify the resource. When the lock is released (release() method), it returns to the unlocked state and other threads can acquire it.
I am not going to go deeper into this topic, as I would like to study it further and write about it. The code snippet below uses the lock as a context manager in a with statement. Now our program is thread safe! ✨
import threading

counts = 0
counts_lock = threading.Lock()


# I/O-bound operation
def load_url(url: str) -> tuple[float, float]:
    """Retrieve a single page and return start and finish times."""
    global counts
    logging.info(f"PID: {os.getpid()} ; url: {url}")
    start = perf_counter()
    try:
        with urllib.request.urlopen(url) as conn:
            animals[url] = conn.read()
    except Exception:
        logging.error(f"PID: {os.getpid()} ; url: {url}")
        return start, start
    finish = perf_counter()
    # Only one thread at a time can run this read-modify-write on counts
    with counts_lock:
        counts += len(animals[url]) * 4
    return start, finish
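Here is a small self-contained sketch of the same idea that runs without the network. A pool of eight threads performs 10,000 additions of 4 to a shared counter. It does this once through the with statement and once through explicit acquire() and release() calls in a try/finally block, which is what the with statement does for us. The helper names are hypothetical. Keep in mind that threading.Lock only coordinates threads inside one process: in the example above, each worker process has its own counts and its own lock.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

counts = 0
counts_lock = threading.Lock()


def add_with_context_manager(amount: int) -> None:
    global counts
    # acquire() on entry, release() on exit, even if an exception is raised
    with counts_lock:
        counts += amount


def add_with_explicit_calls(amount: int) -> None:
    global counts
    counts_lock.acquire()
    try:
        counts += amount
    finally:
        counts_lock.release()  # never skip this, or every other thread blocks forever


def run(adder: Callable[[int], None], n_calls: int, amount: int) -> int:
    """Reset the counter, call `adder` n_calls times from 8 threads, return the total."""
    global counts
    counts = 0
    with ThreadPoolExecutor(max_workers=8) as executor:
        # Consuming the iterator re-raises any exception from a worker thread
        list(executor.map(adder, [amount] * n_calls))
    return counts


print(run(add_with_context_manager, 10_000, 4))  # 40000
print(run(add_with_explicit_calls, 10_000, 4))   # 40000
```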
Below I leave some resources that helped me to understand this topic better.
Feel free to contact me by any social network. Any feedback is appreciated!
Thanks for reading 💜
Other Resources
- concurrent.futures module docs
- multiprocessing module docs
- concurrent.futures module. Python 3.13
- concurrent.futures.ProcessPoolExecutor source code. Python 3.13
- Executor source code. Python 3.13
- Synchronizing Threads in Python With Locks
- Thread safety | Wikipedia
- ProcessPoolExecutor in Python: The Complete Guide
- Introduction to Concurrency and Parallelism
- Threading in Python
- Multiprocessing in Python