Dask support #278

Closed
aplavin opened this issue Oct 1, 2016 · 23 comments · Fixed by #1079 or #1132
Labels
p3-enhancement 🔥 Much new such feature submodule ⊂ Periphery/subclasses synchronisation ⇶ Multi-thread/processing to-merge ↰ Imminent

@aplavin
Contributor

aplavin commented Oct 1, 2016

Dask itself has a basic progress bar, but tqdm's is certainly better, so I made a basic wrapper:

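from dask.callbacks import Callback
from tqdm import tqdm_notebook

class ProgressBar(Callback):
    def _start_state(self, dsk, state):
        # Called once before execution: size the bar by the number of tasks
        self._tqdm = tqdm_notebook(total=sum(len(state[k]) for k in ['ready', 'waiting', 'running', 'finished']))

    def _posttask(self, key, result, dsk, state, worker_id):
        # Called after each task (chunk) completes
        self._tqdm.update(1)

    def _finish(self, dsk, state, errored):
        # Called once at the end, even if a task errored: release the bar
        self._tqdm.close()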

Usage (the same as dask's ProgressBar):

with ProgressBar():
    ... .compute()

Does it belong in tqdm? What do you think? Also, any further suggestions/improvements?

@lrq3000
Member

lrq3000 commented Oct 1, 2016

Thanks for the contribution, that's great!

I haven't used dask myself yet, so I can't give precise feedback.

Just a question: in dask, does each element take the same time to process, or
can they take different amounts of time? Also, are elements full objects
(like a full dataframe, so the callback happens only after the whole dataframe
is processed), or are they exploded into subelements (e.g. a dataframe
exploded into rows, with the rows being the elements composing a dask
object)?


@aplavin
Contributor Author

aplavin commented Oct 1, 2016

@lrq3000 well, in dask "tasks" are chunks, the number of which you usually specify when creating a dask dataframe or similar. In the case of a dask dataframe, each chunk is a pandas dataframe, and we get a callback only once per chunk. I don't think it's possible to count individual rows or items in the general case, because at the low level dask is just a scheduler for various multiprocessing stuff.

Of course, in general tasks can take very different times to process, but for the most common operations, like applying functions to dataframes (I only use dask as a parallel pandas DataFrame.apply), they usually complete in about the same time.
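To see that granularity in practice, here is a minimal sketch that counts _posttask calls. It assumes a recent dask with the default local threaded scheduler, and TaskCounter is a hypothetical name, not part of dask or tqdm. The count follows the number of tasks in the graph (roughly one per chunk plus a few for the final reduction), not the number of elements:

```python
import dask.array as da
from dask.callbacks import Callback


class TaskCounter(Callback):
    """Hypothetical helper: counts how often dask reports a finished task."""

    def __init__(self):
        super().__init__()
        self.count = 0

    def _posttask(self, key, result, dsk, state, worker_id):
        # Invoked once per completed task (a chunk-level operation),
        # never once per individual array element.
        self.count += 1


# One million elements split into 10 chunks of 100,000 each
x = da.arange(1_000_000, chunks=100_000)

counter = TaskCounter()
with counter:
    total = x.sum().compute()

print(f"{counter.count} task callbacks for {x.size} elements")
```

The exact task count can vary between dask versions because of graph optimisation, but it stays in the tens here, far below one million.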

@lrq3000
Member

lrq3000 commented Oct 1, 2016

OK Alex, then I don't think we can do better for now; you already
made a great wrapper :) We will add it to the readme! (Or maybe this could be
monkeypatched like we did for pandas?)


@aplavin
Contributor Author

aplavin commented Oct 1, 2016

@lrq3000 I don't think monkeypatching makes sense here, because dask's very basic built-in progress bar is used the same way as I wrote above:

from dask.diagnostics import ProgressBar

with ProgressBar():
    ... .compute()

So, something like from tqdm.dask import ProgressBar would be ok, I think.
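For reference, dask's built-in dask.diagnostics.ProgressBar is itself a dask.callbacks.Callback subclass, so besides the context-manager form it can also be registered globally. A small sketch of both modes, which a tqdm-based replacement would ideally mirror (the array is made-up example data):

```python
import dask.array as da
from dask.diagnostics import ProgressBar

x = da.ones((1000,), chunks=100)  # 10 chunks of ones

# Scoped: only computations inside the block report progress
with ProgressBar():
    scoped = x.sum().compute()

# Global: every compute() after register() reports progress,
# until unregister() is called
pbar = ProgressBar()
pbar.register()
try:
    doubled = (x * 2).sum().compute()
finally:
    pbar.unregister()
```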

@lrq3000
Member

lrq3000 commented Oct 1, 2016

OK, thank you Alexander; indeed it's better to follow dask's API, we
will do that :)


@aplavin
Copy link
Contributor Author

aplavin commented Oct 1, 2016

So I guess I'll make a PR soon.

@lrq3000
Copy link
Member

lrq3000 commented Oct 1, 2016

If you want to be credited as the author in the commits then yes, feel free
to make a PR, we will gladly merge it :) But note that it should be a
submodule, not part of core tqdm!

@norandom

norandom commented Oct 16, 2016

This is very interesting. Has this been merged into a current release?

@lrq3000
Copy link
Member

lrq3000 commented Oct 16, 2016

Not yet merged but it will be.

@lrq3000
Member

lrq3000 commented Oct 16, 2016

To be more precise: this will get merged after #198, and before that I'm waiting for someone to review some bugfixes and enhancements. So don't hold your breath: it will be merged eventually, before Christmas I think, but I'm not sure exactly when.

@LankyCyril

LankyCyril commented Jul 17, 2018

politely bumps thread :)

@alexifm

alexifm commented Jun 30, 2019

Hey, I wanted to follow up on this and show a tqdm version for dask distributed. It is based on the progress bars built here: https://github.com/dask/distributed/blob/master/distributed/diagnostics/progressbar.py

import tqdm
from tornado.ioloop import IOLoop
from distributed.utils import LoopRunner, is_kernel
from distributed.client import futures_of
from distributed.diagnostics.progressbar import ProgressBar

class TqdmProgressBar(ProgressBar):
    def __init__(
        self,
        keys,
        scheduler=None,
        interval="100ms",
        loop=None,
        complete=True,
        start=True,
        **tqdm_kwargs
    ):
        super(TqdmProgressBar, self).__init__(
            keys, scheduler, interval, complete)
        self.tqdm = tqdm.tqdm(keys, **tqdm_kwargs)
        self.loop = loop or IOLoop()

        if start:
            loop_runner = LoopRunner(self.loop)
            loop_runner.run_sync(self.listen)

    def _draw_bar(self, remaining, all, **kwargs):
        update_ct = (all - remaining) - self.tqdm.n
        self.tqdm.update(update_ct)

    def _draw_stop(self, **kwargs):
        self.tqdm.close()

class TqdmNotebookProgress(ProgressBar):
    def __init__(
        self,
        keys,
        scheduler=None,
        interval="100ms",
        loop=None,
        complete=True,
        start=True,
        **tqdm_kwargs
    ):
        super(TqdmNotebookProgress, self).__init__(
            keys, scheduler, interval, complete)
        self.tqdm = tqdm.tqdm_notebook(keys, **tqdm_kwargs)
        self.loop = loop or IOLoop()

        if start:
            loop_runner = LoopRunner(self.loop)
            loop_runner.run_sync(self.listen)

    def _draw_bar(self, remaining, all, **kwargs):
        update_ct = (all - remaining) - self.tqdm.n
        self.tqdm.update(update_ct)

    def _draw_stop(self, **kwargs):
        self.tqdm.close()

def tqdm_dask(futures, **kwargs):
    # Use the widget bar inside a Jupyter kernel, the text bar elsewhere
    notebook = is_kernel()
    futures = futures_of(futures)
    if not isinstance(futures, (set, list)):
        futures = [futures]
    if notebook:
        return TqdmNotebookProgress(futures, **kwargs)
    else:
        return TqdmProgressBar(futures, **kwargs)

If you run in a notebook, you'll get the widget version.

from dask.distributed import Client, LocalCluster

def square(x):
    return x ** 2

def test_progress(n):
    A = client.map(square, range(n))
    TqdmProgressBar(A)
    results = client.gather(A)
    return results

def test_progress_notebook(n):
    A = client.map(square, range(n))
    TqdmNotebookProgress(A)
    results = client.gather(A)
    return results

def test_tqdm_dask(n):
    A = client.map(square, range(n))
    tqdm_dask(A)
    results = client.gather(A)
    return results

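if __name__ == "__main__":
    # The guard stops LocalCluster's worker processes from re-running this
    # setup when they are started with the "spawn" method (e.g. on Windows)
    cluster = LocalCluster()
    client = Client(cluster)

    n = 10_000
    results = test_progress(n)
    results = test_progress_notebook(n)
    results = test_tqdm_dask(n)
    client.close()
    cluster.close()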
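The key line in _draw_bar is update_ct = (all - remaining) - self.tqdm.n: the distributed progress bar receives absolute progress (how many keys remain out of how many) and may receive the same numbers more than once, while tqdm.update() expects an increment. A minimal, dask-free sketch of that conversion; the DeltaBar class and the sequence of reports are hypothetical:

```python
import io

from tqdm import tqdm


class DeltaBar:
    """Hypothetical helper: feeds absolute (remaining, total) reports into tqdm."""

    def __init__(self, total, **tqdm_kwargs):
        self.pbar = tqdm(total=total, **tqdm_kwargs)

    def report(self, remaining, total):
        done = total - remaining
        # tqdm.update() takes an increment, so subtract what is already shown
        self.pbar.update(done - self.pbar.n)

    def close(self):
        self.pbar.close()


# Simulated scheduler reports; the repeated remaining=7 adds nothing
bar = DeltaBar(total=10, file=io.StringIO())  # StringIO keeps the demo quiet
for remaining in (10, 7, 7, 3, 0):
    bar.report(remaining, 10)
final_count = bar.pbar.n
bar.close()
```

Passing the absolute count straight to update() would instead double-count every repeated report.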

@casperdcl casperdcl self-assigned this Jun 30, 2019
@casperdcl casperdcl added p3-enhancement 🔥 Much new such feature submodule ⊂ Periphery/subclasses synchronisation ⇶ Multi-thread/processing to-review 🔍 Awaiting final confirmation labels Jun 30, 2019
@casperdcl casperdcl added this to To Do in Casper Aug 30, 2019
@casperdcl casperdcl mentioned this issue Sep 17, 2019
@casperdcl casperdcl moved this from To Do to In Progress in Casper Sep 17, 2019
@thomasaarholt

Since it wasn't mentioned in this issue, I'd just like to add to the original post that one can now use tqdm.auto.tqdm instead of tqdm.tqdm_notebook, so the code from OP becomes:

from dask.callbacks import Callback
from tqdm.auto import tqdm

class ProgressBar(Callback):
    def _start_state(self, dsk, state):
        # One bar step per task in the graph
        self._tqdm = tqdm(total=sum(len(state[k]) for k in ['ready', 'waiting', 'running', 'finished']))

    def _posttask(self, key, result, dsk, state, worker_id):
        self._tqdm.update(1)

    def _finish(self, dsk, state, errored):
        # Close the bar so the final state is drawn and the line is finished
        self._tqdm.close()

Usage (the same as dask's ProgressBar):

with ProgressBar():
    ... .compute()

which results in the lovely widget-based progress bar in the notebook, and the standard one in the command line.

@Eliran-Turgeman

@thomasaarholt Is there a way to support a custom description for this ProgressBar callback?

@thomasaarholt

Sure! Just modify the tqdm(...) command to be tqdm(..., desc='your description'). See the docs for lots more options.

@Eliran-Turgeman

@thomasaarholt I meant passing a description as an argument to
with ProgressBar():
rather than adding some fixed text as the description, like this:

def _start_state(self, dsk, state):
    self._tqdm = tqdm(total=sum(len(state[k]) for k in ['ready', 'waiting', 'running', 'finished']), desc="some description")

I would like every context manager to have its own description; how can I modify the ProgressBar class to allow such functionality?

For example:

with ProgressBar(desc="doing some stuff"):
    ...  # do some stuff

with ProgressBar(desc="doing other stuff"):
    ...  # do other stuff

@casperdcl casperdcl added this to the Non-breaking milestone Nov 24, 2020
@casperdcl casperdcl added to-merge ↰ Imminent and removed to-review 🔍 Awaiting final confirmation labels Nov 24, 2020
@thomasaarholt

Ah, sure! I'm not able to check right now, but I reckon that if you add an __init__ method like

def __init__(self, desc=""):
    super().__init__()
    self.desc = desc

and, inside tqdm(), set desc=self.desc, your examples should work. Let me know if it does.
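Putting that suggestion together, here is a minimal sketch (assuming dask's local scheduler and tqdm.auto) in which each context manager gets its own description. It goes one step further and forwards any extra keyword arguments to tqdm, so unit, leave and so on also work; that forwarding is an illustrative choice, not something settled in this thread:

```python
import dask.array as da
from dask.callbacks import Callback
from tqdm.auto import tqdm


class ProgressBar(Callback):
    def __init__(self, desc="", **tqdm_kwargs):
        super().__init__()
        # Stored now, used when dask actually starts the computation
        self.tqdm_kwargs = dict(tqdm_kwargs, desc=desc)

    def _start_state(self, dsk, state):
        total = sum(len(state[k]) for k in ["ready", "waiting", "running", "finished"])
        self._tqdm = tqdm(total=total, **self.tqdm_kwargs)

    def _posttask(self, key, result, dsk, state, worker_id):
        self._tqdm.update(1)

    def _finish(self, dsk, state, errored):
        self._tqdm.close()


x = da.ones((1000,), chunks=100)

with ProgressBar(desc="doing some stuff") as pb1:
    first = x.sum().compute()

with ProgressBar(desc="doing other stuff", unit="task") as pb2:
    second = (x + 1).sum().compute()
```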

casperdcl added a commit that referenced this issue Nov 24, 2020
- fixes #278
- replaces/closes #279
- based on `tqdm.keras`
@casperdcl casperdcl mentioned this issue Nov 24, 2020
@casperdcl
Sponsor Member

Try out #1079 (tqdm.dask); it uses the same approach as tqdm.keras, including the recent fixes.
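For readers arriving later: the module from #1079 is tqdm.dask, and the class it exposes is TqdmCallback rather than a ProgressBar. A minimal usage sketch, assuming a tqdm release that ships tqdm.dask plus an installed dask (the array is made-up example data). Extra keyword arguments such as desc are forwarded to tqdm, and because TqdmCallback is a regular dask Callback it can also be registered globally:

```python
import dask.array as da
from tqdm.dask import TqdmCallback

x = da.ones((1000, 1000), chunks=(100, 100))  # 100 chunks

# Scoped, like dask.diagnostics.ProgressBar; desc is passed on to tqdm
with TqdmCallback(desc="sum"):
    total = x.sum().compute()

# Global registration: every compute() reports progress until unregistered
cb = TqdmCallback(desc="global")
cb.register()
try:
    mean = x.mean().compute()
finally:
    cb.unregister()
```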

@deschman

deschman commented Feb 22, 2021

I have been working with #1079 and have found it is not compatible with the most recent dask release 2021.2.0.

  File "c:\users\deschman\spyder-env\lib\site-packages\dask\base.py", line 281, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "c:\users\deschman\spyder-env\lib\site-packages\dask\base.py", line 563, in compute
    results = schedule(dsk, keys, **kwargs)
  File "c:\users\deschman\spyder-env\lib\site-packages\dask\threaded.py", line 76, in get
    results = get_async(
  File "c:\users\deschman\spyder-env\lib\site-packages\dask\local.py", line 502, in get_async
    finish(dsk, state, not succeeded)
  File "c:\users\deschman\spyder-env\lib\site-packages\reportio\future\tqdm\dask.py", line 37, in _finish
    self.pbar.close()
AttributeError: 'TqdmCallback' object has no attribute 'pbar'

casperdcl added a commit that referenced this issue Feb 22, 2021
casperdcl added a commit that referenced this issue Mar 5, 2021
Casper automation moved this from In Progress to Done Mar 5, 2021
@thomasaarholt

I came back to this after improving a StackOverflow post asking the same. The following is a working solution for a tqdm-based dask progress bar:

from dask.callbacks import Callback
from tqdm.auto import tqdm

class ProgressBar(Callback):
    def __init__(self, desc=""):
        super().__init__()
        self.desc = desc

    def _start_state(self, dsk, state):
        self._tqdm = tqdm(total=sum(len(state[k]) for k in ['ready', 'waiting', 'running', 'finished']), desc=self.desc)

    def _posttask(self, key, result, dsk, state, worker_id):
        self._tqdm.update(1)

    def _finish(self, dsk, state, errored):
        # Close the bar once the computation ends (also on error)
        self._tqdm.close()

Use it as:

with ProgressBar("your description"):
    arr.compute() # your Dask computation here

@casperdcl
Sponsor Member

@thomasaarholt

Ah! Brilliant, I will use that instead!

@dendihandian

How can I use ProgressBar with apply(), e.g. result_df = raw_df.apply(lambda row: process_row(row), axis=1).compute()?

Projects
Casper: Done

10 participants