-
Notifications
You must be signed in to change notification settings - Fork 360
/
generic.py
172 lines (140 loc) · 5.74 KB
/
generic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
A JupyterHub authenticator class for use with any OAuth2 based identity provider.
"""
import os
from functools import reduce
from jupyterhub.auth import LocalAuthenticator
from jupyterhub.traitlets import Callable
from tornado.httpclient import AsyncHTTPClient
from traitlets import Bool, Dict, Set, Unicode, Union, default
from .oauth2 import OAuthenticator
class GenericOAuthenticator(OAuthenticator):
    """
    OAuthenticator for use with any OAuth2 based identity provider.

    On top of the base class this adds group handling driven by the user
    info returned from the provider:

    - `claim_groups_key` selects where group membership is read from.
    - `allowed_groups` lets members of selected groups sign in.
    - `admin_groups` grants (or revokes) JupyterHub admin status by group.
    """

    @default("login_service")
    def _login_service_default(self):
        """Return the login service name, overridable via LOGIN_SERVICE."""
        return os.environ.get("LOGIN_SERVICE", "OAuth 2.0")

    # Either a (possibly dotted) key name or a callable; the default key name
    # can be overridden with the OAUTH2_GROUPS_KEY environment variable.
    claim_groups_key = Union(
        [Unicode(os.environ.get('OAUTH2_GROUPS_KEY', 'groups')), Callable()],
        config=True,
        help="""
        Userdata groups claim key from returned json for USERDATA_URL.
        Can be a string key name (use periods for nested keys), or a callable
        that accepts the returned json (as a dict) and returns the groups list.
        This configures how group membership in the upstream provider is determined
        for use by `allowed_groups`, `admin_groups`, etc. If `manage_groups` is True,
        this will also determine users' _JupyterHub_ group membership.
        """,
    )

    allowed_groups = Set(
        Unicode(),
        config=True,
        help="""
        Allow members of selected groups to sign in.
        When configuring this you may need to configure `claim_groups_key` as
        well as it determines the key in the `userdata_url` response that is
        assumed to list the groups a user is a member of.
        """,
    )

    admin_groups = Set(
        Unicode(),
        config=True,
        help="""
        Allow members of selected groups to sign in and consider them as
        JupyterHub admins.
        If this is set and a user isn't part of one of these groups or listed in
        `admin_users`, a user signing in will have their admin status revoked.
        When configuring this you may need to configure `claim_groups_key` as
        well as it determines the key in the `userdata_url` response that is
        assumed to list the groups a user is a member of.
        """,
    )

    @default("http_client")
    def _default_http_client(self):
        """Return an HTTP client whose cert validation follows `validate_server_cert`."""
        # force_instance=True is required by tornado in order to pass
        # constructor arguments; it yields a dedicated client instead of the
        # shared pseudo-singleton.
        return AsyncHTTPClient(
            force_instance=True, defaults=dict(validate_cert=self.validate_server_cert)
        )

    # _deprecated_oauth_aliases is used by deprecation logic in OAuthenticator
    # Maps old trait name -> (new trait name, version of deprecation).
    _deprecated_oauth_aliases = {
        "username_key": ("username_claim", "16.0.0"),
        "extra_params": ("token_params", "16.0.0"),
        "tls_verify": ("validate_server_cert", "16.0.2"),
        **OAuthenticator._deprecated_oauth_aliases,
    }

    username_key = Union(
        [Unicode(), Callable()],
        config=True,
        help="""
        .. deprecated:: 16.0
        Use :attr:`username_claim`.
        """,
    )

    extra_params = Dict(
        config=True,
        help="""
        .. deprecated:: 16.0
        Use :attr:`token_params`.
        """,
    )

    tls_verify = Bool(
        config=True,
        help="""
        .. deprecated:: 16.0
        Use :attr:`validate_server_cert`.
        """,
    )

    def get_user_groups(self, user_info):
        """
        Returns a set of groups the user belongs to based on claim_groups_key
        and provided user_info.

        - If claim_groups_key is a callable, it is meant to return the groups
          directly. A return value of None is treated as "no groups".
        - If claim_groups_key is a nested dictionary key like
          "permissions.groups", this function returns
          user_info["permissions"]["groups"].
        - A claim holding a single string is treated as one group name rather
          than being split into its characters.

        If the key cannot be resolved in user_info (or the value found is not
        iterable), an error is logged and an empty set is returned.

        Note that this method is introduced by GenericOAuthenticator and not
        present in the base class.
        """
        if callable(self.claim_groups_key):
            groups = self.claim_groups_key(user_info)
            if groups is None:
                # fail closed: no groups means no group based access
                return set()
            return {groups} if isinstance(groups, str) else set(groups)

        try:
            # Walk the dotted path one key at a time. dict.get raises
            # TypeError as soon as an intermediate value isn't a dict, and
            # set(None) raises TypeError when the final key is missing.
            groups = reduce(dict.get, self.claim_groups_key.split("."), user_info)
            # set("admins") would produce a set of single characters
            return {groups} if isinstance(groups, str) else set(groups)
        except TypeError:
            self.log.error(
                f"The claim_groups_key {self.claim_groups_key} does not exist in the user token"
            )
            return set()

    async def update_auth_model(self, auth_model):
        """
        Sets admin status to True or False if `admin_groups` is configured and
        the user isn't part of `admin_users` or `admin_groups`. Note that
        leaving it at None makes users able to retain an admin status while
        setting it to False makes it be revoked.

        Also populates groups if `manage_groups` is set.

        Returns the (mutated) auth_model.
        """
        # The provider's groups are only looked up when something needs them.
        if self.manage_groups or self.admin_groups:
            user_info = auth_model["auth_state"][self.user_auth_state_key]
            user_groups = self.get_user_groups(user_info)

        if self.manage_groups:
            # sorted() turns the set into a list with a stable order
            auth_model["groups"] = sorted(user_groups)

        if auth_model["admin"]:
            # auth_model["admin"] being True means the user was in admin_users
            return auth_model

        if self.admin_groups:
            # admin status should in this case be True or False, not None
            auth_model["admin"] = bool(user_groups & self.admin_groups)

        return auth_model

    async def check_allowed(self, username, auth_model):
        """
        Overrides the OAuthenticator.check_allowed to also allow users part of
        `allowed_groups`.

        Returns True if the base class allows the user or if the user is a
        member of at least one of `allowed_groups`, otherwise False.
        """
        if await super().check_allowed(username, auth_model):
            return True

        if self.allowed_groups:
            user_info = auth_model["auth_state"][self.user_auth_state_key]
            user_groups = self.get_user_groups(user_info)
            # Test the intersection itself; any() would instead test the
            # truthiness of the individual group names.
            if user_groups & self.allowed_groups:
                return True

        # users should be explicitly allowed via config, otherwise they aren't
        return False
class LocalGenericOAuthenticator(LocalAuthenticator, GenericOAuthenticator):
    """
    GenericOAuthenticator combined with the LocalAuthenticator mixin, i.e. a
    version that mixes in local system user creation.
    """