Support for non-dask arrays for HyperbandSearchCV #751
Conversation
Changed the async def _fit function to allow Hyperband to work with non-dask arrays
Thanks for the PR!

Could you also implement a short test for this? The custom data/model doesn't have to be complex, something like:

import numpy as np
from sklearn.base import BaseEstimator

class CustomDataLoader:
    def __len__(self):
        return 100
    ...

class CustomModel(BaseEstimator):
    def _partial_fit(self, X, y=None, **kwargs):
        assert isinstance(X, CustomDataLoader)
        return self

    fit = partial_fit = _partial_fit

    def score(self, X, y=None):
        assert isinstance(X, CustomDataLoader)
        return np.random.uniform()
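For reference, a rough sketch of how such a test might be wired together. This is not from the thread: the test name is hypothetical, CustomModel is assumed to take an alpha hyperparameter in its __init__ so there is something to search over, and the gen_cluster harness from distributed is assumed, as elsewhere in the test suite.

import numpy as np
from distributed.utils_test import gen_cluster
from dask_ml.model_selection import HyperbandSearchCV

@gen_cluster(client=True)
async def test_non_dask_data(c, s, a, b):
    # `alpha` is an assumed hyperparameter of CustomModel, added so
    # HyperbandSearchCV has a parameter space to sample from.
    params = {"alpha": np.logspace(-3, 0, num=10)}
    search = HyperbandSearchCV(CustomModel(), params, max_iter=9)
    await search.fit(CustomDataLoader(), CustomDataLoader())
    assert isinstance(search.best_estimator_, CustomModel)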
train_eg = await client.gather(client.map(len, y_train))
msg = "[CV%s] For training there are between %d and %d examples in each chunk"
logger.info(msg, prefix, min(train_eg), max(train_eg))
Why won't this code work?

train_eg = ...
msg = ...
if len(train_eg):
    logger.info(msg, prefix, min(train_eg), max(train_eg))

This avoids min([]) as mentioned in #748 (comment). Then, I think the if-statement could be moved inside get_futures:

def get_futures(partial_fit_calls):
    if not isinstance(X_train, da.Array):
        return X_train, y_train
    ...  # existing implementation
You are right, I was not careful when I read your comment; that would work.

The reason I did not implement it that way is that checking if len(train_eg): (or, even shorter, if train_eg:) is equivalent to checking isinstance(X_train, da.Array), because train_eg will be empty only if X_train is not a da.Array. This means that with your implementation these lines still execute even when they are not needed:

X_train = sorted(futures_of(X_train), key=lambda f: f.key)
y_train = sorted(futures_of(y_train), key=lambda f: f.key)
assert len(X_train) == len(y_train)
train_eg = await client.gather(client.map(len, y_train))

Finally, the reason it might be better not to move the if-statement inside the get_futures function is that doing so forces the condition to be checked on every call, which is not ideal for dask collections with a large number of partitions.
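For what it's worth, a condensed sketch of the shape being argued for here, pieced together from the quoted lines (it is not the PR's actual diff): hoist the type check out of get_futures so it runs exactly once.

# Decide once, up front, whether the input is a dask array.
if isinstance(X_train, da.Array):
    X_train = sorted(futures_of(X_train), key=lambda f: f.key)
    y_train = sorted(futures_of(y_train), key=lambda f: f.key)
    assert len(X_train) == len(y_train)
    train_eg = await client.gather(client.map(len, y_train))
else:
    train_eg = []

def get_futures(partial_fit_calls):
    # No per-call isinstance check needed: X_train/y_train were
    # already converted to futures above when appropriate.
    ...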
I think the logging message should be issued if the dataset object has an implementation of __len__. The computation of train_eg only happens once (train_eg is a list of ints).

I don't like two separate definitions of get_futures, especially if they both return training data.
train_eg = await client.gather(client.map(len, y_train))
msg = "[CV%s] For training there are between %d and %d examples in each chunk"
logger.info(msg, prefix, min(train_eg), max(train_eg))
if hasattr(X_train, 'npartitions'):
Can this if-statement be changed to if isinstance(X_train, da.Array)?
The reason I don't want to restrict the if-statement to isinstance(X_train, da.Array) is that, potentially, any custom data structure like the hypothetical CustomFrame could have a dask-like API by implementing npartitions and __dask_graph__.

In that case, for example, I (or any other user) could later extend CustomFrame to work with data larger than memory, and it would still work with Hyperband even though it is not a da.Array, because the API is compatible.

With this in mind, maybe a good compromise could be if dask.is_dask_collection(X_train)?
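As a quick illustration of why this check is broader (a sketch, not from the PR; CustomFrame is the hypothetical structure mentioned above): in dask versions of this era, is_dask_collection essentially asks whether __dask_graph__() returns a non-None graph, so any object implementing the protocol passes.

import dask
import dask.array as da
import numpy as np

class CustomFrame:
    # Minimal stand-in: any non-None task graph satisfies the check.
    def __dask_graph__(self):
        return {"x": 1}

print(dask.is_dask_collection(da.ones(10, chunks=5)))  # True
print(dask.is_dask_collection(CustomFrame()))          # True
print(dask.is_dask_collection(np.ones(10)))            # False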
if dask.is_dask_collection(X_train)
I think that's a better choice (though @TomAugspurger might have more input).
Yeah, I think is_dask_collection is better.
Even just testing with a NumPy array or pandas DataFrame would be good.
- changed the array check to use dask.is_dask_collection
- added a test in tests/model_selection/test_hyperband_non_daskarray.py
@stsievert I implemented the changes we discussed. Cheers
@stsievert do you have another chance to look at this?
I'm surprised this Hyperband/Incremental didn't support pandas DataFrames already. Then again, that shouldn't be surprising: dask DataFrames weren't supported until #701, and I have verified that the test in this PR fails on v1.7.0.

This PR LGTM, apart from a couple of style nits and one implementation detail.
# Shuffle blocks going forward to get uniform-but-random access
while partial_fit_calls >= len(order):
    L = list(range(len(X_train)))
    rng.shuffle(L)
    order.extend(L)
j = order[partial_fit_calls]
return X_train[j], y_train[j]
### end addition ###
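A toy, standalone illustration of the shuffling policy in the block above (my own sketch; random.Random stands in for the rng in the real code): order grows one full shuffled round at a time, so every block is visited once per round, but in random order within each round.

import random

rng = random.Random(0)
order = []      # order in which training blocks are visited
n_blocks = 3    # stand-in for len(X_train)

for partial_fit_calls in range(7):
    while partial_fit_calls >= len(order):
        round_ = list(range(n_blocks))
        rng.shuffle(round_)
        order.extend(round_)
    print(partial_fit_calls, "->", order[partial_fit_calls])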
Style nit: could this comment be removed?
@@ -218,13 +220,20 @@ def get_futures(partial_fit_calls):
    This function handles that policy internally, and also controls random
    access to training data.
    """
    if dask.is_dask_collection(y_train):
I think this should only check X_train, because y_train is an optional argument.
I've done some tracing and it appears that passing y=None into search.fit isn't supported? That's unrelated to this PR.
# Order by which we process training data futures
order = []
### start addition ###
Style nit: can this comment be removed?
Hi @stsievert, I agree with you; in principle the check should be done on X_train. EDIT: Done
@gen_cluster(client=True)
def test_pandas(c, s, a, b):
    X, y = make_classification(chunks=100)
    X, y = pd.DataFrame(X.compute()), pd.Series(y.compute())
Looks like the test is failing in the .compute function. Why not use from sklearn.datasets import make_classification?
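For comparison, a sketch of the suggested alternative: sklearn's make_classification returns plain NumPy arrays, so no .compute() call (and no dask collection in the test data) is needed.

import pandas as pd
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=100, n_features=20, random_state=0)
X, y = pd.DataFrame(X), pd.Series(y)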
It's strange, it was working on my local machine. Let me try to change it. Maybe it's something to do with how I wrote the test function; I just tried to emulate the structure I found in test_hyperband.py.
@gen_cluster(client=True)
def test_pandas(c, s, a, b):
Could this test go in test_hyperband.py?
Yes, let me move it there.
…d dask make_classification to sklearn make_classification
Does anyone have any idea of what happened with the tests? Many thanks
I'm not sure, but it does look like there's a linting issue (both black and isort fail). Try making those lint changes and pushing; that will (likely) resolve the issue.
I've fixed the linting issues locally (and substantially reduced the computational size of the test), but the …
The main changes in this PR are:
- a change to the async def _fit function in dask_ml.model_selection._incremental.py to allow Hyperband to work with non-dask arrays
- a fix to the default test_size, which didn't work with pandas DataFrames