blob: e39016c901abbf094a6c2a764f49e6987f3527b8 [file] [log] [blame]
"""Upload a distribution to a project index."""
import os
import socket
import logging
import platform
import urllib.parse
from io import BytesIO
from base64 import standard_b64encode
from hashlib import md5
from urllib.error import HTTPError
from urllib.request import urlopen, Request
from packaging import logger
from packaging.errors import PackagingOptionError
from packaging.util import (spawn, read_pypirc, DEFAULT_REPOSITORY,
DEFAULT_REALM, encode_multipart)
from packaging.command.cmd import Command
class upload(Command):
description = "upload distribution to PyPI"
user_options = [
('repository=', 'r',
"repository URL [default: %s]" % DEFAULT_REPOSITORY),
('show-response', None,
"display full response text from server"),
('sign', 's',
"sign files to upload using gpg"),
('identity=', 'i',
"GPG identity used to sign files"),
('upload-docs', None,
"upload documentation too"),
]
boolean_options = ['show-response', 'sign']
def initialize_options(self):
self.repository = None
self.realm = None
self.show_response = False
self.username = ''
self.password = ''
self.show_response = False
self.sign = False
self.identity = None
self.upload_docs = False
def finalize_options(self):
if self.repository is None:
self.repository = DEFAULT_REPOSITORY
if self.realm is None:
self.realm = DEFAULT_REALM
if self.identity and not self.sign:
raise PackagingOptionError(
"Must use --sign for --identity to have meaning")
config = read_pypirc(self.repository, self.realm)
if config != {}:
self.username = config['username']
self.password = config['password']
self.repository = config['repository']
self.realm = config['realm']
# getting the password from the distribution
# if previously set by the register command
if not self.password and self.distribution.password:
self.password = self.distribution.password
def run(self):
if not self.distribution.dist_files:
raise PackagingOptionError(
"No dist file created in earlier command")
for command, pyversion, filename in self.distribution.dist_files:
self.upload_file(command, pyversion, filename)
if self.upload_docs:
upload_docs = self.get_finalized_command("upload_docs")
upload_docs.repository = self.repository
upload_docs.username = self.username
upload_docs.password = self.password
upload_docs.run()
# XXX to be refactored with register.post_to_server
def upload_file(self, command, pyversion, filename):
# Makes sure the repository URL is compliant
scheme, netloc, url, params, query, fragments = \
urllib.parse.urlparse(self.repository)
if params or query or fragments:
raise AssertionError("Incompatible url %s" % self.repository)
if scheme not in ('http', 'https'):
raise AssertionError("unsupported scheme " + scheme)
# Sign if requested
if self.sign:
gpg_args = ["gpg", "--detach-sign", "-a", filename]
if self.identity:
gpg_args[2:2] = ["--local-user", self.identity]
spawn(gpg_args,
dry_run=self.dry_run)
# Fill in the data - send all the metadata in case we need to
# register a new release
with open(filename, 'rb') as f:
content = f.read()
data = self.distribution.metadata.todict()
# extra upload infos
data[':action'] = 'file_upload'
data['protcol_version'] = '1'
data['content'] = (os.path.basename(filename), content)
data['filetype'] = command
data['pyversion'] = pyversion
data['md5_digest'] = md5(content).hexdigest()
if command == 'bdist_dumb':
data['comment'] = 'built for %s' % platform.platform(terse=True)
if self.sign:
with open(filename + '.asc') as fp:
sig = fp.read()
data['gpg_signature'] = [
(os.path.basename(filename) + ".asc", sig)]
# set up the authentication
# The exact encoding of the authentication string is debated.
# Anyway PyPI only accepts ascii for both username or password.
user_pass = (self.username + ":" + self.password).encode('ascii')
auth = b"Basic " + standard_b64encode(user_pass)
# Build up the MIME payload for the POST data
files = []
for key in ('content', 'gpg_signature'):
if key in data:
filename_, value = data.pop(key)
files.append((key, filename_, value))
content_type, body = encode_multipart(data.items(), files)
logger.info("Submitting %s to %s", filename, self.repository)
# build the Request
headers = {'Content-type': content_type,
'Content-length': str(len(body)),
'Authorization': auth}
request = Request(self.repository, body, headers)
# send the data
try:
result = urlopen(request)
status = result.code
reason = result.msg
except socket.error as e:
logger.error(e)
return
except HTTPError as e:
status = e.code
reason = e.msg
if status == 200:
logger.info('Server response (%s): %s', status, reason)
else:
logger.error('Upload failed (%s): %s', status, reason)
if self.show_response and logger.isEnabledFor(logging.INFO):
sep = '-' * 75
logger.info('%s\n%s\n%s', sep, result.read().decode(), sep)