blob: 596a4b6a3fb8f472f57fb3832b174c8b012522d2 [file] [log] [blame]
#!/usr/bin/python2.4
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Oauth2client.file tests
Unit tests for oauth2client.file and oauth2client.multistore_file.
"""
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
import copy
import datetime
import httplib2
import os
import pickle
import stat
import tempfile
import unittest
from apiclient.http import HttpMockSequence
from oauth2client import multistore_file
from oauth2client.anyjson import simplejson
from oauth2client.client import AccessTokenCredentials
from oauth2client.client import AssertionCredentials
from oauth2client.client import OAuth2Credentials
from oauth2client.file import Storage
# Path of the scratch credentials file shared by every test in this module.
# mktemp() only generates a name; the file itself is created by the tests.
# NOTE(review): tempfile.mktemp is documented as race-prone; it is kept here
# because the tests rely on the path not existing up front.
FILENAME = tempfile.mktemp(suffix='oauth2client_test.data')
class OAuth2ClientFileTests(unittest.TestCase):
def tearDown(self):
try:
os.unlink(FILENAME)
except OSError:
pass
def setUp(self):
try:
os.unlink(FILENAME)
except OSError:
pass
def test_non_existent_file_storage(self):
s = Storage(FILENAME)
credentials = s.get()
self.assertEquals(None, credentials)
def test_pickle_and_json_interop(self):
# Write a file with a pickled OAuth2Credentials.
access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
user_agent = 'refresh_checker/1.0'
credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, token_uri,
user_agent)
f = open(FILENAME, 'w')
pickle.dump(credentials, f)
f.close()
# Storage should be not be able to read that object, as the capability to
# read and write credentials as pickled objects has been removed.
s = Storage(FILENAME)
read_credentials = s.get()
self.assertEquals(None, read_credentials)
# Now write it back out and confirm it has been rewritten as JSON
s.put(credentials)
f = file(FILENAME)
data = simplejson.load(f)
f.close()
self.assertEquals(data['access_token'], 'foo')
self.assertEquals(data['_class'], 'OAuth2Credentials')
self.assertEquals(data['_module'], OAuth2Credentials.__module__)
def test_token_refresh(self):
access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
user_agent = 'refresh_checker/1.0'
credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, token_uri,
user_agent)
s = Storage(FILENAME)
s.put(credentials)
credentials = s.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
s.put(new_cred)
credentials._refresh(lambda x: x)
self.assertEquals(credentials.access_token, 'bar')
def test_credentials_delete(self):
access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
user_agent = 'refresh_checker/1.0'
credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, token_uri,
user_agent)
s = Storage(FILENAME)
s.put(credentials)
credentials = s.get()
self.assertNotEquals(None, credentials)
s.delete()
credentials = s.get()
self.assertEquals(None, credentials)
def test_access_token_credentials(self):
access_token = 'foo'
user_agent = 'refresh_checker/1.0'
credentials = AccessTokenCredentials(access_token, user_agent)
s = Storage(FILENAME)
credentials = s.put(credentials)
credentials = s.get()
self.assertNotEquals(None, credentials)
self.assertEquals('foo', credentials.access_token)
mode = os.stat(FILENAME).st_mode
if os.name == 'posix':
self.assertEquals('0600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode)))
def test_read_only_file_fail_lock(self):
access_token = 'foo'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
user_agent = 'refresh_checker/1.0'
client_id = 'some_client_id'
credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, token_uri,
user_agent)
open(FILENAME, 'a+b').close()
os.chmod(FILENAME, 0400)
store = multistore_file.get_credential_storage(
FILENAME,
credentials.client_id,
credentials.user_agent,
['some-scope', 'some-other-scope'])
store.put(credentials)
if os.name == 'posix':
self.assertTrue(store._multistore._read_only)
os.chmod(FILENAME, 0600)
def test_multistore_non_existent_file(self):
store = multistore_file.get_credential_storage(
FILENAME,
'some_client_id',
'user-agent/1.0',
['some-scope', 'some-other-scope'])
credentials = store.get()
self.assertEquals(None, credentials)
def test_multistore_file(self):
access_token = 'foo'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
user_agent = 'refresh_checker/1.0'
client_id = 'some_client_id'
credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, token_uri,
user_agent)
store = multistore_file.get_credential_storage(
FILENAME,
credentials.client_id,
credentials.user_agent,
['some-scope', 'some-other-scope'])
store.put(credentials)
credentials = store.get()
self.assertNotEquals(None, credentials)
self.assertEquals('foo', credentials.access_token)
store.delete()
credentials = store.get()
self.assertEquals(None, credentials)
if os.name == 'posix':
self.assertEquals('0600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode)))
if __name__ == '__main__':
  # Executed as a script: let unittest discover and run the tests above.
  unittest.main()