From b1d9e477ffc58bfd8693aa892ba61d355aacfd00 Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Mon, 14 Nov 2022 16:16:21 +0100 Subject: [PATCH 01/11] add limit --- src/neptune/new/internal/disk_queue.py | 33 +++++++++++++++++++------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index 69763d8f9..d1d0eb384 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -56,6 +56,9 @@ def __init__( self._to_dict = to_dict self._from_dict = from_dict self._max_file_size = max_file_size + self._max_batch_size = os.environ.get("MAX_BATCH_SIZE") + if not self._max_batch_size: + self._max_batch_size = 100 * 1024**2 try: os.makedirs(self._dir_path) @@ -115,31 +118,45 @@ def _skip_and_get(self) -> Tuple[Optional[T], int]: return obj, ver def _get(self) -> Tuple[Optional[T], int]: + serialized_obj = self._get_serialized() + if not serialized_obj: + return None, -1 + try: + return self._deserialize(serialized_obj) + except Exception as e: + raise MalformedOperation from e + + def _get_serialized(self) -> Optional[dict]: _json = self._reader.get() if not _json: if self._read_file_version >= self._write_file_version: - return None, -1 + return None self._reader.close() self._read_file_version = self._next_log_file_version(self._read_file_version) self._reader = JsonFileSplitter(self._get_log_file(self._read_file_version)) # It is safe. Max recursion level is 2. - return self._get() - try: - return self._deserialize(_json) - except Exception as e: - raise MalformedOperation from e + return self._get_serialized() + return _json def get_batch(self, size: int) -> Tuple[List[T], int]: first, ver = self.get() if not first: return [], ver + ret = [first] + cur_batch_size = 0 for _ in range(0, size - 1): - obj, next_ver = self._get() - if not obj: + serialized_obj = self._get_serialized() + if not serialized_obj: break + cur_batch_size += len(serialized_obj) + obj, next_ver = self._deserialize(serialized_obj) + ver = next_ver ret.append(obj) + + if cur_batch_size >= self._max_batch_size: + break return ret, ver def flush(self): From bf0f40407a6eb7a2030d2a6fb99775854895a9b8 Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Mon, 14 Nov 2022 16:37:05 +0100 Subject: [PATCH 02/11] add test --- src/neptune/new/internal/disk_queue.py | 4 ++- tests/neptune/new/internal/test_disk_queue.py | 26 ++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index d1d0eb384..3c6061392 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -57,7 +57,9 @@ def __init__( self._from_dict = from_dict self._max_file_size = max_file_size self._max_batch_size = os.environ.get("MAX_BATCH_SIZE") - if not self._max_batch_size: + if self._max_batch_size is not None: + self._max_batch_size = int(self._max_batch_size) + else: self._max_batch_size = 100 * 1024**2 try: diff --git a/tests/neptune/new/internal/test_disk_queue.py b/tests/neptune/new/internal/test_disk_queue.py index 4115f0fef..1571ca9e7 100644 --- a/tests/neptune/new/internal/test_disk_queue.py +++ b/tests/neptune/new/internal/test_disk_queue.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
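The get_batch() loop introduced in the first patch caps a batch by the accumulated serialized size of its elements rather than by element count alone. A standalone sketch of that byte-budget idea follows; pop_serialized() is an illustrative stand-in for the queue internals, not part of the patch:

def take_batch(pop_serialized, max_items, max_bytes=100 * 1024**2):
    # pop_serialized() -> (obj, size_in_bytes), or (None, 0) once the queue is drained.
    batch, used_bytes = [], 0
    for _ in range(max_items):
        if batch and used_bytes >= max_bytes:
            # Budget spent: stop, but only after at least one element was taken.
            break
        obj, size = pop_serialized()
        if obj is None:
            break
        batch.append(obj)
        used_bytes += size
    return batch

With the 100 MiB default this mirrors the behaviour the later patches settle on: a batch always contains at least one element, and growth stops once the accumulated serialized size reaches the limit.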
# - +import os import random import threading import unittest @@ -98,6 +98,30 @@ def test_get_batch(self): ) queue.close() + def test_batch_limit(self): + os.environ["MAX_BATCH_SIZE"] = "3" + with TemporaryDirectory() as dirpath: + queue = DiskQueue[TestDiskQueue.Obj]( + Path(dirpath), + self._serializer, + self._deserializer, + threading.RLock(), + max_file_size=100, + ) + for i in range(5): + obj = TestDiskQueue.Obj(i, str(i)) + queue.put(obj) + queue.flush() + + self.assertEqual( + queue.get_batch(5), + ([TestDiskQueue.Obj(i, str(i)) for i in range(3)], 3), + ) + self.assertEqual( + queue.get_batch(2), + ([TestDiskQueue.Obj(i, str(i)) for i in range(3, 5)], 5), + ) + def test_resuming_queue(self): with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( From e9402fbd9a9c53bade254b86759e42c7c94a822f Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Mon, 14 Nov 2022 16:58:29 +0100 Subject: [PATCH 03/11] setting limit --- src/neptune/new/internal/disk_queue.py | 9 ++++----- tests/neptune/new/internal/test_disk_queue.py | 3 +-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index 3c6061392..f52d35084 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -51,16 +51,15 @@ def __init__( from_dict: Callable[[dict], T], lock: threading.RLock, max_file_size: int = 64 * 1024**2, + max_batch_size: int = None, ): self._dir_path = dir_path.resolve() self._to_dict = to_dict self._from_dict = from_dict self._max_file_size = max_file_size - self._max_batch_size = os.environ.get("MAX_BATCH_SIZE") - if self._max_batch_size is not None: - self._max_batch_size = int(self._max_batch_size) - else: - self._max_batch_size = 100 * 1024**2 + self._max_batch_size = max_batch_size + if max_batch_size is None: + self._max_batch_size = int(os.environ.get("MAX_BATCH_SIZE") or str(100 * 1024**2)) try: os.makedirs(self._dir_path) diff --git a/tests/neptune/new/internal/test_disk_queue.py b/tests/neptune/new/internal/test_disk_queue.py index 1571ca9e7..3d0d94042 100644 --- a/tests/neptune/new/internal/test_disk_queue.py +++ b/tests/neptune/new/internal/test_disk_queue.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import os import random import threading import unittest @@ -99,7 +98,6 @@ def test_get_batch(self): queue.close() def test_batch_limit(self): - os.environ["MAX_BATCH_SIZE"] = "3" with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( Path(dirpath), @@ -107,6 +105,7 @@ def test_batch_limit(self): self._deserializer, threading.RLock(), max_file_size=100, + max_batch_size=3, ) for i in range(5): obj = TestDiskQueue.Obj(i, str(i)) From 31f4450ba0bb8318af0f2e7f7f668143f2da6d28 Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Mon, 14 Nov 2022 17:58:35 +0100 Subject: [PATCH 04/11] first element fix --- src/neptune/new/internal/disk_queue.py | 19 ++++++++++--------- tests/neptune/new/internal/test_disk_queue.py | 2 +- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index f52d35084..55bf95fed 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -51,15 +51,15 @@ def __init__( from_dict: Callable[[dict], T], lock: threading.RLock, max_file_size: int = 64 * 1024**2, - max_batch_size: int = None, + max_batch_size_bytes: int = None, ): self._dir_path = dir_path.resolve() self._to_dict = to_dict self._from_dict = from_dict self._max_file_size = max_file_size - self._max_batch_size = max_batch_size - if max_batch_size is None: - self._max_batch_size = int(os.environ.get("MAX_BATCH_SIZE") or str(100 * 1024**2)) + self._max_batch_size_bytes = max_batch_size_bytes + if max_batch_size_bytes is None: + self._max_batch_size_bytes = int(os.environ.get("NEPTUNE_MAX_BATCH_SIZE_BYTES") or str(100 * 1024**2)) try: os.makedirs(self._dir_path) @@ -140,12 +140,13 @@ def _get_serialized(self) -> Optional[dict]: return _json def get_batch(self, size: int) -> Tuple[List[T], int]: - first, ver = self.get() - if not first: - return [], ver + serialized_first = self._get_serialized() + if not serialized_first: + return [], -1 + cur_batch_size = len(serialized_first) + first, ver = self._deserialize(serialized_first) ret = [first] - cur_batch_size = 0 for _ in range(0, size - 1): serialized_obj = self._get_serialized() if not serialized_obj: @@ -156,7 +157,7 @@ def get_batch(self, size: int) -> Tuple[List[T], int]: ver = next_ver ret.append(obj) - if cur_batch_size >= self._max_batch_size: + if cur_batch_size >= self._max_batch_size_bytes: break return ret, ver diff --git a/tests/neptune/new/internal/test_disk_queue.py b/tests/neptune/new/internal/test_disk_queue.py index 3d0d94042..25bfc8c42 100644 --- a/tests/neptune/new/internal/test_disk_queue.py +++ b/tests/neptune/new/internal/test_disk_queue.py @@ -105,7 +105,7 @@ def test_batch_limit(self): self._deserializer, threading.RLock(), max_file_size=100, - max_batch_size=3, + max_batch_size_bytes=6, ) for i in range(5): obj = TestDiskQueue.Obj(i, str(i)) From 2f1f9e5c30d5f5f209cc4e7e853a704af86a2fda Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Tue, 15 Nov 2022 16:38:08 +0100 Subject: [PATCH 05/11] issue fixes --- src/neptune/new/internal/disk_queue.py | 50 +++++++++++++------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index 55bf95fed..ab1b11de6 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -43,6 +43,7 @@ class DiskQueue(Generic[T]): # NOTICE: This class is thread-safe as long as there is only one consumer and one producer. 
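By this point the limit is resolved with a simple precedence: an explicit max_batch_size_bytes argument wins, otherwise the NEPTUNE_MAX_BATCH_SIZE_BYTES environment variable, otherwise the 100 MiB default. A minimal sketch of that resolution, pulled out into a hypothetical helper (the constant and variable names are the ones used in the patches):

import os

DEFAULT_MAX_BATCH_SIZE_BYTES = 100 * 1024**2

def resolve_max_batch_size_bytes(max_batch_size_bytes=None):
    # Explicit argument first, then the environment variable, then the default.
    return max_batch_size_bytes or int(
        os.environ.get("NEPTUNE_MAX_BATCH_SIZE_BYTES") or str(DEFAULT_MAX_BATCH_SIZE_BYTES)
    )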
+ DEFAULT_MAX_BATCH_SIZE_BYTES = 100 * 1024**2 def __init__( self, @@ -57,9 +58,9 @@ def __init__( self._to_dict = to_dict self._from_dict = from_dict self._max_file_size = max_file_size - self._max_batch_size_bytes = max_batch_size_bytes - if max_batch_size_bytes is None: - self._max_batch_size_bytes = int(os.environ.get("NEPTUNE_MAX_BATCH_SIZE_BYTES") or str(100 * 1024**2)) + self._max_batch_size_bytes = max_batch_size_bytes or int( + os.environ.get("NEPTUNE_MAX_BATCH_SIZE_BYTES") or str(self.DEFAULT_MAX_BATCH_SIZE_BYTES) + ) try: os.makedirs(self._dir_path) @@ -96,18 +97,24 @@ def put(self, obj: T) -> int: def get(self) -> Tuple[Optional[T], int]: if self._should_skip_to_ack: - return self._skip_and_get() + serialized_obj = self._skip_and_get_serialized() else: - return self._get() + serialized_obj = self._get_serialized() + + if not serialized_obj: + return None, -1 + try: + return self._deserialize(serialized_obj) + except Exception as e: + raise MalformedOperation from e - def _skip_and_get(self) -> Tuple[Optional[T], int]: + def _skip_and_get_serialized(self) -> Optional[dict]: ack_version = self._last_ack_file.read_local() - ver = -1 while True: - obj, next_ver = self._get() - if obj is None: - return None, ver - ver = next_ver + serialized = self._get_serialized() + if serialized is None: + return None + ver = serialized["version"] if ver > ack_version: self._should_skip_to_ack = False if ver > ack_version + 1: @@ -116,16 +123,7 @@ def _skip_and_get(self) -> Tuple[Optional[T], int]: ack_version, ver, ) - return obj, ver - - def _get(self) -> Tuple[Optional[T], int]: - serialized_obj = self._get_serialized() - if not serialized_obj: - return None, -1 - try: - return self._deserialize(serialized_obj) - except Exception as e: - raise MalformedOperation from e + return serialized def _get_serialized(self) -> Optional[dict]: _json = self._reader.get() @@ -140,7 +138,10 @@ def _get_serialized(self) -> Optional[dict]: return _json def get_batch(self, size: int) -> Tuple[List[T], int]: - serialized_first = self._get_serialized() + if self._should_skip_to_ack: + serialized_first = self._skip_and_get_serialized() + else: + serialized_first = self._get_serialized() if not serialized_first: return [], -1 @@ -148,6 +149,8 @@ def get_batch(self, size: int) -> Tuple[List[T], int]: first, ver = self._deserialize(serialized_first) ret = [first] for _ in range(0, size - 1): + if cur_batch_size >= self._max_batch_size_bytes: + break serialized_obj = self._get_serialized() if not serialized_obj: break @@ -156,9 +159,6 @@ def get_batch(self, size: int) -> Tuple[List[T], int]: ver = next_ver ret.append(obj) - - if cur_batch_size >= self._max_batch_size_bytes: - break return ret, ver def flush(self): From d82817c1689dd69aee3b67de87479c31cc1c629a Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Wed, 16 Nov 2022 22:02:27 +0100 Subject: [PATCH 06/11] refactor json_file_splitter to get size --- src/neptune/new/internal/disk_queue.py | 29 +++++++------ .../new/internal/utils/json_file_splitter.py | 16 +++++-- tests/neptune/new/internal/test_disk_queue.py | 3 +- .../internal/utils/test_json_file_splitter.py | 42 +++++++++++++++++++ 4 files changed, 70 insertions(+), 20 deletions(-) diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index ab1b11de6..31e4c4213 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -97,9 +97,9 @@ def put(self, obj: T) -> int: def get(self) -> Tuple[Optional[T], int]: if 
self._should_skip_to_ack: - serialized_obj = self._skip_and_get_serialized() + serialized_obj, _ = self._skip_and_get_serialized() else: - serialized_obj = self._get_serialized() + serialized_obj, _ = self._get_serialized() if not serialized_obj: return None, -1 @@ -108,12 +108,12 @@ def get(self) -> Tuple[Optional[T], int]: except Exception as e: raise MalformedOperation from e - def _skip_and_get_serialized(self) -> Optional[dict]: + def _skip_and_get_serialized(self) -> Tuple[Optional[dict], int]: ack_version = self._last_ack_file.read_local() while True: - serialized = self._get_serialized() + serialized, size = self._get_serialized() if serialized is None: - return None + return None, 0 ver = serialized["version"] if ver > ack_version: self._should_skip_to_ack = False @@ -123,38 +123,37 @@ def _skip_and_get_serialized(self) -> Optional[dict]: ack_version, ver, ) - return serialized + return serialized, size - def _get_serialized(self) -> Optional[dict]: - _json = self._reader.get() + def _get_serialized(self) -> Tuple[Optional[dict], int]: + _json, size = self._reader.get_with_size() if not _json: if self._read_file_version >= self._write_file_version: - return None + return None, 0 self._reader.close() self._read_file_version = self._next_log_file_version(self._read_file_version) self._reader = JsonFileSplitter(self._get_log_file(self._read_file_version)) # It is safe. Max recursion level is 2. return self._get_serialized() - return _json + return _json, size def get_batch(self, size: int) -> Tuple[List[T], int]: if self._should_skip_to_ack: - serialized_first = self._skip_and_get_serialized() + serialized_first, cur_batch_size = self._skip_and_get_serialized() else: - serialized_first = self._get_serialized() + serialized_first, cur_batch_size = self._get_serialized() if not serialized_first: return [], -1 - cur_batch_size = len(serialized_first) first, ver = self._deserialize(serialized_first) ret = [first] for _ in range(0, size - 1): if cur_batch_size >= self._max_batch_size_bytes: break - serialized_obj = self._get_serialized() + serialized_obj, obj_size = self._get_serialized() if not serialized_obj: break - cur_batch_size += len(serialized_obj) + cur_batch_size += obj_size obj, next_ver = self._deserialize(serialized_obj) ver = next_ver diff --git a/src/neptune/new/internal/utils/json_file_splitter.py b/src/neptune/new/internal/utils/json_file_splitter.py index f764c88ff..7e2390fa4 100644 --- a/src/neptune/new/internal/utils/json_file_splitter.py +++ b/src/neptune/new/internal/utils/json_file_splitter.py @@ -17,11 +17,13 @@ from collections import deque from io import StringIO from json import JSONDecodeError -from typing import Optional +from typing import ( + Optional, + Tuple, +) class JsonFileSplitter: - BUFFER_SIZE = 64 * 1024 MAX_PART_READ = 8 * 1024 @@ -37,11 +39,15 @@ def close(self) -> None: self._part_buffer.close() def get(self) -> Optional[dict]: + return (self.get_with_size() or (None, None))[0] + + def get_with_size(self) -> Tuple[Optional[dict], int]: if self._parsed_queue: return self._parsed_queue.popleft() self._read_data() if self._parsed_queue: return self._parsed_queue.popleft() + return None, 0 def _read_data(self): if self._part_buffer.tell() < self.MAX_PART_READ: @@ -64,12 +70,14 @@ def _decode(self, data: str): start = self._json_start(data) while start is not None: try: - json_data, start = self._decoder.raw_decode(data, start) + json_data, new_start = self._decoder.raw_decode(data, start) + size = new_start - start + start = new_start except 
JSONDecodeError: self._part_buffer.write(data[start:]) break else: - self._parsed_queue.append(json_data) + self._parsed_queue.append((json_data, size)) start = self._json_start(data, start) @staticmethod diff --git a/tests/neptune/new/internal/test_disk_queue.py b/tests/neptune/new/internal/test_disk_queue.py index 25bfc8c42..88293f749 100644 --- a/tests/neptune/new/internal/test_disk_queue.py +++ b/tests/neptune/new/internal/test_disk_queue.py @@ -98,6 +98,7 @@ def test_get_batch(self): queue.close() def test_batch_limit(self): + obj_size = len("{'obj': {'num': 0, 'txt': '0'}, 'version': 1}") with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( Path(dirpath), @@ -105,7 +106,7 @@ def test_batch_limit(self): self._deserializer, threading.RLock(), max_file_size=100, - max_batch_size_bytes=6, + max_batch_size_bytes=obj_size * 3, ) for i in range(5): obj = TestDiskQueue.Obj(i, str(i)) diff --git a/tests/neptune/new/internal/utils/test_json_file_splitter.py b/tests/neptune/new/internal/utils/test_json_file_splitter.py index 97eccc1ab..a42ed73f2 100644 --- a/tests/neptune/new/internal/utils/test_json_file_splitter.py +++ b/tests/neptune/new/internal/utils/test_json_file_splitter.py @@ -133,3 +133,45 @@ def test_big_json(self): self.assertEqual(splitter.get(), {}) self.assertEqual(splitter.get(), None) splitter.close() + + def test_data_size(self): + object1 = """{ + "a": 5, + "b": "text" + }""" + object2 = """{ + "a": 155, + "r": "something" + }""" + object3 = """{ + "a": { + "b": [1, 2, 3] + } + }""" + content1 = """ + { + "a": 5, + "b": "text" + } + { + "a": 1""" + + content2 = """55, + "r": "something" + } + { + "a": { + "b": [1, 2, 3] + } + }""" + + with create_file(content1) as filename, open(filename, "a") as fp: + splitter = JsonFileSplitter(filename) + self.assertEqual(splitter.get_with_size(), ({"a": 5, "b": "text"}, len(object1))) + self.assertIsNone(splitter.get_with_size()[0]) + fp.write(content2) + fp.flush() + self.assertEqual(splitter.get_with_size(), ({"a": 155, "r": "something"}, len(object2))) + self.assertEqual(splitter.get_with_size(), ({"a": {"b": [1, 2, 3]}}, len(object3))) + self.assertIsNone(splitter.get_with_size()[0]) + splitter.close() From 7b121178f0e1f37599b79e312ed307be295d5b40 Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Thu, 17 Nov 2022 12:40:22 +0100 Subject: [PATCH 07/11] remove _get_serialized --- src/neptune/new/internal/disk_queue.py | 72 ++++++++++++++------------ 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index 31e4c4213..5c7dfc537 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -18,6 +18,7 @@ import os import shutil import threading +from dataclasses import dataclass from glob import glob from pathlib import Path from typing import ( @@ -40,8 +41,14 @@ _logger = logging.getLogger(__name__) -class DiskQueue(Generic[T]): +@dataclass +class QueueElement(Generic[T]): + obj: T + ver: int + size: int + +class DiskQueue(Generic[T]): # NOTICE: This class is thread-safe as long as there is only one consumer and one producer. 
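Each dequeued element now carries the byte size reported by JsonFileSplitter.get_with_size(), which leans on json.JSONDecoder.raw_decode(): it returns both the parsed object and the index just past it, so the serialized length falls out as end - start. A self-contained sketch of that idea; it assumes the buffer already holds complete objects, whereas the real splitter also buffers partial reads across file chunks:

import json

def split_with_sizes(buffer: str):
    # Yields (parsed_object, serialized_length) pairs from a string of
    # concatenated JSON objects.
    decoder = json.JSONDecoder()
    pos = 0
    while True:
        start = buffer.find("{", pos)
        if start == -1:
            return
        obj, end = decoder.raw_decode(buffer, start)
        yield obj, end - start
        pos = end

# list(split_with_sizes('{"a": 5} {"b": [1, 2]}'))
# -> [({'a': 5}, 8), ({'b': [1, 2]}, 13)]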
DEFAULT_MAX_BATCH_SIZE_BYTES = 100 * 1024**2 @@ -97,67 +104,66 @@ def put(self, obj: T) -> int: def get(self) -> Tuple[Optional[T], int]: if self._should_skip_to_ack: - serialized_obj, _ = self._skip_and_get_serialized() + top_element = self._skip_and_get() else: - serialized_obj, _ = self._get_serialized() - - if not serialized_obj: + top_element = self._get() + if top_element is None: return None, -1 - try: - return self._deserialize(serialized_obj) - except Exception as e: - raise MalformedOperation from e + return top_element.obj, top_element.ver - def _skip_and_get_serialized(self) -> Tuple[Optional[dict], int]: + def _skip_and_get(self) -> Optional[QueueElement[T]]: ack_version = self._last_ack_file.read_local() while True: - serialized, size = self._get_serialized() - if serialized is None: - return None, 0 - ver = serialized["version"] - if ver > ack_version: + top_element = self._get() + if top_element is None: + return None + if top_element.ver > ack_version: self._should_skip_to_ack = False - if ver > ack_version + 1: + if top_element.ver > ack_version + 1: _logger.warning( "Possible data loss. Last acknowledged operation version: %d, next: %d", ack_version, - ver, + top_element.ver, ) - return serialized, size + return top_element - def _get_serialized(self) -> Tuple[Optional[dict], int]: + def _get(self) -> Optional[QueueElement[T]]: _json, size = self._reader.get_with_size() if not _json: if self._read_file_version >= self._write_file_version: - return None, 0 + return None self._reader.close() self._read_file_version = self._next_log_file_version(self._read_file_version) self._reader = JsonFileSplitter(self._get_log_file(self._read_file_version)) # It is safe. Max recursion level is 2. - return self._get_serialized() - return _json, size + return self._get() + try: + obj, ver = self._deserialize(_json) + return QueueElement[T](obj, ver, size) + except Exception as e: + raise MalformedOperation from e def get_batch(self, size: int) -> Tuple[List[T], int]: if self._should_skip_to_ack: - serialized_first, cur_batch_size = self._skip_and_get_serialized() + first = self._skip_and_get() else: - serialized_first, cur_batch_size = self._get_serialized() - if not serialized_first: + first = self._get() + if not first: return [], -1 - first, ver = self._deserialize(serialized_first) - ret = [first] + ret = [first.obj] + ver = first.ver + cur_batch_size = first.size for _ in range(0, size - 1): if cur_batch_size >= self._max_batch_size_bytes: break - serialized_obj, obj_size = self._get_serialized() - if not serialized_obj: + next_obj = self._get() + if not next_obj: break - cur_batch_size += obj_size - obj, next_ver = self._deserialize(serialized_obj) - ver = next_ver - ret.append(obj) + cur_batch_size += next_obj.size + ver = next_obj.ver + ret.append(next_obj.obj) return ret, ver def flush(self): From 0bf822ad37c7bdbc0ecbc80df7aa07b96354d3d6 Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Thu, 17 Nov 2022 17:50:44 +0100 Subject: [PATCH 08/11] return queue element instead of tuple --- src/neptune/new/cli/sync.py | 68 ++++++++++--------- src/neptune/new/internal/disk_queue.py | 23 +++---- .../async_operation_processor.py | 4 +- tests/neptune/new/internal/test_disk_queue.py | 39 +++++++---- 4 files changed, 73 insertions(+), 61 deletions(-) diff --git a/src/neptune/new/cli/sync.py b/src/neptune/new/cli/sync.py index 71bbde923..8e4aad6c7 100644 --- a/src/neptune/new/cli/sync.py +++ b/src/neptune/new/cli/sync.py @@ -34,7 +34,7 @@ get_offline_dirs, get_project, get_qualified_name, - 
is_container_synced_and_remove_junk, + is_container_synced, iterate_containers, split_dir_name, ) @@ -82,46 +82,48 @@ def sync_execution( container_id: UniqueId, container_type: ContainerType, ) -> None: - with DiskQueue( + disk_queue = DiskQueue( dir_path=execution_path, to_dict=lambda x: x.to_dict(), from_dict=Operation.from_dict, lock=threading.RLock(), - ) as disk_queue: + ) + while True: + batch = disk_queue.get_batch(1000) + if not batch: + break + version = batch[-1].ver + batch = [element.obj for element in batch] + + start_time = time.monotonic() + expected_count = len(batch) + version_to_ack = version - expected_count while True: - batch, version = disk_queue.get_batch(1000) - if not batch: - break - - start_time = time.monotonic() - expected_count = len(batch) - version_to_ack = version - expected_count - while True: - try: - processed_count, _ = self._backend.execute_operations( - container_id=container_id, - container_type=container_type, - operations=batch, - ) - version_to_ack += processed_count - batch = batch[processed_count:] - disk_queue.ack(version) - if version_to_ack == version: - break - except NeptuneConnectionLostException as ex: - if time.monotonic() - start_time > retries_timeout: - raise ex - logger.warning( - "Experiencing connection interruptions." - " Will try to reestablish communication with Neptune." - " Internal exception was: %s", - ex.cause.__class__.__name__, - ) + try: + processed_count, _ = self._backend.execute_operations( + container_id=container_id, + container_type=container_type, + operations=batch, + ) + version_to_ack += processed_count + batch = batch[processed_count:] + disk_queue.ack(version) + if version_to_ack == version: + break + except NeptuneConnectionLostException as ex: + if time.monotonic() - start_time > retries_timeout: + raise ex + logger.warning( + "Experiencing connection interruptions." + " Will try to reestablish communication with Neptune." 
+ " Internal exception was: %s", + ex.cause.__class__.__name__, + ) def sync_all_registered_containers(self, base_path: Path) -> None: async_path = base_path / ASYNC_DIRECTORY for container_type, unique_id, path in iterate_containers(async_path): - if not is_container_synced_and_remove_junk(path): + if not is_container_synced(path): container = get_metadata_container( backend=self._backend, container_id=unique_id, @@ -237,4 +239,4 @@ def sync_selected_containers( def sync_all_containers(self, base_path: Path, project_name: Optional[str]) -> None: self.sync_all_registered_containers(base_path) - self.sync_all_offline_containers(base_path, project_name) + self.sync_all_offline_containers(base_path, project_name) \ No newline at end of file diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py index 5c7dfc537..96e36e6f8 100644 --- a/src/neptune/new/internal/disk_queue.py +++ b/src/neptune/new/internal/disk_queue.py @@ -34,7 +34,7 @@ from neptune.new.internal.utils.json_file_splitter import JsonFileSplitter from neptune.new.internal.utils.sync_offset_file import SyncOffsetFile -__all__ = ["DiskQueue"] +__all__ = ["QueueElement", "DiskQueue"] T = TypeVar("T") @@ -102,14 +102,11 @@ def put(self, obj: T) -> int: self._file_size += len(_json) + 1 return version - def get(self) -> Tuple[Optional[T], int]: + def get(self) -> Optional[QueueElement[T]]: if self._should_skip_to_ack: - top_element = self._skip_and_get() + return self._skip_and_get() else: - top_element = self._get() - if top_element is None: - return None, -1 - return top_element.obj, top_element.ver + return self._get() def _skip_and_get(self) -> Optional[QueueElement[T]]: ack_version = self._last_ack_file.read_local() @@ -143,16 +140,15 @@ def _get(self) -> Optional[QueueElement[T]]: except Exception as e: raise MalformedOperation from e - def get_batch(self, size: int) -> Tuple[List[T], int]: + def get_batch(self, size: int) -> List[QueueElement[T]]: if self._should_skip_to_ack: first = self._skip_and_get() else: first = self._get() if not first: - return [], -1 + return [] - ret = [first.obj] - ver = first.ver + ret = [first] cur_batch_size = first.size for _ in range(0, size - 1): if cur_batch_size >= self._max_batch_size_bytes: @@ -162,9 +158,8 @@ def get_batch(self, size: int) -> Tuple[List[T], int]: break cur_batch_size += next_obj.size - ver = next_obj.ver - ret.append(next_obj.obj) - return ret, ver + ret.append(next_obj) + return ret def flush(self): self._writer.flush() diff --git a/src/neptune/new/internal/operation_processors/async_operation_processor.py b/src/neptune/new/internal/operation_processors/async_operation_processor.py index ca5d96022..800aaa42e 100644 --- a/src/neptune/new/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/new/internal/operation_processors/async_operation_processor.py @@ -236,10 +236,10 @@ def work(self) -> None: self._processor._queue.flush() while True: - batch, version = self._processor._queue.get_batch(self._batch_size) + batch = self._processor._queue.get_batch(self._batch_size) if not batch: return - self.process_batch(batch, version) + self.process_batch([element.obj for element in batch], batch[-1].ver) @Daemon.ConnectionRetryWrapper( kill_message=( diff --git a/tests/neptune/new/internal/test_disk_queue.py b/tests/neptune/new/internal/test_disk_queue.py index 88293f749..fdd2d6b7e 100644 --- a/tests/neptune/new/internal/test_disk_queue.py +++ b/tests/neptune/new/internal/test_disk_queue.py @@ -13,6 +13,7 @@ # See the 
License for the specific language governing permissions and # limitations under the License. # +import json import random import threading import unittest @@ -20,7 +21,10 @@ from pathlib import Path from tempfile import TemporaryDirectory -from neptune.new.internal.disk_queue import DiskQueue +from neptune.new.internal.disk_queue import ( + DiskQueue, + QueueElement, +) class TestDiskQueue(unittest.TestCase): @@ -32,6 +36,15 @@ def __init__(self, num: int, txt: str): def __eq__(self, other): return isinstance(other, TestDiskQueue.Obj) and self.num == other.num and self.txt == other.txt + @staticmethod + def get_obj_size_bytes(obj, version) -> int: + return len(json.dumps({"obj": obj.__dict__, "version": version})) + + @staticmethod + def get_queue_element(obj, version) -> QueueElement[Obj]: + obj_size = len(json.dumps({"obj": obj.__dict__, "version": version})) + return QueueElement(obj, version, obj_size) + def test_put(self): with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( @@ -43,7 +56,7 @@ def test_put(self): obj = TestDiskQueue.Obj(5, "test") queue.put(obj) queue.flush() - self.assertEqual(queue.get(), (obj, 1)) + self.assertEqual(queue.get(), self.get_queue_element(obj, 1)) queue.close() def test_multiple_files(self): @@ -60,7 +73,8 @@ def test_multiple_files(self): queue.put(obj) queue.flush() for i in range(1, 101): - self.assertEqual(queue.get(), (TestDiskQueue.Obj(i, str(i)), i)) + obj = TestDiskQueue.Obj(i, str(i)) + self.assertEqual(queue.get(), self.get_queue_element(obj, i)) queue.close() self.assertTrue(queue._read_file_version > 90) self.assertTrue(queue._write_file_version > 90) @@ -81,24 +95,24 @@ def test_get_batch(self): queue.flush() self.assertEqual( queue.get_batch(25), - ([TestDiskQueue.Obj(i, str(i)) for i in range(1, 26)], 25), + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(1, 26)], ) self.assertEqual( queue.get_batch(25), - ([TestDiskQueue.Obj(i, str(i)) for i in range(26, 51)], 50), + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(26, 51)], ) self.assertEqual( queue.get_batch(25), - ([TestDiskQueue.Obj(i, str(i)) for i in range(51, 76)], 75), + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(51, 76)], ) self.assertEqual( queue.get_batch(25), - ([TestDiskQueue.Obj(i, str(i)) for i in range(76, 91)], 90), + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i) for i in range(76, 91)], ) queue.close() def test_batch_limit(self): - obj_size = len("{'obj': {'num': 0, 'txt': '0'}, 'version': 1}") + obj_size = self.get_obj_size_bytes(TestDiskQueue.Obj(1, "1"), 1) with TemporaryDirectory() as dirpath: queue = DiskQueue[TestDiskQueue.Obj]( Path(dirpath), @@ -115,11 +129,11 @@ def test_batch_limit(self): self.assertEqual( queue.get_batch(5), - ([TestDiskQueue.Obj(i, str(i)) for i in range(3)], 3), + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1) for i in range(3)], ) self.assertEqual( queue.get_batch(2), - ([TestDiskQueue.Obj(i, str(i)) for i in range(3, 5)], 5), + [self.get_queue_element(TestDiskQueue.Obj(i, str(i)), i + 1) for i in range(3, 5)], ) def test_resuming_queue(self): @@ -135,7 +149,7 @@ def test_resuming_queue(self): obj = TestDiskQueue.Obj(i, str(i)) queue.put(obj) queue.flush() - _, version = queue.get_batch(random.randrange(300, 400)) + version = queue.get_batch(random.randrange(300, 400))[-1].ver version_to_ack = version - random.randrange(100, 200) queue.ack(version_to_ack) @@ -155,7 +169,8 @@ def test_resuming_queue(self): 
max_file_size=200, ) for i in range(version_to_ack + 1, 501): - self.assertEqual(queue.get(), (TestDiskQueue.Obj(i, str(i)), i)) + obj = TestDiskQueue.Obj(i, str(i)) + self.assertEqual(queue.get(), self.get_queue_element(obj, i)) queue.close() From 7911be34bf372c102336745cd150204739867cbe Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Thu, 17 Nov 2022 18:32:10 +0100 Subject: [PATCH 09/11] merge --- src/neptune/new/cli/clear.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/neptune/new/cli/clear.py b/src/neptune/new/cli/clear.py index 4073b9231..560d74863 100644 --- a/src/neptune/new/cli/clear.py +++ b/src/neptune/new/cli/clear.py @@ -86,4 +86,4 @@ def remove_containers(paths): shutil.rmtree(path) logger.info(f"Deleted: {path}") except OSError: - logger.warn(f"Cannot remove directory: {path}") + logger.warn(f"Cannot remove directory: {path}") \ No newline at end of file From bff2f2b667f859b1035738fa7df8bd8ffc9806f8 Mon Sep 17 00:00:00 2001 From: Pavel Sankin Date: Thu, 17 Nov 2022 18:56:35 +0100 Subject: [PATCH 10/11] sync fix --- src/neptune/new/cli/sync.py | 70 ++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/src/neptune/new/cli/sync.py b/src/neptune/new/cli/sync.py index 8e4aad6c7..ea1d55d44 100644 --- a/src/neptune/new/cli/sync.py +++ b/src/neptune/new/cli/sync.py @@ -34,7 +34,7 @@ get_offline_dirs, get_project, get_qualified_name, - is_container_synced, + is_container_synced_and_remove_junk, iterate_containers, split_dir_name, ) @@ -82,48 +82,48 @@ def sync_execution( container_id: UniqueId, container_type: ContainerType, ) -> None: - disk_queue = DiskQueue( + with DiskQueue( dir_path=execution_path, to_dict=lambda x: x.to_dict(), from_dict=Operation.from_dict, lock=threading.RLock(), - ) - while True: - batch = disk_queue.get_batch(1000) - if not batch: - break - version = batch[-1].ver - batch = [element.obj for element in batch] - - start_time = time.monotonic() - expected_count = len(batch) - version_to_ack = version - expected_count + ) as disk_queue: while True: - try: - processed_count, _ = self._backend.execute_operations( - container_id=container_id, - container_type=container_type, - operations=batch, - ) - version_to_ack += processed_count - batch = batch[processed_count:] - disk_queue.ack(version) - if version_to_ack == version: - break - except NeptuneConnectionLostException as ex: - if time.monotonic() - start_time > retries_timeout: - raise ex - logger.warning( - "Experiencing connection interruptions." - " Will try to reestablish communication with Neptune." - " Internal exception was: %s", - ex.cause.__class__.__name__, - ) + batch = disk_queue.get_batch(1000) + if not batch: + break + version = batch[-1].ver + batch = [element.obj for element in batch] + + start_time = time.monotonic() + expected_count = len(batch) + version_to_ack = version - expected_count + while True: + try: + processed_count, _ = self._backend.execute_operations( + container_id=container_id, + container_type=container_type, + operations=batch, + ) + version_to_ack += processed_count + batch = batch[processed_count:] + disk_queue.ack(version) + if version_to_ack == version: + break + except NeptuneConnectionLostException as ex: + if time.monotonic() - start_time > retries_timeout: + raise ex + logger.warning( + "Experiencing connection interruptions." + " Will try to reestablish communication with Neptune." 
+ " Internal exception was: %s", + ex.cause.__class__.__name__, + ) def sync_all_registered_containers(self, base_path: Path) -> None: async_path = base_path / ASYNC_DIRECTORY for container_type, unique_id, path in iterate_containers(async_path): - if not is_container_synced(path): + if not is_container_synced_and_remove_junk(path): container = get_metadata_container( backend=self._backend, container_id=unique_id, @@ -239,4 +239,4 @@ def sync_selected_containers( def sync_all_containers(self, base_path: Path, project_name: Optional[str]) -> None: self.sync_all_registered_containers(base_path) - self.sync_all_offline_containers(base_path, project_name) \ No newline at end of file + self.sync_all_offline_containers(base_path, project_name) From 00e21dcadc252b60a492f0530841308d2ee61dc4 Mon Sep 17 00:00:00 2001 From: pankin397 <117637376+pankin397@users.noreply.github.com> Date: Thu, 17 Nov 2022 19:04:29 +0100 Subject: [PATCH 11/11] clear fix --- src/neptune/new/cli/clear.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/neptune/new/cli/clear.py b/src/neptune/new/cli/clear.py index 560d74863..4073b9231 100644 --- a/src/neptune/new/cli/clear.py +++ b/src/neptune/new/cli/clear.py @@ -86,4 +86,4 @@ def remove_containers(paths): shutil.rmtree(path) logger.info(f"Deleted: {path}") except OSError: - logger.warn(f"Cannot remove directory: {path}") \ No newline at end of file + logger.warn(f"Cannot remove directory: {path}")
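After the final patch, get_batch() hands back QueueElement wrappers instead of a (list, version) tuple, so callers such as sync.py unwrap .obj for the payload and acknowledge the last element's .ver. A condensed consumer sketch under those assumptions; send() is a hypothetical upload callback standing in for the backend call, and the retry handling of sync.py is omitted:

from neptune.new.internal.disk_queue import DiskQueue

def drain(queue: DiskQueue, send) -> None:
    while True:
        batch = queue.get_batch(1000)
        if not batch:
            return
        send([element.obj for element in batch])  # unwrap the payload objects
        queue.ack(batch[-1].ver)                  # acknowledge the newest version sent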