Add support for resumable upload.
Reviewed in http://codereview.appspot.com/5417051/.
diff --git a/apiclient/discovery.py b/apiclient/discovery.py
index 2a53b0d..9e5b230 100644
--- a/apiclient/discovery.py
+++ b/apiclient/discovery.py
@@ -26,6 +26,7 @@
import httplib2
import logging
import os
+import random
import re
import uritemplate
import urllib
@@ -48,6 +49,8 @@
from errors import UnknownApiNameOrVersion
from errors import UnknownLinkType
from http import HttpRequest
+from http import MediaUpload
+from http import MediaFileUpload
from model import JsonModel
URITEMPLATE = re.compile('{[^}]*}')
@@ -325,6 +328,7 @@
if 'mediaUpload' in methodDesc:
mediaUpload = methodDesc['mediaUpload']
mediaPathUrl = mediaUpload['protocols']['simple']['path']
+ mediaResumablePathUrl = mediaUpload['protocols']['resumable']['path']
accept = mediaUpload['accept']
maxSize = _media_size_to_long(mediaUpload.get('maxSize', ''))
@@ -440,28 +444,46 @@
expanded_url = uritemplate.expand(pathUrl, params)
url = urlparse.urljoin(self._baseUrl, expanded_url + query)
+ resumable = None
+ multipart_boundary = ''
+
if media_filename:
- (media_mime_type, encoding) = mimetypes.guess_type(media_filename)
- if media_mime_type is None:
- raise UnknownFileType(media_filename)
- if not mimeparse.best_match([media_mime_type], ','.join(accept)):
- raise UnacceptableMimeTypeError(media_mime_type)
+ # Convert a simple filename into a MediaUpload object.
+ if isinstance(media_filename, basestring):
+ (media_mime_type, encoding) = mimetypes.guess_type(media_filename)
+ if media_mime_type is None:
+ raise UnknownFileType(media_filename)
+ if not mimeparse.best_match([media_mime_type], ','.join(accept)):
+ raise UnacceptableMimeTypeError(media_mime_type)
+ media_upload = MediaFileUpload(media_filename, media_mime_type)
+ elif isinstance(media_filename, MediaUpload):
+ media_upload = media_filename
+ else:
+ # Report the type of the argument actually given; media_upload is not
+ # bound on this path.
+ raise TypeError(
+ 'media_filename must be str or MediaUpload. Got %s' % type(media_filename))
+
+ if media_upload.resumable():
+ resumable = media_upload
# Check the maxSize
- if maxSize > 0 and os.path.getsize(media_filename) > maxSize:
- raise MediaUploadSizeError(media_filename)
+ if maxSize > 0 and media_upload.size() > maxSize:
+ raise MediaUploadSizeError("Media larger than: %s" % maxSize)
# Use the media path uri for media uploads
- expanded_url = uritemplate.expand(mediaPathUrl, params)
+ if media_upload.resumable():
+ expanded_url = uritemplate.expand(mediaResumablePathUrl, params)
+ else:
+ expanded_url = uritemplate.expand(mediaPathUrl, params)
url = urlparse.urljoin(self._baseUrl, expanded_url + query)
if body is None:
- headers['content-type'] = media_mime_type
- # make the body the contents of the file
- f = file(media_filename, 'rb')
- body = f.read()
- f.close()
+ # Not multipart: the request body is the media itself. Resumable media is
+ # not read here; HttpRequest sends it in chunks.
+ headers['content-type'] = media_upload.mimetype()
+ if not media_upload.resumable():
+ body = media_upload.getbytes(0, media_upload.size())
else:
+ # This is a multipart/related upload.
msgRoot = MIMEMultipart('related')
# msgRoot should not write out it's own headers
setattr(msgRoot, '_write_headers', lambda self: None)
@@ -472,19 +494,51 @@
msgRoot.attach(msg)
# attach the media as the second part
- msg = MIMENonMultipart(*media_mime_type.split('/'))
+ msg = MIMENonMultipart(*media_upload.mimetype().split('/'))
msg['Content-Transfer-Encoding'] = 'binary'
- f = file(media_filename, 'rb')
- msg.set_payload(f.read())
- f.close()
- msgRoot.attach(msg)
+ if media_upload.resumable():
+ # This is a multipart resumable upload, where a multipart payload
+ # looks like this:
+ #
+ # --===============1678050750164843052==
+ # Content-Type: application/json
+ # MIME-Version: 1.0
+ #
+ # {'foo': 'bar'}
+ # --===============1678050750164843052==
+ # Content-Type: image/png
+ # MIME-Version: 1.0
+ # Content-Transfer-Encoding: binary
+ #
+ # <BINARY STUFF>
+ # --===============1678050750164843052==--
+ #
+ # In the case of resumable multipart media uploads, the <BINARY
+ # STUFF> is large and will be spread across multiple PUTs. What we
+ # do here is compose the multipart message with a random payload in
+ # place of <BINARY STUFF> and then split the resulting content into
+ # two pieces, text before <BINARY STUFF> and text after <BINARY
+ # STUFF>. The text after <BINARY STUFF> is the multipart boundary.
+ # In apiclient.http the HttpRequest will send the text before
+ # <BINARY STUFF>, then send the actual binary media in chunks, and
+ # then will send the multipart delimiter.
- body = msgRoot.as_string()
+ payload = hex(random.getrandbits(300))
+ msg.set_payload(payload)
+ msgRoot.attach(msg)
+ body = msgRoot.as_string()
+ # The two-way unpacking below assumes the random placeholder occurs exactly
+ # once in the generated message.
+ # NOTE(review): the text after the placeholder (the multipart
+ # close-delimiter) is discarded here; HttpRequest re-derives only the bare
+ # boundary from the content-type header and appends that. Confirm the
+ # uploaded body ends with a well-formed close-delimiter.
+ body, _ = body.split(payload)
+ resumable = media_upload
+ else:
+ payload = media_upload.getbytes(0, media_upload.size())
+ msg.set_payload(payload)
+ msgRoot.attach(msg)
+ body = msgRoot.as_string()
- # must appear after the call to as_string() to get the right boundary
+ # must appear after the call to as_string() to get the right boundary
+ multipart_boundary = msgRoot.get_boundary()
headers['content-type'] = ('multipart/related; '
- 'boundary="%s"') % msgRoot.get_boundary()
+ 'boundary="%s"') % multipart_boundary
logging.info('URL being requested: %s' % url)
return self._requestBuilder(self._http,
@@ -493,7 +547,8 @@
method=httpMethod,
body=body,
headers=headers,
- methodId=methodId)
+ methodId=methodId,
+ resumable=resumable)
docs = [methodDesc.get('description', DEFAULT_METHOD_DOC), '\n\n']
if len(argmap) > 0:
diff --git a/apiclient/errors.py b/apiclient/errors.py
index ff0c154..30a48e8 100644
--- a/apiclient/errors.py
+++ b/apiclient/errors.py
@@ -85,6 +85,11 @@
pass
+class ResumableUploadError(Error):
+  """Error occurred during resumable upload."""
+  pass
+
+
class UnexpectedMethodError(Error):
"""Exception raised by RequestMockBuilder on unexpected calls."""
diff --git a/apiclient/http.py b/apiclient/http.py
index d2a3a2f..0b45a44 100644
--- a/apiclient/http.py
+++ b/apiclient/http.py
@@ -25,16 +25,187 @@
'set_user_agent', 'tunnel_patch'
]
+import copy
import httplib2
import os
+import mimeparse
+import mimetypes
from model import JsonModel
from errors import HttpError
+from errors import ResumableUploadError
from errors import UnexpectedBodyError
from errors import UnexpectedMethodError
from anyjson import simplejson
+class MediaUploadProgress(object):
+  """Status of a resumable upload."""
+
+  def __init__(self, resumable_progress, total_size):
+    """Constructor.
+
+    Args:
+      resumable_progress: int, bytes sent so far.
+      total_size: int, total bytes in complete upload.
+    """
+    self.resumable_progress = resumable_progress
+    self.total_size = total_size
+
+  def progress(self):
+    """Fraction of the upload completed.
+
+    Returns:
+      float, resumable_progress / total_size (multiply by 100 for a
+      percentage). Returns 0.0 when total_size is 0 instead of raising
+      ZeroDivisionError.
+    """
+    if not self.total_size:
+      return 0.0
+    return float(self.resumable_progress) / float(self.total_size)
+
+
+class MediaUpload(object):
+  """Describes a media object to upload.
+
+  Base class that defines the interface of MediaUpload subclasses.
+  """
+
+  def getbytes(self, begin, length):
+    """Get bytes from the media.
+
+    Args:
+      begin: int, offset from the beginning of the media.
+      length: int, number of bytes to read, starting at begin.
+
+    Returns:
+      A string of the bytes read.
+    """
+    raise NotImplementedError()
+
+  def size(self):
+    """Size of the media in bytes, as an int."""
+    raise NotImplementedError()
+
+  def chunksize(self):
+    """Number of bytes to send per request of a resumable upload, as an int."""
+    raise NotImplementedError()
+
+  def mimetype(self):
+    """Mime type of the media, as a string. Defaults to generic binary data."""
+    return 'application/octet-stream'
+
+  def resumable(self):
+    """Whether the media is uploaded with the resumable protocol.
+
+    Returns:
+      bool, False in the base class; subclasses may override.
+    """
+    return False
+
+  def _to_json(self, strip=None):
+    """Utility function for creating a JSON representation of a MediaUpload.
+
+    Args:
+      strip: array, An array of names of members to not include in the JSON.
+
+    Returns:
+      string, a JSON representation of this instance, suitable to pass to
+      from_json().
+    """
+    t = type(self)
+    d = copy.copy(self.__dict__)
+    if strip is not None:
+      for member in strip:
+        del d[member]
+    # Record the concrete class so new_from_json() can find its from_json().
+    d['_class'] = t.__name__
+    d['_module'] = t.__module__
+    return simplejson.dumps(d)
+
+  def to_json(self):
+    """Create a JSON representation of an instance of MediaUpload.
+
+    Returns:
+      string, a JSON representation of this instance, suitable to pass to
+      from_json().
+    """
+    return self._to_json()
+
+  @classmethod
+  def new_from_json(cls, s):
+    """Utility class method to instantiate a MediaUpload subclass from a JSON
+    representation produced by to_json().
+
+    Args:
+      s: string, JSON from to_json().
+
+    Returns:
+      An instance of the subclass of MediaUpload that was serialized with
+      to_json().
+    """
+    data = simplejson.loads(s)
+    # Find and call the right classmethod from_json() to restore the object.
+    # NOTE: this imports the module named in the JSON, so only pass JSON
+    # produced by to_json() from a trusted source.
+    module = data['_module']
+    m = __import__(module, fromlist=module.split('.')[:-1])
+    kls = getattr(m, data['_class'])
+    from_json = getattr(kls, 'from_json')
+    return from_json(s)
+
+
+class MediaFileUpload(MediaUpload):
+  """A MediaUpload for a file.
+
+  Construct a MediaFileUpload and pass as the media_body parameter of the
+  method. For example, if we had a service that allowed uploading images:
+
+
+    media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
+                            resumable=True)
+    service.objects().insert(
+        bucket=buckets['items'][0]['id'],
+        name='smiley.png',
+        media_body=media).execute()
+  """
+
+  def __init__(self, filename, mimetype=None, chunksize=10000, resumable=False):
+    """Constructor.
+
+    Args:
+      filename: string, Name of the file.
+      mimetype: string, Mime-type of the file. If None then a mime-type will be
+        guessed from the file extension; it stays None if no guess is possible.
+      chunksize: int, File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True.
+      resumable: bool, True if this is a resumable upload. False means upload in
+        a single request.
+
+    Raises:
+      OSError: if the size of the file cannot be read (e.g. it does not exist).
+    """
+    self._filename = filename
+    # The size is read once, here, and not refreshed later.
+    self._size = os.path.getsize(filename)
+    # Opened lazily by getbytes() and then kept open for subsequent reads.
+    self._fd = None
+    if mimetype is None:
+      mimetype, _ = mimetypes.guess_type(filename)
+    self._mimetype = mimetype
+    self._chunksize = chunksize
+    self._resumable = resumable
+
+  def mimetype(self):
+    """Mime type of the file, as a string (None if it could not be guessed)."""
+    return self._mimetype
+
+  def size(self):
+    """Size of the file in bytes, as measured at construction time."""
+    return self._size
+
+  def chunksize(self):
+    """Number of bytes to send per request of a resumable upload."""
+    return self._chunksize
+
+  def resumable(self):
+    """True if this file is to be uploaded with the resumable protocol."""
+    return self._resumable
+
+  def getbytes(self, begin, length):
+    """Get bytes from the media.
+
+    Args:
+      begin: int, offset from beginning of file.
+      length: int, number of bytes to read, starting at begin.
+
+    Returns:
+      A string of bytes read. May be shorter than length if EOF was reached
+      first.
+    """
+    if self._fd is None:
+      self._fd = open(self._filename, 'rb')
+    self._fd.seek(begin)
+    return self._fd.read(length)
+
+  def to_json(self):
+    """Create a JSON representation of an instance of MediaFileUpload.
+
+    The open file handle (_fd) is not serialized.
+
+    Returns:
+      string, a JSON representation of this instance, suitable to pass to
+      from_json().
+    """
+    return self._to_json(['_fd'])
+
+  @staticmethod
+  def from_json(s):
+    """Create a MediaFileUpload from the JSON produced by to_json().
+
+    Args:
+      s: string, JSON from to_json().
+
+    Returns:
+      A MediaFileUpload for the same file, mime-type, chunksize and
+      resumable setting.
+    """
+    d = simplejson.loads(s)
+    return MediaFileUpload(
+        d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable'])
+
+
class HttpRequest(object):
"""Encapsulates a single HTTP request.
"""
@@ -43,7 +214,8 @@
method='GET',
body=None,
headers=None,
- methodId=None):
+ methodId=None,
+ resumable=None):
"""Constructor for an HttpRequest.
Args:
@@ -53,16 +225,39 @@
on an error.
uri: string, the absolute URI to send the request to
method: string, the HTTP method to use
body: string, the request body of the HTTP request
headers: dict, the HTTP request headers
methodId: string, a unique identifier for the API method being called.
+ resumable: MediaUpload, None if this is not a resumable request.
"""
self.uri = uri
self.method = method
self.body = body
self.headers = headers or {}
+ self.methodId = methodId
self.http = http
self.postproc = postproc
+ self.resumable = resumable
+
+ # Read from self.headers, not headers: the headers argument may be None.
+ major, minor, params = mimeparse.parse_mime_type(
+ self.headers.get('content-type', 'application/json'))
+ self.multipart_boundary = params.get('boundary', '').strip('"')
+
+ # If this was a multipart resumable, the size of the non-media part.
+ self.multipart_size = 0
+
+ # The resumable URI to send chunks to.
+ self.resumable_uri = None
+
+ # The number of bytes the server has reported as received.
+ self.resumable_progress = 0
+
+ if resumable is not None:
+ if self.body is not None:
+ self.multipart_size = len(self.body)
+ else:
+ self.multipart_size = 0
+ # Bytes the server will receive in total: non-media part, media, boundary.
+ self.total_size = self.resumable.size() + self.multipart_size + len(self.multipart_boundary)
def execute(self, http=None):
"""Execute the request.
@@ -81,14 +276,118 @@
"""
if http is None:
http = self.http
- resp, content = http.request(self.uri, self.method,
- body=self.body,
- headers=self.headers)
+ if self.resumable:
+ body = None
+ while body is None:
+ _, body = self.next_chunk(http)
+ return body
+ else:
+ resp, content = http.request(self.uri, self.method,
+ body=self.body,
+ headers=self.headers)
- if resp.status >= 300:
- raise HttpError(resp, content, self.uri)
+ if resp.status >= 300:
+ raise HttpError(resp, content, self.uri)
return self.postproc(resp, content)
+  def next_chunk(self, http=None):
+    """Execute the next step of a resumable upload.
+
+    Can only be used if the method being executed supports media uploads and
+    the MediaUpload object passed in was flagged as using resumable upload.
+
+    Example:
+
+      media = MediaFileUpload('smiley.png', mimetype='image/png',
+                              chunksize=1000, resumable=True)
+      request = service.objects().insert(
+          bucket=buckets['items'][0]['id'],
+          name='smiley.png',
+          media_body=media)
+
+      response = None
+      while response is None:
+        status, response = request.next_chunk()
+        if status:
+          print "Upload %d%% complete." % int(status.progress() * 100)
+
+
+    Args:
+      http: the http object to use instead of the one this HttpRequest was
+        constructed with; defaults to self.http.
+
+    Returns:
+      (status, body): (MediaUploadProgress, object)
+      The body will be None until the resumable media is fully uploaded; the
+      status is None once it is.
+
+    Raises:
+      ResumableUploadError: if the initial request does not yield an upload
+        URI.
+      HttpError: if a PUT returns a status other than 200, 201 or 308.
+    """
+    if http is None:
+      http = self.http
+
+    if self.resumable_uri is None:
+      # Start the session; the server replies with the URI to PUT chunks to.
+      start_headers = copy.copy(self.headers)
+      start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
+      start_headers['X-Upload-Content-Length'] = str(self.resumable.size())
+      start_headers['Content-Length'] = '0'
+      resp, content = http.request(self.uri, self.method,
+                                   body="",
+                                   headers=start_headers)
+      if resp.status == 200 and 'location' in resp:
+        self.resumable_uri = resp['location']
+      else:
+        raise ResumableUploadError("Failed to retrieve starting URI.")
+
+    if self.body:
+      # Multipart upload: first send whatever part of the non-media text the
+      # server has not yet acknowledged, so the data matches Content-Range.
+      data = self.body[self.resumable_progress:]
+    else:
+      begin = self.resumable_progress - self.multipart_size
+      data = self.resumable.getbytes(begin, self.resumable.chunksize())
+      # Tack on the multipart/related boundary if we are at the end of the
+      # file. Only media chunks may carry it, never the non-media text above.
+      if begin + self.resumable.chunksize() >= self.resumable.size():
+        data += self.multipart_boundary
+
+    headers = {
+        'Content-Range': 'bytes %d-%d/%d' % (
+            self.resumable_progress, self.resumable_progress + len(data) - 1,
+            self.total_size),
+        }
+    resp, content = http.request(self.resumable_uri, 'PUT',
+                                 body=data,
+                                 headers=headers)
+    if resp.status in [200, 201]:
+      return None, self.postproc(resp, content)
+    elif resp.status == 308:
+      # A "308 Resume Incomplete" indicates we are not done. The Range header
+      # reports the last byte stored; it is absent if nothing was stored yet.
+      if 'range' in resp:
+        self.resumable_progress = int(resp['range'].split('-')[1]) + 1
+      else:
+        self.resumable_progress = 0
+      if self.resumable_progress >= self.multipart_size:
+        self.body = None
+      if 'location' in resp:
+        self.resumable_uri = resp['location']
+    else:
+      raise HttpError(resp, content, self.uri)
+
+    return MediaUploadProgress(self.resumable_progress, self.total_size), None
+
+  def to_json(self):
+    """Serialize this HttpRequest to a JSON string.
+
+    The http and postproc members are left out and must be supplied again
+    to from_json(). A resumable MediaUpload is stored as its own nested JSON
+    string, produced by its to_json().
+    """
+    state = dict(self.__dict__)
+    for unserializable in ('http', 'postproc'):
+      del state[unserializable]
+    if state['resumable'] is not None:
+      state['resumable'] = self.resumable.to_json()
+    return simplejson.dumps(state)
+
+  @staticmethod
+  def from_json(s, http, postproc):
+    """Rebuild an HttpRequest from the JSON produced by to_json().
+
+    Only the constructor arguments are restored; upload state such as
+    resumable_uri and resumable_progress is recomputed by the constructor.
+
+    Args:
+      s: string, JSON from to_json().
+      http: the http object the new request will use.
+      postproc: callable, invoked with (resp, content) to process responses.
+
+    Returns:
+      An HttpRequest populated with info from the JSON object.
+    """
+    state = simplejson.loads(s)
+    media = state['resumable']
+    if media is not None:
+      media = MediaUpload.new_from_json(media)
+    return HttpRequest(
+        http,
+        postproc,
+        uri=state['uri'],
+        method=state['method'],
+        body=state['body'],
+        headers=state['headers'],
+        methodId=state['methodId'],
+        resumable=media)
+
class HttpRequestMock(object):
"""Mock of HttpRequest.
@@ -166,7 +465,7 @@
self.check_unexpected = check_unexpected
def __call__(self, http, postproc, uri, method='GET', body=None,
- headers=None, methodId=None):
+ headers=None, methodId=None, resumable=None):
"""Implements the callable interface that discovery.build() expects
of requestBuilder, which is to build an object compatible with
HttpRequest.execute(). See that method for the description of the