Update Media Upload to include io.Base and also fix some bugs.

Fixes issue #139.
Fixes issue #123.

Reviewed in http://codereview.appspot.com/6307067/.
diff --git a/apiclient/http.py b/apiclient/http.py
index 0c49045..9155cab 100644
--- a/apiclient/http.py
+++ b/apiclient/http.py
@@ -46,6 +46,9 @@
 from oauth2client.anyjson import simplejson
 
 
+DEFAULT_CHUNK_SIZE = 512*1024
+
+
 class MediaUploadProgress(object):
   """Status of a resumable upload."""
 
@@ -54,14 +57,23 @@
 
     Args:
       resumable_progress: int, bytes sent so far.
-      total_size: int, total bytes in complete upload.
+      total_size: int, total bytes in complete upload, or None if the total
+        upload size isn't known ahead of time.
     """
     self.resumable_progress = resumable_progress
     self.total_size = total_size
 
   def progress(self):
-    """Percent of upload completed, as a float."""
-    return float(self.resumable_progress) / float(self.total_size)
+    """Percent of upload completed, as a float.
+
+    Returns:
+      the percentage complete as a float, returning 0.0 if the total size of
+      the upload is unknown.
+    """
+    if self.total_size is not None:
+      return float(self.resumable_progress) / float(self.total_size)
+    else:
+      return 0.0
 
 
 class MediaUpload(object):
@@ -77,21 +89,51 @@
   such as under certain classes of requests under Google App Engine.
   """
 
-  def getbytes(self, begin, end):
-    raise NotImplementedError()
-
-  def size(self):
-    raise NotImplementedError()
-
   def chunksize(self):
+    """Chunk size for resumable uploads.
+
+    Returns:
+      Chunk size in bytes.
+    """
     raise NotImplementedError()
 
   def mimetype(self):
+    """Mime type of the body.
+
+    Returns:
+      Mime type.
+    """
     return 'application/octet-stream'
 
+  def size(self):
+    """Size of upload.
+
+    Returns:
+      Size of the body, or None of the size is unknown.
+    """
+    return None
+
   def resumable(self):
+    """Whether this upload is resumable.
+
+    Returns:
+      True if resumable upload or False.
+    """
     return False
 
+  def getbytes(self, begin, end):
+    """Get bytes from the media.
+
+    Args:
+      begin: int, offset from beginning of file.
+      length: int, number of bytes to read, starting at begin.
+
+    Returns:
+      A string of bytes read. May be shorter than length if EOF was reached
+      first.
+    """
+    raise NotImplementedError()
+
   def _to_json(self, strip=None):
     """Utility function for creating a JSON representation of a MediaUpload.
 
@@ -148,15 +190,15 @@
   method. For example, if we had a service that allowed uploading images:
 
 
-    media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
-                    resumable=True)
+    media = MediaFileUpload('smiley.png', mimetype='image/png',
+      chunksize=1024*1024, resumable=True)
     service.objects().insert(
         bucket=buckets['items'][0]['id'],
         name='smiley.png',
         media_body=media).execute()
   """
 
