Python multiprocessing with loading bar

Create a nice multiprocessing logic with a loading bar
programming
Published

April 1, 2022

Multiprocessing in Python is already not the best. But on top of it, I always want to add a loading bar that tells me how much work has been performed. It took me a while to figure out how to best do that.

What I want is:

- Work gets done in parallel, either in threads or in processes depending on how much GIL locking there is in my function.
- The loading bar progresses as work gets done.
- When the progress bar hits the end, work is finished.
- You can pipe a generator into the parallel processing, and it will be consumed progressively.

What I settled on is the code below. It consumes the generator progressively, and displays a progress bar indicating how much work has been done.

For multiprocessing:

from multiprocessing.pool import Pool
from tqdm.auto import tqdm 
import time

def work_function(arg): 
    time.sleep(arg)
    return arg

def iterable():
    for i in range(0,20):
        time.sleep(i/10)
        yield i

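if __name__ == "__main__":  # needed where workers are spawned (Windows, macOS)
    with Pool(10) as p:
        # imap pulls from the generator while workers are already running
        results = list(tqdm(p.imap(work_function, iterable(), chunksize=1)))
        print(results)
        print("done")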

For multithreading:

from multiprocessing.pool import ThreadPool
from tqdm.auto import tqdm 
import time

def work_function(arg): 
    time.sleep(arg)
    return arg

def iterable():
    for i in range(0,20):
        time.sleep(i/10)
        yield i

with ThreadPool(10) as p: 
    results = list(tqdm(p.imap(work_function, iterable(), chunksize=1)))
    print(results)
    print("done")
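
Both versions above hand tqdm a generator, so the bar can only show a running count. If the number of items happens to be known, passing `total=` gives a bar that actually ends when the work does. Here is a minimal sketch of how the two variants could be wrapped in one function. `parallel_map`, `square` and the `use_threads` flag are hypothetical names for illustration, not part of tqdm or multiprocessing.

```python
from multiprocessing.pool import Pool, ThreadPool
from tqdm.auto import tqdm


def square(x):
    # Stand-in for the real work function.
    return x * x


def parallel_map(func, items, total=None, workers=4, use_threads=True):
    """Run func over items with imap and show a tqdm bar (hypothetical helper)."""
    # Threads suit functions that release the GIL (I/O, sleeping);
    # processes suit CPU-bound pure-Python work.
    pool_cls = ThreadPool if use_threads else Pool
    with pool_cls(workers) as pool:
        # imap yields results in input order as they become available,
        # so the bar advances while the pool is still working.
        return list(tqdm(pool.imap(func, items, chunksize=1), total=total))


if __name__ == "__main__":
    # The generator has no len(), so the total is passed explicitly.
    print(parallel_map(square, (i for i in range(20)), total=20))
```

With `use_threads=False` the work function has to be picklable, which in practice means defined at module level.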

Why not use concurrent.futures?

Because its map has no equivalent of imap. The whole iterable is consumed and submitted before map returns, so you cannot start reading results, or showing progress, until then. This could be fine, but sometimes, if the iterable takes time to compute or is a generator itself, you don’t want to consume it fully before the results start flowing.

Try the code below. Notice that the loading bar only appears once the iterable has been fully consumed, by which point it has already reached iteration 13 out of 20.

from concurrent.futures import ProcessPoolExecutor
from tqdm.auto import tqdm 
import time

def work_function(arg): 
    time.sleep(arg)
    return arg

def iterable():
    for i in range(0,20):
        time.sleep(i/10)
        yield i

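if __name__ == "__main__":  # needed where workers are spawned (Windows, macOS)
    with ProcessPoolExecutor(10) as p:
        # p.map only returns once iterable() is exhausted, so tqdm starts late
        results = list(tqdm(p.map(work_function, iterable(), chunksize=1)))
        print(results)
        print("done")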
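
To check the difference without watching a progress bar, here is a small sketch that counts how many items have been pulled from a slow generator at the moment each call returns. It uses thread pools only to keep the example light; `slow_items`, `identity` and the two `consumed_after_*` functions are hypothetical helpers written for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool


def slow_items(n, log):
    # Generator that records each item in `log` as it is produced.
    for i in range(n):
        time.sleep(0.05)
        log.append(i)
        yield i


def identity(x):
    return x


def consumed_after_executor_map(n=5):
    log = []
    with ThreadPoolExecutor(2) as ex:
        results = ex.map(identity, slow_items(n, log))
        consumed = len(log)  # measured before reading any result
        list(results)
    return consumed


def consumed_after_imap(n=5):
    log = []
    with ThreadPool(2) as pool:
        results = pool.imap(identity, slow_items(n, log))
        consumed = len(log)  # imap returned right away
        list(results)
    return consumed


if __name__ == "__main__":
    print("executor.map:", consumed_after_executor_map())
    print("pool.imap:   ", consumed_after_imap())
```

The first count should equal the number of items, since the executor's map drains the generator before returning. The second should be zero or close to it, since imap hands the generator to a background thread and returns immediately.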

What about tqdm process_map?

tqdm.contrib.concurrent.process_map is built on concurrent.futures behind the scenes, and will exhibit the same behavior.

How to share data across workers

When you use thread workers, the data will simply be accessible to every thread directly. You can share it as a variable or as a global object.

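import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from tqdm.auto import tqdm

some_global_data = pd.read_parquet("huge_file.parquet")

def work_function(arg):
    # `some_global_data` and `arg` come straight from the memory
    # shared across threads; nothing is copied or pickled
    s = some_global_data[some_global_data.val == arg].sum()
    return s

# `iterable()` is the generator defined in the earlier examples
with ThreadPoolExecutor(10) as p:
    results = list(tqdm(p.map(work_function, iterable())))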

However, when you use process workers, the data passed to workers as arguments is pickled and sent to them as bytes. This can quickly become an issue, for two reasons:

- Pickling a big object and sending it to every worker can be very expensive.
- This copies the object many times, which can eat up the available memory.

Global data avoids the pickling, since workers read it directly from memory, but each worker process can still end up with its own copy, which also hurts memory.
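
To get a feel for the pickling cost, you can measure roughly how many bytes would travel to a worker for a given argument. This is only a sketch: `pickled_size` is a hypothetical helper, and the list of integers stands in for whatever large object you actually have.

```python
import pickle


def pickled_size(obj):
    """Approximate number of bytes sent to a worker if obj is passed as an argument."""
    return len(pickle.dumps(obj))


big = list(range(1_000_000))  # stand-in for a large object
small = 42                    # e.g. a key or index into that object

if __name__ == "__main__":
    print(pickled_size(big), "bytes for the big object")
    print(pickled_size(small), "bytes for a small key")
```

Passing a small key and looking the data up inside the worker keeps the per-task payload tiny, which is the motivation for the global-object trick discussed next.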

So what can we do? Not a lot: there are no great mechanisms (that I am aware of) to share data across process workers in Python. It’s basically a work in progress: https://lukasz.langa.pl/5d044f91-49c1-4170-aed1-62b6763e6ad0/

There is still a trick you can use, with varying degrees of success: simply keep the object in the global scope so that every worker can see it.

When Python starts a worker process with the fork start method (long the default on Linux), it forks the main process. Conceptually, all the data is copied to the child processes, but the operating system does this copy-on-write: a child keeps reading the parent's memory, and a piece of it is only copied when it gets written to. In principle, this means that you can share the data across all the processes without any memory increase.

Unfortunately, this assumption does not hold for very long in Python. Since Python modifies objects behind the scenes for reference counting (and other) reasons, the object will soon be copied to the child process even if you don’t explicitly modify it yourself.
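
The reference counting part is easy to observe. In CPython the counter lives inside the object itself, so merely holding a new reference writes to the object's memory. A tiny sketch, where `refcount_delta` is a hypothetical helper and the short list stands in for a big shared object:

```python
import sys


def refcount_delta():
    data = [1, 2, 3]                 # stand-in for a big shared object
    before = sys.getrefcount(data)
    holders = [data]                 # store a reference; `data` itself is not modified
    after = sys.getrefcount(data)
    return after - before


if __name__ == "__main__":
    print("refcount changed by", refcount_delta())
```

On CPython the count goes up by one even though the list contents never changed. In a forked child, that kind of write is what triggers the copy.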

Still, I’ve noticed that this often works and saves me when processing a huge object with many processes. I would then do something like this:


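import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from tqdm.auto import tqdm

big_object = pd.read_parquet("big_file.parquet")

def work_function(arg):
    # big object is in the global scope, and might not be copied to the
    # child process for a while.
    s = big_object[big_object.val == arg].sum()
    return s

# `iterable()` is the generator defined in the earlier examples
with ProcessPoolExecutor(10) as p:
    results = list(tqdm(p.map(work_function, iterable())))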
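
The same trick should combine with the Pool.imap pattern from the start of the post, so that the iterable is also consumed progressively. Below is a minimal sketch, assuming a Unix-like system where the fork start method is available (it is not on Windows). `BIG_LOOKUP`, `lookup` and `run` are hypothetical stand-ins, and the dictionary replaces the parquet file so that the example runs on its own.

```python
import multiprocessing as mp

# Stand-in for the large read-only object, created before the workers fork.
BIG_LOOKUP = {i: i * i for i in range(100_000)}


def lookup(key):
    # Only `key` is pickled per task; BIG_LOOKUP is inherited through fork.
    return BIG_LOOKUP[key]


def run(keys, workers=4):
    # Ask for fork explicitly instead of relying on the platform default.
    ctx = mp.get_context("fork")
    with ctx.Pool(workers) as pool:
        return list(pool.imap(lookup, keys, chunksize=1))


if __name__ == "__main__":
    print(run(range(10)))
```

Wrapping the `pool.imap(...)` call in `tqdm(...)` as in the earlier examples would add the progress bar back.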