feat: define `google.auth.downscoped.Credentials` class (#801)

* feat: define `google.auth.downscoped.Credentials` class

This is based on [Downscoping with Credential Access Boundaries](https://cloud.google.com/iam/docs/downscoping-short-lived-credentials).
The new credentials are initialized mainly using elevated source
credentials and a `google.auth.downscoped.CredentialAccessBoundary`
instance.
The credentials will then get access tokens from the source
credentials and exchange them for downscoped access tokens via the
GCP STS token exchange endpoint, using the provided credential
access boundary rules.

The new credentials will inherit the source credentials' scopes,
but the scopes are not exposed as we cannot always determine the
scopes from the source credentials.

* Fixes typos in comments.

* Addresses review comments.

* Moves all constants in the test file to module scope.
diff --git a/tests/test_downscoped.py b/tests/test_downscoped.py
index 61b1d18..ac60e5b 100644
--- a/tests/test_downscoped.py
+++ b/tests/test_downscoped.py
@@ -12,9 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pytest
+import datetime
+import json
 
+import mock
+import pytest
+from six.moves import http_client
+from six.moves import urllib
+
+from google.auth import _helpers
+from google.auth import credentials
 from google.auth import downscoped
+from google.auth import exceptions
+from google.auth import transport
 
 
 EXPRESSION = (
@@ -36,6 +46,54 @@
 )
 OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
 OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
+QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
+GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
+REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
+SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+SUCCESS_RESPONSE = {  # Payload served by mock requests that reply with OK.
+    "access_token": "ACCESS_TOKEN",
+    "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
+    "token_type": "Bearer",
+    "expires_in": 3600,  # Tests treat this value as a lifetime in seconds.
+}
+ERROR_RESPONSE = {  # Payload served by mock requests replying BAD_REQUEST.
+    "error": "invalid_grant",
+    "error_description": "Subject token is invalid.",
+    "error_uri": "https://tools.ietf.org/html/rfc6749",
+}
+CREDENTIAL_ACCESS_BOUNDARY_JSON = {  # Expected "options" field, pre-encoding.
+    "accessBoundary": {
+        "accessBoundaryRules": [
+            {
+                "availablePermissions": AVAILABLE_PERMISSIONS,
+                "availableResource": AVAILABLE_RESOURCE,
+                "availabilityCondition": {
+                    "expression": EXPRESSION,
+                    "title": TITLE,
+                    "description": DESCRIPTION,
+                },
+            }
+        ]
+    }
+}
+
+
+class SourceCredentials(credentials.Credentials):
+    """Fake source credentials issuing a numbered access token per refresh."""
+
+    def __init__(self, raise_error=False):
+        super(SourceCredentials, self).__init__()
+        self._raise_error, self._counter = raise_error, 0
+
+    def refresh(self, request):
+        if self._raise_error:
+            message = "Failed to refresh access token in source credentials."
+            raise exceptions.RefreshError(message)
+        issued_at = _helpers.utcnow()
+        self._counter += 1
+        self.token = "ACCESS_TOKEN_{}".format(self._counter)
+        self.expiry = issued_at + datetime.timedelta(seconds=3600)
 
 
 def make_availability_condition(expression, title=None, description=None):
@@ -383,3 +441,212 @@
                 ]
             }
         }
