
Commit 99abf01
Merge pull request #308 from sanders41/refactor
Refactor batching for better concurrency
sanders41 committed Oct 3, 2022
2 parents 7669878 + 87b53e4 commit 99abf01
Showing 3 changed files with 99 additions and 117 deletions.
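
The key change in index.py is that add_documents_in_batches and update_documents_in_batches no longer await each batch one at a time; they build one coroutine per batch and run them all concurrently with asyncio.gather, and _batch becomes a plain synchronous generator. A minimal standalone sketch of that pattern, where send_batch is an illustrative stand-in for index.add_documents and not part of the library:

    import asyncio
    from typing import Generator


    def _batch(documents: list[dict], batch_size: int) -> Generator[list[dict], None, None]:
        # Same slicing helper as in the diff: yield fixed-size chunks of the document list.
        for i in range(0, len(documents), batch_size):
            yield documents[i : i + batch_size]


    async def send_batch(batch: list[dict]) -> int:
        # Stand-in for index.add_documents(); pretend each request takes 0.1 seconds.
        await asyncio.sleep(0.1)
        return len(batch)


    async def main() -> None:
        documents = [{"id": i} for i in range(10)]
        # Before: each batch was awaited in turn, so total time grew with the batch count.
        # After: all batches are dispatched at once and gathered, so the requests overlap.
        results = await asyncio.gather(*(send_batch(b) for b in _batch(documents, 3)))
        print(results)  # [3, 3, 3, 1]


    asyncio.run(main())

gather preserves the order of its arguments, so the batched methods still return one result per batch in submission order (now as a gathered list of TaskInfo objects).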
204 changes: 93 additions & 111 deletions meilisearch_python_async/index.py
@@ -6,7 +6,7 @@
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Any, AsyncGenerator
from typing import Any, Generator
from urllib.parse import urlencode

import aiofiles
@@ -49,8 +49,8 @@ def __init__(
"""
self.uid = uid
self.primary_key = primary_key
self.created_at: datetime | None = Index._iso_to_date_time(created_at)
self.updated_at: datetime | None = Index._iso_to_date_time(updated_at)
self.created_at: datetime | None = _iso_to_date_time(created_at)
self.updated_at: datetime | None = _iso_to_date_time(updated_at)
self._base_url = "indexes/"
self._base_url_with_uid = f"{self._base_url}{self.uid}"
self._documents_url = f"{self._base_url_with_uid}/documents"
@@ -170,10 +170,10 @@ async def fetch_info(self) -> Index:
self.primary_key = index_dict["primaryKey"]
loop = get_running_loop()
self.created_at = await loop.run_in_executor(
None, partial(Index._iso_to_date_time, index_dict["createdAt"])
None, partial(_iso_to_date_time, index_dict["createdAt"])
)
self.updated_at = await loop.run_in_executor(
None, partial(Index._iso_to_date_time, index_dict["updatedAt"])
None, partial(_iso_to_date_time, index_dict["updatedAt"])
)
return self

@@ -492,13 +492,8 @@ async def add_documents_in_batches(
>>> index = client.index("movies")
>>> await index.add_documents_in_batches(documents)
"""
task_ids: list[TaskInfo] = []

async for document_batch in Index._batch(documents, batch_size):
task_id = await self.add_documents(document_batch, primary_key)
task_ids.append(task_id)

return task_ids
batches = [self.add_documents(x, primary_key) for x in _batch(documents, batch_size)]
return await gather(*batches)

async def add_documents_from_directory(
self,
@@ -547,26 +542,24 @@ async def add_documents_from_directory(
all_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
all_documents.append(documents)

Index._raise_on_no_documents(all_documents, document_type, directory_path)
_raise_on_no_documents(all_documents, document_type, directory_path)

loop = get_running_loop()
combined = await loop.run_in_executor(
None, partial(Index._combine_documents, all_documents)
)
combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))

response = await self.add_documents(combined, primary_key)
return [response]

add_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
add_documents.append(self.add_documents(documents, primary_key))

Index._raise_on_no_documents(add_documents, document_type, directory_path)
_raise_on_no_documents(add_documents, document_type, directory_path)

if len(add_documents) > 1:
# Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -629,15 +622,13 @@ async def add_documents_from_directory_in_batches(
all_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
all_documents.append(documents)

Index._raise_on_no_documents(all_documents, document_type, directory_path)
_raise_on_no_documents(all_documents, document_type, directory_path)

loop = get_running_loop()
combined = await loop.run_in_executor(
None, partial(Index._combine_documents, all_documents)
)
combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))

return await self.add_documents_in_batches(
combined, batch_size=batch_size, primary_key=primary_key
@@ -648,14 +639,14 @@ async def add_documents_from_directory_in_batches(
add_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
add_documents.append(
self.add_documents_in_batches(
documents, batch_size=batch_size, primary_key=primary_key
)
)

Index._raise_on_no_documents(add_documents, document_type, directory_path)
_raise_on_no_documents(add_documents, document_type, directory_path)

if len(add_documents) > 1:
# Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -699,7 +690,7 @@ async def add_documents_from_file(
>>> index = client.index("movies")
>>> await index.add_documents_from_file(file_path)
"""
documents = await Index._load_documents_from_file(file_path)
documents = await _load_documents_from_file(file_path)

return await self.add_documents(documents, primary_key=primary_key)

@@ -736,7 +727,7 @@ async def add_documents_from_file_in_batches(
>>> index = client.index("movies")
>>> await index.add_documents_from_file_in_batches(file_path)
"""
documents = await Index._load_documents_from_file(file_path)
documents = await _load_documents_from_file(file_path)

return await self.add_documents_in_batches(
documents, batch_size=batch_size, primary_key=primary_key
@@ -871,13 +862,8 @@ async def update_documents_in_batches(
>>> index = client.index("movies")
>>> await index.update_documents_in_batches(documents)
"""
task_ids: list[TaskInfo] = []

async for document_batch in Index._batch(documents, batch_size):
task_id = await self.update_documents(document_batch, primary_key)
task_ids.append(task_id)

return task_ids
batches = [self.update_documents(x, primary_key) for x in _batch(documents, batch_size)]
return await gather(*batches)

async def update_documents_from_directory(
self,
@@ -926,26 +912,24 @@ async def update_documents_from_directory(
all_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
all_documents.append(documents)

Index._raise_on_no_documents(all_documents, document_type, directory_path)
_raise_on_no_documents(all_documents, document_type, directory_path)

loop = get_running_loop()
combined = await loop.run_in_executor(
None, partial(Index._combine_documents, all_documents)
)
combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))

response = await self.update_documents(combined, primary_key)
return [response]

update_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
update_documents.append(self.update_documents(documents, primary_key))

Index._raise_on_no_documents(update_documents, document_type, directory_path)
_raise_on_no_documents(update_documents, document_type, directory_path)

if len(update_documents) > 1:
# Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -1008,15 +992,13 @@ async def update_documents_from_directory_in_batches(
all_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
all_documents.append(documents)

Index._raise_on_no_documents(all_documents, document_type, directory_path)
_raise_on_no_documents(all_documents, document_type, directory_path)

loop = get_running_loop()
combined = await loop.run_in_executor(
None, partial(Index._combine_documents, all_documents)
)
combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))

return await self.update_documents_in_batches(
combined, batch_size=batch_size, primary_key=primary_key
@@ -1027,14 +1009,14 @@ async def update_documents_from_directory_in_batches(
update_documents = []
for path in directory.iterdir():
if path.suffix == f".{document_type}":
documents = await Index._load_documents_from_file(path)
documents = await _load_documents_from_file(path)
update_documents.append(
self.update_documents_in_batches(
documents, batch_size=batch_size, primary_key=primary_key
)
)

Index._raise_on_no_documents(update_documents, document_type, directory_path)
_raise_on_no_documents(update_documents, document_type, directory_path)

if len(update_documents) > 1:
# Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -1076,7 +1058,7 @@ async def update_documents_from_file(
>>> index = client.index("movies")
>>> await index.update_documents_from_file(file_path)
"""
documents = await Index._load_documents_from_file(file_path)
documents = await _load_documents_from_file(file_path)

return await self.update_documents(documents, primary_key=primary_key)

@@ -1111,7 +1093,7 @@ async def update_documents_from_file_in_batches(
>>> index = client.index("movies")
>>> await index.update_documents_from_file_in_batches(file_path)
"""
documents = await Index._load_documents_from_file(file_path)
documents = await _load_documents_from_file(file_path)

return await self.update_documents_in_batches(
documents, batch_size=batch_size, primary_key=primary_key
@@ -2124,73 +2106,73 @@ async def reset_faceting(self) -> TaskInfo:

return TaskInfo(**response.json())

@staticmethod
async def _batch(documents: list[dict], batch_size: int) -> AsyncGenerator[list[dict], None]:
total_len = len(documents)
for i in range(0, total_len, batch_size):
yield documents[i : i + batch_size]

@staticmethod
def _raise_on_no_documents(
documents: list[Any], document_type: str, directory_path: str | Path
) -> None:
if not documents:
raise MeiliSearchError(f"No {document_type} files found in {directory_path}")

@staticmethod
def _combine_documents(documents: list[list[Any]]) -> list[Any]:
return [x for y in documents for x in y]

@staticmethod
def _iso_to_date_time(iso_date: datetime | str | None) -> datetime | None:
"""Handle conversion of iso string to datetime.
The microseconds from MeiliSearch are sometimes too long for python to convert so this
strips off the last digits to shorten it when that happens.
"""
if not iso_date:
return None

if isinstance(iso_date, datetime):
return iso_date
def _batch(documents: list[dict], batch_size: int) -> Generator[list[dict], None, None]:
total_len = len(documents)
for i in range(0, total_len, batch_size):
yield documents[i : i + batch_size]

try:
return datetime.strptime(iso_date, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
split = iso_date.split(".")
reduce = len(split[1]) - 6
reduced = f"{split[0]}.{split[1][:-reduce]}Z"
return datetime.strptime(reduced, "%Y-%m-%dT%H:%M:%S.%fZ")

@staticmethod
async def _load_documents_from_file(
file_path: Path | str,
) -> list[dict[Any, Any]]:
if isinstance(file_path, str):
file_path = Path(file_path)
def _raise_on_no_documents(
documents: list[Any], document_type: str, directory_path: str | Path
) -> None:
if not documents:
raise MeiliSearchError(f"No {document_type} files found in {directory_path}")

loop = get_running_loop()
await loop.run_in_executor(None, partial(Index._validate_file_type, file_path))

if file_path.suffix == ".csv":
with open(file_path, mode="r") as f:
documents = await loop.run_in_executor(None, partial(DictReader, f))
return list(documents)
def _combine_documents(documents: list[list[Any]]) -> list[Any]:
return [x for y in documents for x in y]


def _iso_to_date_time(iso_date: datetime | str | None) -> datetime | None:
"""Handle conversion of iso string to datetime.
The microseconds from MeiliSearch are sometimes too long for python to convert so this
strips off the last digits to shorten it when that happens.
"""
if not iso_date:
return None

if isinstance(iso_date, datetime):
return iso_date

try:
return datetime.strptime(iso_date, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
split = iso_date.split(".")
reduce = len(split[1]) - 6
reduced = f"{split[0]}.{split[1][:-reduce]}Z"
return datetime.strptime(reduced, "%Y-%m-%dT%H:%M:%S.%fZ")


async def _load_documents_from_file(
file_path: Path | str,
) -> list[dict[Any, Any]]:
if isinstance(file_path, str):
file_path = Path(file_path)

loop = get_running_loop()
await loop.run_in_executor(None, partial(_validate_file_type, file_path))

if file_path.suffix == ".csv":
with open(file_path, mode="r") as f:
documents = await loop.run_in_executor(None, partial(DictReader, f))
return list(documents)

if file_path.suffix == ".ndjson":
with open(file_path, mode="r") as f:
return [await loop.run_in_executor(None, partial(json.loads, x)) for x in f]

if file_path.suffix == ".ndjson":
with open(file_path, mode="r") as f:
return [await loop.run_in_executor(None, partial(json.loads, x)) for x in f]
async with aiofiles.open(file_path, mode="r") as f: # type: ignore
data = await f.read() # type: ignore
documents = await loop.run_in_executor(None, partial(json.loads, data))

async with aiofiles.open(file_path, mode="r") as f: # type: ignore
data = await f.read() # type: ignore
documents = await loop.run_in_executor(None, partial(json.loads, data))
if not isinstance(documents, list):
raise InvalidDocumentError("MeiliSearch requires documents to be in a list")

if not isinstance(documents, list):
raise InvalidDocumentError("MeiliSearch requires documents to be in a list")
return documents

return documents

@staticmethod
def _validate_file_type(file_path: Path) -> None:
if file_path.suffix not in (".json", ".csv", ".ndjson"):
raise MeiliSearchError("File must be a json, ndjson, or csv file")
def _validate_file_type(file_path: Path) -> None:
if file_path.suffix not in (".json", ".csv", ".ndjson"):
raise MeiliSearchError("File must be a json, ndjson, or csv file")
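
Aside from the concurrency change, the private helpers (_batch, _raise_on_no_documents, _combine_documents, _iso_to_date_time, _load_documents_from_file, _validate_file_type) move from staticmethods on Index to module-level functions. The _iso_to_date_time docstring above refers to timestamps whose fractional seconds exceed the six digits %f can parse; a rough illustration with a made-up nine-digit timestamp:

    from datetime import datetime

    # Hypothetical MeiliSearch-style timestamp with nine fractional digits.
    iso = "2022-10-03T12:30:45.123456789Z"
    try:
        parsed = datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        # %f accepts at most six digits, so trim the excess (the same idea as _iso_to_date_time).
        head, tail = iso.split(".")      # tail == "123456789Z"
        reduce = len(tail) - 6           # trimming also drops the trailing "Z", which is re-added
        parsed = datetime.strptime(f"{head}.{tail[:-reduce]}Z", "%Y-%m-%dT%H:%M:%S.%fZ")

    print(parsed)  # 2022-10-03 12:30:45.123456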
6 changes: 3 additions & 3 deletions tests/test_documents.py
@@ -9,7 +9,7 @@
MeiliSearchApiError,
MeiliSearchError,
)
from meilisearch_python_async.index import Index
from meilisearch_python_async.index import _combine_documents, _load_documents_from_file
from meilisearch_python_async.task import wait_for_task


@@ -914,7 +914,7 @@ async def test_load_documents_from_file_invalid_document(tmp_path):
json.dump(doc, f)

with pytest.raises(InvalidDocumentError):
await Index._load_documents_from_file(file_path)
await _load_documents_from_file(file_path)


def test_combine_documents():
@@ -923,7 +923,7 @@ def test_combine_documents():
[{"id": 3, "name": "Test 3"}],
]

combined = Index._combine_documents(docs)
combined = _combine_documents(docs)

assert len(combined) == 3
assert [1, 2, 3] == [x["id"] for x in combined]
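
For context, a usage sketch of the refactored batched add, following the docstring examples in the diff; the Client construction, URL, key, and batch size here are assumptions made for illustration:

    from meilisearch_python_async import Client


    async def load_movies(documents: list[dict]) -> None:
        # Assumed local MeiliSearch instance and master key; only client.index() and
        # add_documents_in_batches() are taken from the docstring examples above.
        async with Client("http://127.0.0.1:7700", "masterKey") as client:
            index = client.index("movies")
            # Batches are now submitted concurrently; one TaskInfo is returned per batch.
            tasks = await index.add_documents_in_batches(documents, batch_size=1000)
            print(f"submitted {len(tasks)} batches")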
