blob: f03d02ef6453d27a3a8d5220eb6228a07302b029 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# NOTE: dict.has_key() is deprecated; the 'in' operator is the portable
# spelling and behaves identically here.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# pidfile names written into an execution's results directory by the
# various phases of a job (run, crashinfo collection, results parsing)
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# module-level scheduler state, initialized in init()/main()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
def main():
    """
    Entry point: run the scheduler and log any exception that escapes
    before re-raising it.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # let deliberate sys.exit() calls propagate without being logged
        # as errors
        raise
    except:
        # bare except is intentional: this is the top-level boundary and we
        # re-raise after logging, so nothing is swallowed
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
78def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000079 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000080
showard136e6dc2009-06-10 19:38:49 +000081 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000082 parser = optparse.OptionParser(usage)
83 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
84 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000085 parser.add_option('--test', help='Indicate that scheduler is under ' +
86 'test and should use dummy autoserv and no parsing',
87 action='store_true')
88 (options, args) = parser.parse_args()
89 if len(args) != 1:
90 parser.print_usage()
91 return
mbligh36768f02008-02-22 18:28:33 +000092
showard5613c662009-06-08 23:30:33 +000093 scheduler_enabled = global_config.global_config.get_config_value(
94 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
95
96 if not scheduler_enabled:
97 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
98 "global_config's SCHEDULER section to enabled it. Exiting.")
99 print msg
100 sys.exit(1)
101
jadmanski0afbb632008-06-06 21:10:57 +0000102 global RESULTS_DIR
103 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh83c1e9e2009-05-01 23:10:41 +0000105 site_init = utils.import_site_function(__file__,
106 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
107 _site_init_monitor_db_dummy)
108 site_init()
109
showardcca334f2009-03-12 20:38:34 +0000110 # Change the cwd while running to avoid issues incase we were launched from
111 # somewhere odd (such as a random NFS home directory of the person running
112 # sudo to launch us as the appropriate user).
113 os.chdir(RESULTS_DIR)
114
jadmanski0afbb632008-06-06 21:10:57 +0000115 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000116 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
117 "notify_email_statuses",
118 default='')
showardc85c21b2008-11-24 22:17:37 +0000119 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000120 _notify_email_statuses = [status for status in
121 re.split(r'[\s,;:]', notify_statuses_list.lower())
122 if status]
showardc85c21b2008-11-24 22:17:37 +0000123
jadmanski0afbb632008-06-06 21:10:57 +0000124 if options.test:
125 global _autoserv_path
126 _autoserv_path = 'autoserv_dummy'
127 global _testing_mode
128 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000129
mbligh37eceaa2008-12-15 22:56:37 +0000130 # AUTOTEST_WEB.base_url is still a supported config option as some people
131 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000132 global _base_url
showard170873e2009-01-07 00:22:26 +0000133 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
134 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000135 if config_base_url:
136 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000137 else:
mbligh37eceaa2008-12-15 22:56:37 +0000138 # For the common case of everything running on a single server you
139 # can just set the hostname in a single place in the config file.
140 server_name = c.get_config_value('SERVER', 'hostname')
141 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000142 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000143 sys.exit(1)
144 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000145
showardc5afc462009-01-13 00:09:39 +0000146 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000147 server.start()
148
jadmanski0afbb632008-06-06 21:10:57 +0000149 try:
showard136e6dc2009-06-10 19:38:49 +0000150 init()
showardc5afc462009-01-13 00:09:39 +0000151 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000152 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000153
jadmanski0afbb632008-06-06 21:10:57 +0000154 while not _shutdown:
155 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000156 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000157 except:
showard170873e2009-01-07 00:22:26 +0000158 email_manager.manager.log_stacktrace(
159 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000160
showard170873e2009-01-07 00:22:26 +0000161 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000162 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000163 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000164 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
def setup_logging():
    """
    Configure scheduler logging, honoring optional environment-variable
    overrides for the log directory and log file name.
    """
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME')
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(config, log_dir=log_dir,
                                      logfile_name=log_name)
173
174
def handle_sigint(signum, frame):
    """
    SIGINT handler: request a clean shutdown.  The main loop checks the
    global _shutdown flag and exits after finishing its current tick.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000179
180
def init():
    """
    One-time scheduler startup: write the pid file, connect to the database,
    put Django in autocommit mode, install the SIGINT handler and initialize
    the drone manager from the global config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # under --test, point at the stress-test database instead of the
        # production one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE: this local 'drones' (a comma-separated hostname string from the
    # config) shadows the imported 'drones' module within this function
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None, verbose=True):
    """
    Build the command line used to launch autoserv.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - If True (the default), pass --verbose.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
237
238
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
241
242
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against Ready hosts, honoring ACLs,
    label dependencies, only_if_needed labels and atomic groups.

    refresh() bulk-loads all relevant DB state into in-memory dicts once per
    scheduling cycle; the find_eligible_* methods then consume
    self._hosts_available, removing hosts as they are assigned so a host is
    never handed out twice within one cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated list for use inside "IN (...)"
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run *query* (containing one %s placeholder for an id list) and fold
        the two-column result into a dict of sets; see
        _process_many2many_dict for the shape.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """
        Turn (left_id, right_id) rows into {left_id: set(right_ids)}; with
        flip=True the mapping is inverted.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts barred from each job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        Return two dicts built from one query: {label_id: set(host_ids)}
        and the inverse {host_id: set(label_ids)}.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Reload all scheduling state from the database for one cycle,
        restricted to the jobs referenced by *pending_queue_entries*.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job's owner must share at least one ACL group with the host
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every dependency label of the job must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Enforce only_if_needed labels: a host carrying such a label may only
        run jobs that explicitly need it (via dependency or metahost).
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Apply all eligibility checks (ACL, dependencies, atomic group)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the specific host requested by a non-metahost entry."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim the first eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        Return a Host for *queue_entry* (claiming it for this cycle), or
        None if no eligible host is currently available.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
604
605
showard170873e2009-01-07 00:22:26 +0000606class Dispatcher(object):
    def __init__(self):
        # all Agents currently owned by this dispatcher
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # indexes mapping host id / queue entry id -> set of Agents touching
        # that object; kept in sync by add_agent()/remove_agent()
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000617
mbligh36768f02008-02-22 18:28:33 +0000618
    def initialize(self, recover_hosts=True):
        """
        Prepare for the first tick: set up the cleanup tasks and recover
        state left over from a previous scheduler run.

        @param recover_hosts - If True, also attempt dead-host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000628
629
    def tick(self):
        """
        Run one scheduler iteration.  Step order is significant: drone state
        is refreshed first, aborts are processed before new jobs are
        scheduled, and queued drone actions/emails are flushed at the end.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000640
showard97aed502008-11-04 02:01:24 +0000641
    def _run_cleanup(self):
        # each cleanup object decides internally whether enough time has
        # elapsed for it to actually do anything this tick
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000645
mbligh36768f02008-02-22 18:28:33 +0000646
showard170873e2009-01-07 00:22:26 +0000647 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
648 for object_id in object_ids:
649 agent_dict.setdefault(object_id, set()).add(agent)
650
651
652 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
653 for object_id in object_ids:
654 assert object_id in agent_dict
655 agent_dict[object_id].remove(agent)
656
657
    def add_agent(self, agent):
        """
        Take ownership of *agent*: track it for processing in future ticks
        and index it by the host and queue-entry ids it touches.
        """
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000664
showard170873e2009-01-07 00:22:26 +0000665
666 def get_agents_for_entry(self, queue_entry):
667 """
668 Find agents corresponding to the specified queue_entry.
669 """
showardd3dc1992009-04-22 21:01:40 +0000670 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000671
672
673 def host_has_agent(self, host):
674 """
675 Determine if there is currently an Agent present using this host.
676 """
677 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000678
679
jadmanski0afbb632008-06-06 21:10:57 +0000680 def remove_agent(self, agent):
681 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000682 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
683 agent)
684 self._unregister_agent_for_ids(self._queue_entry_agents,
685 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000686
687
jadmanski0afbb632008-06-06 21:10:57 +0000688 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000689 self._register_pidfiles()
690 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000691 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000692 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000693 self._reverify_remaining_hosts()
694 # reinitialize drones after killing orphaned processes, since they can
695 # leave around files when they die
696 _drone_manager.execute_actions()
697 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000698
showard170873e2009-01-07 00:22:26 +0000699
    def _register_pidfiles(self):
        """Register every pidfile we may need during recovery with the drone
        manager (presumably so the subsequent refresh() fetches their
        contents — confirm against drone_manager)."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                        queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000710
711
showardd3dc1992009-04-22 21:01:40 +0000712 def _recover_entries_with_status(self, status, orphans, pidfile_name,
713 recover_entries_fn):
714 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000715 for queue_entry in queue_entries:
716 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000717 # synchronous job we've already recovered
718 continue
showardd3dc1992009-04-22 21:01:40 +0000719 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000720 execution_tag = queue_entry.execution_tag()
721 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000722 run_monitor.attach_to_existing_process(execution_tag,
723 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000724
725 log_message = ('Recovering %s entry %s ' %
726 (status.lower(),
727 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000728 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000729 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000730 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000731 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000732 continue
mbligh90a549d2008-03-25 23:52:34 +0000733
showard597bfd32009-05-08 18:22:50 +0000734 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000735 run_monitor.get_process())
736 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
737 orphans.discard(run_monitor.get_process())
738
739
740 def _kill_remaining_orphan_processes(self, orphans):
741 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000742 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000743 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000744
showard170873e2009-01-07 00:22:26 +0000745
showardd3dc1992009-04-22 21:01:40 +0000746 def _recover_running_entries(self, orphans):
747 def recover_entries(job, queue_entries, run_monitor):
748 if run_monitor is not None:
749 queue_task = RecoveryQueueTask(job=job,
750 queue_entries=queue_entries,
751 run_monitor=run_monitor)
752 self.add_agent(Agent(tasks=[queue_task],
753 num_processes=len(queue_entries)))
754 # else, _requeue_other_active_entries will cover this
755
756 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
757 orphans, '.autoserv_execute',
758 recover_entries)
759
760
    def _recover_gathering_entries(self, orphans):
        """Re-attach agents to queue entries in the Gathering (crashinfo
        collection) state, resuming the GatherLogsTask whether or not its
        process is still alive (run_monitor may be None)."""
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
                models.HostQueueEntry.Status.GATHERING,
                orphans, _CRASHINFO_PID_FILE, recover_entries)
