blob: dc72690cb69b8cf6250bfc5be753a84818944dac [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
# Directory results are written under; replaced by the results_dir
# command-line argument in main_without_exception_handling().
RESULTS_DIR = '.'
# nice(1) level applied to spawned autoserv processes -- TODO confirm usage,
# the consuming code is not visible in this chunk.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database connection settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names dropped into an execution's results directory by the
# autoserv run, crash-info collection and results-parsing phases.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state, populated by init() / main_without_exception_handling().
_db = None              # DatabaseConnection, set in init()
_shutdown = False       # set True by handle_sigint(); main loop polls it
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False   # set by the --test command-line flag
_base_url = None        # AFE web URL, derived from the config file
_notify_email_statuses = []  # statuses that trigger notification email
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
def main():
    """Scheduler entry point.

    Delegates to main_without_exception_handling() and guarantees that any
    exception other than SystemExit is logged before it propagates.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # deliberate exit (sys.exit) -- propagate without logging a traceback
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
def main_without_exception_handling():
    """Parse options, initialize global state and run the dispatch loop.

    Reads the results directory from the command line, configures logging,
    email notification statuses and the AFE base URL from global_config,
    starts the status server, then ticks the Dispatcher until _shutdown is
    set (see handle_sigint). Exits via sys.exit(1) when the scheduler is
    disabled in the config or the server hostname is missing.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    # Refuse to run unless explicitly enabled in the global config.
    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        print msg
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the optional site-specific initialization hook, falling back to the
    # no-op dummy when no site_monitor_db module is present.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept any of whitespace , ; : as separators; drop empty entries.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until SIGINT flips _shutdown.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush pending email, stop the status server and
    # drones, then drop the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
def setup_logging():
    """Configure scheduler logging, honoring environment-variable overrides
    for the log directory and log file name.
    """
    environment = os.environ
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=environment.get('AUTOTEST_SCHEDULER_LOG_DIR'),
        logfile_name=environment.get('AUTOTEST_SCHEDULER_LOG_NAME'))
173
174
def handle_sigint(signum, frame):
    """SIGINT handler (installed in init): request a graceful shutdown.

    Flips the module-level _shutdown flag, which the main loop checks once
    per tick; the current tick is allowed to finish.
    """
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000179
180
def init():
    """One-time startup: connect to the database and initialize the drones.

    Writes the monitor_db pidfile, switches to the stress-test database when
    in testing mode, opens the scheduler's database connection, forces Django
    into autocommit, installs the SIGINT handler and hands the configured
    drone list to the global drone manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local 'drones' shadows the imported drones module
    # within this function; harmless here but worth renaming someday.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the argv list used to launch autoserv.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        owning_job = job or queue_entry.job
        command.extend(['-u', owning_job.owner, '-l', owning_job.name])
    return command + extra_args
235
236
class SchedulerError(Exception):
    """Raised by HostScheduler when scheduling state is inconsistent."""
239
240
class HostScheduler(object):
    """Assigns ready hosts to pending host queue entries.

    refresh() snapshots the relevant database state (ready hosts, ACLs,
    dependencies, labels) into in-memory caches; the find_eligible_* methods
    then consume those caches, removing each host from _hosts_available as it
    is claimed, so a host is handed out at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return a dict of host id -> Host for every schedulable host."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated SQL IN() list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (with an IN (%s) placeholder) over id_list and return
        a dict mapping first column -> set of second column (swapped when
        flip is True). Returns {} for an empty id_list without querying.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_id)} (or the
        reverse when flip is True)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of aclgroup ids its owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of aclgroup ids containing it."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host ids, host_id -> label ids) for the
        given hosts; both empty for an empty host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return a dict of label id -> Label for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot database state for one scheduling pass.

        Caches ready hosts plus, for the jobs behind pending_queue_entries,
        their ACLs, ineligible hosts and label dependencies, and the
        host/label relationships for the available hosts.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with
        the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes all eligibility checks (ACL, dependency,
        only_if_needed and atomic-group constraints) for the entry's job."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is known to this cycle and marked invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if it is eligible, else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label,
        or None if none qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None.

        Dispatches to the specific-host or metahost path depending on
        whether the entry names a metahost label.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
602
603
class Dispatcher(object):
    """Drives the scheduler: owns the Agents performing work, the periodic
    cleanup tasks, and the indexes mapping hosts and queue entries to the
    Agents currently using them.
    """
    def __init__(self):
        self._agents = []  # all Agent objects currently being run
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # host id -> set of Agents using that host
        self._host_agents = {}
        # queue entry id -> set of Agents working on that entry
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000615
mbligh36768f02008-02-22 18:28:33 +0000616
    def initialize(self, recover_hosts=True):
        """One-time startup: prime the cleanup tasks and recover any state
        left behind by a previous scheduler instance.

        @param recover_hosts: when True, also attempt host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000626
627
    def tick(self):
        """Run one iteration of the scheduler main loop.

        The step order is significant: drone state is refreshed first, new
        work is scheduled only after aborts/reverifies/recurring runs are
        handled, and drone actions plus queued email are flushed last.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000638
