Race condition in scatter->dereference->scatter #8576
We've done some nontrivial refactoring in how we're hashing/tokenizing objects (dask/dask#10905). I see this came up in an OSS project. Is it possible for you to link to the code this is running? cc @crusaderky
Certainly. The problem is that the code is quite complex and the objects we use are also quite complex. Actually running this particular workflow currently requires a proprietary program. I was hoping to be able to create a smaller example, but wasn't able to. I could potentially do some debugging in dask/distributed myself if I knew where to start looking. Could it be that scattered objects get removed after they are deemed no longer needed? The main GitHub page: https://github.com/pharmpy/pharmpy
The lifetime of a scattered object is coupled to the lifetime of the Future returned by `scatter`: once the last reference to that Future is dropped, the scheduler is free to forget the data.
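For illustration, a minimal sketch of that coupling on an in-process cluster (the value `123` is just a stand-in for a real scattered object):

```python
import time
import distributed

with distributed.Client(processes=False) as client:
    fut = client.scatter(123)
    key = fut.key
    # While the Future is alive, the scattered data is held in cluster memory.
    assert client.cluster.scheduler.tasks[key].state == "memory"

    del fut  # drop the last reference to the Future
    # The release reaches the scheduler asynchronously; wait for it.
    deadline = time.monotonic() + 10
    while key in client.cluster.scheduler.tasks and time.monotonic() < deadline:
        time.sleep(0.01)
    forgotten = key not in client.cluster.scheduler.tasks

print("task forgotten:", forgotten)
```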
Hi @rikardn,

What does "first" and "second" workflow mean? Are they two keys inside the same dsk that end up submitted and retrieved at once through the same `client.get` call? In other words, could your algorithm be simplified down to this?

```python
import distributed

with distributed.Client(processes=False) as client:
    dsk = {
        "results": (sum, ["first", "second"]),
        "first": client.scatter(123),
        "second": client.scatter(123),  # produces a future with the same key as first
    }
    print(client.get(dsk, "results"))
```

or to this?

```python
with distributed.Client(processes=False) as client:
    dsk = {"results": client.scatter(123)}
    print("first", client.get(dsk, "results"))

with distributed.Client(processes=False) as client:
    dsk = {"results": client.scatter(123)}
    print("second", client.get(dsk, "results"))
```
The full workflow is something like:

```python
def func():
    obj = create_object()
    dsk1 = create_workflow1(client.scatter(obj))
    res1 = run_dynamic_workflow(dsk1)
    dsk2 = create_workflow2(client.scatter(obj), res1)  # Note: same object as before, new call to scatter
    res2 = run_dynamic_workflow2(dsk2)
    return res2

def run_dynamic_workflow(dsk):
    client = get_client()
    futures = client.get(dsk, "result", sync=False)
    secede()
    res = client.gather(futures)
    rejoin()
    return res

with distributed.Client(processes=False) as client:
    dsk = {
        "results": (func,),
    }
    print(client.get(dsk, "results"))
```
Thanks @rikardn, this helps a lot. An important nitpick though: did you accidentally omit

```python
def func():
    client = get_client()
```

? In other words, does `func` obtain the client from inside the task via `get_client()`?

Second important nit: is `res1` a computed result, as your pseudocode suggests, or a Future to the output of the first workflow?
First: …
Second: …
Third: …
Fourth: …

An updated example for reference:

```python
def func():
    obj = create_object()
    dsk1 = create_workflow1(client.scatter(obj))
    res1, new_obj = run_dynamic_workflow(dsk1)  # Note that new_obj and obj are the same object, i.e. they have the same hash
    dsk2 = create_workflow2(client.scatter(new_obj), client.scatter(res1))
    res2 = run_dynamic_workflow2(dsk2)
    return res2

def run_dynamic_workflow(dsk):
    client = get_client()
    futures = client.get(dsk, "result", sync=False)
    secede()
    res = client.gather(futures)
    rejoin()
    return res

with LocalCluster(processes=False) as cluster, Client(cluster) as client:
    dsk = {
        "results": (func,),
    }
    print(client.get(dsk, "results"))
```
I realize my first point doesn't make sense. It is actually the …
It isn't. What I meant was: is `res` actually just the raw output of `client.get`, e.g. `distributed.Future` objects?
Reproduced. I'm willing to bet that your pseudocode is missing a detail: before you're calling …

```python
from time import sleep

import distributed

client = distributed.Client(processes=False)

while True:
    print(".", end="")
    x = client.scatter(123)
    assert client.cluster.scheduler.tasks[x.key].state == "memory"
    del x
    # while client.cluster.scheduler.tasks:
    #     sleep(0.01)
```

output: …

If I uncomment the sleep after the deletion, it goes on indefinitely.
@crusaderky Just wow! I am amazed that you could figure this one out from my messy information. Thanks! I can confirm that this issue is also in dask/distributed 2024.2.0, and the reason it was triggered for me starting with 2024.2.1 is that the hash function in dask was changed so that it now gives the same hash for objects that are the same (which is a good change). One potential workaround is to use `scatter(..., hash=False)`.
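A sketch of that workaround, with the simple value `123` standing in for the real object: with `hash=False` every `scatter` call generates a fresh random key, so the two workflows never share a key and the release of one future cannot race with the other.

```python
import distributed

with distributed.Client(processes=False) as client:
    obj = 123  # stand-in for the real, complex object

    # hash=False: each call gets a unique, randomly generated key instead of
    # a deterministic content hash, so re-scattering the same object can never
    # collide with a key the scheduler is still in the process of forgetting.
    fut1 = client.scatter(obj, hash=False)
    fut2 = client.scatter(obj, hash=False)
    distinct = fut1.key != fut2.key

print("keys are distinct:", distinct)
```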
Triage summary
If you scatter a key, dereference the returned future, and then scatter the same key again (with the same value), the release of the first future is likely to reach the scheduler after the second one has transitioned to memory, and you'll end up holding a Future to a forgotten task. This in turn will cause your computation to fail.
Reproducer below: #8576 (comment)
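Until the race is fixed, one defensive pattern (a sketch, not an official recommendation) is to keep the first Future referenced until after the second scatter has completed, so the release message can never trail behind the re-scatter:

```python
import distributed

with distributed.Client(processes=False) as client:
    obj = 123  # stand-in for the real object
    fut1 = client.scatter(obj)  # first workflow's handle
    fut2 = client.scatter(obj)  # same content hash, hence the same key
    assert fut1.key == fut2.key
    # Only now drop the first handle: fut2 still references the task,
    # so the scheduler cannot forget it out from under us.
    del fut1
    result = client.gather(fut2)

print(result)
```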
Original post
I am using dask/distributed to run custom workflows. A workflow that used to work with version 2024.2.0 stopped working with 2024.2.1 (and still does not work with 2024.3.0). I have been unable to create a minimal reproducible example, but I have some information that could give clues to what the problem might be.
The setup is that a static workflow, run on a `LocalCluster` with threads, is calling two dynamic workflows (see the graphs below), and many of the objects are scattered. I think that the problem is that the scattered object with id `ModelEntry-d3c014c28f9af6108cba2a6c960688ce` is the same for both dynamic workflows. With distributed 2024.2.0 (or earlier) this object has different ids even though it is the same object. Also, telling `scatter` not to use `hash` makes the workflow run, I guess because then the ids will be different. Not scattering the object also works. Given the log messages (see below) it seems as if we are losing `ModelEntry-d3c014c28f9af6108cba2a6c960688ce` in the second dynamic workflow, and it seems to only happen when the scatter id is the same in the two workflows.

The stacktrace:

The log:

First dynamic workflow (works):

Failing dynamic workflow (see first function call for the suspect scattered object):