Large numpy arrays stored in big-endian format cannot be serialized, leading to errors with Parallel #1545

Open
alexisthual opened this issue Feb 15, 2024 · 5 comments · May be fixed by #1561
alexisthual commented Feb 15, 2024

Problem
The following snippet fails with joblib>=1.3.0 (but not with joblib==1.2.0), raising the stack trace reported at the end of this message.

import numpy as np
from joblib import Parallel, delayed


def dummy(x, i):
    return i


x = np.random.randint(0, 100, (200000, 3)).view(">i4")
Parallel(n_jobs=2)(delayed(dummy)(x, i) for i in range(3))

It only fails when x is large enough and stored in big-endian format. For instance,

  1. x = np.random.randint(0, 100, (20, 3)).view(">i4") (small array, big-endian format)
  2. x = np.random.randint(0, 100, (200000, 3)) (large array, little-endian format)

will both run without error.
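The two conditions can be checked directly on the arguments. This is only an illustrative sketch. It assumes that what makes an array "large enough" is joblib's default `max_nbytes="1M"` threshold (taken here as 1024**2 bytes), above which Parallel dumps the argument to disk and memmaps it in the workers. It also uses explicit `int32` input so that `.view(">i4")` keeps the `(n, 3)` shape; with a 64-bit default integer, the snippet above actually produces a `(200000, 6)` array. `ASSUMED_MAX_NBYTES` and `classify` are hypothetical names.

```python
import numpy as np

# Assumption: joblib's default max_nbytes="1M" corresponds to 1024**2 bytes;
# arguments larger than this are dumped to disk and memmapped in the workers.
ASSUMED_MAX_NBYTES = 1024 ** 2


def classify(arr):
    """Return (nbytes, non_native, memmapped) for a Parallel argument."""
    non_native = not arr.dtype.isnative
    memmapped = arr.nbytes > ASSUMED_MAX_NBYTES
    return arr.nbytes, non_native, memmapped


rng = np.random.default_rng(0)
cases = {
    # Reported failure: large and big-endian.
    "large, big-endian": rng.integers(0, 100, (200000, 3), dtype=np.int32).view(">i4"),
    # Case 1: small and big-endian (below the threshold, so not memmapped).
    "small, big-endian": rng.integers(0, 100, (20, 3), dtype=np.int32).view(">i4"),
    # Case 2: large but in native byte order.
    "large, native": rng.integers(0, 100, (200000, 3), dtype=np.int32),
}

for name, arr in cases.items():
    nbytes, non_native, memmapped = classify(arr)
    # Only an argument that is both memmapped and non-native hits the bug.
    print(f"{name:>18}: {nbytes:>9} bytes, non-native={non_native}, "
          f"memmapped={memmapped}, at risk={non_native and memmapped}")
```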

Temporary solution
As @lesteve suggested, using Parallel(n_jobs=2, max_nbytes=None)(...) gets the snippet to run without error. Maybe we should change the default parameters or the error message to guide users towards this solution?
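Another possible workaround, not proposed in this thread and sketched here only under the assumption that the workers do not need the original byte order, is to convert the array to native byte order before dispatching it, so that it can still be memmapped with the default `max_nbytes`. `to_native_byte_order` is a hypothetical helper.

```python
import numpy as np


def to_native_byte_order(arr):
    """Return arr unchanged if already native, else a native-order copy."""
    if arr.dtype.isnative:
        return arr
    # astype() to the same type with native ("=") byte order converts the
    # data and preserves the numerical values (unlike .view(), which would
    # reinterpret the raw bytes).
    return arr.astype(arr.dtype.newbyteorder("="))


# Same values, stored big-endian.
x = np.arange(600000, dtype=np.int32).reshape(200000, 3).astype(">i4")
x_native = to_native_byte_order(x)

# With joblib installed, x_native could then be passed as usual, e.g.
# Parallel(n_jobs=2)(delayed(dummy)(x_native, i) for i in range(3))
print(x.dtype.str, "->", x_native.dtype.str)
```

The trade-off is one extra copy in the parent process, in exchange for keeping automatic memmapping instead of disabling it with `max_nbytes=None`.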

Stack trace
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
  File "/home/alexis/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/externals/loky/process_executor.py", line 426, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "/home/alexis/miniconda3/envs/singbrain/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
  File "/home/alexis/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/numpy_pickle.py", line 600, in load_temporary_memmap
    add_maybe_unlink_finalizer(obj)
  File "/home/alexis/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/_memmapping_reducer.py", line 72, in add_maybe_unlink_finalizer
    "".format(type(memmap), id(memmap), os.path.basename(memmap.filename),
  File "/home/alexis/miniconda3/envs/singbrain/lib/python3.10/posixpath.py", line 142, in basename
    p = os.fspath(p)
TypeError: expected str, bytes or os.PathLike object, not NoneType
"""

The above exception was the direct cause of the following exception:

BrokenProcessPool                         Traceback (most recent call last)
/home/alexis/singbrain/repo/alexis_thual/_198_fix_fugw_lmds.py in <cell line: 7>()
      203     return i
      206 x = np.random.randint(0, 100, (200000, 3)).view(">i4")
----> 207 Parallel(n_jobs=2)(delayed(dummy)(x, i) for i in range(3))

File ~/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/parallel.py:1944, in Parallel.__call__(self, iterable)
   1938 # The first item from the output is blank, but it makes the interpreter
   1939 # progress until it enters the Try/Except block of the generator and
   1940 # reach the first `yield` statement. This starts the aynchronous
   1941 # dispatch of the tasks to the workers.
   1942 next(output)
-> 1944 return output if self.return_generator else list(output)

File ~/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/parallel.py:1587, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1584     yield
   1586     with self._backend.retrieval_context():
-> 1587         yield from self._retrieve()
   1589 except GeneratorExit:
   1590     # The generator has been garbage collected before being fully
   1591     # consumed. This aborts the remaining tasks if possible and warn
   1592     # the user if necessary.
   1593     self._exception = True

File ~/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/parallel.py:1691, in Parallel._retrieve(self)
   1684 while self._wait_retrieval():
   1685 
   1686     # If the callback thread of a worker has signaled that its task
   1687     # triggered an exception, or if the retrieval loop has raised an
   1688     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1689     # worker traceback.
   1690     if self._aborting:
-> 1691         self._raise_error_fast()
   1692         break
   1694     # If the next job is not ready for retrieval yet, we just wait for
   1695     # async callbacks to progress.

File ~/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/parallel.py:1726, in Parallel._raise_error_fast(self)
   1722 # If this error job exists, immediatly raise the error by
   1723 # calling get_result. This job might not exists if abort has been
   1724 # called directly or if the generator is gc'ed.
   1725 if error_job is not None:
-> 1726     error_job.get_result(self.timeout)

File ~/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/parallel.py:735, in BatchCompletionCallBack.get_result(self, timeout)
    729 backend = self.parallel._backend
    731 if backend.supports_retrieve_callback:
    732     # We assume that the result has already been retrieved by the
    733     # callback thread, and is stored internally. It's just waiting to
    734     # be returned.
--> 735     return self._return_or_raise()
    737 # For other backends, the main thread needs to run the retrieval step.
    738 try:

File ~/miniconda3/envs/singbrain/lib/python3.10/site-packages/joblib/parallel.py:753, in BatchCompletionCallBack._return_or_raise(self)
    751 try:
    752     if self.status == TASK_ERROR:
--> 753         raise self._result
    754     return self._result
    755 finally:

BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
lesteve (Member) commented Feb 15, 2024

So my guess is that memmapping an array with non-native endianness is the issue here, but this needs more investigation.

fcharras (Contributor) commented Apr 4, 2024

I did a bit of digging: the filename attribute is lost when going through this function.

The bug was most likely introduced in 0569c89.

fcharras (Contributor) commented Apr 4, 2024

It is not related to 0569c89 after all.

This can be fixed by passing inplace=True to byteswap; indeed, it makes sense that we lose the filename attribute if we make a copy. What does a copy of a memmap even mean? It's weird that it is also a memmap object.

Anyway, inplace=True could fix it, but then why did it work with previous joblib versions? I don't immediately see any change in joblib that would explain it; maybe something changed in numpy?
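The filename loss can be reproduced with numpy alone. This is a minimal sketch, assuming the worker-side code calls `byteswap()` (without `inplace=True`) on a read-only big-endian memmap, as described above; the file name and array contents are arbitrary.

```python
import os
import tempfile

import numpy as np

error = None
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "big_endian.mmap")
    # Write a big-endian array to disk and reopen it read-only as a memmap,
    # loosely mimicking how a large argument is handed to a worker.
    np.arange(12, dtype=">i4").tofile(path)
    mm = np.memmap(path, dtype=">i4", mode="r", shape=(12,))

    # byteswap() without inplace=True returns a copy. The copy is still an
    # np.memmap instance, but because it no longer shares memory with the
    # file, numpy resets its filename attribute to None.
    swapped = mm.byteswap()
    original_filename = mm.filename
    swapped_filename = getattr(swapped, "filename", None)

    # The same call that fails in the worker's cleanup code.
    try:
        os.path.basename(swapped_filename)
    except TypeError as exc:
        error = exc

    del mm, swapped  # release the mapping before the directory is removed

print(original_filename)
print(swapped_filename)
print(error)
```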

fcharras (Contributor) commented Apr 4, 2024

However, passing inplace=True to byteswap on a memmap might be bad for performance. After discussing with @lesteve, I'm continuing to investigate a better fix.

Edit: actually it doesn't even work because the memmap is read-only.
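The read-only problem can be checked the same way. This sketch assumes the worker opens the temporary file with `mode="r"`, matching Parallel's default `mmap_mode='r'`; numpy then refuses the in-place byte swap because it would have to write into the mapped buffer.

```python
import os
import tempfile

import numpy as np

error = None
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "readonly.mmap")
    np.arange(4, dtype=">i4").tofile(path)
    # mode="r" maps the file read-only, so the resulting array is not writeable.
    mm = np.memmap(path, dtype=">i4", mode="r", shape=(4,))
    writeable = mm.flags.writeable
    try:
        mm.byteswap(inplace=True)  # needs to write into the mapped buffer
    except ValueError as exc:
        error = exc
    del mm  # release the mapping before the directory is removed

print(writeable, error)
```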

fcharras (Contributor) commented Apr 4, 2024

So the bug was in fact likely introduced in 0fa2cb9.

The endianness changes in this PR were aimed at fixing the behavior of joblib.dump and joblib.load, and it was overlooked that they also affected joblib.Parallel.

The fix would be to simply bypass the endianness standardization step for the automated dump/load steps in joblib.Parallel.

I suspect this will also affect the endianness of small arrays received by joblib workers when the arrays in the main process do not have the system's native endianness, which is technically a breaking change, even if the use case is very marginal.
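The proposed bypass can be sketched with a toy loader. This is not joblib's code: `load_array` and its `ensure_native_byte_order` flag are hypothetical, and it only illustrates the idea that standardizing byte order (as joblib.load does) necessarily copies the data and drops the file backing, whereas returning the memmap untouched (the Parallel case) keeps the `filename` the worker's cleanup code relies on.

```python
import os
import tempfile

import numpy as np


def load_array(path, dtype, shape, ensure_native_byte_order):
    """Hypothetical loader: only standardize byte order when asked to."""
    mm = np.memmap(path, dtype=dtype, mode="r", shape=shape)
    if ensure_native_byte_order and not mm.dtype.isnative:
        # dump/load-style behavior: convert to native order. byteswap()
        # copies the data, so the result no longer refers to the file.
        return mm.byteswap().view(mm.dtype.newbyteorder())
    # Parallel-style behavior: return the memmap as-is so that its
    # filename survives for later cleanup.
    return mm


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "data.mmap")
    np.arange(6, dtype=">i4").tofile(path)

    kept = load_array(path, ">i4", (6,), ensure_native_byte_order=False)
    converted = load_array(path, ">i4", (6,), ensure_native_byte_order=True)

    kept_filename = kept.filename
    kept_dtype = kept.dtype
    converted_filename = getattr(converted, "filename", None)
    converted_native = converted.dtype.isnative
    converted_values = np.array(converted)  # plain ndarray copy of the values
    del kept, converted  # release the mapping before cleanup

print(kept_filename, kept_dtype.str)
print(converted_filename, converted_values)
```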
