blob: 30ced1eeadccab77172612554de258b043735d43 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An AUTOTEST_DIR environment variable overrides the path derived from this
# file's location.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names dropped into an execution's results directory by the
# autoserv, crashinfo-collection and parser processes respectively.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, populated during startup.
_db = None                   # DatabaseConnection; set in init()
_shutdown = False            # set True by handle_sigint() to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False        # set by --test; swaps in a dummy autoserv
_base_url = None             # frontend URL; set in main_without_exception_handling()
_notify_email_statuses = []  # statuses that trigger notification email
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
def main():
    """Scheduler entry point.

    Delegates to main_without_exception_handling(); any exception that
    escapes (other than a deliberate SystemExit) is logged before being
    re-raised so the failure is recorded even if stderr is lost.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits (e.g. bad args, scheduler disabled) pass through.
        raise
    except:
        # Bare except on purpose: log absolutely anything, then re-raise.
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
78def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000079 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000080
showard136e6dc2009-06-10 19:38:49 +000081 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000082 parser = optparse.OptionParser(usage)
83 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
84 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000085 parser.add_option('--test', help='Indicate that scheduler is under ' +
86 'test and should use dummy autoserv and no parsing',
87 action='store_true')
88 (options, args) = parser.parse_args()
89 if len(args) != 1:
90 parser.print_usage()
91 return
mbligh36768f02008-02-22 18:28:33 +000092
showard5613c662009-06-08 23:30:33 +000093 scheduler_enabled = global_config.global_config.get_config_value(
94 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
95
96 if not scheduler_enabled:
97 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
98 "global_config's SCHEDULER section to enabled it. Exiting.")
99 print msg
100 sys.exit(1)
101
jadmanski0afbb632008-06-06 21:10:57 +0000102 global RESULTS_DIR
103 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh83c1e9e2009-05-01 23:10:41 +0000105 site_init = utils.import_site_function(__file__,
106 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
107 _site_init_monitor_db_dummy)
108 site_init()
109
showardcca334f2009-03-12 20:38:34 +0000110 # Change the cwd while running to avoid issues incase we were launched from
111 # somewhere odd (such as a random NFS home directory of the person running
112 # sudo to launch us as the appropriate user).
113 os.chdir(RESULTS_DIR)
114
jadmanski0afbb632008-06-06 21:10:57 +0000115 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000116 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
117 "notify_email_statuses",
118 default='')
showardc85c21b2008-11-24 22:17:37 +0000119 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000120 _notify_email_statuses = [status for status in
121 re.split(r'[\s,;:]', notify_statuses_list.lower())
122 if status]
showardc85c21b2008-11-24 22:17:37 +0000123
jadmanski0afbb632008-06-06 21:10:57 +0000124 if options.test:
125 global _autoserv_path
126 _autoserv_path = 'autoserv_dummy'
127 global _testing_mode
128 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000129
mbligh37eceaa2008-12-15 22:56:37 +0000130 # AUTOTEST_WEB.base_url is still a supported config option as some people
131 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000132 global _base_url
showard170873e2009-01-07 00:22:26 +0000133 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
134 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000135 if config_base_url:
136 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000137 else:
mbligh37eceaa2008-12-15 22:56:37 +0000138 # For the common case of everything running on a single server you
139 # can just set the hostname in a single place in the config file.
140 server_name = c.get_config_value('SERVER', 'hostname')
141 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000142 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000143 sys.exit(1)
144 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000145
showardc5afc462009-01-13 00:09:39 +0000146 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000147 server.start()
148
jadmanski0afbb632008-06-06 21:10:57 +0000149 try:
showard136e6dc2009-06-10 19:38:49 +0000150 init()
showardc5afc462009-01-13 00:09:39 +0000151 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000152 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000153
jadmanski0afbb632008-06-06 21:10:57 +0000154 while not _shutdown:
155 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000156 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000157 except:
showard170873e2009-01-07 00:22:26 +0000158 email_manager.manager.log_stacktrace(
159 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000160
showard170873e2009-01-07 00:22:26 +0000161 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000162 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000163 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000164 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
def setup_logging():
    """Initialize scheduler logging via the shared logging manager.

    The AUTOTEST_SCHEDULER_LOG_DIR and AUTOTEST_SCHEDULER_LOG_NAME
    environment variables, when set, override the log directory and the
    logfile name (each defaults to None, letting the config decide).
    """
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
173
174
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown.

    Only flips the module-level _shutdown flag; the main loop in
    main_without_exception_handling() notices it on the next tick and
    runs the normal shutdown sequence.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000179
180
def init():
    """One-time scheduler startup.

    Writes the monitor_db pidfile, connects the scheduler and Django
    database connections, installs the SIGINT handler and initializes
    the drone manager from the global config.  Mutates the module-level
    _db.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed from "drones": the old local name shadowed the imported
    # autotest_lib.scheduler.drones module within this function.
    drone_names = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_names.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the autoserv invocation as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of strings.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        # Fall back to the queue entry's job when none was given directly.
        job = job or queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    return command + extra_args
235
236
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
239
240
class HostScheduler(object):
    """Matches pending host queue entries against ready hosts.

    refresh() snapshots scheduling state (ready hosts, ACL groups, label
    dependencies) from the database into in-memory dicts; the find_*
    methods then pick hosts out of that snapshot, mutating it so each
    host is handed out at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready/unset-status hosts
        with no active queue entry."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (with %s filled by id_list) and build a
        {left_id: set(right_ids)} mapping; see _process_many2many_dict."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left: set(right)}; flip swaps columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} the job must not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all DB state needed to schedule pending_queue_entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the
        job neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine ACL, dependency, only_if_needed and atomic-group checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's requested host if eligible; else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim the first eligible host carrying the entry's metahost label,
        or return None if no such host remains this cycle."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Pick a host for a non-atomic-group entry (direct or metahost)."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
590
591
showard170873e2009-01-07 00:22:26 +0000592class Dispatcher(object):
    def __init__(self):
        """Set up dispatcher state; actual recovery happens in initialize()."""
        # all currently-active Agent objects
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # indexes: object id -> set of Agents touching that host / queue entry
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000603
mbligh36768f02008-02-22 18:28:33 +0000604
    def initialize(self, recover_hosts=True):
        """Prepare cleanup tasks and recover state left by a prior scheduler.

        @param recover_hosts - If True, also attempt host recovery; process
                recovery always runs.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000614
615
    def tick(self):
        """Run one scheduler iteration.

        Order is significant: drone state is refreshed before any
        decisions are made, queued drone actions are executed after all
        agents have run, and queued emails are flushed last.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000626
showard97aed502008-11-04 02:01:24 +0000627
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; each decides internally
        whether it is due (hence 'maybe')."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000631
mbligh36768f02008-02-22 18:28:33 +0000632
showard170873e2009-01-07 00:22:26 +0000633 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
634 for object_id in object_ids:
635 agent_dict.setdefault(object_id, set()).add(agent)
636
637
638 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
639 for object_id in object_ids:
640 assert object_id in agent_dict
641 agent_dict[object_id].remove(agent)
642
643
    def add_agent(self, agent):
        """Activate *agent*: track it, point it back at this dispatcher and
        index it by its host and queue-entry ids."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000650
