-
-
Notifications
You must be signed in to change notification settings - Fork 211
/
cache.py
250 lines (209 loc) · 8.51 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
"""Helpers """
import errno
import hashlib
import json
import logging
import os
import os.path
import sys
from hashlib import md5
from typing import (
Callable,
Dict,
Hashable,
Iterable,
Optional,
TypeVar,
Union,
cast,
)
from filelock import FileLock
import requests
LOG = logging.getLogger(__name__)
_DID_LOG_UNABLE_TO_CACHE = False
T = TypeVar("T") # pylint: disable=invalid-name
def get_pkg_unique_identifier() -> str:
    """
    Build a string that uniquely identifies this python environment + tldextract release

    Combining the interpreter version, environment name, a short hash of the
    interpreter prefix, and the tldextract version keeps caches from separate
    virtualenvs (or separate tldextract installs) from clobbering each other.
    """
    try:
        # pylint: disable=import-outside-toplevel
        from tldextract._version import version
    except ImportError:
        version = "dev"

    python_version = ".".join(str(part) for part in sys.version_info[:-1])
    env_name = os.path.basename(sys.prefix)
    # just to handle the edge case of two identically named python environments
    prefix_hash = hashlib.md5(sys.prefix.encode("utf-8")).hexdigest()[:6]

    return "__".join(
        [
            python_version,
            env_name,
            prefix_hash,
            "tldextract-" + version,
        ]
    )
def get_cache_dir() -> str:
    """
    Get a cache dir that we have permission to write to

    Resolution order: the TLDEXTRACT_CACHE environment variable, then the XDG
    cache directory (per the freedesktop basedir spec,
    http://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html),
    and finally a folder inside the package directory itself.
    """
    explicit_dir = os.environ.get("TLDEXTRACT_CACHE")
    if explicit_dir is not None:
        return explicit_dir

    xdg_cache_home = os.getenv("XDG_CACHE_HOME")
    if xdg_cache_home is None:
        user_home = os.getenv("HOME")
        # derive the XDG default (~/.cache) when only HOME is available
        xdg_cache_home = os.path.join(user_home, ".cache") if user_home else None

    if xdg_cache_home is not None:
        return os.path.join(
            xdg_cache_home, "python-tldextract", get_pkg_unique_identifier()
        )

    # fallback to trying to use package directory itself
    return os.path.join(os.path.dirname(__file__), ".suffix_cache/")
