blob: a9c7b0f18cfffb20d1ed5c6b3cf81ec20d1e6299 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Default to the autotest tree this file lives in; AUTOTEST_DIR overrides.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# has_key() is deprecated; use the 'in' operator instead.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

# pidfile names dropped into an execution's results directory by the
# processes the scheduler launches (autoserv, crashinfo collection, parser)
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# scheduler-wide globals, populated by init()/main_without_exception_handling()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if 'AUTOTEST_SCHEDULER_LOG_DIR' not in os.environ:
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
def main():
    """
    Scheduler entry point: delegate to main_without_exception_handling()
    and log any exception that escapes before re-raising it.
    """
    try:
        main_without_exception_handling()
    except:
        # NOTE: bare except is the outermost boundary -- anything escaping
        # (including SystemExit) is logged with a traceback, then re-raised.
        logging.exception('Exception escaping in monitor_db')
        raise
84
85
def main_without_exception_handling():
    """
    Parse command-line options, load global configuration, start the status
    server, then run the dispatcher loop until a shutdown is requested.

    Expects exactly one positional argument: the results directory.
    Mutates the module globals RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode and _base_url.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    # statuses for which completion emails should be sent, e.g. "Failed,Aborted"
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    # status server runs alongside the dispatcher for monitoring
    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main loop: one tick per iteration until handle_sigint flips _shutdown
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # orderly teardown: flush emails, stop status server, drones, then DB
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000161
162
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000167
168
def init(logfile):
    """
    One-time scheduler initialization: optional stdout/stderr redirection,
    pidfile, test-mode database override, database connection, SIGINT
    handler and drone manager setup.

    @param logfile - Optional path; when set, stdout/stderr are redirected
            via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # renamed from 'drones' so the local no longer shadows the imported
    # autotest_lib.scheduler.drones module
    drone_names = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_names.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000200