showard170873e2009-01-07 00:22:26 +0000651
652 def get_agents_for_entry(self, queue_entry):
653 """
654 Find agents corresponding to the specified queue_entry.
655 """
showardd3dc1992009-04-22 21:01:40 +0000656 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000657
658
659 def host_has_agent(self, host):
660 """
661 Determine if there is currently an Agent present using this host.
662 """
663 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000664
665
    def remove_agent(self, agent):
        """Forget *agent*: remove it from the active list and both indexes.
        Raises if the agent was never added (list.remove / assert below)."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000672
673
    def _recover_processes(self):
        """Reattach to autoserv/parser processes left by a prior scheduler.

        Registers known pidfiles, refreshes drone state, then recovers or
        requeues entries and reverifies leftover hosts.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000684
showard170873e2009-01-07 00:22:26 +0000685
686 def _register_pidfiles(self):
687 # during recovery we may need to read pidfiles for both running and
688 # parsing entries
689 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000690 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000691 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000692 for pidfile_name in _ALL_PIDFILE_NAMES:
693 pidfile_id = _drone_manager.get_pidfile_id_from(
694 queue_entry.execution_tag(), pidfile_name=pidfile_name)
695 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000696
697
showardd3dc1992009-04-22 21:01:40 +0000698 def _recover_entries_with_status(self, status, orphans, pidfile_name,
699 recover_entries_fn):
700 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000701 for queue_entry in queue_entries:
702 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000703 # synchronous job we've already recovered
704 continue
showardd3dc1992009-04-22 21:01:40 +0000705 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000706 execution_tag = queue_entry.execution_tag()
707 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000708 run_monitor.attach_to_existing_process(execution_tag,
709 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000710
711 log_message = ('Recovering %s entry %s ' %
712 (status.lower(),
713 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000714 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000715 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000716 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000717 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000718 continue
mbligh90a549d2008-03-25 23:52:34 +0000719
showard597bfd32009-05-08 18:22:50 +0000720 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000721 run_monitor.get_process())
722 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
723 orphans.discard(run_monitor.get_process())
724
725
726 def _kill_remaining_orphan_processes(self, orphans):
727 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000728 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000729 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000730
showard170873e2009-01-07 00:22:26 +0000731
showardd3dc1992009-04-22 21:01:40 +0000732 def _recover_running_entries(self, orphans):
733 def recover_entries(job, queue_entries, run_monitor):
734 if run_monitor is not None:
735 queue_task = RecoveryQueueTask(job=job,
736 queue_entries=queue_entries,
737 run_monitor=run_monitor)
738 self.add_agent(Agent(tasks=[queue_task],
739 num_processes=len(queue_entries)))
740 # else, _requeue_other_active_entries will cover this
741
742 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
743 orphans, '.autoserv_execute',
744 recover_entries)
745
746
747 def _recover_gathering_entries(self, orphans):
748 def recover_entries(job, queue_entries, run_monitor):
749 gather_task = GatherLogsTask(job, queue_entries,
750 run_monitor=run_monitor)
751 self.add_agent(Agent([gather_task]))
752
753 self._recover_entries_with_status(
754 models.HostQueueEntry.Status.GATHERING,
755 orphans, _CRASHINFO_PID_FILE, recover_entries)
756
757
758 def _recover_parsing_entries(self, orphans):
759 def recover_entries(job, queue_entries, run_monitor):
760 reparse_task = FinalReparseTask(queue_entries,
761 run_monitor=run_monitor)
762 self.add_agent(Agent([reparse_task], num_processes=0))
763
764 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
765 orphans, _PARSER_PID_FILE,
766 recover_entries)
767
768
    def _recover_all_recoverable_entries(self):
        # Each recovery pass discards any process it re-attaches to from the
        # orphan set; whatever remains afterwards is truly orphaned, so the
        # kill step must run last.
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000775
showard97aed502008-11-04 02:01:24 +0000776
showard170873e2009-01-07 00:22:26 +0000777 def _requeue_other_active_entries(self):
778 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000779 where='active AND NOT complete AND '
780 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000781 for queue_entry in queue_entries:
782 if self.get_agents_for_entry(queue_entry):
783 # entry has already been recovered
784 continue
showardd3dc1992009-04-22 21:01:40 +0000785 if queue_entry.aborted:
786 queue_entry.abort(self)
787 continue
788
789 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000790 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000791 if queue_entry.host:
792 tasks = queue_entry.host.reverify_tasks()
793 self.add_agent(Agent(tasks))
794 agent = queue_entry.requeue()
795
796
showard1ff7b2e2009-05-15 23:17:18 +0000797 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000798 tasks = models.SpecialTask.objects.filter(
799 task=models.SpecialTask.Task.REVERIFY, is_active=False,
800 is_complete=False)
801
802 host_ids = [str(task.host.id) for task in tasks]
803
804 if host_ids:
805 where = 'id IN (%s)' % ','.join(host_ids)
806 host_ids_reverifying = self._reverify_hosts_where(
807 where, cleanup=False)
808 tasks = tasks.filter(host__id__in=host_ids_reverifying)
809 for task in tasks:
810 task.is_active=True
811 task.save()
showard1ff7b2e2009-05-15 23:17:18 +0000812
813
    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000829
