Add robot helpers and a sample.
diff --git a/tests/test_oauth2client.py b/tests/test_oauth2client.py
index 3c6e0ba..417807a 100644
--- a/tests/test_oauth2client.py
+++ b/tests/test_oauth2client.py
@@ -35,6 +35,7 @@
from oauth2client.client import AccessTokenCredentials
from oauth2client.client import AccessTokenCredentialsError
from oauth2client.client import AccessTokenRefreshError
+from oauth2client.client import AssertionCredentials
from oauth2client.client import FlowExchangeError
from oauth2client.client import OAuth2Credentials
from oauth2client.client import OAuth2WebServerFlow
@@ -115,6 +116,35 @@
self.assertEqual(400, resp.status)
+class TestAssertionCredentials(unittest.TestCase):
+ assertion_text = "This is the assertion"
+ assertion_type = "http://www.google.com/assertionType"
+
+ class AssertionCredentialsTestImpl(AssertionCredentials):
+
+ def _generate_assertion(self):
+ return TestAssertionCredentials.assertion_text
+
+ def setUp(self):
+ user_agent = "fun/2.0"
+ self.credentials = self.AssertionCredentialsTestImpl(self.assertion_type,
+ user_agent)
+
+ def test_assertion_body(self):
+ body = urlparse.parse_qs(self.credentials._generate_refresh_request_body())
+ self.assertEqual(body['assertion'][0], self.assertion_text)
+ self.assertEqual(body['assertion_type'][0], self.assertion_type)
+
+ def test_assertion_refresh(self):
+ http = HttpMockSequence([
+ ({'status': '200'}, '{"access_token":"1/3w"}'),
+ ({'status': '200'}, 'echo_request_headers'),
+ ])
+ http = self.credentials.authorize(http)
+ resp, content = http.request("http://example.com")
+ self.assertEqual(content['authorization'], 'OAuth 1/3w')
+
+
class OAuth2WebServerFlowTest(unittest.TestCase):
def setUp(self):
@@ -137,7 +167,7 @@
def test_exchange_failure(self):
http = HttpMockSequence([
- ({'status': '400'}, '{"error":"invalid_request"}')
+ ({'status': '400'}, '{"error":"invalid_request"}'),
])
try:
@@ -159,7 +189,6 @@
self.assertNotEqual(credentials.token_expiry, None)
self.assertEqual(credentials.refresh_token, '8xLOxBtZp8')
-
def test_exchange_no_expires_in(self):
http = HttpMockSequence([
({'status': '200'}, """{ "access_token":"SlAV32hkKG",
diff --git a/tests/test_oauth2client_appengine.py b/tests/test_oauth2client_appengine.py
index 964092d..7f370b3 100644
--- a/tests/test_oauth2client_appengine.py
+++ b/tests/test_oauth2client_appengine.py
@@ -22,6 +22,7 @@
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
+import base64
import httplib2
import unittest
import urlparse
@@ -31,20 +32,24 @@
except ImportError:
from cgi import parse_qs
-from apiclient.http import HttpMockSequence
from apiclient.anyjson import simplejson
-from webtest import TestApp
+from apiclient.http import HttpMockSequence
+from google.appengine.api import apiproxy_stub
+from google.appengine.api import apiproxy_stub_map
+from google.appengine.api import users
+from google.appengine.ext import testbed
+from google.appengine.ext import webapp
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import FlowExchangeError
+from oauth2client.appengine import AppAssertionCredentials
from oauth2client.appengine import OAuth2Decorator
-from google.appengine.ext import webapp
-from google.appengine.api import users
from oauth2client.appengine import OAuth2Handler
-from google.appengine.ext import testbed
+from webtest import TestApp
class UserMock(object):
"""Mock the app engine user service"""
+
def user_id(self):
return 'foo_user'
@@ -55,7 +60,7 @@
content = {
'access_token': 'foo_access_token',
'refresh_token': 'foo_refresh_token',
- 'expires_in': 3600
+ 'expires_in': 3600,
}
def request(self, token_uri, method, body, headers, *args, **kwargs):
@@ -64,6 +69,59 @@
return (self, simplejson.dumps(self.content))
+class TestAppAssertionCredentials(unittest.TestCase):
+ account_name = "service_account_name@appspot.com"
+ signature = "signature"
+
+ class AppIdentityStubImpl(apiproxy_stub.APIProxyStub):
+
+ def __init__(self):
+ super(TestAppAssertionCredentials.AppIdentityStubImpl, self).__init__(
+ 'app_identity_service')
+
+ def _Dynamic_GetServiceAccountName(self, request, response):
+ return response.set_service_account_name(
+ TestAppAssertionCredentials.account_name)
+
+ def _Dynamic_SignForApp(self, request, response):
+ return response.set_signature_bytes(
+ TestAppAssertionCredentials.signature)
+
+ def setUp(self):
+ app_identity_stub = self.AppIdentityStubImpl()
+ apiproxy_stub_map.apiproxy.RegisterStub("app_identity_service",
+ app_identity_stub)
+
+ self.scope = "http://www.googleapis.com/scope"
+ user_agent = "hal/3.0"
+
+ self.credentials = AppAssertionCredentials(self.scope, user_agent)
+
+ def test_assertion(self):
+ assertion = self.credentials._generate_assertion()
+
+ parts = assertion.split(".")
+ self.assertTrue(len(parts) == 3)
+
+ header, body, signature = [base64.b64decode(part) for part in parts]
+
+ header_dict = simplejson.loads(header)
+ self.assertEqual(header_dict['typ'], 'JWT')
+ self.assertEqual(header_dict['alg'], 'RS256')
+
+ body_dict = simplejson.loads(body)
+ self.assertEqual(body_dict['aud'],
+ 'https://accounts.google.com/o/oauth2/token')
+ self.assertEqual(body_dict['scope'], self.scope)
+ self.assertEqual(body_dict['iss'], self.account_name)
+
+ issuedAt = body_dict['iat']
+ self.assertTrue(issuedAt > 0)
+ self.assertEqual(body_dict['exp'], issuedAt + 3600)
+
+ self.assertEqual(signature, self.signature)
+
+
class DecoratorTests(unittest.TestCase):
def setUp(self):
@@ -79,14 +137,14 @@
user_agent='foo_user_agent')
self.decorator = decorator
-
class TestRequiredHandler(webapp.RequestHandler):
+
@decorator.oauth_required
def get(self):
pass
-
class TestAwareHandler(webapp.RequestHandler):
+
@decorator.oauth_aware
def get(self):
self.response.out.write('Hello World!')
@@ -121,7 +179,7 @@
# Now simulate the callback to /oauth2callback
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
- 'state': 'foo_path'
+ 'state': 'foo_path',
})
self.assertEqual('http://localhost/foo_path', response.headers['Location'])
self.assertEqual(None, self.decorator.credentials)
@@ -130,8 +188,10 @@
response = self.app.get('/foo_path')
self.assertEqual('200 OK', response.status)
self.assertEqual(True, self.decorator.has_credentials())
- self.assertEqual('foo_refresh_token', self.decorator.credentials.refresh_token)
- self.assertEqual('foo_access_token', self.decorator.credentials.access_token)
+ self.assertEqual('foo_refresh_token',
+ self.decorator.credentials.refresh_token)
+ self.assertEqual('foo_access_token',
+ self.decorator.credentials.access_token)
# Invalidate the stored Credentials
self.decorator.credentials._invalid = True
@@ -161,7 +221,7 @@
url = self.decorator.authorize_url()
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
- 'state': 'bar_path'
+ 'state': 'bar_path',
})
self.assertEqual('http://localhost/bar_path', response.headers['Location'])
self.assertEqual(False, self.decorator.has_credentials())
@@ -171,8 +231,10 @@
self.assertEqual('200 OK', response.status)
self.assertEqual('Hello World!', response.body)
self.assertEqual(True, self.decorator.has_credentials())
- self.assertEqual('foo_refresh_token', self.decorator.credentials.refresh_token)
- self.assertEqual('foo_access_token', self.decorator.credentials.access_token)
+ self.assertEqual('foo_refresh_token',
+ self.decorator.credentials.refresh_token)
+ self.assertEqual('foo_access_token',
+ self.decorator.credentials.access_token)
if __name__ == '__main__':
unittest.main()