Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(artifacts): when artifact-commit 409s, retry entire artifact-creation, not just commit #4272

Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e7b566f
retry 409s at higher level
speezepearson Sep 13, 2022
dd7e37e
refactor out _retry_conflicts
speezepearson Sep 14, 2022
8cb6ab5
introduce ad-hoc Future class to avoid awkward `nonlocal` var
speezepearson Sep 14, 2022
436c178
add explanatory comment
speezepearson Sep 15, 2022
b5c74ec
make functionally-public function actually-public
speezepearson Sep 15, 2022
25b9693
fix retry logic by introducing class ArtifactCommitFailed
speezepearson Sep 15, 2022
8749102
lint
speezepearson Sep 15, 2022
fcc93ab
Merge branch 'master' into spencerpearson/retry-conflict-higher-level
speezepearson Sep 16, 2022
cdd2401
Merge remote-tracking branch 'origin/master' into spencerpearson/retr…
speezepearson Sep 19, 2022
153932c
Merge branch 'spencerpearson/no-retry-conflict' into spencerpearson/r…
kptkin Sep 19, 2022
2ad2ca3
wip
speezepearson Sep 21, 2022
72e8efa
test(artifacts): add test for artifact retries, and GraphQL-injection…
speezepearson Sep 22, 2022
86258be
remove dead code
speezepearson Sep 22, 2022
95f7cf1
undo failed attempt to make make RelayServer error reporting better
speezepearson Sep 22, 2022
a0d01ba
add test for retry on 409
speezepearson Sep 22, 2022
394c448
Merge branch 'spencerpearson/no-retry-conflict' into spencerpearson/r…
speezepearson Sep 22, 2022
a85d0a6
lint
speezepearson Sep 22, 2022
7021af8
Merge branch 'master' into spencerpearson/relay-graphql
speezepearson Sep 22, 2022
db47ded
Merge branch 'spencerpearson/relay-graphql' into spencerpearson/retry…
speezepearson Sep 22, 2022
4cb2d1b
lint
speezepearson Sep 22, 2022
d338460
undo unnecessary isort changes
speezepearson Sep 22, 2022
f34121d
lint
speezepearson Sep 22, 2022
b980fea
fix Resolver methods, and make them stylistically more-closely-resemb…
speezepearson Sep 22, 2022
0a6b526
fix Resolver methods, and make them stylistically more-closely-resemb…
speezepearson Sep 22, 2022
75f4a1e
improve typing, add assertion
speezepearson Sep 22, 2022
4882721
undo unnecessary changes
speezepearson Sep 22, 2022
8ef08a7
make test more precise
speezepearson Sep 22, 2022
26a41fb
fix test
speezepearson Sep 23, 2022
cc526df
fix more tests by not assuming every context-entry has a `config` key
speezepearson Sep 23, 2022
1c719e0
Merge branch 'spencerpearson/relay-graphql' into spencerpearson/retry…
speezepearson Sep 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 10 additions & 6 deletions wandb/filesync/step_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ArtifactStatus(TypedDict):


PreCommitFn = Callable[[], None]
PostCommitFn = Callable[[], None]
PostCommitFn = Callable[[Optional[Exception]], None]
OnRequestFinishFn = Callable[[], None]
SaveFn = Callable[["progress.ProgressFn"], Any]

Expand Down Expand Up @@ -216,12 +216,16 @@ def _maybe_commit_artifact(self, artifact_id: str) -> None:
artifact_status["pending_count"] == 0
and artifact_status["commit_requested"]
):
for callback in artifact_status["pre_commit_callbacks"]:
callback()
for pre_callback in artifact_status["pre_commit_callbacks"]:
pre_callback()
exc = None
if artifact_status["finalize"]:
self._api.commit_artifact(artifact_id)
for callback in artifact_status["post_commit_callbacks"]:
callback()
try:
self._api.commit_artifact(artifact_id)
except Exception as e:
exc = e
for post_callback in artifact_status["post_commit_callbacks"]:
post_callback(exc)

def start(self) -> None:
self._thread.start()
Expand Down
121 changes: 110 additions & 11 deletions wandb/sdk/internal/artifacts.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
import datetime
import json
import os
import sys
import tempfile
import threading
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
List,
Optional,
Sequence,
TypeVar,
Union,
cast,
)

import requests

import wandb
import wandb.filesync.step_prepare
import wandb.sdk.lib.retry
from wandb import util

from ..interface.artifacts import ArtifactEntry, ArtifactManifest
from wandb.sdk.interface.artifacts import ArtifactEntry, ArtifactManifest

