Multiprocessing in Python

Speed up your Python programs with parallelism

Time spent to execute 24 CPU-bound tasks in four worker processes in parallel

Intro

This post is an introduction to multiprocessing in Python using the multiprocessing module, with some examples and visualisations to better understand the content.

The resources section at the end has some links to wonderful material that you can use to go deeper into the topic 🤓

The code from this post can be found in this repository.

What is a Process

A process is a program in execution: an instance of a computer program that is being executed. In Python, a process is an instance of the Python interpreter that executes Python code.

A program is an executable file that contains processing instructions. Programs can be application programs or system programs:

  • Application programs: word processors, games, Google Chrome, etc.
  • System programs: compilers, file management programs, etc.

They are stored on disk and are compiled and loaded into memory (RAM) to be executed by the processor (CPU).

There can be multiple processes running at the same time, and one program can have several processes associated with it (like Chrome). Each process running on the machine has its own memory space allocated.

Each process has at least one thread, the main thread. A thread is the basic unit of execution within a process. It is an independent flow of execution that shares the same address space as other independent flows of execution within the same process. A process can have one or more threads.

Processes and threads

Below you can see some of the processes running on my computer and the number of threads within them. It shows multiple processes belonging to the chrome program, each one with its corresponding process ID (PID).

btop resource monitor - https://github.com/aristocratos/btop

So when you run a Python script, the operating system creates a process to execute the Python interpreter, which in turn runs your Python code.

  1. The operating system launches a new process.
  2. This process runs the Python interpreter.
  3. The Python interpreter reads and executes your Python script.
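For example, here is a minimal sketch (not part of the post's repository) that inspects the interpreter process running your script and its main thread, using only the standard library:

import os
import threading

# Minimal sketch: inspect the process that is executing this script.
print(f"Interpreter process PID: {os.getpid()}")
print(f"Parent process PID: {os.getppid()}")
print(f"Threads in this process: {threading.active_count()}")
print(f"Main thread name: {threading.current_thread().name}")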

Multiprocessing use cases

The kind of tasks multiprocessing is best suited for are CPU-bound operations. CPU-bound operations are those whose completion time is primarily determined by the speed of the processor, like mathematical computations, data compression, compiling code, machine learning, etc.

In Python, when you use multiprocessing you are actually implementing true parallelism, as opposed to multithreading, which can’t run in parallel because of Python’s Global Interpreter Lock (GIL).

To be able to run several tasks in parallel using multiprocessing, your machine has to have at least two CPU cores. Today, however, it is possible to run several tasks in parallel with only one physical core thanks to a technology called Simultaneous Multithreading (SMT), or Hyper-Threading. This technology allows your computer to have two or more logical cores in one physical CPU core.
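If you are curious how many logical cores your machine exposes to Python, a quick check (a minimal sketch, not from the post) is:

import os

# os.cpu_count() returns the number of logical cores, which includes the
# extra logical cores provided by SMT / Hyper-Threading, if available.
print(f"Logical CPU cores: {os.cpu_count()}")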

Consider two tasks with CPU-bound operations. These tasks take a long time to complete; below they are represented as filled circles. If we run them in parallel we can save a significant amount of time.

Parallel execution of tasks with CPU-bound operations

If instead we run them sequentially, the time needed to complete both is longer.

Sequential execution of tasks with CPU-bound operations

Nor will we have any benefit if we run them concurrently without true parallelism.

Concurrent execution of tasks with CPU-bound operations

Child and Parent processes

A parent process is a process that has created one or more child processes. It initiates the creation of the child process by making a system call.

Depending on the platform, multiprocessing supports three ways to start a process:

  • spawn
    • The parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object’s Process.run() method.
    • Default on Windows and macOS.
  • fork
    • The parent process uses os.fork() to fork the Python interpreter. The child process is identical to the parent when it begins.
    • Default on POSIX (Linux) except macOS.
  • forkserver
    • A server process is spawned and whenever a new process is needed, the parent process connects to the server and requests that it fork a new process.
    • Available on POSIX platforms which support passing file descriptors over Unix pipes such as Linux.

Find detailed info on starting processes in the docs: Contexts and start methods

The parent process controls and manages its child processes, and communicates with them through various inter-process communication (IPC) mechanisms that the OS provides to allow processes to collaborate, share data, and synchronize their activities.

It can also monitor the status of its child processes or wait for them to terminate, as well as terminate them.

The initial process that starts when a program is executed is called the main process.

A child process is a process created by a parent process. It is a copy of the parent process but with a unique process ID (PID).

It can inherit attributes from the parent, including environment variables, open files, and certain resource limits.

When working with processes, the OS allocates necessary resources to processes (CPU time, memory, etc) and determines which processes run at any given time, ensuring fair distribution of CPU time and efficient multitasking.

Process states

During its execution, a process may change state among the ones below. When a process is running it can get interrupted by a higher-priority process, in which case it goes back to the Ready state. Also, while running, a process may need to wait for an Input/Output operation or an event. In that case the process changes to the Waiting state, and when the wait finishes it goes back to Ready.

Process states

If a child process has completed execution but remains in the process table because its parent has not yet read its exit status, it is called a zombie process (or defunct process).

If the parent process has already terminated before the child, the child becomes an orphan process.

Python multiprocessing first steps

The multiprocessing module supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

We can spawn a new process from the main process by creating a multiprocessing.Process object. The main process is the parent of our new child process.

But first, let’s define our CPU-bound operation, which is the same used in Threading in Python.

from time import perf_counter


def cpu_bound_operation(n: int) -> tuple[float, float]:
    """CPU-bound task: sum the first n integers."""
    start = perf_counter()
    count = 0
    for i in range(n):
        count += i
    finish = perf_counter()

    return start, finish

It iterates over a range of n numbers and adds them together. Then it returns a tuple containing the start and finish times so we can use them to plot charts later.

Then we simply call the function executing the CPU-intensive operation from another function and log the returned value.

And finally, we create the child process using the Process class from the multiprocessing module.

import logging
from concurrency.operations.cpu_bound import cpu_bound_operation
from multiprocessing import Process


format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")


def cpu_bound_task(counts: int) -> None:
    """Run a CPU-bound task and log the time it took."""
    time = cpu_bound_operation(counts)
    logging.info(f"time - {time}")


def multiprocessing() -> None:

    # Run in a child process; 150000000 takes approx 3.25 secs on my laptop
    p = Process(target=cpu_bound_task, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    p.join()  # Blocks the thread


if __name__ == "__main__":
    logging.info("Init program")
    multiprocessing()
    logging.info("Finish program")

In the multiprocessing() function above:

  1. When we create a multiprocessing.Process object we can pass a target function to the constructor with some arguments in the args parameter.
    • The target function cpu_bound_task will be called by the internal run() method of multiprocessing.Process (inherited from the process.BaseProcess class), but not yet at this point.
    • The child process is not actually created until the start() method is called.
  2. When we call the start() method the process transitions to the Running state and a new child process is spawned with its own main thread.
    • The new process executes the run() method, which is in charge of executing the target function with the arguments we passed.
    • Process is in Running state unless it gets blocked.
  3. The join() method waits for the process to complete its execution. The calling thread (the main thread of the main process) will block until the child process terminates.

Below is the result of running the example, which shows the time that the child process needed to complete.

20:45:31: Init program
20:45:35: time - (45723.689522083, 45727.402436291)
20:45:35: Finish program
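As a side note (not shown in the output above), the Process object exposes a few attributes that are handy for inspecting a child process around start() and join(). A minimal sketch, reusing the cpu_bound_task function from the example:

from multiprocessing import Process

if __name__ == "__main__":
    p = Process(target=cpu_bound_task, args=(150000000,))
    p.start()
    print(p.pid)         # PID assigned to the child process by the OS
    print(p.is_alive())  # True while the child process is still running
    p.join()
    print(p.exitcode)    # 0 if the child process finished without errors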

Each process has a separate memory address space

If you read the post on Threading in Python you will have noticed that we saved the times taken by each thread to complete in the shared_data variable. Since the threads in the same process share the same memory space, they can all share data. Processes cannot directly access shared data in other processes.

In the following example we can verify that two processes cannot share data directly through a simple variable because each process has its own separate memory space.

We create a variable in the main process called shared_list to store the time results in it. Then we define two tasks, each one will append its results in the shared_list variable.

import logging
from time import perf_counter
from concurrency.operations.cpu_bound import cpu_bound_operation
from multiprocessing import Process


format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")


# Threads from the same process share memory space
# Processes have different memory space
shared_list = []


def cpu_bound_task_1(counts: int) -> None:
    """Run a CPU-bound task and append the results to shared_list."""
    time = cpu_bound_operation(counts)
    logging.info("time 1 - ", time)
    logging.info(f"shared_list 1 {shared_list}")
    shared_list.append([time])
    logging.info(f"shared_list 1 {shared_list}")


def cpu_bound_task_2(counts: int) -> None:
    """Run a CPU-bound task and append the results to shared_list."""
    time = cpu_bound_operation(counts)
    logging.info("time 2 - ", time)
    logging.info(f"shared_list 2 {shared_list}")
    shared_list.append([time])
    logging.info(f"shared_list 2 {shared_list}")

Now we call those CPU-bound tasks in the multiprocessing() function.

The cpu_bound_task_1() function runs in the main thread of the main process, the parent process.

Then, when task 1 completes, we execute task 2 in a child process.

def multiprocessing() -> None:

    start = perf_counter()
    # Run in the main process - 1
    cpu_bound_task_1(150000000)

    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    p.join()  # Blocks the thread

    logging.info(f"final shared_list {shared_list}")
    end = perf_counter()

    logging.info(f"Total time :: {round(end - start, 4)} secs")


if __name__ == "__main__":
    logging.info("Init program")
    multiprocessing()
    logging.info("Finish program")

As you may be thinking, we are not running both tasks in parallel here, since task 1 has to complete before task 2 starts executing.

Both tasks take more than 6 secs to complete:

17:50:35: Init program
...
17:50:41: Total time :: 6.7446 secs
17:50:41: Finish program

If we were to change the order of both tasks (example below) we would not be running them in parallel either, because the join() method blocks the main thread until the child process completes.

def multiprocessing() -> None:

    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    p.join()  # Blocks the thread

    # Run in the main process - 1
    cpu_bound_task_1(150000000)

    logging.info(f"final shared_list {shared_list}")
    end = perf_counter()

    logging.info(f"Total time :: {round(end - start, 4)} secs")

It takes approximately the same amount of time to complete.

17:55:18: Init program
...
17:55:25: Total time :: 6.6648 secs
17:55:25: Finish program

To actually run both tasks in parallel we have to avoid blocking the main thread of the parent process before task 1 starts running. So we call the join() method after task 1 has started.

def multiprocessing() -> None:

    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function

    # Run in the main process - 1
    cpu_bound_task_1(150000000)

    p.join()  # Blocks the thread

    logging.info(f"final shared_list {shared_list}")
    end = perf_counter()

    logging.info(f"Total time :: {round(end - start, 4)} secs")

As you can see below, the time spent for both tasks to complete has been reduced by about half:

18:08:48: Init parallel program
...
18:08:51: Total time :: 3.3857 secs
18:08:51: Finish parallel program

Hey, wait a second! Weren’t we just verifying that different processes don’t share the same memory space? 🧐

If we inspect the complete output of our program capable of executing in parallel, we notice a few interesting things.

18:08:48: Init parallel program

18:08:51: time 1 - (14708.567523791, 14711.932722375)
18:08:51: shared_list 1 []
18:08:51: shared_list 1 [[(14708.567523791, 14711.932722375)]]

18:08:51: time 2 -  (14708.587407041, 14711.944044333)
18:08:51: shared_list 2 []
18:08:51: shared_list 2 [[(14708.587407041, 14711.944044333)]]

18:08:51: final shared_list [[(14708.567523791, 14711.932722375)]]

18:08:51: Total time :: 3.3857 secs

18:08:51: Finish parallel program

  • We create and start the child process with task 2 before we run task 1 in the parent process. However, task 1 starts executing before task 2. This is likely because creating the child process takes long enough that task 2 does not actually begin until after task 1 has started.
  • Both tasks append their resulting times to the shared_list variable created in the parent process. However, only the results from task 1, the task executed in the parent process where the variable was defined, remain stored in the variable when we log it. This verifies that the child process couldn’t share its data with the parent process.

If you look carefully at the results, both tasks store their data in a variable called shared_list even when the processes do not share the same memory space. 🤯 What is happening here? 😱

Each process created by the multiprocessing module using the spawn method (which starts a fresh Python interpreter process) has its own separate memory space. This means that any variables, including global ones like shared_list, are duplicated in each process, and changes made to these variables in one process do not affect the other process.

Let’s add some code to check the process IDs and the shared_list memory address so we understand it better.

...
shared_list = []
logging.info(
    f"--> id - main thread in {os.getpid()} process shared_list: {id(shared_list)}"
)


def cpu_bound_task_1(counts: int) -> None:
    ...
    shared_list.append([time])
    logging.info(
        f"--> id - main thread in {os.getpid()} process in task 1 shared_list: {id(shared_list)}"
    )
    logging.info(f"shared_list 1 {shared_list}")


def cpu_bound_task_2(counts: int) -> None:
    ...
    shared_list.append([time])
    logging.info(
        f"--> id - main thread in {os.getpid()} process in task 2 shared_list: {id(shared_list)}"
    )
    logging.info(f"shared_list 2 {shared_list}")


def multiprocessing() -> None:

    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function

    # Run in the main process - 1
    cpu_bound_task_1(150000000)

    p.join()  # Blocks the thread

    logging.info(f"final shared_list {shared_list}")
    logging.info(
        f"--> id - main thread in {os.getpid()} process final shared_list: {id(shared_list)}"
    )
    end = perf_counter()

    logging.info(f"Total time :: {round(end - start, 4)} secs")

The results from the code above show the following:

19:58:05: --> FIRST id - main thread in 17723 process shared_list: 4340073984

19:58:05: Init parallel program

19:58:05: --> FIRST id - main thread in 17725 process shared_list: 4313028032

19:58:08: time 2 -  (21265.75012825, 21269.074238291)
19:58:08: shared_list 2 []
19:58:08: --> id - main thread in 17725 process in task 2 shared_list: 4313028032
19:58:08: shared_list 2 [[(21265.75012825, 21269.074238291)]]

19:58:09: time 1 - (21265.729837416, 21269.162869458)
19:58:09: shared_list 1 []
19:58:09: --> id - main thread in 17723 process in task 1 shared_list: 4340073984
19:58:09: shared_list 1 [[(21265.729837416, 21269.162869458)]]

19:58:09: final shared_list [[(21265.729837416, 21269.162869458)]]

19:58:09: --> id - main thread in 17723 process final shared_list: 4340073984

19:58:09: Total time :: 3.4389 secs

19:58:09: Finish parallel program

As you can see, each process has its own version of the shared_list global variable with its own memory address. They are completely different.

  • line 1
    • Process PID 17723 -> parent process
    • shared_list memory address 4340073984
  • line 5
    • Process PID 17725 -> child process
    • shared_list memory address 4313028032
  • line 9
    • Process PID 17725 -> child process
    • shared_list memory address 4313028032
  • line 14
    • Process PID 17723 -> parent process
    • shared_list memory address 4340073984
  • line 19
    • Process PID 17723 -> parent process
    • shared_list memory address 4340073984

I am using macOS, so the multiprocessing module uses the spawn method to create new child processes. The spawn method is not a function that is literally called like spawn(); when using this start method, the multiprocessing module internally handles the creation of a new process by launching a fresh Python interpreter. That is why our shared_list global variable has a different memory address in each process.

Now, if you focus on the time results, you can see that both processes append their results to the shared_list variable. However, each one appends them to its own copy of the variable.

Since at the end we check the value of the shared_list variable from the parent process, it only shows the values that task 1 (running in the parent process) appended to it; task 2 (in the child process) appended its values to its own copy of the variable, so the data is not shared.


Now you are probably thinking: okay, but what if we were using Linux instead of your wonderful MacBook Pro that cost you so much money, huh? So, what happens when you use the os.fork() method to create child processes?

Well, let’s find out! ✨

We can explicitly specify the multiprocessing module start method via set_start_method():

...
import multiprocessing as mp

...

def multiprocessing() -> None:

    mp.set_start_method("fork")

    start = perf_counter()
    # Run in a child process - 2
    p = mp.Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function

    # Run in the main process - 1
    cpu_bound_task_1(150000000)

    p.join()  # Blocks the thread

    logging.info(f"final shared_list {shared_list}")
    logging.info(
        f"--> id - main thread in {os.getpid()} process final shared_list: {id(shared_list)}"
    )
    end = perf_counter()

    logging.info(f"Total time :: {round(end - start, 4)} secs")

If you focus on the process and memory address logs, you will notice that now the shared_list variable has the same memory address even though it lives in different processes! 🤯

20:54:27: --> id - main thread in 19098 process shared_list: 4375988096

20:54:27: Init parallel program

20:54:31: time 1 - (24648.12229775, 24651.257626791)
20:54:31: shared_list 1 []
20:54:31: --> id - main thread in 19098 process in task 1 shared_list: 4375988096
20:54:31: shared_list 1 [[(24648.12229775, 24651.257626791)]]

20:54:31: time 2 -  (24648.122759958, 24651.44781075)
20:54:31: shared_list 2 []
20:54:31: --> id - main thread in 19099 process in task 2 shared_list: 4375988096
20:54:31: shared_list 2 [[(24648.122759958, 24651.44781075)]]

20:54:31: final shared_list [[(24648.12229775, 24651.257626791)]]

20:54:31: --> id - main thread in 19098 process final shared_list: 4375988096

20:54:31: Total time :: 3.3296 secs

20:54:31: Finish parallel program

When the parent process uses the os.fork() method to fork the Python interpreter, the child process is effectively identical to the parent process when it begins. All resources of the parent are inherited by the child process.

So when using os.fork(), the child process is created as a copy of the parent process. This includes copying the memory space of the parent process, which means that the child process initially has the same memory layout as the parent process, including the addresses of all objects. Therefore, immediately after the fork, both processes (parent and child) have the same memory content and addresses for variables, like shared_list. However, this does not mean they share the same memory space. They have separate memory spaces that just happen to be identical right after the fork.

This is done using a technique called “copy-on-write” to optimize memory usage. This means that the child process initially shares the same memory pages as the parent process, but as soon as either the parent or the child process modifies a page, a separate copy of that page is created for the modifying process. Thus, while the memory is initially shared, any changes result in separate copies, leading to independent memory spaces.

The idea behind a copy-on-write is that when a parent process creates a child process then both of these processes initially will share the same pages in memory and these shared pages will be marked as copy-on-write which means that if any of these processes will try to modify the shared pages then only a copy of these pages will be created and the modifications will be done on the copy of pages by that process and thus not affecting the other process.

Copy on Write | GeeksforGeeks

📒 Note: The default start method will change away from fork in Python 3.14. Code that requires fork should explicitly specify that via get_context() or set_start_method().

Contexts and start methods
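As a small illustration (a minimal sketch, not from the post's repository), the start method can also be pinned explicitly with get_context(), which returns a context object bound to one start method instead of changing the global default like set_start_method() does:

import multiprocessing as mp


def work() -> None:
    print("running in a child process")


if __name__ == "__main__":
    # "spawn" is available on all platforms; "fork" is POSIX-only.
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=work)
    p.start()
    p.join()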

Exchanging objects between processes

The multiprocessing module supports two types of communication channel between processes:

  • Queue()
  • Pipe()

multiprocessing.Queue internally uses multiprocessing.Pipe and is typically preferred. Queue provides a higher-level abstraction with a simpler and more intuitive API compared to Pipe.

Queue also provides some advantages over Pipe such as:

  • Queue also supports multiple producers and multiple consumers out of the box, which means that you can have several processes putting items into the queue and several processes getting items from the queue simultaneously. In contrast, Pipe is best suited for one-to-one communication between two endpoints, and managing multiple producers or consumers can become complex and error-prone.
  • Queue is thread and process safe. It internally manages locks and semaphores to ensure that operations are performed safely without data corruption or race conditions.
  • Queue includes internal buffering, which can improve performance by reducing the frequency of context switches between processes. A buffer is a temporary storage area in computer memory used to hold data while it is being transferred from one place to another.

The queue typically employs a Pipe to establish a unidirectional or bidirectional channel between processes.

However, let’s start our example using a Pipe()!

Pipe function

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others).

Here we use send() to send the data from the child process and recv() to receive the data in the parent process.

...
from multiprocessing.connection import Connection 
...
from multiprocessing import Pipe, Process


shared_list = []

...

def cpu_bound_task_2(counts: int, conn: Connection) -> None:
    """Runs a CPU-bound task and sends the results to parent."""
    time = cpu_bound_operation(counts)
    logging.info(f"time 2 -  {time}")
    logging.info(f"shared_list 2 {shared_list}")
    shared_list.append([time])
    logging.info(f"shared_list 2 {shared_list}")
    conn.send([time])
    conn.close()


def multiprocessing() -> None:

    parent_conn, child_conn = Pipe()

    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000, child_conn))
    p.start()  # Starts the process and calls the target function

    # Run in the main process - 1
    cpu_bound_task_1(150000000)

    child_time = parent_conn.recv()
    logging.info(f"child process time: {child_time}")
    shared_list.append(child_time)

    p.join()  # Blocks the thread
    ...

The parent_conn.recv() call has to be placed after cpu_bound_task_1; otherwise, the main thread of the parent process would block until the child process sends the data.

18:10:59: Init parallel program

18:11:02: time 1 - (44691.260117416, 44694.601532625)
18:11:02: shared_list 1 []
18:11:02: shared_list 1 [[(44691.260117416, 44694.601532625)]]

18:11:02: time 2 -  (44691.283312583, 44694.670279125)

18:11:02: child process time: [(44691.283312583, 44694.670279125)]

18:11:02: final shared_list [[(44691.260117416, 44694.601532625)], [(44691.283312583, 44694.670279125)]]

18:11:02: Total time :: 3.4163 secs

18:11:02: Finish parallel program

Now that the times are back in the shared_list variable, we can use our plotting function to generate the bar chart! 💃🏻

Time spent to execute two CPU-bound tasks in two parallel processes
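The plotting function itself is not shown in this post, but as a rough idea of how the (start, finish) tuples stored in shared_list could be turned into a bar chart with matplotlib (the helper name and styling below are illustrative, not the post's actual code):

import matplotlib.pyplot as plt


def plot_times(shared_list: list) -> None:
    """Draw one horizontal bar per task, from its start time to its finish time."""
    fig, ax = plt.subplots()
    origin = min(start for [(start, _)] in shared_list)  # normalize to t=0
    for i, [(start, finish)] in enumerate(shared_list):
        ax.barh(y=i, width=finish - start, left=start - origin)
    ax.set_xlabel("seconds")
    ax.set_yticks(list(range(len(shared_list))))
    ax.set_ylabel("task")
    plt.show()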

Queue class

The multiprocessing.Queue class is the preferred method for exchanging objects between processes.

We just need some minor changes to the program above in order to implement it.

...
from multiprocessing import Process, Queue

...

shared_list = []

...

def cpu_bound_task_2(counts: int, q: Queue) -> None:
    """Runs a CPU-bound task and sends the results to parent."""
    time = cpu_bound_operation(counts)
    logging.info(f"time 2 -  {time}")
    q.put([time]) # Add items to the queue


def multiprocessing() -> None:

    q = Queue()

    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000, q))
    p.start()  # Starts the process and calls the target function

    # Run in the main process - 1
    cpu_bound_task_1(150000000)

    child_time = q.get() # Remove and return an item from the queue
    logging.info(f"child process time: {child_time}")
    shared_list.append(child_time)

    p.join()  # Blocks the thread
    ...

The put() method is used to add items to the queue. It serializes the object and sends it through the pipe to the internal buffer of the queue.

The get() method is used to remove and return an item from the queue. It reads the serialized object from the pipe and deserializes it.

And the result is pretty similar.

Time spent to execute two CPU-bound tasks in two parallel processes
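As mentioned earlier, one advantage of Queue over Pipe is that several producer processes can put items into the same queue. Here is a minimal sketch (hypothetical, not from the post's repository) with four producers and the parent process consuming the results:

from multiprocessing import Process, Queue


def producer(q: Queue, value: int) -> None:
    q.put(value)  # Each producer process puts one item into the shared queue


if __name__ == "__main__":
    q = Queue()
    producers = [Process(target=producer, args=(q, i)) for i in range(4)]
    for p in producers:
        p.start()
    results = [q.get() for _ in producers]  # Drain the queue before joining
    for p in producers:
        p.join()
    print(results)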

Pool of worker processes with Pool class

The Pool class provides a convenient way to parallelize the execution of a function across multiple input values, distributing the input data across available processes.

When a Pool object is created, it initializes several worker processes. The number of worker processes can be specified with the processes parameter, or it defaults to the number of logical CPU cores returned by os.cpu_count() (or by os.process_cpu_count() since Python 3.13).

Methods like map, map_async, apply, apply_async, imap, imap_unordered, starmap and starmap_async are used to submit tasks to the pool.

The Pool object internally uses queues to send tasks from the main process to the worker processes and to send results back from the worker processes to the main process.

multiprocessing.Pool() source code. Python 3.13

Let’s see some examples!

map

We can provide a function, an iterable, and an integer specifying the chunksize to the map() method.

This method chops the iterable into a number of chunks and distributes them among the worker processes in the pool.

map blocks until all items have been processed.

In the first example we have 4 processes available and an iterable of 4 integers. Our function is executed in each process, and the values of the iterable are distributed among the processes and passed as arguments to the function.

...

def cpu_bound_task(counts: int) -> list[tuple[float, float]]:
    """Runs a CPU-bound task and returns the measured times."""
    time = cpu_bound_operation(counts)
    logging.info(f"-------- Process: {os.getpid()} --------")
    logging.info(f"time - {time}\n")
    return [time]


def multiprocessing() -> None:

    args = [50000000, 50000000, 50000000, 50000000]

    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map(cpu_bound_task, args)  # blocks until the result is ready

        logging.info(res)
    ...

Each process executes the given function with one value from the iterable.

22:17:24: Init parallel program

22:17:25: -------- Process: 44665 --------
22:17:25: time - (56633.60939425, 56634.651306916)

22:17:25: -------- Process: 44663 --------
22:17:25: time - (56633.612802333, 56634.656112583)

22:17:25: -------- Process: 44666 --------
22:17:25: time - (56633.64181075, 56634.682685541)

22:17:25: -------- Process: 44664 --------
22:17:25: time - (56633.641066125, 56634.707866125)

22:17:25: [[(56633.60939425, 56634.651306916)], [(56633.612802333, 56634.656112583)], [(56633.641066125, 56634.707866125)], [(56633.64181075, 56634.682685541)]]

22:17:26: Finish parallel program

The four tasks are effectively executed in parallel.

Time spent to execute 4 CPU-bound tasks in four worker processes in parallel

Now if we add two more values to the iterable and keep the four workers we can see that the two new values are processed as the worker processes become available.

...

def multiprocessing() -> None:

    args = [50000000, 50000000, 50000000, 50000000, 50000000, 50000000]

    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map(cpu_bound_task, args)  # blocks until the result is ready

        logging.info(res)

    ...

Time spent to execute 6 CPU-bound tasks in four worker processes in parallel

Now let’s play with the chunksize parameter.

If we submit an iterable with 24 values to be processed and pass a chunksize of 1, we can see the same behaviour as above.

...

def multiprocessing() -> None:

    n_tasks = 24
    args = [50000000] * n_tasks
    chunksize = 1

    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map(
            cpu_bound_task, args, chunksize
        )  # blocks until the result is ready

        logging.info(res)

    ...

Time spent to execute 24 CPU-bound tasks in four worker processes in parallel

Instead, we can reduce the number of tasks sent to be processed by setting the chunksize parameter. The map() method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

Below we have 24 elements in our iterable args and a chunksize of 2. This means that the 24 tasks will be sent as if they were 12 instead of 24, and each task will have 2 elements from the iterable.

...

def multiprocessing() -> None:

    n_tasks = 24
    args = [50000000] * n_tasks
    chunksize = 2

    # Run in worker processes
    ...

Time spent to execute 24 CPU-bound tasks in four worker processes in parallel

And with a chunksize of 3 we get something like this:

...

def multiprocessing() -> None:

    n_tasks = 24
    args = [50000000] * n_tasks
    chunksize = 3

    # Run in worker processes
    ...

Time spent to execute 24 CPU-bound tasks in four worker processes in parallel

map_async

map_async() returns an AsyncResult object and does not block the thread. You can use AsyncResult methods like get(), wait() or ready() to wait for the results or check if they are ready. get() and wait() accept a timeout argument.

...

def multiprocessing() -> None:

    n_tasks = 24
    args = [50000000] * n_tasks

    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map_async(cpu_bound_task, args, 1)  # does not block

        logging.info(f"res.ready(): {res.ready()}")
        logging.info("waiting...")
        res.wait()  # blocks until the result is ready
        logging.info(f"res.ready(): {res.ready()}")
        res = res.get()

    ...
20:05:53: Init parallel program

20:05:53: res.ready(): False
20:05:53: waiting...

20:05:54: -------- Process: 35723 --------
20:05:54: time - (151823.566433833, 151824.614282083)

...

20:06:00: res.ready(): True

20:06:00: Finish parallel program

Time spent to execute 24 CPU-bound tasks in four worker processes in parallel

You can find other useful methods such as apply_async and others in the multiprocessing.Pool and the multiprocessing.AsyncResult docs.

The apply() method calls the function with the arguments that you provide and blocks until the function completes, returning the result. It is similar to a direct function call, but it executes in a worker process from the pool. apply_async() returns an AsyncResult object and does not block the thread.
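As a small illustration (assuming the same cpu_bound_task function, logging setup, and Pool import used in the examples above), a hypothetical apply_async() version could look like this:

def multiprocessing_apply_async() -> None:

    with Pool(processes=4) as pool:
        # Submit each task individually; apply_async() returns an AsyncResult
        # immediately, without blocking the main thread.
        async_results = [
            pool.apply_async(cpu_bound_task, args=(50000000,)) for _ in range(4)
        ]
        # get() blocks until that particular result is ready.
        results = [r.get() for r in async_results]
        logging.info(results)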


Below I leave some resources that helped me to understand this topic better.

Feel free to contact me by any social network. Any feedback is appreciated!

Thanks for reading 💛

Other Resources

By Javier Castaño on May 21, 2024