-
Notifications
You must be signed in to change notification settings - Fork 5
/
magpieprocess.py
494 lines (441 loc) · 27.7 KB
/
magpieprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
"""
Store adapters to read data from magpie.
"""
from magpie.utils import get_magpie_url, get_admin_cookies
from magpie.api.api_except import raise_http
from magpie.constants import get_constant
from magpie.common import get_logger, JSON_TYPE
from magpie.definitions.pyramid_definitions import (
HTTPOk,
HTTPCreated,
HTTPNotFound,
HTTPConflict,
HTTPNotImplemented,
asbool,
)
# import 'process' elements separately from 'twitcher_definitions' because they are not defined in master
# noinspection PyUnresolvedReferences
from twitcher.utils import get_twitcher_url
# noinspection PyUnresolvedReferences
from twitcher.config import get_twitcher_configuration, TWITCHER_CONFIGURATION_EMS
# noinspection PyUnresolvedReferences
from twitcher.exceptions import ProcessNotFound, ProcessRegistrationError
# noinspection PyUnresolvedReferences
from twitcher.adapter.default import DefaultAdapter
# noinspection PyUnresolvedReferences
from twitcher.store.base import ProcessStore
# noinspection PyUnresolvedReferences
from twitcher.visibility import VISIBILITY_PUBLIC, VISIBILITY_PRIVATE, visibility_values
# noinspection PyUnresolvedReferences
from twitcher.datatype import Process # noqa: F401
from typing import TYPE_CHECKING
import six
import requests
# Names below are only needed by the '# type:' comments; import them only while type checking.
if TYPE_CHECKING:
    from typing import List, Optional, Iterable, Union, AnyStr  # noqa: F401
    from magpie.definitions.pyramid_definitions import Registry  # noqa: F401

