Create abstract Verifier and Signer, remove key_id hack from App Engine and IAM signers (#115)
diff --git a/google/auth/_service_account_info.py b/google/auth/_service_account_info.py
index 9891011..dd39ea7 100644
--- a/google/auth/_service_account_info.py
+++ b/google/auth/_service_account_info.py
@@ -51,7 +51,7 @@
'fields {}.'.format(', '.join(missing)))
# Create a signer.
- signer = crypt.Signer.from_service_account_info(data)
+ signer = crypt.RSASigner.from_service_account_info(data)
return signer
diff --git a/google/auth/app_engine.py b/google/auth/app_engine.py
index e6e84d2..6dc8712 100644
--- a/google/auth/app_engine.py
+++ b/google/auth/app_engine.py
@@ -26,6 +26,7 @@
from google.auth import _helpers
from google.auth import credentials
+from google.auth import crypt
try:
from google.appengine.api import app_identity
@@ -33,42 +34,25 @@
app_identity = None
-class Signer(object):
+class Signer(crypt.Signer):
"""Signs messages using the App Engine App Identity service.
This can be used in place of :class:`google.auth.crypt.Signer` when
running in the App Engine standard environment.
-
- .. warning::
- The App Identity service signs bytes using Google-managed keys.
- Because of this it's possible that the key used to sign bytes will
- change. In some cases this change can occur between successive calls
- to :attr:`key_id` and :meth:`sign`. This could result in a signature
- that was signed with a different key than the one indicated by
- :attr:`key_id`. It's recommended that if you use this in your code
- that you account for this behavior by building in retry logic.
"""
@property
def key_id(self):
"""Optional[str]: The key ID used to identify this private key.
- .. note::
- This makes a request to the App Identity service.
+ .. warning::
+ This is always ``None``. The key ID used by App Engine can not
+ be reliably determined ahead of time.
"""
- key_id, _ = app_identity.sign_blob(b'')
- return key_id
+ return None
- @staticmethod
- def sign(message):
- """Signs a message.
-
- Args:
- message (Union[str, bytes]): The message to be signed.
-
- Returns:
- bytes: The signature of the message.
- """
+ @_helpers.copy_docstring(crypt.Signer)
+ def sign(self, message):
message = _helpers.to_bytes(message)
_, signature = app_identity.sign_blob(message)
return signature
diff --git a/google/auth/crypt.py b/google/auth/crypt.py
index 05839b4..65bf37f 100644
--- a/google/auth/crypt.py
+++ b/google/auth/crypt.py
@@ -24,20 +24,21 @@
valid = crypt.verify_signature(message, signature, cert)
If you're going to verify many messages with the same certificate, you can use
-:class:`Verifier`::
+:class:`RSAVerifier`::
cert = open('certs.pem').read()
- verifier = crypt.Verifier.from_string(cert)
+ verifier = crypt.RSAVerifier.from_string(cert)
valid = verifier.verify(message, signature)
-To sign messages use :class:`Signer` with a private key::
+To sign messages use :class:`RSASigner` with a private key::
private_key = open('private_key.pem').read()
- signer = crypt.Signer(private_key)
+ signer = crypt.RSASigner(private_key)
signature = signer.sign(message)
"""
+import abc
import io
import json
@@ -77,23 +78,17 @@
byte_vals = bytearray()
for start in six.moves.xrange(0, num_bits, 8):
curr_bits = bit_list[start:start + 8]
- char_val = sum(val * digit
- for val, digit in six.moves.zip(_POW2, curr_bits))
+ char_val = sum(
+ val * digit for val, digit in six.moves.zip(_POW2, curr_bits))
byte_vals.append(char_val)
return bytes(byte_vals)
+@six.add_metaclass(abc.ABCMeta)
class Verifier(object):
- """This object is used to verify cryptographic signatures.
+ """Abstract base class for crytographic signature verifiers."""
- Args:
- public_key (rsa.key.PublicKey): The public key used to verify
- signatures.
- """
-
- def __init__(self, public_key):
- self._pubkey = public_key
-
+ @abc.abstractmethod
def verify(self, message, signature):
"""Verifies a message against a cryptographic signature.
@@ -105,6 +100,24 @@
bool: True if message was signed by the private key associated
with the public key that this object was constructed with.
"""
+ # pylint: disable=missing-raises-doc,redundant-returns-doc
+ # (pylint doesn't recognize that this is abstract)
+ raise NotImplementedError('Verify must be implemented')
+
+
+class RSAVerifier(Verifier):
+ """Verifies RSA cryptographic signatures using public keys.
+
+ Args:
+ public_key (rsa.key.PublicKey): The public key used to verify
+ signatures.
+ """
+
+ def __init__(self, public_key):
+ self._pubkey = public_key
+
+ @_helpers.copy_docstring(Verifier)
+ def verify(self, message, signature):
message = _helpers.to_bytes(message)
try:
return rsa.pkcs1.verify(message, signature, self._pubkey)
@@ -145,7 +158,7 @@
def verify_signature(message, signature, certs):
- """Verify a cryptographic signature.
+ """Verify an RSA cryptographic signature.
Checks that the provided ``signature`` was generated from ``bytes`` using
the private key associated with the ``cert``.
@@ -163,14 +176,38 @@
certs = [certs]
for cert in certs:
- verifier = Verifier.from_string(cert)
+ verifier = RSAVerifier.from_string(cert)
if verifier.verify(message, signature):
return True
return False
+@six.add_metaclass(abc.ABCMeta)
class Signer(object):
- """Signs messages with a private key.
+ """Abstract base class for cryptographic signers."""
+
+ @abc.abstractproperty
+ def key_id(self):
+ """Optional[str]: The key ID used to identify this private key."""
+ raise NotImplementedError('Key id must be implemented')
+
+ @abc.abstractmethod
+ def sign(self, message):
+ """Signs a message.
+
+ Args:
+ message (Union[str, bytes]): The message to be signed.
+
+ Returns:
+ bytes: The signature of the message.
+ """
+ # pylint: disable=missing-raises-doc,redundant-returns-doc
+ # (pylint doesn't recognize that this is abstract)
+ raise NotImplementedError('Sign must be implemented')
+
+
+class RSASigner(Signer):
+ """Signs messages with an RSA private key.
Args:
private_key (rsa.key.PrivateKey): The private key to sign with.
@@ -181,18 +218,15 @@
def __init__(self, private_key, key_id=None):
self._key = private_key
- self.key_id = key_id
- """Optional[str]: The key ID used to identify this private key."""
+ self._key_id = key_id
+ @property
+ @_helpers.copy_docstring(Signer)
+ def key_id(self):
+ return self._key_id
+
+ @_helpers.copy_docstring(Signer)
def sign(self, message):
- """Signs a message.
-
- Args:
- message (Union[str, bytes]): The message to be signed.
-
- Returns:
- bytes: The signature of the message.
- """
message = _helpers.to_bytes(message)
return rsa.pkcs1.sign(message, self._key, 'SHA-256')
diff --git a/google/auth/iam.py b/google/auth/iam.py
index efa3127..e091e47 100644
--- a/google/auth/iam.py
+++ b/google/auth/iam.py
@@ -25,6 +25,7 @@
from six.moves import http_client
from google.auth import _helpers
+from google.auth import crypt
from google.auth import exceptions
_IAM_API_ROOT_URI = 'https://iam.googleapis.com/v1'
@@ -32,21 +33,12 @@
_IAM_API_ROOT_URI + '/projects/-/serviceAccounts/{}:signBlob?alt=json')
-class Signer(object):
+class Signer(crypt.Signer):
"""Signs messages using the IAM `signBlob API`_.
This is useful when you need to sign bytes but do not have access to the
credential's private key file.
- .. warning::
- The IAM API signs bytes using Google-managed keys. Because of this
- it's possible that the key used to sign bytes will change. In some
- cases this change can occur between successive calls to :attr:`key_id`
- and :meth:`sign`. This could result in a signature that was signed
- with a different key than the one indicated by :attr:`key_id`. It's
- recommended that if you use this in your code that you account for
- this behavior by building in retry logic.
-
.. _signBlob API:
https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts
/signBlob
@@ -98,20 +90,13 @@
def key_id(self):
"""Optional[str]: The key ID used to identify this private key.
- .. note::
- This makes an API request to the IAM API.
+ .. warning::
+ This is always ``None``. The key ID used by IAM can not
+ be reliably determined ahead of time.
"""
- response = self._make_signing_request('')
- return response['keyId']
+ return None
+ @_helpers.copy_docstring(crypt.Signer)
def sign(self, message):
- """Signs a message.
-
- Args:
- message (Union[str, bytes]): The message to be signed.
-
- Returns:
- bytes: The signature of the message.
- """
response = self._make_signing_request(message)
return base64.b64decode(response['signature'])
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index f40ebf2..8ae545a 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -44,7 +44,7 @@
@pytest.fixture(scope='module')
def signer():
- return crypt.Signer.from_string(PRIVATE_KEY_BYTES, '1')
+ return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
class TestCredentials(object):
diff --git a/tests/test__service_account_info.py b/tests/test__service_account_info.py
index 4caea95..5466865 100644
--- a/tests/test__service_account_info.py
+++ b/tests/test__service_account_info.py
@@ -31,7 +31,7 @@
def test_from_dict():
signer = _service_account_info.from_dict(SERVICE_ACCOUNT_INFO)
- assert isinstance(signer, crypt.Signer)
+ assert isinstance(signer, crypt.RSASigner)
assert signer.key_id == SERVICE_ACCOUNT_INFO['private_key_id']
@@ -59,5 +59,5 @@
for key, value in six.iteritems(SERVICE_ACCOUNT_INFO):
assert info[key] == value
- assert isinstance(signer, crypt.Signer)
+ assert isinstance(signer, crypt.RSASigner)
assert signer.key_id == SERVICE_ACCOUNT_INFO['private_key_id']
diff --git a/tests/test_app_engine.py b/tests/test_app_engine.py
index af60bcf..d3a79d5 100644
--- a/tests/test_app_engine.py
+++ b/tests/test_app_engine.py
@@ -48,7 +48,7 @@
signer = app_engine.Signer()
- assert signer.key_id == mock.sentinel.key_id
+ assert signer.key_id is None
def test_sign(self, app_identity_mock):
app_identity_mock.sign_blob.return_value = (
diff --git a/tests/test_crypt.py b/tests/test_crypt.py
index 9671230..56612da 100644
--- a/tests/test_crypt.py
+++ b/tests/test_crypt.py
@@ -69,7 +69,7 @@
def test_verify_signature():
to_sign = b'foo'
- signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES)
+ signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES)
signature = signer.sign(to_sign)
assert crypt.verify_signature(
@@ -82,57 +82,57 @@
def test_verify_signature_failure():
to_sign = b'foo'
- signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES)
+ signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES)
signature = signer.sign(to_sign)
assert not crypt.verify_signature(
to_sign, signature, OTHER_CERT_BYTES)
-class TestVerifier(object):
+class TestRSAVerifier(object):
def test_verify_success(self):
to_sign = b'foo'
- signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES)
+ signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES)
actual_signature = signer.sign(to_sign)
- verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES)
+ verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
assert verifier.verify(to_sign, actual_signature)
def test_verify_unicode_success(self):
to_sign = u'foo'
- signer = crypt.Signer.from_string(PRIVATE_KEY_BYTES)
+ signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES)
actual_signature = signer.sign(to_sign)
- verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES)
+ verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
assert verifier.verify(to_sign, actual_signature)
def test_verify_failure(self):
- verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES)
+ verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
bad_signature1 = b''
assert not verifier.verify(b'foo', bad_signature1)
bad_signature2 = b'a'
assert not verifier.verify(b'foo', bad_signature2)
def test_from_string_pub_key(self):
- verifier = crypt.Verifier.from_string(PUBLIC_KEY_BYTES)
- assert isinstance(verifier, crypt.Verifier)
+ verifier = crypt.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
+ assert isinstance(verifier, crypt.RSAVerifier)
assert isinstance(verifier._pubkey, rsa.key.PublicKey)
def test_from_string_pub_key_unicode(self):
public_key = _helpers.from_bytes(PUBLIC_KEY_BYTES)
- verifier = crypt.Verifier.from_string(public_key)
- assert isinstance(verifier, crypt.Verifier)
+ verifier = crypt.RSAVerifier.from_string(public_key)
+ assert isinstance(verifier, crypt.RSAVerifier)
assert isinstance(verifier._pubkey, rsa.key.PublicKey)
def test_from_string_pub_cert(self):
- verifier = crypt.Verifier.from_string(PUBLIC_CERT_BYTES)
- assert isinstance(verifier, crypt.Verifier)
+ verifier = crypt.RSAVerifier.from_string(PUBLIC_CERT_BYTES)
+ assert isinstance(verifier, crypt.RSAVerifier)
assert isinstance(verifier._pubkey, rsa.key.PublicKey)
def test_from_string_pub_cert_unicode(self):
public_cert = _helpers.from_bytes(PUBLIC_CERT_BYTES)
- verifier = crypt.Verifier.from_string(public_cert)
- assert isinstance(verifier, crypt.Verifier)
+ verifier = crypt.RSAVerifier.from_string(public_cert)
+ assert isinstance(verifier, crypt.RSAVerifier)
assert isinstance(verifier._pubkey, rsa.key.PublicKey)
def test_from_string_pub_cert_failure(self):
@@ -144,25 +144,25 @@
with load_pem_patch as load_pem:
with pytest.raises(ValueError):
- crypt.Verifier.from_string(cert_bytes)
+ crypt.RSAVerifier.from_string(cert_bytes)
load_pem.assert_called_once_with(cert_bytes, 'CERTIFICATE')
-class TestSigner(object):
+class TestRSASigner(object):
def test_from_string_pkcs1(self):
- signer = crypt.Signer.from_string(PKCS1_KEY_BYTES)
- assert isinstance(signer, crypt.Signer)
+ signer = crypt.RSASigner.from_string(PKCS1_KEY_BYTES)
+ assert isinstance(signer, crypt.RSASigner)
assert isinstance(signer._key, rsa.key.PrivateKey)
def test_from_string_pkcs1_unicode(self):
key_bytes = _helpers.from_bytes(PKCS1_KEY_BYTES)
- signer = crypt.Signer.from_string(key_bytes)
- assert isinstance(signer, crypt.Signer)
+ signer = crypt.RSASigner.from_string(key_bytes)
+ assert isinstance(signer, crypt.RSASigner)
assert isinstance(signer._key, rsa.key.PrivateKey)
def test_from_string_pkcs8(self):
- signer = crypt.Signer.from_string(PKCS8_KEY_BYTES)
- assert isinstance(signer, crypt.Signer)
+ signer = crypt.RSASigner.from_string(PKCS8_KEY_BYTES)
+ assert isinstance(signer, crypt.RSASigner)
assert isinstance(signer._key, rsa.key.PrivateKey)
def test_from_string_pkcs8_extra_bytes(self):
@@ -179,28 +179,29 @@
with decode_patch as decode:
with pytest.raises(ValueError):
- crypt.Signer.from_string(key_bytes)
+ crypt.RSASigner.from_string(key_bytes)
# Verify mock was called.
decode.assert_called_once_with(
pem_bytes, asn1Spec=crypt._PKCS8_SPEC)
def test_from_string_pkcs8_unicode(self):
key_bytes = _helpers.from_bytes(PKCS8_KEY_BYTES)
- signer = crypt.Signer.from_string(key_bytes)
- assert isinstance(signer, crypt.Signer)
+ signer = crypt.RSASigner.from_string(key_bytes)
+ assert isinstance(signer, crypt.RSASigner)
assert isinstance(signer._key, rsa.key.PrivateKey)
def test_from_string_pkcs12(self):
with pytest.raises(ValueError):
- crypt.Signer.from_string(PKCS12_KEY_BYTES)
+ crypt.RSASigner.from_string(PKCS12_KEY_BYTES)
def test_from_string_bogus_key(self):
key_bytes = 'bogus-key'
with pytest.raises(ValueError):
- crypt.Signer.from_string(key_bytes)
+ crypt.RSASigner.from_string(key_bytes)
def test_from_service_account_info(self):
- signer = crypt.Signer.from_service_account_info(SERVICE_ACCOUNT_INFO)
+ signer = crypt.RSASigner.from_service_account_info(
+ SERVICE_ACCOUNT_INFO)
assert signer.key_id == SERVICE_ACCOUNT_INFO[
crypt._JSON_FILE_PRIVATE_KEY_ID]
@@ -208,12 +209,12 @@
def test_from_service_account_info_missing_key(self):
with pytest.raises(ValueError) as excinfo:
- crypt.Signer.from_service_account_info({})
+ crypt.RSASigner.from_service_account_info({})
assert excinfo.match(crypt._JSON_FILE_PRIVATE_KEY)
def test_from_service_account_file(self):
- signer = crypt.Signer.from_service_account_file(
+ signer = crypt.RSASigner.from_service_account_file(
SERVICE_ACCOUNT_JSON_FILE)
assert signer.key_id == SERVICE_ACCOUNT_INFO[
diff --git a/tests/test_iam.py b/tests/test_iam.py
index 5ac9911..f776788 100644
--- a/tests/test_iam.py
+++ b/tests/test_iam.py
@@ -64,16 +64,12 @@
mock.sentinel.service_account_email)
def test_key_id(self):
- key_id = '123'
- request = make_request(http_client.OK, data={'keyId': key_id})
- credentials = make_credentials()
-
signer = iam.Signer(
- request, credentials, mock.sentinel.service_account_email)
+ mock.sentinel.request,
+ mock.sentinel.credentials,
+ mock.sentinel.service_account_email)
- assert signer.key_id == '123'
- auth_header = request.call_args[1]['headers']['authorization']
- assert auth_header == 'Bearer token'
+ assert signer.key_id is None
def test_sign_bytes(self):
signature = b'DEADBEEF'
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index e4a9a0a..df09ece 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -44,7 +44,7 @@
@pytest.fixture
def signer():
- return crypt.Signer.from_string(PRIVATE_KEY_BYTES, '1')
+ return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
def test_encode_basic(signer):
@@ -78,7 +78,7 @@
# False is specified to remove the signer's key id for testing
# headers without key ids.
if key_id is False:
- signer.key_id = None
+ signer._key_id = None
key_id = None
return jwt.encode(signer, payload, key_id=key_id)
@@ -265,7 +265,7 @@
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
def test_signer(self):
- assert isinstance(self.credentials.signer, crypt.Signer)
+ assert isinstance(self.credentials.signer, crypt.RSASigner)
def test_signer_email(self):
assert (self.credentials.signer_email ==