Running Daemon Processes from Dask Delayed as a Bridge for Non-Supported functions #7047
---
Hi everyone. In my hypothetical project, I use some functions that I would like to run on multiple processes/CPUs/workers outside of my webserver; however, they are unsupported by Dask. In general, there are some lesser-known functions that are useful but use the base `multiprocessing` library, or some other way of multiprocessing outside of Joblib or the Dask environment (looking at you, scikit-learn), that someone might want to run outside of their main process.

**Rationale / why I ask:** My thought process was that, since we are only using 1 worker, we might avoid nested-parallelization or daemon-process errors, so I decided to test it out (these errors happen in Celery: https://stackoverflow.com/questions/30624290/celery-daemonic-processes-are-not-allowed-to-have-children). I don't have the knowledge yet to start up my own cluster, so I just tried to see if even the most basic example would work, given that the cross between Dask and multiprocessing is the real potential issue.

```python
from gensim.test.utils import common_corpus, common_dictionary
from gensim.models import LdaMulticore

def LdaMulticoreFunction():
    lda = LdaMulticore(common_corpus, id2word=common_dictionary, num_topics=10, workers=1)
    print(lda)
    return lda

import dask
from dask import compute, delayed
from dask.distributed import Client

dask.config.set({'distributed.worker.daemon': False})  # setting this to True returns the same result
client = Client(threads_per_worker=1, n_workers=1)
client

delayed_values = [delayed(LdaMulticoreFunction)()]
results = list(compute(*delayed_values, scheduler='processes', num_workers=1))
```

Error trace:

```
Traceback (most recent call last):
  File "<ipython-input-1-99462bf7d02a>", line 19, in <module>
    results = list(compute(*delayed_values, scheduler='processes', num_workers=1))
  File "C:\Users\omars\anaconda3\lib\site-packages\dask\base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\omars\anaconda3\lib\site-packages\dask\multiprocessing.py", line 222, in get
    **kwargs
  File "C:\Users\omars\anaconda3\lib\site-packages\dask\local.py", line 486, in get_async
    raise_exception(exc, tb)
  File "C:\Users\omars\anaconda3\lib\site-packages\dask\local.py", line 316, in reraise
    raise exc
  File "C:\Users\omars\anaconda3\lib\site-packages\dask\local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\omars\anaconda3\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*args2)
  File "<ipython-input-1-99462bf7d02a>", line 6, in LdaMulticoreFunction
    lda = LdaMulticore(common_corpus, id2word=common_dictionary, num_topics=10, workers=1)
  File "C:\Users\omars\anaconda3\lib\site-packages\gensim\models\ldamulticore.py", line 184, in __init__
    minimum_phi_value=minimum_phi_value, per_word_topics=per_word_topics, dtype=dtype
  File "C:\Users\omars\anaconda3\lib\site-packages\gensim\models\ldamodel.py", line 519, in __init__
    self.update(corpus, chunks_as_numpy=use_numpy)
  File "C:\Users\omars\anaconda3\lib\site-packages\gensim\models\ldamulticore.py", line 280, in update
    pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,))
  File "C:\Users\omars\anaconda3\lib\multiprocessing\context.py", line 119, in Pool
    context=self.get_context())
  File "C:\Users\omars\anaconda3\lib\multiprocessing\pool.py", line 176, in __init__
    self._repopulate_pool()
  File "C:\Users\omars\anaconda3\lib\multiprocessing\pool.py", line 241, in _repopulate_pool
    w.start()
  File "C:\Users\omars\anaconda3\lib\multiprocessing\process.py", line 110, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
```

In this case, I am attempting to run a function from Gensim, a text-processing library, and one of its models that runs on multiple cores.

**What I've tried:** Was wondering if anyone knew how to work around this.
Replies: 1 comment
---
Found a potential answer: when I turn off the daemon process setting for the client, I can use `submit` instead of `delayed` for the function. Until I can set up a cluster, this at least proves that it works and I can move forward. Most likely this is due to https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic, i.e. the fact that `concurrent.futures` supports non-daemonic workers, although I don't know enough about Dask right now to say why the delayed function didn't work. I'll still be setting up the KubeCluster (out of this scope) with only 1 worker and a larger number of processes to keep it on a single worker, until someone can tell me directly that submitting a function …

Criticism is greatly welcomed! Here's the new code for anyone to comb:

```python
from gensim.test.utils import common_corpus, common_dictionary
from gensim.models import LdaMulticore

def LdaMulticoreFunction():
    lda = LdaMulticore(common_corpus, id2word=common_dictionary, num_topics=10, workers=5)
    print(lda)
    return lda

import dask
from dask import compute, delayed
from dask.distributed import Client

dask.config.set({'distributed.worker.daemon': False})
# Leaving out the line above doesn't work, confirming that this setting is the correct fix

client = Client(threads_per_worker=1, n_workers=5, processes=True)
client

x = client.submit(LdaMulticoreFunction)
results = x.result()
```