-
Notifications
You must be signed in to change notification settings - Fork 395
/
runtime_metrics.py
165 lines (130 loc) · 5.14 KB
/
runtime_metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import itertools
import os
from typing import ClassVar
from typing import Optional
from typing import Set
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ddtrace import Span
import attr
import ddtrace
from ddtrace.internal import forksafe
from .. import periodic
from ..dogstatsd import get_dogstatsd_client
from ..logger import get_logger
from .constants import DEFAULT_RUNTIME_METRICS
from .constants import DEFAULT_RUNTIME_TAGS
from .metric_collectors import GCRuntimeMetricCollector
from .metric_collectors import PSUtilRuntimeMetricCollector
from .tag_collectors import PlatformTagCollector
from .tag_collectors import TracerTagCollector
log = get_logger(__name__)
class RuntimeCollectorsIterable(object):
def __init__(self, enabled=None):
self._enabled = enabled or self.ENABLED
# Initialize the collectors.
self._collectors = [c() for c in self.COLLECTORS]
def __iter__(self):
collected = (collector.collect(self._enabled) for collector in self._collectors)
return itertools.chain.from_iterable(collected)
def __repr__(self):
return "{}(enabled={})".format(
self.__class__.__name__,
self._enabled,
)
class RuntimeTags(RuntimeCollectorsIterable):
ENABLED = DEFAULT_RUNTIME_TAGS
COLLECTORS = [
PlatformTagCollector,
TracerTagCollector,
]
class RuntimeMetrics(RuntimeCollectorsIterable):
ENABLED = DEFAULT_RUNTIME_METRICS
COLLECTORS = [
GCRuntimeMetricCollector,
PSUtilRuntimeMetricCollector,
]
def _get_interval_or_default():
return float(os.getenv("DD_RUNTIME_METRICS_INTERVAL", default=10))
@attr.s(eq=False)
class RuntimeWorker(periodic.PeriodicService):
"""Worker thread for collecting and writing runtime metrics to a DogStatsd
client.
"""
_interval = attr.ib(type=float, factory=_get_interval_or_default)
tracer = attr.ib(type=ddtrace.Tracer, default=None)
dogstatsd_url = attr.ib(type=Optional[str], default=None)
_dogstatsd_client = attr.ib(init=False, repr=False)
_runtime_metrics = attr.ib(factory=RuntimeMetrics, repr=False)
_services = attr.ib(type=Set[str], init=False, factory=set)
enabled = False
_instance = None # type: ClassVar[Optional[RuntimeWorker]]
_lock = forksafe.Lock()
def __attrs_post_init__(self):
# type: () -> None
self._dogstatsd_client = get_dogstatsd_client(self.dogstatsd_url or ddtrace.internal.agent.get_stats_url())
self.tracer = self.tracer or ddtrace.tracer
self.tracer.on_start_span(self._set_language_on_span)
def _set_language_on_span(
self,
span, # type: Span
):
# type: (...) -> None
# add tags to root span to correlate trace with runtime metrics
# only applied to spans with types that are internal to applications
if span.parent_id is None and self.tracer._is_span_internal(span):
span._set_str_tag("language", "python")
@classmethod
def disable(cls):
# type: () -> None
with cls._lock:
if cls._instance is None:
return
forksafe.unregister(cls._restart)
cls._instance.stop()
cls._instance.join()
cls._instance = None
cls.enabled = False
@classmethod
def _restart(cls):
cls.disable()
cls.enable()
@classmethod
def enable(cls, flush_interval=None, tracer=None, dogstatsd_url=None):
# type: (Optional[float], Optional[ddtrace.Tracer], Optional[str]) -> None
with cls._lock:
if cls._instance is not None:
return
if flush_interval is None:
flush_interval = _get_interval_or_default()
runtime_worker = cls(flush_interval, tracer, dogstatsd_url) # type: ignore[arg-type]
runtime_worker.start()
# force an immediate update constant tags
runtime_worker.update_runtime_tags()
forksafe.register(cls._restart)
cls._instance = runtime_worker
cls.enabled = True
def flush(self):
# type: () -> None
# The constant tags for the dogstatsd client needs to updated with any new
# service(s) that may have been added.
if self._services != self.tracer._services:
self._services = self.tracer._services
self.update_runtime_tags()
with self._dogstatsd_client:
for key, value in self._runtime_metrics:
log.debug("Writing metric %s:%s", key, value)
self._dogstatsd_client.distribution(key, value)
def _stop_service(self):
# type: (...) -> None
# De-register span hook
super(RuntimeWorker, self)._stop_service()
self.tracer.deregister_on_start_span(self._set_language_on_span)
def update_runtime_tags(self):
# type: () -> None
# DEV: ddstatsd expects tags in the form ['key1:value1', 'key2:value2', ...]
tags = ["{}:{}".format(k, v) for k, v in RuntimeTags()]
log.debug("Updating constant tags %s", tags)
self._dogstatsd_client.constant_tags = tags
periodic = flush
on_shutdown = flush