Modify oauth2client.multistore_file to store and retrieve credentials using an arbitrary key.
Reviewed in https://codereview.appspot.com/7067054/.
diff --git a/tests/test_oauth2client_file.py b/tests/test_oauth2client_file.py
index 7c65677..07e7608 100644
--- a/tests/test_oauth2client_file.py
+++ b/tests/test_oauth2client_file.py
@@ -35,6 +35,7 @@
from oauth2client import file
from oauth2client import locked_file
from oauth2client import multistore_file
+from oauth2client import util
from oauth2client.anyjson import simplejson
from oauth2client.client import AccessTokenCredentials
from oauth2client.client import AssertionCredentials
@@ -58,6 +59,21 @@
except OSError:
pass
+ def create_test_credentials(self):
+   """Build the OAuth2Credentials fixture shared by the storage tests."""
+   # Values are listed in the positional order the constructor call uses.
+   constructor_args = (
+       'foo',  # access_token
+       'some_client_id',  # client_id
+       'cOuDdkfjxxnv+',  # client_secret
+       '1/0/a.df219fjls0',  # refresh_token
+       datetime.datetime.utcnow(),  # token_expiry
+       'https://www.google.com/accounts/o8/oauth2/token',  # token_uri
+       'refresh_checker/1.0',  # user_agent
+   )
+
+   return OAuth2Credentials(*constructor_args)
+
def test_non_existent_file_storage(self):
s = file.Storage(FILENAME)
credentials = s.get()
@@ -78,18 +94,7 @@
def test_pickle_and_json_interop(self):
# Write a file with a pickled OAuth2Credentials.
- access_token = 'foo'
- client_id = 'some_client_id'
- client_secret = 'cOuDdkfjxxnv+'
- refresh_token = '1/0/a.df219fjls0'
- token_expiry = datetime.datetime.utcnow()
- token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
- user_agent = 'refresh_checker/1.0'
-
- credentials = OAuth2Credentials(
- access_token, client_id, client_secret,
- refresh_token, token_expiry, token_uri,
- user_agent)
+ credentials = self.create_test_credentials()
f = open(FILENAME, 'w')
pickle.dump(credentials, f)
@@ -112,18 +117,7 @@
self.assertEquals(data['_module'], OAuth2Credentials.__module__)
def test_token_refresh(self):
- access_token = 'foo'
- client_id = 'some_client_id'
- client_secret = 'cOuDdkfjxxnv+'
- refresh_token = '1/0/a.df219fjls0'
- token_expiry = datetime.datetime.utcnow()
- token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
- user_agent = 'refresh_checker/1.0'
-
- credentials = OAuth2Credentials(
- access_token, client_id, client_secret,
- refresh_token, token_expiry, token_uri,
- user_agent)
+ credentials = self.create_test_credentials()
s = file.Storage(FILENAME)
s.put(credentials)
@@ -136,18 +130,7 @@
self.assertEquals(credentials.access_token, 'bar')
def test_credentials_delete(self):
- access_token = 'foo'
- client_id = 'some_client_id'
- client_secret = 'cOuDdkfjxxnv+'
- refresh_token = '1/0/a.df219fjls0'
- token_expiry = datetime.datetime.utcnow()
- token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
- user_agent = 'refresh_checker/1.0'
-
- credentials = OAuth2Credentials(
- access_token, client_id, client_secret,
- refresh_token, token_expiry, token_uri,
- user_agent)
+ credentials = self.create_test_credentials()
s = file.Storage(FILENAME)
s.put(credentials)
@@ -175,18 +158,7 @@
self.assertEquals('0600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode)))
def test_read_only_file_fail_lock(self):
- access_token = 'foo'
- client_secret = 'cOuDdkfjxxnv+'
- refresh_token = '1/0/a.df219fjls0'
- token_expiry = datetime.datetime.utcnow()
- token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
- user_agent = 'refresh_checker/1.0'
- client_id = 'some_client_id'
-
- credentials = OAuth2Credentials(
- access_token, client_id, client_secret,
- refresh_token, token_expiry, token_uri,
- user_agent)
+ credentials = self.create_test_credentials()
open(FILENAME, 'a+b').close()
os.chmod(FILENAME, 0400)
@@ -230,18 +202,7 @@
self.assertEquals(None, credentials)
def test_multistore_file(self):
- access_token = 'foo'
- client_secret = 'cOuDdkfjxxnv+'
- refresh_token = '1/0/a.df219fjls0'
- token_expiry = datetime.datetime.utcnow()
- token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
- user_agent = 'refresh_checker/1.0'
- client_id = 'some_client_id'
-
- credentials = OAuth2Credentials(
- access_token, client_id, client_secret,
- refresh_token, token_expiry, token_uri,
- user_agent)
+ credentials = self.create_test_credentials()
store = multistore_file.get_credential_storage(
FILENAME,
@@ -263,5 +224,65 @@
if os.name == 'posix':
self.assertEquals('0600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode)))
+ def test_multistore_file_custom_key(self):
+   """Round-trip credentials through a store addressed by a custom dict key."""
+   original = self.create_test_credentials()
+   key_dict = {'myapp': 'testing', 'clientid': 'some client'}
+   storage = multistore_file.get_credential_storage_custom_key(
+       FILENAME, key_dict)
+
+   # put() followed by get() must hand back the same access token.
+   storage.put(original)
+   fetched = storage.get()
+   self.assertNotEquals(None, fetched)
+   self.assertEqual(original.access_token, fetched.access_token)
+
+   # Once deleted, the same store must report nothing under that key.
+   storage.delete()
+   fetched_after_delete = storage.get()
+   self.assertEquals(None, fetched_after_delete)
+
+ def test_multistore_file_custom_string_key(self):
+   """Store under a string key, then read back via the string and dict keys."""
+   credentials = self.create_test_credentials()
+
+   # store with string key
+   store = multistore_file.get_credential_storage_custom_string_key(
+       FILENAME, 'mykey')
+   store.put(credentials)
+   stored_credentials = store.get()
+
+   self.assertNotEquals(None, stored_credentials)
+   self.assertEqual(credentials.access_token, stored_credentials.access_token)
+
+   # Retrieve through the dict-keyed factory and read from *that* store.
+   # NOTE(review): assumes 'mykey' maps to {'key': 'mykey'} -- confirm.
+   store_dict = multistore_file.get_credential_storage_custom_key(
+       FILENAME, {'key': 'mykey'})
+   stored_credentials = store_dict.get()
+   self.assertNotEquals(None, stored_credentials)
+   self.assertEqual(credentials.access_token, stored_credentials.access_token)
+
+   store.delete()
+   stored_credentials = store.get()
+   self.assertEquals(None, stored_credentials)
+
+ def test_multistore_file_backwards_compatibility(self):
+   """Credentials saved under the legacy key are readable via a custom key."""
+   scope_list = ['scope1', 'scope2']
+   saved = self.create_test_credentials()
+
+   # Write through the legacy (client id, user agent, scopes) entry point.
+   legacy_store = multistore_file.get_credential_storage(
+       FILENAME, 'client_id', 'user_agent', scope_list)
+   legacy_store.put(saved)
+
+   # Read back through a custom key dict spelling out the same three fields.
+   legacy_shaped_key = {'scope': util.scopes_to_string(scope_list),
+                        'clientId': 'client_id', 'userAgent': 'user_agent'}
+   custom_store = multistore_file.get_credential_storage_custom_key(
+       FILENAME, legacy_shaped_key)
+   self.assertEqual(saved.access_token, custom_store.get().access_token)
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_oauth2client_util.py b/tests/test_oauth2client_util.py
index 6c72521..2d67316 100644
--- a/tests/test_oauth2client_util.py
+++ b/tests/test_oauth2client_util.py
@@ -25,3 +25,20 @@
]
for expected, case in cases:
self.assertEqual(expected, util.scopes_to_string(case))
+
+
+class KeyConversionTests(unittest.TestCase):
+  """Checks util.dict_to_tuple_key against a small hand-written dict."""
+
+  def test_key_conversions(self):
+    source = {'somekey': 'some value', 'another': 'something else', 'onemore': 'foo'}
+    expected = (
+        ('another', 'something else'),
+        ('onemore', 'foo'),
+        ('somekey', 'some value'),
+    )
+    converted = util.dict_to_tuple_key(source)
+    # The pairs are expected to come back in ascending key order ...
+    self.assertEqual(expected, converted)
+    # ... and feeding them to dict() must reproduce the input dictionary.
+    self.assertEqual(source, dict(converted))