# blob: bf6d7035850ce8b47c00cde1fe30d6f802d7e974
import os
import tempfile

from . import abc as resources_abc
from builtins import open as builtins_open
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import Iterator, Optional, Set, Union  # noqa: F401
from typing import cast
from typing.io import BinaryIO, TextIO
from zipimport import ZipImportError


# A package may be given either by its import name or as a module object.
Package = Union[str, ModuleType]
# A resource is named by a string or an os.PathLike object.
Resource = Union[str, os.PathLike]
20
21
def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If a name is given, the module is imported.  If the passed or imported
    module object is not a package (its spec has no submodule search
    locations), a TypeError is raised.
    """
    # Anything carrying a __spec__ attribute is treated as an already
    # imported module; everything else is handed to import_module().
    if hasattr(package, '__spec__'):
        module = package
        reported_name = package.__spec__.name
    else:
        module = import_module(package)
        reported_name = package
    if module.__spec__.submodule_search_locations is None:
        raise TypeError('{!r} is not a package'.format(reported_name))
    return module
40
41
def _normalize_path(path) -> str:
    """Normalize a path by ensuring it is a string.

    If the resulting string contains path separators (that is, if it has a
    non-empty directory part), a ValueError is raised.
    """
    directory, file_name = os.path.split(str(path))
    if directory:
        # The message deliberately shows the object as passed in, not its
        # string conversion.
        raise ValueError('{!r} must be only a file name'.format(path))
    return file_name
53
54
def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return a resource reader for 'package', or None if it has none.

    The reader is obtained by calling get_resource_reader() on the loader
    recorded in the package's spec, passing the spec's name.
    """
    # Return the package's loader if it's a ResourceReader.  We can't use
    # an issubclass() check here because apparently abc's __subclasscheck__()
    # hook wants to create a weak reference to the object, but
    # zipimport.zipimporter does not support weak references, resulting in a
    # TypeError.  That seems terrible.
    spec = package.__spec__
    if not hasattr(spec.loader, 'get_resource_reader'):
        return None
    return cast(resources_abc.ResourceReader,
                spec.loader.get_resource_reader(spec.name))
