#!/usr/bin/python

import time, subprocess, os, StringIO, tempfile, datetime, shutil
import gc, logging
import common
import MySQLdb
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils
from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.client.common_lib.test_utils import unittest
from autotest_lib.database import database_connection, migrate
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
from autotest_lib.scheduler import scheduler_config, gc_stats

_DEBUG = False

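# Minimal stand-ins (a sketch, not the real scheduler classes) for the
# Agent/AgentTask objects that monitor_db.Dispatcher manages.  They expose
# only what the tests below poke at: tick()/is_done() for the dispatch loop,
# started and task.num_processes for the throttling tests, and the
# host_ids/queue_entry_ids attributes the dispatcher expects every agent to
# carry.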
class DummyAgentTask(object):
    num_processes = 1
    owner_username = 'my_user'


class DummyAgent(object):
    started = False
    _is_done = False
    host_ids = ()
    queue_entry_ids = ()

    def __init__(self):
        self.task = DummyAgentTask()


    def tick(self):
        self.started = True


    def is_done(self):
        return self._is_done


    def set_done(self, done):
        self._is_done = done


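# Argument comparators for the mock framework used throughout this file:
# during playback the framework asks is_satisfied_by() whether the actual
# argument matches, so a test can expect "any row whose first column is this
# id" (IsRow) or "an Agent whose queue holds exactly this task"
# (IsAgentWithTask) instead of one exact object.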
class IsRow(mock.argument_comparator):
    def __init__(self, row_id):
        self.row_id = row_id


    def is_satisfied_by(self, parameter):
        return list(parameter)[0] == self.row_id


    def __str__(self):
        return 'row with id %s' % self.row_id


class IsAgentWithTask(mock.argument_comparator):
    def __init__(self, task):
        self._task = task


    def is_satisfied_by(self, parameter):
        if not isinstance(parameter, monitor_db.Agent):
            return False
        tasks = list(parameter.queue.queue)
        if len(tasks) != 1:
            return False
        return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list


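# Shared fixture for the scheduler tests below.  FrontendTestMixin (from
# frontend_test_utils) creates the AFE test database these tests query, and
# _set_monitor_stubs() points monitor_db at that database and at fake drone
# manager paths so no real drones or result directories are touched.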
class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        self._database.execute(sql)


    def _set_monitor_stubs(self):
        # Clear the instance cache as this is a brand new database.
        monitor_db.DBObject._clear_instance_cache()

        self._database = (
            database_connection.DatabaseConnection.get_test_database(
                self._test_db_file))
        self._database.connect()
        self._database.debug = _DEBUG

        monitor_db._db = self._database
        monitor_db._drone_manager._results_dir = '/test/path'
        monitor_db._drone_manager._temporary_directory = '/test/path/tmp'


    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()


    def _update_hqe(self, set, where=''):
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)


class DBObjectTest(BaseSchedulerTest):
    # It may seem odd to subclass BaseSchedulerTest for this but it saves us
    # duplicating some setup work for what we want to test.


    def test_compare_fields_in_row(self):
        host = monitor_db.Host(id=1)
        fields = list(host._fields)
        row_data = [getattr(host, fieldname) for fieldname in fields]
        self.assertEqual({}, host._compare_fields_in_row(row_data))
        row_data[fields.index('hostname')] = 'spam'
        self.assertEqual({'hostname': ('host1', 'spam')},
                         host._compare_fields_in_row(row_data))
        row_data[fields.index('id')] = 23
        self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},
                         host._compare_fields_in_row(row_data))


    def test_compare_fields_in_row_datetime_ignores_microseconds(self):
        datetime_with_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 7890)
        datetime_without_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 0)
        class TestTable(monitor_db.DBObject):
            _table_name = 'test_table'
            _fields = ('id', 'test_datetime')
        tt = TestTable(row=[1, datetime_without_us])
        self.assertEqual({}, tt._compare_fields_in_row([1, datetime_with_us]))


    def test_always_query(self):
        host_a = monitor_db.Host(id=2)
        self.assertEqual(host_a.hostname, 'host2')
        self._do_query('UPDATE afe_hosts SET hostname="host2-updated" '
                       'WHERE id=2')
        host_b = monitor_db.Host(id=2, always_query=True)
        self.assert_(host_a is host_b, 'Cached instance not returned.')
        self.assertEqual(host_a.hostname, 'host2-updated',
                         'Database was not re-queried')

        # If either of these is called, a query was made when it shouldn't be.
        host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')
        host_a._update_fields_from_row = host_a._compare_fields_in_row
        host_c = monitor_db.Host(id=2, always_query=False)
        self.assert_(host_a is host_c, 'Cached instance not returned')


    def test_delete(self):
        host = monitor_db.Host(id=3)
        host.delete()
        host = self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3,
                                 always_query=False)
        host = self.assertRaises(monitor_db.DBError, monitor_db.Host, id=3,
                                 always_query=True)

    def test_save(self):
        # Dummy Job to avoid creating one in the HostQueueEntry __init__.
        class MockJob(object):
            def __init__(self, id):
                pass
            def tag(self):
                return 'MockJob'
        self.god.stub_with(monitor_db, 'Job', MockJob)
        hqe = monitor_db.HostQueueEntry(
                new_record=True,
                row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None])
        hqe.save()
        new_id = hqe.id
        # Force a re-query and verify that the correct data was stored.
        monitor_db.DBObject._clear_instance_cache()
        hqe = monitor_db.HostQueueEntry(id=new_id)
        self.assertEqual(hqe.id, new_id)
        self.assertEqual(hqe.job_id, 1)
        self.assertEqual(hqe.host_id, 2)
        self.assertEqual(hqe.status, 'Queued')
        self.assertEqual(hqe.meta_host, None)
        self.assertEqual(hqe.active, False)
        self.assertEqual(hqe.complete, False)
        self.assertEqual(hqe.deleted, False)
        self.assertEqual(hqe.execution_subdir, '.')
        self.assertEqual(hqe.atomic_group_id, None)
        self.assertEqual(hqe.started_on, None)


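# These tests drive Dispatcher._schedule_new_jobs() against different database
# states.  HostQueueEntry._do_schedule_pre_job_tasks is stubbed out in
# _set_monitor_stubs() below, so a "scheduling" is recorded as a plain
# (job_id, host_id) tuple instead of spawning real agents; the
# _assert_job_scheduled_on* helpers then consume those tuples.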
class DispatcherSchedulingTest(BaseSchedulerTest):
    _jobs_scheduled = []


    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()


    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(monitor_db.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)

        def hqe_queue_log_record_stub(self, log_line):
            """No-Op to avoid calls down to the _drone_manager during tests."""

        self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record',
                           hqe_queue_log_record_stub)


    def _record_job_scheduled(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record not in self._jobs_scheduled,
                     'Job %d scheduled on host %d twice' %
                     (job_id, host_id))
        self._jobs_scheduled.append(record)


    def _assert_job_scheduled_on(self, job_id, host_id):
        record = (job_id, host_id)
        self.assert_(record in self._jobs_scheduled,
                     'Job %d not scheduled on host %d as expected\n'
                     'Jobs scheduled: %s' %
                     (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)


    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))


    def _check_for_extra_schedulings(self):
        if len(self._jobs_scheduled) != 0:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))


    def _convert_jobs_to_metahosts(self, *job_ids):
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)


    def _lock_host(self, host_id):
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))


    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []


    def _test_basic_scheduling_helper(self, use_metahosts):
        'Basic nonmetahost scheduling'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()


    def _test_priorities_helper(self, use_metahosts):
        'Test prioritization ordering'
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()


    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._dispatcher._schedule_new_jobs()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def _test_hosts_idle_helper(self, use_metahosts):
        'Only idle hosts get scheduled'
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def _test_obey_ACLs_helper(self, use_metahosts):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)


    def test_priorities(self):
        self._test_priorities_helper(False)


    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)


    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)


    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)


    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)


    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)


    def test_metahost_priorities(self):
        self._test_priorities_helper(True)


    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)


    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)


    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)


    def _setup_test_only_if_needed_labels(self):
        # apply only_if_needed label3 to host1
        models.Host.smart_get('host1').labels.add(self.label3)
        return self._create_job_simple([1], use_metahost=True)


    def test_only_if_needed_labels_avoids_host(self):
        job = self._setup_test_only_if_needed_labels()
        # if the job doesn't depend on label3, there should be no scheduling
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_schedules(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()


    def test_only_if_needed_labels_via_metahost(self):
        job = self._setup_test_only_if_needed_labels()
        job.dependency_labels.add(self.label3)
        # should also work if the metahost is the only_if_needed label
        self._do_query('DELETE FROM afe_jobs_dependency_labels')
        self._create_job(metahosts=[3])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()


    def test_metahosts_obey_blocks(self):
        """
        Metahosts can't get scheduled on hosts already scheduled for
        that job.
        """
        self._create_job(metahosts=[1], hosts=[1])
        # make the nonmetahost entry complete, so the metahost can try
        # to get scheduled
        self._update_hqe(set='complete = 1', where='host_id=1')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    # TODO(gps): These should probably live in their own TestCase class
    # specific to testing HostScheduler methods directly.  It was convenient
    # to put it here for now to share existing test environment setup code.
    def test_HostScheduler_check_atomic_group_labels(self):
        normal_job = self._create_job(metahosts=[0])
        atomic_job = self._create_job(atomic_group=1)
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        atomic_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %
                                                     atomic_job.id)[0]
        normal_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %
                                                     normal_job.id)[0]

        host_scheduler = self._dispatcher._host_scheduler
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id], atomic_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label4.id], normal_hqe))
        self.assertFalse(host_scheduler._check_atomic_group_labels(
                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id, self.label6.id], atomic_hqe))
        self.assertTrue(host_scheduler._check_atomic_group_labels(
                [self.label4.id, self.label5.id],
                atomic_hqe))


    def test_HostScheduler_get_host_atomic_group_id(self):
        job = self._create_job(metahosts=[self.label6.id])
        queue_entry = monitor_db.HostQueueEntry.fetch(
                where='job_id=%d' % job.id)[0]
        # Indirectly initialize the internal state of the host scheduler.
        self._dispatcher._refresh_pending_queue_entries()

        # Test the host scheduler
        host_scheduler = self._dispatcher._host_scheduler

        # Two labels each in a different atomic group.  This should log an
        # error and continue.
        orig_logging_error = logging.error
        def mock_logging_error(message, *args):
            mock_logging_error._num_calls += 1
            # Test the logging call itself, we just wrapped it to count it.
            orig_logging_error(message, *args)
        mock_logging_error._num_calls = 0
        self.god.stub_with(logging, 'error', mock_logging_error)
        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(
                [self.label4.id, self.label8.id], queue_entry))
        self.assertTrue(mock_logging_error._num_calls > 0)
        self.god.unstub(logging, 'error')

        # Two labels both in the same atomic group, this should not raise an
        # error, it will merely cause the job to schedule on the intersection.
        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
                [self.label4.id, self.label5.id]))

        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))
        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(
                [self.label3.id, self.label7.id, self.label6.id]))
        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
                [self.label4.id, self.label7.id, self.label6.id]))
        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(
                [self.label7.id, self.label5.id]))


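    # The atomic group tests below lean on the FrontendTestMixin fixture.  As
    # inferred from the assertions (the layout is not defined in this file):
    # label4 and label5 belong to atomic group 1, hosts 5-7 carry label4,
    # hosts 8-9 carry label5, the group's max_number_of_machines is 2, and
    # label6's hosts all sit inside atomic groups.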
    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
        # Create a job scheduled to run on label6.
        self._create_job(metahosts=[self.label6.id])
        self._dispatcher._schedule_new_jobs()
        # label6 only has hosts that are in atomic groups associated with it,
        # so there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
        # Create a job scheduled to run on label5.  This is an atomic group
        # label but this job does not request atomic group scheduling.
        self._create_job(metahosts=[self.label5.id])
        self._dispatcher._schedule_new_jobs()
        # label5 only has hosts that are in atomic groups associated with it,
        # so there should be no scheduling.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_basics(self):
        # Create jobs scheduled to run on an atomic group.
        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                 atomic_group=1)
        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
                                 atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # atomic_group.max_number_of_machines was 2 so we should run on 2.
        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
        self._assert_job_scheduled_on(job_b.id, 8) # label5
        self._assert_job_scheduled_on(job_b.id, 9) # label5
        self._check_for_extra_schedulings()

        # The three host label4 atomic group still has one host available.
        # That means a job with a synch_count of 1 asking to be scheduled on
        # the atomic group can still use the final machine.
        #
        # This may seem like a somewhat odd use case.  It allows the use of an
        # atomic group as a set of machines to run smaller jobs within (a set
        # of hosts configured for use in network tests with each other perhaps?)
        onehost_job = self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
        self._check_for_extra_schedulings()

        # No more atomic groups have hosts available, so no more jobs should
        # be scheduled.
        self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_obeys_acls(self):
        # Request scheduling on a specific atomic label but be denied by ACLs.
        self._do_query('DELETE FROM afe_acl_groups_hosts '
                       'WHERE host_id in (8,9)')
        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_exclude(self):
        # A dependency label that matches no hosts in the atomic group.
        job_a = self._create_job(atomic_group=1)
        job_a.dependency_labels.add(self.label3)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
        # A metahost and dependency label that excludes too many hosts.
        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
                                 atomic_group=1)
        job_b.dependency_labels.add(self.label7)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_dependency_label_match(self):
        # A dependency label that exists on enough atomic group hosts in only
        # one of the two atomic group labels.
        job_c = self._create_job(synchronous=True, atomic_group=1)
        job_c.dependency_labels.add(self.label7)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_no_metahost(self):
        # Force it to schedule on the other group for a reliable test.
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
        # An atomic job without a metahost.
        job = self._create_job(synchronous=True, atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_partial_group(self):
        # Make one host in labels[3] unavailable so that there are only two
        # hosts left in the group.
        self._do_query('UPDATE afe_hosts SET status="Repair Failed" WHERE id=5')
        job = self._create_job(synchronous=True, metahosts=[self.label4.id],
                               atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # Verify that it was scheduled on the 2 ready hosts in that group.
        self._assert_job_scheduled_on(job.id, 6)
        self._assert_job_scheduled_on(job.id, 7)
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_not_enough_available(self):
        # Mark some hosts in each atomic group label as not usable.
        # One host running, another invalid in the first group label.
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=5')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=6')
        # One host invalid in the second group label.
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=9')
        # Nothing to schedule when no group label has enough (2) good hosts.
        self._create_job(atomic_group=1, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        # There are not enough hosts in either atomic group, so no more
        # scheduling should occur.
        self._check_for_extra_schedulings()

        # Now create an atomic job that has a synch count of 1.  It should
        # schedule on exactly one of the hosts.
        onehost_job = self._create_job(atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)


    def test_atomic_group_scheduling_no_valid_hosts(self):
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id in (8,9)')
        self._create_job(synchronous=True, metahosts=[self.label5.id],
                         atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # No hosts in the selected group and label are valid; no schedulings.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_metahost_works(self):
        # Test that atomic group scheduling also obeys metahosts.
        self._create_job(metahosts=[0], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        # There are no atomic group hosts that also have that metahost.
        self._check_for_extra_schedulings()

        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job_b.id, 8)
        self._assert_job_scheduled_on(job_b.id, 9)
        self._check_for_extra_schedulings()


    def test_atomic_group_skips_ineligible_hosts(self):
        # Test that hosts marked ineligible for this job are not used.
        # How would this ever happen anyway?
        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
        models.IneligibleHostQueue.objects.create(job=job, host_id=5)
        models.IneligibleHostQueue.objects.create(job=job, host_id=6)
        models.IneligibleHostQueue.objects.create(job=job, host_id=7)
        self._dispatcher._schedule_new_jobs()
        # No scheduling should occur as all desired hosts were ineligible.
        self._check_for_extra_schedulings()


    def test_atomic_group_scheduling_fail(self):
        # If synch_count is > the atomic group number of machines, the job
        # should be aborted immediately.
        model_job = self._create_job(synchronous=True, atomic_group=1)
        model_job.synch_count = 4
        model_job.save()
        job = monitor_db.Job(id=model_job.id)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()
        queue_entries = job.get_host_queue_entries()
        self.assertEqual(1, len(queue_entries))
        self.assertEqual(queue_entries[0].status,
                         models.HostQueueEntry.Status.ABORTED)


    def test_atomic_group_no_labels_no_scheduling(self):
        # Never schedule on atomic groups marked invalid.
        job = self._create_job(metahosts=[self.label5.id], synchronous=True,
                               atomic_group=1)
        # Deleting an atomic group via the frontend marks it invalid and
        # removes all label references to the group.  The job now references
        # an invalid atomic group with no labels associated with it.
        self.label5.atomic_group.invalid = True
        self.label5.atomic_group.save()
        self.label5.atomic_group = None
        self.label5.save()

        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host_fail(self):
        # Scheduling a job directly on hosts in an atomic group must
        # fail to avoid users inadvertently holding up the use of an
        # entire atomic group by using the machines individually.
        job = self._create_job(hosts=[5])
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_host(self):
        # Scheduling a job directly on one host in an atomic group will
        # work when the atomic group is listed on the HQE in addition
        # to the host (assuming the sync count is 1).
        job = self._create_job(hosts=[5], atomic_group=1)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job.id, 5)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_sync2(self):
        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        self._assert_job_scheduled_on(job.id, 5)
        self._assert_job_scheduled_on(job.id, 8)
        self._check_for_extra_schedulings()


    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._dispatcher._schedule_new_jobs()
        self._check_for_extra_schedulings()


    def test_garbage_collection(self):
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, so it won't do anything
        # the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()



class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2
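    # Worked example of the two limits: with _MAX_RUNNING=3 and _MAX_STARTED=2,
    # four one-process agents come up as two on the first _handle_agents()
    # cycle, one more on the next (hitting the total-running cap), and the
    # fourth stays unstarted until capacity frees up; test_throttle_per_cycle
    # and test_throttle_total below assert exactly that.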

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING
        scheduler_config.config.max_processes_started_per_cycle = (
            self._MAX_STARTED)

        def fake_max_runnable_processes(fake_self, username):
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running
        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)


    def _setup_some_agents(self, num_agents):
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)


    def _run_a_few_cycles(self):
        for i in xrange(4):
            self._dispatcher._handle_agents()


    def _assert_agents_started(self, indexes, is_started=True):
        for i in indexes:
            self.assert_(self._agents[i].started == is_started,
                         'Agent %d %sstarted' %
                         (i, is_started and 'not ' or ''))


    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)


    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])


    def test_throttle_per_cycle(self):
        self._setup_some_agents(3)
        self._dispatcher._handle_agents()
        self._assert_agents_started([0, 1])
        self._assert_agents_not_started([2])


    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])


    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_cycles()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_cycles()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])


    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_cycles()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


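# The tests below use the mock framework in record/replay style: expect_call()
# records the calls the test expects (optionally with and_return() values),
# the code under test then runs against the mocks, and god.check_playback()
# fails the test if the actual call sequence differed from what was recorded.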
class PidfileRunMonitorTest(unittest.TestCase):
    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(monitor_db, '_drone_manager',
                           self.mock_drone_manager)
        self.god.stub_function(email_manager.manager, 'enqueue_notify_email')
        self.god.stub_with(monitor_db, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
         .expect_call(self.execution_tag,
                      pidfile_name=monitor_db._AUTOSERV_PID_FILE)
         .and_return(self.pidfile_id))

        self.monitor = monitor_db.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()


    def _mock_get_pidfile_timeout_secs(self):
        return 300


    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)


    def set_not_yet_run(self):
        self.setup_pidfile()


    def set_empty_pidfile(self):
        self.setup_pidfile()


    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)


    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)


    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        if expected_pid is None:
            self.assertEquals(self.monitor._state.process, None)
        else:
            self.assertEquals(self.monitor._state.process.pid, expected_pid)
        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)
        self.assertEquals(self.monitor._state.num_tests_failed,
                          expected_num_tests_failed)

        self.god.check_playback()


    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def _get_expected_tests_failed(self, expected_exit_status):
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed


    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)


    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(monitor_db.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()


    def setup_is_running(self, is_running):
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)


    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)


    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)


    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)


    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)


    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        email_manager.manager.enqueue_notify_email.expect_call(
            mock.is_string_comparator(), mock.is_string_comparator())
        self.monitor._start_time = (time.time() -
                                    monitor_db._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')


    def tearDown(self):
        self.god.unstub_all()


    def _create_mock_task(self, name):
        task = self.god.create_mock_class(monitor_db.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent


    def _finish_agent(self, agent):
        while not agent.is_done():
            agent.tick()


    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True

        agent = self._create_agent(task)
        agent.tick()
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def _test_agent_abort_before_started_helper(self, ignore_abort=False):
        task = self._create_mock_task('task')
        task.abort.expect_call()
        if ignore_abort:
            task.aborted = False
            task.poll.expect_call()
            task.is_done.expect_call().and_return(True)
            task.success = True
        else:
            task.aborted = True

        agent = self._create_agent(task)
        agent.abort()
        self._finish_agent(agent)
        self.god.check_playback()


    def test_agent_abort_before_started(self):
        self._test_agent_abort_before_started_helper()
        self._test_agent_abort_before_started_helper(True)


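# DelayedCallTask, as exercised here, fires its callback once now_func()
# reports that delay_seconds have passed: created at t=33 with a 2 second
# delay it exposes end_time 35 and runs the callback on the first poll at or
# after that time, while abort() before then marks the task aborted and
# unsuccessful without ever invoking the callback.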
class DelayedCallTaskTest(unittest.TestCase):
    def setUp(self):
        self.god = mock.mock_god()


    def tearDown(self):
        self.god.unstub_all()


    def test_delayed_call(self):
        test_time = self.god.create_mock_function('time')
        test_time.expect_call().and_return(33)
        test_time.expect_call().and_return(34.01)
        test_time.expect_call().and_return(34.99)
        test_time.expect_call().and_return(35.01)
        def test_callback():
            test_callback.calls += 1
        test_callback.calls = 0
        delay_task = monitor_db.DelayedCallTask(
                delay_seconds=2, callback=test_callback,
                now_func=test_time) # time 33
        self.assertEqual(35, delay_task.end_time)
        agent = monitor_db.Agent(delay_task)
        self.assert_(not agent.started)
        agent.tick() # activates the task and polls it once, time 34.01
        self.assertEqual(0, test_callback.calls, "callback called early")
        agent.tick() # time 34.99
        self.assertEqual(0, test_callback.calls, "callback called early")
        agent.tick() # time 35.01
        self.assertEqual(1, test_callback.calls)
        self.assert_(agent.is_done())
        self.assert_(delay_task.is_done())
        self.assert_(delay_task.success)
        self.assert_(not delay_task.aborted)
        self.god.check_playback()


    def test_delayed_call_abort(self):
        delay_task = monitor_db.DelayedCallTask(
                delay_seconds=987654, callback=lambda : None)
        agent = monitor_db.Agent(delay_task)
        agent.abort()
        agent.tick()
        self.assert_(agent.is_done())
        self.assert_(delay_task.aborted)
        self.assert_(delay_task.is_done())
        self.assert_(not delay_task.success)
        self.god.check_playback()


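# Host.cmp_for_sort, as the expected ordering below implies, compares
# hostnames case-insensitively and treats embedded digit runs numerically,
# so 'host2' sorts before 'host09' and 'host09' before 'HOST010'/'host10'.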
showard54c1ea92009-05-20 00:32:58 +00001175class HostTest(BaseSchedulerTest):
1176 def test_cmp_for_sort(self):
1177 expected_order = [
1178 'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',
1179 'host10', 'host11', 'yolkfolk']
1180 hostname_idx = list(monitor_db.Host._fields).index('hostname')
1181 row = [None] * len(monitor_db.Host._fields)
1182 hosts = []
1183 for hostname in expected_order:
1184 row[hostname_idx] = hostname
1185 hosts.append(monitor_db.Host(row=row, new_record=True))
1186
1187 host1 = hosts[expected_order.index('Host1')]
1188 host010 = hosts[expected_order.index('HOST010')]
1189 host10 = hosts[expected_order.index('host10')]
1190 host3 = hosts[expected_order.index('host3')]
1191 alice = hosts[expected_order.index('alice')]
1192 self.assertEqual(0, monitor_db.Host.cmp_for_sort(host10, host10))
1193 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host10, host010))
1194 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host010, host10))
1195 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host10))
1196 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host010))
1197 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host10))
1198 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host3, host010))
1199 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, host1))
1200 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(host1, host3))
1201 self.assertEqual(-1, monitor_db.Host.cmp_for_sort(alice, host3))
1202 self.assertEqual(1, monitor_db.Host.cmp_for_sort(host3, alice))
1203 self.assertEqual(0, monitor_db.Host.cmp_for_sort(alice, alice))
1204
1205 hosts.sort(cmp=monitor_db.Host.cmp_for_sort)
1206 self.assertEqual(expected_order, [h.hostname for h in hosts])
1207
1208 hosts.reverse()
1209 hosts.sort(cmp=monitor_db.Host.cmp_for_sort)
1210 self.assertEqual(expected_order, [h.hostname for h in hosts])
1211
1212
showardf1ae3542009-05-11 19:26:02 +00001213class HostQueueEntryTest(BaseSchedulerTest):
1214 def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
1215 job = self._create_job(**create_job_kwargs)
1216 for label in dependency_labels:
1217 job.dependency_labels.add(label)
1218 hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id))
1219 self.assertEqual(1, len(hqes))
1220 return hqes[0]
1221
showard77182562009-06-10 00:16:05 +00001222
showardf1ae3542009-05-11 19:26:02 +00001223 def _check_hqe_labels(self, hqe, expected_labels):
1224 expected_labels = set(expected_labels)
1225 label_names = set(label.name for label in hqe.get_labels())
1226 self.assertEqual(expected_labels, label_names)
1227
showard77182562009-06-10 00:16:05 +00001228
showardf1ae3542009-05-11 19:26:02 +00001229 def test_get_labels_empty(self):
1230 hqe = self._create_hqe(hosts=[1])
1231 labels = list(hqe.get_labels())
1232 self.assertEqual([], labels)
1233
showard77182562009-06-10 00:16:05 +00001234
showardf1ae3542009-05-11 19:26:02 +00001235 def test_get_labels_metahost(self):
1236 hqe = self._create_hqe(metahosts=[2])
1237 self._check_hqe_labels(hqe, ['label2'])
1238
showard77182562009-06-10 00:16:05 +00001239
showardf1ae3542009-05-11 19:26:02 +00001240 def test_get_labels_dependencies(self):
1241 hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
1242 metahosts=[1])
1243 self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])
1244
1245
showardb2e2c322008-10-14 17:33:55 +00001246class JobTest(BaseSchedulerTest):
showard2bab8f42008-11-12 18:15:22 +00001247 def setUp(self):
1248 super(JobTest, self).setUp()
showard170873e2009-01-07 00:22:26 +00001249 self.god.stub_with(
1250 drone_manager.DroneManager, 'attach_file_to_execution',
1251 mock.mock_function('attach_file_to_execution',
1252 default_return_val='/test/path/tmp/foo'))
showard2bab8f42008-11-12 18:15:22 +00001253
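# Stub models.SpecialTask.objects.create so that any special task created
# by the code under test is saved and recorded in self._task for
# individual tests to inspect.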
showard8cc058f2009-09-08 16:26:33 +00001254 def _mock_create(**kwargs):
1255 task = models.SpecialTask(**kwargs)
1256 task.save()
1257 self._task = task
1258 self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)
1259
showard2bab8f42008-11-12 18:15:22 +00001260
showard77182562009-06-10 00:16:05 +00001261 def _test_pre_job_tasks_helper(self):
1262 """
showard8cc058f2009-09-08 16:26:33 +00001263 Calls HQE._do_schedule_pre_job_tasks() and returns the created special
1264 task
showard77182562009-06-10 00:16:05 +00001265 """
showard8cc058f2009-09-08 16:26:33 +00001266 self._task = None
1267 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0]
1268 queue_entry._do_schedule_pre_job_tasks()
1269 return self._task
showard2bab8f42008-11-12 18:15:22 +00001270
1271
showarde58e3f82008-11-20 19:04:59 +00001272 def _test_run_helper(self, expect_agent=True, expect_starting=False,
1273 expect_pending=False):
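# Runs job 1 against HQE 1 through the dispatcher, asserts that the HQE
# reaches the expected status, and returns the scheduled agent's task
# (or None when no agent is expected).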
1274 if expect_starting:
1275 expected_status = models.HostQueueEntry.Status.STARTING
1276 elif expect_pending:
1277 expected_status = models.HostQueueEntry.Status.PENDING
1278 else:
1279 expected_status = models.HostQueueEntry.Status.VERIFYING
showard8cc058f2009-09-08 16:26:33 +00001280 job = monitor_db.Job.fetch('id = 1')[0]
1281 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0]
showard77182562009-06-10 00:16:05 +00001282 assert queue_entry.job is job
showard8cc058f2009-09-08 16:26:33 +00001283 job.run_if_ready(queue_entry)
showardb2e2c322008-10-14 17:33:55 +00001284
showard2bab8f42008-11-12 18:15:22 +00001285 self.god.check_playback()
showard8cc058f2009-09-08 16:26:33 +00001286
1287 self._dispatcher._schedule_delay_tasks()
1288 self._dispatcher._schedule_running_host_queue_entries()
1289 agent = self._dispatcher._agents[0]
1290
showard77182562009-06-10 00:16:05 +00001291 actual_status = models.HostQueueEntry.smart_get(1).status
1292 self.assertEquals(expected_status, actual_status)
showard2bab8f42008-11-12 18:15:22 +00001293
showard9976ce92008-10-15 20:28:13 +00001294 if not expect_agent:
1295 self.assertEquals(agent, None)
1296 return
1297
showardb2e2c322008-10-14 17:33:55 +00001298 self.assert_(isinstance(agent, monitor_db.Agent))
showard8cc058f2009-09-08 16:26:33 +00001299 self.assert_(agent.task)
1300 return agent.task
showardc9ae1782009-01-30 01:42:37 +00001301
1302
showard8375ce02009-10-12 20:35:13 +00001303 def test_schedule_running_host_queue_entries_fail(self):
1304 self._create_job(hosts=[2])
1305 self._update_hqe("status='%s', execution_subdir=''" %
1306 models.HostQueueEntry.Status.PENDING)
1307 job = monitor_db.Job.fetch('id = 1')[0]
1308 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0]
1309 assert queue_entry.job is job
1310 job.run_if_ready(queue_entry)
1311 self.assertEqual(queue_entry.status,
1312 models.HostQueueEntry.Status.STARTING)
1313 self.assert_(queue_entry.execution_subdir)
1314 self.god.check_playback()
1315
1316 class dummy_test_agent(object):
1317 task = 'dummy_test_agent'
1318 self._dispatcher._register_agent_for_ids(
1319 self._dispatcher._host_agents, [queue_entry.host.id],
1320 dummy_test_agent)
1321
1322 # Attempted to schedule on a host that already has an agent.
1323 self.assertRaises(monitor_db.SchedulerError,
1324 self._dispatcher._schedule_running_host_queue_entries)
1325
1326
showardd07a5f32009-12-07 19:36:20 +00001327 def test_job_request_abort(self):
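# request_abort() should mark every HQE belonging to the job as aborted
# in the database.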
1328 django_job = self._create_job(hosts=[5, 6], atomic_group=1)
1329 job = monitor_db.Job(django_job.id)
1330 job.request_abort()
1331 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
1332 for hqe in django_hqes:
1333 self.assertTrue(hqe.aborted)
1334
1335
showard77182562009-06-10 00:16:05 +00001336 def test_run_if_ready_delays(self):
1337 # Also tests Job.run_with_ready_delay() on atomic group jobs.
1338 django_job = self._create_job(hosts=[5, 6], atomic_group=1)
1339 job = monitor_db.Job(django_job.id)
1340 self.assertEqual(1, job.synch_count)
1341 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
1342 self.assertEqual(2, len(django_hqes))
1343 self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)
1344
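# Helper: set an HQE's status in the DB and mirror the same status onto
# its host.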
1345 def set_hqe_status(django_hqe, status):
1346 django_hqe.status = status
1347 django_hqe.save()
1348 monitor_db.HostQueueEntry(django_hqe.id).host.set_status(status)
1349
1350 # Set up an initial state; our synch_count is 1
1351 set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)
1352 set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)
1353
1354 # So that we don't depend on the config file value during the test.
1355 self.assert_(scheduler_config.config
1356 .secs_to_wait_for_atomic_group_hosts is not None)
1357 self.god.stub_with(scheduler_config.config,
1358 'secs_to_wait_for_atomic_group_hosts', 123456)
1359
1360 # Get the pending one as a monitor_db.HostQueueEntry object.
showard8cc058f2009-09-08 16:26:33 +00001361 hqe = monitor_db.HostQueueEntry(django_hqes[1].id)
showard77182562009-06-10 00:16:05 +00001362 self.assert_(not job._delay_ready_task)
1363 self.assertTrue(job.is_ready())
1364
1365 # Ready with one pending, one verifying, and an atomic group should
1366 # result in a DelayedCallTask to re-check if we're ready a while later.
showard8cc058f2009-09-08 16:26:33 +00001367 job.run_if_ready(hqe)
1368 self.assertEquals('Waiting', hqe.status)
1369 self._dispatcher._schedule_delay_tasks()
1370 self.assertEquals('Pending', hqe.status)
1371 agent = self._dispatcher._agents[0]
showard77182562009-06-10 00:16:05 +00001372 self.assert_(job._delay_ready_task)
1373 self.assert_(isinstance(agent, monitor_db.Agent))
showard8cc058f2009-09-08 16:26:33 +00001374 self.assert_(agent.task)
1375 delay_task = agent.task
1376 self.assert_(isinstance(delay_task, monitor_db.DelayedCallTask))
showard77182562009-06-10 00:16:05 +00001377 self.assert_(not delay_task.is_done())
1378
showard8cc058f2009-09-08 16:26:33 +00001379 self.god.stub_function(delay_task, 'abort')
1380
showard77182562009-06-10 00:16:05 +00001381 self.god.stub_function(job, 'run')
1382
showardd2014822009-10-12 20:26:58 +00001383 self.god.stub_function(job, '_pending_count')
showardd07a5f32009-12-07 19:36:20 +00001384 self.god.stub_with(job, 'synch_count', 9)
1385 self.god.stub_function(job, 'request_abort')
showardd2014822009-10-12 20:26:58 +00001386
showard77182562009-06-10 00:16:05 +00001387 # Test that the DelayedCallTask's callback queued up above does the
showardd2014822009-10-12 20:26:58 +00001388 # correct thing and does not call run if there are not enough hosts
1389 # in pending after the delay.
showardd2014822009-10-12 20:26:58 +00001390 job._pending_count.expect_call().and_return(0)
showardd07a5f32009-12-07 19:36:20 +00001391 job.request_abort.expect_call()
showardd2014822009-10-12 20:26:58 +00001392 delay_task._callback()
1393 self.god.check_playback()
1394
1395 # Test that the DelayedCallTask's callback queued up above does the
1396 # correct thing and calls job.run() if there are still enough
1397 # hosts pending after the delay.
showardd07a5f32009-12-07 19:36:20 +00001398 job.synch_count = 4
showardd2014822009-10-12 20:26:58 +00001399 job._pending_count.expect_call().and_return(4)
showard8cc058f2009-09-08 16:26:33 +00001400 job.run.expect_call(hqe)
1401 delay_task._callback()
1402 self.god.check_playback()
showard77182562009-06-10 00:16:05 +00001403
showardd2014822009-10-12 20:26:58 +00001404 job._pending_count.expect_call().and_return(4)
1405
showard77182562009-06-10 00:16:05 +00001406 # Adjust the delay deadline so that enough time has passed.
1407 job._delay_ready_task.end_time = time.time() - 111111
showard8cc058f2009-09-08 16:26:33 +00001408 job.run.expect_call(hqe)
showard77182562009-06-10 00:16:05 +00001409 # ...the delay_expired condition should cause us to call run()
showard8cc058f2009-09-08 16:26:33 +00001410 self._dispatcher._handle_agents()
1411 self.god.check_playback()
1412 delay_task.success = False
showard77182562009-06-10 00:16:05 +00001413
1414 # Adjust the delay deadline back so that enough time has not passed.
1415 job._delay_ready_task.end_time = time.time() + 111111
showard8cc058f2009-09-08 16:26:33 +00001416 self._dispatcher._handle_agents()
1417 self.god.check_playback()
showard77182562009-06-10 00:16:05 +00001418
showard77182562009-06-10 00:16:05 +00001419 # Now max_number_of_machines HQEs are in pending state. Remaining
1420 # delay will now be ignored.
showard8cc058f2009-09-08 16:26:33 +00001421 other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id)
1422 self.god.unstub(job, 'run')
showardd2014822009-10-12 20:26:58 +00001423 self.god.unstub(job, '_pending_count')
showardd07a5f32009-12-07 19:36:20 +00001424 self.god.unstub(job, 'synch_count')
1425 self.god.unstub(job, 'request_abort')
showard77182562009-06-10 00:16:05 +00001426 # ...the over_max_threshold test should cause us to call run()
showard8cc058f2009-09-08 16:26:33 +00001427 delay_task.abort.expect_call()
1428 other_hqe.on_pending()
1429 self.assertEquals('Starting', other_hqe.status)
1430 self.assertEquals('Starting', hqe.status)
1431 self.god.stub_function(job, 'run')
1432 self.god.unstub(delay_task, 'abort')
showard77182562009-06-10 00:16:05 +00001433
showard8cc058f2009-09-08 16:26:33 +00001434 hqe.set_status('Pending')
1435 other_hqe.set_status('Pending')
showard708b3522009-08-20 23:26:15 +00001436 # Now we're not over the max for the atomic group. But all assigned
1437 # hosts are in pending state. over_max_threshold should make us run().
showard8cc058f2009-09-08 16:26:33 +00001438 hqe.atomic_group.max_number_of_machines += 1
1439 hqe.atomic_group.save()
1440 job.run.expect_call(hqe)
1441 hqe.on_pending()
1442 self.god.check_playback()
1443 hqe.atomic_group.max_number_of_machines -= 1
1444 hqe.atomic_group.save()
showard708b3522009-08-20 23:26:15 +00001445
showard77182562009-06-10 00:16:05 +00001446 other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id)
showard8cc058f2009-09-08 16:26:33 +00001447 self.assertTrue(hqe.job is other_hqe.job)
showard77182562009-06-10 00:16:05 +00001448 # DBObject classes should reuse instances so these should be the same.
1449 self.assertEqual(job, other_hqe.job)
showard8cc058f2009-09-08 16:26:33 +00001450 self.assertEqual(other_hqe.job, hqe.job)
showard77182562009-06-10 00:16:05 +00001451 # Be sure our delay was not lost during the other_hqe construction.
showard8cc058f2009-09-08 16:26:33 +00001452 self.assertEqual(job._delay_ready_task, delay_task)
showard77182562009-06-10 00:16:05 +00001453 self.assert_(job._delay_ready_task)
1454 self.assertFalse(job._delay_ready_task.is_done())
1455 self.assertFalse(job._delay_ready_task.aborted)
1456
1457 # We want the real run() to be called below.
1458 self.god.unstub(job, 'run')
1459
1460 # We pass in the other HQE this time the same way it would happen
1461 # for real when one host finishes verifying and enters pending.
showard8cc058f2009-09-08 16:26:33 +00001462 job.run_if_ready(other_hqe)
showard77182562009-06-10 00:16:05 +00001463
1464 # The delayed task must be aborted by the actual run() call above.
1465 self.assertTrue(job._delay_ready_task.aborted)
1466 self.assertFalse(job._delay_ready_task.success)
1467 self.assertTrue(job._delay_ready_task.is_done())
1468
1469 # Check that job run() and _finish_run() were called by the above:
showard8cc058f2009-09-08 16:26:33 +00001470 self._dispatcher._schedule_running_host_queue_entries()
1471 agent = self._dispatcher._agents[0]
1472 self.assert_(agent.task)
1473 task = agent.task
1474 self.assert_(isinstance(task, monitor_db.QueueTask))
showard77182562009-06-10 00:16:05 +00001475 # Requery these hqes in order to verify the status from the DB.
1476 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
1477 for entry in django_hqes:
1478 self.assertEqual(models.HostQueueEntry.Status.STARTING,
1479 entry.status)
1480
1481 # We're already running, but more calls to run_with_ready_delay can
1482 # continue to come in as straggler hosts enter Pending. Make
1483 # sure we don't do anything.
showard8cc058f2009-09-08 16:26:33 +00001484 self.god.stub_function(job, 'run')
1485 job.run_with_ready_delay(hqe)
1486 self.god.check_playback()
1487 self.god.unstub(job, 'run')
showard77182562009-06-10 00:16:05 +00001488
1489
1490 def test__atomic_and_has_started__on_atomic(self):
1491 self._create_job(hosts=[5, 6], atomic_group=1)
showard8cc058f2009-09-08 16:26:33 +00001492 job = monitor_db.Job.fetch('id = 1')[0]
showard77182562009-06-10 00:16:05 +00001493 self.assertFalse(job._atomic_and_has_started())
showardaf8b4ca2009-06-16 18:47:26 +00001494
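# Pending, Verifying, Failed and Stopped must not count as 'started';
# Starting and Completed must.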
showard77182562009-06-10 00:16:05 +00001495 self._update_hqe("status='Pending'")
1496 self.assertFalse(job._atomic_and_has_started())
1497 self._update_hqe("status='Verifying'")
1498 self.assertFalse(job._atomic_and_has_started())
showardaf8b4ca2009-06-16 18:47:26 +00001499 self.assertFalse(job._atomic_and_has_started())
1500 self._update_hqe("status='Failed'")
1501 self.assertFalse(job._atomic_and_has_started())
1502 self._update_hqe("status='Stopped'")
1503 self.assertFalse(job._atomic_and_has_started())
1504
showard77182562009-06-10 00:16:05 +00001505 self._update_hqe("status='Starting'")
1506 self.assertTrue(job._atomic_and_has_started())
1507 self._update_hqe("status='Completed'")
1508 self.assertTrue(job._atomic_and_has_started())
1509 self._update_hqe("status='Aborted'")
showard77182562009-06-10 00:16:05 +00001510
1511
1512 def test__atomic_and_has_started__not_atomic(self):
1513 self._create_job(hosts=[1, 2])
showard8cc058f2009-09-08 16:26:33 +00001514 job = monitor_db.Job.fetch('id = 1')[0]
showard77182562009-06-10 00:16:05 +00001515 self.assertFalse(job._atomic_and_has_started())
1516 self._update_hqe("status='Starting'")
1517 self.assertFalse(job._atomic_and_has_started())
1518
1519
showard8cc058f2009-09-08 16:26:33 +00001520 def _check_special_task(self, task, task_type, queue_entry_id=None):
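# Asserts that the captured special task has the expected type, targets
# host 1, and, when queue_entry_id is given, is linked to that queue entry.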
1521 self.assertEquals(task.task, task_type)
1522 self.assertEquals(task.host.id, 1)
1523 if queue_entry_id:
1524 self.assertEquals(task.queue_entry.id, queue_entry_id)
1525
1526
showardb2e2c322008-10-14 17:33:55 +00001527 def test_run_asynchronous(self):
1528 self._create_job(hosts=[1, 2])
1529
showard8cc058f2009-09-08 16:26:33 +00001530 task = self._test_pre_job_tasks_helper()
showardb2e2c322008-10-14 17:33:55 +00001531
showard8cc058f2009-09-08 16:26:33 +00001532 self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)
showardb2e2c322008-10-14 17:33:55 +00001533
showardb2e2c322008-10-14 17:33:55 +00001534
showard9976ce92008-10-15 20:28:13 +00001535 def test_run_asynchronous_skip_verify(self):
1536 job = self._create_job(hosts=[1, 2])
1537 job.run_verify = False
1538 job.save()
1539
showard8cc058f2009-09-08 16:26:33 +00001540 task = self._test_pre_job_tasks_helper()
showard9976ce92008-10-15 20:28:13 +00001541
showard8cc058f2009-09-08 16:26:33 +00001542 self.assertEquals(task, None)
showard9976ce92008-10-15 20:28:13 +00001543
1544
showardb2e2c322008-10-14 17:33:55 +00001545 def test_run_synchronous_verify(self):
1546 self._create_job(hosts=[1, 2], synchronous=True)
1547
showard8cc058f2009-09-08 16:26:33 +00001548 task = self._test_pre_job_tasks_helper()
1549
1550 self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)
showardb2e2c322008-10-14 17:33:55 +00001551
1552
showard9976ce92008-10-15 20:28:13 +00001553 def test_run_synchronous_skip_verify(self):
1554 job = self._create_job(hosts=[1, 2], synchronous=True)
1555 job.run_verify = False
1556 job.save()
1557
showard8cc058f2009-09-08 16:26:33 +00001558 task = self._test_pre_job_tasks_helper()
1559
1560 self.assertEquals(task, None)
showard9976ce92008-10-15 20:28:13 +00001561
1562
showardb2e2c322008-10-14 17:33:55 +00001563 def test_run_synchronous_ready(self):
1564 self._create_job(hosts=[1, 2], synchronous=True)
showardd9ac4452009-02-07 02:04:37 +00001565 self._update_hqe("status='Pending', execution_subdir=''")
showardb2e2c322008-10-14 17:33:55 +00001566
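# With both HQEs already Pending, the synchronous job runs immediately
# and a single QueueTask covering both entries is scheduled.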
showard8cc058f2009-09-08 16:26:33 +00001567 queue_task = self._test_run_helper(expect_starting=True)
showardb2e2c322008-10-14 17:33:55 +00001568
1569 self.assert_(isinstance(queue_task, monitor_db.QueueTask))
1570 self.assertEquals(queue_task.job.id, 1)
1571 hqe_ids = [hqe.id for hqe in queue_task.queue_entries]
1572 self.assertEquals(hqe_ids, [1, 2])
1573
1574
showard77182562009-06-10 00:16:05 +00001575 def test_run_atomic_group_already_started(self):
1576 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
1577 self._update_hqe("status='Starting', execution_subdir=''")
1578
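# The atomic group job has already started, so run() should be a no-op
# and return None.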
showard8cc058f2009-09-08 16:26:33 +00001579 job = monitor_db.Job.fetch('id = 1')[0]
1580 queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0]
showard77182562009-06-10 00:16:05 +00001581 assert queue_entry.job is job
1582 self.assertEqual(None, job.run(queue_entry))
1583
1584 self.god.check_playback()
1585
1586
showardf1ae3542009-05-11 19:26:02 +00001587 def test_run_synchronous_atomic_group_ready(self):
1588 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
1589 self._update_hqe("status='Pending', execution_subdir=''")
1590
showard8cc058f2009-09-08 16:26:33 +00001591 queue_task = self._test_run_helper(expect_starting=True)
showardf1ae3542009-05-11 19:26:02 +00001592
1593 self.assert_(isinstance(queue_task, monitor_db.QueueTask))
showard77182562009-06-10 00:16:05 +00001594 # Atomic group jobs that do not depend on a specific label in the
1595 # atomic group will use the atomic group name as their group name.
showardd1195652009-12-08 22:21:02 +00001596 self.assertEquals(queue_task.queue_entries[0].get_group_name(),
1597 'atomic1')
showardf1ae3542009-05-11 19:26:02 +00001598
1599
1600 def test_run_synchronous_atomic_group_with_label_ready(self):
1601 job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
1602 job.dependency_labels.add(self.label4)
1603 self._update_hqe("status='Pending', execution_subdir=''")
1604
showard8cc058f2009-09-08 16:26:33 +00001605 queue_task = self._test_run_helper(expect_starting=True)
showardf1ae3542009-05-11 19:26:02 +00001606
1607 self.assert_(isinstance(queue_task, monitor_db.QueueTask))
1608 # Atomic group jobs that also specify a label in the atomic group
1609 # will use the label name as their group name.
showardd1195652009-12-08 22:21:02 +00001610 self.assertEquals(queue_task.queue_entries[0].get_group_name(),
1611 'label4')
showardf1ae3542009-05-11 19:26:02 +00001612
1613
showard21baa452008-10-21 00:08:39 +00001614 def test_reboot_before_always(self):
1615 job = self._create_job(hosts=[1])
showard0fc38302008-10-23 00:44:07 +00001616 job.reboot_before = models.RebootBefore.ALWAYS
showard21baa452008-10-21 00:08:39 +00001617 job.save()
1618
showard8cc058f2009-09-08 16:26:33 +00001619 task = self._test_pre_job_tasks_helper()
1620
1621 self._check_special_task(task, models.SpecialTask.Task.CLEANUP)
showard21baa452008-10-21 00:08:39 +00001622
1623
1624 def _test_reboot_before_if_dirty_helper(self, expect_reboot):
1625 job = self._create_job(hosts=[1])
showard0fc38302008-10-23 00:44:07 +00001626 job.reboot_before = models.RebootBefore.IF_DIRTY
showard21baa452008-10-21 00:08:39 +00001627 job.save()
1628
showard8cc058f2009-09-08 16:26:33 +00001629 task = self._test_pre_job_tasks_helper()
showard21baa452008-10-21 00:08:39 +00001630 if expect_reboot:
showard8cc058f2009-09-08 16:26:33 +00001631 task_type = models.SpecialTask.Task.CLEANUP
1632 else:
1633 task_type = models.SpecialTask.Task.VERIFY
1634 self._check_special_task(task, task_type)
showard21baa452008-10-21 00:08:39 +00001635
showard77182562009-06-10 00:16:05 +00001636
showard21baa452008-10-21 00:08:39 +00001637 def test_reboot_before_if_dirty(self):
1638 models.Host.smart_get(1).update_object(dirty=True)
1639 self._test_reboot_before_if_dirty_helper(True)
1640
1641
1642 def test_reboot_before_not_dirty(self):
1643 models.Host.smart_get(1).update_object(dirty=False)
1644 self._test_reboot_before_if_dirty_helper(False)
1645
1646
showardf1ae3542009-05-11 19:26:02 +00001647 def test_next_group_name(self):
1648 django_job = self._create_job(metahosts=[1])
1649 job = monitor_db.Job(id=django_job.id)
1650 self.assertEqual('group0', job._next_group_name())
1651
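# With an existing HQE assigned to 'my_rack.group0', the next group name
# for the 'my/rack' prefix should advance to group1 (the '/' is sanitized
# to '_' in the directory name).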
1652 for hqe in django_job.hostqueueentry_set.filter():
1653 hqe.execution_subdir = 'my_rack.group0'
1654 hqe.save()
1655 self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))
1656
1657
1658class TopLevelFunctionsTest(unittest.TestCase):
mblighe7d9c602009-07-02 19:02:33 +00001659 def setUp(self):
1660 self.god = mock.mock_god()
1661
1662
1663 def tearDown(self):
1664 self.god.unstub_all()
1665
1666
showardf1ae3542009-05-11 19:26:02 +00001667 def test_autoserv_command_line(self):
1668 machines = 'abcd12,efgh34'
showardf1ae3542009-05-11 19:26:02 +00001669 extra_args = ['-Z', 'hello']
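# The command line is compared as a set of arguments: only the presence
# of flags matters here, not their relative ordering.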
showardf65b7402009-12-18 22:44:35 +00001670 expected_command_line_base = set((monitor_db._autoserv_path, '-p',
1671 '-m', machines, '-r',
1672 drone_manager.WORKING_DIRECTORY))
showardf1ae3542009-05-11 19:26:02 +00001673
showardf65b7402009-12-18 22:44:35 +00001674 expected_command_line = expected_command_line_base.union(
1675 ['--verbose']).union(extra_args)
1676 command_line = set(
1677 monitor_db._autoserv_command_line(machines, extra_args))
1678 self.assertEqual(expected_command_line, command_line)
showardf1ae3542009-05-11 19:26:02 +00001679
1680 class FakeJob(object):
1681 owner = 'Bob'
1682 name = 'fake job name'
mblighe7d9c602009-07-02 19:02:33 +00001683 id = 1337
1684
1685 class FakeHQE(object):
1686 job = FakeJob
showardf1ae3542009-05-11 19:26:02 +00001687
showardf65b7402009-12-18 22:44:35 +00001688 expected_command_line = expected_command_line_base.union(
1689 ['-u', FakeJob.owner, '-l', FakeJob.name])
1690 command_line = set(monitor_db._autoserv_command_line(
1691 machines, extra_args=[], queue_entry=FakeHQE, verbose=False))
1692 self.assertEqual(expected_command_line, command_line)
showardf1ae3542009-05-11 19:26:02 +00001693
showard21baa452008-10-21 00:08:39 +00001694
showardce38e0c2008-05-29 19:36:16 +00001695if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00001696 unittest.main()