Skip to content

Commit

Permalink
Fix: DOS-625 Address action execution blocking new requests
Browse files Browse the repository at this point in the history
* The usage of BackgroundTasks in the new execute_action code was found to be causing blockages of new requests being picked up by the event loop. I think this maybe a reccurance of encode/starlette#919 which they claim is resolved, but still doesn't appear to be working for us. It may be due to one of our middlewares behaving in a particular way that causes this, but the fix applied here appears to be the simplest fix for now.
* In this instance we have replaced the usage of background_tasks.add_task with asyncio.create_task that will run the task in the loop without worrying about its result.
  • Loading branch information
sam-causalens committed Feb 6, 2024
1 parent 20a72f8 commit 7dac931
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
4 changes: 4 additions & 0 deletions packages/dara-core/changelog.md
Expand Up @@ -2,6 +2,10 @@
title: Changelog
---

## NEXT

- Address action execution blocking new requests due to an issue around BackgroundTask processing in starlette

## 1.6.0

- Fixed an issue where import discovery would consider the same symbols repeatedly causing it to run much longer than necessary
Expand Down
9 changes: 5 additions & 4 deletions packages/dara-core/dara/core/internal/execute_action.py
Expand Up @@ -16,11 +16,11 @@
"""
from __future__ import annotations

import asyncio
from contextvars import ContextVar
from typing import Any, Callable, Mapping, Optional, Union

import anyio
from fastapi import BackgroundTasks

from dara.core.base_definitions import ActionResolverDef, BaseTask
from dara.core.interactivity.actions import (
Expand Down Expand Up @@ -109,7 +109,6 @@ async def execute_action(
ws_channel: str,
store: CacheStore,
task_mgr: TaskManager,
background_tasks: BackgroundTasks,
) -> Union[Any, BaseTask]:
"""
Execute a given action with the provided context.
Expand Down Expand Up @@ -182,6 +181,8 @@ async def handle_action(act_impl: Optional[ActionImpl]):
process_result=_stream_action, args=[action, ctx], kwargs=resolved_kwargs, notify_channels=notify_channels
)

# No tasks - run directly as bg task and return execution id
background_tasks.add_task(_stream_action, action, ctx, **resolved_kwargs)
# No tasks - run directly as an asyncio task and return the execution id
# Originally used to use FastAPI BackgroundTasks, but these ended up causing a blocking behavior that blocked some
# new requests from being handled until the task had ended
asyncio.create_task(_stream_action(action, ctx, **resolved_kwargs))
return execution_id
6 changes: 2 additions & 4 deletions packages/dara-core/dara/core/internal/routing.py
Expand Up @@ -128,9 +128,7 @@ class ActionRequestBody(BaseModel):
"""Execution id, unique to this request"""

@core_api_router.post('/action/{uid}', dependencies=[Depends(verify_session)])
async def get_action(
uid: str, body: ActionRequestBody, bg_tasks: BackgroundTasks
): # pylint: disable=unused-variable
async def get_action(uid: str, body: ActionRequestBody): # pylint: disable=unused-variable
store: CacheStore = utils_registry.get('Store')
task_mgr: TaskManager = utils_registry.get('TaskManager')
registry_mgr: RegistryLookup = utils_registry.get('RegistryLookup')
Expand All @@ -147,7 +145,7 @@ async def get_action(

# Execute the action - kick off a background task to run the handler
response = await action_def.execute_action(
action_def, body.input, values, static_kwargs, body.execution_id, body.ws_channel, store, task_mgr, bg_tasks
action_def, body.input, values, static_kwargs, body.execution_id, body.ws_channel, store, task_mgr
)

if isinstance(response, BaseTask):
Expand Down

0 comments on commit 7dac931

Please sign in to comment.