test_split_adaptive_aggregate_files failing on main #10721

Open
fjetter opened this issue Dec 19, 2023 · 4 comments
Labels: tests (Unit tests and/or continuous integration)

fjetter (Member) commented Dec 19, 2023

I have only seen this pop up once so far, but test_split_adaptive_aggregate_files was failing on main:

https://github.com/dask/dask/actions/runs/7226871563/job/19693365591

        ddf1.to_parquet(
            str(tmpdir),
            engine=write_engine,
            partition_on=partition_on,
            write_index=False,
        )
        with pytest.warns(FutureWarning, match="Behavior may change"):
            ddf2 = dd.read_parquet(
                str(tmpdir),
                engine=read_engine,
                blocksize=blocksize,
                split_row_groups="adaptive",
                aggregate_files=aggregate_files,
            )
    
        # Check that files where aggregated as expected
        if aggregate_files == "a":
            assert ddf2.npartitions == 3
        elif aggregate_files == "b":
            assert ddf2.npartitions == 6
    
        # Check that the final data is correct
        df2 = ddf2.compute().sort_values(["c", "d"])
        df1 = df1.sort_values(["c", "d"])
>       assert_eq(df1[["c", "d"]], df2[["c", "d"]], check_index=False)

dask\dataframe\io\tests\test_parquet.py:3134: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

a =            c   d
0   0.001898  15
1   0.007386  30
2   0.028712  14
3   0.038563  44
4   0.044161   6
..       ...  ..
95  0.948483  83
96  0.959911  49
97  0.970251  32
98  0.995297  82
99  0.999792  65

[100 rows x 2 columns]
b =            c   d
0   0.007386  30
1   0.044161   6
2   0.049735  76
3   0.083297  57
4   0.095994  47
..       ...  ..
92  0.940960  97
93  0.948483  83
94  0.959911  49
95  0.995297  82
96  0.999792  65

[97 rows x 2 columns]
check_names = True, check_dtype = True, check_divisions = True
check_index = False, sort_results = True, scheduler = 'sync', kwargs = {}

    def assert_eq(
        a,
        b,
        check_names=True,
        check_dtype=True,
        check_divisions=True,
        check_index=True,
        sort_results=True,
        scheduler="sync",
        **kwargs,
    ):
        if check_divisions:
            assert_divisions(a, scheduler=scheduler)
            assert_divisions(b, scheduler=scheduler)
            if hasattr(a, "divisions") and hasattr(b, "divisions"):
                at = type(np.asarray(a.divisions).tolist()[0])  # numpy to python
                bt = type(np.asarray(b.divisions).tolist()[0])  # scalar conversion
                assert at == bt, (at, bt)
        assert_sane_keynames(a)
        assert_sane_keynames(b)
        a = _check_dask(
            a, check_names=check_names, check_dtypes=check_dtype, scheduler=scheduler
        )
        b = _check_dask(
            b, check_names=check_names, check_dtypes=check_dtype, scheduler=scheduler
        )
        if hasattr(a, "to_pandas"):
            a = a.to_pandas()
        if hasattr(b, "to_pandas"):
            b = b.to_pandas()
    
        a, b = _maybe_convert_string(a, b)
    
        if isinstance(a, (pd.DataFrame, pd.Series)) and sort_results:
            a = _maybe_sort(a, check_index)
            b = _maybe_sort(b, check_index)
        if not check_index:
            a = a.reset_index(drop=True)
            b = b.reset_index(drop=True)
        if isinstance(a, pd.DataFrame):
>           tm.assert_frame_equal(
                a, b, check_names=check_names, check_dtype=check_dtype, **kwargs
E               AssertionError: DataFrame are different
E               
E               DataFrame shape mismatch
E               [left]:  (100, 2)
E               [right]: (97, 2)

I find this failure particularly concerning, since it looks like data loss is possible even if it doesn't happen reliably. @rjzamora do you have time to poke at this?
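For reference, a minimal way to check whether rows are actually being dropped (a hypothetical debugging sketch, not part of the CI run; `scratch_parquet` is a placeholder for the test's tmpdir, and the engine parametrization is omitted) is to rebuild the test's dataset and compare per-partition and total row counts:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

# Build the same kind of hive-partitioned dataset the test writes.
df1 = pd.DataFrame(
    {
        "a": np.random.choice(["apple", "banana", "carrot"], size=100),
        "b": np.random.choice(["small", "large"], size=100),
        "c": np.random.random(size=100),
        "d": np.random.randint(1, 100, size=100),
    }
)
dd.from_pandas(df1, npartitions=9).to_parquet(
    "scratch_parquet", partition_on=["a", "b"], write_index=False
)

# Read back with adaptive row-group splitting and file aggregation,
# mirroring the failing test (the test parametrizes the engine).
ddf2 = dd.read_parquet(
    "scratch_parquet",
    blocksize="1MiB",
    split_row_groups="adaptive",
    aggregate_files="b",
)

# Per-partition row counts make it easier to spot which aggregated
# partition dropped rows (CI saw 97 rows instead of 100).
print(ddf2.map_partitions(len).compute())
assert len(ddf2) == len(df1)
```

If the total comes back short, the per-partition counts should at least point at which aggregated partition lost rows.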

fjetter (Member, Author) commented Dec 19, 2023

I also encountered this in #10722

rjzamora (Member) commented

> @rjzamora do you have time to poke at this?

Yes, I also find this concerning, so I definitely want to figure out what is going wrong.

Side Note: When we start implementing #10602, I have some ideas for how we can get rid of all of this ugly "adaptive aggregation" code without sacrificing our ability to split files.

jrbourbeau (Member) commented

I ran into the same test_split_adaptive_aggregate_files test failing in #10729 but with a different traceback

TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'

when attempting to read in a partition with the fastparquet engine (full traceback below)

Full traceback:
_______ test_split_adaptive_aggregate_files[b-fastparquet-fastparquet] ________
[gw0] win32 -- Python 3.10.13 C:\Users\runneradmin\miniconda3\envs\test-environment\python.exe

column = column_index_length: null
column_index_offset: null
crypto_metadata: null
encrypted_column_metadata: null
file_offset:...otal_compressed_size: 37
  total_uncompressed_size: null
  type: 1
offset_index_length: null
offset_index_offset: null

schema_helper = <Parquet Schema with 3 entries>
infile = <fsspec.implementations.local.LocalFileOpener object at 0x000001AF9C49BA30>
use_cat = False, selfmade = True, assign = array([0]), catdef = None
row_filter = False

    def read_col(column, schema_helper, infile, use_cat=False,
                 selfmade=False, assign=None, catdef=None,
                 row_filter=None):
        """Using the given metadata, read one column in one row-group.
    
        Parameters
        ----------
        column: thrift structure
            Details on the column
        schema_helper: schema.SchemaHelper
            Based on the schema for this parquet data
        infile: open file or string
            If a string, will open; if an open object, will use as-is
        use_cat: bool (False)
            If this column is encoded throughout with dict encoding, give back
            a pandas categorical column; otherwise, decode to values
        row_filter: bool array or None
            if given, selects which of the values read are to be written
            into the output. Effectively implies NULLs, even for a required
            column.
        """
        cmd = column.meta_data
        try:
>           se = schema_helper.schema_element(cmd.path_in_schema)

C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:450: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Parquet Schema with 3 entries>, name = ['d']

    def schema_element(self, name):
        """Get the schema element with the given name or path"""
        root = self.root
        if isinstance(name, str):
            name = name.split('.')
        for part in name:
>           root = root["children"][part]
E           KeyError: 'd'

C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\schema.py:119: KeyError

During handling of the above exception, another exception occurred:

tmpdir = local('C:\\Users\\runneradmin\\AppData\\Local\\Temp\\pytest-of-runneradmin\\pytest-0\\popen-gw0\\test_split_adaptive_aggregate_4')
write_engine = 'fastparquet', read_engine = 'fastparquet', aggregate_files = 'b'

    @write_read_engines()
    @pytest.mark.parametrize("aggregate_files", ["a", "b"])
    def test_split_adaptive_aggregate_files(
        tmpdir, write_engine, read_engine, aggregate_files
    ):
        blocksize = "1MiB"
        partition_on = ["a", "b"]
        df_size = 100
        df1 = pd.DataFrame(
            {
                "a": np.random.choice(["apple", "banana", "carrot"], size=df_size),
                "b": np.random.choice(["small", "large"], size=df_size),
                "c": np.random.random(size=df_size),
                "d": np.random.randint(1, 100, size=df_size),
            }
        )
        ddf1 = dd.from_pandas(df1, npartitions=9)
    
        ddf1.to_parquet(
            str(tmpdir),
            engine=write_engine,
            partition_on=partition_on,
            write_index=False,
        )
        with pytest.warns(FutureWarning, match="Behavior may change"):
            ddf2 = dd.read_parquet(
                str(tmpdir),
                engine=read_engine,
                blocksize=blocksize,
                split_row_groups="adaptive",
                aggregate_files=aggregate_files,
            )
    
        # Check that files where aggregated as expected
        if aggregate_files == "a":
            assert ddf2.npartitions == 3
        elif aggregate_files == "b":
            assert ddf2.npartitions == 6
    
        # Check that the final data is correct
>       df2 = ddf2.compute().sort_values(["c", "d"])

dask\dataframe\io\tests\test_parquet.py:3132: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask\base.py:342: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask\base.py:628: in compute
    results = schedule(dsk, keys, **kwargs)
dask\dataframe\io\parquet\core.py:96: in __call__
    return read_parquet_part(
dask\dataframe\io\parquet\core.py:668: in read_parquet_part
    df = engine.read_partition(
dask\dataframe\io\parquet\fastparquet.py:1075: in read_partition
    return cls.pf_to_pandas(
dask\dataframe\io\parquet\fastparquet.py:1172: in pf_to_pandas
    pf.read_row_group_file(
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\api.py:386: in read_row_group_file
    core.read_row_group(
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:642: in read_row_group
    read_row_group_arrays(file, rg, columns, categories, schema_helper,
C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:612: in read_row_group_arrays
    read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

column = column_index_length: null
column_index_offset: null
crypto_metadata: null
encrypted_column_metadata: null
file_offset:...otal_compressed_size: 37
  total_uncompressed_size: null
  type: 1
offset_index_length: null
offset_index_offset: null

schema_helper = <Parquet Schema with 3 entries>
infile = <fsspec.implementations.local.LocalFileOpener object at 0x000001AF9C49BA30>
use_cat = False, selfmade = True, assign = array([0]), catdef = None
row_filter = False

    def read_col(column, schema_helper, infile, use_cat=False,
                 selfmade=False, assign=None, catdef=None,
                 row_filter=None):
        """Using the given metadata, read one column in one row-group.
    
        Parameters
        ----------
        column: thrift structure
            Details on the column
        schema_helper: schema.SchemaHelper
            Based on the schema for this parquet data
        infile: open file or string
            If a string, will open; if an open object, will use as-is
        use_cat: bool (False)
            If this column is encoded throughout with dict encoding, give back
            a pandas categorical column; otherwise, decode to values
        row_filter: bool array or None
            if given, selects which of the values read are to be written
            into the output. Effectively implies NULLs, even for a required
            column.
        """
        cmd = column.meta_data
        try:
            se = schema_helper.schema_element(cmd.path_in_schema)
        except KeyError:
            # column not present in this row group
>           assign[:] = None
E           TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'

C:\Users\runneradmin\miniconda3\envs\test-environment\lib\site-packages\fastparquet\core.py:453: TypeError
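One way to narrow down where column `d` goes missing (a hypothetical sketch against fastparquet's metadata objects; `scratch_parquet` is a placeholder path for a dataset written like the test's) is to list the column chunks recorded in each row group:

```python
import glob
import fastparquet

# Collect the hive-partitioned data files written by the test
# ("scratch_parquet" is a placeholder path).
files = sorted(glob.glob("scratch_parquet/**/*.parquet", recursive=True))
pf = fastparquet.ParquetFile(files)

# The KeyError above comes from a row group whose schema lookup for
# "d" fails; printing path_in_schema per row group shows which files
# and row groups are affected.
for i, rg in enumerate(pf.row_groups):
    cols = [".".join(c.meta_data.path_in_schema) for c in rg.columns]
    print(i, rg.num_rows, cols)
```

If some row group lists an unexpected set of columns, that would suggest the problem is in how dask's adaptive planning pairs row groups with files rather than in fastparquet's reader itself.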

rjzamora (Member) commented

> when attempting to read in a partition with the fastparquet engine (full traceback below)

Thanks @jrbourbeau! I have been struggling to figure out what is going wrong beyond "the fastparquet engine is messing up somewhere with hive-partitioned data", so this is helpful.
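To take dask out of the picture, a hedged repro sketch using fastparquet's own hive writer (`fp_hive` is a placeholder path; the columns mirror the test) could look like this:

```python
import numpy as np
import pandas as pd
import fastparquet

df = pd.DataFrame(
    {
        "a": np.random.choice(["apple", "banana", "carrot"], size=100),
        "b": np.random.choice(["small", "large"], size=100),
        "c": np.random.random(size=100),
        "d": np.random.randint(1, 100, size=100),
    }
)

# Write a hive-partitioned dataset with fastparquet alone.
fastparquet.write("fp_hive", df, file_scheme="hive", partition_on=["a", "b"])

# Read it back without dask in the loop.
out = fastparquet.ParquetFile("fp_hive").to_pandas()
assert len(out) == len(df)
```

If all 100 rows survive this round trip, the row loss in the dask test is more likely introduced by the adaptive row-group/file aggregation planning than by fastparquet's reader on its own.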
