Add support for resumable upload.
Reviewed in http://codereview.appspot.com/5417051/.
diff --git a/apiclient/http.py b/apiclient/http.py
index d2a3a2f..0b45a44 100644
--- a/apiclient/http.py
+++ b/apiclient/http.py
@@ -25,16 +25,187 @@
'set_user_agent', 'tunnel_patch'
]
+import copy
import httplib2
import os
+import mimeparse
+import mimetypes
from model import JsonModel
from errors import HttpError
+from errors import ResumableUploadError
from errors import UnexpectedBodyError
from errors import UnexpectedMethodError
from anyjson import simplejson
+class MediaUploadProgress(object):
+ """Status of a resumable upload."""
+
+ def __init__(self, resumable_progress, total_size):
+ """Constructor.
+
+ Args:
+ resumable_progress: int, bytes sent so far.
+ total_size: int, total bytes in complete upload.
+ """
+ self.resumable_progress = resumable_progress
+ self.total_size = total_size
+
+ def progress(self):
+ """Percent of upload completed, as a float."""
+ return float(self.resumable_progress)/float(self.total_size)
+
+
+class MediaUpload(object):
+ """Describes a media object to upload.
+
+ Base class that defines the interface of MediaUpload subclasses.
+ """
+
+ def getbytes(self, begin, end):
+ raise NotImplementedError()
+
+ def size(self):
+ raise NotImplementedError()
+
+ def chunksize(self):
+ raise NotImplementedError()
+
+ def mimetype(self):
+ return 'application/octet-stream'
+
+ def resumable(self):
+ return False
+
+ def _to_json(self, strip=None):
+ """Utility function for creating a JSON representation of a MediaUpload.
+
+ Args:
+ strip: array, An array of names of members to not include in the JSON.
+
+ Returns:
+ string, a JSON representation of this instance, suitable to pass to
+ from_json().
+ """
+ t = type(self)
+ d = copy.copy(self.__dict__)
+ if strip is not None:
+ for member in strip:
+ del d[member]
+ d['_class'] = t.__name__
+ d['_module'] = t.__module__
+ return simplejson.dumps(d)
+
+ def to_json(self):
+ """Create a JSON representation of an instance of MediaUpload.
+
+ Returns:
+ string, a JSON representation of this instance, suitable to pass to
+ from_json().
+ """
+ return self._to_json()
+
+ @classmethod
+ def new_from_json(cls, s):
+ """Utility class method to instantiate a MediaUpload subclass from a JSON
+ representation produced by to_json().
+
+ Args:
+ s: string, JSON from to_json().
+
+ Returns:
+ An instance of the subclass of MediaUpload that was serialized with
+ to_json().
+ """
+ data = simplejson.loads(s)
+ # Find and call the right classmethod from_json() to restore the object.
+ module = data['_module']
+ m = __import__(module, fromlist=module.split('.')[:-1])
+ kls = getattr(m, data['_class'])
+ from_json = getattr(kls, 'from_json')
+ return from_json(s)
+
+class MediaFileUpload(MediaUpload):
+ """A MediaUpload for a file.
+
+ Construct a MediaFileUpload and pass as the media_body parameter of the
+ method. For example, if we had a service that allowed uploading images:
+
+
+ media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
+ resumable=True)
+ service.objects().insert(
+ bucket=buckets['items'][0]['id'],
+ name='smiley.png',
+ media_body=media).execute()
+ """
+
+ def __init__(self, filename, mimetype=None, chunksize=10000, resumable=False):
+ """Constructor.
+
+ Args:
+ filename: string, Name of the file.
+ mimetype: string, Mime-type of the file. If None then a mime-type will be
+ guessed from the file extension.
+ chunksize: int, File will be uploaded in chunks of this many bytes. Only
+ used if resumable=True.
+ resumable: bool, True if this is a resumable upload. False means upload in
+ a single request.
+ """
+ self._filename = filename
+ self._size = os.path.getsize(filename)
+ self._fd = None
+ if mimetype is None:
+ (mimetype, encoding) = mimetypes.guess_type(filename)
+ self._mimetype = mimetype
+ self._chunksize = chunksize
+ self._resumable = resumable
+
+ def mimetype(self):
+ return self._mimetype
+
+ def size(self):
+ return self._size
+
+ def chunksize(self):
+ return self._chunksize
+
+ def resumable(self):
+ return self._resumable
+
+ def getbytes(self, begin, length):
+ """Get bytes from the media.
+
+ Args:
+ begin: int, offset from beginning of file.
+ length: int, number of bytes to read, starting at begin.
+
+ Returns:
+ A string of bytes read. May be shorted than length if EOF was reached
+ first.
+ """
+ if self._fd is None:
+ self._fd = open(self._filename, 'rb')
+ self._fd.seek(begin)
+ return self._fd.read(length)
+
+ def to_json(self):
+ """Creating a JSON representation of an instance of Credentials.
+
+ Returns:
+ string, a JSON representation of this instance, suitable to pass to
+ from_json().
+ """
+ return self._to_json(['_fd'])
+
+ @staticmethod
+ def from_json(s):
+ d = simplejson.loads(s)
+ return MediaFileUpload(
+ d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable'])
+
+
class HttpRequest(object):
"""Encapsulates a single HTTP request.
"""
@@ -43,7 +214,8 @@
method='GET',
body=None,
headers=None,
- methodId=None):
+ methodId=None,
+ resumable=None):
"""Constructor for an HttpRequest.
Args:
@@ -53,16 +225,39 @@
on an error.
uri: string, the absolute URI to send the request to
method: string, the HTTP method to use
- body: string, the request body of the HTTP request
+ body: string, the request body of the HTTP request,
headers: dict, the HTTP request headers
methodId: string, a unique identifier for the API method being called.
+ resumable: MediaUpload, None if this is not a resumbale request.
"""
self.uri = uri
self.method = method
self.body = body
self.headers = headers or {}
+ self.methodId = methodId
self.http = http
self.postproc = postproc
+ self.resumable = resumable
+
+ major, minor, params = mimeparse.parse_mime_type(
+ headers.get('content-type', 'application/json'))
+ self.multipart_boundary = params.get('boundary', '').strip('"')
+
+ # If this was a multipart resumable, the size of the non-media part.
+ self.multipart_size = 0
+
+ # The resumable URI to send chunks to.
+ self.resumable_uri = None
+
+ # The bytes that have been uploaded.
+ self.resumable_progress = 0
+
+ if resumable is not None:
+ if self.body is not None:
+ self.multipart_size = len(self.body)
+ else:
+ self.multipart_size = 0
+ self.total_size = self.resumable.size() + self.multipart_size + len(self.multipart_boundary)
def execute(self, http=None):
"""Execute the request.
@@ -81,14 +276,118 @@
"""
if http is None:
http = self.http
- resp, content = http.request(self.uri, self.method,
- body=self.body,
- headers=self.headers)
+ if self.resumable:
+ body = None
+ while body is None:
+ _, body = self.next_chunk(http)
+ return body
+ else:
+ resp, content = http.request(self.uri, self.method,
+ body=self.body,
+ headers=self.headers)
- if resp.status >= 300:
- raise HttpError(resp, content, self.uri)
+ if resp.status >= 300:
+ raise HttpError(resp, content, self.uri)
return self.postproc(resp, content)
+ def next_chunk(self, http=None):
+ """Execute the next step of a resumable upload.
+
+ Can only be used if the method being executed supports media uploads and the
+ MediaUpload object passed in was flagged as using resumable upload.
+
+ Example:
+
+ media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
+ resumable=True)
+ request = service.objects().insert(
+ bucket=buckets['items'][0]['id'],
+ name='smiley.png',
+ media_body=media)
+
+ response = None
+ while response is None:
+ status, response = request.next_chunk()
+ if status:
+ print "Upload %d%% complete." % int(status.progress() * 100)
+
+
+ Returns:
+ (status, body): (ResumableMediaStatus, object)
+ The body will be None until the resumable media is fully uploaded.
+ """
+ if http is None:
+ http = self.http
+
+ if self.resumable_uri is None:
+ start_headers = copy.copy(self.headers)
+ start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
+ start_headers['X-Upload-Content-Length'] = str(self.resumable.size())
+ start_headers['Content-Length'] = '0'
+ resp, content = http.request(self.uri, self.method,
+ body="",
+ headers=start_headers)
+ if resp.status == 200 and 'location' in resp:
+ self.resumable_uri = resp['location']
+ else:
+ raise ResumableUploadError("Failed to retrieve starting URI.")
+ if self.body:
+ begin = 0
+ data = self.body
+ else:
+ begin = self.resumable_progress - self.multipart_size
+ data = self.resumable.getbytes(begin, self.resumable.chunksize())
+
+ # Tack on the multipart/related boundary if we are at the end of the file.
+ if begin + self.resumable.chunksize() >= self.resumable.size():
+ data += self.multipart_boundary
+ headers = {
+ 'Content-Range': 'bytes %d-%d/%d' % (
+ self.resumable_progress, self.resumable_progress + len(data) - 1,
+ self.total_size),
+ }
+ resp, content = http.request(self.resumable_uri, 'PUT',
+ body=data,
+ headers=headers)
+ if resp.status in [200, 201]:
+ return None, self.postproc(resp, content)
+ # A "308 Resume Incomplete" indicates we are not done.
+ elif resp.status == 308:
+ self.resumable_progress = int(resp['range'].split('-')[1]) + 1
+ if self.resumable_progress >= self.multipart_size:
+ self.body = None
+ if 'location' in resp:
+ self.resumable_uri = resp['location']
+ else:
+ raise HttpError(resp, content, self.uri)
+
+ return MediaUploadProgress(self.resumable_progress, self.total_size), None
+
+ def to_json(self):
+ """Returns a JSON representation of the HttpRequest."""
+ d = copy.copy(self.__dict__)
+ if d['resumable'] is not None:
+ d['resumable'] = self.resumable.to_json()
+ del d['http']
+ del d['postproc']
+ return simplejson.dumps(d)
+
+ @staticmethod
+ def from_json(s, http, postproc):
+ """Returns an HttpRequest populated with info from a JSON object."""
+ d = simplejson.loads(s)
+ if d['resumable'] is not None:
+ d['resumable'] = MediaUpload.new_from_json(d['resumable'])
+ return HttpRequest(
+ http,
+ postproc,
+ uri = d['uri'],
+ method= d['method'],
+ body=d['body'],
+ headers=d['headers'],
+ methodId=d['methodId'],
+ resumable=d['resumable'])
+
class HttpRequestMock(object):
"""Mock of HttpRequest.
@@ -166,7 +465,7 @@
self.check_unexpected = check_unexpected
def __call__(self, http, postproc, uri, method='GET', body=None,
- headers=None, methodId=None):
+ headers=None, methodId=None, resumable=None):
"""Implements the callable interface that discovery.build() expects
of requestBuilder, which is to build an object compatible with
HttpRequest.execute(). See that method for the description of the