feat: allow scopes for self signed jwt (#776)

* feat: allow scopes for self signed jwt

* Update service_account.py

* add http changes

* Update google/auth/jwt.py
diff --git a/google/auth/jwt.py b/google/auth/jwt.py
index 892f3a8..e9f4f69 100644
--- a/google/auth/jwt.py
+++ b/google/auth/jwt.py
@@ -525,8 +525,9 @@
             "sub": self._subject,
             "iat": _helpers.datetime_to_secs(now),
             "exp": _helpers.datetime_to_secs(expiry),
-            "aud": self._audience,
         }
+        if self._audience:
+            payload["aud"] = self._audience
 
         payload.update(self._additional_claims)
 
diff --git a/google/auth/transport/grpc.py b/google/auth/transport/grpc.py
index 04c0f4f..c47cb3d 100644
--- a/google/auth/transport/grpc.py
+++ b/google/auth/transport/grpc.py
@@ -79,12 +79,9 @@
         # Attempt to use self-signed JWTs when a service account is used.
         # A default host must be explicitly provided since it cannot always
         # be determined from the context.service_url.
-        if (
-            isinstance(self._credentials, service_account.Credentials)
-            and self._default_host
-        ):
+        if isinstance(self._credentials, service_account.Credentials):
             self._credentials._create_self_signed_jwt(
-                "https://{}/".format(self._default_host)
+                "https://{}/".format(self._default_host) if self._default_host else None
             )
 
         self._credentials.before_request(
diff --git a/google/auth/transport/requests.py b/google/auth/transport/requests.py
index d317544..a4784b3 100644
--- a/google/auth/transport/requests.py
+++ b/google/auth/transport/requests.py
@@ -358,13 +358,9 @@
 
         # https://google.aip.dev/auth/4111
         # Attempt to use self-signed JWTs when a service account is used.
-        # A default host must be explicitly provided.
-        if (
-            isinstance(self.credentials, service_account.Credentials)
-            and self._default_host
-        ):
+        if isinstance(self.credentials, service_account.Credentials):
             self.credentials._create_self_signed_jwt(
-                "https://{}/".format(self._default_host)
+                "https://{}/".format(self._default_host) if self._default_host else None
             )
 
     def configure_mtls_channel(self, client_cert_callback=None):
diff --git a/google/auth/transport/urllib3.py b/google/auth/transport/urllib3.py
index aadd116..6a2504d 100644
--- a/google/auth/transport/urllib3.py
+++ b/google/auth/transport/urllib3.py
@@ -293,13 +293,9 @@
 
         # https://google.aip.dev/auth/4111
         # Attempt to use self-signed JWTs when a service account is used.
-        # A default host must be explicitly provided.
-        if (
-            isinstance(self.credentials, service_account.Credentials)
-            and self._default_host
-        ):
+        if isinstance(self.credentials, service_account.Credentials):
             self.credentials._create_self_signed_jwt(
-                "https://{}/".format(self._default_host)
+                "https://{}/".format(self._default_host) if self._default_host else None
             )
 
         super(AuthorizedHttp, self).__init__()
diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py
index 1ccfa19..dd36589 100644
--- a/google/oauth2/service_account.py
+++ b/google/oauth2/service_account.py
@@ -131,6 +131,7 @@
         project_id=None,
         quota_project_id=None,
         additional_claims=None,
+        always_use_jwt_access=False,
     ):
         """
         Args:
@@ -149,6 +150,8 @@
                 billing.
             additional_claims (Mapping[str, str]): Any additional claims for
                 the JWT assertion used in the authorization grant.
+            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
+                be always used.
 
         .. note:: Typically one of the helper constructors
             :meth:`from_service_account_file` or
@@ -165,6 +168,7 @@
         self._project_id = project_id
         self._quota_project_id = quota_project_id
         self._token_uri = token_uri
+        self._always_use_jwt_access = always_use_jwt_access
 
         self._jwt_credentials = None
 
@@ -266,6 +270,30 @@
             project_id=self._project_id,
             quota_project_id=self._quota_project_id,
             additional_claims=self._additional_claims.copy(),
+            always_use_jwt_access=self._always_use_jwt_access,
+        )
+
+    def with_always_use_jwt_access(self, always_use_jwt_access):
+        """Create a copy of these credentials with the specified always_use_jwt_access value.
+
+        Args:
+            always_use_jwt_access (bool): Whether always use self signed JWT or not.
+
+        Returns:
+            google.auth.service_account.Credentials: A new credentials
+                instance.
+        """
+        return self.__class__(
+            self._signer,
+            service_account_email=self._service_account_email,
+            scopes=self._scopes,
+            default_scopes=self._default_scopes,
+            token_uri=self._token_uri,
+            subject=self._subject,
+            project_id=self._project_id,
+            quota_project_id=self._quota_project_id,
+            additional_claims=self._additional_claims.copy(),
+            always_use_jwt_access=always_use_jwt_access,
         )
 
     def with_subject(self, subject):
@@ -288,6 +316,7 @@
             project_id=self._project_id,
             quota_project_id=self._quota_project_id,
             additional_claims=self._additional_claims.copy(),
+            always_use_jwt_access=self._always_use_jwt_access,
         )
 
     def with_claims(self, additional_claims):
@@ -315,6 +344,7 @@
             project_id=self._project_id,
             quota_project_id=self._quota_project_id,
             additional_claims=new_additional_claims,
+            always_use_jwt_access=self._always_use_jwt_access,
         )
 
     @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
@@ -330,6 +360,7 @@
             project_id=self._project_id,
             quota_project_id=quota_project_id,
             additional_claims=self._additional_claims.copy(),
+            always_use_jwt_access=self._always_use_jwt_access,
         )
 
     def _make_authorization_grant_assertion(self):
@@ -386,8 +417,22 @@
             audience (str): The service URL. ``https://[API_ENDPOINT]/``
         """
         # https://google.aip.dev/auth/4111
