How to update single progress bar in multiprocessing map() #1121
Replies: 23 comments
-
import time
import random
from multiprocessing import Pool
import tqdm

def myfunc(a):
    time.sleep(random.random())
    pbar.update(1)  # note: pbar is not defined in the worker processes
    return a ** 2

pool = Pool(2)
for _ in tqdm.tqdm(pool.imap_unordered(myfunc, range(100)), total=100):
    pass
pool.close()
pool.join()
pbar.close()
-
interesting. As @Akhail mentioned you could move the progress logic to outside:

import time
import random
from multiprocessing import Pool
import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

pool = Pool(2)
for _ in tqdm.tqdm(pool.imap_unordered(myfunc, range(100)), total=100):
    pass
pool.close()
pool.join()

100%|████████████████████████████| 100/100 [00:23<00:00,  4.20it/s]

but this isn't ideal. There must be a simpler solution.
-
but how about if the tasks are submitted like this:

for k, task in enumerate(tasks):
    pool.apply_async(task_runner, args=(), callback=task_callback)
pool.close()
pool.join()
-
of course...

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

pool = Pool(2)
'''
for _ in tqdm(pool.imap_unordered(myfunc, range(100)), total=100):
    pass
'''
pbar = tqdm(total=100)

def update(*a):
    pbar.update()
    # tqdm.write(str(a))

for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
    # tqdm.write('scheduled')
pool.close()
pool.join()
-
@casperdcl thanks very much
thanks
-
scheduled (0,)
(1,)
(9,)
(4,)
...
(9216,)
(9025,)
(9604,)
(9409,)
(9801,)
100%|████████████████████████████| 100/100 [00:22<00:00,  3.48it/s]
-
I used the following in my multiprocessing solution that parsed multiple files at once:

import os
from multiprocessing import Pool, Process, Manager
from tqdm import tqdm

def process_file(q, fname):
    last_pos = 0
    with open(fname) as infile:
        # iterate via readline() so that tell() stays usable
        # (Python 3 disables tell() during direct file iteration)
        for line in iter(infile.readline, ''):
            # ... do something with the line
            q.put(infile.tell() - last_pos)
            last_pos = infile.tell()

def show_prog(q, total_bytes):
    prog = tqdm(total=total_bytes, desc="Total", unit='B', unit_scale=True)
    while True:
        try:
            to_add = q.get(timeout=1)
            prog.n += to_add
            prog.update(0)  # refresh the bar without incrementing
            if prog.n >= total_bytes:
                break
        except Exception:
            continue

pool = Pool(processes=5)
manager = Manager()
queue = manager.Queue()
total_bytes = 0
for f in files:  # `files` is the list of paths to parse
    total_bytes += os.stat(f).st_size
    pool.apply_async(process_file, args=(queue, f))
progress = Process(target=show_prog, args=(queue, total_bytes))
progress.start()
pool.close()
pool.join()
progress.join()
-
To answer the original question,

from multiprocessing import Pool

def myfunc(a):
    return a ** 2

N = 100
pool = Pool(2)
res = pool.map(myfunc, range(N))
pool.close()
pool.join()

becomes:

from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    return a ** 2

N = 100
pbar = tqdm(total=N)
res = [None] * N  # result list of correct size

def wrapMyFunc(arg):
    return arg, myfunc(arg)

def update(result):
    # note: input comes from async `wrapMyFunc`
    # (tuple unpacking in the signature is Python 2 only)
    i, ans = result
    res[i] = ans  # put answer into correct index of result list
    pbar.update()

pool = Pool(2)
for i in range(N):
    pool.apply_async(wrapMyFunc, args=(i,), callback=update)
