forked from python/importlib_resources
-
Notifications
You must be signed in to change notification settings - Fork 0
/
_common.py
116 lines (92 loc) · 3.05 KB
/
_common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
import pathlib
import tempfile
import functools
import contextlib
import types
import importlib
from typing import Union, Any, Optional
from .abc import ResourceReader, Traversable
from ._compat import wrap_spec
# A package may be supplied either as an already-imported module object
# or as a string (the module name to import).
Package = Union[types.ModuleType, str]
# A resource is identified by a plain string or by any os.PathLike object.
Resource = Union[str, os.PathLike]
def files(package):
    # type: (Package) -> Traversable
    """
    Return the Traversable for *package*.

    *package* is first passed through ``get_package`` (which accepts a
    module object or a name and rejects non-packages); the result is
    then handed to ``from_package``.
    """
    module = get_package(package)
    return from_package(module)
def normalize_path(path):
    # type: (Any) -> str
    """Coerce *path* to ``str`` and insist that it is a bare file name.

    The string form is split with ``os.path.split``; a non-empty
    directory part means the value was not just a file name, and a
    ``ValueError`` is raised. Otherwise the file-name part is returned.
    """
    directory, name = os.path.split(str(path))
    if not directory:
        return name
    raise ValueError(f'{path!r} must be only a file name')
def get_resource_reader(package):
    # type: (types.ModuleType) -> Optional[ResourceReader]
    """
    Return a resource reader from the package's loader, if it offers one.

    The loader found on ``package.__spec__`` is probed for a
    ``get_resource_reader`` attribute. When present, it is called with
    the spec's name and its result is returned; otherwise ``None``.
    """
    module_spec = package.__spec__
    # The loader is probed by attribute rather than with issubclass():
    # the ABC __subclasscheck__() hook wants to create a weak reference to
    # the object, and zipimport.zipimporter does not support weak
    # references, so that check would end in a TypeError.
    factory = getattr(module_spec.loader, 'get_resource_reader', None)  # type: ignore
    if factory is not None:
        return factory(module_spec.name)  # type: ignore
    return None
def resolve(cand):
    # type: (Package) -> types.ModuleType
    """Return *cand* itself if it is a module, else import it by name."""
    if isinstance(cand, types.ModuleType):
        return cand
    return importlib.import_module(cand)
def get_package(package):
    # type: (Package) -> types.ModuleType
    """Resolve a package name or module object to the module itself.

    The argument is resolved with ``resolve``. If the wrapped spec of the
    resulting module has ``submodule_search_locations`` set to ``None``,
    the module is not a package and ``TypeError`` is raised.
    """
    module = resolve(package)
    search_locations = wrap_spec(module).submodule_search_locations
    if search_locations is not None:
        return module
    raise TypeError(f'{package!r} is not a package')
def from_package(package):
    """
    Build the Traversable for an already-resolved package.

    The package's spec (as returned by ``wrap_spec``) supplies the loader;
    that loader's ``get_resource_reader`` is called with the spec name and
    the reader's ``files()`` result is returned.
    """
    package_spec = wrap_spec(package)
    return package_spec.loader.get_resource_reader(package_spec.name).files()
@contextlib.contextmanager
def _tempfile(reader, suffix=''):
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
# blocks due to the need to close the temporary file to work on Windows
# properly.
fd, raw_path = tempfile.mkstemp(suffix=suffix)
try:
os.write(fd, reader())
os.close(fd)
del reader
yield pathlib.Path(raw_path)
finally:
try:
os.remove(raw_path)
except (FileNotFoundError, PermissionError):
pass
@functools.singledispatch
def as_file(path):
    """
    Expose a Traversable as a path on the local file system.

    This is the fallback implementation: it returns the context manager
    produced by ``_tempfile``, fed with the object's ``read_bytes`` method
    and using the object's ``name`` as the temporary file's suffix.
    """
    read_content = path.read_bytes
    return _tempfile(read_content, suffix=path.name)


@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    Overload for pathlib.Path objects: yield the given path unchanged.
    """
    yield path