blob: 0d844fb75b13d62df81b4bff23c1c4ad0c95d9d6 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
# Default results location; overwritten with the command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
# nice(2) level applied to autoserv child processes.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# pidfile names dropped into an execution directory by the successive
# stages of a job (run, crashinfo collection, results parsing)
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level singletons, populated during startup (see init()/main)
_db = None          # database_connection.DatabaseConnection, set in init()
_shutdown = False   # set True by handle_sigint() to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False   # True under --test: dummy autoserv, stresstest DB
_base_url = None        # AFE frontend URL, derived from global_config
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
def main():
    """Entry point: run the scheduler, logging any escaping exception."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # deliberate exits (e.g. sys.exit on bad config) propagate untouched
        raise
    except:
        # bare except on purpose: record absolutely anything that escapes
        # the scheduler before letting it terminate the process
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
def main_without_exception_handling():
    """Parse options/config, set up globals and run the dispatcher loop.

    usage: monitor_db [options] results_dir

    Side effects: mutates the module globals RESULTS_DIR, _testing_mode,
    _autoserv_path, _notify_email_statuses and _base_url, chdirs into the
    results directory, and blocks in the tick loop until _shutdown is set.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    # refuse to run unless explicitly enabled in the global config
    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        print msg
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # optional site-specific initialization hook; falls back to the no-op
    # _site_init_monitor_db_dummy when no site module is present
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # accept whitespace, comma, semicolon or colon as separators
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main loop: one tick per pause interval until SIGINT flips _shutdown
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # bare except on purpose: report the failure by email, then fall
        # through to the orderly shutdown below
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
def setup_logging():
    """Configure scheduler logging, honoring the AUTOTEST_SCHEDULER_LOG_DIR
    and AUTOTEST_SCHEDULER_LOG_NAME environment overrides (both optional)."""
    env = os.environ
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
        logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
