Add support for resumable upload.

Reviewed in http://codereview.appspot.com/5417051/.
diff --git a/apiclient/discovery.py b/apiclient/discovery.py
index 2a53b0d..9e5b230 100644
--- a/apiclient/discovery.py
+++ b/apiclient/discovery.py
@@ -26,6 +26,7 @@
 import httplib2
 import logging
 import os
+import random
 import re
 import uritemplate
 import urllib
@@ -48,6 +49,8 @@
 from errors import UnknownApiNameOrVersion
 from errors import UnknownLinkType
 from http import HttpRequest
+from http import MediaUpload
+from http import MediaFileUpload
 from model import JsonModel
 
 URITEMPLATE = re.compile('{[^}]*}')
@@ -325,6 +328,7 @@
     if 'mediaUpload' in methodDesc:
       mediaUpload = methodDesc['mediaUpload']
       mediaPathUrl = mediaUpload['protocols']['simple']['path']
+      mediaResumablePathUrl = mediaUpload['protocols']['resumable']['path']
       accept = mediaUpload['accept']
       maxSize = _media_size_to_long(mediaUpload.get('maxSize', ''))
 
@@ -440,28 +444,46 @@
       expanded_url = uritemplate.expand(pathUrl, params)
       url = urlparse.urljoin(self._baseUrl, expanded_url + query)
 
+      resumable = None
+      multipart_boundary = ''
+
       if media_filename:
-        (media_mime_type, encoding) = mimetypes.guess_type(media_filename)
-        if media_mime_type is None:
-          raise UnknownFileType(media_filename)
-        if not mimeparse.best_match([media_mime_type], ','.join(accept)):
-          raise UnacceptableMimeTypeError(media_mime_type)
+        # Convert a simple filename into a MediaUpload object.
+        if isinstance(media_filename, basestring):
+          (media_mime_type, encoding) = mimetypes.guess_type(media_filename)
+          if media_mime_type is None:
+            raise UnknownFileType(media_filename)
+          if not mimeparse.best_match([media_mime_type], ','.join(accept)):
+            raise UnacceptableMimeTypeError(media_mime_type)
+          media_upload = MediaFileUpload(media_filename, media_mime_type)
+        elif isinstance(media_filename, MediaUpload):
+          media_upload = media_filename
+        else:
+          raise TypeError(
+              'media_filename must be str or MediaUpload. Got %s' % type(media_filename))
+
+        if media_upload.resumable():
+          resumable = media_upload
 
         # Check the maxSize
-        if maxSize > 0 and os.path.getsize(media_filename) > maxSize:
-          raise MediaUploadSizeError(media_filename)
+        if maxSize > 0 and media_upload.size() > maxSize:
+          raise MediaUploadSizeError("Media larger than: %s" % maxSize)
 
         # Use the media path uri for media uploads
-        expanded_url = uritemplate.expand(mediaPathUrl, params)
+        if media_upload.resumable():
+          expanded_url = uritemplate.expand(mediaResumablePathUrl, params)
+        else:
+          expanded_url = uritemplate.expand(mediaPathUrl, params)
         url = urlparse.urljoin(self._baseUrl, expanded_url + query)
 
         if body is None:
-          headers['content-type'] = media_mime_type
-          # make the body the contents of the file
-          f = file(media_filename, 'rb')
-          body = f.read()
-          f.close()
+          # This is a simple media upload
+          headers['content-type'] = media_upload.mimetype()
+          expanded_url = uritemplate.expand(mediaResumablePathUrl, params)
+          if not media_upload.resumable():
+            body = media_upload.getbytes(0, media_upload.size())
         else:
+          # This is a multipart/related upload.
           msgRoot = MIMEMultipart('related')
           # msgRoot should not write out it's own headers
           setattr(msgRoot, '_write_headers', lambda self: None)
@@ -472,19 +494,51 @@
           msgRoot.attach(msg)
 
           # attach the media as the second part
-          msg = MIMENonMultipart(*media_mime_type.split('/'))
+          msg = MIMENonMultipart(*media_upload.mimetype().split('/'))
           msg['Content-Transfer-Encoding'] = 'binary'
 
