From d2a8fbf6dd308ffcaa7a39d01d272cf2e4b656b9 Mon Sep 17 00:00:00 2001 From: thomas chaton Date: Wed, 7 Dec 2022 03:01:31 +0000 Subject: [PATCH] [App] Enable running with spawn context (#15923) --- examples/app_installation_commands/app.py | 4 ++++ src/lightning_app/CHANGELOG.md | 2 ++ src/lightning_app/core/flow.py | 7 +++++++ src/lightning_app/core/queues.py | 3 ++- src/lightning_app/core/work.py | 3 +++ src/lightning_app/runners/backends/mp_process.py | 5 ++++- tests/tests_app/core/test_queues.py | 8 +++++--- 7 files changed, 27 insertions(+), 5 deletions(-) diff --git a/examples/app_installation_commands/app.py b/examples/app_installation_commands/app.py index 9eb1c2944ee2e..087d84b1335b2 100644 --- a/examples/app_installation_commands/app.py +++ b/examples/app_installation_commands/app.py @@ -13,6 +13,10 @@ def run(self): print("lmdb successfully installed") print("accessing a module in a Work or Flow body works!") + @property + def ready(self) -> bool: + return True + print(f"accessing an object in main code body works!: version={lmdb.version()}") diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index ec7812e6e7417..f7cd8be3db982 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added the property `ready` of the LightningFlow to inform when the `Open App` should be visible ([#15921](https://github.com/Lightning-AI/lightning/pull/15921)) +- Added private work attributed `_start_method` to customize how to start the works ([#15923](https://github.com/Lightning-AI/lightning/pull/15923)) + ### Changed diff --git a/src/lightning_app/core/flow.py b/src/lightning_app/core/flow.py index 72527bf7aee6f..56947b0d2cbef 100644 --- a/src/lightning_app/core/flow.py +++ b/src/lightning_app/core/flow.py @@ -763,6 +763,13 @@ def __init__(self, work): super().__init__() self.work = work + @property + def ready(self) -> bool: + ready = getattr(self.work, "ready", None) + if ready: + return ready + return self.work.url != "" + def run(self): if self.work.has_succeeded: self.work.stop() diff --git a/src/lightning_app/core/queues.py b/src/lightning_app/core/queues.py index a7fee9a3b6e12..f38942915abc3 100644 --- a/src/lightning_app/core/queues.py +++ b/src/lightning_app/core/queues.py @@ -198,7 +198,8 @@ class MultiProcessQueue(BaseQueue): def __init__(self, name: str, default_timeout: float): self.name = name self.default_timeout = default_timeout - self.queue = multiprocessing.Queue() + context = multiprocessing.get_context("spawn") + self.queue = context.Queue() def put(self, item): self.queue.put(item) diff --git a/src/lightning_app/core/work.py b/src/lightning_app/core/work.py index ab0dc8426ac91..857cbc9447ff1 100644 --- a/src/lightning_app/core/work.py +++ b/src/lightning_app/core/work.py @@ -1,3 +1,4 @@ +import sys import time import warnings from copy import deepcopy @@ -46,6 +47,8 @@ class LightningWork: ) _run_executor_cls: Type[WorkRunExecutor] = WorkRunExecutor + # TODO: Move to spawn for all Operating System. + _start_method = "spawn" if sys.platform == "win32" else "fork" def __init__( self, diff --git a/src/lightning_app/runners/backends/mp_process.py b/src/lightning_app/runners/backends/mp_process.py index 36a067d0bfd80..dc0681390046e 100644 --- a/src/lightning_app/runners/backends/mp_process.py +++ b/src/lightning_app/runners/backends/mp_process.py @@ -31,7 +31,10 @@ def start(self): flow_to_work_delta_queue=self.app.flow_to_work_delta_queues[self.work.name], run_executor_cls=self.work._run_executor_cls, ) - self._process = multiprocessing.Process(target=self._work_runner) + + start_method = self.work._start_method + context = multiprocessing.get_context(start_method) + self._process = context.Process(target=self._work_runner) self._process.start() def kill(self): diff --git a/tests/tests_app/core/test_queues.py b/tests/tests_app/core/test_queues.py index 899ad9f606e85..0f930bcacabc7 100644 --- a/tests/tests_app/core/test_queues.py +++ b/tests/tests_app/core/test_queues.py @@ -5,7 +5,6 @@ from unittest import mock import pytest -import redis import requests_mock from lightning_app import LightningFlow @@ -23,6 +22,7 @@ def test_queue_api(queue_type, monkeypatch): This test run all the Queue implementation but we monkeypatch the Redis Queues to avoid external interaction """ + import redis blpop_out = (b"entry-id", pickle.dumps("test_entry")) @@ -104,12 +104,14 @@ def test_redis_queue_read_timeout(redis_mock): @pytest.mark.parametrize( "queue_type, queue_process_mock", - [(QueuingSystem.SINGLEPROCESS, queue), (QueuingSystem.MULTIPROCESS, multiprocessing)], + [(QueuingSystem.MULTIPROCESS, multiprocessing)], ) def test_process_queue_read_timeout(queue_type, queue_process_mock, monkeypatch): + context = mock.MagicMock() queue_mocked = mock.MagicMock() - monkeypatch.setattr(queue_process_mock, "Queue", queue_mocked) + context.Queue = queue_mocked + monkeypatch.setattr(queue_process_mock, "get_context", mock.MagicMock(return_value=context)) my_queue = queue_type.get_readiness_queue() # default timeout