blob: c4f6bbde45fa36c021a6cbf3a28f0eac1a006178 [file] [log] [blame]
Barry Warsawdeae6b42017-12-30 15:18:06 -05001import os
2import tempfile
3
4from . import abc as resources_abc
5from builtins import open as builtins_open
6from contextlib import contextmanager, suppress
7from importlib import import_module
8from importlib.abc import ResourceLoader
9from io import BytesIO, TextIOWrapper
10from pathlib import Path
11from types import ModuleType
12from typing import Iterator, Optional, Set, Union # noqa: F401
13from typing import cast
14from typing.io import BinaryIO, TextIO
Barry Warsaw6f6eb352018-01-24 15:36:21 -050015from zipimport import ZipImportError
Barry Warsawdeae6b42017-12-30 15:18:06 -050016
17
# Type aliases used in the public signatures below.
# A package may be given either by dotted name or as an imported module object.
Package = Union[str, ModuleType]
# A resource is named by a bare file name, as a string or a path-like object.
Resource = Union[str, os.PathLike]
20
21
def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If a name, the module is imported.  If the passed or imported module
    object is not a package, raise an exception.

    Raises:
        TypeError: if the module's spec has ``submodule_search_locations``
            set to None, i.e. the module is not a package.
    """
    if hasattr(package, '__spec__'):
        # Already a module object; use it as given.
        module = package
    else:
        module = import_module(package)
    if module.__spec__.submodule_search_locations is None:
        # Module objects are reported by their spec name, names as passed in.
        reported = module.__spec__.name if module is package else package
        raise TypeError('{!r} is not a package'.format(reported))
    return module
40
41
def _normalize_path(path) -> str:
    """Normalize a path by ensuring it is a string.

    If the resulting string contains path separators, an exception is raised.

    Raises:
        ValueError: if ``os.path.split()`` finds a non-empty directory part.
    """
    parent, file_name = os.path.split(str(path))
    if parent:
        raise ValueError('{!r} must be only a file name'.format(path))
    return file_name
53
54
def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader supplied by the package's loader, if any.

    The loader's ``get_resource_reader()`` is called with the package's spec
    name and its result is returned.  None is returned when the loader has
    no such method.
    """
    # We can't use an issubclass() check here because apparently abc's
    # __subclasscheck__() hook wants to create a weak reference to the
    # object, but zipimport.zipimporter does not support weak references,
    # resulting in a TypeError.  So duck-type on the method name instead.
    spec = package.__spec__
    if hasattr(spec.loader, 'get_resource_reader'):
        return cast(resources_abc.ResourceReader,
                    spec.loader.get_resource_reader(spec.name))
    return None
67
68
def _check_location(package):
    """Raise FileNotFoundError if the package's spec has no usable location.

    A spec is rejected when its ``origin`` is None or its ``has_location``
    flag is false.
    """
    spec = package.__spec__
    if spec.origin is None or not spec.has_location:
        raise FileNotFoundError(f'Package has no location {package!r}')
72
73
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    If the package's loader provides a resource reader, the reader opens the
    resource.  Otherwise the resource is opened as a file next to the
    package's ``__spec__.origin``; if that fails with OSError, the loader's
    ``get_data()`` (when it has one) is tried and its bytes are wrapped in a
    BytesIO.

    Raises:
        ValueError: if 'resource' is not a bare file name.
        TypeError: if 'package' is not a package.
        FileNotFoundError: if the package has no location or the resource
            cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return builtins_open(full_path, mode='rb')
    except OSError:
        # The file is not directly readable; fall back to the loader's
        # get_data() if it has one.  A loader without get_data(), or one
        # whose get_data() raises OSError, means the resource is missing.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return BytesIO(data)
