Revert "[autotest] Threaded asynchronous task execution on drones."
Problems with the retry decorator and localhost threads.
This reverts commit 0933899b0dd1320e90e06025cced8096aed44908.
Change-Id: I99318b4bdf4c11e9c4e5181c4ff5b1bdcdcbb89c
Reviewed-on: https://chromium-review.googlesource.com/207038
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
index 23fcff2..cc9f44e 100644
--- a/scheduler/drone_manager.py
+++ b/scheduler/drone_manager.py
@@ -1,18 +1,10 @@
-import collections
-import heapq
-import os
-import Queue
-import time
-import threading
-import traceback
-import logging
+import os, time, heapq, traceback, logging
import common
from autotest_lib.client.common_lib import error, global_config, utils
from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.scheduler import email_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config
-from autotest_lib.scheduler import thread_lib
# results on drones will be placed under the drone_installation_directory in a
@@ -152,8 +144,7 @@
# Minimum time to wait before next email
# about a drone hitting process limit is sent.
NOTIFY_INTERVAL = 60 * 60 * 24 # one day
- _STATS_KEY = 'drone_manager'
- _timer = stats.Timer(_STATS_KEY)
+ _timer = stats.Timer('drone_manager')
def __init__(self):
@@ -181,9 +172,6 @@
# map drone hostname to time stamp of email that
# has been sent about the drone hitting process limit.
self._notify_record = {}
- # A threaded task queue used to refresh drones asynchronously.
- self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
- name='%s.refresh_queue' % self._STATS_KEY)
def initialize(self, base_results_dir, drone_hostnames,
@@ -314,12 +302,6 @@
def _parse_pidfile(self, drone, raw_contents):
- """Parse raw pidfile contents.
-
- @param drone: The drone on which this pidfile was found.
- @param raw_contents: The raw contents of a pidfile, eg:
- "pid\nexit_staus\nnum_tests_failed\n".
- """
contents = PidfileContents()
if not raw_contents:
return contents
@@ -408,66 +390,19 @@
self._notify_record[drone.hostname] = now
- def trigger_refresh(self):
- """Triggers a drone manager refresh.
-
- @raises DroneManagerError: If a drone has un-executed calls.
- Since they will get clobbered when we queue refresh calls.
+ def refresh(self):
+ """
+ Called at the beginning of a scheduler cycle to refresh all process
+ information.
"""
self._reset()
self._drop_old_pidfiles()
pidfile_paths = [pidfile_id.path
for pidfile_id in self._registered_pidfile_info]
- drones = list(self.get_drones())
- for drone in drones:
- calls = drone.get_calls()
- if calls:
- raise DroneManagerError('Drone %s has un-executed calls: %s '
- 'which might get corrupted through '
- 'this invocation' %
- (drone, [str(call) for call in calls]))
- drone.queue_call('refresh', pidfile_paths)
- logging.info("Invoking drone refresh.")
- with self._timer.get_client('trigger_refresh'):
- self._refresh_task_queue.execute(drones, wait=False)
+ with self._timer.get_client('refresh'):
+ all_results = self._call_all_drones('refresh', pidfile_paths)
+ logging.info("Drones refreshed")
-
- def sync_refresh(self):
- """Complete the drone refresh started by trigger_refresh.
-
- Waits for all drone threads then refreshes internal datastructures
- with drone process information.
- """
-
- # This gives us a dictionary like what follows:
- # {drone: [{'pidfiles': (raw contents of pidfile paths),
- # 'autoserv_processes': (autoserv process info from ps),
- # 'all_processes': (all process info from ps),
- # 'parse_processes': (parse process infor from ps),
- # 'pidfile_second_read': (pidfile contents, again),}]
- # drone2: ...}
- # The values of each drone are only a list because this adheres to the
- # drone utility interface (each call is executed and its results are
- # places in a list, but since we never couple the refresh calls with
- # any other call, this list will always contain a single dict).
- with self._timer.get_client('sync_refresh'):
- all_results = self._refresh_task_queue.get_results()
- logging.info("Drones refreshed.")
-
- # The loop below goes through and parses pidfile contents. Pidfiles
- # are used to track autoserv execution, and will always contain < 3
- # lines of the following: pid, exit code, number of tests. Each pidfile
- # is identified by a PidfileId object, which contains a unique pidfile
- # path (unique because it contains the job id) making it hashable.
- # All pidfiles are stored in the drone managers _pidfiles dict as:
- # {pidfile_id: pidfile_contents(Process(drone, pid),
- # exit_code, num_tests_failed)}
- # In handle agents, each agent knows its pidfile_id, and uses this
- # to retrieve the refreshed contents of its pidfile via the
- # PidfileRunMonitor (through its tick) before making decisions. If
- # the agent notices that its process has exited, it unregisters the
- # pidfile from the drone_managers._registered_pidfile_info dict
- # through its epilog.
for drone, results_list in all_results.iteritems():
results = results_list[0]
drone_hostname = drone.hostname.replace('.', '_')
@@ -495,23 +430,13 @@
self._check_drone_process_limit(drone)
- def refresh(self):
- """Refresh all drones."""
- with self._timer.get_client('refresh'):
- self.trigger_refresh()
- self.sync_refresh()
-
-
def execute_actions(self):
"""
Called at the end of a scheduler cycle to execute all queued actions
on drones.
"""
- # Invoke calls queued on all drones since the last call to execute
- # and wait for them to return.
- thread_lib.ThreadedTaskQueue(
- name='%s.execute_queue' % self._STATS_KEY).execute(
- self._drones.values())
+ for drone in self._drones.values():
+ drone.execute_queued_calls()
try:
self._results_drone.execute_queued_calls()
diff --git a/scheduler/drone_manager_unittest.py b/scheduler/drone_manager_unittest.py
index 034cd38..d2ee2a2 100755
--- a/scheduler/drone_manager_unittest.py
+++ b/scheduler/drone_manager_unittest.py
@@ -1,17 +1,11 @@
#!/usr/bin/python
-import cPickle
import os, unittest
import common
from autotest_lib.client.common_lib import global_config
-from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.scheduler import drone_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config, site_drone_manager
-from autotest_lib.scheduler import thread_lib
-from autotest_lib.scheduler import pidfile_monitor
-from autotest_lib.server.hosts import ssh_host
-
class MockDrone(drones._AbstractDrone):
def __init__(self, name, active_processes=0, max_processes=10,
@@ -337,140 +331,5 @@
self.assertFalse(self.manager._registered_pidfile_info)
-class AsyncDroneManager(unittest.TestCase):
- _DRONE_INSTALL_DIR = '/drone/install/dir'
- _RESULTS_DIR = '/results/dir'
-
-
- def create_remote_drone(self, hostname, mock_host):
- """Create and initialize a Remote Drone.
-
- @return: A remote drone instance.
- """
- drones.drone_utility.create_host.expect_call(hostname).and_return(
- mock_host)
- mock_host.is_up.expect_call().and_return(True)
- return drones._RemoteDrone(hostname)
-
-
- def create_fake_pidfile_info(self, tag='tag', name='name'):
- pidfile_id = self.manager.get_pidfile_id_from(tag, name)
- self.manager.register_pidfile(pidfile_id)
- return self.manager._registered_pidfile_info
-
-
- def setUp(self):
- self.god = mock.mock_god()
- self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
- self._DRONE_INSTALL_DIR)
- self.manager = drone_manager.DroneManager()
- self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
-
- # we don't want this to ever actually get called
- self.god.stub_function(drones, 'get_drone')
- # we don't want the DroneManager to go messing with global config
- def do_nothing():
- pass
- self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
-
- # set up some dummy drones with different mock hosts.
- mock_host1 = self.god.create_mock_class(ssh_host.SSHHost,
- 'mock SSHHost 1')
- mock_host2 = self.god.create_mock_class(ssh_host.SSHHost,
- 'mock SSHHost 2')
- self.god.stub_function(drones.drone_utility, 'create_host')
- self.mock_drone_1 = self.create_remote_drone(
- 'fakehostname1', mock_host1)
- self.mock_drone_2 = self.create_remote_drone(
- 'fakehostname2', mock_host2)
- for mock_drone in [self.mock_drone_1, self.mock_drone_2]:
- self.manager._drones[mock_drone.hostname] = mock_drone
-
- self.results_drone = MockDrone('results_drone', 0, 10)
- self.manager._results_drone = self.results_drone
- self.drone_utility_path = 'mock-drone-utility-path'
- self.mock_return = {'results': ['mock results'],
- 'warnings': []}
-
-
- def tearDown(self):
- self.god.unstub_all()
-
- def test_trigger_refresh(self):
- """Test drone manager trigger refresh."""
- self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
- self.drone_utility_path)
-
- # Create some fake pidfiles and confirm that a refresh call is
- # executed on each drone host, with the same pidfile paths. Then
- # check that each drone gets a key in the returned results dictionary.
- for i in range(0, 1):
- pidfile_info = self.create_fake_pidfile_info(
- 'tag%s' % i, 'name%s' %i)
- pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
- refresh_call = drone_utility.call('refresh', pidfile_paths)
- expected_results = {}
- for drone in self.manager.get_drones():
- mock_result = utils.CmdResult(
- stdout=cPickle.dumps(self.mock_return))
- drone._host.run.expect_call(
- 'python %s' % self.drone_utility_path,
- stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
- connect_timeout=mock.is_instance_comparator(int)
- ).and_return(mock_result)
- expected_results[drone] = self.mock_return['results']
-
- self.manager.trigger_refresh()
- self.assertTrue(self.manager._refresh_task_queue.get_results() ==
- expected_results)
- self.god.check_playback()
-
-
- def test_sync_refresh(self):
- """Test drone manager sync refresh."""
-
- # Insert some drone_utility results into the results queue, then
- # check that get_results returns it in the right format, and that
- # the rest of sync_refresh populates the right datastructures for
- # correct handling of agents. Also confirm that this method of
- # syncing is sufficient for the monitor to pick up the exit status
- # of the process in the same way it would in handle_agents.
- pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
- pidfiles = {pidfile_path: '123\n12\n0\n'}
- drone_utility_results = {
- 'pidfiles': pidfiles,
- 'autoserv_processes':{},
- 'all_processes':{},
- 'parse_processes':{},
- 'pidfiles_second_read':pidfiles,
- }
- # Our manager instance isn't the drone manager singletone that the
- # pidfile_monitor will use by default, becuase setUp doesn't call
- # drone_manager.instance().
- self.god.stub_with(pidfile_monitor, '_drone_manager', self.manager)
- monitor = pidfile_monitor.PidfileRunMonitor()
- monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
- self.manager.register_pidfile(monitor.pidfile_id)
- self.assertTrue(monitor._state.exit_status == None)
-
- self.manager._refresh_task_queue.results_queue.put(
- thread_lib.ThreadedTaskQueue.result(
- self.mock_drone_1, [drone_utility_results]))
- self.manager.sync_refresh()
- pidfiles = self.manager._pidfiles
- pidfile_id = pidfiles.keys()[0]
- pidfile_contents = pidfiles[pidfile_id]
-
- self.assertTrue(
- pidfile_id.path == pidfile_path and
- pidfile_contents.process.pid == 123 and
- pidfile_contents.process.hostname ==
- self.mock_drone_1.hostname and
- pidfile_contents.exit_status == 12 and
- pidfile_contents.num_tests_failed == 0)
- self.assertTrue(monitor.exit_code() == 12)
- self.god.check_playback()
-
-
if __name__ == '__main__':
unittest.main()
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 30cd336..ab1ce44 100755
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -1,15 +1,4 @@
#!/usr/bin/python
-#pylint: disable-msg=C0111
-
-"""Utility module that executes management commands on the drone.
-
-1. This is the module responsible for orchestrating processes on a drone.
-2. It receives instructions via stdin and replies via stdout.
-3. Each invocation is responsible for the initiation of a set of batched calls.
-4. The batched calls may be synchronous or asynchronous.
-5. The caller is responsible for monitoring asynchronous calls through pidfiles.
-"""
-
import pickle, subprocess, os, shutil, sys, time, signal, getpass
import datetime, traceback, tempfile, itertools, logging
@@ -111,17 +100,9 @@
@classmethod
@timer.decorate
def _get_process_info(cls):
- """Parse ps output for all process information.
-
+ """
@returns A generator of dicts with cls._PS_ARGS as keys and
- string values each representing a running process. eg:
- {
- 'comm': command_name,
- 'pgid': process group id,
- 'ppid': parent process id,
- 'pid': process id,
- 'args': args the command was invoked with,
- }
+ string values each representing a running process.
"""
@retry.retry(subprocess.CalledProcessError,
timeout_min=0.5, delay_sec=0.25)
@@ -138,16 +119,6 @@
def _refresh_processes(self, command_name, open=open,
site_check_parse=None):
- """Refreshes process info for the given command_name.
-
- Examines ps output as returned by get_process_info and returns
- the process dicts for processes matching the given command name.
-
- @param command_name: The name of the command, eg 'autoserv'.
-
- @return: A list of process info dictionaries as returned by
- _get_process_info.
- """
# The open argument is used for test injection.
check_mark = global_config.global_config.get_config_value(
'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
diff --git a/scheduler/drones.py b/scheduler/drones.py
index c52c3dc..605d250 100644
--- a/scheduler/drones.py
+++ b/scheduler/drones.py
@@ -1,5 +1,3 @@
-#pylint: disable-msg=C0111
-
import cPickle, os, tempfile, logging
import common
from autotest_lib.scheduler import drone_utility, email_manager
@@ -72,14 +70,6 @@
return return_message['results']
- def get_calls(self):
- """Returns the calls queued against this drone.
-
- @return: A list of calls queued against the drone.
- """
- return self._calls
-
-
def call(self, method, *args, **kwargs):
return self._execute_calls(
[drone_utility.call(method, *args, **kwargs)])
@@ -95,9 +85,8 @@
def execute_queued_calls(self):
if not self._calls:
return
- results = self._execute_calls(self._calls)
+ self._execute_calls(self._calls)
self.clear_call_queue()
- return results
def set_autotest_install_dir(self, path):
diff --git a/scheduler/drones_unittest.py b/scheduler/drones_unittest.py
index a3c3953..d395288 100755
--- a/scheduler/drones_unittest.py
+++ b/scheduler/drones_unittest.py
@@ -1,5 +1,4 @@
#!/usr/bin/python
-#pylint: disable-msg=C0111
"""Tests for autotest_lib.scheduler.drones."""
@@ -13,13 +12,11 @@
class RemoteDroneTest(unittest.TestCase):
-
def setUp(self):
self.god = mock.mock_god()
self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
'mock SSHHost')
self.god.stub_function(drones.drone_utility, 'create_host')
- self.drone_utility_path = 'mock-drone-utility-path'
def tearDown(self):
@@ -36,43 +33,22 @@
def test_execute_calls_impl(self):
self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
- self.drone_utility_path)
+ 'mock-drone-utility-path')
drones.drone_utility.create_host.expect_call('fakehost').and_return(
self._mock_host)
self._mock_host.is_up.expect_call().and_return(True)
mock_calls = ('foo',)
mock_result = utils.CmdResult(stdout=cPickle.dumps('mock return'))
self._mock_host.run.expect_call(
- 'python %s' % self.drone_utility_path,
+ 'python mock-drone-utility-path',
stdin=cPickle.dumps(mock_calls), stdout_tee=None,
connect_timeout=mock.is_instance_comparator(int)).and_return(
mock_result)
+
drone = drones._RemoteDrone('fakehost')
self.assertEqual('mock return', drone._execute_calls_impl(mock_calls))
self.god.check_playback()
- def test_execute_queued_calls(self):
- self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
- self.drone_utility_path)
- drones.drone_utility.create_host.expect_call('fakehost').and_return(
- self._mock_host)
- self._mock_host.is_up.expect_call().and_return(True)
- drone = drones._RemoteDrone('fakehost')
- mock_return={}
- mock_return['results'] = ['mock return']
- mock_return['warnings'] = []
- drone.queue_call('foo')
- mock_result = utils.CmdResult(stdout=cPickle.dumps(mock_return))
- self._mock_host.run.expect_call(
- 'python %s' % self.drone_utility_path,
- stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
- connect_timeout=mock.is_instance_comparator(int)).and_return(
- mock_result)
- self.assertEqual(mock_return['results'], drone.execute_queued_calls())
- self.god.check_playback()
-
-
-
if __name__ == '__main__':
unittest.main()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index ddcb028..4860859 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -299,8 +299,8 @@
timer = stats.Timer('scheduler.tick')
self._log_tick_msg('Calling new tick, starting garbage collection().')
self._garbage_collection()
- self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
- _drone_manager.trigger_refresh()
+ self._log_tick_msg('Calling _drone_manager.refresh().')
+ _drone_manager.refresh()
self._log_tick_msg('Calling _run_cleanup().')
self._run_cleanup()
self._log_tick_msg('Calling _find_aborting().')
@@ -317,8 +317,6 @@
self._schedule_special_tasks()
self._log_tick_msg('Calling _schedule_new_jobs().')
self._schedule_new_jobs()
- self._log_tick_msg('Calling _drone_manager.sync_refresh().')
- _drone_manager.sync_refresh()
self._log_tick_msg('Calling _handle_agents().')
self._handle_agents()
self._log_tick_msg('Calling _host_scheduler.tick().')
diff --git a/scheduler/site_drones.py b/scheduler/site_drones.py
index 2022f64..fad9e4a 100644
--- a/scheduler/site_drones.py
+++ b/scheduler/site_drones.py
@@ -40,4 +40,4 @@
if self._processes_to_kill:
self.queue_call('kill_processes', self._processes_to_kill)
self.clear_processes_to_kill()
- return super(_SiteAbstractDrone, self).execute_queued_calls()
+ super(_SiteAbstractDrone, self).execute_queued_calls()
diff --git a/scheduler/thread_lib.py b/scheduler/thread_lib.py
deleted file mode 100644
index cb9f226..0000000
--- a/scheduler/thread_lib.py
+++ /dev/null
@@ -1,211 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-
-"""Thread library for drone management.
-
-This library contains a threaded task queue capable of starting, monitoring
-and syncing threads across remote and localhost drones asynchronously. It also
-contains a wrapper for standard python threads that records exceptions so they
-can be re-raised in the thread manager. The api exposed by the threaded task
-queue is as follows:
- 1. worker: The staticmethod executed by all worker threads.
- 2. execute: Takes a list of drones and invokes a worker thread per drone.
- This method assumes that all drones have a queue of pending calls
- for execution.
- 3. wait_on_drones: Waits for all worker threads started by execute to finish
- and raises any exceptions as a consolidated DroneTaskQueueException.
- 4. get_results: Returns the results of all threads as a dictionary keyed
- on the drones.
-"""
-
-import collections
-import datetime as datetime_base
-from datetime import datetime
-import Queue
-import threading
-import logging
-
-import common
-from autotest_lib.client.common_lib.cros.graphite import stats
-from autotest_lib.scheduler import drone_utility
-
-
-class DroneTaskQueueException(Exception):
- """Generic task queue exception."""
- pass
-
-
-class ExceptionRememberingThread(threading.Thread):
- """A wrapper around regular python threads that records exceptions."""
-
- def run(self):
- """Wrapper around the thread's run method."""
- try:
- super(ExceptionRememberingThread, self).run()
- except Exception as self.err:
- logging.error('%s raised an exception that will be re-raised by '
- 'the thread pool manager.', self.getName())
- else:
- self.err = None
-
-
-class PersistentTimer(object):
- """A class to handle timers across local scopes."""
-
- def __init__(self, name):
- """Initialize a persistent timer.
-
- @param name: The name/key to insert timings under.
- """
- self.name = name
- self.timer = None
-
-
- def start(self):
- """Create and start a new timer."""
- self.timer = stats.Timer(self.name)
- self.timer.start()
-
-
- def stop(self):
- """Stop a previously started timer."""
- try:
- self.timer.stop()
- except (AssertionError, AttributeError) as e:
- logging.info('Stopping timer %s failed: %s', self.name, e)
- finally:
- self.timer = None
-
-
-class ThreadedTaskQueue(object):
- """Threaded implementation of a drone task queue."""
-
- result = collections.namedtuple('task', ['drone', 'results'])
-
- def __init__(self, name='thread_queue'):
- self.results_queue = Queue.Queue()
- self.drone_threads = {}
- self.name = name
- # The persistent timer is used to measure net time spent
- # refreshing all drones across 'execute' and 'get_results'.
- self.timer = PersistentTimer(self.name)
-
-
- @staticmethod
- def worker(drone, results_queue):
- """Worker for task execution.
-
- Execute calls queued against the given drone and place the return value
- in results_queue.
-
- @param drone: A drone with calls to execute.
- @param results_queue: A queue, into which the worker places
- ThreadedTaskQueue.result from the drone calls.
- """
- logging.info('(Worker-%s) starting.', drone.hostname)
- results_queue.put(ThreadedTaskQueue.result(
- drone, drone.execute_queued_calls()))
- logging.info('(Worker-%s) finished.', drone.hostname)
-
-
- def wait_on_drones(self):
- """Wait on all threads that are currently refreshing a drone.
-
- @raises DroneTaskQueueException: Consolidated exception for all
- drone thread exceptions.
- """
- if not self.drone_threads:
- return
- # TODO: Make this process more resilient. We can:
- # 1. Timeout the join.
- # 2. Kick out the exception/timeout drone.
- # 3. Selectively retry exceptions.
- # For now, it is compliant with the single threaded drone manager which
- # will raise all drone_utility, ssh and drone_manager exceptions.
- drone_exceptions = []
- for drone, thread in self.drone_threads.iteritems():
- tname = thread.getName()
- logging.info('(Task Queue) Waiting for %s', tname)
- # Note that this is only the incremental overhead of drone refresh.
- with stats.Timer('%s.%s' % (self.name, tname.replace('.', '_'))):
- thread.join()
- if thread.err:
- drone_exceptions.append((drone, thread.err))
- logging.info('(Task Queue) All threads have returned, clearing map.')
- self.drone_threads = {}
- if not drone_exceptions:
- return
- exception_msg = ''
- for drone, err in drone_exceptions:
- exception_msg += ('Drone %s raised Exception %s\n' %
- (drone.hostname, err))
- raise DroneTaskQueueException(exception_msg)
-
-
- def get_results(self):
- """Get a results dictionary keyed on the drones.
-
- This method synchronously waits till all drone threads have returned
- before checking for results. It is meant to be invoked in conjunction
- with the 'execute' method, which creates a thread per drone.
-
- @return: A dictionary of return values from the drones.
- """
- self.wait_on_drones()
- self.timer.stop()
- results = {}
- while not self.results_queue.empty():
- drone_results = self.results_queue.get()
- if drone_results.drone in results:
- raise DroneTaskQueueException(
- 'Task queue has recorded results for drone %s: %s' %
- (drone_results.drone, results))
- results[drone_results.drone] = drone_results.results
- return results
-
-
- def execute(self, drones, wait=True):
- """Invoke a thread per drone, to execute drone_utility in parallel.
-
- @param drones: A list of drones with calls to execute.
- @param wait: If True, this method will only return when all the drones
- have returned the result of their respective invocations of
- drone_utility. The results_queue and drone_threads will be cleared.
- If False, the caller must clear both the queue and the map before
- the next invocation of 'execute', by calling 'get_results'.
-
- @return: A dictionary keyed on the drones, containing a list of return
- values from the execution of drone_utility.
-
- @raises DroneManagerError: If the results queue or drone map isn't empty
- at the time of invocation.
- """
- if not self.results_queue.empty():
- raise DroneTaskQueueException(
- 'Cannot clobber results queue: %s, it should be cleared '
- 'through get_results.' % self.results_queue)
- if self.drone_threads:
- raise DroneTaskQueueException(
- 'Cannot clobber thread map: %s, it should be cleared '
- 'through wait_on_drones' % self.drone_threads)
- self.timer.start()
- for drone in drones:
- if not drone.get_calls():
- continue
- worker_thread = ExceptionRememberingThread(
- target=ThreadedTaskQueue.worker,
- args=(drone, self.results_queue))
- # None of these threads are allowed to survive past the tick they
- # were spawned in, and the scheduler won't die mid-tick, so none
- # of the threads need to be daemons. However, if the scheduler does
- # die unexpectedly we can just forsake the daemon threads.
- self.drone_threads[drone] = worker_thread
- # The name is only used for debugging
- worker_thread.setName('Worker-%s' % drone.hostname)
- worker_thread.daemon = True
- worker_thread.start()
- return self.get_results() if wait else None
-
-
diff --git a/scheduler/thread_lib_unittest.py b/scheduler/thread_lib_unittest.py
deleted file mode 100644
index 66b59f0..0000000
--- a/scheduler/thread_lib_unittest.py
+++ /dev/null
@@ -1,226 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Tests for the drone managers thread queue."""
-
-import cPickle
-import logging
-import Queue
-
-import common
-from autotest_lib.client.common_lib import utils
-from autotest_lib.client.common_lib.test_utils import mock, unittest
-from autotest_lib.scheduler import drones
-from autotest_lib.scheduler import thread_lib
-from autotest_lib.server.hosts import ssh_host
-
-
-class DroneThreadLibTest(unittest.TestCase):
- """Threaded task queue drone library tests."""
-
- def create_remote_drone(self, hostname):
- """Create and initialize a Remote Drone.
-
- @param hostname: The name of the host for the remote drone.
-
- @return: A remote drone instance.
- """
- drones.drone_utility.create_host.expect_call(hostname).and_return(
- self._mock_host)
- self._mock_host.is_up.expect_call().and_return(True)
- return drones._RemoteDrone(hostname)
-
-
- def setUp(self):
- self.god = mock.mock_god()
- self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
- 'mock SSHHost')
- self.god.stub_function(drones.drone_utility, 'create_host')
- self.drone_utility_path = 'mock-drone-utility-path'
- self.mock_return = {'results': ['mock results'],
- 'warnings': []}
- self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
- self.drone_utility_path)
-
-
- def tearDown(self):
- self.god.unstub_all()
-
-
- def test_worker(self):
- """Test the worker method of a ThreadedTaskQueue."""
- # Invoke the worker method with a drone that has a queued call and check
- # that the drones host.run method is invoked for the call, and the
- # results queue contains the expected results.
- drone = self.create_remote_drone('fakehostname')
- task_queue = thread_lib.ThreadedTaskQueue()
-
- drone.queue_call('foo')
- mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
- self._mock_host.run.expect_call(
- 'python %s' % self.drone_utility_path,
- stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
- connect_timeout=mock.is_instance_comparator(int)).and_return(
- mock_result)
- task_queue.worker(drone, task_queue.results_queue)
- result = task_queue.results_queue.get()
-
- self.assertTrue(task_queue.results_queue.empty() and
- result.drone == drone and
- result.results == self.mock_return['results'])
- self.god.check_playback()
-
-
- def test_wait_on_drones(self):
- """Test waiting on drone threads."""
-
- def waiting_func(queue):
- while len(queue.queue) < 2:
- continue
- logging.warning('Consuming thread finished.')
- queue.put(3)
-
- def exception_func(queue):
- while queue.empty():
- continue
- queue.put(2)
- logging.warning('Failing thread raising error.')
- raise ValueError('Value error')
-
- def quick_func():
- return
-
- # Create 2 threads, one of which raises an exception while the other
- # just exits normally. Insert both threads into the thread_queue against
- # mock drones and confirm that:
- # a. The task queue waits for both threads, though the first one fails.
- # b. The task queue records the right DroneTaskQueueException, which
- # contains the original exception.
- # c. The failing thread records its own exception instead of raising it.
- task_queue = thread_lib.ThreadedTaskQueue()
- drone1 = self.create_remote_drone('fakehostname1')
- drone2 = self.create_remote_drone('fakehostname2')
- sync_queue = Queue.Queue()
-
- waiting_worker = thread_lib.ExceptionRememberingThread(
- target=waiting_func, args=(sync_queue,))
- failing_worker = thread_lib.ExceptionRememberingThread(
- target=exception_func, args=(sync_queue,))
- task_queue.drone_threads[drone1] = waiting_worker
- task_queue.drone_threads[drone2] = failing_worker
- master_thread = thread_lib.ExceptionRememberingThread(
- target=task_queue.wait_on_drones)
-
- thread_list = [failing_worker, waiting_worker, master_thread]
- for thread in thread_list:
- thread.setDaemon(True)
- thread.start()
- sync_queue.put(1)
- master_thread.join()
-
- self.assertTrue(isinstance(master_thread.err,
- thread_lib.DroneTaskQueueException))
- self.assertTrue(isinstance(failing_worker.err, ValueError))
- self.assertTrue(str(failing_worker.err) in str(master_thread.err))
- self.assertTrue(3 in list(sync_queue.queue))
- self.assertTrue(task_queue.drone_threads == {})
-
- # Call wait_on_drones after the child thread has exited.
- quick_worker = thread_lib.ExceptionRememberingThread(target=quick_func)
- task_queue.drone_threads[drone1] = quick_worker
- quick_worker.start()
- while quick_worker.isAlive():
- continue
- task_queue.wait_on_drones()
- self.assertTrue(task_queue.drone_threads == {})
-
-
- def test_get_results(self):
- """Test retrieving results from the results queue."""
-
- # Insert results for the same drone twice into the results queue
- # and confirm that an exception is raised.
- task_queue = thread_lib.ThreadedTaskQueue()
- drone1 = self.create_remote_drone('fakehostname1')
- drone2 = self.create_remote_drone('fakehostname2')
- task_queue.results_queue.put(
- thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
- task_queue.results_queue.put(
- thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
- self.god.stub_function(task_queue, 'wait_on_drones')
- task_queue.wait_on_drones.expect_call()
- self.assertRaises(
- thread_lib.DroneTaskQueueException, task_queue.get_results)
-
- # Insert results for different drones and check that they're returned
- # in a drone results dict.
- self.assertTrue(task_queue.results_queue.empty())
- task_queue.results_queue.put(
- thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
- task_queue.results_queue.put(
- thread_lib.ThreadedTaskQueue.result(drone2, self.mock_return))
- task_queue.wait_on_drones.expect_call()
- results = task_queue.get_results()
- self.assertTrue(results[drone1] == self.mock_return and
- results[drone2] == self.mock_return)
- self.god.check_playback()
-
-
- def test_execute(self):
- """Test task queue execute."""
- drone1 = self.create_remote_drone('fakehostname1')
- drone2 = self.create_remote_drone('fakehostname2')
- drone3 = self.create_remote_drone('fakehostname3')
-
- # Check task queue exception conditions.
- task_queue = thread_lib.ThreadedTaskQueue()
- task_queue.results_queue.put(1)
- self.assertRaises(thread_lib.DroneTaskQueueException,
- task_queue.execute, [])
- task_queue.results_queue.get()
- task_queue.drone_threads[drone1] = None
- self.assertRaises(thread_lib.DroneTaskQueueException,
- task_queue.execute, [])
- task_queue.drone_threads = {}
-
- # Queue 2 calls against each drone, and confirm that the host's
- # run method is called 3 times. Then check the threads created,
- # and finally compare results returned by the task queue against
- # the mock results.
- drones = [drone1, drone2, drone3]
- for drone in drones:
- drone.queue_call('foo')
- drone.queue_call('bar')
- mock_result = utils.CmdResult(
- stdout=cPickle.dumps(self.mock_return))
- self._mock_host.run.expect_call(
- 'python %s' % self.drone_utility_path,
- stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
- connect_timeout=mock.is_instance_comparator(int)
- ).and_return(mock_result)
- task_queue.execute(drones, wait=False)
- self.assertTrue(set(task_queue.drone_threads.keys()) == set(drones))
- for drone, thread in task_queue.drone_threads.iteritems():
- self.assertTrue(drone.hostname in thread.getName())
- self.assertTrue(thread.isDaemon())
- self.assertRaises(RuntimeError, thread.start)
- results = task_queue.get_results()
- for drone, result in results.iteritems():
- self.assertTrue(result == self.mock_return['results'])
-
- # Test synchronous execute
- drone1.queue_call('foo')
- mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
- self._mock_host.run.expect_call(
- 'python %s' % self.drone_utility_path,
- stdin=cPickle.dumps(drone1.get_calls()), stdout_tee=None,
- connect_timeout=mock.is_instance_comparator(int)).and_return(
- mock_result)
- self.assertTrue(task_queue.execute(drones, wait=True)[drone1] ==
- self.mock_return['results'])
- self.god.check_playback()
-
-
-
-