| """Class and functions dealing with dependencies between distributions. | 
 |  | 
 | This module provides a DependencyGraph class to represent the | 
 | dependencies between distributions.  Auxiliary functions can generate a | 
 | graph, find reverse dependencies, and print a graph in DOT format. | 
 | """ | 
 |  | 
 | import sys | 
 |  | 
 | from io import StringIO | 
 | from packaging.errors import PackagingError | 
 | from packaging.version import VersionPredicate, IrrationalVersionError | 
 |  | 
 | __all__ = ['DependencyGraph', 'generate_graph', 'dependent_dists', | 
 |            'graph_to_dot'] | 
 |  | 
 |  | 
 | class DependencyGraph: | 
 |     """ | 
 |     Represents a dependency graph between distributions. | 
 |  | 
 |     The dependency relationships are stored in an ``adjacency_list`` that maps | 
 |     distributions to a list of ``(other, label)`` tuples where  ``other`` | 
 |     is a distribution and the edge is labeled with ``label`` (i.e. the version | 
 |     specifier, if such was provided). Also, for more efficient traversal, for | 
 |     every distribution ``x``, a list of predecessors is kept in | 
 |     ``reverse_list[x]``. An edge from distribution ``a`` to | 
 |     distribution ``b`` means that ``a`` depends on ``b``. If any missing | 
 |     dependencies are found, they are stored in ``missing``, which is a | 
 |     dictionary that maps distributions to a list of requirements that were not | 
 |     provided by any other distributions. | 
 |     """ | 
 |  | 
 |     def __init__(self): | 
 |         self.adjacency_list = {} | 
 |         self.reverse_list = {} | 
 |         self.missing = {} | 
 |  | 
 |     def add_distribution(self, distribution): | 
 |         """Add the *distribution* to the graph. | 
 |  | 
 |         :type distribution: :class:`packaging.database.Distribution` or | 
 |                             :class:`packaging.database.EggInfoDistribution` | 
 |         """ | 
 |         self.adjacency_list[distribution] = [] | 
 |         self.reverse_list[distribution] = [] | 
 |         self.missing[distribution] = [] | 
 |  | 
 |     def add_edge(self, x, y, label=None): | 
 |         """Add an edge from distribution *x* to distribution *y* with the given | 
 |         *label*. | 
 |  | 
 |         :type x: :class:`packaging.database.Distribution` or | 
 |                  :class:`packaging.database.EggInfoDistribution` | 
 |         :type y: :class:`packaging.database.Distribution` or | 
 |                  :class:`packaging.database.EggInfoDistribution` | 
 |         :type label: ``str`` or ``None`` | 
 |         """ | 
 |         self.adjacency_list[x].append((y, label)) | 
 |         # multiple edges are allowed, so be careful | 
 |         if x not in self.reverse_list[y]: | 
 |             self.reverse_list[y].append(x) | 
 |  | 
 |     def add_missing(self, distribution, requirement): | 
 |         """ | 
 |         Add a missing *requirement* for the given *distribution*. | 
 |  | 
 |         :type distribution: :class:`packaging.database.Distribution` or | 
 |                             :class:`packaging.database.EggInfoDistribution` | 
 |         :type requirement: ``str`` | 
 |         """ | 
 |         self.missing[distribution].append(requirement) | 
 |  | 
 |     def _repr_dist(self, dist): | 
 |         return '%r %s' % (dist.name, dist.version) | 
 |  | 
 |     def repr_node(self, dist, level=1): | 
 |         """Prints only a subgraph""" | 
 |         output = [] | 
 |         output.append(self._repr_dist(dist)) | 
 |         for other, label in self.adjacency_list[dist]: | 
 |             dist = self._repr_dist(other) | 
 |             if label is not None: | 
 |                 dist = '%s [%s]' % (dist, label) | 
 |             output.append('    ' * level + str(dist)) | 
 |             suboutput = self.repr_node(other, level + 1) | 
 |             subs = suboutput.split('\n') | 
 |             output.extend(subs[1:]) | 
 |         return '\n'.join(output) | 
 |  | 
 |     def __repr__(self): | 
 |         """Representation of the graph""" | 
 |         output = [] | 
 |         for dist, adjs in self.adjacency_list.items(): | 
 |             output.append(self.repr_node(dist)) | 
 |         return '\n'.join(output) | 
 |  | 
 |  | 
 | def graph_to_dot(graph, f, skip_disconnected=True): | 
 |     """Writes a DOT output for the graph to the provided file *f*. | 
 |  | 
 |     If *skip_disconnected* is set to ``True``, then all distributions | 
 |     that are not dependent on any other distribution are skipped. | 
 |  | 
 |     :type f: has to support ``file``-like operations | 
 |     :type skip_disconnected: ``bool`` | 
 |     """ | 
 |     disconnected = [] | 
 |  | 
 |     f.write("digraph dependencies {\n") | 
 |     for dist, adjs in graph.adjacency_list.items(): | 
 |         if len(adjs) == 0 and not skip_disconnected: | 
 |             disconnected.append(dist) | 
 |         for other, label in adjs: | 
 |             if not label is None: | 
 |                 f.write('"%s" -> "%s" [label="%s"]\n' % | 
 |                                             (dist.name, other.name, label)) | 
 |             else: | 
 |                 f.write('"%s" -> "%s"\n' % (dist.name, other.name)) | 
 |     if not skip_disconnected and len(disconnected) > 0: | 
 |         f.write('subgraph disconnected {\n') | 
 |         f.write('label = "Disconnected"\n') | 
 |         f.write('bgcolor = red\n') | 
 |  | 
 |         for dist in disconnected: | 
 |             f.write('"%s"' % dist.name) | 
 |             f.write('\n') | 
 |         f.write('}\n') | 
 |     f.write('}\n') | 
 |  | 
 |  | 
 | def generate_graph(dists): | 
 |     """Generates a dependency graph from the given distributions. | 
 |  | 
 |     :parameter dists: a list of distributions | 
 |     :type dists: list of :class:`packaging.database.Distribution` and | 
 |                  :class:`packaging.database.EggInfoDistribution` instances | 
 |     :rtype: a :class:`DependencyGraph` instance | 
 |     """ | 
 |     graph = DependencyGraph() | 
 |     provided = {}  # maps names to lists of (version, dist) tuples | 
 |  | 
 |     # first, build the graph and find out the provides | 
 |     for dist in dists: | 
 |         graph.add_distribution(dist) | 
 |         provides = (dist.metadata['Provides-Dist'] + | 
 |                     dist.metadata['Provides'] + | 
 |                     ['%s (%s)' % (dist.name, dist.version)]) | 
 |  | 
 |         for p in provides: | 
 |             comps = p.strip().rsplit(" ", 1) | 
 |             name = comps[0] | 
 |             version = None | 
 |             if len(comps) == 2: | 
 |                 version = comps[1] | 
 |                 if len(version) < 3 or version[0] != '(' or version[-1] != ')': | 
 |                     raise PackagingError('distribution %r has ill-formed' | 
 |                                          'provides field: %r' % (dist.name, p)) | 
 |                 version = version[1:-1]  # trim off parenthesis | 
 |             if name not in provided: | 
 |                 provided[name] = [] | 
 |             provided[name].append((version, dist)) | 
 |  | 
 |     # now make the edges | 
 |     for dist in dists: | 
 |         requires = dist.metadata['Requires-Dist'] + dist.metadata['Requires'] | 
 |         for req in requires: | 
 |             try: | 
 |                 predicate = VersionPredicate(req) | 
 |             except IrrationalVersionError: | 
 |                 # XXX compat-mode if cannot read the version | 
 |                 name = req.split()[0] | 
 |                 predicate = VersionPredicate(name) | 
 |  | 
 |             name = predicate.name | 
 |  | 
 |             if name not in provided: | 
 |                 graph.add_missing(dist, req) | 
 |             else: | 
 |                 matched = False | 
 |                 for version, provider in provided[name]: | 
 |                     try: | 
 |                         match = predicate.match(version) | 
 |                     except IrrationalVersionError: | 
 |                         # XXX small compat-mode | 
 |                         if version.split(' ') == 1: | 
 |                             match = True | 
 |                         else: | 
 |                             match = False | 
 |  | 
 |                     if match: | 
 |                         graph.add_edge(dist, provider, req) | 
 |                         matched = True | 
 |                         break | 
 |                 if not matched: | 
 |                     graph.add_missing(dist, req) | 
 |     return graph | 
 |  | 
 |  | 
 | def dependent_dists(dists, dist): | 
 |     """Recursively generate a list of distributions from *dists* that are | 
 |     dependent on *dist*. | 
 |  | 
 |     :param dists: a list of distributions | 
 |     :param dist: a distribution, member of *dists* for which we are interested | 
 |     """ | 
 |     if dist not in dists: | 
 |         raise ValueError('given distribution %r is not a member of the list' % | 
 |                          dist.name) | 
 |     graph = generate_graph(dists) | 
 |  | 
 |     dep = [dist]  # dependent distributions | 
 |     fringe = graph.reverse_list[dist]  # list of nodes we should inspect | 
 |  | 
 |     while not len(fringe) == 0: | 
 |         node = fringe.pop() | 
 |         dep.append(node) | 
 |         for prev in graph.reverse_list[node]: | 
 |             if prev not in dep: | 
 |                 fringe.append(prev) | 
 |  | 
 |     dep.pop(0)  # remove dist from dep, was there to prevent infinite loops | 
 |     return dep | 
 |  | 
 |  | 
 | def main(): | 
 |     # XXX move to run._graph | 
 |     from packaging.database import get_distributions | 
 |     tempout = StringIO() | 
 |     try: | 
 |         old = sys.stderr | 
 |         sys.stderr = tempout | 
 |         try: | 
 |             dists = list(get_distributions(use_egg_info=True)) | 
 |             graph = generate_graph(dists) | 
 |         finally: | 
 |             sys.stderr = old | 
 |     except Exception as e: | 
 |         tempout.seek(0) | 
 |         tempout = tempout.read() | 
 |         print('Could not generate the graph') | 
 |         print(tempout) | 
 |         print(e) | 
 |         sys.exit(1) | 
 |  | 
 |     for dist, reqs in graph.missing.items(): | 
 |         if len(reqs) > 0: | 
 |             print("Warning: Missing dependencies for %r:" % dist.name, | 
 |                   ", ".join(reqs)) | 
 |     # XXX replace with argparse | 
 |     if len(sys.argv) == 1: | 
 |         print('Dependency graph:') | 
 |         print('   ', repr(graph).replace('\n', '\n    ')) | 
 |         sys.exit(0) | 
 |     elif len(sys.argv) > 1 and sys.argv[1] in ('-d', '--dot'): | 
 |         if len(sys.argv) > 2: | 
 |             filename = sys.argv[2] | 
 |         else: | 
 |             filename = 'depgraph.dot' | 
 |  | 
 |         with open(filename, 'w') as f: | 
 |             graph_to_dot(graph, f, True) | 
 |         tempout.seek(0) | 
 |         tempout = tempout.read() | 
 |         print(tempout) | 
 |         print('Dot file written at %r' % filename) | 
 |         sys.exit(0) | 
 |     else: | 
 |         print('Supported option: -d [filename]') | 
 |         sys.exit(1) |