Concurrent and Parallel Programming in Python (Part 2)

The previous post introduced essential approaches to creating threads and processes in Python. In this post, a more detailed focus on interfaces to concurrent and parallel programming in Python will be described, specifically working with a pool of threads or processes via the multiprocessing and concurrent.futures modules.

Introduction

The main goal of this post is to illustrate on examples the different interfaces of the multiprocessing and concurrent.futures modules. These modules can be used both for concurrent programming as well as for parallel programming. In other words, they provide interfaces for working with threads and also interfaces for working with processes. In this article, only different ways of working with a pool of threads or processes will be explained although this represents just a small part of the API provided by the multiprocessing module. Maybe you are familiar with The Zen of Python, especially with its rule There should be one — and preferably only one — obvious way to do it. As we will see, the multiprocessing and concurrent.futures modules break this rule by providing different ways for doing one thing in Python.

Since both modules have support for blocking as well as for non-blocking methods, the distinction between these two types of methods must be understood before proceeding to the examples. Suppose that we have a process that calls a blocking method. The process has to wait for this method to finish before moving to the next task. However, if the process executes a non-blocking method, the process can move on to the next task before this method finishes and a result can arrive later. Notice that the behavioral difference between these two types of methods does not have any influence on whether tasks are executed in parallel or not.

The multiprocessing Module

multiprocessing is the original Python module for parallel programming. As you can see from its documentation, it provides a very extensive API. Its multiprocessing.pool.ThreadPool and multiprocessing.pool.Pool (abbreviated as multiprocessing.Pool) classes support many ways of working with a pool of threads or processes. Since the presented demonstration programs are CPU-bound rather than I/O-bound, we will use a pool of processes created by multiprocessing.Pool. However, all introduced interfaces are also supported by multiprocessing.pool.ThreadPool. To use it, you only have to change the line where the pool is created. Though the multiprocessing module provides a lot of flexibility, it is harder to comprehend than the concurrent.futures module. So, lets get started.

Blocking Methods

The Pool class provides the following blocking methods:

In the next parts, all of them will be explained and accompanied with the examples.

The apply() Method

The apply() method from the Pool class is mentioned here only for the sake of completeness. Since I do not see any practical usage of this method, an example is not provided here.

If you are familiar with Python 2, you probably remember the apply() built-in function. It took three arguments: two mandatory and one optional. The first argument, called function, represented a callable object. The second one, args, was a sequence of positional arguments. The last one, keywords, represented keyword arguments. The purpose of this function lied in calling function with arbitrary arguments provided in args and keywords (last one only if given). Using apply() was labeled as deprecated in Python 2.3 and it was suggested to use function(*args, **kwargs) instead, as an equivalent. In Python 3, apply() is no longer available.

The apply() method from the Pool class seems to be inspired by this built-in function. It has almost identical interface. However, in this case function is executed in a separate process. Moreover, apply() employs only one worker from the pool and blocks until the result is available. Therefore, undermentioned methods may be more suitable for performing work in parallel.

The map() Method

Arguably, the most known and used Pool method is map(). It is useful when we have one function that needs to be computed over different data. The following code snippet illustrates the use of this method to compute the Fibonacci numbers:

NUMS = list(range(1, 40))

def fib(n):
    if n <= 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.map(fib, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))

Firstly, NUMS stores the inputs for the Fibonacci function, referred as fib() in our code. Then, a pool with four processes is created via a context manager (the with statement). From this point on, map() takes care of everything. It splits the provided iterable, NUMS in our example, into a number of chunks. Then, it submits a function to be executed, fib() in our case, and these chunks to every process in the pool. Like apply(), map() blocks until the results are ready. Then, it returns a list of results that are printed to the screen:

$ python multiprocessing_map_fib.py
18:33:51.401014   1          1
18:33:51.401113   2          2
18:33:51.401151   3          3
18:33:51.401171   4          5
18:33:51.401188   5          8
...
18:33:51.401671  35   14930352
18:33:51.401686  36   24157817
18:33:51.401702  37   39088169
18:33:51.401718  38   63245986
18:33:51.401733  39  102334155

