Autoformat py files using Black (#105)
* Autoformat all py files using Black
* Fix lint errors
* Fix indentation errors (https://travis-ci.org/httplib2/httplib2/jobs/408136309)
* Refactor three test cases and exclude them on Travis py27/pypy
diff --git a/python2/httplib2/__init__.py b/python2/httplib2/__init__.py
index f57278c..d1e0ed5 100644
--- a/python2/httplib2/__init__.py
+++ b/python2/httplib2/__init__.py
@@ -1,55 +1,52 @@
+"""Small, fast HTTP client library for Python.
+
+Features persistent connections, cache, and Google App Engine support.
+"""
+
from __future__ import print_function
-"""
-httplib2
-
-A caching http interface that supports ETags and gzip
-to conserve bandwidth.
-
-Requires Python 2.3 or later
-
-Changelog:
-2007-08-18, Rick: Modified so it's able to use a socks proxy if needed.
-
-"""
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
-__contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)",
- "James Antill",
- "Xavier Verges Farrero",
- "Jonathan Feinberg",
- "Blair Zajac",
- "Sam Ruby",
- "Louis Nyffenegger",
- "Alex Yu"]
+__contributors__ = [
+ "Thomas Broyer (t.broyer@ltgt.net)",
+ "James Antill",
+ "Xavier Verges Farrero",
+ "Jonathan Feinberg",
+ "Blair Zajac",
+ "Sam Ruby",
+ "Louis Nyffenegger",
+ "Alex Yu",
+]
__license__ = "MIT"
-__version__ = '0.11.3'
+__version__ = "0.11.3"
-import re
-import sys
-import email
-import email.Utils
-import email.Message
-import email.FeedParser
-import StringIO
-import gzip
-import zlib
-import httplib
-import urlparse
-import urllib
import base64
-import os
-import copy
import calendar
-import time
-import random
+import copy
+import email
+import email.FeedParser
+import email.Message
+import email.Utils
import errno
+import gzip
+import httplib
+import os
+import random
+import re
+import StringIO
+import sys
+import time
+import urllib
+import urlparse
+import zlib
+
try:
from hashlib import sha1 as _sha, md5 as _md5
except ImportError:
# prior to Python 2.5, these were separate modules
import sha
import md5
+
_sha = sha.new
_md5 = md5.new
import hmac
@@ -73,12 +70,13 @@
except ImportError:
pass
if ssl is not None:
- ssl_SSLError = getattr(ssl, 'SSLError', None)
- ssl_CertificateError = getattr(ssl, 'CertificateError', None)
+ ssl_SSLError = getattr(ssl, "SSLError", None)
+ ssl_CertificateError = getattr(ssl, "CertificateError", None)
-def _ssl_wrap_socket(sock, key_file, cert_file, disable_validation,
- ca_certs, ssl_version, hostname):
+def _ssl_wrap_socket(
+ sock, key_file, cert_file, disable_validation, ca_certs, ssl_version, hostname
+):
if disable_validation:
cert_reqs = ssl.CERT_NONE
else:
@@ -86,53 +84,69 @@
if ssl_version is None:
ssl_version = ssl.PROTOCOL_SSLv23
- if hasattr(ssl, 'SSLContext'): # Python 2.7.9
+ if hasattr(ssl, "SSLContext"): # Python 2.7.9
context = ssl.SSLContext(ssl_version)
context.verify_mode = cert_reqs
- context.check_hostname = (cert_reqs != ssl.CERT_NONE)
+ context.check_hostname = cert_reqs != ssl.CERT_NONE
if cert_file:
context.load_cert_chain(cert_file, key_file)
if ca_certs:
context.load_verify_locations(ca_certs)
return context.wrap_socket(sock, server_hostname=hostname)
else:
- return ssl.wrap_socket(sock, keyfile=key_file, certfile=cert_file,
- cert_reqs=cert_reqs, ca_certs=ca_certs,
- ssl_version=ssl_version)
+ return ssl.wrap_socket(
+ sock,
+ keyfile=key_file,
+ certfile=cert_file,
+ cert_reqs=cert_reqs,
+ ca_certs=ca_certs,
+ ssl_version=ssl_version,
+ )
-def _ssl_wrap_socket_unsupported(sock, key_file, cert_file, disable_validation,
- ca_certs, ssl_version, hostname):
+def _ssl_wrap_socket_unsupported(
+ sock, key_file, cert_file, disable_validation, ca_certs, ssl_version, hostname
+):
if not disable_validation:
raise CertificateValidationUnsupported(
- "SSL certificate validation is not supported without "
- "the ssl module installed. To avoid this error, install "
- "the ssl module, or explicity disable validation.")
+ "SSL certificate validation is not supported without "
+ "the ssl module installed. To avoid this error, install "
+ "the ssl module, or explicity disable validation."
+ )
ssl_sock = socket.ssl(sock, key_file, cert_file)
return httplib.FakeSocket(sock, ssl_sock)
+
if ssl is None:
_ssl_wrap_socket = _ssl_wrap_socket_unsupported
-
-if sys.version_info >= (2,3):
+if sys.version_info >= (2, 3):
from iri2uri import iri2uri
else:
+
def iri2uri(uri):
return uri
-def has_timeout(timeout): # python 2.6
- if hasattr(socket, '_GLOBAL_DEFAULT_TIMEOUT'):
- return (timeout is not None and timeout is not socket._GLOBAL_DEFAULT_TIMEOUT)
- return (timeout is not None)
+
+def has_timeout(timeout): # python 2.6
+ if hasattr(socket, "_GLOBAL_DEFAULT_TIMEOUT"):
+ return timeout is not None and timeout is not socket._GLOBAL_DEFAULT_TIMEOUT
+ return timeout is not None
+
__all__ = [
- 'Http', 'Response', 'ProxyInfo', 'HttpLib2Error', 'RedirectMissingLocation',
- 'RedirectLimit', 'FailedToDecompressContent',
- 'UnimplementedDigestAuthOptionError',
- 'UnimplementedHmacDigestAuthOptionError',
- 'debuglevel', 'ProxiesUnavailableError']
-
+ "Http",
+ "Response",
+ "ProxyInfo",
+ "HttpLib2Error",
+ "RedirectMissingLocation",
+ "RedirectLimit",
+ "FailedToDecompressContent",
+ "UnimplementedDigestAuthOptionError",
+ "UnimplementedHmacDigestAuthOptionError",
+ "debuglevel",
+ "ProxiesUnavailableError",
+]
# The httplib debug level, set to a non-zero value to get debug output
debuglevel = 0
@@ -141,7 +155,8 @@
RETRIES = 2
# Python 2.3 support
-if sys.version_info < (2,4):
+if sys.version_info < (2, 4):
+
def sorted(seq):
seq.sort()
return seq
@@ -154,11 +169,15 @@
raise httplib.ResponseNotReady()
return self.msg.items()
-if not hasattr(httplib.HTTPResponse, 'getheaders'):
+
+if not hasattr(httplib.HTTPResponse, "getheaders"):
httplib.HTTPResponse.getheaders = HTTPResponse__getheaders
+
# All exceptions raised here derive from HttpLib2Error
-class HttpLib2Error(Exception): pass
+class HttpLib2Error(Exception):
+ pass
+
# Some exceptions can be caught and optionally
# be turned back into responses.
@@ -168,26 +187,65 @@
self.content = content
HttpLib2Error.__init__(self, desc)
-class RedirectMissingLocation(HttpLib2ErrorWithResponse): pass
-class RedirectLimit(HttpLib2ErrorWithResponse): pass
-class FailedToDecompressContent(HttpLib2ErrorWithResponse): pass
-class UnimplementedDigestAuthOptionError(HttpLib2ErrorWithResponse): pass
-class UnimplementedHmacDigestAuthOptionError(HttpLib2ErrorWithResponse): pass
-class MalformedHeader(HttpLib2Error): pass
-class RelativeURIError(HttpLib2Error): pass
-class ServerNotFoundError(HttpLib2Error): pass
-class ProxiesUnavailableError(HttpLib2Error): pass
-class CertificateValidationUnsupported(HttpLib2Error): pass
-class SSLHandshakeError(HttpLib2Error): pass
-class NotSupportedOnThisPlatform(HttpLib2Error): pass
+class RedirectMissingLocation(HttpLib2ErrorWithResponse):
+ pass
+
+
+class RedirectLimit(HttpLib2ErrorWithResponse):
+ pass
+
+
+class FailedToDecompressContent(HttpLib2ErrorWithResponse):
+ pass
+
+
+class UnimplementedDigestAuthOptionError(HttpLib2ErrorWithResponse):
+ pass
+
+
+class UnimplementedHmacDigestAuthOptionError(HttpLib2ErrorWithResponse):
+ pass
+
+
+class MalformedHeader(HttpLib2Error):
+ pass
+
+
+class RelativeURIError(HttpLib2Error):
+ pass
+
+
+class ServerNotFoundError(HttpLib2Error):
+ pass
+
+
+class ProxiesUnavailableError(HttpLib2Error):
+ pass
+
+
+class CertificateValidationUnsupported(HttpLib2Error):
+ pass
+
+
+class SSLHandshakeError(HttpLib2Error):
+ pass
+
+
+class NotSupportedOnThisPlatform(HttpLib2Error):
+ pass
+
+
class CertificateHostnameMismatch(SSLHandshakeError):
def __init__(self, desc, host, cert):
HttpLib2Error.__init__(self, desc)
self.host = host
self.cert = cert
-class NotRunningAppEngineEnvironment(HttpLib2Error): pass
+
+class NotRunningAppEngineEnvironment(HttpLib2Error):
+ pass
+
# Open Items:
# -----------
@@ -204,7 +262,6 @@
# Does not handle Cache-Control: max-stale
# Does not use Age: headers when calculating cache freshness.
-
# The number of redirections to follow before giving up.
# Note that only GET redirects are automatically followed.
# Will also honor 301 requests by saving that info and never
@@ -215,21 +272,31 @@
# Users can optionally provide a module that tells us where the CA_CERTS
# are located.
import ca_certs_locater
+
CA_CERTS = ca_certs_locater.get()
except ImportError:
# Default CA certificates file bundled with httplib2.
- CA_CERTS = os.path.join(
- os.path.dirname(os.path.abspath(__file__ )), "cacerts.txt")
+ CA_CERTS = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cacerts.txt")
# Which headers are hop-by-hop headers by default
-HOP_BY_HOP = ['connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', 'upgrade']
+HOP_BY_HOP = [
+ "connection",
+ "keep-alive",
+ "proxy-authenticate",
+ "proxy-authorization",
+ "te",
+ "trailers",
+ "transfer-encoding",
+ "upgrade",
+]
def _get_end2end_headers(response):
hopbyhop = list(HOP_BY_HOP)
- hopbyhop.extend([x.strip() for x in response.get('connection', '').split(',')])
+ hopbyhop.extend([x.strip() for x in response.get("connection", "").split(",")])
return [header for header in response.keys() if header not in hopbyhop]
+
URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")
@@ -259,8 +326,8 @@
# Cache filename construction (original borrowed from Venus http://intertwingly.net/code/venus/)
-re_url_scheme = re.compile(r'^\w+://')
-re_unsafe = re.compile(r'[^\w\-_.()=!]+')
+re_url_scheme = re.compile(r"^\w+://")
+re_unsafe = re.compile(r"[^\w\-_.()=!]+")
def safename(filename):
@@ -270,12 +337,12 @@
"""
if isinstance(filename, str):
filename_bytes = filename
- filename = filename.decode('utf-8')
+ filename = filename.decode("utf-8")
else:
- filename_bytes = filename.encode('utf-8')
+ filename_bytes = filename.encode("utf-8")
filemd5 = _md5(filename_bytes).hexdigest()
- filename = re_url_scheme.sub('', filename)
- filename = re_unsafe.sub('', filename)
+ filename = re_url_scheme.sub("", filename)
+ filename = re_unsafe.sub("", filename)
# limit length of filename (vital for Windows)
# https://github.com/httplib2/httplib2/pull/74
@@ -284,25 +351,37 @@
# Thus max safe filename x = 93 chars. Let it be 90 to make a round sum:
filename = filename[:90]
- return ','.join((filename, filemd5))
+ return ",".join((filename, filemd5))
-NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
+NORMALIZE_SPACE = re.compile(r"(?:\r\n)?[ \t]+")
def _normalize_headers(headers):
- return dict([ (key.lower(), NORMALIZE_SPACE.sub(value, ' ').strip()) for (key, value) in headers.iteritems()])
+ return dict(
+ [
+ (key.lower(), NORMALIZE_SPACE.sub(value, " ").strip())
+ for (key, value) in headers.iteritems()
+ ]
+ )
def _parse_cache_control(headers):
retval = {}
- if 'cache-control' in headers:
- parts = headers['cache-control'].split(',')
- parts_with_args = [tuple([x.strip().lower() for x in part.split("=", 1)]) for part in parts if -1 != part.find("=")]
- parts_wo_args = [(name.strip().lower(), 1) for name in parts if -1 == name.find("=")]
+ if "cache-control" in headers:
+ parts = headers["cache-control"].split(",")
+ parts_with_args = [
+ tuple([x.strip().lower() for x in part.split("=", 1)])
+ for part in parts
+ if -1 != part.find("=")
+ ]
+ parts_wo_args = [
+ (name.strip().lower(), 1) for name in parts if -1 == name.find("=")
+ ]
retval = dict(parts_with_args + parts_wo_args)
return retval
+
# Whether to use a strict mode to parse WWW-Authenticate headers
# Might lead to bad results in case of ill-formed header value,
# so disabled by default, falling back to relaxed parsing.
@@ -314,10 +393,16 @@
# "(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?" matches a "quoted-string" as defined by HTTP, when LWS have already been replaced by a single space
# Actually, as an auth-param value can be either a token or a quoted-string, they are combined in a single pattern which matches both:
# \"?((?<=\")(?:[^\0-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?<!\")[^\0-\x08\x0A-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+(?!\"))\"?
-WWW_AUTH_STRICT = re.compile(r"^(?:\s*(?:,\s*)?([^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+)\s*=\s*\"?((?<=\")(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?<!\")[^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+(?!\"))\"?)(.*)$")
-WWW_AUTH_RELAXED = re.compile(r"^(?:\s*(?:,\s*)?([^ \t\r\n=]+)\s*=\s*\"?((?<=\")(?:[^\\\"]|\\.)*?(?=\")|(?<!\")[^ \t\r\n,]+(?!\"))\"?)(.*)$")
-UNQUOTE_PAIRS = re.compile(r'\\(.)')
-def _parse_www_authenticate(headers, headername='www-authenticate'):
+WWW_AUTH_STRICT = re.compile(
+ r"^(?:\s*(?:,\s*)?([^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+)\s*=\s*\"?((?<=\")(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?<!\")[^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+(?!\"))\"?)(.*)$"
+)
+WWW_AUTH_RELAXED = re.compile(
+ r"^(?:\s*(?:,\s*)?([^ \t\r\n=]+)\s*=\s*\"?((?<=\")(?:[^\\\"]|\\.)*?(?=\")|(?<!\")[^ \t\r\n,]+(?!\"))\"?)(.*)$"
+)
+UNQUOTE_PAIRS = re.compile(r"\\(.)")
+
+
+def _parse_www_authenticate(headers, headername="www-authenticate"):
"""Returns a dictionary of dictionaries, one dict
per auth_scheme."""
retval = {}
@@ -325,11 +410,13 @@
try:
authenticate = headers[headername].strip()
- www_auth = USE_WWW_AUTH_STRICT_PARSING and WWW_AUTH_STRICT or WWW_AUTH_RELAXED
+ www_auth = (
+ USE_WWW_AUTH_STRICT_PARSING and WWW_AUTH_STRICT or WWW_AUTH_RELAXED
+ )
while authenticate:
# Break off the scheme at the beginning of the line
- if headername == 'authentication-info':
- (auth_scheme, the_rest) = ('digest', authenticate)
+ if headername == "authentication-info":
+ (auth_scheme, the_rest) = ("digest", authenticate)
else:
(auth_scheme, the_rest) = authenticate.split(" ", 1)
# Now loop over all the key value pairs that come after the scheme,
@@ -339,7 +426,9 @@
while match:
if match and len(match.groups()) == 3:
(key, value, the_rest) = match.groups()
- auth_params[key.lower()] = UNQUOTE_PAIRS.sub(r'\1', value) # '\\'.join([x.replace('\\', '') for x in value.split('\\\\')])
+ auth_params[key.lower()] = UNQUOTE_PAIRS.sub(
+ r"\1", value
+ ) # '\\'.join([x.replace('\\', '') for x in value.split('\\\\')])
match = www_auth.search(the_rest)
retval[auth_scheme.lower()] = auth_params
authenticate = the_rest.strip()
@@ -382,41 +471,44 @@
cc = _parse_cache_control(request_headers)
cc_response = _parse_cache_control(response_headers)
- if 'pragma' in request_headers and request_headers['pragma'].lower().find('no-cache') != -1:
+ if (
+ "pragma" in request_headers
+ and request_headers["pragma"].lower().find("no-cache") != -1
+ ):
retval = "TRANSPARENT"
- if 'cache-control' not in request_headers:
- request_headers['cache-control'] = 'no-cache'
- elif 'no-cache' in cc:
+ if "cache-control" not in request_headers:
+ request_headers["cache-control"] = "no-cache"
+ elif "no-cache" in cc:
retval = "TRANSPARENT"
- elif 'no-cache' in cc_response:
+ elif "no-cache" in cc_response:
retval = "STALE"
- elif 'only-if-cached' in cc:
+ elif "only-if-cached" in cc:
retval = "FRESH"
- elif 'date' in response_headers:
- date = calendar.timegm(email.Utils.parsedate_tz(response_headers['date']))
+ elif "date" in response_headers:
+ date = calendar.timegm(email.Utils.parsedate_tz(response_headers["date"]))
now = time.time()
current_age = max(0, now - date)
- if 'max-age' in cc_response:
+ if "max-age" in cc_response:
try:
- freshness_lifetime = int(cc_response['max-age'])
+ freshness_lifetime = int(cc_response["max-age"])
except ValueError:
freshness_lifetime = 0
- elif 'expires' in response_headers:
- expires = email.Utils.parsedate_tz(response_headers['expires'])
+ elif "expires" in response_headers:
+ expires = email.Utils.parsedate_tz(response_headers["expires"])
if None == expires:
freshness_lifetime = 0
else:
freshness_lifetime = max(0, calendar.timegm(expires) - date)
else:
freshness_lifetime = 0
- if 'max-age' in cc:
+ if "max-age" in cc:
try:
- freshness_lifetime = int(cc['max-age'])
+ freshness_lifetime = int(cc["max-age"])
except ValueError:
freshness_lifetime = 0
- if 'min-fresh' in cc:
+ if "min-fresh" in cc:
try:
- min_fresh = int(cc['min-fresh'])
+ min_fresh = int(cc["min-fresh"])
except ValueError:
min_fresh = 0
current_age += min_fresh
@@ -428,19 +520,24 @@
def _decompressContent(response, new_content):
content = new_content
try:
- encoding = response.get('content-encoding', None)
- if encoding in ['gzip', 'deflate']:
- if encoding == 'gzip':
+ encoding = response.get("content-encoding", None)
+ if encoding in ["gzip", "deflate"]:
+ if encoding == "gzip":
content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
- if encoding == 'deflate':
+ if encoding == "deflate":
content = zlib.decompress(content, -zlib.MAX_WBITS)
- response['content-length'] = str(len(content))
+ response["content-length"] = str(len(content))
# Record the historical presence of the encoding in a way the won't interfere.
- response['-content-encoding'] = response['content-encoding']
- del response['content-encoding']
+ response["-content-encoding"] = response["content-encoding"]
+ del response["content-encoding"]
except (IOError, zlib.error):
content = ""
- raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
+ raise FailedToDecompressContent(
+ _("Content purported to be compressed with %s but failed to decompress.")
+ % response.get("content-encoding"),
+ response,
+ content,
+ )
return content
@@ -448,21 +545,21 @@
if cachekey:
cc = _parse_cache_control(request_headers)
cc_response = _parse_cache_control(response_headers)
- if 'no-store' in cc or 'no-store' in cc_response:
+ if "no-store" in cc or "no-store" in cc_response:
cache.delete(cachekey)
else:
info = email.Message.Message()
for key, value in response_headers.iteritems():
- if key not in ['status','content-encoding','transfer-encoding']:
+ if key not in ["status", "content-encoding", "transfer-encoding"]:
info[key] = value
# Add annotations to the cache to indicate what headers
# are variant for this request.
- vary = response_headers.get('vary', None)
+ vary = response_headers.get("vary", None)
if vary:
- vary_headers = vary.lower().replace(' ', '').split(',')
+ vary_headers = vary.lower().replace(" ", "").split(",")
for header in vary_headers:
- key = '-varied-%s' % header
+ key = "-varied-%s" % header
try:
info[key] = request_headers[header]
except KeyError:
@@ -472,7 +569,7 @@
if status == 304:
status = 200
- status_header = 'status: %d\r\n' % status
+ status_header = "status: %d\r\n" % status
header_str = info.as_string()
@@ -481,12 +578,19 @@
cache.set(cachekey, text)
+
def _cnonce():
- dig = _md5("%s:%s" % (time.ctime(), ["0123456789"[random.randrange(0, 9)] for i in range(20)])).hexdigest()
+ dig = _md5(
+ "%s:%s"
+ % (time.ctime(), ["0123456789"[random.randrange(0, 9)] for i in range(20)])
+ ).hexdigest()
return dig[:16]
+
def _wsse_username_token(cnonce, iso_now, password):
- return base64.b64encode(_sha("%s%s%s" % (cnonce, iso_now, password)).digest()).strip()
+ return base64.b64encode(
+ _sha("%s%s%s" % (cnonce, iso_now, password)).digest()
+ ).strip()
# For credentials we need two things, first
@@ -497,8 +601,11 @@
# So we also need each Auth instance to be able to tell us
# how close to the 'top' it is.
+
class Authentication(object):
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
(scheme, authority, path, query, fragment) = parse_uri(request_uri)
self.path = path
self.host = host
@@ -507,7 +614,7 @@
def depth(self, request_uri):
(scheme, authority, path, query, fragment) = parse_uri(request_uri)
- return request_uri[len(self.path):].count("/")
+ return request_uri[len(self.path) :].count("/")
def inscope(self, host, request_uri):
# XXX Should we normalize the request_uri?
@@ -531,105 +638,169 @@
class BasicAuthentication(Authentication):
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
- headers['authorization'] = 'Basic ' + base64.b64encode("%s:%s" % self.credentials).strip()
+ headers["authorization"] = (
+ "Basic " + base64.b64encode("%s:%s" % self.credentials).strip()
+ )
class DigestAuthentication(Authentication):
"""Only do qop='auth' and MD5, since that
is all Apache currently implements"""
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
- challenge = _parse_www_authenticate(response, 'www-authenticate')
- self.challenge = challenge['digest']
- qop = self.challenge.get('qop', 'auth')
- self.challenge['qop'] = ('auth' in [x.strip() for x in qop.split()]) and 'auth' or None
- if self.challenge['qop'] is None:
- raise UnimplementedDigestAuthOptionError( _("Unsupported value for qop: %s." % qop))
- self.challenge['algorithm'] = self.challenge.get('algorithm', 'MD5').upper()
- if self.challenge['algorithm'] != 'MD5':
- raise UnimplementedDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm']))
- self.A1 = "".join([self.credentials[0], ":", self.challenge['realm'], ":", self.credentials[1]])
- self.challenge['nc'] = 1
- def request(self, method, request_uri, headers, content, cnonce = None):
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
+ challenge = _parse_www_authenticate(response, "www-authenticate")
+ self.challenge = challenge["digest"]
+ qop = self.challenge.get("qop", "auth")
+ self.challenge["qop"] = (
+ ("auth" in [x.strip() for x in qop.split()]) and "auth" or None
+ )
+ if self.challenge["qop"] is None:
+ raise UnimplementedDigestAuthOptionError(
+ _("Unsupported value for qop: %s." % qop)
+ )
+ self.challenge["algorithm"] = self.challenge.get("algorithm", "MD5").upper()
+ if self.challenge["algorithm"] != "MD5":
+ raise UnimplementedDigestAuthOptionError(
+ _("Unsupported value for algorithm: %s." % self.challenge["algorithm"])
+ )
+ self.A1 = "".join(
+ [
+ self.credentials[0],
+ ":",
+ self.challenge["realm"],
+ ":",
+ self.credentials[1],
+ ]
+ )
+ self.challenge["nc"] = 1
+
+ def request(self, method, request_uri, headers, content, cnonce=None):
"""Modify the request headers"""
H = lambda x: _md5(x).hexdigest()
KD = lambda s, d: H("%s:%s" % (s, d))
A2 = "".join([method, ":", request_uri])
- self.challenge['cnonce'] = cnonce or _cnonce()
- request_digest = '"%s"' % KD(H(self.A1), "%s:%s:%s:%s:%s" % (
- self.challenge['nonce'],
- '%08x' % self.challenge['nc'],
- self.challenge['cnonce'],
- self.challenge['qop'], H(A2)))
- headers['authorization'] = 'Digest username="%s", realm="%s", nonce="%s", uri="%s", algorithm=%s, response=%s, qop=%s, nc=%08x, cnonce="%s"' % (
- self.credentials[0],
- self.challenge['realm'],
- self.challenge['nonce'],
- request_uri,
- self.challenge['algorithm'],
- request_digest,
- self.challenge['qop'],
- self.challenge['nc'],
- self.challenge['cnonce'])
- if self.challenge.get('opaque'):
- headers['authorization'] += ', opaque="%s"' % self.challenge['opaque']
- self.challenge['nc'] += 1
+ self.challenge["cnonce"] = cnonce or _cnonce()
+ request_digest = '"%s"' % KD(
+ H(self.A1),
+ "%s:%s:%s:%s:%s"
+ % (
+ self.challenge["nonce"],
+ "%08x" % self.challenge["nc"],
+ self.challenge["cnonce"],
+ self.challenge["qop"],
+ H(A2),
+ ),
+ )
+ headers["authorization"] = (
+ 'Digest username="%s", realm="%s", nonce="%s", '
+ 'uri="%s", algorithm=%s, response=%s, qop=%s, '
+ 'nc=%08x, cnonce="%s"'
+ ) % (
+ self.credentials[0],
+ self.challenge["realm"],
+ self.challenge["nonce"],
+ request_uri,
+ self.challenge["algorithm"],
+ request_digest,
+ self.challenge["qop"],
+ self.challenge["nc"],
+ self.challenge["cnonce"],
+ )
+ if self.challenge.get("opaque"):
+ headers["authorization"] += ', opaque="%s"' % self.challenge["opaque"]
+ self.challenge["nc"] += 1
def response(self, response, content):
- if 'authentication-info' not in response:
- challenge = _parse_www_authenticate(response, 'www-authenticate').get('digest', {})
- if 'true' == challenge.get('stale'):
- self.challenge['nonce'] = challenge['nonce']
- self.challenge['nc'] = 1
+ if "authentication-info" not in response:
+ challenge = _parse_www_authenticate(response, "www-authenticate").get(
+ "digest", {}
+ )
+ if "true" == challenge.get("stale"):
+ self.challenge["nonce"] = challenge["nonce"]
+ self.challenge["nc"] = 1
return True
else:
- updated_challenge = _parse_www_authenticate(response, 'authentication-info').get('digest', {})
+ updated_challenge = _parse_www_authenticate(
+ response, "authentication-info"
+ ).get("digest", {})
- if 'nextnonce' in updated_challenge:
- self.challenge['nonce'] = updated_challenge['nextnonce']
- self.challenge['nc'] = 1
+ if "nextnonce" in updated_challenge:
+ self.challenge["nonce"] = updated_challenge["nextnonce"]
+ self.challenge["nc"] = 1
return False
class HmacDigestAuthentication(Authentication):
"""Adapted from Robert Sayre's code and DigestAuthentication above."""
+
__author__ = "Thomas Broyer (t.broyer@ltgt.net)"
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
- challenge = _parse_www_authenticate(response, 'www-authenticate')
- self.challenge = challenge['hmacdigest']
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
+ challenge = _parse_www_authenticate(response, "www-authenticate")
+ self.challenge = challenge["hmacdigest"]
# TODO: self.challenge['domain']
- self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
- if self.challenge['reason'] not in ['unauthorized', 'integrity']:
- self.challenge['reason'] = 'unauthorized'
- self.challenge['salt'] = self.challenge.get('salt', '')
- if not self.challenge.get('snonce'):
- raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))
- self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
- if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
- raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm']))
- self.challenge['pw-algorithm'] = self.challenge.get('pw-algorithm', 'SHA-1')
- if self.challenge['pw-algorithm'] not in ['SHA-1', 'MD5']:
- raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % self.challenge['pw-algorithm']))
- if self.challenge['algorithm'] == 'HMAC-MD5':
+ self.challenge["reason"] = self.challenge.get("reason", "unauthorized")
+ if self.challenge["reason"] not in ["unauthorized", "integrity"]:
+ self.challenge["reason"] = "unauthorized"
+ self.challenge["salt"] = self.challenge.get("salt", "")
+ if not self.challenge.get("snonce"):
+ raise UnimplementedHmacDigestAuthOptionError(
+ _("The challenge doesn't contain a server nonce, or this one is empty.")
+ )
+ self.challenge["algorithm"] = self.challenge.get("algorithm", "HMAC-SHA-1")
+ if self.challenge["algorithm"] not in ["HMAC-SHA-1", "HMAC-MD5"]:
+ raise UnimplementedHmacDigestAuthOptionError(
+ _("Unsupported value for algorithm: %s." % self.challenge["algorithm"])
+ )
+ self.challenge["pw-algorithm"] = self.challenge.get("pw-algorithm", "SHA-1")
+ if self.challenge["pw-algorithm"] not in ["SHA-1", "MD5"]:
+ raise UnimplementedHmacDigestAuthOptionError(
+ _(
+ "Unsupported value for pw-algorithm: %s."
+ % self.challenge["pw-algorithm"]
+ )
+ )
+ if self.challenge["algorithm"] == "HMAC-MD5":
self.hashmod = _md5
else:
self.hashmod = _sha
- if self.challenge['pw-algorithm'] == 'MD5':
+ if self.challenge["pw-algorithm"] == "MD5":
self.pwhashmod = _md5
else:
self.pwhashmod = _sha
- self.key = "".join([self.credentials[0], ":",
- self.pwhashmod.new("".join([self.credentials[1], self.challenge['salt']])).hexdigest().lower(),
- ":", self.challenge['realm']])
+ self.key = "".join(
+ [
+ self.credentials[0],
+ ":",
+ self.pwhashmod.new(
+ "".join([self.credentials[1], self.challenge["salt"]])
+ )
+ .hexdigest()
+ .lower(),
+ ":",
+ self.challenge["realm"],
+ ]
+ )
self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def request(self, method, request_uri, headers, content):
@@ -637,23 +808,38 @@
keys = _get_end2end_headers(headers)
keylist = "".join(["%s " % k for k in keys])
headers_val = "".join([headers[k] for k in keys])
- created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime())
+ created = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
cnonce = _cnonce()
- request_digest = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
- request_digest = hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower()
- headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % (
- self.credentials[0],
- self.challenge['realm'],
- self.challenge['snonce'],
- cnonce,
- request_uri,
- created,
- request_digest,
- keylist)
+ request_digest = "%s:%s:%s:%s:%s" % (
+ method,
+ request_uri,
+ cnonce,
+ self.challenge["snonce"],
+ headers_val,
+ )
+ request_digest = (
+ hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower()
+ )
+ headers["authorization"] = (
+ 'HMACDigest username="%s", realm="%s", snonce="%s",'
+ ' cnonce="%s", uri="%s", created="%s", '
+ 'response="%s", headers="%s"'
+ ) % (
+ self.credentials[0],
+ self.challenge["realm"],
+ self.challenge["snonce"],
+ cnonce,
+ request_uri,
+ created,
+ request_digest,
+ keylist,
+ )
def response(self, response, content):
- challenge = _parse_www_authenticate(response, 'www-authenticate').get('hmacdigest', {})
- if challenge.get('reason') in ['integrity', 'stale']:
+ challenge = _parse_www_authenticate(response, "www-authenticate").get(
+ "hmacdigest", {}
+ )
+ if challenge.get("reason") in ["integrity", "stale"]:
return True
return False
@@ -666,50 +852,69 @@
TypePad has implemented it wrong, by never issuing a 401
challenge but instead requiring your client to telepathically know that
their endpoint is expecting WSSE profile="UsernameToken"."""
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
+
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
- headers['authorization'] = 'WSSE profile="UsernameToken"'
+ headers["authorization"] = 'WSSE profile="UsernameToken"'
iso_now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
cnonce = _cnonce()
password_digest = _wsse_username_token(cnonce, iso_now, self.credentials[1])
- headers['X-WSSE'] = 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
- self.credentials[0],
- password_digest,
- cnonce,
- iso_now)
+ headers["X-WSSE"] = (
+ 'UsernameToken Username="%s", PasswordDigest="%s", '
+ 'Nonce="%s", Created="%s"'
+ ) % (self.credentials[0], password_digest, cnonce, iso_now)
class GoogleLoginAuthentication(Authentication):
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
from urllib import urlencode
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
- challenge = _parse_www_authenticate(response, 'www-authenticate')
- service = challenge['googlelogin'].get('service', 'xapi')
+
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
+ challenge = _parse_www_authenticate(response, "www-authenticate")
+ service = challenge["googlelogin"].get("service", "xapi")
# Bloggger actually returns the service in the challenge
# For the rest we guess based on the URI
- if service == 'xapi' and request_uri.find("calendar") > 0:
+ if service == "xapi" and request_uri.find("calendar") > 0:
service = "cl"
# No point in guessing Base or Spreadsheet
- #elif request_uri.find("spreadsheets") > 0:
+ # elif request_uri.find("spreadsheets") > 0:
# service = "wise"
- auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
- resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
- lines = content.split('\n')
+ auth = dict(
+ Email=credentials[0],
+ Passwd=credentials[1],
+ service=service,
+ source=headers["user-agent"],
+ )
+ resp, content = self.http.request(
+ "https://www.google.com/accounts/ClientLogin",
+ method="POST",
+ body=urlencode(auth),
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ )
+ lines = content.split("\n")
d = dict([tuple(line.split("=", 1)) for line in lines if line])
if resp.status == 403:
self.Auth = ""
else:
- self.Auth = d['Auth']
+ self.Auth = d["Auth"]
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
- headers['authorization'] = 'GoogleLogin Auth=' + self.Auth
+ headers["authorization"] = "GoogleLogin Auth=" + self.Auth
AUTH_SCHEME_CLASSES = {
@@ -717,7 +922,7 @@
"wsse": WsseAuthentication,
"digest": DigestAuthentication,
"hmacdigest": HmacDigestAuthentication,
- "googlelogin": GoogleLoginAuthentication
+ "googlelogin": GoogleLoginAuthentication,
}
AUTH_SCHEME_ORDER = ["hmacdigest", "googlelogin", "digest", "wsse", "basic"]
@@ -728,7 +933,10 @@
Not really safe to use if multiple threads or processes are going to
be running on the same cache.
"""
- def __init__(self, cache, safe=safename): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
+
+ def __init__(
+ self, cache, safe=safename
+ ): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
self.cache = cache
self.safe = safe
if not os.path.exists(cache):
@@ -776,6 +984,7 @@
class KeyCerts(Credentials):
"""Identical to Credentials except that
name/password are mapped to key/cert."""
+
pass
@@ -785,32 +994,35 @@
class ProxyInfo(object):
"""Collect information required to use a proxy."""
+
bypass_hosts = ()
- def __init__(self, proxy_type, proxy_host, proxy_port,
- proxy_rdns=True, proxy_user=None, proxy_pass=None, proxy_headers=None):
- """
- Args:
+ def __init__(
+ self,
+ proxy_type,
+ proxy_host,
+ proxy_port,
+ proxy_rdns=True,
+ proxy_user=None,
+ proxy_pass=None,
+ proxy_headers=None,
+ ):
+ """Args:
+
proxy_type: The type of proxy server. This must be set to one of
- socks.PROXY_TYPE_XXX constants. For example:
-
- p = ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP,
- proxy_host='localhost', proxy_port=8000)
-
+ socks.PROXY_TYPE_XXX constants. For example: p =
+ ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, proxy_host='localhost',
+ proxy_port=8000)
proxy_host: The hostname or IP address of the proxy server.
-
proxy_port: The port that the proxy server is running on.
-
proxy_rdns: If True (default), DNS queries will not be performed
locally, and instead, handed to the proxy to resolve. This is useful
- if the network does not allow resolution of non-local names. In
+ if the network does not allow resolution of non-local names. In
httplib2 0.9 and earlier, this defaulted to False.
-
proxy_user: The username used to authenticate with the proxy server.
-
proxy_pass: The password used to authenticate with the proxy server.
-
- proxy_headers: Additional or modified headers for the proxy connect request.
+ proxy_headers: Additional or modified headers for the proxy connect
+ request.
"""
self.proxy_type = proxy_type
self.proxy_host = proxy_host
@@ -821,8 +1033,15 @@
self.proxy_headers = proxy_headers
def astuple(self):
- return (self.proxy_type, self.proxy_host, self.proxy_port,
- self.proxy_rdns, self.proxy_user, self.proxy_pass, self.proxy_headers)
+ return (
+ self.proxy_type,
+ self.proxy_host,
+ self.proxy_port,
+ self.proxy_rdns,
+ self.proxy_user,
+ self.proxy_pass,
+ self.proxy_headers,
+ )
def isgood(self):
return (self.proxy_host != None) and (self.proxy_port != None)
@@ -835,54 +1054,54 @@
if self.bypass_hosts is AllHosts:
return True
- hostname = '.' + hostname.lstrip('.')
+ hostname = "." + hostname.lstrip(".")
for skip_name in self.bypass_hosts:
# *.suffix
- if skip_name.startswith('.') and hostname.endswith(skip_name):
+ if skip_name.startswith(".") and hostname.endswith(skip_name):
return True
# exact match
- if hostname == '.' + skip_name:
+ if hostname == "." + skip_name:
return True
return False
def __repr__(self):
return (
- '<ProxyInfo type={p.proxy_type} host:port={p.proxy_host}:{p.proxy_port} rdns={p.proxy_rdns}' +
- ' user={p.proxy_user} headers={p.proxy_headers}>').format(p=self)
+ "<ProxyInfo type={p.proxy_type} "
+ "host:port={p.proxy_host}:{p.proxy_port} rdns={p.proxy_rdns}"
+ + " user={p.proxy_user} headers={p.proxy_headers}>"
+ ).format(p=self)
-def proxy_info_from_environment(method='http'):
+def proxy_info_from_environment(method="http"):
+ """Read proxy info from the environment variables.
"""
- Read proxy info from the environment variables.
- """
- if method not in ['http', 'https']:
+ if method not in ["http", "https"]:
return
- env_var = method + '_proxy'
+ env_var = method + "_proxy"
url = os.environ.get(env_var, os.environ.get(env_var.upper()))
if not url:
return
return proxy_info_from_url(url, method, None)
-def proxy_info_from_url(url, method='http', noproxy=None):
- """
- Construct a ProxyInfo from a URL (such as http_proxy env var)
+def proxy_info_from_url(url, method="http", noproxy=None):
+ """Construct a ProxyInfo from a URL (such as http_proxy env var)
"""
url = urlparse.urlparse(url)
username = None
password = None
port = None
- if '@' in url[1]:
- ident, host_port = url[1].split('@', 1)
- if ':' in ident:
- username, password = ident.split(':', 1)
+ if "@" in url[1]:
+ ident, host_port = url[1].split("@", 1)
+ if ":" in ident:
+ username, password = ident.split(":", 1)
else:
password = ident
else:
host_port = url[1]
- if ':' in host_port:
- host, port = host_port.split(':', 1)
+ if ":" in host_port:
+ host, port = host_port.split(":", 1)
else:
host = host_port
@@ -893,23 +1112,23 @@
proxy_type = 3 # socks.PROXY_TYPE_HTTP
pi = ProxyInfo(
- proxy_type = proxy_type,
- proxy_host = host,
- proxy_port = port,
- proxy_user = username or None,
- proxy_pass = password or None,
- proxy_headers = None,
+ proxy_type=proxy_type,
+ proxy_host=host,
+ proxy_port=port,
+ proxy_user=username or None,
+ proxy_pass=password or None,
+ proxy_headers=None,
)
bypass_hosts = []
# If not given an explicit noproxy value, respect values in env vars.
if noproxy is None:
- noproxy = os.environ.get('no_proxy', os.environ.get('NO_PROXY', ''))
+ noproxy = os.environ.get("no_proxy", os.environ.get("NO_PROXY", ""))
# Special case: A single '*' character means all hosts should be bypassed.
- if noproxy == '*':
+ if noproxy == "*":
bypass_hosts = AllHosts
elif noproxy.strip():
- bypass_hosts = noproxy.split(',')
+ bypass_hosts = noproxy.split(",")
bypass_hosts = filter(bool, bypass_hosts) # To exclude empty string.
pi.bypass_hosts = bypass_hosts
@@ -917,8 +1136,7 @@
class HTTPConnectionWithTimeout(httplib.HTTPConnection):
- """
- HTTPConnection subclass that supports timeouts
+ """HTTPConnection subclass that supports timeouts
All timeouts are in seconds. If None is passed for timeout then
Python's default timeout for sockets will be used. See for example
@@ -936,11 +1154,14 @@
# Mostly verbatim from httplib.py.
if self.proxy_info and socks is None:
raise ProxiesUnavailableError(
- 'Proxy support missing but proxy use was requested!')
+ "Proxy support missing but proxy use was requested!"
+ )
msg = "getaddrinfo returns an empty list"
if self.proxy_info and self.proxy_info.isgood():
use_proxy = True
- proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = self.proxy_info.astuple()
+ proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = (
+ self.proxy_info.astuple()
+ )
host = proxy_host
port = proxy_port
@@ -955,7 +1176,15 @@
try:
if use_proxy:
self.sock = socks.socksocket(af, socktype, proto)
- self.sock.setproxy(proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers)
+ self.sock.setproxy(
+ proxy_type,
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
else:
self.sock = socket.socket(af, socktype, proto)
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
@@ -966,7 +1195,19 @@
if self.debuglevel > 0:
print("connect: (%s, %s) ************" % (self.host, self.port))
if use_proxy:
- print("proxy: %s ************" % str((proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers)))
+ print(
+ "proxy: %s ************"
+ % str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
if use_proxy:
self.sock.connect((self.host, self.port) + sa[2:])
else:
@@ -975,39 +1216,59 @@
if self.debuglevel > 0:
print("connect fail: (%s, %s)" % (self.host, self.port))
if use_proxy:
- print("proxy: %s" % str((proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers)))
+ print(
+ "proxy: %s"
+ % str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
if self.sock:
self.sock.close()
self.sock = None
continue
break
if not self.sock:
- raise socket.error, msg
+ raise socket.error(msg)
class HTTPSConnectionWithTimeout(httplib.HTTPSConnection):
- """
- This class allows communication via SSL.
+ """This class allows communication via SSL.
All timeouts are in seconds. If None is passed for timeout then
Python's default timeout for sockets will be used. See for example
the docs of socket.setdefaulttimeout():
http://docs.python.org/library/socket.html#socket.setdefaulttimeout
"""
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None, timeout=None, proxy_info=None,
- ca_certs=None, disable_ssl_certificate_validation=False,
- ssl_version=None):
- httplib.HTTPSConnection.__init__(self, host, port=port,
- key_file=key_file,
- cert_file=cert_file, strict=strict)
+
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ strict=None,
+ timeout=None,
+ proxy_info=None,
+ ca_certs=None,
+ disable_ssl_certificate_validation=False,
+ ssl_version=None,
+ ):
+ httplib.HTTPSConnection.__init__(
+ self, host, port=port, key_file=key_file, cert_file=cert_file, strict=strict
+ )
self.timeout = timeout
self.proxy_info = proxy_info
if ca_certs is None:
ca_certs = CA_CERTS
self.ca_certs = ca_certs
- self.disable_ssl_certificate_validation = \
- disable_ssl_certificate_validation
+ self.disable_ssl_certificate_validation = disable_ssl_certificate_validation
self.ssl_version = ssl_version
# The following two methods were adapted from https_wrapper.py, released
@@ -1038,12 +1299,10 @@
Returns:
list: A list of valid host globs.
"""
- if 'subjectAltName' in cert:
- return [x[1] for x in cert['subjectAltName']
- if x[0].lower() == 'dns']
+ if "subjectAltName" in cert:
+ return [x[1] for x in cert["subjectAltName"] if x[0].lower() == "dns"]
else:
- return [x[0][1] for x in cert['subject']
- if x[0][0].lower() == 'commonname']
+ return [x[0][1] for x in cert["subject"] if x[0][0].lower() == "commonname"]
def _ValidateCertificateHostname(self, cert, hostname):
"""Validates that a given hostname is valid for an SSL certificate.
@@ -1056,8 +1315,8 @@
"""
hosts = self._GetValidHostsForCert(cert)
for host in hosts:
- host_re = host.replace('.', '\.').replace('*', '[^.]*')
- if re.search('^%s$' % (host_re,), hostname, re.I):
+ host_re = host.replace(".", "\.").replace("*", "[^.]*")
+ if re.search("^%s$" % (host_re,), hostname, re.I):
return True
return False
@@ -1067,7 +1326,9 @@
msg = "getaddrinfo returns an empty list"
if self.proxy_info and self.proxy_info.isgood():
use_proxy = True
- proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = self.proxy_info.astuple()
+ proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = (
+ self.proxy_info.astuple()
+ )
host = proxy_host
port = proxy_port
@@ -1083,7 +1344,15 @@
if use_proxy:
sock = socks.socksocket(family, socktype, proto)
- sock.setproxy(proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers)
+ sock.setproxy(
+ proxy_type,
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
else:
sock = socket.socket(family, socktype, proto)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
@@ -1095,22 +1364,46 @@
sock.connect((self.host, self.port) + sockaddr[:2])
else:
sock.connect(sockaddr)
- self.sock =_ssl_wrap_socket(
- sock, self.key_file, self.cert_file,
- self.disable_ssl_certificate_validation, self.ca_certs,
- self.ssl_version, self.host)
+ self.sock = _ssl_wrap_socket(
+ sock,
+ self.key_file,
+ self.cert_file,
+ self.disable_ssl_certificate_validation,
+ self.ca_certs,
+ self.ssl_version,
+ self.host,
+ )
if self.debuglevel > 0:
print("connect: (%s, %s)" % (self.host, self.port))
if use_proxy:
- print("proxy: %s" % str((proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers)))
+ print(
+ "proxy: %s"
+ % str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
if not self.disable_ssl_certificate_validation:
cert = self.sock.getpeercert()
- hostname = self.host.split(':', 0)[0]
+ hostname = self.host.split(":", 0)[0]
if not self._ValidateCertificateHostname(cert, hostname):
raise CertificateHostnameMismatch(
- 'Server presented certificate that does not match '
- 'host %s: %s' % (hostname, cert), hostname, cert)
- except (ssl_SSLError, ssl_CertificateError, CertificateHostnameMismatch) as e:
+ "Server presented certificate that does not match "
+ "host %s: %s" % (hostname, cert),
+ hostname,
+ cert,
+ )
+ except (
+ ssl_SSLError,
+ ssl_CertificateError,
+ CertificateHostnameMismatch,
+ ) as e:
if sock:
sock.close()
if self.sock:
@@ -1120,7 +1413,7 @@
# to get at more detailed error information, in particular
# whether the error is due to certificate validation or
# something else (such as SSL protocol mismatch).
- if getattr(e, 'errno', None) == ssl.SSL_ERROR_SSL:
+ if getattr(e, "errno", None) == ssl.SSL_ERROR_SSL:
raise SSLHandshakeError(e)
else:
raise
@@ -1130,31 +1423,57 @@
if self.debuglevel > 0:
print("connect fail: (%s, %s)" % (self.host, self.port))
if use_proxy:
- print("proxy: %s" % str((proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers)))
+ print(
+ "proxy: %s"
+ % str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
if self.sock:
self.sock.close()
self.sock = None
continue
break
if not self.sock:
- raise socket.error, msg
+ raise socket.error(msg)
+
SCHEME_TO_CONNECTION = {
- 'http': HTTPConnectionWithTimeout,
- 'https': HTTPSConnectionWithTimeout
+ "http": HTTPConnectionWithTimeout,
+ "https": HTTPSConnectionWithTimeout,
}
def _new_fixed_fetch(validate_certificate):
- def fixed_fetch(url, payload=None, method="GET", headers={},
- allow_truncated=False, follow_redirects=True,
- deadline=None):
+ def fixed_fetch(
+ url,
+ payload=None,
+ method="GET",
+ headers={},
+ allow_truncated=False,
+ follow_redirects=True,
+ deadline=None,
+ ):
if deadline is None:
deadline = socket.getdefaulttimeout()
- return fetch(url, payload=payload, method=method, headers=headers,
- allow_truncated=allow_truncated,
- follow_redirects=follow_redirects, deadline=deadline,
- validate_certificate=validate_certificate)
+ return fetch(
+ url,
+ payload=payload,
+ method=method,
+ headers=headers,
+ allow_truncated=allow_truncated,
+ follow_redirects=follow_redirects,
+ deadline=deadline,
+ validate_certificate=validate_certificate,
+ )
+
return fixed_fetch
@@ -1165,12 +1484,23 @@
disable_ssl_certificate_validation, and ssl_version are all dropped on
the ground.
"""
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None, timeout=None, proxy_info=None, ca_certs=None,
- disable_ssl_certificate_validation=False,
- ssl_version=None):
- httplib.HTTPConnection.__init__(self, host, port=port,
- strict=strict, timeout=timeout)
+
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ strict=None,
+ timeout=None,
+ proxy_info=None,
+ ca_certs=None,
+ disable_ssl_certificate_validation=False,
+ ssl_version=None,
+ ):
+ httplib.HTTPConnection.__init__(
+ self, host, port=port, strict=strict, timeout=timeout
+ )
class AppEngineHttpsConnection(httplib.HTTPSConnection):
@@ -1179,36 +1509,54 @@
The parameters proxy_info, ca_certs, disable_ssl_certificate_validation,
and ssl_version are all dropped on the ground.
"""
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None, timeout=None, proxy_info=None, ca_certs=None,
- disable_ssl_certificate_validation=False,
- ssl_version=None):
- httplib.HTTPSConnection.__init__(self, host, port=port,
- key_file=key_file,
- cert_file=cert_file, strict=strict,
- timeout=timeout)
- self._fetch = _new_fixed_fetch(
- not disable_ssl_certificate_validation)
+
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ strict=None,
+ timeout=None,
+ proxy_info=None,
+ ca_certs=None,
+ disable_ssl_certificate_validation=False,
+ ssl_version=None,
+ ):
+ httplib.HTTPSConnection.__init__(
+ self,
+ host,
+ port=port,
+ key_file=key_file,
+ cert_file=cert_file,
+ strict=strict,
+ timeout=timeout,
+ )
+ self._fetch = _new_fixed_fetch(not disable_ssl_certificate_validation)
+
# Use a different connection object for Google App Engine
try:
- server_software = os.environ.get('SERVER_SOFTWARE')
+ server_software = os.environ.get("SERVER_SOFTWARE")
if not server_software:
raise NotRunningAppEngineEnvironment()
- elif not (server_software.startswith('Google App Engine/') or
- server_software.startswith('Development/')):
+ elif not (
+ server_software.startswith("Google App Engine/")
+ or server_software.startswith("Development/")
+ ):
raise NotRunningAppEngineEnvironment()
from google.appengine.api import apiproxy_stub_map
- if apiproxy_stub_map.apiproxy.GetStub('urlfetch') is None:
+
+ if apiproxy_stub_map.apiproxy.GetStub("urlfetch") is None:
raise ImportError # Bail out; we're not actually running on App Engine.
from google.appengine.api.urlfetch import fetch
from google.appengine.api.urlfetch import InvalidURLError
# Update the connection classes to use the Googel App Engine specific ones.
SCHEME_TO_CONNECTION = {
- 'http': AppEngineHttpConnection,
- 'https': AppEngineHttpsConnection
+ "http": AppEngineHttpConnection,
+ "https": AppEngineHttpsConnection,
}
except (ImportError, AttributeError, NotRunningAppEngineEnvironment):
pass
@@ -1228,10 +1576,16 @@
and more.
"""
- def __init__(self, cache=None, timeout=None,
- proxy_info=proxy_info_from_environment,
- ca_certs=None, disable_ssl_certificate_validation=False,
- ssl_version=None):
+
+ def __init__(
+ self,
+ cache=None,
+ timeout=None,
+ proxy_info=proxy_info_from_environment,
+ ca_certs=None,
+ disable_ssl_certificate_validation=False,
+ ssl_version=None,
+ ):
"""If 'cache' is a string then it is used as a directory name for
a disk cache. Otherwise it must be an object that supports the
same interface as FileCache.
@@ -1259,8 +1613,7 @@
"""
self.proxy_info = proxy_info
self.ca_certs = ca_certs
- self.disable_ssl_certificate_validation = \
- disable_ssl_certificate_validation
+ self.disable_ssl_certificate_validation = disable_ssl_certificate_validation
self.ssl_version = ssl_version
# Map domain name to an httplib connection
@@ -1305,10 +1658,10 @@
state_dict = copy.copy(self.__dict__)
# In case request is augmented by some foreign object such as
# credentials which handle auth
- if 'request' in state_dict:
- del state_dict['request']
- if 'connections' in state_dict:
- del state_dict['connections']
+ if "request" in state_dict:
+ del state_dict["request"]
+ if "connections" in state_dict:
+ del state_dict["connections"]
return state_dict
def __setstate__(self, state):
@@ -1319,11 +1672,13 @@
"""A generator that creates Authorization objects
that can be applied to requests.
"""
- challenges = _parse_www_authenticate(response, 'www-authenticate')
+ challenges = _parse_www_authenticate(response, "www-authenticate")
for cred in self.credentials.iter(host):
for scheme in AUTH_SCHEME_ORDER:
if scheme in challenges:
- yield AUTH_SCHEME_CLASSES[scheme](cred, host, request_uri, headers, response, content, self)
+ yield AUTH_SCHEME_CLASSES[scheme](
+ cred, host, request_uri, headers, response, content, self
+ )
def add_credentials(self, name, password, domain=""):
"""Add a name and password that will be used
@@ -1347,7 +1702,7 @@
while i < RETRIES:
i += 1
try:
- if hasattr(conn, 'sock') and conn.sock is None:
+ if hasattr(conn, "sock") and conn.sock is None:
conn.connect()
conn.request(method, request_uri, body, headers)
except socket.timeout:
@@ -1360,8 +1715,8 @@
raise
except socket.error as e:
err = 0
- if hasattr(e, 'args'):
- err = getattr(e, 'args')[0]
+ if hasattr(e, "args"):
+ err = getattr(e, "args")[0]
else:
err = e.errno
if err == errno.ECONNREFUSED: # Connection refused
@@ -1371,15 +1726,15 @@
except httplib.HTTPException:
# Just because the server closed the connection doesn't apparently mean
# that the server didn't send a response.
- if hasattr(conn, 'sock') and conn.sock is None:
- if i < RETRIES-1:
+ if hasattr(conn, "sock") and conn.sock is None:
+ if i < RETRIES - 1:
conn.close()
conn.connect()
continue
else:
conn.close()
raise
- if i < RETRIES-1:
+ if i < RETRIES - 1:
conn.close()
conn.connect()
continue
@@ -1399,7 +1754,7 @@
conn.close()
raise
except (socket.error, httplib.HTTPException):
- if i < RETRIES-1:
+ if i < RETRIES - 1:
conn.close()
conn.connect()
continue
@@ -1418,77 +1773,121 @@
break
return (response, content)
-
- def _request(self, conn, host, absolute_uri, request_uri, method, body, headers, redirections, cachekey):
+ def _request(
+ self,
+ conn,
+ host,
+ absolute_uri,
+ request_uri,
+ method,
+ body,
+ headers,
+ redirections,
+ cachekey,
+ ):
"""Do the actual request using the connection object
and also follow one level of redirects if necessary"""
- auths = [(auth.depth(request_uri), auth) for auth in self.authorizations if auth.inscope(host, request_uri)]
+ auths = [
+ (auth.depth(request_uri), auth)
+ for auth in self.authorizations
+ if auth.inscope(host, request_uri)
+ ]
auth = auths and sorted(auths)[0][1] or None
if auth:
auth.request(method, request_uri, headers, body)
- (response, content) = self._conn_request(conn, request_uri, method, body, headers)
+ (response, content) = self._conn_request(
+ conn, request_uri, method, body, headers
+ )
if auth:
if auth.response(response, body):
auth.request(method, request_uri, headers, body)
- (response, content) = self._conn_request(conn, request_uri, method, body, headers )
+ (response, content) = self._conn_request(
+ conn, request_uri, method, body, headers
+ )
response._stale_digest = 1
if response.status == 401:
- for authorization in self._auth_from_challenge(host, request_uri, headers, response, content):
+ for authorization in self._auth_from_challenge(
+ host, request_uri, headers, response, content
+ ):
authorization.request(method, request_uri, headers, body)
- (response, content) = self._conn_request(conn, request_uri, method, body, headers, )
+ (response, content) = self._conn_request(
+ conn, request_uri, method, body, headers
+ )
if response.status != 401:
self.authorizations.append(authorization)
authorization.response(response, body)
break
- if (self.follow_all_redirects or (method in ["GET", "HEAD"]) or response.status == 303):
+ if (
+ self.follow_all_redirects
+ or (method in ["GET", "HEAD"])
+ or response.status == 303
+ ):
if self.follow_redirects and response.status in [300, 301, 302, 303, 307]:
# Pick out the location header and basically start from the beginning
# remembering first to strip the ETag header and decrement our 'depth'
if redirections:
- if 'location' not in response and response.status != 300:
- raise RedirectMissingLocation( _("Redirected but the response is missing a Location: header."), response, content)
+ if "location" not in response and response.status != 300:
+ raise RedirectMissingLocation(
+ _(
+ "Redirected but the response is missing a Location: header."
+ ),
+ response,
+ content,
+ )
# Fix-up relative redirects (which violate an RFC 2616 MUST)
- if 'location' in response:
- location = response['location']
+ if "location" in response:
+ location = response["location"]
(scheme, authority, path, query, fragment) = parse_uri(location)
if authority == None:
- response['location'] = urlparse.urljoin(absolute_uri, location)
+ response["location"] = urlparse.urljoin(
+ absolute_uri, location
+ )
if response.status == 301 and method in ["GET", "HEAD"]:
- response['-x-permanent-redirect-url'] = response['location']
- if 'content-location' not in response:
- response['content-location'] = absolute_uri
+ response["-x-permanent-redirect-url"] = response["location"]
+ if "content-location" not in response:
+ response["content-location"] = absolute_uri
_updateCache(headers, response, content, self.cache, cachekey)
- if 'if-none-match' in headers:
- del headers['if-none-match']
- if 'if-modified-since' in headers:
- del headers['if-modified-since']
- if 'authorization' in headers and not self.forward_authorization_headers:
- del headers['authorization']
- if 'location' in response:
- location = response['location']
+ if "if-none-match" in headers:
+ del headers["if-none-match"]
+ if "if-modified-since" in headers:
+ del headers["if-modified-since"]
+ if (
+ "authorization" in headers
+ and not self.forward_authorization_headers
+ ):
+ del headers["authorization"]
+ if "location" in response:
+ location = response["location"]
old_response = copy.deepcopy(response)
- if 'content-location' not in old_response:
- old_response['content-location'] = absolute_uri
+ if "content-location" not in old_response:
+ old_response["content-location"] = absolute_uri
redirect_method = method
if response.status in [302, 303]:
redirect_method = "GET"
body = None
(response, content) = self.request(
- location, method=redirect_method,
- body=body, headers=headers,
- redirections=redirections - 1)
+ location,
+ method=redirect_method,
+ body=body,
+ headers=headers,
+ redirections=redirections - 1,
+ )
response.previous = old_response
else:
- raise RedirectLimit("Redirected more times than rediection_limit allows.", response, content)
+ raise RedirectLimit(
+                        "Redirected more times than redirection_limit allows.",
+ response,
+ content,
+ )
elif response.status in [200, 203] and method in ["GET", "HEAD"]:
# Don't cache 206's since we aren't going to handle byte range requests
- if 'content-location' not in response:
- response['content-location'] = absolute_uri
+ if "content-location" not in response:
+ response["content-location"] = absolute_uri
_updateCache(headers, response, content, self.cache, cachekey)
return (response, content)
@@ -1496,12 +1895,19 @@
def _normalize_headers(self, headers):
return _normalize_headers(headers)
-# Need to catch and rebrand some exceptions
-# Then need to optionally turn all exceptions into status codes
-# including all socket.* and httplib.* exceptions.
+ # Need to catch and rebrand some exceptions
+ # Then need to optionally turn all exceptions into status codes
+ # including all socket.* and httplib.* exceptions.
-
- def request(self, uri, method="GET", body=None, headers=None, redirections=DEFAULT_MAX_REDIRECTS, connection_type=None):
+ def request(
+ self,
+ uri,
+ method="GET",
+ body=None,
+ headers=None,
+ redirections=DEFAULT_MAX_REDIRECTS,
+ connection_type=None,
+ ):
""" Performs a single HTTP request.
The 'uri' is the URI of the HTTP resource and can begin with either
@@ -1529,57 +1935,60 @@
else:
headers = self._normalize_headers(headers)
- if 'user-agent' not in headers:
- headers['user-agent'] = "Python-httplib2/%s (gzip)" % __version__
+ if "user-agent" not in headers:
+ headers["user-agent"] = "Python-httplib2/%s (gzip)" % __version__
uri = iri2uri(uri)
(scheme, authority, request_uri, defrag_uri) = urlnorm(uri)
domain_port = authority.split(":")[0:2]
- if len(domain_port) == 2 and domain_port[1] == '443' and scheme == 'http':
- scheme = 'https'
+ if len(domain_port) == 2 and domain_port[1] == "443" and scheme == "http":
+ scheme = "https"
authority = domain_port[0]
proxy_info = self._get_proxy_info(scheme, authority)
- conn_key = scheme+":"+authority
+ conn_key = scheme + ":" + authority
if conn_key in self.connections:
conn = self.connections[conn_key]
else:
if not connection_type:
connection_type = SCHEME_TO_CONNECTION[scheme]
certs = list(self.certificates.iter(authority))
- if scheme == 'https':
+ if scheme == "https":
if certs:
conn = self.connections[conn_key] = connection_type(
- authority, key_file=certs[0][0],
- cert_file=certs[0][1], timeout=self.timeout,
- proxy_info=proxy_info,
- ca_certs=self.ca_certs,
- disable_ssl_certificate_validation=
- self.disable_ssl_certificate_validation,
- ssl_version=self.ssl_version)
+ authority,
+ key_file=certs[0][0],
+ cert_file=certs[0][1],
+ timeout=self.timeout,
+ proxy_info=proxy_info,
+ ca_certs=self.ca_certs,
+ disable_ssl_certificate_validation=self.disable_ssl_certificate_validation,
+ ssl_version=self.ssl_version,
+ )
else:
conn = self.connections[conn_key] = connection_type(
- authority, timeout=self.timeout,
- proxy_info=proxy_info,
- ca_certs=self.ca_certs,
- disable_ssl_certificate_validation=
- self.disable_ssl_certificate_validation,
- ssl_version=self.ssl_version)
+ authority,
+ timeout=self.timeout,
+ proxy_info=proxy_info,
+ ca_certs=self.ca_certs,
+ disable_ssl_certificate_validation=self.disable_ssl_certificate_validation,
+ ssl_version=self.ssl_version,
+ )
else:
conn = self.connections[conn_key] = connection_type(
- authority, timeout=self.timeout,
- proxy_info=proxy_info)
+ authority, timeout=self.timeout, proxy_info=proxy_info
+ )
conn.set_debuglevel(debuglevel)
- if 'range' not in headers and 'accept-encoding' not in headers:
- headers['accept-encoding'] = 'gzip, deflate'
+ if "range" not in headers and "accept-encoding" not in headers:
+ headers["accept-encoding"] = "gzip, deflate"
info = email.Message.Message()
cached_value = None
if self.cache:
- cachekey = defrag_uri.encode('utf-8')
+ cachekey = defrag_uri.encode("utf-8")
cached_value = self.cache.get(cachekey)
if cached_value:
# info = email.message_from_string(cached_value)
@@ -1588,7 +1997,7 @@
# to fix the non-existent bug not fixed in this
# bug report: http://mail.python.org/pipermail/python-bugs-list/2005-September/030289.html
try:
- info, content = cached_value.split('\r\n\r\n', 1)
+ info, content = cached_value.split("\r\n\r\n", 1)
feedparser = email.FeedParser.FeedParser()
feedparser.feed(info)
info = feedparser.close()
@@ -1600,9 +2009,15 @@
else:
cachekey = None
- if method in self.optimistic_concurrency_methods and self.cache and 'etag' in info and not self.ignore_etag and 'if-match' not in headers:
+ if (
+ method in self.optimistic_concurrency_methods
+ and self.cache
+ and "etag" in info
+ and not self.ignore_etag
+ and "if-match" not in headers
+ ):
# http://www.w3.org/1999/04/Editing/
- headers['if-match'] = info['etag']
+ headers["if-match"] = info["etag"]
if method not in ["GET", "HEAD"] and self.cache and cachekey:
# RFC 2616 Section 13.10
@@ -1610,24 +2025,36 @@
# Check the vary header in the cache to see if this request
# matches what varies in the cache.
- if method in ['GET', 'HEAD'] and 'vary' in info:
- vary = info['vary']
- vary_headers = vary.lower().replace(' ', '').split(',')
+ if method in ["GET", "HEAD"] and "vary" in info:
+ vary = info["vary"]
+ vary_headers = vary.lower().replace(" ", "").split(",")
for header in vary_headers:
- key = '-varied-%s' % header
+ key = "-varied-%s" % header
value = info[key]
if headers.get(header, None) != value:
cached_value = None
break
- if cached_value and method in ["GET", "HEAD"] and self.cache and 'range' not in headers:
- if '-x-permanent-redirect-url' in info:
+ if (
+ cached_value
+ and method in ["GET", "HEAD"]
+ and self.cache
+ and "range" not in headers
+ ):
+ if "-x-permanent-redirect-url" in info:
# Should cached permanent redirects be counted in our redirection count? For now, yes.
if redirections <= 0:
- raise RedirectLimit("Redirected more times than rediection_limit allows.", {}, "")
+ raise RedirectLimit(
+                        "Redirected more times than redirection_limit allows.",
+ {},
+ "",
+ )
(response, new_content) = self.request(
- info['-x-permanent-redirect-url'], method='GET',
- headers=headers, redirections=redirections - 1)
+ info["-x-permanent-redirect-url"],
+ method="GET",
+ headers=headers,
+ redirections=redirections - 1,
+ )
response.previous = Response(info)
response.previous.fromcache = True
else:
@@ -1643,7 +2070,7 @@
if entry_disposition == "FRESH":
if not cached_value:
- info['status'] = '504'
+ info["status"] = "504"
content = ""
response = Response(info)
if cached_value:
@@ -1651,14 +2078,28 @@
return (response, content)
if entry_disposition == "STALE":
- if 'etag' in info and not self.ignore_etag and not 'if-none-match' in headers:
- headers['if-none-match'] = info['etag']
- if 'last-modified' in info and not 'last-modified' in headers:
- headers['if-modified-since'] = info['last-modified']
+ if (
+ "etag" in info
+ and not self.ignore_etag
+ and not "if-none-match" in headers
+ ):
+ headers["if-none-match"] = info["etag"]
+ if "last-modified" in info and not "last-modified" in headers:
+ headers["if-modified-since"] = info["last-modified"]
elif entry_disposition == "TRANSPARENT":
pass
- (response, new_content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
+ (response, new_content) = self._request(
+ conn,
+ authority,
+ uri,
+ request_uri,
+ method,
+ body,
+ headers,
+ redirections,
+ cachekey,
+ )
if response.status == 304 and method == "GET":
# Rewrite the cache entry with the new end-to-end headers
@@ -1671,7 +2112,9 @@
merged_response = Response(info)
if hasattr(response, "_stale_digest"):
merged_response._stale_digest = response._stale_digest
- _updateCache(headers, merged_response, content, self.cache, cachekey)
+ _updateCache(
+ headers, merged_response, content, self.cache, cachekey
+ )
response = merged_response
response.status = 200
response.fromcache = True
@@ -1683,12 +2126,22 @@
content = new_content
else:
cc = _parse_cache_control(headers)
- if 'only-if-cached' in cc:
- info['status'] = '504'
+ if "only-if-cached" in cc:
+ info["status"] = "504"
response = Response(info)
content = ""
else:
- (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
+ (response, content) = self._request(
+ conn,
+ authority,
+ uri,
+ request_uri,
+ method,
+ body,
+ headers,
+ redirections,
+ cachekey,
+ )
except Exception as e:
if self.force_exception_to_status_code:
if isinstance(e, HttpLib2ErrorWithResponse):
@@ -1698,24 +2151,27 @@
response.reason = str(e)
elif isinstance(e, socket.timeout):
content = "Request Timeout"
- response = Response({
- "content-type": "text/plain",
- "status": "408",
- "content-length": len(content)
- })
+ response = Response(
+ {
+ "content-type": "text/plain",
+ "status": "408",
+ "content-length": len(content),
+ }
+ )
response.reason = "Request Timeout"
else:
content = str(e)
- response = Response({
- "content-type": "text/plain",
- "status": "400",
- "content-length": len(content)
- })
+ response = Response(
+ {
+ "content-type": "text/plain",
+ "status": "400",
+ "content-length": len(content),
+ }
+ )
response.reason = "Bad Request"
else:
raise
-
return (response, content)
def _get_proxy_info(self, scheme, authority):
@@ -1727,8 +2183,7 @@
if callable(proxy_info):
proxy_info = proxy_info(scheme)
- if (hasattr(proxy_info, 'applies_to')
- and not proxy_info.applies_to(hostname)):
+ if hasattr(proxy_info, "applies_to") and not proxy_info.applies_to(hostname):
proxy_info = None
return proxy_info
@@ -1738,13 +2193,14 @@
"""Is this response from our local cache"""
fromcache = False
+ """HTTP protocol version used by server.
- """HTTP protocol version used by server. 10 for HTTP/1.0, 11 for HTTP/1.1. """
+ 10 for HTTP/1.0, 11 for HTTP/1.1.
+ """
version = 11
"Status code returned by server. "
status = 200
-
"""Reason phrase returned by server."""
reason = "Ok"
@@ -1757,21 +2213,21 @@
for key, value in info.getheaders():
self[key.lower()] = value
self.status = info.status
- self['status'] = str(self.status)
+ self["status"] = str(self.status)
self.reason = info.reason
self.version = info.version
elif isinstance(info, email.Message.Message):
for key, value in info.items():
self[key.lower()] = value
- self.status = int(self['status'])
+ self.status = int(self["status"])
else:
for key, value in info.iteritems():
self[key.lower()] = value
- self.status = int(self.get('status', self.status))
- self.reason = self.get('reason', self.reason)
+ self.status = int(self.get("status", self.status))
+ self.reason = self.get("reason", self.reason)
def __getattr__(self, name):
- if name == 'dict':
+ if name == "dict":
return self
else:
raise AttributeError(name)
diff --git a/python2/httplib2/iri2uri.py b/python2/httplib2/iri2uri.py
index d88c91f..0a978a7 100644
--- a/python2/httplib2/iri2uri.py
+++ b/python2/httplib2/iri2uri.py
@@ -1,20 +1,13 @@
-"""
-iri2uri
+"""Converts an IRI to a URI."""
-Converts an IRI to a URI.
-
-"""
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
__contributors__ = []
__version__ = "1.0.0"
__license__ = "MIT"
-__history__ = """
-"""
import urlparse
-
# Convert an IRI to a URI following the rules in RFC 3987
#
# The characters we need to enocde and escape are defined in the spec:
@@ -50,6 +43,7 @@
(0x100000, 0x10FFFD),
]
+
def encode(c):
retval = c
i = ord(c)
@@ -57,7 +51,7 @@
if i < low:
break
if i >= low and i <= high:
- retval = "".join(["%%%2X" % ord(o) for o in c.encode('utf-8')])
+ retval = "".join(["%%%2X" % ord(o) for o in c.encode("utf-8")])
break
return retval
@@ -66,9 +60,9 @@
"""Convert an IRI to a URI. Note that IRIs must be
passed in a unicode strings. That is, do not utf-8 encode
the IRI before passing it into the function."""
- if isinstance(uri ,unicode):
+ if isinstance(uri, unicode):
(scheme, authority, path, query, fragment) = urlparse.urlsplit(uri)
- authority = authority.encode('idna')
+ authority = authority.encode("idna")
# For each character in 'ucschar' or 'iprivate'
# 1. encode as utf-8
# 2. then %-encode each octet of that utf-8
@@ -76,11 +70,11 @@
uri = "".join([encode(c) for c in uri])
return uri
+
if __name__ == "__main__":
import unittest
class Test(unittest.TestCase):
-
def test_uris(self):
"""Test that URIs are invariant under the transformation."""
invariant = [
@@ -91,20 +85,39 @@
u"news:comp.infosystems.www.servers.unix",
u"tel:+1-816-555-1212",
u"telnet://192.0.2.16:80/",
- u"urn:oasis:names:specification:docbook:dtd:xml:4.1.2" ]
+ u"urn:oasis:names:specification:docbook:dtd:xml:4.1.2",
+ ]
for uri in invariant:
self.assertEqual(uri, iri2uri(uri))
def test_iri(self):
- """ Test that the right type of escaping is done for each part of the URI."""
- self.assertEqual("http://xn--o3h.com/%E2%98%84", iri2uri(u"http://\N{COMET}.com/\N{COMET}"))
- self.assertEqual("http://bitworking.org/?fred=%E2%98%84", iri2uri(u"http://bitworking.org/?fred=\N{COMET}"))
- self.assertEqual("http://bitworking.org/#%E2%98%84", iri2uri(u"http://bitworking.org/#\N{COMET}"))
+ """Test that the right type of escaping is done for each part of the URI."""
+ self.assertEqual(
+ "http://xn--o3h.com/%E2%98%84",
+ iri2uri(u"http://\N{COMET}.com/\N{COMET}"),
+ )
+ self.assertEqual(
+ "http://bitworking.org/?fred=%E2%98%84",
+ iri2uri(u"http://bitworking.org/?fred=\N{COMET}"),
+ )
+ self.assertEqual(
+ "http://bitworking.org/#%E2%98%84",
+ iri2uri(u"http://bitworking.org/#\N{COMET}"),
+ )
self.assertEqual("#%E2%98%84", iri2uri(u"#\N{COMET}"))
- self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}"))
- self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}")))
- self.assertNotEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}".encode('utf-8')))
+ self.assertEqual(
+ "/fred?bar=%E2%98%9A#%E2%98%84",
+ iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}"),
+ )
+ self.assertEqual(
+ "/fred?bar=%E2%98%9A#%E2%98%84",
+ iri2uri(iri2uri(u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}")),
+ )
+ self.assertNotEqual(
+ "/fred?bar=%E2%98%9A#%E2%98%84",
+ iri2uri(
+ u"/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}".encode("utf-8")
+ ),
+ )
unittest.main()
-
-
diff --git a/python2/httplib2/socks.py b/python2/httplib2/socks.py
index dbbe511..5cef776 100644
--- a/python2/httplib2/socks.py
+++ b/python2/httplib2/socks.py
@@ -1,4 +1,5 @@
"""SocksiPy - Python SOCKS module.
+
Version 1.00
Copyright 2006 Dan-Haim. All rights reserved.
@@ -24,20 +25,14 @@
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMANGE.
-
This module provides a standard socket-like interface for Python
for tunneling connections through SOCKS proxies.
-"""
-
-"""
-
-Minor modifications made by Christopher Gilbert (http://motomastyle.com/)
-for use in PyLoris (http://pyloris.sourceforge.net/)
+Minor modifications made by Christopher Gilbert (http://motomastyle.com/) for
+use in PyLoris (http://pyloris.sourceforge.net/).
Minor modifications made by Mario Vilas (http://breakingcode.wordpress.com/)
-mainly to merge bug fixes found in Sourceforge
-
+mainly to merge bug fixes found in Sourceforge.
"""
import base64
@@ -45,8 +40,8 @@
import struct
import sys
-if getattr(socket, 'socket', None) is None:
- raise ImportError('socket.socket missing, proxy support unusable')
+if getattr(socket, "socket", None) is None:
+ raise ImportError("socket.socket missing, proxy support unusable")
PROXY_TYPE_SOCKS4 = 1
PROXY_TYPE_SOCKS5 = 2
@@ -56,21 +51,42 @@
_defaultproxy = None
_orgsocket = socket.socket
-class ProxyError(Exception): pass
-class GeneralProxyError(ProxyError): pass
-class Socks5AuthError(ProxyError): pass
-class Socks5Error(ProxyError): pass
-class Socks4Error(ProxyError): pass
-class HTTPError(ProxyError): pass
-_generalerrors = ("success",
+class ProxyError(Exception):
+ pass
+
+
+class GeneralProxyError(ProxyError):
+ pass
+
+
+class Socks5AuthError(ProxyError):
+ pass
+
+
+class Socks5Error(ProxyError):
+ pass
+
+
+class Socks4Error(ProxyError):
+ pass
+
+
+class HTTPError(ProxyError):
+ pass
+
+
+_generalerrors = (
+ "success",
"invalid data",
"not connected",
"not available",
"bad proxy type",
- "bad input")
+ "bad input",
+)
-_socks5errors = ("succeeded",
+_socks5errors = (
+ "succeeded",
"general SOCKS server failure",
"connection not allowed by ruleset",
"Network unreachable",
@@ -79,21 +95,30 @@
"TTL expired",
"Command not supported",
"Address type not supported",
- "Unknown error")
+ "Unknown error",
+)
-_socks5autherrors = ("succeeded",
+_socks5autherrors = (
+ "succeeded",
"authentication is required",
"all offered authentication methods were rejected",
"unknown username or invalid password",
- "unknown error")
+ "unknown error",
+)
-_socks4errors = ("request granted",
+_socks4errors = (
+ "request granted",
"request rejected or failed",
"request rejected because SOCKS server cannot connect to identd on the client",
- "request rejected because the client program and identd report different user-ids",
- "unknown error")
+ "request rejected because the client program and identd report different "
+ "user-ids",
+ "unknown error",
+)
-def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None):
+
+def setdefaultproxy(
+ proxytype=None, addr=None, port=None, rdns=True, username=None, password=None
+):
"""setdefaultproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
Sets a default proxy which all further socksocket objects will use,
unless explicitly changed.
@@ -101,11 +126,14 @@
global _defaultproxy
_defaultproxy = (proxytype, addr, port, rdns, username, password)
+
def wrapmodule(module):
"""wrapmodule(module)
+
Attempts to replace a module's socket library with a SOCKS socket. Must set
a default proxy using setdefaultproxy(...) first.
- This will only work on modules that import socket directly into the namespace;
+ This will only work on modules that import socket directly into the
+ namespace;
most of the Python Standard Library falls into this category.
"""
if _defaultproxy != None:
@@ -113,6 +141,7 @@
else:
raise GeneralProxyError((4, "no proxy specified"))
+
class socksocket(socket.socket):
"""socksocket([family[, type[, proto]]]) -> socket object
Open a SOCKS enabled socket. The parameters are the same as
@@ -120,7 +149,9 @@
you must specify family=AF_INET, type=SOCK_STREAM and proto=0.
"""
- def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None):
+ def __init__(
+ self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None
+ ):
_orgsocket.__init__(self, family, type, proto, _sock)
if _defaultproxy != None:
self.__proxy = _defaultproxy
@@ -137,8 +168,9 @@
"""
data = self.recv(count)
while len(data) < count:
- d = self.recv(count-len(data))
- if not d: raise GeneralProxyError((0, "connection closed unexpectedly"))
+ d = self.recv(count - len(data))
+ if not d:
+ raise GeneralProxyError((0, "connection closed unexpectedly"))
data = data + d
return data
@@ -167,7 +199,7 @@
hdrs.remove(endpt)
host = host.split(" ")[1]
endpt = endpt.split(" ")
- if (self.__proxy[4] != None and self.__proxy[5] != None):
+ if self.__proxy[4] != None and self.__proxy[5] != None:
hdrs.insert(0, self.__getauthheader())
hdrs.insert(0, "Host: %s" % host)
hdrs.insert(0, "%s http://%s%s %s" % (endpt[0], host, endpt[1], endpt[2]))
@@ -177,8 +209,18 @@
auth = self.__proxy[4] + ":" + self.__proxy[5]
return "Proxy-Authorization: Basic " + base64.b64encode(auth)
- def setproxy(self, proxytype=None, addr=None, port=None, rdns=True, username=None, password=None, headers=None):
+ def setproxy(
+ self,
+ proxytype=None,
+ addr=None,
+ port=None,
+ rdns=True,
+ username=None,
+ password=None,
+ headers=None,
+ ):
"""setproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
+
Sets the proxy to be used.
proxytype - The type of the proxy to be used. Three types
are supported: PROXY_TYPE_SOCKS4 (including socks4a),
@@ -193,7 +235,8 @@
The default is no authentication.
password - Password to authenticate with to the server.
Only relevant when username is also provided.
- headers - Additional or modified headers for the proxy connect request.
+ headers - Additional or modified headers for the proxy connect
+ request.
"""
self.__proxy = (proxytype, addr, port, rdns, username, password, headers)
@@ -202,15 +245,15 @@
Negotiates a connection through a SOCKS5 server.
"""
# First we'll send the authentication packages we support.
- if (self.__proxy[4]!=None) and (self.__proxy[5]!=None):
+ if (self.__proxy[4] != None) and (self.__proxy[5] != None):
# The username/password details were supplied to the
# setproxy method so we support the USERNAME/PASSWORD
# authentication (in addition to the standard none).
- self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02))
+ self.sendall(struct.pack("BBBB", 0x05, 0x02, 0x00, 0x02))
else:
# No username/password were entered, therefore we
# only support connections with no authentication.
- self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00))
+ self.sendall(struct.pack("BBB", 0x05, 0x01, 0x00))
# We'll receive the server's response to determine which
# method was selected
chosenauth = self.__recvall(2)
@@ -224,7 +267,13 @@
elif chosenauth[1:2] == chr(0x02).encode():
# Okay, we need to perform a basic username/password
# authentication.
- self.sendall(chr(0x01).encode() + chr(len(self.__proxy[4])) + self.__proxy[4] + chr(len(self.__proxy[5])) + self.__proxy[5])
+ self.sendall(
+ chr(0x01).encode()
+ + chr(len(self.__proxy[4]))
+ + self.__proxy[4]
+ + chr(len(self.__proxy[5]))
+ + self.__proxy[5]
+ )
authstat = self.__recvall(2)
if authstat[0:1] != chr(0x01).encode():
# Bad response
@@ -243,7 +292,7 @@
else:
raise GeneralProxyError((1, _generalerrors[1]))
# Now we can request the actual connection
- req = struct.pack('BBB', 0x05, 0x01, 0x00)
+ req = struct.pack("BBB", 0x05, 0x01, 0x00)
# If the given destination address is an IP address, we'll
# use the IPv4 address request even if remote resolving was specified.
try:
@@ -254,7 +303,12 @@
if self.__proxy[3]:
# Resolve remotely
ipaddr = None
- req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + destaddr.encode()
+ req = (
+ req
+ + chr(0x03).encode()
+ + chr(len(destaddr)).encode()
+ + destaddr.encode()
+ )
else:
# Resolve locally
ipaddr = socket.inet_aton(socket.gethostbyname(destaddr))
@@ -269,7 +323,7 @@
elif resp[1:2] != chr(0x00).encode():
# Connection failed
self.close()
- if ord(resp[1:2])<=8:
+ if ord(resp[1:2]) <= 8:
raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])]))
else:
raise Socks5Error((9, _socks5errors[9]))
@@ -281,7 +335,7 @@
boundaddr = self.__recvall(ord(resp[4:5]))
else:
self.close()
- raise GeneralProxyError((1,_generalerrors[1]))
+ raise GeneralProxyError((1, _generalerrors[1]))
boundport = struct.unpack(">H", self.__recvall(2))[0]
self.__proxysockname = (boundaddr, boundport)
if ipaddr != None:
@@ -308,7 +362,7 @@
"""
return self.__proxypeername
- def __negotiatesocks4(self,destaddr,destport):
+ def __negotiatesocks4(self, destaddr, destport):
"""__negotiatesocks4(self,destaddr,destport)
Negotiates a connection through a SOCKS4 server.
"""
@@ -340,7 +394,7 @@
if resp[0:1] != chr(0x00).encode():
# Bad data
self.close()
- raise GeneralProxyError((1,_generalerrors[1]))
+ raise GeneralProxyError((1, _generalerrors[1]))
if resp[1:2] != chr(0x5A).encode():
# Server returned an error
self.close()
@@ -350,7 +404,10 @@
else:
raise Socks4Error((94, _socks4errors[4]))
# Get the bound address/port
- self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
+ self.__proxysockname = (
+ socket.inet_ntoa(resp[4:]),
+ struct.unpack(">H", resp[2:4])[0],
+ )
if rmtrslv != None:
self.__proxypeername = (socket.inet_ntoa(ipaddr), destport)
else:
@@ -365,18 +422,18 @@
addr = socket.gethostbyname(destaddr)
else:
addr = destaddr
- headers = ["CONNECT ", addr, ":", str(destport), " HTTP/1.1\r\n"]
+ headers = ["CONNECT ", addr, ":", str(destport), " HTTP/1.1\r\n"]
wrote_host_header = False
wrote_auth_header = False
if self.__proxy[6] != None:
for key, val in self.__proxy[6].iteritems():
headers += [key, ": ", val, "\r\n"]
- wrote_host_header = (key.lower() == "host")
- wrote_auth_header = (key.lower() == "proxy-authorization")
+ wrote_host_header = key.lower() == "host"
+ wrote_auth_header = key.lower() == "proxy-authorization"
if not wrote_host_header:
headers += ["Host: ", destaddr, "\r\n"]
if not wrote_auth_header:
- if (self.__proxy[4] != None and self.__proxy[5] != None):
+ if self.__proxy[4] != None and self.__proxy[5] != None:
headers += [self.__getauthheader(), "\r\n"]
headers.append("\r\n")
self.sendall("".join(headers).encode())
@@ -409,7 +466,12 @@
To select the proxy server use setproxy().
"""
# Do a minimal input check first
- if (not type(destpair) in (list,tuple)) or (len(destpair) < 2) or (not isinstance(destpair[0], basestring)) or (type(destpair[1]) != int):
+ if (
+ (not type(destpair) in (list, tuple))
+ or (len(destpair) < 2)
+ or (not isinstance(destpair[0], basestring))
+ or (type(destpair[1]) != int)
+ ):
raise GeneralProxyError((5, _generalerrors[5]))
if self.__proxy[0] == PROXY_TYPE_SOCKS5:
if self.__proxy[2] != None:
@@ -423,23 +485,23 @@
portnum = self.__proxy[2]
else:
portnum = 1080
- _orgsocket.connect(self,(self.__proxy[1], portnum))
+ _orgsocket.connect(self, (self.__proxy[1], portnum))
self.__negotiatesocks4(destpair[0], destpair[1])
elif self.__proxy[0] == PROXY_TYPE_HTTP:
if self.__proxy[2] != None:
portnum = self.__proxy[2]
else:
portnum = 8080
- _orgsocket.connect(self,(self.__proxy[1], portnum))
+ _orgsocket.connect(self, (self.__proxy[1], portnum))
self.__negotiatehttp(destpair[0], destpair[1])
elif self.__proxy[0] == PROXY_TYPE_HTTP_NO_TUNNEL:
if self.__proxy[2] != None:
portnum = self.__proxy[2]
else:
portnum = 8080
- _orgsocket.connect(self,(self.__proxy[1],portnum))
+ _orgsocket.connect(self, (self.__proxy[1], portnum))
if destpair[1] == 443:
- self.__negotiatehttp(destpair[0],destpair[1])
+ self.__negotiatehttp(destpair[0], destpair[1])
else:
self.__httptunnel = False
elif self.__proxy[0] == None:
diff --git a/python2/httplib2/test/functional/test_proxies.py b/python2/httplib2/test/functional/test_proxies.py
index e11369d..939140d 100644
--- a/python2/httplib2/test/functional/test_proxies.py
+++ b/python2/httplib2/test/functional/test_proxies.py
@@ -27,35 +27,35 @@
class FunctionalProxyHttpTest(unittest.TestCase):
def setUp(self):
if not socks:
- raise nose.SkipTest('socks module unavailable')
+ raise nose.SkipTest("socks module unavailable")
if not subprocess:
- raise nose.SkipTest('subprocess module unavailable')
+ raise nose.SkipTest("subprocess module unavailable")
# start a short-lived miniserver so we can get a likely port
# for the proxy
- self.httpd, self.proxyport = miniserver.start_server(
- miniserver.ThisDirHandler)
+ self.httpd, self.proxyport = miniserver.start_server(miniserver.ThisDirHandler)
self.httpd.shutdown()
- self.httpd, self.port = miniserver.start_server(
- miniserver.ThisDirHandler)
+ self.httpd, self.port = miniserver.start_server(miniserver.ThisDirHandler)
self.pidfile = tempfile.mktemp()
self.logfile = tempfile.mktemp()
fd, self.conffile = tempfile.mkstemp()
- f = os.fdopen(fd, 'w')
- our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
- 'pidfile': self.pidfile,
- 'port': self.proxyport,
- 'logfile': self.logfile}
+ f = os.fdopen(fd, "w")
+ our_cfg = tinyproxy_cfg % {
+ "user": os.getlogin(),
+ "pidfile": self.pidfile,
+ "port": self.proxyport,
+ "logfile": self.logfile,
+ }
f.write(our_cfg)
f.close()
try:
# TODO use subprocess.check_call when 2.4 is dropped
- ret = subprocess.call(['tinyproxy', '-c', self.conffile])
+ ret = subprocess.call(["tinyproxy", "-c", self.conffile])
self.assertEqual(0, ret)
except OSError as e:
if e.errno == errno.ENOENT:
- raise nose.SkipTest('tinyproxy not available')
+ raise nose.SkipTest("tinyproxy not available")
raise
def tearDown(self):
@@ -65,25 +65,23 @@
os.kill(pid, signal.SIGTERM)
except OSError as e:
if e.errno == errno.ESRCH:
- print('\n\n\nTinyProxy Failed to start, log follows:')
+ print("\n\n\nTinyProxy Failed to start, log follows:")
print(open(self.logfile).read())
- print('end tinyproxy log\n\n\n')
+ print("end tinyproxy log\n\n\n")
raise
- map(os.unlink, (self.pidfile,
- self.logfile,
- self.conffile))
+ map(os.unlink, (self.pidfile, self.logfile, self.conffile))
def testSimpleProxy(self):
- proxy_info = httplib2.ProxyInfo(socks.PROXY_TYPE_HTTP,
- 'localhost', self.proxyport)
+ proxy_info = httplib2.ProxyInfo(
+ socks.PROXY_TYPE_HTTP, "localhost", self.proxyport
+ )
client = httplib2.Http(proxy_info=proxy_info)
- src = 'miniserver.py'
- response, body = client.request('http://localhost:%d/%s' %
- (self.port, src))
+ src = "miniserver.py"
+ response, body = client.request("http://localhost:%d/%s" % (self.port, src))
self.assertEqual(response.status, 200)
self.assertEqual(body, open(os.path.join(miniserver.HERE, src)).read())
lf = open(self.logfile).read()
- expect = ('Established connection to host "127.0.0.1" '
- 'using file descriptor')
- self.assertTrue(expect in lf,
- 'tinyproxy did not proxy a request for miniserver')
+ expect = 'Established connection to host "127.0.0.1" ' "using file descriptor"
+ self.assertTrue(
+ expect in lf, "tinyproxy did not proxy a request for miniserver"
+ )
diff --git a/python2/httplib2/test/miniserver.py b/python2/httplib2/test/miniserver.py
index f72ecca..47c3ee5 100644
--- a/python2/httplib2/test/miniserver.py
+++ b/python2/httplib2/test/miniserver.py
@@ -12,8 +12,8 @@
class ThisDirHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def translate_path(self, path):
- path = path.split('?', 1)[0].split('#', 1)[0]
- return os.path.join(HERE, *filter(None, path.split('/')))
+ path = path.split("?", 1)[0].split("#", 1)[0]
+ return os.path.join(HERE, *filter(None, path.split("/")))
def log_message(self, s, *args):
# output via logging so nose can catch it
@@ -38,12 +38,13 @@
SocketServer.TCPServer.server_bind(self)
if self.__use_tls:
import ssl
- self.socket = ssl.wrap_socket(self.socket,
- os.path.join(os.path.dirname(__file__), 'server.key'),
- os.path.join(os.path.dirname(__file__), 'server.pem'),
- True
- )
+ self.socket = ssl.wrap_socket(
+ self.socket,
+ os.path.join(os.path.dirname(__file__), "server.key"),
+ os.path.join(os.path.dirname(__file__), "server.pem"),
+ True,
+ )
def serve_forever(self, poll_interval=0.1):
"""Handle one request at a time until shutdown.
diff --git a/python2/httplib2/test/smoke_test.py b/python2/httplib2/test/smoke_test.py
index 9f1e6f0..25e9cf2 100644
--- a/python2/httplib2/test/smoke_test.py
+++ b/python2/httplib2/test/smoke_test.py
@@ -8,16 +8,14 @@
class HttpSmokeTest(unittest.TestCase):
def setUp(self):
- self.httpd, self.port = miniserver.start_server(
- miniserver.ThisDirHandler)
+ self.httpd, self.port = miniserver.start_server(miniserver.ThisDirHandler)
def tearDown(self):
self.httpd.shutdown()
def testGetFile(self):
client = httplib2.Http()
- src = 'miniserver.py'
- response, body = client.request('http://localhost:%d/%s' %
- (self.port, src))
+ src = "miniserver.py"
+ response, body = client.request("http://localhost:%d/%s" % (self.port, src))
self.assertEqual(response.status, 200)
self.assertEqual(body, open(os.path.join(miniserver.HERE, src)).read())
diff --git a/python2/httplib2/test/test_no_socket.py b/python2/httplib2/test/test_no_socket.py
index 66ba056..d251cbc 100644
--- a/python2/httplib2/test/test_no_socket.py
+++ b/python2/httplib2/test/test_no_socket.py
@@ -8,6 +8,7 @@
import httplib2
+
class MissingSocketTest(unittest.TestCase):
def setUp(self):
self._oldsocks = httplib2.socks
@@ -17,8 +18,8 @@
httplib2.socks = self._oldsocks
def testProxyDisabled(self):
- proxy_info = httplib2.ProxyInfo('blah',
- 'localhost', 0)
+ proxy_info = httplib2.ProxyInfo("blah", "localhost", 0)
client = httplib2.Http(proxy_info=proxy_info)
- self.assertRaises(httplib2.ProxiesUnavailableError,
- client.request, 'http://localhost:-1/')
+ self.assertRaises(
+ httplib2.ProxiesUnavailableError, client.request, "http://localhost:-1/"
+ )
diff --git a/python2/httplib2/test/test_ssl_context.py b/python2/httplib2/test/test_ssl_context.py
index 5cf9efb..43504dc 100644
--- a/python2/httplib2/test/test_ssl_context.py
+++ b/python2/httplib2/test/test_ssl_context.py
@@ -10,15 +10,14 @@
import httplib2
from httplib2.test import miniserver
-
logger = logging.getLogger(__name__)
class KeepAliveHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ """Request handler that keeps the HTTP connection open, so that the test can inspect the resulting SSL connection object
+
"""
- Request handler that keeps the HTTP connection open, so that the test can
- inspect the resulting SSL connection object
- """
+
def do_GET(self):
self.send_response(200)
self.send_header("Content-Length", "0")
@@ -40,7 +39,7 @@
else:
return
- self.ca_certs_path = os.path.join(os.path.dirname(__file__), 'server.pem')
+ self.ca_certs_path = os.path.join(os.path.dirname(__file__), "server.pem")
self.httpd, self.port = miniserver.start_server(KeepAliveHandler, True)
def tearDown(self):
@@ -50,16 +49,16 @@
client = httplib2.Http(ca_certs=self.ca_certs_path)
# Establish connection to local server
- client.request('https://localhost:%d/' % (self.port))
+ client.request("https://localhost:%d/" % (self.port))
# Verify that connection uses a TLS context with the correct hostname
- conn = client.connections['https:localhost:%d' % self.port]
+ conn = client.connections["https:localhost:%d" % self.port]
self.assertIsInstance(conn.sock, ssl.SSLSocket)
- self.assertTrue(hasattr(conn.sock, 'context'))
+ self.assertTrue(hasattr(conn.sock, "context"))
self.assertIsInstance(conn.sock.context, ssl.SSLContext)
self.assertTrue(conn.sock.context.check_hostname)
- self.assertEqual(conn.sock.server_hostname, 'localhost')
+ self.assertEqual(conn.sock.server_hostname, "localhost")
self.assertEqual(conn.sock.context.verify_mode, ssl.CERT_REQUIRED)
self.assertEqual(conn.sock.context.protocol, ssl.PROTOCOL_SSLv23)
@@ -72,15 +71,15 @@
# which was also added to original patch.
# url host is intentionally different, we provoke ssl hostname mismatch error
- url = 'https://127.0.0.1:%d/' % (self.port,)
+ url = "https://127.0.0.1:%d/" % (self.port,)
http = httplib2.Http(ca_certs=self.ca_certs_path, proxy_info=None)
def once():
try:
http.request(url)
- assert False, 'expected certificate hostname mismatch error'
+ assert False, "expected certificate hostname mismatch error"
except Exception as e:
- print('%s errno=%s' % (repr(e), getattr(e, 'errno', None)))
+ print("%s errno=%s" % (repr(e), getattr(e, "errno", None)))
once()
once()
diff --git a/python2/httplib2test.py b/python2/httplib2test.py
index 82faabc..3999622 100755
--- a/python2/httplib2test.py
+++ b/python2/httplib2test.py
@@ -1,27 +1,19 @@
#!/usr/bin/env python2.4
-"""
-httplib2test
-
-A set of unit tests for httplib2.py.
-
-Requires Python 2.4 or later
-"""
+"""A set of unit tests for httplib2.py."""
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
__contributors__ = []
__license__ = "MIT"
-__history__ = """ """
__version__ = "0.1 ($Rev: 118 $)"
-
-import StringIO
import base64
import httplib
import httplib2
import os
import pickle
import socket
+import StringIO
import sys
import time
import unittest
@@ -33,13 +25,13 @@
pass
# Python 2.3 support
-if not hasattr(unittest.TestCase, 'assertTrue'):
+if not hasattr(unittest.TestCase, "assertTrue"):
unittest.TestCase.assertTrue = unittest.TestCase.failUnless
unittest.TestCase.assertFalse = unittest.TestCase.failIf
# The test resources base uri
-base = 'http://bitworking.org/projects/httplib2/test/'
-#base = 'http://localhost/projects/httplib2/test/'
+base = "http://bitworking.org/projects/httplib2/test/"
+# base = 'http://localhost/projects/httplib2/test/'
cacheDirName = ".cache"
@@ -64,49 +56,109 @@
class ParserTest(unittest.TestCase):
def testFromStd66(self):
- self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
- self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
- self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
- self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
- self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
- self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
- self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
- self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
+ self.assertEqual(
+ ("http", "example.com", "", None, None),
+ httplib2.parse_uri("http://example.com"),
+ )
+ self.assertEqual(
+ ("https", "example.com", "", None, None),
+ httplib2.parse_uri("https://example.com"),
+ )
+ self.assertEqual(
+ ("https", "example.com:8080", "", None, None),
+ httplib2.parse_uri("https://example.com:8080"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/", None, None),
+ httplib2.parse_uri("http://example.com/"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", None, None),
+ httplib2.parse_uri("http://example.com/path"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", "a=1&b=2", None),
+ httplib2.parse_uri("http://example.com/path?a=1&b=2"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", "a=1&b=2", "fred"),
+ httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", "a=1&b=2", "fred"),
+ httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
+ )
class UrlNormTest(unittest.TestCase):
def test(self):
- self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
- self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
- self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
- self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
- self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
- self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
+ self.assertEqual(
+ "http://example.org/", httplib2.urlnorm("http://example.org")[-1]
+ )
+ self.assertEqual(
+ "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1]
+ )
+ self.assertEqual(
+ "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1]
+ )
+ self.assertEqual(
+ "http://example.org/mypath?a=b",
+ httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1],
+ )
+ self.assertEqual(
+ "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1]
+ )
+ self.assertEqual(
+ httplib2.urlnorm("http://localhost:80/"),
+ httplib2.urlnorm("HTTP://LOCALHOST:80"),
+ )
try:
httplib2.urlnorm("/")
self.fail("Non-absolute URIs should raise an exception")
except httplib2.RelativeURIError:
pass
+
class UrlSafenameTest(unittest.TestCase):
def test(self):
# Test that different URIs end up generating different safe names
- self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
- self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
- self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
- self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
- self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
- self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
+ self.assertEqual(
+ "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
+ httplib2.safename("http://example.org/fred/?a=b"),
+ )
+ self.assertEqual(
+ "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
+ httplib2.safename("http://example.org/fred?/a=b"),
+ )
+ self.assertEqual(
+ "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
+ httplib2.safename("http://www.example.org/fred?/a=b"),
+ )
+ self.assertEqual(
+ httplib2.safename(httplib2.urlnorm("http://www")[-1]),
+ httplib2.safename(httplib2.urlnorm("http://WWW")[-1]),
+ )
+ self.assertEqual(
+ "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
+ httplib2.safename("https://www.example.org/fred?/a=b"),
+ )
+ self.assertNotEqual(
+ httplib2.safename("http://www"), httplib2.safename("https://www")
+ )
# Test the max length limits
uri = "http://" + ("w" * 200) + ".org"
uri2 = "http://" + ("w" * 201) + ".org"
- self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
+ self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
# Max length should be 200 + 1 (",") + 32
self.assertEqual(233, len(httplib2.safename(uri2)))
self.assertEqual(233, len(httplib2.safename(uri)))
# Unicode
- if sys.version_info >= (2,3):
- self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename(u"http://\u2304.org/fred/?a=b"))
+ if sys.version_info >= (2, 3):
+ self.assertEqual(
+ "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
+ httplib2.safename(u"http://\u2304.org/fred/?a=b"),
+ )
+
class _MyResponse(StringIO.StringIO):
def __init__(self, body, **kwargs):
@@ -120,8 +172,16 @@
class _MyHTTPConnection(object):
"This class is just a mock of httplib.HTTPConnection used for testing"
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None, timeout=None, proxy_info=None):
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ strict=None,
+ timeout=None,
+ proxy_info=None,
+ ):
self.host = host
self.port = port
self.timeout = timeout
@@ -144,13 +204,22 @@
def getresponse(self):
return _MyResponse("the body", status="200")
+
class _MyHTTPBadStatusConnection(object):
"Mock of httplib.HTTPConnection that raises BadStatusLine."
num_calls = 0
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None, timeout=None, proxy_info=None):
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ strict=None,
+ timeout=None,
+ proxy_info=None,
+ ):
self.host = host
self.port = port
self.timeout = timeout
@@ -178,43 +247,48 @@
class HttpTest(unittest.TestCase):
def setUp(self):
if os.path.exists(cacheDirName):
- [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
+ [
+ os.remove(os.path.join(cacheDirName, file))
+ for file in os.listdir(cacheDirName)
+ ]
if sys.version_info < (2, 6):
disable_cert_validation = True
else:
disable_cert_validation = False
self.http = httplib2.Http(
- cacheDirName,
- disable_ssl_certificate_validation=disable_cert_validation)
+ cacheDirName, disable_ssl_certificate_validation=disable_cert_validation
+ )
self.http.clear_credentials()
def testIPv6NoSSL(self):
try:
- self.http.request("http://[::1]/")
+ self.http.request("http://[::1]/")
except socket.gaierror:
- self.fail("should get the address family right for IPv6")
+ self.fail("should get the address family right for IPv6")
except socket.error:
- # Even if IPv6 isn't installed on a machine it should just raise socket.error
- pass
+ # Even if IPv6 isn't installed on a machine it should just raise socket.error
+ pass
def testIPv6SSL(self):
try:
- self.http.request("https://[::1]/")
+ self.http.request("https://[::1]/")
except socket.gaierror:
- self.fail("should get the address family right for IPv6")
+ self.fail("should get the address family right for IPv6")
except httplib2.CertificateHostnameMismatch:
- # We connected and verified that the certificate doesn't match
- # the name. Good enough.
- pass
+ # We connected and verified that the certificate doesn't match
+ # the name. Good enough.
+ pass
except socket.error:
- # Even if IPv6 isn't installed on a machine it should just raise socket.error
- pass
+ # Even if IPv6 isn't installed on a machine it should just raise socket.error
+ pass
def testConnectionType(self):
self.http.force_exception_to_status_code = False
- response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
- self.assertEqual(response['content-location'], "http://bitworking.org")
+ response, content = self.http.request(
+ "http://bitworking.org", connection_type=_MyHTTPConnection
+ )
+ self.assertEqual(response["content-location"], "http://bitworking.org")
self.assertEqual(content, "the body")
def testBadStatusLineRetry(self):
@@ -222,8 +296,9 @@
httplib2.RETRIES = 1
self.http.force_exception_to_status_code = False
try:
- response, content = self.http.request("http://bitworking.org",
- connection_type=_MyHTTPBadStatusConnection)
+ response, content = self.http.request(
+ "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection
+ )
except httplib.BadStatusLine:
self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
httplib2.RETRIES = old_retries
@@ -232,7 +307,9 @@
self.http.force_exception_to_status_code = False
try:
self.http.request("http://fred.bitworking.org/")
- self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
+ self.fail(
+ "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server."
+ )
except httplib2.ServerNotFoundError:
pass
@@ -240,15 +317,15 @@
self.http.force_exception_to_status_code = True
(response, content) = self.http.request("http://fred.bitworking.org/")
- self.assertEqual(response['content-type'], 'text/plain')
+ self.assertEqual(response["content-type"], "text/plain")
self.assertTrue(content.startswith("Unable to find"))
self.assertEqual(response.status, 400)
def testGetConnectionRefused(self):
self.http.force_exception_to_status_code = False
try:
- self.http.request("http://localhost:7777/")
- self.fail("An socket.error exception must be thrown on Connection Refused.")
+ self.http.request("http://localhost:7777/")
+ self.fail("An socket.error exception must be thrown on Connection Refused.")
except socket.error:
pass
@@ -256,32 +333,35 @@
self.http.force_exception_to_status_code = True
(response, content) = self.http.request("http://localhost:7777/")
- self.assertEqual(response['content-type'], 'text/plain')
- self.assertTrue("Connection refused" in content
- or "actively refused" in content,
- "Unexpected status %(content)s" % vars())
+ self.assertEqual(response["content-type"], "text/plain")
+ self.assertTrue(
+ "Connection refused" in content or "actively refused" in content,
+ "Unexpected status %(content)s" % vars(),
+ )
self.assertEqual(response.status, 400)
def testGetIRI(self):
- if sys.version_info >= (2,3):
- uri = urlparse.urljoin(base, u"reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
+ if sys.version_info >= (2, 3):
+ uri = urlparse.urljoin(
+ base, u"reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}"
+ )
(response, content) = self.http.request(uri, "GET")
d = self.reflector(content)
- self.assertTrue('QUERY_STRING' in d)
- self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
+ self.assertTrue("QUERY_STRING" in d)
+ self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0)
def testGetIsDefaultMethod(self):
# Test that GET is the default method
uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
(response, content) = self.http.request(uri)
- self.assertEqual(response['x-method'], "GET")
+ self.assertEqual(response["x-method"], "GET")
def testDifferentMethods(self):
# Test that all methods can be used
uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
for method in ["GET", "PUT", "DELETE", "POST"]:
(response, content) = self.http.request(uri, method, body=" ")
- self.assertEqual(response['x-method'], method)
+ self.assertEqual(response["x-method"], method)
def testHeadRead(self):
# Test that we don't try to read the response of a HEAD request
@@ -305,14 +385,18 @@
# Test that can do a GET with cache and 'only-if-cached'
uri = urlparse.urljoin(base, "304/test_etag.txt")
(response, content) = self.http.request(uri, "GET")
- (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "only-if-cached"}
+ )
self.assertEqual(response.fromcache, True)
self.assertEqual(response.status, 200)
def testGetOnlyIfCachedCacheMiss(self):
# Test that can do a GET with no cache with 'only-if-cached'
uri = urlparse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "only-if-cached"}
+ )
self.assertEqual(response.fromcache, False)
self.assertEqual(response.status, 504)
@@ -323,7 +407,9 @@
# test can't really be guaranteed to pass.
http = httplib2.Http()
uri = urlparse.urljoin(base, "304/test_etag.txt")
- (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
+ (response, content) = http.request(
+ uri, "GET", headers={"cache-control": "only-if-cached"}
+ )
self.assertEqual(response.fromcache, False)
self.assertEqual(response.status, 504)
@@ -338,7 +424,9 @@
# Test that the default user-agent can be over-ridden
uri = urlparse.urljoin(base, "user-agent/test.cgi")
- (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"User-Agent": "fred/1.0"}
+ )
self.assertEqual(response.status, 200)
self.assertTrue(content.startswith("fred/1.0"))
@@ -371,7 +459,7 @@
uri = urlparse.urljoin(base, "300/without-location-header.asis")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 300)
- self.assertTrue(response['content-type'].startswith("text/html"))
+ self.assertTrue(response["content-type"].startswith("text/html"))
self.assertEqual(response.previous, None)
def testGet301(self):
@@ -381,15 +469,15 @@
destination = urlparse.urljoin(base, "302/final-destination.txt")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
- self.assertTrue('content-location' in response)
- self.assertEqual(response['content-location'], destination)
+ self.assertTrue("content-location" in response)
+ self.assertEqual(response["content-location"], destination)
self.assertEqual(content, "This is the final destination.\n")
self.assertEqual(response.previous.status, 301)
self.assertEqual(response.previous.fromcache, False)
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
- self.assertEqual(response['content-location'], destination)
+ self.assertEqual(response["content-location"], destination)
self.assertEqual(content, "This is the final destination.\n")
self.assertEqual(response.previous.status, 301)
self.assertEqual(response.previous.fromcache, True)
@@ -412,7 +500,6 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 301)
-
def testGet302(self):
# Test that we automatically follow 302 redirects
# and that we DO NOT cache the 302 response
@@ -420,7 +507,7 @@
destination = urlparse.urljoin(base, "302/final-destination.txt")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
- self.assertEqual(response['content-location'], destination)
+ self.assertEqual(response["content-location"], destination)
self.assertEqual(content, "This is the final destination.\n")
self.assertEqual(response.previous.status, 302)
self.assertEqual(response.previous.fromcache, False)
@@ -429,11 +516,11 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, True)
- self.assertEqual(response['content-location'], destination)
+ self.assertEqual(response["content-location"], destination)
self.assertEqual(content, "This is the final destination.\n")
self.assertEqual(response.previous.status, 302)
self.assertEqual(response.previous.fromcache, False)
- self.assertEqual(response.previous['content-location'], uri)
+ self.assertEqual(response.previous["content-location"], uri)
uri = urlparse.urljoin(base, "302/twostep.asis")
@@ -452,7 +539,7 @@
uri = urlparse.urljoin(base, "302/twostep.asis")
try:
- (response, content) = self.http.request(uri, "GET", redirections = 1)
+ (response, content) = self.http.request(uri, "GET", redirections=1)
self.fail("This should not happen")
except httplib2.RedirectLimit:
pass
@@ -462,10 +549,10 @@
# Re-run the test with out the exceptions
self.http.force_exception_to_status_code = True
- (response, content) = self.http.request(uri, "GET", redirections = 1)
+ (response, content) = self.http.request(uri, "GET", redirections=1)
self.assertEqual(response.status, 500)
self.assertTrue(response.reason.startswith("Redirected more"))
- self.assertEqual("302", response['status'])
+ self.assertEqual("302", response["status"])
self.assertTrue(content.startswith("<html>"))
self.assertTrue(response.previous != None)
@@ -488,7 +575,7 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 500)
self.assertTrue(response.reason.startswith("Redirected but"))
- self.assertEqual("302", response['status'])
+ self.assertEqual("302", response["status"])
self.assertTrue(content.startswith("This is content"))
def testGet301ViaHttps(self):
@@ -499,7 +586,9 @@
def testGetViaHttps(self):
# Test that we can handle HTTPS
- (response, content) = self.http.request("https://www.google.com/adsense/", "GET")
+ (response, content) = self.http.request(
+ "https://www.google.com/adsense/", "GET"
+ )
self.assertEqual(200, response.status)
def testGetViaHttpsSpecViolationOnLocation(self):
@@ -514,30 +603,34 @@
def testSslCertValidationDoubleDots(self):
pass
# No longer a valid test.
- #if sys.version_info >= (2, 6):
+ # if sys.version_info >= (2, 6):
# Test that we get match a double dot cert
- #try:
+ # try:
# self.http.request("https://www.appspot.com/", "GET")
- #except httplib2.CertificateHostnameMismatch:
+ # except httplib2.CertificateHostnameMismatch:
# self.fail('cert with *.*.appspot.com should not raise an exception.')
def testSslHostnameValidation(self):
- pass
+ pass
# No longer a valid test.
- #if sys.version_info >= (2, 6):
- # The SSL server at google.com:443 returns a certificate for
- # 'www.google.com', which results in a host name mismatch.
- # Note that this test only works because the ssl module and httplib2
- # do not support SNI; for requests specifying a server name of
- # 'google.com' via SNI, a matching cert would be returned.
+ # if sys.version_info >= (2, 6):
+ # The SSL server at google.com:443 returns a certificate for
+ # 'www.google.com', which results in a host name mismatch.
+ # Note that this test only works because the ssl module and httplib2
+ # do not support SNI; for requests specifying a server name of
+ # 'google.com' via SNI, a matching cert would be returned.
# self.assertRaises(httplib2.CertificateHostnameMismatch,
# self.http.request, "https://google.com/", "GET")
def testSslCertValidationWithoutSslModuleFails(self):
if sys.version_info < (2, 6):
http = httplib2.Http(disable_ssl_certificate_validation=False)
- self.assertRaises(httplib2.CertificateValidationUnsupported,
- http.request, "https://www.google.com/", "GET")
+ self.assertRaises(
+ httplib2.CertificateValidationUnsupported,
+ http.request,
+ "https://www.google.com/",
+ "GET",
+ )
def testGetViaHttpsKeyCert(self):
# At this point I can only test
@@ -555,17 +648,20 @@
except:
pass
self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
- self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
+ self.assertEqual(
+ http.connections["https:bitworking.org"].cert_file, "acertfile"
+ )
try:
(response, content) = http.request("https://notthere.bitworking.org", "GET")
except:
pass
- self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
- self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
-
-
-
+ self.assertEqual(
+ http.connections["https:notthere.bitworking.org"].key_file, None
+ )
+ self.assertEqual(
+ http.connections["https:notthere.bitworking.org"].cert_file, None
+ )
def testGet303(self):
# Do a follow-up GET on a Location: header
@@ -587,36 +683,46 @@
def test303ForDifferentMethods(self):
# Test that all methods can be used
uri = urlparse.urljoin(base, "303/redirect-to-reflector.cgi")
- for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
+ for (method, method_on_303) in [
+ ("PUT", "GET"),
+ ("DELETE", "GET"),
+ ("POST", "GET"),
+ ("GET", "GET"),
+ ("HEAD", "GET"),
+ ]:
(response, content) = self.http.request(uri, method, body=" ")
- self.assertEqual(response['x-method'], method_on_303)
+ self.assertEqual(response["x-method"], method_on_303)
def test303AndForwardAuthorizationHeader(self):
# Test that all methods can be used
uri = urlparse.urljoin(base, "303/redirect-to-header-reflector.cgi")
- headers = {'authorization': 'Bearer foo'}
- response, content = self.http.request(uri, 'GET', body=" ",
- headers=headers)
+ headers = {"authorization": "Bearer foo"}
+ response, content = self.http.request(uri, "GET", body=" ", headers=headers)
# self.assertTrue('authorization' not in content)
self.http.follow_all_redirects = True
self.http.forward_authorization_headers = True
- response, content = self.http.request(uri, 'GET', body=" ",
- headers=headers)
+ response, content = self.http.request(uri, "GET", body=" ", headers=headers)
# Oh, how I wish Apache didn't eat the Authorization header.
# self.assertTrue('authorization' in content)
def testGet304(self):
# Test that we use ETags properly to validate our cache
uri = urlparse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers= {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
(response, content) = self.http.request(uri, "GET")
- (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "must-revalidate"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, True)
- cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
+ cache_file_name = os.path.join(
+ cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1])
+ )
f = open(cache_file_name, "r")
status_line = f.readline()
f.close()
@@ -627,57 +733,83 @@
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"range": "bytes=0-0"}
+ )
self.assertEqual(response.status, 206)
self.assertEqual(response.fromcache, False)
def testGetIgnoreEtag(self):
# Test that we can forcibly ignore ETags
uri = urlparse.urljoin(base, "reflector/reflector.cgi")
- (response, content) = self.http.request(uri, "GET", headers= {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
+ )
d = self.reflector(content)
- self.assertTrue('HTTP_IF_NONE_MATCH' in d)
+ self.assertTrue("HTTP_IF_NONE_MATCH" in d)
self.http.ignore_etag = True
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
+ )
d = self.reflector(content)
self.assertEqual(response.fromcache, False)
- self.assertFalse('HTTP_IF_NONE_MATCH' in d)
+ self.assertFalse("HTTP_IF_NONE_MATCH" in d)
def testOverrideEtag(self):
# Test that we can forcibly ignore ETags
uri = urlparse.urljoin(base, "reflector/reflector.cgi")
- (response, content) = self.http.request(uri, "GET", headers= {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
+ )
d = self.reflector(content)
- self.assertTrue('HTTP_IF_NONE_MATCH' in d)
- self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
+ self.assertTrue("HTTP_IF_NONE_MATCH" in d)
+ self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0', 'if-none-match': 'fred'})
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={
+ "accept-encoding": "identity",
+ "cache-control": "max-age=0",
+ "if-none-match": "fred",
+ },
+ )
d = self.reflector(content)
- self.assertTrue('HTTP_IF_NONE_MATCH' in d)
- self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
+ self.assertTrue("HTTP_IF_NONE_MATCH" in d)
+ self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred")
-#MAP-commented this out because it consistently fails
-# def testGet304EndToEnd(self):
-# # Test that end to end headers get overwritten in the cache
-# uri = urlparse.urljoin(base, "304/end2end.cgi")
-# (response, content) = self.http.request(uri, "GET")
-# self.assertNotEqual(response['etag'], "")
-# old_date = response['date']
-# time.sleep(2)
-#
-# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
-# # The response should be from the cache, but the Date: header should be updated.
-# new_date = response['date']
-# self.assertNotEqual(new_date, old_date)
-# self.assertEqual(response.status, 200)
-# self.assertEqual(response.fromcache, True)
+ # MAP-commented this out because it consistently fails
+ # def testGet304EndToEnd(self):
+ # # Test that end to end headers get overwritten in the cache
+ # uri = urlparse.urljoin(base, "304/end2end.cgi")
+ # (response, content) = self.http.request(uri, "GET")
+ # self.assertNotEqual(response['etag'], "")
+ # old_date = response['date']
+ # time.sleep(2)
+ #
+ # (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
+ # # The response should be from the cache, but the Date: header should be updated.
+ # new_date = response['date']
+ # self.assertNotEqual(new_date, old_date)
+ # self.assertEqual(response.status, 200)
+ # self.assertEqual(response.fromcache, True)
def testGet304LastModified(self):
# Test that we can still handle a 304
@@ -685,7 +817,7 @@
uri = urlparse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
(response, content) = self.http.request(uri, "GET")
- self.assertNotEqual(response['last-modified'], "")
+ self.assertNotEqual(response["last-modified"], "")
(response, content) = self.http.request(uri, "GET")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
@@ -715,29 +847,29 @@
self.assertEqual(response.status, 410)
def testVaryHeaderSimple(self):
- """
- RFC 2616 13.6
- When the cache receives a subsequent request whose Request-URI
- specifies one or more cache entries including a Vary header field,
- the cache MUST NOT use such a cache entry to construct a response
- to the new request unless all of the selecting request-headers
- present in the new request match the corresponding stored
- request-headers in the original request.
+ """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request.
+
"""
# test that the vary header is sent
uri = urlparse.urljoin(base, "vary/accept.asis")
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
self.assertEqual(response.status, 200)
- self.assertTrue('vary' in response)
+ self.assertTrue("vary" in response)
# get the resource again, from the cache since accept header in this
# request is the same as the request
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, True, msg="Should be from cache")
# get the resource again, not from cache since Accept headers does not match
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/html"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False, msg="Should not be from cache")
@@ -766,45 +898,62 @@
def testVaryHeaderDouble(self):
uri = urlparse.urljoin(base, "vary/accept-double.asis")
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={
+ "Accept": "text/plain",
+ "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
+ },
+ )
self.assertEqual(response.status, 200)
- self.assertTrue('vary' in response)
+ self.assertTrue("vary" in response)
# we are from cache
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={
+ "Accept": "text/plain",
+ "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
+ },
+ )
self.assertEqual(response.fromcache, True, msg="Should be from cache")
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False)
# get the resource again, not from cache, varied headers don't match exact
- (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept-Language": "da"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False, msg="Should not be from cache")
def testVaryUnusedHeader(self):
# A header's value is not considered to vary if it's not used at all.
uri = urlparse.urljoin(base, "vary/unused-header.asis")
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
self.assertEqual(response.status, 200)
- self.assertTrue('vary' in response)
+ self.assertTrue("vary" in response)
# we are from cache
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain',})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
self.assertEqual(response.fromcache, True, msg="Should be from cache")
-
def testHeadGZip(self):
# Test that we don't try to decompress a HEAD response
uri = urlparse.urljoin(base, "gzip/final-destination.txt")
(response, content) = self.http.request(uri, "HEAD")
self.assertEqual(response.status, 200)
- self.assertNotEqual(int(response['content-length']), 0)
+ self.assertNotEqual(int(response["content-length"]), 0)
self.assertEqual(content, "")
def testGetGZip(self):
@@ -812,17 +961,19 @@
uri = urlparse.urljoin(base, "gzip/final-destination.txt")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
- self.assertFalse('content-encoding' in response)
- self.assertTrue('-content-encoding' in response)
- self.assertEqual(int(response['content-length']), len("This is the final destination.\n"))
+ self.assertFalse("content-encoding" in response)
+ self.assertTrue("-content-encoding" in response)
+ self.assertEqual(
+ int(response["content-length"]), len("This is the final destination.\n")
+ )
self.assertEqual(content, "This is the final destination.\n")
def testPostAndGZipResponse(self):
uri = urlparse.urljoin(base, "gzip/post.cgi")
(response, content) = self.http.request(uri, "POST", body=" ")
self.assertEqual(response.status, 200)
- self.assertFalse('content-encoding' in response)
- self.assertTrue('-content-encoding' in response)
+ self.assertFalse("content-encoding" in response)
+ self.assertTrue("-content-encoding" in response)
def testGetGZipFailure(self):
# Test that we raise a good exception when the gzip fails
@@ -848,6 +999,7 @@
uri = urlparse.urljoin(base, "timeout/timeout.cgi")
try:
import socket
+
socket.setdefaulttimeout(1)
except:
# Don't run the test if we can't set the timeout
@@ -867,9 +1019,8 @@
self.assertTrue(response.reason.startswith("Request Timeout"))
self.assertTrue(content.startswith("Request Timeout"))
-
def testHTTPSInitTimeout(self):
- c = httplib2.HTTPSConnectionWithTimeout('localhost', 80, timeout=47)
+ c = httplib2.HTTPSConnectionWithTimeout("localhost", 80, timeout=47)
self.assertEqual(47, c.timeout)
def testGetDeflate(self):
@@ -877,8 +1028,10 @@
uri = urlparse.urljoin(base, "deflate/deflated.asis")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
- self.assertFalse('content-encoding' in response)
- self.assertEqual(int(response['content-length']), len("This is the final destination."))
+ self.assertFalse("content-encoding" in response)
+ self.assertEqual(
+ int(response["content-length"]), len("This is the final destination.")
+ )
self.assertEqual(content, "This is the final destination.")
def testGetDeflateFailure(self):
@@ -907,31 +1060,48 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
self.assertEqual(content, "This is content\n")
- self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
+ self.assertEqual(
+ response["link"].split(",")[0],
+ '<http://bitworking.org>; rel="home"; title="BitWorking"',
+ )
def testGetCacheControlNoCache(self):
# Test Cache-Control: no-cache on requests
uri = urlparse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers= {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
- (response, content) = self.http.request(uri, "GET", headers= {'accept-encoding': 'identity'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'Cache-Control': 'no-cache'})
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "Cache-Control": "no-cache"},
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False)
def testGetCacheControlPragmaNoCache(self):
# Test Pragma: no-cache on requests
uri = urlparse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers= {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
- (response, content) = self.http.request(uri, "GET", headers= {'accept-encoding': 'identity'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'Pragma': 'no-cache'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False)
@@ -939,11 +1109,15 @@
# A no-store request means that the response should not be stored.
uri = urlparse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False)
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False)
@@ -967,8 +1141,12 @@
(response, content) = self.http.request(uri, "GET")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
+ )
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
+ )
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, False)
@@ -1016,7 +1194,6 @@
(response, content) = self.http.request(uri, "PATCH", body="foo")
self.assertEqual(response.status, 412)
-
def testUpdateUsesCachedETagAndOCMethod(self):
# Test that we natively support http://www.w3.org/1999/04/Editing/
uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
@@ -1031,7 +1208,6 @@
(response, content) = self.http.request(uri, "DELETE")
self.assertEqual(response.status, 200)
-
def testUpdateUsesCachedETagOverridden(self):
# Test that we natively support http://www.w3.org/1999/04/Editing/
uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
@@ -1042,7 +1218,9 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
+ (response, content) = self.http.request(
+ uri, "PUT", body="foo", headers={"if-match": "fred"}
+ )
self.assertEqual(response.status, 412)
def testBasicAuth(self):
@@ -1055,7 +1233,7 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 401)
- self.http.add_credentials('joe', 'password')
+ self.http.add_credentials("joe", "password")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
@@ -1073,7 +1251,7 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 401)
- self.http.add_credentials('joe', 'password', "example.org")
+ self.http.add_credentials("joe", "password", "example.org")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 401)
@@ -1082,7 +1260,7 @@
self.assertEqual(response.status, 401)
domain = urlparse.urlparse(base)[1]
- self.http.add_credentials('joe', 'password', domain)
+ self.http.add_credentials("joe", "password", domain)
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
@@ -1090,11 +1268,6 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
-
-
-
-
-
def testBasicAuthTwoDifferentCredentials(self):
# Test Basic Authentication with multiple sets of credentials
uri = urlparse.urljoin(base, "basic2/file.txt")
@@ -1105,7 +1278,7 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 401)
- self.http.add_credentials('fred', 'barney')
+ self.http.add_credentials("fred", "barney")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
@@ -1125,7 +1298,7 @@
self.assertEqual(response.status, 401)
# Now add in credentials one at a time and test.
- self.http.add_credentials('joe', 'password')
+ self.http.add_credentials("joe", "password")
uri = urlparse.urljoin(base, "basic-nested/")
(response, content) = self.http.request(uri, "GET")
@@ -1135,7 +1308,7 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 401)
- self.http.add_credentials('fred', 'barney')
+ self.http.add_credentials("fred", "barney")
uri = urlparse.urljoin(base, "basic-nested/")
(response, content) = self.http.request(uri, "GET")
@@ -1151,7 +1324,7 @@
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 401)
- self.http.add_credentials('joe', 'password')
+ self.http.add_credentials("joe", "password")
(response, content) = self.http.request(uri, "GET")
self.assertEqual(response.status, 200)
@@ -1162,49 +1335,59 @@
# Test that if the server sets nextnonce that we reset
# the nonce count back to 1
uri = urlparse.urljoin(base, "digest/file.txt")
- self.http.add_credentials('joe', 'password')
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
- info = httplib2._parse_www_authenticate(response, 'authentication-info')
+ self.http.add_credentials("joe", "password")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
+ info = httplib2._parse_www_authenticate(response, "authentication-info")
self.assertEqual(response.status, 200)
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
- info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
+ info2 = httplib2._parse_www_authenticate(response, "authentication-info")
self.assertEqual(response.status, 200)
- if 'nextnonce' in info:
- self.assertEqual(info2['nc'], 1)
+ if "nextnonce" in info:
+ self.assertEqual(info2["nc"], 1)
def testDigestAuthStale(self):
# Test that we can handle a nonce becoming stale
uri = urlparse.urljoin(base, "digest-expire/file.txt")
- self.http.add_credentials('joe', 'password')
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
- info = httplib2._parse_www_authenticate(response, 'authentication-info')
+ self.http.add_credentials("joe", "password")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
+ info = httplib2._parse_www_authenticate(response, "authentication-info")
self.assertEqual(response.status, 200)
time.sleep(3)
# Sleep long enough that the nonce becomes stale
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
self.assertFalse(response.fromcache)
self.assertTrue(response._stale_digest)
- info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
+ info3 = httplib2._parse_www_authenticate(response, "authentication-info")
self.assertEqual(response.status, 200)
def reflector(self, content):
- return dict( [tuple(x.split("=", 1)) for x in content.strip().split("\n")] )
+ return dict([tuple(x.split("=", 1)) for x in content.strip().split("\n")])
def testReflector(self):
uri = urlparse.urljoin(base, "reflector/reflector.cgi")
(response, content) = self.http.request(uri, "GET")
d = self.reflector(content)
- self.assertTrue('HTTP_USER_AGENT' in d)
+ self.assertTrue("HTTP_USER_AGENT" in d)
def testConnectionClose(self):
uri = "http://www.google.com/"
(response, content) = self.http.request(uri, "GET")
for c in self.http.connections.values():
self.assertNotEqual(None, c.sock)
- (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
+ (response, content) = self.http.request(
+ uri, "GET", headers={"connection": "close"}
+ )
for c in self.http.connections.values():
self.assertEqual(None, c.sock)
@@ -1212,43 +1395,48 @@
pickled_http = pickle.dumps(self.http)
new_http = pickle.loads(pickled_http)
- self.assertEqual(sorted(new_http.__dict__.keys()),
- sorted(self.http.__dict__.keys()))
+ self.assertEqual(
+ sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys())
+ )
for key in new_http.__dict__:
- if key in ('certificates', 'credentials'):
- self.assertEqual(new_http.__dict__[key].credentials,
- self.http.__dict__[key].credentials)
- elif key == 'cache':
- self.assertEqual(new_http.__dict__[key].cache,
- self.http.__dict__[key].cache)
+ if key in ("certificates", "credentials"):
+ self.assertEqual(
+ new_http.__dict__[key].credentials,
+ self.http.__dict__[key].credentials,
+ )
+ elif key == "cache":
+ self.assertEqual(
+ new_http.__dict__[key].cache, self.http.__dict__[key].cache
+ )
else:
- self.assertEqual(new_http.__dict__[key],
- self.http.__dict__[key])
+ self.assertEqual(new_http.__dict__[key], self.http.__dict__[key])
def testPickleHttpWithConnection(self):
- self.http.request('http://bitworking.org',
- connection_type=_MyHTTPConnection)
+ self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
pickled_http = pickle.dumps(self.http)
new_http = pickle.loads(pickled_http)
- self.assertEqual(self.http.connections.keys(), ['http:bitworking.org'])
+ self.assertEqual(self.http.connections.keys(), ["http:bitworking.org"])
self.assertEqual(new_http.connections, {})
def testPickleCustomRequestHttp(self):
def dummy_request(*args, **kwargs):
return new_request(*args, **kwargs)
- dummy_request.dummy_attr = 'dummy_value'
+
+ dummy_request.dummy_attr = "dummy_value"
self.http.request = dummy_request
pickled_http = pickle.dumps(self.http)
self.assertFalse("S'request'" in pickled_http)
+
try:
import memcache
+
class HttpTestMemCached(HttpTest):
def setUp(self):
- self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
- #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
+ self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
+ # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
self.http = httplib2.Http(self.cache)
self.cache.flush_all()
# Not exactly sure why the sleep is needed here, but
@@ -1258,171 +1446,189 @@
# was previously cached. (Maybe the flush is handled async?)
time.sleep(1)
self.http.clear_credentials()
+
+
except:
pass
-
-
-
# ------------------------------------------------------------------------
-class HttpPrivateTest(unittest.TestCase):
+class HttpPrivateTest(unittest.TestCase):
def testParseCacheControl(self):
# Test that we can parse the Cache-Control header
self.assertEqual({}, httplib2._parse_cache_control({}))
- self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
- cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
- self.assertEqual(cc['no-cache'], 1)
- self.assertEqual(cc['max-age'], '7200')
- cc = httplib2._parse_cache_control({'cache-control': ' , '})
- self.assertEqual(cc[''], 1)
+ self.assertEqual(
+ {"no-cache": 1},
+ httplib2._parse_cache_control({"cache-control": " no-cache"}),
+ )
+ cc = httplib2._parse_cache_control(
+ {"cache-control": " no-cache, max-age = 7200"}
+ )
+ self.assertEqual(cc["no-cache"], 1)
+ self.assertEqual(cc["max-age"], "7200")
+ cc = httplib2._parse_cache_control({"cache-control": " , "})
+ self.assertEqual(cc[""], 1)
try:
- cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
+ cc = httplib2._parse_cache_control(
+ {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"}
+ )
self.assertTrue("max-age" in cc)
except:
self.fail("Should not throw exception")
def testNormalizeHeaders(self):
# Test that we normalize headers to lowercase
- h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
- self.assertTrue('cache-control' in h)
- self.assertTrue('other' in h)
- self.assertEqual('Stuff', h['other'])
+ h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"})
+ self.assertTrue("cache-control" in h)
+ self.assertTrue("other" in h)
+ self.assertEqual("Stuff", h["other"])
def testExpirationModelTransparent(self):
# Test that no-cache makes our request TRANSPARENT
- response_headers = {
- 'cache-control': 'max-age=7200'
- }
- request_headers = {
- 'cache-control': 'no-cache'
- }
- self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
+ response_headers = {"cache-control": "max-age=7200"}
+ request_headers = {"cache-control": "no-cache"}
+ self.assertEqual(
+ "TRANSPARENT",
+ httplib2._entry_disposition(response_headers, request_headers),
+ )
def testMaxAgeNonNumeric(self):
# Test that no-cache makes our request TRANSPARENT
- response_headers = {
- 'cache-control': 'max-age=fred, min-fresh=barney'
- }
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
+ response_headers = {"cache-control": "max-age=fred, min-fresh=barney"}
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelNoCacheResponse(self):
# The date and expires point to an entry that should be
# FRESH, but the no-cache over-rides that.
now = time.time()
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
- 'cache-control': 'no-cache'
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
+ "cache-control": "no-cache",
}
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelStaleRequestMustReval(self):
# must-revalidate forces STALE
- self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
+ self.assertEqual(
+ "STALE",
+ httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}),
+ )
def testExpirationModelStaleResponseMustReval(self):
# must-revalidate forces STALE
- self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
+ self.assertEqual(
+ "STALE",
+ httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}),
+ )
def testExpirationModelFresh(self):
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
- 'cache-control': 'max-age=2'
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
+ "cache-control": "max-age=2",
}
- request_headers = {
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
time.sleep(3)
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationMaxAge0(self):
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
- 'cache-control': 'max-age=0'
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
+ "cache-control": "max-age=0",
}
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelDateAndExpires(self):
now = time.time()
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
}
- request_headers = {
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
time.sleep(3)
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpiresZero(self):
now = time.time()
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': "0",
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": "0",
}
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelDateOnly(self):
now = time.time()
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3))
}
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelOnlyIfCached(self):
- response_headers = {
- }
- request_headers = {
- 'cache-control': 'only-if-cached',
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
+ response_headers = {}
+ request_headers = {"cache-control": "only-if-cached"}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelMaxAgeBoth(self):
now = time.time()
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'cache-control': 'max-age=2'
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "cache-control": "max-age=2",
}
- request_headers = {
- 'cache-control': 'max-age=0'
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {"cache-control": "max-age=0"}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelDateAndExpiresMinFresh1(self):
now = time.time()
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
}
- request_headers = {
- 'cache-control': 'min-fresh=2'
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {"cache-control": "min-fresh=2"}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testExpirationModelDateAndExpiresMinFresh2(self):
now = time.time()
response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
}
- request_headers = {
- 'cache-control': 'min-fresh=2'
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
+ request_headers = {"cache-control": "min-fresh=2"}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
def testParseWWWAuthenticateEmpty(self):
res = httplib2._parse_www_authenticate({})
@@ -1430,199 +1636,275 @@
def testParseWWWAuthenticate(self):
# different uses of spaces around commas
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'
+ }
+ )
self.assertEqual(len(res.keys()), 1)
- self.assertEqual(len(res['test'].keys()), 5)
+ self.assertEqual(len(res["test"].keys()), 5)
# tokens with non-alphanum
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}
+ )
self.assertEqual(len(res.keys()), 1)
- self.assertEqual(len(res['t*!%#st'].keys()), 2)
+ self.assertEqual(len(res["t*!%#st"].keys()), 2)
# quoted string with quoted pairs
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Test realm="a \\"test\\" realm"'}
+ )
self.assertEqual(len(res.keys()), 1)
- self.assertEqual(res['test']['realm'], 'a "test" realm')
+ self.assertEqual(res["test"]["realm"], 'a "test" realm')
def testParseWWWAuthenticateStrict(self):
- httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
- self.testParseWWWAuthenticate();
- httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
+ httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
+ self.testParseWWWAuthenticate()
+ httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
def testParseWWWAuthenticateBasic(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
+ res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'})
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- self.assertEqual('MD5', basic['algorithm'])
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic realm="me", algorithm="MD5"'}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ self.assertEqual("MD5", basic["algorithm"])
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- self.assertEqual('MD5', basic['algorithm'])
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic realm="me", algorithm=MD5'}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ self.assertEqual("MD5", basic["algorithm"])
def testParseWWWAuthenticateBasic2(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- self.assertEqual('fred', basic['other'])
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic realm="me",other="fred" '}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ self.assertEqual("fred", basic["other"])
def testParseWWWAuthenticateBasic3(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
-
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic REAlm="me" '}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
def testParseWWWAuthenticateDigest(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
-
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
def testParseWWWAuthenticateMultiple(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
- self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
- self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
+ self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
+ self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
def testParseWWWAuthenticateMultiple2(self):
# Handle an added comma between challenges, which might get thrown in if the challenges were
# originally sent in separate www-authenticate headers.
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
- self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
- self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
+ self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
+ self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
def testParseWWWAuthenticateMultiple3(self):
        # Handle three challenges (Digest, Basic, WSSE) combined in a single
        # www-authenticate header, including an added comma between challenges.
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
- self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
- self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- wsse = res['wsse']
- self.assertEqual('foo', wsse['realm'])
- self.assertEqual('UsernameToken', wsse['profile'])
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
+ self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
+ self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ wsse = res["wsse"]
+ self.assertEqual("foo", wsse["realm"])
+ self.assertEqual("UsernameToken", wsse["profile"])
def testParseWWWAuthenticateMultiple4(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
- digest = res['digest']
- self.assertEqual('test-real.m@host.com', digest['realm'])
- self.assertEqual('\tauth,auth-int', digest['qop'])
- self.assertEqual('(*)&^&$%#', digest['nonce'])
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("test-real.m@host.com", digest["realm"])
+ self.assertEqual("\tauth,auth-int", digest["qop"])
+ self.assertEqual("(*)&^&$%#", digest["nonce"])
def testParseWWWAuthenticateMoreQuoteCombos(self):
- res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
- digest = res['digest']
- self.assertEqual('myrealm', digest['realm'])
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("myrealm", digest["realm"])
def testParseWWWAuthenticateMalformed(self):
try:
- res = httplib2._parse_www_authenticate({'www-authenticate':'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'})
- self.fail("should raise an exception")
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'
+ }
+ )
+ self.fail("should raise an exception")
except httplib2.MalformedHeader:
- pass
+ pass
def testDigestObject(self):
- credentials = ('joe', 'password')
+ credentials = ("joe", "password")
host = None
- request_uri = '/projects/httplib2/test/digest/'
+ request_uri = "/projects/httplib2/test/digest/"
headers = {}
response = {
- 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
+ "www-authenticate": 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
+ 'algorithm=MD5, qop="auth"'
}
content = ""
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
- our_request = "authorization: %s" % headers['authorization']
- working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
+ our_request = "authorization: %s" % headers["authorization"]
+ working_request = (
+ 'authorization: Digest username="joe", realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
+ 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
+ 'nc=00000001, cnonce="33033375ec278a46"'
+ )
self.assertEqual(our_request, working_request)
def testDigestObjectWithOpaque(self):
- credentials = ('joe', 'password')
+ credentials = ("joe", "password")
host = None
- request_uri = '/projects/httplib2/test/digest/'
+ request_uri = "/projects/httplib2/test/digest/"
headers = {}
response = {
- 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", opaque="atestopaque"'
+ "www-authenticate": 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
+ 'algorithm=MD5, qop="auth", opaque="atestopaque"'
}
content = ""
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
- our_request = "authorization: %s" % headers['authorization']
- working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46", opaque="atestopaque"'
+ our_request = "authorization: %s" % headers["authorization"]
+ working_request = (
+ 'authorization: Digest username="joe", realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
+ 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
+ 'nc=00000001, cnonce="33033375ec278a46", '
+ 'opaque="atestopaque"'
+ )
self.assertEqual(our_request, working_request)
def testDigestObjectStale(self):
- credentials = ('joe', 'password')
+ credentials = ("joe", "password")
host = None
- request_uri = '/projects/httplib2/test/digest/'
+ request_uri = "/projects/httplib2/test/digest/"
headers = {}
- response = httplib2.Response({ })
- response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
+ response = httplib2.Response({})
+ response["www-authenticate"] = (
+ 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' algorithm=MD5, qop="auth", stale=true'
+ )
response.status = 401
content = ""
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
# Returns true to force a retry
- self.assertTrue( d.response(response, content) )
+ self.assertTrue(d.response(response, content))
def testDigestObjectAuthInfo(self):
- credentials = ('joe', 'password')
+ credentials = ("joe", "password")
host = None
- request_uri = '/projects/httplib2/test/digest/'
+ request_uri = "/projects/httplib2/test/digest/"
headers = {}
- response = httplib2.Response({ })
- response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
- response['authentication-info'] = 'nextnonce="fred"'
+ response = httplib2.Response({})
+ response["www-authenticate"] = (
+ 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' algorithm=MD5, qop="auth", stale=true'
+ )
+ response["authentication-info"] = 'nextnonce="fred"'
content = ""
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
        # Returns False: the authentication-info header was processed, so no retry is forced
- self.assertFalse( d.response(response, content) )
- self.assertEqual('fred', d.challenge['nonce'])
- self.assertEqual(1, d.challenge['nc'])
+ self.assertFalse(d.response(response, content))
+ self.assertEqual("fred", d.challenge["nonce"])
+ self.assertEqual(1, d.challenge["nc"])
def testWsseAlgorithm(self):
- digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
+ digest = httplib2._wsse_username_token(
+ "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm"
+ )
expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY="
self.assertEqual(expected, digest)
def testEnd2End(self):
# one end to end header
- response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
+ response = {"content-type": "application/atom+xml", "te": "deflate"}
end2end = httplib2._get_end2end_headers(response)
- self.assertTrue('content-type' in end2end)
- self.assertTrue('te' not in end2end)
- self.assertTrue('connection' not in end2end)
+ self.assertTrue("content-type" in end2end)
+ self.assertTrue("te" not in end2end)
+ self.assertTrue("connection" not in end2end)
# one end to end header that gets eliminated
- response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
+ response = {
+ "connection": "content-type",
+ "content-type": "application/atom+xml",
+ "te": "deflate",
+ }
end2end = httplib2._get_end2end_headers(response)
- self.assertTrue('content-type' not in end2end)
- self.assertTrue('te' not in end2end)
- self.assertTrue('connection' not in end2end)
+ self.assertTrue("content-type" not in end2end)
+ self.assertTrue("te" not in end2end)
+ self.assertTrue("connection" not in end2end)
# Degenerate case of no headers
response = {}
@@ -1630,7 +1912,7 @@
self.assertEquals(0, len(end2end))
        # Degenerate case of connection referring to a header not passed in
- response = {'connection': 'content-type'}
+ response = {"connection": "content-type"}
end2end = httplib2._get_end2end_headers(response)
self.assertEquals(0, len(end2end))
@@ -1644,34 +1926,33 @@
os.environ.update(self.orig_env)
def test_from_url(self):
- pi = httplib2.proxy_info_from_url('http://myproxy.example.com')
- self.assertEquals(pi.proxy_host, 'myproxy.example.com')
+ pi = httplib2.proxy_info_from_url("http://myproxy.example.com")
+ self.assertEquals(pi.proxy_host, "myproxy.example.com")
self.assertEquals(pi.proxy_port, 80)
self.assertEquals(pi.proxy_user, None)
def test_from_url_ident(self):
- pi = httplib2.proxy_info_from_url('http://zoidberg:fish@someproxy:99')
- self.assertEquals(pi.proxy_host, 'someproxy')
+ pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99")
+ self.assertEquals(pi.proxy_host, "someproxy")
self.assertEquals(pi.proxy_port, 99)
- self.assertEquals(pi.proxy_user, 'zoidberg')
- self.assertEquals(pi.proxy_pass, 'fish')
+ self.assertEquals(pi.proxy_user, "zoidberg")
+ self.assertEquals(pi.proxy_pass, "fish")
def test_from_env(self):
- os.environ['http_proxy'] = 'http://myproxy.example.com:8080'
+ os.environ["http_proxy"] = "http://myproxy.example.com:8080"
pi = httplib2.proxy_info_from_environment()
- self.assertEquals(pi.proxy_host, 'myproxy.example.com')
+ self.assertEquals(pi.proxy_host, "myproxy.example.com")
self.assertEquals(pi.proxy_port, 8080)
self.assertEquals(pi.bypass_hosts, [])
def test_from_env_no_proxy(self):
- os.environ['http_proxy'] = 'http://myproxy.example.com:80'
- os.environ['https_proxy'] = 'http://myproxy.example.com:81'
- os.environ['no_proxy'] = 'localhost,otherhost.domain.local'
- pi = httplib2.proxy_info_from_environment('https')
- self.assertEquals(pi.proxy_host, 'myproxy.example.com')
+ os.environ["http_proxy"] = "http://myproxy.example.com:80"
+ os.environ["https_proxy"] = "http://myproxy.example.com:81"
+ os.environ["no_proxy"] = "localhost,otherhost.domain.local"
+ pi = httplib2.proxy_info_from_environment("https")
+ self.assertEquals(pi.proxy_host, "myproxy.example.com")
self.assertEquals(pi.proxy_port, 81)
- self.assertEquals(pi.bypass_hosts, ['localhost',
- 'otherhost.domain.local'])
+ self.assertEquals(pi.bypass_hosts, ["localhost", "otherhost.domain.local"])
def test_from_env_none(self):
os.environ.clear()
@@ -1679,25 +1960,28 @@
self.assertEquals(pi, None)
def test_applies_to(self):
- os.environ['http_proxy'] = 'http://myproxy.example.com:80'
- os.environ['https_proxy'] = 'http://myproxy.example.com:81'
- os.environ['no_proxy'] = 'localhost,otherhost.domain.local,example.com'
+ os.environ["http_proxy"] = "http://myproxy.example.com:80"
+ os.environ["https_proxy"] = "http://myproxy.example.com:81"
+ os.environ["no_proxy"] = "localhost,otherhost.domain.local,example.com"
pi = httplib2.proxy_info_from_environment()
- self.assertFalse(pi.applies_to('localhost'))
- self.assertTrue(pi.applies_to('www.google.com'))
- self.assertFalse(pi.applies_to('www.example.com'))
+ self.assertFalse(pi.applies_to("localhost"))
+ self.assertTrue(pi.applies_to("www.google.com"))
+ self.assertFalse(pi.applies_to("www.example.com"))
def test_no_proxy_star(self):
- os.environ['http_proxy'] = 'http://myproxy.example.com:80'
- os.environ['NO_PROXY'] = '*'
+ os.environ["http_proxy"] = "http://myproxy.example.com:80"
+ os.environ["NO_PROXY"] = "*"
pi = httplib2.proxy_info_from_environment()
- for host in ('localhost', '169.254.38.192', 'www.google.com'):
+ for host in ("localhost", "169.254.38.192", "www.google.com"):
self.assertFalse(pi.applies_to(host))
def test_proxy_headers(self):
- headers = {'key0': 'val0', 'key1': 'val1'}
- pi = httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, 'localhost', 1234, proxy_headers = headers)
+ headers = {"key0": "val0", "key1": "val1"}
+ pi = httplib2.ProxyInfo(
+ httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers
+ )
self.assertEquals(pi.proxy_headers, headers)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/python2/httplib2test_appengine.py b/python2/httplib2test_appengine.py
index 9fad05a..d5c5786 100755
--- a/python2/httplib2test_appengine.py
+++ b/python2/httplib2test_appengine.py
@@ -5,11 +5,12 @@
import sys
import unittest
-APP_ENGINE_PATH='/usr/local/google_appengine'
+APP_ENGINE_PATH = "/usr/local/google_appengine"
sys.path.insert(0, APP_ENGINE_PATH)
import dev_appserver
+
dev_appserver.fix_sys_path()
from google.appengine.ext import testbed
@@ -20,60 +21,65 @@
class AberrationsTest(unittest.TestCase):
+ def setUp(self):
+ self.testbed = testbed.Testbed()
+ self.testbed.activate()
+ self.testbed.init_urlfetch_stub()
- def setUp(self):
- self.testbed = testbed.Testbed()
- self.testbed.activate()
- self.testbed.init_urlfetch_stub()
+ def tearDown(self):
+ self.testbed.deactivate()
- def tearDown(self):
- self.testbed.deactivate()
+ @mock.patch.dict("os.environ", {"SERVER_SOFTWARE": ""})
+ def testConnectionInit(self):
+ global httplib2
+ import httplib2
- @mock.patch.dict('os.environ', {'SERVER_SOFTWARE': ''})
- def testConnectionInit(self):
- global httplib2
- import httplib2
- self.assertNotEqual(
- httplib2.SCHEME_TO_CONNECTION['https'], httplib2.AppEngineHttpsConnection)
- self.assertNotEqual(
- httplib2.SCHEME_TO_CONNECTION['http'], httplib2.AppEngineHttpConnection)
- del globals()['httplib2']
+ self.assertNotEqual(
+ httplib2.SCHEME_TO_CONNECTION["https"], httplib2.AppEngineHttpsConnection
+ )
+ self.assertNotEqual(
+ httplib2.SCHEME_TO_CONNECTION["http"], httplib2.AppEngineHttpConnection
+ )
+ del globals()["httplib2"]
class AppEngineHttpTest(unittest.TestCase):
+ def setUp(self):
+ self.testbed = testbed.Testbed()
+ self.testbed.activate()
+ self.testbed.init_urlfetch_stub()
+ global httplib2
+ import httplib2
- def setUp(self):
- self.testbed = testbed.Testbed()
- self.testbed.activate()
- self.testbed.init_urlfetch_stub()
- global httplib2
- import httplib2
- reload(httplib2)
+ reload(httplib2)
- def tearDown(self):
- self.testbed.deactivate()
- del globals()['httplib2']
+ def tearDown(self):
+ self.testbed.deactivate()
+ del globals()["httplib2"]
- def testConnectionInit(self):
- self.assertEqual(
- httplib2.SCHEME_TO_CONNECTION['https'], httplib2.AppEngineHttpsConnection)
- self.assertEqual(
- httplib2.SCHEME_TO_CONNECTION['http'], httplib2.AppEngineHttpConnection)
+ def testConnectionInit(self):
+ self.assertEqual(
+ httplib2.SCHEME_TO_CONNECTION["https"], httplib2.AppEngineHttpsConnection
+ )
+ self.assertEqual(
+ httplib2.SCHEME_TO_CONNECTION["http"], httplib2.AppEngineHttpConnection
+ )
- def testGet(self):
- http = httplib2.Http()
- response, content = http.request("http://www.google.com")
- self.assertEqual(httplib2.SCHEME_TO_CONNECTION['https'],
- httplib2.AppEngineHttpsConnection)
- self.assertEquals(1, len(http.connections))
- self.assertEquals(response.status, 200)
- self.assertEquals(response['status'], '200')
+ def testGet(self):
+ http = httplib2.Http()
+ response, content = http.request("http://www.google.com")
+ self.assertEqual(
+ httplib2.SCHEME_TO_CONNECTION["https"], httplib2.AppEngineHttpsConnection
+ )
+ self.assertEquals(1, len(http.connections))
+ self.assertEquals(response.status, 200)
+ self.assertEquals(response["status"], "200")
- def testProxyInfoIgnored(self):
- http = httplib2.Http(proxy_info=mock.MagicMock())
- response, content = http.request("http://www.google.com")
- self.assertEquals(response.status, 200)
+ def testProxyInfoIgnored(self):
+ http = httplib2.Http(proxy_info=mock.MagicMock())
+ response, content = http.request("http://www.google.com")
+ self.assertEquals(response.status, 200)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
diff --git a/python2/ssl_protocol_test.py b/python2/ssl_protocol_test.py
index bac84c0..fedf7cf 100755
--- a/python2/ssl_protocol_test.py
+++ b/python2/ssl_protocol_test.py
@@ -8,50 +8,56 @@
class TestSslProtocol(unittest.TestCase):
+ def testSslCertValidationWithInvalidCaCert(self):
+ if sys.version_info >= (2, 6):
+ http = httplib2.Http(ca_certs="/nosuchfile")
+ if sys.version_info >= (2, 7):
+ with self.assertRaises(IOError):
+ http.request("https://www.google.com/", "GET")
+ else:
+ self.assertRaises(
+ ssl.SSLError, http.request, "https://www.google.com/", "GET"
+ )
- def testSslCertValidationWithInvalidCaCert(self):
- if sys.version_info >= (2, 6):
- http = httplib2.Http(ca_certs='/nosuchfile')
- if sys.version_info >= (2, 7):
- with self.assertRaises(IOError):
- http.request('https://www.google.com/', 'GET')
- else:
- self.assertRaises(
- ssl.SSLError, http.request, 'https://www.google.com/', 'GET')
+ def testSslCertValidationWithSelfSignedCaCert(self):
+ if sys.version_info >= (2, 7):
+ other_ca_certs = os.path.join(
+ os.path.dirname(os.path.abspath(httplib2.__file__)),
+ "test",
+ "other_cacerts.txt",
+ )
+ http = httplib2.Http(ca_certs=other_ca_certs)
+ if sys.platform != "darwin":
+ with self.assertRaises(httplib2.SSLHandshakeError):
+ http.request("https://www.google.com/", "GET")
- def testSslCertValidationWithSelfSignedCaCert(self):
- if sys.version_info >= (2, 7):
- other_ca_certs = os.path.join(
- os.path.dirname(os.path.abspath(httplib2.__file__ )), 'test',
- 'other_cacerts.txt')
- http = httplib2.Http(ca_certs=other_ca_certs)
- if sys.platform != 'darwin':
- with self.assertRaises(httplib2.SSLHandshakeError):
- http.request('https://www.google.com/', 'GET')
+ def testSslProtocolTlsV1AndShouldPass(self):
+ http = httplib2.Http(ssl_version=ssl.PROTOCOL_TLSv1)
+ urls = [
+ "https://www.amazon.com",
+ "https://www.apple.com",
+ "https://www.twitter.com",
+ ]
+ for url in urls:
+ if sys.version_info >= (2, 7):
+ self.assertIsNotNone(http.request(uri=url))
- def testSslProtocolTlsV1AndShouldPass(self):
- http = httplib2.Http(ssl_version=ssl.PROTOCOL_TLSv1)
- urls = ['https://www.amazon.com',
- 'https://www.apple.com',
- 'https://www.twitter.com']
- for url in urls:
- if sys.version_info >= (2, 7):
- self.assertIsNotNone(http.request(uri=url))
-
- def testSslProtocolV3AndShouldFailDueToPoodle(self):
- http = httplib2.Http(ssl_version=ssl.PROTOCOL_SSLv3)
- urls = ['https://www.amazon.com',
- 'https://www.apple.com',
- 'https://www.twitter.com']
- for url in urls:
- if sys.version_info >= (2, 7):
- with self.assertRaises(httplib2.SSLHandshakeError):
- http.request(url)
- try:
- http.request(url)
- except httplib2.SSLHandshakeError as e:
- self.assertTrue('sslv3 alert handshake failure' in str(e))
+ def testSslProtocolV3AndShouldFailDueToPoodle(self):
+ http = httplib2.Http(ssl_version=ssl.PROTOCOL_SSLv3)
+ urls = [
+ "https://www.amazon.com",
+ "https://www.apple.com",
+ "https://www.twitter.com",
+ ]
+ for url in urls:
+ if sys.version_info >= (2, 7):
+ with self.assertRaises(httplib2.SSLHandshakeError):
+ http.request(url)
+ try:
+ http.request(url)
+ except httplib2.SSLHandshakeError as e:
+ self.assertTrue("sslv3 alert handshake failure" in str(e))
-if __name__ == '__main__':
- unittest.main()
+if __name__ == "__main__":
+ unittest.main()