regression in a custom groupby.agg when split_out is not None #8611
It's probably not a regression in dask itself, but an existing issue with the interaction with geopandas.GeoDataFrame being a special subclass. What changed in #8468 is the code at dask/dask/dataframe/groupby.py Lines 929 to 930 in 08eee6d.

A very simple example that reproduces the underlying issue (without involving groupby / apply_concat_apply):

```python
import geopandas
import dask_geopandas

df = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))
ddf = dask_geopandas.from_geopandas(df, npartitions=4)

def func(df):
    result = {}
    for col in df.columns:
        result["prefix-" + col] = df[col]
    # naively recreating the class, without accounting for the renamed geometry column
    return df.__class__(result)
```
```python
>>> res = ddf.map_partitions(func)
>>> res
Dask GeoDataFrame Structure:
              prefix-pop_est prefix-continent prefix-name prefix-iso_a3 prefix-gdp_md_est prefix-geometry
npartitions=4
0                      int64           object      object        object           float64        geometry
45                       ...              ...         ...           ...               ...             ...
90                       ...              ...         ...           ...               ...             ...
135                      ...              ...         ...           ...               ...             ...
176                      ...              ...         ...           ...               ...             ...
Dask Name: func, 8 tasks
>>> res._meta
Empty GeoDataFrame
Columns: [prefix-pop_est, prefix-continent, prefix-name, prefix-iso_a3, prefix-gdp_md_est, prefix-geometry]
Index: []
>>> res._meta._geometry_column_name
'geometry'
```

So we get a dask_geopandas GeoDataFrame where the `_geometry_column_name` ("geometry") no longer matches any of the (renamed) columns. Since dask cannot know that it should handle GeoDataFrame construction in a special way inside the user-provided function, this mismatch is hard to avoid on the dask side. |
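The failure mode can be illustrated without geopandas at all: any pandas.DataFrame subclass carrying extra metadata loses it when naively rebuilt from a dict, as `func` above does. The `TaggedFrame` class and its `special_col` attribute below are hypothetical stand-ins for GeoDataFrame and `_geometry_column_name`:

```python
import pandas as pd

class TaggedFrame(pd.DataFrame):
    # custom attribute, analogous to GeoDataFrame._geometry_column_name
    _metadata = ["special_col"]

    @property
    def _constructor(self):
        return TaggedFrame

df = TaggedFrame({"a": [1, 2], "b": [3, 4]})
df.special_col = "a"

# naively rebuild from a dict of renamed columns, as func() above does;
# the fresh constructor call never sees the old object's metadata
renamed = df.__class__({"prefix-" + c: df[c] for c in df.columns})

print(getattr(renamed, "special_col", None))  # None: the tag is lost
```

pandas only propagates `_metadata` attributes through `__finalize__`, not through a plain constructor call, which is why the rebuilt object comes back without the tag.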
Thanks for investigating this @jorisvandenbossche! Feel free to ping me if we need changes on the Dask side |
Maybe a general question about the groupby logic in dask: why is it needed to rename the columns (with a prefix for the task and a hash suffix) in the intermediary objects? |
The renaming seems to happen at dask/dask/dataframe/groupby.py Lines 574 to 575 in 08eee6d. If I naively change that to return the original column names … |
We have discussed this issue with @jorisvandenbossche and it is unlikely that we will be able to fix it on geopandas or dask-geopandas side.
I am not certain what has changed in dask now, but it seems that until #8468 the renaming either did not happen or happened in a way that did not break this link between the column and `_geometry_column_name`. |
Is this a regression we need to fix or revert before releasing (dask/community#216)? cc @jrbourbeau @martinfleis @jorisvandenbossche, do you have a sense of what the fix would be? Not renaming the columns in groupby seems like one idea; I haven't checked yet to understand why we do that. But I get the sense that the more robust fix would be to use something more advanced than dask/dask/dataframe/groupby.py Lines 929 to 930 in 08eee6d. |
Even if we stopped renaming the columns, if a user was using a geometry column that wasn't named "geometry", wouldn't the same problem occur? |
It probably would. When the GeoDataFrame is constructed from a dict, as in this case, geopandas either checks for a column named "geometry" or needs the geometry column to be specified explicitly. I think that we would need to pass a custom constructor that would get the information on the geometry column from the meta. |
Hmm, I am having a hard time figuring out how #8468 (or setting split_out) ended up changing behavior like this. I'm really hoping to find exactly what changed in #8468, but it also seems like the underlying assumption here was fragile to begin with. |
@rjzamora It had to be some commit merged on 22/1, and #8468 seems to be the only candidate.
I am not sure what you mean, but we need to use it; we can't just raise an informative error. The whole point of dask-geopandas is that it enables geospatial operations, and that needs to include aggregations. The invalid object is not created by the user; it is created by dask, and the user has no way of influencing it as far as I can tell.
The actual aggregation of the column is not necessarily problematic. If you get a GeoSeries, you no longer have the issue with the unset `_geometry_column_name`. dask-geopandas registers its non-empty meta creation as:

```python
@meta_nonempty.register(geopandas.GeoDataFrame)
def _nonempty_geodataframe(x):
    df = meta_nonempty_dataframe(x)
    return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
```

Once dask calls this after it has renamed the columns, the `geometry=x._geometry_column_name` lookup fails because no column with that name exists anymore. |
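For readers unfamiliar with the mechanism: `meta_nonempty` is a type-based dispatch registry, so registering a handler for GeoDataFrame (as above) routes all non-empty-meta creation for that type through dask-geopandas. The pattern can be sketched with the stdlib's `functools.singledispatch`; this is a simplified stand-in, not dask's actual `Dispatch` class:

```python
from functools import singledispatch

import pandas as pd

@singledispatch
def nonempty_meta(x):
    # fallback when no handler is registered for type(x)
    raise TypeError(f"no non-empty meta handler for {type(x).__name__}")

@nonempty_meta.register(pd.DataFrame)
def _nonempty_dataframe(x):
    # fill each column of the empty meta frame with two placeholder rows,
    # mirroring in spirit what dask does for plain DataFrames
    return pd.DataFrame({col: [1, 1] for col in x.columns})

meta = pd.DataFrame({"a": [], "b": []})
print(nonempty_meta(meta).shape)  # (2, 2)
```

Because dispatch is purely type-driven, the handler has no way of knowing whether the meta it receives still has consistent internal metadata, which is exactly the trap hit here.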
Ah - I was completely focused on the |
It seems like the problem may only show up when a "non-empty" metadata object needs to be generated by geopandas. I experimented with some quick things in ACA, and it seems like a branch that avoids the need for non-empty metadata in the split_out path fixes this example. |
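For context on why `split_out` triggers this path: with `split_out > 1`, each intermediate chunk is split into shards by hashing the group keys, and inferring the meta for that extra step is where the non-empty metadata gets created. A simplified, hypothetical sketch of such hash sharding (not dask's actual `hash_shard` implementation):

```python
import pandas as pd

def hash_shard(df, nparts, on):
    """Route each row of df to one of `nparts` shards by hashing column `on`."""
    codes = pd.util.hash_pandas_object(df[on], index=False) % nparts
    return {i: df[codes == i] for i in range(nparts)}

df = pd.DataFrame(
    {"continent": ["Africa", "Asia", "Africa", "Europe"], "pop": [1, 2, 3, 4]}
)
shards = hash_shard(df, 2, "continent")
# every row lands in exactly one shard, and equal keys always land together
```

Equal keys hash identically, so each group ends up whole in a single shard; that is what lets the final aggregation run shard-by-shard instead of requiring a full shuffle to one partition.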
@rjzamora |
Given your last comments it might now be clear to you, but just to reiterate in general, my understanding is that the renamed columns make the meta an invalid GeoDataFrame, and the non-empty meta creation is what actually raises. So indeed, if we could avoid the use of `meta_nonempty` here, that would avoid the error.

While writing this up, I also thought of a potential solution on the dask-geopandas side, being a bit more flexible in creating the meta-nonempty:

```diff
--- a/dask_geopandas/backends.py
+++ b/dask_geopandas/backends.py
@@ -56,8 +56,12 @@ def _nonempty_geoseries(x, idx=None):
 @meta_nonempty.register(geopandas.GeoDataFrame)
 def _nonempty_geodataframe(x):
     df = meta_nonempty_dataframe(x)
-    return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
+    try:
+        return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
+    except:
+        return geopandas.GeoDataFrame(df)
```

This also "solves" the issue for this specific case. I am only not sure whether it would give problems in other cases. |
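A narrower variant of that fallback idea, sketched here with a hypothetical `MiniGeoFrame` stand-in rather than geopandas itself: catching only the `ValueError` the constructor raises for an unknown geometry column avoids masking unrelated errors the way a bare `except:` would.

```python
def rebuild_with_fallback(cls, data, geometry=None):
    """Try the metadata-preserving constructor; fall back to the plain one."""
    try:
        return cls(data, geometry=geometry)
    except ValueError:
        # the stored geometry column name no longer matches the (renamed)
        # columns; rebuild without it instead of failing meta creation
        return cls(data)

class MiniGeoFrame:
    # hypothetical stand-in mimicking GeoDataFrame's constructor check
    def __init__(self, data, geometry=None):
        if geometry is not None and geometry not in data:
            raise ValueError(f"Unknown column {geometry}")
        self.data = data
        self.geometry = geometry

frame = rebuild_with_fallback(MiniGeoFrame, {"prefix-geometry": [1]}, geometry="geometry")
print(frame.geometry)  # None: fell back to the plain constructor
```

The trade-off noted in the comment above still applies: the fallback silently drops the geometry designation, so downstream code must be prepared for a frame without an active geometry column.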
@jorisvandenbossche I think that the solution avoiding the call to `meta_nonempty` would be cleaner. |
@jorisvandenbossche - As far as I can tell, your understanding is 100% correct. I'm sorry I didn't get a chance to explain in detail, and appreciate you writing everything up! I think it makes sense to "fix" this issue by either avoiding the call to meta_nonempty when the output will be invalid, or by making the meta_nonempty dispatch in dask-geopandas more forgiving. |
What happened:
It seems that the changes in #8468 broke custom aggregation in `dask_geopandas`, where we implement our own function to be able to aggregate geometries. When specifying `split_out` in the `.agg` method, it currently raises an error. With `split_out=None`, it is all fine, so I think that this is an issue on the dask side, not on the dask_geopandas side.

What you expected to happen:
I expect that the aggregation will work no matter if I specify `split_out` or not.

Minimal Complete Verifiable Example:
This example requires `geopandas` and `dask_geopandas`; I wasn't able to craft one with pure dask.

Traceback:
```
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/_libs/index.pyx in pandas._libs.index.IndexEngine.get_loc()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

pandas/_libs/hashtable_class_helper.pxi in pandas._libs.hashtable.PyObjectHashTable.get_item()

KeyError: 'geometry'

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
~/Git/geopandas/geopandas/geodataframe.py in set_geometry(self, col, drop, inplace, crs)
    310         try:
--> 311             level = frame[col]
    312         except KeyError:

~/Git/geopandas/geopandas/geodataframe.py in __getitem__(self, key)
   1349         """
-> 1350         result = super().__getitem__(key)
   1351         geo_col = self._geometry_column_name

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/core/frame.py in __getitem__(self, key)
   3457             return self._getitem_multilevel(key)
-> 3458         indexer = self.columns.get_loc(key)
   3459         if is_integer(indexer):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/pandas/core/indexes/base.py in get_loc(self, key, method, tolerance)
   3362             except KeyError as err:
-> 3363                 raise KeyError(key) from err
   3364

KeyError: 'geometry'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    174     try:
--> 175         yield
    176     except Exception as e:

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _emulate(func, udf, args, **kwargs)
   5965     with raise_on_meta_error(funcname(func), udf=udf):
-> 5966         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   5967

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   5945     elif isinstance(x, tuple):
-> 5946         return tuple(_extract_meta(_x, nonempty) for _x in x)
   5947     elif isinstance(x, dict):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in <genexpr>(.0)
   5945     elif isinstance(x, tuple):
-> 5946         return tuple(_extract_meta(_x, nonempty) for _x in x)
   5947     elif isinstance(x, dict):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _extract_meta(x, nonempty)
   5941     if isinstance(x, (Scalar, _Frame)):
-> 5942         return x._meta_nonempty if nonempty else x._meta
   5943     elif isinstance(x, list):

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in _meta_nonempty(self)
    430         """A non-empty version of ``_meta`` with fake data."""
--> 431         return meta_nonempty(self._meta)
    432

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py in __call__(self, arg, *args, **kwargs)
    623         meth = self.dispatch(type(arg))
--> 624         return meth(arg, *args, **kwargs)
    625

~/Git/dask-geopandas/dask_geopandas/backends.py in _nonempty_geodataframe(x)
     59     df = meta_nonempty_dataframe(x)
---> 60     return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
     61

~/Git/geopandas/geopandas/geodataframe.py in __init__(self, data, geometry, crs, *args, **kwargs)
    203             # TODO: raise error in 0.9 or 0.10.
--> 204             self.set_geometry(geometry, inplace=True)
    205

~/Git/geopandas/geopandas/geodataframe.py in set_geometry(self, col, drop, inplace, crs)
    312         except KeyError:
--> 313             raise ValueError("Unknown column %s" % col)
    314         except Exception:

ValueError: Unknown column geometry

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
/var/folders/2f/fhks6w_d0k556plcv3rfmshw0000gn/T/ipykernel_44916/2101873994.py in <module>
     12 data_agg = {col: aggfunc for col in ddf.columns.drop([ddf.geometry.name])}
     13 data_agg[ddf.geometry.name] = merge_geometries
---> 14 aggregated = ddf.groupby(by="continent").agg(data_agg, split_out=2)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in agg(self, arg, split_every, split_out)
   2006     @derived_from(pd.core.groupby.DataFrameGroupBy)
   2007     def agg(self, arg, split_every=None, split_out=1):
-> 2008         return self.aggregate(arg, split_every=split_every, split_out=split_out)
   2009
   2010

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   2002             return self.size()
   2003
-> 2004         return super().aggregate(arg, split_every=split_every, split_out=split_out)
   2005
   2006     @derived_from(pd.core.groupby.DataFrameGroupBy)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/groupby.py in aggregate(self, arg, split_every, split_out)
   1619         )
   1620
-> 1621         return aca(
   1622             chunk_args,
   1623             chunk=_groupby_apply_funcs,

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in apply_concat_apply(args, chunk, aggregate, combine, meta, token, chunk_kwargs, aggregate_kwargs, combine_kwargs, split_every, split_out, split_out_setup, split_out_setup_kwargs, sort, ignore_index, **kwargs)
   5878     # Blockwise Split Layer
   5879     if split_out and split_out > 1:
-> 5880         chunked = chunked.map_partitions(
   5881             hash_shard,
   5882             split_out,

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
    769         None as the division.
    770         """
--> 771         return map_partitions(func, self, *args, **kwargs)
    772
    773     @insert_meta_param_description(pad=12)

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py in map_partitions(func, meta, enforce_metadata, transform_divisions, align_dataframes, *args, **kwargs)
   6033         # Use non-normalized kwargs here, as we want the real values (not
   6034         # delayed values)
-> 6035         meta = _emulate(func, *args, udf=True, **kwargs)
   6036     else:
   6037         meta = make_meta(meta, index=meta_index, parent_meta=parent_meta)

~/mambaforge/envs/geo_dev/lib/python3.9/contextlib.py in __exit__(self, typ, value, traceback)
    135                 value = typ()
    136             try:
--> 137                 self.gen.throw(typ, value, traceback)
    138             except StopIteration as exc:
    139                 # Suppress StopIteration unless it's the same exception that

~/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    194         )
    195         msg = msg.format(f" in {funcname}" if funcname else "", repr(e), tb)
--> 196         raise ValueError(msg) from e
    197
    198

ValueError: Metadata inference failed in `hash_shard`.

You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
ValueError('Unknown column geometry')

Traceback:
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/utils.py", line 175, in raise_on_meta_error
    yield
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5966, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5946, in _extract_meta
    return tuple(_extract_meta(_x, nonempty) for _x in x)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5946, in <genexpr>
    return tuple(_extract_meta(_x, nonempty) for _x in x)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 5942, in _extract_meta
    return x._meta_nonempty if nonempty else x._meta
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/dataframe/core.py", line 431, in _meta_nonempty
    return meta_nonempty(self._meta)
  File "/Users/martin/mambaforge/envs/geo_dev/lib/python3.9/site-packages/dask/utils.py", line 624, in __call__
    return meth(arg, *args, **kwargs)
  File "/Users/martin/Git/dask-geopandas/dask_geopandas/backends.py", line 60, in _nonempty_geodataframe
    return geopandas.GeoDataFrame(df, geometry=x._geometry_column_name, crs=x.crs)
  File "/Users/martin/Git/geopandas/geopandas/geodataframe.py", line 204, in __init__
    self.set_geometry(geometry, inplace=True)
  File "/Users/martin/Git/geopandas/geopandas/geodataframe.py", line 313, in set_geometry
    raise ValueError("Unknown column %s" % col)
```