showard97aed502008-11-04 02:01:24 +0000639
mblighf3294cc2009-04-08 21:17:38 +0000640 def _run_cleanup(self):
641 self._periodic_cleanup.run_cleanup_maybe()
642 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000643
mbligh36768f02008-02-22 18:28:33 +0000644
showard170873e2009-01-07 00:22:26 +0000645 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
646 for object_id in object_ids:
647 agent_dict.setdefault(object_id, set()).add(agent)
648
649
650 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
651 for object_id in object_ids:
652 assert object_id in agent_dict
653 agent_dict[object_id].remove(agent)
654
655
jadmanski0afbb632008-06-06 21:10:57 +0000656 def add_agent(self, agent):
657 self._agents.append(agent)
658 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000659 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
660 self._register_agent_for_ids(self._queue_entry_agents,
661 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000662
showard170873e2009-01-07 00:22:26 +0000663
664 def get_agents_for_entry(self, queue_entry):
665 """
666 Find agents corresponding to the specified queue_entry.
667 """
showardd3dc1992009-04-22 21:01:40 +0000668 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000669
670
671 def host_has_agent(self, host):
672 """
673 Determine if there is currently an Agent present using this host.
674 """
675 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000676
677
jadmanski0afbb632008-06-06 21:10:57 +0000678 def remove_agent(self, agent):
679 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000680 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
681 agent)
682 self._unregister_agent_for_ids(self._queue_entry_agents,
683 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000684
685
    def _recover_processes(self):
        # Recovery after a scheduler restart.  Ordering matters here:
        # pidfiles are registered before the drone manager refresh,
        # presumably so the refresh fetches their contents -- confirm against
        # drone_manager.  Entries are then reattached to their processes,
        # anything left over is requeued, and stray hosts are reverified.
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000696
showard170873e2009-01-07 00:22:26 +0000697
698 def _register_pidfiles(self):
699 # during recovery we may need to read pidfiles for both running and
700 # parsing entries
701 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000702 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000703 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000704 for pidfile_name in _ALL_PIDFILE_NAMES:
705 pidfile_id = _drone_manager.get_pidfile_id_from(
706 queue_entry.execution_tag(), pidfile_name=pidfile_name)
707 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000708
709
showardd3dc1992009-04-22 21:01:40 +0000710 def _recover_entries_with_status(self, status, orphans, pidfile_name,
711 recover_entries_fn):
712 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000713 for queue_entry in queue_entries:
714 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000715 # synchronous job we've already recovered
716 continue
showardd3dc1992009-04-22 21:01:40 +0000717 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000718 execution_tag = queue_entry.execution_tag()
719 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000720 run_monitor.attach_to_existing_process(execution_tag,
721 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000722
723 log_message = ('Recovering %s entry %s ' %
724 (status.lower(),
725 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000726 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000727 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000728 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000729 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000730 continue
mbligh90a549d2008-03-25 23:52:34 +0000731
showard597bfd32009-05-08 18:22:50 +0000732 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000733 run_monitor.get_process())
734 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
735 orphans.discard(run_monitor.get_process())
736
737
738 def _kill_remaining_orphan_processes(self, orphans):
739 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000740 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000741 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000742
showard170873e2009-01-07 00:22:26 +0000743
showardd3dc1992009-04-22 21:01:40 +0000744 def _recover_running_entries(self, orphans):
745 def recover_entries(job, queue_entries, run_monitor):
746 if run_monitor is not None:
747 queue_task = RecoveryQueueTask(job=job,
748 queue_entries=queue_entries,
749 run_monitor=run_monitor)
750 self.add_agent(Agent(tasks=[queue_task],
751 num_processes=len(queue_entries)))
752 # else, _requeue_other_active_entries will cover this
753
754 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
755 orphans, '.autoserv_execute',
756 recover_entries)
757
758
759 def _recover_gathering_entries(self, orphans):
760 def recover_entries(job, queue_entries, run_monitor):
761 gather_task = GatherLogsTask(job, queue_entries,
762 run_monitor=run_monitor)
763 self.add_agent(Agent([gather_task]))
764
765 self._recover_entries_with_status(
766 models.HostQueueEntry.Status.GATHERING,
767 orphans, _CRASHINFO_PID_FILE, recover_entries)
768
769
770 def _recover_parsing_entries(self, orphans):
771 def recover_entries(job, queue_entries, run_monitor):
772 reparse_task = FinalReparseTask(queue_entries,
773 run_monitor=run_monitor)
774 self.add_agent(Agent([reparse_task], num_processes=0))
775
776 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
777 orphans, _PARSER_PID_FILE,
778 recover_entries)
779
780
    def _recover_all_recoverable_entries(self):
        # Recover Running, Gathering and Parsing entries in turn; each step
        # discards the processes it reattached to from the orphan set, so
        # whatever remains afterwards is truly orphaned and gets killed.
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000787
showard97aed502008-11-04 02:01:24 +0000788
showard170873e2009-01-07 00:22:26 +0000789 def _requeue_other_active_entries(self):
790 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000791 where='active AND NOT complete AND '
792 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000793 for queue_entry in queue_entries:
794 if self.get_agents_for_entry(queue_entry):
795 # entry has already been recovered
796 continue
showardd3dc1992009-04-22 21:01:40 +0000797 if queue_entry.aborted:
798 queue_entry.abort(self)
799 continue
800
801 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000802 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000803 if queue_entry.host:
804 tasks = queue_entry.host.reverify_tasks()
805 self.add_agent(Agent(tasks))
806 agent = queue_entry.requeue()
807
808
showard1ff7b2e2009-05-15 23:17:18 +0000809 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000810 tasks = models.SpecialTask.objects.filter(
811 task=models.SpecialTask.Task.REVERIFY, is_active=False,
812 is_complete=False)
813
814 host_ids = [str(task.host.id) for task in tasks]
815
816 if host_ids:
817 where = 'id IN (%s)' % ','.join(host_ids)
818 host_ids_reverifying = self._reverify_hosts_where(
819 where, cleanup=False)
820 tasks = tasks.filter(host__id__in=host_ids_reverifying)
821 for task in tasks:
822 task.is_active=True
823 task.save()
showard1ff7b2e2009-05-15 23:17:18 +0000824
825
    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000841
842
jadmanski0afbb632008-06-06 21:10:57 +0000843 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000844 print_message='Reverifying host %s',
845 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000846 full_where='locked = 0 AND invalid = 0 AND ' + where
showard6d7b2ff2009-06-10 00:16:47 +0000847 host_ids_reverifying = []
showard170873e2009-01-07 00:22:26 +0000848 for host in Host.fetch(where=full_where):
849 if self.host_has_agent(host):
850 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000851 continue
showard170873e2009-01-07 00:22:26 +0000852 if print_message:
showardb18134f2009-03-20 20:52:18 +0000853 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000854 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000855 self.add_agent(Agent(tasks))
showard6d7b2ff2009-06-10 00:16:47 +0000856 host_ids_reverifying.append(host.id)
857 return host_ids_reverifying
mbligh36768f02008-02-22 18:28:33 +0000858
859
jadmanski0afbb632008-06-06 21:10:57 +0000860 def _recover_hosts(self):
861 # recover "Repair Failed" hosts
862 message = 'Reverifying dead host %s'
863 self._reverify_hosts_where("status = 'Repair Failed'",
864 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000865
866
showard04c82c52008-05-29 19:38:12 +0000867
showardb95b1bd2008-08-15 18:11:04 +0000868 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000869 # prioritize by job priority, then non-metahost over metahost, then FIFO
870 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000871 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000872 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000873 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000874
875
showard89f84db2009-03-12 20:39:13 +0000876 def _refresh_pending_queue_entries(self):
877 """
878 Lookup the pending HostQueueEntries and call our HostScheduler
879 refresh() method given that list. Return the list.
880
881 @returns A list of pending HostQueueEntries sorted in priority order.
882 """
showard63a34772008-08-18 19:32:50 +0000883 queue_entries = self._get_pending_queue_entries()
884 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000885 return []
showardb95b1bd2008-08-15 18:11:04 +0000886
showard63a34772008-08-18 19:32:50 +0000887 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000888
showard89f84db2009-03-12 20:39:13 +0000889 return queue_entries
890
891
892 def _schedule_atomic_group(self, queue_entry):
893 """
894 Schedule the given queue_entry on an atomic group of hosts.
895
896 Returns immediately if there are insufficient available hosts.
897
898 Creates new HostQueueEntries based off of queue_entry for the
899 scheduled hosts and starts them all running.
900 """
901 # This is a virtual host queue entry representing an entire
902 # atomic group, find a group and schedule their hosts.
903 group_hosts = self._host_scheduler.find_eligible_atomic_group(
904 queue_entry)
905 if not group_hosts:
906 return
showardcbe6f942009-06-17 19:33:49 +0000907
908 logging.info('Expanding atomic group entry %s with hosts %s',
909 queue_entry,
910 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +0000911 # The first assigned host uses the original HostQueueEntry
912 group_queue_entries = [queue_entry]
913 for assigned_host in group_hosts[1:]:
914 # Create a new HQE for every additional assigned_host.
915 new_hqe = HostQueueEntry.clone(queue_entry)
916 new_hqe.save()
917 group_queue_entries.append(new_hqe)
918 assert len(group_queue_entries) == len(group_hosts)
919 for queue_entry, host in itertools.izip(group_queue_entries,
920 group_hosts):
921 self._run_queue_entry(queue_entry, host)
922
923
924 def _schedule_new_jobs(self):
925 queue_entries = self._refresh_pending_queue_entries()
926 if not queue_entries:
927 return
928
showard63a34772008-08-18 19:32:50 +0000929 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000930 if (queue_entry.atomic_group_id is None or
931 queue_entry.host_id is not None):
932 assigned_host = self._host_scheduler.find_eligible_host(
933 queue_entry)
934 if assigned_host:
935 self._run_queue_entry(queue_entry, assigned_host)
936 else:
937 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000938
939
940 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +0000941 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
942 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000943
944
jadmanski0afbb632008-06-06 21:10:57 +0000945 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000946 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
947 for agent in self.get_agents_for_entry(entry):
948 agent.abort()
949 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000950
951
showard324bf812009-01-20 23:23:38 +0000952 def _can_start_agent(self, agent, num_started_this_cycle,
953 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000954 # always allow zero-process agents to run
955 if agent.num_processes == 0:
956 return True
957 # don't allow any nonzero-process agents to run after we've reached a
958 # limit (this avoids starvation of many-process agents)
959 if have_reached_limit:
960 return False
961 # total process throttling
showard324bf812009-01-20 23:23:38 +0000962 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000963 return False
964 # if a single agent exceeds the per-cycle throttling, still allow it to
965 # run when it's the first agent in the cycle
966 if num_started_this_cycle == 0:
967 return True
968 # per-cycle throttling
969 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000970 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000971 return False
972 return True
973
974
jadmanski0afbb632008-06-06 21:10:57 +0000975 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000976 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000977 have_reached_limit = False
978 # iterate over copy, so we can remove agents during iteration
979 for agent in list(self._agents):
980 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000981 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000982 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000983 continue
984 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000985 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000986 have_reached_limit):
987 have_reached_limit = True
988 continue
showard4c5374f2008-09-04 17:02:56 +0000989 num_started_this_cycle += agent.num_processes
990 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000991 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000992 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000993
994
    def _process_recurring_runs(self):
        # Instantiate a new job from every recurring-run template whose
        # start date has arrived, then reschedule or delete the recurrence.
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is read here but never passed to
            # create_new_job below -- confirm whether that is intentional.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # materialize one-time hosts into real Host rows for this run
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # creation failure shouldn't kill the scheduler loop
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # last iteration: retire the recurrence entirely
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1034
1035
showard170873e2009-01-07 00:22:26 +00001036class PidfileRunMonitor(object):
1037 """
1038 Client must call either run() to start a new process or
1039 attach_to_existing_process().
1040 """
mbligh36768f02008-02-22 18:28:33 +00001041
showard170873e2009-01-07 00:22:26 +00001042 class _PidfileException(Exception):
1043 """
1044 Raised when there's some unexpected behavior with the pid file, but only
1045 used internally (never allowed to escape this class).
1046 """
mbligh36768f02008-02-22 18:28:33 +00001047
1048
showard170873e2009-01-07 00:22:26 +00001049 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001050 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001051 self._start_time = None
1052 self.pidfile_id = None
1053 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001054
1055
showard170873e2009-01-07 00:22:26 +00001056 def _add_nice_command(self, command, nice_level):
1057 if not nice_level:
1058 return command
1059 return ['nice', '-n', str(nice_level)] + command
1060
1061
1062 def _set_start_time(self):
1063 self._start_time = time.time()
1064
1065
1066 def run(self, command, working_directory, nice_level=None, log_file=None,
1067 pidfile_name=None, paired_with_pidfile=None):
1068 assert command is not None
1069 if nice_level is not None:
1070 command = ['nice', '-n', str(nice_level)] + command
1071 self._set_start_time()
1072 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001073 command, working_directory, pidfile_name=pidfile_name,
1074 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001075
1076
showardd3dc1992009-04-22 21:01:40 +00001077 def attach_to_existing_process(self, execution_tag,
1078 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001079 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001080 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1081 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001082 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001083
1084
jadmanski0afbb632008-06-06 21:10:57 +00001085 def kill(self):
showard170873e2009-01-07 00:22:26 +00001086 if self.has_process():
1087 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001088
mbligh36768f02008-02-22 18:28:33 +00001089
showard170873e2009-01-07 00:22:26 +00001090 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001091 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001092 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001093
1094
showard170873e2009-01-07 00:22:26 +00001095 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001096 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001097 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001098 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001099
1100
showard170873e2009-01-07 00:22:26 +00001101 def _read_pidfile(self, use_second_read=False):
1102 assert self.pidfile_id is not None, (
1103 'You must call run() or attach_to_existing_process()')
1104 contents = _drone_manager.get_pidfile_contents(
1105 self.pidfile_id, use_second_read=use_second_read)
1106 if contents.is_invalid():
1107 self._state = drone_manager.PidfileContents()
1108 raise self._PidfileException(contents)
1109 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001110
1111
showard21baa452008-10-21 00:08:39 +00001112 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001113 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1114 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001115 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001116 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001117 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001118
1119
1120 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001121 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001122 return
mblighbb421852008-03-11 22:36:16 +00001123
showard21baa452008-10-21 00:08:39 +00001124 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001125
showard170873e2009-01-07 00:22:26 +00001126 if self._state.process is None:
1127 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001128 return
mbligh90a549d2008-03-25 23:52:34 +00001129
showard21baa452008-10-21 00:08:39 +00001130 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001131 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001132 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001133 return
mbligh90a549d2008-03-25 23:52:34 +00001134
showard170873e2009-01-07 00:22:26 +00001135 # pid but no running process - maybe process *just* exited
1136 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001137 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001138 # autoserv exited without writing an exit code
1139 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001140 self._handle_pidfile_error(
1141 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001142
showard21baa452008-10-21 00:08:39 +00001143
1144 def _get_pidfile_info(self):
1145 """\
1146 After completion, self._state will contain:
1147 pid=None, exit_status=None if autoserv has not yet run
1148 pid!=None, exit_status=None if autoserv is running
1149 pid!=None, exit_status!=None if autoserv has completed
1150 """
1151 try:
1152 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001153 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001154 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001155
1156
showard170873e2009-01-07 00:22:26 +00001157 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001158 """\
1159 Called when no pidfile is found or no pid is in the pidfile.
1160 """
showard170873e2009-01-07 00:22:26 +00001161 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001162 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001163 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1164 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001165 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001166 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001167
1168
showard35162b02009-03-03 02:17:30 +00001169 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001170 """\
1171 Called when autoserv has exited without writing an exit status,
1172 or we've timed out waiting for autoserv to write a pid to the
1173 pidfile. In either case, we just return failure and the caller
1174 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001175
showard170873e2009-01-07 00:22:26 +00001176 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001177 """
1178 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001179 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001180 self._state.exit_status = 1
1181 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001182
1183
jadmanski0afbb632008-06-06 21:10:57 +00001184 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001185 self._get_pidfile_info()
1186 return self._state.exit_status
1187
1188
1189 def num_tests_failed(self):
1190 self._get_pidfile_info()
1191 assert self._state.num_tests_failed is not None
1192 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001193
1194
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a sequence of tasks.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        failure_tasks - A sequence of tasks to be run using a new Agent
                by the dispatcher should this task fail.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.

    The following attribute is written to all task objects:
        agent - A reference to the Agent instance that the task has been
                added to.
    """


    def __init__(self, tasks, num_processes=1):
        """
        @param tasks: A list of tasks as described in the class docstring.
        @param num_processes: The number of subprocesses the Agent represents.
                This is used by the Dispatcher for managing the load on the
                system.  Defaults to 1.
        """
        self.active_task = None
        self.queue = None
        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None
        self.num_processes = num_processes

        # union of the id sets across all tasks; the Dispatcher uses these
        # to index agents by host and by queue entry
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        # (re)create the FIFO of pending tasks; also used to drop the
        # remaining tasks when one fails
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        # enqueue the task and give it a back-reference to this agent
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        # poll the active task, advancing through the queue as tasks finish;
        # return as soon as the active task is still in progress
        while not self.is_done():
            if self.active_task:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        if self.active_task is not None:
            assert self.active_task.is_done()
            if not self.active_task.success:
                # on_task_failure() reads self.active_task, so it must run
                # before the reset below
                self.on_task_failure()
            self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        # drop the remaining queued tasks; the failed task's failure_tasks
        # take over in a fresh agent
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        # "running" means a task is currently active
        return self.active_task is not None


    def is_done(self):
        # done once there is no active task and nothing left queued
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks until the queue is empty or a task ignores the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1302
showardd3dc1992009-04-22 21:01:40 +00001303
class DelayedCallTask(object):
    """
    An Agent-runnable task that waits until a deadline, then invokes a
    supplied callback exactly once and finishes.  If the callback returns
    anything, the return value is assumed to be a new Agent instance and is
    handed to the dispatcher.

    @attribute end_time: The absolute posix time after which the next poll()
            fires the callback and completes the task.

    Also carries every attribute the Agent class requires.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now until the callback may fire.
        @param callback: Callable invoked once after at least delay_seconds
                has elapsed; must return None or a new Agent instance.
        @param now_func: A time.time-like clock, injectable for testing.
                Default: time.time.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.failure_tasks = ()
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # This is filled in by Agent.add_task().
        self.agent = None


    def poll(self):
        """Fire the callback once the deadline has passed; else a no-op."""
        if not self._callback or self._now_func() < self.end_time:
            return
        result = self._callback()
        if result:
            self.agent.dispatcher.add_agent(result)
        self._callback = None
        self.success = True


    def is_done(self):
        """Done once the callback has fired or the task was aborted."""
        return not self._callback


    def abort(self):
        """Cancel: drop the callback so the task is done without success."""
        self.aborted = True
        self._callback = None
1361
1362
class AgentTask(object):
    """A single unit of work executed by an Agent, usually by launching an
    autoserv process through a PidfileRunMonitor.

    Lifecycle: the dispatcher calls poll() repeatedly. The first poll() runs
    prolog() and run(); subsequent polls tick() until the monitored process
    exits, at which point finished() records success and runs epilog().

    @attribute failure_tasks: tasks the Agent will queue if this task fails.
    @attribute queue_entry_ids, host_ids: DB ids this task operates on, used
            by the dispatcher (e.g. for abort handling).
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command (argv list) to run, or None/empty for tasks that
                don't spawn a process themselves.
        @param working_directory: execution directory handed to the monitor.
        @param failure_tasks: tasks to run if this one fails; defaults to an
                empty list.
        @param pidfile_name: accepted for interface compatibility; unused
                here -- run() takes pidfile_name directly.
        @param paired_with_pidfile: accepted for interface compatibility;
                unused here -- run() takes paired_with_pidfile directly.
        """
        self.done = False
        # Bug fix: the default used to be a mutable [] shared by every
        # instance constructed without failure_tasks. Use a per-instance
        # list instead; a caller-supplied list is kept as-is (preserves
        # aliasing for callers that hold a reference).
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # Record the hosts/queue entries this task affects. Queue entries
        # take precedence; [None] (e.g. a host-only VerifyTask) falls back
        # to the bare host.
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Advance the task: start it on the first call, then tick()."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Check the monitored process and finish once it has exited."""
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                return  # process still running
            success = (exit_code == 0)
        else:
            # no monitor means run() never launched anything; treat as failure
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task done (idempotent) and run epilog()."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts.  Default: no-op."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        """Allocate a temporary results path on the drone.

        @param suffix: kept for interface compatibility; note it is currently
                ignored by the drone manager call below.
        """
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve this task's log in the results repository, if we have one
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes.  Default: just cleanup()."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the underlying process (if any) and mark the task aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point self.log_file at a per-host log under hosts/<hostname>/."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # all entries in a group must share a single execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_results(self, queue_entries, use_monitor=None):
        """Copy results for queue_entries into the results repository.

        @param use_monitor: monitor whose process owns the results; defaults
                to this task's own monitor.
        """
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)


    def _parse_results(self, queue_entries):
        # schedule a zero-process reparse agent for these entries
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd under a PidfileRunMonitor, if there is a command."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001502
1503
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one keyval file line, 'key=value'."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file.  Subclasses must override this."""
        # Bug fix: this used to "raise NotImplemented".  NotImplemented is a
        # non-exception singleton, so raising it produced a confusing
        # TypeError rather than the intended signal; NotImplementedError is
        # the correct exception.
        raise NotImplementedError('Subclasses must override _keyval_path()')


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the job's keyval file, paired with this
        task's process.  No-op if the process never started."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time (now) in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1531
1532
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host; on failure, optionally fails the
    queue entry that triggered the repair."""
    def __init__(self, host, queue_entry=None):
        """
        @param host: the host to repair.
        @param queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # the temp dir must exist before building the command line
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry now; if the repair fails, epilog() will fail it
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live alongside the repair results (TaskWithJobKeyvals hook)
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark the triggering queue entry as failed, copying this repair's
        logs into the entry's normal results location first."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        # optionally run the parser over the failed-repair results
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        # close out any active special tasks recorded for this host
        tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
                                                  is_active=True)
        for task in tasks:
            task.is_complete = True
            task.save()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001610
1611
class PreJobTask(AgentTask):
    """Base class for tasks run against a host before its job (verify,
    cleanup).  When such a task fails for a real (non-metahost) queue entry,
    its log is copied into the job's results so the failure is visible."""
    def epilog(self):
        super(PreJobTask, self).epilog()
        entry = self.queue_entry
        # only failed, non-metahost entries get their log copied over
        if not entry or self.success or entry.meta_host:
            return
        entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001624
1625
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify that a host is usable; queues a
    RepairTask for the host if verification fails."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        self.queue_entry = queue_entry
        self.host = host if host else queue_entry.host

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
            self.host.hostname, self.temp_results_dir, ['-v'],
            queue_entry=queue_entry)
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return

        # verification passed: close out active special tasks for this host
        # and mark it ready for scheduling
        active_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id, is_active=True)
        for special_task in active_tasks:
            special_task.is_complete = True
            special_task.save()

        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001662
1663
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """Runs the main autoserv process for a job's group of queue entries.

    Writes job- and host-level keyvals before the process starts; when the
    process ends (or is aborted), spawns a GatherLogsTask to collect results.
    """
    def __init__(self, job, queue_entries, cmd, group_name=''):
        """
        @param job: the Job being run.
        @param queue_entries: the HostQueueEntries in this execution group.
        @param cmd: autoserv command line (argv list).
        @param group_name: optional host-group name, written as a keyval.
        """
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # TaskWithJobKeyvals hook: keyvals live in the execution directory
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a file at keyval_path, to be written when
        the job's process starts."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write platform/labels keyvals for one host under host_keyvals/."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries in the group share the first entry's execution tag
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        # record queue time (and optional group name) before the job starts
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Finalize the run: write the finish keyval, spawn log gathering,
        and fail the entries if the process was lost."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001791
1792
class RecoveryQueueTask(QueueTask):
    """QueueTask variant used to re-attach to an autoserv process that was
    already running when the scheduler started (recovery path)."""
    def __init__(self, job, queue_entries, run_monitor):
        """
        @param run_monitor: monitor already attached to the running process.
        """
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.monitor = run_monitor
        self.started = True
        # since we set started=True here, prolog() and run() shouldn't be called


    def run(self):
        # Bug fix: this used to "raise NotImplemented(...)".  NotImplemented
        # is not an exception (calling it raises a TypeError), so the intended
        # error was never produced; NotImplementedError is correct.
        raise NotImplementedError('This should never be called')


    def prolog(self):
        # see run() -- same unreachable-method guard
        raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001807
1808
class PostJobTask(AgentTask):
    """Base class for tasks run over a job's results after the main autoserv
    process has exited (crashinfo gathering, final reparse).

    Attaches a PidfileRunMonitor to the finished autoserv run to determine
    the job's final status, and pairs its own process with that pidfile.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        @param queue_entries: the queue entries whose results to process.
        @param pidfile_name: pidfile for this post-job process.
        @param logfile_name: name of this task's log file, placed in the
                execution directory.
        @param run_monitor: if not None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the completed autoserv run to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)
        # this must happen *after* the super call (which resets self.monitor)
        self.monitor = run_monitor
        if run_monitor:
            self.started = True

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the argv list for this post-job process."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """True if the queue entries were aborted; emails on inconsistency."""
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: the join previously iterated over the characters
                # of a single formatted string; it should list every entry.
                entries_description = ', '.join(
                    '%s (%s)' % (entry, entry.aborted)
                    for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        assert not self.monitor

        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results in post-job task',
                'No results in post-job task at %s' %
                self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
            pidfile_name=self._pidfile_name,
            paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1897
1898
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        """
        @param job: the job whose logs to gather.
        @param queue_entries: the job's queue entries in this execution group.
        @param run_monitor: if not None, we're recovering a running task.
        """
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in --collect-crashinfo mode against all hosts at once
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Spawn CleanupTasks (reboots) or mark hosts Ready, depending on
        the job's reboot_after setting and how the job ended."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # aborted jobs always get their hosts cleaned up
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # only copy/parse if the autoserv run actually produced a process
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001966
1967
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host (typically to reset it between
    jobs); queues a RepairTask for the host if the cleanup fails."""
    def __init__(self, host=None, queue_entry=None):
        # exactly one of host/queue_entry must be supplied
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        fallback_repair = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[fallback_repair])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        # a successfully cleaned host is schedulable and no longer dirty
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1999
2000
showardd3dc1992009-04-22 21:01:40 +00002001class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002002 _num_running_parses = 0
2003
showardd3dc1992009-04-22 21:01:40 +00002004 def __init__(self, queue_entries, run_monitor=None):
2005 super(FinalReparseTask, self).__init__(queue_entries,
2006 pidfile_name=_PARSER_PID_FILE,
2007 logfile_name='.parse.log',
2008 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00002009 # don't use _set_ids, since we don't want to set the host_ids
2010 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00002011 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00002012
showard97aed502008-11-04 02:01:24 +00002013
2014 @classmethod
2015 def _increment_running_parses(cls):
2016 cls._num_running_parses += 1
2017
2018
2019 @classmethod
2020 def _decrement_running_parses(cls):
2021 cls._num_running_parses -= 1
2022
2023
2024 @classmethod
2025 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002026 return (cls._num_running_parses <
2027 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002028
2029
2030 def prolog(self):
2031 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002032 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002033
2034
2035 def epilog(self):
2036 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002037 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002038
2039
showardd3dc1992009-04-22 21:01:40 +00002040 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002041 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002042 results_dir]
showard97aed502008-11-04 02:01:24 +00002043
2044
showard08a36412009-05-05 01:01:13 +00002045 def tick(self):
2046 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002047 # and we can, at which point we revert to default behavior
2048 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002049 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002050 else:
2051 self._try_starting_parse()
2052
2053
2054 def run(self):
2055 # override run() to not actually run unless we can
2056 self._try_starting_parse()
2057
2058
2059 def _try_starting_parse(self):
2060 if not self._can_run_new_parse():
2061 return
showard170873e2009-01-07 00:22:26 +00002062
showard97aed502008-11-04 02:01:24 +00002063 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002064 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002065
showard97aed502008-11-04 02:01:24 +00002066 self._increment_running_parses()
2067 self._parse_started = True
2068
2069
    def finished(self, success):
        """Release our parse slot, but only if run() actually claimed one."""
        super(FinalReparseTask, self).finished(success)
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002074
2075
class SetEntryPendingTask(AgentTask):
    """A no-op agent task that flips its queue entry to Pending when run."""

    def __init__(self, queue_entry):
        # There is no external command to execute; this task only moves the
        # entry's state forward, so pass an empty cmd to the base class.
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        """Move the entry to Pending; schedule any follow-on agent it spawns."""
        new_agent = self._queue_entry.on_pending()
        if new_agent:
            self.agent.dispatcher.add_agent(new_agent)
        self.finished(True)
2088
2089
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    In particular, DBObject._fetch_row_from_db raises this when no row in
    the table matches the requested id.
    """
2092
2093
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Primary key of the row to load (mutually exclusive
                with row).
        @param row - A full row of values, as returned by SELECT *, to
                populate the object from without querying.
        @param new_record - If True, this object represents a row not yet in
                the database; save() will INSERT it.
        @param always_query - If False and this instance was already
                initialized (returned from the cache by __new__), skip the
                requery and reuse the existing field values.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT the row with the given id; raise DBError if it's missing."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is our primary key; it may never be rewritten via update_field.
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-fetch our row and refresh all field attributes from it."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Write a single field both to the database and to this instance."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return  # no-op; avoid a pointless UPDATE

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields as a new row (new_record only)."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE: naive quoting; values originate from our own DB
                    # reads, not from untrusted input.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row and evict this instance from the shared cache."""
        # Bug fix: this previously used the builtin id() function instead of
        # self.id as the cache key, so deleted instances were never evicted.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # Prepend prefix only when string is non-empty; passes through
        # empty/None unchanged.
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002292
mbligh36768f02008-02-22 18:28:33 +00002293
class IneligibleHostQueue(DBObject):
    # A (job, host) block row: created by HostQueueEntry.block_host() so the
    # host is not scheduled against that job again, and deleted by
    # HostQueueEntry.unblock_host().
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002297
2298
class AtomicGroup(DBObject):
    # ORM wrapper for the atomic_groups table; instantiated from
    # HostQueueEntry.atomic_group_id.
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002303
2304
class Label(DBObject):
    # ORM wrapper for the labels table; yielded by
    # HostQueueEntry.get_labels() both as the meta-host label and as job
    # dependency labels.
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002309
2310
class Host(DBObject):
    """ORM wrapper for a row of the hosts table."""

    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """
        Return the HostQueueEntry this host is currently working on.

        @returns The single active, not-complete HostQueueEntry for this
                host, or None if the host is idle.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if not rows:
            return None
        # A host may never be active on more than one entry at a time.
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue whatever entry this host is currently working on."""
        logging.info("%s yielding work", self.hostname)
        # Fetch once instead of querying twice as the old code did.
        current = self.current_task()
        if current:
            current.requeue()


    def set_status(self, status):
        """Update this host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                # Rows are name-ordered; if multiple platform labels exist
                # the alphabetically last one wins (matches prior behavior).
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self, cleanup=True):
        """
        Build the ordered task list used to re-verify this host.

        @param cleanup - When True (default), a CleanupTask runs first.
        @returns [CleanupTask, VerifyTask] or [VerifyTask].
        """
        tasks = [VerifyTask(host=self)]
        # just to make sure this host does not get taken away
        self.set_status('Verifying')
        if cleanup:
            tasks.insert(0, CleanupTask(host=self))
            self.set_status('Cleaning')
        return tasks


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros.  The previous
            # int(s.lstrip('0')) raised ValueError for all-zero digit
            # strings such as "host0" (int('') is invalid).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2409
2410
class HostQueueEntry(DBObject):
    """One (job, host) pairing and its scheduling state machine."""

    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        # Every entry belongs to exactly one Job (instances are shared via
        # the DBObject cache).
        self.job = Job(self.job_id)

        # host is None until a host has been assigned (e.g. meta-host
        # entries start unassigned).
        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry scheduler log, written via the drone manager.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend URL for this entry's job, used in notification emails.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or, with host=None, release) a host for this entry.

        Assigning also marks the entry active and creates an
        IneligibleHostQueue block; releasing removes the block.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        # Timestamped append to this entry's queue log on the drone.
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row for (this job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        # Default the results subdirectory to the assigned host's hostname.
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """Set the status column and keep active/complete flags consistent.

        Also sends notification email when the status (or job completion)
        warrants it.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Not yet started (or being re-parsed): neither active nor complete.
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # In-flight states: active, not complete.
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal states: complete, no longer active.
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job just finished, email a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run_pre_job_tasks(self, assigned_host=None):
        """Bind the host (if needed) and kick off the pre-job task agent.

        @param assigned_host - required for meta-host/atomic-group entries
                that have not already been bound to a host.
        @returns the Agent returned by _do_run_pre_job_tasks().
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self._do_run_pre_job_tasks()


    def _do_run_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)

        # The pre job tasks always end with a SetEntryPendingTask which
        # will continue as appropriate through queue_entry.on_pending().
        return Agent(self.job.get_pre_job_tasks(queue_entry=self))


    def requeue(self):
        """Return this entry to Queued, clearing per-run state."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        # Meta-host entries give their host back for rescheduling.
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        return self.job.run_if_ready(queue_entry=self)


    def abort(self, dispatcher):
        """Abort this entry, releasing or re-verifying its host as needed.

        @param dispatcher - used to look up / add agents for this entry.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        # Relative results path: "<job id>-<owner>/<execution subdir>".
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002700
2701
mbligh36768f02008-02-22 18:28:33 +00002702class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002703 _table_name = 'jobs'
2704 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2705 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002706 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002707 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002708
showard77182562009-06-10 00:16:05 +00002709 # This does not need to be a column in the DB. The delays are likely to
2710 # be configured short. If the scheduler is stopped and restarted in
2711 # the middle of a job's delay cycle, the delay cycle will either be
2712 # repeated or skipped depending on the number of Pending machines found
2713 # when the restarted scheduler recovers to track it. Not a problem.
2714 #
2715 # A reference to the DelayedCallTask that will wake up the job should
2716 # no other HQEs change state in time. Its end_time attribute is used
2717 # by our run_with_ready_delay() method to determine if the wait is over.
2718 _delay_ready_task = None
2719
2720 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2721 # all status='Pending' atomic group HQEs incase a delay was running when the
2722 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002723
    def __init__(self, id=None, row=None, **kwargs):
        # At least one of id/row must be provided (DBObject enforces the
        # exact contract).
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002727
mblighe2586682008-02-29 22:45:46 +00002728
jadmanski0afbb632008-06-06 21:10:57 +00002729 def is_server_job(self):
2730 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002731
2732
    def tag(self):
        """Return the '<id>-<owner>' tag used to build results/log paths."""
        return "%s-%s" % (self.id, self.owner)
2735
2736
jadmanski0afbb632008-06-06 21:10:57 +00002737 def get_host_queue_entries(self):
2738 rows = _db.execute("""
2739 SELECT * FROM host_queue_entries
2740 WHERE job_id= %s
2741 """, (self.id,))
2742 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002743
jadmanski0afbb632008-06-06 21:10:57 +00002744 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002745
jadmanski0afbb632008-06-06 21:10:57 +00002746 return entries
mbligh36768f02008-02-22 18:28:33 +00002747
2748
jadmanski0afbb632008-06-06 21:10:57 +00002749 def set_status(self, status, update_queues=False):
2750 self.update_field('status',status)
2751
2752 if update_queues:
2753 for queue_entry in self.get_host_queue_entries():
2754 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002755
2756
showard77182562009-06-10 00:16:05 +00002757 def _atomic_and_has_started(self):
2758 """
2759 @returns True if any of the HostQueueEntries associated with this job
2760 have entered the Status.STARTING state or beyond.
2761 """
2762 atomic_entries = models.HostQueueEntry.objects.filter(
2763 job=self.id, atomic_group__isnull=False)
2764 if atomic_entries.count() <= 0:
2765 return False
2766
showardaf8b4ca2009-06-16 18:47:26 +00002767 # These states may *only* be reached if Job.run() has been called.
2768 started_statuses = (models.HostQueueEntry.Status.STARTING,
2769 models.HostQueueEntry.Status.RUNNING,
2770 models.HostQueueEntry.Status.COMPLETED)
2771
2772 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002773 return started_entries.count() > 0
2774
2775
2776 def _pending_count(self):
2777 """The number of HostQueueEntries for this job in the Pending state."""
2778 pending_entries = models.HostQueueEntry.objects.filter(
2779 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2780 return pending_entries.count()
2781
2782
jadmanski0afbb632008-06-06 21:10:57 +00002783 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002784 # NOTE: Atomic group jobs stop reporting ready after they have been
2785 # started to avoid launching multiple copies of one atomic job.
2786 # Only possible if synch_count is less than than half the number of
2787 # machines in the atomic group.
2788 return (self._pending_count() >= self.synch_count
2789 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002790
2791
jadmanski0afbb632008-06-06 21:10:57 +00002792 def num_machines(self, clause = None):
2793 sql = "job_id=%s" % self.id
2794 if clause:
2795 sql += " AND (%s)" % clause
2796 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002797
2798
jadmanski0afbb632008-06-06 21:10:57 +00002799 def num_queued(self):
2800 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002801
2802
jadmanski0afbb632008-06-06 21:10:57 +00002803 def num_active(self):
2804 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002805
2806
jadmanski0afbb632008-06-06 21:10:57 +00002807 def num_complete(self):
2808 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002809
2810
jadmanski0afbb632008-06-06 21:10:57 +00002811 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002812 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002813
mbligh36768f02008-02-22 18:28:33 +00002814
showard6bb7c292009-01-30 01:44:51 +00002815 def _not_yet_run_entries(self, include_verifying=True):
2816 statuses = [models.HostQueueEntry.Status.QUEUED,
2817 models.HostQueueEntry.Status.PENDING]
2818 if include_verifying:
2819 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2820 return models.HostQueueEntry.objects.filter(job=self.id,
2821 status__in=statuses)
2822
2823
2824 def _stop_all_entries(self):
2825 entries_to_stop = self._not_yet_run_entries(
2826 include_verifying=False)
2827 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002828 assert not child_entry.complete, (
2829 '%s status=%s, active=%s, complete=%s' %
2830 (child_entry.id, child_entry.status, child_entry.active,
2831 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002832 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2833 child_entry.host.status = models.Host.Status.READY
2834 child_entry.host.save()
2835 child_entry.status = models.HostQueueEntry.Status.STOPPED
2836 child_entry.save()
2837
showard2bab8f42008-11-12 18:15:22 +00002838 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002839 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002840 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002841 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002842
2843
jadmanski0afbb632008-06-06 21:10:57 +00002844 def write_to_machines_file(self, queue_entry):
2845 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002846 file_path = os.path.join(self.tag(), '.machines')
2847 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002848
2849
showardf1ae3542009-05-11 19:26:02 +00002850 def _next_group_name(self, group_name=''):
2851 """@returns a directory name to use for the next host group results."""
2852 if group_name:
2853 # Sanitize for use as a pathname.
2854 group_name = group_name.replace(os.path.sep, '_')
2855 if group_name.startswith('.'):
2856 group_name = '_' + group_name[1:]
2857 # Add a separator between the group name and 'group%d'.
2858 group_name += '.'
2859 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002860 query = models.HostQueueEntry.objects.filter(
2861 job=self.id).values('execution_subdir').distinct()
2862 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002863 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2864 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002865 if ids:
2866 next_id = max(ids) + 1
2867 else:
2868 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002869 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002870
2871
showard170873e2009-01-07 00:22:26 +00002872 def _write_control_file(self, execution_tag):
2873 control_path = _drone_manager.attach_file_to_execution(
2874 execution_tag, self.control_file)
2875 return control_path
mbligh36768f02008-02-22 18:28:33 +00002876
showardb2e2c322008-10-14 17:33:55 +00002877
showard2bab8f42008-11-12 18:15:22 +00002878 def get_group_entries(self, queue_entry_from_group):
2879 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002880 return list(HostQueueEntry.fetch(
2881 where='job_id=%s AND execution_subdir=%s',
2882 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002883
2884
showardb2e2c322008-10-14 17:33:55 +00002885 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002886 assert queue_entries
2887 execution_tag = queue_entries[0].execution_tag()
2888 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002889 hostnames = ','.join([entry.get_host().hostname
2890 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002891
showard87ba02a2009-04-20 19:37:32 +00002892 params = _autoserv_command_line(
2893 hostnames, execution_tag,
2894 ['-P', execution_tag, '-n',
2895 _drone_manager.absolute_path(control_path)],
2896 job=self)
mbligh36768f02008-02-22 18:28:33 +00002897
jadmanski0afbb632008-06-06 21:10:57 +00002898 if not self.is_server_job():
2899 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002900
showardb2e2c322008-10-14 17:33:55 +00002901 return params
mblighe2586682008-02-29 22:45:46 +00002902
mbligh36768f02008-02-22 18:28:33 +00002903
showardc9ae1782009-01-30 01:42:37 +00002904 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002905 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002906 return True
showard0fc38302008-10-23 00:44:07 +00002907 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002908 return queue_entry.get_host().dirty
2909 return False
showard21baa452008-10-21 00:08:39 +00002910
showardc9ae1782009-01-30 01:42:37 +00002911
2912 def _should_run_verify(self, queue_entry):
2913 do_not_verify = (queue_entry.host.protection ==
2914 host_protections.Protection.DO_NOT_VERIFY)
2915 if do_not_verify:
2916 return False
2917 return self.run_verify
2918
2919
showard77182562009-06-10 00:16:05 +00002920 def get_pre_job_tasks(self, queue_entry):
2921 """
2922 Get a list of tasks to perform before the host_queue_entry
2923 may be used to run this Job (such as Cleanup & Verify).
2924
2925 @returns A list of tasks to be done to the given queue_entry before
2926 it should be considered be ready to run this job. The last
2927 task in the list calls HostQueueEntry.on_pending(), which
2928 continues the flow of the job.
2929 """
showard21baa452008-10-21 00:08:39 +00002930 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002931 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002932 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002933 if self._should_run_verify(queue_entry):
2934 tasks.append(VerifyTask(queue_entry=queue_entry))
2935 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002936 return tasks
2937
2938
showardf1ae3542009-05-11 19:26:02 +00002939 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002940 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002941 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002942 else:
showardf1ae3542009-05-11 19:26:02 +00002943 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002944 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002945 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002946 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002947
2948 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002949 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002950
2951
2952 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002953 """
2954 @returns A tuple containing a list of HostQueueEntry instances to be
2955 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002956 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002957 """
showard77182562009-06-10 00:16:05 +00002958 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00002959 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002960 if atomic_group:
2961 num_entries_wanted = atomic_group.max_number_of_machines
2962 else:
2963 num_entries_wanted = self.synch_count
2964 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002965
showardf1ae3542009-05-11 19:26:02 +00002966 if num_entries_wanted > 0:
2967 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002968 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002969 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002970 params=(self.id, include_queue_entry.id)))
2971
2972 # Sort the chosen hosts by hostname before slicing.
2973 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2974 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2975 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2976 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002977
showardf1ae3542009-05-11 19:26:02 +00002978 # Sanity check. We'll only ever be called if this can be met.
2979 assert len(chosen_entries) >= self.synch_count
2980
2981 if atomic_group:
2982 # Look at any meta_host and dependency labels and pick the first
2983 # one that also specifies this atomic group. Use that label name
2984 # as the group name if possible (it is more specific).
2985 group_name = atomic_group.name
2986 for label in include_queue_entry.get_labels():
2987 if label.atomic_group_id:
2988 assert label.atomic_group_id == atomic_group.id
2989 group_name = label.name
2990 break
2991 else:
2992 group_name = ''
2993
2994 self._assign_new_group(chosen_entries, group_name=group_name)
2995 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002996
2997
showard77182562009-06-10 00:16:05 +00002998 def run_if_ready(self, queue_entry):
2999 """
3000 @returns An Agent instance to ultimately run this job if enough hosts
3001 are ready for it to run.
3002 @returns None and potentially cleans up excess hosts if this Job
3003 is not ready to run.
3004 """
showardb2e2c322008-10-14 17:33:55 +00003005 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003006 self.stop_if_necessary()
3007 return None
mbligh36768f02008-02-22 18:28:33 +00003008
showard77182562009-06-10 00:16:05 +00003009 if queue_entry.atomic_group:
3010 return self.run_with_ready_delay(queue_entry)
3011
3012 return self.run(queue_entry)
3013
3014
3015 def run_with_ready_delay(self, queue_entry):
3016 """
3017 Start a delay to wait for more hosts to enter Pending state before
3018 launching an atomic group job. Once set, the a delay cannot be reset.
3019
3020 @param queue_entry: The HostQueueEntry object to get atomic group
3021 info from and pass to run_if_ready when the delay is up.
3022
3023 @returns An Agent to run the job as appropriate or None if a delay
3024 has already been set.
3025 """
3026 assert queue_entry.job_id == self.id
3027 assert queue_entry.atomic_group
3028 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3029 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3030 over_max_threshold = (self._pending_count() >= pending_threshold)
3031 delay_expired = (self._delay_ready_task and
3032 time.time() >= self._delay_ready_task.end_time)
3033
3034 # Delay is disabled or we already have enough? Do not wait to run.
3035 if not delay or over_max_threshold or delay_expired:
3036 return self.run(queue_entry)
3037
3038 # A delay was previously scheduled.
3039 if self._delay_ready_task:
3040 return None
3041
3042 def run_job_after_delay():
3043 logging.info('Job %s done waiting for extra hosts.', self.id)
3044 return self.run(queue_entry)
3045
3046 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3047 callback=run_job_after_delay)
3048
3049 return Agent([self._delay_ready_task], num_processes=0)
3050
3051
3052 def run(self, queue_entry):
3053 """
3054 @param queue_entry: The HostQueueEntry instance calling this method.
3055 @returns An Agent instance to run this job or None if we've already
3056 been run.
3057 """
3058 if queue_entry.atomic_group and self._atomic_and_has_started():
3059 logging.error('Job.run() called on running atomic Job %d '
3060 'with HQE %s.', self.id, queue_entry)
3061 return None
showardf1ae3542009-05-11 19:26:02 +00003062 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3063 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003064
3065
showardf1ae3542009-05-11 19:26:02 +00003066 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003067 for queue_entry in queue_entries:
3068 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003069 params = self._get_autoserv_params(queue_entries)
3070 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003071 cmd=params, group_name=group_name)
3072 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003073 if self._delay_ready_task:
3074 # Cancel any pending callback that would try to run again
3075 # as we are already running.
3076 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003077
showard170873e2009-01-07 00:22:26 +00003078 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003079
3080
# Script entry point: start the scheduler via main(), defined earlier
# in this file.
if __name__ == '__main__':
    main()