Add JWT credentials (#21)

diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 69628e5..b6c07df 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -14,8 +14,10 @@
 
 import base64
 import datetime
+import json
 import os
 
+import mock
 import pytest
 
 from google.auth import _helpers
@@ -34,6 +36,11 @@
 with open(os.path.join(DATA_DIR, 'other_cert.pem'), 'rb') as fh:
     OTHER_CERT_BYTES = fh.read()
 
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+
+with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+    SERVICE_ACCOUNT_INFO = json.load(fh)
+
 
 @pytest.fixture
 def signer():
@@ -187,3 +194,147 @@
     certs = {'2': OTHER_CERT_BYTES, '3': PUBLIC_CERT_BYTES}
     payload = jwt.decode(token, certs)
     assert payload['user'] == 'billy bob'
+
+
+class TestCredentials:
+    SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
+    SUBJECT = 'subject'
+    AUDIENCE = 'audience'
+    ADDITIONAL_CLAIMS = {'meta': 'data'}
+    credentials = None
+
+    @pytest.fixture(autouse=True)
+    def credentials_fixture(self, signer):
+        self.credentials = jwt.Credentials(
+            signer, self.SERVICE_ACCOUNT_EMAIL)
+
+    def test_from_service_account_info(self):
+        with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+            info = json.load(fh)
+
+        credentials = jwt.Credentials.from_service_account_info(info)
+
+        assert credentials._signer.key_id == info['private_key_id']
+        assert credentials._issuer == info['client_email']
+        assert credentials._subject == info['client_email']
+
+    def test_from_service_account_info_args(self):
+        info = SERVICE_ACCOUNT_INFO.copy()
+
+        credentials = jwt.Credentials.from_service_account_info(
+            info, subject=self.SUBJECT, audience=self.AUDIENCE,
+            additional_claims=self.ADDITIONAL_CLAIMS)
+
+        assert credentials._signer.key_id == info['private_key_id']
+        assert credentials._issuer == info['client_email']
+        assert credentials._subject == self.SUBJECT
+        assert credentials._audience == self.AUDIENCE
+        assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
+
+    def test_from_service_account_bad_private_key(self):
+        info = SERVICE_ACCOUNT_INFO.copy()
+        info['private_key'] = 'garbage'
+
+        with pytest.raises(ValueError) as excinfo:
+            jwt.Credentials.from_service_account_info(info)
+
+        assert excinfo.match(r'No key could be detected')
+
+    def test_from_service_account_bad_format(self):
+        with pytest.raises(ValueError):
+            jwt.Credentials.from_service_account_info({})
+
+    def test_from_service_account_file(self):
+        info = SERVICE_ACCOUNT_INFO.copy()
+
+        credentials = jwt.Credentials.from_service_account_file(
+            SERVICE_ACCOUNT_JSON_FILE)
+
+        assert credentials._signer.key_id == info['private_key_id']
+        assert credentials._issuer == info['client_email']
+        assert credentials._subject == info['client_email']
+
+    def test_from_service_account_file_args(self):
+        info = SERVICE_ACCOUNT_INFO.copy()
+
+        credentials = jwt.Credentials.from_service_account_file(
+            SERVICE_ACCOUNT_JSON_FILE, subject=self.SUBJECT,
+            audience=self.AUDIENCE, additional_claims=self.ADDITIONAL_CLAIMS)
+
+        assert credentials._signer.key_id == info['private_key_id']
+        assert credentials._issuer == info['client_email']
+        assert credentials._subject == self.SUBJECT
+        assert credentials._audience == self.AUDIENCE
+        assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
+
+    def test_default_state(self):
+        assert not self.credentials.valid
+        # Expiration hasn't been set yet
+        assert not self.credentials.expired
+
+    def test_sign_bytes(self):
+        to_sign = b'123'
+        signature = self.credentials.sign_bytes(to_sign)
+        assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
+
+    def _verify_token(self, token):
+        payload = jwt.decode(token, PUBLIC_CERT_BYTES)
+        assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
+        return payload
+
+    def test_refresh(self):
+        self.credentials.refresh(None)
+        assert self.credentials.valid
+        assert not self.credentials.expired
+
+    def test_expired(self):
+        assert not self.credentials.expired
+
+        self.credentials.refresh(None)
+        assert not self.credentials.expired
+
+        with mock.patch('google.auth._helpers.utcnow') as now:
+            one_day = datetime.timedelta(days=1)
+            now.return_value = self.credentials.expiry + one_day
+            assert self.credentials.expired
+
+    def test_before_request_one_time_token(self):
+        headers = {}
+
+        self.credentials.refresh(None)
+        self.credentials.before_request(
+            mock.Mock(), 'GET', 'http://example.com?a=1#3', headers)
+
+        header_value = headers['authorization']
+        _, token = header_value.split(' ')
+
+        # This should be a one-off token, so it shouldn't be the same as the
+        # credentials' stored token.
+        assert token != self.credentials.token
+
+        payload = self._verify_token(token)
+        assert payload['aud'] == 'http://example.com'
+
+    def test_before_request_with_preset_audience(self):
+        headers = {}
+
+        credentials = self.credentials.with_claims(audience=self.AUDIENCE)
+        credentials.refresh(None)
+        credentials.before_request(
+            None, 'GET', 'http://example.com?a=1#3', headers)
+
+        header_value = headers['authorization']
+        _, token = header_value.split(' ')
+
+        # Since the audience is set, it should use the existing token.
+        assert token.encode('utf-8') == credentials.token
+
+        payload = self._verify_token(token)
+        assert payload['aud'] == self.AUDIENCE
+
+    def test_before_request_refreshes(self):
+        credentials = self.credentials.with_claims(audience=self.AUDIENCE)
+        assert not credentials.valid
+        credentials.before_request(
+            None, 'GET', 'http://example.com?a=1#3', {})
+        assert credentials.valid