blob: 5b9764a2690ddf67160e1ea3d3be8dcb9acfcde4 [file] [log] [blame]
# Copyright (C) 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample for threading and queues.

A simple sample that processes many requests by constructing a thread pool
and passing client requests through a thread queue to be processed.
"""
Joe Gregoriofffa7d72011-02-18 17:20:39 -050019from apiclient.discovery import build
20from apiclient.errors import HttpError
Joe Gregorio05375c02011-03-29 16:29:37 -040021from oauth2client.file import Storage
22from oauth2client.client import OAuth2WebServerFlow
23from oauth2client.tools import run
Joe Gregorio0c73a672010-10-14 08:27:59 -040024
25import Queue
Joe Gregorio05375c02011-03-29 16:29:37 -040026import gflags
Joe Gregorio0c73a672010-10-14 08:27:59 -040027import httplib2
Joe Gregorio05375c02011-03-29 16:29:37 -040028import logging
29import sys
Joe Gregorio0c73a672010-10-14 08:27:59 -040030import threading
31import time
32
# Number of worker threads in the pool.
NUM_THREADS = 3

# The long URLs to shorten; one request is queued per entry.
BULK = [
    "https://code.google.com/apis/moderator/",
    "https://code.google.com/apis/latitude/",
    "https://code.google.com/apis/urlshortener/",
    "https://code.google.com/apis/customsearch/",
    "https://code.google.com/apis/shopping/search/",
    "https://code.google.com/apis/predict",
    "https://code.google.com/more",
]
Joe Gregorio0c73a672010-10-14 08:27:59 -040046
# Parsed command-line flags are read through this object.
FLAGS = gflags.FLAGS

# OAuth 2.0 flow used to obtain credentials for the URL Shortener scope.
# NOTE(review): the client secret is embedded in source. That may be
# acceptable for a sample, but confirm before reusing this pattern.
FLOW = OAuth2WebServerFlow(
    client_id='433807057907.apps.googleusercontent.com',
    client_secret='jigtZpMApkRxncxikFpR+SFg',
    scope='https://www.googleapis.com/auth/urlshortener',
    user_agent='urlshortener-cmdline-sample/1.0')

# --logging_level selects the root logger level applied in main().
gflags.DEFINE_enum(
    'logging_level',
    'ERROR',
    ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    'Set the level of logging detail.')

# Work queue shared by main() (producer) and the worker threads (consumers).
queue = Queue.Queue()
59
60
class Backoff(object):
    """Exponential backoff helper for a retry loop.

    Create one instance per operation, use loop() as the condition of the
    retry loop, and call fail() each time an attempt fails.  fail() delays
    for 2 ** retry seconds, so successive attempts are spaced further and
    further apart.
    """

    def __init__(self, maxretries=8):
        """Initialize the retry state.

        Args:
          maxretries: number of failures after which loop() returns False.
        """
        # Number of failures recorded so far.
        self.retry = 0
        self.maxretries = maxretries
        # True until loop() has been called once.
        self.first = True

    def loop(self):
        """Return True while another attempt should be made.

        The first call always returns True; later calls return True only
        while fewer than maxretries failures have been recorded.
        """
        if self.first:
            self.first = False
            return True
        return self.retry < self.maxretries

    def fail(self):
        """Record a failed attempt and sleep before the next one, if any."""
        self.retry += 1
        # Once the retry budget is spent loop() returns False, so sleeping
        # here would only postpone giving up (by 2 ** maxretries seconds).
        if self.retry < self.maxretries:
            time.sleep(2 ** self.retry)
88
def start_threads(credentials):
    """Create the thread pool to process the requests.

    Starts NUM_THREADS daemon threads.  Each worker repeatedly takes a
    request from the module-level queue and executes it, retrying with
    exponential backoff on a fixed set of HTTP status codes.

    Args:
      credentials: object whose authorize(http) method returns the http
        object passed to request.execute().
    """

    # HTTP status codes that are retried after backing off.
    retryable_statuses = (402, 403, 408, 503, 504)

    def process_requests(n):
        """Worker loop for thread number n."""
        # Each worker gets its own authorized Http object.
        http = credentials.authorize(httplib2.Http())
        loop = True

        while loop:
            request = queue.get()
            backoff = Backoff()
            while backoff.loop():
                # print(...) with a single argument behaves the same under
                # the Python 2 print statement used by the rest of the file.
                try:
                    response = request.execute(http=http)
                    print("Processed: %s in thread %d" % (response['id'], n))
                    break
                except HttpError as e:
                    status = e.resp.status
                    if status not in retryable_statuses:
                        # Without this break the retry counter never
                        # advances and the same request is re-sent forever.
                        print("Giving up, got status code: %d" % status)
                        break
                    print("Increasing backoff, got status code: %d" % status)
                    backoff.fail()
                except Exception as e:
                    print("Unexpected error. Exiting." + str(e))
                    # Stop this worker after accounting for the request.
                    loop = False
                    break

            print("Completed request")
            # Always mark the item done so queue.join() can return.
            queue.task_done()

    for i in range(NUM_THREADS):
        t = threading.Thread(target=process_requests, args=[i])
        # Daemon threads do not keep the process alive once main() returns.
        t.daemon = True
        t.start()
123
Joe Gregorio0c73a672010-10-14 08:27:59 -0400124
def main(argv):
    """Shorten every URL in BULK using the worker thread pool.

    Parses the command-line flags, loads stored credentials from
    'threadqueue.dat' (running the OAuth flow when they are missing or
    invalid), starts the worker threads, queues one insert request per URL
    and waits until the queue has been fully processed.

    Args:
      argv: the full command line, including the program name at index 0.
    """
    try:
        argv = FLAGS(argv)
    except gflags.FlagsError as e:
        # Real newlines here: the previous '\\n' printed a literal
        # backslash-n in the usage message.
        print('%s\nUsage: %s ARGS\n%s' % (e, argv[0], FLAGS))
        sys.exit(1)

    logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))

    storage = Storage('threadqueue.dat')
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        credentials = run(FLOW, storage)

    # Workers start first and block on the empty queue until work arrives.
    start_threads(credentials)

    http = httplib2.Http()
    http = credentials.authorize(http)

    # NOTE(review): the developer key is embedded in source; confirm this
    # is intended before reusing the pattern outside a sample.
    service = build("urlshortener", "v1", http=http,
                    developerKey="AIzaSyDRRpR3GS1F1_jKNNM9HCNd2wJQyPG3oN0")
    shortener = service.url()

    for url in BULK:
        body = {"longUrl": url}
        shorten_request = shortener.insert(body=body)
        print("Adding request to queue")
        queue.put(shorten_request)

    # Wait for all the requests to finish.
    queue.join()
156
157
# Script entry point: hand the raw command line to main().
if __name__ == "__main__":
    main(sys.argv)