From 87b53e4ee17427aed1d0708bedee83484adc3e14 Mon Sep 17 00:00:00 2001
From: Paul Sanders
Date: Sun, 2 Oct 2022 21:32:03 -0400
Subject: [PATCH] Refactor batching for better concurrency

---
 meilisearch_python_async/index.py | 204 ++++++++++++++----------------
 tests/test_documents.py           |   6 +-
 tests/test_index.py               |   6 +-
 3 files changed, 99 insertions(+), 117 deletions(-)

diff --git a/meilisearch_python_async/index.py b/meilisearch_python_async/index.py
index 5c69c2c4..f0da17e7 100644
--- a/meilisearch_python_async/index.py
+++ b/meilisearch_python_async/index.py
@@ -6,7 +6,7 @@
 from datetime import datetime
 from functools import partial
 from pathlib import Path
-from typing import Any, AsyncGenerator
+from typing import Any, Generator
 from urllib.parse import urlencode
 
 import aiofiles
@@ -49,8 +49,8 @@ def __init__(
         """
         self.uid = uid
         self.primary_key = primary_key
-        self.created_at: datetime | None = Index._iso_to_date_time(created_at)
-        self.updated_at: datetime | None = Index._iso_to_date_time(updated_at)
+        self.created_at: datetime | None = _iso_to_date_time(created_at)
+        self.updated_at: datetime | None = _iso_to_date_time(updated_at)
         self._base_url = "indexes/"
         self._base_url_with_uid = f"{self._base_url}{self.uid}"
         self._documents_url = f"{self._base_url_with_uid}/documents"
@@ -170,10 +170,10 @@ async def fetch_info(self) -> Index:
         self.primary_key = index_dict["primaryKey"]
         loop = get_running_loop()
         self.created_at = await loop.run_in_executor(
-            None, partial(Index._iso_to_date_time, index_dict["createdAt"])
+            None, partial(_iso_to_date_time, index_dict["createdAt"])
         )
         self.updated_at = await loop.run_in_executor(
-            None, partial(Index._iso_to_date_time, index_dict["updatedAt"])
+            None, partial(_iso_to_date_time, index_dict["updatedAt"])
         )
 
         return self
@@ -492,13 +492,8 @@ async def add_documents_in_batches(
             >>> index = client.index("movies")
             >>> await index.add_documents_in_batches(documents)
         """
-        task_ids: list[TaskInfo] = []
-
-        async for document_batch in Index._batch(documents, batch_size):
-            task_id = await self.add_documents(document_batch, primary_key)
-            task_ids.append(task_id)
-
-        return task_ids
+        batches = [self.add_documents(x, primary_key) for x in _batch(documents, batch_size)]
+        return await gather(*batches)
 
     async def add_documents_from_directory(
         self,
@@ -547,15 +542,13 @@
             all_documents = []
             for path in directory.iterdir():
                 if path.suffix == f".{document_type}":
-                    documents = await Index._load_documents_from_file(path)
+                    documents = await _load_documents_from_file(path)
                     all_documents.append(documents)
 
-            Index._raise_on_no_documents(all_documents, document_type, directory_path)
+            _raise_on_no_documents(all_documents, document_type, directory_path)
 
             loop = get_running_loop()
-            combined = await loop.run_in_executor(
-                None, partial(Index._combine_documents, all_documents)
-            )
+            combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
             response = await self.add_documents(combined, primary_key)
 
             return [response]
@@ -563,10 +556,10 @@ async def add_documents_from_directory(
         add_documents = []
         for path in directory.iterdir():
             if path.suffix == f".{document_type}":
-                documents = await Index._load_documents_from_file(path)
+                documents = await _load_documents_from_file(path)
                 add_documents.append(self.add_documents(documents, primary_key))
 
-        Index._raise_on_no_documents(add_documents, document_type, directory_path)
+        _raise_on_no_documents(add_documents, document_type, directory_path)
 
         if len(add_documents) > 1:
             # Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -629,15 +622,13 @@ async def add_documents_from_directory_in_batches(
             all_documents = []
             for path in directory.iterdir():
                 if path.suffix == f".{document_type}":
-                    documents = await Index._load_documents_from_file(path)
+                    documents = await _load_documents_from_file(path)
                     all_documents.append(documents)
 
-            Index._raise_on_no_documents(all_documents, document_type, directory_path)
+            _raise_on_no_documents(all_documents, document_type, directory_path)
 
             loop = get_running_loop()
-            combined = await loop.run_in_executor(
-                None, partial(Index._combine_documents, all_documents)
-            )
+            combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
 
             return await self.add_documents_in_batches(
                 combined, batch_size=batch_size, primary_key=primary_key
@@ -648,14 +639,14 @@ async def add_documents_from_directory_in_batches(
         add_documents = []
         for path in directory.iterdir():
             if path.suffix == f".{document_type}":
-                documents = await Index._load_documents_from_file(path)
+                documents = await _load_documents_from_file(path)
                 add_documents.append(
                     self.add_documents_in_batches(
                         documents, batch_size=batch_size, primary_key=primary_key
                     )
                 )
 
-        Index._raise_on_no_documents(add_documents, document_type, directory_path)
+        _raise_on_no_documents(add_documents, document_type, directory_path)
 
         if len(add_documents) > 1:
             # Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -699,7 +690,7 @@ async def add_documents_from_file(
             >>> index = client.index("movies")
             >>> await index.add_documents_from_file(file_path)
         """
-        documents = await Index._load_documents_from_file(file_path)
+        documents = await _load_documents_from_file(file_path)
 
         return await self.add_documents(documents, primary_key=primary_key)
 
@@ -736,7 +727,7 @@ async def add_documents_from_file_in_batches(
             >>> index = client.index("movies")
             >>> await index.add_documents_from_file_in_batches(file_path)
         """
-        documents = await Index._load_documents_from_file(file_path)
+        documents = await _load_documents_from_file(file_path)
 
         return await self.add_documents_in_batches(
             documents, batch_size=batch_size, primary_key=primary_key
@@ -871,13 +862,8 @@ async def update_documents_in_batches(
             >>> index = client.index("movies")
             >>> await index.update_documents_in_batches(documents)
         """
-        task_ids: list[TaskInfo] = []
-
-        async for document_batch in Index._batch(documents, batch_size):
-            task_id = await self.update_documents(document_batch, primary_key)
-            task_ids.append(task_id)
-
-        return task_ids
+        batches = [self.update_documents(x, primary_key) for x in _batch(documents, batch_size)]
+        return await gather(*batches)
 
     async def update_documents_from_directory(
         self,
@@ -926,15 +912,13 @@
             all_documents = []
             for path in directory.iterdir():
                 if path.suffix == f".{document_type}":
-                    documents = await Index._load_documents_from_file(path)
+                    documents = await _load_documents_from_file(path)
                     all_documents.append(documents)
 
-            Index._raise_on_no_documents(all_documents, document_type, directory_path)
+            _raise_on_no_documents(all_documents, document_type, directory_path)
 
             loop = get_running_loop()
-            combined = await loop.run_in_executor(
-                None, partial(Index._combine_documents, all_documents)
-            )
+            combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
             response = await self.update_documents(combined, primary_key)
 
             return [response]
@@ -942,10 +926,10 @@ async def update_documents_from_directory(
         update_documents = []
         for path in directory.iterdir():
             if path.suffix == f".{document_type}":
-                documents = await Index._load_documents_from_file(path)
+                documents = await _load_documents_from_file(path)
                 update_documents.append(self.update_documents(documents, primary_key))
 
-        Index._raise_on_no_documents(update_documents, document_type, directory_path)
+        _raise_on_no_documents(update_documents, document_type, directory_path)
 
         if len(update_documents) > 1:
             # Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -1008,15 +992,13 @@ async def update_documents_from_directory_in_batches(
             all_documents = []
             for path in directory.iterdir():
                 if path.suffix == f".{document_type}":
-                    documents = await Index._load_documents_from_file(path)
+                    documents = await _load_documents_from_file(path)
                     all_documents.append(documents)
 
-            Index._raise_on_no_documents(all_documents, document_type, directory_path)
+            _raise_on_no_documents(all_documents, document_type, directory_path)
 
             loop = get_running_loop()
-            combined = await loop.run_in_executor(
-                None, partial(Index._combine_documents, all_documents)
-            )
+            combined = await loop.run_in_executor(None, partial(_combine_documents, all_documents))
 
             return await self.update_documents_in_batches(
                 combined, batch_size=batch_size, primary_key=primary_key
@@ -1027,14 +1009,14 @@ async def update_documents_from_directory_in_batches(
         update_documents = []
         for path in directory.iterdir():
             if path.suffix == f".{document_type}":
-                documents = await Index._load_documents_from_file(path)
+                documents = await _load_documents_from_file(path)
                 update_documents.append(
                     self.update_documents_in_batches(
                         documents, batch_size=batch_size, primary_key=primary_key
                     )
                 )
 
-        Index._raise_on_no_documents(update_documents, document_type, directory_path)
+        _raise_on_no_documents(update_documents, document_type, directory_path)
 
         if len(update_documents) > 1:
             # Send the first document on its own before starting the gather. Otherwise MeiliSearch
@@ -1076,7 +1058,7 @@ async def update_documents_from_file(
             >>> index = client.index("movies")
             >>> await index.update_documents_from_file(file_path)
         """
-        documents = await Index._load_documents_from_file(file_path)
+        documents = await _load_documents_from_file(file_path)
 
         return await self.update_documents(documents, primary_key=primary_key)
 
@@ -1111,7 +1093,7 @@ async def update_documents_from_file_in_batches(
             >>> index = client.index("movies")
             >>> await index.update_documents_from_file_in_batches(file_path)
         """
-        documents = await Index._load_documents_from_file(file_path)
+        documents = await _load_documents_from_file(file_path)
 
         return await self.update_documents_in_batches(
             documents, batch_size=batch_size, primary_key=primary_key
@@ -2124,73 +2106,73 @@ async def reset_faceting(self) -> TaskInfo:
         return TaskInfo(**response.json())
 
-    @staticmethod
-    async def _batch(documents: list[dict], batch_size: int) -> AsyncGenerator[list[dict], None]:
-        total_len = len(documents)
-        for i in range(0, total_len, batch_size):
-            yield documents[i : i + batch_size]
-
-    @staticmethod
-    def _raise_on_no_documents(
-        documents: list[Any], document_type: str, directory_path: str | Path
-    ) -> None:
-        if not documents:
-            raise MeiliSearchError(f"No {document_type} files found in {directory_path}")
-
-    @staticmethod
-    def _combine_documents(documents: list[list[Any]]) -> list[Any]:
-        return [x for y in documents for x in y]
-
-    @staticmethod
-    def _iso_to_date_time(iso_date: datetime | str | None) -> datetime | None:
-        """Handle conversion of iso string to datetime.
-
-        The microseconds from MeiliSearch are sometimes too long for python to convert so this
-        strips off the last digits to shorten it when that happens.
-        """
-        if not iso_date:
-            return None
-        if isinstance(iso_date, datetime):
-            return iso_date
 
+def _batch(documents: list[dict], batch_size: int) -> Generator[list[dict], None, None]:
+    total_len = len(documents)
+    for i in range(0, total_len, batch_size):
+        yield documents[i : i + batch_size]
 
-        try:
-            return datetime.strptime(iso_date, "%Y-%m-%dT%H:%M:%S.%fZ")
-        except ValueError:
-            split = iso_date.split(".")
-            reduce = len(split[1]) - 6
-            reduced = f"{split[0]}.{split[1][:-reduce]}Z"
-            return datetime.strptime(reduced, "%Y-%m-%dT%H:%M:%S.%fZ")
-    @staticmethod
-    async def _load_documents_from_file(
-        file_path: Path | str,
-    ) -> list[dict[Any, Any]]:
-        if isinstance(file_path, str):
-            file_path = Path(file_path)
 
+def _raise_on_no_documents(
+    documents: list[Any], document_type: str, directory_path: str | Path
+) -> None:
+    if not documents:
+        raise MeiliSearchError(f"No {document_type} files found in {directory_path}")
 
-        loop = get_running_loop()
-        await loop.run_in_executor(None, partial(Index._validate_file_type, file_path))
-        if file_path.suffix == ".csv":
-            with open(file_path, mode="r") as f:
-                documents = await loop.run_in_executor(None, partial(DictReader, f))
-                return list(documents)
 
+def _combine_documents(documents: list[list[Any]]) -> list[Any]:
+    return [x for y in documents for x in y]
+
+
+def _iso_to_date_time(iso_date: datetime | str | None) -> datetime | None:
+    """Handle conversion of iso string to datetime.
+
+    The microseconds from MeiliSearch are sometimes too long for python to convert so this
+    strips off the last digits to shorten it when that happens.
+ """ + if not iso_date: + return None + + if isinstance(iso_date, datetime): + return iso_date + + try: + return datetime.strptime(iso_date, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + split = iso_date.split(".") + reduce = len(split[1]) - 6 + reduced = f"{split[0]}.{split[1][:-reduce]}Z" + return datetime.strptime(reduced, "%Y-%m-%dT%H:%M:%S.%fZ") + + +async def _load_documents_from_file( + file_path: Path | str, +) -> list[dict[Any, Any]]: + if isinstance(file_path, str): + file_path = Path(file_path) + + loop = get_running_loop() + await loop.run_in_executor(None, partial(_validate_file_type, file_path)) + + if file_path.suffix == ".csv": + with open(file_path, mode="r") as f: + documents = await loop.run_in_executor(None, partial(DictReader, f)) + return list(documents) + + if file_path.suffix == ".ndjson": + with open(file_path, mode="r") as f: + return [await loop.run_in_executor(None, partial(json.loads, x)) for x in f] - if file_path.suffix == ".ndjson": - with open(file_path, mode="r") as f: - return [await loop.run_in_executor(None, partial(json.loads, x)) for x in f] + async with aiofiles.open(file_path, mode="r") as f: # type: ignore + data = await f.read() # type: ignore + documents = await loop.run_in_executor(None, partial(json.loads, data)) - async with aiofiles.open(file_path, mode="r") as f: # type: ignore - data = await f.read() # type: ignore - documents = await loop.run_in_executor(None, partial(json.loads, data)) + if not isinstance(documents, list): + raise InvalidDocumentError("MeiliSearch requires documents to be in a list") - if not isinstance(documents, list): - raise InvalidDocumentError("MeiliSearch requires documents to be in a list") + return documents - return documents - @staticmethod - def _validate_file_type(file_path: Path) -> None: - if file_path.suffix not in (".json", ".csv", ".ndjson"): - raise MeiliSearchError("File must be a json, ndjson, or csv file") +def _validate_file_type(file_path: Path) -> None: + if file_path.suffix not in (".json", ".csv", ".ndjson"): + raise MeiliSearchError("File must be a json, ndjson, or csv file") diff --git a/tests/test_documents.py b/tests/test_documents.py index 14c1e357..334890e6 100644 --- a/tests/test_documents.py +++ b/tests/test_documents.py @@ -9,7 +9,7 @@ MeiliSearchApiError, MeiliSearchError, ) -from meilisearch_python_async.index import Index +from meilisearch_python_async.index import _combine_documents, _load_documents_from_file from meilisearch_python_async.task import wait_for_task @@ -914,7 +914,7 @@ async def test_load_documents_from_file_invalid_document(tmp_path): json.dump(doc, f) with pytest.raises(InvalidDocumentError): - await Index._load_documents_from_file(file_path) + await _load_documents_from_file(file_path) def test_combine_documents(): @@ -923,7 +923,7 @@ def test_combine_documents(): [{"id": 3, "name": "Test 3"}], ] - combined = Index._combine_documents(docs) + combined = _combine_documents(docs) assert len(combined) == 3 assert [1, 2, 3] == [x["id"] for x in combined] diff --git a/tests/test_index.py b/tests/test_index.py index 2e723a27..938bc03d 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -5,7 +5,7 @@ from meilisearch_python_async._http_requests import HttpRequests from meilisearch_python_async.errors import MeiliSearchApiError -from meilisearch_python_async.index import Index +from meilisearch_python_async.index import Index, _iso_to_date_time from meilisearch_python_async.models.settings import ( Faceting, MeiliSearchSettings, @@ -473,8 +473,8 @@ async def 
test_reset_faceting(empty_index, default_faceting): ], ) def test_iso_to_date_time(iso_date, expected, test_client): - index = Index(test_client, "test") - converted = index._iso_to_date_time(iso_date) + Index(test_client, "test") + converted = _iso_to_date_time(iso_date) assert converted == expected
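
Note on the batching change above: the old add_documents_in_batches and update_documents_in_batches awaited each batch one at a time inside an async for loop, while the refactored versions build every add_documents/update_documents coroutine up front and run them together with asyncio.gather, which is the "better concurrency" the subject line refers to. A minimal, self-contained sketch of the difference follows; it is not part of the patch, and fake_add_documents is a hypothetical stand-in for one HTTP request:

    import asyncio
    from typing import Generator


    def _batch(documents: list[dict], batch_size: int) -> Generator[list[dict], None, None]:
        # Same slicing the patch's module-level _batch generator performs.
        for i in range(0, len(documents), batch_size):
            yield documents[i : i + batch_size]


    async def fake_add_documents(batch: list[dict]) -> int:
        # Hypothetical stand-in for Index.add_documents: pretend each request takes 0.1s.
        await asyncio.sleep(0.1)
        return len(batch)


    async def main() -> None:
        documents = [{"id": i} for i in range(100)]

        # Old behaviour: one batch request at a time (~1.0s for ten batches here).
        sequential = [await fake_add_documents(b) for b in _batch(documents, 10)]

        # New behaviour: all batch requests run concurrently (~0.1s here).
        concurrent = await asyncio.gather(*[fake_add_documents(b) for b in _batch(documents, 10)])

        assert sequential == list(concurrent)


    asyncio.run(main())

Because gather preserves the order of the awaitables it is given, the returned list of TaskInfo results keeps the same batch order the old sequential loop produced.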
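Note on _iso_to_date_time, which this patch moves to module level with its logic unchanged: MeiliSearch timestamps can carry more fractional-second digits than strptime's %f directive accepts (at most six), so the helper trims the fraction and retries. A small worked example of that fallback, using a hypothetical nine-digit timestamp:

    from datetime import datetime

    iso_date = "2022-10-02T21:32:03.123456789Z"  # hypothetical value with nine fractional digits

    try:
        parsed = datetime.strptime(iso_date, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        # Same trimming the helper does: split[1] is "123456789Z", so reduce == 4
        # and the slice drops "789Z", leaving exactly six digits for %f.
        split = iso_date.split(".")
        reduce = len(split[1]) - 6
        reduced = f"{split[0]}.{split[1][:-reduce]}Z"
        parsed = datetime.strptime(reduced, "%Y-%m-%dT%H:%M:%S.%fZ")

    print(parsed)  # 2022-10-02 21:32:03.123456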
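Note on the "# Send the first document on its own before starting the gather" comments that appear as unchanged context in several hunks: the visible part of the comment describes awaiting the first request by itself and only gathering the remainder; the rest of the comment lies outside the three-line hunk context. A hypothetical sketch of that ordering, with fake_request standing in for one add/update call:

    import asyncio


    async def fake_request(name: str) -> str:
        # Hypothetical stand-in for one add_documents/update_documents call.
        await asyncio.sleep(0.05)
        return name


    async def main() -> None:
        requests = [fake_request(f"batch-{i}") for i in range(4)]

        # Await the first request alone, then run the remaining ones concurrently.
        results = [await requests[0]]
        results.extend(await asyncio.gather(*requests[1:]))

        print(results)  # ['batch-0', 'batch-1', 'batch-2', 'batch-3']


    asyncio.run(main())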