-        # If the user has not defined scopes, create a self-signed jwt
-        if not self.scopes:
+        if self._always_use_jwt_access:
+            if self._scopes:
+                self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+                    self, None, additional_claims={"scope": " ".join(self._scopes)}
+                )
+            elif audience:
+                self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+                    self, audience
+                )
+            elif self._default_scopes:
+                self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+                    self,
+                    None,
+                    additional_claims={"scope": " ".join(self._default_scopes)},
+                )
+        elif not self._scopes and audience:
             self._jwt_credentials = jwt.Credentials.from_signing_credentials(
                 self, audience
             )
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 648541e..5852d37 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -155,6 +155,13 @@
         new_credentials.apply(hdrs, token="tok")
         assert "x-goog-user-project" in hdrs
 
+    def test__with_always_use_jwt_access(self):
+        credentials = self.make_credentials()
+        assert not credentials._always_use_jwt_access
+
+        new_credentials = credentials.with_always_use_jwt_access(True)
+        assert new_credentials._always_use_jwt_access
+
     def test__make_authorization_grant_assertion(self):
         credentials = self.make_credentials()
         token = credentials._make_authorization_grant_assertion()
@@ -225,6 +232,65 @@
         # JWT should not be created if there are user-defined scopes
         jwt.from_signing_credentials.assert_not_called()
 
