blob: c23c2f7bbe0c56605de8f3f5e4704194f561f842 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
mbligh36768f02008-02-22 18:28:33 +000024RESULTS_DIR = '.'
25AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000026DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000027AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
28
29if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000030 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000031AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
32AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
33
34if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000035 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000036
mbligh90a549d2008-03-25 23:52:34 +000037# how long to wait for autoserv to write a pidfile
38PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000039
showardd3dc1992009-04-22 21:01:40 +000040_AUTOSERV_PID_FILE = '.autoserv_execute'
41_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
42_PARSER_PID_FILE = '.parser_execute'
43
44_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
45 _PARSER_PID_FILE)
46
showard35162b02009-03-03 02:17:30 +000047# error message to leave in results dir when an autoserv process disappears
48# mysteriously
49_LOST_PROCESS_ERROR = """\
50Autoserv failed abnormally during execution for this job, probably due to a
51system error on the Autotest server. Full results may not be available. Sorry.
52"""
53
mbligh6f8bab42008-02-29 22:45:14 +000054_db = None
mbligh36768f02008-02-22 18:28:33 +000055_shutdown = False
showard170873e2009-01-07 00:22:26 +000056_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
57_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000058_testing_mode = False
showard542e8402008-09-19 20:16:18 +000059_base_url = None
showardc85c21b2008-11-24 22:17:37 +000060_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000061_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
mbligh36768f02008-02-22 18:28:33 +000068def main():
showard27f33872009-04-07 18:20:53 +000069 try:
70 main_without_exception_handling()
showard29caa4b2009-05-26 19:27:09 +000071 except SystemExit:
72 raise
showard27f33872009-04-07 18:20:53 +000073 except:
74 logging.exception('Exception escaping in monitor_db')
75 raise
76
77
78def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000079 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000080
showard136e6dc2009-06-10 19:38:49 +000081 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000082 parser = optparse.OptionParser(usage)
83 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
84 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000085 parser.add_option('--test', help='Indicate that scheduler is under ' +
86 'test and should use dummy autoserv and no parsing',
87 action='store_true')
88 (options, args) = parser.parse_args()
89 if len(args) != 1:
90 parser.print_usage()
91 return
mbligh36768f02008-02-22 18:28:33 +000092
showard5613c662009-06-08 23:30:33 +000093 scheduler_enabled = global_config.global_config.get_config_value(
94 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
95
96 if not scheduler_enabled:
97 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
98 "global_config's SCHEDULER section to enabled it. Exiting.")
99 print msg
100 sys.exit(1)
101
jadmanski0afbb632008-06-06 21:10:57 +0000102 global RESULTS_DIR
103 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh83c1e9e2009-05-01 23:10:41 +0000105 site_init = utils.import_site_function(__file__,
106 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
107 _site_init_monitor_db_dummy)
108 site_init()
109
showardcca334f2009-03-12 20:38:34 +0000110 # Change the cwd while running to avoid issues incase we were launched from
111 # somewhere odd (such as a random NFS home directory of the person running
112 # sudo to launch us as the appropriate user).
113 os.chdir(RESULTS_DIR)
114
jadmanski0afbb632008-06-06 21:10:57 +0000115 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000116 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
117 "notify_email_statuses",
118 default='')
showardc85c21b2008-11-24 22:17:37 +0000119 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000120 _notify_email_statuses = [status for status in
121 re.split(r'[\s,;:]', notify_statuses_list.lower())
122 if status]
showardc85c21b2008-11-24 22:17:37 +0000123
jadmanski0afbb632008-06-06 21:10:57 +0000124 if options.test:
125 global _autoserv_path
126 _autoserv_path = 'autoserv_dummy'
127 global _testing_mode
128 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000129
mbligh37eceaa2008-12-15 22:56:37 +0000130 # AUTOTEST_WEB.base_url is still a supported config option as some people
131 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000132 global _base_url
showard170873e2009-01-07 00:22:26 +0000133 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
134 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000135 if config_base_url:
136 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000137 else:
mbligh37eceaa2008-12-15 22:56:37 +0000138 # For the common case of everything running on a single server you
139 # can just set the hostname in a single place in the config file.
140 server_name = c.get_config_value('SERVER', 'hostname')
141 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000142 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000143 sys.exit(1)
144 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000145
showardc5afc462009-01-13 00:09:39 +0000146 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000147 server.start()
148
jadmanski0afbb632008-06-06 21:10:57 +0000149 try:
showard136e6dc2009-06-10 19:38:49 +0000150 init()
showardc5afc462009-01-13 00:09:39 +0000151 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000152 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000153
jadmanski0afbb632008-06-06 21:10:57 +0000154 while not _shutdown:
155 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000156 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000157 except:
showard170873e2009-01-07 00:22:26 +0000158 email_manager.manager.log_stacktrace(
159 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000160
showard170873e2009-01-07 00:22:26 +0000161 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000162 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000163 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000164 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
showard136e6dc2009-06-10 19:38:49 +0000167def setup_logging():
168 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
169 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
170 logging_manager.configure_logging(
171 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
172 logfile_name=log_name)
173
174
mbligh36768f02008-02-22 18:28:33 +0000175def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000176 global _shutdown
177 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000178 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000179
180
showard136e6dc2009-06-10 19:38:49 +0000181def init():
showardb18134f2009-03-20 20:52:18 +0000182 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
183 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000184
mblighfb676032009-04-01 18:25:38 +0000185 utils.write_pid("monitor_db")
186
showardb1e51872008-10-07 11:08:18 +0000187 if _testing_mode:
188 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000189 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000190
jadmanski0afbb632008-06-06 21:10:57 +0000191 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
192 global _db
showard170873e2009-01-07 00:22:26 +0000193 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
jadmanski0afbb632008-06-06 21:10:57 +0000194 _db.connect()
mbligh36768f02008-02-22 18:28:33 +0000195
showardfa8629c2008-11-04 16:51:23 +0000196 # ensure Django connection is in autocommit
197 setup_django_environment.enable_autocommit()
showard844960a2009-05-29 18:41:18 +0000198 # bypass the readonly connection
199 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
showardfa8629c2008-11-04 16:51:23 +0000200
showardb18134f2009-03-20 20:52:18 +0000201 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000202 signal.signal(signal.SIGINT, handle_sigint)
203
showardd1ee1dd2009-01-07 21:33:08 +0000204 drones = global_config.global_config.get_config_value(
205 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
206 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000207 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000208 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000209 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
210
showardb18134f2009-03-20 20:52:18 +0000211 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
showard87ba02a2009-04-20 19:37:32 +0000214def _autoserv_command_line(machines, results_dir, extra_args, job=None,
215 queue_entry=None):
showardf1ae3542009-05-11 19:26:02 +0000216 """
217 @returns The autoserv command line as a list of executable + parameters.
218
219 @param machines - string - A machine or comma separated list of machines
220 for the (-m) flag.
221 @param results_dir - string - Where the results will be written (-r).
222 @param extra_args - list - Additional arguments to pass to autoserv.
223 @param job - Job object - If supplied, -u owner and -l name parameters
224 will be added.
225 @param queue_entry - A HostQueueEntry object - If supplied and no Job
226 object was supplied, this will be used to lookup the Job object.
227 """
showard87ba02a2009-04-20 19:37:32 +0000228 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
229 '-r', _drone_manager.absolute_path(results_dir)]
230 if job or queue_entry:
231 if not job:
232 job = queue_entry.job
233 autoserv_argv += ['-u', job.owner, '-l', job.name]
234 return autoserv_argv + extra_args
235
236
showard89f84db2009-03-12 20:39:13 +0000237class SchedulerError(Exception):
238 """Raised by HostScheduler when an inconsistent state occurs."""
239
240
showard63a34772008-08-18 19:32:50 +0000241class HostScheduler(object):
242 def _get_ready_hosts(self):
243 # avoid any host with a currently active queue entry against it
244 hosts = Host.fetch(
245 joins='LEFT JOIN host_queue_entries AS active_hqe '
246 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000247 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000248 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000249 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000250 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
251 return dict((host.id, host) for host in hosts)
252
253
254 @staticmethod
255 def _get_sql_id_list(id_list):
256 return ','.join(str(item_id) for item_id in id_list)
257
258
259 @classmethod
showard989f25d2008-10-01 11:38:11 +0000260 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000261 if not id_list:
262 return {}
showard63a34772008-08-18 19:32:50 +0000263 query %= cls._get_sql_id_list(id_list)
264 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000265 return cls._process_many2many_dict(rows, flip)
266
267
268 @staticmethod
269 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000270 result = {}
271 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000272 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000273 if flip:
274 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000275 result.setdefault(left_id, set()).add(right_id)
276 return result
277
278
279 @classmethod
280 def _get_job_acl_groups(cls, job_ids):
281 query = """
showardd9ac4452009-02-07 02:04:37 +0000282 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000283 FROM jobs
284 INNER JOIN users ON users.login = jobs.owner
285 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
286 WHERE jobs.id IN (%s)
287 """
288 return cls._get_many2many_dict(query, job_ids)
289
290
291 @classmethod
292 def _get_job_ineligible_hosts(cls, job_ids):
293 query = """
294 SELECT job_id, host_id
295 FROM ineligible_host_queues
296 WHERE job_id IN (%s)
297 """
298 return cls._get_many2many_dict(query, job_ids)
299
300
301 @classmethod
showard989f25d2008-10-01 11:38:11 +0000302 def _get_job_dependencies(cls, job_ids):
303 query = """
304 SELECT job_id, label_id
305 FROM jobs_dependency_labels
306 WHERE job_id IN (%s)
307 """
308 return cls._get_many2many_dict(query, job_ids)
309
310
311 @classmethod
showard63a34772008-08-18 19:32:50 +0000312 def _get_host_acls(cls, host_ids):
313 query = """
showardd9ac4452009-02-07 02:04:37 +0000314 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000315 FROM acl_groups_hosts
316 WHERE host_id IN (%s)
317 """
318 return cls._get_many2many_dict(query, host_ids)
319
320
321 @classmethod
322 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000323 if not host_ids:
324 return {}, {}
showard63a34772008-08-18 19:32:50 +0000325 query = """
326 SELECT label_id, host_id
327 FROM hosts_labels
328 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000329 """ % cls._get_sql_id_list(host_ids)
330 rows = _db.execute(query)
331 labels_to_hosts = cls._process_many2many_dict(rows)
332 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
333 return labels_to_hosts, hosts_to_labels
334
335
336 @classmethod
337 def _get_labels(cls):
338 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000339
340
341 def refresh(self, pending_queue_entries):
342 self._hosts_available = self._get_ready_hosts()
343
344 relevant_jobs = [queue_entry.job_id
345 for queue_entry in pending_queue_entries]
346 self._job_acls = self._get_job_acl_groups(relevant_jobs)
347 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000348 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000349
350 host_ids = self._hosts_available.keys()
351 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000352 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
353
354 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000355
356
357 def _is_acl_accessible(self, host_id, queue_entry):
358 job_acls = self._job_acls.get(queue_entry.job_id, set())
359 host_acls = self._host_acls.get(host_id, set())
360 return len(host_acls.intersection(job_acls)) > 0
361
362
showard989f25d2008-10-01 11:38:11 +0000363 def _check_job_dependencies(self, job_dependencies, host_labels):
364 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000365 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000366
367
368 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
369 queue_entry):
showardade14e22009-01-26 22:38:32 +0000370 if not queue_entry.meta_host:
371 # bypass only_if_needed labels when a specific host is selected
372 return True
373
showard989f25d2008-10-01 11:38:11 +0000374 for label_id in host_labels:
375 label = self._labels[label_id]
376 if not label.only_if_needed:
377 # we don't care about non-only_if_needed labels
378 continue
379 if queue_entry.meta_host == label_id:
380 # if the label was requested in a metahost it's OK
381 continue
382 if label_id not in job_dependencies:
383 return False
384 return True
385
386
showard89f84db2009-03-12 20:39:13 +0000387 def _check_atomic_group_labels(self, host_labels, queue_entry):
388 """
389 Determine if the given HostQueueEntry's atomic group settings are okay
390 to schedule on a host with the given labels.
391
392 @param host_labels - A list of label ids that the host has.
393 @param queue_entry - The HostQueueEntry being considered for the host.
394
395 @returns True if atomic group settings are okay, False otherwise.
396 """
397 return (self._get_host_atomic_group_id(host_labels) ==
398 queue_entry.atomic_group_id)
399
400
401 def _get_host_atomic_group_id(self, host_labels):
402 """
403 Return the atomic group label id for a host with the given set of
404 labels if any, or None otherwise. Raises an exception if more than
405 one atomic group are found in the set of labels.
406
407 @param host_labels - A list of label ids that the host has.
408
409 @returns The id of the atomic group found on a label in host_labels
410 or None if no atomic group label is found.
411 @raises SchedulerError - If more than one atomic group label is found.
412 """
413 atomic_ids = [self._labels[label_id].atomic_group_id
414 for label_id in host_labels
415 if self._labels[label_id].atomic_group_id is not None]
416 if not atomic_ids:
417 return None
418 if len(atomic_ids) > 1:
419 raise SchedulerError('More than one atomic label on host.')
420 return atomic_ids[0]
421
422
423 def _get_atomic_group_labels(self, atomic_group_id):
424 """
425 Lookup the label ids that an atomic_group is associated with.
426
427 @param atomic_group_id - The id of the AtomicGroup to look up.
428
429 @returns A generator yeilding Label ids for this atomic group.
430 """
431 return (id for id, label in self._labels.iteritems()
432 if label.atomic_group_id == atomic_group_id
433 and not label.invalid)
434
435
showard54c1ea92009-05-20 00:32:58 +0000436 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000437 """
438 @param group_hosts - A sequence of Host ids to test for usability
439 and eligibility against the Job associated with queue_entry.
440 @param queue_entry - The HostQueueEntry that these hosts are being
441 tested for eligibility against.
442
443 @returns A subset of group_hosts Host ids that are eligible for the
444 supplied queue_entry.
445 """
446 return set(host_id for host_id in group_hosts
447 if self._is_host_usable(host_id)
448 and self._is_host_eligible_for_job(host_id, queue_entry))
449
450
showard989f25d2008-10-01 11:38:11 +0000451 def _is_host_eligible_for_job(self, host_id, queue_entry):
452 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
453 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000454
showard89f84db2009-03-12 20:39:13 +0000455 return (self._is_acl_accessible(host_id, queue_entry) and
456 self._check_job_dependencies(job_dependencies, host_labels) and
457 self._check_only_if_needed_labels(
458 job_dependencies, host_labels, queue_entry) and
459 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000460
461
showard63a34772008-08-18 19:32:50 +0000462 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000463 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000464 return None
465 return self._hosts_available.pop(queue_entry.host_id, None)
466
467
468 def _is_host_usable(self, host_id):
469 if host_id not in self._hosts_available:
470 # host was already used during this scheduling cycle
471 return False
472 if self._hosts_available[host_id].invalid:
473 # Invalid hosts cannot be used for metahosts. They're included in
474 # the original query because they can be used by non-metahosts.
475 return False
476 return True
477
478
479 def _schedule_metahost(self, queue_entry):
480 label_id = queue_entry.meta_host
481 hosts_in_label = self._label_hosts.get(label_id, set())
482 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
483 set())
484
485 # must iterate over a copy so we can mutate the original while iterating
486 for host_id in list(hosts_in_label):
487 if not self._is_host_usable(host_id):
488 hosts_in_label.remove(host_id)
489 continue
490 if host_id in ineligible_host_ids:
491 continue
showard989f25d2008-10-01 11:38:11 +0000492 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000493 continue
494
showard89f84db2009-03-12 20:39:13 +0000495 # Remove the host from our cached internal state before returning
496 # the host object.
showard63a34772008-08-18 19:32:50 +0000497 hosts_in_label.remove(host_id)
498 return self._hosts_available.pop(host_id)
499 return None
500
501
502 def find_eligible_host(self, queue_entry):
503 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000504 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000505 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000506 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000507 return self._schedule_metahost(queue_entry)
508
509
showard89f84db2009-03-12 20:39:13 +0000510 def find_eligible_atomic_group(self, queue_entry):
511 """
512 Given an atomic group host queue entry, locate an appropriate group
513 of hosts for the associated job to run on.
514
515 The caller is responsible for creating new HQEs for the additional
516 hosts returned in order to run the actual job on them.
517
518 @returns A list of Host instances in a ready state to satisfy this
519 atomic group scheduling. Hosts will all belong to the same
520 atomic group label as specified by the queue_entry.
521 An empty list will be returned if no suitable atomic
522 group could be found.
523
524 TODO(gps): what is responsible for kicking off any attempted repairs on
525 a group of hosts? not this function, but something needs to. We do
526 not communicate that reason for returning [] outside of here...
527 For now, we'll just be unschedulable if enough hosts within one group
528 enter Repair Failed state.
529 """
530 assert queue_entry.atomic_group_id is not None
531 job = queue_entry.job
532 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000533 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000534 if job.synch_count > atomic_group.max_number_of_machines:
535 # Such a Job and HostQueueEntry should never be possible to
536 # create using the frontend. Regardless, we can't process it.
537 # Abort it immediately and log an error on the scheduler.
538 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000539 logging.error(
540 'Error: job %d synch_count=%d > requested atomic_group %d '
541 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
542 job.id, job.synch_count, atomic_group.id,
543 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000544 return []
545 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
546 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
547 set())
548
549 # Look in each label associated with atomic_group until we find one with
550 # enough hosts to satisfy the job.
551 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
552 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
553 if queue_entry.meta_host is not None:
554 # If we have a metahost label, only allow its hosts.
555 group_hosts.intersection_update(hosts_in_label)
556 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000557 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000558 group_hosts, queue_entry)
559
560 # Job.synch_count is treated as "minimum synch count" when
561 # scheduling for an atomic group of hosts. The atomic group
562 # number of machines is the maximum to pick out of a single
563 # atomic group label for scheduling at one time.
564 min_hosts = job.synch_count
565 max_hosts = atomic_group.max_number_of_machines
566
showard54c1ea92009-05-20 00:32:58 +0000567 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000568 # Not enough eligible hosts in this atomic group label.
569 continue
570
showard54c1ea92009-05-20 00:32:58 +0000571 eligible_hosts_in_group = [self._hosts_available[id]
572 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000573 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000574 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000575
showard89f84db2009-03-12 20:39:13 +0000576 # Limit ourselves to scheduling the atomic group size.
577 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000578 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000579
580 # Remove the selected hosts from our cached internal state
581 # of available hosts in order to return the Host objects.
582 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000583 for host in eligible_hosts_in_group:
584 hosts_in_label.discard(host.id)
585 self._hosts_available.pop(host.id)
586 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000587 return host_list
588
589 return []
590
591
showard170873e2009-01-07 00:22:26 +0000592class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000593 def __init__(self):
594 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000595 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000596 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000597 user_cleanup_time = scheduler_config.config.clean_interval
598 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
599 _db, user_cleanup_time)
600 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000601 self._host_agents = {}
602 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000603
mbligh36768f02008-02-22 18:28:33 +0000604
showard915958d2009-04-22 21:00:58 +0000605 def initialize(self, recover_hosts=True):
606 self._periodic_cleanup.initialize()
607 self._24hr_upkeep.initialize()
608
jadmanski0afbb632008-06-06 21:10:57 +0000609 # always recover processes
610 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000611
jadmanski0afbb632008-06-06 21:10:57 +0000612 if recover_hosts:
613 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000614
615
jadmanski0afbb632008-06-06 21:10:57 +0000616 def tick(self):
showard170873e2009-01-07 00:22:26 +0000617 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000618 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000619 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000620 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000621 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000622 self._schedule_new_jobs()
623 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000624 _drone_manager.execute_actions()
625 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000626
showard97aed502008-11-04 02:01:24 +0000627
mblighf3294cc2009-04-08 21:17:38 +0000628 def _run_cleanup(self):
629 self._periodic_cleanup.run_cleanup_maybe()
630 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000631
mbligh36768f02008-02-22 18:28:33 +0000632
showard170873e2009-01-07 00:22:26 +0000633 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
634 for object_id in object_ids:
635 agent_dict.setdefault(object_id, set()).add(agent)
636
637
638 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
639 for object_id in object_ids:
640 assert object_id in agent_dict
641 agent_dict[object_id].remove(agent)
642
643
jadmanski0afbb632008-06-06 21:10:57 +0000644 def add_agent(self, agent):
645 self._agents.append(agent)
646 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000647 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
648 self._register_agent_for_ids(self._queue_entry_agents,
649 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000650
showard170873e2009-01-07 00:22:26 +0000651
652 def get_agents_for_entry(self, queue_entry):
653 """
654 Find agents corresponding to the specified queue_entry.
655 """
showardd3dc1992009-04-22 21:01:40 +0000656 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000657
658
    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        # an id may map to an empty set once its agents finish, so bool()
        # of the set (not mere key presence) is the correct check
        return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000664
665
jadmanski0afbb632008-06-06 21:10:57 +0000666 def remove_agent(self, agent):
667 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000668 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
669 agent)
670 self._unregister_agent_for_ids(self._queue_entry_agents,
671 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000672
673
jadmanski0afbb632008-06-06 21:10:57 +0000674 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000675 self._register_pidfiles()
676 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000677 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000678 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000679 self._reverify_remaining_hosts()
680 # reinitialize drones after killing orphaned processes, since they can
681 # leave around files when they die
682 _drone_manager.execute_actions()
683 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000684
showard170873e2009-01-07 00:22:26 +0000685
686 def _register_pidfiles(self):
687 # during recovery we may need to read pidfiles for both running and
688 # parsing entries
689 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000690 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000691 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000692 for pidfile_name in _ALL_PIDFILE_NAMES:
693 pidfile_id = _drone_manager.get_pidfile_id_from(
694 queue_entry.execution_tag(), pidfile_name=pidfile_name)
695 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000696
697
showardd3dc1992009-04-22 21:01:40 +0000698 def _recover_entries_with_status(self, status, orphans, pidfile_name,
699 recover_entries_fn):
700 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000701 for queue_entry in queue_entries:
702 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000703 # synchronous job we've already recovered
704 continue
showardd3dc1992009-04-22 21:01:40 +0000705 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000706 execution_tag = queue_entry.execution_tag()
707 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000708 run_monitor.attach_to_existing_process(execution_tag,
709 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000710
711 log_message = ('Recovering %s entry %s ' %
712 (status.lower(),
713 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000714 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000715 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000716 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000717 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000718 continue
mbligh90a549d2008-03-25 23:52:34 +0000719
showard597bfd32009-05-08 18:22:50 +0000720 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000721 run_monitor.get_process())
722 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
723 orphans.discard(run_monitor.get_process())
724
725
726 def _kill_remaining_orphan_processes(self, orphans):
727 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000728 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000729 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000730
showard170873e2009-01-07 00:22:26 +0000731
showardd3dc1992009-04-22 21:01:40 +0000732 def _recover_running_entries(self, orphans):
733 def recover_entries(job, queue_entries, run_monitor):
734 if run_monitor is not None:
735 queue_task = RecoveryQueueTask(job=job,
736 queue_entries=queue_entries,
737 run_monitor=run_monitor)
738 self.add_agent(Agent(tasks=[queue_task],
739 num_processes=len(queue_entries)))
740 # else, _requeue_other_active_entries will cover this
741
742 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
743 orphans, '.autoserv_execute',
744 recover_entries)
745
746
747 def _recover_gathering_entries(self, orphans):
748 def recover_entries(job, queue_entries, run_monitor):
749 gather_task = GatherLogsTask(job, queue_entries,
750 run_monitor=run_monitor)
751 self.add_agent(Agent([gather_task]))
752
753 self._recover_entries_with_status(
754 models.HostQueueEntry.Status.GATHERING,
755 orphans, _CRASHINFO_PID_FILE, recover_entries)
756
757
758 def _recover_parsing_entries(self, orphans):
759 def recover_entries(job, queue_entries, run_monitor):
760 reparse_task = FinalReparseTask(queue_entries,
761 run_monitor=run_monitor)
762 self.add_agent(Agent([reparse_task], num_processes=0))
763
764 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
765 orphans, _PARSER_PID_FILE,
766 recover_entries)
767
768
    def _recover_all_recoverable_entries(self):
        """
        Recover Running, Gathering and Parsing entries; each step discards
        the processes it claims from the orphan set, and whatever remains
        is killed.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000775
showard97aed502008-11-04 02:01:24 +0000776
showard170873e2009-01-07 00:22:26 +0000777 def _requeue_other_active_entries(self):
778 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000779 where='active AND NOT complete AND '
780 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000781 for queue_entry in queue_entries:
782 if self.get_agents_for_entry(queue_entry):
783 # entry has already been recovered
784 continue
showardd3dc1992009-04-22 21:01:40 +0000785 if queue_entry.aborted:
786 queue_entry.abort(self)
787 continue
788
789 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000790 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000791 if queue_entry.host:
792 tasks = queue_entry.host.reverify_tasks()
793 self.add_agent(Agent(tasks))
794 agent = queue_entry.requeue()
795
796
showard1ff7b2e2009-05-15 23:17:18 +0000797 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000798 tasks = models.SpecialTask.objects.filter(
799 task=models.SpecialTask.Task.REVERIFY, is_active=False,
800 is_complete=False)
801
802 host_ids = [str(task.host.id) for task in tasks]
803
804 if host_ids:
805 where = 'id IN (%s)' % ','.join(host_ids)
806 host_ids_reverifying = self._reverify_hosts_where(
807 where, cleanup=False)
808 tasks = tasks.filter(host__id__in=host_ids_reverifying)
809 for task in tasks:
810 task.is_active=True
811 task.save()
showard1ff7b2e2009-05-15 23:17:18 +0000812
813
showard170873e2009-01-07 00:22:26 +0000814 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000815 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000816 self._reverify_hosts_where("""(status = 'Repairing' OR
817 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000818 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000819
showard170873e2009-01-07 00:22:26 +0000820 # recover "Running" hosts with no active queue entries, although this
821 # should never happen
822 message = ('Recovering running host %s - this probably indicates a '
823 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000824 self._reverify_hosts_where("""status = 'Running' AND
825 id NOT IN (SELECT host_id
826 FROM host_queue_entries
827 WHERE active)""",
828 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000829
830
jadmanski0afbb632008-06-06 21:10:57 +0000831 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000832 print_message='Reverifying host %s',
833 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000834 full_where='locked = 0 AND invalid = 0 AND ' + where
showard6d7b2ff2009-06-10 00:16:47 +0000835 host_ids_reverifying = []
showard170873e2009-01-07 00:22:26 +0000836 for host in Host.fetch(where=full_where):
837 if self.host_has_agent(host):
838 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000839 continue
showard170873e2009-01-07 00:22:26 +0000840 if print_message:
showardb18134f2009-03-20 20:52:18 +0000841 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000842 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000843 self.add_agent(Agent(tasks))
showard6d7b2ff2009-06-10 00:16:47 +0000844 host_ids_reverifying.append(host.id)
845 return host_ids_reverifying
mbligh36768f02008-02-22 18:28:33 +0000846
847
jadmanski0afbb632008-06-06 21:10:57 +0000848 def _recover_hosts(self):
849 # recover "Repair Failed" hosts
850 message = 'Reverifying dead host %s'
851 self._reverify_hosts_where("status = 'Repair Failed'",
852 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000853
854
showard04c82c52008-05-29 19:38:12 +0000855
showardb95b1bd2008-08-15 18:11:04 +0000856 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000857 # prioritize by job priority, then non-metahost over metahost, then FIFO
858 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000859 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000860 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000861 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000862
863
showard89f84db2009-03-12 20:39:13 +0000864 def _refresh_pending_queue_entries(self):
865 """
866 Lookup the pending HostQueueEntries and call our HostScheduler
867 refresh() method given that list. Return the list.
868
869 @returns A list of pending HostQueueEntries sorted in priority order.
870 """
showard63a34772008-08-18 19:32:50 +0000871 queue_entries = self._get_pending_queue_entries()
872 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000873 return []
showardb95b1bd2008-08-15 18:11:04 +0000874
showard63a34772008-08-18 19:32:50 +0000875 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000876
showard89f84db2009-03-12 20:39:13 +0000877 return queue_entries
878
879
880 def _schedule_atomic_group(self, queue_entry):
881 """
882 Schedule the given queue_entry on an atomic group of hosts.
883
884 Returns immediately if there are insufficient available hosts.
885
886 Creates new HostQueueEntries based off of queue_entry for the
887 scheduled hosts and starts them all running.
888 """
889 # This is a virtual host queue entry representing an entire
890 # atomic group, find a group and schedule their hosts.
891 group_hosts = self._host_scheduler.find_eligible_atomic_group(
892 queue_entry)
893 if not group_hosts:
894 return
895 # The first assigned host uses the original HostQueueEntry
896 group_queue_entries = [queue_entry]
897 for assigned_host in group_hosts[1:]:
898 # Create a new HQE for every additional assigned_host.
899 new_hqe = HostQueueEntry.clone(queue_entry)
900 new_hqe.save()
901 group_queue_entries.append(new_hqe)
902 assert len(group_queue_entries) == len(group_hosts)
903 for queue_entry, host in itertools.izip(group_queue_entries,
904 group_hosts):
905 self._run_queue_entry(queue_entry, host)
906
907
908 def _schedule_new_jobs(self):
909 queue_entries = self._refresh_pending_queue_entries()
910 if not queue_entries:
911 return
912
showard63a34772008-08-18 19:32:50 +0000913 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000914 if (queue_entry.atomic_group_id is None or
915 queue_entry.host_id is not None):
916 assigned_host = self._host_scheduler.find_eligible_host(
917 queue_entry)
918 if assigned_host:
919 self._run_queue_entry(queue_entry, assigned_host)
920 else:
921 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000922
923
924 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +0000925 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
926 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000927
928
jadmanski0afbb632008-06-06 21:10:57 +0000929 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000930 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
931 for agent in self.get_agents_for_entry(entry):
932 agent.abort()
933 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000934
935
showard324bf812009-01-20 23:23:38 +0000936 def _can_start_agent(self, agent, num_started_this_cycle,
937 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000938 # always allow zero-process agents to run
939 if agent.num_processes == 0:
940 return True
941 # don't allow any nonzero-process agents to run after we've reached a
942 # limit (this avoids starvation of many-process agents)
943 if have_reached_limit:
944 return False
945 # total process throttling
showard324bf812009-01-20 23:23:38 +0000946 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000947 return False
948 # if a single agent exceeds the per-cycle throttling, still allow it to
949 # run when it's the first agent in the cycle
950 if num_started_this_cycle == 0:
951 return True
952 # per-cycle throttling
953 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000954 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000955 return False
956 return True
957
958
jadmanski0afbb632008-06-06 21:10:57 +0000959 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000960 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000961 have_reached_limit = False
962 # iterate over copy, so we can remove agents during iteration
963 for agent in list(self._agents):
964 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000965 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000966 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000967 continue
968 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000969 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000970 have_reached_limit):
971 have_reached_limit = True
972 continue
showard4c5374f2008-09-04 17:02:56 +0000973 num_started_this_cycle += agent.num_processes
974 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000975 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000976 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000977
978
showard29f7cd22009-04-29 21:16:24 +0000979 def _process_recurring_runs(self):
980 recurring_runs = models.RecurringRun.objects.filter(
981 start_date__lte=datetime.datetime.now())
982 for rrun in recurring_runs:
983 # Create job from template
984 job = rrun.job
985 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000986 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000987
988 host_objects = info['hosts']
989 one_time_hosts = info['one_time_hosts']
990 metahost_objects = info['meta_hosts']
991 dependencies = info['dependencies']
992 atomic_group = info['atomic_group']
993
994 for host in one_time_hosts or []:
995 this_host = models.Host.create_one_time_host(host.hostname)
996 host_objects.append(this_host)
997
998 try:
999 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001000 options=options,
showard29f7cd22009-04-29 21:16:24 +00001001 host_objects=host_objects,
1002 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001003 atomic_group=atomic_group)
1004
1005 except Exception, ex:
1006 logging.exception(ex)
1007 #TODO send email
1008
1009 if rrun.loop_count == 1:
1010 rrun.delete()
1011 else:
1012 if rrun.loop_count != 0: # if not infinite loop
1013 # calculate new start_date
1014 difference = datetime.timedelta(seconds=rrun.loop_period)
1015 rrun.start_date = rrun.start_date + difference
1016 rrun.loop_count -= 1
1017 rrun.save()
1018
1019
showard170873e2009-01-07 00:22:26 +00001020class PidfileRunMonitor(object):
1021 """
1022 Client must call either run() to start a new process or
1023 attach_to_existing_process().
1024 """
mbligh36768f02008-02-22 18:28:33 +00001025
showard170873e2009-01-07 00:22:26 +00001026 class _PidfileException(Exception):
1027 """
1028 Raised when there's some unexpected behavior with the pid file, but only
1029 used internally (never allowed to escape this class).
1030 """
mbligh36768f02008-02-22 18:28:33 +00001031
1032
showard170873e2009-01-07 00:22:26 +00001033 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001034 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001035 self._start_time = None
1036 self.pidfile_id = None
1037 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001038
1039
showard170873e2009-01-07 00:22:26 +00001040 def _add_nice_command(self, command, nice_level):
1041 if not nice_level:
1042 return command
1043 return ['nice', '-n', str(nice_level)] + command
1044
1045
1046 def _set_start_time(self):
1047 self._start_time = time.time()
1048
1049
1050 def run(self, command, working_directory, nice_level=None, log_file=None,
1051 pidfile_name=None, paired_with_pidfile=None):
1052 assert command is not None
1053 if nice_level is not None:
1054 command = ['nice', '-n', str(nice_level)] + command
1055 self._set_start_time()
1056 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001057 command, working_directory, pidfile_name=pidfile_name,
1058 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001059
1060
showardd3dc1992009-04-22 21:01:40 +00001061 def attach_to_existing_process(self, execution_tag,
1062 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001063 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001064 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1065 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001066 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001067
1068
jadmanski0afbb632008-06-06 21:10:57 +00001069 def kill(self):
showard170873e2009-01-07 00:22:26 +00001070 if self.has_process():
1071 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001072
mbligh36768f02008-02-22 18:28:33 +00001073
showard170873e2009-01-07 00:22:26 +00001074 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001075 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001076 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001077
1078
showard170873e2009-01-07 00:22:26 +00001079 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001080 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001081 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001082 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001083
1084
showard170873e2009-01-07 00:22:26 +00001085 def _read_pidfile(self, use_second_read=False):
1086 assert self.pidfile_id is not None, (
1087 'You must call run() or attach_to_existing_process()')
1088 contents = _drone_manager.get_pidfile_contents(
1089 self.pidfile_id, use_second_read=use_second_read)
1090 if contents.is_invalid():
1091 self._state = drone_manager.PidfileContents()
1092 raise self._PidfileException(contents)
1093 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001094
1095
showard21baa452008-10-21 00:08:39 +00001096 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001097 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1098 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001099 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001100 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001101 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001102
1103
1104 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001105 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001106 return
mblighbb421852008-03-11 22:36:16 +00001107
showard21baa452008-10-21 00:08:39 +00001108 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001109
showard170873e2009-01-07 00:22:26 +00001110 if self._state.process is None:
1111 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001112 return
mbligh90a549d2008-03-25 23:52:34 +00001113
showard21baa452008-10-21 00:08:39 +00001114 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001115 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001116 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001117 return
mbligh90a549d2008-03-25 23:52:34 +00001118
showard170873e2009-01-07 00:22:26 +00001119 # pid but no running process - maybe process *just* exited
1120 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001121 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001122 # autoserv exited without writing an exit code
1123 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001124 self._handle_pidfile_error(
1125 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001126
showard21baa452008-10-21 00:08:39 +00001127
1128 def _get_pidfile_info(self):
1129 """\
1130 After completion, self._state will contain:
1131 pid=None, exit_status=None if autoserv has not yet run
1132 pid!=None, exit_status=None if autoserv is running
1133 pid!=None, exit_status!=None if autoserv has completed
1134 """
1135 try:
1136 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001137 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001138 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001139
1140
showard170873e2009-01-07 00:22:26 +00001141 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001142 """\
1143 Called when no pidfile is found or no pid is in the pidfile.
1144 """
showard170873e2009-01-07 00:22:26 +00001145 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001146 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001147 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1148 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001149 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001150 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001151
1152
showard35162b02009-03-03 02:17:30 +00001153 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001154 """\
1155 Called when autoserv has exited without writing an exit status,
1156 or we've timed out waiting for autoserv to write a pid to the
1157 pidfile. In either case, we just return failure and the caller
1158 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001159
showard170873e2009-01-07 00:22:26 +00001160 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001161 """
1162 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001163 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001164 self._state.exit_status = 1
1165 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001166
1167
jadmanski0afbb632008-06-06 21:10:57 +00001168 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001169 self._get_pidfile_info()
1170 return self._state.exit_status
1171
1172
1173 def num_tests_failed(self):
1174 self._get_pidfile_info()
1175 assert self._state.num_tests_failed is not None
1176 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001177
1178
mbligh36768f02008-02-22 18:28:33 +00001179class Agent(object):
showard77182562009-06-10 00:16:05 +00001180 """
1181 An agent for use by the Dispatcher class to perform a sequence of tasks.
1182
1183 The following methods are required on all task objects:
1184 poll() - Called periodically to let the task check its status and
1185 update its internal state. If the task succeeded.
1186 is_done() - Returns True if the task is finished.
1187 abort() - Called when an abort has been requested. The task must
1188 set its aborted attribute to True if it actually aborted.
1189
1190 The following attributes are required on all task objects:
1191 aborted - bool, True if this task was aborted.
1192 failure_tasks - A sequence of tasks to be run using a new Agent
1193 by the dispatcher should this task fail.
1194 success - bool, True if this task succeeded.
1195 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1196 host_ids - A sequence of Host ids this task represents.
1197
1198 The following attribute is written to all task objects:
1199 agent - A reference to the Agent instance that the task has been
1200 added to.
1201 """
1202
1203
showard170873e2009-01-07 00:22:26 +00001204 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001205 """
1206 @param tasks: A list of tasks as described in the class docstring.
1207 @param num_processes: The number of subprocesses the Agent represents.
1208 This is used by the Dispatcher for managing the load on the
1209 system. Defaults to 1.
1210 """
jadmanski0afbb632008-06-06 21:10:57 +00001211 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001212 self.queue = None
showard77182562009-06-10 00:16:05 +00001213 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001214 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001215 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001216
showard170873e2009-01-07 00:22:26 +00001217 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1218 for task in tasks)
1219 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1220
showardd3dc1992009-04-22 21:01:40 +00001221 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001222 for task in tasks:
1223 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001224
1225
showardd3dc1992009-04-22 21:01:40 +00001226 def _clear_queue(self):
1227 self.queue = Queue.Queue(0)
1228
1229
showard170873e2009-01-07 00:22:26 +00001230 def _union_ids(self, id_lists):
1231 return set(itertools.chain(*id_lists))
1232
1233
jadmanski0afbb632008-06-06 21:10:57 +00001234 def add_task(self, task):
1235 self.queue.put_nowait(task)
1236 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001237
1238
jadmanski0afbb632008-06-06 21:10:57 +00001239 def tick(self):
showard21baa452008-10-21 00:08:39 +00001240 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001241 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001242 self.active_task.poll()
1243 if not self.active_task.is_done():
1244 return
1245 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001246
1247
jadmanski0afbb632008-06-06 21:10:57 +00001248 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001249 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001250 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001251 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001252 if not self.active_task.success:
1253 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001254 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001255
jadmanski0afbb632008-06-06 21:10:57 +00001256 if not self.is_done():
1257 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001258
1259
jadmanski0afbb632008-06-06 21:10:57 +00001260 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001261 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001262 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1263 # get reset.
1264 new_agent = Agent(self.active_task.failure_tasks)
1265 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001266
mblighe2586682008-02-29 22:45:46 +00001267
showard4c5374f2008-09-04 17:02:56 +00001268 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001269 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001270
1271
jadmanski0afbb632008-06-06 21:10:57 +00001272 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001273 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001274
1275
showardd3dc1992009-04-22 21:01:40 +00001276 def abort(self):
showard08a36412009-05-05 01:01:13 +00001277 # abort tasks until the queue is empty or a task ignores the abort
1278 while not self.is_done():
1279 if not self.active_task:
1280 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001281 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001282 if not self.active_task.aborted:
1283 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001284 return
1285 self.active_task = None
1286
showardd3dc1992009-04-22 21:01:40 +00001287
showard77182562009-06-10 00:16:05 +00001288class DelayedCallTask(object):
1289 """
1290 A task object like AgentTask for an Agent to run that waits for the
1291 specified amount of time to have elapsed before calling the supplied
1292 callback once and finishing. If the callback returns anything, it is
1293 assumed to be a new Agent instance and will be added to the dispatcher.
1294
1295 @attribute end_time: The absolute posix time after which this task will
1296 call its callback when it is polled and be finished.
1297
1298 Also has all attributes required by the Agent class.
1299 """
1300 def __init__(self, delay_seconds, callback, now_func=None):
1301 """
1302 @param delay_seconds: The delay in seconds from now that this task
1303 will call the supplied callback and be done.
1304 @param callback: A callable to be called by this task once after at
1305 least delay_seconds time has elapsed. It must return None
1306 or a new Agent instance.
1307 @param now_func: A time.time like function. Default: time.time.
1308 Used for testing.
1309 """
1310 assert delay_seconds > 0
1311 assert callable(callback)
1312 if not now_func:
1313 now_func = time.time
1314 self._now_func = now_func
1315 self._callback = callback
1316
1317 self.end_time = self._now_func() + delay_seconds
1318
1319 # These attributes are required by Agent.
1320 self.aborted = False
1321 self.failure_tasks = ()
1322 self.host_ids = ()
1323 self.success = False
1324 self.queue_entry_ids = ()
1325 # This is filled in by Agent.add_task().
1326 self.agent = None
1327
1328
1329 def poll(self):
1330 if self._callback and self._now_func() >= self.end_time:
1331 new_agent = self._callback()
1332 if new_agent:
1333 self.agent.dispatcher.add_agent(new_agent)
1334 self._callback = None
1335 self.success = True
1336
1337
1338 def is_done(self):
1339 return not self._callback
1340
1341
1342 def abort(self):
1343 self.aborted = True
1344 self._callback = None
1345
1346
mbligh36768f02008-02-22 18:28:33 +00001347class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001348 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1349 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001350 self.done = False
1351 self.failure_tasks = failure_tasks
1352 self.started = False
1353 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001354 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001355 self.task = None
1356 self.agent = None
1357 self.monitor = None
1358 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001359 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001360 self.queue_entry_ids = []
1361 self.host_ids = []
1362 self.log_file = None
1363
1364
1365 def _set_ids(self, host=None, queue_entries=None):
1366 if queue_entries and queue_entries != [None]:
1367 self.host_ids = [entry.host.id for entry in queue_entries]
1368 self.queue_entry_ids = [entry.id for entry in queue_entries]
1369 else:
1370 assert host
1371 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001372
1373
jadmanski0afbb632008-06-06 21:10:57 +00001374 def poll(self):
showard08a36412009-05-05 01:01:13 +00001375 if not self.started:
1376 self.start()
1377 self.tick()
1378
1379
1380 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001381 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001382 exit_code = self.monitor.exit_code()
1383 if exit_code is None:
1384 return
1385 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001386 else:
1387 success = False
mbligh36768f02008-02-22 18:28:33 +00001388
jadmanski0afbb632008-06-06 21:10:57 +00001389 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001390
1391
jadmanski0afbb632008-06-06 21:10:57 +00001392 def is_done(self):
1393 return self.done
mbligh36768f02008-02-22 18:28:33 +00001394
1395
jadmanski0afbb632008-06-06 21:10:57 +00001396 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001397 if self.done:
1398 return
jadmanski0afbb632008-06-06 21:10:57 +00001399 self.done = True
1400 self.success = success
1401 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001402
1403
jadmanski0afbb632008-06-06 21:10:57 +00001404 def prolog(self):
1405 pass
mblighd64e5702008-04-04 21:39:28 +00001406
1407
jadmanski0afbb632008-06-06 21:10:57 +00001408 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001409 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001410
mbligh36768f02008-02-22 18:28:33 +00001411
jadmanski0afbb632008-06-06 21:10:57 +00001412 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001413 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001414 _drone_manager.copy_to_results_repository(
1415 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001416
1417
jadmanski0afbb632008-06-06 21:10:57 +00001418 def epilog(self):
1419 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001420
1421
jadmanski0afbb632008-06-06 21:10:57 +00001422 def start(self):
1423 assert self.agent
1424
1425 if not self.started:
1426 self.prolog()
1427 self.run()
1428
1429 self.started = True
1430
1431
1432 def abort(self):
1433 if self.monitor:
1434 self.monitor.kill()
1435 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001436 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001437 self.cleanup()
1438
1439
showard170873e2009-01-07 00:22:26 +00001440 def set_host_log_file(self, base_name, host):
1441 filename = '%s.%s' % (time.time(), base_name)
1442 self.log_file = os.path.join('hosts', host.hostname, filename)
1443
1444
showardde634ee2009-01-30 01:44:24 +00001445 def _get_consistent_execution_tag(self, queue_entries):
1446 first_execution_tag = queue_entries[0].execution_tag()
1447 for queue_entry in queue_entries[1:]:
1448 assert queue_entry.execution_tag() == first_execution_tag, (
1449 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1450 queue_entry,
1451 first_execution_tag,
1452 queue_entries[0]))
1453 return first_execution_tag
1454
1455
showarda1e74b32009-05-12 17:32:04 +00001456 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001457 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001458 if use_monitor is None:
1459 assert self.monitor
1460 use_monitor = self.monitor
1461 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001462 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001463 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001464 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001465 results_path)
showardde634ee2009-01-30 01:44:24 +00001466
showarda1e74b32009-05-12 17:32:04 +00001467
1468 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001469 reparse_task = FinalReparseTask(queue_entries)
1470 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1471
1472
showarda1e74b32009-05-12 17:32:04 +00001473 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1474 self._copy_results(queue_entries, use_monitor)
1475 self._parse_results(queue_entries)
1476
1477
showardd3dc1992009-04-22 21:01:40 +00001478 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001479 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001480 self.monitor = PidfileRunMonitor()
1481 self.monitor.run(self.cmd, self._working_directory,
1482 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001483 log_file=self.log_file,
1484 pidfile_name=pidfile_name,
1485 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001486
1487
showardd9205182009-04-27 20:09:55 +00001488class TaskWithJobKeyvals(object):
1489 """AgentTask mixin providing functionality to help with job keyval files."""
1490 _KEYVAL_FILE = 'keyval'
1491 def _format_keyval(self, key, value):
1492 return '%s=%s' % (key, value)
1493
1494
1495 def _keyval_path(self):
1496 """Subclasses must override this"""
1497 raise NotImplemented
1498
1499
1500 def _write_keyval_after_job(self, field, value):
1501 assert self.monitor
1502 if not self.monitor.has_process():
1503 return
1504 _drone_manager.write_lines_to_file(
1505 self._keyval_path(), [self._format_keyval(field, value)],
1506 paired_with_process=self.monitor.get_process())
1507
1508
1509 def _job_queued_keyval(self, job):
1510 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1511
1512
1513 def _write_job_finished(self):
1514 self._write_keyval_after_job("job_finished", int(time.time()))
1515
1516
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' (repair) against a host.

    If a queue entry was supplied and the repair fails, that entry is
    marked failed and this repair's logs are copied into its results
    (metahost entries are left alone, since they get reassigned).
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair results go to a temporary directory; they are only copied
        # into the job's results if the repair fails (see _fail_queue_entry)
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry now; epilog() calls _fail_queue_entry() later if
        # the repair doesn't succeed
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # job keyvals (TaskWithJobKeyvals) live in the temp results dir
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark queue_entry_to_fail as failed and copy this repair's logs
        into that entry's results directory so the user can see why."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        # optionally run the parser over the failed-repair results
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        # close out any active special tasks recorded for this host
        tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
                                                  is_active=True)
        for task in tasks:
            task.is_complete = True
            task.save()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001594
1595
showard8fe93b52008-11-18 17:53:22 +00001596class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001597 def epilog(self):
1598 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001599 should_copy_results = (self.queue_entry and not self.success
1600 and not self.queue_entry.meta_host)
1601 if should_copy_results:
1602 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001603 destination = os.path.join(self.queue_entry.execution_tag(),
1604 os.path.basename(self.log_file))
1605 _drone_manager.copy_to_results_repository(
1606 self.monitor.get_process(), self.log_file,
1607 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001608
1609
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host; a failure queues a RepairTask."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
            self.host.hostname, self.temp_results_dir, ['-v'],
            queue_entry=queue_entry)
        super(VerifyTask, self).__init__(
            verify_cmd, self.temp_results_dir,
            failure_tasks=[RepairTask(self.host, queue_entry=queue_entry)])

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return

        # verify passed: close out active special tasks and free the host
        for special_task in models.SpecialTask.objects.filter(
                host__id=self.host.id, is_active=True):
            special_task.is_complete = True
            special_task.save()

        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001646
1647
showardd9205182009-04-27 20:09:55 +00001648class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001649 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001650 self.job = job
1651 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001652 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001653 super(QueueTask, self).__init__(cmd, self._execution_tag())
1654 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001655
1656
showard73ec0442009-02-07 02:05:20 +00001657 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001658 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001659
1660
1661 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1662 keyval_contents = '\n'.join(self._format_keyval(key, value)
1663 for key, value in keyval_dict.iteritems())
1664 # always end with a newline to allow additional keyvals to be written
1665 keyval_contents += '\n'
1666 _drone_manager.attach_file_to_execution(self._execution_tag(),
1667 keyval_contents,
1668 file_path=keyval_path)
1669
1670
1671 def _write_keyvals_before_job(self, keyval_dict):
1672 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1673
1674
showard170873e2009-01-07 00:22:26 +00001675 def _write_host_keyvals(self, host):
1676 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1677 host.hostname)
1678 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001679 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1680 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001681
1682
showard170873e2009-01-07 00:22:26 +00001683 def _execution_tag(self):
1684 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001685
1686
jadmanski0afbb632008-06-06 21:10:57 +00001687 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001688 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001689 keyval_dict = {queued_key: queued_time}
1690 if self.group_name:
1691 keyval_dict['host_group_name'] = self.group_name
1692 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001693 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001694 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001695 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001696 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001697 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001698 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001699 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001700 assert len(self.queue_entries) == 1
1701 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001702
1703
showard35162b02009-03-03 02:17:30 +00001704 def _write_lost_process_error_file(self):
1705 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1706 _drone_manager.write_lines_to_file(error_file_path,
1707 [_LOST_PROCESS_ERROR])
1708
1709
showardd3dc1992009-04-22 21:01:40 +00001710 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001711 if not self.monitor:
1712 return
1713
showardd9205182009-04-27 20:09:55 +00001714 self._write_job_finished()
1715
showardd3dc1992009-04-22 21:01:40 +00001716 # both of these conditionals can be true, iff the process ran, wrote a
1717 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001718 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001719 gather_task = GatherLogsTask(self.job, self.queue_entries)
1720 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001721
1722 if self.monitor.lost_process:
1723 self._write_lost_process_error_file()
1724 for queue_entry in self.queue_entries:
1725 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001726
1727
showardcbd74612008-11-19 21:42:02 +00001728 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001729 _drone_manager.write_lines_to_file(
1730 os.path.join(self._execution_tag(), 'status.log'),
1731 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001732 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001733
1734
jadmanskif7fa2cc2008-10-01 14:13:23 +00001735 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001736 if not self.monitor or not self.monitor.has_process():
1737 return
1738
jadmanskif7fa2cc2008-10-01 14:13:23 +00001739 # build up sets of all the aborted_by and aborted_on values
1740 aborted_by, aborted_on = set(), set()
1741 for queue_entry in self.queue_entries:
1742 if queue_entry.aborted_by:
1743 aborted_by.add(queue_entry.aborted_by)
1744 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1745 aborted_on.add(t)
1746
1747 # extract some actual, unique aborted by value and write it out
1748 assert len(aborted_by) <= 1
1749 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001750 aborted_by_value = aborted_by.pop()
1751 aborted_on_value = max(aborted_on)
1752 else:
1753 aborted_by_value = 'autotest_system'
1754 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001755
showarda0382352009-02-11 23:36:43 +00001756 self._write_keyval_after_job("aborted_by", aborted_by_value)
1757 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001758
showardcbd74612008-11-19 21:42:02 +00001759 aborted_on_string = str(datetime.datetime.fromtimestamp(
1760 aborted_on_value))
1761 self._write_status_comment('Job aborted by %s on %s' %
1762 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001763
1764
jadmanski0afbb632008-06-06 21:10:57 +00001765 def abort(self):
1766 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001767 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001768 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001769
1770
jadmanski0afbb632008-06-06 21:10:57 +00001771 def epilog(self):
1772 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001773 self._finish_task()
1774 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001775
1776
mblighbb421852008-03-11 22:36:16 +00001777class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001778 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001779 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001780 self.monitor = run_monitor
1781 self.started = True
1782 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001783
1784
jadmanski0afbb632008-06-06 21:10:57 +00001785 def run(self):
showard5add1c82009-05-26 19:27:46 +00001786 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001787
1788
jadmanski0afbb632008-06-06 21:10:57 +00001789 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001790 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001791
1792
showardd3dc1992009-04-22 21:01:40 +00001793class PostJobTask(AgentTask):
1794 def __init__(self, queue_entries, pidfile_name, logfile_name,
1795 run_monitor=None):
1796 """
1797 If run_monitor != None, we're recovering a running task.
1798 """
1799 self._queue_entries = queue_entries
1800 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001801
1802 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1803 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1804 self._autoserv_monitor = PidfileRunMonitor()
1805 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1806 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1807
1808 if _testing_mode:
1809 command = 'true'
1810 else:
1811 command = self._generate_command(self._results_dir)
1812
1813 super(PostJobTask, self).__init__(cmd=command,
1814 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001815 # this must happen *after* the super call
1816 self.monitor = run_monitor
1817 if run_monitor:
1818 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001819
1820 self.log_file = os.path.join(self._execution_tag, logfile_name)
1821 self._final_status = self._determine_final_status()
1822
1823
1824 def _generate_command(self, results_dir):
1825 raise NotImplementedError('Subclasses must override this')
1826
1827
1828 def _job_was_aborted(self):
1829 was_aborted = None
1830 for queue_entry in self._queue_entries:
1831 queue_entry.update_from_database()
1832 if was_aborted is None: # first queue entry
1833 was_aborted = bool(queue_entry.aborted)
1834 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1835 email_manager.manager.enqueue_notify_email(
1836 'Inconsistent abort state',
1837 'Queue entries have inconsistent abort state: ' +
1838 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1839 # don't crash here, just assume true
1840 return True
1841 return was_aborted
1842
1843
1844 def _determine_final_status(self):
1845 if self._job_was_aborted():
1846 return models.HostQueueEntry.Status.ABORTED
1847
1848 # we'll use a PidfileRunMonitor to read the autoserv exit status
1849 if self._autoserv_monitor.exit_code() == 0:
1850 return models.HostQueueEntry.Status.COMPLETED
1851 return models.HostQueueEntry.Status.FAILED
1852
1853
1854 def run(self):
showard5add1c82009-05-26 19:27:46 +00001855 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001856
showard5add1c82009-05-26 19:27:46 +00001857 # make sure we actually have results to work with.
1858 # this should never happen in normal operation.
1859 if not self._autoserv_monitor.has_process():
1860 email_manager.manager.enqueue_notify_email(
1861 'No results in post-job task',
1862 'No results in post-job task at %s' %
1863 self._autoserv_monitor.pidfile_id)
1864 self.finished(False)
1865 return
1866
1867 super(PostJobTask, self).run(
1868 pidfile_name=self._pidfile_name,
1869 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001870
1871
1872 def _set_all_statuses(self, status):
1873 for queue_entry in self._queue_entries:
1874 queue_entry.set_status(status)
1875
1876
1877 def abort(self):
1878 # override AgentTask.abort() to avoid killing the process and ending
1879 # the task. post-job tasks continue when the job is aborted.
1880 pass
1881
1882
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in --collect-crashinfo mode over all involved hosts
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Queue CleanupTasks (reboot) or mark hosts Ready, depending on the
        job's reboot_after policy and the final job status."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # always reboot after an aborted job
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # copy + parse results only if autoserv actually produced any
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # clean exit: no crashinfo to collect, finish immediately
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001950
1951
showard8fe93b52008-11-18 17:53:22 +00001952class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001953 def __init__(self, host=None, queue_entry=None):
1954 assert bool(host) ^ bool(queue_entry)
1955 if queue_entry:
1956 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001957 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001958 self.host = host
showard170873e2009-01-07 00:22:26 +00001959
1960 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001961 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1962 ['--cleanup'],
1963 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001964 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001965 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1966 failure_tasks=[repair_task])
1967
1968 self._set_ids(host=host, queue_entries=[queue_entry])
1969 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001970
mblighd5c95802008-03-05 00:33:46 +00001971
jadmanski0afbb632008-06-06 21:10:57 +00001972 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001973 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001974 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001975 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001976
mblighd5c95802008-03-05 00:33:46 +00001977
showard21baa452008-10-21 00:08:39 +00001978 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001979 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001980 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001981 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001982 self.host.update_field('dirty', 0)
1983
1984
showardd3dc1992009-04-22 21:01:40 +00001985class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001986 _num_running_parses = 0
1987
showardd3dc1992009-04-22 21:01:40 +00001988 def __init__(self, queue_entries, run_monitor=None):
1989 super(FinalReparseTask, self).__init__(queue_entries,
1990 pidfile_name=_PARSER_PID_FILE,
1991 logfile_name='.parse.log',
1992 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001993 # don't use _set_ids, since we don't want to set the host_ids
1994 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00001995 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00001996
showard97aed502008-11-04 02:01:24 +00001997
1998 @classmethod
1999 def _increment_running_parses(cls):
2000 cls._num_running_parses += 1
2001
2002
2003 @classmethod
2004 def _decrement_running_parses(cls):
2005 cls._num_running_parses -= 1
2006
2007
2008 @classmethod
2009 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002010 return (cls._num_running_parses <
2011 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002012
2013
2014 def prolog(self):
2015 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002016 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002017
2018
2019 def epilog(self):
2020 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002021 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002022
2023
showardd3dc1992009-04-22 21:01:40 +00002024 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002025 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002026 results_dir]
showard97aed502008-11-04 02:01:24 +00002027
2028
showard08a36412009-05-05 01:01:13 +00002029 def tick(self):
2030 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002031 # and we can, at which point we revert to default behavior
2032 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002033 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002034 else:
2035 self._try_starting_parse()
2036
2037
2038 def run(self):
2039 # override run() to not actually run unless we can
2040 self._try_starting_parse()
2041
2042
2043 def _try_starting_parse(self):
2044 if not self._can_run_new_parse():
2045 return
showard170873e2009-01-07 00:22:26 +00002046
showard97aed502008-11-04 02:01:24 +00002047 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002048 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002049
showard97aed502008-11-04 02:01:24 +00002050 self._increment_running_parses()
2051 self._parse_started = True
2052
2053
2054 def finished(self, success):
2055 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002056 if self._parse_started:
2057 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002058
2059
showardc9ae1782009-01-30 01:42:37 +00002060class SetEntryPendingTask(AgentTask):
2061 def __init__(self, queue_entry):
2062 super(SetEntryPendingTask, self).__init__(cmd='')
2063 self._queue_entry = queue_entry
2064 self._set_ids(queue_entries=[queue_entry])
2065
2066
2067 def run(self):
2068 agent = self._queue_entry.on_pending()
2069 if agent:
2070 self.agent.dispatcher.add_agent(agent)
2071 self.finished(True)
2072
2073
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    Specifically raised by DBObject._fetch_row_from_db when no row in the
    table matches the requested id.
    """
2076
2077
class DBObject(object):
    """A miniature object relational model for the database.

    Subclasses represent one table each; an instance represents one row.
    Instances with a known id are cached in a WeakValueDictionary so that
    constructing the same (type, id) twice yields the same object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Database id of the row to load (exclusive with row).
        @param row - Pre-fetched sequence of field values (exclusive with id).
        @param new_record - True if this instance represents a row that does
                not yet exist in the database; no SELECT is issued.
        @param always_query - If False and this (cached) instance was already
                initialized, skip the re-query entirely.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # A requery on a cached instance: log any fields that changed
            # under us so the discrepancy is visible in the scheduler log.
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT our row by id; raises DBError if it does not exist."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert the row has exactly one value per declared field."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updated through update_field(), so keep it out of
        # the valid set.
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read our row from the database and refresh all fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return COUNT(*) from `table` (default: our table) matching `where`."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set one field both in memory and in the database (no-op if equal)."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this instance if it is a new record and adopt the new id."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete our row from the database and invalidate this instance."""
        # Bug fix: this previously popped (type(self), id) -- using the id()
        # *builtin* rather than self.id -- so the cache entry registered in
        # __init__ was never removed and a later construction for the same
        # row id could return this stale, deleted instance.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return prefix+string, or string unchanged if it is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002276
mbligh36768f02008-02-22 18:28:33 +00002277
class IneligibleHostQueue(DBObject):
    """A (job, host) pair barred from scheduling; one ineligible_host_queues row."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002281
2282
class AtomicGroup(DBObject):
    """A group of hosts scheduled together; one atomic_groups table row."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002287
2288
class Label(DBObject):
    """A host label (platform or plain); one labels table row."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002293
2294
class Host(DBObject):
    """A test machine; one row of the hosts table."""

    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """
        Return this host's active, incomplete HostQueueEntry, or None.

        Asserts that at most one such entry exists for the host.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue this host's current queue entry, if it has one."""
        logging.info("%s yielding work", self.hostname)
        # Fetch the task once.  Calling current_task() twice (as this used
        # to) issued two identical DB queries and could race with a
        # concurrent status change between the check and the requeue.
        task = self.current_task()
        if task:
            task.requeue()


    def set_status(self, status):
        """Set and persist this host's status, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self, cleanup=True):
        """
        Build the task list to re-verify this host.

        @param cleanup - If True, prepend a CleanupTask before the
                VerifyTask.
        @returns A list of AgentTasks to run, in order.
        """
        tasks = [VerifyTask(host=self)]
        # just to make sure this host does not get taken away
        self.set_status('Verifying')
        if cleanup:
            tasks.insert(0, CleanupTask(host=self))
            self.set_status('Cleaning')
        return tasks


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros.  The previous
            # int(number_str.lstrip('0')) crashed with ValueError on an
            # all-zero suffix such as "host0" or "host000", because
            # lstrip('0') left an empty string.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2393
2394
class HostQueueEntry(DBObject):
    """One host's slot in a job; one row of the host_queue_entries table.

    Tracks the scheduling state machine for a single (job, host) pairing:
    status transitions, host assignment/release, abort handling, and the
    status-change email notifications.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry plus its associated Job, Host and AtomicGroup."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            # always_query=False: a cached AtomicGroup instance is good
            # enough here; its fields rarely change.
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend URL for this entry's job, used in notification emails.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign this entry to `host`, or release its host if host is None."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            # Block the host from being picked again for this job.
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the currently assigned Host, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log via the drones."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row barring host_id from this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        # Safe accessor for log/email text when no host is assigned yet.
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """Set the status and sync the derived active/complete flags.

        Statuses fall into three buckets: idle (Queued/Parsing), in-flight
        (Pending/Running/Verifying/Starting/Gathering) and terminal
        (Failed/Completed/Stopped/Aborted).  May also send notification
        emails depending on _notify_email_statuses.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job just finished, email a per-host status summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run_pre_job_tasks(self, assigned_host=None):
        """Bind a host (for meta-host/atomic entries) and start pre-job tasks.

        @param assigned_host - Host chosen by the scheduler; required when
                this entry is a meta-host or atomic-group entry.
        @returns The Agent returned by _do_run_pre_job_tasks().
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self._do_run_pre_job_tasks()


    def _do_run_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)

        # The pre job tasks always end with a SetEntryPendingTask which
        # will continue as appropriate through queue_entry.on_pending().
        return Agent(self.job.get_pre_job_tasks(queue_entry=self))


    def requeue(self):
        """Return this entry to the Queued state so it can be rescheduled."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            # Meta-host entries give their host back to the pool.
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None (lazy-loaded).
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None (lazy-loaded).
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        return self.job.run_if_ready(queue_entry=self)


    def abort(self, dispatcher):
        """Abort this entry, cleaning up its host as its current status needs.

        @param dispatcher - The Dispatcher, used to look for running agents
                and to schedule reverify work.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # Verify was interrupted; re-verify the host before reuse.
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return the '<job id>-<owner>/<subdir>' tag naming the results dir."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002684
2685
class Job(DBObject):
    """An autotest job; one row of the jobs table."""

    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002707
    def __init__(self, id=None, row=None, **kwargs):
        """Load the job by database id or from a pre-fetched row."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002711
mblighe2586682008-02-29 22:45:46 +00002712
jadmanski0afbb632008-06-06 21:10:57 +00002713 def is_server_job(self):
2714 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002715
2716
showard170873e2009-01-07 00:22:26 +00002717 def tag(self):
2718 return "%s-%s" % (self.id, self.owner)
2719
2720
jadmanski0afbb632008-06-06 21:10:57 +00002721 def get_host_queue_entries(self):
2722 rows = _db.execute("""
2723 SELECT * FROM host_queue_entries
2724 WHERE job_id= %s
2725 """, (self.id,))
2726 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002727
jadmanski0afbb632008-06-06 21:10:57 +00002728 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002729
jadmanski0afbb632008-06-06 21:10:57 +00002730 return entries
mbligh36768f02008-02-22 18:28:33 +00002731
2732
jadmanski0afbb632008-06-06 21:10:57 +00002733 def set_status(self, status, update_queues=False):
2734 self.update_field('status',status)
2735
2736 if update_queues:
2737 for queue_entry in self.get_host_queue_entries():
2738 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002739
2740
showard77182562009-06-10 00:16:05 +00002741 def _atomic_and_has_started(self):
2742 """
2743 @returns True if any of the HostQueueEntries associated with this job
2744 have entered the Status.STARTING state or beyond.
2745 """
2746 atomic_entries = models.HostQueueEntry.objects.filter(
2747 job=self.id, atomic_group__isnull=False)
2748 if atomic_entries.count() <= 0:
2749 return False
2750
2751 non_started_statuses = (models.HostQueueEntry.Status.QUEUED,
2752 models.HostQueueEntry.Status.VERIFYING,
2753 models.HostQueueEntry.Status.PENDING)
2754 started_entries = atomic_entries.exclude(
2755 status__in=non_started_statuses)
2756 return started_entries.count() > 0
2757
2758
2759 def _pending_count(self):
2760 """The number of HostQueueEntries for this job in the Pending state."""
2761 pending_entries = models.HostQueueEntry.objects.filter(
2762 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2763 return pending_entries.count()
2764
2765
jadmanski0afbb632008-06-06 21:10:57 +00002766 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002767 # NOTE: Atomic group jobs stop reporting ready after they have been
2768 # started to avoid launching multiple copies of one atomic job.
2769 # Only possible if synch_count is less than than half the number of
2770 # machines in the atomic group.
2771 return (self._pending_count() >= self.synch_count
2772 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002773
2774
jadmanski0afbb632008-06-06 21:10:57 +00002775 def num_machines(self, clause = None):
2776 sql = "job_id=%s" % self.id
2777 if clause:
2778 sql += " AND (%s)" % clause
2779 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002780
2781
    def num_queued(self):
        """Number of this job's queue entries that are not yet complete."""
        return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002784
2785
jadmanski0afbb632008-06-06 21:10:57 +00002786 def num_active(self):
2787 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002788
2789
jadmanski0afbb632008-06-06 21:10:57 +00002790 def num_complete(self):
2791 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002792
2793
jadmanski0afbb632008-06-06 21:10:57 +00002794 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002795 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002796
mbligh36768f02008-02-22 18:28:33 +00002797
showard6bb7c292009-01-30 01:44:51 +00002798 def _not_yet_run_entries(self, include_verifying=True):
2799 statuses = [models.HostQueueEntry.Status.QUEUED,
2800 models.HostQueueEntry.Status.PENDING]
2801 if include_verifying:
2802 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2803 return models.HostQueueEntry.objects.filter(job=self.id,
2804 status__in=statuses)
2805
2806
2807 def _stop_all_entries(self):
2808 entries_to_stop = self._not_yet_run_entries(
2809 include_verifying=False)
2810 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002811 assert not child_entry.complete, (
2812 '%s status=%s, active=%s, complete=%s' %
2813 (child_entry.id, child_entry.status, child_entry.active,
2814 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002815 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2816 child_entry.host.status = models.Host.Status.READY
2817 child_entry.host.save()
2818 child_entry.status = models.HostQueueEntry.Status.STOPPED
2819 child_entry.save()
2820
showard2bab8f42008-11-12 18:15:22 +00002821 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002822 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002823 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002824 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002825
2826
jadmanski0afbb632008-06-06 21:10:57 +00002827 def write_to_machines_file(self, queue_entry):
2828 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002829 file_path = os.path.join(self.tag(), '.machines')
2830 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002831
2832
showardf1ae3542009-05-11 19:26:02 +00002833 def _next_group_name(self, group_name=''):
2834 """@returns a directory name to use for the next host group results."""
2835 if group_name:
2836 # Sanitize for use as a pathname.
2837 group_name = group_name.replace(os.path.sep, '_')
2838 if group_name.startswith('.'):
2839 group_name = '_' + group_name[1:]
2840 # Add a separator between the group name and 'group%d'.
2841 group_name += '.'
2842 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002843 query = models.HostQueueEntry.objects.filter(
2844 job=self.id).values('execution_subdir').distinct()
2845 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002846 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2847 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002848 if ids:
2849 next_id = max(ids) + 1
2850 else:
2851 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002852 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002853
2854
showard170873e2009-01-07 00:22:26 +00002855 def _write_control_file(self, execution_tag):
2856 control_path = _drone_manager.attach_file_to_execution(
2857 execution_tag, self.control_file)
2858 return control_path
mbligh36768f02008-02-22 18:28:33 +00002859
showardb2e2c322008-10-14 17:33:55 +00002860
showard2bab8f42008-11-12 18:15:22 +00002861 def get_group_entries(self, queue_entry_from_group):
2862 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002863 return list(HostQueueEntry.fetch(
2864 where='job_id=%s AND execution_subdir=%s',
2865 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002866
2867
showardb2e2c322008-10-14 17:33:55 +00002868 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002869 assert queue_entries
2870 execution_tag = queue_entries[0].execution_tag()
2871 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002872 hostnames = ','.join([entry.get_host().hostname
2873 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002874
showard87ba02a2009-04-20 19:37:32 +00002875 params = _autoserv_command_line(
2876 hostnames, execution_tag,
2877 ['-P', execution_tag, '-n',
2878 _drone_manager.absolute_path(control_path)],
2879 job=self)
mbligh36768f02008-02-22 18:28:33 +00002880
jadmanski0afbb632008-06-06 21:10:57 +00002881 if not self.is_server_job():
2882 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002883
showardb2e2c322008-10-14 17:33:55 +00002884 return params
mblighe2586682008-02-29 22:45:46 +00002885
mbligh36768f02008-02-22 18:28:33 +00002886
showardc9ae1782009-01-30 01:42:37 +00002887 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002888 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002889 return True
showard0fc38302008-10-23 00:44:07 +00002890 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002891 return queue_entry.get_host().dirty
2892 return False
showard21baa452008-10-21 00:08:39 +00002893
showardc9ae1782009-01-30 01:42:37 +00002894
2895 def _should_run_verify(self, queue_entry):
2896 do_not_verify = (queue_entry.host.protection ==
2897 host_protections.Protection.DO_NOT_VERIFY)
2898 if do_not_verify:
2899 return False
2900 return self.run_verify
2901
2902
showard77182562009-06-10 00:16:05 +00002903 def get_pre_job_tasks(self, queue_entry):
2904 """
2905 Get a list of tasks to perform before the host_queue_entry
2906 may be used to run this Job (such as Cleanup & Verify).
2907
2908 @returns A list of tasks to be done to the given queue_entry before
2909 it should be considered be ready to run this job. The last
2910 task in the list calls HostQueueEntry.on_pending(), which
2911 continues the flow of the job.
2912 """
showard21baa452008-10-21 00:08:39 +00002913 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002914 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002915 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002916 if self._should_run_verify(queue_entry):
2917 tasks.append(VerifyTask(queue_entry=queue_entry))
2918 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002919 return tasks
2920
2921
showardf1ae3542009-05-11 19:26:02 +00002922 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002923 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002924 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002925 else:
showardf1ae3542009-05-11 19:26:02 +00002926 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002927 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002928 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002929 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002930
2931 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002932 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002933
2934
2935 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002936 """
2937 @returns A tuple containing a list of HostQueueEntry instances to be
2938 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002939 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002940 """
showard77182562009-06-10 00:16:05 +00002941 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00002942 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002943 if atomic_group:
2944 num_entries_wanted = atomic_group.max_number_of_machines
2945 else:
2946 num_entries_wanted = self.synch_count
2947 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002948
showardf1ae3542009-05-11 19:26:02 +00002949 if num_entries_wanted > 0:
2950 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002951 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002952 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002953 params=(self.id, include_queue_entry.id)))
2954
2955 # Sort the chosen hosts by hostname before slicing.
2956 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2957 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2958 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2959 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002960
showardf1ae3542009-05-11 19:26:02 +00002961 # Sanity check. We'll only ever be called if this can be met.
2962 assert len(chosen_entries) >= self.synch_count
2963
2964 if atomic_group:
2965 # Look at any meta_host and dependency labels and pick the first
2966 # one that also specifies this atomic group. Use that label name
2967 # as the group name if possible (it is more specific).
2968 group_name = atomic_group.name
2969 for label in include_queue_entry.get_labels():
2970 if label.atomic_group_id:
2971 assert label.atomic_group_id == atomic_group.id
2972 group_name = label.name
2973 break
2974 else:
2975 group_name = ''
2976
2977 self._assign_new_group(chosen_entries, group_name=group_name)
2978 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002979
2980
showard77182562009-06-10 00:16:05 +00002981 def run_if_ready(self, queue_entry):
2982 """
2983 @returns An Agent instance to ultimately run this job if enough hosts
2984 are ready for it to run.
2985 @returns None and potentially cleans up excess hosts if this Job
2986 is not ready to run.
2987 """
showardb2e2c322008-10-14 17:33:55 +00002988 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00002989 self.stop_if_necessary()
2990 return None
mbligh36768f02008-02-22 18:28:33 +00002991
showard77182562009-06-10 00:16:05 +00002992 if queue_entry.atomic_group:
2993 return self.run_with_ready_delay(queue_entry)
2994
2995 return self.run(queue_entry)
2996
2997
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = queue_entry.atomic_group.max_number_of_machines
        over_max_threshold = (self._pending_count() >= pending_threshold)
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough? Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled; the pending callback will fire
        # run_job_after_delay, so do not schedule a second one.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        # num_processes=0: the delay task consumes no scheduler process slot.
        return Agent([self._delay_ready_task], num_processes=0)
3033
3034
3035 def run(self, queue_entry):
3036 """
3037 @param queue_entry: The HostQueueEntry instance calling this method.
3038 @returns An Agent instance to run this job or None if we've already
3039 been run.
3040 """
3041 if queue_entry.atomic_group and self._atomic_and_has_started():
3042 logging.error('Job.run() called on running atomic Job %d '
3043 'with HQE %s.', self.id, queue_entry)
3044 return None
showardf1ae3542009-05-11 19:26:02 +00003045 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3046 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003047
3048
showardf1ae3542009-05-11 19:26:02 +00003049 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003050 for queue_entry in queue_entries:
3051 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003052 params = self._get_autoserv_params(queue_entries)
3053 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003054 cmd=params, group_name=group_name)
3055 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003056 if self._delay_ready_task:
3057 # Cancel any pending callback that would try to run again
3058 # as we are already running.
3059 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003060
showard170873e2009-01-07 00:22:26 +00003061 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003062
3063
mbligh36768f02008-02-22 18:28:33 +00003064if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003065 main()