Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: DOS-625 Address action execution blocking new requests #189

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/dara-core/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
title: Changelog
---

## NEXT

- Address action execution blocking new requests due to an issue around BackgroundTask processing in starlette

## 1.6.0

- Fixed an issue where import discovery would consider the same symbols repeatedly causing it to run much longer than necessary
Expand Down
9 changes: 5 additions & 4 deletions packages/dara-core/dara/core/internal/execute_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
"""
from __future__ import annotations

import asyncio
from contextvars import ContextVar
from typing import Any, Callable, Mapping, Optional, Union

import anyio
from fastapi import BackgroundTasks

from dara.core.base_definitions import ActionResolverDef, BaseTask
from dara.core.interactivity.actions import (
Expand Down Expand Up @@ -109,7 +109,6 @@ async def execute_action(
ws_channel: str,
store: CacheStore,
task_mgr: TaskManager,
background_tasks: BackgroundTasks,
) -> Union[Any, BaseTask]:
"""
Execute a given action with the provided context.
Expand Down Expand Up @@ -182,6 +181,8 @@ async def handle_action(act_impl: Optional[ActionImpl]):
process_result=_stream_action, args=[action, ctx], kwargs=resolved_kwargs, notify_channels=notify_channels
)

# No tasks - run directly as bg task and return execution id
background_tasks.add_task(_stream_action, action, ctx, **resolved_kwargs)
# No tasks - run directly as an asyncio task and return the execution id
# Originally used to use FastAPI BackgroundTasks, but these ended up causing a blocking behavior that blocked some
# new requests from being handled until the task had ended
asyncio.create_task(_stream_action(action, ctx, **resolved_kwargs))
return execution_id
17 changes: 3 additions & 14 deletions packages/dara-core/dara/core/internal/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,7 @@
from typing import Any, Callable, List, Mapping, Optional

import pandas
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
File,
Form,
HTTPException,
Response,
UploadFile,
)
from fastapi import APIRouter, Depends, File, Form, HTTPException, Response, UploadFile
from fastapi.responses import StreamingResponse
from pandas import DataFrame
from pydantic import BaseModel
Expand Down Expand Up @@ -128,9 +119,7 @@ class ActionRequestBody(BaseModel):
"""Execution id, unique to this request"""

@core_api_router.post('/action/{uid}', dependencies=[Depends(verify_session)])
async def get_action(
uid: str, body: ActionRequestBody, bg_tasks: BackgroundTasks
): # pylint: disable=unused-variable
async def get_action(uid: str, body: ActionRequestBody): # pylint: disable=unused-variable
store: CacheStore = utils_registry.get('Store')
task_mgr: TaskManager = utils_registry.get('TaskManager')
registry_mgr: RegistryLookup = utils_registry.get('RegistryLookup')
Expand All @@ -147,7 +136,7 @@ async def get_action(

# Execute the action - kick off a background task to run the handler
response = await action_def.execute_action(
action_def, body.input, values, static_kwargs, body.execution_id, body.ws_channel, store, task_mgr, bg_tasks
action_def, body.input, values, static_kwargs, body.execution_id, body.ws_channel, store, task_mgr
)

if isinstance(response, BaseTask):
Expand Down