-
Notifications
You must be signed in to change notification settings - Fork 619
/
wandb_run.py
3725 lines (3238 loc) 路 136 KB
/
wandb_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import _thread as thread
import atexit
import functools
import glob
import json
import logging
import numbers
import os
import re
import sys
import threading
import time
import traceback
from collections.abc import Mapping
from datetime import timedelta
from enum import IntEnum
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
NamedTuple,
Optional,
Sequence,
TextIO,
Tuple,
Type,
Union,
)
import requests
import wandb
from wandb import errors, trigger
from wandb._globals import _datatypes_set_callback
from wandb.apis import internal, public
from wandb.apis.internal import Api
from wandb.apis.public import Api as PublicApi
from wandb.proto.wandb_internal_pb2 import MetricRecord, PollExitResponse, RunRecord
from wandb.sdk.lib.import_hooks import (
register_post_import_hook,
unregister_post_import_hook,
)
from wandb.util import (
_is_artifact_object,
_is_artifact_string,
_is_artifact_version_weave_dict,
_is_py_path,
add_import_hook,
parse_artifact_string,
sentry_set_scope,
to_forward_slash_path,
)
from wandb.viz import CustomChart, Visualize, custom_chart
from . import wandb_artifacts, wandb_config, wandb_metric, wandb_summary
from .data_types._dtypes import TypeRegistry
from .interface.artifacts import Artifact as ArtifactInterface
from .interface.interface import GlobStr, InterfaceBase
from .interface.summary_record import SummaryRecord
from .lib import (
config_util,
deprecate,
filenames,
filesystem,
ipython,
module,
proto_util,
redirect,
telemetry,
)
from .lib.exit_hooks import ExitHooks
from .lib.filenames import DIFF_FNAME
from .lib.git import GitRepo
from .lib.printer import get_printer
from .lib.reporting import Reporter
from .lib.wburls import wburls
from .wandb_artifacts import Artifact
from .wandb_settings import Settings, SettingsConsole
from .wandb_setup import _WandbSetup
if TYPE_CHECKING:
if sys.version_info >= (3, 8):
from typing import TypedDict
else:
from typing_extensions import TypedDict
from wandb.proto.wandb_internal_pb2 import (
CheckVersionResponse,
GetSummaryResponse,
SampledHistoryResponse,
)
from .data_types.base_types.wb_value import WBValue
from .interface.artifacts import ArtifactEntry, ArtifactManifest
from .interface.interface import FilesDict, PolicyName
from .lib.printer import PrinterJupyter, PrinterTerm
from .wandb_alerts import AlertLevel
class GitSourceDict(TypedDict):
    """Typed dict describing a job sourced from a git repository."""

    remote: str  # git remote URL
    commit: str  # git commit to check out
    entrypoint: List[str]  # command used to launch the job
    args: Sequence[str]  # arguments passed to the entrypoint
class ArtifactSourceDict(TypedDict):
    """Typed dict describing a job sourced from a W&B artifact."""

    artifact: str  # artifact reference string
    entrypoint: List[str]  # command used to launch the job
    args: Sequence[str]  # arguments passed to the entrypoint
class ImageSourceDict(TypedDict):
    """Typed dict describing a job sourced from a container image."""

    image: str  # container image reference
    args: Sequence[str]  # arguments passed to the container
class JobSourceDict(TypedDict, total=False):
    """Serialized description of a job's source; all keys optional (total=False)."""

    _version: str  # schema version of this dict
    source_type: str  # discriminator for `source` — presumably "git"/"artifact"/"image"; confirm against writers
    source: Union[GitSourceDict, ArtifactSourceDict, ImageSourceDict]
    input_types: Dict[str, Any]  # types of the job's config inputs
    output_types: Dict[str, Any]  # types of the job's outputs
    runtime: Optional[str]  # NOTE(review): likely the Python runtime version — confirm
logger = logging.getLogger("wandb")

# Timeout in seconds used during exit handling; consumers are outside this
# excerpt — TODO confirm exact usage.
EXIT_TIMEOUT = 60

