Add MediaIoBaseUpload for uploading from io.Base objects, and fix resumable upload bugs.
Fixes issue #139.
Fixes issue #123.
Reviewed in http://codereview.appspot.com/6307067/.
diff --git a/apiclient/http.py b/apiclient/http.py
index 0c49045..9155cab 100644
--- a/apiclient/http.py
+++ b/apiclient/http.py
@@ -46,6 +46,9 @@
from oauth2client.anyjson import simplejson
+DEFAULT_CHUNK_SIZE = 512*1024
+
+
class MediaUploadProgress(object):
"""Status of a resumable upload."""
@@ -54,14 +57,23 @@
Args:
resumable_progress: int, bytes sent so far.
- total_size: int, total bytes in complete upload.
+ total_size: int, total bytes in complete upload, or None if the total
+ upload size isn't known ahead of time.
"""
self.resumable_progress = resumable_progress
self.total_size = total_size
def progress(self):
- """Percent of upload completed, as a float."""
- return float(self.resumable_progress) / float(self.total_size)
+ """Fraction of upload completed, as a float.
+
+ Returns:
+ the fraction complete (resumable_progress / total_size) as a float, or
+ 0.0 if the total size of the upload is unknown.
+ """
+ if self.total_size is not None:
+ return float(self.resumable_progress) / float(self.total_size)
+ else:
+ return 0.0
class MediaUpload(object):
@@ -77,21 +89,51 @@
such as under certain classes of requests under Google App Engine.
"""
- def getbytes(self, begin, end):
- raise NotImplementedError()
-
- def size(self):
- raise NotImplementedError()
-
def chunksize(self):
+ """Chunk size for resumable uploads.
+
+ Returns:
+ Chunk size in bytes.
+ """
raise NotImplementedError()
def mimetype(self):
+ """Mime type of the body.
+
+ Returns:
+ Mime type.
+ """
return 'application/octet-stream'
+ def size(self):
+ """Size of upload.
+
+ Returns:
+ Size of the body, or None if the size is unknown.
+ """
+ return None
+
def resumable(self):
+ """Whether this upload is resumable.
+
+ Returns:
+ True if this is a resumable upload, False otherwise.
+ """
return False
+ def getbytes(self, begin, end):
+ """Get bytes from the media.
+
+ Args:
+ begin: int, offset from beginning of file.
+ length: int, number of bytes to read, starting at begin.
+
+ Returns:
+ A string of bytes read. May be shorter than length if EOF was reached
+ first.
+ """
+ raise NotImplementedError()
+
def _to_json(self, strip=None):
"""Utility function for creating a JSON representation of a MediaUpload.
@@ -148,15 +190,15 @@
method. For example, if we had a service that allowed uploading images:
- media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
- resumable=True)
+ media = MediaFileUpload('smiley.png', mimetype='image/png',
+ chunksize=1024*1024, resumable=True)
service.objects().insert(
bucket=buckets['items'][0]['id'],
name='smiley.png',
media_body=media).execute()
"""
- def __init__(self, filename, mimetype=None, chunksize=256*1024, resumable=False):
+ def __init__(self, filename, mimetype=None, chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
"""Constructor.
Args:
@@ -177,16 +219,36 @@
self._chunksize = chunksize
self._resumable = resumable
+ def chunksize(self):
+ """Chunk size for resumable uploads.
+
+ Returns:
+ Chunk size in bytes.
+ """
+ return self._chunksize
+
def mimetype(self):
+ """Mime type of the body.
+
+ Returns:
+ Mime type.
+ """
return self._mimetype
def size(self):
+ """Size of upload.
+
+ Returns:
+ Size of the body, or None if the size is unknown.
+ """
return self._size
- def chunksize(self):
- return self._chunksize
-
def resumable(self):
+ """Whether this upload is resumable.
+
+ Returns:
+ True if this is a resumable upload, False otherwise.
+ """
return self._resumable
def getbytes(self, begin, length):
@@ -221,6 +283,98 @@
d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable'])
+class MediaIoBaseUpload(MediaUpload):
+ """A MediaUpload for io.Base objects.
+
+ Note that the Python file object is compatible with io.Base and can be used
+ with this class also.
+
+ For example, to upload the contents of an in-memory buffer:
+ fh = io.BytesIO('...Some data to upload...')
+ media = MediaIoBaseUpload(fh, mimetype='image/png',
+ chunksize=1024*1024, resumable=True)
+ service.objects().insert(
+ bucket='a_bucket_id',
+ name='smiley.png',
+ media_body=media).execute()
+ """
+
+ def __init__(self, fh, mimetype, chunksize=DEFAULT_CHUNK_SIZE,
+ resumable=False):
+ """Constructor.
+
+ Args:
+ fh: io.Base or file object, the source of the bytes to upload.
+ mimetype: string, Mime-type of the file. No mime-type is guessed; the
+ value is stored and returned by mimetype() as-is.
+ chunksize: int, File will be uploaded in chunks of this many bytes. Only
+ used if resumable=True.
+ resumable: bool, True if this is a resumable upload. False means upload
+ in a single request.
+ """
+ self._fh = fh
+ self._mimetype = mimetype
+ self._chunksize = chunksize
+ self._resumable = resumable
+ self._size = None
+ try:
+ if hasattr(self._fh, 'fileno'):
+ fileno = self._fh.fileno()
+ self._size = os.fstat(fileno).st_size
+ except IOError:
+ pass
+
+ def chunksize(self):
+ """Chunk size for resumable uploads.
+
+ Returns:
+ Chunk size in bytes.
+ """
+ return self._chunksize
+
+ def mimetype(self):
+ """Mime type of the body.
+
+ Returns:
+ Mime type.
+ """
+ return self._mimetype
+
+ def size(self):
+ """Size of upload.
+
+ Returns:
+ Size of the body, or None if the size is unknown.
+ """
+ return self._size
+
+ def resumable(self):
+ """Whether this upload is resumable.
+
+ Returns:
+ True if this is a resumable upload, False otherwise.
+ """
+ return self._resumable
+
+ def getbytes(self, begin, length):
+ """Get bytes from the media.
+
+ Args:
+ begin: int, offset from beginning of file.
+ length: int, number of bytes to read, starting at begin.
+
+ Returns:
+ A string of bytes read. May be shorter than length if EOF was reached
+ first.
+ """
+ self._fh.seek(begin)
+ return self._fh.read(length)
+
+ def to_json(self):
+ """This upload type is not serializable."""
+ raise NotImplementedError('MediaIoBaseUpload is not serializable.')
+
+
class MediaInMemoryUpload(MediaUpload):
"""MediaUpload for a chunk of bytes.
@@ -229,7 +383,7 @@
"""
def __init__(self, body, mimetype='application/octet-stream',
- chunksize=256*1024, resumable=False):
+ chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
"""Create a new MediaBytesUpload.
Args:
@@ -266,7 +420,7 @@
"""Size of upload.
Returns:
- Size of the body.
+ Size of the body (always known for in-memory media).
"""
return len(self._body)
@@ -345,6 +499,7 @@
self.http = http
self.postproc = postproc
self.resumable = resumable
+ self._in_error_state = False
# Pull the multipart boundary out of the content-type header.
major, minor, params = mimeparse.parse_mime_type(
@@ -417,14 +572,24 @@
Returns:
(status, body): (ResumableMediaStatus, object)
The body will be None until the resumable media is fully uploaded.
+
+ Raises:
+ apiclient.errors.HttpError if the response was not a 2xx or a 308.
+ httplib2.Error if a transport error has occurred.
"""
if http is None:
http = self.http
+ if self.resumable.size() is None:
+ size = '*'
+ else:
+ size = str(self.resumable.size())
+
if self.resumable_uri is None:
start_headers = copy.copy(self.headers)
start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
- start_headers['X-Upload-Content-Length'] = str(self.resumable.size())
+ if size != '*':
+ start_headers['X-Upload-Content-Length'] = size
start_headers['content-length'] = str(self.body_size)
resp, content = http.request(self.uri, self.method,
@@ -434,26 +599,63 @@
self.resumable_uri = resp['location']
else:
raise ResumableUploadError("Failed to retrieve starting URI.")
+ elif self._in_error_state:
+ # If we are in an error state then query the server for current state of
+ # the upload by sending an empty PUT and reading the 'range' header in
+ # the response.
+ headers = {
+ 'Content-Range': 'bytes */%s' % size,
+ 'content-length': '0'
+ }
+ resp, content = http.request(self.resumable_uri, 'PUT',
+ headers=headers)
+ status, body = self._process_response(resp, content)
+ if body:
+ # The upload was complete.
+ return (status, body)
- data = self.resumable.getbytes(self.resumable_progress,
- self.resumable.chunksize())
-
+ data = self.resumable.getbytes(
+ self.resumable_progress, self.resumable.chunksize())
headers = {
- 'Content-Range': 'bytes %d-%d/%d' % (
+ 'Content-Range': 'bytes %d-%d/%s' % (
self.resumable_progress, self.resumable_progress + len(data) - 1,
- self.resumable.size()),
+ size)
}
- resp, content = http.request(self.resumable_uri, 'PUT',
- body=data,
- headers=headers)
+ try:
+ resp, content = http.request(self.resumable_uri, 'PUT',
+ body=data,
+ headers=headers)
+ except:
+ self._in_error_state = True
+ raise
+
+ return self._process_response(resp, content)
+
+ def _process_response(self, resp, content):
+ """Process the response from a single chunk upload.
+
+ Args:
+ resp: httplib2.Response, the response object.
+ content: string, the content of the response.
+
+ Returns:
+ (status, body): (ResumableMediaStatus, object)
+ The body will be None until the resumable media is fully uploaded.
+
+ Raises:
+ apiclient.errors.HttpError if the response was not a 2xx or a 308.
+ """
if resp.status in [200, 201]:
+ self._in_error_state = False
return None, self.postproc(resp, content)
elif resp.status == 308:
+ self._in_error_state = False
# A "308 Resume Incomplete" indicates we are not done.
self.resumable_progress = int(resp['range'].split('-')[1]) + 1
if 'location' in resp:
self.resumable_uri = resp['location']
else:
+ self._in_error_state = True
raise HttpError(resp, content, self.uri)
return (MediaUploadProgress(self.resumable_progress, self.resumable.size()),
@@ -466,6 +668,7 @@
d['resumable'] = self.resumable.to_json()
del d['http']
del d['postproc']
+
return simplejson.dumps(d)
@staticmethod