830
jadmanski0afbb632008-06-06 21:10:57 +0000831 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000832 print_message='Reverifying host %s',
833 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000834 full_where='locked = 0 AND invalid = 0 AND ' + where
showard6d7b2ff2009-06-10 00:16:47 +0000835 host_ids_reverifying = []
showard170873e2009-01-07 00:22:26 +0000836 for host in Host.fetch(where=full_where):
837 if self.host_has_agent(host):
838 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000839 continue
showard170873e2009-01-07 00:22:26 +0000840 if print_message:
showardb18134f2009-03-20 20:52:18 +0000841 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000842 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000843 self.add_agent(Agent(tasks))
showard6d7b2ff2009-06-10 00:16:47 +0000844 host_ids_reverifying.append(host.id)
845 return host_ids_reverifying
mbligh36768f02008-02-22 18:28:33 +0000846
847
jadmanski0afbb632008-06-06 21:10:57 +0000848 def _recover_hosts(self):
849 # recover "Repair Failed" hosts
850 message = 'Reverifying dead host %s'
851 self._reverify_hosts_where("status = 'Repair Failed'",
852 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000853
854
showard04c82c52008-05-29 19:38:12 +0000855
    def _get_pending_queue_entries(self):
        """Return all inactive, Queued HostQueueEntries in scheduling order."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000862
863
showard89f84db2009-03-12 20:39:13 +0000864 def _refresh_pending_queue_entries(self):
865 """
866 Lookup the pending HostQueueEntries and call our HostScheduler
867 refresh() method given that list. Return the list.
868
869 @returns A list of pending HostQueueEntries sorted in priority order.
870 """
showard63a34772008-08-18 19:32:50 +0000871 queue_entries = self._get_pending_queue_entries()
872 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000873 return []
showardb95b1bd2008-08-15 18:11:04 +0000874
showard63a34772008-08-18 19:32:50 +0000875 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000876
showard89f84db2009-03-12 20:39:13 +0000877 return queue_entries
878
879
880 def _schedule_atomic_group(self, queue_entry):
881 """
882 Schedule the given queue_entry on an atomic group of hosts.
883
884 Returns immediately if there are insufficient available hosts.
885
886 Creates new HostQueueEntries based off of queue_entry for the
887 scheduled hosts and starts them all running.
888 """
889 # This is a virtual host queue entry representing an entire
890 # atomic group, find a group and schedule their hosts.
891 group_hosts = self._host_scheduler.find_eligible_atomic_group(
892 queue_entry)
893 if not group_hosts:
894 return
showardcbe6f942009-06-17 19:33:49 +0000895
896 logging.info('Expanding atomic group entry %s with hosts %s',
897 queue_entry,
898 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +0000899 # The first assigned host uses the original HostQueueEntry
900 group_queue_entries = [queue_entry]
901 for assigned_host in group_hosts[1:]:
902 # Create a new HQE for every additional assigned_host.
903 new_hqe = HostQueueEntry.clone(queue_entry)
904 new_hqe.save()
905 group_queue_entries.append(new_hqe)
906 assert len(group_queue_entries) == len(group_hosts)
907 for queue_entry, host in itertools.izip(group_queue_entries,
908 group_hosts):
909 self._run_queue_entry(queue_entry, host)
910
911
912 def _schedule_new_jobs(self):
913 queue_entries = self._refresh_pending_queue_entries()
914 if not queue_entries:
915 return
916
showard63a34772008-08-18 19:32:50 +0000917 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000918 if (queue_entry.atomic_group_id is None or
919 queue_entry.host_id is not None):
920 assigned_host = self._host_scheduler.find_eligible_host(
921 queue_entry)
922 if assigned_host:
923 self._run_queue_entry(queue_entry, assigned_host)
924 else:
925 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000926
927
928 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +0000929 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
930 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000931
932
jadmanski0afbb632008-06-06 21:10:57 +0000933 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000934 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
935 for agent in self.get_agents_for_entry(entry):
936 agent.abort()
937 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000938
939
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """
        Decide whether `agent` may start its processes this cycle.

        The guards below are order-sensitive: the zero-process fast path and
        the have_reached_limit latch must be checked before the capacity and
        per-cycle throttles.
        """
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
961
962
jadmanski0afbb632008-06-06 21:10:57 +0000963 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000964 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000965 have_reached_limit = False
966 # iterate over copy, so we can remove agents during iteration
967 for agent in list(self._agents):
968 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000969 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000970 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000971 continue
972 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000973 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000974 have_reached_limit):
975 have_reached_limit = True
976 continue
showard4c5374f2008-09-04 17:02:56 +0000977 num_started_this_cycle += agent.num_processes
978 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000979 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000980 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000981
982
    def _process_recurring_runs(self):
        """Instantiate jobs from RecurringRun templates that are due, then
        reschedule or delete each recurrence."""
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): `dependencies` is unpacked but never used below
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # creation failure shouldn't stop the other recurring runs
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # final iteration: remove the recurrence entirely
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1022
1023
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we've permanently given up on the process; after that
        # _state holds a synthetic failure result (see on_lost_process)
        self.lost_process = False
        # recorded by run()/attach_to_existing_process(); used to time out
        # processes that never write a pidfile
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # NOTE(review): run() below inlines this same wrapping with a
        # slightly different guard (`is not None` vs truthiness) -- confirm
        # the nice_level=0 edge case before consolidating the two.
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command via the drone manager and monitor its pidfile."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process through its pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        # only attempt the kill if the pidfile actually names a process
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            # reset to an empty state before raising so we never act on
            # garbage contents
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log, send a notification email, and give up on the process
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            # already gave up; keep the synthetic failure state as-is
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            # the process has had long enough to write its pidfile; assume
            # it never will
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # synthesize a failing exit so callers see a completed, failed run
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001181
1182
class Agent(object):
    """
    Runs an ordered sequence of tasks on behalf of the Dispatcher.

    Every task object must provide:
        poll()    - called periodically so the task can update its state.
        is_done() - True once the task has finished.
        abort()   - request an abort; the task sets its `aborted` attribute
                    to True if it actually aborted.

    and must carry these attributes:
        aborted         - bool, True if the task was aborted.
        failure_tasks   - tasks to run in a fresh Agent should this task fail.
        success         - bool, True if the task succeeded.
        queue_entry_ids - HostQueueEntry ids this task handles.
        host_ids        - Host ids this task represents.

    The Agent writes an `agent` attribute (a reference to itself) onto each
    task it adopts.
    """


    def __init__(self, tasks, num_processes=1):
        """
        @param tasks: task objects satisfying the contract above.
        @param num_processes: number of subprocesses this Agent represents;
                used by the Dispatcher to manage load.  Defaults to 1.
        """
        self.active_task = None
        self.queue = None
        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain.from_iterable(id_lists))


    def add_task(self, task):
        task.agent = self
        self.queue.put_nowait(task)


    def tick(self):
        while not self.is_done():
            if self.active_task:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished_task = self.active_task
        if finished_task is not None:
            assert finished_task.is_done()
            if not finished_task.success:
                self.on_task_failure()
            self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        self.dispatcher.add_agent(Agent(self.active_task.failure_tasks))


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks until the queue is empty or a task ignores the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1290
showardd3dc1992009-04-22 21:01:40 +00001291
class DelayedCallTask(object):
    """
    An Agent task that simply waits: once at least the requested delay has
    elapsed, polling invokes the supplied callback exactly once and the task
    finishes.  If the callback returns anything, that value is assumed to be
    a new Agent instance and is handed to the dispatcher.

    @attribute end_time: absolute posix time after which a poll triggers the
            callback and completes the task.

    Also carries every attribute the Agent class requires of a task.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: seconds from now after which the callback runs.
        @param callback: callable invoked once after at least delay_seconds
                have elapsed; must return None or a new Agent instance.
        @param now_func: time.time-like function, overridable for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.failure_tasks = ()
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # This is filled in by Agent.add_task().
        self.agent = None


    def poll(self):
        # not yet due, or already fired/aborted
        if not self._callback or self._now_func() < self.end_time:
            return
        new_agent = self._callback()
        if new_agent:
            self.agent.dispatcher.add_agent(new_agent)
        self._callback = None
        self.success = True


    def is_done(self):
        return self._callback is None


    def abort(self):
        self.aborted = True
        self._callback = None
1349
1350
mbligh36768f02008-02-22 18:28:33 +00001351class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001352 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1353 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001354 self.done = False
1355 self.failure_tasks = failure_tasks
1356 self.started = False
1357 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001358 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001359 self.task = None
1360 self.agent = None
1361 self.monitor = None
1362 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001363 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001364 self.queue_entry_ids = []
1365 self.host_ids = []
1366 self.log_file = None
1367
1368
1369 def _set_ids(self, host=None, queue_entries=None):
1370 if queue_entries and queue_entries != [None]:
1371 self.host_ids = [entry.host.id for entry in queue_entries]
1372 self.queue_entry_ids = [entry.id for entry in queue_entries]
1373 else:
1374 assert host
1375 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001376
1377
jadmanski0afbb632008-06-06 21:10:57 +00001378 def poll(self):
showard08a36412009-05-05 01:01:13 +00001379 if not self.started:
1380 self.start()
1381 self.tick()
1382
1383
1384 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001385 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001386 exit_code = self.monitor.exit_code()
1387 if exit_code is None:
1388 return
1389 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001390 else:
1391 success = False
mbligh36768f02008-02-22 18:28:33 +00001392
jadmanski0afbb632008-06-06 21:10:57 +00001393 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001394
1395
jadmanski0afbb632008-06-06 21:10:57 +00001396 def is_done(self):
1397 return self.done
mbligh36768f02008-02-22 18:28:33 +00001398
1399
jadmanski0afbb632008-06-06 21:10:57 +00001400 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001401 if self.done:
1402 return
jadmanski0afbb632008-06-06 21:10:57 +00001403 self.done = True
1404 self.success = success
1405 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001406
1407
jadmanski0afbb632008-06-06 21:10:57 +00001408 def prolog(self):
1409 pass
mblighd64e5702008-04-04 21:39:28 +00001410
1411
jadmanski0afbb632008-06-06 21:10:57 +00001412 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001413 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001414
mbligh36768f02008-02-22 18:28:33 +00001415
jadmanski0afbb632008-06-06 21:10:57 +00001416 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001417 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001418 _drone_manager.copy_to_results_repository(
1419 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001420
1421
jadmanski0afbb632008-06-06 21:10:57 +00001422 def epilog(self):
1423 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001424
1425
jadmanski0afbb632008-06-06 21:10:57 +00001426 def start(self):
1427 assert self.agent
1428
1429 if not self.started:
1430 self.prolog()
1431 self.run()
1432
1433 self.started = True
1434
1435
1436 def abort(self):
1437 if self.monitor:
1438 self.monitor.kill()
1439 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001440 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001441 self.cleanup()
1442
1443
showard170873e2009-01-07 00:22:26 +00001444 def set_host_log_file(self, base_name, host):
1445 filename = '%s.%s' % (time.time(), base_name)
1446 self.log_file = os.path.join('hosts', host.hostname, filename)
1447
1448
showardde634ee2009-01-30 01:44:24 +00001449 def _get_consistent_execution_tag(self, queue_entries):
1450 first_execution_tag = queue_entries[0].execution_tag()
1451 for queue_entry in queue_entries[1:]:
1452 assert queue_entry.execution_tag() == first_execution_tag, (
1453 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1454 queue_entry,
1455 first_execution_tag,
1456 queue_entries[0]))
1457 return first_execution_tag
1458
1459
showarda1e74b32009-05-12 17:32:04 +00001460 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001461 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001462 if use_monitor is None:
1463 assert self.monitor
1464 use_monitor = self.monitor
1465 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001466 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001467 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001468 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001469 results_path)
showardde634ee2009-01-30 01:44:24 +00001470
showarda1e74b32009-05-12 17:32:04 +00001471
1472 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001473 reparse_task = FinalReparseTask(queue_entries)
1474 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1475
1476
showarda1e74b32009-05-12 17:32:04 +00001477 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1478 self._copy_results(queue_entries, use_monitor)
1479 self._parse_results(queue_entries)
1480
1481
showardd3dc1992009-04-22 21:01:40 +00001482 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001483 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001484 self.monitor = PidfileRunMonitor()
1485 self.monitor.run(self.cmd, self._working_directory,
1486 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001487 log_file=self.log_file,
1488 pidfile_name=pidfile_name,
1489 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001490
1491
showardd9205182009-04-27 20:09:55 +00001492class TaskWithJobKeyvals(object):
1493 """AgentTask mixin providing functionality to help with job keyval files."""
1494 _KEYVAL_FILE = 'keyval'
1495 def _format_keyval(self, key, value):
1496 return '%s=%s' % (key, value)
1497
1498
1499 def _keyval_path(self):
1500 """Subclasses must override this"""
1501 raise NotImplemented
1502
1503
1504 def _write_keyval_after_job(self, field, value):
1505 assert self.monitor
1506 if not self.monitor.has_process():
1507 return
1508 _drone_manager.write_lines_to_file(
1509 self._keyval_path(), [self._format_keyval(field, value)],
1510 paired_with_process=self.monitor.get_process())
1511
1512
1513 def _job_queued_keyval(self, job):
1514 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1515
1516
1517 def _write_job_finished(self):
1518 self._write_keyval_after_job("job_finished", int(time.time()))
1519
1520
class RepairTask(AgentTask, TaskWithJobKeyvals):
    def __init__(self, host, queue_entry=None):
        """\
        @param host: host to repair (autoserv -R).
        @param queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        # bug fix: _set_ids() must run *after* AgentTask.__init__, which
        # resets host_ids and queue_entry_ids to empty lists and would
        # otherwise clobber the IDs set here.
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            # put the entry back in the queue so it can be rescheduled
            # while the repair runs
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # repair keyvals go into the task's temporary results directory
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail the associated queue entry, copying (and optionally
        parsing) the repair results into its execution directory."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        # mark any active special tasks against this host complete
        tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
                                                  is_active=True)
        for task in tasks:
            task.is_complete = True
            task.save()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001598
1599
showard8fe93b52008-11-18 17:53:22 +00001600class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001601 def epilog(self):
1602 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001603 should_copy_results = (self.queue_entry and not self.success
1604 and not self.queue_entry.meta_host)
1605 if should_copy_results:
1606 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001607 destination = os.path.join(self.queue_entry.execution_tag(),
1608 os.path.basename(self.log_file))
1609 _drone_manager.copy_to_results_repository(
1610 self.monitor.get_process(), self.log_file,
1611 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001612
1613
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to check that a host is in working order.

    Construct with exactly one of queue_entry or host.  A RepairTask is
    registered as the failure task, so a failed verify triggers a repair.
    """
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)
        self.queue_entry = queue_entry
        self.host = host if host else queue_entry.host

        self.create_temp_resultsdir('.verify')
        command = _autoserv_command_line(self.host.hostname,
                                         self.temp_results_dir, ['-v'],
                                         queue_entry=queue_entry)
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(command, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            # close out any active special tasks recorded for this host
            active_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id, is_active=True)
            for special_task in active_tasks:
                special_task.is_complete = True
                special_task.save()

            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001650
