Autoformat py files using Black (#105)
* Autoformat all py files using Black (see the sketch after this list)
* Fix lint errors
* Fix indentation errors (https://travis-ci.org/httplib2/httplib2/jobs/408136309)
* Refactor three test cases and exclude them on Travis py27/pypy
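
A minimal sketch of the reformatting step, for reviewers who want to reproduce it locally. This is hedged: it assumes a recent `black` release; `black.format_str` and `black.Mode` are Black's public API and are not part of this patch.

import black

# The tree was reformatted with the CLI equivalent of roughly:
#     python -m black python2 python3
# The same formatter is available as a function:
messy = "x = {'a':1,'b':2}\n"
clean = black.format_str(messy, mode=black.Mode())
print(clean, end="")  # x = {"a": 1, "b": 2}  (quotes and spacing normalized)
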
diff --git a/python3/httplib2/__init__.py b/python3/httplib2/__init__.py
index 44247b7..5d9adec 100644
--- a/python3/httplib2/__init__.py
+++ b/python3/httplib2/__init__.py
@@ -1,54 +1,46 @@
-"""
-httplib2
-
-A caching http interface that supports ETags and gzip
-to conserve bandwidth.
-
-Requires Python 3.0 or later
-
-Changelog:
-2009-05-28, Pilgrim: ported to Python 3
-2007-08-18, Rick: Modified so it's able to use a socks proxy if needed.
-
-"""
+"""Small, fast HTTP client library for Python."""
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
-__contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)",
+__contributors__ = [
+ "Thomas Broyer (t.broyer@ltgt.net)",
"James Antill",
"Xavier Verges Farrero",
"Jonathan Feinberg",
"Blair Zajac",
"Sam Ruby",
"Louis Nyffenegger",
- "Mark Pilgrim"]
+ "Mark Pilgrim",
+ "Alex Yu",
+]
__license__ = "MIT"
-__version__ = '0.11.3'
+__version__ = "0.11.3"
-import re
-import sys
-import email
-import email.utils
-import email.message
-import email.feedparser
-import io
-import gzip
-import zlib
-import http.client
-import urllib.parse
import base64
-import os
-import copy
import calendar
-import time
-import random
+import copy
+import email
+import email.feedparser
+from email import header
+import email.message
+import email.utils
import errno
-from hashlib import sha1 as _sha, md5 as _md5
-import hmac
from gettext import gettext as _
+import gzip
+from hashlib import md5 as _md5
+from hashlib import sha1 as _sha
+import hmac
+import http.client
+import io
+import os
+import random
+import re
import socket
import ssl
-
+import sys
+import time
+import urllib.parse
+import zlib
try:
import socks
@@ -58,17 +50,26 @@
from . import socks
from .iri2uri import iri2uri
+
def has_timeout(timeout):
- if hasattr(socket, '_GLOBAL_DEFAULT_TIMEOUT'):
- return (timeout is not None and timeout is not socket._GLOBAL_DEFAULT_TIMEOUT)
- return (timeout is not None)
+ if hasattr(socket, "_GLOBAL_DEFAULT_TIMEOUT"):
+ return timeout is not None and timeout is not socket._GLOBAL_DEFAULT_TIMEOUT
+ return timeout is not None
-__all__ = ['Http', 'Response', 'ProxyInfo', 'HttpLib2Error',
- 'RedirectMissingLocation', 'RedirectLimit',
- 'FailedToDecompressContent', 'UnimplementedDigestAuthOptionError',
- 'UnimplementedHmacDigestAuthOptionError',
- 'debuglevel', 'RETRIES']
+__all__ = [
+ "debuglevel",
+ "FailedToDecompressContent",
+ "Http",
+ "HttpLib2Error",
+ "ProxyInfo",
+ "RedirectLimit",
+ "RedirectMissingLocation",
+ "Response",
+ "RETRIES",
+ "UnimplementedDigestAuthOptionError",
+ "UnimplementedHmacDigestAuthOptionError",
+]
# The httplib debug level, set to a non-zero value to get debug output
debuglevel = 0
@@ -76,8 +77,11 @@
# A request will be tried 'RETRIES' times if it fails at the socket/connection level.
RETRIES = 2
+
# All exceptions raised here derive from HttpLib2Error
-class HttpLib2Error(Exception): pass
+class HttpLib2Error(Exception):
+ pass
+
# Some exceptions can be caught and optionally
# be turned back into responses.
@@ -87,17 +91,41 @@
self.content = content
HttpLib2Error.__init__(self, desc)
-class RedirectMissingLocation(HttpLib2ErrorWithResponse): pass
-class RedirectLimit(HttpLib2ErrorWithResponse): pass
-class FailedToDecompressContent(HttpLib2ErrorWithResponse): pass
-class UnimplementedDigestAuthOptionError(HttpLib2ErrorWithResponse): pass
-class UnimplementedHmacDigestAuthOptionError(HttpLib2ErrorWithResponse): pass
-class MalformedHeader(HttpLib2Error): pass
-class RelativeURIError(HttpLib2Error): pass
-class ServerNotFoundError(HttpLib2Error): pass
+class RedirectMissingLocation(HttpLib2ErrorWithResponse):
+ pass
-class ProxiesUnavailableError(HttpLib2Error): pass
+
+class RedirectLimit(HttpLib2ErrorWithResponse):
+ pass
+
+
+class FailedToDecompressContent(HttpLib2ErrorWithResponse):
+ pass
+
+
+class UnimplementedDigestAuthOptionError(HttpLib2ErrorWithResponse):
+ pass
+
+
+class UnimplementedHmacDigestAuthOptionError(HttpLib2ErrorWithResponse):
+ pass
+
+
+class MalformedHeader(HttpLib2Error):
+ pass
+
+
+class RelativeURIError(HttpLib2Error):
+ pass
+
+
+class ServerNotFoundError(HttpLib2Error):
+ pass
+
+
+class ProxiesUnavailableError(HttpLib2Error):
+ pass
# Open Items:
@@ -114,7 +142,6 @@
# Does not handle Cache-Control: max-stale
# Does not use Age: headers when calculating cache freshness.
-
# The number of redirections to follow before giving up.
# Note that only GET redirects are automatically followed.
# Will also honor 301 requests by saving that info and never
@@ -122,31 +149,45 @@
DEFAULT_MAX_REDIRECTS = 5
# Which headers are hop-by-hop headers by default
-HOP_BY_HOP = ['connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', 'upgrade']
+HOP_BY_HOP = [
+ "connection",
+ "keep-alive",
+ "proxy-authenticate",
+ "proxy-authorization",
+ "te",
+ "trailers",
+ "transfer-encoding",
+ "upgrade",
+]
# Default CA certificates file bundled with httplib2.
-CA_CERTS = os.path.join(
- os.path.dirname(os.path.abspath(__file__ )), "cacerts.txt")
+CA_CERTS = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cacerts.txt")
# PROTOCOL_TLS is python 3.5.3+. PROTOCOL_SSLv23 is deprecated.
# Both PROTOCOL_TLS and PROTOCOL_SSLv23 are equivalent and means:
# > Selects the highest protocol version that both the client and server support.
# > Despite the name, this option can select “TLS” protocols as well as “SSL”.
# source: https://docs.python.org/3.5/library/ssl.html#ssl.PROTOCOL_TLS
-DEFAULT_TLS_VERSION = getattr(ssl, 'PROTOCOL_TLS', None) or getattr(ssl, 'PROTOCOL_SSLv23')
+DEFAULT_TLS_VERSION = getattr(ssl, "PROTOCOL_TLS", None) or getattr(
+ ssl, "PROTOCOL_SSLv23"
+)
-def _build_ssl_context(disable_ssl_certificate_validation, ca_certs, cert_file=None, key_file=None):
- if not hasattr(ssl, 'SSLContext'):
+def _build_ssl_context(
+ disable_ssl_certificate_validation, ca_certs, cert_file=None, key_file=None
+):
+ if not hasattr(ssl, "SSLContext"):
raise RuntimeError("httplib2 requires Python 3.2+ for ssl.SSLContext")
context = ssl.SSLContext(DEFAULT_TLS_VERSION)
- context.verify_mode = ssl.CERT_NONE if disable_ssl_certificate_validation else ssl.CERT_REQUIRED
+ context.verify_mode = (
+ ssl.CERT_NONE if disable_ssl_certificate_validation else ssl.CERT_REQUIRED
+ )
# check_hostname requires python 3.4+
# we will perform the equivalent in HTTPSConnectionWithTimeout.connect() by calling ssl.match_hostname
# if check_hostname is not supported.
- if hasattr(context, 'check_hostname'):
+ if hasattr(context, "check_hostname"):
context.check_hostname = not disable_ssl_certificate_validation
context.load_verify_locations(ca_certs)
@@ -156,13 +197,16 @@
return context
+
def _get_end2end_headers(response):
hopbyhop = list(HOP_BY_HOP)
- hopbyhop.extend([x.strip() for x in response.get('connection', '').split(',')])
+ hopbyhop.extend([x.strip() for x in response.get("connection", "").split(",")])
return [header for header in list(response.keys()) if header not in hopbyhop]
+
URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")
+
def parse_uri(uri):
"""Parses a URI using the regex given in Appendix B of RFC 3986.
@@ -171,6 +215,7 @@
groups = URI.match(uri).groups()
return (groups[1], groups[3], groups[4], groups[6], groups[8])
+
def urlnorm(uri):
(scheme, authority, path, query, fragment) = parse_uri(uri)
if not scheme or not authority:
@@ -188,8 +233,8 @@
# Cache filename construction (original borrowed from Venus http://intertwingly.net/code/venus/)
-re_url_scheme = re.compile(r'^\w+://')
-re_unsafe = re.compile(r'[^\w\-_.()=!]+', re.ASCII)
+re_url_scheme = re.compile(r"^\w+://")
+re_unsafe = re.compile(r"[^\w\-_.()=!]+", re.ASCII)
def safename(filename):
@@ -199,12 +244,12 @@
"""
if isinstance(filename, bytes):
filename_bytes = filename
- filename = filename.decode('utf-8')
+ filename = filename.decode("utf-8")
else:
- filename_bytes = filename.encode('utf-8')
+ filename_bytes = filename.encode("utf-8")
filemd5 = _md5(filename_bytes).hexdigest()
- filename = re_url_scheme.sub('', filename)
- filename = re_unsafe.sub('', filename)
+ filename = re_url_scheme.sub("", filename)
+ filename = re_unsafe.sub("", filename)
# limit length of filename (vital for Windows)
# https://github.com/httplib2/httplib2/pull/74
@@ -213,27 +258,46 @@
# Thus max safe filename x = 93 chars. Let it be 90 to make a round sum:
filename = filename[:90]
- return ','.join((filename, filemd5))
+ return ",".join((filename, filemd5))
-NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
+NORMALIZE_SPACE = re.compile(r"(?:\r\n)?[ \t]+")
+
+
def _normalize_headers(headers):
- return dict([ (_convert_byte_str(key).lower(), NORMALIZE_SPACE.sub(_convert_byte_str(value), ' ').strip()) for (key, value) in headers.items()])
+ return dict(
+ [
+ (
+ _convert_byte_str(key).lower(),
+                NORMALIZE_SPACE.sub(" ", _convert_byte_str(value)).strip(),
+ )
+ for (key, value) in headers.items()
+ ]
+ )
+
def _convert_byte_str(s):
if not isinstance(s, str):
- return str(s, 'utf-8')
+ return str(s, "utf-8")
return s
+
def _parse_cache_control(headers):
retval = {}
- if 'cache-control' in headers:
- parts = headers['cache-control'].split(',')
- parts_with_args = [tuple([x.strip().lower() for x in part.split("=", 1)]) for part in parts if -1 != part.find("=")]
- parts_wo_args = [(name.strip().lower(), 1) for name in parts if -1 == name.find("=")]
+ if "cache-control" in headers:
+ parts = headers["cache-control"].split(",")
+ parts_with_args = [
+ tuple([x.strip().lower() for x in part.split("=", 1)])
+ for part in parts
+ if -1 != part.find("=")
+ ]
+ parts_wo_args = [
+ (name.strip().lower(), 1) for name in parts if -1 == name.find("=")
+ ]
retval = dict(parts_with_args + parts_wo_args)
return retval
+
# Whether to use a strict mode to parse WWW-Authenticate headers
# Might lead to bad results in case of ill-formed header value,
# so disabled by default, falling back to relaxed parsing.
@@ -245,21 +309,29 @@
# "(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?" matches a "quoted-string" as defined by HTTP, when LWS have already been replaced by a single space
# Actually, as an auth-param value can be either a token or a quoted-string, they are combined in a single pattern which matches both:
# \"?((?<=\")(?:[^\0-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?<!\")[^\0-\x08\x0A-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+(?!\"))\"?
-WWW_AUTH_STRICT = re.compile(r"^(?:\s*(?:,\s*)?([^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+)\s*=\s*\"?((?<=\")(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?<!\")[^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+(?!\"))\"?)(.*)$")
-WWW_AUTH_RELAXED = re.compile(r"^(?:\s*(?:,\s*)?([^ \t\r\n=]+)\s*=\s*\"?((?<=\")(?:[^\\\"]|\\.)*?(?=\")|(?<!\")[^ \t\r\n,]+(?!\"))\"?)(.*)$")
-UNQUOTE_PAIRS = re.compile(r'\\(.)')
-def _parse_www_authenticate(headers, headername='www-authenticate'):
+WWW_AUTH_STRICT = re.compile(
+ r"^(?:\s*(?:,\s*)?([^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+)\s*=\s*\"?((?<=\")(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?<!\")[^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+(?!\"))\"?)(.*)$"
+)
+WWW_AUTH_RELAXED = re.compile(
+ r"^(?:\s*(?:,\s*)?([^ \t\r\n=]+)\s*=\s*\"?((?<=\")(?:[^\\\"]|\\.)*?(?=\")|(?<!\")[^ \t\r\n,]+(?!\"))\"?)(.*)$"
+)
+UNQUOTE_PAIRS = re.compile(r"\\(.)")
+
+
+def _parse_www_authenticate(headers, headername="www-authenticate"):
"""Returns a dictionary of dictionaries, one dict
per auth_scheme."""
retval = {}
if headername in headers:
try:
authenticate = headers[headername].strip()
- www_auth = USE_WWW_AUTH_STRICT_PARSING and WWW_AUTH_STRICT or WWW_AUTH_RELAXED
+ www_auth = (
+ USE_WWW_AUTH_STRICT_PARSING and WWW_AUTH_STRICT or WWW_AUTH_RELAXED
+ )
while authenticate:
# Break off the scheme at the beginning of the line
- if headername == 'authentication-info':
- (auth_scheme, the_rest) = ('digest', authenticate)
+ if headername == "authentication-info":
+ (auth_scheme, the_rest) = ("digest", authenticate)
else:
(auth_scheme, the_rest) = authenticate.split(" ", 1)
# Now loop over all the key value pairs that come after the scheme,
@@ -269,7 +341,9 @@
while match:
if match and len(match.groups()) == 3:
(key, value, the_rest) = match.groups()
- auth_params[key.lower()] = UNQUOTE_PAIRS.sub(r'\1', value) # '\\'.join([x.replace('\\', '') for x in value.split('\\\\')])
+ auth_params[key.lower()] = UNQUOTE_PAIRS.sub(
+ r"\1", value
+ ) # '\\'.join([x.replace('\\', '') for x in value.split('\\\\')])
match = www_auth.search(the_rest)
retval[auth_scheme.lower()] = auth_params
authenticate = the_rest.strip()
@@ -310,41 +384,44 @@
cc = _parse_cache_control(request_headers)
cc_response = _parse_cache_control(response_headers)
- if 'pragma' in request_headers and request_headers['pragma'].lower().find('no-cache') != -1:
+ if (
+ "pragma" in request_headers
+ and request_headers["pragma"].lower().find("no-cache") != -1
+ ):
retval = "TRANSPARENT"
- if 'cache-control' not in request_headers:
- request_headers['cache-control'] = 'no-cache'
- elif 'no-cache' in cc:
+ if "cache-control" not in request_headers:
+ request_headers["cache-control"] = "no-cache"
+ elif "no-cache" in cc:
retval = "TRANSPARENT"
- elif 'no-cache' in cc_response:
+ elif "no-cache" in cc_response:
retval = "STALE"
- elif 'only-if-cached' in cc:
+ elif "only-if-cached" in cc:
retval = "FRESH"
- elif 'date' in response_headers:
- date = calendar.timegm(email.utils.parsedate_tz(response_headers['date']))
+ elif "date" in response_headers:
+ date = calendar.timegm(email.utils.parsedate_tz(response_headers["date"]))
now = time.time()
current_age = max(0, now - date)
- if 'max-age' in cc_response:
+ if "max-age" in cc_response:
try:
- freshness_lifetime = int(cc_response['max-age'])
+ freshness_lifetime = int(cc_response["max-age"])
except ValueError:
freshness_lifetime = 0
- elif 'expires' in response_headers:
- expires = email.utils.parsedate_tz(response_headers['expires'])
+ elif "expires" in response_headers:
+ expires = email.utils.parsedate_tz(response_headers["expires"])
if None == expires:
freshness_lifetime = 0
else:
freshness_lifetime = max(0, calendar.timegm(expires) - date)
else:
freshness_lifetime = 0
- if 'max-age' in cc:
+ if "max-age" in cc:
try:
- freshness_lifetime = int(cc['max-age'])
+ freshness_lifetime = int(cc["max-age"])
except ValueError:
freshness_lifetime = 0
- if 'min-fresh' in cc:
+ if "min-fresh" in cc:
try:
- min_fresh = int(cc['min-fresh'])
+ min_fresh = int(cc["min-fresh"])
except ValueError:
min_fresh = 0
current_age += min_fresh
@@ -352,60 +429,69 @@
retval = "FRESH"
return retval
+
def _decompressContent(response, new_content):
content = new_content
try:
- encoding = response.get('content-encoding', None)
- if encoding in ['gzip', 'deflate']:
- if encoding == 'gzip':
+ encoding = response.get("content-encoding", None)
+ if encoding in ["gzip", "deflate"]:
+ if encoding == "gzip":
content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read()
- if encoding == 'deflate':
+ if encoding == "deflate":
content = zlib.decompress(content, -zlib.MAX_WBITS)
- response['content-length'] = str(len(content))
+ response["content-length"] = str(len(content))
        # Record the historical presence of the encoding in a way that won't interfere.
- response['-content-encoding'] = response['content-encoding']
- del response['content-encoding']
+ response["-content-encoding"] = response["content-encoding"]
+ del response["content-encoding"]
except (IOError, zlib.error):
content = ""
- raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
+ raise FailedToDecompressContent(
+ _("Content purported to be compressed with %s but failed to decompress.")
+ % response.get("content-encoding"),
+ response,
+ content,
+ )
return content
+
def _bind_write_headers(msg):
- from email.header import Header
- def _write_headers(self):
- # Self refers to the Generator object
- for h, v in msg.items():
- print('%s:' % h, end=' ', file=self._fp)
- if isinstance(v, Header):
- print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp)
- else:
- # Header's got lots of smarts, so use it.
- header = Header(v, maxlinelen=self._maxheaderlen, charset='utf-8',
- header_name=h)
- print(header.encode(), file=self._fp)
- # A blank line always separates headers from body
- print(file=self._fp)
- return _write_headers
+ def _write_headers(self):
+ # Self refers to the Generator object.
+ for h, v in msg.items():
+ print("%s:" % h, end=" ", file=self._fp)
+ if isinstance(v, header.Header):
+ print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp)
+ else:
+                # email.Header has lots of smarts, so use it.
+ headers = header.Header(
+ v, maxlinelen=self._maxheaderlen, charset="utf-8", header_name=h
+ )
+ print(headers.encode(), file=self._fp)
+ # A blank line always separates headers from body.
+ print(file=self._fp)
+
+ return _write_headers
+
def _updateCache(request_headers, response_headers, content, cache, cachekey):
if cachekey:
cc = _parse_cache_control(request_headers)
cc_response = _parse_cache_control(response_headers)
- if 'no-store' in cc or 'no-store' in cc_response:
+ if "no-store" in cc or "no-store" in cc_response:
cache.delete(cachekey)
else:
info = email.message.Message()
for key, value in response_headers.items():
- if key not in ['status','content-encoding','transfer-encoding']:
+ if key not in ["status", "content-encoding", "transfer-encoding"]:
info[key] = value
# Add annotations to the cache to indicate what headers
# are variant for this request.
- vary = response_headers.get('vary', None)
+ vary = response_headers.get("vary", None)
if vary:
- vary_headers = vary.lower().replace(' ', '').split(',')
+ vary_headers = vary.lower().replace(" ", "").split(",")
for header in vary_headers:
- key = '-varied-%s' % header
+ key = "-varied-%s" % header
try:
info[key] = request_headers[header]
except KeyError:
@@ -415,25 +501,36 @@
if status == 304:
status = 200
- status_header = 'status: %d\r\n' % status
+ status_header = "status: %d\r\n" % status
try:
header_str = info.as_string()
except UnicodeEncodeError:
- setattr(info, '_write_headers', _bind_write_headers(info))
+ setattr(info, "_write_headers", _bind_write_headers(info))
header_str = info.as_string()
header_str = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", header_str)
- text = b"".join([status_header.encode('utf-8'), header_str.encode('utf-8'), content])
+ text = b"".join(
+ [status_header.encode("utf-8"), header_str.encode("utf-8"), content]
+ )
cache.set(cachekey, text)
+
def _cnonce():
- dig = _md5(("%s:%s" % (time.ctime(), ["0123456789"[random.randrange(0, 9)] for i in range(20)])).encode('utf-8')).hexdigest()
+ dig = _md5(
+ (
+ "%s:%s"
+ % (time.ctime(), ["0123456789"[random.randrange(0, 9)] for i in range(20)])
+ ).encode("utf-8")
+ ).hexdigest()
return dig[:16]
+
def _wsse_username_token(cnonce, iso_now, password):
- return base64.b64encode(_sha(("%s%s%s" % (cnonce, iso_now, password)).encode('utf-8')).digest()).strip()
+ return base64.b64encode(
+ _sha(("%s%s%s" % (cnonce, iso_now, password)).encode("utf-8")).digest()
+ ).strip()
# For credentials we need two things, first
@@ -444,8 +541,11 @@
# So we also need each Auth instance to be able to tell us
# how close to the 'top' it is.
+
class Authentication(object):
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
(scheme, authority, path, query, fragment) = parse_uri(request_uri)
self.path = path
self.host = host
@@ -454,7 +554,7 @@
def depth(self, request_uri):
(scheme, authority, path, query, fragment) = parse_uri(request_uri)
- return request_uri[len(self.path):].count("/")
+ return request_uri[len(self.path) :].count("/")
def inscope(self, host, request_uri):
# XXX Should we normalize the request_uri?
@@ -499,105 +599,169 @@
class BasicAuthentication(Authentication):
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
- headers['authorization'] = 'Basic ' + base64.b64encode(("%s:%s" % self.credentials).encode('utf-8')).strip().decode('utf-8')
+ headers["authorization"] = "Basic " + base64.b64encode(
+ ("%s:%s" % self.credentials).encode("utf-8")
+ ).strip().decode("utf-8")
class DigestAuthentication(Authentication):
"""Only do qop='auth' and MD5, since that
is all Apache currently implements"""
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
- challenge = _parse_www_authenticate(response, 'www-authenticate')
- self.challenge = challenge['digest']
- qop = self.challenge.get('qop', 'auth')
- self.challenge['qop'] = ('auth' in [x.strip() for x in qop.split()]) and 'auth' or None
- if self.challenge['qop'] is None:
- raise UnimplementedDigestAuthOptionError( _("Unsupported value for qop: %s." % qop))
- self.challenge['algorithm'] = self.challenge.get('algorithm', 'MD5').upper()
- if self.challenge['algorithm'] != 'MD5':
- raise UnimplementedDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm']))
- self.A1 = "".join([self.credentials[0], ":", self.challenge['realm'], ":", self.credentials[1]])
- self.challenge['nc'] = 1
- def request(self, method, request_uri, headers, content, cnonce = None):
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
+ challenge = _parse_www_authenticate(response, "www-authenticate")
+ self.challenge = challenge["digest"]
+ qop = self.challenge.get("qop", "auth")
+ self.challenge["qop"] = (
+ ("auth" in [x.strip() for x in qop.split()]) and "auth" or None
+ )
+ if self.challenge["qop"] is None:
+ raise UnimplementedDigestAuthOptionError(
+ _("Unsupported value for qop: %s." % qop)
+ )
+ self.challenge["algorithm"] = self.challenge.get("algorithm", "MD5").upper()
+ if self.challenge["algorithm"] != "MD5":
+ raise UnimplementedDigestAuthOptionError(
+ _("Unsupported value for algorithm: %s." % self.challenge["algorithm"])
+ )
+ self.A1 = "".join(
+ [
+ self.credentials[0],
+ ":",
+ self.challenge["realm"],
+ ":",
+ self.credentials[1],
+ ]
+ )
+ self.challenge["nc"] = 1
+
+ def request(self, method, request_uri, headers, content, cnonce=None):
"""Modify the request headers"""
- H = lambda x: _md5(x.encode('utf-8')).hexdigest()
+ H = lambda x: _md5(x.encode("utf-8")).hexdigest()
KD = lambda s, d: H("%s:%s" % (s, d))
A2 = "".join([method, ":", request_uri])
- self.challenge['cnonce'] = cnonce or _cnonce()
- request_digest = '"%s"' % KD(H(self.A1), "%s:%s:%s:%s:%s" % (
- self.challenge['nonce'],
- '%08x' % self.challenge['nc'],
- self.challenge['cnonce'],
- self.challenge['qop'], H(A2)))
- headers['authorization'] = 'Digest username="%s", realm="%s", nonce="%s", uri="%s", algorithm=%s, response=%s, qop=%s, nc=%08x, cnonce="%s"' % (
- self.credentials[0],
- self.challenge['realm'],
- self.challenge['nonce'],
- request_uri,
- self.challenge['algorithm'],
- request_digest,
- self.challenge['qop'],
- self.challenge['nc'],
- self.challenge['cnonce'])
- if self.challenge.get('opaque'):
- headers['authorization'] += ', opaque="%s"' % self.challenge['opaque']
- self.challenge['nc'] += 1
+ self.challenge["cnonce"] = cnonce or _cnonce()
+ request_digest = '"%s"' % KD(
+ H(self.A1),
+ "%s:%s:%s:%s:%s"
+ % (
+ self.challenge["nonce"],
+ "%08x" % self.challenge["nc"],
+ self.challenge["cnonce"],
+ self.challenge["qop"],
+ H(A2),
+ ),
+ )
+ headers["authorization"] = (
+ 'Digest username="%s", realm="%s", nonce="%s", '
+ 'uri="%s", algorithm=%s, response=%s, qop=%s, '
+ 'nc=%08x, cnonce="%s"'
+ ) % (
+ self.credentials[0],
+ self.challenge["realm"],
+ self.challenge["nonce"],
+ request_uri,
+ self.challenge["algorithm"],
+ request_digest,
+ self.challenge["qop"],
+ self.challenge["nc"],
+ self.challenge["cnonce"],
+ )
+ if self.challenge.get("opaque"):
+ headers["authorization"] += ', opaque="%s"' % self.challenge["opaque"]
+ self.challenge["nc"] += 1
def response(self, response, content):
- if 'authentication-info' not in response:
- challenge = _parse_www_authenticate(response, 'www-authenticate').get('digest', {})
- if 'true' == challenge.get('stale'):
- self.challenge['nonce'] = challenge['nonce']
- self.challenge['nc'] = 1
+ if "authentication-info" not in response:
+ challenge = _parse_www_authenticate(response, "www-authenticate").get(
+ "digest", {}
+ )
+ if "true" == challenge.get("stale"):
+ self.challenge["nonce"] = challenge["nonce"]
+ self.challenge["nc"] = 1
return True
else:
- updated_challenge = _parse_www_authenticate(response, 'authentication-info').get('digest', {})
+ updated_challenge = _parse_www_authenticate(
+ response, "authentication-info"
+ ).get("digest", {})
- if 'nextnonce' in updated_challenge:
- self.challenge['nonce'] = updated_challenge['nextnonce']
- self.challenge['nc'] = 1
+ if "nextnonce" in updated_challenge:
+ self.challenge["nonce"] = updated_challenge["nextnonce"]
+ self.challenge["nc"] = 1
return False
class HmacDigestAuthentication(Authentication):
"""Adapted from Robert Sayre's code and DigestAuthentication above."""
+
__author__ = "Thomas Broyer (t.broyer@ltgt.net)"
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
- challenge = _parse_www_authenticate(response, 'www-authenticate')
- self.challenge = challenge['hmacdigest']
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
+ challenge = _parse_www_authenticate(response, "www-authenticate")
+ self.challenge = challenge["hmacdigest"]
# TODO: self.challenge['domain']
- self.challenge['reason'] = self.challenge.get('reason', 'unauthorized')
- if self.challenge['reason'] not in ['unauthorized', 'integrity']:
- self.challenge['reason'] = 'unauthorized'
- self.challenge['salt'] = self.challenge.get('salt', '')
- if not self.challenge.get('snonce'):
- raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty."))
- self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1')
- if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']:
- raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm']))
- self.challenge['pw-algorithm'] = self.challenge.get('pw-algorithm', 'SHA-1')
- if self.challenge['pw-algorithm'] not in ['SHA-1', 'MD5']:
- raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % self.challenge['pw-algorithm']))
- if self.challenge['algorithm'] == 'HMAC-MD5':
+ self.challenge["reason"] = self.challenge.get("reason", "unauthorized")
+ if self.challenge["reason"] not in ["unauthorized", "integrity"]:
+ self.challenge["reason"] = "unauthorized"
+ self.challenge["salt"] = self.challenge.get("salt", "")
+ if not self.challenge.get("snonce"):
+ raise UnimplementedHmacDigestAuthOptionError(
+ _("The challenge doesn't contain a server nonce, or this one is empty.")
+ )
+ self.challenge["algorithm"] = self.challenge.get("algorithm", "HMAC-SHA-1")
+ if self.challenge["algorithm"] not in ["HMAC-SHA-1", "HMAC-MD5"]:
+ raise UnimplementedHmacDigestAuthOptionError(
+ _("Unsupported value for algorithm: %s." % self.challenge["algorithm"])
+ )
+ self.challenge["pw-algorithm"] = self.challenge.get("pw-algorithm", "SHA-1")
+ if self.challenge["pw-algorithm"] not in ["SHA-1", "MD5"]:
+ raise UnimplementedHmacDigestAuthOptionError(
+ _(
+ "Unsupported value for pw-algorithm: %s."
+ % self.challenge["pw-algorithm"]
+ )
+ )
+ if self.challenge["algorithm"] == "HMAC-MD5":
self.hashmod = _md5
else:
self.hashmod = _sha
- if self.challenge['pw-algorithm'] == 'MD5':
+ if self.challenge["pw-algorithm"] == "MD5":
self.pwhashmod = _md5
else:
self.pwhashmod = _sha
- self.key = "".join([self.credentials[0], ":",
- self.pwhashmod.new("".join([self.credentials[1], self.challenge['salt']])).hexdigest().lower(),
- ":", self.challenge['realm']])
+ self.key = "".join(
+ [
+ self.credentials[0],
+ ":",
+ self.pwhashmod.new(
+ "".join([self.credentials[1], self.challenge["salt"]])
+ )
+ .hexdigest()
+ .lower(),
+ ":",
+ self.challenge["realm"],
+ ]
+ )
self.key = self.pwhashmod.new(self.key).hexdigest().lower()
def request(self, method, request_uri, headers, content):
@@ -605,23 +769,38 @@
keys = _get_end2end_headers(headers)
keylist = "".join(["%s " % k for k in keys])
headers_val = "".join([headers[k] for k in keys])
- created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime())
+ created = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
cnonce = _cnonce()
- request_digest = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, self.challenge['snonce'], headers_val)
- request_digest = hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower()
- headers['authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % (
- self.credentials[0],
- self.challenge['realm'],
- self.challenge['snonce'],
- cnonce,
- request_uri,
- created,
- request_digest,
- keylist)
+ request_digest = "%s:%s:%s:%s:%s" % (
+ method,
+ request_uri,
+ cnonce,
+ self.challenge["snonce"],
+ headers_val,
+ )
+ request_digest = (
+ hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower()
+ )
+ headers["authorization"] = (
+ 'HMACDigest username="%s", realm="%s", snonce="%s",'
+ ' cnonce="%s", uri="%s", created="%s", '
+ 'response="%s", headers="%s"'
+ ) % (
+ self.credentials[0],
+ self.challenge["realm"],
+ self.challenge["snonce"],
+ cnonce,
+ request_uri,
+ created,
+ request_digest,
+ keylist,
+ )
def response(self, response, content):
- challenge = _parse_www_authenticate(response, 'www-authenticate').get('hmacdigest', {})
- if challenge.get('reason') in ['integrity', 'stale']:
+ challenge = _parse_www_authenticate(response, "www-authenticate").get(
+ "hmacdigest", {}
+ )
+ if challenge.get("reason") in ["integrity", "stale"]:
return True
return False
@@ -634,49 +813,69 @@
TypePad has implemented it wrong, by never issuing a 401
challenge but instead requiring your client to telepathically know that
their endpoint is expecting WSSE profile="UsernameToken"."""
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
+
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
- headers['authorization'] = 'WSSE profile="UsernameToken"'
+ headers["authorization"] = 'WSSE profile="UsernameToken"'
iso_now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
cnonce = _cnonce()
password_digest = _wsse_username_token(cnonce, iso_now, self.credentials[1])
- headers['X-WSSE'] = 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
- self.credentials[0],
- password_digest,
- cnonce,
- iso_now)
+ headers["X-WSSE"] = (
+ 'UsernameToken Username="%s", PasswordDigest="%s", '
+ 'Nonce="%s", Created="%s"'
+ ) % (self.credentials[0], password_digest, cnonce, iso_now)
+
class GoogleLoginAuthentication(Authentication):
- def __init__(self, credentials, host, request_uri, headers, response, content, http):
+ def __init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ ):
from urllib.parse import urlencode
- Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
- challenge = _parse_www_authenticate(response, 'www-authenticate')
- service = challenge['googlelogin'].get('service', 'xapi')
+
+ Authentication.__init__(
+ self, credentials, host, request_uri, headers, response, content, http
+ )
+ challenge = _parse_www_authenticate(response, "www-authenticate")
+ service = challenge["googlelogin"].get("service", "xapi")
        # Blogger actually returns the service in the challenge
# For the rest we guess based on the URI
- if service == 'xapi' and request_uri.find("calendar") > 0:
+ if service == "xapi" and request_uri.find("calendar") > 0:
service = "cl"
# No point in guessing Base or Spreadsheet
- #elif request_uri.find("spreadsheets") > 0:
+ # elif request_uri.find("spreadsheets") > 0:
# service = "wise"
- auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
- resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
- lines = content.split('\n')
+ auth = dict(
+ Email=credentials[0],
+ Passwd=credentials[1],
+ service=service,
+ source=headers["user-agent"],
+ )
+ resp, content = self.http.request(
+ "https://www.google.com/accounts/ClientLogin",
+ method="POST",
+ body=urlencode(auth),
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ )
+ lines = content.split("\n")
d = dict([tuple(line.split("=", 1)) for line in lines if line])
if resp.status == 403:
self.Auth = ""
else:
- self.Auth = d['Auth']
+ self.Auth = d["Auth"]
def request(self, method, request_uri, headers, content):
"""Modify the request headers to add the appropriate
Authorization header."""
- headers['authorization'] = 'GoogleLogin Auth=' + self.Auth
+ headers["authorization"] = "GoogleLogin Auth=" + self.Auth
AUTH_SCHEME_CLASSES = {
@@ -684,17 +883,21 @@
"wsse": WsseAuthentication,
"digest": DigestAuthentication,
"hmacdigest": HmacDigestAuthentication,
- "googlelogin": GoogleLoginAuthentication
+ "googlelogin": GoogleLoginAuthentication,
}
AUTH_SCHEME_ORDER = ["hmacdigest", "googlelogin", "digest", "wsse", "basic"]
+
class FileCache(object):
"""Uses a local directory as a store for cached files.
Not really safe to use if multiple threads or processes are going to
be running on the same cache.
"""
- def __init__(self, cache, safe=safename): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
+
+ def __init__(
+ self, cache, safe=safename
+ ): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
self.cache = cache
self.safe = safe
if not os.path.exists(cache):
@@ -722,6 +925,7 @@
if os.path.exists(cacheFullPath):
os.remove(cacheFullPath)
+
class Credentials(object):
def __init__(self):
self.credentials = []
@@ -737,9 +941,11 @@
if cdomain == "" or domain == cdomain:
yield (name, password)
+
class KeyCerts(Credentials):
"""Identical to Credentials except that
name/password are mapped to key/cert."""
+
pass
@@ -748,98 +954,117 @@
class ProxyInfo(object):
- """Collect information required to use a proxy."""
- bypass_hosts = ()
+ """Collect information required to use a proxy."""
- def __init__(self, proxy_type, proxy_host, proxy_port, proxy_rdns=True, proxy_user=None, proxy_pass=None, proxy_headers=None):
- """
- Args:
+ bypass_hosts = ()
+
+ def __init__(
+ self,
+ proxy_type,
+ proxy_host,
+ proxy_port,
+ proxy_rdns=True,
+ proxy_user=None,
+ proxy_pass=None,
+ proxy_headers=None,
+ ):
+ """Args:
+
proxy_type: The type of proxy server. This must be set to one of
- socks.PROXY_TYPE_XXX constants. For example:
-
- p = ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP,
- proxy_host='localhost', proxy_port=8000)
-
+            socks.PROXY_TYPE_XXX constants. For example:
+            p = ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP,
+                          proxy_host='localhost', proxy_port=8000)
proxy_host: The hostname or IP address of the proxy server.
-
proxy_port: The port that the proxy server is running on.
-
proxy_rdns: If True (default), DNS queries will not be performed
locally, and instead, handed to the proxy to resolve. This is useful
- if the network does not allow resolution of non-local names. In
+ if the network does not allow resolution of non-local names. In
httplib2 0.9 and earlier, this defaulted to False.
-
proxy_user: The username used to authenticate with the proxy server.
-
proxy_pass: The password used to authenticate with the proxy server.
+ proxy_headers: Additional or modified headers for the proxy connect
+ request.
+ """
+ self.proxy_type, self.proxy_host, self.proxy_port, self.proxy_rdns, self.proxy_user, self.proxy_pass, self.proxy_headers = (
+ proxy_type,
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
- proxy_headers: Additional or modified headers for the proxy connect request.
- """
- self.proxy_type, self.proxy_host, self.proxy_port, self.proxy_rdns, self.proxy_user, self.proxy_pass, self.proxy_headers = proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers
+ def astuple(self):
+ return (
+ self.proxy_type,
+ self.proxy_host,
+ self.proxy_port,
+ self.proxy_rdns,
+ self.proxy_user,
+ self.proxy_pass,
+ self.proxy_headers,
+ )
- def astuple(self):
- return (self.proxy_type, self.proxy_host, self.proxy_port, self.proxy_rdns,
- self.proxy_user, self.proxy_pass, self.proxy_headers)
+ def isgood(self):
+ return socks and (self.proxy_host != None) and (self.proxy_port != None)
- def isgood(self):
- return socks and (self.proxy_host != None) and (self.proxy_port != None)
+ def applies_to(self, hostname):
+ return not self.bypass_host(hostname)
- def applies_to(self, hostname):
- return not self.bypass_host(hostname)
+ def bypass_host(self, hostname):
+ """Has this host been excluded from the proxy config"""
+ if self.bypass_hosts is AllHosts:
+ return True
- def bypass_host(self, hostname):
- """Has this host been excluded from the proxy config"""
- if self.bypass_hosts is AllHosts:
- return True
+ hostname = "." + hostname.lstrip(".")
+ for skip_name in self.bypass_hosts:
+ # *.suffix
+ if skip_name.startswith(".") and hostname.endswith(skip_name):
+ return True
+ # exact match
+ if hostname == "." + skip_name:
+ return True
+ return False
- hostname = '.' + hostname.lstrip('.')
- for skip_name in self.bypass_hosts:
- # *.suffix
- if skip_name.startswith('.') and hostname.endswith(skip_name):
- return True
- # exact match
- if hostname == '.' + skip_name:
- return True
- return False
-
- def __repr__(self):
- return (
- '<ProxyInfo type={p.proxy_type} host:port={p.proxy_host}:{p.proxy_port} rdns={p.proxy_rdns}' +
- ' user={p.proxy_user} headers={p.proxy_headers}>').format(p=self)
+ def __repr__(self):
+ return (
+ "<ProxyInfo type={p.proxy_type} "
+ "host:port={p.proxy_host}:{p.proxy_port} rdns={p.proxy_rdns}"
+            " user={p.proxy_user} headers={p.proxy_headers}>"
+ ).format(p=self)
-def proxy_info_from_environment(method='http'):
+def proxy_info_from_environment(method="http"):
+ """Read proxy info from the environment variables.
"""
- Read proxy info from the environment variables.
- """
- if method not in ('http', 'https'):
+ if method not in ("http", "https"):
return
- env_var = method + '_proxy'
+ env_var = method + "_proxy"
url = os.environ.get(env_var, os.environ.get(env_var.upper()))
if not url:
return
return proxy_info_from_url(url, method, noproxy=None)
-def proxy_info_from_url(url, method='http', noproxy=None):
- """
- Construct a ProxyInfo from a URL (such as http_proxy env var)
+def proxy_info_from_url(url, method="http", noproxy=None):
+ """Construct a ProxyInfo from a URL (such as http_proxy env var)
"""
url = urllib.parse.urlparse(url)
username = None
password = None
port = None
- if '@' in url[1]:
- ident, host_port = url[1].split('@', 1)
- if ':' in ident:
- username, password = ident.split(':', 1)
+ if "@" in url[1]:
+ ident, host_port = url[1].split("@", 1)
+ if ":" in ident:
+ username, password = ident.split(":", 1)
else:
password = ident
else:
host_port = url[1]
- if ':' in host_port:
- host, port = host_port.split(':', 1)
+ if ":" in host_port:
+ host, port = host_port.split(":", 1)
else:
host = host_port
@@ -861,12 +1086,12 @@
bypass_hosts = []
# If not given an explicit noproxy value, respect values in env vars.
if noproxy is None:
- noproxy = os.environ.get('no_proxy', os.environ.get('NO_PROXY', ''))
+ noproxy = os.environ.get("no_proxy", os.environ.get("NO_PROXY", ""))
# Special case: A single '*' character means all hosts should be bypassed.
- if noproxy == '*':
+ if noproxy == "*":
bypass_hosts = AllHosts
elif noproxy.strip():
- bypass_hosts = noproxy.split(',')
+ bypass_hosts = noproxy.split(",")
bypass_hosts = tuple(filter(bool, bypass_hosts)) # To exclude empty string.
pi.bypass_hosts = bypass_hosts
@@ -885,21 +1110,23 @@
"""
def __init__(self, host, port=None, timeout=None, proxy_info=None):
- http.client.HTTPConnection.__init__(self, host, port=port,
- timeout=timeout)
+ http.client.HTTPConnection.__init__(self, host, port=port, timeout=timeout)
self.proxy_info = proxy_info
if proxy_info and not isinstance(proxy_info, ProxyInfo):
- self.proxy_info = proxy_info('http')
+ self.proxy_info = proxy_info("http")
def connect(self):
"""Connect to the host and port specified in __init__."""
if self.proxy_info and socks is None:
raise ProxiesUnavailableError(
- 'Proxy support missing but proxy use was requested!')
+ "Proxy support missing but proxy use was requested!"
+ )
if self.proxy_info and self.proxy_info.isgood():
use_proxy = True
- proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = self.proxy_info.astuple()
+ proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = (
+ self.proxy_info.astuple()
+ )
host = proxy_host
port = proxy_port
@@ -917,7 +1144,14 @@
try:
if use_proxy:
self.sock = socks.socksocket(af, socktype, proto)
- self.sock.setproxy(proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass)
+ self.sock.setproxy(
+ proxy_type,
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ )
else:
self.sock = socket.socket(af, socktype, proto)
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
@@ -925,22 +1159,44 @@
self.sock.settimeout(self.timeout)
if self.debuglevel > 0:
print(
- "connect: ({0}, {1}) ************".format(self.host, self.port))
+ "connect: ({0}, {1}) ************".format(self.host, self.port)
+ )
if use_proxy:
print(
- "proxy: {0} ************".format(str(
- (proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers))))
+ "proxy: {0} ************".format(
+ str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
+ )
self.sock.connect((self.host, self.port) + sa[2:])
except socket.error as e:
socket_err = e
if self.debuglevel > 0:
- print(
- "connect fail: ({0}, {1})".format(self.host, self.port))
+ print("connect fail: ({0}, {1})".format(self.host, self.port))
if use_proxy:
print(
- "proxy: {0}".format(str(
- (proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers))))
+ "proxy: {0}".format(
+ str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
+ )
if self.sock:
self.sock.close()
self.sock = None
@@ -951,8 +1207,7 @@
class HTTPSConnectionWithTimeout(http.client.HTTPSConnection):
- """
- This class allows communication via SSL.
+ """This class allows communication via SSL.
All timeouts are in seconds. If None is passed for timeout then
Python's default timeout for sockets will be used. See for example
@@ -960,26 +1215,44 @@
http://docs.python.org/library/socket.html#socket.setdefaulttimeout
"""
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- timeout=None, proxy_info=None,
- ca_certs=None, disable_ssl_certificate_validation=False):
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ timeout=None,
+ proxy_info=None,
+ ca_certs=None,
+ disable_ssl_certificate_validation=False,
+ ):
self.disable_ssl_certificate_validation = disable_ssl_certificate_validation
self.ca_certs = ca_certs if ca_certs else CA_CERTS
self.proxy_info = proxy_info
if proxy_info and not isinstance(proxy_info, ProxyInfo):
- self.proxy_info = proxy_info('https')
+ self.proxy_info = proxy_info("https")
- context = _build_ssl_context(self.disable_ssl_certificate_validation, self.ca_certs, cert_file, key_file)
- super(HTTPSConnectionWithTimeout, self).__init__(host, port=port, key_file=key_file, cert_file=cert_file,
- timeout=timeout, context=context)
+ context = _build_ssl_context(
+ self.disable_ssl_certificate_validation, self.ca_certs, cert_file, key_file
+ )
+ super(HTTPSConnectionWithTimeout, self).__init__(
+ host,
+ port=port,
+ key_file=key_file,
+ cert_file=cert_file,
+ timeout=timeout,
+ context=context,
+ )
def connect(self):
"""Connect to a host on a given (SSL) port."""
if self.proxy_info and self.proxy_info.isgood():
use_proxy = True
- proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = self.proxy_info.astuple()
+ proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers = (
+ self.proxy_info.astuple()
+ )
host = proxy_host
port = proxy_port
@@ -999,7 +1272,14 @@
if use_proxy:
sock = socks.socksocket(family, socktype, proto)
- sock.setproxy(proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass)
+ sock.setproxy(
+ proxy_type,
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ )
else:
sock = socket.socket(family, socktype, proto)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
@@ -1010,7 +1290,10 @@
self.sock = self._context.wrap_socket(sock, server_hostname=self.host)
# Python 3.3 compatibility: emulate the check_hostname behavior
- if not hasattr(self._context, 'check_hostname') and not self.disable_ssl_certificate_validation:
+ if (
+ not hasattr(self._context, "check_hostname")
+ and not self.disable_ssl_certificate_validation
+ ):
try:
ssl.match_hostname(self.sock.getpeercert(), self.host)
except Exception:
@@ -1021,8 +1304,20 @@
if self.debuglevel > 0:
print("connect: ({0}, {1})".format(self.host, self.port))
if use_proxy:
- print("proxy: {0}".format(str(
- (proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers))))
+ print(
+ "proxy: {0}".format(
+ str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
+ )
except (ssl.SSLError, ssl.CertificateError) as e:
if sock:
sock.close()
@@ -1037,7 +1332,20 @@
if self.debuglevel > 0:
                print("connect fail: ({0}, {1})".format(self.host, self.port))
if use_proxy:
- print("proxy: {0}".format(str((proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass, proxy_headers))))
+ print(
+ "proxy: {0}".format(
+ str(
+ (
+ proxy_host,
+ proxy_port,
+ proxy_rdns,
+ proxy_user,
+ proxy_pass,
+ proxy_headers,
+ )
+ )
+ )
+ )
if self.sock:
self.sock.close()
self.sock = None
@@ -1048,10 +1356,11 @@
SCHEME_TO_CONNECTION = {
- 'http': HTTPConnectionWithTimeout,
- 'https': HTTPSConnectionWithTimeout,
+ "http": HTTPConnectionWithTimeout,
+ "https": HTTPSConnectionWithTimeout,
}
+
class Http(object):
"""An HTTP client that handles:
@@ -1066,9 +1375,15 @@
and more.
"""
- def __init__(self, cache=None, timeout=None,
- proxy_info=proxy_info_from_environment,
- ca_certs=None, disable_ssl_certificate_validation=False):
+
+ def __init__(
+ self,
+ cache=None,
+ timeout=None,
+ proxy_info=proxy_info_from_environment,
+ ca_certs=None,
+ disable_ssl_certificate_validation=False,
+ ):
"""If 'cache' is a string then it is used as a directory name for
a disk cache. Otherwise it must be an object that supports the
same interface as FileCache.
@@ -1094,8 +1409,7 @@
"""
self.proxy_info = proxy_info
self.ca_certs = ca_certs
- self.disable_ssl_certificate_validation = \
- disable_ssl_certificate_validation
+ self.disable_ssl_certificate_validation = disable_ssl_certificate_validation
# Map domain name to an httplib connection
self.connections = {}
# The location of the cache, for now a directory
@@ -1138,10 +1452,10 @@
state_dict = copy.copy(self.__dict__)
# In case request is augmented by some foreign object such as
# credentials which handle auth
- if 'request' in state_dict:
- del state_dict['request']
- if 'connections' in state_dict:
- del state_dict['connections']
+ if "request" in state_dict:
+ del state_dict["request"]
+ if "connections" in state_dict:
+ del state_dict["connections"]
return state_dict
def __setstate__(self, state):
@@ -1152,11 +1466,13 @@
"""A generator that creates Authorization objects
that can be applied to requests.
"""
- challenges = _parse_www_authenticate(response, 'www-authenticate')
+ challenges = _parse_www_authenticate(response, "www-authenticate")
for cred in self.credentials.iter(host):
for scheme in AUTH_SCHEME_ORDER:
if scheme in challenges:
- yield AUTH_SCHEME_CLASSES[scheme](cred, host, request_uri, headers, response, content, self)
+ yield AUTH_SCHEME_CLASSES[scheme](
+ cred, host, request_uri, headers, response, content, self
+ )
def add_credentials(self, name, password, domain=""):
"""Add a name and password that will be used
@@ -1190,20 +1506,22 @@
conn.close()
raise ServerNotFoundError("Unable to find the server at %s" % conn.host)
except socket.error as e:
- errno_ = (e.args[0].errno if isinstance(e.args[0], socket.error) else e.errno)
+ errno_ = (
+ e.args[0].errno if isinstance(e.args[0], socket.error) else e.errno
+ )
if errno_ in (errno.ENETUNREACH, errno.EADDRNOTAVAIL) and i < RETRIES:
continue # retry on potentially transient errors
raise
except http.client.HTTPException:
if conn.sock is None:
- if i < RETRIES-1:
+ if i < RETRIES - 1:
conn.close()
conn.connect()
continue
else:
conn.close()
raise
- if i < RETRIES-1:
+ if i < RETRIES - 1:
conn.close()
conn.connect()
continue
@@ -1248,76 +1566,121 @@
break
return (response, content)
-
- def _request(self, conn, host, absolute_uri, request_uri, method, body, headers, redirections, cachekey):
+ def _request(
+ self,
+ conn,
+ host,
+ absolute_uri,
+ request_uri,
+ method,
+ body,
+ headers,
+ redirections,
+ cachekey,
+ ):
"""Do the actual request using the connection object
and also follow one level of redirects if necessary"""
- auths = [(auth.depth(request_uri), auth) for auth in self.authorizations if auth.inscope(host, request_uri)]
+ auths = [
+ (auth.depth(request_uri), auth)
+ for auth in self.authorizations
+ if auth.inscope(host, request_uri)
+ ]
auth = auths and sorted(auths)[0][1] or None
if auth:
auth.request(method, request_uri, headers, body)
- (response, content) = self._conn_request(conn, request_uri, method, body, headers)
+ (response, content) = self._conn_request(
+ conn, request_uri, method, body, headers
+ )
if auth:
if auth.response(response, body):
auth.request(method, request_uri, headers, body)
- (response, content) = self._conn_request(conn, request_uri, method, body, headers )
+ (response, content) = self._conn_request(
+ conn, request_uri, method, body, headers
+ )
response._stale_digest = 1
if response.status == 401:
- for authorization in self._auth_from_challenge(host, request_uri, headers, response, content):
+ for authorization in self._auth_from_challenge(
+ host, request_uri, headers, response, content
+ ):
authorization.request(method, request_uri, headers, body)
- (response, content) = self._conn_request(conn, request_uri, method, body, headers, )
+ (response, content) = self._conn_request(
+ conn, request_uri, method, body, headers
+ )
if response.status != 401:
self.authorizations.append(authorization)
authorization.response(response, body)
break
- if (self.follow_all_redirects or (method in ["GET", "HEAD"]) or response.status == 303):
+ if (
+ self.follow_all_redirects
+ or (method in ["GET", "HEAD"])
+ or response.status == 303
+ ):
if self.follow_redirects and response.status in [300, 301, 302, 303, 307]:
# Pick out the location header and basically start from the beginning
# remembering first to strip the ETag header and decrement our 'depth'
if redirections:
- if 'location' not in response and response.status != 300:
- raise RedirectMissingLocation( _("Redirected but the response is missing a Location: header."), response, content)
+ if "location" not in response and response.status != 300:
+ raise RedirectMissingLocation(
+ _(
+ "Redirected but the response is missing a Location: header."
+ ),
+ response,
+ content,
+ )
# Fix-up relative redirects (which violate an RFC 2616 MUST)
- if 'location' in response:
- location = response['location']
+ if "location" in response:
+ location = response["location"]
(scheme, authority, path, query, fragment) = parse_uri(location)
if authority == None:
- response['location'] = urllib.parse.urljoin(absolute_uri, location)
+ response["location"] = urllib.parse.urljoin(
+ absolute_uri, location
+ )
if response.status == 301 and method in ["GET", "HEAD"]:
- response['-x-permanent-redirect-url'] = response['location']
- if 'content-location' not in response:
- response['content-location'] = absolute_uri
+ response["-x-permanent-redirect-url"] = response["location"]
+ if "content-location" not in response:
+ response["content-location"] = absolute_uri
_updateCache(headers, response, content, self.cache, cachekey)
- if 'if-none-match' in headers:
- del headers['if-none-match']
- if 'if-modified-since' in headers:
- del headers['if-modified-since']
- if 'authorization' in headers and not self.forward_authorization_headers:
- del headers['authorization']
- if 'location' in response:
- location = response['location']
+ if "if-none-match" in headers:
+ del headers["if-none-match"]
+ if "if-modified-since" in headers:
+ del headers["if-modified-since"]
+ if (
+ "authorization" in headers
+ and not self.forward_authorization_headers
+ ):
+ del headers["authorization"]
+ if "location" in response:
+ location = response["location"]
old_response = copy.deepcopy(response)
- if 'content-location' not in old_response:
- old_response['content-location'] = absolute_uri
+ if "content-location" not in old_response:
+ old_response["content-location"] = absolute_uri
redirect_method = method
if response.status in [302, 303]:
- redirect_method = "GET"
- body = None
+ redirect_method = "GET"
+ body = None
(response, content) = self.request(
- location, method=redirect_method, body=body,
- headers=headers, redirections=redirections - 1)
+ location,
+ method=redirect_method,
+ body=body,
+ headers=headers,
+ redirections=redirections - 1,
+ )
response.previous = old_response
else:
- raise RedirectLimit("Redirected more times than redirection_limit allows.", response, content)
+ raise RedirectLimit(
+ "Redirected more times than redirection_limit allows.",
+ response,
+ content,
+ )
elif response.status in [200, 203] and method in ["GET", "HEAD"]:
# Don't cache 206's since we aren't going to handle byte range requests
- if 'content-location' not in response:
- response['content-location'] = absolute_uri
+ if "content-location" not in response:
+ response["content-location"] = absolute_uri
_updateCache(headers, response, content, self.cache, cachekey)
return (response, content)
@@ -1325,12 +1688,19 @@
def _normalize_headers(self, headers):
return _normalize_headers(headers)
-# Need to catch and rebrand some exceptions
-# Then need to optionally turn all exceptions into status codes
-# including all socket.* and httplib.* exceptions.
+ # Need to catch and rebrand some exceptions
+ # Then need to optionally turn all exceptions into status codes
+ # including all socket.* and httplib.* exceptions.
-
- def request(self, uri, method="GET", body=None, headers=None, redirections=DEFAULT_MAX_REDIRECTS, connection_type=None):
+ def request(
+ self,
+ uri,
+ method="GET",
+ body=None,
+ headers=None,
+ redirections=DEFAULT_MAX_REDIRECTS,
+ connection_type=None,
+ ):
""" Performs a single HTTP request.
The 'uri' is the URI of the HTTP resource and can begin
with either 'http' or 'https'. The value of 'uri' must be an absolute URI.
@@ -1357,18 +1727,18 @@
else:
headers = self._normalize_headers(headers)
- if 'user-agent' not in headers:
- headers['user-agent'] = "Python-httplib2/%s (gzip)" % __version__
+ if "user-agent" not in headers:
+ headers["user-agent"] = "Python-httplib2/%s (gzip)" % __version__
uri = iri2uri(uri)
(scheme, authority, request_uri, defrag_uri) = urlnorm(uri)
domain_port = authority.split(":")[0:2]
- if len(domain_port) == 2 and domain_port[1] == '443' and scheme == 'http':
- scheme = 'https'
+ if len(domain_port) == 2 and domain_port[1] == "443" and scheme == "http":
+ scheme = "https"
authority = domain_port[0]
- conn_key = scheme+":"+authority
+ conn_key = scheme + ":" + authority
if conn_key in self.connections:
conn = self.connections[conn_key]
else:
@@ -1378,27 +1748,30 @@
if issubclass(connection_type, HTTPSConnectionWithTimeout):
if certs:
conn = self.connections[conn_key] = connection_type(
- authority, key_file=certs[0][0],
- cert_file=certs[0][1], timeout=self.timeout,
- proxy_info=self.proxy_info,
- ca_certs=self.ca_certs,
- disable_ssl_certificate_validation=
- self.disable_ssl_certificate_validation)
+ authority,
+ key_file=certs[0][0],
+ cert_file=certs[0][1],
+ timeout=self.timeout,
+ proxy_info=self.proxy_info,
+ ca_certs=self.ca_certs,
+ disable_ssl_certificate_validation=self.disable_ssl_certificate_validation,
+ )
else:
conn = self.connections[conn_key] = connection_type(
- authority, timeout=self.timeout,
- proxy_info=self.proxy_info,
- ca_certs=self.ca_certs,
- disable_ssl_certificate_validation=
- self.disable_ssl_certificate_validation)
+ authority,
+ timeout=self.timeout,
+ proxy_info=self.proxy_info,
+ ca_certs=self.ca_certs,
+ disable_ssl_certificate_validation=self.disable_ssl_certificate_validation,
+ )
else:
conn = self.connections[conn_key] = connection_type(
- authority, timeout=self.timeout,
- proxy_info=self.proxy_info)
+ authority, timeout=self.timeout, proxy_info=self.proxy_info
+ )
conn.set_debuglevel(debuglevel)
- if 'range' not in headers and 'accept-encoding' not in headers:
- headers['accept-encoding'] = 'gzip, deflate'
+ if "range" not in headers and "accept-encoding" not in headers:
+ headers["accept-encoding"] = "gzip, deflate"
info = email.message.Message()
cached_value = None
@@ -1407,12 +1780,13 @@
cached_value = self.cache.get(cachekey)
if cached_value:
try:
- info, content = cached_value.split(b'\r\n\r\n', 1)
+ info, content = cached_value.split(b"\r\n\r\n", 1)
info = email.message_from_bytes(info)
for k, v in info.items():
- if v.startswith('=?') and v.endswith('?='):
- info.replace_header(k,
- str(*email.header.decode_header(v)[0]))
+ if v.startswith("=?") and v.endswith("?="):
+ info.replace_header(
+ k, str(*email.header.decode_header(v)[0])
+ )
except (IndexError, ValueError):
self.cache.delete(cachekey)
cachekey = None
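
The split above relies on the cache entry layout: a "status:" line and the
response headers in RFC 822 form, a blank line, then the body. A minimal
sketch of parsing one entry the same way (the literal bytes are made up for
illustration):

    import email

    cached_value = b"status: 200\r\ncontent-type: text/plain\r\n\r\nhello"
    info_bytes, content = cached_value.split(b"\r\n\r\n", 1)
    info = email.message_from_bytes(info_bytes)  # headers parse like an email
    print(info["status"], content)  # -> 200 b'hello'
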
@@ -1420,9 +1794,15 @@
else:
cachekey = None
- if method in self.optimistic_concurrency_methods and self.cache and 'etag' in info and not self.ignore_etag and 'if-match' not in headers:
+ if (
+ method in self.optimistic_concurrency_methods
+ and self.cache
+ and "etag" in info
+ and not self.ignore_etag
+ and "if-match" not in headers
+ ):
# http://www.w3.org/1999/04/Editing/
- headers['if-match'] = info['etag']
+ headers["if-match"] = info["etag"]
if method not in ["GET", "HEAD"] and self.cache and cachekey:
# RFC 2616 Section 13.10
@@ -1430,24 +1810,36 @@
# Check the vary header in the cache to see if this request
# matches what varies in the cache.
- if method in ['GET', 'HEAD'] and 'vary' in info:
- vary = info['vary']
- vary_headers = vary.lower().replace(' ', '').split(',')
+ if method in ["GET", "HEAD"] and "vary" in info:
+ vary = info["vary"]
+ vary_headers = vary.lower().replace(" ", "").split(",")
for header in vary_headers:
- key = '-varied-%s' % header
+ key = "-varied-%s" % header
value = info[key]
if headers.get(header, None) != value:
- cached_value = None
- break
+ cached_value = None
+ break
- if cached_value and method in ["GET", "HEAD"] and self.cache and 'range' not in headers:
- if '-x-permanent-redirect-url' in info:
+ if (
+ cached_value
+ and method in ["GET", "HEAD"]
+ and self.cache
+ and "range" not in headers
+ ):
+ if "-x-permanent-redirect-url" in info:
# Should cached permanent redirects be counted in our redirection count? For now, yes.
if redirections <= 0:
- raise RedirectLimit("Redirected more times than redirection_limit allows.", {}, "")
+ raise RedirectLimit(
+ "Redirected more times than redirection_limit allows.",
+ {},
+ "",
+ )
(response, new_content) = self.request(
- info['-x-permanent-redirect-url'], method='GET',
- headers=headers, redirections=redirections - 1)
+ info["-x-permanent-redirect-url"],
+ method="GET",
+ headers=headers,
+ redirections=redirections - 1,
+ )
response.previous = Response(info)
response.previous.fromcache = True
else:
@@ -1463,7 +1855,7 @@
if entry_disposition == "FRESH":
if not cached_value:
- info['status'] = '504'
+ info["status"] = "504"
content = b""
response = Response(info)
if cached_value:
@@ -1471,14 +1863,28 @@
return (response, content)
if entry_disposition == "STALE":
- if 'etag' in info and not self.ignore_etag and not 'if-none-match' in headers:
- headers['if-none-match'] = info['etag']
- if 'last-modified' in info and not 'last-modified' in headers:
- headers['if-modified-since'] = info['last-modified']
+ if (
+ "etag" in info
+ and not self.ignore_etag
+ and not "if-none-match" in headers
+ ):
+ headers["if-none-match"] = info["etag"]
+ if "last-modified" in info and not "last-modified" in headers:
+ headers["if-modified-since"] = info["last-modified"]
elif entry_disposition == "TRANSPARENT":
pass
- (response, new_content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
+ (response, new_content) = self._request(
+ conn,
+ authority,
+ uri,
+ request_uri,
+ method,
+ body,
+ headers,
+ redirections,
+ cachekey,
+ )
if response.status == 304 and method == "GET":
# Rewrite the cache entry with the new end-to-end headers
@@ -1491,7 +1897,9 @@
merged_response = Response(info)
if hasattr(response, "_stale_digest"):
merged_response._stale_digest = response._stale_digest
- _updateCache(headers, merged_response, content, self.cache, cachekey)
+ _updateCache(
+ headers, merged_response, content, self.cache, cachekey
+ )
response = merged_response
response.status = 200
response.fromcache = True
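
End to end, the revalidation above looks like this to a caller; a minimal
sketch (hypothetical URL; the conditional GET is only sent once the cached
entry has gone stale):

    import httplib2

    h = httplib2.Http(".cache")
    h.request("http://example.com/resource")  # primes the cache, storing the etag
    resp, body = h.request("http://example.com/resource")
    # A stale entry is revalidated with If-None-Match; per the merge above, the
    # server's 304 is rewritten into a 200 served from cache.
    print(resp.status, resp.fromcache)  # -> 200 True when the entry validated
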
@@ -1503,12 +1911,22 @@
content = new_content
else:
cc = _parse_cache_control(headers)
- if 'only-if-cached'in cc:
- info['status'] = '504'
+ if "only-if-cached" in cc:
+ info["status"] = "504"
response = Response(info)
content = b""
else:
- (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
+ (response, content) = self._request(
+ conn,
+ authority,
+ uri,
+ request_uri,
+ method,
+ body,
+ headers,
+ redirections,
+ cachekey,
+ )
except Exception as e:
if self.force_exception_to_status_code:
if isinstance(e, HttpLib2ErrorWithResponse):
@@ -1518,40 +1936,43 @@
response.reason = str(e)
elif isinstance(e, socket.timeout):
content = b"Request Timeout"
- response = Response({
- "content-type": "text/plain",
- "status": "408",
- "content-length": len(content)
- })
+ response = Response(
+ {
+ "content-type": "text/plain",
+ "status": "408",
+ "content-length": len(content),
+ }
+ )
response.reason = "Request Timeout"
else:
- content = str(e).encode('utf-8')
- response = Response({
- "content-type": "text/plain",
- "status": "400",
- "content-length": len(content)
- })
+ content = str(e).encode("utf-8")
+ response = Response(
+ {
+ "content-type": "text/plain",
+ "status": "400",
+ "content-length": len(content),
+ }
+ )
response.reason = "Bad Request"
else:
raise
-
return (response, content)
-
class Response(dict):
"""An object more like email.message than httplib.HTTPResponse."""
"""Is this response from our local cache"""
fromcache = False
+ """HTTP protocol version used by server.
- """HTTP protocol version used by server. 10 for HTTP/1.0, 11 for HTTP/1.1. """
+ 10 for HTTP/1.0, 11 for HTTP/1.1.
+ """
version = 11
"Status code returned by server. "
status = 200
-
"""Reason phrase returned by server."""
reason = "Ok"
@@ -1565,24 +1986,23 @@
key = key.lower()
prev = self.get(key)
if prev is not None:
- value = ', '.join((prev, value))
+ value = ", ".join((prev, value))
self[key] = value
self.status = info.status
- self['status'] = str(self.status)
+ self["status"] = str(self.status)
self.reason = info.reason
self.version = info.version
elif isinstance(info, email.message.Message):
for key, value in list(info.items()):
self[key.lower()] = value
- self.status = int(self['status'])
+ self.status = int(self["status"])
else:
for key, value in info.items():
self[key.lower()] = value
- self.status = int(self.get('status', self.status))
-
+ self.status = int(self.get("status", self.status))
def __getattr__(self, name):
- if name == 'dict':
+ if name == "dict":
return self
else:
raise AttributeError(name)
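
The reformatting above leaves the public surface of __init__.py unchanged; a
minimal usage sketch against a hypothetical endpoint:

    import httplib2

    h = httplib2.Http(".cache", timeout=10)
    resp, content = h.request(
        "http://example.com/api",
        method="GET",
        headers={"accept": "application/json"},
    )
    print(resp.status, resp["content-type"])  # Response is a dict of lower-cased headers
    print(resp.fromcache)  # True when a repeat request is served from .cache
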
diff --git a/python3/httplib2/iri2uri.py b/python3/httplib2/iri2uri.py
index 98985f8..0f8fbf9 100644
--- a/python3/httplib2/iri2uri.py
+++ b/python3/httplib2/iri2uri.py
@@ -1,110 +1,123 @@
-"""
-iri2uri
-
-Converts an IRI to a URI.
-
-"""
-__author__ = "Joe Gregorio (joe@bitworking.org)"
-__copyright__ = "Copyright 2006, Joe Gregorio"
-__contributors__ = []
-__version__ = "1.0.0"
-__license__ = "MIT"
-__history__ = """
-"""
-
-import urllib.parse
-
-
-# Convert an IRI to a URI following the rules in RFC 3987
-#
-# The characters we need to enocde and escape are defined in the spec:
-#
-# iprivate = %xE000-F8FF / %xF0000-FFFFD / %x100000-10FFFD
-# ucschar = %xA0-D7FF / %xF900-FDCF / %xFDF0-FFEF
-# / %x10000-1FFFD / %x20000-2FFFD / %x30000-3FFFD
-# / %x40000-4FFFD / %x50000-5FFFD / %x60000-6FFFD
-# / %x70000-7FFFD / %x80000-8FFFD / %x90000-9FFFD
-# / %xA0000-AFFFD / %xB0000-BFFFD / %xC0000-CFFFD
-# / %xD0000-DFFFD / %xE1000-EFFFD
-
-escape_range = [
- (0xA0, 0xD7FF),
- (0xE000, 0xF8FF),
- (0xF900, 0xFDCF),
- (0xFDF0, 0xFFEF),
- (0x10000, 0x1FFFD),
- (0x20000, 0x2FFFD),
- (0x30000, 0x3FFFD),
- (0x40000, 0x4FFFD),
- (0x50000, 0x5FFFD),
- (0x60000, 0x6FFFD),
- (0x70000, 0x7FFFD),
- (0x80000, 0x8FFFD),
- (0x90000, 0x9FFFD),
- (0xA0000, 0xAFFFD),
- (0xB0000, 0xBFFFD),
- (0xC0000, 0xCFFFD),
- (0xD0000, 0xDFFFD),
- (0xE1000, 0xEFFFD),
- (0xF0000, 0xFFFFD),
- (0x100000, 0x10FFFD),
-]
-
-def encode(c):
- retval = c
- i = ord(c)
- for low, high in escape_range:
- if i < low:
- break
- if i >= low and i <= high:
- retval = "".join(["%%%2X" % o for o in c.encode('utf-8')])
- break
- return retval
-
-
-def iri2uri(uri):
- """Convert an IRI to a URI. Note that IRIs must be
- passed in a unicode strings. That is, do not utf-8 encode
- the IRI before passing it into the function."""
- if isinstance(uri ,str):
- (scheme, authority, path, query, fragment) = urllib.parse.urlsplit(uri)
- authority = authority.encode('idna').decode('utf-8')
- # For each character in 'ucschar' or 'iprivate'
- # 1. encode as utf-8
- # 2. then %-encode each octet of that utf-8
- uri = urllib.parse.urlunsplit((scheme, authority, path, query, fragment))
- uri = "".join([encode(c) for c in uri])
- return uri
-
-if __name__ == "__main__":
- import unittest
-
- class Test(unittest.TestCase):
-
- def test_uris(self):
- """Test that URIs are invariant under the transformation."""
- invariant = [
- "ftp://ftp.is.co.za/rfc/rfc1808.txt",
- "http://www.ietf.org/rfc/rfc2396.txt",
- "ldap://[2001:db8::7]/c=GB?objectClass?one",
- "mailto:John.Doe@example.com",
- "news:comp.infosystems.www.servers.unix",
- "tel:+1-816-555-1212",
- "telnet://192.0.2.16:80/",
- "urn:oasis:names:specification:docbook:dtd:xml:4.1.2" ]
- for uri in invariant:
- self.assertEqual(uri, iri2uri(uri))
-
- def test_iri(self):
- """ Test that the right type of escaping is done for each part of the URI."""
- self.assertEqual("http://xn--o3h.com/%E2%98%84", iri2uri("http://\N{COMET}.com/\N{COMET}"))
- self.assertEqual("http://bitworking.org/?fred=%E2%98%84", iri2uri("http://bitworking.org/?fred=\N{COMET}"))
- self.assertEqual("http://bitworking.org/#%E2%98%84", iri2uri("http://bitworking.org/#\N{COMET}"))
- self.assertEqual("#%E2%98%84", iri2uri("#\N{COMET}"))
- self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri("/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}"))
- self.assertEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri(iri2uri("/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}")))
- self.assertNotEqual("/fred?bar=%E2%98%9A#%E2%98%84", iri2uri("/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}".encode('utf-8')))
-
- unittest.main()
-
-
+"""Converts an IRI to a URI."""
+
+__author__ = "Joe Gregorio (joe@bitworking.org)"
+__copyright__ = "Copyright 2006, Joe Gregorio"
+__contributors__ = []
+__version__ = "1.0.0"
+__license__ = "MIT"
+
+import urllib.parse
+
+# Convert an IRI to a URI following the rules in RFC 3987
+#
+# The characters we need to encode and escape are defined in the spec:
+#
+# iprivate = %xE000-F8FF / %xF0000-FFFFD / %x100000-10FFFD
+# ucschar = %xA0-D7FF / %xF900-FDCF / %xFDF0-FFEF
+# / %x10000-1FFFD / %x20000-2FFFD / %x30000-3FFFD
+# / %x40000-4FFFD / %x50000-5FFFD / %x60000-6FFFD
+# / %x70000-7FFFD / %x80000-8FFFD / %x90000-9FFFD
+# / %xA0000-AFFFD / %xB0000-BFFFD / %xC0000-CFFFD
+# / %xD0000-DFFFD / %xE1000-EFFFD
+
+escape_range = [
+ (0xA0, 0xD7FF),
+ (0xE000, 0xF8FF),
+ (0xF900, 0xFDCF),
+ (0xFDF0, 0xFFEF),
+ (0x10000, 0x1FFFD),
+ (0x20000, 0x2FFFD),
+ (0x30000, 0x3FFFD),
+ (0x40000, 0x4FFFD),
+ (0x50000, 0x5FFFD),
+ (0x60000, 0x6FFFD),
+ (0x70000, 0x7FFFD),
+ (0x80000, 0x8FFFD),
+ (0x90000, 0x9FFFD),
+ (0xA0000, 0xAFFFD),
+ (0xB0000, 0xBFFFD),
+ (0xC0000, 0xCFFFD),
+ (0xD0000, 0xDFFFD),
+ (0xE1000, 0xEFFFD),
+ (0xF0000, 0xFFFFD),
+ (0x100000, 0x10FFFD),
+]
+
+
+def encode(c):
+ retval = c
+ i = ord(c)
+ for low, high in escape_range:
+ if i < low:
+ break
+ if i >= low and i <= high:
+ retval = "".join(["%%%2X" % o for o in c.encode("utf-8")])
+ break
+ return retval
+
+
+def iri2uri(uri):
+ """Convert an IRI to a URI. Note that IRIs must be
+    passed as unicode strings. That is, do not utf-8 encode
+ the IRI before passing it into the function."""
+ if isinstance(uri, str):
+ (scheme, authority, path, query, fragment) = urllib.parse.urlsplit(uri)
+ authority = authority.encode("idna").decode("utf-8")
+ # For each character in 'ucschar' or 'iprivate'
+ # 1. encode as utf-8
+ # 2. then %-encode each octet of that utf-8
+ uri = urllib.parse.urlunsplit((scheme, authority, path, query, fragment))
+ uri = "".join([encode(c) for c in uri])
+ return uri
+
+
+if __name__ == "__main__":
+ import unittest
+
+ class Test(unittest.TestCase):
+ def test_uris(self):
+ """Test that URIs are invariant under the transformation."""
+ invariant = [
+ "ftp://ftp.is.co.za/rfc/rfc1808.txt",
+ "http://www.ietf.org/rfc/rfc2396.txt",
+ "ldap://[2001:db8::7]/c=GB?objectClass?one",
+ "mailto:John.Doe@example.com",
+ "news:comp.infosystems.www.servers.unix",
+ "tel:+1-816-555-1212",
+ "telnet://192.0.2.16:80/",
+ "urn:oasis:names:specification:docbook:dtd:xml:4.1.2",
+ ]
+ for uri in invariant:
+ self.assertEqual(uri, iri2uri(uri))
+
+ def test_iri(self):
+ """Test that the right type of escaping is done for each part of the URI."""
+ self.assertEqual(
+ "http://xn--o3h.com/%E2%98%84",
+ iri2uri("http://\N{COMET}.com/\N{COMET}"),
+ )
+ self.assertEqual(
+ "http://bitworking.org/?fred=%E2%98%84",
+ iri2uri("http://bitworking.org/?fred=\N{COMET}"),
+ )
+ self.assertEqual(
+ "http://bitworking.org/#%E2%98%84",
+ iri2uri("http://bitworking.org/#\N{COMET}"),
+ )
+ self.assertEqual("#%E2%98%84", iri2uri("#\N{COMET}"))
+ self.assertEqual(
+ "/fred?bar=%E2%98%9A#%E2%98%84",
+ iri2uri("/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}"),
+ )
+ self.assertEqual(
+ "/fred?bar=%E2%98%9A#%E2%98%84",
+ iri2uri(iri2uri("/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}")),
+ )
+ self.assertNotEqual(
+ "/fred?bar=%E2%98%9A#%E2%98%84",
+ iri2uri(
+ "/fred?bar=\N{BLACK LEFT POINTING INDEX}#\N{COMET}".encode("utf-8")
+ ),
+ )
+
+ unittest.main()
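
For reference, the transformation implemented above, with expected outputs
taken from the module's own tests:

    from httplib2.iri2uri import iri2uri

    print(iri2uri("http://\N{COMET}.com/\N{COMET}"))
    # -> http://xn--o3h.com/%E2%98%84 (IDNA-encoded host, %-encoded UTF-8 elsewhere)
    print(iri2uri("http://bitworking.org/?fred=\N{COMET}"))
    # -> http://bitworking.org/?fred=%E2%98%84
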
diff --git a/python3/httplib2/socks.py b/python3/httplib2/socks.py
index 7fc0591..9848f70 100644
--- a/python3/httplib2/socks.py
+++ b/python3/httplib2/socks.py
@@ -1,4 +1,5 @@
"""SocksiPy - Python SOCKS module.
+
Version 1.00
Copyright 2006 Dan-Haim. All rights reserved.
@@ -24,20 +25,14 @@
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMANGE.
-
This module provides a standard socket-like interface for Python
for tunneling connections through SOCKS proxies.
-"""
-
-"""
-
-Minor modifications made by Christopher Gilbert (http://motomastyle.com/)
-for use in PyLoris (http://pyloris.sourceforge.net/)
+Minor modifications made by Christopher Gilbert (http://motomastyle.com/) for
+use in PyLoris (http://pyloris.sourceforge.net/).
Minor modifications made by Mario Vilas (http://breakingcode.wordpress.com/)
-mainly to merge bug fixes found in Sourceforge
-
+mainly to merge bug fixes found in Sourceforge.
"""
import base64
@@ -45,8 +40,8 @@
import struct
import sys
-if getattr(socket, 'socket', None) is None:
- raise ImportError('socket.socket missing, proxy support unusable')
+if getattr(socket, "socket", None) is None:
+ raise ImportError("socket.socket missing, proxy support unusable")
PROXY_TYPE_SOCKS4 = 1
PROXY_TYPE_SOCKS5 = 2
@@ -56,21 +51,42 @@
_defaultproxy = None
_orgsocket = socket.socket
-class ProxyError(Exception): pass
-class GeneralProxyError(ProxyError): pass
-class Socks5AuthError(ProxyError): pass
-class Socks5Error(ProxyError): pass
-class Socks4Error(ProxyError): pass
-class HTTPError(ProxyError): pass
-_generalerrors = ("success",
+class ProxyError(Exception):
+ pass
+
+
+class GeneralProxyError(ProxyError):
+ pass
+
+
+class Socks5AuthError(ProxyError):
+ pass
+
+
+class Socks5Error(ProxyError):
+ pass
+
+
+class Socks4Error(ProxyError):
+ pass
+
+
+class HTTPError(ProxyError):
+ pass
+
+
+_generalerrors = (
+ "success",
"invalid data",
"not connected",
"not available",
"bad proxy type",
- "bad input")
+ "bad input",
+)
-_socks5errors = ("succeeded",
+_socks5errors = (
+ "succeeded",
"general SOCKS server failure",
"connection not allowed by ruleset",
"Network unreachable",
@@ -79,21 +95,30 @@
"TTL expired",
"Command not supported",
"Address type not supported",
- "Unknown error")
+ "Unknown error",
+)
-_socks5autherrors = ("succeeded",
+_socks5autherrors = (
+ "succeeded",
"authentication is required",
"all offered authentication methods were rejected",
"unknown username or invalid password",
- "unknown error")
+ "unknown error",
+)
-_socks4errors = ("request granted",
+_socks4errors = (
+ "request granted",
"request rejected or failed",
"request rejected because SOCKS server cannot connect to identd on the client",
- "request rejected because the client program and identd report different user-ids",
- "unknown error")
+ "request rejected because the client program and identd report different "
+ "user-ids",
+ "unknown error",
+)
-def setdefaultproxy(proxytype=None, addr=None, port=None, rdns=True, username=None, password=None):
+
+def setdefaultproxy(
+ proxytype=None, addr=None, port=None, rdns=True, username=None, password=None
+):
"""setdefaultproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
Sets a default proxy which all further socksocket objects will use,
unless explicitly changed.
@@ -101,11 +126,14 @@
global _defaultproxy
_defaultproxy = (proxytype, addr, port, rdns, username, password)
+
def wrapmodule(module):
"""wrapmodule(module)
+
Attempts to replace a module's socket library with a SOCKS socket. Must set
a default proxy using setdefaultproxy(...) first.
- This will only work on modules that import socket directly into the namespace;
+ This will only work on modules that import socket directly into the
+ namespace;
most of the Python Standard Library falls into this category.
"""
if _defaultproxy != None:
@@ -113,6 +141,7 @@
else:
raise GeneralProxyError((4, "no proxy specified"))
+
class socksocket(socket.socket):
"""socksocket([family[, type[, proto]]]) -> socket object
Open a SOCKS enabled socket. The parameters are the same as
@@ -120,7 +149,9 @@
you must specify family=AF_INET, type=SOCK_STREAM and proto=0.
"""
- def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None):
+ def __init__(
+ self, family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0, _sock=None
+ ):
_orgsocket.__init__(self, family, type, proto, _sock)
if _defaultproxy != None:
self.__proxy = _defaultproxy
@@ -137,8 +168,9 @@
"""
data = self.recv(count)
while len(data) < count:
- d = self.recv(count-len(data))
- if not d: raise GeneralProxyError((0, "connection closed unexpectedly"))
+ d = self.recv(count - len(data))
+ if not d:
+ raise GeneralProxyError((0, "connection closed unexpectedly"))
data = data + d
return data
@@ -167,7 +199,7 @@
hdrs.remove(endpt)
host = host.split(" ")[1]
endpt = endpt.split(" ")
- if (self.__proxy[4] != None and self.__proxy[5] != None):
+ if self.__proxy[4] != None and self.__proxy[5] != None:
hdrs.insert(0, self.__getauthheader())
hdrs.insert(0, "Host: %s" % host)
hdrs.insert(0, "%s http://%s%s %s" % (endpt[0], host, endpt[1], endpt[2]))
@@ -177,8 +209,18 @@
auth = self.__proxy[4] + ":" + self.__proxy[5]
return "Proxy-Authorization: Basic " + base64.b64encode(auth)
- def setproxy(self, proxytype=None, addr=None, port=None, rdns=True, username=None, password=None, headers=None):
+ def setproxy(
+ self,
+ proxytype=None,
+ addr=None,
+ port=None,
+ rdns=True,
+ username=None,
+ password=None,
+ headers=None,
+ ):
"""setproxy(proxytype, addr[, port[, rdns[, username[, password]]]])
+
Sets the proxy to be used.
proxytype - The type of the proxy to be used. Three types
are supported: PROXY_TYPE_SOCKS4 (including socks4a),
@@ -193,7 +235,8 @@
The default is no authentication.
password - Password to authenticate with to the server.
Only relevant when username is also provided.
- headers - Additional or modified headers for the proxy connect request.
+ headers - Additional or modified headers for the proxy connect
+ request.
"""
self.__proxy = (proxytype, addr, port, rdns, username, password, headers)
@@ -202,15 +245,15 @@
Negotiates a connection through a SOCKS5 server.
"""
# First we'll send the authentication packages we support.
- if (self.__proxy[4]!=None) and (self.__proxy[5]!=None):
+ if (self.__proxy[4] != None) and (self.__proxy[5] != None):
# The username/password details were supplied to the
# setproxy method so we support the USERNAME/PASSWORD
# authentication (in addition to the standard none).
- self.sendall(struct.pack('BBBB', 0x05, 0x02, 0x00, 0x02))
+ self.sendall(struct.pack("BBBB", 0x05, 0x02, 0x00, 0x02))
else:
# No username/password were entered, therefore we
# only support connections with no authentication.
- self.sendall(struct.pack('BBB', 0x05, 0x01, 0x00))
+ self.sendall(struct.pack("BBB", 0x05, 0x01, 0x00))
# We'll receive the server's response to determine which
# method was selected
chosenauth = self.__recvall(2)
@@ -224,7 +267,13 @@
elif chosenauth[1:2] == chr(0x02).encode():
# Okay, we need to perform a basic username/password
# authentication.
- self.sendall(chr(0x01).encode() + chr(len(self.__proxy[4])) + self.__proxy[4] + chr(len(self.__proxy[5])) + self.__proxy[5])
+ self.sendall(
+ chr(0x01).encode()
+ + chr(len(self.__proxy[4]))
+ + self.__proxy[4]
+ + chr(len(self.__proxy[5]))
+ + self.__proxy[5]
+ )
authstat = self.__recvall(2)
if authstat[0:1] != chr(0x01).encode():
# Bad response
@@ -243,7 +292,7 @@
else:
raise GeneralProxyError((1, _generalerrors[1]))
# Now we can request the actual connection
- req = struct.pack('BBB', 0x05, 0x01, 0x00)
+ req = struct.pack("BBB", 0x05, 0x01, 0x00)
# If the given destination address is an IP address, we'll
# use the IPv4 address request even if remote resolving was specified.
try:
@@ -254,7 +303,12 @@
if self.__proxy[3]:
# Resolve remotely
ipaddr = None
- req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + destaddr.encode()
+ req = (
+ req
+ + chr(0x03).encode()
+ + chr(len(destaddr)).encode()
+ + destaddr.encode()
+ )
else:
# Resolve locally
ipaddr = socket.inet_aton(socket.gethostbyname(destaddr))
@@ -269,7 +323,7 @@
elif resp[1:2] != chr(0x00).encode():
# Connection failed
self.close()
- if ord(resp[1:2])<=8:
+ if ord(resp[1:2]) <= 8:
raise Socks5Error((ord(resp[1:2]), _socks5errors[ord(resp[1:2])]))
else:
raise Socks5Error((9, _socks5errors[9]))
@@ -281,7 +335,7 @@
boundaddr = self.__recvall(ord(resp[4:5]))
else:
self.close()
- raise GeneralProxyError((1,_generalerrors[1]))
+ raise GeneralProxyError((1, _generalerrors[1]))
boundport = struct.unpack(">H", self.__recvall(2))[0]
self.__proxysockname = (boundaddr, boundport)
if ipaddr != None:
@@ -308,7 +362,7 @@
"""
return self.__proxypeername
- def __negotiatesocks4(self,destaddr,destport):
+ def __negotiatesocks4(self, destaddr, destport):
"""__negotiatesocks4(self,destaddr,destport)
Negotiates a connection through a SOCKS4 server.
"""
@@ -340,7 +394,7 @@
if resp[0:1] != chr(0x00).encode():
# Bad data
self.close()
- raise GeneralProxyError((1,_generalerrors[1]))
+ raise GeneralProxyError((1, _generalerrors[1]))
if resp[1:2] != chr(0x5A).encode():
# Server returned an error
self.close()
@@ -350,7 +404,10 @@
else:
raise Socks4Error((94, _socks4errors[4]))
# Get the bound address/port
- self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
+ self.__proxysockname = (
+ socket.inet_ntoa(resp[4:]),
+ struct.unpack(">H", resp[2:4])[0],
+ )
if rmtrslv != None:
self.__proxypeername = (socket.inet_ntoa(ipaddr), destport)
else:
@@ -365,18 +422,18 @@
addr = socket.gethostbyname(destaddr)
else:
addr = destaddr
- headers = ["CONNECT ", addr, ":", str(destport), " HTTP/1.1\r\n"]
+ headers = ["CONNECT ", addr, ":", str(destport), " HTTP/1.1\r\n"]
wrote_host_header = False
wrote_auth_header = False
if self.__proxy[6] != None:
for key, val in self.__proxy[6].iteritems():
headers += [key, ": ", val, "\r\n"]
- wrote_host_header = (key.lower() == "host")
- wrote_auth_header = (key.lower() == "proxy-authorization")
+ wrote_host_header = key.lower() == "host"
+ wrote_auth_header = key.lower() == "proxy-authorization"
if not wrote_host_header:
headers += ["Host: ", destaddr, "\r\n"]
if not wrote_auth_header:
- if (self.__proxy[4] != None and self.__proxy[5] != None):
+ if self.__proxy[4] != None and self.__proxy[5] != None:
headers += [self.__getauthheader(), "\r\n"]
headers.append("\r\n")
self.sendall("".join(headers).encode())
@@ -409,7 +466,12 @@
To select the proxy server use setproxy().
"""
# Do a minimal input check first
- if (not type(destpair) in (list,tuple)) or (len(destpair) < 2) or (not isinstance(destpair[0], (str, bytes))) or (type(destpair[1]) != int):
+ if (
+ (not type(destpair) in (list, tuple))
+ or (len(destpair) < 2)
+ or (not isinstance(destpair[0], (str, bytes)))
+ or (type(destpair[1]) != int)
+ ):
raise GeneralProxyError((5, _generalerrors[5]))
if self.__proxy[0] == PROXY_TYPE_SOCKS5:
if self.__proxy[2] != None:
@@ -423,23 +485,23 @@
portnum = self.__proxy[2]
else:
portnum = 1080
- _orgsocket.connect(self,(self.__proxy[1], portnum))
+ _orgsocket.connect(self, (self.__proxy[1], portnum))
self.__negotiatesocks4(destpair[0], destpair[1])
elif self.__proxy[0] == PROXY_TYPE_HTTP:
if self.__proxy[2] != None:
portnum = self.__proxy[2]
else:
portnum = 8080
- _orgsocket.connect(self,(self.__proxy[1], portnum))
+ _orgsocket.connect(self, (self.__proxy[1], portnum))
self.__negotiatehttp(destpair[0], destpair[1])
elif self.__proxy[0] == PROXY_TYPE_HTTP_NO_TUNNEL:
if self.__proxy[2] != None:
portnum = self.__proxy[2]
else:
portnum = 8080
- _orgsocket.connect(self,(self.__proxy[1],portnum))
+ _orgsocket.connect(self, (self.__proxy[1], portnum))
if destpair[1] == 443:
- self.__negotiatehttp(destpair[0],destpair[1])
+ self.__negotiatehttp(destpair[0], destpair[1])
else:
self.__httptunnel = False
elif self.__proxy[0] == None:
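
A minimal sketch of driving the reformatted module (the proxy address is
hypothetical; any SOCKS5 proxy would do):

    import socket

    from httplib2 import socks

    # Route sockets created as socks.socksocket through the default SOCKS5 proxy.
    socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, "127.0.0.1", 1080)
    s = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("example.com", 80))  # SOCKS negotiation happens inside connect()
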
diff --git a/python3/httplib2test.py b/python3/httplib2test.py
index a4afae9..c1fd484 100755
--- a/python3/httplib2test.py
+++ b/python3/httplib2test.py
@@ -1,1640 +1,1929 @@
-#!/usr/bin/env python3
-"""
-httplib2test
-
-A set of unit tests for httplib2.py.
-
-Requires Python 3.0 or later
-"""
-
-__author__ = "Joe Gregorio (joe@bitworking.org)"
-__copyright__ = "Copyright 2006, Joe Gregorio"
-__contributors__ = ["Mark Pilgrim"]
-__license__ = "MIT"
-__history__ = """ """
-__version__ = "0.2 ($Rev: 118 $)"
-
-import base64
-import http.client
-import httplib2
-import io
-import os
-import pickle
-import socket
-import ssl
-import sys
-import time
-import unittest
-import urllib.parse
-
-# The test resources base uri
-base = 'http://bitworking.org/projects/httplib2/test/'
-#base = 'http://localhost/projects/httplib2/test/'
-cacheDirName = ".cache"
-
-
-class CredentialsTest(unittest.TestCase):
- def test(self):
- c = httplib2.Credentials()
- c.add("joe", "password")
- self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
- self.assertEqual(("joe", "password"), list(c.iter(""))[0])
- c.add("fred", "password2", "wellformedweb.org")
- self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
- self.assertEqual(1, len(list(c.iter("bitworking.org"))))
- self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
- self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
- c.clear()
- self.assertEqual(0, len(list(c.iter("bitworking.org"))))
- c.add("fred", "password2", "wellformedweb.org")
- self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
- self.assertEqual(0, len(list(c.iter("bitworking.org"))))
- self.assertEqual(0, len(list(c.iter(""))))
-
-
-class ParserTest(unittest.TestCase):
- def testFromStd66(self):
- self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
- self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
- self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
- self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
- self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
- self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
- self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
- self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
-
-
-class UrlNormTest(unittest.TestCase):
- def test(self):
- self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
- self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
- self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
- self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
- self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
- self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
- try:
- httplib2.urlnorm("/")
- self.fail("Non-absolute URIs should raise an exception")
- except httplib2.RelativeURIError:
- pass
-
-class UrlSafenameTest(unittest.TestCase):
- def test(self):
- # Test that different URIs end up generating different safe names
- self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
- self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
- self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
- self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
- self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
- self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
- # Test the max length limits
- uri = "http://" + ("w" * 200) + ".org"
- uri2 = "http://" + ("w" * 201) + ".org"
- self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
- # Max length should be 200 + 1 (",") + 32
- self.assertEqual(233, len(httplib2.safename(uri2)))
- self.assertEqual(233, len(httplib2.safename(uri)))
- # Unicode
- if sys.version_info >= (2,3):
- self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
-
-class _MyResponse(io.BytesIO):
- def __init__(self, body, **kwargs):
- io.BytesIO.__init__(self, body)
- self.headers = kwargs
-
- def items(self):
- return self.headers.items()
-
- def iteritems(self):
- return iter(self.headers.items())
-
-
-class _MyHTTPConnection(object):
- "This class is just a mock of httplib.HTTPConnection used for testing"
-
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None, timeout=None, proxy_info=None):
- self.host = host
- self.port = port
- self.timeout = timeout
- self.log = ""
- self.sock = None
-
- def set_debuglevel(self, level):
- pass
-
- def connect(self):
- "Connect to a host on a given port."
- pass
-
- def close(self):
- pass
-
- def request(self, method, request_uri, body, headers):
- pass
-
- def getresponse(self):
- return _MyResponse(b"the body", status="200")
-
-
-class _MyHTTPBadStatusConnection(object):
- "Mock of httplib.HTTPConnection that raises BadStatusLine."
-
- num_calls = 0
-
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- strict=None, timeout=None, proxy_info=None):
- self.host = host
- self.port = port
- self.timeout = timeout
- self.log = ""
- self.sock = None
- _MyHTTPBadStatusConnection.num_calls = 0
-
- def set_debuglevel(self, level):
- pass
-
- def connect(self):
- pass
-
- def close(self):
- pass
-
- def request(self, method, request_uri, body, headers):
- pass
-
- def getresponse(self):
- _MyHTTPBadStatusConnection.num_calls += 1
- raise http.client.BadStatusLine("")
-
-
-class HttpTest(unittest.TestCase):
- def setUp(self):
- if os.path.exists(cacheDirName):
- [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
- self.http = httplib2.Http(cacheDirName)
- self.http.clear_credentials()
-
- def testIPv6NoSSL(self):
- try:
- self.http.request("http://[::1]/")
- except socket.gaierror:
- self.fail("should get the address family right for IPv6")
- except socket.error:
- # Even if IPv6 isn't installed on a machine it should just raise socket.error
- pass
-
- def testIPv6SSL(self):
- try:
- self.http.request("https://[::1]/")
- except socket.gaierror:
- self.fail("should get the address family right for IPv6")
- except socket.error:
- # Even if IPv6 isn't installed on a machine it should just raise socket.error
- pass
-
- def testConnectionType(self):
- self.http.force_exception_to_status_code = False
- response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
- self.assertEqual(response['content-location'], "http://bitworking.org")
- self.assertEqual(content, b"the body")
-
-
- def testBadStatusLineRetry(self):
- old_retries = httplib2.RETRIES
- httplib2.RETRIES = 1
- self.http.force_exception_to_status_code = False
- try:
- response, content = self.http.request("http://bitworking.org",
- connection_type=_MyHTTPBadStatusConnection)
- except http.client.BadStatusLine:
- self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
- httplib2.RETRIES = old_retries
-
-
- def testGetUnknownServer(self):
- self.http.force_exception_to_status_code = False
- try:
- self.http.request("http://fred.bitworking.org/")
- self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
- except httplib2.ServerNotFoundError:
- pass
-
- # Now test with exceptions turned off
- self.http.force_exception_to_status_code = True
-
- (response, content) = self.http.request("http://fred.bitworking.org/")
- self.assertEqual(response['content-type'], 'text/plain')
- self.assertTrue(content.startswith(b"Unable to find"))
- self.assertEqual(response.status, 400)
-
- def testGetConnectionRefused(self):
- self.http.force_exception_to_status_code = False
- try:
- self.http.request("http://localhost:7777/")
- self.fail("An socket.error exception must be thrown on Connection Refused.")
- except socket.error:
- pass
-
- # Now test with exceptions turned off
- self.http.force_exception_to_status_code = True
-
- (response, content) = self.http.request("http://localhost:7777/")
- self.assertEqual(response['content-type'], 'text/plain')
- self.assertTrue(b"Connection refused" in content)
- self.assertEqual(response.status, 400)
-
- def testGetIRI(self):
- if sys.version_info >= (2,3):
- uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
- (response, content) = self.http.request(uri, "GET")
- d = self.reflector(content)
- self.assertTrue('QUERY_STRING' in d)
- self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
-
- def testGetIsDefaultMethod(self):
- # Test that GET is the default method
- uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
- (response, content) = self.http.request(uri)
- self.assertEqual(response['x-method'], "GET")
-
- def testDifferentMethods(self):
- # Test that all methods can be used
- uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
- for method in ["GET", "PUT", "DELETE", "POST"]:
- (response, content) = self.http.request(uri, method, body=b" ")
- self.assertEqual(response['x-method'], method)
-
- def testHeadRead(self):
- # Test that we don't try to read the response of a HEAD request
- # since httplib blocks response.read() for HEAD requests.
- # Oddly enough this doesn't appear as a problem when doing HEAD requests
- # against Apache servers.
- uri = "http://www.google.com/"
- (response, content) = self.http.request(uri, "HEAD")
- self.assertEqual(response.status, 200)
- self.assertEqual(content, b"")
-
- def testGetNoCache(self):
- # Test that can do a GET w/o the cache turned on.
- http = httplib2.Http()
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
- (response, content) = http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.previous, None)
-
- def testGetOnlyIfCachedCacheHit(self):
- # Test that can do a GET with cache and 'only-if-cached'
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET")
- (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
- self.assertEqual(response.fromcache, True)
- self.assertEqual(response.status, 200)
-
- def testGetOnlyIfCachedCacheMiss(self):
- # Test that can do a GET with no cache with 'only-if-cached'
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
- self.assertEqual(response.fromcache, False)
- self.assertEqual(response.status, 504)
-
- def testGetOnlyIfCachedNoCacheAtAll(self):
- # Test that can do a GET with no cache with 'only-if-cached'
- # Of course, there might be an intermediary beyond us
- # that responds to the 'only-if-cached', so this
- # test can't really be guaranteed to pass.
- http = httplib2.Http()
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
- (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
- self.assertEqual(response.fromcache, False)
- self.assertEqual(response.status, 504)
-
- def testUserAgent(self):
- # Test that we provide a default user-agent
- uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertTrue(content.startswith(b"Python-httplib2/"))
-
- def testUserAgentNonDefault(self):
- # Test that the default user-agent can be over-ridden
-
- uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
- (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
- self.assertEqual(response.status, 200)
- self.assertTrue(content.startswith(b"fred/1.0"))
-
- def testGet300WithLocation(self):
- # Test the we automatically follow 300 redirects if a Location: header is provided
- uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 300)
- self.assertEqual(response.previous.fromcache, False)
-
- # Confirm that the intermediate 300 is not cached
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 300)
- self.assertEqual(response.previous.fromcache, False)
-
- def testGet300WithLocationNoRedirect(self):
- # Test the we automatically follow 300 redirects if a Location: header is provided
- self.http.follow_redirects = False
- uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 300)
-
- def testGet300WithoutLocation(self):
- # Not giving a Location: header in a 300 response is acceptable
- # In which case we just return the 300 response
- uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 300)
- self.assertTrue(response['content-type'].startswith("text/html"))
- self.assertEqual(response.previous, None)
-
- def testGet301(self):
- # Test that we automatically follow 301 redirects
- # and that we cache the 301 response
- uri = urllib.parse.urljoin(base, "301/onestep.asis")
- destination = urllib.parse.urljoin(base, "302/final-destination.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertTrue('content-location' in response)
- self.assertEqual(response['content-location'], destination)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 301)
- self.assertEqual(response.previous.fromcache, False)
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response['content-location'], destination)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 301)
- self.assertEqual(response.previous.fromcache, True)
-
- def testHead301(self):
- # Test that we automatically follow 301 redirects
- uri = urllib.parse.urljoin(base, "301/onestep.asis")
- (response, content) = self.http.request(uri, "HEAD")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.previous.status, 301)
- self.assertEqual(response.previous.fromcache, False)
-
- def testGet301NoRedirect(self):
- # Test that we automatically follow 301 redirects
- # and that we cache the 301 response
- self.http.follow_redirects = False
- uri = urllib.parse.urljoin(base, "301/onestep.asis")
- destination = urllib.parse.urljoin(base, "302/final-destination.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 301)
-
-
- def testGet302(self):
- # Test that we automatically follow 302 redirects
- # and that we DO NOT cache the 302 response
- uri = urllib.parse.urljoin(base, "302/onestep.asis")
- destination = urllib.parse.urljoin(base, "302/final-destination.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response['content-location'], destination)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 302)
- self.assertEqual(response.previous.fromcache, False)
-
- uri = urllib.parse.urljoin(base, "302/onestep.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
- self.assertEqual(response['content-location'], destination)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 302)
- self.assertEqual(response.previous.fromcache, False)
- self.assertEqual(response.previous['content-location'], uri)
-
- uri = urllib.parse.urljoin(base, "302/twostep.asis")
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 302)
- self.assertEqual(response.previous.fromcache, False)
-
- def testGet302RedirectionLimit(self):
- # Test that we can set a lower redirection limit
- # and that we raise an exception when we exceed
- # that limit.
- self.http.force_exception_to_status_code = False
-
- uri = urllib.parse.urljoin(base, "302/twostep.asis")
- try:
- (response, content) = self.http.request(uri, "GET", redirections = 1)
- self.fail("This should not happen")
- except httplib2.RedirectLimit:
- pass
- except Exception as e:
- self.fail("Threw wrong kind of exception ")
-
- # Re-run the test with out the exceptions
- self.http.force_exception_to_status_code = True
-
- (response, content) = self.http.request(uri, "GET", redirections = 1)
- self.assertEqual(response.status, 500)
- self.assertTrue(response.reason.startswith("Redirected more"))
- self.assertEqual("302", response['status'])
- self.assertTrue(content.startswith(b"<html>"))
- self.assertTrue(response.previous != None)
-
- def testGet302NoLocation(self):
- # Test that we throw an exception when we get
- # a 302 with no Location: header.
- self.http.force_exception_to_status_code = False
- uri = urllib.parse.urljoin(base, "302/no-location.asis")
- try:
- (response, content) = self.http.request(uri, "GET")
- self.fail("Should never reach here")
- except httplib2.RedirectMissingLocation:
- pass
- except Exception as e:
- self.fail("Threw wrong kind of exception ")
-
- # Re-run the test with out the exceptions
- self.http.force_exception_to_status_code = True
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 500)
- self.assertTrue(response.reason.startswith("Redirected but"))
- self.assertEqual("302", response['status'])
- self.assertTrue(content.startswith(b"This is content"))
-
- def testGet301ViaHttps(self):
- # Google always redirects to http://google.com
- (response, content) = self.http.request("https://code.google.com/apis/", "GET")
- self.assertEqual(200, response.status)
- self.assertEqual(301, response.previous.status)
-
- def testGetViaHttps(self):
- # Test that we can handle HTTPS
- (response, content) = self.http.request("https://google.com/adsense/", "GET")
- self.assertEqual(200, response.status)
-
- def testGetViaHttpsSpecViolationOnLocation(self):
- # Test that we follow redirects through HTTPS
- # even if they violate the spec by including
- # a relative Location: header instead of an
- # absolute one.
- (response, content) = self.http.request("https://google.com/adsense", "GET")
- self.assertEqual(200, response.status)
- self.assertNotEqual(None, response.previous)
-
-
- def testGetViaHttpsKeyCert(self):
- # At this point I can only test
- # that the key and cert files are passed in
- # correctly to httplib. It would be nice to have
- # a real https endpoint to test against.
- http = httplib2.Http(timeout=2)
-
- http.add_certificate("akeyfile", "acertfile", "bitworking.org")
- try:
- (response, content) = http.request("https://bitworking.org", "GET")
- except AttributeError:
- self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
- self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
- except IOError:
- # Skip on 3.2
- pass
-
- try:
- (response, content) = http.request("https://notthere.bitworking.org", "GET")
- except httplib2.ServerNotFoundError:
- self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
- self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
- except IOError:
- # Skip on 3.2
- pass
-
- def testSslCertValidation(self):
- # Test that we get an ssl.SSLError when specifying a non-existent CA
- # certs file.
- http = httplib2.Http(ca_certs='/nosuchfile')
- self.assertRaises(IOError,
- http.request, "https://www.google.com/", "GET")
-
- # Test that we get a SSLHandshakeError if we try to access
- # https://www.google.com, using a CA cert file that doesn't contain
- # the CA Google uses (i.e., simulating a cert that's not signed by a
- # trusted CA).
- other_ca_certs = os.path.join(
- os.path.dirname(os.path.abspath(httplib2.__file__ )),
- "test", "other_cacerts.txt")
- http = httplib2.Http(ca_certs=other_ca_certs)
- self.assertRaises(ssl.SSLError,
- http.request,"https://www.google.com/", "GET")
-
- def testSniHostnameValidation(self):
- self.http.request("https://google.com/", method="GET")
-
- def testGet303(self):
- # Do a follow-up GET on a Location: header
- # returned from a POST that gave a 303.
- uri = urllib.parse.urljoin(base, "303/303.cgi")
- (response, content) = self.http.request(uri, "POST", " ")
- self.assertEqual(response.status, 200)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 303)
-
- def testGet303NoRedirect(self):
- # Do a follow-up GET on a Location: header
- # returned from a POST that gave a 303.
- self.http.follow_redirects = False
- uri = urllib.parse.urljoin(base, "303/303.cgi")
- (response, content) = self.http.request(uri, "POST", " ")
- self.assertEqual(response.status, 303)
-
- def test303ForDifferentMethods(self):
- # Test that all methods can be used
- uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
- for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
- (response, content) = self.http.request(uri, method, body=b" ")
- self.assertEqual(response['x-method'], method_on_303)
-
- def testGet304(self):
- # Test that we use ETags properly to validate our cache
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
-
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'must-revalidate'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
-
- cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
- f = open(cache_file_name, "r")
- status_line = f.readline()
- f.close()
-
- self.assertTrue(status_line.startswith("status:"))
-
- (response, content) = self.http.request(uri, "HEAD", headers = {'accept-encoding': 'identity'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
-
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'range': 'bytes=0-0'})
- self.assertEqual(response.status, 206)
- self.assertEqual(response.fromcache, False)
-
- def testGetIgnoreEtag(self):
- # Test that we can forcibly ignore ETags
- uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
-
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
- d = self.reflector(content)
- self.assertTrue('HTTP_IF_NONE_MATCH' in d)
-
- self.http.ignore_etag = True
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
- d = self.reflector(content)
- self.assertEqual(response.fromcache, False)
- self.assertFalse('HTTP_IF_NONE_MATCH' in d)
-
- def testOverrideEtag(self):
- # Test that we can forcibly ignore ETags
- uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
-
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
- d = self.reflector(content)
- self.assertTrue('HTTP_IF_NONE_MATCH' in d)
- self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
-
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'max-age=0', 'if-none-match': 'fred'})
- d = self.reflector(content)
- self.assertTrue('HTTP_IF_NONE_MATCH' in d)
- self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
-
-#MAP-commented this out because it consistently fails
-# def testGet304EndToEnd(self):
-# # Test that end to end headers get overwritten in the cache
-# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
-# (response, content) = self.http.request(uri, "GET")
-# self.assertNotEqual(response['etag'], "")
-# old_date = response['date']
-# time.sleep(2)
-#
-# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
-# # The response should be from the cache, but the Date: header should be updated.
-# new_date = response['date']
-# self.assertNotEqual(new_date, old_date)
-# self.assertEqual(response.status, 200)
-# self.assertEqual(response.fromcache, True)
-
- def testGet304LastModified(self):
- # Test that we can still handle a 304
- # by only using the last-modified cache validator.
- uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
- (response, content) = self.http.request(uri, "GET")
-
- self.assertNotEqual(response['last-modified'], "")
- (response, content) = self.http.request(uri, "GET")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
-
- def testGet307(self):
- # Test that we do follow 307 redirects but
- # do not cache the 307
- uri = urllib.parse.urljoin(base, "307/onestep.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 307)
- self.assertEqual(response.previous.fromcache, False)
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
- self.assertEqual(content, b"This is the final destination.\n")
- self.assertEqual(response.previous.status, 307)
- self.assertEqual(response.previous.fromcache, False)
-
- def testGet410(self):
- # Test that we pass 410's through
- uri = urllib.parse.urljoin(base, "410/410.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 410)
-
- def testVaryHeaderSimple(self):
- """
- RFC 2616 13.6
- When the cache receives a subsequent request whose Request-URI
- specifies one or more cache entries including a Vary header field,
- the cache MUST NOT use such a cache entry to construct a response
- to the new request unless all of the selecting request-headers
- present in the new request match the corresponding stored
- request-headers in the original request.
- """
- # test that the vary header is sent
- uri = urllib.parse.urljoin(base, "vary/accept.asis")
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
- self.assertEqual(response.status, 200)
- self.assertTrue('vary' in response)
-
- # get the resource again, from the cache since accept header in this
- # request is the same as the request
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True, msg="Should be from cache")
-
- # get the resource again, not from cache since Accept headers does not match
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False, msg="Should not be from cache")
-
- # get the resource again, without any Accept header, so again no match
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False, msg="Should not be from cache")
-
- def testNoVary(self):
- pass
- # when there is no vary, a different Accept header (e.g.) should not
- # impact if the cache is used
- # test that the vary header is not sent
- # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
- # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
- # self.assertEqual(response.status, 200)
- # self.assertFalse('vary' in response)
- #
- # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
- # self.assertEqual(response.status, 200)
- # self.assertEqual(response.fromcache, True, msg="Should be from cache")
- #
- # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
- # self.assertEqual(response.status, 200)
- # self.assertEqual(response.fromcache, True, msg="Should be from cache")
-
- def testVaryHeaderDouble(self):
- uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
- self.assertEqual(response.status, 200)
- self.assertTrue('vary' in response)
-
- # we are from cache
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
- self.assertEqual(response.fromcache, True, msg="Should be from cache")
-
- (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- # get the resource again, not from cache, since the varied headers don't match exactly
- (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False, msg="Should not be from cache")
-
- def testVaryUnusedHeader(self):
- # A header's value is not considered to vary if it's not used at all.
- uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain'})
- self.assertEqual(response.status, 200)
- self.assertTrue('vary' in response)
-
- # we are from cache
- (response, content) = self.http.request(uri, "GET", headers={
- 'Accept': 'text/plain',})
- self.assertEqual(response.fromcache, True, msg="Should be from cache")
-
- def testHeadGZip(self):
- # Test that we don't try to decompress a HEAD response
- uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
- (response, content) = self.http.request(uri, "HEAD")
- self.assertEqual(response.status, 200)
- self.assertNotEqual(int(response['content-length']), 0)
- self.assertEqual(content, b"")
-
- def testGetGZip(self):
- # Test that we support gzip compression
- uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertFalse('content-encoding' in response)
- self.assertTrue('-content-encoding' in response)
- self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
- self.assertEqual(content, b"This is the final destination.\n")
-
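- # Illustrative sketch, not part of the original suite: httplib2 decodes
- # the gzip body and records the original encoding under
- # '-content-encoding', as asserted above. The stdlib round trip that
- # models the decode step:
- def exampleGZipRoundTrip(self):
- import gzip
- body = gzip.compress(b"This is the final destination.\n")
- self.assertEqual(gzip.decompress(body), b"This is the final destination.\n")
-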
- def testPostAndGZipResponse(self):
- uri = urllib.parse.urljoin(base, "gzip/post.cgi")
- (response, content) = self.http.request(uri, "POST", body=" ")
- self.assertEqual(response.status, 200)
- self.assertFalse('content-encoding' in response)
- self.assertTrue('-content-encoding' in response)
-
- def testGetGZipFailure(self):
- # Test that we raise a good exception when the gzip fails
- self.http.force_exception_to_status_code = False
- uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
- try:
- (response, content) = self.http.request(uri, "GET")
- self.fail("Should never reach here")
- except httplib2.FailedToDecompressContent:
- pass
- except Exception:
- self.fail("Threw wrong kind of exception")
-
- # Re-run the test without the exceptions
- self.http.force_exception_to_status_code = True
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 500)
- self.assertTrue(response.reason.startswith("Content purported"))
-
- def testIndividualTimeout(self):
- uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
- http = httplib2.Http(timeout=1)
- http.force_exception_to_status_code = True
-
- (response, content) = http.request(uri)
- self.assertEqual(response.status, 408)
- self.assertTrue(response.reason.startswith("Request Timeout"))
- self.assertTrue(content.startswith(b"Request Timeout"))
-
-
- def testGetDeflate(self):
- # Test that we support deflate compression
- uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertFalse('content-encoding' in response)
- self.assertEqual(int(response['content-length']), len("This is the final destination."))
- self.assertEqual(content, b"This is the final destination.")
-
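- # Sketch (an illustration, not httplib2's code): some servers send raw
- # deflate streams without the zlib header, so a tolerant decoder falls
- # back to a negative window size:
- def exampleDeflateFallback(self):
- import zlib
- body = zlib.compress(b"This is the final destination.")
- try:
- data = zlib.decompress(body)
- except zlib.error:
- data = zlib.decompress(body, -zlib.MAX_WBITS)
- self.assertEqual(data, b"This is the final destination.")
-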
- def testGetDeflateFailure(self):
- # Test that we raise a good exception when the deflate fails
- self.http.force_exception_to_status_code = False
-
- uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
- try:
- (response, content) = self.http.request(uri, "GET")
- self.fail("Should never reach here")
- except httplib2.FailedToDecompressContent:
- pass
- except Exception:
- self.fail("Threw wrong kind of exception")
-
- # Re-run the test without the exceptions
- self.http.force_exception_to_status_code = True
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 500)
- self.assertTrue(response.reason.startswith("Content purported"))
-
- def testGetDuplicateHeaders(self):
- # Test that duplicate headers get concatenated via ','
- uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(content, b"This is content\n")
- self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
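- # Duplicate Link: headers arrive as a single 'link' value joined with
- # ',', which is why split(",")[0] recovers the first one.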
-
- def testGetCacheControlNoCache(self):
- # Test Cache-Control: no-cache on requests
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
-
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'Cache-Control': 'no-cache'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- def testGetCacheControlPragmaNoCache(self):
- # Test Pragma: no-cache on requests
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- self.assertNotEqual(response['etag'], "")
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
-
- (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'Pragma': 'no-cache'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- def testGetCacheControlNoStoreRequest(self):
- # A no-store request means that the response should not be stored.
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
-
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- def testGetCacheControlNoStoreResponse(self):
- # A no-store response means that the response should not be stored.
- uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- def testGetCacheControlNoCacheNoStoreRequest(self):
- # Test that a no-store, no-cache clears the entry from the cache
- # even if it was cached previously.
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
-
- (response, content) = self.http.request(uri, "GET")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
- (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
-
- def testUpdateInvalidatesCache(self):
- # Test that calling PUT or DELETE on a URI
- # that is cached invalidates the cache.
- uri = urllib.parse.urljoin(base, "304/test_etag.txt")
-
- (response, content) = self.http.request(uri, "GET")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "DELETE")
- self.assertEqual(response.status, 405)
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.fromcache, False)
-
- def testUpdateUsesCachedETag(self):
- # Test that we natively support http://www.w3.org/1999/04/Editing/
- uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "PUT", body="foo")
- self.assertEqual(response.status, 200)
- (response, content) = self.http.request(uri, "PUT", body="foo")
- self.assertEqual(response.status, 412)
-
-
- def testUpdatePatchUsesCachedETag(self):
- # Test that we natively support http://www.w3.org/1999/04/Editing/
- uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "PATCH", body="foo")
- self.assertEqual(response.status, 200)
- (response, content) = self.http.request(uri, "PATCH", body="foo")
- self.assertEqual(response.status, 412)
-
- def testUpdateUsesCachedETagAndOCMethod(self):
- # Test that we natively support http://www.w3.org/1999/04/Editing/
- uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
- self.http.optimistic_concurrency_methods.append("DELETE")
- (response, content) = self.http.request(uri, "DELETE")
- self.assertEqual(response.status, 200)
-
-
- def testUpdateUsesCachedETagOverridden(self):
- # Test that we natively support http://www.w3.org/1999/04/Editing/
- uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
-
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, False)
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
- self.assertEqual(response.fromcache, True)
- (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
- self.assertEqual(response.status, 412)
-
- def testBasicAuth(self):
- # Test Basic Authentication
- uri = urllib.parse.urljoin(base, "basic/file.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- uri = urllib.parse.urljoin(base, "basic/")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- self.http.add_credentials('joe', 'password')
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- uri = urllib.parse.urljoin(base, "basic/file.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- def testBasicAuthWithDomain(self):
- # Test Basic Authentication with domain-restricted credentials
- uri = urllib.parse.urljoin(base, "basic/file.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- uri = urllib.parse.urljoin(base, "basic/")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- self.http.add_credentials('joe', 'password', "example.org")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- uri = urllib.parse.urljoin(base, "basic/file.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- domain = urllib.parse.urlparse(base)[1]
- self.http.add_credentials('joe', 'password', domain)
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- uri = urllib.parse.urljoin(base, "basic/file.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- def testBasicAuthTwoDifferentCredentials(self):
- # Test Basic Authentication with multiple sets of credentials
- uri = urllib.parse.urljoin(base, "basic2/file.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- uri = urllib.parse.urljoin(base, "basic2/")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- self.http.add_credentials('fred', 'barney')
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- uri = urllib.parse.urljoin(base, "basic2/file.txt")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- def testBasicAuthNested(self):
- # Test Basic Authentication with resources
- # that are nested
- uri = urllib.parse.urljoin(base, "basic-nested/")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- uri = urllib.parse.urljoin(base, "basic-nested/subdir")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- # Now add in credentials one at a time and test.
- self.http.add_credentials('joe', 'password')
-
- uri = urllib.parse.urljoin(base, "basic-nested/")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- uri = urllib.parse.urljoin(base, "basic-nested/subdir")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- self.http.add_credentials('fred', 'barney')
-
- uri = urllib.parse.urljoin(base, "basic-nested/")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- uri = urllib.parse.urljoin(base, "basic-nested/subdir")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- def testDigestAuth(self):
- # Test that we support Digest Authentication
- uri = urllib.parse.urljoin(base, "digest/")
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 401)
-
- self.http.add_credentials('joe', 'password')
- (response, content) = self.http.request(uri, "GET")
- self.assertEqual(response.status, 200)
-
- uri = urllib.parse.urljoin(base, "digest/file.txt")
- (response, content) = self.http.request(uri, "GET")
-
- def testDigestAuthNextNonceAndNC(self):
- # Test that if the server sets nextnonce, we reset
- # the nonce count back to 1
- uri = urllib.parse.urljoin(base, "digest/file.txt")
- self.http.add_credentials('joe', 'password')
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
- info = httplib2._parse_www_authenticate(response, 'authentication-info')
- self.assertEqual(response.status, 200)
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
- info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
- self.assertEqual(response.status, 200)
-
- if 'nextnonce' in info:
- self.assertEqual(info2['nc'], 1)
-
- def testDigestAuthStale(self):
- # Test that we can handle a nonce becoming stale
- uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
- self.http.add_credentials('joe', 'password')
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
- info = httplib2._parse_www_authenticate(response, 'authentication-info')
- self.assertEqual(response.status, 200)
-
- # Sleep long enough that the nonce becomes stale
- time.sleep(3)
-
- (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
- self.assertFalse(response.fromcache)
- self.assertTrue(response._stale_digest)
- info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
- self.assertEqual(response.status, 200)
-
- def reflector(self, content):
- return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
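- # e.g. b'HTTP_USER_AGENT=fred\nREQUEST_METHOD=GET' becomes
- # {'HTTP_USER_AGENT': 'fred', 'REQUEST_METHOD': 'GET'}; each line is
- # split on the first '=' only, so values may themselves contain '='.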
-
- def testReflector(self):
- uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
- (response, content) = self.http.request(uri, "GET")
- d = self.reflector(content)
- self.assertTrue('HTTP_USER_AGENT' in d)
-
-
- def testConnectionClose(self):
- uri = "http://www.google.com/"
- (response, content) = self.http.request(uri, "GET")
- for c in self.http.connections.values():
- self.assertNotEqual(None, c.sock)
- (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
- for c in self.http.connections.values():
- self.assertEqual(None, c.sock)
-
- def testPickleHttp(self):
- pickled_http = pickle.dumps(self.http)
- new_http = pickle.loads(pickled_http)
-
- self.assertEqual(sorted(new_http.__dict__.keys()),
- sorted(self.http.__dict__.keys()))
- for key in new_http.__dict__:
- if key in ('certificates', 'credentials'):
- self.assertEqual(new_http.__dict__[key].credentials,
- self.http.__dict__[key].credentials)
- elif key == 'cache':
- self.assertEqual(new_http.__dict__[key].cache,
- self.http.__dict__[key].cache)
- else:
- self.assertEqual(new_http.__dict__[key],
- self.http.__dict__[key])
-
- def testPickleHttpWithConnection(self):
- self.http.request('http://bitworking.org',
- connection_type=_MyHTTPConnection)
- pickled_http = pickle.dumps(self.http)
- new_http = pickle.loads(pickled_http)
-
- self.assertEqual(list(self.http.connections.keys()),
- ['http:bitworking.org'])
- self.assertEqual(new_http.connections, {})
-
- def testPickleCustomRequestHttp(self):
- def dummy_request(*args, **kwargs):
- return new_request(*args, **kwargs)
- dummy_request.dummy_attr = 'dummy_value'
-
- self.http.request = dummy_request
- pickled_http = pickle.dumps(self.http)
- self.assertFalse(b"S'request'" in pickled_http)
-
-try:
- import memcache
- class HttpTestMemCached(HttpTest):
- def setUp(self):
- self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
- #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
- self.http = httplib2.Http(self.cache)
- self.cache.flush_all()
- # Not exactly sure why the sleep is needed here, but
- # if not present then some unit tests that rely on caching
- # fail. Memcached seems to lose some sets immediately
- # after a flush_all if the set is to a value that
- # was previously cached. (Maybe the flush is handled async?)
- time.sleep(1)
- self.http.clear_credentials()
-except:
- pass
-
-
-# ------------------------------------------------------------------------
-
-class HttpPrivateTest(unittest.TestCase):
-
- def testParseCacheControl(self):
- # Test that we can parse the Cache-Control header
- self.assertEqual({}, httplib2._parse_cache_control({}))
- self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
- cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
- self.assertEqual(cc['no-cache'], 1)
- self.assertEqual(cc['max-age'], '7200')
- cc = httplib2._parse_cache_control({'cache-control': ' , '})
- self.assertEqual(cc[''], 1)
-
- try:
- cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
- self.assertTrue("max-age" in cc)
- except:
- self.fail("Should not throw exception")
-
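- # A minimal sketch of the parsing these assertions imply (illustrative,
- # not httplib2's implementation): directives are comma-separated,
- # lowercased, and '=' splits a directive from its value.
- def exampleParseCacheControl(self):
- header = 'No-Cache, max-age = 7200'
- cc = {}
- for part in header.split(','):
- name, _, value = part.strip().partition('=')
- cc[name.strip().lower()] = value.strip() if value else 1
- self.assertEqual(cc, {'no-cache': 1, 'max-age': '7200'})
-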
- def testNormalizeHeaders(self):
- # Test that we normalize headers to lowercase
- h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
- self.assertTrue('cache-control' in h)
- self.assertTrue('other' in h)
- self.assertEqual('Stuff', h['other'])
-
- def testConvertByteStr(self):
- with self.assertRaises(TypeError):
- httplib2._convert_byte_str(4)
- self.assertEqual('Hello World', httplib2._convert_byte_str(b'Hello World'))
- self.assertEqual('Bye World', httplib2._convert_byte_str('Bye World'))
-
- def testExpirationModelTransparent(self):
- # Test that no-cache makes our request TRANSPARENT
- response_headers = {
- 'cache-control': 'max-age=7200'
- }
- request_headers = {
- 'cache-control': 'no-cache'
- }
- self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
-
- def testMaxAgeNonNumeric(self):
- # Test that a non-numeric max-age is treated as already stale
- response_headers = {
- 'cache-control': 'max-age=fred, min-fresh=barney'
- }
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
-
- def testExpirationModelNoCacheResponse(self):
- # The date and expires point to an entry that should be
- # FRESH, but the no-cache overrides that.
- now = time.time()
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
- 'cache-control': 'no-cache'
- }
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationModelStaleRequestMustReval(self):
- # must-revalidate forces STALE
- self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
-
- def testExpirationModelStaleResponseMustReval(self):
- # must-revalidate forces STALE
- self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
-
- def testExpirationModelFresh(self):
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
- 'cache-control': 'max-age=2'
- }
- request_headers = {
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
- time.sleep(3)
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationMaxAge0(self):
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
- 'cache-control': 'max-age=0'
- }
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationModelDateAndExpires(self):
- now = time.time()
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
- }
- request_headers = {
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
- time.sleep(3)
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
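- # Sketch of the date handling the disposition tests rely on (an
- # illustration): header dates parse back to epoch seconds, and the
- # freshness lifetime is simply expires minus date.
- def exampleHttpDateRoundTrip(self):
- import calendar
- import email.utils
- hdr = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(1000000000))
- self.assertEqual(calendar.timegm(email.utils.parsedate(hdr)), 1000000000)
-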
- def testExpiresZero(self):
- now = time.time()
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': "0",
- }
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationModelDateOnly(self):
- now = time.time()
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
- }
- request_headers = {
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationModelOnlyIfCached(self):
- response_headers = {
- }
- request_headers = {
- 'cache-control': 'only-if-cached',
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationModelMaxAgeBoth(self):
- now = time.time()
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'cache-control': 'max-age=2'
- }
- request_headers = {
- 'cache-control': 'max-age=0'
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationModelDateAndExpiresMinFresh1(self):
- now = time.time()
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
- }
- request_headers = {
- 'cache-control': 'min-fresh=2'
- }
- self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
-
- def testExpirationModelDateAndExpiresMinFresh2(self):
- now = time.time()
- response_headers = {
- 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
- 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
- }
- request_headers = {
- 'cache-control': 'min-fresh=2'
- }
- self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
-
- def testParseWWWAuthenticateEmpty(self):
- res = httplib2._parse_www_authenticate({})
- self.assertEqual(len(list(res.keys())), 0)
-
- def testParseWWWAuthenticate(self):
- # different uses of spaces around commas
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
- self.assertEqual(len(list(res.keys())), 1)
- self.assertEqual(len(list(res['test'].keys())), 5)
-
- # tokens with non-alphanum
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
- self.assertEqual(len(list(res.keys())), 1)
- self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
-
- # quoted string with quoted pairs
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
- self.assertEqual(len(list(res.keys())), 1)
- self.assertEqual(res['test']['realm'], 'a "test" realm')
-
- def testParseWWWAuthenticateStrict(self):
- httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
- self.testParseWWWAuthenticate()
- httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
-
- def testParseWWWAuthenticateBasic(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
-
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- self.assertEqual('MD5', basic['algorithm'])
-
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- self.assertEqual('MD5', basic['algorithm'])
-
- def testParseWWWAuthenticateBasic2(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- self.assertEqual('fred', basic['other'])
-
- def testParseWWWAuthenticateBasic3(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
-
-
- def testParseWWWAuthenticateDigest(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
-
-
- def testParseWWWAuthenticateMultiple(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
- self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
- self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
-
- def testParseWWWAuthenticateMultiple2(self):
- # Handle an added comma between challenges, which might get thrown in if the challenges were
- # originally sent in separate www-authenticate headers.
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
- self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
- self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
-
- def testParseWWWAuthenticateMultiple3(self):
- # Handle an added comma between challenges, which might get thrown in if the challenges were
- # originally sent in separate www-authenticate headers.
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
- digest = res['digest']
- self.assertEqual('testrealm@host.com', digest['realm'])
- self.assertEqual('auth,auth-int', digest['qop'])
- self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
- self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
- basic = res['basic']
- self.assertEqual('me', basic['realm'])
- wsse = res['wsse']
- self.assertEqual('foo', wsse['realm'])
- self.assertEqual('UsernameToken', wsse['profile'])
-
- def testParseWWWAuthenticateMultiple4(self):
- res = httplib2._parse_www_authenticate({ 'www-authenticate':
- 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
- digest = res['digest']
- self.assertEqual('test-real.m@host.com', digest['realm'])
- self.assertEqual('\tauth,auth-int', digest['qop'])
- self.assertEqual('(*)&^&$%#', digest['nonce'])
-
- def testParseWWWAuthenticateMoreQuoteCombos(self):
- res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
- digest = res['digest']
- self.assertEqual('myrealm', digest['realm'])
-
- def testParseWWWAuthenticateMalformed(self):
- try:
- res = httplib2._parse_www_authenticate({'www-authenticate':'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'})
- self.fail("should raise an exception")
- except httplib2.MalformedHeader:
- pass
-
- def testDigestObject(self):
- credentials = ('joe', 'password')
- host = None
- request_uri = '/projects/httplib2/test/digest/'
- headers = {}
- response = {
- 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
- }
- content = b""
-
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
- d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
- our_request = "authorization: %s" % headers['authorization']
- working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
- self.assertEqual(our_request, working_request)
-
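- # The response value above follows RFC 2617 for qop=auth; a standalone
- # sketch of that arithmetic (an illustration, not httplib2's code):
- def exampleDigestResponse(self):
- from hashlib import md5
- def H(s):
- return md5(s.encode('utf-8')).hexdigest()
- A1 = H('joe:myrealm:password')
- A2 = H('GET:/projects/httplib2/test/digest/')
- nonce = 'Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306'
- result = H('%s:%s:00000001:33033375ec278a46:auth:%s' % (A1, nonce, A2))
- self.assertEqual(result, '97ed129401f7cdc60e5db58a80f3ea8b')
-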
- def testDigestObjectWithOpaque(self):
- credentials = ('joe', 'password')
- host = None
- request_uri = '/projects/httplib2/test/digest/'
- headers = {}
- response = {
- 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", opaque="atestopaque"'
- }
- content = ""
-
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
- d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
- our_request = "authorization: %s" % headers['authorization']
- working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46", opaque="atestopaque"'
- self.assertEqual(our_request, working_request)
-
- def testDigestObjectStale(self):
- credentials = ('joe', 'password')
- host = None
- request_uri = '/projects/httplib2/test/digest/'
- headers = {}
- response = httplib2.Response({ })
- response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
- response.status = 401
- content = b""
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
- # Returns true to force a retry
- self.assertTrue( d.response(response, content) )
-
- def testDigestObjectAuthInfo(self):
- credentials = ('joe', 'password')
- host = None
- request_uri = '/projects/httplib2/test/digest/'
- headers = {}
- response = httplib2.Response({ })
- response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
- response['authentication-info'] = 'nextnonce="fred"'
- content = b""
- d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
- # Returns false; the nextnonce in authentication-info updates the challenge instead of forcing a retry
- self.assertFalse( d.response(response, content) )
- self.assertEqual('fred', d.challenge['nonce'])
- self.assertEqual(1, d.challenge['nc'])
-
- def testWsseAlgorithm(self):
- digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
- expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
- self.assertEqual(expected, digest)
-
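- # The WSSE UsernameToken digest is base64(sha1(nonce + created +
- # password)); a hedged standalone sketch of the same computation:
- def exampleWsseToken(self):
- import base64
- from hashlib import sha1
- raw = 'd36e316282959a9ed4c89851497a717f' + '2003-12-15T14:43:07Z' + 'taadtaadpstcsm'
- token = base64.b64encode(sha1(raw.encode('utf-8')).digest()).strip()
- self.assertEqual(token, b'quR/EWLAV4xLf9Zqyw4pDmfV9OY=')
-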
- def testEnd2End(self):
- # one end to end header
- response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
- end2end = httplib2._get_end2end_headers(response)
- self.assertTrue('content-type' in end2end)
- self.assertTrue('te' not in end2end)
- self.assertTrue('connection' not in end2end)
-
- # one end to end header that gets eliminated
- response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
- end2end = httplib2._get_end2end_headers(response)
- self.assertTrue('content-type' not in end2end)
- self.assertTrue('te' not in end2end)
- self.assertTrue('connection' not in end2end)
-
- # Degenerate case of no headers
- response = {}
- end2end = httplib2._get_end2end_headers(response)
- self.assertEqual(0, len(end2end))
-
- # Degenerate case of connection referring to a header not passed in
- response = {'connection': 'content-type'}
- end2end = httplib2._get_end2end_headers(response)
- self.assertEqual(0, len(end2end))
-
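- # Sketch of the rule these cases exercise (the generic RFC 2616 13.5.1
- # behavior, an illustration rather than httplib2's code): hop-by-hop
- # headers and any header named in Connection drop out of the
- # end-to-end set.
- def exampleEnd2EndRule(self):
- hop_by_hop = ['connection', 'keep-alive', 'proxy-authenticate',
- 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', 'upgrade']
- response = {'connection': 'content-type', 'content-type': 'text/plain'}
- named = [h.strip().lower() for h in response.get('connection', '').split(',')]
- end2end = [h for h in response if h not in hop_by_hop and h not in named]
- self.assertEqual(end2end, [])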
-
-class TestProxyInfo(unittest.TestCase):
- def setUp(self):
- self.orig_env = dict(os.environ)
-
- def tearDown(self):
- os.environ.clear()
- os.environ.update(self.orig_env)
-
- def test_from_url(self):
- pi = httplib2.proxy_info_from_url('http://myproxy.example.com')
- self.assertEqual(pi.proxy_host, 'myproxy.example.com')
- self.assertEqual(pi.proxy_port, 80)
- self.assertEqual(pi.proxy_user, None)
-
- def test_from_url_ident(self):
- pi = httplib2.proxy_info_from_url('http://zoidberg:fish@someproxy:99')
- self.assertEqual(pi.proxy_host, 'someproxy')
- self.assertEqual(pi.proxy_port, 99)
- self.assertEqual(pi.proxy_user, 'zoidberg')
- self.assertEqual(pi.proxy_pass, 'fish')
-
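- # Illustrative sketch: the pieces asserted above are exactly what
- # urllib.parse.urlsplit exposes for a proxy URL.
- def example_urlsplit(self):
- parts = urllib.parse.urlsplit('http://zoidberg:fish@someproxy:99')
- self.assertEqual((parts.username, parts.password, parts.hostname, parts.port),
- ('zoidberg', 'fish', 'someproxy', 99))
-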
- def test_from_env(self):
- os.environ['http_proxy'] = 'http://myproxy.example.com:8080'
- pi = httplib2.proxy_info_from_environment()
- self.assertEqual(pi.proxy_host, 'myproxy.example.com')
- self.assertEqual(pi.proxy_port, 8080)
-
- def test_from_env_no_proxy(self):
- os.environ['http_proxy'] = 'http://myproxy.example.com:80'
- os.environ['https_proxy'] = 'http://myproxy.example.com:81'
- pi = httplib2.proxy_info_from_environment('https')
- self.assertEqual(pi.proxy_host, 'myproxy.example.com')
- self.assertEqual(pi.proxy_port, 81)
-
- def test_from_env_none(self):
- os.environ.clear()
- pi = httplib2.proxy_info_from_environment()
- self.assertEqual(pi, None)
-
- def test_proxy_headers(self):
- headers = {'key0': 'val0', 'key1': 'val1'}
- pi = httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, 'localhost', 1234, proxy_headers = headers)
- self.assertEqual(pi.proxy_headers, headers)
-
- # regression: ensure that httplib2.HTTPConnectionWithTimeout initializes when proxy_info is not supplied
- def test_proxy_init(self):
- connection = httplib2.HTTPConnectionWithTimeout('www.google.com', 80)
- connection.request('GET', '/')
- connection.close()
-
-if __name__ == '__main__':
- unittest.main()
+#!/usr/bin/env python3
+"""A set of unit tests for httplib2.py."""
+
+__author__ = "Joe Gregorio (joe@bitworking.org)"
+__copyright__ = "Copyright 2006, Joe Gregorio"
+__contributors__ = ["Mark Pilgrim"]
+__license__ = "MIT"
+__version__ = "0.2 ($Rev: 118 $)"
+
+import base64
+import http.client
+import httplib2
+import io
+import os
+import pickle
+import socket
+import ssl
+import sys
+import time
+import unittest
+import urllib.parse
+
+base = "http://bitworking.org/projects/httplib2/test/"
+cacheDirName = ".cache"
+
+
+class CredentialsTest(unittest.TestCase):
+ def test(self):
+ c = httplib2.Credentials()
+ c.add("joe", "password")
+ self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
+ self.assertEqual(("joe", "password"), list(c.iter(""))[0])
+ c.add("fred", "password2", "wellformedweb.org")
+ self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
+ self.assertEqual(1, len(list(c.iter("bitworking.org"))))
+ self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
+ self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
+ c.clear()
+ self.assertEqual(0, len(list(c.iter("bitworking.org"))))
+ c.add("fred", "password2", "wellformedweb.org")
+ self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
+ self.assertEqual(0, len(list(c.iter("bitworking.org"))))
+ self.assertEqual(0, len(list(c.iter(""))))
+
+
+class ParserTest(unittest.TestCase):
+ def testFromStd66(self):
+ self.assertEqual(
+ ("http", "example.com", "", None, None),
+ httplib2.parse_uri("http://example.com"),
+ )
+ self.assertEqual(
+ ("https", "example.com", "", None, None),
+ httplib2.parse_uri("https://example.com"),
+ )
+ self.assertEqual(
+ ("https", "example.com:8080", "", None, None),
+ httplib2.parse_uri("https://example.com:8080"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/", None, None),
+ httplib2.parse_uri("http://example.com/"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", None, None),
+ httplib2.parse_uri("http://example.com/path"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", "a=1&b=2", None),
+ httplib2.parse_uri("http://example.com/path?a=1&b=2"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", "a=1&b=2", "fred"),
+ httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
+ )
+ self.assertEqual(
+ ("http", "example.com", "/path", "a=1&b=2", "fred"),
+ httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
+ )
+
+
+class UrlNormTest(unittest.TestCase):
+ def test(self):
+ self.assertEqual(
+ "http://example.org/", httplib2.urlnorm("http://example.org")[-1]
+ )
+ self.assertEqual(
+ "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1]
+ )
+ self.assertEqual(
+ "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1]
+ )
+ self.assertEqual(
+ "http://example.org/mypath?a=b",
+ httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1],
+ )
+ self.assertEqual(
+ "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1]
+ )
+ self.assertEqual(
+ httplib2.urlnorm("http://localhost:80/"),
+ httplib2.urlnorm("HTTP://LOCALHOST:80"),
+ )
+ try:
+ httplib2.urlnorm("/")
+ self.fail("Non-absolute URIs should raise an exception")
+ except httplib2.RelativeURIError:
+ pass
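+
+ # A rough sketch of the normalization asserted above (an illustration,
+ # not httplib2's implementation): scheme and authority are lowercased
+ # and an empty path becomes "/".
+ def example_norm(self):
+ s = urllib.parse.urlsplit("http://EXAMple.org?=b")
+ norm = "%s://%s%s" % (s.scheme.lower(), s.netloc.lower(), s.path or "/")
+ if s.query:
+ norm += "?" + s.query
+ self.assertEqual(norm, "http://example.org/?=b")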
+
+
+class UrlSafenameTest(unittest.TestCase):
+ def test(self):
+ # Test that different URIs end up generating different safe names
+ self.assertEqual(
+ "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
+ httplib2.safename("http://example.org/fred/?a=b"),
+ )
+ self.assertEqual(
+ "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
+ httplib2.safename("http://example.org/fred?/a=b"),
+ )
+ self.assertEqual(
+ "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
+ httplib2.safename("http://www.example.org/fred?/a=b"),
+ )
+ self.assertEqual(
+ httplib2.safename(httplib2.urlnorm("http://www")[-1]),
+ httplib2.safename(httplib2.urlnorm("http://WWW")[-1]),
+ )
+ self.assertEqual(
+ "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
+ httplib2.safename("https://www.example.org/fred?/a=b"),
+ )
+ self.assertNotEqual(
+ httplib2.safename("http://www"), httplib2.safename("https://www")
+ )
+ # Test the max length limits
+ uri = "http://" + ("w" * 200) + ".org"
+ uri2 = "http://" + ("w" * 201) + ".org"
+ self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
+ # Max length should be 200 + 1 (",") + 32
+ self.assertEqual(233, len(httplib2.safename(uri2)))
+ self.assertEqual(233, len(httplib2.safename(uri)))
+ # Unicode
+ if sys.version_info >= (2, 3):
+ self.assertEqual(
+ "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
+ httplib2.safename("http://\u2304.org/fred/?a=b"),
+ )
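+ # Taken together, these expectations say a safe name is built from the
+ # filename-safe parts of the URI plus an MD5 hex digest of the full URI,
+ # with the readable prefix capped at 200 characters (200 + "," + 32 = 233).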
+
+
+class _MyResponse(io.BytesIO):
+ def __init__(self, body, **kwargs):
+ io.BytesIO.__init__(self, body)
+ self.headers = kwargs
+
+ def items(self):
+ return self.headers.items()
+
+ def iteritems(self):
+ return iter(self.headers.items())
+
+
+class _MyHTTPConnection(object):
+ "This class is just a mock of httplib.HTTPConnection used for testing"
+
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ strict=None,
+ timeout=None,
+ proxy_info=None,
+ ):
+ self.host = host
+ self.port = port
+ self.timeout = timeout
+ self.log = ""
+ self.sock = None
+
+ def set_debuglevel(self, level):
+ pass
+
+ def connect(self):
+ "Connect to a host on a given port."
+ pass
+
+ def close(self):
+ pass
+
+ def request(self, method, request_uri, body, headers):
+ pass
+
+ def getresponse(self):
+ return _MyResponse(b"the body", status="200")
+
+
+class _MyHTTPBadStatusConnection(object):
+ "Mock of httplib.HTTPConnection that raises BadStatusLine."
+
+ num_calls = 0
+
+ def __init__(
+ self,
+ host,
+ port=None,
+ key_file=None,
+ cert_file=None,
+ strict=None,
+ timeout=None,
+ proxy_info=None,
+ ):
+ self.host = host
+ self.port = port
+ self.timeout = timeout
+ self.log = ""
+ self.sock = None
+ _MyHTTPBadStatusConnection.num_calls = 0
+
+ def set_debuglevel(self, level):
+ pass
+
+ def connect(self):
+ pass
+
+ def close(self):
+ pass
+
+ def request(self, method, request_uri, body, headers):
+ pass
+
+ def getresponse(self):
+ _MyHTTPBadStatusConnection.num_calls += 1
+ raise http.client.BadStatusLine("")
+
+
+class HttpTest(unittest.TestCase):
+ def setUp(self):
+ if os.path.exists(cacheDirName):
+ [
+ os.remove(os.path.join(cacheDirName, file))
+ for file in os.listdir(cacheDirName)
+ ]
+ self.http = httplib2.Http(cacheDirName)
+ self.http.clear_credentials()
+
+ def testIPv6NoSSL(self):
+ try:
+ self.http.request("http://[::1]/")
+ except socket.gaierror:
+ self.fail("should get the address family right for IPv6")
+ except socket.error:
+ # Even if IPv6 isn't installed on a machine it should just raise socket.error
+ pass
+
+ def testIPv6SSL(self):
+ try:
+ self.http.request("https://[::1]/")
+ except socket.gaierror:
+ self.fail("should get the address family right for IPv6")
+ except socket.error:
+ # Even if IPv6 isn't installed on a machine it should just raise socket.error
+ pass
+
+ def testConnectionType(self):
+ self.http.force_exception_to_status_code = False
+ response, content = self.http.request(
+ "http://bitworking.org", connection_type=_MyHTTPConnection
+ )
+ self.assertEqual(response["content-location"], "http://bitworking.org")
+ self.assertEqual(content, b"the body")
+
+ def testBadStatusLineRetry(self):
+ old_retries = httplib2.RETRIES
+ httplib2.RETRIES = 1
+ self.http.force_exception_to_status_code = False
+ try:
+ response, content = self.http.request(
+ "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection
+ )
+ except http.client.BadStatusLine:
+ self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
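+ # i.e. even with RETRIES lowered to 1, the request is attempted
+ # twice before BadStatusLine propagates.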
+ httplib2.RETRIES = old_retries
+
+ def testGetUnknownServer(self):
+ self.http.force_exception_to_status_code = False
+ try:
+ self.http.request("http://fred.bitworking.org/")
+ self.fail(
+ "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server."
+ )
+ except httplib2.ServerNotFoundError:
+ pass
+
+ # Now test with exceptions turned off
+ self.http.force_exception_to_status_code = True
+
+ (response, content) = self.http.request("http://fred.bitworking.org/")
+ self.assertEqual(response["content-type"], "text/plain")
+ self.assertTrue(content.startswith(b"Unable to find"))
+ self.assertEqual(response.status, 400)
+
+ def testGetConnectionRefused(self):
+ self.http.force_exception_to_status_code = False
+ try:
+ self.http.request("http://localhost:7777/")
+ self.fail("An socket.error exception must be thrown on Connection Refused.")
+ except socket.error:
+ pass
+
+ # Now test with exceptions turned off
+ self.http.force_exception_to_status_code = True
+
+ (response, content) = self.http.request("http://localhost:7777/")
+ self.assertEqual(response["content-type"], "text/plain")
+ self.assertTrue(b"Connection refused" in content)
+ self.assertEqual(response.status, 400)
+
+ def testGetIRI(self):
+ if sys.version_info >= (2, 3):
+ uri = urllib.parse.urljoin(
+ base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}"
+ )
+ (response, content) = self.http.request(uri, "GET")
+ d = self.reflector(content)
+ self.assertTrue("QUERY_STRING" in d)
+ self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0)
+
+ def testGetIsDefaultMethod(self):
+ # Test that GET is the default method
+ uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
+ (response, content) = self.http.request(uri)
+ self.assertEqual(response["x-method"], "GET")
+
+ def testDifferentMethods(self):
+ # Test that all methods can be used
+ uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
+ for method in ["GET", "PUT", "DELETE", "POST"]:
+ (response, content) = self.http.request(uri, method, body=b" ")
+ self.assertEqual(response["x-method"], method)
+
+ def testHeadRead(self):
+ # Test that we don't try to read the response of a HEAD request
+ # since httplib blocks response.read() for HEAD requests.
+ # Oddly enough this doesn't appear as a problem when doing HEAD requests
+ # against Apache servers.
+ uri = "http://www.google.com/"
+ (response, content) = self.http.request(uri, "HEAD")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(content, b"")
+
+ def testGetNoCache(self):
+ # Test that we can do a GET without the cache turned on.
+ http = httplib2.Http()
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+ (response, content) = http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.previous, None)
+
+ def testGetOnlyIfCachedCacheHit(self):
+ # Test that we can do a GET with the cache on and 'only-if-cached'
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+ (response, content) = self.http.request(uri, "GET")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "only-if-cached"}
+ )
+ self.assertEqual(response.fromcache, True)
+ self.assertEqual(response.status, 200)
+
+ def testGetOnlyIfCachedCacheMiss(self):
+ # Test a GET with 'only-if-cached' when the entry is not yet cached
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "only-if-cached"}
+ )
+ self.assertEqual(response.fromcache, False)
+ self.assertEqual(response.status, 504)
+
+ def testGetOnlyIfCachedNoCacheAtAll(self):
+ # Test a GET with 'only-if-cached' when no cache is configured at all
+ # Of course, there might be an intermediary beyond us
+ # that responds to the 'only-if-cached', so this
+ # test can't really be guaranteed to pass.
+ http = httplib2.Http()
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+ (response, content) = http.request(
+ uri, "GET", headers={"cache-control": "only-if-cached"}
+ )
+ self.assertEqual(response.fromcache, False)
+ self.assertEqual(response.status, 504)
+
+ def testUserAgent(self):
+ # Test that we provide a default user-agent
+ uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertTrue(content.startswith(b"Python-httplib2/"))
+
+ def testUserAgentNonDefault(self):
+ # Test that the default user-agent can be overridden
+
+ uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"User-Agent": "fred/1.0"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertTrue(content.startswith(b"fred/1.0"))
+
+ def testGet300WithLocation(self):
+ # Test that we automatically follow 300 redirects if a Location: header is provided
+ uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 300)
+ self.assertEqual(response.previous.fromcache, False)
+
+ # Confirm that the intermediate 300 is not cached
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 300)
+ self.assertEqual(response.previous.fromcache, False)
+
+ def testGet300WithLocationNoRedirect(self):
+ # Test that a 300 with a Location: header is returned
+ # untouched when follow_redirects is False
+ self.http.follow_redirects = False
+ uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 300)
+
+ def testGet300WithoutLocation(self):
+ # Not giving a Location: header in a 300 response is acceptable
+ # In which case we just return the 300 response
+ uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 300)
+ self.assertTrue(response["content-type"].startswith("text/html"))
+ self.assertEqual(response.previous, None)
+
+ def testGet301(self):
+ # Test that we automatically follow 301 redirects
+ # and that we cache the 301 response
+ uri = urllib.parse.urljoin(base, "301/onestep.asis")
+ destination = urllib.parse.urljoin(base, "302/final-destination.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertTrue("content-location" in response)
+ self.assertEqual(response["content-location"], destination)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 301)
+ self.assertEqual(response.previous.fromcache, False)
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response["content-location"], destination)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 301)
+ self.assertEqual(response.previous.fromcache, True)
+
+ def testHead301(self):
+ # Test that we automatically follow 301 redirects
+ uri = urllib.parse.urljoin(base, "301/onestep.asis")
+ (response, content) = self.http.request(uri, "HEAD")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.previous.status, 301)
+ self.assertEqual(response.previous.fromcache, False)
+
+ def testGet301NoRedirect(self):
+ # Test that a 301 is returned as-is (and not followed)
+ # when follow_redirects is False
+ self.http.follow_redirects = False
+ uri = urllib.parse.urljoin(base, "301/onestep.asis")
+ destination = urllib.parse.urljoin(base, "302/final-destination.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 301)
+
+ def testGet302(self):
+ # Test that we automatically follow 302 redirects
+ # and that we DO NOT cache the 302 response
+ uri = urllib.parse.urljoin(base, "302/onestep.asis")
+ destination = urllib.parse.urljoin(base, "302/final-destination.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response["content-location"], destination)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 302)
+ self.assertEqual(response.previous.fromcache, False)
+
+ uri = urllib.parse.urljoin(base, "302/onestep.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+ self.assertEqual(response["content-location"], destination)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 302)
+ self.assertEqual(response.previous.fromcache, False)
+ self.assertEqual(response.previous["content-location"], uri)
+
+ uri = urllib.parse.urljoin(base, "302/twostep.asis")
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 302)
+ self.assertEqual(response.previous.fromcache, False)
+
+ def testGet302RedirectionLimit(self):
+ # Test that we can set a lower redirection limit
+ # and that we raise an exception when we exceed
+ # that limit.
+ self.http.force_exception_to_status_code = False
+
+ uri = urllib.parse.urljoin(base, "302/twostep.asis")
+ try:
+ (response, content) = self.http.request(uri, "GET", redirections=1)
+ self.fail("This should not happen")
+ except httplib2.RedirectLimit:
+ pass
+ except Exception as e:
+ self.fail("Threw wrong kind of exception ")
+
+ # Re-run the test without the exceptions
+ self.http.force_exception_to_status_code = True
+
+ (response, content) = self.http.request(uri, "GET", redirections=1)
+ self.assertEqual(response.status, 500)
+ self.assertTrue(response.reason.startswith("Redirected more"))
+ self.assertEqual("302", response["status"])
+ self.assertTrue(content.startswith(b"<html>"))
+ self.assertTrue(response.previous != None)
+
+ def testGet302NoLocation(self):
+ # Test that we throw an exception when we get
+ # a 302 with no Location: header.
+ self.http.force_exception_to_status_code = False
+ uri = urllib.parse.urljoin(base, "302/no-location.asis")
+ try:
+ (response, content) = self.http.request(uri, "GET")
+ self.fail("Should never reach here")
+ except httplib2.RedirectMissingLocation:
+ pass
+ except Exception:
+ self.fail("Threw wrong kind of exception")
+
+ # Re-run the test without the exceptions
+ self.http.force_exception_to_status_code = True
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 500)
+ self.assertTrue(response.reason.startswith("Redirected but"))
+ self.assertEqual("302", response["status"])
+ self.assertTrue(content.startswith(b"This is content"))
+
+ def testGet301ViaHttps(self):
+ # Test that we follow a 301 redirect over HTTPS
+ (response, content) = self.http.request("https://code.google.com/apis/", "GET")
+ self.assertEqual(200, response.status)
+ self.assertEqual(301, response.previous.status)
+
+ def testGetViaHttps(self):
+ # Test that we can handle HTTPS
+ (response, content) = self.http.request("https://google.com/adsense/", "GET")
+ self.assertEqual(200, response.status)
+
+ def testGetViaHttpsSpecViolationOnLocation(self):
+ # Test that we follow redirects through HTTPS
+ # even if they violate the spec by including
+ # a relative Location: header instead of an
+ # absolute one.
+ (response, content) = self.http.request("https://google.com/adsense", "GET")
+ self.assertEqual(200, response.status)
+ self.assertNotEqual(None, response.previous)
+
+ def testGetViaHttpsKeyCert(self):
+ # At this point I can only test
+ # that the key and cert files are passed in
+ # correctly to httplib. It would be nice to have
+ # a real https endpoint to test against.
+ http = httplib2.Http(timeout=2)
+
+ http.add_certificate("akeyfile", "acertfile", "bitworking.org")
+ try:
+ (response, content) = http.request("https://bitworking.org", "GET")
+ except AttributeError:
+ self.assertEqual(
+ http.connections["https:bitworking.org"].key_file, "akeyfile"
+ )
+ self.assertEqual(
+ http.connections["https:bitworking.org"].cert_file, "acertfile"
+ )
+ except IOError:
+ # Skip on 3.2
+ pass
+
+ try:
+ (response, content) = http.request("https://notthere.bitworking.org", "GET")
+ except httplib2.ServerNotFoundError:
+ self.assertEqual(
+ http.connections["https:notthere.bitworking.org"].key_file, None
+ )
+ self.assertEqual(
+ http.connections["https:notthere.bitworking.org"].cert_file, None
+ )
+ except IOError:
+ # Skip on 3.2
+ pass
+
+ def testSslCertValidation(self):
+ # Test that we get an IOError when specifying a non-existent CA
+ # certs file.
+ http = httplib2.Http(ca_certs="/nosuchfile")
+ self.assertRaises(IOError, http.request, "https://www.google.com/", "GET")
+
+ # Test that we get an ssl.SSLError if we try to access
+ # https://www.google.com, using a CA cert file that doesn't contain
+ # the CA Google uses (i.e., simulating a cert that's not signed by a
+ # trusted CA).
+ other_ca_certs = os.path.join(
+ os.path.dirname(os.path.abspath(httplib2.__file__)),
+ "test",
+ "other_cacerts.txt",
+ )
+ http = httplib2.Http(ca_certs=other_ca_certs)
+ self.assertRaises(ssl.SSLError, http.request, "https://www.google.com/", "GET")
+
+ def testSniHostnameValidation(self):
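+ # Success here just means the TLS handshake completed: the hostname
+ # sent via SNI was validated against the server certificate without
+ # raising an ssl error.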
+ self.http.request("https://google.com/", method="GET")
+
+ def testGet303(self):
+ # Do a follow-up GET on a Location: header
+ # returned from a POST that gave a 303.
+ uri = urllib.parse.urljoin(base, "303/303.cgi")
+ (response, content) = self.http.request(uri, "POST", " ")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 303)
+
+ def testGet303NoRedirect(self):
+ # Test that we do not follow the Location: header
+ # from a 303 when follow_redirects is turned off.
+ self.http.follow_redirects = False
+ uri = urllib.parse.urljoin(base, "303/303.cgi")
+ (response, content) = self.http.request(uri, "POST", " ")
+ self.assertEqual(response.status, 303)
+
+ def test303ForDifferentMethods(self):
+ # Test that a 303 converts any request method
+ # to GET on the follow-up request
+ uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
+ for (method, method_on_303) in [
+ ("PUT", "GET"),
+ ("DELETE", "GET"),
+ ("POST", "GET"),
+ ("GET", "GET"),
+ ("HEAD", "GET"),
+ ]:
+ (response, content) = self.http.request(uri, method, body=b" ")
+ self.assertEqual(response["x-method"], method_on_303)
+
+ def testGet304(self):
+ # Test that we use ETags properly to validate our cache
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
+
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "cache-control": "must-revalidate"},
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+
+ cache_file_name = os.path.join(
+ cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1])
+ )
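+ # The FileCache names each entry via safename() and stores the
+ # response headers first, so the cached file starts with the
+ # "status:" line checked below.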
+ f = open(cache_file_name, "r")
+ status_line = f.readline()
+ f.close()
+
+ self.assertTrue(status_line.startswith("status:"))
+
+ (response, content) = self.http.request(
+ uri, "HEAD", headers={"accept-encoding": "identity"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity", "range": "bytes=0-0"}
+ )
+ self.assertEqual(response.status, 206)
+ self.assertEqual(response.fromcache, False)
+
+ def testGetIgnoreEtag(self):
+ # Test that we can forcibly ignore ETags
+ uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
+
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
+ )
+ d = self.reflector(content)
+ self.assertTrue("HTTP_IF_NONE_MATCH" in d)
+
+ self.http.ignore_etag = True
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
+ )
+ d = self.reflector(content)
+ self.assertEqual(response.fromcache, False)
+ self.assertFalse("HTTP_IF_NONE_MATCH" in d)
+
+ def testOverrideEtag(self):
+ # Test that we can override the ETag sent in If-None-Match
+ uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
+
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
+ )
+ d = self.reflector(content)
+ self.assertTrue("HTTP_IF_NONE_MATCH" in d)
+ self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred")
+
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={
+ "accept-encoding": "identity",
+ "cache-control": "max-age=0",
+ "if-none-match": "fred",
+ },
+ )
+ d = self.reflector(content)
+ self.assertTrue("HTTP_IF_NONE_MATCH" in d)
+ self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred")
+
+ # MAP: commented this out because it consistently fails
+ # def testGet304EndToEnd(self):
+ # # Test that end to end headers get overwritten in the cache
+ # uri = urllib.parse.urljoin(base, "304/end2end.cgi")
+ # (response, content) = self.http.request(uri, "GET")
+ # self.assertNotEqual(response['etag'], "")
+ # old_date = response['date']
+ # time.sleep(2)
+ #
+ # (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
+ # # The response should be from the cache, but the Date: header should be updated.
+ # new_date = response['date']
+ # self.assertNotEqual(new_date, old_date)
+ # self.assertEqual(response.status, 200)
+ # self.assertEqual(response.fromcache, True)
+
+ def testGet304LastModified(self):
+ # Test that we can still handle a 304
+ # by only using the last-modified cache validator.
+ uri = urllib.parse.urljoin(
+ base, "304/last-modified-only/last-modified-only.txt"
+ )
+ (response, content) = self.http.request(uri, "GET")
+
+ self.assertNotEqual(response["last-modified"], "")
+ (response, content) = self.http.request(uri, "GET")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+
+ def testGet307(self):
+ # Test that we do follow 307 redirects but
+ # do not cache the 307
+ uri = urllib.parse.urljoin(base, "307/onestep.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 307)
+ self.assertEqual(response.previous.fromcache, False)
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+ self.assertEqual(content, b"This is the final destination.\n")
+ self.assertEqual(response.previous.status, 307)
+ self.assertEqual(response.previous.fromcache, False)
+
+ def testGet410(self):
+ # Test that we pass 410's through
+ uri = urllib.parse.urljoin(base, "410/410.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 410)
+
+ def testVaryHeaderSimple(self):
+ """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request.
+
+ """
+ # test that the vary header is sent
+ uri = urllib.parse.urljoin(base, "vary/accept.asis")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertTrue("vary" in response)
+
+ # get the resource again, from the cache, since the Accept header in
+ # this request matches the one in the original request
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True, msg="Should be from cache")
+
+ # get the resource again, not from cache, since the Accept header does not match
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/html"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False, msg="Should not be from cache")
+
+ # get the resource again, without any Accept header, so again no match
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False, msg="Should not be from cache")
+
+ def testNoVary(self):
+ pass
+ # when there is no Vary header, a different Accept header (for example)
+ # should not affect whether the cache is used
+ # test that the vary header is not sent
+ # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
+ # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
+ # self.assertEqual(response.status, 200)
+ # self.assertFalse('vary' in response)
+ #
+ # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
+ # self.assertEqual(response.status, 200)
+ # self.assertEqual(response.fromcache, True, msg="Should be from cache")
+ #
+ # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
+ # self.assertEqual(response.status, 200)
+ # self.assertEqual(response.fromcache, True, msg="Should be from cache")
+
+ def testVaryHeaderDouble(self):
+ uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={
+ "Accept": "text/plain",
+ "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
+ },
+ )
+ self.assertEqual(response.status, 200)
+ self.assertTrue("vary" in response)
+
+ # we are from cache
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={
+ "Accept": "text/plain",
+ "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
+ },
+ )
+ self.assertEqual(response.fromcache, True, msg="Should be from cache")
+
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ # get the resource again, not from cache; the varied headers don't match exactly
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept-Language": "da"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False, msg="Should not be from cache")
+
+ def testVaryUnusedHeader(self):
+ # A header's value is not considered to vary if it's not used at all.
+ uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertTrue("vary" in response)
+
+ # we are from cache
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Accept": "text/plain"}
+ )
+ self.assertEqual(response.fromcache, True, msg="Should be from cache")
+
+ def testHeadGZip(self):
+ # Test that we don't try to decompress a HEAD response
+ uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
+ (response, content) = self.http.request(uri, "HEAD")
+ self.assertEqual(response.status, 200)
+ self.assertNotEqual(int(response["content-length"]), 0)
+ self.assertEqual(content, b"")
+
+ def testGetGZip(self):
+ # Test that we support gzip compression
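+ # httplib2 decompresses the body, drops the content-encoding header,
+ # and records the original value under the synthetic
+ # "-content-encoding" key, which the assertions below rely on.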
+ uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertFalse("content-encoding" in response)
+ self.assertTrue("-content-encoding" in response)
+ self.assertEqual(
+ int(response["content-length"]), len(b"This is the final destination.\n")
+ )
+ self.assertEqual(content, b"This is the final destination.\n")
+
+ def testPostAndGZipResponse(self):
+ uri = urllib.parse.urljoin(base, "gzip/post.cgi")
+ (response, content) = self.http.request(uri, "POST", body=" ")
+ self.assertEqual(response.status, 200)
+ self.assertFalse("content-encoding" in response)
+ self.assertTrue("-content-encoding" in response)
+
+ def testGetGZipFailure(self):
+ # Test that we raise a good exception when the gzip fails
+ self.http.force_exception_to_status_code = False
+ uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
+ try:
+ (response, content) = self.http.request(uri, "GET")
+ self.fail("Should never reach here")
+ except httplib2.FailedToDecompressContent:
+ pass
+ except Exception:
+ self.fail("Threw wrong kind of exception")
+
+ # Re-run the test without the exceptions
+ self.http.force_exception_to_status_code = True
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 500)
+ self.assertTrue(response.reason.startswith("Content purported"))
+
+ def testIndividualTimeout(self):
+ uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
+ http = httplib2.Http(timeout=1)
+ http.force_exception_to_status_code = True
+
+ (response, content) = http.request(uri)
+ self.assertEqual(response.status, 408)
+ self.assertTrue(response.reason.startswith("Request Timeout"))
+ self.assertTrue(content.startswith(b"Request Timeout"))
+
+ def testGetDeflate(self):
+ # Test that we support deflate compression
+ uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertFalse("content-encoding" in response)
+ self.assertEqual(
+ int(response["content-length"]), len("This is the final destination.")
+ )
+ self.assertEqual(content, b"This is the final destination.")
+
+ def testGetDeflateFailure(self):
+ # Test that we raise a good exception when the deflate fails
+ self.http.force_exception_to_status_code = False
+
+ uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
+ try:
+ (response, content) = self.http.request(uri, "GET")
+ self.fail("Should never reach here")
+ except httplib2.FailedToDecompressContent:
+ pass
+ except Exception:
+ self.fail("Threw wrong kind of exception")
+
+ # Re-run the test without the exceptions
+ self.http.force_exception_to_status_code = True
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 500)
+ self.assertTrue(response.reason.startswith("Content purported"))
+
+ def testGetDuplicateHeaders(self):
+ # Test that duplicate headers get concatenated via ','
+ uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(content, b"This is content\n")
+ self.assertEqual(
+ response["link"].split(",")[0],
+ '<http://bitworking.org>; rel="home"; title="BitWorking"',
+ )
+
+ def testGetCacheControlNoCache(self):
+ # Test Cache-Control: no-cache on requests
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+
+ (response, content) = self.http.request(
+ uri,
+ "GET",
+ headers={"accept-encoding": "identity", "Cache-Control": "no-cache"},
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ def testGetCacheControlPragmaNoCache(self):
+ # Test Pragma: no-cache on requests
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertNotEqual(response["etag"], "")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+
+ (response, content) = self.http.request(
+ uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ def testGetCacheControlNoStoreRequest(self):
+ # A no-store request means that the response should not be stored.
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ def testGetCacheControlNoStoreResponse(self):
+ # A no-store response means that the response should not be stored.
+ uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ def testGetCacheControlNoCacheNoStoreRequest(self):
+ # Test that a no-store, no-cache clears the entry from the cache
+ # even if it was cached previously.
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+
+ (response, content) = self.http.request(uri, "GET")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.fromcache, True)
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
+ )
+ (response, content) = self.http.request(
+ uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
+ )
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+
+ def testUpdateInvalidatesCache(self):
+ # Test that calling PUT or DELETE on a
+ # URI that is cached invalidates that cache entry.
+ uri = urllib.parse.urljoin(base, "304/test_etag.txt")
+
+ (response, content) = self.http.request(uri, "GET")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.fromcache, True)
+ (response, content) = self.http.request(uri, "DELETE")
+ self.assertEqual(response.status, 405)
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.fromcache, False)
+
+ def testUpdateUsesCachedETag(self):
+ # Test that we natively support http://www.w3.org/1999/04/Editing/
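+ # (the "detect lost update" pattern: the ETag cached by the GETs is
+ # sent as If-Match on the PUT, so the second PUT, whose cached ETag
+ # is now stale, fails with 412 Precondition Failed)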
+ uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+ (response, content) = self.http.request(uri, "PUT", body="foo")
+ self.assertEqual(response.status, 200)
+ (response, content) = self.http.request(uri, "PUT", body="foo")
+ self.assertEqual(response.status, 412)
+
+ def testUpdatePatchUsesCachedETag(self):
+ # Test that we natively support http://www.w3.org/1999/04/Editing/
+ uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+ (response, content) = self.http.request(uri, "PATCH", body="foo")
+ self.assertEqual(response.status, 200)
+ (response, content) = self.http.request(uri, "PATCH", body="foo")
+ self.assertEqual(response.status, 412)
+
+ def testUpdateUsesCachedETagAndOCMethod(self):
+ # Test that we natively support http://www.w3.org/1999/04/Editing/
+ uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+ self.http.optimistic_concurrency_methods.append("DELETE")
+ (response, content) = self.http.request(uri, "DELETE")
+ self.assertEqual(response.status, 200)
+
+ def testUpdateUsesCachedETagOverridden(self):
+ # Test that we natively support http://www.w3.org/1999/04/Editing/
+ uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
+
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, False)
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.fromcache, True)
+ (response, content) = self.http.request(
+ uri, "PUT", body="foo", headers={"if-match": "fred"}
+ )
+ self.assertEqual(response.status, 412)
+
+ def testBasicAuth(self):
+ # Test Basic Authentication
+ uri = urllib.parse.urljoin(base, "basic/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ uri = urllib.parse.urljoin(base, "basic/")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ self.http.add_credentials("joe", "password")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ uri = urllib.parse.urljoin(base, "basic/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ def testBasicAuthWithDomain(self):
+ # Test Basic Authentication
+ uri = urllib.parse.urljoin(base, "basic/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ uri = urllib.parse.urljoin(base, "basic/")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ self.http.add_credentials("joe", "password", "example.org")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ uri = urllib.parse.urljoin(base, "basic/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ domain = urllib.parse.urlparse(base)[1]
+ self.http.add_credentials("joe", "password", domain)
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ uri = urllib.parse.urljoin(base, "basic/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ def testBasicAuthTwoDifferentCredentials(self):
+ # Test Basic Authentication with multiple sets of credentials
+ uri = urllib.parse.urljoin(base, "basic2/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ uri = urllib.parse.urljoin(base, "basic2/")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ self.http.add_credentials("fred", "barney")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ uri = urllib.parse.urljoin(base, "basic2/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ def testBasicAuthNested(self):
+ # Test Basic Authentication with resources
+ # that are nested
+ uri = urllib.parse.urljoin(base, "basic-nested/")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ uri = urllib.parse.urljoin(base, "basic-nested/subdir")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ # Now add in credentials one at a time and test.
+ self.http.add_credentials("joe", "password")
+
+ uri = urllib.parse.urljoin(base, "basic-nested/")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ uri = urllib.parse.urljoin(base, "basic-nested/subdir")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ self.http.add_credentials("fred", "barney")
+
+ uri = urllib.parse.urljoin(base, "basic-nested/")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ uri = urllib.parse.urljoin(base, "basic-nested/subdir")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ def testDigestAuth(self):
+ # Test that we support Digest Authentication
+ uri = urllib.parse.urljoin(base, "digest/")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 401)
+
+ self.http.add_credentials("joe", "password")
+ (response, content) = self.http.request(uri, "GET")
+ self.assertEqual(response.status, 200)
+
+ uri = urllib.parse.urljoin(base, "digest/file.txt")
+ (response, content) = self.http.request(uri, "GET")
+
+ def testDigestAuthNextNonceAndNC(self):
+ # Test that if the server sets nextnonce we reset
+ # the nonce count back to 1
+ uri = urllib.parse.urljoin(base, "digest/file.txt")
+ self.http.add_credentials("joe", "password")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
+ info = httplib2._parse_www_authenticate(response, "authentication-info")
+ self.assertEqual(response.status, 200)
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
+ info2 = httplib2._parse_www_authenticate(response, "authentication-info")
+ self.assertEqual(response.status, 200)
+
+ if "nextnonce" in info:
+ self.assertEqual(info2["nc"], 1)
+
+ def testDigestAuthStale(self):
+ # Test that we can handle a nonce becoming stale
+ uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
+ self.http.add_credentials("joe", "password")
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
+ info = httplib2._parse_www_authenticate(response, "authentication-info")
+ self.assertEqual(response.status, 200)
+
+ # Sleep long enough that the nonce becomes stale
+ time.sleep(3)
+
+ (response, content) = self.http.request(
+ uri, "GET", headers={"cache-control": "no-cache"}
+ )
+ self.assertFalse(response.fromcache)
+ self.assertTrue(response._stale_digest)
+ info3 = httplib2._parse_www_authenticate(response, "authentication-info")
+ self.assertEqual(response.status, 200)
+
+ def reflector(self, content):
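+ # Parse the reflector CGI output, one "NAME=value" pair per line,
+ # into a dict; e.g. b"HTTP_USER_AGENT=x\nREQUEST_METHOD=GET\n"
+ # becomes {"HTTP_USER_AGENT": "x", "REQUEST_METHOD": "GET"}.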
+ return dict(
+ [
+ tuple(x.split("=", 1))
+ for x in content.decode("utf-8").strip().split("\n")
+ ]
+ )
+
+ def testReflector(self):
+ uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
+ (response, content) = self.http.request(uri, "GET")
+ d = self.reflector(content)
+ self.assertTrue("HTTP_USER_AGENT" in d)
+
+ def testConnectionClose(self):
+ uri = "http://www.google.com/"
+ (response, content) = self.http.request(uri, "GET")
+ for c in self.http.connections.values():
+ self.assertNotEqual(None, c.sock)
+ (response, content) = self.http.request(
+ uri, "GET", headers={"connection": "close"}
+ )
+ for c in self.http.connections.values():
+ self.assertEqual(None, c.sock)
+
+ def testPickleHttp(self):
+ pickled_http = pickle.dumps(self.http)
+ new_http = pickle.loads(pickled_http)
+
+ self.assertEqual(
+ sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys())
+ )
+ for key in new_http.__dict__:
+ if key in ("certificates", "credentials"):
+ self.assertEqual(
+ new_http.__dict__[key].credentials,
+ self.http.__dict__[key].credentials,
+ )
+ elif key == "cache":
+ self.assertEqual(
+ new_http.__dict__[key].cache, self.http.__dict__[key].cache
+ )
+ else:
+ self.assertEqual(new_http.__dict__[key], self.http.__dict__[key])
+
+ def testPickleHttpWithConnection(self):
+ self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
+ pickled_http = pickle.dumps(self.http)
+ new_http = pickle.loads(pickled_http)
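+ # Open sockets can't be pickled, so the connection pool is dropped
+ # during serialization; the restored Http starts with no connections.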
+
+ self.assertEqual(list(self.http.connections.keys()), ["http:bitworking.org"])
+ self.assertEqual(new_http.connections, {})
+
+ def testPickleCustomRequestHttp(self):
+ def dummy_request(*args, **kwargs):
+ return new_request(*args, **kwargs)
+
+ dummy_request.dummy_attr = "dummy_value"
+
+ self.http.request = dummy_request
+ pickled_http = pickle.dumps(self.http)
+ self.assertFalse(b"S'request'" in pickled_http)
+
+
+try:
+ import memcache
+
+ class HttpTestMemCached(HttpTest):
+ def setUp(self):
+ self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
+ # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
+ self.http = httplib2.Http(self.cache)
+ self.cache.flush_all()
+ # Not exactly sure why the sleep is needed here, but
+ # if not present then some unit tests that rely on caching
+ # fail. Memcached seems to lose some sets immediately
+ # after a flush_all if the set is to a value that
+ # was previously cached. (Maybe the flush is handled async?)
+ time.sleep(1)
+ self.http.clear_credentials()
+
+
+except ImportError:
+ pass
+
+# ------------------------------------------------------------------------
+
+
+class HttpPrivateTest(unittest.TestCase):
+ def testParseCacheControl(self):
+ # Test that we can parse the Cache-Control header
+ self.assertEqual({}, httplib2._parse_cache_control({}))
+ self.assertEqual(
+ {"no-cache": 1},
+ httplib2._parse_cache_control({"cache-control": " no-cache"}),
+ )
+ cc = httplib2._parse_cache_control(
+ {"cache-control": " no-cache, max-age = 7200"}
+ )
+ self.assertEqual(cc["no-cache"], 1)
+ self.assertEqual(cc["max-age"], "7200")
+ cc = httplib2._parse_cache_control({"cache-control": " , "})
+ self.assertEqual(cc[""], 1)
+
+ try:
+ cc = httplib2._parse_cache_control(
+ {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"}
+ )
+ self.assertTrue("max-age" in cc)
+ except Exception:
+ self.fail("Should not throw exception")
+
+ def testNormalizeHeaders(self):
+ # Test that we normalize headers to lowercase
+ h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"})
+ self.assertTrue("cache-control" in h)
+ self.assertTrue("other" in h)
+ self.assertEqual("Stuff", h["other"])
+
+ def testConvertByteStr(self):
+ with self.assertRaises(TypeError):
+ httplib2._convert_byte_str(4)
+ self.assertEqual("Hello World", httplib2._convert_byte_str(b"Hello World"))
+ self.assertEqual("Bye World", httplib2._convert_byte_str("Bye World"))
+
+ def testExpirationModelTransparent(self):
+ # Test that no-cache makes our request TRANSPARENT
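+ # _entry_disposition returns one of three dispositions: "FRESH"
+ # (serve from cache), "STALE" (revalidate with the server), or
+ # "TRANSPARENT" (bypass the cache entirely).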
+ response_headers = {"cache-control": "max-age=7200"}
+ request_headers = {"cache-control": "no-cache"}
+ self.assertEqual(
+ "TRANSPARENT",
+ httplib2._entry_disposition(response_headers, request_headers),
+ )
+
+ def testMaxAgeNonNumeric(self):
+ # Test that a non-numeric max-age is treated as STALE
+ response_headers = {"cache-control": "max-age=fred, min-fresh=barney"}
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelNoCacheResponse(self):
+ # The date and expires point to an entry that should be
+ # FRESH, but the no-cache overrides that.
+ now = time.time()
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
+ "cache-control": "no-cache",
+ }
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelStaleRequestMustReval(self):
+ # must-revalidate forces STALE
+ self.assertEqual(
+ "STALE",
+ httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}),
+ )
+
+ def testExpirationModelStaleResponseMustReval(self):
+ # must-revalidate forces STALE
+ self.assertEqual(
+ "STALE",
+ httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}),
+ )
+
+ def testExpirationModelFresh(self):
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
+ "cache-control": "max-age=2",
+ }
+ request_headers = {}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
+ time.sleep(3)
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationMaxAge0(self):
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
+ "cache-control": "max-age=0",
+ }
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelDateAndExpires(self):
+ now = time.time()
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
+ }
+ request_headers = {}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
+ time.sleep(3)
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpiresZero(self):
+ now = time.time()
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": "0",
+ }
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelDateOnly(self):
+ now = time.time()
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3))
+ }
+ request_headers = {}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelOnlyIfCached(self):
+ response_headers = {}
+ request_headers = {"cache-control": "only-if-cached"}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelMaxAgeBoth(self):
+ now = time.time()
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "cache-control": "max-age=2",
+ }
+ request_headers = {"cache-control": "max-age=0"}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelDateAndExpiresMinFresh1(self):
+ now = time.time()
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
+ }
+ request_headers = {"cache-control": "min-fresh=2"}
+ self.assertEqual(
+ "STALE", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testExpirationModelDateAndExpiresMinFresh2(self):
+ now = time.time()
+ response_headers = {
+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
+ "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
+ }
+ request_headers = {"cache-control": "min-fresh=2"}
+ self.assertEqual(
+ "FRESH", httplib2._entry_disposition(response_headers, request_headers)
+ )
+
+ def testParseWWWAuthenticateEmpty(self):
+ res = httplib2._parse_www_authenticate({})
+ self.assertEqual(len(list(res.keys())), 0)
+
+ def testParseWWWAuthenticate(self):
+ # different uses of spaces around commas
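+ # _parse_www_authenticate returns a dict keyed by the lowercased
+ # scheme, mapping to that challenge's parameters; here res["test"]
+ # holds realm, foo, bar, baz and qux.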
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'
+ }
+ )
+ self.assertEqual(len(list(res.keys())), 1)
+ self.assertEqual(len(list(res["test"].keys())), 5)
+
+ # tokens with non-alphanum
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}
+ )
+ self.assertEqual(len(list(res.keys())), 1)
+ self.assertEqual(len(list(res["t*!%#st"].keys())), 2)
+
+ # quoted string with quoted pairs
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Test realm="a \\"test\\" realm"'}
+ )
+ self.assertEqual(len(list(res.keys())), 1)
+ self.assertEqual(res["test"]["realm"], 'a "test" realm')
+
+ def testParseWWWAuthenticateStrict(self):
+ httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
+ self.testParseWWWAuthenticate()
+ httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
+
+ def testParseWWWAuthenticateBasic(self):
+ res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'})
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic realm="me", algorithm="MD5"'}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ self.assertEqual("MD5", basic["algorithm"])
+
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic realm="me", algorithm=MD5'}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ self.assertEqual("MD5", basic["algorithm"])
+
+ def testParseWWWAuthenticateBasic2(self):
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic realm="me",other="fred" '}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ self.assertEqual("fred", basic["other"])
+
+ def testParseWWWAuthenticateBasic3(self):
+ res = httplib2._parse_www_authenticate(
+ {"www-authenticate": 'Basic REAlm="me" '}
+ )
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+
+ def testParseWWWAuthenticateDigest(self):
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
+
+ def testParseWWWAuthenticateMultiple(self):
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
+ self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
+ self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+
+ def testParseWWWAuthenticateMultiple2(self):
+ # Handle an added comma between challenges, which might get thrown in if the challenges were
+ # originally sent in separate www-authenticate headers.
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
+ self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
+ self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+
+ def testParseWWWAuthenticateMultiple3(self):
+ # Handle an added comma between challenges, which might get thrown in if the challenges were
+ # originally sent in separate www-authenticate headers.
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("testrealm@host.com", digest["realm"])
+ self.assertEqual("auth,auth-int", digest["qop"])
+ self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
+ self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
+ basic = res["basic"]
+ self.assertEqual("me", basic["realm"])
+ wsse = res["wsse"]
+ self.assertEqual("foo", wsse["realm"])
+ self.assertEqual("UsernameToken", wsse["profile"])
+
+ def testParseWWWAuthenticateMultiple4(self):
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("test-real.m@host.com", digest["realm"])
+ self.assertEqual("\tauth,auth-int", digest["qop"])
+ self.assertEqual("(*)&^&$%#", digest["nonce"])
+
+ def testParseWWWAuthenticateMoreQuoteCombos(self):
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
+ }
+ )
+ digest = res["digest"]
+ self.assertEqual("myrealm", digest["realm"])
+
+ def testParseWWWAuthenticateMalformed(self):
+ try:
+ res = httplib2._parse_www_authenticate(
+ {
+ "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'
+ }
+ )
+ self.fail("should raise an exception")
+ except httplib2.MalformedHeader:
+ pass
+
+ def testDigestObject(self):
+ credentials = ("joe", "password")
+ host = None
+ request_uri = "/projects/httplib2/test/digest/"
+ headers = {}
+ response = {
+ "www-authenticate": 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
+ 'algorithm=MD5, qop="auth"'
+ }
+ content = b""
+
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
+ d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
+ our_request = "authorization: %s" % headers["authorization"]
+ working_request = (
+ 'authorization: Digest username="joe", realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
+ 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
+ 'nc=00000001, cnonce="33033375ec278a46"'
+ )
+ self.assertEqual(our_request, working_request)
+
+ def testDigestObjectWithOpaque(self):
+ credentials = ("joe", "password")
+ host = None
+ request_uri = "/projects/httplib2/test/digest/"
+ headers = {}
+ response = {
+ "www-authenticate": 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
+ 'algorithm=MD5, qop="auth", opaque="atestopaque"'
+ }
+ content = ""
+
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
+ d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
+ our_request = "authorization: %s" % headers["authorization"]
+ working_request = (
+ 'authorization: Digest username="joe", realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
+ 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
+ 'nc=00000001, cnonce="33033375ec278a46", '
+ 'opaque="atestopaque"'
+ )
+ self.assertEqual(our_request, working_request)
+
+ def testDigestObjectStale(self):
+ credentials = ("joe", "password")
+ host = None
+ request_uri = "/projects/httplib2/test/digest/"
+ headers = {}
+ response = httplib2.Response({})
+ response["www-authenticate"] = (
+ 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' algorithm=MD5, qop="auth", stale=true'
+ )
+ response.status = 401
+ content = b""
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
+ # Returns true to force a retry
+ self.assertTrue(d.response(response, content))
+
+ def testDigestObjectAuthInfo(self):
+ credentials = ("joe", "password")
+ host = None
+ request_uri = "/projects/httplib2/test/digest/"
+ headers = {}
+ response = httplib2.Response({})
+ response["www-authenticate"] = (
+ 'Digest realm="myrealm", '
+ 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
+ ' algorithm=MD5, qop="auth", stale=true'
+ )
+ response["authentication-info"] = 'nextnonce="fred"'
+ content = b""
+ d = httplib2.DigestAuthentication(
+ credentials, host, request_uri, headers, response, content, None
+ )
+ # Returns False: the nextnonce in authentication-info updates the
+ # challenge instead of forcing a retry
+ self.assertFalse(d.response(response, content))
+ self.assertEqual("fred", d.challenge["nonce"])
+ self.assertEqual(1, d.challenge["nc"])
+
+ def testWsseAlgorithm(self):
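+ # The WSSE UsernameToken digest is base64(sha1(nonce + created +
+ # password)), which is what _wsse_username_token is expected to
+ # compute here.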
+ digest = httplib2._wsse_username_token(
+ "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm"
+ )
+ expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
+ self.assertEqual(expected, digest)
+
+ def testEnd2End(self):
+ # one end to end header
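+ # Hop-by-hop headers (RFC 2616 13.5.1) such as "te", plus any header
+ # named in the Connection header, are excluded from the end-to-end set.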
+ response = {"content-type": "application/atom+xml", "te": "deflate"}
+ end2end = httplib2._get_end2end_headers(response)
+ self.assertTrue("content-type" in end2end)
+ self.assertTrue("te" not in end2end)
+ self.assertTrue("connection" not in end2end)
+
+ # one end to end header that gets eliminated
+ response = {
+ "connection": "content-type",
+ "content-type": "application/atom+xml",
+ "te": "deflate",
+ }
+ end2end = httplib2._get_end2end_headers(response)
+ self.assertTrue("content-type" not in end2end)
+ self.assertTrue("te" not in end2end)
+ self.assertTrue("connection" not in end2end)
+
+ # Degenerate case of no headers
+ response = {}
+ end2end = httplib2._get_end2end_headers(response)
+ self.assertEqual(0, len(end2end))
+
+ # Degenerate case of connection referring to a header not passed in
+ response = {"connection": "content-type"}
+ end2end = httplib2._get_end2end_headers(response)
+ self.assertEqual(0, len(end2end))
+
+
+class TestProxyInfo(unittest.TestCase):
+ def setUp(self):
+ self.orig_env = dict(os.environ)
+
+ def tearDown(self):
+ os.environ.clear()
+ os.environ.update(self.orig_env)
+
+ def test_from_url(self):
+ pi = httplib2.proxy_info_from_url("http://myproxy.example.com")
+ self.assertEqual(pi.proxy_host, "myproxy.example.com")
+ self.assertEqual(pi.proxy_port, 80)
+ self.assertEqual(pi.proxy_user, None)
+
+ def test_from_url_ident(self):
+ pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99")
+ self.assertEqual(pi.proxy_host, "someproxy")
+ self.assertEqual(pi.proxy_port, 99)
+ self.assertEqual(pi.proxy_user, "zoidberg")
+ self.assertEqual(pi.proxy_pass, "fish")
+
+ def test_from_env(self):
+ os.environ["http_proxy"] = "http://myproxy.example.com:8080"
+ pi = httplib2.proxy_info_from_environment()
+ self.assertEqual(pi.proxy_host, "myproxy.example.com")
+ self.assertEqual(pi.proxy_port, 8080)
+
+ def test_from_env_no_proxy(self):
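+ # Exercises per-scheme selection: with both http_proxy and
+ # https_proxy set, asking for "https" picks the https_proxy entry.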
+ os.environ["http_proxy"] = "http://myproxy.example.com:80"
+ os.environ["https_proxy"] = "http://myproxy.example.com:81"
+ pi = httplib2.proxy_info_from_environment("https")
+ self.assertEqual(pi.proxy_host, "myproxy.example.com")
+ self.assertEqual(pi.proxy_port, 81)
+
+ def test_from_env_none(self):
+ os.environ.clear()
+ pi = httplib2.proxy_info_from_environment()
+ self.assertEqual(pi, None)
+
+ def test_proxy_headers(self):
+ headers = {"key0": "val0", "key1": "val1"}
+ pi = httplib2.ProxyInfo(
+ httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers
+ )
+ self.assertEqual(pi.proxy_headers, headers)
+
+ # regression: ensure that httplib2.HTTPConnectionWithTimeout initializes when proxy_info is not supplied
+ def test_proxy_init(self):
+ connection = httplib2.HTTPConnectionWithTimeout("www.google.com", 80)
+ connection.request("GET", "/")
+ connection.close()
+
+
+if __name__ == "__main__":
+ unittest.main()