feat: workload identity federation support (#686)

Using workload identity federation, applications can access Google Cloud resources from Amazon Web Services (AWS), Microsoft Azure or any identity provider that supports OpenID Connect (OIDC). Workload identity federation is recommended for non-Google Cloud environments as it avoids the need to download, manage and store service account private keys locally.
diff --git a/tests/test__default.py b/tests/test__default.py
index 74511f9..ef6cb78 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -20,10 +20,13 @@
 
 from google.auth import _default
 from google.auth import app_engine
+from google.auth import aws
 from google.auth import compute_engine
 from google.auth import credentials
 from google.auth import environment_vars
 from google.auth import exceptions
+from google.auth import external_account
+from google.auth import identity_pool
 from google.oauth2 import service_account
 import google.oauth2.credentials
 
@@ -49,6 +52,34 @@
 with open(SERVICE_ACCOUNT_FILE) as fh:
     SERVICE_ACCOUNT_FILE_DATA = json.load(fh)
 
+SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
+TOKEN_URL = "https://sts.googleapis.com/v1/token"
+AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
+REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
+SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
+CRED_VERIFICATION_URL = (
+    "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
+)
+IDENTITY_POOL_DATA = {
+    "type": "external_account",
+    "audience": AUDIENCE,
+    "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
+    "token_url": TOKEN_URL,
+    "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
+}
+AWS_DATA = {
+    "type": "external_account",
+    "audience": AUDIENCE,
+    "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
+    "token_url": TOKEN_URL,
+    "credential_source": {
+        "environment_id": "aws1",
+        "region_url": REGION_URL,
+        "url": SECURITY_CREDS_URL,
+        "regional_cred_verification_url": CRED_VERIFICATION_URL,
+    },
+}
+
 MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
 MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS
 
@@ -57,6 +88,12 @@
     return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
     autospec=True,
 )
+EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH = mock.patch.object(
+    external_account.Credentials,
+    "get_project_id",
+    return_value=mock.sentinel.project_id,
+    autospec=True,
+)
 
 
 def test_load_credentials_from_missing_file():
@@ -185,6 +222,92 @@
     assert excinfo.match(r"missing fields")
 
 
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_identity_pool(
+    get_project_id, tmpdir
+):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(IDENTITY_POOL_DATA))
+    credentials, project_id = _default.load_credentials_from_file(str(config_file))
+
+    assert isinstance(credentials, identity_pool.Credentials)
+    assert project_id is mock.sentinel.project_id
+    assert get_project_id.called
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_aws(get_project_id, tmpdir):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(AWS_DATA))
+    credentials, project_id = _default.load_credentials_from_file(str(config_file))
+
+    assert isinstance(credentials, aws.Credentials)
+    assert project_id is mock.sentinel.project_id
+    assert get_project_id.called
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_with_user_and_default_scopes(
+    get_project_id, tmpdir
+):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(IDENTITY_POOL_DATA))
+    credentials, project_id = _default.load_credentials_from_file(
+        str(config_file),
+        scopes=["https://www.google.com/calendar/feeds"],
+        default_scopes=["https://www.googleapis.com/auth/cloud-platform"],
+    )
+
+    assert isinstance(credentials, identity_pool.Credentials)
+    assert project_id is mock.sentinel.project_id
+    assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
+    assert credentials.default_scopes == [
+        "https://www.googleapis.com/auth/cloud-platform"
+    ]
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_with_quota_project(
+    get_project_id, tmpdir
+):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(IDENTITY_POOL_DATA))
+    credentials, project_id = _default.load_credentials_from_file(
+        str(config_file), quota_project_id="project-foo"
+    )
+
+    assert isinstance(credentials, identity_pool.Credentials)
+    assert project_id is mock.sentinel.project_id
+    assert credentials.quota_project_id == "project-foo"
+
+
+def test_load_credentials_from_file_external_account_bad_format(tmpdir):
+    filename = tmpdir.join("external_account_bad.json")
+    filename.write(json.dumps({"type": "external_account"}))
+
+    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
+        _default.load_credentials_from_file(str(filename))
+
+    assert excinfo.match(
+        "Failed to load external account credentials from {}".format(str(filename))
+    )
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_explicit_request(
+    get_project_id, tmpdir
+):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(IDENTITY_POOL_DATA))
+    credentials, project_id = _default.load_credentials_from_file(
+        str(config_file), request=mock.sentinel.request
+    )
+
+    assert isinstance(credentials, identity_pool.Credentials)
+    assert project_id is mock.sentinel.project_id
+    get_project_id.assert_called_with(credentials, request=mock.sentinel.request)
+
+
 @mock.patch.dict(os.environ, {}, clear=True)
 def test__get_explicit_environ_credentials_no_env():
     assert _default._get_explicit_environ_credentials() == (None, None)
