-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
test_task.py
74 lines (56 loc) · 2.95 KB
/
test_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import pytest
from meilisearch_python_async.errors import MeiliSearchTimeoutError
from meilisearch_python_async.task import cancel_tasks, get_task, get_tasks, wait_for_task
async def test_cancel_tasks(test_client):
    """Cancel tasks by uid, then check the returned task info and the stored filter."""
    http_client = test_client.http_client

    cancel_info = await cancel_tasks(http_client, uids=["1", "2"])
    cancelations = await get_tasks(http_client, types=["taskCancelation"])

    # Fields of the object returned directly by ``cancel_tasks``.
    assert cancel_info.task_uid is not None
    assert cancel_info.index_uids is None
    assert cancel_info.status in {"enqueued", "processing", "succeeded"}
    assert cancel_info.task_type == "taskCancelation"

    # The first listed cancelation task should carry the uid filter that was sent.
    first_details = cancelations[0].details
    assert first_details is not None
    assert "uids=1%2C2" in first_details["originalFilter"]
async def test_cancel_every_task(test_client):
    """Cancel every enqueued/processing task and verify the cancelation task.

    Checks both the task info returned by ``cancel_tasks`` and the first
    ``taskCancelation`` entry returned by ``get_tasks``, mirroring the
    assertions made in ``test_cancel_tasks``.
    """
    task = await cancel_tasks(test_client.http_client, statuses=["enqueued", "processing"])
    # ``types`` is deliberately left as a bare string here (``test_cancel_tasks`` passes a list).
    tasks = await get_tasks(test_client.http_client, types="taskCancelation")

    assert task.task_uid is not None
    assert task.index_uids is None
    assert task.status in {"enqueued", "processing", "succeeded"}
    assert task.task_type == "taskCancelation"
    # Previously ``tasks`` was fetched but never inspected; verify the recorded filter.
    assert tasks[0].details is not None
    assert "statuses=enqueued%2Cprocessing" in tasks[0].details["originalFilter"]
async def test_get_tasks(empty_index, small_movies):
    """Listing tasks after two document additions returns at least as many as before."""
    index = await empty_index()
    http_client = index.http_client

    baseline_count = len(await get_tasks(http_client))

    # Run two add-documents tasks, waiting for each one before continuing.
    for _ in range(2):
        enqueued = await index.add_documents(small_movies)
        await wait_for_task(http_client, enqueued.task_uid)

    tasks_after = await get_tasks(http_client)
    assert len(tasks_after) >= baseline_count
async def test_get_tasks_for_index(empty_index, small_movies):
    """Listing tasks filtered by index uid returns at least as many as before two additions."""
    index = await empty_index()
    http_client = index.http_client

    baseline_count = len(await get_tasks(http_client, index_ids=[index.uid]))

    # Run two add-documents tasks, waiting for each one before continuing.
    for _ in range(2):
        enqueued = await index.add_documents(small_movies)
        await wait_for_task(http_client, enqueued.task_uid)

    tasks_after = await get_tasks(http_client, index_ids=[index.uid])
    assert len(tasks_after) >= baseline_count
async def test_get_task(empty_index, small_movies):
    """Fetch a single task by uid after it has been waited on and check its status."""
    index = await empty_index()
    http_client = index.http_client

    enqueued = await index.add_documents(small_movies)
    task_uid = enqueued.task_uid

    # Wait first so the subsequent lookup is made after the wait has returned.
    await wait_for_task(http_client, task_uid)
    fetched = await get_task(http_client, task_uid)

    assert fetched.status == "succeeded"
async def test_wait_for_task(empty_index, small_movies):
    """``wait_for_task`` returns a task whose status is ``succeeded`` for an add-documents task."""
    index = await empty_index()
    enqueued = await index.add_documents(small_movies)

    finished = await wait_for_task(index.http_client, enqueued.task_uid)

    assert finished.status == "succeeded"
async def test_wait_for_pending_update_time_out(empty_index, small_movies):
    """``wait_for_task`` raises ``MeiliSearchTimeoutError`` when given a 1 ms timeout."""
    index = await empty_index()

    # Enqueue outside the ``pytest.raises`` block: only the wait itself may satisfy
    # the expected error, and ``response`` is guaranteed to be bound for the cleanup below.
    response = await index.add_documents(small_movies)

    with pytest.raises(MeiliSearchTimeoutError):
        await wait_for_task(index.http_client, response.task_uid, timeout_in_ms=1, interval_in_ms=1)

    # Make sure the indexing finishes so subsequent tests don't have issues.
    await wait_for_task(index.http_client, response.task_uid)