Joe Gregorio | 05375c0 | 2011-03-29 16:29:37 -0400 | [diff] [blame] | 1 | # Copyright (C) 2010 Google Inc. |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | """Sample for threading and queues. |
| 15 | |
| 16 | A simple sample that processes many requests by constructing a threadpool and |
| 17 | passing client requests by a thread queue to be processed. |
| 18 | """ |
Joe Gregorio | fffa7d7 | 2011-02-18 17:20:39 -0500 | [diff] [blame] | 19 | from apiclient.discovery import build |
| 20 | from apiclient.errors import HttpError |
Joe Gregorio | 05375c0 | 2011-03-29 16:29:37 -0400 | [diff] [blame] | 21 | from oauth2client.file import Storage |
| 22 | from oauth2client.client import OAuth2WebServerFlow |
| 23 | from oauth2client.tools import run |
Joe Gregorio | 0c73a67 | 2010-10-14 08:27:59 -0400 | [diff] [blame] | 24 | |
| 25 | import Queue |
Joe Gregorio | 05375c0 | 2011-03-29 16:29:37 -0400 | [diff] [blame] | 26 | import gflags |
Joe Gregorio | 0c73a67 | 2010-10-14 08:27:59 -0400 | [diff] [blame] | 27 | import httplib2 |
Joe Gregorio | 05375c0 | 2011-03-29 16:29:37 -0400 | [diff] [blame] | 28 | import logging |
| 29 | import sys |
Joe Gregorio | 0c73a67 | 2010-10-14 08:27:59 -0400 | [diff] [blame] | 30 | import threading |
| 31 | import time |
| 32 | |
# How many threads to start.
# start_threads() launches exactly this many worker threads.
NUM_THREADS = 3

# A list of URLs to shorten.
# main() turns each entry into one urlshortener insert request.
BULK = [
    "https://code.google.com/apis/moderator/",
    "https://code.google.com/apis/latitude/",
    "https://code.google.com/apis/urlshortener/",
    "https://code.google.com/apis/customsearch/",
    "https://code.google.com/apis/shopping/search/",
    "https://code.google.com/apis/predict",
    "https://code.google.com/more",
    ]

# Parsed command-line flags; populated when main() calls FLAGS(argv).
FLAGS = gflags.FLAGS

# OAuth 2.0 flow used by main() to obtain credentials when none are stored.
# NOTE(review): the client id and secret are hard-coded in source. That is
# presumably intentional for a command-line sample -- confirm before reusing
# this file as a template for real applications.
FLOW = OAuth2WebServerFlow(
    client_id='433807057907.apps.googleusercontent.com',
    client_secret='jigtZpMApkRxncxikFpR+SFg',
    scope='https://www.googleapis.com/auth/urlshortener',
    user_agent='urlshortener-cmdline-sample/1.0')

# --logging_level flag; main() applies the chosen level to the root logger.
gflags.DEFINE_enum('logging_level', 'ERROR',
    ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    'Set the level of logging detail.')

# Shared work queue: main() puts requests on it and the worker threads
# started by start_threads() take them off.
queue = Queue.Queue()
| 59 | |
| 60 | |
class Backoff(object):
    """Exponential backoff helper for retrying a failing request.

    Instantiate one per request, use loop() as the condition of the retry
    loop, and call fail() each time an attempt fails:

        backoff = Backoff()
        while backoff.loop():
            try:
                do_request()
                break
            except RetryableError:
                backoff.fail()

    Args:
      maxretries: int, number of failures after which loop() stops
        returning True.
      sleep: optional callable taking a delay in seconds. Defaults to
        time.sleep; mainly useful for substituting a fake in tests.
    """

    def __init__(self, maxretries=8, sleep=None):
        self.retry = 0
        self.maxretries = maxretries
        self.first = True
        self._sleep = time.sleep if sleep is None else sleep

    def loop(self):
        """Return True while another attempt should be made.

        The first call always returns True so that at least one attempt is
        made; later calls return True only while fewer than maxretries
        failures have been recorded.
        """
        if self.first:
            self.first = False
            return True
        return self.retry < self.maxretries

    def fail(self):
        """Record a failed attempt and wait before the next one.

        The delay is 2 ** (number of failures so far) seconds. Once the
        retry budget is exhausted loop() will not allow another attempt, so
        the delay is skipped instead of sleeping for nothing.
        """
        self.retry += 1
        if self.retry < self.maxretries:
            self._sleep(2 ** self.retry)
| 87 | |
| 88 | |
def start_threads(credentials):
    """Create the thread pool to process the requests.

    Starts NUM_THREADS daemon threads. Each one repeatedly takes a request
    off the module-level queue and executes it, retrying with exponential
    backoff on a fixed set of HTTP status codes.

    Args:
      credentials: object whose authorize(http) returns an authorized http
        object; each worker authorizes its own httplib2.Http instance.
    """
    # Status codes for which the request is retried after a backoff delay.
    retryable_statuses = (402, 403, 408, 503, 504)

    def process_requests(n):
        """Worker loop; n is the thread index used in progress messages."""
        http = credentials.authorize(httplib2.Http())
        keep_running = True

        while keep_running:
            request = queue.get()
            try:
                backoff = Backoff()
                while backoff.loop():
                    try:
                        response = request.execute(http=http)
                        print("Processed: %s in thread %d" % (response['id'], n))
                        break
                    except HttpError as e:
                        status = e.resp.status
                        if status in retryable_statuses:
                            print("Increasing backoff, got status code: %d" % status)
                            backoff.fail()
                        else:
                            # Without this break a non-retryable error would
                            # be retried immediately and forever, because
                            # the backoff counter never advances.
                            print("Giving up, got status code: %d" % status)
                            break
                    except Exception as e:
                        # Anything else stops this worker after the current
                        # request has been marked done.
                        print("Unexpected error. Exiting." + str(e))
                        keep_running = False
                        break

                print("Completed request")
            finally:
                # Always balance queue.get() so queue.join() can return.
                queue.task_done()

    for i in range(NUM_THREADS):
        worker = threading.Thread(target=process_requests, args=[i])
        # Daemon threads do not keep the process alive once main() returns.
        worker.daemon = True
        worker.start()
| 123 | |
Joe Gregorio | 0c73a67 | 2010-10-14 08:27:59 -0400 | [diff] [blame] | 124 | |
def main(argv):
    """Shorten every URL in BULK using the worker thread pool.

    Parses flags, sets the logging level, loads stored OAuth credentials
    (running FLOW when they are missing or invalid), starts the worker
    threads, queues one insert request per URL and blocks until the queue
    has been fully processed.

    Args:
      argv: list of str, the full command line including the program name.
    """
    try:
        argv = FLAGS(argv)
    except gflags.FlagsError as e:
        # Real newlines here: the doubled backslashes used previously
        # printed a literal "\n" in the usage message.
        print('%s\nUsage: %s ARGS\n%s' % (e, argv[0], FLAGS))
        sys.exit(1)

    logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))

    storage = Storage('threadqueue.dat')
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        credentials = run(FLOW, storage)

    start_threads(credentials)

    http = credentials.authorize(httplib2.Http())

    # NOTE(review): the developer key is hard-coded; presumably a sample
    # key -- confirm before reusing this code elsewhere.
    service = build("urlshortener", "v1", http=http,
                    developerKey="AIzaSyDRRpR3GS1F1_jKNNM9HCNd2wJQyPG3oN0")
    shortener = service.url()

    for url in BULK:
        shorten_request = shortener.insert(body={"longUrl": url})
        print("Adding request to queue")
        queue.put(shorten_request)

    # Wait for all the requests to finish
    queue.join()
| 156 | |
| 157 | |
# Run the sample only when this file is executed as a script; the full
# command line (program name included) is handed to main().
if __name__ == "__main__":
    main(sys.argv)