
Regression: P2P shuffle skeleton (#5520) causes test flakiness #5688

Closed
crusaderky opened this issue Jan 24, 2022 · 6 comments · Fixed by #5689 or #5697

crusaderky (Collaborator) commented Jan 24, 2022

Since #5520 was merged earlier today, several tests that I never observed failing before became very flaky:

FAILED distributed/shuffle/tests/test_shuffle_extension.py::test_get_partition
FAILED distributed/shuffle/tests/test_shuffle_extension.py::test_get_partition
FAILED distributed/shuffle/tests/test_shuffle_extension.py::test_get_partition
FAILED distributed/tests/test_client.py::test_rebalance - assert 0 == 10
FAILED distributed/tests/test_client.py::test_rebalance - assert 0 == 10
FAILED distributed/tests/test_scheduler.py::test_rebalance - AssertionError: ...
FAILED distributed/tests/test_scheduler.py::test_rebalance - AssertionError: ...
FAILED distributed/tests/test_tls_functional.py::test_rebalance - assert 0 == 10

logs:
https://github.com/dask/distributed/actions/runs/1739731969
https://github.com/dask/distributed/actions/runs/1740153471
https://github.com/dask/distributed/actions/runs/1740658508

CC @gjoseph92

gjoseph92 (Collaborator) commented:

I've identified and fixed the flaky shuffle test in #5689.

I'm less sure about the rebalance tests, and less sure that shuffling is the culprit there.

The only change #5520 makes that I think could have any effect on other tests is that ShuffleWorkerExtension is now added to workers by default. However, all the extension does is add a few RPC handlers—unless they're called, it should do nothing.
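For context, a rough sketch of that registration pattern (the handler names and method bodies here are illustrative, not necessarily what #5520 actually adds):

```python
from distributed.worker import Worker


class ShuffleWorkerExtension:
    """Registers a few RPC handlers on the worker; idle until one is called."""

    def __init__(self, worker: Worker) -> None:
        # Hypothetical handler names, shown only to illustrate the mechanism:
        # the extension wires its methods into the worker's RPC handler table
        # and registers itself, but does no work until a handler is invoked.
        worker.handlers["shuffle_receive"] = self.shuffle_receive
        worker.handlers["shuffle_inputs_done"] = self.shuffle_inputs_done
        worker.extensions["shuffle"] = self
        self.worker = worker

    async def shuffle_receive(self, comm=None, **kwargs):
        ...

    async def shuffle_inputs_done(self, comm=None, **kwargs):
        ...
```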

Looking at the logs you posted, I frequently see something like this in stderr for the failing rebalance tests:
https://github.com/dask/distributed/runs/4921368065?check_suite_focus=true#step:12:1548

distributed.worker - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 826.48 MiB -- Worker memory limit: 1.00 GiB

distributed.worker - WARNING - Worker is at 55% memory usage. Resuming worker. Process memory: 570.68 MiB -- Worker memory limit: 1.00 GiB

https://github.com/dask/distributed/runs/4922504753?check_suite_focus=true#step:12:1338

distributed.worker - INFO - Run out-of-band function 'lambda'

distributed.worker - INFO - Run out-of-band function 'lambda'

distributed.worker - WARNING - Worker is at 85% memory usage. Pausing worker.  Process memory: 877.50 MiB -- Worker memory limit: 1.00 GiB

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:54688', name: 0, status: running, memory: 10, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:54688
distributed.worker - WARNING - Could not find data: {'lambda-3eaf13a0de8c3c7aa63e4a7dac0cf7c8': ['tcp://127.0.0.1:54688'], 'lambda-67004364535cebd1e894b084bda293dc': ['tcp://127.0.0.1:54688'], 'lambda-394dfbb70ca393e71593eb63157a0ae2': ['tcp://127.0.0.1:54688'], 'lambda-ff4fe63f2fb498fd9d8dfff7d57b33d1': ['tcp://127.0.0.1:54688'], 'lambda-be5442181f9824141a3f9e1b05f7a9f7': ['tcp://127.0.0.1:54688']} on workers: ['tcp://127.0.0.1:54688'] (who_has: {'lambda-3eaf13a0de8c3c7aa63e4a7dac0cf7c8': ['tcp://127.0.0.1:54688'], 'lambda-67004364535cebd1e894b084bda293dc': ['tcp://127.0.0.1:54688'], 'lambda-be5442181f9824141a3f9e1b05f7a9f7': ['tcp://127.0.0.1:54688'], 'lambda-394dfbb70ca393e71593eb63157a0ae2': ['tcp://127.0.0.1:54688'], 'lambda-ff4fe63f2fb498fd9d8dfff7d57b33d1': ['tcp://127.0.0.1:54688']})

distributed.scheduler - WARNING - Worker tcp://127.0.0.1:54691 failed to acquire keys: {'lambda-3eaf13a0de8c3c7aa63e4a7dac0cf7c8': ('tcp://127.0.0.1:54688',), 'lambda-67004364535cebd1e894b084bda293dc': ('tcp://127.0.0.1:54688',), 'lambda-394dfbb70ca393e71593eb63157a0ae2': ('tcp://127.0.0.1:54688',), 'lambda-ff4fe63f2fb498fd9d8dfff7d57b33d1': ('tcp://127.0.0.1:54688',), 'lambda-be5442181f9824141a3f9e1b05f7a9f7': ('tcp://127.0.0.1:54688',)}
distributed.worker - INFO - Run out-of-band function 'lambda'

distributed.nanny - INFO - Worker process 2252 was killed by signal 15

Basically it seems that the worker from which we want to rebalance is now:

  1. Getting paused
  2. Fully running out of memory and being restarted

Unless the memory usage was previously so close to the limit that adding a few extra Python objects in ShuffleWorkerExtension is pushing memory usage over the line, I'm not sure how the extension could be causing this OOM that wasn't happening before.

I notice #5665 was merged right after the shuffle PR. Is it possible that's related?

crusaderky (Collaborator, Author) commented Jan 24, 2022

distributed.worker - WARNING - Worker is at 85% memory usage. Pausing worker. Process memory: 877.50 MiB -- Worker memory limit: 1.00 GiB

The test is responsible for 512 MiB worth of managed memory and can cope with up to 204 MiB worth of unmanaged memory before it starts misbehaving:

  • 1 GiB memory_limit
  • 512 MiB deliberate managed memory
  • pause threshold = 0.7
  • 1024 MiB * 0.7 - 512 MiB ≈ 204 MiB of unmanaged headroom

I would expect <50 MiB worth of unmanaged memory on a spawned process; instead here we're seeing 877 - 512 = 365 MiB. How much did you have before your PR?
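Spelling out that arithmetic (numbers taken from the test configuration listed above):

```python
memory_limit = 1024     # MiB, the worker's memory_limit
managed = 512           # MiB of data the test deliberately creates
pause_threshold = 0.7   # fraction of memory_limit used in the calculation above

# Unmanaged memory the test can tolerate before the worker pauses:
headroom = memory_limit * pause_threshold - managed  # ~204 MiB

# Unmanaged memory implied by the failing run (877 MiB process memory):
observed = 877 - managed  # 365 MiB, well over the budget
```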

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Jan 24, 2022
Reduce memory spikes during data transfer in `test_rebalance` by using NumPy arrays (zero-copy) and more, smaller keys.

Fixes dask#5688
gjoseph92 (Collaborator) commented:

We determined that the shuffle extension increases worker unmanaged memory by ~10 MiB, because it imports pandas. It seems that this slight increase, combined with the memory spikes during the rebalance, was enough to push the worker over its spill threshold (or sometimes even its limit), leading to incomplete rebalancing because some data would get spilled to disk (or the worker would restart).

I've refactored the test_rebalance tests to hopefully be less spiky: #5691. They were originally using ~50 MiB keys of string data. Now they're using ~5 MiB NumPy arrays, which should also support zero-copy serialization.
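A rough sketch of the shape of data the refactored tests move around (illustrative only, not the actual #5691 code; `make_chunk` is a hypothetical helper):

```python
import numpy as np


def make_chunk(i: int, nbytes: int = 5 * 2**20) -> np.ndarray:
    # ~5 MiB of float64 per key; NumPy buffers can be serialized zero-copy,
    # unlike the ~50 MiB Python strings the tests used before.
    return np.random.default_rng(i).random(nbytes // 8)


# Hypothetical usage inside a test:
# futures = client.map(make_chunk, range(100))  # ~500 MiB total, in small pieces
```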

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Jan 24, 2022
In dask#5688, we discovered that dask#5520 was increasing worker unmanaged memory usage by ~10MiB at startup (on my mac). I suspect that this came from importing pandas.

The shuffle extension already worked if pandas wasn't available: it just wouldn't install itself on the worker. However, to check if pandas was available, it was importing pandas—so if pandas _was_ available, every worker would have to spend the time and memory to import it at startup, even if it wasn't used at all.

With this PR, all use of pandas is deferred. There's also now an explicit test that importing the shuffle does not import pandas.
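The deferred-import pattern described above looks roughly like this (a sketch under the assumptions stated here, not the actual #5697 code; `split_by_worker` is a hypothetical helper):

```python
def split_by_worker(df, column: str) -> dict:
    import pandas as pd  # deferred: only paid once a shuffle actually runs

    assert isinstance(df, pd.DataFrame)
    return dict(iter(df.groupby(column)))


def test_import_does_not_require_pandas():
    # Run in a fresh interpreter so modules already imported here don't leak in.
    import subprocess
    import sys

    code = "import sys, distributed.shuffle; assert 'pandas' not in sys.modules"
    subprocess.run([sys.executable, "-c", code], check=True)
```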
crusaderky reopened this Jan 24, 2022
fjetter (Member) commented Jan 25, 2022

If this causes issues, we can consider reverting the PR.

crusaderky (Collaborator, Author) commented:

Reopening; test_get_partition is still flaky, although it now fails at a different point.

https://github.com/dask/distributed/runs/4965019825?check_suite_focus=true

>           assert_frame_equal(expected, result)

distributed\shuffle\tests\test_shuffle_extension.py:279: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pandas\_libs\testing.pyx:52: in pandas._libs.testing.assert_almost_equal
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   AssertionError: DataFrame.index are different
E   
E   DataFrame.index values are different (66.66667 %)
E   [left]:  Int64Index([3, 3, 7], dtype='int64')
E   [right]: Int64Index([3, 7, 3], dtype='int64')
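For reference, one common way to make such a comparison insensitive to row order (not necessarily what the eventual fix does; `assert_frame_equal_unordered` is a hypothetical helper) is to normalize both frames before comparing:

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def assert_frame_equal_unordered(expected: pd.DataFrame, result: pd.DataFrame) -> None:
    def normalize(df: pd.DataFrame) -> pd.DataFrame:
        # Move the index into a column, sort rows deterministically, and drop
        # the positional index, so the check ignores the order rows arrived in.
        flat = df.reset_index()
        return flat.sort_values(list(flat.columns)).reset_index(drop=True)

    assert_frame_equal(normalize(expected), normalize(result))
```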

crusaderky reopened this Jan 27, 2022
gjoseph92 (Collaborator) commented:

Fixed by #5706
