Remove gtaskqueue sample.
Reviewed in https://codereview.appspot.com/6821087/.
diff --git a/samples/gtaskqueue_sample/README b/samples/gtaskqueue_sample/README
deleted file mode 100644
index f823db0..0000000
--- a/samples/gtaskqueue_sample/README
+++ /dev/null
@@ -1,49 +0,0 @@
-This acts as a sample as well as commandline tool for accessing Google TaskQueue
-APIs.
-
-api: taskqueue
-keywords: cmdline
-
-Installation
-============
-
-To install, simply say
-
- $ python setup.py install --record=files.txt
-
-to install the files and record what files are installed in files.txt.
-Make sure that you have already installed the google-python-client-libraries
-using the setup.py in the toplevel directory.
-
-You may need root priveleges for install.
-
-Running
-=======
-This sample provides following:
-1. gtaskqueue: This works as a command-line tool to access Google TaskQueue
-API. You can perform various operations on taskqueues such as leastask,
-getask, listtasks, deletetask, getqueue.
-Example usage:
- i. To lease a task
- gtaskqueue leasetask --taskqueue_name=<your queue_name>
- -lease_secs=30 --project_name=<your appengine app_name>
- ii. To get stats on a queue
- gtaskqueue getqueue --taskqueue_name=<your queue_name>
- --project_name=<your appengine app_name> --get_stats
-
-2. gtaskqueue_puller: This works as a worker to continuously pull tasks enqueued
-by your app,perform the task and the post the output back to your app.
-Example Usage:
- gtaskqueue_puller --taskqueue_name=<your queue_name>
- -lease_secs=30 --project_name=<your appengine app_name>
- --executable_binary=”cat” --output_url=<url location if you want to pos the data
- back(optional)>
-
-Third Party Libraries
-=====================
-
-These libraries will be installed when you install the client library:
-
-http://code.google.com/p/httplib2
-http://code.google.com/p/uri-templates
-http://github.com/simplegeo/python-oauth2
diff --git a/samples/gtaskqueue_sample/gtaskqueue/client_task.py b/samples/gtaskqueue_sample/gtaskqueue/client_task.py
deleted file mode 100644
index 5214e81..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/client_task.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Class to encapsulate task related information and methods on task_puller."""
-
-
-
-import base64
-import oauth2 as oauth
-import os
-import subprocess
-import tempfile
-import time
-import urllib2
-from apiclient.errors import HttpError
-from gtaskqueue.taskqueue_logger import logger
-import gflags as flags
-
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string(
- 'executable_binary',
- '/bin/cat',
- 'path of the binary to be executed')
-flags.DEFINE_string(
- 'output_url',
- '',
- 'url to which output is posted. The url must include param name, '
- 'value for which is populated with task_id from puller while posting '
- 'the data. Format of output url is absolute url which handles the'
- 'post request from task queue puller.'
- '(Eg: "http://taskpuller.appspot.com/taskdata?name=").'
- 'The Param value is always the task_id. The handler for this post'
- 'should be able to associate the task with its id and take'
- 'appropriate action. Use the appengine_access_token.py tool to'
- 'generate the token and store it in a file before you start.')
-flags.DEFINE_string(
- 'appengine_access_token_file',
- None,
- 'File containing an Appengine Access token, if any. If present this'
- 'token is added to the output_url request, so that the output_url can'
- 'be an authenticated end-point. Use the appengine_access_token.py tool'
- 'to generate the token and store it in a file before you start.')
-flags.DEFINE_float(
- 'task_timeout_secs',
- '3600',
- 'timeout to kill the task')
-
-
-class ClientTaskInitError(Exception):
- """Raised when initialization of client task fails."""
-
- def __init__(self, task_id, error_str):
- Exception.__init__(self)
- self.task_id = task_id
- self.error_str = error_str
-
- def __str__(self):
- return ('Error initializing task "%s". Error details "%s". '
- % (self.task_id, self.error_str))
-
-
-class ClientTask(object):
- """Class to encapsulate task information pulled by taskqueue_puller module.
-
- This class is responsible for creating an independent client task object by
- taking some information from lease response task object. It encapsulates
- methods responsible for spawning an independent subprocess for executing
- the task, tracking the status of the task and also deleting the task from
- taskqeueue when completed. It also has the functionality to give the output
- back to the application by posting to the specified url.
- """
-
- def __init__(self, task):
- self._task = task
- self._process = None
- self._output_file = None
-
- # Class method that caches the Appengine Access Token if any
- @classmethod
- def get_access_token(cls):
- if not FLAGS.appengine_access_token_file:
- return None
- if not _access_token:
- fhandle = open(FLAGS.appengine_access_token_file, 'rb')
- _access_token = oauth.Token.from_string(fhandle.read())
- fhandle.close()
- return _access_token
-
- def init(self):
- """Extracts information from task object and intializes processing.
-
- Extracts id and payload from task object, decodes the payload and puts
- it in input file. After this, it spawns a subprocess to execute the
- task.
-
- Returns:
- True if everything till task execution starts fine.
- False if anything goes wrong in initialization of task execution.
- """
- try:
- self.task_id = self._task.get('id')
- self._payload = self._decode_base64_payload(
- self._task.get('payloadBase64'))
- self._payload_file = self._dump_payload_to_file()
- self._start_task_execution()
- return True
- except ClientTaskInitError, ctie:
- logger.error(str(ctie))
- return False
-
- def _decode_base64_payload(self, encoded_str):
- """Method to decode payload encoded in base64."""
- try:
- # If the payload is empty, do not try to decode it. Payload usually
- # not expected to be empty and hence log a warning and then
- # continue.
- if encoded_str:
- decoded_str = base64.urlsafe_b64decode(
- encoded_str.encode('utf-8'))
- return decoded_str
- else:
- logger.warn('Empty paylaod for task %s' % self.task_id)
- return ''
- except base64.binascii.Error, berror:
- logger.error('Error decoding payload for task %s. Error details %s'
- % (self.task_id, str(berror)))
- raise ClientTaskInitError(self.task_id, 'Error decoding payload')
- # Generic catch block to avoid crashing of puller due to some bad
- # encoding issue wih payload of any task.
- except:
- raise ClientTaskInitError(self.task_id, 'Error decoding payload')
-
- def _dump_payload_to_file(self):
- """Method to write input extracted from payload to a temporary file."""
- try:
- (fd, fname) = tempfile.mkstemp()
- f = os.fdopen(fd, 'w')
- f.write(self._payload)
- f.close()
- return fname
- except OSError:
- logger.error('Error dumping payload %s. Error details %s' %
- (self.task_id, str(OSError)))
- raise ClientTaskInitError(self.task_id, 'Error dumping payload')
-
- def _get_input_file(self):
- return self._payload_file
-
- def _post_output(self):
- """Posts the outback back to specified url in the form of a byte
- array.
-
- It reads the output generated by the task as a byte-array. It posts the
- response to specified url appended with the taskId. The application
- using the taskqueue must have a handler to handle the data being posted
- from puller. Format of body of response object is byte-array to make
- the it genric for any kind of output generated.
-
- Returns:
- True/False based on post status.
- """
- if FLAGS.output_url:
- try:
- f = open(self._get_output_file(), 'rb')
- body = f.read()
- f.close()
- url = FLAGS.output_url + self.task_id
- logger.debug('Posting data to url %s' % url)
- headers = {'Content-Type': 'byte-array'}
- # Add an access token to the headers if specified.
- # This enables the output_url to be authenticated and not open.
- access_token = ClientTask.get_access_token()
- if access_token:
- consumer = oauth.Consumer('anonymous', 'anonymous')
- oauth_req = oauth.Request.from_consumer_and_token(
- consumer,
- token=access_token,
- http_url=url)
- headers.update(oauth_req.to_header())
- # TODO: Use httplib instead of urllib for consistency.
- req = urllib2.Request(url, body, headers)
- urllib2.urlopen(req)
- except ValueError:
- logger.error('Error posting data back %s. Error details %s'
- % (self.task_id, str(ValueError)))
- return False
- except Exception:
- logger.error('Exception while posting data back %s. Error'
- 'details %s' % (self.task_id, str(Exception)))
- return False
- return True
-
- def _get_output_file(self):
- """Returns the output file if it exists, else creates it and returns
- it."""
- if not self._output_file:
- (_, self._output_file) = tempfile.mkstemp()
- return self._output_file
-
- def get_task_id(self):
- return self.task_id
-
- def _start_task_execution(self):
- """Method to spawn subprocess to execute the tasks.
-
- This method splits the commands/executable_binary to desired arguments
- format for Popen API. It appends input and output files to the
- arguments. It is assumed that commands/executable_binary expects input
- and output files as first and second positional parameters
- respectively.
- """
- # TODO: Add code to handle the cleanly shutdown when a process is killed
- # by Ctrl+C.
- try:
- cmdline = FLAGS.executable_binary.split(' ')
- cmdline.append(self._get_input_file())
- cmdline.append(self._get_output_file())
- self._process = subprocess.Popen(cmdline)
- self.task_start_time = time.time()
- except OSError:
- logger.error('Error creating subprocess %s. Error details %s'
- % (self.task_id, str(OSError)))
- self._cleanup()
- raise ClientTaskInitError(self.task_id,
- 'Error creating subprocess')
- except ValueError:
- logger.error('Invalid arguments while executing task ',
- self.task_id)
- self._cleanup()
- raise ClientTaskInitError(self.task_id,
- 'Invalid arguments while executing task')
-
- def is_completed(self, task_api):
- """Method to check if task has finished executing.
-
- This is responsible for checking status of task execution. If the task
- has already finished executing, it deletes the task from the task
- queue. If the task has been running since long time then it assumes
- that there is high proabbility that it is dfunct and hence kills the
- corresponding subprocess. In this case, task had not completed
- successfully and hence we do not delete it form the taskqueue. In above
- two cases, task completion status is returned as true since there is
- nothing more to run in the task. In all other cases, task is still
- running and hence we return false as completion status.
-
- Args:
- task_api: handle for taskqueue api collection.
-
- Returns:
- Task completion status (True/False)
- """
- status = False
- try:
- task_status = self._process.poll()
- if task_status == 0:
- status = True
- if self._post_output():
- self._delete_task_from_queue(task_api)
- self._cleanup()
- elif self._has_timedout():
- status = True
- self._kill_subprocess()
- except OSError:
- logger.error('Error during polling status of task %s, Error '
- 'details %s' % (self.task_id, str(OSError)))
- return status
-
- def _cleanup(self):
- """Cleans up temporary input/output files used in task execution."""
- try:
- if os.path.exists(self._get_input_file()):
- os.remove(self._get_input_file())
- if os.path.exists(self._get_output_file()):
- os.remove(self._get_output_file())
- except OSError:
- logger.error('Error during file cleanup for task %s. Error'
- 'details %s' % (self.task_id, str(OSError)))
-
- def _delete_task_from_queue(self, task_api):
- """Method to delete the task from the taskqueue.
-
- First, it tries to post the output back to speified url. On successful
- post, the task is deleted from taskqueue since the task has produced
- expected output. If the post was unsuccessful, the task is not deleted
- form the tskqueue since the expected output has yet not reached the
- application. In either case cleanup is performed on the task.
-
- Args:
- task_api: handle for taskqueue api collection.
-
- Returns:
- Delete status (True/False)
- """
-
- try:
- delete_request = task_api.tasks().delete(
- project=FLAGS.project_name,
- taskqueue=FLAGS.taskqueue_name,
- task=self.task_id)
- delete_request.execute()
- except HttpError, http_error:
- logger.error('Error deleting task %s from taskqueue.'
- 'Error details %s'
- % (self.task_id, str(http_error)))
-
- def _has_timedout(self):
- """Checks if task has been running since long and has timedout."""
- if (time.time() - self.task_start_time) > FLAGS.task_timeout_secs:
- return True
- else:
- return False
-
- def _kill_subprocess(self):
- """Kills the process after cleaning up the task."""
- self._cleanup()
- try:
- self._process.kill()
- logger.info('Trying to kill task %s, since it has been running '
- 'for long' % self.task_id)
- except OSError:
- logger.error('Error killing task %s. Error details %s'
- % (self.task_id, str(OSError)))
diff --git a/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token b/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token
deleted file mode 100644
index eeef137..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token
+++ /dev/null
@@ -1,116 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tool to get an Access Token to access an auth protected Appengine end point.
-
-This tool talks to the appengine end point, and gets an Access Token that is
-stored in a file. This token can be used by a tool to do authorized access to
-an appengine end point.
-"""
-
-
-
-from google.apputils import app
-import gflags as flags
-import httplib2
-import oauth2 as oauth
-import time
-
-FLAGS = flags.FLAGS
-
-flags.DEFINE_string(
- 'appengine_host',
- None,
- 'Appengine Host for whom we are trying to get an access token')
-flags.DEFINE_string(
- 'access_token_file',
- None,
- 'The file where the access token is stored')
-
-
-def get_access_token():
- if not FLAGS.appengine_host:
- print('must supply the appengine host')
- exit(1)
-
- # setup
- server = FLAGS.appengine_host
- request_token_url = server + '/_ah/OAuthGetRequestToken'
- authorization_url = server + '/_ah/OAuthAuthorizeToken'
- access_token_url = server + '/_ah/OAuthGetAccessToken'
- consumer = oauth.Consumer('anonymous', 'anonymous')
- signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
-
- # The Http client that will be used to make the requests.
- h = httplib2.Http()
-
- # get request token
- print '* Obtain a request token ...'
- parameters = {}
- # We dont have a callback server, we're going to use the browser to
- # authorize.
-
- #TODO: Add check for 401 etc
- parameters['oauth_callback'] = 'oob'
- oauth_req1 = oauth.Request.from_consumer_and_token(
- consumer, http_url=request_token_url, parameters=parameters)
- oauth_req1.sign_request(signature_method_hmac_sha1, consumer, None)
- print 'Request headers: %s' % str(oauth_req1.to_header())
- response, content = h.request(oauth_req1.to_url(), 'GET')
- token = oauth.Token.from_string(content)
- print 'GOT key: %s secret:%s' % (str(token.key), str(token.secret))
-
- print '* Authorize the request token ...'
- oauth_req2 = oauth.Request.from_token_and_callback(
- token=token, callback='oob', http_url=authorization_url)
- print 'Please run this URL in a browser and paste the token back here'
- print oauth_req2.to_url()
- verification_code = raw_input('Enter verification code: ').strip()
- token.set_verifier(verification_code)
-
- # get access token
- print '* Obtain an access token ...'
- oauth_req3 = oauth.Request.from_consumer_and_token(
- consumer, token=token, http_url=access_token_url)
- oauth_req3.sign_request(signature_method_hmac_sha1, consumer, token)
- print 'Request headers: %s' % str(oauth_req3.to_header())
- response, content = h.request(oauth_req3.to_url(), 'GET')
- access_token = oauth.Token.from_string(content)
- print 'Access Token key: %s secret:%s' % (str(access_token.key),
- str(access_token.secret))
-
- # Save the token to a file if its specified.
- if FLAGS.access_token_file:
- fhandle = open(FLAGS.access_token_file, 'w')
- fhandle.write(access_token.to_string())
- fhandle.close()
-
- # Example : access some protected resources
- print '* Checking the access token against protected resources...'
- # Assumes that the server + "/" is protected.
- test_url = server + "/"
- oauth_req4 = oauth.Request.from_consumer_and_token(consumer,
- token=token,
- http_url=test_url)
- oauth_req4.sign_request(signature_method_hmac_sha1, consumer, token)
- resp, content = h.request(test_url, "GET", headers=oauth_req4.to_header())
- print resp
- print content
-
-
-def main(argv):
- get_access_token()
-
-if __name__ == '__main__':
- app.run()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue
deleted file mode 100644
index ce68ef1..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Command line tool for interacting with Google TaskQueue API."""
-
-__version__ = '0.0.1'
-
-
-import logging
-
-from gtaskqueue import task_cmds
-from gtaskqueue import taskqueue_cmds
-
-from google.apputils import appcommands
-import gflags as flags
-
-LOG_LEVELS = [logging.DEBUG,
- logging.INFO,
- logging.WARNING,
- logging.CRITICAL]
-LOG_LEVEL_NAMES = map(logging.getLevelName, LOG_LEVELS)
-FLAGS = flags.FLAGS
-
-flags.DEFINE_enum(
- 'log_level',
- logging.getLevelName(logging.WARNING),
- LOG_LEVEL_NAMES,
- 'Logging output level.')
-
-
-def main(unused_argv):
- log_level_map = dict(
- [(logging.getLevelName(level), level) for level in LOG_LEVELS])
- logging.getLogger().setLevel(log_level_map[FLAGS.log_level])
- taskqueue_cmds.add_commands()
- task_cmds.add_commands()
-
-if __name__ == '__main__':
- appcommands.Run()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller
deleted file mode 100644
index f53181a..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller
+++ /dev/null
@@ -1,348 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Module to pull tasks from TaskQueues and execute them.
-
-This module does the following in an infinite loop.
-1. Connects to Task API (of TaskQueues API collection) to request lease on
- certain number of tasks (specified by user).
-2. Spawns parallel processes to execute the leased tasks.
-3. Polls all the tasks continously till they finish.
-4. Deletes the tasks from taskqueue on their successful completion.
-5. It lets the user specify when to invoke the lease request instead of polling
- tasks status in a tight loop for better resource utilization:
- a. Invoke the Lease request when runnning tasks go beyound certain
- threshold (min_running_tasks)
- b. Wait time becomes more than specified poll-time-out interval.
-6. Repeat the steps from 1 to 5 when either all tasks have finished executing
- or one of the conditions in 5) is met. """
-
-
-
-import sys
-import time
-from apiclient.errors import HttpError
-from gtaskqueue.client_task import ClientTask
-from gtaskqueue.taskqueue_client import TaskQueueClient
-from gtaskqueue.taskqueue_logger import logger
-from gtaskqueue.taskqueue_logger import set_logger
-from google.apputils import app
-import gflags as flags
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string(
- 'project_name',
- 'default',
- 'The name of the Taskqueue API project.')
-flags.DEFINE_string(
- 'taskqueue_name',
- 'testpuller',
- 'taskqueue to which client wants to connect to')
-flags.DEFINE_integer(
- 'lease_secs',
- 30,
- 'The lease for the task in seconds')
-flags.DEFINE_integer(
- 'num_tasks',
- 10,
- 'The number of tasks to lease')
-flags.DEFINE_integer(
- 'min_running_tasks',
- 0,
- 'minmum number of tasks below which lease can be invoked')
-flags.DEFINE_float(
- 'sleep_interval_secs',
- 2,
- 'sleep interval when no tasks are found in the taskqueue')
-flags.DEFINE_float(
- 'timeout_secs_for_next_lease_request',
- 600,
- 'Wait time before next poll when no tasks are found in the'
- 'queue (in seconds)')
-flags.DEFINE_integer(
- 'taskapi_requests_per_sec',
- None,
- 'limit on task_api requests per second')
-flags.DEFINE_float(
- 'sleep_before_next_poll_secs',
- 2,
- 'sleep interval before next poll')
-
-
-class TaskQueuePuller(object):
- """Maintains state information for TaskQueuePuller."""
-
- def __init__(self):
- self._last_lease_time = None
- self._poll_timeout_start = None
- self._num_last_leased_tasks = 0
- # Dictionary for running tasks's ids and their corresponding
- # client_task object.
- self._taskprocess_map = {}
- try:
- self.__tcq = TaskQueueClient()
- self.task_api = self.__tcq.get_taskapi()
- except HttpError, http_error:
- logger.error('Could not get TaskQueue API handler and hence' \
- 'exiting: %s' % str(http_error))
- sys.exit()
-
- def _can_lease(self):
- """Determines if new tasks can be leased.
-
- Determines if new taks can be leased based on
- 1. Number of tasks already running in the system.
- 2. Limit on accessing the taskqueue apirary.
-
- Returns:
- True/False.
- """
- if self._num_tasks_to_lease() > 0 and not self._is_rate_exceeded():
- return True
- else:
- return False
-
- def _is_rate_exceeded(self):
-
- """Determines if requests/second to TaskQueue API has exceeded limit.
-
- We do not access the APIs beyond the specified permissible limit.
- If we have run N tasks in elapsed time since last lease, we have
- already made N+1 requests to API (1 for collective lease and N for
- their individual delete operations). If K reqs/sec is the limit on
- accessing APIs, then we sould not invoke any request to API before
- N+1/K sec approximately. The above condition is formulated in the
- following method.
- Returns:
- True/False
- """
- if not FLAGS.taskapi_requests_per_sec:
- return False
- if not self._last_lease_time:
- return False
- curr_time = time.time()
- if ((curr_time - self._last_lease_time) <
- ((1.0 * (self._num_last_leased_tasks -
- len(self._taskprocess_map)) /
- FLAGS.taskapi_requests_per_sec))):
- return True
- else:
- return False
-
- def _num_tasks_to_lease(self):
-
- """Determines how many tasks can be leased.
-
- num_tasks is upper limit to running tasks in the system and hence
- number of tasks which could be leased is difference of numtasks and
- currently running tasks.
-
- Returns:
- Number of tasks to lease.
- """
- return FLAGS.num_tasks - len(self._taskprocess_map)
-
- def _update_last_lease_info(self, result):
-
- """Updates the information regarding last lease.
-
- Args:
- result: Response object from TaskQueue API, containing list of
- tasks.
- """
- self._last_lease_time = time.time()
- if result:
- if result.get('items'):
- self._num_last_leased_tasks = len(result.get('items'))
- else:
- self._num_last_leased_tasks = 0
- else:
- self._num_last_leased_tasks = 0
-
- def _update_poll_timeout_start(self):
-
- """Updates the start time for poll-timeout."""
- if not self._poll_timeout_start:
- self._poll_timeout_start = time.time()
-
- def _continue_polling(self):
-
- """Checks whether lease can be invoked based on running tasks and
- timeout.
-
- Lease can be invoked if
- 1. Running tasks in the sytem has gone below the specified
- threshold (min_running_tasks).
- 2. Wait time has exceeded beyond time-out specified and at least one
- tas has finished since last lease invocation.
-
- By doing this, we are essentially trying to batch the lease requests.
- If this is not done and we start off leasing N tasks, its likely tasks
- may finish slightly one after another, and we make N lease requests for
- each task for next N tasks and so on. This can result in unnecessary
- lease API call and hence to avoid that, we try and batch the lease
- requests. Also we put certain limit on wait time for batching the
- requests by incororating the time-out.
-
- Returns:
- True/False
- """
- if len(self._taskprocess_map) <= FLAGS.min_running_tasks:
- return False
- if self._poll_timeout_start:
- elapsed_time = time.time() - self._poll_timeout_start
- if elapsed_time > FLAGS.timeout_secs_for_next_lease_request:
- self._poll_timeout_start = None
- return False
- return True
-
- def _get_tasks_from_queue(self):
-
- """Gets the available tasks from the taskqueue.
-
- Returns:
- Lease response object.
- """
- try:
- tasks_to_fetch = self._num_tasks_to_lease()
- lease_req = self.task_api.tasks().lease(
- project=FLAGS.project_name,
- taskqueue=FLAGS.taskqueue_name,
- leaseSecs=FLAGS.lease_secs,
- numTasks=tasks_to_fetch,
- body={})
- result = lease_req.execute()
- return result
- except HttpError, http_error:
- logger.error('Error during lease request: %s' % str(http_error))
- return None
-
- def _create_subprocesses_for_tasks(self, result):
-
- """Spawns parallel sub processes to execute tasks for better
- throughput.
-
- Args:
- result: lease resonse dictionary object.
- """
- if not result:
- logger.info('Error: result is not defined')
- return None
- if result.get('items'):
- for task in result.get('items'):
- task_id = task.get('id')
- # Given that a task may be leased multiple times, we may get a
- # task which we are currently executing on, so make sure we
- # dont spaw another subprocess for it.
- if task_id not in self._taskprocess_map:
- ct = ClientTask(task)
- # Check if tasks got initialized properly and then pu them
- # in running tasks map.
- if ct.init():
- # Put the clientTask objects in a dictionary to keep
- # track of stats and objects are used later to delete
- # the tasks from taskqueue
- self._taskprocess_map[ct.get_task_id()] = ct
-
- def _poll_running_tasks(self):
-
- """Polls all the running tasks and delete them from taskqueue if
- completed."""
- if self._taskprocess_map:
- for task in self._taskprocess_map.values():
- if task.is_completed(self.task_api):
- del self._taskprocess_map[task.get_task_id()]
- # updates scheduling information for later use.
- self._update_poll_timeout_start()
-
- def _sleep_before_next_lease(self):
-
- """Sleeps before invoking lease if required based on last lease info.
-
- It sleeps when no tasks were found on the taskqueue during last lease
- request. To note, it discount the time taken in polling the tasks and
- sleeps for (sleep_interval - time taken in poll). This avoids the
- unnecessary wait if tasks could be leased. If no time was taken in
- poll since there were not tasks in the system, it waits for full sleep
- interval and thus optimizes the CPU cycles.
- It does not sleep if the method is called for the first time (when no
- lease request has ever been made).
- """
- if not self._last_lease_time:
- sleep_secs = 0
- elif self._num_last_leased_tasks <= 0:
- time_elpased_since_last_lease = time.time() - self._last_lease_time
- sleep_secs = (FLAGS.sleep_interval_secs -
- time_elpased_since_last_lease)
- if sleep_secs > 0:
- logger.info('No tasks found and hence sleeping for sometime')
- time.sleep(FLAGS.sleep_interval_secs)
-
- def lease_tasks(self):
-
- """Requests lease for specified number of tasks.
-
- It invokes lease request for appropriate number of tasks, spawns
- parallel processes to execute them and also maintains scheduling
- information.
-
- LeaseTask also takes care of waiting(sleeping) before invoking lease if
- there are no tasks which can be leased in the taskqueue. This results
- in better resource utilization. Apart from this, it also controls the
- number of requests being sent to taskqueue APIs.
-
- Returns:
- True/False based on if tasks could be leased or not.
- """
- self._sleep_before_next_lease()
- if self._can_lease():
- result = self._get_tasks_from_queue()
- self._update_last_lease_info(result)
- self._create_subprocesses_for_tasks(result)
- return True
- return False
-
- def poll_tasks(self):
-
- """Polls the status of running tasks of the system.
-
- Polls the status of tasks and then decides if it should continue to
- poll depending on number of tasks running in the system and timeouts.
- Instead of polling in a tight loop, it sleeps for sometime before the
- next poll to avoid any unnecessary CPU cycles. poll_tasks returns
- only when system has capability to accomodate at least one new task.
- """
-
- self._poll_running_tasks()
- while self._continue_polling():
- logger.info('Sleeping before next poll')
- time.sleep(FLAGS.sleep_before_next_poll_secs)
- self._poll_running_tasks()
-
-
-def main(argv):
-
- """Infinite loop to lease new tasks and poll them for completion."""
- # Settings for logger
- set_logger()
- # Instantiate puller
- puller = TaskQueuePuller()
- while True:
- puller.lease_tasks()
- puller.poll_tasks()
-
-if __name__ == '__main__':
- app.run()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py
deleted file mode 100644
index d4d651a..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py
+++ /dev/null
@@ -1,211 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""Commands to interact with the Task object of the TaskQueue API."""
-
-
-__version__ = '0.0.1'
-
-
-
-from gtaskqueue.taskqueue_cmd_base import GoogleTaskCommand
-
-from google.apputils import app
-from google.apputils import appcommands
-import gflags as flags
-
-FLAGS = flags.FLAGS
-
-
-class GetTaskCommand(GoogleTaskCommand):
- """Get properties of an existing task."""
-
- def __init__(self, name, flag_values):
- super(GetTaskCommand, self).__init__(name, flag_values)
-
- def build_request(self, task_api, flag_values):
- """Build a request to get properties of a Task.
-
- Args:
- task_api: The handle to the task collection API.
- flag_values: The parsed command flags.
- Returns:
- The properties of the task.
- """
- return task_api.get(project=flag_values.project_name,
- taskqueue=flag_values.taskqueue_name,
- task=flag_values.task_name)
-
-
-class LeaseTaskCommand(GoogleTaskCommand):
- """Lease a new task from the queue."""
-
- def __init__(self, name, flag_values):
- flags.DEFINE_integer('lease_secs',
- None,
- 'The lease for the task in seconds',
- flag_values=flag_values)
- flags.DEFINE_integer('num_tasks',
- 1,
- 'The number of tasks to lease',
- flag_values=flag_values)
- flags.DEFINE_integer('payload_size_to_display',
- 2 * 1024 * 1024,
- 'Size of the payload for leased tasks to show',
- flag_values=flag_values)
- super(LeaseTaskCommand, self).__init__(name,
- flag_values,
- need_task_flag=False)
-
- def build_request(self, task_api, flag_values):
- """Build a request to lease a pending task from the TaskQueue.
-
- Args:
- task_api: The handle to the task collection API.
- flag_values: The parsed command flags.
- Returns:
- A new leased task.
- """
- if not flag_values.lease_secs:
- raise app.UsageError('lease_secs must be specified')
-
- return task_api.lease(project=flag_values.project_name,
- taskqueue=flag_values.taskqueue_name,
- leaseSecs=flag_values.lease_secs,
- numTasks=flag_values.num_tasks)
-
- def print_result(self, result):
- """Override to optionally strip the payload since it can be long."""
- if result.get('items'):
- items = []
- for task in result.get('items'):
- payloadlen = len(task['payloadBase64'])
- if payloadlen > FLAGS.payload_size_to_display:
- extra = payloadlen - FLAGS.payload_size_to_display
- task['payloadBase64'] = ('%s(%d more bytes)' %
- (task['payloadBase64'][:FLAGS.payload_size_to_display],
- extra))
- items.append(task)
- result['items'] = items
- GoogleTaskCommand.print_result(self, result)
-
-
-class DeleteTaskCommand(GoogleTaskCommand):
- """Delete an existing task."""
-
- def __init__(self, name, flag_values):
- super(DeleteTaskCommand, self).__init__(name, flag_values)
-
- def build_request(self, task_api, flag_values):
- """Build a request to delete a Task.
-
- Args:
- task_api: The handle to the taskqueue collection API.
- flag_values: The parsed command flags.
- Returns:
- Whether the delete was successful.
- """
- return task_api.delete(project=flag_values.project_name,
- taskqueue=flag_values.taskqueue_name,
- task=flag_values.task_name)
-
-
-class ListTasksCommand(GoogleTaskCommand):
- """Lists all tasks in a queue (currently upto a max of 100)."""
-
- def __init__(self, name, flag_values):
- super(ListTasksCommand, self).__init__(name,
- flag_values,
- need_task_flag=False)
-
- def build_request(self, task_api, flag_values):
- """Build a request to lists tasks in a queue.
-
- Args:
- task_api: The handle to the taskqueue collection API.
- flag_values: The parsed command flags.
- Returns:
- A list of pending tasks in the queue.
- """
- return task_api.list(project=flag_values.project_name,
- taskqueue=flag_values.taskqueue_name)
-
-
-class ClearTaskQueueCommand(GoogleTaskCommand):
- """Deletes all tasks in a queue (default to a max of 100)."""
-
-
- def __init__(self, name, flag_values):
- flags.DEFINE_integer('max_delete', 100, 'How many to clear at most',
- flag_values=flag_values)
- super(ClearTaskQueueCommand, self).__init__(name,
- flag_values,
- need_task_flag=False)
-
- def run_with_api_and_flags(self, api, flag_values):
- """Run the command, returning the result.
-
- Args:
- api: The handle to the Google TaskQueue API.
- flag_values: The parsed command flags.
- Returns:
- The result of running the command.
- """
- tasks_api = api.tasks()
- self._flag_values = flag_values
- self._to_delete = flag_values.max_delete
- total_deleted = 0
- while self._to_delete > 0:
- n_deleted = self._delete_a_batch(tasks_api)
- if n_deleted <= 0:
- break
- total_deleted += n_deleted
- return {'deleted': total_deleted}
-
- def _delete_a_batch(self, tasks):
- """Delete a batch of tasks.
-
- Since the list method only gives us back 100 at a time, we may have
- to call it several times to clear the entire queue.
-
- Args:
- tasks: The handle to the Google TaskQueue API Tasks resource.
- Returns:
- The number of tasks deleted.
- """
- list_request = tasks.list(project=self._flag_values.project_name,
- taskqueue=self._flag_values.taskqueue_name)
- result = list_request.execute()
- n_deleted = 0
- if result:
- for task in result.get('items', []):
- if self._to_delete > 0:
- self._to_delete -= 1
- n_deleted += 1
- print 'Deleting: %s' % task['id']
- tasks.delete(project=self._flag_values.project_name,
- taskqueue=self._flag_values.taskqueue_name,
- task=task['id']).execute()
- return n_deleted
-
-
-def add_commands():
- appcommands.AddCmd('listtasks', ListTasksCommand)
- appcommands.AddCmd('gettask', GetTaskCommand)
- appcommands.AddCmd('deletetask', DeleteTaskCommand)
- appcommands.AddCmd('leasetask', LeaseTaskCommand)
- appcommands.AddCmd('clear', ClearTaskQueueCommand)
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py
deleted file mode 100644
index 1df012d..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py
+++ /dev/null
@@ -1,189 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Class to connect to TaskQueue API."""
-
-
-
-import os
-import sys
-import urlparse
-from apiclient.discovery import build
-from apiclient.errors import HttpError
-import httplib2
-from oauth2client.anyjson import simplejson as json
-from oauth2client.file import Storage
-from oauth2client.client import OAuth2WebServerFlow
-from oauth2client.tools import run
-from gtaskqueue.taskqueue_logger import logger
-
-from google.apputils import app
-import gflags as flags
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string(
- 'service_version',
- 'v1beta1',
- 'Google taskqueue api version.')
-flags.DEFINE_string(
- 'api_host',
- 'https://www.googleapis.com/',
- 'API host name')
-flags.DEFINE_bool(
- 'use_developer_key',
- False,
- 'User wants to use the developer key while accessing taskqueue apis')
-flags.DEFINE_string(
- 'developer_key_file',
- '~/.taskqueue.apikey',
- 'Developer key provisioned from api console')
-flags.DEFINE_bool(
- 'dump_request',
- False,
- 'Prints the outgoing HTTP request along with headers and body.')
-flags.DEFINE_string(
- 'credentials_file',
- 'taskqueue.dat',
- 'File where you want to store the auth credentails for later user')
-
-# Set up a Flow object to be used if we need to authenticate. This
-# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with
-# the information it needs to authenticate. Note that it is called
-# the Web Server Flow, but it can also handle the flow for native
-# applications <http://code.google.com/apis/accounts/docs/OAuth2.html#IA>
-# The client_id client_secret are copied from the Identity tab on
-# the Google APIs Console <http://code.google.com/apis/console>
-FLOW = OAuth2WebServerFlow(
- client_id='157776985798.apps.googleusercontent.com',
- client_secret='tlpVCmaS6yLjxnnPu0ARIhNw',
- scope='https://www.googleapis.com/auth/taskqueue',
- user_agent='taskqueue-cmdline-sample/1.0')
-
-
-class TaskQueueClient:
- """Class to setup connection with taskqueue API."""
-
- def __init__(self):
- if not FLAGS.project_name:
- raise app.UsageError('You must specify a project name'
- ' using the "--project_name" flag.')
- discovery_uri = (
- FLAGS.api_host + 'discovery/v1/apis/{api}/{apiVersion}/rest')
- logger.info(discovery_uri)
- try:
- # If the Credentials don't exist or are invalid run through the
- # native clien flow. The Storage object will ensure that if
- # successful the good Credentials will get written back to a file.
- # Setting FLAGS.auth_local_webserver to false since we can run our
- # tool on Virtual Machines and we do not want to run the webserver
- # on VMs.
- FLAGS.auth_local_webserver = False
- storage = Storage(FLAGS.credentials_file)
- credentials = storage.get()
- if credentials is None or credentials.invalid == True:
- credentials = run(FLOW, storage)
- http = credentials.authorize(self._dump_request_wrapper(
- httplib2.Http()))
- self.task_api = build('taskqueue',
- FLAGS.service_version,
- http=http,
- discoveryServiceUrl=discovery_uri)
- except HttpError, http_error:
- logger.error('Error gettin task_api: %s' % http_error)
-
- def get_taskapi(self):
- """Returns handler for tasks API from taskqueue API collection."""
- return self.task_api
-
-
- def _dump_request_wrapper(self, http):
- """Dumps the outgoing HTTP request if requested.
-
- Args:
- http: An instance of httplib2.Http or something that acts like it.
-
- Returns:
- httplib2.Http like object.
- """
- request_orig = http.request
-
- def new_request(uri, method='GET', body=None, headers=None,
- redirections=httplib2.DEFAULT_MAX_REDIRECTS,
- connection_type=None):
- """Overrides the http.request method to add some utilities."""
- if (FLAGS.api_host + "discovery/" not in uri and
- FLAGS.use_developer_key):
- developer_key_path = os.path.expanduser(
- FLAGS.developer_key_file)
- if not os.path.isfile(developer_key_path):
- print 'Please generate developer key from the Google API' \
- 'Console and store it in %s' % (FLAGS.developer_key_file)
- sys.exit()
- developer_key_file = open(developer_key_path, 'r')
- try:
- developer_key = developer_key_file.read().strip()
- except IOError, io_error:
- print 'Error loading developer key from file %s' % (
- FLAGS.developer_key_file)
- print 'Error details: %s' % str(io_error)
- sys.exit()
- finally:
- developer_key_file.close()
- s = urlparse.urlparse(uri)
- query = 'key=' + developer_key
- if s.query:
- query = s.query + '&key=' + developer_key
- d = urlparse.ParseResult(s.scheme,
- s.netloc,
- s.path,
- s.params,
- query,
- s.fragment)
- uri = urlparse.urlunparse(d)
- if FLAGS.dump_request:
- print '--request-start--'
- print '%s %s' % (method, uri)
- if headers:
- for (h, v) in headers.iteritems():
- print '%s: %s' % (h, v)
- print ''
- if body:
- print json.dumps(json.loads(body),
- sort_keys=True,
- indent=2)
- print '--request-end--'
- return request_orig(uri,
- method,
- body,
- headers,
- redirections,
- connection_type)
- http.request = new_request
- return http
-
- def print_result(self, result):
- """Pretty-print the result of the command.
-
- The default behavior is to dump a formatted JSON encoding
- of the result.
-
- Args:
- result: The JSON-serializable result to print.
- """
- # We could have used the pprint module, but it produces
- # noisy output due to all of our keys and values being
- # unicode strings rather than simply ascii.
- print json.dumps(result, sort_keys=True, indent=2)
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py
deleted file mode 100644
index 0400acd..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py
+++ /dev/null
@@ -1,275 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""Commands for interacting with Google TaskQueue."""
-
-__version__ = '0.0.1'
-
-
-import os
-import sys
-import urlparse
-
-
-from apiclient.discovery import build
-from apiclient.errors import HttpError
-import httplib2
-from oauth2client.anyjson import simplejson as json
-from oauth2client.file import Storage
-from oauth2client.client import OAuth2WebServerFlow
-from oauth2client.tools import run
-
-from google.apputils import app
-from google.apputils import appcommands
-import gflags as flags
-
-
-FLAGS = flags.FLAGS
-
-flags.DEFINE_string(
- 'service_version',
- 'v1beta1',
- 'Google taskqueue api version.')
-flags.DEFINE_string(
- 'api_host',
- 'https://www.googleapis.com/',
- 'API host name')
-flags.DEFINE_string(
- 'project_name',
- 'default',
- 'The name of the Taskqueue API project.')
-flags.DEFINE_bool(
- 'use_developer_key',
- False,
- 'User wants to use the developer key while accessing taskqueue apis')
-flags.DEFINE_string(
- 'developer_key_file',
- '~/.taskqueue.apikey',
- 'Developer key provisioned from api console')
-flags.DEFINE_bool(
- 'dump_request',
- False,
- 'Prints the outgoing HTTP request along with headers and body.')
-flags.DEFINE_string(
- 'credentials_file',
- 'taskqueue.dat',
- 'File where you want to store the auth credentails for later user')
-
-# Set up a Flow object to be used if we need to authenticate. This
-# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with
-# the information it needs to authenticate. Note that it is called
-# the Web Server Flow, but it can also handle the flow for native
-# applications <http://code.google.com/apis/accounts/docs/OAuth2.html#IA>
-# The client_id client_secret are copied from the Identity tab on
-# the Google APIs Console <http://code.google.com/apis/console>
-FLOW = OAuth2WebServerFlow(
- client_id='157776985798.apps.googleusercontent.com',
- client_secret='tlpVCmaS6yLjxnnPu0ARIhNw',
- scope='https://www.googleapis.com/auth/taskqueue',
- user_agent='taskqueue-cmdline-sample/1.0')
-
-class GoogleTaskQueueCommandBase(appcommands.Cmd):
- """Base class for all the Google TaskQueue client commands."""
-
- DEFAULT_PROJECT_PATH = 'projects/default'
-
- def __init__(self, name, flag_values):
- super(GoogleTaskQueueCommandBase, self).__init__(name, flag_values)
-
- def _dump_request_wrapper(self, http):
- """Dumps the outgoing HTTP request if requested.
-
- Args:
- http: An instance of httplib2.Http or something that acts like it.
-
- Returns:
- httplib2.Http like object.
- """
- request_orig = http.request
-
- def new_request(uri, method='GET', body=None, headers=None,
- redirections=httplib2.DEFAULT_MAX_REDIRECTS,
- connection_type=None):
- """Overrides the http.request method to add some utilities."""
- if (FLAGS.api_host + "discovery/" not in uri and
- FLAGS.use_developer_key):
- developer_key_path = os.path.expanduser(
- FLAGS.developer_key_file)
- if not os.path.isfile(developer_key_path):
- print 'Please generate developer key from the Google APIs' \
- 'Console and store it in %s' % (FLAGS.developer_key_file)
- sys.exit()
- developer_key_file = open(developer_key_path, 'r')
- try:
- developer_key = developer_key_file.read().strip()
- except IOError, io_error:
- print 'Error loading developer key from file %s' % (
- FLAGS.developer_key_file)
- print 'Error details: %s' % str(io_error)
- sys.exit()
- finally:
- developer_key_file.close()
- s = urlparse.urlparse(uri)
- query = 'key=' + developer_key
- if s.query:
- query = s.query + '&key=' + developer_key
- d = urlparse.ParseResult(s.scheme,
- s.netloc,
- s.path,
- s.params,
- query,
- s.fragment)
- uri = urlparse.urlunparse(d)
-
- if FLAGS.dump_request:
- print '--request-start--'
- print '%s %s' % (method, uri)
- if headers:
- for (h, v) in headers.iteritems():
- print '%s: %s' % (h, v)
- print ''
- if body:
- print json.dumps(json.loads(body), sort_keys=True, indent=2)
- print '--request-end--'
- return request_orig(uri,
- method,
- body,
- headers,
- redirections,
- connection_type)
- http.request = new_request
- return http
-
- def Run(self, argv):
- """Run the command, printing the result.
-
- Args:
- argv: The non-flag arguments to the command.
- """
- if not FLAGS.project_name:
- raise app.UsageError('You must specify a project name'
- ' using the "--project_name" flag.')
- discovery_uri = (
- FLAGS.api_host + 'discovery/v1/apis/{api}/{apiVersion}/rest')
- try:
- # If the Credentials don't exist or are invalid run through the
- # native client flow. The Storage object will ensure that if
- # successful the good Credentials will get written back to a file.
- # Setting FLAGS.auth_local_webserver to false since we can run our
- # tool on Virtual Machines and we do not want to run the webserver
- # on VMs.
- FLAGS.auth_local_webserver = False
- storage = Storage(FLAGS.credentials_file)
- credentials = storage.get()
- if credentials is None or credentials.invalid == True:
- credentials = run(FLOW, storage)
- http = credentials.authorize(self._dump_request_wrapper(
- httplib2.Http()))
- api = build('taskqueue',
- FLAGS.service_version,
- http=http,
- discoveryServiceUrl=discovery_uri)
- result = self.run_with_api_and_flags_and_args(api, FLAGS, argv)
- self.print_result(result)
- except HttpError, http_error:
- print 'Error Processing request: %s' % str(http_error)
-
- def run_with_api_and_flags_and_args(self, api, flag_values, unused_argv):
- """Run the command given the API, flags, and args.
-
- The default implementation of this method discards the args and
- calls into run_with_api_and_flags.
-
- Args:
- api: The handle to the Google TaskQueue API.
- flag_values: The parsed command flags.
- unused_argv: The non-flag arguments to the command.
- Returns:
- The result of running the command
- """
- return self.run_with_api_and_flags(api, flag_values)
-
- def print_result(self, result):
- """Pretty-print the result of the command.
-
- The default behavior is to dump a formatted JSON encoding
- of the result.
-
- Args:
- result: The JSON-serializable result to print.
- """
- # We could have used the pprint module, but it produces
- # noisy output due to all of our keys and values being
- # unicode strings rather than simply ascii.
- print json.dumps(result, sort_keys=True, indent=2)
-
-
-
-class GoogleTaskQueueCommand(GoogleTaskQueueCommandBase):
- """Base command for working with the taskqueues collection."""
-
- def __init__(self, name, flag_values):
- super(GoogleTaskQueueCommand, self).__init__(name, flag_values)
- flags.DEFINE_string('taskqueue_name',
- 'myqueue',
- 'TaskQueue name',
- flag_values=flag_values)
-
- def run_with_api_and_flags(self, api, flag_values):
- """Run the command, returning the result.
-
- Args:
- api: The handle to the Google TaskQueue API.
- flag_values: The parsed command flags.
- Returns:
- The result of running the command.
- """
- taskqueue_request = self.build_request(api.taskqueues(), flag_values)
- return taskqueue_request.execute()
-
-
-class GoogleTaskCommand(GoogleTaskQueueCommandBase):
- """Base command for working with the tasks collection."""
-
- def __init__(self, name, flag_values, need_task_flag=True):
- super(GoogleTaskCommand, self).__init__(name, flag_values)
- # Common flags that are shared by all the Task commands.
- flags.DEFINE_string('taskqueue_name',
- 'myqueue',
- 'TaskQueue name',
- flag_values=flag_values)
- # Not all task commands need the task_name flag.
- if need_task_flag:
- flags.DEFINE_string('task_name',
- None,
- 'Task name',
- flag_values=flag_values)
-
- def run_with_api_and_flags(self, api, flag_values):
- """Run the command, returning the result.
-
- Args:
- api: The handle to the Google TaskQueue API.
- flag_values: The parsed command flags.
- flags.DEFINE_string('payload',
- None,
- 'Payload of the task')
- Returns:
- The result of running the command.
- """
- task_request = self.build_request(api.tasks(), flag_values)
- return task_request.execute()
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py
deleted file mode 100644
index f21840a..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py
+++ /dev/null
@@ -1,57 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Commands to interact with the TaskQueue object of the TaskQueue API."""
-
-
-__version__ = '0.0.1'
-
-
-
-from gtaskqueue.taskqueue_cmd_base import GoogleTaskQueueCommand
-
-from google.apputils import appcommands
-import gflags as flags
-
-FLAGS = flags.FLAGS
-
-
-class GetTaskQueueCommand(GoogleTaskQueueCommand):
- """Get properties of an existing task queue."""
-
- def __init__(self, name, flag_values):
- flags.DEFINE_boolean('get_stats',
- False,
- 'Whether to get Stats',
- flag_values=flag_values)
- super(GetTaskQueueCommand, self).__init__(name, flag_values)
-
- def build_request(self, taskqueue_api, flag_values):
- """Build a request to get properties of a TaskQueue.
-
- Args:
- taskqueue_api: The handle to the taskqueue collection API.
- flag_values: The parsed command flags.
- Returns:
- The properties of the taskqueue.
- """
- return taskqueue_api.get(project=flag_values.project_name,
- taskqueue=flag_values.taskqueue_name,
- getStats=flag_values.get_stats)
-
-
-def add_commands():
- appcommands.AddCmd('getqueue', GetTaskQueueCommand)
diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py
deleted file mode 100644
index 3f445f3..0000000
--- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Log settings for taskqueue_puller module."""
-
-
-
-import logging
-import logging.config
-from google.apputils import app
-import gflags as flags
-
-FLAGS = flags.FLAGS
-flags.DEFINE_string(
- 'log_output_file',
- '/tmp/taskqueue-puller.log',
- 'Logfile name for taskqueue_puller.')
-
-
-logger = logging.getLogger('TaskQueueClient')
-
-
-def set_logger():
- """Settings for taskqueue_puller logger."""
- logger.setLevel(logging.INFO)
-
- # create formatter
- formatter = logging.Formatter(
- '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-
- # Set size of the log file and the backup count for rotated log files.
- handler = logging.handlers.RotatingFileHandler(FLAGS.log_output_file,
- maxBytes = 1024 * 1024,
- backupCount = 5)
- # add formatter to handler
- handler.setFormatter(formatter)
- # add formatter to handler
- logger.addHandler(handler)
-
-if __name__ == '__main__':
- app.run()
diff --git a/samples/gtaskqueue_sample/setup.py b/samples/gtaskqueue_sample/setup.py
deleted file mode 100644
index b7eff8f..0000000
--- a/samples/gtaskqueue_sample/setup.py
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Setup script for the Google TaskQueue API command-line tool."""
-
-__version__ = '1.0.2'
-
-
-import sys
-try:
- from setuptools import setup
- print 'Loaded setuptools'
-except ImportError:
- from distutils.core import setup
- print 'Loaded distutils.core'
-
-
-PACKAGE_NAME = 'google-taskqueue-client'
-INSTALL_REQUIRES = ['google-apputils==0.1',
- 'google-api-python-client',
- 'httplib2',
- 'oauth2',
- 'python-gflags']
-setup(name=PACKAGE_NAME,
- version=__version__,
- description='Google TaskQueue API command-line tool and utils',
- author='Google Inc.',
- author_email='google-appengine@googlegroups.com',
- url='http://code.google.com/appengine/docs/python/taskqueue/pull/overview.html',
- install_requires=INSTALL_REQUIRES,
- packages=['gtaskqueue'],
- scripts=['gtaskqueue/gtaskqueue', 'gtaskqueue/gtaskqueue_puller',
- 'gtaskqueue/gen_appengine_access_token'],
- license='Apache 2.0',
- keywords='google taskqueue api client',
- classifiers=['Development Status :: 3 - Alpha',
- 'Intended Audience :: Developers',
- 'License :: OSI Approved :: Apache Software License',
- 'Operating System :: POSIX',
- 'Topic :: Internet :: WWW/HTTP'])