
TQDM bar freezing script with multiprocessing #1160

Open
cwfparsonson opened this issue Apr 20, 2021 · 10 comments
Labels
help wanted 🙏 We need you (discussion or implementation) p0-bug-critical ☢ Exception rasing synchronisation ⇶ Multi-thread/processing

Comments

@cwfparsonson

Hi, this may be related to #54 (comment) and #249 (comment).

My tqdm bar keeps hanging/freezing right towards the end of my script (when it is 99% complete), which is using multiprocessing Pool.apply_async(callback=lambda _: pbar.update(1)) as in https://stackoverflow.com/a/58799918/13398561. The script works fine when I am not using tqdm, and even when I write a custom pbar class to update as a sanity check that apply_async() is performing the callback correctly. I've tried setting miniters to 1 and playing with mininterval, maxinterval, and smoothing, and I've tried setting refresh to False for set_description() and set_postfix(). Any ideas what might be causing this?

@casperdcl
Sponsor Member

Hmm not too sure. Can you post a minimal code example? I can run the StackOverflow answer you posted to 100% consistently.

Also check out e.g. tqdm.contrib.concurrent.process_map
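
For reference, a minimal sketch of how that might be called (do_work here is just a stand-in function):

import time
from tqdm.contrib.concurrent import process_map

def do_work(x):
    time.sleep(x * 0.01)

if __name__ == '__main__':
    # process_map manages both the process pool and the progress bar
    results = process_map(do_work, range(100), max_workers=4, chunksize=1)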

@casperdcl casperdcl self-assigned this Apr 20, 2021
@casperdcl casperdcl added need-feedback 📢 We need your response (question) p2-bug-warning ⚠ Visual output bad synchronisation ⇶ Multi-thread/processing labels Apr 20, 2021
@casperdcl casperdcl removed their assignment Apr 20, 2021
@cwfparsonson
Author

Hi, thanks for the quick reply. I am struggling to reproduce the problem with a minimal code example.

I believe the problem may originate from the fact that I am running a repeating loop which resets the multiprocessing Pool object before restarting for the next repeat. I think this because it is always right at the end of a repeat that tqdm freezes. Exactly which repeat it freezes on is non-deterministic. I thought maybe I needed to add some delays between each restart, but this doesn't seem to help. I have also tried re-initialising tqdm at the same time I initialise Pool, but this also doesn't stop the freezing.

This is my best attempt so far at writing a minimal implementation of my actual script. It will only hang if I do not set miniters=1.

import multiprocessing
from tqdm import tqdm
import time

def do_work(x):
    time.sleep(x*0.1)

num_repeats = 10
num_tasks_per_repeat = 100
pbar = tqdm(total=num_repeats*num_tasks_per_repeat)
for repeat in range(num_repeats):
    pool = multiprocessing.Pool(10, maxtasksperchild=1)
    results = [pool.apply_async(do_work, args=(repeat,), callback=lambda _: pbar.update(1)) for _ in range(num_tasks_per_repeat)]
    pool.close()
    pool.join()
    del pool
pbar.close()

However, in my actual (very non-minimal!) script, setting miniters=1 does not prevent hanging. For completeness, I will include the script here, though it probably won't be very helpful since it is so messy. The multiprocessing section is on the line pool = multiprocessing.Pool(kwargs['num_processes'], maxtasksperchild=kwargs['maxtasksperchild'])

def duplicate_demands_in_demand_data_dict(demand_data, num_duplications=1, **kwargs):
    '''Duplicates set of demands by the specified number of times.'''
    demand_data = copy.deepcopy(demand_data)

    if 'use_multiprocessing' not in kwargs:
        kwargs['use_multiprocessing'] = False
    if 'num_processes' not in kwargs:
        # increase to decrease processing time, decrease to decrease risk of memory errors
        kwargs['num_processes'] = 10 # num processes to run in parallel if multiprocessing
    if 'maxtasksperchild' not in kwargs:
        kwargs['maxtasksperchild'] = 1 # num tasks per process
    
    if 'job_id' in demand_data:
        job_centric = True
    else:
        job_centric = False

    # ensure values of dict are lists
    for key, value in demand_data.items():
        demand_data[key] = list(value)

    init_num_demands = len(demand_data['event_time'])
    demands_to_add = ((2**num_duplications)*init_num_demands) - init_num_demands

    # progress bar
    # if not kwargs['use_multiprocessing']:
        # # TODO: Get progress bar working for multiprocessing
        # pbar = tqdm(total=demands_to_add, 
                # desc='Duplicating demands',
                    # # miniters=1, 
                    # # mininterval=1,
                    # # maxinterval=1, # 2
                    # leave=False,
                    # smoothing=1e-5) # 1
    pbar = tqdm(total=demands_to_add, 
            desc='Duplicating demands',
                miniters=1, 
                # mininterval=1,
                # maxinterval=1, # 2
                leave=False,
                smoothing=1e-5) # 1
    pbar.set_description(refresh=False)
    pbar.set_postfix(refresh=False)

    start = time.time()
    for dup in range(num_duplications):

        # get curr num demands
        num_demands = len(demand_data['event_time'])

        # get curr duration
        duration = max(demand_data['event_time']) - min(demand_data['event_time']) 

        # duplicate demands
        if kwargs['use_multiprocessing']:
            # duplicate demands in parallel
            print('Duplication {} of {}...'.format(dup+1, num_duplications))
            s = time.time()

            # init shared lists for appending duplicated demands to
            jobs = multiprocessing.Manager().list()
            job_ids = multiprocessing.Manager().list()
            unique_ids = multiprocessing.Manager().list()
            flow_ids = multiprocessing.Manager().list()
            sns = multiprocessing.Manager().list()
            dns = multiprocessing.Manager().list()
            flow_sizes = multiprocessing.Manager().list()
            event_times = multiprocessing.Manager().list()
            establishes = multiprocessing.Manager().list()
            indexes = multiprocessing.Manager().list()

            # duplicate demands in parallel
            # pool = multiprocessing.Pool(multiprocessing.cpu_count())
            pool = multiprocessing.Pool(kwargs['num_processes'], maxtasksperchild=kwargs['maxtasksperchild'])
            results = [pool.apply_async(duplicate_demand,
                                        args=(
                                        demand_data['job'][idx], 
                                        demand_data['sn'][idx],
                                        demand_data['dn'][idx],
                                        demand_data['flow_size'][idx],
                                        demand_data['event_time'][idx],
                                        duration,
                                        demand_data['establish'][idx],
                                        demand_data['index'][idx],
                                        num_demands,
                                        idx, 
                                        jobs, 
                                        job_ids,
                                        unique_ids,
                                        flow_ids,
                                        sns,
                                        dns,
                                        flow_sizes,
                                        event_times,
                                        establishes,
                                        indexes,
                                        job_centric),
                                       callback=lambda _: pbar.update(1))
                                        for idx in range(num_demands)]
            pool.close()
            pool.join()
            del pool

            # collect duplicated demands from multiprocessing and add to demand_data
            if job_centric:
                demand_data['job_id'].extend(list(job_ids))
                demand_data['job'].extend(list(jobs))
            demand_data['flow_id'].extend(list(flow_ids))
            demand_data['sn'].extend(list(sns))
            demand_data['dn'].extend(list(dns))
            demand_data['flow_size'].extend(list(flow_sizes))
            demand_data['event_time'].extend(list(event_times))
            demand_data['establish'].extend(list(establishes))
            demand_data['index'].extend(list(indexes))

            e = time.time()
            print('Duplication completed in {} s.'.format(e-s))

        else:
            # not multiprocessing -> duplicate demands sequentially
            # init lists for appending duplicated demands to
            jobs = []
            job_ids = []
            unique_ids = []
            flow_ids = []
            sns = []
            dns = []
            flow_sizes = []
            event_times = []
            establishes = []
            indexes = []
            for idx in range(num_demands):
                duplicate_demand(demand_data['job'][idx], 
                                 demand_data['sn'][idx],
                                 demand_data['dn'][idx],
                                 demand_data['flow_size'][idx],
                                 demand_data['event_time'][idx],
                                 duration,
                                 demand_data['establish'][idx],
                                 demand_data['index'][idx],
                                 num_demands,
                                 idx, 
                                 jobs, 
                                 job_ids,
                                 unique_ids,
                                 flow_ids,
                                 sns,
                                 dns,
                                 flow_sizes,
                                 event_times,
                                 establishes,
                                 indexes,
                                 job_centric)

                # collect duplicated demands and add to demand_data
                if job_centric:
                    demand_data['job_id'].extend(list(job_ids))
                    demand_data['job'].extend(list(jobs))
                demand_data['flow_id'].extend(list(flow_ids))
                demand_data['sn'].extend(list(sns))
                demand_data['dn'].extend(list(dns))
                demand_data['flow_size'].extend(list(flow_sizes))
                demand_data['event_time'].extend(list(event_times))
                demand_data['establish'].extend(list(establishes))
                demand_data['index'].extend(list(indexes))

                pbar.update(1)

    # make sure demand data still ordered in order of event time
    index = np.argsort(demand_data['event_time'])
    for key in demand_data.keys():
        if len(demand_data[key]) == len(index):
            # only index keys which are events (i.e. for job-centric, these are job keys, not flow keys)
            demand_data[key] = [demand_data[key][i] for i in index]

    if not kwargs['use_multiprocessing']:
        pbar.close()
    end = time.time()
    print('Duplicated from {} to {} total demands ({} duplication(s)) in {} s.'.format(init_num_demands, len(demand_data['event_time']), num_duplications, end-start))

    return demand_data 




def duplicate_demand(job, 
                     sn,
                     dn,
                     flow_size,
                     event_time,
                     duration,
                     establish,
                     index,
                     num_demands, 
                     idx, 
                     jobs,
                     job_ids,
                     unique_ids,
                     flow_ids,
                     sns,
                     dns,
                     flow_sizes,
                     event_times,
                     establishes,
                     indexes,
                     job_centric=True):

    if job_centric:
        # job id
        job_id = int(idx + num_demands)
        job.graph['job_id'] = 'job_{}'.format(job_id)

        # attrs inside job
        # job = copy.deepcopy(demand_data['job'])[idx]
        flow_stats = {flow: job.get_edge_data(flow[0], flow[1]) for flow in job.edges} 
        for flow in flow_stats:
            # grab attr_dict for flow
            attr_dict = flow_stats[flow]['attr_dict']

            # update ids
            attr_dict['job_id'] = 'job_{}'.format(idx+num_demands)
            attr_dict['unique_id'] = attr_dict['job_id'] + '_' + attr_dict['flow_id']

            # flow src, dst, & size
            # if data dependency, is a flow
            if attr_dict['dependency_type'] == 'data_dep':
                flow_ids.append(attr_dict['unique_id'])
                sns.append(attr_dict['sn'])
                dns.append(attr_dict['dn'])
                flow_sizes.append(attr_dict['flow_size'])

            # confirm updates
            edge = attr_dict['edge']
            job.add_edge(edge[0], edge[1], attr_dict=attr_dict)

        jobs.append(job)
        job_ids.append('job_{}'.format(job_id))


    else:
        flow_ids.append('flow_{}'.format(int(idx+num_demands)))
        flow_sizes.append(flow_size)
        sns.append(sn)
        dns.append(dn)

    event_times.append(duration + event_time)
    establishes.append(establish)
    indexes.append(index + num_demands)

@cwfparsonson
Author

A piece of information which may be relevant is that, when the progress bar gets stuck just at the end of a repeat, I can leave it for as long as I like and nothing will happen, but when I exit the script with ctrl+C, the progress bar updates just after the KeyboardInterrupt.

For example, in the run below, the progress bar reached 69/10230 (the next repeat was due to start at 70/10230) and froze. When I interrupted it, the progress bar updated to 70/10230 as it should:

Duplicating demands: 1%|▍ | 69/10230 [00:02<03:40, 46.08it/s]^CTraceback (most recent call last):
File "main_gen_benchmark_data.py", line 12, in
benchmark_demands = gen_benchmark_demands(path_to_save=path_to_save,
File "/home/zciccwf/phd_project/projects/trafpy/trafpy/benchmarker/tools.py", line 123, in gen_benchmark_demands
demand_data = create_demand_data(min_num_demands=config.MIN_NUM_DEMANDS,
File "/home/zciccwf/phd_project/projects/trafpy/trafpy/generator/src/builder.py", line 147, in create_demand_data
return generator.create_job_centric_demand_data()
File "/home/zciccwf/phd_project/projects/trafpy/trafpy/generator/src/jobcentric.py", line 277, in create_job_centric_demand_data
demand_data = duplicate_demands_in_demand_data_dict(demand_data,
File "/home/zciccwf/phd_project/projects/trafpy/trafpy/generator/src/flowcentric.py", line 833, in duplicate_demands_in_demand_data_dict
pool.join()
File "/home/zciccwf/.conda/envs/deep_scheduler/lib/python3.8/multiprocessing/pool.py", line 666, in join
p.join()
File "/home/zciccwf/.conda/envs/deep_scheduler/lib/python3.8/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File "/home/zciccwf/.conda/envs/deep_scheduler/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/home/zciccwf/.conda/envs/deep_scheduler/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Duplicating demands: 1%|▍ | 70/10230 [00:17<41:14, 4.11it/s]

@casperdcl
Sponsor Member

casperdcl commented Apr 20, 2021

Hmm, it looks like there's some sort of deadlock caused by pool.join() and perhaps tqdm.tqdm.close() or similar.

This works though:

from tqdm import trange
from tqdm.contrib.concurrent import process_map
from time import sleep


def do_work(x):
    sleep(x*0.1)

num_repeats = 10
num_tasks_per_repeat = 100
for repeat in trange(num_repeats, unit_scale=num_tasks_per_repeat):
    results = process_map(do_work, [repeat] * num_tasks_per_repeat, leave=False)

@casperdcl casperdcl added help wanted 🙏 We need you (discussion or implementation) p0-bug-critical ☢ Exception rasing and removed need-feedback 📢 We need your response (question) p2-bug-warning ⚠ Visual output bad labels Apr 20, 2021
@cwfparsonson
Author

process_map seems to result in memory errors for my application, whereas pool.apply_async() does not, so I'll need to stick with pool.apply_async() I think. I don't think it can be tqdm.tqdm.close() causing the issue, since the issue occurs inside the for loop even when tqdm.tqdm.close() is called outside of the for loop. It must be some kind of interaction between the progress bar and pool.join().
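
A possible workaround (just a sketch, untested against my full script) would be to drop the callback entirely and update the bar from the main process as results arrive, e.g. with imap_unordered:

import multiprocessing
import time
from tqdm import tqdm

def do_work(x):
    time.sleep(x * 0.1)

if __name__ == '__main__':
    num_repeats = 10
    num_tasks_per_repeat = 100
    with tqdm(total=num_repeats * num_tasks_per_repeat) as pbar:
        for repeat in range(num_repeats):
            with multiprocessing.Pool(10, maxtasksperchild=1) as pool:
                # the bar is updated in the main process, not from a callback thread
                for _ in pool.imap_unordered(do_work, [repeat] * num_tasks_per_repeat):
                    pbar.update(1)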

@cwfparsonson
Author

cwfparsonson commented Apr 26, 2021

Hi,

I have finally managed to find a simplified script which will reproduce this bug. Please could you let me know if you can run the below script (might take a couple of minutes)?

import multiprocessing
from tqdm import tqdm
import time
import random

def do_work(x):
    time.sleep(0.00001*random.randint(1, 1000))

num_repeats = 100
num_tasks_per_repeat = 100
pbar = tqdm(total=num_repeats*num_tasks_per_repeat, miniters=1, desc='Testing', leave=False, smoothing=1e-5)
start = time.time()

for repeat in range(num_repeats):
    pool = multiprocessing.Pool(10, maxtasksperchild=1)
    results = [pool.apply_async(do_work, args=(repeat,), callback=lambda _: pbar.update(1)) for _ in range(num_tasks_per_repeat)]
    pool.close()
    pool.join()
    output = [p.get() for p in results]
    del pool
    jobs = output

end = time.time()
pbar.close()
print('Completed test in {} s.'.format(end-start))

I'm going to go through and remove things to work out whether there's a particular line here causing this, but I thought I should post it in the meantime in case you can see an easy solution that I cannot.

@ikegwukc

ikegwukc commented Sep 6, 2022

@cwfparsonson were you able to find a solution?

@ellisbrown

+1

@cwfparsonson
Author

Apologies, but from what I remember I was not able to find a solution to using tqdm with multiprocessing apply_async(). I have since switched to using ray for my parallel processing rather than the multiprocessing library - it works with tqdm and, in my experience, is easier to use, faster, and more memory efficient.
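
Roughly, the pattern looks like this (a simplified sketch, not my exact code):

import time
import ray
from tqdm import tqdm

@ray.remote
def do_work(x):
    time.sleep(x * 0.01)
    return x

if __name__ == '__main__':
    ray.init()
    refs = [do_work.remote(i % 10) for i in range(100)]
    with tqdm(total=len(refs)) as pbar:
        # update the bar in the driver process as each remote task finishes
        while refs:
            done, refs = ray.wait(refs, num_returns=1)
            ray.get(done)
            pbar.update(len(done))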

@theisen1337

theisen1337 commented May 11, 2023

As of today I have this issue too. The odd thing is that it works fine when I test in PyCharm locally, but once I move the server to a Kubernetes Docker container on AWS it freezes the process - it just hangs. I don't even see the first tqdm console log, so it hangs at the beginning.

Here is a snippet of code where I am using tqdm with multiple processes.

        # total models created.
        length = rows_daily.shape[0]

        # Set timer to only update database every 3 seconds.
        time_offset = 3.0
        timer = time.time() - time_offset - 0.1

        with tqdm(total=length) as pbar:
            with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
                # Submit the tasks to the ProcessPoolExecutor and store the resulting Future objects
                futures = [executor.submit(self.process_row, (index, row)) for index, row in rows_daily.iterrows()]

                # Collect the results as they become available
                for future in concurrent.futures.as_completed(futures):
                    index, result, result_debug = future.result()
                    forecast.loc[index, :] = result
                    forecast_debug.loc[index, :] = result_debug

                    pbar.update(1)

                    current_time = round((time.time() - timer), 2)
                    if current_time > time_offset:
                        timer = time.time()
                        self.app_tracker.update_progress(self.task_id, pbar.format_dict)

        cols = [col for col in rows_daily.columns if col[:7] == self.app_dates['today'].strftime(DATE_FORMAT)[:7]]

Edit: more information - it seems that tqdm freezes the collection of results from future.result(). When I take the tqdm context out, this works as expected.
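
For comparison, a stripped-down version of the pattern with tqdm wrapping as_completed directly instead of a manually updated bar (process_row here is a placeholder, and I haven't verified this inside the kube/docker environment described above):

import concurrent.futures
from tqdm import tqdm

def process_row(item):
    # placeholder for the real per-row work
    index, row = item
    return index, row

if __name__ == '__main__':
    rows = list(enumerate(range(100)))  # stand-in for rows_daily.iterrows()
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(process_row, item) for item in rows]
        # let tqdm wrap as_completed rather than calling pbar.update() by hand
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            index, result = future.result()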
