blob: 4d70186c1a43b7ea2c80ed50ed793480a79856b7 [file] [log] [blame]
Barry Warsawdeae6b42017-12-30 15:18:06 -05001import os
2import tempfile
3
4from . import abc as resources_abc
5from builtins import open as builtins_open
6from contextlib import contextmanager, suppress
7from importlib import import_module
8from importlib.abc import ResourceLoader
9from io import BytesIO, TextIOWrapper
10from pathlib import Path
11from types import ModuleType
Miss Islington (bot)2e5fa382018-04-30 12:30:26 -070012from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
Barry Warsawdeae6b42017-12-30 15:18:06 -050013from typing import cast
14from typing.io import BinaryIO, TextIO
Barry Warsaw6f6eb352018-01-24 15:36:21 -050015from zipimport import ZipImportError
Barry Warsawdeae6b42017-12-30 15:18:06 -050016
17
# A package may be referred to by its importable name or by the module object
# itself (see _get_package(), which accepts either form).
Package = Union[str, ModuleType]
# A resource is named by a string or a path-like object.
Resource = Union[str, os.PathLike]
20
21
22def _get_package(package) -> ModuleType:
23 """Take a package name or module object and return the module.
24
25 If a name, the module is imported. If the passed or imported module
26 object is not a package, raise an exception.
27 """
28 if hasattr(package, '__spec__'):
29 if package.__spec__.submodule_search_locations is None:
30 raise TypeError('{!r} is not a package'.format(
31 package.__spec__.name))
32 else:
33 return package
34 else:
35 module = import_module(package)
36 if module.__spec__.submodule_search_locations is None:
37 raise TypeError('{!r} is not a package'.format(package))
38 else:
39 return module
40
41
42def _normalize_path(path) -> str:
43 """Normalize a path by ensuring it is a string.
44
45 If the resulting string contains path separators, an exception is raised.
46 """
Miss Islington (bot)2e5fa382018-04-30 12:30:26 -070047 parent, file_name = os.path.split(path)
Barry Warsawdeae6b42017-12-30 15:18:06 -050048 if parent:
49 raise ValueError('{!r} must be only a file name'.format(path))
50 else:
51 return file_name
52
53
def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader offered by *package*'s loader, or None.

    The loader is duck-typed via ``hasattr()`` rather than checked with
    issubclass(): the ABC's __subclasscheck__() hook wants to create a weak
    reference to the object, which zipimport.zipimporter does not support,
    so such a check would fail with a TypeError.
    """
    spec = package.__spec__
    loader = spec.loader
    if not hasattr(loader, 'get_resource_reader'):
        return None
    return cast(resources_abc.ResourceReader,
                loader.get_resource_reader(spec.name))
66
67
Barry Warsawa23d30f2018-02-02 19:49:25 -050068def _check_location(package):
69 if package.__spec__.origin is None or not package.__spec__.has_location:
70 raise FileNotFoundError(f'Package has no location {package!r}')
71
72
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Open *resource* inside *package* and return a binary stream.

    A resource reader supplied by the package's loader is preferred.  Without
    one, the file sitting next to the package's origin is opened directly; if
    that fails, the loader's ``get_data()`` (when it has one) is tried and the
    bytes are wrapped in a ``BytesIO``.  FileNotFoundError is raised when none
    of these produce the resource.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    spec = package.__spec__
    directory = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(directory, resource)
    try:
        return builtins_open(full_path, mode='rb')
    except OSError:
        # Not readable as a plain file; ask the loader for the bytes if it
        # can supply them.  A failing get_data() is treated as "not found".
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(spec.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return BytesIO(data)
        package_name = spec.name
        raise FileNotFoundError(
            '{!r} resource not found in {!r}'.format(resource, package_name))
102
103
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Open *resource* inside *package* and return a text stream.

    The lookup order matches open_binary(): the loader's resource reader
    first, then the file next to the package's origin, then the loader's
    ``get_data()`` when available.  Binary sources are wrapped in a
    ``TextIOWrapper`` using *encoding* and *errors*.  FileNotFoundError is
    raised when the resource cannot be obtained.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        raw_stream = reader.open_resource(resource)
        return TextIOWrapper(raw_stream, encoding, errors)
    _check_location(package)
    spec = package.__spec__
    directory = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(directory, resource)
    try:
        return builtins_open(
            full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Not readable as a plain file; ask the loader for the bytes if it
        # can supply them.  A failing get_data() is treated as "not found".
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(spec.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return TextIOWrapper(BytesIO(data), encoding, errors)
        package_name = spec.name
        raise FileNotFoundError(
            '{!r} resource not found in {!r}'.format(resource, package_name))
137
138
def read_binary(package: Package, resource: Resource) -> bytes:
    """Read *resource* from *package* and return its contents as bytes."""
    file_name = _normalize_path(resource)
    module = _get_package(package)
    with open_binary(module, file_name) as stream:
        payload = stream.read()
    return payload
145
146
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Read *resource* from *package* and return it decoded as a string.

    *encoding* and *errors* carry the same meaning as the corresponding
    arguments of bytes.decode().
    """
    file_name = _normalize_path(resource)
    module = _get_package(package)
    with open_text(module, file_name, encoding, errors) as stream:
        text = stream.read()
    return text