173
174
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000179
180
def init():
    """Connect to the database and drones; install the SIGINT handler.

    Must run before the first Dispatcher tick.  Mutates the module global
    _db and initializes the shared _drone_manager.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point Django/scheduler at the stress-test database instead
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local deliberately(?) shadows the imported `drones`
    # module for the rest of this function — confirm before reusing the name.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None, verbose=True):
    """
    Build the autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Pass --verbose to autoserv when True.

    @returns The full argv list for invoking autoserv.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        # fall back to the queue entry's job when no job was given directly
        effective_job = job if job else queue_entry.job
        command.extend(['-u', effective_job.owner, '-l', effective_job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
237
238
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
242
class HostScheduler(object):
    """Assigns ready hosts to pending host queue entries.

    refresh() takes a snapshot of the scheduling state from the database
    (ready hosts, job ACLs, job dependency labels, label membership).  The
    find_eligible_* methods then consume that cached state, popping each
    host from the internal availability map as it is handed out, so a host
    is offered at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready/NULL-status hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render id_list as a comma-separated string for an SQL IN (...)."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (containing one %s placeholder for the id list) and
        build a many-to-many mapping from its two-column result rows."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Turn (left, right) rows into {left_id: set(right_ids)};
        flip=True reverses the direction of the mapping."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the ACL group ids its owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the host ids marked ineligible for it."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the ACL group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings built from a
        single hosts_labels query."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all scheduling state needed for this cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares an ACL group with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes ACL, dependency, only_if_needed and
        atomic-group checks for this queue entry."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # NOTE(review): returns None (falsy) rather than False when the host
        # is unknown; callers only use the result in boolean context.
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the specific host requested by queue_entry, or
        None if it is not currently eligible or available."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick, claim and return any eligible host carrying the entry's
        metahost label, or None when no host qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for this non-atomic-group entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
604
605
showard170873e2009-01-07 00:22:26 +0000606class Dispatcher(object):
    def __init__(self):
        # Agents currently being driven by _handle_agents().
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # reverse indexes: host id -> set of Agents using that host, and
        # queue entry id -> set of Agents working on that entry
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000617
mbligh36768f02008-02-22 18:28:33 +0000618
    def initialize(self, recover_hosts=True):
        """One-time startup: prime the cleanup tasks and recover any state
        left behind by a previous scheduler run."""
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000628
629
    def tick(self):
        """Run one pass of the main scheduler loop.  The call order below
        is significant: state is refreshed first, queued drone actions are
        flushed and emails sent last."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000640
showard97aed502008-11-04 02:01:24 +0000641
    def _run_cleanup(self):
        # each cleanup object decides internally whether it is due to run
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000645
mbligh36768f02008-02-22 18:28:33 +0000646
showard170873e2009-01-07 00:22:26 +0000647 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
648 for object_id in object_ids:
649 agent_dict.setdefault(object_id, set()).add(agent)
650
651
652 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
653 for object_id in object_ids:
654 assert object_id in agent_dict
655 agent_dict[object_id].remove(agent)
656
657
    def add_agent(self, agent):
        """Start tracking *agent*: append it to the run list and index it
        by each of its host ids and queue entry ids."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000664
showard170873e2009-01-07 00:22:26 +0000665
666 def get_agents_for_entry(self, queue_entry):
667 """
668 Find agents corresponding to the specified queue_entry.
669 """
showardd3dc1992009-04-22 21:01:40 +0000670 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000671
672
673 def host_has_agent(self, host):
674 """
675 Determine if there is currently an Agent present using this host.
676 """
677 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000678
679
jadmanski0afbb632008-06-06 21:10:57 +0000680 def remove_agent(self, agent):
681 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000682 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
683 agent)
684 self._unregister_agent_for_ids(self._queue_entry_agents,
685 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000686
687
    def _recover_processes(self):
        """
        Reconnect to processes that survived a scheduler restart.

        Runs once at startup, before normal scheduling: registers the
        pidfiles recovery may need, refreshes drone state, re-attaches
        agents to running/gathering/parsing entries, requeues any other
        still-active entries, and reverifies leftover hosts.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000698
showard170873e2009-01-07 00:22:26 +0000699
700 def _register_pidfiles(self):
701 # during recovery we may need to read pidfiles for both running and
702 # parsing entries
703 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000704 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000705 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000706 for pidfile_name in _ALL_PIDFILE_NAMES:
707 pidfile_id = _drone_manager.get_pidfile_id_from(
708 queue_entry.execution_tag(), pidfile_name=pidfile_name)
709 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000710
711
showardd3dc1992009-04-22 21:01:40 +0000712 def _recover_entries_with_status(self, status, orphans, pidfile_name,
713 recover_entries_fn):
714 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000715 for queue_entry in queue_entries:
716 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000717 # synchronous job we've already recovered
718 continue
showardd3dc1992009-04-22 21:01:40 +0000719 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000720 execution_tag = queue_entry.execution_tag()
721 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000722 run_monitor.attach_to_existing_process(execution_tag,
723 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000724
725 log_message = ('Recovering %s entry %s ' %
726 (status.lower(),
727 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000728 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000729 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000730 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000731 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000732 continue
mbligh90a549d2008-03-25 23:52:34 +0000733
showard597bfd32009-05-08 18:22:50 +0000734 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000735 run_monitor.get_process())
736 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
737 orphans.discard(run_monitor.get_process())
738
739
740 def _kill_remaining_orphan_processes(self, orphans):
741 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000742 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000743 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000744
showard170873e2009-01-07 00:22:26 +0000745
    def _recover_running_entries(self, orphans):
        # Re-attach a RecoveryQueueTask to each group of Running entries
        # whose autoserv process is still alive; entries without a process
        # are left for _requeue_other_active_entries.
        def recover_entries(job, queue_entries, run_monitor):
            if run_monitor is not None:
                queue_task = RecoveryQueueTask(job=job,
                                               queue_entries=queue_entries,
                                               run_monitor=run_monitor)
                self.add_agent(Agent(tasks=[queue_task],
                                     num_processes=len(queue_entries)))
            # else, _requeue_other_active_entries will cover this

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, '.autoserv_execute',
                                          recover_entries)
759
760
    def _recover_gathering_entries(self, orphans):
        # Re-attach a GatherLogsTask (log/crashinfo collection) to entries
        # that were in the Gathering stage; run_monitor may be None here.
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)
770
771
    def _recover_parsing_entries(self, orphans):
        # Re-attach a FinalReparseTask to entries that were being parsed.
        # num_processes=0: the parser is throttled separately and does not
        # count against the scheduler's process limits.
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
781
782
    def _recover_all_recoverable_entries(self):
        # Recover in pipeline order (running -> gathering -> parsing); each
        # stage discards the processes it reclaims from the orphan set, and
        # whatever remains afterwards is killed.
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000789
showard97aed502008-11-04 02:01:24 +0000790
showard170873e2009-01-07 00:22:26 +0000791 def _requeue_other_active_entries(self):
792 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000793 where='active AND NOT complete AND '
794 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000795 for queue_entry in queue_entries:
796 if self.get_agents_for_entry(queue_entry):
797 # entry has already been recovered
798 continue
showardd3dc1992009-04-22 21:01:40 +0000799 if queue_entry.aborted:
800 queue_entry.abort(self)
801 continue
802
803 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000804 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000805 if queue_entry.host:
806 tasks = queue_entry.host.reverify_tasks()
807 self.add_agent(Agent(tasks))
808 agent = queue_entry.requeue()
809
810
    def _find_reverify(self):
        """
        Find queued (inactive, incomplete) reverify SpecialTasks, start
        reverification of their hosts, and mark the tasks whose hosts
        actually began reverifying as active.
        """
        tasks = models.SpecialTask.objects.filter(
            task=models.SpecialTask.Task.REVERIFY, is_active=False,
            is_complete=False)

        host_ids = [str(task.host.id) for task in tasks]

        if host_ids:
            where = 'id IN (%s)' % ','.join(host_ids)
            # cleanup=False: an explicit reverify request should not also
            # trigger a cleanup pass
            host_ids_reverifying = self._reverify_hosts_where(
                where, cleanup=False)
            # hosts that already have an agent are skipped by
            # _reverify_hosts_where, so only flag the tasks that started
            tasks = tasks.filter(host__id__in=host_ids_reverifying)
            for task in tasks:
                task.is_active=True
                task.save()
showard1ff7b2e2009-05-15 23:17:18 +0000826
827
    def _reverify_remaining_hosts(self):
        """
        Reverify hosts caught mid-task by the restart, plus any host stuck
        in 'Running' with no active queue entry.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000843
844
jadmanski0afbb632008-06-06 21:10:57 +0000845 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000846 print_message='Reverifying host %s',
847 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000848 full_where='locked = 0 AND invalid = 0 AND ' + where
showard6d7b2ff2009-06-10 00:16:47 +0000849 host_ids_reverifying = []
showard170873e2009-01-07 00:22:26 +0000850 for host in Host.fetch(where=full_where):
851 if self.host_has_agent(host):
852 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000853 continue
showard170873e2009-01-07 00:22:26 +0000854 if print_message:
showardb18134f2009-03-20 20:52:18 +0000855 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000856 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000857 self.add_agent(Agent(tasks))
showard6d7b2ff2009-06-10 00:16:47 +0000858 host_ids_reverifying.append(host.id)
859 return host_ids_reverifying
mbligh36768f02008-02-22 18:28:33 +0000860
861
jadmanski0afbb632008-06-06 21:10:57 +0000862 def _recover_hosts(self):
863 # recover "Repair Failed" hosts
864 message = 'Reverifying dead host %s'
865 self._reverify_hosts_where("status = 'Repair Failed'",
866 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000867
868
showard04c82c52008-05-29 19:38:12 +0000869
    def _get_pending_queue_entries(self):
        """
        Fetch all queued, unassigned HostQueueEntries.

        @returns list of HostQueueEntry ordered by job priority (highest
                first), then meta_host, then job id (FIFO).
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000876
877
showard89f84db2009-03-12 20:39:13 +0000878 def _refresh_pending_queue_entries(self):
879 """
880 Lookup the pending HostQueueEntries and call our HostScheduler
881 refresh() method given that list. Return the list.
882
883 @returns A list of pending HostQueueEntries sorted in priority order.
884 """
showard63a34772008-08-18 19:32:50 +0000885 queue_entries = self._get_pending_queue_entries()
886 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000887 return []
showardb95b1bd2008-08-15 18:11:04 +0000888
showard63a34772008-08-18 19:32:50 +0000889 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000890
showard89f84db2009-03-12 20:39:13 +0000891 return queue_entries
892
893
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # start every entry/host pair in lockstep
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
924
925
926 def _schedule_new_jobs(self):
927 queue_entries = self._refresh_pending_queue_entries()
928 if not queue_entries:
929 return
930
showard63a34772008-08-18 19:32:50 +0000931 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000932 if (queue_entry.atomic_group_id is None or
933 queue_entry.host_id is not None):
934 assigned_host = self._host_scheduler.find_eligible_host(
935 queue_entry)
936 if assigned_host:
937 self._run_queue_entry(queue_entry, assigned_host)
938 else:
939 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000940
941
942 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +0000943 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
944 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000945
946
    def _find_aborting(self):
        # Abort any agents working on aborted-but-incomplete entries, then
        # let each entry perform its own abort cleanup.
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000952
953
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """
        Apply process-count throttling.

        @param agent: the Agent that wants to start.
        @param num_started_this_cycle: processes already started this cycle.
        @param have_reached_limit: True once any agent was refused this cycle.
        @returns True if the agent may start now.
        """
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
975
976
    def _handle_agents(self):
        """
        One scheduling pass over all agents: remove finished ones, start
        idle ones that pass throttling, and tick whatever is active.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000995
996
    def _process_recurring_runs(self):
        """
        Instantiate jobs for RecurringRun templates whose start date has
        passed, then reschedule or delete each recurrence.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is extracted but never passed to
            # create_new_job below -- confirm whether it is carried inside
            # 'options' or is being silently dropped.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # final iteration: remove the recurrence entirely
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1036
1037
class PidfileRunMonitor(object):
    """
    Tracks a process (autoserv or a paired helper) through the pidfile it
    writes, using the global drone manager for all pidfile access.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process())
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        # most recently read pidfile contents
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # NOTE(review): appears unused -- run() below inlines the same
        # wrapping; confirm there are no external callers before removing.
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # starts the clock used by _handle_no_process() for PIDFILE_TIMEOUT
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command via the drone manager and monitor its pidfile."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its existing pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one was ever recorded."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        # refreshes state from the pidfile before answering
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        # only valid once a process has been recorded in the pidfile
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """
        Refresh self._state from the pidfile.

        @raises _PidfileException if the pidfile contents are invalid.
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log + email, then treat the process as lost (failure exit status)
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # once the process is declared lost, its synthesized state is final
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            # the process never wrote its pidfile within the timeout; give up
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Exit status from the pidfile, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Failed-test count from the pidfile; must be present by now."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001195
1196
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a sequence of tasks.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        failure_tasks - A sequence of tasks to be run using a new Agent
                by the dispatcher should this task fail.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.

    The following attribute is written to all task objects:
        agent - A reference to the Agent instance that the task has been
                added to.
    """


    def __init__(self, tasks, num_processes=1):
        """
        @param tasks: A list of tasks as described in the class docstring.
        @param num_processes: The number of subprocesses the Agent represents.
                This is used by the Dispatcher for managing the load on the
                system.  Defaults to 1.
        """
        self.active_task = None
        self.queue = None
        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None
        self.num_processes = num_processes

        # union of ids across all tasks; used by the dispatcher to index
        # agents by host and by queue entry
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        # replace the queue wholesale, discarding any pending tasks
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        """Append task to this agent's queue and take ownership of it."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """
        Advance through tasks until one is still in progress (then return)
        or the queue is exhausted.
        """
        while not self.is_done():
            if self.active_task:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        # Retire the finished active task (spawning its failure tasks in a
        # new agent if it failed) and pull the next one off the queue.
        logging.info("agent picking task")
        if self.active_task is not None:
            assert self.active_task.is_done()
            if not self.active_task.success:
                self.on_task_failure()
        self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        """Drop remaining tasks and hand failure_tasks to a fresh Agent."""
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks until the queue is empty or a task ignores the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1304
showardd3dc1992009-04-22 21:01:40 +00001305
class DelayedCallTask(object):
    """
    An Agent-runnable task that simply waits.  Once the configured delay
    has elapsed, the next poll() invokes the callback exactly once and the
    task finishes.  A non-None return value from the callback is treated
    as a new Agent and handed to this task's dispatcher.

    @attribute end_time: Absolute posix time at (or after) which a poll()
            will fire the callback and complete the task.

    Also exposes every attribute the Agent class requires of its tasks.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: How long (in seconds, > 0) to wait before
                invoking the callback.
        @param callback: Zero-argument callable invoked once after the
                delay; must return None or a new Agent instance.
        @param now_func: Optional time.time-like function, for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects on every task.
        self.aborted = False
        self.failure_tasks = ()
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        # Set by Agent.add_task() when this task is enqueued.
        self.agent = None


    def poll(self):
        """Fire the callback once the delay has elapsed, then succeed."""
        if not self._callback or self._now_func() < self.end_time:
            return
        new_agent = self._callback()
        if new_agent:
            self.agent.dispatcher.add_agent(new_agent)
        self._callback = None
        self.success = True


    def is_done(self):
        """Done once the callback has fired or the task was aborted."""
        return not self._callback


    def abort(self):
        """Cancel the pending callback and mark the task aborted."""
        self.aborted = True
        self._callback = None
1363
1364
class AgentTask(object):
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command (argv list) this task runs; may be None for
                tasks that attach to an existing process.
        @param working_directory: directory the command executes in.
        @param failure_tasks: tasks run in a new Agent should this task
                fail.  Fixed: was a mutable default argument ([]), which
                is shared across every instance constructed with the
                default; each instance now gets its own fresh list.
        @param pidfile_name: name of the pidfile the process writes.
        @param paired_with_pidfile: pidfile of a process to pair with.
        """
        self.done = False
        # avoid the shared-mutable-default pitfall
        self.failure_tasks = failure_tasks if failure_tasks is not None else []
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None
1381
1382
1383 def _set_ids(self, host=None, queue_entries=None):
1384 if queue_entries and queue_entries != [None]:
1385 self.host_ids = [entry.host.id for entry in queue_entries]
1386 self.queue_entry_ids = [entry.id for entry in queue_entries]
1387 else:
1388 assert host
1389 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001390
1391
jadmanski0afbb632008-06-06 21:10:57 +00001392 def poll(self):
showard08a36412009-05-05 01:01:13 +00001393 if not self.started:
1394 self.start()
1395 self.tick()
1396
1397
1398 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001399 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001400 exit_code = self.monitor.exit_code()
1401 if exit_code is None:
1402 return
1403 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001404 else:
1405 success = False
mbligh36768f02008-02-22 18:28:33 +00001406
jadmanski0afbb632008-06-06 21:10:57 +00001407 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001408
1409
jadmanski0afbb632008-06-06 21:10:57 +00001410 def is_done(self):
1411 return self.done
mbligh36768f02008-02-22 18:28:33 +00001412
1413
jadmanski0afbb632008-06-06 21:10:57 +00001414 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001415 if self.done:
1416 return
jadmanski0afbb632008-06-06 21:10:57 +00001417 self.done = True
1418 self.success = success
1419 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001420
1421
jadmanski0afbb632008-06-06 21:10:57 +00001422 def prolog(self):
1423 pass
mblighd64e5702008-04-04 21:39:28 +00001424
1425
jadmanski0afbb632008-06-06 21:10:57 +00001426 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001427 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001428
mbligh36768f02008-02-22 18:28:33 +00001429
jadmanski0afbb632008-06-06 21:10:57 +00001430 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001431 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001432 _drone_manager.copy_to_results_repository(
1433 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001434
1435
jadmanski0afbb632008-06-06 21:10:57 +00001436 def epilog(self):
1437 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001438
1439
jadmanski0afbb632008-06-06 21:10:57 +00001440 def start(self):
1441 assert self.agent
1442
1443 if not self.started:
1444 self.prolog()
1445 self.run()
1446
1447 self.started = True
1448
1449
1450 def abort(self):
1451 if self.monitor:
1452 self.monitor.kill()
1453 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001454 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001455 self.cleanup()
1456
1457
showard170873e2009-01-07 00:22:26 +00001458 def set_host_log_file(self, base_name, host):
1459 filename = '%s.%s' % (time.time(), base_name)
1460 self.log_file = os.path.join('hosts', host.hostname, filename)
1461
1462
showardde634ee2009-01-30 01:44:24 +00001463 def _get_consistent_execution_tag(self, queue_entries):
1464 first_execution_tag = queue_entries[0].execution_tag()
1465 for queue_entry in queue_entries[1:]:
1466 assert queue_entry.execution_tag() == first_execution_tag, (
1467 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1468 queue_entry,
1469 first_execution_tag,
1470 queue_entries[0]))
1471 return first_execution_tag
1472
1473
showarda1e74b32009-05-12 17:32:04 +00001474 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001475 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001476 if use_monitor is None:
1477 assert self.monitor
1478 use_monitor = self.monitor
1479 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001480 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001481 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001482 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001483 results_path)
showardde634ee2009-01-30 01:44:24 +00001484
showarda1e74b32009-05-12 17:32:04 +00001485
1486 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001487 reparse_task = FinalReparseTask(queue_entries)
1488 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1489
1490
showarda1e74b32009-05-12 17:32:04 +00001491 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1492 self._copy_results(queue_entries, use_monitor)
1493 self._parse_results(queue_entries)
1494
1495
showardd3dc1992009-04-22 21:01:40 +00001496 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001497 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001498 self.monitor = PidfileRunMonitor()
1499 self.monitor.run(self.cmd, self._working_directory,
1500 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001501 log_file=self.log_file,
1502 pidfile_name=pidfile_name,
1503 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001504
1505
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one 'key=value' keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file.  Subclasses must override."""
        # was 'raise NotImplemented': NotImplemented is a singleton, not an
        # exception, so raising it produced a confusing TypeError instead of
        # the intended NotImplementedError.
        raise NotImplementedError('Subclasses must override _keyval_path()')


    def _write_keyval_after_job(self, field, value):
        """Append a keyval line to the job's keyval file, paired with the
        job's process so the drone manager writes it in the right place."""
        assert self.monitor
        if not self.monitor.has_process():
            return  # nothing ran, so there is nowhere to write keyvals
        _drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the ('job_queued', <epoch seconds>) keyval for a job."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time as a keyval."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1533
1534
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host.

    If a queue entry is supplied it is requeued when repair starts; if the
    repair then fails, the entry itself is failed (unless it is a metahost
    entry, which simply gets reassigned to another host).
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host Repairing and return its queue entry to the pool."""
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals go into this task's temporary results directory
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail the associated queue entry after an unsuccessful repair.

        Writes job keyvals, copies the repair logs into the entry's normal
        results location, optionally parses them, and records the host
        failure on the entry.
        """
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        """Close out active SpecialTasks and set the host's final status."""
        super(RepairTask, self).epilog()

        # mark any active special tasks against this host as complete
        tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
                                                  is_active=True)
        for task in tasks:
            task.is_complete = True
            task.save()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001612
1613
class PreJobTask(AgentTask):
    """Base for tasks that run against a host before its job (verify/cleanup).

    When such a task fails for a concrete (non-metahost) queue entry, its
    log is copied into the job's results so the user can see why the host
    was rejected.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        entry = self.queue_entry
        # Metahost entries are simply rescheduled elsewhere, and successes
        # leave nothing to explain -- only concrete failures keep results.
        if not entry or self.success or entry.meta_host:
            return
        entry.set_execution_subdir()
        log_basename = os.path.basename(self.log_file)
        destination = os.path.join(entry.execution_tag(), log_basename)
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001626
1627
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to check that a host is usable before a job.

    Constructed from either a queue entry (verify-before-job) or a bare
    host (standalone verify) -- exactly one of the two.  On failure the
    attached RepairTask runs via failure_tasks.
    """
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        # a failed verify hands the host over to repair
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark the host (and queue entry, if any) as Verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """On success, close out active SpecialTasks and mark host Ready."""
        super(VerifyTask, self).epilog()

        if self.success:
            # mark any active special tasks against this host as complete
            tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
                                                      is_active=True)
            for task in tasks:
                task.is_complete=True
                task.save()

            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001664
