From 7dac931b96537758c76711a4495c6aab8a4d3d58 Mon Sep 17 00:00:00 2001 From: Sam Smith Date: Tue, 6 Feb 2024 15:53:02 +0000 Subject: [PATCH] Fix: DOS-625 Address action execution blocking new requests * The usage of BackgroundTasks in the new execute_action code was found to be causing blockages of new requests being picked up by the event loop. I think this maybe a reccurance of https://github.com/encode/starlette/issues/919 which they claim is resolved, but still doesn't appear to be working for us. It may be due to one of our middlewares behaving in a particular way that causes this, but the fix applied here appears to be the simplest fix for now. * In this instance we have replaced the usage of background_tasks.add_task with asyncio.create_task that will run the task in the loop without worrying about its result. --- packages/dara-core/changelog.md | 4 ++++ packages/dara-core/dara/core/internal/execute_action.py | 9 +++++---- packages/dara-core/dara/core/internal/routing.py | 6 ++---- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/packages/dara-core/changelog.md b/packages/dara-core/changelog.md index 046eb906..e50c61cc 100644 --- a/packages/dara-core/changelog.md +++ b/packages/dara-core/changelog.md @@ -2,6 +2,10 @@ title: Changelog --- +## NEXT + +- Address action execution blocking new requests due to an issue around BackgroundTask processing in starlette + ## 1.6.0 - Fixed an issue where import discovery would consider the same symbols repeatedly causing it to run much longer than necessary diff --git a/packages/dara-core/dara/core/internal/execute_action.py b/packages/dara-core/dara/core/internal/execute_action.py index b5bf7701..3c5d4c9b 100644 --- a/packages/dara-core/dara/core/internal/execute_action.py +++ b/packages/dara-core/dara/core/internal/execute_action.py @@ -16,11 +16,11 @@ """ from __future__ import annotations +import asyncio from contextvars import ContextVar from typing import Any, Callable, Mapping, Optional, Union import anyio -from fastapi import BackgroundTasks from dara.core.base_definitions import ActionResolverDef, BaseTask from dara.core.interactivity.actions import ( @@ -109,7 +109,6 @@ async def execute_action( ws_channel: str, store: CacheStore, task_mgr: TaskManager, - background_tasks: BackgroundTasks, ) -> Union[Any, BaseTask]: """ Execute a given action with the provided context. @@ -182,6 +181,8 @@ async def handle_action(act_impl: Optional[ActionImpl]): process_result=_stream_action, args=[action, ctx], kwargs=resolved_kwargs, notify_channels=notify_channels ) - # No tasks - run directly as bg task and return execution id - background_tasks.add_task(_stream_action, action, ctx, **resolved_kwargs) + # No tasks - run directly as an asyncio task and return the execution id + # Originally used to use FastAPI BackgroundTasks, but these ended up causing a blocking behavior that blocked some + # new requests from being handled until the task had ended + asyncio.create_task(_stream_action(action, ctx, **resolved_kwargs)) return execution_id diff --git a/packages/dara-core/dara/core/internal/routing.py b/packages/dara-core/dara/core/internal/routing.py index e9bc99db..59db86a0 100644 --- a/packages/dara-core/dara/core/internal/routing.py +++ b/packages/dara-core/dara/core/internal/routing.py @@ -128,9 +128,7 @@ class ActionRequestBody(BaseModel): """Execution id, unique to this request""" @core_api_router.post('/action/{uid}', dependencies=[Depends(verify_session)]) - async def get_action( - uid: str, body: ActionRequestBody, bg_tasks: BackgroundTasks - ): # pylint: disable=unused-variable + async def get_action(uid: str, body: ActionRequestBody): # pylint: disable=unused-variable store: CacheStore = utils_registry.get('Store') task_mgr: TaskManager = utils_registry.get('TaskManager') registry_mgr: RegistryLookup = utils_registry.get('RegistryLookup') @@ -147,7 +145,7 @@ async def get_action( # Execute the action - kick off a background task to run the handler response = await action_def.execute_action( - action_def, body.input, values, static_kwargs, body.execution_id, body.ws_channel, store, task_mgr, bg_tasks + action_def, body.input, values, static_kwargs, body.execution_id, body.ws_channel, store, task_mgr ) if isinstance(response, BaseTask):