
Use pyarrow S3 file system at read time for arrow parquet engine #9669

Closed
wants to merge 3 commits into from

Conversation

rjzamora
Copy link
Member

As discussed in #9619, there seem to be performance benefits to using pyarrow.fs.S3FileSystem to open s3-based Parquet files for data ingest. This PR proposes a simple change to the default open_file_options contents for the "arrow" read_parquet engine.

  • Closes #xxxx
  • Tests added / passed
  • Passes pre-commit run --all-files


@jrbourbeau jrbourbeau left a comment


Thanks @rjzamora. I need to catch up on #9619 to get a better understanding of the motivation behind this change, but thought I'd leave a couple of drive-by comments in the meantime

**kwargs,
):

# Optimize open_file_func for remote filesystems
cls._update_open_file_func(fs, storage_options, kwargs)
Member


Could we make this something more like:

kwargs = cls._update_open_file_func(fs, storage_options, kwargs)

or (pseudocode below):

if "open_file_options" not in kwargs:
    open_file_options = cls._get_open_file_options(fs, storage_options, kwargs)
    if open_file_options:
        kwargs["open_file_options"] = open_file_options

Where possible I'd like to avoid methods that mutate things inplace as it can make it more difficult to reason about the logic at play. Being more explicit about where things are set is, I think, a good thing.

Member Author


Where possible I'd like to avoid methods that mutate things inplace as it can make it more difficult to reason about the logic at play. Being more explicit about where things are set is, I think, a good thing.

I completely agree that code should be functional whenever possible. Sorry, you were so quick to review that I didn't get a chance to clean this up


Currently supports ``s3fs`` -> ``pyarrow.fs.S3FileSystem``.
"""
is_s3fs = type(fs).__module__.split(".")[0] == "s3fs"
Member


As is, I think we could use dask.utils.typename(...) here to make things more succinct. That said, I wonder if we could use a more direct check using isinstance(fs, fsspec.AbstractFileSystem) and fs.protocol (or something along these lines)
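For illustration, a protocol-based check along those lines might look like the following (a hypothetical helper, not dask code; it only assumes the fsspec convention that a filesystem exposes a `protocol` attribute that is a string or a tuple of strings):

```python
def is_s3_filesystem(fs):
    """Return True if ``fs`` looks like an fsspec-style S3 filesystem."""
    protocol = getattr(fs, "protocol", None)
    if protocol is None:
        return False
    # fsspec allows ``protocol`` to be a single string or a tuple of aliases
    protocols = (protocol,) if isinstance(protocol, str) else tuple(protocol)
    return any(p in ("s3", "s3a") for p in protocols)
```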

Member Author


Yeah, this line is pretty gross. If we can't boil this down to a direct instance check, then I like the idea of using a well-defined utility.

@jrbourbeau
Member

It also appears that pyarrow has utilities for converting between pyarrow filesystem objects and fsspec-compatible filesystem objects (see https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow and https://arrow.apache.org/docs/python/filesystems.html#using-arrow-filesystems-with-fsspec). Do these help us here?

@rjzamora
Member Author

It also appears that pyarrow has utilities for converting between pyarrow filesystem objects and fsspec-compatible filesystem objects ... Do these help us here?

Unfortunately not :/

The first link is already what we are doing to create pyarrow Dataset objects (the backend filesystem is still python-based fsspec). The second link (ArrowFSWrapper) explains that we can wrap a pyarrow-based filesystem object in fsspec to expose fsspec-compatible APIs for ops like ls, mkdir, etc. The problem is that the wrapper does not seem to work for opening/reading files. Even if we could use ArrowFSWrapper for data ingest, we would still need to create the pyarrow-based filesystem somewhere.

**kwargs,
):

# Set default open_file_options for remote filesystems
kwargs["open_file_options"] = cls._default_open_file_options(
Member


_get_open_file_options

Maybe this is dumb, but rather than build this convention, could we do something like ...

if engine == "pyarrow" and filename.startswith("s3://") and not storage_options:
    open_file_options[...] = ...

Building a convention like _get_open_file_options for this seems a bit heavyweight to me.

(please feel free to entirely ignore this comment. I also may not ever respond. Please do not block on me)

@mrocklin
Member

Thanks for taking this on @rjzamora


@jrbourbeau jrbourbeau left a comment


Checking in here -- @rjzamora the code changes here generally look good and seem pretty straightforward. What additional changes / benchmarking do you think need to be done (if any)?

Also, could we add a test for the functionality added here? It'd be good to confirm that we swap between s3fs / pyarrow S3 filesystems under the expected conditions

@@ -762,6 +768,30 @@ def write_metadata(cls, parts, meta, fs, path, append=False, **kwargs):
# Private Class Methods
#

@classmethod
def _default_open_file_options(cls, fs, storage_options, input_options):
Member


Sidenote: I agree with Matt that it'd be nice to inline this logic, since it's relatively straightforward and only used in one place. However, my guess is inlining this logic will make it more difficult to test. If that's the case, then I think testability should take priority.

EDIT: moving to a small, private utility function seems lighter weight than a @classmethod and should make it easier to test

@jrbourbeau jrbourbeau changed the title Use S3FileSystem at read time for arrow parquet engine Use pyarrow S3 file system at read time for arrow parquet engine Nov 22, 2022
@rjzamora
Member Author

@martindurant - Do you think we should be able to use ArrowFSWrapper (https://arrow.apache.org/docs/python/filesystems.html#using-arrow-filesystems-with-fsspec) to expose an fsspec API for a pyarrow file-system? It would be nice to use this functionality to support temporary/specific optimizations without diverging from an fsspec-based design.

@martindurant
Member

fsspec/filesystem_spec#1113 is what I was referring to. It ought to be fine to pass various connection strings to arrow, but I don't really know what their API expects. Assuming this is fine, the rest of the interface should work as normal. Note the options in _open (https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/arrow.py#L149), where presumably we always need seekable.


@jrbourbeau jrbourbeau left a comment


@martindurant I didn't quite follow #9669 (comment) -- do you know if it's possible to use ArrowFSWrapper to get an fsspec-compatible object that still gets the performance benefits of using pyarrow's S3 filesystem? @rjzamora mentioned offline that he tried this but ran into some issues (I don't have any details here -- perhaps Rick could fill them in).

Regardless, I think we all view this as a temporary optimization. Long-term, it'd be great to have s3fs be just as performant and rip out the changes in this PR. However, given that it looks like users will see a sizeable performance improvement, and the code changes here are relatively straightforward / isolated, I think it makes sense to include these changes until we can make upstream changes to s3fs.

@martindurant
Member

martindurant commented Nov 23, 2022

I wish to raise the following caveats:

  • users will not generally know that some IO is being done by a different library, and so things that appear to work on the client will maybe not work on workers. They may well start raising issues at s3fs that we (I) can do nothing about.
  • there is no way not to do this, and the change does not appear in any higher documentation; also it introduces a real difference in what happens between the parquet backend engines.
  • the configuration of s3fs and arrow's fs are not the same, they require different kwargs. Here we have dealt with exactly one: anonymous access.
  • all other config that might be passed in storage_options are lost, so any data requiring, for example, profile= will not be usable at all, which is a big regression. Things might magically work if auth can be automatically determined without any kwargs, but boto and arrow probably don't have the same try order for the various config options either (e.g., .boto3 file will only be used by boto/s3fs).


@jrbourbeau jrbourbeau left a comment


Thanks @martindurant

things that appear to work on the client will maybe not work on workers

Hmm I don't follow this point. Could you give an example fail case?

the change does not appear in any higher documentation

That's a fair point. @rjzamora thoughts on adding a note about this in the docstring?

the configuration of s3fs and arrow's fs are not the same, they require different kwargs. Here we have dealt with exactly one: anonymous access.

This seems okay to me. anon/anonymous seems somehow special as it's very commonly used -- I think special casing that one keyword is reasonable. Other than that, s3fs.S3FileSystem and pyarrow.S3FileSystem share no arguments, and since we handle falling back to s3fs gracefully, any existing code that specifies s3fs-specific storage options will continue to use s3fs.S3FileSystem. I don't anticipate any change in behavior/performance when a non-trivial storage_options is provided.

We could make this more explicit by saying "if storage_options has anything other than anon/anonymous use s3fs". @rjzamora mentioned the current approach would make it easier for folks using pyarrow.S3FileSystem outside of Dask to start using Dask. This seems fine, but not super motivated. I'd be happy adding a more explicit storage_options check if that's what you would prefer.
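That more explicit check could be as small as the following (a hypothetical helper, not part of this PR's code):

```python
def should_try_pyarrow_fs(storage_options):
    """True only when ``storage_options`` is empty or contains nothing
    besides an anonymous-access flag (``anon``/``anonymous``)."""
    opts = dict(storage_options or {})
    opts.pop("anon", None)
    opts.pop("anonymous", None)
    return not opts
```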

all other config that might be passed in storage_options are lost, so any data requiring, for example, profile= will not be usable at all, which is a big regression

I think this also falls under the point above

there is no way not to do this

Users could specify an open_file_options value that uses s3fs.S3FileSystem. Fully admit that's not a very obvious workaround, but I wanted to highlight that there is a way to override the new default behavior proposed here.
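Concretely, the workaround would look something like this sketch (the `"open_file_func"` key follows the convention used in this PR and should be treated as an assumption; a local filesystem stands in for `s3fs.S3FileSystem` so the snippet runs without S3):

```python
import fsspec

# Keep using an fsspec filesystem for file opening by passing an explicit
# open function through open_file_options
fs = fsspec.filesystem("file")  # s3fs.S3FileSystem() in the real scenario
open_file_options = {"open_file_func": fs.open}

# Hypothetical dataset path; the call below is how the options would be used:
# ddf = dd.read_parquet(
#     "s3://bucket/dataset/",
#     engine="pyarrow",
#     open_file_options=open_file_options,
# )
```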

Things might magically work if auth can be automatically determined without any kwargs, but boto and arrow probably don't have the same try order for the various config options either (e.g., .boto3 file will only be used by boto/s3fs).

That's a fair point that hadn't occurred to me. I wonder (1) how common this is, and (2) what we can do in those cases. Maybe gracefully fall back to s3fs if the initial metadata can't be read? @rjzamora I'd be curious about your thoughts here

@martindurant
Member

things that appear to work on the client will maybe not work on workers

Hmm I don't follow this point. Could you give an example fail case?

The user does pd.read_parquet() on part of the dataset in the client. This uses s3fs only, and succeeds. Then they do dd.read_parquet() on the full dataset, where the arrow FS doesn't know how to establish auth. Strange errors will result, OR we fall back to s3fs and it works; but then it's hard for the user to reason about what's happening.

if storage_options has anything other than anon/anonymous use s3fs

There are many ways to configure botocore sessions, not just kwargs (same as the last point above). It is super common in some circumstances, e.g., each user in a JupyterHub, and probably any situation where the user does not own the compute instance they are on.

Note that for non-local reads, e.g., from my laptop, I already showed that s3fs is just as fast as pyarrow, saturating the bandwidth, so there is no need for the change in that case. This is probably the biggest user base (but not in enterprise).

@hayesgb
Contributor

hayesgb commented Nov 28, 2022

Here's a data point.

Did this after seeing mixed results in some of my tests. The following is 10 runs of Query 1 on a 50GB dataset from the H2O benchmarks with this branch vs dask==2022.11.1 against s3fs==2022.11.0. This data suggests users won't see a significant impact, and any difference we're seeing is likely noise.

Details: These are 10 runs of Query1 for each environment. We compare 10 runs on the same cluster, calling client.restart() between each run (left two boxplots) vs 10 runs with each run on a fresh cluster (right 2 boxplots).

[boxplot image]

@martindurant
Member

@hayesgb , you are showing wall time, correct?

@mrocklin
Member

@hayesgb can I ask you to dig into this and see what's actually happening? I would look at profiling, task streams, etc.. I'm also curious about the difference in the benchmarks that we've seen above, which seem inconsistent with what you're presenting here.

@rjzamora
Member Author

@rjzamora I'd be curious about your thoughts here

I am having qualms about merging this PR for two reasons:

  1. It feels "hacky" to construct an fsspec filesystem on the client, only to replace it with an additional pyarrow filesystem at data-ingest time. This feels even worse now that @martindurant has pointed out that storage_options is not guaranteed to include all necessary configuration options (and so an empty storage_options argument does not mean we will successfully construct a S3FileSystem object).
  2. It is not clear to me (yet) that switching from fsspec to pyarrow provides a consistent performance bump. I have seen mixed results locally. I am only "comfortable" with (1) if it is clear that we are improving performance in most (real-world) cases.

One possible "middle-ground" solution is to make it easier for the user to define/pass-in the desired filesystem themselves:

import dask.dataframe as dd
import pyarrow.fs as pa_fs

fs = pa_fs.S3FileSystem(...)
# OR MAYBE allow {"arrow", "fsspec", <fs-object>}
ddf = dd.read_parquet("s3://...", filesystem=fs)

The "arrow" engine could simply add non-default logic to wrap a Pyarrow-based filesystem with ArrowFSWrapper (and the default logic would raise an error for anything other than an fsspec filesystem implementation).

The obvious drawback to this approach is that it doesn't address the original goal of automatically using a Pyarrow-based filesystem for s3. With that said, it does provide a more-intuitive mechanism for overriding the default behavior.

Also, if we do find that S3FileSystem consistently outperforms s3fs for GIL-related reasons, we could use the clearly-defined filesystem= argument to make default behavior engine-dependent. For example, engine="arrow" could be designed to use from_uri to infer the filesystem, while the default engine would use fsspec's url_to_fs.

@hayesgb
Contributor

hayesgb commented Nov 28, 2022

@martindurant -- Yes. Sorry that wasn't clear.

@mrocklin -- Of course. I've done some digging already. Planned to dig deeper.

@mrocklin
Member

I would like for us to find a solution where the default behavior of reading Parquet from S3 was close to hardware speeds. I don't think that this should require special configuration.

@rjzamora
Member Author

I would like for us to find a solution where the default behavior of reading Parquet from S3 was close to hardware speeds. I don't think that this should require special configuration.

Yeah, I agree that we need good default behavior here. However, I'm just pointing out that this PR feels like a footgun to me. I think it makes sense to enable the use of an optimized filesystem object, but my gut tells me that we should be very transparent about it. For example, #9699 is still very rough, but it proposes a mechanism to use a Pyarrow-based filesystem for metadata parsing and IO (in a way that may be easier to document).

@mrocklin
Member

Footguns sound bad. Do you have suggestions for how to make the default behavior fast? Does #9699 bring us towards that outcome?

@hayesgb
Copy link
Contributor

hayesgb commented Nov 29, 2022

Additional work on this topic, which makes a pretty solid case for some version of this PR being implemented. Shoutout to @gjoseph92 for his help with profiling, and some digging to find that pyarrow.S3FileSystem starts 6 threads by default.

Summary
Evidence below indicates that, in this use case, pyarrow.S3FileSystem will yield similar or faster end-to-end wall-clock times when running workloads that read Parquet datasets from S3. Differences are more substantial as the number of cores per worker increases.

All of these runs are with "Query1" from a 50GB H2O-benchmark dataset. We run on 3 different types of EC2 instances, as follows:
m6i.large -- 10 workers, 2 cores/worker
m6i.xlarge -- 5 workers, 4 cores/worker
m6i.2xlarge -- 3 workers, 8 cores/worker

All of these machines have a maximum network bandwidth of "up to 12.5Gbps"; networking performance of these instances is burstable. We have between 10 and 30 repeats, depending on the run.

We're comparing s3fs + dask==2022.11.1 vs this PR.

[boxplot image]

pyspy profile from a single worker of the 10 worker cluster:
[py-spy profile screenshot]

A few interesting things about this.

  1. py-spy shows 6 threads (52-57) that are not visible in the pyarrow.S3FileSystem run (below). pyarrow starts 6 threads, but in the pyarrow.S3FileSystem case there's no interaction with Python, so they are not visible.
  2. in the above plot, we see pthread_cond_wait@@GLIBC_2.3.2, which is time spent waiting on Arrow Futures, takes 34 seconds, or 54% of the total time in this thread.
  3. The open call takes about 11 seconds (17% of total time).

For comparison, below is the pyarrow.S3FileSystem profile.
[py-spy profile screenshot]

  1. Note the difference in the number of threads.
  2. Time spent waiting on Arrow Futures is 13 seconds (37% of total time)

This plot is from the run with s3fs, and shows one of the pyarrow threads running fsspec code. These are the threads that are not visible in the pyarrow.S3Filesystem run.

[py-spy profile screenshot]

Run to run variance -- When we run multiple runs on the same cluster with restarts between, wall clock times get less jittery. Presumably S3 is doing some caching of data, but the first 1-2 runs can be pretty variable in their run times.

@rjzamora
Member Author

rjzamora commented Dec 1, 2022

@martindurant, @jrbourbeau and I had an offline discussion about this work. Rough summary of this discussion:


  • It is probably a bad idea to (automatically) use a different filesystem backend on the client and workers. For this reason, we should probably go in the direction of Make filesystem-backend configurable in read_parquet #9699.
    • This seems both dangerous and confusing to explain/debug
  • It makes sense to support a user-specified filesystem in read_parquet, since both fastparquet and pyarrow already allow this on their own.
  • It may take significant time before the performance gap between S3FileSystem and s3fs can be closed for the specific case of a multi-threaded Dask cluster that is co-located with storage (if ever). Therefore:
    • Users should definitely be able to pass in their own S3FileSystem filesystem in cases like this
    • The “arrow” read_parquet engine should probably try creating its own S3FileSystem on the client by default

Remaining Uncertainty:

  • How do we go about changing default behavior without doing more harm than good?

Tentative Plan:

  • Add filesystem argument to read_parquet
    • This could be a top-level argument or a dataset-level argument
    • Will need to add an optional class method to set the filesystem object to be used throughout the engine. Default will use fsspec
  • Always try to initialize an S3FileSystem in the “arrow” engine for paths with the “s3” protocol
    • Fall back to fsspec/s3fs in the case that S3FileSystem initialization fails
    • If default S3FileSystem-initialization does not fail, throw a meaningful error message if/when file-opening fails. The message should explain the recent change in behavior for s3 data, and tell the user how to use the filesystem argument.

(James and Martin: Feel free to make corrections)
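The "try S3FileSystem, fall back to fsspec" step of the tentative plan could be sketched as follows. The factory arguments are injected so the sketch runs without S3; a real implementation would pass pyarrow's S3FileSystem constructor and fsspec's url_to_fs, and the helper name is hypothetical:

```python
def get_filesystem(path, storage_options=None, *,
                   arrow_factory, fsspec_factory):
    """Prefer the pyarrow-based filesystem for plain s3 paths; otherwise
    (or on initialization failure) fall back to the fsspec-based one."""
    storage_options = storage_options or {}
    # Only attempt the pyarrow filesystem for s3 paths with no extra config
    if path.startswith("s3://") and not storage_options:
        try:
            return arrow_factory(), "arrow"
        except Exception:
            pass  # fall through to fsspec below
    return fsspec_factory(**storage_options), "fsspec"
```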

@martindurant
Member

Thanks for writing up the summary

@rjzamora
Member Author

Closing this in favor of #9699 (but will re-open if necessary).

@rjzamora rjzamora closed this Dec 13, 2022