Multiprocessing in Python is already not the best. But on top of it, I always want to add a loading bar that tells me how much work has been performed. It took me a while to figure out how to best do that.

What I want is:
- Work gets done in parallel, either in threads or in processes depending on how much GIL locking there is in my function.
- The loading bar progresses as work gets done.
- When the progress bar hits the end, work is finished.
- You can pipe a generator into the parallel processing, and it will be consumed progressively.

What I settled for is the code below. It consumes the iterable generator progressively, and displays a progress bar indicating how much work has been achieved.

For multiprocessing:

from multiprocessing.pool import Pool
from tqdm.auto import tqdm
import time

def work_function(arg):
    time.sleep(arg)
    return arg

def iterable():
    for i in range(0, 20):
        time.sleep(i / 10)
        yield i

with Pool(10) as p:
    results = list(tqdm(p.imap(work_function, iterable(), chunksize=1)))
print(results)
print("done")

For multithreading:
from multiprocessing.pool import ThreadPool
from tqdm.auto import tqdm
import time

def work_function(arg):
    time.sleep(arg)
    return arg

def iterable():
    for i in range(0, 20):
        time.sleep(i / 10)
        yield i

with ThreadPool(10) as p:
    results = list(tqdm(p.imap(work_function, iterable(), chunksize=1)))
print(results)
print("done")
Why not use concurrent.futures?
Because concurrent.futures has no equivalent of imap: Executor.map consumes the entire iterable before sending anything to the workers. This could be fine, but sometimes, if the iterable takes time to compute or is a generator itself, you don't want to consume it fully before starting the concurrent processing.
Try the code below. Notice that the loading bar only appears once the iterable has been fully consumed, by which point the work has already reached iteration 13/20.
from concurrent.futures import ProcessPoolExecutor
from tqdm.auto import tqdm
import time

def work_function(arg):
    time.sleep(arg)
    return arg

def iterable():
    for i in range(0, 20):
        time.sleep(i / 10)
        yield i

with ProcessPoolExecutor(10) as p:
    results = list(tqdm(p.map(work_function, iterable(), chunksize=1)))
print(results)
print("done")
What about tqdm process_map?
tqdm.contrib.concurrent.process_map uses concurrent.futures behind the scenes, and will exhibit the same behavior.
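For completeness, here is what that call looks like. The example uses thread_map, the thread-based sibling of process_map from the same tqdm.contrib.concurrent module, so it runs without pickling constraints; process_map takes the same arguments. Consistent with the eager behavior described above, these helpers take the whole input up front (a sized input like a list also lets them infer the bar's total):

```python
from tqdm.contrib.concurrent import thread_map
import time

def work_function(arg):
    time.sleep(arg / 100)  # scaled down so the demo runs quickly
    return arg

# thread_map/process_map consume the input eagerly, so a list fits naturally;
# the progress bar total is inferred from len() of the input
results = thread_map(work_function, list(range(20)), max_workers=10)
print(results)
```

Like Executor.map, it returns results in input order, so this is no help when the input is an expensive generator you want consumed progressively.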