blob: c1908d11c227428b6695a374c73029c0e6cda07a [file] [log] [blame]
#!/usr/bin/python2.4
#
# Copyright 2010 Google Inc. All Rights Reserved.
"""Utilities for OAuth.
Utilities for making it easier to work with OAuth.
"""
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
import copy
import urllib
import oauth2 as oauth
try:
from urlparse import parse_qs, parse_qsl
except ImportError:
from cgi import parse_qs, parse_qsl
class MissingParameter(Exception):
pass
def abstract():
raise NotImplementedError("You need to override this function")
class TokenStore(object):
def get(user, service):
"""Returns an oauth.Token based on the (user, service) returning
None if there is no Token for that (user, service).
"""
abstract()
def set(user, service, token):
abstract()
buzz_discovery = {
'required': ['domain', 'scope'],
'request': {
'url': 'https://www.google.com/accounts/OAuthGetRequestToken',
'params': ['xoauth_displayname']
},
'authorize': {
'url': 'https://www.google.com/buzz/api/auth/OAuthAuthorizeToken',
'params': ['iconUrl', 'oauth_token']
},
'access': {
'url': 'https://www.google.com/accounts/OAuthGetAccessToken',
'params': []
},
}
def _oauth_uri(name, discovery, params):
if name not in ['request', 'access', 'authorize']:
raise KeyError(name)
keys = []
keys.extend(discovery['required'])
keys.extend(discovery[name]['params'])
query = {}
for key in keys:
if key in params:
query[key] = params[key]
return discovery[name]['url'] + '?' + urllib.urlencode(query)
class Flow3LO(object):
def __init__(self, discovery, consumer_key, consumer_secret, user_agent,
**kwargs):
self.discovery = discovery
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.user_agent = user_agent
self.params = kwargs
self.request_token = {}
for key in discovery['required']:
if key not in self.params:
raise MissingParameter('Required parameter %s not supplied' % key)
def step1(self, oauth_callback='oob'):
"""Returns a URI to redirect to the provider.
If oauth_callback is 'oob' then the next call
should be to step2_pin, otherwise oauth_callback
is a URI and the next call should be to
step2_callback() with the query parameters
received at that callback.
"""
consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
client = oauth.Client(consumer)
headers = {
'user-agent': self.user_agent,
'content-type': 'application/x-www-form-urlencoded'
}
body = urllib.urlencode({'oauth_callback': oauth_callback})
uri = _oauth_uri('request', self.discovery, self.params)
resp, content = client.request(uri, 'POST', headers=headers,
body=body)
if resp['status'] != '200':
print content
raise Exception('Invalid response %s.' % resp['status'])
self.request_token = dict(parse_qsl(content))
auth_params = copy.copy(self.params)
auth_params['oauth_token'] = self.request_token['oauth_token']
uri = _oauth_uri('authorize', self.discovery, auth_params)
return uri
def step2_pin(self, pin):
"""Returns an oauth_token and oauth_token_secret in a dictionary"""
token = oauth.Token(self.request_token['oauth_token'],
self.request_token['oauth_token_secret'])
token.set_verifier(pin)
consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
client = oauth.Client(consumer, token)
headers = {
'user-agent': self.user_agent,
'content-type': 'application/x-www-form-urlencoded'
}
uri = _oauth_uri('access', self.discovery, self.params)
resp, content = client.request(uri, 'POST', headers=headers)
return dict(parse_qsl(content))
def step2_callback(self, query_params):
"""Returns an access token via oauth.Token"""
pass