201
def enable_logging(logfile):
    """
    Redirect the scheduler's stdout and stderr into append-mode files.

    stdout goes to *logfile* and stderr to *logfile*.err; both are opened
    unbuffered, and the underlying file descriptors are dup2'd so child
    processes inherit the redirection too.
    """
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", logfile, err_file)

    out_fd = open(logfile, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)
    for stream, handle in ((sys.stdout, out_fd), (sys.stderr, err_fd)):
        os.dup2(handle.fileno(), stream.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000214
215
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the argv list used to launch autoserv for the given machines.

    When a job (directly or via queue_entry) is supplied, the owner and job
    name are passed through with -u/-l.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        job = job or queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    return command + extra_args
225
226
class SchedulerError(Exception):
    """
    Raised by HostScheduler when an inconsistent state occurs (e.g. a host
    carrying more than one atomic group label).
    """
229
230
class HostScheduler(object):
    """
    Matches pending HostQueueEntries to ready Hosts.

    refresh() snapshots scheduling state from the database (ready hosts,
    ACLs, labels, job dependencies) at the start of a scheduling pass; the
    find_eligible_* methods then consume that cached state, popping each
    host out of the internal availability map as it is handed out.
    """

    def _get_ready_hosts(self):
        """Return {host id: Host} for unlocked, idle hosts in Ready state."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated SQL IN() list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run *query* (which contains a single %s placeholder for the id
        list) and return its two-column result as {id: set(related ids)}.
        Returns {} without touching the DB when id_list is empty.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) id rows into {left: set(right)}; flip swaps
        the role of the two columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids of the job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it must not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids containing it."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        Return a (labels_to_hosts, hosts_to_labels) pair of dicts built
        from one hosts_labels query; both empty when host_ids is empty.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return a dict of all Label objects keyed by label id."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Reload all cached scheduling state: ready hosts, then per-job ACLs,
        ineligible hosts and dependencies for the given pending entries,
        then per-host ACLs/labels and the full label table.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with the
        host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True if every only_if_needed label on the host is justified: either
        it is a dependency of the job or it is the entry's metahost label.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes the ACL, dependency, only_if_needed and
        atomic group checks for the entry's job."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's explicitly requested host, or None
        if it is ineligible or no longer available."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available in this scheduling cycle."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick, claim and return an eligible host carrying the entry's
        metahost label, or None if none qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find, claim and return a Host for this (non-atomic-group) entry,
        or None when nothing suitable is available."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
575
576
showard170873e2009-01-07 00:22:26 +0000577class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000578 def __init__(self):
579 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000580 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000581 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000582 user_cleanup_time = scheduler_config.config.clean_interval
583 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
584 _db, user_cleanup_time)
585 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000586 self._host_agents = {}
587 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000588
mbligh36768f02008-02-22 18:28:33 +0000589
    def initialize(self, recover_hosts=True):
        """
        Recover state left over from a previous scheduler run.

        @param recover_hosts - If True, also recover hosts (see
                _recover_hosts); process recovery always happens.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000599
600
    def tick(self):
        """
        One scheduler iteration: refresh drone state, run periodic cleanup,
        process aborts, schedule new jobs, advance all agents, then flush
        queued drone actions and notification emails.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000609
showard97aed502008-11-04 02:01:24 +0000610
    def _run_cleanup(self):
        """Run the periodic and 24-hour cleanups if their intervals have
        elapsed (each helper decides for itself)."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000614
mbligh36768f02008-02-22 18:28:33 +0000615
showard170873e2009-01-07 00:22:26 +0000616 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
617 for object_id in object_ids:
618 agent_dict.setdefault(object_id, set()).add(agent)
619
620
621 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
622 for object_id in object_ids:
623 assert object_id in agent_dict
624 agent_dict[object_id].remove(agent)
625
626
jadmanski0afbb632008-06-06 21:10:57 +0000627 def add_agent(self, agent):
628 self._agents.append(agent)
629 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000630 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
631 self._register_agent_for_ids(self._queue_entry_agents,
632 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000633
showard170873e2009-01-07 00:22:26 +0000634
635 def get_agents_for_entry(self, queue_entry):
636 """
637 Find agents corresponding to the specified queue_entry.
638 """
showardd3dc1992009-04-22 21:01:40 +0000639 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000640
641
642 def host_has_agent(self, host):
643 """
644 Determine if there is currently an Agent present using this host.
645 """
646 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000647
648
jadmanski0afbb632008-06-06 21:10:57 +0000649 def remove_agent(self, agent):
650 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000651 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
652 agent)
653 self._unregister_agent_for_ids(self._queue_entry_agents,
654 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000655
656
    def _recover_processes(self):
        """
        Reattach to processes that survived a scheduler restart, requeue
        entries whose processes are gone, and re-verify leftover hosts.
        Order matters: pidfiles must be registered before the drone refresh
        so their contents are read.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000667
showard170873e2009-01-07 00:22:26 +0000668
669 def _register_pidfiles(self):
670 # during recovery we may need to read pidfiles for both running and
671 # parsing entries
672 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000673 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000674 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000675 for pidfile_name in _ALL_PIDFILE_NAMES:
676 pidfile_id = _drone_manager.get_pidfile_id_from(
677 queue_entry.execution_tag(), pidfile_name=pidfile_name)
678 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000679
680
showardd3dc1992009-04-22 21:01:40 +0000681 def _recover_entries_with_status(self, status, orphans, pidfile_name,
682 recover_entries_fn):
683 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000684 for queue_entry in queue_entries:
685 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000686 # synchronous job we've already recovered
687 continue
showardd3dc1992009-04-22 21:01:40 +0000688 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000689 execution_tag = queue_entry.execution_tag()
690 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000691 run_monitor.attach_to_existing_process(execution_tag,
692 pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000693 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000694 # execution apparently never happened
695 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000696 continue
mbligh90a549d2008-03-25 23:52:34 +0000697
showardd3dc1992009-04-22 21:01:40 +0000698 logging.info('Recovering %s entry %s (process %s)',
699 status.lower(),
700 ', '.join(str(entry) for entry in queue_entries),
701 run_monitor.get_process())
702 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
703 orphans.discard(run_monitor.get_process())
704
705
706 def _kill_remaining_orphan_processes(self, orphans):
707 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000708 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000709 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000710
showard170873e2009-01-07 00:22:26 +0000711
showardd3dc1992009-04-22 21:01:40 +0000712 def _recover_running_entries(self, orphans):
713 def recover_entries(job, queue_entries, run_monitor):
714 if run_monitor is not None:
715 queue_task = RecoveryQueueTask(job=job,
716 queue_entries=queue_entries,
717 run_monitor=run_monitor)
718 self.add_agent(Agent(tasks=[queue_task],
719 num_processes=len(queue_entries)))
720 # else, _requeue_other_active_entries will cover this
721
722 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
723 orphans, '.autoserv_execute',
724 recover_entries)
725
726
727 def _recover_gathering_entries(self, orphans):
728 def recover_entries(job, queue_entries, run_monitor):
729 gather_task = GatherLogsTask(job, queue_entries,
730 run_monitor=run_monitor)
731 self.add_agent(Agent([gather_task]))
732
733 self._recover_entries_with_status(
734 models.HostQueueEntry.Status.GATHERING,
735 orphans, _CRASHINFO_PID_FILE, recover_entries)
736
737
738 def _recover_parsing_entries(self, orphans):
739 def recover_entries(job, queue_entries, run_monitor):
740 reparse_task = FinalReparseTask(queue_entries,
741 run_monitor=run_monitor)
742 self.add_agent(Agent([reparse_task], num_processes=0))
743
744 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
745 orphans, _PARSER_PID_FILE,
746 recover_entries)
747
748
    def _recover_all_recoverable_entries(self):
        """
        Recover Running, Gathering and Parsing entries, then kill any
        orphaned autoserv processes that no recovered entry claimed.
        Each recovery step discards claimed processes from `orphans`, so
        the kill pass must run last.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000755
showard97aed502008-11-04 02:01:24 +0000756
showard170873e2009-01-07 00:22:26 +0000757 def _requeue_other_active_entries(self):
758 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000759 where='active AND NOT complete AND '
760 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000761 for queue_entry in queue_entries:
762 if self.get_agents_for_entry(queue_entry):
763 # entry has already been recovered
764 continue
showardd3dc1992009-04-22 21:01:40 +0000765 if queue_entry.aborted:
766 queue_entry.abort(self)
767 continue
768
769 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000770 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000771 if queue_entry.host:
772 tasks = queue_entry.host.reverify_tasks()
773 self.add_agent(Agent(tasks))
774 agent = queue_entry.requeue()
775
776
    def _reverify_remaining_hosts(self):
        """
        Send reverify tasks for hosts left in transient states by a
        scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000792
793
jadmanski0afbb632008-06-06 21:10:57 +0000794 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000795 print_message='Reverifying host %s'):
796 full_where='locked = 0 AND invalid = 0 AND ' + where
797 for host in Host.fetch(where=full_where):
798 if self.host_has_agent(host):
799 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000800 continue
showard170873e2009-01-07 00:22:26 +0000801 if print_message:
showardb18134f2009-03-20 20:52:18 +0000802 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000803 tasks = host.reverify_tasks()
804 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000805
806
jadmanski0afbb632008-06-06 21:10:57 +0000807 def _recover_hosts(self):
808 # recover "Repair Failed" hosts
809 message = 'Reverifying dead host %s'
810 self._reverify_hosts_where("status = 'Repair Failed'",
811 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000812
813
showard04c82c52008-05-29 19:38:12 +0000814
showardb95b1bd2008-08-15 18:11:04 +0000815 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000816 # prioritize by job priority, then non-metahost over metahost, then FIFO
817 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000818 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000819 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000820 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000821
822
showard89f84db2009-03-12 20:39:13 +0000823 def _refresh_pending_queue_entries(self):
824 """
825 Lookup the pending HostQueueEntries and call our HostScheduler
826 refresh() method given that list. Return the list.
827
828 @returns A list of pending HostQueueEntries sorted in priority order.
829 """
showard63a34772008-08-18 19:32:50 +0000830 queue_entries = self._get_pending_queue_entries()
831 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000832 return []
showardb95b1bd2008-08-15 18:11:04 +0000833
showard63a34772008-08-18 19:32:50 +0000834 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000835
showard89f84db2009-03-12 20:39:13 +0000836 return queue_entries
837
838
839 def _schedule_atomic_group(self, queue_entry):
840 """
841 Schedule the given queue_entry on an atomic group of hosts.
842
843 Returns immediately if there are insufficient available hosts.
844
845 Creates new HostQueueEntries based off of queue_entry for the
846 scheduled hosts and starts them all running.
847 """
848 # This is a virtual host queue entry representing an entire
849 # atomic group, find a group and schedule their hosts.
850 group_hosts = self._host_scheduler.find_eligible_atomic_group(
851 queue_entry)
852 if not group_hosts:
853 return
854 # The first assigned host uses the original HostQueueEntry
855 group_queue_entries = [queue_entry]
856 for assigned_host in group_hosts[1:]:
857 # Create a new HQE for every additional assigned_host.
858 new_hqe = HostQueueEntry.clone(queue_entry)
859 new_hqe.save()
860 group_queue_entries.append(new_hqe)
861 assert len(group_queue_entries) == len(group_hosts)
862 for queue_entry, host in itertools.izip(group_queue_entries,
863 group_hosts):
864 self._run_queue_entry(queue_entry, host)
865
866
867 def _schedule_new_jobs(self):
868 queue_entries = self._refresh_pending_queue_entries()
869 if not queue_entries:
870 return
871
showard63a34772008-08-18 19:32:50 +0000872 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000873 if (queue_entry.atomic_group_id is None or
874 queue_entry.host_id is not None):
875 assigned_host = self._host_scheduler.find_eligible_host(
876 queue_entry)
877 if assigned_host:
878 self._run_queue_entry(queue_entry, assigned_host)
879 else:
880 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000881
882
883 def _run_queue_entry(self, queue_entry, host):
884 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000885 # in some cases (synchronous jobs with run_verify=False), agent may be
886 # None
showard9976ce92008-10-15 20:28:13 +0000887 if agent:
888 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000889
890
jadmanski0afbb632008-06-06 21:10:57 +0000891 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000892 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
893 for agent in self.get_agents_for_entry(entry):
894 agent.abort()
895 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000896
897
showard324bf812009-01-20 23:23:38 +0000898 def _can_start_agent(self, agent, num_started_this_cycle,
899 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000900 # always allow zero-process agents to run
901 if agent.num_processes == 0:
902 return True
903 # don't allow any nonzero-process agents to run after we've reached a
904 # limit (this avoids starvation of many-process agents)
905 if have_reached_limit:
906 return False
907 # total process throttling
showard324bf812009-01-20 23:23:38 +0000908 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000909 return False
910 # if a single agent exceeds the per-cycle throttling, still allow it to
911 # run when it's the first agent in the cycle
912 if num_started_this_cycle == 0:
913 return True
914 # per-cycle throttling
915 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000916 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000917 return False
918 return True
919
920
jadmanski0afbb632008-06-06 21:10:57 +0000921 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000922 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000923 have_reached_limit = False
924 # iterate over copy, so we can remove agents during iteration
925 for agent in list(self._agents):
926 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000927 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000928 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000929 continue
930 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000931 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000932 have_reached_limit):
933 have_reached_limit = True
934 continue
showard4c5374f2008-09-04 17:02:56 +0000935 num_started_this_cycle += agent.num_processes
936 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000937 logging.info('%d running processes',
938 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000939
940
showard170873e2009-01-07 00:22:26 +0000941class PidfileRunMonitor(object):
942 """
943 Client must call either run() to start a new process or
944 attach_to_existing_process().
945 """
mbligh36768f02008-02-22 18:28:33 +0000946
showard170873e2009-01-07 00:22:26 +0000947 class _PidfileException(Exception):
948 """
949 Raised when there's some unexpected behavior with the pid file, but only
950 used internally (never allowed to escape this class).
951 """
mbligh36768f02008-02-22 18:28:33 +0000952
953
showard170873e2009-01-07 00:22:26 +0000954 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000955 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000956 self._start_time = None
957 self.pidfile_id = None
958 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000959
960
showard170873e2009-01-07 00:22:26 +0000961 def _add_nice_command(self, command, nice_level):
962 if not nice_level:
963 return command
964 return ['nice', '-n', str(nice_level)] + command
965
966
967 def _set_start_time(self):
968 self._start_time = time.time()
969
970
971 def run(self, command, working_directory, nice_level=None, log_file=None,
972 pidfile_name=None, paired_with_pidfile=None):
973 assert command is not None
974 if nice_level is not None:
975 command = ['nice', '-n', str(nice_level)] + command
976 self._set_start_time()
977 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +0000978 command, working_directory, pidfile_name=pidfile_name,
979 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +0000980
981
showardd3dc1992009-04-22 21:01:40 +0000982 def attach_to_existing_process(self, execution_tag,
983 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +0000984 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +0000985 self.pidfile_id = _drone_manager.get_pidfile_id_from(
986 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000987 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +0000988
989
jadmanski0afbb632008-06-06 21:10:57 +0000990 def kill(self):
showard170873e2009-01-07 00:22:26 +0000991 if self.has_process():
992 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +0000993
mbligh36768f02008-02-22 18:28:33 +0000994
showard170873e2009-01-07 00:22:26 +0000995 def has_process(self):
showard21baa452008-10-21 00:08:39 +0000996 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +0000997 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +0000998
999
showard170873e2009-01-07 00:22:26 +00001000 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001001 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001002 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001003 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001004
1005
showard170873e2009-01-07 00:22:26 +00001006 def _read_pidfile(self, use_second_read=False):
1007 assert self.pidfile_id is not None, (
1008 'You must call run() or attach_to_existing_process()')
1009 contents = _drone_manager.get_pidfile_contents(
1010 self.pidfile_id, use_second_read=use_second_read)
1011 if contents.is_invalid():
1012 self._state = drone_manager.PidfileContents()
1013 raise self._PidfileException(contents)
1014 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001015
1016
showard21baa452008-10-21 00:08:39 +00001017 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001018 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1019 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001020 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001021 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001022 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001023
1024
1025 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001026 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001027 return
mblighbb421852008-03-11 22:36:16 +00001028
showard21baa452008-10-21 00:08:39 +00001029 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001030
showard170873e2009-01-07 00:22:26 +00001031 if self._state.process is None:
1032 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001033 return
mbligh90a549d2008-03-25 23:52:34 +00001034
showard21baa452008-10-21 00:08:39 +00001035 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001036 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001037 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001038 return
mbligh90a549d2008-03-25 23:52:34 +00001039
showard170873e2009-01-07 00:22:26 +00001040 # pid but no running process - maybe process *just* exited
1041 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001042 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001043 # autoserv exited without writing an exit code
1044 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001045 self._handle_pidfile_error(
1046 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001047
showard21baa452008-10-21 00:08:39 +00001048
1049 def _get_pidfile_info(self):
1050 """\
1051 After completion, self._state will contain:
1052 pid=None, exit_status=None if autoserv has not yet run
1053 pid!=None, exit_status=None if autoserv is running
1054 pid!=None, exit_status!=None if autoserv has completed
1055 """
1056 try:
1057 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001058 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001059 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001060
1061
showard170873e2009-01-07 00:22:26 +00001062 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001063 """\
1064 Called when no pidfile is found or no pid is in the pidfile.
1065 """
showard170873e2009-01-07 00:22:26 +00001066 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001067 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001068 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1069 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001070 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001071 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001072
1073
showard35162b02009-03-03 02:17:30 +00001074 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001075 """\
1076 Called when autoserv has exited without writing an exit status,
1077 or we've timed out waiting for autoserv to write a pid to the
1078 pidfile. In either case, we just return failure and the caller
1079 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001080
showard170873e2009-01-07 00:22:26 +00001081 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001082 """
1083 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001084 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001085 self._state.exit_status = 1
1086 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001087
1088
jadmanski0afbb632008-06-06 21:10:57 +00001089 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001090 self._get_pidfile_info()
1091 return self._state.exit_status
1092
1093
1094 def num_tests_failed(self):
1095 self._get_pidfile_info()
1096 assert self._state.num_tests_failed is not None
1097 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001098
1099
mbligh36768f02008-02-22 18:28:33 +00001100class Agent(object):
showard170873e2009-01-07 00:22:26 +00001101 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001102 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001103 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001104 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001105 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001106
showard170873e2009-01-07 00:22:26 +00001107 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1108 for task in tasks)
1109 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1110
showardd3dc1992009-04-22 21:01:40 +00001111 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001112 for task in tasks:
1113 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001114
1115
showardd3dc1992009-04-22 21:01:40 +00001116 def _clear_queue(self):
1117 self.queue = Queue.Queue(0)
1118
1119
showard170873e2009-01-07 00:22:26 +00001120 def _union_ids(self, id_lists):
1121 return set(itertools.chain(*id_lists))
1122
1123
jadmanski0afbb632008-06-06 21:10:57 +00001124 def add_task(self, task):
1125 self.queue.put_nowait(task)
1126 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001127
1128
jadmanski0afbb632008-06-06 21:10:57 +00001129 def tick(self):
showard21baa452008-10-21 00:08:39 +00001130 while not self.is_done():
1131 if self.active_task and not self.active_task.is_done():
1132 self.active_task.poll()
1133 if not self.active_task.is_done():
1134 return
1135 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001136
1137
jadmanski0afbb632008-06-06 21:10:57 +00001138 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001139 logging.info("agent picking task")
jadmanski0afbb632008-06-06 21:10:57 +00001140 if self.active_task:
1141 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001142 if not self.active_task.success:
1143 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +00001144
jadmanski0afbb632008-06-06 21:10:57 +00001145 self.active_task = None
1146 if not self.is_done():
1147 self.active_task = self.queue.get_nowait()
1148 if self.active_task:
1149 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +00001150
1151
jadmanski0afbb632008-06-06 21:10:57 +00001152 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001153 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001154 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1155 # get reset.
1156 new_agent = Agent(self.active_task.failure_tasks)
1157 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001158
mblighe2586682008-02-29 22:45:46 +00001159
showard4c5374f2008-09-04 17:02:56 +00001160 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001161 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001162
1163
jadmanski0afbb632008-06-06 21:10:57 +00001164 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001165 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001166
1167
jadmanski0afbb632008-06-06 21:10:57 +00001168 def start(self):
1169 assert self.dispatcher
jadmanski0afbb632008-06-06 21:10:57 +00001170 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001171
jadmanski0afbb632008-06-06 21:10:57 +00001172
showardd3dc1992009-04-22 21:01:40 +00001173 def abort(self):
1174 if self.active_task:
1175 self.active_task.abort()
showard20f9bdd2009-04-29 19:48:33 +00001176 if not self.active_task.aborted: # tasks can choose to ignore aborts
1177 return
1178 self.active_task = None
1179
1180 self._clear_queue()
1181
showardd3dc1992009-04-22 21:01:40 +00001182
1183
mbligh36768f02008-02-22 18:28:33 +00001184class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001185 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1186 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001187 self.done = False
1188 self.failure_tasks = failure_tasks
1189 self.started = False
1190 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001191 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001192 self.task = None
1193 self.agent = None
1194 self.monitor = None
1195 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001196 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001197 self.queue_entry_ids = []
1198 self.host_ids = []
1199 self.log_file = None
1200
1201
1202 def _set_ids(self, host=None, queue_entries=None):
1203 if queue_entries and queue_entries != [None]:
1204 self.host_ids = [entry.host.id for entry in queue_entries]
1205 self.queue_entry_ids = [entry.id for entry in queue_entries]
1206 else:
1207 assert host
1208 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001209
1210
jadmanski0afbb632008-06-06 21:10:57 +00001211 def poll(self):
jadmanski0afbb632008-06-06 21:10:57 +00001212 if self.monitor:
1213 self.tick(self.monitor.exit_code())
1214 else:
1215 self.finished(False)
mbligh36768f02008-02-22 18:28:33 +00001216
1217
jadmanski0afbb632008-06-06 21:10:57 +00001218 def tick(self, exit_code):
showard170873e2009-01-07 00:22:26 +00001219 if exit_code is None:
jadmanski0afbb632008-06-06 21:10:57 +00001220 return
jadmanski0afbb632008-06-06 21:10:57 +00001221 if exit_code == 0:
1222 success = True
1223 else:
1224 success = False
mbligh36768f02008-02-22 18:28:33 +00001225
jadmanski0afbb632008-06-06 21:10:57 +00001226 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001227
1228
jadmanski0afbb632008-06-06 21:10:57 +00001229 def is_done(self):
1230 return self.done
mbligh36768f02008-02-22 18:28:33 +00001231
1232
jadmanski0afbb632008-06-06 21:10:57 +00001233 def finished(self, success):
1234 self.done = True
1235 self.success = success
1236 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001237
1238
jadmanski0afbb632008-06-06 21:10:57 +00001239 def prolog(self):
1240 pass
mblighd64e5702008-04-04 21:39:28 +00001241
1242
jadmanski0afbb632008-06-06 21:10:57 +00001243 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001244 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001245
mbligh36768f02008-02-22 18:28:33 +00001246
jadmanski0afbb632008-06-06 21:10:57 +00001247 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001248 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001249 _drone_manager.copy_to_results_repository(
1250 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001251
1252
jadmanski0afbb632008-06-06 21:10:57 +00001253 def epilog(self):
1254 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001255
1256
jadmanski0afbb632008-06-06 21:10:57 +00001257 def start(self):
1258 assert self.agent
1259
1260 if not self.started:
1261 self.prolog()
1262 self.run()
1263
1264 self.started = True
1265
1266
1267 def abort(self):
1268 if self.monitor:
1269 self.monitor.kill()
1270 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001271 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001272 self.cleanup()
1273
1274
showard170873e2009-01-07 00:22:26 +00001275 def set_host_log_file(self, base_name, host):
1276 filename = '%s.%s' % (time.time(), base_name)
1277 self.log_file = os.path.join('hosts', host.hostname, filename)
1278
1279
showardde634ee2009-01-30 01:44:24 +00001280 def _get_consistent_execution_tag(self, queue_entries):
1281 first_execution_tag = queue_entries[0].execution_tag()
1282 for queue_entry in queue_entries[1:]:
1283 assert queue_entry.execution_tag() == first_execution_tag, (
1284 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1285 queue_entry,
1286 first_execution_tag,
1287 queue_entries[0]))
1288 return first_execution_tag
1289
1290
showard6b733412009-04-27 20:09:18 +00001291 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001292 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001293 if use_monitor is None:
1294 assert self.monitor
1295 use_monitor = self.monitor
1296 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001297 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001298 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001299 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001300 results_path)
showardde634ee2009-01-30 01:44:24 +00001301
1302 reparse_task = FinalReparseTask(queue_entries)
1303 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1304
1305
showardd3dc1992009-04-22 21:01:40 +00001306 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001307 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001308 self.monitor = PidfileRunMonitor()
1309 self.monitor.run(self.cmd, self._working_directory,
1310 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001311 log_file=self.log_file,
1312 pidfile_name=pidfile_name,
1313 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001314
1315
showardd9205182009-04-27 20:09:55 +00001316class TaskWithJobKeyvals(object):
1317 """AgentTask mixin providing functionality to help with job keyval files."""
1318 _KEYVAL_FILE = 'keyval'
1319 def _format_keyval(self, key, value):
1320 return '%s=%s' % (key, value)
1321
1322
1323 def _keyval_path(self):
1324 """Subclasses must override this"""
1325 raise NotImplemented
1326
1327
1328 def _write_keyval_after_job(self, field, value):
1329 assert self.monitor
1330 if not self.monitor.has_process():
1331 return
1332 _drone_manager.write_lines_to_file(
1333 self._keyval_path(), [self._format_keyval(field, value)],
1334 paired_with_process=self.monitor.get_process())
1335
1336
1337 def _job_queued_keyval(self, job):
1338 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1339
1340
1341 def _write_job_finished(self):
1342 self._write_keyval_after_job("job_finished", int(time.time()))
1343
1344
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """
    Runs `autoserv -R` to repair a host, optionally marking a queue entry
    failed if the repair itself fails.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # results go to a temp dir first; copied into place only on failure
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # put the entry back in the queue while its host is being repaired
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live alongside the temporary repair results
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark the associated queue entry failed and publish its results."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001413
1414
class PreJobTask(AgentTask):
    """
    Base for tasks that run against a host before a job (verify, cleanup).

    When such a task fails for a concrete (non-metahost) queue entry, its
    log file is copied into the entry's results location so the user can
    see why the job never ran.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        if self.success:
            return
        if not self.queue_entry or self.queue_entry.meta_host:
            return
        # preserve the failure log under the entry's execution tag
        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(self.queue_entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001427
1428
class VerifyTask(PreJobTask):
    """Run ``autoserv -v`` on a host; schedule a RepairTask if it fails."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be provided
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        repair = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=[repair])

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark the host (and entry, if any) as verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """On success, return the host to the Ready pool."""
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001459
1460
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Task that runs the main autoserv process for a job's queue entries.

    On startup it writes job/host keyvals and marks entries and hosts as
    Running; on completion (or abort) it writes the finished keyval and, if
    the process actually ran, schedules a GatherLogsTask to collect results.
    """
    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job-level keyval file lives at the top of the execution directory
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a keyval file to this execution on the drone."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write per-host platform/label keyvals under host_keyvals/<hostname>."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries of a job execution share one tag; use the first
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Write pre-job keyvals and mark all entries and hosts as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        self._write_keyvals_before_job({queued_key : queued_time})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker file explaining why the job has no results
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Common completion path for both normal epilog and abort."""
        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log on the drone
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001580
1581
class RecoveryQueueTask(QueueTask):
    """A QueueTask re-attached to an autoserv process that is already running."""

    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # nothing to launch -- adopt the monitor of the existing process
        self.monitor = self.run_monitor


    def prolog(self):
        # the process is already underway, so skip all pre-job setup
        pass
mblighbb421852008-03-11 22:36:16 +00001595
1596
class PostJobTask(AgentTask):
    """
    Base class for tasks that run over a finished job execution (gathering
    logs, final reparse).  Attaches to the completed autoserv process's
    pidfile to determine the job's final status.

    Subclasses must override _generate_command().
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        If run_monitor != None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name
        self._run_monitor = run_monitor

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process so we can read its exit
        # status and pair our own process with it
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command (list) to run; implemented by subclasses."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """
        Return True if the job's queue entries were aborted.

        All entries should agree on their abort state; if they do not, a
        warning email is enqueued and we conservatively assume aborted.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # bug fix: the old code joined over a single formatted
                # string, which interleaved ', ' between its *characters*;
                # join over all entries instead
                entries_description = ', '.join(
                    '%s (%s)' % (entry, entry.aborted)
                    for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map abort state and autoserv exit code to a final entry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        if self._run_monitor is not None:
            # recovery case: just adopt the already-running monitor
            self.monitor = self._run_monitor
        else:
            # make sure we actually have results to work with.
            # this should never happen in normal operation.
            if not self._autoserv_monitor.has_process():
                email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
                self.finished(False)
                return

            super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1683
1684
class GatherLogsTask(PostJobTask):
    """
    Post-job task that:
    * gathers uncollected logs (if Autoserv crashed hard or was killed)
    * copies logs to the results repository
    * spawns CleanupTasks for hosts, if necessary
    * spawns a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m',
                ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Reboot (cleanup) or release each host, per the job's reboot policy."""
        status = self._final_status
        reboot_after = self._job.reboot_after
        if status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            completed = (status == models.HostQueueEntry.Status.COMPLETED)
            do_reboot = (completed
                         and self._autoserv_monitor.num_tests_failed() == 0)
        else:
            do_reboot = False

        for entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the
                # cleanup fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()
1741
1742
class CleanupTask(PreJobTask):
    """Run ``autoserv --cleanup`` on a host; fall back to repair on failure."""

    def __init__(self, host=None, queue_entry=None):
        # exactly one of host / queue_entry may be supplied
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        super(CleanupTask, self).__init__(
            self.cmd, self.temp_results_dir,
            failure_tasks=[RepairTask(host, queue_entry=queue_entry)])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        # a clean host is ready for scheduling again
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1774
1775
class FinalReparseTask(PostJobTask):
    """Run the results parser over a finished job, throttling how many
    parse processes run at once via a class-level counter."""

    _num_running_parses = 0

    def __init__(self, queue_entries, run_monitor=None):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               pidfile_name=_PARSER_PID_FILE,
                                               logfile_name='.parse.log',
                                               run_monitor=run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        max_parses = scheduler_config.config.max_parse_processes
        return cls._num_running_parses < max_parses


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    def poll(self):
        # until our parse has actually started, polling just retries the
        # start; afterwards we defer to the default behavior
        if not self._parse_started:
            self._try_starting_parse()
        else:
            super(FinalReparseTask, self).poll()


    def run(self):
        # don't actually run unless the throttle allows it
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # launch the parse command for real
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001849
1850
class SetEntryPendingTask(AgentTask):
    """Trivial task that flips a queue entry to pending and finishes at once."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        # on_pending() may hand back a follow-up agent to schedule
        next_agent = self._queue_entry.on_pending()
        if next_agent:
            self.agent.dispatcher.add_agent(next_agent)
        self.finished(True)
1863
1864
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    Typically means no row with the requested id exists in the table.
    """
1867
1868
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Construct from either a row id (triggers a SELECT) or a raw row.

        @param id - database id of the row to load (exclusive with row).
        @param row - a pre-fetched row tuple (exclusive with id).
        @param new_record - True if this represents a row not yet INSERTed.
        @param always_query - if False and this instance was already
                initialized (shared via the instance cache), skip requerying.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT and return the row for row_id; raises DBError if missing."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is assigned by the database and must never be update_field()ed
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this row from the database, refreshing all fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table (default: ours) matching where."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Persist a single field change for this row (no-op if unchanged)."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields as a new row and record its new id."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are interpolated into the SQL
                    # unescaped; safe only because field values originate
                    # from trusted scheduler code, not user input.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row and evict this instance from the cache."""
        # Bug fix: the cache key must use self.id -- the old code passed the
        # *builtin* id function, which never matched the key inserted in
        # __init__, so deleted instances were never evicted from the cache.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # helper for optional SQL clauses: prepend prefix only when non-empty
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002067
mbligh36768f02008-02-22 18:28:33 +00002068
class IneligibleHostQueue(DBObject):
    """ORM wrapper for a row of the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002072
2073
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002078
2079
class Label(DBObject):
    """ORM wrapper for a row of the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002084
2085
class Host(DBObject):
    """ORM wrapper for a row of the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2090
2091
jadmanski0afbb632008-06-06 21:10:57 +00002092 def current_task(self):
2093 rows = _db.execute("""
2094 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2095 """, (self.id,))
2096
2097 if len(rows) == 0:
2098 return None
2099 else:
2100 assert len(rows) == 1
2101 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002102 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002103
2104
jadmanski0afbb632008-06-06 21:10:57 +00002105 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002106 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002107 if self.current_task():
2108 self.current_task().requeue()
2109
showard6ae5ea92009-02-25 00:11:51 +00002110
jadmanski0afbb632008-06-06 21:10:57 +00002111 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002112 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002113 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002114
2115
showard170873e2009-01-07 00:22:26 +00002116 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002117 """
showard170873e2009-01-07 00:22:26 +00002118 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002119 """
2120 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002121 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002122 FROM labels
2123 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002124 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002125 ORDER BY labels.name
2126 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002127 platform = None
2128 all_labels = []
2129 for label_name, is_platform in rows:
2130 if is_platform:
2131 platform = label_name
2132 all_labels.append(label_name)
2133 return platform, all_labels
2134
2135
2136 def reverify_tasks(self):
2137 cleanup_task = CleanupTask(host=self)
2138 verify_task = VerifyTask(host=self)
2139 # just to make sure this host does not get taken away
2140 self.set_status('Cleaning')
2141 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002142
2143
mbligh36768f02008-02-22 18:28:33 +00002144class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002145 _table_name = 'host_queue_entries'
2146 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002147 'active', 'complete', 'deleted', 'execution_subdir',
showardd3dc1992009-04-22 21:01:40 +00002148 'atomic_group_id', 'aborted')
showard6ae5ea92009-02-25 00:11:51 +00002149
2150
showarda3c58572009-03-12 20:36:59 +00002151 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002152 assert id or row
showarda3c58572009-03-12 20:36:59 +00002153 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002154 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002155
jadmanski0afbb632008-06-06 21:10:57 +00002156 if self.host_id:
2157 self.host = Host(self.host_id)
2158 else:
2159 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002160
showard170873e2009-01-07 00:22:26 +00002161 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002162 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002163
2164
showard89f84db2009-03-12 20:39:13 +00002165 @classmethod
2166 def clone(cls, template):
2167 """
2168 Creates a new row using the values from a template instance.
2169
2170 The new instance will not exist in the database or have a valid
2171 id attribute until its save() method is called.
2172 """
2173 assert isinstance(template, cls)
2174 new_row = [getattr(template, field) for field in cls._fields]
2175 clone = cls(row=new_row, new_record=True)
2176 clone.id = None
2177 return clone
2178
2179
showardc85c21b2008-11-24 22:17:37 +00002180 def _view_job_url(self):
2181 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2182
2183
jadmanski0afbb632008-06-06 21:10:57 +00002184 def set_host(self, host):
2185 if host:
2186 self.queue_log_record('Assigning host ' + host.hostname)
2187 self.update_field('host_id', host.id)
2188 self.update_field('active', True)
2189 self.block_host(host.id)
2190 else:
2191 self.queue_log_record('Releasing host')
2192 self.unblock_host(self.host.id)
2193 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002194
jadmanski0afbb632008-06-06 21:10:57 +00002195 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002196
2197
jadmanski0afbb632008-06-06 21:10:57 +00002198 def get_host(self):
2199 return self.host
mbligh36768f02008-02-22 18:28:33 +00002200
2201
jadmanski0afbb632008-06-06 21:10:57 +00002202 def queue_log_record(self, log_line):
2203 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002204 _drone_manager.write_lines_to_file(self.queue_log_path,
2205 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002206
2207
jadmanski0afbb632008-06-06 21:10:57 +00002208 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002209 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002210 row = [0, self.job.id, host_id]
2211 block = IneligibleHostQueue(row=row, new_record=True)
2212 block.save()
mblighe2586682008-02-29 22:45:46 +00002213
2214
jadmanski0afbb632008-06-06 21:10:57 +00002215 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002216 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002217 blocks = IneligibleHostQueue.fetch(
2218 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2219 for block in blocks:
2220 block.delete()
mblighe2586682008-02-29 22:45:46 +00002221
2222
showard2bab8f42008-11-12 18:15:22 +00002223 def set_execution_subdir(self, subdir=None):
2224 if subdir is None:
2225 assert self.get_host()
2226 subdir = self.get_host().hostname
2227 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002228
2229
showard6355f6b2008-12-05 18:52:13 +00002230 def _get_hostname(self):
2231 if self.host:
2232 return self.host.hostname
2233 return 'no host'
2234
2235
showard170873e2009-01-07 00:22:26 +00002236 def __str__(self):
2237 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2238
2239
jadmanski0afbb632008-06-06 21:10:57 +00002240 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002241 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002242
showardb18134f2009-03-20 20:52:18 +00002243 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002244
showardc85c21b2008-11-24 22:17:37 +00002245 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002246 self.update_field('complete', False)
2247 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002248
jadmanski0afbb632008-06-06 21:10:57 +00002249 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002250 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002251 self.update_field('complete', False)
2252 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002253
showardc85c21b2008-11-24 22:17:37 +00002254 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002255 self.update_field('complete', True)
2256 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002257
2258 should_email_status = (status.lower() in _notify_email_statuses or
2259 'all' in _notify_email_statuses)
2260 if should_email_status:
2261 self._email_on_status(status)
2262
2263 self._email_on_job_complete()
2264
2265
2266 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002267 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002268
2269 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2270 self.job.id, self.job.name, hostname, status)
2271 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2272 self.job.id, self.job.name, hostname, status,
2273 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002274 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002275
2276
2277 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002278 if not self.job.is_finished():
2279 return
showard542e8402008-09-19 20:16:18 +00002280
showardc85c21b2008-11-24 22:17:37 +00002281 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002282 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002283 for queue_entry in hosts_queue:
2284 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002285 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002286 queue_entry.status))
2287
2288 summary_text = "\n".join(summary_text)
2289 status_counts = models.Job.objects.get_status_counts(
2290 [self.job.id])[self.job.id]
2291 status = ', '.join('%d %s' % (count, status) for status, count
2292 in status_counts.iteritems())
2293
2294 subject = 'Autotest: Job ID: %s "%s" %s' % (
2295 self.job.id, self.job.name, status)
2296 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2297 self.job.id, self.job.name, status, self._view_job_url(),
2298 summary_text)
showard170873e2009-01-07 00:22:26 +00002299 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002300
2301
showard89f84db2009-03-12 20:39:13 +00002302 def run(self, assigned_host=None):
2303 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002304 assert assigned_host
2305 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002306 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002307
showardb18134f2009-03-20 20:52:18 +00002308 logging.info("%s/%s/%s scheduled on %s, status=%s",
2309 self.job.name, self.meta_host, self.atomic_group_id,
2310 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002311
jadmanski0afbb632008-06-06 21:10:57 +00002312 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002313
showard6ae5ea92009-02-25 00:11:51 +00002314
jadmanski0afbb632008-06-06 21:10:57 +00002315 def requeue(self):
2316 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002317 # verify/cleanup failure sets the execution subdir, so reset it here
2318 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002319 if self.meta_host:
2320 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002321
2322
jadmanski0afbb632008-06-06 21:10:57 +00002323 def handle_host_failure(self):
2324 """\
2325 Called when this queue entry's host has failed verification and
2326 repair.
2327 """
2328 assert not self.meta_host
2329 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002330 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002331
2332
jadmanskif7fa2cc2008-10-01 14:13:23 +00002333 @property
2334 def aborted_by(self):
2335 self._load_abort_info()
2336 return self._aborted_by
2337
2338
2339 @property
2340 def aborted_on(self):
2341 self._load_abort_info()
2342 return self._aborted_on
2343
2344
2345 def _load_abort_info(self):
2346 """ Fetch info about who aborted the job. """
2347 if hasattr(self, "_aborted_by"):
2348 return
2349 rows = _db.execute("""
2350 SELECT users.login, aborted_host_queue_entries.aborted_on
2351 FROM aborted_host_queue_entries
2352 INNER JOIN users
2353 ON users.id = aborted_host_queue_entries.aborted_by_id
2354 WHERE aborted_host_queue_entries.queue_entry_id = %s
2355 """, (self.id,))
2356 if rows:
2357 self._aborted_by, self._aborted_on = rows[0]
2358 else:
2359 self._aborted_by = self._aborted_on = None
2360
2361
showardb2e2c322008-10-14 17:33:55 +00002362 def on_pending(self):
2363 """
2364 Called when an entry in a synchronous job has passed verify. If the
2365 job is ready to run, returns an agent to run the job. Returns None
2366 otherwise.
2367 """
2368 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002369 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002370 if self.job.is_ready():
2371 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002372 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002373 return None
2374
2375
showardd3dc1992009-04-22 21:01:40 +00002376 def abort(self, dispatcher):
2377 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002378
showardd3dc1992009-04-22 21:01:40 +00002379 Status = models.HostQueueEntry.Status
2380 has_running_job_agent = (
2381 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2382 and dispatcher.get_agents_for_entry(self))
2383 if has_running_job_agent:
2384 # do nothing; post-job tasks will finish and then mark this entry
2385 # with status "Aborted" and take care of the host
2386 return
2387
2388 if self.status in (Status.STARTING, Status.PENDING):
2389 self.host.set_status(models.Host.Status.READY)
2390 elif self.status == Status.VERIFYING:
2391 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2392
2393 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002394
2395 def execution_tag(self):
2396 assert self.execution_subdir
2397 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002398
2399
mbligh36768f02008-02-22 18:28:33 +00002400class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002401 _table_name = 'jobs'
2402 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2403 'control_type', 'created_on', 'synch_count', 'timeout',
2404 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2405
2406
showarda3c58572009-03-12 20:36:59 +00002407 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002408 assert id or row
showarda3c58572009-03-12 20:36:59 +00002409 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002410
mblighe2586682008-02-29 22:45:46 +00002411
jadmanski0afbb632008-06-06 21:10:57 +00002412 def is_server_job(self):
2413 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002414
2415
showard170873e2009-01-07 00:22:26 +00002416 def tag(self):
2417 return "%s-%s" % (self.id, self.owner)
2418
2419
jadmanski0afbb632008-06-06 21:10:57 +00002420 def get_host_queue_entries(self):
2421 rows = _db.execute("""
2422 SELECT * FROM host_queue_entries
2423 WHERE job_id= %s
2424 """, (self.id,))
2425 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002426
jadmanski0afbb632008-06-06 21:10:57 +00002427 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002428
jadmanski0afbb632008-06-06 21:10:57 +00002429 return entries
mbligh36768f02008-02-22 18:28:33 +00002430
2431
jadmanski0afbb632008-06-06 21:10:57 +00002432 def set_status(self, status, update_queues=False):
2433 self.update_field('status',status)
2434
2435 if update_queues:
2436 for queue_entry in self.get_host_queue_entries():
2437 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002438
2439
jadmanski0afbb632008-06-06 21:10:57 +00002440 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002441 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2442 status='Pending')
2443 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002444
2445
jadmanski0afbb632008-06-06 21:10:57 +00002446 def num_machines(self, clause = None):
2447 sql = "job_id=%s" % self.id
2448 if clause:
2449 sql += " AND (%s)" % clause
2450 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002451
2452
jadmanski0afbb632008-06-06 21:10:57 +00002453 def num_queued(self):
2454 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002455
2456
jadmanski0afbb632008-06-06 21:10:57 +00002457 def num_active(self):
2458 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002459
2460
jadmanski0afbb632008-06-06 21:10:57 +00002461 def num_complete(self):
2462 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002463
2464
jadmanski0afbb632008-06-06 21:10:57 +00002465 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002466 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002467
mbligh36768f02008-02-22 18:28:33 +00002468
showard6bb7c292009-01-30 01:44:51 +00002469 def _not_yet_run_entries(self, include_verifying=True):
2470 statuses = [models.HostQueueEntry.Status.QUEUED,
2471 models.HostQueueEntry.Status.PENDING]
2472 if include_verifying:
2473 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2474 return models.HostQueueEntry.objects.filter(job=self.id,
2475 status__in=statuses)
2476
2477
2478 def _stop_all_entries(self):
2479 entries_to_stop = self._not_yet_run_entries(
2480 include_verifying=False)
2481 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002482 assert not child_entry.complete, (
2483 '%s status=%s, active=%s, complete=%s' %
2484 (child_entry.id, child_entry.status, child_entry.active,
2485 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002486 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2487 child_entry.host.status = models.Host.Status.READY
2488 child_entry.host.save()
2489 child_entry.status = models.HostQueueEntry.Status.STOPPED
2490 child_entry.save()
2491
showard2bab8f42008-11-12 18:15:22 +00002492 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002493 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002494 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002495 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002496
2497
jadmanski0afbb632008-06-06 21:10:57 +00002498 def write_to_machines_file(self, queue_entry):
2499 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002500 file_path = os.path.join(self.tag(), '.machines')
2501 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002502
2503
showard2bab8f42008-11-12 18:15:22 +00002504 def _next_group_name(self):
2505 query = models.HostQueueEntry.objects.filter(
2506 job=self.id).values('execution_subdir').distinct()
2507 subdirs = (entry['execution_subdir'] for entry in query)
2508 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2509 ids = [int(match.group(1)) for match in groups if match]
2510 if ids:
2511 next_id = max(ids) + 1
2512 else:
2513 next_id = 0
2514 return "group%d" % next_id
2515
2516
showard170873e2009-01-07 00:22:26 +00002517 def _write_control_file(self, execution_tag):
2518 control_path = _drone_manager.attach_file_to_execution(
2519 execution_tag, self.control_file)
2520 return control_path
mbligh36768f02008-02-22 18:28:33 +00002521
showardb2e2c322008-10-14 17:33:55 +00002522
showard2bab8f42008-11-12 18:15:22 +00002523 def get_group_entries(self, queue_entry_from_group):
2524 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002525 return list(HostQueueEntry.fetch(
2526 where='job_id=%s AND execution_subdir=%s',
2527 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002528
2529
showardb2e2c322008-10-14 17:33:55 +00002530 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002531 assert queue_entries
2532 execution_tag = queue_entries[0].execution_tag()
2533 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002534 hostnames = ','.join([entry.get_host().hostname
2535 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002536
showard87ba02a2009-04-20 19:37:32 +00002537 params = _autoserv_command_line(
2538 hostnames, execution_tag,
2539 ['-P', execution_tag, '-n',
2540 _drone_manager.absolute_path(control_path)],
2541 job=self)
mbligh36768f02008-02-22 18:28:33 +00002542
jadmanski0afbb632008-06-06 21:10:57 +00002543 if not self.is_server_job():
2544 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002545
showardb2e2c322008-10-14 17:33:55 +00002546 return params
mblighe2586682008-02-29 22:45:46 +00002547
mbligh36768f02008-02-22 18:28:33 +00002548
showardc9ae1782009-01-30 01:42:37 +00002549 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002550 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002551 return True
showard0fc38302008-10-23 00:44:07 +00002552 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002553 return queue_entry.get_host().dirty
2554 return False
showard21baa452008-10-21 00:08:39 +00002555
showardc9ae1782009-01-30 01:42:37 +00002556
2557 def _should_run_verify(self, queue_entry):
2558 do_not_verify = (queue_entry.host.protection ==
2559 host_protections.Protection.DO_NOT_VERIFY)
2560 if do_not_verify:
2561 return False
2562 return self.run_verify
2563
2564
2565 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002566 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002567 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002568 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002569 if self._should_run_verify(queue_entry):
2570 tasks.append(VerifyTask(queue_entry=queue_entry))
2571 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002572 return tasks
2573
2574
showard2bab8f42008-11-12 18:15:22 +00002575 def _assign_new_group(self, queue_entries):
2576 if len(queue_entries) == 1:
2577 group_name = queue_entries[0].get_host().hostname
2578 else:
2579 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002580 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002581 self.id, [entry.host.hostname for entry in queue_entries],
2582 group_name)
2583
2584 for queue_entry in queue_entries:
2585 queue_entry.set_execution_subdir(group_name)
2586
2587
2588 def _choose_group_to_run(self, include_queue_entry):
2589 chosen_entries = [include_queue_entry]
2590
2591 num_entries_needed = self.synch_count - 1
2592 if num_entries_needed > 0:
2593 pending_entries = HostQueueEntry.fetch(
2594 where='job_id = %s AND status = "Pending" AND id != %s',
2595 params=(self.id, include_queue_entry.id))
2596 chosen_entries += list(pending_entries)[:num_entries_needed]
2597
2598 self._assign_new_group(chosen_entries)
2599 return chosen_entries
2600
2601
2602 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002603 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002604 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2605 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002606
showard2bab8f42008-11-12 18:15:22 +00002607 queue_entries = self._choose_group_to_run(queue_entry)
2608 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002609
2610
2611 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002612 for queue_entry in queue_entries:
2613 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002614 params = self._get_autoserv_params(queue_entries)
2615 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2616 cmd=params)
2617 tasks = initial_tasks + [queue_task]
2618 entry_ids = [entry.id for entry in queue_entries]
2619
showard170873e2009-01-07 00:22:26 +00002620 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002621
2622
mbligh36768f02008-02-22 18:28:33 +00002623if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002624 main()