[autotest] Threaded asynchronous task execution on drones.
This CL does the following:
1. Creates a ThreadedTaskQueue capable of executing calls across
drones in parallel.
2. Breaks drone_manager.refresh into 2 stages, a trigger and sync,
thereby making it asynchronous.
3. Creates a localhost host object for the localhost drone so we run
drone_utility through another process instead of directly importing it
as a module. This fits better with the overall drone manager design, and
allows us to multithread the monitoring of drones while still using
signals within drone_utility.
4. Adds stats, unit tests, and documentation.
TEST=Ran jobs, added unittests.
BUG=chromium:374322, chromium:380459
DEPLOY=scheduler
Change-Id: I950cf260fdc3e5d1a2d4f6fdb4f5954c6371c871
Reviewed-on: https://chromium-review.googlesource.com/207094
Reviewed-by: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/drone_manager_unittest.py b/scheduler/drone_manager_unittest.py
index d2ee2a2..f06e15a 100755
--- a/scheduler/drone_manager_unittest.py
+++ b/scheduler/drone_manager_unittest.py
@@ -1,11 +1,18 @@
#!/usr/bin/python
+import cPickle
import os, unittest
import common
+from autotest_lib.client.bin import local_host
from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.scheduler import drone_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config, site_drone_manager
+from autotest_lib.scheduler import thread_lib
+from autotest_lib.scheduler import pidfile_monitor
+from autotest_lib.server.hosts import ssh_host
+
class MockDrone(drones._AbstractDrone):
def __init__(self, name, active_processes=0, max_processes=10,
@@ -331,5 +338,152 @@
self.assertFalse(self.manager._registered_pidfile_info)
+class ThreadedDroneTest(unittest.TestCase):
+    """Tests the two-stage (trigger/sync) refresh against a remote drone.
+
+    The drone type and its host class come from _DRONE_CLASS and
+    _DRONE_HOST; subclasses may override those and create_drone to run the
+    same tests against another kind of drone.
+    """
+    _DRONE_INSTALL_DIR = '/drone/install/dir'
+    _RESULTS_DIR = '/results/dir'
+    _DRONE_CLASS = drones._RemoteDrone
+    _DRONE_HOST = ssh_host.SSHHost
+
+
+    def create_drone(self, drone_hostname, mock_hostname):
+        """Create and initialize a Remote Drone backed by a mock host.
+
+        Records the expected create_host and is_up calls on the mock god, so
+        the drone's construction is checked by check_playback.
+
+        @param drone_hostname: Hostname the drone is created with.
+        @param mock_hostname: Name passed to create_mock_class for the host.
+
+        @return: A remote drone instance.
+        """
+        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
+        self.god.stub_function(drones.drone_utility, 'create_host')
+        create_host = drones.drone_utility.create_host
+        create_host.expect_call(drone_hostname).and_return(mock_host)
+        mock_host.is_up.expect_call().and_return(True)
+        return self._DRONE_CLASS(drone_hostname)
+
+
+    def create_fake_pidfile_info(self, tag='tag', name='name'):
+        """Register a fake pidfile with the drone manager.
+
+        @param tag: Tag used to build the pidfile id.
+        @param name: Name used to build the pidfile id.
+
+        @return: The manager's _registered_pidfile_info after registering.
+        """
+        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
+        self.manager.register_pidfile(pidfile_id)
+        return self.manager._registered_pidfile_info
+
+
+    def setUp(self):
+        self.god = mock.mock_god()
+        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
+                           self._DRONE_INSTALL_DIR)
+        self.manager = drone_manager.DroneManager()
+        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
+
+        # we don't want this to ever actually get called
+        self.god.stub_function(drones, 'get_drone')
+        # we don't want the DroneManager to go messing with global config
+        def do_nothing():
+            pass
+        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
+
+        self.results_drone = MockDrone('results_drone', 0, 10)
+        self.manager._results_drone = self.results_drone
+        self.drone_utility_path = 'mock-drone-utility-path'
+        self.mock_return = {'results': ['mock results'],
+                            'warnings': []}
+
+
+    def tearDown(self):
+        self.god.unstub_all()
+
+
+    def test_trigger_refresh(self):
+        """Test drone manager trigger refresh."""
+        self.god.stub_with(self._DRONE_CLASS, '_drone_utility_path',
+                           self.drone_utility_path)
+        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
+        self.manager._drones[mock_drone.hostname] = mock_drone
+
+        # Register some fake pidfiles, then confirm that a refresh call with
+        # the registered pidfile paths is executed on each drone host, and
+        # that each drone gets a key in the returned results dictionary.
+        # NOTE(review): range(1) registers a single pidfile; widen it to
+        # exercise several pidfiles per refresh.
+        for i in range(1):
+            pidfile_info = self.create_fake_pidfile_info(
+                    'tag%s' % i, 'name%s' % i)
+        pidfile_paths = [pidfile.path for pidfile in pidfile_info]
+        refresh_call = drone_utility.call('refresh', pidfile_paths)
+        expected_results = {}
+        mock_result = utils.CmdResult(
+                stdout=cPickle.dumps(self.mock_return))
+        for drone in self.manager.get_drones():
+            drone._host.run.expect_call(
+                    'python %s' % self.drone_utility_path,
+                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
+                    connect_timeout=mock.is_instance_comparator(int)
+                    ).and_return(mock_result)
+            expected_results[drone] = self.mock_return['results']
+        self.manager.trigger_refresh()
+        self.assertEqual(self.manager._refresh_task_queue.get_results(),
+                         expected_results)
+        self.god.check_playback()
+
+
+    def test_sync_refresh(self):
+        """Test drone manager sync refresh."""
+        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
+        self.manager._drones[mock_drone.hostname] = mock_drone
+
+        # Insert some drone_utility results into the results queue, then
+        # check that get_results returns it in the right format, and that
+        # the rest of sync_refresh populates the right datastructures for
+        # correct handling of agents. Also confirm that this method of
+        # syncing is sufficient for the monitor to pick up the exit status
+        # of the process in the same way it would in handle_agents.
+        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
+        pidfiles = {pidfile_path: '123\n12\n0\n'}
+        drone_utility_results = {
+                'pidfiles': pidfiles,
+                'autoserv_processes': {},
+                'all_processes': {},
+                'parse_processes': {},
+                'pidfiles_second_read': pidfiles,
+        }
+        # Our manager instance isn't the drone manager singleton that the
+        # pidfile_monitor will use by default, because setUp doesn't call
+        # drone_manager.instance().
+        self.god.stub_with(pidfile_monitor, '_drone_manager', self.manager)
+        monitor = pidfile_monitor.PidfileRunMonitor()
+        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
+        self.manager.register_pidfile(monitor.pidfile_id)
+        self.assertIsNone(monitor._state.exit_status)
+
+        self.manager._refresh_task_queue.results_queue.put(
+                thread_lib.ThreadedTaskQueue.result(
+                        mock_drone, [drone_utility_results]))
+        self.manager.sync_refresh()
+
+        # Exactly one pidfile was reported, so keys()[0] is well defined.
+        manager_pidfiles = self.manager._pidfiles
+        self.assertEqual(len(manager_pidfiles), 1)
+        pidfile_id = manager_pidfiles.keys()[0]
+        pidfile_contents = manager_pidfiles[pidfile_id]
+
+        # Check each field separately so a failure names the wrong value.
+        self.assertEqual(pidfile_id.path, pidfile_path)
+        self.assertEqual(pidfile_contents.process.pid, 123)
+        self.assertEqual(pidfile_contents.process.hostname,
+                         mock_drone.hostname)
+        self.assertEqual(pidfile_contents.exit_status, 12)
+        self.assertEqual(pidfile_contents.num_tests_failed, 0)
+        self.assertEqual(monitor.exit_code(), 12)
+        self.god.check_playback()
+
+
+class ThreadedLocalhostDroneTest(ThreadedDroneTest):
+    """Runs the ThreadedDroneTest tests against the localhost drone."""
+    _DRONE_CLASS = drones._LocalDrone
+    _DRONE_HOST = local_host.LocalHost
+
+
+    def create_drone(self, drone_hostname, mock_hostname):
+        """Create a Local Drone whose host is a mock of _DRONE_HOST.
+
+        @param drone_hostname: Unused; the local drone is constructed without
+            a hostname. Kept for signature compatibility with the base class.
+        @param mock_hostname: Name passed to create_mock_class for the host.
+
+        @return: A local drone instance whose _host is the mock host.
+        """
+        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
+        # No create_host expectation is recorded. NOTE(review): presumably
+        # stubbed so that any call to it is flagged by the mock god; confirm.
+        self.god.stub_function(drones.drone_utility, 'create_host')
+        local_drone = self._DRONE_CLASS()
+        self.god.stub_with(local_drone, '_host', mock_host)
+        return local_drone
+
+
if __name__ == '__main__':
unittest.main()