1651
showardd9205182009-04-27 20:09:55 +00001652class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001653 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001654 self.job = job
1655 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001656 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001657 super(QueueTask, self).__init__(cmd, self._execution_tag())
1658 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001659
1660
showard73ec0442009-02-07 02:05:20 +00001661 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001662 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001663
1664
1665 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1666 keyval_contents = '\n'.join(self._format_keyval(key, value)
1667 for key, value in keyval_dict.iteritems())
1668 # always end with a newline to allow additional keyvals to be written
1669 keyval_contents += '\n'
1670 _drone_manager.attach_file_to_execution(self._execution_tag(),
1671 keyval_contents,
1672 file_path=keyval_path)
1673
1674
1675 def _write_keyvals_before_job(self, keyval_dict):
1676 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1677
1678
showard170873e2009-01-07 00:22:26 +00001679 def _write_host_keyvals(self, host):
1680 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1681 host.hostname)
1682 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001683 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1684 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001685
1686
showard170873e2009-01-07 00:22:26 +00001687 def _execution_tag(self):
1688 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001689
1690
jadmanski0afbb632008-06-06 21:10:57 +00001691 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001692 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001693 keyval_dict = {queued_key: queued_time}
1694 if self.group_name:
1695 keyval_dict['host_group_name'] = self.group_name
1696 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001697 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001698 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001699 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001700 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001701 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001702 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001703 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001704 assert len(self.queue_entries) == 1
1705 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001706
1707
showard35162b02009-03-03 02:17:30 +00001708 def _write_lost_process_error_file(self):
1709 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1710 _drone_manager.write_lines_to_file(error_file_path,
1711 [_LOST_PROCESS_ERROR])
1712
1713
showardd3dc1992009-04-22 21:01:40 +00001714 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001715 if not self.monitor:
1716 return
1717
showardd9205182009-04-27 20:09:55 +00001718 self._write_job_finished()
1719
showardd3dc1992009-04-22 21:01:40 +00001720 # both of these conditionals can be true, iff the process ran, wrote a
1721 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001722 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001723 gather_task = GatherLogsTask(self.job, self.queue_entries)
1724 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001725
1726 if self.monitor.lost_process:
1727 self._write_lost_process_error_file()
1728 for queue_entry in self.queue_entries:
1729 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001730
1731
showardcbd74612008-11-19 21:42:02 +00001732 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001733 _drone_manager.write_lines_to_file(
1734 os.path.join(self._execution_tag(), 'status.log'),
1735 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001736 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001737
1738
jadmanskif7fa2cc2008-10-01 14:13:23 +00001739 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001740 if not self.monitor or not self.monitor.has_process():
1741 return
1742
jadmanskif7fa2cc2008-10-01 14:13:23 +00001743 # build up sets of all the aborted_by and aborted_on values
1744 aborted_by, aborted_on = set(), set()
1745 for queue_entry in self.queue_entries:
1746 if queue_entry.aborted_by:
1747 aborted_by.add(queue_entry.aborted_by)
1748 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1749 aborted_on.add(t)
1750
1751 # extract some actual, unique aborted by value and write it out
1752 assert len(aborted_by) <= 1
1753 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001754 aborted_by_value = aborted_by.pop()
1755 aborted_on_value = max(aborted_on)
1756 else:
1757 aborted_by_value = 'autotest_system'
1758 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001759
showarda0382352009-02-11 23:36:43 +00001760 self._write_keyval_after_job("aborted_by", aborted_by_value)
1761 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001762
showardcbd74612008-11-19 21:42:02 +00001763 aborted_on_string = str(datetime.datetime.fromtimestamp(
1764 aborted_on_value))
1765 self._write_status_comment('Job aborted by %s on %s' %
1766 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001767
1768
jadmanski0afbb632008-06-06 21:10:57 +00001769 def abort(self):
1770 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001771 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001772 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001773
1774
jadmanski0afbb632008-06-06 21:10:57 +00001775 def epilog(self):
1776 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001777 self._finish_task()
1778 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001779
1780
mblighbb421852008-03-11 22:36:16 +00001781class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001782 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001783 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001784 self.monitor = run_monitor
1785 self.started = True
1786 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001787
1788
jadmanski0afbb632008-06-06 21:10:57 +00001789 def run(self):
showard5add1c82009-05-26 19:27:46 +00001790 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001791
1792
jadmanski0afbb632008-06-06 21:10:57 +00001793 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001794 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001795
1796
showardd3dc1992009-04-22 21:01:40 +00001797class PostJobTask(AgentTask):
1798 def __init__(self, queue_entries, pidfile_name, logfile_name,
1799 run_monitor=None):
1800 """
1801 If run_monitor != None, we're recovering a running task.
1802 """
1803 self._queue_entries = queue_entries
1804 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001805
1806 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1807 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1808 self._autoserv_monitor = PidfileRunMonitor()
1809 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1810 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1811
1812 if _testing_mode:
1813 command = 'true'
1814 else:
1815 command = self._generate_command(self._results_dir)
1816
1817 super(PostJobTask, self).__init__(cmd=command,
1818 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001819 # this must happen *after* the super call
1820 self.monitor = run_monitor
1821 if run_monitor:
1822 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001823
1824 self.log_file = os.path.join(self._execution_tag, logfile_name)
1825 self._final_status = self._determine_final_status()
1826
1827
1828 def _generate_command(self, results_dir):
1829 raise NotImplementedError('Subclasses must override this')
1830
1831
1832 def _job_was_aborted(self):
1833 was_aborted = None
1834 for queue_entry in self._queue_entries:
1835 queue_entry.update_from_database()
1836 if was_aborted is None: # first queue entry
1837 was_aborted = bool(queue_entry.aborted)
1838 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1839 email_manager.manager.enqueue_notify_email(
1840 'Inconsistent abort state',
1841 'Queue entries have inconsistent abort state: ' +
1842 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1843 # don't crash here, just assume true
1844 return True
1845 return was_aborted
1846
1847
1848 def _determine_final_status(self):
1849 if self._job_was_aborted():
1850 return models.HostQueueEntry.Status.ABORTED
1851
1852 # we'll use a PidfileRunMonitor to read the autoserv exit status
1853 if self._autoserv_monitor.exit_code() == 0:
1854 return models.HostQueueEntry.Status.COMPLETED
1855 return models.HostQueueEntry.Status.FAILED
1856
1857
1858 def run(self):
showard5add1c82009-05-26 19:27:46 +00001859 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001860
showard5add1c82009-05-26 19:27:46 +00001861 # make sure we actually have results to work with.
1862 # this should never happen in normal operation.
1863 if not self._autoserv_monitor.has_process():
1864 email_manager.manager.enqueue_notify_email(
1865 'No results in post-job task',
1866 'No results in post-job task at %s' %
1867 self._autoserv_monitor.pidfile_id)
1868 self.finished(False)
1869 return
1870
1871 super(PostJobTask, self).run(
1872 pidfile_name=self._pidfile_name,
1873 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001874
1875
1876 def _set_all_statuses(self, status):
1877 for queue_entry in self._queue_entries:
1878 queue_entry.set_status(status)
1879
1880
1881 def abort(self):
1882 # override AgentTask.abort() to avoid killing the process and ending
1883 # the task. post-job tasks continue when the job is aborted.
1884 pass
1885
1886
class GatherLogsTask(PostJobTask):
    """Post-job task that wraps up a finished job.

    Responsibilities:
      * gather uncollected logs if autoserv crashed hard or was killed
      * copy logs to the results repository and queue the final parse
      * reboot (via CleanupTask) or release the job's hosts
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        machines = ','.join(entry.host.hostname
                            for entry in self._queue_entries)
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m', machines,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _should_reboot_hosts(self):
        """Return True if the job's hosts should be cleaned up/rebooted."""
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            return True
        reboot_after = self._job.reboot_after
        if reboot_after == models.RebootAfter.ALWAYS:
            return True
        if reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            job_completed = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            failed_count = self._autoserv_monitor.num_tests_failed()
            return job_completed and failed_count == 0
        return False


    def _reboot_hosts(self):
        do_reboot = self._should_reboot_hosts()
        for entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                self.agent.dispatcher.add_agent(
                    Agent([CleanupTask(host=entry.host)]))
            else:
                entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if exit_code is not None and not os.WIFSIGNALED(exit_code):
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00001954
1955
showard8fe93b52008-11-18 17:53:22 +00001956class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001957 def __init__(self, host=None, queue_entry=None):
1958 assert bool(host) ^ bool(queue_entry)
1959 if queue_entry:
1960 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001961 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001962 self.host = host
showard170873e2009-01-07 00:22:26 +00001963
1964 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001965 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1966 ['--cleanup'],
1967 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001968 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001969 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1970 failure_tasks=[repair_task])
1971
1972 self._set_ids(host=host, queue_entries=[queue_entry])
1973 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001974
mblighd5c95802008-03-05 00:33:46 +00001975
jadmanski0afbb632008-06-06 21:10:57 +00001976 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001977 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001978 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001979 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001980
mblighd5c95802008-03-05 00:33:46 +00001981
showard21baa452008-10-21 00:08:39 +00001982 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001983 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001984 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001985 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001986 self.host.update_field('dirty', 0)
1987
1988
showardd3dc1992009-04-22 21:01:40 +00001989class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001990 _num_running_parses = 0
1991
showardd3dc1992009-04-22 21:01:40 +00001992 def __init__(self, queue_entries, run_monitor=None):
1993 super(FinalReparseTask, self).__init__(queue_entries,
1994 pidfile_name=_PARSER_PID_FILE,
1995 logfile_name='.parse.log',
1996 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001997 # don't use _set_ids, since we don't want to set the host_ids
1998 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00001999 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00002000
showard97aed502008-11-04 02:01:24 +00002001
2002 @classmethod
2003 def _increment_running_parses(cls):
2004 cls._num_running_parses += 1
2005
2006
2007 @classmethod
2008 def _decrement_running_parses(cls):
2009 cls._num_running_parses -= 1
2010
2011
2012 @classmethod
2013 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002014 return (cls._num_running_parses <
2015 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002016
2017
2018 def prolog(self):
2019 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002020 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002021
2022
2023 def epilog(self):
2024 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002025 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002026
2027
showardd3dc1992009-04-22 21:01:40 +00002028 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002029 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002030 results_dir]
showard97aed502008-11-04 02:01:24 +00002031
2032
showard08a36412009-05-05 01:01:13 +00002033 def tick(self):
2034 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002035 # and we can, at which point we revert to default behavior
2036 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002037 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002038 else:
2039 self._try_starting_parse()
2040
2041
2042 def run(self):
2043 # override run() to not actually run unless we can
2044 self._try_starting_parse()
2045
2046
2047 def _try_starting_parse(self):
2048 if not self._can_run_new_parse():
2049 return
showard170873e2009-01-07 00:22:26 +00002050
showard97aed502008-11-04 02:01:24 +00002051 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002052 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002053
showard97aed502008-11-04 02:01:24 +00002054 self._increment_running_parses()
2055 self._parse_started = True
2056
2057
2058 def finished(self, success):
2059 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002060 if self._parse_started:
2061 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002062
2063
showardc9ae1782009-01-30 01:42:37 +00002064class SetEntryPendingTask(AgentTask):
2065 def __init__(self, queue_entry):
2066 super(SetEntryPendingTask, self).__init__(cmd='')
2067 self._queue_entry = queue_entry
2068 self._set_ids(queue_entries=[queue_entry])
2069
2070
2071 def run(self):
2072 agent = self._queue_entry.on_pending()
2073 if agent:
2074 self.agent.dispatcher.add_agent(agent)
2075 self.finished(True)
2076
2077
class DBError(Exception):
    """Raised when a DBObject lookup fails (its SELECT returned no rows)."""
2080
2081
class DBObject(object):
    """
    A miniature object relational model for the database.

    Each subclass maps to one table (_table_name) whose columns are listed,
    in order, in _fields.  Instances of a given (class, id) are shared via a
    weak-value cache so that e.g. many HostQueueEntry objects referring to
    the same Job reuse a single Job instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Load this object from the database (by id) or from an already
        fetched row.

        @param id - Primary key of the row to fetch; mutually exclusive
                with row.
        @param row - A full row tuple as returned by SELECT *.
        @param new_record - True when constructing a row that does not yet
                exist in the database (see save()).
        @param always_query - When False, a cached, already-initialized
                instance is returned as-is without re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                        'initialized %s %s instance requery is updating: %s',
                        type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the full row for row_id; raise DBError if it is missing."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Sanity-check that a fetched row matches the declared _fields."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is the immutable primary key; it may never be update_field()ed.
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-fetch our row and refresh all field attributes from it."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table (default: our table) matching
        the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Write a single column to the database and to this instance.

        No-op when the stored value already equals the new one.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields as a new row (only for new_record
        instances) and adopt the id the database assigns."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE: naive quoting -- values containing '"' would
                    # break this statement.  All values come from
                    # scheduler-internal objects, not user input.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete our row and evict this instance from the shared cache."""
        # Bug fix: the cache key must use self.id -- the previous code passed
        # the *builtin* id function here, so the key never matched and stale
        # instances were never evicted from _instances_by_type_and_id.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string with prefix prepended, or string unchanged if falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002280
mbligh36768f02008-02-22 18:28:33 +00002281
2282class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002283 _table_name = 'ineligible_host_queues'
2284 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002285
2286
class AtomicGroup(DBObject):
    # ORM stub for the atomic_groups table; column names mirror the schema.
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002291
2292
class Label(DBObject):
    # ORM stub for the labels table; atomic_group_id links a label to an
    # AtomicGroup row when set.
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002297
2298
class Host(DBObject):
    """ORM wrapper for one row of the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """
        Return the active, incomplete HostQueueEntry using this host, or
        None when the host is idle.  Asserts there is at most one.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0]
            return HostQueueEntry(row=results)


    def yield_work(self):
        """Requeue whatever queue entry is currently running on this host."""
        logging.info("%s yielding work", self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        """Update this host's status column (transition is logged)."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self, cleanup=True):
        """
        Build the task list to re-verify this host, optionally preceded by a
        cleanup.  The host status is set immediately so the scheduler does
        not hand the host to another job in the meantime.
        """
        tasks = [VerifyTask(host=self)]
        # just to make sure this host does not get taken away
        self.set_status('Verifying')
        if cleanup:
            tasks.insert(0, CleanupTask(host=self))
            self.set_status('Cleaning')
        return tasks


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros, and the previous
            # .lstrip('0') raised ValueError on an all-zero suffix such as
            # "host0" (int('') is invalid).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2397
2398
class HostQueueEntry(DBObject):
    """
    ORM wrapper for one row of host_queue_entries: a single (job, host)
    pairing, including its status transitions, e-mail notifications and
    abort handling.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry plus its related Job, Host and AtomicGroup."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry log of scheduling decisions, written through the drone
        # manager under the job's results tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or with host=None release) a host for this entry.

        Assigning also marks the entry active and blocks the host against
        re-assignment to the same job; releasing removes that block.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the Host assigned to this entry (or None)."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Insert an IneligibleHostQueue row for (our job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any IneligibleHostQueue rows for (our job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Hostname of the assigned host, or a placeholder when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Set this entry's status, keep the active/complete flags consistent
        with it, and fire any configured e-mail notifications.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Statuses where the entry is neither running nor finished.
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # Statuses where work is in progress on the entry.
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal statuses.
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Send a per-entry status notification to the job's email list."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job just finished, mail a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run_pre_job_tasks(self, assigned_host=None):
        """
        Bind the assigned host (for meta-host/atomic-group entries) and
        return an Agent running this entry's pre-job tasks.
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self._do_run_pre_job_tasks()


    def _do_run_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)

        # The pre job tasks always end with a SetEntryPendingTask which
        # will continue as appropriate through queue_entry.on_pending().
        return Agent(self.job.get_pre_job_tasks(queue_entry=self))


    def requeue(self):
        """Return this entry to the Queued state for rescheduling."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort request, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        return self.job.run_if_ready(queue_entry=self)


    def abort(self, dispatcher):
        """Process an abort request for this entry.

        If a job agent is still running, it is left to finish and mark the
        entry Aborted itself; otherwise the host is released (or re-verified)
        and the entry is marked Aborted here.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return '<job id>-<owner>/<subdir>' naming this entry's results."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002688
2689
mbligh36768f02008-02-22 18:28:33 +00002690class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002691 _table_name = 'jobs'
2692 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2693 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002694 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002695 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002696
showard77182562009-06-10 00:16:05 +00002697 # This does not need to be a column in the DB. The delays are likely to
2698 # be configured short. If the scheduler is stopped and restarted in
2699 # the middle of a job's delay cycle, the delay cycle will either be
2700 # repeated or skipped depending on the number of Pending machines found
2701 # when the restarted scheduler recovers to track it. Not a problem.
2702 #
2703 # A reference to the DelayedCallTask that will wake up the job should
2704 # no other HQEs change state in time. Its end_time attribute is used
2705 # by our run_with_ready_delay() method to determine if the wait is over.
2706 _delay_ready_task = None
2707
2708 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2709 # all status='Pending' atomic group HQEs incase a delay was running when the
2710 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002711
showarda3c58572009-03-12 20:36:59 +00002712 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002713 assert id or row
showarda3c58572009-03-12 20:36:59 +00002714 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002715
mblighe2586682008-02-29 22:45:46 +00002716
jadmanski0afbb632008-06-06 21:10:57 +00002717 def is_server_job(self):
2718 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002719
2720
showard170873e2009-01-07 00:22:26 +00002721 def tag(self):
2722 return "%s-%s" % (self.id, self.owner)
2723
2724
jadmanski0afbb632008-06-06 21:10:57 +00002725 def get_host_queue_entries(self):
2726 rows = _db.execute("""
2727 SELECT * FROM host_queue_entries
2728 WHERE job_id= %s
2729 """, (self.id,))
2730 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002731
jadmanski0afbb632008-06-06 21:10:57 +00002732 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002733
jadmanski0afbb632008-06-06 21:10:57 +00002734 return entries
mbligh36768f02008-02-22 18:28:33 +00002735
2736
jadmanski0afbb632008-06-06 21:10:57 +00002737 def set_status(self, status, update_queues=False):
2738 self.update_field('status',status)
2739
2740 if update_queues:
2741 for queue_entry in self.get_host_queue_entries():
2742 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002743
2744
showard77182562009-06-10 00:16:05 +00002745 def _atomic_and_has_started(self):
2746 """
2747 @returns True if any of the HostQueueEntries associated with this job
2748 have entered the Status.STARTING state or beyond.
2749 """
2750 atomic_entries = models.HostQueueEntry.objects.filter(
2751 job=self.id, atomic_group__isnull=False)
2752 if atomic_entries.count() <= 0:
2753 return False
2754
showardaf8b4ca2009-06-16 18:47:26 +00002755 # These states may *only* be reached if Job.run() has been called.
2756 started_statuses = (models.HostQueueEntry.Status.STARTING,
2757 models.HostQueueEntry.Status.RUNNING,
2758 models.HostQueueEntry.Status.COMPLETED)
2759
2760 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002761 return started_entries.count() > 0
2762
2763
2764 def _pending_count(self):
2765 """The number of HostQueueEntries for this job in the Pending state."""
2766 pending_entries = models.HostQueueEntry.objects.filter(
2767 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2768 return pending_entries.count()
2769
2770
jadmanski0afbb632008-06-06 21:10:57 +00002771 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002772 # NOTE: Atomic group jobs stop reporting ready after they have been
2773 # started to avoid launching multiple copies of one atomic job.
2774 # Only possible if synch_count is less than than half the number of
2775 # machines in the atomic group.
2776 return (self._pending_count() >= self.synch_count
2777 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002778
2779
jadmanski0afbb632008-06-06 21:10:57 +00002780 def num_machines(self, clause = None):
2781 sql = "job_id=%s" % self.id
2782 if clause:
2783 sql += " AND (%s)" % clause
2784 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002785
2786
jadmanski0afbb632008-06-06 21:10:57 +00002787 def num_queued(self):
2788 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002789
2790
jadmanski0afbb632008-06-06 21:10:57 +00002791 def num_active(self):
2792 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002793
2794
jadmanski0afbb632008-06-06 21:10:57 +00002795 def num_complete(self):
2796 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002797
2798
jadmanski0afbb632008-06-06 21:10:57 +00002799 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002800 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002801
mbligh36768f02008-02-22 18:28:33 +00002802
showard6bb7c292009-01-30 01:44:51 +00002803 def _not_yet_run_entries(self, include_verifying=True):
2804 statuses = [models.HostQueueEntry.Status.QUEUED,
2805 models.HostQueueEntry.Status.PENDING]
2806 if include_verifying:
2807 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2808 return models.HostQueueEntry.objects.filter(job=self.id,
2809 status__in=statuses)
2810
2811
2812 def _stop_all_entries(self):
2813 entries_to_stop = self._not_yet_run_entries(
2814 include_verifying=False)
2815 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002816 assert not child_entry.complete, (
2817 '%s status=%s, active=%s, complete=%s' %
2818 (child_entry.id, child_entry.status, child_entry.active,
2819 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002820 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2821 child_entry.host.status = models.Host.Status.READY
2822 child_entry.host.save()
2823 child_entry.status = models.HostQueueEntry.Status.STOPPED
2824 child_entry.save()
2825
showard2bab8f42008-11-12 18:15:22 +00002826 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002827 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002828 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002829 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002830
2831
jadmanski0afbb632008-06-06 21:10:57 +00002832 def write_to_machines_file(self, queue_entry):
2833 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002834 file_path = os.path.join(self.tag(), '.machines')
2835 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002836
2837
showardf1ae3542009-05-11 19:26:02 +00002838 def _next_group_name(self, group_name=''):
2839 """@returns a directory name to use for the next host group results."""
2840 if group_name:
2841 # Sanitize for use as a pathname.
2842 group_name = group_name.replace(os.path.sep, '_')
2843 if group_name.startswith('.'):
2844 group_name = '_' + group_name[1:]
2845 # Add a separator between the group name and 'group%d'.
2846 group_name += '.'
2847 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002848 query = models.HostQueueEntry.objects.filter(
2849 job=self.id).values('execution_subdir').distinct()
2850 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002851 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2852 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002853 if ids:
2854 next_id = max(ids) + 1
2855 else:
2856 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002857 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002858
2859
showard170873e2009-01-07 00:22:26 +00002860 def _write_control_file(self, execution_tag):
2861 control_path = _drone_manager.attach_file_to_execution(
2862 execution_tag, self.control_file)
2863 return control_path
mbligh36768f02008-02-22 18:28:33 +00002864
showardb2e2c322008-10-14 17:33:55 +00002865
showard2bab8f42008-11-12 18:15:22 +00002866 def get_group_entries(self, queue_entry_from_group):
2867 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002868 return list(HostQueueEntry.fetch(
2869 where='job_id=%s AND execution_subdir=%s',
2870 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002871
2872
showardb2e2c322008-10-14 17:33:55 +00002873 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002874 assert queue_entries
2875 execution_tag = queue_entries[0].execution_tag()
2876 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002877 hostnames = ','.join([entry.get_host().hostname
2878 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002879
showard87ba02a2009-04-20 19:37:32 +00002880 params = _autoserv_command_line(
2881 hostnames, execution_tag,
2882 ['-P', execution_tag, '-n',
2883 _drone_manager.absolute_path(control_path)],
2884 job=self)
mbligh36768f02008-02-22 18:28:33 +00002885
jadmanski0afbb632008-06-06 21:10:57 +00002886 if not self.is_server_job():
2887 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002888
showardb2e2c322008-10-14 17:33:55 +00002889 return params
mblighe2586682008-02-29 22:45:46 +00002890
mbligh36768f02008-02-22 18:28:33 +00002891
showardc9ae1782009-01-30 01:42:37 +00002892 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002893 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002894 return True
showard0fc38302008-10-23 00:44:07 +00002895 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002896 return queue_entry.get_host().dirty
2897 return False
showard21baa452008-10-21 00:08:39 +00002898
showardc9ae1782009-01-30 01:42:37 +00002899
2900 def _should_run_verify(self, queue_entry):
2901 do_not_verify = (queue_entry.host.protection ==
2902 host_protections.Protection.DO_NOT_VERIFY)
2903 if do_not_verify:
2904 return False
2905 return self.run_verify
2906
2907
showard77182562009-06-10 00:16:05 +00002908 def get_pre_job_tasks(self, queue_entry):
2909 """
2910 Get a list of tasks to perform before the host_queue_entry
2911 may be used to run this Job (such as Cleanup & Verify).
2912
2913 @returns A list of tasks to be done to the given queue_entry before
2914 it should be considered be ready to run this job. The last
2915 task in the list calls HostQueueEntry.on_pending(), which
2916 continues the flow of the job.
2917 """
showard21baa452008-10-21 00:08:39 +00002918 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002919 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002920 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002921 if self._should_run_verify(queue_entry):
2922 tasks.append(VerifyTask(queue_entry=queue_entry))
2923 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002924 return tasks
2925
2926
showardf1ae3542009-05-11 19:26:02 +00002927 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002928 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002929 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002930 else:
showardf1ae3542009-05-11 19:26:02 +00002931 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002932 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002933 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002934 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002935
2936 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002937 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002938
2939
    def _choose_group_to_run(self, include_queue_entry):
        """
        Select the queue entries that will run together as one host group,
        always including include_queue_entry, and assign them a results
        group via _assign_new_group().

        @param include_queue_entry: The HostQueueEntry that triggered the run;
                it is always part of the chosen group.

        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, a string group name to suggest giving
                to this job in the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic group jobs grab as many hosts as the group allows; regular
        # synchronous jobs only need synch_count hosts.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                    where=where_clause,
                    params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002984
2985
showard77182562009-06-10 00:16:05 +00002986 def run_if_ready(self, queue_entry):
2987 """
2988 @returns An Agent instance to ultimately run this job if enough hosts
2989 are ready for it to run.
2990 @returns None and potentially cleans up excess hosts if this Job
2991 is not ready to run.
2992 """
showardb2e2c322008-10-14 17:33:55 +00002993 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00002994 self.stop_if_necessary()
2995 return None
mbligh36768f02008-02-22 18:28:33 +00002996
showard77182562009-06-10 00:16:05 +00002997 if queue_entry.atomic_group:
2998 return self.run_with_ready_delay(queue_entry)
2999
3000 return self.run(queue_entry)
3001
3002
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = queue_entry.atomic_group.max_number_of_machines
        over_max_threshold = (self._pending_count() >= pending_threshold)
        # A previously-scheduled delay counts as expired once its end_time
        # has passed.
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        # num_processes=0: the delay task consumes no drone processes.
        return Agent([self._delay_ready_task], num_processes=0)
3038
3039
3040 def run(self, queue_entry):
3041 """
3042 @param queue_entry: The HostQueueEntry instance calling this method.
3043 @returns An Agent instance to run this job or None if we've already
3044 been run.
3045 """
3046 if queue_entry.atomic_group and self._atomic_and_has_started():
3047 logging.error('Job.run() called on running atomic Job %d '
3048 'with HQE %s.', self.id, queue_entry)
3049 return None
showardf1ae3542009-05-11 19:26:02 +00003050 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3051 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003052
3053
showardf1ae3542009-05-11 19:26:02 +00003054 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003055 for queue_entry in queue_entries:
3056 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003057 params = self._get_autoserv_params(queue_entries)
3058 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003059 cmd=params, group_name=group_name)
3060 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003061 if self._delay_ready_task:
3062 # Cancel any pending callback that would try to run again
3063 # as we are already running.
3064 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003065
showard170873e2009-01-07 00:22:26 +00003066 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003067
3068
mbligh36768f02008-02-22 18:28:33 +00003069if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003070 main()