-          f = file(media_filename, 'rb')
-          msg.set_payload(f.read())
-          f.close()
-          msgRoot.attach(msg)
+          if media_upload.resumable():
+            # This is a multipart resumable upload, where a multipart payload
+            # looks like this:
+            #
+            #  --===============1678050750164843052==
+            #  Content-Type: application/json
+            #  MIME-Version: 1.0
+            #
+            #  {'foo': 'bar'}
+            #  --===============1678050750164843052==
+            #  Content-Type: image/png
+            #  MIME-Version: 1.0
+            #  Content-Transfer-Encoding: binary
+            #
+            #  <BINARY STUFF>
+            #  --===============1678050750164843052==--
+            #
+            # In the case of resumable multipart media uploads, the <BINARY
+            # STUFF> is large and will be spread across multiple PUTs.  What we
+            # do here is compose the multipart message with a random payload in
+            # place of <BINARY STUFF> and then split the resulting content into
+            # two pieces, text before <BINARY STUFF> and text after <BINARY
+            # STUFF>. The text after <BINARY STUFF> is the multipart boundary.
+            # In apiclient.http the HttpRequest will send the text before
+            # <BINARY STUFF>, then send the actual binary media in chunks, and
+            # then will send the multipart delimiter.
 
-          body = msgRoot.as_string()
+            payload = hex(random.getrandbits(300))
+            msg.set_payload(payload)
+            msgRoot.attach(msg)
+            body = msgRoot.as_string()
+            body, _ = body.split(payload)
+            resumable = media_upload
+          else:
+            payload = media_upload.getbytes(0, media_upload.size())
+            msg.set_payload(payload)
+            msgRoot.attach(msg)
+            body = msgRoot.as_string()
 
-          # must appear after the call to as_string() to get the right boundary
+          multipart_boundary = msgRoot.get_boundary()
           headers['content-type'] = ('multipart/related; '
-                                     'boundary="%s"') % msgRoot.get_boundary()
+                                     'boundary="%s"') % multipart_boundary
 
       logging.info('URL being requested: %s' % url)
       return self._requestBuilder(self._http,
@@ -493,7 +547,8 @@
                                   method=httpMethod,
                                   body=body,
                                   headers=headers,
-                                  methodId=methodId)
+                                  methodId=methodId,
+                                  resumable=resumable)
 
     docs = [methodDesc.get('description', DEFAULT_METHOD_DOC), '\n\n']
     if len(argmap) > 0:
diff --git a/apiclient/errors.py b/apiclient/errors.py
index ff0c154..30a48e8 100644
--- a/apiclient/errors.py
+++ b/apiclient/errors.py
@@ -85,6 +85,11 @@
   pass
 
 
+class ResumableUploadError(Error):
+  """Error occurred during resumable upload."""
+  pass
+
+
 class UnexpectedMethodError(Error):
   """Exception raised by RequestMockBuilder on unexpected calls."""
 
diff --git a/apiclient/http.py b/apiclient/http.py
index d2a3a2f..0b45a44 100644
--- a/apiclient/http.py
+++ b/apiclient/http.py
@@ -25,16 +25,187 @@
     'set_user_agent', 'tunnel_patch'
     ]
 
+import copy
 import httplib2
 import os
+import mimeparse
+import mimetypes
 
 from model import JsonModel
 from errors import HttpError
+from errors import ResumableUploadError
 from errors import UnexpectedBodyError
 from errors import UnexpectedMethodError
 from anyjson import simplejson
 
 
