Add InstalledAppFlow (#128)
diff --git a/additional_packages/google_auth_oauthlib/google_auth_oauthlib/flow.py b/additional_packages/google_auth_oauthlib/google_auth_oauthlib/flow.py
index a040659..0df96c0 100644
--- a/additional_packages/google_auth_oauthlib/google_auth_oauthlib/flow.py
+++ b/additional_packages/google_auth_oauthlib/google_auth_oauthlib/flow.py
@@ -17,7 +17,7 @@
This module provides integration with `requests-oauthlib`_ for running the
`OAuth 2.0 Authorization Flow`_ and acquiring user credentials.
-Here's an example of using the flow with the installed application
+Here's an example of using :class:`Flow` with the installed application
authorization flow::
from google_auth_oauthlib.flow import Flow
@@ -44,19 +44,30 @@
session = flow.authorized_session()
print(session.get('https://www.googleapis.com/userinfo/v2/me').json())
+This particular flow can be handled entirely by using
+:class:`InstalledAppFlow`.
+
.. _requests-oauthlib: http://requests-oauthlib.readthedocs.io/en/stable/
.. _OAuth 2.0 Authorization Flow:
https://tools.ietf.org/html/rfc6749#section-1.2
"""
import json
+import logging
+import webbrowser
+import wsgiref.simple_server
+import wsgiref.util
import google.auth.transport.requests
import google.oauth2.credentials
+from six.moves import input
import google_auth_oauthlib.helpers
+_LOGGER = logging.getLogger(__name__)
+
+
class Flow(object):
"""OAuth 2.0 Authorization Flow
@@ -253,3 +264,195 @@
"""
return google.auth.transport.requests.AuthorizedSession(
self.credentials)
+
+
+class InstalledAppFlow(Flow):
+ """Authorization flow helper for installed applications.
+
+ This :class:`Flow` subclass makes it easier to perform the
+ `Installed Application Authorization Flow`_. This flow is useful for
+ local development or applications that are installed on a desktop operating
+ system.
+
+ This flow has two strategies: The console strategy provided by
+ :meth:`run_console` and the local server strategy provided by
+ :meth:`run_local_server`.
+
+ Example::
+
+ from google_auth_oauthlib.flow import InstalledAppFlow
+
+ flow = InstalledAppFlow.from_client_secrets_file(
+ 'client_secrets.json',
+ scopes=['profile', 'email'])
+
+ flow.run_local_server()
+
+ session = flow.authorized_session()
+
+ profile_info = session.get(
+ 'https://www.googleapis.com/userinfo/v2/me').json()
+
+ print(profile_info)
+ # {'name': '...', 'email': '...', ...}
+
+
+ Note that these aren't the only two ways to accomplish the installed
+ application flow, they are just the most common ways. You can use the
+ :class:`Flow` class to perform the same flow with different methods of
+ presenting the authorization URL to the user or obtaining the authorization
+ response, such as using an embedded web view.
+
+ .. _Installed Application Authorization Flow:
+ https://developers.google.com/api-client-library/python/auth
+ /installed-app
+ """
+ _OOB_REDIRECT_URI = 'urn:ietf:wg:oauth:2.0:oob'
+
+ _DEFAULT_AUTH_PROMPT_MESSAGE = (
+ 'Please visit this URL to authorize this application: {url}')
+ """str: The message to display when prompting the user for
+ authorization."""
+ _DEFAULT_AUTH_CODE_MESSAGE = (
+ 'Enter the authorization code: ')
+ """str: The message to display when prompting the user for the
+ authorization code. Used only by the console strategy."""
+
+ _DEFAULT_WEB_SUCCESS_MESSAGE = (
+ 'The authentication flow has completed, you may close this window.')
+
+ def run_console(
+ self,
+ authorization_prompt_message=_DEFAULT_AUTH_PROMPT_MESSAGE,
+ authorization_code_message=_DEFAULT_AUTH_CODE_MESSAGE,
+ **kwargs):
+ """Run the flow using the console strategy.
+
+ The console strategy instructs the user to open the authorization URL
+ in their browser. Once the authorization is complete the authorization
+ server will give the user a code. The user then must copy & paste this
+ code into the application. The code is then exchanged for a token.
+
+ Args:
+ authorization_prompt_message (str): The message to display to tell
+ the user to navigate to the authorization URL.
+ authorization_code_message (str): The message to display when
+ prompting the user for the authorization code.
+ kwargs: Additional keyword arguments passed through to
+ :meth:`authorization_url`.
+
+ Returns:
+ google.oauth2.credentials.Credentials: The OAuth 2.0 credentials
+ for the user.
+ """
+ kwargs.setdefault('prompt', 'consent')
+
+ self.redirect_uri = self._OOB_REDIRECT_URI
+
+ auth_url, _ = self.authorization_url(**kwargs)
+
+ print(authorization_prompt_message.format(url=auth_url))
+
+ code = input(authorization_code_message)
+
+ self.fetch_token(code=code)
+
+ return self.credentials
+
+ def run_local_server(
+ self, host='localhost', port=8080,
+ authorization_prompt_message=_DEFAULT_AUTH_PROMPT_MESSAGE,
+ success_message=_DEFAULT_WEB_SUCCESS_MESSAGE,
+ open_browser=True,
+ **kwargs):
+ """Run the flow using the server strategy.
+
+ The server strategy instructs the user to open the authorization URL in
+ their browser and will attempt to automatically open the URL for them.
+ It will start a local web server to listen for the authorization
+ response. Once authorization is complete the authorization server will
+ redirect the user's browser to the local web server. The web server
+ will get the authorization code from the response and shutdown. The
+ code is then exchanged for a token.
+
+ Args:
+ host (str): The hostname for the local redirect server. This will
+ be served over http, not https.
+ port (int): The port for the local redirect server.
+ authorization_prompt_message (str): The message to display to tell
+ the user to navigate to the authorization URL.
+ success_message (str): The message to display in the web browser
+ the authorization flow is complete.
+ open_browser (bool): Whether or not to open the authorization URL
+ in the user's browser.
+ kwargs: Additional keyword arguments passed through to
+ :meth:`authorization_url`.
+
+ Returns:
+ google.oauth2.credentials.Credentials: The OAuth 2.0 credentials
+ for the user.
+ """
+ self.redirect_uri = 'http://{}:{}/'.format(host, port)
+
+ auth_url, _ = self.authorization_url(**kwargs)
+
+ wsgi_app = _RedirectWSGIApp(success_message)
+ local_server = wsgiref.simple_server.make_server(
+ host, port, wsgi_app, handler_class=_WSGIRequestHandler)
+
+ if open_browser:
+ webbrowser.open(auth_url, new=1, autoraise=True)
+
+ print(authorization_prompt_message.format(url=auth_url))
+
+ local_server.handle_request()
+
+ # Note: using https here because oauthlib is very picky that
+ # OAuth 2.0 should only occur over https.
+ authorization_response = wsgi_app.last_request_uri.replace(
+ 'http', 'https')
+ self.fetch_token(authorization_response=authorization_response)
+
+ return self.credentials
+
+
+class _WSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
+ """Custom WSGIRequestHandler.
+
+ Uses a named logger instead of printing to stderr.
+ """
+ def log_message(self, format, *args, **kwargs):
+ # pylint: disable=redefined-builtin
+ # (format is the argument name defined in the superclass.)
+ _LOGGER.info(format, *args, **kwargs)
+
+
+class _RedirectWSGIApp(object):
+ """WSGI app to handle the authorization redirect.
+
+ Stores the request URI and displays the given success message.
+ """
+
+ def __init__(self, success_message):
+ """
+ Args:
+ success_message (str): The message to display in the web browser
+ the authorization flow is complete.
+ """
+ self.last_request_uri = None
+ self._success_message = success_message
+
+ def __call__(self, environ, start_response):
+ """WSGI Callable.
+
+ Args:
+ environ (Mapping[str, Any]): The WSGI environment.
+ start_response (Callable[str, list]): The WSGI start_response
+ callable.
+
+ Returns:
+ Iterable[bytes]: The response body.
+ """
+ start_response('200 OK', [('Content-type', 'text/plain')])
+ self.last_request_uri = wsgiref.util.request_uri(environ)
+ return [self._success_message.encode('utf-8')]
diff --git a/additional_packages/google_auth_oauthlib/tests/test_flow.py b/additional_packages/google_auth_oauthlib/tests/test_flow.py
index 663a02f..88d283d 100644
--- a/additional_packages/google_auth_oauthlib/tests/test_flow.py
+++ b/additional_packages/google_auth_oauthlib/tests/test_flow.py
@@ -12,11 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import concurrent.futures
import json
import os
import mock
import pytest
+import requests
+from six.moves import urllib
from google_auth_oauthlib import flow
@@ -27,98 +30,176 @@
CLIENT_SECRETS_INFO = json.load(fh)
-def test_from_client_secrets_file():
- instance = flow.Flow.from_client_secrets_file(
- CLIENT_SECRETS_FILE, scopes=mock.sentinel.scopes)
- assert instance.client_config == CLIENT_SECRETS_INFO['web']
- assert (instance.oauth2session.client_id ==
- CLIENT_SECRETS_INFO['web']['client_id'])
- assert instance.oauth2session.scope == mock.sentinel.scopes
+class TestFlow(object):
+ def test_from_client_secrets_file(self):
+ instance = flow.Flow.from_client_secrets_file(
+ CLIENT_SECRETS_FILE, scopes=mock.sentinel.scopes)
+ assert instance.client_config == CLIENT_SECRETS_INFO['web']
+ assert (instance.oauth2session.client_id ==
+ CLIENT_SECRETS_INFO['web']['client_id'])
+ assert instance.oauth2session.scope == mock.sentinel.scopes
+
+ def test_from_client_config_installed(self):
+ client_config = {'installed': CLIENT_SECRETS_INFO['web']}
+ instance = flow.Flow.from_client_config(
+ client_config, scopes=mock.sentinel.scopes)
+ assert instance.client_config == client_config['installed']
+ assert (instance.oauth2session.client_id ==
+ client_config['installed']['client_id'])
+ assert instance.oauth2session.scope == mock.sentinel.scopes
+
+ def test_from_client_config_bad_format(self):
+ with pytest.raises(ValueError):
+ flow.Flow.from_client_config({}, scopes=mock.sentinel.scopes)
+
+ @pytest.fixture
+ def instance(self):
+ yield flow.Flow.from_client_config(
+ CLIENT_SECRETS_INFO, scopes=mock.sentinel.scopes)
+
+ def test_redirect_uri(self, instance):
+ instance.redirect_uri = mock.sentinel.redirect_uri
+ assert (instance.redirect_uri ==
+ instance.oauth2session.redirect_uri ==
+ mock.sentinel.redirect_uri)
+
+ def test_authorization_url(self, instance):
+ scope = 'scope_one'
+ instance.oauth2session.scope = [scope]
+ authorization_url_patch = mock.patch.object(
+ instance.oauth2session, 'authorization_url',
+ wraps=instance.oauth2session.authorization_url)
+
+ with authorization_url_patch as authorization_url_spy:
+ url, _ = instance.authorization_url(prompt='consent')
+
+ assert CLIENT_SECRETS_INFO['web']['auth_uri'] in url
+ assert scope in url
+ authorization_url_spy.assert_called_with(
+ CLIENT_SECRETS_INFO['web']['auth_uri'],
+ access_type='offline',
+ prompt='consent')
+
+ def test_fetch_token(self, instance):
+ fetch_token_patch = mock.patch.object(
+ instance.oauth2session, 'fetch_token', autospec=True,
+ return_value=mock.sentinel.token)
+
+ with fetch_token_patch as fetch_token_mock:
+ token = instance.fetch_token(code=mock.sentinel.code)
+
+ assert token == mock.sentinel.token
+ fetch_token_mock.assert_called_with(
+ CLIENT_SECRETS_INFO['web']['token_uri'],
+ client_secret=CLIENT_SECRETS_INFO['web']['client_secret'],
+ code=mock.sentinel.code)
+
+ def test_credentials(self, instance):
+ instance.oauth2session.token = {
+ 'access_token': mock.sentinel.access_token,
+ 'refresh_token': mock.sentinel.refresh_token
+ }
+
+ credentials = instance.credentials
+
+ assert credentials.token == mock.sentinel.access_token
+ assert credentials._refresh_token == mock.sentinel.refresh_token
+ assert (credentials._client_id ==
+ CLIENT_SECRETS_INFO['web']['client_id'])
+ assert (credentials._client_secret ==
+ CLIENT_SECRETS_INFO['web']['client_secret'])
+ assert (credentials._token_uri ==
+ CLIENT_SECRETS_INFO['web']['token_uri'])
+
+ def test_authorized_session(self, instance):
+ instance.oauth2session.token = {
+ 'access_token': mock.sentinel.access_token,
+ 'refresh_token': mock.sentinel.refresh_token
+ }
+
+ session = instance.authorized_session()
+
+ assert session.credentials.token == mock.sentinel.access_token
-def test_from_client_config_installed():
- client_config = {'installed': CLIENT_SECRETS_INFO['web']}
- instance = flow.Flow.from_client_config(
- client_config, scopes=mock.sentinel.scopes)
- assert instance.client_config == client_config['installed']
- assert (instance.oauth2session.client_id ==
- client_config['installed']['client_id'])
- assert instance.oauth2session.scope == mock.sentinel.scopes
+class TestInstalledAppFlow(object):
+ SCOPES = ['email', 'profile']
+ REDIRECT_REQUEST_PATH = '/?code=code&state=state'
+ @pytest.fixture
+ def instance(self):
+ yield flow.InstalledAppFlow.from_client_config(
+ CLIENT_SECRETS_INFO, scopes=self.SCOPES)
-def test_from_client_config_bad_format():
- with pytest.raises(ValueError):
- flow.Flow.from_client_config({}, scopes=mock.sentinel.scopes)
+ @pytest.fixture
+ def mock_fetch_token(self, instance):
+ def set_token(*args, **kwargs):
+ instance.oauth2session.token = {
+ 'access_token': mock.sentinel.access_token,
+ 'refresh_token': mock.sentinel.refresh_token
+ }
+ fetch_token_patch = mock.patch.object(
+ instance.oauth2session, 'fetch_token', autospec=True,
+ side_effect=set_token)
-@pytest.fixture
-def instance():
- yield flow.Flow.from_client_config(
- CLIENT_SECRETS_INFO, scopes=mock.sentinel.scopes)
+ with fetch_token_patch as fetch_token_mock:
+ yield fetch_token_mock
+ @mock.patch('google_auth_oauthlib.flow.input', autospec=True)
+ def test_run_console(self, input_mock, instance, mock_fetch_token):
+ input_mock.return_value = mock.sentinel.code
-def test_redirect_uri(instance):
- instance.redirect_uri = mock.sentinel.redirect_uri
- assert (instance.redirect_uri ==
- instance.oauth2session.redirect_uri ==
- mock.sentinel.redirect_uri)
+ credentials = instance.run_console()
+ assert credentials.token == mock.sentinel.access_token
+ assert credentials._refresh_token == mock.sentinel.refresh_token
-def test_authorization_url(instance):
- scope = 'scope_one'
- instance.oauth2session.scope = [scope]
- authorization_url_patch = mock.patch.object(
- instance.oauth2session, 'authorization_url',
- wraps=instance.oauth2session.authorization_url)
-
- with authorization_url_patch as authorization_url_spy:
- url, _ = instance.authorization_url(prompt='consent')
-
- assert CLIENT_SECRETS_INFO['web']['auth_uri'] in url
- assert scope in url
- authorization_url_spy.assert_called_with(
- CLIENT_SECRETS_INFO['web']['auth_uri'],
- access_type='offline',
- prompt='consent')
-
-
-def test_fetch_token(instance):
- fetch_token_patch = mock.patch.object(
- instance.oauth2session, 'fetch_token', autospec=True,
- return_value=mock.sentinel.token)
-
- with fetch_token_patch as fetch_token_mock:
- token = instance.fetch_token(code=mock.sentinel.code)
-
- assert token == mock.sentinel.token
- fetch_token_mock.assert_called_with(
+ mock_fetch_token.assert_called_with(
CLIENT_SECRETS_INFO['web']['token_uri'],
client_secret=CLIENT_SECRETS_INFO['web']['client_secret'],
code=mock.sentinel.code)
+ @mock.patch('google_auth_oauthlib.flow.webbrowser', autospec=True)
+ def test_run_local_server(
+ self, webbrowser_mock, instance, mock_fetch_token):
+ auth_redirect_url = urllib.parse.urljoin(
+ 'http://localhost:8080',
+ self.REDIRECT_REQUEST_PATH)
-def test_credentials(instance):
- instance.oauth2session.token = {
- 'access_token': mock.sentinel.access_token,
- 'refresh_token': mock.sentinel.refresh_token
- }
+ with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
+ future = pool.submit(instance.run_local_server)
- credentials = instance.credentials
+ while not future.done():
+ try:
+ requests.get(auth_redirect_url)
+ except requests.ConnectionError: # pragma: NO COVER
+ pass
- assert credentials.token == mock.sentinel.access_token
- assert credentials._refresh_token == mock.sentinel.refresh_token
- assert credentials._client_id == CLIENT_SECRETS_INFO['web']['client_id']
- assert (credentials._client_secret ==
- CLIENT_SECRETS_INFO['web']['client_secret'])
- assert credentials._token_uri == CLIENT_SECRETS_INFO['web']['token_uri']
+ credentials = future.result()
+ assert credentials.token == mock.sentinel.access_token
+ assert credentials._refresh_token == mock.sentinel.refresh_token
+ assert webbrowser_mock.open.called
-def test_authorized_session(instance):
- instance.oauth2session.token = {
- 'access_token': mock.sentinel.access_token,
- 'refresh_token': mock.sentinel.refresh_token
- }
+ expected_auth_response = auth_redirect_url.replace('http', 'https')
+ mock_fetch_token.assert_called_with(
+ CLIENT_SECRETS_INFO['web']['token_uri'],
+ client_secret=CLIENT_SECRETS_INFO['web']['client_secret'],
+ authorization_response=expected_auth_response)
- session = instance.authorized_session()
+ @mock.patch('google_auth_oauthlib.flow.webbrowser', autospec=True)
+ @mock.patch('wsgiref.simple_server.make_server', autospec=True)
+ def test_run_local_server_no_browser(
+ self, make_server_mock, webbrowser_mock, instance,
+ mock_fetch_token):
- assert session.credentials.token == mock.sentinel.access_token
+ def assign_last_request_uri(host, port, wsgi_app, **kwargs):
+ wsgi_app.last_request_uri = self.REDIRECT_REQUEST_PATH
+ return mock.Mock()
+
+ make_server_mock.side_effect = assign_last_request_uri
+
+ instance.run_local_server(open_browser=False)
+
+ assert not webbrowser_mock.open.called
diff --git a/additional_packages/google_auth_oauthlib/tox.ini b/additional_packages/google_auth_oauthlib/tox.ini
index 3cac089..c0983f6 100644
--- a/additional_packages/google_auth_oauthlib/tox.ini
+++ b/additional_packages/google_auth_oauthlib/tox.ini
@@ -6,6 +6,7 @@
mock
pytest
pytest-cov
+ futures
commands =
py.test --cov=google_auth_oauthlib --cov=tests {posargs:tests}
diff --git a/tox.ini b/tox.ini
index 27f7d87..0a0d806 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,16 +3,16 @@
[testenv]
deps =
+ certifi
flask
mock
+ oauth2client
pytest
pytest-cov
pytest-localserver
- urllib3
- certifi
requests
requests-oauthlib
- oauth2client
+ urllib3
grpcio; platform_python_implementation != 'PyPy'
commands =
py.test --cov=google.auth --cov=google.oauth2 --cov=tests {posargs:tests}