+    @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+    def test__create_self_signed_jwt_always_use_jwt_access_with_audience(self, jwt):
+        credentials = service_account.Credentials(
+            SIGNER,
+            self.SERVICE_ACCOUNT_EMAIL,
+            self.TOKEN_URI,
+            default_scopes=["bar", "foo"],
+            always_use_jwt_access=True,
+        )
+
+        audience = "https://pubsub.googleapis.com"
+        credentials._create_self_signed_jwt(audience)
+        jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
+
+    @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+    def test__create_self_signed_jwt_always_use_jwt_access_with_scopes(self, jwt):
+        credentials = service_account.Credentials(
+            SIGNER,
+            self.SERVICE_ACCOUNT_EMAIL,
+            self.TOKEN_URI,
+            scopes=["bar", "foo"],
+            always_use_jwt_access=True,
+        )
+
+        audience = "https://pubsub.googleapis.com"
+        credentials._create_self_signed_jwt(audience)
+        jwt.from_signing_credentials.assert_called_once_with(
+            credentials, None, additional_claims={"scope": "bar foo"}
+        )
+
+    @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+    def test__create_self_signed_jwt_always_use_jwt_access_with_default_scopes(
+        self, jwt
+    ):
+        credentials = service_account.Credentials(
+            SIGNER,
+            self.SERVICE_ACCOUNT_EMAIL,
+            self.TOKEN_URI,
+            default_scopes=["bar", "foo"],
+            always_use_jwt_access=True,
+        )
+
+        credentials._create_self_signed_jwt(None)
+        jwt.from_signing_credentials.assert_called_once_with(
+            credentials, None, additional_claims={"scope": "bar foo"}
+        )
+
+    @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+    def test__create_self_signed_jwt_always_use_jwt_access(self, jwt):
+        credentials = service_account.Credentials(
+            SIGNER,
+            self.SERVICE_ACCOUNT_EMAIL,
+            self.TOKEN_URI,
+            always_use_jwt_access=True,
+        )
+
+        credentials._create_self_signed_jwt(None)
+        jwt.from_signing_credentials.assert_not_called()
+
     @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
     def test_refresh_success(self, jwt_grant):
         credentials = self.make_credentials()
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index c5290eb..39c45bd 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -390,6 +390,18 @@
         assert new_credentials._additional_claims == self.credentials._additional_claims
         assert new_credentials._quota_project_id == self.credentials._quota_project_id
 
+    def test__make_jwt_without_audience(self):
+        cred = jwt.Credentials.from_service_account_info(
+            SERVICE_ACCOUNT_INFO.copy(),
+            subject=self.SUBJECT,
+            audience=None,
+            additional_claims={"scope": "foo bar"},
+        )
+        token, _ = cred._make_jwt()
+        payload = jwt.decode(token, PUBLIC_CERT_BYTES)
+        assert payload["scope"] == "foo bar"
+        assert "aud" not in payload
+
     def test_with_quota_project(self):
         quota_project_id = "project-foo"
 
diff --git a/tests/transport/test_grpc.py b/tests/transport/test_grpc.py
index 1602f4c..926c1bc 100644
--- a/tests/transport/test_grpc.py
+++ b/tests/transport/test_grpc.py
@@ -111,8 +111,7 @@
 
         plugin._get_authorization_headers(context)
 
-        # self-signed JWT should not be created when default_host is not set
-        credentials._create_self_signed_jwt.assert_not_called()
+        credentials._create_self_signed_jwt.assert_called_once_with(None)
 
     def test__get_authorization_headers_with_service_account_and_default_host(self):
         credentials = mock.create_autospec(service_account.Credentials)
diff --git a/tests/transport/test_requests.py b/tests/transport/test_requests.py
index 3fdd17c..f494c14 100644
--- a/tests/transport/test_requests.py
+++ b/tests/transport/test_requests.py
@@ -378,7 +378,7 @@
 
         authed_session = google.auth.transport.requests.AuthorizedSession(credentials)
 
-        authed_session.credentials._create_self_signed_jwt.assert_not_called()
+        authed_session.credentials._create_self_signed_jwt.assert_called_once_with(None)
 
     def test_authorized_session_with_default_host(self):
         default_host = "pubsub.googleapis.com"
diff --git a/tests/transport/test_urllib3.py b/tests/transport/test_urllib3.py
index 7c06934..e3848c1 100644
--- a/tests/transport/test_urllib3.py
+++ b/tests/transport/test_urllib3.py
@@ -164,7 +164,7 @@
 
         authed_http = google.auth.transport.urllib3.AuthorizedHttp(credentials)
 
-        authed_http.credentials._create_self_signed_jwt.assert_not_called()
+        authed_http.credentials._create_self_signed_jwt.assert_called_once_with(None)
 
     def test_urlopen_with_default_host(self):
         default_host = "pubsub.googleapis.com"