diff --git a/src/neptune/new/cli/clear.py b/src/neptune/new/cli/clear.py
index 3a31a0963..419be8caf 100644
--- a/src/neptune/new/cli/clear.py
+++ b/src/neptune/new/cli/clear.py
@@ -32,7 +32,7 @@ class ClearRunner(AbstractBackendRunner):
-    def clear(self, path: Path, force: bool = False):
+    def clear(self, path: Path, force: bool = False, clear_eventual: bool = True):
         container_manager = ContainersManager(self._backend, path)

         synced_containers, unsynced_containers, not_found = container_manager.partition_containers_and_clean_junk(path)
@@ -40,7 +40,7 @@ def clear(self, path: Path, force: bool = False):

         ClearRunner.remove_containers(not_found)

-        if offline_containers or unsynced_containers:
+        if clear_eventual and (offline_containers or unsynced_containers):
             self.log_junk_metadata(offline_containers, unsynced_containers)

             if force or click.confirm("\nDo you want to delete the listed metadata?"):
diff --git a/src/neptune/new/cli/commands.py b/src/neptune/new/cli/commands.py
index 92d26fc26..f7a4f51fb 100644
--- a/src/neptune/new/cli/commands.py
+++ b/src/neptune/new/cli/commands.py
@@ -150,7 +150,8 @@ def sync(
         neptune sync --project workspace/project --offline-only
     """
-    sync_runner = SyncRunner(backend=HostedNeptuneBackend(Credentials.from_token()))
+    backend = HostedNeptuneBackend(Credentials.from_token())
+    sync_runner = SyncRunner(backend=backend)

     if runs_names:
         logger.warning(
@@ -171,6 +172,10 @@ def sync(
     else:
         sync_runner.sync_all_containers(path, project_name)

+    clear_runner = ClearRunner(backend=backend)
+
+    clear_runner.clear(path, clear_eventual=False)
+

 @click.command()
 @path_option
diff --git a/src/neptune/new/cli/sync.py b/src/neptune/new/cli/sync.py
index 769516798..71bbde923 100644
--- a/src/neptune/new/cli/sync.py
+++ b/src/neptune/new/cli/sync.py
@@ -82,41 +82,41 @@ def sync_execution(
         container_id: UniqueId,
         container_type: ContainerType,
     ) -> None:
-        disk_queue = DiskQueue(
+        with DiskQueue(
             dir_path=execution_path,
             to_dict=lambda x: x.to_dict(),
             from_dict=Operation.from_dict,
             lock=threading.RLock(),
-        )
-        while True:
-            batch, version = disk_queue.get_batch(1000)
-            if not batch:
-                break
-
-            start_time = time.monotonic()
-            expected_count = len(batch)
-            version_to_ack = version - expected_count
+        ) as disk_queue:
             while True:
-                try:
-                    processed_count, _ = self._backend.execute_operations(
-                        container_id=container_id,
-                        container_type=container_type,
-                        operations=batch,
-                    )
-                    version_to_ack += processed_count
-                    batch = batch[processed_count:]
-                    disk_queue.ack(version)
-                    if version_to_ack == version:
-                        break
-                except NeptuneConnectionLostException as ex:
-                    if time.monotonic() - start_time > retries_timeout:
-                        raise ex
-                    logger.warning(
-                        "Experiencing connection interruptions."
-                        " Will try to reestablish communication with Neptune."
-                        " Internal exception was: %s",
-                        ex.cause.__class__.__name__,
-                    )
+                batch, version = disk_queue.get_batch(1000)
+                if not batch:
+                    break
+
+                start_time = time.monotonic()
+                expected_count = len(batch)
+                version_to_ack = version - expected_count
+                while True:
+                    try:
+                        processed_count, _ = self._backend.execute_operations(
+                            container_id=container_id,
+                            container_type=container_type,
+                            operations=batch,
+                        )
+                        version_to_ack += processed_count
+                        batch = batch[processed_count:]
+                        disk_queue.ack(version)
+                        if version_to_ack == version:
+                            break
+                    except NeptuneConnectionLostException as ex:
+                        if time.monotonic() - start_time > retries_timeout:
+                            raise ex
+                        logger.warning(
+                            "Experiencing connection interruptions."
+                            " Will try to reestablish communication with Neptune."
+                            " Internal exception was: %s",
+                            ex.cause.__class__.__name__,
+                        )

     def sync_all_registered_containers(self, base_path: Path) -> None:
         async_path = base_path / ASYNC_DIRECTORY
diff --git a/src/neptune/new/internal/disk_queue.py b/src/neptune/new/internal/disk_queue.py
index 6217c6047..69763d8f9 100644
--- a/src/neptune/new/internal/disk_queue.py
+++ b/src/neptune/new/internal/disk_queue.py
@@ -233,4 +233,5 @@ def __enter__(self):
         return self

     def __exit__(self, exc_type, exc_val, exc_tb):
+        self.flush()
         self.close()