/
cache.py
83 lines (61 loc) · 2.29 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Caching of formatted files with feature-based invalidation."""
import os
import pickle
from pathlib import Path
import tempfile
from typing import Dict, Iterable, Set, Tuple
from platformdirs import user_cache_dir
from black.mode import Mode
from _black_version import version as __version__
# types
Timestamp = float
FileSize = int
CacheInfo = Tuple[Timestamp, FileSize]
Cache = Dict[str, CacheInfo]
CACHE_DIR = Path(user_cache_dir("black", version=__version__))
def read_cache(mode: Mode) -> Cache:
"""Read the cache if it exists and is well formed.
If it is not well formed, the call to write_cache later should resolve the issue.
"""
cache_file = get_cache_file(mode)
if not cache_file.exists():
return {}
with cache_file.open("rb") as fobj:
try:
cache: Cache = pickle.load(fobj)
except (pickle.UnpicklingError, ValueError, IndexError):
return {}
return cache
def get_cache_file(mode: Mode) -> Path:
return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
def get_cache_info(path: Path) -> CacheInfo:
"""Return the information used to check if a file is already formatted or not."""
stat = path.stat()
return stat.st_mtime, stat.st_size
def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]:
"""Split an iterable of paths in `sources` into two sets.
The first contains paths of files that modified on disk or are not in the
cache. The other contains paths to non-modified files.
"""
todo, done = set(), set()
for src in sources:
res_src = src.resolve()
if cache.get(str(res_src)) != get_cache_info(res_src):
todo.add(src)
else:
done.add(src)
return todo, done
def write_cache(cache: Cache, sources: Iterable[Path], mode: Mode) -> None:
"""Update the cache file."""
cache_file = get_cache_file(mode)
try:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
new_cache = {
**cache,
**{str(src.resolve()): get_cache_info(src) for src in sources},
}
with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f:
pickle.dump(new_cache, f, protocol=4)
os.replace(f.name, cache_file)
except OSError:
pass