# Matches labels made only of [a-zA-Z0-9_-]; the `$` anchors the end and
# re.match() anchors the start, so a full-string check is intended.
RE_LABEL = re.compile(r"[a-zA-Z0-9_-]+$")
class TeardownStage(IntEnum):
    """Phase in which a teardown hook should be invoked."""

    EARLY = 1
    LATE = 2
class TeardownHook(NamedTuple):
    """A zero-argument callback paired with the stage it should run in."""

    call: Callable[[], None]
    stage: TeardownStage
class RunStatusChecker:
    """Periodically polls the background process for relevant updates.

    For now, we just use this to figure out if the user has requested a stop.
    """

    def __init__(
        self,
        interface: InterfaceBase,
        stop_polling_interval: int = 15,
        retry_polling_interval: int = 5,
    ) -> None:
        self._interface = interface
        self._stop_polling_interval = stop_polling_interval
        self._retry_polling_interval = retry_polling_interval
        self._join_event = threading.Event()

        # Two daemon pollers: one watching for stop requests, one for
        # network/retry status.
        self._stop_thread = threading.Thread(
            target=self.check_status, name="ChkStopThr", daemon=True
        )
        self._stop_thread.start()

        self._retry_thread = threading.Thread(
            target=self.check_network_status, name="NetStatThr", daemon=True
        )
        self._retry_thread.start()

    def check_network_status(self) -> None:
        """Poll network status and echo HTTP responses/retries to the terminal."""
        while True:
            status_response = self._interface.communicate_network_status()
            if status_response and status_response.network_responses:
                for response in status_response.network_responses:
                    # we use 0 for non-http errors (eg wandb errors)
                    if response.http_status_code in (200, 0):
                        wandb.termlog(f"{response.http_response_text}")
                    else:
                        wandb.termlog(
                            "{} encountered ({}), retrying request".format(
                                response.http_status_code,
                                response.http_response_text.rstrip(),
                            )
                        )
            # Event.wait returns True once stop()/join() has set the event.
            if self._join_event.wait(self._retry_polling_interval):
                return

    def check_status(self) -> None:
        """Poll for a server-side stop request and interrupt the main thread."""
        while True:
            status_response = self._interface.communicate_stop_status()
            if status_response and status_response.run_should_stop:
                # TODO(frz): This check is required
                # until WB-3606 is resolved on server side.
                if not wandb.agents.pyagent.is_running():
                    thread.interrupt_main()
                    return
            if self._join_event.wait(self._stop_polling_interval):
                return

    def stop(self) -> None:
        """Signal both polling threads to exit."""
        self._join_event.set()

    def join(self) -> None:
        """Stop polling and wait for both threads to finish."""
        self.stop()
        self._stop_thread.join()
        self._retry_thread.join()
