blob: cb7eea1f1a3916c1b7b32bbbdadc3892edb8ca7a [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Locate the autotest tree: default to the parent of this file's directory,
# but allow $AUTOTEST_DIR to override it.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names dropped into an execution directory by the different phases
# of a job (autoserv run, crashinfo collection, results parsing).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state; several of these are (re)assigned during
# startup in main_without_exception_handling()/init().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory, so child processes and the logging config can pick it up.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
def main():
    """Entry point: run the scheduler and log any escaping exception.

    Delegates all real work to main_without_exception_handling().
    """
    try:
        main_without_exception_handling()
    except:
        # bare except on purpose: record absolutely everything (including
        # SystemExit / KeyboardInterrupt) before letting it propagate
        logging.exception('Exception escaping in monitor_db')
        raise
85
def main_without_exception_handling():
    """Parse the command line, configure module globals and run the
    dispatcher loop until a shutdown is requested.

    Expects exactly one positional argument: the results directory.  Sets
    the module globals RESULTS_DIR, _notify_email_statuses, _base_url and
    (under --test) _autoserv_path/_testing_mode, then loops on
    Dispatcher.tick() until the SIGINT handler flips _shutdown.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # the config value is a free-form list; accept space/comma/semicolon/colon
    # separators and drop empty tokens
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main scheduling loop; _shutdown is set by the SIGINT handler
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # orderly teardown: flush pending notification emails, then stop the
    # status server, drone manager and database connection
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000161
162
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown of the dispatcher loop.

    Only sets the module-level _shutdown flag; the loop in
    main_without_exception_handling() polls it once per tick.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000167
168
def init(logfile):
    """One-time scheduler startup.

    Optionally redirects stdout/stderr to logfile, records our pid, connects
    the global database connection (_db), installs the SIGINT handler and
    initializes the global drone manager from the config file.

    @param logfile - Path to redirect stdout/stderr to, or a false value to
            leave them alone.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE: this local was originally named 'drones', which shadowed the
    # imported autotest_lib.scheduler.drones module; renamed for clarity.
    drone_hostnames = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hostnames.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000200
201
def enable_logging(logfile):
    """Redirect the process's stdout/stderr into append-mode log files.

    stdout goes to *logfile*, stderr to *logfile* + ".err".  Both the
    underlying file descriptors (via dup2) and the Python-level sys.stdout /
    sys.stderr objects are replaced, so output from child processes and
    C-level writes is captured too.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    # unbuffered append so log lines appear immediately
    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)

    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000214
215
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """Build the argv list used to launch autoserv.

    @param machines - comma-separated hostnames passed to autoserv -m.
    @param results_dir - results directory, resolved through the drone
            manager to an absolute path for autoserv -r.
    @param extra_args - list of additional arguments appended at the end.
    @param job - optional Job supplying owner/name for -u/-l.
    @param queue_entry - optional HostQueueEntry; its job is used when no
            job is given.

    @returns A new list of command-line arguments.
    """
    argv = [_autoserv_path, '-p', '-m', machines,
            '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        argv.extend(['-u', job.owner, '-l', job.name])
    return argv + extra_args
225
226
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
229
230
class HostScheduler(object):
    """Matches pending HostQueueEntries to ready hosts.

    refresh() bulk-loads scheduling state (ready hosts, job ACLs, job
    dependencies, host labels) from the database into in-memory dicts; the
    find_eligible_* methods then consume that cached state, popping hosts
    from self._hosts_available as they are handed out so each host is used
    at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated string for SQL IN (...) clauses
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (with a %s IN-list placeholder) and map the two-column
        result into {left_id: set(right_ids)} (or the reverse when flip)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # rows are (left_id, right_id) pairs; build a one-to-many mapping
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map job id -> set of ACL group ids of the job owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map job id -> set of host ids the job must not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map job id -> set of label ids the job depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map host id -> set of ACL group ids containing the host."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host ids, host_id -> label ids) mappings."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        # the same rows yield both directions of the mapping
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state from the database for the
        jobs referenced by *pending_queue_entries*."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job owner and the host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every dependency label of the job must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """False if the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # a host is eligible when it passes all four checks: ACLs,
        # dependency labels, only_if_needed labels and atomic group match
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # the entry names a specific host; hand it out if it's eligible
        # and still available this cycle
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick any eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return an available Host for *queue_entry*, or None.

        Non-atomic entries only: atomic group entries go through
        find_eligible_atomic_group() instead.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
575
576
showard170873e2009-01-07 00:22:26 +0000577class Dispatcher(object):
    def __init__(self):
        """Set up empty dispatcher state and the periodic cleanup helpers."""
        # all live Agent objects currently being driven by the dispatcher
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # reverse indexes: host id -> set(Agent), queue entry id -> set(Agent)
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000588
mbligh36768f02008-02-22 18:28:33 +0000589
    def initialize(self, recover_hosts=True):
        """Run startup recovery before the first tick.

        @param recover_hosts - If True, also attempt host recovery
                (_recover_hosts); process recovery always runs.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000599
600
    def tick(self):
        """One iteration of the main scheduling loop."""
        _drone_manager.refresh()          # sync state from all drones
        self._run_cleanup()               # periodic + 24hr DB cleanup
        self._find_aborting()             # handle abort requests
        self._schedule_new_jobs()         # assign hosts to pending entries
        self._handle_agents()             # advance all running agents
        _drone_manager.execute_actions()  # flush queued drone operations
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000609
showard97aed502008-11-04 02:01:24 +0000610
    def _run_cleanup(self):
        # each helper internally decides whether its interval has elapsed
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000614
mbligh36768f02008-02-22 18:28:33 +0000615
showard170873e2009-01-07 00:22:26 +0000616 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
617 for object_id in object_ids:
618 agent_dict.setdefault(object_id, set()).add(agent)
619
620
621 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
622 for object_id in object_ids:
623 assert object_id in agent_dict
624 agent_dict[object_id].remove(agent)
625
626
    def add_agent(self, agent):
        """Take ownership of *agent*: track it and index it by the host ids
        and queue entry ids it touches."""
        self._agents.append(agent)
        # back-pointer so the agent can reach the dispatcher
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000633
showard170873e2009-01-07 00:22:26 +0000634
635 def get_agents_for_entry(self, queue_entry):
636 """
637 Find agents corresponding to the specified queue_entry.
638 """
showardd3dc1992009-04-22 21:01:40 +0000639 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000640
641
642 def host_has_agent(self, host):
643 """
644 Determine if there is currently an Agent present using this host.
645 """
646 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000647
648
    def remove_agent(self, agent):
        """Forget *agent*: drop it from the agent list and from both
        reverse indexes (hosts and queue entries)."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000655
656
    def _recover_processes(self):
        """Reattach to processes left over from a previous scheduler run."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000667
showard170873e2009-01-07 00:22:26 +0000668
    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            # register every pidfile type (autoserv, crashinfo, parser) so
            # the drone manager reads them on its next refresh
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000679
680
showardd3dc1992009-04-22 21:01:40 +0000681 def _recover_entries_with_status(self, status, orphans, pidfile_name,
682 recover_entries_fn):
683 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000684 for queue_entry in queue_entries:
685 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000686 # synchronous job we've already recovered
687 continue
showardd3dc1992009-04-22 21:01:40 +0000688 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000689 execution_tag = queue_entry.execution_tag()
690 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000691 run_monitor.attach_to_existing_process(execution_tag,
692 pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000693 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000694 # execution apparently never happened
695 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000696 continue
mbligh90a549d2008-03-25 23:52:34 +0000697
showardd3dc1992009-04-22 21:01:40 +0000698 logging.info('Recovering %s entry %s (process %s)',
699 status.lower(),
700 ', '.join(str(entry) for entry in queue_entries),
701 run_monitor.get_process())
702 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
703 orphans.discard(run_monitor.get_process())
704
705
706 def _kill_remaining_orphan_processes(self, orphans):
707 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000708 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000709 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000710
showard170873e2009-01-07 00:22:26 +0000711
showardd3dc1992009-04-22 21:01:40 +0000712 def _recover_running_entries(self, orphans):
713 def recover_entries(job, queue_entries, run_monitor):
714 if run_monitor is not None:
715 queue_task = RecoveryQueueTask(job=job,
716 queue_entries=queue_entries,
717 run_monitor=run_monitor)
718 self.add_agent(Agent(tasks=[queue_task],
719 num_processes=len(queue_entries)))
720 # else, _requeue_other_active_entries will cover this
721
722 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
723 orphans, '.autoserv_execute',
724 recover_entries)
725
726
727 def _recover_gathering_entries(self, orphans):
728 def recover_entries(job, queue_entries, run_monitor):
729 gather_task = GatherLogsTask(job, queue_entries,
730 run_monitor=run_monitor)
731 self.add_agent(Agent([gather_task]))
732
733 self._recover_entries_with_status(
734 models.HostQueueEntry.Status.GATHERING,
735 orphans, _CRASHINFO_PID_FILE, recover_entries)
736
737
738 def _recover_parsing_entries(self, orphans):
739 def recover_entries(job, queue_entries, run_monitor):
740 reparse_task = FinalReparseTask(queue_entries,
741 run_monitor=run_monitor)
742 self.add_agent(Agent([reparse_task], num_processes=0))
743
744 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
745 orphans, _PARSER_PID_FILE,
746 recover_entries)
747
748
749 def _recover_all_recoverable_entries(self):
750 orphans = _drone_manager.get_orphaned_autoserv_processes()
751 self._recover_running_entries(orphans)
752 self._recover_gathering_entries(orphans)
753 self._recover_parsing_entries(orphans)
754 self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000755
showard97aed502008-11-04 02:01:24 +0000756
showard170873e2009-01-07 00:22:26 +0000757 def _requeue_other_active_entries(self):
758 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000759 where='active AND NOT complete AND '
760 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000761 for queue_entry in queue_entries:
762 if self.get_agents_for_entry(queue_entry):
763 # entry has already been recovered
764 continue
showardd3dc1992009-04-22 21:01:40 +0000765 if queue_entry.aborted:
766 queue_entry.abort(self)
767 continue
768
769 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000770 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000771 if queue_entry.host:
772 tasks = queue_entry.host.reverify_tasks()
773 self.add_agent(Agent(tasks))
774 agent = queue_entry.requeue()
775
776
777 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000778 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000779 self._reverify_hosts_where("""(status = 'Repairing' OR
780 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000781 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard170873e2009-01-07 00:22:26 +0000783 # recover "Running" hosts with no active queue entries, although this
784 # should never happen
785 message = ('Recovering running host %s - this probably indicates a '
786 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000787 self._reverify_hosts_where("""status = 'Running' AND
788 id NOT IN (SELECT host_id
789 FROM host_queue_entries
790 WHERE active)""",
791 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000792
793
jadmanski0afbb632008-06-06 21:10:57 +0000794 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000795 print_message='Reverifying host %s'):
796 full_where='locked = 0 AND invalid = 0 AND ' + where
797 for host in Host.fetch(where=full_where):
798 if self.host_has_agent(host):
799 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000800 continue
showard170873e2009-01-07 00:22:26 +0000801 if print_message:
showardb18134f2009-03-20 20:52:18 +0000802 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000803 tasks = host.reverify_tasks()
804 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000805
806
jadmanski0afbb632008-06-06 21:10:57 +0000807 def _recover_hosts(self):
808 # recover "Repair Failed" hosts
809 message = 'Reverifying dead host %s'
810 self._reverify_hosts_where("status = 'Repair Failed'",
811 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000812
813
showard04c82c52008-05-29 19:38:12 +0000814
showardb95b1bd2008-08-15 18:11:04 +0000815 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000816 # prioritize by job priority, then non-metahost over metahost, then FIFO
817 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000818 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000819 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000820 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000821
822
showard89f84db2009-03-12 20:39:13 +0000823 def _refresh_pending_queue_entries(self):
824 """
825 Lookup the pending HostQueueEntries and call our HostScheduler
826 refresh() method given that list. Return the list.
827
828 @returns A list of pending HostQueueEntries sorted in priority order.
829 """
showard63a34772008-08-18 19:32:50 +0000830 queue_entries = self._get_pending_queue_entries()
831 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000832 return []
showardb95b1bd2008-08-15 18:11:04 +0000833
showard63a34772008-08-18 19:32:50 +0000834 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000835
showard89f84db2009-03-12 20:39:13 +0000836 return queue_entries
837
838
839 def _schedule_atomic_group(self, queue_entry):
840 """
841 Schedule the given queue_entry on an atomic group of hosts.
842
843 Returns immediately if there are insufficient available hosts.
844
845 Creates new HostQueueEntries based off of queue_entry for the
846 scheduled hosts and starts them all running.
847 """
848 # This is a virtual host queue entry representing an entire
849 # atomic group, find a group and schedule their hosts.
850 group_hosts = self._host_scheduler.find_eligible_atomic_group(
851 queue_entry)
852 if not group_hosts:
853 return
854 # The first assigned host uses the original HostQueueEntry
855 group_queue_entries = [queue_entry]
856 for assigned_host in group_hosts[1:]:
857 # Create a new HQE for every additional assigned_host.
858 new_hqe = HostQueueEntry.clone(queue_entry)
859 new_hqe.save()
860 group_queue_entries.append(new_hqe)
861 assert len(group_queue_entries) == len(group_hosts)
862 for queue_entry, host in itertools.izip(group_queue_entries,
863 group_hosts):
864 self._run_queue_entry(queue_entry, host)
865
866
867 def _schedule_new_jobs(self):
868 queue_entries = self._refresh_pending_queue_entries()
869 if not queue_entries:
870 return
871
showard63a34772008-08-18 19:32:50 +0000872 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000873 if (queue_entry.atomic_group_id is None or
874 queue_entry.host_id is not None):
875 assigned_host = self._host_scheduler.find_eligible_host(
876 queue_entry)
877 if assigned_host:
878 self._run_queue_entry(queue_entry, assigned_host)
879 else:
880 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000881
882
883 def _run_queue_entry(self, queue_entry, host):
884 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000885 # in some cases (synchronous jobs with run_verify=False), agent may be
886 # None
showard9976ce92008-10-15 20:28:13 +0000887 if agent:
888 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000889
890
jadmanski0afbb632008-06-06 21:10:57 +0000891 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000892 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
893 for agent in self.get_agents_for_entry(entry):
894 agent.abort()
895 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000896
897
showard324bf812009-01-20 23:23:38 +0000898 def _can_start_agent(self, agent, num_started_this_cycle,
899 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000900 # always allow zero-process agents to run
901 if agent.num_processes == 0:
902 return True
903 # don't allow any nonzero-process agents to run after we've reached a
904 # limit (this avoids starvation of many-process agents)
905 if have_reached_limit:
906 return False
907 # total process throttling
showard324bf812009-01-20 23:23:38 +0000908 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000909 return False
910 # if a single agent exceeds the per-cycle throttling, still allow it to
911 # run when it's the first agent in the cycle
912 if num_started_this_cycle == 0:
913 return True
914 # per-cycle throttling
915 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000916 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000917 return False
918 return True
919
920
    def _handle_agents(self):
        """
        One scheduling pass over all agents: remove finished agents, start
        idle agents when throttling permits, and tick everything still alive.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once one agent is blocked, no further nonzero-process
                    # agents may start this cycle (see _can_start_agent)
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000939
940
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set once the process is declared lost; short-circuits further reads
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prefix command with nice if a nonzero nice_level is given.

        NOTE(review): run() inlines this logic instead of calling this
        helper; this method appears unused within this class -- confirm
        against the rest of the file.
        """
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # used by _handle_no_process to decide when to email about a
        # missing pidfile
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command via the drone manager and remember its pidfile."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process has been found for this pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; only valid when has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile.

        Raises _PidfileException (and resets state) on invalid contents.
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        """Log + email the error, then mark the process as lost."""
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        """Core state refresh; may raise _PidfileException via _read_pidfile."""
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        # only nag by email once the process has had ample time to write
        # its pidfile
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
        self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # synthesize a failing exit so callers treat this as a failure
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the number of failed tests recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001098
1099
mbligh36768f02008-02-22 18:28:33 +00001100class Agent(object):
showard170873e2009-01-07 00:22:26 +00001101 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001102 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001103 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001104 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001105 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001106
showard170873e2009-01-07 00:22:26 +00001107 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1108 for task in tasks)
1109 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1110
showardd3dc1992009-04-22 21:01:40 +00001111 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001112 for task in tasks:
1113 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001114
1115
showardd3dc1992009-04-22 21:01:40 +00001116 def _clear_queue(self):
1117 self.queue = Queue.Queue(0)
1118
1119
showard170873e2009-01-07 00:22:26 +00001120 def _union_ids(self, id_lists):
1121 return set(itertools.chain(*id_lists))
1122
1123
jadmanski0afbb632008-06-06 21:10:57 +00001124 def add_task(self, task):
1125 self.queue.put_nowait(task)
1126 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001127
1128
jadmanski0afbb632008-06-06 21:10:57 +00001129 def tick(self):
showard21baa452008-10-21 00:08:39 +00001130 while not self.is_done():
1131 if self.active_task and not self.active_task.is_done():
1132 self.active_task.poll()
1133 if not self.active_task.is_done():
1134 return
1135 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001136
1137
jadmanski0afbb632008-06-06 21:10:57 +00001138 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001139 logging.info("agent picking task")
jadmanski0afbb632008-06-06 21:10:57 +00001140 if self.active_task:
1141 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001142 if not self.active_task.success:
1143 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +00001144
jadmanski0afbb632008-06-06 21:10:57 +00001145 self.active_task = None
1146 if not self.is_done():
1147 self.active_task = self.queue.get_nowait()
1148 if self.active_task:
1149 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +00001150
1151
jadmanski0afbb632008-06-06 21:10:57 +00001152 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001153 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001154 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1155 # get reset.
1156 new_agent = Agent(self.active_task.failure_tasks)
1157 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001158
mblighe2586682008-02-29 22:45:46 +00001159
showard4c5374f2008-09-04 17:02:56 +00001160 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001161 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001162
1163
jadmanski0afbb632008-06-06 21:10:57 +00001164 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001165 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001166
1167
jadmanski0afbb632008-06-06 21:10:57 +00001168 def start(self):
1169 assert self.dispatcher
jadmanski0afbb632008-06-06 21:10:57 +00001170 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001171
jadmanski0afbb632008-06-06 21:10:57 +00001172
showardd3dc1992009-04-22 21:01:40 +00001173 def abort(self):
1174 if self.active_task:
1175 self.active_task.abort()
showard6b733412009-04-27 20:09:18 +00001176 if self.active_task.aborted: # tasks can choose to ignore aborts
1177 self.active_task = None
1178 self._clear_queue()
showardd3dc1992009-04-22 21:01:40 +00001179
1180
class AgentTask(object):
    """
    Base unit of work executed by an Agent, typically wrapping a single
    process tracked through a PidfileRunMonitor.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command to execute, or None for process-less tasks.
        @param working_directory: directory the command executes in.
        @param failure_tasks: tasks run (in a new Agent) if this task fails;
                defaults to none.
        @param pidfile_name, paired_with_pidfile: NOTE(review): accepted but
                not stored or used here -- run() takes its own pidfile
                arguments.  Confirm whether any caller relies on these.
        """
        self.done = False
        # avoid the shared-mutable-default pitfall (was failure_tasks=[]):
        # each instance gets its own list
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task acts on, for agent
        lookup by the dispatcher."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Check the monitored process; a task with no monitor fails."""
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """Translate a process exit code into task completion."""
        if exit_code is None:
            return  # process still running
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run its epilog."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run before the task starts; subclasses override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        """
        Allocate a temporary results path on the drone.

        NOTE(review): `suffix` is accepted but ignored by the current
        implementation -- confirm whether callers expect it to matter.
        """
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy this task's log file to the results repository, if any."""
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run after the task completes; subclasses extend."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if any) and mark the task done and aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point log_file at a per-host, timestamped log path."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Assert all entries share one execution tag and return it."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """Copy results to the repository and kick off a final reparse."""
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Start cmd under a PidfileRunMonitor (no-op when cmd is None)."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001311
1312
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file.  Subclasses must override."""
        # was `raise NotImplemented` -- NotImplemented is a singleton, not an
        # exception, so raising it produced a confusing TypeError
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line, paired with the monitored job process."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time as a keyval."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1340
1341
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs autoserv -R to repair a host, failing the associated queue
    entry (if any) when repair itself fails."""
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # put the entry back in the queue while its host is being repaired
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live in the repair's temporary results directory
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark queue_entry_to_fail as failed, copying repair logs into its
        results location and scheduling a reparse."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001410
1411
showard8fe93b52008-11-18 17:53:22 +00001412class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001413 def epilog(self):
1414 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001415 should_copy_results = (self.queue_entry and not self.success
1416 and not self.queue_entry.meta_host)
1417 if should_copy_results:
1418 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001419 destination = os.path.join(self.queue_entry.execution_tag(),
1420 os.path.basename(self.log_file))
1421 _drone_manager.copy_to_results_repository(
1422 self.monitor.get_process(), self.log_file,
1423 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001424
1425
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host; schedules a RepairTask on failure."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        command = _autoserv_command_line(self.host.hostname,
                                         self.temp_results_dir, ['-v'],
                                         queue_entry=queue_entry)
        on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(command, self.temp_results_dir,
                                         failure_tasks=on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Move the entry (if any) and its host into the Verifying state."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """On success the host is Ready; failures fall to the RepairTask."""
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001456
1457
showardd9205182009-04-27 20:09:55 +00001458class QueueTask(AgentTask, TaskWithJobKeyvals):
jadmanski0afbb632008-06-06 21:10:57 +00001459 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001460 self.job = job
1461 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001462 super(QueueTask, self).__init__(cmd, self._execution_tag())
1463 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001464
1465
showard73ec0442009-02-07 02:05:20 +00001466 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001467 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001468
1469
1470 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1471 keyval_contents = '\n'.join(self._format_keyval(key, value)
1472 for key, value in keyval_dict.iteritems())
1473 # always end with a newline to allow additional keyvals to be written
1474 keyval_contents += '\n'
1475 _drone_manager.attach_file_to_execution(self._execution_tag(),
1476 keyval_contents,
1477 file_path=keyval_path)
1478
1479
1480 def _write_keyvals_before_job(self, keyval_dict):
1481 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1482
1483
showard170873e2009-01-07 00:22:26 +00001484 def _write_host_keyvals(self, host):
1485 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1486 host.hostname)
1487 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001488 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1489 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001490
1491
showard170873e2009-01-07 00:22:26 +00001492 def _execution_tag(self):
1493 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001494
1495
jadmanski0afbb632008-06-06 21:10:57 +00001496 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001497 queued_key, queued_time = self._job_queued_keyval(self.job)
1498 self._write_keyvals_before_job({queued_key : queued_time})
jadmanski0afbb632008-06-06 21:10:57 +00001499 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001500 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001501 queue_entry.set_status('Running')
1502 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001503 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001504 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001505 assert len(self.queue_entries) == 1
1506 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001507
1508
showard35162b02009-03-03 02:17:30 +00001509 def _write_lost_process_error_file(self):
1510 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1511 _drone_manager.write_lines_to_file(error_file_path,
1512 [_LOST_PROCESS_ERROR])
1513
1514
showardd3dc1992009-04-22 21:01:40 +00001515 def _finish_task(self):
showardd9205182009-04-27 20:09:55 +00001516 self._write_job_finished()
1517
showardd3dc1992009-04-22 21:01:40 +00001518 # both of these conditionals can be true, iff the process ran, wrote a
1519 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001520 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001521 gather_task = GatherLogsTask(self.job, self.queue_entries)
1522 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001523
1524 if self.monitor.lost_process:
1525 self._write_lost_process_error_file()
1526 for queue_entry in self.queue_entries:
1527 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001528
1529
showardcbd74612008-11-19 21:42:02 +00001530 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001531 _drone_manager.write_lines_to_file(
1532 os.path.join(self._execution_tag(), 'status.log'),
1533 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001534 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001535
1536
jadmanskif7fa2cc2008-10-01 14:13:23 +00001537 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001538 if not self.monitor or not self.monitor.has_process():
1539 return
1540
jadmanskif7fa2cc2008-10-01 14:13:23 +00001541 # build up sets of all the aborted_by and aborted_on values
1542 aborted_by, aborted_on = set(), set()
1543 for queue_entry in self.queue_entries:
1544 if queue_entry.aborted_by:
1545 aborted_by.add(queue_entry.aborted_by)
1546 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1547 aborted_on.add(t)
1548
1549 # extract some actual, unique aborted by value and write it out
1550 assert len(aborted_by) <= 1
1551 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001552 aborted_by_value = aborted_by.pop()
1553 aborted_on_value = max(aborted_on)
1554 else:
1555 aborted_by_value = 'autotest_system'
1556 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001557
showarda0382352009-02-11 23:36:43 +00001558 self._write_keyval_after_job("aborted_by", aborted_by_value)
1559 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001560
showardcbd74612008-11-19 21:42:02 +00001561 aborted_on_string = str(datetime.datetime.fromtimestamp(
1562 aborted_on_value))
1563 self._write_status_comment('Job aborted by %s on %s' %
1564 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001565
1566
jadmanski0afbb632008-06-06 21:10:57 +00001567 def abort(self):
1568 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001569 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001570 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001571
1572
jadmanski0afbb632008-06-06 21:10:57 +00001573 def epilog(self):
1574 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001575 self._finish_task()
1576 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001577
1578
mblighbb421852008-03-11 22:36:16 +00001579class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001580 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001581 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001582 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001583
1584
jadmanski0afbb632008-06-06 21:10:57 +00001585 def run(self):
1586 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001587
1588
jadmanski0afbb632008-06-06 21:10:57 +00001589 def prolog(self):
1590 # recovering an existing process - don't do prolog
1591 pass
mblighbb421852008-03-11 22:36:16 +00001592
1593
showardd3dc1992009-04-22 21:01:40 +00001594class PostJobTask(AgentTask):
1595 def __init__(self, queue_entries, pidfile_name, logfile_name,
1596 run_monitor=None):
1597 """
1598 If run_monitor != None, we're recovering a running task.
1599 """
1600 self._queue_entries = queue_entries
1601 self._pidfile_name = pidfile_name
1602 self._run_monitor = run_monitor
1603
1604 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1605 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1606 self._autoserv_monitor = PidfileRunMonitor()
1607 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1608 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1609
1610 if _testing_mode:
1611 command = 'true'
1612 else:
1613 command = self._generate_command(self._results_dir)
1614
1615 super(PostJobTask, self).__init__(cmd=command,
1616 working_directory=self._execution_tag)
1617
1618 self.log_file = os.path.join(self._execution_tag, logfile_name)
1619 self._final_status = self._determine_final_status()
1620
1621
1622 def _generate_command(self, results_dir):
1623 raise NotImplementedError('Subclasses must override this')
1624
1625
1626 def _job_was_aborted(self):
1627 was_aborted = None
1628 for queue_entry in self._queue_entries:
1629 queue_entry.update_from_database()
1630 if was_aborted is None: # first queue entry
1631 was_aborted = bool(queue_entry.aborted)
1632 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1633 email_manager.manager.enqueue_notify_email(
1634 'Inconsistent abort state',
1635 'Queue entries have inconsistent abort state: ' +
1636 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1637 # don't crash here, just assume true
1638 return True
1639 return was_aborted
1640
1641
1642 def _determine_final_status(self):
1643 if self._job_was_aborted():
1644 return models.HostQueueEntry.Status.ABORTED
1645
1646 # we'll use a PidfileRunMonitor to read the autoserv exit status
1647 if self._autoserv_monitor.exit_code() == 0:
1648 return models.HostQueueEntry.Status.COMPLETED
1649 return models.HostQueueEntry.Status.FAILED
1650
1651
1652 def run(self):
1653 if self._run_monitor is not None:
1654 self.monitor = self._run_monitor
1655 else:
1656 # make sure we actually have results to work with.
1657 # this should never happen in normal operation.
1658 if not self._autoserv_monitor.has_process():
1659 email_manager.manager.enqueue_notify_email(
1660 'No results in post-job task',
1661 'No results in post-job task at %s' %
1662 self._autoserv_monitor.pidfile_id)
1663 self.finished(False)
1664 return
1665
1666 super(PostJobTask, self).run(
1667 pidfile_name=self._pidfile_name,
1668 paired_with_pidfile=self._paired_with_pidfile)
1669
1670
1671 def _set_all_statuses(self, status):
1672 for queue_entry in self._queue_entries:
1673 queue_entry.set_status(status)
1674
1675
1676 def abort(self):
1677 # override AgentTask.abort() to avoid killing the process and ending
1678 # the task. post-job tasks continue when the job is aborted.
1679 pass
1680
1681
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        # job: the Job these logs belong to; run_monitor is non-None when
        # recovering an already-running collect-crashinfo process.
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in collect-crashinfo mode against all entry hosts
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Reboot (via CleanupTask) or release the entries' hosts, depending
        on the job's reboot_after setting and the final job status."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # an aborted job always gets its hosts cleaned up
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # copy results into the repository and kick off the final parse,
        # paired with the original autoserv process's results
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()
1738
1739
showard8fe93b52008-11-18 17:53:22 +00001740class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001741 def __init__(self, host=None, queue_entry=None):
1742 assert bool(host) ^ bool(queue_entry)
1743 if queue_entry:
1744 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001745 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001746 self.host = host
showard170873e2009-01-07 00:22:26 +00001747
1748 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001749 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1750 ['--cleanup'],
1751 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001752 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001753 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1754 failure_tasks=[repair_task])
1755
1756 self._set_ids(host=host, queue_entries=[queue_entry])
1757 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001758
mblighd5c95802008-03-05 00:33:46 +00001759
jadmanski0afbb632008-06-06 21:10:57 +00001760 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001761 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001762 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001763 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001764
mblighd5c95802008-03-05 00:33:46 +00001765
showard21baa452008-10-21 00:08:39 +00001766 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001767 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001768 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001769 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001770 self.host.update_field('dirty', 0)
1771
1772
showardd3dc1992009-04-22 21:01:40 +00001773class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001774 _num_running_parses = 0
1775
showardd3dc1992009-04-22 21:01:40 +00001776 def __init__(self, queue_entries, run_monitor=None):
1777 super(FinalReparseTask, self).__init__(queue_entries,
1778 pidfile_name=_PARSER_PID_FILE,
1779 logfile_name='.parse.log',
1780 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001781 # don't use _set_ids, since we don't want to set the host_ids
1782 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001783 self._parse_started = False
1784
showard97aed502008-11-04 02:01:24 +00001785
1786 @classmethod
1787 def _increment_running_parses(cls):
1788 cls._num_running_parses += 1
1789
1790
1791 @classmethod
1792 def _decrement_running_parses(cls):
1793 cls._num_running_parses -= 1
1794
1795
1796 @classmethod
1797 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001798 return (cls._num_running_parses <
1799 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001800
1801
1802 def prolog(self):
1803 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001804 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001805
1806
1807 def epilog(self):
1808 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001809 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001810
1811
showardd3dc1992009-04-22 21:01:40 +00001812 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001813 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001814 results_dir]
showard97aed502008-11-04 02:01:24 +00001815
1816
1817 def poll(self):
1818 # override poll to keep trying to start until the parse count goes down
1819 # and we can, at which point we revert to default behavior
1820 if self._parse_started:
1821 super(FinalReparseTask, self).poll()
1822 else:
1823 self._try_starting_parse()
1824
1825
1826 def run(self):
1827 # override run() to not actually run unless we can
1828 self._try_starting_parse()
1829
1830
1831 def _try_starting_parse(self):
1832 if not self._can_run_new_parse():
1833 return
showard170873e2009-01-07 00:22:26 +00001834
showard97aed502008-11-04 02:01:24 +00001835 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001836 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001837
showard97aed502008-11-04 02:01:24 +00001838 self._increment_running_parses()
1839 self._parse_started = True
1840
1841
1842 def finished(self, success):
1843 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001844 if self._parse_started:
1845 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001846
1847
showardc9ae1782009-01-30 01:42:37 +00001848class SetEntryPendingTask(AgentTask):
1849 def __init__(self, queue_entry):
1850 super(SetEntryPendingTask, self).__init__(cmd='')
1851 self._queue_entry = queue_entry
1852 self._set_ids(queue_entries=[queue_entry])
1853
1854
1855 def run(self):
1856 agent = self._queue_entry.on_pending()
1857 if agent:
1858 self.agent.dispatcher.add_agent(agent)
1859 self.finished(True)
1860
1861
showarda3c58572009-03-12 20:36:59 +00001862class DBError(Exception):
1863 """Raised by the DBObject constructor when its select fails."""
1864
1865
mbligh36768f02008-02-22 18:28:33 +00001866class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001867 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001868
1869 # Subclasses MUST override these:
1870 _table_name = ''
1871 _fields = ()
1872
showarda3c58572009-03-12 20:36:59 +00001873 # A mapping from (type, id) to the instance of the object for that
1874 # particular id. This prevents us from creating new Job() and Host()
1875 # instances for every HostQueueEntry object that we instantiate as
1876 # multiple HQEs often share the same Job.
1877 _instances_by_type_and_id = weakref.WeakValueDictionary()
1878 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001879
showarda3c58572009-03-12 20:36:59 +00001880
1881 def __new__(cls, id=None, **kwargs):
1882 """
1883 Look to see if we already have an instance for this particular type
1884 and id. If so, use it instead of creating a duplicate instance.
1885 """
1886 if id is not None:
1887 instance = cls._instances_by_type_and_id.get((cls, id))
1888 if instance:
1889 return instance
1890 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1891
1892
1893 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001894 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001895 assert self._table_name, '_table_name must be defined in your class'
1896 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001897 if not new_record:
1898 if self._initialized and not always_query:
1899 return # We've already been initialized.
1900 if id is None:
1901 id = row[0]
1902 # Tell future constructors to use us instead of re-querying while
1903 # this instance is still around.
1904 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001905
showard6ae5ea92009-02-25 00:11:51 +00001906 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001907
jadmanski0afbb632008-06-06 21:10:57 +00001908 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001909
jadmanski0afbb632008-06-06 21:10:57 +00001910 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001911 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001912
showarda3c58572009-03-12 20:36:59 +00001913 if self._initialized:
1914 differences = self._compare_fields_in_row(row)
1915 if differences:
showard7629f142009-03-27 21:02:02 +00001916 logging.warn(
1917 'initialized %s %s instance requery is updating: %s',
1918 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001919 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001920 self._initialized = True
1921
1922
1923 @classmethod
1924 def _clear_instance_cache(cls):
1925 """Used for testing, clear the internal instance cache."""
1926 cls._instances_by_type_and_id.clear()
1927
1928
showardccbd6c52009-03-21 00:10:21 +00001929 def _fetch_row_from_db(self, row_id):
1930 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1931 rows = _db.execute(sql, (row_id,))
1932 if not rows:
showard76e29d12009-04-15 21:53:10 +00001933 raise DBError("row not found (table=%s, row id=%s)"
1934 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00001935 return rows[0]
1936
1937
showarda3c58572009-03-12 20:36:59 +00001938 def _assert_row_length(self, row):
1939 assert len(row) == len(self._fields), (
1940 "table = %s, row = %s/%d, fields = %s/%d" % (
1941 self.__table, row, len(row), self._fields, len(self._fields)))
1942
1943
1944 def _compare_fields_in_row(self, row):
1945 """
1946 Given a row as returned by a SELECT query, compare it to our existing
1947 in memory fields.
1948
1949 @param row - A sequence of values corresponding to fields named in
1950 The class attribute _fields.
1951
1952 @returns A dictionary listing the differences keyed by field name
1953 containing tuples of (current_value, row_value).
1954 """
1955 self._assert_row_length(row)
1956 differences = {}
1957 for field, row_value in itertools.izip(self._fields, row):
1958 current_value = getattr(self, field)
1959 if current_value != row_value:
1960 differences[field] = (current_value, row_value)
1961 return differences
showard2bab8f42008-11-12 18:15:22 +00001962
1963
1964 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001965 """
1966 Update our field attributes using a single row returned by SELECT.
1967
1968 @param row - A sequence of values corresponding to fields named in
1969 the class fields list.
1970 """
1971 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001972
showard2bab8f42008-11-12 18:15:22 +00001973 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001974 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001975 setattr(self, field, value)
1976 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001977
showard2bab8f42008-11-12 18:15:22 +00001978 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001979
mblighe2586682008-02-29 22:45:46 +00001980
showardccbd6c52009-03-21 00:10:21 +00001981 def update_from_database(self):
1982 assert self.id is not None
1983 row = self._fetch_row_from_db(self.id)
1984 self._update_fields_from_row(row)
1985
1986
jadmanski0afbb632008-06-06 21:10:57 +00001987 def count(self, where, table = None):
1988 if not table:
1989 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001990
jadmanski0afbb632008-06-06 21:10:57 +00001991 rows = _db.execute("""
1992 SELECT count(*) FROM %s
1993 WHERE %s
1994 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001995
jadmanski0afbb632008-06-06 21:10:57 +00001996 assert len(rows) == 1
1997
1998 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001999
2000
showardd3dc1992009-04-22 21:01:40 +00002001 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002002 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002003
showard2bab8f42008-11-12 18:15:22 +00002004 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002005 return
mbligh36768f02008-02-22 18:28:33 +00002006
mblighf8c624d2008-07-03 16:58:45 +00002007 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002008 _db.execute(query, (value, self.id))
2009
showard2bab8f42008-11-12 18:15:22 +00002010 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002011
2012
jadmanski0afbb632008-06-06 21:10:57 +00002013 def save(self):
2014 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002015 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002016 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002017 values = []
2018 for key in keys:
2019 value = getattr(self, key)
2020 if value is None:
2021 values.append('NULL')
2022 else:
2023 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002024 values_str = ','.join(values)
2025 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2026 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002027 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002028 # Update our id to the one the database just assigned to us.
2029 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002030
2031
jadmanski0afbb632008-06-06 21:10:57 +00002032 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002033 self._instances_by_type_and_id.pop((type(self), id), None)
2034 self._initialized = False
2035 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002036 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2037 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002038
2039
showard63a34772008-08-18 19:32:50 +00002040 @staticmethod
2041 def _prefix_with(string, prefix):
2042 if string:
2043 string = prefix + string
2044 return string
2045
2046
jadmanski0afbb632008-06-06 21:10:57 +00002047 @classmethod
showard989f25d2008-10-01 11:38:11 +00002048 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002049 """
2050 Construct instances of our class based on the given database query.
2051
2052 @yields One class instance for each row fetched.
2053 """
showard63a34772008-08-18 19:32:50 +00002054 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2055 where = cls._prefix_with(where, 'WHERE ')
2056 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002057 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002058 'joins' : joins,
2059 'where' : where,
2060 'order_by' : order_by})
2061 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002062 for row in rows:
2063 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002064
mbligh36768f02008-02-22 18:28:33 +00002065
class IneligibleHostQueue(DBObject):
    """ORM wrapper for a row of the 'ineligible_host_queues' table."""
    _table_name = 'ineligible_host_queues'
    # order must match the table's columns: rows are mapped positionally
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002069
2070
showard89f84db2009-03-12 20:39:13 +00002071class AtomicGroup(DBObject):
2072 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002073 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2074 'invalid')
showard89f84db2009-03-12 20:39:13 +00002075
2076
showard989f25d2008-10-01 11:38:11 +00002077class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002078 _table_name = 'labels'
2079 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002080 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002081
2082
mbligh36768f02008-02-22 18:28:33 +00002083class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002084 _table_name = 'hosts'
2085 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2086 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2087
2088
jadmanski0afbb632008-06-06 21:10:57 +00002089 def current_task(self):
2090 rows = _db.execute("""
2091 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2092 """, (self.id,))
2093
2094 if len(rows) == 0:
2095 return None
2096 else:
2097 assert len(rows) == 1
2098 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002099 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002100
2101
jadmanski0afbb632008-06-06 21:10:57 +00002102 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002103 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002104 if self.current_task():
2105 self.current_task().requeue()
2106
showard6ae5ea92009-02-25 00:11:51 +00002107
jadmanski0afbb632008-06-06 21:10:57 +00002108 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002109 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002110 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002111
2112
showard170873e2009-01-07 00:22:26 +00002113 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002114 """
showard170873e2009-01-07 00:22:26 +00002115 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002116 """
2117 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002118 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002119 FROM labels
2120 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002121 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002122 ORDER BY labels.name
2123 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002124 platform = None
2125 all_labels = []
2126 for label_name, is_platform in rows:
2127 if is_platform:
2128 platform = label_name
2129 all_labels.append(label_name)
2130 return platform, all_labels
2131
2132
2133 def reverify_tasks(self):
2134 cleanup_task = CleanupTask(host=self)
2135 verify_task = VerifyTask(host=self)
2136 # just to make sure this host does not get taken away
2137 self.set_status('Cleaning')
2138 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002139
2140
class HostQueueEntry(DBObject):
    """
    A row in the host_queue_entries table: one (job, host) scheduling unit.
    Tracks status/active/complete flags and drives per-entry transitions
    (requeue, pending, abort) plus status-change email notification.
    """
    _table_name = 'host_queue_entries'
    # Order must match the table's column order; rows are mapped positionally.
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry plus its Job and (if assigned) Host objects."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry log file path, relative to the job's results tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """
        Assign (host is not None) or release (host is None) a host.
        Assigning marks the entry active and blocks the host from being
        scheduled again for this job; releasing removes that block.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the Host currently assigned to this entry (may be None)."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row for (this job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """
        Set the results subdirectory for this entry; defaults to the
        assigned host's hostname when no subdir is given.
        """
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned hostname, or 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        """Format as hostname/job_id (entry_id) for log messages."""
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Update the status column and the derived active/complete flags,
        then send notification email if configured for this status (and a
        job-complete summary when the whole job has finished).
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this status change."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """
        If the whole job is finished, email a per-host summary plus
        aggregate status counts to the job's notification list.
        """
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """
        Start this entry running.  Meta-host and atomic-group entries must
        be given a concrete host here; delegates to the job's run().
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the Queued state (releasing meta-hosts)."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """
        Called when this queue entry's host has failed verification and
        repair.  Marks the entry Failed and lets the job stop sibling
        entries if it can no longer meet its synch count.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None if never aborted.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher):
        """
        Abort this entry.  If a job agent is already running for it, do
        nothing (the post-job tasks will finish up); otherwise free or
        re-verify the host as appropriate and mark the entry Aborted.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
            self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
            and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return '<job id>-<owner>/<execution subdir>' for this entry."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002395
2396
mbligh36768f02008-02-22 18:28:33 +00002397class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002398 _table_name = 'jobs'
2399 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2400 'control_type', 'created_on', 'synch_count', 'timeout',
2401 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2402
2403
showarda3c58572009-03-12 20:36:59 +00002404 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002405 assert id or row
showarda3c58572009-03-12 20:36:59 +00002406 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002407
mblighe2586682008-02-29 22:45:46 +00002408
jadmanski0afbb632008-06-06 21:10:57 +00002409 def is_server_job(self):
2410 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002411
2412
showard170873e2009-01-07 00:22:26 +00002413 def tag(self):
2414 return "%s-%s" % (self.id, self.owner)
2415
2416
jadmanski0afbb632008-06-06 21:10:57 +00002417 def get_host_queue_entries(self):
2418 rows = _db.execute("""
2419 SELECT * FROM host_queue_entries
2420 WHERE job_id= %s
2421 """, (self.id,))
2422 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002423
jadmanski0afbb632008-06-06 21:10:57 +00002424 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002425
jadmanski0afbb632008-06-06 21:10:57 +00002426 return entries
mbligh36768f02008-02-22 18:28:33 +00002427
2428
jadmanski0afbb632008-06-06 21:10:57 +00002429 def set_status(self, status, update_queues=False):
2430 self.update_field('status',status)
2431
2432 if update_queues:
2433 for queue_entry in self.get_host_queue_entries():
2434 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002435
2436
jadmanski0afbb632008-06-06 21:10:57 +00002437 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002438 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2439 status='Pending')
2440 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002441
2442
jadmanski0afbb632008-06-06 21:10:57 +00002443 def num_machines(self, clause = None):
2444 sql = "job_id=%s" % self.id
2445 if clause:
2446 sql += " AND (%s)" % clause
2447 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002448
2449
jadmanski0afbb632008-06-06 21:10:57 +00002450 def num_queued(self):
2451 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002452
2453
jadmanski0afbb632008-06-06 21:10:57 +00002454 def num_active(self):
2455 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002456
2457
jadmanski0afbb632008-06-06 21:10:57 +00002458 def num_complete(self):
2459 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002460
2461
jadmanski0afbb632008-06-06 21:10:57 +00002462 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002463 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002464
mbligh36768f02008-02-22 18:28:33 +00002465
showard6bb7c292009-01-30 01:44:51 +00002466 def _not_yet_run_entries(self, include_verifying=True):
2467 statuses = [models.HostQueueEntry.Status.QUEUED,
2468 models.HostQueueEntry.Status.PENDING]
2469 if include_verifying:
2470 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2471 return models.HostQueueEntry.objects.filter(job=self.id,
2472 status__in=statuses)
2473
2474
2475 def _stop_all_entries(self):
2476 entries_to_stop = self._not_yet_run_entries(
2477 include_verifying=False)
2478 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002479 assert not child_entry.complete, (
2480 '%s status=%s, active=%s, complete=%s' %
2481 (child_entry.id, child_entry.status, child_entry.active,
2482 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002483 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2484 child_entry.host.status = models.Host.Status.READY
2485 child_entry.host.save()
2486 child_entry.status = models.HostQueueEntry.Status.STOPPED
2487 child_entry.save()
2488
showard2bab8f42008-11-12 18:15:22 +00002489 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002490 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002491 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002492 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002493
2494
jadmanski0afbb632008-06-06 21:10:57 +00002495 def write_to_machines_file(self, queue_entry):
2496 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002497 file_path = os.path.join(self.tag(), '.machines')
2498 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002499
2500
showard2bab8f42008-11-12 18:15:22 +00002501 def _next_group_name(self):
2502 query = models.HostQueueEntry.objects.filter(
2503 job=self.id).values('execution_subdir').distinct()
2504 subdirs = (entry['execution_subdir'] for entry in query)
2505 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2506 ids = [int(match.group(1)) for match in groups if match]
2507 if ids:
2508 next_id = max(ids) + 1
2509 else:
2510 next_id = 0
2511 return "group%d" % next_id
2512
2513
showard170873e2009-01-07 00:22:26 +00002514 def _write_control_file(self, execution_tag):
2515 control_path = _drone_manager.attach_file_to_execution(
2516 execution_tag, self.control_file)
2517 return control_path
mbligh36768f02008-02-22 18:28:33 +00002518
showardb2e2c322008-10-14 17:33:55 +00002519
showard2bab8f42008-11-12 18:15:22 +00002520 def get_group_entries(self, queue_entry_from_group):
2521 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002522 return list(HostQueueEntry.fetch(
2523 where='job_id=%s AND execution_subdir=%s',
2524 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002525
2526
showardb2e2c322008-10-14 17:33:55 +00002527 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002528 assert queue_entries
2529 execution_tag = queue_entries[0].execution_tag()
2530 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002531 hostnames = ','.join([entry.get_host().hostname
2532 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002533
showard87ba02a2009-04-20 19:37:32 +00002534 params = _autoserv_command_line(
2535 hostnames, execution_tag,
2536 ['-P', execution_tag, '-n',
2537 _drone_manager.absolute_path(control_path)],
2538 job=self)
mbligh36768f02008-02-22 18:28:33 +00002539
jadmanski0afbb632008-06-06 21:10:57 +00002540 if not self.is_server_job():
2541 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002542
showardb2e2c322008-10-14 17:33:55 +00002543 return params
mblighe2586682008-02-29 22:45:46 +00002544
mbligh36768f02008-02-22 18:28:33 +00002545
showardc9ae1782009-01-30 01:42:37 +00002546 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002547 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002548 return True
showard0fc38302008-10-23 00:44:07 +00002549 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002550 return queue_entry.get_host().dirty
2551 return False
showard21baa452008-10-21 00:08:39 +00002552
showardc9ae1782009-01-30 01:42:37 +00002553
2554 def _should_run_verify(self, queue_entry):
2555 do_not_verify = (queue_entry.host.protection ==
2556 host_protections.Protection.DO_NOT_VERIFY)
2557 if do_not_verify:
2558 return False
2559 return self.run_verify
2560
2561
2562 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002563 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002564 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002565 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002566 if self._should_run_verify(queue_entry):
2567 tasks.append(VerifyTask(queue_entry=queue_entry))
2568 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002569 return tasks
2570
2571
showard2bab8f42008-11-12 18:15:22 +00002572 def _assign_new_group(self, queue_entries):
2573 if len(queue_entries) == 1:
2574 group_name = queue_entries[0].get_host().hostname
2575 else:
2576 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002577 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002578 self.id, [entry.host.hostname for entry in queue_entries],
2579 group_name)
2580
2581 for queue_entry in queue_entries:
2582 queue_entry.set_execution_subdir(group_name)
2583
2584
2585 def _choose_group_to_run(self, include_queue_entry):
2586 chosen_entries = [include_queue_entry]
2587
2588 num_entries_needed = self.synch_count - 1
2589 if num_entries_needed > 0:
2590 pending_entries = HostQueueEntry.fetch(
2591 where='job_id = %s AND status = "Pending" AND id != %s',
2592 params=(self.id, include_queue_entry.id))
2593 chosen_entries += list(pending_entries)[:num_entries_needed]
2594
2595 self._assign_new_group(chosen_entries)
2596 return chosen_entries
2597
2598
2599 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002600 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002601 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2602 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002603
showard2bab8f42008-11-12 18:15:22 +00002604 queue_entries = self._choose_group_to_run(queue_entry)
2605 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002606
2607
2608 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002609 for queue_entry in queue_entries:
2610 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002611 params = self._get_autoserv_params(queue_entries)
2612 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2613 cmd=params)
2614 tasks = initial_tasks + [queue_task]
2615 entry_ids = [entry.id for entry in queue_entries]
2616
showard170873e2009-01-07 00:22:26 +00002617 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002618
2619
mbligh36768f02008-02-22 18:28:33 +00002620if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002621 main()