Workflow breaks due to non-deterministic pickling #879

Open
jrbourbeau opened this issue Feb 14, 2024 · 8 comments

@jrbourbeau
Member

jrbourbeau commented Feb 14, 2024

While writing a DataFrame to Parquet from a Prefect task (I'm not sure whether the Prefect part actually matters), I got the following error:

Traceback (most recent call last):
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1186, in normalize_callable
    return function_cache[func]
           ~~~~~~~~~~~~~~^^^^^^
KeyError: <function repartition_table.<locals>.name at 0x15ed85ee0>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 840, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 315, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/projects/coiled/etl-tpch-backup/pipeline/resize.py", line 65, in resize_parquet
    files = repartition_table(files_, table=table)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/tasks.py", line 549, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 1155, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 1323, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 1744, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 315, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/projects/coiled/etl-tpch-backup/pipeline/resize.py", line 41, in repartition_table
    df.to_parquet(outdir, compression="snappy", name_function=name)
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_collection.py", line 2887, in to_parquet
    return to_parquet(self, path, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/io/parquet.py", line 380, in to_parquet
    ToParquet(
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_core.py", line 43, in __new__
    _name = inst._name
            ^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_core.py", line 415, in _name
    funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_util.py", line 100, in _tokenize_deterministic
    return tokenize(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1023, in tokenize
    token = _normalize_seq_func(args)
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1117, in _normalize_seq_func
    item = normalize_token(item)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1165, in normalize_object
    return normalize_callable(o)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1188, in normalize_callable
    result = _normalize_callable(func)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1229, in _normalize_callable
    raise RuntimeError(
RuntimeError: Function <function repartition_table.<locals>.name at 0x15ed85ee0> may not be deterministically hashed by cloudpickle. See: https://github.com/cloudpipe/cloudpickle/issues/385 for more information.

A few things come to mind:

  1. Should this be the default behavior? I'm not sure how common it is for this RuntimeError to be raised.
  2. Can I disable this and say I'm okay with non-deterministic tokenization?
  3. This error message isn't very approachable to naive users; it'd be nice to reword it with that in mind. In particular, "What am I supposed to do about this?" is probably a question users will have.

This might actually be a dask/dask issue, but I'm opening it here as I'm currently using dask-expr.
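
One possible reading of the traceback, offered as an assumption rather than a confirmed diagnosis: `name` is defined inside `repartition_table`, and the standard `pickle` module stores functions by reference to a module-level name, so it cannot serialize a nested function. That would explain why tokenization ends up on the cloudpickle path named in the error. The sketch below uses only the standard library to show the difference; `make_name_function`, `picklable_by_reference`, the `lineitem` table name, and the file-name pattern are all hypothetical.

```python
import pickle


def make_name_function(table):
    # Nested function, analogous to `repartition_table.<locals>.name` in the
    # traceback. The file-name pattern is made up for illustration.
    def name(i):
        return f"{table}_{i:05d}.parquet"

    return name


def picklable_by_reference(func):
    # The standard pickle module stores functions as "module + qualified name",
    # which only works for objects reachable from module level.
    try:
        pickle.dumps(func, protocol=4)
    except (AttributeError, pickle.PicklingError):
        return False
    return True


local_name = make_name_function("lineitem")
print(local_name(3))                       # lineitem_00003.parquet
print(picklable_by_reference(local_name))  # False: local object
print(picklable_by_reference(len))         # True: importable as builtins.len
```

If that reading is right, a naming function that does not need to be a closure could be moved to module level, but whether that avoids the `RuntimeError` in a given dask version is untested here.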

cc @phofl @crusaderky

@jrbourbeau
Member Author

> Can I disable this and say I'm okay with non-deterministic tokenization?

Hopefully with tokenize.ensure-deterministic=False

@jrbourbeau
Member Author

> Hopefully with tokenize.ensure-deterministic=False

This didn't seem to do the trick

@fjetter
Member

fjetter commented Feb 14, 2024

I believe this should go away with the latest main. Tokenization is a weird thing, and a lot of work went into it recently. Most notably, I suspect dask/dask#10883 will make this work.

The config option has no effect here since dask-expr hard-codes it, because we rely on tokenization actually being deterministic.

@jrbourbeau
Member Author

Ah, thanks. I'll try again with the main branch of dask/dask.

> The config option has no effect here since dask-expr hard-codes it, because we rely on tokenization actually being deterministic.

Just to clarify, things should still work when using the main dask version even though dask-expr hard-codes the config option?

@phofl
Collaborator

phofl commented Feb 14, 2024

> Just to clarify, things should still work when using the main dask version even though dask-expr hard-codes the config option?

dask/dask will respect the config option, dask-expr does not. Was this your question?
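
To make the distinction concrete, here is a toy model of that behavior. It is not dask's or dask-expr's actual code: `CONFIG`, `toy_tokenize`, and `toy_tokenize_deterministic` are hypothetical stand-ins, and the only point is that a wrapper which always passes `ensure_deterministic=True` never looks at the config value.

```python
import hashlib
import pickle

# Toy stand-in for a global config; not dask's real config object.
CONFIG = {"tokenize.ensure-deterministic": False}


def toy_tokenize(obj, ensure_deterministic=None):
    """Hash obj; use an unstable fallback only when that is allowed."""
    if ensure_deterministic is None:
        # Only consult the config when the caller did not decide.
        ensure_deterministic = CONFIG["tokenize.ensure-deterministic"]
    try:
        payload = pickle.dumps(obj, protocol=4)
    except (AttributeError, pickle.PicklingError):
        if ensure_deterministic:
            raise RuntimeError(f"{obj!r} cannot be deterministically hashed")
        # repr() of a function includes its memory address, so this
        # fallback is not stable across runs.
        payload = repr(obj).encode()
    return hashlib.sha256(payload).hexdigest()[:16]


def toy_tokenize_deterministic(obj):
    """Wrapper that hard-codes the flag, so CONFIG is never read."""
    return toy_tokenize(obj, ensure_deterministic=True)


def outer():
    def name(i):
        return f"part-{i}.parquet"

    return name


func = outer()
print(len(toy_tokenize(func)))  # 16: CONFIG allows the unstable fallback
try:
    toy_tokenize_deterministic(func)
except RuntimeError as exc:
    print("wrapper raised:", type(exc).__name__)  # wrapper raised: RuntimeError
```

In this model, flipping the config value changes what `toy_tokenize` does but has no effect on the wrapper, which matches the behavior described above.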

@crusaderky
Collaborator

We just removed a hack in dask-expr (#822); things will break if you don't upgrade to the latest dask/dask (dask/dask#10883) too

@fjetter
Member

fjetter commented Feb 15, 2024

> things will break if you don't upgrade to the latest dask/da

We should start being mindful about compatibility and pinning here if we want to keep this as a separate repo/package

@phofl
Collaborator

phofl commented Feb 15, 2024

I’ve already started pinning the releases to a specific dask/dask version
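
For anyone who wants to check a pin like this from their own environment, a minimal sketch using only the standard library follows. The helper `check_pinned` and the version strings are hypothetical; the real pin lives in the package metadata of each dask-expr release, and this is not how dask-expr itself enforces it.

```python
from importlib.metadata import PackageNotFoundError, version


def check_pinned(package, pinned, installed=None):
    """Return an error message if `package` is not exactly `pinned`, else None.

    `installed` can be passed explicitly (useful for testing); otherwise it is
    looked up from the installed distribution metadata.
    """
    if installed is None:
        try:
            installed = version(package)
        except PackageNotFoundError:
            return f"{package} is not installed (need {pinned})"
    if installed != pinned:
        return f"{package}=={installed} found, but {pinned} is required"
    return None


# Illustrative version numbers only.
print(check_pinned("dask", "2024.2.0", installed="2024.2.0"))  # None
print(check_pinned("dask", "2024.2.0", installed="2024.1.1"))
# dask==2024.1.1 found, but 2024.2.0 is required
```

An exact-match comparison is deliberately strict; a range check would need a real version parser, which is left out of this sketch.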
