[autotest] Threaded asynchronous task execution on drones.
This CL does the following:
1. Creates a ThreadedTaskQueue capable of executing calls across
drones in parallel.
2. Breaks drone_manager.refresh into two stages, a trigger and a sync,
thereby making it asynchronous.
3. Adds stats, unittests and documentation.
TEST=Ran jobs, unittests.
BUG=chromium:374322, chromium:380459
DEPLOY=scheduler
Change-Id: Ib1257b362a6e4e335e46d51006aedb6b4a341bae
Reviewed-on: https://chromium-review.googlesource.com/205884
Reviewed-by: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/drone_manager_unittest.py b/scheduler/drone_manager_unittest.py
index d2ee2a2..034cd38 100755
--- a/scheduler/drone_manager_unittest.py
+++ b/scheduler/drone_manager_unittest.py
@@ -1,11 +1,17 @@
#!/usr/bin/python
+import cPickle
import os, unittest
import common
from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.scheduler import drone_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config, site_drone_manager
+from autotest_lib.scheduler import thread_lib
+from autotest_lib.scheduler import pidfile_monitor
+from autotest_lib.server.hosts import ssh_host
+
class MockDrone(drones._AbstractDrone):
def __init__(self, name, active_processes=0, max_processes=10,
@@ -331,5 +337,140 @@
self.assertFalse(self.manager._registered_pidfile_info)
+class AsyncDroneManager(unittest.TestCase):
+    """Tests for the trigger/sync refresh stages of the DroneManager.
+
+    Each test runs against a fresh drone_manager.DroneManager holding two
+    drones._RemoteDrone instances created with mock SSH hosts, plus a
+    MockDrone installed as the results drone.
+    """
+    _DRONE_INSTALL_DIR = '/drone/install/dir'
+    _RESULTS_DIR = '/results/dir'
+
+
+    def create_remote_drone(self, hostname, mock_host):
+        """Create and initialize a Remote Drone.
+
+        Records the expectations that drones.drone_utility.create_host is
+        called with hostname and returns mock_host, and that mock_host.is_up
+        returns True, then constructs the drone. setUp stubs create_host
+        before calling this method.
+
+        @param hostname: Hostname passed to create_host and to the drone.
+        @param mock_host: Mock host returned by the stubbed create_host call.
+
+        @return: A remote drone instance.
+        """
+        drones.drone_utility.create_host.expect_call(hostname).and_return(
+                mock_host)
+        mock_host.is_up.expect_call().and_return(True)
+        return drones._RemoteDrone(hostname)
+
+
+    def create_fake_pidfile_info(self, tag='tag', name='name'):
+        """Register a pidfile with the manager under test.
+
+        @param tag: Passed to the manager's get_pidfile_id_from.
+        @param name: Passed to the manager's get_pidfile_id_from.
+
+        @return: The manager's whole _registered_pidfile_info collection
+                 (not just the entry registered by this call).
+        """
+        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
+        self.manager.register_pidfile(pidfile_id)
+        return self.manager._registered_pidfile_info
+
+
+    def setUp(self):
+        """Build a DroneManager with two remote drones and a results drone."""
+        self.god = mock.mock_god()
+        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
+                           self._DRONE_INSTALL_DIR)
+        self.manager = drone_manager.DroneManager()
+        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
+
+        # we don't want this to ever actually get called
+        self.god.stub_function(drones, 'get_drone')
+        # we don't want the DroneManager to go messing with global config
+        def do_nothing():
+            pass
+        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
+
+        # set up some dummy drones with different mock hosts.
+        mock_host1 = self.god.create_mock_class(ssh_host.SSHHost,
+                                                'mock SSHHost 1')
+        mock_host2 = self.god.create_mock_class(ssh_host.SSHHost,
+                                                'mock SSHHost 2')
+        self.god.stub_function(drones.drone_utility, 'create_host')
+        self.mock_drone_1 = self.create_remote_drone(
+                'fakehostname1', mock_host1)
+        self.mock_drone_2 = self.create_remote_drone(
+                'fakehostname2', mock_host2)
+        for mock_drone in [self.mock_drone_1, self.mock_drone_2]:
+            self.manager._drones[mock_drone.hostname] = mock_drone
+
+        self.results_drone = MockDrone('results_drone', 0, 10)
+        self.manager._results_drone = self.results_drone
+        self.drone_utility_path = 'mock-drone-utility-path'
+        # Canned payload that each mocked drone_utility run returns, pickled,
+        # on stdout in test_trigger_refresh.
+        self.mock_return = {'results': ['mock results'],
+                            'warnings': []}
+
+
+    def tearDown(self):
+        """Undo every stub installed through self.god."""
+        self.god.unstub_all()
+
+
+    def test_trigger_refresh(self):
+        """Test drone manager trigger refresh.
+
+        Expects one drone_utility 'refresh' invocation per drone host and
+        checks that the refresh task queue reports the canned results keyed
+        by drone.
+        """
+        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
+                           self.drone_utility_path)
+
+        # Create some fake pidfiles and confirm that a refresh call is
+        # executed on each drone host, with the same pidfile paths. Then
+        # check that each drone gets a key in the returned results dictionary.
+        #
+        # Only a single pidfile is registered (range(0, 1)).
+        # NOTE(review): pidfile_paths is built from the registry's keys and
+        # pickled into the expected stdin below, so registering several
+        # pidfiles would presumably make the expectation depend on key
+        # order - confirm before widening the range.
+        for i in range(0, 1):
+            pidfile_info = self.create_fake_pidfile_info(
+                    'tag%s' % i, 'name%s' % i)
+        pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
+        refresh_call = drone_utility.call('refresh', pidfile_paths)
+        expected_results = {}
+        for drone in self.manager.get_drones():
+            mock_result = utils.CmdResult(
+                    stdout=cPickle.dumps(self.mock_return))
+            drone._host.run.expect_call(
+                    'python %s' % self.drone_utility_path,
+                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
+                    connect_timeout=mock.is_instance_comparator(int)
+                    ).and_return(mock_result)
+            expected_results[drone] = self.mock_return['results']
+
+        self.manager.trigger_refresh()
+        # assertEqual (rather than assertTrue(a == b)) so that a failure
+        # reports both dictionaries.
+        self.assertEqual(self.manager._refresh_task_queue.get_results(),
+                         expected_results)
+        self.god.check_playback()
+
+
+    def test_sync_refresh(self):
+        """Test drone manager sync refresh.
+
+        Puts a canned drone_utility result for mock_drone_1 straight onto
+        the refresh task queue's results_queue, calls sync_refresh, and
+        checks the resulting entry in the manager's _pidfiles as well as the
+        exit code reported by a PidfileRunMonitor for the same pidfile.
+        """
+
+        # Insert some drone_utility results into the results queue, then
+        # check that get_results returns it in the right format, and that
+        # the rest of sync_refresh populates the right datastructures for
+        # correct handling of agents. Also confirm that this method of
+        # syncing is sufficient for the monitor to pick up the exit status
+        # of the process in the same way it would in handle_agents.
+        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
+        pidfiles = {pidfile_path: '123\n12\n0\n'}
+        drone_utility_results = {
+                'pidfiles': pidfiles,
+                'autoserv_processes': {},
+                'all_processes': {},
+                'parse_processes': {},
+                'pidfiles_second_read': pidfiles,
+        }
+        # Our manager instance isn't the drone manager singleton that the
+        # pidfile_monitor will use by default, because setUp doesn't call
+        # drone_manager.instance().
+        self.god.stub_with(pidfile_monitor, '_drone_manager', self.manager)
+        monitor = pidfile_monitor.PidfileRunMonitor()
+        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
+        self.manager.register_pidfile(monitor.pidfile_id)
+        # No exit status should be known before the sync.
+        self.assertTrue(monitor._state.exit_status is None)
+
+        self.manager._refresh_task_queue.results_queue.put(
+                thread_lib.ThreadedTaskQueue.result(
+                        self.mock_drone_1, [drone_utility_results]))
+        self.manager.sync_refresh()
+        synced_pidfiles = self.manager._pidfiles
+        pidfile_id = synced_pidfiles.keys()[0]
+        pidfile_contents = synced_pidfiles[pidfile_id]
+
+        # One assertion per field so a failure names the mismatching value.
+        self.assertEqual(pidfile_id.path, pidfile_path)
+        self.assertEqual(pidfile_contents.process.pid, 123)
+        self.assertEqual(pidfile_contents.process.hostname,
+                         self.mock_drone_1.hostname)
+        self.assertEqual(pidfile_contents.exit_status, 12)
+        self.assertEqual(pidfile_contents.num_tests_failed, 0)
+        self.assertEqual(monitor.exit_code(), 12)
+        self.god.check_playback()
+
+
+
if __name__ == '__main__':
unittest.main()