"""Tests for the packaging.install module."""
import os
import logging
from sysconfig import is_python_build
from tempfile import mkstemp

from packaging import install
from packaging.pypi.xmlrpc import Client
from packaging.metadata import Metadata
from packaging.tests.support import (LoggingCatcher, TempdirManager, unittest,
                                     fake_dec)
try:
    import threading
    from packaging.tests.pypi_server import use_xmlrpc_server
except ImportError:
    threading = None
    use_xmlrpc_server = fake_dec


class InstalledDist:
    """Fake distribution standing in for one that is already installed on
    the system."""

    def __init__(self, name, version, deps):
        # name and version are mirrored as plain attributes next to the
        # metadata fields
        self.metadata = Metadata()
        self.name, self.version = name, version
        for field, value in (('Name', name),
                             ('Version', version),
                             ('Requires-Dist', deps)):
            self.metadata[field] = value

    def __repr__(self):
        dist_name = self.metadata['Name']
        return '<InstalledDist %r>' % dist_name


class ToInstallDist:
    """Fake distribution standing in for one that is about to be installed.

    When *files* is true, three empty temporary files are created on disk
    and reported as the files installed by this distribution.
    """

    def __init__(self, files=False):
        self._files = files
        self.install_called = False
        self.install_called_with = {}
        self.uninstall_called = False
        self._real_files = []
        self.name = "fake"
        self.version = "fake"
        if not files:
            return
        for _ in range(3):
            # mkstemp returns an open descriptor; only the path is needed
            descriptor, filename = mkstemp()
            os.close(descriptor)
            self._real_files.append(filename)

    def _unlink_installed_files(self):
        """Delete the temporary files created by the constructor, if any."""
        if not self._files:
            return
        for filename in self._real_files:
            os.unlink(filename)

    def list_installed_files(self, **args):
        """Return the list of temporary files, or None if none were asked
        for at construction time."""
        return self._real_files if self._files else None

    def get_install(self, **args):
        """Return the same value as list_installed_files()."""
        return self.list_installed_files()


class MagicMock:
    """Minimal callable stand-in that records how it was called.

    *return_value* is returned by every call that does not raise.

    *raise_exception* controls failures: a non-iterable value is used as a
    flag (a true value makes every call raise Exception); a sequence is
    indexed by call number, so (False, True) makes only the second call
    raise.  Calls made after the end of the sequence do not raise.
    """

    def __init__(self, return_value=None, raise_exception=False):
        self.called = False
        self._times_called = 0
        self._called_with = []
        self._return_value = return_value
        self._raise = raise_exception

    def __call__(self, *args, **kwargs):
        self.called = True
        self._times_called += 1
        # the call is recorded before any exception is raised, so failing
        # calls are visible to called_with() too
        self._called_with.append((args, kwargs))
        if self._should_raise():
            raise Exception
        return self._return_value

    def _should_raise(self):
        """Return True if the call that was just recorded must fail."""
        if not hasattr(self._raise, '__iter__'):
            return bool(self._raise)
        index = self._times_called - 1
        # guard the lookup: running past the end of the sequence used to
        # fail with an accidental IndexError
        return index < len(self._raise) and bool(self._raise[index])

    def called_with(self, *args, **kwargs):
        """Return True if one of the recorded calls used exactly these
        positional and keyword arguments."""
        return (args, kwargs) in self._called_with


def get_installed_dists(dists):
    """Return a list of fake installed dists.

    Each item of *dists* is a (name, version, deps) triple used to build
    one InstalledDist.
    """
    return [InstalledDist(name, version, deps)
            for name, version, deps in dists]


