
Feedback - DataFrame query planning #10995

Open
fjetter opened this issue Mar 12, 2024 · 12 comments
Labels
dask-expr dataframe discussion Discussing a topic with no specific actions yet

Comments

@fjetter
Member

fjetter commented Mar 12, 2024

The latest release 2024.3.0 enabled query planning for DataFrames by default. This issue can be used to report feedback and ask related questions.

If you encounter a bug or unexpected behavior, please first check whether you have the most recent version of dask-expr installed. It is a separate package with a decoupled release process, which allows us to roll out hotfixes quickly.
If you are still encountering issues after updating, please open an issue with a reproducer and we will respond as soon as possible.

See below for a list of known issues, and/or check the issue tracker.

Brief introduction

The legacy DataFrame implementation did not offer a way to optimize your query: Dask executed whatever you requested, literally. In many situations this was suboptimal and could cause significant performance overhead.

Let's take a simple example using the NYC Uber/Lyft dataset:

import dask.dataframe as dd

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    filesystem="pyarrow",
)
df = df[df.hvfhs_license_num == "HV0003"]
result = df.sum(numeric_only=True)["tips"]

This query loads the data, applies a filter on the vendor, calculates a sum, and picks a single column for the result. The legacy DataFrame would load all of the data, only then apply the filter, compute the sum over all columns, and finally discard everything but the single column we're interested in.

Advanced users may be tempted to rewrite this into an optimized version that passes the required columns and filters to the read_parquet call directly, but this is now done automatically for you. After optimization, the above query is equivalent to

df_man_opt = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    columns=["tips"],
    filters=[("hvfhs_license_num", "==", "HV0003")],
)
result_man_opt = df_man_opt.sum(numeric_only=True)

which loads only the required columns from storage and applies the filters even at the row-group level, where applicable.

result.simplify().pprint()

Projection: columns='tips'
  Sum: numeric_only=True
    ReadParquetPyarrowFS: path='s3://coiled-datasets/uber-lyft-tlc/' columns=['tips'] filters=[[('hvfhs_license_num', '==', 'HV0003')]] filesystem='pyarrow'

Column projection and predicate pushdown are just two of the more obvious optimizations that are performed automatically for you.

See for yourself and let us know what you think.

Known issues

dask-expr is not installed when updating dask

When using a package manager like pip, an upgrade such as pip install --upgrade dask may not pull in the extra dependencies of dask.dataframe. To ensure all dependencies are installed, please install dask[dataframe] or use conda install dask.
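If you are not sure whether the backend came along with the upgrade, a minimal check like the following (just a sketch) shows which versions are actually importable:

import dask

print("dask:", dask.__version__)

try:
    import dask_expr  # the query-planning backend, shipped as the separate "dask-expr" package
    print("dask-expr:", dask_expr.__version__)
except ImportError:
    print("dask-expr is missing; install dask[dataframe] or use conda install dask")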

Pandas copy-on-write enabled as an import side effect

Prior to dask-expr==1.0.2, importing dask.dataframe set the pandas copy-on-write option to True.

See also #10996
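If you are stuck on an affected version for a while, a possible workaround (a sketch, assuming you want the pandas default behavior back) is to reset the option right after the import:

import pandas as pd
import dask.dataframe as dd  # affected dask-expr versions flip the pandas option during this import

# Restore the pandas default after the import side effect
pd.set_option("mode.copy_on_write", False)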

@thomas-fred

thomas-fred commented Mar 15, 2024

Hello, I'm wondering if I've stumbled into a bug or a usage change when updating from 2024.2.0 to 2024.3.0. With the former, the code below works and indices is bound to a list of index values. With the latter, I get an empty list.

import dask.dataframe

df = dask.dataframe.read_parquet(<file_path>)
indices = list(set(df.index))

The strange thing is that with 2024.3.0, the following seems to know about the index values; look at the repr string!

(Pdb) df.index.compute()
Empty DataFrame
Columns: []
Index: [2007345N18298, <truncated>, 2022255N15324]

But when I actually try to access those values ...

(Pdb) df.index.compute().values
array([], shape=(10, 0), dtype=float64)

Here's the data I'm trying to read as zipped parquet: test_data.zip

I installed both dask versions using micromamba 1.5.7.

@jackguac

jackguac commented Mar 21, 2024

Using drop(col, axis=1) drops any column whose name begins with the prefix col, but everything is fine when using drop(columns=col), and I think it was also fine on 2024.2.1. Here's an example:

import dask.dataframe as dd
import pandas as pd

# Create a range of dates
dates = pd.date_range(start='2024-01-01', periods=9, freq='D')

# Create a Pandas DataFrame with a series of numbers including NaN values and a datetime index
pdf = pd.DataFrame({
    'n1': [1, 1, 2, 2, 2, 2, 2, 2, 2],
    'n2': [1, pd.NA, 1, 5, pd.NA, 7, 8, 9, 10],
}, index=dates)

# Convert the Pandas DataFrame to a Dask DataFrame
ddf = dd.from_pandas(pdf, npartitions=2)

copy = ddf.copy().rename(columns={"n2": "n1_new"})
ddf = ddf.merge(copy, on=["n1"], how="left")

# THIS ONE REMOVES BOTH n1_new and n1:
ddf = ddf.drop("n1_new", axis=1) 
# THIS ONE (CORRECTLY) REMOVES JUST n1_new: 
# ddf = ddf.drop(columns=["n1_new"])

ddf.head()

@fjetter
Member Author

fjetter commented Mar 22, 2024

@thomas-fred I would consider it best practice to have an example prepared that does not involve sharing the actual parquet file, regardless of how small it is. Parquet readers have been vulnerable to arbitrary code execution in the past. While the known issues have been fixed (see https://www.cve.org/CVERecord?id=CVE-2023-47248), it still requires a little trust to load a file from an otherwise unknown source.

I tried reproducing what you're describing with

import dask
import dask.dataframe as dd
import pandas as pd
pdf = pd.DataFrame({"foo": range(5)}, index=range(50, 55))
pdf.to_parquet("test.parquet")
dd.read_parquet("test.parquet").index.compute()

and encountered this error: dask/dask-expr#993

Can you attempt to recreate your issue this way?

@fjetter
Member Author

fjetter commented Mar 22, 2024

@jackguac I have a fix for this up here: dask/dask-expr#992

@benrutter
Contributor

benrutter commented Mar 22, 2024

General feedback: I'm really excited about dask getting a query optimiser, but it seems like there are still a few sharp edges.

We (my workplace) have a library with a bunch of data pipeline functions, some of which are fairly involved, and all of which use dask.

I trialled bumping up the dask version today and our test suite hit two (previously passing) failures before hanging indefinitely.

I've raised a couple of bugs for the issues I can recreate; I'll keep digging into the specifics and raise any bugs I see (and put in a PR if I track down the specific cause). In the meantime, we're setting query planning to false (roughly as shown below) and will check in again with the next dask version.
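For anyone wanting to do the same, this is roughly what we use to switch it off (a sketch; the config has to be set before dask.dataframe is imported):

import dask

# Must run before dask.dataframe is imported, otherwise it has no effect
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # falls back to the legacy (non-expression) implementation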

Hope this is useful feedback btw, very excited about dask expressions becoming a thing.

@thomas-fred

thomas-fred commented Mar 25, 2024

Hello @fjetter, thanks for getting back so promptly. I didn't know about arbitrary code execution issues with parquet files. I knew that my example wasn't ideal, but I also wanted to post something before I stopped work for the weekend. Anyway, I have played with your example and can reproduce your ZeroDivisionError.

I think the specific problem I ran into concerns accessing the values of a string index:

import dask.dataframe as dd
import pandas as pd
pdf = pd.DataFrame({"foo": range(3)}, index=["a", "b", "c"])
pdf.to_parquet("test.pq")
df = dd.read_parquet("test.pq")
print(f"{df.index.compute()=}")
print(f"{df.index.compute().values=}")

@phofl
Collaborator

phofl commented Mar 25, 2024

Hi @thomas-fred

Thanks for providing the reproducer. We have a fix here: dask/dask-expr#1000

@avriiil
Contributor

avriiil commented Apr 18, 2024

Hey folks :) I'm getting a 404 error when trying to access the dask-expr docs linked in the changelog. Is there somewhere else I should be looking for the API reference?

@phofl
Collaborator

phofl commented Apr 18, 2024

Sorry about that, we switched the link: https://docs.dask.org/en/stable/dataframe-api.html

@kritikakshirsagar03

kritikakshirsagar03 commented Apr 29, 2024

Here is the documentation feedback for Dask DataFrame query planning:
Documentation_Feedback.pdf

1.1 Reading the dataset with Dask and obtaining the time it takes to query the “retail value” column from the dataset:
import dask.dataframe as dd
import os
import time

# Function to measure the time taken to query the 'RetailValue' column from CSV files with Dask
def query_retail_value(directory, file_names):
    start_time = time.time()
    retail_values = []

    for filename in file_names:
        df = dd.read_csv(os.path.join(directory, filename))

        # Perform a query to get the 'RetailValue' column
        retail_value = df['retailvalue'].compute()

        # Store the retail values in a list
        retail_values.append(retail_value)

    end_time = time.time()
    print("Time taken by Dask to query 'RetailValue' column from all datasets:", end_time - start_time)

    return retail_values

# Directory containing the CSV files
directory = r'C:\Users....\Downloads'

# List of file names
file_names = ['utrechthousingsmall.csv', 'utrechthousinglarge.csv', 'utrechthousinghuge.csv']

# Measure the time taken to query the 'RetailValue' column from CSV files with Dask
retail_values = query_retail_value(directory, file_names)

# Example: print the first 10 retail values from each dataset
for i, values in enumerate(retail_values):
    print(f"Dataset {i+1} Retail Values:")
    print(values.head(10))  # Just printing the first 10 values for demonstration purposes
    print()

1.2 Reading the dataset with pandas and obtaining the time it takes to query the “retail value” column from the dataset:
import pandas as pd
import os
import time

# Function to measure the time taken to query the 'RetailValue' column from CSV files with Pandas
def query_retail_value_with_pandas(directory, file_names):
    start_time = time.time()
    retail_values = []

    for filename in file_names:
        df = pd.read_csv(os.path.join(directory, filename))

        # Perform a query to get the 'RetailValue' column
        retail_value = df['retailvalue']

        # Store the retail values in a list
        retail_values.append(retail_value)

    end_time = time.time()
    print("Time taken by Pandas to query 'RetailValue' column from all datasets:", end_time - start_time)

    return retail_values

# The directory containing the CSV files
directory = r'C:\Users...\Downloads'

# List of file names
file_names = ['utrechthousingsmall.csv', 'utrechthousinglarge.csv', 'utrechthousinghuge.csv']

# Measure the time taken to query the 'RetailValue' column from CSV files with Pandas
retail_values = query_retail_value_with_pandas(directory, file_names)

# Example: print the first 10 retail values from each dataset
for i, values in enumerate(retail_values):
    print(f"Dataset {i+1} Retail Values:")
    print(values.head(10))  # Just printing the first 10 values for demonstration purposes
    print()

@phofl
Collaborator

phofl commented Apr 29, 2024

@kritikakshirsagar03 Could you add a bit more context about what you want to tell us with this?

@zmbc

zmbc commented May 13, 2024

xref dask/dask-expr#1060 -- looks like a fix is already in the works!