class _run_decorator:  # noqa: N801
    """Decorators guarding Run methods against multiprocess misuse.

    `_attach` transparently attaches the current process to an existing
    service-mode run before invoking the wrapped method.  `_noop` turns calls
    made from a foreign process (non-service) into warnings/no-ops, or raises
    in strict mode.
    """

    # Name of the method currently mid-attach; "" when no attach is in flight.
    _is_attaching: str = ""

    class Dummy:
        # Placeholder returned by `_noop`-wrapped methods when a call is ignored.
        ...

    @classmethod
    def _attach(cls, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: Type["Run"], *args: Any, **kwargs: Any) -> Any:
            # * `_attach_id` is only assigned in service hence for all non-service cases
            #   it will be a passthrough.
            # * `_attach_pid` is only assigned in _init (using _attach_pid guarantees single attach):
            #   - for non-fork case the object is shared through pickling so will be None.
            #   - for fork case the new process share mem space hence the value would be of parent process.
            if (
                getattr(self, "_attach_id", None)
                and getattr(self, "_attach_pid", None) != os.getpid()
            ):
                # Re-entrant attach is a programming error — fail loudly.
                if cls._is_attaching:
                    message = (
                        f"Trying to attach `{func.__name__}` "
                        f"while in the middle of attaching `{cls._is_attaching}`"
                    )
                    raise RuntimeError(message)
                cls._is_attaching = func.__name__
                try:
                    wandb._attach(run=self)
                except Exception as e:
                    # In case the attach fails we will raise the exception that caused the issue.
                    # This exception should be caught and fail the execution of the program.
                    cls._is_attaching = ""
                    raise e
                cls._is_attaching = ""
            return func(self, *args, **kwargs)

        return wrapper

    @classmethod
    def _noop(cls, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: Type["Run"], *args: Any, **kwargs: Any) -> Any:
            # `_attach_id` is only assigned in service hence for all service cases
            # it will be a passthrough. We don't pickle non-service so again a way
            # to see that we are in non-service case
            if getattr(self, "_attach_id", None) is None:
                # `_init_pid` is only assigned in __init__ (this will be constant check for mp):
                # - for non-fork case the object is shared through pickling,
                #   and we don't pickle non-service so will be None
                # - for fork case the new process share mem space hence the value would be of parent process.
                _init_pid = getattr(self, "_init_pid", None)
                if _init_pid != os.getpid():
                    message = "`{}` ignored (called from pid={}, `init` called from pid={}). See: {}".format(
                        func.__name__,
                        os.getpid(),
                        _init_pid,
                        wburls.get("multiprocess"),
                    )
                    # - if this process was pickled in non-service case,
                    #   we ignore the attributes (since pickle is not supported)
                    # - for fork case will use the settings of the parent process
                    # - only point of inconsistent behavior from forked and non-forked cases
                    settings = getattr(self, "_settings", None)
                    if settings and settings["strict"]:
                        wandb.termerror(message, repeat=False)
                        raise errors.MultiprocessError(
                            f"`{func.__name__}` does not support multiprocessing"
                        )
                    wandb.termwarn(message, repeat=False)
                    return cls.Dummy()

            return func(self, *args, **kwargs)

        return wrapper
