# blob: 35733c8bf5de071224bf9df687c94df5c696e908 [file] [log] [blame]
"""Tests for the packaging.install module."""
import os
import logging
from sysconfig import is_python_build
from tempfile import mkstemp

from packaging import install
from packaging.pypi.xmlrpc import Client
from packaging.metadata import Metadata
from packaging.tests.support import (LoggingCatcher, TempdirManager, unittest,
                                     fake_dec)
try:
    import threading
    from packaging.tests.pypi_server import use_xmlrpc_server
except ImportError:
    # Keep the module importable when either import fails: the tests that
    # need the XML-RPC server check ``threading is None`` to skip themselves,
    # and ``fake_dec`` stands in for the missing decorator.
    threading = None
    use_xmlrpc_server = fake_dec


class InstalledDist:
    """Fake distribution representing one already installed on the system.

    The name, version and dependency list are stored both as plain
    attributes (name, version) and in a ``Metadata`` object under the
    'Name', 'Version' and 'Requires-Dist' fields.
    """

    def __init__(self, name, version, deps):
        self.metadata = Metadata()
        self.name = name
        self.version = version
        self.metadata['Name'] = name
        self.metadata['Version'] = version
        self.metadata['Requires-Dist'] = deps

    def __repr__(self):
        return '<InstalledDist %s>' % self.metadata['Name']


class ToInstallDist:
    """Fake distribution standing in for one that will be installed.

    When *files* is true, three empty temporary files are created with
    ``mkstemp`` and reported as the distribution's installed files.
    """

    def __init__(self, files=False):
        self._files = files
        self.install_called = False
        self.install_called_with = {}
        self.uninstall_called = False
        self._real_files = []
        self.name = "fake"
        self.version = "fake"
        if files:
            for _ in range(3):
                # mkstemp returns an open descriptor; only the path is kept.
                fd, filename = mkstemp()
                os.close(fd)
                self._real_files.append(filename)

    def _unlink_installed_files(self):
        """Delete the temporary files created in __init__, if any."""
        if self._files:
            for filename in self._real_files:
                os.unlink(filename)

    def list_installed_files(self, **args):
        """Return the list of temporary file paths, or None without files."""
        if self._files:
            return self._real_files
        return None

    def get_install(self, **args):
        """Return the same value as list_installed_files()."""
        return self.list_installed_files()


class MagicMock:
    """Minimal callable test double that records its calls.

    *return_value* is returned by every call that does not raise.
    *raise_exception* is either a single flag (truthy: every call raises
    ``Exception``) or a sequence of flags indexed by call number, so that
    ``(False, True)`` means "succeed on the first call, raise on the second".
    Calls beyond the end of the sequence do not raise.
    """

    def __init__(self, return_value=None, raise_exception=False):
        self.called = False
        self._times_called = 0
        self._called_with = []
        self._return_value = return_value
        self._raise = raise_exception

    def __call__(self, *args, **kwargs):
        self.called = True
        self._times_called += 1
        self._called_with.append((args, kwargs))
        if self._should_raise():
            raise Exception
        return self._return_value

    def _should_raise(self):
        """Tell whether the call currently being recorded must raise."""
        plan = self._raise
        if not hasattr(plan, '__iter__'):
            return bool(plan)
        try:
            return bool(plan[self._times_called - 1])
        except IndexError:
            # More calls than scheduled flags: treat the extra calls as
            # successful rather than leaking an IndexError to the caller.
            return False

    def called_with(self, *args, **kwargs):
        """Return True if a call with exactly these arguments was recorded."""
        return (args, kwargs) in self._called_with


def get_installed_dists(dists):
    """Return a list of fake installed dists.

    *dists* is an iterable of (name, version, deps) tuples; one
    InstalledDist is built per tuple, in the same order.
    """
    return [InstalledDist(name, version, deps)
            for name, version, deps in dists]