103
104
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    If the package's loader provides a resource reader, its binary stream is
    wrapped in a TextIOWrapper.  Otherwise the resource is opened as a text
    file next to the package's ``__spec__.origin``; if that fails with
    OSError, the loader's ``get_data()`` (when it has one) is tried and its
    bytes are wrapped for decoding.

    'encoding' and 'errors' are passed on to the open()/TextIOWrapper call
    that performs the decoding.

    Raises:
        ValueError: if 'resource' is not a bare file name.
        TypeError: if 'package' is not a package.
        FileNotFoundError: if the package has no location or the resource
            cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return builtins_open(
            full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file is not directly readable; fall back to the loader's
        # get_data() if it has one.  A loader without get_data(), or one
        # whose get_data() raises OSError, means the resource is missing.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return TextIOWrapper(BytesIO(data), encoding, errors)
138
139
def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource.

    The resource is opened with open_binary(), read in full, and closed
    before returning.
    """
    # Validate both arguments up front; open_binary() accepts the
    # normalized values unchanged.
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as fp:
        return fp.read()
146
147
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().  The resource is opened with open_text(), read in full,
    and closed before returning.
    """
    # Validate both arguments up front; open_text() accepts the normalized
    # values unchanged.
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as fp:
        return fp.read()
161
162
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created.  If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Raises:
        ValueError: if 'resource' is not a bare file name.
        TypeError: if 'package' is not a package.
        FileNotFoundError: if the package has no location or the resource
            cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Guard only the lookup.  The yield must stay outside the try block:
        # otherwise a FileNotFoundError raised inside the caller's 'with'
        # body would be swallowed here and the generator would go on to
        # yield a second time.
        try:
            reader_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            reader_path = None
        if reader_path is not None:
            yield reader_path
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of a resource reader *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on
    # Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        try:
            os.write(fd, data)
        finally:
            # Always release the descriptor, even if the write fails.
            os.close(fd)
        yield Path(raw_path)
    finally:
        # The caller may already have removed the temporary file.
        with suppress(FileNotFoundError):
            os.remove(raw_path)
206
207
def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.

    Raises:
        TypeError: if 'package' is not a package.
        ValueError: if 'name' is not a bare file name.
    """
    package = _get_package(package)
    # Called for validation only; the normalized value is not needed.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are not
    # resources, so let's try to find out if it's a directory or not.
    candidate = Path(package.__spec__.origin).parent / name
    return candidate.is_file()
Barry Warsawdeae6b42017-12-30 15:18:06 -0500229
230
def contents(package: Package) -> Iterator[str]:
    """Return an iterator over the names of the entries in 'package'.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.

    This is a generator, so nothing (including the package lookup) happens
    until iteration starts.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        yield from reader.contents()
        return
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.  We could use _check_location() and catch the
    # exception, but that's extra work, so just inline the check.
    if package.__spec__.origin is None or not package.__spec__.has_location:
        return
    package_directory = Path(package.__spec__.origin).parent
    yield from os.listdir(str(package_directory))
250
251
# Private implementation of ResourceReader and get_resource_reader() for
# zipimport.  Don't use these directly!  We're implementing these in Python
# because 1) it's easier, and 2) zipimport will likely get rewritten in Python
# itself at some point, so doing this all in C would just be a waste of
# effort.
257
class _ZipImportResourceReader(resources_abc.ResourceReader):
    """Private class used to support ZipImport.get_resource_reader().

    This class is allowed to reference all the innards and private parts of
    the zipimporter.
    """

    def __init__(self, zipimporter, fullname):
        # zipimporter: the importer whose archive holds the package.
        # fullname: dotted module name of the package served by this reader.
        self.zipimporter = zipimporter
        self.fullname = fullname

    def _member_path(self, name):
        # Build the archive-relative path of 'name' inside the package.
        # fullname is a dotted module path ("a.b") while archive members are
        # addressed by file path ("a/b/..."), so the dots must be converted.
        package_dir = self.fullname.replace('.', '/')
        return f'{package_dir}/{name}'

    def open_resource(self, resource):
        """Return a BytesIO holding the resource's bytes from the archive.

        Raises FileNotFoundError (carrying the archive path) when the
        zipimporter's get_data() raises OSError.
        """
        path = self._member_path(resource)
        try:
            return BytesIO(self.zipimporter.get_data(path))
        except OSError:
            raise FileNotFoundError(path)

    def resource_path(self, resource):
        """Always raise FileNotFoundError."""
        # All resources are in the zip file, so there is no path to the file.
        # Raising FileNotFoundError tells the higher level API to extract the
        # binary data and create a temporary file.
        raise FileNotFoundError

    def is_resource(self, name):
        """Return True if get_data() can read 'name' from the package."""
        # Maybe we could do better, but if we can get the data, it's a
        # resource.  Otherwise it isn't.
        path = self._member_path(name)
        try:
            self.zipimporter.get_data(path)
        except OSError:
            return False
        return True

    def contents(self):
        """Yield each name directly inside the package, once.

        Files nested in subdirectories contribute the name of the
        package's immediate subdirectory that contains them.
        """
        # This is a bit convoluted, because fullname will be a module path,
        # but _files is a list of file names relative to the top of the
        # archive's namespace.  We want to compare file paths to find all the
        # names of things inside the module represented by fullname.  So we
        # turn the module path of fullname into a file path relative to the
        # top of the archive, and then we iterate through _files looking for
        # names inside that "directory".
        fullname_path = Path(self.zipimporter.get_filename(self.fullname))
        relative_path = fullname_path.relative_to(self.zipimporter.archive)
        # fullname names a package, so its file name is the package's
        # __init__ module; the package directory is that file's parent.
        package_path = relative_path.parent
        names_seen = set()
        for filename in self.zipimporter._files:
            try:
                relative = Path(filename).relative_to(package_path)
            except ValueError:
                # Not inside this package.
                continue
            parts = relative.parts
            if not parts:
                # The entry is the package directory itself, not a member.
                continue
            # The first component is the direct child of the package: the
            # file itself, or the top-most subdirectory containing it.
            entry = parts[0]
            if entry not in names_seen:
                names_seen.add(entry)
                yield entry
322
323
def _zipimport_get_resource_reader(zipimporter, fullname):
    """Return a resource reader for 'fullname', or None if not a package.

    None is returned both when the zipimporter reports that 'fullname' is
    not a package and when its is_package() raises ZipImportError.
    """
    try:
        is_package = zipimporter.is_package(fullname)
    except ZipImportError:
        return None
    if not is_package:
        return None
    return _ZipImportResourceReader(zipimporter, fullname)