From the times printed in every line, we can see that the computation in all processes had finished before the printing started.

Apart from other things, the map() method and all its variants provide the chunksize parameter. This parameter, if set to an appropriate value, can improve performance. To illustrate this behaviour, the previous code needs to be slightly modified:

NUMS = list(reversed(range(1, 40)))

... # Exactly the same as before.

As opposed to the previous example, the input numbers in NUMS are stored in reversed order. The reason behind this modification will be explained shortly. The output from the new program can be seen below:

$ time python multiprocessing_map_fib_chunksize_implicit.py
10:10:37.207600  39  102334155
10:10:37.207675  38   63245986
10:10:37.207688  37   39088169
10:10:37.207697  36   24157817
10:10:37.207706  35   14930352
...
10:10:37.207949   5          8
10:10:37.207957   4          5
10:10:37.207965   3          3
10:10:37.207973   2          2
10:10:37.207981   1          1

real    0m53.237s
user    1m12.317s
sys     0m0.016s

Notice that it took almost one minute to compute the Fibonacci numbers for our inputs. We can play a little bit with this solution and improve its speed. As mentioned earlier, map() splits the provided iterable into chunks. The approximate size of these chunks can be set through the optional chunksize parameter. Setting this parameter to a more suitable value can lead to better performance. For illustration, substitute pool.map(fib, NUMS) with pool.map(fib, NUMS, chunksize=1) in our example. And run the program:

$ time python multiprocessing_map_fib_chunksize_explicit.py
10:18:00.138143  39  102334155
10:18:00.138245  38   63245986
10:18:00.138271  37   39088169
10:18:00.138288  36   24157817
10:18:00.138305  35   14930352
...
10:18:00.138715   5          8
10:18:00.138729   4          5
10:18:00.138741   3          3
10:18:00.138755   2          2
10:18:00.138768   1          1

real    0m28.003s
user    1m18.471s
sys     0m0.013s

Indeed, the new version is faster. Instead of 53 seconds, our program needed only 28 seconds to finish. To understand the reason behind this improvement, we need to first figure out to what value is chunksize set if not provided. According to the CPython implementation, chunksize is computed as follows:

if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1

In our program (multiprocessing_map_fib_chunksize_implicit, the one without the specified chunksize), the length of the iterable, len(NUMS), is 39 and we have 4 processes in the pool. Therefore, the result of divmod() is (2, 7) and chunksize is set to 3. Hence, the first three numbers from NUMS are assigned as the inputs to the first process. The next three numbers are assigned to the second process and so on. Notice that the numbers in NUMS are in descending order and we use these numbers as inputs to the Fibonacci function. Therefore, we can see that the first process gets the three most computationally challenging tasks to do. However, when we set chunksize to 1, the three most computationally intensive tasks are assigned to three different processes. In this particular example, this guarantees more fair division of labor between processes and brings us a speed improvement. Consider setting chunksize explicitly if the computation time of each function call varies from input to input.

The starmap() Method

It is another variant of the map() method. Essentially, starmap() behaves almost exactly as map(). The only difference is in passing of arguments. Unlike map(), for starmap() it is expected that the elements of the passed iterable are iterables themselves that need to be unpacked as arguments for the provided function. Lets illustrate this trait practically by comparing the computation of the Ackermann function with map() and starmap(). Firstly, look at the map() version:

NUMS = [(i, j) for i in range(4) for j in range(5)]

