Add TaskQueue sample.
Index: samples/gtaskqueue_sample/README
===================================================================
new file mode 100755
diff --git a/samples/gtaskqueue_sample/README b/samples/gtaskqueue_sample/README
new file mode 100644
index 0000000..6535127
--- /dev/null
+++ b/samples/gtaskqueue_sample/README
@@ -0,0 +1,46 @@
+This acts as a sample as well as commandline tool for accessing Google TaskQueue
+APIs.
+
+Installation
+============
+
+To install, simply say
+
+ $ python setup.py install --record=files.txt
+
+to install the files and record what files are installed in files.txt.
+Make sure that you have already installed the google-python-client-libraries
+using the setup.py in the toplevel directory.
+
+You may need root privileges for install.
+
+Running
+=======
+This sample provides the following:
+1. gtaskqueue: This works as a command-line tool to access Google TaskQueue
+API. You can perform various operations on taskqueues such as leasetask,
+gettask, listtasks, deletetask, getqueue.
+Example usage:
+ i. To lease a task
+ gtaskqueue leasetask --taskqueue_name=<your queue_name>
+ --lease_secs=30 --project_name=<your appengine app_name>
+ ii. To get stats on a queue
+ gtaskqueue getqueue --taskqueue_name=<your queue_name>
+ --project_name=<your appengine app_name> --get_stats
+
+2. gtaskqueue_puller: This works as a worker to continuously pull tasks enqueued
+by your app, perform the task and then post the output back to your app.
+Example Usage:
+ gtaskqueue_puller --taskqueue_name=<your queue_name>
+ --lease_secs=30 --project_name=<your appengine app_name>
+ --executable_binary="cat" --output_url=<url location if you want to post the
+ data back(optional)>
+
+Third Party Libraries
+=====================
+
+These libraries will be installed when you install the client library:
+
+http://code.google.com/p/httplib2
+http://code.google.com/p/uri-templates
+http://github.com/simplegeo/python-oauth2
diff --git a/samples/gtaskqueue_sample/gtaskqueue/client_task.py b/samples/gtaskqueue_sample/gtaskqueue/client_task.py
new file mode 100644
index 0000000..5214e81
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/client_task.py
@@ -0,0 +1,334 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Class to encapsulate task related information and methods on task_puller."""
+
+
+
+import base64
+import oauth2 as oauth
+import os
+import subprocess
+import tempfile
+import time
+import urllib2
+from apiclient.errors import HttpError
+from gtaskqueue.taskqueue_logger import logger
+import gflags as flags
+
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string(
+    'executable_binary',
+    '/bin/cat',
+    'path of the binary to be executed')
+flags.DEFINE_string(
+    'output_url',
+    '',
+    'url to which output is posted. The url must include param name, '
+    'value for which is populated with task_id from puller while posting '
+    'the data. Format of output url is absolute url which handles the '
+    'post request from task queue puller. '
+    '(Eg: "http://taskpuller.appspot.com/taskdata?name="). '
+    'The Param value is always the task_id. The handler for this post '
+    'should be able to associate the task with its id and take '
+    'appropriate action. Use the gen_appengine_access_token tool to '
+    'generate the token and store it in a file before you start.')
+flags.DEFINE_string(
+    'appengine_access_token_file',
+    None,
+    'File containing an Appengine Access token, if any. If present this '
+    'token is added to the output_url request, so that the output_url can '
+    'be an authenticated end-point. Use the gen_appengine_access_token tool '
+    'to generate the token and store it in a file before you start.')
+flags.DEFINE_float(
+    'task_timeout_secs',
+    3600.0,
+    'timeout to kill the task')
+
+
+class ClientTaskInitError(Exception):
+    """Signals that a client task could not be initialized."""
+
+    def __init__(self, task_id, error_str):
+        super(ClientTaskInitError, self).__init__()
+        self.error_str = error_str
+        self.task_id = task_id
+
+    def __str__(self):
+        template = 'Error initializing task "%s". Error details "%s". '
+        return template % (self.task_id, self.error_str)
+
+
+class ClientTask(object):
+ """Class to encapsulate task information pulled by taskqueue_puller module.
+
+ This class is responsible for creating an independent client task object by
+ taking some information from lease response task object. It encapsulates
+ methods responsible for spawning an independent subprocess for executing
+ the task, tracking the status of the task and also deleting the task from
+ taskqueue when completed. It also has the functionality to give the output
+ back to the application by posting to the specified url.
+ """
+
+ def __init__(self, task):
+ self._task = task
+ self._process = None
+ self._output_file = None
+
+    # Class method that lazily loads and caches the Appengine Access Token.
+    @classmethod
+    def get_access_token(cls):
+        """Returns the cached oauth token, or None if no token file is set."""
+        if not FLAGS.appengine_access_token_file:
+            return None
+        if getattr(cls, '_access_token', None) is None:
+            with open(FLAGS.appengine_access_token_file, 'rb') as fhandle:
+                cls._access_token = oauth.Token.from_string(fhandle.read())
+        return cls._access_token
+
+    def init(self):
+        """Pulls the task details out of the lease response and starts work.
+
+        The task id and base64 payload are read from the task object, the
+        decoded payload is written to a temporary input file, and a
+        subprocess is spawned to execute the task.
+
+        Returns:
+          True if the task subprocess was started.
+          False if any initialization step raised ClientTaskInitError.
+        """
+        try:
+            self.task_id = self._task.get('id')
+            encoded_payload = self._task.get('payloadBase64')
+            self._payload = self._decode_base64_payload(encoded_payload)
+            self._payload_file = self._dump_payload_to_file()
+            self._start_task_execution()
+        except ClientTaskInitError as init_error:
+            logger.error(str(init_error))
+            return False
+        return True
+
+    def _decode_base64_payload(self, encoded_str):
+        """Decodes the url-safe base64 payload of the task.
+
+        Returns:
+          The decoded payload, or '' when the payload is empty.
+
+        Raises:
+          ClientTaskInitError: if the payload cannot be decoded.
+        """
+        # An empty payload is unusual but legal: warn and carry on.
+        if not encoded_str:
+            logger.warn('Empty paylaod for task %s' % self.task_id)
+            return ''
+        try:
+            return base64.urlsafe_b64decode(encoded_str.encode('utf-8'))
+        # Catch Exception rather than using a bare except, so that
+        # KeyboardInterrupt/SystemExit still stop the puller.
+        except Exception as error:
+            logger.error('Error decoding payload for task %s. Error details %s'
+                         % (self.task_id, str(error)))
+            raise ClientTaskInitError(self.task_id, 'Error decoding payload')
+
+    def _dump_payload_to_file(self):
+        """Writes the decoded payload to a temp file and returns its path."""
+        try:
+            (fd, fname) = tempfile.mkstemp()
+            with os.fdopen(fd, 'wb') as f:
+                f.write(self._payload)
+            return fname
+        # EnvironmentError covers both OSError (mkstemp) and IOError (write).
+        except EnvironmentError as error:
+            logger.error('Error dumping payload %s. Error details %s' %
+                         (self.task_id, str(error)))
+            raise ClientTaskInitError(self.task_id, 'Error dumping payload')
+
+ def _get_input_file(self):
+ return self._payload_file
+
+    def _post_output(self):
+        """Posts the task output back to the output url as a byte array.
+
+        The output file generated by the task is read as raw bytes and
+        posted to the output_url flag with the task id appended. The
+        application using the taskqueue must have a handler for this post.
+        The body is sent as a byte array to keep it generic for any kind of
+        output. If an access token is configured, it is added to the headers.
+
+        Returns:
+          True if the post succeeded or no output_url is set, False if the
+          post failed.
+        """
+        if not FLAGS.output_url:
+            return True
+        try:
+            with open(self._get_output_file(), 'rb') as f:
+                body = f.read()
+            url = FLAGS.output_url + self.task_id
+            logger.debug('Posting data to url %s' % url)
+            headers = {'Content-Type': 'byte-array'}
+            # Add an access token to the headers if specified.
+            # This enables the output_url to be authenticated and not open.
+            access_token = ClientTask.get_access_token()
+            if access_token:
+                consumer = oauth.Consumer('anonymous', 'anonymous')
+                oauth_req = oauth.Request.from_consumer_and_token(
+                    consumer,
+                    token=access_token,
+                    http_url=url)
+                headers.update(oauth_req.to_header())
+            # TODO: Use httplib instead of urllib for consistency.
+            req = urllib2.Request(url, body, headers)
+            urllib2.urlopen(req).close()
+        except ValueError as error:
+            logger.error('Error posting data back %s. Error details %s'
+                         % (self.task_id, str(error)))
+            return False
+        except Exception as error:
+            logger.error('Exception while posting data back %s. Error '
+                         'details %s' % (self.task_id, str(error)))
+            return False
+        return True
+
+    def _get_output_file(self):
+        """Returns the output file path, creating the file on first use."""
+        if not self._output_file:
+            (fd, self._output_file) = tempfile.mkstemp()
+            os.close(fd)  # Only the path is needed; don't leak the handle.
+        return self._output_file
+
+ def get_task_id(self):
+ return self.task_id
+
+    def _start_task_execution(self):
+        """Spawns the subprocess which executes the task.
+
+        The executable_binary flag is split on whitespace into the argument
+        list for Popen, followed by the input and output files, which the
+        executable is assumed to take as its first two positional parameters.
+
+        Raises:
+          ClientTaskInitError: if the subprocess could not be created.
+        """
+        # TODO: Handle a clean shutdown when a process is killed by Ctrl+C.
+        try:
+            cmdline = FLAGS.executable_binary.split()
+            cmdline.append(self._get_input_file())
+            cmdline.append(self._get_output_file())
+            self._process = subprocess.Popen(cmdline)
+            self.task_start_time = time.time()
+        except OSError as error:
+            logger.error('Error creating subprocess %s. Error details %s'
+                         % (self.task_id, str(error)))
+            self._cleanup()
+            raise ClientTaskInitError(self.task_id,
+                                      'Error creating subprocess')
+        except ValueError:
+            logger.error('Invalid arguments while executing task %s'
+                         % self.task_id)
+            self._cleanup()
+            raise ClientTaskInitError(self.task_id,
+                                      'Invalid arguments while executing task')
+
+    def is_completed(self, task_api):
+        """Checks whether the task has finished executing.
+
+        On a zero exit status the output is posted back and, if the post
+        succeeds, the task is deleted from the taskqueue. A task which exited
+        with an error, or was killed for running longer than
+        task_timeout_secs, is not deleted from the taskqueue.
+
+        Args:
+          task_api: handle for taskqueue api collection.
+
+        Returns:
+          True if there is nothing more to run for the task, else False.
+        """
+        status = False
+        try:
+            task_status = self._process.poll()
+            if task_status is not None:
+                # The subprocess has exited. A non-zero status is completed
+                # too; otherwise a dead task is polled until it times out.
+                status = True
+                if task_status != 0:
+                    logger.error('Task %s exited with status %s'
+                                 % (self.task_id, task_status))
+                elif self._post_output():
+                    self._delete_task_from_queue(task_api)
+                self._cleanup()
+            elif self._has_timedout():
+                status = True
+                self._kill_subprocess()
+        except OSError as error:
+            logger.error('Error during polling status of task %s, Error '
+                         'details %s' % (self.task_id, str(error)))
+        return status
+
+    def _cleanup(self):
+        """Removes the temporary input/output files used by the task."""
+        try:
+            for get_path in (self._get_input_file, self._get_output_file):
+                path = get_path()
+                if os.path.exists(path):
+                    os.remove(path)
+        except OSError as error:
+            logger.error('Error during file cleanup for task %s. Error '
+                         'details %s' % (self.task_id, str(error)))
+
+    def _delete_task_from_queue(self, task_api):
+        """Deletes the task from the taskqueue.
+
+        is_completed() calls this only after the output of the task has been
+        posted back successfully, i.e. once the task has produced its expected
+        output. A failure to delete is logged and reported through the return
+        value; it is not retried here.
+
+        Args:
+          task_api: handle for taskqueue api collection.
+
+        Returns:
+          Delete status (True/False)
+        """
+        try:
+            delete_request = task_api.tasks().delete(
+                project=FLAGS.project_name,
+                taskqueue=FLAGS.taskqueue_name,
+                task=self.task_id)
+            delete_request.execute()
+        except HttpError as http_error:
+            logger.error('Error deleting task %s from taskqueue. '
+                         'Error details %s'
+                         % (self.task_id, str(http_error)))
+            return False
+        return True
+
+    def _has_timedout(self):
+        """Tells whether the task has outlived the task_timeout_secs flag."""
+        started_at = self.task_start_time
+        elapsed_secs = time.time() - started_at
+        timeout_secs = FLAGS.task_timeout_secs
+        return elapsed_secs > timeout_secs
+
+    def _kill_subprocess(self):
+        """Kills the subprocess first, then removes its temporary files."""
+        try:
+            logger.info('Trying to kill task %s, since it has been running '
+                        'for long' % self.task_id)
+            self._process.kill()
+        except OSError as error:
+            logger.error('Error killing task %s. Error details %s'
+                         % (self.task_id, str(error)))
+        self._cleanup()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token b/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token
new file mode 100644
index 0000000..eeef137
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tool to get an Access Token to access an auth protected Appengine end point.
+
+This tool talks to the appengine end point, and gets an Access Token that is
+stored in a file. This token can be used by a tool to do authorized access to
+an appengine end point.
+"""
+
+
+
+from google.apputils import app
+import gflags as flags
+import httplib2
+import oauth2 as oauth
+import time
+
+FLAGS = flags.FLAGS
+
+flags.DEFINE_string(
+ 'appengine_host',
+ None,
+ 'Appengine Host for whom we are trying to get an access token')
+flags.DEFINE_string(
+ 'access_token_file',
+ None,
+ 'The file where the access token is stored')
+
+
+def get_access_token():
+    """Obtains an OAuth access token for the Appengine host interactively."""
+    import sys
+    if not FLAGS.appengine_host:
+        print('must supply the appengine host')
+        sys.exit(1)
+
+    # setup
+    server = FLAGS.appengine_host
+    request_token_url = server + '/_ah/OAuthGetRequestToken'
+    authorization_url = server + '/_ah/OAuthAuthorizeToken'
+    access_token_url = server + '/_ah/OAuthGetAccessToken'
+    consumer = oauth.Consumer('anonymous', 'anonymous')
+    signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
+
+    # The Http client that will be used to make the requests.
+    h = httplib2.Http()
+
+    # get request token
+    print '* Obtain a request token ...'
+    # We dont have a callback server, we're going to use the browser to
+    # authorize.
+    #TODO: Add check for 401 etc
+    parameters = {'oauth_callback': 'oob'}
+    oauth_req1 = oauth.Request.from_consumer_and_token(
+        consumer, http_url=request_token_url, parameters=parameters)
+    oauth_req1.sign_request(signature_method_hmac_sha1, consumer, None)
+    print 'Request headers: %s' % str(oauth_req1.to_header())
+    response, content = h.request(oauth_req1.to_url(), 'GET')
+    token = oauth.Token.from_string(content)
+    print 'GOT key: %s secret:%s' % (str(token.key), str(token.secret))
+
+    print '* Authorize the request token ...'
+    oauth_req2 = oauth.Request.from_token_and_callback(
+        token=token, callback='oob', http_url=authorization_url)
+    print 'Please run this URL in a browser and paste the token back here'
+    print oauth_req2.to_url()
+    verification_code = raw_input('Enter verification code: ').strip()
+    token.set_verifier(verification_code)
+
+    # get access token
+    print '* Obtain an access token ...'
+    oauth_req3 = oauth.Request.from_consumer_and_token(
+        consumer, token=token, http_url=access_token_url)
+    oauth_req3.sign_request(signature_method_hmac_sha1, consumer, token)
+    print 'Request headers: %s' % str(oauth_req3.to_header())
+    response, content = h.request(oauth_req3.to_url(), 'GET')
+    access_token = oauth.Token.from_string(content)
+    print 'Access Token key: %s secret:%s' % (str(access_token.key),
+                                              str(access_token.secret))
+
+    # Save the token to a file if its specified.
+    if FLAGS.access_token_file:
+        with open(FLAGS.access_token_file, 'w') as fhandle:
+            fhandle.write(access_token.to_string())
+
+    # Example : access some protected resources
+    print '* Checking the access token against protected resources...'
+    # Assumes that the server + "/" is protected.
+    # The request is built and signed with the access token obtained above,
+    # not with the request token it was exchanged for.
+    test_url = server + "/"
+    oauth_req4 = oauth.Request.from_consumer_and_token(
+        consumer, token=access_token, http_url=test_url)
+    oauth_req4.sign_request(signature_method_hmac_sha1, consumer, access_token)
+    resp, content = h.request(test_url, "GET", headers=oauth_req4.to_header())
+    print resp
+    print content
+
+
+def main(argv):
+ get_access_token()
+
+if __name__ == '__main__':
+ app.run()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue
new file mode 100644
index 0000000..ce68ef1
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Command line tool for interacting with Google TaskQueue API."""
+
+__version__ = '0.0.1'
+
+
+import logging
+
+from gtaskqueue import task_cmds
+from gtaskqueue import taskqueue_cmds
+
+from google.apputils import appcommands
+import gflags as flags
+
+LOG_LEVELS = [logging.DEBUG,
+ logging.INFO,
+ logging.WARNING,
+ logging.CRITICAL]
+LOG_LEVEL_NAMES = map(logging.getLevelName, LOG_LEVELS)
+FLAGS = flags.FLAGS
+
+flags.DEFINE_enum(
+ 'log_level',
+ logging.getLevelName(logging.WARNING),
+ LOG_LEVEL_NAMES,
+ 'Logging output level.')
+
+
+def main(unused_argv):
+    """Sets the root logger level and registers the taskqueue commands."""
+    by_name = dict((logging.getLevelName(lv), lv) for lv in LOG_LEVELS)
+    logging.getLogger().setLevel(by_name[FLAGS.log_level])
+    taskqueue_cmds.add_commands()
+    task_cmds.add_commands()
+
+if __name__ == '__main__':
+ appcommands.Run()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller
new file mode 100644
index 0000000..f53181a
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller
@@ -0,0 +1,348 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module to pull tasks from TaskQueues and execute them.
+
+This module does the following in an infinite loop.
+1. Connects to Task API (of TaskQueues API collection) to request lease on
+ certain number of tasks (specified by user).
+2. Spawns parallel processes to execute the leased tasks.
+3. Polls all the tasks continuously till they finish.
+4. Deletes the tasks from taskqueue on their successful completion.
+5. It lets the user specify when to invoke the lease request instead of polling
+ tasks status in a tight loop for better resource utilization:
+ a. Invoke the Lease request when running tasks drop to or below a certain
+ threshold (min_running_tasks)
+ b. Wait time becomes more than specified poll-time-out interval.
+6. Repeat the steps from 1 to 5 when either all tasks have finished executing
+ or one of the conditions in 5) is met. """
+
+
+
+import sys
+import time
+from apiclient.errors import HttpError
+from gtaskqueue.client_task import ClientTask
+from gtaskqueue.taskqueue_client import TaskQueueClient
+from gtaskqueue.taskqueue_logger import logger
+from gtaskqueue.taskqueue_logger import set_logger
+from google.apputils import app
+import gflags as flags
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string(
+ 'project_name',
+ 'default',
+ 'The name of the Taskqueue API project.')
+flags.DEFINE_string(
+ 'taskqueue_name',
+ 'testpuller',
+ 'taskqueue to which client wants to connect to')
+flags.DEFINE_integer(
+ 'lease_secs',
+ 30,
+ 'The lease for the task in seconds')
+flags.DEFINE_integer(
+ 'num_tasks',
+ 10,
+ 'The number of tasks to lease')
+flags.DEFINE_integer(
+ 'min_running_tasks',
+ 0,
+ 'minmum number of tasks below which lease can be invoked')
+flags.DEFINE_float(
+ 'sleep_interval_secs',
+ 2,
+ 'sleep interval when no tasks are found in the taskqueue')
+flags.DEFINE_float(
+    'timeout_secs_for_next_lease_request',
+    600,
+    'Wait time before next poll when no tasks are found in the '
+    'queue (in seconds)')
+flags.DEFINE_integer(
+ 'taskapi_requests_per_sec',
+ None,
+ 'limit on task_api requests per second')
+flags.DEFINE_float(
+ 'sleep_before_next_poll_secs',
+ 2,
+ 'sleep interval before next poll')
+
+
+class TaskQueuePuller(object):
+ """Maintains state information for TaskQueuePuller."""
+
+    def __init__(self):
+        """Sets up scheduling state; exits if the API handle is unavailable."""
+        self._last_lease_time = None
+        self._poll_timeout_start = None
+        self._num_last_leased_tasks = 0
+        # Maps the ids of running tasks to their client_task objects.
+        self._taskprocess_map = {}
+        try:
+            self.__tcq = TaskQueueClient()
+            self.task_api = self.__tcq.get_taskapi()
+        except HttpError as http_error:
+            logger.error('Could not get TaskQueue API handler and hence '
+                         'exiting: %s' % str(http_error))
+            sys.exit(1)  # Non-zero, so that callers can see the failure.
+
+    def _can_lease(self):
+        """Tells whether a new lease request may be issued right now.
+
+        A lease is allowed only when both of these hold:
+        1. There is room for more running tasks in the system.
+        2. The request-rate limit on the taskqueue API is not exceeded.
+
+        Returns:
+          True/False.
+        """
+        if self._num_tasks_to_lease() <= 0:
+            return False
+        # There is room for tasks, so only the rate limit can hold us back.
+        return not self._is_rate_exceeded()
+
+    def _is_rate_exceeded(self):
+        """Tells whether a request to the TaskQueue API must be held back.
+
+        The API is not accessed beyond the limit set by the
+        taskapi_requests_per_sec flag (K). The number of tasks of the last
+        lease, less the tasks which are still running (N), approximates the
+        requests made to the API since that lease, so no new request is
+        invoked until about N/K seconds have passed since the last lease.
+        Without a limit, or before the first lease, the rate is never
+        considered exceeded.
+
+        Returns:
+          True/False
+        """
+        requests_per_sec = FLAGS.taskapi_requests_per_sec
+        if not requests_per_sec:
+            return False
+        last_lease_time = self._last_lease_time
+        if not last_lease_time:
+            return False
+        secs_since_lease = time.time() - last_lease_time
+        num_requests = (self._num_last_leased_tasks -
+                        len(self._taskprocess_map))
+        # 1.0 keeps this a float division on Python 2.
+        min_wait_secs = 1.0 * num_requests / requests_per_sec
+        return secs_since_lease < min_wait_secs
+
+    def _num_tasks_to_lease(self):
+        """Computes how many more tasks may be leased.
+
+        The num_tasks flag is the upper limit on tasks running in the
+        system, so the number of tasks which can still be leased is that
+        limit minus the tasks which are currently running.
+
+        Returns:
+          Number of tasks to lease.
+        """
+        running_tasks = len(self._taskprocess_map)
+        return FLAGS.num_tasks - running_tasks
+
+    def _update_last_lease_info(self, result):
+        """Records when the last lease happened and how many tasks it gave.
+
+        Args:
+          result: Response object from TaskQueue API, containing list of
+            tasks. May be empty/None or come without an 'items' entry.
+        """
+        self._last_lease_time = time.time()
+        leased_items = None
+        if result:
+            leased_items = result.get('items')
+        if leased_items:
+            num_leased = len(leased_items)
+        else:
+            num_leased = 0
+        self._num_last_leased_tasks = num_leased
+
+    def _update_poll_timeout_start(self):
+        """Starts the poll-timeout clock, unless it is already running."""
+        if self._poll_timeout_start:
+            return
+        self._poll_timeout_start = time.time()
+
+    def _continue_polling(self):
+        """Tells whether to keep polling instead of invoking the next lease.
+
+        Polling stops, so that lease can be invoked, if
+        1. Running tasks in the system have gone down to the specified
+           threshold (min_running_tasks) or below.
+        2. Wait time has exceeded the specified time-out
+           (timeout_secs_for_next_lease_request), measured from the start of
+           the poll-timeout clock. The clock is reset in this case.
+
+        By doing this, the lease requests are batched. If tasks finish slightly
+        one after another, a lease request per finished task would result in
+        unnecessary lease API calls. The time-out puts a limit on the wait
+        time spent on batching the requests.
+
+        Returns:
+          True/False
+        """
+        running_tasks = len(self._taskprocess_map)
+        if running_tasks <= FLAGS.min_running_tasks:
+            return False
+        timeout_start = self._poll_timeout_start
+        if not timeout_start:
+            return True
+        waited_secs = time.time() - timeout_start
+        if waited_secs <= FLAGS.timeout_secs_for_next_lease_request:
+            return True
+        # Timed out: reset the clock and let the caller lease again.
+        self._poll_timeout_start = None
+        return False
+
+    def _get_tasks_from_queue(self):
+        """Leases the available tasks from the taskqueue.
+
+        At most as many tasks as _num_tasks_to_lease() allows are requested.
+
+        Returns:
+          Lease response object, or None if the lease request failed with an
+          HttpError.
+        """
+        try:
+            num_wanted = self._num_tasks_to_lease()
+            lease_request = self.task_api.tasks().lease(
+                project=FLAGS.project_name,
+                taskqueue=FLAGS.taskqueue_name,
+                leaseSecs=FLAGS.lease_secs,
+                numTasks=num_wanted,
+                body={})
+            return lease_request.execute()
+        except HttpError as http_error:
+            logger.error('Error during lease request: %s' % str(http_error))
+            return None
+
+    def _create_subprocesses_for_tasks(self, result):
+        """Starts a subprocess for every newly leased task.
+
+        Tasks run in parallel subprocesses for better throughput. Nothing is
+        started if the lease response is missing or holds no tasks.
+
+        Args:
+          result: lease response dictionary object.
+        """
+        if not result:
+            logger.info('Error: result is not defined')
+            return None
+        if not result.get('items'):
+            return None
+        for leased_task in result.get('items'):
+            # A task may be leased multiple times, so the lease can hand
+            # back a task which is already being executed here; never spawn
+            # a second subprocess for it.
+            if leased_task.get('id') in self._taskprocess_map:
+                continue
+            client_task = ClientTask(leased_task)
+            # Only tasks which initialized properly are tracked; the map of
+            # ClientTask objects is used later to poll the tasks and delete
+            # them from the taskqueue.
+            if client_task.init():
+                self._taskprocess_map[client_task.get_task_id()] = client_task
+
+    def _poll_running_tasks(self):
+        """Polls all the running tasks and stops tracking the ones which
+        have completed."""
+        # values() is a list copy (Python 2), so deleting entries is safe.
+        for client_task in self._taskprocess_map.values():
+            if not client_task.is_completed(self.task_api):
+                continue
+            del self._taskprocess_map[client_task.get_task_id()]
+            # updates scheduling information for later use.
+            self._update_poll_timeout_start()
+
+    def _sleep_before_next_lease(self):
+        """Sleeps before invoking lease if required based on last lease info.
+
+        It sleeps only when no tasks were found on the taskqueue during the
+        last lease request. The time already spent polling the tasks is
+        discounted, so it sleeps for (sleep_interval_secs - time elapsed
+        since the last lease). If the polling took no time, since there were
+        no tasks in the system, it waits for the full sleep interval.
+        It does not sleep if no lease request has ever been made, or if the
+        last lease request returned tasks.
+        """
+        # sleep_secs must be bound on every path; it is only positive when
+        # the last lease came back empty.
+        sleep_secs = 0
+        if self._last_lease_time and self._num_last_leased_tasks <= 0:
+            time_elapsed_since_last_lease = (
+                time.time() - self._last_lease_time)
+            sleep_secs = (FLAGS.sleep_interval_secs -
+                          time_elapsed_since_last_lease)
+        if sleep_secs > 0:
+            logger.info('No tasks found and hence sleeping for sometime')
+            time.sleep(sleep_secs)
+
+    def lease_tasks(self):
+        """Leases as many tasks as the system can currently take.
+
+        When a lease is allowed, the lease request is invoked, the lease
+        information used for scheduling is updated and parallel processes
+        are spawned to execute the leased tasks.
+
+        Before that, it waits (sleeps) if the last lease found no tasks on
+        the taskqueue, which results in better resource utilization. The
+        lease is skipped if no more tasks can be run or if the limit on
+        requests to the taskqueue APIs is exceeded.
+
+        Returns:
+          True if a lease request was attempted, False if the lease was
+          skipped.
+        """
+        self._sleep_before_next_lease()
+        if not self._can_lease():
+            return False
+        lease_response = self._get_tasks_from_queue()
+        self._update_last_lease_info(lease_response)
+        self._create_subprocesses_for_tasks(lease_response)
+        return True
+
+    def poll_tasks(self):
+        """Polls the running tasks until the next lease should be invoked.
+
+        The tasks are polled once and then again for as long as
+        _continue_polling() asks for it. Instead of polling in a tight loop,
+        it sleeps for sleep_before_next_poll_secs before every further poll
+        to avoid unnecessary CPU cycles. It returns as soon as
+        _continue_polling() reports that polling should stop.
+        """
+        while True:
+            self._poll_running_tasks()
+            if not self._continue_polling():
+                break
+            logger.info('Sleeping before next poll')
+            time.sleep(FLAGS.sleep_before_next_poll_secs)
+        return None
+
+
+def main(argv):
+    """Leases new tasks and polls them for completion in an infinite loop."""
+    # Settings for logger
+    set_logger()
+    # The puller holds all the scheduling state between the iterations.
+    task_puller = TaskQueuePuller()
+    while True:
+        # Lease what can be run, then poll until a new lease is due.
+        task_puller.lease_tasks()
+        task_puller.poll_tasks()
+
+if __name__ == '__main__':
+ app.run()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py
new file mode 100644
index 0000000..4adea2b
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""Commands to interact with the Task object of the TaskQueue API."""
+
+
+__version__ = '0.0.1'
+
+
+
+from gtaskqueue.taskqueue_cmd_base import GoogleTaskCommand
+
+from google.apputils import app
+from google.apputils import appcommands
+import gflags as flags
+
+FLAGS = flags.FLAGS
+
+
+class GetTaskCommand(GoogleTaskCommand):
+ """Get properties of an existing task."""
+
+ def __init__(self, name, flag_values):
+ super(GetTaskCommand, self).__init__(name, flag_values)
+
+ def build_request(self, task_api, flag_values):
+ """Build a request to get properties of a Task.
+
+ Args:
+ task_api: The handle to the task collection API.
+ flag_values: The parsed command flags.
+ Returns:
+ The properties of the task.
+ """
+ return task_api.get(project=flag_values.project_name,
+ taskqueue=flag_values.taskqueue_name,
+ task=flag_values.task_name)
+
+
+class LeaseTaskCommand(GoogleTaskCommand):
+ """Lease a new task from the queue."""
+
+ def __init__(self, name, flag_values):
+ super(LeaseTaskCommand, self).__init__(name,
+ flag_values,
+ need_task_flag=False)
+ flags.DEFINE_integer('lease_secs',
+ None,
+ 'The lease for the task in seconds')
+ flags.DEFINE_integer('num_tasks',
+ 1,
+ 'The number of tasks to lease')
+ flags.DEFINE_integer('payload_size_to_display',
+ 2 * 1024 * 1024,
+ 'Size of the payload for leased tasks to show')
+
+ def build_request(self, task_api, flag_values):
+ """Build a request to lease a pending task from the TaskQueue.
+
+ Args:
+ task_api: The handle to the task collection API.
+ flag_values: The parsed command flags.
+ Returns:
+ A new leased task.
+ """
+ if not flag_values.lease_secs:
+ raise app.UsageError('lease_secs must be specified')
+
+ return task_api.lease(project=flag_values.project_name,
+ taskqueue=flag_values.taskqueue_name,
+ leaseSecs=flag_values.lease_secs,
+ numTasks=flag_values.num_tasks,
+ body={})
+
+ def print_result(self, result):
+ """Override to optionally strip the payload since it can be long."""
+ if result.get('items'):
+ items = []
+ for task in result.get('items'):
+ payloadlen = len(task['payloadBase64'])
+ if payloadlen > FLAGS.payload_size_to_display:
+ extra = payloadlen - FLAGS.payload_size_to_display
+ task['payloadBase64'] = ('%s(%d more bytes)' %
+ (task['payloadBase64'][:FLAGS.payload_size_to_display],
+ extra))
+ items.append(task)
+ result['items'] = items
+ GoogleTaskCommand.print_result(self, result)
+
+
+class DeleteTaskCommand(GoogleTaskCommand):
+ """Delete an existing task."""
+
+ def __init__(self, name, flag_values):
+ super(DeleteTaskCommand, self).__init__(name, flag_values)
+
+ def build_request(self, task_api, flag_values):
+ """Build a request to delete a Task.
+
+ Args:
+ task_api: The handle to the taskqueue collection API.
+ flag_values: The parsed command flags.
+ Returns:
+ Whether the delete was successful.
+ """
+ return task_api.delete(project=flag_values.project_name,
+ taskqueue=flag_values.taskqueue_name,
+ task=flag_values.task_name)
+
+
+class ListTasksCommand(GoogleTaskCommand):
+ """Lists all tasks in a queue (currently upto a max of 100)."""
+
+ def __init__(self, name, flag_values):
+ super(ListTasksCommand, self).__init__(name,
+ flag_values,
+ need_task_flag=False)
+
+ def build_request(self, task_api, flag_values):
+ """Build a request to lists tasks in a queue.
+
+ Args:
+ task_api: The handle to the taskqueue collection API.
+ flag_values: The parsed command flags.
+ Returns:
+ A list of pending tasks in the queue.
+ """
+ return task_api.list(project=flag_values.project_name,
+ taskqueue=flag_values.taskqueue_name)
+
+
+def add_commands():
+ appcommands.AddCmd('listtasks', ListTasksCommand)
+ appcommands.AddCmd('gettask', GetTaskCommand)
+ appcommands.AddCmd('deletetask', DeleteTaskCommand)
+ appcommands.AddCmd('leasetask', LeaseTaskCommand)
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py
new file mode 100644
index 0000000..425e156
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py
@@ -0,0 +1,189 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Class to connect to TaskQueue API."""
+
+
+
+import os
+import sys
+import urlparse
+from apiclient.anyjson import simplejson as json
+from apiclient.discovery import build
+from apiclient.errors import HttpError
+import httplib2
+from oauth2client.file import Storage
+from oauth2client.client import OAuth2WebServerFlow
+from oauth2client.tools import run
+from gtaskqueue.taskqueue_logger import logger
+
+from google.apputils import app
+import gflags as flags
+
+# Command-line flags that configure how TaskQueueClient reaches the API.
+# NOTE(review): taskqueue_cmd_base.py defines flags with these same names;
+# importing both modules into one process would presumably make gflags
+# complain about duplicate definitions -- confirm they are never combined.
+FLAGS = flags.FLAGS
+# Version string handed to the discovery-based build() call.
+flags.DEFINE_string(
+ 'service_version',
+ 'v1beta1',
+ 'Google taskqueue api version.')
+# Base URL; the discovery URI is formed by appending a path to it, so the
+# trailing slash matters.
+flags.DEFINE_string(
+ 'api_host',
+ 'https://www.googleapis.com/',
+ 'API host name')
+# When set, a "key=<developer key>" query parameter is added to every
+# non-discovery request.
+flags.DEFINE_bool(
+ 'use_developer_key',
+ False,
+ 'User wants to use the developer key while accessing taskqueue apis')
+# Path of the file holding the developer key; "~" is expanded before use.
+flags.DEFINE_string(
+ 'developer_key_file',
+ '~/.taskqueue.apikey',
+ 'Developer key provisioned from api console')
+# Debugging aid: echo each outgoing request to stdout.
+flags.DEFINE_bool(
+ 'dump_request',
+ False,
+ 'Prints the outgoing HTTP request along with headers and body.')
+# File used by oauth2client's Storage to persist the OAuth credentials.
+flags.DEFINE_string(
+ 'credentials_file',
+ 'taskqueue.dat',
+ 'File where you want to store the auth credentails for later user')
+
+# Set up a Flow object to be used if we need to authenticate. This
+# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with
+# the information it needs to authenticate. Note that it is called
+# the Web Server Flow, but it can also handle the flow for native
+# applications <http://code.google.com/apis/accounts/docs/OAuth2.html#IA>
+# The client_id and client_secret are copied from the Identity tab on
+# the Google APIs Console <http://code.google.com/apis/console>
+# NOTE(review): the client secret is embedded in the source; confirm this
+# is acceptable for an installed-application sample.
+FLOW = OAuth2WebServerFlow(
+ client_id='157776985798.apps.googleusercontent.com',
+ client_secret='tlpVCmaS6yLjxnnPu0ARIhNw',
+ scope='https://www.googleapis.com/auth/taskqueue',
+ user_agent='taskqueue-cmdline-sample/1.0')
+
+
+class TaskQueueClient:
+ """Class to setup connection with taskqueue API."""
+
+ def __init__(self):
+ if not FLAGS.project_name:
+ raise app.UsageError('You must specify a project name'
+ ' using the "--project_name" flag.')
+ discovery_uri = (
+ FLAGS.api_host + 'discovery/v0.3/describe/{api}/{apiVersion}')
+ logger.info(discovery_uri)
+ try:
+ # If the Credentials don't exist or are invalid run through the
+ # native clien flow. The Storage object will ensure that if
+ # successful the good Credentials will get written back to a file.
+ # Setting FLAGS.auth_local_webserver to false since we can run our
+ # tool on Virtual Machines and we do not want to run the webserver
+ # on VMs.
+ FLAGS.auth_local_webserver = False
+ storage = Storage(FLAGS.credentials_file)
+ credentials = storage.get()
+ if credentials is None or credentials.invalid == True:
+ credentials = run(FLOW, storage)
+ http = credentials.authorize(self._dump_request_wrapper(
+ httplib2.Http()))
+ self.task_api = build('taskqueue',
+ FLAGS.service_version,
+ http=http,
+ discoveryServiceUrl=discovery_uri)
+ except HttpError, http_error:
+ logger.error('Error gettin task_api: %s' % http_error)
+
+ def get_taskapi(self):
+ """Returns handler for tasks API from taskqueue API collection."""
+ return self.task_api
+
+
+ def _dump_request_wrapper(self, http):
+ """Dumps the outgoing HTTP request if requested.
+
+ Args:
+ http: An instance of httplib2.Http or something that acts like it.
+
+ Returns:
+ httplib2.Http like object.
+ """
+ request_orig = http.request
+
+ def new_request(uri, method='GET', body=None, headers=None,
+ redirections=httplib2.DEFAULT_MAX_REDIRECTS,
+ connection_type=None):
+ """Overrides the http.request method to add some utilities."""
+ if (FLAGS.api_host + "discovery/" not in uri and
+ FLAGS.use_developer_key):
+ developer_key_path = os.path.expanduser(
+ FLAGS.developer_key_file)
+ if not os.path.isfile(developer_key_path):
+ print 'Please generate developer key from the Google API' \
+ 'Console and store it in %s' % (FLAGS.developer_key_file)
+ sys.exit()
+ developer_key_file = open(developer_key_path, 'r')
+ try:
+ developer_key = developer_key_file.read().strip()
+ except IOError, io_error:
+ print 'Error loading developer key from file %s' % (
+ FLAGS.developer_key_file)
+ print 'Error details: %s' % str(io_error)
+ sys.exit()
+ finally:
+ developer_key_file.close()
+ s = urlparse.urlparse(uri)
+ query = 'key=' + developer_key
+ if s.query:
+ query = s.query + '&key=' + developer_key
+ d = urlparse.ParseResult(s.scheme,
+ s.netloc,
+ s.path,
+ s.params,
+ query,
+ s.fragment)
+ uri = urlparse.urlunparse(d)
+ if FLAGS.dump_request:
+ print '--request-start--'
+ print '%s %s' % (method, uri)
+ if headers:
+ for (h, v) in headers.iteritems():
+ print '%s: %s' % (h, v)
+ print ''
+ if body:
+ print json.dumps(json.loads(body),
+ sort_keys=True,
+ indent=2)
+ print '--request-end--'
+ return request_orig(uri,
+ method,
+ body,
+ headers,
+ redirections,
+ connection_type)
+ http.request = new_request
+ return http
+
+ def print_result(self, result):
+ """Pretty-print the result of the command.
+
+ The default behavior is to dump a formatted JSON encoding
+ of the result.
+
+ Args:
+ result: The JSON-serializable result to print.
+ """
+ # We could have used the pprint module, but it produces
+ # noisy output due to all of our keys and values being
+ # unicode strings rather than simply ascii.
+ print json.dumps(result, sort_keys=True, indent=2)
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py
new file mode 100644
index 0000000..42474c6
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py
@@ -0,0 +1,275 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""Commands for interacting with Google TaskQueue."""
+
+__version__ = '0.0.1'
+
+
+import os
+import sys
+import urlparse
+
+
+from apiclient.discovery import build
+from apiclient.errors import HttpError
+from apiclient.anyjson import simplejson as json
+import httplib2
+from oauth2client.file import Storage
+from oauth2client.client import OAuth2WebServerFlow
+from oauth2client.tools import run
+
+from google.apputils import app
+from google.apputils import appcommands
+import gflags as flags
+
+
+# Global command-line flags shared by every gtaskqueue command.
+# NOTE(review): taskqueue_client.py defines flags with several of these
+# same names; importing both modules into one process would presumably
+# make gflags complain about duplicates -- confirm they are never combined.
+FLAGS = flags.FLAGS
+
+# Version string handed to the discovery-based build() call.
+flags.DEFINE_string(
+ 'service_version',
+ 'v1beta1',
+ 'Google taskqueue api version.')
+# Base URL; the discovery URI is formed by appending a path to it, so the
+# trailing slash matters.
+flags.DEFINE_string(
+ 'api_host',
+ 'https://www.googleapis.com/',
+ 'API host name')
+# Passed as the "project" argument of the API calls built by the commands.
+flags.DEFINE_string(
+ 'project_name',
+ 'default',
+ 'The name of the Taskqueue API project.')
+# When set, a "key=<developer key>" query parameter is added to every
+# non-discovery request.
+flags.DEFINE_bool(
+ 'use_developer_key',
+ False,
+ 'User wants to use the developer key while accessing taskqueue apis')
+# Path of the file holding the developer key; "~" is expanded before use.
+flags.DEFINE_string(
+ 'developer_key_file',
+ '~/.taskqueue.apikey',
+ 'Developer key provisioned from api console')
+# Debugging aid: echo each outgoing request to stdout.
+flags.DEFINE_bool(
+ 'dump_request',
+ False,
+ 'Prints the outgoing HTTP request along with headers and body.')
+# File used by oauth2client's Storage to persist the OAuth credentials.
+flags.DEFINE_string(
+ 'credentials_file',
+ 'taskqueue.dat',
+ 'File where you want to store the auth credentails for later user')
+
+# Set up a Flow object to be used if we need to authenticate. This
+# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with
+# the information it needs to authenticate. Note that it is called
+# the Web Server Flow, but it can also handle the flow for native
+# applications <http://code.google.com/apis/accounts/docs/OAuth2.html#IA>
+# The client_id and client_secret are copied from the Identity tab on
+# the Google APIs Console <http://code.google.com/apis/console>
+# NOTE(review): the client secret is embedded in the source; confirm this
+# is acceptable for an installed-application sample.
+FLOW = OAuth2WebServerFlow(
+ client_id='157776985798.apps.googleusercontent.com',
+ client_secret='tlpVCmaS6yLjxnnPu0ARIhNw',
+ scope='https://www.googleapis.com/auth/taskqueue',
+ user_agent='taskqueue-cmdline-sample/1.0')
+
+class GoogleTaskQueueCommandBase(appcommands.Cmd):
+ """Base class for all the Google TaskQueue client commands."""
+
+ DEFAULT_PROJECT_PATH = 'projects/default'
+
+ def __init__(self, name, flag_values):
+ super(GoogleTaskQueueCommandBase, self).__init__(name, flag_values)
+
+ def _dump_request_wrapper(self, http):
+ """Dumps the outgoing HTTP request if requested.
+
+ Args:
+ http: An instance of httplib2.Http or something that acts like it.
+
+ Returns:
+ httplib2.Http like object.
+ """
+ request_orig = http.request
+
+ def new_request(uri, method='GET', body=None, headers=None,
+ redirections=httplib2.DEFAULT_MAX_REDIRECTS,
+ connection_type=None):
+ """Overrides the http.request method to add some utilities."""
+ if (FLAGS.api_host + "discovery/" not in uri and
+ FLAGS.use_developer_key):
+ developer_key_path = os.path.expanduser(
+ FLAGS.developer_key_file)
+ if not os.path.isfile(developer_key_path):
+ print 'Please generate developer key from the Google APIs' \
+ 'Console and store it in %s' % (FLAGS.developer_key_file)
+ sys.exit()
+ developer_key_file = open(developer_key_path, 'r')
+ try:
+ developer_key = developer_key_file.read().strip()
+ except IOError, io_error:
+ print 'Error loading developer key from file %s' % (
+ FLAGS.developer_key_file)
+ print 'Error details: %s' % str(io_error)
+ sys.exit()
+ finally:
+ developer_key_file.close()
+ s = urlparse.urlparse(uri)
+ query = 'key=' + developer_key
+ if s.query:
+ query = s.query + '&key=' + developer_key
+ d = urlparse.ParseResult(s.scheme,
+ s.netloc,
+ s.path,
+ s.params,
+ query,
+ s.fragment)
+ uri = urlparse.urlunparse(d)
+
+ if FLAGS.dump_request:
+ print '--request-start--'
+ print '%s %s' % (method, uri)
+ if headers:
+ for (h, v) in headers.iteritems():
+ print '%s: %s' % (h, v)
+ print ''
+ if body:
+ print json.dumps(json.loads(body), sort_keys=True, indent=2)
+ print '--request-end--'
+ return request_orig(uri,
+ method,
+ body,
+ headers,
+ redirections,
+ connection_type)
+ http.request = new_request
+ return http
+
+ def Run(self, argv):
+ """Run the command, printing the result.
+
+ Args:
+ argv: The non-flag arguments to the command.
+ """
+ if not FLAGS.project_name:
+ raise app.UsageError('You must specify a project name'
+ ' using the "--project_name" flag.')
+ discovery_uri = (
+ FLAGS.api_host + 'discovery/v0.3/describe/{api}/{apiVersion}')
+ try:
+ # If the Credentials don't exist or are invalid run through the
+ # native client flow. The Storage object will ensure that if
+ # successful the good Credentials will get written back to a file.
+ # Setting FLAGS.auth_local_webserver to false since we can run our
+ # tool on Virtual Machines and we do not want to run the webserver
+ # on VMs.
+ FLAGS.auth_local_webserver = False
+ storage = Storage(FLAGS.credentials_file)
+ credentials = storage.get()
+ if credentials is None or credentials.invalid == True:
+ credentials = run(FLOW, storage)
+ http = credentials.authorize(self._dump_request_wrapper(
+ httplib2.Http()))
+ api = build('taskqueue',
+ FLAGS.service_version,
+ http=http,
+ discoveryServiceUrl=discovery_uri)
+ result = self.run_with_api_and_flags_and_args(api, FLAGS, argv)
+ self.print_result(result)
+ except HttpError, http_error:
+ print 'Error Processing request: %s' % str(http_error)
+
+ def run_with_api_and_flags_and_args(self, api, flag_values, unused_argv):
+ """Run the command given the API, flags, and args.
+
+ The default implementation of this method discards the args and
+ calls into run_with_api_and_flags.
+
+ Args:
+ api: The handle to the Google TaskQueue API.
+ flag_values: The parsed command flags.
+ unused_argv: The non-flag arguments to the command.
+ Returns:
+ The result of running the command
+ """
+ return self.run_with_api_and_flags(api, flag_values)
+
+ def print_result(self, result):
+ """Pretty-print the result of the command.
+
+ The default behavior is to dump a formatted JSON encoding
+ of the result.
+
+ Args:
+ result: The JSON-serializable result to print.
+ """
+ # We could have used the pprint module, but it produces
+ # noisy output due to all of our keys and values being
+ # unicode strings rather than simply ascii.
+ print json.dumps(result, sort_keys=True, indent=2)
+
+
+
+class GoogleTaskQueueCommand(GoogleTaskQueueCommandBase):
+ """Base command for working with the taskqueues collection."""
+
+ def __init__(self, name, flag_values):
+ super(GoogleTaskQueueCommand, self).__init__(name, flag_values)
+ flags.DEFINE_string('taskqueue_name',
+ 'myqueue',
+ 'TaskQueue name',
+ flag_values=flag_values)
+
+ def run_with_api_and_flags(self, api, flag_values):
+ """Run the command, returning the result.
+
+ Args:
+ api: The handle to the Google TaskQueue API.
+ flag_values: The parsed command flags.
+ Returns:
+ The result of running the command.
+ """
+ taskqueue_request = self.build_request(api.taskqueues(), flag_values)
+ return taskqueue_request.execute()
+
+
+class GoogleTaskCommand(GoogleTaskQueueCommandBase):
+ """Base command for working with the tasks collection."""
+
+ def __init__(self, name, flag_values, need_task_flag=True):
+ super(GoogleTaskCommand, self).__init__(name, flag_values)
+ # Common flags that are shared by all the Task commands.
+ flags.DEFINE_string('taskqueue_name',
+ 'myqueue',
+ 'TaskQueue name',
+ flag_values=flag_values)
+ # Not all task commands need the task_name flag.
+ if need_task_flag:
+ flags.DEFINE_string('task_name',
+ None,
+ 'Task name',
+ flag_values=flag_values)
+
+ def run_with_api_and_flags(self, api, flag_values):
+ """Run the command, returning the result.
+
+ Args:
+ api: The handle to the Google TaskQueue API.
+ flag_values: The parsed command flags.
+ flags.DEFINE_string('payload',
+ None,
+ 'Payload of the task')
+ Returns:
+ The result of running the command.
+ """
+ task_request = self.build_request(api.tasks(), flag_values)
+ return task_request.execute()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py
new file mode 100644
index 0000000..99c3f2a
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Commands to interact with the TaskQueue object of the TaskQueue API."""
+
+
+__version__ = '0.0.1'
+
+
+
+from gtaskqueue.taskqueue_cmd_base import GoogleTaskQueueCommand
+
+from google.apputils import appcommands
+import gflags as flags
+
+FLAGS = flags.FLAGS
+
+
+class GetTaskQueueCommand(GoogleTaskQueueCommand):
+ """Get properties of an existing task queue."""
+
+ def __init__(self, name, flag_values):
+ super(GetTaskQueueCommand, self).__init__(name, flag_values)
+ flags.DEFINE_boolean('get_stats',
+ False,
+ 'Whether to get Stats')
+
+ def build_request(self, taskqueue_api, flag_values):
+ """Build a request to get properties of a TaskQueue.
+
+ Args:
+ taskqueue_api: The handle to the taskqueue collection API.
+ flag_values: The parsed command flags.
+ Returns:
+ The properties of the taskqueue.
+ """
+ return taskqueue_api.get(project=flag_values.project_name,
+ taskqueue=flag_values.taskqueue_name,
+ getStats=flag_values.get_stats)
+
+
+def add_commands():
+ appcommands.AddCmd('getqueue', GetTaskQueueCommand)
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py
new file mode 100644
index 0000000..3f445f3
--- /dev/null
+++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Log settings for taskqueue_puller module."""
+
+
+
+import logging
+import logging.config
+from google.apputils import app
+import gflags as flags
+
+FLAGS = flags.FLAGS
+# Destination of the rotating log file configured by set_logger().
+flags.DEFINE_string(
+ 'log_output_file',
+ '/tmp/taskqueue-puller.log',
+ 'Logfile name for taskqueue_puller.')
+
+
+# Shared logger object; it gets its level and handler only once
+# set_logger() has been called.
+logger = logging.getLogger('TaskQueueClient')
+
+
+def set_logger():
+ """Settings for taskqueue_puller logger."""
+ logger.setLevel(logging.INFO)
+
+ # create formatter
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+ # Set size of the log file and the backup count for rotated log files.
+ handler = logging.handlers.RotatingFileHandler(FLAGS.log_output_file,
+ maxBytes = 1024 * 1024,
+ backupCount = 5)
+ # add formatter to handler
+ handler.setFormatter(formatter)
+ # add formatter to handler
+ logger.addHandler(handler)
+
+# NOTE(review): no main() is defined in the visible part of this module, so
+# running it directly presumably fails inside app.run() -- confirm whether
+# this guard is needed at all.
+if __name__ == '__main__':
+ app.run()
diff --git a/samples/gtaskqueue_sample/setup.py b/samples/gtaskqueue_sample/setup.py
new file mode 100644
index 0000000..b7eff8f
--- /dev/null
+++ b/samples/gtaskqueue_sample/setup.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2010 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Setup script for the Google TaskQueue API command-line tool."""
+
+__version__ = '1.0.2'
+
+
+import sys
+try:
+ from setuptools import setup
+ print 'Loaded setuptools'
+except ImportError:
+ from distutils.core import setup
+ print 'Loaded distutils.core'
+
+
+# Distribution name under which the tool is installed.
+PACKAGE_NAME = 'google-taskqueue-client'
+# Runtime dependencies; only google-apputils is pinned to an exact version.
+# NOTE(review): 'oauth2' is listed here while the sources import
+# oauth2client -- confirm this is the intended requirement.
+INSTALL_REQUIRES = ['google-apputils==0.1',
+ 'google-api-python-client',
+ 'httplib2',
+ 'oauth2',
+ 'python-gflags']
+# Installs the gtaskqueue package plus the listed command-line scripts.
+setup(name=PACKAGE_NAME,
+ version=__version__,
+ description='Google TaskQueue API command-line tool and utils',
+ author='Google Inc.',
+ author_email='google-appengine@googlegroups.com',
+ url='http://code.google.com/appengine/docs/python/taskqueue/pull/overview.html',
+ install_requires=INSTALL_REQUIRES,
+ packages=['gtaskqueue'],
+ scripts=['gtaskqueue/gtaskqueue', 'gtaskqueue/gtaskqueue_puller',
+ 'gtaskqueue/gen_appengine_access_token'],
+ license='Apache 2.0',
+ keywords='google taskqueue api client',
+ classifiers=['Development Status :: 3 - Alpha',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: Apache Software License',
+ 'Operating System :: POSIX',
+ 'Topic :: Internet :: WWW/HTTP'])