/
credential.py
359 lines (297 loc) · 12.3 KB
/
credential.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Tencent Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import time
try:
# py3
import configparser
from urllib.parse import urlencode
from urllib.request import urlopen
except ImportError:
# py2
import ConfigParser as configparser
from urllib import urlencode
from urllib import urlopen
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.common_client import CommonClient
class Credential(object):
    """A static Tencent Cloud credential: secret id, secret key, optional token."""

    def __init__(self, secret_id, secret_key, token=None):
        """Validate and store the credential parts.

        Access https://console.cloud.tencent.com/cam/capi to manage your
        credentials.

        :param secret_id: The secret id of your credential.
        :type secret_id: str
        :param secret_key: The secret key of your credential.
        :type secret_key: str
        :param token: The federation token of your credential. If this field
            is specified, secret_id and secret_key should be set
            accordingly, see: https://cloud.tencent.com/document/product/598/13896
        :raises TencentCloudSDKException: If secret_id or secret_key is None,
            blank, or has leading/trailing whitespace.
        """
        # An absent value is treated like a blank one for the first check.
        trimmed_id = "" if secret_id is None else secret_id.strip()
        if trimmed_id == "":
            raise TencentCloudSDKException("InvalidCredential", "secret id should not be none or empty")
        if trimmed_id != secret_id:
            raise TencentCloudSDKException("InvalidCredential", "secret id should not contain spaces")
        self.secret_id = secret_id

        trimmed_key = "" if secret_key is None else secret_key.strip()
        if trimmed_key == "":
            raise TencentCloudSDKException("InvalidCredential", "secret key should not be none or empty")
        if trimmed_key != secret_key:
            raise TencentCloudSDKException("InvalidCredential", "secret key should not contain spaces")
        self.secret_key = secret_key

        self.token = token

    @property
    def secretId(self):
        """camelCase alias of ``secret_id``."""
        return self.secret_id

    @property
    def secretKey(self):
        """camelCase alias of ``secret_key``."""
        return self.secret_key
class CVMRoleCredential(object):
    """Temporary credential fetched over HTTP from the metadata endpoint.

    The secret id, secret key and token are exposed as properties; each
    property access first calls :meth:`update_credential`, which re-fetches
    the credential once the cached one is within ``_expired_timeout``
    seconds of its expiry timestamp.
    """

    _metadata_endpoint = "http://metadata.tencentyun.com/latest/meta-data/"
    _role_endpoint = _metadata_endpoint + "cam/security-credentials/"
    # In seconds.
    # Signature will fail if the request timestamp gap is larger than 300s,
    # so a strategy to refresh the token just before that time is acceptable.
    _expired_timeout = 300

    def __init__(self, role_name=None):
        """
        :param role_name: Role name to request credentials for. When omitted,
            it is read from the role endpoint on first use.
        :type role_name: str
        """
        self.role = role_name
        self._secret_id = None
        self._secret_key = None
        self._token = None
        # 0 forces a refresh on the first property access.
        self._expired_ts = 0

    @staticmethod
    def _fetch(url):
        """Return the body of *url* decoded as UTF-8, always closing the response."""
        resp = urlopen(url)
        try:
            # py3 requires a string rather than bytes for json.loads on old versions
            return resp.read().decode("utf8")
        finally:
            resp.close()

    @property
    def secretId(self):
        """camelCase alias of ``secret_id``."""
        return self.secret_id

    @property
    def secret_id(self):
        """Current temporary secret id (refreshed first if needed)."""
        self.update_credential()
        return self._secret_id

    @property
    def secretKey(self):
        """camelCase alias of ``secret_key``."""
        return self.secret_key

    @property
    def secret_key(self):
        """Current temporary secret key (refreshed first if needed)."""
        self.update_credential()
        return self._secret_key

    @property
    def token(self):
        """Current temporary token (refreshed first if needed)."""
        self.update_credential()
        return self._token

    def get_role_name(self):
        """Return the role name, reading it from the role endpoint if unset.

        :raises TencentCloudSDKException: ``ClientError.MetadataError`` when
            the role endpoint cannot be read.
        """
        if self.role:
            return self.role
        try:
            self.role = self._fetch(self._role_endpoint)
        except Exception as e:
            raise TencentCloudSDKException("ClientError.MetadataError", str(e))
        # Deliberately NOT returned from a ``finally`` clause: a ``return``
        # there would silently discard the exception raised above.
        return self.role

    def _need_refresh(self):
        """Return True when the cached credential is within the refresh window."""
        ts_remain = self._expired_ts - int(time.time())
        return ts_remain <= self._expired_timeout

    def update_credential(self):
        """Refresh the cached credential if it is missing or about to expire.

        This is best-effort: any failure (role lookup, HTTP, JSON, missing
        keys, non-"Success" code) is swallowed and the previously cached
        values are left untouched.
        """
        if not self._need_refresh():
            return
        try:
            role = self.get_role_name()
            # TODO: what if role has special characters such as space and unicode?
            data = self._fetch(self._role_endpoint + role)
            j = json.loads(data)
            if j.get("Code") != "Success":
                raise Exception("CVM role token data failed: %s" % data)
            # Read every field before assigning any, so a missing key cannot
            # leave a half-updated credential behind.
            secret_id = j["TmpSecretId"]
            secret_key = j["TmpSecretKey"]
            token = j["Token"]
            expired_ts = j["ExpiredTime"]
        except Exception:
            # Deliberate best-effort: callers see None / stale values instead
            # of an exception (get_credential relies on this).
            return
        self._secret_id = secret_id
        self._secret_key = secret_key
        self._token = token
        self._expired_ts = expired_ts

    def get_credential(self):
        """Return ``self`` when a complete credential is available, else None."""
        # Refresh once up front instead of once per property access; on a
        # host where the endpoint is unreachable that avoids repeated
        # failing HTTP round trips.
        self.update_credential()
        if not self._secret_id or not self._secret_key or not self._token:
            return None
        return self