def ackermann(x):
    m, n = x
    assert m >= 0 and n >= 0
    if m == 0:
        return n + 1
    elif m > 0 and n == 0:
        return ackermann((m - 1, 1))
    else:
        return ackermann((m - 1, ackermann((m, n - 1))))

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.map(ackermann, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {} {}'.format(datetime.now().time(), n, result))

Firstly, we need to create the list of inputs. Since the Ackermann function takes two arguments, the NUMS list stores pairs that are later used as input arguments to this function. Next, the definition of the Ackermann function is provided. Although it should have two parameters, our ackermann() function has only one, referred as x. However, x is then unpacked into two variables, namely m and n, and these variables represent the real parameters for ackermann(). Another inconvenience, besides unpacking the input parameter, is packing arguments into the tuple when ackermann() is recursively called. The rest of the code is similar to the previous example. The results after running this program are printed to the screen:

$ python multiprocessing_map_ack.py
12:10:27.295612 (0, 0) 1
12:10:27.295695 (0, 1) 2
12:10:27.295721 (0, 2) 3
12:10:27.295729 (0, 3) 4
12:10:27.295737 (0, 4) 5
...
12:10:27.295812 (3, 0) 5
12:10:27.295820 (3, 1) 13
12:10:27.295833 (3, 2) 29
12:10:27.295841 (3, 3) 61
12:10:27.295848 (3, 4) 125

To simplify the previous solution the starmap() method can be used:

NUMS = [(i, j) for i in range(4) for j in range(5)]

def ackermann(m, n):
    assert m >= 0 and n >= 0
    if m == 0:
        return n + 1
    elif m > 0 and n == 0:
        return ackermann(m - 1, 1)
    else:
        return ackermann(m - 1, ackermann(m, n - 1))

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.starmap(ackermann, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {} {}'.format(datetime.now().time(), n, result))

Unlike map(), starmap() unpacks the inputs from NUMS automatically before supplying them to the called function. Therefore, the implementation of ackermann() looks more natural and it is easier to grasp. No unpacking, packing and extra parentheses are necessary. The results are as expected:

$ python multiprocessing_starmap_ack.py
10:28:37.381094 (0, 0) 1
10:28:37.381932 (0, 1) 2
10:28:37.381968 (0, 2) 3
10:28:37.381985 (0, 3) 4
10:28:37.381998 (0, 4) 5
...
10:28:37.382124 (3, 0) 5
10:28:37.382135 (3, 1) 13
10:28:37.382147 (3, 2) 29
10:28:37.382158 (3, 3) 61
10:28:37.382189 (3, 4) 125

The starmap() method is useful in situations where the provided function takes more than one argument and inputs for this function are stored in an iterable as tuples. Each tuple represents arguments for the given function.

Non-Blocking Methods

Next, the non-blocking methods of the Pool class will be presented. Particularly, we will cover these methods:

For non-blocking methods it is typical to return a proxy for a result immediately after calling. By this technique, the main process does not need to wait for other processes to finish their jobs. It can continue with its tasks and later it can inquire the proxy about the results. This proxy can be an iterator like in the imap() or imap_unordered() methods. However, for apply_async(), map_async() and starmap_async() this proxy is an instance of the multiprocessing.pool.AsyncResult class. This class provides the following methods:

The first method, get(), returns the result if available. Otherwise, it blocks until the result becomes ready. The method also provides an optional parameter called timeout that specifies the time interval in seconds for blocking. If this interval is exceeded and the result is not available, the multiprocessing.TimeoutError exception is raised. The wait() method blocks until the result is available. If it is used with an optional timeout argument, it blocks for at most timeout seconds. Whereas ready() returns whether the call has completed, the successful() method returns whether the call has completed without raising an exception.

The imap() Method

imap() is another method from the Pool class. The computation of the Fibonacci numbers can be reimplemented via this method in the following way:

NUMS = list(range(1, 40))

def fib(n):
    ... # Exactly the same as before.

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.imap(fib, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))

The only difference between this implementation and the one using map() lies in employing imap() instead of map(). Basically, imap() behaves almost identically to the map() method. However, there is a small distinction. For careful readers, this small behavioural difference may be obvious from the output of this program:

$ python multiprocessing_imap_fib.py
18:41:48.714369   1          1
18:41:48.715026   2          2
18:41:48.715517   3          3
18:41:48.716009   4          5
18:41:48.716491   5          8
...
18:41:54.560798  35   14930352
18:41:57.037644  36   24157817
18:42:13.798026  37   39088169
18:42:21.370056  38   63245986
18:42:22.833434  39  102334155

From the times printed in every line, you can see that there is more than 30 seconds delay between printing the first line and printing the last line. Also, if you run the program on your own, you will be getting the output gradually and not all at once as in the map() example. The reason behind this behaviour lies in the return value of imap(). Whereas map() returns a list of results, imap() returns an iterator to results. Therefore, you can iterate over the results immediately after they become available. Of course, the iteration preserves the order and nothing is skipped. If the next result is not available, then the iterator blocks. This behaviour makes imap() suitable for scenarios where you need to process computed results in some way after the computation (e.g. pipelines). There is no need to have all results from the first step before the second step may be launched.

Like map(), the imap() method also provides the chunksize parameter. However, the default value for this parameter does not depend on the length of the iterable and the number of processes. Instead, it is implicitly set to 1. By tuning this value, you can achieve better performance like in the previous example with map().

The imap_unordered() Method

The imap_unordered() method is a different flavour of imap(). Almost everything that has been said about imap() applies to imap_unordered() as well. The previous implementation can be rewritten as follows:

NUMS = list(range(1, 40))

def fib(n):
    ... # Exactly the same as before.

def n_fib(n):
    return n, fib(n)

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = pool.imap_unordered(n_fib, NUMS)
        for n, result in results:
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))

