Skip to content

Commit

Permalink
Fix overlap so that set_index doesn't drop rows (#9423)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsignell committed Sep 15, 2022
1 parent 982376e commit f45df2b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
10 changes: 6 additions & 4 deletions dask/dataframe/shuffle.py
Expand Up @@ -1029,10 +1029,12 @@ def fix_overlap(ddf, mins, maxes, lens):
# this partition (i) if the data from this partition will need to be moved
# to the next partition (i+1) anyway. If we concatenate data too early,
# we may lose rows (https://github.com/dask/dask/issues/6972).
if i == len(mins) - 2 or divisions[i] != divisions[i + 1]:
frames.append(ddf_keys[i])
dsk[(name, i)] = (methods.concat, frames)
frames = []
if divisions[i] == divisions[i + 1] and i + 1 in overlap:
continue

frames.append(ddf_keys[i])
dsk[(name, i)] = (methods.concat, frames)
frames = []

graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf])
return new_dd_object(graph, name, ddf._meta, divisions)
Expand Down
12 changes: 12 additions & 0 deletions dask/dataframe/tests/test_multi.py
Expand Up @@ -713,6 +713,18 @@ def test_merge_asof_on_left_right(left_col, right_col):
assert_eq(result_df, result_dd, check_index=False)


def test_merge_asof_with_various_npartitions():
# https://github.com/dask/dask/issues/8999
df = pd.DataFrame(dict(ts=[pd.to_datetime("1-1-2020")] * 3, foo=[1, 2, 3]))
expected = pd.merge_asof(left=df, right=df, on="ts")

for npartitions in range(1, 5):
ddf = dd.from_pandas(df, npartitions=npartitions)

result = dd.merge_asof(left=ddf, right=ddf, on="ts")
assert_eq(expected, result)


@pytest.mark.parametrize("join", ["inner", "outer"])
def test_indexed_concat(join):
A = pd.DataFrame(
Expand Down
11 changes: 11 additions & 0 deletions dask/dataframe/tests/test_shuffle.py
Expand Up @@ -1182,6 +1182,17 @@ def test_set_index_overlap_2():
assert ddf2.npartitions == 8


def test_set_index_overlap_does_not_drop_rows_when_divisions_overlap():
# https://github.com/dask/dask/issues/9339
df = pd.DataFrame({"ts": [1, 1, 2, 2, 3, 3, 3, 3], "value": "abc"})
ddf = dd.from_pandas(df, npartitions=3)

expected = df.set_index("ts")
actual = ddf.set_index("ts", sorted=True)

assert_eq(expected, actual)


def test_compute_current_divisions_nan_partition():
# Compute divisions 1 null partition
a = d[d.a > 3].sort_values("a")
Expand Down

0 comments on commit f45df2b

Please sign in to comment.