diff --git a/dask/dataframe/shuffle.py b/dask/dataframe/shuffle.py index 7b229a06666..0c359e2ee24 100644 --- a/dask/dataframe/shuffle.py +++ b/dask/dataframe/shuffle.py @@ -1029,10 +1029,12 @@ def fix_overlap(ddf, mins, maxes, lens): # this partition (i) if the data from this partition will need to be moved # to the next partition (i+1) anyway. If we concatenate data too early, # we may lose rows (https://github.com/dask/dask/issues/6972). - if i == len(mins) - 2 or divisions[i] != divisions[i + 1]: - frames.append(ddf_keys[i]) - dsk[(name, i)] = (methods.concat, frames) - frames = [] + if divisions[i] == divisions[i + 1] and i + 1 in overlap: + continue + + frames.append(ddf_keys[i]) + dsk[(name, i)] = (methods.concat, frames) + frames = [] graph = HighLevelGraph.from_collections(name, dsk, dependencies=[ddf]) return new_dd_object(graph, name, ddf._meta, divisions) diff --git a/dask/dataframe/tests/test_multi.py b/dask/dataframe/tests/test_multi.py index b5b8d26ecf7..bde70f34ecd 100644 --- a/dask/dataframe/tests/test_multi.py +++ b/dask/dataframe/tests/test_multi.py @@ -712,6 +712,18 @@ def test_merge_asof_on_left_right(left_col, right_col): assert_eq(result_df, result_dd, check_index=False) +def test_merge_asof_with_various_npartitions(): + # https://github.com/dask/dask/issues/8999 + df = pd.DataFrame(dict(ts=[pd.to_datetime("1-1-2020")] * 3, foo=[1, 2, 3])) + expected = pd.merge_asof(left=df, right=df, on="ts") + + for npartitions in range(1, 5): + ddf = dd.from_pandas(df, npartitions=npartitions) + + result = dd.merge_asof(left=ddf, right=ddf, on="ts") + assert_eq(expected, result) + + @pytest.mark.parametrize("join", ["inner", "outer"]) def test_indexed_concat(join): A = pd.DataFrame( diff --git a/dask/dataframe/tests/test_shuffle.py b/dask/dataframe/tests/test_shuffle.py index 0c5e75cb104..1b2e53159e4 100644 --- a/dask/dataframe/tests/test_shuffle.py +++ b/dask/dataframe/tests/test_shuffle.py @@ -1182,6 +1182,17 @@ def test_set_index_overlap_2(): assert ddf2.npartitions == 8 +def test_set_index_overlap_does_not_drop_rows_when_divisions_overlap(): + # https://github.com/dask/dask/issues/9339 + df = pd.DataFrame({"ts": [1, 1, 2, 2, 3, 3, 3, 3], "value": "abc"}) + ddf = dd.from_pandas(df, npartitions=3) + + expected = df.set_index("ts") + actual = ddf.set_index("ts", sorted=True) + + assert_eq(expected, actual) + + def test_compute_current_divisions_nan_partition(): # Compute divisions 1 null partition a = d[d.a > 3].sort_values("a")