ProcessPoolExecutor in Python

Working with pools of processes

Time spent to execute 20 I/O-bound and 4 CPU-bound tasks in 4 worker processes with 5 threads each

Intro

In this post we introduce the ProcessPoolExecutor class provided by the concurrent.futures module, implementing some examples and generating some visualisations to better understand the content.

The resources section at the end has some links to wonderful material that you can use to go deeper into the topic 🤓

The code from this post can be found in this repository.

ProcessPoolExecutor

The ProcessPoolExecutor class is an Executor subclass that uses a pool of separate processes to execute callables asynchronously.

A process pool is a design pattern to optimize the execution of tasks by reusing a fixed number of processes, rather than creating and destroying them on demand. It is used to manage and control a collection of processes that can be used to perform concurrent tasks.

Using the ProcessPoolExecutor object we can submit tasks to the process pool to be performed concurrently and in parallel.
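
To get a first feel for it, here is a minimal sketch (the function and values are placeholders of my own, not from the examples below):

from concurrent.futures import ProcessPoolExecutor

def square(x: int) -> int:
    return x * x

if __name__ == "__main__":  # guard required when new processes are spawned
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(square, 7)  # runs in a worker process
        print(future.result())  # blocks until the result is ready -> 49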

Why we need it

The multiprocessing module in Python already provides the Pool class to initialize and manage several worker processes, so why might one choose ProcessPoolExecutor over Pool?

Find an implementation example of the Pool object in the Multiprocessing in Python post.

The ProcessPoolExecutor class comes from the same module as ThreadPoolExecutor, the concurrent.futures module, and both have the same API. This consistency makes it easy to switch between multithreading and multiprocessing, depending on the nature of the tasks.

The ProcessPoolExecutor class is easier to use because it has a simpler API and facilitates error handling by wrapping exceptions in Future objects.

It offers a more convenient method for managing and working with pools of processes.
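
For example, because both classes share the Executor API, switching between threads and processes can be as simple as swapping the class. This is a hypothetical sketch, with work standing in for any picklable function:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def work(x: int) -> int:
    return x * x

def run(executor_cls):
    # The same code works with either executor thanks to the shared API.
    with executor_cls(max_workers=4) as executor:
        return list(executor.map(work, range(8)))

if __name__ == "__main__":
    print(run(ThreadPoolExecutor))   # better suited to I/O-bound tasks
    print(run(ProcessPoolExecutor))  # better suited to CPU-bound tasks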

How it works

The ProcessPoolExecutor object internally creates the pool of worker processes by instantiating the multiprocessing.Process class. Each worker process waits for tasks, executes them, and returns the results to the main process.

In this section, we will explore the ProcessPoolExecutor source code to understand its organization. While a deep understanding is not required to leverage the potential of multiprocessing, examining it a little bit can provide valuable insights. This is a good entry point if you want to go deeper into the inner workings of this module. However, if you prefer, you can go directly to the short summary and then move on to the next section to see some implementation examples.

The number of worker processes can be specified with the max_workers parameter, or it defaults to the number of logical CPU cores returned by os.cpu_count() (or by os.process_cpu_count() since Python 3.13).

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.
        ...
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.process_cpu_count() or 1
        ...

The Executor class

Like the ThreadPoolExecutor class, the ProcessPoolExecutor class extends the Executor class, an abstract base class that defines only five methods:

  • submit()
  • map()
  • shutdown()

Executor source code (Python 3.13)

The other two methods are actually the __enter__() and __exit__() Python’s magic methods to implement the context management protocol. Thanks to them we can use the ProcessPoolExecutor in a with statement (recommended). The with statement will call the __enter__() method, and __exit__() will be called when the execution leaves the with code block.

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False

Executor is only an abstract class and most of the logic is implemented in the ProcessPoolExecutor methods. The submit() and shutdown() methods are implemented in the ProcessPoolExecutor class while most of the logic for the map() method is implemented in the Executor class, since it uses submit() internally.

But let’s focus first on the submit() method.

submit() method

The submit() method allows you to schedule a callable to be executed asynchronously in a separate process. It returns a Future instance representing the execution of the callable.

submit() accepts a callable followed by its positional and keyword arguments.
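
For instance, a small sketch of how submit() and its Future are used from the caller's side (slow_add and the timings are illustrative):

from concurrent.futures import ProcessPoolExecutor
import time

def slow_add(a: int, b: int) -> int:
    time.sleep(1)
    return a + b

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        f = executor.submit(slow_add, 2, 3)
        print(f.done())             # False: the task is still pending or running
        print(f.result(timeout=5))  # blocks until the result is ready -> 5
        print(f.done())             # True: the future has completed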

Let’s take a brief look at the internal workflow.

First it acquires a lock to ensure thread safety and checks the Executor’s state.

Then it creates a Future and a _WorkItem object. The Future object represents a placeholder for the result of the callable and provides methods to check if the computation is complete, wait for its completion, and retrieve the result once available. It transitions from pending to completed state when the task is completed and returns the result.

class ProcessPoolExecutor(_base.Executor):
    ...

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            ...
            return f
    ...

More details of _WorkItems in The Work IDs Queue section.

After that, the _WorkItem is added to the _pending_work_items dictionary and its identifier is queued in the _work_ids queue, ensuring it will be processed in the correct order later by the Executor Manager Thread (also called the queue management thread), which is in charge of putting the items in the call queue (see The queues section).

class ProcessPoolExecutor(_base.Executor):
    ...

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1

            ...
            return f
    ...

The line of code below is responsible for waking up the thread that manages the execution queue and worker processes, the queue management thread. This thread retrieves the work ids from the work_ids_queue, gets the work item from the pending_work_items dictionary, creates a _CallItem object with the callable and its arguments, and puts it into the call_queue, among other tasks.

class ProcessPoolExecutor(_base.Executor):
    ...

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...

            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            ...
            return f
    ...

Finally, the two lines before the return below make the executor adjust the number of worker processes to match the workload when the process start method is different from fork (see Child and Parent processes).

The _start_executor_manager_thread() method checks if the queue management thread is running; if it isn't, it checks whether the process start method is fork, in which case it creates and starts all the processes by calling the _launch_processes() method. Then it creates and starts the queue management thread. By doing this the executor ensures the manager thread is always running to coordinate the execution of tasks and manage the worker processes.

class ProcessPoolExecutor(_base.Executor):
    ...

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            ...

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    ...

The _start_executor_manager_thread() method:

class ProcessPoolExecutor(_base.Executor):
    ...

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup
    ...

The _ExecutorManagerThread is a class defined in the same module and extends the threading.Thread class (see The Executor Manager Thread section).

I leave the full code of the function below.

class ProcessPoolExecutor(_base.Executor):
    ...

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f

map() method

The map() method is the other way we have to submit tasks to the process pool. It is similar to the built-in map(fn, *iterables) function, but the function we pass to it through the fn parameter is executed asynchronously.

The built-in map() function provides lazy evaluation, meaning that values from the iterable returned by it are computed and returned only when requested.

However, when Executor.map(fn, *iterables) is called, the function gathers all the items from the provided iterables upfront. This is in contrast to a lazy evaluation approach where items are processed as they are needed (i.e., on-demand).
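
A small sketch to illustrate the difference, assuming a trivial work function:

from concurrent.futures import ProcessPoolExecutor

def work(x: int) -> int:
    return x * x

if __name__ == "__main__":
    # Built-in map is lazy: no call to work() happens on this line.
    lazy = map(work, range(1000))

    with ProcessPoolExecutor(max_workers=2) as executor:
        # Executor.map gathers all 1000 items and schedules them upfront,
        # even before we start iterating over the returned iterator.
        eager = executor.map(work, range(1000))
        print(next(eager))  # results still arrive in input order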

Executor.map() returns an iterator that we can iterate over to get the values from the tasks as they become available, in the same order as the iterables we provided.

If timeout is not specified or None, there is no limit to the wait time. So when we start iterating, we won’t access the second element of the iterator until the first one is available. The returned iterator raises a TimeoutError if the result isn’t available after timeout seconds when it is set to a specific int or float.
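
A sketch of the timeout behaviour (the durations are arbitrary; before Python 3.11 the exception is concurrent.futures.TimeoutError, now an alias of the builtin TimeoutError):

from concurrent.futures import ProcessPoolExecutor
import time

def slow(x: int) -> int:
    time.sleep(5)
    return x

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        try:
            for result in executor.map(slow, range(4), timeout=1):
                print(result)
        except TimeoutError:
            print("a result was not available within 1 second")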

The map() method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

If a fn call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
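
In other words, the worker's exception is deferred until we consume the corresponding value. A minimal sketch:

from concurrent.futures import ProcessPoolExecutor

def might_fail(x: int) -> int:
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * x

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = executor.map(might_fail, range(4))  # nothing is raised here
        try:
            for r in results:
                print(r)  # prints 0 and 1, then the exception propagates
        except ValueError as e:
            print(f"raised while iterating: {e}")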

Most of the logic for the map() method is implemented in the Executor class, since it uses submit() internally.

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).
        ...
        """
        ...

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        ...
        return result_iterator()

    ...

This method is, however, overridden in the ProcessPoolExecutor class, which chops the iterables into chunks and passes them to the super().map() method, applying the partial(_process_chunk, fn) function to each chunk generated by itertools.batched(zip(*iterables), chunksize).

class ProcessPoolExecutor(_base.Executor):
    ...

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).
        ...
        """
        ...

        results = super().map(partial(_process_chunk, fn),
                              itertools.batched(zip(*iterables), chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)
      
    ...

shutdown() method

The shutdown() method is responsible for safely shutting down the process pool and cleaning up resources. It signals the executor that it should free any resources that it is using when the currently pending futures are done executing.

As we saw above, you can avoid having to call this method explicitly if you use the with statement thanks to the __exit__() magic method:

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False

The shutdown() method signature has two parameters:

  • wait defaults to True
    • wait=True - It will not return until all the pending futures are done executing and the resources associated with the executor have been freed.
    • wait=False - It will return immediately and the resources associated with the executor will be freed when all pending futures are done executing.
  • cancel_futures defaults to False
    • cancel_futures=True - It will cancel all pending futures that the executor has not started running.
    • cancel_futures=False - It will not cancel any future.

If both cancel_futures and wait are True, all futures that the executor has started running will be completed prior to this method returning. The remaining futures are cancelled.

Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.
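
A sketch of how these parameters play together (task counts and durations are illustrative):

from concurrent.futures import ProcessPoolExecutor
import time

def work(x: int) -> int:
    time.sleep(1)
    return x

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=2)
    futures = [executor.submit(work, i) for i in range(10)]

    # Wait for the running tasks, cancel those that have not started yet.
    executor.shutdown(wait=True, cancel_futures=True)
    print(sum(f.cancelled() for f in futures))  # the not-yet-started futures

    try:
        executor.submit(work, 99)
    except RuntimeError as e:
        print(e)  # cannot schedule new futures after shutdown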

The shutdown() method starts by acquiring a lock to ensure thread safety and setting the shutdown flags. Then, if the _executor_manager_thread_wakeup object exists, it wakes up the thread that manages the execution queue and worker processes, the queue management thread.

class ProcessPoolExecutor(_base.Executor):
    ...

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        ...

Then it checks that the queue management thread (_executor_manager_thread) exists and, if wait is True, waits for it to complete by joining it, ensuring all tasks are finished. This blocks the calling thread, typically the main thread of the main process.

class ProcessPoolExecutor(_base.Executor):
    ...

    def shutdown(self, wait=True, *, cancel_futures=False):
        ...

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()

        ...

Finally, it cleans up resources by removing references to threads, queues, and processes.

class ProcessPoolExecutor(_base.Executor):
    ...

    def shutdown(self, wait=True, *, cancel_futures=False):
        ...

        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

Any call to Executor.submit() and Executor.map() made after shutdown will raise RuntimeError.

Data-flow through the system

You can find the following diagram in the concurrent.futures.process module where the data-flow is explained.

  1. When Executor.submit() is called (by calling ProcessPoolExecutor.map() or ProcessPoolExecutor.submit()) a _WorkItem object is created and added to the dictionary _pending_work_items. Then the ID of the _WorkItem is added to the _work_ids queue.
    • A _WorkItem is just an object used to wrap the task (fn), its arguments (args and kwargs) and the future object (_base.Future()) together (see The Work IDs Queue section for more details).
  2. The Local worker thread or Executor Manager Thread reads work item IDs from the Work IDs queue and looks up the corresponding _WorkItem from the Work Items dict.
  3. If the work item has been cancelled, it is removed from the dictionary.
  4. If it is not cancelled, it is repackaged as a _CallItem and placed in the Call Q. The thread continues to put _CallItems into the Call Q until it is full.
    • A _CallItem is just an object used to wrap the task (work_item.fn), its arguments (work_item.args and work_item.kwargs) and the work item ID (work_id) together (see The Call Queue section for more details).
  5. Separate processes (Process #1..n) read _CallItems from the Call Q, execute the calls, and place the resulting _ResultItems into the Result Q.
    • A _ResultItem is just an object used to wrap the work item ID (work_id), the result (r), the process ID (os.getpid()) and a possible exception together (see The Result Queue section for more details).
  6. The Local Worker Thread also reads _ResultItems from the Result Q. It updates the future stored in the Work Items dictionary with the result and then deletes the dictionary entry for the completed work item.
"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

The Executor Manager Thread

The Executor Manager Thread is a dedicated thread responsible for overseeing the state and behavior of the worker processes and for managing the communication between the main process and the worker processes. It is started once by the _start_executor_manager_thread() method.

class ProcessPoolExecutor(_base.Executor):
    ...

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup
    ...

It extends the threading.Thread class and overrides the run() method.

class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be own by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """
    def __init__(self, executor):
        ...
        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            # gh-109047: During Python finalization, self.call_queue.put()
            # creation of a thread can fail with RuntimeError.
            try:
                self.add_call_item_to_queue()
            except BaseException as exc:
                cause = format_exception(exc)
                self.terminate_broken(cause)
                return

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor
    
    ...

In short, the Executor Manager Thread has four main responsibilities:

  • It monitors the worker processes and spawns new ones when required.
  • It ensures tasks in the call queue are picked up by the worker processes.
  • It retrieves completed task results from the result queue and sets them in the Future object.
  • It ensures that all worker processes are terminated properly during shutdown.

The queues

The ProcessPoolExecutor class utilizes three main queues, each serving distinct purposes in managing the execution of tasks across multiple processes.

The Work IDs Queue

This queue holds the identifiers of the tasks (the work items' IDs) that have been submitted to the executor, keeping track of which tasks are pending and ensuring each task has a unique identifier.

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        ...
        self._work_ids = queue.Queue()

A _WorkItem is just an object used to wrap the task (fn), its arguments (args and kwargs) and the future object (_base.Future()) together.

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

The future will contain the result once it is available.

The Call Queue

This queue is responsible for holding the tasks that need to be executed by the worker processes. When a task is submitted to the ProcessPoolExecutor, it is placed into this call queue. The worker processes then retrieve tasks from this queue, execute them, and put the results into the result queue.

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        ...
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        ...

A _CallItem is just an object used to wrap the task (work_item.fn), its arguments (work_item.args and work_item.kwargs) and the work item ID (work_id) together.

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

The _CallItem is created by the Executor Manager Thread and added to the call_queue, which comes directly from the ProcessPoolExecutor object.

class _ExecutorManagerThread(threading.Thread):
    ...
    def __init__(self, executor):
        ...
        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue
        ...
    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        ...
                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                ...

The Result Queue

This queue is used for communication from the worker processes back to the main process. It is created by instantiating the ProcessPoolExecutor class.

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        ...
        self._result_queue = mp_context.SimpleQueue()
        ...

After a worker process completes a task, it places the result of the task into this queue.

# Module function passed as target to the Process
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    while True:
        ...

        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r
        
        ...

def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))

Then the Executor Manager Thread retrieves the results from this queue and sets them on the appropriate Future objects.

class _ExecutorManagerThread(threading.Thread):
    ...
    def process_result_item(self, result_item):
        # Process the received a result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        # Received a _ResultItem so mark the future as completed.
        work_item = self.pending_work_items.pop(result_item.work_id, None)
        # work_item can be None if another process terminated (see above)
        if work_item is not None:
            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
    ...

A _ResultItem is just an object used to wrap the work item ID (work_id), the result (r), the process ID (os.getpid()) and a possible exception together.

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid

The Wake Up mechanism

When the Executor Manager Thread is not actively performing any management operation, it is in a waiting state. During those periods it waits for something to change, such as new tasks being submitted, worker processes completing their tasks, or shutdown() being called.

The wake up mechanism is a utility designed to assist with waking up threads that are waiting for those events. This is accomplished using the _ThreadWakeup() object, which internally uses a Pipe() object to send a signal (an empty byte string) through the writer end of the pipe to the reader end.

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.
        ...
        """
        ...

        self._executor_manager_thread_wakeup = _ThreadWakeup()
        ...
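
Below is a minimal sketch of the idea, using a one-way pipe the same way. The class is a simplified illustration of mine, not the actual _ThreadWakeup implementation:

import multiprocessing as mp

class ThreadWakeupSketch:
    """Simplified illustration of the pipe-based wake up signal."""

    def __init__(self):
        # duplex=False creates a one-way pipe: reader <- writer
        self._reader, self._writer = mp.Pipe(duplex=False)

    def wakeup(self):
        # The payload does not matter; what matters is that the
        # reader end becomes readable, waking up whoever waits on it.
        self._writer.send_bytes(b"")

    def clear(self):
        # Drain any pending signals after waking up.
        while self._reader.poll():
            self._reader.recv_bytes()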

Short summary

There is a dedicated thread called the Executor Manager Thread which is responsible for monitoring the worker processes, tasks, and results. When a new task is submitted, a result is ready, or new worker processes need to be spawned, the wake up mechanism sends it a signal so it can do its job.

The ProcessPoolExecutor internally uses three main queues:

  • The call queue, which holds each task until a worker process retrieves it.
  • The result queue, which holds the results from the worker processes until the main process retrieves them.
  • The work IDs queue, which holds the task IDs to keep track of which tasks are pending.

The ProcessPoolExecutor class extends the abstract Executor class, which defines the submit() and shutdown() methods, and mostly implements the map() one. The Executor class implements the context management protocol, which means the ProcessPoolExecutor object can be used in a with statement.

The submit() method allows you to schedule a callable to be executed asynchronously in a separate process. It returns a Future instance representing the execution of the callable.

The map() method is the other way we have to submit tasks to the process pool. It is similar to the built-in map(fn, *iterables) function, but the function we pass to it as fn is executed asynchronously and several calls to fn may be made concurrently. It returns an iterator that we can iterate over to get the values from the tasks as they become available, in the same order as the iterables we provided. You can use the timeout and chunksize parameters to modify its behavior.

The shutdown() method is responsible for safely shutting down the process pool and cleaning up resources. It signals the executor that it should free any resources that it is using when the currently pending futures are done executing. You can avoid having to call this method explicitly if you use the with statement.

Examples

Let’s start by defining our CPU intensive task. As in the previous posts, it iterates over a range of numbers and adds them into the count variable.

import logging
import os
from time import perf_counter


def cpu_bound_task(counts: int) -> tuple[float, float]:
    """Runs a CPU-bound task."""
    start = perf_counter()
    count = 0
    for i in range(counts):
        count += i
    finish = perf_counter()

    logging.info(f"-------- Process: {os.getpid()} --------")
    logging.info(f"time - {round(finish - start, 4)}\n")
    return start, finish

submit()

We can use the submit() method to schedule the callable to be executed in a separate process, and it returns a Future instance representing its execution.

submit() accepts a callable followed by its positional and keyword arguments.

In this example we are submitting 8 tasks to a process pool of 2 worker processes.

def main():

    n_tasks = 8
    args = [50000000] * n_tasks

    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        futures = [
            executor.submit(cpu_bound_task, count) for count in args
        ]  # does not block

        # Retrieve the results
        times = [future.result() for future in futures]  # blocks
    end = time()

    total_time = end - start
    logging.info(f"Total time: {total_time}")

When retrieving the results, the main thread of the main process blocks. They are retrieved sequentially, in the same order they were submitted.

As there are two available workers, only two tasks can run in parallel. When one of the processes completes a task, it picks up the next one to execute.

Time spent to execute 8 CPU-bound tasks in 2 worker processes in parallel

Below you can see the two process IDs, one for each process.

17:28:15: Init parallel program
17:28:16: -------- Process: 11327 --------
17:28:16: time - 1.0103

17:28:16: -------- Process: 11326 --------
17:28:16: time - 1.0427

17:28:17: -------- Process: 11327 --------
17:28:17: time - 1.0171

17:28:17: -------- Process: 11326 --------
17:28:17: time - 1.0471

17:28:18: -------- Process: 11327 --------
17:28:18: time - 1.0201

17:28:18: -------- Process: 11326 --------
17:28:18: time - 1.0459

17:28:19: -------- Process: 11327 --------
17:28:19: time - 1.0306

17:28:20: -------- Process: 11326 --------
17:28:20: time - 1.0541

17:28:20: Total time: 4.660969018936157
17:28:20: Finish parallel program

This program would have taken more than 8 seconds to complete if it had been executed sequentially. But because we have run it in parallel, the time needed to complete has been roughly cut in half.

as_completed()

The as_completed() module function can be used to retrieve the results as they become available, so they will not necessarily be extracted in the same order in which they were submitted.

def main():

    n_tasks = 8
    args = [50000000] * n_tasks

    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        futures = [
            executor.submit(cpu_bound_task, count) for count in args
        ]  # does not block

        # Process the results
        times = [future.result() for future in as_completed(futures)]  # blocks
    end = time()

    total_time = end - start
    logging.info(f"Total time: {total_time}")
Time spent to execute 8 CPU-bound tasks in 2 worker processes in parallel

map()

We can use the map() method to submit tasks to the process pool.

It returns an iterator that we can iterate over to get the values from the tasks as they become available, in the same order as the iterable we provided.

def main():

    n_tasks = 8
    args = [50000000] * n_tasks

    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        results = executor.map(cpu_bound_task, args)  # does not block

        # Retrieve results
        times = [result for result in results]  # blocks
    end = time()

    total_time = end - start
    logging.info(f"Total time: {total_time}")

The main thread of the main process is blocked when we retrieve the results.

Time spent to execute 8 CPU-bound tasks in 2 worker processes in parallel

If timeout is not specified or None, there is no limit to the wait time. So when we start iterating, we won’t access the second element of the iterator until the first one is available. The returned iterator raises a TimeoutError if the result isn’t available after timeout seconds when it is set to a specific int or float.

map() and chunksize parameter

The chunksize parameter can be used to optimize the program.

It is 1 by default, but with a very long iterable this could lead to a slower program with poor performance, since the program submits a separate task for each element in the iterable. This means that for each element the whole process of pickling, queuing, retrieving, etc. is executed.

Instead, you can reduce the number of tasks sent to be processed by setting the chunksize parameter. The map() method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

Below we have 24 elements in our iterable args and a chunksize of 3. This means the 24 calls will be submitted as 8 tasks instead of 24, and each task will handle 3 elements from the iterable.

def main():

    n_tasks = 24
    args = [50000000] * n_tasks
    max_workers = 4
    chunksize = 3

    # Run in worker processes
    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit tasks
        results = executor.map(
            cpu_bound_task, args, chunksize=chunksize
        )  # does not block

        # Retrieve results
        times = [result for result in results]  # blocks
    end = time()

    total_time = end - start
    logging.info(f"Total time: {total_time}")

The image below shows that the first four tasks are executed in parallel and each one processes three elements from the iterable. When they complete, the other four start.

Time spent to execute 24 CPU-bound tasks in 4 worker processes in parallel

The ProcessPoolExecutor in Python: The Complete Guide from SuperFastPython is an extensive post on how to work with the ProcessPoolExecutor class. You can find there an example on how to perform tuning to the chunksize parameter when working with distributed systems.

Multiprocessing and multithreading

Multiprocessing is a powerful asset for every programmer, but you can take it to the next level by leveraging it along with multithreading.

For this example we will rescue our list of exotic Australian animals from the post Threading in Python. But this time we have a list of lists, and each list within the URLS list has five urls. Each url gives you useful info about an Australian animal 🦘.

URLS = [
    [
        "https://en.wikipedia.org/wiki/Emu",
        "https://en.wikipedia.org/wiki/Wombat",
        "https://en.wikipedia.org/wiki/Kangaroo",
        "https://en.wikipedia.org/wiki/Platypus",
        "https://en.wikipedia.org/wiki/Koala",
    ],
    [
        "https://en.wikipedia.org/wiki/Tasmanian_devil",
        "https://en.wikipedia.org/wiki/Echidna",
        "https://en.wikipedia.org/wiki/Dingo",
        "https://en.wikipedia.org/wiki/Kookaburra",
        "https://en.wikipedia.org/wiki/Wallaby",
    ],
    [
        "https://en.wikipedia.org/wiki/Macrotis",
        "https://en.wikipedia.org/wiki/Quokka",
        "https://en.wikipedia.org/wiki/Cassowary",
        "https://en.wikipedia.org/wiki/Sugar_glider",
        "https://en.wikipedia.org/wiki/Laughing_kookaburra",
    ],
    [
        "https://en.wikipedia.org/wiki/Rainbow_lorikeet",
        "https://en.wikipedia.org/wiki/Coastal_taipan",
        "https://en.wikipedia.org/wiki/Mistletoebird",
        "https://en.wikipedia.org/wiki/Thylacine",
        "https://en.wikipedia.org/wiki/Quoll",
    ],
]

Our program will execute I/O-bound operations as well as CPU-bound ones.

I/O operations will consist of opening and reading urls containing exotic Australian animals information.

CPU-bound operations will iterate over a range of numbers and add them to a variable. The length of the range will be the length of the content returned, multiplied by 4 (only to further extend its execution).

animals = {}
counts = 0

# I/O-bound operation
def load_url(url: str) -> tuple[float, float]:
    """Retrieve a single page and return start and finish times."""
    logging.info(f"PID: {os.getpid()} ; url: {url}")

    start = perf_counter()
    try:
        with urllib.request.urlopen(url) as conn:
            animals[url] = conn.read()
    except Exception:
        logging.error(f"PID: {os.getpid()} ; url: {url}")
        return start, start
    finish = perf_counter()

    global counts
    counts += len(animals[url]) * 4

    return start, finish

def cpu_bound_task(counts: int) -> tuple[float, float]:
    """Runs a CPU-bound task."""
    logging.info(f"PID: {os.getpid()} ; counts: {counts}")

    start = perf_counter()
    count = 0
    for i in range(counts):
        count += i
    finish = perf_counter()
    return start, finish

Both functions will be called within task(). This is the function that will be passed to the ProcessPoolExecutor, together with the URLS list.

The task() function receives a list of urls, creates several threads, and passes the load_url function to the map() method together with the urls.

Each list has five urls, so five threads will be created in each process (max_workers=len(urls)), each one processing one url.

Finally, the cpu_bound_task will be executed.

So each worker process will start five threads to open the urls, and when all of them have completed, a CPU-bound operation will be executed.

def task(urls: list[str]) -> list:
    logging.info(f"Loading australian animals... PID: {os.getpid()}")

    start = perf_counter()
    with ThreadPoolExecutor(max_workers=len(urls)) as executor:
        # Use the map method to apply load_url to each URL
        results = executor.map(load_url, urls)  # does not block

        # Process the results and times
        times = [result for result in results]  # blocks

    logging.info(f"Australian animals loaded! PID: {os.getpid()}")

    times.append(cpu_bound_task(counts))

    finish = perf_counter()
    logging.info(f"PID: {os.getpid()} time - {round(finish - start, 4)}")

    return times

From the main thread we use the ProcessPoolExecutor to submit the tasks to be executed by the separate processes.

def main():

    n_tasks = 20 + 4
    max_workers = 4

    start = time()
    # Use ProcessPoolExecutor to manage concurrency
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Submit tasks
        results = executor.map(task, URLS)  # does not block

        # Retrieve results
        times = [result for result in results]  # blocks
    end = time()

    total_time = end - start
    logging.info(f"Total time: {total_time}")

If we had executed all these I/O-bound and CPU-bound operations sequentially it would have taken us almost 10 seconds, but thanks to multiprocessing and multithreading we have been able to reduce it to about 1 second!

Time spent to execute 20 I/O-bound and 4 CPU-bound tasks in 4 worker processes with 5 threads each
20:20:20: Init parallel program
20:20:21: Loading australian animals... PID: 19807
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Emu
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Wombat
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Kangaroo
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Platypus
20:20:21: PID: 19807 ; url: https://en.wikipedia.org/wiki/Koala
20:20:21: Loading australian animals... PID: 19806
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Tasmanian_devil
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Echidna
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Dingo
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Kookaburra
20:20:21: PID: 19806 ; url: https://en.wikipedia.org/wiki/Wallaby
20:20:21: Loading australian animals... PID: 19808
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Macrotis
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Quokka
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Cassowary
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Sugar_glider
20:20:21: PID: 19808 ; url: https://en.wikipedia.org/wiki/Laughing_kookaburra
20:20:21: Loading australian animals... PID: 19805
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Rainbow_lorikeet
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Coastal_taipan
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Mistletoebird
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Thylacine
20:20:21: PID: 19805 ; url: https://en.wikipedia.org/wiki/Quoll
20:20:21: Australian animals loaded! PID: 19807
20:20:21: PID: 19807 ; counts: 7658572
20:20:21: Australian animals loaded! PID: 19808
20:20:21: PID: 19808 ; counts: 4782624
20:20:21: Australian animals loaded! PID: 19806
20:20:21: PID: 19806 ; counts: 6722396
20:20:21: Australian animals loaded! PID: 19805
20:20:21: PID: 19805 ; counts: 4774528
20:20:21: PID: 19808 time - 0.402
20:20:21: PID: 19805 time - 0.4078
20:20:21: PID: 19806 time - 0.4613
20:20:21: PID: 19807 time - 0.5056
20:20:21: Total time: 1.1066241264343262
20:20:21: Finish parallel program

Thread safety with Locks

There is one detail to take care of. Modifying shared state (like counts) from multiple threads can lead to race conditions and inconsistent results.

The counts variable is being incremented based on the length of data read from URLs. If multiple threads modify counts simultaneously, their operations can interleave in ways that lead to incorrect final values.

To safely modify counts from multiple threads, we can use threading.Lock to ensure that only one thread can modify it at a time.

A lock is a synchronization primitive that provides exclusive access to a shared resource.

When a thread acquires a lock (acquire() method) the lock gets the locked state and only that thread can access and modify the resource. When the lock is released (release() method) it gets the unlocked state and other threads can acquire the lock.
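
Written out explicitly, that pattern looks like this (equivalent to the with form used in the snippet below):

import threading

counts = 0
counts_lock = threading.Lock()

def add(n: int) -> None:
    global counts
    counts_lock.acquire()  # the lock enters the locked state
    try:
        counts += n  # exclusive access to the shared variable
    finally:
        counts_lock.release()  # unlocked: other threads may acquire it now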

I am not going to go deeper into this topic, as I would like to study it further and write about it separately. The code snippet below shows the implementation using the lock as a context manager in a with statement. Now our program is thread safe! ✨

counts = 0
counts_lock = threading.Lock()


# I/O-bound operation
def load_url(url: str) -> tuple[float, float]:
    """Retrieve a single page and return start and finish times."""
    logging.info(f"PID: {os.getpid()} ; url: {url}")

    start = perf_counter()
    try:
        with urllib.request.urlopen(url) as conn:
            animals[url] = conn.read()
    except Exception:
        logging.error(f"PID: {os.getpid()} ; url: {url}")
        return start, start
    finish = perf_counter()

    global counts
    with counts_lock:
        counts += len(animals[url]) * 4

    return start, finish
Time spent to execute 20 I/O-bound and 4 CPU-bound tasks in 4 worker processes with 5 threads each

Below I leave some resources that helped me to understand this topic better.

Feel free to contact me by any social network. Any feedback is appreciated!

Thanks for reading 💜

Other Resources

By Javier Castaño on Jun 10, 2024