#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
import MySQLdb
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
98 "global_config's SCHEDULER section to enabled it. Exiting.")
        print msg
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


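# Illustrative example of the command line built by _autoserv_command_line()
# below (the host names, job values and paths are hypothetical; the autoserv
# binary path comes from drones.AUTOTEST_INSTALL_DIR and results_dir is made
# absolute by the drone manager):
#   _autoserv_command_line('host1,host2', '123-someuser/group0',
#                          extra_args=['--verbose'], job=job)
#   => [_autoserv_path, '-p', '-m', 'host1,host2',
#       '-r', '<results repository>/123-someuser/group0',
#       '-u', job.owner, '-l', job.name, '--verbose']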
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                     '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    return autoserv_argv + extra_args


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
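    """
    Maps pending HostQueueEntries to ready hosts for a single scheduling cycle.

    refresh() snapshots the ready hosts plus the ACL, label and dependency
    relationships relevant to the pending entries; find_eligible_host() and
    find_eligible_atomic_group() then hand out hosts from that cached state,
    removing each host as it is assigned so it is not given out twice in the
    same cycle.
    """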
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Raises an exception if more than
        one atomic group is found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                 or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
430 """
431 return (id for id, label in self._labels.iteritems()
432 if label.atomic_group_id == atomic_group_id
433 and not label.invalid)
434
435
showard54c1ea92009-05-20 00:32:58 +0000436 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000437 """
438 @param group_hosts - A sequence of Host ids to test for usability
439 and eligibility against the Job associated with queue_entry.
440 @param queue_entry - The HostQueueEntry that these hosts are being
441 tested for eligibility against.
442
443 @returns A subset of group_hosts Host ids that are eligible for the
444 supplied queue_entry.
445 """
446 return set(host_id for host_id in group_hosts
447 if self._is_host_usable(host_id)
448 and self._is_host_eligible_for_job(host_id, queue_entry))
449
450
showard989f25d2008-10-01 11:38:11 +0000451 def _is_host_eligible_for_job(self, host_id, queue_entry):
showard2924b0a2009-06-18 23:16:15 +0000452 if self._is_host_invalid(host_id):
453 # if an invalid host is scheduled for a job, it's a one-time host
454 # and it therefore bypasses eligibility checks. note this can only
455 # happen for non-metahosts, because invalid hosts have their label
456 # relationships cleared.
457 return True
458
showard989f25d2008-10-01 11:38:11 +0000459 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
460 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000461
showard89f84db2009-03-12 20:39:13 +0000462 return (self._is_acl_accessible(host_id, queue_entry) and
463 self._check_job_dependencies(job_dependencies, host_labels) and
464 self._check_only_if_needed_labels(
465 job_dependencies, host_labels, queue_entry) and
466 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000467
468
showard2924b0a2009-06-18 23:16:15 +0000469 def _is_host_invalid(self, host_id):
470 host_object = self._hosts_available.get(host_id, None)
471 return host_object and host_object.invalid
472
473
showard63a34772008-08-18 19:32:50 +0000474 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000475 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000476 return None
477 return self._hosts_available.pop(queue_entry.host_id, None)
478
479
480 def _is_host_usable(self, host_id):
481 if host_id not in self._hosts_available:
482 # host was already used during this scheduling cycle
483 return False
484 if self._hosts_available[host_id].invalid:
485 # Invalid hosts cannot be used for metahosts. They're included in
486 # the original query because they can be used by non-metahosts.
487 return False
488 return True
489
490
491 def _schedule_metahost(self, queue_entry):
492 label_id = queue_entry.meta_host
493 hosts_in_label = self._label_hosts.get(label_id, set())
494 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
495 set())
496
497 # must iterate over a copy so we can mutate the original while iterating
498 for host_id in list(hosts_in_label):
499 if not self._is_host_usable(host_id):
500 hosts_in_label.remove(host_id)
501 continue
502 if host_id in ineligible_host_ids:
503 continue
showard989f25d2008-10-01 11:38:11 +0000504 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000505 continue
506
showard89f84db2009-03-12 20:39:13 +0000507 # Remove the host from our cached internal state before returning
508 # the host object.
showard63a34772008-08-18 19:32:50 +0000509 hosts_in_label.remove(host_id)
510 return self._hosts_available.pop(host_id)
511 return None
512
513
514 def find_eligible_host(self, queue_entry):
515 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000516 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000517 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000518 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000519 return self._schedule_metahost(queue_entry)
520
521
showard89f84db2009-03-12 20:39:13 +0000522 def find_eligible_atomic_group(self, queue_entry):
523 """
524 Given an atomic group host queue entry, locate an appropriate group
525 of hosts for the associated job to run on.
526
527 The caller is responsible for creating new HQEs for the additional
528 hosts returned in order to run the actual job on them.
529
530 @returns A list of Host instances in a ready state to satisfy this
531 atomic group scheduling. Hosts will all belong to the same
532 atomic group label as specified by the queue_entry.
533 An empty list will be returned if no suitable atomic
534 group could be found.
535
536 TODO(gps): what is responsible for kicking off any attempted repairs on
537 a group of hosts? not this function, but something needs to. We do
538 not communicate that reason for returning [] outside of here...
539 For now, we'll just be unschedulable if enough hosts within one group
540 enter Repair Failed state.
541 """
542 assert queue_entry.atomic_group_id is not None
543 job = queue_entry.job
544 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000545 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000546 if job.synch_count > atomic_group.max_number_of_machines:
547 # Such a Job and HostQueueEntry should never be possible to
548 # create using the frontend. Regardless, we can't process it.
549 # Abort it immediately and log an error on the scheduler.
550 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000551 logging.error(
552 'Error: job %d synch_count=%d > requested atomic_group %d '
553 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
554 job.id, job.synch_count, atomic_group.id,
555 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000556 return []
557 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
558 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
559 set())
560
561 # Look in each label associated with atomic_group until we find one with
562 # enough hosts to satisfy the job.
563 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
564 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
565 if queue_entry.meta_host is not None:
566 # If we have a metahost label, only allow its hosts.
567 group_hosts.intersection_update(hosts_in_label)
568 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000569 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000570 group_hosts, queue_entry)
571
572 # Job.synch_count is treated as "minimum synch count" when
573 # scheduling for an atomic group of hosts. The atomic group
574 # number of machines is the maximum to pick out of a single
575 # atomic group label for scheduling at one time.
576 min_hosts = job.synch_count
577 max_hosts = atomic_group.max_number_of_machines
578
showard54c1ea92009-05-20 00:32:58 +0000579 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000580 # Not enough eligible hosts in this atomic group label.
581 continue
582
showard54c1ea92009-05-20 00:32:58 +0000583 eligible_hosts_in_group = [self._hosts_available[id]
584 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000585 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000586 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000587
showard89f84db2009-03-12 20:39:13 +0000588 # Limit ourselves to scheduling the atomic group size.
589 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000590 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000591
592 # Remove the selected hosts from our cached internal state
593 # of available hosts in order to return the Host objects.
594 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000595 for host in eligible_hosts_in_group:
596 hosts_in_label.discard(host.id)
597 self._hosts_available.pop(host.id)
598 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000599 return host_list
600
601 return []
602
603
showard170873e2009-01-07 00:22:26 +0000604class Dispatcher(object):
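    """
    Top-level scheduling loop driver.

    Owns the set of active Agents, recovers running/gathering/parsing entries
    and orphaned autoserv processes at startup, and on every tick() schedules
    new jobs and advances each Agent while respecting the process throttling
    limits from scheduler_config.
    """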
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
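        """
        Run one scheduling cycle: refresh drone state, do periodic cleanup,
        process aborts, reverify requests and recurring runs, schedule new
        jobs, advance all agents, then flush drone actions and queued email.
        """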
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)


    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
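        """
        Shared recovery helper: for each queue entry in the given status that
        has no agent yet, attach a PidfileRunMonitor to pidfile_name under the
        entry's execution tag and pass the entry's group and the monitor (or
        None if no process was found) to recover_entries_fn. Any process that
        is matched is removed from orphans.
        """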
        queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # synchronous job we've already recovered
                continue
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            execution_tag = queue_entry.execution_tag()
            run_monitor = PidfileRunMonitor()
            run_monitor.attach_to_existing_process(execution_tag,
                                                   pidfile_name=pidfile_name)

            log_message = ('Recovering %s entry %s ' %
                           (status.lower(),
                            ', '.join(str(entry) for entry in queue_entries)))
            if not run_monitor.has_process():
                # execution apparently never happened
                logging.info(log_message + 'without process')
                recover_entries_fn(queue_entry.job, queue_entries, None)
                continue

            logging.info(log_message + '(process %s)',
                         run_monitor.get_process())
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
            orphans.discard(run_monitor.get_process())


    def _kill_remaining_orphan_processes(self, orphans):
        for process in orphans:
            logging.info('Killing orphan %s', process)
            _drone_manager.kill_process(process)


    def _recover_running_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            if run_monitor is not None:
                queue_task = RecoveryQueueTask(job=job,
                                               queue_entries=queue_entries,
                                               run_monitor=run_monitor)
                self.add_agent(Agent(tasks=[queue_task],
                                     num_processes=len(queue_entries)))
            # else, _requeue_other_active_entries will cover this

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, '.autoserv_execute',
                                          recover_entries)


    def _recover_gathering_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)


    def _recover_parsing_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)


    def _recover_all_recoverable_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)


    def _requeue_other_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND '
                  '(aborted OR status != "Pending")')
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # entry has already been recovered
                continue
            if queue_entry.aborted:
                queue_entry.abort(self)
                continue

            logging.info('Requeuing active QE %s (status=%s)', queue_entry,
                         queue_entry.status)
            if queue_entry.host:
                tasks = queue_entry.host.reverify_tasks()
                self.add_agent(Agent(tasks))
            agent = queue_entry.requeue()


    def _find_reverify(self):
        tasks = models.SpecialTask.objects.filter(
            task=models.SpecialTask.Task.REVERIFY, is_active=False,
            is_complete=False)

        host_ids = [str(task.host.id) for task in tasks]

        if host_ids:
            where = 'id IN (%s)' % ','.join(host_ids)
            host_ids_reverifying = self._reverify_hosts_where(
                where, cleanup=False)
            tasks = tasks.filter(host__id__in=host_ids_reverifying)
            for task in tasks:
                task.is_active = True
                task.save()


    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s',
                              cleanup=True):
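        """
        Schedule reverify task agents for every unlocked, valid host matching
        `where` that does not already have an agent. Returns the ids of the
        hosts that were scheduled for reverification.
        """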
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        host_ids_reverifying = []
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            tasks = host.reverify_tasks(cleanup)
            self.add_agent(Agent(tasks))
            host_ids_reverifying.append(host.id)
        return host_ids_reverifying


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            if (queue_entry.atomic_group_id is None or
                queue_entry.host_id is not None):
                assigned_host = self._host_scheduler.find_eligible_host(
                    queue_entry)
                if assigned_host:
                    self._run_queue_entry(queue_entry, assigned_host)
            else:
                self._schedule_atomic_group(queue_entry)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a sequence of tasks.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        failure_tasks - A sequence of tasks to be run using a new Agent
                by the dispatcher should this task fail.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.

    The following attribute is written to all task objects:
        agent - A reference to the Agent instance that the task has been
                added to.
    """


    def __init__(self, tasks, num_processes=1):
        """
        @param tasks: A list of tasks as described in the class docstring.
        @param num_processes: The number of subprocesses the Agent represents.
                This is used by the Dispatcher for managing the load on the
                system. Defaults to 1.
        """
        self.active_task = None
        self.queue = None
        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
showard21baa452008-10-21 00:08:39 +00001256 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001257 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001258 self.active_task.poll()
1259 if not self.active_task.is_done():
1260 return
1261 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001262
1263
jadmanski0afbb632008-06-06 21:10:57 +00001264 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001265 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001266 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001267 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001268 if not self.active_task.success:
1269 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001270 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001271
jadmanski0afbb632008-06-06 21:10:57 +00001272 if not self.is_done():
1273 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001274
1275
jadmanski0afbb632008-06-06 21:10:57 +00001276 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001277 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001278 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1279 # get reset.
1280 new_agent = Agent(self.active_task.failure_tasks)
1281 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001282
mblighe2586682008-02-29 22:45:46 +00001283
showard4c5374f2008-09-04 17:02:56 +00001284 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001285 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001286
1287
jadmanski0afbb632008-06-06 21:10:57 +00001288 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001289 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001290
1291
showardd3dc1992009-04-22 21:01:40 +00001292 def abort(self):
showard08a36412009-05-05 01:01:13 +00001293 # abort tasks until the queue is empty or a task ignores the abort
1294 while not self.is_done():
1295 if not self.active_task:
1296 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001297 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001298 if not self.active_task.aborted:
1299 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001300 return
1301 self.active_task = None
1302
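# Illustrative sketch (not part of the original scheduler): assembling an
# Agent from tasks and handing it to the dispatcher, mirroring the pattern
# used by the task classes below (compare Host.reverify_tasks()).
# 'dispatcher' is assumed to be the running Dispatcher instance and
# _example_enqueue_reverify is a hypothetical helper name.
def _example_enqueue_reverify(dispatcher, host):
    # The cleanup runs first; the verify only runs if cleanup succeeds.  If
    # either task fails, its failure_tasks (a RepairTask) run in a new Agent
    # created by the dispatcher.
    tasks = [CleanupTask(host=host), VerifyTask(host=host)]
    dispatcher.add_agent(Agent(tasks))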
showardd3dc1992009-04-22 21:01:40 +00001303
showard77182562009-06-10 00:16:05 +00001304class DelayedCallTask(object):
1305 """
1306 A task object like AgentTask for an Agent to run that waits for the
1307 specified amount of time to have elapsed before calling the supplied
1308 callback once and finishing. If the callback returns anything, it is
1309 assumed to be a new Agent instance and will be added to the dispatcher.
1310
1311 @attribute end_time: The absolute posix time after which this task will
1312 call its callback when it is polled and be finished.
1313
1314 Also has all attributes required by the Agent class.
1315 """
1316 def __init__(self, delay_seconds, callback, now_func=None):
1317 """
1318 @param delay_seconds: The delay in seconds from now that this task
1319 will call the supplied callback and be done.
1320 @param callback: A callable to be called by this task once after at
1321 least delay_seconds time has elapsed. It must return None
1322 or a new Agent instance.
1323 @param now_func: A time.time like function. Default: time.time.
1324 Used for testing.
1325 """
1326 assert delay_seconds > 0
1327 assert callable(callback)
1328 if not now_func:
1329 now_func = time.time
1330 self._now_func = now_func
1331 self._callback = callback
1332
1333 self.end_time = self._now_func() + delay_seconds
1334
1335 # These attributes are required by Agent.
1336 self.aborted = False
1337 self.failure_tasks = ()
1338 self.host_ids = ()
1339 self.success = False
1340 self.queue_entry_ids = ()
1341 # This is filled in by Agent.add_task().
1342 self.agent = None
1343
1344
1345 def poll(self):
1346 if self._callback and self._now_func() >= self.end_time:
1347 new_agent = self._callback()
1348 if new_agent:
1349 self.agent.dispatcher.add_agent(new_agent)
1350 self._callback = None
1351 self.success = True
1352
1353
1354 def is_done(self):
1355 return not self._callback
1356
1357
1358 def abort(self):
1359 self.aborted = True
1360 self._callback = None
1361
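# Illustrative sketch (not part of the original scheduler): scheduling a
# callback to fire roughly one minute from now.  The callback must return
# None or a new Agent; if it returns an Agent, DelayedCallTask hands it to
# the dispatcher.  _example_schedule_callback is a hypothetical helper name.
def _example_schedule_callback(dispatcher, callback):
    delayed_task = DelayedCallTask(delay_seconds=60, callback=callback)
    # num_processes=0 since waiting for the delay consumes no subprocess.
    dispatcher.add_agent(Agent([delayed_task], num_processes=0))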
1362
mbligh36768f02008-02-22 18:28:33 +00001363class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001364 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1365 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001366 self.done = False
1367 self.failure_tasks = failure_tasks
1368 self.started = False
1369 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001370 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001371 self.task = None
1372 self.agent = None
1373 self.monitor = None
1374 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001375 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001376 self.queue_entry_ids = []
1377 self.host_ids = []
1378 self.log_file = None
1379
1380
1381 def _set_ids(self, host=None, queue_entries=None):
1382 if queue_entries and queue_entries != [None]:
1383 self.host_ids = [entry.host.id for entry in queue_entries]
1384 self.queue_entry_ids = [entry.id for entry in queue_entries]
1385 else:
1386 assert host
1387 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001388
1389
jadmanski0afbb632008-06-06 21:10:57 +00001390 def poll(self):
showard08a36412009-05-05 01:01:13 +00001391 if not self.started:
1392 self.start()
1393 self.tick()
1394
1395
1396 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001397 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001398 exit_code = self.monitor.exit_code()
1399 if exit_code is None:
1400 return
1401 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001402 else:
1403 success = False
mbligh36768f02008-02-22 18:28:33 +00001404
jadmanski0afbb632008-06-06 21:10:57 +00001405 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001406
1407
jadmanski0afbb632008-06-06 21:10:57 +00001408 def is_done(self):
1409 return self.done
mbligh36768f02008-02-22 18:28:33 +00001410
1411
jadmanski0afbb632008-06-06 21:10:57 +00001412 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001413 if self.done:
1414 return
jadmanski0afbb632008-06-06 21:10:57 +00001415 self.done = True
1416 self.success = success
1417 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001418
1419
jadmanski0afbb632008-06-06 21:10:57 +00001420 def prolog(self):
1421 pass
mblighd64e5702008-04-04 21:39:28 +00001422
1423
jadmanski0afbb632008-06-06 21:10:57 +00001424 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001425 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001426
mbligh36768f02008-02-22 18:28:33 +00001427
jadmanski0afbb632008-06-06 21:10:57 +00001428 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001429 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001430 _drone_manager.copy_to_results_repository(
1431 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001432
1433
jadmanski0afbb632008-06-06 21:10:57 +00001434 def epilog(self):
1435 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001436
1437
jadmanski0afbb632008-06-06 21:10:57 +00001438 def start(self):
1439 assert self.agent
1440
1441 if not self.started:
1442 self.prolog()
1443 self.run()
1444
1445 self.started = True
1446
1447
1448 def abort(self):
1449 if self.monitor:
1450 self.monitor.kill()
1451 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001452 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001453 self.cleanup()
1454
1455
showard170873e2009-01-07 00:22:26 +00001456 def set_host_log_file(self, base_name, host):
1457 filename = '%s.%s' % (time.time(), base_name)
1458 self.log_file = os.path.join('hosts', host.hostname, filename)
1459
1460
showardde634ee2009-01-30 01:44:24 +00001461 def _get_consistent_execution_tag(self, queue_entries):
1462 first_execution_tag = queue_entries[0].execution_tag()
1463 for queue_entry in queue_entries[1:]:
1464 assert queue_entry.execution_tag() == first_execution_tag, (
1465 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1466 queue_entry,
1467 first_execution_tag,
1468 queue_entries[0]))
1469 return first_execution_tag
1470
1471
showarda1e74b32009-05-12 17:32:04 +00001472 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001473 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001474 if use_monitor is None:
1475 assert self.monitor
1476 use_monitor = self.monitor
1477 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001478 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001479 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001480 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001481 results_path)
showardde634ee2009-01-30 01:44:24 +00001482
showarda1e74b32009-05-12 17:32:04 +00001483
1484 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001485 reparse_task = FinalReparseTask(queue_entries)
1486 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1487
1488
showarda1e74b32009-05-12 17:32:04 +00001489 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1490 self._copy_results(queue_entries, use_monitor)
1491 self._parse_results(queue_entries)
1492
1493
showardd3dc1992009-04-22 21:01:40 +00001494 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001495 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001496 self.monitor = PidfileRunMonitor()
1497 self.monitor.run(self.cmd, self._working_directory,
1498 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001499 log_file=self.log_file,
1500 pidfile_name=pidfile_name,
1501 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001502
1503
showardd9205182009-04-27 20:09:55 +00001504class TaskWithJobKeyvals(object):
1505 """AgentTask mixin providing functionality to help with job keyval files."""
1506 _KEYVAL_FILE = 'keyval'
1507 def _format_keyval(self, key, value):
1508 return '%s=%s' % (key, value)
1509
1510
1511 def _keyval_path(self):
1512 """Subclasses must override this"""
1513 raise NotImplementedError
1514
1515
1516 def _write_keyval_after_job(self, field, value):
1517 assert self.monitor
1518 if not self.monitor.has_process():
1519 return
1520 _drone_manager.write_lines_to_file(
1521 self._keyval_path(), [self._format_keyval(field, value)],
1522 paired_with_process=self.monitor.get_process())
1523
1524
1525 def _job_queued_keyval(self, job):
1526 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1527
1528
1529 def _write_job_finished(self):
1530 self._write_keyval_after_job("job_finished", int(time.time()))
1531
1532
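# Illustrative sketch (not part of the original scheduler): a task that wants
# the keyval helpers only needs to say where its keyval file lives; the mixin
# then appends 'key=value' lines (e.g. 'job_finished=1244678400') next to the
# job results.  _ExampleKeyvalTask is a hypothetical class.
class _ExampleKeyvalTask(AgentTask, TaskWithJobKeyvals):
    def _keyval_path(self):
        return os.path.join(self._working_directory, self._KEYVAL_FILE)
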
1533class RepairTask(AgentTask, TaskWithJobKeyvals):
showarde788ea62008-11-17 21:02:47 +00001534 def __init__(self, host, queue_entry=None):
jadmanski0afbb632008-06-06 21:10:57 +00001535 """\
showard170873e2009-01-07 00:22:26 +00001536 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001537 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001538 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001539 # normalize the protection name
1540 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001541
jadmanski0afbb632008-06-06 21:10:57 +00001542 self.host = host
showardccbd6c52009-03-21 00:10:21 +00001543 self.queue_entry_to_fail = queue_entry
1544 # *don't* include the queue entry in IDs -- if the queue entry is
1545 # aborted, we want to leave the repair task running
1546 self._set_ids(host=host)
showard170873e2009-01-07 00:22:26 +00001547
1548 self.create_temp_resultsdir('.repair')
showard87ba02a2009-04-20 19:37:32 +00001549 cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1550 ['-R', '--host-protection', protection],
1551 queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001552 super(RepairTask, self).__init__(cmd, self.temp_results_dir)
1553
showard170873e2009-01-07 00:22:26 +00001554 self.set_host_log_file('repair', self.host)
mblighe2586682008-02-29 22:45:46 +00001555
mbligh36768f02008-02-22 18:28:33 +00001556
jadmanski0afbb632008-06-06 21:10:57 +00001557 def prolog(self):
showardb18134f2009-03-20 20:52:18 +00001558 logging.info("repair_task starting")
jadmanski0afbb632008-06-06 21:10:57 +00001559 self.host.set_status('Repairing')
showardccbd6c52009-03-21 00:10:21 +00001560 if self.queue_entry_to_fail:
1561 self.queue_entry_to_fail.requeue()
mbligh36768f02008-02-22 18:28:33 +00001562
1563
showardd9205182009-04-27 20:09:55 +00001564 def _keyval_path(self):
1565 return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)
1566
1567
showardde634ee2009-01-30 01:44:24 +00001568 def _fail_queue_entry(self):
showardccbd6c52009-03-21 00:10:21 +00001569 assert self.queue_entry_to_fail
1570
1571 if self.queue_entry_to_fail.meta_host:
1572 return # don't fail metahost entries, they'll be reassigned
1573
1574 self.queue_entry_to_fail.update_from_database()
1575 if self.queue_entry_to_fail.status != 'Queued':
1576 return # entry has been aborted
1577
1578 self.queue_entry_to_fail.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001579 queued_key, queued_time = self._job_queued_keyval(
1580 self.queue_entry_to_fail.job)
1581 self._write_keyval_after_job(queued_key, queued_time)
1582 self._write_job_finished()
showard678df4f2009-02-04 21:36:39 +00001583 # copy results logs into the normal place for job results
1584 _drone_manager.copy_results_on_drone(
1585 self.monitor.get_process(),
1586 source_path=self.temp_results_dir + '/',
showardccbd6c52009-03-21 00:10:21 +00001587 destination_path=self.queue_entry_to_fail.execution_tag() + '/')
showard678df4f2009-02-04 21:36:39 +00001588
showarda1e74b32009-05-12 17:32:04 +00001589 self._copy_results([self.queue_entry_to_fail])
1590 if self.queue_entry_to_fail.job.parse_failed_repair:
1591 self._parse_results([self.queue_entry_to_fail])
showardccbd6c52009-03-21 00:10:21 +00001592 self.queue_entry_to_fail.handle_host_failure()
showardde634ee2009-01-30 01:44:24 +00001593
1594
jadmanski0afbb632008-06-06 21:10:57 +00001595 def epilog(self):
1596 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001597
1598 tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
1599 is_active=True)
1600 for task in tasks:
1601 task.is_complete = True
1602 task.save()
1603
jadmanski0afbb632008-06-06 21:10:57 +00001604 if self.success:
1605 self.host.set_status('Ready')
1606 else:
1607 self.host.set_status('Repair Failed')
showardccbd6c52009-03-21 00:10:21 +00001608 if self.queue_entry_to_fail:
showardde634ee2009-01-30 01:44:24 +00001609 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001610
1611
showard8fe93b52008-11-18 17:53:22 +00001612class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001613 def epilog(self):
1614 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001615 should_copy_results = (self.queue_entry and not self.success
1616 and not self.queue_entry.meta_host)
1617 if should_copy_results:
1618 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001619 destination = os.path.join(self.queue_entry.execution_tag(),
1620 os.path.basename(self.log_file))
1621 _drone_manager.copy_to_results_repository(
1622 self.monitor.get_process(), self.log_file,
1623 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001624
1625
1626class VerifyTask(PreJobTask):
showard9976ce92008-10-15 20:28:13 +00001627 def __init__(self, queue_entry=None, host=None):
jadmanski0afbb632008-06-06 21:10:57 +00001628 assert bool(queue_entry) != bool(host)
jadmanski0afbb632008-06-06 21:10:57 +00001629 self.host = host or queue_entry.host
1630 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001631
jadmanski0afbb632008-06-06 21:10:57 +00001632 self.create_temp_resultsdir('.verify')
showard87ba02a2009-04-20 19:37:32 +00001633 cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
1634 ['-v'], queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001635 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
showard170873e2009-01-07 00:22:26 +00001636 super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
1637 failure_tasks=failure_tasks)
mblighe2586682008-02-29 22:45:46 +00001638
showard170873e2009-01-07 00:22:26 +00001639 self.set_host_log_file('verify', self.host)
1640 self._set_ids(host=host, queue_entries=[queue_entry])
mblighe2586682008-02-29 22:45:46 +00001641
1642
jadmanski0afbb632008-06-06 21:10:57 +00001643 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001644 super(VerifyTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001645 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001646 if self.queue_entry:
1647 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001648 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001649
1650
jadmanski0afbb632008-06-06 21:10:57 +00001651 def epilog(self):
1652 super(VerifyTask, self).epilog()
mbligh36768f02008-02-22 18:28:33 +00001653
jadmanski0afbb632008-06-06 21:10:57 +00001654 if self.success:
showard6d7b2ff2009-06-10 00:16:47 +00001655 tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
1656 is_active=True)
1657 for task in tasks:
1658 task.is_complete = True
1659 task.save()
1660
jadmanski0afbb632008-06-06 21:10:57 +00001661 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001662
1663
showardb5626452009-06-30 01:57:28 +00001664class CleanupHostsMixin(object):
1665 def _reboot_hosts(self, job, queue_entries, final_success,
1666 num_tests_failed):
1667 reboot_after = job.reboot_after
1668 do_reboot = (
1669 # always reboot after aborted jobs
1670 self._final_status == models.HostQueueEntry.Status.ABORTED
1671 or reboot_after == models.RebootAfter.ALWAYS
1672 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1673 and final_success and num_tests_failed == 0))
1674
1675 for queue_entry in queue_entries:
1676 if do_reboot:
1677 # don't pass the queue entry to the CleanupTask. if the cleanup
1678 # fails, the job doesn't care -- it's over.
1679 cleanup_task = CleanupTask(host=queue_entry.host)
1680 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1681 else:
1682 queue_entry.host.set_status('Ready')
1683
1684
1685class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showardf1ae3542009-05-11 19:26:02 +00001686 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001687 self.job = job
1688 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001689 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001690 super(QueueTask, self).__init__(cmd, self._execution_tag())
1691 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001692
1693
showard73ec0442009-02-07 02:05:20 +00001694 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001695 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001696
1697
1698 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1699 keyval_contents = '\n'.join(self._format_keyval(key, value)
1700 for key, value in keyval_dict.iteritems())
1701 # always end with a newline to allow additional keyvals to be written
1702 keyval_contents += '\n'
1703 _drone_manager.attach_file_to_execution(self._execution_tag(),
1704 keyval_contents,
1705 file_path=keyval_path)
1706
1707
1708 def _write_keyvals_before_job(self, keyval_dict):
1709 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1710
1711
showard170873e2009-01-07 00:22:26 +00001712 def _write_host_keyvals(self, host):
1713 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1714 host.hostname)
1715 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001716 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1717 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001718
1719
showard170873e2009-01-07 00:22:26 +00001720 def _execution_tag(self):
1721 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001722
1723
jadmanski0afbb632008-06-06 21:10:57 +00001724 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001725 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001726 keyval_dict = {queued_key: queued_time}
1727 if self.group_name:
1728 keyval_dict['host_group_name'] = self.group_name
1729 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001730 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001731 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001732 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001733 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001734 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001735 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001736 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001737 assert len(self.queue_entries) == 1
1738 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001739
1740
showard35162b02009-03-03 02:17:30 +00001741 def _write_lost_process_error_file(self):
1742 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1743 _drone_manager.write_lines_to_file(error_file_path,
1744 [_LOST_PROCESS_ERROR])
1745
1746
showardd3dc1992009-04-22 21:01:40 +00001747 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001748 if not self.monitor:
1749 return
1750
showardd9205182009-04-27 20:09:55 +00001751 self._write_job_finished()
1752
showardd3dc1992009-04-22 21:01:40 +00001753 # both of these conditionals can be true, iff the process ran, wrote a
1754 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001755 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001756 gather_task = GatherLogsTask(self.job, self.queue_entries)
1757 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showardb5626452009-06-30 01:57:28 +00001758 else:
1759 self._reboot_hosts(self.job, self.queue_entries,
1760 final_success=False, num_tests_failed=0)
showard35162b02009-03-03 02:17:30 +00001761
1762 if self.monitor.lost_process:
1763 self._write_lost_process_error_file()
1764 for queue_entry in self.queue_entries:
1765 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001766
1767
showardcbd74612008-11-19 21:42:02 +00001768 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001769 _drone_manager.write_lines_to_file(
1770 os.path.join(self._execution_tag(), 'status.log'),
1771 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001772 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001773
1774
jadmanskif7fa2cc2008-10-01 14:13:23 +00001775 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001776 if not self.monitor or not self.monitor.has_process():
1777 return
1778
jadmanskif7fa2cc2008-10-01 14:13:23 +00001779 # build up sets of all the aborted_by and aborted_on values
1780 aborted_by, aborted_on = set(), set()
1781 for queue_entry in self.queue_entries:
1782 if queue_entry.aborted_by:
1783 aborted_by.add(queue_entry.aborted_by)
1784 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1785 aborted_on.add(t)
1786
1787 # extract some actual, unique aborted by value and write it out
1788 assert len(aborted_by) <= 1
1789 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001790 aborted_by_value = aborted_by.pop()
1791 aborted_on_value = max(aborted_on)
1792 else:
1793 aborted_by_value = 'autotest_system'
1794 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001795
showarda0382352009-02-11 23:36:43 +00001796 self._write_keyval_after_job("aborted_by", aborted_by_value)
1797 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001798
showardcbd74612008-11-19 21:42:02 +00001799 aborted_on_string = str(datetime.datetime.fromtimestamp(
1800 aborted_on_value))
1801 self._write_status_comment('Job aborted by %s on %s' %
1802 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001803
1804
jadmanski0afbb632008-06-06 21:10:57 +00001805 def abort(self):
1806 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001807 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001808 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001809
1810
jadmanski0afbb632008-06-06 21:10:57 +00001811 def epilog(self):
1812 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001813 self._finish_task()
1814 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001815
1816
mblighbb421852008-03-11 22:36:16 +00001817class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001818 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001819 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001820 self.monitor = run_monitor
1821 self.started = True
1822 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001823
1824
jadmanski0afbb632008-06-06 21:10:57 +00001825 def run(self):
showard5add1c82009-05-26 19:27:46 +00001826 raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001827
1828
jadmanski0afbb632008-06-06 21:10:57 +00001829 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001830 raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001831
1832
showardd3dc1992009-04-22 21:01:40 +00001833class PostJobTask(AgentTask):
1834 def __init__(self, queue_entries, pidfile_name, logfile_name,
1835 run_monitor=None):
1836 """
1837 If run_monitor != None, we're recovering a running task.
1838 """
1839 self._queue_entries = queue_entries
1840 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001841
1842 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1843 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1844 self._autoserv_monitor = PidfileRunMonitor()
1845 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1846 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1847
1848 if _testing_mode:
1849 command = 'true'
1850 else:
1851 command = self._generate_command(self._results_dir)
1852
1853 super(PostJobTask, self).__init__(cmd=command,
1854 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001855 # this must happen *after* the super call
1856 self.monitor = run_monitor
1857 if run_monitor:
1858 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001859
1860 self.log_file = os.path.join(self._execution_tag, logfile_name)
1861 self._final_status = self._determine_final_status()
1862
1863
1864 def _generate_command(self, results_dir):
1865 raise NotImplementedError('Subclasses must override this')
1866
1867
1868 def _job_was_aborted(self):
1869 was_aborted = None
1870 for queue_entry in self._queue_entries:
1871 queue_entry.update_from_database()
1872 if was_aborted is None: # first queue entry
1873 was_aborted = bool(queue_entry.aborted)
1874 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1875 email_manager.manager.enqueue_notify_email(
1876 'Inconsistent abort state',
1877 'Queue entries have inconsistent abort state: ' +
1878 ', '.join('%s (%s)' % (entry, entry.aborted) for entry in self._queue_entries))
1879 # don't crash here, just assume true
1880 return True
1881 return was_aborted
1882
1883
1884 def _determine_final_status(self):
1885 if self._job_was_aborted():
1886 return models.HostQueueEntry.Status.ABORTED
1887
1888 # we'll use a PidfileRunMonitor to read the autoserv exit status
1889 if self._autoserv_monitor.exit_code() == 0:
1890 return models.HostQueueEntry.Status.COMPLETED
1891 return models.HostQueueEntry.Status.FAILED
1892
1893
1894 def run(self):
showard5add1c82009-05-26 19:27:46 +00001895 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001896
showard5add1c82009-05-26 19:27:46 +00001897 # make sure we actually have results to work with.
1898 # this should never happen in normal operation.
1899 if not self._autoserv_monitor.has_process():
1900 email_manager.manager.enqueue_notify_email(
1901 'No results in post-job task',
1902 'No results in post-job task at %s' %
1903 self._autoserv_monitor.pidfile_id)
1904 self.finished(False)
1905 return
1906
1907 super(PostJobTask, self).run(
1908 pidfile_name=self._pidfile_name,
1909 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001910
1911
1912 def _set_all_statuses(self, status):
1913 for queue_entry in self._queue_entries:
1914 queue_entry.set_status(status)
1915
1916
1917 def abort(self):
1918 # override AgentTask.abort() to avoid killing the process and ending
1919 # the task. post-job tasks continue when the job is aborted.
1920 pass
1921
1922
showardb5626452009-06-30 01:57:28 +00001923class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00001924 """
1925 Task responsible for
1926 * gathering uncollected logs (if Autoserv crashed hard or was killed)
1927 * copying logs to the results repository
1928 * spawning CleanupTasks for hosts, if necessary
1929 * spawning a FinalReparseTask for the job
1930 """
1931 def __init__(self, job, queue_entries, run_monitor=None):
1932 self._job = job
1933 super(GatherLogsTask, self).__init__(
1934 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
1935 logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
1936 self._set_ids(queue_entries=queue_entries)
1937
1938
1939 def _generate_command(self, results_dir):
1940 host_list = ','.join(queue_entry.host.hostname
1941 for queue_entry in self._queue_entries)
1942 return [_autoserv_path, '-p', '--collect-crashinfo', '-m', host_list,
1943 '-r', results_dir]
1944
1945
1946 def prolog(self):
1947 super(GatherLogsTask, self).prolog()
1948 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
1949
1950
showardd3dc1992009-04-22 21:01:40 +00001951 def epilog(self):
1952 super(GatherLogsTask, self).epilog()
showardebc0fb72009-05-13 21:28:07 +00001953 if self._autoserv_monitor.has_process():
1954 self._copy_and_parse_results(self._queue_entries,
1955 use_monitor=self._autoserv_monitor)
showardb5626452009-06-30 01:57:28 +00001956
1957 final_success = (
1958 self._final_status == models.HostQueueEntry.Status.COMPLETED)
1959 num_tests_failed = self._autoserv_monitor.num_tests_failed()
1960 self._reboot_hosts(self._job, self._queue_entries, final_success,
1961 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00001962
1963
showard0bbfc212009-04-29 21:06:13 +00001964 def run(self):
showard597bfd32009-05-08 18:22:50 +00001965 autoserv_exit_code = self._autoserv_monitor.exit_code()
1966 # only run if Autoserv exited due to some signal. if we have no exit
1967 # code, assume something bad (and signal-like) happened.
1968 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00001969 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00001970 else:
1971 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001972
1973
showard8fe93b52008-11-18 17:53:22 +00001974class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001975 def __init__(self, host=None, queue_entry=None):
1976 assert bool(host) ^ bool(queue_entry)
1977 if queue_entry:
1978 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001979 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001980 self.host = host
showard170873e2009-01-07 00:22:26 +00001981
1982 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001983 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1984 ['--cleanup'],
1985 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001986 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001987 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1988 failure_tasks=[repair_task])
1989
1990 self._set_ids(host=host, queue_entries=[queue_entry])
1991 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001992
mblighd5c95802008-03-05 00:33:46 +00001993
jadmanski0afbb632008-06-06 21:10:57 +00001994 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001995 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001996 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001997 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001998
mblighd5c95802008-03-05 00:33:46 +00001999
showard21baa452008-10-21 00:08:39 +00002000 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002001 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00002002 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002003 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002004 self.host.update_field('dirty', 0)
2005
2006
showardd3dc1992009-04-22 21:01:40 +00002007class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002008 _num_running_parses = 0
2009
showardd3dc1992009-04-22 21:01:40 +00002010 def __init__(self, queue_entries, run_monitor=None):
2011 super(FinalReparseTask, self).__init__(queue_entries,
2012 pidfile_name=_PARSER_PID_FILE,
2013 logfile_name='.parse.log',
2014 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00002015 # don't use _set_ids, since we don't want to set the host_ids
2016 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00002017 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00002018
showard97aed502008-11-04 02:01:24 +00002019
2020 @classmethod
2021 def _increment_running_parses(cls):
2022 cls._num_running_parses += 1
2023
2024
2025 @classmethod
2026 def _decrement_running_parses(cls):
2027 cls._num_running_parses -= 1
2028
2029
2030 @classmethod
2031 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002032 return (cls._num_running_parses <
2033 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002034
2035
2036 def prolog(self):
2037 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002038 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002039
2040
2041 def epilog(self):
2042 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002043 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002044
2045
showardd3dc1992009-04-22 21:01:40 +00002046 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002047 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002048 results_dir]
showard97aed502008-11-04 02:01:24 +00002049
2050
showard08a36412009-05-05 01:01:13 +00002051 def tick(self):
2052 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002053 # and we can, at which point we revert to default behavior
2054 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002055 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002056 else:
2057 self._try_starting_parse()
2058
2059
2060 def run(self):
2061 # override run() to not actually run unless we can
2062 self._try_starting_parse()
2063
2064
2065 def _try_starting_parse(self):
2066 if not self._can_run_new_parse():
2067 return
showard170873e2009-01-07 00:22:26 +00002068
showard97aed502008-11-04 02:01:24 +00002069 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002070 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002071
showard97aed502008-11-04 02:01:24 +00002072 self._increment_running_parses()
2073 self._parse_started = True
2074
2075
2076 def finished(self, success):
2077 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002078 if self._parse_started:
2079 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002080
2081
showardc9ae1782009-01-30 01:42:37 +00002082class SetEntryPendingTask(AgentTask):
2083 def __init__(self, queue_entry):
2084 super(SetEntryPendingTask, self).__init__(cmd='')
2085 self._queue_entry = queue_entry
2086 self._set_ids(queue_entries=[queue_entry])
2087
2088
2089 def run(self):
2090 agent = self._queue_entry.on_pending()
2091 if agent:
2092 self.agent.dispatcher.add_agent(agent)
2093 self.finished(True)
2094
2095
showarda3c58572009-03-12 20:36:59 +00002096class DBError(Exception):
2097 """Raised by the DBObject constructor when its select fails."""
2098
2099
mbligh36768f02008-02-22 18:28:33 +00002100class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002101 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002102
2103 # Subclasses MUST override these:
2104 _table_name = ''
2105 _fields = ()
2106
showarda3c58572009-03-12 20:36:59 +00002107 # A mapping from (type, id) to the instance of the object for that
2108 # particular id. This prevents us from creating new Job() and Host()
2109 # instances for every HostQueueEntry object that we instantiate as
2110 # multiple HQEs often share the same Job.
2111 _instances_by_type_and_id = weakref.WeakValueDictionary()
2112 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002113
showarda3c58572009-03-12 20:36:59 +00002114
2115 def __new__(cls, id=None, **kwargs):
2116 """
2117 Look to see if we already have an instance for this particular type
2118 and id. If so, use it instead of creating a duplicate instance.
2119 """
2120 if id is not None:
2121 instance = cls._instances_by_type_and_id.get((cls, id))
2122 if instance:
2123 return instance
2124 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2125
2126
2127 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002128 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002129 assert self._table_name, '_table_name must be defined in your class'
2130 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002131 if not new_record:
2132 if self._initialized and not always_query:
2133 return # We've already been initialized.
2134 if id is None:
2135 id = row[0]
2136 # Tell future constructors to use us instead of re-querying while
2137 # this instance is still around.
2138 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002139
showard6ae5ea92009-02-25 00:11:51 +00002140 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002141
jadmanski0afbb632008-06-06 21:10:57 +00002142 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002143
jadmanski0afbb632008-06-06 21:10:57 +00002144 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002145 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002146
showarda3c58572009-03-12 20:36:59 +00002147 if self._initialized:
2148 differences = self._compare_fields_in_row(row)
2149 if differences:
showard7629f142009-03-27 21:02:02 +00002150 logging.warn(
2151 'initialized %s %s instance requery is updating: %s',
2152 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002153 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002154 self._initialized = True
2155
2156
2157 @classmethod
2158 def _clear_instance_cache(cls):
2159 """Used for testing, clear the internal instance cache."""
2160 cls._instances_by_type_and_id.clear()
2161
2162
showardccbd6c52009-03-21 00:10:21 +00002163 def _fetch_row_from_db(self, row_id):
2164 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2165 rows = _db.execute(sql, (row_id,))
2166 if not rows:
showard76e29d12009-04-15 21:53:10 +00002167 raise DBError("row not found (table=%s, row id=%s)"
2168 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002169 return rows[0]
2170
2171
showarda3c58572009-03-12 20:36:59 +00002172 def _assert_row_length(self, row):
2173 assert len(row) == len(self._fields), (
2174 "table = %s, row = %s/%d, fields = %s/%d" % (
2175 self.__table, row, len(row), self._fields, len(self._fields)))
2176
2177
2178 def _compare_fields_in_row(self, row):
2179 """
2180 Given a row as returned by a SELECT query, compare it to our existing
2181 in memory fields.
2182
2183 @param row - A sequence of values corresponding to fields named in
2184 The class attribute _fields.
2185
2186 @returns A dictionary listing the differences keyed by field name
2187 containing tuples of (current_value, row_value).
2188 """
2189 self._assert_row_length(row)
2190 differences = {}
2191 for field, row_value in itertools.izip(self._fields, row):
2192 current_value = getattr(self, field)
2193 if current_value != row_value:
2194 differences[field] = (current_value, row_value)
2195 return differences
showard2bab8f42008-11-12 18:15:22 +00002196
2197
2198 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002199 """
2200 Update our field attributes using a single row returned by SELECT.
2201
2202 @param row - A sequence of values corresponding to fields named in
2203 the class fields list.
2204 """
2205 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002206
showard2bab8f42008-11-12 18:15:22 +00002207 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002208 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002209 setattr(self, field, value)
2210 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002211
showard2bab8f42008-11-12 18:15:22 +00002212 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002213
mblighe2586682008-02-29 22:45:46 +00002214
showardccbd6c52009-03-21 00:10:21 +00002215 def update_from_database(self):
2216 assert self.id is not None
2217 row = self._fetch_row_from_db(self.id)
2218 self._update_fields_from_row(row)
2219
2220
jadmanski0afbb632008-06-06 21:10:57 +00002221 def count(self, where, table = None):
2222 if not table:
2223 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002224
jadmanski0afbb632008-06-06 21:10:57 +00002225 rows = _db.execute("""
2226 SELECT count(*) FROM %s
2227 WHERE %s
2228 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002229
jadmanski0afbb632008-06-06 21:10:57 +00002230 assert len(rows) == 1
2231
2232 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002233
2234
showardd3dc1992009-04-22 21:01:40 +00002235 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002236 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002237
showard2bab8f42008-11-12 18:15:22 +00002238 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002239 return
mbligh36768f02008-02-22 18:28:33 +00002240
mblighf8c624d2008-07-03 16:58:45 +00002241 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002242 _db.execute(query, (value, self.id))
2243
showard2bab8f42008-11-12 18:15:22 +00002244 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002245
2246
jadmanski0afbb632008-06-06 21:10:57 +00002247 def save(self):
2248 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002249 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002250 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002251 values = []
2252 for key in keys:
2253 value = getattr(self, key)
2254 if value is None:
2255 values.append('NULL')
2256 else:
2257 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002258 values_str = ','.join(values)
2259 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2260 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002261 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002262 # Update our id to the one the database just assigned to us.
2263 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002264
2265
jadmanski0afbb632008-06-06 21:10:57 +00002266 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002267 self._instances_by_type_and_id.pop((type(self), self.id), None)
2268 self._initialized = False
2269 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002270 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2271 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002272
2273
showard63a34772008-08-18 19:32:50 +00002274 @staticmethod
2275 def _prefix_with(string, prefix):
2276 if string:
2277 string = prefix + string
2278 return string
2279
2280
jadmanski0afbb632008-06-06 21:10:57 +00002281 @classmethod
showard989f25d2008-10-01 11:38:11 +00002282 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002283 """
2284 Construct instances of our class based on the given database query.
2285
2286 @yields One class instance for each row fetched.
2287 """
showard63a34772008-08-18 19:32:50 +00002288 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2289 where = cls._prefix_with(where, 'WHERE ')
2290 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002291 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002292 'joins' : joins,
2293 'where' : where,
2294 'order_by' : order_by})
2295 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002296 for row in rows:
2297 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002298
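# Illustrative sketch (not part of the original scheduler): a DBObject
# subclass only has to name its table and columns, after which rows can be
# loaded by id or fetched with ad-hoc SQL fragments.  The 'example_widgets'
# table, the _ExampleWidget class and _example_ready_widgets are hypothetical.
class _ExampleWidget(DBObject):
    _table_name = 'example_widgets'
    _fields = ('id', 'name', 'status')


def _example_ready_widgets():
    # fetch() yields one _ExampleWidget instance per matching row.
    return list(_ExampleWidget.fetch(where='status = %s', params=('Ready',)))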
mbligh36768f02008-02-22 18:28:33 +00002299
2300class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002301 _table_name = 'ineligible_host_queues'
2302 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002303
2304
showard89f84db2009-03-12 20:39:13 +00002305class AtomicGroup(DBObject):
2306 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002307 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2308 'invalid')
showard89f84db2009-03-12 20:39:13 +00002309
2310
showard989f25d2008-10-01 11:38:11 +00002311class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002312 _table_name = 'labels'
2313 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002314 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002315
2316
mbligh36768f02008-02-22 18:28:33 +00002317class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002318 _table_name = 'hosts'
2319 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2320 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2321
2322
jadmanski0afbb632008-06-06 21:10:57 +00002323 def current_task(self):
2324 rows = _db.execute("""
2325 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2326 """, (self.id,))
2327
2328 if len(rows) == 0:
2329 return None
2330 else:
2331 assert len(rows) == 1
2332 results = rows[0]
jadmanski0afbb632008-06-06 21:10:57 +00002333 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002334
2335
jadmanski0afbb632008-06-06 21:10:57 +00002336 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002337 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002338 if self.current_task():
2339 self.current_task().requeue()
2340
showard6ae5ea92009-02-25 00:11:51 +00002341
jadmanski0afbb632008-06-06 21:10:57 +00002342 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002343 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002344 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002345
2346
showard170873e2009-01-07 00:22:26 +00002347 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002348 """
showard170873e2009-01-07 00:22:26 +00002349 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002350 """
2351 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002352 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002353 FROM labels
2354 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002355 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002356 ORDER BY labels.name
2357 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002358 platform = None
2359 all_labels = []
2360 for label_name, is_platform in rows:
2361 if is_platform:
2362 platform = label_name
2363 all_labels.append(label_name)
2364 return platform, all_labels
2365
2366
showarda64e52a2009-06-08 23:24:08 +00002367 def reverify_tasks(self, cleanup=True):
2368 tasks = [VerifyTask(host=self)]
showard6d7b2ff2009-06-10 00:16:47 +00002369 # just to make sure this host does not get taken away
2370 self.set_status('Verifying')
showarda64e52a2009-06-08 23:24:08 +00002371 if cleanup:
2372 tasks.insert(0, CleanupTask(host=self))
showard6d7b2ff2009-06-10 00:16:47 +00002373 self.set_status('Cleaning')
showarda64e52a2009-06-08 23:24:08 +00002374 return tasks
showardd8e548a2008-09-09 03:04:57 +00002375
2376
showard54c1ea92009-05-20 00:32:58 +00002377 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2378
2379
2380 @classmethod
2381 def cmp_for_sort(cls, a, b):
2382 """
2383 A comparison function for sorting Host objects by hostname.
2384
2385 This splits off any trailing numeric digits, ignores leading 0s and
2386 compares hostnames by the leading name and the trailing digits as a
2387 number. If either hostname does not match this pattern, the two are
2388 simply compared as lower case strings.
2389
2390 Example of how hostnames will be sorted:
2391
2392 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2393
2394 This hopefully satisfies most people's hostname sorting needs regardless
2395 of their exact naming schemes. Nobody sane should have both a host10
2396 and host010 (but the algorithm works regardless).
2397 """
2398 lower_a = a.hostname.lower()
2399 lower_b = b.hostname.lower()
2400 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2401 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2402 if match_a and match_b:
2403 name_a, number_a_str = match_a.groups()
2404 name_b, number_b_str = match_b.groups()
2405 number_a = int(number_a_str)
2406 number_b = int(number_b_str)
2407 result = cmp((name_a, number_a), (name_b, number_b))
2408 if result == 0 and lower_a != lower_b:
2409 # If they compared equal above but the lower case names are
2410 # indeed different, don't report equality. abc012 != abc12.
2411 return cmp(lower_a, lower_b)
2412 return result
2413 else:
2414 return cmp(lower_a, lower_b)
2415
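# Illustrative sketch (not part of the original scheduler): sorting Host
# objects by hostname with the comparison function defined above.
# _example_sort_hosts is a hypothetical helper name.
def _example_sort_hosts(hosts):
    return sorted(hosts, cmp=Host.cmp_for_sort)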
2416
mbligh36768f02008-02-22 18:28:33 +00002417class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002418 _table_name = 'host_queue_entries'
2419 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002420 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002421 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002422
2423
showarda3c58572009-03-12 20:36:59 +00002424 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002425 assert id or row
showarda3c58572009-03-12 20:36:59 +00002426 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002427 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002428
jadmanski0afbb632008-06-06 21:10:57 +00002429 if self.host_id:
2430 self.host = Host(self.host_id)
2431 else:
2432 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002433
showard77182562009-06-10 00:16:05 +00002434 if self.atomic_group_id:
2435 self.atomic_group = AtomicGroup(self.atomic_group_id,
2436 always_query=False)
2437 else:
2438 self.atomic_group = None
2439
showard170873e2009-01-07 00:22:26 +00002440 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002441 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002442
2443
showard89f84db2009-03-12 20:39:13 +00002444 @classmethod
2445 def clone(cls, template):
2446 """
2447 Creates a new row using the values from a template instance.
2448
2449 The new instance will not exist in the database or have a valid
2450 id attribute until its save() method is called.
2451 """
2452 assert isinstance(template, cls)
2453 new_row = [getattr(template, field) for field in cls._fields]
2454 clone = cls(row=new_row, new_record=True)
2455 clone.id = None
2456 return clone
2457
2458
showardc85c21b2008-11-24 22:17:37 +00002459 def _view_job_url(self):
2460 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2461
2462
showardf1ae3542009-05-11 19:26:02 +00002463 def get_labels(self):
2464 """
2465 Get all labels associated with this host queue entry (either via the
2466 meta_host or as a job dependency label). The labels yielded are not
2467 guaranteed to be unique.
2468
2469 @yields Label instances associated with this host_queue_entry.
2470 """
2471 if self.meta_host:
2472 yield Label(id=self.meta_host, always_query=False)
2473 labels = Label.fetch(
2474 joins="JOIN jobs_dependency_labels AS deps "
2475 "ON (labels.id = deps.label_id)",
2476 where="deps.job_id = %d" % self.job.id)
2477 for label in labels:
2478 yield label
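
    # Illustrative sketch (hypothetical variable names): a caller that only
    # needs the distinct label names for an entry could do
    #
    #   label_names = set(label.name for label in queue_entry.get_labels())
    #
    # since the labels yielded above are not guaranteed to be unique.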
2479
2480
jadmanski0afbb632008-06-06 21:10:57 +00002481 def set_host(self, host):
2482 if host:
2483 self.queue_log_record('Assigning host ' + host.hostname)
2484 self.update_field('host_id', host.id)
2485 self.update_field('active', True)
2486 self.block_host(host.id)
2487 else:
2488 self.queue_log_record('Releasing host')
2489 self.unblock_host(self.host.id)
2490 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002491
jadmanski0afbb632008-06-06 21:10:57 +00002492 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002493
2494
jadmanski0afbb632008-06-06 21:10:57 +00002495 def get_host(self):
2496 return self.host
mbligh36768f02008-02-22 18:28:33 +00002497
2498
jadmanski0afbb632008-06-06 21:10:57 +00002499 def queue_log_record(self, log_line):
2500 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002501 _drone_manager.write_lines_to_file(self.queue_log_path,
2502 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002503
2504
jadmanski0afbb632008-06-06 21:10:57 +00002505 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002506 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002507 row = [0, self.job.id, host_id]
2508 block = IneligibleHostQueue(row=row, new_record=True)
2509 block.save()
mblighe2586682008-02-29 22:45:46 +00002510
2511
jadmanski0afbb632008-06-06 21:10:57 +00002512 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002513 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002514 blocks = IneligibleHostQueue.fetch(
2515 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2516 for block in blocks:
2517 block.delete()
mblighe2586682008-02-29 22:45:46 +00002518
2519
showard2bab8f42008-11-12 18:15:22 +00002520 def set_execution_subdir(self, subdir=None):
2521 if subdir is None:
2522 assert self.get_host()
2523 subdir = self.get_host().hostname
2524 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002525
2526
showard6355f6b2008-12-05 18:52:13 +00002527 def _get_hostname(self):
2528 if self.host:
2529 return self.host.hostname
2530 return 'no host'
2531
2532
showard170873e2009-01-07 00:22:26 +00002533 def __str__(self):
2534 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2535
2536
jadmanski0afbb632008-06-06 21:10:57 +00002537 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002538 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002539
showardb18134f2009-03-20 20:52:18 +00002540 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002541
showardc85c21b2008-11-24 22:17:37 +00002542 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002543 self.update_field('complete', False)
2544 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002545
jadmanski0afbb632008-06-06 21:10:57 +00002546 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002547 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002548 self.update_field('complete', False)
2549 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002550
showardc85c21b2008-11-24 22:17:37 +00002551 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002552 self.update_field('complete', True)
2553 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002554
2555 should_email_status = (status.lower() in _notify_email_statuses or
2556 'all' in _notify_email_statuses)
2557 if should_email_status:
2558 self._email_on_status(status)
2559
2560 self._email_on_job_complete()
2561
2562
2563 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002564 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002565
2566 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2567 self.job.id, self.job.name, hostname, status)
2568 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2569 self.job.id, self.job.name, hostname, status,
2570 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002571 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002572
2573
2574 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002575 if not self.job.is_finished():
2576 return
showard542e8402008-09-19 20:16:18 +00002577
showardc85c21b2008-11-24 22:17:37 +00002578 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002579 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002580 for queue_entry in hosts_queue:
2581 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002582 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002583 queue_entry.status))
2584
2585 summary_text = "\n".join(summary_text)
2586 status_counts = models.Job.objects.get_status_counts(
2587 [self.job.id])[self.job.id]
2588 status = ', '.join('%d %s' % (count, status) for status, count
2589 in status_counts.iteritems())
2590
2591 subject = 'Autotest: Job ID: %s "%s" %s' % (
2592 self.job.id, self.job.name, status)
2593 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2594 self.job.id, self.job.name, status, self._view_job_url(),
2595 summary_text)
showard170873e2009-01-07 00:22:26 +00002596 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002597
2598
showard77182562009-06-10 00:16:05 +00002599 def run_pre_job_tasks(self, assigned_host=None):
2600 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002601 assert assigned_host
2602 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002603 if self.host_id is None:
2604 self.set_host(assigned_host)
2605 else:
2606 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002607
showardb18134f2009-03-20 20:52:18 +00002608 logging.info("%s/%s/%s scheduled on %s, status=%s",
2609 self.job.name, self.meta_host, self.atomic_group_id,
2610 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002611
showard77182562009-06-10 00:16:05 +00002612 return self._do_run_pre_job_tasks()
2613
2614
2615 def _do_run_pre_job_tasks(self):
2616         # Every host goes through the Verifying stage (which may or may not
2617         # actually do anything, as determined by get_pre_job_tasks).
2618 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2619
2620 # The pre job tasks always end with a SetEntryPendingTask which
2621 # will continue as appropriate through queue_entry.on_pending().
2622 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002623
showard6ae5ea92009-02-25 00:11:51 +00002624
jadmanski0afbb632008-06-06 21:10:57 +00002625 def requeue(self):
2626 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002627 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002628 # verify/cleanup failure sets the execution subdir, so reset it here
2629 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002630 if self.meta_host:
2631 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002632
2633
jadmanski0afbb632008-06-06 21:10:57 +00002634 def handle_host_failure(self):
2635 """\
2636 Called when this queue entry's host has failed verification and
2637 repair.
2638 """
2639 assert not self.meta_host
2640 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002641 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002642
2643
jadmanskif7fa2cc2008-10-01 14:13:23 +00002644 @property
2645 def aborted_by(self):
2646 self._load_abort_info()
2647 return self._aborted_by
2648
2649
2650 @property
2651 def aborted_on(self):
2652 self._load_abort_info()
2653 return self._aborted_on
2654
2655
2656 def _load_abort_info(self):
2657 """ Fetch info about who aborted the job. """
2658 if hasattr(self, "_aborted_by"):
2659 return
2660 rows = _db.execute("""
2661 SELECT users.login, aborted_host_queue_entries.aborted_on
2662 FROM aborted_host_queue_entries
2663 INNER JOIN users
2664 ON users.id = aborted_host_queue_entries.aborted_by_id
2665 WHERE aborted_host_queue_entries.queue_entry_id = %s
2666 """, (self.id,))
2667 if rows:
2668 self._aborted_by, self._aborted_on = rows[0]
2669 else:
2670 self._aborted_by = self._aborted_on = None
2671
2672
showardb2e2c322008-10-14 17:33:55 +00002673 def on_pending(self):
2674 """
2675 Called when an entry in a synchronous job has passed verify. If the
2676 job is ready to run, returns an agent to run the job. Returns None
2677 otherwise.
2678 """
2679 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002680 self.get_host().set_status('Pending')
showard77182562009-06-10 00:16:05 +00002681 return self.job.run_if_ready(queue_entry=self)
showardb2e2c322008-10-14 17:33:55 +00002682
2683
showardd3dc1992009-04-22 21:01:40 +00002684 def abort(self, dispatcher):
2685 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002686
showardd3dc1992009-04-22 21:01:40 +00002687 Status = models.HostQueueEntry.Status
2688 has_running_job_agent = (
2689 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2690 and dispatcher.get_agents_for_entry(self))
2691 if has_running_job_agent:
2692 # do nothing; post-job tasks will finish and then mark this entry
2693 # with status "Aborted" and take care of the host
2694 return
2695
2696 if self.status in (Status.STARTING, Status.PENDING):
2697 self.host.set_status(models.Host.Status.READY)
2698 elif self.status == Status.VERIFYING:
2699 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2700
2701 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002702
2703 def execution_tag(self):
2704 assert self.execution_subdir
2705 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
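
    # For example (hypothetical values): a queue entry of job 42 owned by
    # 'showard' with execution_subdir 'host1' gets the execution tag
    # '42-showard/host1'.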
showard1be97432008-10-17 15:30:45 +00002706
2707
mbligh36768f02008-02-22 18:28:33 +00002708class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002709 _table_name = 'jobs'
2710 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2711 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002712 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002713 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002714
showard77182562009-06-10 00:16:05 +00002715     # This does not need to be a column in the DB. The delays are expected
2716     # to be configured short. If the scheduler is stopped and restarted in
2717     # the middle of a job's delay cycle, the delay cycle will either be
2718     # repeated or skipped, depending on the number of Pending machines found
2719     # when the restarted scheduler recovers this job. Not a problem.
2720 #
2721 # A reference to the DelayedCallTask that will wake up the job should
2722 # no other HQEs change state in time. Its end_time attribute is used
2723 # by our run_with_ready_delay() method to determine if the wait is over.
2724 _delay_ready_task = None
2725
2726 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2727     # all status='Pending' atomic group HQEs in case a delay was running when the
2728 # scheduler was restarted and no more hosts ever successfully exit Verify.
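    #
    # A possible shape for that recovery pass (a hedged sketch only, not wired
    # into the scheduler; the where clause below is an assumption about how
    # HostQueueEntry.fetch() would be used for it):
    #
    #   for entry in HostQueueEntry.fetch(
    #           where="status = 'Pending' AND atomic_group_id IS NOT NULL"):
    #       entry.on_pending()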
showard6ae5ea92009-02-25 00:11:51 +00002729
showarda3c58572009-03-12 20:36:59 +00002730 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002731 assert id or row
showarda3c58572009-03-12 20:36:59 +00002732 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002733
mblighe2586682008-02-29 22:45:46 +00002734
jadmanski0afbb632008-06-06 21:10:57 +00002735 def is_server_job(self):
2736 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002737
2738
showard170873e2009-01-07 00:22:26 +00002739 def tag(self):
2740 return "%s-%s" % (self.id, self.owner)
2741
2742
jadmanski0afbb632008-06-06 21:10:57 +00002743 def get_host_queue_entries(self):
2744 rows = _db.execute("""
2745 SELECT * FROM host_queue_entries
2746 WHERE job_id= %s
2747 """, (self.id,))
2748 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002749
jadmanski0afbb632008-06-06 21:10:57 +00002750         assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00002751
jadmanski0afbb632008-06-06 21:10:57 +00002752 return entries
mbligh36768f02008-02-22 18:28:33 +00002753
2754
jadmanski0afbb632008-06-06 21:10:57 +00002755 def set_status(self, status, update_queues=False):
2756         self.update_field('status', status)
2757
2758 if update_queues:
2759 for queue_entry in self.get_host_queue_entries():
2760 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002761
2762
showard77182562009-06-10 00:16:05 +00002763 def _atomic_and_has_started(self):
2764 """
2765 @returns True if any of the HostQueueEntries associated with this job
2766 have entered the Status.STARTING state or beyond.
2767 """
2768 atomic_entries = models.HostQueueEntry.objects.filter(
2769 job=self.id, atomic_group__isnull=False)
2770 if atomic_entries.count() <= 0:
2771 return False
2772
showardaf8b4ca2009-06-16 18:47:26 +00002773 # These states may *only* be reached if Job.run() has been called.
2774 started_statuses = (models.HostQueueEntry.Status.STARTING,
2775 models.HostQueueEntry.Status.RUNNING,
2776 models.HostQueueEntry.Status.COMPLETED)
2777
2778 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002779 return started_entries.count() > 0
2780
2781
2782 def _pending_count(self):
2783 """The number of HostQueueEntries for this job in the Pending state."""
2784 pending_entries = models.HostQueueEntry.objects.filter(
2785 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2786 return pending_entries.count()
2787
2788
jadmanski0afbb632008-06-06 21:10:57 +00002789 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002790 # NOTE: Atomic group jobs stop reporting ready after they have been
2791 # started to avoid launching multiple copies of one atomic job.
2792         # Only possible if synch_count is less than half the number of
2793 # machines in the atomic group.
2794 return (self._pending_count() >= self.synch_count
2795 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002796
2797
jadmanski0afbb632008-06-06 21:10:57 +00002798     def num_machines(self, clause=None):
2799 sql = "job_id=%s" % self.id
2800 if clause:
2801 sql += " AND (%s)" % clause
2802 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002803
2804
jadmanski0afbb632008-06-06 21:10:57 +00002805 def num_queued(self):
2806 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002807
2808
jadmanski0afbb632008-06-06 21:10:57 +00002809 def num_active(self):
2810 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002811
2812
jadmanski0afbb632008-06-06 21:10:57 +00002813 def num_complete(self):
2814 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002815
2816
jadmanski0afbb632008-06-06 21:10:57 +00002817 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002818 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002819
mbligh36768f02008-02-22 18:28:33 +00002820
showard6bb7c292009-01-30 01:44:51 +00002821 def _not_yet_run_entries(self, include_verifying=True):
2822 statuses = [models.HostQueueEntry.Status.QUEUED,
2823 models.HostQueueEntry.Status.PENDING]
2824 if include_verifying:
2825 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2826 return models.HostQueueEntry.objects.filter(job=self.id,
2827 status__in=statuses)
2828
2829
2830 def _stop_all_entries(self):
2831 entries_to_stop = self._not_yet_run_entries(
2832 include_verifying=False)
2833 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002834 assert not child_entry.complete, (
2835 '%s status=%s, active=%s, complete=%s' %
2836 (child_entry.id, child_entry.status, child_entry.active,
2837 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002838 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2839 child_entry.host.status = models.Host.Status.READY
2840 child_entry.host.save()
2841 child_entry.status = models.HostQueueEntry.Status.STOPPED
2842 child_entry.save()
2843
showard2bab8f42008-11-12 18:15:22 +00002844 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002845 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002846 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002847 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002848
2849
jadmanski0afbb632008-06-06 21:10:57 +00002850 def write_to_machines_file(self, queue_entry):
2851 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002852 file_path = os.path.join(self.tag(), '.machines')
2853 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002854
2855
showardf1ae3542009-05-11 19:26:02 +00002856 def _next_group_name(self, group_name=''):
2857 """@returns a directory name to use for the next host group results."""
2858 if group_name:
2859 # Sanitize for use as a pathname.
2860 group_name = group_name.replace(os.path.sep, '_')
2861 if group_name.startswith('.'):
2862 group_name = '_' + group_name[1:]
2863 # Add a separator between the group name and 'group%d'.
2864 group_name += '.'
2865 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002866 query = models.HostQueueEntry.objects.filter(
2867 job=self.id).values('execution_subdir').distinct()
2868 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002869 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2870 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002871 if ids:
2872 next_id = max(ids) + 1
2873 else:
2874 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002875 return '%sgroup%d' % (group_name, next_id)
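
    # Illustrative examples of the naming above (hypothetical existing
    # execution_subdirs):
    #   no group_name, existing 'group0', 'group1'   -> 'group2'
    #   group_name='rack/one', existing 'rack_one.group0'
    #                                                 -> 'rack_one.group1'
    # os.path.sep is replaced with '_' and a leading '.' is escaped so the
    # result is safe to use as a results directory name.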
showard2bab8f42008-11-12 18:15:22 +00002876
2877
showard170873e2009-01-07 00:22:26 +00002878 def _write_control_file(self, execution_tag):
2879 control_path = _drone_manager.attach_file_to_execution(
2880 execution_tag, self.control_file)
2881 return control_path
mbligh36768f02008-02-22 18:28:33 +00002882
showardb2e2c322008-10-14 17:33:55 +00002883
showard2bab8f42008-11-12 18:15:22 +00002884 def get_group_entries(self, queue_entry_from_group):
2885 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002886 return list(HostQueueEntry.fetch(
2887 where='job_id=%s AND execution_subdir=%s',
2888 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002889
2890
showardb2e2c322008-10-14 17:33:55 +00002891 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002892 assert queue_entries
2893 execution_tag = queue_entries[0].execution_tag()
2894 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002895 hostnames = ','.join([entry.get_host().hostname
2896 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002897
showard87ba02a2009-04-20 19:37:32 +00002898 params = _autoserv_command_line(
2899 hostnames, execution_tag,
2900 ['-P', execution_tag, '-n',
2901 _drone_manager.absolute_path(control_path)],
2902 job=self)
mbligh36768f02008-02-22 18:28:33 +00002903
jadmanski0afbb632008-06-06 21:10:57 +00002904 if not self.is_server_job():
2905 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002906
showardb2e2c322008-10-14 17:33:55 +00002907 return params
mblighe2586682008-02-29 22:45:46 +00002908
mbligh36768f02008-02-22 18:28:33 +00002909
showardc9ae1782009-01-30 01:42:37 +00002910 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002911 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002912 return True
showard0fc38302008-10-23 00:44:07 +00002913 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002914 return queue_entry.get_host().dirty
2915 return False
showard21baa452008-10-21 00:08:39 +00002916
showardc9ae1782009-01-30 01:42:37 +00002917
2918 def _should_run_verify(self, queue_entry):
2919 do_not_verify = (queue_entry.host.protection ==
2920 host_protections.Protection.DO_NOT_VERIFY)
2921 if do_not_verify:
2922 return False
2923 return self.run_verify
2924
2925
showard77182562009-06-10 00:16:05 +00002926 def get_pre_job_tasks(self, queue_entry):
2927 """
2928 Get a list of tasks to perform before the host_queue_entry
2929 may be used to run this Job (such as Cleanup & Verify).
2930
2931 @returns A list of tasks to be done to the given queue_entry before
2932 it should be considered be ready to run this job. The last
2933                   it should be considered ready to run this job. The last
2934 continues the flow of the job.
2935 """
showard21baa452008-10-21 00:08:39 +00002936 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002937 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002938 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002939 if self._should_run_verify(queue_entry):
2940 tasks.append(VerifyTask(queue_entry=queue_entry))
2941 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002942 return tasks
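
    # Illustrative outcomes of the checks above (hypothetical queue entries):
    #   dirty host, reboot_before=IF_DIRTY, job.run_verify=True:
    #       [CleanupTask(...), VerifyTask(...), SetEntryPendingTask(entry)]
    #   clean host, reboot_before=IF_DIRTY, host protection DO_NOT_VERIFY:
    #       [SetEntryPendingTask(entry)]
    # SetEntryPendingTask is always last so that on_pending() continues the
    # job once any earlier tasks succeed.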
2943
2944
showardf1ae3542009-05-11 19:26:02 +00002945 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002946 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002947 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002948 else:
showardf1ae3542009-05-11 19:26:02 +00002949 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002950 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002951 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002952 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002953
2954 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002955 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002956
2957
2958 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002959 """
2960 @returns A tuple containing a list of HostQueueEntry instances to be
2961                  used to run this Job and a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002962 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002963 """
showard77182562009-06-10 00:16:05 +00002964 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00002965 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002966 if atomic_group:
2967 num_entries_wanted = atomic_group.max_number_of_machines
2968 else:
2969 num_entries_wanted = self.synch_count
2970 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002971
showardf1ae3542009-05-11 19:26:02 +00002972 if num_entries_wanted > 0:
2973 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002974 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002975 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002976 params=(self.id, include_queue_entry.id)))
2977
2978 # Sort the chosen hosts by hostname before slicing.
2979 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2980 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2981 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2982 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002983
showardf1ae3542009-05-11 19:26:02 +00002984 # Sanity check. We'll only ever be called if this can be met.
2985 assert len(chosen_entries) >= self.synch_count
2986
2987 if atomic_group:
2988 # Look at any meta_host and dependency labels and pick the first
2989 # one that also specifies this atomic group. Use that label name
2990 # as the group name if possible (it is more specific).
2991 group_name = atomic_group.name
2992 for label in include_queue_entry.get_labels():
2993 if label.atomic_group_id:
2994 assert label.atomic_group_id == atomic_group.id
2995 group_name = label.name
2996 break
2997 else:
2998 group_name = ''
2999
3000 self._assign_new_group(chosen_entries, group_name=group_name)
3001 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00003002
3003
showard77182562009-06-10 00:16:05 +00003004 def run_if_ready(self, queue_entry):
3005 """
3006 @returns An Agent instance to ultimately run this job if enough hosts
3007 are ready for it to run.
3008 @returns None and potentially cleans up excess hosts if this Job
3009 is not ready to run.
3010 """
showardb2e2c322008-10-14 17:33:55 +00003011 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003012 self.stop_if_necessary()
3013 return None
mbligh36768f02008-02-22 18:28:33 +00003014
showard77182562009-06-10 00:16:05 +00003015 if queue_entry.atomic_group:
3016 return self.run_with_ready_delay(queue_entry)
3017
3018 return self.run(queue_entry)
3019
3020
3021 def run_with_ready_delay(self, queue_entry):
3022 """
3023 Start a delay to wait for more hosts to enter Pending state before
3024         launching an atomic group job. Once set, the delay cannot be reset.
3025
3026 @param queue_entry: The HostQueueEntry object to get atomic group
3027 info from and pass to run_if_ready when the delay is up.
3028
3029 @returns An Agent to run the job as appropriate or None if a delay
3030 has already been set.
3031 """
3032 assert queue_entry.job_id == self.id
3033 assert queue_entry.atomic_group
3034 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3035 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3036 over_max_threshold = (self._pending_count() >= pending_threshold)
3037 delay_expired = (self._delay_ready_task and
3038 time.time() >= self._delay_ready_task.end_time)
3039
3040 # Delay is disabled or we already have enough? Do not wait to run.
3041 if not delay or over_max_threshold or delay_expired:
3042 return self.run(queue_entry)
3043
3044 # A delay was previously scheduled.
3045 if self._delay_ready_task:
3046 return None
3047
3048 def run_job_after_delay():
3049 logging.info('Job %s done waiting for extra hosts.', self.id)
3050 return self.run(queue_entry)
3051
3052 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3053 callback=run_job_after_delay)
3054
3055 return Agent([self._delay_ready_task], num_processes=0)
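
    # Decision summary for the code above, with hypothetical numbers
    # (secs_to_wait_for_atomic_group_hosts=120, max_number_of_machines=5):
    #   pending >= 5, any delay state        -> run(queue_entry) now
    #   pending < 5, no delay scheduled yet  -> schedule a 120s DelayedCallTask
    #   pending < 5, delay still running     -> return None and keep waiting
    #   pending < 5, delay expired           -> run(queue_entry)
    # The delay is also skipped entirely when the configured value is 0.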
3056
3057
3058 def run(self, queue_entry):
3059 """
3060 @param queue_entry: The HostQueueEntry instance calling this method.
3061 @returns An Agent instance to run this job or None if we've already
3062 been run.
3063 """
3064 if queue_entry.atomic_group and self._atomic_and_has_started():
3065 logging.error('Job.run() called on running atomic Job %d '
3066 'with HQE %s.', self.id, queue_entry)
3067 return None
showardf1ae3542009-05-11 19:26:02 +00003068 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3069 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003070
3071
showardf1ae3542009-05-11 19:26:02 +00003072 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003073 for queue_entry in queue_entries:
3074 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003075 params = self._get_autoserv_params(queue_entries)
3076 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003077 cmd=params, group_name=group_name)
3078 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003079 if self._delay_ready_task:
3080 # Cancel any pending callback that would try to run again
3081 # as we are already running.
3082 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003083
showard170873e2009-01-07 00:22:26 +00003084 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003085
3086
mbligh36768f02008-02-22 18:28:33 +00003087if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003088 main()