Don Garrett | 580717f | 2015-07-24 14:11:22 -0700 | [diff] [blame] | 1 | #!/usr/bin/python |
| 2 | # |
Prashanth B | 340fd1e | 2014-06-22 12:44:10 -0700 | [diff] [blame] | 3 | # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| 4 | # Use of this source code is governed by a BSD-style license that can be |
| 5 | # found in the LICENSE file. |
| 6 | |
"""Tests for the drone manager's thread queue."""
| 8 | |
| 9 | import cPickle |
| 10 | import logging |
| 11 | import Queue |
Don Garrett | 580717f | 2015-07-24 14:11:22 -0700 | [diff] [blame] | 12 | import unittest |
Prashanth B | 340fd1e | 2014-06-22 12:44:10 -0700 | [diff] [blame] | 13 | |
| 14 | import common |
| 15 | from autotest_lib.client.common_lib import utils |
Justin Giorgi | 67ad67d | 2016-06-29 14:41:04 -0700 | [diff] [blame] | 16 | from autotest_lib.client.common_lib.test_utils import mock |
MK Ryu | 7911ad5 | 2015-12-18 11:40:04 -0800 | [diff] [blame] | 17 | from autotest_lib.scheduler import drone_task_queue |
Prashanth B | 340fd1e | 2014-06-22 12:44:10 -0700 | [diff] [blame] | 18 | from autotest_lib.scheduler import drones |
| 19 | from autotest_lib.scheduler import thread_lib |
| 20 | from autotest_lib.server.hosts import ssh_host |
| 21 | |
| 22 | |
class DroneThreadLibTest(unittest.TestCase):
    """Threaded task queue drone library tests.

    setUp stubs drones.drone_utility.create_host, and create_remote_drone
    makes that stub hand back the same mock SSHHost for every hostname, so
    expectations recorded on self._mock_host apply to every drone in a test.
    """

    def create_remote_drone(self, hostname):
        """Create and initialize a Remote Drone.

        @param hostname: The name of the host for the remote drone.

        @return: A remote drone instance.
        """
        drones.drone_utility.create_host.expect_call(hostname).and_return(
                self._mock_host)
        self._mock_host.is_up.expect_call().and_return(True)
        return drones._RemoteDrone(hostname, timestamp_remote_calls=False)


    def _expect_drone_utility_run(self, drone):
        """Expect one drone utility run on the mock host for a drone.

        Records an expectation that the mock host's run method is called with
        the drone utility command and the pickled calls currently queued on
        the drone, and makes that call return a CmdResult whose stdout is the
        pickled self.mock_return.

        @param drone: The drone whose queued calls are expected as stdin.
        """
        mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
        self._mock_host.run.expect_call(
                'python %s' % self.drone_utility_path,
                stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
                connect_timeout=mock.is_instance_comparator(int)).and_return(
                        mock_result)


    def setUp(self):
        """Install the mock host and stub out the drone utility hooks."""
        self.god = mock.mock_god()
        self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
                                                     'mock SSHHost')
        self.god.stub_function(drones.drone_utility, 'create_host')
        self.drone_utility_path = 'mock-drone-utility-path'
        self.mock_return = {'results': ['mock results'],
                            'warnings': []}
        self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
                           self.drone_utility_path)


    def tearDown(self):
        """Undo every stub installed through self.god."""
        self.god.unstub_all()


    def test_worker(self):
        """Test the worker method of a ThreadedTaskQueue."""
        # Invoke the worker method with a drone that has a queued call and check
        # that the drones host.run method is invoked for the call, and the
        # results queue contains the expected results.
        drone = self.create_remote_drone('fakehostname')
        task_queue = thread_lib.ThreadedTaskQueue()

        drone.queue_call('foo')
        self._expect_drone_utility_run(drone)
        task_queue.worker(drone, task_queue.results_queue)
        result = task_queue.results_queue.get()

        # One assertion per condition so a failure names the broken property.
        self.assertTrue(task_queue.results_queue.empty())
        self.assertEqual(result.drone, drone)
        self.assertEqual(result.results, self.mock_return['results'])
        self.god.check_playback()


    def test_wait_on_drones(self):
        """Test waiting on drone threads."""

        def waiting_func(queue):
            # Spin until both the main thread and the failing thread have put
            # an item on the queue, then record completion by putting 3.
            while len(queue.queue) < 2:
                continue
            logging.warning('Consuming thread finished.')
            queue.put(3)

        def exception_func(queue):
            # Spin until the main thread has put its item, add a second item
            # to release waiting_func, then fail.
            while queue.empty():
                continue
            queue.put(2)
            logging.warning('Failing thread raising error.')
            raise ValueError('Value error')

        def quick_func():
            return

        # Create 2 threads, one of which raises an exception while the other
        # just exits normally. Insert both threads into the thread_queue against
        # mock drones and confirm that:
        # a. The task queue waits for both threads, though the first one fails.
        # b. The task queue records the right DroneTaskQueueException, which
        #    contains the original exception.
        # c. The failing thread records its own exception instead of raising it.
        task_queue = thread_lib.ThreadedTaskQueue()
        drone1 = self.create_remote_drone('fakehostname1')
        drone2 = self.create_remote_drone('fakehostname2')
        sync_queue = Queue.Queue()

        waiting_worker = thread_lib.ExceptionRememberingThread(
                target=waiting_func, args=(sync_queue,))
        failing_worker = thread_lib.ExceptionRememberingThread(
                target=exception_func, args=(sync_queue,))
        task_queue.drone_threads[drone1] = waiting_worker
        task_queue.drone_threads[drone2] = failing_worker
        master_thread = thread_lib.ExceptionRememberingThread(
                target=task_queue.wait_on_drones)

        thread_list = [failing_worker, waiting_worker, master_thread]
        for thread in thread_list:
            thread.setDaemon(True)
            thread.start()
        # Both workers spin until this first item arrives.
        sync_queue.put(1)
        master_thread.join()

        self.assertIsInstance(master_thread.err,
                              drone_task_queue.DroneTaskQueueException)
        self.assertIsInstance(failing_worker.err, ValueError)
        self.assertIn(str(failing_worker.err), str(master_thread.err))
        self.assertIn(3, list(sync_queue.queue))
        self.assertEqual(task_queue.drone_threads, {})

        # Call wait_on_drones after the child thread has exited.
        quick_worker = thread_lib.ExceptionRememberingThread(target=quick_func)
        task_queue.drone_threads[drone1] = quick_worker
        quick_worker.start()
        # Block until the thread is done instead of spinning on isAlive().
        quick_worker.join()
        task_queue.wait_on_drones()
        self.assertEqual(task_queue.drone_threads, {})


    def test_get_results(self):
        """Test retrieving results from the results queue."""

        # Insert results for the same drone twice into the results queue
        # and confirm that an exception is raised.
        task_queue = thread_lib.ThreadedTaskQueue()
        drone1 = self.create_remote_drone('fakehostname1')
        drone2 = self.create_remote_drone('fakehostname2')
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
        self.god.stub_function(task_queue, 'wait_on_drones')
        task_queue.wait_on_drones.expect_call()
        self.assertRaises(drone_task_queue.DroneTaskQueueException,
                          task_queue.get_results)

        # Insert results for different drones and check that they're returned
        # in a drone results dict.
        self.assertTrue(task_queue.results_queue.empty())
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
        task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(drone2, self.mock_return))
        task_queue.wait_on_drones.expect_call()
        results = task_queue.get_results()
        self.assertEqual(results[drone1], self.mock_return)
        self.assertEqual(results[drone2], self.mock_return)
        self.god.check_playback()


    def test_execute(self):
        """Test task queue execute."""
        drone1 = self.create_remote_drone('fakehostname1')
        drone2 = self.create_remote_drone('fakehostname2')
        drone3 = self.create_remote_drone('fakehostname3')

        # Check task queue exception conditions: execute must refuse to run
        # while results are still queued or drone threads are still tracked.
        task_queue = thread_lib.ThreadedTaskQueue()
        task_queue.results_queue.put(1)
        self.assertRaises(drone_task_queue.DroneTaskQueueException,
                          task_queue.execute, [])
        task_queue.results_queue.get()
        task_queue.drone_threads[drone1] = None
        self.assertRaises(drone_task_queue.DroneTaskQueueException,
                          task_queue.execute, [])
        task_queue.drone_threads = {}

        # Queue 2 calls against each drone, and confirm that the host's
        # run method is called 3 times. Then check the threads created,
        # and finally compare results returned by the task queue against
        # the mock results.
        # Named all_drones so the imported drones module is not shadowed.
        all_drones = [drone1, drone2, drone3]
        for drone in all_drones:
            drone.queue_call('foo')
            drone.queue_call('bar')
            self._expect_drone_utility_run(drone)
        task_queue.execute(all_drones, wait=False)
        self.assertEqual(set(task_queue.drone_threads), set(all_drones))
        for drone, thread in task_queue.drone_threads.iteritems():
            self.assertIn(drone.hostname, thread.getName())
            self.assertTrue(thread.isDaemon())
            # The queue has already started the thread, so a second start
            # must be rejected.
            self.assertRaises(RuntimeError, thread.start)
        results = task_queue.get_results()
        for _, result in results.iteritems():
            self.assertEqual(result, self.mock_return['results'])

        # Test synchronous execute
        drone1.queue_call('foo')
        self._expect_drone_utility_run(drone1)
        sync_results = task_queue.execute(all_drones, wait=True)
        self.assertEqual(sync_results[drone1], self.mock_return['results'])
        self.god.check_playback()
| 228 | |
# Hand control to unittest's command-line runner when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    unittest.main()