Make testing style more consistent (#168)
Several overall style changes:
1. Avoid plain `Mock()` and `Mock(spec=thing)`., prefer `mock.create_autospec()`.
1. Don't `mock.patch` without `autospec`.
1. Don't give mock instances special names. Prefer `thing` over `thing_mock` and `mock_thing`.
1. When using `mock.patch`, use the same name as the item being patched to refer to the mock.
```python
with mock.patch('module.thing') as thing:
...
```
and
```
@mock.patch('module.thing')
def test(thing):
...
```
1. Test helper factories should follow the naming convention `make_thing()`.
1. Use `ThingStub` when creating semi-functioning subclasses for testing purposes.
diff --git a/tests/compute_engine/test__metadata.py b/tests/compute_engine/test__metadata.py
index 0adf882..bf48882 100644
--- a/tests/compute_engine/test__metadata.py
+++ b/tests/compute_engine/test__metadata.py
@@ -24,103 +24,101 @@
from google.auth import _helpers
from google.auth import environment_vars
from google.auth import exceptions
+from google.auth import transport
from google.auth.compute_engine import _metadata
PATH = 'instance/service-accounts/default'
-@pytest.fixture
-def mock_request():
- request_mock = mock.Mock()
+def make_request(data, status=http_client.OK, headers=None):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
+ response.data = _helpers.to_bytes(data)
+ response.headers = headers or {}
- def set_response(data, status=http_client.OK, headers=None):
- response = mock.Mock()
- response.status = status
- response.data = _helpers.to_bytes(data)
- response.headers = headers or {}
- request_mock.return_value = response
- return request_mock
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
- yield set_response
+ return request
-def test_ping_success(mock_request):
- request_mock = mock_request('', headers=_metadata._METADATA_HEADERS)
+def test_ping_success():
+ request = make_request('', headers=_metadata._METADATA_HEADERS)
- assert _metadata.ping(request_mock)
+ assert _metadata.ping(request)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_IP_ROOT,
headers=_metadata._METADATA_HEADERS,
timeout=_metadata._METADATA_DEFAULT_TIMEOUT)
-def test_ping_failure_bad_flavor(mock_request):
- request_mock = mock_request(
+def test_ping_failure_bad_flavor():
+ request = make_request(
'', headers={_metadata._METADATA_FLAVOR_HEADER: 'meep'})
- assert not _metadata.ping(request_mock)
+ assert not _metadata.ping(request)
-def test_ping_failure_connection_failed(mock_request):
- request_mock = mock_request('')
- request_mock.side_effect = exceptions.TransportError()
+def test_ping_failure_connection_failed():
+ request = make_request('')
+ request.side_effect = exceptions.TransportError()
- assert not _metadata.ping(request_mock)
+ assert not _metadata.ping(request)
-def test_ping_success_custom_root(mock_request):
- request_mock = mock_request('', headers=_metadata._METADATA_HEADERS)
+def test_ping_success_custom_root():
+ request = make_request('', headers=_metadata._METADATA_HEADERS)
fake_ip = '1.2.3.4'
os.environ[environment_vars.GCE_METADATA_IP] = fake_ip
reload_module(_metadata)
try:
- assert _metadata.ping(request_mock)
+ assert _metadata.ping(request)
finally:
del os.environ[environment_vars.GCE_METADATA_IP]
reload_module(_metadata)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url='http://' + fake_ip,
headers=_metadata._METADATA_HEADERS,
timeout=_metadata._METADATA_DEFAULT_TIMEOUT)
-def test_get_success_json(mock_request):
+def test_get_success_json():
key, value = 'foo', 'bar'
data = json.dumps({key: value})
- request_mock = mock_request(
+ request = make_request(
data, headers={'content-type': 'application/json'})
- result = _metadata.get(request_mock, PATH)
+ result = _metadata.get(request, PATH)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_ROOT + PATH,
headers=_metadata._METADATA_HEADERS)
assert result[key] == value
-def test_get_success_text(mock_request):
+def test_get_success_text():
data = 'foobar'
- request_mock = mock_request(data, headers={'content-type': 'text/plain'})
+ request = make_request(data, headers={'content-type': 'text/plain'})
- result = _metadata.get(request_mock, PATH)
+ result = _metadata.get(request, PATH)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_ROOT + PATH,
headers=_metadata._METADATA_HEADERS)
assert result == data
-def test_get_success_custom_root(mock_request):
- request_mock = mock_request(
+def test_get_success_custom_root():
+ request = make_request(
'{}', headers={'content-type': 'application/json'})
fake_root = 'another.metadata.service'
@@ -128,55 +126,55 @@
reload_module(_metadata)
try:
- _metadata.get(request_mock, PATH)
+ _metadata.get(request, PATH)
finally:
del os.environ[environment_vars.GCE_METADATA_ROOT]
reload_module(_metadata)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url='http://{}/computeMetadata/v1/{}'.format(fake_root, PATH),
headers=_metadata._METADATA_HEADERS)
-def test_get_failure(mock_request):
- request_mock = mock_request(
+def test_get_failure():
+ request = make_request(
'Metadata error', status=http_client.NOT_FOUND)
with pytest.raises(exceptions.TransportError) as excinfo:
- _metadata.get(request_mock, PATH)
+ _metadata.get(request, PATH)
assert excinfo.match(r'Metadata error')
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_ROOT + PATH,
headers=_metadata._METADATA_HEADERS)
-def test_get_failure_bad_json(mock_request):
- request_mock = mock_request(
+def test_get_failure_bad_json():
+ request = make_request(
'{', headers={'content-type': 'application/json'})
with pytest.raises(exceptions.TransportError) as excinfo:
- _metadata.get(request_mock, PATH)
+ _metadata.get(request, PATH)
assert excinfo.match(r'invalid JSON')
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_ROOT + PATH,
headers=_metadata._METADATA_HEADERS)
-def test_get_project_id(mock_request):
+def test_get_project_id():
project = 'example-project'
- request_mock = mock_request(
+ request = make_request(
project, headers={'content-type': 'text/plain'})
- project_id = _metadata.get_project_id(request_mock)
+ project_id = _metadata.get_project_id(request)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_ROOT + 'project/project-id',
headers=_metadata._METADATA_HEADERS)
@@ -184,15 +182,15 @@
@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
-def test_get_service_account_token(utcnow, mock_request):
+def test_get_service_account_token(utcnow):
ttl = 500
- request_mock = mock_request(
+ request = make_request(
json.dumps({'access_token': 'token', 'expires_in': ttl}),
headers={'content-type': 'application/json'})
- token, expiry = _metadata.get_service_account_token(request_mock)
+ token, expiry = _metadata.get_service_account_token(request)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_ROOT + PATH + '/token',
headers=_metadata._METADATA_HEADERS)
@@ -200,15 +198,15 @@
assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
-def test_get_service_account_info(mock_request):
+def test_get_service_account_info():
key, value = 'foo', 'bar'
- request_mock = mock_request(
+ request = make_request(
json.dumps({key: value}),
headers={'content-type': 'application/json'})
- info = _metadata.get_service_account_info(request_mock)
+ info = _metadata.get_service_account_info(request)
- request_mock.assert_called_once_with(
+ request.assert_called_once_with(
method='GET',
url=_metadata._METADATA_ROOT + PATH + '/?recursive=true',
headers=_metadata._METADATA_HEADERS)
diff --git a/tests/compute_engine/test_credentials.py b/tests/compute_engine/test_credentials.py
index 2564da9..732cb41 100644
--- a/tests/compute_engine/test_credentials.py
+++ b/tests/compute_engine/test_credentials.py
@@ -19,6 +19,7 @@
from google.auth import _helpers
from google.auth import exceptions
+from google.auth import transport
from google.auth.compute_engine import credentials
@@ -41,9 +42,9 @@
@mock.patch(
'google.auth._helpers.utcnow',
return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
- @mock.patch('google.auth.compute_engine._metadata.get')
- def test_refresh_success(self, get_mock, now_mock):
- get_mock.side_effect = [{
+ @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
+ def test_refresh_success(self, get, utcnow):
+ get.side_effect = [{
# First request is for sevice account info.
'email': 'service-account@example.com',
'scopes': ['one', 'two']
@@ -59,7 +60,7 @@
# Check that the credentials have the token and proper expiration
assert self.credentials.token == 'token'
assert self.credentials.expiry == (
- now_mock() + datetime.timedelta(seconds=500))
+ utcnow() + datetime.timedelta(seconds=500))
# Check the credential info
assert (self.credentials.service_account_email ==
@@ -71,8 +72,8 @@
assert self.credentials.valid
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- def test_refresh_error(self, get_mock):
- get_mock.side_effect = exceptions.TransportError('http error')
+ def test_refresh_error(self, get):
+ get.side_effect = exceptions.TransportError('http error')
with pytest.raises(exceptions.RefreshError) as excinfo:
self.credentials.refresh(None)
@@ -80,8 +81,8 @@
assert excinfo.match(r'http error')
@mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- def test_before_request_refreshes(self, get_mock):
- get_mock.side_effect = [{
+ def test_before_request_refreshes(self, get):
+ get.side_effect = [{
# First request is for sevice account info.
'email': 'service-account@example.com',
'scopes': 'one two'
@@ -95,11 +96,12 @@
assert not self.credentials.valid
# before_request should cause a refresh
+ request = mock.create_autospec(transport.Request, instance=True)
self.credentials.before_request(
- mock.Mock(), 'GET', 'http://example.com?a=1#3', {})
+ request, 'GET', 'http://example.com?a=1#3', {})
# The refresh endpoint should've been called.
- assert get_mock.called
+ assert get.called
# Credentials should now be valid.
assert self.credentials.valid
diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py
index 8c19c3e..6aeb3d1 100644
--- a/tests/oauth2/test__client.py
+++ b/tests/oauth2/test__client.py
@@ -22,6 +22,7 @@
from six.moves import urllib
from google.auth import exceptions
+from google.auth import transport
from google.oauth2 import _client
@@ -46,7 +47,7 @@
@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
-def test__parse_expiry(now_mock):
+def test__parse_expiry(unused_utcnow):
result = _client._parse_expiry({'expires_in': 500})
assert result == datetime.datetime.min + datetime.timedelta(seconds=500)
@@ -55,15 +56,17 @@
assert _client._parse_expiry({}) is None
-def _make_request(response_data):
- response = mock.Mock()
- response.status = http_client.OK
+def make_request(response_data, status=http_client.OK):
+ response = mock.create_autospec(transport.Response, instance=True)
+ response.status = status
response.data = json.dumps(response_data).encode('utf-8')
- return mock.Mock(return_value=response)
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+ return request
def test__token_endpoint_request():
- request = _make_request({'test': 'response'})
+ request = make_request({'test': 'response'})
result = _client._token_endpoint_request(
request, 'http://example.com', {'test': 'params'})
@@ -80,16 +83,13 @@
def test__token_endpoint_request_error():
- response = mock.Mock()
- response.status = http_client.BAD_REQUEST
- response.data = b'Error'
- request = mock.Mock(return_value=response)
+ request = make_request({}, status=http_client.BAD_REQUEST)
with pytest.raises(exceptions.RefreshError):
_client._token_endpoint_request(request, 'http://example.com', {})
-def _verify_request_params(request, params):
+def verify_request_params(request, params):
request_body = request.call_args[1]['body']
request_params = urllib.parse.parse_qs(request_body)
@@ -98,8 +98,8 @@
@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
-def test_jwt_grant(now_mock):
- request = _make_request({
+def test_jwt_grant(utcnow):
+ request = make_request({
'access_token': 'token',
'expires_in': 500,
'extra': 'data'})
@@ -108,19 +108,19 @@
request, 'http://example.com', 'assertion_value')
# Check request call
- _verify_request_params(request, {
+ verify_request_params(request, {
'grant_type': _client._JWT_GRANT_TYPE,
'assertion': 'assertion_value'
})
# Check result
assert token == 'token'
- assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert expiry == utcnow() + datetime.timedelta(seconds=500)
assert extra_data['extra'] == 'data'
def test_jwt_grant_no_access_token():
- request = _make_request({
+ request = make_request({
# No access token.
'expires_in': 500,
'extra': 'data'})
@@ -130,8 +130,8 @@
@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
-def test_refresh_grant(now_mock):
- request = _make_request({
+def test_refresh_grant(unused_utcnow):
+ request = make_request({
'access_token': 'token',
'refresh_token': 'new_refresh_token',
'expires_in': 500,
@@ -142,7 +142,7 @@
'client_secret')
# Check request call
- _verify_request_params(request, {
+ verify_request_params(request, {
'grant_type': _client._REFRESH_GRANT_TYPE,
'refresh_token': 'refresh_token',
'client_id': 'client_id',
@@ -157,7 +157,7 @@
def test_refresh_grant_no_access_token():
- request = _make_request({
+ request = make_request({
# No access token.
'refresh_token': 'new_refresh_token',
'expires_in': 500,
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index e15766a..14984a1 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -18,6 +18,7 @@
import pytest
from google.auth import _helpers
+from google.auth import transport
from google.oauth2 import credentials
@@ -26,40 +27,41 @@
REFRESH_TOKEN = 'refresh_token'
CLIENT_ID = 'client_id'
CLIENT_SECRET = 'client_secret'
- credentials = None
- @pytest.fixture(autouse=True)
- def credentials_fixture(self):
- self.credentials = credentials.Credentials(
- token=None, refresh_token=self.REFRESH_TOKEN,
- token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
- client_secret=self.CLIENT_SECRET)
+ @classmethod
+ def make_credentials(cls):
+ return credentials.Credentials(
+ token=None, refresh_token=cls.REFRESH_TOKEN,
+ token_uri=cls.TOKEN_URI, client_id=cls.CLIENT_ID,
+ client_secret=cls.CLIENT_SECRET)
def test_default_state(self):
- assert not self.credentials.valid
+ credentials = self.make_credentials()
+ assert not credentials.valid
# Expiration hasn't been set yet
- assert not self.credentials.expired
+ assert not credentials.expired
# Scopes aren't required for these credentials
- assert not self.credentials.requires_scopes
+ assert not credentials.requires_scopes
# Test properties
- assert self.credentials.refresh_token == self.REFRESH_TOKEN
- assert self.credentials.token_uri == self.TOKEN_URI
- assert self.credentials.client_id == self.CLIENT_ID
- assert self.credentials.client_secret == self.CLIENT_SECRET
+ assert credentials.refresh_token == self.REFRESH_TOKEN
+ assert credentials.token_uri == self.TOKEN_URI
+ assert credentials.client_id == self.CLIENT_ID
+ assert credentials.client_secret == self.CLIENT_SECRET
def test_create_scoped(self):
+ credentials = self.make_credentials()
with pytest.raises(NotImplementedError):
- self.credentials.with_scopes(['email'])
+ credentials.with_scopes(['email'])
@mock.patch('google.oauth2._client.refresh_grant', autospec=True)
@mock.patch(
'google.auth._helpers.utcnow',
return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
- def test_refresh_success(self, now_mock, refresh_grant_mock):
+ def test_refresh_success(self, unused_utcnow, refresh_grant):
token = 'token'
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
grant_response = {'id_token': mock.sentinel.id_token}
- refresh_grant_mock.return_value = (
+ refresh_grant.return_value = (
# Access token
token,
# New refresh token
@@ -68,21 +70,23 @@
expiry,
# Extra data
grant_response)
- request_mock = mock.Mock()
+
+ request = mock.create_autospec(transport.Request)
+ credentials = self.make_credentials()
# Refresh credentials
- self.credentials.refresh(request_mock)
+ credentials.refresh(request)
# Check jwt grant call.
- refresh_grant_mock.assert_called_with(
- request_mock, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
+ refresh_grant.assert_called_with(
+ request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
self.CLIENT_SECRET)
# Check that the credentials have the token and expiry
- assert self.credentials.token == token
- assert self.credentials.expiry == expiry
- assert self.credentials.id_token == mock.sentinel.id_token
+ assert credentials.token == token
+ assert credentials.expiry == expiry
+ assert credentials.id_token == mock.sentinel.id_token
# Check that the credentials are valid (have a token and are not
# expired)
- assert self.credentials.valid
+ assert credentials.valid
diff --git a/tests/oauth2/test_id_token.py b/tests/oauth2/test_id_token.py
index 7f89547..360f92f 100644
--- a/tests/oauth2/test_id_token.py
+++ b/tests/oauth2/test_id_token.py
@@ -18,18 +18,20 @@
import pytest
from google.auth import exceptions
-import google.auth.transport
+from google.auth import transport
from google.oauth2 import id_token
def make_request(status, data=None):
- response = mock.Mock()
+ response = mock.create_autospec(transport.Response, instance=True)
response.status = status
if data is not None:
response.data = json.dumps(data).encode('utf-8')
- return mock.Mock(return_value=response, spec=google.auth.transport.Request)
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+ return request
def test__fetch_certs_success():
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index e80b7d4..94bd7f4 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -17,11 +17,11 @@
import os
import mock
-import pytest
from google.auth import _helpers
from google.auth import crypt
from google.auth import jwt
+from google.auth import transport
from google.oauth2 import service_account
@@ -41,21 +41,17 @@
with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
-
-@pytest.fixture(scope='module')
-def signer():
- return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
class TestCredentials(object):
SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
TOKEN_URI = 'https://example.com/oauth2/token'
- credentials = None
- @pytest.fixture(autouse=True)
- def credentials_fixture(self, signer):
- self.credentials = service_account.Credentials(
- signer, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI)
+ @classmethod
+ def make_credentials(cls):
+ return service_account.Credentials(
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI)
def test_from_service_account_info(self):
credentials = service_account.Credentials.from_service_account_info(
@@ -112,96 +108,108 @@
assert credentials._additional_claims == additional_claims
def test_default_state(self):
- assert not self.credentials.valid
+ credentials = self.make_credentials()
+ assert not credentials.valid
# Expiration hasn't been set yet
- assert not self.credentials.expired
+ assert not credentials.expired
# Scopes haven't been specified yet
- assert self.credentials.requires_scopes
+ assert credentials.requires_scopes
def test_sign_bytes(self):
+ credentials = self.make_credentials()
to_sign = b'123'
- signature = self.credentials.sign_bytes(to_sign)
+ signature = credentials.sign_bytes(to_sign)
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
def test_signer(self):
- assert isinstance(self.credentials.signer, crypt.Signer)
+ credentials = self.make_credentials()
+ assert isinstance(credentials.signer, crypt.Signer)
def test_signer_email(self):
- assert self.credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
+ credentials = self.make_credentials()
+ assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
def test_create_scoped(self):
+ credentials = self.make_credentials()
scopes = ['email', 'profile']
- credentials = self.credentials.with_scopes(scopes)
+ credentials = credentials.with_scopes(scopes)
assert credentials._scopes == scopes
def test_with_claims(self):
- new_credentials = self.credentials.with_claims({'meep': 'moop'})
+ credentials = self.make_credentials()
+ new_credentials = credentials.with_claims({'meep': 'moop'})
assert new_credentials._additional_claims == {'meep': 'moop'}
def test__make_authorization_grant_assertion(self):
- token = self.credentials._make_authorization_grant_assertion()
+ credentials = self.make_credentials()
+ token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
assert payload['aud'] == self.TOKEN_URI
def test__make_authorization_grant_assertion_scoped(self):
+ credentials = self.make_credentials()
scopes = ['email', 'profile']
- credentials = self.credentials.with_scopes(scopes)
+ credentials = credentials.with_scopes(scopes)
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
assert payload['scope'] == 'email profile'
def test__make_authorization_grant_assertion_subject(self):
+ credentials = self.make_credentials()
subject = 'user@example.com'
- credentials = self.credentials.with_subject(subject)
+ credentials = credentials.with_subject(subject)
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
assert payload['sub'] == subject
@mock.patch('google.oauth2._client.jwt_grant', autospec=True)
- def test_refresh_success(self, jwt_grant_mock):
+ def test_refresh_success(self, jwt_grant):
+ credentials = self.make_credentials()
token = 'token'
- jwt_grant_mock.return_value = (
+ jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
{})
- request_mock = mock.Mock()
+ request = mock.create_autospec(transport.Request, instance=True)
# Refresh credentials
- self.credentials.refresh(request_mock)
+ credentials.refresh(request)
# Check jwt grant call.
- assert jwt_grant_mock.called
- request, token_uri, assertion = jwt_grant_mock.call_args[0]
- assert request == request_mock
- assert token_uri == self.credentials._token_uri
+ assert jwt_grant.called
+
+ called_request, token_uri, assertion = jwt_grant.call_args[0]
+ assert called_request == request
+ assert token_uri == credentials._token_uri
assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
# No further assertion done on the token, as there are separate tests
# for checking the authorization grant assertion.
# Check that the credentials have the token.
- assert self.credentials.token == token
+ assert credentials.token == token
# Check that the credentials are valid (have a token and are not
# expired)
- assert self.credentials.valid
+ assert credentials.valid
@mock.patch('google.oauth2._client.jwt_grant', autospec=True)
- def test_before_request_refreshes(self, jwt_grant_mock):
+ def test_before_request_refreshes(self, jwt_grant):
+ credentials = self.make_credentials()
token = 'token'
- jwt_grant_mock.return_value = (
+ jwt_grant.return_value = (
token, _helpers.utcnow() + datetime.timedelta(seconds=500), None)
- request_mock = mock.Mock()
+ request = mock.create_autospec(transport.Request, instance=True)
# Credentials should start as invalid
- assert not self.credentials.valid
+ assert not credentials.valid
# before_request should cause a refresh
- self.credentials.before_request(
- request_mock, 'GET', 'http://example.com?a=1#3', {})
+ credentials.before_request(
+ request, 'GET', 'http://example.com?a=1#3', {})
# The refresh endpoint should've been called.
- assert jwt_grant_mock.called
+ assert jwt_grant.called
# Credentials should now be valid.
- assert self.credentials.valid
+ assert credentials.valid
diff --git a/tests/test__cloud_sdk.py b/tests/test__cloud_sdk.py
index 482a7ed..6e92ca6 100644
--- a/tests/test__cloud_sdk.py
+++ b/tests/test__cloud_sdk.py
@@ -40,43 +40,36 @@
CLOUD_SDK_CONFIG_FILE_DATA = fh.read()
-@mock.patch(
- 'subprocess.check_output', autospec=True,
- return_value=CLOUD_SDK_CONFIG_FILE_DATA)
-def test_get_project_id(check_output_mock):
- project_id = _cloud_sdk.get_project_id()
- assert project_id == 'example-project'
+@pytest.mark.parametrize('data, expected_project_id', [
+ (CLOUD_SDK_CONFIG_FILE_DATA, 'example-project'),
+ (b'I am some bad json', None),
+ (b'{}', None)
+])
+def test_get_project_id(data, expected_project_id):
+ check_output_patch = mock.patch(
+ 'subprocess.check_output', autospec=True, return_value=data)
+
+ with check_output_patch as check_output:
+ project_id = _cloud_sdk.get_project_id()
+
+ assert project_id == expected_project_id
+ assert check_output.called
@mock.patch(
'subprocess.check_output', autospec=True,
side_effect=subprocess.CalledProcessError(-1, None))
-def test_get_project_id_call_error(check_output_mock):
+def test_get_project_id_call_error(check_output):
project_id = _cloud_sdk.get_project_id()
assert project_id is None
-
-
-@mock.patch(
- 'subprocess.check_output', autospec=True,
- return_value=b'I am some bad json')
-def test_get_project_id_bad_json(check_output_mock):
- project_id = _cloud_sdk.get_project_id()
- assert project_id is None
-
-
-@mock.patch(
- 'subprocess.check_output', autospec=True,
- return_value=b'{}')
-def test_get_project_id_missing_value(check_output_mock):
- project_id = _cloud_sdk.get_project_id()
- assert project_id is None
+ assert check_output.called
@mock.patch(
'google.auth._cloud_sdk.get_config_path', autospec=True)
-def test_get_application_default_credentials_path(mock_get_config_dir):
+def test_get_application_default_credentials_path(get_config_dir):
config_path = 'config_path'
- mock_get_config_dir.return_value = config_path
+ get_config_dir.return_value = config_path
credentials_path = _cloud_sdk.get_application_default_credentials_path()
assert credentials_path == os.path.join(
config_path, _cloud_sdk._CREDENTIALS_FILENAME)
@@ -91,8 +84,8 @@
@mock.patch('os.path.expanduser')
-def test_get_config_path_unix(mock_expanduser):
- mock_expanduser.side_effect = lambda path: path
+def test_get_config_path_unix(expanduser):
+ expanduser.side_effect = lambda path: path
config_path = _cloud_sdk.get_config_path()
diff --git a/tests/test__default.py b/tests/test__default.py
index 001a19c..8054ac5 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -105,20 +105,19 @@
@LOAD_FILE_PATCH
-def test__get_explicit_environ_credentials(mock_load, monkeypatch):
+def test__get_explicit_environ_credentials(load, monkeypatch):
monkeypatch.setenv(environment_vars.CREDENTIALS, 'filename')
credentials, project_id = _default._get_explicit_environ_credentials()
assert credentials is mock.sentinel.credentials
assert project_id is mock.sentinel.project_id
- mock_load.assert_called_with('filename')
+ load.assert_called_with('filename')
@LOAD_FILE_PATCH
-def test__get_explicit_environ_credentials_no_project_id(
- mock_load, monkeypatch):
- mock_load.return_value = (mock.sentinel.credentials, None)
+def test__get_explicit_environ_credentials_no_project_id(load, monkeypatch):
+ load.return_value = mock.sentinel.credentials, None
monkeypatch.setenv(environment_vars.CREDENTIALS, 'filename')
credentials, project_id = _default._get_explicit_environ_credentials()
@@ -131,23 +130,22 @@
@mock.patch(
'google.auth._cloud_sdk.get_application_default_credentials_path',
autospec=True)
-def test__get_gcloud_sdk_credentials(
- mock_get_adc_path, mock_load):
- mock_get_adc_path.return_value = SERVICE_ACCOUNT_FILE
+def test__get_gcloud_sdk_credentials(get_adc_path, load):
+ get_adc_path.return_value = SERVICE_ACCOUNT_FILE
credentials, project_id = _default._get_gcloud_sdk_credentials()
assert credentials is mock.sentinel.credentials
assert project_id is mock.sentinel.project_id
- mock_load.assert_called_with(SERVICE_ACCOUNT_FILE)
+ load.assert_called_with(SERVICE_ACCOUNT_FILE)
@mock.patch(
'google.auth._cloud_sdk.get_application_default_credentials_path',
autospec=True)
-def test__get_gcloud_sdk_credentials_non_existent(mock_get_adc_path, tmpdir):
+def test__get_gcloud_sdk_credentials_non_existent(get_adc_path, tmpdir):
non_existent = tmpdir.join('non-existent')
- mock_get_adc_path.return_value = str(non_existent)
+ get_adc_path.return_value = str(non_existent)
credentials, project_id = _default._get_gcloud_sdk_credentials()
@@ -158,19 +156,19 @@
@mock.patch(
'google.auth._cloud_sdk.get_project_id',
return_value=mock.sentinel.project_id, autospec=True)
-@mock.patch('os.path.isfile', return_value=True)
+@mock.patch('os.path.isfile', return_value=True, autospec=True)
@LOAD_FILE_PATCH
def test__get_gcloud_sdk_credentials_project_id(
- mock_load, unused_mock_isfile, mock_get_project_id):
+ load, unused_isfile, get_project_id):
# Don't return a project ID from load file, make the function check
# the Cloud SDK project.
- mock_load.return_value = (mock.sentinel.credentials, None)
+ load.return_value = mock.sentinel.credentials, None
credentials, project_id = _default._get_gcloud_sdk_credentials()
assert credentials == mock.sentinel.credentials
assert project_id == mock.sentinel.project_id
- assert mock_get_project_id.called
+ assert get_project_id.called
@mock.patch(
@@ -179,28 +177,39 @@
@mock.patch('os.path.isfile', return_value=True)
@LOAD_FILE_PATCH
def test__get_gcloud_sdk_credentials_no_project_id(
- mock_load, unused_mock_isfile, mock_get_project_id):
+ load, unused_isfile, get_project_id):
# Don't return a project ID from load file, make the function check
# the Cloud SDK project.
- mock_load.return_value = (mock.sentinel.credentials, None)
+ load.return_value = mock.sentinel.credentials, None
credentials, project_id = _default._get_gcloud_sdk_credentials()
assert credentials == mock.sentinel.credentials
assert project_id is None
+ assert get_project_id.called
+
+
+class _AppIdentityModule(object):
+ """The interface of the App Idenity app engine module.
+ See https://cloud.google.com/appengine/docs/standard/python/refdocs\
+ /google.appengine.api.app_identity.app_identity
+ """
+ def get_application_id(self):
+ raise NotImplementedError()
@pytest.fixture
-def app_identity_mock(monkeypatch):
+def app_identity(monkeypatch):
"""Mocks the app_identity module for google.auth.app_engine."""
- app_identity_mock = mock.Mock()
+ app_identity_module = mock.create_autospec(
+ _AppIdentityModule, instance=True)
monkeypatch.setattr(
- app_engine, 'app_identity', app_identity_mock)
- yield app_identity_mock
+ app_engine, 'app_identity', app_identity_module)
+ yield app_identity_module
-def test__get_gae_credentials(app_identity_mock):
- app_identity_mock.get_application_id.return_value = mock.sentinel.project
+def test__get_gae_credentials(app_identity):
+ app_identity.get_application_id.return_value = mock.sentinel.project
credentials, project_id = _default._get_gae_credentials()
@@ -218,7 +227,7 @@
@mock.patch(
'google.auth.compute_engine._metadata.get_project_id',
return_value='example-project', autospec=True)
-def test__get_gce_credentials(get_mock, ping_mock):
+def test__get_gce_credentials(unused_get, unused_ping):
credentials, project_id = _default._get_gce_credentials()
assert isinstance(credentials, compute_engine.Credentials)
@@ -228,7 +237,7 @@
@mock.patch(
'google.auth.compute_engine._metadata.ping', return_value=False,
autospec=True)
-def test__get_gce_credentials_no_ping(ping_mock):
+def test__get_gce_credentials_no_ping(unused_ping):
credentials, project_id = _default._get_gce_credentials()
assert credentials is None
@@ -241,7 +250,7 @@
@mock.patch(
'google.auth.compute_engine._metadata.get_project_id',
side_effect=exceptions.TransportError(), autospec=True)
-def test__get_gce_credentials_no_project_id(get_mock, ping_mock):
+def test__get_gce_credentials_no_project_id(unused_get, unused_ping):
credentials, project_id = _default._get_gce_credentials()
assert isinstance(credentials, compute_engine.Credentials)
@@ -251,16 +260,16 @@
@mock.patch(
'google.auth.compute_engine._metadata.ping', return_value=False,
autospec=True)
-def test__get_gce_credentials_explicit_request(ping_mock):
+def test__get_gce_credentials_explicit_request(ping):
_default._get_gce_credentials(mock.sentinel.request)
- ping_mock.assert_called_with(request=mock.sentinel.request)
+ ping.assert_called_with(request=mock.sentinel.request)
@mock.patch(
'google.auth._default._get_explicit_environ_credentials',
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
autospec=True)
-def test_default_early_out(get_mock):
+def test_default_early_out(unused_get):
assert _default.default() == (
mock.sentinel.credentials, mock.sentinel.project_id)
@@ -269,7 +278,7 @@
'google.auth._default._get_explicit_environ_credentials',
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
autospec=True)
-def test_default_explict_project_id(get_mock, monkeypatch):
+def test_default_explict_project_id(unused_get, monkeypatch):
monkeypatch.setenv(environment_vars.PROJECT, 'explicit-env')
assert _default.default() == (
mock.sentinel.credentials, 'explicit-env')
@@ -279,7 +288,7 @@
'google.auth._default._get_explicit_environ_credentials',
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
autospec=True)
-def test_default_explict_legacy_project_id(get_mock, monkeypatch):
+def test_default_explict_legacy_project_id(unused_get, monkeypatch):
monkeypatch.setenv(environment_vars.LEGACY_PROJECT, 'explicit-env')
assert _default.default() == (
mock.sentinel.credentials, 'explicit-env')
@@ -308,12 +317,12 @@
autospec=True)
@mock.patch(
'google.auth.credentials.with_scopes_if_required', autospec=True)
-def test_default_scoped(with_scopes_mock, get_mock):
+def test_default_scoped(with_scopes, get):
scopes = ['one', 'two']
credentials, project_id = _default.default(scopes=scopes)
- assert credentials == with_scopes_mock.return_value
+ assert credentials == with_scopes.return_value
assert project_id == mock.sentinel.project_id
- with_scopes_mock.assert_called_once_with(
+ with_scopes.assert_called_once_with(
mock.sentinel.credentials, scopes)
diff --git a/tests/test__oauth2client.py b/tests/test__oauth2client.py
index 1d7a4a8..832b24f 100644
--- a/tests/test__oauth2client.py
+++ b/tests/test__oauth2client.py
@@ -112,16 +112,16 @@
old_credentials.service_account_id)
-class MockCredentials(object):
+class FakeCredentials(object):
pass
def test_convert_success():
- convert_function = mock.Mock()
+ convert_function = mock.Mock(spec=['__call__'])
conversion_map_patch = mock.patch.object(
_oauth2client, '_CLASS_CONVERSION_MAP',
- {MockCredentials: convert_function})
- credentials = MockCredentials()
+ {FakeCredentials: convert_function})
+ credentials = FakeCredentials()
with conversion_map_patch:
result = _oauth2client.convert(credentials)
diff --git a/tests/test_app_engine.py b/tests/test_app_engine.py
index 2b957f0..9a62359 100644
--- a/tests/test_app_engine.py
+++ b/tests/test_app_engine.py
@@ -21,17 +21,36 @@
from google.auth import app_engine
+class _AppIdentityModule(object):
+ """The interface of the App Idenity app engine module.
+ See https://cloud.google.com/appengine/docs/standard/python/refdocs
+ /google.appengine.api.app_identity.app_identity
+ """
+ def get_application_id(self):
+ raise NotImplementedError()
+
+ def sign_blob(self, bytes_to_sign, deadline=None):
+ raise NotImplementedError()
+
+ def get_service_account_name(self, deadline=None):
+ raise NotImplementedError()
+
+ def get_access_token(self, scopes, service_account_id=None):
+ raise NotImplementedError()
+
+
@pytest.fixture
-def app_identity_mock(monkeypatch):
+def app_identity(monkeypatch):
"""Mocks the app_identity module for google.auth.app_engine."""
- app_identity_mock = mock.Mock()
+ app_identity_module = mock.create_autospec(
+ _AppIdentityModule, instance=True)
monkeypatch.setattr(
- app_engine, 'app_identity', app_identity_mock)
- yield app_identity_mock
+ app_engine, 'app_identity', app_identity_module)
+ yield app_identity_module
-def test_get_project_id(app_identity_mock):
- app_identity_mock.get_application_id.return_value = mock.sentinel.project
+def test_get_project_id(app_identity):
+ app_identity.get_application_id.return_value = mock.sentinel.project
assert app_engine.get_project_id() == mock.sentinel.project
@@ -43,16 +62,16 @@
class TestSigner(object):
- def test_key_id(self, app_identity_mock):
- app_identity_mock.sign_blob.return_value = (
+ def test_key_id(self, app_identity):
+ app_identity.sign_blob.return_value = (
mock.sentinel.key_id, mock.sentinel.signature)
signer = app_engine.Signer()
assert signer.key_id is None
- def test_sign(self, app_identity_mock):
- app_identity_mock.sign_blob.return_value = (
+ def test_sign(self, app_identity):
+ app_identity.sign_blob.return_value = (
mock.sentinel.key_id, mock.sentinel.signature)
signer = app_engine.Signer()
@@ -61,7 +80,7 @@
signature = signer.sign(to_sign)
assert signature == mock.sentinel.signature
- app_identity_mock.sign_blob.assert_called_with(to_sign)
+ app_identity.sign_blob.assert_called_with(to_sign)
class TestCredentials(object):
@@ -71,7 +90,7 @@
assert excinfo.match(r'App Engine APIs are not available')
- def test_default_state(self, app_identity_mock):
+ def test_default_state(self, app_identity):
credentials = app_engine.Credentials()
# Not token acquired yet
@@ -82,7 +101,7 @@
assert not credentials.scopes
assert credentials.requires_scopes
- def test_with_scopes(self, app_identity_mock):
+ def test_with_scopes(self, app_identity):
credentials = app_engine.Credentials()
assert not credentials.scopes
@@ -93,44 +112,44 @@
assert scoped_credentials.has_scopes(['email'])
assert not scoped_credentials.requires_scopes
- def test_service_account_email_implicit(self, app_identity_mock):
- app_identity_mock.get_service_account_name.return_value = (
+ def test_service_account_email_implicit(self, app_identity):
+ app_identity.get_service_account_name.return_value = (
mock.sentinel.service_account_email)
credentials = app_engine.Credentials()
assert (credentials.service_account_email ==
mock.sentinel.service_account_email)
- assert app_identity_mock.get_service_account_name.called
+ assert app_identity.get_service_account_name.called
- def test_service_account_email_explicit(self, app_identity_mock):
+ def test_service_account_email_explicit(self, app_identity):
credentials = app_engine.Credentials(
service_account_id=mock.sentinel.service_account_email)
assert (credentials.service_account_email ==
mock.sentinel.service_account_email)
- assert not app_identity_mock.get_service_account_name.called
+ assert not app_identity.get_service_account_name.called
@mock.patch(
'google.auth._helpers.utcnow',
return_value=datetime.datetime.min)
- def test_refresh(self, now_mock, app_identity_mock):
+ def test_refresh(self, utcnow, app_identity):
token = 'token'
ttl = _helpers.CLOCK_SKEW_SECS + 100
- app_identity_mock.get_access_token.return_value = token, ttl
+ app_identity.get_access_token.return_value = token, ttl
credentials = app_engine.Credentials(scopes=['email'])
credentials.refresh(None)
- app_identity_mock.get_access_token.assert_called_with(
+ app_identity.get_access_token.assert_called_with(
credentials.scopes, credentials._service_account_id)
assert credentials.token == token
assert credentials.expiry == (
- now_mock() + datetime.timedelta(seconds=ttl))
+ utcnow() + datetime.timedelta(seconds=ttl))
assert credentials.valid
assert not credentials.expired
- def test_sign_bytes(self, app_identity_mock):
- app_identity_mock.sign_blob.return_value = (
+ def test_sign_bytes(self, app_identity):
+ app_identity.sign_blob.return_value = (
mock.sentinel.key_id, mock.sentinel.signature)
credentials = app_engine.Credentials()
to_sign = b'123'
@@ -138,12 +157,12 @@
signature = credentials.sign_bytes(to_sign)
assert signature == mock.sentinel.signature
- app_identity_mock.sign_blob.assert_called_with(to_sign)
+ app_identity.sign_blob.assert_called_with(to_sign)
- def test_signer(self, app_identity_mock):
+ def test_signer(self, app_identity):
credentials = app_engine.Credentials()
assert isinstance(credentials.signer, app_engine.Signer)
- def test_signer_email(self, app_identity_mock):
+ def test_signer_email(self, app_identity):
credentials = app_engine.Credentials()
assert credentials.signer_email == credentials.service_account_email
diff --git a/tests/test_iam.py b/tests/test_iam.py
index 97fa907..cc09085 100644
--- a/tests/test_iam.py
+++ b/tests/test_iam.py
@@ -28,13 +28,15 @@
def make_request(status, data=None):
- response = mock.Mock(spec=transport.Response)
+ response = mock.create_autospec(transport.Response, instance=True)
response.status = status
if data is not None:
response.data = json.dumps(data).encode('utf-8')
- return mock.Mock(return_value=response, spec=transport.Request)
+ request = mock.create_autospec(transport.Request)
+ request.return_value = response
+ return request
def make_credentials():
@@ -54,7 +56,8 @@
class TestSigner(object):
def test_constructor(self):
request = mock.sentinel.request
- credentials = mock.Mock(spec=google.auth.credentials.Credentials)
+ credentials = mock.create_autospec(
+ google.auth.credentials.Credentials, instance=True)
signer = iam.Signer(
request, credentials, mock.sentinel.service_account_email)
diff --git a/tests/transport/test_grpc.py b/tests/transport/test_grpc.py
index a494ee5..c98dcff 100644
--- a/tests/transport/test_grpc.py
+++ b/tests/transport/test_grpc.py
@@ -19,7 +19,11 @@
from google.auth import _helpers
from google.auth import credentials
+from google.auth import transport
+
try:
+ # pylint: disable=ungrouped-imports
+ import grpc
import google.auth.transport.grpc
HAS_GRPC = True
except ImportError: # pragma: NO COVER
@@ -29,9 +33,9 @@
pytestmark = pytest.mark.skipif(not HAS_GRPC, reason='gRPC is unavailable.')
-class MockCredentials(credentials.Credentials):
+class CredentialsStub(credentials.Credentials):
def __init__(self, token='token'):
- super(MockCredentials, self).__init__()
+ super(CredentialsStub, self).__init__()
self.token = token
self.expiry = None
@@ -41,36 +45,40 @@
class TestAuthMetadataPlugin(object):
def test_call_no_refresh(self):
- credentials = MockCredentials()
- request = mock.Mock()
+ credentials = CredentialsStub()
+ request = mock.create_autospec(transport.Request)
plugin = google.auth.transport.grpc.AuthMetadataPlugin(
credentials, request)
- context = mock.Mock()
- callback = mock.Mock()
+ context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
+ context.method_name = mock.sentinel.method_name
+ context.service_url = mock.sentinel.service_url
+ callback = mock.create_autospec(grpc.AuthMetadataPluginCallback)
plugin(context, callback)
- assert callback.called_once_with(
- [('authorization', 'Bearer {}'.format(credentials.token))], None)
+ callback.assert_called_once_with(
+ [(u'authorization', u'Bearer {}'.format(credentials.token))], None)
def test_call_refresh(self):
- credentials = MockCredentials()
+ credentials = CredentialsStub()
credentials.expiry = datetime.datetime.min + _helpers.CLOCK_SKEW
- request = mock.Mock()
+ request = mock.create_autospec(transport.Request)
plugin = google.auth.transport.grpc.AuthMetadataPlugin(
credentials, request)
- context = mock.Mock()
- callback = mock.Mock()
+ context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
+ context.method_name = mock.sentinel.method_name
+ context.service_url = mock.sentinel.service_url
+ callback = mock.create_autospec(grpc.AuthMetadataPluginCallback)
plugin(context, callback)
assert credentials.token == 'token1'
- assert callback.called_once_with(
- [('authorization', 'Bearer {}'.format(credentials.token))], None)
+ callback.assert_called_once_with(
+ [(u'authorization', u'Bearer {}'.format(credentials.token))], None)
@mock.patch('grpc.composite_channel_credentials', autospec=True)
@@ -80,8 +88,8 @@
def test_secure_authorized_channel(
secure_channel, ssl_channel_credentials, metadata_call_credentials,
composite_channel_credentials):
- credentials = mock.Mock()
- request = mock.Mock()
+ credentials = CredentialsStub()
+ request = mock.create_autospec(transport.Request)
target = 'example.com:80'
channel = google.auth.transport.grpc.secure_authorized_channel(
diff --git a/tests/transport/test_requests.py b/tests/transport/test_requests.py
index 8c3a423..41dc237 100644
--- a/tests/transport/test_requests.py
+++ b/tests/transport/test_requests.py
@@ -17,6 +17,7 @@
import requests.adapters
from six.moves import http_client
+import google.auth.credentials
import google.auth.transport.requests
from tests.transport import compliance
@@ -26,18 +27,19 @@
return google.auth.transport.requests.Request()
def test_timeout(self):
- http = mock.Mock()
+ http = mock.create_autospec(requests.Session, instance=True)
request = google.auth.transport.requests.Request(http)
request(url='http://example.com', method='GET', timeout=5)
assert http.request.call_args[1]['timeout'] == 5
-class MockCredentials(object):
+class CredentialsStub(google.auth.credentials.Credentials):
def __init__(self, token='token'):
+ super(CredentialsStub, self).__init__()
self.token = token
- def apply(self, headers):
+ def apply(self, headers, token=None):
headers['authorization'] = self.token
def before_request(self, request, method, url, headers):
@@ -47,9 +49,9 @@
self.token += '1'
-class MockAdapter(requests.adapters.BaseAdapter):
+class AdapterStub(requests.adapters.BaseAdapter):
def __init__(self, responses, headers=None):
- super(MockAdapter, self).__init__()
+ super(AdapterStub, self).__init__()
self.responses = responses
self.requests = []
self.headers = headers or {}
@@ -84,44 +86,44 @@
assert authed_session.credentials == mock.sentinel.credentials
def test_request_no_refresh(self):
- mock_credentials = mock.Mock(wraps=MockCredentials())
- mock_response = make_response()
- mock_adapter = MockAdapter([mock_response])
+ credentials = mock.Mock(wraps=CredentialsStub())
+ response = make_response()
+ adapter = AdapterStub([response])
authed_session = google.auth.transport.requests.AuthorizedSession(
- mock_credentials)
- authed_session.mount(self.TEST_URL, mock_adapter)
+ credentials)
+ authed_session.mount(self.TEST_URL, adapter)
- response = authed_session.request('GET', self.TEST_URL)
+ result = authed_session.request('GET', self.TEST_URL)
- assert response == mock_response
- assert mock_credentials.before_request.called
- assert not mock_credentials.refresh.called
- assert len(mock_adapter.requests) == 1
- assert mock_adapter.requests[0].url == self.TEST_URL
- assert mock_adapter.requests[0].headers['authorization'] == 'token'
+ assert response == result
+ assert credentials.before_request.called
+ assert not credentials.refresh.called
+ assert len(adapter.requests) == 1
+ assert adapter.requests[0].url == self.TEST_URL
+ assert adapter.requests[0].headers['authorization'] == 'token'
def test_request_refresh(self):
- mock_credentials = mock.Mock(wraps=MockCredentials())
- mock_final_response = make_response(status=http_client.OK)
+ credentials = mock.Mock(wraps=CredentialsStub())
+ final_response = make_response(status=http_client.OK)
# First request will 401, second request will succeed.
- mock_adapter = MockAdapter([
+ adapter = AdapterStub([
make_response(status=http_client.UNAUTHORIZED),
- mock_final_response])
+ final_response])
authed_session = google.auth.transport.requests.AuthorizedSession(
- mock_credentials)
- authed_session.mount(self.TEST_URL, mock_adapter)
+ credentials)
+ authed_session.mount(self.TEST_URL, adapter)
- response = authed_session.request('GET', self.TEST_URL)
+ result = authed_session.request('GET', self.TEST_URL)
- assert response == mock_final_response
- assert mock_credentials.before_request.call_count == 2
- assert mock_credentials.refresh.called
- assert len(mock_adapter.requests) == 2
+ assert result == final_response
+ assert credentials.before_request.call_count == 2
+ assert credentials.refresh.called
+ assert len(adapter.requests) == 2
- assert mock_adapter.requests[0].url == self.TEST_URL
- assert mock_adapter.requests[0].headers['authorization'] == 'token'
+ assert adapter.requests[0].url == self.TEST_URL
+ assert adapter.requests[0].headers['authorization'] == 'token'
- assert mock_adapter.requests[1].url == self.TEST_URL
- assert mock_adapter.requests[1].headers['authorization'] == 'token1'
+ assert adapter.requests[1].url == self.TEST_URL
+ assert adapter.requests[1].headers['authorization'] == 'token1'
diff --git a/tests/transport/test_urllib3.py b/tests/transport/test_urllib3.py
index d35a759..4d1a5a6 100644
--- a/tests/transport/test_urllib3.py
+++ b/tests/transport/test_urllib3.py
@@ -16,6 +16,7 @@
from six.moves import http_client
import urllib3
+import google.auth.credentials
import google.auth.transport.urllib3
from tests.transport import compliance
@@ -26,7 +27,7 @@
return google.auth.transport.urllib3.Request(http)
def test_timeout(self):
- http = mock.Mock()
+ http = mock.create_autospec(urllib3.PoolManager)
request = google.auth.transport.urllib3.Request(http)
request(url='http://example.com', method='GET', timeout=5)
@@ -44,11 +45,12 @@
assert 'cert_reqs' not in http.connection_pool_kw
-class MockCredentials(object):
+class CredentialsStub(google.auth.credentials.Credentials):
def __init__(self, token='token'):
+ super(CredentialsStub, self).__init__()
self.token = token
- def apply(self, headers):
+ def apply(self, headers, token=None):
headers['authorization'] = self.token
def before_request(self, request, method, url, headers):
@@ -58,7 +60,7 @@
self.token += '1'
-class MockHttp(object):
+class HttpStub(object):
def __init__(self, responses, headers=None):
self.responses = responses
self.requests = []
@@ -69,7 +71,7 @@
return self.responses.pop(0)
-class MockResponse(object):
+class ResponseStub(object):
def __init__(self, status=http_client.OK, data=None):
self.status = status
self.data = data
@@ -86,52 +88,51 @@
assert isinstance(authed_http.http, urllib3.PoolManager)
def test_urlopen_no_refresh(self):
- mock_credentials = mock.Mock(wraps=MockCredentials())
- mock_response = MockResponse()
- mock_http = MockHttp([mock_response])
+ credentials = mock.Mock(wraps=CredentialsStub())
+ response = ResponseStub()
+ http = HttpStub([response])
authed_http = google.auth.transport.urllib3.AuthorizedHttp(
- mock_credentials, http=mock_http)
+ credentials, http=http)
- response = authed_http.urlopen('GET', self.TEST_URL)
+ result = authed_http.urlopen('GET', self.TEST_URL)
- assert response == mock_response
- assert mock_credentials.before_request.called
- assert not mock_credentials.refresh.called
- assert mock_http.requests == [
+ assert result == response
+ assert credentials.before_request.called
+ assert not credentials.refresh.called
+ assert http.requests == [
('GET', self.TEST_URL, None, {'authorization': 'token'}, {})]
def test_urlopen_refresh(self):
- mock_credentials = mock.Mock(wraps=MockCredentials())
- mock_final_response = MockResponse(status=http_client.OK)
+ credentials = mock.Mock(wraps=CredentialsStub())
+ final_response = ResponseStub(status=http_client.OK)
# First request will 401, second request will succeed.
- mock_http = MockHttp([
- MockResponse(status=http_client.UNAUTHORIZED),
- mock_final_response])
+ http = HttpStub([
+ ResponseStub(status=http_client.UNAUTHORIZED),
+ final_response])
authed_http = google.auth.transport.urllib3.AuthorizedHttp(
- mock_credentials, http=mock_http)
+ credentials, http=http)
- response = authed_http.urlopen('GET', 'http://example.com')
+ authed_http = authed_http.urlopen('GET', 'http://example.com')
- assert response == mock_final_response
- assert mock_credentials.before_request.call_count == 2
- assert mock_credentials.refresh.called
- assert mock_http.requests == [
+ assert authed_http == final_response
+ assert credentials.before_request.call_count == 2
+ assert credentials.refresh.called
+ assert http.requests == [
('GET', self.TEST_URL, None, {'authorization': 'token'}, {}),
('GET', self.TEST_URL, None, {'authorization': 'token1'}, {})]
def test_proxies(self):
- mock_http = mock.MagicMock()
-
+ http = mock.create_autospec(urllib3.PoolManager)
authed_http = google.auth.transport.urllib3.AuthorizedHttp(
- None, http=mock_http)
+ None, http=http)
with authed_http:
pass
- assert mock_http.__enter__.called
- assert mock_http.__exit__.called
+ assert http.__enter__.called
+ assert http.__exit__.called
authed_http.headers = mock.sentinel.headers
- assert authed_http.headers == mock_http.headers
+ assert authed_http.headers == http.headers