Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run clean runner on sync operation #1092

Merged
merged 1 commit into from Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/neptune/new/cli/clear.py
Expand Up @@ -32,15 +32,15 @@


class ClearRunner(AbstractBackendRunner):
def clear(self, path: Path, force: bool = False):
def clear(self, path: Path, force: bool = False, clear_eventual: bool = True):
container_manager = ContainersManager(self._backend, path)
synced_containers, unsynced_containers, not_found = container_manager.partition_containers_and_clean_junk(path)

offline_containers = get_offline_dirs(path)

ClearRunner.remove_containers(not_found)

if offline_containers or unsynced_containers:
if clear_eventual and (offline_containers or unsynced_containers):
self.log_junk_metadata(offline_containers, unsynced_containers)

if force or click.confirm("\nDo you want to delete the listed metadata?"):
Expand Down
7 changes: 6 additions & 1 deletion src/neptune/new/cli/commands.py
Expand Up @@ -150,7 +150,8 @@ def sync(
neptune sync --project workspace/project --offline-only
"""

sync_runner = SyncRunner(backend=HostedNeptuneBackend(Credentials.from_token()))
backend = HostedNeptuneBackend(Credentials.from_token())
sync_runner = SyncRunner(backend=backend)

if runs_names:
logger.warning(
Expand All @@ -171,6 +172,10 @@ def sync(
else:
sync_runner.sync_all_containers(path, project_name)

clear_runner = ClearRunner(backend=backend)

clear_runner.clear(path, clear_eventual=False)


@click.command()
@path_option
Expand Down
60 changes: 30 additions & 30 deletions src/neptune/new/cli/sync.py
Expand Up @@ -82,41 +82,41 @@ def sync_execution(
container_id: UniqueId,
container_type: ContainerType,
) -> None:
disk_queue = DiskQueue(
with DiskQueue(
dir_path=execution_path,
to_dict=lambda x: x.to_dict(),
from_dict=Operation.from_dict,
lock=threading.RLock(),
)
while True:
batch, version = disk_queue.get_batch(1000)
if not batch:
break

start_time = time.monotonic()
expected_count = len(batch)
version_to_ack = version - expected_count
) as disk_queue:
while True:
try:
processed_count, _ = self._backend.execute_operations(
container_id=container_id,
container_type=container_type,
operations=batch,
)
version_to_ack += processed_count
batch = batch[processed_count:]
disk_queue.ack(version)
if version_to_ack == version:
break
except NeptuneConnectionLostException as ex:
if time.monotonic() - start_time > retries_timeout:
raise ex
logger.warning(
"Experiencing connection interruptions."
" Will try to reestablish communication with Neptune."
" Internal exception was: %s",
ex.cause.__class__.__name__,
)
batch, version = disk_queue.get_batch(1000)
if not batch:
break

start_time = time.monotonic()
expected_count = len(batch)
version_to_ack = version - expected_count
while True:
try:
processed_count, _ = self._backend.execute_operations(
container_id=container_id,
container_type=container_type,
operations=batch,
)
version_to_ack += processed_count
batch = batch[processed_count:]
disk_queue.ack(version)
if version_to_ack == version:
break
except NeptuneConnectionLostException as ex:
if time.monotonic() - start_time > retries_timeout:
raise ex
logger.warning(
"Experiencing connection interruptions."
" Will try to reestablish communication with Neptune."
" Internal exception was: %s",
ex.cause.__class__.__name__,
)

def sync_all_registered_containers(self, base_path: Path) -> None:
async_path = base_path / ASYNC_DIRECTORY
Expand Down
1 change: 1 addition & 0 deletions src/neptune/new/internal/disk_queue.py
Expand Up @@ -233,4 +233,5 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
PatrykGala marked this conversation as resolved.
Show resolved Hide resolved
self.flush()
self.close()