Threading in Python

The threading module and the ThreadPoolExecutor class

Time spent to load twenty exotic Australian animals asynchronously


Intro

This post is an introduction to multithreading in Python with the threading module and the ThreadPoolExecutor class from the concurrent.futures module.

The resources section at the end has some links to wonderful material that you can use to go deeper into the topic 🤓


The code from this post can be found in this repository.

What is a Thread

A thread is the basic unit of execution within a process. It is an independent flow of execution that shares the same address space as the other flows of execution within the same process. A process can have one or more threads; one of them is the main thread, the thread a Python process starts with.

Processes and threads

📘 In Python we can write programs that use multiple threads with the threading module, as well as with the ThreadPoolExecutor class from the concurrent.futures module.

If we write the program so that it uses several threads, it will be able to run concurrently even on a single core. It is also possible to execute a single-threaded program concurrently by using coroutines.

In CPython (the reference implementation, in its default build), the threads of a process cannot execute Python code in parallel, even when multiple cores are available, because of the Global Interpreter Lock (GIL). This is unlike threads in other programming languages such as Java, C/C++, and Go. If you have CPU-bound operations and need parallel execution in Python, you should use the multiprocessing module or the ProcessPoolExecutor class (see Multiprocessing in Python).

Imagine that we write a program that runs as a single process with two threads. As soon as there are two threads, we can start playing with concurrency.

On a single-core CPU the program can still execute concurrently: with one core and two threads, the core can switch back and forth between the two threads. That is called context switching.

During a context switch, one thread is switched out of the CPU so another thread can run. For this purpose, the state of the process or thread is stored, so that it can be restored and resume execution at a later point, and then another previously saved state is restored.

Context switching is usually computationally expensive: switching from one process or thread to another takes time for saving and loading registers and other operations. Switching context between threads is generally faster than between processes.

Threading use cases

Multithreading is best suited for I/O-bound operations. For example, if a thread has to make a request to a database, it would not make sense to block the CPU core while the thread waits for the response. It is a better use of resources to let another thread use the core while the first thread is waiting.

In the figure below, the empty circles represent I/O operations where the thread waits for something to happen. When the first I/O operation begins (empty green circle), the operating system quickly switches from the waiting thread to the red one to make better use of the computational resources. This decision is taken by the OS: the developer cannot decide when to switch between threads.

Concurrent execution of tasks with I/O-bound operations

If the program does not use multiple threads running concurrently and instead runs the tasks sequentially in a single thread, it has to wait for the green task to complete before it can begin the red task, so it takes more time to complete both tasks.

Sequential execution of tasks with I/O-bound operations

So when dealing with I/O operations multithreading is a good choice to achieve a better allocation of resources.
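A minimal sketch of this effect, using time.sleep() as a stand-in for a real I/O wait (an assumption of the sketch; fake_io(), run_sequential() and run_threaded() are hypothetical helpers, not part of the repository). Five waits of 0.2 seconds take about 1 second one after the other, but only about 0.2 seconds when each one runs in its own thread, because the waits overlap.

```python
import threading
import time


def fake_io(secs: float) -> None:
    """Stand-in for an I/O-bound operation: the thread just waits."""
    time.sleep(secs)


def run_sequential(n: int, secs: float) -> float:
    """Run n waits one after the other in the calling thread."""
    start = time.perf_counter()
    for _ in range(n):
        fake_io(secs)
    return time.perf_counter() - start


def run_threaded(n: int, secs: float) -> float:
    """Run n waits, each in its own thread, and wait for all of them."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(secs,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # stop the clock only after every thread has finished
    return time.perf_counter() - start


sequential_time = run_sequential(5, 0.2)  # roughly 5 * 0.2 = 1.0 s
threaded_time = run_threaded(5, 0.2)      # roughly 0.2 s: the waits overlap
print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```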

Now let’s see some implementations of multithreaded programs! 🥷🏽

Python threading first steps

First, let's define the I/O-bound and the CPU-bound tasks. io_bound_operation just sleeps for the specified number of seconds. cpu_bound_operation adds up the numbers in range(n). Both functions append their start and finish times to shared_list. Remember that threads in the same process can share data.

import logging
from threading import Thread
from time import perf_counter, sleep

from concurrency.utils import flaten_list_of_lists, get_saving_path, postprocess_times
from concurrency.visualize import barh


format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")


shared_list = []  # threads from the same process share data


def io_bound_operation(secs: float | int) -> None:
    """Run 1 I/O-bound task of secs seconds and append the results to shared_list."""
    start = perf_counter()
    sleep(secs)
    finish = perf_counter()

    shared_list.append([(start, finish)])


def cpu_bound_operation(n: int) -> None:
    """CPU-bound task."""
    start = perf_counter()
    count = 0
    for i in range(n):
        count += i
    finish = perf_counter()

    shared_list.append([(start, finish)])

Now we are going to create two new threads: t1 and t2. When instantiating each Thread object, we pass the target, which is the task/function we want to run in the thread. Arguments for the target can be passed via the args parameter, as a tuple (or list) of positional arguments.
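A quick aside on args, since it is easy to get wrong: a one-element tuple needs a trailing comma, and keyword arguments go in the separate kwargs parameter. A minimal sketch with a hypothetical greet() function (name is a real, optional Thread parameter used to label the thread):

```python
from threading import Thread

results = {}


def greet(name: str, punctuation: str = "!") -> None:
    results[name] = f"Hello, {name}{punctuation}"


# args must be a tuple (or list): note the trailing comma in a one-element tuple.
# args=("Ada") would be the plain string "Ada": greet would receive three
# separate characters and fail with a TypeError.
t1 = Thread(target=greet, args=("Ada",))
# Keyword arguments go in kwargs; name is an optional label for the thread
t2 = Thread(target=greet, args=("Grace",), kwargs={"punctuation": "?"}, name="greeter-2")

for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()

print(results["Ada"])    # Hello, Ada!
print(results["Grace"])  # Hello, Grace?
print(t2.name)           # greeter-2
```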

For this example, we will make the I/O-bound operation last for 1 second, while my processor takes approximately 3.5 seconds to add those 100,000,000 numbers.

def threading_two_threads():
    # Create two thread objects
    t1 = Thread(target=io_bound_operation, args=(1,))
    t2 = Thread(target=cpu_bound_operation, args=(100000000,))

    # Start activity -> invokes run() method
    t1.start()
    sleep(0.1)
    t2.start()

    # Block the calling (main) thread until both threads have finished
    t1.join()
    t2.join()

    logging.info(f"shared_list {shared_list}")

Then we need to start each thread's activity by calling its start() method, which arranges for the object's run() method to be invoked in a separate thread of control.

There is also a sleep(0.1) call that makes the second thread start a bit later, which makes the chart easier to read.

Finally, we must call the Thread object’s join() method if we want to wait until the thread terminates.

The main thread will not continue past the join() calls until both threads have completed.

By joining a thread, we block the calling thread (here, the main thread) until the thread whose join() method is called terminates, either normally or through an unhandled exception, or until the optional timeout expires.

You can play with this example: if you comment out both join() calls, the program will raise an exception, since shared_list will still be empty when the main thread reaches postprocess_times, which tries to index into it.
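join() also accepts the optional timeout mentioned above. Since join() always returns None, the way to find out whether it returned because the thread finished or because the timeout expired is to call is_alive() afterwards. A minimal sketch with a hypothetical slow_task():

```python
from threading import Thread
from time import sleep

results = []


def slow_task(secs: float) -> None:
    sleep(secs)
    results.append(secs)


t = Thread(target=slow_task, args=(0.5,))
t.start()

t.join(timeout=0.1)         # returns after ~0.1 s, even though slow_task is not done
first_alive = t.is_alive()  # join() returns None, so check is_alive() instead
print(first_alive, results)   # True []

t.join()                    # no timeout: block until the thread terminates
print(t.is_alive(), results)  # False [0.5]
```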

To visualize the time spent by each thread, the final version of threading_two_threads calls a couple of additional functions that postprocess the times and plot them on a horizontal bar chart. Their code can be found in the repo.

def threading_two_threads():
    # Create two thread objects
    t1 = Thread(target=io_bound_operation, args=(1,))
    t2 = Thread(target=cpu_bound_operation, args=(100000000,))

    # Start activity -> invokes run() method
    t1.start()
    sleep(0.1)
    t2.start()

    # Block the calling (main) thread until both threads have finished
    t1.join()
    t2.join()

    logging.info(f"shared_list {shared_list}")

    # Just some processing for chart
    start_points, end_points = postprocess_times(flaten_list_of_lists(shared_list))

    barh(
        # Implicit string concatenation keeps the indentation spaces out of the
        # title (a backslash continuation inside the string would embed them)
        title=(
            "Concurrent execution, 2 threads, 1 I/O-bound task of 1s + "
            "1 CPU-task of 3.5s approx"
        ),
        start_points=start_points,
        end_points=end_points,
        path=get_saving_path("threading/images/first_multithreaded_program.png"),
        n=2,
    )


if __name__ == "__main__":
    logging.info("Init concurrent tasks")
    threading_two_threads()
    logging.info("Finish concurrent tasks")

The image below shows the time each thread takes to complete. The sleep(0.1) call makes the second thread (cpu_bound_operation) start a little later: the first thread (0 in the graph) starts, and the second one starts 0.1 seconds later.

The io_bound_operation function only has to wait for 1 second, and while it waits (the whole second) the CPU-bound task can execute, because time.sleep() releases the GIL. That is why the CPU-bound task (second thread) takes only about 3.5 seconds and is not delayed by the I/O-bound task.

Time spent by multithreaded program. 2 threads, 1 I/O-bound task of 1s + 1 CPU-task of 3.5s approx

Creating Thread objects directly is the most basic way of working with threads, and there are more convenient ways of doing it. However, let's look at some more simple examples before delving a little deeper.

Visualizing multithreading times with the threading module

Example 1 - 2 threads

Thread 1 - 1 I/O-bound operation of 1s and 1 CPU-bound operation of 1s approx.

Thread 2 - 1 CPU-bound task of 3.5 seconds approximately

Now let’s consider that the first thread runs a task consisting of a CPU-bound operation of approximately 1 second and an I/O-bound operation of 1 second instead of having a task that only executes an I/O-bound operation.

So now we have a program that creates two new threads: one runs an I/O-bound operation and a CPU-bound operation, and the other runs the CPU-bound operation of approximately 3.5 seconds.

def cpu_io_bound_operations(secs: float | int, n: int) -> None:
    """Run 1 CPU-bound task (summing range(n)) followed by 1 I/O-bound task of secs seconds.
    Append the (start, finish) times to shared_list."""
    start = perf_counter()
    count = 0
    for i in range(n):  # CPU-bound
        count += i
    sleep(secs)  # I/O-bound
    finish = perf_counter()

    shared_list.append([(start, finish)])

Thread 2 needs approximately 3.5 seconds of processor time, while thread 1 only needs about 1 second.

Thread 1 only needs 1 second of processor time, for its CPU-bound operation, because thread 2 can use the processor while thread 1 is waiting on its I/O-bound operation.

If we add 3.5s (thread 2) + 1s (thread 1), we get 4.5s of CPU work.

The graph below shows just that: both tasks together take about 4.5 seconds. The time required by each CPU-intensive task may vary slightly.

Time spent by multithreaded program. 2 threads, (1 I/O-bound op of 1s and 1 CPU-bound op of 1s) + 1 CPU-task of 3.5s approx

However, thread 1 takes about 3 seconds to finish. Its CPU-bound loop runs first, and during that time it has to take turns with the CPU-bound task of thread 2, so its roughly 1 second of CPU work is most likely stretched over about 2 seconds before the 1-second sleep even begins. Context switches are out of the developer's control: they can happen more often than we would like, and at moments when we would not have switched one thread for the other.
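In CPython, part of this switching also happens at the interpreter level: a thread running Python code is asked to release the GIL after a fixed switch interval, so that other threads waiting for it can run. A small sketch using the real sys.getswitchinterval() and sys.setswitchinterval() functions shows the setting; changing it here is only for experimentation, and which waiting thread runs next is still decided by the OS.

```python
import sys

# CPython asks the thread holding the GIL to drop it after this interval (seconds),
# so that other threads waiting for the GIL get a chance to run
default_interval = sys.getswitchinterval()
print(default_interval)  # 0.005 unless it has been changed

# The interval can be tuned, but which waiting thread runs next is still not up to us
sys.setswitchinterval(0.001)
print(sys.getswitchinterval())
sys.setswitchinterval(default_interval)  # restore the original value
```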

Now let’s take a quick look at some more examples! If you already got it you can skip this part and go directly to the next section: ThreadPoolExecutor 🚀

Example 2 - 1 thread

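10 I/O-bound operations of 1s, sequentially

Here we represent a sequential execution, so we don't need to create more threads: the main thread is enough. Note that this and the following examples use the return values of io_bound_operation and cpu_bound_operation, so the code assumes that in these versions the functions return their (start, finish) tuple instead of appending it to shared_list.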

def sequential(n: int = 10, secs: float | int = 1) -> None:
    """Perform n I/O-bound operations of secs seconds sequentially in one thread
    and plot a horizontal bar chart.
    """
    # Perform n I/O-bound operations, save a tuple for each task
    times = [io_bound_operation(secs) for _ in range(n)]
    start_points, end_points = postprocess_times(times)

    barh(
        title="Sequential execution, 1 thread, 10 I/O-bound tasks of 1s",
        start_points=start_points,
        end_points=end_points,
        path=get_saving_path("threading/images/ex_1_one_thread.png"),
    )

In the figures above a thread was represented as a bar because each thread performed a task (even when we joined an I/O-bound and a CPU-bound operation we considered them as belonging to the same task).

Now we consider 10 different I/O-bound tasks executed in the same thread, and we draw each task as its own bar to visualize it better. So all ten bars belong to the same thread.

Time spent by single-thread program. 1 thread, 10 I/O-bound tasks of 1s

Example 3 - 1 thread

2 CPU-bound tasks

If we execute two CPU-bound tasks of approximately 3.5 seconds each sequentially in the same thread, they take about 7 seconds to complete.

The first task has to complete before the second one begins.

def sequential(counts: int, n: int = 10) -> None:
    # Perform n CPU-bound operations, save a tuple for each task
    times = [cpu_bound_operation(counts) for _ in range(n)]
    start_points, end_points = postprocess_times(times)
Time spent by single-thread program. 1 thread, 2 CPU-bound tasks of 3.5s approx

Example 4 - 2 threads

Thread 1 - 1 CPU-bound task of 3.5 seconds approximately

Thread 2 - 1 CPU-bound task of 3.5 seconds approximately

The same two CPU-bound tasks from above produce a very different chart when we execute them concurrently. Each task seems to take about 7 seconds, but each one only gets about 3.5 seconds of processor time: the threads just switch back and forth until both are finished, so the whole run still takes about 7 seconds.

This is not a proper use of multithreading; it is only for didactic purposes. In CPython, because of the GIL, using multiple threads for CPU-bound tasks alone brings no time improvement.

def thread_cpu_bound_operations(counts: int) -> None:
    """Run a CPU-bound task and append the results to shared_list."""
    shared_list.append([cpu_bound_operation(counts)])


def threading_two_threads() -> None:
    # Create two thread objects, each thread will perform one CPU-bound task
    t1 = Thread(target=thread_cpu_bound_operations, args=(100000000,))
    t2 = Thread(target=thread_cpu_bound_operations, args=(100000000,))

    # Start activity -> invokes run() method
    t1.start()
    t2.start()

    # Block the calling thread -> Avoids continuing to run without threads being finished
    t1.join()
    t2.join()
Time spent by multithreaded program. 2 threads, 1 CPU-bound task of 3.5s approx each
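A minimal sketch to check this yourself, with a hypothetical count_up() function standing in for cpu_bound_operation and a smaller N so it runs quickly. On a standard CPython build (with the GIL), expect the two timings to be roughly the same; the exact numbers depend on the machine, so the sketch prints them instead of asserting anything about them.

```python
import threading
import time


def count_up(n: int, out: list, index: int) -> None:
    """CPU-bound work: sum the numbers in range(n) and store the total."""
    total = 0
    for i in range(n):
        total += i
    out[index] = total


N = 2_000_000

# Two tasks, one after the other, in the main thread
seq_results = [0, 0]
start = time.perf_counter()
count_up(N, seq_results, 0)
count_up(N, seq_results, 1)
sequential_time = time.perf_counter() - start

# The same two tasks, each in its own thread
thr_results = [0, 0]
threads = [threading.Thread(target=count_up, args=(N, thr_results, i)) for i in range(2)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```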

Example 5 - 2 threads

Thread 1 - 5 I/O-bound tasks of 1s each

Thread 2 - 5 I/O-bound tasks of 1s each

Ten I/O-bound tasks of 1 second and two threads. Each thread is in charge of executing five I/O-bound tasks sequentially, and both groups of five tasks are executed concurrently.

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    """Run n I/O-bound tasks of secs seconds and append the results to shared_list."""
    shared_list.append([io_bound_operation(secs) for _ in range(n)])


def threading_two_threads() -> None:
    # Create two thread objects, each thread will perform five I/O-bound tasks
    t1 = Thread(target=thread_io_bound_operations, args=(5, 1))
    t2 = Thread(target=thread_io_bound_operations, args=(5, 1))

    # Start activity -> invokes run() method
    t1.start()
    t2.start()

    # Block the calling thread -> Avoids continuing to run without threads being finished
    t1.join()
    t2.join()
Time spent by multithreaded program. 2 threads, 5 I/O-bound tasks of 1s each

Example 6 - 10 threads

Each Thread - 1 I/O-bound task of 1s

Similar to the last example, but now we have ten threads instead of two, and each one is in charge of executing only one I/O-bound task of 1 second.

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    """Run n I/O-bound tasks of secs seconds and append the results to shared_list."""
    shared_list.append([io_bound_operation(secs) for _ in range(n)])


def threading_two_threads() -> None:
    threads = []
    # Create ten thread objects, each thread will perform one I/O-bound task
    for _ in range(10):
        t = Thread(target=thread_io_bound_operations, args=(1, 1))
        t.start()
        threads.append(t)

    # Block the calling thread until every thread has finished
    for thread in threads:
        thread.join()
Time spent by multithreaded program. 10 threads, 1 I/O-bound task of 1s each

Example 7 - 2 threads

Thread 1 - 1 CPU-bound task of 3.5s approx

Thread 2 - 5 I/O-bound tasks of 1s each

Now we have two threads. Thread 1 executes one CPU-bound operation of approximately 3.5 seconds, and thread 2 executes five I/O-bound tasks of 1 second each.

While the I/O tasks are waiting, the CPU-intensive task is executing: every time an I/O task begins, the OS quickly switches from one thread to the other.

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    """Run n I/O-bound tasks of secs seconds and append the results to shared_list."""
    shared_list.append([io_bound_operation(secs) for _ in range(n)])


def thread_cpu_bound_operations(counts: int) -> None:
    """Run a CPU-bound task and append the results to shared_list."""
    shared_list.append([cpu_bound_operation(counts)])


def threading_two_threads() -> None:
    # Thread 1 performs one CPU-bound task, thread 2 performs five I/O-bound tasks
    t1 = Thread(target=thread_cpu_bound_operations, args=(100000000,))
    t2 = Thread(target=thread_io_bound_operations, args=(5, 1))

    # Start activity -> invokes run() method
    t1.start()
    t2.start()

    # Block the calling thread -> Avoids continuing to run without threads being finished
    t1.join()
    t2.join()
Time spent by multithreaded program. Thread 1: 1 CPU-bound task of 3.5s approx, Thread 2: 5 I/O-bound tasks of 1s

Example 8 - 6 threads

Thread 1 - 1 CPU-bound task of 3.5s approx (bar 5)

Thread 2 - 1 CPU-bound task (bar 4)

Thread 3 - 1 CPU-bound task (bar 3)

Thread 4 - 1 I/O-bound task of 1s

Thread 5 - 1 I/O-bound task of 1s

Thread 6 - 1 I/O-bound task of 1s

Here we have three threads in charge of executing one I/O-bound task each, and three threads in charge of executing one CPU-intensive task each. The three CPU-intensive tasks take different amounts of time to complete.

The longest task, which lasts about 3.5 seconds on its own, is the first to begin (bar 5). It takes almost 6 seconds to complete because it shares the processor with the other two CPU-intensive tasks.

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    """Run n I/O-bound tasks of secs seconds and append the results to shared_list."""
    shared_list.append([io_bound_operation(secs) for _ in range(n)])


def thread_cpu_bound_operations(counts: int) -> None:
    """Run a CPU-bound task and append the results to shared_list."""
    shared_list.append([cpu_bound_operation(counts)])


def threading_six_threads() -> None:
    # Create six thread objects: three CPU-bound tasks of different sizes and three I/O-bound tasks
    t1 = Thread(target=thread_cpu_bound_operations, args=(100000000,))
    t2 = Thread(target=thread_cpu_bound_operations, args=(50000000,))
    t3 = Thread(target=thread_cpu_bound_operations, args=(20000000,))
    t4 = Thread(target=thread_io_bound_operations, args=(1, 1))
    t5 = Thread(target=thread_io_bound_operations, args=(1, 1))
    t6 = Thread(target=thread_io_bound_operations, args=(1, 1))

    # Start activity -> invokes run() method
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()
    t6.start()

    # Block the calling thread -> Avoids continuing to run without threads being finished
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()
    t6.join()
Time spent by multithreaded program. 6 threads, 3 I/O-bound tasks of 1s and 3 CPU-bound tasks

Example 9 - 4 threads

Thread 1 executes two CPU-bound operations of about 3.5 seconds each, sequentially (bars 6 and 7). Thread 2 executes two CPU-bound operations of almost 1 second each, sequentially (bars 4 and 5).

The other bars represent the I/O-bound tasks: threads 3 and 4 each run two I/O-bound tasks of 1s.

def thread_io_bound_operations(n: int, secs: float | int) -> None:
    """Run n I/O-bound tasks of secs seconds and append the results to shared_list."""
    shared_list.append([io_bound_operation(secs) for _ in range(n)])


def thread_cpu_bound_operations(counts: int, n: int) -> None:
    """Run n CPU-bound tasks sequentially and append their results to shared_list."""
    shared_list.append([cpu_bound_operation(counts) for _ in range(n)])


def threading_four_threads() -> None:
    # Create four thread objects: two run two CPU-bound tasks each, two run two I/O-bound tasks each
    t1 = Thread(target=thread_cpu_bound_operations, args=(100000000, 2))
    t2 = Thread(target=thread_cpu_bound_operations, args=(20000000, 2))
    t3 = Thread(target=thread_io_bound_operations, args=(2, 1))
    t4 = Thread(target=thread_io_bound_operations, args=(2, 1))

    # Start activity -> invokes run() method
    t1.start()
    t2.start()
    t3.start()
    t4.start()

    # Block the calling thread -> Avoids continuing to run without threads being finished
    t1.join()
    t2.join()
    t3.join()
    t4.join()
Time spent by multithreaded program. 4 threads, 4 I/O-bound tasks of 1s and 4 CPU-bound tasks

While Thread 1 (bars 6 and 7) is executing its first 3.5s CPU-bound task (bar 6), it alternates with the other threads, so that task takes about 5s to finish. All the I/O-bound operations take about 1s to finish, and while they are waiting, the CPU-intensive tasks can execute. So the first operation of Thread 1 takes 5s mainly because of the two CPU-bound operations of Thread 2 (bars 4 and 5).

When Thread 1 is executing its second 3.5s CPU-bound task (bar 7), it has the processor all to itself. That is why this task takes only about 3.5s to finish.

These examples should clarify the concept. Now let's look at more convenient ways to do this!

ThreadPoolExecutor

The concurrent.futures module provides a ThreadPoolExecutor object that we can use to create threads and a ProcessPoolExecutor object for multiprocessing.

As we focus on threads in this article, we will only work with ThreadPoolExecutor.

Why we need it

The ThreadPoolExecutor class is an Executor subclass that uses a pool of threads to execute calls asynchronously.

A thread pool is a software design pattern for achieving concurrency of execution in a computer program. It maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program.

So the ThreadPoolExecutor creates and manages a collection of worker threads that can be reused, avoiding the creation and destruction of a thread every time we want to execute a task concurrently, as we did above. Since creating and destroying threads takes time, reusing them improves performance.
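The reuse is easy to observe in a small sketch: ten tasks are submitted to a pool with max_workers=2, and each task reports the name of the thread that ran it (which_thread() is a hypothetical helper). thread_name_prefix is a real ThreadPoolExecutor parameter; the worker names follow the "<prefix>_<n>" pattern built in _adjust_thread_count().

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def which_thread(_: int) -> str:
    """Return the name of the worker thread that ran this task."""
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as executor:
    names = list(executor.map(which_thread, range(10)))

print(len(names))          # 10 tasks ran...
print(sorted(set(names)))  # ...on at most 2 reused threads, e.g. ['worker_0', 'worker_1']
```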

How it works

The Executor class

Like the ProcessPoolExecutor class, ThreadPoolExecutor extends the Executor class, an abstract base class that defines only five methods:

  • submit()
  • map()
  • shutdown()

Executor source code Python 3.13

The other two methods are actually the __enter__() and __exit__() Python’s magic methods to implement the context management protocol. Thanks to them we can use the ThreadPoolExecutor in a with statement (recommended). The with statement will call the __enter__() method, and __exit__() will be called when the execution leaves the with code block.

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
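A minimal sketch of what the with statement saves us from writing: since __exit__() calls shutdown(wait=True), the explicit try/finally version below is roughly equivalent. square() is just a hypothetical task. After shutdown, submit() raises a RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    return x * x


# Recommended: __exit__() calls shutdown(wait=True) when the block ends
with ThreadPoolExecutor(max_workers=2) as executor:
    with_result = executor.submit(square, 7).result()

# Roughly what the with statement does for us
executor = ThreadPoolExecutor(max_workers=2)
try:
    manual_result = executor.submit(square, 7).result()
finally:
    executor.shutdown(wait=True)  # wait for pending tasks, then release the threads

# After shutdown, the executor refuses new work
submit_error = False
try:
    executor.submit(square, 1)
except RuntimeError:
    submit_error = True

print(with_result, manual_result, submit_error)  # 49 49 True
```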

Executor is only an abstract class and most of the logic is implemented in the ThreadPoolExecutor methods. The submit() and shutdown() methods are implemented in the ThreadPoolExecutor class while the logic for the map() method is implemented in the Executor class, since it uses submit() internally.

The ThreadPoolExecutor class

ThreadPoolExecutor in Python’s concurrent.futures module uses a queue internally to manage tasks. The queue is created in the ThreadPoolExecutor’s constructor.

ThreadPoolExecutor source code. Python 3.13

__init__() method

The __init__() method initializes a new ThreadPoolExecutor instance and creates the queue and a few more objects.

The SimpleQueue class used below is a simple, unbounded FIFO queue. It follows the First-In-First-Out principle: items are removed from the queue in the same order in which they were added.

We can set the maximum number of threads by passing it to the max_workers parameter. If we don't, it defaults to the number of CPUs usable by the process plus 4, capped at 32:

class ThreadPoolExecutor(_base.Executor):
    ...

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        ...
        if max_workers is None:
            # We use process_cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.process_cpu_count() or 1) + 4)
        ...

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        ...
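To see what that default works out to on your machine, a small sketch can reproduce the formula; default_max_workers() is a hypothetical helper, not part of the library. os.process_cpu_count() only exists from Python 3.13, so the sketch falls back to os.cpu_count(), which is what Python 3.8 to 3.12 use in the same formula.

```python
import os


def default_max_workers() -> int:
    """Reproduce ThreadPoolExecutor's default max_workers formula."""
    # os.process_cpu_count() was added in 3.13; older versions use os.cpu_count()
    cpu_count = getattr(os, "process_cpu_count", os.cpu_count)()
    return min(32, (cpu_count or 1) + 4)


print(default_max_workers())  # e.g. 12 on a machine with 8 usable CPUs
```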

We can also pass an optional prefix for our thread names (thread_name_prefix), a callable used to initialize each worker thread (initializer), and a tuple with its arguments (initargs).

submit() method

The submit() method schedules a callable to be executed by the thread pool. We pass it the callable (the function object itself) followed by the arguments to call it with.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())  # blocks until pow(323, 1235) has been computed

The task is wrapped in a Future object which represents the asynchronous execution of the callable, and it is returned immediately by the submit() method.

A Future is just an abstraction that represents the eventual result of an asynchronous operation: an object that acts as a placeholder for a result that is initially unknown, typically because its computation has not completed yet.

concurrent.futures.Future documentation.

You can find the source code for the Future class just above the code for the Executor class in the _base module for Python 3.13.

future.result() returns the value returned by the call (the pow function). If the call hasn’t yet completed then this method will wait up to timeout seconds (timeout is the only parameter of result(timeout=None)). If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
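A minimal sketch of the timeout behaviour, with a hypothetical slow_add() task that sleeps for 0.5 seconds. The timeout only limits how long we wait: the call itself keeps running in its worker thread, and a later result() call without a timeout still returns its value. TimeoutError is imported from concurrent.futures; since Python 3.11 it is the same class as the built-in TimeoutError.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_add(a: int, b: int) -> int:
    time.sleep(0.5)  # simulate a slow call
    return a + b


timed_out = False
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_add, 2, 3)
    try:
        future.result(timeout=0.1)  # stop waiting after 0.1 s
    except TimeoutError:
        timed_out = True  # the call itself keeps running in the worker thread
    value = future.result()  # no timeout: block until the call completes

print(timed_out, value)  # True 5
```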

Below we can see some of the source code for the ThreadPoolExecutor class. When the submit() method is called, a Future object and a _WorkItem object are created. Then the _WorkItem is put in the _work_queue.

class ThreadPoolExecutor(_base.Executor):
    ...

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            ...

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        ...

The _WorkItem is an object used to wrap the task (fn), its arguments (args and kwargs) and the future object (_base.Future()) together. It implements a run() method where the task is executed and the result is set in the Future object.

class _WorkItem:
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)

    __class_getitem__ = classmethod(types.GenericAlias)

That run() method is called from the worker thread, inside _worker, a module-level function that is passed to each thread as its target.

def _worker(executor_reference, work_queue, initializer, initargs):
    ...

            if work_item is not None:
                work_item.run()
                # Delete references to object. See GH-60488
                del work_item
                continue

            ...
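Because _WorkItem.run() catches any exception raised by the task and stores it with set_exception(), an error does not kill the worker thread; it surfaces when we ask the Future for its outcome. A small sketch with a hypothetical divide() task:

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a: float, b: float) -> float:
    return a / b


caught = False
with ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(divide, 10, 2)
    bad = executor.submit(divide, 1, 0)

    print(ok.result())            # 5.0
    print(repr(bad.exception()))  # ZeroDivisionError('division by zero')
    try:
        bad.result()  # re-raises the exception stored in the Future
    except ZeroDivisionError:
        caught = True  # handled in the calling thread, not in the worker
```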

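The threads are created in the _adjust_thread_count() method, which is called from submit() (see the submit() code above), not from the constructor. So worker threads are only started as tasks are submitted, when there is no idle worker to reuse, up to max_workers.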

class ThreadPoolExecutor(_base.Executor):

    def _adjust_thread_count(self):
        ...
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue
    ...
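A small sketch to observe that the constructor starts no threads and that submit() does: each task blocks on a threading.Event until the main thread sets it, so no worker becomes idle and every submit() has to start a new thread. The counts come from threading.active_count(), which would also include any unrelated threads started elsewhere in the program.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

baseline = threading.active_count()

executor = ThreadPoolExecutor(max_workers=4)
after_init = threading.active_count()  # the constructor starts no worker threads

gate = threading.Event()
# Each task blocks until the gate is set, so no worker is free to pick up the next task
futures = [executor.submit(gate.wait) for _ in range(3)]
after_submit = threading.active_count()  # one new worker per submit() here

gate.set()                    # let the blocked tasks finish
executor.shutdown(wait=True)  # wait for the worker threads to exit

print(after_init - baseline, after_submit - baseline)  # 0 3
```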
map() method

map() is directly implemented in the Executor class, and it uses the submit() method internally.

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        ...

map() is the other way we have to submit tasks to the thread pool. It is similar to the built-in map(fn, *iterables) function, but the function we pass to it as fn is executed asynchronously and several calls to fn may be made concurrently.

The built-in map() function provides lazy evaluation, meaning that values from the iterable returned by it are computed and returned only when requested.

However, when Executor.map(fn, *iterables) is called, the function gathers all the items from the provided iterables upfront. This is in contrast to a lazy evaluation approach where items are processed as they are needed (i.e., on-demand).
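A small sketch that makes the difference visible: a hypothetical numbers() generator records every item at the moment it is pulled. Executor.map() has already drained it before we read a single result, while the built-in map() has not pulled anything yet.

```python
from concurrent.futures import ThreadPoolExecutor

consumed = []


def numbers(n: int):
    """Generator that records each item at the moment it is pulled."""
    for i in range(n):
        consumed.append(i)
        yield i


with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(lambda x: x * x, numbers(5))
    eager_consumed = list(consumed)  # snapshot before reading any result
    squares = list(results)

print(eager_consumed)  # [0, 1, 2, 3, 4]: the whole input was pulled upfront
print(squares)         # [0, 1, 4, 9, 16]

consumed.clear()
lazy = map(lambda x: x * x, numbers(5))  # built-in map: nothing pulled yet
lazy_consumed = list(consumed)
print(lazy_consumed)   # []
```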

Executor.map() returns an iterator that yields the results of the tasks as they become available, in the same order as the input iterables.

Because the order is preserved, when we iterate we won't get the second result until the first one is available, even if the second task finished earlier. If timeout is not specified or None, there is no limit to the wait time. If it is set to an int or float, the returned iterator raises a TimeoutError when a result isn't available timeout seconds after the original call to map().

If a fn call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
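A minimal sketch of that behaviour, assuming a hypothetical check() task that fails for one input: the results before the failing call are still delivered, and the ValueError shows up in the loop when the iterator reaches it, not when the tasks are submitted.

```python
import concurrent.futures


def check(x):
    """Hypothetical task that fails for one specific input."""
    if x == 2:
        raise ValueError(f"bad value: {x}")
    return x


collected = []
error = None
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    try:
        for value in executor.map(check, [1, 2, 3]):
            collected.append(value)
    except ValueError as exc:
        # Raised when the iterator reaches the failed call.
        error = exc

print(collected, error)  # [1] bad value: 2
```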

Let’s see an example! This example is super beautiful; I ran it about twenty times 🤭

Basically, we use a ThreadPoolExecutor with 5 worker threads to load the Wikipedia pages of 20 exotic Australian animals.

Because of the GIL, only one thread can execute Python bytecode at any given instant, but 5 threads are available. When the first thread sends its request, it blocks waiting for the network and releases the GIL, a context switch occurs, and another thread can acquire the GIL and start its own request instead of wasting time waiting.

Below we see that only 5 threads are working concurrently at any instant in time. When a thread completes a task, it is reused to start another one. The whole run takes less than 4 seconds.

Time spent to load twenty exotic Australian animals asynchronously
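We can check the reuse of worker threads without touching the network. In this sketch, fake_download() is a hypothetical stand-in for load_url() that sleeps instead of downloading and reports which worker thread ran it. With 20 tasks and max_workers=5, at most five distinct thread names appear; the names follow the '%s_%d' pattern from the _adjust_thread_count() excerpt.

```python
import concurrent.futures
import threading
import time


def fake_download(i):
    """Hypothetical stand-in for load_url(): sleep to simulate I/O."""
    time.sleep(0.05)
    return threading.current_thread().name


with concurrent.futures.ThreadPoolExecutor(
    max_workers=5, thread_name_prefix="worker"
) as executor:
    names = list(executor.map(fake_download, range(20)))

print(len(names), len(set(names)))  # 20 tasks, at most 5 distinct threads
print(sorted(set(names)))  # e.g. ['worker_0', 'worker_1', ...]
```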

On the other hand, if we load the twenty Australian animals synchronously it takes almost fifteen seconds! 😱

Time spent to load twenty Australian animals synchronously
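The gap between the two runs comes from the threads overlapping their waits. Here is a network-free sketch of the same comparison, assuming a hypothetical fake_download() that sleeps for 0.05 s; like a blocking socket read, time.sleep() releases the GIL, so the five workers can wait at the same time.

```python
import concurrent.futures
import time


def fake_download(i):
    """Hypothetical stand-in for load_url(): sleep to simulate waiting on I/O."""
    time.sleep(0.05)
    return i


start = time.perf_counter()
sequential = [fake_download(i) for i in range(20)]
sequential_time = time.perf_counter() - start  # about 20 x 0.05 s

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    threaded = list(executor.map(fake_download, range(20)))
threaded_time = time.perf_counter() - start  # about 4 rounds x 0.05 s

print(f"sequential: {sequential_time:.2f} s, threaded: {threaded_time:.2f} s")
```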

You can see the full list of the twenty exotic Australian animals in the code below.

import concurrent.futures
from time import perf_counter, time
import urllib.request
import logging

from concurrency.utils import get_saving_path, postprocess_times
from concurrency.visualize import barh


log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")


URLS = [
    "https://en.wikipedia.org/wiki/Emu",
    "https://en.wikipedia.org/wiki/Wombat",
    "https://en.wikipedia.org/wiki/Kangaroo",
    "https://en.wikipedia.org/wiki/Platypus",
    "https://en.wikipedia.org/wiki/Koala",
    "https://en.wikipedia.org/wiki/Tasmanian_devil",
    "https://en.wikipedia.org/wiki/Echidna",
    "https://en.wikipedia.org/wiki/Dingo",
    "https://en.wikipedia.org/wiki/Kookaburra",
    "https://en.wikipedia.org/wiki/Wallaby",
    "https://en.wikipedia.org/wiki/Macrotis",
    "https://en.wikipedia.org/wiki/Quokka",
    "https://en.wikipedia.org/wiki/Cassowary",
    "https://en.wikipedia.org/wiki/Sugar_glider",
    "https://en.wikipedia.org/wiki/Laughing_kookaburra",
    "https://en.wikipedia.org/wiki/Rainbow_lorikeet",
    "https://en.wikipedia.org/wiki/Coastal_taipan",
    "https://en.wikipedia.org/wiki/Mistletoebird",
    "https://en.wikipedia.org/wiki/Thylacine",
    "https://en.wikipedia.org/wiki/Quoll",
]

animals = {}


# I/O-bound operation
def load_url(url: str) -> tuple[float, float]:
    """Retrieve a single page and return start and finish times."""
    start = perf_counter()
    with urllib.request.urlopen(url) as conn:
        animals[url] = conn.read()
    finish = perf_counter()
    return start, finish


def asynchronous_load_australian_animals() -> None:
    start = time()
    # Use ThreadPoolExecutor to manage concurrency
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Use the map method to apply load_url to each URL
        results = executor.map(load_url, URLS)

        # Process the results and times
        times = list(results)  # (start, finish) tuples, in the order of URLS
        start_points, end_points = postprocess_times(times)
    end = time()

    total_time = round(end - start) + 1

    barh(
        title="Asynchronous execution, 5 threads, I/O-bound tasks, Australian animals",
        start_points=start_points,
        end_points=end_points,
        path=get_saving_path("thread-pool-executor/images/ThreadPoolExecutor_ex1.png"),
        n=len(URLS),
        secs=total_time,
    )


if __name__ == "__main__":
    logging.info("Init asynchronous tasks")
    asynchronous_load_australian_animals()
    logging.info(f"len(animals): {len(animals)}")
    logging.info("Finish asynchronous tasks")

With the submit() method, the chart would look something like this, although it varies a lot from one run to another.

Time spent to load twenty exotic Australian animals asynchronously

Unlike map(), submit() returns Future instances instead of results, so we need to call result() on each of them; otherwise our postprocess_times() function would receive Future objects and raise an exception. Here we use the as_completed() function to iterate over the Future instances.

as_completed() returns an iterator over the Future instances in results that yields futures as they complete (finished or cancelled).
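Since as_completed() yields futures in completion order rather than submission order, a common idiom (also used in the example of the concurrent.futures documentation) is to keep a dict from each Future to its input. A minimal sketch with a hypothetical sleep_and_return() task:

```python
import concurrent.futures
import time


def sleep_and_return(seconds):
    """Hypothetical task: simulate I/O by sleeping, then return the input."""
    time.sleep(seconds)
    return seconds


inputs = [0.3, 0.1, 0.2]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Map each Future back to its input, since as_completed() loses the order.
    future_to_input = {executor.submit(sleep_and_return, s): s for s in inputs}
    completion_order = []
    for future in concurrent.futures.as_completed(future_to_input):
        assert future.result() == future_to_input[future]
        completion_order.append(future_to_input[future])

print(completion_order)  # typically [0.1, 0.2, 0.3]
```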

def asynchronous_load_australian_animals() -> None:
    start = time()
    # Use ThreadPoolExecutor to manage concurrency
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Use the submit method to apply load_url to each URL
        results = [executor.submit(load_url, url) for url in URLS]

        # Process the results and times
        times = [result.result() for result in concurrent.futures.as_completed(results)]
        start_points, end_points = postprocess_times(times)
    end = time()

    total_time = round(end - start) + 1
    # ... the barh() plotting call is the same as in the map() example

shutdown() method

If you use ThreadPoolExecutor as a context manager in a with statement, you don’t need to call the shutdown() method, since it is called inside the __exit__() magic method. Otherwise, you need to call it yourself to signal the executor that it should free any resources it is using once the currently pending futures are done executing.

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
    ...
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
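Without a with statement, a minimal sketch looks like this, with a hypothetical square() task. The try/finally makes sure shutdown() runs even if something fails, and after shutdown the executor rejects new work with a RuntimeError.

```python
import concurrent.futures


def square(x):
    """Hypothetical task used only for this illustration."""
    return x * x


executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
try:
    futures = [executor.submit(square, n) for n in range(5)]
    squares = [f.result() for f in futures]
finally:
    # No with statement, so we release the worker threads ourselves.
    # wait=True (the default) blocks until pending futures have finished.
    executor.shutdown(wait=True)

print(squares)  # [0, 1, 4, 9, 16]

# Once shut down, the executor refuses new work.
rejected = False
try:
    executor.submit(square, 5)
except RuntimeError:
    rejected = True
print(rejected)  # True
```

Since Python 3.9, shutdown() also accepts cancel_futures=True, which cancels the pending futures that have not started running yet.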

There are lots of things you can do with these tools. A Future object has several methods that you can use to customize the behaviour of your program, such as cancel(), cancelled(), running(), done(), and add_done_callback().
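A small sketch of some of these methods, assuming a hypothetical wait_for_release() task that blocks on a threading.Event. With a single worker, the second future is still queued, so it can be cancelled, and the callback registered on the first future runs once it finishes.

```python
import concurrent.futures
import threading

release = threading.Event()
events = []


def wait_for_release():
    """Hypothetical task that blocks until the main thread sets the event."""
    release.wait()
    return "done"


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(wait_for_release)   # occupies the only worker
    second = executor.submit(wait_for_release)  # waits in the queue

    # A pending future can still be cancelled; a running one cannot.
    print(second.cancel(), second.cancelled())  # True True
    first.add_done_callback(lambda f: events.append(f.result()))
    print(first.done())  # False
    release.set()

# The with block waited for the worker, so the callback has already run.
print(first.done(), first.result(), events)  # True done ['done']
```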

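There is also a wait() function in the concurrent.futures module that waits for a collection of futures to complete and returns two sets: the futures that are done and those that are not. You can specify when it should return via the return_when parameter: FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED (the default).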
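A minimal sketch of wait() with a hypothetical sleep_and_return() task: the first call returns as soon as any future finishes, and the second one blocks until all of them are done.

```python
import concurrent.futures
import time


def sleep_and_return(seconds):
    """Hypothetical task: simulate I/O by sleeping, then return the input."""
    time.sleep(seconds)
    return seconds


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(sleep_and_return, s) for s in (0.5, 0.05, 0.3)]

    # Return as soon as any future finishes; the rest stay in `pending`.
    first_done, pending = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED
    )
    print([f.result() for f in first_done], len(pending))  # typically [0.05] 2

    # Then block until everything has finished (ALL_COMPLETED is the default).
    done, not_done = concurrent.futures.wait(futures)
    print(len(done), len(not_done))  # 3 0
```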


Below I leave some resources that helped me to understand this topic better.

Feel free to contact me on any social network. Any feedback is appreciated!

Thanks for reading 🙂

Other Resources

By Javier Castaño on May 5, 2024

Last update: May 17, 2024