In the code, we substitute the imap() method for imap_unordered(). Like imap(), imap_unordered() returns an iterator to the results. However, as the name implies, this method does not preserve the order of the results. Therefore, it is suitable for situations where the order in which results come is not important. Of course, if the order is not important but you need to know the corresponding input for each result, imap_unordered() can be used. However, we need to ensure that the called function returns a pair (input, output). In our case, the n_fib() function is employed for this purpose. The results are:

$ python multiprocessing_imap_unordered_fib.py
20:44:02.060931   1          1
20:44:02.061025   3          3
20:44:02.061070   2          2
20:44:02.061109   5          8
20:44:02.061146   6         13
...
20:44:09.518507  36   24157817
20:44:11.028212  35   14930352
20:44:15.393635  37   39088169
20:44:24.840203  38   63245986
20:44:35.874355  39  102334155

Although the results are not ordered with respect to the inputs, from the output it is obvious to which input the result belongs.

The apply_async() Method

The next non-blocking method is apply_async(). Basically, it is almost identical to the apply() method with only one little but very powerful distinction. Like apply(), apply_async() assigns and executes the given function in a separate process. Unlike apply(), the main process does not wait for the function to finish. It receives the AsyncResult object from apply_async() and this object is later used to access the result. In the meantime, the main process can do whatever it needs to do. The apply_async() method is useful in conditions where we need to run one function over inputs of unknown length. Moreover, it can be also employed in scenarios where there is a need to run different functions over different or same data. This last situation is shown in the following example:

def ackermann(m, n):
    ... # Exactly the same as before.

def fib(n):
    ... # Exactly the same as before.

if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    tasks.put((fib, 8))
    tasks.put((ackermann, 2, 4))
    tasks.put((fib, 18))
    tasks.put((ackermann, 3, 1))
    tasks.put((fib, 28))
    tasks.put((ackermann, 3, 4))
    tasks.put((fib, 34))
    tasks.put(None)

    results = []
    with multiprocessing.Pool(4) as pool:
        while True:
            task = tasks.get()
            if task is None:
                break
            func, *args = task
            result = pool.apply_async(func, args)
            results.append((func.__name__, args, result))

        for func_name, args, result in results:
            result = result.get()
            args = ', '.join(str(arg) for arg in args)
            print('{}({}) = {}'.format(func_name, args, result))

