Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Enable running with spawn context #15923

Merged
merged 19 commits into from Dec 7, 2022
2 changes: 2 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Added the property `ready` of the LightningFlow to inform when the `Open App` should be visible ([#15921](https://github.com/Lightning-AI/lightning/pull/15921))

- Added private work attributed `_start_method` to customize how to start the works ([#15923](https://github.com/Lightning-AI/lightning/pull/15923))


### Changed

Expand Down
3 changes: 2 additions & 1 deletion src/lightning_app/core/queues.py
Expand Up @@ -198,7 +198,8 @@ class MultiProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
self.default_timeout = default_timeout
self.queue = multiprocessing.Queue()
context = multiprocessing.get_context("spawn")
self.queue = context.Queue()

def put(self, item):
self.queue.put(item)
Expand Down
1 change: 1 addition & 0 deletions src/lightning_app/core/work.py
Expand Up @@ -46,6 +46,7 @@ class LightningWork:
)

_run_executor_cls: Type[WorkRunExecutor] = WorkRunExecutor
_start_method = "fork"

def __init__(
self,
Expand Down
5 changes: 4 additions & 1 deletion src/lightning_app/runners/backends/mp_process.py
Expand Up @@ -31,7 +31,10 @@ def start(self):
flow_to_work_delta_queue=self.app.flow_to_work_delta_queues[self.work.name],
run_executor_cls=self.work._run_executor_cls,
)
self._process = multiprocessing.Process(target=self._work_runner)

start_method = getattr(self.work, "_start_method", "fork")
tchaton marked this conversation as resolved.
Show resolved Hide resolved
context = multiprocessing.get_context(start_method)
self._process = context.Process(target=self._work_runner)
self._process.start()

def kill(self):
Expand Down