bpo-39791: Add files() to importlib.resources (GH-19722)
* bpo-39791: Update importlib.resources to support files() API (importlib_resources 1.5).
* 📜🤖 Added by blurb_it.
* Add documentation for the newly added objects.
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
diff --git a/Lib/importlib/resources.py b/Lib/importlib/resources.py
index f518865..b803a01 100644
--- a/Lib/importlib/resources.py
+++ b/Lib/importlib/resources.py
@@ -1,14 +1,15 @@
import os
-import tempfile
from . import abc as resources_abc
+from . import _common
+from ._common import as_file
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
-from typing import Iterable, Iterator, Optional, Union # noqa: F401
+from typing import ContextManager, Iterable, Optional, Union
from typing import cast
from typing.io import BinaryIO, TextIO
@@ -16,7 +17,9 @@
__all__ = [
'Package',
'Resource',
+ 'as_file',
'contents',
+ 'files',
'is_resource',
'open_binary',
'open_text',
@@ -30,24 +33,23 @@
Resource = Union[str, os.PathLike]
+def _resolve(name) -> ModuleType:
+ """If name is a string, resolve to a module."""
+ if hasattr(name, '__spec__'):
+ return name
+ return import_module(name)
+
+
def _get_package(package) -> ModuleType:
"""Take a package name or module object and return the module.
- If a name, the module is imported. If the passed or imported module
+ If a name, the module is imported. If the resolved module
object is not a package, raise an exception.
"""
- if hasattr(package, '__spec__'):
- if package.__spec__.submodule_search_locations is None:
- raise TypeError('{!r} is not a package'.format(
- package.__spec__.name))
- else:
- return package
- else:
- module = import_module(package)
- if module.__spec__.submodule_search_locations is None:
- raise TypeError('{!r} is not a package'.format(package))
- else:
- return module
+ module = _resolve(package)
+ if module.__spec__.submodule_search_locations is None:
+ raise TypeError('{!r} is not a package'.format(package))
+ return module
def _normalize_path(path) -> str:
@@ -58,8 +60,7 @@
parent, file_name = os.path.split(path)
if parent:
raise ValueError('{!r} must be only a file name'.format(path))
- else:
- return file_name
+ return file_name
def _get_resource_reader(
@@ -88,8 +89,8 @@
reader = _get_resource_reader(package)
if reader is not None:
return reader.open_resource(resource)
- _check_location(package)
- absolute_package_path = os.path.abspath(package.__spec__.origin)
+ absolute_package_path = os.path.abspath(
+ package.__spec__.origin or 'non-existent file')
package_path = os.path.dirname(absolute_package_path)
full_path = os.path.join(package_path, resource)
try:
@@ -108,8 +109,7 @@
message = '{!r} resource not found in {!r}'.format(
resource, package_name)
raise FileNotFoundError(message)
- else:
- return BytesIO(data)
+ return BytesIO(data)
def open_text(package: Package,
@@ -117,39 +117,12 @@
encoding: str = 'utf-8',
errors: str = 'strict') -> TextIO:
"""Return a file-like object opened for text reading of the resource."""
- resource = _normalize_path(resource)
- package = _get_package(package)
- reader = _get_resource_reader(package)
- if reader is not None:
- return TextIOWrapper(reader.open_resource(resource), encoding, errors)
- _check_location(package)
- absolute_package_path = os.path.abspath(package.__spec__.origin)
- package_path = os.path.dirname(absolute_package_path)
- full_path = os.path.join(package_path, resource)
- try:
- return open(full_path, mode='r', encoding=encoding, errors=errors)
- except OSError:
- # Just assume the loader is a resource loader; all the relevant
- # importlib.machinery loaders are and an AttributeError for
- # get_data() will make it clear what is needed from the loader.
- loader = cast(ResourceLoader, package.__spec__.loader)
- data = None
- if hasattr(package.__spec__.loader, 'get_data'):
- with suppress(OSError):
- data = loader.get_data(full_path)
- if data is None:
- package_name = package.__spec__.name
- message = '{!r} resource not found in {!r}'.format(
- resource, package_name)
- raise FileNotFoundError(message)
- else:
- return TextIOWrapper(BytesIO(data), encoding, errors)
+ return TextIOWrapper(
+ open_binary(package, resource), encoding=encoding, errors=errors)
def read_binary(package: Package, resource: Resource) -> bytes:
"""Return the binary contents of the resource."""
- resource = _normalize_path(resource)
- package = _get_package(package)
with open_binary(package, resource) as fp:
return fp.read()
@@ -163,14 +136,20 @@
The decoding-related arguments have the same semantics as those of
bytes.decode().
"""
- resource = _normalize_path(resource)
- package = _get_package(package)
with open_text(package, resource, encoding, errors) as fp:
return fp.read()
-@contextmanager
-def path(package: Package, resource: Resource) -> Iterator[Path]:
+def files(package: Package) -> resources_abc.Traversable:
+ """
+ Get a Traversable resource from a package
+ """
+ return _common.from_package(_get_package(package))
+
+
+def path(
+ package: Package, resource: Resource,
+ ) -> 'ContextManager[Path]':
"""A context manager providing a file path object to the resource.
If the resource does not already exist on its own on the file system,
@@ -179,39 +158,23 @@
raised if the file was deleted prior to the context manager
exiting).
"""
- resource = _normalize_path(resource)
- package = _get_package(package)
- reader = _get_resource_reader(package)
- if reader is not None:
- try:
- yield Path(reader.resource_path(resource))
- return
- except FileNotFoundError:
- pass
- else:
- _check_location(package)
- # Fall-through for both the lack of resource_path() *and* if
- # resource_path() raises FileNotFoundError.
- package_directory = Path(package.__spec__.origin).parent
- file_path = package_directory / resource
- if file_path.exists():
- yield file_path
- else:
- with open_binary(package, resource) as fp:
- data = fp.read()
- # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
- # blocks due to the need to close the temporary file to work on
- # Windows properly.
- fd, raw_path = tempfile.mkstemp()
- try:
- os.write(fd, data)
- os.close(fd)
- yield Path(raw_path)
- finally:
- try:
- os.remove(raw_path)
- except FileNotFoundError:
- pass
+ reader = _get_resource_reader(_get_package(package))
+ return (
+ _path_from_reader(reader, resource)
+ if reader else
+ _common.as_file(files(package).joinpath(_normalize_path(resource)))
+ )
+
+
+@contextmanager
+def _path_from_reader(reader, resource):
+ norm_resource = _normalize_path(resource)
+ with suppress(FileNotFoundError):
+ yield Path(reader.resource_path(norm_resource))
+ return
+ opener_reader = reader.open_resource(norm_resource)
+ with _common._tempfile(opener_reader.read, suffix=norm_resource) as res:
+ yield res
def is_resource(package: Package, name: str) -> bool:
@@ -224,17 +187,10 @@
reader = _get_resource_reader(package)
if reader is not None:
return reader.is_resource(name)
- try:
- package_contents = set(contents(package))
- except (NotADirectoryError, FileNotFoundError):
- return False
+ package_contents = set(contents(package))
if name not in package_contents:
return False
- # Just because the given file_name lives as an entry in the package's
- # contents doesn't necessarily mean it's a resource. Directories are not
- # resources, so let's try to find out if it's a directory or not.
- path = Path(package.__spec__.origin).parent / name
- return path.is_file()
+ return (_common.from_package(package) / name).is_file()
def contents(package: Package) -> Iterable[str]:
@@ -249,10 +205,11 @@
if reader is not None:
return reader.contents()
# Is the package a namespace package? By definition, namespace packages
- # cannot have resources. We could use _check_location() and catch the
- # exception, but that's extra work, so just inline the check.
- elif package.__spec__.origin is None or not package.__spec__.has_location:
+ # cannot have resources.
+ namespace = (
+ package.__spec__.origin is None or
+ package.__spec__.origin == 'namespace'
+ )
+ if namespace or not package.__spec__.has_location:
return ()
- else:
- package_directory = Path(package.__spec__.origin).parent
- return os.listdir(package_directory)
+ return list(item.name for item in _common.from_package(package).iterdir())