blob: 541465e63fb10e8eff5e5d7f5cba9095b72c0e14 [file] [log] [blame]
"""Classes representing releases and distributions retrieved from indexes.
A project (= unique name) can have several releases (= versions) and
each release can have several distributions (= sdist and bdists).
Release objects contain metadata-related information (see PEP 376);
distribution objects contain download-related information.
"""
import re
import hashlib
import tempfile
import urllib.request
import urllib.parse
import urllib.error
import urllib.parse
from shutil import unpack_archive
from packaging.errors import IrrationalVersionError
from packaging.version import (suggest_normalized_version, NormalizedVersion,
get_version_predicate)
from packaging.metadata import Metadata
from packaging.pypi.errors import (HashDoesNotMatch, UnsupportedHashName,
CantParseArchiveName)
__all__ = ['ReleaseInfo', 'DistInfo', 'ReleasesList', 'get_infos_from_url']
EXTENSIONS = ".tar.gz .tar.bz2 .tar .zip .tgz .egg".split()
MD5_HASH = re.compile(r'^.*#md5=([a-f0-9]+)$')
DIST_TYPES = ['bdist', 'sdist']
class IndexReference:
"""Mixin used to store the index reference"""
def set_index(self, index=None):
self._index = index
class ReleaseInfo(IndexReference):
"""Represent a release of a project (a project with a specific version).
The release contain the _metadata informations related to this specific
version, and is also a container for distribution related informations.
See the DistInfo class for more information about distributions.
"""
def __init__(self, name, version, metadata=None, hidden=False,
index=None, **kwargs):
"""
:param name: the name of the distribution
:param version: the version of the distribution
:param metadata: the metadata fields of the release.
:type metadata: dict
:param kwargs: optional arguments for a new distribution.
"""
self.set_index(index)
self.name = name
self._version = None
self.version = version
if metadata:
self.metadata = Metadata(mapping=metadata)
else:
self.metadata = None
self.dists = {}
self.hidden = hidden
if 'dist_type' in kwargs:
dist_type = kwargs.pop('dist_type')
self.add_distribution(dist_type, **kwargs)
def set_version(self, version):
try:
self._version = NormalizedVersion(version)
except IrrationalVersionError:
suggestion = suggest_normalized_version(version)
if suggestion:
self.version = suggestion
else:
raise IrrationalVersionError(version)
def get_version(self):
return self._version
version = property(get_version, set_version)
def fetch_metadata(self):
"""If the metadata is not set, use the indexes to get it"""
if not self.metadata:
self._index.get_metadata(self.name, str(self.version))
return self.metadata
@property
def is_final(self):
"""proxy to version.is_final"""
return self.version.is_final
def fetch_distributions(self):
if self.dists is None:
self._index.get_distributions(self.name, str(self.version))
if self.dists is None:
self.dists = {}
return self.dists
def add_distribution(self, dist_type='sdist', python_version=None,
**params):
"""Add distribution informations to this release.
If distribution information is already set for this distribution type,
add the given url paths to the distribution. This can be useful while
some of them fails to download.
:param dist_type: the distribution type (eg. "sdist", "bdist", etc.)
:param params: the fields to be passed to the distribution object
(see the :class:DistInfo constructor).
"""
if dist_type not in DIST_TYPES:
raise ValueError(dist_type)
if dist_type in self.dists:
self.dists[dist_type].add_url(**params)
else:
self.dists[dist_type] = DistInfo(self, dist_type,
index=self._index, **params)
if python_version:
self.dists[dist_type].python_version = python_version
def get_distribution(self, dist_type=None, prefer_source=True):
"""Return a distribution.
If dist_type is set, find first for this distribution type, and just
act as an alias of __get_item__.
If prefer_source is True, search first for source distribution, and if
not return one existing distribution.
"""
if len(self.dists) == 0:
raise LookupError
if dist_type:
return self[dist_type]
if prefer_source:
if "sdist" in self.dists:
dist = self["sdist"]
else:
dist = next(self.dists.values())
return dist
def unpack(self, path=None, prefer_source=True):
"""Unpack the distribution to the given path.
If not destination is given, creates a temporary location.
Returns the location of the extracted files (root).
"""
return self.get_distribution(prefer_source=prefer_source)\
.unpack(path=path)
def download(self, temp_path=None, prefer_source=True):
"""Download the distribution, using the requirements.
If more than one distribution match the requirements, use the last
version.
Download the distribution, and put it in the temp_path. If no temp_path
is given, creates and return one.
Returns the complete absolute path to the downloaded archive.
"""
return self.get_distribution(prefer_source=prefer_source)\
.download(path=temp_path)
def set_metadata(self, metadata):
if not self.metadata:
self.metadata = Metadata()
self.metadata.update(metadata)
def __getitem__(self, item):
"""distributions are available using release["sdist"]"""
return self.dists[item]
def _check_is_comparable(self, other):
if not isinstance(other, ReleaseInfo):
raise TypeError("cannot compare %s and %s"
% (type(self).__name__, type(other).__name__))
elif self.name != other.name:
raise TypeError("cannot compare %s and %s"
% (self.name, other.name))
def __repr__(self):
return "<%s %s>" % (self.name, self.version)
def __eq__(self, other):
self._check_is_comparable(other)
return self.version == other.version
def __lt__(self, other):
self._check_is_comparable(other)
return self.version < other.version
def __ne__(self, other):
return not self.__eq__(other)
def __gt__(self, other):
return not (self.__lt__(other) or self.__eq__(other))
def __le__(self, other):
return self.__eq__(other) or self.__lt__(other)
def __ge__(self, other):
return self.__eq__(other) or self.__gt__(other)
# See http://docs.python.org/reference/datamodel#object.__hash__
__hash__ = object.__hash__
class DistInfo(IndexReference):
"""Represents a distribution retrieved from an index (sdist, bdist, ...)
"""
def __init__(self, release, dist_type=None, url=None, hashname=None,
hashval=None, is_external=True, python_version=None,
index=None):
"""Create a new instance of DistInfo.
:param release: a DistInfo class is relative to a release.
:param dist_type: the type of the dist (eg. source, bin-*, etc.)
:param url: URL where we found this distribution
:param hashname: the name of the hash we want to use. Refer to the
hashlib.new documentation for more information.
:param hashval: the hash value.
:param is_external: we need to know if the provided url comes from
an index browsing, or from an external resource.
"""
self.set_index(index)
self.release = release
self.dist_type = dist_type
self.python_version = python_version
self._unpacked_dir = None
# set the downloaded path to None by default. The goal here
# is to not download distributions multiple times
self.downloaded_location = None
# We store urls in dict, because we need to have a bit more infos
# than the simple URL. It will be used later to find the good url to
# use.
# We have two _url* attributes: _url and urls. urls contains a list
# of dict for the different urls, and _url contains the choosen url, in
# order to dont make the selection process multiple times.
self.urls = []
self._url = None
self.add_url(url, hashname, hashval, is_external)
def add_url(self, url=None, hashname=None, hashval=None, is_external=True):
"""Add a new url to the list of urls"""
if hashname is not None:
try:
hashlib.new(hashname)
except ValueError:
raise UnsupportedHashName(hashname)
if url not in [u['url'] for u in self.urls]:
self.urls.append({
'url': url,
'hashname': hashname,
'hashval': hashval,
'is_external': is_external,
})
# reset the url selection process
self._url = None
@property
def url(self):
"""Pick up the right url for the list of urls in self.urls"""
# We return internal urls over externals.
# If there is more than one internal or external, return the first
# one.
if self._url is None:
if len(self.urls) > 1:
internals_urls = [u for u in self.urls \
if u['is_external'] == False]
if len(internals_urls) >= 1:
self._url = internals_urls[0]
if self._url is None:
self._url = self.urls[0]
return self._url
@property
def is_source(self):
"""return if the distribution is a source one or not"""
return self.dist_type == 'sdist'
def download(self, path=None):
"""Download the distribution to a path, and return it.
If the path is given in path, use this, otherwise, generates a new one
Return the download location.
"""
if path is None:
path = tempfile.mkdtemp()
# if we do not have downloaded it yet, do it.
if self.downloaded_location is None:
url = self.url['url']
archive_name = urllib.parse.urlparse(url)[2].split('/')[-1]
filename, headers = urllib.request.urlretrieve(url,
path + "/" + archive_name)
self.downloaded_location = filename
self._check_md5(filename)
return self.downloaded_location
def unpack(self, path=None):
"""Unpack the distribution to the given path.
If not destination is given, creates a temporary location.
Returns the location of the extracted files (root).
"""
if not self._unpacked_dir:
if path is None:
path = tempfile.mkdtemp()
filename = self.download(path)
unpack_archive(filename, path)
self._unpacked_dir = path
return path
def _check_md5(self, filename):
"""Check that the md5 checksum of the given file matches the one in
url param"""
hashname = self.url['hashname']
expected_hashval = self.url['hashval']
if None not in (expected_hashval, hashname):
with open(filename, 'rb') as f:
hashval = hashlib.new(hashname)
hashval.update(f.read())
if hashval.hexdigest() != expected_hashval:
raise HashDoesNotMatch("got %s instead of %s"
% (hashval.hexdigest(), expected_hashval))
def __repr__(self):
if self.release is None:
return "<? ? %s>" % self.dist_type
return "<%s %s %s>" % (
self.release.name, self.release.version, self.dist_type or "")
class ReleasesList(IndexReference):
"""A container of Release.
Provides useful methods and facilities to sort and filter releases.
"""
def __init__(self, name, releases=None, contains_hidden=False, index=None):
self.set_index(index)
self.releases = []
self.name = name
self.contains_hidden = contains_hidden
if releases:
self.add_releases(releases)
def fetch_releases(self):
self._index.get_releases(self.name)
return self.releases
def filter(self, predicate):
"""Filter and return a subset of releases matching the given predicate.
"""
return ReleasesList(self.name, [release for release in self.releases
if predicate.match(release.version)],
index=self._index)
def get_last(self, requirements, prefer_final=None):
"""Return the "last" release, that satisfy the given predicates.
"last" is defined by the version number of the releases, you also could
set prefer_final parameter to True or False to change the order results
"""
predicate = get_version_predicate(requirements)
releases = self.filter(predicate)
if len(releases) == 0:
return None
releases.sort_releases(prefer_final, reverse=True)
return releases[0]
def add_releases(self, releases):
"""Add releases in the release list.
:param: releases is a list of ReleaseInfo objects.
"""
for r in releases:
self.add_release(release=r)
def add_release(self, version=None, dist_type='sdist', release=None,
**dist_args):
"""Add a release to the list.
The release can be passed in the `release` parameter, and in this case,
it will be crawled to extract the useful informations if necessary, or
the release informations can be directly passed in the `version` and
`dist_type` arguments.
Other keywords arguments can be provided, and will be forwarded to the
distribution creation (eg. the arguments of the DistInfo constructor).
"""
if release:
if release.name.lower() != self.name.lower():
raise ValueError("%s is not the same project as %s" %
(release.name, self.name))
version = str(release.version)
if version not in self.get_versions():
# append only if not already exists
self.releases.append(release)
for dist in release.dists.values():
for url in dist.urls:
self.add_release(version, dist.dist_type, **url)
else:
matches = [r for r in self.releases
if str(r.version) == version and r.name == self.name]
if not matches:
release = ReleaseInfo(self.name, version, index=self._index)
self.releases.append(release)
else:
release = matches[0]
release.add_distribution(dist_type=dist_type, **dist_args)
def sort_releases(self, prefer_final=False, reverse=True, *args, **kwargs):
"""Sort the results with the given properties.
The `prefer_final` argument can be used to specify if final
distributions (eg. not dev, beta or alpha) would be preferred or not.
Results can be inverted by using `reverse`.
Any other parameter provided will be forwarded to the sorted call. You
cannot redefine the key argument of "sorted" here, as it is used
internally to sort the releases.
"""
sort_by = []
if prefer_final:
sort_by.append("is_final")
sort_by.append("version")
self.releases.sort(
key=lambda i: tuple(getattr(i, arg) for arg in sort_by),
reverse=reverse, *args, **kwargs)
def get_release(self, version):
"""Return a release from its version."""
matches = [r for r in self.releases if str(r.version) == version]
if len(matches) != 1:
raise KeyError(version)
return matches[0]
def get_versions(self):
"""Return a list of releases versions contained"""
return [str(r.version) for r in self.releases]
def __getitem__(self, key):
return self.releases[key]
def __len__(self):
return len(self.releases)
def __repr__(self):
string = 'Project "%s"' % self.name
if self.get_versions():
string += ' versions: %s' % ', '.join(self.get_versions())
return '<%s>' % string
def get_infos_from_url(url, probable_dist_name=None, is_external=True):
"""Get useful informations from an URL.
Return a dict of (name, version, url, hashtype, hash, is_external)
:param url: complete url of the distribution
:param probable_dist_name: A probable name of the project.
:param is_external: Tell if the url commes from an index or from
an external URL.
"""
# if the url contains a md5 hash, get it.
md5_hash = None
match = MD5_HASH.match(url)
if match is not None:
md5_hash = match.group(1)
# remove the hash
url = url.replace("#md5=%s" % md5_hash, "")
# parse the archive name to find dist name and version
archive_name = urllib.parse.urlparse(url)[2].split('/')[-1]
extension_matched = False
# remove the extension from the name
for ext in EXTENSIONS:
if archive_name.endswith(ext):
archive_name = archive_name[:-len(ext)]
extension_matched = True
name, version = split_archive_name(archive_name)
if extension_matched is True:
return {'name': name,
'version': version,
'url': url,
'hashname': "md5",
'hashval': md5_hash,
'is_external': is_external,
'dist_type': 'sdist'}
def split_archive_name(archive_name, probable_name=None):
"""Split an archive name into two parts: name and version.
Return the tuple (name, version)
"""
# Try to determine wich part is the name and wich is the version using the
# "-" separator. Take the larger part to be the version number then reduce
# if this not works.
def eager_split(str, maxsplit=2):
# split using the "-" separator
splits = str.rsplit("-", maxsplit)
name = splits[0]
version = "-".join(splits[1:])
if version.startswith("-"):
version = version[1:]
if suggest_normalized_version(version) is None and maxsplit >= 0:
# we dont get a good version number: recurse !
return eager_split(str, maxsplit - 1)
else:
return name, version
if probable_name is not None:
probable_name = probable_name.lower()
name = None
if probable_name is not None and probable_name in archive_name:
# we get the name from probable_name, if given.
name = probable_name
version = archive_name.lstrip(name)
else:
name, version = eager_split(archive_name)
version = suggest_normalized_version(version)
if version is not None and name != "":
return name.lower(), version
else:
raise CantParseArchiveName(archive_name)