The previous post introduced essential approaches to creating threads and processes in Python. This post takes a more detailed look at the interfaces for concurrent and parallel programming in Python, specifically at working with a pool of threads or processes via the multiprocessing and concurrent.futures modules.
Introduction
The main goal of this post is to illustrate, through examples, the different interfaces of the multiprocessing and concurrent.futures modules. These modules can be used for concurrent as well as for parallel programming. In other words, they provide interfaces for working with threads and also interfaces for working with processes. This article explains only the different ways of working with a pool of threads or processes, although this represents just a small part of the API provided by the multiprocessing module. Maybe you are familiar with The Zen of Python, especially with its rule There should be one — and preferably only one — obvious way to do it. As we will see, the multiprocessing and concurrent.futures modules break this rule by providing several ways of doing one thing in Python.
Since both modules have support for blocking as well as for non-blocking methods, the distinction between these two types of methods must be understood before proceeding to the examples. Suppose that we have a process that calls a blocking method. The process has to wait for this method to finish before moving to the next task. However, if the process executes a non-blocking method, the process can move on to the next task before this method finishes and a result can arrive later. Notice that the behavioral difference between these two types of methods does not have any influence on whether tasks are executed in parallel or not.
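To make the distinction concrete, here is a minimal sketch contrasting the two kinds of calls. It uses two Pool methods that are introduced later in this post, and slow_square is just a hypothetical stand-in for some CPU-bound work:

import multiprocessing


def slow_square(n):
    # A hypothetical stand-in for CPU-bound work.
    return n * n


if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        # Blocking: map() returns only after all results have been computed.
        blocking_results = pool.map(slow_square, range(10))

        # Non-blocking: map_async() returns a proxy object immediately ...
        async_results = pool.map_async(slow_square, range(10))
        # ... so the main process can do other work here ...
        # ... and collect the results later (this call blocks if necessary).
        non_blocking_results = async_results.get()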
The multiprocessing Module
multiprocessing is the original Python module for parallel programming. As you can see from its documentation, it provides a very extensive API. Its multiprocessing.pool.ThreadPool and multiprocessing.pool.Pool (abbreviated as multiprocessing.Pool) classes support many ways of working with a pool of threads or processes. Since the presented demonstration programs are CPU-bound rather than I/O-bound, we will use a pool of processes created by multiprocessing.Pool. However, all of the introduced interfaces are also supported by multiprocessing.pool.ThreadPool; to use it, you only have to change the line where the pool is created. Though the multiprocessing module provides a lot of flexibility, it is harder to comprehend than the concurrent.futures module. So, let's get started.
Blocking Methods
The Pool class provides the following blocking methods:

- apply()
- map()
- starmap()

In the next parts, all of them will be explained and accompanied by examples.
The apply() Method
The apply() method from the Pool class is mentioned here only for the sake of completeness. Since I do not see any practical use for this method, no example is provided here.
If you are familiar with Python 2, you probably remember the apply() built-in function. It took three arguments: two mandatory and one optional. The first argument, called function, represented a callable object. The second one, args, was a sequence of positional arguments. The last one, keywords, represented keyword arguments. The purpose of this function was to call function with the arbitrary arguments provided in args and keywords (the last one only if given). Using apply() was deprecated in Python 2.3 and it was suggested to use the equivalent function(*args, **kwargs) instead. In Python 3, apply() is no longer available.
The apply() method from the Pool class seems to be inspired by this built-in function. It has an almost identical interface. However, in this case function is executed in a separate process. Moreover, apply() employs only one worker from the pool and blocks until the result is available. Therefore, the methods described below may be more suitable for performing work in parallel.
The map() Method
Arguably, the best-known and most widely used Pool method is map(). It is useful when we have one function that needs to be computed over different data. The following code snippet illustrates the use of this method to compute the Fibonacci numbers:
import multiprocessing
from datetime import datetime

NUMS = list(range(1, 40))


def fib(n):
    if n <= 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.map(fib, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))
Firstly, NUMS stores the inputs for the Fibonacci function, referred to as fib() in our code. Then, a pool with four processes is created via a context manager (the with statement). From this point on, map() takes care of everything. It splits the provided iterable, NUMS in our example, into a number of chunks. Then, it submits the function to be executed, fib() in our case, together with these chunks to the processes in the pool. Like apply(), map() blocks until the results are ready. Then, it returns a list of results that are printed to the screen:
$ python multiprocessing_map_fib.py
18:33:51.401014   1          1
18:33:51.401113   2          2
18:33:51.401151   3          3
18:33:51.401171   4          5
18:33:51.401188   5          8
...
18:33:51.401671  35   14930352
18:33:51.401686  36   24157817
18:33:51.401702  37   39088169
18:33:51.401718  38   63245986
18:33:51.401733  39  102334155
From the times printed in every line, we can see that the computation in all processes had finished before the printing started.
Apart from other things, the map() method and all its variants provide the chunksize parameter. This parameter, if set to an appropriate value, can improve performance. To illustrate this behaviour, the previous code needs to be slightly modified:
NUMS = list(reversed(range(1, 40)))

...  # Exactly the same as before.
As opposed to the previous example, the input numbers in NUMS are stored in reversed order. The reason behind this modification will be explained shortly. The output from the new program can be seen below:
$ time python multiprocessing_map_fib_chunksize_implicit.py
10:10:37.207600  39  102334155
10:10:37.207675  38   63245986
10:10:37.207688  37   39088169
10:10:37.207697  36   24157817
10:10:37.207706  35   14930352
...
10:10:37.207949   5          8
10:10:37.207957   4          5
10:10:37.207965   3          3
10:10:37.207973   2          2
10:10:37.207981   1          1

real    0m53.237s
user    1m12.317s
sys     0m0.016s
Notice that it took almost one minute to compute the Fibonacci numbers for our inputs. We can play a little bit with this solution and improve its speed. As mentioned earlier, map() splits the provided iterable into chunks. The approximate size of these chunks can be set through the optional chunksize parameter. Setting this parameter to a more suitable value can lead to better performance. For illustration, replace pool.map(fib, NUMS) with pool.map(fib, NUMS, chunksize=1) in our example and run the program:
$ time python multiprocessing_map_fib_chunksize_explicit.py
10:18:00.138143  39  102334155
10:18:00.138245  38   63245986
10:18:00.138271  37   39088169
10:18:00.138288  36   24157817
10:18:00.138305  35   14930352
...
10:18:00.138715   5          8
10:18:00.138729   4          5
10:18:00.138741   3          3
10:18:00.138755   2          2
10:18:00.138768   1          1

real    0m28.003s
user    1m18.471s
sys     0m0.013s
Indeed, the new version is faster. Instead of 53 seconds, our program needed only 28 seconds to finish. To understand the reason behind this improvement, we first need to figure out what value chunksize takes when it is not provided. According to the CPython implementation, chunksize is computed as follows:
if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
In our program (multiprocessing_map_fib_chunksize_implicit, the one without the specified chunksize), the length of the iterable, len(NUMS), is 39 and we have 4 processes in the pool. Therefore, the result of divmod() is (2, 7) and chunksize is set to 3. Hence, the first three numbers from NUMS are assigned as the inputs to the first process, the next three numbers are assigned to the second process, and so on. Notice that the numbers in NUMS are in descending order and we use these numbers as inputs to the Fibonacci function. Therefore, the first process gets the three most computationally demanding tasks. However, when we set chunksize to 1, the three most computationally intensive tasks are assigned to three different processes. In this particular example, this guarantees a fairer division of labour between processes and brings us a speed improvement. Consider setting chunksize explicitly if the computation time of each function call varies from input to input.
The starmap() Method
starmap() is another variant of the map() method. Essentially, starmap() behaves almost exactly like map(). The only difference is in the passing of arguments. Unlike map(), starmap() expects the elements of the passed iterable to be iterables themselves that are unpacked as arguments for the provided function. Let's illustrate this trait practically by comparing the computation of the Ackermann function with map() and starmap(). Firstly, look at the map() version:
import multiprocessing
from datetime import datetime

NUMS = [(i, j) for i in range(4) for j in range(5)]


def ackermann(x):
    m, n = x
    assert m >= 0 and n >= 0
    if m == 0:
        return n + 1
    elif m > 0 and n == 0:
        return ackermann((m - 1, 1))
    else:
        return ackermann((m - 1, ackermann((m, n - 1))))


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.map(ackermann, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {} {}'.format(datetime.now().time(), n, result))
Firstly, we need to create the list of inputs. Since the Ackermann function takes two arguments, the NUMS list stores pairs that are later used as input arguments to this function. Next, the definition of the Ackermann function is provided. Although it should have two parameters, our ackermann() function has only one, referred to as x. However, x is then unpacked into two variables, namely m and n, and these variables represent the real parameters for ackermann(). Another inconvenience, besides unpacking the input parameter, is packing the arguments into a tuple whenever ackermann() is called recursively. The rest of the code is similar to the previous example. The results after running this program are printed to the screen:
$ python multiprocessing_map_ack.py
12:10:27.295612 (0, 0) 1
12:10:27.295695 (0, 1) 2
12:10:27.295721 (0, 2) 3
12:10:27.295729 (0, 3) 4
12:10:27.295737 (0, 4) 5
...
12:10:27.295812 (3, 0) 5
12:10:27.295820 (3, 1) 13
12:10:27.295833 (3, 2) 29
12:10:27.295841 (3, 3) 61
12:10:27.295848 (3, 4) 125
To simplify the previous solution, the starmap() method can be used:
import multiprocessing
from datetime import datetime

NUMS = [(i, j) for i in range(4) for j in range(5)]


def ackermann(m, n):
    assert m >= 0 and n >= 0
    if m == 0:
        return n + 1
    elif m > 0 and n == 0:
        return ackermann(m - 1, 1)
    else:
        return ackermann(m - 1, ackermann(m, n - 1))


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.starmap(ackermann, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {} {}'.format(datetime.now().time(), n, result))
Unlike map(), starmap() unpacks the inputs from NUMS automatically before supplying them to the called function. Therefore, the implementation of ackermann() looks more natural and is easier to grasp. No unpacking, packing or extra parentheses are necessary. The results are as expected:
$ python multiprocessing_starmap_ack.py
10:28:37.381094 (0, 0) 1
10:28:37.381932 (0, 1) 2
10:28:37.381968 (0, 2) 3
10:28:37.381985 (0, 3) 4
10:28:37.381998 (0, 4) 5
...
10:28:37.382124 (3, 0) 5
10:28:37.382135 (3, 1) 13
10:28:37.382147 (3, 2) 29
10:28:37.382158 (3, 3) 61
10:28:37.382189 (3, 4) 125
The starmap() method is useful in situations where the provided function takes more than one argument and the inputs for this function are stored in an iterable as tuples. Each tuple represents the arguments for one call of the given function.
Non-Blocking Methods
Next, the non-blocking methods of the Pool class will be presented. Particularly, we will cover these methods:

- imap()
- imap_unordered()
- apply_async()
- map_async()
- starmap_async()
For non-blocking methods, it is typical to return a proxy for a result immediately after the call. With this technique, the main process does not need to wait for the other processes to finish their jobs. It can continue with its own tasks and later query the proxy for the results. This proxy can be an iterator, as in the imap() and imap_unordered() methods. However, for apply_async(), map_async() and starmap_async(), this proxy is an instance of the multiprocessing.pool.AsyncResult class. This class provides the following methods:

- get()
- wait()
- ready()
- successful()
The first method, get(), returns the result if available. Otherwise, it blocks until the result becomes ready. The method also provides an optional parameter called timeout that specifies the time interval in seconds for blocking. If this interval is exceeded and the result is not available, the multiprocessing.TimeoutError exception is raised. The wait() method blocks until the result is available. If it is used with the optional timeout argument, it blocks for at most timeout seconds. Whereas ready() returns whether the call has completed, the successful() method returns whether the call has completed without raising an exception.
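As a brief illustration (a sketch only; fib is the Fibonacci function from the earlier examples, repeated here so the snippet is self-contained), these methods could be exercised as follows:

import multiprocessing


def fib(n):
    if n <= 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        async_result = pool.apply_async(fib, (35,))

        # Poll without blocking.
        print('ready?', async_result.ready())

        try:
            # Block for at most one second.
            result = async_result.get(timeout=1)
        except multiprocessing.TimeoutError:
            # Block without a limit, then fetch the result.
            async_result.wait()
            result = async_result.get()

        print('finished without an exception?', async_result.successful())
        print('fib(35) =', result)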
The imap() Method
imap() is another method from the Pool class. The computation of the Fibonacci numbers can be reimplemented via this method in the following way:
NUMS = list(range(1, 40))


def fib(n):
    ...  # Exactly the same as before.


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.imap(fib, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))
The only difference between this implementation and the one using map() lies in employing imap() instead of map(). Basically, imap() behaves almost identically to the map() method. However, there is a small distinction. For careful readers, this small behavioural difference may be obvious from the output of this program:
$ python multiprocessing_imap_fib.py
18:41:48.714369   1          1
18:41:48.715026   2          2
18:41:48.715517   3          3
18:41:48.716009   4          5
18:41:48.716491   5          8
...
18:41:54.560798  35   14930352
18:41:57.037644  36   24157817
18:42:13.798026  37   39088169
18:42:21.370056  38   63245986
18:42:22.833434  39  102334155
From the times printed in every line, you can see that there is more than a 30-second delay between printing the first line and printing the last line. Also, if you run the program on your own, you will get the output gradually and not all at once as in the map() example. The reason behind this behaviour lies in the return value of imap(). Whereas map() returns a list of results, imap() returns an iterator over the results. Therefore, you can iterate over the results as soon as they become available. Of course, the iteration preserves the order and nothing is skipped; if the next result is not available yet, the iterator blocks. This behaviour makes imap() suitable for scenarios where you need to process the computed results further (e.g. pipelines), because there is no need to have all the results from the first step before the second step can be launched.
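As an illustration of such a pipeline, here is a minimal sketch. It reuses fib, NUMS, multiprocessing and datetime from the example above, and scale() is a hypothetical second processing step that consumes the results one by one as they arrive:

def scale(value):
    # Hypothetical second pipeline step.
    return 2 * value


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        stage_one = pool.imap(fib, NUMS)            # yields results as they become ready
        stage_two = (scale(x) for x in stage_one)   # consumes them one by one
        for n, result in zip(NUMS, stage_two):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))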
Like map(), the imap() method also provides the chunksize parameter. However, its default value does not depend on the length of the iterable and the number of processes; instead, it is implicitly set to 1. By tuning this value, you can achieve better performance, as in the previous example with map(); a one-line change, sketched below, is enough.
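For instance, in the Fibonacci program above, only the line creating the iterator needs to change so that an explicit chunk size is passed as the third argument (4 here is just an arbitrary illustrative value):

        results = pool.imap(fib, NUMS, chunksize=4)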
The imap_unordered() Method
The imap_unordered() method is a different flavour of imap(). Almost everything that has been said about imap() applies to imap_unordered() as well. The previous implementation can be rewritten as follows:
NUMS = list(range(1, 40))


def fib(n):
    ...  # Exactly the same as before.


def n_fib(n):
    return n, fib(n)


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.imap_unordered(n_fib, NUMS)
        for n, result in results:
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))
In the code, we replace the imap() method with imap_unordered(). Like imap(), imap_unordered() returns an iterator over the results. However, as the name implies, this method does not preserve the order of the results. Therefore, it is suitable for situations where the order in which the results arrive is not important. If the order is not important but you still need to know the corresponding input for each result, imap_unordered() can be used as well; however, we need to ensure that the called function returns a pair (input, output). In our case, the n_fib() function is employed for this purpose. The results are:
$ python multiprocessing_imap_unordered_fib.py
20:44:02.060931   1          1
20:44:02.061025   3          3
20:44:02.061070   2          2
20:44:02.061109   5          8
20:44:02.061146   6         13
...
20:44:09.518507  36   24157817
20:44:11.028212  35   14930352
20:44:15.393635  37   39088169
20:44:24.840203  38   63245986
20:44:35.874355  39  102334155
Although the results are not ordered with respect to the inputs, from the output it is obvious to which input the result belongs.
The apply_async() Method
The next non-blocking method is apply_async(). Basically, it is almost identical to the apply() method, with one small but very powerful distinction. Like apply(), apply_async() executes the given function in a separate process. Unlike apply(), the main process does not wait for the function to finish. It receives an AsyncResult object from apply_async(), and this object is later used to access the result. In the meantime, the main process can do whatever it needs to do. The apply_async() method is useful when we need to run one function over inputs of unknown length. Moreover, it can also be employed in scenarios where different functions need to be run over different or the same data. This last situation is shown in the following example:
import multiprocessing


def ackermann(m, n):
    ...  # Exactly the same as before.


def fib(n):
    ...  # Exactly the same as before.


if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    tasks.put((fib, 8))
    tasks.put((ackermann, 2, 4))
    tasks.put((fib, 18))
    tasks.put((ackermann, 3, 1))
    tasks.put((fib, 28))
    tasks.put((ackermann, 3, 4))
    tasks.put((fib, 34))
    tasks.put(None)

    results = []
    with multiprocessing.Pool(4) as pool:
        while True:
            task = tasks.get()
            if task is None:
                break
            func, *args = task
            result = pool.apply_async(func, args)
            results.append((func.__name__, args, result))

        for func_name, args, result in results:
            result = result.get()
            args = ', '.join(str(arg) for arg in args)
            print('{}({}) = {}'.format(func_name, args, result))
Firstly, two functions, namely ackermann() and fib(), are implemented. Subsequently, the tasks queue is created and several tasks are added to it. Some of these tasks require the computation of the Fibonacci function, while others require the computation of the Ackermann function. Additionally, a sentinel value, None, is added as the last item in the queue. Although multiprocessing.Queue provides the empty() method to test whether the queue is empty, due to multiprocessing semantics it is not reliable. Therefore, the presence of None is used as the termination condition later in the program. Then, a pool of 4 worker processes is created and all tasks from the queue are assigned to workers via the apply_async() method. The result returned from this method is an instance of AsyncResult, and it is appended to the results list together with the function name and arguments. Finally, the output is printed. To obtain the result from AsyncResult, the get() method is used. Notice that this method blocks until the result is available. The results from this program are:
$ python multiprocessing_apply_async.py
fib(8) = 34
ackermann(2, 4) = 11
fib(18) = 4181
ackermann(3, 1) = 13
fib(28) = 514229
ackermann(3, 4) = 125
fib(34) = 9227465
The map_async() Method
The map_async() method is another derivative of map(). The difference between map() and map_async() lies in their return values. Whereas map() returns a list of results after all the computations are completed, map_async() returns an instance of AsyncResult immediately. Therefore, map_async() comes in handy whenever you need to compute one function over different data in separate processes and you do not want to block the main process during these computations. Handling the AsyncResult object can be done in a similar way as in the previous example illustrating apply_async(); a short sketch follows.
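For example, a minimal sketch (assuming the fib function and the NUMS list from the earlier Fibonacci examples) might look like this:

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        async_result = pool.map_async(fib, NUMS)
        # The main process is free to do other work here.
        results = async_result.get()   # blocks until the whole list of results is ready
        for n, result in zip(NUMS, results):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))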
The starmap_async() Method
The behaviour of starmap_async() is a combination of starmap() and map_async(). Like starmap(), starmap_async() expects the provided iterable to contain iterables that are unpacked and passed to the given function as arguments. Like map_async(), it returns an AsyncResult object and does not block. It can be used in situations similar to those for starmap() when we want to avoid blocking, as sketched below.
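Again, a minimal sketch, assuming the ackermann function and the NUMS list of pairs from the starmap() example:

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        async_result = pool.starmap_async(ackermann, NUMS)
        # ... other work in the main process ...
        results = async_result.get()   # blocks until all results are ready
        for n, result in zip(NUMS, results):
            print('{} {} {}'.format(datetime.now().time(), n, result))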
Now that we have covered the multiprocessing module, let's move on to the concurrent.futures module.
The concurrent.futures Module
The concurrent.futures module has been part of Python since version 3.2. Compared to multiprocessing, it offers a much simpler interface with fewer options. Its concurrent.futures.ThreadPoolExecutor and concurrent.futures.ProcessPoolExecutor classes provide a minimalistic interface for working with a pool of threads or processes. Since both classes inherit from the abstract Executor class, they implement the same interface. Whereas the ThreadPoolExecutor class can be seen as a lightweight variant of multiprocessing.pool.ThreadPool, ProcessPoolExecutor can be used as a substitute for multiprocessing.Pool. As in the previous text, the presented examples are CPU-bound, so we will use ProcessPoolExecutor. For working with a pool of threads, you only need to modify the line responsible for creating the pool.
Non-Blocking Methods
As opposed to the rich API provided by the pools created via the multiprocessing module, the Executor class and its two offspring offer only two methods:

- map()
- submit()
Both of them are non-blocking. The map() method returns an iterator over the results, while submit() returns an instance of the concurrent.futures.Future class, which is the equivalent of AsyncResult from the multiprocessing.pool module. This class offers several methods. Some of them are:

- running()
- done()
- result()
- add_done_callback()
The first method, running(), returns True if the call is currently being executed and cannot be cancelled. On the other hand, the done() method returns True if the call was successfully cancelled or its execution has finished. The result() method returns the value returned by the call, if available. Otherwise, it blocks until the result becomes ready. Like get() from the AsyncResult class, this method also provides the timeout parameter. If it is specified, the method waits up to timeout seconds. If the call does not finish in the specified time interval, the concurrent.futures.TimeoutError exception is raised. The add_done_callback() method is special to concurrent.futures.Future because it does not have an equivalent in the AsyncResult class. This method binds a callable to the Future instance. When the Future is cancelled or its run has completed, this callable is called with the Future as its only argument. This functionality can be useful in several situations, such as logging; a sketch is shown below.
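As a sketch of the logging use case (log_result is a hypothetical callback; fib is the Fibonacci function used throughout this post, repeated here so the snippet is self-contained):

import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)


def fib(n):
    if n <= 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def log_result(future):
    # Called with the finished Future as its only argument.
    logging.info('call finished with result %s', future.result())


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        for n in (10, 20, 30):
            future = pool.submit(fib, n)
            future.add_done_callback(log_result)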
The map() Method
The map() method from concurrent.futures.ProcessPoolExecutor behaves essentially the same as imap() from multiprocessing.Pool. The imap() implementation of the computation of the Fibonacci numbers can be rewritten with ProcessPoolExecutor and its map() method as follows:
import concurrent.futures
from datetime import datetime

NUMS = list(range(1, 40))


def fib(n):
    ...  # Exactly the same as before.


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        results = pool.map(fib, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))
The NUMS and fib() definitions are the same as in the imap() version. The creation of a process pool is slightly different. Instead of multiprocessing.Pool, ProcessPoolExecutor from concurrent.futures is used to create a pool of at most max_workers processes. If max_workers is not provided or is set to None, it defaults to the number of processors on the machine. The rest of the code is familiar from the imap() implementation. The only difference is employing map() instead of imap(). Like imap(), map() returns an iterator over the results. Therefore, it does not block the execution of the main process. Of course, the output is the same as well. Indeed, it is also printed gradually as the results become available:
$ python concurrent_futures_map.py
16:19:15.247012   1          1
16:19:15.247125   2          2
16:19:15.247734   3          3
16:19:15.247995   4          5
16:19:15.248055   5          8
...
16:19:24.537348  35   14930352
16:19:24.537410  36   24157817
16:19:31.161037  37   39088169
16:19:36.433663  38   63245986
16:19:49.447529  39  102334155
By the way, everything that was previously said for imap() from multiprocessing.Pool is also valid for the map() method from concurrent.futures.ProcessPoolExecutor.
For the sake of precision, a small detour is needed. Although the interface of ThreadPoolExecutor is the same as that of ProcessPoolExecutor, the max_workers parameter of its constructor behaves differently. In versions older than Python 3.5, this argument must be provided. In newer versions, it can be omitted or explicitly set to None, in which case the number of worker threads is set to the number of processors on the machine multiplied by 5. (Note that since Python 3.8 the default formula has changed again, to min(32, os.cpu_count() + 4).)
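For illustration, a small sketch of this default (the exact formula depends on the Python version, as noted above):

import concurrent.futures
import os

# In Python 3.5-3.7 these two calls create pools with the same number of
# worker threads (os.cpu_count() * 5); since 3.8 the default formula is
# min(32, os.cpu_count() + 4).
pool_a = concurrent.futures.ThreadPoolExecutor()
pool_b = concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count() * 5)

pool_a.shutdown()
pool_b.shutdown()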
The submit() Method
The next method from concurrent.futures.ProcessPoolExecutor is submit(), which is the equivalent of the apply_async() method from the multiprocessing.Pool class. The apply_async() solution shown earlier can be reimplemented with the submit() method as follows:
import concurrent.futures
import multiprocessing


def ackermann(m, n):
    ...  # Exactly the same as before.


def fib(n):
    ...  # Exactly the same as before.


if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    tasks.put((fib, 8))
    tasks.put((ackermann, 2, 4))
    tasks.put((fib, 18))
    tasks.put((ackermann, 3, 1))
    tasks.put((fib, 28))
    tasks.put((ackermann, 3, 4))
    tasks.put((fib, 34))
    tasks.put(None)

    results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        while True:
            task = tasks.get()
            if task is None:
                break
            func, *args = task
            result = pool.submit(func, *args)
            results.append((func.__name__, args, result))

        for func_name, args, result in results:
            result = result.result()
            args = ', '.join(str(arg) for arg in args)
            print('{}({}) = {}'.format(func_name, args, result))
The implementation of the ackermann() and fib() functions remains unchanged. Also, the creation and initialization of the tasks queue are exactly the same as in the apply_async() solution. For creating the pool of processes, ProcessPoolExecutor is employed in the same way as in the preceding example with map(). The rest of the code is the same as in the apply_async() version, with three small differences. Firstly, the submit() method replaces the apply_async() method. Secondly, unlike apply_async(), which takes an iterable as its second argument, submit() needs this iterable to be unpacked. And lastly, submit() returns an instance of concurrent.futures.Future instead of a multiprocessing.pool.AsyncResult object. Like AsyncResult, a Future is a reference to a result that does not have to be computed yet. The output is as follows:
$ python concurrent_futures_submit.py
fib(8) = 34
ackermann(2, 4) = 11
fib(18) = 4181
ackermann(3, 1) = 13
fib(28) = 514229
ackermann(3, 4) = 125
fib(34) = 9227465
Otherwise, everything that was said for apply_async() is also valid for this method.
Source Code
The complete source code of all the provided examples is available on GitHub. All code was written and tested in Python 3.4 and should also work in newer Python versions.