-  def __init__(self, filename, mimetype=None, chunksize=256*1024, resumable=False):
+  def __init__(self, filename, mimetype=None, chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
     """Constructor.
 
     Args:
@@ -177,16 +219,36 @@
     self._chunksize = chunksize
     self._resumable = resumable
 
+  def chunksize(self):
+    """Chunk size for resumable uploads.
+
+    Returns:
+      Chunk size in bytes.
+    """
+    return self._chunksize
+
   def mimetype(self):
+    """Mime type of the body.
+
+    Returns:
+      Mime type.
+    """
     return self._mimetype
 
   def size(self):
+    """Size of upload.
+
+    Returns:
+      Size of the body, or None of the size is unknown.
+    """
     return self._size
 
-  def chunksize(self):
-    return self._chunksize
-
   def resumable(self):
+    """Whether this upload is resumable.
+
+    Returns:
+      True if resumable upload or False.
+    """
     return self._resumable
 
   def getbytes(self, begin, length):
@@ -221,6 +283,98 @@
         d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable'])
 
 
+class MediaIoBaseUpload(MediaUpload):
+  """A MediaUpload for a io.Base objects.
+
+  Note that the Python file object is compatible with io.Base and can be used
+  with this class also.
+
+
+    fh = io.BytesIO('...Some data to upload...')
+    media = MediaIoBaseUpload(fh, mimetype='image/png',
+      chunksize=1024*1024, resumable=True)
+    service.objects().insert(
+        bucket='a_bucket_id',
+        name='smiley.png',
+        media_body=media).execute()
+  """
+
+  def __init__(self, fh, mimetype, chunksize=DEFAULT_CHUNK_SIZE,
+      resumable=False):
+    """Constructor.
+
+    Args:
+      fh: io.Base or file object, The source of the bytes to upload.
+      mimetype: string, Mime-type of the file. If None then a mime-type will be
+        guessed from the file extension.
+      chunksize: int, File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True.
+      resumable: bool, True if this is a resumable upload. False means upload
+        in a single request.
+    """
+    self._fh = fh
+    self._mimetype = mimetype
+    self._chunksize = chunksize
+    self._resumable = resumable
+    self._size = None
+    try:
+      if hasattr(self._fh, 'fileno'):
+        fileno = self._fh.fileno()
+        self._size = os.fstat(fileno).st_size
+    except IOError:
+      pass
+
+  def chunksize(self):
+    """Chunk size for resumable uploads.
+
+    Returns:
+      Chunk size in bytes.
+    """
+    return self._chunksize
+
+  def mimetype(self):
+    """Mime type of the body.
+
+    Returns:
+      Mime type.
+    """
+    return self._mimetype
+
+  def size(self):
+    """Size of upload.
+
+    Returns:
+      Size of the body, or None of the size is unknown.
+    """
+    return self._size
+
+  def resumable(self):
+    """Whether this upload is resumable.
+
+    Returns:
+      True if resumable upload or False.
+    """
+    return self._resumable
+
+  def getbytes(self, begin, length):
+    """Get bytes from the media.
+
+    Args:
+      begin: int, offset from beginning of file.
+      length: int, number of bytes to read, starting at begin.
+
+    Returns:
+      A string of bytes read. May be shorted than length if EOF was reached
+      first.
+    """
+    self._fh.seek(begin)
+    return self._fh.read(length)
+
+  def to_json(self):
+    """This upload type is not serializable."""
+    raise NotImplementedError('MediaIoBaseUpload is not serializable.')
+
+
 class MediaInMemoryUpload(MediaUpload):
   """MediaUpload for a chunk of bytes.
 
@@ -229,7 +383,7 @@
   """
 
   def __init__(self, body, mimetype='application/octet-stream',
-               chunksize=256*1024, resumable=False):
+               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
     """Create a new MediaBytesUpload.
 
     Args:
@@ -266,7 +420,7 @@
     """Size of upload.
 
     Returns:
-      Size of the body.
+      Size of the body, or None of the size is unknown.
     """
     return len(self._body)
 
@@ -345,6 +499,7 @@
     self.http = http
     self.postproc = postproc
     self.resumable = resumable
+    self._in_error_state = False
 
     # Pull the multipart boundary out of the content-type header.
     major, minor, params = mimeparse.parse_mime_type(
@@ -417,14 +572,24 @@
     Returns:
       (status, body): (ResumableMediaStatus, object)
          The body will be None until the resumable media is fully uploaded.
+
+    Raises:
+      apiclient.errors.HttpError if the response was not a 2xx.
+      httplib2.Error if a transport error has occured.
     """
     if http is None:
       http = self.http
 
+    if self.resumable.size() is None:
+      size = '*'
+    else:
+      size = str(self.resumable.size())
+
     if self.resumable_uri is None:
       start_headers = copy.copy(self.headers)
       start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
-      start_headers['X-Upload-Content-Length'] = str(self.resumable.size())
+      if size != '*':
+        start_headers['X-Upload-Content-Length'] = size
       start_headers['content-length'] = str(self.body_size)
 
       resp, content = http.request(self.uri, self.method,
@@ -434,26 +599,63 @@
         self.resumable_uri = resp['location']
       else:
         raise ResumableUploadError("Failed to retrieve starting URI.")
+    elif self._in_error_state:
+      # If we are in an error state then query the server for current state of
+      # the upload by sending an empty PUT and reading the 'range' header in
+      # the response.
+      headers = {
+          'Content-Range': 'bytes */%s' % size,
+          'content-length': '0'
+          }
+      resp, content = http.request(self.resumable_uri, 'PUT',
+                                   headers=headers)
+      status, body = self._process_response(resp, content)
+      if body:
+        # The upload was complete.
+        return (status, body)
 
-    data = self.resumable.getbytes(self.resumable_progress,
-                                   self.resumable.chunksize())
-
+    data = self.resumable.getbytes(
+        self.resumable_progress, self.resumable.chunksize())
     headers = {
-        'Content-Range': 'bytes %d-%d/%d' % (
+        'Content-Range': 'bytes %d-%d/%s' % (
             self.resumable_progress, self.resumable_progress + len(data) - 1,
-            self.resumable.size()),
+            size)
         }
-    resp, content = http.request(self.resumable_uri, 'PUT',
-                                 body=data,
-                                 headers=headers)
+    try:
+      resp, content = http.request(self.resumable_uri, 'PUT',
+                                   body=data,
+                                   headers=headers)
+    except:
+      self._in_error_state = True
+      raise
+
+    return self._process_response(resp, content)
+
+  def _process_response(self, resp, content):
+    """Process the response from a single chunk upload.
+
+    Args:
+      resp: httplib2.Response, the response object.
+      content: string, the content of the response.
+
+    Returns:
+      (status, body): (ResumableMediaStatus, object)
+         The body will be None until the resumable media is fully uploaded.
+
+    Raises:
+      apiclient.errors.HttpError if the response was not a 2xx or a 308.
+    """
     if resp.status in [200, 201]:
+      self._in_error_state = False
       return None, self.postproc(resp, content)
     elif resp.status == 308:
+      self._in_error_state = False
       # A "308 Resume Incomplete" indicates we are not done.
       self.resumable_progress = int(resp['range'].split('-')[1]) + 1
       if 'location' in resp:
         self.resumable_uri = resp['location']
     else:
+      self._in_error_state = True
       raise HttpError(resp, content, self.uri)
 
     return (MediaUploadProgress(self.resumable_progress, self.resumable.size()),
@@ -466,6 +668,7 @@
       d['resumable'] = self.resumable.to_json()
     del d['http']
     del d['postproc']
+
     return simplejson.dumps(d)
 
   @staticmethod
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 1cefefd..5970970 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -27,6 +27,8 @@
 import os
 import unittest
 import urlparse
+import StringIO
+
 
 try:
     from urlparse import parse_qs
@@ -42,9 +44,10 @@
 from apiclient.http import HttpMock
 from apiclient.http import HttpMockSequence
 from apiclient.http import MediaFileUpload
+from apiclient.http import MediaIoBaseUpload
 from apiclient.http import MediaUploadProgress
 from apiclient.http import tunnel_patch
-
+from oauth2client.anyjson import simplejson
 
 DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
 
@@ -505,7 +508,90 @@
       ])
 
     self.assertRaises(HttpError, request.execute, http)
+    self.assertTrue(request._in_error_state)
 
+    http = HttpMockSequence([
+      ({'status': '308',
+        'range': '0-5'}, ''),
+      ({'status': '308',
+        'range': '0-6'}, ''),
+      ])
+
+    status, body = request.next_chunk(http)
+    self.assertEquals(status.resumable_progress, 7,
+      'Should have first checked length and then tried to PUT more.')
+    self.assertFalse(request._in_error_state)
+
+    # Put it back in an error state.
+    http = HttpMockSequence([
+      ({'status': '400'}, ''),
+      ])
+    self.assertRaises(HttpError, request.execute, http)
+    self.assertTrue(request._in_error_state)
+
+    # Pretend the last request that 400'd actually succeeded.
+    http = HttpMockSequence([
+      ({'status': '200'}, '{"foo": "bar"}'),
+      ])
+    status, body = request.next_chunk(http)
+    self.assertEqual(body, {'foo': 'bar'})
+
+  def test_resumable_media_handle_uploads_of_unknown_size(self):
+    http = HttpMockSequence([
+      ({'status': '200',
+        'location': 'http://upload.example.com'}, ''),
+      ({'status': '200'}, 'echo_request_headers_as_json'),
+      ])
+
+    self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
+    zoo = build('zoo', 'v1', self.http)
+
+    fh = StringIO.StringIO('data goes here')
+
+    # Create an upload that doesn't know the full size of the media.
+    upload = MediaIoBaseUpload(
+        fh=fh, mimetype='image/png', chunksize=500, resumable=True)
+
+    request = zoo.animals().insert(media_body=upload, body=None)
+    status, body = request.next_chunk(http)
+    self.assertEqual(body, {'Content-Range': 'bytes 0-13/*'},
+      'Should be 14 out of * bytes.')
+
+  def test_resumable_media_handle_resume_of_upload_of_unknown_size(self):
+    http = HttpMockSequence([
+      ({'status': '200',
+        'location': 'http://upload.example.com'}, ''),
+      ({'status': '400'}, ''),
+      ])
+
+    self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
+    zoo = build('zoo', 'v1', self.http)
+
+    # Create an upload that doesn't know the full size of the media.
+    fh = StringIO.StringIO('data goes here')
+
+    upload = MediaIoBaseUpload(
+        fh=fh, mimetype='image/png', chunksize=500, resumable=True)
+
+    request = zoo.animals().insert(media_body=upload, body=None)
+
+    # Put it in an error state.
+    self.assertRaises(HttpError, request.next_chunk, http)
+
+    http = HttpMockSequence([
+      ({'status': '400',
+        'range': '0-5'}, 'echo_request_headers_as_json'),
+      ])
+    try:
+      # Should resume the upload by first querying the status of the upload.
+      request.next_chunk(http)
+    except HttpError, e:
+      expected = {
+          'Content-Range': 'bytes */*',
+          'content-length': '0'
+          }
+      self.assertEqual(expected, simplejson.loads(e.content),
+        'Should send an empty body when requesting the current upload status.')
 
 class Next(unittest.TestCase):
 
diff --git a/tests/test_http.py b/tests/test_http.py
index 499366a..b8b638e 100644
--- a/tests/test_http.py
+++ b/tests/test_http.py
@@ -25,6 +25,7 @@
 import httplib2
 import os
 import unittest
+import StringIO
 
 from apiclient.errors import BatchError
 from apiclient.http import BatchHttpRequest
@@ -33,6 +34,7 @@
 from apiclient.http import MediaFileUpload
 from apiclient.http import MediaUpload
 from apiclient.http import MediaInMemoryUpload
+from apiclient.http import MediaIoBaseUpload
 from apiclient.http import set_user_agent
 from apiclient.model import JsonModel
 from oauth2client.client import Credentials
@@ -110,6 +112,9 @@
     resp, content = http.request("http://example.com")
     self.assertEqual('my_app/5.5 my_library/0.1', content['user-agent'])
 
+
+class TestMediaUpload(unittest.TestCase):
+
   def test_media_file_upload_to_from_json(self):
     upload = MediaFileUpload(
         datafile('small.png'), chunksize=500, resumable=True)
@@ -176,6 +181,76 @@
     self.assertEqual(http, new_req.http)
     self.assertEqual(media_upload.to_json(), new_req.resumable.to_json())
 
+
+class TestMediaIoBaseUpload(unittest.TestCase):
+
+  def test_media_io_base_upload_from_file_io(self):
+    try:
+      import io
+
+      fh = io.FileIO(datafile('small.png'), 'r')
+      upload = MediaIoBaseUpload(
+          fh=fh, mimetype='image/png', chunksize=500, resumable=True)
+      self.assertEqual('image/png', upload.mimetype())
+      self.assertEqual(190, upload.size())
+      self.assertEqual(True, upload.resumable())
+      self.assertEqual(500, upload.chunksize())
+      self.assertEqual('PNG', upload.getbytes(1, 3))
+    except ImportError:
+      pass
+
+  def test_media_io_base_upload_from_file_object(self):
+    f = open(datafile('small.png'), 'r')
+    upload = MediaIoBaseUpload(
+        fh=f, mimetype='image/png', chunksize=500, resumable=True)
+    self.assertEqual('image/png', upload.mimetype())
+    self.assertEqual(190, upload.size())
+    self.assertEqual(True, upload.resumable())
+    self.assertEqual(500, upload.chunksize())
+    self.assertEqual('PNG', upload.getbytes(1, 3))
+    f.close()
+
+  def test_media_io_base_upload_serializable(self):
+    f = open(datafile('small.png'), 'r')
+    upload = MediaIoBaseUpload(fh=f, mimetype='image/png')
+
+    try:
+      json = upload.to_json()
+      self.fail('MediaIoBaseUpload should not be serializable.')
+    except NotImplementedError:
+      pass
+
+  def test_media_io_base_upload_from_string_io(self):
+    f = open(datafile('small.png'), 'r')
+    fh = StringIO.StringIO(f.read())
+    f.close()
+
+    upload = MediaIoBaseUpload(
+        fh=fh, mimetype='image/png', chunksize=500, resumable=True)
+    self.assertEqual('image/png', upload.mimetype())
+    self.assertEqual(None, upload.size())
+    self.assertEqual(True, upload.resumable())
+    self.assertEqual(500, upload.chunksize())
+    self.assertEqual('PNG', upload.getbytes(1, 3))
+    f.close()
+
+  def test_media_io_base_upload_from_bytes(self):
+    try:
+      import io
+
+      f = open(datafile('small.png'), 'r')
+      fh = io.BytesIO(f.read())
+      upload = MediaIoBaseUpload(
+          fh=fh, mimetype='image/png', chunksize=500, resumable=True)
+      self.assertEqual('image/png', upload.mimetype())
+      self.assertEqual(None, upload.size())
+      self.assertEqual(True, upload.resumable())
+      self.assertEqual(500, upload.chunksize())
+      self.assertEqual('PNG', upload.getbytes(1, 3))
+    except ImportError:
+      pass
+
+
 EXPECTED = """POST /someapi/v1/collection/?foo=bar HTTP/1.1
 Content-Type: application/json
 MIME-Version: 1.0