
polars.EagerPolarsDataset fails to read parquet #590

Open
mark-druffel opened this issue Mar 1, 2024 · 16 comments
Labels
bug Something isn't working datasets


@mark-druffel

mark-druffel commented Mar 1, 2024

Description

Trying to load a parquet dataset with polars.EagerPolarsDataset through the data catalog results in a "No such file or directory" error:

model_input_table@eagerPolars:
  type: polars.EagerPolarsDataset
  filepath: data/03_primary/model_input_table.parquet/*.parquet
  file_format: parquet

Context

I'm working on a new pipeline, and this bug is brand new to me. I'm trying to load parquet files into my pipeline using Polars. I realized LazyPolarsDataset seems to work fine, so this isn't preventing me from doing anything at the moment.

Steps to Reproduce

I reproduced the issue with the kedro starter pipeline in a conda environment.

  1. Create a project by running kedro new --name polars_issue --tools all --example y
  2. Run kedro run to populate data/.
  3. Edit requirements.txt to include polars and run pip install -r requirements.txt:
# requirements.txt
ipython>=8.10
jupyterlab>=3.0
kedro~=0.19.3
kedro-datasets[pandas.CSVDataset, pandas.ExcelDataset, pandas.ParquetDataset, polars.EagerPolarsDataset, polars.LazyPolarsDataset, spark.SparkDataset, plotly.PlotlyDataset, plotly.JSONDataset, matplotlib.MatplotlibWriter]>=1.0
kedro-telemetry>=0.3.1
kedro-viz>=6.7.0
notebook
pytest~=7.2
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
ruff~=0.1.8
scikit-learn~=1.0
seaborn~=0.12.1
  4. Append two entries to conf/base/catalog.yml:
# catalog.yml
model_input_table@eagerPolars:
  type: polars.EagerPolarsDataset
  filepath: data/03_primary/model_input_table.parquet/*.parquet
  file_format: parquet

model_input_table@lazyPolars:
  type: polars.LazyPolarsDataset
  filepath: data/03_primary/model_input_table.parquet/*.parquet
  file_format: parquet
  5. Run the code below in a Kedro session (Jupyter or IPython):
import polars as pl
df_with_polars = pl.read_parquet(source="data/03_primary/model_input_table.parquet/*.parquet")
print(df_with_polars)

df_lazy = catalog.load("model_input_table@lazyPolars")
print(df_lazy)
print(df_lazy.collect())

df_eager = catalog.load("model_input_table@eagerPolars")
print(df_eager)

Expected Result

The code below should work the same way that `pl.read_parquet()` and LazyPolarsDataset did in the steps to reproduce.

df_eager = catalog.load("model_input_table@eagerPolars")
print(df_eager)

Actual Result

[02/29/24 21:21:22] INFO     Loading data from model_input_table@eagerPolars (EagerPolarsDataset)...   data_catalog.py:483
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/kedro/io/core.py:193 in   │
│ load                                                                                             │
│                                                                                                  │
│   190 │   │   self._logger.debug("Loading %s", str(self))                                        │
│   191 │   │                                                                                      │
│   192 │   │   try:                                                                               │
│ ❱ 193 │   │   │   return self._load()                                                            │
│   194 │   │   except DatasetError:                                                               │
│   195 │   │   │   raise                                                                          │
│   196 │   │   except Exception as exc:                                                           │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/kedro_datasets/polars/eag │
│ er_polars_dataset.py:156 in _load                                                                │
│                                                                                                  │
│   153 │   │   │   │   " API"                                                                     │
│   154 │   │   │   │   " https://pola-rs.github.io/polars/py-polars/html/reference/io.html"       │
│   155 │   │   │   )                                                                              │
│ ❱ 156 │   │   with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:               │
│   157 │   │   │   return load_method(fs_file, **self._load_args)                                 │
│   158 │                                                                                          │
│   159 │   def _save(self, data: pl.DataFrame) -> None:                                           │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/fsspec/spec.py:1295 in    │
│ open                                                                                             │
│                                                                                                  │
│   1292 │   │   │   )                                                                             │
│   1293 │   │   else:                                                                             │
│   1294 │   │   │   ac = kwargs.pop("autocommit", not self._intrans)                              │
│ ❱ 1295 │   │   │   f = self._open(                                                               │
│   1296 │   │   │   │   path,                                                                     │
│   1297 │   │   │   │   mode=mode,                                                                │
│   1298 │   │   │   │   block_size=block_size,                                                    │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:180 in _open                                                                              │
│                                                                                                  │
│   177 │   │   path = self._strip_protocol(path)                                                  │
│   178 │   │   if self.auto_mkdir and "w" in mode:                                                │
│   179 │   │   │   self.makedirs(self._parent(path), exist_ok=True)                               │
│ ❱ 180 │   │   return LocalFileOpener(path, mode, fs=self, **kwargs)                              │
│   181 │                                                                                          │
│   182 │   def touch(self, path, truncate=True, **kwargs):                                        │
│   183 │   │   path = self._strip_protocol(path)                                                  │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:302 in __init__                                                                           │
│                                                                                                  │
│   299 │   │   self.autocommit = autocommit                                                       │
│   300 │   │   self.compression = get_compression(path, compression)                              │
│   301 │   │   self.blocksize = io.DEFAULT_BUFFER_SIZE                                            │
│ ❱ 302 │   │   self._open()                                                                       │
│   303 │                                                                                          │
│   304 │   def _open(self):                                                                       │
│   305 │   │   if self.f is None or self.f.closed:                                                │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:307 in _open                                                                              │
│                                                                                                  │
│   304 │   def _open(self):                                                                       │
│   305 │   │   if self.f is None or self.f.closed:                                                │
│   306 │   │   │   if self.autocommit or "w" not in self.mode:                                    │
│ ❱ 307 │   │   │   │   self.f = open(self.path, mode=self.mode)                                   │
│   308 │   │   │   │   if self.compression:                                                       │
│   309 │   │   │   │   │   compress = compr[self.compression]                                     │
│   310 │   │   │   │   │   self.f = compress(self.f, mode=self.mode)                              │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
FileNotFoundError: [Errno 2] No such file or directory: 
'~/polars-bug/data/03_primary/model_input_table.parquet/*.parquet'

The above exception was the direct cause of the following exception:

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ in <module>:1                                                                                    │
│                                                                                                  │
│ ❱ 1 df_eager = catalog.load("model_input_table@eagerPolars")                                     │
│   2 print(df_eager)                                                                              │
│   3                                                                                              │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/kedro/io/data_catalog.py: │
│ 490 in load                                                                                      │
│                                                                                                  │
│   487 │   │   │   extra={"markup": True},                                                        │
│   488 │   │   )                                                                                  │
│   489 │   │                                                                                      │
│ ❱ 490 │   │   result = dataset.load()                                                            │
│   491 │   │                                                                                      │
│   492 │   │   return result                                                                      │
│   493                                                                                            │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/kedro/io/core.py:615 in   │
│ load                                                                                             │
│                                                                                                  │
│   612 │   │   return self._filepath / version / self._filepath.name                              │
│   613 │                                                                                          │
│   614 │   def load(self) -> _DO:                                                                 │
│ ❱ 615 │   │   return super().load()                                                              │
│   616 │                                                                                          │
│   617 │   def save(self, data: _DI) -> None:                                                     │
│   618 │   │   self._version_cache.clear()                                                        │
│                                                                                                  │
│ ~/miniconda3/envs/polars_bug/lib/python3.10/site-packages/kedro/io/core.py:202 in   │
│ load                                                                                             │
│                                                                                                  │
│   199 │   │   │   message = (                                                                    │
│   200 │   │   │   │   f"Failed while loading data from data set {str(self)}.\n{str(exc)}"        │
│   201 │   │   │   )                                                                              │
│ ❱ 202 │   │   │   raise DatasetError(message) from exc                                           │
│   203 │                                                                                          │
│   204 │   def save(self, data: _DI) -> None:                                                     │
│   205 │   │   """Saves data by delegation to the provided save method.                           │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
DatasetError: Failed while loading data from data set EagerPolarsDataset(file_format=parquet, 
filepath=~/polars-bug/data/03_primary/model_input_table.parquet/*.parquet, load_args={}, 
protocol=file, save_args={}).
[Errno 2] No such file or directory: 
'~/polars-bug/data/03_primary/model_input_table.parquet/*.parquet'

Your Environment


  • Kedro version used (pip show kedro or kedro -V): kedro, version 0.19.3
  • Kedro plugin and kedro plugin version used (pip show kedro-airflow): kedro-datasets 2.1.0
  • Python version used (python -V): Python 3.10.13
  • Operating system and version: Mac Sonoma 14.3.1
@grofte

grofte commented Mar 6, 2024

It's because Kedro opens the file, and the opened file somehow isn't recognized as bytes / io.BufferedIOBase / io.RawIOBase (by Kedro or by Polars), so it gets sent to .scan_parquet, which only takes paths as an argument.

If you add this to catalog.yml, it will work:

load_args:
    use_pyarrow: true

Kedro sends it off to Polars here: https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py#L156

And here Polars checks if the file is already opened: https://github.com/pola-rs/polars/blob/py-0.20.13/py-polars/polars/io/parquet/functions.py#L156

Then Polars decides that it isn't opened and goes here: https://github.com/pola-rs/polars/blob/py-0.20.13/py-polars/polars/io/parquet/functions.py#L171

and it goes to .scan_parquet, which only works with Path objects or path strings (hence it doesn't find the path).

It's either a Kedro bug or a Polars bug, but I don't know which.
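A minimal sketch of the kind of type check described above, assuming the three accepted types listed in this comment. `choose_reader` and `OnlyIOBase` are hypothetical names, and the real Polars logic differs between versions; the point is only that an open file object which isn't an instance of those classes falls through to the path branch.

```python
import io

# Types treated as "already open" in this hypothetical sketch,
# taken from the list in the comment above.
FILE_LIKE = (bytes, io.BufferedIOBase, io.RawIOBase)


def choose_reader(source: object) -> str:
    """Return the code path a read_parquet-style function would take (sketch)."""
    if isinstance(source, FILE_LIKE):
        # Raw bytes or an already-open binary file: read its contents.
        return "read_from_buffer"
    # Anything else is assumed to be a path and handed to scan_parquet,
    # which only understands Path objects or path strings.
    return "scan_parquet"


class OnlyIOBase(io.IOBase):
    """A file-like object deriving from io.IOBase but no buffered/raw base."""


print(choose_reader(io.BytesIO(b"PAR1")))  # read_from_buffer
print(choose_reader("data/*.parquet"))     # scan_parquet
print(choose_reader(OnlyIOBase()))         # scan_parquet
```

Whether the object Kedro passes in actually fails this check was not confirmed in the thread; later comments point to the glob pattern itself as the cause.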

@astrojuanlu Apparently this has already been reported.

@astrojuanlu
Member

Thanks @grofte and sorry @mark-druffel for the delay, this fell off my radar. I'm labeling this accordingly and adding it to our backlog.

@astrojuanlu astrojuanlu added bug Something isn't working datasets labels Mar 6, 2024
@mark-druffel
Author

mark-druffel commented Mar 6, 2024

@grofte I could be doing it wrong, but I'm still getting an error with pyarrow. @astrojuanlu Definitely no rush on my side; this isn't holding anything up for my team. I just wanted to report it in case it was helpful or unknown.

# catalog.yml
model_input_table@eagerPolars:
  type: polars.EagerPolarsDataset
  filepath: data/03_primary/model_input_table.parquet/*.parquet
  file_format: parquet
  load_args:
    use_pyarrow: true

I ran catalog.load("model_input_table@eagerPolars") inside a kedro ipython environment and got what appears to be the same error 🤷

In [1]: catalog.load("model_input_table@eagerPolars")
[03/06/24 10:57:56] INFO     Loading data from model_input_table@eagerPolars (EagerPolarsDataset)...                                   data_catalog.py:483
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ ~/lib/python3.10/site-packages/kedro/io/core.py:193 in   │
│ load                                                                                             │
│                                                                                                  │
│   190 │   │   self._logger.debug("Loading %s", str(self))                                        │
│   191 │   │                                                                                      │
│   192 │   │   try:                                                                               │
│ ❱ 193 │   │   │   return self._load()                                                            │
│   194 │   │   except DatasetError:                                                               │
│   195 │   │   │   raise                                                                          │
│   196 │   │   except Exception as exc:                                                           │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro_datasets/polars/eag │
│ er_polars_dataset.py:156 in _load                                                                │
│                                                                                                  │
│   153 │   │   │   │   " API"                                                                     │
│   154 │   │   │   │   " https://pola-rs.github.io/polars/py-polars/html/reference/io.html"       │
│   155 │   │   │   )                                                                              │
│ ❱ 156 │   │   with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:               │
│   157 │   │   │   return load_method(fs_file, **self._load_args)                                 │
│   158 │                                                                                          │
│   159 │   def _save(self, data: pl.DataFrame) -> None:                                           │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/spec.py:1295 in    │
│ open                                                                                             │
│                                                                                                  │
│   1292 │   │   │   )                                                                             │
│   1293 │   │   else:                                                                             │
│   1294 │   │   │   ac = kwargs.pop("autocommit", not self._intrans)                              │
│ ❱ 1295 │   │   │   f = self._open(                                                               │
│   1296 │   │   │   │   path,                                                                     │
│   1297 │   │   │   │   mode=mode,                                                                │
│   1298 │   │   │   │   block_size=block_size,                                                    │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:180 in _open                                                                              │
│                                                                                                  │
│   177 │   │   path = self._strip_protocol(path)                                                  │
│   178 │   │   if self.auto_mkdir and "w" in mode:                                                │
│   179 │   │   │   self.makedirs(self._parent(path), exist_ok=True)                               │
│ ❱ 180 │   │   return LocalFileOpener(path, mode, fs=self, **kwargs)                              │
│   181 │                                                                                          │
│   182 │   def touch(self, path, truncate=True, **kwargs):                                        │
│   183 │   │   path = self._strip_protocol(path)                                                  │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:302 in __init__                                                                           │
│                                                                                                  │
│   299 │   │   self.autocommit = autocommit                                                       │
│   300 │   │   self.compression = get_compression(path, compression)                              │
│   301 │   │   self.blocksize = io.DEFAULT_BUFFER_SIZE                                            │
│ ❱ 302 │   │   self._open()                                                                       │
│   303 │                                                                                          │
│   304 │   def _open(self):                                                                       │
│   305 │   │   if self.f is None or self.f.closed:                                                │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:307 in _open                                                                              │
│                                                                                                  │
│   304 │   def _open(self):                                                                       │
│   305 │   │   if self.f is None or self.f.closed:                                                │
│   306 │   │   │   if self.autocommit or "w" not in self.mode:                                    │
│ ❱ 307 │   │   │   │   self.f = open(self.path, mode=self.mode)                                   │
│   308 │   │   │   │   if self.compression:                                                       │
│   309 │   │   │   │   │   compress = compr[self.compression]                                     │
│   310 │   │   │   │   │   self.f = compress(self.f, mode=self.mode)                              │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
FileNotFoundError: [Errno 2] No such file or directory: '/Users/m109993/Downloads/polars-bug/data/03_primary/model_input_table.parquet/*.parquet'

The above exception was the direct cause of the following exception:

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ in <module>:1                                                                                    │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro/io/data_catalog.py: │
│ 490 in load                                                                                      │
│                                                                                                  │
│   487 │   │   │   extra={"markup": True},                                                        │
│   488 │   │   )                                                                                  │
│   489 │   │                                                                                      │
│ ❱ 490 │   │   result = dataset.load()                                                            │
│   491 │   │                                                                                      │
│   492 │   │   return result                                                                      │
│   493                                                                                            │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro/io/core.py:615 in   │
│ load                                                                                             │
│                                                                                                  │
│   612 │   │   return self._filepath / version / self._filepath.name                              │
│   613 │                                                                                          │
│   614 │   def load(self) -> _DO:                                                                 │
│ ❱ 615 │   │   return super().load()                                                              │
│   616 │                                                                                          │
│   617 │   def save(self, data: _DI) -> None:                                                     │
│   618 │   │   self._version_cache.clear()                                                        │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro/io/core.py:202 in   │
│ load                                                                                             │
│                                                                                                  │
│   199 │   │   │   message = (                                                                    │
│   200 │   │   │   │   f"Failed while loading data from data set {str(self)}.\n{str(exc)}"        │
│   201 │   │   │   )                                                                              │
│ ❱ 202 │   │   │   raise DatasetError(message) from exc                                           │
│   203 │                                                                                          │
│   204 │   def save(self, data: _DI) -> None:                                                     │
│   205 │   │   """Saves data by delegation to the provided save method.                           │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
DatasetError: Failed while loading data from data set EagerPolarsDataset(file_format=parquet, 
filepath=~/data/03_primary/model_input_table.parquet/*.parquet, load_args={'use_pyarrow': True}, protocol=file, 
save_args={}).
[Errno 2] No such file or directory: '~/data/03_primary/model_input_table.parquet/*.parquet'

@grofte

grofte commented Mar 7, 2024

@mark-druffel
You are absolutely right. This seems to be because Lazy goes to https://docs.pola.rs/py-polars/html/reference/api/polars.scan_parquet.html#polars.scan_parquet, which supports globbing, while Eager calls open() on your glob pattern, which doesn't work. Polars read_parquet doesn't seem to support globbing either.

I thought it was the same thing I ran into, so I didn't read your error messages closely enough. Sorry.
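The open() versus globbing difference can be reproduced with the standard library alone: open() takes a glob pattern as a literal file name, while glob.glob expands it. This is a minimal sketch; the directory and part-file names are made up to mimic the model_input_table.parquet/ layout from the issue.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as root:
    # A directory that looks like a partitioned parquet "file" (made-up names).
    table_dir = os.path.join(root, "model_input_table.parquet")
    os.makedirs(table_dir)
    for name in ("part-0.parquet", "part-1.parquet"):
        with open(os.path.join(table_dir, name), "wb") as f:
            f.write(b"")  # empty placeholder; contents don't matter here

    pattern = os.path.join(table_dir, "*.parquet")

    # open() treats the pattern as a literal file name, so it fails
    # the same way the tracebacks in this issue do.
    try:
        with open(pattern, "rb"):
            opened = True
    except FileNotFoundError:
        opened = False

    # glob expands the wildcard into the matching file paths.
    matches = sorted(os.path.basename(p) for p in glob.glob(pattern))

print(opened)   # False
print(matches)  # ['part-0.parquet', 'part-1.parquet']
```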

@grofte

grofte commented Mar 7, 2024

If you wanted it to work with Eager, I think you would have to use a dataset factory (https://docs.kedro.org/en/stable/data/kedro_dataset_factories.html), then in your pipeline find out which files are there, create a node for each of them, and feed them into a node that joins them with a vstack.

It's much better to use the Lazy dataset for ingestion and then call .collect() on it to switch to eager / DataFrame mode.
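The per-file approach above boils down to expanding the pattern before reading anything. A rough, hypothetical sketch of that idea in a single function (rather than one catalog entry and node per file) follows; `load_glob`, `read_lines`, and `concat_lists` are illustrative names, and the demo uses small text files so it runs with the standard library only.

```python
import glob
import os
import tempfile
from typing import Callable, List, TypeVar

T = TypeVar("T")


def load_glob(pattern: str, loader: Callable[[str], T],
              combine: Callable[[List[T]], T]) -> T:
    """Expand `pattern`, load each match, and combine the results (hypothetical)."""
    paths = sorted(glob.glob(pattern))  # sorted for a deterministic row order
    if not paths:
        raise FileNotFoundError(f"no files match {pattern!r}")
    return combine([loader(p) for p in paths])


def read_lines(path: str) -> List[str]:
    with open(path) as f:
        return f.read().splitlines()


def concat_lists(parts: List[List[str]]) -> List[str]:
    # Stack the per-file results vertically, like a vstack.
    return [row for part in parts for row in part]


with tempfile.TemporaryDirectory() as root:
    for i, chunk in enumerate((["a", "b"], ["c"])):
        with open(os.path.join(root, f"part-{i}.txt"), "w") as f:
            f.write("\n".join(chunk))
    rows = load_glob(os.path.join(root, "*.txt"), read_lines, concat_lists)

print(rows)  # ['a', 'b', 'c']
```

With Polars, `loader` could be `pl.read_parquet` and `combine` could be `pl.concat` (which concatenates vertically by default), giving the same kind of result as the vstack node described above.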

@merelcht merelcht added the Community Issue/PR opened by the open-source community label Mar 11, 2024

@astrojuanlu
Member

@grofte @mark-druffel I finally spent some time reflecting on this. Given that the underlying read_* methods do not support globbing, I'm voting to close this as "won't fix" and encourage users to use polars.LazyPolarsDataset instead.

Thoughts?

@grofte

grofte commented Mar 19, 2024

I think it would be nice to have a method on the Kedro classes that checks load/save arguments, since Kedro is a layer of indirection in front of Polars. That would inform users better when they hit these gotchas (path globbing and dtypes). I know you could do this in documentation, but that also seems awkward. But I don't think Eager should support globbing when Polars doesn't.
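As a rough sketch of that idea, assuming the dataset knows which Polars function it will call, the standard library's inspect.signature can flag load_args keys the function doesn't accept. `check_load_args` and `fake_read_parquet` are hypothetical; a real check would also need version-specific knowledge (such as whether a reader accepts globs) that a signature can't express.

```python
import inspect
from typing import Any, Callable, Dict, List


def check_load_args(load_method: Callable[..., Any],
                    load_args: Dict[str, Any]) -> List[str]:
    """List load_args keys that load_method would reject (hypothetical helper)."""
    params = inspect.signature(load_method).parameters
    # A function taking **kwargs accepts any keyword, so nothing can be flagged.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []
    return [
        f"{name!r} is not a parameter of {load_method.__name__}"
        for name in load_args
        if name not in params
    ]


def fake_read_parquet(source, *, columns=None, use_pyarrow=False):
    """Stand-in with a made-up signature; not the real polars.read_parquet."""
    return source


print(check_load_args(fake_read_parquet, {"use_pyarrow": True, "dtype": "int64"}))
# ["'dtype' is not a parameter of fake_read_parquet"]
```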

@mark-druffel
Author

Sorry, I'm not sure I follow the point about Polars not globbing, probably because I have a flawed understanding of what's happening under the hood. Both of these work for me in a Python environment; isn't this what's happening with the two Kedro datasets?

import polars as pl
# This code is eager
pl.read_parquet("~/*.parquet")
# This code is lazy
pl.scan_parquet("~/*.parquet")

@astrojuanlu Regardless of your response, I'm happy and unblocked. I'll defer to your team on which direction makes the most sense; I'm still too much of a novice (w/ Kedro & Polars) to have an authoritative opinion on what the code should do. Just let me know if you want me to close this issue, and thanks!

@noklam
Contributor

noklam commented Mar 20, 2024

(edited) There is some discussion of glob and open that I haven't confirmed, but I think we can simplify the problem to this:

# This works
import polars as pl
path = '/workspace/kedro-plugins/polar-eager/data/03_primary/model_input_table.pq/*.parquet'
pl.read_parquet(path)

# This fails
catalog.load("model_input_table") # an eager dataset with filepath: '/workspace/kedro-plugins/polar-eager/data/03_primary/model_input_table.pq/*.parquet'

There are claims that Polars doesn't support globbing in eager mode; this does not seem to be true (the code above shows it works).

with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
    return load_method(fs_file, **self._load_args)

If you change this in the _load method to the line below, it works immediately. So the real problem here is that fsspec's open() doesn't work with glob patterns like this.

return load_method(load_path, **self._load_args) 
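One hypothetical way to apply that change only where it's needed is to branch on whether the path contains glob wildcards, keeping the existing open-then-read behaviour for plain file paths. `load_with_fallback`, `GLOB_CHARS`, and the injected `open_file` callable are illustrative assumptions, not kedro-datasets code; in the dataset, `open_file` would play the role of self._fs.open.

```python
import io
from typing import IO, Any, Callable, Dict

# Characters that the glob module treats as wildcards.
GLOB_CHARS = set("*?[")


def load_with_fallback(
    load_path: str,
    load_method: Callable[..., Any],
    open_file: Callable[[str], IO[bytes]],
    load_args: Dict[str, Any],
) -> Any:
    """Pass glob patterns straight to the reader; open plain paths first (sketch)."""
    if GLOB_CHARS & set(load_path):
        # A pattern can't be opened as a single file, so let the reader
        # (e.g. pl.read_parquet) resolve it itself.
        return load_method(load_path, **load_args)
    # Plain path: unchanged open-then-read behaviour.
    with open_file(load_path) as fs_file:
        return load_method(fs_file, **load_args)


def fake_reader(source: Any, **kwargs: Any) -> tuple:
    """Stand-in reader that reports what kind of source it received."""
    if isinstance(source, str):
        return ("path", source)
    return ("file", source.read())


def fake_open(path: str) -> IO[bytes]:
    """Stand-in for an fsspec open call; returns an in-memory file."""
    return io.BytesIO(b"contents")


print(load_with_fallback("data/*.parquet", fake_reader, fake_open, {}))
# ('path', 'data/*.parquet')
print(load_with_fallback("data/a.parquet", fake_reader, fake_open, {}))
# ('file', b'contents')
```

The catch, raised in the replies below, is that the pass-through branch bypasses the dataset's fsspec layer for those paths.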

@grofte

grofte commented Mar 20, 2024

@mark-druffel
I'm so sorry. I read the Polars docs, and scan explicitly mentions globbing while read doesn't. But on second look, read will take all the files in a directory if that's the path you give it, so it makes sense that it also works with globbing.

If LazyPolarsDataset works with cloud buckets and globbing and EagerPolarsDataset doesn't work with globbing, then I guess Kedro could just invoke scan and run .collect() before returning the result? Maybe Eager could just inherit from Lazy?

@astrojuanlu
Member

So the real problem here is fsspec doesn't work with glob like this.

Thanks for investigating @noklam 💯 Like @grofte, I was also confused by the lack of any mention of globbing in the read docs.

As far as I remember, Polars supports remote paths with its own native methods. Maybe we should ditch fsspec for this particular dataset?

@noklam
Contributor

noklam commented Mar 20, 2024

As far as I remember, Polars supports remote paths with its own native methods. Maybe we should ditch fsspec for this particular dataset?

This is what we do with Spark. If the support is good enough, we can definitely do this. We have to be careful, though: pandas also supports reading from S3, but we still use fsspec. We should check how well Polars supports other remote paths (i.e. not S3).

@astrojuanlu
Member

Yeah, when I say "ditch fsspec for this particular dataset", I mean our specific boilerplate in the datasets code. I don't mind what mechanism the underlying library uses.

@astrojuanlu
Member

It was noted in backlog grooming today by @noklam that our fsspec boilerplate is linked to our versioning, so it's not that easy to ditch fsspec.
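The tracebacks earlier in this issue include the line Kedro uses to build a versioned load path: `return self._filepath / version / self._filepath.name` (kedro/io/core.py, line 612 in that output). A minimal sketch of that expression with PurePosixPath, assuming a made-up Kedro-style version timestamp, shows one way path handling and versioning interact: with a glob filepath, the wildcard ends up both before and after the version segment.

```python
from pathlib import PurePosixPath


def versioned_path(filepath: PurePosixPath, version: str) -> PurePosixPath:
    # Same expression as the kedro/io/core.py line shown in the traceback.
    return filepath / version / filepath.name


plain = PurePosixPath("data/03_primary/model_input_table.parquet")
globbed = PurePosixPath("data/03_primary/model_input_table.parquet/*.parquet")
version = "2024-03-01T00.00.00.000Z"  # made-up version string

print(versioned_path(plain, version))
# data/03_primary/model_input_table.parquet/2024-03-01T00.00.00.000Z/model_input_table.parquet
print(versioned_path(globbed, version))
# data/03_primary/model_input_table.parquet/*.parquet/2024-03-01T00.00.00.000Z/*.parquet
```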

@merelcht merelcht removed the Community Issue/PR opened by the open-source community label May 24, 2024