blob: fc3a1c9cabe636bb88be89c27225ebf66a8e5967 [file] [log] [blame]
Barry Warsawdeae6b42017-12-30 15:18:06 -05001import os
2import tempfile
3
4from . import abc as resources_abc
Barry Warsawdeae6b42017-12-30 15:18:06 -05005from contextlib import contextmanager, suppress
6from importlib import import_module
7from importlib.abc import ResourceLoader
8from io import BytesIO, TextIOWrapper
9from pathlib import Path
10from types import ModuleType
Brett Cannon3ab93652018-04-30 11:31:45 -070011from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401
Barry Warsawdeae6b42017-12-30 15:18:06 -050012from typing import cast
13from typing.io import BinaryIO, TextIO
Barry Warsaw6f6eb352018-01-24 15:36:21 -050014from zipimport import ZipImportError
Barry Warsawdeae6b42017-12-30 15:18:06 -050015
16
# Public API of this module: the names exported by ``import *``.
__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]
28
29
# Type aliases used by the public signatures below: a package may be given
# either by name or as a module object, and a resource either as a string or
# as a path-like object.
Package = Union[str, ModuleType]
Resource = Union[str, os.PathLike]
32
33
def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If a name is given, the module is imported first.  Anything that has a
    ``__spec__`` attribute is treated as an already-imported module.

    Raises TypeError if the passed or imported module is not a package,
    i.e. its spec has no ``submodule_search_locations``.
    """
    if hasattr(package, '__spec__'):
        if package.__spec__.submodule_search_locations is None:
            raise TypeError('{!r} is not a package'.format(
                package.__spec__.name))
        return package
    module = import_module(package)
    if module.__spec__.submodule_search_locations is None:
        # Report the name exactly as the caller spelled it.
        raise TypeError('{!r} is not a package'.format(package))
    return module
52
53
def _normalize_path(path) -> str:
    """Return *path* as a bare file name.

    The argument is split with os.path.split(), so path-like objects are
    accepted as well as strings.

    Raises ValueError if the path has any directory component; a resource
    must be named by file name only.
    """
    parent, file_name = os.path.split(path)
    if parent:
        raise ValueError('{!r} must be only a file name'.format(path))
    return file_name
64
65
66def _get_resource_reader(
67 package: ModuleType) -> Optional[resources_abc.ResourceReader]:
68 # Return the package's loader if it's a ResourceReader. We can't use
69 # a issubclass() check here because apparently abc.'s __subclasscheck__()
70 # hook wants to create a weak reference to the object, but
71 # zipimport.zipimporter does not support weak references, resulting in a
72 # TypeError. That seems terrible.
Barry Warsaw5ec0fee2018-01-15 15:07:11 -080073 spec = package.__spec__
74 if hasattr(spec.loader, 'get_resource_reader'):
75 return cast(resources_abc.ResourceReader,
76 spec.loader.get_resource_reader(spec.name))
Barry Warsawdeae6b42017-12-30 15:18:06 -050077 return None
78
79
def _check_location(package):
    """Raise FileNotFoundError unless the package's spec has a location.

    A package is considered to have no location when its spec's ``origin``
    is None or its ``has_location`` flag is false.
    """
    if package.__spec__.origin is None or not package.__spec__.has_location:
        raise FileNotFoundError(f'Package has no location {package!r}')
83
84
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    If the package's loader provides a resource reader, the reader opens the
    resource.  Otherwise the resource is opened from the directory holding
    the package's origin file, falling back to the loader's get_data().

    Raises ValueError if *resource* is not a bare file name, TypeError if
    *package* is not a package, and FileNotFoundError if the package has no
    location or the resource cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # The file could not be opened directly.  If the loader offers
        # get_data(), give it a chance to supply the bytes; a loader
        # without get_data(), or one whose get_data() fails with OSError,
        # is reported as a missing resource.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return BytesIO(data)
114
115
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    *encoding* and *errors* are passed to open() or TextIOWrapper,
    whichever ends up producing the text stream.

    If the package's loader provides a resource reader, the reader's binary
    stream is wrapped for decoding.  Otherwise the resource is opened from
    the directory holding the package's origin file, falling back to the
    loader's get_data().

    Raises ValueError if *resource* is not a bare file name, TypeError if
    *package* is not a package, and FileNotFoundError if the package has no
    location or the resource cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # The file could not be opened directly.  If the loader offers
        # get_data(), give it a chance to supply the bytes; a loader
        # without get_data(), or one whose get_data() fails with OSError,
        # is reported as a missing resource.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return TextIOWrapper(BytesIO(data), encoding, errors)
148
149
def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource.

    Argument validation and error behaviour are those of open_binary(),
    which normalizes *resource* and resolves *package* itself.
    """
    with open_binary(package, resource) as fp:
        return fp.read()
156
157
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().

    Argument validation and error behaviour are those of open_text(), which
    normalizes *resource* and resolves *package* itself.
    """
    with open_text(package, resource, encoding, errors) as fp:
        return fp.read()
171
172
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Raises ValueError if *resource* is not a bare file name, TypeError if
    *package* is not a package, and FileNotFoundError if the package has
    neither a resource reader nor a location, or the resource cannot be
    found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Resolve the path *before* yielding.  Were the yield inside the
        # try block, a FileNotFoundError raised by the caller's ``with``
        # body would be swallowed here and the generator would go on to
        # yield a second time, which contextmanager reports as a
        # RuntimeError.
        try:
            reader_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            reader_path = None
        if reader_path is not None:
            yield reader_path
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of resource_path() *and* if
    # resource_path() raises FileNotFoundError.
    origin = package.__spec__.origin
    # The location is only verified when there is no reader, so the origin
    # may still be None here; in that case skip the file system lookup.
    if origin is not None:
        file_path = Path(origin).parent / resource
        if file_path.exists():
            yield file_path
            return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile as the temporary file has to be
    # closed before it is handed out in order to work on Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        # The file object takes ownership of the descriptor: it is closed
        # even if the write fails, and write() stores all of the data.
        with os.fdopen(fd, 'wb') as tmp:
            tmp.write(data)
        yield Path(raw_path)
    finally:
        # The caller may already have removed the file.
        with suppress(FileNotFoundError):
            os.remove(raw_path)
216
217
def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.

    Raises ValueError if *name* is not a bare file name and TypeError if
    *package* is not a package.
    """
    package = _get_package(package)
    # Validate the name and use the normalized form from here on, so that a
    # path-like argument is compared against the directory listing as a
    # string.
    name = _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are not
    # resources, so let's try to find out if it's a directory or not.
    candidate = Path(package.__spec__.origin).parent / name
    return candidate.is_file()
Barry Warsawdeae6b42017-12-30 15:18:06 -0500239
240
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of entries in 'package'.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.

    If the package's loader provides a resource reader, its contents() are
    returned.  A package without a location yields an empty tuple.
    Otherwise the directory holding the package's origin file is listed.

    Raises TypeError if *package* is not a package.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.  We could use _check_location() and catch the
    # exception, but that's extra work, so just inline the check.
    if package.__spec__.origin is None or not package.__spec__.has_location:
        return ()
    package_directory = Path(package.__spec__.origin).parent
    return os.listdir(package_directory)