Add column projection to dask
EthanRosenthal committed Nov 16, 2022
1 parent df25e79 commit 6b7452e
Showing 8 changed files with 24 additions and 18 deletions.
16 changes: 9 additions & 7 deletions README.md
@@ -2,23 +2,26 @@

A simple benchmark for Python-based data processing libraries.

Guiding Principles:
## Guiding Principles:

- Python Only: I have no intention of using another language to process data. That said, non-Python implementations with Python bindings are perfectly fine (e.g. Polars).
- Single Node: I'm not interested in benchmarking performance on clusters of multiple machines.
- Simple Implementation: I'm not interested in heavily optimizing the benchmarking implementations. My target user is myself: somebody who largely uses pandas but would like to either speed up their computations or work with larger datasets than fit in memory on their machine.


Libraries:
## Libraries:

- ~~[Dask](https://www.dask.org/)~~ Not currently working on my machine (OOM)
- [Dask*](https://www.dask.org/)
- [dask-sql](https://dask-sql.readthedocs.io/en/latest/)
- [DuckDB](https://duckdb.org/)
- [Polars](https://www.pola.rs/)
- [Spark](https://spark.apache.org/docs/latest/api/python/)
- [Vaex](https://vaex.io/)

Results:
\*Dask required a slight, non-beginner optimization to run successfully on my machine. Specifically, I had to do "manual column projection" by passing the relevant calculation columns to the reader when loading the dataset. I consider the Dask results to be slightly cheating, although this hack [may be resolved](https://github.com/dask/dask/issues/7933) natively in Dask in the not-so-distant future.
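
In code, the tweak amounts to passing `columns=` to Dask's parquet reader, e.g. (a minimal sketch; the dataset path is illustrative):

```python
import dask.dataframe as dd

# Read only the two columns the computation needs rather than the full schema.
df = dd.read_parquet(
    "data/*.parquet",  # illustrative path
    index=False,
    columns=["station_id", "num_bikes_available"],
)
df.groupby("station_id")["num_bikes_available"].mean().compute()
```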


## Results:

The following results are from running the benchmark locally on my desktop that has:
- Intel Core i7-7820X 3.6 GHz 8-core Processor (16 virtual cores, I think)
@@ -27,7 +30,7 @@ The following results are from running the benchmark locally on my desktop that

Original 50-partition dataset:

![assets/benchmark.png](assets/benchmark.png)
![assets/benchmark_50.png](assets/benchmark_50.png)

Bakeoff as a function of partitions:

@@ -43,7 +46,7 @@ Since 2016, I have been pinging the public API every 2 minutes and storing the r

For the benchmark, I first convert the CSVs to snappy-compressed parquet files. This reduces the dataset down to ~4GB in size on disk.
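
That conversion boils down to something like the following sketch (assuming pandas with pyarrow installed; the paths are illustrative, not the repo's actual ingest code):

```python
from pathlib import Path

import pandas as pd

# Convert each raw CSV into a snappy-compressed parquet file.
for csv_path in sorted(Path("data/csv").glob("*.csv")):
    df = pd.read_csv(csv_path)
    df.to_parquet(
        Path("data/parquet") / f"{csv_path.stem}.parquet",
        compression="snappy",
    )
```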

# Computation
## Computation

To start, the bakeoff computation is extremely simple. It's almost "word count". I calculate the average number of bikes available at each station across the full time period of the dataset. In SQL, this is basically
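(a sketch; `bike_data` is a placeholder name for the dataset, and the column names match those used in the computation):

```sql
SELECT
    station_id,
    AVG(num_bikes_available) AS avg_num_bikes_available
FROM bike_data
GROUP BY station_id;
```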

@@ -108,7 +111,6 @@ docker cp medium-data-bakeoff:/app/results/* .

- Run benchmarks on AWS for reproducibility.
- Run benchmarks as a function of number of CPUs.
- Run benchmarks as a function of number of parquet files.
- Add some harder benchmark computations.
- Add more libraries (e.g. `cudf`).
- Benchmark memory usage.
Binary file removed assets/benchmark.png
Binary file added assets/benchmark_50.png
Binary file modified assets/partition_benchmark.png
6 changes: 0 additions & 6 deletions assets/results.csv

This file was deleted.

7 changes: 7 additions & 0 deletions assets/results_50.csv
@@ -0,0 +1,7 @@
Library,Time (s),multiple
dask* (slightly optimized),11.725253820419312,5.416939032107234
dask_sql,15.683753967285156,7.245722807927704
duckdb,2.16455340385437,1.0
polars,3.482872486114502,1.6090489982426082
spark,10.895400285720825,5.03355577474764
vaex,22.58011507987976,10.431766220076563
5 changes: 1 addition & 4 deletions src/medium_data_bakeoff/bakeoff.py
@@ -56,10 +56,7 @@ def bakeoff(num_partitions: int) -> None:
bakeoff = {}

recipe = [
# Dask was working on 2022.10.2. It got downgraded to 2022.10.0 after
# installing dask-sql, and the benchmark no longer works on my machine. I think
# it's running out of memory.
# ("dask", bake_dask),
("dask* (slightly optimized)", bake_dask),
("dask_sql", bake_dask_sql),
("duckdb", bake_duckdb),
("polars", bake_polars),
8 changes: 7 additions & 1 deletion src/medium_data_bakeoff/ingredients/dask.py
@@ -10,7 +10,13 @@ def bake(dataset: str) -> float:
client = Client(cluster)
ProgressBar().register()
start = time.time()
df = dd.read_parquet(dataset, index=False)
# NOTE: We pass in the relevant columns for the calculation because Dask
# currently does not do automatic column projection for this query pattern.
# This is admittedly a bit of a cheat and not fully in the spirit of the
# bakeoff, given that it is somewhat advanced knowledge that a user coming
# from pandas may not have.
df = dd.read_parquet(
    dataset, index=False, columns=["station_id", "num_bikes_available"]
)
df.groupby("station_id")["num_bikes_available"].mean().compute()
stop = time.time()
client.close()

2 comments on commit 6b7452e

@rjzamora

Thanks for sharing this code @EthanRosenthal! I didn't realize that dask was missing column projection for this case, so I submitted dask/dask#9667 with a possible fix.

Side Note: You will get column projection for the `groupby().agg` API. E.g.

```python
df.groupby("station_id").agg({"num_bikes_available": "mean"}).compute()
```

@EthanRosenthal
Owner Author

Oh cool, thanks for submitting a fix, and good to know about `groupby().agg`! Maybe I should change the implementation here 🤔
