
Problem with concat and hive partitioned data in combination with in-memory data frames #16285

Open
2 tasks done
ericdoerheit opened this issue May 17, 2024 · 0 comments
Labels
bug (Something isn't working) · needs triage (Awaiting prioritization by a maintainer) · python (Related to Python Polars)


Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import os

import numpy as np
import polars as pl


def main():
    os.makedirs("tmp/year=2024/month=5", exist_ok=True)

    df1 = pl.DataFrame({
        "a": [1, 2, 3],
        "b": ["d", "e", "f"],
        "c": [1.1, 1.2, 1.3],
        "year": np.array([2024, 2024, 2024], dtype=np.int64),
        "month": np.array([5, 5, 5], dtype=np.int64),
    }).lazy()

    pl.DataFrame({"a": [4, 5, 6], "b": ["d", "e", "f"], "c": [2.1, 2.2, 2.3], }) \
        .write_parquet("tmp/year=2024/month=5/df.parquet")

    df2 = pl.scan_parquet("tmp/*/*/*.parquet", hive_partitioning=True)

    df = pl.concat([df1, df2])

    result = df.filter(pl.col("a") >= 4) \
        .select([pl.col("c").mean().alias("mean_c")]).collect()

    print("Test Result:", result)


if __name__ == "__main__":
    main()

Log output

UNION: union is run in parallel
parquet file must be read, statistics not sufficient for predicate.
Traceback (most recent call last):
  File "C:\Users\ericd\dataspree\Projects\dataspree-insights\scripts\test_concat_hot_and_cold_data.py", line 32, in <module>
    main()
  File "C:\Users\ericd\dataspree\Projects\dataspree-insights\scripts\test_concat_hot_and_cold_data.py", line 26, in main
    .select([pl.col("c").mean().alias("mean_c")]).collect()
  File "C:\Users\ericd\AppData\Roaming\Python\Python310\site-packages\polars\lazyframe\frame.py", line 1816, in collect
    return wrap_df(ldf.collect(callback))
polars.exceptions.ShapeError: unable to append to a DataFrame of width 2 with a DataFrame of width 4

Issue description

There is an issue when concatenating lazy frames scanned from hive-partitioned data with in-memory data frames: collecting the query fails with a ShapeError. If the hive partitioning is ignored (hive_partitioning=False), the example above works. Another workaround is to deactivate projection pushdown (projection_pushdown=False).

For reference, this is how the data frames look:

df1 shape: (3, 5)
┌─────┬─────┬─────┬──────┬───────┐
│ a   ┆ b   ┆ c   ┆ year ┆ month │
│ --- ┆ --- ┆ --- ┆ ---  ┆ ---   │
│ i64 ┆ str ┆ f64 ┆ i64  ┆ i64   │
╞═════╪═════╪═════╪══════╪═══════╡
│ 1   ┆ d   ┆ 1.1 ┆ 2024 ┆ 5     │
│ 2   ┆ e   ┆ 1.2 ┆ 2024 ┆ 5     │
│ 3   ┆ f   ┆ 1.3 ┆ 2024 ┆ 5     │
└─────┴─────┴─────┴──────┴───────┘
df2 shape: (3, 5)
┌─────┬─────┬─────┬──────┬───────┐
│ a   ┆ b   ┆ c   ┆ year ┆ month │
│ --- ┆ --- ┆ --- ┆ ---  ┆ ---   │
│ i64 ┆ str ┆ f64 ┆ i64  ┆ i64   │
╞═════╪═════╪═════╪══════╪═══════╡
│ 4   ┆ d   ┆ 2.1 ┆ 2024 ┆ 5     │
│ 5   ┆ e   ┆ 2.2 ┆ 2024 ┆ 5     │
│ 6   ┆ f   ┆ 2.3 ┆ 2024 ┆ 5     │
└─────┴─────┴─────┴──────┴───────┘

I found the following issues which might be related:

I created a similar test in Rust and it yields the same result:

use std::fs;
use polars::io::HiveOptions;
use polars::prelude::*;

const TEST_DIR: &str = "tmp/integration_tests/polars";
const YEAR: i32 = 2024;
const MONTH: i8 = 5;

/// Function that writes example data to a parquet file with hive partitioning.
fn write_example_data() {
    // Write a DataFrame to a parquet file with hive partitioning
    let mut data_frame = df! {
        "a" => [1, 2, 3],
        "b" => ["a", "b", "c"],
        "c" => [1.1, 1.2, 1.3],
    }.unwrap();

    let out_dir = format!("{}/year={}/month={}", TEST_DIR, YEAR, MONTH);
    match fs::create_dir_all(out_dir.clone()) {
        Ok(_) => println!("Successfully created directory: {}", out_dir.clone()),
        Err(e) => println!("Error creating directory: {}", e),
    }

    let file_path = format!("{}/data.parquet", out_dir);
    let file = fs::File::create(&file_path).expect("Failed to create file");
    let writer = ParquetWriter::new(file);
    writer.finish(&mut data_frame).expect("Failed to write data to file");
}

/// Function that scans the saved data from the parquet file.
fn get_saved_data() -> LazyFrame {
    let mut hive_options = HiveOptions::default();

    // Define the fields of the hive schema
    let fields = vec![
        Field::new("year", DataType::Int32),
        Field::new("month", DataType::Int8),
    ];

    // Create the schema
    let schema = Schema::from_iter(fields.iter().map(|f| f.clone()));

    // Create a reference to the schema
    let schema_ref = SchemaRef::new(schema);

    hive_options.schema = Some(schema_ref);
    hive_options.enabled = true;

    let mut scan_args = ScanArgsParquet::default();
    scan_args.hive_options = hive_options;

    // Use the scan parquet function to scan all parquet files in the directory
    let schema_path = format!("{}/*/*/*.parquet", TEST_DIR);

    let lazy_frame = LazyFrame::scan_parquet(schema_path, scan_args.clone()).unwrap();

    // let schema_path = format!("{}/year=2024/month=5/*.parquet", TEST_DIR);
    // let lazy_frame = LazyFrame::scan_parquet(schema_path, ScanArgsParquet::default()).unwrap();

    println!("In storage: {:?}", lazy_frame.schema());

    lazy_frame
}

/// Test that concatenates a saved DataFrame with an in-memory DataFrame and executes a query.
#[test]
fn test_polars_concat() {
    write_example_data();

    let df_stored = get_saved_data();
    let df_in_memory = df! {
        "a" => [4, 5, 6],
        "b" => ["d", "e", "f"],
        "c" => [2.1, 2.2, 2.3],
        "year" => [2024, 2024, 2024],
        "month" => [5, 5, 5],
    }.unwrap().lazy();

    println!("In memory: {:?}", df_stored.schema());

    let args = UnionArgs {
        parallel: false,
        rechunk: false,
        to_supertypes: false,
    };
    let df = concat(&[df_stored, df_in_memory], args).unwrap();

    let result = df.filter(col("a").gt_eq(lit(4)))
        .select([col("c").mean().alias("mean_c")]).collect();

    println!("{:?}", result.unwrap());
}

Log output:

called `Result::unwrap()` on an `Err` value: ShapeMismatch(ErrString("unable to append to a DataFrame of width 4 with a DataFrame of width 2"))
thread 'test_polars_concat' panicked at tests\polars_concat_in_memory_with_saved_data.rs:91:29:
called `Result::unwrap()` on an `Err` value: ShapeMismatch(ErrString("unable to append to a DataFrame of width 4 with a DataFrame of width 2"))
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\std\src\panicking.rs:647
   1: core::panicking::panic_fmt
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\core\src\panicking.rs:72
   2: core::result::unwrap_failed
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\core\src\result.rs:1649
   3: enum2$<core::result::Result<polars_core::frame::DataFrame,enum2$<polars_error::PolarsError> > >::unwrap
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04\library\core\src\result.rs:1073
   4: polars_concat_in_memory_with_saved_data::test_polars_concat
             at .\tests\polars_concat_in_memory_with_saved_data.rs:91
   5: polars_concat_in_memory_with_saved_data::test_polars_concat::closure$0
             at .\tests\polars_concat_in_memory_with_saved_data.rs:67
   6: core::ops::function::FnOnce::call_once<polars_concat_in_memory_with_saved_data::test_polars_concat::closure_env$0,tuple$<> >
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04\library\core\src\ops\function.rs:250
   7: core::ops::function::FnOnce::call_once
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\core\src\ops\function.rs:250
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Expected behavior

I would expect the concat function to yield the same result regardless of whether hive partitioning or projection pushdown is activated.

Installed versions

--------Version info---------
Polars:               0.20.26
Index type:           UInt32
Platform:             Windows-10-10.0.22631-SP0
Python:               3.10.5 (tags/v3.10.5:f377153, Jun  6 2022, 16:14:13) [MSC v.1929 64 bit (AMD64)]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         <not installed>
numpy:                1.26.4
openpyxl:             <not installed>
pandas:               <not installed>
pyarrow:              16.0.0
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           <not installed>
torch:                1.12.1+cpu
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
ericdoerheit added the bug, needs triage, and python labels on May 17, 2024