+class MediaUploadProgress(object):
+  """Status of a resumable upload."""
+
+  def __init__(self, resumable_progress, total_size):
+    """Constructor.
+
+    Args:
+      resumable_progress: int, bytes sent so far.
+      total_size: int, total bytes in complete upload.
+    """
+    self.resumable_progress = resumable_progress
+    self.total_size = total_size
+
+  def progress(self):
+    """Fraction of upload completed, as a float (0.0 if total_size is 0)."""
+    return float(self.resumable_progress)/float(self.total_size) if self.total_size else 0.0
+
+
+class MediaUpload(object):
+  """Describes a media object to upload.
+
+  Base class that defines the interface of MediaUpload subclasses.
+  """
+
+  def getbytes(self, begin, end):
+    raise NotImplementedError()
+
+  def size(self):
+    raise NotImplementedError()
+
+  def chunksize(self):
+    raise NotImplementedError()
+
+  def mimetype(self):
+    return 'application/octet-stream'
+
+  def resumable(self):
+    return False
+
+  def _to_json(self, strip=None):
+    """Utility function for creating a JSON representation of a MediaUpload.
+
+    Args:
+      strip: array, An array of names of members to not include in the JSON.
+
+    Returns:
+       string, a JSON representation of this instance, suitable to pass to
+       from_json().
+    """
+    t = type(self)
+    d = copy.copy(self.__dict__)
+    if strip is not None:
+      for member in strip:
+        del d[member]
+    d['_class'] = t.__name__
+    d['_module'] = t.__module__
+    return simplejson.dumps(d)
+
+  def to_json(self):
+    """Create a JSON representation of an instance of MediaUpload.
+
+    Returns:
+       string, a JSON representation of this instance, suitable to pass to
+       from_json().
+    """
+    return self._to_json()
+
+  @classmethod
+  def new_from_json(cls, s):
+    """Utility class method to instantiate a MediaUpload subclass from a JSON
+    representation produced by to_json().
+
+    Args:
+      s: string, JSON from to_json().
+
+    Returns:
+      An instance of the subclass of MediaUpload that was serialized with
+      to_json().
+    """
+    data = simplejson.loads(s)
+    # Find and call the right classmethod from_json() to restore the object.
+    module = data['_module']
+    m = __import__(module, fromlist=module.split('.')[:-1])
+    kls = getattr(m, data['_class'])
+    from_json = getattr(kls, 'from_json')
+    return from_json(s)
+
+class MediaFileUpload(MediaUpload):
+  """A MediaUpload for a file.
+
+  Construct a MediaFileUpload and pass as the media_body parameter of the
+  method. For example, if we had a service that allowed uploading images:
+
+
+    media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
+                    resumable=True)
+    service.objects().insert(
+        bucket=buckets['items'][0]['id'],
+        name='smiley.png',
+        media_body=media).execute()
+  """
+
+  def __init__(self, filename, mimetype=None, chunksize=10000, resumable=False):
+    """Constructor.
+
+    Args:
+      filename: string, Name of the file.
+      mimetype: string, Mime-type of the file. If None then a mime-type will be
+        guessed from the file extension.
+      chunksize: int, File will be uploaded in chunks of this many bytes. Only
+        used if resumable=True.
+      resumable: bool, True if this is a resumable upload. False means upload in
+        a single request.
+    """
+    self._filename = filename
+    self._size = os.path.getsize(filename)
+    self._fd = None
+    if mimetype is None:
+      (mimetype, encoding) = mimetypes.guess_type(filename)
+    self._mimetype = mimetype
+    self._chunksize = chunksize
+    self._resumable = resumable
+
+  def mimetype(self):
+    return self._mimetype
+
+  def size(self):
+    return self._size
+
+  def chunksize(self):
+    return self._chunksize
+
+  def resumable(self):
+    return self._resumable
+
+  def getbytes(self, begin, length):
+    """Get bytes from the media.
+
+    Args:
+      begin: int, offset from beginning of file.
+      length: int, number of bytes to read, starting at begin.
+
+    Returns:
+      A string of bytes read. May be shorter than length if EOF was reached
+      first.
+    """
+    if self._fd is None:
+      self._fd = open(self._filename, 'rb')
+    self._fd.seek(begin)
+    return self._fd.read(length)
+
+  def to_json(self):
+    """Create a JSON representation of an instance of MediaFileUpload.
+
+    Returns:
+       string, a JSON representation of this instance, suitable to pass to
+       from_json().
+    """
+    return self._to_json(['_fd'])
+
+  @staticmethod
+  def from_json(s):
+    d = simplejson.loads(s)
+    return MediaFileUpload(
+        d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable'])
+
+
 class HttpRequest(object):
   """Encapsulates a single HTTP request.
   """
@@ -43,7 +214,8 @@
                method='GET',
                body=None,
                headers=None,
-               methodId=None):
+               methodId=None,
+               resumable=None):
     """Constructor for an HttpRequest.
 
     Args:
@@ -53,16 +225,39 @@
                 on an error.
       uri: string, the absolute URI to send the request to
       method: string, the HTTP method to use
-      body: string, the request body of the HTTP request
+      body: string, the request body of the HTTP request,
       headers: dict, the HTTP request headers
       methodId: string, a unique identifier for the API method being called.
+      resumable: MediaUpload, None if this is not a resumable request.
     """
     self.uri = uri
     self.method = method
     self.body = body
     self.headers = headers or {}
+    self.methodId = methodId
     self.http = http
     self.postproc = postproc
+    self.resumable = resumable
+
+    major, minor, params = mimeparse.parse_mime_type(
+        self.headers.get('content-type', 'application/json'))
+    self.multipart_boundary = params.get('boundary', '').strip('"')
+
+    # If this was a multipart resumable, the size of the non-media part.
+    self.multipart_size = 0
+
+    # The resumable URI to send chunks to.
+    self.resumable_uri = None
+
+    # The bytes that have been uploaded.
+    self.resumable_progress = 0
+
+    if resumable is not None:
+      if self.body is not None:
+        self.multipart_size = len(self.body)
+      else:
+        self.multipart_size = 0
+      self.total_size = self.resumable.size() + self.multipart_size + len(self.multipart_boundary)
 
   def execute(self, http=None):
     """Execute the request.
@@ -81,14 +276,118 @@
     """
     if http is None:
       http = self.http
