blob: 6ccea7c08eec71a98ffd810dd808df74a2a6000d [file] [log] [blame]
# Copyright (C) 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample for threading and queues.

A simple sample that processes many requests by constructing a thread pool and
passing client requests to the worker threads through a queue.
"""
Joe Gregoriofffa7d72011-02-18 17:20:39 -050019from apiclient.discovery import build
20from apiclient.errors import HttpError
Joe Gregorio05375c02011-03-29 16:29:37 -040021from oauth2client.file import Storage
22from oauth2client.client import OAuth2WebServerFlow
23from oauth2client.tools import run
Joe Gregorio0c73a672010-10-14 08:27:59 -040024
25import Queue
Joe Gregorio05375c02011-03-29 16:29:37 -040026import gflags
Joe Gregorio0c73a672010-10-14 08:27:59 -040027import httplib2
Joe Gregorio05375c02011-03-29 16:29:37 -040028import logging
29import sys
Joe Gregorio0c73a672010-10-14 08:27:59 -040030import threading
31import time
32
# Number of worker threads in the pool.
NUM_THREADS = 3

# Long URLs that the sample submits to be shortened, one request per entry.
BULK = [
    "https://code.google.com/apis/buzz/",
    "https://code.google.com/apis/moderator/",
    "https://code.google.com/apis/latitude/",
    "https://code.google.com/apis/urlshortener/",
    "https://code.google.com/apis/customsearch/",
    "https://code.google.com/apis/shopping/search/",
    "https://code.google.com/apis/predict",
    "https://code.google.com/more",
]
Joe Gregorio0c73a672010-10-14 08:27:59 -040047
# Global flag registry; main() parses the command line into it via FLAGS(argv).
FLAGS = gflags.FLAGS

# OAuth 2.0 flow that requests access to the URL Shortener scope.
# NOTE(review): the client secret is embedded in source; presumably acceptable
# for an installed-application sample, but confirm before reusing it elsewhere.
FLOW = OAuth2WebServerFlow(
    client_id="433807057907.apps.googleusercontent.com",
    client_secret="jigtZpMApkRxncxikFpR+SFg",
    scope="https://www.googleapis.com/auth/urlshortener",
    user_agent="urlshortener-cmdline-sample/1.0")

# --logging_level flag; main() applies its value to the root logger.
gflags.DEFINE_enum(
    "logging_level",
    "ERROR",
    ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
    "Set the level of logging detail.")

# Work queue: main() puts requests on it, the worker threads take them off.
queue = Queue.Queue()
60
61
class Backoff(object):
    """Exponential backoff helper for retrying a failing request.

    Create one instance per request, use ``loop()`` as the condition of the
    retry loop, and call ``fail()`` after every failed attempt that should be
    retried::

        backoff = Backoff()
        while backoff.loop():
            ...  # attempt the request; break on success, fail() on error

    ``fail()`` blocks the calling thread for ``2 ** retry`` seconds, where
    ``retry`` is the number of failures recorded so far.
    """

    def __init__(self, maxretries=8):
        """Set up the retry state.

        Args:
          maxretries: int, number of recorded failures after which loop()
            stops returning True.
        """
        self.retry = 0
        self.maxretries = maxretries
        self.first = True

    def loop(self):
        """Tell the caller whether another attempt should be made.

        Returns:
          True on the very first call, regardless of maxretries; afterwards
          True only while fewer than maxretries failures have been recorded.
        """
        if self.first:
            self.first = False
            return True
        return self.retry < self.maxretries

    def fail(self):
        """Record a failed attempt and wait before the next one.

        The delay doubles with every recorded failure (2, 4, 8, ... seconds).
        When the retry budget is exhausted no attempt will follow, so the
        wait is skipped instead of stalling the caller before it gives up.
        """
        self.retry += 1
        # Sleep only if the next loop() call will grant another attempt.
        if self.first or self.retry < self.maxretries:
            time.sleep(2 ** self.retry)
88
89
def start_threads(credentials):
    """Create the thread pool to process the requests.

    Starts NUM_THREADS daemon threads. Each thread authorizes its own
    httplib2.Http object and then repeatedly takes a request off the
    module-level ``queue``, executes it and marks the queue item as done.
    Requests that fail with one of a fixed set of HTTP status codes are
    retried with exponential backoff; any other HTTP error abandons that
    request.

    Args:
      credentials: object providing ``authorize(http)``; main() passes the
        credentials it loaded from storage or obtained by running the flow.
    """
    # HTTP status codes for which the request is retried after a backoff delay.
    retryable_statuses = (402, 403, 408, 503, 504)

    def process_requests(n):
        """Worker loop; ``n`` is the thread index shown in progress output."""
        http = credentials.authorize(httplib2.Http())
        loop = True

        while loop:
            request = queue.get()
            backoff = Backoff()
            while backoff.loop():
                try:
                    response = request.execute(http)
                    print("Processed: %s in thread %d" % (response['id'], n))
                    break
                except HttpError as e:
                    if e.resp.status not in retryable_statuses:
                        # Without this break nothing advances the backoff, so
                        # backoff.loop() would stay True and the same failing
                        # request would be re-sent forever.
                        print("Giving up, got status code: %d" % e.resp.status)
                        break
                    print("Increasing backoff, got status code: %d"
                          % e.resp.status)
                    backoff.fail()
                except Exception as e:
                    # NOTE(review): this ends the worker thread. If every
                    # worker ends this way, items still queued are never
                    # marked done and queue.join() in main() will not return.
                    print("Unexpected error. Exiting." + str(e))
                    loop = False
                    break

            print("Completed request")
            # Always balance the queue.get() above so queue.join() can finish.
            queue.task_done()

    for i in range(NUM_THREADS):
        t = threading.Thread(target=process_requests, args=[i])
        # Daemon threads do not keep the process alive once main() returns.
        t.daemon = True
        t.start()
124
Joe Gregorio0c73a672010-10-14 08:27:59 -0400125
def main(argv):
    """Shorten every URL in BULK using the worker thread pool.

    Parses the command-line flags, sets the root logging level, loads OAuth 2.0
    credentials from 'threadqueue.dat' (running FLOW when none are stored or
    they are marked invalid), starts the worker threads, queues one
    url().insert request per entry of BULK and blocks until the queue reports
    every item done.

    Args:
      argv: list of str, the command line including the program name.
    """
    try:
        argv = FLAGS(argv)
    except gflags.FlagsError as e:
        # Real newlines here: the doubled backslashes used previously printed
        # a literal "\n" in the middle of the usage message.
        print('%s\nUsage: %s ARGS\n%s' % (e, argv[0], FLAGS))
        sys.exit(1)

    logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))

    storage = Storage('threadqueue.dat')
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        credentials = run(FLOW, storage)

    # Workers are started first; they block on the empty queue until requests
    # are added below.
    start_threads(credentials)

    http = httplib2.Http()
    http = credentials.authorize(http)

    service = build("urlshortener", "v1", http=http,
                    developerKey="AIzaSyDRRpR3GS1F1_jKNNM9HCNd2wJQyPG3oN0")
    shortener = service.url()

    for url in BULK:
        body = {"longUrl": url}
        shorten_request = shortener.insert(body=body)
        print("Adding request to queue")
        queue.put(shorten_request)

    # Wait for all the requests to finish
    queue.join()
157
158
# Script entry point: pass the raw command line to main().
if __name__ == '__main__':
    main(sys.argv)