-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
login.py
202 lines (168 loc) · 7.32 KB
/
login.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import base64
import json
import os
import pathlib
from dataclasses import dataclass
from enum import Enum
from time import sleep
from typing import Optional
from urllib.parse import urlencode
import click
import requests
import uvicorn
from fastapi import FastAPI, Query, Request
from starlette.background import BackgroundTask
from starlette.responses import RedirectResponse
from lightning_app.core.constants import get_lightning_cloud_url, LIGHTNING_CREDENTIAL_PATH
from lightning_app.utilities.app_helpers import Logger
from lightning_app.utilities.network import find_free_network_port
logger = Logger(__name__)
class Keys(Enum):
USERNAME = "LIGHTNING_USERNAME"
USER_ID = "LIGHTNING_USER_ID"
API_KEY = "LIGHTNING_API_KEY"
@property
def suffix(self):
return self.value.lstrip("LIGHTNING_").lower()
@dataclass
class Auth:
username: Optional[str] = None
user_id: Optional[str] = None
api_key: Optional[str] = None
secrets_file = pathlib.Path(LIGHTNING_CREDENTIAL_PATH)
def __post_init__(self):
for key in Keys:
setattr(self, key.suffix, os.environ.get(key.value, None))
self._with_env_var = bool(self.user_id and self.api_key) # used by authenticate method
if self._with_env_var:
self.save("", self.user_id, self.api_key, self.user_id)
logger.info("Credentials loaded from environment variables")
elif self.api_key or self.user_id:
raise ValueError(
"To use env vars for authentication both "
f"{Keys.USER_ID.value} and {Keys.API_KEY.value} should be set."
)
def load(self) -> bool:
"""Load credentials from disk and update properties with credentials.
Returns
----------
True if credentials are available.
"""
if not self.secrets_file.exists():
logger.debug("Credentials file not found.")
return False
with self.secrets_file.open() as creds:
credentials = json.load(creds)
for key in Keys:
setattr(self, key.suffix, credentials.get(key.suffix, None))
return True
def save(self, token: str = "", user_id: str = "", api_key: str = "", username: str = "") -> None:
"""save credentials to disk."""
self.secrets_file.parent.mkdir(exist_ok=True, parents=True)
with self.secrets_file.open("w") as f:
json.dump(
{
f"{Keys.USERNAME.suffix}": username,
f"{Keys.USER_ID.suffix}": user_id,
f"{Keys.API_KEY.suffix}": api_key,
},
f,
)
self.username = username
self.user_id = user_id
self.api_key = api_key
logger.debug("credentials saved successfully")
@classmethod
def clear(cls) -> None:
"""remove credentials from disk and env variables."""
if cls.secrets_file.exists():
cls.secrets_file.unlink()
for key in Keys:
os.environ.pop(key.value, None)
logger.debug("credentials removed successfully")
@property
def auth_header(self) -> Optional[str]:
"""authentication header used by lightning-cloud client."""
if self.api_key:
token = f"{self.user_id}:{self.api_key}"
return f"Basic {base64.b64encode(token.encode('ascii')).decode('ascii')}" # E501
raise AttributeError(
"Authentication Failed, no authentication header available. "
"This is most likely a bug in the LightningCloud Framework"
)
def _run_server(self) -> None:
"""start a server to complete authentication."""
AuthServer().login_with_browser(self)
def authenticate(self) -> Optional[str]:
"""Performs end to end authentication flow.
Returns
----------
authorization header to use when authentication completes.
"""
if self._with_env_var:
logger.debug("successfully loaded credentials from env")
return self.auth_header
if not self.load():
logger.debug("failed to load credentials, opening browser to get new.")
self._run_server()
return self.auth_header
elif self.user_id and self.api_key:
return self.auth_header
raise ValueError(
"We couldn't find any credentials linked to your account. "
"Please try logging in using the CLI command `lightning login`"
)
class AuthServer:
@staticmethod
def get_auth_url(port: int) -> str:
redirect_uri = f"http://localhost:{port}/login-complete"
params = urlencode(dict(redirectTo=redirect_uri))
return f"{get_lightning_cloud_url()}/sign-in?{params}"
def login_with_browser(self, auth: Auth) -> None:
app = FastAPI()
port = find_free_network_port()
url = self.get_auth_url(port)
try:
# check if server is reachable or catch any network errors
requests.head(url)
except requests.ConnectionError as e:
raise requests.ConnectionError(
f"No internet connection available. Please connect to a stable internet connection \n{e}" # E501
)
except requests.RequestException as e:
raise requests.RequestException(
f"An error occurred with the request. Please report this issue to Lightning Team \n{e}" # E501
)
logger.info(
"\nAttempting to automatically open the login page in your default browser.\n"
'If the browser does not open, navigate to the "Keys" tab on your Lightning AI profile page:\n\n'
f"{get_lightning_cloud_url()}/me/keys\n\n"
'Copy the "Headless CLI Login" command, and execute it in your terminal.\n'
)
click.launch(url)
@app.get("/login-complete")
async def save_token(request: Request, token="", key="", user_id: str = Query("", alias="userID")):
async def stop_server_once_request_is_done():
while not await request.is_disconnected():
sleep(0.25)
server.should_exit = True
if not token:
logger.warn(
"Login Failed. This is most likely because you're using an older version of the CLI. \n" # noqa E501
"Please try to update the CLI or open an issue with this information \n" # E501
f"expected token in {request.query_params.items()}"
)
return RedirectResponse(
url=f"{get_lightning_cloud_url()}/cli-login-failed",
background=BackgroundTask(stop_server_once_request_is_done),
)
auth.save(token=token, username=user_id, user_id=user_id, api_key=key)
logger.info("Login Successful")
# Include the credentials in the redirect so that UI will also be logged in
params = urlencode(dict(token=token, key=key, userID=user_id))
return RedirectResponse(
url=f"{get_lightning_cloud_url()}/cli-login-successful?{params}",
background=BackgroundTask(stop_server_once_request_is_done),
)
server = uvicorn.Server(config=uvicorn.Config(app, port=port, log_level="error"))
server.run()