-
-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
fscache.py
285 lines (245 loc) · 10.3 KB
/
fscache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""Interface for accessing the file system with automatic caching.
The idea is to cache the results of any file system state reads during
a single transaction. This has two main benefits:
* This avoids redundant syscalls, as we won't perform the same OS
operations multiple times.
* This makes it easier to reason about concurrent FS updates, as different
operations targeting the same paths can't report different state during
a transaction.
Note that this only deals with reading state, not writing.
Properties maintained by the API:
* The contents of the file are always from the same or later time compared
to the reported mtime of the file, even if mtime is queried after reading
a file.
* Repeating an operation produces the same result as the first one during
a transaction.
* Call flush() to start a new transaction (flush the caches).
The API is a bit limited. It's easy to add new cached operations, however.
You should perform all file system reads through the API to actually take
advantage of the benefits.
"""
import os
import stat
from typing import Dict, List, Set
from mypy.util import hash_digest
from mypy_extensions import mypyc_attr
@mypyc_attr(allow_interpreted_subclasses=True)  # for tests
class FileSystemCache:
    """Cache of file-system state read during a single transaction.

    All read-only OS queries (stat, listdir, file reads, content hashes)
    are cached — including the errors they raise — so that repeating an
    operation within a transaction returns the same result and redundant
    syscalls are avoided.  Call flush() to start a new transaction and
    discard the cached state.
    """
    def __init__(self) -> None:
        # The package root is not flushed with the caches.
        # It is set by set_package_root() below.
        self.package_root = []  # type: List[str]
        self.flush()
    def set_package_root(self, package_root: List[str]) -> None:
        """Set the --package-root directories (Bazel support).

        Unlike the other state, this survives flush().
        """
        self.package_root = package_root
    def flush(self) -> None:
        """Start another transaction and empty all caches."""
        self.stat_cache = {}  # type: Dict[str, os.stat_result]
        self.stat_error_cache = {}  # type: Dict[str, OSError]
        self.listdir_cache = {}  # type: Dict[str, List[str]]
        self.listdir_error_cache = {}  # type: Dict[str, OSError]
        self.isfile_case_cache = {}  # type: Dict[str, bool]
        self.read_cache = {}  # type: Dict[str, bytes]
        self.read_error_cache = {}  # type: Dict[str, Exception]
        self.hash_cache = {}  # type: Dict[str, str]
        # Directories in which we pretend an empty __init__.py exists
        # (primed by _fake_init()).
        self.fake_package_cache = set()  # type: Set[str]
    def stat(self, path: str) -> os.stat_result:
        """Stat path, caching both the result and any OSError raised.

        For a missing __init__.py under a --package-root directory, a
        fake stat result for an empty file is synthesized instead of
        raising (see init_under_package_root()/_fake_init()).
        """
        if path in self.stat_cache:
            return self.stat_cache[path]
        if path in self.stat_error_cache:
            # Re-raise a fresh copy so the cached error never accumulates
            # a traceback.
            raise copy_os_error(self.stat_error_cache[path])
        try:
            st = os.stat(path)
        except OSError as err:
            if self.init_under_package_root(path):
                try:
                    return self._fake_init(path)
                except OSError:
                    pass
            # Take a copy to get rid of associated traceback and frame objects.
            # Just assigning to __traceback__ doesn't free them.
            self.stat_error_cache[path] = copy_os_error(err)
            raise err
        self.stat_cache[path] = st
        return st
    def init_under_package_root(self, path: str) -> bool:
        """Is this path an __init__.py under a package root?

        This is used to detect packages that don't contain __init__.py
        files, which is needed to support Bazel.  The function should
        only be called for non-existing files.

        It will return True if it refers to a __init__.py file that
        Bazel would create, so that at runtime Python would think the
        directory containing it is a package.  For this to work you
        must pass one or more package roots using the --package-root
        flag.

        As an exceptional case, any directory that is a package root
        itself will not be considered to contain a __init__.py file.
        This is different from the rules Bazel itself applies, but is
        necessary for mypy to properly distinguish packages from other
        directories.

        See https://docs.bazel.build/versions/master/be/python.html,
        where this behavior is described under legacy_create_init.
        """
        if not self.package_root:
            return False
        dirname, basename = os.path.split(path)
        if basename != '__init__.py':
            return False
        try:
            # The containing directory must already exist for a fake
            # __init__.py to make sense.
            st = self.stat(dirname)
        except OSError:
            return False
        else:
            if not stat.S_ISDIR(st.st_mode):
                return False
        ok = False
        drive, path = os.path.splitdrive(path)  # Ignore Windows drive name
        if os.path.isabs(path):
            path = os.path.relpath(path)
        path = os.path.normpath(path)
        for root in self.package_root:
            # NOTE(review): roots appear to be expected to end in a path
            # separator (or be ''), since root + basename below must form
            # the full path of the __init__.py -- confirm against the
            # --package-root normalization in the caller.
            if path.startswith(root):
                if path == root + basename:
                    # A package root itself is never a package.
                    ok = False
                    break
                else:
                    ok = True
        return ok
    def _fake_init(self, path: str) -> os.stat_result:
        """Prime the cache with a fake __init__.py file.

        This makes code that looks for path believe an empty file by
        that name exists.  Should only be called after
        init_under_package_root() returns True.
        """
        dirname, basename = os.path.split(path)
        assert basename == '__init__.py', path
        assert not os.path.exists(path), path  # Not cached!
        dirname = os.path.normpath(dirname)
        st = self.stat(dirname)  # May raise OSError
        # Get stat result as a sequence so we can modify it.
        # (Alas, typeshed's os.stat_result is not a sequence yet.)
        tpl = tuple(st)  # type: ignore[arg-type, var-annotated]
        seq = list(tpl)  # type: List[float]
        # Reuse the directory's stat entry but present it as a read-only,
        # empty regular file.
        seq[stat.ST_MODE] = stat.S_IFREG | 0o444
        seq[stat.ST_INO] = 1
        seq[stat.ST_NLINK] = 1
        seq[stat.ST_SIZE] = 0
        tpl = tuple(seq)
        st = os.stat_result(tpl)
        self.stat_cache[path] = st
        # Make listdir() and read() also pretend this file exists.
        self.fake_package_cache.add(dirname)
        return st
    def listdir(self, path: str) -> List[str]:
        """List directory contents, caching both results and OSErrors.

        Directories primed by _fake_init() get an extra '__init__.py'
        entry appended to the (cached) result.
        """
        path = os.path.normpath(path)
        if path in self.listdir_cache:
            res = self.listdir_cache[path]
            # Check the fake cache.
            if path in self.fake_package_cache and '__init__.py' not in res:
                res.append('__init__.py')  # Updates the result as well as the cache
            return res
        if path in self.listdir_error_cache:
            raise copy_os_error(self.listdir_error_cache[path])
        try:
            results = os.listdir(path)
        except OSError as err:
            # Like above, take a copy to reduce memory use.
            self.listdir_error_cache[path] = copy_os_error(err)
            raise err
        self.listdir_cache[path] = results
        # Check the fake cache.
        if path in self.fake_package_cache and '__init__.py' not in results:
            results.append('__init__.py')
        return results
    def isfile(self, path: str) -> bool:
        """Return whether path exists and is a regular file (cached)."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISREG(st.st_mode)
    def isfile_case(self, path: str, prefix: str) -> bool:
        """Return whether path exists and is a file.

        On case-insensitive filesystems (like Mac or Windows) this returns
        False if the case of path's last component does not exactly match
        the case found in the filesystem.

        We check also the case of other path components up to prefix.
        For example, if path is 'user-stubs/pack/mod.pyi' and prefix is
        'user-stubs', we check that the case of 'pack' and 'mod.pyi' matches
        exactly; 'user-stubs' will be case insensitive on case insensitive
        filesystems.

        The caller must ensure that prefix is a valid file system prefix of
        path.
        """
        if path in self.isfile_case_cache:
            return self.isfile_case_cache[path]
        head, tail = os.path.split(path)
        if not tail:
            res = False
        else:
            try:
                names = self.listdir(head)
                # This allows one to check file name case sensitively in
                # case-insensitive filesystems.
                res = tail in names and self.isfile(path)
            except OSError:
                res = False
        # Also check the other path components in case sensitive way.
        head, dir = os.path.split(head)
        while res and head and dir and head.startswith(prefix):
            try:
                res = dir in self.listdir(head)
            except OSError:
                res = False
            head, dir = os.path.split(head)
        self.isfile_case_cache[path] = res
        return res
    def isdir(self, path: str) -> bool:
        """Return whether path exists and is a directory (cached)."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISDIR(st.st_mode)
    def exists(self, path: str) -> bool:
        """Return whether path exists (cached).

        Only FileNotFoundError is treated as non-existence; any other
        OSError (e.g. a permission error) propagates to the caller.
        """
        try:
            self.stat(path)
        except FileNotFoundError:
            return False
        return True
    def read(self, path: str) -> bytes:
        """Return the contents of path, caching data, hash, and errors.

        Fake __init__.py files primed by _fake_init() read as empty.
        """
        if path in self.read_cache:
            return self.read_cache[path]
        if path in self.read_error_cache:
            raise self.read_error_cache[path]
        # Need to stat first so that the contents of file are from no
        # earlier instant than the mtime reported by self.stat().
        self.stat(path)
        dirname, basename = os.path.split(path)
        dirname = os.path.normpath(dirname)
        # Check the fake cache.
        if basename == '__init__.py' and dirname in self.fake_package_cache:
            data = b''
        else:
            try:
                with open(path, 'rb') as f:
                    data = f.read()
            except OSError as err:
                # NOTE(review): unlike stat()/listdir(), the error is
                # cached without copy_os_error(), so its traceback and
                # frames stay alive for the transaction -- confirm whether
                # this is intentional.
                self.read_error_cache[path] = err
                raise
        self.read_cache[path] = data
        self.hash_cache[path] = hash_digest(data)
        return data
    def hash_digest(self, path: str) -> str:
        """Return the content hash of path, reading the file if needed."""
        if path not in self.hash_cache:
            self.read(path)
        return self.hash_cache[path]
    def samefile(self, f1: str, f2: str) -> bool:
        """Return whether f1 and f2 refer to the same file (cached stats)."""
        s1 = self.stat(f1)
        s2 = self.stat(f2)
        return os.path.samestat(s1, s2)
def copy_os_error(e: OSError) -> OSError:
    """Return a lightweight clone of an OSError.

    The clone carries the same args and key attributes but no
    __traceback__, so caching it does not pin frame objects in memory.
    """
    clone = OSError(*e.args)
    for attr in ('errno', 'strerror', 'filename'):
        setattr(clone, attr, getattr(e, attr))
    if e.filename2:
        clone.filename2 = e.filename2
    return clone