blob: 535c8283e9081dfb33e8892e364d2a8356ad3daa [file] [log] [blame]
Jason R. Coombs67148252021-03-04 13:43:00 -05001import collections
Jason R. Coombs843c2772020-06-07 21:00:51 -04002import zipfile
3import pathlib
4from . import abc
5
6
def remove_duplicates(items):
    """
    Return an iterator over ``items`` with repeated values
    dropped, each survivor keeping its first-seen position.
    """
    # An ordered mapping holds one key per distinct item, in
    # insertion order; iterating it walks exactly those keys.
    unique = collections.OrderedDict.fromkeys(items)
    return iter(unique)
9
10
class FileReader(abc.TraversableResources):
    """
    Resource reader rooted at the directory that contains the
    file named by ``loader.path``.
    """

    def __init__(self, loader):
        module_file = pathlib.Path(loader.path)
        # Resources are looked up next to the loader's file.
        self.path = module_file.parent

    def resource_path(self, resource):
        """
        Give back the location of ``resource`` as a string.

        Handing out the real file system path keeps
        `resources.path()` from creating a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the directory that resources are resolved against."""
        return self.path
25
26
class ZipReader(abc.TraversableResources):
    """
    Resource reader for a package whose files are addressed
    through ``zipfile.Path`` inside ``loader.archive``.
    """

    def __init__(self, loader, module):
        # Only the last dotted component of the module name is
        # appended to the loader's prefix.
        package_dir = module.rpartition('.')[-1]
        # Normalize Windows separators to forward slashes.
        archive_prefix = loader.prefix.replace('\\', '/')
        self.prefix = archive_prefix + package_dir + '/'
        self.archive = loader.archive

    def open_resource(self, resource):
        """
        Open ``resource`` via the parent implementation,
        reporting a ``KeyError`` from the lookup as
        ``FileNotFoundError`` instead.
        """
        try:
            stream = super().open_resource(resource)
        except KeyError as exc:
            raise FileNotFoundError(exc.args[0])
        return stream

    def is_resource(self, path):
        """Tell whether ``path`` names an existing file under this package."""
        candidate = self.files().joinpath(path)
        # workaround for `zipfile.Path.is_file` returning true
        # for non-existent paths: existence is checked as well.
        if not candidate.is_file():
            return False
        return candidate.exists()

    def files(self):
        """Return a ``zipfile.Path`` for the package prefix in the archive."""
        root = zipfile.Path(self.archive, self.prefix)
        return root
Jason R. Coombs67148252021-03-04 13:43:00 -050047
48
class MultiplexedPath(abc.Traversable):
    """
    Given a series of Traversable objects, implement a merged
    version of the interface across all objects. Useful for
    namespace packages which may be multihomed at a single
    name.
    """

    def __init__(self, *paths):
        """
        Store ``paths`` as ``pathlib.Path`` objects, with
        duplicates dropped and the original order kept.

        Raises ``FileNotFoundError`` when no path is supplied
        and ``NotADirectoryError`` when any path is not a
        directory.
        """
        self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
        if not self._paths:
            message = 'MultiplexedPath must contain at least one path'
            raise FileNotFoundError(message)
        if not all(path.is_dir() for path in self._paths):
            raise NotADirectoryError('MultiplexedPath only supports directories')

    def iterdir(self):
        """
        Yield the children of each underlying directory, in
        path order. When several directories contain an entry
        with the same name, only the first one met is yielded.
        """
        # A set makes the "already yielded?" check constant-time
        # instead of a linear scan per entry.
        seen = set()
        for path in self._paths:
            for file in path.iterdir():
                if file.name in seen:
                    continue
                seen.add(file.name)
                yield file

    def read_bytes(self):
        """Always raise ``FileNotFoundError``: this object is not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def read_text(self, *args, **kwargs):
        """Always raise ``FileNotFoundError``: this object is not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def is_dir(self):
        """Return True; the merged view always acts as a directory."""
        return True

    def is_file(self):
        """Return False; the merged view is never a file."""
        return False

    def joinpath(self, child):
        """
        Return the existing entry named ``child`` if any of the
        underlying directories has one; otherwise return a path
        for ``child`` under the first directory.
        """
        # first try to find child in current paths
        for file in self.iterdir():
            if file.name == child:
                return file
        # if it does not exist, construct it with the first path
        return self._paths[0] / child

    __truediv__ = joinpath

    def open(self, *args, **kwargs):
        """Always raise ``FileNotFoundError``: this object is not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    # NOTE(review): this is a plain method, so callers must write
    # ``obj.name()``, whereas the pathlib objects yielded by iterdir()
    # expose ``name`` as an attribute. Presumably the Traversable
    # interface expects a property here -- confirm before changing,
    # since converting it would break existing ``name()`` callers.
    def name(self):
        """Return the name of the first underlying directory."""
        return self._paths[0].name

    def __repr__(self):
        quoted = ', '.join(f"'{path}'" for path in self._paths)
        return f'MultiplexedPath({quoted})'
106
107
class NamespaceReader(abc.TraversableResources):
    """
    Resource reader that presents every location of a
    namespace path as one ``MultiplexedPath``.
    """

    def __init__(self, namespace_path):
        # The argument is recognized by the text 'NamespacePath'
        # appearing in its string form; anything else is refused.
        looks_like_namespace = 'NamespacePath' in str(namespace_path)
        if not looks_like_namespace:
            raise ValueError('Invalid path')
        self.path = MultiplexedPath(*namespace_path)

    def resource_path(self, resource):
        """
        Give back the location of ``resource`` as a string.

        Handing out the real file system path keeps
        `resources.path()` from creating a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the merged path that resources are resolved against."""
        return self.path