Firstly, two functions, namely ackermann() and fib(), are implemented. Subsequently, the queue tasks is created and several tasks are added to this queue. Some of these tasks require the computation of the Fibonacci function, while others require the computation of the Ackermann function. Additionally, a sentinel value, None, is added as the last item to the queue. Although multiprocessing.Queue provides the empty() method to test if the queue is empty, due to multiprocessing semantics, it is not reliable. Therefore, the presence of None is used as a condition of termination later in the program. Then, a pool of 4 worker processes is created and all tasks from the queue are assigned to separate workers via the apply_async() method. The result returned from this method is an instance of AsyncResult and it is appended to the results list together with the function name and arguments. Finally, the output is printed. To obtain the result from AsyncResult the get() method is used. Notice that this method blocks until the result is available. The results from this program are:

$ python multiprocessing_apply_async.py
fib(8) = 34
ackermann(2, 4) = 11
fib(18) = 4181
ackermann(3, 1) = 13
fib(28) = 514229
ackermann(3, 4) = 125
fib(34) = 9227465
The map_async() Method

The map_async() method is another derivate of map(). The difference between map() and map_async() lies in their return values. Whereas map() returns a list of results after the computations are completed, map_async() returns an instance of AsyncResult immediately. Therefore, map_async() comes in handy whenever you need to compute one function over different data in separate processes and you do not want to block the main process during these computations. Handling the AsyncResult object can be done in a similar way like in the previous example illustrating apply_async().

The starmap_async() Method

The behaviour of starmap_async() is a combination of starmap() and map_async(). Like starmap(), starmap_async() expects that the provided iterable contains iterables that need to be unpacked and passed to the given function as arguments. Like map_async(), it returns an AsyncResult object and does not block. It can be used in similar situations like starmap() when we want to avoid blocking.

Now that we have covered the multiprocessing module, lets move to the concurrent.futures module.

The concurrent.futures Module

The concurrent.futures module is a new addition to Python since the 3.2 version. Unlike multiprocessing, it offers a much simpler interface with fewer options. Its concurrent.futures.ThreadPoolExecutor and concurrent.futures.ProcessPoolExecutor classes provide a minimalistic interface for working with a pool of threads or processes. Since both classes inherit from the abstract Executor class, they implement the same interface. Whereas the ThreadPoolExecutor class can be seen as the lightweight variant of multiprocessing.pool.ThreadPool, ProcessPoolExecutor can be used as a substitute for multiprocessing.Pool. Like in the previous text, the presented examples are CPU-bound, so we will use ProcessPoolExecutor. For working with a pool of threads, you will only need to modify the line responsible for creating the pool.

Non-Blocking Methods

As opposed to the rich API provided by the pools created via the multiprocessing module, the Executor class and its two offsprings offer only two methods:

Both of them are non-blocking. The map() method returns an iterator to the results, while submit() returns an instance of the concurrent.futures.Future class which is an equivalent to AsyncResult from the multiprocessing.pool module. This class offers several methods. Some of them are:

The first method, running(), returns True if the call is currently executed and cannot be cancelled. On the other hand, the done() method returns True if the call was successfully cancelled or its execution has finished. The result() method returns the value returned by the call if available. Otherwise, it blocks until the result becomes ready. Like get() from the AsyncResult class, this method also provides the timeout parameter. If it is specified, then the method waits up to timeout seconds. If the call does not finish in the specified time interval, the concurrent.futures.TimeoutError exception is raised. The add_done_callback() method is special for concurrent.futures.Future because it does not have an equivalent in the AsyncResult class. This method binds a callable to the Future instance. When Future is cancelled or its run has completed, then this callable will be called with Future as its only argument. This functionality can be useful in several situations, like logging.

The map() Method

The map() method from concurrent.futures.ProcessPoolExecutor has the exact behaviour as imap() from multiprocessing.Pool. The imap() implementation of computation of the Fibonacci numbers can be rewritten with ProcessPoolExecutor and its map() method as follows:

