-
Notifications
You must be signed in to change notification settings - Fork 395
/
http.py
250 lines (213 loc) · 8.67 KB
/
http.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# -*- encoding: utf-8 -*-
import binascii
import datetime
import gzip
import itertools
import json
import os
import platform
import typing
import attr
import six
from six.moves import http_client
import tenacity
import ddtrace
from ddtrace.internal import agent
from ddtrace.internal import runtime
from ddtrace.internal.runtime import container
from ddtrace.internal.utils import attr as attr_utils
from ddtrace.internal.utils.formats import parse_tags_str
from ddtrace.profiling import exporter
from ddtrace.profiling import recorder
from ddtrace.profiling.exporter import pprof
# Machine hostname, attached to every uploaded profile as the "host" tag.
HOSTNAME = platform.node()
# Python implementation name (e.g. "CPython"), pre-encoded to bytes for the multipart body.
PYTHON_IMPLEMENTATION = platform.python_implementation().encode()
# Python version string (e.g. "3.9.1"), pre-encoded to bytes for the multipart body.
PYTHON_VERSION = platform.python_version().encode()
class UploadFailed(tenacity.RetryError, exporter.ExportError):
    """Raised when every retry attempt to upload a profile has failed."""

    def __str__(self):
        # Surface the exception raised by the final retry attempt as the message.
        final_exception = self.last_attempt.exception()
        return str(final_exception)
