Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dask/distributed occasionally fails to deserialize pandas dataframe due to `AssertionError: Number of manager items must equal union of block items #8605

Open
fangzhouxie opened this issue Apr 2, 2024 · 0 comments

Comments

@fangzhouxie
Copy link

fangzhouxie commented Apr 2, 2024

Describe the issue:

Hi team, in our use case, task B depends on the output of task A, which is a pandas dataframe. Occasionally, dask/distributed fails to deserialize the output of task A (i.e. a pandas dataframe) while gathering dependencies for task B, causing task B to hang forever. Currently, we can fix the hanging task by killing its worker so that it is retried on another worker.

Minimal Complete Verifiable Example:
Unfortunately I was not able to re-create the issue in dev environment even though it has happened multiple times in our production environment.

Anything else we need to know?:
It would be great if tasks could be retried on deserialization errors. But that does not seem to be the case right now according to #6705.

Environment:

  • Dask version: 2024.1.0
  • Pandas version: 1.5.3
  • Python version: 3.10.8
  • Operating System: Ubuntu 20.04.6 LTS
  • Install method (conda, pip, source): pip

Stacktrace

2024-03-30 04:07:23,576 - distributed.protocol.pickle - INFO - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 71, in loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1038, in __init__
    self._verify_integrity()
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1047, in _verify_integrity
    raise AssertionError(
AssertionError: Number of manager items must equal union of block items # manager items: 891, # tot_items: 890
2024-03-30 04:07:23,581 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/core.py", line 158, in loads return
    msgpack.loads( File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/core.py", line 138, in _decode_default return
    merge_and_deserialize(
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 497, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 426, in deserialize
    return loads(header, frames)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 71, in loads return
    pickle.loads(x, buffers=buffers)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1038, in __init__
    self._verify_integrity()
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1047, in _verify_integrity
    raise AssertionError(
AssertionError: Number of manager items must equal union of block items # manager items: 891, # tot_items: 890
2024-03-30 04:07:23,582 - distributed.worker - ERROR - Number of manager items must equal union of block items # manager items: 891, # tot_items: 890
Traceback (most recent call last):
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/worker.py", line 2050, in gather_dep
    response = await get_data_from_worker(
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/worker.py", line 2839, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/utils_comm.py", line 386, in retry_operation
    return await retry(
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/utils_comm.py", line 371, in retry
    return await coro()
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/worker.py", line 2819, in _get_data
    response = await send_recv(
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/core.py", line 928, in send_recv
    response = await comm.read(deserializers=deserializers)
   File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/comm/asyncio_tcp.py", line 445, in read
    return await from_frames(
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/comm/utils.py", line 98, in from_frames
    res = await offload(_from_frames)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/utils.py", line 1421, in offload
    return await loop.run_in_executor(
  File "/opt/conda/envs/custom/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/utils.py", line 1422, in <lambda>
    _offload_executor, lambda: context.run(fn, *args, **kwargs)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/comm/utils.py", line 83, in _from_frames
    return protocol.loads(
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/core.py", line 138, in _decode_default
    return merge_and_deserialize(
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 497, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 426, in deserialize
    return loads(header, frames)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 71, in loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1038, in __init__
    self._verify_integrity()
  File "/opt/conda/envs/custom/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1047, in _verify_integrity
    raise AssertionError(
AssertionError: Number of manager items must equal union of block items # manager items: 891, # tot_items: 890
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant