blob: c331e09217a90dc3bcae77fc4e5e89ff939ed987 [file] [log] [blame]
bojeil-googled4d7f382021-02-16 12:33:20 -08001# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
"""Identity Pool Credentials.

This module provides credentials to access Google Cloud resources from on-prem
or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
tokens) retrieved from local file locations or local servers. This includes
Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
Hub with Hub workload identity enabled).

These credentials are recommended over the use of service account credentials
in on-prem/non-Google Cloud platforms as they do not involve the management of
long-lived service account private keys.

Identity Pool Credentials are initialized using external_account
arguments which are typically loaded from an external credentials file or
an external credentials URL. Unlike other Credentials that can be initialized
with a list of explicit arguments, secrets or credentials, external account
clients use the environment and hints/guidelines provided by the
external_account JSON file to retrieve credentials and exchange them for Google
access tokens.
"""
35
Tres Seaver560cf1e2021-08-03 16:35:54 -040036from collections.abc import Mapping
bojeil-googled4d7f382021-02-16 12:33:20 -080037import io
38import json
39import os
40
41from google.auth import _helpers
42from google.auth import exceptions
43from google.auth import external_account
44
45
class Credentials(external_account.Credentials):
    """External account credentials sourced from files and URLs."""

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url,
        credential_source,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
    ):
        """Instantiates an external account credentials object from a file/URL.

        Args:
            audience (str): The STS audience field.
            subject_token_type (str): The subject token type.
            token_url (str): The STS endpoint URL.
            credential_source (Mapping): The credential source dictionary used to
                provide instructions on how to retrieve external credential to be
                exchanged for Google access tokens.

                Example credential_source for url-sourced credential::

                    {
                        "url": "http://www.example.com",
                        "format": {
                            "type": "json",
                            "subject_token_field_name": "access_token",
                        },
                        "headers": {"foo": "bar"},
                    }

                Example credential_source for file-sourced credential::

                    {
                        "file": "/path/to/token/file.txt"
                    }

            service_account_impersonation_url (Optional[str]): The optional service account
                impersonation getAccessToken URL.
            client_id (Optional[str]): The optional client ID.
            client_secret (Optional[str]): The optional client secret.
            quota_project_id (Optional[str]): The optional quota project ID.
            scopes (Optional[Sequence[str]]): Optional scopes to request during the
                authorization grant.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.

        Raises:
            google.auth.exceptions.RefreshError: If an error is encountered during
                access token retrieval logic.
            ValueError: For invalid parameters.

        .. note:: Typically one of the helper constructors
            :meth:`from_file` or
            :meth:`from_info` are used instead of calling the constructor directly.
        """

        super(Credentials, self).__init__(
            audience=audience,
            subject_token_type=subject_token_type,
            token_url=token_url,
            credential_source=credential_source,
            service_account_impersonation_url=service_account_impersonation_url,
            client_id=client_id,
            client_secret=client_secret,
            quota_project_id=quota_project_id,
            scopes=scopes,
            default_scopes=default_scopes,
        )
        if not isinstance(credential_source, Mapping):
            # Nothing usable was supplied; the "Missing credential_source"
            # check at the end of this method rejects this case.
            self._credential_source_file = None
            self._credential_source_url = None
        else:
            self._credential_source_file = credential_source.get("file")
            self._credential_source_url = credential_source.get("url")
            self._credential_source_headers = credential_source.get("headers")
            # An explicit ``"format": null`` is treated like a missing format
            # instead of failing with AttributeError on the lookup below.
            credential_source_format = credential_source.get("format") or {}
            # Get credential_source format type. When not provided, this
            # defaults to text.
            self._credential_source_format_type = (
                credential_source_format.get("type") or "text"
            )
            # environment_id is only supported in AWS or dedicated future external
            # account credentials.
            if "environment_id" in credential_source:
                raise ValueError(
                    "Invalid Identity Pool credential_source field 'environment_id'"
                )
            if self._credential_source_format_type not in ("text", "json"):
                raise ValueError(
                    "Invalid credential_source format '{}'".format(
                        self._credential_source_format_type
                    )
                )
            # For JSON types, get the required subject_token field name.
            if self._credential_source_format_type == "json":
                self._credential_source_field_name = credential_source_format.get(
                    "subject_token_field_name"
                )
                if self._credential_source_field_name is None:
                    raise ValueError(
                        "Missing subject_token_field_name for JSON credential_source format"
                    )
            else:
                self._credential_source_field_name = None

        # Exactly one of 'file' / 'url' must be configured.
        if self._credential_source_file and self._credential_source_url:
            raise ValueError(
                "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
            )
        if not self._credential_source_file and not self._credential_source_url:
            raise ValueError(
                "Missing credential_source. A 'file' or 'url' must be provided."
            )

    # NOTE(review): intentionally no docstring here; the decorator is expected
    # to supply it from the base class -- confirm before adding one.
    @_helpers.copy_docstring(external_account.Credentials)
    def retrieve_subject_token(self, request):
        return self._parse_token_data(
            self._get_token_data(request),
            self._credential_source_format_type,
            self._credential_source_field_name,
        )

    def _get_token_data(self, request):
        """Fetches the raw token content from the configured file or URL.

        Args:
            request: The callable used to perform the HTTP request when the
                credential source is a URL. Unused for file sources.

        Returns:
            Tuple[str, str]: The raw content and the location (file path or
                URL) it was read from.
        """
        if self._credential_source_file:
            return self._get_file_data(self._credential_source_file)
        else:
            return self._get_url_data(
                request, self._credential_source_url, self._credential_source_headers
            )

    def _get_file_data(self, filename):
        """Reads the UTF-8 text content of ``filename``.

        Args:
            filename (str): The path of the file holding the token data.

        Returns:
            Tuple[str, str]: The file content and ``filename``.

        Raises:
            google.auth.exceptions.RefreshError: If the file does not exist.
        """
        if not os.path.exists(filename):
            raise exceptions.RefreshError("File '{}' was not found.".format(filename))

        with io.open(filename, "r", encoding="utf-8") as file_obj:
            return file_obj.read(), filename

    def _get_url_data(self, request, url, headers):
        """Issues a GET request to ``url`` and returns the response body.

        Args:
            request: The callable used to perform the HTTP request. It is
                invoked with ``url``, ``method`` and ``headers`` keyword
                arguments.
            url (str): The URL to retrieve the token data from.
            headers (Optional[Mapping]): Headers to send with the request.

        Returns:
            Tuple[str, str]: The response body and ``url``.

        Raises:
            google.auth.exceptions.RefreshError: If the response status is
                not 200.
        """
        response = request(url=url, method="GET", headers=headers)

        # support both string and bytes type response.data
        response_body = (
            response.data.decode("utf-8")
            if hasattr(response.data, "decode")
            else response.data
        )

        if response.status != 200:
            raise exceptions.RefreshError(
                "Unable to retrieve Identity Pool subject token", response_body
            )

        return response_body, url

    def _parse_token_data(
        self, token_content, format_type="text", subject_token_field_name=None
    ):
        """Extracts the subject token from previously retrieved token data.

        Args:
            token_content (Tuple[str, str]): The raw content and the location
                (file path or URL) it was read from.
            format_type (str): ``"text"`` to use the content verbatim; any
                other value causes the content to be parsed as JSON.
            subject_token_field_name (Optional[str]): The key holding the
                token when the content is parsed as JSON.

        Returns:
            str: The subject token.

        Raises:
            google.auth.exceptions.RefreshError: If the content cannot be
                parsed as JSON, does not contain the requested field, or the
                resulting token is empty.
        """
        content, filename = token_content
        if format_type == "text":
            token = content
        else:
            try:
                # Parse file content as JSON.
                response_data = json.loads(content)
                # Get the subject_token.
                token = response_data[subject_token_field_name]
            except (KeyError, TypeError, ValueError) as caught_exc:
                # TypeError covers valid JSON that is not an object (e.g. a
                # list, string, number or null), which cannot be indexed by
                # the field name.
                raise exceptions.RefreshError(
                    "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
                        filename, subject_token_field_name
                    )
                ) from caught_exc
        if not token:
            raise exceptions.RefreshError(
                "Missing subject_token in the credential_source file"
            )
        return token

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates an Identity Pool Credentials instance from parsed external account info.

        Args:
            info (Mapping[str, str]): The Identity Pool external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.

        Raises:
            ValueError: For invalid parameters.
        """
        return cls(
            audience=info.get("audience"),
            subject_token_type=info.get("subject_token_type"),
            token_url=info.get("token_url"),
            service_account_impersonation_url=info.get(
                "service_account_impersonation_url"
            ),
            client_id=info.get("client_id"),
            client_secret=info.get("client_secret"),
            credential_source=info.get("credential_source"),
            quota_project_id=info.get("quota_project_id"),
            **kwargs
        )

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates an IdentityPool Credentials instance from an external account json file.

        Args:
            filename (str): The path to the IdentityPool external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.
        """
        with io.open(filename, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return cls.from_info(data, **kwargs)