blob: 53621995557f840dd28ddcd8e4ac54770507497a [file] [log] [blame]
bojeil-googled4d7f382021-02-16 12:33:20 -08001# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Identity Pool Credentials.
16
17This module provides credentials to access Google Cloud resources from on-prem
18or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
19tokens) retrieved from local file locations or local servers. This includes
20Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
21Hub with Hub workload identity enabled).
22
These credentials are recommended over the use of service account credentials
in on-prem/non-Google Cloud platforms as they do not involve the management of
long-lived service account private keys.
26
27Identity Pool Credentials are initialized using external_account
28arguments which are typically loaded from an external credentials file or
29an external credentials URL. Unlike other Credentials that can be initialized
30with a list of explicit arguments, secrets or credentials, external account
31clients use the environment and hints/guidelines provided by the
32external_account JSON file to retrieve credentials and exchange them for Google
33access tokens.
34"""
35
36try:
37 from collections.abc import Mapping
38# Python 2.7 compatibility
39except ImportError: # pragma: NO COVER
40 from collections import Mapping
41import io
42import json
43import os
44
45from google.auth import _helpers
46from google.auth import exceptions
47from google.auth import external_account
48
49
class Credentials(external_account.Credentials):
    """External account credentials sourced from files and URLs.

    The external subject token is read either from a local file or from a URL,
    as described by the ``credential_source`` mapping, and is interpreted either
    as plain text or as one field of a JSON document.
    """

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url,
        credential_source,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
    ):
        """Instantiates an external account credentials object from a file/URL.

        Args:
            audience (str): The STS audience field.
            subject_token_type (str): The subject token type.
            token_url (str): The STS endpoint URL.
            credential_source (Mapping): The credential source dictionary used to
                provide instructions on how to retrieve external credential to be
                exchanged for Google access tokens.

                Example credential_source for url-sourced credential::

                    {
                        "url": "http://www.example.com",
                        "format": {
                            "type": "json",
                            "subject_token_field_name": "access_token",
                        },
                        "headers": {"foo": "bar"},
                    }

                Example credential_source for file-sourced credential::

                    {
                        "file": "/path/to/token/file.txt"
                    }

            service_account_impersonation_url (Optional[str]): The optional service account
                impersonation getAccessToken URL.
            client_id (Optional[str]): The optional client ID.
            client_secret (Optional[str]): The optional client secret.
            quota_project_id (Optional[str]): The optional quota project ID.
            scopes (Optional[Sequence[str]]): Optional scopes to request during the
                authorization grant.
            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
                Google client library. Use 'scopes' for user-defined scopes.

        Raises:
            google.auth.exceptions.RefreshError: If an error is encountered during
                access token retrieval logic.
            ValueError: For invalid parameters.

        .. note:: Typically one of the helper constructors
            :meth:`from_file` or
            :meth:`from_info` are used instead of calling the constructor directly.
        """

        super(Credentials, self).__init__(
            audience=audience,
            subject_token_type=subject_token_type,
            token_url=token_url,
            credential_source=credential_source,
            service_account_impersonation_url=service_account_impersonation_url,
            client_id=client_id,
            client_secret=client_secret,
            quota_project_id=quota_project_id,
            scopes=scopes,
            default_scopes=default_scopes,
        )
        if not isinstance(credential_source, Mapping):
            # Not a mapping: leave both sources unset so the "missing
            # credential_source" check below rejects the input.
            self._credential_source_file = None
            self._credential_source_url = None
        else:
            self._credential_source_file = credential_source.get("file")
            self._credential_source_url = credential_source.get("url")
            self._credential_source_headers = credential_source.get("headers")
            # An explicit null "format" is treated like an absent one, so the
            # type lookup below never runs against None.
            credential_source_format = credential_source.get("format") or {}
            # Get credential_source format type. When not provided, this
            # defaults to text.
            self._credential_source_format_type = (
                credential_source_format.get("type") or "text"
            )
            # environment_id is only supported in AWS or dedicated future external
            # account credentials.
            if "environment_id" in credential_source:
                raise ValueError(
                    "Invalid Identity Pool credential_source field 'environment_id'"
                )
            if self._credential_source_format_type not in ["text", "json"]:
                raise ValueError(
                    "Invalid credential_source format '{}'".format(
                        self._credential_source_format_type
                    )
                )
            # For JSON types, get the required subject_token field name.
            if self._credential_source_format_type == "json":
                self._credential_source_field_name = credential_source_format.get(
                    "subject_token_field_name"
                )
                if self._credential_source_field_name is None:
                    raise ValueError(
                        "Missing subject_token_field_name for JSON credential_source format"
                    )
            else:
                self._credential_source_field_name = None

        # Exactly one of 'file' / 'url' must be configured.
        if self._credential_source_file and self._credential_source_url:
            raise ValueError(
                "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
            )
        if not self._credential_source_file and not self._credential_source_url:
            raise ValueError(
                "Missing credential_source. A 'file' or 'url' must be provided."
            )

    # The docstring for this method is supplied by the copy_docstring decorator,
    # so none is written here.
    @_helpers.copy_docstring(external_account.Credentials)
    def retrieve_subject_token(self, request):
        return self._parse_token_data(
            self._get_token_data(request),
            self._credential_source_format_type,
            self._credential_source_field_name,
        )

    def _get_token_data(self, request):
        """Fetches the raw token payload from the configured source.

        Args:
            request: Callable used to issue the HTTP request when the source is
                a URL; unused for file sources.

        Returns:
            Tuple[str, str]: The raw content and the file name or URL it was
                read from.
        """
        if self._credential_source_file:
            return self._get_file_data(self._credential_source_file)
        else:
            return self._get_url_data(
                request, self._credential_source_url, self._credential_source_headers
            )

    def _get_file_data(self, filename):
        """Reads the token payload from a local UTF-8 text file.

        Args:
            filename (str): Path of the file to read.

        Returns:
            Tuple[str, str]: The file content and ``filename``.

        Raises:
            google.auth.exceptions.RefreshError: If the file does not exist.
        """
        if not os.path.exists(filename):
            raise exceptions.RefreshError("File '{}' was not found.".format(filename))

        with io.open(filename, "r", encoding="utf-8") as file_obj:
            return file_obj.read(), filename

    def _get_url_data(self, request, url, headers):
        """Retrieves the token payload with an HTTP GET request.

        Args:
            request: Callable invoked as ``request(url=..., method="GET",
                headers=...)``; its result must expose ``data`` and ``status``.
            url (str): The URL to fetch.
            headers (Optional[Mapping]): Headers forwarded to ``request``.

        Returns:
            Tuple[str, str]: The response body as text and ``url``.

        Raises:
            google.auth.exceptions.RefreshError: If the response status is not 200.
        """
        response = request(url=url, method="GET", headers=headers)

        # support both string and bytes type response.data
        response_body = (
            response.data.decode("utf-8")
            if hasattr(response.data, "decode")
            else response.data
        )

        if response.status != 200:
            raise exceptions.RefreshError(
                "Unable to retrieve Identity Pool subject token", response_body
            )

        return response_body, url

    def _parse_token_data(
        self, token_content, format_type="text", subject_token_field_name=None
    ):
        """Extracts the subject token from a raw payload.

        Args:
            token_content (Tuple[str, str]): The raw content and the name of the
                source (file name or URL) it came from.
            format_type (str): ``"text"`` to use the content verbatim; any other
                value parses the content as JSON.
            subject_token_field_name (Optional[str]): Key holding the token
                when the content is JSON.

        Returns:
            str: The subject token.

        Raises:
            google.auth.exceptions.RefreshError: If the content is not valid
                JSON, is not a JSON object containing the requested key, or the
                resulting token is empty.
        """
        content, filename = token_content
        if format_type == "text":
            token = content
        else:
            try:
                # Parse file content as JSON.
                response_data = json.loads(content)
                # Get the subject_token.
                token = response_data[subject_token_field_name]
            # TypeError covers valid JSON that is not an object (list, string,
            # number, null), where subscripting by the field name fails.
            except (KeyError, TypeError, ValueError):
                raise exceptions.RefreshError(
                    "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
                        filename, subject_token_field_name
                    )
                )
        if not token:
            raise exceptions.RefreshError(
                "Missing subject_token in the credential_source file"
            )
        return token

    @classmethod
    def from_info(cls, info, **kwargs):
        """Creates an Identity Pool Credentials instance from parsed external account info.

        Args:
            info (Mapping[str, str]): The Identity Pool external account info in Google
                format.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.

        Raises:
            ValueError: For invalid parameters.
        """
        return cls(
            audience=info.get("audience"),
            subject_token_type=info.get("subject_token_type"),
            token_url=info.get("token_url"),
            service_account_impersonation_url=info.get(
                "service_account_impersonation_url"
            ),
            client_id=info.get("client_id"),
            client_secret=info.get("client_secret"),
            credential_source=info.get("credential_source"),
            quota_project_id=info.get("quota_project_id"),
            **kwargs
        )

    @classmethod
    def from_file(cls, filename, **kwargs):
        """Creates an IdentityPool Credentials instance from an external account json file.

        Args:
            filename (str): The path to the IdentityPool external account json file.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            google.auth.identity_pool.Credentials: The constructed
                credentials.
        """
        with io.open(filename, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return cls.from_info(data, **kwargs)