@attr.s
class PprofHTTPExporter(pprof.PprofExporter):
    """PProf HTTP exporter.

    Serializes profiling events to the pprof format, gzips them, and uploads
    them over HTTP as a multipart/form-data POST — to the local Datadog agent
    by default, or with an ``DD-API-KEY`` header when an API key is set.
    """

    # repeat this to please mypy
    enable_code_provenance = attr.ib(default=True, type=bool)
    endpoint = attr.ib(type=str, factory=agent.get_trace_url)
    api_key = attr.ib(default=None, type=typing.Optional[str])
    # Do not use the default agent timeout: it is too short, the agent is just a unbuffered proxy and the profiling
    # backend is not as fast as the tracer one.
    # NOTE: resolved a leftover merge conflict here (both sides were the same
    # attribute, differing only in formatting); kept the multi-line form.
    timeout = attr.ib(
        factory=attr_utils.from_env("DD_PROFILING_API_TIMEOUT", 10.0, float),
        type=float,
    )
    service = attr.ib(default=None, type=typing.Optional[str])
    env = attr.ib(default=None, type=typing.Optional[str])
    version = attr.ib(default=None, type=typing.Optional[str])
    tags = attr.ib(factory=dict, type=typing.Dict[str, bytes])
    max_retry_delay = attr.ib(default=None)
    _container_info = attr.ib(factory=container.get_container_info, repr=False)
    _retry_upload = attr.ib(init=False, eq=False)
    endpoint_path = attr.ib(default="/profiling/v1/input")

    def __attrs_post_init__(self):
        # Default the total retry budget to 3x the per-request timeout.
        if self.max_retry_delay is None:
            self.max_retry_delay = self.timeout * 3
        self._retry_upload = tenacity.Retrying(
            # Retry after 1s, 2s, 4s, 8s with some randomness
            wait=tenacity.wait_random_exponential(multiplier=0.5),
            stop=tenacity.stop_after_delay(self.max_retry_delay),
            retry_error_cls=UploadFailed,
            retry=tenacity.retry_if_exception_type((http_client.HTTPException, OSError, IOError)),
        )
        # Build the final tag set. Later updates win on key collision:
        # environment tags < explicitly-passed tags < fixed runtime metadata.
        tags = {
            k: six.ensure_binary(v)
            for k, v in itertools.chain(
                parse_tags_str(os.environ.get("DD_TAGS")).items(),
                parse_tags_str(os.environ.get("DD_PROFILING_TAGS")).items(),
            )
        }
        tags.update({k: six.ensure_binary(v) for k, v in self.tags.items()})
        tags.update(
            {
                "host": HOSTNAME.encode("utf-8"),
                "language": b"python",
                "runtime": PYTHON_IMPLEMENTATION,
                "runtime_version": PYTHON_VERSION,
                "profiler_version": ddtrace.__version__.encode("ascii"),
            }
        )
        if self.version:
            tags["version"] = self.version.encode("utf-8")
        if self.env:
            tags["env"] = self.env.encode("utf-8")
        self.tags = tags

    @staticmethod
    def _encode_multipart_formdata(
        fields,  # type: typing.Dict[str, bytes]
        tags,  # type: typing.Dict[str, bytes]
        data,  # type: typing.Dict[bytes, bytes]
    ):
        # type: (...) -> typing.Tuple[bytes, bytes]
        """Encode a multipart/form-data request.

        :param fields: Simple form fields; the "chunk-data" key is skipped.
        :param tags: Tag name/value pairs, emitted as repeated ``tags[]`` parts
            formatted as ``name:value``.
        :param data: Attachments, emitted as ``data[<name>]`` file parts with
            an octet-stream content type.
        :return: A ``(content_type, body)`` tuple of bytes.
        """
        boundary = binascii.hexlify(os.urandom(16))

        # The body that is generated is very sensitive and must perfectly match what the server expects.
        body = (
            b"".join(
                b"--%s\r\n"
                b'Content-Disposition: form-data; name="%s"\r\n'
                b"\r\n"
                b"%s\r\n" % (boundary, field.encode(), value)
                for field, value in fields.items()
                if field != "chunk-data"
            )
            + b"".join(
                b"--%s\r\n"
                b'Content-Disposition: form-data; name="tags[]"\r\n'
                b"\r\n"
                b"%s:%s\r\n" % (boundary, tag.encode(), value)
                for tag, value in tags.items()
            )
            + b"".join(
                (
                    b'--%s\r\nContent-Disposition: form-data; name="data[%s]"; filename="%s"\r\n'
                    % (boundary, field_name, field_name)
                )
                + b"Content-Type: application/octet-stream\r\n\r\n"
                + field_data
                + b"\r\n"
                for field_name, field_data in data.items()
            )
            + b"--%s--" % boundary
        )

        content_type = b"multipart/form-data; boundary=%s" % boundary

        return content_type, body

    def _get_tags(
        self, service  # type: str
    ):
        # type: (...) -> typing.Dict[str, bytes]
        """Return the configured tags plus per-upload service/runtime-id tags."""
        tags = {
            "service": service.encode("utf-8"),
            "runtime-id": runtime.get_runtime_id().encode("ascii"),
        }

        tags.update(self.tags)

        return tags

    def export(
        self,
        events,  # type: recorder.EventsType
        start_time_ns,  # type: int
        end_time_ns,  # type: int
    ):
        # type: (...) -> typing.Tuple[pprof.pprof_ProfileType, typing.List[pprof.Package]]
        """Export events to an HTTP endpoint.

        :param events: The event dictionary from a `ddtrace.profiling.recorder.Recorder`.
        :param start_time_ns: The start time of recording.
        :param end_time_ns: The end time of recording.
        """
        if self.api_key:
            headers = {
                "DD-API-KEY": self.api_key.encode(),
            }
        else:
            headers = {}

        if self._container_info and self._container_info.container_id:
            headers["Datadog-Container-Id"] = self._container_info.container_id

        profile, libs = super(PprofHTTPExporter, self).export(events, start_time_ns, end_time_ns)
        # Renamed from ``pprof``: that name would shadow the imported ``pprof``
        # module within this method.
        pprof_buffer = six.BytesIO()
        with gzip.GzipFile(fileobj=pprof_buffer, mode="wb") as gz:
            gz.write(profile.SerializeToString())

        fields = {
            "version": b"3",
            "family": b"python",
            "runtime-id": runtime.get_runtime_id().encode("ascii"),
            # Timestamps are truncated to whole seconds and sent as ISO-8601 UTC.
            "start": (
                datetime.datetime.utcfromtimestamp(start_time_ns / 1e9).replace(microsecond=0).isoformat() + "Z"
            ).encode(),
            "end": (
                datetime.datetime.utcfromtimestamp(end_time_ns / 1e9).replace(microsecond=0).isoformat() + "Z"
            ).encode(),
        }

        # Fall back to the main binary's name when no service was configured.
        service = self.service or os.path.basename(profile.string_table[profile.mapping[0].filename])

        data = {b"auto.pprof": pprof_buffer.getvalue()}

        if self.enable_code_provenance:
            code_provenance = six.BytesIO()
            with gzip.GzipFile(fileobj=code_provenance, mode="wb") as gz:
                gz.write(
                    json.dumps(
                        {
                            "v1": libs,
                        }
                    ).encode("utf-8")
                )
            data[b"code-provenance.json"] = code_provenance.getvalue()

        content_type, body = self._encode_multipart_formdata(
            fields,
            tags=self._get_tags(service),
            data=data,
        )
        headers["Content-Type"] = content_type

        client = agent.get_connection(self.endpoint, self.timeout)
        self._upload(client, self.endpoint_path, body, headers)

        return profile, libs

    def _upload(self, client, path, body, headers):
        """Upload with retries; raises UploadFailed once the retry budget is spent."""
        self._retry_upload(self._upload_once, client, path, body, headers)

    def _upload_once(self, client, path, body, headers):
        """POST the body once and map the HTTP status to success or an error."""
        try:
            client.request("POST", path, body=body, headers=headers)
            response = client.getresponse()
            response.read()  # reading is mandatory
        finally:
            client.close()

        if 200 <= response.status < 300:
            return

        # 5xx is transient: signal tenacity to retry.
        if 500 <= response.status < 600:
            raise tenacity.TryAgain

        if response.status == 400:
            raise exporter.ExportError("Server returned 400, check your API key")
        elif response.status == 404 and not self.api_key:
            raise exporter.ExportError(
                "Datadog Agent is not accepting profiles. "
                "Agent-based profiling deployments require Datadog Agent >= 7.20"
            )

        raise exporter.ExportError("HTTP Error %d" % response.status)