pool.close()
pool.join()
pbar.close()
-
@casperdcl, I would like to reopen this issue, because I'm still seeing the same issue, but in a slightly different situation. I have a need to parallelise calling a function that can't be directly imported (because it's defined on-the-fly, for example). This means (AFAIK) that I can't use a multiprocessing Pool. See the following (somewhat contrived) example:

#!/usr/bin/env python
import time
import random
from operator import itemgetter
from multiprocessing import (Process, Queue)
import tqdm

NINPUTS = 100
NPROC = 4

def process(func, inq, outq, bar):
    """Read from input `Queue`, and put result of `func` into output `Queue`
    """
    while True:
        idx, arg = inq.get()
        if idx is None:
            break
        outq.put((idx, func(arg)))
        bar.update()

def run():
    """Parallelise inline function
    """
    inputs = list(range(NINPUTS))
    bar = tqdm.tqdm(total=NINPUTS)

    # function to be parallelised
    def myfunc(a):
        time.sleep(random.random())
        return a ** 2

    inq = Queue()
    outq = Queue()
    # create processing pool
    pool = [Process(target=process, args=(myfunc, inq, outq, bar))
            for _ in range(NPROC)]
    for proc in pool:
        proc.daemon = True
        proc.start()
    for x in enumerate(inputs):  # fill queue with inputs
        inq.put(x, block=False)
    for _ in range(NPROC):  # sentinel to signal empty
        inq.put((None, None))
    # get results and close pool
    results = [outq.get() for _ in range(NINPUTS)]
    for proc in pool:
        proc.join()
    bar.close()
    print([x for i, x in sorted(results, key=itemgetter(0))])

run()

I end up with an output like:

Can you (or anyone else) help me out for this case as well? [Thanks for the previous solution, which was very helpful]
-
You may consider only using tqdm where you collect the results.
-
Or change it to imap_unordered with a chunksize defined.
-
@chengs, can you elaborate on how this would work?
-
@chengs, never mind that last request, the suggestion to add the
-
Yes, in this case

results = [outq.get() for _ in range(NINPUTS)]

should be

from tqdm import trange
results = [outq.get() for _ in trange(NINPUTS)]

And remove the
-
from https://stackoverflow.com/questions/41920124/multiprocessing-use-tqdm-to-display-a-progress-bar

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
-
After investigating lots of methods, I wrote a package to handle it; here is the usage:

import threading
from concurrent.futures import ThreadPoolExecutor
import time
from tqdm_multi_thread import TqdmMultiThreadFactory

def demo(factory, position, total):
    with factory.create(position, total) as progress:
        for _ in range(0, total, 5):
            progress.update(5)
            time.sleep(0.001 * (position % 5 + 1))

with ThreadPoolExecutor(max_workers=20) as executor:
    tasks = range(100)
    lock = threading.Lock()
    multi_thread_factory = TqdmMultiThreadFactory()
    for i, url in enumerate(tasks, 1):
        executor.submit(demo, multi_thread_factory, i, 100)

more details: tqdm_multi_thread
-
Another repo and approach here https://github.com/swansonk14/p_tqdm |
-
Since this is still the top search result, I will add yet another option. I think the suggestion from @epruesse is probably the best option all around, since it gives in-order results and also updates the progress bar elegantly while the process pool is working. Where it is less ideal is work with highly variable runtime: the progress bar will stall while the slow work is being done, then suddenly jump far ahead. In that case, if it's desired to update the progress bar as the work runs, it's possible to update it manually:

import time
import multiprocessing as mp
from ctypes import c_int32
import tqdm

def f(p):
    time.sleep(min(p, 1))
    with counter_lock:
        counter.value += 1
    return p

counter = mp.Value(c_int32)
counter_lock = mp.Lock()
params = [i for i in range(10)]
with tqdm.tqdm(total=len(params)) as pbar:
    with mp.Pool() as pool:
        future = pool.map_async(f, params)
        while not future.ready():
            if counter.value != 0:
                with counter_lock:
                    increment = counter.value
                    counter.value = 0
                pbar.update(n=increment)
            time.sleep(1)
        result = future.get()
-
Yet another approach, using concurrent.futures:

from concurrent.futures import as_completed, ProcessPoolExecutor
from tqdm import tqdm

def process_param(param):
    return param

params = range(100000)
executor = ProcessPoolExecutor()
jobs = [executor.submit(process_param, param) for param in params]
results = []
for job in tqdm(as_completed(jobs), total=len(jobs)):
    results.append(job.result())
-
note there's a Feel free to PR and improve it and/or add alternative approaches.
-
Just a quick note that I wasn't able to get Because I also need to handle uncaught exceptions in the parent process, I can't actually use So, for now, @mzur's approach is best. I just wrap the calls to
-
The simplest way would probably be to apply tqdm() around the inputs, rather than the mapping function. For example:
-
[macOS 10.13.1, python 2.7.14 (macports), tqdm 4.19.4 (macports)]
I am struggling to work out how to get a single progress bar to update on every completion from a multiprocessed, mapped function. Consider the following example:
The idea is to have a single progress bar that updates every time a call to myfunc completes anywhere in the pool. However, what I see is that in each process the bar seems to have an independent counter, so I only get to 50% (or thereabouts, because of random()) when the job finishes.

Is there a clean way to implement a single tqdm progress bar that I can then update from inside one of many child processes? Apologies if this is already asked/solved.