160
161
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Exceptions raised inside the caller's ``with`` block, including
    FileNotFoundError, propagate unchanged.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Resolve the path *before* yielding.  Only a FileNotFoundError from
        # resource_path() itself may trigger the fallback below; if the yield
        # were inside the try, a FileNotFoundError raised by the caller's
        # with-block would be swallowed and the generator would yield a
        # second time, which contextmanager reports as a RuntimeError.
        try:
            direct_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            pass
        else:
            yield direct_path
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of a resource reader *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile: the temporary file must be
    # closed before it is handed out for this to work properly on Windows.
    fd, raw_path = tempfile.mkstemp()
    try:
        # The file object takes ownership of fd, so the descriptor is closed
        # even if writing fails, and a buffered write() cannot silently
        # perform a short write the way a single os.write() call can.
        with os.fdopen(fd, 'wb') as tmp_file:
            tmp_file.write(data)
        yield Path(raw_path)
    finally:
        # The caller may already have removed the file; that is not an error.
        with suppress(FileNotFoundError):
            os.remove(raw_path)
205
206
def is_resource(package: Package, name: str) -> bool:
    """Report whether *name* is a resource inside *package*.

    Directories are *not* resources.
    """
    package = _get_package(package)
    # Called only for its validation: raises if *name* has a directory part.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        entries = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    # Being listed in the package's contents is necessary but not sufficient:
    # the entry could be a directory, so confirm it is a regular file.
    return (name in entries
            and (Path(package.__spec__.origin).parent / name).is_file())
Barry Warsawdeae6b42017-12-30 15:18:06 -0500228
229
def contents(package: Package) -> Iterable[str]:
    """Return an iterable over the names of the entries in *package*.

    Entries are not necessarily resources; directories, for example, are
    listed but are not resources.  Use `is_resource()` on each entry to
    tell the two apart.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    spec = package.__spec__
    # A package without an origin or location (e.g. a namespace package)
    # cannot have resources.  The check is inlined instead of calling
    # _check_location() and catching its exception.
    if spec.origin is not None and spec.has_location:
        return os.listdir(Path(spec.origin).parent)
    return ()
Barry Warsaw6f6eb352018-01-24 15:36:21 -0500249
250
251# Private implementation of ResourceReader and get_resource_reader() for
252# zipimport. Don't use these directly! We're implementing these in Python
253# because 1) it's easier, 2) zipimport will likely get rewritten in Python
254# itself at some point, so doing this all in C would just be a waste of
255# effort.
256
class _ZipImportResourceReader(resources_abc.ResourceReader):
    """Private class used to support ZipImport.get_resource_reader().

    This class is allowed to reference all the innards and private parts of
    the zipimporter.
    """

    def __init__(self, zipimporter, fullname):
        # zipimporter: the importer whose archive holds the package.
        # fullname: dotted name of the package the resources belong to.
        self.zipimporter = zipimporter
        self.fullname = fullname

    def open_resource(self, resource):
        """Return a BytesIO over the bytes of *resource* in the archive.

        Raises FileNotFoundError if the zipimporter cannot supply the data.
        """
        fullname_as_path = self.fullname.replace('.', '/')
        path = f'{fullname_as_path}/{resource}'
        try:
            return BytesIO(self.zipimporter.get_data(path))
        except OSError:
            raise FileNotFoundError(path)

    def resource_path(self, resource):
        """Always raise FileNotFoundError.

        All resources are in the zip file, so there is no path to the file.
        Raising FileNotFoundError tells the higher level API to extract the
        binary data and create a temporary file.
        """
        raise FileNotFoundError

    def is_resource(self, name):
        """Return True if data for *name* can be read from the archive."""
        # Maybe we could do better, but if we can get the data, it's a
        # resource.  Otherwise it isn't.
        fullname_as_path = self.fullname.replace('.', '/')
        path = f'{fullname_as_path}/{name}'
        try:
            self.zipimporter.get_data(path)
        except OSError:
            return False
        return True

    def contents(self):
        """Yield the name of each direct entry of the package, once each.

        Files directly inside the package yield their own name.  Anything
        nested deeper yields only the name of the package's immediate
        subdirectory that contains it.
        """
        # fullname is a module path, but _files holds file names relative to
        # the top of the archive's namespace.  Turn the module path into a
        # file path relative to the top of the archive, then look through
        # _files for names inside that "directory".
        fullname_path = Path(self.zipimporter.get_filename(self.fullname))
        relative_path = fullname_path.relative_to(self.zipimporter.archive)
        # Don't forget that fullname names a package, so its path will include
        # __init__.py, which we want to ignore.
        assert relative_path.name == '__init__.py'
        package_path = relative_path.parent
        seen = set()
        for filename in self.zipimporter._files:
            try:
                relative = Path(filename).relative_to(package_path)
            except ValueError:
                # Not located under this package.
                continue
            if not relative.parts:
                # The archive entry is the package directory itself.
                continue
            # The first component is the direct child of the package: the
            # file's own name, or the top-most subdirectory for anything
            # nested deeper (``sub/deep/x`` is reported as ``sub``).
            entry = relative.parts[0]
            if entry not in seen:
                seen.add(entry)
                yield entry
323
324
325def _zipimport_get_resource_reader(zipimporter, fullname):
326 try:
327 if not zipimporter.is_package(fullname):
328 return None
329 except ZipImportError:
330 return None
331 return _ZipImportResourceReader(zipimporter, fullname)