Skip to content

Commit

Permalink
Merge pull request #327 from sanders41/cancel-task
Browse files Browse the repository at this point in the history
Add task cancellation
  • Loading branch information
sanders41 committed Nov 20, 2022
2 parents 3d5dc8c + cf8e0c0 commit 8fa64d5
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 14 deletions.
8 changes: 5 additions & 3 deletions meilisearch_python_async/models/task.py
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Optional, Union

from camel_converter.pydantic_base import CamelBase
from pydantic import Field
Expand All @@ -10,10 +10,12 @@ class TaskId(CamelBase):


class TaskStatus(TaskId):
index_uid: Optional[str] = None
index_uids: Optional[List[str]] = None
status: str
task_type: Union[str, Dict[str, Any]] = Field(..., alias="type")
details: Optional[Dict[str, Any]]
error: Optional[Dict[str, Any]]
canceled_by: Optional[int]
duration: Optional[str]
enqueued_at: datetime
started_at: Optional[datetime]
Expand All @@ -22,7 +24,7 @@ class TaskStatus(TaskId):

class TaskInfo(CamelBase):
task_uid: int
index_uid: Optional[str] = None
index_uids: Optional[List[str]] = None
status: str
task_type: Union[str, Dict[str, Any]] = Field(..., alias="type")
enqueued_at: datetime
92 changes: 84 additions & 8 deletions meilisearch_python_async/task.py
Expand Up @@ -2,22 +2,96 @@

from asyncio import sleep
from datetime import datetime
from urllib.parse import urlencode

from httpx import AsyncClient

from meilisearch_python_async._http_requests import HttpRequests
from meilisearch_python_async.errors import MeiliSearchTimeoutError
from meilisearch_python_async.models.task import TaskStatus
from meilisearch_python_async.models.task import TaskInfo, TaskStatus


async def cancel_tasks(
http_client: AsyncClient,
*,
uids: list[str] | None = None,
index_uids: list[str] | None = None,
statuses: list[str] | None = None,
types: list[str] | None = None,
before_enqueued_at: datetime | None = None,
after_enqueueda_at: datetime | None = None,
before_started_at: datetime | None = None,
after_finished_at: datetime | None = None,
) -> TaskInfo:
"""Cancel a list of enqueued or processing tasks.
Args:
uids: A list of task UIDs to cancel.
index_uids: A list of index UIDs for which to cancel tasks.
statuses: A list of statuses to cancel.
types: A list of types to cancel.
before_enqueued_at: Cancel tasks that were enqueued before the specified date time.
after_enqueueda_at: Cancel tasks that were enqueued after the specified date time.
before_started_at: Cancel tasks that were started before the specified date time.
after_finished_at: Cancel tasks that were finished after the specified date time.
Returns:
The details of the task
async def get_tasks(http_client: AsyncClient, index_id: str | None = None) -> list[TaskStatus]:
"""Get all tasks.
Raises:
MeilisearchCommunicationError: If there was an error communicating with the server.
MeilisearchApiError: If the MeiliSearch API returned an error.
MeiliSearchTimeoutError: If the connection times out.
Examples:
>>> from meilisearch_python_async import Client
>>> from meilisearch_python_async.task import cancel_tasks
>>>
>>> async with Client("http://localhost.com", "masterKey") as client:
>>> await cancel_tasks(client.http_client, uids=[1, 2])
"""
# parameters = {"uids": uids, "indexUids": index_uids}
parameters = {}
if uids:
parameters["uids"] = ",".join([str(x) for x in uids])
if index_uids:
parameters["indexUids"] = ",".join([str(x) for x in index_uids])
if statuses:
parameters["statuses"] = ",".join(statuses)
if types:
parameters["types"] = ",".join(types)
if before_enqueued_at:
parameters["beforeEnqueuedAt"] = str(before_enqueued_at)
if after_enqueueda_at:
parameters["afterEnqueuedAt"] = str(after_enqueueda_at)
if before_started_at:
parameters["beforeStartedAt"] = str(before_started_at)
if after_finished_at:
parameters["afterFinishedAt"] = str(after_finished_at)
url = f"tasks/cancel?{urlencode(parameters)}"
response = await http_client.post(url)

return TaskInfo(**response.json())


async def get_tasks(
http_client: AsyncClient,
*,
index_ids: list[str] | None = None,
types: str | list[str] | None = None,
) -> list[TaskStatus]:
"""Get multiple tasks.
Args:
http_client: An AsyncClient instance.
index_id: The id of the index for which to get the tasks. If provided this will get the
tasks only for the specified index, if not all tasks will be returned. Default = None
index_ids: A list of index UIDs for which to get the tasks. If provided this will get the
tasks only for the specified indexes, if not all tasks will be returned. Default = None
types: Specify specific task types to retrieve. Default = None
Returns:
Expand All @@ -37,11 +111,13 @@ async def get_tasks(http_client: AsyncClient, index_id: str | None = None) -> li
>>> async with Client("http://localhost.com", "masterKey") as client:
>>> await get_tasks(client.http_client)
"""
url = f"tasks?indexUid={index_id}" if index_id else "tasks"
url = f"tasks?indexUids={','.join(index_ids)}" if index_ids else "tasks"
if types:
formatted_types = ",".join(types) if isinstance(types, list) else types
url = f"{url}&types={formatted_types}" if "?" in url else f"{url}?types={formatted_types}"
response = await http_client.get(url)
tasks = [TaskStatus(**x) for x in response.json()["results"]]

return tasks
return [TaskStatus(**x) for x in response.json()["results"]]


async def get_task(http_client: AsyncClient, task_id: int) -> TaskStatus:
Expand Down
26 changes: 23 additions & 3 deletions tests/test_task.py
@@ -1,7 +1,27 @@
import pytest

from meilisearch_python_async.errors import MeiliSearchTimeoutError
from meilisearch_python_async.task import get_task, get_tasks, wait_for_task
from meilisearch_python_async.task import cancel_tasks, get_task, get_tasks, wait_for_task


async def test_cancel_tasks(test_client):
task = await cancel_tasks(test_client.http_client, uids=["1", "2"])
tasks = await get_tasks(test_client.http_client, types=["taskCancelation"])

assert task.task_uid is not None
assert task.index_uids is None
assert task.status in {"enqueued", "processing", "succeeded"}
assert task.task_type == "taskCancelation"
assert tasks[0].details is not None
assert "uids=1%2C2" in tasks[0].details["originalFilter"]


async def test_cancel_every_task(test_client):
task = await cancel_tasks(test_client.http_client, statuses=["enqueued", "processing"])
tasks = await get_tasks(test_client.http_client, types="taskCancelation")

assert task.task_uid is not None
assert task.index_uids is None


async def test_get_tasks(empty_index, small_movies):
Expand All @@ -18,13 +38,13 @@ async def test_get_tasks(empty_index, small_movies):

async def test_get_tasks_for_index(empty_index, small_movies):
index = await empty_index()
tasks = await get_tasks(index.http_client, index.uid)
tasks = await get_tasks(index.http_client, index_ids=[index.uid])
current_tasks = len(tasks)
response = await index.add_documents(small_movies)
await wait_for_task(index.http_client, response.task_uid)
response = await index.add_documents(small_movies)
await wait_for_task(index.http_client, response.task_uid)
response = await get_tasks(index.http_client, index.uid)
response = await get_tasks(index.http_client, index_ids=[index.uid])
assert len(response) >= current_tasks


Expand Down

0 comments on commit 8fa64d5

Please sign in to comment.