class STSAssumeRoleCredential(object):
    """Credential that assumes a role through the STS ``AssumeRole`` action.

    Given a long-lived secret id/key and a role, it obtains temporary
    credentials automatically and exposes them through the usual
    ``secret_id`` / ``secret_key`` / ``token`` properties.
    """

    _region = "ap-guangzhou"
    _version = '2018-08-13'
    _service = "sts"

    def __init__(self, secret_id, secret_key, role_arn, role_session_name, duration_seconds=7200):
        """
        See https://cloud.tencent.com/document/api/1312/48197

        :param secret_id: Secret id used to call the API.
        :type secret_id: str
        :param secret_key: Secret key used to call the API.
        :type secret_key: str
        :param role_arn: Resource description of the role; see the RoleArn
            parameter in the document linked above.
        :type role_arn: str
        :param role_session_name: User-defined name of the temporary session.
        :type role_session_name: str
        :param duration_seconds: Validity period requested for the temporary
            credential, 7200s by default.
        :type duration_seconds: int
        """
        # Long-lived inputs.
        self._long_secret_id = secret_id
        self._long_secret_key = secret_key
        self._role_arn = role_arn
        self._role_session_name = role_session_name
        self._duration_seconds = duration_seconds
        # Cached temporary credential; filled in by get_sts_tmp_role_arn().
        self._tmp_secret_id = None
        self._tmp_secret_key = None
        self._token = None
        self._expired_time = 0
        # Not read anywhere in this class.
        self._last_role_arn = None
        self._tmp_credential = None

    @property
    def secret_id(self):
        """Temporary secret id (refreshed first if needed)."""
        self._need_refresh()
        return self._tmp_secret_id

    @property
    def secret_key(self):
        """Temporary secret key (refreshed first if needed)."""
        self._need_refresh()
        return self._tmp_secret_key

    @property
    def token(self):
        """Temporary session token (refreshed first if needed)."""
        self._need_refresh()
        return self._token

    # camelCase aliases of the properties above.
    secretId = secret_id
    secretKey = secret_key

    def _need_refresh(self):
        """Fetch a new temporary credential when the cached one is unusable.

        Despite its name this does not return a flag: it calls
        :meth:`get_sts_tmp_role_arn` when any cached field is still None or
        the stored refresh time has passed, and returns None either way.
        """
        cached = (self._token, self._tmp_secret_key, self._tmp_secret_id)
        if None not in cached and not self._expired_time < int(time.time()):
            return
        self.get_sts_tmp_role_arn()

    def get_sts_tmp_role_arn(self):
        """Call ``AssumeRole`` with the long-lived key and cache the result.

        The stored refresh time is the response's ``ExpiredTime`` minus 90%
        of ``duration_seconds``.
        """
        long_cred = Credential(self._long_secret_id, self._long_secret_key)
        client = CommonClient(
            credential=long_cred,
            region=self._region,
            version=self._version,
            service=self._service,
        )
        request = {
            "RoleArn": self._role_arn,
            "RoleSessionName": self._role_session_name,
            "DurationSeconds": self._duration_seconds
        }
        response = client.call_json("AssumeRole", request)["Response"]
        credentials = response["Credentials"]
        self._token = credentials["Token"]
        self._tmp_secret_id = credentials["TmpSecretId"]
        self._tmp_secret_key = credentials["TmpSecretKey"]
        self._expired_time = response["ExpiredTime"] - self._duration_seconds*0.9
