| """Class and functions dealing with dependencies between distributions. |
| |
| This module provides a DependencyGraph class to represent the |
| dependencies between distributions. Auxiliary functions can generate a |
| graph, find reverse dependencies, and print a graph in DOT format. |
| """ |
| |
| import sys |
| |
| from io import StringIO |
| from packaging.errors import PackagingError |
| from packaging.version import VersionPredicate, IrrationalVersionError |
| |
| __all__ = ['DependencyGraph', 'generate_graph', 'dependent_dists', |
| 'graph_to_dot'] |
| |
| |
| class DependencyGraph: |
| """ |
| Represents a dependency graph between distributions. |
| |
| The dependency relationships are stored in an ``adjacency_list`` that maps |
| distributions to a list of ``(other, label)`` tuples where ``other`` |
| is a distribution and the edge is labeled with ``label`` (i.e. the version |
| specifier, if such was provided). Also, for more efficient traversal, for |
| every distribution ``x``, a list of predecessors is kept in |
| ``reverse_list[x]``. An edge from distribution ``a`` to |
| distribution ``b`` means that ``a`` depends on ``b``. If any missing |
| dependencies are found, they are stored in ``missing``, which is a |
| dictionary that maps distributions to a list of requirements that were not |
| provided by any other distributions. |
| """ |
| |
| def __init__(self): |
| self.adjacency_list = {} |
| self.reverse_list = {} |
| self.missing = {} |
| |
| def add_distribution(self, distribution): |
| """Add the *distribution* to the graph. |
| |
| :type distribution: :class:`packaging.database.Distribution` or |
| :class:`packaging.database.EggInfoDistribution` |
| """ |
| self.adjacency_list[distribution] = [] |
| self.reverse_list[distribution] = [] |
| self.missing[distribution] = [] |
| |
| def add_edge(self, x, y, label=None): |
| """Add an edge from distribution *x* to distribution *y* with the given |
| *label*. |
| |
| :type x: :class:`packaging.database.Distribution` or |
| :class:`packaging.database.EggInfoDistribution` |
| :type y: :class:`packaging.database.Distribution` or |
| :class:`packaging.database.EggInfoDistribution` |
| :type label: ``str`` or ``None`` |
| """ |
| self.adjacency_list[x].append((y, label)) |
| # multiple edges are allowed, so be careful |
| if not x in self.reverse_list[y]: |
| self.reverse_list[y].append(x) |
| |
| def add_missing(self, distribution, requirement): |
| """ |
| Add a missing *requirement* for the given *distribution*. |
| |
| :type distribution: :class:`packaging.database.Distribution` or |
| :class:`packaging.database.EggInfoDistribution` |
| :type requirement: ``str`` |
| """ |
| self.missing[distribution].append(requirement) |
| |
| def _repr_dist(self, dist): |
| return '%s %s' % (dist.name, dist.metadata['Version']) |
| |
| def repr_node(self, dist, level=1): |
| """Prints only a subgraph""" |
| output = [] |
| output.append(self._repr_dist(dist)) |
| for other, label in self.adjacency_list[dist]: |
| dist = self._repr_dist(other) |
| if label is not None: |
| dist = '%s [%s]' % (dist, label) |
| output.append(' ' * level + str(dist)) |
| suboutput = self.repr_node(other, level + 1) |
| subs = suboutput.split('\n') |
| output.extend(subs[1:]) |
| return '\n'.join(output) |
| |
| def __repr__(self): |
| """Representation of the graph""" |
| output = [] |
| for dist, adjs in self.adjacency_list.items(): |
| output.append(self.repr_node(dist)) |
| return '\n'.join(output) |
| |
| |
| def graph_to_dot(graph, f, skip_disconnected=True): |
| """Writes a DOT output for the graph to the provided file *f*. |
| |
| If *skip_disconnected* is set to ``True``, then all distributions |
| that are not dependent on any other distribution are skipped. |
| |
| :type f: has to support ``file``-like operations |
| :type skip_disconnected: ``bool`` |
| """ |
| disconnected = [] |
| |
| f.write("digraph dependencies {\n") |
| for dist, adjs in graph.adjacency_list.items(): |
| if len(adjs) == 0 and not skip_disconnected: |
| disconnected.append(dist) |
| for other, label in adjs: |
| if not label is None: |
| f.write('"%s" -> "%s" [label="%s"]\n' % |
| (dist.name, other.name, label)) |
| else: |
| f.write('"%s" -> "%s"\n' % (dist.name, other.name)) |
| if not skip_disconnected and len(disconnected) > 0: |
| f.write('subgraph disconnected {\n') |
| f.write('label = "Disconnected"\n') |
| f.write('bgcolor = red\n') |
| |
| for dist in disconnected: |
| f.write('"%s"' % dist.name) |
| f.write('\n') |
| f.write('}\n') |
| f.write('}\n') |
| |
| |
| def generate_graph(dists): |
| """Generates a dependency graph from the given distributions. |
| |
| :parameter dists: a list of distributions |
| :type dists: list of :class:`packaging.database.Distribution` and |
| :class:`packaging.database.EggInfoDistribution` instances |
| :rtype: a :class:`DependencyGraph` instance |
| """ |
| graph = DependencyGraph() |
| provided = {} # maps names to lists of (version, dist) tuples |
| |
| # first, build the graph and find out the provides |
| for dist in dists: |
| graph.add_distribution(dist) |
| provides = (dist.metadata['Provides-Dist'] + |
| dist.metadata['Provides'] + |
| ['%s (%s)' % (dist.name, dist.metadata['Version'])]) |
| |
| for p in provides: |
| comps = p.strip().rsplit(" ", 1) |
| name = comps[0] |
| version = None |
| if len(comps) == 2: |
| version = comps[1] |
| if len(version) < 3 or version[0] != '(' or version[-1] != ')': |
| raise PackagingError('Distribution %s has ill formed' \ |
| 'provides field: %s' % (dist.name, p)) |
| version = version[1:-1] # trim off parenthesis |
| if not name in provided: |
| provided[name] = [] |
| provided[name].append((version, dist)) |
| |
| # now make the edges |
| for dist in dists: |
| requires = dist.metadata['Requires-Dist'] + dist.metadata['Requires'] |
| for req in requires: |
| try: |
| predicate = VersionPredicate(req) |
| except IrrationalVersionError: |
| # XXX compat-mode if cannot read the version |
| name = req.split()[0] |
| predicate = VersionPredicate(name) |
| |
| name = predicate.name |
| |
| if not name in provided: |
| graph.add_missing(dist, req) |
| else: |
| matched = False |
| for version, provider in provided[name]: |
| try: |
| match = predicate.match(version) |
| except IrrationalVersionError: |
| # XXX small compat-mode |
| if version.split(' ') == 1: |
| match = True |
| else: |
| match = False |
| |
| if match: |
| graph.add_edge(dist, provider, req) |
| matched = True |
| break |
| if not matched: |
| graph.add_missing(dist, req) |
| return graph |
| |
| |
| def dependent_dists(dists, dist): |
| """Recursively generate a list of distributions from *dists* that are |
| dependent on *dist*. |
| |
| :param dists: a list of distributions |
| :param dist: a distribution, member of *dists* for which we are interested |
| """ |
| if not dist in dists: |
| raise ValueError('The given distribution is not a member of the list') |
| graph = generate_graph(dists) |
| |
| dep = [dist] # dependent distributions |
| fringe = graph.reverse_list[dist] # list of nodes we should inspect |
| |
| while not len(fringe) == 0: |
| node = fringe.pop() |
| dep.append(node) |
| for prev in graph.reverse_list[node]: |
| if not prev in dep: |
| fringe.append(prev) |
| |
| dep.pop(0) # remove dist from dep, was there to prevent infinite loops |
| return dep |
| |
| |
| def main(): |
| from packaging.database import get_distributions |
| tempout = StringIO() |
| try: |
| old = sys.stderr |
| sys.stderr = tempout |
| try: |
| dists = list(get_distributions(use_egg_info=True)) |
| graph = generate_graph(dists) |
| finally: |
| sys.stderr = old |
| except Exception as e: |
| tempout.seek(0) |
| tempout = tempout.read() |
| print('Could not generate the graph') |
| print(tempout) |
| print(e) |
| sys.exit(1) |
| |
| for dist, reqs in graph.missing.items(): |
| if len(reqs) > 0: |
| print("Warning: Missing dependencies for %s:" % dist.name, |
| ", ".join(reqs)) |
| # XXX replace with argparse |
| if len(sys.argv) == 1: |
| print('Dependency graph:') |
| print(' ', repr(graph).replace('\n', '\n ')) |
| sys.exit(0) |
| elif len(sys.argv) > 1 and sys.argv[1] in ('-d', '--dot'): |
| if len(sys.argv) > 2: |
| filename = sys.argv[2] |
| else: |
| filename = 'depgraph.dot' |
| |
| with open(filename, 'w') as f: |
| graph_to_dot(graph, f, True) |
| tempout.seek(0) |
| tempout = tempout.read() |
| print(tempout) |
| print('Dot file written at "%s"' % filename) |
| sys.exit(0) |
| else: |
| print('Supported option: -d [filename]') |
| sys.exit(1) |
| |
| |
| if __name__ == '__main__': |
| main() |