# Module-level logger obtained from magpie's logging helper.
LOGGER = get_logger("TWITCHER")
class MagpieProcessStore(ProcessStore):
    """
    Registry for OWS processes.

    Uses default process store for most operations.
    Uses magpie to update process access and visibility.
    """

    def __init__(self, registry):
        # type: (Registry) -> None
        """
        Initializes the store and ensures the base magpie configuration exists.

        Issues HTTP requests to magpie with the admin cookies in order to create (or reuse when already existing):

        - the ``self.magpie_service`` service and its ``processes`` child resource,
        - the editors group,
        - the editors/users group permissions on both of these resources.

        :param registry: application registry from which settings and URLs are retrieved.
        """
        super(MagpieProcessStore, self).__init__()
        self.magpie_url = get_magpie_url(registry)
        self.twitcher_ssl_verify = asbool(registry.settings.get('twitcher.ows_proxy_ssl_verify', True))
        self.magpie_admin_token = get_admin_cookies(self.magpie_url, self.twitcher_ssl_verify)
        self.magpie_admin_group = get_constant('MAGPIE_ADMIN_GROUP')
        self.magpie_users = get_constant('MAGPIE_USERS_GROUP')
        self.magpie_editors = get_constant('MAGPIE_EDITOR_GROUP')
        self.magpie_current = get_constant('MAGPIE_LOGGED_USER')
        self.magpie_service = 'ems'
        self.default_process_store = DefaultAdapter().processstore_factory(registry)
        self.twitcher_config = get_twitcher_configuration(registry.settings)
        self.twitcher_url = get_twitcher_url(registry.settings)
        self.json_headers = {'Accept': JSON_TYPE, 'Content-Type': JSON_TYPE}

        # setup basic configuration ('/ems' service of type 'api', '/ems/processes' resource, admin full permissions)
        ems_res_id = self._create_resource(self.magpie_service, resource_parent_id=None, resource_type='service',
                                           group_names=self.magpie_admin_group, permission_names=['read', 'write'],
                                           extra_data={'service_type': 'api', 'service_url': self.twitcher_url})
        proc_res_id = self._create_resource('processes', ems_res_id)  # admins inherit from parent service permissions

        # editors can read/write processes, but will only be able to modify visibility for 'their' (per user) process
        # permissions of each corresponding process have to be added for the requesting user when created
        self._create_group(self.magpie_editors)  # create in case it doesn't exist (non-standard group)
        self._create_resource_permissions(ems_res_id, ['read-match', 'write-match'], group_names=self.magpie_editors)
        self._create_resource_permissions(proc_res_id, ['read-match', 'write-match'], group_names=self.magpie_editors)

        # users can only read processes, but not edit/deploy/remove them (no PUT/POST/DELETE requests)
        self._create_resource_permissions(ems_res_id, 'read-match', group_names=self.magpie_users)
        self._create_resource_permissions(proc_res_id, 'read-match', group_names=self.magpie_users)

    @staticmethod
    def _as_list(value):
        # type: (Optional[Union[AnyStr, Iterable[AnyStr]]]) -> List[AnyStr]
        """Normalizes a single name, an iterable of names or an empty/``None`` value into a list of names."""
        if not value:
            return []
        if isinstance(value, six.string_types):
            return [value]
        return list(value)

    @staticmethod
    def _get_request_cookies(request):
        # type: (Optional[requests.Request]) -> Optional[dict]
        """
        Retrieves the cookies of ``request``, or ``None`` when no request (or no cookies attribute) is available.

        Public store methods accept ``request=None``; passing ``cookies=None`` to ``requests`` sends no cookie
        instead of failing with ``AttributeError`` on ``None.cookies``.
        """
        return getattr(request, 'cookies', None)

    def _find_resource_id(self, parent_resource_id, resource_name):
        # type: (Optional[int], AnyStr) -> int
        """
        Finds the resource id corresponding to a child ``resource_name`` of ``parent_resource_id``.
        If ``parent_resource_id`` is ``None`` (or otherwise falsy), suppose the resource is a ``service`` and
        search it by ``resource_name``.

        :param parent_resource_id: id of the resource from which to search children resources.
        :param resource_name: name of the sub resource to find.
        :return: found resource id
        :raises HTTPNotFound: (through ``raise_http``) if no child of the parent resource matches ``resource_name``.
        :raises requests.HTTPError: if magpie answers with an error status.
        """
        if not parent_resource_id:
            path = '{host}/services/{svc}'.format(host=self.magpie_url, svc=resource_name)
            resp = requests.get(path, cookies=self.magpie_admin_token,
                                headers=self.json_headers, verify=self.twitcher_ssl_verify)
            resp.raise_for_status()
            return resp.json()['service']['resource_id']

        path = '{host}/resources/{id}'.format(host=self.magpie_url, id=parent_resource_id)
        resp = requests.get(path, cookies=self.magpie_admin_token,
                            headers=self.json_headers, verify=self.twitcher_ssl_verify)
        resp.raise_for_status()
        parent_resource_info = resp.json()['resource']
        for child_resource in parent_resource_info['children'].values():
            if child_resource['resource_name'] == resource_name:
                return child_resource['resource_id']

        # reaching this point means that no child matched the requested name
        detail = "Could not find resource `{}` under resource `{}`." \
                 .format(resource_name, parent_resource_info['resource_name'])
        raise_http(httpError=HTTPNotFound, detail=detail)

    def _get_service_processes_resource(self):
        # type: () -> Optional[int]
        """
        Finds the magpie resource 'processes' corresponding to '/ems/processes'.

        :returns: id of the 'processes' resource, or ``None`` if the service has no such child resource.
        :raises ProcessNotFound: if the magpie response does not contain the expected service resources fields.
        """
        path = '{host}/resources'.format(host=self.magpie_url)
        resp = requests.get(path, cookies=self.magpie_admin_token,
                            headers=self.json_headers, verify=self.twitcher_ssl_verify)
        resp.raise_for_status()
        ems_resources = None
        try:
            ems_resources = resp.json()['resources']['api'][self.magpie_service]['resources']
            for res_id in ems_resources:
                if ems_resources[res_id]['resource_name'] == 'processes':
                    return ems_resources[res_id]['resource_id']
        except KeyError:
            LOGGER.debug("Content of `{}` service resources: `{!r}`.".format(self.magpie_service, ems_resources))
            raise ProcessNotFound("Could not find resource `processes` endpoint.")
        except Exception as ex:
            LOGGER.debug("Exception during `{}` resources retrieval: [{}]".format(self.magpie_service, repr(ex)))
            raise
        LOGGER.debug("Could not find resource: `processes`.")
        return None

    def _create_group(self, group_name):
        # type: (str) -> None
        """
        Creates group if it doesn't exist.

        A 'created' or 'conflict' (already existing) response are both considered successful.

        :param group_name: name of the group to create.
        """
        path = '{host}/groups'.format(host=self.magpie_url)
        resp = requests.post(path, cookies=self.magpie_admin_token, json={u'group_name': group_name},
                             headers=self.json_headers, verify=self.twitcher_ssl_verify)
        if resp.status_code not in (HTTPCreated.code, HTTPConflict.code):
            LOGGER.debug("Group `{}` creation or validation failed.".format(group_name))
            resp.raise_for_status()

    def _create_resource_permissions(self,
                                     resource_id,           # type: int
                                     permission_names,      # type: Union[AnyStr, List[AnyStr]]
                                     group_names=None,      # type: Optional[Union[AnyStr, List[AnyStr]]]
                                     user_names=None,       # type: Optional[Union[AnyStr, List[AnyStr]]]
                                     ):                     # type: (...) -> None
        """
        Creates user and/or group permission(s) on a resource.

        Every permission is applied for every specified user and group.

        :param resource_id: magpie id of the resource to apply permissions on.
        :param permission_names: permission(s) to apply to the resource.
        :param group_names: name of the group(s) for which to apply permissions, if any.
        :param user_names: name of the user(s) for which to apply permissions, if any.
        """
        user_group_tuples = [('users', user) for user in self._as_list(user_names)] + \
                            [('groups', group) for group in self._as_list(group_names)]
        for perm in self._as_list(permission_names):
            data = {u'permission_name': perm}
            for usr_grp, usr_grp_id in user_group_tuples:
                path = '{host}/{usr_grp}/{id}/resources/{res_id}/permissions' \
                       .format(host=self.magpie_url, usr_grp=usr_grp, id=usr_grp_id, res_id=resource_id)
                resp = requests.post(path, json=data, cookies=self.magpie_admin_token,
                                     headers=self.json_headers, verify=self.twitcher_ssl_verify)
                # permission is set if created or already exists
                if resp.status_code not in (HTTPCreated.code, HTTPConflict.code):
                    resp.raise_for_status()

    def _delete_resource_permissions(self,
                                     resource_id,           # type: int
                                     permission_names,      # type: Union[AnyStr, List[AnyStr]]
                                     group_names=None,      # type: Optional[Union[AnyStr, List[AnyStr]]]
                                     user_names=None,       # type: Optional[Union[AnyStr, List[AnyStr]]]
                                     ):                     # type: (...) -> None
        """
        Deletes user and/or group permission(s) on a resource.

        Every permission is removed for every specified user and group.

        :param resource_id: magpie id of the resource to remove permissions from.
        :param permission_names: permission(s) to remove from the resource.
        :param group_names: name of the group(s) for which to remove permissions, if any.
        :param user_names: name of the user(s) for which to remove permissions, if any.
        """
        user_group_tuples = [('users', user) for user in self._as_list(user_names)] + \
                            [('groups', group) for group in self._as_list(group_names)]
        for perm in self._as_list(permission_names):
            for usr_grp, usr_grp_id in user_group_tuples:
                path = '{host}/{usr_grp}/{id}/resources/{res_id}/permissions/{perm}' \
                       .format(host=self.magpie_url, usr_grp=usr_grp, id=usr_grp_id, res_id=resource_id, perm=perm)
                resp = requests.delete(path, cookies=self.magpie_admin_token,
                                       headers=self.json_headers, verify=self.twitcher_ssl_verify)
                # permission is not set if deleted or non existing
                if resp.status_code not in (HTTPOk.code, HTTPNotFound.code):
                    resp.raise_for_status()

    def _create_resource(self,
                         resource_name,             # type: AnyStr
                         resource_parent_id,        # type: Optional[int]
                         group_names=None,          # type: Optional[Union[AnyStr, Iterable[AnyStr]]]
                         permission_names=None,     # type: Optional[Union[AnyStr, Iterable[AnyStr]]]
                         resource_type='route',     # type: Optional[AnyStr]
                         extra_data=None,           # type: Optional[dict]
                         ):                         # type: (...) -> int
        """
        Creates a resource under another parent resource, and sets basic group permissions on it.
        If the resource already exists for some reason, use it instead of the created one, and apply permissions.

        :param resource_name: name of the resource to create.
        :param resource_parent_id: id of the parent resource under which to create `resource_name`.
        :param group_names: group name(s) for which to apply permissions to the created resource, if any.
        :param permission_names: group permissions to apply to the created resource, if any.
        :param resource_type: type of resource to be created (``'service'`` posts to the services route instead).
        :param extra_data: additional fields added to the request body when creating a ``'service'``.
        :returns: id of the created resource
        :raises ProcessRegistrationError: if the resource id cannot be resolved from magpie's response.
        """
        try:
            data = {u'parent_id': resource_parent_id, u'resource_name': resource_name, u'resource_type': resource_type}
            post_type = 'resources'
            if resource_type == 'service':
                post_type = 'services'
                data.update(extra_data or {})
                data.update({'service_name': resource_name})
            path = '{host}/{type}'.format(host=self.magpie_url, type=post_type)
            resp = requests.post(path, json=data, cookies=self.magpie_admin_token,
                                 headers=self.json_headers, verify=self.twitcher_ssl_verify)
            res_id = None
            if resp.status_code == HTTPCreated.code:
                if resource_type == 'service':
                    # service id is looked up by name rather than read from the creation response
                    res_id = self._find_resource_id(resource_parent_id, resource_name)
                else:
                    res_id = resp.json()['resource']['resource_id']
            elif resp.status_code == HTTPConflict.code:
                # already exists, reuse it
                res_id = self._find_resource_id(resource_parent_id, resource_name)
            else:
                resp.raise_for_status()
            if res_id is None:
                # unexpected non-error status: don't apply permissions on (nor return) an unresolved resource
                raise ProcessRegistrationError("Failed adding process resource route `{}`.".format(resource_name))
            if group_names is not None and permission_names is not None:
                self._create_resource_permissions(res_id, permission_names, group_names=group_names)
            return res_id
        except KeyError:
            raise ProcessRegistrationError("Failed adding process resource route `{}`.".format(resource_name))
        except Exception as ex:
            LOGGER.debug("Exception during process resource creation: [{}]".format(repr(ex)))
            raise

    def save_process(self, process, overwrite=True, request=None):
        # type: (Process, Optional[bool], Optional[requests.Request]) -> None
        """
        Save a new process.

        If twitcher is not in EMS mode, delegate execution to default twitcher process store.
        If twitcher is in EMS mode:
            - user requesting creation must have sufficient user/group permissions in magpie to do so.
              (otherwise, this code won't be reached because of :class:`MagpieOWSSecurity` blocking the create route.
            - assign any pre-required routes permissions to allow admins and current user to edit ``/ems/processes/...``

        Requirements:
            - service ``self.magpie_service`` of type ``'api'`` must exist (see ``__init__``)
            - group ``'administrators'`` must have ``['read', 'write']`` permissions on ``magpie_service``

        :param process: process to save, its ``id`` is used as magpie resource name.
        :param overwrite: forwarded to the default process store.
        :param request: request of the current user; when missing, no user permission is created (as anonymous).
        :returns: result of the default process store ``save_process``.
        """
        if self.twitcher_config == TWITCHER_CONFIGURATION_EMS:
            try:
                # get resource id of ems service
                path = '{host}/services/{svc}'.format(host=self.magpie_url, svc=self.magpie_service)
                resp = requests.get(path, cookies=self.magpie_admin_token,
                                    headers=self.json_headers, verify=self.twitcher_ssl_verify)
                resp.raise_for_status()
                ems_res_id = resp.json()['service']['resource_id']
            except KeyError:
                raise ProcessRegistrationError("Failed retrieving service resource.")
            except Exception as ex:
                LOGGER.debug("Exception during `{0}` resource retrieval: [{1}]".format(self.magpie_service, repr(ex)))
                raise

            # create resources of sub-routes '/{process_id}', '/{process_id}/jobs', '/{process_id}/quotations'
            # do not apply any users/editors permissions at first, so that the process is 'private' by default
            proc_res_id = self._find_resource_id(ems_res_id, 'processes')
            process_res_id = self._create_resource(process.id, proc_res_id)
            self._create_resource(u'jobs', process_res_id)
            self._create_resource(u'quotations', process_res_id)

            # current editor user is the only one allowed to edit his process (except admins), get his name from session
            resp = requests.get('{host}/session'.format(host=self.magpie_url),
                                cookies=self._get_request_cookies(request),  # current user
                                headers=self.json_headers, verify=self.twitcher_ssl_verify)
            resp.raise_for_status()
            try:
                user_name = resp.json()['user']['user_name']
            except KeyError:
                # If the request is anonymous do not create the permission
                user_name = None
            if user_name:
                self._create_resource_permissions(process_res_id, ['read', 'write'], user_names=user_name)

        return self.default_process_store.save_process(process, overwrite, request)

    def delete_process(self, process_id, visibility=None, request=None):
        # type: (AnyStr, Optional[AnyStr], Optional[requests.Request]) -> bool
        """
        Delete a process.

        Delegate execution to default twitcher process store.
        If twitcher is in EMS mode:
            - user requesting deletion must have user/group permissions in magpie to do so.
              (otherwise, this code won't be reached because of :class:`MagpieOWSSecurity` blocking the delete route.
            - also delete magpie resources tree corresponding to the process

        :param process_id: identifier of the process to delete.
        :param visibility: visibility filter forwarded to the default store, ignored if the user can see the process.
        :param request: request of the user asking for the deletion.
        :returns: result of the default process store ``delete_process``.
        """
        if self.twitcher_config == TWITCHER_CONFIGURATION_EMS:
            ems_processes_id = None
            try:
                ems_processes_id = self._get_service_processes_resource()
            except (ProcessNotFound, HTTPNotFound):
                pass

            # Without the '/ems/processes' resource there is nothing to clean up in magpie.
            # Do NOT call '_find_resource_id' with a missing parent id: it would look for (and then delete)
            # a *service* named like the process instead of a child resource.
            if ems_processes_id:
                try:
                    if self.is_visible_by_user(ems_processes_id, process_id, request):
                        # override to allow deletion of process if accessible by user regardless of 'visibility' setting
                        visibility = None

                    # search for resource, skip cleanup with raised http if not found
                    process_res_id = self._find_resource_id(ems_processes_id, process_id)

                    # delete top-resource, magpie should automatically handle deletion of all sub-resources/permissions
                    path = '{host}/resources/{id}'.format(host=self.magpie_url, id=process_res_id)
                    resp = requests.delete(path, cookies=self.magpie_admin_token,
                                           headers=self.json_headers, verify=self.twitcher_ssl_verify)
                    resp.raise_for_status()
                except HTTPNotFound:
                    # If for any reason the resource that we want to delete does not exist silently ignore it
                    pass

        return self.default_process_store.delete_process(process_id, visibility=visibility, request=request)

    def list_processes(self, visibility=None, request=None):
        # type: (Optional[bool], Optional[requests.Request]) -> List[Process]
        """
        List publicly visible processes according to the requesting user's user/group permissions.

        Delegate execution to default twitcher process store to retrieve visibility values per process.
        If twitcher is not in EMS mode, filter by only visible processes using specified ``visibility``.
        If twitcher is in EMS mode, filter processes according to magpie user and inherited group permissions.

        :param visibility: visibility filter, only employed when twitcher is not in EMS mode.
        :param request: request identifying the user for which processes are listed.
        :returns: processes visible by the user.
        """
        visibility_filter = visibility if self.twitcher_config != TWITCHER_CONFIGURATION_EMS else visibility_values
        processes = self.default_process_store.list_processes(visibility=visibility_filter, request=request)
        if self.twitcher_config == TWITCHER_CONFIGURATION_EMS:
            try:
                ems_processes_id = self._get_service_processes_resource()
                processes = [process for process in processes
                             if self.is_visible_by_user(ems_processes_id, process.id, request)]
            except KeyError:
                raise ProcessNotFound("Failed retrieving processes read permissions for listing.")
            except Exception as ex:
                LOGGER.debug("Exception during processes listing: [{}]".format(repr(ex)))
                raise
            LOGGER.debug("Found visible processes: {!s}.".format([process.id for process in processes]))
        return processes

    def fetch_by_id(self, process_id, visibility=None, request=None):
        # type: (AnyStr, Optional[AnyStr], Optional[requests.Request]) -> Union[Process, None]
        """
        Get a process if visible for user.

        Delegate operation to default twitcher process store.
        If twitcher is in EMS mode:
            - using twitcher proxy, magpie user/group permissions on corresponding resource (/ems/processes/{process_id})
              will automatically handle Ok/Unauthorized responses using the API route's read access.
            - ignore passed `visibility` if any and infer it from magpie user/group permissions instead

        :param process_id: identifier of the process to retrieve.
        :param visibility: visibility filter forwarded to the default store, ignored if the user can see the process.
        :param request: request identifying the user for which the process is fetched.
        :returns: result of the default process store ``fetch_by_id``.
        """
        if self.twitcher_config == TWITCHER_CONFIGURATION_EMS and visibility:
            ems_processes_id = self._get_service_processes_resource()
            # override to allow retrieval of process if accessible by user regardless of 'visibility' setting
            if self.is_visible_by_user(ems_processes_id, process_id, request):
                visibility = None
        return self.default_process_store.fetch_by_id(process_id, visibility=visibility, request=request)

    def is_admin_user(self, request):
        # type: (Optional[requests.Request]) -> bool
        """
        Verifies if a user has administrator level permissions.

        :param request: request from which cookies identify the user's session.
        :returns: ``True`` if the admin group is listed in the group names of the session's user.
        """
        resp = requests.get('{host}/session'.format(host=self.magpie_url),
                            cookies=self._get_request_cookies(request),
                            headers=self.json_headers, verify=self.twitcher_ssl_verify)
        return self.magpie_admin_group in resp.json().get('user', {}).get('group_names', [])

    def is_visible_by_user(self, ems_processes_id, process_id, request):
        # type: (Optional[int], AnyStr, Optional[requests.Request]) -> bool
        """
        Verifies if the user (according to cookies from the request) can see a given process by id.

        :param ems_processes_id: Magpie resource id corresponding to '/ems/processes'.
        :param process_id: identifier of the WPS process.
        :param request: request to identify which user the visibility permissions has to be tested for.
        :returns: ``True`` if the user has read access on the process resource or is an administrator.
        """
        # without the parent '/ems/processes' resource, the process resource cannot exist, so it is not visible
        # (a missing parent id must not reach '_find_resource_id', which would search a *service* by that name)
        if not ems_processes_id:
            return False

        # if the resource cannot be found, permissions are definitely not set, so it is not visible
        try:
            process_res_id = self._find_resource_id(ems_processes_id, process_id)
        except HTTPNotFound:
            return False

        # use inherited flag to consider both user and group permissions on the resource
        # use effective flag to ensure that permissions set on parent resources are cascaded down onto this process
        path = '{host}/users/{usr}/resources/{res}/permissions?inherit=true&effective=true' \
               .format(host=self.magpie_url, usr=self.magpie_current, res=process_res_id)
        resp = requests.get(path, cookies=self._get_request_cookies(request),
                            headers=self.json_headers, verify=self.twitcher_ssl_verify)
        resp.raise_for_status()
        perms = resp.json()['permission_names']

        # user/group must have inherited read/read-match permissions, or
        # be an administrator even if permissions are not set explicitly
        if 'read' not in perms and 'read-match' not in perms:
            return self.is_admin_user(request)
        return True

    def get_visibility(self, process_id, request=None):
        # type: (AnyStr, Optional[requests.Request]) -> AnyStr
        """
        Get visibility of a process.

        Delegate operation to default twitcher process store.
        If twitcher is in EMS mode:
            using twitcher proxy, only allowed users/groups can read '/ems/processes/{process_id}/visibility'
            any other level user will get unauthorized on this route

        :param process_id: identifier of the process.
        :param request: request forwarded to the default process store.
        :returns: result of the default process store ``get_visibility``.
        """
        return self.default_process_store.get_visibility(process_id, request=request)

    def set_visibility(self, process_id, visibility, request=None):
        # type: (AnyStr, AnyStr, Optional[requests.Request]) -> None
        """
        Set visibility of a process.

        Delegate change of process visibility to default twitcher process store.
        If twitcher is in EMS mode:
            using twitcher proxy, only allowed users/groups can write to '/ems/processes/{process_id}/visibility'
            modify magpie permissions of corresponding process access points according to desired visibility.

        :param process_id: identifier of the process.
        :param visibility: visibility to apply, magpie permissions are updated for private and public values.
        :param request: request forwarded to the default process store.
        :raises ProcessNotFound: if magpie resources corresponding to the process cannot be found.
        """
        if self.twitcher_config == TWITCHER_CONFIGURATION_EMS:
            try:
                # find resources corresponding to each route part of '/ems/processes/{id}/[jobs|quotations]'
                ems_processes_id = self._get_service_processes_resource()
                if not ems_processes_id:
                    # a missing parent id would make '_find_resource_id' search a *service* named like the process
                    raise ProcessNotFound("Could not find resource `processes` endpoint.")
                process_res_id = self._find_resource_id(ems_processes_id, process_id)
                jobs_res_id = self._find_resource_id(process_res_id, 'jobs')
                quotes_res_id = self._find_resource_id(process_res_id, 'quotations')
                groups = [self.magpie_users, self.magpie_editors]

                if visibility == VISIBILITY_PRIVATE:
                    # remove write-match permissions of groups on the process, cannot execute POST /jobs
                    self._delete_resource_permissions(jobs_res_id, u'write-match', group_names=groups)
                    # remove write permissions of groups, cannot request POST /quotations & /quotations/{id}
                    self._delete_resource_permissions(quotes_res_id, u'write', group_names=groups)
                    # remove group read permissions on the process, cannot GET any info from it, not even see it in list
                    self._delete_resource_permissions(process_res_id, u'read', group_names=groups)
                elif visibility == VISIBILITY_PUBLIC:
                    # read permission to groups to allow any sub-route GET requests (ex: '/ems/processes/{id}/jobs')
                    self._create_resource_permissions(process_res_id, u'read', group_names=groups)
                    # write permissions to groups to allow request POST /quotations & /quotations/{id}
                    self._create_resource_permissions(quotes_res_id, u'write', group_names=groups)
                    # write-match group permission so they can ONLY execute a job (cannot DELETE process, job, etc.)
                    self._create_resource_permissions(jobs_res_id, u'write-match', group_names=groups)
            except HTTPNotFound:
                raise ProcessNotFound("Could not find process `{}` jobs resource to set visibility.".format(process_id))
            except Exception as ex:
                LOGGER.debug("Exception when trying to set process visibility: [{}]".format(repr(ex)))
                raise

        # update visibility of process, which will also reflect changes to route permissions during 'list_processes'
        self.default_process_store.set_visibility(process_id, visibility=visibility, request=request)

    # noinspection PyMethodMayBeStatic, PyUnusedLocal
    def clear_processes(self, request=None):
        """Not supported by this store, always reports the operation as not implemented through ``raise_http``."""
        raise_http(httpError=HTTPNotImplemented, detail="Clear processes not supported via MagpieAdapter.")