67
68
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    'resource' must be a bare file name (ValueError otherwise) and 'package'
    must name or be a package (TypeError otherwise).  FileNotFoundError is
    raised when the resource can be found neither on the file system nor
    through the loader's get_data().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    # No resource reader: look for the file next to the package's origin.
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return builtins_open(full_path, mode='rb')
    except OSError:
        # The file could not be opened directly, so ask the loader for the
        # data instead.  A loader without get_data(), or one whose
        # get_data() raises OSError, is treated as "resource not found".
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return BytesIO(data)
97
98
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    'encoding' and 'errors' are passed on to the text layer that decodes the
    resource.  'resource' must be a bare file name (ValueError otherwise)
    and 'package' must name or be a package (TypeError otherwise).
    FileNotFoundError is raised when the resource can be found neither on
    the file system nor through the loader's get_data().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    # No resource reader: look for the file next to the package's origin.
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return builtins_open(
            full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file could not be opened directly, so ask the loader for the
        # data instead.  A loader without get_data(), or one whose
        # get_data() raises OSError, is treated as "resource not found".
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return TextIOWrapper(BytesIO(data), encoding, errors)
131
132
def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource.

    The resource is opened with open_binary(), read in full, and closed
    again before returning.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as stream:
        return stream.read()
139
140
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().  The resource is opened with open_text(), read in full,
    and closed again before returning.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as stream:
        return stream.read()
154
155
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created.  If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    reader_path = None
    if reader is not None:
        # Resolve the path *before* yielding.  If the yield sat inside this
        # try block, a FileNotFoundError raised by the caller's with-body
        # would be swallowed here and the generator would go on to yield a
        # second time, which contextmanager reports as a RuntimeError.
        try:
            reader_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            pass
    if reader_path is not None:
        yield reader_path
        return
    # Fall through when there is no resource reader or when its
    # resource_path() raised FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile because the temporary file has
    # to be closed before it is handed out in order to work on Windows
    # properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        # The file object takes ownership of the descriptor, so it is closed
        # even if the write fails, and write() stores all of the data.
        with os.fdopen(fd, 'wb') as temporary_file:
            temporary_file.write(data)
        yield Path(raw_path)
    finally:
        # The caller may already have removed the file; that is fine.
        with suppress(FileNotFoundError):
            os.remove(raw_path)
197
198
def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.  'name' must be a bare file name; a
    ValueError is raised if it contains a directory part.
    """
    package = _get_package(package)
    # Called only for its validation; the normalized value is not needed.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are not
    # resources, so let's try to find out if it's a directory or not.
    candidate = Path(package.__spec__.origin).parent / name
    return candidate.is_file()

221
def contents(package: Package) -> Iterator[str]:
    """Return an iterator over the names of the entries in 'package'.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        yield from reader.contents()
        return
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.  Their spec origin is either None or the string
    # 'namespace' (with no location), depending on the Python version; in
    # both cases there is no directory to list.
    spec = package.__spec__
    if spec.origin is None or (
            spec.origin == 'namespace' and not spec.has_location):
        return
    package_directory = Path(spec.origin).parent
    yield from os.listdir(str(package_directory))
241
242
# Private implementation of ResourceReader and get_resource_reader() for
# zipimport.  Don't use these directly!  We're implementing these in Python
# because 1) it's easier, and 2) zipimport will likely get rewritten in
# Python itself at some point, so doing this all in C would just be a waste
# of effort.
248
class _ZipImportResourceReader(resources_abc.ResourceReader):
    """Private class used to support ZipImport.get_resource_reader().

    This class is allowed to reference all the innards and private parts of
    the zipimporter.
    """

    def __init__(self, zipimporter, fullname):
        # The zipimporter serving the archive, and the dotted name of the
        # package whose resources this reader exposes.
        self.zipimporter = zipimporter
        self.fullname = fullname

    def _archive_path(self, name):
        """Return the path handed to get_data() for 'name' in the package."""
        # fullname is a dotted module path, while get_data() wants a file
        # path, so the dots have to become path separators.
        package_as_path = self.fullname.replace('.', '/')
        return f'{package_as_path}/{name}'

    def open_resource(self, resource):
        """Return a BytesIO over the resource's data.

        Raises FileNotFoundError (carrying the looked-up path) when the
        zipimporter cannot supply the data.
        """
        path = self._archive_path(resource)
        try:
            return BytesIO(self.zipimporter.get_data(path))
        except OSError:
            raise FileNotFoundError(path)

    def resource_path(self, resource):
        """Always raise FileNotFoundError."""
        # All resources are in the zip file, so there is no path to the file.
        # Raising FileNotFoundError tells the higher level API to extract the
        # binary data and create a temporary file.
        raise FileNotFoundError

    def is_resource(self, name):
        """Return True if the zipimporter can supply data for 'name'."""
        # Maybe we could do better, but if we can get the data, it's a
        # resource.  Otherwise it isn't.
        path = self._archive_path(name)
        try:
            self.zipimporter.get_data(path)
        except OSError:
            return False
        return True

    def contents(self):
        """Yield the name of each direct entry of the package, once."""
        # This is a bit convoluted, because fullname will be a module path,
        # but _files is a list of file names relative to the top of the
        # archive's namespace.  We want to compare file paths to find all the
        # names of things inside the module represented by fullname.  So we
        # turn the module path of fullname into a file path relative to the
        # top of the archive, and then we iterate through _files looking for
        # names inside that "directory".
        fullname_path = Path(self.zipimporter.get_filename(self.fullname))
        relative_path = fullname_path.relative_to(self.zipimporter.archive)
        # Don't forget that fullname names a package, so its path will include
        # __init__.py, which we want to ignore.
        assert relative_path.name == '__init__.py'
        package_path = relative_path.parent
        names_seen = set()
        for filename in self.zipimporter._files:
            try:
                relative = Path(filename).relative_to(package_path)
            except ValueError:
                # Not inside this package's directory.
                continue
            parts = relative.parts
            if not parts:
                # The entry is the package directory itself.
                continue
            # Only the first component is a direct entry of the package: for
            # a file directly inside it that is the file's own name, and for
            # anything deeper it is the top-most subdirectory.
            entry = parts[0]
            if entry not in names_seen:
                names_seen.add(entry)
                yield entry
313
314
def _zipimport_get_resource_reader(zipimporter, fullname):
    """Return a resource reader for 'fullname', or None.

    None is returned when 'fullname' is not a package according to the
    zipimporter, or when the zipimporter raises ZipImportError for it.
    """
    try:
        is_package = zipimporter.is_package(fullname)
    except ZipImportError:
        return None
    if not is_package:
        return None
    return _ZipImportResourceReader(zipimporter, fullname)