770
771
    def _recover_parsing_entries(self, orphans):
        """Re-attach agents to queue entries in the Parsing state.

        num_processes=0 because the parser is throttled separately and
        shouldn't count against the autoserv process limits (see
        _can_start_agent).
        """
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
781
782
    def _recover_all_recoverable_entries(self):
        """Recover running/gathering/parsing entries and special tasks,
        then kill whatever orphaned autoserv processes remain unclaimed."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        # each _recover_* step discards the processes it adopts from orphans
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks()
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000790
showard97aed502008-11-04 02:01:24 +0000791
showard2fe3f1d2009-07-06 20:19:11 +0000792 def _recover_special_tasks(self):
793 """\
794 Recovers all special tasks that have started running but have not
795 completed.
796 """
797
798 tasks = models.SpecialTask.objects.filter(is_active=True,
799 is_complete=False)
800 # Use ordering to force NULL queue_entry_id's to the end of the list
801 for task in tasks.order_by('-queue_entry_id'):
802 if self.host_has_agent(task.host):
803 # Duplicated verify task that we've already recovered
804 continue
805
806 logging.info("Recovering %s", task)
807
808 host = Host(id=task.host.id)
809 queue_entry = None
810 if task.queue_entry:
811 queue_entry = HostQueueEntry.fetch(
812 where='id = %s', params=(task.queue_entry.id,)).next()
813
814 self._recover_special_task(task, host, queue_entry)
815
816
817 def _recover_special_task(self, task, host, queue_entry):
818 """\
819 Recovers a single special task.
820 """
821 if task.task == models.SpecialTask.Task.VERIFY:
822 agent_tasks = self._recover_verify(task, host, queue_entry)
823 elif task.task == models.SpecialTask.Task.REPAIR:
824 agent_tasks = self._recover_repair(task, host, queue_entry)
825 elif task.task == models.SpecialTask.Task.CLEANUP:
826 agent_tasks = self._recover_cleanup(task, host, queue_entry)
827 else:
828 # Should never happen
829 logging.error(
830 "Special task id %d had invalid task %s", (task.id, task.task))
831
832 self.add_agent(Agent(agent_tasks))
833
834
835 def _recover_verify(self, task, host, queue_entry):
836 """\
837 Recovers a verify task.
838 No associated queue entry: Verify host
839 With associated queue entry: Verify host, and run associated queue
840 entry
841 """
842 if not task.queue_entry:
843 return [VerifyTask(host=host, task=task)]
844 else:
845 return [VerifyTask(queue_entry=queue_entry, task=task),
846 SetEntryPendingTask(queue_entry=queue_entry)]
847
848
849 def _recover_repair(self, task, host, queue_entry):
850 """\
851 Recovers a repair task.
852 Always repair host
853 """
854 return [RepairTask(host=host, queue_entry=queue_entry, task=task)]
855
856
857 def _recover_cleanup(self, task, host, queue_entry):
858 """\
859 Recovers a cleanup task.
860 No associated queue entry: Clean host
861 With associated queue entry: Clean host, verify host if needed, and
862 run associated queue entry
863 """
864 if not task.queue_entry:
865 return [CleanupTask(host=host, task=task)]
866 else:
867 agent_tasks = [CleanupTask(queue_entry=queue_entry,
868 task=task)]
869 if queue_entry.job.should_run_verify(queue_entry):
870 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
871 agent_tasks.append(
872 SetEntryPendingTask(queue_entry=queue_entry))
873 return agent_tasks
874
875
showard170873e2009-01-07 00:22:26 +0000876 def _requeue_other_active_entries(self):
877 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000878 where='active AND NOT complete AND '
879 '(aborted OR status != "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000880
showard2fe3f1d2009-07-06 20:19:11 +0000881 message = '\n'.join(str(entry) for entry in queue_entries
882 if not self.get_agents_for_entry(entry))
883 if message:
884 email_manager.manager.enqueue_notify_email(
885 'Unrecovered active host queue entries exist',
886 message)
showard170873e2009-01-07 00:22:26 +0000887
888
showard1ff7b2e2009-05-15 23:17:18 +0000889 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000890 tasks = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +0000891 task=models.SpecialTask.Task.VERIFY, is_active=False,
892 is_complete=False, queue_entry__isnull=True)
showard6d7b2ff2009-06-10 00:16:47 +0000893
showard2fe3f1d2009-07-06 20:19:11 +0000894 for task in tasks:
895 host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
896 if host.locked or host.invalid or self.host_has_agent(host):
897 continue
showard6d7b2ff2009-06-10 00:16:47 +0000898
showard2fe3f1d2009-07-06 20:19:11 +0000899 logging.info('Force reverifying host %s', host.hostname)
900 self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000901
902
showard170873e2009-01-07 00:22:26 +0000903 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000904 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000905 self._reverify_hosts_where("""(status = 'Repairing' OR
906 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000907 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000908
showard170873e2009-01-07 00:22:26 +0000909 # recover "Running" hosts with no active queue entries, although this
910 # should never happen
911 message = ('Recovering running host %s - this probably indicates a '
912 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000913 self._reverify_hosts_where("""status = 'Running' AND
914 id NOT IN (SELECT host_id
915 FROM host_queue_entries
916 WHERE active)""",
917 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000918
919
jadmanski0afbb632008-06-06 21:10:57 +0000920 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000921 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000922 full_where='locked = 0 AND invalid = 0 AND ' + where
923 for host in Host.fetch(where=full_where):
924 if self.host_has_agent(host):
925 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000926 continue
showard170873e2009-01-07 00:22:26 +0000927 if print_message:
showardb18134f2009-03-20 20:52:18 +0000928 logging.info(print_message, host.hostname)
showard2fe3f1d2009-07-06 20:19:11 +0000929 tasks = host.reverify_tasks()
showard170873e2009-01-07 00:22:26 +0000930 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000931
932
jadmanski0afbb632008-06-06 21:10:57 +0000933 def _recover_hosts(self):
934 # recover "Repair Failed" hosts
935 message = 'Reverifying dead host %s'
936 self._reverify_hosts_where("status = 'Repair Failed'",
937 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000938
939
showard04c82c52008-05-29 19:38:12 +0000940
showardb95b1bd2008-08-15 18:11:04 +0000941 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000942 # prioritize by job priority, then non-metahost over metahost, then FIFO
943 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000944 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000945 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000946 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000947
948
showard89f84db2009-03-12 20:39:13 +0000949 def _refresh_pending_queue_entries(self):
950 """
951 Lookup the pending HostQueueEntries and call our HostScheduler
952 refresh() method given that list. Return the list.
953
954 @returns A list of pending HostQueueEntries sorted in priority order.
955 """
showard63a34772008-08-18 19:32:50 +0000956 queue_entries = self._get_pending_queue_entries()
957 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000958 return []
showardb95b1bd2008-08-15 18:11:04 +0000959
showard63a34772008-08-18 19:32:50 +0000960 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000961
showard89f84db2009-03-12 20:39:13 +0000962 return queue_entries
963
964
965 def _schedule_atomic_group(self, queue_entry):
966 """
967 Schedule the given queue_entry on an atomic group of hosts.
968
969 Returns immediately if there are insufficient available hosts.
970
971 Creates new HostQueueEntries based off of queue_entry for the
972 scheduled hosts and starts them all running.
973 """
974 # This is a virtual host queue entry representing an entire
975 # atomic group, find a group and schedule their hosts.
976 group_hosts = self._host_scheduler.find_eligible_atomic_group(
977 queue_entry)
978 if not group_hosts:
979 return
showardcbe6f942009-06-17 19:33:49 +0000980
981 logging.info('Expanding atomic group entry %s with hosts %s',
982 queue_entry,
983 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +0000984 # The first assigned host uses the original HostQueueEntry
985 group_queue_entries = [queue_entry]
986 for assigned_host in group_hosts[1:]:
987 # Create a new HQE for every additional assigned_host.
988 new_hqe = HostQueueEntry.clone(queue_entry)
989 new_hqe.save()
990 group_queue_entries.append(new_hqe)
991 assert len(group_queue_entries) == len(group_hosts)
992 for queue_entry, host in itertools.izip(group_queue_entries,
993 group_hosts):
994 self._run_queue_entry(queue_entry, host)
995
996
997 def _schedule_new_jobs(self):
998 queue_entries = self._refresh_pending_queue_entries()
999 if not queue_entries:
1000 return
1001
showard63a34772008-08-18 19:32:50 +00001002 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001003 if (queue_entry.atomic_group_id is None or
1004 queue_entry.host_id is not None):
1005 assigned_host = self._host_scheduler.find_eligible_host(
1006 queue_entry)
1007 if assigned_host:
1008 self._run_queue_entry(queue_entry, assigned_host)
1009 else:
1010 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001011
1012
1013 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +00001014 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
1015 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001016
1017
jadmanski0afbb632008-06-06 21:10:57 +00001018 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001019 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
1020 for agent in self.get_agents_for_entry(entry):
1021 agent.abort()
1022 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001023
1024
showard324bf812009-01-20 23:23:38 +00001025 def _can_start_agent(self, agent, num_started_this_cycle,
1026 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001027 # always allow zero-process agents to run
1028 if agent.num_processes == 0:
1029 return True
1030 # don't allow any nonzero-process agents to run after we've reached a
1031 # limit (this avoids starvation of many-process agents)
1032 if have_reached_limit:
1033 return False
1034 # total process throttling
showard324bf812009-01-20 23:23:38 +00001035 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001036 return False
1037 # if a single agent exceeds the per-cycle throttling, still allow it to
1038 # run when it's the first agent in the cycle
1039 if num_started_this_cycle == 0:
1040 return True
1041 # per-cycle throttling
1042 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001043 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001044 return False
1045 return True
1046
1047
jadmanski0afbb632008-06-06 21:10:57 +00001048 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001049 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001050 have_reached_limit = False
1051 # iterate over copy, so we can remove agents during iteration
1052 for agent in list(self._agents):
1053 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +00001054 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +00001055 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +00001056 continue
1057 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +00001058 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001059 have_reached_limit):
1060 have_reached_limit = True
1061 continue
showard4c5374f2008-09-04 17:02:56 +00001062 num_started_this_cycle += agent.num_processes
1063 agent.tick()
showarda9435c02009-05-13 21:28:17 +00001064 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001065 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001066
1067
showard29f7cd22009-04-29 21:16:24 +00001068 def _process_recurring_runs(self):
1069 recurring_runs = models.RecurringRun.objects.filter(
1070 start_date__lte=datetime.datetime.now())
1071 for rrun in recurring_runs:
1072 # Create job from template
1073 job = rrun.job
1074 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001075 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001076
1077 host_objects = info['hosts']
1078 one_time_hosts = info['one_time_hosts']
1079 metahost_objects = info['meta_hosts']
1080 dependencies = info['dependencies']
1081 atomic_group = info['atomic_group']
1082
1083 for host in one_time_hosts or []:
1084 this_host = models.Host.create_one_time_host(host.hostname)
1085 host_objects.append(this_host)
1086
1087 try:
1088 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001089 options=options,
showard29f7cd22009-04-29 21:16:24 +00001090 host_objects=host_objects,
1091 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001092 atomic_group=atomic_group)
1093
1094 except Exception, ex:
1095 logging.exception(ex)
1096 #TODO send email
1097
1098 if rrun.loop_count == 1:
1099 rrun.delete()
1100 else:
1101 if rrun.loop_count != 0: # if not infinite loop
1102 # calculate new start_date
1103 difference = datetime.timedelta(seconds=rrun.loop_period)
1104 rrun.start_date = rrun.start_date + difference
1105 rrun.loop_count -= 1
1106 rrun.save()
1107
1108
showard170873e2009-01-07 00:22:26 +00001109class PidfileRunMonitor(object):
1110 """
1111 Client must call either run() to start a new process or
1112 attach_to_existing_process().
1113 """
mbligh36768f02008-02-22 18:28:33 +00001114
showard170873e2009-01-07 00:22:26 +00001115 class _PidfileException(Exception):
1116 """
1117 Raised when there's some unexpected behavior with the pid file, but only
1118 used internally (never allowed to escape this class).
1119 """
mbligh36768f02008-02-22 18:28:33 +00001120
1121
showard170873e2009-01-07 00:22:26 +00001122 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001123 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001124 self._start_time = None
1125 self.pidfile_id = None
1126 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001127
1128
showard170873e2009-01-07 00:22:26 +00001129 def _add_nice_command(self, command, nice_level):
1130 if not nice_level:
1131 return command
1132 return ['nice', '-n', str(nice_level)] + command
1133
1134
1135 def _set_start_time(self):
1136 self._start_time = time.time()
1137
1138
1139 def run(self, command, working_directory, nice_level=None, log_file=None,
1140 pidfile_name=None, paired_with_pidfile=None):
1141 assert command is not None
1142 if nice_level is not None:
1143 command = ['nice', '-n', str(nice_level)] + command
1144 self._set_start_time()
1145 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001146 command, working_directory, pidfile_name=pidfile_name,
1147 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001148
1149
showardd3dc1992009-04-22 21:01:40 +00001150 def attach_to_existing_process(self, execution_tag,
1151 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001152 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001153 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1154 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001155 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001156
1157
jadmanski0afbb632008-06-06 21:10:57 +00001158 def kill(self):
showard170873e2009-01-07 00:22:26 +00001159 if self.has_process():
1160 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001161
mbligh36768f02008-02-22 18:28:33 +00001162
showard170873e2009-01-07 00:22:26 +00001163 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001164 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001165 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001166
1167
showard170873e2009-01-07 00:22:26 +00001168 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001169 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001170 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001171 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001172
1173
showard170873e2009-01-07 00:22:26 +00001174 def _read_pidfile(self, use_second_read=False):
1175 assert self.pidfile_id is not None, (
1176 'You must call run() or attach_to_existing_process()')
1177 contents = _drone_manager.get_pidfile_contents(
1178 self.pidfile_id, use_second_read=use_second_read)
1179 if contents.is_invalid():
1180 self._state = drone_manager.PidfileContents()
1181 raise self._PidfileException(contents)
1182 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001183
1184
showard21baa452008-10-21 00:08:39 +00001185 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001186 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1187 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001188 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001189 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001190 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001191
1192
1193 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001194 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001195 return
mblighbb421852008-03-11 22:36:16 +00001196
showard21baa452008-10-21 00:08:39 +00001197 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001198
showard170873e2009-01-07 00:22:26 +00001199 if self._state.process is None:
1200 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001201 return
mbligh90a549d2008-03-25 23:52:34 +00001202
showard21baa452008-10-21 00:08:39 +00001203 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001204 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001205 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001206 return
mbligh90a549d2008-03-25 23:52:34 +00001207
showard170873e2009-01-07 00:22:26 +00001208 # pid but no running process - maybe process *just* exited
1209 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001210 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001211 # autoserv exited without writing an exit code
1212 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001213 self._handle_pidfile_error(
1214 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001215
showard21baa452008-10-21 00:08:39 +00001216
1217 def _get_pidfile_info(self):
1218 """\
1219 After completion, self._state will contain:
1220 pid=None, exit_status=None if autoserv has not yet run
1221 pid!=None, exit_status=None if autoserv is running
1222 pid!=None, exit_status!=None if autoserv has completed
1223 """
1224 try:
1225 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001226 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001227 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001228
1229
showard170873e2009-01-07 00:22:26 +00001230 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001231 """\
1232 Called when no pidfile is found or no pid is in the pidfile.
1233 """
showard170873e2009-01-07 00:22:26 +00001234 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001235 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001236 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1237 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001238 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001239 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001240
1241
showard35162b02009-03-03 02:17:30 +00001242 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001243 """\
1244 Called when autoserv has exited without writing an exit status,
1245 or we've timed out waiting for autoserv to write a pid to the
1246 pidfile. In either case, we just return failure and the caller
1247 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001248
showard170873e2009-01-07 00:22:26 +00001249 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001250 """
1251 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001252 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001253 self._state.exit_status = 1
1254 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001255
1256
jadmanski0afbb632008-06-06 21:10:57 +00001257 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001258 self._get_pidfile_info()
1259 return self._state.exit_status
1260
1261
1262 def num_tests_failed(self):
1263 self._get_pidfile_info()
1264 assert self._state.num_tests_failed is not None
1265 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001266
1267
mbligh36768f02008-02-22 18:28:33 +00001268class Agent(object):
showard77182562009-06-10 00:16:05 +00001269 """
1270 An agent for use by the Dispatcher class to perform a sequence of tasks.
1271
1272 The following methods are required on all task objects:
1273 poll() - Called periodically to let the task check its status and
1274 update its internal state. If the task succeeded.
1275 is_done() - Returns True if the task is finished.
1276 abort() - Called when an abort has been requested. The task must
1277 set its aborted attribute to True if it actually aborted.
1278
1279 The following attributes are required on all task objects:
1280 aborted - bool, True if this task was aborted.
1281 failure_tasks - A sequence of tasks to be run using a new Agent
1282 by the dispatcher should this task fail.
1283 success - bool, True if this task succeeded.
1284 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1285 host_ids - A sequence of Host ids this task represents.
1286
1287 The following attribute is written to all task objects:
1288 agent - A reference to the Agent instance that the task has been
1289 added to.
1290 """
1291
1292
showard170873e2009-01-07 00:22:26 +00001293 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001294 """
1295 @param tasks: A list of tasks as described in the class docstring.
1296 @param num_processes: The number of subprocesses the Agent represents.
1297 This is used by the Dispatcher for managing the load on the
1298 system. Defaults to 1.
1299 """
jadmanski0afbb632008-06-06 21:10:57 +00001300 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001301 self.queue = None
showard77182562009-06-10 00:16:05 +00001302 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001303 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001304 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001305
showard170873e2009-01-07 00:22:26 +00001306 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1307 for task in tasks)
1308 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1309
showardd3dc1992009-04-22 21:01:40 +00001310 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001311 for task in tasks:
1312 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001313
1314
showardd3dc1992009-04-22 21:01:40 +00001315 def _clear_queue(self):
1316 self.queue = Queue.Queue(0)
1317
1318
showard170873e2009-01-07 00:22:26 +00001319 def _union_ids(self, id_lists):
1320 return set(itertools.chain(*id_lists))
1321
1322
jadmanski0afbb632008-06-06 21:10:57 +00001323 def add_task(self, task):
1324 self.queue.put_nowait(task)
1325 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001326
1327
jadmanski0afbb632008-06-06 21:10:57 +00001328 def tick(self):
showard21baa452008-10-21 00:08:39 +00001329 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001330 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001331 self.active_task.poll()
1332 if not self.active_task.is_done():
1333 return
1334 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001335
1336
jadmanski0afbb632008-06-06 21:10:57 +00001337 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001338 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001339 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001340 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001341 if not self.active_task.success:
1342 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001343 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001344
jadmanski0afbb632008-06-06 21:10:57 +00001345 if not self.is_done():
1346 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001347
1348
jadmanski0afbb632008-06-06 21:10:57 +00001349 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001350 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001351 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1352 # get reset.
1353 new_agent = Agent(self.active_task.failure_tasks)
1354 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001355
mblighe2586682008-02-29 22:45:46 +00001356
showard4c5374f2008-09-04 17:02:56 +00001357 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001358 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001359
1360
jadmanski0afbb632008-06-06 21:10:57 +00001361 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001362 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001363
1364
showardd3dc1992009-04-22 21:01:40 +00001365 def abort(self):
showard08a36412009-05-05 01:01:13 +00001366 # abort tasks until the queue is empty or a task ignores the abort
1367 while not self.is_done():
1368 if not self.active_task:
1369 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001370 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001371 if not self.active_task.aborted:
1372 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001373 return
1374 self.active_task = None
1375
showardd3dc1992009-04-22 21:01:40 +00001376
class DelayedCallTask(object):
    """
    An AgentTask-like object that simply waits: once at least the requested
    number of seconds has elapsed, a poll() invokes the supplied callback
    exactly once and the task is finished.  A non-None return value from the
    callback is treated as a new Agent instance and handed to the dispatcher.

    @attribute end_time: Absolute posix timestamp after which the next poll()
            fires the callback and completes this task.

    Exposes every attribute the Agent class requires of a task.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now until the callback may fire.
        @param callback: Called once after at least delay_seconds have
                elapsed.  Must return None or a new Agent instance.
        @param now_func: A time.time-like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class requires of every task.
        self.aborted = False
        self.failure_tasks = ()
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # Filled in by Agent.add_task().
        self.agent = None


    def poll(self):
        # Not yet due, or already fired/aborted: nothing to do.
        if not self._callback or self._now_func() < self.end_time:
            return
        spawned_agent = self._callback()
        if spawned_agent:
            self.agent.dispatcher.add_agent(spawned_agent)
        self._callback = None
        self.success = True


    def is_done(self):
        return not self._callback


    def abort(self):
        self.aborted = True
        self._callback = None
1434
1435
class AgentTask(object):
    """
    Base class for units of work run by an Agent.

    A task typically wraps a command line launched through a
    PidfileRunMonitor; subclasses customize behavior via prolog()/epilog()
    and may supply failure_tasks, which the Agent runs in a fresh Agent if
    this task fails.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command (argv list) to execute, or None when a subclass
                overrides run() or attaches an existing monitor.
        @param working_directory: directory the monitored process runs in.
        @param failure_tasks: tasks for the Agent to schedule (in a new
                Agent) should this task fail.  Defaults to no tasks.
        @param pidfile_name: accepted for signature compatibility; not
                stored here -- run() takes its own pidfile parameters.
        @param paired_with_pidfile: accepted for signature compatibility;
                not stored here.
        """
        self.done = False
        # default to None rather than [] to avoid the shared
        # mutable-default-argument pitfall; each instance gets its own list
        self.failure_tasks = failure_tasks if failure_tasks is not None else []
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record the host/queue-entry IDs this task touches, for the
        # dispatcher's bookkeeping
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Start the task if necessary, then check on its progress."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Check the monitored process; mark the task finished once it exits."""
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                return  # still running
            success = (exit_code == 0)
        else:
            # no monitor means the process was never started successfully
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete (idempotent) and run the epilog."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; subclasses override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE: suffix is accepted for interface compatibility but unused;
        # the drone manager chooses the temporary path name
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve the task's log file in the results repository, if any
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; subclasses extend."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamp the filename so repeated runs against a host don't collide
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the execution tag shared by all queue_entries, asserting
        that they all agree."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_results(self, queue_entries, use_monitor=None):
        """Copy results for queue_entries to the results repository, using
        use_monitor (default: self.monitor) to locate the process."""
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)


    def _parse_results(self, queue_entries):
        # num_processes=0 -- presumably so parsing doesn't count against the
        # dispatcher's process limits; confirm against Agent/dispatcher
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001575
1576
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """Render a single key/value pair in keyval-file format."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        # NotImplemented is a comparison sentinel, not an exception class --
        # raising it is itself a TypeError.  Raise NotImplementedError.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval to the job's keyval file (no-op if the monitored
        process never started)."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))
1604
1605
class RepairTask(AgentTask, TaskWithJobKeyvals):
    def __init__(self, host, queue_entry=None, task=None):
        """\
        Run autoserv repair (-R) against a host.

        host: the host to repair.
        queue_entry: queue entry to mark failed if this repair fails.
        task: existing models.SpecialTask row to reuse (recovery case);
              prolog() prepares a fresh one when this is None.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.set_host_log_file('repair', self.host)
        self._task = task


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # free the queue entry for rescheduling while the repair runs
        if self.queue_entry:
            self.queue_entry.requeue()

        # record this repair as a SpecialTask row (reusing self._task when
        # recovering an existing one)
        self.task_type = models.SpecialTask.Task.REPAIR
        self._task = models.SpecialTask.prepare(self, self._task)


    def _keyval_path(self):
        # keyvals for a repair live in its temporary results directory
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark self.queue_entry failed, preserving the repair's results as
        the entry's job results."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != 'Queued':
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry.execution_tag() + '/')

        self._copy_results([self.queue_entry])
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        self.queue_entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        # close out the SpecialTask row opened in prolog()
        self._task.finish()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            # a failed repair dooms the queue entry (unless it's a metahost)
            if self.queue_entry:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001684
1685
class PreJobTask(AgentTask):
    """Base for tasks that run ahead of a job (verify/cleanup): when such a
    task fails for a non-metahost entry, its log is preserved alongside the
    job's results."""
    def epilog(self):
        super(PreJobTask, self).epilog()

        entry = self.queue_entry
        if not entry or self.success or entry.meta_host:
            # nothing to preserve: no entry, the task passed, or the entry
            # is a metahost (it will simply be rescheduled elsewhere)
            return

        entry.set_execution_subdir()
        log_destination = os.path.join(entry.execution_tag(),
                                       os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=log_destination)
showard8fe93b52008-11-18 17:53:22 +00001698
1699
class VerifyTask(PreJobTask):
    """Run autoserv -v against a host; schedules a RepairTask on failure."""
    def __init__(self, queue_entry=None, host=None, task=None):
        # exactly one of queue_entry / host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        command = _autoserv_command_line(self.host.hostname,
                                         self.temp_results_dir, ['-v'],
                                         queue_entry=queue_entry)
        repair = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(command, self.temp_results_dir,
                                         failure_tasks=[repair])

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])
        self._task = task


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')

        self.task_type = models.SpecialTask.Task.VERIFY

        # Prepare every SpecialTask associated with this verify.  "Active"
        # verify tasks are included for recovery, so the new log file and
        # started time get recorded.
        self._tasks = list(models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_complete=False,
                queue_entry__isnull=True))
        existing_ids = [existing.id for existing in self._tasks]
        if not (self._task and self._task.id in existing_ids):
            # also covers self._task being None: prepare() creates a new row
            self._tasks.append(self._task)
        self._tasks = [models.SpecialTask.prepare(self, special_task)
                       for special_task in self._tasks]


    def epilog(self):
        super(VerifyTask, self).epilog()

        for special_task in self._tasks:
            special_task.finish()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001752