1665
class CleanupHostsMixin(object):
    """Mixin that decides whether a job's hosts get rebooted (cleaned) or
    simply released when the job is over.  Mixed into QueueTask and
    GatherLogsTask.
    """
    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        """Schedule CleanupTasks or mark hosts Ready, per job.reboot_after.

        @param job: the job the entries belong to (supplies reboot_after).
        @param queue_entries: entries whose hosts are being released.
        @param final_success: whether the job completed successfully.
        @param num_tests_failed: failed-test count from the autoserv run.
        """
        reboot_after = job.reboot_after
        # _final_status is only defined on PostJobTask subclasses (e.g.
        # GatherLogsTask).  QueueTask mixes this in without defining it, so
        # reading self._final_status directly raised AttributeError on
        # QueueTask's no-process path; getattr keeps that path working.
        final_status = getattr(self, '_final_status', None)
        do_reboot = (
                # always reboot after aborted jobs
                final_status == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')
1685
1686
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """Runs the main autoserv process for a job's group of queue entries.

    Writes job and host keyval files before the run, marks entries and
    hosts Running, and on completion hands off to a GatherLogsTask (or
    releases the hosts directly if no process ever materialized).
    """
    def __init__(self, job, queue_entries, cmd, group_name=''):
        """
        @param job: the Job being executed.
        @param queue_entries: HostQueueEntries grouped into this execution;
                they must all share one execution tag.
        @param cmd: autoserv command list to run.
        @param group_name: optional host-group name, recorded as a keyval.
        """
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job keyvals live at the top of the execution directory
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach a keyval file to the execution before the job starts."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write a host's platform/labels keyvals under host_keyvals/."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries share one tag (enforced elsewhere), so the first works
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Write pre-job keyvals and move entries/hosts into Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        """Leave a marker file explaining that the autoserv process vanished."""
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Common completion path for both epilog() and abort()."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
        else:
            # NOTE(review): CleanupHostsMixin._reboot_hosts reads
            # self._final_status, which QueueTask does not define (only
            # PostJobTask does) -- confirm this code path.
            self._reboot_hosts(self.job, self.queue_entries,
                               final_success=False, num_tests_failed=0)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        """Append an INFO line to the job's status.log."""
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_tag(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, as keyvals and a status line."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001817
1818
class RecoveryQueueTask(QueueTask):
    """QueueTask variant for re-attaching to an autoserv process that was
    already running when the scheduler (re)started.

    The run monitor is supplied up front and `started` is forced True, so
    the normal prolog()/run() startup path must never execute.
    """
    def __init__(self, job, queue_entries, run_monitor):
        """
        @param run_monitor: monitor already attached to the existing
                autoserv process.
        """
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.monitor = run_monitor
        self.started = True
        # since we set started=True here, prolog() and run() shouldn't be called


    def run(self):
        # was 'raise NotImplemented(...)': NotImplemented is a non-callable
        # singleton, so that line raised a bare TypeError instead of the
        # intended error with its message.
        raise NotImplementedError('This should never be called')


    def prolog(self):
        raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001833
1834
class PostJobTask(AgentTask):
    """Base for tasks that run after the main autoserv job process
    (crashinfo gathering, final parsing).

    Attaches a PidfileRunMonitor to the job's autoserv process to recover
    its exit state; if run_monitor is supplied, we are recovering a
    post-job process that was already running.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        @param queue_entries: the job's entries; must all share one
                execution tag.
        @param pidfile_name: pidfile used by this task's own process.
        @param logfile_name: log file name within the execution directory.
        @param run_monitor: if not None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # monitor for the *autoserv* process this task follows, distinct
        # from self.monitor (this task's own process)
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)
        # this must happen *after* the super call, which resets self.monitor
        self.monitor = run_monitor
        if run_monitor:
            self.started = True

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command list for this task's process."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if the job's entries were aborted.

        If entries disagree, send a warning email listing them all and
        assume aborted.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # describe every entry; the previous code joined the
                # *characters* of a single formatted string with ', '
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map the job's outcome to a HostQueueEntry status value."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        assert not self.monitor

        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        """Set every queue entry of the job to the given status."""
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1923
1924
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        """
        @param run_monitor: if not None, we're recovering a running
                collect_crashinfo process (see PostJobTask).
        """
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the 'autoserv --collect-crashinfo' command for all hosts."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        """Move all of the job's queue entries into the Gathering state."""
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def epilog(self):
        """Copy/parse autoserv results (if any ran) and release the hosts."""
        super(GatherLogsTask, self).epilog()
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)

        final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
        num_tests_failed = self._autoserv_monitor.num_tests_failed()
        self._reboot_hosts(self._job, self._queue_entries, final_success,
                           num_tests_failed)


    def run(self):
        """Only collect crashinfo if autoserv appears to have died abnormally."""
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001974
1975
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host, falling back to repair on failure.

    Constructed from either a bare host or a queue entry -- exactly one of
    the two must be supplied.
    """
    def __init__(self, host=None, queue_entry=None):
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        # a failed cleanup hands the host over to repair
        fallback_repair = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[fallback_repair])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        """Flag the host as Cleaning before the process starts."""
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        """On success the host is clean: mark it Ready and clear its dirty bit."""
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
2007
2008
showardd3dc1992009-04-22 21:01:40 +00002009class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002010 _num_running_parses = 0
2011
showardd3dc1992009-04-22 21:01:40 +00002012 def __init__(self, queue_entries, run_monitor=None):
2013 super(FinalReparseTask, self).__init__(queue_entries,
2014 pidfile_name=_PARSER_PID_FILE,
2015 logfile_name='.parse.log',
2016 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00002017 # don't use _set_ids, since we don't want to set the host_ids
2018 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00002019 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00002020
showard97aed502008-11-04 02:01:24 +00002021
2022 @classmethod
2023 def _increment_running_parses(cls):
2024 cls._num_running_parses += 1
2025
2026
2027 @classmethod
2028 def _decrement_running_parses(cls):
2029 cls._num_running_parses -= 1
2030
2031
2032 @classmethod
2033 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002034 return (cls._num_running_parses <
2035 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002036
2037
2038 def prolog(self):
2039 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002040 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002041
2042
2043 def epilog(self):
2044 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002045 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002046
2047
showardd3dc1992009-04-22 21:01:40 +00002048 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002049 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002050 results_dir]
showard97aed502008-11-04 02:01:24 +00002051
2052
    def tick(self):
        """Poll this task; retry starting the parse while throttled."""
        # override tick to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).tick()
        else:
            self._try_starting_parse()
2060
2061
    def run(self):
        # override run() to not actually run unless we can
        # (the global parse-process limit may force us to wait; see tick()).
        self._try_starting_parse()
2065
2066
2067 def _try_starting_parse(self):
2068 if not self._can_run_new_parse():
2069 return
showard170873e2009-01-07 00:22:26 +00002070
showard97aed502008-11-04 02:01:24 +00002071 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002072 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002073
showard97aed502008-11-04 02:01:24 +00002074 self._increment_running_parses()
2075 self._parse_started = True
2076
2077
    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # Only release a parse slot if we actually consumed one in
        # _try_starting_parse().
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002082
2083
class SetEntryPendingTask(AgentTask):
    """Transitions a queue entry to Pending and queues any follow-up agent."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        # on_pending() may hand back a new agent (e.g. to start the job);
        # if so, give it to the dispatcher that owns our own agent.
        new_agent = self._queue_entry.on_pending()
        if new_agent:
            self.agent.dispatcher.add_agent(new_agent)
        self.finished(True)
2096
2097
class DBError(Exception):
    """Raised when a DBObject SELECT finds no matching row (see __init__)."""
2100
2101
class DBObject(object):
    """
    A miniature object relational model for the database.

    A subclass wraps one table (_table_name); an instance wraps one row.
    Each column named in _fields becomes an attribute.  Instances are cached
    per (class, id) in a WeakValueDictionary so repeated lookups of the same
    row share a single object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Database id of the row to load.  Mutually exclusive with
                row.
        @param row - A pre-fetched row (sequence of values matching _fields)
                to initialize from without hitting the database.
        @param new_record - True if this represents a row not yet in the
                database; save() will INSERT it.
        @param always_query - When False, a cached already-initialized
                instance is returned as-is without re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # We are a cached instance being re-initialized; warn if the
            # database row has drifted from our in-memory state.
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the database row with the given id; raise DBError if gone."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # Catch schema/_fields mismatches early with a descriptive message.
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updatable via update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-fetch this row from the database and refresh our attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Persist a single field to the database and update the attribute."""
        assert field in self._valid_fields

        # Avoid a pointless UPDATE when the value is unchanged.
        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object into the database if it is a new record."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and evict it from the cache."""
        # Evict ourselves from the instance cache using self.id.  The previous
        # code passed the *builtin* function `id` here, producing a key of
        # (type(self), <built-in function id>) that never matched, so deleted
        # instances were never removed from the cache.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # Prepend prefix only when string is non-empty (used to build SQL).
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002300
mbligh36768f02008-02-22 18:28:33 +00002301
class IneligibleHostQueue(DBObject):
    """
    A row of ineligible_host_queues: a (job, host) pair this scheduler has
    blocked, created/removed by HostQueueEntry.block_host()/unblock_host().
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002305
2306
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002311
2312
class Label(DBObject):
    """ORM wrapper for a row of the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002317
2318
mbligh36768f02008-02-22 18:28:33 +00002319class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002320 _table_name = 'hosts'
2321 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2322 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2323
2324
jadmanski0afbb632008-06-06 21:10:57 +00002325 def current_task(self):
2326 rows = _db.execute("""
2327 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2328 """, (self.id,))
2329
2330 if len(rows) == 0:
2331 return None
2332 else:
2333 assert len(rows) == 1
2334 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002335 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002336
2337
jadmanski0afbb632008-06-06 21:10:57 +00002338 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002339 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002340 if self.current_task():
2341 self.current_task().requeue()
2342
showard6ae5ea92009-02-25 00:11:51 +00002343
jadmanski0afbb632008-06-06 21:10:57 +00002344 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002345 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002346 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002347
2348
showard170873e2009-01-07 00:22:26 +00002349 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002350 """
showard170873e2009-01-07 00:22:26 +00002351 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002352 """
2353 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002354 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002355 FROM labels
2356 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002357 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002358 ORDER BY labels.name
2359 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002360 platform = None
2361 all_labels = []
2362 for label_name, is_platform in rows:
2363 if is_platform:
2364 platform = label_name
2365 all_labels.append(label_name)
2366 return platform, all_labels
2367
2368
showarda64e52a2009-06-08 23:24:08 +00002369 def reverify_tasks(self, cleanup=True):
2370 tasks = [VerifyTask(host=self)]
showard6d7b2ff2009-06-10 00:16:47 +00002371 # just to make sure this host does not get taken away
2372 self.set_status('Verifying')
showarda64e52a2009-06-08 23:24:08 +00002373 if cleanup:
2374 tasks.insert(0, CleanupTask(host=self))
showard6d7b2ff2009-06-10 00:16:47 +00002375 self.set_status('Cleaning')
showarda64e52a2009-06-08 23:24:08 +00002376 return tasks
showardd8e548a2008-09-09 03:04:57 +00002377
2378
showard54c1ea92009-05-20 00:32:58 +00002379 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2380
2381
2382 @classmethod
2383 def cmp_for_sort(cls, a, b):
2384 """
2385 A comparison function for sorting Host objects by hostname.
2386
2387 This strips any trailing numeric digits, ignores leading 0s and
2388 compares hostnames by the leading name and the trailing digits as a
2389 number. If both hostnames do not match this pattern, they are simply
2390 compared as lower case strings.
2391
2392 Example of how hostnames will be sorted:
2393
2394 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2395
2396 This hopefully satisfy most people's hostname sorting needs regardless
2397 of their exact naming schemes. Nobody sane should have both a host10
2398 and host010 (but the algorithm works regardless).
2399 """
2400 lower_a = a.hostname.lower()
2401 lower_b = b.hostname.lower()
2402 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2403 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2404 if match_a and match_b:
2405 name_a, number_a_str = match_a.groups()
2406 name_b, number_b_str = match_b.groups()
2407 number_a = int(number_a_str.lstrip('0'))
2408 number_b = int(number_b_str.lstrip('0'))
2409 result = cmp((name_a, number_a), (name_b, number_b))
2410 if result == 0 and lower_a != lower_b:
2411 # If they compared equal above but the lower case names are
2412 # indeed different, don't report equality. abc012 != abc12.
2413 return cmp(lower_a, lower_b)
2414 return result
2415 else:
2416 return cmp(lower_a, lower_b)
2417
2418
class HostQueueEntry(DBObject):
    """
    One row of host_queue_entries: the assignment of a host (or meta-host /
    atomic group) to a Job, plus that assignment's scheduling status.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        # Eagerly load the related Job (and Host/AtomicGroup when assigned);
        # the DBObject instance cache keeps this cheap for shared Jobs.
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry scheduler log, written via the drone manager.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend URL for this entry's job (used in notification emails).
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """
        Assign this entry to `host` (marking it active and blocking the host
        for this job), or release the current host when `host` is None.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the currently-assigned Host (or None)."""
        return self.host


    def queue_log_record(self, log_line):
        # Append a timestamped line to this entry's queue log file.
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row blocking host_id for this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        # Safe hostname accessor for logging/emails when no host is assigned.
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Set this entry's status, keeping the active/complete flags in sync
        and sending any configured email notifications.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Not yet running and not finished.
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # In progress.
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal states.
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job just finished, email a per-host status summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run_pre_job_tasks(self, assigned_host=None):
        """
        Start this entry's pre-job phase.

        @param assigned_host - required for meta-host/atomic-group entries
                that do not yet have a concrete host assigned.
        @returns An Agent running the pre-job tasks (see
                _do_run_pre_job_tasks).
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self._do_run_pre_job_tasks()


    def _do_run_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)

        # The pre job tasks always end with a SetEntryPendingTask which
        # will continue as appropriate through queue_entry.on_pending().
        return Agent(self.job.get_pre_job_tasks(queue_entry=self))


    def requeue(self):
        """Return this entry to the Queued state for rescheduling."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        # Meta-host entries give their host back to the pool.
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None (lazily loaded).
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None (lazily loaded).
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        return self.job.run_if_ready(queue_entry=self)


    def abort(self, dispatcher):
        """
        Abort this entry.  Entries with a running job agent are left for
        their post-job tasks to clean up; otherwise the host is released
        (and re-verified if it was mid-verify) and the entry marked Aborted.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING,
                                Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return '<job tag>/<subdir>' identifying this entry's results dir."""
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002708
2709
mbligh36768f02008-02-22 18:28:33 +00002710class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002711 _table_name = 'jobs'
2712 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2713 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002714 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002715 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002716
showard77182562009-06-10 00:16:05 +00002717 # This does not need to be a column in the DB. The delays are likely to
2718 # be configured short. If the scheduler is stopped and restarted in
2719 # the middle of a job's delay cycle, the delay cycle will either be
2720 # repeated or skipped depending on the number of Pending machines found
2721 # when the restarted scheduler recovers to track it. Not a problem.
2722 #
2723 # A reference to the DelayedCallTask that will wake up the job should
2724 # no other HQEs change state in time. Its end_time attribute is used
2725 # by our run_with_ready_delay() method to determine if the wait is over.
2726 _delay_ready_task = None
2727
2728 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2729 # all status='Pending' atomic group HQEs incase a delay was running when the
2730 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002731
    def __init__(self, id=None, row=None, **kwargs):
        # Exactly one of id/row must be supplied (enforced again by DBObject).
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002735
mblighe2586682008-02-29 22:45:46 +00002736
jadmanski0afbb632008-06-06 21:10:57 +00002737 def is_server_job(self):
2738 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002739
2740
showard170873e2009-01-07 00:22:26 +00002741 def tag(self):
2742 return "%s-%s" % (self.id, self.owner)
2743
2744
jadmanski0afbb632008-06-06 21:10:57 +00002745 def get_host_queue_entries(self):
2746 rows = _db.execute("""
2747 SELECT * FROM host_queue_entries
2748 WHERE job_id= %s
2749 """, (self.id,))
2750 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002751
jadmanski0afbb632008-06-06 21:10:57 +00002752 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002753
jadmanski0afbb632008-06-06 21:10:57 +00002754 return entries
mbligh36768f02008-02-22 18:28:33 +00002755
2756
jadmanski0afbb632008-06-06 21:10:57 +00002757 def set_status(self, status, update_queues=False):
2758 self.update_field('status',status)
2759
2760 if update_queues:
2761 for queue_entry in self.get_host_queue_entries():
2762 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002763
2764
showard77182562009-06-10 00:16:05 +00002765 def _atomic_and_has_started(self):
2766 """
2767 @returns True if any of the HostQueueEntries associated with this job
2768 have entered the Status.STARTING state or beyond.
2769 """
2770 atomic_entries = models.HostQueueEntry.objects.filter(
2771 job=self.id, atomic_group__isnull=False)
2772 if atomic_entries.count() <= 0:
2773 return False
2774
showardaf8b4ca2009-06-16 18:47:26 +00002775 # These states may *only* be reached if Job.run() has been called.
2776 started_statuses = (models.HostQueueEntry.Status.STARTING,
2777 models.HostQueueEntry.Status.RUNNING,
2778 models.HostQueueEntry.Status.COMPLETED)
2779
2780 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002781 return started_entries.count() > 0
2782
2783
2784 def _pending_count(self):
2785 """The number of HostQueueEntries for this job in the Pending state."""
2786 pending_entries = models.HostQueueEntry.objects.filter(
2787 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2788 return pending_entries.count()
2789
2790
jadmanski0afbb632008-06-06 21:10:57 +00002791 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002792 # NOTE: Atomic group jobs stop reporting ready after they have been
2793 # started to avoid launching multiple copies of one atomic job.
2794 # Only possible if synch_count is less than than half the number of
2795 # machines in the atomic group.
2796 return (self._pending_count() >= self.synch_count
2797 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002798
2799
jadmanski0afbb632008-06-06 21:10:57 +00002800 def num_machines(self, clause = None):
2801 sql = "job_id=%s" % self.id
2802 if clause:
2803 sql += " AND (%s)" % clause
2804 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002805
2806
    def num_queued(self):
        # Entries not yet marked complete, i.e. still waiting or running.
        return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002809
2810
    def num_active(self):
        # Entries with the 'active' flag set.
        return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002813
2814
    def num_complete(self):
        # Entries with the 'complete' flag set.
        return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002817
2818
    def is_finished(self):
        # The job is done once every one of its queue entries is complete.
        return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002821
mbligh36768f02008-02-22 18:28:33 +00002822
showard6bb7c292009-01-30 01:44:51 +00002823 def _not_yet_run_entries(self, include_verifying=True):
2824 statuses = [models.HostQueueEntry.Status.QUEUED,
2825 models.HostQueueEntry.Status.PENDING]
2826 if include_verifying:
2827 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2828 return models.HostQueueEntry.objects.filter(job=self.id,
2829 status__in=statuses)
2830
2831
2832 def _stop_all_entries(self):
2833 entries_to_stop = self._not_yet_run_entries(
2834 include_verifying=False)
2835 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002836 assert not child_entry.complete, (
2837 '%s status=%s, active=%s, complete=%s' %
2838 (child_entry.id, child_entry.status, child_entry.active,
2839 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002840 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2841 child_entry.host.status = models.Host.Status.READY
2842 child_entry.host.save()
2843 child_entry.status = models.HostQueueEntry.Status.STOPPED
2844 child_entry.save()
2845
showard2bab8f42008-11-12 18:15:22 +00002846 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002847 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002848 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002849 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002850
2851
jadmanski0afbb632008-06-06 21:10:57 +00002852 def write_to_machines_file(self, queue_entry):
2853 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002854 file_path = os.path.join(self.tag(), '.machines')
2855 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002856
2857
showardf1ae3542009-05-11 19:26:02 +00002858 def _next_group_name(self, group_name=''):
2859 """@returns a directory name to use for the next host group results."""
2860 if group_name:
2861 # Sanitize for use as a pathname.
2862 group_name = group_name.replace(os.path.sep, '_')
2863 if group_name.startswith('.'):
2864 group_name = '_' + group_name[1:]
2865 # Add a separator between the group name and 'group%d'.
2866 group_name += '.'
2867 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002868 query = models.HostQueueEntry.objects.filter(
2869 job=self.id).values('execution_subdir').distinct()
2870 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002871 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2872 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002873 if ids:
2874 next_id = max(ids) + 1
2875 else:
2876 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002877 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002878
2879
showard170873e2009-01-07 00:22:26 +00002880 def _write_control_file(self, execution_tag):
2881 control_path = _drone_manager.attach_file_to_execution(
2882 execution_tag, self.control_file)
2883 return control_path
mbligh36768f02008-02-22 18:28:33 +00002884
showardb2e2c322008-10-14 17:33:55 +00002885
showard2bab8f42008-11-12 18:15:22 +00002886 def get_group_entries(self, queue_entry_from_group):
2887 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002888 return list(HostQueueEntry.fetch(
2889 where='job_id=%s AND execution_subdir=%s',
2890 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002891
2892
showardb2e2c322008-10-14 17:33:55 +00002893 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002894 assert queue_entries
2895 execution_tag = queue_entries[0].execution_tag()
2896 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002897 hostnames = ','.join([entry.get_host().hostname
2898 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002899
showard87ba02a2009-04-20 19:37:32 +00002900 params = _autoserv_command_line(
2901 hostnames, execution_tag,
2902 ['-P', execution_tag, '-n',
2903 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00002904 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00002905
jadmanski0afbb632008-06-06 21:10:57 +00002906 if not self.is_server_job():
2907 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002908
showardb2e2c322008-10-14 17:33:55 +00002909 return params
mblighe2586682008-02-29 22:45:46 +00002910
mbligh36768f02008-02-22 18:28:33 +00002911
showardc9ae1782009-01-30 01:42:37 +00002912 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002913 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002914 return True
showard0fc38302008-10-23 00:44:07 +00002915 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002916 return queue_entry.get_host().dirty
2917 return False
showard21baa452008-10-21 00:08:39 +00002918
showardc9ae1782009-01-30 01:42:37 +00002919
2920 def _should_run_verify(self, queue_entry):
2921 do_not_verify = (queue_entry.host.protection ==
2922 host_protections.Protection.DO_NOT_VERIFY)
2923 if do_not_verify:
2924 return False
2925 return self.run_verify
2926
2927
showard77182562009-06-10 00:16:05 +00002928 def get_pre_job_tasks(self, queue_entry):
2929 """
2930 Get a list of tasks to perform before the host_queue_entry
2931 may be used to run this Job (such as Cleanup & Verify).
2932
2933 @returns A list of tasks to be done to the given queue_entry before
2934 it should be considered be ready to run this job. The last
2935 task in the list calls HostQueueEntry.on_pending(), which
2936 continues the flow of the job.
2937 """
showard21baa452008-10-21 00:08:39 +00002938 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002939 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002940 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002941 if self._should_run_verify(queue_entry):
2942 tasks.append(VerifyTask(queue_entry=queue_entry))
2943 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002944 return tasks
2945
2946
showardf1ae3542009-05-11 19:26:02 +00002947 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002948 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002949 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002950 else:
showardf1ae3542009-05-11 19:26:02 +00002951 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002952 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002953 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002954 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002955
2956 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002957 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002958
2959
2960 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002961 """
2962 @returns A tuple containing a list of HostQueueEntry instances to be
2963 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002964 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002965 """
showard77182562009-06-10 00:16:05 +00002966 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00002967 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002968 if atomic_group:
2969 num_entries_wanted = atomic_group.max_number_of_machines
2970 else:
2971 num_entries_wanted = self.synch_count
2972 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002973
showardf1ae3542009-05-11 19:26:02 +00002974 if num_entries_wanted > 0:
2975 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002976 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002977 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002978 params=(self.id, include_queue_entry.id)))
2979
2980 # Sort the chosen hosts by hostname before slicing.
2981 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2982 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2983 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2984 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002985
showardf1ae3542009-05-11 19:26:02 +00002986 # Sanity check. We'll only ever be called if this can be met.
2987 assert len(chosen_entries) >= self.synch_count
2988
2989 if atomic_group:
2990 # Look at any meta_host and dependency labels and pick the first
2991 # one that also specifies this atomic group. Use that label name
2992 # as the group name if possible (it is more specific).
2993 group_name = atomic_group.name
2994 for label in include_queue_entry.get_labels():
2995 if label.atomic_group_id:
2996 assert label.atomic_group_id == atomic_group.id
2997 group_name = label.name
2998 break
2999 else:
3000 group_name = ''
3001
3002 self._assign_new_group(chosen_entries, group_name=group_name)
3003 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00003004
3005
showard77182562009-06-10 00:16:05 +00003006 def run_if_ready(self, queue_entry):
3007 """
3008 @returns An Agent instance to ultimately run this job if enough hosts
3009 are ready for it to run.
3010 @returns None and potentially cleans up excess hosts if this Job
3011 is not ready to run.
3012 """
showardb2e2c322008-10-14 17:33:55 +00003013 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003014 self.stop_if_necessary()
3015 return None
mbligh36768f02008-02-22 18:28:33 +00003016
showard77182562009-06-10 00:16:05 +00003017 if queue_entry.atomic_group:
3018 return self.run_with_ready_delay(queue_entry)
3019
3020 return self.run(queue_entry)
3021
3022
3023 def run_with_ready_delay(self, queue_entry):
3024 """
3025 Start a delay to wait for more hosts to enter Pending state before
3026 launching an atomic group job. Once set, the a delay cannot be reset.
3027
3028 @param queue_entry: The HostQueueEntry object to get atomic group
3029 info from and pass to run_if_ready when the delay is up.
3030
3031 @returns An Agent to run the job as appropriate or None if a delay
3032 has already been set.
3033 """
3034 assert queue_entry.job_id == self.id
3035 assert queue_entry.atomic_group
3036 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3037 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3038 over_max_threshold = (self._pending_count() >= pending_threshold)
3039 delay_expired = (self._delay_ready_task and
3040 time.time() >= self._delay_ready_task.end_time)
3041
3042 # Delay is disabled or we already have enough? Do not wait to run.
3043 if not delay or over_max_threshold or delay_expired:
3044 return self.run(queue_entry)
3045
3046 # A delay was previously scheduled.
3047 if self._delay_ready_task:
3048 return None
3049
3050 def run_job_after_delay():
3051 logging.info('Job %s done waiting for extra hosts.', self.id)
3052 return self.run(queue_entry)
3053
3054 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3055 callback=run_job_after_delay)
3056
3057 return Agent([self._delay_ready_task], num_processes=0)
3058
3059
3060 def run(self, queue_entry):
3061 """
3062 @param queue_entry: The HostQueueEntry instance calling this method.
3063 @returns An Agent instance to run this job or None if we've already
3064 been run.
3065 """
3066 if queue_entry.atomic_group and self._atomic_and_has_started():
3067 logging.error('Job.run() called on running atomic Job %d '
3068 'with HQE %s.', self.id, queue_entry)
3069 return None
showardf1ae3542009-05-11 19:26:02 +00003070 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3071 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003072
3073
showardf1ae3542009-05-11 19:26:02 +00003074 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003075 for queue_entry in queue_entries:
3076 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003077 params = self._get_autoserv_params(queue_entries)
3078 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003079 cmd=params, group_name=group_name)
3080 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003081 if self._delay_ready_task:
3082 # Cancel any pending callback that would try to run again
3083 # as we are already running.
3084 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003085
showard170873e2009-01-07 00:22:26 +00003086 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003087
3088
# Entry point: run the scheduler when executed as a script.
if __name__ == '__main__':
    main()