class DiskCache:
    """Disk _cache that only works for jsonable values

    Values are stored one-per-file under ``cache_dir``, namespaced by
    subdirectory and keyed by an MD5 hash of the lookup key.  A falsy
    ``cache_dir`` disables the cache entirely.
    """

    def __init__(self, cache_dir: Optional[str], lock_timeout: int = 20):
        """
        :param cache_dir: directory to store cache files in; falsy disables caching
        :param lock_timeout: seconds ``run_and_cache`` waits for a file lock
        """
        self.enabled = bool(cache_dir)
        # Bugfix: was `str(cache_dir) or ""` — str(None) is the truthy string
        # "None", which gave a disabled cache a literal "None" directory path.
        self.cache_dir = os.path.expanduser(str(cache_dir or ""))
        self.lock_timeout = lock_timeout
        # using a unique extension provides some safety that an incorrectly set cache_dir
        # combined with a call to `.clear()` wont wipe someones hard drive
        self.file_ext = ".tldextract.json"

    def get(self, namespace: str, key: Union[str, Dict[str, Hashable]]) -> object:
        """Retrieve a value from the disk cache

        :raises KeyError: if caching is disabled, the entry is missing, or the
            cache file cannot be read or parsed
        """
        if not self.enabled:
            raise KeyError("Cache is disabled")
        cache_filepath = self._key_to_cachefile_path(namespace, key)

        if not os.path.isfile(cache_filepath):
            raise KeyError("namespace: " + namespace + " key: " + repr(key))
        try:
            # cache files are written by json.dump in `set`, so UTF-8 is safe
            with open(cache_filepath, encoding="utf-8") as cache_file:
                return json.load(cache_file)
        except (OSError, ValueError) as exc:
            # a corrupt or unreadable entry is treated the same as a miss
            LOG.error("error reading TLD cache file %s: %s", cache_filepath, exc)
            raise KeyError("namespace: " + namespace + " key: " + repr(key)) from None

    def set(
        self, namespace: str, key: Union[str, Dict[str, Hashable]], value: "T"
    ) -> None:
        """Set a value in the disk cache

        A write failure is logged (once per process), never raised.
        """
        if not self.enabled:
            return

        cache_filepath = self._key_to_cachefile_path(namespace, key)

        try:
            _make_dir(cache_filepath)
            with open(cache_filepath, "w", encoding="utf-8") as cache_file:
                json.dump(value, cache_file)
        except OSError as ioe:
            self._warn_unable_to_cache(namespace, key, cache_filepath, ioe)

    def clear(self) -> None:
        """Clear the disk cache

        Only files with this cache's unique extension (and their lock files)
        are removed, so a misconfigured ``cache_dir`` can't wipe unrelated data.
        """
        for root, _, files in os.walk(self.cache_dir):
            for filename in files:
                if filename.endswith(self.file_ext) or filename.endswith(
                    self.file_ext + ".lock"
                ):
                    try:
                        os.unlink(os.path.join(root, filename))
                    except FileNotFoundError:
                        pass
                    except OSError as exc:
                        # errno.ENOENT == "No such file or directory"
                        # https://docs.python.org/2/library/errno.html#errno.ENOENT
                        if exc.errno != errno.ENOENT:
                            raise

    def _key_to_cachefile_path(
        self, namespace: str, key: Union[str, Dict[str, Hashable]]
    ) -> str:
        """Map a (namespace, key) pair to its on-disk cache file path"""
        namespace_path = os.path.join(self.cache_dir, namespace)
        hashed_key = _make_cache_key(key)

        return os.path.join(namespace_path, hashed_key + self.file_ext)

    def run_and_cache(
        self,
        func: "Callable[..., T]",
        namespace: str,
        kwargs: Dict[str, Hashable],
        hashed_argnames: Iterable[str],
    ) -> "T":
        """Call ``func(**kwargs)``, caching its result on disk

        Only the ``hashed_argnames`` subset of ``kwargs`` participates in the
        cache key. A file lock serializes concurrent computation of the same
        key across processes. If the cache directory isn't writable, the call
        falls back to computing without caching.
        """
        if not self.enabled:
            return func(**kwargs)

        key_args = {k: v for k, v in kwargs.items() if k in hashed_argnames}
        cache_filepath = self._key_to_cachefile_path(namespace, key_args)
        lock_path = cache_filepath + ".lock"
        try:
            _make_dir(cache_filepath)
        except OSError as ioe:
            self._warn_unable_to_cache(namespace, key_args, cache_filepath, ioe)
            return func(**kwargs)

        # Disable lint of 3rd party (see also https://github.com/tox-dev/py-filelock/issues/102)
        # pylint: disable-next=abstract-class-instantiated
        with FileLock(lock_path, timeout=self.lock_timeout):
            try:
                result = cast("T", self.get(namespace=namespace, key=key_args))
            except KeyError:
                result = func(**kwargs)
                self.set(namespace=namespace, key=key_args, value=result)

            return result

    def cached_fetch_url(
        self, session: "requests.Session", url: str, timeout: Union[float, int, None]
    ) -> str:
        """Get a url but cache the response body, keyed by the url alone"""
        return self.run_and_cache(
            func=_fetch_url,
            namespace="urls",
            kwargs={"session": session, "url": url, "timeout": timeout},
            hashed_argnames=["url"],
        )

    @staticmethod
    def _warn_unable_to_cache(
        namespace: str, key: object, cache_filepath: str, exc: OSError
    ) -> None:
        """Log, once per process, that the cache directory isn't writable."""
        global _DID_LOG_UNABLE_TO_CACHE  # pylint: disable=global-statement
        if not _DID_LOG_UNABLE_TO_CACHE:
            LOG.warning(
                "unable to cache %s.%s in %s. This could refresh the "
                "Public Suffix List over HTTP every app startup. "
                "Construct your `TLDExtract` with a writable `cache_dir` or "
                "set `cache_dir=None` to silence this warning. %s",
                namespace,
                key,
                cache_filepath,
                exc,
            )
            _DID_LOG_UNABLE_TO_CACHE = True
def _fetch_url(session: requests.Session, url: str, timeout: Optional[int]) -> str:
    """Download ``url`` with ``session`` and return the response body as text

    ``response.raise_for_status()`` surfaces 4xx/5xx responses as exceptions.
    """
    response = session.get(url, timeout=timeout)
    response.raise_for_status()

    # defensively decode, in case a response subclass hands back raw bytes
    body = response.text
    if not isinstance(body, str):
        body = str(body, "utf-8")

    return body
def _make_cache_key(inputs: Union[str, Dict[str, Hashable]]) -> str:
    """Hash arbitrary repr-able cache inputs into a fixed-size hex string"""
    return md5(repr(inputs).encode("utf8")).hexdigest()
def _make_dir(filename: str) -> None:
    """Make the parent directory of ``filename`` if it doesn't already exist

    A bare filename with no directory component is a no-op.  (Previously
    ``os.makedirs("")`` would raise FileNotFoundError in that case.)
    """
    dirname = os.path.dirname(filename)
    if dirname and not os.path.exists(dirname):
        try:
            os.makedirs(dirname)
        except OSError as exc:  # Guard against concurrent-creation race
            if exc.errno != errno.EEXIST:
                raise