-    resp, content = http.request(self.uri, self.method,
-                                      body=self.body,
-                                      headers=self.headers)
+    if self.resumable:
+      body = None
+      while body is None:
+        _, body = self.next_chunk(http)
+      return body
+    else:
+      resp, content = http.request(self.uri, self.method,
+                                   body=self.body,
+                                   headers=self.headers)
 
-    if resp.status >= 300:
-      raise HttpError(resp, content, self.uri)
+      if resp.status >= 300:
+        raise HttpError(resp, content, self.uri)
     return self.postproc(resp, content)
 
+  def next_chunk(self, http=None):
+    """Execute the next step of a resumable upload.
+
+    Can only be used if the method being executed supports media uploads and the
+    MediaUpload object passed in was flagged as using resumable upload.
+
+    Example:
+
+      media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
+                              resumable=True)
+      request = service.objects().insert(
+          bucket=buckets['items'][0]['id'],
+          name='smiley.png',
+          media_body=media)
+
+      response = None
+      while response is None:
+        status, response = request.next_chunk()
+        if status:
+          print "Upload %d%% complete." % int(status.progress() * 100)
+
+
+    Returns:
+      (status, body): (MediaUploadProgress, object)
+         The body will be None until the resumable media is fully uploaded.
+    """
+    if http is None:
+      http = self.http
+
+    if self.resumable_uri is None:
+      start_headers = copy.copy(self.headers)
+      start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
+      start_headers['X-Upload-Content-Length'] = str(self.resumable.size())
+      start_headers['Content-Length'] = '0'
+      resp, content = http.request(self.uri, self.method,
+                                   body="",
+                                   headers=start_headers)
+      if resp.status == 200 and 'location' in resp:
+        self.resumable_uri = resp['location']
+      else:
+        raise ResumableUploadError("Failed to retrieve starting URI.")
+    if self.body:
+      begin = 0
+      data = self.body
+    else:
+      begin = self.resumable_progress - self.multipart_size
+      data = self.resumable.getbytes(begin, self.resumable.chunksize())
+
+    # Tack on the multipart/related boundary if we are at the end of the file.
+    if begin + self.resumable.chunksize() >= self.resumable.size():
+      data += self.multipart_boundary
+    headers = {
+        'Content-Range': 'bytes %d-%d/%d' % (
+            self.resumable_progress, self.resumable_progress + len(data) - 1,
+            self.total_size),
+        }
+    resp, content = http.request(self.resumable_uri, 'PUT',
+                                 body=data,
+                                 headers=headers)
+    if resp.status in [200, 201]:
+      return None, self.postproc(resp, content)
+    # A "308 Resume Incomplete" indicates we are not done.
+    elif resp.status == 308:
+      self.resumable_progress = int(resp['range'].split('-')[1]) + 1
+      if self.resumable_progress >= self.multipart_size:
+        self.body = None
+      if 'location' in resp:
+        self.resumable_uri = resp['location']
+    else:
+      raise HttpError(resp, content, self.uri)
+
+    return MediaUploadProgress(self.resumable_progress, self.total_size), None
+
+  def to_json(self):
+    """Returns a JSON representation of the HttpRequest."""
+    d = copy.copy(self.__dict__)
+    if d['resumable'] is not None:
+      d['resumable'] = self.resumable.to_json()
+    del d['http']
+    del d['postproc']
+    return simplejson.dumps(d)
+
+  @staticmethod
+  def from_json(s, http, postproc):
+    """Returns an HttpRequest populated with info from a JSON object."""
+    d = simplejson.loads(s)
+    if d['resumable'] is not None:
+      d['resumable'] = MediaUpload.new_from_json(d['resumable'])
+    return HttpRequest(
+        http,
+        postproc,
+        uri = d['uri'],
+        method= d['method'],
+        body=d['body'],
+        headers=d['headers'],
+        methodId=d['methodId'],
+        resumable=d['resumable'])
+
 
 class HttpRequestMock(object):
   """Mock of HttpRequest.
@@ -166,7 +465,7 @@
     self.check_unexpected = check_unexpected
 
   def __call__(self, http, postproc, uri, method='GET', body=None,
-               headers=None, methodId=None):
+               headers=None, methodId=None, resumable=None):
     """Implements the callable interface that discovery.build() expects
     of requestBuilder, which is to build an object compatible with
     HttpRequest.execute(). See that method for the description of the