blob: 3fec9750756d18e0e69f3f1669f2c7b2fc645218 [file] [log] [blame]
Don Garrett580717f2015-07-24 14:11:22 -07001#!/usr/bin/python
2#
Prashanth B340fd1e2014-06-22 12:44:10 -07003# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Tests for the drone managers thread queue."""
8
9import cPickle
10import logging
11import Queue
Don Garrett580717f2015-07-24 14:11:22 -070012import unittest
Prashanth B340fd1e2014-06-22 12:44:10 -070013
14import common
15from autotest_lib.client.common_lib import utils
Justin Giorgi67ad67d2016-06-29 14:41:04 -070016from autotest_lib.client.common_lib.test_utils import mock
MK Ryu7911ad52015-12-18 11:40:04 -080017from autotest_lib.scheduler import drone_task_queue
Prashanth B340fd1e2014-06-22 12:44:10 -070018from autotest_lib.scheduler import drones
19from autotest_lib.scheduler import thread_lib
20from autotest_lib.server.hosts import ssh_host
21
22
23class DroneThreadLibTest(unittest.TestCase):
24 """Threaded task queue drone library tests."""
25
26 def create_remote_drone(self, hostname):
27 """Create and initialize a Remote Drone.
28
29 @param hostname: The name of the host for the remote drone.
30
31 @return: A remote drone instance.
32 """
33 drones.drone_utility.create_host.expect_call(hostname).and_return(
34 self._mock_host)
35 self._mock_host.is_up.expect_call().and_return(True)
Prashanth Bcf731e32014-08-10 18:03:57 -070036 return drones._RemoteDrone(hostname, timestamp_remote_calls=False)
Prashanth B340fd1e2014-06-22 12:44:10 -070037
38
39 def setUp(self):
40 self.god = mock.mock_god()
41 self._mock_host = self.god.create_mock_class(ssh_host.SSHHost,
42 'mock SSHHost')
43 self.god.stub_function(drones.drone_utility, 'create_host')
44 self.drone_utility_path = 'mock-drone-utility-path'
45 self.mock_return = {'results': ['mock results'],
46 'warnings': []}
47 self.god.stub_with(drones._RemoteDrone, '_drone_utility_path',
48 self.drone_utility_path)
49
50
51 def tearDown(self):
52 self.god.unstub_all()
53
54
55 def test_worker(self):
56 """Test the worker method of a ThreadedTaskQueue."""
57 # Invoke the worker method with a drone that has a queued call and check
58 # that the drones host.run method is invoked for the call, and the
59 # results queue contains the expected results.
60 drone = self.create_remote_drone('fakehostname')
61 task_queue = thread_lib.ThreadedTaskQueue()
62
63 drone.queue_call('foo')
64 mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
65 self._mock_host.run.expect_call(
66 'python %s' % self.drone_utility_path,
67 stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
68 connect_timeout=mock.is_instance_comparator(int)).and_return(
69 mock_result)
70 task_queue.worker(drone, task_queue.results_queue)
71 result = task_queue.results_queue.get()
72
73 self.assertTrue(task_queue.results_queue.empty() and
74 result.drone == drone and
75 result.results == self.mock_return['results'])
76 self.god.check_playback()
77
78
79 def test_wait_on_drones(self):
80 """Test waiting on drone threads."""
81
82 def waiting_func(queue):
83 while len(queue.queue) < 2:
84 continue
85 logging.warning('Consuming thread finished.')
86 queue.put(3)
87
88 def exception_func(queue):
89 while queue.empty():
90 continue
91 queue.put(2)
92 logging.warning('Failing thread raising error.')
93 raise ValueError('Value error')
94
95 def quick_func():
96 return
97
98 # Create 2 threads, one of which raises an exception while the other
99 # just exits normally. Insert both threads into the thread_queue against
100 # mock drones and confirm that:
101 # a. The task queue waits for both threads, though the first one fails.
102 # b. The task queue records the right DroneTaskQueueException, which
103 # contains the original exception.
104 # c. The failing thread records its own exception instead of raising it.
105 task_queue = thread_lib.ThreadedTaskQueue()
106 drone1 = self.create_remote_drone('fakehostname1')
107 drone2 = self.create_remote_drone('fakehostname2')
108 sync_queue = Queue.Queue()
109
110 waiting_worker = thread_lib.ExceptionRememberingThread(
111 target=waiting_func, args=(sync_queue,))
112 failing_worker = thread_lib.ExceptionRememberingThread(
113 target=exception_func, args=(sync_queue,))
114 task_queue.drone_threads[drone1] = waiting_worker
115 task_queue.drone_threads[drone2] = failing_worker
116 master_thread = thread_lib.ExceptionRememberingThread(
117 target=task_queue.wait_on_drones)
118
119 thread_list = [failing_worker, waiting_worker, master_thread]
120 for thread in thread_list:
121 thread.setDaemon(True)
122 thread.start()
123 sync_queue.put(1)
124 master_thread.join()
125
126 self.assertTrue(isinstance(master_thread.err,
MK Ryu7911ad52015-12-18 11:40:04 -0800127 drone_task_queue.DroneTaskQueueException))
Prashanth B340fd1e2014-06-22 12:44:10 -0700128 self.assertTrue(isinstance(failing_worker.err, ValueError))
129 self.assertTrue(str(failing_worker.err) in str(master_thread.err))
130 self.assertTrue(3 in list(sync_queue.queue))
131 self.assertTrue(task_queue.drone_threads == {})
132
133 # Call wait_on_drones after the child thread has exited.
134 quick_worker = thread_lib.ExceptionRememberingThread(target=quick_func)
135 task_queue.drone_threads[drone1] = quick_worker
136 quick_worker.start()
137 while quick_worker.isAlive():
138 continue
139 task_queue.wait_on_drones()
140 self.assertTrue(task_queue.drone_threads == {})
141
142
143 def test_get_results(self):
144 """Test retrieving results from the results queue."""
145
146 # Insert results for the same drone twice into the results queue
147 # and confirm that an exception is raised.
148 task_queue = thread_lib.ThreadedTaskQueue()
149 drone1 = self.create_remote_drone('fakehostname1')
150 drone2 = self.create_remote_drone('fakehostname2')
151 task_queue.results_queue.put(
152 thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
153 task_queue.results_queue.put(
154 thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
155 self.god.stub_function(task_queue, 'wait_on_drones')
156 task_queue.wait_on_drones.expect_call()
MK Ryu7911ad52015-12-18 11:40:04 -0800157 self.assertRaises(drone_task_queue.DroneTaskQueueException,
158 task_queue.get_results)
Prashanth B340fd1e2014-06-22 12:44:10 -0700159
160 # Insert results for different drones and check that they're returned
161 # in a drone results dict.
162 self.assertTrue(task_queue.results_queue.empty())
163 task_queue.results_queue.put(
164 thread_lib.ThreadedTaskQueue.result(drone1, self.mock_return))
165 task_queue.results_queue.put(
166 thread_lib.ThreadedTaskQueue.result(drone2, self.mock_return))
167 task_queue.wait_on_drones.expect_call()
168 results = task_queue.get_results()
169 self.assertTrue(results[drone1] == self.mock_return and
170 results[drone2] == self.mock_return)
171 self.god.check_playback()
172
173
174 def test_execute(self):
175 """Test task queue execute."""
176 drone1 = self.create_remote_drone('fakehostname1')
177 drone2 = self.create_remote_drone('fakehostname2')
178 drone3 = self.create_remote_drone('fakehostname3')
179
180 # Check task queue exception conditions.
181 task_queue = thread_lib.ThreadedTaskQueue()
182 task_queue.results_queue.put(1)
MK Ryu7911ad52015-12-18 11:40:04 -0800183 self.assertRaises(drone_task_queue.DroneTaskQueueException,
Prashanth B340fd1e2014-06-22 12:44:10 -0700184 task_queue.execute, [])
185 task_queue.results_queue.get()
186 task_queue.drone_threads[drone1] = None
MK Ryu7911ad52015-12-18 11:40:04 -0800187 self.assertRaises(drone_task_queue.DroneTaskQueueException,
Prashanth B340fd1e2014-06-22 12:44:10 -0700188 task_queue.execute, [])
189 task_queue.drone_threads = {}
190
191 # Queue 2 calls against each drone, and confirm that the host's
192 # run method is called 3 times. Then check the threads created,
193 # and finally compare results returned by the task queue against
194 # the mock results.
195 drones = [drone1, drone2, drone3]
196 for drone in drones:
197 drone.queue_call('foo')
198 drone.queue_call('bar')
199 mock_result = utils.CmdResult(
200 stdout=cPickle.dumps(self.mock_return))
201 self._mock_host.run.expect_call(
202 'python %s' % self.drone_utility_path,
203 stdin=cPickle.dumps(drone.get_calls()), stdout_tee=None,
204 connect_timeout=mock.is_instance_comparator(int)
205 ).and_return(mock_result)
206 task_queue.execute(drones, wait=False)
207 self.assertTrue(set(task_queue.drone_threads.keys()) == set(drones))
208 for drone, thread in task_queue.drone_threads.iteritems():
209 self.assertTrue(drone.hostname in thread.getName())
210 self.assertTrue(thread.isDaemon())
211 self.assertRaises(RuntimeError, thread.start)
212 results = task_queue.get_results()
213 for drone, result in results.iteritems():
214 self.assertTrue(result == self.mock_return['results'])
215
216 # Test synchronous execute
217 drone1.queue_call('foo')
218 mock_result = utils.CmdResult(stdout=cPickle.dumps(self.mock_return))
219 self._mock_host.run.expect_call(
220 'python %s' % self.drone_utility_path,
221 stdin=cPickle.dumps(drone1.get_calls()), stdout_tee=None,
222 connect_timeout=mock.is_instance_comparator(int)).and_return(
223 mock_result)
224 self.assertTrue(task_queue.execute(drones, wait=True)[drone1] ==
225 self.mock_return['results'])
226 self.god.check_playback()
227
228
Don Garrett580717f2015-07-24 14:11:22 -0700229if __name__ == '__main__':
230 unittest.main()