| """Tests for the packaging.install module.""" |
| import os |
| import logging |
| from sysconfig import is_python_build |
| from tempfile import mkstemp |
| |
| from packaging import install |
| from packaging.pypi.xmlrpc import Client |
| from packaging.metadata import Metadata |
| from packaging.tests.support import (LoggingCatcher, TempdirManager, unittest, |
| fake_dec) |
| try: |
| import threading |
| from packaging.tests.pypi_server import use_xmlrpc_server |
| except ImportError: |
| threading = None |
| use_xmlrpc_server = fake_dec |
| |
| |
| class InstalledDist: |
| """Distribution object, represent distributions currently installed on the |
| system""" |
| def __init__(self, name, version, deps): |
| self.metadata = Metadata() |
| self.name = name |
| self.version = version |
| self.metadata['Name'] = name |
| self.metadata['Version'] = version |
| self.metadata['Requires-Dist'] = deps |
| |
| def __repr__(self): |
| return '<InstalledDist %s>' % self.metadata['Name'] |
| |
| |
| class ToInstallDist: |
| """Distribution that will be installed""" |
| |
| def __init__(self, files=False): |
| self._files = files |
| self.install_called = False |
| self.install_called_with = {} |
| self.uninstall_called = False |
| self._real_files = [] |
| self.name = "fake" |
| self.version = "fake" |
| if files: |
| for f in range(0, 3): |
| fp, fn = mkstemp() |
| os.close(fp) |
| self._real_files.append(fn) |
| |
| def _unlink_installed_files(self): |
| if self._files: |
| for fn in self._real_files: |
| os.unlink(fn) |
| |
| def list_installed_files(self, **args): |
| if self._files: |
| return self._real_files |
| |
| def get_install(self, **args): |
| return self.list_installed_files() |
| |
| |
| class MagicMock: |
| def __init__(self, return_value=None, raise_exception=False): |
| self.called = False |
| self._times_called = 0 |
| self._called_with = [] |
| self._return_value = return_value |
| self._raise = raise_exception |
| |
| def __call__(self, *args, **kwargs): |
| self.called = True |
| self._times_called = self._times_called + 1 |
| self._called_with.append((args, kwargs)) |
| iterable = hasattr(self._raise, '__iter__') |
| if self._raise: |
| if ((not iterable and self._raise) |
| or self._raise[self._times_called - 1]): |
| raise Exception |
| return self._return_value |
| |
| def called_with(self, *args, **kwargs): |
| return (args, kwargs) in self._called_with |
| |
| |
| def get_installed_dists(dists): |
| """Return a list of fake installed dists. |
| The list is name, version, deps""" |
| objects = [] |
| for name, version, deps in dists: |
| objects.append(InstalledDist(name, version, deps)) |
| return objects |
| |
| |
| class TestInstall(LoggingCatcher, TempdirManager, unittest.TestCase): |
| def _get_client(self, server, *args, **kwargs): |
| return Client(server.full_address, *args, **kwargs) |
| |
| def _get_results(self, output): |
| """return a list of results""" |
| installed = [(o.name, str(o.version)) for o in output['install']] |
| remove = [(o.name, str(o.version)) for o in output['remove']] |
| conflict = [(o.name, str(o.version)) for o in output['conflict']] |
| return installed, remove, conflict |
| |
| @unittest.skipIf(threading is None, 'needs threading') |
| @use_xmlrpc_server() |
| def test_existing_deps(self, server): |
| # Test that the installer get the dependencies from the metadatas |
| # and ask the index for this dependencies. |
| # In this test case, we have choxie that is dependent from towel-stuff |
| # 0.1, which is in-turn dependent on bacon <= 0.2: |
| # choxie -> towel-stuff -> bacon. |
| # Each release metadata is not provided in metadata 1.2. |
| client = self._get_client(server) |
| archive_path = '%s/distribution.tar.gz' % server.full_address |
| server.xmlrpc.set_distributions([ |
| {'name': 'choxie', |
| 'version': '2.0.0.9', |
| 'requires_dist': ['towel-stuff (0.1)'], |
| 'url': archive_path}, |
| {'name': 'towel-stuff', |
| 'version': '0.1', |
| 'requires_dist': ['bacon (<= 0.2)'], |
| 'url': archive_path}, |
| {'name': 'bacon', |
| 'version': '0.1', |
| 'requires_dist': [], |
| 'url': archive_path}, |
| ]) |
| installed = get_installed_dists([('bacon', '0.1', [])]) |
| output = install.get_infos("choxie", index=client, |
| installed=installed) |
| |
| # we don't have installed bacon as it's already installed system-wide |
| self.assertEqual(0, len(output['remove'])) |
| self.assertEqual(2, len(output['install'])) |
| readable_output = [(o.name, str(o.version)) |
| for o in output['install']] |
| self.assertIn(('towel-stuff', '0.1'), readable_output) |
| self.assertIn(('choxie', '2.0.0.9'), readable_output) |
| |
| @unittest.skipIf(threading is None, 'needs threading') |
| @use_xmlrpc_server() |
| def test_upgrade_existing_deps(self, server): |
| client = self._get_client(server) |
| archive_path = '%s/distribution.tar.gz' % server.full_address |
| server.xmlrpc.set_distributions([ |
| {'name': 'choxie', |
| 'version': '2.0.0.9', |
| 'requires_dist': ['towel-stuff (0.1)'], |
| 'url': archive_path}, |
| {'name': 'towel-stuff', |
| 'version': '0.1', |
| 'requires_dist': ['bacon (>= 0.2)'], |
| 'url': archive_path}, |
| {'name': 'bacon', |
| 'version': '0.2', |
| 'requires_dist': [], |
| 'url': archive_path}, |
| ]) |
| |
| output = install.get_infos("choxie", index=client, |
| installed=get_installed_dists([('bacon', '0.1', [])])) |
| installed = [(o.name, str(o.version)) for o in output['install']] |
| |
| # we need bacon 0.2, but 0.1 is installed. |
| # So we expect to remove 0.1 and to install 0.2 instead. |
| remove = [(o.name, str(o.version)) for o in output['remove']] |
| self.assertIn(('choxie', '2.0.0.9'), installed) |
| self.assertIn(('towel-stuff', '0.1'), installed) |
| self.assertIn(('bacon', '0.2'), installed) |
| self.assertIn(('bacon', '0.1'), remove) |
| self.assertEqual(0, len(output['conflict'])) |
| |
| @unittest.skipIf(threading is None, 'needs threading') |
| @use_xmlrpc_server() |
| def test_conflicts(self, server): |
| # Tests that conflicts are detected |
| client = self._get_client(server) |
| archive_path = '%s/distribution.tar.gz' % server.full_address |
| |
| # choxie depends on towel-stuff, which depends on bacon. |
| server.xmlrpc.set_distributions([ |
| {'name': 'choxie', |
| 'version': '2.0.0.9', |
| 'requires_dist': ['towel-stuff (0.1)'], |
| 'url': archive_path}, |
| {'name': 'towel-stuff', |
| 'version': '0.1', |
| 'requires_dist': ['bacon (>= 0.2)'], |
| 'url': archive_path}, |
| {'name': 'bacon', |
| 'version': '0.2', |
| 'requires_dist': [], |
| 'url': archive_path}, |
| ]) |
| |
| # name, version, deps. |
| already_installed = [('bacon', '0.1', []), |
| ('chicken', '1.1', ['bacon (0.1)'])] |
| output = install.get_infos( |
| 'choxie', index=client, |
| installed=get_installed_dists(already_installed)) |
| |
| # we need bacon 0.2, but 0.1 is installed. |
| # So we expect to remove 0.1 and to install 0.2 instead. |
| installed, remove, conflict = self._get_results(output) |
| self.assertIn(('choxie', '2.0.0.9'), installed) |
| self.assertIn(('towel-stuff', '0.1'), installed) |
| self.assertIn(('bacon', '0.2'), installed) |
| self.assertIn(('bacon', '0.1'), remove) |
| self.assertIn(('chicken', '1.1'), conflict) |
| |
| @unittest.skipIf(threading is None, 'needs threading') |
| @use_xmlrpc_server() |
| def test_installation_unexisting_project(self, server): |
| # Test that the isntalled raises an exception if the project does not |
| # exists. |
| client = self._get_client(server) |
| self.assertRaises(install.InstallationException, |
| install.get_infos, |
| 'unexisting project', index=client) |
| |
| def test_move_files(self): |
| # test that the files are really moved, and that the new path is |
| # returned. |
| path = self.mkdtemp() |
| newpath = self.mkdtemp() |
| files = [os.path.join(path, str(x)) for x in range(1, 20)] |
| for f in files: |
| open(f, 'ab+').close() |
| output = [o for o in install._move_files(files, newpath)] |
| |
| # check that output return the list of old/new places |
| for file_ in files: |
| name = os.path.split(file_)[-1] |
| newloc = os.path.join(newpath, name) |
| self.assertIn((file_, newloc), output) |
| |
| # remove the files |
| for f in [o[1] for o in output]: # o[1] is the new place |
| os.remove(f) |
| |
| def test_update_infos(self): |
| tests = [[ |
| {'foo': ['foobar', 'foo', 'baz'], 'baz': ['foo', 'foo']}, |
| {'foo': ['additional_content', 'yeah'], 'baz': ['test', 'foo']}, |
| {'foo': ['foobar', 'foo', 'baz', 'additional_content', 'yeah'], |
| 'baz': ['foo', 'foo', 'test', 'foo']}, |
| ]] |
| |
| for dict1, dict2, expect in tests: |
| install._update_infos(dict1, dict2) |
| for key in expect: |
| self.assertEqual(expect[key], dict1[key]) |
| |
| def test_install_dists_rollback(self): |
| # if one of the distribution installation fails, call uninstall on all |
| # installed distributions. |
| |
| old_install_dist = install._install_dist |
| old_uninstall = getattr(install, 'uninstall', None) |
| |
| install._install_dist = MagicMock(return_value=[], |
| raise_exception=(False, True)) |
| install.remove = MagicMock() |
| try: |
| d1 = ToInstallDist() |
| d2 = ToInstallDist() |
| path = self.mkdtemp() |
| self.assertRaises(Exception, install.install_dists, [d1, d2], path) |
| self.assertTrue(install._install_dist.called_with(d1, path)) |
| self.assertTrue(install.remove.called) |
| finally: |
| install._install_dist = old_install_dist |
| install.remove = old_uninstall |
| |
| def test_install_dists_success(self): |
| old_install_dist = install._install_dist |
| install._install_dist = MagicMock(return_value=[]) |
| try: |
| # test that the install method is called on each distributions |
| d1 = ToInstallDist() |
| d2 = ToInstallDist() |
| |
| # should call install |
| path = self.mkdtemp() |
| install.install_dists([d1, d2], path) |
| for dist in (d1, d2): |
| self.assertTrue(install._install_dist.called_with(dist, path)) |
| finally: |
| install._install_dist = old_install_dist |
| |
| def test_install_from_infos_conflict(self): |
| # assert conflicts raise an exception |
| self.assertRaises(install.InstallationConflict, |
| install.install_from_infos, |
| conflicts=[ToInstallDist()]) |
| |
| def test_install_from_infos_remove_success(self): |
| old_install_dists = install.install_dists |
| install.install_dists = lambda x, y=None: None |
| try: |
| dists = [] |
| for i in range(2): |
| dists.append(ToInstallDist(files=True)) |
| install.install_from_infos(remove=dists) |
| |
| # assert that the files have been removed |
| for dist in dists: |
| for f in dist.list_installed_files(): |
| self.assertFalse(os.path.exists(f)) |
| finally: |
| install.install_dists = old_install_dists |
| |
| def test_install_from_infos_remove_rollback(self): |
| old_install_dist = install._install_dist |
| old_uninstall = getattr(install, 'uninstall', None) |
| |
| install._install_dist = MagicMock(return_value=[], |
| raise_exception=(False, True)) |
| install.uninstall = MagicMock() |
| try: |
| # assert that if an error occurs, the removed files are restored. |
| remove = [] |
| for i in range(2): |
| remove.append(ToInstallDist(files=True)) |
| to_install = [ToInstallDist(), ToInstallDist()] |
| temp_dir = self.mkdtemp() |
| |
| self.assertRaises(Exception, install.install_from_infos, |
| install_path=temp_dir, install=to_install, |
| remove=remove) |
| # assert that the files are in the same place |
| # assert that the files have been removed |
| for dist in remove: |
| for f in dist.list_installed_files(): |
| self.assertTrue(os.path.exists(f)) |
| dist._unlink_installed_files() |
| finally: |
| install._install_dist = old_install_dist |
| install.uninstall = old_uninstall |
| |
| def test_install_from_infos_install_succes(self): |
| old_install_dist = install._install_dist |
| install._install_dist = MagicMock([]) |
| try: |
| # assert that the distribution can be installed |
| install_path = "my_install_path" |
| to_install = [ToInstallDist(), ToInstallDist()] |
| |
| install.install_from_infos(install_path, install=to_install) |
| for dist in to_install: |
| install._install_dist.called_with(install_path) |
| finally: |
| install._install_dist = old_install_dist |
| |
| def test_install_permission_denied(self): |
| # if we don't have access to the installation path, we should abort |
| # immediately |
| project = os.path.join(os.path.dirname(__file__), 'package.tgz') |
| |
| # when running from an uninstalled build, a warning is emitted and the |
| # installation is not attempted |
| if is_python_build(): |
| self.assertFalse(install.install(project)) |
| self.assertEqual(1, len(self.get_logs(logging.ERROR))) |
| return |
| |
| install_path = self.mkdtemp() |
| old_get_path = install.get_path |
| install.get_path = lambda path: install_path |
| old_mod = os.stat(install_path).st_mode |
| os.chmod(install_path, 0) |
| try: |
| self.assertFalse(install.install(project)) |
| finally: |
| os.chmod(install_path, old_mod) |
| install.get_path = old_get_path |
| |
| |
| def test_suite(): |
| suite = unittest.TestSuite() |
| suite.addTest(unittest.makeSuite(TestInstall)) |
| return suite |
| |
| if __name__ == '__main__': |
| unittest.main(defaultTest='test_suite') |