class EnvironmentVariableCredential():
    """Credential provider backed by process environment variables."""

    def get_credential(self):
        """Build a credential from the environment.

        Access https://console.cloud.tencent.com/cam/capi to manage your
        credentials.

        The secret id is read from ``TENCENTCLOUD_SECRET_ID`` and the secret
        key from ``TENCENTCLOUD_SECRET_KEY``; both are also stored on the
        instance as ``secret_id`` / ``secret_key``.

        :return: A ``Credential``, or None when either variable is unset or
            empty.
        """
        self.secret_id = os.getenv('TENCENTCLOUD_SECRET_ID')
        self.secret_key = os.getenv('TENCENTCLOUD_SECRET_KEY')
        # Unset (None) and empty ("") are both treated as "not configured".
        if not self.secret_id or not self.secret_key:
            return None
        return Credential(self.secret_id, self.secret_key)
class ProfileCredential():
    """Credential provider backed by an ini-format profile file."""

    def get_credential(self):
        """Build a credential from the ``[default]`` section of a profile file.

        Access https://console.cloud.tencent.com/cam/capi to manage your credentials.

        The file is looked up at "~/.tencentcloud/credentials" first, then at
        "/etc/tencentcloud/credentials". It is ini format, such as:

            [default]
            secret_id=""
            secret_key=""

        The ``secret_id``, ``secret_key`` and ``role_arn`` options (each
        stripped of surrounding whitespace) are stored on the instance; they
        are None when the file, the section or the option is missing.

        :return: A ``Credential``, or None when secret_id or secret_key is
            missing or empty.
        """
        # Reset up front so every exit path leaves the attributes defined
        # (previously a file without a [default] section, or no file at all,
        # could reach the checks below with the attributes never assigned).
        self.secret_id = None
        self.secret_key = None
        self.role_arn = None

        # expanduser() does not raise when HOME is unset, unlike
        # os.environ['HOME'].
        candidates = (
            os.path.join(os.path.expanduser("~"), ".tencentcloud", "credentials"),
            "/etc/tencentcloud/credentials",
        )
        file_path = ""
        for candidate in candidates:
            if os.path.exists(candidate):
                file_path = candidate
                break

        if file_path:
            # loads config
            conf = configparser.ConfigParser()
            conf.read(file_path)
            if conf.has_section("default"):
                # raw=True: return values as written, without '%' interpolation.
                client_config = dict(
                    (key, value.strip())
                    for key, value in conf.items("default", raw=True)
                )
                self.secret_id = client_config.get('secret_id', None)
                self.secret_key = client_config.get('secret_key', None)
                self.role_arn = client_config.get('role_arn', None)

        if not self.secret_id or not self.secret_key:
            return None
        return Credential(self.secret_id, self.secret_key)
class DefaultCredentialProvider(object):
    """Tencent Cloud DefaultCredentialProvider.

    Searches for a credential in this order: EnvironmentVariableCredential,
    ProfileCredential, then CVMRoleCredential. The first credential found is
    cached on the instance and returned by later calls.
    """

    def __init__(self):
        # Cached result of the first successful lookup.
        self.cred = None

    def get_credentials(self):
        """Return the cached credential, or the first one a provider yields.

        :raises TencentCloudSDKException: ``ClientSideError`` when no
            provider returns a credential.
        """
        if self.cred is not None:
            return self.cred

        # Providers are tried strictly in this order; each is instantiated
        # only when the previous ones came back empty.
        provider_classes = (
            EnvironmentVariableCredential,
            ProfileCredential,
            CVMRoleCredential,
        )
        for provider_cls in provider_classes:
            self.cred = provider_cls().get_credential()
            if self.cred is not None:
                return self.cred

        raise TencentCloudSDKException("ClientSideError", "no valid credentail.")