if TYPE_CHECKING:
from wandb.proto import wandb_internal_pb2
Expand All @@ -30,7 +44,13 @@ def __call__(
pass


def _manifest_json_from_proto(manifest: "wandb_internal_pb2.ArtifactManifest") -> Dict:
class ArtifactCommitError(Exception):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is your sense of having this error in the same location as other errors (i.e. errors/init.py). Honestly, we are not consistent about it, so it is fine either way...

def __init__(self, exception: Exception) -> None:
super().__init__(f"Artifact commit failed: {exception}")
self.exception = exception


def manifest_json_from_proto(manifest: "wandb_internal_pb2.ArtifactManifest") -> Dict:
if manifest.version == 1:
contents = {
content.path: {
Expand Down Expand Up @@ -61,6 +81,34 @@ def _manifest_json_from_proto(manifest: "wandb_internal_pb2.ArtifactManifest") -
}


def _retry_conflicts(e: Exception) -> Union[bool, datetime.timedelta]:
if not isinstance(e, ArtifactCommitError):
return False
e = e.exception
if not isinstance(e, requests.exceptions.HTTPError):
return False
if e.response.status_code == 409:
return datetime.timedelta(minutes=2)
return False


_T = TypeVar("_T")


class _Future(Generic[_T]):
def __init__(self) -> None:
self._object: Optional[_T] = None
self._object_ready = threading.Event()

def set(self, obj: _T) -> None:
self._object = obj
self._object_ready.set()

def get(self) -> _T:
self._object_ready.wait()
return cast(_T, self._object)


class ArtifactSaver:
_server_artifact: Optional[Dict] # TODO better define this dict

Expand Down Expand Up @@ -95,6 +143,55 @@ def save(
incremental: bool = False,
history_step: Optional[int] = None,
) -> Optional[Dict]:

# TODO: this retry is messy and should not exist!
# It's a hacky fix for https://wandb.atlassian.net/browse/WB-10888 :
# as of 2022-09-14, "409 Conflict" errors indicate that some uploaded files
# conflict with the artifact's most recent version's files. The right way to
# solve this would be for the SDK to ask the server what those files are,
# and replace their manifest entries with references to the previous version's
# entries; but, as a stopgap solution, we just retry the entire save operation,
# even though that might leave a bunch of orphaned files associated with the
# aborted artifact.
save_retry = wandb.sdk.lib.retry.Retry(
self._save,
retryable_exceptions=(ArtifactCommitError,),
check_retry_fn=_retry_conflicts,
num_retries=3,
)

return save_retry(
type,
name,
client_id,
sequence_client_id,
distributed_id,
finalize,
metadata,
description,
aliases,
labels,
use_after_commit,
incremental,
history_step,
)

def _save(
self,
type: str,
name: str,
client_id: str,
sequence_client_id: str,
distributed_id: Optional[str] = None,
finalize: bool = True,
metadata: Optional[Dict] = None,
description: Optional[str] = None,
aliases: Optional[Sequence[str]] = None,
labels: Optional[List[str]] = None,
use_after_commit: bool = False,
incremental: bool = False,
history_step: Optional[int] = None,
) -> Optional[Dict]:
aliases = aliases or []
alias_specs = []
for alias in aliases:
Expand Down Expand Up @@ -190,7 +287,7 @@ def save(
),
)

commit_event = threading.Event()
commit_exc: _Future[Optional[Exception]] = _Future()

def before_commit() -> None:
self._resolve_client_id_manifest_references()
Expand Down Expand Up @@ -232,11 +329,12 @@ def before_commit() -> None:
extra_headers=extra_headers,
)

def on_commit() -> None:
if finalize and use_after_commit:
self._api.use_artifact(artifact_id)
def on_commit(exc: Optional[Exception]) -> None:
if exc is None:
if finalize and use_after_commit:
self._api.use_artifact(artifact_id)
step_prepare.shutdown()
commit_event.set()
commit_exc.set(exc)

# This will queue the commit. It will only happen after all the file uploads are done
self._file_pusher.commit_artifact(
Expand All @@ -248,8 +346,9 @@ def on_commit() -> None:

# Block until all artifact files are uploaded and the
# artifact is committed.
while not commit_event.is_set():
commit_event.wait()
exc = commit_exc.get()
if exc is not None:
raise ArtifactCommitError(exc) from exc

return self._server_artifact

Expand Down
2 changes: 1 addition & 1 deletion wandb/sdk/internal/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ def _send_artifact(
saver = artifacts.ArtifactSaver(
api=self._api,
digest=artifact.digest,
manifest_json=artifacts._manifest_json_from_proto(artifact.manifest),
manifest_json=artifacts.manifest_json_from_proto(artifact.manifest),
file_pusher=self._pusher,
is_user_created=artifact.user_created,
)
Expand Down