1753
class CleanupHostsMixin(object):
    """Mixin providing post-job host handling: either schedule a reboot
    (CleanupTask) for each entry's host, or mark the host Ready, according
    to the job's reboot_after policy."""
    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        """
        @param job: job the entries belong to (supplies reboot_after).
        @param queue_entries: entries whose hosts need post-job handling.
        @param final_success: whether the job completed successfully.
        @param num_tests_failed: number of failed tests in the job.
        """
        reboot_after = job.reboot_after
        # Not every class mixing this in defines _final_status (QueueTask
        # doesn't; PostJobTask subclasses do), so read it defensively to
        # avoid an AttributeError on the QueueTask path.
        final_status = getattr(self, '_final_status', None)
        do_reboot = (
                # always reboot after aborted jobs
                final_status == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')
1773
1774
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """Task that runs the main autoserv process for a job's queue entries."""
    def __init__(self, job, queue_entries, cmd, group_name=''):
        # job: the Job being executed
        # queue_entries: HostQueueEntries covered by this autoserv invocation
        # cmd: the autoserv command line
        # group_name: optional host-group name, recorded as a job keyval
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job keyvals live at the top of the execution directory
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict, rendered as a keyval file, at keyval_path."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        # record the host's platform and labels under host_keyvals/<hostname>
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries share one execution tag; use the first entry's
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Write pre-job keyvals and mark all entries/hosts as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker file explaining why the job has no results
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Post-run handling: spawn log gathering (or fall back to host
        cleanup) and record failures for lost processes.  No-op without a
        monitor."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
        else:
            self._reboot_hosts(self.job, self.queue_entries,
                               final_success=False, num_tests_failed=0)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no recorded aborter; attribute the abort to the system
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001905
1906
class RecoveryQueueTask(QueueTask):
    """A QueueTask attached to an autoserv process that was already running
    when the scheduler started; it only monitors the recovered process."""
    def __init__(self, job, queue_entries, run_monitor):
        """
        @param run_monitor: monitor already attached to the recovered
                autoserv process.
        """
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.monitor = run_monitor
        self.started = True
        # since we set started=True here, prolog() and run() shouldn't be called


    def run(self):
        # NotImplemented is a sentinel, not an exception, and isn't callable;
        # the original "raise NotImplemented(...)" would die with a TypeError.
        raise NotImplementedError('This should never be called')


    def prolog(self):
        raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001921
1922
class PostJobTask(AgentTask):
    """Base class for tasks that run after a job's main autoserv process has
    exited (e.g. crashinfo collection, final parsing)."""
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        If run_monitor != None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the original autoserv process to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)
        # this must happen *after* the super call
        self.monitor = run_monitor
        if run_monitor:
            # recovered tasks skip prolog()/run()
            self.started = True

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if the queue entries were aborted; emails a warning
        and assumes True if they disagree."""
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map the job's outcome to the status the entries should end in."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        assert not self.monitor

        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results in post-job task',
                'No results in post-job task at %s' %
                self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
            pidfile_name=self._pidfile_name,
            paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
2011
2012
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo',
                '-m', ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        if self._autoserv_monitor.has_process():
            # autoserv left results behind; preserve and parse them
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)

        job_succeeded = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
        failed_test_count = self._autoserv_monitor.num_tests_failed()
        self._reboot_hosts(self._job, self._queue_entries, job_succeeded,
                           failed_test_count)


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # collect crashinfo only when autoserv died from a signal -- or when
        # there's no exit code at all, which we treat as equally suspicious
        if exit_code is not None and not os.WIFSIGNALED(exit_code):
            self.finished(True)
            return
        super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002062
2063
class CleanupTask(PreJobTask):
    """Run autoserv --cleanup against a host; schedules a RepairTask on
    failure."""
    def __init__(self, host=None, queue_entry=None, task=None):
        # exactly one of host / queue_entry must be supplied; task is an
        # existing models.SpecialTask row to reuse (recovery case)
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        repair_task = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[repair_task])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)
        self._task = task


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")

        # record this cleanup as a SpecialTask row (reusing self._task when
        # recovering an existing one)
        self.task_type = models.SpecialTask.Task.CLEANUP
        self._task = models.SpecialTask.prepare(self, self._task)


    def epilog(self):
        super(CleanupTask, self).epilog()

        # close out the SpecialTask row opened in prolog()
        self._task.finish()

        if self.success:
            self.host.set_status('Ready')
            self.host.update_field('dirty', 0)
2102
2103
showardd3dc1992009-04-22 21:01:40 +00002104class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002105 _num_running_parses = 0
2106
showardd3dc1992009-04-22 21:01:40 +00002107 def __init__(self, queue_entries, run_monitor=None):
2108 super(FinalReparseTask, self).__init__(queue_entries,
2109 pidfile_name=_PARSER_PID_FILE,
2110 logfile_name='.parse.log',
2111 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00002112 # don't use _set_ids, since we don't want to set the host_ids
2113 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00002114 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00002115
showard97aed502008-11-04 02:01:24 +00002116
2117 @classmethod
2118 def _increment_running_parses(cls):
2119 cls._num_running_parses += 1
2120
2121
2122 @classmethod
2123 def _decrement_running_parses(cls):
2124 cls._num_running_parses -= 1
2125
2126
2127 @classmethod
2128 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002129 return (cls._num_running_parses <
2130 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002131
2132
2133 def prolog(self):
2134 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002135 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002136
2137
2138 def epilog(self):
2139 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002140 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002141
2142
showardd3dc1992009-04-22 21:01:40 +00002143 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002144 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002145 results_dir]
showard97aed502008-11-04 02:01:24 +00002146
2147
showard08a36412009-05-05 01:01:13 +00002148 def tick(self):
2149 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002150 # and we can, at which point we revert to default behavior
2151 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002152 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002153 else:
2154 self._try_starting_parse()
2155
2156
2157 def run(self):
2158 # override run() to not actually run unless we can
2159 self._try_starting_parse()
2160
2161
2162 def _try_starting_parse(self):
2163 if not self._can_run_new_parse():
2164 return
showard170873e2009-01-07 00:22:26 +00002165
showard97aed502008-11-04 02:01:24 +00002166 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002167 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002168
showard97aed502008-11-04 02:01:24 +00002169 self._increment_running_parses()
2170 self._parse_started = True
2171
2172
2173 def finished(self, success):
2174 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002175 if self._parse_started:
2176 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002177
2178
showardc9ae1782009-01-30 01:42:37 +00002179class SetEntryPendingTask(AgentTask):
2180 def __init__(self, queue_entry):
2181 super(SetEntryPendingTask, self).__init__(cmd='')
2182 self._queue_entry = queue_entry
2183 self._set_ids(queue_entries=[queue_entry])
2184
2185
2186 def run(self):
2187 agent = self._queue_entry.on_pending()
2188 if agent:
2189 self.agent.dispatcher.add_agent(agent)
2190 self.finished(True)
2191
2192
showarda3c58572009-03-12 20:36:59 +00002193class DBError(Exception):
2194 """Raised by the DBObject constructor when its select fails."""
2195
2196
mbligh36768f02008-02-22 18:28:33 +00002197class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002198 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002199
2200 # Subclasses MUST override these:
2201 _table_name = ''
2202 _fields = ()
2203
showarda3c58572009-03-12 20:36:59 +00002204 # A mapping from (type, id) to the instance of the object for that
2205 # particular id. This prevents us from creating new Job() and Host()
2206 # instances for every HostQueueEntry object that we instantiate as
2207 # multiple HQEs often share the same Job.
2208 _instances_by_type_and_id = weakref.WeakValueDictionary()
2209 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002210
showarda3c58572009-03-12 20:36:59 +00002211
2212 def __new__(cls, id=None, **kwargs):
2213 """
2214 Look to see if we already have an instance for this particular type
2215 and id. If so, use it instead of creating a duplicate instance.
2216 """
2217 if id is not None:
2218 instance = cls._instances_by_type_and_id.get((cls, id))
2219 if instance:
2220 return instance
2221 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2222
2223
2224 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002225 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002226 assert self._table_name, '_table_name must be defined in your class'
2227 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002228 if not new_record:
2229 if self._initialized and not always_query:
2230 return # We've already been initialized.
2231 if id is None:
2232 id = row[0]
2233 # Tell future constructors to use us instead of re-querying while
2234 # this instance is still around.
2235 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002236
showard6ae5ea92009-02-25 00:11:51 +00002237 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002238
jadmanski0afbb632008-06-06 21:10:57 +00002239 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002240
jadmanski0afbb632008-06-06 21:10:57 +00002241 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002242 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002243
showarda3c58572009-03-12 20:36:59 +00002244 if self._initialized:
2245 differences = self._compare_fields_in_row(row)
2246 if differences:
showard7629f142009-03-27 21:02:02 +00002247 logging.warn(
2248 'initialized %s %s instance requery is updating: %s',
2249 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002250 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002251 self._initialized = True
2252
2253
2254 @classmethod
2255 def _clear_instance_cache(cls):
2256 """Used for testing, clear the internal instance cache."""
2257 cls._instances_by_type_and_id.clear()
2258
2259
showardccbd6c52009-03-21 00:10:21 +00002260 def _fetch_row_from_db(self, row_id):
2261 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2262 rows = _db.execute(sql, (row_id,))
2263 if not rows:
showard76e29d12009-04-15 21:53:10 +00002264 raise DBError("row not found (table=%s, row id=%s)"
2265 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002266 return rows[0]
2267
2268
showarda3c58572009-03-12 20:36:59 +00002269 def _assert_row_length(self, row):
2270 assert len(row) == len(self._fields), (
2271 "table = %s, row = %s/%d, fields = %s/%d" % (
2272 self.__table, row, len(row), self._fields, len(self._fields)))
2273
2274
2275 def _compare_fields_in_row(self, row):
2276 """
2277 Given a row as returned by a SELECT query, compare it to our existing
2278 in memory fields.
2279
2280 @param row - A sequence of values corresponding to fields named in
2281 The class attribute _fields.
2282
2283 @returns A dictionary listing the differences keyed by field name
2284 containing tuples of (current_value, row_value).
2285 """
2286 self._assert_row_length(row)
2287 differences = {}
2288 for field, row_value in itertools.izip(self._fields, row):
2289 current_value = getattr(self, field)
2290 if current_value != row_value:
2291 differences[field] = (current_value, row_value)
2292 return differences
showard2bab8f42008-11-12 18:15:22 +00002293
2294
2295 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002296 """
2297 Update our field attributes using a single row returned by SELECT.
2298
2299 @param row - A sequence of values corresponding to fields named in
2300 the class fields list.
2301 """
2302 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002303
showard2bab8f42008-11-12 18:15:22 +00002304 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002305 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002306 setattr(self, field, value)
2307 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002308
showard2bab8f42008-11-12 18:15:22 +00002309 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002310
mblighe2586682008-02-29 22:45:46 +00002311
showardccbd6c52009-03-21 00:10:21 +00002312 def update_from_database(self):
2313 assert self.id is not None
2314 row = self._fetch_row_from_db(self.id)
2315 self._update_fields_from_row(row)
2316
2317
jadmanski0afbb632008-06-06 21:10:57 +00002318 def count(self, where, table = None):
2319 if not table:
2320 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002321
jadmanski0afbb632008-06-06 21:10:57 +00002322 rows = _db.execute("""
2323 SELECT count(*) FROM %s
2324 WHERE %s
2325 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002326
jadmanski0afbb632008-06-06 21:10:57 +00002327 assert len(rows) == 1
2328
2329 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002330
2331
showardd3dc1992009-04-22 21:01:40 +00002332 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002333 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002334
showard2bab8f42008-11-12 18:15:22 +00002335 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002336 return
mbligh36768f02008-02-22 18:28:33 +00002337
mblighf8c624d2008-07-03 16:58:45 +00002338 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002339 _db.execute(query, (value, self.id))
2340
showard2bab8f42008-11-12 18:15:22 +00002341 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002342
2343
jadmanski0afbb632008-06-06 21:10:57 +00002344 def save(self):
2345 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002346 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002347 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002348 values = []
2349 for key in keys:
2350 value = getattr(self, key)
2351 if value is None:
2352 values.append('NULL')
2353 else:
2354 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002355 values_str = ','.join(values)
2356 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2357 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002358 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002359 # Update our id to the one the database just assigned to us.
2360 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002361
2362
jadmanski0afbb632008-06-06 21:10:57 +00002363 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002364 self._instances_by_type_and_id.pop((type(self), id), None)
2365 self._initialized = False
2366 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002367 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2368 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002369
2370
showard63a34772008-08-18 19:32:50 +00002371 @staticmethod
2372 def _prefix_with(string, prefix):
2373 if string:
2374 string = prefix + string
2375 return string
2376
2377
jadmanski0afbb632008-06-06 21:10:57 +00002378 @classmethod
showard989f25d2008-10-01 11:38:11 +00002379 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002380 """
2381 Construct instances of our class based on the given database query.
2382
2383 @yields One class instance for each row fetched.
2384 """
showard63a34772008-08-18 19:32:50 +00002385 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2386 where = cls._prefix_with(where, 'WHERE ')
2387 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002388 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002389 'joins' : joins,
2390 'where' : where,
2391 'order_by' : order_by})
2392 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002393 for row in rows:
2394 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002395
mbligh36768f02008-02-22 18:28:33 +00002396
class IneligibleHostQueue(DBObject):
    # A row here blocks the given (job, host) pair from being scheduled
    # together; see HostQueueEntry.block_host()/unblock_host().
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002400
2401
showard89f84db2009-03-12 20:39:13 +00002402class AtomicGroup(DBObject):
2403 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002404 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2405 'invalid')
showard89f84db2009-03-12 20:39:13 +00002406
2407
showard989f25d2008-10-01 11:38:11 +00002408class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002409 _table_name = 'labels'
2410 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002411 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002412
2413
mbligh36768f02008-02-22 18:28:33 +00002414class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002415 _table_name = 'hosts'
2416 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2417 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2418
2419
jadmanski0afbb632008-06-06 21:10:57 +00002420 def current_task(self):
2421 rows = _db.execute("""
2422 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2423 """, (self.id,))
2424
2425 if len(rows) == 0:
2426 return None
2427 else:
2428 assert len(rows) == 1
2429 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002430 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002431
2432
jadmanski0afbb632008-06-06 21:10:57 +00002433 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002434 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002435 if self.current_task():
2436 self.current_task().requeue()
2437
showard6ae5ea92009-02-25 00:11:51 +00002438
jadmanski0afbb632008-06-06 21:10:57 +00002439 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002440 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002441 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002442
2443
showard170873e2009-01-07 00:22:26 +00002444 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002445 """
showard170873e2009-01-07 00:22:26 +00002446 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002447 """
2448 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002449 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002450 FROM labels
2451 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002452 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002453 ORDER BY labels.name
2454 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002455 platform = None
2456 all_labels = []
2457 for label_name, is_platform in rows:
2458 if is_platform:
2459 platform = label_name
2460 all_labels.append(label_name)
2461 return platform, all_labels
2462
2463
showard2fe3f1d2009-07-06 20:19:11 +00002464 def reverify_tasks(self):
2465 cleanup_task = CleanupTask(host=self)
2466 verify_task = VerifyTask(host=self)
2467
showard6d7b2ff2009-06-10 00:16:47 +00002468 # just to make sure this host does not get taken away
showard2fe3f1d2009-07-06 20:19:11 +00002469 self.set_status('Cleaning')
2470 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002471
2472
showard54c1ea92009-05-20 00:32:58 +00002473 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2474
2475
2476 @classmethod
2477 def cmp_for_sort(cls, a, b):
2478 """
2479 A comparison function for sorting Host objects by hostname.
2480
2481 This strips any trailing numeric digits, ignores leading 0s and
2482 compares hostnames by the leading name and the trailing digits as a
2483 number. If both hostnames do not match this pattern, they are simply
2484 compared as lower case strings.
2485
2486 Example of how hostnames will be sorted:
2487
2488 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2489
2490 This hopefully satisfy most people's hostname sorting needs regardless
2491 of their exact naming schemes. Nobody sane should have both a host10
2492 and host010 (but the algorithm works regardless).
2493 """
2494 lower_a = a.hostname.lower()
2495 lower_b = b.hostname.lower()
2496 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2497 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2498 if match_a and match_b:
2499 name_a, number_a_str = match_a.groups()
2500 name_b, number_b_str = match_b.groups()
2501 number_a = int(number_a_str.lstrip('0'))
2502 number_b = int(number_b_str.lstrip('0'))
2503 result = cmp((name_a, number_a), (name_b, number_b))
2504 if result == 0 and lower_a != lower_b:
2505 # If they compared equal above but the lower case names are
2506 # indeed different, don't report equality. abc012 != abc12.
2507 return cmp(lower_a, lower_b)
2508 return result
2509 else:
2510 return cmp(lower_a, lower_b)
2511
2512
mbligh36768f02008-02-22 18:28:33 +00002513class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002514 _table_name = 'host_queue_entries'
2515 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002516 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002517 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002518
2519
showarda3c58572009-03-12 20:36:59 +00002520 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002521 assert id or row
showarda3c58572009-03-12 20:36:59 +00002522 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002523 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002524
jadmanski0afbb632008-06-06 21:10:57 +00002525 if self.host_id:
2526 self.host = Host(self.host_id)
2527 else:
2528 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002529
showard77182562009-06-10 00:16:05 +00002530 if self.atomic_group_id:
2531 self.atomic_group = AtomicGroup(self.atomic_group_id,
2532 always_query=False)
2533 else:
2534 self.atomic_group = None
2535
showard170873e2009-01-07 00:22:26 +00002536 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002537 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002538
2539
showard89f84db2009-03-12 20:39:13 +00002540 @classmethod
2541 def clone(cls, template):
2542 """
2543 Creates a new row using the values from a template instance.
2544
2545 The new instance will not exist in the database or have a valid
2546 id attribute until its save() method is called.
2547 """
2548 assert isinstance(template, cls)
2549 new_row = [getattr(template, field) for field in cls._fields]
2550 clone = cls(row=new_row, new_record=True)
2551 clone.id = None
2552 return clone
2553
2554
showardc85c21b2008-11-24 22:17:37 +00002555 def _view_job_url(self):
2556 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2557
2558
showardf1ae3542009-05-11 19:26:02 +00002559 def get_labels(self):
2560 """
2561 Get all labels associated with this host queue entry (either via the
2562 meta_host or as a job dependency label). The labels yielded are not
2563 guaranteed to be unique.
2564
2565 @yields Label instances associated with this host_queue_entry.
2566 """
2567 if self.meta_host:
2568 yield Label(id=self.meta_host, always_query=False)
2569 labels = Label.fetch(
2570 joins="JOIN jobs_dependency_labels AS deps "
2571 "ON (labels.id = deps.label_id)",
2572 where="deps.job_id = %d" % self.job.id)
2573 for label in labels:
2574 yield label
2575
2576
jadmanski0afbb632008-06-06 21:10:57 +00002577 def set_host(self, host):
2578 if host:
2579 self.queue_log_record('Assigning host ' + host.hostname)
2580 self.update_field('host_id', host.id)
2581 self.update_field('active', True)
2582 self.block_host(host.id)
2583 else:
2584 self.queue_log_record('Releasing host')
2585 self.unblock_host(self.host.id)
2586 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002587
jadmanski0afbb632008-06-06 21:10:57 +00002588 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002589
2590
jadmanski0afbb632008-06-06 21:10:57 +00002591 def get_host(self):
2592 return self.host
mbligh36768f02008-02-22 18:28:33 +00002593
2594
jadmanski0afbb632008-06-06 21:10:57 +00002595 def queue_log_record(self, log_line):
2596 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002597 _drone_manager.write_lines_to_file(self.queue_log_path,
2598 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002599
2600
jadmanski0afbb632008-06-06 21:10:57 +00002601 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002602 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002603 row = [0, self.job.id, host_id]
2604 block = IneligibleHostQueue(row=row, new_record=True)
2605 block.save()
mblighe2586682008-02-29 22:45:46 +00002606
2607
jadmanski0afbb632008-06-06 21:10:57 +00002608 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002609 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002610 blocks = IneligibleHostQueue.fetch(
2611 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2612 for block in blocks:
2613 block.delete()
mblighe2586682008-02-29 22:45:46 +00002614
2615
showard2bab8f42008-11-12 18:15:22 +00002616 def set_execution_subdir(self, subdir=None):
2617 if subdir is None:
2618 assert self.get_host()
2619 subdir = self.get_host().hostname
2620 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002621
2622
showard6355f6b2008-12-05 18:52:13 +00002623 def _get_hostname(self):
2624 if self.host:
2625 return self.host.hostname
2626 return 'no host'
2627
2628
showard170873e2009-01-07 00:22:26 +00002629 def __str__(self):
2630 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2631
2632
jadmanski0afbb632008-06-06 21:10:57 +00002633 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002634 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002635
showardb18134f2009-03-20 20:52:18 +00002636 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002637
showardc85c21b2008-11-24 22:17:37 +00002638 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002639 self.update_field('complete', False)
2640 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002641
jadmanski0afbb632008-06-06 21:10:57 +00002642 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002643 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002644 self.update_field('complete', False)
2645 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002646
showardc85c21b2008-11-24 22:17:37 +00002647 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002648 self.update_field('complete', True)
2649 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002650
2651 should_email_status = (status.lower() in _notify_email_statuses or
2652 'all' in _notify_email_statuses)
2653 if should_email_status:
2654 self._email_on_status(status)
2655
2656 self._email_on_job_complete()
2657
2658
2659 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002660 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002661
2662 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2663 self.job.id, self.job.name, hostname, status)
2664 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2665 self.job.id, self.job.name, hostname, status,
2666 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002667 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002668
2669
2670 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002671 if not self.job.is_finished():
2672 return
showard542e8402008-09-19 20:16:18 +00002673
showardc85c21b2008-11-24 22:17:37 +00002674 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002675 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002676 for queue_entry in hosts_queue:
2677 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002678 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002679 queue_entry.status))
2680
2681 summary_text = "\n".join(summary_text)
2682 status_counts = models.Job.objects.get_status_counts(
2683 [self.job.id])[self.job.id]
2684 status = ', '.join('%d %s' % (count, status) for status, count
2685 in status_counts.iteritems())
2686
2687 subject = 'Autotest: Job ID: %s "%s" %s' % (
2688 self.job.id, self.job.name, status)
2689 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2690 self.job.id, self.job.name, status, self._view_job_url(),
2691 summary_text)
showard170873e2009-01-07 00:22:26 +00002692 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002693
2694
showard77182562009-06-10 00:16:05 +00002695 def run_pre_job_tasks(self, assigned_host=None):
2696 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002697 assert assigned_host
2698 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002699 if self.host_id is None:
2700 self.set_host(assigned_host)
2701 else:
2702 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002703
showardb18134f2009-03-20 20:52:18 +00002704 logging.info("%s/%s/%s scheduled on %s, status=%s",
2705 self.job.name, self.meta_host, self.atomic_group_id,
2706 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002707
showard77182562009-06-10 00:16:05 +00002708 return self._do_run_pre_job_tasks()
2709
2710
2711 def _do_run_pre_job_tasks(self):
2712 # Every host goes thru the Verifying stage (which may or may not
2713 # actually do anything as determined by get_pre_job_tasks).
2714 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2715
2716 # The pre job tasks always end with a SetEntryPendingTask which
2717 # will continue as appropriate through queue_entry.on_pending().
2718 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002719
showard6ae5ea92009-02-25 00:11:51 +00002720
jadmanski0afbb632008-06-06 21:10:57 +00002721 def requeue(self):
2722 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002723 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002724 # verify/cleanup failure sets the execution subdir, so reset it here
2725 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002726 if self.meta_host:
2727 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002728
2729
jadmanski0afbb632008-06-06 21:10:57 +00002730 def handle_host_failure(self):
2731 """\
2732 Called when this queue entry's host has failed verification and
2733 repair.
2734 """
2735 assert not self.meta_host
2736 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002737 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002738
2739
jadmanskif7fa2cc2008-10-01 14:13:23 +00002740 @property
2741 def aborted_by(self):
2742 self._load_abort_info()
2743 return self._aborted_by
2744
2745
2746 @property
2747 def aborted_on(self):
2748 self._load_abort_info()
2749 return self._aborted_on
2750
2751
2752 def _load_abort_info(self):
2753 """ Fetch info about who aborted the job. """
2754 if hasattr(self, "_aborted_by"):
2755 return
2756 rows = _db.execute("""
2757 SELECT users.login, aborted_host_queue_entries.aborted_on
2758 FROM aborted_host_queue_entries
2759 INNER JOIN users
2760 ON users.id = aborted_host_queue_entries.aborted_by_id
2761 WHERE aborted_host_queue_entries.queue_entry_id = %s
2762 """, (self.id,))
2763 if rows:
2764 self._aborted_by, self._aborted_on = rows[0]
2765 else:
2766 self._aborted_by = self._aborted_on = None
2767
2768
showardb2e2c322008-10-14 17:33:55 +00002769 def on_pending(self):
2770 """
2771 Called when an entry in a synchronous job has passed verify. If the
2772 job is ready to run, returns an agent to run the job. Returns None
2773 otherwise.
2774 """
2775 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002776 self.get_host().set_status('Pending')
showard77182562009-06-10 00:16:05 +00002777 return self.job.run_if_ready(queue_entry=self)
showardb2e2c322008-10-14 17:33:55 +00002778
2779
showardd3dc1992009-04-22 21:01:40 +00002780 def abort(self, dispatcher):
2781 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002782
showardd3dc1992009-04-22 21:01:40 +00002783 Status = models.HostQueueEntry.Status
2784 has_running_job_agent = (
2785 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2786 and dispatcher.get_agents_for_entry(self))
2787 if has_running_job_agent:
2788 # do nothing; post-job tasks will finish and then mark this entry
2789 # with status "Aborted" and take care of the host
2790 return
2791
2792 if self.status in (Status.STARTING, Status.PENDING):
2793 self.host.set_status(models.Host.Status.READY)
2794 elif self.status == Status.VERIFYING:
2795 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2796
2797 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002798
2799 def execution_tag(self):
2800 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002801 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002802
2803
mbligh36768f02008-02-22 18:28:33 +00002804class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002805 _table_name = 'jobs'
2806 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2807 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002808 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002809 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002810
showard77182562009-06-10 00:16:05 +00002811 # This does not need to be a column in the DB. The delays are likely to
2812 # be configured short. If the scheduler is stopped and restarted in
2813 # the middle of a job's delay cycle, the delay cycle will either be
2814 # repeated or skipped depending on the number of Pending machines found
2815 # when the restarted scheduler recovers to track it. Not a problem.
2816 #
2817 # A reference to the DelayedCallTask that will wake up the job should
2818 # no other HQEs change state in time. Its end_time attribute is used
2819 # by our run_with_ready_delay() method to determine if the wait is over.
2820 _delay_ready_task = None
2821
2822 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2823 # all status='Pending' atomic group HQEs incase a delay was running when the
2824 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002825
showarda3c58572009-03-12 20:36:59 +00002826 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002827 assert id or row
showarda3c58572009-03-12 20:36:59 +00002828 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002829
mblighe2586682008-02-29 22:45:46 +00002830
jadmanski0afbb632008-06-06 21:10:57 +00002831 def is_server_job(self):
2832 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002833
2834
showard170873e2009-01-07 00:22:26 +00002835 def tag(self):
2836 return "%s-%s" % (self.id, self.owner)
2837
2838
jadmanski0afbb632008-06-06 21:10:57 +00002839 def get_host_queue_entries(self):
2840 rows = _db.execute("""
2841 SELECT * FROM host_queue_entries
2842 WHERE job_id= %s
2843 """, (self.id,))
2844 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002845
jadmanski0afbb632008-06-06 21:10:57 +00002846 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002847
jadmanski0afbb632008-06-06 21:10:57 +00002848 return entries
mbligh36768f02008-02-22 18:28:33 +00002849
2850
jadmanski0afbb632008-06-06 21:10:57 +00002851 def set_status(self, status, update_queues=False):
2852 self.update_field('status',status)
2853
2854 if update_queues:
2855 for queue_entry in self.get_host_queue_entries():
2856 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002857
2858
showard77182562009-06-10 00:16:05 +00002859 def _atomic_and_has_started(self):
2860 """
2861 @returns True if any of the HostQueueEntries associated with this job
2862 have entered the Status.STARTING state or beyond.
2863 """
2864 atomic_entries = models.HostQueueEntry.objects.filter(
2865 job=self.id, atomic_group__isnull=False)
2866 if atomic_entries.count() <= 0:
2867 return False
2868
showardaf8b4ca2009-06-16 18:47:26 +00002869 # These states may *only* be reached if Job.run() has been called.
2870 started_statuses = (models.HostQueueEntry.Status.STARTING,
2871 models.HostQueueEntry.Status.RUNNING,
2872 models.HostQueueEntry.Status.COMPLETED)
2873
2874 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002875 return started_entries.count() > 0
2876
2877
2878 def _pending_count(self):
2879 """The number of HostQueueEntries for this job in the Pending state."""
2880 pending_entries = models.HostQueueEntry.objects.filter(
2881 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2882 return pending_entries.count()
2883
2884
jadmanski0afbb632008-06-06 21:10:57 +00002885 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002886 # NOTE: Atomic group jobs stop reporting ready after they have been
2887 # started to avoid launching multiple copies of one atomic job.
2888 # Only possible if synch_count is less than than half the number of
2889 # machines in the atomic group.
2890 return (self._pending_count() >= self.synch_count
2891 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002892
2893
jadmanski0afbb632008-06-06 21:10:57 +00002894 def num_machines(self, clause = None):
2895 sql = "job_id=%s" % self.id
2896 if clause:
2897 sql += " AND (%s)" % clause
2898 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002899
2900
jadmanski0afbb632008-06-06 21:10:57 +00002901 def num_queued(self):
2902 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002903
2904
jadmanski0afbb632008-06-06 21:10:57 +00002905 def num_active(self):
2906 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002907
2908
jadmanski0afbb632008-06-06 21:10:57 +00002909 def num_complete(self):
2910 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002911
2912
jadmanski0afbb632008-06-06 21:10:57 +00002913 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002914 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002915
mbligh36768f02008-02-22 18:28:33 +00002916
showard6bb7c292009-01-30 01:44:51 +00002917 def _not_yet_run_entries(self, include_verifying=True):
2918 statuses = [models.HostQueueEntry.Status.QUEUED,
2919 models.HostQueueEntry.Status.PENDING]
2920 if include_verifying:
2921 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2922 return models.HostQueueEntry.objects.filter(job=self.id,
2923 status__in=statuses)
2924
2925
2926 def _stop_all_entries(self):
2927 entries_to_stop = self._not_yet_run_entries(
2928 include_verifying=False)
2929 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002930 assert not child_entry.complete, (
2931 '%s status=%s, active=%s, complete=%s' %
2932 (child_entry.id, child_entry.status, child_entry.active,
2933 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002934 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2935 child_entry.host.status = models.Host.Status.READY
2936 child_entry.host.save()
2937 child_entry.status = models.HostQueueEntry.Status.STOPPED
2938 child_entry.save()
2939
showard2bab8f42008-11-12 18:15:22 +00002940 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002941 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002942 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002943 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002944
2945
jadmanski0afbb632008-06-06 21:10:57 +00002946 def write_to_machines_file(self, queue_entry):
2947 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002948 file_path = os.path.join(self.tag(), '.machines')
2949 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002950
2951
showardf1ae3542009-05-11 19:26:02 +00002952 def _next_group_name(self, group_name=''):
2953 """@returns a directory name to use for the next host group results."""
2954 if group_name:
2955 # Sanitize for use as a pathname.
2956 group_name = group_name.replace(os.path.sep, '_')
2957 if group_name.startswith('.'):
2958 group_name = '_' + group_name[1:]
2959 # Add a separator between the group name and 'group%d'.
2960 group_name += '.'
2961 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002962 query = models.HostQueueEntry.objects.filter(
2963 job=self.id).values('execution_subdir').distinct()
2964 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002965 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2966 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002967 if ids:
2968 next_id = max(ids) + 1
2969 else:
2970 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002971 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002972
2973
showard170873e2009-01-07 00:22:26 +00002974 def _write_control_file(self, execution_tag):
2975 control_path = _drone_manager.attach_file_to_execution(
2976 execution_tag, self.control_file)
2977 return control_path
mbligh36768f02008-02-22 18:28:33 +00002978
showardb2e2c322008-10-14 17:33:55 +00002979
showard2bab8f42008-11-12 18:15:22 +00002980 def get_group_entries(self, queue_entry_from_group):
2981 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002982 return list(HostQueueEntry.fetch(
2983 where='job_id=%s AND execution_subdir=%s',
2984 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002985
2986
showardb2e2c322008-10-14 17:33:55 +00002987 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002988 assert queue_entries
2989 execution_tag = queue_entries[0].execution_tag()
2990 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002991 hostnames = ','.join([entry.get_host().hostname
2992 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002993
showard87ba02a2009-04-20 19:37:32 +00002994 params = _autoserv_command_line(
2995 hostnames, execution_tag,
2996 ['-P', execution_tag, '-n',
2997 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00002998 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00002999
jadmanski0afbb632008-06-06 21:10:57 +00003000 if not self.is_server_job():
3001 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003002
showardb2e2c322008-10-14 17:33:55 +00003003 return params
mblighe2586682008-02-29 22:45:46 +00003004
mbligh36768f02008-02-22 18:28:33 +00003005
showardc9ae1782009-01-30 01:42:37 +00003006 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003007 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003008 return True
showard0fc38302008-10-23 00:44:07 +00003009 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003010 return queue_entry.get_host().dirty
3011 return False
showard21baa452008-10-21 00:08:39 +00003012
showardc9ae1782009-01-30 01:42:37 +00003013
showard2fe3f1d2009-07-06 20:19:11 +00003014 def should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003015 do_not_verify = (queue_entry.host.protection ==
3016 host_protections.Protection.DO_NOT_VERIFY)
3017 if do_not_verify:
3018 return False
3019 return self.run_verify
3020
3021
showard77182562009-06-10 00:16:05 +00003022 def get_pre_job_tasks(self, queue_entry):
3023 """
3024 Get a list of tasks to perform before the host_queue_entry
3025 may be used to run this Job (such as Cleanup & Verify).
3026
3027 @returns A list of tasks to be done to the given queue_entry before
3028 it should be considered be ready to run this job. The last
3029 task in the list calls HostQueueEntry.on_pending(), which
3030 continues the flow of the job.
3031 """
showard21baa452008-10-21 00:08:39 +00003032 tasks = []
showardc9ae1782009-01-30 01:42:37 +00003033 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00003034 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2fe3f1d2009-07-06 20:19:11 +00003035 if self.should_run_verify(queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003036 tasks.append(VerifyTask(queue_entry=queue_entry))
3037 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00003038 return tasks
3039
3040
showardf1ae3542009-05-11 19:26:02 +00003041 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003042 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003043 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003044 else:
showardf1ae3542009-05-11 19:26:02 +00003045 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003046 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003047 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003048 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003049
3050 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003051 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003052
3053
3054 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003055 """
3056 @returns A tuple containing a list of HostQueueEntry instances to be
3057 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003058 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003059 """
showard77182562009-06-10 00:16:05 +00003060 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003061 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003062 if atomic_group:
3063 num_entries_wanted = atomic_group.max_number_of_machines
3064 else:
3065 num_entries_wanted = self.synch_count
3066 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003067
showardf1ae3542009-05-11 19:26:02 +00003068 if num_entries_wanted > 0:
3069 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003070 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003071 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003072 params=(self.id, include_queue_entry.id)))
3073
3074 # Sort the chosen hosts by hostname before slicing.
3075 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3076 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3077 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3078 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003079
showardf1ae3542009-05-11 19:26:02 +00003080 # Sanity check. We'll only ever be called if this can be met.
3081 assert len(chosen_entries) >= self.synch_count
3082
3083 if atomic_group:
3084 # Look at any meta_host and dependency labels and pick the first
3085 # one that also specifies this atomic group. Use that label name
3086 # as the group name if possible (it is more specific).
3087 group_name = atomic_group.name
3088 for label in include_queue_entry.get_labels():
3089 if label.atomic_group_id:
3090 assert label.atomic_group_id == atomic_group.id
3091 group_name = label.name
3092 break
3093 else:
3094 group_name = ''
3095
3096 self._assign_new_group(chosen_entries, group_name=group_name)
3097 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00003098
3099
showard77182562009-06-10 00:16:05 +00003100 def run_if_ready(self, queue_entry):
3101 """
3102 @returns An Agent instance to ultimately run this job if enough hosts
3103 are ready for it to run.
3104 @returns None and potentially cleans up excess hosts if this Job
3105 is not ready to run.
3106 """
showardb2e2c322008-10-14 17:33:55 +00003107 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003108 self.stop_if_necessary()
3109 return None
mbligh36768f02008-02-22 18:28:33 +00003110
showard77182562009-06-10 00:16:05 +00003111 if queue_entry.atomic_group:
3112 return self.run_with_ready_delay(queue_entry)
3113
3114 return self.run(queue_entry)
3115
3116
3117 def run_with_ready_delay(self, queue_entry):
3118 """
3119 Start a delay to wait for more hosts to enter Pending state before
3120 launching an atomic group job. Once set, the a delay cannot be reset.
3121
3122 @param queue_entry: The HostQueueEntry object to get atomic group
3123 info from and pass to run_if_ready when the delay is up.
3124
3125 @returns An Agent to run the job as appropriate or None if a delay
3126 has already been set.
3127 """
3128 assert queue_entry.job_id == self.id
3129 assert queue_entry.atomic_group
3130 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3131 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3132 over_max_threshold = (self._pending_count() >= pending_threshold)
3133 delay_expired = (self._delay_ready_task and
3134 time.time() >= self._delay_ready_task.end_time)
3135
3136 # Delay is disabled or we already have enough? Do not wait to run.
3137 if not delay or over_max_threshold or delay_expired:
3138 return self.run(queue_entry)
3139
3140 # A delay was previously scheduled.
3141 if self._delay_ready_task:
3142 return None
3143
3144 def run_job_after_delay():
3145 logging.info('Job %s done waiting for extra hosts.', self.id)
3146 return self.run(queue_entry)
3147
3148 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3149 callback=run_job_after_delay)
3150
3151 return Agent([self._delay_ready_task], num_processes=0)
3152
3153
3154 def run(self, queue_entry):
3155 """
3156 @param queue_entry: The HostQueueEntry instance calling this method.
3157 @returns An Agent instance to run this job or None if we've already
3158 been run.
3159 """
3160 if queue_entry.atomic_group and self._atomic_and_has_started():
3161 logging.error('Job.run() called on running atomic Job %d '
3162 'with HQE %s.', self.id, queue_entry)
3163 return None
showardf1ae3542009-05-11 19:26:02 +00003164 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3165 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003166
3167
showardf1ae3542009-05-11 19:26:02 +00003168 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003169 for queue_entry in queue_entries:
3170 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003171 params = self._get_autoserv_params(queue_entries)
3172 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003173 cmd=params, group_name=group_name)
3174 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003175 if self._delay_ready_task:
3176 # Cancel any pending callback that would try to run again
3177 # as we are already running.
3178 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003179
showard170873e2009-01-07 00:22:26 +00003180 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003181
3182
mbligh36768f02008-02-22 18:28:33 +00003183if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003184 main()