feat: Implement ES256 for JWT verification (#340)
feat: Implement ES256 for JWT verification
diff --git a/google/auth/jwt.py b/google/auth/jwt.py
index cdd69ac..9248eb2 100644
--- a/google/auth/jwt.py
+++ b/google/auth/jwt.py
@@ -59,8 +59,18 @@
from google.auth import exceptions
import google.auth.credentials
+try:
+ from google.auth.crypt import es256
+except ImportError: # pragma: NO COVER
+ es256 = None
+
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_DEFAULT_MAX_CACHE_SIZE = 10
+_ALGORITHM_TO_VERIFIER_CLASS = {"RS256": crypt.RSAVerifier}
+_CRYPTOGRAPHY_BASED_ALGORITHMS = set(["ES256"])
+
+if es256 is not None: # pragma: NO COVER
+ _ALGORITHM_TO_VERIFIER_CLASS["ES256"] = es256.ES256Verifier
def encode(signer, payload, header=None, key_id=None):
@@ -83,7 +93,12 @@
if key_id is None:
key_id = signer.key_id
- header.update({"typ": "JWT", "alg": "RS256"})
+ header.update({"typ": "JWT"})
+
+ if es256 is not None and isinstance(signer, es256.ES256Signer):
+ header.update({"alg": "ES256"})
+ else:
+ header.update({"alg": "RS256"})
if key_id is not None:
header["kid"] = key_id
@@ -217,10 +232,30 @@
if not verify:
return payload
+ # Pluck the key id and algorithm from the header and make sure we have
+ # a verifier that can support it.
+ key_alg = header.get("alg")
+ key_id = header.get("kid")
+
+ try:
+ verifier_cls = _ALGORITHM_TO_VERIFIER_CLASS[key_alg]
+ except KeyError as exc:
+ if key_alg in _CRYPTOGRAPHY_BASED_ALGORITHMS:
+ six.raise_from(
+ ValueError(
+ "The key algorithm {} requires the cryptography package "
+ "to be installed.".format(key_alg)
+ ),
+ exc,
+ )
+ else:
+ six.raise_from(
+ ValueError("Unsupported signature algorithm {}".format(key_alg)), exc
+ )
+
# If certs is specified as a dictionary of key IDs to certificates, then
# use the certificate identified by the key ID in the token header.
if isinstance(certs, Mapping):
- key_id = header.get("kid")
if key_id:
if key_id not in certs:
raise ValueError("Certificate for key id {} not found.".format(key_id))
@@ -232,7 +267,9 @@
certs_to_check = certs
# Verify that the signature matches the message.
- if not crypt.verify_signature(signed_section, signature, certs_to_check):
+ if not crypt.verify_signature(
+ signed_section, signature, certs_to_check, verifier_cls
+ ):
raise ValueError("Could not verify token signature.")
# Verify the issued at and created times in the payload.