Revert "[autotest] Threaded asynchronous task execution on drones." 

Reverting due to problems with the retry decorator and localhost threads.

This reverts commit 0933899b0dd1320e90e06025cced8096aed44908.

Change-Id: I99318b4bdf4c11e9c4e5181c4ff5b1bdcdcbb89c
Reviewed-on: https://chromium-review.googlesource.com/207038
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
index 23fcff2..cc9f44e 100644
--- a/scheduler/drone_manager.py
+++ b/scheduler/drone_manager.py
@@ -1,18 +1,10 @@
-import collections
-import heapq
-import os
-import Queue
-import time
-import threading
-import traceback
-import logging
+import os, time, heapq, traceback, logging
 
 import common
 from autotest_lib.client.common_lib import error, global_config, utils
 from autotest_lib.client.common_lib.cros.graphite import stats
 from autotest_lib.scheduler import email_manager, drone_utility, drones
 from autotest_lib.scheduler import scheduler_config
-from autotest_lib.scheduler import thread_lib
 
 
 # results on drones will be placed under the drone_installation_directory in a
@@ -152,8 +144,7 @@
     # Minimum time to wait before next email
     # about a drone hitting process limit is sent.
     NOTIFY_INTERVAL = 60 * 60 * 24 # one day
-    _STATS_KEY = 'drone_manager'
-    _timer = stats.Timer(_STATS_KEY)
+    _timer = stats.Timer('drone_manager')
 
 
     def __init__(self):
@@ -181,9 +172,6 @@
         # map drone hostname to time stamp of email that
         # has been sent about the drone hitting process limit.
         self._notify_record = {}
-        # A threaded task queue used to refresh drones asynchronously.
-        self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
-                name='%s.refresh_queue' % self._STATS_KEY)
 
 
     def initialize(self, base_results_dir, drone_hostnames,
@@ -314,12 +302,6 @@
 
 
     def _parse_pidfile(self, drone, raw_contents):
-        """Parse raw pidfile contents.
-
-        @param drone: The drone on which this pidfile was found.
-        @param raw_contents: The raw contents of a pidfile, e.g.:
-            "pid\nexit_status\nnum_tests_failed\n".
-        """
         contents = PidfileContents()
         if not raw_contents:
             return contents
@@ -408,66 +390,19 @@
                 self._notify_record[drone.hostname] = now
 
 
-    def trigger_refresh(self):
-        """Triggers a drone manager refresh.
-
-        @raises DroneManagerError: If a drone has un-executed calls, since
-            they would get clobbered when we queue refresh calls.
+    def refresh(self):
+        """
+        Called at the beginning of a scheduler cycle to refresh all process
+        information.
         """
         self._reset()
         self._drop_old_pidfiles()
         pidfile_paths = [pidfile_id.path
                          for pidfile_id in self._registered_pidfile_info]
-        drones = list(self.get_drones())
-        for drone in drones:
-            calls = drone.get_calls()
-            if calls:
-                raise DroneManagerError('Drone %s has un-executed calls: %s '
-                                        'which might get corrupted through '
-                                        'this invocation' %
-                                        (drone, [str(call) for call in calls]))
-            drone.queue_call('refresh', pidfile_paths)
-        logging.info("Invoking drone refresh.")
-        with self._timer.get_client('trigger_refresh'):
-            self._refresh_task_queue.execute(drones, wait=False)
+        with self._timer.get_client('refresh'):
+            all_results = self._call_all_drones('refresh', pidfile_paths)
+        logging.info("Drones refreshed")
 
-
-    def sync_refresh(self):
-        """Complete the drone refresh started by trigger_refresh.
-
-        Waits for all drone threads then refreshes internal datastructures
-        with drone process information.
-        """
-
-        # This gives us a dictionary like what follows:
-        # {drone: [{'pidfiles': (raw contents of pidfile paths),
-        #           'autoserv_processes': (autoserv process info from ps),
-        #           'all_processes': (all process info from ps),
-        #           'parse_processes': (parse process info from ps),
-        #           'pidfiles_second_read': (pidfile contents, again),}]
-        #   drone2: ...}
-        # The values of each drone are only a list because this adheres to the
-        # drone utility interface (each call is executed and its results are
-        # placed in a list, but since we never couple the refresh calls with
-        # any other call, this list will always contain a single dict).
-        with self._timer.get_client('sync_refresh'):
-            all_results = self._refresh_task_queue.get_results()
-        logging.info("Drones refreshed.")
-
-        # The loop below goes through and parses pidfile contents. Pidfiles
-        # are used to track autoserv execution, and will contain at most three
-        # lines: pid, exit status, number of tests failed. Each pidfile
-        # is identified by a PidfileId object, which contains a unique pidfile
-        # path (unique because it contains the job id) making it hashable.
-        # All pidfiles are stored in the drone manager's _pidfiles dict as:
-        #   {pidfile_id: pidfile_contents(Process(drone, pid),
-        #                                 exit_code, num_tests_failed)}
-        # In _handle_agents, each agent knows its pidfile_id, and uses this
-        # to retrieve the refreshed contents of its pidfile via the
-        # PidfileRunMonitor (through its tick) before making decisions. If
-        # the agent notices that its process has exited, it unregisters the
-        # pidfile from the drone manager's _registered_pidfile_info dict
-        # through its epilog.
         for drone, results_list in all_results.iteritems():
             results = results_list[0]
             drone_hostname = drone.hostname.replace('.', '_')
@@ -495,23 +430,13 @@
                 self._check_drone_process_limit(drone)
 
 
-    def refresh(self):
-        """Refresh all drones."""
-        with self._timer.get_client('refresh'):
-            self.trigger_refresh()
-            self.sync_refresh()
-
-
     def execute_actions(self):
         """
         Called at the end of a scheduler cycle to execute all queued actions
         on drones.
         """
-        # Invoke calls queued on all drones since the last call to execute
-        # and wait for them to return.
-        thread_lib.ThreadedTaskQueue(
-                name='%s.execute_queue' % self._STATS_KEY).execute(
-                        self._drones.values())
+        for drone in self._drones.values():
+            drone.execute_queued_calls()
 
         try:
             self._results_drone.execute_queued_calls()
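
As background for the hunk above: the removed sync_refresh() comments describe
pidfiles as at most three integer lines (pid, exit status, number of failed
tests), stored per PidfileId as PidfileContents(Process(drone, pid), ...). A
minimal, self-contained sketch of that parsing, using simplified stand-in
namedtuples rather than the scheduler's real classes:

    import collections

    Process = collections.namedtuple('Process', ['hostname', 'pid'])
    PidfileContents = collections.namedtuple(
            'PidfileContents', ['process', 'exit_status', 'num_tests_failed'])

    def parse_pidfile(hostname, raw_contents):
        # A pidfile holds up to three integer lines: pid, exit status and the
        # number of failed tests; lines that are not written yet stay None.
        lines = raw_contents.splitlines() if raw_contents else []
        values = [int(line) for line in lines[:3]]
        values += [None] * (3 - len(values))
        pid, exit_status, num_tests_failed = values
        return PidfileContents(
                Process(hostname, pid), exit_status, num_tests_failed)

    # The pidfile used by the removed sync_refresh unit test parses to
    # pid=123, exit_status=12, num_tests_failed=0:
    print(parse_pidfile('fakehostname1', '123\n12\n0\n'))
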
diff --git a/scheduler/drone_manager_unittest.py b/scheduler/drone_manager_unittest.py
index 034cd38..d2ee2a2 100755
--- a/scheduler/drone_manager_unittest.py
+++ b/scheduler/drone_manager_unittest.py
@@ -1,17 +1,11 @@
 #!/usr/bin/python
 
-import cPickle
 import os, unittest
 import common
 from autotest_lib.client.common_lib import global_config
-from autotest_lib.client.common_lib import utils
 from autotest_lib.client.common_lib.test_utils import mock
 from autotest_lib.scheduler import drone_manager, drone_utility, drones
 from autotest_lib.scheduler import scheduler_config, site_drone_manager
-from autotest_lib.scheduler import thread_lib
-from autotest_lib.scheduler import pidfile_monitor
-from autotest_lib.server.hosts import ssh_host
-
 
 class MockDrone(drones._AbstractDrone):
     def __init__(self, name, active_processes=0, max_processes=10,
@@ -337,140 +331,5 @@
         self.assertFalse(self.manager._registered_pidfile_info)
 
 
-class AsyncDroneManager(unittest.TestCase):
-    _DRONE_INSTALL_DIR = '/drone/install/dir'
-    _RESULTS_DIR = '/results/dir'
-
-
-    def create_remote_drone(self, hostname, mock_host):
-        """Create and initialize a Remote Drone.
-
-        @return: A remote drone instance.
-        """
-        drones.drone_utility.create_host.expect_call(hostname).and_return(
-                mock_host)
-        mock_host.is_up.expect_call().and_return(True)
-        return drones._RemoteDrone(hostname)
-
-
-    def create_fake_pidfile_info(self, tag='tag', name='name'):
-        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
-        self.manager.register_pidfile(pidfile_id)
-        return self.manager._registered_pidfile_info
-
-
-    def setUp(self):
-        self.god = mock.mock_god()
-        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
-                           self._DRONE_INSTALL_DIR)
-        self.manager = drone_manager.DroneManager()
-        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
-
-        # we don't want this to ever actually get called
-        self.god.stub_function(drones, 'get_drone')
-        # we don't want the DroneManager to go messing with global config
-        def do_nothing():
-            pass
-        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
-
-        # set up some dummy drones with different mock hosts.
-        mock_host1 = self.god.create_mock_class(ssh_host.SSHHost,
-                                                'mock SSHHost 1')
-        mock_host2 = self.god.create_mock_class(ssh_host.SSHHost,
-                                                'mock SSHHost 2')
-        self.god.stub_function(drones.drone_utility, 'create_host')
-        self.mock_drone_1 = self.create_remote_drone(
-                'fakehostname1', mock_host1)
-        self.mock_drone_2 = self.create_remote_drone(
-                'fakehostname2', mock_host2)
-        for mock_drone in [self.mock_drone_1, self.mock_drone_2]:
-            self.manager._drones[mock_drone.hostname] = mock_drone
-
-        self.results_drone = MockDrone('results_drone', 0, 10)
-        self.manager._results_drone = self.results_drone
-        self.drone_utility_path = 'mock-drone-utility-path'
-        self.mock_return = {'results': ['mock results'],
-                            'warnings': []}
-
-
-    def tearDown(self):
-        self.god.unstub_all()
-
-    def test_trigger_refresh(self):
-        """Test drone manager trigger refresh."""
-        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
-                           self.drone_utility_path)
-
-        # Create some fake pidfiles and confirm that a refresh call is
-        # executed on each drone host, with the same pidfile paths. Then
-        # check that each drone gets a key in the returned results dictionary.
-        for i in range(0, 1):
-            pidfile_info = self.create_fake_pidfile_info(
-                    'tag%s' % i, 'name%s' %i)
-        pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
-        refresh_call = drone_utility.call('refresh', pidfile_paths)
-        expected_results = {}
-        for drone in self.manager.get_drones():
-            mock_result = utils.CmdResult(
-                    stdout=cPickle.dumps(self.mock_return))
-            drone._host.run.expect_call(
-                    'python %s' % self.drone_utility_path,
-                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
-                    connect_timeout=mock.is_instance_comparator(int)
-                ).and_return(mock_result)
-            expected_results[drone] = self.mock_return['results']
-
-        self.manager.trigger_refresh()
-        self.assertTrue(self.manager._refresh_task_queue.get_results() ==
-                        expected_results)
-        self.god.check_playback()
-
-
-    def test_sync_refresh(self):
-        """Test drone manager sync refresh."""
-
-        # Insert some drone_utility results into the results queue, then
-        # check that get_results returns it in the right format, and that
-        # the rest of sync_refresh populates the right datastructures for
-        # correct handling of agents. Also confirm that this method of
-        # syncing is sufficient for the monitor to pick up the exit status
-        # of the process in the same way it would in handle_agents.
-        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
-        pidfiles = {pidfile_path: '123\n12\n0\n'}
-        drone_utility_results = {
-                'pidfiles': pidfiles,
-                'autoserv_processes':{},
-                'all_processes':{},
-                'parse_processes':{},
-                'pidfiles_second_read':pidfiles,
-        }
-        # Our manager instance isn't the drone manager singleton that the
-        # pidfile_monitor will use by default, because setUp doesn't call
-        # drone_manager.instance().
-        self.god.stub_with(pidfile_monitor, '_drone_manager', self.manager)
-        monitor = pidfile_monitor.PidfileRunMonitor()
-        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
-        self.manager.register_pidfile(monitor.pidfile_id)
-        self.assertTrue(monitor._state.exit_status == None)
-
-        self.manager._refresh_task_queue.results_queue.put(
-                thread_lib.ThreadedTaskQueue.result(
-                    self.mock_drone_1, [drone_utility_results]))
-        self.manager.sync_refresh()
-        pidfiles = self.manager._pidfiles
-        pidfile_id = pidfiles.keys()[0]
-        pidfile_contents = pidfiles[pidfile_id]
-
-        self.assertTrue(
-                pidfile_id.path == pidfile_path and
-                pidfile_contents.process.pid == 123 and
-                pidfile_contents.process.hostname ==
-                        self.mock_drone_1.hostname and
-                pidfile_contents.exit_status == 12 and
-                pidfile_contents.num_tests_failed == 0)
-        self.assertTrue(monitor.exit_code() == 12)
-        self.god.check_playback()
-
-
 if __name__ == '__main__':
     unittest.main()
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 30cd336..ab1ce44 100755
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -1,15 +1,4 @@
 #!/usr/bin/python
-#pylint: disable-msg=C0111
-
-"""Utility module that executes management commands on the drone.
-
-1. This is the module responsible for orchestrating processes on a drone.
-2. It receives instructions via stdin and replies via stdout.
-3. Each invocation is responsible for the initiation of a set of batched calls.
-4. The batched calls may be synchronous or asynchronous.
-5. The caller is responsible for monitoring asynchronous calls through pidfiles.
-"""
-
 
 import pickle, subprocess, os, shutil, sys, time, signal, getpass
 import datetime, traceback, tempfile, itertools, logging
@@ -111,17 +100,9 @@
     @classmethod
     @timer.decorate
     def _get_process_info(cls):
-        """Parse ps output for all process information.
-
+        """
         @returns A generator of dicts with cls._PS_ARGS as keys and
-            string values each representing a running process. eg:
-            {
-                'comm': command_name,
-                'pgid': process group id,
-                'ppid': parent process id,
-                'pid': process id,
-                'args': args the command was invoked with,
-            }
+                string values each representing a running process.
         """
         @retry.retry(subprocess.CalledProcessError,
                      timeout_min=0.5, delay_sec=0.25)
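
The _get_process_info() docstring above describes each running process as a
dict keyed by cls._PS_ARGS. A rough, hypothetical sketch of building such
dicts from ps output (simplified: the column list here is a guess, and the
real method wraps its ps invocation in the retry.retry decorator shown above
to absorb transient CalledProcessError failures):

    import subprocess

    # Hypothetical column list; the real keys come from cls._PS_ARGS.
    PS_ARGS = ('pid', 'pgid', 'ppid', 'comm', 'args')

    def get_process_info():
        output = subprocess.check_output(['ps', 'x', '-o', ','.join(PS_ARGS)])
        for line in output.splitlines()[1:]:        # skip the header row
            # 'args' is last and may contain spaces, so cap the split count.
            fields = line.split(None, len(PS_ARGS) - 1)
            yield dict(zip(PS_ARGS, fields))

    for process in get_process_info():
        print(process)
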
@@ -138,16 +119,6 @@
 
     def _refresh_processes(self, command_name, open=open,
                            site_check_parse=None):
-        """Refreshes process info for the given command_name.
-
-        Examines ps output as returned by get_process_info and returns
-        the process dicts for processes matching the given command name.
-
-        @param command_name: The name of the command, eg 'autoserv'.
-
-        @return: A list of process info dictionaries as returned by
-            _get_process_info.
-        """
         # The open argument is used for test injection.
         check_mark = global_config.global_config.get_config_value(
             'SCHEDULER', 'check_processes_for_dark_mark', bool, False)
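
The module docstring removed above describes drone_utility's calling
convention: a caller pickles a batch of calls onto the stdin of
'python drone_utility.py' and reads a pickled {'results': [...],
'warnings': [...]} reply from stdout (the drones_unittest.py expectations
below exercise exactly this). A minimal local sketch of that round trip; the
path is hypothetical, and the real _RemoteDrone issues the same command over
an SSHHost rather than a local subprocess:

    import cPickle
    import subprocess

    # Hypothetical location; the real value is the drone's _drone_utility_path.
    DRONE_UTILITY_PATH = '/usr/local/autotest/scheduler/drone_utility.py'

    def run_calls(calls):
        # Pickle the batched calls onto stdin and unpickle the reply, expected
        # to be a dict of the form {'results': [...], 'warnings': [...]}.
        proc = subprocess.Popen(['python', DRONE_UTILITY_PATH],
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        stdout, _ = proc.communicate(cPickle.dumps(calls))
        return cPickle.loads(stdout)

    # e.g. the same 'refresh' call the drone manager queues against each drone:
    # run_calls([drone_utility.call('refresh', pidfile_paths)])
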
diff --git a/scheduler/drones.py b/scheduler/drones.py
index c52c3dc..605d250 100644
--- a/scheduler/drones.py
+++ b/scheduler/drones.py
@@ -1,5 +1,3 @@
-#pylint: disable-msg=C0111
-
 import cPickle, os, tempfile, logging
 import common
 from autotest_lib.scheduler import drone_utility, email_manager
@@ -72,14 +70,6 @@
         return return_message['results']
 
 
-    def get_calls(self):
-        """Returns the calls queued against this drone.
-
-        @return: A list of calls queued against the drone.
-        """
-        return self._calls
-
-
     def call(self, method, *args, **kwargs):
         return self._execute_calls(
             [drone_utility.call(method, *args, **kwargs)])
@@ -95,9 +85,8 @@
     def execute_queued_calls(self):
         if not self._calls:
             return
-        results = self._execute_calls(self._calls)
+        self._execute_calls(self._calls)
         self.clear_call_queue()
-        return results
 
 
     def set_autotest_install_dir(self, path):
diff --git a/scheduler/drones_unittest.py b/scheduler/drones_unittest.py
index a3c3953..d395288 100755
--- a/scheduler/drones_unittest.py
+++ b/scheduler/drones_unittest.py
@@ -1,5 +1,4 @@
 #!/usr/bin/python
-#pylint: disable-msg=C0111
 
 """Tests for autotest_lib.scheduler.drones."""
 
@@ -13,13 +12,11 @@
 
 
 class RemoteDroneTest(unittest.TestCase):
-
     def setUp(self):
         self.god = mock.mock_god()
         self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
                                                      'mock SSHHost')
         self.god.stub_function(drones.drone_utility, 'create_host')
-        self.drone_utility_path = 'mock-drone-utility-path'
 
 
     def tearDown(self):
@@ -36,43 +33,22 @@
 
     def test_execute_calls_impl(self):
         self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
-                           self.drone_utility_path)
+                           'mock-drone-utility-path')
         drones.drone_utility.create_host.expect_call('fakehost').and_return(
                 self._mock_host)
         self._mock_host.is_up.expect_call().and_return(True)
         mock_calls = ('foo',)
         mock_result = utils.CmdResult(stdout=cPickle.dumps('mock return'))
         self._mock_host.run.expect_call(
-                'python %s' % self.drone_utility_path,
+                'python mock-drone-utility-path',
                 stdin=cPickle.dumps(mock_calls), stdout_tee=None,
                 connect_timeout=mock.is_instance_comparator(int)).and_return(
                         mock_result)
+
         drone = drones._RemoteDrone('fakehost')
         self.assertEqual('mock return', drone._execute_calls_impl(mock_calls))
         self.god.check_playback()
 
 
-    def test_execute_queued_calls(self):
-        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
-                           self.drone_utility_path)
-        drones.drone_utility.create_host.expect_call('fakehost').and_return(
-                self._mock_host)
-        self._mock_host.is_up.expect_call().and_return(True)
-        drone = drones._RemoteDrone('fakehost')
-        mock_return={}
-        mock_return['results'] = ['mock return']
-        mock_return['warnings'] = []
-        drone.queue_call('foo')
-        mock_result = utils.CmdResult(stdout=cPickle.dumps(mock_return))
-        self._mock_host.run.expect_call(
-                'python %s' % self.drone_utility_path,
-                stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
-                connect_timeout=mock.is_instance_comparator(int)).and_return(
-                        mock_result)
-        self.assertEqual(mock_return['results'], drone.execute_queued_calls())
-        self.god.check_playback()
-
-
-
 if __name__ == '__main__':
     unittest.main()
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index ddcb028..4860859 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -299,8 +299,8 @@
         timer = stats.Timer('scheduler.tick')
         self._log_tick_msg('Calling new tick, starting garbage collection().')
         self._garbage_collection()
-        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
-        _drone_manager.trigger_refresh()
+        self._log_tick_msg('Calling _drone_manager.refresh().')
+        _drone_manager.refresh()
         self._log_tick_msg('Calling _run_cleanup().')
         self._run_cleanup()
         self._log_tick_msg('Calling _find_aborting().')
@@ -317,8 +317,6 @@
         self._schedule_special_tasks()
         self._log_tick_msg('Calling _schedule_new_jobs().')
         self._schedule_new_jobs()
-        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
-        _drone_manager.sync_refresh()
         self._log_tick_msg('Calling _handle_agents().')
         self._handle_agents()
         self._log_tick_msg('Calling _host_scheduler.tick().')
diff --git a/scheduler/site_drones.py b/scheduler/site_drones.py
index 2022f64..fad9e4a 100644
--- a/scheduler/site_drones.py
+++ b/scheduler/site_drones.py
@@ -40,4 +40,4 @@
         if self._processes_to_kill:
             self.queue_call('kill_processes', self._processes_to_kill)
         self.clear_processes_to_kill()
-        return super(_SiteAbstractDrone, self).execute_queued_calls()
+        super(_SiteAbstractDrone, self).execute_queued_calls()
diff --git a/scheduler/thread_lib.py b/scheduler/thread_lib.py
deleted file mode 100644
index cb9f226..0000000
--- a/scheduler/thread_lib.py
+++ /dev/null
@@ -1,211 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-
-"""Thread library for drone management.
-
-This library contains a threaded task queue capable of starting, monitoring
-and syncing threads across remote and localhost drones asynchronously. It also
-contains a wrapper for standard python threads that records exceptions so they
-can be re-raised in the thread manager. The api exposed by the threaded task
-queue is as follows:
-    1. worker: The staticmethod executed by all worker threads.
-    2. execute: Takes a list of drones and invokes a worker thread per drone.
-        This method assumes that all drones have a queue of pending calls
-        for execution.
-    3. wait_on_drones: Waits for all worker threads started by execute to finish
-        and raises any exceptions as a consolidated DroneTaskQueueException.
-    4. get_results: Returns the results of all threads as a dictionary keyed
-        on the drones.
-"""
-
-import collections
-import datetime as datetime_base
-from datetime import datetime
-import Queue
-import threading
-import logging
-
-import common
-from autotest_lib.client.common_lib.cros.graphite import stats
-from autotest_lib.scheduler import drone_utility
-
-
-class DroneTaskQueueException(Exception):
-    """Generic task queue exception."""
-    pass
-
-
-class ExceptionRememberingThread(threading.Thread):
-    """A wrapper around regular python threads that records exceptions."""
-
-    def run(self):
-        """Wrapper around the thread's run method."""
-        try:
-            super(ExceptionRememberingThread, self).run()
-        except Exception as self.err:
-            logging.error('%s raised an exception that will be re-raised by '
-                          'the thread pool manager.', self.getName())
-        else:
-            self.err = None
-
-
-class PersistentTimer(object):
-    """A class to handle timers across local scopes."""
-
-    def __init__(self, name):
-        """Initialize a persistent timer.
-
-        @param name: The name/key to insert timings under.
-        """
-        self.name = name
-        self.timer = None
-
-
-    def start(self):
-        """Create and start a new timer."""
-        self.timer = stats.Timer(self.name)
-        self.timer.start()
-
-
-    def stop(self):
-        """Stop a previously started timer."""
-        try:
-            self.timer.stop()
-        except (AssertionError, AttributeError) as e:
-            logging.info('Stopping timer %s failed: %s', self.name, e)
-        finally:
-            self.timer = None
-
-
-class ThreadedTaskQueue(object):
-    """Threaded implementation of a drone task queue."""
-
-    result = collections.namedtuple('task', ['drone', 'results'])
-
-    def __init__(self, name='thread_queue'):
-        self.results_queue = Queue.Queue()
-        self.drone_threads = {}
-        self.name = name
-        # The persistent timer is used to measure net time spent
-        # refreshing all drones across 'execute' and 'get_results'.
-        self.timer = PersistentTimer(self.name)
-
-
-    @staticmethod
-    def worker(drone, results_queue):
-        """Worker for task execution.
-
-        Execute calls queued against the given drone and place the return value
-        in results_queue.
-
-        @param drone: A drone with calls to execute.
-        @param results_queue: A queue, into which the worker places
-            ThreadedTaskQueue.result from the drone calls.
-        """
-        logging.info('(Worker-%s) starting.', drone.hostname)
-        results_queue.put(ThreadedTaskQueue.result(
-            drone, drone.execute_queued_calls()))
-        logging.info('(Worker-%s) finished.', drone.hostname)
-
-
-    def wait_on_drones(self):
-        """Wait on all threads that are currently refreshing a drone.
-
-        @raises DroneTaskQueueException: Consolidated exception for all
-            drone thread exceptions.
-        """
-        if not self.drone_threads:
-            return
-        # TODO: Make this process more resilient. We can:
-        # 1. Timeout the join.
-        # 2. Kick out the exception/timeout drone.
-        # 3. Selectively retry exceptions.
-        # For now, it is compliant with the single threaded drone manager which
-        # will raise all drone_utility, ssh and drone_manager exceptions.
-        drone_exceptions = []
-        for drone, thread in self.drone_threads.iteritems():
-            tname = thread.getName()
-            logging.info('(Task Queue) Waiting for %s', tname)
-            # Note that this is only the incremental overhead of drone refresh.
-            with stats.Timer('%s.%s' % (self.name, tname.replace('.', '_'))):
-                thread.join()
-            if thread.err:
-                drone_exceptions.append((drone, thread.err))
-        logging.info('(Task Queue) All threads have returned, clearing map.')
-        self.drone_threads = {}
-        if not drone_exceptions:
-            return
-        exception_msg = ''
-        for drone, err in drone_exceptions:
-            exception_msg += ('Drone %s raised Exception %s\n' %
-                              (drone.hostname, err))
-        raise DroneTaskQueueException(exception_msg)
-
-
-    def get_results(self):
-        """Get a results dictionary keyed on the drones.
-
-        This method synchronously waits till all drone threads have returned
-        before checking for results. It is meant to be invoked in conjunction
-        with the 'execute' method, which creates a thread per drone.
-
-        @return: A dictionary of return values from the drones.
-        """
-        self.wait_on_drones()
-        self.timer.stop()
-        results = {}
-        while not self.results_queue.empty():
-            drone_results = self.results_queue.get()
-            if drone_results.drone in results:
-                raise DroneTaskQueueException(
-                        'Task queue has recorded results for drone %s: %s' %
-                        (drone_results.drone, results))
-            results[drone_results.drone] = drone_results.results
-        return results
-
-
-    def execute(self, drones, wait=True):
-        """Invoke a thread per drone, to execute drone_utility in parallel.
-
-        @param drones: A list of drones with calls to execute.
-        @param wait: If True, this method will only return when all the drones
-            have returned the result of their respective invocations of
-            drone_utility. The results_queue and drone_threads will be cleared.
-            If False, the caller must clear both the queue and the map before
-            the next invocation of 'execute', by calling 'get_results'.
-
-        @return: A dictionary keyed on the drones, containing a list of return
-            values from the execution of drone_utility.
-
-        @raises DroneManagerError: If the results queue or drone map isn't empty
-            at the time of invocation.
-        """
-        if not self.results_queue.empty():
-            raise DroneTaskQueueException(
-                    'Cannot clobber results queue: %s, it should be cleared '
-                    'through get_results.' % self.results_queue)
-        if self.drone_threads:
-            raise DroneTaskQueueException(
-                    'Cannot clobber thread map: %s, it should be cleared '
-                    'through wait_on_drones' % self.drone_threads)
-        self.timer.start()
-        for drone in drones:
-            if not drone.get_calls():
-                continue
-            worker_thread = ExceptionRememberingThread(
-                    target=ThreadedTaskQueue.worker,
-                    args=(drone, self.results_queue))
-            # None of these threads are allowed to survive past the tick they
-            # were spawned in, and the scheduler won't die mid-tick, so none
-            # of the threads need to be daemons. However, if the scheduler does
-            # die unexpectedly we can just forsake the daemon threads.
-            self.drone_threads[drone] = worker_thread
-            # The name is only used for debugging
-            worker_thread.setName('Worker-%s' % drone.hostname)
-            worker_thread.daemon = True
-            worker_thread.start()
-        return self.get_results() if wait else None
-
-
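
The deleted module above fanned one worker thread out per drone and collected
results keyed on the drone object. A self-contained sketch of that
worker/collect pattern, with a fake drone standing in for the real
_RemoteDrone (the real ThreadedTaskQueue also records per-thread exceptions,
times each join, and re-raises failures as a consolidated
DroneTaskQueueException):

    import Queue
    import threading

    class FakeDrone(object):
        # Stand-in for a drone that already has calls queued against it.
        def __init__(self, hostname):
            self.hostname = hostname

        def execute_queued_calls(self):
            return ['mock results for %s' % self.hostname]

    def worker(drone, results_queue):
        # Mirrors ThreadedTaskQueue.worker: run the drone's queued calls and
        # hand (drone, results) back to the collecting thread.
        results_queue.put((drone, drone.execute_queued_calls()))

    drones = [FakeDrone('drone1'), FakeDrone('drone2')]
    results_queue = Queue.Queue()
    threads = [threading.Thread(target=worker, args=(drone, results_queue))
               for drone in drones]
    for thread in threads:
        thread.start()                  # 'execute': one thread per drone
    for thread in threads:
        thread.join()                   # 'wait_on_drones'
    results = dict(results_queue.get() for _ in drones)   # 'get_results'
    print(results)
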
diff --git a/scheduler/thread_lib_unittest.py b/scheduler/thread_lib_unittest.py
deleted file mode 100644
index 66b59f0..0000000
--- a/scheduler/thread_lib_unittest.py
+++ /dev/null
@@ -1,226 +0,0 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Tests for the drone managers thread queue."""
-
-import cPickle
-import logging
-import Queue
-
-import common
-from autotest_lib.client.common_lib import utils
-from autotest_lib.client.common_lib.test_utils import mock, unittest
-from autotest_lib.scheduler import drones
-from autotest_lib.scheduler import thread_lib
-from autotest_lib.server.hosts import ssh_host
-
-
-class DroneThreadLibTest(unittest.TestCase):
-    """Threaded task queue drone library tests."""
-
-    def create_remote_drone(self, hostname):
-        """Create and initialize a Remote Drone.
-
-        @param hostname: The name of the host for the remote drone.
-
-        @return: A remote drone instance.
-        """
-        drones.drone_utility.create_host.expect_call(hostname).and_return(
-                self._mock_host)
-        self._mock_host.is_up.expect_call().and_return(True)
-        return drones._RemoteDrone(hostname)
-
-
-    def setUp(self):
-        self.god = mock.mock_god()
-        self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
-                                                     'mock SSHHost')
-        self.god.stub_function(drones.drone_utility, 'create_host')
-        self.drone_utility_path = 'mock-drone-utility-path'
-        self.mock_return = {'results': ['mock results'],
-                            'warnings': []}
-        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
-                self.drone_utility_path)
-
-
-    def tearDown(self):
-        self.god.unstub_all()
-
-
-    def test_worker(self):
-        """Test the worker method of a ThreadedTaskQueue."""
-        # Invoke the worker method with a drone that has a queued call and check
-        # that the drones host.run method is invoked for the call, and the
-        # results queue contains the expected results.
-        drone = self.create_remote_drone('fakehostname')
-        task_queue = thread_lib.ThreadedTaskQueue()
-
-        drone.queue_call('foo')
-        mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
-        self._mock_host.run.expect_call(
-                'python %s' % self.drone_utility_path,
-                stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
-                connect_timeout=mock.is_instance_comparator(int)).and_return(
-                        mock_result)
-        task_queue.worker(drone, task_queue.results_queue)
-        result = task_queue.results_queue.get()
-
-        self.assertTrue(task_queue.results_queue.empty() and
-                        result.drone == drone and
-                        result.results == self.mock_return['results'])
-        self.god.check_playback()
-
-
-    def test_wait_on_drones(self):
-        """Test waiting on drone threads."""
-
-        def waiting_func(queue):
-            while len(queue.queue) < 2:
-                continue
-            logging.warning('Consuming thread finished.')
-            queue.put(3)
-
-        def exception_func(queue):
-            while queue.empty():
-                continue
-            queue.put(2)
-            logging.warning('Failing thread raising error.')
-            raise ValueError('Value error')
-
-        def quick_func():
-            return
-
-        # Create 2 threads, one of which raises an exception while the other
-        # just exits normally. Insert both threads into the thread_queue against
-        # mock drones and confirm that:
-        # a. The task queue waits for both threads, though the first one fails.
-        # b. The task queue records the right DroneTaskQueueException, which
-        #       contains the original exception.
-        # c. The failing thread records its own exception instead of raising it.
-        task_queue = thread_lib.ThreadedTaskQueue()
-        drone1 = self.create_remote_drone('fakehostname1')
-        drone2 = self.create_remote_drone('fakehostname2')
-        sync_queue = Queue.Queue()
-
-        waiting_worker = thread_lib.ExceptionRememberingThread(
-                target=waiting_func, args=(sync_queue,))
-        failing_worker = thread_lib.ExceptionRememberingThread(
-                target=exception_func, args=(sync_queue,))
-        task_queue.drone_threads[drone1] = waiting_worker
-        task_queue.drone_threads[drone2] = failing_worker
-        master_thread = thread_lib.ExceptionRememberingThread(
-                target=task_queue.wait_on_drones)
-
-        thread_list = [failing_worker, waiting_worker, master_thread]
-        for thread in thread_list:
-            thread.setDaemon(True)
-            thread.start()
-        sync_queue.put(1)
-        master_thread.join()
-
-        self.assertTrue(isinstance(master_thread.err,
-                                   thread_lib.DroneTaskQueueException))
-        self.assertTrue(isinstance(failing_worker.err, ValueError))
-        self.assertTrue(str(failing_worker.err) in str(master_thread.err))
-        self.assertTrue(3 in list(sync_queue.queue))
-        self.assertTrue(task_queue.drone_threads == {})
-
-        # Call wait_on_drones after the child thread has exited.
-        quick_worker = thread_lib.ExceptionRememberingThread(target=quick_func)
-        task_queue.drone_threads[drone1] = quick_worker
-        quick_worker.start()
-        while quick_worker.isAlive():
-            continue
-        task_queue.wait_on_drones()
-        self.assertTrue(task_queue.drone_threads == {})
-
-
-    def test_get_results(self):
-        """Test retrieving results from the results queue."""
-
-        # Insert results for the same drone twice into the results queue
-        # and confirm that an exception is raised.
-        task_queue = thread_lib.ThreadedTaskQueue()
-        drone1 = self.create_remote_drone('fakehostname1')
-        drone2 = self.create_remote_drone('fakehostname2')
-        task_queue.results_queue.put(
-                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
-        task_queue.results_queue.put(
-                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
-        self.god.stub_function(task_queue, 'wait_on_drones')
-        task_queue.wait_on_drones.expect_call()
-        self.assertRaises(
-                thread_lib.DroneTaskQueueException, task_queue.get_results)
-
-        # Insert results for different drones and check that they're returned
-        # in a drone results dict.
-        self.assertTrue(task_queue.results_queue.empty())
-        task_queue.results_queue.put(
-                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
-        task_queue.results_queue.put(
-                thread_lib.ThreadedTaskQueue.result(drone2, self.mock_return))
-        task_queue.wait_on_drones.expect_call()
-        results = task_queue.get_results()
-        self.assertTrue(results[drone1] == self.mock_return and
-                        results[drone2] == self.mock_return)
-        self.god.check_playback()
-
-
-    def test_execute(self):
-        """Test task queue execute."""
-        drone1 = self.create_remote_drone('fakehostname1')
-        drone2 = self.create_remote_drone('fakehostname2')
-        drone3 = self.create_remote_drone('fakehostname3')
-
-        # Check task queue exception conditions.
-        task_queue = thread_lib.ThreadedTaskQueue()
-        task_queue.results_queue.put(1)
-        self.assertRaises(thread_lib.DroneTaskQueueException,
-                          task_queue.execute, [])
-        task_queue.results_queue.get()
-        task_queue.drone_threads[drone1] = None
-        self.assertRaises(thread_lib.DroneTaskQueueException,
-                          task_queue.execute, [])
-        task_queue.drone_threads = {}
-
-        # Queue 2 calls against each drone, and confirm that the host's
-        # run method is called 3 times. Then check the threads created,
-        # and finally compare results returned by the task queue against
-        # the mock results.
-        drones = [drone1, drone2, drone3]
-        for drone in drones:
-            drone.queue_call('foo')
-            drone.queue_call('bar')
-            mock_result = utils.CmdResult(
-                    stdout=cPickle.dumps(self.mock_return))
-            self._mock_host.run.expect_call(
-                    'python %s' % self.drone_utility_path,
-                    stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
-                    connect_timeout=mock.is_instance_comparator(int)
-                    ).and_return(mock_result)
-        task_queue.execute(drones, wait=False)
-        self.assertTrue(set(task_queue.drone_threads.keys()) == set(drones))
-        for drone, thread in task_queue.drone_threads.iteritems():
-            self.assertTrue(drone.hostname in thread.getName())
-            self.assertTrue(thread.isDaemon())
-            self.assertRaises(RuntimeError, thread.start)
-        results = task_queue.get_results()
-        for drone, result in results.iteritems():
-            self.assertTrue(result == self.mock_return['results'])
-
-        # Test synchronous execute
-        drone1.queue_call('foo')
-        mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
-        self._mock_host.run.expect_call(
-                'python %s' % self.drone_utility_path,
-                stdin=cPickle.dumps(drone1.get_calls()), stdout_tee=None,
-                connect_timeout=mock.is_instance_comparator(int)).and_return(
-                        mock_result)
-        self.assertTrue(task_queue.execute(drones, wait=True)[drone1] ==
-                        self.mock_return['results'])
-        self.god.check_playback()
-
-
-
-