Switch the threadqueue sample from the Moderator API to the URL Shortener API, moving authentication to oauth2client.
diff --git a/samples/threadqueue/main.py b/samples/threadqueue/main.py
index 5415a65..6ccea7c 100644
--- a/samples/threadqueue/main.py
+++ b/samples/threadqueue/main.py
@@ -1,22 +1,60 @@
-from apiclient.discovery import build
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Sample for threading and queues.
+
+A simple sample that processes many requests by constructing a thread pool and
+passing client requests through a thread queue to be processed.
+"""
from apiclient.discovery import build
from apiclient.errors import HttpError
-from apiclient.ext.authtools import run
-from apiclient.ext.file import Storage
-from apiclient.oauth import CredentialsInvalidError
-from apiclient.oauth import FlowThreeLegged
+from oauth2client.file import Storage
+from oauth2client.client import OAuth2WebServerFlow
+from oauth2client.tools import run
import Queue
+import gflags
import httplib2
+import logging
+import sys
import threading
import time
-# Uncomment to get detailed logging
-# httplib2.debuglevel = 4
+# How many threads to start.
+NUM_THREADS = 3
+# A list of URLs to shorten.
+BULK = [
+ "https://code.google.com/apis/buzz/",
+ "https://code.google.com/apis/moderator/",
+ "https://code.google.com/apis/latitude/",
+ "https://code.google.com/apis/urlshortener/",
+ "https://code.google.com/apis/customsearch/",
+ "https://code.google.com/apis/shopping/search/",
+ "https://code.google.com/apis/predict",
+ "https://code.google.com/more",
+ ]
-NUM_THREADS = 4
-NUM_ITEMS = 40
+FLAGS = gflags.FLAGS
+FLOW = OAuth2WebServerFlow(
+ client_id='433807057907.apps.googleusercontent.com',
+ client_secret='jigtZpMApkRxncxikFpR+SFg',
+ scope='https://www.googleapis.com/auth/urlshortener',
+ user_agent='urlshortener-cmdline-sample/1.0')
+
+gflags.DEFINE_enum('logging_level', 'ERROR',
+ ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
+ 'Set the level of logging detail.')
queue = Queue.Queue()
@@ -25,6 +63,10 @@
"""Exponential Backoff
Implements an exponential backoff algorithm.
+ Instantiate, then call loop() each time through
+ the loop; each time a request fails, call
+ fail(), which will delay for an appropriate amount
+ of time.
"""
def __init__(self, maxretries=8):
@@ -46,27 +88,29 @@
def start_threads(credentials):
- # Start up NUM_THREADS to handle requests
+ """Create the thread pool to process the requests."""
- def process_requests():
+ def process_requests(n):
http = httplib2.Http()
http = credentials.authorize(http)
- credentials_ok = True
+ loop = True
- while credentials_ok:
+
+ while loop:
request = queue.get()
backoff = Backoff()
while backoff.loop():
try:
- request.execute(http)
+ response = request.execute(http)
+ print "Processed: %s in thread %d" % (response['id'], n)
break
except HttpError, e:
if e.resp.status in [402, 403, 408, 503, 504]:
print "Increasing backoff, got status code: %d" % e.resp.status
backoff.fail()
- except CredentialsInvalidError:
- print "Credentials no long valid. Exiting."
- credentials_ok = False
+ except Exception, e:
+ print "Unexpected error. Exiting." + str(e)
+ loop = False
break
print "Completed request"
@@ -74,64 +118,43 @@
for i in range(NUM_THREADS):
- t = threading.Thread(target=process_requests)
+ t = threading.Thread(target=process_requests, args=[i])
t.daemon = True
t.start()
-def main():
- storage = Storage('moderator.dat')
+def main(argv):
+ try:
+ argv = FLAGS(argv)
+ except gflags.FlagsError, e:
+ print '%s\\nUsage: %s ARGS\\n%s' % (e, argv[0], FLAGS)
+ sys.exit(1)
+
+ logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))
+
+ storage = Storage('threadqueue.dat')
credentials = storage.get()
if credentials is None or credentials.invalid == True:
- moderator_discovery = build("moderator", "v1").auth_discovery()
-
- flow = FlowThreeLegged(moderator_discovery,
- consumer_key='anonymous',
- consumer_secret='anonymous',
- user_agent='python-threading-sample/1.0',
- domain='anonymous',
- scope='https://www.googleapis.com/auth/moderator',
- xoauth_displayname='Google API Client Example App')
-
- credentials = run(flow, storage)
+ credentials = run(FLOW, storage)
start_threads(credentials)
http = httplib2.Http()
http = credentials.authorize(http)
- service = build("moderator", "v1", http=http)
+ service = build("urlshortener", "v1", http=http,
+ developerKey="AIzaSyDRRpR3GS1F1_jKNNM9HCNd2wJQyPG3oN0")
+ shortener = service.url()
- series_body = {
- "data": {
- "description": "An example of bulk creating topics",
- "name": "Using threading and queues",
- "videoSubmissionAllowed": False
- }
- }
- try:
- series = service.series().insert(body=series_body).execute()
- print "Created a new series"
+ for url in BULK:
+ body = {"longUrl": url }
+ shorten_request = shortener.insert(body=body)
+ print "Adding request to queue"
+ queue.put(shorten_request)
- for i in range(NUM_ITEMS):
- topic_body = {
- "data": {
- "description": "Sample Topic # %d" % i,
- "name": "Sample",
- "presenter": "me"
- }
- }
- topic_request = service.topics().insert(
- seriesId=series['id']['seriesId'], body=topic_body)
- print "Adding request to queue"
- queue.put(topic_request)
- except CredentialsInvalidError:
- print 'Your credentials are no longer valid.'
- print 'Please re-run this application to re-authorize.'
-
-
+ # Wait for all the requests to finish
queue.join()
if __name__ == "__main__":
- main()
+ main(sys.argv)