Read Parquet directly into string[pyarrow] #9631

Closed
mrocklin opened this issue Nov 5, 2022 · 30 comments · Fixed by #9719
Labels: dataframe, enhancement, io, parquet

@mrocklin (Member) commented Nov 5, 2022

I would like to read Parquet data directly into the string[pyarrow] dtype.

So I'm trying this naively on a dataset:

import dask.dataframe as dd

df = dd.read_parquet(
    "s3://nyc-tlc/trip data/fhvhv_tripdata_2022-06.parquet", 
    split_row_groups=True, 
    use_nullable_dtypes=True,
)
df.dtypes
hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
airport_fee                    float64
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
wav_match_flag                  object
dtype: object

This is especially important to me because in this case one row group is 10GB when stored naively. The data is somewhat unreadable in its current state on modest machines. I also suspect that I'm spending almost all of my time just creating and then destroying Python objects.

Any thoughts @rjzamora @ian-r-rose ?

github-actions bot added the needs triage label Nov 5, 2022
@hayesgb mentioned this issue Nov 6, 2022
@jrbourbeau (Member) commented:

Thanks @mrocklin. This sounds sensible. Something like use_nullable_dtypes=True + telling pandas to use string[pyarrow] as the default string type should do the trick.

Was the dataset originally written with extension string dtypes? Ideally I'd like to see us respect roundtripping.

@mrocklin (Member, Author) commented Nov 7, 2022

Was the dataset originally written with extension string dtypes

This one was not. This one is maintained by the NYC TLC and I don't think that they (yet!) use Dask for data prep.

Something like use_nullable_dtypes=True + telling pandas to use string[pyarrow] as the default string type should do the trick

I would hope so, but I'm not sure that that's the case. In this case Dask/Pandas/Arrow aren't sure that the object dtypes are strings, they might be some other serialized object. In this case I'd like to explicitly tell the system "nope, this is definitely text data, please act accordingly".

@ncclementi (Member) commented:

Was the dataset originally written with extension string dtypes? Ideally I'd like to see us respect roundtripping

@jrbourbeau We have data written with pyarrow strings, python strings and categorical (3 different sets), that we can use to test this if needed.

@phobson added the dataframe, io, parquet, and enhancement labels and removed the needs triage label Nov 7, 2022
@rjzamora (Member) commented Nov 7, 2022

Was the dataset originally written with extension string dtypes

This one was not.

If I remember correctly (which I probably don't), this may mean there is no way to do anything more efficient than using astype("string[pyarrow]") after the read. I was looking into this for #9476, but got distracted by other things.
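
For reference, a minimal sketch of that post-read cast (this is an assumption about the workaround, not dask's API; the object-column detection is illustrative):

import dask.dataframe as dd

# Read as usual, then cast every object column (which we happen to know holds
# text in this dataset) to the pyarrow-backed string dtype.
df = dd.read_parquet(
    "s3://nyc-tlc/trip data/fhvhv_tripdata_2022-06.parquet",
    split_row_groups=True,
)
string_cols = [col for col, dtype in df.dtypes.items() if dtype == object]
df = df.astype({col: "string[pyarrow]" for col in string_cols})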

@mrocklin (Member, Author) commented Nov 7, 2022

I've got to believe that when Arrow reads this dataset they are able to see "yup, this is text" and read it in appropriately. Spark also. @jorisvandenbossche any suggestions?

@TomAugspurger (Member) commented Nov 7, 2022

I've got to believe that when Arrow reads this dataset they are able to see "yup, this is text" and read it in appropriately.

I think that's right. See pandas-dev/pandas#42664 (comment)

@mrocklin are you able to set pd.option_context("string_storage", "pyarrow") on the workers before reading the data? And maybe doing it just on the client would be sufficient for checking the meta.

I'm not sure if that has a trip from arrow -> python objects -> arrow, but hopefully it's zero-copy from arrow memory to pandas.
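
A minimal sketch of that worker-side setup (assuming a running distributed Client, and using pd.set_option rather than option_context so the setting persists for the whole read):

import pandas as pd
from distributed import Client

client = Client()  # or connect to an existing cluster

# Set the option on the client so dask builds its meta with pyarrow strings...
pd.set_option("mode.string_storage", "pyarrow")
# ...and on every worker so the actual reads return them too.
client.run(lambda: pd.set_option("mode.string_storage", "pyarrow"))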

@mrocklin (Member, Author) commented Nov 7, 2022

Still no dice on the problem above:

In [1]: import pandas as pd

In [2]: pd.option_context("string_storage", "pyarrow")
Out[2]: <pandas._config.config.option_context at 0x10295d690>

In [3]: import dask.dataframe as dd
   ...:
   ...: df = dd.read_parquet(
   ...:     "s3://nyc-tlc/trip data/fhvhv_tripdata_2022-06.parquet",
   ...:     split_row_groups=True,
   ...:     use_nullable_dtypes=True,
   ...: )
   ...: df.dtypes
   ...:
Out[3]:
hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
airport_fee                    float64
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
wav_match_flag                  object
dtype: object

I think that the problem here is less about the string[pyarrow] vs string[python] distinction than about the string vs object distinction.

@jorisvandenbossche (Member) commented:

I didn't test in combination with dask, but just using pandas, the following seems to work:

import pyarrow as pa
import pyarrow.parquet as pq
pq.write_table(pa.table({"col": ['a', 'b', 'c']}), "test_string.parquet")

In [5]: pd.read_parquet("test_string.parquet").dtypes
Out[5]: 
col    object
dtype: object

In [6]: pd.read_parquet("test_string.parquet", use_nullable_dtypes=True).dtypes
Out[6]: 
col    string
dtype: object

In [7]: pd.options.mode.string_storage = "pyarrow"

In [8]: pd.read_parquet("test_string.parquet", use_nullable_dtypes=True).dtypes
Out[8]: 
col    string
dtype: object

In [9]: pd.read_parquet("test_string.parquet", use_nullable_dtypes=True).dtypes[0]
Out[9]: string[pyarrow]

So it might be some issue in how dask is propagating those keywords to pandas?

@mrocklin (Member, Author) commented Nov 7, 2022

I was suspicious that this might be due to PyArrow writing metadata differently than however this data in the wild was written, so I read the dataset with pandas directly. I received the following:

In [1]: import pandas as pd

In [2]: pd.option_context("string_storage", "pyarrow")
Out[2]: <pandas._config.config.option_context at 0x106be1720>

In [3]: %time df = pd.read_parquet("s3://nyc-tlc/trip data/fhvhv_tripdata_2022-0
   ...: 1.parquet", use_nullable_dtypes=True)
CPU times: user 10.3 s, sys: 4.89 s, total: 15.2 s
Wall time: 59.5 s

In [4]: df.dtypes
Out[4]:
hvfhs_license_num               string
dispatching_base_num            string
originating_base_num            string
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     Int64
DOLocationID                     Int64
trip_miles                     Float64
trip_time                        Int64
base_passenger_fare            Float64
tolls                          Float64
bcf                            Float64
sales_tax                      Float64
congestion_surcharge           Float64
airport_fee                    Float64
tips                           Float64
driver_pay                     Float64
shared_request_flag             string
shared_match_flag               string
access_a_ride_flag              string
wav_request_flag                string
wav_match_flag                  string
dtype: object

In [5]: df.hvfhs_license_num.head()
Out[5]:
0    HV0003
1    HV0003
2    HV0003
3    HV0003
4    HV0003
Name: hvfhs_license_num, dtype: string

In [6]: df.hvfhs_license_num.head().values
Out[6]:
<StringArray>
['HV0003', 'HV0003', 'HV0003', 'HV0003', 'HV0003']
Length: 5, dtype: string

In [9]: df.memory_usage(deep=True)
Out[9]:
Index                         128
hvfhs_license_num       929350233
dispatching_base_num    929350233
originating_base_num    839144900
request_datetime        118012728
on_scene_datetime       118012728
pickup_datetime         118012728
dropoff_datetime        118012728
PULocationID            132764319
DOLocationID            132764319
trip_miles              132764319
trip_time               132764319
base_passenger_fare     132764319
tolls                   132764319
bcf                     132764319
sales_tax               132764319
congestion_surcharge    132764319
airport_fee             132764319
tips                    132764319
driver_pay              132764319
shared_request_flag     855592278
shared_match_flag       855592278
access_a_ride_flag      855592278
wav_request_flag        855592278
wav_match_flag          855592278
dtype: int64

It looks like Pandas is properly getting the string dtype (🎉).

It's not clear to me that it's using pyarrow here for storage. Those storage sizes seem large to me given the values.

@jrbourbeau (Member) commented:

I think you want to use pd.set_option("mode.string_storage", "pyarrow") instead of pd.option_context("string_storage", "pyarrow"); option_context is a context manager, so it only takes effect inside a with block.

In [1]: import pandas as pd

In [2]: pd.set_option("mode.string_storage", "pyarrow")

In [3]: df = pd.read_parquet("s3://nyc-tlc/trip data/fhvhv_tripdata_2022-01.parquet", use_nullable_dtypes=True)

In [4]: df.hvfhs_license_num.dtype
Out[4]: string[pyarrow]

@mrocklin (Member, Author) commented Nov 7, 2022

Oh cool. And memory usage is lower in that case too.

OK, I guess then this becomes more a question of how Dask could be leveraging this as well

@mroeschke (Contributor) commented:

As an aside, but possibly related, in pandas 2.0 there will be another option pd.set_option("io.nullable_backend", "pandas"|"pyarrow") that can be used in conjunction with engine="pyarrow" in IO methods to get back pyarrow types for all columns: https://pandas.pydata.org/docs/dev/whatsnew/v2.0.0.html#configuration-option-io-nullable-backend-to-return-pyarrow-backed-dtypes-from-io-functions

In [29]: df = pd.DataFrame({"1": 1, "2": 2.0, "3": "a", "4": pd.Timestamp.now(), "5": pd.Timedelta(1), "6": True}, index=[0])

In [30]: df
Out[30]:
   1    2  3                          4                         5     6
0  1  2.0  a 2022-11-10 16:13:07.694391 0 days 00:00:00.000000001  True

In [31]: df.to_parquet("tmp.parquet")

In [32]: with pd.option_context("io.nullable_backend", "pyarrow"):
    ...:     df_back = pd.read_parquet("tmp.parquet", engine="pyarrow", use_nullable_dtypes=True)
    ...:

In [33]: df_back.dtypes
Out[33]:
1                            int64[pyarrow]
2                           double[pyarrow]
3                           string[pyarrow]
4                    timestamp[us][pyarrow]
5                     duration[ns][pyarrow]
6                             bool[pyarrow]
__index_level_0__            int64[pyarrow]
dtype: object

In [34]: pd.__version__
Out[34]: '2.0.0.dev0+639.gf81f68720e'

From a quick look, dd.read_parquet doesn't leverage pd.read_parquet, correct?

@jrbourbeau (Member) commented:

Following up here: #9617, which adds support for use_nullable_dtypes, has been merged.

@jrbourbeau (Member) commented:

Okay, so with the current main branch of dask we can set use_nullable_dtypes=True to upcast datatypes (at the pyarrow.Table->pd.DataFrame conversion level) to nullable dtypes (woo 🎉 ).

This works great for things like Int64, boolean, Float64, etc. However, this doesn't give us pyarrow strings by default because pandas' default extension string type is string[python] not string[pyarrow]. Fortunately, pandas has an option we can set to make pyarrow strings the default extension string type

In [6]: import pandas as pd

In [7]: pd.set_option("mode.string_storage", "pyarrow")

For Dask users on a single machine, this works great. dd.read_parquet(..., use_nullable_dtypes=True) will respect pandas' string_storage option and return pyarrow strings. However, for Dask users on multi-machine clusters, the string_storage pandas option actually needs to be set on all the workers in the cluster to work as expected. There are ways to do this today, but they're definitely not straightforward (e.g. via client.run or a preload script). I wouldn't expect a new, or even intermediate, Dask user to know to do this.

We've seen some significant memory reductions by using pyarrow strings and it'd be good for Dask users to get those benefits by default (or as close to the default as we think is reasonable). One option I see is for dd.read_parquet to look up pd.get_option("mode.string_storage") on the client and include that value in the graph. We would then call pd.set_option prior to the pyarrow.Table->pd.DataFrame conversion call on the workers. This is nice because it gives a very idiomatic pandas experience on both single- and multi-machine Dask deployments (i.e. Dask and pandas behave exactly the same -- woo).
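
A rough sketch of that idea (a hypothetical helper, not dask's actual implementation; the types_mapper hook is one way the conversion could honor the captured option):

import pandas as pd
import pyarrow as pa

# Captured on the client at graph-construction time and embedded in the graph.
string_storage = pd.get_option("mode.string_storage")

def _table_to_pandas(table: pa.Table) -> pd.DataFrame:
    # Runs on the worker as part of each read task, converting the pyarrow
    # Table to pandas while mapping string columns to the requested backend.
    def types_mapper(arrow_type):
        if pa.types.is_string(arrow_type):
            return pd.StringDtype(string_storage)
        return None  # fall back to the default conversion for other types
    return table.to_pandas(types_mapper=types_mapper)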

Thinking a bit more ambitiously, I'm wondering if it makes sense for Dask to check whether pyarrow is installed and, if it is, automatically prefer string[pyarrow] over string[python]. This gives us "use performant strings by default", but also introduces a difference in behavior with pandas, which can be confusing for users. Now that I've written this option out, I'm curious what pandas devs think about adding this behavior upstream (cc @mroeschke @jorisvandenbossche). If pyarrow is installed, is there a reason to still prefer string[python] over string[pyarrow]?
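
For what it's worth, the check itself would be tiny (a sketch of the idea, not an existing pandas or dask behavior):

# Prefer pyarrow-backed strings whenever pyarrow is importable.
try:
    import pyarrow  # noqa: F401
    default_string_dtype = "string[pyarrow]"
except ImportError:
    default_string_dtype = "string[python]"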

@mroeschke (Contributor) commented:

From an exposure point of view, it would be very cool to see Dask opt into the string[pyarrow] type. Anecdotally, the underlying ArrowExtensionArray backing ArrowStringArray has been getting more development attention (more recently, performance improvements) than the StringArray paired with string[python].

For string[pyarrow], some of the computational methods don't have pyarrow.compute equivalents (yet?) and fall back to a numpy-based equivalent operation, which may be slightly less performant.

@jrbourbeau (Member) commented:

That's great to know. Given it's getting more attention, and there seem to be large performance benefits in lots of cases, I'm curious if switching to string[pyarrow] by default (if pyarrow is available) has come up in the pandas community. Do you think there would be an appetite for that?

For string[pyarrow], some of the computational methods don't have pyarrow.compute equivalents (yet?) and fall back to a numpy-based equivalent operation, which may be slightly less performant.

My current rough understanding is, in general, string[pyarrow] performance is greater than, or equal to, string[python] -- does that seem like a good rule of thumb?

@mroeschke (Contributor) commented:

My current rough understanding is, in general, string[pyarrow] performance is greater than, or equal to, string[python] -- does that seem like a good rule of thumb?

Yeah definitely

# pyarrow.__version__ == 8.0.1

In [1]: ser = pd.Series(["a"] * 1_000_000, dtype="string[pyarrow]")

In [2]: ser_pyarrow = pd.Series(["a"] * 1_000_000, dtype="string[pyarrow]")

In [4]: ser_python = pd.Series(["a"] * 1_000_000, dtype="string[python]")

In [5]: %timeit ser_pyarrow.str.replace("a", "A")
16.8 ms ± 264 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [6]: %timeit ser_python.str.replace("a", "A")
236 ms ± 1.51 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

And if pyarrow-backed data needs to fall back on a slower path, the user should be warned:

In [8]: ser_pyarrow.str.replace("a", "A", case=False)
<ipython-input-8-9fcf10dba08a>:1: PerformanceWarning: Falling back on a non-pyarrow code path which may decrease performance.
  ser_pyarrow.str.replace("a", "A", case=False)
Out[8]:
0         A
1         A
2         A
3         A
4         A
         ..
999995    A
999996    A
999997    A
999998    A
999999    A
Length: 1000000, dtype: string

@jrbourbeau (Member) commented:

FYI for folks here, in #9719 I'm suggesting we extend use_nullable_dtypes to support "pandas" and "pyarrow" (similar-ish to pandas nullable_backend config option) to allow for easy reading of parquet datasets into pyarrow-backed extension dtypes.

With that PR, @mrocklin's original example looks like:

In [1]: import dask.dataframe as dd

In [2]: df = dd.read_parquet(
   ...:     "s3://nyc-tlc/trip data/fhvhv_tripdata_2022-06.parquet",
   ...:     split_row_groups=True,
   ...:     use_nullable_dtypes="pyarrow",
   ...: )

In [3]: df.dtypes
Out[3]:
hvfhs_license_num                       string
dispatching_base_num                    string
originating_base_num                    string
request_datetime        timestamp[us][pyarrow]
on_scene_datetime       timestamp[us][pyarrow]
pickup_datetime         timestamp[us][pyarrow]
dropoff_datetime        timestamp[us][pyarrow]
PULocationID                    int64[pyarrow]
DOLocationID                    int64[pyarrow]
trip_miles                     double[pyarrow]
trip_time                       int64[pyarrow]
base_passenger_fare            double[pyarrow]
tolls                          double[pyarrow]
bcf                            double[pyarrow]
sales_tax                      double[pyarrow]
congestion_surcharge           double[pyarrow]
airport_fee                    double[pyarrow]
tips                           double[pyarrow]
driver_pay                     double[pyarrow]
shared_request_flag                     string
shared_match_flag                       string
access_a_ride_flag                      string
wav_request_flag                        string
wav_match_flag                          string
dtype: object

Note that the string dtypes in the above are actually still pyarrow-backed:

In [4]: df.wav_match_flag.dtype
Out[4]: string[pyarrow]

As a side note, @mroeschke, I've been confused a few times when pyarrow-backed strings sometimes appear as "string[pyarrow]" and sometimes as just "string". I definitely could be missing some context, but why not always display "string[pyarrow]"?

@mrocklin (Member, Author) commented Dec 6, 2022

I'm excited about this. Two questions:

  1. The name makes sense from a pandas internals perspective, but doesn't make much sense from a naive user perspective. Putting on my naive user hat, nullable dtypes are different from pyarrow strings.
  2. Is it possible to make this the default in some way with configuration? Or would the plan be for us to try this manually for a while, and then switch the default in the read_parquet definition sometime?

@mrocklin (Member, Author) commented Dec 6, 2022

I care less about the name if there is a good path towards making this the default.

@jrbourbeau (Member) commented:

I'm excited about this

Nice, glad to hear it

The name makes sense from a pandas internals perspective, but doesn't make much sense from a naive user perspective. Putting on my naive user hat, nullable dtypes are different from pyarrow strings.

Hm, I'm not sure I agree. I view the pyarrow-backed dtypes (e.g. int64[pyarrow]) as nullable in the sense that they have a notion of missing data and work with pd.NA. Totally agree they're different from the numpy-backed nullable dtypes in pandas (e.g. Int64), but I think "nullable" still applies to both. IIUC pandas is also using the term nullable in this context in user-facing APIs.

Is it possible to make this default in some way with configuration? Or would the plan be for us to try this manually for a while, and then switch the default in the read_parquet definition sometime?

I think both. I like the manual keyword here while we try things out, but long-term I think we'll want to add a dask.dataframe.nullable_backend (or some other name) config option that puts Dask into "pyarrow mode". I think there's a smooth path here for (at some point) making pyarrow mode configurable / default.
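
Something like the following, where the config key name is hypothetical (echoing the comment above) and not an existing dask option:

import dask

# Hypothetical config key that would put dask.dataframe into "pyarrow mode"
# globally, instead of passing a keyword to every read_parquet call.
dask.config.set({"dataframe.nullable_backend": "pyarrow"})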

@mrocklin (Member, Author) commented Dec 7, 2022

The term nullable to me means "a data type that supports an NA or missing value". I agree that that's consistent with pyarrow strings and other pyarrow types, but it's not the first thing that comes to mind.

If I'm a user saying "gosh, I want those fast arrow data types. How do I get those?" and I see "nullable", I'm not going to immediately say "Yes! That's clearly the fast arrow thing that I want."

It feels a bit like putting bananas in the dairy section of a supermarket because they're white. Yes, they do fit. But no, it's not the term that I would normally search for them by.

Maybe that doesn't make sense though. I don't know.

Again, I care less if we're going to make this the default.

@mroeschke (Contributor) commented:

Putting on my naive user hat, nullable dtypes are different from pyarrow strings.

This is really interesting feedback coming from the pandas side and would help market "why should I use pyarrow" in pandas too.

pandas (I) originally grouped the arrow types with the pandas (numpy-based) nullable dtypes since they share the common and original motivation for the nullable dtypes: a dtype-independent way to represent missing values. Hence, in pandas we also want to let the user choose arrow via nullable-related keywords, e.g. use_nullable_dtypes.

The arrow types also generally happen to be more performant, so maybe they should be associated with more performance-based keywords, e.g. engine?

@mroeschke (Contributor) commented:

As a side note, @mroeschke, I've been confused a few times when pyarrow-backed strings sometimes appear as "string[pyarrow]" and sometimes as just "string". I definitely could be missing some context, but why not always display "string[pyarrow]"?

@jrbourbeau I would consider this a bug. If it's easy to get a simple reproducer, it would be great to have as a pandas issue

@mrocklin (Member, Author) commented Dec 7, 2022

If pandas is using the term nullable for this kind of thing then I'm happy to back off from a Dask dataframe perspective. We generally just follow whatever the upstream library does. Please ignore my feedback within the context of this PR.

As a pandas user, though, this does feel strange to me. I understand how we got to "nullable" as a term here, but it doesn't feel right to me as someone who has just arrived at this situation.

@jrbourbeau (Member) commented:

If it's easy to get a simple reproducer, it would be great to have as a pandas issue

Opened pandas-dev/pandas#50099

It feels a bit like putting bananas in the dairy section of a supermarket because they're white. Yes, they do fit. But no, it's not the term that I would normally search for them by.

I feel like there's a banana split joke to be had here, I just can't find it... That said, it's a fair point.

The arrow types also generally happen to be more performant, so maybe they should be associated with more performance-based keywords, e.g. engine?

@mroeschke how hardened is the relationship between "nullable" and the numpy- and pyarrow-backed extension dtypes in pandas? I know use_nullable_dtypes has been out since pandas=1.2.0, but that's the only user-facing API I'm aware of. Given the nullable_backend option hasn't been released yet (and is also going through a name change at the moment anyway), do you think there's another term that would fit user expectations better? engine, dtype_backend, and dtype_engine come to mind for me -- @mrocklin do you have a term that you like for this concept?

@mrocklin (Member, Author) commented Dec 7, 2022

dtypes="arrow"? I don't really know enough context to have a good idea here.

Also, if we're ok moving the default around in the future then I care much less. It seems like arrow support is pretty good these days. I'm hopeful that the developers and experts will use it, see that it works well, and then we'll switch the defaults around. The devs and experts won't care much about the name.

@mroeschke (Contributor) commented:

how hardened is the relationship between "nullable" and the numpy- and pyarrow-backed extension dtypes in pandas?

As of 1.5, not really hardened. Their only association is documenting that they both use pd.NA to (externally) represent missing values. I'll bring this topic up at our next community dev meeting on Wednesday.

@jrbourbeau (Member) commented:

Thanks @mroeschke. I saw online that the meeting is open to the public, so I might drop by to say hi and represent Dask folks there.

@mroeschke (Contributor) commented:

Awesome! I put it on the meeting agenda, so it would be great to hear a representative pitch from the Dask side.