@@ -198,7 +321,34 @@
 
     assert credentials is MOCK_CREDENTIALS
     assert project_id is mock.sentinel.project_id
-    load.assert_called_with("filename")
+    load.assert_called_with(
+        "filename",
+        scopes=None,
+        default_scopes=None,
+        quota_project_id=None,
+        request=None,
+    )
+
+
+@LOAD_FILE_PATCH
+def test__get_explicit_environ_credentials_with_scopes_and_request(load, monkeypatch):
+    scopes = ["one", "two"]
+    monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
+
+    credentials, project_id = _default._get_explicit_environ_credentials(
+        request=mock.sentinel.request, scopes=scopes
+    )
+
+    assert credentials is MOCK_CREDENTIALS
+    assert project_id is mock.sentinel.project_id
+    # Request and scopes should be propagated.
+    load.assert_called_with(
+        "filename",
+        scopes=scopes,
+        default_scopes=None,
+        quota_project_id=None,
+        request=mock.sentinel.request,
+    )
 
 
 @LOAD_FILE_PATCH
@@ -503,3 +653,70 @@
         sys.modules["google.auth.compute_engine"] = None
         sys.modules["google.auth.app_engine"] = None
         assert _default.default() == (MOCK_CREDENTIALS, mock.sentinel.project_id)
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_default_environ_external_credentials(get_project_id, monkeypatch, tmpdir):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(IDENTITY_POOL_DATA))
+    monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
+
+    credentials, project_id = _default.default()
+
+    assert isinstance(credentials, identity_pool.Credentials)
+    assert project_id is mock.sentinel.project_id
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_default_environ_external_credentials_with_user_and_default_scopes_and_quota_project_id(
+    get_project_id, monkeypatch, tmpdir
+):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(IDENTITY_POOL_DATA))
+    monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
+
+    credentials, project_id = _default.default(
+        scopes=["https://www.google.com/calendar/feeds"],
+        default_scopes=["https://www.googleapis.com/auth/cloud-platform"],
+        quota_project_id="project-foo",
+    )
+
+    assert isinstance(credentials, identity_pool.Credentials)
+    assert project_id is mock.sentinel.project_id
+    assert credentials.quota_project_id == "project-foo"
+    assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
+    assert credentials.default_scopes == [
+        "https://www.googleapis.com/auth/cloud-platform"
+    ]
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_default_environ_external_credentials_explicit_request(
+    get_project_id, monkeypatch, tmpdir
+):
+    config_file = tmpdir.join("config.json")
+    config_file.write(json.dumps(IDENTITY_POOL_DATA))
+    monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
+
+    credentials, project_id = _default.default(request=mock.sentinel.request)
+
+    assert isinstance(credentials, identity_pool.Credentials)
+    assert project_id is mock.sentinel.project_id
+    # default() will initialize new credentials via with_scopes_if_required
+    # and potentially with_quota_project.
+    # As a result the instance get_project_id() was called on will not be
+    # the returned credentials object, hence mock.ANY below.
+    get_project_id.assert_called_with(mock.ANY, request=mock.sentinel.request)
+
+
+def test_default_environ_external_credentials_bad_format(monkeypatch, tmpdir):
+    filename = tmpdir.join("external_account_bad.json")
+    filename.write(json.dumps({"type": "external_account"}))
+    monkeypatch.setenv(environment_vars.CREDENTIALS, str(filename))
+
+    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
+        _default.default()
+
+    assert excinfo.match(
+        "Failed to load external account credentials from {}".format(str(filename))
+    )