NUMS = list(range(1, 40))

def fib(n):
    ... # Exactly the same as before.

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        results = pool.map(fib, NUMS)
        for n, result in zip(NUMS, results):
            print('{} {:>3} {:>10}'.format(datetime.now().time(), n, result))

The NUMS and fib() definitions are the same as in the imap() version. The creating of a process pool is slightly different. Instead of multiprocessing.Pool, ProcessPoolExecutor from concurrent.futures is used to create a pool of at most max_workers processes. If max_workers is not provided or set to None, it will default to the number of processors on the machine. The rest of the code is familiar from the imap() implementation. The only difference is employing map() instead of imap(). Like imap(), map() returns an iterator to the results. Therefore, it does not block the execution of the main process. Of course, the output is the same as well. Indeed, it is also printed gradually as the results become available:

$ python concurrent_futures_map.py
16:19:15.247012   1          1
16:19:15.247125   2          2
16:19:15.247734   3          3
16:19:15.247995   4          5
16:19:15.248055   5          8
...
16:19:24.537348  35   14930352
16:19:24.537410  36   24157817
16:19:31.161037  37   39088169
16:19:36.433663  38   63245986
16:19:49.447529  39  102334155

By the way, everything that was previously said for imap() from multiprocessing.Pool is also valid for the map() method from concurrent.futures.ProcessPoolExecutor.

For the sake of precision, a small detour is needed. Although the interface for ThreadPoolExecutor is the same as for ProcessPoolExecutor, the max_workers parameter of the map() method behaves differently. For versions older than Python 3.5, this argument must be provided. However, for newer versions this argument does not need to be given or can be explicitly set to None. In these circumstances, the number of worker threads will be set to the number of processors on the machine multiplied by 5.

The submit() Method

The next method from concurrent.futures.ProcessPoolExecutor is submit(), which is an equivalent to the apply_async() method from the multiprocessing.Pool class. The apply_async() solution of computation of the Fibonacci numbers can be reimplemented with the submit() method as follows:

def ackermann(m, n):
    ... # Exactly the same as before.

def fib(n):
    ... # Exactly the same as before.

if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    tasks.put((fib, 8))
    tasks.put((ackermann, 2, 4))
    tasks.put((fib, 18))
    tasks.put((ackermann, 3, 1))
    tasks.put((fib, 28))
    tasks.put((ackermann, 3, 4))
    tasks.put((fib, 34))
    tasks.put(None)

    results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        while True:
            task = tasks.get()
            if task is None:
                break
            func, *args = task
            result = pool.submit(func, *args)
            results.append((func.__name__, args, result))

        for func_name, args, result in results:
            result = result.result()
            args = ', '.join(str(arg) for arg in args)
            print('{}({}) = {}'.format(func_name, args, result))

The implementation of the ackermann() and fib() functions remains unchanged. Also, the creating and initializing of the tasks queue are exactly the same as in the apply_async() solution. For creating a pool of processes, ProcessPoolExecutor is employed in a same way as in the preceding example with map(). The rest of the code is the same as in the apply_async() version with three small differences. Firstly, the submit() method replaces the apply_async() method. Secondly, unlike apply_async() that takes an iterable as the second argument, submit() needs this iterable to be unpacked. And lastly, submit() returns an instance of concurrent.futures.Future instead of a multiprocessing.pool.AsyncResult object. Like AsyncResult, Future is a reference to the result that does not have to be computed yet. The output is as follows:

$ python concurrent_futures_submit.py
fib(8) = 34
ackermann(2, 4) = 11
fib(18) = 4181
ackermann(3, 1) = 13
fib(28) = 514229
ackermann(3, 4) = 125
fib(34) = 9227465

Otherwise, everything that was said for apply_async() is also valid for this method.

Source Code

The complete source codes of all the provided examples are available on GitHub. All code was written and tested in Python 3.4 and should work also in newer Python versions.

4 Comments

Leave a Comment.