Skip to content

Commit

Permalink
Run clean runner on sync operation
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrykGala committed Nov 17, 2022
1 parent 96868af commit 120ed03
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 33 deletions.
4 changes: 2 additions & 2 deletions src/neptune/new/cli/clear.py
Expand Up @@ -32,15 +32,15 @@


class ClearRunner(AbstractBackendRunner):
def clear(self, path: Path, force: bool = False):
def clear(self, path: Path, force: bool = False, clear_eventual: bool = True):
container_manager = ContainersManager(self._backend, path)
synced_containers, unsynced_containers, not_found = container_manager.partition_containers_and_clean_junk(path)

offline_containers = get_offline_dirs(path)

ClearRunner.remove_containers(not_found)

if offline_containers or unsynced_containers:
if clear_eventual and (offline_containers or unsynced_containers):
self.log_junk_metadata(offline_containers, unsynced_containers)

if force or click.confirm("\nDo you want to delete the listed metadata?"):
Expand Down
7 changes: 6 additions & 1 deletion src/neptune/new/cli/commands.py
Expand Up @@ -150,7 +150,8 @@ def sync(
neptune sync --project workspace/project --offline-only
"""

sync_runner = SyncRunner(backend=HostedNeptuneBackend(Credentials.from_token()))
backend = HostedNeptuneBackend(Credentials.from_token())
sync_runner = SyncRunner(backend=backend)

if runs_names:
logger.warning(
Expand All @@ -171,6 +172,10 @@ def sync(
else:
sync_runner.sync_all_containers(path, project_name)

clear_runner = ClearRunner(backend=backend)

clear_runner.clear(path, clear_eventual=False)


@click.command()
@path_option
Expand Down
60 changes: 30 additions & 30 deletions src/neptune/new/cli/sync.py
Expand Up @@ -82,41 +82,41 @@ def sync_execution(
container_id: UniqueId,
container_type: ContainerType,
) -> None:
disk_queue = DiskQueue(
with DiskQueue(
dir_path=execution_path,
to_dict=lambda x: x.to_dict(),
from_dict=Operation.from_dict,
lock=threading.RLock(),
)
while True:
batch, version = disk_queue.get_batch(1000)
if not batch:
break

start_time = time.monotonic()
expected_count = len(batch)
version_to_ack = version - expected_count
) as disk_queue:
while True:
try:
processed_count, _ = self._backend.execute_operations(
container_id=container_id,
container_type=container_type,
operations=batch,
)
version_to_ack += processed_count
batch = batch[processed_count:]
disk_queue.ack(version)
if version_to_ack == version:
break
except NeptuneConnectionLostException as ex:
if time.monotonic() - start_time > retries_timeout:
raise ex
logger.warning(
"Experiencing connection interruptions."
" Will try to reestablish communication with Neptune."
" Internal exception was: %s",
ex.cause.__class__.__name__,
)
batch, version = disk_queue.get_batch(1000)
if not batch:
break

start_time = time.monotonic()
expected_count = len(batch)
version_to_ack = version - expected_count
while True:
try:
processed_count, _ = self._backend.execute_operations(
container_id=container_id,
container_type=container_type,
operations=batch,
)
version_to_ack += processed_count
batch = batch[processed_count:]
disk_queue.ack(version)
if version_to_ack == version:
break
except NeptuneConnectionLostException as ex:
if time.monotonic() - start_time > retries_timeout:
raise ex
logger.warning(
"Experiencing connection interruptions."
" Will try to reestablish communication with Neptune."
" Internal exception was: %s",
ex.cause.__class__.__name__,
)

def sync_all_registered_containers(self, base_path: Path) -> None:
async_path = base_path / ASYNC_DIRECTORY
Expand Down
1 change: 1 addition & 0 deletions src/neptune/new/internal/disk_queue.py
Expand Up @@ -233,4 +233,5 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit hook: persist pending queue state, then close.

    Flushes buffered entries before closing so that operations recorded
    during the ``with`` block are not lost (the flush-before-close order
    matters). Exceptions are not suppressed: returning ``None`` (falsy)
    lets any in-flight exception propagate to the caller.
    """
    self.flush()
    self.close()

0 comments on commit 120ed03

Please sign in to comment.