+
+
+class TestCredentials(object):
+    """Tests for ``downscoped.Credentials`` using a mocked token endpoint."""
+
+    @staticmethod
+    def make_credentials(source_credentials=None, quota_project_id=None):
+        """Builds downscoped credentials bound to the module-level test rule."""
+        # A default instance in the signature would be shared by every test.
+        if source_credentials is None:
+            source_credentials = SourceCredentials()
+        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
+        rule = make_access_boundary_rule(
+            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
+        )
+        boundary = make_credential_access_boundary([rule])
+        return downscoped.Credentials(source_credentials, boundary, quota_project_id)
+
+    @staticmethod
+    def make_mock_request(data, status=http_client.OK):
+        """Returns a mock transport request replying with JSON-encoded ``data``."""
+        response = mock.create_autospec(transport.Response, instance=True)
+        response.status = status
+        response.data = json.dumps(data).encode("utf-8")
+
+        request = mock.create_autospec(transport.Request)
+        request.return_value = response
+
+        return request
+
+    @staticmethod
+    def assert_request_kwargs(request_kwargs, headers, request_data):
+        """Asserts the request was called with the expected parameters."""
+        assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
+        assert request_kwargs["method"] == "POST"
+        assert request_kwargs["headers"] == headers
+        assert request_kwargs["body"] is not None
+        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
+        # Compare whole pair lists so duplicate or unexpected keys fail cleanly.
+        decoded = [(k.decode("utf-8"), v.decode("utf-8")) for k, v in body_tuples]
+        assert sorted(decoded) == sorted(request_data.items())
+
+    def test_default_state(self):
+        credentials = self.make_credentials()
+
+        # No token acquired yet.
+        assert not credentials.token
+        assert not credentials.valid
+        # Expiration hasn't been set yet.
+        assert not credentials.expiry
+        assert not credentials.expired
+        # No quota project ID set.
+        assert not credentials.quota_project_id
+
+    def test_with_quota_project(self):
+        credentials = self.make_credentials()
+
+        assert not credentials.quota_project_id
+
+        quota_project_creds = credentials.with_quota_project("project-foo")
+
+        assert quota_project_creds.quota_project_id == "project-foo"
+
+    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+    def test_refresh(self, unused_utcnow):
+        response = SUCCESS_RESPONSE.copy()
+        # Test custom expiration to confirm expiry is set correctly.
+        response["expires_in"] = 2800
+        expected_expiry = datetime.datetime.min + datetime.timedelta(
+            seconds=response["expires_in"]
+        )
+        headers = {"Content-Type": "application/x-www-form-urlencoded"}
+        request_data = {
+            "grant_type": GRANT_TYPE,
+            "subject_token": "ACCESS_TOKEN_1",
+            "subject_token_type": SUBJECT_TOKEN_TYPE,
+            "requested_token_type": REQUESTED_TOKEN_TYPE,
+            "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
+        }
+        request = self.make_mock_request(status=http_client.OK, data=response)
+        source_credentials = SourceCredentials()
+        credentials = self.make_credentials(source_credentials=source_credentials)
+
+        # Spy on calls to source credentials refresh to confirm the expected request
+        # instance is used.
+        with mock.patch.object(
+            source_credentials, "refresh", wraps=source_credentials.refresh
+        ) as wrapped_source_cred_refresh:
+            credentials.refresh(request)
+
+            self.assert_request_kwargs(request.call_args[1], headers, request_data)
+            assert credentials.valid
+            assert credentials.expiry == expected_expiry
+            assert not credentials.expired
+            assert credentials.token == response["access_token"]
+            # Confirm source credentials called with the same request instance.
+            wrapped_source_cred_refresh.assert_called_with(request)
+
+    def test_refresh_token_exchange_error(self):
+        request = self.make_mock_request(
+            status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
+        )
+        credentials = self.make_credentials()
+
+        with pytest.raises(exceptions.OAuthError) as excinfo:
+            credentials.refresh(request)
+
+        assert excinfo.match(
+            r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
+        )
+        assert not credentials.expired
+        assert credentials.token is None
+
+    def test_refresh_source_credentials_refresh_error(self):
+        # Initialize downscoped credentials with source credentials that raise
+        # an error on refresh.
+        credentials = self.make_credentials(
+            source_credentials=SourceCredentials(raise_error=True)
+        )
+
+        with pytest.raises(exceptions.RefreshError) as excinfo:
+            credentials.refresh(mock.sentinel.request)
+
+        assert excinfo.match(r"Failed to refresh access token in source credentials.")
+        assert not credentials.expired
+        assert credentials.token is None
+
+    def test_apply_without_quota_project_id(self):
+        headers = {}
+        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
+        credentials = self.make_credentials()
+
+        credentials.refresh(request)
+        credentials.apply(headers)
+
+        assert headers == {
+            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
+        }
+
+    def test_apply_with_quota_project_id(self):
+        headers = {"other": "header-value"}
+        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
+        credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)
+
+        credentials.refresh(request)
+        credentials.apply(headers)
+
+        assert headers == {
+            "other": "header-value",
+            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
+            "x-goog-user-project": QUOTA_PROJECT_ID,
+        }
+
+    def test_before_request(self):
+        headers = {"other": "header-value"}
+        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
+        credentials = self.make_credentials()
+
+        # First call should call refresh, setting the token.
+        credentials.before_request(request, "POST", "https://example.com/api", headers)
+
+        assert headers == {
+            "other": "header-value",
+            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
+        }
+
+        # Second call shouldn't call refresh (request should be untouched).
+        credentials.before_request(
+            mock.sentinel.request, "POST", "https://example.com/api", headers
+        )
+
+        assert headers == {
+            "other": "header-value",
+            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
+        }
+
+    @mock.patch("google.auth._helpers.utcnow")
+    def test_before_request_expired(self, utcnow):
+        headers = {}
+        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
+        credentials = self.make_credentials()
+        credentials.token = "token"
+        utcnow.return_value = datetime.datetime.min
+        # Set the expiration to one second more than now plus the clock skew
+        # accommodation. These credentials should be valid.
+        credentials.expiry = (
+            datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
+        )
+
+        assert credentials.valid
+        assert not credentials.expired
+
+        credentials.before_request(request, "POST", "https://example.com/api", headers)
+
+        # Cached token should be used.
+        assert headers == {"authorization": "Bearer token"}
+
+        # Next call should simulate 1 second passed.
+        utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
+
+        assert not credentials.valid
+        assert credentials.expired
+
+        credentials.before_request(request, "POST", "https://example.com/api", headers)
+
+        # New token should be retrieved.
+        assert headers == {
+            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
+        }