
Read_parquet is slower than expected with S3 #9619

Open
mrocklin opened this issue Nov 3, 2022 · 51 comments
Labels
dataframe, io, parquet, needs attention (It's been a while since this was pushed on. Needs attention from the owner or a maintainer.)

Comments

@mrocklin
Member

mrocklin commented Nov 3, 2022

I was looking at a read_parquet profile with @th3ed @ncclementi and @gjoseph92

Looking at this performance report: https://raw.githubusercontent.com/coiled/h2o-benchmarks/main/performance-reports-pyarr_str-50GB/q1_50GB_pyarr.html
I see the following analysis (two minute video): https://www.loom.com/share/4c8ad1c5251a4e658c1c47ee2113f34a

We're spending only about 20-25% of our time reading from S3, and about 5% of our time converting data to Pandas. We're spending a lot of our time doing something else.

@gjoseph92 took a look at this with pyspy and generated reports like the following: tls-10_0_0_177-42425.json

I'm copying a note from him below:

What you'll see from this is that pyarrow isn't doing the actual reads. Because dask uses s3fs, the C++ arrow code has to call back into Python for each read. Ultimately, the reads are actually happening on the fsspec event loop (see the fsspecIO thread in profiles). If we look there, about 40% of CPU time is spent waiting for something (aka data from S3, good), but 60% is spent doing stuff in Python (which I'd consider overhead, to some degree).

We can also see that 30% of the total time is spent blocking on Python's GIL (all the pthread_cond_timedwaits) (look at the functions calling into this and the corresponding lines in the Python source if you don't believe me; they're all Py_END_ALLOW_THREADS). This is an issue known as the convoy effect: https://bugs.python.org/issue7946, dask/distributed#6325.

My takeaway is that using fsspec means dask is using Python for reads, which might be adding significant overhead / reducing parallelism due to the GIL.

I'd be interested in doing a comparison by hacking together a version that bypasses fsspec, and uses pyarrow's native S3FileSystem directly. Before that though, it might be good to get some baseline numbers on how fast we can pull the raw data from S3 (just as bytes), to understand what performance we can expect.
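A baseline like that could look roughly like the sketch below: pull the same public object (the ursa-labs taxi file used later in this thread) as raw bytes through s3fs and through pyarrow's native filesystem and compare wall times. This is an illustrative sketch, not a rigorous benchmark.

import time

import fsspec
import pyarrow.fs as pa_fs

path = "ursa-labs-taxi-data/2009/01/data.parquet"  # public bucket used elsewhere in this thread

# Whole-object GET through s3fs
fs = fsspec.filesystem("s3", anon=True)
t0 = time.perf_counter()
data = fs.cat(path)
print("s3fs cat:", len(data), "bytes in", time.perf_counter() - t0, "s")

# Same object through pyarrow's C++ S3 reader
pafs = pa_fs.S3FileSystem(anonymous=True)
t0 = time.perf_counter()
with pafs.open_input_file(path) as f:
    data = f.read()
print("pyarrow read:", len(data), "bytes in", time.perf_counter() - t0, "s")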

FYI I also tried https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/, but it was ~2x slower. Haven't tried repeating that though, so not sure if it's a real result.

One other thing I find surprising is that polars appears to be using fsspec for reads as well, rather than the native S3FileSystem or GCSFileSystem:
https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/io.py#L949-L956
https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/internals/io.py#L114-L121

I would have expected polars and dask read performance to be closer in this case. We should probably confirm for ourselves that they're not.

It looks like we could make things a lot faster. I'm curious about the right steps to isolate the problem further.

cc'ing @martindurant @rjzamora @ritchie46 @fjetter

@github-actions github-actions bot added the needs triage Needs a response from a contributor label Nov 3, 2022
@ncclementi ncclementi added dataframe io parquet and removed needs triage Needs a response from a contributor labels Nov 3, 2022
@rjzamora
Member

rjzamora commented Nov 3, 2022

I'd be interested in doing a comparison by hacking together a version that bypasses fsspec, and uses pyarrow's native S3FileSystem directly. Before that though, it might be good to get some baseline numbers on how fast we can pull the raw data from S3 (just as bytes), to understand what performance we can expect.

Note that you should already be able to do this by passing open_file_options={"open_file_func": <pyarrow-file-open-func>} to dd.read_parquet. For example:

import dask.dataframe as dd
import pyarrow as pa
import pyarrow.fs as pa_fs

path = "s3://ursa-labs-taxi-data/2009/01/data.parquet"
fs = pa_fs.S3FileSystem(anonymous=True)

ddf = dd.read_parquet(
    path,
    engine="pyarrow",
    storage_options={"anon": True},
    open_file_options={
        "open_file_func": fs.open_input_file,
    },
)

ddf.partitions[0].compute()

Using fs.open_input_file does cut my wall time by ~50% for this simple example.

@martindurant
Member

If we are talking about IO latency problems, then obviously the chunksize and caching strategy in the fsspec file-like object will be very important, as pyarrow treats it like a regular file with many small reads. This is why we created fsspec.parquet to preemptively fetch all the bytes you will be needing before handing them to arrow.
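For context, a minimal sketch of the fsspec.parquet path described here: the footer is parsed up front and the byte ranges needed for the requested columns are fetched before the file-like object is handed to the parquet engine. The column names are assumptions for illustration.

import fsspec.parquet
import pyarrow.parquet as pq

path = "s3://ursa-labs-taxi-data/2009/01/data.parquet"
columns = ["passenger_count", "fare_amount"]  # assumed column names

# open_parquet_file pre-fetches exactly the ranges needed for these columns
with fsspec.parquet.open_parquet_file(
    path,
    engine="pyarrow",
    columns=columns,
    storage_options={"anon": True},
) as f:
    table = pq.read_table(f, columns=columns)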

@rjzamora
Member

rjzamora commented Nov 3, 2022

pyarrow treats it like a regular file with many small reads

Pyarrow actually doesn't do this anymore. They now expose a pre_buffer option in read_table (and other read functions) to do something similar to what fsspec.parquet does. Therefore the performance difference may really be python-overhead related.
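For comparison, a minimal sketch of that pre_buffer path (column names again assumed for illustration):

import pyarrow.fs as pa_fs
import pyarrow.parquet as pq

fs = pa_fs.S3FileSystem(anonymous=True)

# With pre_buffer=True, the reader coalesces the column-chunk ranges and
# issues them up front rather than many small sequential reads.
table = pq.read_table(
    "ursa-labs-taxi-data/2009/01/data.parquet",
    filesystem=fs,
    columns=["passenger_count", "fare_amount"],  # assumed column names
    pre_buffer=True,
)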

@martindurant
Member

@rjzamora - worth testing! I wonder if they read the buffers concurrently.

@mrocklin
Member Author

mrocklin commented Nov 3, 2022

Using fs.open_input_file does cut my wall time by ~50% for this simple example

I'll be dumb for a moment. If there aren't any backend options specified, and if arrow is present, then should we switch to using Arrow by default for things like this?

@rjzamora
Member

rjzamora commented Nov 3, 2022

If there aren't any backend options specified, and if arrow is present, then should we switch to using Arrow by default for things like this?

The issue is that Dask has adopted fsspec as its standard filesystem interface, and the fsspec API is not always aligned with the pyarrow.fs API. Therefore, the user would still need to pass in fsspec-based storage_options to read_parquet, and those options may be slightly different from the ones needed to initialize a pyarrow S3FileSystem instance. This is why the code I shared above requires the user to create the pyarrow filesystem themselves.

Although, I guess you are explicitly asking "If there aren't any backend options specified". So, I suppose that case may work.

@mrocklin
Member Author

mrocklin commented Nov 3, 2022 via email

If storage_options was empty could we safely do this?

@rjzamora
Member

rjzamora commented Nov 3, 2022

If storage_options was empty could we safely do this?

Yes. Sorry - I hit enter too early. We would still need to use fsspec for graph construction, but would be able to use pyarrow to open the file at data-reading time if storage_options was empty anyway.

Note that we could certainly add the option to use a pyarrow filesystem throughout the read_parquet code (I've implemented this a few different times), but it's not a trivial change.

@gjoseph92
Collaborator

A 50% speedup seems worth a non-trivial change to me?

@rjzamora thanks for sharing the snippet to do this. We should benchmark this on Coiled as well to get some more data points.

I wonder if there's some subset of commonly-used fsspec options that we could easily translate into arrow?
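As a thought experiment, a translation of the most common s3fs options into pyarrow.fs.S3FileSystem arguments might look something like this sketch; the mapping is illustrative, not exhaustive, and deliberately bails out on anything it doesn't recognize.

from typing import Optional

import pyarrow.fs as pa_fs

# Common s3fs storage_options and their rough pyarrow equivalents
_S3FS_TO_PYARROW = {
    "anon": "anonymous",
    "key": "access_key",
    "secret": "secret_key",
    "token": "session_token",
}

def translate_storage_options(storage_options: dict) -> Optional[pa_fs.S3FileSystem]:
    """Return a pyarrow S3FileSystem when the options are simple enough, else None."""
    translated = {}
    for name, value in (storage_options or {}).items():
        if name not in _S3FS_TO_PYARROW:
            return None  # unknown/untranslatable option: keep using the fsspec path
        translated[_S3FS_TO_PYARROW[name]] = value
    return pa_fs.S3FileSystem(**translated)

fs = translate_storage_options({"anon": True})  # -> S3FileSystem(anonymous=True)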

@martindurant
Member

It's worth turning on logging

fsspec.utils.setup_logging(logger_name="s3fs")

creating dataframe:

2022-11-03 20:47:02,664 - s3fs - DEBUG - _lsdir -- Get directory listing page for ursa-labs-taxi-data/2009/01/data.parquet
2022-11-03 20:47:02,937 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:02,993 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,047 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:47:03,048 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,162 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461900991
2022-11-03 20:47:03,162 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461900990', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,324 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,384 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,454 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:47:03,455 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,524 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461900991
2022-11-03 20:47:03,525 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461900990', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}

(4 head calls, 4 get calls, all the same range)

and read

2022-11-03 20:48:11,558 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:48:11,816 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:48:11,817 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:11,922 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461966519
2022-11-03 20:48:11,922 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461966518', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:12,089 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 4-33347731
2022-11-03 20:48:12,090 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=4-33347730', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:14,017 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 266106048-299556487
2022-11-03 20:48:14,017 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=266106048-299556486', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:15,890 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 299556576-332631576
2022-11-03 20:48:15,891 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=299556576-332631575', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:17,532 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 332631643-366119909
2022-11-03 20:48:17,533 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=332631643-366119908', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:19,814 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 366119998-399192441
2022-11-03 20:48:19,815 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=366119998-399192440', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:21,581 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 399192508-432738114
2022-11-03 20:48:21,582 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=399192508-432738113', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:23,680 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 432738203-461642359
2022-11-03 20:48:23,680 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=432738203-461642358', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:25,287 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 166582521-199685105
2022-11-03 20:48:25,288 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=166582521-199685104', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:27,760 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 33347817-66443866
2022-11-03 20:48:27,762 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=33347817-66443865', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:29,462 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 99952366-133117918
2022-11-03 20:48:29,462 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=99952366-133117917', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:31,172 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 199685171-233041894
2022-11-03 20:48:31,173 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=199685171-233041893', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:32,837 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 66443929-99952280
2022-11-03 20:48:32,838 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=66443929-99952279', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:34,515 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 133117983-166582432
2022-11-03 20:48:34,517 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=133117983-166582431', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:36,159 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 233041983-266105981
2022-11-03 20:48:36,170 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=233041983-266105980', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}

(fetched 266 MB in 16 serial but unordered calls in 30 s, including parse time, which is near my available bandwidth)

When loading with pyarrow's FS using the snippet above, I get

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/_fs.pyx:763, in pyarrow._fs.FileSystem.open_input_file()

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/error.pxi:144, in pyarrow.lib.pyarrow_internal_check_status()

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/error.pxi:115, in pyarrow.lib.check_status()

OSError: When reading information for key '2009/01/data.parquet' in bucket 'ursa-labs-taxi-data': AWS Error [code 100]: No response body.

The time to download the whole file with s3fs in a single continuous call is 27.7s.

fs = fsspec.filesystem("s3", anon=True)
%time fs.cat(path)
fsspec.parquet.open_parquet_file(path, storage_options={"anon": True}, engine="pyarrow")

takes 29s with s3fs, with two concurrent reads (35s if actually parsing the data)

Please run these on your machines closer to the data! Note that Rick's blog ( https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/ ) specifically measured s3fs with various caching versus arrow's FS versus fsspec.parquet.
I would appreciate some deeper benchmarking and consideration before jumping to conclusions.

@mrocklin
Member Author

mrocklin commented Nov 4, 2022

Totally agreed that folks shouldn't jump to conclusions.

@martindurant have you had a chance to look at this video? I would be curious about your opinion on what is going on there. It would also be great if you have a chance to look at @gjoseph92's py-spy profiles (I click "Left Heavy" and find that it helps with interpretability).

@mrocklin
Member Author

mrocklin commented Nov 4, 2022

If I had to guess (and I only have like 20% confidence here) it would be that, while fsspec is good on its own and dask is good on its own, there is some negative interaction when both event loops are doing their thing at once, and for some reason things get sticky. Just a total guess though. If that guess is anywhere near correct, then I think we would want to evaluate things in the wild, using fsspec/arrow + dask + s3, ideally on cloud machines.

If you'd like your very own cloud machines I'm happy to set you up with a Coiled account. It's a pretty smooth experience today. I'm also happy to pay if folks don't already have AWS credentials through work.

@martindurant
Member

NB CLI curl took 22s to fetch the file from the HTTP endpoint.

@martindurant
Member

I will also do my own measurements on AWS, but certainly not today. I can't say there's too much rush.
It may be worthwhile for someone to check out the tricks in https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html#using-the-transfer-manager to make things faster (although that may be for whole files only).
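For anyone who wants to try the transfer-manager approach mentioned above, a minimal sketch (whole-file download only; anonymous access assumed for the public bucket used earlier in this thread) could look like:

import boto3
from boto3.s3.transfer import TransferConfig
from botocore import UNSIGNED
from botocore.config import Config

# Anonymous client for the public bucket
s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))

# The transfer manager does multipart, multi-threaded range GETs for large objects
transfer_config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,   # switch to multipart above 8 MiB
    multipart_chunksize=32 * 1024 * 1024,  # 32 MiB parts
    max_concurrency=10,                    # parallel range GETs
)

s3.download_file(
    "ursa-labs-taxi-data",
    "2009/01/data.parquet",
    "data.parquet",
    Config=transfer_config,
)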

I would prefer if we can improve s3fs rather than splitting the FS backends.

@martindurant
Member

Feel free to see if this makes any difference

--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -2231,7 +2231,11 @@ def _fetch_range(fs, bucket, key, version_id, start, end, req_kw=None):
         )
         return b""
     logger.debug("Fetch: %s/%s, %s-%s", bucket, key, start, end)
-    resp = fs.call_s3(
+    return sync(fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw)
+
+
+async def _inner_fetch(fs, bucket, key, version_id, start, end, req_kw=None):
+    resp = await fs._call_s3(
         "get_object",
         Bucket=bucket,
         Key=key,
@@ -2239,4 +2243,4 @@ def _fetch_range(fs, bucket, key, version_id, start, end, req_kw=None):
         **version_id_kw(version_id),
         **req_kw,
     )
-    return sync(fs.loop, resp["Body"].read)
+    return await resp["Body"].read()

@mrocklin
Member Author

mrocklin commented Nov 4, 2022

I would prefer if we can improve s3fs rather than splitting the FS backends

Can you motivate this a bit more?

What are some of the downsides of using Arrow filesystems in specialized cases like above?

@martindurant
Member

To dask: some additional complexity and development, as well as possible confusion for users when multiple backends are used in conjunction.
To the community: s3fs is used an awful lot without arrow, so improvements will be far reaching (arrow's install size on, for instance, AWS Lambda, is one of the main reasons for the continued existence of fastparquet); any specific features that might need development are much easier to achieve in Python than in arrow's C++ monolith.

@mrocklin
Member Author

mrocklin commented Nov 4, 2022

Sure, and I'm not currently planning to invest any development resources in improving arrow. However, if today Arrow's reading is better than s3fs's reading in a particular case, and if we know that we are in a situation where we can swap the two safely, then it seems to me like we should do so.

To be clear, I'm not saying "let's rip out S3"; I'm saying "In this restricted but common case where we know it's safe, maybe we can automatically swap things and get a speed boost".

@mrocklin
Member Author

mrocklin commented Nov 4, 2022

You might also be underestimating the importance of this speed boost. We've just discovered a way for Dask to go 2x faster on very common workloads. This is HUGE for a lot of users.

@mrocklin
Member Author

mrocklin commented Nov 4, 2022

(if indeed this speedup exists; our current benchmarking shows that it does in the wild, but we should do more homework)

@rjzamora
Member

rjzamora commented Nov 4, 2022

I would prefer if we can improve s3fs rather than splitting the FS backends.

We built everything on fsspec for good reason. It is flexible, effective, and has excellent API coverage. Therefore, fsspec/s3fs will clearly remain the primary code path in Dask-IO in the future. With that said, I do feel that the user should be allowed to set the global filesystem to an S3FileSystem object for the "pyarrow" engine. There have been issues related to pyarrow users who want to pass the same arguments to Dask that they are already passing to pyarrow.dataset. Also, if we ultimately find that S3FileSystem has a consistent performance advantage (one that cannot be fixed), it would be pretty silly not to tweak default behavior.

@martindurant
Member

On ec2:

%time fs.get("s3://ursa-labs-taxi-data/2009/01/data.parquet", "data.parquet")
CPU times: user 1.16 s, sys: 641 ms, total: 1.8 s
Wall time: 4.67 s

> wget https://ursa-labs-taxi-data.s3.amazonaws.com/2009/01/data.parquet
461,966,527 96.0MB/s   in 4.6s

%time f = fsspec.parquet.open_parquet_file("s3://ursa-labs-taxi-data/2009/01/data.parquet", storage_options={"anon": True})
CPU times: user 1.36 s, sys: 833 ms, total: 2.2 s
Wall time: 3.46 s

%%time
with fs.open_input_file("ursa-labs-taxi-data/2009/01/data.parquet") as f:
    data = True
    while data:
        data = f.read(2**10)
AWS Error [code 100]: No response body  # also fails via dd.read_parquet as in Rick's snippet

%%time  # serial
with fsspec.open("s3://ursa-labs-taxi-data/2009/01/data.parquet", mode="rb", anon=True) as f:
    data = True
    while data:
        data = f.read(2**10)
CPU times: user 2.87 s, sys: 530 ms, total: 3.4 s
Wall time: 13.9 s

%%time  # serial, bigger blocks
with fsspec.open("s3://ursa-labs-taxi-data/2009/01/data.parquet", mode="rb", anon=True, default_block_size=50*2**20) as f:
    data = True
    while data:
        data = f.read(2**10)
CPU times: user 3.09 s, sys: 1.13 s, total: 4.22 s
Wall time: 7.63 s

> curl https://ursa-labs-taxi-data.s3.amazonaws.com/2009/01/data.parquet -o temp
4s

So s3fs matches wget and curl for throughput.

With dask in the loop, exactly the same:

import dask
import fsspec
fs = fsspec.filesystem("s3", anon=True)

@dask.delayed
def f():
    fs.get("s3://ursa-labs-taxi-data/2009/01/data.parquet", "data.parquet")
    return True

d = f()
%timeit dask.compute(d)
4.64 s ± 3.14 ms per loop

So all saturate at about 100MB/s, as expected for a .medium EC2 instance. Do I need something bigger? Why does pyarrow consistently fail?

pyarrow 8.0.0
fsspec, s3fs 2022.11.0
dask 2022.10.2
(fresh conda environment with conda-forge, no particular care taken)

@rjzamora
Member

Why does pyarrow consistently fail?

I haven't seen any failures on pyarrow-9.0.0 (using S3FileSystem(anonymous=True)).

So all saturate at about 100MB/s, as expected for a .medium EC2 instance. Do I need something bigger?

My network is too noisy right now to run any useful experiments. However, it may be the case that the data-transfer time is indeed optimal (or close to it) for fsspec.parquet.open_parquet_file. If there is actually a consistent discrepancy in the read_parquet wall time (still not sure if there is), it may be caused by something unrelated to data transfer.

@martindurant
Member

I haven't seen any failures on pyarrow-9.0.0

The env was created like

conda create -n new python=3.9 pyarrow dask s3fs ipython pandas -c conda-forge

under AWS linux from fresh miniconda, nothing special.

@gjoseph92
Collaborator

However, it may be the case that the data-transfer time is indeed optimal (or close to it) for fsspec.parquet.open_parquet_file. If there is actually a consistent discrepancy in the read_parquet wall time (still not sure if there is), it may be caused by something unrelated to data transfer.

I'd expect plain read_parquet wall time to be pretty much optimal.

I think the problem here is about the interaction—particularly regarding concurrency—between s3fs and dask (and pyarrow?), as Matt is saying. s3fs seems perfectly fast on its own, but when you have multiple Arrow threads calling into single-threaded, GIL-locked Python to do their reads, which then call sync into a single-threaded event loop, which then call s3fs stuff, is that optimal?

As an example, please take a look at the py-spy profile I recorded of multi-threaded dask workers on a real cluster loading Parquet data: tls-10_0_0_177-42425.json

If you look at the fsspecIO thread (which is a different thread from the worker event loop MainThread, which itself is quite concerning—this implies there are two separate event loops running in separate threads), you can see that 30% of the time, the event loop is blocked acquiring the Python GIL:

[Screenshot from the py-spy profile (2022-11-14): the fsspecIO thread spends roughly 30% of its time blocked acquiring the GIL]

This is the convoy effect https://bugs.python.org/issue7946; you can read more here dask/distributed#6325. It means that async networking performance will be significantly degraded if you're doing stuff in other threads that hold the GIL. In Dask, we're using async networking to load parquet, and we have other worker threads doing stuff that's likely to hold the GIL (plenty of pandas operations hold the GIL at least some of the time).

This is just an inherent limitation of Python right now, there's not much fsspec can do about it.
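If you want to see the effect in isolation, here is a minimal, self-contained sketch (not taken from the profiles above): it measures how long asyncio.sleep wake-ups are delayed while another thread is busy holding the GIL.

import asyncio
import threading
import time

def gil_heavy_work(stop: threading.Event) -> None:
    # Pure-Python loop: holds the GIL almost continuously until told to stop.
    while not stop.is_set():
        sum(range(10_000))

async def worst_loop_lag(samples: int = 50) -> float:
    worst = 0.0
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(0.001)  # should wake ~1 ms later
        worst = max(worst, time.perf_counter() - start - 0.001)
    return worst

async def main() -> None:
    print(f"worst lag, idle:             {await worst_loop_lag():.4f} s")
    stop = threading.Event()
    thread = threading.Thread(target=gil_heavy_work, args=(stop,))
    thread.start()
    print(f"worst lag, GIL-bound thread: {await worst_loop_lag():.4f} s")
    stop.set()
    thread.join()

asyncio.run(main())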

I'm not saying this is the only problem. I also would imagine (unfounded) that arrow can be more memory-efficient and reduce the number of copies if it gets to manage its own IO without leaving the C++ world. I'd hope it could read from the network directly into an arrow-managed buffer—ideally zero-copy sendfile, but at least without a copy into non-Arrow userspace that then gets copied again into Arrow-managed userspace.

Broadly, I just find it weird to see 4 Arrow threads all calling into Python to do their IO. I would expect, intuitively, that a performance and memory sensitive C++ library like Arrow can perform better if it gets to manage its own IO, also in C++.

@mrocklin
Member Author

I'm not sure it helps me, since I was doing interactive timings mostly without dask;

Yeah, just to be clear, what I'm interested in here isn't "the relative performance of s3fs vs pyarrow s3" it's the "relative performance of these libraries in the context of Dask".

What I'm seeing here (thanks @gjoseph92 for the profiling) is that s3fs is 2x slower in the context of Dask in a specific but very common use case. Unless we see an easy development path towards making this much faster (and @gjoseph92 's comment leads me to believe that such a path will be hard to find), I think that we should probably move in the direction of swapping things out when we know that it is safe to do so.

@mrocklin
Member Author

quick addendum: "*at least in the short term"

@martindurant
Member

Note that I don't actually know what code was run that is the basis of this thread. I have been using Rick's snippet, which does not show the problem. The report shows

ddf_q1 = ddf[["id1", "v1"]].astype({"id1": "string[pyarrow]"})
ddf_q1.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

(is it not possible to get arrow's parquet reader to produce arrow-string columns?!)

@mrocklin
Member Author

@ncclementi can you give @martindurant a quick code snippet that he can run that reads the right parquet file (or any parquet file, I suspect).

@martindurant I think that this is being pulled from https://github.com/coiled/coiled-runtime/blob/main/tests/benchmarks/h2o/test_h2o_benchmarks.py

It looks like this:

dd.read_parquet(
    request.param, engine="pyarrow", storage_options={"anon": True}
)

I suspect with this parameter, "s3://coiled-datasets/h2o-benchmark/N_1e8_K_1e2_parquet/*.parquet" but I'm not sure. @ncclementi will know more.

(is it not possible to get arrow's parquet reader to produce arrow-string columns?!)

Yup. Current source of frustration. See #9631

@mrocklin
Member Author

Never mind, here is an entirely different computation with the same result:

import coiled
from dask.distributed import Client
import dask.dataframe as dd

df = dd.read_parquet("s3://coiled-datasets/mrocklin/nyc-taxi-fhv",
                    storage_options={"anon": True})

cluster = coiled.Cluster(package_sync=True)
client = Client(cluster)

with coiled.performance_report():
    df[["trip_miles", "trip_time", "base_passenger_fare"]].sum().compute()

https://cloud.coiled.io/mrocklin/reports/8919

@mrocklin
Member Author

In contrast:

import pyarrow.fs as pa_fs
fs = pa_fs.S3FileSystem(anonymous=True)
df = dd.read_parquet(
    "s3://coiled-datasets/mrocklin/nyc-taxi-fhv",
    storage_options={"anon": True},
    open_file_options={
        "open_file_func": fs.open_input_file,
    },
)
with coiled.performance_report():
    df[["trip_miles", "trip_time", "base_passenger_fare"]].sum().compute()

https://cloud.coiled.io/mrocklin/reports/8921

@ncclementi
Member

@ncclementi can you give @martindurant a quick code snippet that he can run that reads the right parquet file (or any parquet file, I suspect).

Saw this a bit late; for reference, things in the coiled-datasets S3 bucket are public.
These are datasets that can be useful:

  • 0.5GB dataset: s3://coiled-datasets/h2o-benchmark/N_1e7_K_1e2_parquet/*.parquet
  • 5GB dataset: s3://coiled-datasets/h2o-benchmark/N_1e8_K_1e2_parquet/*.parquet
  • 50GB dataset : s3://coiled-datasets/h2o-benchmark/N_1e9_K_1e2_parquet/*.parquet

So an example for the 5GB dataset would be,

import coiled
from dask.distributed import Client, performance_report
import dask.dataframe as dd

cluster = coiled.Cluster(n_workers=4, package_sync=True)
client = Client(cluster)

ddf = dd.read_parquet(
    "s3://coiled-datasets/h2o-benchmark/N_1e8_K_1e2_parquet/*.parquet",
    engine="pyarrow",
    storage_options={"anon": True},
)

with performance_report():
    ddf_q1 = ddf[["id1", "v1"]]
    ddf_q1.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

Here you can find multiple performance reports for all the different queries, but q1 is the one for this example, and I believe the same one that Gabe ran in his py-spy report.

@martindurant
Member

Did anyone try uvloop with s3fs?

@mrocklin
Member Author

I haven't, no.

@gjoseph92
Collaborator

I also haven't. But uvloop suffers from the convoy effect just like asyncio does, so I'd be surprised to see much difference.

A thing that could help is if operations on nonblocking sockets kept the GIL. Since they should be nonblocking, there should be no reason to release—and then have to immediately re-acquire—the GIL. Getting this change into CPython would be slow and maybe not possible: https://bugs.python.org/issue45819.

The main appeal of uvloop is that getting such a change (MagicStack/uvloop@124c2c3) merged is probably a lot more feasible. A bit of the motivation for the asyncio TCP comm dask/distributed#5450 was for this purpose—so we could then go fix the convoy effect in uvloop, and use the new comm to take advantage of it—but that effort fizzled out.

(It probably is worth trying the asyncio comm though. Re-reading the description of that PR, it seems like there might be a few advantages over Tornado even with the convoy effect in play. But that's a tangent to this conversation—comms aren't being used by fsspec to read parquet from S3, that's only for worker<->worker data transfer.)

@mrocklin
Member Author

Even if uvloop solved the problem I would still push for this change. Many people don't use uvloop and if we can give those people a 2x speedup for presumably no cost then we should.

@martindurant
Member

If we can offer a speedup for people using s3fs without arrow, or on backends not supported by arrow, we should do that too

@mrocklin
Member Author

mrocklin commented Nov 18, 2022 via email

@martindurant
Member

martindurant commented Nov 22, 2022

For fun, I grabbed the 5GB (2.8GB on disk) data set and

Python 3.8.10 | packaged by conda-forge | (default, May 11 2021, 07:01:05) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.23.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import pandas

In [2]: %time df = pandas.read_parquet("bench", engine="pyarrow")
CPU times: user 27.5 s, sys: 5.62 s, total: 33.1 s
Wall time: 15.3 s
4.9GB process memory, spike above 10GB


(py3) mdurant@walnut:~/data$ ipython
Python 3.8.10 | packaged by conda-forge | (default, May 11 2021, 07:01:05) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.23.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import pandas

In [2]: %time df = pandas.read_parquet("bench", engine="fastparquet")
CPU times: user 7.93 s, sys: 1.76 s, total: 9.69 s
Wall time: 10.4 s
4.3GB process memory, no spike

Note particularly the CPU time - pyarrow was using threads and still came in way behind.

On Intel Core i5-9400F, 6-core, 16GB RAM and SSD storage. My GPU is not big or new enough to bother trying with that.

@the-matt-morris

This is a great find!

Just wanted to mention something I'm running into when using this, not as a problem for anyone else to solve, but to highlight a potential situation to consider if this is implemented as the default behavior. It may well be that I've hit an edge case, that it's related to one of the other libraries I'm using, or that I'm doing something wrong.

I use a prefect-dask workflow that calls dd.read_parquet with open_file_options. Simplified here for illustration:

dask==2022.11.1
prefect==2.7.10
prefect_dask==0.2.2
import os

import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow.fs as pa_fs
from dask.distributed import LocalCluster
from prefect import flow, task
from prefect_dask import DaskTaskRunner

fs = pa_fs.S3FileSystem(
    access_key=os.environ["AWS_ACCESS_KEY_ID"],
    secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)


@task
def my_task(part: int) -> pd.DataFrame:
    df = dd.read_parquet(
        f"s3://<MY_BUCKET>/<MY_FILE_PREFIX>/part_{part:04d}.parquet",
        engine="pyarrow",
        filters= < SOME_FILTERS >,
        open_file_options={
            "open_file_func": fs.open_input_file,
        },
    )

    return df.compute()


@flow(
    task_runner=DaskTaskRunner(
        cluster_class=LocalCluster,
        cluster_kwargs={"processes": True, "n_workers": 8},
    ),
)
def my_flow(partitions: list[int]):
    my_task.map(np.array_split(partitions, 8))


if __name__ == "__main__":
    my_flow(list(range(1000)))

When there are 1000 partitions, each process gets through maybe half of them before inevitably encountering this error:

...
    result = result.compute()
  File "/usr/local/lib/python3.9/site-packages/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.9/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/usr/local/lib/python3.9/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/usr/local/lib/python3.9/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py", line 90, in __call__
    return read_parquet_part(
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py", line 589, in read_parquet_part
    dfs = [
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py", line 590, in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py", line 436, in read_partition
    arrow_table = cls._read_table(
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py", line 1519, in _read_table
    arrow_table = _read_table_from_path(
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py", line 233, in _read_table_from_path
    with _open_input_files(
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/utils.py", line 159, in _open_input_files
    return [
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/utils.py", line 160, in <listcomp>
    _set_context(open_file_func(path, **kwargs), context_stack)
  File "pyarrow/_fs.pyx", line 770, in pyarrow._fs.FileSystem.open_input_file
    in_handle = GetResultValue(self.fs.OpenInputFile(pathstr))
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
    return check_status(status)
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
    raise IOError(message)
OSError: When reading information for key '<MY_KEY>' in bucket '<MY_BUCKET>': AWS Error NETWORK_CONNECTION during HeadObject operation: curlCode: 43, A libcurl function was given a bad argument

Fortunately, I can simply remove the open_file_options argument and my workflow runs fine again, but wanted to point this out in case there's something to consider here (I don't know what that would be). Perhaps I need to pass a pyarrow FileSystem instance to each process, which I will try next.

@jrbourbeau
Member

Thanks for the feedback @the-matt-morris. I'm curious if you run into the same problem if you use the new filesystem= keyword argument in the read_parquet call? Something like

    df = dd.read_parquet(
        f"s3://<MY_BUCKET>/<MY_FILE_PREFIX>/part_{part:04d}.parquet",
        engine="pyarrow",
        filters= < SOME_FILTERS >,
        filesystem=fs,  # pass your pyarrow filesystem object here 
    )

@the-matt-morris

Thanks for the feedback @the-matt-morris. I'm curious if you run into the same problem if you use the new filesystem= keyword argument in the read_parquet call?

That solved my problem, no longer seeing the error in my use case. Thanks @jrbourbeau !

@jrbourbeau
Member

Awesome, thanks for trying that out @the-matt-morris! Glad to hear things are working. Is using the pyarrow-backed filesystem more performant in your use case than the default fsspec/s3fs filesystem implementation?

@the-matt-morris

Hey @jrbourbeau , sorry for the delay. You know what, I'm seeing nearly identical performance between both methods.

I suppose it might make sense to benchmark this against publicly available data. Looks like there was a coiled-datasets parquet file that was being used previously in this thread. I can do some trials on that with this change.

@dbalabka

dbalabka commented Jul 20, 2023

It is my first experience with Dask. After hours of investigation, I found this issue, and it solves the problem for me.
Before providing the custom open_file_options function, Dask took minutes (!!!) to calculate the result. Applying the fix brings it down to a few seconds.

In contrast to the problem reported by @mrocklin, Dask became nearly impossible to use when I was trying to load files from AWS S3. Also, I have concerns that inefficient S3 usage increases the request rate, which should lead to a service cost increase. Therefore, I would say that it is a serious performance issue.

Based on the difference in latency, a simple min/max calculation might take from 20 seconds up to 2 minutes for a pretty small parquet file, 500MB (snappy compressed)

Dask version:

dask-kubernetes = "2023.7.1"
dask = "2023.7.0"
s3fs = "2023.6.0"

@martindurant
Member

@dbalabka , thanks for your report. It would be very useful to know more details about your workflow and setup. How many threads versus processes are you using, for example? How big are your typical chunks in bytes?

If you turn on the "s3fs" logger, you can also know what calls it is making. You suggest there are too many, so we need to know which could in theory be eliminated. Could excessive calls by themselves lead to your slowdown due to AWS' throttling?

@dbalabka

@martindurant, I apologise for the late response. I had an issue that was resolved, but it was not related to the problem we previously discussed. I believe updating dask up to 2023.8.1 was beneficial.

The loading of files from S3 is speedy enough. However, following your advice, I utilized s3fs debug logging to study the effect on performance. I hope these logs will be useful in your investigation.

Logs of loading 500MB file from the S3:

%%time
import dask.dataframe as dd

df = dd.read_parquet('s3://s3-bucket/file').persist()
2023-08-30 15:21:53,581 - s3fs - DEBUG - _lsdir -- Get directory listing page for s3-bucket/file
2023-08-30 15:21:53,583 - s3fs - DEBUG - set_session -- Setting up s3fs instance
2023-08-30 15:21:53,597 - s3fs - DEBUG - set_session -- RC: caching enabled? False (explicit option is False)
2023-08-30 15:21:54,418 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 's3-bucket', 'Key': 'file'}
2023-08-30 15:21:54,569 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 's3-bucket', 'Key': 'file'}
2023-08-30 15:21:54,718 - s3fs - DEBUG - _fetch_range -- Fetch: s3-bucket/file, 560317336-560382872
2023-08-30 15:21:54,721 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 's3-bucket', 'Key': 'file', 'Range': 'bytes=560317336-560382871', 'IfMatch': '"12dc2c8685d333e43d3d4101c238688f-11"'}
2023-08-30 15:21:55,156 - s3fs - DEBUG - _fetch_range -- Fetch: s3-bucket/file, 558747836-560382872
2023-08-30 15:21:55,158 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 's3-bucket', 'Key': 'file', 'Range': 'bytes=558747836-560382871', 'IfMatch': '"12dc2c8685d333e43d3d4101c238688f-11"'}
2023-08-30 15:21:56,299 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 's3-bucket', 'Key': 'file'}
2023-08-30 15:21:56,448 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 's3-bucket', 'Key': 'file'}
2023-08-30 15:21:56,597 - s3fs - DEBUG - _fetch_range -- Fetch: s3-bucket/file, 560317336-560382872
2023-08-30 15:21:56,600 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 's3-bucket', 'Key': 'file', 'Range': 'bytes=560317336-560382871', 'IfMatch': '"12dc2c8685d333e43d3d4101c238688f-11"'}
2023-08-30 15:21:56,756 - s3fs - DEBUG - _fetch_range -- Fetch: s3-bucket/file, 558747836-560382872
2023-08-30 15:21:56,759 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 's3-bucket', 'Key': 'file', 'Range': 'bytes=558747836-560382871', 'IfMatch': '"12dc2c8685d333e43d3d4101c238688f-11"'}
CPU times: user 1.19 s, sys: 138 ms, total: 1.33 s
Wall time: 8.94 s

@martindurant
Member

We do see that there are some extraneous head_object calls here (cost up to 100ms each) that would be worth tracking down. I suspect that if there were an initial ls on the bucket to populate the cache, these would disappear.

Also, it is interesting that the range reads are in reverse order - maybe this is pyarrow reading the parquet footer and finding 64KB isn't big enough (but it happens more than once). This is probably not something s3fs can do anything about.
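A quick way to test the listing-cache hypothesis is sketched below. It relies on fsspec's instance caching so that read_parquet reuses the same s3fs object; the bucket and key names are the placeholders from the log above.

import dask.dataframe as dd
import fsspec

# Same storage options as the read below, so the cached instance is reused
fs = fsspec.filesystem("s3")
fs.ls("s3-bucket")  # populate the directory cache up front; later size/etag
                    # lookups can then be answered without head_object calls

df = dd.read_parquet("s3://s3-bucket/file").persist()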

@github-actions github-actions bot added the needs attention It's been a while since this was pushed on. Needs attention from the owner or a maintainer. label Apr 29, 2024