Jason R. Coombs | 6714825 | 2021-03-04 13:43:00 -0500 | [diff] [blame^] | 1 | import collections |
Jason R. Coombs | 843c277 | 2020-06-07 21:00:51 -0400 | [diff] [blame] | 2 | import zipfile |
| 3 | import pathlib |
| 4 | from . import abc |
| 5 | |
| 6 | |
Jason R. Coombs | 6714825 | 2021-03-04 13:43:00 -0500 | [diff] [blame^] | 7 | def remove_duplicates(items): |
| 8 | return iter(collections.OrderedDict.fromkeys(items)) |
| 9 | |
| 10 | |
Jason R. Coombs | 843c277 | 2020-06-07 21:00:51 -0400 | [diff] [blame] | 11 | class FileReader(abc.TraversableResources): |
| 12 | def __init__(self, loader): |
| 13 | self.path = pathlib.Path(loader.path).parent |
| 14 | |
Jason R. Coombs | 2fb5f03 | 2020-06-29 16:59:22 -0400 | [diff] [blame] | 15 | def resource_path(self, resource): |
| 16 | """ |
| 17 | Return the file system path to prevent |
| 18 | `resources.path()` from creating a temporary |
| 19 | copy. |
| 20 | """ |
| 21 | return str(self.path.joinpath(resource)) |
| 22 | |
Jason R. Coombs | 843c277 | 2020-06-07 21:00:51 -0400 | [diff] [blame] | 23 | def files(self): |
| 24 | return self.path |
| 25 | |
| 26 | |
Jason R. Coombs | 2fb5f03 | 2020-06-29 16:59:22 -0400 | [diff] [blame] | 27 | class ZipReader(abc.TraversableResources): |
Jason R. Coombs | 843c277 | 2020-06-07 21:00:51 -0400 | [diff] [blame] | 28 | def __init__(self, loader, module): |
| 29 | _, _, name = module.rpartition('.') |
Jason R. Coombs | df8d4c8 | 2020-10-25 14:21:46 -0400 | [diff] [blame] | 30 | self.prefix = loader.prefix.replace('\\', '/') + name + '/' |
| 31 | self.archive = loader.archive |
Jason R. Coombs | 843c277 | 2020-06-07 21:00:51 -0400 | [diff] [blame] | 32 | |
| 33 | def open_resource(self, resource): |
| 34 | try: |
| 35 | return super().open_resource(resource) |
| 36 | except KeyError as exc: |
| 37 | raise FileNotFoundError(exc.args[0]) |
| 38 | |
| 39 | def is_resource(self, path): |
| 40 | # workaround for `zipfile.Path.is_file` returning true |
| 41 | # for non-existent paths. |
| 42 | target = self.files().joinpath(path) |
| 43 | return target.is_file() and target.exists() |
Jason R. Coombs | 2fb5f03 | 2020-06-29 16:59:22 -0400 | [diff] [blame] | 44 | |
| 45 | def files(self): |
Jason R. Coombs | df8d4c8 | 2020-10-25 14:21:46 -0400 | [diff] [blame] | 46 | return zipfile.Path(self.archive, self.prefix) |
Jason R. Coombs | 6714825 | 2021-03-04 13:43:00 -0500 | [diff] [blame^] | 47 | |
| 48 | |
| 49 | class MultiplexedPath(abc.Traversable): |
| 50 | """ |
| 51 | Given a series of Traversable objects, implement a merged |
| 52 | version of the interface across all objects. Useful for |
| 53 | namespace packages which may be multihomed at a single |
| 54 | name. |
| 55 | """ |
| 56 | |
| 57 | def __init__(self, *paths): |
| 58 | self._paths = list(map(pathlib.Path, remove_duplicates(paths))) |
| 59 | if not self._paths: |
| 60 | message = 'MultiplexedPath must contain at least one path' |
| 61 | raise FileNotFoundError(message) |
| 62 | if not all(path.is_dir() for path in self._paths): |
| 63 | raise NotADirectoryError('MultiplexedPath only supports directories') |
| 64 | |
| 65 | def iterdir(self): |
| 66 | visited = [] |
| 67 | for path in self._paths: |
| 68 | for file in path.iterdir(): |
| 69 | if file.name in visited: |
| 70 | continue |
| 71 | visited.append(file.name) |
| 72 | yield file |
| 73 | |
| 74 | def read_bytes(self): |
| 75 | raise FileNotFoundError(f'{self} is not a file') |
| 76 | |
| 77 | def read_text(self, *args, **kwargs): |
| 78 | raise FileNotFoundError(f'{self} is not a file') |
| 79 | |
| 80 | def is_dir(self): |
| 81 | return True |
| 82 | |
| 83 | def is_file(self): |
| 84 | return False |
| 85 | |
| 86 | def joinpath(self, child): |
| 87 | # first try to find child in current paths |
| 88 | for file in self.iterdir(): |
| 89 | if file.name == child: |
| 90 | return file |
| 91 | # if it does not exist, construct it with the first path |
| 92 | return self._paths[0] / child |
| 93 | |
| 94 | __truediv__ = joinpath |
| 95 | |
| 96 | def open(self, *args, **kwargs): |
| 97 | raise FileNotFoundError('{} is not a file'.format(self)) |
| 98 | |
| 99 | def name(self): |
| 100 | return self._paths[0].name |
| 101 | |
| 102 | def __repr__(self): |
| 103 | return 'MultiplexedPath({})'.format( |
| 104 | ', '.join("'{}'".format(path) for path in self._paths) |
| 105 | ) |
| 106 | |
| 107 | |
| 108 | class NamespaceReader(abc.TraversableResources): |
| 109 | def __init__(self, namespace_path): |
| 110 | if 'NamespacePath' not in str(namespace_path): |
| 111 | raise ValueError('Invalid path') |
| 112 | self.path = MultiplexedPath(*list(namespace_path)) |
| 113 | |
| 114 | def resource_path(self, resource): |
| 115 | """ |
| 116 | Return the file system path to prevent |
| 117 | `resources.path()` from creating a temporary |
| 118 | copy. |
| 119 | """ |
| 120 | return str(self.path.joinpath(resource)) |
| 121 | |
| 122 | def files(self): |
| 123 | return self.path |