-
Notifications
You must be signed in to change notification settings - Fork 4k
/
client.py
152 lines (132 loc) · 5.68 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
This module provides utilities for performing Azure Blob Storage operations without requiring
the heavyweight azure-storage-blob library dependency
"""
from copy import deepcopy
import urllib
import logging
from mlflow.utils import rest_utils
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Default headers for Put Block requests: `put_block` starts from a copy of this dict and then
# adds the caller-supplied headers that pass `_is_valid_put_block_header`.
_PUT_BLOCK_HEADERS = {
"x-ms-blob-type": "BlockBlob",
}
def put_block(sas_url, block_id, data, headers):
    """
    Stage one block on an Azure Block Blob using the `Put Block` operation
    (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)

    :param sas_url: A shared access signature URL referring to the Azure Block Blob
                    to which the specified data should be staged.
    :param block_id: A base64-encoded string identifying the block.
    :param data: Data to send as the body of the Put Block request.
    :param headers: Additional headers to send with the Put Block request. Headers rejected by
                    `_is_valid_put_block_header` are dropped and reported at debug level
                    (the `x-ms-blob-type` header is always included automatically).
    """
    query_params = {"comp": "block", "blockid": block_id}
    request_url = _append_query_parameters(sas_url, query_params)

    # Work on a private copy so the module-level defaults are never mutated.
    request_headers = dict(_PUT_BLOCK_HEADERS)
    for header_name, header_value in headers.items():
        if not _is_valid_put_block_header(header_name):
            _logger.debug("Removed unsupported '%s' header for Put Block operation", header_name)
            continue
        request_headers[header_name] = header_value

    with rest_utils.cloud_storage_http_request(
        "put", request_url, data=data, headers=request_headers
    ) as response:
        # Raise if the service answered with an error status.
        rest_utils.augmented_raise_for_status(response)
def put_block_list(sas_url, block_list, headers):
    """
    Commit previously staged blocks using the Azure `Put Block List` operation
    (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)

    :param sas_url: A shared access signature URL referring to the Azure Block Blob
                    whose blocks should be committed.
    :param block_list: A list of uncommitted base64-encoded string block IDs to commit. For
                       more information, see
                       https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list.
    :param headers: Headers to send with the Put Block List request. Headers rejected by
                    `_is_valid_put_block_list_header` are dropped and reported at debug level.
    """
    request_url = _append_query_parameters(sas_url, {"comp": "blocklist"})
    # The request body is the XML document enumerating the block IDs to commit.
    data = _build_block_list_xml(block_list)

    request_headers = {}
    for header_name, header_value in headers.items():
        if not _is_valid_put_block_list_header(header_name):
            _logger.debug(
                "Removed unsupported '%s' header for Put Block List operation", header_name
            )
            continue
        request_headers[header_name] = header_value

    with rest_utils.cloud_storage_http_request(
        "put", request_url, data=data, headers=request_headers
    ) as response:
        # Raise if the service answered with an error status.
        rest_utils.augmented_raise_for_status(response)
def _append_query_parameters(url, parameters):
parsed_url = urllib.parse.urlparse(url)
query_dict = dict(urllib.parse.parse_qsl(parsed_url.query))
query_dict.update(parameters)
new_query = urllib.parse.urlencode(query_dict)
new_url_components = parsed_url._replace(query=new_query)
new_url = urllib.parse.urlunparse(new_url_components)
return new_url
def _build_block_list_xml(block_list):
xml = '<?xml version="1.0" encoding="utf-8"?>\n<BlockList>\n'
for block_id in block_list:
# Because block IDs are base64-encoded and base64 strings do not contain
# XML special characters, we can safely insert the block ID directly into
# the XML document
xml += "<Uncommitted>{}</Uncommitted>\n".format(block_id)
xml += "</BlockList>"
return xml
def _is_valid_put_block_list_header(header_name):
"""
:return: True if the specified header name is a valid header for the Put Block List operation,
False otherwise. For a list of valid headers, see https://docs.microsoft.com/en-us/
rest/api/storageservices/put-block-list#request-headers and https://docs.microsoft.com/
en-us/rest/api/storageservices/
specifying-conditional-headers-for-blob-service-operations#Subheading1.
"""
return header_name.startswith("x-ms-meta-") or header_name in set(
[
"Authorization",
"Date",
"x-ms-date",
"x-ms-version",
"Content-Length",
"Content-MD5",
"x-ms-content-crc64",
"x-ms-blob-cache-control",
"x-ms-blob-content-type",
"x-ms-blob-content-encoding",
"x-ms-blob-content-language",
"x-ms-blob-content-md5",
"x-ms-encryption-scope",
"x-ms-tags",
"x-ms-lease-id",
"x-ms-client-request-id",
"x-ms-blob-content-disposition",
"x-ms-access-tier",
"If-Modified-Since",
"If-Unmodified-Since",
"If-Match",
"If-None-Match",
]
)
def _is_valid_put_block_header(header_name):
"""
:return: True if the specified header name is a valid header for the Put Block operation, False
otherwise. For a list of valid headers, see
https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#request-headers and
https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#
request-headers-customer-provided-encryption-keys.
"""
return header_name in set(
[
"Authorization",
"x-ms-date",
"x-ms-version",
"Content-Length",
"Content-MD5",
"x-ms-content-crc64",
"x-ms-encryption-scope",
"x-ms-lease-id",
"x-ms-client-request-id",
"x-ms-encryption-key",
"x-ms-encryption-key-sha256",
"x-ms-encryption-algorithm",
]
)