[autotest] Threaded asynchronous task execution on drones.

This CL does the following:
1. Creates a ThreadedTaskQueue capable of executing calls across
    drones in parallel.
2. Breaks drone_manager.refresh into 2 stages, a trigger and sync,
    thereby making it asynchronous.
3. Adds stats, unittests and documentation.

TEST=Ran jobs, unittests.
BUG=chromium:374322, chromium:380459
DEPLOY=scheduler

Change-Id: Ib1257b362a6e4e335e46d51006aedb6b4a341bae
Reviewed-on: https://chromium-review.googlesource.com/205884
Reviewed-by: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/drone_manager_unittest.py b/scheduler/drone_manager_unittest.py
index d2ee2a2..034cd38 100755
--- a/scheduler/drone_manager_unittest.py
+++ b/scheduler/drone_manager_unittest.py
@@ -1,11 +1,17 @@
 #!/usr/bin/python
 
+import cPickle
 import os, unittest
 import common
 from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import utils
 from autotest_lib.client.common_lib.test_utils import mock
 from autotest_lib.scheduler import drone_manager, drone_utility, drones
 from autotest_lib.scheduler import scheduler_config, site_drone_manager
+from autotest_lib.scheduler import thread_lib
+from autotest_lib.scheduler import pidfile_monitor
+from autotest_lib.server.hosts import ssh_host
+
 
 class MockDrone(drones._AbstractDrone):
     def __init__(self, name, active_processes=0, max_processes=10,
@@ -331,5 +337,140 @@
         self.assertFalse(self.manager._registered_pidfile_info)
 
 
+class AsyncDroneManager(unittest.TestCase):
+    _DRONE_INSTALL_DIR = '/drone/install/dir'
+    _RESULTS_DIR = '/results/dir'
+
+
+    def create_remote_drone(self, hostname, mock_host):
+        """Create and initialize a Remote Drone.
+
+        @return: A remote drone instance.
+        """
+        drones.drone_utility.create_host.expect_call(hostname).and_return(
+                mock_host)
+        mock_host.is_up.expect_call().and_return(True)
+        return drones._RemoteDrone(hostname)
+
+
+    def create_fake_pidfile_info(self, tag='tag', name='name'):
+        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
+        self.manager.register_pidfile(pidfile_id)
+        return self.manager._registered_pidfile_info
+
+
+    def setUp(self):
+        self.god = mock.mock_god()
+        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
+                           self._DRONE_INSTALL_DIR)
+        self.manager = drone_manager.DroneManager()
+        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
+
+        # we don't want this to ever actually get called
+        self.god.stub_function(drones, 'get_drone')
+        # we don't want the DroneManager to go messing with global config
+        def do_nothing():
+            pass
+        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
+
+        # set up some dummy drones with different mock hosts.
+        mock_host1 = self.god.create_mock_class(ssh_host.SSHHost,
+                                                'mock SSHHost 1')
+        mock_host2 = self.god.create_mock_class(ssh_host.SSHHost,
+                                                'mock SSHHost 2')
+        self.god.stub_function(drones.drone_utility, 'create_host')
+        self.mock_drone_1 = self.create_remote_drone(
+                'fakehostname1', mock_host1)
+        self.mock_drone_2 = self.create_remote_drone(
+                'fakehostname2', mock_host2)
+        for mock_drone in [self.mock_drone_1, self.mock_drone_2]:
+            self.manager._drones[mock_drone.hostname] = mock_drone
+
+        self.results_drone = MockDrone('results_drone', 0, 10)
+        self.manager._results_drone = self.results_drone
+        self.drone_utility_path = 'mock-drone-utility-path'
+        self.mock_return = {'results': ['mock results'],
+                            'warnings': []}
+
+
+    def tearDown(self):
+        self.god.unstub_all()
+
+    def test_trigger_refresh(self):
+        """Test drone manager trigger refresh."""
+        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
+                           self.drone_utility_path)
+
+        # Create some fake pidfiles and confirm that a refresh call is
+        # executed on each drone host, with the same pidfile paths. Then
+        # check that each drone gets a key in the returned results dictionary.
+        for i in range(0, 1):
+            pidfile_info = self.create_fake_pidfile_info(
+                    'tag%s' % i, 'name%s' %i)
+        pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
+        refresh_call = drone_utility.call('refresh', pidfile_paths)
+        expected_results = {}
+        for drone in self.manager.get_drones():
+            mock_result = utils.CmdResult(
+                    stdout=cPickle.dumps(self.mock_return))
+            drone._host.run.expect_call(
+                    'python %s' % self.drone_utility_path,
+                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
+                    connect_timeout=mock.is_instance_comparator(int)
+                ).and_return(mock_result)
+            expected_results[drone] = self.mock_return['results']
+
+        self.manager.trigger_refresh()
+        self.assertTrue(self.manager._refresh_task_queue.get_results() ==
+                        expected_results)
+        self.god.check_playback()
+
+
+    def test_sync_refresh(self):
+        """Test drone manager sync refresh."""
+
+        # Insert some drone_utility results into the results queue, then
+        # check that get_results returns it in the right format, and that
+        # the rest of sync_refresh populates the right data structures for
+        # correct handling of agents. Also confirm that this method of
+        # syncing is sufficient for the monitor to pick up the exit status
+        # of the process in the same way it would in handle_agents.
+        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
+        pidfiles = {pidfile_path: '123\n12\n0\n'}
+        drone_utility_results = {
+                'pidfiles': pidfiles,
+                'autoserv_processes':{},
+                'all_processes':{},
+                'parse_processes':{},
+                'pidfiles_second_read':pidfiles,
+        }
+        # Our manager instance isn't the drone manager singleton that the
+        # pidfile_monitor will use by default, because setUp doesn't call
+        # drone_manager.instance().
+        self.god.stub_with(pidfile_monitor, '_drone_manager', self.manager)
+        monitor = pidfile_monitor.PidfileRunMonitor()
+        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
+        self.manager.register_pidfile(monitor.pidfile_id)
+        self.assertTrue(monitor._state.exit_status == None)
+
+        self.manager._refresh_task_queue.results_queue.put(
+                thread_lib.ThreadedTaskQueue.result(
+                    self.mock_drone_1, [drone_utility_results]))
+        self.manager.sync_refresh()
+        pidfiles = self.manager._pidfiles
+        pidfile_id = pidfiles.keys()[0]
+        pidfile_contents = pidfiles[pidfile_id]
+
+        self.assertTrue(
+                pidfile_id.path == pidfile_path and
+                pidfile_contents.process.pid == 123 and
+                pidfile_contents.process.hostname ==
+                        self.mock_drone_1.hostname and
+                pidfile_contents.exit_status == 12 and
+                pidfile_contents.num_tests_failed == 0)
+        self.assertTrue(monitor.exit_code() == 12)
+        self.god.check_playback()
+
+
 if __name__ == '__main__':
     unittest.main()