blob: f51886557466c745ccebb69d7d0cc972590f1470 [file] [log] [blame]
Barry Warsawdeae6b42017-12-30 15:18:06 -05001import os
2import tempfile
3
4from . import abc as resources_abc
Barry Warsawdeae6b42017-12-30 15:18:06 -05005from contextlib import contextmanager, suppress
6from importlib import import_module
7from importlib.abc import ResourceLoader
8from io import BytesIO, TextIOWrapper
9from pathlib import Path
10from types import ModuleType
Victor Stinnereb0d3592020-05-01 02:38:00 +020011from typing import Iterable, Iterator, Optional, Union # noqa: F401
Barry Warsawdeae6b42017-12-30 15:18:06 -050012from typing import cast
13from typing.io import BinaryIO, TextIO
Barry Warsawdeae6b42017-12-30 15:18:06 -050014
15
Barry Warsaw0ed66df2018-05-17 11:41:53 -040016__all__ = [
17 'Package',
18 'Resource',
19 'contents',
20 'is_resource',
21 'open_binary',
22 'open_text',
23 'path',
24 'read_binary',
25 'read_text',
26 ]
27
28
Barry Warsawdeae6b42017-12-30 15:18:06 -050029Package = Union[str, ModuleType]
30Resource = Union[str, os.PathLike]
31
32
33def _get_package(package) -> ModuleType:
34 """Take a package name or module object and return the module.
35
36 If a name, the module is imported. If the passed or imported module
37 object is not a package, raise an exception.
38 """
39 if hasattr(package, '__spec__'):
40 if package.__spec__.submodule_search_locations is None:
41 raise TypeError('{!r} is not a package'.format(
42 package.__spec__.name))
43 else:
44 return package
45 else:
46 module = import_module(package)
47 if module.__spec__.submodule_search_locations is None:
48 raise TypeError('{!r} is not a package'.format(package))
49 else:
50 return module
51
52
53def _normalize_path(path) -> str:
54 """Normalize a path by ensuring it is a string.
55
56 If the resulting string contains path separators, an exception is raised.
57 """
Brett Cannon3ab93652018-04-30 11:31:45 -070058 parent, file_name = os.path.split(path)
Barry Warsawdeae6b42017-12-30 15:18:06 -050059 if parent:
60 raise ValueError('{!r} must be only a file name'.format(path))
61 else:
62 return file_name
63
64
65def _get_resource_reader(
66 package: ModuleType) -> Optional[resources_abc.ResourceReader]:
67 # Return the package's loader if it's a ResourceReader. We can't use
68 # a issubclass() check here because apparently abc.'s __subclasscheck__()
69 # hook wants to create a weak reference to the object, but
70 # zipimport.zipimporter does not support weak references, resulting in a
71 # TypeError. That seems terrible.
Barry Warsaw5ec0fee2018-01-15 15:07:11 -080072 spec = package.__spec__
73 if hasattr(spec.loader, 'get_resource_reader'):
74 return cast(resources_abc.ResourceReader,
75 spec.loader.get_resource_reader(spec.name))
Barry Warsawdeae6b42017-12-30 15:18:06 -050076 return None
77
78
Barry Warsawbbbcf862018-02-02 15:15:58 -050079def _check_location(package):
80 if package.__spec__.origin is None or not package.__spec__.has_location:
81 raise FileNotFoundError(f'Package has no location {package!r}')
82
83
Barry Warsawdeae6b42017-12-30 15:18:06 -050084def open_binary(package: Package, resource: Resource) -> BinaryIO:
85 """Return a file-like object opened for binary reading of the resource."""
86 resource = _normalize_path(resource)
87 package = _get_package(package)
88 reader = _get_resource_reader(package)
89 if reader is not None:
90 return reader.open_resource(resource)
Barry Warsawbbbcf862018-02-02 15:15:58 -050091 _check_location(package)
Barry Warsawdeae6b42017-12-30 15:18:06 -050092 absolute_package_path = os.path.abspath(package.__spec__.origin)
93 package_path = os.path.dirname(absolute_package_path)
94 full_path = os.path.join(package_path, resource)
95 try:
Barry Warsaw0ed66df2018-05-17 11:41:53 -040096 return open(full_path, mode='rb')
Barry Warsawdeae6b42017-12-30 15:18:06 -050097 except OSError:
98 # Just assume the loader is a resource loader; all the relevant
99 # importlib.machinery loaders are and an AttributeError for
100 # get_data() will make it clear what is needed from the loader.
101 loader = cast(ResourceLoader, package.__spec__.loader)
102 data = None
103 if hasattr(package.__spec__.loader, 'get_data'):
104 with suppress(OSError):
105 data = loader.get_data(full_path)
106 if data is None:
107 package_name = package.__spec__.name
108 message = '{!r} resource not found in {!r}'.format(
109 resource, package_name)
110 raise FileNotFoundError(message)
111 else:
112 return BytesIO(data)
113
114
115def open_text(package: Package,
116 resource: Resource,
117 encoding: str = 'utf-8',
118 errors: str = 'strict') -> TextIO:
119 """Return a file-like object opened for text reading of the resource."""
120 resource = _normalize_path(resource)
121 package = _get_package(package)
122 reader = _get_resource_reader(package)
123 if reader is not None:
124 return TextIOWrapper(reader.open_resource(resource), encoding, errors)
Barry Warsawbbbcf862018-02-02 15:15:58 -0500125 _check_location(package)
Barry Warsawdeae6b42017-12-30 15:18:06 -0500126 absolute_package_path = os.path.abspath(package.__spec__.origin)
127 package_path = os.path.dirname(absolute_package_path)
128 full_path = os.path.join(package_path, resource)
129 try:
Barry Warsaw0ed66df2018-05-17 11:41:53 -0400130 return open(full_path, mode='r', encoding=encoding, errors=errors)
Barry Warsawdeae6b42017-12-30 15:18:06 -0500131 except OSError:
132 # Just assume the loader is a resource loader; all the relevant
133 # importlib.machinery loaders are and an AttributeError for
134 # get_data() will make it clear what is needed from the loader.
135 loader = cast(ResourceLoader, package.__spec__.loader)
136 data = None
137 if hasattr(package.__spec__.loader, 'get_data'):
138 with suppress(OSError):
139 data = loader.get_data(full_path)
140 if data is None:
141 package_name = package.__spec__.name
142 message = '{!r} resource not found in {!r}'.format(
143 resource, package_name)
144 raise FileNotFoundError(message)
145 else:
146 return TextIOWrapper(BytesIO(data), encoding, errors)
147
148
149def read_binary(package: Package, resource: Resource) -> bytes:
150 """Return the binary contents of the resource."""
151 resource = _normalize_path(resource)
152 package = _get_package(package)
153 with open_binary(package, resource) as fp:
154 return fp.read()
155
156
157def read_text(package: Package,
158 resource: Resource,
159 encoding: str = 'utf-8',
160 errors: str = 'strict') -> str:
161 """Return the decoded string of the resource.
162
163 The decoding-related arguments have the same semantics as those of
164 bytes.decode().
165 """
166 resource = _normalize_path(resource)
167 package = _get_package(package)
168 with open_text(package, resource, encoding, errors) as fp:
169 return fp.read()
170
171
172@contextmanager
173def path(package: Package, resource: Resource) -> Iterator[Path]:
174 """A context manager providing a file path object to the resource.
175
176 If the resource does not already exist on its own on the file system,
177 a temporary file will be created. If the file was created, the file
178 will be deleted upon exiting the context manager (no exception is
179 raised if the file was deleted prior to the context manager
180 exiting).
181 """
182 resource = _normalize_path(resource)
183 package = _get_package(package)
184 reader = _get_resource_reader(package)
185 if reader is not None:
186 try:
187 yield Path(reader.resource_path(resource))
188 return
189 except FileNotFoundError:
190 pass
Barry Warsawbbbcf862018-02-02 15:15:58 -0500191 else:
192 _check_location(package)
Barry Warsawdeae6b42017-12-30 15:18:06 -0500193 # Fall-through for both the lack of resource_path() *and* if
194 # resource_path() raises FileNotFoundError.
195 package_directory = Path(package.__spec__.origin).parent
196 file_path = package_directory / resource
197 if file_path.exists():
198 yield file_path
199 else:
200 with open_binary(package, resource) as fp:
201 data = fp.read()
202 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
203 # blocks due to the need to close the temporary file to work on
204 # Windows properly.
205 fd, raw_path = tempfile.mkstemp()
206 try:
207 os.write(fd, data)
208 os.close(fd)
209 yield Path(raw_path)
210 finally:
211 try:
212 os.remove(raw_path)
213 except FileNotFoundError:
214 pass
215
216
217def is_resource(package: Package, name: str) -> bool:
218 """True if 'name' is a resource inside 'package'.
219
220 Directories are *not* resources.
221 """
222 package = _get_package(package)
223 _normalize_path(name)
224 reader = _get_resource_reader(package)
225 if reader is not None:
226 return reader.is_resource(name)
227 try:
228 package_contents = set(contents(package))
229 except (NotADirectoryError, FileNotFoundError):
230 return False
231 if name not in package_contents:
232 return False
233 # Just because the given file_name lives as an entry in the package's
234 # contents doesn't necessarily mean it's a resource. Directories are not
235 # resources, so let's try to find out if it's a directory or not.
236 path = Path(package.__spec__.origin).parent / name
Barry Warsaw6f6eb352018-01-24 15:36:21 -0500237 return path.is_file()
Barry Warsawdeae6b42017-12-30 15:18:06 -0500238
239
Brett Cannon3ab93652018-04-30 11:31:45 -0700240def contents(package: Package) -> Iterable[str]:
241 """Return an iterable of entries in 'package'.
Barry Warsawdeae6b42017-12-30 15:18:06 -0500242
243 Note that not all entries are resources. Specifically, directories are
244 not considered resources. Use `is_resource()` on each entry returned here
245 to check if it is a resource or not.
246 """
247 package = _get_package(package)
248 reader = _get_resource_reader(package)
249 if reader is not None:
Brett Cannon3ab93652018-04-30 11:31:45 -0700250 return reader.contents()
Barry Warsawdeae6b42017-12-30 15:18:06 -0500251 # Is the package a namespace package? By definition, namespace packages
Barry Warsawbbbcf862018-02-02 15:15:58 -0500252 # cannot have resources. We could use _check_location() and catch the
253 # exception, but that's extra work, so just inline the check.
Brett Cannon3ab93652018-04-30 11:31:45 -0700254 elif package.__spec__.origin is None or not package.__spec__.has_location:
255 return ()
256 else:
257 package_directory = Path(package.__spec__.origin).parent
258 return os.listdir(package_directory)