tests: py2/3 unified, using pytest, automated on Travis
diff --git a/.gitignore b/.gitignore
index e690d52..0a800f2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
*.py[cod]
+venv*/
# C extensions
*.so
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..abf642e
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,29 @@
+sudo: false
+language: python
+
+matrix:
+ fast_finish: true
+ include:
+ - {python: 2.7, env: test_group=pep8}
+ - {python: 3.6, env: test_group=pep8}
+ - {python: 2.7}
+ - {python: 3.3}
+ - {python: 3.4}
+ - {python: 3.5}
+ - {python: 3.6}
+ - {python: pypy}
+
+cache:
+ apt: true
+ ccache: true
+ pip: true
+ directories:
+ - $HOME/.cache
+addons:
+ apt_packages:
+ # - libssl-dev
+
+install:
+ - pip install 'pip>=9.0' 'setuptools>=36.2' 'codecov>=2.0.15' -r requirements-test.txt
+script:
+ - script/test -sv
diff --git a/python2/httplib2/__init__.py b/python2/httplib2/__init__.py
index 69dca7b..986dc7f 100644
--- a/python2/httplib2/__init__.py
+++ b/python2/httplib2/__init__.py
@@ -1,4 +1,4 @@
-from __future__ import generators
+from __future__ import print_function
"""
httplib2
@@ -11,7 +11,6 @@
2007-08-18, Rick: Modified so it's able to use a socks proxy if needed.
"""
-from __future__ import print_function
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
@@ -147,6 +146,7 @@
seq.sort()
return seq
+
# Python 2.3 support
def HTTPResponse__getheaders(self):
"""Return list of (header, value) tuples."""
@@ -224,6 +224,7 @@
# Which headers are hop-by-hop headers by default
HOP_BY_HOP = ['connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', 'upgrade']
+
def _get_end2end_headers(response):
hopbyhop = list(HOP_BY_HOP)
hopbyhop.extend([x.strip() for x in response.get('connection', '').split(',')])
@@ -231,6 +232,7 @@
URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")
+
def parse_uri(uri):
"""Parses a URI using the regex given in Appendix B of RFC 3986.
@@ -239,6 +241,7 @@
groups = URI.match(uri).groups()
return (groups[1], groups[3], groups[4], groups[6], groups[8])
+
def urlnorm(uri):
(scheme, authority, path, query, fragment) = parse_uri(uri)
if not scheme or not authority:
@@ -259,6 +262,7 @@
re_url_scheme = re.compile(r'^\w+://')
re_slash = re.compile(r'[?/:|]+')
+
def safename(filename):
"""Return a filename suitable for the cache.
@@ -287,9 +291,12 @@
return ",".join((filename, filemd5))
NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
+
+
def _normalize_headers(headers):
return dict([ (key.lower(), NORMALIZE_SPACE.sub(' ', value).strip()) for (key, value) in headers.iteritems()])
+
def _parse_cache_control(headers):
retval = {}
if 'cache-control' in headers:
@@ -345,6 +352,7 @@
return retval
+# TODO: add current time as _entry_disposition argument to avoid sleep in tests
def _entry_disposition(response_headers, request_headers):
"""Determine freshness from the Date, Expires and Cache-Control headers.
@@ -419,6 +427,7 @@
retval = "FRESH"
return retval
+
def _decompressContent(response, new_content):
content = new_content
try:
@@ -432,11 +441,12 @@
# Record the historical presence of the encoding in a way the won't interfere.
response['-content-encoding'] = response['content-encoding']
del response['content-encoding']
- except IOError:
+ except (IOError, zlib.error):
content = ""
raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
return content
+
def _updateCache(request_headers, response_headers, content, cache, cachekey):
if cachekey:
cc = _parse_cache_control(request_headers)
@@ -523,7 +533,6 @@
return False
-
class BasicAuthentication(Authentication):
def __init__(self, credentials, host, request_uri, headers, response, content, http):
Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
@@ -676,6 +685,7 @@
cnonce,
iso_now)
+
class GoogleLoginAuthentication(Authentication):
def __init__(self, credentials, host, request_uri, headers, response, content, http):
from urllib import urlencode
@@ -715,12 +725,13 @@
AUTH_SCHEME_ORDER = ["hmacdigest", "googlelogin", "digest", "wsse", "basic"]
+
class FileCache(object):
"""Uses a local directory as a store for cached files.
Not really safe to use if multiple threads or processes are going to
be running on the same cache.
"""
- def __init__(self, cache, safe=safename): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
+ def __init__(self, cache, safe=safename): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
self.cache = cache
self.safe = safe
if not os.path.exists(cache):
@@ -748,6 +759,7 @@
if os.path.exists(cacheFullPath):
os.remove(cacheFullPath)
+
class Credentials(object):
def __init__(self):
self.credentials = []
@@ -763,14 +775,17 @@
if cdomain == "" or domain == cdomain:
yield (name, password)
+
class KeyCerts(Credentials):
"""Identical to Credentials except that
name/password are mapped to key/cert."""
pass
+
class AllHosts(object):
pass
+
class ProxyInfo(object):
"""Collect information required to use a proxy."""
bypass_hosts = ()
@@ -871,7 +886,7 @@
else:
port = dict(https=443, http=80)[method]
- proxy_type = 3 # socks.PROXY_TYPE_HTTP
+ proxy_type = 3 # socks.PROXY_TYPE_HTTP
pi = ProxyInfo(
proxy_type = proxy_type,
proxy_host = host,
@@ -887,7 +902,7 @@
noproxy = os.environ.get('no_proxy', os.environ.get('NO_PROXY', ''))
# Special case: A single '*' character means all hosts should be bypassed.
if noproxy == '*':
- bypass_hosts = httplib2.AllHosts
+ bypass_hosts = AllHosts
elif noproxy.strip():
bypass_hosts = noproxy.split(',')
bypass_hosts = filter(bool, bypass_hosts) # To exclude empty string.
@@ -962,7 +977,8 @@
continue
break
if not self.sock:
- raise socket.error, msg
+ raise socket.error(msg)
+
class HTTPSConnectionWithTimeout(httplib.HTTPSConnection):
"""
@@ -1116,7 +1132,7 @@
continue
break
if not self.sock:
- raise socket.error, msg
+ raise socket.error(msg)
SCHEME_TO_CONNECTION = {
'http': HTTPConnectionWithTimeout,
@@ -1343,7 +1359,7 @@
err = getattr(e, 'args')[0]
else:
err = e.errno
- if err == errno.ECONNREFUSED: # Connection refused
+ if err == errno.ECONNREFUSED: # Connection refused
raise
if err in (errno.ENETUNREACH, errno.EADDRNOTAVAIL) and i < RETRIES:
continue # retry on potentially transient socket errors
@@ -1749,7 +1765,6 @@
self.status = int(self.get('status', self.status))
self.reason = self.get('reason', self.reason)
-
def __getattr__(self, name):
if name == 'dict':
return self
diff --git a/python3/httplib2/__init__.py b/python3/httplib2/__init__.py
index 8e3a0ec..de68578 100644
--- a/python3/httplib2/__init__.py
+++ b/python3/httplib2/__init__.py
@@ -1,4 +1,3 @@
-
"""
httplib2
@@ -26,7 +25,6 @@
__license__ = "MIT"
__version__ = "0.10.3"
-from __future__ import print_function
import re
import sys
import email
@@ -340,7 +338,7 @@
# Record the historical presence of the encoding in a way the won't interfere.
response['-content-encoding'] = response['content-encoding']
del response['content-encoding']
- except IOError:
+ except (IOError, zlib.error):
content = ""
raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
return content
diff --git a/requirements-test.txt b/requirements-test.txt
new file mode 100644
index 0000000..674a214
--- /dev/null
+++ b/requirements-test.txt
@@ -0,0 +1,9 @@
+flake8==3.4.1
+mock==2.0.0
+pytest-cov==2.5.1
+pytest-forked==0.2
+pytest-randomly==1.2.1
+pytest-timeout==1.2.0
+pytest-xdist==1.20.0
+pytest==3.2.1
+six==1.10.0
diff --git a/script/test b/script/test
new file mode 100755
index 0000000..a88318b
--- /dev/null
+++ b/script/test
@@ -0,0 +1,45 @@
+#!/bin/bash
+set -eux
+# By default, run tests with the pytest-forked plugin.
+# It is disabled in an interactive terminal for easier debugging; pass --forked to re-enable.
+flag_forked="--forked"
+if [[ -z "${CONTINUOUS_INTEGRATION-}" ]] && [[ -t 1 ]] ; then
+ flag_forked=""
+fi
+test_flags=(
+ $@
+ $flag_forked
+ tests/
+)
+
+cd "$( dirname "${BASH_SOURCE[0]}" )/.."
+if [[ -n "${CONTINUOUS_INTEGRATION-}" ]] ; then
+ if [[ "${test_group-}" = "pep8" ]] ; then
+ if [[ "${TRAVIS_PYTHON_VERSION}" = "2.7" ]] ; then
+ flake8 python2/
+ else
+ flake8 python3/ tests/
+ fi
+ else
+ pip install -e .
+ httplib2_test_still_run_skipped=1 pytest --fulltrace -k test_303 $@ tests/ || true
+ httplib2_test_still_run_skipped=1 pytest --fulltrace -k test_head_301 $@ tests/ || true
+ pytest --fulltrace ${test_flags[@]}
+ fi
+ codecov --flags=$(echo $TRAVIS_PYTHON_VERSION |tr -d -- '-.')
+else
+ if [[ ! -d ./venv-27 ]] ; then
+ virtualenv --python=python2.7 ./venv-27
+ ./venv-27/bin/pip install -e . -r requirements-test.txt
+ fi
+ if [[ ! -d ./venv-36 ]] ; then
+ virtualenv --python=python3.6 ./venv-36
+ ./venv-36/bin/pip install -e . -r requirements-test.txt
+ fi
+ ./venv-27/bin/pytest ${test_flags[@]}
+ ./venv-36/bin/pytest ${test_flags[@]}
+ # FIXME: too many errors
+ # ./venv-27/bin/flake8 python2/
+ # ./venv-36/bin/flake8 python3/ tests/
+fi
+rm -rf ./_httplib2_test_cache
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..432d065
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,22 @@
+[coverage:run]
+omit = */test/*
+
+[flake8]
+exclude = *.egg*,.env,.git,.tox,_*,build*,dist*,venv*,python2/,python3/
+ignore = E261,W503
+max-line-length = 121
+
+[tool:pytest]
+minversion = 3.2
+addopts =
+ # --fulltrace
+ # -n auto
+ --cov-config=setup.cfg
+ --cov=httplib2
+ --noconftest
+ --showlocals
+ --strict
+ --tb=short
+ --timeout=17
+ --verbose
+ -ra
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..6cac5fd
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,618 @@
+from __future__ import print_function
+
+import base64
+import contextlib
+import copy
+import email.utils
+import functools
+import gzip
+import hashlib
+import httplib2
+import os
+import random
+import re
+import shutil
+import six
+import socket
+import struct
+import threading
+import time
+import traceback
+import zlib
+from six.moves import http_client, queue
+
+
+@contextlib.contextmanager
+def assert_raises(exc_type):
+ try:
+ yield
+ except exc_type:
+ pass
+ else:
+ name = str(exc_type)
+ try:
+ name = exc_type.__name__
+ except AttributeError:
+ pass
+ assert False, 'Expected exception {0}'.format(name)
+
+
+class BufferedReader(object):
+ '''io.BufferedReader with \r\n support
+ '''
+ def __init__(self, sock):
+ self._buf = b''
+ self._end = False
+ self._newline = b'\r\n'
+ self._sock = sock
+ if isinstance(sock, bytes):
+ self._sock = None
+ self._buf = sock
+
+ def _fill(self, target=1, more=None, untilend=False):
+ if more:
+ target = len(self._buf) + more
+ while untilend or (len(self._buf) < target):
+ # crutch to enable HttpRequest.from_bytes
+ if self._sock is None:
+ chunk = b''
+ else:
+ chunk = self._sock.recv(8 << 10)
+ # print('!!! recv', chunk)
+ if not chunk:
+ self._end = True
+ if untilend:
+ return
+ else:
+ raise EOFError
+ self._buf += chunk
+
+ def peek(self, size):
+ self._fill(target=size)
+ return self._buf[:size]
+
+ def read(self, size):
+ self._fill(target=size)
+ chunk, self._buf = self._buf[:size], self._buf[size:]
+ return chunk
+
+ def readall(self):
+ self._fill(untilend=True)
+ chunk, self._buf = self._buf, b''
+ return chunk
+
+ def readline(self):
+ while True:
+ i = self._buf.find(self._newline)
+ if i >= 0:
+ break
+ self._fill(more=1)
+ inext = i + len(self._newline)
+ line, self._buf = self._buf[:inext], self._buf[inext:]
+ return line
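+
+
+# A minimal usage sketch (illustrative, not exercised by the suite):
+# BufferedReader also accepts raw bytes instead of a socket, which is
+# what HttpMessage.from_bytes below relies on.
+def _example_buffered_reader():
+    buf = BufferedReader(b'GET / HTTP/1.1\r\nhost: example.com\r\n\r\n')
+    assert buf.readline() == b'GET / HTTP/1.1\r\n'
+    assert buf.read(4) == b'host'
+    assert buf.readall() == b': example.com\r\n\r\n'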
+
+
+def parse_http_message(kind, buf):
+ if buf._end:
+ return None
+ try:
+ start_line = buf.readline()
+ except EOFError:
+ return None
+ msg = kind()
+ msg.raw = start_line
+ if kind is HttpRequest:
+ assert re.match(br'.+ HTTP/\d\.\d\r\n$', start_line), 'Start line does not look like HTTP request: ' + repr(start_line)
+ msg.method, msg.uri, msg.proto = start_line.rstrip().decode().split(' ', 2)
+ assert msg.proto.startswith('HTTP/'), repr(start_line)
+ elif kind is HttpResponse:
+ assert re.match(br'^HTTP/\d\.\d \d+ .+\r\n$', start_line), 'Start line does not look like HTTP response: ' + repr(start_line)
+ msg.proto, msg.status, msg.reason = start_line.rstrip().decode().split(' ', 2)
+ msg.status = int(msg.status)
+ assert msg.proto.startswith('HTTP/'), repr(start_line)
+ else:
+ raise Exception('Use HttpRequest or HttpResponse .from_{bytes,buffered}')
+ msg.version = msg.proto[5:]
+
+ while True:
+ line = buf.readline()
+ msg.raw += line
+ line = line.rstrip()
+ if not line:
+ break
+ t = line.decode().split(':', 1)
+ msg.headers[t[0].lower()] = t[1].lstrip()
+
+ content_length_string = msg.headers.get('content-length', '')
+ if content_length_string.isdigit():
+ content_length = int(content_length_string)
+ msg.body = msg.body_raw = buf.read(content_length)
+ elif msg.headers.get('transfer-encoding') == 'chunked':
+ raise NotImplementedError
+ elif msg.version == '1.0':
+ msg.body = msg.body_raw = buf.readall()
+ else:
+ msg.body = msg.body_raw = b''
+
+ msg.raw += msg.body_raw
+ return msg
+
+
+class HttpMessage(object):
+ def __init__(self):
+ self.headers = {}
+
+ @classmethod
+ def from_bytes(cls, bs):
+ buf = BufferedReader(bs)
+ return parse_http_message(cls, buf)
+
+ @classmethod
+ def from_buffered(cls, buf):
+ return parse_http_message(cls, buf)
+
+ def __repr__(self):
+ return '{} {}'.format(self.__class__, repr(vars(self)))
+
+
+class HttpRequest(HttpMessage):
+ pass
+
+
+class HttpResponse(HttpMessage):
+ pass
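+
+
+# Sketch of parse_http_message via the helpers above, assuming a complete
+# response with an explicit content-length (the common case in these tests):
+def _example_parse_http_response():
+    response = HttpResponse.from_bytes(b'HTTP/1.1 200 OK\r\ncontent-length: 2\r\n\r\nhi')
+    assert response.status == 200
+    assert response.headers['content-length'] == '2'
+    assert response.body == b'hi'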
+
+
+class MockResponse(six.BytesIO):
+ def __init__(self, body, **kwargs):
+ six.BytesIO.__init__(self, body)
+ self.headers = kwargs
+
+ def items(self):
+ return self.headers.items()
+
+ def iteritems(self):
+ return six.iteritems(self.headers)
+
+
+class MockHTTPConnection(object):
+ '''This class is just a mock of httplib.HTTPConnection used for testing
+ '''
+
+ def __init__(self, host, port=None, key_file=None, cert_file=None,
+ strict=None, timeout=None, proxy_info=None):
+ self.host = host
+ self.port = port
+ self.timeout = timeout
+ self.log = ''
+ self.sock = None
+
+ def set_debuglevel(self, level):
+ pass
+
+ def connect(self):
+ 'Connect to a host on a given port.'
+ pass
+
+ def close(self):
+ pass
+
+ def request(self, method, request_uri, body, headers):
+ pass
+
+ def getresponse(self):
+ return MockResponse(b'the body', status='200')
+
+
+class MockHTTPBadStatusConnection(object):
+ '''Mock of httplib.HTTPConnection that raises BadStatusLine.
+ '''
+
+ num_calls = 0
+
+ def __init__(self, host, port=None, key_file=None, cert_file=None,
+ strict=None, timeout=None, proxy_info=None):
+ self.host = host
+ self.port = port
+ self.timeout = timeout
+ self.log = ''
+ self.sock = None
+ MockHTTPBadStatusConnection.num_calls = 0
+
+ def set_debuglevel(self, level):
+ pass
+
+ def connect(self):
+ pass
+
+ def close(self):
+ pass
+
+ def request(self, method, request_uri, body, headers):
+ pass
+
+ def getresponse(self):
+ MockHTTPBadStatusConnection.num_calls += 1
+ raise http_client.BadStatusLine('')
+
+
+@contextlib.contextmanager
+def server_socket(fun, request_count=1, timeout=5):
+ gresult = [None]
+ gcounter = [0]
+
+ def tick(request):
+ gcounter[0] += 1
+ keep = True
+ keep &= gcounter[0] < request_count
+ keep &= request.headers.get('connection', '').lower() != 'close'
+ return keep
+
+ def server_socket_thread(srv):
+ try:
+ while gcounter[0] < request_count:
+ client, _ = srv.accept()
+ try:
+ client.settimeout(timeout)
+ fun(client, tick)
+ finally:
+ try:
+ client.shutdown(socket.SHUT_RDWR)
+ except (IOError, socket.error):
+ pass
+ # FIXME: client.close() introduces connection reset by peer
+ # at least in other/connection_close test
+ # should not be a problem since socket would close upon garbage collection
+ if gcounter[0] > request_count:
+ gresult[0] = Exception('Request count expected={0} actual={1}'.format(request_count, gcounter[0]))
+ except Exception as e:
+ traceback.print_exc()
+ gresult[0] = e
+
+ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ # SO_REUSEADDR must be set before bind() to take effect
+ try:
+ server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except socket.error as ex:
+ print('non critical error on SO_REUSEADDR', ex)
+ server.bind(('localhost', 0))
+ server.listen(10)
+ server.settimeout(timeout)
+ t = threading.Thread(target=server_socket_thread, args=(server,))
+ t.daemon = True
+ t.start()
+ yield u'http://{0}:{1}/'.format(*server.getsockname())
+ server.close()
+ t.join()
+ if gresult[0] is not None:
+ raise gresult[0]
+
+
+def server_yield(fun, **kwargs):
+ q = queue.Queue(1)
+ g = fun(q.get)
+
+ def server_yield_socket_handler(sock, tick):
+ buf = BufferedReader(sock)
+ i = 0
+ while True:
+ request = HttpRequest.from_buffered(buf)
+ if request is None:
+ break
+ i += 1
+ request.client_addr = sock.getsockname()
+ request.number = i
+ q.put(request)
+ response = six.next(g)
+ sock.sendall(response)
+ if not tick(request):
+ break
+
+ return server_socket(server_yield_socket_handler, **kwargs)
+
+
+def server_request(request_handler, **kwargs):
+ def server_request_socket_handler(sock, tick):
+ buf = BufferedReader(sock)
+ i = 0
+ while True:
+ request = HttpRequest.from_buffered(buf)
+ if request is None:
+ break
+ i += 1
+ request.client_addr = sock.getsockname()
+ request.number = i
+ response = request_handler(request=request)
+ sock.sendall(response)
+ if not tick(request):
+ break
+
+ return server_socket(server_request_socket_handler, **kwargs)
+
+
+def server_const_bytes(response_content, **kwargs):
+ return server_request(lambda request: response_content, **kwargs)
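+
+
+# Usage sketch: serve a single canned response and fetch it with httplib2.
+# http_response_bytes is defined below; the name is resolved at call time.
+def _example_server_const_bytes():
+    http = httplib2.Http()
+    with server_const_bytes(http_response_bytes(body=b'pong')) as uri:
+        response, content = http.request(uri, 'GET')
+    assert response.status == 200
+    assert content == b'pong'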
+
+
+_http_kwargs = (
+ 'proto', 'status', 'headers', 'body', 'add_content_length', 'add_date', 'add_etag', 'undefined_body_length',
+)
+
+
+def http_response_bytes(proto='HTTP/1.1', status='200 OK', headers=None, body=b'',
+ add_content_length=True, add_date=False, add_etag=False,
+ undefined_body_length=False,
+ **kwargs):
+ if undefined_body_length:
+ add_content_length = False
+ if headers is None:
+ headers = {}
+ if add_content_length:
+ headers.setdefault('content-length', str(len(body)))
+ if add_date:
+ headers.setdefault('date', email.utils.formatdate())
+ if add_etag:
+ headers.setdefault('etag', '"{0}"'.format(hashlib.md5(body).hexdigest()))
+ header_string = ''.join('{0}: {1}\r\n'.format(k, v) for k, v in headers.items())
+ if not undefined_body_length and proto != 'HTTP/1.0' and 'content-length' not in headers:
+ raise Exception('httplib2.tests.http_response_bytes: client could not figure response body length')
+ if str(status).isdigit():
+ status = '{} {}'.format(status, http_client.responses[int(status)])
+ response = '{proto} {status}\r\n{headers}\r\n'.format(
+ proto=proto,
+ status=status,
+ headers=header_string,
+ ).encode() + body
+ return response
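+
+
+# With default arguments the builder above yields a minimal HTTP/1.1 response:
+def _example_http_response_bytes():
+    expected = b'HTTP/1.1 200 OK\r\ncontent-length: 2\r\n\r\nhi'
+    assert http_response_bytes(body=b'hi') == expected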
+
+
+def make_http_reflect(**kwargs):
+ assert 'body' not in kwargs, 'make_http_reflect will overwrite response body'
+
+ def fun(request):
+ kw = copy.deepcopy(kwargs)
+ kw['body'] = request.raw
+ response = http_response_bytes(**kw)
+ return response
+
+ return fun
+
+
+def server_route(routes, **kwargs):
+ response_404 = http_response_bytes(status='404 Not Found')
+ response_wildcard = routes.get('')
+
+ def handler(request):
+ target = routes.get(request.uri, response_wildcard) or response_404
+ if callable(target):
+ response = target(request=request)
+ else:
+ response = target
+ return response
+
+ return server_request(handler, **kwargs)
+
+
+def server_const_http(**kwargs):
+ response_kwargs = {
+ k: kwargs.pop(k) for k in dict(kwargs)
+ if k in _http_kwargs
+ }
+ response = http_response_bytes(**response_kwargs)
+ return server_const_bytes(response, **kwargs)
+
+
+def server_list_http(responses, **kwargs):
+ i = iter(responses)
+
+ def handler(request):
+ return next(i)
+
+ kwargs.setdefault('request_count', len(responses))
+ return server_request(handler, **kwargs)
+
+
+def server_reflect(**kwargs):
+ response_kwargs = {
+ k: kwargs.pop(k) for k in dict(kwargs)
+ if k in _http_kwargs
+ }
+ http_handler = make_http_reflect(**response_kwargs)
+ return server_request(http_handler, **kwargs)
+
+
+def http_parse_auth(s):
+ '''https://tools.ietf.org/html/rfc7235#section-2.1
+ '''
+ scheme, rest = s.split(' ', 1)
+ result = {}
+ while True:
+ m = httplib2.WWW_AUTH_RELAXED.search(rest)
+ if not m:
+ break
+ if len(m.groups()) == 3:
+ key, value, rest = m.groups()
+ result[key.lower()] = httplib2.UNQUOTE_PAIRS.sub(r'\1', value)
+ return result
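+
+
+# Sketch of the parser above on a digest challenge (relies on httplib2's
+# relaxed WWW-Authenticate regex, the same one the auth handler below uses):
+def _example_http_parse_auth():
+    parsed = http_parse_auth('Digest realm="test", nonce="abc", qop="auth"')
+    assert parsed == {'realm': 'test', 'nonce': 'abc', 'qop': 'auth'}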
+
+
+def store_request_response(out):
+ def wrapper(fun):
+ @functools.wraps(fun)
+ def wrapped(request, *a, **kw):
+ response_bytes = fun(request, *a, **kw)
+ if out is not None:
+ response = HttpResponse.from_bytes(response_bytes)
+ out.append((request, response))
+ return response_bytes
+ return wrapped
+ return wrapper
+
+
+def http_reflect_with_auth(allow_scheme, allow_credentials, out_renew_nonce=None, out_requests=None):
+ '''
+ allow_scheme - 'basic', 'digest', etc
+ allow_credentials - sequence of ('name', 'password')
+ out_renew_nonce - None | [function]
+ A way to return the nonce renewal function to the caller,
+ similar to an `out` parameter in some programming languages.
+ It keeps the signature of all handler builder functions the same.
+ out_requests - None | []
+ If set to list, every parsed request will be appended here.
+ '''
+ glastnc = [None]
+ gnextnonce = [None]
+ gserver_nonce = [gen_digest_nonce(salt=b'n')]
+ realm = 'httplib2 test'
+ server_opaque = gen_digest_nonce(salt=b'o')
+
+ def renew_nonce():
+ if gnextnonce[0]:
+ assert False, 'previous nextnonce was not used, probably bug in test code'
+ gnextnonce[0] = gen_digest_nonce()
+ return gserver_nonce[0], gnextnonce[0]
+
+ if out_renew_nonce:
+ out_renew_nonce[0] = renew_nonce
+
+ def deny(**kwargs):
+ nonce_stale = kwargs.pop('nonce_stale', False)
+ if nonce_stale:
+ kwargs.setdefault('body', b'nonce stale')
+ if allow_scheme == 'basic':
+ authenticate = 'basic realm="{realm}"'.format(realm=realm)
+ elif allow_scheme == 'digest':
+ authenticate = (
+ 'digest realm="{realm}", qop="auth"'
+ + ', nonce="{nonce}", opaque="{opaque}"'
+ + (', stale=true' if nonce_stale else '')
+ ).format(realm=realm, nonce=gserver_nonce[0], opaque=server_opaque)
+ else:
+ raise Exception('unknown allow_scheme={0}'.format(allow_scheme))
+ deny_headers = {'www-authenticate': authenticate}
+ kwargs.setdefault('status', 401)
+ # supplied headers may overwrite generated ones
+ deny_headers.update(kwargs.get('headers', {}))
+ kwargs['headers'] = deny_headers
+ kwargs.setdefault('body', b'HTTP authorization required')
+ return http_response_bytes(**kwargs)
+
+ @store_request_response(out_requests)
+ def http_reflect_with_auth_handler(request):
+ auth_header = request.headers.get('authorization', '')
+ if not auth_header:
+ return deny()
+ if ' ' not in auth_header:
+ return http_response_bytes(status=400, body=b'authorization header syntax error')
+ scheme, data = auth_header.split(' ', 1)
+ scheme = scheme.lower()
+ if scheme != allow_scheme:
+ return deny(body=b'must use different auth scheme')
+ if scheme == 'basic':
+ decoded = base64.b64decode(data).decode()
+ username, password = decoded.split(':', 1)
+ if (username, password) in allow_credentials:
+ return make_http_reflect()(request)
+ else:
+ return deny(body=b'supplied credentials are not allowed')
+ elif scheme == 'digest':
+ server_nonce_old = gserver_nonce[0]
+ nextnonce = gnextnonce[0]
+ if nextnonce:
+ # server decided to change nonce, in this case, guided by caller test code
+ gserver_nonce[0] = nextnonce
+ gnextnonce[0] = None
+ server_nonce_current = gserver_nonce[0]
+ auth_info = http_parse_auth(data)
+ client_cnonce = auth_info.get('cnonce', '')
+ client_nc = auth_info.get('nc', '')
+ client_nonce = auth_info.get('nonce', '')
+ client_opaque = auth_info.get('opaque', '')
+ client_qop = auth_info.get('qop', 'auth').strip('"')
+
+ # TODO: auth_info.get('algorithm', 'md5')
+ hasher = hashlib.md5
+
+ # TODO: client_qop auth-int
+ ha2 = hasher(':'.join((request.method, request.uri)).encode()).hexdigest()
+
+ if client_nonce != server_nonce_current:
+ if client_nonce == server_nonce_old:
+ return deny(nonce_stale=True)
+ return deny(body=b'invalid nonce')
+ if not client_nc:
+ return deny(body=b'auth-info nc missing')
+ if client_opaque != server_opaque:
+ return deny(body='auth-info opaque mismatch expected={} actual={}'
+ .format(server_opaque, client_opaque).encode())
+ for allow_username, allow_password in allow_credentials:
+ ha1 = hasher(':'.join((allow_username, realm, allow_password)).encode()).hexdigest()
+ allow_response = hasher(':'.join((
+ ha1, client_nonce, client_nc, client_cnonce, client_qop, ha2,
+ )).encode()).hexdigest()
+ rspauth_ha2 = hasher(':{}'.format(request.uri).encode()).hexdigest()
+ rspauth = hasher(':'.join((
+ ha1, client_nonce, client_nc, client_cnonce, client_qop, rspauth_ha2,
+ )).encode()).hexdigest()
+ if auth_info.get('response', '') == allow_response:
+ # TODO: fix or remove doubtful comment
+ # do we need to save nc only on success?
+ glastnc[0] = client_nc
+ allow_headers = {
+ 'authentication-info': ' '.join((
+ 'nextnonce="{}"'.format(nextnonce) if nextnonce else '',
+ 'qop={}'.format(client_qop),
+ 'rspauth="{}"'.format(rspauth),
+ 'cnonce="{}"'.format(client_cnonce),
+ 'nc={}'.format(client_nc),
+ )).strip(),
+ }
+ return make_http_reflect(headers=allow_headers)(request)
+ return deny(body=b'supplied credentials are not allowed')
+ else:
+ return http_response_bytes(
+ status=400,
+ body='unknown authorization scheme={0}'.format(scheme).encode(),
+ )
+
+ return http_reflect_with_auth_handler
+
+
+def get_cache_path():
+ default = './_httplib2_test_cache'
+ path = os.environ.get('httplib2_test_cache_path') or default
+ if os.path.exists(path):
+ shutil.rmtree(path)
+ return path
+
+
+def gen_digest_nonce(salt=b''):
+ t = struct.pack('>Q', int(time.time() * 1e9))
+ return base64.b64encode(t + b':' + hashlib.sha1(t + salt).digest()).decode()
+
+
+def gen_password():
+ length = random.randint(8, 64)
+ return ''.join(six.unichr(random.randint(0, 127)) for _ in range(length))
+
+
+def gzip_compress(bs):
+ # gzipobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+ # result = gzipobj.compress(text) + gzipobj.flush()
+ buf = six.BytesIO()
+ gf = gzip.GzipFile(fileobj=buf, mode='wb', compresslevel=6)
+ gf.write(bs)
+ gf.close()
+ return buf.getvalue()
+
+
+def gzip_decompress(bs):
+ return zlib.decompress(bs, zlib.MAX_WBITS | 16)
+
+
+def deflate_compress(bs):
+ do = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
+ return do.compress(bs) + do.flush()
+
+
+def deflate_decompress(bs):
+ return zlib.decompress(bs, -zlib.MAX_WBITS)
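+
+
+# Round-trip sanity sketch for the codec helpers above:
+def _example_compression_roundtrip():
+    data = b'to be compressed' * 8
+    assert gzip_decompress(gzip_compress(data)) == data
+    assert deflate_decompress(deflate_compress(data)) == data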
diff --git a/tests/test_auth.py b/tests/test_auth.py
new file mode 100644
index 0000000..3768f2b
--- /dev/null
+++ b/tests/test_auth.py
@@ -0,0 +1,284 @@
+import httplib2
+import pytest
+import tests
+from six.moves import urllib
+
+
+def test_credentials():
+ c = httplib2.Credentials()
+ c.add('joe', 'password')
+ assert tuple(c.iter('bitworking.org'))[0] == ('joe', 'password')
+ assert tuple(c.iter(''))[0] == ('joe', 'password')
+ c.add('fred', 'password2', 'wellformedweb.org')
+ assert tuple(c.iter('bitworking.org'))[0] == ('joe', 'password')
+ assert len(tuple(c.iter('bitworking.org'))) == 1
+ assert len(tuple(c.iter('wellformedweb.org'))) == 2
+ assert ('fred', 'password2') in tuple(c.iter('wellformedweb.org'))
+ c.clear()
+ assert len(tuple(c.iter('bitworking.org'))) == 0
+ c.add('fred', 'password2', 'wellformedweb.org')
+ assert ('fred', 'password2') in tuple(c.iter('wellformedweb.org'))
+ assert len(tuple(c.iter('bitworking.org'))) == 0
+ assert len(tuple(c.iter(''))) == 0
+
+
+def test_basic():
+ # Test Basic Authentication
+ http = httplib2.Http()
+ password = tests.gen_password()
+ handler = tests.http_reflect_with_auth(allow_scheme='basic', allow_credentials=(('joe', password),))
+ with tests.server_request(handler, request_count=3) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 401
+ http.add_credentials('joe', password)
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+
+
+def test_basic_for_domain():
+ # Test Basic Authentication
+ http = httplib2.Http()
+ password = tests.gen_password()
+ handler = tests.http_reflect_with_auth(allow_scheme='basic', allow_credentials=(('joe', password),))
+ with tests.server_request(handler, request_count=4) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 401
+ http.add_credentials('joe', password, 'example.org')
+ response, content = http.request(uri, 'GET')
+ assert response.status == 401
+ domain = urllib.parse.urlparse(uri)[1]
+ http.add_credentials('joe', password, domain)
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+
+
+def test_basic_two_credentials():
+ # Test Basic Authentication with multiple sets of credentials
+ http = httplib2.Http()
+ password1 = tests.gen_password()
+ password2 = tests.gen_password()
+ allowed = [('joe', password1)] # exploit shared mutable list
+ handler = tests.http_reflect_with_auth(allow_scheme='basic', allow_credentials=allowed)
+ with tests.server_request(handler, request_count=7) as uri:
+ http.add_credentials('fred', password2)
+ response, content = http.request(uri, 'GET')
+ assert response.status == 401
+ http.add_credentials('joe', password1)
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ allowed[0] = ('fred', password2)
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+
+
+def test_digest():
+ # Test that we support Digest Authentication
+ http = httplib2.Http()
+ password = tests.gen_password()
+ handler = tests.http_reflect_with_auth(allow_scheme='digest', allow_credentials=(('joe', password),))
+ with tests.server_request(handler, request_count=3) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 401
+ http.add_credentials('joe', password)
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200, content.decode()
+
+
+def test_digest_next_nonce_nc():
+ # Test that if the server sets nextnonce that we reset
+ # the nonce count back to 1
+ http = httplib2.Http()
+ password = tests.gen_password()
+ grenew_nonce = [None]
+ handler = tests.http_reflect_with_auth(
+ allow_scheme='digest',
+ allow_credentials=(('joe', password),),
+ out_renew_nonce=grenew_nonce,
+ )
+ with tests.server_request(handler, request_count=5) as uri:
+ http.add_credentials('joe', password)
+ response1, _ = http.request(uri, 'GET')
+ info = httplib2._parse_www_authenticate(response1, 'authentication-info')
+ assert response1.status == 200
+ assert info.get('digest', {}).get('nc') == '00000001', info
+ assert not info.get('digest', {}).get('nextnonce'), info
+ response2, _ = http.request(uri, 'GET')
+ info2 = httplib2._parse_www_authenticate(response2, 'authentication-info')
+ assert info2.get('digest', {}).get('nc') == '00000002', info2
+ grenew_nonce[0]()
+ response3, content = http.request(uri, 'GET')
+ info3 = httplib2._parse_www_authenticate(response3, 'authentication-info')
+ assert response3.status == 200
+ assert info3.get('digest', {}).get('nc') == '00000001', info3
+
+
+def test_digest_auth_stale():
+ # Test that we can handle a nonce becoming stale
+ http = httplib2.Http()
+ password = tests.gen_password()
+ grenew_nonce = [None]
+ requests = []
+ handler = tests.http_reflect_with_auth(
+ allow_scheme='digest',
+ allow_credentials=(('joe', password),),
+ out_renew_nonce=grenew_nonce,
+ out_requests=requests,
+ )
+ with tests.server_request(handler, request_count=4) as uri:
+ http.add_credentials('joe', password)
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ info = httplib2._parse_www_authenticate(requests[0][1].headers, 'www-authenticate')
+ grenew_nonce[0]()
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ assert not response.fromcache
+ assert getattr(response, '_stale_digest', False)
+ info2 = httplib2._parse_www_authenticate(requests[2][1].headers, 'www-authenticate')
+ nonce1 = info.get('digest', {}).get('nonce', '')
+ nonce2 = info2.get('digest', {}).get('nonce', '')
+ assert nonce1 != ''
+ assert nonce2 != ''
+ assert nonce1 != nonce2, (nonce1, nonce2)
+
+
+@pytest.mark.parametrize(
+ 'data', (
+ ({}, {}),
+ ({'www-authenticate': ''}, {}),
+ ({'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'},
+ {'test': {'realm': 'test realm', 'foo': 'foo', 'bar': 'bar', 'baz': 'baz', 'qux': 'qux'}}),
+ ({'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'},
+ {'t*!%#st': {'realm': 'to*!%#en', 'to*!%#en': 'quoted string'}}),
+ ({'www-authenticate': 'Test realm="a \\"test\\" realm"'},
+ {'test': {'realm': 'a "test" realm'}}),
+ ({'www-authenticate': 'Basic realm="me"'},
+ {'basic': {'realm': 'me'}}),
+ ({'www-authenticate': 'Basic realm="me", algorithm="MD5"'},
+ {'basic': {'realm': 'me', 'algorithm': 'MD5'}}),
+ ({'www-authenticate': 'Basic realm="me", algorithm=MD5'},
+ {'basic': {'realm': 'me', 'algorithm': 'MD5'}}),
+ ({'www-authenticate': 'Basic realm="me",other="fred" '},
+ {'basic': {'realm': 'me', 'other': 'fred'}}),
+ ({'www-authenticate': 'Basic REAlm="me" '},
+ {'basic': {'realm': 'me'}}),
+ ({'www-authenticate': 'Digest realm="digest1", qop="auth,auth-int", nonce="7102dd2", opaque="e9517f"'},
+ {'digest': {'realm': 'digest1', 'qop': 'auth,auth-int', 'nonce': '7102dd2', 'opaque': 'e9517f'}}),
+ # multiple schema choice
+ ({'www-authenticate': 'Digest realm="multi-d", nonce="8b11d0f6", opaque="cc069c" Basic realm="multi-b" '},
+ {'digest': {'realm': 'multi-d', 'nonce': '8b11d0f6', 'opaque': 'cc069c'},
+ 'basic': {'realm': 'multi-b'}}),
+ # FIXME
+ # comma between schemas (glue for multiple headers with same name)
+ # ({'www-authenticate': 'Digest realm="2-comma-d", qop="auth-int", nonce="c0c8ff1", Basic realm="2-comma-b"'},
+ # {'digest': {'realm': '2-comma-d', 'qop': 'auth-int', 'nonce': 'c0c8ff1'},
+ # 'basic': {'realm': '2-comma-b'}}),
+ # FIXME
+ # comma between schemas + WSSE (glue for multiple headers with same name)
+ # ({'www-authenticate': 'Digest realm="com3d", Basic realm="com3b", WSSE realm="com3w", profile="token"'},
+ # {'digest': {'realm': 'com3d'}, 'basic': {'realm': 'com3b'}, 'wsse': {'realm': 'com3w', profile': 'token'}}),
+ # FIXME
+ # multiple syntax figures
+ # ({'www-authenticate':
+ # 'Digest realm="brig", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc"' +
+ # ', Basic REAlm="zoo", WSSE realm="very", profile="UsernameToken"'},
+ # {'digest': {'realm': 'brig', 'qop': 'auth,auth-int', 'nonce': '(*)&^&$%#', 'opaque': '5ccc'},
+ # 'basic': {'realm': 'zoo'},
+ # 'wsse': {'realm': 'very', 'profile': 'UsernameToken'}}),
+ # more quote combos
+ ({'www-authenticate': 'Digest realm="myrealm", nonce="KBAA=3", algorithm=MD5, qop="auth", stale=true'},
+ {'digest': {'realm': 'myrealm', 'nonce': 'KBAA=3', 'algorithm': 'MD5', 'qop': 'auth', 'stale': 'true'}}),
+ ), ids=lambda data: str(data[0]))
+@pytest.mark.parametrize('strict', (True, False), ids=('strict', 'relax'))
+def test_parse_www_authenticate_correct(data, strict):
+ headers, info = data
+ # FIXME: move strict to parse argument
+ httplib2.USE_WWW_AUTH_STRICT_PARSING = strict
+ try:
+ assert httplib2._parse_www_authenticate(headers) == info
+ finally:
+ httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
+
+
+def test_parse_www_authenticate_malformed():
+ # TODO: test (and fix) header value 'barbqwnbm-bb...:asd' leading to an infinite loop
+ with tests.assert_raises(httplib2.MalformedHeader):
+ httplib2._parse_www_authenticate(
+ {'www-authenticate': 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'}
+ )
+
+
+def test_digest_object():
+ credentials = ('joe', 'password')
+ host = None
+ request_uri = '/test/digest/'
+ headers = {}
+ response = {
+ 'www-authenticate': 'Digest realm="myrealm", nonce="KBAA=35", algorithm=MD5, qop="auth"'
+ }
+ content = b''
+
+ d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ d.request('GET', request_uri, headers, content, cnonce="33033375ec278a46")
+ our_request = 'authorization: ' + headers['authorization']
+ working_request = (
+ 'authorization: Digest username="joe", realm="myrealm", nonce="KBAA=35", uri="/test/digest/"' +
+ ', algorithm=MD5, response="de6d4a123b80801d0e94550411b6283f", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
+ )
+ assert our_request == working_request
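+
+
+# The response value asserted above follows RFC 2617 for qop=auth:
+# response = H(H(user:realm:pass):nonce:nc:cnonce:qop:H(method:uri)), H = MD5.
+# Illustrative sketch, mirroring the server side in tests/__init__.py:
+def _digest_response_rfc2617(user, realm, password, method, uri, nonce, nc, cnonce, qop='auth'):
+    import hashlib
+    ha1 = hashlib.md5(':'.join((user, realm, password)).encode()).hexdigest()
+    ha2 = hashlib.md5(':'.join((method, uri)).encode()).hexdigest()
+    return hashlib.md5(':'.join((ha1, nonce, nc, cnonce, qop, ha2)).encode()).hexdigest()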
+
+
+def test_digest_object_with_opaque():
+ credentials = ('joe', 'password')
+ host = None
+ request_uri = '/digest/opaque/'
+ headers = {}
+ response = {
+ 'www-authenticate': 'Digest realm="myrealm", nonce="30352fd", algorithm=MD5, qop="auth", opaque="atestopaque"',
+ }
+ content = ''
+
+ d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ d.request('GET', request_uri, headers, content, cnonce="5ec2")
+ our_request = 'authorization: ' + headers['authorization']
+ working_request = (
+ 'authorization: Digest username="joe", realm="myrealm", nonce="30352fd", uri="/digest/opaque/", algorithm=MD5' +
+ ', response="a1fab43041f8f3789a447f48018bee48", qop=auth, nc=00000001, cnonce="5ec2", opaque="atestopaque"'
+ )
+ assert our_request == working_request
+
+
+def test_digest_object_stale():
+ credentials = ('joe', 'password')
+ host = None
+ request_uri = '/digest/stale/'
+ headers = {}
+ response = httplib2.Response({})
+ response['www-authenticate'] = 'Digest realm="myrealm", nonce="bd669f", algorithm=MD5, qop="auth", stale=true'
+ response.status = 401
+ content = b''
+ d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ # Returns true to force a retry
+ assert d.response(response, content)
+
+
+def test_digest_object_auth_info():
+ credentials = ('joe', 'password')
+ host = None
+ request_uri = '/digest/nextnonce/'
+ headers = {}
+ response = httplib2.Response({})
+ response['www-authenticate'] = 'Digest realm="myrealm", nonce="barney", algorithm=MD5, qop="auth", stale=true'
+ response['authentication-info'] = 'nextnonce="fred"'
+ content = b''
+ d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
+ # Returns true to force a retry
+ assert not d.response(response, content)
+ assert d.challenge['nonce'] == 'fred'
+ assert d.challenge['nc'] == 1
+
+
+def test_wsse_algorithm():
+ digest = httplib2._wsse_username_token('d36e316282959a9ed4c89851497a717f', '2003-12-15T14:43:07Z', 'taadtaadpstcsm')
+ expected = b'quR/EWLAV4xLf9Zqyw4pDmfV9OY='
+ assert expected == digest
diff --git a/tests/test_cache.py b/tests/test_cache.py
new file mode 100644
index 0000000..c2a2beb
--- /dev/null
+++ b/tests/test_cache.py
@@ -0,0 +1,390 @@
+import email.utils
+import httplib2
+import pytest
+import re
+import tests
+import time
+
+
+dummy_url = 'http://127.0.0.1:1'
+
+
+def test_get_only_if_cached_cache_hit():
+ # Test that we can do a GET with a cache and 'only-if-cached'
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_const_http(add_etag=True) as uri:
+ http.request(uri, 'GET')
+ response, content = http.request(uri, 'GET', headers={'cache-control': 'only-if-cached'})
+ assert response.fromcache
+ assert response.status == 200
+
+
+def test_get_only_if_cached_cache_miss():
+ # Test that we can do a GET against an empty cache with 'only-if-cached'
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_const_http(request_count=0) as uri:
+ response, content = http.request(uri, 'GET', headers={'cache-control': 'only-if-cached'})
+ assert not response.fromcache
+ assert response.status == 504
+
+
+def test_get_only_if_cached_no_cache_at_all():
+ # Test that we can do a GET with no cache at all and 'only-if-cached'
+ # Of course, there might be an intermediary beyond us
+ # that responds to the 'only-if-cached', so this
+ # test can't really be guaranteed to pass.
+ http = httplib2.Http()
+ with tests.server_const_http(request_count=0) as uri:
+ response, content = http.request(uri, 'GET', headers={'cache-control': 'only-if-cached'})
+ assert not response.fromcache
+ assert response.status == 504
+
+
+@pytest.mark.skip(reason='was commented in legacy code')
+def test_TODO_vary_no():
+ pass
+ # when there is no vary, a different Accept header (e.g.) should not
+ # impact if the cache is used
+ # test that the vary header is not sent
+ # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
+ # response, content = http.request(uri, 'GET', headers={'Accept': 'text/plain'})
+ # assert response.status == 200
+ # assert 'vary' not in response
+ #
+ # response, content = http.request(uri, 'GET', headers={'Accept': 'text/plain'})
+ # assert response.status == 200
+ # assert response.fromcache, "Should be from cache"
+ #
+ # response, content = http.request(uri, 'GET', headers={'Accept': 'text/html'})
+ # assert response.status == 200
+ # assert response.fromcache, "Should be from cache"
+
+
+def test_vary_header_simple():
+ """
+ RFC 2616 13.6
+ When the cache receives a subsequent request whose Request-URI
+ specifies one or more cache entries including a Vary header field,
+ the cache MUST NOT use such a cache entry to construct a response
+ to the new request unless all of the selecting request-headers
+ present in the new request match the corresponding stored
+ request-headers in the original request.
+ """
+ # test that the vary header is sent
+ http = httplib2.Http(cache=tests.get_cache_path())
+ response = tests.http_response_bytes(
+ headers={'vary': 'Accept', 'cache-control': 'max-age=300'},
+ add_date=True,
+ )
+ with tests.server_const_bytes(response, request_count=3) as uri:
+ response, content = http.request(uri, 'GET', headers={'accept': 'text/plain'})
+ assert response.status == 200
+ assert 'vary' in response
+
+ # get the resource again, from the cache since accept header in this
+ # request is the same as the request
+ response, content = http.request(uri, 'GET', headers={'Accept': 'text/plain'})
+ assert response.status == 200
+ assert response.fromcache, "Should be from cache"
+
+ # get the resource again, not from cache since Accept headers does not match
+ response, content = http.request(uri, 'GET', headers={'Accept': 'text/html'})
+ assert response.status == 200
+ assert not response.fromcache, "Should not be from cache"
+
+ # get the resource again, without any Accept header, so again no match
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert not response.fromcache, "Should not be from cache"
+
+
+def test_vary_header_double():
+ http = httplib2.Http(cache=tests.get_cache_path())
+ response = tests.http_response_bytes(
+ headers={'vary': 'Accept, Accept-Language', 'cache-control': 'max-age=300'},
+ add_date=True,
+ )
+ with tests.server_const_bytes(response, request_count=3) as uri:
+ response, content = http.request(uri, 'GET', headers={
+ 'Accept': 'text/plain',
+ 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7',
+ })
+ assert response.status == 200
+ assert 'vary' in response
+
+ # we are from cache
+ response, content = http.request(uri, 'GET', headers={
+ 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
+ assert response.fromcache, "Should be from cache"
+
+ response, content = http.request(uri, 'GET', headers={'Accept': 'text/plain'})
+ assert response.status == 200
+ assert not response.fromcache
+
+ # get the resource again, not from cache, varied headers don't match exact
+ response, content = http.request(uri, 'GET', headers={'Accept-Language': 'da'})
+ assert response.status == 200
+ assert not response.fromcache, "Should not be from cache"
+
+
+def test_vary_unused_header():
+ http = httplib2.Http(cache=tests.get_cache_path())
+ response = tests.http_response_bytes(
+ headers={'vary': 'X-No-Such-Header', 'cache-control': 'max-age=300'},
+ add_date=True,
+ )
+ with tests.server_const_bytes(response, request_count=1) as uri:
+ # A header's value is not considered to vary if it's not used at all.
+ response, content = http.request(uri, 'GET', headers={'Accept': 'text/plain'})
+ assert response.status == 200
+ assert 'vary' in response
+
+ # we are from cache
+ response, content = http.request(uri, 'GET', headers={'Accept': 'text/plain'})
+ assert response.fromcache, "Should be from cache"
+
+
+def test_get_cache_control_no_cache():
+ # Test Cache-Control: no-cache on requests
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_const_http(
+ add_date=True, add_etag=True,
+ headers={'cache-control': 'max-age=300'}, request_count=2) as uri:
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ assert response.status == 200
+ assert response['etag'] != ''
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ assert response.status == 200
+ assert response.fromcache
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity', 'Cache-Control': 'no-cache'})
+ assert response.status == 200
+ assert not response.fromcache
+
+
+def test_get_cache_control_pragma_no_cache():
+ # Test Pragma: no-cache on requests
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_const_http(
+ add_date=True, add_etag=True,
+ headers={'cache-control': 'max-age=300'}, request_count=2) as uri:
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ assert response['etag'] != ''
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ assert response.status == 200
+ assert response.fromcache
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity', 'Pragma': 'no-cache'})
+ assert response.status == 200
+ assert not response.fromcache
+
+
+def test_get_cache_control_no_store_request():
+ # A no-store request means that the response should not be stored.
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_const_http(
+ add_date=True, add_etag=True,
+ headers={'cache-control': 'max-age=300'}, request_count=2) as uri:
+ response, _ = http.request(uri, 'GET', headers={'Cache-Control': 'no-store'})
+ assert response.status == 200
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET', headers={'Cache-Control': 'no-store'})
+ assert response.status == 200
+ assert not response.fromcache
+
+
+def test_get_cache_control_no_store_response():
+ # A no-store response means that the response should not be stored.
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_const_http(
+ add_date=True, add_etag=True,
+ headers={'cache-control': 'max-age=300, no-store'}, request_count=2) as uri:
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ assert not response.fromcache
+
+
+def test_get_cache_control_no_cache_no_store_request():
+ # Test that a no-store, no-cache clears the entry from the cache
+ # even if it was cached previously.
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_const_http(
+ add_date=True, add_etag=True,
+ headers={'cache-control': 'max-age=300'}, request_count=3) as uri:
+ response, _ = http.request(uri, 'GET')
+ response, _ = http.request(uri, 'GET')
+ assert response.fromcache
+ response, _ = http.request(uri, 'GET', headers={'Cache-Control': 'no-store, no-cache'})
+ assert response.status == 200
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET', headers={'Cache-Control': 'no-store, no-cache'})
+ assert response.status == 200
+ assert not response.fromcache
+
+
+def test_update_invalidates_cache():
+ # Test that calling PUT or DELETE on a
+ # URI that is cache invalidates that cache.
+ http = httplib2.Http(cache=tests.get_cache_path())
+
+ def handler(request):
+ if request.method in ('PUT', 'PATCH', 'DELETE'):
+ return tests.http_response_bytes(status=405)
+ return tests.http_response_bytes(
+ add_date=True, add_etag=True, headers={'cache-control': 'max-age=300'})
+
+ with tests.server_request(handler, request_count=3) as uri:
+ response, _ = http.request(uri, 'GET')
+ response, _ = http.request(uri, 'GET')
+ assert response.fromcache
+ response, _ = http.request(uri, 'DELETE')
+ assert response.status == 405
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET')
+ assert not response.fromcache
+
+
+def handler_conditional_update(request):
+ respond = tests.http_response_bytes
+ if request.method == 'GET':
+ if request.headers.get('if-none-match', '') == '12345':
+ return respond(status=304)
+ return respond(add_date=True, headers={'etag': '12345', 'cache-control': 'max-age=300'})
+ elif request.method in ('PUT', 'PATCH', 'DELETE'):
+ if request.headers.get('if-match', '') == '12345':
+ return respond(status=200)
+ return respond(status=412)
+ return respond(status=405)
+
+
+@pytest.mark.parametrize('method', ('PUT', 'PATCH'))
+def test_update_uses_cached_etag(method):
+ # Test that we natively support http://www.w3.org/1999/04/Editing/
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_request(handler_conditional_update, request_count=3) as uri:
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ assert response.fromcache
+ response, _ = http.request(uri, method, body=b'foo')
+ assert response.status == 200
+ response, _ = http.request(uri, method, body=b'foo')
+ assert response.status == 412
+
+
+def test_update_uses_cached_etag_and_oc_method():
+ # Test that we natively support http://www.w3.org/1999/04/Editing/
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_request(handler_conditional_update, request_count=2) as uri:
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 200
+ assert response.fromcache
+ http.optimistic_concurrency_methods.append('DELETE')
+ response, _ = http.request(uri, 'DELETE')
+ assert response.status == 200
+
+
+def test_update_uses_cached_etag_overridden():
+ # Test that we natively support http://www.w3.org/1999/04/Editing/
+ http = httplib2.Http(cache=tests.get_cache_path())
+ with tests.server_request(handler_conditional_update, request_count=2) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert not response.fromcache
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert response.fromcache
+ response, content = http.request(uri, 'PUT', body=b'foo', headers={'if-match': 'fred'})
+ assert response.status == 412
+
+
+@pytest.mark.parametrize(
+ 'data', (
+ ({}, {}),
+ ({'cache-control': ' no-cache'},
+ {'no-cache': 1}),
+ ({'cache-control': ' no-store, max-age = 7200'},
+ {'no-store': 1, 'max-age': '7200'}),
+ ({'cache-control': ' , '}, {'': 1}), # FIXME
+ ({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'},
+ {'max-age': '3600;post-check=1800', 'pre-check': '3600'}),
+ ), ids=lambda data: str(data[0]))
+def test_parse_cache_control(data):
+ header, expected = data
+ assert httplib2._parse_cache_control(header) == expected
+
+
+def test_normalize_headers():
+ # Test that we normalize headers to lowercase
+ h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
+ assert 'cache-control' in h
+ assert 'other' in h
+ assert h['other'] == 'Stuff'
+
+
+@pytest.mark.parametrize(
+ 'data', (
+ ({'cache-control': 'no-cache'}, {'cache-control': 'max-age=7200'}, 'TRANSPARENT'),
+ ({}, {'cache-control': 'max-age=fred, min-fresh=barney'}, 'STALE'),
+ ({}, {'date': '{now}', 'expires': '{now+3}'}, 'FRESH'),
+ ({}, {'date': '{now}', 'expires': '{now+3}', 'cache-control': 'no-cache'}, 'STALE'),
+ ({'cache-control': 'must-revalidate'}, {}, 'STALE'),
+ ({}, {'cache-control': 'must-revalidate'}, 'STALE'),
+ ({}, {'date': '{now}', 'cache-control': 'max-age=0'}, 'STALE'),
+ ({'cache-control': 'only-if-cached'}, {}, 'FRESH'),
+ ({}, {'date': '{now}', 'expires': '0'}, 'STALE'),
+ ({}, {'date': '{now+3}'}, 'STALE'),
+ ({'cache-control': 'max-age=0'}, {'date': '{now}', 'cache-control': 'max-age=2'}, 'STALE'),
+ ({'cache-control': 'min-fresh=2'}, {'date': '{now}', 'expires': '{now+2}'}, 'STALE'),
+ ({'cache-control': 'min-fresh=2'}, {'date': '{now}', 'expires': '{now+4}'}, 'FRESH'),
+ ), ids=lambda data: str(data))
+def test_entry_disposition(data):
+ now = time.time()
+ nowre = re.compile(r'{now([\+\-]\d+)?}')
+
+ def render(s):
+ m = nowre.match(s)
+ if m:
+ offset = int(m.expand(r'\1')) if m.group(1) else 0
+ s = email.utils.formatdate(now + offset, usegmt=True)
+ return s
+
+ request, response, expected = data
+ request = {k: render(v) for k, v in request.items()}
+ response = {k: render(v) for k, v in response.items()}
+ assert httplib2._entry_disposition(response, request) == expected
+
+
+def test_expiration_model_fresh():
+ response_headers = {
+ 'date': email.utils.formatdate(usegmt=True),
+ 'cache-control': 'max-age=2'
+ }
+ assert httplib2._entry_disposition(response_headers, {}) == 'FRESH'
+ # TODO: add current time as _entry_disposition argument to avoid sleep in tests
+ time.sleep(3)
+ assert httplib2._entry_disposition(response_headers, {}) == 'STALE'
+
+
+def test_expiration_model_date_and_expires():
+ now = time.time()
+ response_headers = {
+ 'date': email.utils.formatdate(now, usegmt=True),
+ 'expires': email.utils.formatdate(now + 2, usegmt=True),
+ }
+ assert httplib2._entry_disposition(response_headers, {}) == 'FRESH'
+ time.sleep(3)
+ assert httplib2._entry_disposition(response_headers, {}) == 'STALE'
+
+
+# TODO: Repeat all cache tests with memcache. pytest.mark.parametrize
+# cache = memcache.Client(['127.0.0.1:11211'], debug=0)
+# #cache = memcache.Client(['10.0.0.4:11211'], debug=1)
+# http = httplib2.Http(cache)
diff --git a/tests/test_encoding.py b/tests/test_encoding.py
new file mode 100644
index 0000000..df991a1
--- /dev/null
+++ b/tests/test_encoding.py
@@ -0,0 +1,99 @@
+import httplib2
+import tests
+
+
+def test_gzip_head():
+ # Test that we don't try to decompress a HEAD response
+ http = httplib2.Http()
+ response = tests.http_response_bytes(
+ headers={'content-encoding': 'gzip', 'content-length': 42},
+ )
+ with tests.server_const_bytes(response) as uri:
+ response, content = http.request(uri, 'HEAD')
+ assert response.status == 200
+ assert int(response['content-length']) != 0
+ assert content == b''
+
+
+def test_gzip_get():
+ # Test that we support gzip compression
+ http = httplib2.Http()
+ response = tests.http_response_bytes(
+ headers={'content-encoding': 'gzip'},
+ body=tests.gzip_compress(b'properly compressed'),
+ )
+ with tests.server_const_bytes(response) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert 'content-encoding' not in response
+ assert '-content-encoding' in response
+ assert int(response['content-length']) == len(b'properly compressed')
+ assert content == b'properly compressed'
+
+
+def test_gzip_post_response():
+ http = httplib2.Http()
+ response = tests.http_response_bytes(
+ headers={'content-encoding': 'gzip'},
+ body=tests.gzip_compress(b'properly compressed'),
+ )
+ with tests.server_const_bytes(response) as uri:
+ response, content = http.request(uri, 'POST', body=b'')
+ assert response.status == 200
+ assert 'content-encoding' not in response
+ assert '-content-encoding' in response
+
+
+def test_gzip_malformed_response():
+ http = httplib2.Http()
+ # Test that we raise a good exception when the gzip fails
+ http.force_exception_to_status_code = False
+ response = tests.http_response_bytes(
+ headers={'content-encoding': 'gzip'},
+ body=b'obviously not compressed',
+ )
+ with tests.server_const_bytes(response, request_count=2) as uri:
+ with tests.assert_raises(httplib2.FailedToDecompressContent):
+ http.request(uri, 'GET')
+
+ # Re-run the test without the exceptions
+ http.force_exception_to_status_code = True
+
+ response, content = http.request(uri, 'GET')
+ assert response.status == 500
+ assert response.reason.startswith('Content purported')
+
+
+def test_deflate_get():
+ # Test that we support deflate compression
+ http = httplib2.Http()
+ response = tests.http_response_bytes(
+ headers={'content-encoding': 'deflate'},
+ body=tests.deflate_compress(b'properly compressed'),
+ )
+ with tests.server_const_bytes(response) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert 'content-encoding' not in response
+ assert int(response['content-length']) == len(b'properly compressed')
+ assert content == b'properly compressed'
+
+
+def test_deflate_malformed_response():
+ # Test that we raise a good exception when deflate decompression fails
+ http = httplib2.Http()
+ http.force_exception_to_status_code = False
+ response = tests.http_response_bytes(
+ headers={'content-encoding': 'deflate'},
+ body=b'obviously not compressed',
+ )
+ with tests.server_const_bytes(response, request_count=2) as uri:
+ with tests.assert_raises(httplib2.FailedToDecompressContent):
+ http.request(uri, 'GET')
+
+ # Re-run the test without the exceptions
+ http.force_exception_to_status_code = True
+
+ response, content = http.request(uri, 'GET')
+ assert response.status == 500
+ assert response.reason.startswith('Content purported')
diff --git a/tests/test_external.py b/tests/test_external.py
new file mode 100644
index 0000000..20652c9
--- /dev/null
+++ b/tests/test_external.py
@@ -0,0 +1,95 @@
+'''These tests rely on responses from public internet services
+
+TODO: reimplement with local stubs
+'''
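+# A local-stub rewrite could reuse the helpers already used by test_http.py,
+# roughly (sketch):
+#
+# def test_get_301_local():
+#     http = httplib2.Http()
+#     routes = {
+#         '/final': tests.http_response_bytes(body=b'done'),
+#         '': tests.http_response_bytes(status='301 Moved Permanently',
+#                                       headers={'location': '/final'}),
+#     }
+#     with tests.server_route(routes, request_count=2) as uri:
+#         response, content = http.request(uri, 'GET')
+#         assert response.previous.status == 301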
+import httplib2
+import os
+import pytest
+import ssl
+import sys
+import tests
+
+
+def test_get_301_via_https():
+ # code.google.com/apis/ responds with a 301 redirect, which we follow to a 200
+ http = httplib2.Http()
+ response, content = http.request('https://code.google.com/apis/', 'GET')
+ assert response.status == 200
+ assert response.previous.status == 301
+
+
+def test_get_via_https():
+ # Test that we can handle HTTPS
+ http = httplib2.Http()
+ response, content = http.request('https://google.com/adsense/', 'GET')
+ assert response.status == 200
+
+
+def test_get_via_https_spec_violation_on_location():
+ # Test that we follow redirects through HTTPS
+ # even if they violate the spec by including
+ # a relative Location: header instead of an
+ # absolute one.
+ http = httplib2.Http()
+ response, content = http.request('https://google.com/adsense', 'GET')
+ assert response.status == 200
+ assert response.previous is not None
+
+
+def test_get_via_https_key_cert():
+ # At this point I can only test
+ # that the key and cert files are passed in
+ # correctly to httplib. It would be nice to have
+ # a real https endpoint to test against.
+ http = httplib2.Http(timeout=2)
+ http.add_certificate('akeyfile', 'acertfile', 'bitworking.org')
+ try:
+ http.request('https://bitworking.org', 'GET')
+ except AttributeError:
+ assert http.connections['https:bitworking.org'].key_file == 'akeyfile'
+ assert http.connections['https:bitworking.org'].cert_file == 'acertfile'
+ except IOError:
+ # Skip on 3.2
+ pass
+
+ try:
+ http.request('https://notthere.bitworking.org', 'GET')
+ except httplib2.ServerNotFoundError:
+ assert http.connections['https:notthere.bitworking.org'].key_file is None
+ assert http.connections['https:notthere.bitworking.org'].cert_file is None
+ except IOError:
+ # Skip on 3.2
+ pass
+
+
+def test_ssl_invalid_ca_certs_path():
+ # Test that we get an IOError when specifying a non-existent CA
+ # certs file.
+ http = httplib2.Http(ca_certs='/nosuchfile')
+ with tests.assert_raises(IOError):
+ http.request('https://www.google.com/', 'GET')
+
+
+@pytest.mark.xfail(
+ sys.version_info <= (3,),
+ reason='FIXME: for unknown reason Python 2.7.10 validates www.google.com against dummy CA www.example.com',
+)
+def test_ssl_wrong_ca():
+ # Test that we get an ssl.SSLError if we try to access
+ # https://www.google.com, using a CA cert file that doesn't contain
+ # the CA Google uses (i.e., simulating a cert that's not signed by a
+ # trusted CA).
+ other_ca_certs = os.path.join(
+ os.path.dirname(os.path.abspath(httplib2.__file__)),
+ 'test', 'other_cacerts.txt')
+ assert os.path.exists(other_ca_certs)
+ http = httplib2.Http(ca_certs=other_ca_certs)
+ http.follow_redirects = False
+ with tests.assert_raises(ssl.SSLError):
+ http.request('https://www.google.com/', 'GET')
+
+
+def test_sni_hostname_validation():
+ # TODO: make explicit test server with SNI validation
+ http = httplib2.Http()
+ http.request('https://google.com/', method='GET')
diff --git a/tests/test_http.py b/tests/test_http.py
new file mode 100644
index 0000000..29d67af
--- /dev/null
+++ b/tests/test_http.py
@@ -0,0 +1,592 @@
+import email.utils
+import httplib2
+import mock
+import os
+import pytest
+import socket
+import tests
+from six.moves import http_client, urllib
+
+
+dummy_url = 'http://127.0.0.1:1'
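+# Port 1 is practically never listening, so requests to dummy_url fail fast
+# with "connection refused" instead of reaching a live server.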
+
+
+def test_connection_type():
+ http = httplib2.Http()
+ http.force_exception_to_status_code = False
+ response, content = http.request(dummy_url, connection_type=tests.MockHTTPConnection)
+ assert response['content-location'] == dummy_url
+ assert content == b'the body'
+
+
+def test_bad_status_line_retry():
+ http = httplib2.Http()
+ old_retries = httplib2.RETRIES
+ httplib2.RETRIES = 1
+ http.force_exception_to_status_code = False
+ try:
+ response, content = http.request(dummy_url, connection_type=tests.MockHTTPBadStatusConnection)
+ except http_client.BadStatusLine:
+ assert tests.MockHTTPBadStatusConnection.num_calls == 2
+ httplib2.RETRIES = old_retries
+
+
+def test_unknown_server():
+ http = httplib2.Http()
+ http.force_exception_to_status_code = False
+ with tests.assert_raises(httplib2.ServerNotFoundError):
+ with mock.patch('socket.socket.connect', side_effect=socket.gaierror):
+ http.request("http://no-such-hostname./")
+
+ # Now test with exceptions turned off
+ http.force_exception_to_status_code = True
+ response, content = http.request("http://no-such-hostname./")
+ assert response['content-type'] == 'text/plain'
+ assert content.startswith(b"Unable to find")
+ assert response.status == 400
+
+
+def test_connection_refused():
+ http = httplib2.Http()
+ http.force_exception_to_status_code = False
+ with tests.assert_raises(socket.error):
+ http.request(dummy_url)
+
+ # Now test with exceptions turned off
+ http.force_exception_to_status_code = True
+ response, content = http.request(dummy_url)
+ assert response['content-type'] == 'text/plain'
+ assert (b"Connection refused" in content or b"actively refused" in content)
+ assert response.status == 400
+
+
+def test_get_iri():
+ http = httplib2.Http()
+ query = u'?a=\N{CYRILLIC CAPITAL LETTER DJE}'
+ with tests.server_reflect() as uri:
+ response, content = http.request(uri + query, 'GET')
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.uri == '/?a=%D0%82'
+
+
+def test_get_is_default_method():
+ # Test that GET is the default method
+ http = httplib2.Http()
+ with tests.server_reflect() as uri:
+ response, content = http.request(uri)
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.method == 'GET'
+
+
+def test_different_methods():
+ # Test that all methods can be used
+ http = httplib2.Http()
+ methods = ['GET', 'PUT', 'DELETE', 'POST', 'unknown']
+ with tests.server_reflect(request_count=len(methods)) as uri:
+ for method in methods:
+ response, content = http.request(uri, method, body=b" ")
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.method == method
+
+
+def test_head_read():
+ # Test that we don't try to read the response of a HEAD request
+ # since httplib blocks response.read() for HEAD requests.
+ http = httplib2.Http()
+ respond_with = b'HTTP/1.0 200 OK\r\ncontent-length: 14\r\n\r\nnon-empty-body'
+ with tests.server_const_bytes(respond_with) as uri:
+ response, content = http.request(uri, 'HEAD')
+ assert response.status == 200
+ assert content == b""
+
+
+def test_get_no_cache():
+ # Test that can do a GET w/o the cache turned on.
+ http = httplib2.Http()
+ with tests.server_const_http() as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert response.previous is None
+
+
+def test_user_agent():
+ # Test that we provide a default user-agent
+ http = httplib2.Http()
+ with tests.server_reflect() as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.headers.get('user-agent', '').startswith('Python-httplib2/')
+
+
+def test_user_agent_non_default():
+ # Test that the default user-agent can be over-ridden
+ http = httplib2.Http()
+ with tests.server_reflect() as uri:
+ response, content = http.request(uri, 'GET', headers={'User-Agent': 'fred/1.0'})
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.headers.get('user-agent') == 'fred/1.0'
+
+
+def test_get_300_with_location():
+ # Test that we automatically follow 300 redirects if a Location: header is provided
+ http = httplib2.Http()
+ final_content = b'This is the final destination.\n'
+ routes = {
+ '/final': tests.http_response_bytes(body=final_content),
+ '': tests.http_response_bytes(status='300 Multiple Choices', headers={'location': '/final'}),
+ }
+ with tests.server_route(routes, request_count=2) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert content == final_content
+ assert response.previous.status == 300
+ assert not response.previous.fromcache
+
+ # Confirm that the intermediate 300 is not cached
+ with tests.server_route(routes, request_count=2) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert content == final_content
+ assert response.previous.status == 300
+ assert not response.previous.fromcache
+
+
+def test_get_300_with_location_noredirect():
+ # Test that we return the 300 response itself when follow_redirects is False
+ http = httplib2.Http()
+ http.follow_redirects = False
+ response = tests.http_response_bytes(
+ status='300 Multiple Choices',
+ headers={'location': '/final'},
+ body=b'redirect body')
+ with tests.server_const_bytes(response) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 300
+
+
+def test_get_300_without_location():
+ # A 300 response without a Location: header is acceptable,
+ # in which case we just return the 300 response as-is
+ http = httplib2.Http()
+ with tests.server_const_http(status='300 Multiple Choices', body=b'redirect body') as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 300
+ assert response.previous is None
+ assert content == b'redirect body'
+
+
+def test_get_301():
+ # Test that we automatically follow 301 redirects
+ # and that we cache the 301 response
+ http = httplib2.Http(cache=tests.get_cache_path())
+ destination = ''
+ routes = {
+ '/final': tests.http_response_bytes(body=b'This is the final destination.\n'),
+ '': tests.http_response_bytes(
+ status='301 Now where did I leave that URL', headers={'location': '/final'}, body=b'redirect body'),
+ }
+ with tests.server_route(routes, request_count=3) as uri:
+ destination = urllib.parse.urljoin(uri, '/final')
+ response1, content1 = http.request(uri, 'GET')
+ response2, content2 = http.request(uri, 'GET')
+ assert response1.status == 200
+ assert 'content-location' in response2
+ assert response1['content-location'] == destination
+ assert content1 == b'This is the final destination.\n'
+ assert response1.previous.status == 301
+ assert not response1.previous.fromcache
+
+ assert response2.status == 200
+ assert response2['content-location'] == destination
+ assert content2 == b'This is the final destination.\n'
+ assert response2.previous.status == 301
+ assert response2.previous.fromcache
+
+
+@pytest.mark.skipif(
+ not os.environ.get('httplib2_test_still_run_skipped') and
+ os.environ.get('TRAVIS_PYTHON_VERSION') in ('2.7', 'pypy'),
+ reason='FIXME: timeout on Travis py27 and pypy, works elsewhere',
+)
+def test_head_301():
+ # Test that we automatically follow 301 redirects
+ http = httplib2.Http()
+ destination = ''
+ routes = {
+ '/final': tests.http_response_bytes(body=b'This is the final destination.\n'),
+ '': tests.http_response_bytes(
+ status='301 Now where did I leave that URL', headers={'location': '/final'}, body=b'redirect body'),
+ }
+ with tests.server_route(routes, request_count=2) as uri:
+ destination = urllib.parse.urljoin(uri, '/final')
+ response, content = http.request(uri, 'HEAD')
+ assert response.status == 200
+ assert response['content-location'] == destination
+ assert response.previous.status == 301
+ assert not response.previous.fromcache
+
+
+@pytest.mark.xfail(reason='FIXME: 301 cache works only with follow_redirects, should work regardless')
+def test_get_301_no_redirect():
+ # Test that we cache the 301 response
+ http = httplib2.Http(cache=tests.get_cache_path(), timeout=0.5)
+ http.follow_redirects = False
+ response = tests.http_response_bytes(
+ status='301 Now where did I leave that URL',
+ headers={'location': '/final', 'cache-control': 'max-age=300'},
+ body=b'redirect body',
+ add_date=True,
+ )
+ with tests.server_const_bytes(response) as uri:
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 301
+ assert not response.fromcache
+ response, _ = http.request(uri, 'GET')
+ assert response.status == 301
+ assert response.fromcache
+
+
+def test_get_302():
+ # Test that we automatically follow 302 redirects
+ # and that we DO NOT cache the 302 response
+ http = httplib2.Http(cache=tests.get_cache_path())
+ second_url, final_url = '', ''
+ routes = {
+ '/final': tests.http_response_bytes(body=b'This is the final destination.\n'),
+ '/second': tests.http_response_bytes(
+ status='302 Found', headers={'location': '/final'}, body=b'second redirect'),
+ '': tests.http_response_bytes(
+ status='302 Found', headers={'location': '/second'}, body=b'redirect body'),
+ }
+ with tests.server_route(routes, request_count=7) as uri:
+ second_url = urllib.parse.urljoin(uri, '/second')
+ final_url = urllib.parse.urljoin(uri, '/final')
+ response1, content1 = http.request(second_url, 'GET')
+ response2, content2 = http.request(second_url, 'GET')
+ response3, content3 = http.request(uri, 'GET')
+ assert response1.status == 200
+ assert response1['content-location'] == final_url
+ assert content1 == b'This is the final destination.\n'
+ assert response1.previous.status == 302
+ assert not response1.previous.fromcache
+
+ assert response2.status == 200
+ # FIXME:
+ # assert response2.fromcache
+ assert response2['content-location'] == final_url
+ assert content2 == b'This is the final destination.\n'
+ assert response2.previous.status == 302
+ assert not response2.previous.fromcache
+ assert response2.previous['content-location'] == second_url
+
+ assert response3.status == 200
+ # FIXME:
+ # assert response3.fromcache
+ assert content3 == b'This is the final destination.\n'
+ assert response3.previous.status == 302
+ assert not response3.previous.fromcache
+
+
+def test_get_302_redirection_limit():
+ # Test that we can set a lower redirection limit
+ # and that we raise an exception when we exceed
+ # that limit.
+ http = httplib2.Http()
+ http.force_exception_to_status_code = False
+ routes = {
+ '/second': tests.http_response_bytes(
+ status='302 Found', headers={'location': '/final'}, body=b'second redirect'),
+ '': tests.http_response_bytes(
+ status='302 Found', headers={'location': '/second'}, body=b'redirect body'),
+ }
+ with tests.server_route(routes, request_count=4) as uri:
+ try:
+ http.request(uri, 'GET', redirections=1)
+ assert False, 'This should not happen'
+ except httplib2.RedirectLimit:
+ pass
+ except Exception:
+ assert False, 'Threw wrong kind of exception'
+
+ # Re-run the test without the exceptions
+ http.force_exception_to_status_code = True
+ response, content = http.request(uri, 'GET', redirections=1)
+
+ assert response.status == 500
+ assert response.reason.startswith('Redirected more')
+ assert response['status'] == '302'
+ assert content == b'second redirect'
+ assert response.previous is not None
+
+
+def test_get_302_no_location():
+ # Test that we throw an exception when we get
+ # a 302 with no Location: header.
+ http = httplib2.Http()
+ http.force_exception_to_status_code = False
+ with tests.server_const_http(status='302 Found', request_count=2) as uri:
+ try:
+ http.request(uri, 'GET')
+ assert False, 'Should never reach here'
+ except httplib2.RedirectMissingLocation:
+ pass
+ except Exception:
+ assert False, 'Threw wrong kind of exception'
+
+ # Re-run the test without the exceptions
+ http.force_exception_to_status_code = True
+ response, content = http.request(uri, 'GET')
+
+ assert response.status == 500
+ assert response.reason.startswith('Redirected but')
+ assert '302' == response['status']
+ assert content == b''
+
+
+@pytest.mark.skipif(
+ not os.environ.get('httplib2_test_still_run_skipped') and
+ os.environ.get('TRAVIS_PYTHON_VERSION') in ('2.7', 'pypy'),
+ reason='FIXME: timeout on Travis py27 and pypy, works elsewhere',
+)
+def test_303():
+ # Do a follow-up GET on a Location: header
+ # returned from a POST that gave a 303.
+ http = httplib2.Http()
+ routes = {
+ '/final': tests.make_http_reflect(),
+ '': tests.make_http_reflect(status='303 See Other', headers={'location': '/final'}),
+ }
+ with tests.server_route(routes, request_count=2) as uri:
+ response, content = http.request(uri, 'POST', " ")
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.uri == '/final'
+ assert response.previous.status == 303
+
+ # Skip follow-up GET
+ http = httplib2.Http()
+ http.follow_redirects = False
+ with tests.server_route(routes, request_count=1) as uri:
+ response, content = http.request(uri, 'POST', " ")
+ assert response.status == 303
+
+ # All methods can be used
+ http = httplib2.Http()
+ cases = 'DELETE GET HEAD POST PUT EVEN_NEW_ONES'.split(' ')
+ with tests.server_route(routes, request_count=len(cases) * 2) as uri:
+ for method in cases:
+ response, content = http.request(uri, method, body=b'q q')
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.method == 'GET'
+
+
+def test_etag_used():
+ # Test that we use ETags properly to validate our cache
+ cache_path = tests.get_cache_path()
+ http = httplib2.Http(cache=cache_path)
+ response_kwargs = dict(
+ add_date=True,
+ add_etag=True,
+ body=b'something',
+ headers={
+ 'cache-control': 'public,max-age=300',
+ },
+ )
+
+ def handler(request):
+ if request.headers.get('range'):
+ return tests.http_response_bytes(status=206, **response_kwargs)
+ return tests.http_response_bytes(**response_kwargs)
+
+ with tests.server_request(handler, request_count=2) as uri:
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ assert response['etag'] == '"437b930db84b8079c2dd804a71936b5f"'
+
+ http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ response, _ = http.request(
+ uri, 'GET',
+ headers={'accept-encoding': 'identity', 'cache-control': 'must-revalidate'},
+ )
+ assert response.status == 200
+ assert response.fromcache
+
+ # TODO: API to read cache item, at least internal to tests
+ cache_file_name = os.path.join(cache_path, httplib2.safename(httplib2.urlnorm(uri)[-1]))
+ with open(cache_file_name, 'r') as f:
+ status_line = f.readline()
+ assert status_line.startswith("status:")
+
+ response, content = http.request(uri, 'HEAD', headers={'accept-encoding': 'identity'})
+ assert response.status == 200
+ assert response.fromcache
+
+ response, content = http.request(uri, 'GET', headers={'accept-encoding': 'identity', 'range': 'bytes=0-0'})
+ assert response.status == 206
+ assert not response.fromcache
+
+
+def test_etag_ignore():
+ # Test that we can forcibly ignore ETags
+ http = httplib2.Http(cache=tests.get_cache_path())
+ response_kwargs = dict(
+ add_date=True,
+ add_etag=True,
+ )
+ with tests.server_reflect(request_count=3, **response_kwargs) as uri:
+ response, content = http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ assert response.status == 200
+ assert response['etag'] != ""
+
+ response, content = http.request(
+ uri, 'GET',
+ headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0'},
+ )
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.headers.get('if-none-match')
+
+ http.ignore_etag = True
+ response, content = http.request(
+ uri, 'GET',
+ headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0'},
+ )
+ assert not response.fromcache
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert not reflected.headers.get('if-none-match')
+
+
+def test_etag_override():
+ # Test that a caller-supplied If-None-Match header overrides the cached ETag
+ http = httplib2.Http(cache=tests.get_cache_path())
+ response_kwargs = dict(
+ add_date=True,
+ add_etag=True,
+ )
+ with tests.server_reflect(request_count=3, **response_kwargs) as uri:
+ response, _ = http.request(uri, 'GET', headers={'accept-encoding': 'identity'})
+ assert response.status == 200
+ assert response['etag'] != ''
+
+ response, content = http.request(
+ uri, 'GET',
+ headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0'},
+ )
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.headers.get('if-none-match')
+ assert reflected.headers.get('if-none-match') != 'fred'
+
+ response, content = http.request(
+ uri, 'GET',
+ headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0', 'if-none-match': 'fred'},
+ )
+ assert response.status == 200
+ reflected = tests.HttpRequest.from_bytes(content)
+ assert reflected.headers.get('if-none-match') == 'fred'
+
+
+@pytest.mark.skip(reason='was commented in legacy code')
+def test_get_304_end_to_end():
+ pass
+ # Test that end to end headers get overwritten in the cache
+ # uri = urllib.parse.urljoin(base, "304/end2end.cgi")
+ # response, content = http.request(uri, 'GET')
+ # assertNotEqual(response['etag'], "")
+ # old_date = response['date']
+ # time.sleep(2)
+
+ # response, content = http.request(uri, 'GET', headers = {'Cache-Control': 'max-age=0'})
+ # # The response should be from the cache, but the Date: header should be updated.
+ # new_date = response['date']
+ # assert new_date != old_date
+ # assert response.status == 200
+ # assert response.fromcache == True
+
+
+def test_get_304_last_modified():
+ # Test that we can still handle a 304
+ # by only using the last-modified cache validator.
+ http = httplib2.Http(cache=tests.get_cache_path())
+ date = email.utils.formatdate()
+
+ def handler(read):
+ read()
+ yield tests.http_response_bytes(
+ status=200,
+ body=b'something',
+ headers={
+ 'date': date,
+ 'last-modified': date,
+ },
+ )
+
+ request2 = read()
+ assert request2.headers['if-modified-since'] == date
+ yield tests.http_response_bytes(status=304)
+
+ with tests.server_yield(handler, request_count=2) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.get('last-modified') == date
+
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert response.fromcache
+
+
+def test_get_307():
+ # Test that we do follow 307 redirects but
+ # do not cache the 307
+ http = httplib2.Http(cache=tests.get_cache_path(), timeout=1)
+ r307 = tests.http_response_bytes(
+ status=307,
+ headers={'location': '/final'},
+ )
+ r200 = tests.http_response_bytes(
+ status=200,
+ add_date=True,
+ body=b'final content\n',
+ headers={'cache-control': 'max-age=300'},
+ )
+
+ with tests.server_list_http([r307, r200, r307]) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.previous.status == 307
+ assert not response.previous.fromcache
+ assert response.status == 200
+ assert not response.fromcache
+ assert content == b'final content\n'
+
+ response, content = http.request(uri, 'GET')
+ assert response.previous.status == 307
+ assert not response.previous.fromcache
+ assert response.status == 200
+ assert response.fromcache
+ assert content == b'final content\n'
+
+
+def test_get_410():
+ # Test that we pass 410's through
+ http = httplib2.Http()
+ with tests.server_const_http(status=410) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 410
+
+
+def test_get_duplicate_headers():
+ # Test that duplicate headers get concatenated via ','
+ http = httplib2.Http()
+ response = (
+ b'HTTP/1.0 200 OK\r\n'
+ b'Link: link1\r\n'
+ b'Content-Length: 7\r\n'
+ b'Link: link2\r\n'
+ b'\r\ncontent'
+ )
+ with tests.server_const_bytes(response) as uri:
+ response, content = http.request(uri, 'GET')
+ assert response.status == 200
+ assert content == b"content"
+ assert response['link'] == 'link1, link2'
diff --git a/tests/test_other.py b/tests/test_other.py
new file mode 100644
index 0000000..63788c6
--- /dev/null
+++ b/tests/test_other.py
@@ -0,0 +1,167 @@
+import httplib2
+import os
+import pickle
+import pytest
+import socket
+import sys
+import tests
+import time
+from six.moves import urllib
+
+
+@pytest.mark.skipif(
+ sys.version_info <= (3,),
+ reason='TODO: httplib2._convert_byte_str is defined only in the python3 version of the code',
+)
+def test_convert_byte_str():
+ with tests.assert_raises(TypeError):
+ httplib2._convert_byte_str(4)
+ assert httplib2._convert_byte_str(b'Hello') == 'Hello'
+ assert httplib2._convert_byte_str('World') == 'World'
+
+
+def test_reflect():
+ http = httplib2.Http()
+ with tests.server_reflect() as uri:
+ response, content = http.request(uri + '?query', 'METHOD')
+ assert response.status == 200
+ host = urllib.parse.urlparse(uri).netloc
+ assert content.startswith('''\
+METHOD /?query HTTP/1.1\r\n\
+Host: {host}\r\n'''.format(host=host).encode()), content
+
+
+def test_pickle_http():
+ http = httplib2.Http(cache=tests.get_cache_path())
+ new_http = pickle.loads(pickle.dumps(http))
+
+ assert tuple(sorted(new_http.__dict__)) == tuple(sorted(http.__dict__))
+ assert new_http.credentials.credentials == http.credentials.credentials
+ assert new_http.certificates.credentials == http.certificates.credentials
+ assert new_http.cache.cache == http.cache.cache
+ for key in new_http.__dict__:
+ if key not in ('cache', 'certificates', 'credentials'):
+ assert getattr(new_http, key) == getattr(http, key)
+
+
+def test_pickle_http_with_connection():
+ http = httplib2.Http()
+ http.request('http://random-domain:81/', connection_type=tests.MockHTTPConnection)
+ new_http = pickle.loads(pickle.dumps(http))
+ assert tuple(http.connections) == ('http:random-domain:81',)
+ assert new_http.connections == {}
+
+
+def test_pickle_custom_request_http():
+ http = httplib2.Http()
+ http.request = lambda: None
+ http.request.dummy_attr = 'dummy_value'
+ new_http = pickle.loads(pickle.dumps(http))
+ assert getattr(new_http.request, 'dummy_attr', None) is None
+
+
+@pytest.mark.xfail(
+ sys.version_info >= (3,),
+ reason='FIXME: for unknown reason global timeout test fails in Python3 with response 200',
+)
+def test_timeout_global():
+ def handler(request):
+ time.sleep(0.5)
+ return tests.http_response_bytes()
+
+ try:
+ socket.setdefaulttimeout(0.1)
+ except Exception:
+ pytest.skip('cannot set global socket timeout')
+ try:
+ http = httplib2.Http()
+ http.force_exception_to_status_code = True
+ with tests.server_request(handler) as uri:
+ response, content = http.request(uri)
+ assert response.status == 408
+ assert response.reason.startswith("Request Timeout")
+ finally:
+ socket.setdefaulttimeout(None)
+
+
+def test_timeout_individual():
+ def handler(request):
+ time.sleep(0.5)
+ return tests.http_response_bytes()
+
+ http = httplib2.Http(timeout=0.1)
+ http.force_exception_to_status_code = True
+
+ with tests.server_request(handler) as uri:
+ response, content = http.request(uri)
+ assert response.status == 408
+ assert response.reason.startswith("Request Timeout")
+
+
+def test_timeout_https():
+ c = httplib2.HTTPSConnectionWithTimeout('localhost', 80, timeout=47)
+ assert 47 == c.timeout
+
+
+# @pytest.mark.xfail(
+# sys.version_info >= (3,),
+# reason='[py3] last request should open new connection, but client does not realize socket was closed by server',
+# )
+def test_connection_close():
+ http = httplib2.Http()
+ g = []
+
+ def handler(request):
+ g.append(request.number)
+ return tests.http_response_bytes(proto='HTTP/1.1')
+
+ with tests.server_request(handler, request_count=3) as uri:
+ http.request(uri, 'GET') # conn1 req1
+ for c in http.connections.values():
+ assert c.sock is not None
+ http.request(uri, 'GET', headers={'connection': 'close'})
+ time.sleep(0.7)
+ http.request(uri, 'GET') # conn2 req1
+ assert g == [1, 2, 1]
+
+
+def test_get_end2end_headers():
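+ # Hop-by-hop headers (RFC 2616 section 13.5.1), plus any header named in a
+ # Connection: header, must not be reused beyond a single hop; everything
+ # else is end-to-end and safe to cache.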
+ # one end to end header
+ response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
+ end2end = httplib2._get_end2end_headers(response)
+ assert 'content-type' in end2end
+ assert 'te' not in end2end
+ assert 'connection' not in end2end
+
+ # one end to end header that gets eliminated
+ response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
+ end2end = httplib2._get_end2end_headers(response)
+ assert 'content-type' not in end2end
+ assert 'te' not in end2end
+ assert 'connection' not in end2end
+
+ # Degenerate case of no headers
+ response = {}
+ end2end = httplib2._get_end2end_headers(response)
+ assert len(end2end) == 0
+
+ # Degenerate case of connection referring to a header not passed in
+ response = {'connection': 'content-type'}
+ end2end = httplib2._get_end2end_headers(response)
+ assert len(end2end) == 0
+
+
+@pytest.mark.xfail(
+ os.environ.get('TRAVIS_PYTHON_VERSION') in ('2.7', 'pypy'),
+ reason='FIXME: fail on Travis py27 and pypy, works elsewhere',
+)
+@pytest.mark.parametrize('scheme', ('http', 'https'))
+def test_ipv6(scheme):
+ # Even if IPv6 isn't installed on a machine it should just raise socket.error
+ uri = '{scheme}://[::1]:1/'.format(scheme=scheme)
+ try:
+ httplib2.Http(timeout=0.1).request(uri)
+ except socket.gaierror:
+ assert False, 'should get the address family right for IPv6'
+ except socket.error:
+ pass
diff --git a/tests/test_proxy.py b/tests/test_proxy.py
new file mode 100644
index 0000000..9d75f78
--- /dev/null
+++ b/tests/test_proxy.py
@@ -0,0 +1,72 @@
+'''Warning: these tests modify os.environ global state.
+Each test must be run in separate process.
+Must use pytest --forked or similar technique.
+'''
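+# For example, with the pytest-forked plugin installed (one way to get
+# per-test process isolation):
+#
+#   pip install pytest-forked
+#   pytest --forked tests/test_proxy.py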
+import httplib2
+import os
+import pytest
+import sys
+# import tests
+
+
+def test_from_url():
+ pi = httplib2.proxy_info_from_url('http://myproxy.example.com')
+ assert pi.proxy_host == 'myproxy.example.com'
+ assert pi.proxy_port == 80
+ assert pi.proxy_user is None
+
+
+def test_from_url_ident():
+ pi = httplib2.proxy_info_from_url('http://zoidberg:fish@someproxy:99')
+ assert pi.proxy_host == 'someproxy'
+ assert pi.proxy_port == 99
+ assert pi.proxy_user == 'zoidberg'
+ assert pi.proxy_pass == 'fish'
+
+
+def test_from_env():
+ os.environ['http_proxy'] = 'http://myproxy.example.com:8080'
+ pi = httplib2.proxy_info_from_environment()
+ assert pi.proxy_host == 'myproxy.example.com'
+ assert pi.proxy_port == 8080
+
+
+def test_from_env_https():
+ os.environ['http_proxy'] = 'http://myproxy.example.com:80'
+ os.environ['https_proxy'] = 'http://myproxy.example.com:81'
+ pi = httplib2.proxy_info_from_environment('https')
+ assert pi.proxy_host == 'myproxy.example.com'
+ assert pi.proxy_port == 81
+
+
+def test_from_env_none():
+ os.environ.clear()
+ pi = httplib2.proxy_info_from_environment()
+ assert pi is None
+
+
+@pytest.mark.skipif(sys.version_info >= (3,), reason='FIXME: https://github.com/httplib2/httplib2/issues/53')
+def test_applies_to():
+ os.environ['http_proxy'] = 'http://myproxy.example.com:80'
+ os.environ['https_proxy'] = 'http://myproxy.example.com:81'
+ os.environ['no_proxy'] = 'localhost,otherhost.domain.local,example.com'
+ pi = httplib2.proxy_info_from_environment()
+ assert not pi.applies_to('localhost')
+ assert pi.applies_to('www.google.com')
+ assert not pi.applies_to('www.example.com')
+
+
+@pytest.mark.skipif(sys.version_info >= (3,), reason='FIXME: https://github.com/httplib2/httplib2/issues/53')
+def test_noproxy_star():
+ os.environ['http_proxy'] = 'http://myproxy.example.com:80'
+ os.environ['NO_PROXY'] = '*'
+ pi = httplib2.proxy_info_from_environment()
+ for host in ('localhost', '169.254.38.192', 'www.google.com'):
+ assert not pi.applies_to(host)
+
+
+@pytest.mark.skipif(sys.version_info >= (3,), reason='FIXME: https://github.com/httplib2/httplib2/issues/53')
+def test_headers():
+ headers = {'key0': 'val0', 'key1': 'val1'}
+ pi = httplib2.ProxyInfo(httplib2.socks.PROXY_TYPE_HTTP, 'localhost', 1234, proxy_headers=headers)
+ assert pi.proxy_headers == headers
diff --git a/tests/test_uri.py b/tests/test_uri.py
new file mode 100644
index 0000000..3ed3b74
--- /dev/null
+++ b/tests/test_uri.py
@@ -0,0 +1,78 @@
+import httplib2
+
+
+def test_from_std66():
+ cases = (
+ ('http://example.com',
+ ('http', 'example.com', '', None, None)),
+ ('https://example.com',
+ ('https', 'example.com', '', None, None)),
+ ('https://example.com:8080',
+ ('https', 'example.com:8080', '', None, None)),
+ ('http://example.com/',
+ ('http', 'example.com', '/', None, None)),
+ ('http://example.com/path',
+ ('http', 'example.com', '/path', None, None)),
+ ('http://example.com/path?a=1&b=2',
+ ('http', 'example.com', '/path', 'a=1&b=2', None)),
+ ('http://example.com/path?a=1&b=2#fred',
+ ('http', 'example.com', '/path', 'a=1&b=2', 'fred')),
+ )
+ for a, b in cases:
+ assert httplib2.parse_uri(a) == b
+
+
+def test_norm():
+ cases = (
+ ('http://example.org',
+ 'http://example.org/'),
+ ('http://EXAMple.org',
+ 'http://example.org/'),
+ ('http://EXAMple.org?=b',
+ 'http://example.org/?=b'),
+ ('http://EXAMple.org/mypath?a=b',
+ 'http://example.org/mypath?a=b'),
+ ('http://localhost:80',
+ 'http://localhost:80/'),
+ )
+ for a, b in cases:
+ assert httplib2.urlnorm(a)[-1] == b
+
+ assert httplib2.urlnorm('http://localhost:80/') == httplib2.urlnorm('HTTP://LOCALHOST:80')
+
+ try:
+ httplib2.urlnorm('/')
+ assert False, 'Non-absolute URIs should raise an exception'
+ except httplib2.RelativeURIError:
+ pass
+
+
+def test_safename():
+ cases = (
+ ('http://example.org/fred/?a=b',
+ 'example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f'),
+ ('http://example.org/fred?/a=b',
+ 'example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b'),
+ ('http://www.example.org/fred?/a=b',
+ 'www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968'),
+ ('https://www.example.org/fred?/a=b',
+ 'www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d'),
+ (httplib2.urlnorm('http://WWW')[-1],
+ httplib2.safename(httplib2.urlnorm('http://www')[-1])),
+ (u'http://\u2304.org/fred/?a=b',
+ 'xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193'),
+ )
+ for a, b in cases:
+ assert httplib2.safename(a) == b
+
+ assert httplib2.safename('http://www') != httplib2.safename('https://www')
+
+ # Test the max length limits
+ uri = 'http://' + ('w' * 200) + '.org'
+ uri2 = 'http://' + ('w' * 201) + '.org'
+ assert httplib2.safename(uri) != httplib2.safename(uri2)
+ # Max length should be 200 + 1 (',') + 32
+ assert len(httplib2.safename(uri2)) == 233
+ assert len(httplib2.safename(uri)) == 233