Add XSRF protection to oauth2decorator callback.
Also update all samples to use XSRF callback protection.
Reviewed in https://codereview.appspot.com/6473053/.
diff --git a/tests/test_oauth2client_appengine.py b/tests/test_oauth2client_appengine.py
index 6774abd..1c2d17a 100644
--- a/tests/test_oauth2client_appengine.py
+++ b/tests/test_oauth2client_appengine.py
@@ -25,6 +25,7 @@
import base64
import datetime
import httplib2
+import mox
import os
import time
import unittest
@@ -49,8 +50,10 @@
from google.appengine.ext import db
from google.appengine.ext import testbed
from google.appengine.runtime import apiproxy_errors
+from oauth2client import appengine
from oauth2client.anyjson import simplejson
from oauth2client.clientsecrets import _loadfile
+from oauth2client.clientsecrets import InvalidClientSecretsError
from oauth2client.appengine import AppAssertionCredentials
from oauth2client.appengine import CredentialsModel
from oauth2client.appengine import FlowProperty
@@ -276,15 +279,18 @@
self.assertEqual(None, credentials)
self.assertEqual(None, memcache.get('foo'))
+
class MockRequest(object):
url = 'https://example.org'
def relative_url(self, rel):
return self.url + rel
+
class MockRequestHandler(object):
request = MockRequest()
+
class DecoratorTests(unittest.TestCase):
def setUp(self):
@@ -343,18 +349,28 @@
self.assertEqual('http://localhost/oauth2callback', q['redirect_uri'][0])
self.assertEqual('foo_client_id', q['client_id'][0])
self.assertEqual('foo_scope bar_scope', q['scope'][0])
- self.assertEqual('http://localhost/foo_path', q['state'][0])
+ self.assertEqual('http://localhost/foo_path',
+ q['state'][0].rsplit(':', 1)[0])
self.assertEqual('code', q['response_type'][0])
self.assertEqual(False, self.decorator.has_credentials())
+ m = mox.Mox()
+ m.StubOutWithMock(appengine, "_parse_state_value")
+ appengine._parse_state_value('foo_path:xsrfkey123',
+ mox.IgnoreArg()).AndReturn('foo_path')
+ m.ReplayAll()
+
# Now simulate the callback to /oauth2callback.
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
- 'state': 'foo_path',
+ 'state': 'foo_path:xsrfkey123',
})
self.assertEqual('http://localhost/foo_path', response.headers['Location'])
self.assertEqual(None, self.decorator.credentials)
+ m.UnsetStubs()
+ m.VerifyAll()
+
# Now requesting the decorated path should work.
response = self.app.get('/foo_path')
self.assertEqual('200 OK', response.status)
@@ -380,10 +396,16 @@
response = self.app.get('/foo_path')
self.assertTrue(response.status.startswith('302'))
+ m = mox.Mox()
+ m.StubOutWithMock(appengine, "_parse_state_value")
+ appengine._parse_state_value('foo_path:xsrfkey123',
+ mox.IgnoreArg()).AndReturn('foo_path')
+ m.ReplayAll()
+
# Now simulate the callback to /oauth2callback.
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
- 'state': 'foo_path',
+ 'state': 'foo_path:xsrfkey123',
})
self.assertEqual('http://localhost/foo_path', response.headers['Location'])
self.assertEqual(None, self.decorator.credentials)
@@ -398,6 +420,9 @@
response = self.app.get('/foo_path')
self.assertTrue(response.status.startswith('302'))
+ m.UnsetStubs()
+ m.VerifyAll()
+
def test_aware(self):
# An initial request to an oauth_aware decorated path should not redirect.
response = self.app.get('/bar_path/2012/01')
@@ -409,18 +434,28 @@
self.assertEqual('http://localhost/oauth2callback', q['redirect_uri'][0])
self.assertEqual('foo_client_id', q['client_id'][0])
self.assertEqual('foo_scope bar_scope', q['scope'][0])
- self.assertEqual('http://localhost/bar_path/2012/01', q['state'][0])
+ self.assertEqual('http://localhost/bar_path/2012/01',
+ q['state'][0].rsplit(':', 1)[0])
self.assertEqual('code', q['response_type'][0])
+ m = mox.Mox()
+ m.StubOutWithMock(appengine, "_parse_state_value")
+ appengine._parse_state_value('bar_path:xsrfkey456',
+ mox.IgnoreArg()).AndReturn('bar_path')
+ m.ReplayAll()
+
# Now simulate the callback to /oauth2callback.
url = self.decorator.authorize_url()
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
- 'state': 'bar_path',
+ 'state': 'bar_path:xsrfkey456',
})
self.assertEqual('http://localhost/bar_path', response.headers['Location'])
self.assertEqual(False, self.decorator.has_credentials())
+ m.UnsetStubs()
+ m.VerifyAll()
+
# Now requesting the decorated path will have credentials.
response = self.app.get('/bar_path/2012/01')
self.assertEqual('200 OK', response.status)
@@ -431,7 +466,6 @@
self.assertEqual('foo_access_token',
self.decorator.credentials.access_token)
-
def test_error_in_step2(self):
# An initial request to an oauth_aware decorated path should not redirect.
response = self.app.get('/bar_path/2012/01')
@@ -509,33 +543,77 @@
def test_decorator_from_unfilled_client_secrets_required(self):
MESSAGE = 'File is missing'
- decorator = oauth2decorator_from_clientsecrets(
- datafile('unfilled_client_secrets.json'),
- scope=['foo_scope', 'bar_scope'], message=MESSAGE)
- self._finish_setup(decorator, user_mock=UserNotLoggedInMock)
- self.assertTrue(decorator._in_error)
- self.assertEqual(MESSAGE, decorator._message)
-
- # An initial request to an oauth_required decorated path should be an
- # error message.
- response = self.app.get('/foo_path')
- self.assertTrue(response.status.startswith('200'))
- self.assertTrue(MESSAGE in str(response))
+ try:
+ decorator = oauth2decorator_from_clientsecrets(
+ datafile('unfilled_client_secrets.json'),
+ scope=['foo_scope', 'bar_scope'], message=MESSAGE)
+ except InvalidClientSecretsError:
+ pass
def test_decorator_from_unfilled_client_secrets_aware(self):
MESSAGE = 'File is missing'
- decorator = oauth2decorator_from_clientsecrets(
- datafile('unfilled_client_secrets.json'),
- scope=['foo_scope', 'bar_scope'], message=MESSAGE)
- self._finish_setup(decorator, user_mock=UserNotLoggedInMock)
- self.assertTrue(decorator._in_error)
- self.assertEqual(MESSAGE, decorator._message)
+ try:
+ decorator = oauth2decorator_from_clientsecrets(
+ datafile('unfilled_client_secrets.json'),
+ scope=['foo_scope', 'bar_scope'], message=MESSAGE)
+ except InvalidClientSecretsError:
+ pass
- # An initial request to an oauth_aware decorated path should be an
- # error message.
- response = self.app.get('/bar_path/2012/03')
- self.assertTrue(response.status.startswith('200'))
- self.assertTrue(MESSAGE in str(response))
+
+class DecoratorXsrfSecretTests(unittest.TestCase):
+ """Test xsrf_secret_key."""
+
+ def setUp(self):
+ self.testbed = testbed.Testbed()
+ self.testbed.activate()
+ self.testbed.init_datastore_v3_stub()
+ self.testbed.init_memcache_stub()
+
+ def tearDown(self):
+ self.testbed.deactivate()
+
+ def test_build_and_parse_state(self):
+ secret = appengine.xsrf_secret_key()
+
+ # Secret shouldn't change from call to call.
+ secret2 = appengine.xsrf_secret_key()
+ self.assertEqual(secret, secret2)
+
+ # Secret shouldn't change if memcache goes away.
+ memcache.delete(appengine.XSRF_MEMCACHE_ID,
+ namespace=appengine.OAUTH2CLIENT_NAMESPACE)
+ secret3 = appengine.xsrf_secret_key()
+ self.assertEqual(secret2, secret3)
+
+ # Secret should change if both memcache and the model goes away.
+ memcache.delete(appengine.XSRF_MEMCACHE_ID,
+ namespace=appengine.OAUTH2CLIENT_NAMESPACE)
+ model = appengine.SiteXsrfSecretKey.get_or_insert('site')
+ model.delete()
+
+ secret4 = appengine.xsrf_secret_key()
+ self.assertNotEqual(secret3, secret4)
+
+
+class DecoratorXsrfProtectionTests(unittest.TestCase):
+ """Test _build_state_value and _parse_state_value."""
+
+ def setUp(self):
+ self.testbed = testbed.Testbed()
+ self.testbed.activate()
+ self.testbed.init_datastore_v3_stub()
+ self.testbed.init_memcache_stub()
+
+ def tearDown(self):
+ self.testbed.deactivate()
+
+ def test_build_and_parse_state(self):
+ state = appengine._build_state_value(MockRequestHandler(), UserMock())
+ self.assertEqual(
+ 'https://example.org',
+ appengine._parse_state_value(state, UserMock()))
+ self.assertRaises(appengine.InvalidXsrfTokenError,
+ appengine._parse_state_value, state[1:], UserMock())
if __name__ == '__main__':