Add support for impersonated_credentials.Sign, IDToken (#348)
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 68a2af8..9945401 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -35,6 +35,14 @@
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+ID_TOKEN_DATA = ('eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew'
+ 'Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc'
+ 'zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle'
+ 'HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L'
+ 'y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN'
+ 'zA4NTY4In0.redacted')
+ID_TOKEN_EXPIRY = 1564475051
+
with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
@@ -52,6 +60,38 @@
yield grant
+class MockResponse:
+ def __init__(self, json_data, status_code):
+ self.json_data = json_data
+ self.status_code = status_code
+
+ def json(self):
+ return self.json_data
+
+
+@pytest.fixture
+def mock_authorizedsession_sign():
+ with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
+ autospec=True) as auth_session:
+ data = {
+ "keyId": "1",
+ "signedBlob": "c2lnbmF0dXJl"
+ }
+ auth_session.return_value = MockResponse(data, http_client.OK)
+ yield auth_session
+
+
+@pytest.fixture
+def mock_authorizedsession_idtoken():
+ with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
+ autospec=True) as auth_session:
+ data = {
+ "token": ID_TOKEN_DATA
+ }
+ auth_session.return_value = MockResponse(data, http_client.OK)
+ yield auth_session
+
+
class TestImpersonatedCredentials(object):
SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
@@ -62,10 +102,12 @@
SOURCE_CREDENTIALS = service_account.Credentials(
SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)
- def make_credentials(self, lifetime=LIFETIME):
+ def make_credentials(self, lifetime=LIFETIME,
+ target_principal=TARGET_PRINCIPAL):
+
return Credentials(
source_credentials=self.SOURCE_CREDENTIALS,
- target_principal=self.TARGET_PRINCIPAL,
+ target_principal=target_principal,
target_scopes=self.TARGET_SCOPES,
delegates=self.DELEGATES,
lifetime=lifetime)
@@ -176,3 +218,180 @@
def test_expired(self):
credentials = self.make_credentials(lifetime=None)
assert credentials.expired
+
+ def test_signer(self):
+ credentials = self.make_credentials()
+ assert isinstance(credentials.signer,
+ impersonated_credentials.Credentials)
+
+ def test_signer_email(self):
+ credentials = self.make_credentials(
+ target_principal=self.TARGET_PRINCIPAL)
+ assert credentials.signer_email == self.TARGET_PRINCIPAL
+
+ def test_service_account_email(self):
+ credentials = self.make_credentials(
+ target_principal=self.TARGET_PRINCIPAL)
+ assert credentials.service_account_email == self.TARGET_PRINCIPAL
+
+ def test_sign_bytes(self, mock_donor_credentials,
+ mock_authorizedsession_sign):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ token_response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ response = mock.create_autospec(transport.Response, instance=False)
+ response.status = http_client.OK
+ response.data = _helpers.to_bytes(json.dumps(token_response_body))
+
+ request = mock.create_autospec(transport.Request, instance=False)
+ request.return_value = response
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ signature = credentials.sign_bytes(b'signed bytes')
+ assert signature == b'signature'
+
+ def test_id_token_success(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials, target_audience=target_audience)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(
+ ID_TOKEN_EXPIRY)
+
+ def test_id_token_from_credential(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials, target_audience=target_audience)
+ id_creds = id_creds.from_credentials(target_credentials=credentials)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA
+
+ def test_id_token_with_target_audience(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials)
+ id_creds = id_creds.with_target_audience(
+ target_audience=target_audience)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(
+ ID_TOKEN_EXPIRY)
+
+ def test_id_token_invalid_cred(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = None
+
+ with pytest.raises(exceptions.GoogleAuthError) as excinfo:
+ impersonated_credentials.IDTokenCredentials(credentials)
+
+ assert excinfo.match('Provided Credential must be'
+ ' impersonated_credentials')
+
+ def test_id_token_with_include_email(self, mock_donor_credentials,
+ mock_authorizedsession_idtoken):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+ target_audience = 'https://foo.bar'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ id_creds = impersonated_credentials.IDTokenCredentials(
+ credentials, target_audience=target_audience)
+ id_creds = id_creds.with_include_email(True)
+ id_creds.refresh(request)
+
+ assert id_creds.token == ID_TOKEN_DATA