class Run:
"""A unit of computation logged by wandb. Typically, this is an ML experiment.
Create a run with `wandb.init()`:
<!--yeadoc-test:run-object-basic-->
```python
import wandb
run = wandb.init()
```
There is only ever at most one active `wandb.Run` in any process,
and it is accessible as `wandb.run`:
<!--yeadoc-test:global-run-object-->
```python
import wandb
assert wandb.run is None
wandb.init()
assert wandb.run is not None
```
anything you log with `wandb.log` will be sent to that run.
If you want to start more runs in the same script or notebook, you'll need to
finish the run that is in-flight. Runs can be finished with `wandb.finish` or
by using them in a `with` block:
<!--yeadoc-test:run-context-manager-->
```python
import wandb
wandb.init()
wandb.finish()
assert wandb.run is None
with wandb.init() as run:
pass # log data here
assert wandb.run is None
```
See the documentation for `wandb.init` for more on creating runs, or check out
[our guide to `wandb.init`](https://docs.wandb.ai/guides/track/launch).
In distributed training, you can either create a single run in the rank 0 process
and then log information only from that process, or you can create a run in each process,
logging from each separately, and group the results together with the `group` argument
to `wandb.init`. For more details on distributed training with W&B, check out
[our guide](https://docs.wandb.ai/guides/track/advanced/distributed-training).
Currently, there is a parallel `Run` object in the `wandb.Api`. Eventually these
two objects will be merged.
Attributes:
summary: (Summary) Single values set for each `wandb.log()` key. By
default, summary is set to the last value logged. You can manually
set summary to the best value, like max accuracy, instead of the
final value.
"""
_telemetry_obj: telemetry.TelemetryRecord
_telemetry_obj_active: bool
_telemetry_obj_dirty: bool
_telemetry_obj_flushed: bytes
_teardown_hooks: List[TeardownHook]
_tags: Optional[Tuple[Any, ...]]
_entity: Optional[str]
_project: Optional[str]
_group: Optional[str]
_job_type: Optional[str]
_name: Optional[str]
_notes: Optional[str]
_run_obj: Optional[RunRecord]
_run_obj_offline: Optional[RunRecord]
# Use string literal annotation because of type reference loop
_backend: Optional["wandb.sdk.backend.backend.Backend"]
_internal_run_interface: Optional[
Union[
"wandb.sdk.interface.interface_queue.InterfaceQueue",
"wandb.sdk.interface.interface_grpc.InterfaceGrpc",
]
]
_wl: Optional[_WandbSetup]
_out_redir: Optional[redirect.RedirectBase]
_err_redir: Optional[redirect.RedirectBase]
_redirect_cb: Optional[Callable[[str, str], None]]
_redirect_raw_cb: Optional[Callable[[str, str], None]]
_output_writer: Optional["filesystem.CRDedupedFile"]
_quiet: Optional[bool]
_atexit_cleanup_called: bool
_hooks: Optional[ExitHooks]
_exit_code: Optional[int]
_run_status_checker: Optional[RunStatusChecker]
_check_version: Optional["CheckVersionResponse"]
_sampled_history: Optional["SampledHistoryResponse"]
_final_summary: Optional["GetSummaryResponse"]
_poll_exit_response: Optional[PollExitResponse]
_stdout_slave_fd: Optional[int]
_stderr_slave_fd: Optional[int]
_artifact_slots: List[str]
_init_pid: int
_attach_pid: int
_iface_pid: Optional[int]
_iface_port: Optional[int]
_attach_id: Optional[str]
_is_attached: bool
_settings: Settings
_launch_artifacts: Optional[Dict[str, Any]]
    def __init__(
        self,
        settings: Settings,
        config: Optional[Dict[str, Any]] = None,
        sweep_config: Optional[Dict[str, Any]] = None,
        launch_config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Construct a Run and initialize its state from settings/configs."""
        # pid is set, so we know if this run object was initialized by this
        # process (used by _run_decorator to detect calls from forked children).
        self._init_pid = os.getpid()
        self._init(
            settings=settings,
            config=config,
            sweep_config=sweep_config,
            launch_config=launch_config,
        )
    def _init(
        self,
        settings: Settings,
        config: Optional[Dict[str, Any]] = None,
        sweep_config: Optional[Dict[str, Any]] = None,
        launch_config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize all run state; called from __init__.

        Order matters here: config callbacks must be wired before config
        values are applied, and settings-derived fields before sentry scope.
        """
        self._settings = settings
        # Wire up the config object's callbacks before any values flow in.
        self._config = wandb_config.Config()
        self._config._set_callback(self._config_callback)
        self._config._set_artifact_callback(self._config_artifact_callback)
        self._config._set_settings(self._settings)
        self._backend = None
        self._internal_run_interface = None
        # Summary mirrors the run's summary values; updates are forwarded
        # through the update callback.
        self.summary = wandb_summary.Summary(
            self._summary_get_current_summary_callback,
        )
        self.summary._set_update_callback(self._summary_update_callback)
        self._step = 0
        self._torch_history: Optional["wandb.wandb_torch.TorchHistory"] = None
        # todo: eventually would be nice to make this configurable using self._settings._start_time
        # need to test (jhr): if you set start time to 2 days ago and run a test for 15 minutes,
        # does the total time get calculated right (not as 2 days and 15 minutes)?
        self._start_time = time.time()
        _datatypes_set_callback(self._datatypes_callback)
        self._printer = get_printer(self._settings._jupyter)
        self._wl = None
        self._reporter: Optional[Reporter] = None
        # Run metadata; populated from settings below in _init_from_settings.
        self._entity = None
        self._project = None
        self._group = None
        self._job_type = None
        self._run_id = self._settings.run_id
        self._starting_step = 0
        self._name = None
        self._notes = None
        self._tags = None
        self._remote_url = None
        self._commit = None
        # Console/exit plumbing.
        self._hooks = None
        self._teardown_hooks = []
        self._out_redir = None
        self._err_redir = None
        self._stdout_slave_fd = None
        self._stderr_slave_fd = None
        self._exit_code = None
        self._exit_result = None
        self._quiet = self._settings.quiet
        self._code_artifact_info: Optional[Dict[str, str]] = None
        self._output_writer = None
        self._used_artifact_slots: Dict[str, str] = {}
        # Returned from backend request_run(), set from wandb_init?
        self._run_obj = None
        self._run_obj_offline = None
        # Created when the run "starts".
        self._run_status_checker = None
        self._check_version = None
        self._sampled_history = None
        self._final_summary = None
        self._poll_exit_response = None
        # Initialize telemetry object
        self._telemetry_obj = telemetry.TelemetryRecord()
        self._telemetry_obj_active = False
        self._telemetry_obj_flushed = b""
        self._telemetry_obj_dirty = False
        self._atexit_cleanup_called = False
        # Pull info from settings
        self._init_from_settings(self._settings)
        # Initial scope setup for sentry. This might get changed when the
        # actual run comes back.
        sentry_set_scope(
            settings_dict=self._settings,
            process_context="user",
        )
        # Populate config
        config = config or dict()
        wandb_key = "_wandb"
        config.setdefault(wandb_key, dict())
        self._launch_artifact_mapping: Dict[str, Any] = {}
        self._unique_launch_artifact_sequence_names: Dict[str, Any] = {}
        if self._settings.save_code and self._settings.program_relpath:
            config[wandb_key]["code_path"] = to_forward_slash_path(
                os.path.join("code", self._settings.program_relpath)
            )
        # Sweep and launch configs are "locked" so user edits are controlled.
        if sweep_config:
            self._config.update_locked(
                sweep_config, user="sweep", _allow_val_change=True
            )
        if launch_config:
            self._config.update_locked(
                launch_config, user="launch", _allow_val_change=True
            )
        self._config._update(config, ignore_locked=True)
        # interface pid and port configured when backend is configured (See _hack_set_run)
        # TODO: using pid isnt the best for windows as pid reuse can happen more often than unix
        self._iface_pid = None
        self._iface_port = None
        self._attach_id = None
        self._is_attached = False
        self._attach_pid = os.getpid()
        # for now, use runid as attach id, this could/should be versioned in the future
        if self._settings._require_service:
            self._attach_id = self._settings.run_id
def _set_iface_pid(self, iface_pid: int) -> None:
self._iface_pid = iface_pid
def _set_iface_port(self, iface_port: int) -> None:
self._iface_port = iface_port
def _handle_launch_artifact_overrides(self) -> None:
if self._settings.launch and (os.environ.get("WANDB_ARTIFACTS") is not None):
try:
artifacts: Dict[str, Any] = json.loads(
os.environ.get("WANDB_ARTIFACTS", "{}")
)
except (ValueError, SyntaxError):
wandb.termwarn("Malformed WANDB_ARTIFACTS, using original artifacts")
else:
self._initialize_launch_artifact_maps(artifacts)
elif (
self._settings.launch
and self._settings.launch_config_path
and os.path.exists(self._settings.launch_config_path)
):
self._save(self._settings.launch_config_path)
with open(self._settings.launch_config_path) as fp:
launch_config = json.loads(fp.read())
if launch_config.get("overrides", {}).get("artifacts") is not None:
artifacts = launch_config.get("overrides").get("artifacts")
self._initialize_launch_artifact_maps(artifacts)
def _initialize_launch_artifact_maps(self, artifacts: Dict[str, Any]) -> None:
for key, item in artifacts.items():
self._launch_artifact_mapping[key] = item
artifact_sequence_tuple_or_slot = key.split(":")
if len(artifact_sequence_tuple_or_slot) == 2:
sequence_name = artifact_sequence_tuple_or_slot[0].split("/")[-1]
if self._unique_launch_artifact_sequence_names.get(sequence_name):
self._unique_launch_artifact_sequence_names.pop(sequence_name)
else:
self._unique_launch_artifact_sequence_names[sequence_name] = item
    def _telemetry_callback(self, telem_obj: telemetry.TelemetryRecord) -> None:
        """Merge incoming telemetry into the run's record and flush it."""
        self._telemetry_obj.MergeFrom(telem_obj)
        self._telemetry_obj_dirty = True
        self._telemetry_flush()
    def _telemetry_flush(self) -> None:
        """Publish the telemetry record to the backend if it has changed.

        No-op unless telemetry is active, dirty, and a backend interface
        exists; skips the publish when the serialized record is identical to
        the last flushed bytes.
        """
        if not self._telemetry_obj_active:
            return
        if not self._telemetry_obj_dirty:
            return
        if self._backend and self._backend.interface:
            serialized = self._telemetry_obj.SerializeToString()
            # Nothing changed since last flush — avoid a redundant publish.
            if serialized == self._telemetry_obj_flushed:
                return
            self._backend.interface._publish_telemetry(self._telemetry_obj)
            self._telemetry_obj_flushed = serialized
            self._telemetry_obj_dirty = False
    def _freeze(self) -> None:
        # Once frozen, __setattr__ rejects any attribute not already present.
        self._frozen = True
def __setattr__(self, attr: str, value: object) -> None:
if getattr(self, "_frozen", None) and not hasattr(self, attr):
raise Exception(f"Attribute {attr} is not supported on Run object.")
super().__setattr__(attr, value)
def _update_settings(self, settings: Settings) -> None:
self._settings = settings
self._init_from_settings(settings)
def _init_from_settings(self, settings: Settings) -> None:
if settings.entity is not None:
self._entity = settings.entity
if settings.project is not None:
self._project = settings.project
if settings.run_group is not None:
self._group = settings.run_group
if settings.run_job_type is not None:
self._job_type = settings.run_job_type
if settings.run_name is not None:
self._name = settings.run_name
if settings.run_notes is not None:
self._notes = settings.run_notes
if settings.run_tags is not None:
self._tags = settings.run_tags
def _make_proto_run(self, run: RunRecord) -> None:
"""Populate protocol buffer RunData for interface/interface."""
if self._entity is not None:
run.entity = self._entity
if self._project is not None:
run.project = self._project
if self._group is not None:
run.run_group = self._group
if self._job_type is not None:
run.job_type = self._job_type
if self._run_id is not None:
run.run_id = self._run_id
if self._name is not None:
run.display_name = self._name
if self._notes is not None:
run.notes = self._notes
if self._tags is not None:
for tag in self._tags:
run.tags.append(tag)
if self._start_time is not None:
run.start_time.FromMicroseconds(int(self._start_time * 1e6))
if self._remote_url is not None:
run.git.remote_url = self._remote_url
if self._commit is not None:
run.git.commit = self._commit
# Note: run.config is set in interface/interface:_make_run()
def _populate_git_info(self) -> None:
# Use user provided git info if available otherwise resolve it from the environment
try:
repo = GitRepo(
root=self._settings.git_root,
remote=self._settings.git_remote,
remote_url=self._settings.git_remote_url,
commit=self._settings.git_commit,
lazy=False,
)
self._remote_url, self._commit = repo.remote_url, repo.last_commit
except Exception:
wandb.termwarn("Cannot find valid git repo associated with this directory.")
def __getstate__(self) -> Any:
"""Custom pickler."""
# We only pickle in service mode
if not self._settings or not self._settings._require_service:
return
_attach_id = self._attach_id
if not _attach_id:
return
return dict(_attach_id=self._attach_id, _init_pid=self._init_pid)
def __setstate__(self, state: Any) -> None:
"""Custom unpickler."""
if not state:
return
_attach_id = state.get("_attach_id")
if not _attach_id:
return
if state["_init_pid"] == os.getpid():
raise RuntimeError("attach in the same process is not supported currently")
self.__dict__.update(state)
@property
def _torch(self) -> "wandb.wandb_torch.TorchHistory":
if self._torch_history is None:
self._torch_history = wandb.wandb_torch.TorchHistory()
return self._torch_history
@property # type: ignore
@_run_decorator._attach
def settings(self) -> Settings:
"""Returns a frozen copy of run's Settings object."""
cp = self._settings.copy()
cp.freeze()
return cp
@property # type: ignore
@_run_decorator._attach
def dir(self) -> str:
"""Returns the directory where files associated with the run are saved."""
return self._settings.files_dir
@property # type: ignore
@_run_decorator._attach
def config(self) -> wandb_config.Config:
"""Returns the config object associated with this run."""
return self._config
@property # type: ignore
@_run_decorator._attach
def config_static(self) -> wandb_config.ConfigStatic:
return wandb_config.ConfigStatic(self._config)
@property # type: ignore
@_run_decorator._attach
def name(self) -> Optional[str]:
"""Returns the display name of the run.
Display names are not guaranteed to be unique and may be descriptive.
By default, they are randomly generated.
"""
if self._name:
return self._name
if not self._run_obj:
return None
return self._run_obj.display_name
@name.setter
def name(self, name: str) -> None:
with telemetry.context(run=self) as tel:
tel.feature.set_run_name = True
self._name = name
if self._backend and self._backend.interface:
self._backend.interface.publish_run(self)
@property # type: ignore
@_run_decorator._attach
def notes(self) -> Optional[str]:
"""Returns the notes associated with the run, if there are any.
Notes can be a multiline string and can also use markdown and latex equations
inside `$$`, like `$x + 3$`.
"""
if self._notes:
return self._notes
if not self._run_obj:
return None
return self._run_obj.notes
@notes.setter
def notes(self, notes: str) -> None:
self._notes = notes
if self._backend and self._backend.interface:
self._backend.interface.publish_run(self)
@property # type: ignore
@_run_decorator._attach
def tags(self) -> Optional[Tuple]:
"""Returns the tags associated with the run, if there are any."""
if self._tags:
return self._tags
run_obj = self._run_obj or self._run_obj_offline
if run_obj:
return tuple(run_obj.tags)
return None
@tags.setter
def tags(self, tags: Sequence) -> None:
with telemetry.context(run=self) as tel:
tel.feature.set_run_tags = True
self._tags = tuple(tags)
if self._backend and self._backend.interface:
self._backend.interface.publish_run(self)
@property # type: ignore
@_run_decorator._attach
def id(self) -> str:
"""Returns the identifier for this run."""
if TYPE_CHECKING:
assert self._run_id is not None
return self._run_id
@property # type: ignore
@_run_decorator._attach
def sweep_id(self) -> Optional[str]:
"""Returns the ID of the sweep associated with the run, if there is one."""
if not self._run_obj:
return None
return self._run_obj.sweep_id or None
def _get_path(self) -> str:
parts = [
e for e in [self._entity, self._project, self._run_id] if e is not None
]
return "/".join(parts)
@property # type: ignore
@_run_decorator._attach
def path(self) -> str:
"""Returns the path to the run.
Run paths include entity, project, and run ID, in the format
`entity/project/run_id`.
"""
return self._get_path()
def _get_start_time(self) -> float:
return (
self._start_time
if not self._run_obj
else (self._run_obj.start_time.ToMicroseconds() / 1e6)
)
@property # type: ignore
@_run_decorator._attach
def start_time(self) -> float:
"""Returns the unix time stamp, in seconds, when the run started."""
return self._get_start_time()
def _get_starting_step(self) -> int:
return self._starting_step if not self._run_obj else self._run_obj.starting_step
@property # type: ignore
@_run_decorator._attach
def starting_step(self) -> int:
"""Returns the first step of the run."""
return self._get_starting_step()
@property # type: ignore
@_run_decorator._attach
def resumed(self) -> bool:
"""Returns True if the run was resumed, False otherwise."""
return self._run_obj.resumed if self._run_obj else False
@property # type: ignore
@_run_decorator._attach
def step(self) -> int:
"""Returns the current value of the step.
This counter is incremented by `wandb.log`.
"""
return self._step
def project_name(self) -> str:
run_obj = self._run_obj or self._run_obj_offline
return run_obj.project if run_obj else ""
    @property  # type: ignore
    @_run_decorator._attach
    def mode(self) -> str:
        """For compatibility with `0.9.x` and earlier, deprecate eventually."""
        # Emits a deprecation warning on every access.
        deprecate.deprecate(
            field_name=deprecate.Deprecated.run__mode,
            warning_message=(
                "The mode property of wandb.run is deprecated "
                "and will be removed in a future release."
            ),
        )
        return "dryrun" if self._settings._offline else "run"
@property # type: ignore
@_run_decorator._attach
def offline(self) -> bool:
return self._settings._offline
@property # type: ignore
@_run_decorator._attach
def disabled(self) -> bool:
return self._settings._noop
def _get_group(self) -> str:
run_obj = self._run_obj or self._run_obj_offline
return run_obj.run_group if run_obj else ""
@property # type: ignore
@_run_decorator._attach
def group(self) -> str:
"""Returns the name of the group associated with the run.
Setting a group helps the W&B UI organize runs in a sensible way.
If you are doing a distributed training you should give all of the
runs in the training the same group.
If you are doing crossvalidation you should give all the crossvalidation
folds the same group.
"""
return self._get_group()
@property # type: ignore
@_run_decorator._attach
def job_type(self) -> str:
run_obj = self._run_obj or self._run_obj_offline
return run_obj.job_type if run_obj else ""
@property # type: ignore
@_run_decorator._attach
def project(self) -> str:
"""Returns the name of the W&B project associated with the run."""
return self.project_name()
@_run_decorator._attach
def log_code(
self,
root: str = ".",
name: str = None,
include_fn: Callable[[str], bool] = _is_py_path,
exclude_fn: Callable[[str], bool] = filenames.exclude_wandb_fn,
) -> Optional[Artifact]:
"""Saves the current state of your code to a W&B Artifact.
By default, it walks the current directory and logs all files that end with `.py`.
Arguments:
root: The relative (to `os.getcwd()`) or absolute path to recursively find code from.
name: (str, optional) The name of our code artifact. By default, we'll name
the artifact `source-$PROJECT_ID-$ENTRYPOINT_RELPATH`. There may be scenarios where you want
many runs to share the same artifact. Specifying name allows you to achieve that.
include_fn: A callable that accepts a file path and
returns True when it should be included and False otherwise. This
defaults to: `lambda path: path.endswith(".py")`
exclude_fn: A callable that accepts a file path and returns `True` when it should be
excluded and `False` otherwise. This defaults to: `lambda path: False`
Examples:
Basic usage
```python
run.log_code()
```
Advanced usage
```python
run.log_code(
"../", include_fn=lambda path: path.endswith(".py") or path.endswith(".ipynb")
)
```
Returns:
An `Artifact` object if code was logged
"""
if name is None:
name_string = wandb.util.make_artifact_name_safe(
f"{self._project}-{self._settings.program_relpath}"
)
name = f"source-{name_string}"
art = wandb.Artifact(name, "code")
files_added = False
if root is not None:
root = os.path.abspath(root)
for file_path in filenames.filtered_dir(root, include_fn, exclude_fn):
files_added = True
save_name = os.path.relpath(file_path, root)
art.add_file(file_path, name=save_name)
# Add any manually staged files such is ipynb notebooks
for dirpath, _, files in os.walk(self._settings._tmp_code_dir):
for fname in files:
file_path = os.path.join(dirpath, fname)
save_name = os.path.relpath(file_path, self._settings._tmp_code_dir)
files_added = True
art.add_file(file_path, name=save_name)
if not files_added:
return None
self._code_artifact_info = {"name": name, "client_id": art._client_id}
return self._log_artifact(art)
def get_url(self) -> Optional[str]:
"""Returns the url for the W&B run, if there is one.
Offline runs will not have a url.
"""
if self._settings._offline:
wandb.termwarn("URL not available in offline run")
return None
return self._settings.run_url
def get_project_url(self) -> Optional[str]:
"""Returns the url for the W&B project associated with the run, if there is one.
Offline runs will not have a project url.
"""