-
Notifications
You must be signed in to change notification settings - Fork 13
/
http_client.py
119 lines (88 loc) · 3.7 KB
/
http_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import gzip
import json
import os
import platform
from requests import Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from launchable.version import __version__
from .authentication import get_org_workspace, authentication_headers
from .env_keys import BASE_URL_KEY
from .logger import Logger, AUDIT_LOG_FORMAT
# Default API endpoint, returned by get_base_url() when the environment
# variable named by BASE_URL_KEY is unset or empty.
DEFAULT_BASE_URL = "https://api.mercury.launchableinc.com"
def get_base_url():
    """Return the API base URL.

    The environment variable named by ``BASE_URL_KEY`` takes precedence;
    when it is unset or empty, ``DEFAULT_BASE_URL`` is returned.
    """
    override = os.getenv(BASE_URL_KEY)
    if override:
        return override
    return DEFAULT_BASE_URL
class DryRunResponse:
    """Canned response object handed back instead of performing a request.

    It offers the ``status_code`` attribute plus the ``raise_for_status()``
    and ``json()`` methods, so code that only uses those members can treat
    it like a real response.
    """

    def __init__(self, status_code, payload):
        """Remember the status code and the payload that ``json()`` returns."""
        self.payload = payload
        self.status_code = status_code

    def raise_for_status(self) -> None:
        """Do nothing: a dry-run response never signals an HTTP error."""
        return None

    def json(self):
        """Return the payload given at construction time, unchanged."""
        return self.payload
class LaunchableClient:
    """HTTP client for the workspace-scoped intake API.

    Requests are sent to
    ``<base_url>/intake/organizations/<organization>/workspaces/<workspace>/<sub_path>``
    with a JSON content type, a descriptive ``User-Agent`` and the headers
    returned by ``authentication_headers()``.
    """

    def __init__(self, base_url: str = "", session: Session = None, test_runner: str = "", dry_run: bool = False):
        """Create a client.

        Args:
            base_url: API root. When empty, ``get_base_url()`` is used.
            session: Session used to send requests. When ``None``, a new
                session with automatic retries is created.
            test_runner: Name appended to the ``User-Agent`` header as
                ``TestRunner/<name>``; nothing is appended when it is empty.
            dry_run: When true, requests other than GET/HEAD are not sent and
                a canned ``DryRunResponse`` is returned instead.

        Raises:
            ValueError: If ``get_org_workspace()`` yields ``None`` for the
                organization or the workspace.
        """
        self.base_url = base_url or get_base_url()
        self.dry_run = dry_run

        if session is None:
            # Retry on throttling / server-side errors with a backoff.
            # POST is not listed in allowed_methods, so it is not retried.
            strategy = Retry(
                total=3,
                allowed_methods=["GET", "PUT", "PATCH", "DELETE"],
                status_forcelist=[429, 500, 502, 503, 504],
                backoff_factor=2
            )
            adapter = HTTPAdapter(max_retries=strategy)
            s = Session()
            s.mount("http://", adapter)
            s.mount("https://", adapter)
            self.session = s
        else:
            self.session = session

        self.test_runner = test_runner

        self.organization, self.workspace = get_org_workspace()
        if self.organization is None or self.workspace is None:
            raise ValueError("Organization/workspace cannot be empty")

    def request(self, method, sub_path, payload=None, timeout=(5, 60), compress=False):
        """Send one request below the workspace URL and return the response.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            sub_path: Path appended to the workspace URL.
            payload: JSON-serializable request body, or ``None`` for no body.
            timeout: Forwarded unchanged as the session's ``timeout`` argument.
            compress: When true, the body is gzip-compressed and a
                ``Content-Encoding: gzip`` header is added.

        Returns:
            The response returned by the session, or a ``DryRunResponse`` when
            the client is in dry-run mode and the method is not GET/HEAD.

        Raises:
            Exception: If sending the request fails; the original error is
                chained as ``__cause__``.
        """
        url = _join_paths(self.base_url, "/intake/organizations/{}/workspaces/{}".format(
            self.organization, self.workspace), sub_path)

        headers = self._headers(compress)

        # NOTE(review): the audit entry contains the complete header dict,
        # including whatever authentication_headers() returns - confirm that
        # is acceptable for this log.
        Logger().audit(AUDIT_LOG_FORMAT.format(
            "(DRY RUN) " if self.dry_run else "", method, url, headers, payload))

        # Read-only methods are still executed during a dry run; everything
        # else is short-circuited with a canned payload.
        if self.dry_run and method.upper() not in ["HEAD", "GET"]:
            return DryRunResponse(status_code=200, payload={
                "id": "dry-run-session",  # used by `record session`
                "testPaths": [],  # used by `split_subset`
                "rest": [],  # used by `split_subset`
            })

        data = _build_data(payload, compress)

        # Only the network call is guarded, so a failure while logging the
        # response is not misreported as a failed request.
        try:
            response = self.session.request(
                method, url, headers=headers, timeout=timeout, data=data)
        except Exception as e:
            # Name the actual method instead of always claiming a POST.
            raise Exception(
                "unable to send {} request to {}".format(method, url)) from e

        Logger().debug(
            "received response status:{} message:{} headers:{}".format(
                response.status_code, response.reason, response.headers)
        )
        return response

    def _headers(self, compress):
        """Build the header dict for one request.

        Args:
            compress: When true, ``Content-Encoding: gzip`` is included.

        Returns:
            A new dict with ``User-Agent``, ``Content-Type`` and optionally
            ``Content-Encoding``, merged with ``authentication_headers()``
            (whose entries win on key collisions).
        """
        user_agent = "Launchable/{} (Python {}, {})".format(
            __version__, platform.python_version(), platform.platform())

        # Truthiness check: both "" and None mean "no test runner", so a None
        # value no longer yields a bogus "TestRunner/None" suffix.
        if self.test_runner:
            user_agent += " TestRunner/{}".format(self.test_runner)

        h = {
            "User-Agent": user_agent,
            "Content-Type": "application/json"
        }

        if compress:
            h["Content-Encoding"] = "gzip"

        return {**h, **authentication_headers()}
def _build_data(payload, compress):
    """Serialize ``payload`` into the bytes sent as the request body.

    Args:
        payload: JSON-serializable object, or ``None`` for "no body".
        compress: When true, the encoded JSON is gzip-compressed.

    Returns:
        ``None`` when ``payload`` is ``None``; otherwise the encoded JSON
        text as bytes, gzip-compressed if ``compress`` is true.
    """
    if payload is None:
        return None

    body = json.dumps(payload).encode()
    return gzip.compress(body) if compress else body
def _join_paths(*components):
    """Join URL fragments with single slashes.

    Leading and trailing ``/`` characters are stripped from every component
    before the components are concatenated with ``/`` between them.
    """
    trimmed = (part.strip('/') for part in components)
    return '/'.join(trimmed)