Removed buzz_gae_client.py and it's associated tests.
Renamed simple_buzz_wrapper.py and it's tests to simple_wrapper.py.
SimpleWrapper now uses the Credential class.
Added oauth_required decorator for use on AppEngine. This aims to
greatly simplify OAuth use with this library. At the moment it only
supports Buzz.
oacurl.py now uses spaces for indentation rather than tabs.
diff --git a/contrib/buzz/buzz_appengine.py b/contrib/buzz/buzz_appengine.py
new file mode 100644
index 0000000..68a514b
--- /dev/null
+++ b/contrib/buzz/buzz_appengine.py
@@ -0,0 +1,109 @@
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.appengine.api import users
+from google.appengine.ext import db
+
+import apiclient.ext.appengine
+import logging
+import settings
+import simple_buzz_wrapper
+
+
+class Flow(db.Model):
+ flow = apiclient.ext.appengine.FlowThreeLeggedProperty()
+
+
+class Credentials(db.Model):
+ credentials = apiclient.ext.appengine.OAuthCredentialsProperty()
+
+
+def oauth_required(handler_method):
+ """A decorator to require that a user has gone through the OAuth dance before accessing a handler.
+
+ To use it, decorate your get() method like this:
+ @oauth_required
+ def get(self):
+ buzz_wrapper = oauth_handlers.build_buzz_wrapper_for_current_user()
+ user_profile_data = buzz_wrapper.get_profile()
+ self.response.out.write('Hello, ' + user_profile_data.displayName)
+
+ We will redirect the user to the OAuth endpoint and afterwards the OAuth
+ will send the user back to the DanceFinishingHandler that you have configured.
+ This should only used for GET requests since any payload in a POST request
+ will be lost. Any parameters in the original URL will be preserved.
+ """
+ def check_oauth_credentials(self, *args):
+ if self.request.method != 'GET':
+ raise webapp.Error('The check_oauth decorator can only be used for GET '
+ 'requests')
+
+ # Is this a request from the OAuth system after finishing the OAuth dance?
+ if self.request.get('oauth_verifier'):
+ user = users.get_current_user()
+ logging.debug('Finished OAuth dance for: %s' % user.email())
+
+ f = Flow.get_by_key_name(user.user_id())
+ if f:
+ credentials = f.flow.step2_exchange(self.request.params)
+ c = Credentials(key_name=user.user_id(), credentials=credentials)
+ c.put()
+
+ # We delete the flow so that a malicious actor can't pretend to be the OAuth service
+ # and replace a valid token with an invalid token
+ f.delete()
+ handler_method(self, *args)
+ return
+
+ # Find out who the user is. If we don't know who you are then we can't
+ # look up your OAuth credentials thus we must ensure the user is logged in.
+ user = users.get_current_user()
+ if not user:
+ self.redirect(users.create_login_url(self.request.uri))
+ return
+
+ # Now that we know who the user is look up their OAuth credentials
+ # if we don't find the credentials then send them through the OAuth dance
+ if not Credentials.get_by_key_name(user.user_id()):
+ # TODO(ade) make this more configurable using settings.py
+ # Domain, and scope should be configurable
+ p = apiclient.discovery.build("buzz", "v1")
+ flow = apiclient.oauth.FlowThreeLegged(p.auth_discovery(),
+ consumer_key=settings.CONSUMER_KEY,
+ consumer_secret=settings.CONSUMER_SECRET,
+ user_agent='google-api-client-python-buzz-webapp/1.0',
+ domain='anonymous',
+ scope='https://www.googleapis.com/auth/buzz',
+ xoauth_displayname=settings.DISPLAY_NAME)
+
+ # The OAuth system needs to send the user right back here so that they
+ # get to the page they originally intended to visit.
+ oauth_return_url = self.request.uri
+ authorize_url = flow.step1_get_authorize_url(oauth_return_url)
+
+ f = Flow(key_name=user.user_id(), flow=flow)
+ f.put()
+
+ self.redirect(authorize_url)
+ return
+
+ # If the user already has a token then call the wrapped handler
+ handler_method(self, *args)
+ return check_oauth_credentials
+
+def build_buzz_wrapper_for_current_user():
+ user = users.get_current_user()
+ credentials = Credentials.get_by_key_name(user.user_id()).credentials
+ return simple_buzz_wrapper.SimpleBuzzWrapper(api_key=settings.API_KEY,
+ credentials=credentials)
\ No newline at end of file
diff --git a/contrib/buzz/buzz_gae_client.py b/contrib/buzz/buzz_gae_client.py
deleted file mode 100644
index c5954c2..0000000
--- a/contrib/buzz/buzz_gae_client.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#!/usr/bin/python2.4
-#
-# Copyright 2010 Google Inc. All Rights Reserved.
-
-__author__ = 'ade@google.com (Ade Oshineye)'
-
-import apiclient.discovery
-import logging
-import oauth_wrap
-import oauth2 as oauth
-import urllib
-import urlparse
-
-try:
- from urlparse import parse_qs, parse_qsl
-except ImportError:
- from cgi import parse_qs, parse_qsl
-
-# TODO(ade) Replace user-agent with something specific
-HEADERS = {
- 'user-agent': 'gdata-python-v3-sample-client/0.1',
- 'content-type': 'application/x-www-form-urlencoded'
- }
-
-REQUEST_TOKEN_URL = 'https://www.google.com/accounts/OAuthGetRequestToken?domain=anonymous&scope=https://www.googleapis.com/auth/buzz'
-AUTHORIZE_URL = 'https://www.google.com/buzz/api/auth/OAuthAuthorizeToken?domain=anonymous&scope=https://www.googleapis.com/auth/buzz'
-ACCESS_TOKEN_URL = 'https://www.google.com/accounts/OAuthGetAccessToken'
-
-
-class Error(Exception):
- """Base error for this module."""
- pass
-
-
-class RequestError(Error):
- """Request returned failure or unexpected data."""
- pass
-
-
-# TODO(ade) This class is really a BuzzGaeBuilder. Rename it.
-class BuzzGaeClient(object):
- def __init__(self, consumer_key='anonymous', consumer_secret='anonymous', api_key=None):
- self.consumer = oauth.Consumer(consumer_key, consumer_secret)
- self.consumer_key = consumer_key
- self.consumer_secret = consumer_secret
- self.api_key = api_key
-
- def _make_post_request(self, client, url, parameters):
- resp, content = client.request(url, 'POST', headers=HEADERS,
- body=urllib.urlencode(parameters, True))
-
- if resp['status'] != '200':
- logging.warn('Request: %s failed with status: %s. Content was: %s' % (url, resp['status'], content))
- raise RequestError('Invalid response %s.' % resp['status'])
- return resp, content
-
- def get_request_token(self, callback_url, display_name = None):
- parameters = {
- 'oauth_callback': callback_url
- }
-
- if display_name is not None:
- parameters['xoauth_displayname'] = display_name
-
- client = oauth.Client(self.consumer)
- resp, content = self._make_post_request(client, REQUEST_TOKEN_URL, parameters)
-
- request_token = dict(parse_qsl(content))
- return request_token
-
- def generate_authorisation_url(self, request_token):
- """Returns the URL the user should be redirected to in other to gain access to their account."""
-
- base_url = urlparse.urlparse(AUTHORIZE_URL)
- query = parse_qs(base_url.query)
- query['oauth_token'] = request_token['oauth_token']
-
- logging.info(urllib.urlencode(query, True))
-
- url = (base_url.scheme, base_url.netloc, base_url.path, base_url.params,
- urllib.urlencode(query, True), base_url.fragment)
- authorisation_url = urlparse.urlunparse(url)
- return authorisation_url
-
- def upgrade_to_access_token(self, request_token, oauth_verifier):
- token = oauth.Token(request_token['oauth_token'],
- request_token['oauth_token_secret'])
- token.set_verifier(oauth_verifier)
- client = oauth.Client(self.consumer, token)
-
- parameters = {}
- resp, content = self._make_post_request(client, ACCESS_TOKEN_URL, parameters)
- access_token = dict(parse_qsl(content))
-
- d = {
- 'consumer_key' : self.consumer_key,
- 'consumer_secret' : self.consumer_secret
- }
- d.update(access_token)
- return d
-
- def build_api_client(self, oauth_params=None):
- if oauth_params is not None:
- http = oauth_wrap.get_authorised_http(oauth_params)
- return apiclient.discovery.build('buzz', 'v1', http=http, developerKey=self.api_key)
- else:
- return apiclient.discovery.build('buzz', 'v1', developerKey=self.api_key)
diff --git a/contrib/buzz/simple_buzz_wrapper.py b/contrib/buzz/simple_buzz_wrapper.py
deleted file mode 100644
index a82b57c..0000000
--- a/contrib/buzz/simple_buzz_wrapper.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#!/usr/bin/python2.4
-#
-# Copyright 2010 Google Inc. All Rights Reserved.
-
-__author__ = 'ade@google.com (Ade Oshineye)'
-
-import buzz_gae_client
-import logging
-
-class SimpleBuzzWrapper(object):
- "Simple client that exposes the bare minimum set of common Buzz operations"
-
- def __init__(self, api_key=None, consumer_key='anonymous', consumer_secret='anonymous',
- oauth_token=None, oauth_token_secret=None):
-
- self.builder = buzz_gae_client.BuzzGaeClient(consumer_key, consumer_secret, api_key=api_key)
- if oauth_token and oauth_token_secret:
- logging.debug('Using api_client with authorisation')
- oauth_params_dict = {}
- oauth_params_dict['consumer_key'] = consumer_key
- oauth_params_dict['consumer_secret'] = consumer_secret
- oauth_params_dict['oauth_token'] = oauth_token
- oauth_params_dict['oauth_token_secret'] = oauth_token_secret
- self.api_client = self.builder.build_api_client(oauth_params=oauth_params_dict)
- else:
- logging.debug('Using api_client that doesn\'t have authorisation')
- self.api_client = self.builder.build_api_client()
-
- def search(self, query, user_token=None, max_results=10):
- if query is None or query.strip() is '':
- return None
-
- json = self.api_client.activities().search(q=query, max_results=max_results).execute()
- if json.has_key('items'):
- return json['items']
- return []
-
- def post(self, message_body, user_id='@me'):
- if message_body is None or message_body.strip() is '':
- return None
-
- activities = self.api_client.activities()
- logging.info('Retrieved activities for: %s' % user_id)
- activity = activities.insert(userId=user_id, body={
- 'data' : {
- 'title': message_body,
- 'object': {
- 'content': message_body,
- 'type': 'note'}
- }
- }
- ).execute()
- url = activity['links']['alternate'][0]['href']
- logging.info('Just created: %s' % url)
- return url
-
- def get_profile(self, user_id='@me'):
- user_profile_data = self.api_client.people().get(userId=user_id).execute()
- return user_profile_data
-
diff --git a/contrib/buzz/simple_wrapper.py b/contrib/buzz/simple_wrapper.py
new file mode 100644
index 0000000..eb12f85
--- /dev/null
+++ b/contrib/buzz/simple_wrapper.py
@@ -0,0 +1,54 @@
+#!/usr/bin/python2.4
+#
+# Copyright 2010 Google Inc. All Rights Reserved.
+
+__author__ = 'ade@google.com (Ade Oshineye)'
+
+import apiclient.discovery
+import httplib2
+import logging
+
+class SimpleWrapper(object):
+ "Simple client that exposes the bare minimum set of common Buzz operations"
+
+ def __init__(self, api_key=None, credentials=None):
+ if credentials:
+ logging.debug('Using api_client with credentials')
+ http = httplib2.Http()
+ http = credentials.authorize(http)
+ self.api_client = apiclient.discovery.build('buzz', 'v1', http=http, developerKey=api_key)
+ else:
+ logging.debug('Using api_client that doesn\'t have credentials')
+ self.api_client = apiclient.discovery.build('buzz', 'v1', developerKey=api_key)
+
+ def search(self, query, user_token=None, max_results=10):
+ if query is None or query.strip() is '':
+ return None
+
+ json = self.api_client.activities().search(q=query, max_results=max_results).execute()
+ if json.has_key('items'):
+ return json['items']
+ return []
+
+ def post(self, message_body, user_id='@me'):
+ if message_body is None or message_body.strip() is '':
+ return None
+
+ activities = self.api_client.activities()
+ logging.info('Retrieved activities for: %s' % user_id)
+ activity = activities.insert(userId=user_id, body={
+ 'data' : {
+ 'title': message_body,
+ 'object': {
+ 'content': message_body,
+ 'type': 'note'}
+ }
+ }
+ ).execute()
+ url = activity['links']['alternate'][0]['href']
+ logging.info('Just created: %s' % url)
+ return url
+
+ def get_profile(self, user_id='@me'):
+ user_profile_data = self.api_client.people().get(userId=user_id).execute()
+ return user_profile_data
diff --git a/contrib_tests/buzz/test_buzz_gae_client.py b/contrib_tests/buzz/test_buzz_gae_client.py
deleted file mode 100644
index 4a08545..0000000
--- a/contrib_tests/buzz/test_buzz_gae_client.py
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/python2.4
-#
-# Copyright 2010 Google Inc. All Rights Reserved.
-
-__author__ = 'ade@google.com (Ade Oshineye)'
-
-from contrib.buzz.buzz_gae_client import BuzzGaeClient
-import unittest
-
-class BuzzGaeClientTest(unittest.TestCase):
- def test_can_build_client(self):
- client = BuzzGaeClient()
- self.assertNotEquals(None, client)
-
- def test_can_get_request_token(self):
- client = BuzzGaeClient()
- callback_url = 'http://example.com'
- request_token = client.get_request_token(callback_url)
- self.assertTrue(request_token['oauth_token'] is not None)
- self.assertTrue(request_token['oauth_token_secret'] is not None)
- self.assertEquals(request_token['oauth_callback_confirmed'], 'true')
-
- def test_can_generate_authorisation_url(self):
- client = BuzzGaeClient()
- callback_url = 'http://example.com'
- request_token = client.get_request_token(callback_url)
- authorisation_url = client.generate_authorisation_url(request_token)
- self.assertTrue(authorisation_url is not None)
- self.assertTrue(authorisation_url.startswith('https://www.google.com/buzz/api/auth/OAuthAuthorizeToken?scope='))
-
- # TODO(ade) this test can't work outside of the AppEngine environment. Find an equivalent that works.
- def _test_can_upgrade_tokens(self):
- client = BuzzGaeClient()
- callback_url = 'http://example.com'
- request_token = client.get_request_token(callback_url)
- oauth_verifier = 'some verifier'
-
- consumer_key = 'anonymous'
- consumer_secret = 'anonymous'
- usable_token = client.upgrade_to_access_token(request_token, oauth_verifier)
- self.assertTrue(usable_token is not None)
- self.assertEquals(consumer_key, usable_token['consumer_key'])
- self.assertEquals(consumer_secret, usable_token['consumer_secret'])
- self.assertTrue(usable_token['access_token'] is not None)
-
- def test_can_build_apiclient_with_access_token(self):
- client = BuzzGaeClient()
- oauth_parameters = {}
- oauth_parameters['oauth_token'] = ''
- oauth_parameters['oauth_token_secret'] = ''
- oauth_parameters['consumer_key'] = ''
- oauth_parameters['consumer_secret'] = ''
- api_client = client.build_api_client(oauth_parameters)
- self.assertTrue(api_client is not None)
-
- def test_can_fetch_activites_from_buzz(self):
- client = BuzzGaeClient()
- api_client = client.build_api_client()
- count = 9
- activities = api_client.activities().list(userId='googlebuzz', scope='@self', max_results=count).execute()['items']
- self.assertEquals(count, len(activities))
-
-if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
diff --git a/contrib_tests/buzz/test_simple_buzz_wrapper.py b/contrib_tests/buzz/test_simple_wrapper.py
similarity index 74%
rename from contrib_tests/buzz/test_simple_buzz_wrapper.py
rename to contrib_tests/buzz/test_simple_wrapper.py
index 347595b..505dfee 100644
--- a/contrib_tests/buzz/test_simple_buzz_wrapper.py
+++ b/contrib_tests/buzz/test_simple_wrapper.py
@@ -4,43 +4,45 @@
__author__ = 'ade@google.com (Ade Oshineye)'
-from contrib.buzz.simple_buzz_wrapper import SimpleBuzzWrapper
+from contrib.buzz.simple_wrapper import SimpleWrapper
+import apiclient.oauth
import httplib2
import logging
+import oauth2 as oauth
import os
import pickle
import unittest
-class SimpleBuzzWrapperTest(unittest.TestCase):
+class SimpleWrapperTest(unittest.TestCase):
# None of these tests make a remote call. We assume the underlying libraries
# and servers are working.
def test_wrapper_rejects_empty_post(self):
- wrapper = SimpleBuzzWrapper()
+ wrapper = SimpleWrapper()
self.assertEquals(None, wrapper.post('', '108242092577082601423'))
def test_wrapper_rejects_post_containing_only_whitespace(self):
- wrapper = SimpleBuzzWrapper()
+ wrapper = SimpleWrapper()
self.assertEquals(None, wrapper.post(' ', '108242092577082601423'))
def test_wrapper_rejects_none_post(self):
- wrapper = SimpleBuzzWrapper()
+ wrapper = SimpleWrapper()
self.assertEquals(None, wrapper.post(None, '108242092577082601423'))
def test_wrapper_rejects_empty_search(self):
- wrapper = SimpleBuzzWrapper()
+ wrapper = SimpleWrapper()
self.assertEquals(None, wrapper.search(''))
def test_wrapper_rejects_search_containing_only_whitespace(self):
- wrapper = SimpleBuzzWrapper()
+ wrapper = SimpleWrapper()
self.assertEquals(None, wrapper.search(' '))
def test_wrapper_rejects_search_with_none(self):
- wrapper = SimpleBuzzWrapper()
+ wrapper = SimpleWrapper()
self.assertEquals(None, wrapper.search(None))
-class SimpleBuzzWrapperRemoteTest(unittest.TestCase):
+class SimpleWrapperRemoteTest(unittest.TestCase):
# These tests make remote calls
def __init__(self, method_name):
unittest.TestCase.__init__(self, method_name)
@@ -52,9 +54,13 @@
key,value = line.split('=')
oauth_params_dict[key.strip()] = value.strip()
- self.wrapper = SimpleBuzzWrapper(consumer_key=oauth_params_dict['consumerKey'],
- consumer_secret=oauth_params_dict['consumerSecret'], oauth_token=oauth_params_dict['accessToken'],
- oauth_token_secret=oauth_params_dict['accessTokenSecret'])
+ consumer = oauth.Consumer(oauth_params_dict['consumerKey'],
+ oauth_params_dict['consumerSecret'])
+ token = oauth.Token(oauth_params_dict['accessToken'],
+ oauth_params_dict['accessTokenSecret'])
+ user_agent = 'google-api-client-python-buzz-webapp/1.0'
+ credentials = apiclient.oauth.OAuthCredentials(consumer, token, user_agent)
+ self.wrapper = SimpleWrapper(credentials=credentials)
def test_searching_returns_results(self):
results = self.wrapper.search('oshineye')
diff --git a/oacurl.py b/oacurl.py
index 64493b5..ffb3a4d 100644
--- a/oacurl.py
+++ b/oacurl.py
@@ -24,133 +24,137 @@
def load_properties_file(path):
- properties = {}
- for line in open(path):
- line = line.strip()
- if line.startswith('#'):
- continue
-
- key,value = line.split('=')
- properties[key.strip()] = value.strip()
- return properties
+ properties = {}
+ for line in open(path):
+ line = line.strip()
+ if line.startswith('#'):
+ continue
+
+ key,value = line.split('=')
+ properties[key.strip()] = value.strip()
+ return properties
def save_properties(consumer_key, consumer_secret, token_key, token_secret, path):
- file = open(path, 'w')
-
- # File format and order is based on oacurl.java's defaults
- now = datetime.datetime.today()
- now_string = now.strftime('%a %b %d %H:%m:%S %Z %Y')
- file.write('#%s\n' % now_string)
- file.write('consumerSecret=%s\n' % consumer_secret)
- file.write('accessToken=%s\n' % token_key)
- file.write('consumerKey=%s\n' % consumer_key)
- file.write('accessTokenSecret=%s\n' % token_secret)
- file.close()
+ file = open(path, 'w')
+
+ # File format and order is based on oacurl.java's defaults
+ now = datetime.datetime.today()
+ now_string = now.strftime('%a %b %d %H:%m:%S %Z %Y')
+ file.write('#%s\n' % now_string)
+ file.write('consumerSecret=%s\n' % consumer_secret)
+ file.write('accessToken=%s\n' % token_key)
+ file.write('consumerKey=%s\n' % consumer_key)
+ file.write('accessTokenSecret=%s\n' % token_secret)
+ file.close()
+
def fetch(url):
- logging.debug('Now fetching: %s' % url)
-
- path = os.path.expanduser('~/.oacurl.properties')
- if not os.path.exists(path):
- logging.debug('User is not logged in.')
-
- print 'You are not logged in'
- sys.exit(1)
+ logging.debug('Now fetching: %s' % url)
+
+ path = os.path.expanduser('~/.oacurl.properties')
+ if not os.path.exists(path):
+ logging.debug('User is not logged in.')
+
+ print 'You are not logged in'
+ sys.exit(1)
- properties = load_properties_file(path)
- oauth_parameters = {
- 'consumer_key': properties['consumerKey'],
- 'consumer_secret' : properties['consumerSecret'],
- 'oauth_token' : properties['accessToken'],
- 'oauth_token_secret':properties['accessTokenSecret']}
-
- http = oauth_wrap.get_authorised_http(oauth_parameters)
- response, content = http.request(url)
- logging.debug(response)
- logging.debug(content)
-
- return response,content
+ properties = load_properties_file(path)
+ oauth_parameters = {
+ 'consumer_key': properties['consumerKey'],
+ 'consumer_secret' : properties['consumerSecret'],
+ 'oauth_token' : properties['accessToken'],
+ 'oauth_token_secret':properties['accessTokenSecret']}
+
+ http = oauth_wrap.get_authorised_http(oauth_parameters)
+ response, content = http.request(url)
+ logging.debug(response)
+ logging.debug(content)
+
+ return response,content
def buzz_login():
- buzz_discovery = build("buzz", "v1").auth_discovery()
+ buzz_discovery = build("buzz", "v1").auth_discovery()
- flow = FlowThreeLegged(buzz_discovery,
- consumer_key='anonymous',
- consumer_secret='anonymous',
- user_agent='google-api-client-python-buzz-cmdline/1.0',
- domain='anonymous',
- scope='https://www.googleapis.com/auth/buzz',
- xoauth_displayname='oacurl.py')
+ flow = FlowThreeLegged(buzz_discovery,
+ consumer_key='anonymous',
+ consumer_secret='anonymous',
+ user_agent='google-api-client-python-buzz-cmdline/1.0',
+ domain='anonymous',
+ scope='https://www.googleapis.com/auth/buzz',
+ xoauth_displayname='oacurl.py')
- authorize_url = flow.step1_get_authorize_url()
+ authorize_url = flow.step1_get_authorize_url()
- print 'Go to the following link in your browser:'
- print authorize_url
- print
+ print 'Go to the following link in your browser:'
+ print authorize_url
+ print
- accepted = 'n'
- while accepted.lower() == 'n':
- accepted = raw_input('Have you authorized me? (y/n) ')
- verification = raw_input('What is the verification code? ').strip()
+ accepted = 'n'
+ while accepted.lower() == 'n':
+ accepted = raw_input('Have you authorized me? (y/n) ')
+ verification = raw_input('What is the verification code? ').strip()
- credentials = flow.step2_exchange(verification)
- path = os.path.expanduser('~/.oacurl.properties')
- save_properties('anonymous', 'anonymous', credentials.token.key, credentials.token.secret,path)
-
-
+ credentials = flow.step2_exchange(verification)
+ path = os.path.expanduser('~/.oacurl.properties')
+ save_properties('anonymous', 'anonymous', credentials.token.key, credentials.token.secret,path)
+
+
def generic_login():
- #TODO(ade) Implement support for other services
- print 'Support for services other than Buzz is not implemented yet. Sorry.'
+ #TODO(ade) Implement support for other services
+ print 'Support for services other than Buzz is not implemented yet. Sorry.'
+
def login(options):
- if options.buzz:
- buzz_login()
- else:
- generic_login()
+ if options.buzz:
+ buzz_login()
+ else:
+ generic_login()
def get_command(args):
- if args[0] == 'login':
- return 'login'
- if args[0] == 'fetch':
- return 'fetch'
- return None
+ if args and args[0] == 'login':
+ return 'login'
+ if args and args[0] == 'fetch':
+ return 'fetch'
+ return None
+
def configure_logging(options):
- if options.verbose:
- logging.basicConfig(level=logging.DEBUG)
+ if options.verbose:
+ logging.basicConfig(level=logging.DEBUG)
+
def main():
- usage = '''Usage: %prog [options] fetch <url>
- Example: %prog -v fetch "https://www.googleapis.com/buzz/v1/people/@me/@self?alt=json&pp=1"
- '''
- parser = optparse.OptionParser(usage=usage)
- parser.set_defaults(verbose=False)
- parser.add_option('-v', '--verbose', action='store_true', dest='verbose')
- parser.add_option('-q', '--quiet', action='store_false', dest='verbose')
- parser.add_option('--buzz', action='store_true', dest='buzz')
-
- (options, args) = parser.parse_args()
+ usage = '''Usage: %prog [options] fetch <url>
+ Example: %prog -v fetch "https://www.googleapis.com/buzz/v1/people/@me/@self?alt=json&pp=1"
+ '''
+ parser = optparse.OptionParser(usage=usage)
+ parser.set_defaults(verbose=False)
+ parser.add_option('-v', '--verbose', action='store_true', dest='verbose')
+ parser.add_option('-q', '--quiet', action='store_false', dest='verbose')
+ parser.add_option('--buzz', action='store_true', dest='buzz')
+
+ (options, args) = parser.parse_args()
- configure_logging(options)
- logging.debug('Options: %s and Args: %s' % (str(options), str(args)))
-
- command = get_command(args)
-
- if not command:
- parser.error('Invalid arguments')
- return
-
- if command == 'fetch':
- response, content = fetch(args[1])
- print response
- print content
- return
-
- if command == 'login':
- login(options)
+ configure_logging(options)
+ logging.debug('Options: %s and Args: %s' % (str(options), str(args)))
+
+ command = get_command(args)
+
+ if not command:
+ parser.error('Invalid arguments')
+ return
+
+ if command == 'fetch':
+ response, content = fetch(args[1])
+ print response
+ print content
+ return
+
+ if command == 'login':
+ login(options)
if __name__ == '__main__':
- main()
\ No newline at end of file
+ main()
\ No newline at end of file