class TestInstall(LoggingCatcher, TempdirManager, unittest.TestCase):
    """Tests for the dependency resolution and (un)installation helpers of
    packaging.install."""

    def _get_client(self, server, *args, **kwargs):
        """Return an XML-RPC Client pointed at the given test server."""
        return Client(server.full_address, *args, **kwargs)

    def _get_results(self, output):
        """Return three lists of (name, version) tuples built from the
        'install', 'remove' and 'conflict' entries of *output*."""
        installed = [(o.name, str(o.version)) for o in output['install']]
        remove = [(o.name, str(o.version)) for o in output['remove']]
        conflict = [(o.name, str(o.version)) for o in output['conflict']]
        return installed, remove, conflict

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_existing_deps(self, server):
        # Test that the installer gets the dependencies from the metadata
        # and asks the index for these dependencies.
        # In this test case, we have choxie that depends on towel-stuff
        # 0.1, which in turn depends on bacon <= 0.2:
        # choxie -> towel-stuff -> bacon.
        # Each release metadata is not provided in metadata 1.2.
        client = self._get_client(server)
        archive_path = '%s/distribution.tar.gz' % server.full_address
        server.xmlrpc.set_distributions([
            {'name': 'choxie',
             'version': '2.0.0.9',
             'requires_dist': ['towel-stuff (0.1)'],
             'url': archive_path},
            {'name': 'towel-stuff',
             'version': '0.1',
             'requires_dist': ['bacon (<= 0.2)'],
             'url': archive_path},
            {'name': 'bacon',
             'version': '0.1',
             'requires_dist': [],
             'url': archive_path},
            ])
        installed = get_installed_dists([('bacon', '0.1', [])])
        output = install.get_infos("choxie", index=client,
                                   installed=installed)

        # bacon is not in the install list as it is already installed
        self.assertEqual(0, len(output['remove']))
        self.assertEqual(2, len(output['install']))
        readable_output = [(o.name, str(o.version))
                           for o in output['install']]
        self.assertIn(('towel-stuff', '0.1'), readable_output)
        self.assertIn(('choxie', '2.0.0.9'), readable_output)

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_upgrade_existing_deps(self, server):
        # Test that an installed dependency that is too old is scheduled
        # for removal and its newer release for installation.
        client = self._get_client(server)
        archive_path = '%s/distribution.tar.gz' % server.full_address
        server.xmlrpc.set_distributions([
            {'name': 'choxie',
             'version': '2.0.0.9',
             'requires_dist': ['towel-stuff (0.1)'],
             'url': archive_path},
            {'name': 'towel-stuff',
             'version': '0.1',
             'requires_dist': ['bacon (>= 0.2)'],
             'url': archive_path},
            {'name': 'bacon',
             'version': '0.2',
             'requires_dist': [],
             'url': archive_path},
            ])

        already_installed = get_installed_dists([('bacon', '0.1', [])])
        output = install.get_infos("choxie", index=client,
                                   installed=already_installed)

        # we need bacon 0.2, but 0.1 is installed.
        # So we expect to remove 0.1 and to install 0.2 instead.
        installed, remove, conflict = self._get_results(output)
        self.assertIn(('choxie', '2.0.0.9'), installed)
        self.assertIn(('towel-stuff', '0.1'), installed)
        self.assertIn(('bacon', '0.2'), installed)
        self.assertIn(('bacon', '0.1'), remove)
        self.assertEqual(0, len(conflict))

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_conflicts(self, server):
        # Tests that conflicts are detected
        client = self._get_client(server)
        archive_path = '%s/distribution.tar.gz' % server.full_address

        # choxie depends on towel-stuff, which depends on bacon.
        server.xmlrpc.set_distributions([
            {'name': 'choxie',
             'version': '2.0.0.9',
             'requires_dist': ['towel-stuff (0.1)'],
             'url': archive_path},
            {'name': 'towel-stuff',
             'version': '0.1',
             'requires_dist': ['bacon (>= 0.2)'],
             'url': archive_path},
            {'name': 'bacon',
             'version': '0.2',
             'requires_dist': [],
             'url': archive_path},
            ])

        # name, version, deps.
        already_installed = [('bacon', '0.1', []),
                             ('chicken', '1.1', ['bacon (0.1)'])]
        output = install.get_infos(
            'choxie', index=client,
            installed=get_installed_dists(already_installed))

        # we need bacon 0.2, but 0.1 is installed.
        # So we expect to remove 0.1 and to install 0.2 instead; chicken,
        # which requires bacon 0.1, is expected to be reported as a conflict.
        installed, remove, conflict = self._get_results(output)
        self.assertIn(('choxie', '2.0.0.9'), installed)
        self.assertIn(('towel-stuff', '0.1'), installed)
        self.assertIn(('bacon', '0.2'), installed)
        self.assertIn(('bacon', '0.1'), remove)
        self.assertIn(('chicken', '1.1'), conflict)

    @unittest.skipIf(threading is None, 'needs threading')
    @use_xmlrpc_server()
    def test_installation_unexisting_project(self, server):
        # Test that the installer raises an exception if the project does
        # not exist.
        client = self._get_client(server)
        self.assertRaises(install.InstallationException,
                          install.get_infos,
                          'unexisting project', index=client)

    def test_move_files(self):
        # test that the files are really moved, and that the new path is
        # returned.
        path = self.mkdtemp()
        newpath = self.mkdtemp()
        files = [os.path.join(path, str(x)) for x in range(1, 20)]
        for f in files:
            # create an empty file
            with open(f, 'ab+'):
                pass
        output = list(install._move_files(files, newpath))

        # check that output return the list of old/new places
        for file_ in files:
            name = os.path.split(file_)[-1]
            newloc = os.path.join(newpath, name)
            self.assertIn((file_, newloc), output)

        # remove the files
        for _, new_place in output:
            os.remove(new_place)

    def test_update_infos(self):
        # each entry is (dict to update, dict to merge in, expected result)
        tests = [[
            {'foo': ['foobar', 'foo', 'baz'], 'baz': ['foo', 'foo']},
            {'foo': ['additional_content', 'yeah'], 'baz': ['test', 'foo']},
            {'foo': ['foobar', 'foo', 'baz', 'additional_content', 'yeah'],
             'baz': ['foo', 'foo', 'test', 'foo']},
        ]]

        for dict1, dict2, expect in tests:
            install._update_infos(dict1, dict2)
            for key in expect:
                self.assertEqual(expect[key], dict1[key])

    def test_install_dists_rollback(self):
        # if one of the distribution installation fails, install.remove must
        # be called to roll back.

        old_install_dist = install._install_dist
        # Save the attribute that is actually replaced below, so that the
        # finally clause puts the real function back instead of clobbering
        # install.remove for the tests that run afterwards.
        old_remove = getattr(install, 'remove', None)

        # the first installation succeeds, the second one fails
        install._install_dist = MagicMock(return_value=[],
                                          raise_exception=(False, True))
        install.remove = MagicMock()
        try:
            d1 = ToInstallDist()
            d2 = ToInstallDist()
            path = self.mkdtemp()
            self.assertRaises(Exception, install.install_dists, [d1, d2], path)
            self.assertTrue(install._install_dist.called_with(d1, path))
            self.assertTrue(install.remove.called)
        finally:
            install._install_dist = old_install_dist
            install.remove = old_remove

    def test_install_dists_success(self):
        old_install_dist = install._install_dist
        install._install_dist = MagicMock(return_value=[])
        try:
            # test that the install method is called on each distributions
            d1 = ToInstallDist()
            d2 = ToInstallDist()

            # should call install
            path = self.mkdtemp()
            install.install_dists([d1, d2], path)
            for dist in (d1, d2):
                self.assertTrue(install._install_dist.called_with(dist, path))
        finally:
            install._install_dist = old_install_dist

    def test_install_from_infos_conflict(self):
        # assert conflicts raise an exception
        self.assertRaises(install.InstallationConflict,
                          install.install_from_infos,
                          conflicts=[ToInstallDist()])

    def test_install_from_infos_remove_success(self):
        old_install_dists = install.install_dists
        # nothing must really be installed by this test
        install.install_dists = lambda x, y=None: None
        try:
            dists = [ToInstallDist(files=True) for _ in range(2)]
            install.install_from_infos(remove=dists)

            # assert that the files have been removed
            for dist in dists:
                for f in dist.list_installed_files():
                    self.assertFalse(os.path.exists(f))
        finally:
            install.install_dists = old_install_dists

    def test_install_from_infos_remove_rollback(self):
        old_install_dist = install._install_dist
        old_uninstall = getattr(install, 'uninstall', None)

        # the first installation succeeds, the second one fails
        install._install_dist = MagicMock(return_value=[],
                                          raise_exception=(False, True))
        install.uninstall = MagicMock()
        try:
            # assert that if an error occurs, the removed files are restored.
            remove = [ToInstallDist(files=True) for _ in range(2)]
            to_install = [ToInstallDist(), ToInstallDist()]
            temp_dir = self.mkdtemp()

            self.assertRaises(Exception, install.install_from_infos,
                              install_path=temp_dir, install=to_install,
                              remove=remove)
            # assert that the files of the distributions to remove are
            # still at their original place, then clean them up
            for dist in remove:
                for f in dist.list_installed_files():
                    self.assertTrue(os.path.exists(f))
                dist._unlink_installed_files()
        finally:
            install._install_dist = old_install_dist
            install.uninstall = old_uninstall

    def test_install_from_infos_install_succes(self):
        old_install_dist = install._install_dist
        install._install_dist = MagicMock([])
        try:
            # assert that the distribution can be installed
            install_path = "my_install_path"
            to_install = [ToInstallDist(), ToInstallDist()]

            install.install_from_infos(install_path, install=to_install)
            # The result of called_with() used to be discarded, so nothing
            # was checked here.
            # NOTE(review): the exact arguments passed to _install_dist by
            # install_from_infos are not visible from this file; tighten this
            # to called_with(...) per distribution once confirmed.
            self.assertTrue(install._install_dist.called)
        finally:
            install._install_dist = old_install_dist

    def test_install_permission_denied(self):
        # if we don't have access to the installation path, we should abort
        # immediately
        project = os.path.join(os.path.dirname(__file__), 'package.tgz')

        # when running from an uninstalled build, an error is logged and
        # install() returns a false value
        if is_python_build():
            self.assertFalse(install.install(project))
            self.assertEqual(1, len(self.get_logs(logging.ERROR)))
            return

        install_path = self.mkdtemp()
        old_get_path = install.get_path
        install.get_path = lambda path: install_path
        old_mod = os.stat(install_path).st_mode
        # drop every permission bit on the target directory
        os.chmod(install_path, 0)
        try:
            self.assertFalse(install.install(project))
        finally:
            # restore the mode so that the temp dir can be cleaned up
            os.chmod(install_path, old_mod)
            install.get_path = old_get_path


def test_suite():
    """Return a TestSuite wrapping the tests collected from TestInstall."""
    collected = unittest.makeSuite(TestInstall)
    return unittest.TestSuite([collected])

# When executed as a script, run the suite returned by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