class TestInstall(LoggingCatcher, TempdirManager, unittest.TestCase):
    """Tests for the dependency lookup and install helpers of install."""

    def _get_client(self, server, *args, **kwargs):
        """Return an index Client pointed at the test server's address."""
        return Client(server.full_address, *args, **kwargs)

    def _get_results(self, output):
        """Return (installed, remove, conflict) lists of (name, version)."""
        installed = [(o.name, str(o.version)) for o in output['install']]
        remove = [(o.name, str(o.version)) for o in output['remove']]
        conflict = [(o.name, str(o.version)) for o in output['conflict']]
        return installed, remove, conflict

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_existing_deps(self, server):
        # Test that the installer gets the dependencies from the metadata
        # and asks the index for these dependencies.
        # In this test case, choxie depends on towel-stuff 0.1, which in
        # turn depends on bacon <= 0.2:
        # choxie -> towel-stuff -> bacon.
        # Each release metadata is not provided in metadata 1.2.
        client = self._get_client(server)
        archive_path = '%s/distribution.tar.gz' % server.full_address
        server.xmlrpc.set_distributions([
            {'name': 'choxie',
             'version': '2.0.0.9',
             'requires_dist': ['towel-stuff (0.1)'],
             'url': archive_path},
            {'name': 'towel-stuff',
             'version': '0.1',
             'requires_dist': ['bacon (<= 0.2)'],
             'url': archive_path},
            {'name': 'bacon',
             'version': '0.1',
             'requires_dist': [],
             'url': archive_path},
        ])
        installed = get_installed_dists([('bacon', '0.1', [])])
        output = install.get_infos("choxie", index=client,
                                   installed=installed)

        # bacon is not to be installed as it is already installed
        # system-wide
        self.assertEqual(0, len(output['remove']))
        self.assertEqual(2, len(output['install']))
        readable_output = [(o.name, str(o.version))
                           for o in output['install']]
        self.assertIn(('towel-stuff', '0.1'), readable_output)
        self.assertIn(('choxie', '2.0.0.9'), readable_output)

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_upgrade_existing_deps(self, server):
        client = self._get_client(server)
        archive_path = '%s/distribution.tar.gz' % server.full_address
        server.xmlrpc.set_distributions([
            {'name': 'choxie',
             'version': '2.0.0.9',
             'requires_dist': ['towel-stuff (0.1)'],
             'url': archive_path},
            {'name': 'towel-stuff',
             'version': '0.1',
             'requires_dist': ['bacon (>= 0.2)'],
             'url': archive_path},
            {'name': 'bacon',
             'version': '0.2',
             'requires_dist': [],
             'url': archive_path},
        ])

        output = install.get_infos(
            "choxie", index=client,
            installed=get_installed_dists([('bacon', '0.1', [])]))
        installed = [(o.name, str(o.version)) for o in output['install']]

        # we need bacon 0.2, but 0.1 is installed.
        # So we expect to remove 0.1 and to install 0.2 instead.
        remove = [(o.name, str(o.version)) for o in output['remove']]
        self.assertIn(('choxie', '2.0.0.9'), installed)
        self.assertIn(('towel-stuff', '0.1'), installed)
        self.assertIn(('bacon', '0.2'), installed)
        self.assertIn(('bacon', '0.1'), remove)
        self.assertEqual(0, len(output['conflict']))

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_conflicts(self, server):
        # Tests that conflicts are detected
        client = self._get_client(server)
        archive_path = '%s/distribution.tar.gz' % server.full_address

        # choxie depends on towel-stuff, which depends on bacon.
        server.xmlrpc.set_distributions([
            {'name': 'choxie',
             'version': '2.0.0.9',
             'requires_dist': ['towel-stuff (0.1)'],
             'url': archive_path},
            {'name': 'towel-stuff',
             'version': '0.1',
             'requires_dist': ['bacon (>= 0.2)'],
             'url': archive_path},
            {'name': 'bacon',
             'version': '0.2',
             'requires_dist': [],
             'url': archive_path},
        ])

        # name, version, deps.
        already_installed = [('bacon', '0.1', []),
                             ('chicken', '1.1', ['bacon (0.1)'])]
        output = install.get_infos(
            'choxie', index=client,
            installed=get_installed_dists(already_installed))

        # we need bacon 0.2, but 0.1 is installed.
        # So we expect to remove 0.1 and to install 0.2 instead.
        installed, remove, conflict = self._get_results(output)
        self.assertIn(('choxie', '2.0.0.9'), installed)
        self.assertIn(('towel-stuff', '0.1'), installed)
        self.assertIn(('bacon', '0.2'), installed)
        self.assertIn(('bacon', '0.1'), remove)
        self.assertIn(('chicken', '1.1'), conflict)

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_installation_unexisting_project(self, server):
        # Test that the installer raises an exception if the project does
        # not exist.
        client = self._get_client(server)
        self.assertRaises(install.InstallationException,
                          install.get_infos,
                          'unexisting project', index=client)

    def test_move_files(self):
        # test that the files are really moved, and that the new path is
        # returned.
        path = self.mkdtemp()
        newpath = self.mkdtemp()
        files = [os.path.join(path, str(x)) for x in range(1, 20)]
        for f in files:
            # create an empty file
            open(f, 'ab+').close()
        output = list(install._move_files(files, newpath))

        # check that output returns the list of old/new places
        for file_ in files:
            name = os.path.split(file_)[-1]
            newloc = os.path.join(newpath, name)
            self.assertIn((file_, newloc), output)

        # remove the files
        for moved in output:
            os.remove(moved[1])  # index 1 is the new place

    def test_update_infos(self):
        # each entry is (dict updated in place, dict merged in, expected)
        tests = [[
            {'foo': ['foobar', 'foo', 'baz'], 'baz': ['foo', 'foo']},
            {'foo': ['additional_content', 'yeah'], 'baz': ['test', 'foo']},
            {'foo': ['foobar', 'foo', 'baz', 'additional_content', 'yeah'],
             'baz': ['foo', 'foo', 'test', 'foo']},
        ]]

        for dict1, dict2, expect in tests:
            install._update_infos(dict1, dict2)
            for key in expect:
                self.assertEqual(expect[key], dict1[key])

    def test_install_dists_rollback(self):
        # if one of the distribution installations fails, the already
        # installed distributions must be removed again.

        # Save exactly the attributes that get patched so the finally
        # clause puts the real objects back.
        old_install_dist = install._install_dist
        old_remove = getattr(install, 'remove', None)

        # first call succeeds, second call raises
        install._install_dist = MagicMock(return_value=[],
                                          raise_exception=(False, True))
        install.remove = MagicMock()
        try:
            d1 = ToInstallDist()
            d2 = ToInstallDist()
            path = self.mkdtemp()
            self.assertRaises(Exception, install.install_dists, [d1, d2],
                              path)
            self.assertTrue(install._install_dist.called_with(d1, path))
            self.assertTrue(install.remove.called)
        finally:
            install._install_dist = old_install_dist
            install.remove = old_remove

    def test_install_dists_success(self):
        old_install_dist = install._install_dist
        install._install_dist = MagicMock(return_value=[])
        try:
            # test that the install method is called on each distribution
            d1 = ToInstallDist()
            d2 = ToInstallDist()

            # should call install
            path = self.mkdtemp()
            install.install_dists([d1, d2], path)
            for dist in (d1, d2):
                self.assertTrue(
                    install._install_dist.called_with(dist, path))
        finally:
            install._install_dist = old_install_dist

    def test_install_from_infos_conflict(self):
        # assert conflicts raise an exception
        self.assertRaises(install.InstallationConflict,
                          install.install_from_infos,
                          conflicts=[ToInstallDist()])

    def test_install_from_infos_remove_success(self):
        old_install_dists = install.install_dists
        install.install_dists = lambda x, y=None: None
        try:
            dists = [ToInstallDist(files=True) for _ in range(2)]
            install.install_from_infos(remove=dists)

            # assert that the files have been removed
            for dist in dists:
                for f in dist.list_installed_files():
                    self.assertFalse(os.path.exists(f))
        finally:
            install.install_dists = old_install_dists

    def test_install_from_infos_remove_rollback(self):
        old_install_dist = install._install_dist
        old_uninstall = getattr(install, 'uninstall', None)

        # first call succeeds, second call raises
        install._install_dist = MagicMock(return_value=[],
                                          raise_exception=(False, True))
        install.uninstall = MagicMock()
        try:
            # assert that if an error occurs, the removed files are restored.
            remove = [ToInstallDist(files=True) for _ in range(2)]
            to_install = [ToInstallDist(), ToInstallDist()]
            temp_dir = self.mkdtemp()

            self.assertRaises(Exception, install.install_from_infos,
                              install_path=temp_dir, install=to_install,
                              remove=remove)
            # assert that the files are back in their original place,
            # then clean them up
            for dist in remove:
                for f in dist.list_installed_files():
                    self.assertTrue(os.path.exists(f))
                dist._unlink_installed_files()
        finally:
            install._install_dist = old_install_dist
            install.uninstall = old_uninstall

    def test_install_from_infos_install_succes(self):
        old_install_dist = install._install_dist
        install._install_dist = MagicMock([])
        try:
            # assert that the distribution can be installed
            install_path = "my_install_path"
            to_install = [ToInstallDist(), ToInstallDist()]

            install.install_from_infos(install_path, install=to_install)
            for dist in to_install:
                # Check the recorded call instead of discarding the result.
                self.assertTrue(
                    install._install_dist.called_with(dist, install_path))
        finally:
            install._install_dist = old_install_dist

    def test_install_permission_denied(self):
        # if we don't have access to the installation path, we should abort
        # immediately
        project = os.path.join(os.path.dirname(__file__), 'package.tgz')

        # when running from an uninstalled build, a warning is emitted and
        # the installation is not attempted
        if is_python_build():
            self.assertFalse(install.install(project))
            self.assertEqual(1, len(self.get_logs(logging.ERROR)))
            return

        install_path = self.mkdtemp()
        old_get_path = install.get_path
        install.get_path = lambda path: install_path
        old_mod = os.stat(install_path).st_mode
        # drop every permission bit on the target directory
        os.chmod(install_path, 0)
        try:
            self.assertFalse(install.install(project))
        finally:
            os.chmod(install_path, old_mod)
            install.get_path = old_get_path


def test_suite():
    """Return a TestSuite holding the tests collected from TestInstall."""
    # NOTE(review): makeSuite is deprecated in recent stdlib unittest;
    # confirm which unittest packaging.tests.support exports before changing.
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(TestInstall))
    return suite

if __name__ == '__main__':
    # Run the suite built by test_suite() when executed as a script.
    unittest.main(defaultTest='test_suite')