Table of Contents
- Intro
- What is a Process
- Multiprocessing use cases
- Child and Parent processes
- Process states
- Python multiprocessing first steps
- Each process has a separate memory address space
- Exchanging objects between processes
- Pool of worker processes with Pool class
- Other Resources
Intro
This post is an introduction to multiprocessing in Python using the multiprocessing module, with some examples and visualisations to better understand the content.
The resources section at the end has some links to wonderful material that you can use to go deeper into the topic 🤓
The code from this post can be found in this repository.
What is a Process
A process is a program in execution: an instance of a computer program that is being executed. In Python, a process is an instance of the Python interpreter that executes Python code.
A program is an executable file that contains processing instructions. Programs can be application programs or system programs:
- Application programs: word processors, games, Google Chrome, etc.
- System programs: compilers, file management programs, etc.
They are stored on disk and are compiled and loaded into memory (RAM) to be executed by the processor (CPU).
There can be multiple processes running at the same time, and one program can have several processes associated with it (like Chrome). Each process running on the machine has its own memory space allocated.
Each process has at least one thread, the main thread. A thread is the basic unit of execution within a process. It is an independent flow of execution that shares the same address space as other independent flows of execution within the same process. A process can have one or more threads.
Below you can see some of the processes running on my computer and the number of threads within them. It shows multiple processes belonging to the chrome program, each one with its corresponding process id Pid.
So when you run a Python script, the operating system creates a process to execute the Python interpreter, which in turn runs your Python code.
- The operating system launches a new process.
- This process runs the Python interpreter.
- The Python interpreter reads and executes your Python script.
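We can see this from inside a script. Here is a minimal sketch using only the standard library (the printed values will differ on your machine):

import os
import threading

# PID of the process the OS created to run this interpreter
print(f"PID: {os.getpid()}")
# Every process starts with at least one thread: the main thread
print(f"Current thread: {threading.current_thread().name}")
print(f"Threads in this process: {threading.active_count()}")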
Multiprocessing use cases
The kind of tasks multiprocessing is best suited for are CPU-bound operations. CPU-bound operations are those whose completion time is primarily determined by the speed of the processor, like mathematical computations, data compression, compiling code, machine learning, etc.
In Python, when you use multiprocessing you get true parallelism, as opposed to multithreading, which can’t run threads in parallel because of Python’s Global Interpreter Lock (GIL).
To run several tasks in parallel with multiprocessing your machine needs at least two CPU cores. Today, however, it is also possible to run several tasks in parallel with a single physical core thanks to a technology called Simultaneous Multithreading (SMT), or Hyper-Threading, which gives your computer two or more logical cores per physical CPU core.
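A quick way to check how many cores Python sees on your machine (a small sketch, standard library only; note that os.process_cpu_count() only exists from Python 3.13 onwards):

import os

# Number of logical cores (physical cores x SMT threads per core)
print(f"Logical cores: {os.cpu_count()}")
# Python 3.13+: cores actually available to this process
if hasattr(os, "process_cpu_count"):
    print(f"Cores available to this process: {os.process_cpu_count()}")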
Consider two tasks with CPU-bound operations; each one takes a significant amount of time to complete. Below they are represented as filled circles. If we run them in parallel we can save a big amount of time.
Instead, if we run them synchronously the time needed to complete both is larger.
Nor will we have any benefit if we run them concurrently without true parallelism.
Child and Parent processes
A parent process is a process that has created one or more child processes. It initiates the creation of the child process by making a system call.
Depending on the platform, multiprocessing supports three ways to start a process:
- spawn: the parent process starts a fresh Python interpreter process. The child process will only inherit those resources necessary to run the process object’s Process.run() method. Default on Windows and macOS.
- fork: the parent process uses os.fork() to fork the Python interpreter. The child process is identical to the parent when it begins. Default on POSIX (Linux), except macOS.
- forkserver: a server process is spawned, and whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. Available on POSIX platforms which support passing file descriptors over Unix pipes, such as Linux.
Find detailed info on starting processes in the docs: Contexts and start methods
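As a small illustration, the start method can be selected explicitly through a context object. This sketch uses spawn only because it is available on every platform:

import multiprocessing as mp
import os

def show_pid() -> None:
    print(f"child PID: {os.getpid()}")

if __name__ == "__main__":
    # get_context() returns an object with the same API as the
    # multiprocessing module, bound to the chosen start method
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=show_pid)
    p.start()
    p.join()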
The parent process controls and manages its child processes, and communicates with them through various inter-process communication (IPC) mechanisms that the OS provides to allow processes to collaborate, share data, and synchronize their activities.
It can also monitor the status of its child processes or wait for them to terminate, as well as terminate them.
The initial process that starts when a program is executed is called the main process.
A child process is a process created by a parent process. It is a copy of the parent process but with a unique process ID (PID).
It can inherit attributes from the parent, including environment variables, open files, and certain resource limits.
When working with processes, the OS allocates necessary resources to processes (CPU time, memory, etc) and determines which processes run at any given time, ensuring fair distribution of CPU time and efficient multitasking.
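A minimal sketch of the parent/child relationship using the standard library (the PIDs will be different on every run):

import os
from multiprocessing import Process

def child() -> None:
    # os.getppid() returns the PID of the parent that created this process
    print(f"child: PID={os.getpid()}, parent PID={os.getppid()}")

if __name__ == "__main__":
    print(f"parent: PID={os.getpid()}")
    p = Process(target=child)
    p.start()
    p.join()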
Process states
During its execution, a process may move between the states shown below. When a process is running it can get interrupted by a higher-priority process, in which case it goes back to the Ready state. Also, while running it may need to wait for an Input/Output operation or an event. In that case the process changes to the Waiting state, and when the wait finishes it goes back to Ready.
If a child process has completed execution but remains in the process table because its parent has not yet read its exit status, it is called a zombie process or defunct process.
If the parent process has already terminated before the child, the child becomes an orphan process.
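From the parent we can observe part of this lifecycle with multiprocessing. A rough sketch (the exact timing of the transitions depends on the scheduler):

import time
from multiprocessing import Process

def work() -> None:
    time.sleep(1)

if __name__ == "__main__":
    p = Process(target=work)
    p.start()
    print(p.is_alive())  # True: the child is in the Ready/Running states
    p.join()             # waits for the child and reads its exit status
    print(p.is_alive())  # False: the child has terminated
    print(p.exitcode)    # 0 on a clean exit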
Python multiprocessing first steps
The multiprocessing module supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
We can spawn a new process from the main process by creating a multiprocessing.Process
object. The main process is the parent of our new child process.
But first, let’s define our CPU-bound operation, which is the same one used in the Threading in Python post.
from time import perf_counter

def cpu_bound_operation(n: int) -> tuple[float, float]:
    """CPU-bound task."""
    start = perf_counter()
    count = 0
    for i in range(n):
        count += i
    finish = perf_counter()
    return start, finish
It iterates over a range of n
numbers and adds them together. Then it returns a tuple containing the start and finish times so we can use them to plot charts later.
Then we simply call the function executing the CPU intensive operation from another function and log the returned value.
And finally, we create the child process using the Process
class from the multiprocessing
module.
import logging
from multiprocessing import Process
from concurrency.operations.cpu_bound import cpu_bound_operation

format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

def cpu_bound_task(counts: int) -> None:
    """Run a CPU-bound task and log the elapsed time."""
    time = cpu_bound_operation(counts)
    logging.info(f"time - {time}")

def multiprocessing() -> None:
    # Run in a child process; 150000000 takes approx 3.25 secs on my laptop
    p = Process(target=cpu_bound_task, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    p.join()  # Blocks the thread

if __name__ == "__main__":
    logging.info("Init program")
    multiprocessing()
    logging.info("Finish program")
In the multiprocessing() function above:
- When we create a multiprocessing.Process object we can pass a target function to the constructor, with some arguments in the args parameter.
  - The target function cpu_bound_task will be called by the internal run() method of multiprocessing.Process (inherited from the process.BaseProcess class), but not yet at this point.
  - The process keeps waiting in the Ready state until the start() method is called.
- When we call the start() method the process transitions to the Running state and a new child process is spawned with its own main thread.
  - The new process executes the run() method, which is in charge of executing the target function with the arguments we passed.
  - The process stays in the Running state unless it gets blocked.
- The join() method waits for the process to complete its execution. The calling thread (the main thread of the main process) will block until the process terminates.
Below is the result of running the example, which shows the time that the child process needed to complete.
20:45:31: Init program
20:45:35: time - (45723.689522083, 45727.402436291)
20:45:35: Finish program
Each process has a separate memory address space
If you read the post on Threading in Python you will have noticed that we saved the times taken by each thread to complete in the shared_data
variable. Since the threads in the same process share the same memory space, they can all share data. Processes cannot directly access shared data in other processes.
In the following example we can verify that two processes cannot share data directly through a simple variable because each process has its own separate memory space.
We create a variable in the main process called shared_list
to store the time results in it. Then we define two tasks, each one will append its results in the shared_list
variable.
import logging
from time import perf_counter
from concurrency.operations.cpu_bound import cpu_bound_operation
from multiprocessing import Process

format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

# Threads from the same process share memory space
# Processes have different memory space
shared_list = []

def cpu_bound_task_1(counts: int) -> None:
    """Run a CPU-bound task and append the results to shared_list."""
    time = cpu_bound_operation(counts)
    logging.info(f"time 1 - {time}")
    logging.info(f"shared_list 1 {shared_list}")
    shared_list.append([time])
    logging.info(f"shared_list 1 {shared_list}")

def cpu_bound_task_2(counts: int) -> None:
    """Run a CPU-bound task and append the results to shared_list."""
    time = cpu_bound_operation(counts)
    logging.info(f"time 2 - {time}")
    logging.info(f"shared_list 2 {shared_list}")
    shared_list.append([time])
    logging.info(f"shared_list 2 {shared_list}")
Now we call those CPU-bound tasks in the multiprocessing()
function.
The cpu_bound_task_1()
runs in the main thread from the main process, the parent process.
Then, when task 1 completes, we execute task 2 in a child process.
def multiprocessing() -> None:
    start = perf_counter()
    # Run in the main process - 1
    cpu_bound_task_1(150000000)
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    p.join()  # Blocks the thread
    logging.info(f"final shared_list {shared_list}")
    end = perf_counter()
    logging.info(f"Total time :: {round(end - start, 4)} secs")

if __name__ == "__main__":
    logging.info("Init program")
    multiprocessing()
    logging.info("Finish program")
As you may be thinking, we are not running both tasks in parallel here, since task 1 has to complete before task 2 starts executing.
Both tasks take more than 6 secs to complete:
17:50:35: Init program
...
17:50:41: Total time :: 6.7446 secs
17:50:41: Finish program
If we were to change the order of both tasks (example below) we would not be running them in parallel either, because the join()
method blocks the main thread until the child process completes.
def multiprocessing() -> None:
    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    p.join()  # Blocks the thread
    # Run in the main process - 1
    cpu_bound_task_1(150000000)
    logging.info(f"final shared_list {shared_list}")
    end = perf_counter()
    logging.info(f"Total time :: {round(end - start, 4)} secs")
It takes approximately the same amount of time to complete.
17:55:18: Init program
...
17:55:25: Total time :: 6.6648 secs
17:55:25: Finish program
To actually run both tasks in parallel we have to avoid blocking the main thread of the parent process before task 1 starts running. So we call the join()
method after task 1 has started.
def multiprocessing() -> None:
    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    # Run in the main process - 1
    cpu_bound_task_1(150000000)
    p.join()  # Blocks the thread
    logging.info(f"final shared_list {shared_list}")
    end = perf_counter()
    logging.info(f"Total time :: {round(end - start, 4)} secs")
As you can see below, the time spent for both tasks to complete has been reduced by about half:
18:08:48: Init parallel program
...
18:08:51: Total time :: 3.3857 secs
18:08:51: Finish parallel program
Hey, wait a second! Weren’t we just verifying that different processes don’t share the same memory space? 🧐
If we inspect the complete output of our program capable of executing in parallel, we notice a few interesting things.
18:08:48: Init parallel program
18:08:51: time 1 - (14708.567523791, 14711.932722375)
18:08:51: shared_list 1 []
18:08:51: shared_list 1 [[(14708.567523791, 14711.932722375)]]
18:08:51: time 2 - (14708.587407041, 14711.944044333)
18:08:51: shared_list 2 []
18:08:51: shared_list 2 [[(14708.587407041, 14711.944044333)]]
18:08:51: final shared_list [[(14708.567523791, 14711.932722375)]]
18:08:51: Total time :: 3.3857 secs
18:08:51: Finish parallel program
- We create and start the child process with task 2 before we run task 1 in the parent process. However, task 1 starts executing before task 2. This is probably because spawning the child process takes long enough that task 1 has already started by the time task 2 begins.
- Both tasks append their resulting times to the shared_list variable created in the parent process. However, only the results from task 1, the task executed in the parent process where the variable was defined, remain stored in the variable when we log it. This verifies that the child process couldn’t share its data with the parent process.
If you look carefully at the results, both tasks store their data in a variable called shared_list even though the processes do not share the same memory space. 🤯 What is happening here? 😱
Each process created by the multiprocessing module using the spawn method (which starts a fresh Python interpreter process) has its own separate memory space. This means that every variable, including global ones like shared_list, is duplicated in each process, and changes made to these variables in one process do not affect the other process.
Let’s add some code to check the process IDs and the shared_list
memory address so we understand it better.
import os
...
shared_list = []
logging.info(
    f"--> id - main thread in {os.getpid()} process shared_list: {id(shared_list)}"
)

def cpu_bound_task_1(counts: int) -> None:
    ...
    shared_list.append([time])
    logging.info(
        f"--> id - main thread in {os.getpid()} process in task 1 shared_list: {id(shared_list)}"
    )
    logging.info(f"shared_list 1 {shared_list}")

def cpu_bound_task_2(counts: int) -> None:
    ...
    shared_list.append([time])
    logging.info(
        f"--> id - main thread in {os.getpid()} process in task 2 shared_list: {id(shared_list)}"
    )
    logging.info(f"shared_list 2 {shared_list}")

def multiprocessing() -> None:
    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    # Run in the main process - 1
    cpu_bound_task_1(150000000)
    p.join()  # Blocks the thread
    logging.info(f"final shared_list {shared_list}")
    logging.info(
        f"--> id - main thread in {os.getpid()} process final shared_list: {id(shared_list)}"
    )
    end = perf_counter()
    logging.info(f"Total time :: {round(end - start, 4)} secs")
The results from the code above show the following:
19:58:05: --> FIRST id - main thread in 17723 process shared_list: 4340073984
19:58:05: Init parallel program
19:58:05: --> FIRST id - main thread in 17725 process shared_list: 4313028032
19:58:08: time 2 - (21265.75012825, 21269.074238291)
19:58:08: shared_list 2 []
19:58:08: --> id - main thread in 17725 process in task 2 shared_list: 4313028032
19:58:08: shared_list 2 [[(21265.75012825, 21269.074238291)]]
19:58:09: time 1 - (21265.729837416, 21269.162869458)
19:58:09: shared_list 1 []
19:58:09: --> id - main thread in 17723 process in task 1 shared_list: 4340073984
19:58:09: shared_list 1 [[(21265.729837416, 21269.162869458)]]
19:58:09: final shared_list [[(21265.729837416, 21269.162869458)]]
19:58:09: --> id - main thread in 17723 process final shared_list: 4340073984
19:58:09: Total time :: 3.4389 secs
19:58:09: Finish parallel program
As you can see, each process has its own version of the shared_list
global variable with its own memory address. They are completely different.
- line 1: process PID 17723 -> parent process, shared_list memory address 4340073984
- line 5: process PID 17725 -> child process, shared_list memory address 4313028032
- line 9: process PID 17725 -> child process, shared_list memory address 4313028032
- line 14: process PID 17723 -> parent process, shared_list memory address 4340073984
- line 19: process PID 17723 -> parent process, shared_list memory address 4340073984
I am using macOS, so the multiprocessing module uses the spawn method to create new child processes. The spawn method is not a function that you call explicitly, like spawn(); when this start method is used, the multiprocessing module internally handles the creation of a new process by launching a fresh Python interpreter. That is why our shared_list global variable has a different memory address in each process.
Now, if you focus on the time results, you can see that both processes append those results to the shared_list variable. However, each one appends them to its own version of the variable.
Since at the end we check the value of the shared_list variable from the parent process, it only shows the values that task 1 appended to it in the parent process, where the variable was defined. Task 2, running in the child process, appended its values to its own copy of the variable, so the data is not shared.
19:58:05: --> FIRST id - main thread in 17723 process shared_list: 4340073984
19:58:05: Init parallel program
19:58:05: --> FIRST id - main thread in 17725 process shared_list: 4313028032
19:58:08: time 2 - (21265.75012825, 21269.074238291)
19:58:08: shared_list 2 []
19:58:08: --> id - main thread in 17725 process in task 2 shared_list: 4313028032
19:58:08: shared_list 2 [[(21265.75012825, 21269.074238291)]]
19:58:09: time 1 - (21265.729837416, 21269.162869458)
19:58:09: shared_list 1 []
19:58:09: --> id - main thread in 17723 process in task 1 shared_list: 4340073984
19:58:09: shared_list 1 [[(21265.729837416, 21269.162869458)]]
19:58:09: final shared_list [[(21265.729837416, 21269.162869458)]]
19:58:09: --> id - main thread in 17723 process final shared_list: 4340073984
19:58:09: Total time :: 3.4389 secs
19:58:09: Finish parallel program
Now you are probably thinking: okay, but what if we were using Linux instead of your wonderful MacBook Pro that cost you so much money? Huh? What happens when you use the os.fork() method to create child processes?
Well, let’s find out! ✨
We can explicitly specify the multiprocessing module’s start method via set_start_method():
...
import multiprocessing as mp
...

def multiprocessing() -> None:
    mp.set_start_method("fork")
    start = perf_counter()
    # Run in a child process - 2
    p = mp.Process(target=cpu_bound_task_2, args=(150000000,))
    p.start()  # Starts the process and calls the target function
    # Run in the main process - 1
    cpu_bound_task_1(150000000)
    p.join()  # Blocks the thread
    logging.info(f"final shared_list {shared_list}")
    logging.info(
        f"--> id - main thread in {os.getpid()} process final shared_list: {id(shared_list)}"
    )
    end = perf_counter()
    logging.info(f"Total time :: {round(end - start, 4)} secs")
If you focus on the process and memory address logs, you will notice that now the shared_list
variable has the same memory address even when it is in different processes! 🤯
20:54:27: --> id - main thread in 19098 process shared_list: 4375988096
20:54:27: Init parallel program
20:54:31: time 1 - (24648.12229775, 24651.257626791)
20:54:31: shared_list 1 []
20:54:31: --> id - main thread in 19098 process in task 1 shared_list: 4375988096
20:54:31: shared_list 1 [[(24648.12229775, 24651.257626791)]]
20:54:31: time 2 - (24648.122759958, 24651.44781075)
20:54:31: shared_list 2 []
20:54:31: --> id - main thread in 19099 process in task 2 shared_list: 4375988096
20:54:31: shared_list 2 [[(24648.122759958, 24651.44781075)]]
20:54:31: final shared_list [[(24648.12229775, 24651.257626791)]]
20:54:31: --> id - main thread in 19098 process final shared_list: 4375988096
20:54:31: Total time :: 3.3296 secs
20:54:31: Finish parallel program
When the parent process uses the os.fork() method to fork the Python interpreter, the child process is effectively identical to the parent process when it begins. All resources of the parent are inherited by the child process.
So when using os.fork()
, the child process is created as a copy of the parent process. This includes copying the memory space of the parent process, which means that the child process initially has the same memory layout as the parent process, including the addresses of all objects. Therefore, immediately after the fork, both processes (parent and child) have the same memory content and addresses for variables, like shared_list
. However, this does not mean they share the same memory space. They have separate memory spaces that just happen to be identical right after the fork.
This is done using a technique called “copy-on-write” to optimize memory usage. This means that the child process initially shares the same memory pages as the parent process, but as soon as either the parent or the child process modifies a page, a separate copy of that page is created for the modifying process. Thus, while the memory is initially shared, any changes result in separate copies, leading to independent memory spaces.
The idea behind copy-on-write is that when a parent process creates a child process, both processes initially share the same pages in memory, and those shared pages are marked as copy-on-write. If either process tries to modify a shared page, only then is a copy of that page created, and the modification is applied to that process’s copy, leaving the other process unaffected.
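Here is a small sketch of that behaviour; it only runs where the fork start method is available (e.g. Linux, or macOS as used above). The child modifies its copy of a global list and the parent never sees the change:

import multiprocessing as mp

data = ["parent value"]

def mutate() -> None:
    # This write triggers copy-on-write: the page is copied for the child
    data.append("child value")
    print(f"child sees: {data}")

if __name__ == "__main__":
    ctx = mp.get_context("fork")
    p = ctx.Process(target=mutate)
    p.start()
    p.join()
    # The parent's pages were never modified, so its list is unchanged
    print(f"parent still sees: {data}")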
📒 Note: The default start method will change away from fork in Python 3.14. Code that requires fork should explicitly specify that via
get_context()
orset_start_method()
.
Exchanging objects between processes
The multiprocessing module supports two types of communication channel between processes:
Queue()
Pipe()
multiprocessing.Queue
internally uses multiprocessing.Pipe
and is typically preferred. Queue
provides a higher-level abstraction with a simpler and more intuitive API compared to Pipe
.
Queue
also provides some advantages over Pipe
such as:
- Queue supports multiple producers and multiple consumers out of the box, which means that you can have several processes putting items into the queue and several processes getting items from the queue simultaneously. In contrast, Pipe is best suited for one-to-one communication between two endpoints, and managing multiple producers or consumers can become complex and error-prone (see the small sketch after this list).
- Queue is thread and process safe. It internally manages locks and semaphores to ensure that operations are performed safely without data corruption or race conditions.
- Queue includes internal buffering, which can improve performance by reducing the frequency of context switches between processes. A buffer is a temporary storage area in computer memory used to hold data while it is being transferred from one place to another.
The queue
typically employs a Pipe
to establish a unidirectional or bidirectional channel between processes.
However, let’s start our example using a Pipe()
!
Pipe function
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). The two connection objects returned by Pipe()
represent the two ends of the pipe. Each connection object has send()
and recv()
methods (among others).
Here we use send() to send the data from the child process and recv() to receive the data in the parent process.
...
from multiprocessing.connection import Connection
...
from multiprocessing import Pipe, Process

shared_list = []
...

def cpu_bound_task_2(counts: int, conn: Connection) -> None:
    """Runs a CPU-bound task and sends the results to parent."""
    time = cpu_bound_operation(counts)
    logging.info(f"time 2 - {time}")
    logging.info(f"shared_list 2 {shared_list}")
    shared_list.append([time])
    logging.info(f"shared_list 2 {shared_list}")
    conn.send([time])
    conn.close()

def multiprocessing() -> None:
    parent_conn, child_conn = Pipe()
    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000, child_conn))
    p.start()  # Starts the process and calls the target function
    # Run in the main process - 1
    cpu_bound_task_1(150000000)
    child_time = parent_conn.recv()
    logging.info(f"child process time: {child_time}")
    shared_list.append(child_time)
    p.join()  # Blocks the thread
    ...
The parent_conn.recv()
method has to be placed after the cpu_bound_task_1
, otherwise the main thread of the parent process would block until the child process sends the data.
18:10:59: Init parallel program
18:11:02: time 1 - (44691.260117416, 44694.601532625)
18:11:02: shared_list 1 []
18:11:02: shared_list 1 [[(44691.260117416, 44694.601532625)]]
18:11:02: time 2 - (44691.283312583, 44694.670279125)
18:11:02: child process time: [(44691.283312583, 44694.670279125)]
18:11:02: final shared_list [[(44691.260117416, 44694.601532625)], [(44691.283312583, 44694.670279125)]]
18:11:02: Total time :: 3.4163 secs
18:11:02: Finish parallel program
Now that the times are back in the shared_list variable, we can use our plotting function to generate the bar chart! 💃🏻
Queue class
The multiprocessing.Queue
class is the preferred method for exchanging objects between processes.
We just need some minor changes to the program above in order to implement it.
...
from multiprocessing import Process, Queue
...
shared_list = []
...

def cpu_bound_task_2(counts: int, q: Queue) -> None:
    """Runs a CPU-bound task and sends the results to parent."""
    time = cpu_bound_operation(counts)
    logging.info(f"time 2 - {time}")
    q.put([time])  # Add items to the queue

def multiprocessing() -> None:
    q = Queue()
    start = perf_counter()
    # Run in a child process - 2
    p = Process(target=cpu_bound_task_2, args=(150000000, q))
    p.start()  # Starts the process and calls the target function
    # Run in the main process - 1
    cpu_bound_task_1(150000000)
    child_time = q.get()  # Remove and return an item from the queue
    logging.info(f"child process time: {child_time}")
    shared_list.append(child_time)
    p.join()  # Blocks the thread
    ...
The put() method is used to add items to the queue. It places the object in an internal buffer, and a feeder thread then serializes it and sends it through the underlying pipe.
The get() method is used to remove and return an item from the queue. It reads the serialized object from the pipe and deserializes it.
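Both methods block by default and accept block and timeout arguments. A small sketch, separate from the example above (the Empty exception lives in the standard queue module):

import queue  # only needed for the Empty exception
from multiprocessing import Queue

if __name__ == "__main__":
    q = Queue()
    q.put("hello")             # returns quickly; a feeder thread does the sending
    print(q.get(timeout=1))    # waits up to 1 second for an item
    try:
        q.get(timeout=0.1)     # queue is now empty: raises queue.Empty
    except queue.Empty:
        print("queue is empty")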
And the result is pretty similar.
Pool of worker processes with Pool class
The Pool
class provides a convenient way to parallelize the execution of a function across multiple input values, distributing the input data across available processes.
When a Pool
object is created, it initializes several worker processes. The number of worker processes can be specified with the processes
parameter, or it defaults to the number of logical CPU cores returned by os.cpu_count()
(or by os.process_cpu_count()
since Python 3.13).
Methods like map
, map_async
, apply
, apply_async
, imap
, imap_unordered
, starmap
and starmap_async
are used to submit tasks to the pool.
The Pool
object internally uses queues to send tasks from the main process to the worker processes and to send results back from the worker processes to the main process.
multiprocessing.Pool() source code. Python 3.13
Let’s see some examples!
map
We can provide a function, an iterable, and an integer specifying the chunksize to the map()
method.
This method chops the iterable into a number of chunks and distributes them among the worker processes in the pool.
map
blocks until all items have been processed.
In the first example we have 4 processes available and an iterable of 4 integers. Our function is executed in each process, and the values of the iterable are distributed among the processes and passed as arguments to the function.
...
def cpu_bound_task(counts: int) -> list[tuple[float, float]]:
    """Runs a CPU-bound task."""
    time = cpu_bound_operation(counts)
    logging.info(f"-------- Process: {os.getpid()} --------")
    logging.info(f"time - {time}\n")
    return [time]

def multiprocessing() -> None:
    args = [50000000, 50000000, 50000000, 50000000]
    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map(cpu_bound_task, args)  # blocks until the result is ready
        logging.info(res)
...
Each process executes the given function with one value from the iterable.
22:17:24: Init parallel program
22:17:25: -------- Process: 44665 --------
22:17:25: time - (56633.60939425, 56634.651306916)
22:17:25: -------- Process: 44663 --------
22:17:25: time - (56633.612802333, 56634.656112583)
22:17:25: -------- Process: 44666 --------
22:17:25: time - (56633.64181075, 56634.682685541)
22:17:25: -------- Process: 44664 --------
22:17:25: time - (56633.641066125, 56634.707866125)
22:17:25: [[(56633.60939425, 56634.651306916)], [(56633.612802333, 56634.656112583)], [(56633.641066125, 56634.707866125)], [(56633.64181075, 56634.682685541)]]
22:17:26: Finish parallel program
The four tasks are effectively executed in parallel.
Now if we add two more values to the iterable and keep the four workers we can see that the two new values are processed as the worker processes become available.
...
def multiprocessing() -> None:
    args = [50000000, 50000000, 50000000, 50000000, 50000000, 50000000]
    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map(cpu_bound_task, args)  # blocks until the result is ready
        logging.info(res)
...
Now let’s play with the chunksize
parameter.
If we submit an iterable with 24 values to be processed and pass a chunksize
of 1, we can see the same behaviour as above.
...
def multiprocessing() -> None:
    n_tasks = 24
    args = [50000000] * n_tasks
    chunksize = 1
    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map(
            cpu_bound_task, args, chunksize
        )  # blocks until the result is ready
        logging.info(res)
...
Instead, we can reduce the number of tasks sent to be processed by setting the chunksize
parameter. The map()
method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize
to a positive integer.
Below we have 24 elements in our iterable args
and a chunksize
of 2. This means that the 24 tasks will be sent as if they were 12 instead of 24, and each task will have 2 elements from the iterable.
...
def multiprocessing() -> None:
    n_tasks = 24
    args = [50000000] * n_tasks
    chunksize = 2
    # Run in worker processes
...
And with a chunksize of 3 we get something like this:
...
def multiprocessing() -> None:
    n_tasks = 24
    args = [50000000] * n_tasks
    chunksize = 3
    # Run in worker processes
...
map_async
map_async()
returns an AsyncResult object and does not block the thread. You can use AsyncResult
methods like get()
, wait()
or ready()
to wait for the results or check if they are ready. get()
and wait()
accept a timeout
argument.
...
def multiprocessing() -> None:
    n_tasks = 24
    args = [50000000] * n_tasks
    # Run in worker processes
    with Pool(processes=4) as pool:
        res = pool.map_async(cpu_bound_task, args, 1)  # does not block
        logging.info(f"res.ready(): {res.ready()}")
        logging.info("waiting...")
        res.wait()  # blocks until the result is ready
        logging.info(f"res.ready(): {res.ready()}")
        res = res.get()
...
20:05:53: Init parallel program
20:05:53: res.ready(): False
20:05:53: waiting...
20:05:54: -------- Process: 35723 --------
20:05:54: time - (151823.566433833, 151824.614282083)
...
20:06:00: res.ready(): True
20:06:00: Finish parallel program
You can find other useful methods such as apply_async
and others in the multiprocessing.Pool and the multiprocessing.AsyncResult docs.
The apply()
method calls the function with the arguments that you provide and blocks until the function completes, returning the result. It is similar to a direct function call, but it executes in a worker process from the pool. apply_async()
returns an AsyncResult
object and does not block the thread.
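As a short sketch of the difference, reusing the cpu_bound_task function from the map examples above inside the same multiprocessing() wrapper (the argument values are arbitrary):

...
def multiprocessing() -> None:
    with Pool(processes=2) as pool:
        # apply() blocks until this single call finishes in a worker process
        res_sync = pool.apply(cpu_bound_task, (50000000,))
        # apply_async() returns an AsyncResult immediately, without blocking
        async_res = pool.apply_async(cpu_bound_task, (50000000,))
        res_async = async_res.get()  # blocks here until the result is ready
        logging.info(f"{res_sync} {res_async}")
...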
Below I leave some resources that helped me to understand this topic better.
Feel free to contact me by any social network. Any feedback is appreciated!
Thanks for reading 💛